数据一致性与恢复模型:Checkpoint 与 Crash Recovery 的实现
深入分析 Seastar Log Engine 的一致性保证机制,包括 Checkpoint 的设计原理、崩溃恢复流程以及数据完整性校验。
一致性保证的挑战
问题场景
在日志系统中,进程可能随时崩溃:
时间线:
T1: 写入消息 1
T2: 写入消息 2
T3: 进程崩溃
T4: 重启恢复
关键问题:
- 消息 1 和 2 是否真正写入磁盘?
- 恢复时从哪里开始读取?
- 如何避免数据丢失或重复?
一致性级别
| 级别 | 定义 | 实现成本 |
|---|---|---|
| Loose | 不保证 | 低 |
| Soft | 重启后可恢复大部分 | 中 |
| Strong | 重启后不丢失任何数据 | 高 |
Seastar Log Engine 提供 Soft 到 Strong 之间的可配置级别。
Checkpoint 机制
Checkpoint 的作用
Checkpoint 是一个"检查点",记录系统在某个时刻的状态:
Active Log:
┌─────────────────────────────────────────────┐
│ Msg 1 │ Msg 2 │ Msg 3 │ Msg 4 │ Msg 5 │
│ │ │ │ │ │
└───────┴───────┴───────┴───────┴─────────────┘
↑ ↑
Checkpoint 写入位置
(Msg 2 之前已持久化)
Checkpoint 结构
struct Checkpoint {
uint64_t sequence_number; // 最后一条已确认的序列号
uint64_t file_offset; // 对应的文件偏移量
std::string shard_id; // shard 标识
std::chrono::system_clock::time_point timestamp;
void serialize(std::ostream& os) const;
static Checkpoint deserialize(std::istream& is);
};
Checkpoint 写入流程
seastar::future<> AsyncWriter::create_checkpoint() {
// 1. 确保当前批次落盘
co_await flush_batch();
// 2. 获取当前状态
Checkpoint checkpoint{
.sequence_number = _last_confirmed_sequence,
.file_offset = _append_writer.get_file_offset(),
.shard_id = fmt::format("shard-{}", seastar::this_shard_id()),
.timestamp = std::chrono::system_clock::now()
};
// 3. 写入 checkpoint 文件(原子操作)
std::string checkpoint_path = fmt::format(
"{}/shard-{}.checkpoint",
_log_dir,
seastar::this_shard_id()
);
std::vector<char> checkpoint_data;
checkpoint.serialize_to(checkpoint_data);
// 4. 先写入临时文件
std::string temp_path = checkpoint_path + ".tmp";
co_await write_file_atomically(temp_path, checkpoint_data);
// 5. 重命名为正式文件(原子操作)
co_await seastar::rename_file(temp_path, checkpoint_path);
}
定时 Checkpoint
seastar::future<> AsyncWriter::start(const EngineConfig& config) {
// ...
if (config.checkpoint_enabled) {
_checkpoint_timer.arm_periodic(
std::chrono::seconds(config.checkpoint_interval_seconds)
);
_checkpoint_timer.set_callback([this] {
create_checkpoint();
});
}
}
配置建议:
// 平衡性能和安全性
config.checkpoint_interval_seconds = 60; // 每分钟一次
// 高安全性场景
config.checkpoint_interval_seconds = 10; // 每10秒一次
// 高性能场景
config.checkpoint_interval_seconds = 300; // 每5分钟一次
崩溃恢复流程
恢复策略
┌─────────────────────────────────────────────┐
│ 检查 checkpoint 文件是否存在? │
└──────────┬──────────────────────────────────┘
│
┌─────┴─────┐
│ │
存在 不存在
│ │
↓ ↓
从 checkpoint 从文件头
位置恢复 开始扫描
从 Checkpoint 恢复
seastar::future<> AsyncWriter::recovery_from_checkpoint() {
std::string checkpoint_path = fmt::format(
"{}/shard-{}.checkpoint",
_log_dir,
seastar::this_shard_id()
);
// 1. 读取 checkpoint
Checkpoint checkpoint;
try {
auto data = co_await read_file(checkpoint_path);
checkpoint = Checkpoint::deserialize(data);
} catch (const std::exception& e) {
// checkpoint 文件损坏,从头开始
co_await recovery_from_scratch();
co_return;
}
// 2. 验证 checkpoint 有效性
if (!validate_checkpoint(checkpoint)) {
co_await recovery_from_scratch();
co_return;
}
// 3. 从 checkpoint 位置开始读取
std::string log_path = fmt::format(
"{}/shard-{}.log",
_log_dir,
seastar::this_shard_id()
);
auto records = co_await read_records_from(
log_path,
checkpoint.file_offset
);
// 4. 验证记录完整性
for (const auto& record : records) {
if (!verify_record_crc(record)) {
// 检测到损坏记录,截断日志
co_await truncate_log(log_path, record.offset);
break;
}
}
// 5. 恢复内部状态
_last_confirmed_sequence = checkpoint.sequence_number;
_file_offset = checkpoint.file_offset;
}
从文件头恢复
seastar::future<> AsyncWriter::recovery_from_scratch() {
std::string log_path = fmt::format(
"{}/shard-{}.log",
_log_dir,
seastar::this_shard_id()
);
// 1. 读取所有记录
auto records = co_await read_all_records(log_path);
// 2. 扫描并验证
size_t last_valid_offset = 0;
for (const auto& record : records) {
if (verify_record_crc(record)) {
last_valid_offset = record.offset;
} else {
// 检测到损坏记录,截断日志
co_await truncate_log(log_path, last_valid_offset);
break;
}
}
// 3. 恢复内部状态
_last_confirmed_sequence = records.back().sequence_number;
_file_offset = last_valid_offset;
}
损坏尾部处理
seastar::future<> AsyncWriter::truncate_log(
const std::string& log_path,
size_t valid_offset
) {
// 1. 打开文件
auto file = co_await seastar::open_file_dma(
log_path,
seastar::open_flags::rw
);
// 2. 截断到有效位置
co_await file.truncate(valid_offset);
// 3. 关闭文件
co_await file.close();
// 4. 记录日志
log_info("Truncated log at offset {}", valid_offset);
}
数据完整性校验
CRC 校验
每条记录都包含 CRC32 校验和:
class RecordCodec {
public:
static void encode(const LogMessage& message, std::vector<char>& buffer) {
// ... 编码数据 ...
// 计算 CRC
uint32_t crc = compute_crc32(buffer.data(), buffer.size());
// 追加 CRC
auto crc_bytes = reinterpret_cast<const char*>(&crc);
buffer.insert(buffer.end(), crc_bytes, crc_bytes + sizeof(uint32_t));
}
static bool verify_crc(const std::vector<char>& buffer) {
if (buffer.size() < sizeof(uint32_t)) {
return false;
}
// 分离数据和 CRC
size_t data_size = buffer.size() - sizeof(uint32_t);
uint32_t expected_crc = *reinterpret_cast<const uint32_t*>(
buffer.data() + data_size
);
// 重新计算 CRC
uint32_t computed_crc = compute_crc32(buffer.data(), data_size);
return expected_crc == computed_crc;
}
private:
static uint32_t compute_crc32(const char* data, size_t size) {
boost::crc_32_type crc;
crc.process_bytes(data, size);
return crc.checksum();
}
};
校验流程
读取记录
↓
提取数据部分
↓
计算 CRC32
↓
对比记录中的 CRC32
↓
一致 → 记录有效
不一致 → 记录损坏
校验优化策略
策略一:全量校验
// 恢复时全量校验
for (const auto& record : records) {
if (!verify_record_crc(record)) {
// 处理损坏记录
break;
}
}
优点:安全性最高
缺点:恢复时间长
策略二:抽样校验
// 每隔 100 条校验一次
for (size_t i = 0; i < records.size(); ++i) {
if (i % 100 == 0) {
if (!verify_record_crc(records[i])) {
break;
}
}
}
优点:恢复速度快
缺点:可能漏掉损坏记录
策略三:尾部校验(推荐)
// 只校验最后几条记录
size_t check_count = std::min(records.size(), size_t(100));
for (size_t i = records.size() - check_count; i < records.size(); ++i) {
if (!verify_record_crc(records[i])) {
break;
}
}
优点:恢复快 + 安全性高
配置:
config.recovery_verify_mode = RecoveryVerifyMode::tail_only;
config.recovery_tail_check_count = 100;
Crash Consistency 语义
语义定义
不同配置下,崩溃后的保证不同:
| 配置 | 崩溃后丢失 | 崩溃后重复 |
|---|---|---|
write_ack + 无 checkpoint | ≤ 1 批次 | 无 |
write_ack + 有 checkpoint | ≤ checkpoint 间隔 | 无 |
sync_ack + 无 checkpoint | 0 | 无 |
sync_ack + 有 checkpoint | 0 | 无 |
示例
// 配置:write_ack + checkpoint_interval = 60s
时间线:
T0: 进程启动
T10: Checkpoint A (seq=100)
T30: 写入消息 101-200
T40: 进程崩溃
T45: 进程恢复
结果:
- 消息 1-100 已持久化(Checkpoint A)
- 消息 101-200 可能丢失(最多 100 条)
- 没有重复消息
故障注入测试
测试场景
# 测试脚本:test_fault_injection.sh
# 1. 正常写入
./build/log_engine_demo --messages 1000
# 2. 在写入过程中 kill 进程
kill -9 $(pgrep log_engine_demo)
# 3. 恢复并验证
./build/log_engine_verify --path ./logs/shard-0.log
测试覆盖
| 场景 | 预期结果 |
|---|---|
| 正常写入 | 所有消息写入成功 |
| 写入中崩溃 | 从 checkpoint 恢复,最多丢失一个批次 |
| Checkpoint 损坏 | 从文件头恢复 |
| 日志尾部损坏 | 自动截断到有效位置 |
| Archive 损坏 | 跳过损坏文件 |
性能影响
Checkpoint 开销
| 配置 | 额外 I/O | 性能影响 |
|---|---|---|
| Checkpoint 关闭 | 0 | 0% |
| Checkpoint 60s | ~1KB/min | < 1% |
| Checkpoint 10s | ~1KB/10s | ~1% |
| Checkpoint 1s | ~1KB/s | ~5% |
结论:合理的 checkpoint 间隔(60s)对性能影响极小。
CRC 校验开销
| 校验模式 | CPU 开销 | 恢复时间 |
|---|---|---|
| 无校验 | 0 | 极快 |
| 尾部校验(100条) | 低 | 快 |
| 抽样校验(1%) | 中等 | 中等 |
| 全量校验 | 高 | 慢 |
推荐:尾部校验(100 条)
配置建议
高性能场景
config.checkpoint_enabled = false; // 关闭 checkpoint
config.ack_mode = AckMode::write_ack; // 快速确认
config.recovery_verify_mode = RecoveryVerifyMode::none;
平衡场景(推荐)
config.checkpoint_enabled = true; // 开启 checkpoint
config.checkpoint_interval_seconds = 60; // 每分钟一次
config.ack_mode = AckMode::write_ack; // 快速确认
config.recovery_verify_mode = RecoveryVerifyMode::tail_only; // 尾部校验
config.recovery_tail_check_count = 100; // 校验最后 100 条
高安全性场景
config.checkpoint_enabled = true; // 开启 checkpoint
config.checkpoint_interval_seconds = 10; // 每10秒一次
config.ack_mode = AckMode::sync_ack; // 强确认
config.recovery_verify_mode = RecoveryVerifyMode::full; // 全量校验
监控指标
恢复相关指标
seastar::metrics::group("log_engine_writer")
.make_counter("recovery_count", _recovery_count)
.make_gauge("recovery_time_ms", _recovery_time_ms)
.make_counter("corrupted_records", _corrupted_records)
.make_counter("checkpoint_creates", _checkpoint_creates)
.make_counter("checkpoint_failures", _checkpoint_failures);
告警规则
# 恢复时间过长
if recovery_time_ms > 5000:
alert("恢复时间过长,可能 checkpoint 文件损坏")
# 损坏记录数异常
if corrupted_records > 10:
alert("检测到多条损坏记录,可能磁盘故障")
# Checkpoint 创建失败
if checkpoint_failures > 3:
alert("Checkpoint 创建失败,可能磁盘空间不足")
总结
Seastar Log Engine 的一致性保证机制:
- Checkpoint:定期保存系统状态,加速恢复
- CRC 校验:检测数据损坏,保证数据完整性
- 灵活配置:根据场景选择合适的保证级别
- 自动恢复:进程重启后自动恢复到一致状态
最佳实践:
- 生产环境推荐:
write_ack+ 60s checkpoint + 尾部校验 - 关键业务场景:
sync_ack+ 10s checkpoint + 全量校验 - 定期验证恢复流程,确保数据安全
系列 1 完结
下一步:系列 2 《核心技术实现》
相关阅读: