- Published on
使用 WebSocket 实现流式对话
- Authors
- Name
- Shelton Ma
WebSocket 保持持久连接,确保更低延迟的暂停/继续控制.
1. WebSocket 实现流式对话的暂停/继续控制
1. 创建 WebSocket 服务 (Hono API)
import { Hono } from 'hono';
import { streamText } from 'hono/streaming';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import axios from 'axios';
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
const app = new Hono();
// WebSocket 服务
const server = createServer();
const wss = new WebSocketServer({ server });
interface ControlState {
isPaused: boolean;
buffer: string[];
}
// WebSocket 连接处理
wss.on('connection', (ws, req) => {
const sessionId = new URL(req.url || '', 'http://localhost').searchParams.get('sessionId');
if (!sessionId) {
ws.close(4000, 'Missing sessionId');
return;
}
ws.on('message', async (message) => {
const { action } = JSON.parse(message.toString());
if (action === 'pause') {
await redis.set(`pause:${sessionId}`, 'true', 'EX', 3600);
}
if (action === 'resume') {
await redis.del(`pause:${sessionId}`);
const bufferedContent = await redis.lrange(`buffer:${sessionId}`, 0, -1);
bufferedContent.forEach((chunk) => ws.send(chunk));
await redis.del(`buffer:${sessionId}`);
}
});
});
// Chat API (流式)
app.post('/api/chat', async (c) => {
const { prompt, sessionId } = await c.req.json();
const response = await axios.post(
'https://api.openai.com/v1/chat/completions',
{
model: 'gpt-4',
messages: [{ role: 'user', content: prompt }],
stream: true,
},
{
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
'Content-Type': 'application/json',
},
responseType: 'stream',
}
);
return streamText(c, async (stream) => {
const decoder = new TextDecoder('utf-8');
for await (const chunk of response.data) {
const text = decoder.decode(chunk, { stream: true });
const parsedLines = text
.trim()
.split('\n')
.filter((line) => line.startsWith('data: '));
for (const line of parsedLines) {
const json = JSON.parse(line.replace('data: ', '').trim());
const content = json.choices[0]?.delta?.content;
if (content) {
const isPaused = await redis.get(`pause:${sessionId}`);
if (isPaused) {
await redis.rpush(`buffer:${sessionId}`, content);
} else {
const bufferedContent = await redis.lrange(`buffer:${sessionId}`, 0, -1);
for (const bufferedChunk of bufferedContent) {
stream.write(bufferedChunk);
}
await redis.del(`buffer:${sessionId}`);
stream.write(content);
}
}
}
}
stream.close();
});
});
// 启动 HTTP 服务
server.on('request', app.fetch);
server.listen(3000, () => console.log('Server running on http://localhost:3000'));
2. 前端 WebSocket 客户端实现
import { useState, useEffect, useRef } from 'react';
export default function Chat() {
const [responseText, setResponseText] = useState('');
const [isPaused, setIsPaused] = useState(false);
const wsRef = useRef<WebSocket | null>(null);
const sessionId = 'user-123'; // 示例用户 ID
// WebSocket 初始化
useEffect(() => {
wsRef.current = new WebSocket(`ws://localhost:3000?sessionId=${sessionId}`);
wsRef.current.onmessage = (event) => {
setResponseText((prev) => prev + event.data);
};
return () => wsRef.current?.close();
}, []);
// 开始聊天 (流式请求)
const handleChat = async () => {
await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ prompt: '请讲一个关于宇宙的故事', sessionId }),
});
};
const handlePause = () => {
setIsPaused(true);
wsRef.current?.send(JSON.stringify({ action: 'pause' }));
};
const handleResume = () => {
setIsPaused(false);
wsRef.current?.send(JSON.stringify({ action: 'resume' }));
};
return (
<div>
<button onClick={handleChat}>开始聊天</button>
<button onClick={handlePause}>暂停</button>
<button onClick={handleResume}>继续</button>
<div>
<pre>{responseText}</pre>
</div>
</div>
);
}
3. 总结
WebSocket 保持持久连接,确保更低延迟的暂停/继续控制.
2. WebSocket 的断线重连如何实现?
1. 断线重连核心思路
- 捕获 onclose 和 onerror 事件,检测断线
- 实现自动重连机制
- 使用 指数退避算法 (Exponential Backoff) 降低频繁重连的风险
- 限制最大重试次数,避免陷入无限循环
- 心跳检测 (Heartbeat) 机制,主动发现断连并触发重连
- 状态检查,避免重复创建连接
2. 客户端 WebSocket 断线重连实现
以下实现考虑了:
- 自动重连
- 指数退避
- 心跳检测
- 断线时恢复未发送的消息
WebSocket 客户端
ReconnectingWebSocket
class ReconnectingWebSocket { constructor(url, options = {}) { this.url = url; this.reconnectInterval = options.reconnectInterval || 1000; // 初始重连间隔,单位 ms this.maxReconnectInterval = options.maxReconnectInterval || 30000; // 最大重连间隔,单位 ms this.reconnectAttempts = 0; // 重连次数 this.maxReconnectAttempts = options.maxReconnectAttempts || 5; // 最大重连次数 this.heartbeatInterval = options.heartbeatInterval || 5000; // 心跳包发送间隔,单位 ms this.heartbeatTimer = null; // 用于心跳包的定时器 this.messageQueue = []; // 用于缓存未发送的消息 this.socket = null; this.connect(); } // 连接 WebSocket connect() { this.socket = new WebSocket(this.url); this.socket.onopen = () => { console.log('[WebSocket] 连接成功'); this.reconnectAttempts = 0; this.startHeartbeat(); // 重连后发送缓存的消息 this.messageQueue.forEach((msg) => this.socket.send(msg)); this.messageQueue = []; // 清空缓存消息 }; this.socket.onmessage = (event) => { console.log('[WebSocket] 收到消息:', event.data); if (this.onMessage) { this.onMessage(event.data); } }; this.socket.onclose = () => { console.warn('[WebSocket] 连接关闭'); this.reconnect(); }; this.socket.onerror = (error) => { console.error('[WebSocket] 连接错误:', error); this.socket.close(); }; } // 断线后重连 reconnect() { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('[WebSocket] 达到最大重连次数,停止重连'); return; } const delay = Math.min( this.reconnectInterval * (2 ** this.reconnectAttempts), // 指数退避算法 this.maxReconnectInterval ); setTimeout(() => { console.log(`[WebSocket] 正在尝试第 ${this.reconnectAttempts + 1} 次重连...`); this.reconnectAttempts++; this.connect(); // 尝试重新连接 }, delay); } // 启动心跳包,定时发送 ping 消息 startHeartbeat() { if (this.heartbeatTimer) clearInterval(this.heartbeatTimer); this.heartbeatTimer = setInterval(() => { if (this.socket.readyState === WebSocket.OPEN) { console.log('[WebSocket] 发送心跳包'); this.socket.send(JSON.stringify({ type: 'ping' })); } }, this.heartbeatInterval); } // 发送消息 send(data) { if (this.socket.readyState === WebSocket.OPEN) { this.socket.send(data); } else { console.warn('[WebSocket] 连接已断开,消息已缓存'); this.messageQueue.push(data); // 连接未建立时,缓存消息 } } // 关闭连接 close() { if (this.heartbeatTimer) clearInterval(this.heartbeatTimer); this.socket.close(); } }
使用
const ws = new ReconnectingWebSocket('ws://localhost:3000', { reconnectInterval: 1000, // 初始重连间隔 maxReconnectInterval: 30000, // 最大重连间隔 maxReconnectAttempts: 10, // 最大重连次数 heartbeatInterval: 5000 // 心跳包间隔 }); ws.onMessage = (msg) => { console.log(`从服务器收到消息: ${msg}`); }; ws.send('Hello, Server!'); }
3. 服务端断线需要使用 Redis 持久化缓存
服务端
const WebSocket = require('ws'); const redis = require('redis'); const wss = new WebSocket.Server({ port: 3000 }); const redisClient = redis.createClient(); // 连接 Redis redisClient.on('connect', () => { console.log('连接到 Redis 成功'); }); wss.on('connection', (ws, req) => { const userId = req.url.split('/')[1]; // 假设 URL 中包含用户 ID console.log(`[WebSocket] 用户 ${userId} 已连接`); // 在连接时发送缓存的消息 redisClient.lrange(userId, 0, -1, (err, messages) => { if (messages && messages.length > 0) { messages.forEach((message) => { ws.send(message); // 发送缓存的消息 }); redisClient.del(userId); // 清空缓存 } }); ws.on('message', (message) => { console.log(`[WebSocket] 接收到来自 ${userId} 的消息:`, message); // 处理消息逻辑 }); ws.on('close', () => { console.log(`[WebSocket] 用户 ${userId} 断开连接`); // 将未接收到的消息保存在 Redis 中 redisClient.rpush(userId, '未接收到的消息'); // 这里用实际消息代替 }); ws.on('error', (err) => { console.error(`[WebSocket] 发生错误: ${err}`); }); }); console.log('[WebSocket] 服务器启动在端口 3000');
3. 其他机制
- Token 续期机制
- WebSocket 升级 (Upgrade) 流程
- 连接池管理