异常检测 Agent:模式识别与告警
异常检测 Agent 能够自动学习正常数据模式、识别偏离常态的异常行为、配置智能告警规则并分析根本原因,是监控系统的智能化核心。
异常检测 Agent:模式识别与告警
在现代分布式系统中,异常无处不在:服务器 CPU 飙升、数据库连接池耗尽、API 响应时间突增、订单量异常下降、用户行为模式突变。传统的监控告警系统依赖人工配置阈值,不仅维护成本高,还容易产生大量误报和漏报。
异常检测 Agent 的核心价值在于将异常识别智能化:它能够从历史数据中自动学习正常模式,采用多种算法检测异常,配置自适应的告警规则,并在发现异常时进行根本原因分析。这不仅降低了运维成本,还提高了问题发现的及时性和准确性。
核心概念与架构设计
异常检测的多维分类
异常检测 Agent 需要处理多种类型的异常,不同类型的异常需要不同的检测策略:
1. 点异常 (Point Anomalies)
- 单个数据点显著偏离正常范围
- 示例:某个时间点的 CPU 使用率达到 100%
- 检测方法:统计方法、阈值检测
2. 上下文异常 (Contextual Anomalies)
- 在特定上下文中才表现出异常性
- 示例:夜间 3 点的流量突然达到白天峰值
- 检测方法:时间序列分析、上下文感知检测
3. 集体异常 (Collective Anomalies)
- 一组数据点整体偏离正常模式
- 示例:连续 10 分钟的错误率缓慢上升(其中每个单独的数据点都未超出正常阈值,只有整体变化才构成异常)
- 检测方法:序列模式分析、变化点检测
4. 关联异常 (Correlation Anomalies)
- 多个指标之间的关联关系发生改变
- 示例:流量大幅增加,而通常随之同步上升的 CPU 使用率却保持不变甚至下降(违反两者正常的正相关关系)
- 检测方法:相关性分析、图神经网络
模式学习与基线建立
异常检测的基础是建立准确的正常模式基线。Agent 需要从历史数据中学习以下模式:
时间模式学习:识别指标在不同时间尺度上的变化规律,包括日内模式(如白天高、夜间低)、周内模式(如工作日高、周末低)、季节模式(如电商节庆高峰)。
周期模式学习:识别重复出现的周期性变化,如数据采集周期、任务执行周期、业务活动周期。
趋势模式学习:识别长期趋势和短期波动,建立趋势基线,区分正常波动和异常变化。
关联模式学习:学习多个指标之间的相关性和因果关系,建立多指标联合检测模型。
告警规则引擎
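智能告警规则引擎需要支持多种类型的规则配置,例如:静态阈值规则(将指标值与阈值比较,如 `>`、`<`)、趋势规则(`trend`,在检测到集体/趋势异常时触发)、模式规则(`pattern`,在检测到上下文/模式异常时触发);每条规则还可设置严重级别和冷却时间,避免同一规则在短时间内重复告警。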
关键技术实现
异常检测引擎核心框架
下面实现一个完整的异常检测引擎,涵盖数据采集、模式学习、异常检测和告警生成:
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import json
from collections import deque
import warnings
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import logging
# Configure root logging when this module is imported so that the INFO-level
# messages emitted by the classes below are visible when the file is run directly.
# NOTE(review): basicConfig at import time is a global side effect; if this
# module is used as a library, consider moving it under `if __name__ == "__main__":`.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the classes in this file.
logger = logging.getLogger(__name__)
class AnomalyType(Enum):
    """Category of a detected anomaly."""
    POINT = "point"  # point anomaly: a single value far outside the normal range (Z-score check)
    CONTEXTUAL = "contextual"  # contextual anomaly: abnormal only for its time context (hour/weekday pattern check)
    COLLECTIVE = "collective"  # collective anomaly: a run of points deviates together (trend check)
    CORRELATION = "correlation"  # correlation anomaly: the relationship between metrics breaks
class SeverityLevel(Enum):
    """Severity attached to anomalies, alert rules and alerts.

    Members only carry string values; as a plain ``Enum`` they define no
    ordering, so ``<`` / ``max()`` over members raise ``TypeError``.
    """
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass
class MetricData:
    """A single observation of a named metric."""
    metric_name: str
    value: float
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)  # optional key/value tags (not read by the detection code shown here)
@dataclass
class Anomaly:
    """An anomaly detected on one metric at one point in time."""
    metric_name: str
    anomaly_type: AnomalyType
    severity: SeverityLevel
    value: float  # observed value that was flagged
    expected_value: float  # value the detector expected (process_metric stores 0.0 for pattern anomalies)
    anomaly_score: float  # detector-specific score; Z-score, trend residual ratio and pattern deviation use different scales
    timestamp: datetime
    description: str  # human-readable explanation produced by the detector
    root_cause: Optional[str] = None  # filled in by root-cause analysis when it runs
    related_metrics: List[str] = field(default_factory=list)  # not populated anywhere in the code shown here
@dataclass
class BaselineModel:
    """Metadata describing the baseline trained for one metric."""
    metric_name: str
    model_type: str  # e.g. "ensemble", as written by AnomalyDetectionAgent.train_baseline
    model_params: Dict[str, Any]  # free-form parameters of the underlying detectors
    training_data_range: Tuple[datetime, datetime]  # (earliest, latest) timestamp of the training data
    last_updated: datetime
    performance_metrics: Dict[str, float]  # NOTE(review): train_baseline stores a hard-coded placeholder accuracy here
@dataclass
class AlertRule:
    """Configuration of one alerting rule for a single metric."""
    rule_id: str  # unique id; also keys the per-rule cooldown bookkeeping in AlertRuleEngine
    metric_name: str  # the rule is only evaluated for data points of this metric
    # Intended values: ">", "<", ">=", "<=", "==", "!=" compare the value against
    # `threshold`; "trend" / "pattern" fire on detected collective / contextual anomalies.
    condition: str  # ">", "<", "==", "!=", "trend", "pattern"
    threshold: Optional[float] = None  # needed by the comparison conditions
    time_window: Optional[int] = None  # time window in seconds; NOTE(review): not read by AlertRuleEngine as shown
    severity: SeverityLevel = SeverityLevel.WARNING  # severity copied onto generated alerts
    cooldown: int = 300  # minimum number of seconds between two alerts from this rule
    enabled: bool = True  # disabled rules are skipped during evaluation
@dataclass
class Alert:
    """An alert raised by a rule (or aggregated from several alerts)."""
    alert_id: str
    rule_id: str
    metric_name: str
    severity: SeverityLevel
    message: str
    timestamp: datetime
    anomaly: Optional[Anomaly] = None  # anomaly that triggered the alert, if any
    acknowledged: bool = False
    resolved: bool = False
    # Individual alerts folded into an aggregated alert (see AlertAggregator);
    # empty for ordinary alerts. Declared so that aggregation does not fail on
    # an unknown constructor argument.
    related_alerts: List["Alert"] = field(default_factory=list)
class StatisticalDetector:
    """Statistics-based anomaly detector (Z-score and linear trend).

    A bounded sliding window of recent ``(value, timestamp)`` pairs is kept per
    metric. The Z-score check (:meth:`detect_anomaly`) and the trend check
    (:meth:`detect_trend_anomaly`) maintain *separate* windows. Callers such as
    ``AnomalyDetectionAgent.process_metric`` run both checks on every
    observation. A shared window would store each observation twice, and the
    trend line would be fitted on the very point it is supposed to predict.
    """

    def __init__(self, window_size: int = 100, std_threshold: float = 3.0):
        """
        Args:
            window_size: Maximum number of recent points kept per metric.
            std_threshold: Z-score above which a point is anomalous.
        """
        self.window_size = window_size
        self.std_threshold = std_threshold
        # Sliding windows for the Z-score check, keyed by metric name.
        self.history: Dict[str, deque] = {}
        # Sliding windows for the trend check, keyed by metric name.
        self.trend_history: Dict[str, deque] = {}

    def _get_window(self, store: Dict[str, deque], metric_name: str) -> deque:
        """Return the window for ``metric_name`` in ``store``, creating it if needed."""
        if metric_name not in store:
            store[metric_name] = deque(maxlen=self.window_size)
        return store[metric_name]

    def detect_anomaly(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime
    ) -> Tuple[bool, float, float]:
        """Detect a point anomaly with a Z-score against the sliding window.

        The value is scored against the window *before* it is added. No
        detection happens in two cases: until the window is at least half full
        (and holds at least two points), or while the window has zero spread.
        The value is appended to the window in every case.

        Returns:
            ``(is_anomaly, z_score, expected_value)``. ``expected_value`` is the
            window mean, or ``value`` itself when no detection was performed.
        """
        history = self._get_window(self.history, metric_name)
        is_anomaly, z_score, expected_value = False, 0.0, value
        if len(history) >= max(2, self.window_size // 2):
            values = np.array([v for v, _ in history], dtype=float)
            mean = float(np.mean(values))
            std = float(np.std(values))
            if std > 0:  # also rejects NaN spreads
                z_score = abs(value - mean) / std
                is_anomaly = z_score > self.std_threshold
                expected_value = mean
        history.append((value, timestamp))
        return is_anomaly, z_score, expected_value

    def detect_trend_anomaly(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime,
        trend_threshold: float = 0.5
    ) -> Tuple[bool, float]:
        """Detect a deviation from the linear trend of the recent window.

        A least-squares line is fitted to the trend window once it holds at
        least 10 points, then extrapolated one step ahead. The value is
        anomalous when its residual, relative to the prediction, exceeds
        ``trend_threshold``. The value is appended to the trend window in
        every case.

        Returns:
            ``(is_anomaly, residual_ratio)``. ``residual_ratio`` falls back to
            the absolute residual when the prediction is exactly zero.
        """
        history = self._get_window(self.trend_history, metric_name)
        is_anomaly, residual_ratio = False, 0.0
        if len(history) >= 10:
            values = np.array([v for v, _ in history], dtype=float)
            slope, intercept = np.polyfit(np.arange(len(values)), values, 1)
            predicted_value = slope * len(values) + intercept
            residual = abs(value - predicted_value)
            if predicted_value != 0:
                residual_ratio = float(residual / abs(predicted_value))
            else:
                residual_ratio = float(residual)
            is_anomaly = residual_ratio > trend_threshold
        history.append((value, timestamp))
        return is_anomaly, residual_ratio
class PatternDetector:
    """Seasonal pattern detector based on hour-of-day and day-of-week profiles.

    ``learn_pattern`` records the mean/std of a metric for every hour of the
    day and every day of the week. ``detect_pattern_anomaly`` flags a value
    whose distance from the matching profile exceeds ``deviation_threshold``
    standard deviations.
    """

    def __init__(self, pattern_window: int = 7, deviation_threshold: float = 2.0):
        """
        Args:
            pattern_window: Training requires at least ``2 * pattern_window`` rows.
            deviation_threshold: Number of standard deviations from the learned
                profile above which a value is anomalous.
        """
        self.pattern_window = pattern_window
        self.deviation_threshold = deviation_threshold
        self.patterns: Dict[str, Dict[str, Any]] = {}

    def learn_pattern(self, metric_name: str, data: pd.DataFrame):
        """Learn hourly and weekday profiles for ``metric_name``.

        Args:
            metric_name: Metric the profile belongs to.
            data: Frame with ``timestamp`` and ``value`` columns. It is not
                modified. Too-short frames are ignored silently.
        """
        if len(data) < self.pattern_window * 2:
            return
        timestamps = pd.to_datetime(data['timestamp'])
        values = data['value']
        # Group on derived Series instead of adding helper columns to the caller's frame.
        hourly_pattern = values.groupby(timestamps.dt.hour.rename('hour')).agg(['mean', 'std'])
        daily_pattern = values.groupby(timestamps.dt.dayofweek.rename('day_of_week')).agg(['mean', 'std'])
        self.patterns[metric_name] = {
            'hourly': hourly_pattern,
            'daily': daily_pattern,
            'overall_mean': values.mean(),
            'overall_std': values.std(),
            'training_range': (timestamps.min(), timestamps.max()),
        }

    def detect_pattern_anomaly(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime
    ) -> Tuple[bool, float, str]:
        """Compare ``value`` with the learned profile for its hour (then weekday).

        The hourly profile is used when it has a positive std for that hour;
        otherwise the weekday profile is tried.

        Returns:
            ``(is_anomaly, deviation_in_std, description)``.
        """
        pattern = self.patterns.get(metric_name)
        if pattern is None:
            return False, 0.0, "Pattern not learned"
        # ``weekday()`` exists on both ``datetime.datetime`` and ``pd.Timestamp``
        # and uses the same Monday=0 convention as ``Series.dt.dayofweek``;
        # plain ``datetime`` objects have no ``dayofweek`` attribute.
        profiles = (
            ("Hourly", pattern['hourly'], timestamp.hour),
            ("Daily", pattern['daily'], timestamp.weekday()),
        )
        for label, profile, key in profiles:
            if key not in profile.index:
                continue
            expected_mean = profile.loc[key, 'mean']
            expected_std = profile.loc[key, 'std']
            # std is NaN for single-sample groups and 0 for constant groups; skip both.
            if expected_std > 0:
                deviation = float(abs(value - expected_mean) / expected_std)
                is_anomaly = deviation > self.deviation_threshold
                return is_anomaly, deviation, f"{label} pattern deviation: {deviation:.2f} std"
        return False, 0.0, "No pattern anomaly detected"
class MLBasedDetector:
    """Isolation-Forest detector over sliding-window statistical features.

    Each sample is described by statistics of the preceding ``feature_window``
    values plus the current value (see :meth:`_feature_row`). Training fits a
    ``StandardScaler`` + ``IsolationForest`` per metric. Online detection keeps
    the last ``feature_window`` values per metric so it can build the same
    features for every new point.
    """

    def __init__(self, contamination: float = 0.1, n_estimators: int = 100):
        """
        Args:
            contamination: Expected outlier fraction passed to IsolationForest.
            n_estimators: Number of trees in each IsolationForest.
        """
        self.contamination = contamination
        self.n_estimators = n_estimators
        self.models: Dict[str, IsolationForest] = {}
        self.scalers: Dict[str, StandardScaler] = {}
        self.feature_windows: Dict[str, int] = {}
        # Most recent raw values per metric (at most feature_window of them),
        # used to build the features of the next incoming point.
        self.recent_values: Dict[str, deque] = {}

    def train_model(
        self,
        metric_name: str,
        data: pd.DataFrame,
        feature_window: int = 10
    ):
        """Fit the scaler and IsolationForest for ``metric_name``.

        Args:
            metric_name: Metric being modelled.
            data: Frame with a ``value`` column in time order. At least
                ``feature_window * 10`` rows are required; otherwise a warning
                is logged and nothing is trained.
            feature_window: Number of preceding values summarised per sample.
        """
        if len(data) < feature_window * 10:
            logger.warning(f"Insufficient data for training {metric_name}")
            return
        features = self._create_features(data, feature_window)
        scaler = StandardScaler()
        features_scaled = scaler.fit_transform(features)
        model = IsolationForest(
            contamination=self.contamination,
            n_estimators=self.n_estimators,
            random_state=42
        )
        model.fit(features_scaled)
        self.models[metric_name] = model
        self.scalers[metric_name] = scaler
        self.feature_windows[metric_name] = feature_window
        # Seed the online window with the tail of the training data so that
        # detection can start with the very next point.
        tail = data['value'].values[-feature_window:]
        self.recent_values[metric_name] = deque((float(v) for v in tail), maxlen=feature_window)
        logger.info(f"Trained anomaly detection model for {metric_name}")

    @staticmethod
    def _feature_row(window_data: np.ndarray, current: float) -> List[float]:
        """Build one feature vector from the preceding window and the current value."""
        previous = window_data[-1]
        return [
            # Statistics of the preceding window
            np.mean(window_data),
            np.std(window_data),
            np.min(window_data),
            np.max(window_data),
            np.percentile(window_data, 25),
            np.percentile(window_data, 75),
            # Current value
            current,
            # Relative change from the previous value (epsilon avoids division by zero)
            (current - previous) / (abs(previous) + 1e-6),
            # Linear slope within the window
            np.polyfit(np.arange(len(window_data)), window_data, 1)[0],
        ]

    def _create_features(self, data: pd.DataFrame, window: int) -> np.ndarray:
        """Create one feature row per value that has ``window`` predecessors."""
        values = data['value'].values
        features = [
            self._feature_row(values[i - window:i], values[i])
            for i in range(window, len(values))
        ]
        return np.array(features)

    def detect_anomaly(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime
    ) -> Tuple[bool, float]:
        """Score one new value with the trained model.

        Returns ``(False, 0.0)`` when no model exists for the metric or the
        online window is not yet full. The value is always added to the
        window of a trained metric.

        Returns:
            ``(is_anomaly, score)``. ``score`` is the negated
            ``decision_function``, so higher means more anomalous and
            ``score > 0`` corresponds to IsolationForest's outlier prediction.
        """
        model = self.models.get(metric_name)
        if model is None:
            return False, 0.0
        window = self.feature_windows[metric_name]
        recent = self.recent_values.setdefault(metric_name, deque(maxlen=window))
        if len(recent) < window:
            recent.append(float(value))
            return False, 0.0
        row = self._feature_row(np.array(recent, dtype=float), float(value))
        scaled = self.scalers[metric_name].transform(np.array([row]))
        # IsolationForest.predict marks samples with decision_function < 0 as outliers.
        score = -float(model.decision_function(scaled)[0])
        recent.append(float(value))
        return score > 0, score
class CorrelationDetector:
"""关联异常检测器"""
def __init__(self, window_size: int = 100):
self.window_size = window_size
self.correlations: Dict[Tuple[str, str], float] = {}
self.history: Dict[str, deque] = {}
def learn_correlations(self, data: Dict[str, pd.DataFrame]):
"""学习指标间的相关性"""
metric_names = list(data.keys())
for i, metric1 in enumerate(metric_names):
for metric2 in metric_names[i+1:]:
# 对齐时间
merged = pd.merge(
data[metric1],
data[metric2],
on='timestamp',
how='inner',
suffixes=('_x', '_y')
)
if len(merged) > 30: # 需要足够的数据点
correlation = merged[['value_x', 'value_y']].corr().iloc[0, 1]
if abs(correlation) > 0.5: # 只保存强相关
self.correlations[(metric1, metric2)] = correlation
logger.info(f"Learned {len(self.correlations)} correlations")
def detect_correlation_anomaly(
self,
metric1: str,
value1: float,
metric2: str,
value2: float,
timestamp: datetime
) -> Tuple[bool, float]:
"""检测关联异常"""
# 查找相关对
correlation = self.correlations.get((metric1, metric2))
if correlation is None:
correlation = self.correlations.get((metric2, metric1))
if correlation is None:
return False, 0.0
# 维护历史窗口
for metric in [metric1, metric2]:
if metric not in self.history:
self.history[metric] = deque(maxlen=self.window_size)
# 检查当前值是否符合预期关系
# 这里使用简化的方法,实际可以使用更复杂的相关性检验
return False, 0.0
class AlertRuleEngine:
    """Turn metric values and detected anomalies into alerts via AlertRule objects.

    Each rule applies to one metric and is rate-limited by its ``cooldown``.
    A rule fires either on a comparison of the value against ``threshold``
    (``>``, ``<``, ``>=``, ``<=``, ``==``, ``!=``) or on the presence of a
    matching anomaly (``trend`` -> collective, ``pattern`` -> contextual).
    """

    # Comparison conditions, evaluated as ``comparator(value, threshold)``.
    _COMPARATORS: Dict[str, Callable[[float, float], bool]] = {
        ">": lambda value, threshold: value > threshold,
        "<": lambda value, threshold: value < threshold,
        ">=": lambda value, threshold: value >= threshold,
        "<=": lambda value, threshold: value <= threshold,
        "==": lambda value, threshold: value == threshold,
        "!=": lambda value, threshold: value != threshold,
    }

    # Anomaly-driven conditions and the anomaly type each one listens to.
    _ANOMALY_CONDITIONS: Dict[str, AnomalyType] = {
        "trend": AnomalyType.COLLECTIVE,
        "pattern": AnomalyType.CONTEXTUAL,
    }

    def __init__(self):
        self.rules: List[AlertRule] = []
        # rule_id -> timestamp of the last alert that rule produced (for cooldown).
        self.alert_history: Dict[str, datetime] = {}

    def add_rule(self, rule: AlertRule):
        """Register a rule."""
        self.rules.append(rule)

    def evaluate_rules(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime,
        anomalies: List[Anomaly]
    ) -> List[Alert]:
        """Evaluate every enabled rule for ``metric_name`` and return new alerts.

        A rule is skipped while it is within ``cooldown`` seconds of its last
        alert. Each alert references the anomaly that explains it, if any.
        """
        alerts = []
        for rule in self.rules:
            if not rule.enabled or rule.metric_name != metric_name:
                continue
            last_alert_time = self.alert_history.get(rule.rule_id)
            if last_alert_time is not None and \
                    (timestamp - last_alert_time).total_seconds() <= rule.cooldown:
                continue
            if not self._evaluate_condition(rule, value, timestamp, anomalies):
                continue
            relevant = self._relevant_anomalies(rule, anomalies)
            alerts.append(Alert(
                alert_id=f"alert_{rule.rule_id}_{int(timestamp.timestamp())}",
                rule_id=rule.rule_id,
                metric_name=metric_name,
                severity=rule.severity,
                message=self._generate_alert_message(rule, value, relevant),
                timestamp=timestamp,
                anomaly=relevant[0] if relevant else None
            ))
            self.alert_history[rule.rule_id] = timestamp
        return alerts

    def _relevant_anomalies(self, rule: AlertRule, anomalies: List[Anomaly]) -> List[Anomaly]:
        """Anomalies that explain ``rule``: the matching type for trend/pattern rules, else all."""
        wanted = self._ANOMALY_CONDITIONS.get(rule.condition)
        if wanted is None:
            return list(anomalies)
        return [a for a in anomalies if a.anomaly_type == wanted]

    def _evaluate_condition(
        self,
        rule: AlertRule,
        value: float,
        timestamp: datetime,
        anomalies: List[Anomaly]
    ) -> bool:
        """Return True when ``rule``'s condition holds; unknown conditions never fire."""
        comparator = self._COMPARATORS.get(rule.condition)
        if comparator is not None:
            # A comparison rule without a threshold cannot fire (avoids comparing with None).
            return rule.threshold is not None and comparator(value, rule.threshold)
        wanted = self._ANOMALY_CONDITIONS.get(rule.condition)
        if wanted is not None:
            return any(a.anomaly_type == wanted for a in anomalies)
        return False

    def _generate_alert_message(
        self,
        rule: AlertRule,
        value: float,
        anomalies: List[Anomaly]
    ) -> str:
        """Build the alert text from the first anomaly, or from the rule itself if none."""
        if anomalies:
            anomaly = anomalies[0]
            message = (
                f"检测到异常: {rule.metric_name} = {value:.2f}, "
                f"期望值 = {anomaly.expected_value:.2f}, "
                f"异常评分 = {anomaly.anomaly_score:.2f}. "
                f"{anomaly.description}"
            )
        else:
            message = (
                f"触发告警规则 {rule.rule_id}: "
                f"{rule.metric_name} = {value:.2f} "
                f"{rule.condition} {rule.threshold}"
            )
        return message
class RootCauseAnalyzer:
    """Heuristic root-cause analysis over a correlation-derived dependency graph.

    The graph is built from pairwise Pearson correlation, not from a causal
    test. Because correlation is symmetric, every edge is recorded in both
    directions, so "upstream" really means "strongly correlated".
    """

    def __init__(
        self,
        correlation_threshold: float = 0.7,
        upstream_window_seconds: float = 300,
        concurrent_window_seconds: float = 60
    ):
        """
        Args:
            correlation_threshold: ``|corr|`` above which two metrics are linked.
            upstream_window_seconds: How far back an upstream anomaly may lie.
            concurrent_window_seconds: Max time distance for "simultaneous" anomalies.
        """
        self.correlation_threshold = correlation_threshold
        self.upstream_window_seconds = upstream_window_seconds
        self.concurrent_window_seconds = concurrent_window_seconds
        self.causal_graph: Dict[str, List[str]] = {}
        self.causal_weights: Dict[Tuple[str, str], float] = {}

    @staticmethod
    def _aligned_correlation(left: pd.DataFrame, right: pd.DataFrame) -> float:
        """Pearson correlation of two metrics' values, aligned on timestamp when both have one."""
        if 'timestamp' in left.columns and 'timestamp' in right.columns:
            merged = pd.merge(
                left[['timestamp', 'value']],
                right[['timestamp', 'value']],
                on='timestamp',
                how='inner',
                suffixes=('_left', '_right')
            )
            return merged['value_left'].corr(merged['value_right'])
        # Without timestamps, fall back to index alignment.
        return left['value'].corr(right['value'])

    def build_causal_graph(self, historical_data: Dict[str, pd.DataFrame]):
        """Link every pair of metrics whose aligned ``|correlation|`` exceeds the threshold.

        Args:
            historical_data: Metric name -> frame with a ``value`` column and,
                ideally, a ``timestamp`` column used for alignment.
        """
        metric_names = list(historical_data.keys())
        for i, first in enumerate(metric_names):
            for second in metric_names[i + 1:]:
                correlation = self._aligned_correlation(historical_data[first], historical_data[second])
                if pd.isna(correlation) or abs(correlation) <= self.correlation_threshold:
                    continue
                # Correlation is symmetric: record the edge in both directions,
                # without duplicating edges when the graph is rebuilt.
                for cause, effect in ((first, second), (second, first)):
                    effects = self.causal_graph.setdefault(cause, [])
                    if effect not in effects:
                        effects.append(effect)
                    self.causal_weights[(cause, effect)] = correlation
        logger.info(f"Built causal graph with {len(self.causal_graph)} nodes")

    def analyze_root_cause(
        self,
        anomaly: Anomaly,
        all_anomalies: List[Anomaly],
        metric_data: Dict[str, List[MetricData]]
    ) -> Optional[str]:
        """Explain ``anomaly`` using other recent anomalies.

        The checks run in this order:
        1. An anomaly on a linked ("upstream") metric within the upstream
           window before ``anomaly``.
        2. Anomalies on other metrics within the concurrent window.
        3. Otherwise, a request for manual investigation.
        ``metric_data`` is accepted for interface compatibility but not used.
        """
        upstream_metrics = [
            cause for cause, effects in self.causal_graph.items()
            if anomaly.metric_name in effects
        ]
        for upstream in upstream_metrics:
            if any(
                a.metric_name == upstream
                and a.timestamp <= anomaly.timestamp
                and (anomaly.timestamp - a.timestamp).total_seconds() < self.upstream_window_seconds
                for a in all_anomalies
            ):
                return f"上游指标 {upstream} 异常导致 {anomaly.metric_name} 异常"

        concurrent_metrics: List[str] = []
        for a in all_anomalies:
            if (
                a.metric_name != anomaly.metric_name
                and abs((a.timestamp - anomaly.timestamp).total_seconds()) < self.concurrent_window_seconds
                and a.metric_name not in concurrent_metrics
            ):
                concurrent_metrics.append(a.metric_name)
        if concurrent_metrics:
            return f"多个指标同时异常: {', '.join(concurrent_metrics)}"

        # System-level events (deploys, config changes, ...) are not integrated yet.
        return "未确定根本原因,建议人工排查"
class AnomalyDetectionAgent:
    """Main agent: combines detectors, alert rules and root-cause analysis.

    :meth:`process_metric` runs only the statistical (Z-score / trend) and
    pattern detectors. The ML and correlation detectors are held here but are
    not consulted per data point. Detected anomalies are kept in
    ``anomaly_history`` (all metrics, bounded), which feeds root-cause
    analysis and :meth:`get_anomaly_statistics`.
    """

    def __init__(self, max_history: int = 1000, max_anomaly_history: int = 10000):
        """
        Args:
            max_history: Raw data points retained per metric.
            max_anomaly_history: Anomalies retained across all metrics.
        """
        self.statistical_detector = StatisticalDetector()
        self.pattern_detector = PatternDetector()
        self.ml_detector = MLBasedDetector()
        self.correlation_detector = CorrelationDetector()
        self.rule_engine = AlertRuleEngine()
        self.root_cause_analyzer = RootCauseAnalyzer()
        self.baseline_models: Dict[str, BaselineModel] = {}
        self.metric_history: Dict[str, List[MetricData]] = {}
        self.max_history = max_history
        # metric_history only holds MetricData, so anomalies need their own store.
        self.anomaly_history: deque = deque(maxlen=max_anomaly_history)

    def train_baseline(
        self,
        metric_name: str,
        data: pd.DataFrame,
        training_days: int = 7
    ):
        """Train the baseline detectors for one metric.

        Args:
            metric_name: Metric to train.
            data: Frame with ``timestamp`` and ``value`` columns; at least 100
                rows are required (otherwise a warning is logged).
            training_days: Currently unused; kept for API compatibility.
        """
        if len(data) < 100:
            logger.warning(f"Insufficient data for baseline training: {metric_name}")
            return
        # Warm up the statistical detector's sliding window.
        for _, row in data.iterrows():
            self.statistical_detector.detect_anomaly(
                metric_name, row['value'], row['timestamp']
            )
        self.pattern_detector.learn_pattern(metric_name, data)
        self.ml_detector.train_model(metric_name, data)
        self.baseline_models[metric_name] = BaselineModel(
            metric_name=metric_name,
            model_type="ensemble",
            model_params={
                "statistical": self.statistical_detector.std_threshold,
                "pattern_window": self.pattern_detector.pattern_window
            },
            training_data_range=(data['timestamp'].min(), data['timestamp'].max()),
            last_updated=datetime.now(),
            performance_metrics={"accuracy": 0.95}  # placeholder, not measured
        )
        logger.info(f"Baseline model trained for {metric_name}")

    def _record_data_point(self, metric_data: MetricData):
        """Append a raw point to its metric history, keeping at most ``max_history`` entries."""
        history = self.metric_history.setdefault(metric_data.metric_name, [])
        history.append(metric_data)
        if len(history) > self.max_history:
            del history[:-self.max_history]

    def process_metric(
        self,
        metric_data: MetricData
    ) -> List[Anomaly]:
        """Run the per-point detectors on one data point.

        The point is recorded in ``metric_history``. The Z-score, trend and
        hour/weekday pattern checks are run, and any anomalies found are
        appended to ``anomaly_history``.

        Returns:
            The anomalies detected for this point (possibly empty).
        """
        self._record_data_point(metric_data)
        name = metric_data.metric_name
        value = metric_data.value
        timestamp = metric_data.timestamp
        anomalies: List[Anomaly] = []

        # Point anomaly: Z-score against the recent window.
        is_anomaly, z_score, expected_value = self.statistical_detector.detect_anomaly(
            name, value, timestamp
        )
        if is_anomaly:
            anomalies.append(Anomaly(
                metric_name=name,
                anomaly_type=AnomalyType.POINT,
                severity=SeverityLevel.WARNING if z_score < 4 else SeverityLevel.CRITICAL,
                value=value,
                expected_value=expected_value,
                anomaly_score=z_score,
                timestamp=timestamp,
                description=f"值偏离期望值 {z_score:.2f} 个标准差"
            ))

        # Collective anomaly: deviation from the recent linear trend. The trend
        # detector does not report its prediction, so the Z-score window mean
        # is reused as the expected value.
        is_trend_anomaly, trend_score = self.statistical_detector.detect_trend_anomaly(
            name, value, timestamp
        )
        if is_trend_anomaly:
            anomalies.append(Anomaly(
                metric_name=name,
                anomaly_type=AnomalyType.COLLECTIVE,
                severity=SeverityLevel.WARNING,
                value=value,
                expected_value=expected_value,
                anomaly_score=trend_score,
                timestamp=timestamp,
                description=f"趋势异常,偏离预测 {trend_score:.2%}"
            ))

        # Contextual anomaly: deviation from the learned hour/weekday profile.
        is_pattern_anomaly, pattern_score, pattern_desc = self.pattern_detector.detect_pattern_anomaly(
            name, value, timestamp
        )
        if is_pattern_anomaly:
            anomalies.append(Anomaly(
                metric_name=name,
                anomaly_type=AnomalyType.CONTEXTUAL,
                severity=SeverityLevel.WARNING,
                value=value,
                expected_value=0.0,  # the pattern detector does not return its expected value
                anomaly_score=pattern_score,
                timestamp=timestamp,
                description=pattern_desc
            ))

        self.anomaly_history.extend(anomalies)
        return anomalies

    def evaluate_and_alert(
        self,
        metric_data: MetricData
    ) -> Tuple[List[Anomaly], List[Alert]]:
        """Detect anomalies, attach root causes, then evaluate alert rules.

        Root-cause analysis runs before rule evaluation so that alerts
        referencing an anomaly already carry its ``root_cause``.
        """
        anomalies = self.process_metric(metric_data)
        if anomalies:
            # Includes anomalies recorded for every metric (and the ones just detected).
            recent_anomalies = list(self.anomaly_history)
            for anomaly in anomalies:
                anomaly.root_cause = self.root_cause_analyzer.analyze_root_cause(
                    anomaly,
                    recent_anomalies,
                    self.metric_history
                )
        alerts = self.rule_engine.evaluate_rules(
            metric_data.metric_name,
            metric_data.value,
            metric_data.timestamp,
            anomalies
        )
        return anomalies, alerts

    def add_alert_rule(self, rule: AlertRule):
        """Register an alert rule with the rule engine."""
        self.rule_engine.add_rule(rule)

    def get_anomaly_statistics(
        self,
        metric_name: str,
        time_range: Tuple[datetime, datetime]
    ) -> Dict[str, Any]:
        """Summarise data points and anomalies of one metric in an inclusive time range.

        ``anomaly_count`` counts anomaly records; one point can produce several
        (e.g. point and pattern). ``anomaly_rate`` is the fraction of data
        points with at least one anomaly.
        """
        start, end = time_range
        points = [
            data for data in self.metric_history.get(metric_name, [])
            if start <= data.timestamp <= end
        ]
        anomalies = [
            a for a in self.anomaly_history
            if a.metric_name == metric_name and start <= a.timestamp <= end
        ]
        anomalous_points = {a.timestamp for a in anomalies}
        return {
            "total_data_points": len(points),
            "anomaly_count": len(anomalies),
            "anomaly_rate": len(anomalous_points) / len(points) if points else 0,
            "severity_distribution": {
                severity.value: sum(1 for a in anomalies if a.severity == severity)
                for severity in SeverityLevel
            },
            "type_distribution": {
                anomaly_type.value: sum(1 for a in anomalies if a.anomaly_type == anomaly_type)
                for anomaly_type in AnomalyType
            }
        }
# Usage example
if __name__ == "__main__":
    import random

    # Fixed seed so repeated runs of the demo produce the same output.
    random.seed(42)

    # Create the anomaly detection agent
    agent = AnomalyDetectionAgent()

    def generate_training_data(days: int = 7) -> pd.DataFrame:
        """Generate one synthetic point per minute with a day/night pattern."""
        base_time = datetime.now() - timedelta(days=days)
        data = []
        for i in range(days * 24 * 60):  # one data point per minute
            timestamp = base_time + timedelta(minutes=i)
            hour = timestamp.hour
            # Daily pattern: high during working hours, low at night
            if 9 <= hour <= 18:
                base_value = 100 + random.gauss(0, 10)
            else:
                base_value = 50 + random.gauss(0, 5)
            # Add random noise
            value = base_value + random.gauss(0, 8)
            data.append({
                'timestamp': timestamp,
                'value': max(0, value)  # keep values non-negative
            })
        return pd.DataFrame(data)

    # Train the baseline model
    print("=== 训练基线模型 ===")
    training_data = generate_training_data(7)
    agent.train_baseline("cpu_usage", training_data)

    # Add an alert rule
    agent.add_alert_rule(AlertRule(
        rule_id="cpu_high",
        metric_name="cpu_usage",
        condition=">",
        threshold=150.0,
        severity=SeverityLevel.CRITICAL
    ))

    # Simulate a real-time data stream
    print("\n=== 模拟实时检测 ===")
    base_time = datetime.now()
    num_points = 20
    step_seconds = 10
    for i in range(num_points):
        timestamp = base_time + timedelta(seconds=i * step_seconds)
        hour = timestamp.hour
        # Normal data
        if 9 <= hour <= 18:
            value = 100 + random.gauss(0, 10)
        else:
            value = 50 + random.gauss(0, 5)
        # Inject anomalous values
        if i == 10:
            value = 200
        elif i == 15:
            value = 180
        metric_data = MetricData(
            metric_name="cpu_usage",
            value=value,
            timestamp=timestamp
        )
        anomalies, alerts = agent.evaluate_and_alert(metric_data)
        print(f"\n时间: {timestamp}, 值: {value:.2f}")
        if anomalies:
            print(f"  检测到 {len(anomalies)} 个异常:")
            for anomaly in anomalies:
                print(f"    - 类型: {anomaly.anomaly_type.value}, "
                      f"严重性: {anomaly.severity.value}, "
                      f"评分: {anomaly.anomaly_score:.2f}")
        if alerts:
            print(f"  生成 {len(alerts)} 个告警:")
            for alert in alerts:
                print(f"    - {alert.message}")

    # Anomaly statistics
    print("\n=== 异常统计 ===")
    # Query exactly the span covered by the simulated stream. Its timestamps
    # run up to (num_points - 1) * step_seconds into the future, so a window
    # ending at datetime.now() would miss most of them.
    start_time = base_time
    end_time = base_time + timedelta(seconds=(num_points - 1) * step_seconds)
    # Named anomaly_stats so the module-level `scipy.stats` import is not shadowed.
    anomaly_stats = agent.get_anomaly_statistics("cpu_usage", (start_time, end_time))
    print(f"数据点总数: {anomaly_stats['total_data_points']}")
    print(f"异常数量: {anomaly_stats['anomaly_count']}")
    print(f"异常率: {anomaly_stats['anomaly_rate']:.2%}")
    print("严重性分布:")
    for severity, count in anomaly_stats['severity_distribution'].items():
        if count > 0:
            print(f"  {severity}: {count}")
这个实现展示了异常检测 Agent 的核心功能:
- 多种检测算法:统计方法、模式识别、机器学习、关联分析(注意:`process_metric` 主流程只调用统计与模式检测器,机器学习与关联检测器需要单独调用)
- 智能告警规则:支持多种规则类型和冷却时间
- 根本原因分析:基于指标相关性构建的(近似)因果图进行异常溯源
- 基线模型训练:自动学习正常数据模式
增强的时间序列异常检测
对于更复杂的时间序列数据,可以集成专业的时间序列异常检测算法:
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima.model import ARIMA
class TimeSeriesAnomalyDetector:
    """Time-series detector based on seasonal decomposition and ARIMA.

    Per-metric artefacts from :meth:`decompose_series` and
    :meth:`train_arima_model` are stored side by side in
    ``self.models[metric_name]``, so training one does not discard the other.
    """

    def __init__(self, seasonal_period: int = 24):
        """
        Args:
            seasonal_period: Length of one seasonal cycle, in samples.
        """
        self.seasonal_period = seasonal_period
        self.models: Dict[str, Dict[str, Any]] = {}

    def decompose_series(
        self,
        metric_name: str,
        data: pd.DataFrame
    ) -> Dict[str, Any]:
        """Additively decompose a metric into trend, seasonal and residual parts.

        Args:
            metric_name: Metric the decomposition is stored under.
            data: Frame with ``timestamp`` and ``value`` columns, at least two
                seasonal periods long.

        Raises:
            ValueError: If there are fewer than ``2 * seasonal_period`` rows.
        """
        if len(data) < self.seasonal_period * 2:
            raise ValueError("数据不足以进行季节性分解")
        ts_data = data.set_index('timestamp')['value']
        decomposition = seasonal_decompose(
            ts_data,
            period=self.seasonal_period,
            model='additive'
        )
        # Update rather than replace, so an ARIMA model of the same metric survives.
        self.models.setdefault(metric_name, {}).update({
            'trend': decomposition.trend,
            'seasonal': decomposition.seasonal,
            'residual': decomposition.resid,
            'decomposition': decomposition,
            # Period used here; needed to map new timestamps to a seasonal phase.
            'period': self.seasonal_period,
        })
        return {
            'trend': decomposition.trend,
            'seasonal': decomposition.seasonal,
            'residual': decomposition.resid
        }

    def _expected_level(self, model: Dict[str, Any], timestamp: datetime) -> Optional[float]:
        """Return trend + seasonal component expected at ``timestamp``, or None.

        Assumes the decomposed series is evenly spaced: the spacing of its first
        two index entries maps ``timestamp`` to a sample position. The seasonal
        phase is ``position % period``. Outside the range covered by the
        (centred, edge-NaN) trend, the nearest known trend level is used.
        """
        trend = model.get('trend')
        seasonal = model.get('seasonal')
        if trend is None or seasonal is None or len(seasonal) < 2:
            return None
        index = seasonal.index
        try:
            period = int(model.get('period', self.seasonal_period))
            step = index[1] - index[0]
            position = int(round(float((timestamp - index[0]) / step)))
            seasonal_value = float(seasonal.iloc[position % period])
        except (TypeError, ValueError, ZeroDivisionError, OverflowError):
            return None
        if 0 <= position < len(trend) and pd.notna(trend.iloc[position]):
            trend_value = float(trend.iloc[position])
        else:
            valid_trend = trend.dropna()
            if valid_trend.empty:
                return None
            trend_value = float(valid_trend.iloc[0] if position < len(trend) / 2 else valid_trend.iloc[-1])
        return trend_value + seasonal_value

    def detect_residual_anomaly(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime,
        std_threshold: float = 3.0
    ) -> Tuple[bool, float, float]:
        """Flag ``value`` whose residual from trend + seasonality is too large.

        The expected value is trend + seasonal component at ``timestamp`` plus
        the mean residual. The score is the distance to it in residual
        standard deviations. Without a decomposition, with fewer than 10
        residuals, or with zero residual spread, no detection is done.

        Returns:
            ``(is_anomaly, score, expected_value)``; ``expected_value`` is
            ``value`` when no detection was performed.
        """
        model = self.models.get(metric_name)
        if not model or 'residual' not in model:
            return False, 0.0, value
        residuals = model['residual'].dropna()
        if len(residuals) < 10:
            return False, 0.0, value
        residual_mean = float(residuals.mean())
        residual_std = float(residuals.std())
        if not residual_std > 0:
            return False, 0.0, value
        level = self._expected_level(model, timestamp)
        if level is None:
            return False, 0.0, value
        expected_value = level + residual_mean
        residual_anomaly = abs(value - expected_value) / residual_std
        return residual_anomaly > std_threshold, residual_anomaly, expected_value

    def train_arima_model(
        self,
        metric_name: str,
        data: pd.DataFrame,
        order: Tuple[int, int, int] = (1, 1, 1)
    ):
        """Fit an ARIMA model and store it (with an ADF stationarity test result)."""
        ts_data = data.set_index('timestamp')['value']
        # ADF stationarity test; stored for inspection only.
        adf_result = adfuller(ts_data.dropna())
        fitted_model = ARIMA(ts_data, order=order).fit()
        # Update rather than replace, so a decomposition of the same metric survives.
        self.models.setdefault(metric_name, {}).update({
            'arima_model': fitted_model,
            'order': order,
            'adf_result': adf_result
        })
        logger.info(f"ARIMA model trained for {metric_name}")

    def forecast_with_arima(
        self,
        metric_name: str,
        steps: int = 1
    ) -> Tuple[float, float]:
        """Forecast ``steps`` periods ahead.

        Returns:
            ``(forecast_at_step_steps, residual_std)``.

        Raises:
            ValueError: If no ARIMA model was trained for ``metric_name``.
        """
        model_info = self.models.get(metric_name)
        if not model_info or 'arima_model' not in model_info:
            raise ValueError(f"ARIMA model not found for {metric_name}")
        model = model_info['arima_model']
        # forecast() returns a Series for pandas input and an ndarray otherwise.
        forecast = np.asarray(model.forecast(steps=steps), dtype=float)
        residual_std = float(np.nanstd(np.asarray(model.resid, dtype=float), ddof=1))
        return float(forecast[-1]), residual_std
多指标联合异常检测
对于复杂系统,需要检测多个指标之间的联合异常:
class MultiMetricAnomalyDetector:
    """Joint anomaly detection across several metrics sampled together.

    The set of tracked metrics is fixed by the first :meth:`add_metrics` call.
    The correlation matrix is indexed by that list, and later unknown metrics
    are ignored. A strongly correlated pair (``|corr| > 0.7``) is anomalous
    when both metrics move but in directions that contradict the sign of the
    correlation.
    """

    def __init__(self, window_size: int = 100):
        """
        Args:
            window_size: Maximum number of recent points kept per metric.
        """
        self.window_size = window_size
        self.metric_windows: Dict[str, deque] = {}
        self.correlation_matrix: Optional[np.ndarray] = None
        self.metric_names: List[str] = []

    def add_metrics(self, metric_data: Dict[str, float], timestamp: datetime):
        """Append one multi-metric sample and refresh the correlation matrix."""
        if not self.metric_names:
            self.metric_names = list(metric_data.keys())
            self.metric_windows = {name: deque(maxlen=self.window_size) for name in self.metric_names}
        for name, value in metric_data.items():
            window = self.metric_windows.get(name)
            if window is None:
                # Not part of the tracked set; the matrix layout is fixed.
                continue
            window.append((value, timestamp))
        self._update_correlation_matrix()

    def _update_correlation_matrix(self):
        """Recompute correlations over the common tail of all windows (needs >= 10 points)."""
        if not self.metric_windows:
            return
        min_length = min(len(window) for window in self.metric_windows.values())
        if min_length < 10:
            return
        data_matrix = np.array([
            [v for v, _ in list(self.metric_windows[name])[-min_length:]]
            for name in self.metric_names
        ])
        # np.corrcoef treats each row as one variable.
        self.correlation_matrix = np.corrcoef(data_matrix)

    def detect_joint_anomaly(
        self,
        metric_data: Dict[str, float],
        timestamp: datetime
    ) -> Tuple[bool, Dict[str, Any]]:
        """Check whether the change from the last stored sample breaks any strong correlation.

        Call this *before* :meth:`add_metrics` for the same sample; otherwise
        the "previous" value is the sample itself. Pairs with a metric missing
        from ``metric_data``, or with a zero change on either side, are skipped.
        """
        if self.correlation_matrix is None:
            return False, {"reason": "Insufficient data for correlation analysis"}
        anomalies = []
        for i, metric1 in enumerate(self.metric_names):
            for j in range(i + 1, len(self.metric_names)):
                metric2 = self.metric_names[j]
                correlation = self.correlation_matrix[i, j]
                if not abs(correlation) > 0.7:  # weak or undefined (NaN) correlation
                    continue
                if metric1 not in metric_data or metric2 not in metric_data:
                    continue
                window1 = self.metric_windows[metric1]
                window2 = self.metric_windows[metric2]
                if len(window1) < 2 or len(window2) < 2:
                    continue
                change1 = metric_data[metric1] - window1[-1][0]
                change2 = metric_data[metric2] - window2[-1][0]
                if change1 == 0 or change2 == 0:
                    # No movement on one side carries no directional evidence.
                    continue
                if correlation > 0:
                    consistent = change1 * change2 > 0
                else:
                    consistent = change1 * change2 < 0
                if not consistent:
                    anomalies.append({
                        "metric1": metric1,
                        "metric2": metric2,
                        "correlation": correlation,
                        "change1": change1,
                        "change2": change2,
                        "description": f"{metric1} 和 {metric2} 变化方向不符合相关性"
                    })
        if anomalies:
            return True, {
                "type": "correlation_anomaly",
                "anomalies": anomalies
            }
        return False, {"reason": "No joint anomaly detected"}
最佳实践与常见陷阱
生产环境最佳实践
1. 分级告警策略
不是所有异常都需要立即告警,应该根据严重程度采取不同的处理策略:
class TieredAlertingSystem:
    """Route each alert to the handlers registered for its severity.

    The ``default_*_handler`` methods implement a suggested policy per tier.
    They are not registered automatically; call
    :meth:`register_default_handlers` to wire them up. The integration hooks
    they call (notification, ticketing, remediation, dashboard, storage) are
    placeholders that only log or queue the alert. Override them in a
    subclass to connect real systems.
    """

    def __init__(self):
        self.alert_handlers: Dict[SeverityLevel, List[Callable]] = {
            level: [] for level in SeverityLevel
        }
        # Alerts queued by the warning tier for a later batched notification;
        # callers are expected to drain this list.
        self.batched_alerts: List[Alert] = []
        # Alerts kept by the info tier for offline analysis; callers are
        # expected to drain this list.
        self.stored_alerts: List[Alert] = []

    def register_handler(
        self,
        severity: SeverityLevel,
        handler: Callable[[Alert], None]
    ):
        """Register ``handler`` for alerts of ``severity``."""
        self.alert_handlers[severity].append(handler)

    def register_default_handlers(self):
        """Register the built-in default handler of every tier."""
        self.register_handler(SeverityLevel.CRITICAL, self.default_critical_handler)
        self.register_handler(SeverityLevel.WARNING, self.default_warning_handler)
        self.register_handler(SeverityLevel.INFO, self.default_info_handler)

    def handle_alert(self, alert: Alert):
        """Run every handler for the alert's severity; one failure does not stop the rest."""
        for handler in self.alert_handlers.get(alert.severity, []):
            try:
                handler(alert)
            except Exception as e:
                logger.error("Alert handler failed: %s", e, exc_info=True)

    def default_critical_handler(self, alert: Alert):
        """Critical tier: notify immediately, open a ticket, trigger remediation."""
        self.send_immediate_notification(alert)
        self.create_ticket(alert)
        self.trigger_auto_remediation(alert)

    def default_warning_handler(self, alert: Alert):
        """Warning tier: queue for batched notification and update the dashboard."""
        self.add_to_batch_notification(alert)
        self.update_dashboard(alert)

    def default_info_handler(self, alert: Alert):
        """Info tier: log and keep for later analysis."""
        logger.info(f"Info alert: {alert.message}")
        self.store_for_analysis(alert)

    # --- Integration hooks (placeholders; override in a subclass) ---

    def send_immediate_notification(self, alert: Alert):
        """Hook: page on-call immediately. Placeholder: logs a warning."""
        logger.warning("Immediate notification for alert %s: %s", alert.alert_id, alert.message)

    def create_ticket(self, alert: Alert):
        """Hook: open an issue-tracker ticket. Placeholder: logs only."""
        logger.info("Ticket requested for alert %s", alert.alert_id)

    def trigger_auto_remediation(self, alert: Alert):
        """Hook: start automatic remediation. Placeholder: logs only."""
        logger.info("Auto-remediation requested for alert %s", alert.alert_id)

    def add_to_batch_notification(self, alert: Alert):
        """Hook: queue the alert for a batched notification."""
        self.batched_alerts.append(alert)

    def update_dashboard(self, alert: Alert):
        """Hook: push the alert to a monitoring dashboard. Placeholder: logs only."""
        logger.info("Dashboard update for alert %s", alert.alert_id)

    def store_for_analysis(self, alert: Alert):
        """Hook: keep the alert for offline analysis."""
        self.stored_alerts.append(alert)
2. 告警聚合与去重
避免告警风暴,对相关告警进行聚合:
from collections import defaultdict
import hashlib
class AlertAggregator:
    """Aggregate repeated alerts of the same rule and metric within a time bucket.

    Alerts are grouped by ``(rule_id, metric_name, timestamp // window)``.
    Buckets are aligned to multiples of ``aggregation_window`` seconds since
    the epoch, so two alerts close together can still fall into adjacent
    buckets.
    """

    def __init__(self, aggregation_window: int = 300):
        """
        Args:
            aggregation_window: Bucket length in seconds (default 5 minutes).
        """
        self.aggregation_window = aggregation_window
        self.alert_groups: Dict[str, List[Alert]] = defaultdict(list)

    def aggregate_alert(self, alert: Alert) -> Optional[Alert]:
        """Add ``alert`` to its group.

        Returns:
            The alert itself if it is the first of its group; otherwise a new
            aggregated alert summarising the whole group so far.
        """
        group_alerts = self.alert_groups[self._generate_group_key(alert)]
        group_alerts.append(alert)
        if len(group_alerts) > 1:
            return self._create_aggregated_alert(group_alerts)
        return alert

    def _generate_group_key(self, alert: Alert) -> str:
        """Hash of rule id, metric name and time bucket."""
        time_bucket = int(alert.timestamp.timestamp() / self.aggregation_window)
        key_str = f"{alert.rule_id}_{alert.metric_name}_{time_bucket}"
        return hashlib.md5(key_str.encode()).hexdigest()

    def _create_aggregated_alert(self, alerts: List[Alert]) -> Alert:
        """Summarise ``alerts`` into one alert carrying the highest severity.

        Raises:
            ValueError: If ``alerts`` is empty.
        """
        if not alerts:
            raise ValueError("No alerts to aggregate")
        sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
        first_alert = sorted_alerts[0]
        last_alert = sorted_alerts[-1]
        # SeverityLevel members are not orderable, so rank them explicitly.
        severity_rank = {
            SeverityLevel.INFO: 0,
            SeverityLevel.WARNING: 1,
            SeverityLevel.CRITICAL: 2,
        }
        max_severity = max(
            (a.severity for a in alerts),
            key=lambda severity: severity_rank.get(severity, -1)
        )
        count = len(alerts)
        duration = (last_alert.timestamp - first_alert.timestamp).total_seconds()
        message = (
            f"聚合告警: {first_alert.rule_id} 在过去 {duration:.0f} 秒内 "
            f"触发了 {count} 次。"
            f"首次: {first_alert.timestamp}, 最新: {last_alert.timestamp}"
        )
        aggregated = Alert(
            alert_id=f"aggregated_{first_alert.alert_id}",
            rule_id=first_alert.rule_id,
            metric_name=first_alert.metric_name,
            severity=max_severity,
            message=message,
            timestamp=last_alert.timestamp
        )
        # Attached after construction so this works whether or not Alert
        # declares a ``related_alerts`` field.
        aggregated.related_alerts = list(sorted_alerts)
        return aggregated

    def cleanup_old_groups(self, now: Optional[datetime] = None):
        """Drop groups whose newest alert is older than two aggregation windows.

        Args:
            now: Reference time (defaults to ``datetime.now()``); pass the data
                clock when replaying historical alerts.
        """
        reference = datetime.now() if now is None else now
        expired_keys = [
            key for key, alerts in self.alert_groups.items()
            if alerts and (reference - max(a.timestamp for a in alerts)).total_seconds()
            > self.aggregation_window * 2
        ]
        for key in expired_keys:
            del self.alert_groups[key]
3. 自适应阈值调整
根据历史数据自动调整告警阈值,减少误报:
class AdaptiveThresholdManager:
    """Derive alert thresholds from historical percentiles and track their drift."""

    def __init__(self, learning_window: int = 7):
        """
        Args:
            learning_window: Learning window in days. Stored but not used by
                the methods of this class.
        """
        self.learning_window = learning_window
        self.threshold_history: Dict[str, List[Dict[str, Any]]] = {}

    def calculate_adaptive_threshold(
        self,
        metric_name: str,
        historical_data: pd.DataFrame,
        percentile: float = 95.0,
        reference_time: Optional[datetime] = None
    ) -> float:
        """Compute a percentile-based threshold, blended with the hour-of-day percentile.

        When ``historical_data`` has a ``timestamp`` column with more than 10
        rows for the hour of ``reference_time``, the result is the mean of the
        global and the hourly percentile. The last 100 thresholds per metric
        are kept in ``threshold_history``.

        Args:
            metric_name: Metric whose history is recorded.
            historical_data: Frame with a ``value`` column (and optional
                datetime ``timestamp`` column).
            percentile: Percentile in [0, 100].
            reference_time: Time whose hour selects the hourly subset
                (defaults to ``datetime.now()``).

        Raises:
            ValueError: If ``historical_data`` has no values.
        """
        values = np.asarray(historical_data['value'], dtype=float)
        if values.size == 0:
            raise ValueError(f"No historical data to compute a threshold for {metric_name}")
        now = datetime.now() if reference_time is None else reference_time
        threshold = float(np.percentile(values, percentile))
        if 'timestamp' in historical_data.columns:
            hourly_values = historical_data.loc[
                historical_data['timestamp'].dt.hour == now.hour, 'value'
            ]
            if len(hourly_values) > 10:
                hourly_threshold = float(np.percentile(hourly_values.values, percentile))
                # Blend global and hourly thresholds
                threshold = (threshold + hourly_threshold) / 2
        history = self.threshold_history.setdefault(metric_name, [])
        history.append({
            'threshold': threshold,
            'timestamp': now,
            'percentile': percentile
        })
        if len(history) > 100:
            del history[:-100]
        return threshold

    def get_threshold_trend(self, metric_name: str) -> Dict[str, Any]:
        """Compare the mean of the last 5 thresholds with the (up to) 5 before them.

        Returns ``{"trend": "insufficient_data"}`` until at least 6 thresholds
        exist, so the older group is never empty.
        """
        history = self.threshold_history.get(metric_name, [])
        if len(history) < 6:
            return {"trend": "insufficient_data"}
        recent_avg = float(np.mean([h['threshold'] for h in history[-5:]]))
        older_avg = float(np.mean([h['threshold'] for h in history[-10:-5]]))
        change_percent = ((recent_avg - older_avg) / older_avg * 100) if older_avg != 0 else 0
        if change_percent > 5:
            trend = "increasing"
        elif change_percent < -5:
            trend = "decreasing"
        else:
            trend = "stable"
        return {
            "trend": trend,
            "change_percent": change_percent,
            "recent_avg": recent_avg,
            "older_avg": older_avg
        }
常见陷阱与解决方案
陷阱1: 误报率过高
问题:异常检测产生大量误报,导致告警疲劳。
解决方案:
- 调整检测算法的敏感度参数
- 实施告警聚合和去重
- 增加确认机制(需要连续多次异常才告警)
class ConfirmationBasedDetector:
    """Confirm an anomaly only after several closely spaced repeats.

    Anomalies are bucketed per ``(metric_name, anomaly_type)``. An anomaly is
    confirmed when the last ``confirmation_count`` entries of its bucket are
    each less than ``max_gap_seconds`` apart. Pending entries expire
    ``pending_ttl_seconds`` after the newest pending entry; the clock is the
    anomalies' own timestamps, so replayed historical data behaves the same
    as live data.
    """

    def __init__(
        self,
        confirmation_count: int = 3,
        max_gap_seconds: float = 60,
        pending_ttl_seconds: float = 300
    ):
        """
        Args:
            confirmation_count: Consecutive anomalies required for confirmation.
            max_gap_seconds: Maximum gap between consecutive anomalies.
            pending_ttl_seconds: Age after which unconfirmed anomalies are dropped.
        """
        self.confirmation_count = confirmation_count
        self.max_gap_seconds = max_gap_seconds
        self.pending_ttl_seconds = pending_ttl_seconds
        self.pending_alerts: Dict[str, List[Anomaly]] = defaultdict(list)

    def confirm_anomaly(self, anomaly: Anomaly) -> bool:
        """Record ``anomaly``; return True (and reset its bucket) once it is confirmed."""
        key = f"{anomaly.metric_name}_{anomaly.anomaly_type}"
        pending = self.pending_alerts[key]
        pending.append(anomaly)
        if len(pending) >= self.confirmation_count:
            recent = pending[-self.confirmation_count:]
            is_continuous = all(
                (later.timestamp - earlier.timestamp).total_seconds() < self.max_gap_seconds
                for earlier, later in zip(recent, recent[1:])
            )
            if is_continuous:
                # Confirmed: start a fresh bucket
                self.pending_alerts[key] = []
                return True
        self._cleanup_pending_alerts(key)
        return False

    def _cleanup_pending_alerts(self, key: str):
        """Drop pending anomalies older than the TTL relative to the newest one in the bucket."""
        pending = self.pending_alerts.get(key)
        if not pending:
            return
        newest = max(a.timestamp for a in pending)
        self.pending_alerts[key] = [
            a for a in pending
            if (newest - a.timestamp).total_seconds() < self.pending_ttl_seconds
        ]
陷阱2: 漏报关键异常
问题:检测算法对某些类型的异常不敏感,导致漏报。
解决方案:
- 使用多种检测算法的集成
- 基于业务规则的补充检测
- 人工反馈循环,持续优化模型
class EnsembleAnomalyDetector:
    """Weighted vote over several detector callables.

    Each detector is called as ``detector(metric_name, value, timestamp)`` and
    must return a tuple whose first two items are ``(is_anomaly, score)``.
    Extra items, such as the expected value returned by
    ``StatisticalDetector.detect_anomaly``, are ignored. Detectors that raise
    are logged and excluded from the vote. Scores are not normalised, so
    detectors should report comparable scales.
    """

    def __init__(self, decision_threshold: float = 0.5):
        """
        Args:
            decision_threshold: Ensemble score above which the point is anomalous.
        """
        self.detectors: List[Callable] = []
        self.weights: List[float] = []
        self.decision_threshold = decision_threshold

    def add_detector(self, detector: Callable, weight: float = 1.0):
        """Add a detector callable with its vote weight."""
        self.detectors.append(detector)
        self.weights.append(weight)

    @staticmethod
    def _detector_name(detector: Callable) -> str:
        """Readable name: qualified name for functions/methods, else the class name."""
        return getattr(detector, "__qualname__", type(detector).__name__)

    def detect_ensemble(
        self,
        metric_name: str,
        value: float,
        timestamp: datetime
    ) -> Tuple[bool, float, Dict[str, Any]]:
        """Run all detectors and combine them by weighted score.

        The ensemble score is ``sum(weight * score for anomalous votes) /
        sum(weight of detectors that ran)``, or 0.0 when none ran.

        Returns:
            ``(is_anomaly, ensemble_score, details)``.
        """
        results = []
        for detector, weight in zip(self.detectors, self.weights):
            name = self._detector_name(detector)
            try:
                outcome = detector(metric_name, value, timestamp)
                is_anomaly, score = outcome[0], outcome[1]
            except Exception as e:
                logger.error(f"Detector {name} failed: {str(e)}")
                continue
            results.append({
                'detector': name,
                'is_anomaly': bool(is_anomaly),
                'score': float(score),
                'weight': weight
            })
        # Only detectors that actually produced a result carry weight.
        total_weight = sum(r['weight'] for r in results)
        weighted_sum = sum(r['weight'] * r['score'] for r in results if r['is_anomaly'])
        ensemble_score = float(weighted_sum / total_weight) if total_weight > 0 else 0.0
        is_anomaly = ensemble_score > self.decision_threshold
        return is_anomaly, ensemble_score, {
            'individual_results': results,
            'ensemble_score': ensemble_score
        }
陷阱3: 基线模型过时
问题:系统行为变化,旧的基线模型不再适用。
解决方案:
- 定期重新训练基线模型
- 实施模型性能监控
- 支持多版本基线模型并存
class BaselineModelManager:
    """Keep the most recent baseline-model versions per metric and select among them.

    Args:
        max_versions: Number of model versions retained per metric. Defaults
            to 3, the previously hard-coded value. Must be >= 1.
    """

    def __init__(self, max_versions: int = 3):
        if max_versions < 1:
            raise ValueError(f"max_versions must be >= 1, got {max_versions}")
        self.max_versions = max_versions
        self.models: Dict[str, List[BaselineModel]] = defaultdict(list)
        self.model_performance: Dict[str, Dict[str, Any]] = {}

    def add_model(self, model: BaselineModel):
        """Append ``model`` as the newest version for ``model.metric_name``.

        The oldest versions beyond ``max_versions`` are dropped.
        """
        versions = self.models[model.metric_name]
        versions.append(model)
        if len(versions) > self.max_versions:
            self.models[model.metric_name] = versions[-self.max_versions:]

    def get_best_model(self, metric_name: str) -> Optional[BaselineModel]:
        """Return the version with the highest ``performance_metrics['accuracy']``.

        Missing accuracy counts as 0.0. Ties go to the NEWEST version.
        Returns None when no model is registered for ``metric_name``.
        """
        models = self.models.get(metric_name)
        if not models:
            return None
        # max() keeps the first maximal element it sees, so iterate newest-first.
        # Otherwise a freshly retrained model without recorded accuracy would
        # always lose to the oldest one.
        return max(
            reversed(models),
            key=lambda m: m.performance_metrics.get('accuracy', 0.0)
        )

    def evaluate_model_performance(
        self,
        metric_name: str,
        recent_data: pd.DataFrame
    ) -> Dict[str, Any]:
        """Compute MAE/RMSE/accuracy on ``recent_data['value']`` and cache the result.

        Returns {} when no model is registered or ``recent_data`` has no rows.
        Otherwise returns a dict with ``mae``, ``rmse``, ``accuracy`` and
        ``evaluated_at``. The same dict is stored in ``self.model_performance``.

        Accuracy is defined as ``1 - MAE / mean(|actual|)``, or 0.0 when that
        mean is not positive.
        """
        model = self.get_best_model(metric_name)
        if model is None or len(recent_data) == 0:
            return {}
        # NOTE(review): simplified placeholder. The "prediction" is the window
        # mean, so ``model`` itself is never consulted and the score only
        # reflects the data's spread. Replace with the model's real predictions.
        predicted = np.full(len(recent_data), recent_data['value'].mean())
        actual = recent_data['value'].values
        mae = np.mean(np.abs(actual - predicted))
        rmse = np.sqrt(np.mean((actual - predicted) ** 2))
        scale = np.mean(np.abs(actual))
        accuracy = 1.0 - (mae / scale) if scale > 0 else 0.0
        performance = {
            'mae': mae,
            'rmse': rmse,
            'accuracy': accuracy,
            'evaluated_at': datetime.now()
        }
        self.model_performance[metric_name] = performance
        return performance

    def should_retrain(self, metric_name: str, threshold: float = 0.8) -> bool:
        """Return True when the last evaluated accuracy is below ``threshold``.

        A metric that was never evaluated counts as accuracy 0.0, so it
        reports True.
        """
        performance = self.model_performance.get(metric_name, {})
        accuracy = performance.get('accuracy', 0.0)
        return accuracy < threshold
性能优化考虑
实时检测性能优化
1. 增量计算
对于流式数据,采用增量计算减少计算开销:
class IncrementalStatistics:
    """Sliding-window mean and population standard deviation in O(1) per update.

    A sliding-window Welford update is used instead of ``sum_sq/n - mean**2``.
    The latter suffers catastrophic cancellation when values are large
    relative to their spread. For example, values around 1e9 with unit
    spread collapse to std 0.

    ``sum`` and ``sum_sq`` are still maintained for callers that read them.
    They are no longer used to compute the variance.
    """

    def __init__(self, window_size: int):
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        self.window_size = window_size
        self.sum = 0.0
        self.sum_sq = 0.0
        self.count = 0
        self.values: deque = deque(maxlen=window_size)
        self._mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the current window mean

    def update(self, value: float) -> Tuple[float, float, int]:
        """Add ``value`` to the window, evicting the oldest value when the window is full.

        Returns:
            ``(mean, std, count)`` of the current window. ``std`` is the
            population standard deviation (divided by ``count``).
        """
        if len(self.values) == self.window_size:
            # Window full: replace the oldest value with the new one.
            # The window size stays constant.
            old_value = self.values[0]
            old_mean = self._mean
            self._mean = old_mean + (value - old_value) / self.count
            self._m2 += (value - old_value) * (value - self._mean + old_value - old_mean)
            self.sum += value - old_value
            self.sum_sq += value ** 2 - old_value ** 2
        else:
            # Window still growing: standard Welford step.
            self.count += 1
            delta = value - self._mean
            self._mean += delta / self.count
            self._m2 += delta * (value - self._mean)
            self.sum += value
            self.sum_sq += value ** 2
        # deque(maxlen=...) evicts old_value automatically on append.
        self.values.append(value)
        # Clamp tiny negative values caused by floating-point rounding.
        variance = max(0.0, self._m2 / self.count)
        return self._mean, np.sqrt(variance), self.count
2. 批量处理
对高频率数据,采用批量处理策略:
class BatchAnomalyProcessor:
    """Buffer metric points and run a per-metric Z-score check whenever a batch is flushed.

    A batch is flushed when either condition holds:
    - it reaches ``batch_size`` points;
    - a point is added more than ``max_delay`` seconds after the previous flush.

    The timeout is only evaluated inside ``add_metric``. If data can stop
    arriving while points are still buffered, call ``flush_batch`` yourself,
    e.g. from a timer.

    Args:
        batch_size: Points per batch before a forced flush.
        max_delay: Maximum buffering delay in seconds.
        z_threshold: A point is reported when its absolute Z-score exceeds this.
            Defaults to 3.0, the previously hard-coded value.
    """

    def __init__(self, batch_size: int = 100, max_delay: float = 1.0, z_threshold: float = 3.0):
        self.batch_size = batch_size
        self.max_delay = max_delay  # maximum buffering delay, in seconds
        self.z_threshold = z_threshold
        self.current_batch: List[MetricData] = []
        self.last_flush = datetime.now()

    def add_metric(self, metric_data: MetricData) -> Optional[List[Anomaly]]:
        """Buffer one point.

        Returns the batch's anomalies (possibly an empty list) if this point
        triggered a flush, otherwise None.
        """
        self.current_batch.append(metric_data)
        if self._should_flush():
            return self.flush_batch()
        return None

    def _should_flush(self) -> bool:
        """True when the batch is full or more than ``max_delay`` seconds passed since the last flush."""
        size_reached = len(self.current_batch) >= self.batch_size
        timeout_reached = (datetime.now() - self.last_flush).total_seconds() > self.max_delay
        return size_reached or timeout_reached

    def flush_batch(self) -> Optional[List[Anomaly]]:
        """Process and clear the buffered points.

        Returns None when nothing is buffered.
        """
        if not self.current_batch:
            return None
        batch, self.current_batch = self.current_batch, []
        self.last_flush = datetime.now()
        return self._process_batch(batch)

    def _process_batch(self, batch: List[MetricData]) -> List[Anomaly]:
        """Flag points whose absolute Z-score within their metric's group exceeds ``z_threshold``.

        Mean and population std are computed per metric name over this batch
        only, and they include the point being tested.

        By Samuelson's inequality, every |z| is at most sqrt(n - 1) for a
        group of n points. With the default threshold of 3.0, groups of 10 or
        fewer points can therefore never be flagged.

        Groups with zero (or NaN) std are skipped.
        """
        anomalies = []
        grouped: Dict[str, List[MetricData]] = defaultdict(list)
        for metric_data in batch:
            grouped[metric_data.metric_name].append(metric_data)

        for metric_name, metric_list in grouped.items():
            values = np.array([md.value for md in metric_list], dtype=float)
            mean = values.mean()
            std = values.std()
            if not std > 0:
                continue
            z_scores = np.abs((values - mean) / std)
            for metric_data, z_score in zip(metric_list, z_scores):
                if z_score > self.z_threshold:
                    anomalies.append(Anomaly(
                        metric_name=metric_name,
                        anomaly_type=AnomalyType.POINT,
                        severity=SeverityLevel.WARNING,
                        value=metric_data.value,
                        expected_value=mean,
                        anomaly_score=z_score,
                        timestamp=metric_data.timestamp,
                        description=f"批量检测: Z-score = {z_score:.2f}"
                    ))
        return anomalies
资源管理优化
1. 内存管理
class ResourceAwareDetector:
    """Watch this process's resident memory and adapt the per-metric history budget.

    Args:
        max_memory_mb: Memory budget for this process, in MiB.
        min_history_size: Lower bound for ``metric_history_size``. Defaults to
            100, the previously hard-coded value. Must be >= 1.
        max_history_size: Upper bound for ``metric_history_size``. Defaults to
            5000, the previously hard-coded value.
    """

    def __init__(
        self,
        max_memory_mb: int = 512,
        min_history_size: int = 100,
        max_history_size: int = 5000,
    ):
        if not 1 <= min_history_size <= max_history_size:
            raise ValueError(
                f"need 1 <= min_history_size <= max_history_size, "
                f"got {min_history_size}, {max_history_size}"
            )
        self.max_memory_mb = max_memory_mb
        self.min_history_size = min_history_size
        self.max_history_size = max_history_size
        # Start at 1000 points, clamped into the configured bounds.
        self.metric_history_size = min(max(1000, min_history_size), max_history_size)
        self.monitoring_interval = 60  # seconds

    def check_memory_usage(self) -> Tuple[float, str]:
        """Return ``(rss_mb, status)`` for the current process.

        ``status`` takes one of three values:
        - ``"warning"``: RSS exceeds ``max_memory_mb``;
        - ``"alert"``: RSS exceeds 80% of ``max_memory_mb``;
        - ``"normal"``: otherwise.

        Requires the third-party ``psutil`` package. It is imported lazily, so
        an ImportError is raised here if it is missing.
        """
        # NOTE(review): the more severe condition (over budget) returns
        # "warning", while the milder one returns "alert". Confirm this is
        # intended before relying on the labels' ordering.
        import psutil
        import os
        process = psutil.Process(os.getpid())
        memory_mb = process.memory_info().rss / 1024 / 1024
        if memory_mb > self.max_memory_mb:
            return memory_mb, "warning"
        if memory_mb > self.max_memory_mb * 0.8:
            return memory_mb, "alert"
        return memory_mb, "normal"

    def adapt_history_size(self, current_memory: float):
        """Adjust ``metric_history_size`` based on ``current_memory`` (MiB).

        Above 90% of the budget, the size is halved (not below
        ``min_history_size``). Below 50%, it is doubled (not above
        ``max_history_size``). Otherwise it is left unchanged.
        """
        if current_memory > self.max_memory_mb * 0.9:
            self.metric_history_size = max(self.min_history_size, self.metric_history_size // 2)
        elif current_memory < self.max_memory_mb * 0.5:
            self.metric_history_size = min(self.max_history_size, self.metric_history_size * 2)
2. 并行检测
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
class ParallelAnomalyDetector:
    """Fan per-metric detection out to a thread pool.

    Threads only overlap work that waits on I/O or on native code that
    releases the GIL. Pure-Python CPU-bound detection gains little from them.

    The instance can be used as a context manager (``with ... as det:``) so
    the pool is always shut down.
    """

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Kept for compatibility. Results are gathered in the calling thread
        # only, so detection itself does not need it.
        self.lock = threading.Lock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()
        return False

    def detect_anomalies_parallel(
        self,
        metrics: List[MetricData]
    ) -> List[Anomaly]:
        """Run ``_detect_single_metric`` on every point concurrently.

        Returns:
            All anomalies, in the same order as ``metrics``. A point whose
            detection raised is logged and contributes nothing.
        """
        futures = [
            self.executor.submit(self._detect_single_metric, metric_data)
            for metric_data in metrics
        ]
        all_anomalies: List[Anomaly] = []
        # Collect in submission order rather than completion order so the
        # output is deterministic. Total wall time is the same, since every
        # future is awaited anyway.
        for future in futures:
            try:
                anomalies = future.result()
            except Exception as e:
                logger.exception("Parallel detection failed: %s", e)
                continue
            all_anomalies.extend(anomalies)
        return all_anomalies

    def _detect_single_metric(self, metric_data: MetricData) -> List[Anomaly]:
        """Detect anomalies for one point (placeholder: always returns [])."""
        # Plug the actual detection logic in here.
        return []

    def shutdown(self):
        """Shut down the executor, waiting for running tasks to finish."""
        self.executor.shutdown(wait=True)
参考资源
官方文档与框架
- Scikit-learn: https://scikit-learn.org/ - 机器学习库,包含多种异常检测算法
- Statsmodels: https://www.statsmodels.org/ - 统计建模库,用于时间序列分析
- Prometheus: https://prometheus.io/ - 监控系统,提供告警规则引擎
- Grafana: https://grafana.com/ - 可视化平台,支持告警和异常检测
相关技术
- Isolation Forest: 基于隔离森林的异常检测算法
- Autoencoder: 用于异常检测的自编码器神经网络
- Time Series Decomposition: 时间序列分解方法
学术资源
- "Isolation Forest" - Liu, Fei Tony, et al. "Isolation forest." (2008)
- "LSTM-based Encoder-Decoder for Multi-sensor Anomaly Detection" - Malhotra, Pankaj, et al. (2016)
- "Deep Learning for Anomaly Detection: A Review" - Pang, Guansong, et al. ACM Computing Surveys (2021)
社区资源
- Anomaly Detection Learning Resources: https://github.com/yzhao062/anomaly-detection-resources - 异常检测资源集合(书籍、论文、课程与工具库)
- PyOD: https://github.com/yzhao062/pyod - Python 异常检测库
- Azure Anomaly Detector: https://github.com/microsoft/Azure-Anomaly-Detector - 微软的异常检测服务
异常检测 Agent 是智能监控系统的核心,它将传统基于阈值的监控提升到基于模式学习的智能检测。随着技术的不断进步,异常检测将更加准确、自适应和可解释,为系统稳定性提供更可靠的保障。