故障注入与容错设计:构建健壮的日志系统
深入分析 Seastar Log Engine 的故障注入测试框架,包括崩溃模拟、损坏注入、恢复验证等容错设计实践。
容错设计的重要性
故障场景分类
在生产环境中,日志系统可能遇到各种故障:
| 故障类型 | 发生概率 | 影响范围 | 恢复难度 |
|---|---|---|---|
| 进程崩溃 | 高 | 单机 | 低 |
| 磁盘损坏 | 低 | 单机 | 中 |
| 文件系统错误 | 中 | 单机 | 中 |
| 网络分区 | 中 | 集群 | 高 |
| 数据损坏 | 低 | 单机 | 高 |
容错设计目标
- 数据不丢失:确保已确认的数据持久化
- 自动恢复:系统重启后自动恢复到一致状态
- 快速检测:及时发现数据损坏和异常
- 优雅降级:在部分故障时继续提供服务
故障注入测试框架
测试架构
class FaultInjector {
public:
// 崩溃注入
void inject_crash(CrashType type);
// 数据损坏注入
void inject_corruption(CorruptionLocation location, size_t offset);
// 资源耗尽注入
void inject_disk_full();
void inject_memory_pressure();
// 网络故障注入
void inject_network_delay();
void inject_network_partition();
private:
std::mt19937 _rng;
std::uniform_int_distribution<size_t> _dist;
};
测试场景设计
场景一:正常写入中的崩溃
TEST(Engine, CrashDuringWrite) {
// 1. 启动引擎
EngineConfig config;
config.checkpoint_enabled = true;
config.checkpoint_interval_seconds = 10;
auto engine = LogEngine();
co_await engine.start(config);
// 2. 写入大量数据
for (int i = 0; i < 10000; ++i) {
co_await engine.append({.payload = "Message " + std::to_string(i)});
if (i == 5000) {
// 在中间崩溃
fault_injector.inject_crash(CrashType::Kill9);
}
}
// 3. 重启引擎
co_await engine.stop();
co_await engine.start(config);
// 4. 验证恢复
auto records = co_await engine.query(Query{.limit = 100000});
ASSERT_GE(records.size(), 5000); // 至少恢复到崩溃前的数据
}
场景二:Checkpoint 损坏
TEST(Engine, CorruptedCheckpoint) {
auto engine = LogEngine();
co_await engine.start(config);
// 写入数据并触发 checkpoint
for (int i = 0; i < 100; ++i) {
co_await engine.append({.payload = "Message " + std::to_string(i)});
}
co_await seastar::sleep(std::chrono::seconds(15));
// 损坏 checkpoint 文件
auto checkpoint_path = get_checkpoint_path();
fault_injector.corrupt_file(checkpoint_path);
// 重启并验证自动恢复
co_await engine.stop();
co_await engine.start(config);
// 应该从文件头恢复
auto records = co_await engine.query(Query{});
ASSERT_EQ(records.size(), 100);
}
场景三:日志尾部损坏
TEST(Engine, CorruptedLogTail) {
auto engine = LogEngine();
co_await engine.start(config);
// 写入数据
for (int i = 0; i < 1000; ++i) {
co_await engine.append({.payload = "Message " + std::to_string(i)});
}
// 损坏最后 100 条记录
auto log_path = get_active_log_path();
fault_injector.corrupt_tail(log_path, 100 * avg_record_size);
// 重启并验证自动截断
co_await engine.stop();
co_await engine.start(config);
// 应该恢复前 900 条记录
auto records = co_await engine.query(Query{});
ASSERT_EQ(records.size(), 900);
}
数据损坏检测与恢复
CRC 校验机制
class RecordValidator {
public:
struct ValidationResult {
bool valid;
std::string error;
size_t offset;
};
ValidationResult validate_record(
const std::vector<char>& record,
size_t offset
) {
// 1. 提取 CRC 字段
auto crc_field = extract_crc_field(record);
if (!crc_field) {
return {.valid = false, .error = "Missing CRC", .offset = offset};
}
// 2. 重新计算 CRC
uint32_t computed_crc = compute_crc32(record);
// 3. 对比
if (computed_crc != *crc_field) {
return {
.valid = false,
.error = "CRC mismatch",
.offset = offset
};
}
return {.valid = true, .error = "", .offset = offset};
}
};
损坏检测策略
策略一:完整扫描
auto validate_log_file(const std::string& log_path) {
auto records = read_all_records(log_path);
std::vector<ValidationResult> results;
size_t offset = 0;
for (const auto& record : records) {
auto result = validator.validate_record(record, offset);
results.push_back(result);
if (!result.valid) {
// 检测到损坏
break;
}
offset += record.size();
}
return results;
}
策略二:尾部采样
auto validate_log_tail(const std::string& log_path, size_t sample_count) {
auto records = read_all_records(log_path);
size_t start = records.size() > sample_count
? records.size() - sample_count
: 0;
for (size_t i = start; i < records.size(); ++i) {
auto result = validator.validate_record(records[i], 0);
if (!result.valid) {
return result;
}
}
return ValidationResult{.valid = true};
}
自动恢复机制
seastar::future<> AsyncWriter::recovery_with_validation() {
auto log_path = _active_segment.path;
// 1. 尝试从 checkpoint 恢复
auto checkpoint = read_checkpoint_file(log_path);
if (checkpoint) {
auto result = co_await recover_from_checkpoint(*checkpoint);
if (result.success) {
co_return;
}
}
// 2. 从文件头恢复,带验证
auto records = read_all_records(log_path);
size_t last_valid_offset = 0;
for (const auto& record : records) {
auto validation = validate_record_crc(record);
if (validation.valid) {
last_valid_offset = record.offset;
} else {
// 检测到损坏,截断到有效位置
co_await truncate_log(log_path, last_valid_offset);
co_return;
}
}
}
资源耗尽处理
磁盘空间不足
seastar::future<> AsyncWriter::handle_disk_full() {
// 1. 检测磁盘空间
auto disk_usage = check_disk_space(_log_dir);
if (disk_usage.available < 1024 * 1024 * 1024) { // 小于 1GB
// 2. 尝试清理旧归档
co_await cleanup_old_archives();
// 3. 重新检查
disk_usage = check_disk_space(_log_dir);
if (disk_usage.available < 512 * 1024 * 1024) { // 小于 512MB
// 4. 强制 rotate 当前日志
co_await force_rotate();
// 5. 如果还是不够,抛出异常
disk_usage = check_disk_space(_log_dir);
if (disk_usage.available < 100 * 1024 * 1024) {
throw std::runtime_error("Disk space exhausted");
}
}
}
}
内存压力处理
seastar::future<> AsyncWriter::handle_memory_pressure() {
// 1. 检查内存使用
auto memory_usage = get_memory_usage();
if (memory_usage.heap_usage > _max_memory_usage) {
// 2. 立即 flush
co_await flush_batch();
// 3. 释放缓存
_append_writer.release_cache();
// 4. 拒绝新写入,直到内存释放
while (memory_usage.heap_usage > _max_memory_usage * 0.8) {
co_await seastar::sleep(std::chrono::milliseconds(100));
memory_usage = get_memory_usage();
}
}
}
网络分区处理
分布式场景下的路由策略
seastar::future<> LogEngine::append_with_fallback(LogMessage message) {
const auto shard = route_to_shard(message.route_key);
try {
// 正常路由
co_await _writers.invoke_on(shard, [msg = std::move(message)](AsyncWriter& writer) mutable {
return writer.submit(std::move(msg));
});
} catch (const std::exception& e) {
// 路由失败,尝试本地写入
if (_config.allow_local_fallback) {
co_await _writers.local().submit(std::move(message));
} else {
throw;
}
}
}
健康检查与自动恢复
class HealthMonitor {
public:
enum class HealthStatus {
Healthy,
Degraded,
Unhealthy
};
struct HealthSnapshot {
HealthStatus status;
std::string reason;
std::map<std::string, uint64_t> recent_errors;
};
HealthSnapshot collect_health() {
HealthSnapshot snapshot;
// 收集各个模块的健康状态
auto writer_errors = collect_writer_errors();
auto reader_errors = collect_reader_errors();
auto manager_errors = collect_manager_errors();
// 综合评估
if (has_critical_errors(writer_errors)) {
snapshot.status = HealthStatus::Unhealthy;
snapshot.reason = "Critical writer errors detected";
} else if (has_errors(writer_errors) || has_errors(reader_errors)) {
snapshot.status = HealthStatus::Degraded;
snapshot.reason = "Non-critical errors detected";
} else {
snapshot.status = HealthStatus::Healthy;
snapshot.reason = "All systems nominal";
}
return snapshot;
}
private:
bool has_critical_errors(const ErrorCounts& errors) {
return errors.crc_failures > 10 ||
errors.io_failures > 5 ||
errors.recovery_failures > 2;
}
};
测试覆盖矩阵
故障注入场景
| 场景 | 测试类型 | 预期结果 |
|---|---|---|
| 进程崩溃 | Kill 9 | 从 checkpoint 恢复 |
| 进程崩溃 | Kill 15 | 完整数据恢复 |
| Checkpoint 损坏 | 文件损坏 | 从文件头恢复 |
| 日志尾部损坏 | 截断 | 自动截断恢复 |
| 日志中间损坏 | 随机损坏 | 检测并停止 |
| 磁盘满 | 模拟 | 清理归档或报错 |
| 内存压力 | 模拟 | 释放内存并拒绝写入 |
| 网络分区 | 模拟 | 本地降级或失败 |
恢复验证
# 运行故障注入测试
./script/test_fault_injection.sh
# 运行长稳测试
./script/bench_soak.sh \
--duration-seconds 300 \
--messages-per-run 1000 \
--restart-interval 3
# 运行综合故障测试
./script/test_soak_and_fault.sh \
--duration-seconds 60 \
--messages-per-run 1000 \
--restart-interval 3
监控与告警
健康指标
seastar::metrics::group("log_engine_health")
.make_gauge("status", _health_status)
.make_counter("crash_recoveries", _crash_recoveries)
.make_counter("corruption_detections", _corruption_detections)
.make_counter("auto_truncates", _auto_truncates)
.make_counter("fallback_activations", _fallback_activations);
告警规则
# 健康状态告警
if health_status != "Healthy":
alert(f"Health status degraded: {health_status}")
# 损坏检测告警
if corruption_detections > threshold:
alert("Data corruption detected, investigation required")
# 崩溃恢复告警
if crash_recoveries > 5 in last_hour:
alert("Frequent crashes detected")
# 自动截断告警
if auto_truncates > 10 in last_hour:
alert("Frequent log truncations, possible disk issues")
最佳实践
1. 定期故障演练
# 每月执行一次故障演练
./script/fault_injection_drill.sh --scenarios all
2. 数据验证
# 每天验证日志完整性
./build/log_engine_verify --path ./logs --recursive
3. 备份策略
# 定期备份 checkpoint 文件
rsync -avz ./logs/*.checkpoint /backup/checkpoints/
4. 监控覆盖
- 健康状态监控
- 资源使用监控
- 错误率监控
- 恢复成功率监控
总结
Seastar Log Engine 的容错设计:
- 故障注入测试:覆盖各种故障场景
- 数据完整性:CRC 校验保证数据正确性
- 自动恢复:进程重启后自动恢复到一致状态
- 健康监控:实时监控系统健康状态
- 优雅降级:故障时尽量提供服务
关键特性:
- Checkpoint 机制加速恢复
- CRC 校验检测数据损坏
- 自动截断处理损坏尾部
- 健康检查及时发现异常
- 资源监控防止耗尽
下一篇:《兼容层设计与最佳实践:glog/spdlog 兼容使用》
相关阅读: