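Monitoring and Debugging Agent Systems

An in-depth look at monitoring and debugging techniques for Agent systems, covering metric design, distributed tracing, real-time debugging tools, and performance optimization.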

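Overview and Motivation

In Agent systems driven by large language models, monitoring and debugging are essential to stability, performance, and reliability. Unlike traditional software, Agent systems are non-deterministic, have complex interaction chains, and behave dynamically, which strains conventional monitoring and debugging methods. An Agent's output can vary because of model randomness, multi-Agent collaboration involves intricate interaction patterns, and context dependence makes error propagation hard to trace.

An effective monitoring system captures the runtime state of an Agent system along several dimensions: performance metrics, behavior patterns, resource usage, and business outcomes. Performance metrics expose bottlenecks, behavior monitoring surfaces anomalies, resource monitoring keeps the system running stably, and business-outcome monitoring verifies that the system is actually effective. This data serves problem diagnosis and also informs optimization and decision-making.

Debugging Agent systems requires dedicated tools and methods. Traditional breakpoint debugging is of limited use in a distributed, asynchronous Agent environment. What is needed instead are debugging methods built on logs, traces, and state monitoring that can reproduce a problem scenario, analyze the execution path, and pinpoint the root cause. Debugging must also account for the uncertainty of model behavior, which means reasoning about system behavior probabilistically.

This article works through monitoring and debugging techniques for Agent systems, from the underlying architecture to concrete implementations and practices, so that readers can build Agent systems that are observable and easy to debug.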

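Core Concepts and Architecture Design

Monitoring System Architecture

Monitoring an Agent system calls for a layered architecture that covers everything from infrastructure to business logic. The diagram below shows the complete monitoring architecture:

[Diagram: layered monitoring system architecture]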

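Core Monitoring Metric Design

Monitoring metrics for an Agent system need to cover several dimensions to give a full picture of system state:

Performance metrics: measure how the system performs, including response time, throughput, and resource utilization. Key metrics include:

  • Response time: P50, P95, and P99 latency, broken down by request type
  • Throughput: requests per second (RPS), requests per minute (RPM)
  • Resource utilization: CPU usage, memory usage, network bandwidth, GPU utilization
  • Concurrency: active connections, concurrent requests, queue length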
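
As a rough illustration of the latency percentiles listed above, the sketch below computes P50, P95, and P99 with the nearest-rank method. The `percentile` helper and the sample latencies are illustrative assumptions, not part of any particular monitoring library.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], q: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least q% of samples at or below it."""
    if not samples:
        raise ValueError("percentile() needs at least one sample")
    if not 0 < q <= 100:
        raise ValueError("q must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(q / 100 * len(ordered))  # 1-based rank into the sorted samples
    return ordered[rank - 1]


# Hypothetical latencies (seconds) for one request type
latencies = [0.12, 0.15, 0.11, 0.90, 0.14, 0.13, 0.16, 2.40, 0.12, 0.18]
summary = {f"p{q}": percentile(latencies, q) for q in (50, 95, 99)}
```

Nearest-rank always returns an observed sample, so a tail value such as P99 can be traced back to a concrete request. Interpolating methods would give slightly different numbers.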

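Business metrics: measure how much the system contributes to business goals, including task completion rate, output quality, and user satisfaction:

  • Task completion rate: the share of all tasks that complete successfully
  • Output quality: quality scores based on user feedback and automated evaluation
  • User satisfaction: user ratings, feedback sentiment, retention
  • Cost efficiency: cost per task, resource usage efficiency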

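Model metrics: monitor the performance and behavior of the model itself:

  • Inference time: the mean and distribution of model inference time
  • Token usage: input tokens, output tokens, total tokens
  • Cost: cost per inference, overall cost trend
  • Model selection: how usage is distributed across models and how each performs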
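
The token and cost metrics above can be derived from per-call token counts. The sketch below is one possible way to do this; the `ModelPrice` type, the model names, and every price in `PRICES` are hypothetical placeholders rather than real provider pricing.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass(frozen=True)
class ModelPrice:
    input_per_1k: float    # price per 1,000 input tokens
    output_per_1k: float   # price per 1,000 output tokens


# Hypothetical price table; real prices depend on the provider and model
PRICES: Dict[str, ModelPrice] = {
    "small-model": ModelPrice(input_per_1k=0.5, output_per_1k=1.5),
    "large-model": ModelPrice(input_per_1k=5.0, output_per_1k=15.0),
}


def inference_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one call: tokens are billed per thousand, separately for input and output."""
    price = PRICES[model]
    return (input_tokens / 1000 * price.input_per_1k
            + output_tokens / 1000 * price.output_per_1k)


def summarize_usage(calls: Iterable[Tuple[str, int, int]]) -> Dict[str, dict]:
    """Aggregate call count, token totals, and cost per model."""
    totals: Dict[str, dict] = defaultdict(
        lambda: {"calls": 0, "input_tokens": 0, "output_tokens": 0, "cost": 0.0}
    )
    for model, input_tokens, output_tokens in calls:
        entry = totals[model]
        entry["calls"] += 1
        entry["input_tokens"] += input_tokens
        entry["output_tokens"] += output_tokens
        entry["cost"] += inference_cost(model, input_tokens, output_tokens)
    return dict(totals)


# (model, input tokens, output tokens) for three hypothetical calls
calls = [("small-model", 2000, 1000), ("large-model", 1000, 200), ("small-model", 4000, 0)]
usage = summarize_usage(calls)
```

Grouping by model gives both the usage distribution and the cost trend from the same records.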

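System health metrics: assess the overall health of the system:

  • Availability: the proportion of time the system is running normally
  • Error rate: the share of all requests that fail
  • Dependency health: the health of external dependencies (databases, APIs)
  • Capacity: the ratio of current load to system capacity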
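
The ratios in this list are simple to compute once the raw counts are available. The following sketch assumes a hypothetical `HealthWindow` record for one observation window; the threshold defaults in `status` are illustrative and would need tuning for a real system.

```python
from dataclasses import dataclass


@dataclass
class HealthWindow:
    total_requests: int
    failed_requests: int
    uptime_seconds: float
    window_seconds: float
    current_load: float    # e.g. concurrent requests right now
    max_capacity: float    # e.g. concurrent requests the system can sustain

    @property
    def error_rate(self) -> float:
        # Failed requests as a share of all requests
        return self.failed_requests / self.total_requests if self.total_requests else 0.0

    @property
    def availability(self) -> float:
        # Share of the window during which the system was up
        return self.uptime_seconds / self.window_seconds if self.window_seconds else 0.0

    @property
    def capacity_used(self) -> float:
        # Current load relative to what the system can sustain
        return self.current_load / self.max_capacity if self.max_capacity else 0.0

    def status(self, max_error_rate: float = 0.01, min_availability: float = 0.999,
               max_capacity_used: float = 0.8) -> str:
        ok = (self.error_rate <= max_error_rate
              and self.availability >= min_availability
              and self.capacity_used <= max_capacity_used)
        return "healthy" if ok else "degraded"


window = HealthWindow(total_requests=2000, failed_requests=10,
                      uptime_seconds=3598.0, window_seconds=3600.0,
                      current_load=45, max_capacity=50)
```

In this example the error rate and availability are within bounds, but 90% capacity use exceeds the assumed 80% limit, so the window is reported as degraded.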

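Distributed Tracing System

Distributed tracing is the key technique for understanding how requests flow through an Agent system. Such systems usually involve several cooperating components, and a single user request may trigger interactions among multiple Agents. Distributed tracing helps with the following: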

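End-to-end trace visualization: follow the complete path from user request to final response, including each Agent's processing, API calls, and data access. Visualizing the trace helps in understanding the system architecture and its performance bottlenecks.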
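
A minimal sketch of such a visualization is to print the spans of one trace as an indented tree. The span dictionaries below use the field names `span_id`, `parent_span_id`, `name`, `start_time`, and `duration`; the `render_trace` helper and the sample data are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def render_trace(spans: List[dict]) -> List[str]:
    """Return one indented line per span, children listed under their parent."""
    children: Dict[Optional[str], List[dict]] = defaultdict(list)
    for span in spans:
        children[span["parent_span_id"]].append(span)

    lines: List[str] = []

    def walk(parent_id: Optional[str], depth: int) -> None:
        # Visit siblings in start order so the output reads chronologically
        for span in sorted(children[parent_id], key=lambda s: s["start_time"]):
            lines.append(f"{'  ' * depth}{span['name']} ({span['duration'] * 1000:.0f} ms)")
            walk(span["span_id"], depth + 1)

    walk(None, 0)  # root spans have no parent
    return lines


# Hypothetical spans from a single request
spans = [
    {"span_id": "a", "parent_span_id": None, "name": "handle_request", "start_time": 0.00, "duration": 1.2},
    {"span_id": "b", "parent_span_id": "a", "name": "planner_agent", "start_time": 0.01, "duration": 0.4},
    {"span_id": "c", "parent_span_id": "b", "name": "llm_call", "start_time": 0.02, "duration": 0.35},
    {"span_id": "d", "parent_span_id": "a", "name": "tool_call", "start_time": 0.45, "duration": 0.7},
]
tree = render_trace(spans)
```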

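Bottleneck identification: analyze the time spent at each node of the trace to find performance bottlenecks, which may lie in a particular Agent, in model inference, in a database query, or elsewhere.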
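
One common way to locate a bottleneck is to compute each span's self time, meaning its duration minus the time spent in its direct children. The sketch below assumes that children run sequentially without overlapping; with parallel children the subtraction overshoots, so the result is clamped at zero. The `self_times` helper and the sample spans are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def self_times(spans: List[dict]) -> Dict[str, float]:
    """Map span name to time spent in the span itself, excluding direct children."""
    child_total: Dict[str, float] = defaultdict(float)
    for span in spans:
        if span["parent_span_id"] is not None:
            child_total[span["parent_span_id"]] += span["duration"]
    return {
        span["name"]: max(span["duration"] - child_total[span["span_id"]], 0.0)
        for span in spans
    }


# Hypothetical spans from a single request (durations in seconds)
spans = [
    {"span_id": "a", "parent_span_id": None, "name": "handle_request", "duration": 1.2},
    {"span_id": "b", "parent_span_id": "a", "name": "planner_agent", "duration": 0.4},
    {"span_id": "c", "parent_span_id": "b", "name": "llm_call", "duration": 0.35},
    {"span_id": "d", "parent_span_id": "a", "name": "tool_call", "duration": 0.7},
]
times = self_times(spans)
bottleneck = max(times, key=times.get)  # span with the largest self time
```

Here the root span is the longest overall, but almost all of its time is spent in children, so the tool call is the real bottleneck.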

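Error propagation analysis: trace the path along which an error spreads through the system to find its root cause. An error may originate at one step and then propagate to later steps, causing the final failure.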
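
Under the assumption that a failing child marks its ancestors as failed too, the origin of an error is an error span with no failing children, and the propagation path is the chain of parents above it. The helpers `find_error_origins` and `propagation_path` and the sample spans are illustrative only.

```python
from typing import Dict, List


def find_error_origins(spans: List[dict]) -> List[dict]:
    """Error spans none of whose children failed: the deepest points of failure."""
    error_ids = {s["span_id"] for s in spans if s["status"] == "error"}
    parents_of_errors = {s["parent_span_id"] for s in spans if s["span_id"] in error_ids}
    return [s for s in spans
            if s["span_id"] in error_ids and s["span_id"] not in parents_of_errors]


def propagation_path(spans: List[dict], origin_id: str) -> List[str]:
    """Span names from the origin up to the root, following parent links."""
    by_id: Dict[str, dict] = {s["span_id"]: s for s in spans}
    path: List[str] = []
    current = by_id.get(origin_id)
    while current is not None:
        path.append(current["name"])
        current = by_id.get(current["parent_span_id"])  # None at the root ends the loop
    return path


# Hypothetical trace in which an HTTP fetch fails inside a tool call
spans = [
    {"span_id": "a", "parent_span_id": None, "name": "handle_request", "status": "error"},
    {"span_id": "b", "parent_span_id": "a", "name": "planner_agent", "status": "ok"},
    {"span_id": "d", "parent_span_id": "a", "name": "tool_call", "status": "error"},
    {"span_id": "e", "parent_span_id": "d", "name": "http_fetch", "status": "error"},
]
origins = find_error_origins(spans)
path = propagation_path(spans, origins[0]["span_id"])
```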

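Dependency analysis: map the dependencies between system components to identify critical paths and single points of failure. This supports system optimization and fault-tolerant design.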

Debugging Methodology

Debugging Agent systems calls for a dedicated methodology:

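Log-based debugging: reproduce the problem scenario by analyzing log records. Logs should carry enough context, including inputs, outputs, intermediate state, and error messages. Log levels should be set sensibly so that log volume does not hurt performance.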
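
As one possible way to attach that context, the sketch below uses Python's standard `logging` module with a small JSON formatter. The `JsonFormatter` class, the `build_logger` helper, and the convention of passing a `context` dict through `extra` are assumptions made for this example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, merging in the caller's context dict."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # `context` is our own convention, supplied via logger.info(..., extra={"context": {...}})
        payload.update(getattr(record, "context", {}))
        if record.exc_info:
            payload["error"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False, default=str)


def build_logger(stream, level: int = logging.INFO) -> logging.Logger:
    logger = logging.getLogger("agent.debug")
    logger.handlers.clear()
    logger.setLevel(level)      # the level controls log volume
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger


stream = io.StringIO()
logger = build_logger(stream)
logger.info("tool call finished", extra={"context": {
    "trace_id": "t-1", "agent": "planner", "input": "2+2", "output": "4"}})
logger.debug("intermediate state dump")  # dropped because the level is INFO
records = [json.loads(line) for line in stream.getvalue().splitlines()]
```

Structured records like these can be filtered by `trace_id` when reconstructing what a single request did.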

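Trace-based debugging: use distributed tracing data to understand the state of the system when the problem occurred. A trace gives the complete execution path of a request, which helps locate the exact step where things went wrong.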

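State snapshot debugging: save snapshots of system state at key moments for later analysis. A snapshot can include Agent state, context contents, variable values, and so on.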
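
A snapshot is only useful if later changes to the live state cannot alter it, so a deep copy is the natural building block. The `SnapshotStore` class and the shape of `agent_state` below are hypothetical.

```python
import copy
import json
import time
from typing import Any, Dict, List, Optional


class SnapshotStore:
    """Keeps deep copies of agent state so later mutation cannot alter a snapshot."""

    def __init__(self) -> None:
        self._snapshots: List[Dict[str, Any]] = []

    def capture(self, label: str, state: Dict[str, Any]) -> Dict[str, Any]:
        snapshot = {"label": label, "timestamp": time.time(), "state": copy.deepcopy(state)}
        self._snapshots.append(snapshot)
        return snapshot

    def latest(self, label: str) -> Optional[Dict[str, Any]]:
        # Search newest first so repeated labels return the most recent capture
        for snapshot in reversed(self._snapshots):
            if snapshot["label"] == label:
                return snapshot
        return None

    def dump(self) -> str:
        """Serialize all snapshots, e.g. for attaching to a bug report."""
        return json.dumps(self._snapshots, ensure_ascii=False, default=str)


store = SnapshotStore()
agent_state = {"step": 3, "context": ["user: summarize the report"], "variables": {"retries": 0}}
store.capture("before_tool_call", agent_state)

# The live state keeps changing after the snapshot is taken
agent_state["variables"]["retries"] = 2
agent_state["context"].append("tool: timeout")
saved = store.latest("before_tool_call")
```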

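Replay debugging: replay historical requests and observe how the system behaves. Replay requires a consistent environment and may require simulating the behavior of external dependencies.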
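
One way to simulate external dependencies is record and replay: wrap the dependency once to record its responses, then serve those responses back during replay. In the sketch below, `RecordingClient`, `ReplayClient`, `run_agent`, and `live_search` are all hypothetical stand-ins. Note that this pins down external calls only; model sampling randomness would have to be controlled separately.

```python
import itertools
import json
from typing import Any, Callable, Dict


class RecordingClient:
    """Forwards calls to the real dependency and records each response."""

    def __init__(self, real_call: Callable[[dict], Any]) -> None:
        self._real_call = real_call
        self.tape: Dict[str, Any] = {}

    def call(self, request: dict) -> Any:
        response = self._real_call(request)
        self.tape[json.dumps(request, sort_keys=True)] = response
        return response


class ReplayClient:
    """Serves recorded responses instead of touching the real dependency."""

    def __init__(self, tape: Dict[str, Any]) -> None:
        self._tape = tape

    def call(self, request: dict) -> Any:
        key = json.dumps(request, sort_keys=True)
        if key not in self._tape:
            raise KeyError(f"no recorded response for request {key}")
        return self._tape[key]


def run_agent(client, question: str) -> str:
    """Hypothetical agent step that depends on one external tool call."""
    result = client.call({"tool": "search", "query": question})
    return f"answer based on: {result}"


# Stand-in for a live dependency whose answer changes on every call
_counter = itertools.count(1)


def live_search(request: dict) -> str:
    return f"result #{next(_counter)} for {request['query']}"


recorder = RecordingClient(live_search)
original = run_agent(recorder, "latest error rate")
replayed = run_agent(ReplayClient(recorder.tape), "latest error rate")
```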

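Differential debugging: compare normal and abnormal cases to identify what characterizes the problem. The comparison can cover logs, metrics, and state.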
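
For the metric comparison, a simple approach is to report every metric whose relative change exceeds a threshold, plus any metric present in only one of the two runs. The `diff_metrics` helper, the 20% default threshold, and the sample values are assumptions for illustration.

```python
from typing import Dict, Optional, Tuple

Diff = Tuple[Optional[float], Optional[float], Optional[float]]


def diff_metrics(baseline: Dict[str, float], candidate: Dict[str, float],
                 threshold: float = 0.2) -> Dict[str, Diff]:
    """Return (old, new, relative change) for metrics that moved by at least `threshold`."""
    diffs: Dict[str, Diff] = {}
    for name in sorted(set(baseline) | set(candidate)):
        old, new = baseline.get(name), candidate.get(name)
        if old is None or new is None:
            diffs[name] = (old, new, None)  # metric exists in only one run
            continue
        if old:
            change = (new - old) / old
        else:
            change = float("inf") if new else 0.0  # avoid dividing by zero
        if abs(change) >= threshold:
            diffs[name] = (old, new, change)
    return diffs


# Hypothetical metrics from a normal run and an abnormal run
baseline = {"p95_latency_s": 1.0, "error_rate": 0.01, "tokens_per_request": 1500}
candidate = {"p95_latency_s": 1.1, "error_rate": 0.05, "tokens_per_request": 4500, "retries": 3}
diffs = diff_metrics(baseline, candidate)
```

In this example latency moved by only 10% and is filtered out, while the error rate, the token count, and the newly appearing `retries` metric stand out as the distinguishing features.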

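Key Implementation Techniques

Metric Collection System Implementation

The following is an implementation of a metric collection system that supports several metric types and collection strategies: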

import time
import threading
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import statistics
import json
import sqlite3
from datetime import datetime, timedelta

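class MetricType(Enum):
    """Metric type"""
    COUNTER = "counter"      # cumulative count
    GAUGE = "gauge"          # value that can go up or down
    HISTOGRAM = "histogram"  # bucketed distribution of observations
    SUMMARY = "summary"      # sliding-window quantiles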

@dataclass
class MetricValue:
    """A single collected metric value"""
    name: str
    type: MetricType
    value: float
    timestamp: float
    labels: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

class Metric:
    """Base class for metrics"""
    
    def __init__(
        self,
        name: str,
        metric_type: MetricType,
        description: str = "",
        labels: Optional[List[str]] = None
    ):
        """
        Initialize the metric
        
        Args:
            name: metric name
            metric_type: metric type
            description: metric description
            labels: list of label names
        """
        self.name = name
        self.type = metric_type
        self.description = description
        self.labels = labels or []
        self.created_at = time.time()
    
    def collect(self, **label_values) -> MetricValue:
        """Collect the metric value"""
        raise NotImplementedError
    
    def get_metadata(self) -> Dict[str, Any]:
        """Return the metric's metadata"""
        return {
            'name': self.name,
            'type': self.type.value,
            'description': self.description,
            'labels': self.labels,
            'created_at': self.created_at
        }

class Counter(Metric):
    """Counter metric"""
    
    def __init__(
        self,
        name: str,
        description: str = "",
        labels: Optional[List[str]] = None
    ):
        super().__init__(name, MetricType.COUNTER, description, labels)
        self._counters: Dict[tuple, float] = defaultdict(float)
        self._lock = threading.Lock()
    
    def inc(self, value: float = 1.0, **label_values):
        """
        Increment the counter
        
        Args:
            value: amount to add
            **label_values: label values
        """
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            self._counters[key] += value
    
    def collect(self, **label_values) -> MetricValue:
        """Collect the metric value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            value = self._counters.get(key, 0.0)
        
        return MetricValue(
            name=self.name,
            type=self.type,
            value=value,
            timestamp=time.time(),
            labels=label_values
        )
    
    def reset(self):
        """Reset all counters"""
        with self._lock:
            self._counters.clear()

class Gauge(Metric):
    """Gauge metric"""
    
    def __init__(
        self,
        name: str,
        description: str = "",
        labels: Optional[List[str]] = None
    ):
        super().__init__(name, MetricType.GAUGE, description, labels)
        self._gauges: Dict[tuple, float] = defaultdict(float)
        self._lock = threading.Lock()
    
    def set(self, value: float, **label_values):
        """
        Set the gauge value
        
        Args:
            value: value to set
            **label_values: label values
        """
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            self._gauges[key] = value
    
    def inc(self, value: float = 1.0, **label_values):
        """Increase the gauge value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            self._gauges[key] += value
    
    def dec(self, value: float = 1.0, **label_values):
        """Decrease the gauge value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            self._gauges[key] -= value
    
    def collect(self, **label_values) -> MetricValue:
        """Collect the metric value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            value = self._gauges.get(key, 0.0)
        
        return MetricValue(
            name=self.name,
            type=self.type,
            value=value,
            timestamp=time.time(),
            labels=label_values
        )

class Histogram(Metric):
    """Histogram metric"""
    
    def __init__(
        self,
        name: str,
        buckets: Optional[List[float]] = None,
        description: str = "",
        labels: Optional[List[str]] = None
    ):
        super().__init__(name, MetricType.HISTOGRAM, description, labels)
        self.buckets = buckets or [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        self._samples: Dict[tuple, List[float]] = defaultdict(list)
        self._lock = threading.Lock()
    
    def observe(self, value: float, **label_values):
        """
        Record an observation
        
        Args:
            value: observed value
            **label_values: label values
        """
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            self._samples[key].append(value)
    
    def collect(self, **label_values) -> MetricValue:
        """Collect the metric value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            # Copy under the lock so concurrent observe() calls cannot mutate the list we read
            samples = list(self._samples.get(key, []))
        
        if not samples:
            return MetricValue(
                name=self.name,
                type=self.type,
                value=0.0,
                timestamp=time.time(),
                labels=label_values
            )
        
        sorted_samples = sorted(samples)
        count = len(sorted_samples)
        
        # Cumulative bucket counts: each bucket holds the number of samples <= its upper bound.
        # The comparison already includes every smaller sample, so no running total is added
        # (adding one would count samples in lower buckets more than once).
        bucket_counts = {}
        for bucket in self.buckets:
            bucket_counts[bucket] = sum(1 for sample in sorted_samples if sample <= bucket)
        
        # Percentiles by index into the sorted samples (count > 0 is guaranteed here)
        percentiles = {
            'p50': sorted_samples[int(count * 0.5)],
            'p95': sorted_samples[int(count * 0.95)],
            'p99': sorted_samples[int(count * 0.99)]
        }
        
        return MetricValue(
            name=self.name,
            type=self.type,
            value=statistics.mean(samples),
            timestamp=time.time(),
            labels=label_values,
            metadata={
                'count': count,
                'sum': sum(samples),
                'buckets': bucket_counts,
                'percentiles': percentiles
            }
        )
    
    def reset(self):
        """Reset all samples"""
        with self._lock:
            self._samples.clear()

class Summary(Metric):
    """Summary metric"""
    
    def __init__(
        self,
        name: str,
        quantiles: Optional[List[float]] = None,
        description: str = "",
        labels: Optional[List[str]] = None,
        max_age_seconds: int = 600,
        age_buckets: int = 5
    ):
        super().__init__(name, MetricType.SUMMARY, description, labels)
        self.quantiles = quantiles or [0.5, 0.9, 0.95, 0.99]
        self.max_age_seconds = max_age_seconds
        self.age_buckets = age_buckets
        self._samples: Dict[tuple, deque] = {}
        self._lock = threading.Lock()
    
    def observe(self, value: float, **label_values):
        """
        Record an observation
        
        Args:
            value: observed value
            **label_values: label values
        """
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            if key not in self._samples:
                self._samples[key] = deque(maxlen=1000)
            
            sample = (time.time(), value)
            self._samples[key].append(sample)
    
    def collect(self, **label_values) -> MetricValue:
        """Collect the metric value"""
        key = tuple(label_values.get(label, '') for label in self.labels)
        with self._lock:
            samples = list(self._samples.get(key, []))
        
        if not samples:
            return MetricValue(
                name=self.name,
                type=self.type,
                value=0.0,
                timestamp=time.time(),
                labels=label_values
            )
        
        # Drop samples older than max_age_seconds
        current_time = time.time()
        recent_samples = [
            value for timestamp, value in samples
            if current_time - timestamp <= self.max_age_seconds
        ]
        
        if not recent_samples:
            return MetricValue(
                name=self.name,
                type=self.type,
                value=0.0,
                timestamp=current_time,
                labels=label_values
            )
        
        sorted_values = sorted(recent_samples)
        count = len(sorted_values)
        
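        # Compute the requested quantiles by index into the sorted values
        quantile_values = {}
        for quantile in self.quantiles:
            index = min(int(quantile * count), count - 1)
            # round() avoids float truncation in the label (e.g. 0.29 * 100 == 28.999...)
            quantile_values[f'p{int(round(quantile * 100))}'] = sorted_values[index]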
        
        return MetricValue(
            name=self.name,
            type=self.type,
            value=statistics.mean(recent_samples),
            timestamp=current_time,
            labels=label_values,
            metadata={
                'count': count,
                'sum': sum(recent_samples),
                'quantiles': quantile_values
            }
        )

class MetricsRegistry:
    """Metrics registry"""
    
    def __init__(self):
        self._metrics: Dict[str, Metric] = {}
        self._lock = threading.Lock()
    
    def register(self, metric: Metric) -> Metric:
        """
        Register a metric
        
        Args:
            metric: metric object
            
        Returns:
            the registered metric object
        """
        with self._lock:
            if metric.name in self._metrics:
                raise ValueError(f"Metric {metric.name} already registered")
            self._metrics[metric.name] = metric
        return metric
    
    def get_metric(self, name: str) -> Optional[Metric]:
        """
        Look up a metric
        
        Args:
            name: metric name
            
        Returns:
            the metric object, or None if it is not registered
        """
        with self._lock:
            return self._metrics.get(name)
    
    def unregister(self, name: str):
        """
        Unregister a metric
        
        Args:
            name: metric name
        """
        with self._lock:
            if name in self._metrics:
                del self._metrics[name]
    
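    def collect_all(self) -> List[MetricValue]:
        """
        Collect the values of all metrics
        
        Returns:
            list of metric values, one per label combination seen so far
        """
        metric_values = []
        with self._lock:
            metrics = list(self._metrics.values())
        
        for metric in metrics:
            try:
                # Each built-in metric keeps one entry per label combination in a private dict.
                # Enumerate those keys so labelled series are collected; calling collect()
                # with no labels would only ever read the empty-label series.
                series = (
                    getattr(metric, '_counters', None)
                    or getattr(metric, '_gauges', None)
                    or getattr(metric, '_samples', None)
                    or {}
                )
                keys = list(series) or [()]
                for key in keys:
                    label_values = dict(zip(metric.labels, key))
                    metric_values.append(metric.collect(**label_values))
            except Exception as e:
                print(f"Error collecting metric {metric.name}: {e}")
        
        return metric_values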
    
    def get_all_metadata(self) -> List[Dict[str, Any]]:
        """
        Return the metadata of all metrics
        
        Returns:
            list of metadata dictionaries
        """
        with self._lock:
            return [metric.get_metadata() for metric in self._metrics.values()]
    
    def create_counter(
        self,
        name: str,
        description: str = "",
        labels: Optional[List[str]] = None
    ) -> Counter:
        """
        Create a counter
        
        Args:
            name: metric name
            description: metric description
            labels: list of label names
            
        Returns:
            the counter object
        """
        counter = Counter(name, description, labels)
        return self.register(counter)
    
    def create_gauge(
        self,
        name: str,
        description: str = "",
        labels: Optional[List[str]] = None
    ) -> Gauge:
        """
        Create a gauge
        
        Args:
            name: metric name
            description: metric description
            labels: list of label names
            
        Returns:
            the gauge object
        """
        gauge = Gauge(name, description, labels)
        return self.register(gauge)
    
    def create_histogram(
        self,
        name: str,
        buckets: Optional[List[float]] = None,
        description: str = "",
        labels: Optional[List[str]] = None
    ) -> Histogram:
        """
        Create a histogram
        
        Args:
            name: metric name
            buckets: bucket upper bounds
            description: metric description
            labels: list of label names
            
        Returns:
            the histogram object
        """
        histogram = Histogram(name, buckets, description, labels)
        return self.register(histogram)
    
    def create_summary(
        self,
        name: str,
        quantiles: Optional[List[float]] = None,
        description: str = "",
        labels: Optional[List[str]] = None,
        max_age_seconds: int = 600,
        age_buckets: int = 5
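    ) -> Summary:
        """
        Create a summary
        
        Args:
            name: metric name
            quantiles: quantiles to report
            description: metric description
            labels: list of label names
            max_age_seconds: maximum sample age in seconds
            age_buckets: number of age buckets
            
        Returns:
            the summary object
        """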
        summary = Summary(
            name, quantiles, description, labels,
            max_age_seconds, age_buckets
        )
        return self.register(summary)

class MetricsExporter:
    """Metrics exporter"""
    
    def __init__(self, registry: MetricsRegistry):
        """
        Initialize the exporter
        
        Args:
            registry: metrics registry
        """
        self.registry = registry
        self.exporters = {
            'prometheus': self._export_prometheus,
            'json': self._export_json
        }
    
    def _export_prometheus(self) -> str:
        """Export in a Prometheus-style text format (histograms and summaries as their mean only)"""
        lines = []
        metric_values = self.registry.collect_all()
        # Look metadata up once rather than rescanning the registry for every value
        metadata_by_name = {m['name']: m for m in self.registry.get_all_metadata()}
        described = set()
        
        for metric_value in metric_values:
            # Emit HELP/TYPE once per metric name, before its first sample
            metadata = metadata_by_name.get(metric_value.name)
            if metadata and metric_value.name not in described:
                lines.append(f"# HELP {metric_value.name} {metadata['description']}")
                lines.append(f"# TYPE {metric_value.name} {metadata['type']}")
                described.add(metric_value.name)
            
            # Emit the sample line with its labels
            label_str = ""
            if metric_value.labels:
                label_pairs = [
                    f'{k}="{v}"'
                    for k, v in metric_value.labels.items()
                ]
                label_str = "{" + ",".join(label_pairs) + "}"
            
            lines.append(f"{metric_value.name}{label_str} {metric_value.value} {int(metric_value.timestamp * 1000)}")
        
        return "\n".join(lines)
    
    def _export_json(self) -> str:
        """Export as JSON"""
        metric_values = self.registry.collect_all()
        return json.dumps([mv.__dict__ for mv in metric_values], indent=2, default=str)
    
    def export(self, format: str = 'prometheus') -> str:
        """
        Export the metrics
        
        Args:
            format: export format
            
        Returns:
            the exported string
        """
        exporter = self.exporters.get(format.lower())
        if not exporter:
            raise ValueError(f"Unknown export format: {format}")
        
        return exporter()

class MetricsStorage:
    """Metrics storage"""
    
    def __init__(self, db_path: str = "metrics.db"):
        """
        Initialize the storage
        
        Args:
            db_path: database path
        """
        self.db_path = db_path
        self._init_database()
    
    def _init_database(self):
        """Initialize the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the metrics table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                type TEXT NOT NULL,
                value REAL NOT NULL,
                timestamp REAL NOT NULL,
                labels TEXT,
                metadata TEXT
            )
        """)
        
        # Create indexes
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_metric_name 
            ON metrics(name)
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_metric_timestamp 
            ON metrics(timestamp)
        """)
        
        conn.commit()
        conn.close()
    
    def store(self, metric_value: MetricValue):
        """
        Store a metric value
        
        Args:
            metric_value: metric value
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO metrics (name, type, value, timestamp, labels, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            metric_value.name,
            metric_value.type.value,
            metric_value.value,
            metric_value.timestamp,
            json.dumps(metric_value.labels),
            json.dumps(metric_value.metadata)
        ))
        
        conn.commit()
        conn.close()
    
    def query(
        self,
        name: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        limit: int = 1000
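    ) -> List[Dict[str, Any]]:
        """
        Query metric values
        
        Args:
            name: metric name
            start_time: start of the time range (Unix timestamp)
            end_time: end of the time range (Unix timestamp)
            limit: maximum number of rows
            
        Returns:
            list of query results
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        query = "SELECT * FROM metrics WHERE 1=1"
        params = []
        
        if name:
            query += " AND name = ?"
            params.append(name)
        
        # Compare against None so that a timestamp of 0 is still applied as a filter
        if start_time is not None:
            query += " AND timestamp >= ?"
            params.append(start_time)
        
        if end_time is not None:
            query += " AND timestamp <= ?"
            params.append(end_time)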
        
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        
        cursor.execute(query, params)
        results = cursor.fetchall()
        
        conn.close()
        
        return [
            {
                'id': row[0],
                'name': row[1],
                'type': row[2],
                'value': row[3],
                'timestamp': row[4],
                'labels': json.loads(row[5]) if row[5] else {},
                'metadata': json.loads(row[6]) if row[6] else {}
            }
            for row in results
        ]

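# Global metrics registry
registry = MetricsRegistry()

# Usage example
def demonstrate_metrics_system():
    """Demonstrate how the metrics system is used"""
    print("=== Metric collection system demo ===\n")
    
    # Create one metric of each kind
    request_counter = registry.create_counter(
        "http_requests_total",
        "Total HTTP requests",
        labels=["method", "endpoint", "status"]
    )
    
    # description is passed by keyword here: the second positional parameter of
    # create_histogram is buckets, so a positional description would collide with buckets=
    response_time = registry.create_histogram(
        "http_request_duration_seconds",
        description="HTTP request duration",
        buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0],
        labels=["method", "endpoint"]
    )
    
    active_connections = registry.create_gauge(
        "active_connections",
        "Active connections",
        labels=["service"]
    )
    
    # Same as above: the second positional parameter of create_summary is quantiles
    task_completion = registry.create_summary(
        "task_completion_duration",
        description="Task completion duration",
        quantiles=[0.5, 0.9, 0.95, 0.99],
        labels=["task_type"]
    )
    
    print("Simulating metric collection:")
    print("-" * 80)
    
    # Simulate a few requests
    for i in range(10):
        # Increment the request counter
        status = "200" if i % 4 != 0 else "500"
        request_counter.inc(method="GET", endpoint="/api/agent", status=status)
        
        # Record the response time
        duration = 0.1 + (i % 5) * 0.2
        response_time.observe(duration, method="GET", endpoint="/api/agent")
        
        # Update active connections
        active_connections.set(i + 1, service="agent_service")
        
        # Record the task completion time
        task_duration = 1.0 + (i % 3) * 0.5
        task_completion.observe(task_duration, task_type="code_generation")
        
        print(f"{i+1}. Recorded metrics: request count={i+1}, response time={duration:.2f}s, "
              f"active connections={i+1}, task duration={task_duration:.2f}s")
    
    # Collect and display the metrics
    print(f"\n{'='*80}")
    print("Collected metrics:")
    print("="*80)
    
    exporter = MetricsExporter(registry)
    
    print("\nPrometheus format:")
    print(exporter.export('prometheus'))
    
    # Store to the database
    print(f"\n{'='*80}")
    print("Storing metrics to the database:")
    print("="*80)
    
    # MetricsStorage opens a new connection on every call, and each sqlite3 ":memory:"
    # connection is a separate empty database, so the table created in _init_database
    # would be gone by the time store() runs. Use a temporary file instead.
    import os
    import tempfile
    storage = MetricsStorage(db_path=os.path.join(tempfile.mkdtemp(), "metrics.db"))
    metric_values = registry.collect_all()
    
    for mv in metric_values:
        storage.store(mv)
        print(f"Stored metric: {mv.name} = {mv.value}")
    
    # Query the metrics
    print(f"\n{'='*80}")
    print("Querying historical metrics:")
    print("="*80)
    
    results = storage.query(name="http_request_duration_seconds", limit=5)
    for result in results:
        print(f"Time: {datetime.fromtimestamp(result['timestamp'])}, "
              f"value: {result['value']:.4f}, labels: {result['labels']}")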

if __name__ == "__main__":
    demonstrate_metrics_system()

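Distributed Tracing System Implementation

The following is an implementation of a distributed tracing system that supports trace collection, performance analysis, and problem localization: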

import uuid
import time
import threading
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from collections import defaultdict
from enum import Enum
import json
import sqlite3
from datetime import datetime
import functools

class SpanKind(Enum):
    """Span kind"""
    SERVER = "server"
    CLIENT = "client"
    PRODUCER = "producer"
    CONSUMER = "consumer"
    INTERNAL = "internal"

class SpanStatus(Enum):
    """Span status"""
    UNSET = "unset"
    OK = "ok"
    ERROR = "error"

@dataclass
class Span:
    """A trace span"""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None
    name: str = ""
    kind: SpanKind = SpanKind.INTERNAL
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    status: SpanStatus = SpanStatus.UNSET
    status_message: str = ""
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)
    links: List[Dict[str, Any]] = field(default_factory=list)
    
    def duration(self) -> float:
        """Return the duration in seconds (elapsed time so far if the span has not ended)"""
        if self.end_time is None:
            return time.time() - self.start_time
        return self.end_time - self.start_time
    
    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None):
        """
        Add an event
        
        Args:
            name: event name
            attributes: event attributes
        """
        event = {
            'name': name,
            'timestamp': time.time(),
            'attributes': attributes or {}
        }
        self.events.append(event)
    
    def set_attribute(self, key: str, value: Any):
        """
        Set an attribute
        
        Args:
            key: attribute key
            value: attribute value
        """
        self.attributes[key] = value
    
    def set_status(self, status: SpanStatus, message: str = ""):
        """
        Set the status
        
        Args:
            status: status
            message: status message
        """
        self.status = status
        self.status_message = message
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary"""
        return {
            'trace_id': self.trace_id,
            'span_id': self.span_id,
            'parent_span_id': self.parent_span_id,
            'name': self.name,
            'kind': self.kind.value,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.duration(),
            'status': self.status.value,
            'status_message': self.status_message,
            'attributes': self.attributes,
            'events': self.events,
            'links': self.links
        }

class Tracer:
    """Tracer"""
    
    def __init__(
        self,
        service_name: str,
        tracer_name: str = "agent_tracer"
    ):
        """
        Initialize the tracer
        
        Args:
            service_name: service name
            tracer_name: tracer name
        """
        self.service_name = service_name
        self.tracer_name = tracer_name
        self._current_span: Optional[Span] = None
        self._active_spans: Dict[str, Span] = {}
        self._lock = threading.Lock()
    
    def start_span(
        self,
        name: str,
        parent_span_id: Optional[str] = None,
        kind: SpanKind = SpanKind.INTERNAL,
        attributes: Optional[Dict[str, Any]] = None
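    ) -> Span:
        """
        Start a span
        
        Args:
            name: span name
            parent_span_id: parent span ID
            kind: span kind
            attributes: attributes
            
        Returns:
            the Span object
        """
        span_id = str(uuid.uuid4())
        
        # Resolve the parent: an explicit parent_span_id wins, otherwise fall back to the
        # current span. A child always inherits its parent's trace_id, so spans started
        # with an explicit parent stay in the same trace.
        with self._lock:
            parent = (
                self._active_spans.get(parent_span_id)
                if parent_span_id is not None
                else self._current_span
            )
        if parent is not None:
            parent_span_id = parent.span_id
            trace_id = parent.trace_id
        else:
            # No known parent: this span starts a new trace
            trace_id = str(uuid.uuid4())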
        
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            name=name,
            kind=kind,
            attributes=attributes or {}
        )
        
        # Attach the service and tracer names as attributes
        span.set_attribute('service.name', self.service_name)
        span.set_attribute('tracer.name', self.tracer_name)
        
        with self._lock:
            self._active_spans[span_id] = span
            self._current_span = span
        
        return span
    
    def end_span(
        self,
        span_id: str,
        status: SpanStatus = SpanStatus.OK,
        status_message: str = ""
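    ):
        """
        End a span
        
        Args:
            span_id: span ID
            status: status
            status_message: status message
        """
        with self._lock:
            span = self._active_spans.get(span_id)
            if span:
                span.end_time = time.time()
                span.set_status(status, status_message)
                
                # If this was the current span, make its parent current again.
                # _current_span must hold a Span (or None), not the parent's ID string,
                # because start_span reads .span_id and .trace_id from it.
                if self._current_span and self._current_span.span_id == span_id:
                    self._current_span = self._active_spans.get(span.parent_span_id)
                
                del self._active_spans[span_id]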
    
    def get_current_span(self) -> Optional[Span]:
        """Return the current span"""
        with self._lock:
            return self._current_span
    
    def trace_function(
        self,
        name: Optional[str] = None,
        kind: SpanKind = SpanKind.INTERNAL
    ):
        """
        Decorator that traces a function
        
        Args:
            name: span name
            kind: span kind
            
        Returns:
            the decorator function
        """
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                span_name = name or func.__name__
                span = self.start_span(span_name, kind=kind)
                
                try:
                    result = func(*args, **kwargs)
                    self.end_span(span.span_id, SpanStatus.OK)
                    return result
                except Exception as e:
                    self.end_span(span.span_id, SpanStatus.ERROR, str(e))
                    raise
            
            return wrapper
        return decorator
    
    def trace_async_function(
        self,
        name: Optional[str] = None,
        kind: SpanKind = SpanKind.INTERNAL
    ):
        """
        Decorator that traces an async function
        
        Args:
            name: span name
            kind: span kind
            
        Returns:
            the decorator function
        """
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                span_name = name or func.__name__
                span = self.start_span(span_name, kind=kind)
                
                try:
                    result = await func(*args, **kwargs)
                    self.end_span(span.span_id, SpanStatus.OK)
                    return result
                except Exception as e:
                    self.end_span(span.span_id, SpanStatus.ERROR, str(e))
                    raise
            
            return wrapper
        return decorator

class TraceStorage:
    """Trace storage"""
    
    def __init__(self, db_path: str = "traces.db"):
        """
        Initialize the storage
        
        Args:
            db_path: database path
        """
        self.db_path = db_path
        self._init_database()
    
    def _init_database(self):
        """Initialize the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the spans table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS spans (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                trace_id TEXT NOT NULL,
                span_id TEXT NOT NULL,
                parent_span_id TEXT,
                name TEXT NOT NULL,
                kind TEXT NOT NULL,
                start_time REAL NOT NULL,
                end_time REAL,
                duration REAL,
                status TEXT NOT NULL,
                status_message TEXT,
                attributes TEXT,
                events TEXT,
                links TEXT
            )
        """)
        
        # Create indexes
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_trace_id 
            ON spans(trace_id)
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_span_id 
            ON spans(span_id)
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_start_time 
            ON spans(start_time)
        """)
        
        conn.commit()
        conn.close()
    
    def store_span(self, span: Span):
        """
        存储Span
        
        Args:
            span: Span对象
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO spans (
                trace_id, span_id, parent_span_id, name, kind,
                start_time, end_time, duration, status, status_message,
                attributes, events, links
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            span.trace_id,
            span.span_id,
            span.parent_span_id,
            span.name,
            span.kind.value,
            span.start_time,
            span.end_time,
            span.duration(),
            span.status.value,
            span.status_message,
            json.dumps(span.attributes),
            json.dumps(span.events),
            json.dumps(span.links)
        ))
        
        conn.commit()
        conn.close()
    
    def get_trace(self, trace_id: str) -> List[Dict[str, Any]]:
        """
        获取追踪
        
        Args:
            trace_id: 追踪ID
            
        Returns:
            Span列表
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT * FROM spans WHERE trace_id = ? ORDER BY start_time
        """, (trace_id,))
        
        results = cursor.fetchall()
        conn.close()
        
        return [
            {
                'id': row[0],
                'trace_id': row[1],
                'span_id': row[2],
                'parent_span_id': row[3],
                'name': row[4],
                'kind': row[5],
                'start_time': row[6],
                'end_time': row[7],
                'duration': row[8],
                'status': row[9],
                'status_message': row[10],
                'attributes': json.loads(row[11]) if row[11] else {},
                'events': json.loads(row[12]) if row[12] else [],
                'links': json.loads(row[13]) if row[13] else []
            }
            for row in results
        ]
    
    def query_spans(
        self,
        trace_id: Optional[str] = None,
        name: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        查询Span
        
        Args:
            trace_id: 追踪ID
            name: Span名称
            status: 状态
            start_time: 开始时间
            end_time: 结束时间
            limit: 限制数量
            
        Returns:
            Span列表
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        query = "SELECT * FROM spans WHERE 1=1"
        params = []
        
        if trace_id:
            query += " AND trace_id = ?"
            params.append(trace_id)
        
        if name:
            query += " AND name = ?"
            params.append(name)
        
        if status:
            query += " AND status = ?"
            params.append(status)
        
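        # Compare against None so that a timestamp of 0.0 is still applied as a filter
        if start_time is not None:
            query += " AND start_time >= ?"
            params.append(start_time)
        
        if end_time is not None:
            query += " AND start_time <= ?"
            params.append(end_time)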
        
        query += " ORDER BY start_time DESC LIMIT ?"
        params.append(limit)
        
        cursor.execute(query, params)
        results = cursor.fetchall()
        conn.close()
        
        return [
            {
                'id': row[0],
                'trace_id': row[1],
                'span_id': row[2],
                'parent_span_id': row[3],
                'name': row[4],
                'kind': row[5],
                'start_time': row[6],
                'end_time': row[7],
                'duration': row[8],
                'status': row[9],
                'status_message': row[10],
                'attributes': json.loads(row[11]) if row[11] else {},
                'events': json.loads(row[12]) if row[12] else [],
                'links': json.loads(row[13]) if row[13] else []
            }
            for row in results
        ]

class TracingAnalyzer:
    """追踪分析器"""
    
    def __init__(self, storage: TraceStorage):
        """
        初始化分析器
        
        Args:
            storage: 追踪存储
        """
        self.storage = storage
    
    def analyze_trace_performance(self, trace_id: str) -> Dict[str, Any]:
        """
        分析追踪性能
        
        Args:
            trace_id: 追踪ID
            
        Returns:
            性能分析结果
        """
        spans = self.storage.get_trace(trace_id)
        
        if not spans:
            return {'error': 'Trace not found'}
        
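        # Total duration; spans still in flight have no end_time and are skipped
        end_times = [span['end_time'] for span in spans if span['end_time'] is not None]
        first_start = min(span['start_time'] for span in spans)
        total_duration = (max(end_times) - first_start) if end_times else 0.0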
        
        # 分析各个Span的耗时
        span_durations = [
            (span['name'], span['duration'])
            for span in spans if span['duration'] is not None
        ]
        
        # 找出耗时最长的Span
        slowest_spans = sorted(
            [(name, duration) for name, duration in span_durations if duration],
            key=lambda x: x[1],
            reverse=True
        )[:5]
        
        # 计算各类型Span的平均耗时
        span_type_duration = defaultdict(list)
        for span in spans:
            if span['duration']:
                span_type_duration[span['name']].append(span['duration'])
        
        avg_durations = {
            name: sum(durations) / len(durations)
            for name, durations in span_type_duration.items()
        }
        
        # 统计错误
        error_spans = [
            span for span in spans
            if span['status'] == 'error'
        ]
        
        return {
            'trace_id': trace_id,
            'total_duration': total_duration,
            'total_spans': len(spans),
            'slowest_spans': slowest_spans,
            'avg_durations': avg_durations,
            'error_count': len(error_spans),
            'error_spans': [
                {
                    'name': span['name'],
                    'message': span['status_message']
                }
                for span in error_spans
            ]
        }
    
    def find_bottlenecks(
        self,
        time_window: int = 3600,
        threshold_percentile: float = 95
    ) -> List[Dict[str, Any]]:
        """
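        Find performance bottlenecks.
        
        Args:
            time_window: Look-back window in seconds
            threshold_percentile: Despite its name, a duration threshold in seconds.
                A span name is reported when its P95 duration exceeds this value,
                so the default of 95 means 95 seconds
            
        Returns:
            Up to 10 bottlenecks, sorted by P95 duration in descending order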
        """
        end_time = time.time()
        start_time = end_time - time_window
        
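        # Fetch the spans in the time window. query_spans defaults to limit=100,
        # so pass a larger limit (10000 is an arbitrary choice) to avoid computing
        # percentiles over only the 100 most recent spans
        all_spans = self.storage.query_spans(
            start_time=start_time,
            end_time=end_time,
            limit=10000
        )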
        
        if not all_spans:
            return []
        
        # 按Span名称分组
        span_groups = defaultdict(list)
        for span in all_spans:
            if span['duration']:
                span_groups[span['name']].append(span['duration'])
        
        # 计算每个Span的统计信息
        bottlenecks = []
        for name, durations in span_groups.items():
            if len(durations) < 5:  # 样本太少,跳过
                continue
            
            durations.sort()
            count = len(durations)
            
            p95 = durations[int(count * 0.95)] if count > 0 else 0
            p99 = durations[int(count * 0.99)] if count > 0 else 0
            avg = sum(durations) / count
            
            # 如果P95超过阈值,认为是瓶颈
            if p95 > threshold_percentile:
                bottlenecks.append({
                    'span_name': name,
                    'count': count,
                    'avg_duration': avg,
                    'p95_duration': p95,
                    'p99_duration': p99,
                    'max_duration': max(durations)
                })
        
        # 按P95排序
        bottlenecks.sort(key=lambda x: x['p95_duration'], reverse=True)
        
        return bottlenecks[:10]

# 全局追踪器
tracer = Tracer(service_name="agent_service")

# 使用示例
def demonstrate_tracing_system():
    """演示追踪系统的使用"""
    print("=== 分布式追踪系统演示 ===\n")
    
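    # Initialize components. A file path is used because TraceStorage opens a new
    # connection for every call, and each sqlite3.connect(":memory:") is a fresh,
    # empty database, so the spans table would not exist when store_span runs.
    trace_storage = TraceStorage(db_path="traces_demo.db")
    analyzer = TracingAnalyzer(trace_storage)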
    
    # 模拟Agent处理流程
    @tracer.trace_function(name="process_request", kind=SpanKind.SERVER)
    def process_request(user_input: str) -> str:
        """处理用户请求"""
        # 记录输入
        current_span = tracer.get_current_span()
        if current_span:
            current_span.set_attribute("input.length", len(user_input))
        
        # 模拟LLM调用
        @tracer.trace_function(name="llm_inference", kind=SpanKind.CLIENT)
        def llm_call(prompt: str) -> str:
            # 模拟API调用
            time.sleep(0.1)
            return f"Response to: {prompt[:20]}..."
        
        response = llm_call(user_input)
        
        # 模拟后处理
        @tracer.trace_function(name="post_process", kind=SpanKind.INTERNAL)
        def post_process(result: str) -> str:
            time.sleep(0.05)
            return result.upper()
        
        return post_process(response)
    
    print("模拟请求处理流程:")
    print("-" * 80)
    
    # 处理多个请求
    for i in range(5):
        input_text = f"这是第{i+1}个用户输入,包含一些测试内容"
        print(f"\n{i+1}. 处理请求: {input_text[:30]}...")
        
        try:
            result = process_request(input_text)
            print(f"   结果: {result[:30]}...")
        except Exception as e:
            print(f"   错误: {e}")
        
        # 存储Span
        current_span = tracer.get_current_span()
        if current_span:
            trace_storage.store_span(current_span)
    
    # 分析追踪性能
    print(f"\n{'='*80}")
    print("追踪性能分析:")
    print("="*80)
    
    # 获取最近的追踪
    recent_spans = trace_storage.query_spans(limit=10)
    if recent_spans:
        trace_id = recent_spans[0]['trace_id']
        analysis = analyzer.analyze_trace_performance(trace_id)
        
        print(f"\n追踪ID: {trace_id}")
        print(f"总持续时间: {analysis['total_duration']:.3f}s")
        print(f"Span数量: {analysis['total_spans']}")
        print(f"错误数量: {analysis['error_count']}")
        
        print(f"\n最慢的Span:")
        for name, duration in analysis['slowest_spans']:
            print(f"  {name}: {duration:.3f}s")
        
        print(f"\n平均耗时:")
        for name, avg_duration in analysis['avg_durations'].items():
            print(f"  {name}: {avg_duration:.3f}s")
    
    # 查找性能瓶颈
    print(f"\n{'='*80}")
    print("性能瓶颈分析:")
    print("="*80)
    
    bottlenecks = analyzer.find_bottlenecks(time_window=3600, threshold_percentile=0.05)
    if bottlenecks:
        print(f"\n发现 {len(bottlenecks)} 个潜在瓶颈:")
        for i, bottleneck in enumerate(bottlenecks, 1):
            print(f"\n{i}. {bottleneck['span_name']}:")
            print(f"   调用次数: {bottleneck['count']}")
            print(f"   平均耗时: {bottleneck['avg_duration']:.3f}s")
            print(f"   P95耗时: {bottleneck['p95_duration']:.3f}s")
            print(f"   P99耗时: {bottleneck['p99_duration']:.3f}s")
            print(f"   最大耗时: {bottleneck['max_duration']:.3f}s")
    else:
        print("\n未发现明显的性能瓶颈")

if __name__ == "__main__":
    demonstrate_tracing_system()
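
TraceStorage opens and closes a SQLite connection inside every method. That works with a database file, but not with ":memory:", where each new connection starts from an empty database. The sketch below illustrates the difference. `SharedConnectionStore` and `table_survives_reconnect` are hypothetical names introduced only for this illustration, and the three-column table is a simplified stand-in for the spans schema above.

```python
import sqlite3


class SharedConnectionStore:
    """Hypothetical store that keeps one connection open for its whole lifetime."""

    def __init__(self, db_path: str = ":memory:"):
        self._conn = sqlite3.connect(db_path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS spans (span_id TEXT, name TEXT, duration REAL)"
        )

    def store(self, span_id: str, name: str, duration: float) -> None:
        # Using the connection as a context manager commits on success
        # and rolls back if the statement raises.
        with self._conn:
            self._conn.execute(
                "INSERT INTO spans VALUES (?, ?, ?)", (span_id, name, duration)
            )

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM spans").fetchone()[0]

    def close(self) -> None:
        self._conn.close()


def table_survives_reconnect() -> bool:
    """Create a table on one ':memory:' connection and look for it on another."""
    first = sqlite3.connect(":memory:")
    first.execute("CREATE TABLE spans (span_id TEXT)")
    first.commit()
    first.close()

    second = sqlite3.connect(":memory:")
    row = second.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'spans'"
    ).fetchone()
    second.close()
    return row is not None


store = SharedConnectionStore()
store.store("s1", "llm_inference", 0.1)
```

Keeping a single connection is one option for tests; a file-backed database is another. By default, a sqlite3 connection may only be used from the thread that created it, so a multi-threaded tracer would need its own connection handling.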

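Real-Time Debugging Tool Implementation

The following is an implementation of a real-time debugging tool that supports interactive debugging, state monitoring, and problem diagnosis: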

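import json
import time
import threading
import uuid  # used by _handle_start_session to generate session IDs
from datetime import datetime  # used by the demo to format event timestamps
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from collections import defaultdict, deque
from enum import Enum
import queue
import socket
import select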

class DebugLevel(Enum):
    """调试级别"""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class DebugEvent:
    """调试事件"""
    timestamp: float
    level: DebugLevel
    source: str
    message: str
    data: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)

class DebugSession:
    """调试会话"""
    
    def __init__(self, session_id: str):
        """
        初始化调试会话
        
        Args:
            session_id: 会话ID
        """
        self.session_id = session_id
        self.created_at = time.time()
        self.events: List[DebugEvent] = []
        self.breakpoints: Dict[str, bool] = {}
        self.variables: Dict[str, Any] = {}
        self.state: Dict[str, Any] = {}
        self.enabled = True
    
    def add_event(self, event: DebugEvent):
        """
        添加事件
        
        Args:
            event: 调试事件
        """
        if self.enabled:
            self.events.append(event)
            # 限制事件数量
            if len(self.events) > 1000:
                self.events = self.events[-1000:]
    
    def set_breakpoint(self, location: str, enabled: bool = True):
        """
        设置断点
        
        Args:
            location: 断点位置
            enabled: 是否启用
        """
        self.breakpoints[location] = enabled
    
    def check_breakpoint(self, location: str) -> bool:
        """
        检查断点
        
        Args:
            location: 断点位置
            
        Returns:
            是否触发断点
        """
        return self.breakpoints.get(location, False)
    
    def set_variable(self, name: str, value: Any):
        """
        设置变量
        
        Args:
            name: 变量名
            value: 变量值
        """
        self.variables[name] = value
    
    def get_variable(self, name: str) -> Optional[Any]:
        """
        获取变量
        
        Args:
            name: 变量名
            
        Returns:
            变量值或None
        """
        return self.variables.get(name)
    
    def update_state(self, state: Dict[str, Any]):
        """
        更新状态
        
        Args:
            state: 状态字典
        """
        self.state.update(state)
    
    def get_events(
        self,
        level: Optional[DebugLevel] = None,
        source: Optional[str] = None,
        limit: int = 100
    ) -> List[DebugEvent]:
        """
        获取事件
        
        Args:
            level: 调试级别
            source: 来源
            limit: 限制数量
            
        Returns:
            事件列表
        """
        events = self.events
        
        if level:
            events = [e for e in events if e.level == level]
        
        if source:
            events = [e for e in events if e.source == source]
        
        return events[-limit:]

class Debugger:
    """调试器"""
    
    def __init__(
        self,
        host: str = "localhost",
        port: int = 9999,
        max_sessions: int = 10
    ):
        """
        初始化调试器
        
        Args:
            host: 主机地址
            port: 端口
            max_sessions: 最大会话数
        """
        self.host = host
        self.port = port
        self.max_sessions = max_sessions
        self.sessions: Dict[str, DebugSession] = {}
        self.active_session: Optional[DebugSession] = None
        self.event_queue: queue.Queue = queue.Queue()
        self.running = False
        self.server_thread: Optional[threading.Thread] = None
        self.command_handlers: Dict[str, Callable] = {}
        self._register_default_handlers()
    
    def _register_default_handlers(self):
        """注册默认命令处理器"""
        self.command_handlers = {
            'start_session': self._handle_start_session,
            'stop_session': self._handle_stop_session,
            'add_event': self._handle_add_event,
            'set_breakpoint': self._handle_set_breakpoint,
            'get_events': self._handle_get_events,
            'get_state': self._handle_get_state,
            'set_variable': self._handle_set_variable,
            'get_variables': self._handle_get_variables,
            'enable_debugging': self._handle_enable_debugging,
            'disable_debugging': self._handle_disable_debugging,
        }
    
    def start(self):
        """启动调试服务器"""
        if self.running:
            return
        
        self.running = True
        self.server_thread = threading.Thread(target=self._server_loop, daemon=True)
        self.server_thread.start()
        print(f"调试服务器启动: {self.host}:{self.port}")
    
    def stop(self):
        """停止调试服务器"""
        self.running = False
        if self.server_thread:
            self.server_thread.join(timeout=5)
        print("调试服务器已停止")
    
    def _server_loop(self):
        """服务器主循环"""
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((self.host, self.port))
        server_socket.listen(5)
        server_socket.settimeout(1)  # 设置超时以便检查running标志
        
        while self.running:
            try:
                # 使用select检查是否有连接
                readable, _, _ = select.select([server_socket], [], [], 1)
                
                if readable:
                    client_socket, address = server_socket.accept()
                    threading.Thread(
                        target=self._handle_client,
                        args=(client_socket, address),
                        daemon=True
                    ).start()
            
            except socket.timeout:
                continue
            except Exception as e:
                print(f"服务器错误: {e}")
                if self.running:
                    time.sleep(1)
        
        server_socket.close()
    
    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """
        处理客户端连接
        
        Args:
            client_socket: 客户端套接字
            address: 客户端地址
        """
        try:
            while self.running:
                # 接收数据
                data = client_socket.recv(4096)
                if not data:
                    break
                
                try:
                    command = json.loads(data.decode('utf-8'))
                    response = self._process_command(command)
                    # sendall keeps writing until the whole response is sent; send may write only part
                    client_socket.sendall(json.dumps(response).encode('utf-8'))
                except json.JSONDecodeError:
                    error_response = {'error': 'Invalid JSON'}
                    client_socket.sendall(json.dumps(error_response).encode('utf-8'))
        
        except Exception as e:
            print(f"客户端处理错误 {address}: {e}")
        finally:
            client_socket.close()
    
    def _process_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理命令
        
        Args:
            command: 命令字典
            
        Returns:
            响应字典
        """
        command_type = command.get('command')
        handler = self.command_handlers.get(command_type)
        
        if not handler:
            return {'error': f'Unknown command: {command_type}'}
        
        try:
            return handler(command)
        except Exception as e:
            return {'error': str(e)}
    
    def _handle_start_session(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理开始会话命令"""
        session_id = command.get('session_id', str(uuid.uuid4()))
        
        if len(self.sessions) >= self.max_sessions:
            return {'error': 'Maximum sessions reached'}
        
        session = DebugSession(session_id)
        self.sessions[session_id] = session
        self.active_session = session
        
        return {
            'session_id': session_id,
            'status': 'started'
        }
    
    def _handle_stop_session(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理停止会话命令"""
        session_id = command.get('session_id')
        
        if session_id in self.sessions:
            del self.sessions[session_id]
            
            if self.active_session and self.active_session.session_id == session_id:
                self.active_session = None
            
            return {'status': 'stopped'}
        else:
            return {'error': 'Session not found'}
    
    def _handle_add_event(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理添加事件命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        event = DebugEvent(
            timestamp=command.get('timestamp', time.time()),
            level=DebugLevel(command.get('level', 'info')),
            source=command.get('source', 'unknown'),
            message=command.get('message', ''),
            data=command.get('data', {}),
            tags=command.get('tags', [])
        )
        
        self.active_session.add_event(event)
        return {'status': 'added'}
    
    def _handle_set_breakpoint(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理设置断点命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        location = command.get('location')
        enabled = command.get('enabled', True)
        
        self.active_session.set_breakpoint(location, enabled)
        return {'status': 'set'}
    
    def _handle_get_events(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理获取事件命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        level_str = command.get('level')
        level = DebugLevel(level_str) if level_str else None
        source = command.get('source')
        limit = command.get('limit', 100)
        
        events = self.active_session.get_events(level, source, limit)
        
        return {
            'events': [
                {
                    'timestamp': event.timestamp,
                    'level': event.level.value,
                    'source': event.source,
                    'message': event.message,
                    'data': event.data,
                    'tags': event.tags
                }
                for event in events
            ]
        }
    
    def _handle_get_state(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理获取状态命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        return {
            'state': self.active_session.state,
            'breakpoints': self.active_session.breakpoints
        }
    
    def _handle_set_variable(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理设置变量命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        name = command.get('name')
        value = command.get('value')
        
        self.active_session.set_variable(name, value)
        return {'status': 'set'}
    
    def _handle_get_variables(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理获取变量命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        return {'variables': self.active_session.variables}
    
    def _handle_enable_debugging(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理启用调试命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        self.active_session.enabled = True
        return {'status': 'enabled'}
    
    def _handle_disable_debugging(self, command: Dict[str, Any]) -> Dict[str, Any]:
        """处理禁用调试命令"""
        if not self.active_session:
            return {'error': 'No active session'}
        
        self.active_session.enabled = False
        return {'status': 'disabled'}
    
    # 便捷方法
    def log(
        self,
        level: DebugLevel,
        message: str,
        source: str = "agent",
        data: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None
    ):
        """
        记录日志
        
        Args:
            level: 调试级别
            message: 日志消息
            source: 来源
            data: 附加数据
            tags: 标签
        """
        if self.active_session and self.active_session.enabled:
            event = DebugEvent(
                timestamp=time.time(),
                level=level,
                source=source,
                message=message,
                data=data or {},
                tags=tags or []
            )
            self.active_session.add_event(event)
    
    def checkpoint(self, location: str):
        """
        检查断点
        
        Args:
            location: 断点位置
            
        Returns:
            是否触发断点
        """
        if self.active_session and self.active_session.enabled:
            return self.active_session.check_breakpoint(location)
        return False
    
    def update_state(self, state: Dict[str, Any]):
        """
        更新状态
        
        Args:
            state: 状态字典
        """
        if self.active_session:
            self.active_session.update_state(state)

# 全局调试器实例
debugger = Debugger()

# 使用示例
def demonstrate_debugging_tool():
    """演示调试工具的使用"""
    print("=== 实时调试工具演示 ===\n")
    
    # 启动调试服务器
    debugger.start()
    
    # 模拟Agent处理流程
    def process_agent_request(user_input: str):
        """处理Agent请求"""
        # 记录开始
        debugger.log(DebugLevel.INFO, "开始处理请求", "agent_processor", {
            'input_length': len(user_input)
        })
        
        # 检查断点
        if debugger.checkpoint("agent_processor.start"):
            debugger.log(DebugLevel.DEBUG, "断点触发: agent_processor.start", "agent_processor")
        
        # 模拟处理步骤
        try:
            # 步骤1: 预处理
            debugger.log(DebugLevel.DEBUG, "执行预处理", "preprocessor")
            processed_input = user_input.strip().lower()
            
            # 检查断点
            if debugger.checkpoint("agent_processor.after_preprocessing"):
                debugger.log(DebugLevel.DEBUG, "断点触发: after_preprocessing", "agent_processor", {
                    'processed_input': processed_input
                })
            
            # 步骤2: LLM推理
            debugger.log(DebugLevel.INFO, "执行LLM推理", "llm_client", {
                'input_length': len(processed_input)
            })
            
            # 模拟LLM调用
            time.sleep(0.1)
            response = f"AI回复: {processed_input[:20]}..."
            
            debugger.log(DebugLevel.INFO, "LLM推理完成", "llm_client", {
                'output_length': len(response)
            })
            
            # 步骤3: 后处理
            debugger.log(DebugLevel.DEBUG, "执行后处理", "postprocessor")
            final_response = response.upper()
            
            debugger.update_state({
                'total_processing_time': 0.1,
                'input_tokens': len(processed_input) // 4,
                'output_tokens': len(response) // 4
            })
            
            debugger.log(DebugLevel.INFO, "处理完成", "agent_processor", {
                'output_length': len(final_response)
            })
            
            return final_response
            
        except Exception as e:
            debugger.log(DebugLevel.ERROR, f"处理失败: {str(e)}", "agent_processor", {
                'error_type': type(e).__name__
            })
            raise
    
    # 模拟客户端启动调试会话
    print("模拟调试会话:")
    print("-" * 80)
    
    # 这里模拟客户端命令
    commands = [
        {'command': 'start_session', 'session_id': 'test_session_1'},
        {'command': 'set_breakpoint', 'location': 'agent_processor.start'},
        {'command': 'add_event', 'level': 'info', 'source': 'test_client', 'message': '调试会话已启动'},
    ]
    
    for cmd in commands:
        response = debugger._process_command(cmd)
        print(f"命令: {cmd['command']}")
        print(f"响应: {response}\n")
    
    # 处理一些请求
    print("模拟Agent请求处理:")
    print("-" * 80)
    
    test_inputs = [
        "这是一个测试输入",
        "另一个测试输入,内容更长一些",
        "第三个测试输入,包含更多内容用于测试"
    ]
    
    for i, test_input in enumerate(test_inputs, 1):
        print(f"\n{i}. 处理请求: {test_input}")
        try:
            result = process_agent_request(test_input)
            print(f"   结果: {result[:30]}...")
        except Exception as e:
            print(f"   错误: {e}")
    
    # 获取调试信息
    print(f"\n{'='*80}")
    print("调试信息:")
    print("="*80)
    
    # 获取状态
    state_response = debugger._process_command({'command': 'get_state'})
    print(f"\n系统状态: {state_response.get('state', {})}")
    print(f"断点设置: {state_response.get('breakpoints', {})}")
    
    # 获取事件
    events_response = debugger._process_command({
        'command': 'get_events',
        'limit': 20
    })
    
    print(f"\n调试事件 (最近{len(events_response.get('events', []))}条):")
    for event in events_response.get('events', []):
        timestamp = datetime.fromtimestamp(event['timestamp']).strftime('%H:%M:%S')
        print(f"  [{timestamp}] [{event['level'].upper()}] {event['source']}: {event['message']}")
        if event.get('data'):
            print(f"    数据: {event['data']}")
    
    # 停止调试服务器
    print(f"\n{'='*80}")
    debugger.stop()

if __name__ == "__main__":
    demonstrate_debugging_tool()
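
The server above reads each command with a single `recv(4096)` and parses it as one complete JSON document. TCP is a byte stream, so a large command can arrive in pieces and two quick commands can arrive together. One common remedy is length-prefixed framing. The sketch below is an assumption and not part of the original tool: `send_msg`, `recv_msg`, and `_recv_exact` are hypothetical helpers, and the 4-byte big-endian length header is an illustrative choice.

```python
import json
import socket
import struct
from typing import Any


def send_msg(sock: socket.socket, obj: Any) -> None:
    """Serialize obj as JSON and send it with a 4-byte big-endian length prefix."""
    body = json.dumps(obj).encode("utf-8")
    sock.sendall(struct.pack("!I", len(body)) + body)


def _recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, looping because recv may return fewer."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed before the full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock: socket.socket) -> Any:
    """Read one length-prefixed JSON message."""
    (length,) = struct.unpack("!I", _recv_exact(sock, 4))
    return json.loads(_recv_exact(sock, length).decode("utf-8"))


# Two commands sent back to back are still read as two separate messages.
client, server = socket.socketpair()
send_msg(client, {"command": "start_session", "session_id": "demo"})
send_msg(client, {"command": "get_state"})
first = recv_msg(server)
second = recv_msg(server)
client.close()
server.close()
```

With framing in place, the server no longer depends on one `recv` call lining up with one command.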

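Best Practices and Common Pitfalls

Monitoring System Best Practices

Layered monitoring strategy: Cover the system from infrastructure up to business logic. Infrastructure monitoring tracks resource usage and system health, application monitoring tracks performance and availability, and business monitoring tracks user experience and business outcomes. Layering makes it possible to detect and localize problems promptly.

Key metric identification: Identify and monitor the key metrics, and avoid the information overload that comes from tracking too many. Key metrics should relate directly to business goals and reflect the system's core health and performance. Review them regularly to keep them aligned with those goals.

Alert threshold setting: Set alert thresholds that balance sensitivity against the false-positive rate. Base thresholds on historical data and business requirements, and escalate progressively. For important metrics, define several thresholds that map to graded alert severities.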
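
A minimal sketch of graded thresholds derived from history follows. It assumes that the warning and critical levels are taken from the 95th and 99th percentiles of past samples; that choice, and the names `TieredThreshold`, `threshold_from_history`, and `classify`, are illustrative assumptions rather than anything prescribed by this article.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class TieredThreshold:
    """Hypothetical two-level threshold for a single metric."""
    warning: float
    critical: float


def threshold_from_history(
    samples: Sequence[float], warning_pct: int = 95, critical_pct: int = 99
) -> TieredThreshold:
    """Derive thresholds from historical samples using integer percentiles."""
    if not samples:
        raise ValueError("at least one historical sample is required")
    ordered = sorted(samples)
    n = len(ordered)

    def pick(pct: int) -> float:
        # Nearest-rank style index, clamped to the last element
        return ordered[min(n - 1, n * pct // 100)]

    return TieredThreshold(warning=pick(warning_pct), critical=pick(critical_pct))


def classify(value: float, threshold: TieredThreshold) -> Optional[str]:
    """Return the alert severity for value, or None when no alert is due."""
    if value >= threshold.critical:
        return "critical"
    if value >= threshold.warning:
        return "warning"
    return None


# Historical latencies of 1..100 ms give warning=96 and critical=100.
history = list(range(1, 101))
latency_threshold = threshold_from_history(history)
```

In practice the thresholds would be recomputed periodically so they follow the metric's normal range.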

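Monitoring data retention policy: Define a retention policy that balances storage cost against analysis needs. High-frequency metrics can be kept for shorter periods and low-frequency metrics for longer ones. Archive historical data regularly to keep storage efficient.

Distributed Tracing Best Practices

Sensible trace scope: Scope tracing so that it does not hurt performance. Tracing should cover the critical request paths and business flows, but it does not need to cover every operation. The tracing policy can be adjusted dynamically according to business importance.

Standardized span naming: Use a consistent span naming convention so traces are easy to read and analyze. A span name should describe the operation clearly without being overly technical or overly abstract, and the convention should hold across the whole system.

Sensible attributes: Attach attributes that provide useful context, such as the request ID, user ID, and error information. Avoid attaching so many attributes that performance suffers.

Sampling strategy: Choose a sampling strategy that balances trace completeness against system overhead. High-traffic systems can use probabilistic sampling, while critical requests can be sampled in full. Tune the strategy to the system's characteristics and business needs.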
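
The sketch below shows one way to combine probabilistic sampling with full sampling of critical requests. It assumes the sampling decision is made from a hash of the trace ID, so every component that sees the same trace reaches the same decision. `should_sample` and its `critical` flag are hypothetical and are not part of the Tracer class in this article.

```python
import hashlib


def should_sample(trace_id: str, rate: float, critical: bool = False) -> bool:
    """Decide whether to record a trace.

    rate is the fraction of ordinary traces to keep (0.0 to 1.0).
    Critical requests are always kept.
    """
    if critical:
        return True
    if rate <= 0.0:
        return False
    if rate >= 1.0:
        return True
    # Map the trace ID to a stable value in [0, 1) so that the decision
    # is deterministic for a given trace.
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate


# Roughly 10% of ordinary traces are kept; critical traces are never dropped.
kept = sum(should_sample(f"trace-{i}", 0.1) for i in range(10000))
```

Hashing the trace ID, rather than calling a random number generator per span, keeps a trace from being recorded only in part.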

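Debugging Tool Best Practices

Sensible debug levels: Choose debug levels that do not flood the system with debug output. Production environments typically record only WARNING and above, while development environments can record DEBUG. The level should be adjustable at runtime according to environment and need.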
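
As a minimal sketch, the level can be chosen per environment with the standard `logging` module. The environment names and the `LEVEL_BY_ENV` mapping are assumptions made for illustration, as is the fallback to INFO for unknown environments.

```python
import logging

# Hypothetical mapping from deployment environment to minimum log level
LEVEL_BY_ENV = {
    "production": logging.WARNING,
    "staging": logging.INFO,
    "development": logging.DEBUG,
}


def configure_logger(name: str, env: str) -> logging.Logger:
    """Return a logger whose level matches the given environment."""
    logger = logging.getLogger(name)
    # Unknown environments fall back to INFO (an arbitrary, conservative choice)
    logger.setLevel(LEVEL_BY_ENV.get(env, logging.INFO))
    return logger


prod_logger = configure_logger("agent.prod", "production")
dev_logger = configure_logger("agent.dev", "development")

# isEnabledFor lets callers skip building expensive debug payloads
if dev_logger.isEnabledFor(logging.DEBUG):
    dev_logger.debug("prompt length=%d", 128)
```

Calling `setLevel` again at runtime changes the level without restarting the process.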

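Redacting debug information: Redact sensitive information so that user privacy is not exposed. Debug output should not contain passwords, tokens, personally identifiable information, or other sensitive data. The redaction policy should be comprehensive and applied consistently.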
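
One possible shape for such a redaction step is sketched below. It would be applied to the `data` payload of a debug event before the event is recorded. The key list and the e-mail pattern are illustrative assumptions and are far from exhaustive; `redact` is a hypothetical helper.

```python
import re
from typing import Any

# Hypothetical list of keys whose values must never appear in debug output
SENSITIVE_KEYS = {"password", "token", "api_key", "authorization", "secret"}
# Deliberately simple e-mail pattern, for illustration only
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive keys and e-mail addresses masked."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    if isinstance(value, str):
        return EMAIL_RE.sub("[EMAIL]", value)
    return value


event_data = {
    "user": "contact alice@example.com now",
    "password": "hunter2",
    "nested": {"Token": "abc123", "attempts": 3},
}
safe_data = redact(event_data)
```

Redacting at the point where events are created means that storage, network responses, and console output all see the same sanitized data.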

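Debug session management: Manage debug sessions carefully to avoid memory leaks and wasted resources. Sessions need explicit lifecycle management, sessions that stay idle for a long time should be cleaned up automatically, and the number of sessions should be capped.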
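
The following sketch shows idle-session eviction combined with a session cap. `SessionRegistry` is a hypothetical class that is not wired into the Debugger above. The injectable `clock` parameter is an assumption that makes the behavior easy to test, and the 300-second TTL is an arbitrary default.

```python
import time
from typing import Callable, Dict, List


class SessionRegistry:
    """Hypothetical registry that tracks last activity per session."""

    def __init__(
        self,
        max_sessions: int = 10,
        idle_ttl: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_sessions = max_sessions
        self.idle_ttl = idle_ttl
        self._clock = clock
        self._last_seen: Dict[str, float] = {}

    def touch(self, session_id: str) -> bool:
        """Record activity; return False if a new session would exceed the cap."""
        self.evict_idle()
        is_new = session_id not in self._last_seen
        if is_new and len(self._last_seen) >= self.max_sessions:
            return False
        self._last_seen[session_id] = self._clock()
        return True

    def evict_idle(self) -> List[str]:
        """Remove sessions idle for longer than idle_ttl and return their IDs."""
        now = self._clock()
        expired = [
            sid for sid, seen in self._last_seen.items()
            if now - seen > self.idle_ttl
        ]
        for sid in expired:
            del self._last_seen[sid]
        return expired

    def active(self) -> List[str]:
        return list(self._last_seen)
```

Because `touch` evicts idle sessions first, abandoned sessions cannot use up the session limit indefinitely.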

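Debug access control: Enforce strict access control to prevent unauthorized debugging operations. The debug interface should require authentication and authorization, and operations should be recorded in an audit log.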
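
One lightweight way to authenticate debug commands is an HMAC signature computed with a shared secret, sketched below. The secret handling, the idea of sending the signature alongside the payload, and the in-memory `audit_log` list are all assumptions for illustration. The debug protocol in this article has no such fields.

```python
import hashlib
import hmac
import time
from typing import Dict, List

# Hypothetical in-memory audit trail; a real system would persist this
audit_log: List[Dict[str, object]] = []


def sign_command(secret: bytes, payload: bytes) -> str:
    """Return the hex HMAC-SHA256 signature of payload."""
    return hmac.new(secret, payload, hashlib.sha256).hexdigest()


def verify_command(secret: bytes, payload: bytes, signature: str) -> bool:
    """Check the signature in constant time and record the attempt."""
    expected = sign_command(secret, payload)
    accepted = hmac.compare_digest(expected, signature)
    audit_log.append({
        "timestamp": time.time(),
        "accepted": accepted,
        "payload_size": len(payload),
    })
    return accepted


secret = b"example-shared-secret"  # placeholder; load from secure configuration
payload = b'{"command": "get_state"}'
signature = sign_command(secret, payload)
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not reveal how much of the signature matched.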

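Common Pitfalls and Solutions

Monitoring data explosion: Too much monitoring data drives storage and processing costs too high. Remedies include refining the metric collection strategy, sampling data, setting sensible retention periods, and using compression and aggregation.

Alert fatigue: Too many alerts lead operators to overlook the real problems. Remedies include tuning alert thresholds, aggregating and suppressing alerts, assigning alert priorities, and setting sensible silence periods.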
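
Suppression with a silence period can be sketched as follows. `AlertSuppressor` is a hypothetical class, the alert key format is an assumption, and the injectable `clock` exists only to make the example testable.

```python
import time
from collections import defaultdict
from typing import Callable, Dict


class AlertSuppressor:
    """Hypothetical filter that drops repeats of an alert within a silence period."""

    def __init__(
        self,
        silence_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.silence_seconds = silence_seconds
        self._clock = clock
        self._last_sent: Dict[str, float] = {}
        self._suppressed: Dict[str, int] = defaultdict(int)

    def should_notify(self, key: str) -> bool:
        """Return True if an alert with this key should be delivered now."""
        now = self._clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.silence_seconds:
            # Count the repeat so the next notification can report how many were dropped
            self._suppressed[key] += 1
            return False
        self._last_sent[key] = now
        return True

    def suppressed_count(self, key: str) -> int:
        return self._suppressed.get(key, 0)
```

Keying on something like `"<metric>:<component>"` keeps one noisy component from silencing alerts for others.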

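Tracing overhead: Distributed tracing can have a significant impact on system performance. Remedies include sampling, optimizing the tracing implementation, processing trace data asynchronously, and tracing only the critical paths.

Debug information leakage: Debug output that contains sensitive data creates security problems. Remedies include redacting information, restricting debug access, auditing debug operations, and encrypting stored debug data.

Performance Optimization Considerations

Monitoring System Performance Optimization

The monitoring system's impact on the monitored system should be kept to a minimum. Optimization strategies include:

Asynchronous data collection: Collect monitoring data asynchronously so the main business flow is not blocked. Thread pools or coroutines can be used to keep monitoring work from affecting business performance.

Batch data processing: Process monitoring data in batches to reduce the number of I/O operations. Batching can improve write throughput considerably, especially when data is collected at high frequency.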
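
Asynchronous collection and batching can be combined in a small background writer, sketched below. `BatchWriter` and its `sink` callback are hypothetical; the sink stands in for whatever performs the actual write, such as a single `executemany` call. The batch size and flush interval are arbitrary defaults.

```python
import queue
import threading
from typing import Any, Callable, List


class BatchWriter:
    """Hypothetical writer that hands items to a sink in batches from a worker thread."""

    _STOP = object()  # sentinel that tells the worker to finish

    def __init__(
        self,
        sink: Callable[[List[Any]], None],
        batch_size: int = 50,
        flush_interval: float = 1.0,
    ):
        self._sink = sink
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, item: Any) -> None:
        """Enqueue an item; returns immediately because the queue is unbounded."""
        self._queue.put(item)

    def close(self) -> None:
        """Flush everything that was submitted and stop the worker."""
        self._queue.put(self._STOP)
        self._worker.join()

    def _run(self) -> None:
        batch: List[Any] = []
        while True:
            try:
                item = self._queue.get(timeout=self._flush_interval)
                timed_out = False
            except queue.Empty:
                item, timed_out = None, True
            if item is self._STOP:
                break
            if not timed_out:
                batch.append(item)
            # Flush when the batch is full, or when the queue has been quiet for a while
            if batch and (timed_out or len(batch) >= self._batch_size):
                self._sink(batch)
                batch = []
        if batch:
            self._sink(batch)


batches: List[List[int]] = []
writer = BatchWriter(batches.append, batch_size=10, flush_interval=0.05)
for i in range(25):
    writer.submit(i)
writer.close()
```

An unbounded queue never blocks the caller but can grow without limit if the sink falls behind. A bounded queue that drops items on overflow is a common alternative.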

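Memory optimization: Keep the monitoring system's memory use in check to avoid leaks and excessive consumption. This includes capping cache sizes, using efficient data structures, and discarding data promptly once it is no longer needed.

Network optimization: Reduce the network overhead of transmitting monitoring data through compression, batched transfer, and more efficient serialization formats.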
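
A minimal sketch of batched, compressed transfer using only the standard library follows. JSON with `zlib` is an illustrative choice, not a recommendation of a particular wire format, and `encode_batch` and `decode_batch` are hypothetical helpers.

```python
import json
import zlib
from typing import Any, Dict, List


def encode_batch(points: List[Dict[str, Any]]) -> bytes:
    """Serialize a batch compactly and compress it for transfer."""
    raw = json.dumps(points, separators=(",", ":")).encode("utf-8")
    return zlib.compress(raw, level=6)


def decode_batch(blob: bytes) -> List[Dict[str, Any]]:
    """Inverse of encode_batch."""
    return json.loads(zlib.decompress(blob).decode("utf-8"))


# Metric points share keys and label values, so a batch compresses well.
points = [
    {"name": "agent_request_latency", "agent": "planner", "value": i}
    for i in range(200)
]
blob = encode_batch(points)
raw_size = len(json.dumps(points, separators=(",", ":")).encode("utf-8"))
```

Compression works best on batches, because a single small data point carries too little repetition to compress.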

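Debugging Tool Performance Optimization

The performance of debugging tools is critical to development efficiency. Optimization strategies include:

Lazy loading: Load debug information only when it is needed. Lazy loading reduces memory use and initialization time.

Incremental updates: Transmit only the data that has changed. Incremental updates reduce data transfer and processing overhead.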
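
For a flat state dictionary such as `DebugSession.state`, an incremental update can be expressed as a diff between two snapshots. The sketch below is an assumption about how that might look. `diff_state` and `apply_diff` are hypothetical helpers that compare top-level keys only.

```python
from typing import Any, Dict


def diff_state(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    """Return the keys that changed or were added, and the keys that were removed."""
    changed = {
        key: value
        for key, value in new.items()
        if key not in old or old[key] != value
    }
    removed = [key for key in old if key not in new]
    return {"changed": changed, "removed": removed}


def apply_diff(state: Dict[str, Any], delta: Dict[str, Any]) -> Dict[str, Any]:
    """Rebuild the new state from the previous state and a delta."""
    merged = {k: v for k, v in state.items() if k not in delta["removed"]}
    merged.update(delta["changed"])
    return merged


previous = {"input_tokens": 12, "output_tokens": 8, "stage": "llm"}
current = {"input_tokens": 12, "output_tokens": 20, "status": "done"}
delta = diff_state(previous, current)
```

A client that holds the previous snapshot only needs `delta` to catch up, which matters when the state is large and changes little between polls.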

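Query optimization: Make debug data fast to search by building suitable indexes, caching results, and streamlining query logic.

Resource limits: Set sensible limits on memory, CPU, and concurrency so that debugging tools cannot consume excessive resources.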

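After working through this article, readers should have a grasp of the core techniques for monitoring and debugging Agent systems and be able to build observable, easy-to-debug Agent systems in real projects. The next article covers Agent error handling and fault-tolerance mechanisms in detail.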