Published on

发送数据到 Elasticsearch / Opensearch

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

1. 发送数据到 Elasticsearch

1. 使用 Node.js 发送数据

  1. 安装依赖

    npm install @elastic/elasticsearch
    
  2. 创建 Elasticsearch 客户端

    import { Client } from '@elastic/elasticsearch';
    
    // 创建 ES 客户端
    const esClient = new Client({
      node: '<http://localhost:9200>', // ES 服务器地址
      auth: {
        username: 'elastic',  // 替换为你的用户名
        password: 'your-password', // 替换为你的密码
      },
    });
    
  3. 实时插入(单条数据)

    /**
     * 发送单条数据到 Elasticsearch
    * @param {string} index - 目标索引
    * @param {Object} data - 需要存储的 JSON 数据
    */
    async function sendToElasticsearch(index, data) {
      try {
        const response = await esClient.index({
          index,
          body: data,
        });
        console.log('数据发送成功:', response);
        return response;
      } catch (error) {
        console.error('发送数据失败:', error);
        throw error;
      }
    }
    
    sendToElasticsearch('my-index', {
      user: 'Alice',
      message: 'Hello Elasticsearch!',
      timestamp: new Date().toISOString(),
    });
    
  4. 批量插入(高效写入)

    /**
    * 批量插入数据到 Elasticsearch
    * @param {string} index - 目标索引
    * @param {Array<Object>} documents - 需要存储的 JSON 数据数组
    */
    async function bulkInsert(index, documents) {
      if (!documents.length) return;
    
      const body = documents.flatMap(doc => [{ index: { _index: index } }, doc]);
    
      try {
        const response = await esClient.bulk({ body });
        console.log(`成功批量插入 ${documents.length} 条数据`);
        return response;
      } catch (error) {
        console.error('批量插入失败:', error);
        throw error;
      }
    }
    
    bulkInsert('my-index', [
      { user: 'Bob', message: 'Message 1', timestamp: new Date().toISOString() },
      { user: 'Charlie', message: 'Message 2', timestamp: new Date().toISOString() },
      { user: 'Dave', message: 'Message 3', timestamp: new Date().toISOString() },
    ]);
    
  5. 自动批量提交

    const queue = [];
    const FLUSH_INTERVAL = 5000; // 每 5 秒提交一次
    const BATCH_SIZE = 1000; // 每次最多 1000 条
    
    function addToQueue(data) {
      queue.push(data);
      if (queue.length >= BATCH_SIZE) {
        flushQueue();
      }
    }
    
    async function flushQueue() {
      if (queue.length === 0) return;
      const batch = queue.splice(0, BATCH_SIZE);
      await bulkInsert('my-index', batch);
    }
    
    // 定期刷新队列
    setInterval(flushQueue, FLUSH_INTERVAL);
    
    // ✅ 示例:自动写入数据
    addToQueue({ user: 'Eve', message: 'Hello from queue', timestamp: new Date().toISOString() });
    

2. 在处理高并发日志系统, 终极高效方案

  1. 日志进入队列(addToQueue())
  2. 队列满了就批量提交(1000 条一批)
  3. 定时器每 5 秒强制提交(避免数据积压)