异步批量写入模型:深入 Per-shard Writer 的设计

深入分析 AsyncWriter 的异步批量写入机制,包括批处理触发策略、水位控制、确认语义等核心实现。

写入模型演进

问题场景

在高并发场景下,如果每次日志调用都触发磁盘 I/O:

// 错误的写入方式
for (int i = 0; i < 1000000; ++i) {
    LOG(INFO) << "Message " << i;
    // 每次调用都触发磁盘写入,性能极差
}

性能瓶颈

  1. 系统调用开销:每次 write() 都需要进入内核态
  2. 磁盘寻址成本:磁头移动延迟(HDD)或 I/O 队列延迟(SSD)
  3. 上下文切换:从用户态到内核态的切换

解决方案:异步批量写入

将多个小消息聚合为一个大批次,一次性写入:

// 批量写入方式
std::vector<LogMessage> batch;
for (int i = 0; i < 1000000; ++i) {
    batch.push_back({"Message " + std::to_string(i)});
    if (batch.size() >= 1000) {
        co_await writer.submit_batch(batch);
        batch.clear();
    }
}

性能提升

  • 减少系统调用次数(1M 次变为 1K 次)
  • 提高磁盘顺序写入效率
  • 减少 context switch

AsyncWriter 核心设计

数据结构

class AsyncWriter {
private:
    // 待写入队列
    std::deque<LogMessage> _pending_queue;

    // 内存水位
    size_t _pending_bytes = 0;
    size_t _max_pending_bytes = 100 * 1024 * 1024;  // 100MB
    size_t _pending_bytes_low_watermark = 50 * 1024 * 1024;  // 50MB

    // 批量配置
    size_t _batch_size_bytes = 8192;  // 8KB
    uint64_t _flush_interval_ms = 1;  // 1ms

    // 底层组件
    AppendWriter _append_writer;
    seastar::timer<> _flush_timer;

    // 确认语义
    AckMode _ack_mode = AckMode::write_ack;

    // 指标
    std::atomic<uint64_t> _submitted_messages{0};
    std::atomic<uint64_t> _submitted_bytes{0};
    std::atomic<uint64_t> _backpressure_waits{0};
};

状态流转

┌─────────────┐
│   Idle      │
└──────┬──────┘
       │ submit()
       ↓
┌─────────────┐
│  Accumulate │  ← 消息累积
└──────┬──────┘
       │ batch size or flush interval
       ↓
┌─────────────┐
│  Flushing   │  ← 写入磁盘
└──────┬──────┘
       │ 完成
       ↓
┌─────────────┐
│   Idle      │
└─────────────┘

批量触发策略

策略一:按大小触发

seastar::future<> AsyncWriter::submit(LogMessage message) {
    // 加入队列
    _pending_queue.push_back(std::move(message));
    _pending_bytes += message.payload.size();

    // 检查是否达到批量大小阈值
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
}

触发时机:累积到 _batch_size_bytes 字节时触发

优点:保证批量大小,最大化 I/O 效率

缺点:低流量时延迟较高

策略二:按时间触发

seastar::future<> AsyncWriter::start(const EngineConfig& config) {
    // 启动定时器
    _flush_timer.arm_periodic(
        std::chrono::milliseconds(config.flush_interval_ms)
    );

    _flush_timer.set_callback([this] {
        if (!_pending_queue.empty()) {
            flush_batch();
        }
    });
}

触发时机:每隔 _flush_interval_ms 毫秒触发一次

优点:保证延迟上限

缺点:高流量时可能产生小批次

策略三:组合触发(推荐)

同时使用大小和时间触发:

seastar::future<> AsyncWriter::submit(LogMessage message) {
    _pending_queue.push_back(std::move(message));
    _pending_bytes += message.payload.size();

    // 按大小触发
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
    // 时间触发由定时器处理
}

配置建议

场景batch_size_bytesflush_interval_ms
高吞吐64KB10ms
低延迟4KB1ms
平衡8KB1ms

批量写入实现

flush_batch 流程

seastar::future<> AsyncWriter::flush_batch() {
    if (_pending_queue.empty()) {
        co_return;
    }

    // 1. 移动队列内容
    auto batch = std::exchange(_pending_queue, {});
    auto batch_bytes = std::exchange(_pending_bytes, 0);

    // 2. 编码为二进制格式
    std::vector<char> encoded_buffer;
    encoded_buffer.reserve(batch_bytes);

    for (const auto& message : batch) {
        RecordCodec::encode(message, encoded_buffer);
    }

    // 3. 写入磁盘
    co_await _append_writer.append(encoded_buffer);

    // 4. 根据 ack_mode 决定是否 fsync
    if (_ack_mode == AckMode::sync_ack) {
        co_await _append_writer.flush();
    }

    // 5. 更新指标
    _flushed_batches++;
    _flushed_bytes += encoded_buffer.size();
}

RecordCodec 编码

class RecordCodec {
public:
    static void encode(const LogMessage& message, std::vector<char>& buffer) {
        // 格式:[length][header][payload][crc]

        // 1. 编码 header
        RecordHeader header{
            .timestamp = std::chrono::system_clock::now(),
            .route_key = message.route_key,
            .payload_size = message.payload.size()
        };

        std::vector<char> header_buf;
        encode_header(header, header_buf);

        // 2. 计算总长度
        uint32_t total_length =
            sizeof(uint32_t) +           // length 字段
            header_buf.size() +          // header
            message.payload.size() +     // payload
            sizeof(uint32_t);            // crc

        // 3. 写入 length
        auto length_bytes = reinterpret_cast<const char*>(&total_length);
        buffer.insert(buffer.end(), length_bytes, length_bytes + sizeof(uint32_t));

        // 4. 写入 header
        buffer.insert(buffer.end(), header_buf.begin(), header_buf.end());

        // 5. 写入 payload
        buffer.insert(buffer.end(), message.payload.begin(), message.payload.end());

        // 6. 计算 CRC
        uint32_t crc = compute_crc32(buffer.data(), buffer.size());
        auto crc_bytes = reinterpret_cast<const char*>(&crc);
        buffer.insert(buffer.end(), crc_bytes, crc_bytes + sizeof(uint32_t));
    }
};

编码格式

┌──────────┬──────────┬──────────┬──────────┐
│ Length   │ Header   │ Payload  │ CRC      │
│ 4 bytes  │ N bytes  │ M bytes  │ 4 bytes  │
└──────────┴──────────┴──────────┴──────────┘

水位控制机制

背压(Backpressure)问题

当写入速度 > 磁盘速度时,内存会无限增长:

生产者: 10MB/s
消费者: 8MB/s
结果: 内存每秒增长 2MB

水位控制策略

seastar::future<> AsyncWriter::submit(LogMessage message) {
    // 高水位检查
    while (_pending_bytes >= _max_pending_bytes) {
        _backpressure_waits++;
        co_await seastar::yield();  // 让出 CPU,等待消费
    }

    _pending_queue.push_back(std::move(message));
    _pending_bytes += message.payload.size();

    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
}

水位定义

  • 高水位(High Watermark):_max_pending_bytes
    • 达到此水位时,生产者需要等待
  • 低水位(Low Watermark):_pending_bytes_low_watermark
    • 降至此水位以下时,生产者恢复

水位监控指标

seastar::metrics::group("log_engine_writer")
    .make_gauge("pending_entries", _pending_queue.size())
    .make_gauge("pending_bytes", _pending_bytes)
    .make_counter("backpressure_waits", _backpressure_waits);

监控建议

  • pending_bytes > 80% 高水位:告警
  • backpressure_waits 上升快:可能磁盘性能瓶颈

确认语义

write_ack:快速确认

seastar::future<> AsyncWriter::flush_batch() {
    // ... 编码和写入 ...

    co_await _append_writer.append(encoded_buffer);
    // 立即返回,不等待 fsync
}

特性

  • 提交到 page cache 后立即返回
  • 延迟:~10-50μs
  • 一致性:进程崩溃时可能丢失最多一个批次

适用场景

  • 高吞吐场景
  • 可容忍少量数据丢失
  • 日志作为辅助信息

sync_ack:强一致性

seastar::future<> AsyncWriter::flush_batch() {
    // ... 编码和写入 ...

    co_await _append_writer.append(encoded_buffer);
    co_await _append_writer.flush();  // 额外 fsync
}

特性

  • fsync 到磁盘后返回
  • 延迟:~1-10ms
  • 一致性:保证落盘,不丢失

适用场景

  • 审计日志
  • 重要事件记录
  • 金融交易日志

性能对比

语义延迟吞吐数据安全性
write_ack10-50μs> 1M msg/s进程崩溃可能丢失一个批次
sync_ack1-10ms~100K msg/s强保证,不丢失

配置建议

// 大多数场景使用 write_ack
config.ack_mode = AckMode::write_ack;

// 重要场景使用 sync_ack
config.ack_mode = AckMode::sync_ack;

批量优化技巧

1. submit_many:批量提交

seastar::future<> AsyncWriter::submit_many(std::vector<LogMessage> messages) {
    for (auto& message : messages) {
        _pending_queue.push_back(std::move(message));
        _pending_bytes += message.payload.size();
    }

    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
}

优势

  • 减少 queue 操作次数
  • 一次性触发批量写入

2. 内存预分配

class AsyncWriter {
private:
    std::vector<char> _encode_buffer;  // 复用编码缓冲区

    void ensure_encode_buffer(size_t min_size) {
        if (_encode_buffer.capacity() < min_size) {
            _encode_buffer.reserve(min_size * 2);  // 预留空间
        }
    }
};

效果

  • 减少内存分配次数
  • 提高编码性能

3. 零拷贝优化

seastar::future<> AsyncWriter::submit(LogMessage message) {
    // 使用 std::move 避免拷贝
    _pending_queue.push_back(std::move(message));
    // ...
}

故障处理

写入失败重试

seastar::future<> AsyncWriter::flush_batch() {
    const int MAX_RETRIES = 3;
    int retry_count = 0;

    while (retry_count < MAX_RETRIES) {
        try {
            co_await _append_writer.append(encoded_buffer);
            if (_ack_mode == AckMode::sync_ack) {
                co_await _append_writer.flush();
            }
            break;  // 成功,退出重试
        } catch (const std::exception& e) {
            _flush_errors++;
            retry_count++;

            if (retry_count < MAX_RETRIES) {
                co_await seastar::sleep(std::chrono::milliseconds(100 * retry_count));
            } else {
                // 重试耗尽,记录错误
                log_error("Write failed after {} retries: {}", retry_count, e.what());
                throw;
            }
        }
    }
}

指标监控

seastar::metrics::group("log_engine_writer")
    .make_counter("flush_errors", _flush_errors);

性能基准测试

测试环境

  • CPU: 16 cores
  • Disk: NVMe SSD
  • Message size: 256 bytes

测试结果

批量大小write_ack 吞吐sync_ack 吞吐P99 延迟 (write_ack)
4KB800K msg/s80K msg/s20μs
8KB1.2M msg/s120K msg/s15μs
16KB1.5M msg/s150K msg/s12μs
64KB1.8M msg/s180K msg/s10μs

结论

  • 批量越大,吞吐越高
  • write_acksync_ack 快 10 倍以上
  • 8KB 是性能和延迟的最佳平衡点

总结

AsyncWriter 的异步批量写入模型通过以下机制实现高性能:

  1. 批量累积:减少系统调用和磁盘 I/O 次数
  2. 定时触发:保证延迟上限
  3. 水位控制:防止内存无限增长
  4. 可配置语义:满足不同场景需求

最佳实践

  • 默认配置:8KB 批量大小 + 1ms 定时器 + write_ack
  • 高吞吐场景:增大批量大小到 64KB
  • 低延迟场景:减小批量大小到 4KB
  • 强一致性场景:使用 sync_ack

下一篇:《路由策略与分片机制:从 Hash Modulo 到 Consistent Hashing》

相关阅读