- Published on
Hono.js 中的流式返回 (Streaming)
- Authors
- Name
- Shelton Ma
1. Hono.js + OpenAI API (流式返回) 实现指南
1. 思路
- Hono.js 提供
streamText() / streamSSE() (来自 'hono/streaming')
等工具来处理流式响应. - OpenAI API 的 stream: true 选项会返回 ReadableStream 数据流.
- 需使用
ReadableStream、TextEncoder、TextDecoder
等 Web 标准 API.
2. 实现
后端返回
import { Hono } from 'hono'; import { streamText } from 'hono/streaming'; import axios from 'axios'; const app = new Hono(); app.post('/api/chat', async (c) => { const { prompt } = await c.req.json(); const response = await axios.post( 'https://api.openai.com/v1/chat/completions', { model: 'gpt-4', messages: [{ role: 'user', content: prompt }], stream: true, // 启用流式响应 }, { headers: { Authorization: `Bearer ${process.env.OPENAI_API_KEY}`, 'Content-Type': 'application/json', }, responseType: 'stream', // Axios 流式模式 } ); // 使用 `streamText` 提供流式响应 return streamText(c, async (stream) => { const decoder = new TextDecoder('utf-8'); for await (const chunk of response.data) { const text = decoder.decode(chunk, { stream: true }); const parsedLines = text .trim() .split('\n') .filter((line) => line.startsWith('data: ')); for (const line of parsedLines) { const json = JSON.parse(line.replace('data: ', '').trim()); const content = json.choices[0]?.delta?.content; if (content) { stream.write(content); // 将数据传递给前端 } } } stream.close(); // 结束流 }); }); export default app;
前端请求
async function getChatResponse(prompt: string) { const res = await fetch('/api/chat', { method: 'POST', body: JSON.stringify({ prompt }), }); const reader = res.body?.getReader(); const decoder = new TextDecoder(); let result = ''; while (reader) { const { value, done } = await reader.read(); if (done) break; const chunk = decoder.decode(value, { stream: true }); result += chunk; console.log('New chunk:', chunk); // 实时打印接收的数据 } return result; }
2. 选择 Redis 来存储暂停/继续状态
1. 实现
在 Hono API 中引入 Redis 及暂停控制逻辑
import { Hono } from 'hono';
import { streamText } from 'hono/streaming';
import axios from 'axios';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
const app = new Hono();

// Mark a session as paused. The key expires after 1 hour so abandoned
// sessions cannot leave stale state in Redis.
app.post('/api/pause', async (c) => {
  const { sessionId } = await c.req.json();
  await redis.set(`pause:${sessionId}`, 'true', 'EX', 3600);
  return c.json({ success: true, message: 'Paused successfully' });
});

// Clear the pause flag for a session so streaming resumes.
app.post('/api/resume', async (c) => {
  const { sessionId } = await c.req.json();
  await redis.del(`pause:${sessionId}`);
  return c.json({ success: true, message: 'Resumed successfully' });
});

/**
 * POST /api/chat — streaming chat endpoint with per-session pause/resume.
 * While `pause:{sessionId}` is set, tokens are buffered in a Redis list;
 * on resume the buffered tokens are flushed before new ones.
 */
app.post('/api/chat', async (c) => {
  const { prompt, sessionId } = await c.req.json();

  const response = await axios.post(
    'https://api.openai.com/v1/chat/completions',
    {
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
      stream: true,
    },
    {
      headers: {
        Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
        'Content-Type': 'application/json',
      },
      responseType: 'stream',
    }
  );

  return streamText(c, async (stream) => {
    const decoder = new TextDecoder('utf-8');
    // SSE events may be split across chunk boundaries; keep the unfinished
    // tail in `pending` until the rest of the line arrives.
    let pending = '';

    for await (const chunk of response.data) {
      pending += decoder.decode(chunk, { stream: true });

      const lines = pending.split('\n');
      pending = lines.pop() ?? ''; // last element may be an incomplete line

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed.startsWith('data: ')) continue;

        const payload = trimmed.slice('data: '.length);
        // OpenAI ends the stream with a non-JSON "[DONE]" sentinel — skip it
        // instead of letting JSON.parse throw and abort the stream.
        if (payload === '[DONE]') continue;

        let content: string | undefined;
        try {
          const json = JSON.parse(payload);
          content = json.choices[0]?.delta?.content;
        } catch {
          continue; // malformed fragment; drop rather than crash
        }
        if (!content) continue;

        const isPaused = await redis.get(`pause:${sessionId}`); // 检查暂停状态
        if (isPaused) {
          // Paused: stash the token so nothing is lost while the client waits.
          await redis.rpush(`buffer:${sessionId}`, content);
        } else {
          // Resumed: flush anything buffered during the pause, then the
          // current token. `write` returns a Promise — await for ordering.
          const buffered = await redis.lrange(`buffer:${sessionId}`, 0, -1);
          for (const bufferedChunk of buffered) {
            await stream.write(bufferedChunk);
          }
          await redis.del(`buffer:${sessionId}`); // 清空已发送的缓冲数据
          await stream.write(content);
        }
      }
    }
    await stream.close();
  });
});

export default app;
2. 最佳实践总结
- 在 Hono API 端,使用 Redis 存储用户的暂停/继续状态,保证多用户隔离.
- 在前端,使用 fetch() 实现流式数据获取,并通过 API 发送暂停/继续指令.
- 实现数据缓冲,避免暂停时丢失模型响应内容.
- 利用 Redis 的过期机制,防止数据堆积造成资源占用.