原理与踩坑经验)
本人在日常开发中遇到流式输出相关的问题一般都需要靠大模型协助定位问题归其根本是因为我对流式输出的原理认识不足。所以本篇文章记录我学习流式输出的原理以及在实际开发中遇到的问题。整体流程大模型生成 token↓打包成 chunk一个或多个 token↓SSE 协议封装data: {...}\n\n↓FastAPI StreamingResponse 转发↓前端 fetch ReadableStream有时候为了统一调用方式还需要把非openai 的调用封装成统一格式。流式输出的本质是看清大模型生成的数据如何被’切块’和’封装’像流水线一样在网络中流转最终在屏幕上实时解析、逐字显现形成我们看到的连贯对话。下面从底层协议开始逐个环节拆开讲。流式输出的原理流式输出Streaming Output指数据不是一次性全部返回而是像水流一样逐段、逐部分传输到接收端。大模型生成内容时通常按token-by-token方式逐步生成每生成一部分就可以立即发送给客户端而非等待整个内容生成完成。这种方式通常依赖持久化 HTTP 连接或Server-Sent Events (SSE) 协议服务端在生成每个文本块chunk后立即推送客户端可实时处理和展示适用于长文本生成、实时交互场景。token 与 chunk 的区别token 是模型生成的最小单位chunk 是网络传输的最小单位。模型逐个 token 生成但服务端会把若干 token 攒成一个 chunk 再发送减少网络请求次数。模型逐个生成 token[你] [好] [世] [界] [] [今] [天]服务端攒成 chunk 发送每次发送的 token 数量不固定┌─────────┐ ┌──────┐ ┌──────────────┐│ 你 好 世 │ → │ 界 │ → │ 今 天 │ → ...└─────────┘ └──────┘ └──────────────┘chunk 1 chunk 2 chunk 3(3个token) (2个token) (2个token)一个 chunk 可能包含 1 个 token也可能包含多个 token取决于服务端的缓冲策略。SSE 协议详解上面讲的 chunk 要在网络上传输需要一套协议来承载这就是 SSE。流式输出的底层协议一般是 SSEServer-Sent Events它是 W3C 标准基于 HTTP 长连接实现服务端推送。SSE 数据格式标准SSE 的消息由若干行组成每行以key: value的格式组织支持的字段字段含义示例data数据内容可多行拼接data: {text: 你好}event事件类型默认为messageevent: errorid事件 ID用于断线重连id: 12345retry重连间隔毫秒retry: 3000消息之间用空行\n\n分隔。标准 SSE 格式示例data: {choices:[{delta:{content:你好}}]}data: {choices:[{delta:{content:世}}]}data: [DONE]最后一行data: [DONE]是流结束标记但要注意某些非标准平台可能不发这个标记或者把它拼到前一条数据的末尾。核心 1大模型 API 的流式输出格式详解要理解整条链路首先要搞清楚大模型到底返回了什么。这部分以 OpenAI 格式为主线讲解原因有三GPT-3.5/4 是最早大规模商用的 LLM其 API 格式被开发者最先熟悉LangChain、LlamaIndex 等主流框架默认以 OpenAI 格式设计生态绑定深后来的云厂商阿里通义、火山引擎、DeepSeek、智谱为降低开发者迁移成本主动提供 OpenAI 兼容接口进一步巩固了其事实标准的地位。OpenAI 格式本身没有技术上不可替代的优势只是市场选择了它作为最大公约数。OpenAI 标准格式当设置streamTrue调用 OpenAI 兼容 API 时服务端返回的是 SSE 格式的流每个 chunk 被包在data: ...\n\n里data:后面是一个 JSON 对象data: {choices:[{delta:{content:你好},index:0,finish_reason:null}],id:chatcmpl-xxx,object:chat.completion.chunk}data: {choices:[{delta:{content:世},index:0,finish_reason:null}],id:chatcmpl-xxx,object:chat.completion.chunk}data: [DONE]其中每个 JSON 的结构如下{choices: [{delta: {content: 你好},index: 0,finish_reason: null}],id: chatcmpl-xxx,object: chat.completion.chunk}delta.content当前 chunk 的增量文本。第一个 chunk 的delta通常不带content而是带role: assistant后续 chunk 才有content。前端或后端拼接文本时如果直接取delta.content第一个 chunk 拿到空值是正常的不要因此跳过。finish_reason最后一个 chunk 为stop模型自然结束或lengthmax_tokens 截断其余为null。排查问题时注意区分这两种情况stop说明模型正常输出完length说明输出被截断了。非标准格式的常见差异点有些平台的 SSE 格式不严格遵循 OpenAI 规范常见的坑data:后面的空格差异标准data: {choices:...}有空格某些平台data:{choices:...}无空格解析时若按if line.startswith(data: )判断无空格的格式会被跳过[DONE]标记位置OpenAI单独一行data: [DONE]某些平台可能放在最后一条数据的末尾或根本不发chunk 内字段结构不同非 OpenAI 平台可能使用text而非delta.content或直接返回纯文本打印原始 SSE 数据进行对比import requestsresponse requests.post(url, jsonpayload, streamTrue)for line in response.iter_lines():if line:print(repr(line)) # 打印原始字节repr() 是 Python 内置函数返回对象的精确字符串表示会把不可见字符换行、空格等显示出来这是我调试流式问题时最常用的手段直接看原始数据比猜结构要快得多。核心 2LangChain 的流式输出原理LangChain 的ChatOpenAI内部调用 OpenAI API然后把原始 JSON chunk 转成AIMessageChunk对象每次 yield 对应 OpenAI 返回的一个 chunk。理解这层映射关系排查问题时才能知道 LangChain 返回的字段对应原始数据的哪个位置。AIMessageChunk 的结构在用 LangChain 处理流式输出时每个 chunk 是AIMessageChunk对象。它的核心字段chunk.content # 文本内容str大多数场景直接取这个chunk.tool_calls # 工具调用信息list模型调用 function calling 时用chunk.additional_kwargs # 额外元信息dict如部分平台的特殊字段一般场景下只用.content就够了。如果模型同时返回工具调用function calling就需要处理tool_calls。另外现在很多模型的输出会将思考内容与非思考内容分开思考内容存放在additional_kwargs中。OpenAI 原始 chunk 与 AIMessageChunk 的映射关系OpenAI 原始 chunkLangChainAIMessageChunk文本内容choices[0].delta.content.content工具调用choices[0].delta.tool_calls.tool_calls已解析为结构化对象额外字段JSON 顶层字段统一放入.additional_kwargs如思考内容结束标记finish_reason: stop流结束后单独处理chunk 本身无标记跨平台兼容只适用于 OpenAI 格式统一接口切换模型不改业务代码AIMessageChunk支持__add__累加可以用sum_chunk chunk把所有 chunk 累加得到完整的AIMessage。统一模型调用接口项目中同时依赖多个大模型平台时需要统一调用接口业务代码只写一套切换模型改配置就行。根据平台是否提供 OpenAI 兼容接口有两种方案方案一平台提供 OpenAI 兼容接口推荐直接替换base_url即可无需额外封装from langchain_openai import ChatOpenAImodel ChatOpenAI(modelqwen-plus,base_urlhttps://dashscope.aliyuncs.com/compatible-mode/v1,api_keyyour-key,streamingTrue)很多云厂商阿里云、火山引擎、DeepSeek都提供 OpenAI 兼容接口能直接这样用。方案二平台不提供兼容接口需要封装自定义 LangChain LLM 类本质是格式转换——把平台返回的自定义 chunk 格式转为 LangChain 的GenerationChunkfrom langchain_core.language_models.llms import BaseLLMfrom langchain_core.outputs import GenerationChunkimport httpxclass MyCustomLLM(BaseLLM):async def _astream(self, prompt, **kwargs):# 1. 调用平台的流式接口异步调用防阻塞async with httpx.AsyncClient() as client:async with client.stream(POST, api_url, json{prompt: prompt}) as resp:async for line in resp.aiter_lines():if line:# 2. 解析平台自定义格式提取文本chunk_text self._parse_custom_format(line)# 3. 转为 LangChain 标准格式yield GenerationChunk(textchunk_text)async def _agenerate(self, prompt, **kwargs):# 非流式走这里可以聚合流式结果return self._astream(prompt, **kwargs)关键就是三步调平台接口 - 解析自定义格式 - yield 标准GenerationChunk。解析流式输出的 AIMessageChunk 数据LangChain 的流式调用非常简洁不需要手动解析 SSE 格式from langchain_openai import ChatOpenAIfrom langchain_core.messages import HumanMessagemodel ChatOpenAI(modelgpt-4o-mini, streamingTrue)messages [HumanMessage(content你好介绍一下自己)]# 同步流式for chunk in model.stream(messages):print(chunk.content, end, flushTrue)# 异步流式FastAPI 等异步框架中使用async for chunk in model.astream(messages):print(chunk.content, end, flushTrue)每个chunk是AIMessageChunk对象.content是本次增量文本。如果需要拿到完整响应可以累加full_response async for chunk in model.astream(messages):full_response chunk.contentprint(full_response)核心 3FastAPI 服务的流式输出原理FastAPI 是一个基于 Python 的 Web 框架底层依赖Starlette一个轻量级 ASGI 框架。简单理解FastAPI Starlette 参数校验 API 文档生成。StreamingResponse 实际上是 Starlette 提供的FastAPI 直接拿过来用。基本实现from fastapi.responses import StreamingResponseasync def generate():async for chunk in model.astream(messages):yield fdata: {chunk.json()}\n\n # 注意这里需要手动封装成SSE格式app.post(/chat)async def chat():return StreamingResponse(generate(), media_typetext/event-stream)StreamingResponse 的完整流转过程简单理解一下就行为了理解 StreamingResponse 底层发生了什么我们从头到尾看一条请求的完整路径客户端发起 POST /chat↓UvicornASGI 服务器 接收 TCP 连接解析 HTTP 请求