可观测性平台与自动化运维

构建完整的可观测性平台,实现从指标监控到自动化故障诊断的智能运维体系

在现代复杂的分布式系统中,传统的监控手段已经无法满足运维需求。可观测性平台通过整合指标、日志、链路追踪三大支柱,结合自动化运维技术,实现了从被动响应到主动预测的转变。本文将深入探讨如何构建完整的可观测性平台和自动化运维体系。

可观测性三大支柱

可观测性架构设计

Rendering diagram...

指标收集架构

# Prometheus配置示例
# prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    cluster: 'production'
    environment: 'us-west-2'

# 告警管理器配置
alerting:
  alertmanagers:
    - static_configs:
     - targets:
       - 'alertmanager:9093'

# 告警规则文件
rule_files:
  - 'alerts/*.yml'

# 数据采集配置
scrape_configs:
  # Prometheus自监控
  - job_name: 'prometheus'
    static_configs:
   - targets: ["localhost:9090"]

  # Linux节点监控
  - job_name: 'node'
    static_configs:
   - targets: ["node-exporter:9100"]
    relabel_configs:
   - source_labels: [__address__]
     target_label: instance
     replacement: 'linux-server-01'

  # 应用服务监控
  - job_name: 'webapp'
    static_configs:
   - targets: ["webapp:8080"]
     labels:
       service: 'web-application'
       team: 'backend'
    metrics_path: '/actuator/prometheus'
    scrape_interval: 5s

  # Kubernetes监控
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
   - role: pod
    relabel_configs:
   - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
     action: keep
     regex: true
   - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
     action: replace
     target_label: __metrics_path__
     regex: (.+)
     replacement: $1
   - action: labelmap
     regex: __meta_kubernetes_pod_label_(.+)
     replacement: $1

自定义Exporter实现

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// 自定义应用指标
var (
	// 业务指标
	httpRequestsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "app_http_requests_total",
			Help: "Total number of HTTP requests",
		},
		[]string{"method","endpoint","status"},
	)

	httpRequestDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "app_http_request_duration_seconds",
			Help:    "HTTP request duration in seconds",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method","endpoint"},
	)

	activeConnections = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "app_active_connections",
			Help: "Current number of active connections",
		},
	)

	// 系统指标
	processMemoryUsage = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_process_memory_usage_bytes",
			Help: "Process memory usage in bytes",
		},
		[]string{"type"},
	)

	processCpuUsage = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "app_process_cpu_usage_percent",
			Help: "Process CPU usage percentage",
		},
	)

	// 数据库指标
	dbConnectionPoolSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_db_connection_pool_size",
			Help: "Database connection pool size",
		},
		[]string{"database","state"},
	)

	dbQueryDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "app_db_query_duration_seconds",
			Help:    "Database query duration in seconds",
			Buckets: []float64{0.001,0.005,0.01,0.05,0.1,0.5,1.0},
		},
		[]string{"database","operation"},
	)

	// 缓存指标
	cacheHitRatio = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "app_cache_hit_ratio",
			Help: "Cache hit ratio",
		},
		[]string{"cache"},
	)
)

func init() {
	// 注册所有指标
	prometheus.MustRegister(httpRequestsTotal)
	prometheus.MustRegister(httpRequestDuration)
	prometheus.MustRegister(activeConnections)
	prometheus.MustRegister(processMemoryUsage)
	prometheus.MustRegister(processCpuUsage)
	prometheus.MustRegister(dbConnectionPoolSize)
	prometheus.MustRegister(dbQueryDuration)
	prometheus.MustRegister(cacheHitRatio)
}

// 更新系统指标
func updateSystemMetrics() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// 更新内存指标
	processMemoryUsage.WithLabelValues("heap").Set(float64(m.HeapAlloc))
	processMemoryUsage.WithLabelValues("stack").Set(float64(m.StackInuse))
	processMemoryUsage.WithLabelValues("gc").Set(float64(m.GCStats.PauseTotalNs))

	// 更新CPU指标(简化版本)
	// 实际应用中应该使用更精确的CPU使用率计算方法
	processCpuUsage.Set(0.0)
}

// 模拟HTTP请求处理
func simulateHTTPRequest() {
	start := time.Now()

	// 模拟处理
	time.Sleep(time.Duration(10+time.Now().UnixNano()%50) * time.Millisecond)

	// 记录请求指标
	method := "GET"
	endpoint := "/api/users"
	status := "200"

	httpRequestsTotal.WithLabelValues(method,endpoint,status).Inc()
	httpRequestDuration.WithLabelValues(method,endpoint).Observe(time.Since(start).Seconds())
}

// 模拟数据库操作
func simulateDatabaseQuery() {
	start := time.Now()

	// 模拟查询
	time.Sleep(time.Duration(5+time.Now().UnixNano()%20) * time.Millisecond)

	// 记录查询指标
	dbQueryDuration.WithLabelValues("mysql","select").Observe(time.Since(start).Seconds())

	// 模拟连接池变化
	active := 10 + int(time.Now().UnixNano())%5
	idle := 5 + int(time.now().UnixNano())%5

	dbConnectionPoolSize.WithLabelValues("mysql","active").Set(float64(active))
	dbConnectionPoolSize.WithLabelValues("mysql","idle").Set(float64(idle))
}

// 模拟缓存操作
func simulateCacheOperation() {
	// 模拟缓存命中/未命中
	hit := time.Now().UnixNano()%10 < 7  // 70%命中率

	cache := "redis"
	total := 100.0
	hits := 70.0 + float64(time.Now().UnixNano()%10-5)
	misses := total - hits

	ratio := hits / total
	cacheHitRatio.WithLabelValues(cache).Set(ratio)
}

// 启用pprof分析
func enablePprof() {
	http.HandleFunc("/debug/pprof/",pprof.Index)
	http.HandleFunc("/debug/pprof/cmdline",pprof.Cmdline)
	http.HandleFunc("/debug/pprof/profile",pprof.Profile)
	http.HandleFunc("/debug/pprof/symbol",pprof.Symbol)
	http.HandleFunc("/debug/pprof/trace",pprof.Trace)
}

// 健康检查端点
func healthCheckHandler(w http.ResponseWriter,r *http.Request) {
	health := map[string]interface{}{
		"status": "healthy",
		"timestamp": time.Now().Unix(),
		"version": "1.0.0",
		"uptime": time.Since(startTime).Seconds(),
	}

	w.Header().Set("Content-Type","application/json")
	json.NewEncoder(w).Encode(health)
}

// 业务API端点
func apiHandler(w http.ResponseWriter,r *http.Request) {
	start := time.Now()

	// 处理业务逻辑
	simulateHTTPRequest()
	simulateDatabaseQuery()
	simulateCacheOperation()

	// 返回响应
	response := map[string]interface{}{
		"message": "Hello,World!",
		"timestamp": time.Now().Unix(),
		"request_id": generateRequestID(),
	}

	w.Header().Set("Content-Type","application/json")
	json.NewEncoder(w).Encode(response)

	// 记录请求完成
	httpRequestDuration.WithLabelValues(r.Method,r.URL.Path).Observe(time.Since(start).Seconds())
}

// 生成请求ID
func generateRequestID() string {
	return fmt.Sprintf("%d-%s",time.Now().UnixNano(),"req")
}

var startTime time.Time

func main() {
	startTime = time.Now()

	// 设置路由
	http.HandleFunc("/metrics",promhttp.Handler())
	http.HandleFunc("/health",healthCheckHandler)
	http.HandleFunc("/api/users",apiHandler)

	// 启用pprof
	enablePprof()

	// 启动指标更新协程
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			updateSystemMetrics()
		}
	}()

	// 启动业务模拟协程
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for range ticker.C {
			simulateHTTPRequest()
			simulateDatabaseQuery()
			simulateCacheOperation()
		}
	}()

	// 获取端口配置
	port := "8080"
	if envPort := os.Getenv("PORT"); envPort != "" {
		if portNum,err := strconv.Atoi(envPort); err == nil {
			port = envPort
		}
	}

	fmt.Printf("Starting application on :%s\n",port)
	fmt.Println("Metrics available at http://localhost:" + port + "/metrics")
	fmt.Println("Health check at http://localhost:" + port + "/health")

	if err := http.ListenAndServe(":"+port,nil); err != nil {
		fmt.Printf("Error starting server: %v\n",err)
	}
}

日志分析与聚合

ELK Stack配置

# Logstash配置示例
# logstash.conf

input {
  # 文件输入
  file {
    path => "/var/log/app/*.log"
    type => "application"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
  
  # 系统日志
  syslog {
    port => 5000
    type => "syslog"
  }
  
  # 应用日志
  tcp {
    port => 5044
    type => "tcp_logs"
    codec => json_lines
  }
  
  # Docker日志
  docker {
    path => "/var/lib/docker/containers/*/*.log"
    type => "docker"
  }
}

filter {
  # 时间戳解析
  if [type] == "application" {
    grok {
   match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{DATA:logger} - %{GREEDYDATA:log_message}" }
    }
    
    date {
   match => [ "timestamp","ISO8601" ]
    }
  }
  
  # JSON解析
  if [type] == "tcp_logs" {
    json {
   source => "message"
   target => "parsed_message"
    }
  }
  
  # 提取字段
  mutate {
    add_field => { "environment" => "${ENVIRONMENT:production}" }
    add_field => { "service" => "${SERVICE_NAME:webapp}" }
    add_field => { "cluster" => "${CLUSTER_NAME:us-west-2}" }
  }
  
  # 去除敏感信息
  mutate {
    gsub => [
   "log_message",
   "password=\\S+",
   "password=***",
   "token=\\S+",
   "token=***"
    ]
  }
}

output {
  # Elasticsearch输出
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
    document_type => "_doc"
    template_name => "app-logs"
    template_overwrite => true
  }
  
  # 调试输出
  stdout {
    codec => rubydebug
  }
  
  # 条件输出到不同的索引
  if [level] == "ERROR" {
    elasticsearch {
   hosts => ["elasticsearch:9200"]
   index => "error-logs-%{+YYYY.MM.dd}"
   document_type => "_doc"
    }
  }
}

日志分析脚本

#!/bin/bash
# 日志分析自动化脚本

LOG_DIR="/var/log/app"
ERROR_LOG="/var/log/app/error.log"
ANALYSIS_DIR="/tmp/log_analysis"
REPORT_FILE="$ANALYSIS_DIR/log_analysis_report.txt"

# 创建分析目录
mkdir -p "$ANALYSIS_DIR"

# 错误日志统计
error_log_analysis() {
    echo "=== 错误日志分析 ===" >> "$REPORT_FILE"
    
    # 统计错误数量
    error_count=$(grep -r "\[ERROR\]" "$LOG_DIR" | wc -l)
    echo "错误总数: $error_count" >> "$REPORT_FILE"
    
    # 错误类型统计
    echo "错误类型统计:" >> "$REPORT_FILE"
    grep -r "\[ERROR\]" "$LOG_DIR" | grep -oP '(?<=ERROR\]).*?(?=\[| -)' | sort | uniq -c | sort -rn >> "$REPORT_FILE"
    
    # 最近错误
    echo "最近10个错误:" >> "$REPORT_FILE"
    grep -r "\[ERROR\]" "$LOG_DIR" | tail -10 >> "$REPORT_FILE"
}

# 性能日志分析
performance_log_analysis() {
    echo -e "\n=== 性能日志分析 ===" >> "$REPORT_FILE"
    
    # 响应时间分析
    echo "响应时间统计:" >> "$REPORT_FILE"
    grep -oP 'response_time=\K[0-9.]+' "$LOG_DIR"/*.log | awk '{sum+=$1; count++} END {print "平均响应时间: " sum/count" " ms"}' >> "$REPORT_FILE"
    
    grep -oP 'response_time=\K[0-9.]+' "$LOG_DIR"/*.log | sort -n | awk 'BEGIN{print "最小: " $1; max=$1} {if($1>max) max=$1} END{print "最大: " max}' >> "$REPORT_FILE"
    
    # 数据库查询分析
    echo "数据库查询统计:" >> "$REPORT_FILE"
    grep -oP 'db_query=\K[0-9.]+' "$LOG_DIR"/*.log | wc -l >> "$REPORT_FILE"
    
    # 慢查询识别
    echo "慢查询(>100ms):" >> "$REPORT_FILE"
    grep -oP 'db_query=\K[0-9.]+' "$LOG_DIR"/*.log | awk '$1 > 100 {count++} END {print "数量: " count}' >> "$REPORT_FILE"
}

# 访问日志分析
access_log_analysis() {
    echo -e "\n=== 访问日志分析 ===" >> "$REPORT_FILE"
    
    # 访问统计
    echo "访问统计:" >> "$REPORT_FILE"
    total_access=$(wc -l < "$LOG_DIR/access.log")
    echo "总访问量: $total_access" >> "$REPORT_FILE"
    
    # 状态码统计
    echo "状态码分布:" >> "$REPORT_FILE"
    awk '{print $9}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn >> "$REPORT_FILE"
    
    # 热门API统计
    echo "热门API:" >> "$REPORT_FILE"
    awk '{print $7}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn | head -10 >> "$REPORT_FILE"
    
    # 用户行为分析
    echo "用户访问统计:" >> "$REPORT_FILE"
    awk '{print $1}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn | head -20 >> "$REPORT_FILE"
}

# 异常模式检测
anomaly_detection() {
    echo -e "\n=== 异常模式检测 ===" >> "$REPORT_FILE"
    
    # 检查异常访问模式
    echo "检测异常访问:" >> "$REPORT_FILE"
    
    # 单IP高频访问
    echo "高频访问IP:" >> "$REPORT_FILE"
    awk '{print $1}' "$LOG_DIR/access.log" | sort | uniq -c | sort -rn | awk '$1 > 1000 {print}' >> "$REPORT_FILE"
    
    # 异常请求时间
    echo "异常请求时间:" >> "$REPORT_FILE"
    grep -oP 'response_time=\K[0-9.]+' "$LOG_DIR/access.log" | awk '$1 > 5000 {print "异常响应时间: " $1 " ms"}' >> "$REPORT_FILE"
    
    # 异常错误率
    echo "错误率异常:" >> "$REPORT_FILE"
    error_rate=$(grep -oP 'status=[45]\d{2}' "$LOG_DIR/access.log" | wc -l)
    total_requests=$(wc -l < "$LOG_DIR/access.log")
    rate=$((error_rate * 100 / total_requests))
    
    if [ $rate -gt 5 ]; then
     echo "警告: 错误率异常 $rate%" >> "$REPORT_FILE"
    fi
}

# 日志趋势分析
trend_analysis() {
    echo -e "\n=== 日志趋势分析 ===" >> "$REPORT_FILE"
    
    # 按小时统计
    echo "每小时访问量:" >> "$REPORT_FILE"
    awk '{print $4}' "$LOG_DIR/access.log" | cut -d':' -f1 | sort | uniq -c >> "$REPORT_FILE"
    
    # 错误趋势
    echo "错误趋势:" >> "$REPORT_FILE"
    grep "\[ERROR\]" "$LOG_DIR/*.log" | awk '{print $1" "$2}' | cut -d':' -f1 | sort | uniq -c >> "$REPORT_FILE"
}

# 生成HTML报告
generate_html_report() {
    html_file="$ANALYSIS_DIR/log_analysis_report.html"
    
    cat > "$html_file" << 'HTML'
<!DOCTYPE html>
<html>
<head>
    <title>日志分析报告</title>
    <style>
     body { font-family: Arial,sans-serif; margin: 20px; }
     .section { margin-bottom: 30px; }
     .chart { width: 100%; height: 400px; margin: 20px 0; }
     .alert { color: red; font-weight: bold; }
     pre { background: #f4f4f4; padding: 10px; border-radius: 5px; }
    </style>
</head>
<body>
    <h1>日志分析报告</h1>
    <p>生成时间: $(date)</p>
    
    <div class="section">
     <h2>错误日志分析</h2>
     <pre>$(cat "$REPORT_FILE" | sed -n '/=== 错误日志分析 ===/,/^$/p')</pre>
    </div>
    
    <div class="section">
     <h2>性能日志分析</h2>
     <pre>$(cat "$REPORT_FILE" | sed -n '/=== 性能日志分析 ===/,/^$/p')</pre>
    </div>
    
    <div class="section">
     <h2>访问日志分析</h2>
     <pre>$(cat "$REPORT_FILE" | sed -n '/=== 访问日志分析 ===/,/^$/p')</pre>
    </div>
    
    <div class="section">
     <h2>异常模式检测</h2>
     <pre>$(cat "$REPORT_FILE" | sed -n '/=== 异常模式检测 ===/,/^$/p')</pre>
    </div>
</body>
</html>
HTML
    
    echo "HTML报告已生成: $html_file"
}

# 主函数
main() {
    echo "开始日志分析..."
    
    # 运行各种分析
    error_log_analysis
    performance_log_analysis
    access_log_analysis
    anomaly_detection
    trend_analysis
    
    # 生成报告
    generate_html_report
    
    echo "日志分析完成,报告保存到: $REPORT_FILE"
}

main

分布式链路追踪

Jaeger集成配置

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// 初始化Jaeger追踪器
func initJaeger(serviceName string) (io.Closer,error) {
	cfg := jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  jaeger.SamplerTypeConst,
			Param: 1.0,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans: true,
		},
	}

	tracer,closer,err := cfg.New(
		serviceName,
		jaegercfg.Logger(jaeger.StdLogger),
	)
	if err != nil {
		return nil,fmt.Errorf("无法初始化Jaeger: %w",err)
	}

	opentracing.SetGlobalTracer(tracer)
	return closer,nil
}

// HTTP客户端包装器
type TracedHTTPClient struct {
	client *http.Client
}

func NewTracedHTTPClient() *TracedHTTPClient {
	return &TracedHTTPClient{
		client: &http.Client{Timeout: 30 * time.Second},
	}
}

func (t *TracedHTTPClient) Do(req *http.Request) (*http.Response,error) {
	span,ctx := opentracing.StartSpanFromContext(
		req.Context(),
		"HTTP-"+req.Method,
		opentracing.Tag{Key: "http.url",Value: req.URL.String()},
		opentracing.Tag{Key: "http.method",Value: req.Method},
	)
	defer span.Finish()

	req = req.WithContext(opentracing.ContextWithSpan(ctx,span))

	// 添加追踪头
	if err := span.Tracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.TextMapCarrier(req.Header),
	); err != nil {
		return nil,fmt.Errorf("无法注入追踪头: %w",err)
	}

	return t.client.Do(req)
}

// 数据库操作包装器
type TracedDBClient struct {
	dsn string
}

func NewTracedDBClient(dsn string) *TracedDBClient {
	return &TracedDBClient{dsn: dsn}
}

func (db *TracedDBClient) Query(ctx context.Context,query string) ([]map[string]interface{},error) {
	span,ctx := opentracing.StartSpanFromContext(
		ctx,
		"DB-Query",
		opentracing.Tag{Key: "db.query",Value: query},
	)
	defer span.Finish()

	// 模拟数据库查询
	time.Sleep(time.Duration(10+time.Now().UnixNano()%20) * time.Millisecond)

	// 记录查询结果
	results := []map[string]interface{}{
		{"id": 1,"name": "Alice"},
		{"id": 2,"name": "Bob"},
	}

	return results,nil
}

func (db *TracedDBClient) Insert(ctx context.Context,table string,data map[string]interface{}) (int64,error) {
	span,ctx := opentracing.StartSpanFromContext(
		ctx,
		"DB-Insert",
		opentracing.Tag{Key: "db.table",Value: table},
	)
	defer span.Finish()

	// 模拟数据库插入
	time.Sleep(time.Duration(5+time.Now().UnixNano()%15) * time.Millisecond)

	// 记录插入结果
	id := time.Now().UnixNano()

	return id,nil
}

// API处理函数
func handleAPIRequest(w http.ResponseWriter,r *http.Request) {
	span := opentracing.SpanFromContext(r.Context())
	if span == nil {
		span = opentracing.StartSpan("handleAPIRequest")
		defer span.Finish()
		r = r.WithContext(opentracing.ContextWithSpan(r.Context(),span))
	}

	// 第一步:查询用户数据
	userSpan,ctx := opentracing.StartSpanFromContext(r.Context(),"getUserData")
	userSpan.SetTag("user.id",12345)
	defer userSpan.Finish()

	db := NewTracedDBClient("mysql://user:pass@localhost/mydb")
	users,err := db.Query(ctx,"SELECT * FROM users WHERE id = 12345")
	if err != nil {
		http.Error(w,err.Error(),http.StatusInternalServerError)
		return
	}

	// 第二步:缓存处理
	cacheSpan,ctx := opentracing.StartSpanFromContext(r.Context(),"cacheUserData")
	defer cacheSpan.Finish()

	// 模拟缓存操作
	time.Sleep(5 * time.Millisecond)

	// 第三步:外部API调用
	apiSpan,ctx := opentracing.StartSpanFromContext(r.Context(),"callExternalAPI")
	defer apiSpan.Finish()

	httpClient := NewTracedHTTPClient()
	externalReq,_ := http.NewRequestWithContext(ctx,"GET","http://external-api/service",nil)
	resp,err := httpClient.Do(externalReq)
	if err != nil {
		http.Error(w,err.Error(),http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()

	// 返回响应
	response := map[string]interface{}{
		"users": users,
		"status": "success",
		"trace_id": span.BaggageItem("trace_id"),
	}

	w.Header().Set("Content-Type","application/json")
	fmt.Fprintf(w,"Trace ID: %s",span.BaggageItem("trace_id"))
}

func main() {
	// 初始化Jaeger
	closer,err := initJaeger("web-service")
	if err != nil {
		fmt.Printf("无法初始化Jaeger: %v\n",err)
		return
	}
	defer closer.Close()

	// 设置路由
	http.HandleFunc("/api/users",handleAPIRequest)

	// 启动服务
	fmt.Println("Starting traced web service on :8080")
	fmt.Println("Jaeger UI: http://localhost:16686")

	if err := http.ListenAndServe(":8080",nil); err != nil {
		fmt.Printf("Error starting server: %v\n",err)
	}
}

自动化故障诊断

智能告警系统

#!/usr/bin/env python3
# 智能告警系统

import time
import json
import requests
from datetime import datetime,timedelta
from typing import Dict,List,Optional

class AlertRule:
    def __init__(self,name: str,condition: str,threshold: float,duration: int):
     self.name = name
     self.condition = condition
     self.threshold = threshold
     self.duration = duration
     self.violations = []
     self.alert_sent = False
     
    def check(self,current_value: float) -> bool:
     """检查是否违反规则"""
     violated = False
     
     if self.condition == ">" and current_value > self.threshold:
         violated = True
     elif self.condition == "<" and current_value < self.threshold:
         violated = True
     elif self.condition == "==" and current_value == self.threshold:
         violated = True
     elif self.condition == "!=" and current_value != self.threshold:
         violated = True
         
     return violated
    
    def record_violation(self,timestamp: datetime,value: float):
     """记录违规情况"""
     self.violations.append({
         'timestamp': timestamp,
         'value': value
     })
     
     # 检查是否需要告警
     if len(self.violations) >= self.duration and not self.alert_sent:
         self.alert_sent = True
         return True
     
     return False
    
    def reset(self):
     """重置告警状态"""
     self.violations = []
     self.alert_sent = False

class AlertManager:
    def __init__(self):
     self.rules: List[AlertRule] = []
     self.alert_history: List[Dict] = []
     
    def add_rule(self,rule: AlertRule):
     """添加告警规则"""
     self.rules.append(rule)
     
    def check_metrics(self,metrics: Dict[str,float]):
     """检查指标是否触发告警"""
     current_time = datetime.now()
     alerts_triggered = []
     
     for rule in self.rules:
         metric_value = metrics.get(rule.name,0)
         
         if rule.check(metric_value):
             if rule.record_violation(current_time,metric_value):
                 alert = {
                     'rule': rule.name,
                     'condition': rule.condition,
                     'threshold': rule.threshold,
                     'current_value': metric_value,
                     'timestamp': current_time.isoformat(),
                     'violation_count': len(rule.violations)
                 }
                 
                 alerts_triggered.append(alert)
                 self.alert_history.append(alert)
                 
                 # 发送告警
                 self.send_alert(alert)
                 
                 # 重置规则
                 rule.reset()
         else:
             # 清理过期的违规记录
             rule.violations = [
                 v for v in rule.violations 
                 if (current_time - v["timestamp"]).seconds() < 300
             ]
             
     return alerts_triggered
    
    def send_alert(self,alert: Dict):
     """发送告警通知"""
     print(f"[ALERT] {alert["rule"]} 违反阈值!")
     print(f"  当前值: {alert["current_value"]}")
     print(f"  阈值: {alert["threshold"]}")
     print(f"  违规次数: {alert["violation_count"]}")
     print(f"  时间: {alert["timestamp"]}")
     
     # 这里可以添加实际的告警发送逻辑
     # - 发送邮件
     # - 发送短信
     # - 发送到Slack/钉钉
     # - 调用Webhook
     # - 创建工单
     
    def get_alert_summary(self,hours: int = 24) -> Dict:
     """获取告警摘要"""
     cutoff_time = datetime.now() - timedelta(hours=hours)
     
     recent_alerts = [
         alert for alert in self.alert_history
         if datetime.fromisoformat(alert["timestamp"]) > cutoff_time
     ]
     
     # 按规则分组统计
     alert_counts = {}
     for alert in recent_alerts:
         rule = alert["rule"]
         alert_counts[rule] = alert_counts.get(rule,0) + 1
     
     return {
         'total_alerts': len(recent_alerts),
         'alert_counts': alert_counts,
         'time_range': f"过去{hours}小时"
     }

class MetricCollector:
    def __init__(self):
     self.metrics_cache = {}
     
    def collect_from_prometheus(self,prometheus_url: str) -> Dict[str,float]:
     """从Prometheus收集指标"""
     try:
         # 健康检查端点
         response = requests.get(f"{prometheus_url}/-/healthy",timeout=5)
         
         # 查询基础指标
         queries = {
             'cpu_usage': '100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[1m])) * 100)',
             'memory_usage': '(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100',
             'disk_usage': '(1 - node_filesystem_avail_bytes{mountpoint="/"} / node_filesystem_size_bytes{mountpoint="/"}) * 100',
             'load_average': 'node_load1'
         }
         
         metrics = {}
         for name,query in queries.items():
             try:
                 result = requests.get(
                     f"{prometheus_url}/api/v1/query",
                     params={'query': query},
                     timeout=10
                 )
                 
                 if result.status_code == 200:
                     data = result.json()
                     if data["data"]["result"]:
                         value = float(data["data"]["result"][0]["value"][1])
                         metrics[name] = value
             except Exception as e:
                 print(f"查询 {name} 失败: {e}")
                 
         return metrics
         
     except Exception as e:
         print(f"连接Prometheus失败: {e}")
         return {}
    
    def collect_from_application(self,app_url: str) -> Dict[str,float]:
     """从应用收集指标"""
     try:
         # 应用健康检查
         response = requests.get(f"{app_url}/health",timeout=5)
         
         # 应用指标端点
         metrics_response = requests.get(f"{app_url}/metrics",timeout=5)
         
         if metrics_response.status_code == 200:
             # 解析Prometheus格式指标
             metrics = {}
             for line in metrics_response.text.split('\n'):
                 if line and not line.startswith('#'):
                     parts = line.split()
                     if len(parts) >= 2:
                         try:
                             name = parts[0]
                             value = float(parts[1])
                             metrics[name] = value
                         except ValueError:
                             pass
                             
             return metrics
         return {}
         
     except Exception as e:
         print(f"连接应用失败: {e}")
         return {}

def main():
    print("智能告警系统启动")
    print("=" * 50)
    
    # 初始化组件
    alert_manager = AlertManager()
    metric_collector = MetricCollector()
    
    # 配置告警规则
    alert_manager.add_rule(AlertRule("cpu_usage",">",80.0,3))      # CPU使用率>80%持续3分钟
    alert_manager.add_rule(AlertRule("memory_usage",">",90.0,5)) # 内存使用率>90%持续5分钟
    alert_manager.add_rule(AlertRule("disk_usage",">",90.0,10))    # 磁盘使用率>90%持续10分钟
    alert_manager.add_rule(AlertRule("app_response_time",">",1.0,2)) # 应用响应时间>1秒持续2分钟
    
    # 配置数据源
    prometheus_url = "http://localhost:9090"
    app_url = "http://localhost:8080"
    
    print(f"Prometheus地址: {prometheus_url}")
    print(f"应用地址: {app_url}")
    print(f"告警规则数量: {len(alert_manager.rules)}")
    print()
    
    # 开始监控循环
    try:
     while True:
         print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始监控检查...")
         
         # 收集指标
         prometheus_metrics = metric_collector.collect_from_prometheus(prometheus_url)
         app_metrics = metric_collector.collect_from_application(app_url)
         
         # 合并指标
         all_metrics = {**prometheus_metrics,**app_metrics}
         
         # 检查告警
         new_alerts = alert_manager.check_metrics(all_metrics)
         
         if new_alerts:
             print(f"触发 {len(new_alerts)} 个新告警")
         else:
             print("没有新告警")
         
         # 显示当前指标状态
         print("当前指标状态:")
         for name,value in all_metrics.items():
             print(f"  {name}: {value:.2f}")
         
         # 等待下一次检查
         time.sleep(60)
         
    except KeyboardInterrupt:
     print("\n监控停止")
     
     # 生成告警摘要
     summary = alert_manager.get_alert_summary(24)
     print(f"\n24小时告警摘要:")
     print(f"总告警数: {summary["total_alerts"]}")
     print(f"告警分布:")
     for rule,count in summary["alert_counts"].items():
         print(f"  {rule}: {count} 次")

if __name__ == "__main__":
    main()

自动化故障恢复

#!/bin/bash
# 自动化故障恢复脚本

# 故障检测函数
detect_failure() {
    local service=$1
    local check_url=$2
    
    echo "检查服务: $service"
    
    if ! curl -f -s --max-time 5 "$check_url" > /dev/null; then
     echo "检测到故障: $service 不可用"
     return 1
    fi
    
    echo "服务正常: $service"
    return 0
}

# 自动重启服务
restart_service() {
    local service=$1
    local max_attempts=3
    local attempt=1
    
    echo "尝试重启服务: $service"
    
    while [ $attempt -le $max_attempts ]; do
     echo "重启尝试 $attempt/$max_attempts"
     
     if systemctl restart "$service"; then
         echo "服务重启成功"
         sleep 10
         
         # 验证服务状态
         if systemctl is-active --quiet "$service"; then
             echo "服务运行正常"
             return 0
         fi
     fi
     
     attempt=$((attempt + 1))
     sleep 5
    done
    
    echo "服务重启失败"
    return 1
}

# 回滚部署
rollback_deployment() {
    local app=$1
    local environment=$2
    
    echo "执行回滚: $app ($environment)"
    
    # 获取上一个版本
    local previous_version=$(kubectl get deployment "$app" -n "$environment" -o jsonpath='{.spec.template.spec.containers[0].image}' | awk -F':' '{print $2}')
    previous_version=$((previous_version - 1))
    
    if [ $previous_version -lt 1 ]; then
     echo "没有可用的历史版本"
     return 1
    fi
    
    echo "回滚到版本: $previous_version"
    
    # 执行回滚
    if kubectl rollout undo deployment/"$app" -n "$environment"; then
     echo "回滚成功"
     return 0
    else
     echo "回滚失败"
     return 1
    fi
}

# 扩容服务
scale_service() {
    local service=$1
    local replicas=$2
    
    echo "扩容服务: $service 到 $replicas 个副本"
    
    if kubectl scale deployment "$service" --replicas="$replicas"; then
     echo "扩容成功"
     return 0
    else
     echo "扩容失败"
     return 1
    fi
}

# 清理资源
cleanup_resources() {
    local service=$1
    
    echo "清理资源: $service"
    
    # 清理死锁进程
    local deadlocks=$(ps aux | grep "$service" | grep defunct | awk '{print $2}')
    if [ -n "$deadlocks" ]; then
     echo "发现死锁进程: $deadlocks"
     kill -9 $deadlocks
    fi
    
    # 清理僵尸进程
    local zombies=$(ps aux | grep "$service" | grep "Z" | awk '{print $2}')
    if [ -n "$zombies" ]; then
     echo "发现僵尸进程: $zombies"
     # 僵尸进程需要等待父进程处理
    fi
}

# 主恢复流程
auto_recovery() {
    local service=$1
    local check_url=$2
    local recovery_strategy=$3
    
    echo "开始自动化恢复流程"
    echo "服务: $service"
    echo "检查URL: $check_url"
    echo "恢复策略: $recovery_strategy"
    
    # 故障检测
    if ! detect_failure "$service" "$check_url"; then
     return 0
    fi
    
    echo "执行恢复策略: $recovery_strategy"
    
    case "$recovery_strategy" in
     restart)
         restart_service "$service"
         ;;
         
     rollback)
         rollback_deployment "$service" "production"
         ;;
         
     scale)
         scale_service "$service" "4"
         ;;
         
     cleanup)
         cleanup_resources "$service"
         restart_service "$service"
         ;;
         
     *)
         echo "未知的恢复策略: $recovery_strategy"
         return 1
         ;;
    esac
    
    # 恢复验证
    sleep 15
    if detect_failure "$service" "$check_url"; then
     echo "恢复成功"
     return 0
    else
     echo "恢复失败,需要人工介入"
     return 1
    fi
}

# 集群级别的故障恢复
cluster_recovery() {
    echo "集群级故障恢复"
    
    # 检查节点状态
    echo "检查节点状态..."
    local not_ready_nodes=$(kubectl get nodes | grep "NotReady" | wc -l)
    
    if [ $not_ready_nodes -gt 0 ]; then
     echo "发现 $not_ready_nodes 个不可用节点"
     
     # 标记节点为不可调度
     kubectl get nodes | grep "NotReady" | awk '{print $1}' | while read node; do
         kubectl cordon "$node"
         kubectl drain "$node" --ignore-daemonsets --delete-emptydir-data
     done
    fi
    
    # 检查Pod状态
    echo "检查Pod状态..."
    local crashed_pods=$(kubectl get pods --all-namespaces | grep "CrashLoopBackOff" | wc -l)
    
    if [ $crashed_pods -gt 0 ]; then
     echo "发现 $crashed_pods 个崩溃Pod"
     
     # 重启崩溃的Pod
     kubectl get pods --all-namespaces | grep "CrashLoopBackOff" | awk '{print $1}' | while read pod; do
         kubectl delete pod "$pod" --all-namespaces
     done
    fi
    
    echo "集群恢复完成"
}

# 主函数
main() {
    case "${1:-help}" in
     detect)
         auto_recovery "$2" "$3" "restart"
         ;;
     rollback)
         auto_recovery "$2" "" "rollback"
         ;;
     scale)
         auto_recovery "$2" "" "scale"
         ;;
     cluster)
         cluster_recovery
         ;;
     *)
         echo "自动化故障恢复工具"
         echo "用法: $0 {detect|rollback|scale|cluster} [service] [check_url]"
         echo ""
         echo "示例:"
         echo "  $0 detect webapp http://localhost:8080/health"
         echo "  $0 rollback webapp"
         echo "  $0 scale webapp"
         echo "  $0 cluster"
         ;;
    esac
}

main "$@"

通过建立完整的可观测性平台和自动化运维体系,可以显著提升系统的可维护性和可靠性,实现从被动响应到主动预测的运维模式转变,为业务的稳定运行提供坚实保障。