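Checkpoint and Recovery: Implementing Crash-Consistency Guarantees

A deep dive into how checkpoints are persisted and how crash recovery works, covering the checkpoint file format, the recovery strategy, data validation, and the other core pieces of the implementation.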

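The Challenge of Crash Consistency

Problem Scenario

In a logging system, the process can crash at any moment:

Timeline:
T0:   process starts
T10:  messages 1-1000 written
T20:  checkpoint write in progress
T21:  process crashes ← critical point
T25:  process restarts and recovers

Key Questions

  1. Is writing a checkpoint an atomic operation?
  2. How does recovery choose the right checkpoint?
  3. What happens if the checkpoint is corrupted?
  4. How do we avoid consuming the same data twice?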

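Consistency Guarantee Levels

Level    Guarantee                Implementation cost   Performance impact
None     No guarantee             Lowest                None
Soft     Most data recoverable    Medium                < 1%
Strong   No data loss             Highest               ~5%

Seastar Log Engine provides a configurable level between Soft and Strong.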

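Checkpoint Design

What a Checkpoint Does

A checkpoint is a "snapshot" of system state: it records the key information at a given moment, such as the last acknowledged sequence number and the log offset it corresponds to: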

Active Log:
┌──────────────────────────────────────────────────────────┐
│ Msg 1 │ Msg 2 │ Msg 3 │ Msg 4 │ Msg 5 │ Msg 6 │ Msg 7    │
│       │       │       │       │       │       │          │
└───────┴───────┴───────┴───────┴───────┴───────┴──────────┘
                              ↑
                         Checkpoint (seq=1000, offset=10KB)

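Checkpoint Data Structure

#include <chrono>
#include <cstdint>

struct CheckpointState {
    std::uint64_t sequence = 0;         // sequence number of the last acknowledged record
    std::uint64_t logical_size = 0;     // logical log size corresponding to that sequence
    std::uint64_t rotation_index = 0;   // rotation index
    std::chrono::system_clock::time_point timestamp{};  // creation time (not written to the text file below)

    bool is_valid() const {
        return sequence > 0 && logical_size > 0;
    }
};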

Checkpoint File Format

The format is deliberately simple plain text, so the file is easy to inspect and debug by hand:

sequence=1000
logical_size=10240
rotation_index=5
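To make the format concrete, here is a minimal serializer that produces exactly the example above. It is a sketch: `CheckpointText` and `to_checkpoint_text` are hypothetical names, not part of the engine, and the field order simply mirrors the example file.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical plain struct mirroring the three persisted fields of CheckpointState.
struct CheckpointText {
    std::uint64_t sequence = 0;
    std::uint64_t logical_size = 0;
    std::uint64_t rotation_index = 0;
};

// Renders one "key=value" pair per line, in the same order as the example file.
std::string to_checkpoint_text(const CheckpointText& c) {
    std::ostringstream out;
    out << "sequence=" << c.sequence << '\n'
        << "logical_size=" << c.logical_size << '\n'
        << "rotation_index=" << c.rotation_index << '\n';
    return out.str();
}
```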

File Naming

logs/shard-0.checkpoint    # checkpoint for shard 0
logs/shard-1.checkpoint    # checkpoint for shard 1

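Writing Checkpoints

Atomic Write Strategy

To make checkpoint updates atomic, the writer uses a "write to a temporary file, then rename" strategy. On POSIX filesystems rename(2) replaces the target atomically, so a reader sees either the old checkpoint or the new one, never a half-written file. Atomicity is not the same as durability, though: surviving a power loss also requires an fsync of the temporary file before the rename and of the containing directory after it.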

seastar::future<> AsyncWriter::persist_checkpoint() {
    const auto checkpoint_path = layout::checkpoint_path(_active_segment);
    const auto temp_path = checkpoint_path + ".tmp";

    // 1. Snapshot the state to persist
    CheckpointState checkpoint{
        .sequence = _sequence,
        .logical_size = _append_writer.get_logical_size(),
        .rotation_index = _rotation_index,
        .timestamp = std::chrono::system_clock::now()
    };

    // 2. Write the temporary file.
    //    std::ofstream blocks the reactor thread and cannot fsync; tolerable for a
    //    tiny file, but a production version should use Seastar's file API instead.
    std::ofstream out(temp_path, std::ios::binary | std::ios::trunc);
    if (!out.good()) {
        throw std::runtime_error("failed to open checkpoint file: " + temp_path);
    }

    out << "sequence=" << checkpoint.sequence << "\n";
    out << "logical_size=" << checkpoint.logical_size << "\n";
    out << "rotation_index=" << checkpoint.rotation_index << "\n";
    out.close();  // close() flushes and sets failbit if the flush fails

    if (!out) {
        std::error_code cleanup_ec;
        std::filesystem::remove(temp_path, cleanup_ec);
        throw std::runtime_error("failed to write checkpoint file: " + temp_path);
    }

    // 3. Atomic rename: readers see either the old checkpoint or the new one
    std::error_code ec;
    std::filesystem::rename(temp_path, checkpoint_path, ec);

    if (ec) {
        std::error_code cleanup_ec;
        std::filesystem::remove(temp_path, cleanup_ec);
        throw std::runtime_error("failed to rename checkpoint file: " + ec.message());
    }

    // 4. Update metrics
    g_checkpoint_write_successes.fetch_add(1, std::memory_order_relaxed);

    // co_return makes this a coroutine, so the throws above surface as a failed
    // future instead of escaping synchronously from the caller.
    co_return;
}
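Step 3's rename guarantees that readers never observe a half-written checkpoint, but std::ofstream offers no way to call fsync, so after a power loss the renamed file could still be empty or stale. A minimal POSIX sketch of the full durable sequence follows: write the temporary file, fsync it, rename it, then fsync the parent directory. `write_file_durably` and `throw_sys` are hypothetical helpers; the calls block, so inside Seastar they would need to run off the reactor or be replaced with Seastar's file API.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <filesystem>
#include <string>
#include <system_error>
#include <fcntl.h>
#include <unistd.h>

// Throws std::system_error for the given errno value.
[[noreturn]] void throw_sys(int err, const std::string& what) {
    throw std::system_error(err, std::generic_category(), what);
}

// Replaces `path` with `contents` so that, even after a power loss, readers see
// either the old file or the complete new one. POSIX-only and blocking.
void write_file_durably(const std::string& path, const std::string& contents) {
    const std::string tmp = path + ".tmp";
    const int fd = ::open(tmp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) throw_sys(errno, "open " + tmp);

    // write() may write fewer bytes than requested, so loop until done.
    std::size_t done = 0;
    while (done < contents.size()) {
        const ssize_t n = ::write(fd, contents.data() + done, contents.size() - done);
        if (n < 0 && errno == EINTR) continue;
        if (n < 0) { const int err = errno; ::close(fd); throw_sys(err, "write " + tmp); }
        done += static_cast<std::size_t>(n);
    }

    // 1. Make the new bytes durable before they become visible under `path`.
    if (::fsync(fd) != 0) { const int err = errno; ::close(fd); throw_sys(err, "fsync " + tmp); }
    if (::close(fd) != 0) throw_sys(errno, "close " + tmp);

    // 2. Atomically swap the new file in place of the old one.
    if (std::rename(tmp.c_str(), path.c_str()) != 0) throw_sys(errno, "rename " + tmp);

    // 3. fsync the parent directory so the rename itself survives a crash.
    std::string dir = std::filesystem::path(path).parent_path().string();
    if (dir.empty()) dir = ".";
    const int dfd = ::open(dir.c_str(), O_RDONLY | O_DIRECTORY);
    if (dfd < 0) throw_sys(errno, "open " + dir);
    const int rc = ::fsync(dfd);
    const int err = errno;
    ::close(dfd);
    if (rc != 0) throw_sys(err, "fsync " + dir);
}
```

Usage is a single call such as write_file_durably("logs/shard-0.checkpoint", text); the fsync calls are what a kill -9 test cannot exercise, because they only matter on power loss or kernel crash.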

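Periodic Checkpoints

seastar::future<> AsyncWriter::start(EngineConfig config) {
    // ...

    if (config.checkpoint_enabled && config.checkpoint_interval_seconds > 0) {
        // Install the callback before arming the timer.
        _checkpoint_timer.set_callback([this] {
            // A timer callback cannot wait on a future, so the write is fire-and-forget;
            // failures are logged and counted here. Cancel the timer (and wait for any
            // in-flight write, e.g. via a seastar::gate) before destroying the writer.
            (void)persist_checkpoint().handle_exception([](std::exception_ptr ep) {
                applog.warn("checkpoint write failed: {}", ep);
                g_checkpoint_write_failures.fetch_add(1, std::memory_order_relaxed);
            });
        });
        _checkpoint_timer.arm_periodic(
            std::chrono::seconds(config.checkpoint_interval_seconds)
        );
    }

    co_return;
}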

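When Checkpoints Are Written

  1. Periodically: every checkpoint_interval_seconds
  2. On stop: one final checkpoint before shutdown
  3. On rotate: whenever the log is rotated

Recommended Intervals

// Balance performance and safety
config.checkpoint_interval_seconds = 60;  // once a minute

// High-safety scenarios
config.checkpoint_interval_seconds = 10;  // every 10 seconds

// High-performance scenarios
config.checkpoint_interval_seconds = 300; // every 5 minutes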
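As a rough rule of thumb, and assuming a steady write rate, the interval bounds how much log recovery has to rescan past the last checkpoint: about interval × write throughput. The helper below is hypothetical and the 10 MB/s rate is made up purely to show the arithmetic.

```cpp
#include <cassert>
#include <cstdint>

// Upper bound on bytes appended since the last checkpoint, i.e. the tail that
// recovery must rescan after a crash, assuming a steady write throughput.
constexpr std::uint64_t max_rescan_bytes(std::uint64_t interval_seconds,
                                         std::uint64_t write_bytes_per_second) {
    return interval_seconds * write_bytes_per_second;
}

// At a hypothetical 10 MB/s, a 60 s interval leaves at most ~600 MB to rescan.
static_assert(max_rescan_bytes(60, 10'000'000) == 600'000'000);
```

The same rate with a 10 s interval gives about 100 MB, and with 300 s about 3 GB, which is the trade-off behind the three settings above.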

Reading and Validating Checkpoints

Reading the Checkpoint

#include <charconv>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <optional>
#include <string>

std::optional<CheckpointState> read_checkpoint_file(
    const layout::SegmentDescriptor& active_segment
) {
    const auto path = layout::checkpoint_path(active_segment);

    // 1. Check whether the file exists
    if (!std::filesystem::exists(path)) {
        return std::nullopt;
    }

    // 2. Open the file
    std::ifstream in(path, std::ios::binary);
    if (!in.good()) {
        return std::nullopt;
    }

    // 3. Parse "key=value" lines; malformed lines and unknown keys are ignored
    CheckpointState checkpoint{};
    bool saw_logical_size = false;
    bool saw_sequence = false;
    bool saw_rotation_index = false;

    std::string line;
    while (std::getline(in, line)) {
        const auto pos = line.find('=');
        if (pos == std::string::npos) {
            continue;
        }

        const auto key = line.substr(0, pos);
        const auto value = line.substr(pos + 1);

        // Base-10 parse that must consume the whole value (no trailing garbage)
        std::uint64_t parsed = 0;
        const auto result = std::from_chars(
            value.data(),
            value.data() + value.size(),
            parsed,
            10
        );

        if (result.ec != std::errc{} || result.ptr != value.data() + value.size()) {
            continue;
        }

        if (key == "logical_size") {
            checkpoint.logical_size = parsed;
            saw_logical_size = true;
        } else if (key == "sequence") {
            checkpoint.sequence = parsed;
            saw_sequence = true;
        } else if (key == "rotation_index") {
            checkpoint.rotation_index = parsed;
            saw_rotation_index = true;
        }
    }

    // 4. Accept the checkpoint only if all three required fields were present
    if (!saw_logical_size || !saw_sequence || !saw_rotation_index) {
        return std::nullopt;
    }

    return checkpoint;
}

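Validating the Checkpoint

bool validate_checkpoint(
    const CheckpointState& checkpoint,
    const layout::SegmentDescriptor& active_segment
) {
    // Note: `config` is the engine's EngineConfig; this excerpt assumes it is in scope.

    // 1. Basic sanity checks
    if (!checkpoint.is_valid()) {
        return false;
    }

    // 2. Reject checkpoints that are too old. The text format above does not persist
    //    `timestamp`, so a freshly parsed checkpoint still holds the default (epoch)
    //    value; checking that would reject every checkpoint, so the age check only
    //    applies when a timestamp was actually recorded.
    if (checkpoint.timestamp != std::chrono::system_clock::time_point{}) {
        const auto now = std::chrono::system_clock::now();
        const auto age = std::chrono::duration_cast<std::chrono::seconds>(
            now - checkpoint.timestamp
        ).count();

        if (age > config.checkpoint_max_age_seconds) {
            mgrlog.warn("checkpoint is too old: {} seconds", age);
            return false;
        }
    }

    // 3. Check that the sequence number is within a plausible range
    if (checkpoint.sequence > config.max_sequence) {
        mgrlog.warn("checkpoint sequence is invalid: {}", checkpoint.sequence);
        return false;
    }

    return true;
}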
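Passing the configuration and the current time in explicitly turns the check into a pure function that is easy to unit-test. The sketch below uses hypothetical stand-ins (`Checkpoint`, `ValidationConfig`) for the engine's types and keeps the same three checks; it skips the age check when the timestamp is still the default epoch value, since the text format above does not persist it.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <limits>

// Hypothetical stand-ins for CheckpointState and the relevant EngineConfig fields.
struct Checkpoint {
    std::uint64_t sequence = 0;
    std::uint64_t logical_size = 0;
    std::chrono::system_clock::time_point timestamp{};  // epoch means "not recorded"
};

struct ValidationConfig {
    std::int64_t checkpoint_max_age_seconds = 3600;
    std::uint64_t max_sequence = std::numeric_limits<std::uint64_t>::max();
};

// Pure function: no globals; the caller supplies the config and "now".
bool validate_checkpoint(const Checkpoint& c, const ValidationConfig& cfg,
                         std::chrono::system_clock::time_point now) {
    if (c.sequence == 0 || c.logical_size == 0) {
        return false;                                  // same rule as is_valid()
    }
    if (c.sequence > cfg.max_sequence) {
        return false;                                  // implausible sequence number
    }
    if (c.timestamp != std::chrono::system_clock::time_point{}) {
        const auto age = std::chrono::duration_cast<std::chrono::seconds>(now - c.timestamp);
        if (age.count() > cfg.checkpoint_max_age_seconds) {
            return false;                              // too old to trust
        }
    }
    return true;
}
```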

Crash Recovery Flow

Recovery Strategy

┌─────────────────────────────────────────┐
│    Does the checkpoint file exist?      │
└────────────────────┬────────────────────┘
                     │
         ┌───────────┴───────────┐
         │                       │
        Yes                      No
         │                       │
         ↓                       ↓
Validate checkpoint    Scan from file start
         │                       │
         ↓                       ↓
       Valid?           All records intact?
         │                       │
     ┌───┴───┐               ┌───┴───┐
     │       │               │       │
    Yes      No             Yes      No
     │       │               │       │
     ↓       ↓               ↓       ↓
  Resume   Scan from      Recover Truncate at
  from     file start     state   last valid
  checkpoint                      record
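The decision tree reduces to two small pure functions. This is a sketch with hypothetical names (`RecoveryPlan`, `ScanOutcome`, `choose_plan`, `after_scan`), not the engine's API; it only encodes the branches in the diagram.

```cpp
#include <cassert>

// Which scan to run (hypothetical names).
enum class RecoveryPlan {
    resume_from_checkpoint,   // checkpoint exists and is valid: scan only the tail after it
    scan_from_start,          // checkpoint missing or invalid: scan the whole file
};

// What to do once the scan finishes.
enum class ScanOutcome { intact, truncate_tail };

RecoveryPlan choose_plan(bool checkpoint_exists, bool checkpoint_valid) {
    return (checkpoint_exists && checkpoint_valid) ? RecoveryPlan::resume_from_checkpoint
                                                   : RecoveryPlan::scan_from_start;
}

// Whichever scan runs, a corrupted or torn record ends the valid prefix of the log.
ScanOutcome after_scan(bool all_records_intact) {
    return all_records_intact ? ScanOutcome::intact : ScanOutcome::truncate_tail;
}
```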

Recovering from a Checkpoint

seastar::future<> AsyncWriter::recover_from_checkpoint() {
    // 1. Read the checkpoint
    auto checkpoint_opt = read_checkpoint_file(_active_segment);

    if (!checkpoint_opt) {
        g_recovery_fallbacks.fetch_add(1, std::memory_order_relaxed);
        g_last_recovery_fallback_reason.store(
            static_cast<int>(RecoveryFallbackReason::no_checkpoint_file),
            std::memory_order_relaxed
        );
        co_await recover_from_scratch();
        co_return;
    }

    auto& checkpoint = checkpoint_opt.value();

    // 2. Validate the checkpoint
    if (!validate_checkpoint(checkpoint, _active_segment)) {
        g_recovery_fallbacks.fetch_add(1, std::memory_order_relaxed);
        g_last_recovery_fallback_reason.store(
            static_cast<int>(RecoveryFallbackReason::incomplete_checkpoint),
            std::memory_order_relaxed
        );
        co_await recover_from_scratch();
        co_return;
    }

    // 3. Scan only the part of the log written after the checkpoint
    auto verified = scan_log_file_streaming(
        _active_segment.path,
        checkpoint.logical_size,
        config.recovery_trailing_capacity
    );

    // 4. Restore in-memory state first: the checkpoint plus every record verified
    //    after it. Doing this before persist_checkpoint() below keeps the new
    //    checkpoint from recording a stale sequence number.
    _sequence = checkpoint.sequence + verified.verified.records_verified;
    _rotation_index = checkpoint.rotation_index;

    // 5. Corruption found after the checkpoint: truncate, then record the repaired state
    if (!verified.verified.is_valid()) {
        co_await truncate_to_valid_state(verified);
        co_await persist_checkpoint();
    }
}

Recovering from the Start of the File

seastar::future<> AsyncWriter::recover_from_scratch() {
    // 1. Scan the entire log file
    auto verified = scan_log_file_streaming(
        _active_segment.path,
        0,  // start at the beginning of the file
        config.recovery_trailing_capacity
    );

    // 2. Check the scan result
    if (!verified.verified.is_valid()) {
        // Corrupted records found: truncate the log
        co_await truncate_to_valid_state(verified);
    }

    // 3. Restore in-memory state
    _sequence = verified.verified.records_verified;
    _rotation_index = verified.verified.last_rotation_index;

    // 4. Write a fresh checkpoint
    if (config.checkpoint_enabled) {
        co_await persist_checkpoint();
    }
}

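Streaming Scan Implementation

StreamedVerifiedLogState scan_log_file_streaming(
    const std::string& path,
    std::size_t start_offset,
    std::size_t trailing_capacity  // not used in this simplified excerpt
) {
    std::ifstream in(path, std::ios::binary);
    if (!in.is_open()) {
        throw std::runtime_error("failed to open log file for recovery: " + path);
    }

    StreamedVerifiedLogState state;
    // Everything before start_offset is covered by the checkpoint. Starting here means
    // corruption right after the checkpoint truncates back to start_offset, not to 0.
    state.verified.last_valid_offset = start_offset;

    std::array<char, 64 * 1024> buffer{};
    std::string pending_line;
    pending_line.reserve(4096);

    // 1. Seek to the start position
    in.seekg(start_offset);

    // 2. Read and verify in fixed-size chunks
    while (in.good()) {
        in.read(buffer.data(), buffer.size());
        const std::size_t bytes_read = in.gcount();

        if (bytes_read == 0) {
            break;
        }

        // 3. Verify records; pending_line carries a partial record across chunks
        for (std::size_t i = 0; i < bytes_read; ++i) {
            pending_line += buffer[i];

            // Check whether pending_line now holds a complete record
            // (re-parsing after every byte is simple but quadratic in record length)
            auto record_opt = try_parse_record(pending_line);
            if (record_opt) {
                auto& record = record_opt.value();

                // Verify the CRC
                if (verify_record_crc(record)) {
                    state.verified.records_verified++;
                    state.verified.last_valid_offset = start_offset + i + 1;
                } else {
                    // CRC mismatch: corruption detected
                    state.trailing_bytes = pending_line;
                    return state;
                }

                pending_line.clear();
            }
        }

        start_offset += bytes_read;
    }

    // 4. Bytes left at EOF that never formed a complete record are a torn final
    //    write (the usual result of crashing mid-append); report them as well.
    if (!pending_line.empty()) {
        state.trailing_bytes = pending_line;
    }

    return state;
}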
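The per-byte try_parse_record loop is easy to follow but re-parses the pending buffer after every byte. The sketch below shows the same streaming idea with a hypothetical length-prefixed layout ([u32 length][payload][u32 checksum], little-endian), consuming whole records as soon as enough bytes are buffered. FNV-1a stands in for CRC32 only to keep it short; `scan_stream` and `ScanResult` are illustrative names. It starts last_valid_offset at start_offset, so immediate corruption never truncates data covered by the checkpoint, and it flags leftover bytes as a torn tail.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// FNV-1a, standing in for the engine's CRC32 in this sketch.
std::uint32_t fnv1a(const char* p, std::size_t n) {
    std::uint32_t h = 2166136261u;
    for (std::size_t i = 0; i < n; ++i) { h ^= static_cast<unsigned char>(p[i]); h *= 16777619u; }
    return h;
}

std::uint32_t load_le32(const char* p) {
    std::uint32_t v = 0;
    for (int i = 3; i >= 0; --i) v = (v << 8) | static_cast<unsigned char>(p[i]);
    return v;
}

struct ScanResult {
    std::uint64_t records_verified = 0;
    std::uint64_t last_valid_offset = 0;  // absolute offset just past the last good record
    bool tail_damaged = false;            // bad checksum or torn record at the end
};

ScanResult scan_stream(std::istream& in, std::uint64_t start_offset, std::size_t chunk_size) {
    ScanResult r;
    r.last_valid_offset = start_offset;   // nothing verified yet: keep everything before start
    in.seekg(static_cast<std::streamoff>(start_offset));
    std::vector<char> chunk(chunk_size);
    std::string pending;                  // bytes not yet consumed by a complete record
    while (in) {
        in.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
        pending.append(chunk.data(), static_cast<std::size_t>(in.gcount()));
        // Consume every complete record at the front of `pending`.
        while (pending.size() >= 4) {
            const std::size_t len = load_le32(pending.data());
            const std::size_t total = 4 + len + 4;
            if (pending.size() < total) break;          // record continues in the next chunk
            if (load_le32(pending.data() + 4 + len) != fnv1a(pending.data() + 4, len)) {
                r.tail_damaged = true;                  // corruption: stop here
                return r;
            }
            ++r.records_verified;
            r.last_valid_offset += total;
            pending.erase(0, total);
        }
    }
    r.tail_damaged = !pending.empty();    // leftover bytes = torn final record
    return r;
}
```

A deliberately tiny chunk size in tests forces records to straddle chunk boundaries, which is exactly the case a real 64 KB buffer hits only occasionally.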

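Handling Data Corruption

Detecting Corruption

#include <cstring>

bool verify_record_crc(const ParsedRecord& record) {
    // 1. Split off the trailing 4-byte CRC; anything shorter cannot be a valid record
    if (record.data.size() < sizeof(std::uint32_t)) {
        return false;
    }
    const char* data = record.data.data();
    const std::size_t data_size = record.data.size() - sizeof(std::uint32_t);

    // memcpy rather than reinterpret_cast: the CRC field may be unaligned, and reading
    // it through a cast pointer breaks strict aliasing. Byte order is still the host's.
    std::uint32_t expected_crc = 0;
    std::memcpy(&expected_crc, data + data_size, sizeof(expected_crc));

    // 2. Compute the CRC over the payload
    const std::uint32_t computed_crc = crc32_update(0, std::string_view(data, data_size));

    // 3. Compare
    return expected_crc == computed_crc;
}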
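For reference, the standard CRC-32 (reflected polynomial 0xEDB88320, as used by zlib and Ethernet) and a trailing-CRC check can be written as follows. Whether the engine's crc32_update(0, …) applies the usual initial value and final XOR is not shown in the source, so treat this as a self-contained sketch; unlike the excerpt above, which reads the CRC in host byte order, it assumes the CRC is stored little-endian. `crc32` and `verify_trailing_crc` are illustrative names.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>

// Standard reflected CRC-32, computed bit by bit for brevity
// (production code would use a lookup table or hardware CRC instructions).
std::uint32_t crc32(std::string_view data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int k = 0; k < 8; ++k)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

// Checks a buffer laid out as [payload][u32 CRC, little-endian] (assumed layout).
// Assembling the CRC byte by byte avoids unaligned reads and fixes the byte order.
bool verify_trailing_crc(std::string_view record) {
    if (record.size() < 4) return false;              // too short to carry a CRC
    const std::string_view payload = record.substr(0, record.size() - 4);
    const auto* tail = reinterpret_cast<const unsigned char*>(record.data() + payload.size());
    const std::uint32_t expected = std::uint32_t(tail[0]) | std::uint32_t(tail[1]) << 8 |
                                   std::uint32_t(tail[2]) << 16 | std::uint32_t(tail[3]) << 24;
    return expected == crc32(payload);
}
```

The string "123456789" is the conventional check input for this CRC variant and must yield 0xCBF43926, which makes a quick sanity test for any replacement implementation.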

Truncating the Log

seastar::future<> AsyncWriter::truncate_to_valid_state(
    const StreamedVerifiedLogState& state
) {
    // 1. Determine where to cut
    const auto truncate_offset = state.verified.last_valid_offset;

    if (truncate_offset == 0) {
        // No valid record at all: delete the file without blocking the reactor
        co_await seastar::remove_file(_active_segment.path);
        co_return;
    }

    // 2. Open the file
    auto file = co_await seastar::open_file_dma(
        _active_segment.path,
        seastar::open_flags::rw
    );

    // 3. Truncate to the end of the last valid record, then flush so the
    //    truncation itself survives another crash
    co_await file.truncate(truncate_offset);
    co_await file.flush();

    // 4. Close the file
    co_await file.close();

    // 5. Log the repair
    mgrlog.warn(
        "truncated log file {} at offset {} due to corruption",
        _active_segment.path,
        truncate_offset
    );
}
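Outside Seastar, for example in an offline repair tool (a hypothetical use), the same "truncate, then make it durable" step looks like this with plain POSIX calls. `truncate_durably` is an illustrative name; the sketch is blocking and POSIX-only.

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <system_error>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

// Cuts `path` back to `length` bytes and fsyncs it, so a second crash cannot
// bring back the corrupted tail.
void truncate_durably(const std::string& path, off_t length) {
    const int fd = ::open(path.c_str(), O_WRONLY);
    if (fd < 0) throw std::system_error(errno, std::generic_category(), "open " + path);
    if (::ftruncate(fd, length) != 0 || ::fsync(fd) != 0) {
        const int err = errno;   // save before close() can overwrite it
        ::close(fd);
        throw std::system_error(err, std::generic_category(), "truncate " + path);
    }
    ::close(fd);
}
```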

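Fault-Injection Testing

Test Scenarios

# 1. Normal run
./build/log_engine_demo --messages 1000

# 2. Repeatedly kill -9 the process at an arbitrary moment,
#    sometimes while a checkpoint is being written
for i in $(seq 1 100); do
    ./build/log_engine_demo --messages 1000 &
    pid=$!
    sleep 1
    kill -9 "$pid" 2>/dev/null
    wait "$pid" 2>/dev/null
done

# 3. Verify that recovery succeeded
./build/log_engine_verify --path ./logs/shard-0.log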
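The kill -9 loop exercises the real binary, but it hits the critical window only by chance. The atomic-rename property can also be checked deterministically in-process by emulating a crash between writing the temporary file and renaming it. This is a minimal sketch with hypothetical helpers (`write_checkpoint`, `read_all`); it uses plain std::ofstream and std::filesystem::rename, so it says nothing about durability across power loss, which needs fsync and a real power-cut test.

```cpp
#include <cassert>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>

namespace fs = std::filesystem;

std::string read_all(const fs::path& p) {
    std::ifstream in(p, std::ios::binary);
    return {std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>()};
}

// Writes `text` via temp file + rename. With crash_before_rename set, only the first
// `torn_bytes` bytes reach the temp file and the rename never happens, emulating
// a kill -9 in the middle of a checkpoint write.
void write_checkpoint(const fs::path& path, const std::string& text,
                      bool crash_before_rename = false, std::size_t torn_bytes = 0) {
    const fs::path tmp = path.string() + ".tmp";
    {
        std::ofstream out(tmp, std::ios::binary | std::ios::trunc);
        out << (crash_before_rename ? text.substr(0, torn_bytes) : text);
    }
    if (crash_before_rename) return;   // "process died" before the rename
    fs::rename(tmp, path);
}
```

After such a simulated crash, the live checkpoint must still hold the previous, complete contents, and the stray .tmp file is harmless because recovery only reads the final name.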

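Test Coverage

Scenario                      Expected behavior
Normal crash recovery         Recover from the checkpoint
Corrupted checkpoint file     Recover from the start of the file
Missing checkpoint file       Recover from the start of the file
Corrupted log tail            Automatically truncate to the last valid record
Corrupted log head            Whole file treated as corrupted; deleted and rebuilt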

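Performance Impact Analysis

Checkpoint Overhead

Configuration      Extra I/O       Performance impact   Recovery speed
Checkpoint off     0               0%                   Slow (full scan)
Checkpoint 60s     ~1KB/minute     < 1%                 Fast (incremental scan)
Checkpoint 10s     ~1KB/10s        ~1%                  Very fast (small increment)

Conclusion: a reasonable checkpoint interval (60s) has very little effect on performance but makes recovery significantly faster.

Recovery Time Comparison

Log size   With checkpoint   Without checkpoint   Speedup
100MB      ~1s               ~10s                 10x
1GB        ~5s               ~100s                20x
10GB       ~30s              ~1000s               33x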

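Configuration Recommendations

High-Performance Scenario

config.checkpoint_enabled = false;
config.ack_mode = AckMode::write_ack;

Characteristics

  • No extra I/O overhead
  • After a crash the log is scanned from the start, so recovery is slow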

Balanced Scenario (Recommended)

config.checkpoint_enabled = true;
config.checkpoint_interval_seconds = 60;
config.checkpoint_max_age_seconds = 3600;
config.ack_mode = AckMode::write_ack;
config.recovery_trailing_capacity = 1024;

Characteristics

  • Performance impact < 1%
  • Fast recovery
  • Balances performance and safety

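High-Safety Scenario

config.checkpoint_enabled = true;
config.checkpoint_interval_seconds = 10;
config.checkpoint_max_age_seconds = 600;
config.ack_mode = AckMode::sync_ack;
config.recovery_trailing_capacity = 4096;

Characteristics

  • Frequent checkpoints keep the stretch of log written since the last checkpoint, and therefore the rescan after a crash, small
  • Strong acknowledgment semantics: sync_ack guarantees data has reached disk
  • Performance impact ~5%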

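Monitoring Metrics

Checkpoint Metrics

#include <seastar/core/metrics.hh>

namespace sm = seastar::metrics;

// _metrics: a seastar::metrics::metric_groups member; its metrics unregister when it is destroyed.
// The counters are std::atomic, so their current values are exposed through lambdas.
_metrics.add_group("log_engine", {
    sm::make_counter("checkpoint_write_successes", [] { return g_checkpoint_write_successes.load(); }),
    sm::make_counter("checkpoint_write_failures", [] { return g_checkpoint_write_failures.load(); }),
    sm::make_counter("recovery_fallbacks", [] { return g_recovery_fallbacks.load(); }),
    sm::make_gauge("last_recovery_fallback_reason", [] { return g_last_recovery_fallback_reason.load(); }),
});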

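Alert Rules

# Checkpoint write failure rate too high
# (use per-window increases of the counters, and guard against zero attempts)
failures  = increase(checkpoint_write_failures, last_hour)
successes = increase(checkpoint_write_successes, last_hour)
if failures / max(failures + successes, 1) > 0.01:
    alert("High checkpoint write failure rate; possible disk problem")

# Frequent fallback to recovery from the start of the file
if increase(recovery_fallbacks, last_hour) > 10:
    alert("Frequent fallback to full-file recovery; checkpoints may be corrupted")

# The most recent fallback was caused by an incomplete or invalid checkpoint
if last_recovery_fallback_reason == incomplete_checkpoint:
    alert("Checkpoint file is incomplete or failed validation")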
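The failure-rate rule is easy to get subtly wrong: raw counters are cumulative since process start, and a plain failures / successes ratio divides by zero when no checkpoint has succeeded. A small sketch of the check, computed from counter deltas over one evaluation window; the function names and the 0.01 default threshold are illustrative.

```cpp
#include <cassert>
#include <cstdint>

// Failure ratio over one window, from counter deltas (value now minus value at
// window start). Returns 0 when no checkpoint was attempted in the window.
double checkpoint_failure_ratio(std::uint64_t failures_delta, std::uint64_t successes_delta) {
    const std::uint64_t attempts = failures_delta + successes_delta;
    return attempts == 0 ? 0.0
                         : static_cast<double>(failures_delta) / static_cast<double>(attempts);
}

bool should_alert_on_checkpoint_failures(std::uint64_t failures_delta,
                                         std::uint64_t successes_delta,
                                         double threshold = 0.01) {
    return checkpoint_failure_ratio(failures_delta, successes_delta) > threshold;
}
```

With a 60s interval, an hour holds about 60 attempts, so even a single failure in that window (1/60, roughly 1.7%) already crosses a 1% threshold; the threshold should be chosen with the interval in mind.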

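Summary

The Checkpoint and Recovery mechanisms keep data consistent in the following ways:

  1. Atomic writes: a temporary file plus rename makes checkpoint updates atomic
  2. Flexible recovery: recovery can start from the checkpoint or from the start of the file
  3. Data validation: CRC checks detect corrupted records
  4. Automatic repair: a corrupted log tail is truncated automatically
  5. Performance: streaming scans keep memory usage low

Best Practices

  • Recommended for production: 60s checkpoints + write_ack
  • Business-critical workloads: 10s checkpoints + sync_ack
  • Exercise the recovery path regularly to make sure data really is recoverable
  • Monitor the checkpoint write failure rate to catch disk problems early

Next: "Rotate and Archive: Log Lifecycle Management"

Related Reading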