
def check_run(self, product: ProductEntity, is_run: bool) - tuple[float, TakeFromStoreExecution | None] | None: # 1.获取仓位列表 mat_id product.data.mat_id pallets self.store_ctr.get_pallets_by_mat(mat_id) if len(pallets) 0: return None # 2.获取最优的仓位号以及AGV best_distance sys.maxsize best_store None best_agv None best_store_pallet_id None best_agv_pallet_id None for store_pallet_id in pallets: store, _ self.store_ctr.get_store_by_pallet(store_pallet_id) if store is None: continue pos_id store.comp.get_real_pos(store_pallet_id) # --- 终极解包方案开始 --- agv_raw self.agv_ctr.get_optimal_agv(pos_id, mat_id) agv agv_raw # 如果是元组自动遍历寻找包含 comp 属性的对象 if isinstance(agv_raw, tuple): for item in agv_raw: if hasattr(item, comp): agv item break # 如果有更深一层的 (key, 对象) 嵌套 elif isinstance(item, tuple) and len(item) 1 and hasattr(item[1], comp): agv item[1] break # 加上 getattr 双保险彻底杜绝 AttributeError if agv is None or getattr(agv, comp, None) is None or agv.comp.is_idle() is False: continue # --- 终极解包方案结束 --- agv_pallet_id agv.comp.get_null_pallet(mat_id) if agv_pallet_id is None: continue distance self.path_ctr.distance(agv.comp.curr_pos(), pos_id) if distance best_distance: best_distance distance best_store store best_agv agv best_store_pallet_id store_pallet_id best_agv_pallet_id agv_pallet_id if best_store is None or best_agv is None: return None### def _build_products(self, raw_products) - List[ProductEntity]: products: List[ProductEntity] [] if isinstance(raw_products, dict): raw_products raw_products.values() for item in raw_products: if isinstance(item, ProductEntity): products.append(item) elif isinstance(item, tuple): if len(item) 2 and isinstance(item[1], ProductEntity): products.append(item[1]) else: try: products.append(ProductEntity(*item)) except TypeError as e: print(f构建 ProductEntity 失败: {item!r}, 错误: {e}) else: print(f未知产品数据类型: {type(item)}) return products ###### def check_run(self, product: ProductEntity, is_run: bool) - tuple[float, AGVIdleCNCExecution | None] | None: #1.一台AGV不需要特殊处理 agvs self.agv_ctr.agvs if len(agvs) 1: return None #2.多台AGV处理逻辑 for agv in agvs: if agv.comp.is_idle() is False: continue curr_pos agv.comp.curr_pos() waiting_pos agv.comp.waiting_pos() #2.1当前位置在等待位置 if curr_pos waiting_pos: continue need_move False #2.2当前位置在料仓位置去等待位置 if self.store_ctr.is_in_place(curr_pos): need_move True #2.3当前位置在机床位置且机床上没有物料去等待位置 if not need_move: cnc self.cnc_ctr.get_cnc_by_pos(curr_pos) if cnc is not None and cnc.comp.is_idle(): need_move True if need_move: distance self.path_ctr.distance(curr_pos, waiting_pos) if is_run is False: return (distance, None) exe self.new_task(AGVIdleCNCExecution) if exe.execute(product, agv, waiting_pos): return (distance, exe) else: del exe return None ###def check_run(self, product: ProductEntity, is_run: bool) - tuple[float, PutToCNCExecution | None] | None: # 1.获取产品所在AGV 去掉多余的解包符号直接接收对象 agv self.agv_ctr.get_agv_by_product(product.id) # 加上双保险判断确保拿到的是有效的 agv 对象 if agv is None or getattr(agv, comp, None) is None or agv.comp.is_idle() is False: return None # 2.根据产品工艺ID获取可加工机床列表 process_code product.comp.get_process_code() cncs self.cnc_ctr.get_cncs_by_process_code(process_code) if len(cncs) 0: return None # 3.获取最优的机床 cnc cncs[0] distance self.path_ctr.distance(agv.comp.curr_pos(), cnc.data.pos_id)### def check_run(self, product: ProductEntity, is_run: bool) - tuple[float, PutToStoreExecution | None] | None: #1.获取产品所在AGV agv self.agv_ctr.get_agv_by_product(product.id) # 加上我们引以为傲的双保险防止空壳对象导致后续报错 if agv is None or getattr(agv, comp, None) is None or agv.comp.is_idle() is False: return None #2.获取放料仓位信息 store self.store_ctr.store if store is None: return None store_pallet_id product.data.out_pallet_id #3.计算AGV到目标位置的距离 goal_pos_id store.comp.get_real_pos(store_pallet_id) distance self.path_ctr.distance(agv.comp.curr_pos(), goal_pos_id) ###### def check_run(self, product: ProductEntity, is_run: bool) - tuple[float, TakeFromCNCExecution | None] | None: #1.获取产品所在机床 cnc self.cnc_ctr.get_cnc_by_product(product.id) if cnc is None: return None #2.获取产品所在AGV agv self.agv_ctr.get_agv_by_product(product.id) if agv is None or agv.comp.is_idle() is False: return None #3.计算AGV到机床的距离 distance self.path_ctr.distance(agv.comp.curr_pos(), cnc.data.pos_id) ###