任务分发与 Worker 路由策略
分析 go-ai-scheduler 的任务分发链路,理解 Worker 注册、心跳、负载路由与背压控制的实现细节。
分发链路概述
Scheduler 决定"何时触发"任务后,下一步是"发给谁"。这条链路包括:
Scheduler Trigger Loop
│
├─ 1. 检查背压(pending 实例数是否超限)
├─ 2. 检查依赖(上游任务是否完成)
├─ 3. 拆分 shard(水平分片并行执行)
├─ 4. Router 选择 Worker
├─ 5. Dispatcher 发送请求
└─ 6. 更新任务下次触发时间
本文聚焦第 4、5 步:Router 和 Dispatcher。
Worker 注册与心跳
Worker 启动时向 Scheduler 注册:
type WorkerNode struct {
ID string
Host string
CallbackURL string
GRPCAddr string
Protocol string // "http" or "grpc"
Labels string // JSON-encoded label selector
CurrentLoad int // 当前负载计数
MaxLoad int // 最大负载
LastHeartbeat time.Time
Status string // "online", "offline"
}
Worker 定期发送心跳,Scheduler 通过心跳判断 Worker 是否存活。心跳超时的 Worker 会被标记为 offline,不再参与路由。
Router 实现
type Router struct {
workers repo.WorkerRepository
rrCounter atomic.Int64 // 轮询计数器
}
路由策略
支持两种策略,通过 task.RouteStrategy 字段配置:
1. Least-Loaded(默认)
func (r *Router) pickLeastLoaded(workers []*model.WorkerNode) *model.WorkerNode {
best := workers[0]
for _, w := range workers[1:] {
if w.CurrentLoad < best.CurrentLoad {
best = w
}
}
return best
}
选择当前负载最低的 Worker。注意这里的 "load" 是调度器视角的计数,不是 CPU/内存使用率。每次分发 CurrentLoad++,执行完成或分发失败时 CurrentLoad--。
2. Round-Robin
func (r *Router) pickRoundRobin(workers []*model.WorkerNode) *model.WorkerNode {
idx := int(r.rrCounter.Add(1)-1) % len(workers)
return workers[idx]
}
使用原子计数器保证多线程安全。轮询策略简单公平,但不考虑 Worker 实际负载。
Label 过滤
func (r *Router) filterByLabels(workers []*model.WorkerNode, selector map[string]string) []*model.WorkerNode {
if len(selector) == 0 {
return workers
}
var filtered []*model.WorkerNode
for _, w := range workers {
if model.MatchLabels(model.DecodeLabels(w.Labels), selector) {
filtered = append(filtered, w)
}
}
return filtered
}
任务可以指定 Labels(如 {"zone": "cn-north", "gpu": "true"}),Router 只将任务分发给匹配的 Worker。
完整的 Pick 流程
func (r *Router) Pick(ctx context.Context, opts SelectOptions) (*model.WorkerNode, error) {
// 1. 获取所有可用 Worker
all, _ := r.workers.ListAvailableWorkers(ctx)
// 2. Label 过滤
filtered := r.filterByLabels(all, opts.Labels)
if len(filtered) == 0 {
return nil, ErrNoAvailableWorker
}
// 3. 按策略选择
var best *model.WorkerNode
switch opts.Strategy {
case "round_robin":
best = r.pickRoundRobin(filtered)
default:
best = r.pickLeastLoaded(filtered)
}
// 4. 预留容量
best.CurrentLoad++
r.workers.UpsertWorker(ctx, best)
return best, nil
}
容量释放
func (r *Router) Release(ctx context.Context, worker *model.WorkerNode) error {
if worker.CurrentLoad > 0 {
worker.CurrentLoad--
}
return r.workers.UpsertWorker(ctx, worker)
}
释放时机:
- Worker 执行成功或失败后上报结果
- 分发失败(Worker 未收到请求)
Dispatcher 实现
Router 选出 Worker 后,Dispatcher 负责实际发送请求。
type Client struct {
httpClient *http.Client
rateLimiter *ratelimit.TokenBucket // 可选的令牌桶限流
}
双协议支持
func (c *Client) Dispatch(ctx context.Context, worker *model.WorkerNode, req model.ExecuteTaskRequest) error {
if c.rateLimiter != nil && !c.rateLimiter.Allow() {
return fmt.Errorf("dispatch rate limit exceeded")
}
if strings.EqualFold(worker.Protocol, "grpc") {
return c.dispatchGRPC(ctx, worker.GRPCAddr, req)
}
return c.dispatchHTTP(ctx, worker.CallbackURL, req)
}
HTTP 分发
func (c *Client) dispatchHTTP(ctx context.Context, callbackURL string, req model.ExecuteTaskRequest) error {
body, _ := json.Marshal(req)
url := strings.TrimRight(callbackURL, "/") + "/internal/tasks/execute"
httpReq, _ := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if resp.StatusCode >= 300 {
return fmt.Errorf("worker returned status %s", resp.Status)
}
return nil
}
gRPC 分发
func (c *Client) dispatchGRPC(ctx context.Context, target string, req model.ExecuteTaskRequest) error {
conn, _ := grpc.DialContext(ctx, target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
defer conn.Close()
client := schedulerv1.NewExecutorServiceClient(conn)
resp, _ := client.ExecuteTask(ctx, &schedulerv1.ExecuteTaskRequest{...})
if !resp.GetAccepted() {
return fmt.Errorf("grpc worker rejected dispatch")
}
return nil
}
协议由 Worker 注册时声明,Scheduler 根据 worker.Protocol 自动选择。
背压控制
Trigger Loop 在分发前有两层背压保护:
1. 全局 pending 上限
pending, _ := l.instanceRepo.CountInstancesByStatus(ctx, "pending")
if pending >= l.maxPending {
// 拒绝本次扫描,等待 pending 下降
return
}
默认 maxPending = 1000。当系统中 pending 实例超过阈值时,Scheduler 停止创建新实例。
2. 渐进式节流
if delay := l.bp.ThrottleDelay(); delay > 0 {
time.Sleep(delay)
}
背压控制器根据 pending 数量计算延迟:pending 越高,扫描间隔越长。
分发失败处理
if err := l.dispatcher.Dispatch(ctx, worker, req); err != nil {
// 1. 记录失败
metrics.DefaultRegistry.IncCounter("scheduler_dispatch_total", map[string]string{"result": "error"})
l.instanceRepo.UpdateInstanceResult(ctx, instance.ScheduleInstanceID, "failed", "dispatch_failed", err.Error())
// 2. 释放 Worker 容量
l.router.Release(ctx, worker)
// 3. 继续下一个 shard
continue
}
分发失败不算任务执行失败,而是"调度失败"。实例状态标记为 failed,由重试循环后续处理。
小结
任务分发模块的设计要点:
- 路由与发送分离:Router 只管"选谁",Dispatcher 只管"怎么发",职责清晰
- 双策略可配置:Least-Loaded 适合大多数场景,Round-Robin 适合同构 Worker
- Label 过滤:支持基于标签的 Worker 分组,实现异构集群调度
- 双协议自适应:HTTP 简单通用,gRPC 高性能,Worker 自选
- 背压保护:pending 上限 + 渐进式节流,防止系统被任务淹没
整个 Router + Dispatcher 不到 300 行代码,但覆盖了调度系统的核心分发能力。