任务分发与 Worker 路由策略

分析 go-ai-scheduler 的任务分发链路,理解 Worker 注册、心跳、负载路由与背压控制的实现细节。

分发链路概述

Scheduler 决定"何时触发"任务后,下一步是"发给谁"。这条链路包括:

Scheduler Trigger Loop
  │
  ├─ 1. 检查背压(pending 实例数是否超限)
  ├─ 2. 检查依赖(上游任务是否完成)
  ├─ 3. 拆分 shard(水平分片并行执行)
  ├─ 4. Router 选择 Worker
  ├─ 5. Dispatcher 发送请求
  └─ 6. 更新任务下次触发时间

本文聚焦第 4、5 步:Router 和 Dispatcher。

Worker 注册与心跳

Worker 启动时向 Scheduler 注册:

type WorkerNode struct {
    ID           string
    Host         string
    CallbackURL  string
    GRPCAddr     string
    Protocol     string        // "http" or "grpc"
    Labels       string        // JSON-encoded label selector
    CurrentLoad  int           // 当前负载计数
    MaxLoad      int           // 最大负载
    LastHeartbeat time.Time
    Status       string        // "online", "offline"
}

Worker 定期发送心跳,Scheduler 通过心跳判断 Worker 是否存活。心跳超时的 Worker 会被标记为 offline,不再参与路由。

Router 实现

type Router struct {
    workers   repo.WorkerRepository
    rrCounter atomic.Int64      // 轮询计数器
}

路由策略

支持两种策略,通过 task.RouteStrategy 字段配置:

1. Least-Loaded(默认)

func (r *Router) pickLeastLoaded(workers []*model.WorkerNode) *model.WorkerNode {
    best := workers[0]
    for _, w := range workers[1:] {
        if w.CurrentLoad < best.CurrentLoad {
            best = w
        }
    }
    return best
}

选择当前负载最低的 Worker。注意这里的 "load" 是调度器视角的计数,不是 CPU/内存使用率。每次分发 CurrentLoad++,执行完成或分发失败时 CurrentLoad--

2. Round-Robin

func (r *Router) pickRoundRobin(workers []*model.WorkerNode) *model.WorkerNode {
    idx := int(r.rrCounter.Add(1)-1) % len(workers)
    return workers[idx]
}

使用原子计数器保证多线程安全。轮询策略简单公平,但不考虑 Worker 实际负载。

Label 过滤

func (r *Router) filterByLabels(workers []*model.WorkerNode, selector map[string]string) []*model.WorkerNode {
    if len(selector) == 0 {
        return workers
    }
    var filtered []*model.WorkerNode
    for _, w := range workers {
        if model.MatchLabels(model.DecodeLabels(w.Labels), selector) {
            filtered = append(filtered, w)
        }
    }
    return filtered
}

任务可以指定 Labels(如 {"zone": "cn-north", "gpu": "true"}),Router 只将任务分发给匹配的 Worker。

完整的 Pick 流程

func (r *Router) Pick(ctx context.Context, opts SelectOptions) (*model.WorkerNode, error) {
    // 1. 获取所有可用 Worker
    all, _ := r.workers.ListAvailableWorkers(ctx)

    // 2. Label 过滤
    filtered := r.filterByLabels(all, opts.Labels)
    if len(filtered) == 0 {
        return nil, ErrNoAvailableWorker
    }

    // 3. 按策略选择
    var best *model.WorkerNode
    switch opts.Strategy {
    case "round_robin":
        best = r.pickRoundRobin(filtered)
    default:
        best = r.pickLeastLoaded(filtered)
    }

    // 4. 预留容量
    best.CurrentLoad++
    r.workers.UpsertWorker(ctx, best)
    return best, nil
}

容量释放

func (r *Router) Release(ctx context.Context, worker *model.WorkerNode) error {
    if worker.CurrentLoad > 0 {
        worker.CurrentLoad--
    }
    return r.workers.UpsertWorker(ctx, worker)
}

释放时机:

  • Worker 执行成功或失败后上报结果
  • 分发失败(Worker 未收到请求)

Dispatcher 实现

Router 选出 Worker 后,Dispatcher 负责实际发送请求。

type Client struct {
    httpClient  *http.Client
    rateLimiter *ratelimit.TokenBucket  // 可选的令牌桶限流
}

双协议支持

func (c *Client) Dispatch(ctx context.Context, worker *model.WorkerNode, req model.ExecuteTaskRequest) error {
    if c.rateLimiter != nil && !c.rateLimiter.Allow() {
        return fmt.Errorf("dispatch rate limit exceeded")
    }
    if strings.EqualFold(worker.Protocol, "grpc") {
        return c.dispatchGRPC(ctx, worker.GRPCAddr, req)
    }
    return c.dispatchHTTP(ctx, worker.CallbackURL, req)
}

HTTP 分发

func (c *Client) dispatchHTTP(ctx context.Context, callbackURL string, req model.ExecuteTaskRequest) error {
    body, _ := json.Marshal(req)
    url := strings.TrimRight(callbackURL, "/") + "/internal/tasks/execute"
    httpReq, _ := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
    httpReq.Header.Set("Content-Type", "application/json")

    resp, err := c.httpClient.Do(httpReq)
    if resp.StatusCode >= 300 {
        return fmt.Errorf("worker returned status %s", resp.Status)
    }
    return nil
}

gRPC 分发

func (c *Client) dispatchGRPC(ctx context.Context, target string, req model.ExecuteTaskRequest) error {
    conn, _ := grpc.DialContext(ctx, target,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
    defer conn.Close()

    client := schedulerv1.NewExecutorServiceClient(conn)
    resp, _ := client.ExecuteTask(ctx, &schedulerv1.ExecuteTaskRequest{...})
    if !resp.GetAccepted() {
        return fmt.Errorf("grpc worker rejected dispatch")
    }
    return nil
}

协议由 Worker 注册时声明,Scheduler 根据 worker.Protocol 自动选择。

背压控制

Trigger Loop 在分发前有两层背压保护:

1. 全局 pending 上限

pending, _ := l.instanceRepo.CountInstancesByStatus(ctx, "pending")
if pending >= l.maxPending {
    // 拒绝本次扫描,等待 pending 下降
    return
}

默认 maxPending = 1000。当系统中 pending 实例超过阈值时,Scheduler 停止创建新实例。

2. 渐进式节流

if delay := l.bp.ThrottleDelay(); delay > 0 {
    time.Sleep(delay)
}

背压控制器根据 pending 数量计算延迟:pending 越高,扫描间隔越长。

Rendering diagram...

分发失败处理

if err := l.dispatcher.Dispatch(ctx, worker, req); err != nil {
    // 1. 记录失败
    metrics.DefaultRegistry.IncCounter("scheduler_dispatch_total", map[string]string{"result": "error"})
    l.instanceRepo.UpdateInstanceResult(ctx, instance.ScheduleInstanceID, "failed", "dispatch_failed", err.Error())
    // 2. 释放 Worker 容量
    l.router.Release(ctx, worker)
    // 3. 继续下一个 shard
    continue
}

分发失败不算任务执行失败,而是"调度失败"。实例状态标记为 failed,由重试循环后续处理。

小结

任务分发模块的设计要点:

  1. 路由与发送分离:Router 只管"选谁",Dispatcher 只管"怎么发",职责清晰
  2. 双策略可配置:Least-Loaded 适合大多数场景,Round-Robin 适合同构 Worker
  3. Label 过滤:支持基于标签的 Worker 分组,实现异构集群调度
  4. 双协议自适应:HTTP 简单通用,gRPC 高性能,Worker 自选
  5. 背压保护:pending 上限 + 渐进式节流,防止系统被任务淹没

整个 Router + Dispatcher 不到 300 行代码,但覆盖了调度系统的核心分发能力。