混合调度引擎:时间轮与最小堆的实现

深入分析 go-ai-scheduler 的调度引擎实现,理解时间轮与最小堆的混合设计如何兼顾周期任务与精确触发。

为什么需要混合引擎

调度系统通常面对两类任务:

  • 周期任务:Cron 表达式定义的定期执行,精度到秒级即可
  • 一次性/重试任务:需要精确到毫秒级的触发,比如失败后的固定间隔重试

单一数据结构难以同时满足两种需求。时间轮适合周期任务但精度受 tick 限制,最小堆适合精确触发但扫描成本高。

go-ai-scheduler 的做法是两者结合:时间轮负责粗粒度 Cron 触发,最小堆负责精度的重试和短期任务。

Engine (混合调度引擎)
  ├─ TimingWheel: 100ms tick, 600 slots → 覆盖 60 秒
  └─ taskHeap (min-heap): 50ms 精确扫描

时间轮实现

// timing_wheel.go
type TimingWheel struct {
    mu           sync.Mutex
    slots        []slot           // 环形槽数组
    tickDuration time.Duration    // 每个 tick 的时长
    wheelSize    int              // 槽数量
    currentPos   int              // 当前位置
}

type slot struct {
    taskIDs map[int64]struct{}   // 该槽位到期的任务 ID 集合
}

参数选择

  • tickDuration = 100ms:Cron 任务通常不需要亚秒级精度
  • wheelSize = 600:覆盖 100ms × 600 = 60秒
  • 意味着时间轮只能管理未来 60 秒内的到期任务

核心操作

func (tw *TimingWheel) Add(taskID int64, triggerTime time.Time) {
    ticks := int(triggerTime.UnixNano() / tw.tickDuration.Nanoseconds())
    pos := ticks % tw.wheelSize
    tw.slots[pos].taskIDs[taskID] = struct{}{}
}

func (tw *TimingWheel) Tick() []int64 {
    tw.currentPos = (tw.currentPos + 1) % tw.wheelSize
    // 提取并清空当前槽位
    taskIDs := extract(tw.slots[tw.currentPos])
    tw.slots[tw.currentPos] = emptySlot()
    return taskIDs
}

每次 Tick() 前进一格,返回当前槽位所有任务 ID。操作是 O(1) 的。

时间轮的局限

  1. 固定时间范围:只能管理未来 60 秒内的任务
  2. 精度受限:100ms tick 意味着最快 100ms 才能触发
  3. 无优先级概念:同一槽位内的任务没有执行顺序

这些局限正好是最小堆的强项。

最小堆实现

// engine.go
type Engine struct {
    wheel   *TimingWheel
    heap    *taskHeap           // 基于 container/heap 的最小堆
    mu      sync.Mutex
    // ...
}

type heapItem struct {
    TaskID      int64
    TriggerTime time.Time
}

type taskHeap []heapItem

func (h taskHeap) Less(i, j int) bool {
    return h[i].TriggerTime.Before(h[j].TriggerTime)
}

核心操作

func (e *Engine) AddToHeap(taskID int64, triggerTime time.Time) {
    e.mu.Lock()
    heap.Push(e.heap, heapItem{TaskID: taskID, TriggerTime: triggerTime})
    e.mu.Unlock()
}

func (h *taskHeap) PopUntil(now time.Time) []heapItem {
    var result []heapItem
    for len(*h) > 0 && (*h)[0].TriggerTime.Before(now) {
        item := heap.Pop(h).(heapItem)
        result = append(result, item)
    }
    return result
}

最小堆保证堆顶是最近到期的任务,PopUntil 一次性取出所有已到期项。插入 O(log n),弹出 O(log n)

混合引擎的协作

func (e *Engine) Start(ctx context.Context) {
    wheelTicker := time.NewTicker(e.wheel.TickDuration())  // 100ms
    heapTicker := time.NewTicker(50 * time.Millisecond)     // 50ms

    for {
        select {
        case <-wheelTicker.C:
            e.processWheelTick()   // 处理 Cron 周期任务
        case <-heapTicker.C:
            e.processHeapTick()    // 处理精确触发任务
        }
    }
}

Engine Warm:预热机制

func (e *Engine) Warm(ctx context.Context) error {
    span := e.wheel.SlotSpan()        // 60 秒
    cutoff := time.Now().Add(span)

    // 加载未来 60 秒内的 Cron 任务到时间轮
    tasks, _ := e.taskRepo.ListDueTasks(ctx, 500)
    for _, t := range tasks {
        if t.NextTriggerTime.Before(cutoff) {
            e.AddToWheel(t.ID, t.NextTriggerTime)
        }
    }

    // 加载待重试实例到最小堆
    retryInstances, _ := e.instanceRepo.ListDueRetryInstances(ctx, cutoff, 500)
    for _, inst := range retryInstances {
        e.AddToHeap(inst.TaskID, inst.NextRetryTime)
    }
    return nil
}

Warm 在 Scheduler 启动时执行一次,之后通过 WarmPeriodically 每 10 秒增量刷新。

触发后的完整链路

Rendering diagram...

关键设计取舍

为什么不用单一数据结构?

方案优点缺点
纯时间轮触发 O(1),实现简单精度受限,范围受限,无法处理优先级
纯最小堆精度高,范围无限每次扫描需遍历,大量任务时性能下降
混合兼顾两者优势实现复杂度略高,需要维护两个结构

为什么不使用延迟队列?

Redis ZSET、RabbitMQ 延迟队列等方案确实可以替代最小堆。但项目有一条设计约束:Redis 是可选的。最小堆不依赖外部中间件,保证系统在无 Redis 时仍然可以精确调度重试任务。

小结

go-ai-scheduler 的调度引擎设计体现了几条工程原则:

  1. 分层触发:Cron 走时间轮(粗、快),重试走最小堆(精、准)
  2. 不依赖外部组件:最小堆纯内存实现,Redis 可选
  3. 预热机制:启动时批量加载,运行时增量刷新
  4. 双 ticker 并行:100ms + 50ms 各自独立运行,互不阻塞

这个混合引擎的代码量不大(TimingWheel 91 行 + Engine 152 行),但涵盖了调度系统最核心的数据结构选择问题。理解它,就能理解为什么调度系统很难用单一方案解决所有场景。