AI 辅助调度:智能建议、日志分析与任务生成

深入分析 go-ai-scheduler 的 AI 辅助模块,理解 Advisor、Log Analysis、Task Parser 和 Chat Agent 的实现与边界。

AI 的设计边界

go-ai-scheduler 对 AI 有一条硬约束:AI 不允许决定任务是否运行

允许:AI 建议"Worker-3 负载过高,建议迁移任务"
禁止:AI 直接调用 API 修改任务分发目标

允许:AI 分析"失败原因是数据库连接池耗尽"
禁止:AI 自动重启 Worker 或修改任务配置

AI 是顾问(Advisor),不是操作者(Operator)。所有 AI 输出都是建议性的,最终决策权在 Scheduler。

AI Service 架构

                    ┌─────────────────────────────┐
                    │        AI Service           │
                    │           :8083             │
                    └─────────────┬───────────────┘
                                  │
        ┌─────────────────────────┼─────────────────────────┐
        │                         │                         │
 ┌──────▼──────┐          ┌──────▼──────┐          ┌──────▼──────┐
 │   Chat      │          │   Advisor   │          │   Analysis  │
 │   Agent     │          │   Service   │          │   Service   │
 │  (SSE/WS)   │          │             │          │             │
 └─────────────┘          └─────────────┘          └─────────────┘
        │                         │                         │
        └─────────────────────────┼─────────────────────────┘
                                  │
                         ┌────────▼────────┐
                         │   LLM Adapter   │
                         │ (OpenAI-compat) │
                         └─────────────────┘

Chat Agent

Chat Agent 是对话式 AI,用户可以通过 Web Console 与调度系统对话。Agent 拥有 12 个工具函数:

var tools = []Tool{
    {Name: "query_tasks",        Description: "查询任务列表"},
    {Name: "query_instances",    Description: "查询实例列表"},
    {Name: "query_workers",      Description: "查询 Worker 状态"},
    {Name: "get_task_detail",    Description: "获取任务详情"},
    {Name: "get_system_health",  Description: "获取系统健康状态"},
    {Name: "analyze_failure",    Description: "分析失败原因"},
    {Name: "create_task",        Description: "创建新任务"},
    {Name: "trigger_task",       Description: "手动触发任务"},
    {Name: "pause_task",         Description: "暂停任务"},
    {Name: "retry_failed_instance", Description: "重试失败实例"},
    {Name: "delete_task",        Description: "删除任务"},
    {Name: "get_worker_load_history", Description: "获取 Worker 负载历史"},
}

工具调用循环

func (a *Agent) Chat(ctx context.Context, message string) (<-chan string, error) {
    messages := []Message{{Role: "user", Content: message}}

    for {
        // 1. 调用 LLM
        response, _ := a.llm.Complete(ctx, a.systemPrompt, messages)

        // 2. 检查是否需要工具调用
        if response.ToolCalls == nil {
            // 直接回复,结束循环
            return response.Content, nil
        }

        // 3. 执行工具
        for _, tc := range response.ToolCalls {
            result := a.executeTool(ctx, tc.Name, tc.Arguments)
            messages = append(messages,
                Message{Role: "assistant", ToolCalls: []ToolCall{tc}},
                Message{Role: "tool", Content: result, ToolCallID: tc.ID},
            )
        }
        // 4. 带着工具结果再次调用 LLM
    }
}

典型的多轮对话:

用户: "最近有什么任务失败了?"
AI: 调用 query_instances(status="failed")
    → 返回 3 个失败实例
AI: 调用 analyze_failure(instance_id=123)
    → 返回"数据库连接超时"
AI: "最近有 3 个任务失败,其中实例 123 的失败原因是数据库连接超时,
       建议检查数据库连接池配置或增加超时时间。"

流式输出

支持 SSE 和 WebSocket 两种协议:

// SSE
func (h *Handler) ChatSSE(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    for chunk := range agent.Chat(ctx, message) {
        fmt.Fprintf(w, "data: %s\n\n", chunk)
        w.(http.Flusher).Flush()
    }
}

// WebSocket
func (h *Handler) ChatWS(ws *websocket.Conn) {
    for chunk := range agent.Chat(ctx, message) {
        ws.WriteJSON(map[string]string{"content": chunk})
    }
}

Advisor:智能建议

Advisor 根据系统指标生成调度优化建议:

type Context struct {
    AvgWorkerLoad        float64  // 平均 Worker 负载
    TotalWorkers         int      // Worker 总数
    OnlineWorkers        int      // 在线 Worker 数
    PendingInstances     int      // pending 实例数
    FailedLastHour       int      // 近一小时失败数
    AvgDispatchLatencyMs float64  // 平均分发延迟
    MaxPendingConfig     int      // 配置的最大 pending 数
}

func Generate(ctx context.Context, llm *adapter.LLMAdapter, sctx Context) ([]Advice, error) {
    userPrompt := fmt.Sprintf(`
Avg worker load: %.2f%%
Total workers: %d (online: %d)
Pending instances: %d (max: %d)
Failed in last hour: %d
Avg dispatch latency: %.2f ms`,
        sctx.AvgWorkerLoad*100, sctx.TotalWorkers, sctx.OnlineWorkers,
        sctx.PendingInstances, sctx.MaxPendingConfig,
        sctx.FailedLastHour, sctx.AvgDispatchLatencyMs)

    result, _ := llm.Complete(ctx, prompts.Advisor, userPrompt)
    var advices []Advice
    json.Unmarshal([]byte(result), &advices)
    return advices, nil
}

建议结构

type Advice struct {
    Type        string   `json:"type"`        // "throttle" / "migrate" / "scale"
    Title       string   `json:"title"`
    Description string   `json:"description"`
    Confidence  float64  `json:"confidence"`   // 0.0 - 1.0
    AutoApply   bool     `json:"auto_apply"`   // 是否可自动应用
}

示例输出:

[
  {
    "type": "scale",
    "title": "建议扩容 Worker",
    "description": "平均负载 85%, pending 实例 800/1000,建议增加 2 个 Worker",
    "confidence": 0.92,
    "auto_apply": false
  }
]

Log Analysis:失败根因分析

当任务失败时,AI 自动分析日志:

type logAnalysisRequest struct {
    Log        string `json:"log"`
    ErrorCode  string `json:"error_code"`
    TaskType   string `json:"task_type"`
    RetryCount int    `json:"retry_count"`
    InstanceID int64  `json:"instance_id,omitempty"`
}

分析结果存储在 task_instance.AnalysisJSON 字段:

{
  "root_cause": "数据库连接池耗尽",
  "error_pattern": "too many connections",
  "suggested_action": "增加 max_connections 或引入连接池复用",
  "severity": "high",
  "retry_recommendation": "建议等待 5 分钟后重试"
}

Task Parser:自然语言生成任务

用户可以用自然语言描述任务,AI 将其转换为任务定义:

用户: "每天凌晨 2 点清理 /tmp 目录下 7 天前的日志文件"

AI 输出:
{
  "name": "清理临时日志",
  "type": "shell",
  "cron_expr": "0 2 * * *",
  "payload": "find /tmp -name '*.log' -mtime +7 -delete",
  "timeout_seconds": 300,
  "max_retry": 3
}

限流与降级

速率限制

var rl *ratelimit.TokenBucket
if rateLimitRPM > 0 {
    rl = ratelimit.NewTokenBucket(rateLimitRPM/60, rateLimitRPM)
}

// 每个 LLM 请求前检查
if rl != nil && !rl.Allow() {
    writeJSON(w, http.StatusTooManyRequests, map[string]string{"error": "rate limit exceeded"})
    return
}
}

LLM 不可用降级

func Generate(ctx context.Context, llm *adapter.LLMAdapter, sctx Context) ([]Advice, error) {
    if llm == nil || !llm.Enabled() {
        return nil, ErrLLMRequired  // 返回错误,调用方决定如何处理
    }
    // ...
}

当 LLM 不可用时,AI Service 返回 ErrLLMRequired,调用方可以选择:

  • 返回错误给客户端
  • 使用预设的启发式规则替代
  • 记录日志并跳过

小结

AI 辅助模块的设计要点:

  1. 边界清晰:AI 只建议不决策,所有操作需人工或 Scheduler 确认
  2. 工具化:Chat Agent 通过 12 个工具函数与系统交互,而非直接操作数据库
  3. 流式体验:SSE/WebSocket 支持实时输出,提升交互体验
  4. 限流保护:Token Bucket 防止 LLM 费用失控
  5. 优雅降级:LLM 不可用时系统仍然可用

AI 不是调度系统的必需品,但它让运维人员从"看日志找问题"变成"问 AI 拿建议",显著降低了问题排查的门槛。