
AI 系统性能优化从请求到输出的实战指南一、性能瓶颈往往不在模型本身AI 推理服务涉及多个环节网关、负载均衡、请求调度、Tokenize、模型推理、后处理、响应序列化等。任何一个环节卡住整体性能都会受影响。之前排查过一个案例模型推理 P50 只有 200ms但用户感知的端到端延迟却高达 1.8s。后来发现请求序列化占了 400msTokenize 占了 300ms后处理模板渲染占了 500ms。模型本身只占总延迟的 11%。二、请求链路分析与关键指标完整的请求链路如下graph LR A[客户端请求] -- B[API Gatewaybr/限流/认证] B -- C[负载均衡br/请求路由] C -- D[请求调度器br/Batch组装] D -- E[预处理br/Tokenize/Padding] E -- F[模型推理br/GPU计算] F -- G[后处理br/Detokenize/模板] G -- H[响应序列化br/JSON/SSE] H -- I[客户端接收] style F fill:#ff6b6b,stroke:#333,color:#fff style D fill:#ffd93d,stroke:#333 style G fill:#ffd93d,stroke:#333红色节点是通常认为的瓶颈黄色节点是常被忽视但可能成为瓶颈的环节。优化前需要先建立全链路的可观测性。关键指标指标含义采集位置TTFT首 Token 延迟调度器 → 首 Token 生成TPOT每 Token 延迟相邻 Token 间隔E2E Latency端到端延迟客户端发出 → 接收完成Throughput系统吞吐量单位时间完成请求数GPU UtilizationGPU 利用率SM 活跃率三、优化实践3.1 性能数据采集import time from dataclasses import dataclass dataclass class RequestMetrics: request_id: str gateway_ts: float 0.0 schedule_ts: float 0.0 first_token_ts: float 0.0 last_token_ts: float 0.0 output_tokens: int 0 property def ttft_ms(self) - float: if self.first_token_ts 0 or self.schedule_ts 0: return -1.0 return (self.first_token_ts - self.schedule_ts) * 1000 property def e2e_ms(self) - float: if self.last_token_ts 0 or self.gateway_ts 0: return -1.0 return (self.last_token_ts - self.gateway_ts) * 10003.2 自适应批调度class AdaptiveBatchScheduler: def __init__(self, min_batch1, max_batch64, target_gpu_util0.85): self.min_batch min_batch self.max_batch max_batch self.target_gpu_util target_gpu_util self.current_batch min_batch def compute_batch_size(self, queue_depth, gpu_util, kv_cache_usage): if kv_cache_usage 0.9: self.current_batch max(self.min_batch, int(self.current_batch * 0.7)) return self.current_batch if gpu_util self.target_gpu_util - 0.1: scale min(1.2, self.target_gpu_util / max(gpu_util, 0.1)) self.current_batch min( int(self.current_batch * scale), self.max_batch, queue_depth ) elif gpu_util self.target_gpu_util 0.1: self.current_batch max(self.min_batch, int(self.current_batch * 0.9)) return self.current_batch3.3 流式响应处理class StreamingResponseHandler: def __init__(self, metrics): self.metrics metrics async def stream_tokens(self, token_generator): token_count 0 self.metrics.first_token_ts time.time() async for token_id in token_generator: token_count 1 yield fdata: {{\token\: {token_id}, \count\: {token_count}}}\n\n self.metrics.last_token_ts time.time() self.metrics.output_tokens token_count yield data: [DONE]\n\n3.4 Tokenize 并行化from concurrent.futures import ThreadPoolExecutor class ParallelTokenizer: def __init__(self, tokenizer, num_workers4): self.tokenizer tokenizer self.executor ThreadPoolExecutor(max_workersnum_workers) def encode_batch(self, texts, max_length4096): futures [self.executor.submit(self._encode_single, text, max_length) for text in texts] results [] for future in futures: try: results.append(future.result(timeout5.0)) except: results.append(None) return results def _encode_single(self, text, max_length): return self.tokenizer(text, max_lengthmax_length, truncationTrue, return_tensorspt)四、优化权衡批大小调优增大可提升吞吐但 TTFT 会上升。在线场景 TTFT 超过 2s 用户就会流失。建议批大小 min(队列深度, 显存允许最大批, 延迟约束最大批)。流式输出降低感知延迟但增加网络开销。1000 个 Token 就是 1000 次 HTTP 分片。弱网环境下可攒 2-3 个 Token 再推送。预处理并行化Python 的 GIL 限制多线程收益。如果 Tokenize 确实是瓶颈考虑用 C 扩展或多进程方案。优化环节收益代价优先级全链路可观测性定位瓶颈5-10% CPU 开销P0自适应批调度20-40% 吞吐提升调度延迟增加P0流式输出感知延迟降 60%网络包数增加P1Tokenize 并行化预处理快 3-4x内存增加P2五、总结性能优化的核心先建可观测性再定位瓶颈最后针对性优化。不要凭直觉优化不要优化非瓶颈环节。优化是持续迭代的过程。上线后持续监控发现新瓶颈再优化。每一轮都要用数据说话——优化前后的 P50/P99 延迟、吞吐量、资源利用率缺一不可。