Published on

服务解藕与性能优化

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

1. 何时选择 HTTP,何时选择函数调用?

  1. 使用 HTTP:
    • 服务解耦(不同服务可以独立部署、扩展、重启)
    • 需要跨服务器或容器进行通信
    • 微服务架构需要独立伸缩
  2. 使用函数调用:
    • 同一进程内部调用
    • 高性能场景,减少网络开销(如在同一后端服务的不同模块之间调用)

对于一个支持多模态的AI应用, 涉及ASR/LLM Reasoning/TTS, 各服务之间如果解藕, 可以灵活选择多种服务, 实现分流/降级等功能

2. 优化建议

  1. 若必须使用 HTTP,可考虑:
    • gRPC 代替 REST,减少序列化开销,提高吞吐量
    • HTTP/2 连接复用,减少 TCP 连接开销
    • 缓存 以减少不必要的请求
  2. 若服务间通信频繁,可以考虑:
    • 消息队列(Kafka, RabbitMQ) 处理异步任务
    • 共享数据库或缓存(Redis) 进行数据共享,减少不必要的请求

3. 事件驱动架构实现

将 TTS(Text-to-Speech)、ASR(Automatic Speech Recognition) 和 LLM(Large Language Model Reasoning) 服务进行分离,并且多实例运行

1. 解耦方式

  1. 用户请求 发送到 API 网关.
  2. API 网关 将请求 发送到消息队列(MQ)(如 Kafka).
  3. ASR、TTS、LLM 分别监听消息队列,异步处理任务.
  4. 任务完成后,将结果存入 数据库 / S3,并通过 WebSocket / Webhook 通知客户端.

2. 数据流

User Request → API Gateway → MQ (Kafka) → ASR Service → MQ → LLM Service → MQ → TTS Service → Return Response

3. 服务实现

  1. ASR 服务(消费 MQ 并返回文本)

    // asr/index.ts
    import { Kafka } from "kafkajs";
    
    // 连接 Kafka
    const kafka = new Kafka({ brokers: ["kafka-broker:9092"] });
    const consumer = kafka.consumer({ groupId: "asr-group" });
    
    async function run() {
      await consumer.connect();
      await consumer.subscribe({ topic: "audio-to-text", fromBeginning: false });
    
      await consumer.run({
        eachMessage: async ({ message }) => {
          const audioData = message.value.toString();
          console.log("Processing ASR for:", audioData);
    
          // 处理音频识别
          const text = await processAudioToText(audioData);
    
          // 推送到下一个队列
          const producer = kafka.producer();
          await producer.connect();
          await producer.send({
            topic: "text-to-llm",
            messages: [{ value: JSON.stringify({ text }) }],
          });
          await producer.disconnect();
        },
      });
    }
    run();
    
  2. LLM 服务(消费文本并返回 AI 响应)

    const consumer = kafka.consumer({ groupId: "llm-group" });
    
    async function run() {
      await consumer.connect();
      await consumer.subscribe({ topic: "text-to-llm", fromBeginning: false });
    
      await consumer.run({
        eachMessage: async ({ message }) => {
          const { text } = JSON.parse(message.value.toString());
          console.log("Processing LLM for:", text);
    
          // AI 生成回复
          const response = await generateAIResponse(text);
    
          // 推送到下一个队列
          const producer = kafka.producer();
          await producer.connect();
          await producer.send({
            topic: "llm-to-tts",
            messages: [{ value: JSON.stringify({ response }) }],
          });
          await producer.disconnect();
        },
      });
    }
    run();
    
  3. TTS 服务(消费 AI 生成的文本并返回音频)

    const consumer = kafka.consumer({ groupId: "tts-group" });
    
    async function run() {
      await consumer.connect();
      await consumer.subscribe({ topic: "llm-to-tts", fromBeginning: false });
    
      await consumer.run({
        eachMessage: async ({ message }) => {
          const { response } = JSON.parse(message.value.toString());
          console.log("Processing TTS for:", response);
    
          // 生成语音
          const audio = await generateSpeech(response);
    
          // 存储到 S3,并返回 URL
          const audioUrl = await uploadToS3(audio);
    
          console.log("TTS Completed:", audioUrl);
        },
      });
    }
    run();
    

4. 汇总

  1. TTS / ASR / LLM 服务分离,解耦架构
  2. 使用消息队列(Kafka / RabbitMQ) 进行事件驱动
  3. API Gateway 作为入口
  4. 容器化部署(Docker + Kubernetes / AWS ECS)
  5. 自动扩容(K8s HPA / AWS Auto Scaling)