异步批量写入模型:深入 Per-shard Writer 的设计
深入分析 AsyncWriter 的异步批量写入机制,包括批处理触发策略、水位控制、确认语义等核心实现。
写入模型演进
问题场景
在高并发场景下,如果每次日志调用都触发磁盘 I/O:
// 错误的写入方式
for (int i = 0; i < 1000000; ++i) {
LOG(INFO) << "Message " << i;
// 每次调用都触发磁盘写入,性能极差
}
性能瓶颈:
- 系统调用开销:每次
write()都需要进入内核态 - 磁盘寻址成本:磁头移动延迟(HDD)或 I/O 队列延迟(SSD)
- 上下文切换:从用户态到内核态的切换
解决方案:异步批量写入
将多个小消息聚合为一个大批次,一次性写入:
// 批量写入方式
std::vector<LogMessage> batch;
for (int i = 0; i < 1000000; ++i) {
batch.push_back({"Message " + std::to_string(i)});
if (batch.size() >= 1000) {
co_await writer.submit_batch(batch);
batch.clear();
}
}
性能提升:
- 减少系统调用次数(1M 次变为 1K 次)
- 提高磁盘顺序写入效率
- 减少 context switch
AsyncWriter 核心设计
数据结构
class AsyncWriter {
private:
// 待写入队列
std::deque<LogMessage> _pending_queue;
// 内存水位
size_t _pending_bytes = 0;
size_t _max_pending_bytes = 100 * 1024 * 1024; // 100MB
size_t _pending_bytes_low_watermark = 50 * 1024 * 1024; // 50MB
// 批量配置
size_t _batch_size_bytes = 8192; // 8KB
uint64_t _flush_interval_ms = 1; // 1ms
// 底层组件
AppendWriter _append_writer;
seastar::timer<> _flush_timer;
// 确认语义
AckMode _ack_mode = AckMode::write_ack;
// 指标
std::atomic<uint64_t> _submitted_messages{0};
std::atomic<uint64_t> _submitted_bytes{0};
std::atomic<uint64_t> _backpressure_waits{0};
};
状态流转
┌─────────────┐
│ Idle │
└──────┬──────┘
│ submit()
↓
┌─────────────┐
│ Accumulate │ ← 消息累积
└──────┬──────┘
│ batch size or flush interval
↓
┌─────────────┐
│ Flushing │ ← 写入磁盘
└──────┬──────┘
│ 完成
↓
┌─────────────┐
│ Idle │
└─────────────┘
批量触发策略
策略一:按大小触发
seastar::future<> AsyncWriter::submit(LogMessage message) {
// 加入队列
_pending_queue.push_back(std::move(message));
_pending_bytes += message.payload.size();
// 检查是否达到批量大小阈值
if (_pending_bytes >= _batch_size_bytes) {
co_await flush_batch();
}
}
触发时机:累积到 _batch_size_bytes 字节时触发
优点:保证批量大小,最大化 I/O 效率
缺点:低流量时延迟较高
策略二:按时间触发
seastar::future<> AsyncWriter::start(const EngineConfig& config) {
// 启动定时器
_flush_timer.arm_periodic(
std::chrono::milliseconds(config.flush_interval_ms)
);
_flush_timer.set_callback([this] {
if (!_pending_queue.empty()) {
flush_batch();
}
});
}
触发时机:每隔 _flush_interval_ms 毫秒触发一次
优点:保证延迟上限
缺点:高流量时可能产生小批次
策略三:组合触发(推荐)
同时使用大小和时间触发:
seastar::future<> AsyncWriter::submit(LogMessage message) {
_pending_queue.push_back(std::move(message));
_pending_bytes += message.payload.size();
// 按大小触发
if (_pending_bytes >= _batch_size_bytes) {
co_await flush_batch();
}
// 时间触发由定时器处理
}
配置建议:
| 场景 | batch_size_bytes | flush_interval_ms |
|---|---|---|
| 高吞吐 | 64KB | 10ms |
| 低延迟 | 4KB | 1ms |
| 平衡 | 8KB | 1ms |
批量写入实现
flush_batch 流程
seastar::future<> AsyncWriter::flush_batch() {
if (_pending_queue.empty()) {
co_return;
}
// 1. 移动队列内容
auto batch = std::exchange(_pending_queue, {});
auto batch_bytes = std::exchange(_pending_bytes, 0);
// 2. 编码为二进制格式
std::vector<char> encoded_buffer;
encoded_buffer.reserve(batch_bytes);
for (const auto& message : batch) {
RecordCodec::encode(message, encoded_buffer);
}
// 3. 写入磁盘
co_await _append_writer.append(encoded_buffer);
// 4. 根据 ack_mode 决定是否 fsync
if (_ack_mode == AckMode::sync_ack) {
co_await _append_writer.flush();
}
// 5. 更新指标
_flushed_batches++;
_flushed_bytes += encoded_buffer.size();
}
RecordCodec 编码
class RecordCodec {
public:
static void encode(const LogMessage& message, std::vector<char>& buffer) {
// 格式:[length][header][payload][crc]
// 1. 编码 header
RecordHeader header{
.timestamp = std::chrono::system_clock::now(),
.route_key = message.route_key,
.payload_size = message.payload.size()
};
std::vector<char> header_buf;
encode_header(header, header_buf);
// 2. 计算总长度
uint32_t total_length =
sizeof(uint32_t) + // length 字段
header_buf.size() + // header
message.payload.size() + // payload
sizeof(uint32_t); // crc
// 3. 写入 length
auto length_bytes = reinterpret_cast<const char*>(&total_length);
buffer.insert(buffer.end(), length_bytes, length_bytes + sizeof(uint32_t));
// 4. 写入 header
buffer.insert(buffer.end(), header_buf.begin(), header_buf.end());
// 5. 写入 payload
buffer.insert(buffer.end(), message.payload.begin(), message.payload.end());
// 6. 计算 CRC
uint32_t crc = compute_crc32(buffer.data(), buffer.size());
auto crc_bytes = reinterpret_cast<const char*>(&crc);
buffer.insert(buffer.end(), crc_bytes, crc_bytes + sizeof(uint32_t));
}
};
编码格式:
┌──────────┬──────────┬──────────┬──────────┐
│ Length │ Header │ Payload │ CRC │
│ 4 bytes │ N bytes │ M bytes │ 4 bytes │
└──────────┴──────────┴──────────┴──────────┘
水位控制机制
背压(Backpressure)问题
当写入速度 > 磁盘速度时,内存会无限增长:
生产者: 10MB/s
消费者: 8MB/s
结果: 内存每秒增长 2MB
水位控制策略
seastar::future<> AsyncWriter::submit(LogMessage message) {
// 高水位检查
while (_pending_bytes >= _max_pending_bytes) {
_backpressure_waits++;
co_await seastar::yield(); // 让出 CPU,等待消费
}
_pending_queue.push_back(std::move(message));
_pending_bytes += message.payload.size();
if (_pending_bytes >= _batch_size_bytes) {
co_await flush_batch();
}
}
水位定义:
- 高水位(High Watermark):
_max_pending_bytes- 达到此水位时,生产者需要等待
- 低水位(Low Watermark):
_pending_bytes_low_watermark- 降至此水位以下时,生产者恢复
水位监控指标
seastar::metrics::group("log_engine_writer")
.make_gauge("pending_entries", _pending_queue.size())
.make_gauge("pending_bytes", _pending_bytes)
.make_counter("backpressure_waits", _backpressure_waits);
监控建议:
pending_bytes> 80% 高水位:告警backpressure_waits上升快:可能磁盘性能瓶颈
确认语义
write_ack:快速确认
seastar::future<> AsyncWriter::flush_batch() {
// ... 编码和写入 ...
co_await _append_writer.append(encoded_buffer);
// 立即返回,不等待 fsync
}
特性:
- 提交到 page cache 后立即返回
- 延迟:~10-50μs
- 一致性:进程崩溃时可能丢失最多一个批次
适用场景:
- 高吞吐场景
- 可容忍少量数据丢失
- 日志作为辅助信息
sync_ack:强一致性
seastar::future<> AsyncWriter::flush_batch() {
// ... 编码和写入 ...
co_await _append_writer.append(encoded_buffer);
co_await _append_writer.flush(); // 额外 fsync
}
特性:
- fsync 到磁盘后返回
- 延迟:~1-10ms
- 一致性:保证落盘,不丢失
适用场景:
- 审计日志
- 重要事件记录
- 金融交易日志
性能对比
| 语义 | 延迟 | 吞吐 | 数据安全性 |
|---|---|---|---|
| write_ack | 10-50μs | > 1M msg/s | 进程崩溃可能丢失一个批次 |
| sync_ack | 1-10ms | ~100K msg/s | 强保证,不丢失 |
配置建议:
// 大多数场景使用 write_ack
config.ack_mode = AckMode::write_ack;
// 重要场景使用 sync_ack
config.ack_mode = AckMode::sync_ack;
批量优化技巧
1. submit_many:批量提交
seastar::future<> AsyncWriter::submit_many(std::vector<LogMessage> messages) {
for (auto& message : messages) {
_pending_queue.push_back(std::move(message));
_pending_bytes += message.payload.size();
}
if (_pending_bytes >= _batch_size_bytes) {
co_await flush_batch();
}
}
优势:
- 减少 queue 操作次数
- 一次性触发批量写入
2. 内存预分配
class AsyncWriter {
private:
std::vector<char> _encode_buffer; // 复用编码缓冲区
void ensure_encode_buffer(size_t min_size) {
if (_encode_buffer.capacity() < min_size) {
_encode_buffer.reserve(min_size * 2); // 预留空间
}
}
};
效果:
- 减少内存分配次数
- 提高编码性能
3. 零拷贝优化
seastar::future<> AsyncWriter::submit(LogMessage message) {
// 使用 std::move 避免拷贝
_pending_queue.push_back(std::move(message));
// ...
}
故障处理
写入失败重试
seastar::future<> AsyncWriter::flush_batch() {
const int MAX_RETRIES = 3;
int retry_count = 0;
while (retry_count < MAX_RETRIES) {
try {
co_await _append_writer.append(encoded_buffer);
if (_ack_mode == AckMode::sync_ack) {
co_await _append_writer.flush();
}
break; // 成功,退出重试
} catch (const std::exception& e) {
_flush_errors++;
retry_count++;
if (retry_count < MAX_RETRIES) {
co_await seastar::sleep(std::chrono::milliseconds(100 * retry_count));
} else {
// 重试耗尽,记录错误
log_error("Write failed after {} retries: {}", retry_count, e.what());
throw;
}
}
}
}
指标监控
seastar::metrics::group("log_engine_writer")
.make_counter("flush_errors", _flush_errors);
性能基准测试
测试环境
- CPU: 16 cores
- Disk: NVMe SSD
- Message size: 256 bytes
测试结果
| 批量大小 | write_ack 吞吐 | sync_ack 吞吐 | P99 延迟 (write_ack) |
|---|---|---|---|
| 4KB | 800K msg/s | 80K msg/s | 20μs |
| 8KB | 1.2M msg/s | 120K msg/s | 15μs |
| 16KB | 1.5M msg/s | 150K msg/s | 12μs |
| 64KB | 1.8M msg/s | 180K msg/s | 10μs |
结论:
- 批量越大,吞吐越高
write_ack比sync_ack快 10 倍以上- 8KB 是性能和延迟的最佳平衡点
总结
AsyncWriter 的异步批量写入模型通过以下机制实现高性能:
- 批量累积:减少系统调用和磁盘 I/O 次数
- 定时触发:保证延迟上限
- 水位控制:防止内存无限增长
- 可配置语义:满足不同场景需求
最佳实践:
- 默认配置:8KB 批量大小 + 1ms 定时器 + write_ack
- 高吞吐场景:增大批量大小到 64KB
- 低延迟场景:减小批量大小到 4KB
- 强一致性场景:使用 sync_ack
下一篇:《路由策略与分片机制:从 Hash Modulo 到 Consistent Hashing》
相关阅读: