- Published on
发送数据到 Elasticsearch / Opensearch
- Authors
- Name
- Shelton Ma
1. 发送数据到 Elasticsearch
1. 使用 Node.js 发送数据
安装依赖
npm install @elastic/elasticsearch
创建 Elasticsearch 客户端
import { Client } from '@elastic/elasticsearch'; // 创建 ES 客户端 const esClient = new Client({ node: '<http://localhost:9200>', // ES 服务器地址 auth: { username: 'elastic', // 替换为你的用户名 password: 'your-password', // 替换为你的密码 }, });
实时插入(单条数据)
/** * 发送单条数据到 Elasticsearch * @param {string} index - 目标索引 * @param {Object} data - 需要存储的 JSON 数据 */ async function sendToElasticsearch(index, data) { try { const response = await esClient.index({ index, body: data, }); console.log('数据发送成功:', response); return response; } catch (error) { console.error('发送数据失败:', error); throw error; } } sendToElasticsearch('my-index', { user: 'Alice', message: 'Hello Elasticsearch!', timestamp: new Date().toISOString(), });
批量插入(高效写入)
/** * 批量插入数据到 Elasticsearch * @param {string} index - 目标索引 * @param {Array<Object>} documents - 需要存储的 JSON 数据数组 */ async function bulkInsert(index, documents) { if (!documents.length) return; const body = documents.flatMap(doc => [{ index: { _index: index } }, doc]); try { const response = await esClient.bulk({ body }); console.log(`成功批量插入 ${documents.length} 条数据`); return response; } catch (error) { console.error('批量插入失败:', error); throw error; } } bulkInsert('my-index', [ { user: 'Bob', message: 'Message 1', timestamp: new Date().toISOString() }, { user: 'Charlie', message: 'Message 2', timestamp: new Date().toISOString() }, { user: 'Dave', message: 'Message 3', timestamp: new Date().toISOString() }, ]);
自动批量提交
const queue = []; const FLUSH_INTERVAL = 5000; // 每 5 秒提交一次 const BATCH_SIZE = 1000; // 每次最多 1000 条 function addToQueue(data) { queue.push(data); if (queue.length >= BATCH_SIZE) { flushQueue(); } } async function flushQueue() { if (queue.length === 0) return; const batch = queue.splice(0, BATCH_SIZE); await bulkInsert('my-index', batch); } // 定期刷新队列 setInterval(flushQueue, FLUSH_INTERVAL); // ✅ 示例:自动写入数据 addToQueue({ user: 'Eve', message: 'Hello from queue', timestamp: new Date().toISOString() });
2. 在处理高并发日志系统, 终极高效方案
- 日志进入队列(addToQueue())
- 队列满了就批量提交(1000 条一批)
- 定时器每 5 秒强制提交(避免数据积压)