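Checkpoint and Recovery: Implementing Crash-Consistency Guarantees
A deep dive into how checkpoints are persisted and how crash recovery works, covering the checkpoint file format, the recovery strategy, and data validation.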
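The Crash-Consistency Challenge
Problem Scenario
In a logging system, the process can crash at any moment:
```text
Timeline:
T0:  process starts
T10: messages 1-1000 written
T20: checkpoint write in progress
T21: process crashes  ← the critical moment
T25: process restarts and recovers
```
Key questions:
- Is the checkpoint write atomic?
- On recovery, how do we pick the correct checkpoint?
- What if the checkpoint is corrupted?
- How do we avoid consuming the same data twice?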
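Levels of Consistency Guarantee
| Level | Guarantee | Implementation cost | Performance impact |
|---|---|---|---|
| None | No guarantee | Lowest | None |
| Soft | Most data recoverable | Medium | < 1% |
| Strong | No data loss | Highest | ~5% |
Seastar Log Engine offers configurable levels ranging from Soft to Strong.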
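Checkpoint Design
What a Checkpoint Does
A checkpoint is a "snapshot" of system state: it records the key facts about the log at one moment, namely the last acknowledged sequence number and the matching log offset. After a crash, recovery only has to rescan the records written after that point:
```text
Active Log:
┌───────┬───────┬───────┬───────┬───────┬───────┬───────┐
│ Msg 1 │ Msg 2 │ Msg 3 │ Msg 4 │ Msg 5 │ Msg 6 │ Msg 7 │
└───────┴───────┴───────┴───────┴───────┴───────┴───────┘
                                ↑
                Checkpoint (seq=1000, offset=10KB)
```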
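Checkpoint Data Structure
```cpp
#include <chrono>
#include <cstdint>

struct CheckpointState {
    std::uint64_t sequence{};        // sequence number of the last acknowledged record
    std::uint64_t logical_size{};    // logical log size (byte offset) at that sequence
    std::uint64_t rotation_index{};  // rotation index of the active segment
    // In memory only: the checkpoint file format below does not store a timestamp
    std::chrono::system_clock::time_point timestamp{};

    bool is_valid() const {
        return sequence > 0 && logical_size > 0;
    }
};
```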
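Checkpoint File Format
A simple plain-text key=value format is used, which keeps the file easy to inspect and debug by hand:
```text
sequence=1000
logical_size=10240
rotation_index=5
```
File naming:
```text
logs/shard-0.checkpoint   # checkpoint for shard 0
logs/shard-1.checkpoint   # checkpoint for shard 1
```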
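Because the format is plain key=value text, serialization and parsing can be written as pure string functions that are easy to unit-test. The sketch below is illustrative only: CheckpointFields, format_checkpoint, and parse_checkpoint are hypothetical names, not part of Seastar Log Engine, and the parser follows the same rules as read_checkpoint_file later in this article (malformed lines are skipped, all three keys are required).

```cpp
#include <cassert>
#include <charconv>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>

// Hypothetical in-memory form of the three persisted fields.
struct CheckpointFields {
    std::uint64_t sequence = 0;
    std::uint64_t logical_size = 0;
    std::uint64_t rotation_index = 0;
};

// Serialize to the key=value text format (one field per line).
std::string format_checkpoint(const CheckpointFields& c) {
    return "sequence=" + std::to_string(c.sequence) + "\n" +
           "logical_size=" + std::to_string(c.logical_size) + "\n" +
           "rotation_index=" + std::to_string(c.rotation_index) + "\n";
}

// Parse the text back. Lines without '=' or with a non-decimal value are
// skipped; all three keys must be present, otherwise the result is nullopt.
std::optional<CheckpointFields> parse_checkpoint(std::string_view text) {
    CheckpointFields c;
    bool saw_sequence = false, saw_size = false, saw_rotation = false;
    while (!text.empty()) {
        const auto nl = text.find('\n');
        const std::string_view line = text.substr(0, nl);
        text = (nl == std::string_view::npos) ? std::string_view{} : text.substr(nl + 1);

        const auto eq = line.find('=');
        if (eq == std::string_view::npos) {
            continue;
        }
        const std::string_view key = line.substr(0, eq);
        const std::string_view value = line.substr(eq + 1);
        std::uint64_t parsed = 0;
        const auto [ptr, ec] = std::from_chars(value.data(), value.data() + value.size(), parsed);
        if (ec != std::errc{} || ptr != value.data() + value.size()) {
            continue;
        }
        if (key == "sequence") {
            c.sequence = parsed;
            saw_sequence = true;
        } else if (key == "logical_size") {
            c.logical_size = parsed;
            saw_size = true;
        } else if (key == "rotation_index") {
            c.rotation_index = parsed;
            saw_rotation = true;
        }
    }
    if (!saw_sequence || !saw_size || !saw_rotation) {
        return std::nullopt;
    }
    return c;
}
```

A file cut short (for example, one missing its rotation_index line) parses as nullopt, which is what lets recovery fall back to a full scan instead of trusting a half-written checkpoint.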
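Writing Checkpoints
Atomic Write Strategy
To make each update atomic, the checkpoint is first written to a temporary file, which is then renamed over the old one. The rename by itself only guarantees that readers see either the old or the new file; for the new contents to also survive a power loss, the temporary file must be fsynced before the rename and the parent directory fsynced after it:
```cpp
#include <fcntl.h>
#include <unistd.h>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <seastar/core/coroutine.hh>

seastar::future<> AsyncWriter::persist_checkpoint() {
    const auto checkpoint_path = layout::checkpoint_path(_active_segment);
    const auto temp_path = checkpoint_path + ".tmp";
    // 1. Snapshot the state to persist
    const CheckpointState checkpoint{
        .sequence = _sequence,
        .logical_size = _append_writer.get_logical_size(),
        .rotation_index = _rotation_index,
        .timestamp = std::chrono::system_clock::now()
    };
    const std::string body =
        "sequence=" + std::to_string(checkpoint.sequence) + "\n" +
        "logical_size=" + std::to_string(checkpoint.logical_size) + "\n" +
        "rotation_index=" + std::to_string(checkpoint.rotation_index) + "\n";
    // 2. Write the temporary file and fsync it, so its contents are on disk before
    //    the rename makes them visible. These blocking POSIX calls stall the
    //    reactor briefly, which is tolerable only because the file is tiny.
    const int fd = ::open(temp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) {
        throw std::runtime_error("failed to open checkpoint file: " + temp_path);
    }
    const bool written =
        ::write(fd, body.data(), body.size()) == static_cast<ssize_t>(body.size()) &&
        ::fsync(fd) == 0;
    ::close(fd);
    if (!written) {
        std::filesystem::remove(temp_path);
        throw std::runtime_error("failed to write checkpoint file: " + temp_path);
    }
    // 3. Atomically replace the old checkpoint (rename is atomic on POSIX file systems)
    std::error_code ec;
    std::filesystem::rename(temp_path, checkpoint_path, ec);
    if (ec) {
        std::filesystem::remove(temp_path);
        throw std::runtime_error("failed to rename checkpoint file: " + ec.message());
    }
    // 4. fsync the parent directory so the rename itself is durable
    const auto dir = std::filesystem::path(checkpoint_path).parent_path();
    const int dir_fd = ::open(dir.empty() ? "." : dir.c_str(), O_RDONLY | O_DIRECTORY);
    if (dir_fd >= 0) {
        ::fsync(dir_fd);
        ::close(dir_fd);
    }
    // 5. Update metrics
    g_checkpoint_write_successes.fetch_add(1, std::memory_order_relaxed);
    // co_return makes this a coroutine: a throw above becomes a failed future
    // instead of escaping synchronously into the caller (e.g. the timer callback)
    co_return;
}
```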
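Outside of Seastar, the same temp-file, fsync, rename, directory-fsync sequence can be packaged as a small blocking helper. This is a sketch assuming a POSIX system such as Linux; atomic_replace_file is a hypothetical name, and the code uses only standard POSIX calls and std::filesystem.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Hypothetical helper: replace `path` with `contents` so that, after a crash at
// any point, the file holds either its old or its new contents (never a mix).
void atomic_replace_file(const std::string& path, const std::string& contents) {
    const std::string tmp = path + ".tmp";
    const int fd = ::open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) {
        throw std::runtime_error("open failed: " + tmp);
    }
    // 1. Write everything; write() may be partial or interrupted by a signal.
    std::size_t done = 0;
    while (done < contents.size()) {
        const ssize_t n = ::write(fd, contents.data() + done, contents.size() - done);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            ::close(fd);
            ::unlink(tmp.c_str());
            throw std::runtime_error("write failed: " + tmp);
        }
        done += static_cast<std::size_t>(n);
    }
    // 2. Make the data durable before it becomes visible under the final name.
    if (::fsync(fd) != 0) {
        ::close(fd);
        ::unlink(tmp.c_str());
        throw std::runtime_error("fsync failed: " + tmp);
    }
    ::close(fd);
    // 3. rename() atomically swaps the directory entry to the new file.
    if (::rename(tmp.c_str(), path.c_str()) != 0) {
        ::unlink(tmp.c_str());
        throw std::runtime_error("rename failed: " + path);
    }
    // 4. fsync the parent directory so the rename survives a crash (best effort here).
    std::string dir = std::filesystem::path(path).parent_path().string();
    if (dir.empty()) {
        dir = ".";
    }
    const int dir_fd = ::open(dir.c_str(), O_RDONLY | O_DIRECTORY);
    if (dir_fd >= 0) {
        ::fsync(dir_fd);
        ::close(dir_fd);
    }
}
```

If a crash happens before the rename, the old file is untouched and at worst a stale path.tmp is left behind; the next call opens it with O_TRUNC and reuses it.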
Periodic Checkpoints
```cpp
seastar::future<> AsyncWriter::start(EngineConfig config) {
    // ...
    if (config.checkpoint_enabled && config.checkpoint_interval_seconds > 0) {
        // Install the callback before arming the timer
        _checkpoint_timer.set_callback([this] {
            // Fire-and-forget: failures are logged and counted, never propagated
            (void)persist_checkpoint().handle_exception([](std::exception_ptr ep) {
                applog.warn("checkpoint write failed: {}", ep);
                g_checkpoint_write_failures.fetch_add(1, std::memory_order_relaxed);
            });
        });
        _checkpoint_timer.arm_periodic(
            std::chrono::seconds(config.checkpoint_interval_seconds)
        );
    }
    co_return;
}
```
When Checkpoints Are Written
- Periodically: every checkpoint_interval_seconds seconds
- On stop: one last checkpoint before shutdown
- On rotate: when the log is rotated
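Suggested settings (pick one):
```cpp
// Balance performance and safety
config.checkpoint_interval_seconds = 60;   // once a minute
// High-safety scenario
config.checkpoint_interval_seconds = 10;   // every 10 seconds
// High-performance scenario
config.checkpoint_interval_seconds = 300;  // every 5 minutes
```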
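The interval is a trade-off against recovery work. Recovery only scans what was appended after the last checkpoint, so, assuming every periodic checkpoint succeeds and is accepted on restart, the rescan after a crash is bounded by roughly one interval's worth of writes. A minimal sketch of that bound; worst_case_rescan_bytes and its write-rate parameter are hypothetical, not engine settings.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical estimate of how many bytes recovery may need to rescan.
// Recovery scans from the last checkpoint's offset to the end of the log, so
// with periodic checkpoints the rescan is at most one interval of writes
// (assuming every checkpoint succeeds) and never more than the whole log.
std::uint64_t worst_case_rescan_bytes(std::uint64_t write_bytes_per_sec,
                                      std::uint64_t checkpoint_interval_seconds,
                                      std::uint64_t log_size_bytes) {
    if (checkpoint_interval_seconds == 0) {
        return log_size_bytes;  // no periodic checkpoints: full scan from offset 0
    }
    return std::min(log_size_bytes, write_bytes_per_sec * checkpoint_interval_seconds);
}
```

For example, at a sustained 10 MB/s a 60 s interval bounds the rescan at about 600 MB, while a 10 s interval bounds it at about 100 MB.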
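Reading and Validating Checkpoints
Reading a Checkpoint
```cpp
#include <charconv>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <optional>
#include <string>

std::optional<CheckpointState> read_checkpoint_file(
    const layout::SegmentDescriptor& active_segment
) {
    const auto path = layout::checkpoint_path(active_segment);
    // 1. Check that the file exists
    if (!std::filesystem::exists(path)) {
        return std::nullopt;
    }
    // 2. Open the file
    std::ifstream in(path, std::ios::binary);
    if (!in.good()) {
        return std::nullopt;
    }
    // 3. Parse key=value lines; malformed lines and unknown keys are skipped
    CheckpointState checkpoint{};
    bool saw_logical_size = false;
    bool saw_sequence = false;
    bool saw_rotation_index = false;
    std::string line;
    while (std::getline(in, line)) {
        const auto pos = line.find('=');
        if (pos == std::string::npos) {
            continue;
        }
        const auto key = line.substr(0, pos);
        const auto value = line.substr(pos + 1);
        std::uint64_t parsed = 0;
        const auto result = std::from_chars(
            value.data(),
            value.data() + value.size(),
            parsed,
            10
        );
        if (result.ec != std::errc{} || result.ptr != value.data() + value.size()) {
            continue;
        }
        if (key == "logical_size") {
            checkpoint.logical_size = parsed;
            saw_logical_size = true;
        } else if (key == "sequence") {
            checkpoint.sequence = parsed;
            saw_sequence = true;
        } else if (key == "rotation_index") {
            checkpoint.rotation_index = parsed;
            saw_rotation_index = true;
        }
    }
    // 4. Require all three fields; a missing one means a damaged file
    if (!saw_logical_size || !saw_sequence || !saw_rotation_index) {
        return std::nullopt;
    }
    // 5. The file stores no timestamp, so use its modification time (set when the
    //    temporary file was written) for the age check in validate_checkpoint.
    //    file_clock::to_sys is the C++20 conversion provided by libstdc++/libc++.
    std::error_code ec;
    const auto mtime = std::filesystem::last_write_time(path, ec);
    if (ec) {
        return std::nullopt;
    }
    checkpoint.timestamp = std::chrono::time_point_cast<std::chrono::system_clock::duration>(
        std::chrono::file_clock::to_sys(mtime));
    return checkpoint;
}
```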
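Validating a Checkpoint
```cpp
// A member of AsyncWriter, so it reads the same `config` as the recovery code
bool AsyncWriter::validate_checkpoint(
    const CheckpointState& checkpoint,
    const layout::SegmentDescriptor& active_segment
) const {
    // 1. Basic sanity checks
    if (!checkpoint.is_valid()) {
        return false;
    }
    // 2. Reject checkpoints older than checkpoint_max_age_seconds
    const auto now = std::chrono::system_clock::now();
    const auto age = std::chrono::duration_cast<std::chrono::seconds>(
        now - checkpoint.timestamp
    ).count();
    if (age > static_cast<std::int64_t>(config.checkpoint_max_age_seconds)) {
        mgrlog.warn("checkpoint is too old: {} seconds", age);
        return false;
    }
    // 3. The sequence must be within a plausible range
    if (checkpoint.sequence > config.max_sequence) {
        mgrlog.warn("checkpoint sequence is invalid: {}", checkpoint.sequence);
        return false;
    }
    // 4. The checkpointed offset must lie inside the log file; otherwise the
    //    log was truncated or replaced after the checkpoint was taken
    std::error_code ec;
    const auto file_size = std::filesystem::file_size(active_segment.path, ec);
    if (ec || checkpoint.logical_size > file_size) {
        mgrlog.warn("checkpoint offset {} is beyond the log file", checkpoint.logical_size);
        return false;
    }
    return true;
}
```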
Crash Recovery Flow
Recovery Strategy
```text
Does the checkpoint file exist?
├── Yes → Is the checkpoint valid?
│         ├── Valid   → scan records starting at checkpoint.logical_size
│         └── Invalid → scan records from the start of the file
└── No  → scan records from the start of the file

Are all scanned records intact?
├── Yes → restore state
└── No  → truncate the log after the last valid record
```
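The decision tree can be expressed as a pure function, which makes each branch easy to test in isolation. In this sketch, Checkpoint, StartPoint, RecoveryPlan, plan_recovery, and recovered_sequence are hypothetical names; validation is reduced to the is_valid() rule plus a bounds check against the file size, and the age check is omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical, trimmed-down checkpoint (rotation index omitted).
struct Checkpoint {
    std::uint64_t sequence;
    std::uint64_t logical_size;
};

enum class StartPoint { from_checkpoint, from_scratch };

struct RecoveryPlan {
    StartPoint start;
    std::uint64_t scan_offset;    // byte offset where the record scan begins
    std::uint64_t base_sequence;  // sequence that verified records are added to
};

// Use the checkpoint only if it exists, passes the is_valid() rule, and points
// inside the log file; otherwise fall back to a full scan from offset 0.
RecoveryPlan plan_recovery(const std::optional<Checkpoint>& ckpt, std::uint64_t file_size) {
    if (ckpt && ckpt->sequence > 0 && ckpt->logical_size > 0 &&
        ckpt->logical_size <= file_size) {
        return {StartPoint::from_checkpoint, ckpt->logical_size, ckpt->sequence};
    }
    return {StartPoint::from_scratch, 0, 0};
}

// After the scan: next sequence = base sequence + records verified past scan_offset.
std::uint64_t recovered_sequence(const RecoveryPlan& plan, std::uint64_t records_verified) {
    return plan.base_sequence + records_verified;
}
```

For a checkpoint at seq=1000 followed by 250 intact records, recovery resumes at sequence 1250; a full scan that verifies 1250 records reaches the same value.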
Recovering from a Checkpoint
```cpp
seastar::future<> AsyncWriter::recover_from_checkpoint() {
    // 1. Read the checkpoint
    auto checkpoint_opt = read_checkpoint_file(_active_segment);
    if (!checkpoint_opt) {
        g_recovery_fallbacks.fetch_add(1, std::memory_order_relaxed);
        g_last_recovery_fallback_reason.store(
            static_cast<int>(RecoveryFallbackReason::no_checkpoint_file),
            std::memory_order_relaxed
        );
        co_await recover_from_scratch();
        co_return;
    }
    const auto& checkpoint = checkpoint_opt.value();
    // 2. Validate the checkpoint
    if (!validate_checkpoint(checkpoint, _active_segment)) {
        g_recovery_fallbacks.fetch_add(1, std::memory_order_relaxed);
        g_last_recovery_fallback_reason.store(
            static_cast<int>(RecoveryFallbackReason::incomplete_checkpoint),
            std::memory_order_relaxed
        );
        co_await recover_from_scratch();
        co_return;
    }
    // 3. Scan only the records written after the checkpoint
    //    (scan_log_file_streaming uses blocking std::ifstream I/O)
    auto verified = scan_log_file_streaming(
        _active_segment.path,
        checkpoint.logical_size,
        config.recovery_trailing_capacity
    );
    // 4. Restore in-memory state: checkpointed sequence plus the records verified after it
    _sequence = checkpoint.sequence + verified.verified.records_verified;
    _rotation_index = checkpoint.rotation_index;
    // 5. On corruption, truncate to the last valid record and refresh the checkpoint;
    //    this runs after step 4 so the new checkpoint does not record a stale sequence
    if (!verified.verified.is_valid()) {
        co_await truncate_to_valid_state(verified);
        co_await persist_checkpoint();
    }
}
```
Recovering from the Start of the File
```cpp
seastar::future<> AsyncWriter::recover_from_scratch() {
    // 1. Scan the entire log file
    auto verified = scan_log_file_streaming(
        _active_segment.path,
        0,  // start from the beginning of the file
        config.recovery_trailing_capacity
    );
    // 2. If a corrupt record was found, truncate the log at the last valid record
    if (!verified.verified.is_valid()) {
        co_await truncate_to_valid_state(verified);
    }
    // 3. Restore in-memory state
    _sequence = verified.verified.records_verified;
    _rotation_index = verified.verified.last_rotation_index;
    // 4. Write a fresh checkpoint so the next recovery can skip this scan
    if (config.checkpoint_enabled) {
        co_await persist_checkpoint();
    }
}
```
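Streaming Scan Implementation
```cpp
#include <array>
#include <fstream>
#include <stdexcept>
#include <string>

StreamedVerifiedLogState scan_log_file_streaming(
    const std::string& path,
    std::size_t start_offset,
    std::size_t trailing_capacity
) {
    std::ifstream in(path, std::ios::binary);
    if (!in.is_open()) {
        throw std::runtime_error("failed to open log file for recovery: " + path);
    }
    StreamedVerifiedLogState state;
    // Nothing verified yet, so the valid prefix ends at start_offset (not 0):
    // a corrupt first record after a checkpoint must not wipe the whole file
    state.verified.last_valid_offset = start_offset;
    std::array<char, 64 * 1024> buffer{};
    std::string pending_line;
    pending_line.reserve(4096);
    // 1. Seek to the starting offset
    in.seekg(static_cast<std::streamoff>(start_offset));
    std::size_t chunk_offset = start_offset;  // file offset of buffer[0]
    // 2. Read and verify chunk by chunk
    while (in.good()) {
        in.read(buffer.data(), buffer.size());
        const auto bytes_read = static_cast<std::size_t>(in.gcount());
        if (bytes_read == 0) {
            break;
        }
        // 3. Append byte by byte until a complete record can be parsed
        for (std::size_t i = 0; i < bytes_read; ++i) {
            pending_line += buffer[i];
            auto record_opt = try_parse_record(pending_line);
            if (!record_opt) {
                continue;
            }
            // Verify the CRC
            if (!verify_record_crc(*record_opt)) {
                // CRC mismatch: corruption detected, stop here
                state.trailing_bytes = pending_line;
                return state;
            }
            state.verified.records_verified++;
            state.verified.last_valid_offset = chunk_offset + i + 1;
            pending_line.clear();
        }
        chunk_offset += bytes_read;
    }
    // 4. Bytes left over at EOF are a torn (partially written) record; report
    //    them the same way as a CRC failure so the caller truncates them
    if (!pending_line.empty()) {
        state.trailing_bytes = pending_line;
    }
    return state;
}
```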
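Handling Data Corruption
Detecting Corruption
```cpp
#include <cstdint>
#include <cstring>
#include <string_view>

bool verify_record_crc(const ParsedRecord& record) {
    // 1. Split the record into payload and trailing 4-byte CRC
    if (record.data.size() < sizeof(std::uint32_t)) {
        return false;  // too short to hold a CRC: treat as corrupt
    }
    const char* data = record.data.data();
    const std::size_t data_size = record.data.size() - sizeof(std::uint32_t);
    // memcpy instead of reinterpret_cast: the CRC field may be unaligned
    // (assumes it was stored in host byte order)
    std::uint32_t expected_crc = 0;
    std::memcpy(&expected_crc, data + data_size, sizeof(expected_crc));
    // 2. Compute the CRC over the payload
    const std::uint32_t computed_crc = crc32_update(0, std::string_view(data, data_size));
    // 3. Compare
    return expected_crc == computed_crc;
}
```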
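The record format parsed by try_parse_record is not shown in this article, so the following self-contained sketch assumes a hypothetical binary framing, [u32 length][payload][u32 CRC-32 of payload] in little-endian order, to illustrate the scan and the CRC check together: a chunked scan that carries partial records across buffer boundaries, and a checksum decoded byte by byte rather than through reinterpret_cast. crc32 here is the standard bitwise IEEE CRC-32 (reflected polynomial 0xEDB88320); it stands in for the article's crc32_update, whose exact variant is not specified. All names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

// Standard bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
std::uint32_t crc32(std::string_view data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Byte-wise little-endian load: safe for unaligned data.
std::uint32_t load_le32(const char* p) {
    const auto* b = reinterpret_cast<const unsigned char*>(p);
    return std::uint32_t{b[0]} | std::uint32_t{b[1]} << 8 |
           std::uint32_t{b[2]} << 16 | std::uint32_t{b[3]} << 24;
}

void append_le32(std::string& out, std::uint32_t v) {
    for (int i = 0; i < 4; ++i) {
        out.push_back(static_cast<char>((v >> (8 * i)) & 0xFFu));
    }
}

// Hypothetical frame: [u32 payload length][payload][u32 crc32(payload)].
std::string encode_record(std::string_view payload) {
    std::string out;
    append_le32(out, static_cast<std::uint32_t>(payload.size()));
    out.append(payload);
    append_le32(out, crc32(payload));
    return out;
}

constexpr std::uint32_t kMaxRecordSize = 16u << 20;  // reject absurd lengths

struct ScanResult {
    std::uint64_t records_verified = 0;
    std::uint64_t last_valid_offset = 0;  // absolute offset just past the last good frame
    bool clean = true;                    // false if a torn or corrupt frame was found
};

// Scan frames from start_offset in fixed-size chunks; bytes of a frame that
// spans a chunk boundary are carried over in `pending`.
ScanResult scan_records(std::istream& in, std::uint64_t start_offset,
                        std::size_t chunk_size = 64 * 1024) {
    ScanResult r;
    r.last_valid_offset = start_offset;
    in.seekg(static_cast<std::streamoff>(start_offset));
    std::vector<char> chunk(chunk_size);
    std::string pending;
    while (in) {
        in.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
        pending.append(chunk.data(), static_cast<std::size_t>(in.gcount()));
        std::size_t pos = 0;
        while (pending.size() - pos >= 4) {
            const std::uint32_t len = load_le32(pending.data() + pos);
            if (len > kMaxRecordSize) {
                r.clean = false;  // corrupted length field
                return r;
            }
            const std::size_t frame = 4 + std::size_t{len} + 4;
            if (pending.size() - pos < frame) {
                break;  // frame incomplete: read the next chunk
            }
            const std::string_view payload(pending.data() + pos + 4, len);
            if (crc32(payload) != load_le32(pending.data() + pos + 4 + len)) {
                r.clean = false;  // CRC mismatch: stop at the last good frame
                return r;
            }
            ++r.records_verified;
            r.last_valid_offset += frame;
            pos += frame;
        }
        pending.erase(0, pos);
    }
    if (!pending.empty()) {
        r.clean = false;  // leftover bytes at EOF: a torn, partially written frame
    }
    return r;
}
```

scan_records stops at the first torn or corrupt frame and reports last_valid_offset, which is exactly the truncation point a repair step needs. kMaxRecordSize keeps a corrupted length field from making the scanner buffer the rest of the file while waiting for a frame that will never complete.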
Truncating the Log
```cpp
seastar::future<> AsyncWriter::truncate_to_valid_state(
    const StreamedVerifiedLogState& state
) {
    // 1. Truncate right after the last verified record
    const std::size_t truncate_offset = state.verified.last_valid_offset;
    if (truncate_offset == 0) {
        // The whole file is corrupt: delete it
        mgrlog.warn("removing log file {}: no valid records", _active_segment.path);
        std::filesystem::remove(_active_segment.path);
        co_return;
    }
    // 2. Open the file
    auto file = co_await seastar::open_file_dma(
        _active_segment.path,
        seastar::open_flags::rw
    );
    // 3. Truncate to the valid size and flush so the new size survives a crash
    co_await file.truncate(truncate_offset);
    co_await file.flush();
    // 4. Close the file
    co_await file.close();
    // 5. Log the repair
    mgrlog.warn(
        "truncated log file {} at offset {} due to corruption",
        _active_segment.path,
        truncate_offset
    );
}
```
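For tools that run outside the Seastar reactor (an offline verifier, for instance), the same repair can be done with blocking POSIX calls. A sketch assuming Linux/POSIX; truncate_log_tail is a hypothetical helper. It refuses to "truncate" to a size larger than the file, because ftruncate would otherwise extend the file with zero bytes and append garbage to the log.

```cpp
#include <cassert>
#include <cstdint>
#include <fcntl.h>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical offline repair helper: cut the log back to `valid_size` bytes
// (the end of the last verified record) and fsync so the new size is durable.
void truncate_log_tail(const std::string& path, std::uint64_t valid_size) {
    // Never "truncate" upward: ftruncate would extend the file with zero bytes.
    if (valid_size > std::filesystem::file_size(path)) {
        throw std::invalid_argument("valid_size is beyond the end of " + path);
    }
    const int fd = ::open(path.c_str(), O_WRONLY);
    if (fd < 0) {
        throw std::runtime_error("open failed: " + path);
    }
    const bool ok = ::ftruncate(fd, static_cast<off_t>(valid_size)) == 0 && ::fsync(fd) == 0;
    ::close(fd);
    if (!ok) {
        throw std::runtime_error("truncate/fsync failed: " + path);
    }
}
```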
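Fault-Injection Testing
Test Scenarios
```bash
# 1. Normal write
./build/log_engine_demo --messages 1000

# 2. Repeatedly kill -9 the process at a random moment; over many runs,
#    some kills land in the middle of a checkpoint write
for i in $(seq 1 100); do
    ./build/log_engine_demo --messages 1000 &
    pid=$!
    sleep "0.$((RANDOM % 10))"
    kill -9 "$pid" 2>/dev/null
    wait "$pid" 2>/dev/null
done

# 3. Verify recovery
./build/log_engine_verify --path ./logs/shard-0.log
```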
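Test Coverage
| Scenario | Expected behavior |
|---|---|
| Normal crash recovery | Recover from the checkpoint |
| Corrupted checkpoint file | Recover from the start of the file |
| Missing checkpoint file | Recover from the start of the file |
| Corrupted log tail | Automatically truncated to the last valid record |
| Corrupted log head | Whole file treated as corrupt; deleted and recreated |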
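Performance Impact Analysis
Checkpoint Overhead
| Configuration | Extra I/O | Performance impact | Recovery speed |
|---|---|---|---|
| Checkpoint disabled | 0 | 0% | Slow (full scan) |
| Checkpoint every 60 s | ~1 KB/min | < 1% | Fast (incremental scan) |
| Checkpoint every 10 s | ~1 KB/10 s | ~1% | Very fast (small increment) |
Conclusion: a reasonable checkpoint interval (60 s) has minimal performance impact while significantly speeding up recovery.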
Recovery Time Comparison
| Log size | With checkpoint | Without checkpoint | Speedup |
|---|---|---|---|
| 100 MB | ~1 s | ~10 s | 10x |
| 1 GB | ~5 s | ~100 s | 20x |
| 10 GB | ~30 s | ~1000 s | 33x |
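Configuration Recommendations
High-Performance Scenario
```cpp
config.checkpoint_enabled = false;
config.ack_mode = AckMode::write_ack;
```
Characteristics:
- No extra I/O overhead
- After a crash, recovery must scan from the start of the file, which is slow
Balanced Scenario (Recommended)
```cpp
config.checkpoint_enabled = true;
config.checkpoint_interval_seconds = 60;
config.checkpoint_max_age_seconds = 3600;
config.ack_mode = AckMode::write_ack;
config.recovery_trailing_capacity = 1024;
```
Characteristics:
- Performance impact < 1%
- Fast recovery
- A balance between performance and safety
High-Safety Scenario
```cpp
config.checkpoint_enabled = true;
config.checkpoint_interval_seconds = 10;
config.checkpoint_max_age_seconds = 600;
config.ack_mode = AckMode::sync_ack;
config.recovery_trailing_capacity = 4096;
```
Characteristics:
- Frequent checkpoints, so recovery has little to rescan after a crash
- Strong acknowledgment semantics: data is guaranteed to reach disk
- Performance impact ~5%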
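Monitoring Metrics
Checkpoint Metrics
Metrics are registered with seastar::metrics::metric_groups::add_group; since the values live in std::atomic globals, each one is exposed through a lambda that loads it:
```cpp
namespace sm = seastar::metrics;

// _metrics is a seastar::metrics::metric_groups member; it must stay alive
// for as long as the metrics should be exported
_metrics.add_group("log_engine", {
    sm::make_counter("checkpoint_write_successes", [] { return g_checkpoint_write_successes.load(); }),
    sm::make_counter("checkpoint_write_failures", [] { return g_checkpoint_write_failures.load(); }),
    sm::make_counter("recovery_fallbacks", [] { return g_recovery_fallbacks.load(); }),
    sm::make_gauge("last_recovery_fallback_reason", [] { return g_last_recovery_fallback_reason.load(); }),
});
```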
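Alerting Rules
```text
# Checkpoint write failure rate too high (skip when no writes have happened yet)
if checkpoint_write_failures / (checkpoint_write_successes + checkpoint_write_failures) > 0.01:
    alert("High checkpoint write failure rate; possible disk problem")

# Frequent fallbacks to recovery from the start of the file
if recovery_fallbacks > 10 in last_hour:
    alert("Frequent fallbacks to full-scan recovery; checkpoints may be corrupted")

# The most recent fallback was caused by a checkpoint that failed validation
if last_recovery_fallback_reason == incomplete_checkpoint:
    alert("Checkpoint file incomplete or failed validation")
```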
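The first two rules need care when evaluated in code: the failure ratio must not divide by zero before any checkpoint has been written, and "more than 10 in the last hour" needs a sliding window rather than a lifetime counter. A minimal sketch; WindowedCounter and failure_ratio are hypothetical helpers, not Seastar APIs, and an external metrics backend could evaluate the same rules instead.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical helper: counts events (e.g. recovery fallbacks) within a
// trailing time window. Events must be recorded in time order.
class WindowedCounter {
public:
    using clock = std::chrono::steady_clock;

    explicit WindowedCounter(clock::duration window) : _window(window) {}

    void record(clock::time_point t) { _events.push_back(t); }

    // Number of events newer than now - window; older events are discarded.
    std::size_t count(clock::time_point now) {
        while (!_events.empty() && _events.front() <= now - _window) {
            _events.pop_front();
        }
        return _events.size();
    }

private:
    clock::duration _window;
    std::deque<clock::time_point> _events;
};

// Failure ratio over all attempts; 0 when nothing has been attempted yet,
// so the rule never divides by zero.
double failure_ratio(std::uint64_t successes, std::uint64_t failures) {
    const std::uint64_t total = successes + failures;
    return total == 0 ? 0.0 : static_cast<double>(failures) / static_cast<double>(total);
}
```

With 11 fallbacks recorded over ten minutes, the "more than 10 in the last hour" rule fires until the window slides past the oldest of them.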
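Summary
The Checkpoint and Recovery mechanism ensures data consistency through:
- Atomic writes: a temporary file plus rename makes each checkpoint update atomic
- Flexible recovery: recover from a checkpoint or from the start of the file
- Data verification: CRC checks detect corrupted records
- Automatic repair: corrupted log tails are truncated automatically
- Efficient scanning: a streaming scan with a small memory footprint
Best practices:
- Production default: 60 s checkpoints + write_ack
- Critical workloads: 10 s checkpoints + sync_ack
- Exercise the recovery path regularly to confirm that data is safe
- Monitor the checkpoint write failure rate to catch disk problems early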
Next: "Rotate and Archive: Log Lifecycle Management"