混合调度引擎:时间轮 + 最小堆
深入 go-ai-scheduler 的调度引擎实现,剖析时间轮与最小堆如何在 Go 中协同工作,实现低延迟、高精度的任务触发。
问题背景
在 go-ai-scheduler 的调度器中,最核心的需求是:在大量定时任务中快速找出当前需要触发的任务。如果依赖数据库轮询,假设每 100ms 查询一次、每次查询 10000 条任务,数据库很快会成为瓶颈。
业界常见的解决方案有几种:
| 方案 | 优点 | 缺点 |
|---|---|---|
| 数据库轮询 | 实现简单 | 性能差,精度受限于轮询间隔 |
| 最小堆 | 精确到毫秒级 | 大量任务时堆调整开销大 |
| 时间轮 | 触发极快 O(1) | 只能覆盖固定时间窗口,精度受 tick 限制 |
| 层级时间轮 | 覆盖任意时间范围 | 实现复杂,需要多级跳转 |
go-ai-scheduler 选择了一个务实的折中:单层时间轮处理高频周期性任务,最小堆处理精确触发和重试任务。这个混合引擎的代码不到 250 行,但设计上有不少值得细品的地方。
时间轮的实现
时间轮的本质是一个环形缓冲区。每个槽位对应一个时间 tick,任务按触发时间取模后落入对应槽位。
// internal/scheduler/engine/timing_wheel.go
const (
defaultTickDuration = 100 * time.Millisecond
defaultWheelSize = 600 // 60 seconds with 100ms ticks
)
type TimingWheel struct {
mu sync.Mutex
slots []slot
tickDuration time.Duration
wheelSize int
currentPos int
}
type slot struct {
taskIDs map[int64]struct{}
}
任务入槽
任务按触发时间的纳秒数除以 tick 时长取模,决定落入哪个槽位:
func (tw *TimingWheel) Add(taskID int64, triggerTime time.Time) {
tw.mu.Lock()
defer tw.mu.Unlock()
ticks := int(triggerTime.UnixNano() / tw.tickDuration.Nanoseconds())
pos := ticks % tw.wheelSize
tw.slots[pos].taskIDs[taskID] = struct{}{}
}
这里有一个隐含的假设:所有任务的触发时间都在时间轮覆盖的 60 秒窗口内。对于超出窗口的任务(比如 5 分钟后才触发的),调度器不会在启动时就加入时间轮,而是等引擎每 10 秒预热时重新评估。
Tick 推进
每次 tick 只处理当前槽位,提取任务后清空槽位:
func (tw *TimingWheel) Tick() []int64 {
tw.mu.Lock()
tw.currentPos = (tw.currentPos + 1) % tw.wheelSize
cur := tw.slots[tw.currentPos]
taskIDs := make([]int64, 0, len(cur.taskIDs))
for id := range cur.taskIDs {
taskIDs = append(taskIDs, id)
}
tw.slots[tw.currentPos].taskIDs = make(map[int64]struct{})
tw.mu.Unlock()
return taskIDs
}
注意 Tick() 的设计非常克制:
- 只提取,不触发:返回任务 ID 列表,由上层决定如何处理(创建实例、选 Worker、分发)
- 清空槽位:防止同一个任务被重复触发
- 锁粒度小:只保护槽位操作,不阻塞触发回调
最小堆的引入
时间轮有两个天然限制:
- 精度限制:100ms 的 tick 意味着任务触发时间会被"对齐"到最近的 tick
- 范围限制:只能覆盖 60 秒的窗口
对于重试任务(可能需要 3 秒后精确触发)和短期一次性任务,最小堆是更好的选择。
// internal/scheduler/engine/engine.go
type Engine struct {
wheel *TimingWheel
heap *taskHeap
mu sync.Mutex
OnTrigger func(taskID int64)
}
func (e *Engine) Start(ctx context.Context) {
wheelTicker := time.NewTicker(e.wheel.TickDuration())
heapTicker := time.NewTicker(50 * time.Millisecond)
for {
select {
case <-wheelTicker.C:
e.processWheelTick()
case <-heapTicker.C:
e.processHeapTick()
}
}
}
堆的扫描频率是 50ms,精度比时间轮高一倍。堆中存储的是 triggerTime 小于当前时间的所有任务:
func (e *Engine) processHeapTick() {
e.mu.Lock()
items := e.heap.PopUntil(time.Now())
e.mu.Unlock()
for _, item := range items {
if e.OnTrigger != nil {
e.OnTrigger(item.TaskID)
}
}
}
预热机制:从数据库到内存
引擎启动时不会立即工作,而是先执行一次 Warm(),从数据库加载数据:
func (e *Engine) Warm(ctx context.Context) error {
span := e.wheel.SlotSpan() // 60 seconds
cutoff := time.Now().Add(span)
// 加载未来 60 秒内的 cron 任务到时间轮
tasks, err := e.taskRepo.ListDueTasks(ctx, 500)
for _, t := range tasks {
if t.NextTriggerTime.Before(cutoff) {
e.AddToWheel(t.ID, t.NextTriggerTime)
}
}
// 加载重试任务到最小堆
retryInstances, err := e.instanceRepo.ListDueRetryInstances(ctx, cutoff, 500)
for _, inst := range retryInstances {
if !inst.NextRetryTime.IsZero() && inst.NextRetryTime.Before(cutoff) {
e.AddToHeap(inst.TaskID, inst.NextRetryTime)
}
}
return nil
}
预热有两个设计细节:
- 批量加载:每次最多 500 条,防止大数据量拖慢启动
- 双数据源:cron 任务进时间轮,重试任务进最小堆,各司其职
预热完成后,后台还有一个 10 秒周期的 WarmPeriodically loop,持续补充新进入窗口的任务。
两种引擎的协作模式
| 维度 | 时间轮 | 最小堆 |
|---|---|---|
| 扫描间隔 | 100ms | 50ms |
| 适用场景 | 周期性 cron 任务 | 重试、短期精确任务 |
| 插入复杂度 | O(1) | O(log n) |
| 查询复杂度 | O(1) | O(log n) |
| 时间覆盖 | 60 秒 | 任意 |
| 精度 | 100ms | 50ms |
踩坑:时间轮的边界情况
实现时间轮时遇到过几个实际问题。
任务更新后的槽位清理
当用户修改了任务的 cron 表达式,旧槽位上的任务 ID 需要被移除:
func (tw *TimingWheel) Remove(taskID int64) {
tw.mu.Lock()
defer tw.mu.Unlock()
for i := range tw.slots {
delete(tw.slots[i].taskIDs, taskID)
}
}
当前实现是全量扫描所有槽位。600 个槽位 × 每个槽位平均少量任务,这个开销可以接受。如果槽位数量增加到数万,就需要建立反向索引(taskID → slotPos)来优化。
任务触发后下次入轮
时间轮只覆盖 60 秒,一个 cron 任务触发后,下次触发时间可能是 1 小时后。此时不能直接 AddToWheel,而是等下一次预热周期:
// cmd/scheduler/main.go
nextTrigger, err := cronexpr.NextAfter(time.Now(), task.CronExpr)
if err == nil && !nextTrigger.IsZero() {
task.NextTriggerTime = nextTrigger
resources.Repositories.Task.UpdateTask(leaderCtx, task)
schedEngine.AddToWheel(task.ID, nextTrigger)
}
这里其实有一个微妙的 race condition:如果更新数据库成功但 AddToWheel 失败(比如引擎刚好在 tick),任务会丢失直到下次预热。生产环境中可以增加一个兜底检查:每次预热时对比数据库中 NextTriggerTime 已过期但时间轮中不存在的任务。
与 Redis 缓存的协同
在更大规模的部署中,预热循环可以从 Redis 读取而不是直接查 MySQL:
// internal/scheduler/cache/manager.go
func (m *Manager) warmDueTasks(ctx context.Context) {
tasks, err := m.taskRepo.ListDueTasks(ctx, 500)
// 将任务 ID 和触发时间写入 Redis Sorted Set
m.redis.WarmDueTasks(ctx, ids, scores)
}
Redis 在这里扮演的是读缓存角色。即使 Redis 不可用,调度器也会优雅降级到直接查询 MySQL。这种 optional 设计让系统在小规模部署时不需要额外的基础设施。
总结
go-ai-scheduler 的混合引擎是一个典型的**"够用就好"**设计:
- 时间轮负责高频、粗粒度的周期性触发,O(1) 的查询性能让它可以轻松处理数千任务的并发场景
- 最小堆补充精确触发和重试场景,50ms 的扫描间隔提供了足够的精度
- 预热机制桥接了持久化存储与内存引擎,10 秒的刷新周期在实时性和数据库负载之间取得了平衡
这个设计的关键洞察是:不需要一个完美的数据结构解决所有问题,而是让两个 imperfect 的结构在各自擅长的领域协同工作。
下一篇会分析调度器的分发链路,包括负载感知路由、背压控制,以及 HTTP/gRPC 双协议透明的实现。