任务触发与重试的可靠性设计

拆解 go-ai-scheduler 的任务触发、分发和重试链路,探讨 DAG 依赖、分片调度、双协议分发和去重幂等的设计取舍。

问题背景

调度系统的可靠性体现在两个层面:触发侧不能漏任务、不能重复触发;执行侧不能重复执行、不能丢失结果。本文从 trigger loop 到 worker 端,拆解 go-ai-scheduler 在这两个层面的设计。

一条完整的任务链路是这样的:

Trigger Loop 扫描到期任务
  → DAG 依赖检查
  → 创建 TaskInstance
  → Router 选择 Worker
  → Dispatch Client 分发
  → Worker 执行并上报结果
  → (失败时) Retry Loop 重新调度

每个环节都有特定的可靠性保障机制。

Trigger Loop:从扫描到分发

Trigger Loop 是调度器的主循环,每 5 秒扫描一次到期任务:

// internal/scheduler/trigger/loop.go
func (l *Loop) Start(ctx context.Context) {
    ticker := time.NewTicker(l.interval) // 5s
    for {
        select {
        case <-ticker.C:
            l.scan(ctx)
        }
    }
}

背压前置检查

扫描开始前,先检查系统负载:

pending, _ := l.instanceRepo.CountInstancesByStatus(ctx, "pending")
if l.bp != nil {
    l.bp.UpdatePending(ctx, pending)
    if !l.bp.AllowDispatch() {
        return // Red 状态直接拒绝扫描
    }
    if delay := l.bp.ThrottleDelay(); delay > 0 {
        time.Sleep(delay) // Yellow 状态延迟处理
    }
}

这个设计把背压检查放在了扫描入口,而不是分发环节。原因是:如果 pending 任务已经堆积,继续扫描只会让情况更糟。宁可让任务在数据库里多等几秒,也不要让系统过载。

双数据源:缓存优先,数据库兜底

Trigger Loop 支持从 Redis 缓存读取到期任务 ID,减少数据库查询:

if l.cache != nil && l.cache.Enabled() {
    cachedIDs, cacheErr := l.cache.GetCachedDueTaskIDs(ctx, time.Now())
    if cacheErr == nil && len(cachedIDs) > 0 {
        // 从缓存获取,按 ID 查详情
        for _, id := range cachedIDs {
            t, _ := l.taskRepo.GetTask(ctx, id)
            if t.Status == "enabled" && !t.NextTriggerTime.After(time.Now()) {
                tasks = append(tasks, t)
            }
        }
        if len(tasks) > 100 {
            tasks = tasks[:100]
        }
    }
}

if len(tasks) == 0 {
    tasks, _ = l.taskRepo.ListDueTasks(ctx, 100)
}

这里有两个细节:

  1. 缓存降级:Redis 不可用或缓存为空时,自动回退到数据库查询
  2. 批量限制:单次最多处理 100 个任务,防止单次扫描拖垮系统

DAG 依赖检查

任务可能配置上游依赖。Trigger Loop 在创建实例前会检查所有上游任务是否有成功的执行记录:

upstream, _ := l.taskRepo.ListUpstreamDeps(ctx, task.ID)
if len(upstream) > 0 {
    allSatisfied := true
    for _, depID := range upstream {
        instances, _ := l.instanceRepo.ListInstancesByStatus(ctx, "success", 1)
        found := false
        for _, inst := range instances {
            if inst.TaskID == depID {
                found = true; break
            }
        }
        if !found { allSatisfied = false; break }
    }
    if !allSatisfied {
        task.NextTriggerTime = time.Now().Add(5 * time.Second)
        l.taskRepo.UpdateTask(ctx, task)
        return // 延迟 5 秒再试
    }
}

依赖检查的实现比较朴素:查询每个上游任务的最新成功实例。这种方式在依赖链较长时会有多次查询,但胜在简单可靠。

<Callout type="tip" title="为什么没有使用拓扑排序预计算?"

DAG 依赖在运行时动态变化(任务可以新增/删除依赖),预计算拓扑序需要额外的维护成本。当前场景下依赖链通常很短(2-3 层),实时检查的开销可以接受。

任务分片

对于需要横向扩展的任务,Trigger Loop 支持分片调度:

shardTotal := task.TotalShards
if shardTotal <= 0 { shardTotal = 1 }

for shard := 0; shard < shardTotal; shard++ {
    instance := &model.TaskInstance{
        TaskID:      task.ID,
        ShardNo:     shard,
        ShardTotal:  task.TotalShards,
        Status:      "pending",
    }
    l.instanceRepo.CreateInstance(ctx, instance)
    // 选 Worker → 分发
}

每个分片生成独立的 TaskInstance,拥有独立的 ScheduleInstanceID。Worker 执行时通过环境变量 SHARD_NOSHARD_TOTAL 感知自己的分片身份。

Retry Loop:失败后的自愈

Retry Loop 独立于 Trigger Loop 运行,每 3 秒扫描状态为 retry_waiting 的实例:

// internal/scheduler/retry/loop.go
func (l *Loop) Start(ctx context.Context) {
    ticker := time.NewTicker(l.interval) // 3s
    for {
        select {
        case <-ticker.C:
            l.scan(ctx)
        }
    }
}

Retry Loop 的 scan 逻辑与 Trigger Loop 类似:查询待重试实例 → 加载对应任务 → 选 Worker → 分发。关键区别在于:

  1. 不检查 DAG 依赖:重试实例已经通过了首次触发的依赖检查
  2. 保留 retry count:分发时传递当前的 RetryCount,Worker 可以据此调整行为
  3. 失败回退:如果分发再次失败,实例状态回退为 retry_waiting,等待下一轮
if err := l.dispatcher.Dispatch(ctx, worker, req); err != nil {
    _ = l.instances.UpdateInstanceStatus(ctx, instance.ID, "retry_waiting")
    _ = l.router.Release(ctx, worker)
    continue
}

<Callout type="info" title="重试策略在哪里配置?"

重试间隔和最大重试次数存储在 Task 模型的字段中(RetryIntervalMaxRetryCount)。Retry Loop 只负责按配置调度,不硬编码策略。

Dispatch Client:双协议透明切换

调度器与 Worker 之间的通信支持 HTTP 和 gRPC 两种协议。Dispatch Client 根据 Worker 注册的协议自动选择:

// internal/scheduler/dispatch/client.go
func (c *Client) Dispatch(ctx context.Context, worker *model.WorkerNode, req model.ExecuteTaskRequest) error {
    if c.rateLimiter != nil {
        if !c.rateLimiter.Allow() {
            return fmt.Errorf("dispatch rate limit exceeded")
        }
    }
    if strings.EqualFold(worker.Protocol, "grpc") {
        return c.dispatchGRPC(ctx, worker.GRPCAddr, req)
    }
    return c.dispatchHTTP(ctx, worker.CallbackURL, req)
}

令牌桶限流

Dispatch Client 内置了令牌桶限流器,默认 3000 QPS:

func NewClientWithRateLimiter(dispatchRatePerSec int) *Client {
    return &Client{
        rateLimiter: ratelimit.NewTokenBucket(dispatchRatePerSec, dispatchRatePerSec*2),
    }
}

令牌桶的参数设计:

  • rate:每秒产生令牌数,决定长期吞吐量
  • burst:桶容量(rate × 2),允许短时间的突发流量

gRPC vs HTTP 的取舍

维度HTTPgRPC
连接开销短连接,每次新建长连接,复用
序列化JSON,可读性好Protobuf,体积小
流式通信需要 SSE / WebSocket原生支持双向流
调试成本低(curl 即可)高(需要 grpcurl)
适用场景快速接入、跨语言高频通信、低延迟

在 go-ai-scheduler 的场景中,HTTP 是默认协议,gRPC 作为可选优化。Worker 注册时声明自己支持的协议,调度器据此选择。

Worker 端的可靠性保障

去重缓存

Worker 使用 sync.Map 维护了一个带 TTL 的去重缓存,防止同一个实例被重复执行:

// internal/worker/handler.go
func (h *Handler) isDuplicate(scheduleID string) bool {
    if _, loaded := h.dedupMap.LoadOrStore(scheduleID, time.Now()); loaded {
        return true
    }
    return false
}

后台有一个 30 秒周期的清理 goroutine,删除超过 5 分钟的老旧条目:

func (h *Handler) StartDedupEviction(ctx context.Context, ttl, interval time.Duration) {
    go func() {
        ticker := time.NewTicker(interval) // 30s
        for {
            cutoff := time.Now().Add(-ttl) // 5min
            h.dedupMap.Range(func(key, value any) bool {
                if seen, ok := value.(time.Time); ok && seen.Before(cutoff) {
                    h.dedupMap.Delete(key)
                }
                return true
            })
        }
    }()
}

执行幂等

Worker 接收到的请求中包含 IdempotencyKey,Shell 和 HTTP 任务执行时会将其注入环境变量:

output, execErr = Execute(ctx, req.TaskType, req.Payload, req.Image, map[string]string{
    "IDEMPOTENCY_KEY":      req.IdempotencyKey,
    "SHARD_NO":             fmt.Sprintf("%d", req.ShardNo),
    "SHARD_TOTAL":          fmt.Sprintf("%d", req.ShardTotal),
    "SCHEDULE_INSTANCE_ID": req.ScheduleInstanceID,
})

业务逻辑可以读取 IDEMPOTENCY_KEY 实现自身的幂等控制(例如写入数据库时做唯一索引检查)。

本地缓冲与断网续传

当 Worker 完成任务但上报状态到调度器失败时,会将报告写入本地磁盘,后台定期重试:

// internal/worker/handler.go
if err := h.reporter.Report(ctx, req.SchedulerURL, statusReq); err != nil {
    if h.localStore != nil {
        h.localStore.Buffer(req.SchedulerURL, statusReq)
    }
}

LocalStore 的实现很朴素:每个报告序列化为 JSON 写入文件,文件名使用 ScheduleInstanceID。Flush loop 每 30 秒读取目录,逐个重试投递。

全链路的可靠性总结

环节风险保障机制
任务扫描漏扫/重复扫描背压限流 + 双数据源 + 批量限制
DAG 依赖依赖未满足就触发实时检查 + 延迟重试
任务分发重复分发Worker 端去重缓存
任务执行重复执行全局唯一 ScheduleInstanceID + 幂等 Key
结果上报网络失败导致状态丢失本地磁盘缓冲 + 后台重试
失败恢复重试风暴独立 Retry Loop + 令牌桶限流

总结

go-ai-scheduler 的可靠性设计遵循一个核心原则:每个环节都假设上游可能不可靠

  • Trigger Loop 假设数据库可能慢,所以加了缓存和批量限制
  • Dispatch Client 假设 Worker 可能失联,所以加了限流和协议降级
  • Worker 假设调度器可能重复分发,所以加了去重和幂等
  • Reporter 假设网络可能抖动,所以加了本地缓冲

这些机制层层叠加,最终形成了一条可以容忍单点故障的调度链路。

下一篇会分析调度器的 Leader 选举机制,探讨如何在 etcd、MySQL 和本地模式之间优雅降级。