Using Kafka to Handle Data Export Tasks in an Express Project
- Author: Shelton Ma
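1. Overall Architecture
- User submits an export task → the Express API stores the task details in the database and sends a Kafka message
- Kafka Producer sends the task message → the Express API acts as the producer and pushes the task message to Kafka
- Kafka Consumer processes the export task → a separate worker process listens to Kafka, reads the data from the database, and generates a .csv.gz file
- Upload to S3 and verify the file → the worker process uploads the file to S3 and checks that it exists
- Send an email when the task completes → once the task succeeds, a notification email is sent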
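The five steps above imply that each task moves through a small set of states. A minimal sketch of that lifecycle follows; the status names (`pending`, `processing`, `uploaded`, `completed`, `failed`) and the helpers `canTransition` and `advance` are assumptions for illustration, since the post does not define a task schema.

```typescript
// Hypothetical status names; the post does not define a task schema.
type TaskStatus = 'pending' | 'processing' | 'uploaded' | 'completed' | 'failed';

// Allowed transitions, mirroring the steps of the architecture.
const transitions: Record<TaskStatus, TaskStatus[]> = {
  pending: ['processing', 'failed'],   // stored by the API and published to Kafka
  processing: ['uploaded', 'failed'],  // worker is generating the .csv.gz file
  uploaded: ['completed', 'failed'],   // file verified in S3, email not yet sent
  completed: [],                       // notification email sent
  failed: [],
};

function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return transitions[from].includes(to);
}

// Returns the new status, or throws when a step would be skipped.
function advance(current: TaskStatus, next: TaskStatus): TaskStatus {
  if (!canTransition(current, next)) {
    throw new Error(`Invalid transition: ${current} -> ${next}`);
  }
  return next;
}
```

Keeping the transitions in one table makes it easy to see that a task cannot be marked completed before its file has been uploaded.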
2. Task Implementation
1. Express API: submit the export task and push it to Kafka
Install dependencies
npm install express kafkajs uuid csv-writer aws-sdk nodemailer
(zlib is built into Node.js and does not need to be installed.)
Kafka Producer
import { Kafka } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

// Top-level await requires this file to run as an ES module
await producer.connect();

export async function exportData(userEmail: string): Promise<string> {
  const taskId = uuidv4(); // Generate the task ID
  const message = { taskId, userEmail };

  // Send the message to Kafka
  await producer.send({
    topic: 'export_task',
    messages: [{ value: JSON.stringify(message) }],
  });
  return taskId;
}
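Express routes
import express from 'express';
import { exportData } from './kafkaProducer';

const app = express();
app.use(express.json());

app.post('/export', async (req, res) => {
  const email = req.body?.email;
  // Reject requests without an email address before queuing anything
  if (typeof email !== 'string' || email.trim() === '') {
    res.status(400).json({ message: 'email is required' });
    return;
  }
  try {
    const taskId = await exportData(email);
    res.json({ message: 'Task submitted', taskId });
  } catch (error) {
    // Express 4 does not forward rejected promises from async handlers
    console.error('Failed to queue export task:', error);
    res.status(500).json({ message: 'Failed to submit task' });
  }
});

app.listen(3000, () => console.log('Server is running on port 3000'));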
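The architecture says the API stores the task in the database before sending the Kafka message, but the route does not show that step. One way it might look is sketched below, with an in-memory `Map` standing in for the database. The names `TaskRecord`, `tasks`, and `submitExportTask`, and the injected `publish` callback, are all hypothetical; in the real route `publish` would wrap `producer.send()`.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical task record; the post does not show its database schema.
interface TaskRecord {
  taskId: string;
  userEmail: string;
  status: 'pending' | 'failed';
}

// In-memory stand-in for the database table.
const tasks = new Map<string, TaskRecord>();

// `publish` is injected so the sketch runs without a broker.
async function submitExportTask(
  userEmail: string,
  publish: (payload: string) => Promise<void>,
): Promise<TaskRecord> {
  const record: TaskRecord = { taskId: randomUUID(), userEmail, status: 'pending' };
  tasks.set(record.taskId, record); // 1. persist first, so a record exists even if publishing fails

  try {
    await publish(JSON.stringify({ taskId: record.taskId, userEmail })); // 2. then enqueue
  } catch {
    record.status = 'failed'; // the queue never received the task; make that visible
  }
  return record;
}
```

Saving before publishing means a broker outage leaves a visible `failed` record rather than a task ID that exists nowhere.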
2. Kafka Consumer: process export tasks
The consumer process listens to Kafka, reads the data, generates the .csv.gz file, and uploads it to S3.
Kafka Consumer
import { Kafka } from 'kafkajs';
import { exportAndUploadToS3 } from './exportService';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'export_group' });

// Top-level await requires this file to run as an ES module
await consumer.connect();
await consumer.subscribe({ topic: 'export_task', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    // message.value can be null (for example, tombstone records); skip those
    if (!message.value) return;
    const { taskId, userEmail } = JSON.parse(message.value.toString());
    await exportAndUploadToS3(taskId, userEmail);
  },
});
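Because the consumer parses whatever arrives on the topic, a malformed payload would throw inside `eachMessage`. A defensive decoding step might look like the sketch below. `ExportTaskMessage` and `decodeTask` are hypothetical names, and the field list simply mirrors what `exportData` sends.

```typescript
// Shape of the payload produced by exportData().
interface ExportTaskMessage {
  taskId: string;
  userEmail: string;
}

// Returns the parsed task, or null when the payload is empty, not valid JSON,
// or missing a required string field.
function decodeTask(value: Buffer | null): ExportTaskMessage | null {
  if (value === null) return null;
  try {
    const parsed: unknown = JSON.parse(value.toString('utf8'));
    if (typeof parsed !== 'object' || parsed === null) return null;
    const { taskId, userEmail } = parsed as Record<string, unknown>;
    if (typeof taskId !== 'string' || typeof userEmail !== 'string') return null;
    return { taskId, userEmail };
  } catch {
    return null; // JSON.parse failed
  }
}
```

Inside `eachMessage`, the handler could call `decodeTask(message.value)` and log and skip the message when the result is `null`.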
3. CSV generation, compression, and S3 upload
exportService.ts handles the CSV export, gzip compression, and upload to S3.
Export the data and upload it
import fs from 'fs';
import os from 'os';
import path from 'path';
import zlib from 'zlib';
import { pipeline } from 'stream/promises';
import AWS from 'aws-sdk';
import { createObjectCsvWriter } from 'csv-writer';
import nodemailer from 'nodemailer';

const BUCKET = 'your-s3-bucket';

// Configure S3
const s3 = new AWS.S3({
  accessKeyId: process.env.AWS_ACCESS_KEY_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  region: 'us-east-1',
});

// Generate the CSV and compress it
async function generateCsv(taskId: string): Promise<string> {
  // Write to the OS temp directory (__dirname is not defined in ES modules)
  const filePath = path.join(os.tmpdir(), `${taskId}.csv`);
  const zipPath = `${filePath}.gz`;

  const csvWriter = createObjectCsvWriter({
    path: filePath,
    header: [
      { id: 'id', title: 'ID' },
      { id: 'name', title: 'Name' },
    ],
  });

  // Sample rows; replace with the rows read from the database
  const data = [
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' },
  ];
  await csvWriter.writeRecords(data);

  // pipeline() rejects if any of the three streams fails
  await pipeline(fs.createReadStream(filePath), zlib.createGzip(), fs.createWriteStream(zipPath));
  await fs.promises.unlink(filePath); // Delete the original CSV
  return zipPath;
}

// Upload to S3
async function uploadToS3(filePath: string, taskId: string): Promise<string> {
  const s3Key = `exports/${taskId}.csv.gz`;
  await s3
    .upload({
      Bucket: BUCKET,
      Key: s3Key,
      Body: fs.createReadStream(filePath), // Stream the file instead of loading it into memory
      ContentType: 'application/gzip',
    })
    .promise();
  return s3Key;
}

// Send the email
async function sendEmail(userEmail: string, downloadLink: string) {
  const transporter = nodemailer.createTransport({
    service: 'gmail',
    auth: { user: process.env.EMAIL_USER, pass: process.env.EMAIL_PASS },
  });
  await transporter.sendMail({
    from: process.env.EMAIL_USER,
    to: userEmail,
    subject: 'Your Export is Ready',
    text: `Your data export is ready.\nDownload it here: ${downloadLink}`,
  });
}

// Task execution logic
export async function exportAndUploadToS3(taskId: string, userEmail: string) {
  try {
    const zipPath = await generateCsv(taskId);
    const s3Key = await uploadToS3(zipPath, taskId);
    await fs.promises.unlink(zipPath); // Remove the local archive once it is uploaded

    // Check that the file was uploaded successfully
    await s3.headObject({ Bucket: BUCKET, Key: s3Key }).promise();

    // Send the email. This URL only works if the object is publicly readable.
    const downloadLink = `https://${BUCKET}.s3.amazonaws.com/${s3Key}`;
    await sendEmail(userEmail, downloadLink);
    console.log(`Export ${taskId} completed.`);
  } catch (error) {
    // Errors are logged, not rethrown, so the consumer treats the message as handled
    console.error(`Error processing task ${taskId}:`, error);
  }
}
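Since `exportAndUploadToS3` only logs failures, a transient S3 or SMTP error ends the task for good. One possible mitigation is to retry the individual steps with exponential backoff. The helper below is a sketch of that idea, not part of the original code; `withRetry` and its default values are assumptions.

```typescript
// Hypothetical helper: retries an async step with exponential backoff.
async function withRetry<T>(
  step: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await step();
    } catch (error) {
      lastError = error;
      if (attempt < attempts - 1) {
        // Delay doubles each time: baseDelayMs, 2 * baseDelayMs, ...
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  // Every attempt failed; surface the last error to the caller
  throw lastError;
}
```

The upload step could then be wrapped as `await withRetry(() => uploadToS3(zipPath, taskId))`, leaving the surrounding try/catch to handle the case where every attempt fails.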