Worker 执行引擎与多任务类型
深入分析 go-ai-scheduler 的 Worker 执行模块,理解 shell、http、container 三种任务类型的实现与沙箱隔离机制。
Worker 的职责
Worker 是 go-ai-scheduler 中唯一执行任务的组件。它的职责很简单:
- 注册:启动时向 Scheduler 注册自己的地址和标签
- 心跳:定期向 Scheduler 报告存活状态
- 执行:接收 Scheduler 分发的任务,按类型执行
- 上报:执行完成后向 Scheduler 报告结果
Scheduler Worker
│ │
│── Dispatch(task) ──────────▶│
│ │── 执行 (shell/http/container)
│ │
│◀── ReportResult(result) ────│
│ │
│◀── Heartbeat() ─────────────│ (定期)
三种任务类型
Worker 通过 task.Type 字段判断执行方式,支持三种类型:
func Execute(ctx context.Context, taskType string, payload string, image string, extraEnv map[string]string) (string, error) {
switch strings.ToLower(strings.TrimSpace(taskType)) {
case "shell":
return executeShell(ctx, payload, extraEnv)
case "http":
return executeHTTP(ctx, payload, extraEnv)
case "container":
return executeContainer(ctx, payload, image)
default:
return "", fmt.Errorf("unsupported task type: %s", taskType)
}
}
Shell 任务
case "shell":
cmd := exec.CommandContext(ctx, "bash", "-lc", payload)
cmd.Env = append(cmd.Env,
"IDEMPOTENCY_KEY="+getEnv(extraEnv, "IDEMPOTENCY_KEY"),
"SHARD_NO="+getEnv(extraEnv, "SHARD_NO"),
"SHARD_TOTAL="+getEnv(extraEnv, "SHARD_TOTAL"),
"SCHEDULE_INSTANCE_ID="+getEnv(extraEnv, "SCHEDULE_INSTANCE_ID"),
)
output, err := cmd.CombinedOutput()
Shell 任务是最灵活的类型。payload 就是一段 bash 脚本,Worker 用 bash -lc 执行。额外注入的环境变量让脚本可以感知调度上下文:
| 环境变量 | 说明 |
|---|---|
IDEMPOTENCY_KEY | 幂等键,脚本可用来去重 |
SHARD_NO / SHARD_TOTAL | 分片信息,用于并行数据处理 |
SCHEDULE_INSTANCE_ID | 唯一实例 ID,用于日志追踪 |
HTTP 任务
case "http":
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, payload, nil)
if key := extraEnv["IDEMPOTENCY_KEY"]; key != "" {
req.Header.Set("X-Idempotency-Key", key)
}
resp, _ := http.DefaultClient.Do(req)
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
return string(body), fmt.Errorf("http execute status=%s", resp.Status)
}
return string(body), nil
HTTP 任务的 payload 是一个 URL,Worker 发起 GET 请求。幂等键通过 X-Idempotency-Key Header 传递。适用于调用外部 webhook 或微服务接口。
Container 任务
case "container":
if image == "" {
return "", fmt.Errorf("container task requires a non-empty image")
}
args := []string{"run", "--rm"}
if payload != "" {
args = append(args, strings.Fields(payload)...)
}
args = append(args, image)
cmd := exec.CommandContext(ctx, "docker", args...)
output, _ := cmd.CombinedOutput()
Container 任务需要指定 image 字段。Worker 本地执行 docker run --rm [payload] [image]。payload 可以作为额外的 docker run 参数(如 -e FOO=bar)。
执行上下文与超时
所有任务类型都通过 context.Context 控制超时:
// Scheduler 侧设置超时
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(task.TimeoutSeconds)*time.Second)
defer cancel()
// Worker 侧执行
output, err := worker.Execute(timeoutCtx, task.Type, task.Payload, task.Image, extraEnv)
当超时到达时,context.Cancel() 会中断:
- Shell 任务:
exec.CommandContext自动发送 SIGKILL - HTTP 任务:
http.NewRequestWithContext自动取消连接 - Container 任务:
docker run进程被终止
沙箱隔离
Worker 对每种任务类型做了不同程度的隔离:
| 任务类型 | 进程隔离 | 资源限制 | 网络隔离 | 文件系统隔离 |
|---|---|---|---|---|
| Shell | bash 子进程 | 无 | 无 | 无 |
| HTTP | http.Client 连接 | 无 | 无 | 无 |
| Container | Docker 容器 | cgroup | 可选 | 镜像级 |
资源限制(Linux)
// sandbox_resource_linux.go
func applyResourceLimits(cmd *exec.Cmd, limits ResourceLimits) error {
cmd.SysProcAttr = &syscall.SysProcAttr{
// 设置 cgroup 限制
}
return nil
}
Linux 环境下可以对 Shell 任务施加 cgroup 资源限制,防止单个任务耗尽 Worker 资源。
结果上报
Worker 执行完成后向 Scheduler 上报结果:
// HTTP 上报
resp, _ := http.Post(schedulerURL+"/internal/tasks/result", "application/json", body)
// gRPC 上报
client.ReportResult(ctx, &schedulerpb.TaskResult{
ScheduleInstanceId: instanceID,
Status: "success" / "failed",
Output: output,
ErrorMessage: errMsg,
})
Scheduler 收到结果后:
- 更新
task_instance状态 - 释放 Worker 容量(
router.Release) - 如果是失败状态,触发重试逻辑
- 如果是成功状态,检查下游依赖任务
Worker 的高可用
心跳机制
Worker 定期(默认 5 秒)向 Scheduler 发送心跳:
type HeartbeatClient struct {
schedulerURL string
workerID string
interval time.Duration
}
func (c *HeartbeatClient) Start(ctx context.Context) {
ticker := time.NewTicker(c.interval)
for {
select {
case <-ticker.C:
c.sendHeartbeat(ctx)
case <-ctx.Done():
return
}
}
}
Scheduler 维护 LastHeartbeat 时间戳,超过阈值(默认 30 秒)未收到心跳的 Worker 被标记为 offline。
Worker 本地状态
type LocalStore struct {
mu sync.RWMutex
instances map[string]*InstanceRecord // schedule_instance_id -> 记录
}
Worker 本地缓存正在执行的实例,崩溃重启后可以恢复执行状态。
小结
Worker 模块的设计体现了几个原则:
- 简单可扩展:三种任务类型覆盖绝大多数场景,新增类型只需加 case
- 上下文贯穿:
context.Context统一控制超时、取消和追踪 - 渐进式隔离:Shell 最轻量,Container 最隔离,按需选择
- 自愈能力:心跳检测 + 本地状态缓存,Worker 故障可恢复
Worker 的代码量不大(executor_run.go 76 行 + heartbeat_client.go),但它是整个调度系统的"最后一公里"——再精巧的调度决策,最终都要靠 Worker 落地执行。