前言

最近项目需求,开发与AI相关的功能,接触到一些AI大模型相关的技术,其中前端处理大模型返回的数据流是必不可少的一环,流式SSE处理工具fetch-event-source,可以很好的解决这个问题。

其次 MCP 也定义了两种标准的数据格式,stdio以及sse,其中stdio是标准输入输出(对本地集成和命令行工具特别有用),sse是服务器发送事件,这两种数据格式在AI大模型中非常常见。

什么是SSE

SSE(Server-Sent Events)是一种服务器向浏览器发送事件的机制。它允许服务器向浏览器发送实时数据,而不需要浏览器发起请求。SSE是一种单向的、基于HTTP的协议,适用于需要实时数据传输的场景。

传统处理方式

浏览器内置EventSource对象,可以很方便的实现SSE处理。不过这个EventSource有一个非常致命的缺点,那就是只支持GET类型的请求,并且不支持任何自定义的头部。需要与业务交互,显然是不可接受的。

EventSource官方链接

1
2
3
4
5
6
const response = await fetch(url, {
method: 'GET',
headers: {
'Content-Type': 'text/event-stream',
},
});

@microsoft/fetch-event-source

@microsoft/fetch-event-source 是一个用于处理服务器发送事件(SSE)的工具,它提供了一种方便的方式来接收和处理来自服务器的实时数据流。这个包的主要作用是提供一个遵循 WHATWG Fetch 标准的 API 来处理 SSE。实际上目前的ChatGPT实现的双向信息流也是基于这个库进行开发的

  • 允许我们可以和对应的url地址简历持久连接

  • 允许我们在接收数据流信息的同时将我们想要发送的消息也通过相同的url进行发送

安装

1
npm install @microsoft/fetch-event-source

使用

1
2
3
4
5
6
7
import { fetchEventSource } from '@microsoft/fetch-event-source';

fetchEventSource(url, {
onmessage: (event) => {
console.log(event);
},
});

简单封装

封装参考开源项目 Wren AI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import { EventStreamContentType, fetchEventSource } from '@microsoft/fetch-event-source';

commonSseProcess(
url: string,
params: {
options: any;
signal?: AbortSignal;
startCallback: (chunk: string) => void;
messageRecived: (chunk: string, eventName?: string) => void;
doneCallback: (chunk: string) => void;
errorCallback: (error: string) => void;
},
) {
fetchEventSource(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
signal: params.signal,
body: JSON.stringify({
...params.options,
}),
onopen: async (response) => {
const resStatus = response.status;
const resStatusText = response.statusText;
if (response.ok && response.headers.get('content-type') === EventStreamContentType) {
// everything's good
} else {
this.createErrorPromise(resStatus, resStatusText, response);
}
},
onmessage(eventMessage) {
if (eventMessage.event === 'START') {
params.startCallback(eventMessage.data);
return;
} else if (eventMessage.event === 'ERROR') {
console.log(`error:${eventMessage}`);
params.errorCallback(eventMessage.data);
return;
} else if (eventMessage.event === 'END') {
params.doneCallback(eventMessage.data);
return;
}
// 会自动处理后端返回内容的首个空格,需在后端的返回内容前多加个空格,相关源码:https://github.com/Azure/fetch-event-source/blob/45ac3cfffd30b05b79fbf95c21e67d4ef59aa56a/src/parse.ts#L129-L133
params.messageRecived(eventMessage.data, eventMessage.event);
},
onerror(error) {
console.log(`sse error:${error}`);
params.errorCallback(error);
throw error;
},
openWhenHidden: true,
});
}

注意点

  1. 后端返回的数据流<field>:<value> fetch-event-source只会处理冒号,后端返回的数据流为<field>:<value>后端会处理冒号和冒号后面的空格

相关源码:

https://github.com/Azure/fetch-event-source/blob/45ac3cfffd30b05b79fbf95c21e67d4ef59aa56a/src/parse.ts#L129-L133

  1. 当响应结果的content中包含有多行文本时,SSE会将包含有换行符的那一行内容替换为空字符串,故需要先将换行符与后面的内容拆分并转成,前端碰到换行标志时转成换行符处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    messageRecived: (chunk: string, eventName: STREAM_EVENT) => {
    const { question } = options || {};

    /**
    * 当响应结果的content中包含有多行文本时,
    * SSE会将包含有换行符的那一行内容替换为空字符串,
    * 故需要先将换行符与后面的内容拆分并转成,前端碰到换行标志时转成换行符处理
    */
    const processedChunk = chunk ? chunk.replace('-_-_wrap_-_-', '\r\n') : chunk;

    if (onMessage) onMessage(processedChunk, eventName, conversationId, question);

    try {
    const parsedData = JSON.parse(processedChunk);
    setData(parsedData as T);
    } catch (e) {
    console.error('解析SSE消息失败:', e);
    }
    },
    这个问题非常的隐蔽且难以理解,问题过程如下(后续会解释一下源码里的处理流程)
    假设服务器发送了这样的数据:
    1
    data: Hello\nWorld
    根据 SSE (Server-Sent Events) 规范,当数据中包含换行符 \n 时,应该被拆分成多个 “data:” 行发送:
    1
    2
    data: Hello
    data: World

让我们看看数据是如何在代码中流动的:

首先在 getLines 函数中:

getLines源码

1
2
3
4
// 当检测到换行符时,会将数据切分成行
case ControlChars.NewLine:
lineEnd = position;
break;

然后在 getMessages 函数中:

getMessages源码

1
2
3
4
5
case 'data':
message.data = message.data
? message.data + '\n' + value
: value;
break;

问题就出在这里:

当第一行 data: Hello 到达时:message.data 为空,所以 message.data = “Hello”

当第二行 data: World 到达时:message.data 已经有值,所以执行 message.data + ‘\n’ + value

变成 “Hello\nWorld”

但是由于 SSE 的处理机制,这个包含换行符的内容可能会被错误地解析,导致内容被清空或丢失。

@microsoft/fetch-event-source 源码解析

1. 核心方法fetchEventSource

核心方法源码

入参

1
2
3
4
5
6
7
8
9
10
11
fetchEventSource(input: RequestInfo, { // input为请求的url地址,后续fetch(input, ...)
signal: inputSignal, // 请求的信号,用于取消请求
headers: inputHeaders, // 请求的头部,用于设置请求的头部
onopen: inputOnOpen, // 请求打开时的回调
onmessage, // 收到消息时的回调
onclose, // 请求关闭时的回调
onerror, // 请求错误时的回调
openWhenHidden,
fetch: inputFetch, // 请求的fetch方法,用于自定义请求
...rest // 其他参数
}: FetchEventSourceInit)

其中openWhenHidden的默认值为false,if false, 后续 document.addEventListener('visibilitychange', onVisibilityChange);

1
2
3
4
5
6
function onVisibilityChange() {
curRequestController.abort(); // 每次可见性变化时关闭现有的请求
if (!document.hidden) {
create(); // 页面现在可见,重新创建请求。
}
}

数据处理

数据处理主要是getBytes getLines getMessages,整个代码的核心部分像是一个洋葱圈模型,分别使用闭包记录了每一层的数据,从文本 -> 行 -> 消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
await getBytes(
response.body!,
getLines(
getMessages(id => {
if (id) {
// store the id and send it back on the next retry:
headers[LastEventId] = id;
} else {
// don't send the last-event-id header anymore:
delete headers[LastEventId];
}
},
retry => {
retryInterval = retry;
},
onmessage
)
));
getBytes

这个方法非常的简单,仅仅是读取后端返回的流,并调用onChunk回调函数,将流中的数据传递给onChunk,这个onChunk就是getLines返回的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Converts a ReadableStream into a callback pattern.
* @param stream The input ReadableStream.
* @param onChunk A function that will be called on each new byte chunk in the stream.
* @returns {Promise<void>} A promise that will be resolved when the stream closes.
*/
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {
const reader = stream.getReader();
let result: ReadableStreamDefaultReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
onChunk(result.value);
}
}
getLines

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/** 
* 将任意字节块解析为 EventSource 行缓冲区。
* 每行应为 "field: value" 格式,并以 \r、\n 或 \r\n 结尾。
* @param onLine 每次遇到新行时调用的函数。
* @returns 一个应该为每个传入字节块调用的函数。
*/
export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {
let buffer: Uint8Array | undefined;
let position: number; // 当前读取位置
let fieldLength: number; // 行的字段长度
let discardTrailingNewline = false;

// 返回一个可以处理每个传入字节块的函数:
return function onChunk(arr: Uint8Array) {
if (buffer === undefined) {
buffer = arr;
position = 0;
fieldLength = -1;
} else {
// 我们还在解析旧行。将新字节附加到缓冲区:
buffer = concat(buffer, arr);
}

const bufLength = buffer.length;
let lineStart = 0; // 当前行的起始位置
while (position < bufLength) {
if (discardTrailingNewline) {
if (buffer[position] === ControlChars.NewLine) {
lineStart = ++position; // 跳到下一个字符
}

discardTrailingNewline = false;
}

// 向前查找直到行结束:
let lineEnd = -1; // 行结束符的位置
for (; position < bufLength && lineEnd === -1; ++position) {
switch (buffer[position]) {
case ControlChars.Colon:
if (fieldLength === -1) { // 第一个冒号
fieldLength = position - lineStart;
}
break;
// @ts-ignore:7029 \r case below should fallthrough to \n:
case ControlChars.CarriageReturn:
discardTrailingNewline = true;
case ControlChars.NewLine:
lineEnd = position;
break;
}
}

if (lineEnd === -1) {
// 我们到达了缓冲区的末尾,但行还没有结束。
// 等待下一个 arr 并继续解析:
break;
}

// 我们到达了行结束,发送出去:
onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
lineStart = position; // 我们现在在下一行
fieldLength = -1;
}

if (lineStart === bufLength) {
buffer = undefined; // 我们读完了
} else if (lineStart !== 0) {
// 创建一个从 lineStart 开始的新视图到 buffer,这样我们就不需要在新 arr 时复制前面的行:
buffer = buffer.subarray(lineStart);
position -= lineStart;
}
}
}

这是一个行缓冲解析器

行缓冲解析器

行缓冲解析器是一种用于处理流式数据的解析器,它将输入数据分割成行,并提供回调函数来处理每一行(onLine方法)。

onLine前面一大段代码就是一个滑动窗口,当遇到\r\n时,会调用onLine方法,将窗口内的数据传递给onLine方法。onLine方法是下面的getMessages方法返回的方法,这里借用了闭包存储每一行的数据

getMessages

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/** 
* 将行缓冲区解析为 EventSourceMessages。
* @param onId 每次遇到 `id` 字段时调用的函数。
* @param onRetry 每次遇到 `retry` 字段时调用的函数。
* @param onMessage 每次遇到消息时调用的函数。
* @returns 一个应该为每个传入行缓冲区调用的函数。
*/
export function getMessages(
onId: (id: string) => void,
onRetry: (retry: number) => void,
onMessage?: (msg: EventSourceMessage) => void
) {
let message = newMessage();
const decoder = new TextDecoder();

// 返回一个可以处理每个传入行缓冲区的函数:
return function onLine(line: Uint8Array, fieldLength: number) {
if (line.length === 0) {
// 空行表示消息结束。触发回调并开始新消息:
onMessage?.(message);
message = newMessage();
} else if (fieldLength > 0) { // 排除注释和没有值的行
// line is of format "<field>:<value>" or "<field>: <value>"
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
const field = decoder.decode(line.subarray(0, fieldLength));
const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
const value = decoder.decode(line.subarray(valueOffset));

switch (field) {
case 'data':
// 如果此消息已经有数据,则将新值附加到旧值。
// 否则,只需设置为新值:
message.data = message.data
? message.data + '\n' + value
: value; // 否则,只需设置为新值:
break;
case 'event':
message.event = value;
break;
case 'id':
onId(message.id = value);
break;
case 'retry':
const retry = parseInt(value, 10);
if (!isNaN(retry)) { // 根据规范,忽略非整数
onRetry(message.retry = retry);
}
break;
}
}
}
}

这个方法的逻辑是:

  1. 创建一个message对象,用于存储每一行的数据

  2. 创建一个decoder对象,用于解码line中的数据

  3. 返回一个onLine方法,用于处理每一行的数据

  4. onLine方法中,会根据field的值,将数据存储到message对象中

  5. 当遇到\r\n时,会调用onMessage方法,将message对象传递给onMessage方法

1
2
3
4
5
6
data: 第一行数据
data: 第二行数据

此时会走 message.data + '\n' + value

最终message.data = '第一行数据\n第二行数据'

总结

本文详细介绍了在AI大模型开发中处理数据流的重要工具 @microsoft/fetch-event-source。主要包含以下几个要点:

  1. SSE(Server-Sent Events)是一种服务器向浏览器发送实时数据的单向通信机制。

  2. 传统的EventSource对象虽然能处理SSE,但仅支持GET请求且不支持自定义头部,限制了其实际应用场景。

  3. @microsoft/fetch-event-source提供了更灵活的解决方案:

    • 支持POST请求和自定义头部
    • 允许建立持久连接
    • 支持双向数据流通信
  4. 在实际使用中需要注意几个关键点:

    • 后端返回的数据流格式需要严格遵循<field>:<value>的格式
    • 处理多行文本时需要特别注意换行符的处理
    • 建议使用封装好的通用处理方法来简化开发
  5. 源码层面,该库通过getBytesgetLinesgetMessages三层处理机制,实现了对SSE数据流的完整解析和处理。

这个工具为AI大模型应用开发提供了可靠的数据流处理方案,特别适合类似ChatGPT这样需要流式响应的应用场景。