Choosing Between WebSocket and HTTP + Streaming When Building an AI Conversation API

Author: Shelton Ma

1. WebSocket vs. HTTP + Streaming

1. HTTP + Streaming

Pros:

  • Simple to implement: a standard HTTP request is all you need; there is no long-lived connection to maintain.
  • Broad compatibility: supported natively by browsers and by most front-end frameworks (e.g. Next.js, React).
  • Easy to debug and monitor: HTTP request logs are straightforward to trace.
  • Well suited to non-continuous conversations: single questions and one-shot result fetches.

Cons:

  • Poor fit for high-frequency interaction: every request re-establishes a connection, adding overhead.
  • Weaker real-time behavior: for fast bidirectional communication, HTTP streaming falls short.
  • No built-in bidirectional channel: pause/resume and server push require extra design work.

Best for:

  • One-shot AI tasks such as text generation or content summarization.
  • Short exchanges where the user asks a question and receives a complete answer.
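On the client, consuming an HTTP streamed response comes down to reading the response body with a `ReadableStream` reader. A minimal sketch (the endpoint URL in the usage comment is a placeholder, and `onChunk` is whatever your UI update function is):

```typescript
// Read a streamed HTTP response chunk by chunk, invoking a callback per chunk.
// Works with any fetch Response whose body is a ReadableStream (browsers, Node 18+).
async function readStreamedText(
  stream: ReadableStream<Uint8Array>,
  onChunk: (text: string) => void
): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let full = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value, { stream: true });
    full += text;
    onChunk(text); // e.g. append to the UI as the reply arrives
  }
  return full;
}

// Usage against a hypothetical streaming endpoint:
// const res = await fetch('/api/chat', { method: 'POST', body: JSON.stringify({ prompt }) });
// const answer = await readStreamedText(res.body!, (t) => render(t));
```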

2. WebSocket

Pros:

  • Low latency, bidirectional: once connected, both client and server can push data at any time.
  • Well suited to continuous conversations: real-time chat, fluid question-and-answer exchanges.
  • Stateful: no need to re-authenticate on every message, which reduces latency.
  • Handles high concurrency well: one WebSocket server can hold many concurrent connections.

Cons:

  • Higher implementation complexity: heartbeats, reconnection, and timeout handling are all on you.
  • Some HTTP proxies do not support WebSocket without extra configuration.
  • Harder state management: multi-user sessions must be tracked and cleaned up correctly.

Best for:

  • Real-time conversation (AI chat, voice assistants).
  • Streaming output that the user can pause and resume at any time.
  • Multi-user or chat-room style applications.
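The heartbeat and timeout handling listed under the cons above reduce to simple bookkeeping. Here is a sketch of a hypothetical `HeartbeatMonitor` (library-agnostic; the actual ping/pong frames come from whatever WebSocket library you use):

```typescript
// Track per-connection liveness: record each pong, and flag connections whose
// last pong is older than the allowed timeout. (Hypothetical helper class.)
class HeartbeatMonitor {
  private lastSeen = new Map<string, number>();

  constructor(private timeoutMs: number) {}

  // Call this from the socket's 'pong' event handler
  recordPong(connId: string, now: number = Date.now()): void {
    this.lastSeen.set(connId, now);
  }

  // Connections that have not answered a ping within timeoutMs
  staleConnections(now: number = Date.now()): string[] {
    return [...this.lastSeen.entries()]
      .filter(([, t]) => now - t > this.timeoutMs)
      .map(([id]) => id);
  }

  // Call when a connection closes so it is no longer tracked
  forget(connId: string): void {
    this.lastSeen.delete(connId);
  }
}
```

In a `ws`-based server you would ping all clients on an interval, call `recordPong` from each socket's `'pong'` event, and terminate whatever `staleConnections` returns.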

3. Hybrid Approach

You can combine the strengths of both:

  • Use HTTP streaming for the initial response, so the first part of the reply arrives quickly.
  • Use WebSocket for the rest of the conversation, for a smoother interactive experience.

2. Implementation

Architecture: Client (Next.js) <------> WebSocket Server (Node.js + Hono) <------> OpenAI API (or another model)

1. Server Implementation (Node.js + Hono)

  1. WebSocket service
// /server/index.ts
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { WebSocketServer, WebSocket } from 'ws';

const app = new Hono();
const wss = new WebSocketServer({ port: 3001 });

const sessions = new Map<string, { isPaused: boolean; isStopped?: boolean }>();

wss.on('connection', (ws) => {
  const sessionId = crypto.randomUUID();
  sessions.set(sessionId, { isPaused: false });

  ws.on('message', async (message) => {
    const { action, data } = JSON.parse(message.toString());

    if (action === 'start') {
      const { prompt } = data;
      const stream = await fetchOpenAIStream(prompt, sessionId);
      ws.send(JSON.stringify({ action: 'stream', data: stream }));
    }

    if (action === 'pause') {
      sessions.get(sessionId)!.isPaused = true;
    }

    if (action === 'resume') {
      sessions.get(sessionId)!.isPaused = false;
    }

    if (action === 'stop') {
      // Lets the client abort generation entirely (used by the voice UI below)
      sessions.get(sessionId)!.isStopped = true;
    }
  });

  ws.on('close', () => {
    sessions.delete(sessionId);
  });
});

async function fetchOpenAIStream(prompt: string, sessionId: string) {
  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    })
  });

  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  let result = '';

  while (true) {
    const { done, value } = await reader!.read();
    if (done) break;

    // Wait while paused instead of silently discarding the paused chunks
    while (sessions.get(sessionId)?.isPaused) {
      await new Promise((resolve) => setTimeout(resolve, 200));
    }

    // Note: OpenAI streams SSE "data: {...}" lines rather than plain text;
    // parse out the delta content before showing it to users.
    result += decoder.decode(value);

    // NB: this broadcasts to every connected client; in production, keep a
    // reference to this session's socket and send only to it.
    wss.clients.forEach((client) => {
      if (client.readyState === 1) {
        client.send(JSON.stringify({ action: 'update', data: result }));
      }
    });
  }

  return result;
}

// Handle the LLM text stream plus TTS speech synthesis
async function processLLMWithTTS(prompt: string, sessionId: string, ws: WebSocket) {
  const llmResponse = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    })
  });

  const reader = llmResponse.body?.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader!.read();
    if (done) break;

    const session = sessions.get(sessionId);
    if (session?.isStopped) break; // stop: terminate generation immediately

    // Wait while paused instead of discarding the paused chunks
    while (sessions.get(sessionId)?.isPaused && !sessions.get(sessionId)?.isStopped) {
      await new Promise((resolve) => setTimeout(resolve, 200));
    }
    if (sessions.get(sessionId)?.isStopped) break; // a stop issued while paused also ends the stream

    const textChunk = decoder.decode(value);
    const audioChunk = await synthesizeTTS(textChunk);

    ws.send(JSON.stringify({ action: 'audio', data: audioChunk }));
  }
}

// Simulated TTS synthesis (swap in a real TTS provider; the URL below is a placeholder)
async function synthesizeTTS(text: string): Promise<string> {
  const response = await fetch('https://api.tts-provider.com/v1/stream', {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${process.env.TTS_API_KEY}` },
    body: JSON.stringify({ text })
  });

  const audioData = await response.arrayBuffer();
  return Buffer.from(audioData).toString('base64'); // return the audio as Base64
}

serve({ fetch: app.fetch, port: 3000 });
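One detail the server sketch above glosses over: the chunks the OpenAI streaming API returns are Server-Sent Events, each a `data: {...}` line with a JSON payload (ending with a literal `data: [DONE]`), not plain text. A sketch of extracting the text deltas from one decoded chunk, following the Chat Completions stream format:

```typescript
// Extract the assistant text deltas from one decoded SSE chunk.
// A chunk may contain several "data: {...}" lines plus a final "data: [DONE]".
function extractDeltas(chunk: string): string[] {
  const deltas: string[] = [];
  for (const line of chunk.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed.startsWith('data: ')) continue;
    const payload = trimmed.slice('data: '.length);
    if (payload === '[DONE]') break;
    try {
      const parsed = JSON.parse(payload);
      const text = parsed.choices?.[0]?.delta?.content;
      if (typeof text === 'string') deltas.push(text);
    } catch {
      // Incomplete JSON: a network chunk boundary split a line.
      // Real code should buffer the remainder and retry on the next chunk.
    }
  }
  return deltas;
}
```

In `fetchOpenAIStream` above, the output of `decoder.decode(value)` would be passed through `extractDeltas` before being appended to `result`.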

2. WebSocket Front-End Logic

// /client/hooks/useWebSocket.ts

import { useEffect, useRef, useState } from 'react';

export const useWebSocket = (url: string) => {
  const ws = useRef<WebSocket | null>(null);
  const [messages, setMessages] = useState<string[]>([]);
  const [isPaused, setIsPaused] = useState(false);

  useEffect(() => {
    ws.current = new WebSocket(url);

    ws.current.onmessage = (event) => {
      const { action, data } = JSON.parse(event.data);
      if (action === 'update') {
        setMessages((prev) => [...prev, data]);
      }
    };

    ws.current.onclose = () => console.warn('WebSocket closed');
    ws.current.onerror = (err) => console.error('WebSocket error', err);

    return () => ws.current?.close();
  }, [url]);

  const sendMessage = (action: string, data: any) => {
    ws.current?.send(JSON.stringify({ action, data }));
  };

  const start = (prompt: string) => sendMessage('start', { prompt });
  const pause = () => {
    sendMessage('pause', {});
    setIsPaused(true);
  };
  const resume = () => {
    sendMessage('resume', {});
    setIsPaused(false);
  };

  return { messages, start, pause, resume, isPaused };
};

3. Chat UI

  1. Text version

    // /client/app/page.tsx
    "use client";
    import { useState } from 'react';
    import { useWebSocket } from '@/hooks/useWebSocket';
    
    export default function ChatPage() {
      const { messages, start, pause, resume, isPaused } = useWebSocket('ws://localhost:3001');
      const [prompt, setPrompt] = useState('');
    
      return (
        <div className="p-6 max-w-2xl mx-auto">
          <h1 className="text-2xl font-bold mb-4">AI Chat App</h1>
          <textarea
            value={prompt}
            onChange={(e) => setPrompt(e.target.value)}
            className="w-full p-2 border rounded mb-4"
            rows={4}
          />
          <div className="flex gap-2 mb-4">
            <button onClick={() => start(prompt)} className="px-4 py-2 bg-blue-500 text-white rounded">
              Start
            </button>
            <button
              onClick={isPaused ? resume : pause}
              className={`px-4 py-2 rounded ${isPaused ? 'bg-green-500' : 'bg-yellow-500'} text-white`}
            >
              {isPaused ? 'Resume' : 'Pause'}
            </button>
          </div>
          <div className="border p-4 rounded bg-gray-50 h-80 overflow-y-auto">
            {messages.map((msg, idx) => (
              <p key={idx} className="mb-2">{msg}</p>
            ))}
          </div>
        </div>
      );
    }
    
  2. Voice version (note: this assumes useWebSocket is extended to expose audioChunks and stop)

    "use client";
    import { useState } from 'react';
    import { useWebSocket } from '@/hooks/useWebSocket';
    
    export default function ChatPage() {
      const { audioChunks, start, pause, resume, stop, isPaused } = useWebSocket('ws://localhost:3001');
      const [prompt, setPrompt] = useState('');
    
      return (
        <div className="p-6 max-w-2xl mx-auto">
          <h1 className="text-2xl font-bold mb-4">AI Voice Assistant</h1>
          <textarea
            value={prompt}
            onChange={(e) => setPrompt(e.target.value)}
            className="w-full p-2 border rounded mb-4"
            rows={4}
          />
          <div className="flex gap-2 mb-4">
            <button onClick={() => start(prompt)} className="px-4 py-2 bg-blue-500 text-white rounded">
              Start
            </button>
            <button
              onClick={isPaused ? resume : pause}
              className={`px-4 py-2 rounded ${isPaused ? 'bg-green-500' : 'bg-yellow-500'} text-white`}
            >
              {isPaused ? 'Resume' : 'Pause'}
            </button>
            <button onClick={stop} className="px-4 py-2 bg-red-500 text-white rounded">
              Stop
            </button>
          </div>
          <div className="border p-4 rounded bg-gray-50 h-80 overflow-y-auto">
            {audioChunks.map((chunk, idx) => (
              <audio key={idx} controls src={`data:audio/mp3;base64,${chunk}`} />
            ))}
          </div>
        </div>
      );
    }
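The voice page destructures `audioChunks` and `stop`, which the `useWebSocket` hook shown earlier does not return, so the hook would need extending. The extra message handling can be sketched as a pure function that an extended hook could call from `onmessage` (names here are illustrative; the `audio` action matches what `processLLMWithTTS` sends):

```typescript
// Dispatch one parsed server message into the hook's state.
// Returns the updated state; a React hook would wrap this in setState calls.
interface ChatState {
  messages: string[];     // streamed text from the 'update' action
  audioChunks: string[];  // Base64 audio from the 'audio' action
}

function applyServerMessage(state: ChatState, raw: string): ChatState {
  const { action, data } = JSON.parse(raw);
  switch (action) {
    case 'update':
      return { ...state, messages: [...state.messages, data] };
    case 'audio':
      return { ...state, audioChunks: [...state.audioChunks, data] };
    default:
      return state; // ignore unknown actions
  }
}

// A stop() helper in the hook would simply send { action: 'stop', data: {} },
// mirroring the existing pause/resume helpers.
```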
    

4. Reconnection Logic

useEffect(() => {
  let shouldReconnect = true; // prevents a reconnect loop after unmount

  const connect = () => {
    ws.current = new WebSocket(url);

    ws.current.onopen = () => console.log('WebSocket connected');
    ws.current.onclose = () => {
      if (!shouldReconnect) return; // closed intentionally during cleanup
      console.warn('WebSocket closed, reconnecting in 3s...');
      setTimeout(connect, 3000); // retry after 3s
    };
    ws.current.onerror = (err) => console.error('WebSocket error', err);
  };

  connect();

  return () => {
    shouldReconnect = false;
    ws.current?.close();
  };
}, [url]);
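A fixed 3-second retry means that after a server blip, every client reconnects at the same instant. Exponential backoff with jitter spreads those reconnects out; a sketch (the base, cap, and jitter strategy are arbitrary choices):

```typescript
// Delay before reconnect attempt n (0-based): base * 2^n, capped at maxMs,
// with random jitter so simultaneous clients do not reconnect in lockstep.
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  jitter: () => number = Math.random
): number {
  const exp = Math.min(maxMs, baseMs * 2 ** attempt);
  return exp / 2 + jitter() * (exp / 2); // between 50% and 100% of exp
}

// In the onclose handler, replace the fixed 3000 with:
// setTimeout(connect, backoffDelay(attempt++));
```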