Fault Injection and Fault-Tolerant Design: Building a Robust Log System

An in-depth look at the Seastar Log Engine's fault-injection test framework, covering fault-tolerance practices such as crash simulation, corruption injection, and recovery verification.

Why Fault Tolerance Matters

Classifying Failure Scenarios

In production, a log system can run into many kinds of failures:

Fault type           Impact scope
Process crash        Single node
Disk corruption      Single node
Filesystem errors    Single node
Network partition    Cluster
Data corruption      Single node

Fault-Tolerance Design Goals

  1. No data loss: data that has been acknowledged is durably persisted
  2. Automatic recovery: after a restart, the system returns to a consistent state on its own
  3. Fast detection: data corruption and anomalies are discovered promptly
  4. Graceful degradation: service continues, as far as possible, during partial failures

The Fault-Injection Test Framework

Test Architecture

class FaultInjector {
public:
    // Crash injection
    void inject_crash(CrashType type);

    // Data-corruption injection
    void inject_corruption(CorruptionLocation location, size_t offset);
    void corrupt_file(const std::string& path);               // used in scenario 2
    void corrupt_tail(const std::string& path, size_t bytes); // used in scenario 3

    // Resource-exhaustion injection
    void inject_disk_full();
    void inject_memory_pressure();

    // Network-fault injection
    void inject_network_delay();
    void inject_network_partition();

private:
    std::mt19937 _rng;
    std::uniform_int_distribution<size_t> _dist;
};
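
The corruption helpers need no special machinery. Below is a minimal sketch of corrupt_tail (not the engine's actual implementation), assuming that overwriting the last bytes of a file with random garbage is a good enough stand-in for a torn write:

#include <algorithm>
#include <fstream>
#include <random>
#include <string>
#include <vector>

// Overwrite the last `bytes` bytes of the file with random data,
// simulating a torn or partial write at the tail.
void corrupt_tail(const std::string& path, size_t bytes) {
    std::fstream file(path, std::ios::in | std::ios::out | std::ios::binary);
    file.seekg(0, std::ios::end);
    const auto file_size = static_cast<size_t>(file.tellg());
    const size_t n = std::min(bytes, file_size);

    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> byte_dist(0, 255);
    std::vector<char> garbage(n);
    for (auto& b : garbage) {
        b = static_cast<char>(byte_dist(rng));
    }

    file.seekp(static_cast<std::streamoff>(file_size - n), std::ios::beg);
    file.write(garbage.data(), static_cast<std::streamsize>(n));
}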

Test Scenario Design

Scenario 1: Crash During Normal Writes

TEST(Engine, CrashDuringWrite) {  // sketch: assumes a coroutine-aware test harness
    // 1. Start the engine
    EngineConfig config;
    config.checkpoint_enabled = true;
    config.checkpoint_interval_seconds = 10;

    auto engine = LogEngine();
    co_await engine.start(config);

    // 2. Write a large batch of data (fault_injector is a test-fixture FaultInjector)
    for (int i = 0; i < 10000; ++i) {
        co_await engine.append({.payload = "Message " + std::to_string(i)});
        if (i == 5000) {
            // Simulated kill -9 halfway through; nothing past this point is written
            fault_injector.inject_crash(CrashType::Kill9);
            break;
        }
    }

    // 3. Restart the engine (the crash is simulated in-process, so the test survives)
    co_await engine.stop();
    co_await engine.start(config);

    // 4. Verify recovery: at least the data acknowledged before the crash must survive
    auto records = co_await engine.query(Query{.limit = 100000});
    ASSERT_GE(records.size(), 5000);
}
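
For the restart in step 3 to work, the "crash" has to be simulated inside the test process rather than via a real kill -9. One plausible shape for inject_crash, where _engine, drop_unflushed_buffers, close_files_without_sync, and flush_and_close are all hypothetical hooks into the engine under test:

// Hypothetical in-process crash simulation: from the engine's point of view,
// kill -9 means "all non-durable state is gone and no final fsync happened".
// `_engine` is an assumed pointer back to the engine under test.
void FaultInjector::inject_crash(CrashType type) {
    switch (type) {
    case CrashType::Kill9:
        _engine->drop_unflushed_buffers();    // hypothetical hook: lose buffered writes
        _engine->close_files_without_sync();  // hypothetical hook: skip the final fsync
        break;
    case CrashType::Kill15:
        _engine->flush_and_close();           // hypothetical hook: orderly shutdown
        break;
    }
}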

Scenario 2: Corrupted Checkpoint

TEST(Engine, CorruptedCheckpoint) {
    EngineConfig config;
    config.checkpoint_enabled = true;
    config.checkpoint_interval_seconds = 10;

    auto engine = LogEngine();
    co_await engine.start(config);

    // Write data and wait long enough for a checkpoint to fire
    for (int i = 0; i < 100; ++i) {
        co_await engine.append({.payload = "Message " + std::to_string(i)});
    }
    co_await seastar::sleep(std::chrono::seconds(15));  // > checkpoint interval

    // Corrupt the checkpoint file
    auto checkpoint_path = get_checkpoint_path();
    fault_injector.corrupt_file(checkpoint_path);

    // Restart and verify automatic recovery
    co_await engine.stop();
    co_await engine.start(config);

    // Recovery should fall back to replaying from the head of the log file
    auto records = co_await engine.query(Query{});
    ASSERT_EQ(records.size(), 100);
}

Scenario 3: Corrupted Log Tail

TEST(Engine, CorruptedLogTail) {
    EngineConfig config;
    auto engine = LogEngine();
    co_await engine.start(config);

    // Write data
    for (int i = 0; i < 1000; ++i) {
        co_await engine.append({.payload = "Message " + std::to_string(i)});
    }

    // Corrupt roughly the last 100 records
    auto log_path = get_active_log_path();
    fault_injector.corrupt_tail(log_path, 100 * avg_record_size);

    // Restart and verify automatic truncation
    co_await engine.stop();
    co_await engine.start(config);

    // The first 900 records should come back
    auto records = co_await engine.query(Query{});
    ASSERT_EQ(records.size(), 900);
}

Corruption Detection and Recovery

CRC Validation

class RecordValidator {
public:
    struct ValidationResult {
        bool valid;
        std::string error;
        size_t offset;
    };

    ValidationResult validate_record(
        const std::vector<char>& record,
        size_t offset
    ) {
        // 1. Extract the stored CRC field
        auto crc_field = extract_crc_field(record);
        if (!crc_field) {
            return {.valid = false, .error = "Missing CRC", .offset = offset};
        }

        // 2. Recompute the CRC (over everything except the CRC field itself)
        uint32_t computed_crc = compute_crc32(record);

        // 3. Compare
        if (computed_crc != *crc_field) {
            return {
                .valid = false,
                .error = "CRC mismatch",
                .offset = offset
            };
        }

        return {.valid = true, .error = "", .offset = offset};
    }
};
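
It matters what the CRC actually covers: the stored CRC field must be excluded from the computation, otherwise validation can never pass. A minimal sketch of compute_crc32, assuming the record's last four bytes hold the CRC and using zlib's crc32:

#include <cstdint>
#include <vector>
#include <zlib.h>  // link with -lz

// Compute CRC-32 over everything except the trailing 4-byte CRC field
// (assumed layout: [payload...][crc32]).
uint32_t compute_crc32(const std::vector<char>& record) {
    const size_t covered = record.size() >= 4 ? record.size() - 4 : 0;
    return static_cast<uint32_t>(
        crc32(0L, reinterpret_cast<const Bytef*>(record.data()),
              static_cast<uInt>(covered)));
}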

Corruption-Detection Strategies

Strategy 1: Full Scan

auto validate_log_file(const std::string& log_path) {
    auto records = read_all_records(log_path);
    std::vector<RecordValidator::ValidationResult> results;
    RecordValidator validator;

    size_t offset = 0;
    for (const auto& record : records) {
        auto result = validator.validate_record(record, offset);
        results.push_back(result);

        if (!result.valid) {
            // Corruption detected; everything past this point is suspect
            break;
        }

        offset += record.size();
    }

    return results;
}

Strategy 2: Tail Sampling

auto validate_log_tail(const std::string& log_path, size_t sample_count) {
    auto records = read_all_records(log_path);
    RecordValidator validator;
    size_t start = records.size() > sample_count
        ? records.size() - sample_count
        : 0;

    // Compute the byte offset at which the sampled region begins
    size_t offset = 0;
    for (size_t i = 0; i < start; ++i) {
        offset += records[i].size();
    }

    for (size_t i = start; i < records.size(); ++i) {
        auto result = validator.validate_record(records[i], offset);
        if (!result.valid) {
            return result;
        }
        offset += records[i].size();
    }

    return RecordValidator::ValidationResult{.valid = true};
}

Automatic Recovery

seastar::future<> AsyncWriter::recovery_with_validation() {
    auto log_path = _active_segment.path;

    // 1. Try to recover from the checkpoint first
    auto checkpoint = read_checkpoint_file(log_path);
    if (checkpoint) {
        auto result = co_await recover_from_checkpoint(*checkpoint);
        if (result.success) {
            co_return;
        }
    }

    // 2. Fall back to replaying from the head of the file, validating as we go
    auto records = read_all_records(log_path);
    size_t last_valid_offset = 0;

    for (const auto& record : records) {
        auto validation = validate_record_crc(record);

        if (validation.valid) {
            // The truncation point sits just past the end of the last valid record
            last_valid_offset = record.offset + record.size();
        } else {
            // Corruption detected: truncate back to the last valid position
            co_await truncate_log(log_path, last_valid_offset);
            co_return;
        }
    }
}
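
truncate_log itself is a thin wrapper over the file system. A sketch on top of Seastar's file API (error handling omitted; valid_size is the number of bytes to keep):

#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/seastar.hh>

// Cut the log back to the last validated offset and make the new length durable.
seastar::future<> truncate_log(const std::string& path, uint64_t valid_size) {
    auto f = co_await seastar::open_file_dma(path, seastar::open_flags::rw);
    co_await f.truncate(valid_size);  // drop the corrupted tail
    co_await f.flush();               // persist the new file length
    co_await f.close();
}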

Handling Resource Exhaustion

Running Out of Disk Space

seastar::future<> AsyncWriter::handle_disk_full() {
    // 1. Check available disk space
    auto disk_usage = check_disk_space(_log_dir);

    if (disk_usage.available < 1024 * 1024 * 1024) {  // below 1 GB
        // 2. Try cleaning up old archives
        co_await cleanup_old_archives();

        // 3. Re-check
        disk_usage = check_disk_space(_log_dir);

        if (disk_usage.available < 512 * 1024 * 1024) {  // below 512 MB
            // 4. Force-rotate the active log
            co_await force_rotate();

            // 5. If space is still insufficient, give up and raise
            disk_usage = check_disk_space(_log_dir);
            if (disk_usage.available < 100 * 1024 * 1024) {  // below 100 MB
                throw std::runtime_error("Disk space exhausted");
            }
        }
    }
}
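
check_disk_space can be a thin wrapper over std::filesystem::space. A sketch, with DiskUsage reduced to the one field used above; note the call blocks, which is tolerable for an occasional check but should stay off the per-append path:

#include <cstdint>
#include <filesystem>
#include <string>

struct DiskUsage {
    uint64_t available;  // bytes available to this process
};

// Blocking statvfs-style query; cheap, but not for hot paths.
DiskUsage check_disk_space(const std::string& dir) {
    const auto info = std::filesystem::space(dir);
    return DiskUsage{.available = static_cast<uint64_t>(info.available)};
}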

Handling Memory Pressure

seastar::future<> AsyncWriter::handle_memory_pressure() {
    // 1. Check memory usage
    auto memory_usage = get_memory_usage();

    if (memory_usage.heap_usage > _max_memory_usage) {
        // 2. Flush immediately
        co_await flush_batch();

        // 3. Release caches
        _append_writer.release_cache();

        // 4. Back-pressure new writes until usage drops below 80% of the limit
        while (memory_usage.heap_usage > _max_memory_usage * 0.8) {
            co_await seastar::sleep(std::chrono::milliseconds(100));
            memory_usage = get_memory_usage();
        }
    }
}
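
get_memory_usage can lean on Seastar's per-shard allocator statistics. A sketch, assuming heap_usage means "bytes currently allocated on this shard":

#include <seastar/core/memory.hh>

struct MemoryUsage {
    size_t heap_usage;  // bytes currently allocated on this shard
};

// Seastar tracks allocator totals per shard, so this is a cheap local read.
MemoryUsage get_memory_usage() {
    const auto s = seastar::memory::stats();
    return MemoryUsage{.heap_usage = s.total_memory() - s.free_memory()};
}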

Handling Network Partitions

Routing Strategy in a Distributed Setting

seastar::future<> LogEngine::append_with_fallback(LogMessage message) {
    const auto shard = route_to_shard(message.route_key);

    try {
        // Normal routing; capture a copy so the message survives a failed attempt
        co_await _writers.invoke_on(shard, [msg = message](AsyncWriter& writer) mutable {
            return writer.submit(std::move(msg));
        });
        co_return;
    } catch (const std::exception& e) {
        if (!_config.allow_local_fallback) {
            throw;
        }
        // Fall through: C++ forbids co_await inside a catch block
    }

    // Routing failed: degrade to a local write
    co_await _writers.local().submit(std::move(message));
}
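
A partition often manifests as a hang rather than an immediate exception. One option is to bound the routed call with seastar::with_timeout, so a stuck shard surfaces as a timed_out_error that can be converted into a fallback write. A sketch under that assumption (append_with_fallback_timed is a hypothetical variant; one-second budget):

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/with_timeout.hh>
#include <chrono>

seastar::future<> LogEngine::append_with_fallback_timed(LogMessage message) {
    const auto shard = route_to_shard(message.route_key);
    auto deadline = seastar::lowres_clock::now() + std::chrono::seconds(1);

    try {
        // Bound the cross-shard write; a hung shard surfaces as timed_out_error.
        // Note: with_timeout abandons (does not cancel) the underlying operation.
        co_await seastar::with_timeout(
            deadline,
            _writers.invoke_on(shard, [msg = message](AsyncWriter& writer) mutable {
                return writer.submit(std::move(msg));
            }));
        co_return;
    } catch (const seastar::timed_out_error&) {
        // fall through to the local fallback below
    }
    co_await _writers.local().submit(std::move(message));
}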

Health Checks and Automatic Recovery

class HealthMonitor {
public:
    enum class HealthStatus {
        Healthy,
        Degraded,
        Unhealthy
    };

    struct HealthSnapshot {
        HealthStatus status;
        std::string reason;
        std::map<std::string, uint64_t> recent_errors;
    };

    HealthSnapshot collect_health() {
        HealthSnapshot snapshot;

        // Gather per-module error counters
        auto writer_errors = collect_writer_errors();
        auto reader_errors = collect_reader_errors();
        auto manager_errors = collect_manager_errors();

        // Aggregate into a single status
        if (has_critical_errors(writer_errors)) {
            snapshot.status = HealthStatus::Unhealthy;
            snapshot.reason = "Critical writer errors detected";
        } else if (has_errors(writer_errors) || has_errors(reader_errors) ||
                   has_errors(manager_errors)) {
            snapshot.status = HealthStatus::Degraded;
            snapshot.reason = "Non-critical errors detected";
        } else {
            snapshot.status = HealthStatus::Healthy;
            snapshot.reason = "All systems nominal";
        }

        return snapshot;
    }

private:
    bool has_errors(const ErrorCounts& errors);  // true if any counter is nonzero

    bool has_critical_errors(const ErrorCounts& errors) {
        return errors.crc_failures > 10 ||
               errors.io_failures > 5 ||
               errors.recovery_failures > 2;
    }
};
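
Wiring the monitor into the engine can be as simple as a periodic timer that refreshes the snapshot. A sketch using seastar::timer, where _health_timer, _health_monitor, _health_status, and _logger are assumed members of the engine:

#include <seastar/core/timer.hh>
#include <chrono>

// Refresh the health snapshot every 5 seconds on the local shard.
void LogEngine::start_health_checks() {
    _health_timer.set_callback([this] {
        auto snapshot = _health_monitor.collect_health();
        _health_status = snapshot.status;
        if (snapshot.status != HealthMonitor::HealthStatus::Healthy) {
            _logger.warn("health degraded: {}", snapshot.reason);
        }
    });
    _health_timer.arm_periodic(std::chrono::seconds(5));
}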

Test Coverage Matrix

Fault-Injection Scenarios

Scenario               Test type            Expected result
Process crash          kill -9              Recover from checkpoint
Process crash          kill -15             Full data recovery
Corrupted checkpoint   File corruption      Recover from the log head
Corrupted log tail     Truncation           Automatic truncation and recovery
Corrupted log middle   Random corruption    Detect and stop
Disk full              Simulated            Clean up archives or raise an error
Memory pressure        Simulated            Free memory and reject writes
Network partition      Simulated            Local fallback or failure

Recovery Verification

# Run the fault-injection tests
./script/test_fault_injection.sh

# Run the long-duration soak test
./script/bench_soak.sh \
  --duration-seconds 300 \
  --messages-per-run 1000 \
  --restart-interval 3

# Run the combined soak + fault test
./script/test_soak_and_fault.sh \
  --duration-seconds 60 \
  --messages-per-run 1000 \
  --restart-interval 3

Monitoring and Alerting

Health Metrics

// Registration sketch with seastar::metrics; assumes `_metrics` is a
// seastar::metrics::metric_groups member and the counters are engine members.
namespace sm = seastar::metrics;
_metrics.add_group("log_engine_health", {
    sm::make_gauge("status", [this] { return static_cast<int64_t>(_health_status); },
                   sm::description("Current health status")),
    sm::make_counter("crash_recoveries", _crash_recoveries,
                     sm::description("Successful recoveries after a crash")),
    sm::make_counter("corruption_detections", _corruption_detections,
                     sm::description("Corrupted records detected")),
    sm::make_counter("auto_truncates", _auto_truncates,
                     sm::description("Automatic tail truncations")),
    sm::make_counter("fallback_activations", _fallback_activations,
                     sm::description("Local-fallback writes")),
});

Alerting Rules

# Health-status alert (pseudocode)
if health_status != "Healthy":
    alert(f"Health status degraded: {health_status}")

# Corruption-detection alert
if corruption_detections > threshold:
    alert("Data corruption detected, investigation required")

# Crash-recovery alert
if crash_recoveries > 5 in last_hour:
    alert("Frequent crashes detected")

# Auto-truncation alert
if auto_truncates > 10 in last_hour:
    alert("Frequent log truncations, possible disk issues")

Best Practices

1. Regular Fault Drills

# Run a fault drill once a month
./script/fault_injection_drill.sh --scenarios all

2. Data Verification

# Verify log integrity daily
./build/log_engine_verify --path ./logs --recursive

3. Backup Strategy

# Back up checkpoint files on a schedule
rsync -avz ./logs/*.checkpoint /backup/checkpoints/

4. Monitoring Coverage

  • Health-status monitoring
  • Resource-usage monitoring
  • Error-rate monitoring
  • Recovery-success-rate monitoring

Summary

The Seastar Log Engine's fault-tolerance design rests on:

  1. Fault-injection testing: coverage across a broad set of failure scenarios
  2. Data integrity: CRC validation guards against silent corruption
  3. Automatic recovery: the engine returns to a consistent state after any restart
  4. Health monitoring: system health is tracked in real time
  5. Graceful degradation: service continues as far as possible during failures

Key Features

  • Checkpoints speed up recovery
  • CRC validation detects data corruption
  • Automatic truncation handles damaged log tails
  • Health checks surface anomalies early
  • Resource monitoring prevents exhaustion

Next post: "Compatibility Layer Design and Best Practices: Using the glog/spdlog Compatibility Layers"
