Worker 高可用:心跳、本地缓冲与沙箱执行

深入 go-ai-scheduler Worker 端的设计,分析心跳机制、本地状态缓冲、去重幂等和沙箱隔离的工程实现。

问题背景

在分布式调度系统中,Worker 是最容易发生故障的环节。它可能因以下原因失联或异常:

  • 网络抖动导致心跳丢失
  • 进程 OOM 被系统杀掉
  • 任务执行脚本本身有问题(死循环、资源泄漏)
  • 调度器重启后 Worker 状态不同步

go-ai-scheduler 的 Worker 设计遵循一个核心原则:尽可能自治。Worker 不依赖调度器的实时可用性,即使调度器短暂宕机,Worker 仍能继续执行已接收的任务,并在恢复后补报状态。

Worker 生命周期

Worker 启动后的完整生命周期:

启动
  → 注册到调度器(Register)
  → 启动 HTTP/gRPC 服务
  → 启动心跳循环(10s 周期)
  → 启动去重清理循环(30s 周期)
  → 启动本地缓冲刷新循环(30s 周期)
  → 等待接收任务
// cmd/worker/main.go
client := workerapp.NewHeartbeatClient(cfg.SchedulerURL, cfg.SchedulerGRPCAddr, cfg.InternalProtocol)
workerHandler := workerapp.NewHandler(workerID, reportClient, l, workerapp.HandlerConfig{
    SandboxDir:     os.TempDir(),
    MaxMemoryBytes: 256 * 1024 * 1024,
    LocalStoreDir:  os.TempDir(),
})

// 启动本地缓冲刷新
if ls := workerHandler.LocalStore(); ls != nil {
    go ls.StartFlushLoop(ctx, 30*time.Second)
}

// 启动去重缓存清理
workerHandler.StartDedupEviction(ctx, 5*time.Minute, 30*time.Second)

// 注册到调度器
client.Register(ctx, registerReq)

// 心跳循环
ticker := time.NewTicker(10 * time.Second)
for {
    heartbeatReq := apiservice.WorkerHeartbeatRequest{
        WorkerID:     workerID,
        CurrentLoad:  runningTasks,
        RunningTasks: runningTasks,
    }
    client.Heartbeat(ctx, heartbeatReq)
    <-ticker.C
}

心跳机制

Worker 每 10 秒向调度器发送一次心跳,报告当前负载和运行中的任务数:

// internal/worker/heartbeat_client.go
func (c *HeartbeatClient) Heartbeat(ctx context.Context, req service.WorkerHeartbeatRequest) error {
    if c.protocol == "grpc" {
        return c.heartbeatGRPC(ctx, req)
    }
    return c.post(ctx, "/api/v1/workers/heartbeat", req)
}

心跳客户端同样支持 HTTP 和 gRPC 双协议,与调度器的 Dispatch Client 形成对称设计。

调度器端的 Health Checker 每 10 秒扫描一次 Worker 列表,将 30 秒内未上报心跳的 Worker 标记为离线:

// internal/scheduler/health/checker.go
count, err := c.workers.EvictStaleWorkers(ctx, c.timeout) // timeout = 30s

心跳间隔的权衡

间隔故障发现时间网络开销适用场景
1s1-3s对延迟极敏感
10s10-30s平衡(当前选择)
60s60-180s大规模集群

10 秒的间隔是一个经验值:在大多数网络环境下,10 秒 × 3 次(30 秒超时)可以有效区分"网络抖动"和"真实失联"。

去重与幂等

sync.Map 去重缓存

Worker 使用 sync.Map 维护了一个去重缓存,防止同一个任务实例被重复执行:

// internal/worker/handler.go
type Handler struct {
    dedupMap sync.Map
}

func (h *Handler) isDuplicate(scheduleID string) bool {
    if _, loaded := h.dedupMap.LoadOrStore(scheduleID, time.Now()); loaded {
        return true
    }
    return false
}

LoadOrStore 是原子操作:如果 key 已存在返回 loaded=true,不存在则写入并返回 loaded=false。这保证了在并发场景下也不会有重复执行。

TTL 清理

去重缓存不能无限增长,后台有一个清理循环:

func (h *Handler) StartDedupEviction(ctx context.Context, ttl, interval time.Duration) {
    go func() {
        ticker := time.NewTicker(interval) // 30s
        for {
            cutoff := time.Now().Add(-ttl) // 5min
            h.dedupMap.Range(func(key, value any) bool {
                if seen, ok := value.(time.Time); ok && seen.Before(cutoff) {
                    h.dedupMap.Delete(key)
                }
                return true
            })
        }
    }()
}

5 分钟的 TTL 足够覆盖大多数场景:调度器重试间隔通常是秒级,同一个实例在 5 分钟内被重复分发的概率已经很低。

幂等环境变量

Worker 将 IdempotencyKey 注入任务执行环境,让业务逻辑可以实现自身的幂等控制:

output, execErr = Execute(ctx, req.TaskType, req.Payload, req.Image, map[string]string{
    "IDEMPOTENCY_KEY":      req.IdempotencyKey,
    "SHARD_NO":             fmt.Sprintf("%d", req.ShardNo),
    "SHARD_TOTAL":          fmt.Sprintf("%d", req.ShardTotal),
    "SCHEDULE_INSTANCE_ID": req.ScheduleInstanceID,
})

<Callout type="info" title="为什么需要两层去重?"

Worker 端的 sync.Map 去重防止的是"同一个 Worker 被重复分发"。但如果调度器将同一个实例分发给了两个不同的 Worker,仅靠单个 Worker 的去重是不够的。这个场景由调度器保证——它通过数据库唯一约束确保同一个 ScheduleInstanceID 不会创建多个 TaskInstance 记录。

本地缓冲:断网续传

当 Worker 完成任务后,如果上报状态到调度器失败(网络断开、调度器宕机),会将报告写入本地磁盘:

// internal/worker/handler.go
if err := h.reporter.Report(ctx, req.SchedulerURL, statusReq); err != nil {
    if h.localStore != nil {
        h.localStore.Buffer(req.SchedulerURL, statusReq)
    }
}

本地存储的实现

// internal/worker/local_store.go
func (s *Store) Buffer(schedulerURL string, report apiservice.TaskStatusReportRequest) {
    sr := StoredReport{
        SchedulerURL: schedulerURL,
        Report:       report,
        StoredAt:     time.Now().Unix(),
    }
    data, _ := json.Marshal(sr)
    filename := filepath.Join(s.dir, report.ScheduleInstanceID+".json")
    os.WriteFile(filename, data, 0600)
}

每个报告序列化为 JSON,文件名使用 ScheduleInstanceID,保证唯一性。

后台刷新

Flush loop 每 30 秒读取目录,逐个重试投递:

func (s *Store) Flush(ctx context.Context) int {
    entries, _ := os.ReadDir(s.dir)
    var remaining int
    for _, entry := range entries {
        filename := filepath.Join(s.dir, entry.Name())
        data, _ := os.ReadFile(filename)
        var sr StoredReport
        json.Unmarshal(data, &sr)

        deliveryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
        err = s.reporter.Report(deliveryCtx, sr.SchedulerURL, sr.Report)
        cancel()
        if err != nil {
            remaining++
            continue
        }
        os.Remove(filename)
    }
    return remaining
}

<Callout type="tip" title="为什么用文件系统而不是内存队列?"

文件系统的优势在于进程重启后数据不丢失。如果 Worker 进程在缓冲了报告后崩溃,内存队列会丢失数据,而文件系统中的 JSON 文件会在重启后继续被刷新。

沙箱执行

对于 Shell 类型的任务,Worker 支持在沙箱中执行,提供工作目录隔离和资源限制:

// internal/worker/sandbox.go
type SandboxConfig struct {
    WorkDir        string
    MaxMemoryBytes int64
    MaxCPUPercent  int
    Timeout        time.Duration
}

type Sandbox struct {
    workDir string
    config  SandboxConfig
}

隔离机制

func (s *Sandbox) ShellExec(ctx context.Context, command string, extraEnv map[string]string) ([]byte, error) {
    cmd := exec.CommandContext(ctx, "bash", "-lc", command)
    cmd.Dir = s.workDir

    // 环境隔离:只保留 PATH,其余重置
    cmd.Env = []string{
        "PATH=" + os.Getenv("PATH"),
        "HOME=" + s.workDir,
        "USER=nobody",
        "TMPDIR=" + s.workDir,
        "SANDBOX=true",
    }

    // 脱离父进程组,独立管理生命周期
    cmd.SysProcAttr = &syscall.SysProcAttr{
        Setpgid: true,
    }

    applyResourceLimits(cmd, s.config)
    return cmd.CombinedOutput()
}

沙箱的几个关键点:

  1. 独立工作目录:每次创建临时目录,任务执行完后清理
  2. 环境变量隔离:只保留 PATH,重置 HOMETMPDIR
  3. 进程组隔离Setpgid: true 让子进程脱离父进程组,可以独立 kill
  4. 资源限制:通过 cgroup 限制内存和 CPU(applyResourceLimits 是平台相关实现)

沙箱的局限

当前实现是一个轻量级沙箱,不是容器级别的隔离:

  • 可以访问宿主机的文件系统(通过 PATH 中的命令)
  • 可以访问网络
  • 共享宿主机的用户空间

对于需要更强隔离的场景,建议将任务类型设为 container,由外部容器运行时(Docker/containerd)执行。

执行流程全景

一个任务到达 Worker 后的完整处理流程:

接收 HTTP/gRPC 请求
  → 解析 ExecuteTaskRequest
  → isDuplicate()? 
      ├─ 是 → 直接返回(已执行)
      └─ 否 → 继续
  → running.Add(1)
  → 上报 "running" 状态
  → 创建带超时的 context
  → 按任务类型执行
      ├─ shell + sandboxDir → 沙箱执行
      └─ 其他 → 直接执行
  → 收集输出和错误
  → 上报最终状态
      ├─ 成功 → 删除本地缓冲
      └─ 失败 → Buffer 到本地磁盘
  → running.Add(-1)

总结

Worker 的高可用设计可以概括为**"自治 + 防御"**:

机制解决的问题实现方式
心跳Worker 失联检测10s 上报,30s 超时驱逐
去重缓存重复分发导致重复执行sync.Map + 5min TTL
幂等 Key业务层幂等环境变量注入
本地缓冲网络断开导致状态丢失文件系统缓冲 + 30s 刷新
沙箱任务脚本污染/泄漏工作目录隔离 + 进程组分离

Worker 被设计为"无状态"的执行节点,除了本地缓冲目录外不持有任何持久化状态。这使得 Worker 可以随时重启、水平扩展,而不影响系统的整体可用性。

下一篇会进入 AI 服务的设计,分析 LLM Agent 的工具调用框架和推理循环。