Worker 执行引擎与多任务类型

深入分析 go-ai-scheduler 的 Worker 执行模块,理解 shell、http、container 三种任务类型的实现与沙箱隔离机制。

Worker 的职责

Worker 是 go-ai-scheduler 中唯一执行任务的组件。它的职责很简单:

  1. 注册:启动时向 Scheduler 注册自己的地址和标签
  2. 心跳:定期向 Scheduler 报告存活状态
  3. 执行:接收 Scheduler 分发的任务,按类型执行
  4. 上报:执行完成后向 Scheduler 报告结果
Scheduler                        Worker
    │                              │
    │── Dispatch(task) ──────────▶│
    │                              │── 执行 (shell/http/container)
    │                              │
    │◀── ReportResult(result) ────│
    │                              │
    │◀── Heartbeat() ─────────────│ (定期)

三种任务类型

Worker 通过 task.Type 字段判断执行方式,支持三种类型:

func Execute(ctx context.Context, taskType string, payload string, image string, extraEnv map[string]string) (string, error) {
    switch strings.ToLower(strings.TrimSpace(taskType)) {
    case "shell":
        return executeShell(ctx, payload, extraEnv)
    case "http":
        return executeHTTP(ctx, payload, extraEnv)
    case "container":
        return executeContainer(ctx, payload, image)
    default:
        return "", fmt.Errorf("unsupported task type: %s", taskType)
    }
}

Shell 任务

case "shell":
    cmd := exec.CommandContext(ctx, "bash", "-lc", payload)
    cmd.Env = append(cmd.Env,
        "IDEMPOTENCY_KEY="+getEnv(extraEnv, "IDEMPOTENCY_KEY"),
        "SHARD_NO="+getEnv(extraEnv, "SHARD_NO"),
        "SHARD_TOTAL="+getEnv(extraEnv, "SHARD_TOTAL"),
        "SCHEDULE_INSTANCE_ID="+getEnv(extraEnv, "SCHEDULE_INSTANCE_ID"),
    )
    output, err := cmd.CombinedOutput()

Shell 任务是最灵活的类型。payload 就是一段 bash 脚本,Worker 用 bash -lc 执行。额外注入的环境变量让脚本可以感知调度上下文:

环境变量说明
IDEMPOTENCY_KEY幂等键,脚本可用来去重
SHARD_NO / SHARD_TOTAL分片信息,用于并行数据处理
SCHEDULE_INSTANCE_ID唯一实例 ID,用于日志追踪

HTTP 任务

case "http":
    req, _ := http.NewRequestWithContext(ctx, http.MethodGet, payload, nil)
    if key := extraEnv["IDEMPOTENCY_KEY"]; key != "" {
        req.Header.Set("X-Idempotency-Key", key)
    }
    resp, _ := http.DefaultClient.Do(req)
    body, _ := io.ReadAll(resp.Body)
    if resp.StatusCode >= 300 {
        return string(body), fmt.Errorf("http execute status=%s", resp.Status)
    }
    return string(body), nil

HTTP 任务的 payload 是一个 URL,Worker 发起 GET 请求。幂等键通过 X-Idempotency-Key Header 传递。适用于调用外部 webhook 或微服务接口。

Container 任务

case "container":
    if image == "" {
        return "", fmt.Errorf("container task requires a non-empty image")
    }
    args := []string{"run", "--rm"}
    if payload != "" {
        args = append(args, strings.Fields(payload)...)
    }
    args = append(args, image)
    cmd := exec.CommandContext(ctx, "docker", args...)
    output, _ := cmd.CombinedOutput()

Container 任务需要指定 image 字段。Worker 本地执行 docker run --rm [payload] [image]payload 可以作为额外的 docker run 参数(如 -e FOO=bar)。

执行上下文与超时

所有任务类型都通过 context.Context 控制超时:

// Scheduler 侧设置超时
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(task.TimeoutSeconds)*time.Second)
defer cancel()

// Worker 侧执行
output, err := worker.Execute(timeoutCtx, task.Type, task.Payload, task.Image, extraEnv)

当超时到达时,context.Cancel() 会中断:

  • Shell 任务:exec.CommandContext 自动发送 SIGKILL
  • HTTP 任务:http.NewRequestWithContext 自动取消连接
  • Container 任务:docker run 进程被终止

沙箱隔离

Worker 对每种任务类型做了不同程度的隔离:

任务类型进程隔离资源限制网络隔离文件系统隔离
Shellbash 子进程
HTTPhttp.Client 连接
ContainerDocker 容器cgroup可选镜像级

资源限制(Linux)

// sandbox_resource_linux.go
func applyResourceLimits(cmd *exec.Cmd, limits ResourceLimits) error {
    cmd.SysProcAttr = &syscall.SysProcAttr{
        // 设置 cgroup 限制
    }
    return nil
}

Linux 环境下可以对 Shell 任务施加 cgroup 资源限制,防止单个任务耗尽 Worker 资源。

结果上报

Worker 执行完成后向 Scheduler 上报结果:

// HTTP 上报
resp, _ := http.Post(schedulerURL+"/internal/tasks/result", "application/json", body)

// gRPC 上报
client.ReportResult(ctx, &schedulerpb.TaskResult{
    ScheduleInstanceId: instanceID,
    Status:            "success" / "failed",
    Output:            output,
    ErrorMessage:      errMsg,
})

Scheduler 收到结果后:

  1. 更新 task_instance 状态
  2. 释放 Worker 容量(router.Release
  3. 如果是失败状态,触发重试逻辑
  4. 如果是成功状态,检查下游依赖任务

Worker 的高可用

心跳机制

Worker 定期(默认 5 秒)向 Scheduler 发送心跳:

type HeartbeatClient struct {
    schedulerURL string
    workerID     string
    interval     time.Duration
}

func (c *HeartbeatClient) Start(ctx context.Context) {
    ticker := time.NewTicker(c.interval)
    for {
        select {
        case <-ticker.C:
            c.sendHeartbeat(ctx)
        case <-ctx.Done():
            return
        }
    }
}

Scheduler 维护 LastHeartbeat 时间戳,超过阈值(默认 30 秒)未收到心跳的 Worker 被标记为 offline

Worker 本地状态

type LocalStore struct {
    mu        sync.RWMutex
    instances map[string]*InstanceRecord  // schedule_instance_id -> 记录
}

Worker 本地缓存正在执行的实例,崩溃重启后可以恢复执行状态。

小结

Worker 模块的设计体现了几个原则:

  1. 简单可扩展:三种任务类型覆盖绝大多数场景,新增类型只需加 case
  2. 上下文贯穿context.Context 统一控制超时、取消和追踪
  3. 渐进式隔离:Shell 最轻量,Container 最隔离,按需选择
  4. 自愈能力:心跳检测 + 本地状态缓存,Worker 故障可恢复

Worker 的代码量不大(executor_run.go 76 行 + heartbeat_client.go),但它是整个调度系统的"最后一公里"——再精巧的调度决策,最终都要靠 Worker 落地执行。