混合调度引擎:时间轮 + 最小堆

深入 go-ai-scheduler 的调度引擎实现,剖析时间轮与最小堆如何在 Go 中协同工作,实现低延迟、高精度的任务触发。

问题背景

在 go-ai-scheduler 的调度器中,最核心的需求是:在大量定时任务中快速找出当前需要触发的任务。如果依赖数据库轮询,假设每 100ms 查询一次、每次查询 10000 条任务,数据库很快会成为瓶颈。

业界常见的解决方案有几种:

方案优点缺点
数据库轮询实现简单性能差,精度受限于轮询间隔
最小堆精确到毫秒级大量任务时堆调整开销大
时间轮触发极快 O(1)只能覆盖固定时间窗口,精度受 tick 限制
层级时间轮覆盖任意时间范围实现复杂,需要多级跳转

go-ai-scheduler 选择了一个务实的折中:单层时间轮处理高频周期性任务,最小堆处理精确触发和重试任务。这个混合引擎的代码不到 250 行,但设计上有不少值得细品的地方。

时间轮的实现

时间轮的本质是一个环形缓冲区。每个槽位对应一个时间 tick,任务按触发时间取模后落入对应槽位。

// internal/scheduler/engine/timing_wheel.go
const (
    defaultTickDuration = 100 * time.Millisecond
    defaultWheelSize    = 600 // 60 seconds with 100ms ticks
)

type TimingWheel struct {
    mu           sync.Mutex
    slots        []slot
    tickDuration time.Duration
    wheelSize    int
    currentPos   int
}

type slot struct {
    taskIDs map[int64]struct{}
}

任务入槽

任务按触发时间的纳秒数除以 tick 时长取模,决定落入哪个槽位:

func (tw *TimingWheel) Add(taskID int64, triggerTime time.Time) {
    tw.mu.Lock()
    defer tw.mu.Unlock()

    ticks := int(triggerTime.UnixNano() / tw.tickDuration.Nanoseconds())
    pos := ticks % tw.wheelSize
    tw.slots[pos].taskIDs[taskID] = struct{}{}
}

这里有一个隐含的假设:所有任务的触发时间都在时间轮覆盖的 60 秒窗口内。对于超出窗口的任务(比如 5 分钟后才触发的),调度器不会在启动时就加入时间轮,而是等引擎每 10 秒预热时重新评估。

Tick 推进

每次 tick 只处理当前槽位,提取任务后清空槽位:

func (tw *TimingWheel) Tick() []int64 {
    tw.mu.Lock()
    tw.currentPos = (tw.currentPos + 1) % tw.wheelSize
    cur := tw.slots[tw.currentPos]

    taskIDs := make([]int64, 0, len(cur.taskIDs))
    for id := range cur.taskIDs {
        taskIDs = append(taskIDs, id)
    }
    tw.slots[tw.currentPos].taskIDs = make(map[int64]struct{})
    tw.mu.Unlock()
    return taskIDs
}

注意 Tick() 的设计非常克制:

  1. 只提取,不触发:返回任务 ID 列表,由上层决定如何处理(创建实例、选 Worker、分发)
  2. 清空槽位:防止同一个任务被重复触发
  3. 锁粒度小:只保护槽位操作,不阻塞触发回调

最小堆的引入

时间轮有两个天然限制:

  1. 精度限制:100ms 的 tick 意味着任务触发时间会被"对齐"到最近的 tick
  2. 范围限制:只能覆盖 60 秒的窗口

对于重试任务(可能需要 3 秒后精确触发)和短期一次性任务,最小堆是更好的选择。

// internal/scheduler/engine/engine.go
type Engine struct {
    wheel   *TimingWheel
    heap    *taskHeap
    mu      sync.Mutex
    OnTrigger func(taskID int64)
}

func (e *Engine) Start(ctx context.Context) {
    wheelTicker := time.NewTicker(e.wheel.TickDuration())
    heapTicker := time.NewTicker(50 * time.Millisecond)

    for {
        select {
        case <-wheelTicker.C:
            e.processWheelTick()
        case <-heapTicker.C:
            e.processHeapTick()
        }
    }
}

堆的扫描频率是 50ms,精度比时间轮高一倍。堆中存储的是 triggerTime 小于当前时间的所有任务:

func (e *Engine) processHeapTick() {
    e.mu.Lock()
    items := e.heap.PopUntil(time.Now())
    e.mu.Unlock()

    for _, item := range items {
        if e.OnTrigger != nil {
            e.OnTrigger(item.TaskID)
        }
    }
}

预热机制:从数据库到内存

引擎启动时不会立即工作,而是先执行一次 Warm(),从数据库加载数据:

func (e *Engine) Warm(ctx context.Context) error {
    span := e.wheel.SlotSpan()       // 60 seconds
    cutoff := time.Now().Add(span)

    // 加载未来 60 秒内的 cron 任务到时间轮
    tasks, err := e.taskRepo.ListDueTasks(ctx, 500)
    for _, t := range tasks {
        if t.NextTriggerTime.Before(cutoff) {
            e.AddToWheel(t.ID, t.NextTriggerTime)
        }
    }

    // 加载重试任务到最小堆
    retryInstances, err := e.instanceRepo.ListDueRetryInstances(ctx, cutoff, 500)
    for _, inst := range retryInstances {
        if !inst.NextRetryTime.IsZero() && inst.NextRetryTime.Before(cutoff) {
            e.AddToHeap(inst.TaskID, inst.NextRetryTime)
        }
    }
    return nil
}

预热有两个设计细节:

  1. 批量加载:每次最多 500 条,防止大数据量拖慢启动
  2. 双数据源:cron 任务进时间轮,重试任务进最小堆,各司其职

预热完成后,后台还有一个 10 秒周期的 WarmPeriodically loop,持续补充新进入窗口的任务。

两种引擎的协作模式

维度时间轮最小堆
扫描间隔100ms50ms
适用场景周期性 cron 任务重试、短期精确任务
插入复杂度O(1)O(log n)
查询复杂度O(1)O(log n)
时间覆盖60 秒任意
精度100ms50ms

踩坑:时间轮的边界情况

实现时间轮时遇到过几个实际问题。

任务更新后的槽位清理

当用户修改了任务的 cron 表达式,旧槽位上的任务 ID 需要被移除:

func (tw *TimingWheel) Remove(taskID int64) {
    tw.mu.Lock()
    defer tw.mu.Unlock()
    for i := range tw.slots {
        delete(tw.slots[i].taskIDs, taskID)
    }
}

当前实现是全量扫描所有槽位。600 个槽位 × 每个槽位平均少量任务,这个开销可以接受。如果槽位数量增加到数万,就需要建立反向索引(taskID → slotPos)来优化。

任务触发后下次入轮

时间轮只覆盖 60 秒,一个 cron 任务触发后,下次触发时间可能是 1 小时后。此时不能直接 AddToWheel,而是等下一次预热周期:

// cmd/scheduler/main.go
nextTrigger, err := cronexpr.NextAfter(time.Now(), task.CronExpr)
if err == nil && !nextTrigger.IsZero() {
    task.NextTriggerTime = nextTrigger
    resources.Repositories.Task.UpdateTask(leaderCtx, task)
    schedEngine.AddToWheel(task.ID, nextTrigger)
}

这里其实有一个微妙的 race condition:如果更新数据库成功但 AddToWheel 失败(比如引擎刚好在 tick),任务会丢失直到下次预热。生产环境中可以增加一个兜底检查:每次预热时对比数据库中 NextTriggerTime 已过期但时间轮中不存在的任务。

与 Redis 缓存的协同

在更大规模的部署中,预热循环可以从 Redis 读取而不是直接查 MySQL:

// internal/scheduler/cache/manager.go
func (m *Manager) warmDueTasks(ctx context.Context) {
    tasks, err := m.taskRepo.ListDueTasks(ctx, 500)
    // 将任务 ID 和触发时间写入 Redis Sorted Set
    m.redis.WarmDueTasks(ctx, ids, scores)
}

Redis 在这里扮演的是读缓存角色。即使 Redis 不可用,调度器也会优雅降级到直接查询 MySQL。这种 optional 设计让系统在小规模部署时不需要额外的基础设施。

总结

go-ai-scheduler 的混合引擎是一个典型的**"够用就好"**设计:

  • 时间轮负责高频、粗粒度的周期性触发,O(1) 的查询性能让它可以轻松处理数千任务的并发场景
  • 最小堆补充精确触发和重试场景,50ms 的扫描间隔提供了足够的精度
  • 预热机制桥接了持久化存储与内存引擎,10 秒的刷新周期在实时性和数据库负载之间取得了平衡

这个设计的关键洞察是:不需要一个完美的数据结构解决所有问题,而是让两个 imperfect 的结构在各自擅长的领域协同工作

下一篇会分析调度器的分发链路,包括负载感知路由、背压控制,以及 HTTP/gRPC 双协议透明的实现。