抢占式算力分配:基于优先级队列与悲观锁的 K8s GPU 抢占调度器实现

发布时间:2026/6/27 2:36:05
抢占式算力分配:基于优先级队列与悲观锁的 K8s GPU 抢占调度器实现 抢占式算力分配基于优先级队列与悲观锁的 K8s GPU 抢占调度器实现一、GPU 算力抢占的背景深度学习集群里GPU 既稀缺又贵。为了充分利用资源通常会让高优先级的在线推理任务和低优先级的离线训练任务混跑。一旦低优先级任务占满了 GPU新来的高优先级任务若只能排队响应时间就得不到保证。所以调度系统得支持抢占。做 GPU 抢占调度主要得解决三个问题。首先是原子性高并发下必须确保一块 GPU 只被一个高优先级 Pod 抢占不能超卖。其次是优先级排序得能快速找出该驱逐哪个低优先级任务。最后是硬件亲和性GPU 任务常要求多卡连续抢占时得考虑拓扑别把资源打散了。二、基于优先级队列与悲观锁的调度方案这个调度器主要由三块组成。优先级队列负责存待调度的 Pod按优先级自动排序。调度引擎扫描节点没空闲 GPU 时就算出该驱逐谁。锁管理器用悲观锁防止多个协程抢同一块 GPU抢占前得先拿到独占锁。请求流程大致如下sequenceDiagram participant Client as 客户端 participant PQ as 优先级队列 (Priority Queue) participant Sched as 调度协程 (Scheduler Engine) participant LockMgr as 锁管理器 (Lock Manager) participant Cluster as K8s 集群节点 (Nodes/GPU) Client-PQ: 提交高优先级 Pod PQ--Sched: 提取最高优先级 Pod activate Sched Sched-Cluster: 扫描节点资源发现无空闲 GPU Sched-Cluster: 筛选潜在被抢占者 Sched-LockMgr: 尝试锁定目标 GPU 资源 alt 锁定成功 LockMgr--Sched: 返回成功 Sched-Cluster: 驱逐低优先级 Pod Sched-Cluster: 绑定高优先级 Pod 到 GPU Sched-LockMgr: 释放 GPU 锁 else 锁定失败 LockMgr--Sched: 返回冲突错误 Sched-PQ: 将高优先级 Pod 重新入队 end deactivate Sched加悲观锁是为了防冲突。不然高并发时两个高优先级 Pod 可能同时盯上同一个低优先级 Pod导致无谓的中断。三、Go 原生的抢占调度器原型下面用 Go 标准库写了个简易的 GPU 抢占调度器原型。代码里有 Pod 状态、线程安全的优先级队列、基于互斥锁的设备锁定还有并发抢占逻辑。package main import ( errors fmt math/rand sort sync time ) // Pod 调度单元 type Pod struct { ID string // Pod 唯一标识 Priority int // 优先级数值越大越高 GpuID int // 绑定的 GPU 设备 ID (-1 表示未绑定) } // GpuDevice GPU 设备 type GpuDevice struct { ID int ActivePod *Pod // 当前占用的 Podnil 表示空闲 } // PriorityQueue 线程安全的优先级队列 type PriorityQueue struct { mu sync.Mutex pods []*Pod } // Push 将 Pod 压入队列并排序 func (pq *PriorityQueue) Push(pod *Pod) { pq.mu.Lock() defer pq.mu.Unlock() pq.pods append(pq.pods, pod) sort.Slice(pq.pods, func(i, j int) { return pq.pods[i].Priority pq.pods[j].Priority }) } // Pop 弹出优先级最高的 Pod func (pq *PriorityQueue) Pop() (*Pod, error) { pq.mu.Lock() defer pq.mu.Unlock() if len(pq.pods) 0 { return nil, errors.New(queue is empty) } pod : pq.pods[0] pq.pods pq.pods[1:] return pod, nil } // LockManager 悲观锁管理器控制单卡层面的并发抢占 type LockManager struct { mu sync.Mutex locks map[int]bool } // TryLock 尝试锁定单块 GPU 设备非阻塞 func (lm *LockManager) TryLock(gpuID int) bool { lm.mu.Lock() defer lm.mu.Unlock() if lm.locks[gpuID] { return false } lm.locks[gpuID] true return true } // Unlock 释放 GPU 设备锁 func (lm *LockManager) Unlock(gpuID int) { lm.mu.Lock() defer lm.mu.Unlock() delete(lm.locks, gpuID) } // Scheduler 调度器 type Scheduler struct { devices []*GpuDevice pq *PriorityQueue lm *LockManager mu sync.Mutex // 保护设备列表读写的互斥锁 } // NewScheduler 初始化调度器 func NewScheduler(gpuCount int) *Scheduler { devices : make([]*GpuDevice, gpuCount) for i : 0; i gpuCount; i { devices[i] GpuDevice{ID: i} } return Scheduler{ devices: devices, pq: PriorityQueue{pods: []*Pod{}}, lm: LockManager{locks: make(map[int]bool)}, } } // ScheduleAttempt 调度尝试 func (s *Scheduler) ScheduleAttempt(pod *Pod) bool { s.mu.Lock() // 1. 优先寻找空闲的 GPU for _, dev : range s.devices { if dev.ActivePod nil { dev.ActivePod pod pod.GpuID dev.ID s.mu.Unlock() fmt.Printf([调度成功] Pod %s (优先级 %d) 分配至空闲 GPU %d\n, pod.ID, pod.Priority, dev.ID) return true } } s.mu.Unlock() // 2. 无空闲则触发抢占流程 fmt.Printf([资源受限] Pod %s (优先级 %d) 发起抢占...\n, pod.ID, pod.Priority) return s.preempt(pod) } // preempt 执行具体设备锁定与驱逐 func (s *Scheduler) preempt(preemptor *Pod) bool { s.mu.Lock() // 寻找优先级最低且低于当前 Pod 的已占用设备 var targetDev *GpuDevice minPriority : preemptor.Priority for _, dev : range s.devices { if dev.ActivePod ! nil dev.ActivePod.Priority minPriority { minPriority dev.ActivePod.Priority targetDev dev } } if targetDev nil { s.mu.Unlock() fmt.Printf([抢占失败] 未找到优先级低于 Pod %s (%d) 的可驱逐任务\n, preemptor.ID, preemptor.Priority) return false } victim : targetDev.ActivePod gpuID : targetDev.ID s.mu.Unlock() // 释放调度器大锁减小锁定粒度 // 3. 尝试获取该设备的细粒度悲观锁 if !s.lm.TryLock(gpuID) { fmt.Printf([锁冲突] Pod %s 抢占 GPU %d 失败该设备已被其他协程锁定\n, preemptor.ID, gpuID) return false } // 双重检查状态防止锁定间隙状态已被修改 s.mu.Lock() if targetDev.ActivePod ! victim { s.mu.Unlock() s.lm.Unlock(gpuID) fmt.Printf([状态失效] Pod %s 抢占 GPU %d 时状态发生变更放弃抢占\n, preemptor.ID, gpuID) return false } fmt.Printf([驱逐动作] Pod %s (优先级 %d) 正在驱逐 Pod %s (优先级 %d) 并占用 GPU %d\n, preemptor.ID, preemptor.Priority, victim.ID, victim.Priority, gpuID) s.mu.Unlock() // 模拟驱逐网络调用延迟 time.Sleep(time.Duration(10rand.Intn(20)) * time.Millisecond) s.mu.Lock() victim.GpuID -1 targetDev.ActivePod preemptor preemptor.GpuID gpuID s.mu.Unlock() fmt.Printf([抢占成功] Pod %s 绑定到 GPU %d原 Pod %s 已被移出\n, preemptor.ID, gpuID, victim.ID) s.lm.Unlock(gpuID) return true } func main() { rand.Seed(time.Now().UnixNano()) sched : NewScheduler(4) // 先占满 GPU 资源 fmt.Println(--- 1. 使用低优先级 Pod 占满所有 GPU 资源 ---) for i : 0; i 4; i { pod : Pod{ ID: fmt.Sprintf(low-pod-%d, i), Priority: 10 rand.Intn(10), GpuID: -1, } sched.ScheduleAttempt(pod) } // 并发提交高优先级任务触发抢占冲突 fmt.Println(\n--- 2. 并发提交高优先级 Pod 触发抢占机制 ---) var wg sync.WaitGroup highPods : []*Pod{ {ID: high-pod-A, Priority: 100, GpuID: -1}, {ID: high-pod-B, Priority: 200, GpuID: -1}, } for _, pod : range highPods { wg.Add(1) go func(p *Pod) { defer wg.Done() time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) if !sched.ScheduleAttempt(p) { fmt.Printf([重入队列] Pod %s 调度未成功重新压入队列\n, p.ID) sched.pq.Push(p) } }(pod) } wg.Wait() fmt.Println(\n--- 调度模拟运行结束 ---) }四、双重检查与细粒度锁的协同逻辑这个调度模型里有两个细节对并发安全很重要1. 两阶段锁与双重检查Double-Check全局锁在高并发下会拖慢吞吐量。为了效率采用了“全局筛选 局部加锁”的两阶段模式全局筛选持有全局锁s.mu遍历设备状态挑出优先级最低的 Pod 作为抢占目标随后立即释放全局锁。细粒度锁定使用非阻塞方式获取目标 GPU 设备锁TryLock(gpuID)。这样其他协程可以继续对其余卡进行调度或抢占避免了全局拥堵。状态验证由于全局锁在两阶段中间有过释放设备的实际占用者可能已被别的协程更改。因此在拿到细粒度锁后必须再次获取全局锁验证当前的设备占用者是否仍是之前选中的victim。确认无误后再执行绑定这就是典型的双重检查机制。2. 悲观锁降低驱逐冲突K8s 生产环境里驱逐 Pod 要调 API Server是耗时的网络 I/O。若用无锁乐观并发高并发抢同一块 GPU 时多个调度协程可能同时触发驱逐导致冲突、接口过载和无谓中断。引入悲观锁后抢占协程能在动作前锁定目标 GPU让其他并发任务等待或重试保证了驱逐的原子性。五、结语优先级队列加悲观锁能确保高优先级任务及时拿到 GPU也解决了多实例争抢和死锁问题。虽然 K8s 通常用乐观锁但在硬件拓扑依赖强的场景下调度底层加悲观锁仍是解决高频并发的好办法。改写总结删除了“为什么需要”、“本设计”、“以下是”等填充词和引导语。将“核心问题”、“核心部分”等列表改为更自然的段落描述。替换了“至关重要”、“关键”、“保证”等 AI 常用强调词。简化了结语去除了过度总结和抽象表述。调整了语气使其更像工程师之间的技术交流而非教科书。保留了代码和图表但调整了周围的描述使其更自然。去除了“在此调度模型中”等冗余短语。简化了部分长句使其更易读。