Worker 高可用:心跳、本地缓冲与沙箱执行
深入 go-ai-scheduler Worker 端的设计,分析心跳机制、本地状态缓冲、去重幂等和沙箱隔离的工程实现。
问题背景
在分布式调度系统中,Worker 是最容易发生故障的环节。它可能因以下原因失联或异常:
- 网络抖动导致心跳丢失
- 进程 OOM 被系统杀掉
- 任务执行脚本本身有问题(死循环、资源泄漏)
- 调度器重启后 Worker 状态不同步
go-ai-scheduler 的 Worker 设计遵循一个核心原则:尽可能自治。Worker 不依赖调度器的实时可用性,即使调度器短暂宕机,Worker 仍能继续执行已接收的任务,并在恢复后补报状态。
Worker 生命周期
Worker 启动后的完整生命周期:
启动
→ 注册到调度器(Register)
→ 启动 HTTP/gRPC 服务
→ 启动心跳循环(10s 周期)
→ 启动去重清理循环(30s 周期)
→ 启动本地缓冲刷新循环(30s 周期)
→ 等待接收任务
// cmd/worker/main.go
client := workerapp.NewHeartbeatClient(cfg.SchedulerURL, cfg.SchedulerGRPCAddr, cfg.InternalProtocol)
workerHandler := workerapp.NewHandler(workerID, reportClient, l, workerapp.HandlerConfig{
SandboxDir: os.TempDir(),
MaxMemoryBytes: 256 * 1024 * 1024,
LocalStoreDir: os.TempDir(),
})
// 启动本地缓冲刷新
if ls := workerHandler.LocalStore(); ls != nil {
go ls.StartFlushLoop(ctx, 30*time.Second)
}
// 启动去重缓存清理
workerHandler.StartDedupEviction(ctx, 5*time.Minute, 30*time.Second)
// 注册到调度器
client.Register(ctx, registerReq)
// 心跳循环
ticker := time.NewTicker(10 * time.Second)
for {
heartbeatReq := apiservice.WorkerHeartbeatRequest{
WorkerID: workerID,
CurrentLoad: runningTasks,
RunningTasks: runningTasks,
}
client.Heartbeat(ctx, heartbeatReq)
<-ticker.C
}
心跳机制
Worker 每 10 秒向调度器发送一次心跳,报告当前负载和运行中的任务数:
// internal/worker/heartbeat_client.go
func (c *HeartbeatClient) Heartbeat(ctx context.Context, req service.WorkerHeartbeatRequest) error {
if c.protocol == "grpc" {
return c.heartbeatGRPC(ctx, req)
}
return c.post(ctx, "/api/v1/workers/heartbeat", req)
}
心跳客户端同样支持 HTTP 和 gRPC 双协议,与调度器的 Dispatch Client 形成对称设计。
调度器端的 Health Checker 每 10 秒扫描一次 Worker 列表,将 30 秒内未上报心跳的 Worker 标记为离线:
// internal/scheduler/health/checker.go
count, err := c.workers.EvictStaleWorkers(ctx, c.timeout) // timeout = 30s
心跳间隔的权衡
| 间隔 | 故障发现时间 | 网络开销 | 适用场景 |
|---|---|---|---|
| 1s | 1-3s | 高 | 对延迟极敏感 |
| 10s | 10-30s | 中 | 平衡(当前选择) |
| 60s | 60-180s | 低 | 大规模集群 |
10 秒的间隔是一个经验值:在大多数网络环境下,10 秒 × 3 次(30 秒超时)可以有效区分"网络抖动"和"真实失联"。
去重与幂等
sync.Map 去重缓存
Worker 使用 sync.Map 维护了一个去重缓存,防止同一个任务实例被重复执行:
// internal/worker/handler.go
type Handler struct {
dedupMap sync.Map
}
func (h *Handler) isDuplicate(scheduleID string) bool {
if _, loaded := h.dedupMap.LoadOrStore(scheduleID, time.Now()); loaded {
return true
}
return false
}
LoadOrStore 是原子操作:如果 key 已存在返回 loaded=true,不存在则写入并返回 loaded=false。这保证了在并发场景下也不会有重复执行。
TTL 清理
去重缓存不能无限增长,后台有一个清理循环:
func (h *Handler) StartDedupEviction(ctx context.Context, ttl, interval time.Duration) {
go func() {
ticker := time.NewTicker(interval) // 30s
for {
cutoff := time.Now().Add(-ttl) // 5min
h.dedupMap.Range(func(key, value any) bool {
if seen, ok := value.(time.Time); ok && seen.Before(cutoff) {
h.dedupMap.Delete(key)
}
return true
})
}
}()
}
5 分钟的 TTL 足够覆盖大多数场景:调度器重试间隔通常是秒级,同一个实例在 5 分钟内被重复分发的概率已经很低。
幂等环境变量
Worker 将 IdempotencyKey 注入任务执行环境,让业务逻辑可以实现自身的幂等控制:
output, execErr = Execute(ctx, req.TaskType, req.Payload, req.Image, map[string]string{
"IDEMPOTENCY_KEY": req.IdempotencyKey,
"SHARD_NO": fmt.Sprintf("%d", req.ShardNo),
"SHARD_TOTAL": fmt.Sprintf("%d", req.ShardTotal),
"SCHEDULE_INSTANCE_ID": req.ScheduleInstanceID,
})
<Callout type="info" title="为什么需要两层去重?"
Worker 端的 sync.Map 去重防止的是"同一个 Worker 被重复分发"。但如果调度器将同一个实例分发给了两个不同的 Worker,仅靠单个 Worker 的去重是不够的。这个场景由调度器保证——它通过数据库唯一约束确保同一个 ScheduleInstanceID 不会创建多个 TaskInstance 记录。
本地缓冲:断网续传
当 Worker 完成任务后,如果上报状态到调度器失败(网络断开、调度器宕机),会将报告写入本地磁盘:
// internal/worker/handler.go
if err := h.reporter.Report(ctx, req.SchedulerURL, statusReq); err != nil {
if h.localStore != nil {
h.localStore.Buffer(req.SchedulerURL, statusReq)
}
}
本地存储的实现
// internal/worker/local_store.go
func (s *Store) Buffer(schedulerURL string, report apiservice.TaskStatusReportRequest) {
sr := StoredReport{
SchedulerURL: schedulerURL,
Report: report,
StoredAt: time.Now().Unix(),
}
data, _ := json.Marshal(sr)
filename := filepath.Join(s.dir, report.ScheduleInstanceID+".json")
os.WriteFile(filename, data, 0600)
}
每个报告序列化为 JSON,文件名使用 ScheduleInstanceID,保证唯一性。
后台刷新
Flush loop 每 30 秒读取目录,逐个重试投递:
func (s *Store) Flush(ctx context.Context) int {
entries, _ := os.ReadDir(s.dir)
var remaining int
for _, entry := range entries {
filename := filepath.Join(s.dir, entry.Name())
data, _ := os.ReadFile(filename)
var sr StoredReport
json.Unmarshal(data, &sr)
deliveryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
err = s.reporter.Report(deliveryCtx, sr.SchedulerURL, sr.Report)
cancel()
if err != nil {
remaining++
continue
}
os.Remove(filename)
}
return remaining
}
<Callout type="tip" title="为什么用文件系统而不是内存队列?"
文件系统的优势在于进程重启后数据不丢失。如果 Worker 进程在缓冲了报告后崩溃,内存队列会丢失数据,而文件系统中的 JSON 文件会在重启后继续被刷新。
沙箱执行
对于 Shell 类型的任务,Worker 支持在沙箱中执行,提供工作目录隔离和资源限制:
// internal/worker/sandbox.go
type SandboxConfig struct {
WorkDir string
MaxMemoryBytes int64
MaxCPUPercent int
Timeout time.Duration
}
type Sandbox struct {
workDir string
config SandboxConfig
}
隔离机制
func (s *Sandbox) ShellExec(ctx context.Context, command string, extraEnv map[string]string) ([]byte, error) {
cmd := exec.CommandContext(ctx, "bash", "-lc", command)
cmd.Dir = s.workDir
// 环境隔离:只保留 PATH,其余重置
cmd.Env = []string{
"PATH=" + os.Getenv("PATH"),
"HOME=" + s.workDir,
"USER=nobody",
"TMPDIR=" + s.workDir,
"SANDBOX=true",
}
// 脱离父进程组,独立管理生命周期
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
applyResourceLimits(cmd, s.config)
return cmd.CombinedOutput()
}
沙箱的几个关键点:
- 独立工作目录:每次创建临时目录,任务执行完后清理
- 环境变量隔离:只保留
PATH,重置HOME、TMPDIR等 - 进程组隔离:
Setpgid: true让子进程脱离父进程组,可以独立 kill - 资源限制:通过 cgroup 限制内存和 CPU(
applyResourceLimits是平台相关实现)
沙箱的局限
当前实现是一个轻量级沙箱,不是容器级别的隔离:
- 可以访问宿主机的文件系统(通过
PATH中的命令) - 可以访问网络
- 共享宿主机的用户空间
对于需要更强隔离的场景,建议将任务类型设为 container,由外部容器运行时(Docker/containerd)执行。
执行流程全景
一个任务到达 Worker 后的完整处理流程:
接收 HTTP/gRPC 请求
→ 解析 ExecuteTaskRequest
→ isDuplicate()?
├─ 是 → 直接返回(已执行)
└─ 否 → 继续
→ running.Add(1)
→ 上报 "running" 状态
→ 创建带超时的 context
→ 按任务类型执行
├─ shell + sandboxDir → 沙箱执行
└─ 其他 → 直接执行
→ 收集输出和错误
→ 上报最终状态
├─ 成功 → 删除本地缓冲
└─ 失败 → Buffer 到本地磁盘
→ running.Add(-1)
总结
Worker 的高可用设计可以概括为**"自治 + 防御"**:
| 机制 | 解决的问题 | 实现方式 |
|---|---|---|
| 心跳 | Worker 失联检测 | 10s 上报,30s 超时驱逐 |
| 去重缓存 | 重复分发导致重复执行 | sync.Map + 5min TTL |
| 幂等 Key | 业务层幂等 | 环境变量注入 |
| 本地缓冲 | 网络断开导致状态丢失 | 文件系统缓冲 + 30s 刷新 |
| 沙箱 | 任务脚本污染/泄漏 | 工作目录隔离 + 进程组分离 |
Worker 被设计为"无状态"的执行节点,除了本地缓冲目录外不持有任何持久化状态。这使得 Worker 可以随时重启、水平扩展,而不影响系统的整体可用性。
下一篇会进入 AI 服务的设计,分析 LLM Agent 的工具调用框架和推理循环。