1216 字
6 分钟
SSE 在大语言模型中的流式数据处理简单封装
效果展示
背景
最近我在使用大语言模型做一些简单、好玩的工具应用,比如 算命、生成名人大事记等,然而大语言模型返回的数据量比较大,如果一次性返回,那么用户体验会比较差,所以需要使用 SSE 来实现 流式数据传输,提升用户体验。
SSE 是什么?
SSE(Server-Sent Events)是一种 单向服务器推送技术,即:
服务器 → 客户端
• 长连接(保持 HTTP 连接) • 逐步推送数据,适用于 流式响应(如大模型返回 Token)
与 WebSocket 的区别
特性 | SSE | WebSocket |
---|---|---|
连接方向 | 单向(服务器到客户端) | 双向 |
适用场景 | 流式数据 | 实时聊天、多人协作 |
兼容性 | 所有现代浏览器 | 需要服务器和客户端都支持 |
连接数 | 受浏览器限制 | 无限制 |
LLM 中的 SSE 工作流程
应用场景:ChatGPT、DeepSeek Chat、Gemini 等对话模型返回数据 逐步传输,而不是一次性返回整个 JSON。
步骤
- 客户端发起请求 • 使用 EventSource(浏览器)或 fetch + ReadableStream(Node.js)创建 SSE 连接
- 服务器逐步返回数据 • 服务器端不断向 SSE 连接推送 Token
- 客户端逐步解析数据 • 浏览器或前端代码监听 SSE 事件,逐步渲染消息
SSE 代码示例
服务端 Node.js 代码示例
import OpenAI from 'openai';
const openai = new OpenAI({
baseURL: 'https://api.deepseek.com',
apiKey: process.env.DEEPSEEK_KEY,
});
const stream = await openai.chat.completions.create({
model: "deepseek-chat",
messages: [{ role: "user", content: "Hello, world!" }],
stream: true,
});
客户端代码示例 JavaScript
const stream = await fetch('http://localhost:3000/sse', {
method: 'GET',
}).then(res => res.body);
const reader = stream?.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader!.read();
if (done) break;
const text = decoder.decode(value);
console.log(text);
}
简单封装
基于大模型返回的 SSE 数据,进行简单封装,实现流式数据传输以及页面渲染,包含思考模型处理。
服务端
函数定义
// @/utils/server.ts
/**
* 获取大模型响应流
* @param completion 大模型响应流 await service.chat.completions.create 返回的流式数据 create({ stream: true })
* @param model 模型名称
* @param type 类型
* @returns 流式数据
*/
export const getStreamData = (completion: Stream<ChatCompletionChunk>) => {
let count = 0;
let thinkingCount = 0;
const stream = new ReadableStream({
async start(controller) {
try {
console.log('Starting stream processing...');
for await (const chunk of completion) {
const { choices } = chunk;
const delta = choices[0]?.delta as Delta ?? {};
if (!delta) continue;
const { reasoning_content = null, content = null } = delta;
// 添加 thinking 标签
if (count === 0 && reasoning_content) {
controller.enqueue(new TextEncoder().encode('<thinking>'));
// 等待 100ms 后避免批处理
await new Promise(resolve => setTimeout(resolve, 100));
}
if (reasoning_content) {
count++;
thinkingCount++;
controller.enqueue(new TextEncoder().encode(reasoning_content));
}
if (count - thinkingCount === 0 && thinkingCount !== 0 && content) {
controller.enqueue(new TextEncoder().encode('</thinking>'));
// 等待 100ms 后避免批处理
await new Promise(resolve => setTimeout(resolve, 100));
}
if (content) {
count++;
controller.enqueue(new TextEncoder().encode(content));
}
}
} catch (error) {
console.error('Stream processing error:', error);
controller.error(error);
} finally {
if (count === 0) {
controller.enqueue(new TextEncoder().encode('[服务器繁忙,请稍后再试。]'));
} else {
controller.enqueue(new TextEncoder().encode('[DONE]'));
}
controller.close();
}
},
});
return stream;
};
使用
import OpenAI from 'openai';
import { getStreamData } from '@/utils/server';
const openai = new OpenAI({
baseURL: 'https://api.deepseek.com',
apiKey: process.env.DEEPSEEK_KEY,
});
const completion = await openai.chat.completions.create({
model: "deepseek-chat",
messages: [{ role: "user", content: "Hello, world!" }],
stream: true,
});
const stream = await getStreamData(completion);
// 返回流式数据
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
},
});
客户端
函数定义
// @/utils/client.ts
/**
* 解析流式数据
* @param stream 流式数据 ReadableStream
* @param options 选项
* @param options.output 输出类型 默认 text 如果 output 为 json 则onResult 的 result 返回解析后的 json 对象
* @param options.onStart 流式数据开始
* @param options.onEnd 流式数据结束 形参 thinking 为思考内容,result 为结果
* @param options.onchange 实时更新最新内容 形参 thinking 为思考内容,result 为结果
* @param options.onThinkingStart 如果是思考模型,则 onThinkingStart 会触发
* @param options.onThinkingEnd 如果是思考模型,则 onThinkingEnd 会触发
*/
export const parseReadableStream = async (stream: ReadableStream<Uint8Array<ArrayBufferLike>>, options: {
output?: 'text' | 'json';
onStart?: () => void;
onEnd?: (thinking: string, result: string | Record<string, unknown>) => void;
onchange?: (thinking: string, result: string) => void;
onThinkingStart?: () => void;
onThinkingEnd?: () => void;
}) => {
const { output = 'text', onStart = () => {}, onEnd = () => {}, onThinkingStart = () => {}, onThinkingEnd = () => {}, onchange = () => {} } = options;
const reader = stream?.getReader();
const decoder = new TextDecoder();
let isReasoning = false;
let thinking = '';
let content = '';
onStart();
while (true) {
const { done, value } = await reader!.read();
if (done) break;
const text = decoder.decode(value);
if (text.includes('<thinking>')) {
isReasoning = true;
onThinkingStart();
continue;
}
if (text.includes('</thinking>')) {
isReasoning = false;
onThinkingEnd();
continue;
}
if (text.includes('[DONE]')) {
let result = content;
if (output === 'json') {
// 取出 ```json 和 ``` 之间的内容
const jsonContent = result.match(/```json\s*([\s\S]*?)\s*```/)?.[1] || '';
try {
result = JSON.parse(jsonContent);
} catch (error) {
console.error(error);
}
}
onEnd(thinking, result);
break;
}
if (isReasoning) {
thinking += text;
} else {
content += text;
}
onchange(thinking, content);
}
};
使用
import { parseReadableStream } from '@/utils/client';
const stream = await fetch('http://localhost:3000/sse', {
method: 'GET',
}).then(res => res.body);
try {
parseReadableStream(stream, {
output: 'json',
onStart: () => {
console.log('开始');
},
onEnd: (thinking, result) => {
console.log('结束', thinking, result);
},
onchange: (thinking, result) => {
// 如果 output 为 json 则 result 为 已经解析后对象
console.log('实时更新', thinking, result);
},
onThinkingStart: () => {
console.log('思考开始');
},
onThinkingEnd: () => {
console.log('思考结束');
},
});
} catch (error) {
console.error(error);
}