Published on

使用 WebSocket 实现流式对话

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

WebSocket 保持持久连接,确保更低延迟的暂停/继续控制.

1. WebSocket 实现流式对话的暂停/继续控制

1. 创建 WebSocket 服务 (Hono API)

import { Hono } from 'hono';
import { streamText } from 'hono/streaming';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import axios from 'axios';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
const app = new Hono();

// WebSocket 服务
const server = createServer();
const wss = new WebSocketServer({ server });

interface ControlState {
  isPaused: boolean;
  buffer: string[];
}

// WebSocket 连接处理
wss.on('connection', (ws, req) => {
  const sessionId = new URL(req.url || '', 'http://localhost').searchParams.get('sessionId');

  if (!sessionId) {
    ws.close(4000, 'Missing sessionId');
    return;
  }

  ws.on('message', async (message) => {
    const { action } = JSON.parse(message.toString());

    if (action === 'pause') {
      await redis.set(`pause:${sessionId}`, 'true', 'EX', 3600);
    }

    if (action === 'resume') {
      await redis.del(`pause:${sessionId}`);
      const bufferedContent = await redis.lrange(`buffer:${sessionId}`, 0, -1);
      bufferedContent.forEach((chunk) => ws.send(chunk));
      await redis.del(`buffer:${sessionId}`);
    }
  });
});

// Chat API (流式)
app.post('/api/chat', async (c) => {
  const { prompt, sessionId } = await c.req.json();

  const response = await axios.post(
    'https://api.openai.com/v1/chat/completions',
    {
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
      stream: true,
    },
    {
      headers: {
        Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
        'Content-Type': 'application/json',
      },
      responseType: 'stream',
    }
  );

  return streamText(c, async (stream) => {
    const decoder = new TextDecoder('utf-8');

    for await (const chunk of response.data) {
      const text = decoder.decode(chunk, { stream: true });
      const parsedLines = text
        .trim()
        .split('\n')
        .filter((line) => line.startsWith('data: '));

      for (const line of parsedLines) {
        const json = JSON.parse(line.replace('data: ', '').trim());
        const content = json.choices[0]?.delta?.content;

        if (content) {
          const isPaused = await redis.get(`pause:${sessionId}`);
          if (isPaused) {
            await redis.rpush(`buffer:${sessionId}`, content);
          } else {
            const bufferedContent = await redis.lrange(`buffer:${sessionId}`, 0, -1);
            for (const bufferedChunk of bufferedContent) {
              stream.write(bufferedChunk);
            }
            await redis.del(`buffer:${sessionId}`);
            stream.write(content);
          }
        }
      }
    }

    stream.close();
  });
});

// 启动 HTTP 服务
server.on('request', app.fetch);
server.listen(3000, () => console.log('Server running on http://localhost:3000'));

2. 前端 WebSocket 客户端实现

import { useState, useEffect, useRef } from 'react';

export default function Chat() {
  const [responseText, setResponseText] = useState('');
  const [isPaused, setIsPaused] = useState(false);
  const wsRef = useRef<WebSocket | null>(null);

  const sessionId = 'user-123'; // 示例用户 ID

  // WebSocket 初始化
  useEffect(() => {
    wsRef.current = new WebSocket(`ws://localhost:3000?sessionId=${sessionId}`);

    wsRef.current.onmessage = (event) => {
      setResponseText((prev) => prev + event.data);
    };

    return () => wsRef.current?.close();
  }, []);

  // 开始聊天 (流式请求)
  const handleChat = async () => {
    await fetch('/api/chat', {
      method: 'POST',
      body: JSON.stringify({ prompt: '请讲一个关于宇宙的故事', sessionId }),
    });
  };

  const handlePause = () => {
    setIsPaused(true);
    wsRef.current?.send(JSON.stringify({ action: 'pause' }));
  };

  const handleResume = () => {
    setIsPaused(false);
    wsRef.current?.send(JSON.stringify({ action: 'resume' }));
  };

  return (
    <div>
      <button onClick={handleChat}>开始聊天</button>
      <button onClick={handlePause}>暂停</button>
      <button onClick={handleResume}>继续</button>
      <div>
        <pre>{responseText}</pre>
      </div>
    </div>
  );
}

3. 总结

WebSocket 保持持久连接,确保更低延迟的暂停/继续控制.

2. WebSocket 的断线重连如何实现?

1. 断线重连核心思路

  1. 捕获 onclose 和 onerror 事件,检测断线
  2. 实现自动重连机制
    • 使用 指数退避算法 (Exponential Backoff) 降低频繁重连的风险
    • 限制最大重试次数,避免陷入无限循环
  3. 心跳检测 (Heartbeat) 机制,主动发现断连并触发重连
  4. 状态检查,避免重复创建连接

2. 客户端 WebSocket 断线重连实现

以下实现考虑了:

  • 自动重连
  • 指数退避
  • 心跳检测
  • 断线时恢复未发送的消息
  1. WebSocket 客户端 ReconnectingWebSocket

    class ReconnectingWebSocket {
      constructor(url, options = {}) {
        this.url = url;
        this.reconnectInterval = options.reconnectInterval || 1000;  // 初始重连间隔,单位 ms
        this.maxReconnectInterval = options.maxReconnectInterval || 30000;  // 最大重连间隔,单位 ms
        this.reconnectAttempts = 0;  // 重连次数
        this.maxReconnectAttempts = options.maxReconnectAttempts || 5;  // 最大重连次数
        this.heartbeatInterval = options.heartbeatInterval || 5000;  // 心跳包发送间隔,单位 ms
        this.heartbeatTimer = null;  // 用于心跳包的定时器
        this.messageQueue = [];  // 用于缓存未发送的消息
        this.socket = null;
    
        this.connect();
      }
    
      // 连接 WebSocket
      connect() {
        this.socket = new WebSocket(this.url);
    
        this.socket.onopen = () => {
          console.log('[WebSocket] 连接成功');
          this.reconnectAttempts = 0;
          this.startHeartbeat();
          // 重连后发送缓存的消息
          this.messageQueue.forEach((msg) => this.socket.send(msg));
          this.messageQueue = [];  // 清空缓存消息
        };
    
        this.socket.onmessage = (event) => {
          console.log('[WebSocket] 收到消息:', event.data);
          if (this.onMessage) {
            this.onMessage(event.data);
          }
        };
    
        this.socket.onclose = () => {
          console.warn('[WebSocket] 连接关闭');
          this.reconnect();
        };
    
        this.socket.onerror = (error) => {
          console.error('[WebSocket] 连接错误:', error);
          this.socket.close();
        };
      }
    
      // 断线后重连
      reconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
          console.error('[WebSocket] 达到最大重连次数,停止重连');
          return;
        }
    
        const delay = Math.min(
          this.reconnectInterval * (2 ** this.reconnectAttempts),  // 指数退避算法
          this.maxReconnectInterval
        );
    
        setTimeout(() => {
          console.log(`[WebSocket] 正在尝试第 ${this.reconnectAttempts + 1} 次重连...`);
          this.reconnectAttempts++;
          this.connect();  // 尝试重新连接
        }, delay);
      }
    
      // 启动心跳包,定时发送 ping 消息
      startHeartbeat() {
        if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = setInterval(() => {
          if (this.socket.readyState === WebSocket.OPEN) {
            console.log('[WebSocket] 发送心跳包');
            this.socket.send(JSON.stringify({ type: 'ping' }));
          }
        }, this.heartbeatInterval);
      }
    
      // 发送消息
      send(data) {
        if (this.socket.readyState === WebSocket.OPEN) {
          this.socket.send(data);
        } else {
          console.warn('[WebSocket] 连接已断开,消息已缓存');
          this.messageQueue.push(data);  // 连接未建立时,缓存消息
        }
      }
    
      // 关闭连接
      close() {
        if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
        this.socket.close();
      }
    }
    
  2. 使用

    const ws = new ReconnectingWebSocket('ws://localhost:3000', {
        reconnectInterval: 1000, // 初始重连间隔
        maxReconnectInterval: 30000, // 最大重连间隔
        maxReconnectAttempts: 10, // 最大重连次数
        heartbeatInterval: 5000 // 心跳包间隔
      });
    
      ws.onMessage = (msg) => {
        console.log(`从服务器收到消息: ${msg}`);
      };
    
      ws.send('Hello, Server!');
    }
    

3. 服务端断线需要使用 Redis 持久化缓存

  1. 服务端

    const WebSocket = require('ws');
    const redis = require('redis');
    const wss = new WebSocket.Server({ port: 3000 });
    
    const redisClient = redis.createClient();
    
    // 连接 Redis
    redisClient.on('connect', () => {
      console.log('连接到 Redis 成功');
    });
    
    wss.on('connection', (ws, req) => {
      const userId = req.url.split('/')[1]; // 假设 URL 中包含用户 ID
      console.log(`[WebSocket] 用户 ${userId} 已连接`);
    
      // 在连接时发送缓存的消息
      redisClient.lrange(userId, 0, -1, (err, messages) => {
        if (messages && messages.length > 0) {
          messages.forEach((message) => {
            ws.send(message);  // 发送缓存的消息
          });
          redisClient.del(userId);  // 清空缓存
        }
      });
    
      ws.on('message', (message) => {
        console.log(`[WebSocket] 接收到来自 ${userId} 的消息:`, message);
        // 处理消息逻辑
      });
    
      ws.on('close', () => {
        console.log(`[WebSocket] 用户 ${userId} 断开连接`);
        // 将未接收到的消息保存在 Redis 中
        redisClient.rpush(userId, '未接收到的消息');  // 这里用实际消息代替
      });
    
      ws.on('error', (err) => {
        console.error(`[WebSocket] 发生错误: ${err}`);
      });
    });
    
    console.log('[WebSocket] 服务器启动在端口 3000');
    

3. 其他机制

  • Token 续期机制
  • WebSocket 升级 (Upgrade) 流程
  • 连接池管理