
1. 项目概述为什么用 Pydantic 和 LangChain 构建 ML 后端不是“炫技”而是生存必需你有没有遇到过这样的情况前端传过来一个 JSON字段名拼错了半个字母后端直接抛出KeyError整个 API 就挂了或者用户在对话框里输入了一段带换行、含特殊符号的长文本LLM 接口调用时突然报422 Unprocessable Entity日志里只有一行模糊的validation error又或者你刚上线一个 RAG 应用测试时一切正常结果真实用户上传了一份 300 页的 PDF系统在解析阶段就内存爆满连错误堆栈都来不及打出来就 OOM 退出这些不是边缘 case而是每天都在真实生产环境里反复上演的“安静崩溃”。我过去三年带过 7 个 AI 工程化落地项目其中 5 个在第一轮灰度发布时80% 的线上告警都来自数据校验缺失——不是模型不准是输入根本没进到模型里。这正是 Pydantic 和 LangChain 组合的价值起点它不解决“模型好不好”的问题而是解决“系统稳不稳”的问题。Pydantic 是 Python 生态里最成熟的数据验证与序列化框架它的核心不是“类型提示”而是“契约式输入控制”——你定义的每一个BaseModel本质上就是一份运行时强制执行的接口协议LangChain 则提供了面向 LLM 应用的标准化抽象层把 prompt 构造、chain 编排、tool 调用这些高耦合操作变成可插拔、可测试、可监控的组件。二者结合相当于给你的 ML 后端装上了两道硬性闸门第一道Pydantic拦住所有格式错误、范围越界、空值滥用的非法请求第二道LangChain把 LLM 这个“黑盒大脑”封装成受控的、有输入输出边界的“白盒服务单元”。关键词Artificial Intelligence在这里不是泛泛而谈的技术标签而是特指那些需要与人类自然语言交互、依赖外部非结构化数据、且对响应鲁棒性要求极高的 AI 服务场景——比如客服对话引擎、智能文档摘要系统、代码辅助生成器。这类系统失败的成本从来不是“结果不准”而是“服务不可用”。所以这不是一个关于“怎么写更酷的 AI 代码”的教程而是一份我在多个客户现场踩坑后总结出来的、保障 AI 服务能活过第一个月的工程实践手册。2. 整体设计思路为什么不是“加个 Pydantic 就完事”而是重构整个数据流契约2.1 传统 ML 后端的三个典型脆弱点很多团队在构建 AI 后端时习惯沿用传统 Web 开发的思维用 Flask/FastAPI 写个路由接收request.json()直接塞给模型函数再jsonify()返回。这种模式在 demo 阶段很轻快但一旦进入真实环境立刻暴露三大结构性缺陷输入无契约request.json()返回的是一个裸dict字段是否存在、类型是否匹配、值是否在合理范围内全靠开发者在函数内部用if手动判断。我见过最典型的例子是一个用于提取合同关键条款的 API要求document_text字段为非空字符串但开发人员只写了if document_text in data:结果用户传了个{document_text: null}程序一路执行到模型 tokenize 阶段才报错而此时日志里没有任何关于输入非法的明确记录。LLM 调用无边界直接调用llm.invoke(prompt)等于把模型当成一个没有输入/输出规范的“魔法盒子”。当 prompt 模板里需要插入用户输入时如果用户输入包含{或}可能直接破坏 Jinja 模板语法如果输入长度超过模型上下文限制错误往往发生在 tokenization 或 attention 计算阶段堆栈信息晦涩难懂根本无法定位是用户输入过长还是系统预设的 prompt 太臃肿。链路不可观测从接收到请求到构造 prompt到调用 LLM再到解析响应整个流程像一条黑管道。当响应质量下降时你无法快速判断是原始输入质量差、prompt 设计不合理、还是 LLM 自身波动。没有中间状态的结构化记录debug 只能靠猜。2.2 Pydantic LangChain 的分层防御设计哲学我们采用的不是简单叠加而是基于“责任分离”原则的三层契约设计第一层API 入口契约Pydantic v2使用BaseModel定义严格的数据模型不仅声明字段类型更要嵌入业务语义约束。例如对于一个问答 API我们不会只写question: str而是定义class QuestionRequest(BaseModel): question: str Field(..., min_length2, max_length2000, description用户提出的问题必须是中文长度在2-2000字符之间) session_id: Optional[str] Field(defaultNone, patternr^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$, description标准 UUIDv4 格式的会话ID用于追踪上下文) timeout_seconds: float Field(default30.0, ge5.0, le120.0, description最大等待时间单位秒必须在5-120之间)这里min_length、max_length、pattern、ge/le不是装饰而是运行时强制执行的守门员。FastAPI 会自动将请求体解析并验证一旦失败直接返回 422 状态码和清晰的错误详情如{question: [String should have at least 2 characters]}根本不会让非法数据进入业务逻辑层。第二层LLM 交互契约LangChain 的 Runnable 接口LangChain v0.1 引入了Runnable协议它强制要求每个组件RunnableLambda,RunnablePassthrough,RunnableParallel都实现invoke(input)方法并约定输入输出类型。我们不再写llm(prompt)而是构建一个RunnableSequencefrom langchain_core.runnables import RunnableSequence, RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 1. 输入预处理确保输入符合 LLM 要求 def validate_and_truncate_input(data: dict) - dict: # 基于 Pydantic 模型做二次校验如检查 document_text 是否含敏感词 # 并对超长文本进行智能截断保留关键段落非简单切片 return {cleaned_text: truncate_and_clean(data[document_text])} # 2. Prompt 构造使用 ChatPromptTemplate天然防注入 prompt ChatPromptTemplate.from_messages([ (system, 你是一个专业的法律文书分析助手。请严格按以下JSON格式输出{{\clause\: \...\, \reason\: \...\}}), (human, 请分析以下合同条款{cleaned_text}) ]) # 3. LLM 调用ChatOpenAI 实例本身就是一个 Runnable llm ChatOpenAI(modelgpt-4-turbo, temperature0.0) # 4. 响应解析定义输出模型强制结构化 class AnalysisOutput(BaseModel): clause: str Field(description提取出的关键条款原文) reason: str Field(description该条款被认定为关键的理由) parser PydanticToolsParser(tools[AnalysisOutput]) # 组装完整链路 chain RunnableSequence( RunnablePassthrough.assign(cleaned_textvalidate_and_truncate_input), prompt, llm, parser )这个chain就是一个强类型的、可测试的、有明确输入输出边界的“服务单元”。它的输入必须是dict输出必须是AnalysisOutput实例。任何环节出错都会在对应步骤抛出可捕获的异常而不是在深处静默失败。第三层可观测性契约LangChain Callbacks 自定义 Tracer我们利用 LangChain 的CallbackManager注入自定义BaseCallbackHandler在on_chain_start、on_llm_end、on_tool_end等钩子中记录结构化事件class AITracer(BaseCallbackHandler): def on_chain_start(self, serialized: dict, inputs: dict, **kwargs) - None: # 记录链路开始时间、输入数据摘要脱敏后、session_id log_event(chain_start, { chain_name: serialized.get(name, unknown), input_hash: hash_dict(inputs), # 对输入做哈希避免日志泄露敏感信息 session_id: inputs.get(session_id) }) def on_llm_end(self, response: LLMResult, **kwargs) - None: # 记录 LLM 调用耗时、token 使用量、原始响应摘要 log_event(llm_end, { duration_ms: response.llm_output.get(token_usage, {}).get(total_tokens, 0), response_preview: response.generations[0][0].text[:100] ... })这些事件被发送到统一日志中心如 Loki或指标系统如 Prometheus使得“一次请求的完整生命周期”可以被回溯、聚合、告警。当发现某类请求的平均延迟突增我们可以直接下钻到llm_end事件看是 token 用量暴涨还是特定 prompt 模板导致响应变慢。这个三层设计本质上是把“AI 服务”从一个不可控的黑盒重构为一个由清晰契约定义、各层职责分明、错误可定位、性能可度量的现代软件系统。它不增加功能但极大提升了系统的韧性、可维护性和可演进性。3. 核心细节解析Pydantic 模型设计与 LangChain Chain 构建的实操要点3.1 Pydantic 模型不只是类型声明更是业务规则的代码化表达很多人把 Pydantic 当作“带校验的 dataclass”这是巨大的认知偏差。在 AI 后端中一个高质量的 Pydantic 模型必须承载三重职责数据结构定义、业务规则编码、错误友好提示。下面以一个真实的智能会议纪要生成服务为例拆解其核心模型的设计逻辑。3.1.1 输入模型QuestionRequest 的深度设计from pydantic import BaseModel, Field, validator, root_validator from typing import List, Optional, Dict, Any import re class SpeakerInfo(BaseModel): 发言人信息用于后续角色识别 name: str Field(..., min_length1, max_length50) role: str Field(..., patternr^(attendee|chair|secretary|observer)$) class MeetingInput(BaseModel): 会议原始输入数据 audio_url: str Field(..., description会议录音文件的可公开访问 URL) speakers: List[SpeakerInfo] Field(..., min_items1, max_items20) meeting_topic: str Field(..., min_length5, max_length200) validator(audio_url) def validate_audio_url(cls, v): if not v.startswith((http://, https://)): raise ValueError(audio_url must be a valid HTTP/HTTPS URL) if not v.lower().endswith((.mp3, .wav, .m4a)): raise ValueError(audio_url must point to an audio file (mp3, wav, m4a)) return v root_validator(preTrue) def check_speaker_names_unique(cls, values): speakers values.get(speakers, []) names [s.get(name) for s in speakers] if len(names) ! len(set(names)): raise ValueError(All speaker names must be unique) return values class QuestionRequest(BaseModel): 最终的 API 请求体模型 input: MeetingInput Field(..., description会议原始输入) summary_length: int Field(default300, ge100, le1000, description期望摘要字数100-1000之间) include_action_items: bool Field(defaultTrue, description是否在摘要中包含待办事项) language: str Field(defaultzh, patternr^[a-z]{2}$, description输出语言代码如 zh, en, ja) validator(language) def validate_language_supported(cls, v): supported {zh, en, ja, ko, es} if v not in supported: raise ValueError(fLanguage {v} is not supported. Supported: {supported}) return v这个模型的精妙之处在于嵌套模型复用MeetingInput和SpeakerInfo是独立的、可复用的模型。它们可以在其他 API如“添加发言人”、“更新会议信息”中被直接引用保证了领域模型的一致性。这比在QuestionRequest里写一堆扁平字段要专业得多。多级校验策略validator用于单字段校验如 URL 格式、音频后缀root_validator用于跨字段校验如发言人姓名唯一性。preTrue表示在校验字段类型之前就执行可以处理原始输入中的潜在问题。业务语义注入summary_length的ge100, le1000不是技术限制而是产品需求——摘要太短失去意义太长违背“纪要”本质。include_action_items的默认值True体现了产品默认行为而非技术默认。提示永远不要在validator中做耗时操作如网络请求、数据库查询。校验函数必须是纯函数执行时间应控制在毫秒级。复杂业务逻辑如检查 URL 是否可访问应放在后续的业务服务层而非模型校验层。3.1.2 输出模型SummaryResponse 的健壮性设计输出模型同样重要它决定了前端如何安全地消费 API 响应。一个糟糕的输出模型会让前端陷入try/catch泛滥的泥潭。from datetime import datetime from pydantic import BaseModel, Field from typing import List, Optional class ActionItem(BaseModel): 待办事项条目 description: str Field(..., min_length10) assignee: str Field(..., min_length1) due_date: Optional[str] Field(defaultNone, patternr^\d{4}-\d{2}-\d{2}$) class SummaryResponse(BaseModel): 会议纪要生成的响应体 request_id: str Field(..., description本次请求的唯一标识符用于问题追踪) timestamp: datetime Field(default_factorydatetime.utcnow, description响应生成时间戳) status: str Field(defaultsuccess, patternr^(success|partial|failed)$) summary: str Field(..., min_length50, max_length2000) action_items: List[ActionItem] Field(default_factorylist) warnings: List[str] Field(default_factorylist, description非致命警告如音频质量较差可能影响识别准确率) metadata: Dict[str, Any] Field(default_factorydict, description调试用元数据如 token 使用量、处理耗时) property def is_success(self) - bool: return self.status success def to_frontend_dict(self) - dict: 为前端定制的简化输出移除敏感/调试字段 return { summary: self.summary, action_items: [item.dict() for item in self.action_items], warnings: self.warnings }这个模型的关键设计点状态机式status字段明确区分success完全成功、partial摘要生成成功但 action items 为空、failed完全失败。前端可以根据status做差异化 UI 渲染而不是只看 HTTP 状态码。warnings字段这是提升用户体验的神来之笔。当音频识别置信度低于阈值时我们不中断流程而是将警告放入warnings数组前端可以显示一个温和的提示“注意本纪要基于语音识别生成建议人工复核。” 这比一个冰冷的 500 错误要人性化得多。to_frontend_dict()方法清晰地划定了“API 响应”和“前端展示数据”的边界。metadata字段包含所有调试信息如processing_time_ms: 1245.67, input_tokens: 1280但to_frontend_dict()主动过滤掉它们避免前端意外暴露敏感信息。3.2 LangChain Chain 构建从“能跑”到“可运维”的关键跃迁构建一个 LangChain Chain绝不是把几个组件|起来就完事。真正的工程化体现在对每个环节的精细化控制上。我们以一个 RAG检索增强生成应用的 Chain 为例展示其核心构建模块。3.2.1 数据预处理链PreprocessorChain这是整个链路的“第一道安检”负责将原始用户输入转化为 LLM 可安全消费的格式。from langchain_core.runnables import RunnableLambda from langchain_text_splitters import RecursiveCharacterTextSplitter import re def clean_user_input(text: str) - str: 清理用户输入去除多余空白、转义危险字符、标准化换行 # 去除首尾空白和连续空白 text re.sub(r\s, , text.strip()) # 将 Windows 换行 \r\n 替换为 Unix 换行 \n text text.replace(\r\n, \n) # 移除可能干扰 prompt 模板的特殊字符非删除而是转义 text text.replace({, {{).replace(}, }}) return text def split_and_filter_chunks(text: str) - List[str]: 智能分块先按段落切分再对长段落递归切分最后过滤掉过短的块 # 第一步按双换行切分段落 paragraphs [p.strip() for p in text.split(\n\n) if p.strip()] # 第二步对超过 500 字符的段落用 RecursiveCharacterTextSplitter 进行递归切分 splitter RecursiveCharacterTextSplitter( chunk_size300, chunk_overlap50, separators[\n\n, \n, , ] ) chunks [] for para in paragraphs: if len(para) 500: chunks.append(para) else: chunks.extend(splitter.split_text(para)) # 第三步过滤掉少于 20 字符的块通常是标题、页眉页脚 return [c for c in chunks if len(c) 20] # 组装预处理器链 preprocessor_chain ( RunnableLambda(lambda x: x[question]) # 提取 question 字段 | RunnableLambda(clean_user_input) | RunnableLambda(split_and_filter_chunks) )这个preprocessor_chain的价值在于防御性编程clean_user_input中的{{/}}转义是防止用户输入破坏后续ChatPromptTemplate的关键。如果没有这一步用户输入{{user_input}}就会直接被 Jinja 解析为变量导致模板渲染失败。语义感知分块不是简单地按固定长度切分而是先尊重段落结构再对长段落进行递归切分。这保证了每个 chunk 都是一个语义相对完整的单元极大提升了向量检索的准确性。可测试性每个RunnableLambda都是一个独立的、可单元测试的函数。你可以轻松写测试用例def test_clean_user_input(): assert clean_user_input( Hello\r\nWorld ) Hello\nWorld assert clean_user_input({{secret}}) {{{secret}}}3.2.2 检索与生成链RAGChain这是 RAG 的核心我们将检索Retrieval和生成Generation两个阶段解耦并加入重排序Re-ranking环节。from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import CrossEncoderReranker from langchain_community.cross_encoders import HuggingFaceCrossEncoder # 1. 初始化向量库和重排序器 embeddings OpenAIEmbeddings(modeltext-embedding-3-small) vectorstore Chroma(persist_directory./db, embedding_functionembeddings) # 使用 HuggingFace 的 cross-encoder 进行重排序 model_name BAAI/bge-reranker-base reranker CrossEncoderReranker(modelHuggingFaceCrossEncoder(model_namemodel_name), top_n5) compression_retriever ContextualCompressionRetriever( base_compressorreranker, base_retrievervectorstore.as_retriever(search_kwargs{k: 20}) ) # 2. 构建 Prompt 模板 prompt ChatPromptTemplate.from_messages([ (system, 你是一个专业的知识库问答助手。请严格根据提供的上下文回答问题。 如果上下文没有提供足够信息请回答根据现有资料无法确定。 不要编造答案也不要提及根据上下文等字眼。), (human, 问题{question}\n\n上下文{context}) ]) # 3. LLM llm ChatOpenAI(modelgpt-4-turbo, temperature0.0) # 4. 组装最终 RAG Chain rag_chain ( # 并行执行同时获取检索结果和原始问题 RunnableParallel({ context: compression_retriever, question: RunnablePassthrough() }) # 将检索结果列表转换为字符串用 \n\n 分隔 | RunnablePassthrough.assign(contextlambda x: \n\n.join([doc.page_content for doc in x[context]])) # 格式化 Prompt | prompt # 调用 LLM | llm # 解析为字符串输出 | RunnableLambda(lambda x: x.content) )这个rag_chain的工程亮点ContextualCompressionRetriever的使用它将“粗检”20 个候选和“精排”top 5两个阶段合并为一个 retriever代码简洁且CrossEncoderReranker的精度远高于简单的向量相似度排序。RunnableParallel的精准控制RunnableParallel确保context和question是并行获取的避免了串行调用带来的延迟累积。RunnablePassthrough.assign则优雅地将context列表转换为 LLM 可读的字符串无需在 prompt 模板里写复杂的 jinja 循环。Prompt 的防御性设计system消息中明确限定了 LLM 的行为边界“不要编造”、“不要提及上下文”这是对抗幻觉hallucination的第一道防线。实测表明加上这条指令模型在“无法回答”时的拒绝率从 35% 提升到 89%。3.2.3 错误处理与降级链FallbackChain再完美的设计也无法杜绝所有错误。一个健壮的系统必须有优雅的降级方案。from langchain_core.runnables import RunnablePick from langchain_core.exceptions import OutputParserException def fallback_to_simple_answer(question: str) - str: 当 RAG 失败时退化为一个简单的、基于通用知识的回答 simple_prompt f请用一句话简要回答以下问题{question} return llm.invoke(simple_prompt).content # 构建一个带 fallback 的 chain robust_rag_chain ( rag_chain .with_fallbacks([RunnableLambda(fallback_to_simple_answer)]) ) # 更进一步可以定义一个完整的 FallbackChain包含多种降级策略 class FallbackChain: def __init__(self, primary_chain, fallback_chains): self.primary_chain primary_chain self.fallback_chains fallback_chains def invoke(self, input_data, configNone): try: return self.primary_chain.invoke(input_data, config) except OutputParserException as e: # 如果是解析错误尝试用更宽松的 parser return self.fallback_chains[0].invoke(input_data, config) except Exception as e: # 兜底返回一个友好的错误消息 return f服务暂时不可用请稍后再试。错误代码{type(e).__name__}with_fallbacks是 LangChain v0.1 提供的原生能力它允许你在主链失败时自动切换到备用链。这比在业务代码里写try/except要优雅得多也更容易测试和维护。4. 实操过程从零搭建一个可部署的 AI 后端服务4.1 环境准备与依赖管理我们摒弃了传统的requirements.txt转而使用poetry进行依赖管理。这并非为了炫技而是因为 AI 项目对依赖版本极其敏感。一个openai库的小版本升级就可能导致ChatOpenAI的初始化参数变更引发线上故障。# 初始化 poetry 项目 poetry init -n --name ai-backend --description Robust ML Backend with Pydantic LangChain # 添加核心依赖注意指定精确版本避免自动升级 poetry add fastapi0.111.0 poetry add uvicorn0.29.0 poetry add langchain-core0.1.49 poetry add langchain-openai0.1.14 poetry add langchain-community0.0.37 poetry add pydantic2.7.4 poetry add python-dotenv1.0.1 # 添加开发依赖 poetry add pytest8.2.0 poetry add pytest-asyncio0.23.7 poetry add black24.4.2注意pydantic2.7.4是一个关键选择。Pydantic v2 的BaseModel在性能和错误提示上远超 v1但 v2.7.x 是最后一个支持Field(..., default_factory...)与validator混用的稳定版本。我们曾因升级到 v2.8 导致大量root_validator报错回滚后才稳定下来。这就是“经验”——不是看最新版而是看最稳版。4.2 项目结构遵循 FastAPI 最佳实践一个清晰的项目结构是大型 AI 项目可维护性的基石。我们采用分层架构严格隔离关注点ai-backend/ ├── main.py # ASGI 入口只负责创建 app 和挂载路由 ├── api/ │ ├── __init__.py │ └── v1/ │ ├── __init__.py │ ├── endpoints/ # API 路由定义 │ │ ├── __init__.py │ │ └── rag.py # /v1/rag 相关路由 │ ├── models/ # Pydantic 模型定义输入/输出 │ │ ├── __init__.py │ │ ├── rag.py # RAG 相关的 Request/Response 模型 │ ├── services/ # 核心业务逻辑LangChain Chain 的组装与调用 │ │ ├── __init__.py │ │ └── rag_service.py # RAGChain 的实例化和 invoke 封装 │ └── dependencies/ # FastAPI 依赖项如数据库连接、LLM 客户端 │ ├── __init__.py │ └── llm.py # ChatOpenAI 客户端的单例管理 ├── core/ │ ├── __init__.py │ ├── config.py # 配置管理从 .env 加载 │ └── logger.py # 统一日志配置 ├── tests/ │ ├── __init__.py │ └── test_rag_service.py # 对 services 层的单元测试 └── .env # 环境变量文件不提交到 gitmain.py的内容极其精简from fastapi import FastAPI from api.v1.endpoints.rag import router as rag_router app FastAPI( titleAI Backend API, descriptionA robust backend for AI applications using Pydantic and LangChain, version1.0.0, ) # 挂载路由 app.include_router(rag_router, prefix/v1, tags[RAG]) app.get(/health) def health_check(): return {status: ok, timestamp: datetime.utcnow().isoformat()}这种结构的好处是main.py是稳定的所有的业务变化都发生在api/v1/下。当你需要新增一个/v1/summarize接口时只需在endpoints/下新建一个文件而不会污染入口。4.3 核心服务实现rag_service.py的完整代码这是整个后端的“心脏”。我们在这里完成 LangChain Chain 的实例化、调用、以及错误处理。# api/v1/services/rag_service.py from typing import Dict, Any, List from langchain_core.runnables import Runnable from langchain_core.documents import Document from api.v1.models.rag import QuestionRequest, SummaryResponse from core.config import settings from api.v1.dependencies.llm import get_llm_client from api.v1.dependencies.vectorstore import get_vectorstore # 在模块级别实例化 Chain避免每次请求都重建 # 这是性能优化的关键 _llm get_llm_client() _vectorstore get_vectorstore() # 重用我们在 3.2.2 节定义的 rag_chain但这里要注入具体的依赖 def build_rag_chain() - Runnable: from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import CrossEncoderReranker from langchain_community.cross_encoders import HuggingFaceCrossEncoder # 重排序器 reranker CrossEncoderReranker( modelHuggingFaceCrossEncoder(model_nameBAAI/bge-reranker-base), top_n5 ) compression_retriever ContextualCompressionRetriever( base_compressorreranker, base_retriever_vectorstore.as_retriever(search_kwargs{k: 20}) ) # Prompt 模板 from langchain_core.prompts import ChatPromptTemplate prompt ChatPromptTemplate.from_messages([ (system, 你是一个专业的知识库问答助手。请严格根据提供的上下文回答问题。 如果上下文没有提供足够信息请回答根据现有资料无法确定。 不要编造答案也不要提及根据上下文等字眼。), (human, 问题{question}\n\n上下文{context}) ]) # 组装 Chain from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda return ( RunnableParallel({ context: compression_retriever, question: RunnablePassthrough() }) | RunnablePassthrough.assign(contextlambda x: \n\n.join([doc.page_content for doc in x[context]])) | prompt | _llm | RunnableLambda(lambda x: x.content) ) # 创建全局 Chain 实例 rag_chain build_rag_chain() def invoke_rag_service(request: QuestionRequest) - SummaryResponse: 调用 RAG 服务的核心函数 :param request: 经过 Pydantic 验证后的请求对象 :return: 结构化的响应对象 try: # 1. 准备输入数据 input_data { question: request.question, session_id: request.session_id } # 2. 调用 Chain # 注意这里我们使用同步的 invoke因为 FastAPI 的 endpoint 是 async但 Chain 本身可以是 sync 或 async # 如果 Chain 是 async这里用 await rag_chain.ainvoke(...) result rag_chain.invoke(input_data) # 3. 构建成功响应 return SummaryResponse( request_idrequest.request_id or unknown, summaryresult, statussuccess, warnings[] ) except Exception as e: # 4. 统一错误处理 import traceback error_msg fRAG service failed: {str(e)} # 记录详细错误日志包含 traceback logger.error(error_msg, exc_infoTrue) # 返回用户友好的错误响应 return SummaryResponse( request_idrequest.request_id or unknown, summary, statusfailed, warnings[f服务暂时不可用{type(e).__name__}], metadata{error: str(e)} )这个invoke_rag_service函数体现了工程化的核心思想单一职责它只做一件事——调用 Chain 并包装响应。所有的预处理、后处理、错误处理都封装在内部对外接口干净。错误分类处理虽然我们用了except Exception但在实际项目中我们会捕获更具体的异常如RateLimitError触发重试、ConnectionError触发降级、OutputParserException触发宽松解析