AI 辅助调度:智能建议、日志分析与任务生成
深入分析 go-ai-scheduler 的 AI 辅助模块,理解 Advisor、Log Analysis、Task Parser 和 Chat Agent 的实现与边界。
AI 的设计边界
go-ai-scheduler 对 AI 有一条硬约束:AI 不允许决定任务是否运行。
允许:AI 建议"Worker-3 负载过高,建议迁移任务"
禁止:AI 直接调用 API 修改任务分发目标
允许:AI 分析"失败原因是数据库连接池耗尽"
禁止:AI 自动重启 Worker 或修改任务配置
AI 是顾问(Advisor),不是操作者(Operator)。所有 AI 输出都是建议性的,最终决策权在 Scheduler。
AI Service 架构
┌─────────────────────────────┐
│ AI Service │
│ :8083 │
└─────────────┬───────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Chat │ │ Advisor │ │ Analysis │
│ Agent │ │ Service │ │ Service │
│ (SSE/WS) │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└─────────────────────────┼─────────────────────────┘
│
┌────────▼────────┐
│ LLM Adapter │
│ (OpenAI-compat) │
└─────────────────┘
Chat Agent
Chat Agent 是对话式 AI,用户可以通过 Web Console 与调度系统对话。Agent 拥有 12 个工具函数:
var tools = []Tool{
{Name: "query_tasks", Description: "查询任务列表"},
{Name: "query_instances", Description: "查询实例列表"},
{Name: "query_workers", Description: "查询 Worker 状态"},
{Name: "get_task_detail", Description: "获取任务详情"},
{Name: "get_system_health", Description: "获取系统健康状态"},
{Name: "analyze_failure", Description: "分析失败原因"},
{Name: "create_task", Description: "创建新任务"},
{Name: "trigger_task", Description: "手动触发任务"},
{Name: "pause_task", Description: "暂停任务"},
{Name: "retry_failed_instance", Description: "重试失败实例"},
{Name: "delete_task", Description: "删除任务"},
{Name: "get_worker_load_history", Description: "获取 Worker 负载历史"},
}
工具调用循环
func (a *Agent) Chat(ctx context.Context, message string) (<-chan string, error) {
messages := []Message{{Role: "user", Content: message}}
for {
// 1. 调用 LLM
response, _ := a.llm.Complete(ctx, a.systemPrompt, messages)
// 2. 检查是否需要工具调用
if response.ToolCalls == nil {
// 直接回复,结束循环
return response.Content, nil
}
// 3. 执行工具
for _, tc := range response.ToolCalls {
result := a.executeTool(ctx, tc.Name, tc.Arguments)
messages = append(messages,
Message{Role: "assistant", ToolCalls: []ToolCall{tc}},
Message{Role: "tool", Content: result, ToolCallID: tc.ID},
)
}
// 4. 带着工具结果再次调用 LLM
}
}
典型的多轮对话:
用户: "最近有什么任务失败了?"
AI: 调用 query_instances(status="failed")
→ 返回 3 个失败实例
AI: 调用 analyze_failure(instance_id=123)
→ 返回"数据库连接超时"
AI: "最近有 3 个任务失败,其中实例 123 的失败原因是数据库连接超时,
建议检查数据库连接池配置或增加超时时间。"
流式输出
支持 SSE 和 WebSocket 两种协议:
// SSE
func (h *Handler) ChatSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
for chunk := range agent.Chat(ctx, message) {
fmt.Fprintf(w, "data: %s\n\n", chunk)
w.(http.Flusher).Flush()
}
}
// WebSocket
func (h *Handler) ChatWS(ws *websocket.Conn) {
for chunk := range agent.Chat(ctx, message) {
ws.WriteJSON(map[string]string{"content": chunk})
}
}
Advisor:智能建议
Advisor 根据系统指标生成调度优化建议:
type Context struct {
AvgWorkerLoad float64 // 平均 Worker 负载
TotalWorkers int // Worker 总数
OnlineWorkers int // 在线 Worker 数
PendingInstances int // pending 实例数
FailedLastHour int // 近一小时失败数
AvgDispatchLatencyMs float64 // 平均分发延迟
MaxPendingConfig int // 配置的最大 pending 数
}
func Generate(ctx context.Context, llm *adapter.LLMAdapter, sctx Context) ([]Advice, error) {
userPrompt := fmt.Sprintf(`
Avg worker load: %.2f%%
Total workers: %d (online: %d)
Pending instances: %d (max: %d)
Failed in last hour: %d
Avg dispatch latency: %.2f ms`,
sctx.AvgWorkerLoad*100, sctx.TotalWorkers, sctx.OnlineWorkers,
sctx.PendingInstances, sctx.MaxPendingConfig,
sctx.FailedLastHour, sctx.AvgDispatchLatencyMs)
result, _ := llm.Complete(ctx, prompts.Advisor, userPrompt)
var advices []Advice
json.Unmarshal([]byte(result), &advices)
return advices, nil
}
建议结构
type Advice struct {
Type string `json:"type"` // "throttle" / "migrate" / "scale"
Title string `json:"title"`
Description string `json:"description"`
Confidence float64 `json:"confidence"` // 0.0 - 1.0
AutoApply bool `json:"auto_apply"` // 是否可自动应用
}
示例输出:
[
{
"type": "scale",
"title": "建议扩容 Worker",
"description": "平均负载 85%, pending 实例 800/1000,建议增加 2 个 Worker",
"confidence": 0.92,
"auto_apply": false
}
]
Log Analysis:失败根因分析
当任务失败时,AI 自动分析日志:
type logAnalysisRequest struct {
Log string `json:"log"`
ErrorCode string `json:"error_code"`
TaskType string `json:"task_type"`
RetryCount int `json:"retry_count"`
InstanceID int64 `json:"instance_id,omitempty"`
}
分析结果存储在 task_instance.AnalysisJSON 字段:
{
"root_cause": "数据库连接池耗尽",
"error_pattern": "too many connections",
"suggested_action": "增加 max_connections 或引入连接池复用",
"severity": "high",
"retry_recommendation": "建议等待 5 分钟后重试"
}
Task Parser:自然语言生成任务
用户可以用自然语言描述任务,AI 将其转换为任务定义:
用户: "每天凌晨 2 点清理 /tmp 目录下 7 天前的日志文件"
AI 输出:
{
"name": "清理临时日志",
"type": "shell",
"cron_expr": "0 2 * * *",
"payload": "find /tmp -name '*.log' -mtime +7 -delete",
"timeout_seconds": 300,
"max_retry": 3
}
限流与降级
速率限制
var rl *ratelimit.TokenBucket
if rateLimitRPM > 0 {
rl = ratelimit.NewTokenBucket(rateLimitRPM/60, rateLimitRPM)
}
// 每个 LLM 请求前检查
if rl != nil && !rl.Allow() {
writeJSON(w, http.StatusTooManyRequests, map[string]string{"error": "rate limit exceeded"})
return
}
}
LLM 不可用降级
func Generate(ctx context.Context, llm *adapter.LLMAdapter, sctx Context) ([]Advice, error) {
if llm == nil || !llm.Enabled() {
return nil, ErrLLMRequired // 返回错误,调用方决定如何处理
}
// ...
}
当 LLM 不可用时,AI Service 返回 ErrLLMRequired,调用方可以选择:
- 返回错误给客户端
- 使用预设的启发式规则替代
- 记录日志并跳过
小结
AI 辅助模块的设计要点:
- 边界清晰:AI 只建议不决策,所有操作需人工或 Scheduler 确认
- 工具化:Chat Agent 通过 12 个工具函数与系统交互,而非直接操作数据库
- 流式体验:SSE/WebSocket 支持实时输出,提升交互体验
- 限流保护:Token Bucket 防止 LLM 费用失控
- 优雅降级:LLM 不可用时系统仍然可用
AI 不是调度系统的必需品,但它让运维人员从"看日志找问题"变成"问 AI 拿建议",显著降低了问题排查的门槛。