【智能问数·6】用户断网了,对话还在——流式续流怎么做到断点不丢

发布时间:2026/6/30 7:19:41
【智能问数·6】用户断网了,对话还在——流式续流怎么做到断点不丢 这是【问数系统拆解】系列第6篇。上篇讲数据权限隔离这篇讲一个更基础的问题——流式续流。一个让人抓狂的场景上线第三天运营同学反馈“你们的问数系统网络抖一下就得重新问。”我看了一眼日志。用户问查下上海网点昨天的签收率系统开始流式返回结果SSE 连接正常。返回到一半用户网络抖动连接断了。用户刷新页面重新连接。系统从头开始回答。前面查了一半的数据没了。用户又等了一遍。这不是 bug是架构问题。传统 SSE 流式响应连接断了流就断了。服务端不知道客户端断了继续往黑洞里推数据。客户端重连后只能重新发起请求。断网 对话中断这个体验太差了。第一反应前端缓存最直接的想法前端把收到的事件缓存起来断线后从缓存恢复。但很快发现问题前端缓存有限— 用户可能问了一个复杂问题返回 100 个事件前端只缓存了前 50 个状态不一致— 服务端已经处理到第 80 个事件前端缓存只到第 50 个中间 30 个丢了后台任务— 有些任务是后台异步执行的前端根本不知道任务还在跑前端缓存解决不了问题必须在服务端做。核心设计Active Stream Registry重新设计后服务端维护一个活跃流注册表┌─────────────────────────────────────────────────────────────────┐ │ Active Stream Registry │ ├─────────────────────────────────────────────────────────────────┤ │ response_id_1 → _ActiveStreamEntry │ │ ├── events: [(seq_0, event_0), (seq_1, event_1), ...] │ │ └── subscribers: [client_A_queue, client_B_queue] │ │ │ │ response_id_2 → _ActiveStreamEntry │ │ ├── events: [(seq_0, event_0), ...] │ │ └── subscribers: [client_C_queue] │ └─────────────────────────────────────────────────────────────────┘关键点服务端持有事件历史— 不依赖前端缓存后台 Worker 独立运行— 不依赖客户端连接多个客户端可以订阅同一个流— 支持断线重连流程创建 → 订阅 → 断线 → 续流第一步创建流式响应POST /v1/responses { query: 查下上海网点昨天的签收率, session_id: sess_abc123 } → 创建 active stream entry → 启动后台 Worker → 返回 response_id: resp_xyz789 → SSE 推送事件给客户端第二步客户端订阅客户端通过 SSE 连接订阅事件流GET /v1/responses/resp_xyz789?streamtrue → 返回 SSE 事件流 → 每个事件带 sequence_number第三步断线用户网络抖动SSE 连接断了。关键客户端断开只触发 unsubscribe不影响后台 Worker。Worker 继续执行事件继续写入 replay buffer。第四步续流客户端重新连接从断点续流GET /v1/responses/resp_xyz789?streamtruestarting_after42 → 从 sequence_number 42 之后 replay → 继续订阅 live eventsReplay Buffer事件重放缓冲区class_ActiveStreamEntry:def__init__(self):self.events[]# [(sequence_number, raw_sse_line), ...]self.subscribers[]# [asyncio.Queue, ...]defadd_event(self,seq,event):self.events.append((seq,event))# 推送给所有订阅者forqueueinself.subscribers:queue.put_nowait((seq,event))defsubscribe(self,starting_afterNone):queueasyncio.Queue()self.subscribers.append(queue)ifstarting_afterisNone:# 没有 starting_afterreplay 全量forseq,eventinself.events:queue.put_nowait((seq,event))else:# 从指定位置之后 replayforseq,eventinself.events:ifseqstarting_after:queue.put_nowait((seq,event))returnqueuereplay 行为场景行为没有starting_afterreplay 当前 buffer 全量starting_after在 buffer 范围内只 replay 更晚事件starting_after早于或晚于 buffer 范围replay 当前 buffer 全量后台 Worker不依赖客户端连接classStreamWorker:asyncdefrun(self,response_id,query):entryregistry.get(response_id)seq0# 执行查询resultawaitexecute_query(query)# 流式返回结果forchunkinresult.chunks():eventcreate_sse_event(chunk)entry.add_event(seq,event)seq1# 标记完成entry.add_event(seq,create_done_event())关键设计Worker 不持有客户端连接— 只往 entry 里写事件客户端断开不影响 Worker— 只是少了一个订阅者Worker 完成后 entry 保留— 新客户端还能 replay多 Pod 的限制Active Stream Registry 是进程内内存不是共享存储。Pod A: 创建 response保存 active buffer Pod B: 收到 GET 请求内存里没有这个 buffer Pod B: 返回 no_active_stream (409)当前解决方案网关按X-Session-Id做 sticky routing。# 网关配置upstream:sticky_route:header:X-Session-Id hash_method:md5长期方案Redis Stream / PubSub 共享 stream registry任意实例都能 replay 和订阅 live events。断线续流的完整流程┌─────────────────────────────────────────────────────────────────┐ │ 1. POST /v1/responses │ │ → 创建 Active Stream Entry │ │ → 启动后台 Worker │ │ → 返回 response_id │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 2. SSE 连接建立 │ │ → 客户端订阅 Entry │ │ → 接收实时事件 │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 3. 网络抖动连接断开 │ │ → 客户端 unsubscribe │ │ → 后台 Worker 继续执行 │ │ → 事件继续写入 replay buffer │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 4. 客户端重连 │ │ → GET /v1/responses/{id}?streamtruestarting_after42 │ │ → replay seq 42 的事件 │ │ → 继续订阅 live events │ └─────────────────────────────────────────────────────────────────┘实际效果没有流式续流用户断网 → 对话中断 → 重新提问 → 从头等待用户体验沮丧浪费时间有流式续流用户断网 → 自动重连 → 从断点继续 → 无缝衔接用户体验流畅几乎无感知写在最后流式续流的核心是三个组件Active Stream Registry— 服务端持有事件历史不依赖前端缓存Replay Buffer— 支持从任意断点重放后台 Worker— 独立于客户端连接断网不影响执行传统做法断网 对话中断。新做法断网 自动续流。这个设计不是前端缓存一下就能解决的必须在服务端做。前端缓存有上限服务端 replay buffer 没有。