LLM Agent 设计与工具调用框架
深入 go-ai-scheduler 的 AI 服务设计,拆解 LLM Agent 的 10 轮推理循环、12 个工具函数和 SSE 流式响应的实现。
问题背景
将大模型集成到运维系统中,最常见的两个陷阱是:
- 幻觉:LLM 编造不存在的任务状态或错误信息
- 越权:LLM 直接修改生产配置,导致事故
go-ai-scheduler 的 AI 服务通过两个设计来规避这些问题:
- 工具调用(Function Calling):LLM 不直接访问数据库,只能通过预定义的工具查询和操作
- 建议层隔离:AI 服务不触碰调度核心链路,所有操作通过 API 代理
这篇文章会拆解 Agent 的推理循环、工具注册机制和流式响应实现。
Agent 推理循环
Agent 的核心是一个最多 10 轮的工具调用循环:
// internal/ai/agent/engine.go
const maxToolIterations = 10
func Run(ctx context.Context, llm *adapter.LLMAdapter, registry *tools.Registry,
systemPrompt string, history []adapter.Message, userMessage string,
sw stream.EventWriter) (*RunResult, error) {
messages := []adapter.Message{
{Role: "system", Content: systemPrompt},
}
messages = append(messages, history...)
messages = append(messages, adapter.Message{Role: "user", Content: userMessage})
toolDefs := registry.Definitions()
for iteration := 0; iteration < maxToolIterations; iteration++ {
ch := llm.CompleteStreamWithClient(ctx, agentClient, messages, toolDefs)
var currentContent strings.Builder
var pendingToolCalls []adapter.ToolCall
// 消费 SSE 流
for ev := range ch {
if ev.Error != nil { /* ... */ }
if ev.DeltaContent != "" {
currentContent.WriteString(ev.DeltaContent)
sw.Text(ev.DeltaContent) // 实时推送到前端
}
if len(ev.ToolCalls) > 0 {
pendingToolCalls = ev.ToolCalls
}
}
if len(pendingToolCalls) > 0 {
// 执行工具,将结果反馈给 LLM
for _, tc := range pendingToolCalls {
result, err := registry.Execute(ctx, tc.Function.Name, tc.Function.Arguments)
messages = append(messages, adapter.Message{
Role: "tool",
ToolCallID: tc.ID,
Content: fmt.Sprintf("%v", result),
})
}
continue // 下一轮推理
}
// 没有工具调用,输出最终答案
sw.Done()
return &RunResult{Content: currentContent.String()}, nil
}
return nil, fmt.Errorf("max tool iterations exceeded")
}
循环结构
每一轮循环的内部流程:
LLM 生成响应
→ 如果是文本 → 流式输出给用户
→ 如果是工具调用 → 执行工具 → 将结果附加到 messages
→ 继续下一轮
这个设计参考了 OpenAI 的 Function Calling 模式,但有几个自定义扩展:
- SSE 流式输出:用户不需要等待整个循环结束,可以实时看到 LLM 的思考过程
- ** reasoning content**:支持 DeepSeek 等模型的思维链输出
- 最大轮数限制:防止 LLM 陷入无限循环(比如反复查询同一个工具)
12 个工具函数
AI Agent 注册了 12 个工具,覆盖查询、分析、操作三大类:
| 工具 | 类型 | 功能 |
|---|---|---|
query_tasks | 查询 | 按名称/状态筛选任务列表 |
query_instances | 查询 | 按状态/时间范围筛选实例 |
query_workers | 查询 | 查看 Worker 节点状态 |
get_task_detail | 查询 | 获取任务详情和依赖关系 |
get_system_health | 查询 | 系统整体健康概览 |
get_worker_load_history | 查询 | Worker 负载历史趋势 |
analyze_failure | 分析 | 分析失败实例的根因 |
create_task | 操作 | 创建新任务 |
trigger_task | 操作 | 手动触发任务 |
pause_task | 操作 | 暂停/恢复任务 |
retry_failed_instance | 操作 | 重试失败实例 |
delete_task | 操作 | 删除任务 |
工具定义的结构
每个工具都需要提供定义(Schema)和实现(Execute):
// internal/ai/tools/registry.go
type Tool interface {
Definition() adapter.Tool
Execute(ctx context.Context, args json.RawMessage) (any, error)
}
定义使用 OpenAI 兼容的 JSON Schema:
// internal/ai/tools/tools.go
func (t *queryTasksTool) Definition() adapter.Tool {
return adapter.Tool{
Type: "function",
Function: adapter.FunctionDef{
Name: "query_tasks",
Description: "查询任务列表。可按名称模糊匹配、状态筛选。",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"name": map[string]any{"type": "string", "description": "任务名称关键词"},
"status": map[string]any{"type": "string", "description": "任务状态"},
"limit": map[string]any{"type": "integer", "description": "返回数量上限"},
},
},
},
}
}
工具注册
所有工具在启动时注册到 Registry:
// internal/ai/tools/registry.go
func AllTools(bundle *repo.Bundle) []Tool {
return []Tool{
&queryTasksTool{bundle: bundle},
&queryInstancesTool{bundle: bundle},
&queryWorkersTool{bundle: bundle},
// ... 共 12 个
}
}
// 在 ai-service 启动时
registry := tools.NewRegistry(tools.AllTools(repoBundle)...)
SSE 流式响应
AI 服务使用 Server-Sent Events (SSE) 向前端推送流式响应:
// internal/ai/adapter/adapter.go
func (a *LLMAdapter) CompleteStream(ctx context.Context, messages []Message, tools []Tool) <-chan StreamEvent {
ch := make(chan StreamEvent, 8)
go a.runStream(ctx, ch, messages, tools)
return ch
}
事件类型
流式通道 emits 三种事件:
type StreamEvent struct {
DeltaContent string // 增量文本
ReasoningContent string // 思维链(DeepSeek 风格)
ToolCalls []ToolCall // 工具调用请求
FinishReason string // 完成原因
Error error
}
工具调用的流式累积
LLM 的工具调用参数可能分多个 chunk 返回。Adapter 使用 map 按 Index 累积:
accum := make(map[int]*ToolCall)
for scanner.Scan() {
// 解析 SSE data 行
for _, tc := range delta.ToolCalls {
if existing, ok := accum[tc.Index]; ok {
existing.Function.Arguments += tc.Function.Arguments
} else {
accum[tc.Index] = &tc
}
}
}
前端展示
SSE 流让前端可以实时展示 Agent 的思考过程:
- 用户提问 → 前端建立 SSE 连接
- LLM 开始推理 → 实时显示文本
- LLM 调用工具 → 显示"正在查询任务列表..."
- 工具返回结果 → LLM 继续推理
- 输出最终答案 → 关闭连接
这种交互模式比"输入 → 等待 → 输出"的传统模式更符合人类对话直觉。
系统提示词的设计
系统提示词(System Prompt)是 Agent 行为的"宪法":
const SystemPrompt = `你是一个任务调度系统的 AI 运维助手。你可以:
1. 查询任务、实例、worker 的状态
2. 分析失败原因并给出修复建议
3. 创建、暂停、恢复、触发任务
4. 提供系统健康概览
你的回复风格:
- 使用中文,简洁明了
- 当查询结果很多时,只展示关键数据并总结
- 如果发现问题,给出具体的行动建议
- 不要编造你没有查询到的数据
当用户要创建一个任务但信息不全时,主动询问缺失的信息。
当用户询问系统状态时,先调用 get_system_health 获取概览数据。
当用户询问具体任务时,先调用 query_tasks 查找,再用 get_task_detail 查看详情。`
提示词中包含了几个关键的行为约束:
- 必须查询:不要凭记忆回答,每次都要调用工具获取最新数据
- 数据截断:结果太多时只展示关键信息
- 主动询问:信息不全时不猜测,而是反问用户
- 操作顺序:先概览再详情,避免一上来就查大量数据
安全边界
AI 不触碰核心链路
AI 服务运行在独立的 ai-service 进程中,通过 HTTP API 与 api 服务通信。它没有直接访问 scheduler 内部状态的权限。
操作需要信息确认
create_task 和 delete_task 等危险操作,工具实现中有参数校验:
func (t *createTaskTool) Execute(ctx context.Context, args json.RawMessage) (any, error) {
if p.Name == "" || p.Type == "" || p.CronExpr == "" || p.Payload == "" {
return nil, fmt.Errorf("name, type, cron_expr, and payload are required")
}
// 校验 cron 表达式
if _, err := cronexpr.NextAfter(time.Now(), p.CronExpr); err != nil {
return nil, fmt.Errorf("invalid cron expression")
}
// ...
}
LLM 如果生成不合法的参数,工具会返回错误,Agent 会将错误信息反馈给 LLM 让它修正。
审计日志
所有工具调用都记录在数据库中,包括调用时间、参数、结果。运维人员可以追溯 AI 的每一次操作。
总结
go-ai-scheduler 的 LLM Agent 设计体现了"能力强大但边界清晰"的原则:
- 12 个工具覆盖查询、分析、操作全场景
- 10 轮推理循环保证复杂任务可以被分解执行
- SSE 流式响应提供良好的交互体验
- 系统提示词约束行为,防止幻觉和越权
这个 Agent 不是"万能助手",而是一个有明确能力边界的运维 copilot。它不能替代运维人员的判断,但可以大幅降低查询和分析的工作量。
下一篇会分析自然语言到任务定义的转换,以及 AI 服务在整体架构中的位置。