Published on

Hono.js 中的流式返回 (Streaming)

Authors
  • avatar
    Name
    Shelton Ma
    Twitter

1. Hono.js + OpenAI API (流式返回) 实现指南

1. 思路

  • Hono.js 提供 c.execution() 等工具来处理流式响应.
  • OpenAI API 的 stream: true 选项会返回 ReadableStream 数据流.
  • 需使用 ReadableStream、TextEncoder、TextDecoder 等 Web 标准 API.

2. 实现

  1. 后端返回

    import { Hono } from 'hono';
    import { streamText } from 'hono/streaming';
    import axios from 'axios';
    
    const app = new Hono();
    
    app.post('/api/chat', async (c) => {
      const { prompt } = await c.req.json();
    
      const response = await axios.post(
        'https://api.openai.com/v1/chat/completions',
        {
          model: 'gpt-4',
          messages: [{ role: 'user', content: prompt }],
          stream: true, // 启用流式响应
        },
        {
          headers: {
            Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
            'Content-Type': 'application/json',
          },
          responseType: 'stream', // Axios 流式模式
        }
      );
    
      // 使用 `streamText` 提供流式响应
      return streamText(c, async (stream) => {
        const decoder = new TextDecoder('utf-8');
    
        for await (const chunk of response.data) {
          const text = decoder.decode(chunk, { stream: true });
          const parsedLines = text
            .trim()
            .split('\n')
            .filter((line) => line.startsWith('data: '));
    
          for (const line of parsedLines) {
            const json = JSON.parse(line.replace('data: ', '').trim());
            const content = json.choices[0]?.delta?.content;
    
            if (content) {
              stream.write(content); // 将数据传递给前端
            }
          }
        }
    
        stream.close(); // 结束流
      });
    });
    
    export default app;
    
  2. 前端请求

    async function getChatResponse(prompt: string) {
      const res = await fetch('/api/chat', {
        method: 'POST',
        body: JSON.stringify({ prompt }),
      });
    
      const reader = res.body?.getReader();
      const decoder = new TextDecoder();
    
      let result = '';
    
      while (reader) {
        const { value, done } = await reader.read();
        if (done) break;
    
        const chunk = decoder.decode(value, { stream: true });
        result += chunk;
    
        console.log('New chunk:', chunk); // 实时打印接收的数据
      }
    
      return result;
    }
    

2. 选择 Redis 来存储暂停/继续状态

1. 实现

  1. 在 Hono API 中引入 Redis 及暂停控制逻辑

    import { Hono } from 'hono';
    import { streamText } from 'hono/streaming';
    import axios from 'axios';
    import Redis from 'ioredis';
    
    const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
    const app = new Hono();
    
    // 设置暂停状态
    app.post('/api/pause', async (c) => {
      const { sessionId } = await c.req.json();
      await redis.set(`pause:${sessionId}`, 'true', 'EX', 3600); // 暂停状态,过期时间1小时
      return c.json({ success: true, message: 'Paused successfully' });
    });
    
    // 取消暂停状态
    app.post('/api/resume', async (c) => {
      const { sessionId } = await c.req.json();
      await redis.del(`pause:${sessionId}`); // 删除暂停状态
      return c.json({ success: true, message: 'Resumed successfully' });
    });
    
    // Chat API (流式)
    app.post('/api/chat', async (c) => {
      const { prompt, sessionId } = await c.req.json();
    
      const response = await axios.post(
        'https://api.openai.com/v1/chat/completions',
        {
          model: 'gpt-4',
          messages: [{ role: 'user', content: prompt }],
          stream: true,
        },
        {
          headers: {
            Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
            'Content-Type': 'application/json',
          },
          responseType: 'stream',
        }
      );
    
      return streamText(c, async (stream) => {
        const decoder = new TextDecoder('utf-8');
    
        for await (const chunk of response.data) {
          const text = decoder.decode(chunk, { stream: true });
          const parsedLines = text
            .trim()
            .split('\n')
            .filter((line) => line.startsWith('data: '));
    
          for (const line of parsedLines) {
            const json = JSON.parse(line.replace('data: ', '').trim());
            const content = json.choices[0]?.delta?.content;
    
            if (content) {
              const isPaused = await redis.get(`pause:${sessionId}`); // 检查暂停状态
    
              if (isPaused) {
                await redis.rpush(`buffer:${sessionId}`, content); // 缓存暂停时的内容
              } else {
                // 推送缓冲区的内容 + 当前流式数据
                const bufferedContent = await redis.lrange(`buffer:${sessionId}`, 0, -1);
                for (const bufferedChunk of bufferedContent) {
                  stream.write(bufferedChunk);
                }
                await redis.del(`buffer:${sessionId}`); // 清空已发送的缓冲数据
                stream.write(content);
              }
            }
          }
        }
    
        stream.close();
      });
    });
    
    export default app;
    

2. 最佳实践总结

  • 在 Hono API 端,使用 Redis 存储用户的暂停/继续状态,保证多用户隔离.
  • 在前端,使用 fetch() 实现流式数据获取,并通过 API 发送暂停/继续指令.
  • 实现数据缓冲,避免暂停时丢失模型响应内容.
  • 利用 Redis 的过期机制,防止数据堆积造成资源占用.

3. WebSocket 实现流式对话的暂停/继续控制, 见使用 WebSocket 实现流式对话