Using Kafka to Process Data Export Tasks in an Express Project

Authors
  • Shelton Ma

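1. Overall Architecture

  1. The user submits an export task → the Express API stores the task information in the database and sends a Kafka message
  2. The Kafka Producer sends the task message → the Express API acts as the Producer and pushes the task message to Kafka
  3. The Kafka Consumer processes the export task → a separate Worker process listens to Kafka, reads the data from the database, and generates a .csv.gz file
  4. Upload to S3 and verify the file → the Worker process uploads the file to S3 and checks that the object exists
  5. Send an email when the task completes → after the task succeeds, a notification email is sent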
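
The five steps above imply that each task moves through a small set of states. The sketch below is one possible way to model that lifecycle. The status names, the `TaskStore` class, and the in-memory object standing in for the database are all assumptions made for illustration; the article does not specify a schema.

```typescript
// Hypothetical status values; the article does not name them.
type TaskStatus = "pending" | "processing" | "uploaded" | "completed" | "failed";

interface ExportTask {
  taskId: string;
  userEmail: string;
  status: TaskStatus;
}

// Allowed transitions, loosely mirroring steps 1 to 5 above.
const transitions: Record<TaskStatus, TaskStatus[]> = {
  pending: ["processing", "failed"],
  processing: ["uploaded", "failed"],
  uploaded: ["completed", "failed"],
  completed: [],
  failed: [],
};

// In-memory stand-in for the database table that stores task information.
class TaskStore {
  private tasks: Record<string, ExportTask> = {};

  // Step 1: the API records the task before publishing the Kafka message.
  create(taskId: string, userEmail: string): ExportTask {
    const task: ExportTask = { taskId, userEmail, status: "pending" };
    this.tasks[taskId] = task;
    return task;
  }

  // Steps 3 to 5: the worker advances the status as it makes progress.
  advance(taskId: string, next: TaskStatus): ExportTask {
    const task = this.tasks[taskId];
    if (!task) throw new Error(`Unknown task ${taskId}`);
    if (transitions[task.status].indexOf(next) === -1) {
      throw new Error(`Illegal transition ${task.status} -> ${next}`);
    }
    task.status = next;
    return task;
  }

  get(taskId: string): ExportTask | undefined {
    return this.tasks[taskId];
  }
}
```

Rejecting illegal transitions makes it harder for a redelivered Kafka message to move a finished task back to an earlier state.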

2. Task Implementation

1. Express API: submit the export task and push it to Kafka

  1. Install dependencies

    npm install express kafkajs uuid csv-writer aws-sdk nodemailer
    
  2. Kafka Producer

    import { Kafka } from 'kafkajs';
    import { v4 as uuidv4 } from 'uuid';
    
    const kafka = new Kafka({ brokers: ['localhost:9092'] });
    const producer = kafka.producer();
    
    // Connect once at startup (top-level await requires an ES module)
    await producer.connect();
    
    export async function exportData(userEmail: string): Promise<string> {
        const taskId = uuidv4();  // Generate the task ID
        const message = { taskId, userEmail };
    
        // Send the message to Kafka
        await producer.send({
            topic: 'export_task',
            messages: [{ value: JSON.stringify(message) }],
        });
    
        return taskId;
    }
    
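  3. Express route

    import express from 'express';
    import { exportData } from './kafkaProducer';
    
    const app = express();
    app.use(express.json());
    
    app.post('/export', async (req, res) => {
        const { email } = req.body;
        // Reject requests that do not carry a recipient address
        if (typeof email !== 'string' || email.length === 0) {
            res.status(400).json({ message: 'email is required' });
            return;
        }
        try {
            const taskId = await exportData(email);
            res.json({ message: 'Task submitted', taskId });
        } catch (error) {
            // Express 4 does not catch rejected promises from async handlers
            console.error('Failed to submit export task:', error);
            res.status(500).json({ message: 'Failed to submit task' });
        }
    });
    
    app.listen(3000, () => console.log('Server is running on port 3000'));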
    

2. Kafka Consumer: process the export task

The consumer process listens to Kafka, reads the data, generates a .csv.gz file, and uploads it to S3.

  1. Kafka Consumer

    import { Kafka } from 'kafkajs';
    import { exportAndUploadToS3 } from './exportService';
    
    const kafka = new Kafka({ brokers: ['localhost:9092'] });
    const consumer = kafka.consumer({ groupId: 'export_group' });
    
    await consumer.connect();
    await consumer.subscribe({ topic: 'export_task', fromBeginning: false });
    
    await consumer.run({
        eachMessage: async ({ message }) => {
            // message.value can be null (for example a tombstone record); skip it
            if (!message.value) return;
            const { taskId, userEmail } = JSON.parse(message.value.toString());
            await exportAndUploadToS3(taskId, userEmail);
        },
    });
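
The consumer above trusts that every message is well-formed JSON with the expected fields. A minimal sketch of a stricter parser follows, assuming the payload shape `{ taskId, userEmail }` produced by `exportData`. The names `ExportTaskMessage` and `parseExportTask` are hypothetical and not part of kafkajs.

```typescript
// Shape of the payload written by the producer.
interface ExportTaskMessage {
  taskId: string;
  userEmail: string;
}

// Returns the parsed task, or null when the payload is missing or malformed.
function parseExportTask(raw: string | null): ExportTaskMessage | null {
  if (raw === null) return null; // no value on the record

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch (e) {
    return null; // not valid JSON
  }

  if (typeof data !== "object" || data === null) return null;
  const { taskId, userEmail } = data as Record<string, unknown>;
  if (typeof taskId !== "string" || typeof userEmail !== "string") return null;

  return { taskId, userEmail };
}
```

Inside `eachMessage`, the handler could pass `message.value ? message.value.toString() : null` to this function and skip the message (or route it elsewhere) when the result is `null`, so that one bad payload does not crash the worker.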
    

3. CSV generation, compression, and upload to S3

exportService.ts handles the CSV export, gzip compression, and upload to S3.

  1. Export the data and upload it

    import fs from 'fs';
    import os from 'os';
    import path from 'path';
    import zlib from 'zlib';
    import { pipeline } from 'stream/promises';
    import AWS from 'aws-sdk';
    import { createObjectCsvWriter } from 'csv-writer';
    import nodemailer from 'nodemailer';
    
    // Configure S3
    const s3 = new AWS.S3({
        accessKeyId: process.env.AWS_ACCESS_KEY_ID,
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        region: 'us-east-1',
    });
    
    // Generate the CSV and compress it
    async function generateCsv(taskId: string): Promise<string> {
        // Use the OS temp directory (__dirname is not defined in ES modules)
        const filePath = path.join(os.tmpdir(), `${taskId}.csv`);
        const zipPath = `${filePath}.gz`;
    
        const csvWriter = createObjectCsvWriter({
            path: filePath,
            header: [{ id: 'id', title: 'ID' }, { id: 'name', title: 'Name' }]
        });
    
        // Sample rows; a real worker would read these from the database
        const data = [
            { id: 1, name: 'Alice' },
            { id: 2, name: 'Bob' }
        ];
    
        await csvWriter.writeRecords(data);
    
        // pipeline() rejects if any of the three streams fails
        await pipeline(
            fs.createReadStream(filePath),
            zlib.createGzip(),
            fs.createWriteStream(zipPath)
        );
    
        await fs.promises.unlink(filePath);  // Delete the original CSV
        return zipPath;
    }
    
    // Upload to S3
    async function uploadToS3(filePath: string, taskId: string): Promise<string> {
        const s3Key = `exports/${taskId}.csv.gz`;
    
        // Stream the file so that large exports are not loaded into memory
        await s3.upload({
            Bucket: 'your-s3-bucket',
            Key: s3Key,
            Body: fs.createReadStream(filePath),
            ContentType: 'application/gzip'
        }).promise();
    
        return s3Key;
    }
    
    // Send the email
    async function sendEmail(userEmail: string, downloadLink: string) {
        const transporter = nodemailer.createTransport({
            service: 'gmail',
            auth: { user: process.env.EMAIL_USER, pass: process.env.EMAIL_PASS }
        });
    
        await transporter.sendMail({
            from: process.env.EMAIL_USER,
            to: userEmail,
            subject: 'Your Export is Ready',
            text: `Your data export is ready. Download it here: ${downloadLink}`
        });
    }
    
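    // Task execution logic
    export async function exportAndUploadToS3(taskId: string, userEmail: string) {
        try {
            const zipPath = await generateCsv(taskId);
            const s3Key = await uploadToS3(zipPath, taskId);
    
            // Check that the upload succeeded (headObject rejects if the key is missing)
            const headParams = { Bucket: 'your-s3-bucket', Key: s3Key };
            await s3.headObject(headParams).promise();
    
            // Remove the local archive once S3 has it
            await fs.promises.unlink(zipPath);
    
            // Send the email; this plain URL only works if the object is publicly readable
            const downloadLink = `https://your-s3-bucket.s3.amazonaws.com/${s3Key}`;
            await sendEmail(userEmail, downloadLink);
    
            console.log(`Export ${taskId} completed.`);
        } catch (error) {
            // The error is only logged, so the Kafka message counts as processed and is not retried
            console.error(`Error processing task ${taskId}:`, error);
        }
    }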
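
For small exports, the CSV and gzip steps can also be done entirely in memory, which avoids temporary files. The sketch below illustrates that idea using only Node's built-in `zlib`. The helpers `escapeCsvField`, `toCsv`, and `toCsvGz` are hypothetical names, and the quoting rules follow the common RFC 4180 convention rather than any specific behavior of csv-writer.

```typescript
import { gzipSync, gunzipSync } from "node:zlib";

type Cell = string | number;

// Quote a field when it contains a comma, a double quote, or a line break.
function escapeCsvField(value: Cell): string {
  const text = String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Build the CSV text for the given header and rows.
function toCsv(header: string[], rows: Cell[][]): string {
  const lines: Cell[][] = [header, ...rows];
  return lines.map((row) => row.map((cell) => escapeCsvField(cell)).join(",")).join("\n") + "\n";
}

// Compress the CSV text into the bytes of a .csv.gz file.
function toCsvGz(header: string[], rows: Cell[][]): Buffer {
  return gzipSync(Buffer.from(toCsv(header, rows), "utf8"));
}

// Usage: compress two rows, then decompress to confirm the content survives.
const gz = toCsvGz(["ID", "Name"], [[1, "Alice"], [2, 'Bob "Jr", II']]);
const roundTrip = gunzipSync(gz).toString("utf8");
console.log(roundTrip);
```

The resulting `Buffer` could be passed directly as the `Body` of the S3 upload. For large exports the streaming approach in `generateCsv` remains the better fit, since it keeps memory usage flat.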