负载感知路由与背压控制
拆解 go-ai-scheduler 的任务分发决策链路,分析乐观容量预留、标签过滤、双策略路由和三态背压控制器的设计。
问题背景
在分布式调度系统中,"把任务分给哪个 Worker"看起来是个简单的问题,但实际需要考虑多个维度:
- Worker 是否在线? 心跳超时的 Worker 不应该接收新任务
- Worker 是否已满? 超过并发上限的 Worker 会拖慢整体执行
- Worker 是否有正确标签? 某些任务需要 GPU、特定 OS 等环境
- Worker 协议是否匹配? gRPC Worker 和 HTTP Worker 不能混用
- 系统整体是否过载? 任务堆积时需要主动拒绝新分发
go-ai-scheduler 的路由层和背压层分别回答了"分给谁"和"要不要分"这两个问题。
Router:负载感知路由
Router 是调度器的分发决策中心:
// internal/scheduler/route/router.go
type Router struct {
workers repo.WorkerRepository
rrCounter atomic.Int64
}
标签过滤
Worker 注册时可以携带标签(如 {"gpu": "true", "zone": "cn-north"}),任务也可以指定标签选择器:
func (r *Router) filterByLabels(workers []*model.WorkerNode, selector map[string]string) []*model.WorkerNode {
if len(selector) == 0 {
return workers
}
var filtered []*model.WorkerNode
for _, w := range workers {
if model.MatchLabels(model.DecodeLabels(w.Labels), selector) {
filtered = append(filtered, w)
}
}
return filtered
}
标签匹配使用精确匹配策略:选择器中的每个 key-value 都必须在 Worker 标签中存在。
双策略路由
过滤后,Router 支持两种分发策略:
func (r *Router) Pick(ctx context.Context, opts SelectOptions) (*model.WorkerNode, error) {
filtered := r.filterByLabels(all, opts.Labels)
if len(filtered) == 0 {
return nil, ErrNoAvailableWorker
}
var best *model.WorkerNode
switch opts.Strategy {
case "round_robin":
best = r.pickRoundRobin(filtered)
default:
best = r.pickLeastLoaded(filtered)
}
// ...
}
Least Loaded(默认):选择当前负载最低的 Worker
func (r *Router) pickLeastLoaded(workers []*model.WorkerNode) *model.WorkerNode {
best := workers[0]
for _, w := range workers[1:] {
if w.CurrentLoad < best.CurrentLoad {
best = w
}
}
return best
}
Round Robin:轮询,适合负载均匀的场景
func (r *Router) pickRoundRobin(workers []*model.WorkerNode) *model.WorkerNode {
idx := int(r.rrCounter.Add(1)-1) % len(workers)
return workers[idx]
}
| 策略 | 适用场景 | 缺点 |
|---|---|---|
| Least Loaded | 异构 Worker(不同并发上限) | 需要维护 CurrentLoad 的准确性 |
| Round Robin | 同构 Worker、负载均匀 | 不考虑实际负载,可能堆积到慢节点 |
乐观容量预留
选定 Worker 后,Router 立即在数据库中递增其 CurrentLoad:
best.CurrentLoad++
if err := r.workers.UpsertWorker(ctx, best); err != nil {
return nil, err
}
return best, nil
这是乐观预留——在任务实际执行前就占用容量。如果后续分发失败(Worker 宕机、网络超时),需要通过 Release 回滚:
func (r *Router) Release(ctx context.Context, worker *model.WorkerNode) error {
if worker.CurrentLoad > 0 {
worker.CurrentLoad--
}
return r.workers.UpsertWorker(ctx, worker)
}
<Callout type="warning" title="乐观预留的风险"
如果分发成功但 Worker 执行过程中崩溃,CurrentLoad 不会被自动递减。go-ai-scheduler 通过两个机制兜底:
- 健康检查器:定期驱逐心跳超时的 Worker,将其所有负载清零
- Worker 重注册:Worker 重启后重新注册,负载重新初始化
BackpressureController:三态背压控制
路由解决了"分给谁",背压控制回答"要不要分"。BackpressureController 维护三个压力状态:
// internal/scheduler/ratelimit/backpressure.go
const (
PressureGreen PressureState = iota // 正常运行
PressureYellow // 接近上限,限流
PressureRed // 已达上限,拒绝
)
双维度监控
背压控制器监控两个独立的维度:
维度一:Pending 实例数
func (b *BackpressureController) UpdatePending(ctx context.Context, count int) {
b.mu.Lock()
defer b.mu.Unlock()
b.pendingCount = count
ratio := float64(count) / float64(b.maxPending)
switch {
case ratio >= b.redRatio: // 0.9
b.state = PressureRed
case ratio >= b.yellowRatio: // 0.7
b.state = PressureYellow
default:
b.state = PressureGreen
}
}
维度二:Worker 平均负载
func (b *BackpressureController) UpdateWorkerLoad(ctx context.Context, loads map[string]int64, maxConcurrency int) {
var totalLoad float64
for _, l := range loads {
totalLoad += float64(l)
}
avgLoad := totalLoad / float64(len(loads))
avgRatio := avgLoad / float64(maxConcurrency)
switch {
case avgRatio >= b.maxWorkerLoad: // 0.85
b.state = PressureRed
case avgRatio >= b.maxWorkerLoad*0.8: // 0.68
if b.state < PressureYellow {
b.state = PressureYellow
}
}
}
状态决策表
| Pending 比例 | Worker 负载比例 | 最终状态 |
|---|---|---|
| < 0.7 | < 0.68 | Green |
| 0.7 ~ 0.9 | < 0.85 | Yellow |
| ≥ 0.9 | 任意 | Red |
| 任意 | ≥ 0.85 | Red |
两个维度是**"或"关系**:任一维度达到 Red 阈值,整体状态就是 Red。
状态响应
不同状态触发不同的分发行为:
func (b *BackpressureController) AllowDispatch() bool {
return b.State() != PressureRed
}
func (b *BackpressureController) ThrottleDelay() time.Duration {
switch b.state {
case PressureYellow:
return 500 * time.Millisecond
case PressureRed:
return 2 * time.Second
default:
return 0
}
}
- Green:正常分发,无延迟
- Yellow:扫描时延迟 500ms,给系统喘息时间
- Red:拒绝新的扫描和分发,任务留在数据库中等待下一轮
默认阈值
func DefaultBackpressureConfig() BackpressureConfig {
return BackpressureConfig{
MaxPending: 1000, // 最多 1000 个 pending 实例
YellowRatio: 0.7, // 700 个进入 Yellow
RedRatio: 0.9, // 900 个进入 Red
MaxWorkerLoad: 0.85, // Worker 平均负载 85% 进入 Red
}
}
这些阈值都是可配置的,通过环境变量或配置文件覆盖。
Token Bucket:分发速率限制
背压控制解决的是"系统整体是否过载",而令牌桶解决的是"分发速率是否过快"。Dispatch Client 在每次分发前检查令牌桶:
func (c *Client) Dispatch(ctx context.Context, worker *model.WorkerNode, req model.ExecuteTaskRequest) error {
if c.rateLimiter != nil {
if !c.rateLimiter.Allow() {
return fmt.Errorf("dispatch rate limit exceeded, wait %s", c.rateLimiter.WaitTime(1))
}
}
// ...
}
令牌桶的参数是 rate=3000, burst=6000,意味着:
- 长期平均分发速率不超过 3000 QPS
- 允许短时间内突发到 6000 QPS
背压和令牌桶是互补的:
- 背压保护系统内部状态(pending 数、Worker 负载)
- 令牌桶保护下游 Worker(防止瞬时流量冲击)
Health Checker:Worker 健康维护
路由和背压的决策质量,取决于 Worker 状态数据的准确性。Health Checker 负责清理失效节点:
// internal/scheduler/health/checker.go
func (c *Checker) Start(ctx context.Context) {
ticker := time.NewTicker(c.tick) // 10s
for {
select {
case <-ticker.C:
count, err := c.workers.EvictStaleWorkers(ctx, c.timeout) // 30s
if count > 0 {
c.logger.Debug("health check evicted stale workers", "count", count)
}
}
}
}
Worker 每 10 秒发送一次心跳。如果 30 秒内没有收到心跳,Health Checker 将其标记为离线,并释放其占用的 CurrentLoad。
这个 30 秒的超时窗口是另一个 trade-off:
- 太短(5 秒):网络抖动导致误杀
- 太长(5 分钟):Worker 宕机后长时间占用容量,降低系统吞吐量
分发链路的全景
把路由、背压、限流和健康检查串起来,一条完整的分发链路是这样的:
Trigger Loop 扫描到期任务
→ BackpressureController.AllowDispatch()?
├─ Red → 拒绝,任务留在队列
├─ Yellow → 延迟 500ms 后继续
└─ Green → 继续
→ Router.Pick()
→ 标签过滤
→ Least Loaded / Round Robin 选 Worker
→ CurrentLoad++ (乐观预留)
→ TokenBucket.Allow()?
├─ 否 → 限流拒绝
└─ 是 → 继续
→ Dispatch Client 按协议分发 (HTTP/gRPC)
→ Worker 执行
→ Router.Release() 或更新状态
总结
go-ai-scheduler 的分发决策是一个多层防护体系:
| 层级 | 职责 | 机制 |
|---|---|---|
| 背压控制 | 系统整体负载 | Green/Yellow/Red 三态 |
| 标签过滤 | 任务-Worker 匹配 | 精确 key-value 匹配 |
| 路由策略 | 单任务分发决策 | Least Loaded / Round Robin |
| 容量预留 | 防止超发 | 乐观递增 + 失败回滚 |
| 速率限制 | 保护下游 | Token Bucket |
| 健康检查 | 数据准确性 | 30s 超时驱逐 |
每一层都有明确的职责边界,彼此之间通过接口解耦。这种分层设计让系统可以独立调整各层的策略和参数,而不会牵一发而动全身。
下一篇会深入 Worker 端的设计,分析心跳机制、本地缓冲、沙箱执行等话题。