任务触发与重试的可靠性设计
拆解 go-ai-scheduler 的任务触发、分发和重试链路,探讨 DAG 依赖、分片调度、双协议分发和去重幂等的设计取舍。
问题背景
调度系统的可靠性体现在两个层面:触发侧不能漏任务、不能重复触发;执行侧不能重复执行、不能丢失结果。本文从 trigger loop 到 worker 端,拆解 go-ai-scheduler 在这两个层面的设计。
一条完整的任务链路是这样的:
Trigger Loop 扫描到期任务
→ DAG 依赖检查
→ 创建 TaskInstance
→ Router 选择 Worker
→ Dispatch Client 分发
→ Worker 执行并上报结果
→ (失败时) Retry Loop 重新调度
每个环节都有特定的可靠性保障机制。
Trigger Loop:从扫描到分发
Trigger Loop 是调度器的主循环,每 5 秒扫描一次到期任务:
// internal/scheduler/trigger/loop.go
func (l *Loop) Start(ctx context.Context) {
ticker := time.NewTicker(l.interval) // 5s
for {
select {
case <-ticker.C:
l.scan(ctx)
}
}
}
背压前置检查
扫描开始前,先检查系统负载:
pending, _ := l.instanceRepo.CountInstancesByStatus(ctx, "pending")
if l.bp != nil {
l.bp.UpdatePending(ctx, pending)
if !l.bp.AllowDispatch() {
return // Red 状态直接拒绝扫描
}
if delay := l.bp.ThrottleDelay(); delay > 0 {
time.Sleep(delay) // Yellow 状态延迟处理
}
}
这个设计把背压检查放在了扫描入口,而不是分发环节。原因是:如果 pending 任务已经堆积,继续扫描只会让情况更糟。宁可让任务在数据库里多等几秒,也不要让系统过载。
双数据源:缓存优先,数据库兜底
Trigger Loop 支持从 Redis 缓存读取到期任务 ID,减少数据库查询:
if l.cache != nil && l.cache.Enabled() {
cachedIDs, cacheErr := l.cache.GetCachedDueTaskIDs(ctx, time.Now())
if cacheErr == nil && len(cachedIDs) > 0 {
// 从缓存获取,按 ID 查详情
for _, id := range cachedIDs {
t, _ := l.taskRepo.GetTask(ctx, id)
if t.Status == "enabled" && !t.NextTriggerTime.After(time.Now()) {
tasks = append(tasks, t)
}
}
if len(tasks) > 100 {
tasks = tasks[:100]
}
}
}
if len(tasks) == 0 {
tasks, _ = l.taskRepo.ListDueTasks(ctx, 100)
}
这里有两个细节:
- 缓存降级:Redis 不可用或缓存为空时,自动回退到数据库查询
- 批量限制:单次最多处理 100 个任务,防止单次扫描拖垮系统
DAG 依赖检查
任务可能配置上游依赖。Trigger Loop 在创建实例前会检查所有上游任务是否有成功的执行记录:
upstream, _ := l.taskRepo.ListUpstreamDeps(ctx, task.ID)
if len(upstream) > 0 {
allSatisfied := true
for _, depID := range upstream {
instances, _ := l.instanceRepo.ListInstancesByStatus(ctx, "success", 1)
found := false
for _, inst := range instances {
if inst.TaskID == depID {
found = true; break
}
}
if !found { allSatisfied = false; break }
}
if !allSatisfied {
task.NextTriggerTime = time.Now().Add(5 * time.Second)
l.taskRepo.UpdateTask(ctx, task)
return // 延迟 5 秒再试
}
}
依赖检查的实现比较朴素:查询每个上游任务的最新成功实例。这种方式在依赖链较长时会有多次查询,但胜在简单可靠。
<Callout type="tip" title="为什么没有使用拓扑排序预计算?"
DAG 依赖在运行时动态变化(任务可以新增/删除依赖),预计算拓扑序需要额外的维护成本。当前场景下依赖链通常很短(2-3 层),实时检查的开销可以接受。
任务分片
对于需要横向扩展的任务,Trigger Loop 支持分片调度:
shardTotal := task.TotalShards
if shardTotal <= 0 { shardTotal = 1 }
for shard := 0; shard < shardTotal; shard++ {
instance := &model.TaskInstance{
TaskID: task.ID,
ShardNo: shard,
ShardTotal: task.TotalShards,
Status: "pending",
}
l.instanceRepo.CreateInstance(ctx, instance)
// 选 Worker → 分发
}
每个分片生成独立的 TaskInstance,拥有独立的 ScheduleInstanceID。Worker 执行时通过环境变量 SHARD_NO 和 SHARD_TOTAL 感知自己的分片身份。
Retry Loop:失败后的自愈
Retry Loop 独立于 Trigger Loop 运行,每 3 秒扫描状态为 retry_waiting 的实例:
// internal/scheduler/retry/loop.go
func (l *Loop) Start(ctx context.Context) {
ticker := time.NewTicker(l.interval) // 3s
for {
select {
case <-ticker.C:
l.scan(ctx)
}
}
}
Retry Loop 的 scan 逻辑与 Trigger Loop 类似:查询待重试实例 → 加载对应任务 → 选 Worker → 分发。关键区别在于:
- 不检查 DAG 依赖:重试实例已经通过了首次触发的依赖检查
- 保留 retry count:分发时传递当前的
RetryCount,Worker 可以据此调整行为 - 失败回退:如果分发再次失败,实例状态回退为
retry_waiting,等待下一轮
if err := l.dispatcher.Dispatch(ctx, worker, req); err != nil {
_ = l.instances.UpdateInstanceStatus(ctx, instance.ID, "retry_waiting")
_ = l.router.Release(ctx, worker)
continue
}
<Callout type="info" title="重试策略在哪里配置?"
重试间隔和最大重试次数存储在 Task 模型的字段中(RetryInterval、MaxRetryCount)。Retry Loop 只负责按配置调度,不硬编码策略。
Dispatch Client:双协议透明切换
调度器与 Worker 之间的通信支持 HTTP 和 gRPC 两种协议。Dispatch Client 根据 Worker 注册的协议自动选择:
// internal/scheduler/dispatch/client.go
func (c *Client) Dispatch(ctx context.Context, worker *model.WorkerNode, req model.ExecuteTaskRequest) error {
if c.rateLimiter != nil {
if !c.rateLimiter.Allow() {
return fmt.Errorf("dispatch rate limit exceeded")
}
}
if strings.EqualFold(worker.Protocol, "grpc") {
return c.dispatchGRPC(ctx, worker.GRPCAddr, req)
}
return c.dispatchHTTP(ctx, worker.CallbackURL, req)
}
令牌桶限流
Dispatch Client 内置了令牌桶限流器,默认 3000 QPS:
func NewClientWithRateLimiter(dispatchRatePerSec int) *Client {
return &Client{
rateLimiter: ratelimit.NewTokenBucket(dispatchRatePerSec, dispatchRatePerSec*2),
}
}
令牌桶的参数设计:
- rate:每秒产生令牌数,决定长期吞吐量
- burst:桶容量(rate × 2),允许短时间的突发流量
gRPC vs HTTP 的取舍
| 维度 | HTTP | gRPC |
|---|---|---|
| 连接开销 | 短连接,每次新建 | 长连接,复用 |
| 序列化 | JSON,可读性好 | Protobuf,体积小 |
| 流式通信 | 需要 SSE / WebSocket | 原生支持双向流 |
| 调试成本 | 低(curl 即可) | 高(需要 grpcurl) |
| 适用场景 | 快速接入、跨语言 | 高频通信、低延迟 |
在 go-ai-scheduler 的场景中,HTTP 是默认协议,gRPC 作为可选优化。Worker 注册时声明自己支持的协议,调度器据此选择。
Worker 端的可靠性保障
去重缓存
Worker 使用 sync.Map 维护了一个带 TTL 的去重缓存,防止同一个实例被重复执行:
// internal/worker/handler.go
func (h *Handler) isDuplicate(scheduleID string) bool {
if _, loaded := h.dedupMap.LoadOrStore(scheduleID, time.Now()); loaded {
return true
}
return false
}
后台有一个 30 秒周期的清理 goroutine,删除超过 5 分钟的老旧条目:
func (h *Handler) StartDedupEviction(ctx context.Context, ttl, interval time.Duration) {
go func() {
ticker := time.NewTicker(interval) // 30s
for {
cutoff := time.Now().Add(-ttl) // 5min
h.dedupMap.Range(func(key, value any) bool {
if seen, ok := value.(time.Time); ok && seen.Before(cutoff) {
h.dedupMap.Delete(key)
}
return true
})
}
}()
}
执行幂等
Worker 接收到的请求中包含 IdempotencyKey,Shell 和 HTTP 任务执行时会将其注入环境变量:
output, execErr = Execute(ctx, req.TaskType, req.Payload, req.Image, map[string]string{
"IDEMPOTENCY_KEY": req.IdempotencyKey,
"SHARD_NO": fmt.Sprintf("%d", req.ShardNo),
"SHARD_TOTAL": fmt.Sprintf("%d", req.ShardTotal),
"SCHEDULE_INSTANCE_ID": req.ScheduleInstanceID,
})
业务逻辑可以读取 IDEMPOTENCY_KEY 实现自身的幂等控制(例如写入数据库时做唯一索引检查)。
本地缓冲与断网续传
当 Worker 完成任务但上报状态到调度器失败时,会将报告写入本地磁盘,后台定期重试:
// internal/worker/handler.go
if err := h.reporter.Report(ctx, req.SchedulerURL, statusReq); err != nil {
if h.localStore != nil {
h.localStore.Buffer(req.SchedulerURL, statusReq)
}
}
LocalStore 的实现很朴素:每个报告序列化为 JSON 写入文件,文件名使用 ScheduleInstanceID。Flush loop 每 30 秒读取目录,逐个重试投递。
全链路的可靠性总结
| 环节 | 风险 | 保障机制 |
|---|---|---|
| 任务扫描 | 漏扫/重复扫描 | 背压限流 + 双数据源 + 批量限制 |
| DAG 依赖 | 依赖未满足就触发 | 实时检查 + 延迟重试 |
| 任务分发 | 重复分发 | Worker 端去重缓存 |
| 任务执行 | 重复执行 | 全局唯一 ScheduleInstanceID + 幂等 Key |
| 结果上报 | 网络失败导致状态丢失 | 本地磁盘缓冲 + 后台重试 |
| 失败恢复 | 重试风暴 | 独立 Retry Loop + 令牌桶限流 |
总结
go-ai-scheduler 的可靠性设计遵循一个核心原则:每个环节都假设上游可能不可靠。
- Trigger Loop 假设数据库可能慢,所以加了缓存和批量限制
- Dispatch Client 假设 Worker 可能失联,所以加了限流和协议降级
- Worker 假设调度器可能重复分发,所以加了去重和幂等
- Reporter 假设网络可能抖动,所以加了本地缓冲
这些机制层层叠加,最终形成了一条可以容忍单点故障的调度链路。
下一篇会分析调度器的 Leader 选举机制,探讨如何在 etcd、MySQL 和本地模式之间优雅降级。