实战 LangGraph 循环执行:构建带自动重试的并行任务流

发布时间:2026/6/25 13:48:26
实战 LangGraph 循环执行:构建带自动重试的并行任务流 核心场景与需求拆解我们的目标场景很明确并行执行两个独立任务获取天气、获取时间两个任务都存在随机失败概率只要任意一个任务失败就自动触发循环重试无需手动重启流程支持设置最大重试次数避免无限循环保障流程稳定性全程通过状态管理数据重试时复用原始输入自动更新执行结果。这种模式广泛适用于第三方 API 重试、异步数据拉取、批量任务容错处理、AI 工具调用容错等实际业务场景通用性极强。二、LangGraph 循环执行的核心设计思路LangGraph 实现循环的核心逻辑完全围绕 **「状态管理 条件决策」** 展开没有复杂的语法核心分为三大模块1. 状态定义循环的「数据载体」首先定义专属的工作流状态把输入数据、任务结果、重试次数、最大重试数、执行状态全部封装起来。状态是 LangGraph 循环的核心 —— 所有节点的执行、重试的判断、结果的更新都基于这个状态流转每一次循环都会自动更新重试次数和任务结果为决策提供依据。2. 执行节点循环的「任务核心」这是循环的主体节点负责并行执行目标任务。我们通过线程池实现多任务并行执行提升效率执行完成后会自动判断两个任务是否全部成功并把结果、重试次数更新到状态中传递给下一个节点。这个节点是循环的「动作执行者」每一次重试都会重新执行这个节点。3. 决策节点循环的「开关控制器」这是实现循环的关键核心决策节点会读取最新的状态根据规则判断流程走向✅ 任务全部成功 → 结束流程❌ 任务失败 未达最大重试次数 →回到执行节点重新循环❌ 任务失败 达到最大重试次数 → 强制结束流程。通过这个决策节点LangGraph 轻松实现了闭环循环这也是它比传统工作流框架更灵活的地方。三、LangGraph 循环执行的核心优势对比传统代码写循环 重试的方式LangGraph 的循环执行有三大突出优势1. 逻辑解耦可读性拉满循环逻辑、任务执行逻辑、决策逻辑完全分离每个节点只做一件事。后续修改重试次数、调整任务逻辑、新增失败处理都不用改动核心循环代码维护成本极低。2. 状态自动流转无需手动管理不用自己定义变量记录重试次数、不用手动传递任务结果LangGraph 会自动管理状态的更新与传递彻底避免手动管理状态带来的 bug。3. 容错性强适配不稳定场景针对第三方接口、网络请求等易失败场景自动重试机制能大幅提升任务成功率同时限制最大重试次数兼顾效率与稳定性生产环境直接可用。4. 并行 循环完美结合支持在循环节点内执行并行任务既解决了任务容错问题又保证了执行效率复杂工作流也能轻松编排。四、落地价值与适用场景这套 LangGraph 循环重试模式是生产级工作流的必备能力适用场景非常广泛第三方 API 调用容错天气、地图、支付等接口AI Agent 工具调用自动重试LLM 接口、插件调用批量数据拉取 / 处理保障任务完整性异步任务执行失败自动恢复。它让工作流从「一次性执行」变成「智能容错执行」大幅提升系统的稳定性和健壮性。五、总结LangGraph 的循环执行能力通过状态驱动 条件决策的设计让复杂的循环重试工作流变得极简、优雅。我们不需要编写嵌套的循环代码只需要定义状态承载数据→编写执行节点处理任务→编写决策节点控制循环就能快速构建出支持并行、自动重试、容错性强的工作流。这也是 LangGraph 的核心魅力用最简洁的编排方式解决最复杂的工作流问题无论是 AI 应用还是常规业务流程都能轻松驾驭。代码流程图实现代码123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169fromlanggraph.graphimportStateGraph, ENDfromtypingimportTypedDict, Annotatedimportoperatorimporttimeimportrandomfromconcurrent.futuresimportThreadPoolExecutor, as_completed# 1. 定义带重试状态的 StateclassLoopState(TypedDict):input_data:str# 输入数据weather_data:str|None# 天气数据time_data:str|None# 时间数据retry_count:int# 重试次数max_retries:int# 最大重试次数is_success:bool# 是否执行成功用于判断循环退出# 2. 模拟不稳定的并行任务可能失败触发循环deffetch_weather(input_data:str)-str:模拟天气数据获取30%概率失败print(f\n[天气任务] 处理: {input_data})ifrandom.random() 0.3:# 30%失败概率print([天气任务] ❌ 数据获取失败)returntime.sleep(1)# 模拟耗时resultf{input_data} 天气25℃ 晴天print(f[天气任务] ✅ 成功: {result})returnresultdeffetch_time(input_data:str)-str:模拟时间数据获取20%概率失败print(f\n[时间任务] 处理: {input_data})ifrandom.random() 0.2:# 20%失败概率print([时间任务] ❌ 数据获取失败)returntime.sleep(1)# 模拟耗时resultf{input_data} 时间15:30 下午print(f[时间任务] ✅ 成功: {result})returnresult# 3. 并行执行节点循环的核心执行节点defparallel_execution(state: LoopState)- LoopState:并行执行天气/时间任务返回更新后的状态input_datastate[input_data]retry_countstate[retry_count]1print(f\n{*50})print(f[循环执行] 第 {retry_count} 次重试)print(*50)# 线程池并行执行with ThreadPoolExecutor(max_workers2) as executor:future_weatherexecutor.submit(fetch_weather, input_data)future_timeexecutor.submit(fetch_time, input_data)# 收集结果weather_datatime_dataforfutureinas_completed([future_weather, future_time]):iffuturefuture_weather:weather_datafuture.result()eliffuturefuture_time:time_datafuture.result()# 判断本次执行是否成功is_successbool(weather_dataandtime_data)# 返回更新后的状态return{input_data: input_data,weather_data: weather_data,time_data: time_data,retry_count: retry_count,max_retries: state[max_retries],is_success: is_success}# 4. 循环决策节点判断是否继续重试defloop_decision(state: LoopState)-str:决策节点返回下一个节点名称循环/结束- 成功 → END- 失败且未达最大重试 → parallel_execution继续循环- 失败且达最大重试 → ENDprint(f\n[决策节点] 重试次数: {state[retry_count]}/{state[max_retries]} | 执行成功: {state[is_success]})# 退出条件1执行成功ifstate[is_success]:print([决策节点] ✅ 所有任务成功结束循环)returnEND# 退出条件2达到最大重试次数elifstate[retry_count] state[max_retries]:print([决策节点] ❌ 达到最大重试次数强制结束)returnEND# 继续循环返回并行执行节点名称else:print([决策节点] 任务失败继续重试)returnparallel_execution# 5. 初始化节点设置初始状态definit_state(state: LoopState)- LoopState:初始化循环状态return{input_data: state[input_data],weather_data:None,time_data:None,retry_count:0,# 初始重试次数0max_retries: state[max_retries],# 最大重试次数外部传入is_success:False# 初始未成功}