任务依赖与 DAG 执行
分析 go-ai-scheduler 的任务依赖系统,理解 DAG 定义、依赖检查、下游触发与可视化实现。
为什么需要任务依赖
Cron 任务适合独立的定时执行,但真实业务中任务之间往往有先后顺序:
- 数据清洗完成后才能做数据聚合
- 报表生成依赖多个上游数据源
- A/B 测试需要等流量导入完成
go-ai-scheduler 支持任务间的上游依赖定义,形成 DAG(有向无环图)执行。
依赖模型
// model/dependency.go
type TaskDependency struct {
ID int64
TaskID int64 // 下游任务
DependsOnTaskID int64 // 上游任务
CreatedAt time.Time
}
依赖关系是单向的:Task B "depends on" Task A,表示 A 必须先成功,B 才能执行。
Task-A (数据清洗) ──成功──▶ Task-B (数据聚合) ──成功──▶ Task-C (报表生成)
│ │
└── 失败 ──▶ B 等待 └── 失败 ──▶ C 等待
依赖检查时机
依赖检查发生在触发循环中,每次扫描到期任务时执行:
func (l *Loop) handleTask(ctx context.Context, task *model.Task) error {
// 1. 检查上游依赖
upstream, _ := l.taskRepo.ListUpstreamDeps(ctx, task.ID)
if len(upstream) > 0 {
allSatisfied := true
for _, depID := range upstream {
// 检查上游任务是否有成功实例
instances, _ := l.instanceRepo.ListInstancesByStatus(ctx, "success", 1)
found := false
for _, inst := range instances {
if inst.TaskID == depID {
found = true
break
}
}
if !found {
allSatisfied = false
break
}
}
if !allSatisfied {
// 延迟 5 秒再检查
task.NextTriggerTime = time.Now().Add(5 * time.Second)
l.taskRepo.UpdateTask(ctx, task)
return nil
}
}
// ... 继续执行
}
检查逻辑
- 获取当前任务的所有上游依赖任务 ID
- 对每个上游任务,查询是否存在
status = "success"的实例 - 如果全部满足,创建实例并分发
- 如果有任一不满足,推迟 5 秒再检查
DAG 执行流程
Rendering diagram...
执行时序
02:00 Task-A 触发(无依赖)
Task-A 执行中...
Task-D 触发(无依赖)
Task-D 执行中...
02:15 Task-A 成功完成
Task-D 成功完成
02:30 Task-B 触发,检查依赖:
- Task-A 有成功实例 ✓
- Task-D 有成功实例 ✓
Task-B 执行中...
03:00 Task-C 触发,检查依赖:
- Task-B 有成功实例 ✓
Task-C 执行中...
循环依赖检测
DAG 要求无环。系统在创建依赖时会检测循环:
func (r *repo) AddDependency(ctx context.Context, taskID, dependsOnID int64) error {
// 检查是否形成环
if r.wouldCreateCycle(ctx, taskID, dependsOnID) {
return fmt.Errorf("dependency would create cycle: %d -> %d", taskID, dependsOnID)
}
// ... 插入依赖记录
}
func (r *repo) wouldCreateCycle(ctx context.Context, start, target int64) bool {
visited := make(map[int64]bool)
queue := []int64{target}
for len(queue) > 0 {
curr := queue[0]
queue = queue[1:]
if curr == start {
return true // 发现环
}
if visited[curr] {
continue
}
visited[curr] = true
deps, _ := r.ListUpstreamDeps(ctx, curr)
queue = append(queue, deps...)
}
return false
}
使用 BFS 从 dependsOnID 向上游遍历,如果最终能到达 taskID,说明会形成环。
依赖与 Cron 的关系
依赖检查不改变 Cron 的触发逻辑:
- Cron 时间到达时,任务仍然被"触发"
- 但触发后先检查依赖,不满足则推迟执行
- 任务的
NextTriggerTime被推迟,下次 Cron 扫描时再次检查
Cron 触发时间: 02:30
实际执行时间: 02:45 (因为上游 02:15 才完成)
下游任务链
成功执行后,可以自动触发下游任务:
// 伪代码:任务成功后检查下游
func onTaskSuccess(ctx context.Context, instance *model.TaskInstance) {
downstream, _ := taskRepo.ListDownstreamDeps(ctx, instance.TaskID)
for _, depTaskID := range downstream {
depTask, _ := taskRepo.GetTask(ctx, depTaskID)
// 将下游任务的下次触发时间设为"现在"
depTask.NextTriggerTime = time.Now()
taskRepo.UpdateTask(ctx, depTask)
}
}
这样下游任务不需要等 Cron 时间到达,上游一成功就立即触发。
小结
任务依赖系统的设计要点:
- 简单有效:只用一张依赖表,BFS 检测循环
- 与 Cron 解耦:依赖不改变 Cron 触发逻辑,只影响实际执行时机
- 延迟检查:依赖不满足时推迟 5 秒,避免频繁查询
- 下游链式触发:上游成功可立即触发下游,减少等待
DAG 支持让 go-ai-scheduler 从"定时器"升级为"工作流引擎",能表达更复杂的业务执行顺序。