任务依赖与 DAG 执行

分析 go-ai-scheduler 的任务依赖系统,理解 DAG 定义、依赖检查、下游触发与可视化实现。

为什么需要任务依赖

Cron 任务适合独立的定时执行,但真实业务中任务之间往往有先后顺序:

  • 数据清洗完成后才能做数据聚合
  • 报表生成依赖多个上游数据源
  • A/B 测试需要等流量导入完成

go-ai-scheduler 支持任务间的上游依赖定义,形成 DAG(有向无环图)执行。

依赖模型

// model/dependency.go
type TaskDependency struct {
    ID           int64
    TaskID       int64   // 下游任务
    DependsOnTaskID int64 // 上游任务
    CreatedAt    time.Time
}

依赖关系是单向的:Task B "depends on" Task A,表示 A 必须先成功,B 才能执行。

Task-A (数据清洗) ──成功──▶ Task-B (数据聚合) ──成功──▶ Task-C (报表生成)
       │                               │
       └── 失败 ──▶ B 等待             └── 失败 ──▶ C 等待

依赖检查时机

依赖检查发生在触发循环中,每次扫描到期任务时执行:

func (l *Loop) handleTask(ctx context.Context, task *model.Task) error {
    // 1. 检查上游依赖
    upstream, _ := l.taskRepo.ListUpstreamDeps(ctx, task.ID)
    if len(upstream) > 0 {
        allSatisfied := true
        for _, depID := range upstream {
            // 检查上游任务是否有成功实例
            instances, _ := l.instanceRepo.ListInstancesByStatus(ctx, "success", 1)
            found := false
            for _, inst := range instances {
                if inst.TaskID == depID {
                    found = true
                    break
                }
            }
            if !found {
                allSatisfied = false
                break
            }
        }
        if !allSatisfied {
            // 延迟 5 秒再检查
            task.NextTriggerTime = time.Now().Add(5 * time.Second)
            l.taskRepo.UpdateTask(ctx, task)
            return nil
        }
    }
    // ... 继续执行
}

检查逻辑

  1. 获取当前任务的所有上游依赖任务 ID
  2. 对每个上游任务,查询是否存在 status = "success" 的实例
  3. 如果全部满足,创建实例并分发
  4. 如果有任一不满足,推迟 5 秒再检查

DAG 执行流程

Rendering diagram...

执行时序

02:00  Task-A 触发(无依赖)
       Task-A 执行中...
       Task-D 触发(无依赖)
       Task-D 执行中...

02:15  Task-A 成功完成
       Task-D 成功完成

02:30  Task-B 触发,检查依赖:
       - Task-A 有成功实例 ✓
       - Task-D 有成功实例 ✓
       Task-B 执行中...

03:00  Task-C 触发,检查依赖:
       - Task-B 有成功实例 ✓
       Task-C 执行中...

循环依赖检测

DAG 要求无环。系统在创建依赖时会检测循环:

func (r *repo) AddDependency(ctx context.Context, taskID, dependsOnID int64) error {
    // 检查是否形成环
    if r.wouldCreateCycle(ctx, taskID, dependsOnID) {
        return fmt.Errorf("dependency would create cycle: %d -> %d", taskID, dependsOnID)
    }
    // ... 插入依赖记录
}

func (r *repo) wouldCreateCycle(ctx context.Context, start, target int64) bool {
    visited := make(map[int64]bool)
    queue := []int64{target}
    for len(queue) > 0 {
        curr := queue[0]
        queue = queue[1:]
        if curr == start {
            return true // 发现环
        }
        if visited[curr] {
            continue
        }
        visited[curr] = true
        deps, _ := r.ListUpstreamDeps(ctx, curr)
        queue = append(queue, deps...)
    }
    return false
}

使用 BFS 从 dependsOnID 向上游遍历,如果最终能到达 taskID,说明会形成环。

依赖与 Cron 的关系

依赖检查不改变 Cron 的触发逻辑:

  • Cron 时间到达时,任务仍然被"触发"
  • 但触发后先检查依赖,不满足则推迟执行
  • 任务的 NextTriggerTime 被推迟,下次 Cron 扫描时再次检查
Cron 触发时间: 02:30
实际执行时间: 02:45 (因为上游 02:15 才完成)

下游任务链

成功执行后,可以自动触发下游任务:

// 伪代码:任务成功后检查下游
func onTaskSuccess(ctx context.Context, instance *model.TaskInstance) {
    downstream, _ := taskRepo.ListDownstreamDeps(ctx, instance.TaskID)
    for _, depTaskID := range downstream {
        depTask, _ := taskRepo.GetTask(ctx, depTaskID)
        // 将下游任务的下次触发时间设为"现在"
        depTask.NextTriggerTime = time.Now()
        taskRepo.UpdateTask(ctx, depTask)
    }
}

这样下游任务不需要等 Cron 时间到达,上游一成功就立即触发。

小结

任务依赖系统的设计要点:

  1. 简单有效:只用一张依赖表,BFS 检测循环
  2. 与 Cron 解耦:依赖不改变 Cron 触发逻辑,只影响实际执行时机
  3. 延迟检查:依赖不满足时推迟 5 秒,避免频繁查询
  4. 下游链式触发:上游成功可立即触发下游,减少等待

DAG 支持让 go-ai-scheduler 从"定时器"升级为"工作流引擎",能表达更复杂的业务执行顺序。