Published on

Elasticsearch 最佳实践 | Best Practices

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

最佳实践

0. 索引创建

  1. ES 索引做好预定义 Index Mapping,避免动态映射
  2. 数据清洗, 如果数据格式不固定,可以使用:
    • Logstash 或 Elasticsearch Ingest Pipeline 进行格式转换.
    • JSON Schema 验证,在 Node.js 端提前检查数据格式,避免错误写入.

1. 每日索引创建

根据数据查询特征, 按日期划分索引: logs-YYYY-MM-DD,查询更快

默认给整个ES索引配置按天切分, 这样将日志切分交给ES底层, 像 对于 Node.js 项目的日志, 比如使用 Pino-elasticsearch, 不需要再关注相关配置

  1. 在 Elasticsearch(或 OpenSearch)中,索引别名(Alias) 是一个指向实际索引的逻辑名称,可以帮助:
    • 自动切换索引(比如 logs 指向 logs-2024.05.28).
    • 统一查询多个索引(所有 logs-* 索引可以用 logs 访问).
    • 滚动索引(Rolling Index),无需修改应用代码.
  2. 使用方法
    • 使用 Index Template 自动创建别名

      PUT _index_template/logs-template
      {
        "index_patterns": ["logs-*"], 
        "template": {
          "aliases": {
            "logs": {}  // 所有匹配 "logs-*" 规则的索引,都自动加上 "logs" 别名
          }
        }
      }
      
    • 手动批量给已有索引加别名

      POST _aliases
      {
        "actions": [
          { "add": { "index": "logs-2025.03.26", "alias": "logs" }},
          { "add": { "index": "logs-2025.03.27", "alias": "logs" }},
          { "add": { "index": "logs-2025.03.28", "alias": "logs" }}
        ]
      }
      
    • 每天使用 alias 进行滚动索引(Rolling Index), 这样每天切换新的索引,但 logs 别名始终指向最新的索引

      POST _aliases
      {
        "actions": [
          { "remove": { "index": "logs-2024.05.27", "alias": "logs" }},
          { "add": { "index": "logs-2024.05.28", "alias": "logs" }}
        ]
      }
      

2. 写入优化

  1. 批量写入 bulk(), 一次 1000 条,减少 ES 压力
  2. 队列缓冲, 先存入内存队列,满了再写入
  3. 异步写入 setInterval() 定时提交,减少请求

3. 失败重试

失败日志存入 Kafka, ES 宕机时,日志不会丢失,稍后可以从 Kafka 重新消费数据

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'log-service', brokers: ['kafka:9092'] });
const producer = kafka.producer();
await producer.connect();

async function flushQueue() {
  if (queue.length === 0) return;
  const batch = queue.splice(0, BATCH_SIZE);
  const body = batch.flatMap(doc => [{ index: {_index: 'logs' } }, doc]);

  try {
    await esClient.bulk({ body, refresh: false });
  } catch (error) {
    console.error('ES 写入失败,存入 Kafka:', error);
    await producer.send({
      topic: 'failed-logs',
      messages: batch.map(log => ({ value: JSON.stringify(log) })),
    });
  }
}

4. 监控 Elasticsearch 的写入性能,避免日志堆积

可以用 Elasticsearch 自带 API + Prometheus + Grafana 进行监控.

  1. 索引写入速率 (indexing rate) 每秒写入文档数 _cat/thread_pool API

  2. 写入队列长度 (bulk queue size) 如果队列满了,说明写入慢 _cat/thread_pool API

    curl -XGET "http://localhost:9200/_cat/thread_pool/bulk?v&h=node_name,queue,rejected"
    
    # 返回
    node_name  queue  rejected
    es-node-1     0        0
    es-node-2    50      100
    
    # queue → 当前 bulk 队列长度(如果大于 0,说明正在排队)
    # rejected → 拒绝的 bulk 请求数(如果大于 0,说明 ES 处理不过来)
    # queue > 100 → ES 处理不过来,可以加节点或减少 bulk 频率
    # rejected > 0 → 日志可能丢失,需要 Kafka 缓冲
    
  3. 写入耗时 (refresh time) 反映磁盘 IO 压力 _cat/indices?v API

    curl -XGET "http://localhost:9200/_cat/indices?v&s=index"
    
    health status index             docs.count store.size pri rep
    green  open   logs-2024-03-28   1000000    2gb        1   1
    
    # docs.count → 文档数量(增长速度是否正常)
    # store.size → 索引大小(是否超出磁盘容量)
    # 写入速率低 → 可能是 bulk 过大,调小批量大小
    # 索引大小太大 → 需要定期归档到 S3(ILM 策略)
    
  4. 磁盘使用率 超过 80% 可能导致写入变慢 _cat/allocation?v API

  5. JVM 内存占用 超过 75% 时 GC 频繁,写入变慢 _cat/nodes?v API

  6. _nodes/stats 监控写入耗时

    curl -XGET "http://localhost:9200/_nodes/stats/indices"
    {
      "indices": {
        "indexing": {
          "index_total": 5000000,
          "index_time_in_millis": 120000,
          "throttle_time_in_millis": 5000
        }
      }
    }
    
    # 分析指标:
    # index_total → 总写入文档数
    # index_time_in_millis → 写入总耗时
    # throttle_time_in_millis → ES 限流时间(如果 > 0,说明写入太快被限流)
    # 如何处理?
    # throttle_time_in_millis > 0 → 写入速度过快,降低批量大小
    # index_time_in_millis 持续增长 → 磁盘 IO 可能是瓶颈,需要加 SSD
    

5. 设置自动限流调整

import { Client } from '@elastic/elasticsearch';

const esClient = new Client({ node: 'http://localhost:9200' });

let bulkInterval = 5000; // 默认 5 秒批量提交
const MIN_INTERVAL = 1000; // 最小 1 秒
const MAX_INTERVAL = 10000; // 最大 10 秒

async function adjustBulkInterval() {
  const { body: stats } = await esClient.cat.thread_pool({ format: 'json', h: 'queue,rejected' });
  const queueSize = parseInt(stats[0]?.queue || '0', 10);
  const rejectedCount = parseInt(stats[0]?.rejected || '0', 10);

  if (queueSize > 100 || rejectedCount > 0) {
    bulkInterval = Math.min(bulkInterval + 1000, MAX_INTERVAL); // 增加间隔,降低压力
  } else {
    bulkInterval = Math.max(bulkInterval - 1000, MIN_INTERVAL); // 降低间隔,提高写入速度
  }
}

// 每 10 秒检查一次
setInterval(adjustBulkInterval, 10000);

6. 查询最佳实践, 参考: 在 Elasticsearch (ES) 进行查询

2. Opensearch 参考: 使用 AWS OpenSearch Service(托管版的 Elasticsearch)