前言 最近项目需求,开发与AI相关的功能,接触到一些AI大模型相关的技术,其中前端处理大模型返回的数据流是必不可少的一环,流式SSE处理工具fetch-event-source,可以很好的解决这个问题。
其次 MCP
也定义了两种标准的数据格式,stdio
以及sse
,其中stdio
是标准输入输出(对本地集成和命令行工具特别有用),sse
是服务器发送事件,这两种数据格式在AI大模型中非常常见。
什么是SSE SSE(Server-Sent Events)是一种服务器向浏览器发送事件的机制。它允许服务器向浏览器发送实时数据,而不需要浏览器发起请求。SSE是一种单向的、基于HTTP的协议,适用于需要实时数据传输的场景。
传统处理方式 浏览器内置EventSource
对象,可以很方便的实现SSE处理。不过这个EventSource
有一个非常致命的缺点,那就是只支持GET类型的请求,并且不支持任何自定义的头部 。需要与业务交互,显然是不可接受的。
EventSource官方链接
1 2 3 4 5 6 const response = await fetch(url, { method: 'GET' , headers: { 'Content-Type' : 'text/event-stream' , }, });
@microsoft/fetch-event-source @microsoft/fetch-event-source 是一个用于处理服务器发送事件(SSE)的工具,它提供了一种方便的方式来接收和处理来自服务器的实时数据流。这个包的主要作用是提供一个遵循 WHATWG Fetch
标准的 API 来处理 SSE。实际上目前的ChatGPT实现的双向信息流也是基于这个库进行开发的
安装 1 npm install @microsoft/fetch-event-source
使用 1 2 3 4 5 6 7 import { fetchEventSource } from '@microsoft/fetch-event-source' ;fetchEventSource(url, { onmessage: (event ) => { console .log(event); }, });
简单封装 封装参考开源项目 Wren AI
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import { EventStreamContentType, fetchEventSource } from '@microsoft/fetch-event-source' ;commonSseProcess( url: string, params: { options: any; signal?: AbortSignal; startCallback: (chunk: string ) => void ; messageRecived: (chunk: string, eventName?: string ) => void ; doneCallback: (chunk: string ) => void ; errorCallback: (error: string ) => void ; }, ) { fetchEventSource(url, { method: 'POST' , headers: { 'Content-Type' : 'application/json' , }, signal: params.signal, body: JSON .stringify({ ...params.options, }), onopen: async (response) => { const resStatus = response.status; const resStatusText = response.statusText; if (response.ok && response.headers.get('content-type' ) === EventStreamContentType) { } else { this .createErrorPromise(resStatus, resStatusText, response); } }, onmessage(eventMessage) { if (eventMessage.event === 'START' ) { params.startCallback(eventMessage.data); return ; } else if (eventMessage.event === 'ERROR' ) { console .log(`error:${eventMessage} ` ); params.errorCallback(eventMessage.data); return ; } else if (eventMessage.event === 'END' ) { params.doneCallback(eventMessage.data); return ; } params.messageRecived(eventMessage.data, eventMessage.event); }, onerror(error) { console .log(`sse error:${error} ` ); params.errorCallback(error); throw error; }, openWhenHidden: true , }); }
注意点
后端返回的数据流<field>:<value>
fetch-event-source只会处理冒号,后端返回的数据流为<field>:<value>
,后端会处理冒号和冒号后面的空格
相关源码:
https://github.com/Azure/fetch-event-source/blob/45ac3cfffd30b05b79fbf95c21e67d4ef59aa56a/src/parse.ts#L129-L133
当响应结果的content中包含有多行文本时,SSE会将包含有换行符的那一行内容替换为空字符串,故需要先将换行符与后面的内容拆分并转成,前端碰到换行标志时转成换行符处理1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 messageRecived: (chunk: string, eventName: STREAM_EVENT ) => { const { question } = options || {}; const processedChunk = chunk ? chunk.replace('-_-_wrap_-_-' , '\r\n' ) : chunk; if (onMessage) onMessage(processedChunk, eventName, conversationId, question); try { const parsedData = JSON .parse(processedChunk); setData(parsedData as T); } catch (e) { console .error('解析SSE消息失败:' , e); } },
这个问题非常的隐蔽且难以理解,问题过程如下(后续会解释一下源码里的处理流程) 假设服务器发送了这样的数据:
根据 SSE (Server-Sent Events) 规范,当数据中包含换行符 \n 时,应该被拆分成多个 “data:” 行发送:
让我们看看数据是如何在代码中流动的:
首先在 getLines 函数中:
getLines源码
1 2 3 4 case ControlChars.NewLine: lineEnd = position; break ;
然后在 getMessages 函数中:
getMessages源码
1 2 3 4 5 case 'data' : message.data = message.data ? message.data + '\n' + value : value; break ;
问题就出在这里:
当第一行 data: Hello 到达时:message.data 为空,所以 message.data = “Hello”
当第二行 data: World 到达时:message.data 已经有值,所以执行 message.data + ‘\n’ + value
变成 “Hello\nWorld”
但是由于 SSE 的处理机制,这个包含换行符的内容可能会被错误地解析,导致内容被清空或丢失。
@microsoft/fetch-event-source 源码解析 1. 核心方法fetchEventSource 核心方法源码
入参 1 2 3 4 5 6 7 8 9 10 11 fetchEventSource(input: RequestInfo, { signal: inputSignal, headers: inputHeaders, onopen: inputOnOpen, onmessage, onclose, onerror, openWhenHidden, fetch: inputFetch, ...rest }: FetchEventSourceInit)
其中openWhenHidden的默认值为false,if false, 后续 document.addEventListener('visibilitychange', onVisibilityChange);
1 2 3 4 5 6 function onVisibilityChange ( ) { curRequestController.abort(); if (!document .hidden) { create(); } }
数据处理 数据处理主要是getBytes
getLines
getMessages
,整个代码的核心部分像是一个洋葱圈模型,分别使用闭包记录了每一层的数据,从文本 -> 行 -> 消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 await getBytes( response.body!, getLines( getMessages(id => { if (id) { headers[LastEventId] = id; } else { delete headers[LastEventId]; } }, retry => { retryInterval = retry; }, onmessage ) ));
getBytes 这个方法非常的简单,仅仅是读取后端返回的流,并调用onChunk
回调函数,将流中的数据传递给onChunk
,这个onChunk
就是getLines
返回的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 export async function getBytes (stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array ) => void ) { const reader = stream.getReader(); let result: ReadableStreamDefaultReadResult<Uint8Array >; while (!(result = await reader.read()).done) { onChunk(result.value); } }
getLines 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 export function getLines (onLine: (line: Uint8Array, fieldLength: number ) => void ) { let buffer: Uint8Array | undefined ; let position: number; let fieldLength: number; let discardTrailingNewline = false ; return function onChunk (arr: Uint8Array ) { if (buffer === undefined ) { buffer = arr; position = 0 ; fieldLength = -1 ; } else { buffer = concat(buffer, arr); } const bufLength = buffer.length; let lineStart = 0 ; while (position < bufLength) { if (discardTrailingNewline) { if (buffer[position] === ControlChars.NewLine) { lineStart = ++position; } discardTrailingNewline = false ; } let lineEnd = -1 ; for (; position < bufLength && lineEnd === -1 ; ++position) { switch (buffer[position]) { case ControlChars.Colon: if (fieldLength === -1 ) { fieldLength = position - lineStart; } break ; case ControlChars.CarriageReturn: discardTrailingNewline = true ; case ControlChars.NewLine: lineEnd = position; break ; } } if (lineEnd === -1 ) { break ; } onLine(buffer.subarray(lineStart, lineEnd), fieldLength); lineStart = position; fieldLength = -1 ; } if (lineStart === bufLength) { buffer = undefined ; } else if (lineStart !== 0 ) { buffer = buffer.subarray(lineStart); position -= lineStart; } } }
这是一个行缓冲解析器 。
行缓冲解析器 :
行缓冲解析器是一种用于处理流式数据的解析器,它将输入数据分割成行,并提供回调函数来处理每一行(onLine方法)。
onLine
前面一大段代码就是一个滑动窗口,当遇到\r\n
时,会调用onLine
方法,将窗口内的数据传递给onLine
方法。onLine
方法是下面的getMessages
方法返回的方法,这里借用了闭包存储每一行的数据
getMessages 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 export function getMessages ( onId: (id: string ) => void , onRetry : (retry: number ) => void , onMessage ?: (msg: EventSourceMessage ) => void ) { let message = newMessage(); const decoder = new TextDecoder(); return function onLine (line: Uint8Array, fieldLength: number ) { if (line.length === 0 ) { onMessage?.(message); message = newMessage(); } else if (fieldLength > 0 ) { const field = decoder.decode(line.subarray(0 , fieldLength)); const valueOffset = fieldLength + (line[fieldLength + 1 ] === ControlChars.Space ? 2 : 1 ); const value = decoder.decode(line.subarray(valueOffset)); switch (field) { case 'data' : message.data = message.data ? message.data + '\n' + value : value; break ; case 'event' : message.event = value; break ; case 'id' : onId(message.id = value); break ; case 'retry' : const retry = parseInt (value, 10 ); if (!isNaN (retry)) { onRetry(message.retry = retry); } break ; } } } }
这个方法的逻辑是:
创建一个message
对象,用于存储每一行的数据
创建一个decoder
对象,用于解码line
中的数据
返回一个onLine
方法,用于处理每一行的数据
在onLine
方法中,会根据field
的值,将数据存储到message
对象中
当遇到\r\n
时,会调用onMessage
方法,将message
对象传递给onMessage
方法
1 2 3 4 5 6 data: 第一行数据 data: 第二行数据 此时会走 message.data + '\n' + value 最终message.data = '第一行数据\n第二行数据'
总结 本文详细介绍了在AI大模型开发中处理数据流的重要工具 @microsoft/fetch-event-source。主要包含以下几个要点:
SSE(Server-Sent Events)是一种服务器向浏览器发送实时数据的单向通信机制。
传统的EventSource对象虽然能处理SSE,但仅支持GET请求且不支持自定义头部,限制了其实际应用场景。
@microsoft/fetch-event-source提供了更灵活的解决方案:
支持POST请求和自定义头部
允许建立持久连接
支持双向数据流通信
在实际使用中需要注意几个关键点:
后端返回的数据流格式需要严格遵循<field>:<value>
的格式
处理多行文本时需要特别注意换行符的处理
建议使用封装好的通用处理方法来简化开发
源码层面,该库通过getBytes
、getLines
和getMessages
三层处理机制,实现了对SSE数据流的完整解析和处理。
这个工具为AI大模型应用开发提供了可靠的数据流处理方案,特别适合类似ChatGPT这样需要流式响应的应用场景。