数据一致性与恢复模型:Checkpoint 与 Crash Recovery 的实现

深入分析 Seastar Log Engine 的一致性保证机制,包括 Checkpoint 的设计原理、崩溃恢复流程以及数据完整性校验。

一致性保证的挑战

问题场景

在日志系统中,进程可能随时崩溃:

时间线:
T1: 写入消息 1
T2: 写入消息 2
T3: 进程崩溃
T4: 重启恢复

关键问题

  1. 消息 1 和 2 是否真正写入磁盘?
  2. 恢复时从哪里开始读取?
  3. 如何避免数据丢失或重复?

一致性级别

级别定义实现成本
Loose不保证
Soft重启后可恢复大部分
Strong重启后不丢失任何数据

Seastar Log Engine 提供 SoftStrong 之间的可配置级别。

Checkpoint 机制

Checkpoint 的作用

Checkpoint 是一个"检查点",记录系统在某个时刻的状态:

Active Log:
┌─────────────────────────────────────────────┐
│ Msg 1 │ Msg 2 │ Msg 3 │ Msg 4 │ Msg 5       │
│       │       │       │       │           │
└───────┴───────┴───────┴───────┴─────────────┘
         ↑                    ↑
      Checkpoint          写入位置
      (Msg 2 之前已持久化)

Checkpoint 结构

struct Checkpoint {
    uint64_t sequence_number;    // 最后一条已确认的序列号
    uint64_t file_offset;        // 对应的文件偏移量
    std::string shard_id;        // shard 标识
    std::chrono::system_clock::time_point timestamp;

    void serialize(std::ostream& os) const;
    static Checkpoint deserialize(std::istream& is);
};

Checkpoint 写入流程

seastar::future<> AsyncWriter::create_checkpoint() {
    // 1. 确保当前批次落盘
    co_await flush_batch();

    // 2. 获取当前状态
    Checkpoint checkpoint{
        .sequence_number = _last_confirmed_sequence,
        .file_offset = _append_writer.get_file_offset(),
        .shard_id = fmt::format("shard-{}", seastar::this_shard_id()),
        .timestamp = std::chrono::system_clock::now()
    };

    // 3. 写入 checkpoint 文件(原子操作)
    std::string checkpoint_path = fmt::format(
        "{}/shard-{}.checkpoint",
        _log_dir,
        seastar::this_shard_id()
    );

    std::vector<char> checkpoint_data;
    checkpoint.serialize_to(checkpoint_data);

    // 4. 先写入临时文件
    std::string temp_path = checkpoint_path + ".tmp";
    co_await write_file_atomically(temp_path, checkpoint_data);

    // 5. 重命名为正式文件(原子操作)
    co_await seastar::rename_file(temp_path, checkpoint_path);
}

定时 Checkpoint

seastar::future<> AsyncWriter::start(const EngineConfig& config) {
    // ...

    if (config.checkpoint_enabled) {
        _checkpoint_timer.arm_periodic(
            std::chrono::seconds(config.checkpoint_interval_seconds)
        );

        _checkpoint_timer.set_callback([this] {
            create_checkpoint();
        });
    }
}

配置建议

// 平衡性能和安全性
config.checkpoint_interval_seconds = 60;  // 每分钟一次

// 高安全性场景
config.checkpoint_interval_seconds = 10;  // 每10秒一次

// 高性能场景
config.checkpoint_interval_seconds = 300; // 每5分钟一次

崩溃恢复流程

恢复策略

┌─────────────────────────────────────────────┐
│  检查 checkpoint 文件是否存在?              │
└──────────┬──────────────────────────────────┘
           │
     ┌─────┴─────┐
     │           │
    存在        不存在
     │           │
     ↓           ↓
从 checkpoint  从文件头
  位置恢复     开始扫描

从 Checkpoint 恢复

seastar::future<> AsyncWriter::recovery_from_checkpoint() {
    std::string checkpoint_path = fmt::format(
        "{}/shard-{}.checkpoint",
        _log_dir,
        seastar::this_shard_id()
    );

    // 1. 读取 checkpoint
    Checkpoint checkpoint;
    try {
        auto data = co_await read_file(checkpoint_path);
        checkpoint = Checkpoint::deserialize(data);
    } catch (const std::exception& e) {
        // checkpoint 文件损坏,从头开始
        co_await recovery_from_scratch();
        co_return;
    }

    // 2. 验证 checkpoint 有效性
    if (!validate_checkpoint(checkpoint)) {
        co_await recovery_from_scratch();
        co_return;
    }

    // 3. 从 checkpoint 位置开始读取
    std::string log_path = fmt::format(
        "{}/shard-{}.log",
        _log_dir,
        seastar::this_shard_id()
    );

    auto records = co_await read_records_from(
        log_path,
        checkpoint.file_offset
    );

    // 4. 验证记录完整性
    for (const auto& record : records) {
        if (!verify_record_crc(record)) {
            // 检测到损坏记录,截断日志
            co_await truncate_log(log_path, record.offset);
            break;
        }
    }

    // 5. 恢复内部状态
    _last_confirmed_sequence = checkpoint.sequence_number;
    _file_offset = checkpoint.file_offset;
}

从文件头恢复

seastar::future<> AsyncWriter::recovery_from_scratch() {
    std::string log_path = fmt::format(
        "{}/shard-{}.log",
        _log_dir,
        seastar::this_shard_id()
    );

    // 1. 读取所有记录
    auto records = co_await read_all_records(log_path);

    // 2. 扫描并验证
    size_t last_valid_offset = 0;
    for (const auto& record : records) {
        if (verify_record_crc(record)) {
            last_valid_offset = record.offset;
        } else {
            // 检测到损坏记录,截断日志
            co_await truncate_log(log_path, last_valid_offset);
            break;
        }
    }

    // 3. 恢复内部状态
    _last_confirmed_sequence = records.back().sequence_number;
    _file_offset = last_valid_offset;
}

损坏尾部处理

seastar::future<> AsyncWriter::truncate_log(
    const std::string& log_path,
    size_t valid_offset
) {
    // 1. 打开文件
    auto file = co_await seastar::open_file_dma(
        log_path,
        seastar::open_flags::rw
    );

    // 2. 截断到有效位置
    co_await file.truncate(valid_offset);

    // 3. 关闭文件
    co_await file.close();

    // 4. 记录日志
    log_info("Truncated log at offset {}", valid_offset);
}

数据完整性校验

CRC 校验

每条记录都包含 CRC32 校验和:

class RecordCodec {
public:
    static void encode(const LogMessage& message, std::vector<char>& buffer) {
        // ... 编码数据 ...

        // 计算 CRC
        uint32_t crc = compute_crc32(buffer.data(), buffer.size());

        // 追加 CRC
        auto crc_bytes = reinterpret_cast<const char*>(&crc);
        buffer.insert(buffer.end(), crc_bytes, crc_bytes + sizeof(uint32_t));
    }

    static bool verify_crc(const std::vector<char>& buffer) {
        if (buffer.size() < sizeof(uint32_t)) {
            return false;
        }

        // 分离数据和 CRC
        size_t data_size = buffer.size() - sizeof(uint32_t);
        uint32_t expected_crc = *reinterpret_cast<const uint32_t*>(
            buffer.data() + data_size
        );

        // 重新计算 CRC
        uint32_t computed_crc = compute_crc32(buffer.data(), data_size);

        return expected_crc == computed_crc;
    }

private:
    static uint32_t compute_crc32(const char* data, size_t size) {
        boost::crc_32_type crc;
        crc.process_bytes(data, size);
        return crc.checksum();
    }
};

校验流程

读取记录
    ↓
提取数据部分
    ↓
计算 CRC32
    ↓
对比记录中的 CRC32
    ↓
一致 → 记录有效
不一致 → 记录损坏

校验优化策略

策略一:全量校验

// 恢复时全量校验
for (const auto& record : records) {
    if (!verify_record_crc(record)) {
        // 处理损坏记录
        break;
    }
}

优点:安全性最高

缺点:恢复时间长

策略二:抽样校验

// 每隔 100 条校验一次
for (size_t i = 0; i < records.size(); ++i) {
    if (i % 100 == 0) {
        if (!verify_record_crc(records[i])) {
            break;
        }
    }
}

优点:恢复速度快

缺点:可能漏掉损坏记录

策略三:尾部校验(推荐)

// 只校验最后几条记录
size_t check_count = std::min(records.size(), size_t(100));
for (size_t i = records.size() - check_count; i < records.size(); ++i) {
    if (!verify_record_crc(records[i])) {
        break;
    }
}

优点:恢复快 + 安全性高

配置

config.recovery_verify_mode = RecoveryVerifyMode::tail_only;
config.recovery_tail_check_count = 100;

Crash Consistency 语义

语义定义

不同配置下,崩溃后的保证不同:

配置崩溃后丢失崩溃后重复
write_ack + 无 checkpoint≤ 1 批次
write_ack + 有 checkpoint≤ checkpoint 间隔
sync_ack + 无 checkpoint0
sync_ack + 有 checkpoint0

示例

// 配置:write_ack + checkpoint_interval = 60s

时间线:
T0:   进程启动
T10:  Checkpoint A (seq=100)
T30:  写入消息 101-200
T40:  进程崩溃
T45:  进程恢复

结果:
- 消息 1-100 已持久化(Checkpoint A)
- 消息 101-200 可能丢失(最多 100 条)
- 没有重复消息

故障注入测试

测试场景

# 测试脚本:test_fault_injection.sh

# 1. 正常写入
./build/log_engine_demo --messages 1000

# 2. 在写入过程中 kill 进程
kill -9 $(pgrep log_engine_demo)

# 3. 恢复并验证
./build/log_engine_verify --path ./logs/shard-0.log

测试覆盖

场景预期结果
正常写入所有消息写入成功
写入中崩溃从 checkpoint 恢复,最多丢失一个批次
Checkpoint 损坏从文件头恢复
日志尾部损坏自动截断到有效位置
Archive 损坏跳过损坏文件

性能影响

Checkpoint 开销

配置额外 I/O性能影响
Checkpoint 关闭00%
Checkpoint 60s~1KB/min< 1%
Checkpoint 10s~1KB/10s~1%
Checkpoint 1s~1KB/s~5%

结论:合理的 checkpoint 间隔(60s)对性能影响极小。

CRC 校验开销

校验模式CPU 开销恢复时间
无校验0极快
尾部校验(100条)
抽样校验(1%)中等中等
全量校验

推荐:尾部校验(100 条)

配置建议

高性能场景

config.checkpoint_enabled = false;          // 关闭 checkpoint
config.ack_mode = AckMode::write_ack;       // 快速确认
config.recovery_verify_mode = RecoveryVerifyMode::none;

平衡场景(推荐)

config.checkpoint_enabled = true;                                    // 开启 checkpoint
config.checkpoint_interval_seconds = 60;                             // 每分钟一次
config.ack_mode = AckMode::write_ack;                                // 快速确认
config.recovery_verify_mode = RecoveryVerifyMode::tail_only;         // 尾部校验
config.recovery_tail_check_count = 100;                              // 校验最后 100 条

高安全性场景

config.checkpoint_enabled = true;                                    // 开启 checkpoint
config.checkpoint_interval_seconds = 10;                             // 每10秒一次
config.ack_mode = AckMode::sync_ack;                                 // 强确认
config.recovery_verify_mode = RecoveryVerifyMode::full;              // 全量校验

监控指标

恢复相关指标

seastar::metrics::group("log_engine_writer")
    .make_counter("recovery_count", _recovery_count)
    .make_gauge("recovery_time_ms", _recovery_time_ms)
    .make_counter("corrupted_records", _corrupted_records)
    .make_counter("checkpoint_creates", _checkpoint_creates)
    .make_counter("checkpoint_failures", _checkpoint_failures);

告警规则

# 恢复时间过长
if recovery_time_ms > 5000:
    alert("恢复时间过长,可能 checkpoint 文件损坏")

# 损坏记录数异常
if corrupted_records > 10:
    alert("检测到多条损坏记录,可能磁盘故障")

# Checkpoint 创建失败
if checkpoint_failures > 3:
    alert("Checkpoint 创建失败,可能磁盘空间不足")

总结

Seastar Log Engine 的一致性保证机制:

  1. Checkpoint:定期保存系统状态,加速恢复
  2. CRC 校验:检测数据损坏,保证数据完整性
  3. 灵活配置:根据场景选择合适的保证级别
  4. 自动恢复:进程重启后自动恢复到一致状态

最佳实践

  • 生产环境推荐:write_ack + 60s checkpoint + 尾部校验
  • 关键业务场景:sync_ack + 10s checkpoint + 全量校验
  • 定期验证恢复流程,确保数据安全

系列 1 完结

下一步:系列 2 《核心技术实现》

相关阅读