可观测性平台与自动化运维
构建完整的可观测性平台,实现从指标监控到自动化故障诊断的智能运维体系
在现代复杂的分布式系统中,传统的监控手段已经无法满足运维需求。可观测性平台通过整合指标、日志、链路追踪三大支柱,结合自动化运维技术,实现了从被动响应到主动预测的转变。本文将深入探讨如何构建完整的可观测性平台和自动化运维体系。
可观测性三大支柱
可观测性架构设计
(原文此处为可观测性架构示意图,未能渲染。)
指标收集架构
# Example Prometheus configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  # Labels attached to every series/alert leaving this server.
  # 'us-west-2' identifies the cluster and 'production' the environment,
  # matching the defaults used by the Logstash pipeline in this article.
  external_labels:
    cluster: 'us-west-2'
    environment: 'production'

# Alertmanager endpoints that receive fired alerts
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - 'alertmanager:9093'

# Alerting rule files
rule_files:
  - 'alerts/*.yml'

# Scrape targets
scrape_configs:
  # Prometheus scraping itself
  - job_name: 'prometheus'
    static_configs:
      - targets: ["localhost:9090"]

  # Linux host metrics (node_exporter)
  - job_name: 'node'
    static_configs:
      - targets: ["node-exporter:9100"]
    relabel_configs:
      # Overwrite the instance label with a fixed, human-readable host name.
      - source_labels: [__address__]
        target_label: instance
        replacement: 'linux-server-01'

  # Application metrics
  - job_name: 'webapp'
    static_configs:
      - targets: ["webapp:8080"]
        labels:
          service: 'web-application'
          team: 'backend'
    metrics_path: '/actuator/prometheus'
    # Scraped more often than the global 15s default.
    scrape_interval: 5s

  # Kubernetes pod discovery
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      # Only scrape pods annotated with prometheus.io/scrape: "true".
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      # Honour a custom metrics path from the prometheus.io/path annotation.
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
        replacement: $1
      # Copy every pod label onto the scraped series.
      - action: labelmap
        regex: __meta_kubernetes_pod_label_(.+)
        replacement: $1
自定义Exporter实现
package main
import (
	"encoding/json"
	"fmt"
	"net/http"
	_ "net/http/pprof" // side-effect import: registers /debug/pprof/* on http.DefaultServeMux
	"os"
	"runtime"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Application metrics exported by this process, grouped by domain.

// HTTP / business metrics.
var (
	// httpRequestsTotal counts requests by method, endpoint and status.
	httpRequestsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "app_http_requests_total",
			Help: "Total number of HTTP requests",
		},
		[]string{"method", "endpoint", "status"},
	)

	// httpRequestDuration observes request latency in seconds using the
	// client library's default buckets.
	httpRequestDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "app_http_request_duration_seconds",
			Help:    "HTTP request duration in seconds",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method", "endpoint"},
	)

	// activeConnections is the current number of active connections.
	activeConnections = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "app_active_connections",
			Help: "Current number of active connections",
		},
	)
)

// Process-level metrics.
var (
	// processMemoryUsage reports memory in bytes, split by the "type" label.
	processMemoryUsage = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_process_memory_usage_bytes",
			Help: "Process memory usage in bytes",
		},
		[]string{"type"},
	)

	// processCpuUsage reports the process CPU usage percentage.
	processCpuUsage = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "app_process_cpu_usage_percent",
			Help: "Process CPU usage percentage",
		},
	)
)

// Database metrics.
var (
	// dbConnectionPoolSize tracks pool size per database and connection state.
	dbConnectionPoolSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_db_connection_pool_size",
			Help: "Database connection pool size",
		},
		[]string{"database", "state"},
	)

	// dbQueryDuration observes query latency in seconds with buckets from
	// 1ms to 1s.
	dbQueryDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "app_db_query_duration_seconds",
			Help:    "Database query duration in seconds",
			Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0},
		},
		[]string{"database", "operation"},
	)
)

// Cache metrics.
var (
	// cacheHitRatio is the hit ratio per cache.
	cacheHitRatio = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_cache_hit_ratio",
			Help: "Cache hit ratio",
		},
		[]string{"cache"},
	)
)
// init registers every application metric with the default Prometheus
// registry. MustRegister panics if any collector cannot be registered.
func init() {
	prometheus.MustRegister(
		httpRequestsTotal,
		httpRequestDuration,
		activeConnections,
		processMemoryUsage,
		processCpuUsage,
		dbConnectionPoolSize,
		dbQueryDuration,
		cacheHitRatio,
	)
}
// updateSystemMetrics refreshes the process-level gauges from the Go runtime
// memory statistics.
func updateSystemMetrics() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// All three series are byte counts, as the metric name promises.
	processMemoryUsage.WithLabelValues("heap").Set(float64(m.HeapAlloc))
	processMemoryUsage.WithLabelValues("stack").Set(float64(m.StackInuse))
	// GCSys is the memory held by garbage-collector metadata. The cumulative
	// GC pause time (PauseTotalNs) is a duration and does not belong in a
	// gauge measured in bytes.
	processMemoryUsage.WithLabelValues("gc").Set(float64(m.GCSys))

	// Placeholder: a real implementation should compute CPU usage from
	// process CPU time deltas instead of reporting a constant.
	processCpuUsage.Set(0.0)
}
// simulateHTTPRequest fakes the handling of one "GET /api/users" request: it
// sleeps for 10-59ms and then records the request counter and the observed
// latency.
func simulateHTTPRequest() {
	const (
		method   = "GET"
		endpoint = "/api/users"
		status   = "200"
	)

	began := time.Now()

	// Pseudo-random processing time derived from the clock.
	jitter := time.Now().UnixNano() % 50
	time.Sleep(time.Duration(10+jitter) * time.Millisecond)

	httpRequestsTotal.WithLabelValues(method, endpoint, status).Inc()
	elapsed := time.Since(began).Seconds()
	httpRequestDuration.WithLabelValues(method, endpoint).Observe(elapsed)
}
// simulateDatabaseQuery fakes one MySQL SELECT: it sleeps for 5-24ms, records
// the query latency and publishes simulated connection-pool sizes.
func simulateDatabaseQuery() {
	start := time.Now()

	// Simulated query time.
	time.Sleep(time.Duration(5+time.Now().UnixNano()%20) * time.Millisecond)

	dbQueryDuration.WithLabelValues("mysql", "select").Observe(time.Since(start).Seconds())

	// Simulated pool fluctuation. The modulo is applied to the int64 value
	// before converting, so the result stays in [0,5) even where int is
	// 32 bits wide.
	active := 10 + int(time.Now().UnixNano()%5)
	idle := 5 + int(time.Now().UnixNano()%5)
	dbConnectionPoolSize.WithLabelValues("mysql", "active").Set(float64(active))
	dbConnectionPoolSize.WithLabelValues("mysql", "idle").Set(float64(idle))
}
// simulateCacheOperation publishes a simulated Redis hit ratio that hovers
// around 70% (between 0.65 and 0.74).
func simulateCacheOperation() {
	const (
		cache = "redis"
		total = 100.0
	)

	// 70 hits out of 100, shifted by a clock-derived offset in [-5, 4].
	hits := 70.0 + float64(time.Now().UnixNano()%10-5)
	cacheHitRatio.WithLabelValues(cache).Set(hits / total)
}
// enablePprof exposes the runtime profiling endpoints under /debug/pprof/.
//
// The handlers (index, cmdline, profile, symbol, trace) are installed on
// http.DefaultServeMux by the init function of net/http/pprof, which this
// program imports for its side effects. Registering the same patterns again
// here would be redundant and, depending on the Go version's ServeMux rules,
// can panic with a duplicate-registration error, so this function
// intentionally performs no registration of its own. It is kept so that the
// call site in main documents that profiling is enabled.
func enablePprof() {
}
// healthCheckHandler answers with a small JSON document describing the
// service: a fixed "healthy" status, the current Unix timestamp, the version
// string and the uptime in seconds measured from startTime.
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
	payload := make(map[string]interface{}, 4)
	payload["status"] = "healthy"
	payload["timestamp"] = time.Now().Unix()
	payload["version"] = "1.0.0"
	payload["uptime"] = time.Since(startTime).Seconds()

	w.Header().Set("Content-Type", "application/json")
	enc := json.NewEncoder(w)
	enc.Encode(payload)
}
// apiHandler serves the business endpoint. It runs the simulated request,
// database and cache steps, writes a JSON greeting containing a timestamp and
// a request id, and finally observes the total handling time under the
// request's own method and path.
func apiHandler(w http.ResponseWriter, r *http.Request) {
	began := time.Now()

	// Simulated business logic.
	simulateHTTPRequest()
	simulateDatabaseQuery()
	simulateCacheOperation()

	body := make(map[string]interface{}, 3)
	body["message"] = "Hello,World!"
	body["timestamp"] = time.Now().Unix()
	body["request_id"] = generateRequestID()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(body)

	// Record the end-to-end duration of this handler.
	elapsed := time.Since(began).Seconds()
	httpRequestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(elapsed)
}
// generateRequestID returns an identifier of the form "<unix-nanoseconds>-req"
// built from the current time.
func generateRequestID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("%d-req", nanos)
}
// startTime is the moment main started; the health endpoint derives the
// uptime from it.
var startTime time.Time

// main wires the HTTP routes, starts the background goroutines that refresh
// system metrics and generate simulated traffic, and serves on the port given
// by the PORT environment variable (default 8080).
func main() {
	startTime = time.Now()

	// Routes. promhttp.Handler returns an http.Handler, so it must be
	// registered with http.Handle rather than http.HandleFunc.
	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/health", healthCheckHandler)
	http.HandleFunc("/api/users", apiHandler)

	// Profiling endpoints.
	enablePprof()

	// Refresh system metrics once per second for the life of the process.
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			updateSystemMetrics()
		}
	}()

	// Generate simulated business traffic every 100ms.
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			simulateHTTPRequest()
			simulateDatabaseQuery()
			simulateCacheOperation()
		}
	}()

	// Port configuration: accept PORT only when it is a valid TCP port.
	port := "8080"
	if envPort := os.Getenv("PORT"); envPort != "" {
		if n, err := strconv.Atoi(envPort); err == nil && n > 0 && n <= 65535 {
			port = envPort
		} else {
			fmt.Printf("Ignoring invalid PORT %q, using %s\n", envPort, port)
		}
	}

	fmt.Printf("Starting application on :%s\n", port)
	fmt.Println("Metrics available at http://localhost:" + port + "/metrics")
	fmt.Println("Health check at http://localhost:" + port + "/health")

	if err := http.ListenAndServe(":"+port, nil); err != nil {
		fmt.Printf("Error starting server: %v\n", err)
		// Signal the failure to the supervisor instead of exiting with 0.
		os.Exit(1)
	}
}
日志分析与聚合
ELK Stack配置
# Logstash配置示例
# logstash.conf
input {
  # Application log files.
  # NOTE: start_position "beginning" together with sincedb_path "/dev/null"
  # means no read position is persisted, so every restart re-reads the files
  # from the start.
  file {
    path => "/var/log/app/*.log"
    type => "application"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }

  # Syslog messages
  syslog {
    port => 5000
    type => "syslog"
  }

  # Application logs shipped over TCP as newline-delimited JSON
  tcp {
    port => 5044
    type => "tcp_logs"
    codec => json_lines
  }

  # Docker container logs. Logstash ships no "docker" input plugin, so the
  # container log files are tailed with the file input and decoded as JSON
  # (one JSON object per line is assumed, as written by the json-file driver).
  file {
    path => "/var/lib/docker/containers/*/*.log"
    type => "docker"
    codec => json
  }
}
filter {
  # Parse application log lines and use their own timestamp as @timestamp
  if [type] == "application" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{DATA:logger} - %{GREEDYDATA:log_message}" }
    }

    date {
      match => [ "timestamp","ISO8601" ]
    }
  }

  # Decode the JSON payload of TCP events into [parsed_message]
  if [type] == "tcp_logs" {
    json {
      source => "message"
      target => "parsed_message"
    }
  }

  # Static context fields, overridable through environment variables
  mutate {
    add_field => { "environment" => "${ENVIRONMENT:production}" }
    add_field => { "service" => "${SERVICE_NAME:webapp}" }
    add_field => { "cluster" => "${CLUSTER_NAME:us-west-2}" }
  }

  # Mask secrets. gsub takes a flat array of (field, pattern, replacement)
  # triples, so the field name has to be repeated for every substitution.
  mutate {
    gsub => [
      "log_message", "password=\\S+", "password=***",
      "log_message", "token=\\S+", "token=***"
    ]
  }
}
output {
  # Every event goes to the daily application index.
  # NOTE(review): document_type looks deprecated in newer versions of the
  # Elasticsearch output plugin — confirm against the deployed version.
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
    document_type => "_doc"
    template_name => "app-logs"
    template_overwrite => true
  }

  # Debug output to stdout
  stdout {
    codec => rubydebug
  }

  # ERROR-level events are additionally written to a dedicated daily index
  # (they are therefore indexed twice).
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "error-logs-%{+YYYY.MM.dd}"
      document_type => "_doc"
    }
  }
}
日志分析脚本
#!/bin/bash
# Automated log analysis script: scans the application logs and writes a text
# report (and an HTML rendering of it) into the analysis directory.

# Directory containing the application logs (including access.log).
LOG_DIR="/var/log/app"
# Path of the dedicated error log; not referenced by the functions in this
# script.
ERROR_LOG="/var/log/app/error.log"
# Output directory for generated reports.
ANALYSIS_DIR="/tmp/log_analysis"
# Plain-text report that every analysis function appends to.
REPORT_FILE="$ANALYSIS_DIR/log_analysis_report.txt"

# Make sure the output directory exists.
mkdir -p "$ANALYSIS_DIR"
# Append the error-log section to the report: total number of [ERROR] lines
# under LOG_DIR, a frequency table of the text following "ERROR]", and the
# last ten error lines.
error_log_analysis() {
    {
        echo "=== 错误日志分析 ==="

        # Total number of error lines
        error_count=$(grep -r "\[ERROR\]" "$LOG_DIR" | wc -l)
        echo "错误总数: $error_count"

        # Error types, most frequent first
        echo "错误类型统计:"
        grep -r "\[ERROR\]" "$LOG_DIR" \
            | grep -oP '(?<=ERROR\]).*?(?=\[| -)' \
            | sort | uniq -c | sort -rn

        # Most recent errors
        echo "最近10个错误:"
        grep -r "\[ERROR\]" "$LOG_DIR" | tail -10
    } >> "$REPORT_FILE"
}
# Append the performance section to the report: average / min / max of the
# response_time=<n> values, the number of db_query=<n> entries and how many of
# them exceed 100 ms. Values are read from every *.log file in LOG_DIR.
performance_log_analysis() {
    local response_times db_times

    # -h suppresses the "file:" prefix grep adds when several files match;
    # without it awk would see "name.log:12.5" instead of a number.
    response_times=$(grep -ohP 'response_time=\K[0-9.]+' "$LOG_DIR"/*.log 2>/dev/null)
    db_times=$(grep -ohP 'db_query=\K[0-9.]+' "$LOG_DIR"/*.log 2>/dev/null)

    {
        echo -e "\n=== 性能日志分析 ==="

        # Response times
        echo "响应时间统计:"
        if [ -n "$response_times" ]; then
            printf '%s\n' "$response_times" \
                | awk '{ sum += $1 } END { print "平均响应时间: " (sum / NR) " ms" }'
            # After a numeric sort the first line is the minimum and the last
            # one the maximum.
            printf '%s\n' "$response_times" | sort -n \
                | awk 'NR == 1 { print "最小: " $1 } { max = $1 } END { print "最大: " max }'
        else
            # Avoid a division by zero when no sample was found.
            echo "无响应时间数据"
        fi

        # Database queries
        echo "数据库查询统计:"
        printf '%s\n' "$db_times" | awk 'NF { n++ } END { print n + 0 }'

        # Slow queries
        echo "慢查询(>100ms):"
        printf '%s\n' "$db_times" | awk 'NF && $1 > 100 { n++ } END { print "数量: " (n + 0) }'
    } >> "$REPORT_FILE"
}
# Append the access-log section to the report: total request count, status
# code distribution (field 9), the ten most requested paths (field 7) and the
# twenty most active clients (field 1).
access_log_analysis() {
    {
        echo -e "\n=== 访问日志分析 ==="

        # Totals
        echo "访问统计:"
        total_access=$(wc -l < "$LOG_DIR/access.log")
        echo "总访问量: $total_access"

        # Status codes
        echo "状态码分布:"
        awk '{print $9}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn

        # Most requested APIs
        echo "热门API:"
        awk '{print $7}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn | head -10

        # Requests per client
        echo "用户访问统计:"
        awk '{print $1}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn | head -20
    } >> "$REPORT_FILE"
}
# Append the anomaly section to the report: clients with more than 1000
# requests, responses slower than 5000 ms, and a warning when more than 5% of
# the requests ended with a 4xx/5xx status.
anomaly_detection() {
    local access_log="$LOG_DIR/access.log"
    local error_requests total_requests rate

    {
        echo -e "\n=== 异常模式检测 ==="
        echo "检测异常访问:"

        # High-frequency clients
        echo "高频访问IP:"
        awk '{print $1}' "$access_log" | sort | uniq -c | sort -rn | awk '$1 > 1000 {print}'

        # Abnormally slow responses
        echo "异常请求时间:"
        grep -oP 'response_time=\K[0-9.]+' "$access_log" \
            | awk '$1 > 5000 {print "异常响应时间: " $1 " ms"}'

        # Error rate. The status is taken from field 9, the same column the
        # status-code distribution uses; "status=NNN" key/value logs are
        # accepted as well.
        echo "错误率异常:"
        error_requests=$(awk '$9 ~ /^[45][0-9][0-9]$/ || /status=[45][0-9][0-9]/ { n++ } END { print n + 0 }' "$access_log")
        total_requests=$(wc -l < "$access_log")

        # Skip the check for an empty or missing log: dividing by zero would
        # abort the function with an arithmetic error.
        if [ "${total_requests:-0}" -gt 0 ]; then
            rate=$((error_requests * 100 / total_requests))
            if [ "$rate" -gt 5 ]; then
                echo "警告: 错误率异常 $rate%"
            fi
        fi
    } >> "$REPORT_FILE"
}
# Append the trend section to the report: requests per hour from access.log
# and [ERROR] lines per hour from every *.log file in LOG_DIR.
trend_analysis() {
    {
        echo -e "\n=== 日志趋势分析 ==="

        # Requests per hour. Field 4 looks like "[10/Oct/2000:13:55:36";
        # keeping the first two ':'-separated parts yields date plus hour
        # (the first part alone would group by day).
        echo "每小时访问量:"
        awk '{print $4}' "$LOG_DIR/access.log" | cut -d':' -f1,2 | sort | uniq -c

        # Errors per hour. The glob must stay outside the quotes to expand,
        # and -h drops the "file:" prefix so that the first two fields are the
        # date and time of the log line.
        echo "错误趋势:"
        grep -h "\[ERROR\]" "$LOG_DIR"/*.log \
            | awk '{print $1" "$2}' | cut -d':' -f1 | sort | uniq -c
    } >> "$REPORT_FILE"
}
# Print one section of the text report, HTML-escaped.
#   $1 - section header line as written to the report
# The section runs from the header to the next empty line (or end of file).
_report_section_html() {
    sed -n "/$1/,/^\$/p" "$REPORT_FILE" \
        | sed -e 's/&/\&amp;/g' -e 's/</\&lt;/g' -e 's/>/\&gt;/g'
}

# Render the text report as an HTML page in the analysis directory.
# The here-document delimiter is unquoted on purpose so that the $(...)
# substitutions are expanded; log content is escaped because it may contain
# markup.
generate_html_report() {
    local html_file="$ANALYSIS_DIR/log_analysis_report.html"

    cat > "$html_file" << HTML
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>日志分析报告</title>
<style>
body { font-family: Arial,sans-serif; margin: 20px; }
.section { margin-bottom: 30px; }
.chart { width: 100%; height: 400px; margin: 20px 0; }
.alert { color: red; font-weight: bold; }
pre { background: #f4f4f4; padding: 10px; border-radius: 5px; }
</style>
</head>
<body>
<h1>日志分析报告</h1>
<p>生成时间: $(date)</p>
<div class="section">
<h2>错误日志分析</h2>
<pre>$(_report_section_html '=== 错误日志分析 ===')</pre>
</div>
<div class="section">
<h2>性能日志分析</h2>
<pre>$(_report_section_html '=== 性能日志分析 ===')</pre>
</div>
<div class="section">
<h2>访问日志分析</h2>
<pre>$(_report_section_html '=== 访问日志分析 ===')</pre>
</div>
<div class="section">
<h2>异常模式检测</h2>
<pre>$(_report_section_html '=== 异常模式检测 ===')</pre>
</div>
</body>
</html>
HTML

    echo "HTML报告已生成: $html_file"
}
# Entry point: run every analysis step in order and render the HTML report.
main() {
    echo "开始日志分析..."

    # Start from an empty report. The analysis functions only append, so
    # without this every run would pile onto the previous one and the HTML
    # report would show each section several times.
    : > "$REPORT_FILE"

    # Analyses
    error_log_analysis
    performance_log_analysis
    access_log_analysis
    anomaly_detection
    trend_analysis

    # Report
    generate_html_report

    echo "日志分析完成,报告保存到: $REPORT_FILE"
}

main
分布式链路追踪
Jaeger集成配置
package main
import (
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// initJaeger builds a Jaeger tracer for serviceName, installs it as the
// global OpenTracing tracer and returns the closer that flushes and shuts the
// tracer down. The sampler is constant with parameter 1 (every trace is
// sampled) and reported spans are logged.
func initJaeger(serviceName string) (io.Closer, error) {
	cfg := jaegercfg.Configuration{
		// Setting ServiceName and calling NewTracer replaces the deprecated
		// Configuration.New(serviceName, ...) entry point.
		ServiceName: serviceName,
		Sampler: &jaegercfg.SamplerConfig{
			Type:  jaeger.SamplerTypeConst,
			Param: 1.0,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans: true,
		},
	}

	tracer, closer, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		return nil, fmt.Errorf("无法初始化Jaeger: %w", err)
	}

	opentracing.SetGlobalTracer(tracer)
	return closer, nil
}
// TracedHTTPClient wraps an http.Client so that outgoing requests can be
// traced.
type TracedHTTPClient struct {
	client *http.Client
}

// NewTracedHTTPClient returns a TracedHTTPClient whose underlying client uses
// a 30 second timeout.
func NewTracedHTTPClient() *TracedHTTPClient {
	traced := new(TracedHTTPClient)
	traced.client = &http.Client{Timeout: 30 * time.Second}
	return traced
}
// Do sends req inside a child span named "HTTP-<method>" and propagates the
// span context to the remote side through the request headers.
//
// The request is cloned before the tracing headers are written, so the
// caller's request and header map are left untouched. It returns the response
// and error of the underlying client, or an error if the span context cannot
// be injected.
func (t *TracedHTTPClient) Do(req *http.Request) (*http.Response, error) {
	span, ctx := opentracing.StartSpanFromContext(
		req.Context(),
		"HTTP-"+req.Method,
		opentracing.Tag{Key: "http.url", Value: req.URL.String()},
		opentracing.Tag{Key: "http.method", Value: req.Method},
	)
	defer span.Finish()

	// ctx already carries the new span.
	req = req.Clone(ctx)
	if req.Header == nil {
		req.Header = make(http.Header)
	}

	// http.Header is a map[string][]string, so it has to be wrapped in
	// HTTPHeadersCarrier; it cannot be converted to TextMapCarrier
	// (map[string]string).
	if err := span.Tracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.HTTPHeadersCarrier(req.Header),
	); err != nil {
		span.SetTag("error", true)
		return nil, fmt.Errorf("无法注入追踪头: %w", err)
	}

	resp, err := t.client.Do(req)
	if err != nil {
		span.SetTag("error", true)
		return resp, err
	}
	span.SetTag("http.status_code", resp.StatusCode)
	return resp, nil
}
// TracedDBClient is a database client stand-in whose operations are traced.
// It only stores the data source name it was created with.
type TracedDBClient struct {
	dsn string
}

// NewTracedDBClient returns a TracedDBClient for the given data source name.
func NewTracedDBClient(dsn string) *TracedDBClient {
	c := new(TracedDBClient)
	c.dsn = dsn
	return c
}
// Query runs a simulated query inside a "DB-Query" span tagged with the SQL
// text. It sleeps for 10-29ms and always returns the same two rows and a nil
// error.
func (db *TracedDBClient) Query(ctx context.Context, query string) ([]map[string]interface{}, error) {
	span, _ := opentracing.StartSpanFromContext(
		ctx,
		"DB-Query",
		opentracing.Tag{Key: "db.query", Value: query},
	)
	defer span.Finish()

	// Simulated query latency.
	latency := time.Duration(10+time.Now().UnixNano()%20) * time.Millisecond
	time.Sleep(latency)

	// Canned result set.
	rows := make([]map[string]interface{}, 0, 2)
	rows = append(rows,
		map[string]interface{}{"id": 1, "name": "Alice"},
		map[string]interface{}{"id": 2, "name": "Bob"},
	)
	return rows, nil
}
// Insert runs a simulated insert inside a "DB-Insert" span tagged with the
// table name. It sleeps for 5-19ms and returns the current Unix time in
// nanoseconds as the generated id, with a nil error.
func (db *TracedDBClient) Insert(ctx context.Context, table string, data map[string]interface{}) (int64, error) {
	span, _ := opentracing.StartSpanFromContext(
		ctx,
		"DB-Insert",
		opentracing.Tag{Key: "db.table", Value: table},
	)
	defer span.Finish()

	// Simulated insert latency.
	latency := time.Duration(5+time.Now().UnixNano()%15) * time.Millisecond
	time.Sleep(latency)

	return time.Now().UnixNano(), nil
}
// handleAPIRequest serves /api/users as three traced steps — a user lookup,
// a simulated cache write and a call to an external API — each in its own
// child span of the request span. On success it answers with the trace id as
// plain text; any step failure yields a 500 response.
func handleAPIRequest(w http.ResponseWriter, r *http.Request) {
	span := opentracing.SpanFromContext(r.Context())
	if span == nil {
		// No span on the incoming context: start the root span here.
		span = opentracing.StartSpan("handleAPIRequest")
		defer span.Finish()
		r = r.WithContext(opentracing.ContextWithSpan(r.Context(), span))
	}
	rootCtx := r.Context()

	// Step 1: load the user. The closure scopes the span so that it is
	// finished when the step ends rather than when the handler returns.
	users, err := func() ([]map[string]interface{}, error) {
		userSpan, ctx := opentracing.StartSpanFromContext(rootCtx, "getUserData")
		defer userSpan.Finish()
		userSpan.SetTag("user.id", 12345)

		db := NewTracedDBClient("mysql://user:pass@localhost/mydb")
		return db.Query(ctx, "SELECT * FROM users WHERE id = 12345")
	}()
	if err != nil {
		span.SetTag("error", true)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	span.SetTag("users.count", len(users))

	// Step 2: simulated cache write.
	cacheSpan, _ := opentracing.StartSpanFromContext(rootCtx, "cacheUserData")
	time.Sleep(5 * time.Millisecond)
	cacheSpan.Finish()

	// Step 3: external API call.
	apiSpan, apiCtx := opentracing.StartSpanFromContext(rootCtx, "callExternalAPI")
	defer apiSpan.Finish()

	externalReq, err := http.NewRequestWithContext(apiCtx, http.MethodGet, "http://external-api/service", nil)
	if err != nil {
		apiSpan.SetTag("error", true)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	resp, err := NewTracedHTTPClient().Do(externalReq)
	if err != nil {
		apiSpan.SetTag("error", true)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()

	// The trace id lives in the Jaeger span context; it is not a baggage
	// item, so BaggageItem("trace_id") would always be empty.
	traceID := ""
	if sc, ok := span.Context().(jaeger.SpanContext); ok {
		traceID = sc.TraceID().String()
	}

	// The body is plain text, so it is declared as such. Returning the user
	// rows as JSON would require encoding/json, which this program does not
	// import.
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	fmt.Fprintf(w, "Trace ID: %s", traceID)
}
// main initialises tracing for the "web-service" service, registers the
// /api/users handler and serves HTTP on port 8080. The tracer is closed when
// main returns.
func main() {
	closer, err := initJaeger("web-service")
	if err != nil {
		fmt.Printf("无法初始化Jaeger: %v\n", err)
		return
	}
	defer closer.Close()

	// Routes
	http.HandleFunc("/api/users", handleAPIRequest)

	// Serve
	fmt.Println("Starting traced web service on :8080")
	fmt.Println("Jaeger UI: http://localhost:16686")

	err = http.ListenAndServe(":8080", nil)
	if err != nil {
		fmt.Printf("Error starting server: %v\n", err)
	}
}
自动化故障诊断
智能告警系统
#!/usr/bin/env python3
# 智能告警系统
import time
import json
import requests
from datetime import datetime,timedelta
from typing import Dict,List,Optional
class AlertRule:
    """Threshold rule evaluated against a single named metric.

    A rule fires once ``duration`` violations have been recorded and no alert
    has been sent since the last ``reset()``.
    """

    # Supported comparison operators, keyed by the `condition` string.
    _COMPARATORS = {
        ">": lambda value, threshold: value > threshold,
        ">=": lambda value, threshold: value >= threshold,
        "<": lambda value, threshold: value < threshold,
        "<=": lambda value, threshold: value <= threshold,
        "==": lambda value, threshold: value == threshold,
        "!=": lambda value, threshold: value != threshold,
    }

    def __init__(self, name: str, condition: str, threshold: float, duration: int):
        """Create a rule.

        Args:
            name: Metric name the rule applies to.
            condition: One of ">", ">=", "<", "<=", "==", "!=".
            threshold: Value the metric is compared with.
            duration: Number of recorded violations required to fire.

        Raises:
            ValueError: If `condition` is not a supported operator. An
                unknown operator would otherwise never match, leaving a rule
                that silently cannot fire.
        """
        if condition not in self._COMPARATORS:
            raise ValueError(f"unsupported condition: {condition!r}")
        self.name = name
        self.condition = condition
        self.threshold = threshold
        self.duration = duration
        self.violations = []
        self.alert_sent = False

    def check(self, current_value: float) -> bool:
        """Return True when `current_value` violates the rule."""
        return self._COMPARATORS[self.condition](current_value, self.threshold)

    def record_violation(self, timestamp: datetime, value: float) -> bool:
        """Record one violation.

        Returns:
            True when this violation brings the count to `duration` or more
            and no alert has been sent yet (the rule is then marked as
            alerted); False otherwise.
        """
        self.violations.append({
            'timestamp': timestamp,
            'value': value
        })

        if len(self.violations) >= self.duration and not self.alert_sent:
            self.alert_sent = True
            return True
        return False

    def reset(self):
        """Clear the recorded violations and the alerted flag."""
        self.violations = []
        self.alert_sent = False
class AlertManager:
    """Evaluates alert rules against collected metrics and keeps a history of
    the alerts that fired."""

    # Violations older than this many seconds are dropped once the metric is
    # back within bounds.
    VIOLATION_TTL_SECONDS = 300

    def __init__(self):
        self.rules: List["AlertRule"] = []
        self.alert_history: List[Dict] = []

    def add_rule(self, rule: "AlertRule"):
        """Register an alert rule."""
        self.rules.append(rule)

    def check_metrics(self, metrics: Dict[str, float]):
        """Evaluate every rule against `metrics`.

        Rules whose metric is absent from `metrics` are skipped: treating a
        missing sample as 0 would raise false alerts for "<" rules.

        Returns:
            The list of alerts (dicts) triggered by this call.
        """
        current_time = datetime.now()
        alerts_triggered = []

        for rule in self.rules:
            if rule.name not in metrics:
                continue
            metric_value = metrics[rule.name]

            if rule.check(metric_value):
                if rule.record_violation(current_time, metric_value):
                    alert = {
                        'rule': rule.name,
                        'condition': rule.condition,
                        'threshold': rule.threshold,
                        'current_value': metric_value,
                        'timestamp': current_time.isoformat(),
                        'violation_count': len(rule.violations)
                    }
                    alerts_triggered.append(alert)
                    self.alert_history.append(alert)

                    self.send_alert(alert)

                    # Start counting from scratch after an alert.
                    rule.reset()
            else:
                # Drop expired violations. total_seconds() is used because
                # timedelta.seconds is an attribute (not callable) that also
                # ignores the days component.
                rule.violations = [
                    v for v in rule.violations
                    if (current_time - v["timestamp"]).total_seconds() < self.VIOLATION_TTL_SECONDS
                ]

        return alerts_triggered

    def send_alert(self, alert: Dict):
        """Print the alert. Real delivery (mail, SMS, chat, webhook, ticket)
        would be added here."""
        # Single quotes inside the f-strings keep them valid before
        # Python 3.12.
        print(f"[ALERT] {alert['rule']} 违反阈值!")
        print(f" 当前值: {alert['current_value']}")
        print(f" 阈值: {alert['threshold']}")
        print(f" 违规次数: {alert['violation_count']}")
        print(f" 时间: {alert['timestamp']}")

    def get_alert_summary(self, hours: int = 24) -> Dict:
        """Summarise the alerts of the last `hours` hours.

        Returns:
            A dict with 'total_alerts', 'alert_counts' (alerts per rule name)
            and 'time_range' (human-readable label).
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_alerts = [
            alert for alert in self.alert_history
            if datetime.fromisoformat(alert["timestamp"]) > cutoff_time
        ]

        # Count per rule
        alert_counts = {}
        for alert in recent_alerts:
            rule = alert["rule"]
            alert_counts[rule] = alert_counts.get(rule, 0) + 1

        return {
            'total_alerts': len(recent_alerts),
            'alert_counts': alert_counts,
            'time_range': f"过去{hours}小时"
        }
class MetricCollector:
    """Collects metric samples from Prometheus and from the application's own
    /metrics endpoint."""

    def __init__(self):
        self.metrics_cache = {}

    def collect_from_prometheus(self, prometheus_url: str) -> Dict[str, float]:
        """Run the built-in instant queries against Prometheus.

        Returns:
            Metric name -> value for every query that returned a sample;
            an empty dict when Prometheus cannot be reached.
        """
        try:
            # Connectivity probe: a connection failure raises and is handled
            # below. The response itself is not inspected.
            requests.get(f"{prometheus_url}/-/healthy", timeout=5)

            queries = {
                'cpu_usage': '100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[1m])) * 100)',
                'memory_usage': '(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100',
                'disk_usage': '(1 - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"}) * 100',
                'load_average': 'node_load1'
            }

            metrics = {}
            for name, query in queries.items():
                try:
                    result = requests.get(
                        f"{prometheus_url}/api/v1/query",
                        params={'query': query},
                        timeout=10
                    )
                    if result.status_code == 200:
                        data = result.json()
                        if data["data"]["result"]:
                            # An instant-vector sample is [timestamp, "value"].
                            value = float(data["data"]["result"][0]["value"][1])
                            metrics[name] = value
                except Exception as e:
                    # One failing query must not abort the others.
                    print(f"查询 {name} 失败: {e}")

            return metrics

        except Exception as e:
            print(f"连接Prometheus失败: {e}")
            return {}

    def collect_from_application(self, app_url: str) -> Dict[str, float]:
        """Scrape the application's /metrics endpoint.

        Returns:
            Series name (including any label set) -> value; an empty dict
            when the application cannot be reached or does not answer 200.
        """
        try:
            # Connectivity probe, see collect_from_prometheus.
            requests.get(f"{app_url}/health", timeout=5)

            metrics_response = requests.get(f"{app_url}/metrics", timeout=5)
            if metrics_response.status_code == 200:
                return self._parse_metrics_text(metrics_response.text)

            return {}

        except Exception as e:
            print(f"连接应用失败: {e}")
            return {}

    @staticmethod
    def _parse_metrics_text(text: str) -> Dict[str, float]:
        """Parse Prometheus text exposition format into series -> value.

        Comment lines and lines without a numeric value are ignored. The
        label set is split off at the last '}' so that label values
        containing spaces do not shift the value column.
        """
        metrics = {}
        for line in text.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            if '}' in line:
                series, _, rest = line.rpartition('}')
                name = series + '}'
                fields = rest.split()
            else:
                parts = line.split()
                name, fields = parts[0], parts[1:]

            if not fields:
                continue
            try:
                # fields[0] is the value; an optional timestamp may follow.
                metrics[name] = float(fields[0])
            except ValueError:
                pass
        return metrics
def main():
    """Run the monitoring loop: collect metrics every 60 seconds, evaluate
    the alert rules, and print a 24-hour alert summary once the loop is
    interrupted with Ctrl-C."""
    print("智能告警系统启动")
    print("=" * 50)

    # Components
    alert_manager = AlertManager()
    metric_collector = MetricCollector()

    # Alert rules. The last argument is the number of violating checks
    # required; with one check per minute that corresponds to minutes.
    alert_manager.add_rule(AlertRule("cpu_usage", ">", 80.0, 3))  # CPU > 80% for 3 checks
    alert_manager.add_rule(AlertRule("memory_usage", ">", 90.0, 5))  # memory > 90% for 5 checks
    alert_manager.add_rule(AlertRule("disk_usage", ">", 90.0, 10))  # disk > 90% for 10 checks
    alert_manager.add_rule(AlertRule("app_response_time", ">", 1.0, 2))  # response time > 1s for 2 checks

    # Data sources
    prometheus_url = "http://localhost:9090"
    app_url = "http://localhost:8080"

    print(f"Prometheus地址: {prometheus_url}")
    print(f"应用地址: {app_url}")
    print(f"告警规则数量: {len(alert_manager.rules)}")
    print()

    # Monitoring loop
    try:
        while True:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始监控检查...")

            # Collect
            prometheus_metrics = metric_collector.collect_from_prometheus(prometheus_url)
            app_metrics = metric_collector.collect_from_application(app_url)

            # Merge; application metrics win on a name clash.
            all_metrics = {**prometheus_metrics, **app_metrics}

            # Evaluate
            new_alerts = alert_manager.check_metrics(all_metrics)

            if new_alerts:
                print(f"触发 {len(new_alerts)} 个新告警")
            else:
                print("没有新告警")

            # Current state
            print("当前指标状态:")
            for name, value in all_metrics.items():
                print(f" {name}: {value:.2f}")

            # Wait for the next round
            time.sleep(60)

    except KeyboardInterrupt:
        print("\n监控停止")

    # Alert summary. Single quotes inside the f-string keep it valid before
    # Python 3.12.
    summary = alert_manager.get_alert_summary(24)
    print(f"\n24小时告警摘要:")
    print(f"总告警数: {summary['total_alerts']}")
    print(f"告警分布:")
    for rule, count in summary["alert_counts"].items():
        print(f" {rule}: {count} 次")


if __name__ == "__main__":
    main()
自动化故障恢复
#!/bin/bash
# 自动化故障恢复脚本
# Probe a service's health URL.
#   $1 - service name (used in messages only)
#   $2 - URL to request
# Returns 0 when the URL answers successfully within 5 seconds, 1 otherwise.
detect_failure() {
    local service=$1
    local check_url=$2

    echo "检查服务: $service"

    if curl -f -s --max-time 5 "$check_url" > /dev/null; then
        echo "服务正常: $service"
        return 0
    fi

    echo "检测到故障: $service 不可用"
    return 1
}
# Restart a systemd unit, retrying up to three times.
#   $1 - unit name
# After a successful restart command the unit is given 10 seconds before its
# state is verified; a failed attempt is followed by a 5 second pause.
# Returns 0 once the unit is active, 1 when every attempt failed.
restart_service() {
    local service=$1
    local max_attempts=3
    local attempt

    echo "尝试重启服务: $service"

    for ((attempt = 1; attempt <= max_attempts; attempt++)); do
        echo "重启尝试 $attempt/$max_attempts"

        if systemctl restart "$service"; then
            echo "服务重启成功"
            sleep 10

            # Verify that the unit is really up
            if systemctl is-active --quiet "$service"; then
                echo "服务运行正常"
                return 0
            fi
        fi

        sleep 5
    done

    echo "服务重启失败"
    return 1
}
# Roll a Kubernetes deployment back to its previous revision.
#   $1 - deployment name
#   $2 - namespace
# Returns 0 when the rollback was applied and rolled out, 1 otherwise.
rollback_deployment() {
    local app=$1
    local environment=$2
    local revision

    echo "执行回滚: $app ($environment)"

    # Whether an earlier revision exists is read from the deployment's
    # revision annotation. Image tags are arbitrary strings (for example
    # "v1.2.3" or a digest), so they cannot be decremented to find the
    # previous version.
    if ! revision=$(kubectl get deployment "$app" -n "$environment" \
        -o jsonpath='{.metadata.annotations.deployment\.kubernetes\.io/revision}'); then
        echo "回滚失败"
        return 1
    fi

    if ! [[ "$revision" =~ ^[0-9]+$ ]] || [ "$revision" -le 1 ]; then
        echo "没有可用的历史版本"
        return 1
    fi

    echo "回滚到上一个版本 (当前版本: $revision)"

    # Apply the rollback and wait until it has been rolled out.
    if kubectl rollout undo deployment/"$app" -n "$environment" \
        && kubectl rollout status deployment/"$app" -n "$environment" --timeout=120s; then
        echo "回滚成功"
        return 0
    else
        echo "回滚失败"
        return 1
    fi
}
# Scale a Kubernetes deployment.
#   $1 - deployment name
#   $2 - desired replica count
# Returns 0 when kubectl accepted the change, 1 otherwise.
scale_service() {
    local service=$1
    local replicas=$2

    echo "扩容服务: $service 到 $replicas 个副本"

    if ! kubectl scale deployment "$service" --replicas="$replicas"; then
        echo "扩容失败"
        return 1
    fi

    echo "扩容成功"
    return 0
}
# Report defunct (zombie) processes belonging to a service and nudge their
# parents to reap them.
#   $1 - service name, matched against the process command
#
# A zombie has already exited, so signalling it (even with SIGKILL) has no
# effect; it disappears once its parent collects the exit status. The parent
# is therefore sent SIGCHLD as a reminder. Zombies are identified by a
# process state starting with "Z" rather than by searching for the letter Z
# anywhere in the ps output.
cleanup_resources() {
    local service=$1
    local zombies pid ppid

    echo "清理资源: $service"

    zombies=$(ps -eo pid=,ppid=,stat=,comm= | awk -v svc="$service" '
        {
            cmd = $4
            for (i = 5; i <= NF; i++) cmd = cmd " " $i
        }
        $3 ~ /^Z/ && index(cmd, svc) { print $1, $2 }')

    if [ -n "$zombies" ]; then
        while read -r pid ppid; do
            echo "发现僵尸进程: $pid"
            # Ask the parent to reap its child; failures (for example an
            # already vanished parent) are not fatal.
            kill -s CHLD "$ppid" 2>/dev/null || true
        done <<< "$zombies"
    fi
}
# Detect a failure and run a recovery strategy.
#   $1 - service name
#   $2 - health-check URL; when empty the strategy is executed unconditionally
#        and its own exit status decides the result
#   $3 - strategy: restart | rollback | scale | cleanup
# Returns 0 when the service is healthy (or was recovered), 1 otherwise.
auto_recovery() {
    local service=$1
    local check_url=$2
    local recovery_strategy=$3
    local status=0

    echo "开始自动化恢复流程"
    echo "服务: $service"
    echo "检查URL: $check_url"
    echo "恢复策略: $recovery_strategy"

    # Failure detection. detect_failure returns 0 for a healthy service, in
    # which case there is nothing to recover.
    if [ -n "$check_url" ] && detect_failure "$service" "$check_url"; then
        return 0
    fi

    echo "执行恢复策略: $recovery_strategy"

    case "$recovery_strategy" in
        restart)
            restart_service "$service" || status=$?
            ;;
        rollback)
            rollback_deployment "$service" "production" || status=$?
            ;;
        scale)
            scale_service "$service" "4" || status=$?
            ;;
        cleanup)
            cleanup_resources "$service"
            restart_service "$service" || status=$?
            ;;
        *)
            echo "未知的恢复策略: $recovery_strategy"
            return 1
            ;;
    esac

    if [ "$status" -ne 0 ]; then
        echo "恢复失败,需要人工介入"
        return 1
    fi

    # Without a URL there is nothing to probe; trust the strategy's result.
    if [ -z "$check_url" ]; then
        echo "恢复成功"
        return 0
    fi

    # Verification
    sleep 15
    if detect_failure "$service" "$check_url"; then
        echo "恢复成功"
        return 0
    else
        echo "恢复失败,需要人工介入"
        return 1
    fi
}
# Cluster-level recovery: cordon and drain NotReady nodes, then delete pods
# stuck in CrashLoopBackOff so that their controllers recreate them.
cluster_recovery() {
    local -a not_ready_nodes crashed_pods
    local node entry namespace pod

    echo "集群级故障恢复"

    # Nodes
    echo "检查节点状态..."
    mapfile -t not_ready_nodes < <(kubectl get nodes --no-headers | awk '$2 ~ /NotReady/ {print $1}')

    if [ "${#not_ready_nodes[@]}" -gt 0 ]; then
        echo "发现 ${#not_ready_nodes[@]} 个不可用节点"

        # Mark the node unschedulable and evict its workloads
        for node in "${not_ready_nodes[@]}"; do
            kubectl cordon "$node"
            kubectl drain "$node" --ignore-daemonsets --delete-emptydir-data
        done
    fi

    # Pods. With --all-namespaces the columns are NAMESPACE NAME READY STATUS,
    # so the pod name is the second field and the status the fourth.
    echo "检查Pod状态..."
    mapfile -t crashed_pods < <(kubectl get pods --all-namespaces --no-headers \
        | awk '$4 == "CrashLoopBackOff" {print $1 "/" $2}')

    if [ "${#crashed_pods[@]}" -gt 0 ]; then
        echo "发现 ${#crashed_pods[@]} 个崩溃Pod"

        # A pod has to be deleted by name inside its own namespace; a
        # resource name cannot be combined with --all-namespaces.
        for entry in "${crashed_pods[@]}"; do
            namespace=${entry%%/*}
            pod=${entry#*/}
            kubectl delete pod "$pod" -n "$namespace"
        done
    fi

    echo "集群恢复完成"
}
# Command-line entry point. The first argument selects the action; any other
# value (or none) prints the usage text.
main() {
    local action=${1:-help}

    case "$action" in
        detect)
            # Health-check the service and restart it when it is down.
            auto_recovery "$2" "$3" "restart"
            ;;
        rollback)
            auto_recovery "$2" "" "rollback"
            ;;
        scale)
            auto_recovery "$2" "" "scale"
            ;;
        cluster)
            cluster_recovery
            ;;
        *)
            echo "自动化故障恢复工具"
            echo "用法: $0 {detect|rollback|scale|cluster} [service] [check_url]"
            echo ""
            echo "示例:"
            echo " $0 detect webapp http://localhost:8080/health"
            echo " $0 rollback webapp"
            echo " $0 scale webapp"
            echo " $0 cluster"
            ;;
    esac
}

main "$@"
通过建立完整的可观测性平台和自动化运维体系,可以显著提升系统的可维护性和可靠性,实现从被动响应到主动预测的运维模式转变,为业务的稳定运行提供坚实保障。