- Published on
Elasticsearch 最佳实践 | Best Practices
- Authors
- Name
- Shelton Ma
最佳实践
0. 索引创建
- ES 索引做好预定义
Index Mapping
,避免动态映射 - 数据清洗, 如果数据格式不固定,可以使用:
- Logstash 或 Elasticsearch Ingest Pipeline 进行格式转换.
- JSON Schema 验证,在 Node.js 端提前检查数据格式,避免错误写入.
1. 每日索引创建
根据数据查询特征, 按日期划分索引: logs-YYYY-MM-DD
,查询更快
默认给整个ES索引配置按天切分, 这样将日志切分交给ES底层, 像 对于 Node.js 项目的日志, 比如使用 Pino-elasticsearch, 不需要再关注相关配置
- 在 Elasticsearch(或 OpenSearch)中,索引别名(Alias) 是一个指向实际索引的逻辑名称,可以帮助:
- 自动切换索引(比如 logs 指向 logs-2024.05.28).
- 统一查询多个索引(所有 logs-* 索引可以用 logs 访问).
- 滚动索引(Rolling Index),无需修改应用代码.
- 使用方法
使用 Index Template 自动创建别名
PUT _index_template/logs-template { "index_patterns": ["logs-*"], "template": { "aliases": { "logs": {} // 所有匹配 "logs-*" 规则的索引,都自动加上 "logs" 别名 } } }
手动批量给已有索引加别名
POST _aliases { "actions": [ { "add": { "index": "logs-2025.03.26", "alias": "logs" }}, { "add": { "index": "logs-2025.03.27", "alias": "logs" }}, { "add": { "index": "logs-2025.03.28", "alias": "logs" }} ] }
每天使用 alias 进行滚动索引(Rolling Index), 这样每天切换新的索引,但 logs 别名始终指向最新的索引
POST _aliases { "actions": [ { "remove": { "index": "logs-2024.05.27", "alias": "logs" }}, { "add": { "index": "logs-2024.05.28", "alias": "logs" }} ] }
2. 写入优化
- 批量写入
bulk()
, 一次 1000 条,减少 ES 压力 - 队列缓冲, 先存入内存队列,满了再写入
- 异步写入
setInterval()
定时提交,减少请求
3. 失败重试
失败日志存入 Kafka, ES 宕机时,日志不会丢失,稍后可以从 Kafka 重新消费数据
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ clientId: 'log-service', brokers: ['kafka:9092'] });
const producer = kafka.producer();
await producer.connect();
async function flushQueue() {
if (queue.length === 0) return;
const batch = queue.splice(0, BATCH_SIZE);
const body = batch.flatMap(doc => [{ index: {_index: 'logs' } }, doc]);
try {
await esClient.bulk({ body, refresh: false });
} catch (error) {
console.error('ES 写入失败,存入 Kafka:', error);
await producer.send({
topic: 'failed-logs',
messages: batch.map(log => ({ value: JSON.stringify(log) })),
});
}
}
4. 监控 Elasticsearch 的写入性能,避免日志堆积
可以用 Elasticsearch 自带 API + Prometheus + Grafana 进行监控.
索引写入速率 (indexing rate) 每秒写入文档数
_cat/thread_pool API
写入队列长度 (bulk queue size) 如果队列满了,说明写入慢
_cat/thread_pool API
curl -XGET "http://localhost:9200/_cat/thread_pool/bulk?v&h=node_name,queue,rejected" # 返回 node_name queue rejected es-node-1 0 0 es-node-2 50 100 # queue → 当前 bulk 队列长度(如果大于 0,说明正在排队) # rejected → 拒绝的 bulk 请求数(如果大于 0,说明 ES 处理不过来) # queue > 100 → ES 处理不过来,可以加节点或减少 bulk 频率 # rejected > 0 → 日志可能丢失,需要 Kafka 缓冲
写入耗时 (refresh time) 反映磁盘 IO 压力
_cat/indices?v API
curl -XGET "http://localhost:9200/_cat/indices?v&s=index" health status index docs.count store.size pri rep green open logs-2024-03-28 1000000 2gb 1 1 # docs.count → 文档数量(增长速度是否正常) # store.size → 索引大小(是否超出磁盘容量) # 写入速率低 → 可能是 bulk 过大,调小批量大小 # 索引大小太大 → 需要定期归档到 S3(ILM 策略)
磁盘使用率 超过 80% 可能导致写入变慢
_cat/allocation?v API
JVM 内存占用 超过 75% 时 GC 频繁,写入变慢
_cat/nodes?v API
用
_nodes/stats
监控写入耗时curl -XGET "http://localhost:9200/_nodes/stats/indices" { "indices": { "indexing": { "index_total": 5000000, "index_time_in_millis": 120000, "throttle_time_in_millis": 5000 } } } # 分析指标: # index_total → 总写入文档数 # index_time_in_millis → 写入总耗时 # throttle_time_in_millis → ES 限流时间(如果 > 0,说明写入太快被限流) # 如何处理? # throttle_time_in_millis > 0 → 写入速度过快,降低批量大小 # index_time_in_millis 持续增长 → 磁盘 IO 可能是瓶颈,需要加 SSD
5. 设置自动限流调整
import { Client } from '@elastic/elasticsearch';
const esClient = new Client({ node: 'http://localhost:9200' });
let bulkInterval = 5000; // 默认 5 秒批量提交
const MIN_INTERVAL = 1000; // 最小 1 秒
const MAX_INTERVAL = 10000; // 最大 10 秒
async function adjustBulkInterval() {
const { body: stats } = await esClient.cat.thread_pool({ format: 'json', h: 'queue,rejected' });
const queueSize = parseInt(stats[0]?.queue || '0', 10);
const rejectedCount = parseInt(stats[0]?.rejected || '0', 10);
if (queueSize > 100 || rejectedCount > 0) {
bulkInterval = Math.min(bulkInterval + 1000, MAX_INTERVAL); // 增加间隔,降低压力
} else {
bulkInterval = Math.max(bulkInterval - 1000, MIN_INTERVAL); // 降低间隔,提高写入速度
}
}
// 每 10 秒检查一次
setInterval(adjustBulkInterval, 10000);