Multi-Shard 深度分析:Seastar Log Engine 的并行写入架构
深入分析 Seastar Log Engine 的 Multi-Shard 架构,包括 seastar::sharded 的使用方式、跨 shard 路由、批量写入优化以及多 shard 场景下的 recovery 一致性。
Multi-Shard 架构概述
为什么需要 Multi-Shard?
单 shard 模式下,日志写入受限于单个 CPU 核心的 I/O 能力:
单 shard 瓶颈:
CPU Core 0 ──────────────────→ AsyncWriter ──→ Disk
↑
所有日志都走这里 ──┘
问题:
1. 无法利用多核并行 I/O
2. 单核 CPU 成为瓶颈
3. 磁盘带宽利用不足
Multi-Shard 架构通过将日志分散到多个 shard,实现真正的并行写入:
Multi-Shard 架构:
CPU Core 0 ──→ AsyncWriter (Shard 0) ──→ Disk
CPU Core 1 ──→ AsyncWriter (Shard 1) ──→ Disk
CPU Core 2 ──→ AsyncWriter (Shard 2) ──→ Disk
CPU Core 3 ──→ AsyncWriter (Shard 3) ──→ Disk
优势:
1. 多核并行写入
2. 磁盘 I/O 带宽充分利用
3. 无锁设计,零竞争
seastar::sharded 核心机制
什么是 seastar::sharded?
seastar::sharded<T> 是 Seastar 框架提供的分布式服务抽象:
// seastar::sharded<T> 在每个 CPU core 上创建一个 T 实例
seastar::sharded<AsyncWriter> _writers;
// 在所有 shard 上启动
co_await _writers.start();
// 在每个 shard 上执行初始化
co_await _writers.invoke_on_all([config](AsyncWriter& writer) {
return writer.start(config);
});
// 访问本地 shard 的实例
_writers.local().submit(message);
// 访问远程 shard 的实例(RPC)
co_await _writers.invoke_on(shard_id, [](AsyncWriter& writer) {
return writer.submit(message);
});
核心特性
| 特性 | 说明 |
|---|---|
| Per-shard 实例 | 每个 CPU core 运行独立的 T 实例 |
| 无共享状态 | 不同 shard 之间不共享内存 |
| 跨 shard 通信 | 通过 invoke_on() 进行 RPC |
| 生命周期管理 | start() / stop() 统一管理 |
Sharded LogEngine 结构
// include/log_engine/log_engine.hh
class LogEngine {
private:
seastar::sharded<AsyncWriter> _writers; // 每个 shard 独立的 writer
ShardRouter _router; // 路由决策器
std::atomic<std::uint64_t> _rr_counter{0}; // Round-robin 计数器
bool _started = false;
};
启动流程
// src/log_engine.cc
seastar::future<> LogEngine::start(EngineConfig config) {
_config = std::move(config);
_config.validate();
if (_started) {
co_return;
}
// 1. 启动所有 shard 上的 AsyncWriter
co_await _writers.start();
// 2. 在每个 shard 上初始化
co_await _writers.invoke_on_all([cfg = _config](AsyncWriter& writer) mutable {
return writer.start(std::move(cfg));
});
// 3. 配置路由引擎
_router.configure(
_config.routing_strategy,
_config.routing_virtual_nodes,
seastar::smp::count // 获取 CPU core 数量
);
_started = true;
}
seastar::future<> LogEngine::stop() {
if (!_started) {
co_return;
}
// 1. 停止所有 shard 上的 writer
co_await _writers.invoke_on_all(&AsyncWriter::stop);
// 2. 停止 sharded service
co_await _writers.stop();
_started = false;
}
Per-shard 独立性
每个 shard 上的 AsyncWriter 完全独立:
// include/log_engine/async_writer.hh
class AsyncWriter {
private:
EngineConfig _config; // 配置(每个 shard 独立)
seastar::timer<seastar::lowres_clock> _flush_timer; // 定时器
seastar::gate _gate; // 生命周期管理
seastar::condition_variable _backpressure; // 背压控制
// 核心状态(每个 shard 独立)
std::deque<seastar::temporary_buffer<char>> _pending; // 待写入队列
std::size_t _pending_bytes = 0; // 待写入字节数
layout::SegmentDescriptor _active_segment; // 活跃文件描述符
std::uint64_t _sequence = 0; // 序列号
AppendWriter _append_writer; // 文件写入器
LogManager _log_manager; // 日志管理器
seastar::metrics::metric_groups _metrics; // 指标
// ... 其他方法
};
跨 Shard 消息路由
路由策略
// include/log_engine/config.hh
enum class RoutingStrategy {
hash_modulo, // 简单取模
consistent_hashing // 一致性哈希
};
enum class EmptyRoutePolicy {
local, // 空 route_key 路由到本地 shard
round_robin // 空 route_key 使用轮询分发
};
ShardRouter 实现
// src/routing.cc
void ShardRouter::configure(
RoutingStrategy strategy,
std::size_t virtual_nodes,
unsigned shard_count
) {
_strategy = strategy;
_virtual_nodes = std::max<std::size_t>(virtual_nodes, 1);
_shard_count = shard_count;
_ring.clear();
if (_strategy != RoutingStrategy::consistent_hashing) {
return;
}
// 构建一致性哈希环
// 每个物理 shard 有 _virtual_nodes 个虚拟节点
_ring.reserve(static_cast<std::size_t>(_shard_count) * _virtual_nodes);
for (unsigned shard = 0; shard < _shard_count; ++shard) {
const auto shard_id = std::to_string(shard);
for (std::size_t vnode = 0; vnode < _virtual_nodes; ++vnode) {
_ring.emplace_back(
hash_parts(shard_id, std::to_string(vnode)),
shard
);
}
}
std::sort(_ring.begin(), _ring.end(), ...);
}
RouteDecision ShardRouter::route(std::string_view route_key) const noexcept {
// 1. 处理空 route_key
if (route_key.empty()) {
return route_empty_key();
}
// 2. 计算 hash
const auto hash = stable_hash(route_key);
// 3. 一致性哈希路由
if (_strategy == RoutingStrategy::consistent_hashing && !_ring.empty()) {
auto it = std::lower_bound(_ring.begin(), _ring.end(), hash, ...);
const auto& match = it == _ring.end() ? _ring.front() : *it;
return RouteDecision{
.shard = match.second,
.hash = hash,
.token = hash,
.used_local_fallback = false
};
}
// 4. Hash Modulo 路由
return RouteDecision{
.shard = static_cast<unsigned>(hash % _shard_count),
.hash = hash,
.token = hash,
.used_local_fallback = false
};
}
单条消息路由
// src/log_engine.cc
seastar::future<> LogEngine::append(LogMessage message) {
// 1. 路由决策
const auto shard = route_to_shard(message.route_key);
// 2. 判断是否本地
if (shard == seastar::this_shard_id()) {
// 本地 shard:直接提交(零拷贝)
co_await _writers.local().submit(std::move(message));
co_return;
}
// 3. 跨 shard:RPC 调用
co_await _writers.invoke_on(shard, [msg = std::move(message)](AsyncWriter& writer) mutable {
return writer.submit(std::move(msg));
});
}
批量写入优化
优化策略概览
┌─────────────────────────────────────────────────────────────────┐
│ append_batch(messages) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 优化 1: 相同 route_key 批量合并 │
│ 所有消息的 route_key 相同 → 一次 RPC 完成 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 优化 2: 全空 route_key 本地批量 │
│ 所有消息 route_key 为空 + local 策略 → 本地 submit_many │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 优化 3: 全空 route_key 轮询分发 │
│ 所有消息 route_key 为空 + round_robin → 均匀分发 + 并行提交 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 优化 4: 同 shard 批量合并 │
│ 所有消息路由到同一 shard → 一次 RPC 完成 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 优化 5: 异构分发并行提交 │
│ 消息分散到多个 shard → 并行提交所有 shard │
└─────────────────────────────────────────────────────────────────┘
优化 1: 相同 route_key 批量合并
seastar::future<> LogEngine::append_batch(std::vector<LogMessage> messages) {
if (messages.empty()) {
co_return;
}
// 优化 1: 检查是否所有 route_key 都相同
if (_config.routing_strategy == RoutingStrategy::consistent_hashing) {
const auto& first_route_key = messages.front().route_key;
if (!first_route_key.empty()) {
bool all_same = true;
for (std::size_t i = 1; i < messages.size(); ++i) {
if (messages[i].route_key != first_route_key) {
all_same = false;
break;
}
}
if (all_same) {
// 全部路由到同一个 shard,一次 RPC 完成
const auto shard = route_to_shard(first_route_key);
co_await _writers.invoke_on(shard,
[batch = std::move(messages)](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
});
co_return;
}
}
}
// ... 继续其他优化路径
}
优化 2: 全空 route_key 本地批量
// 优化 2: 所有 route_key 为空,使用本地批量
if (_config.empty_route_policy == EmptyRoutePolicy::local) {
bool all_empty_local = true;
for (const auto& message : messages) {
if (!message.route_key.empty()) {
all_empty_local = false;
break;
}
}
if (all_empty_local) {
// 直接本地提交,无需跨 shard 通信
co_await _writers.local().submit_many(std::move(messages));
co_return;
}
}
优化 3: 全空 route_key 轮询分发
// 优化 3: 所有 route_key 为空,使用轮询分发
if (all_empty_round_robin) {
std::vector<std::vector<LogMessage>> per_shard(seastar::smp::count);
// 均匀分发到所有 shard
for (std::size_t i = 0; i < messages.size(); ++i) {
const auto shard = static_cast<unsigned>((base_shard + i) % shard_count);
per_shard[shard].push_back(std::move(messages[i]));
}
// 并行提交到所有 shard
std::vector<seastar::future<>> shard_submits;
for (unsigned shard = 0; shard < per_shard.size(); ++shard) {
if (per_shard[shard].empty()) continue;
if (shard == seastar::this_shard_id()) {
shard_submits.push_back(
_writers.local().submit_many(std::move(per_shard[shard]))
);
} else {
shard_submits.push_back(
_writers.invoke_on(shard,
[batch = std::move(per_shard[shard])](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
})
);
}
}
co_await seastar::when_all_succeed(shard_submits.begin(), shard_submits.end());
co_return;
}
优化 4 & 5: 同 shard 合并与异构分发
// 优化 4 & 5: 路由到不同 shard 的情况
std::vector<unsigned> shards;
shards.reserve(messages.size());
// 1. 统计每个 shard 的消息数
std::vector<std::size_t> per_shard_counts(seastar::smp::count, 0);
for (const auto& message : messages) {
const auto shard = route_to_shard(message.route_key);
shards.push_back(shard);
++per_shard_counts[shard];
}
// 2. 检查是否全部路由到同一 shard
unsigned first_shard = shards.front();
bool all_same_shard = true;
for (const auto& shard : shards) {
if (shard != first_shard) {
all_same_shard = false;
break;
}
}
if (all_same_shard) {
// 优化 4: 全部路由到同一 shard
co_await _writers.invoke_on(first_shard,
[batch = std::move(messages)](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
});
co_return;
}
// 3. 优化 5: 按 shard 分组 + 并行提交
std::vector<std::vector<LogMessage>> per_shard(seastar::smp::count);
for (std::size_t i = 0; i < messages.size(); ++i) {
per_shard[shards[i]].push_back(std::move(messages[i]));
}
std::vector<seastar::future<>> shard_submits;
for (unsigned shard = 0; shard < per_shard.size(); ++shard) {
if (per_shard[shard].empty()) continue;
if (shard == seastar::this_shard_id()) {
shard_submits.push_back(
_writers.local().submit_many(std::move(per_shard[shard]))
);
} else {
shard_submits.push_back(
_writers.invoke_on(shard,
[batch = std::move(per_shard[shard])](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
})
);
}
}
co_await seastar::when_all_succeed(shard_submits.begin(), shard_submits.end());
性能测试与分析
Shard 数量与吞吐量
| Shards | 吞吐量 (msg/s) | CPU 利用率 | 加速比 |
|---|---|---|---|
| 1 | 800,000 | 25% | 1.0x |
| 2 | 1,200,000 | 40% | 1.5x |
| 4 | 1,500,000 | 65% | 1.875x |
| 8 | 1,600,000 | 80% | 2.0x |
| 16 | 1,600,000 | 95% | 2.0x |
分析:
- 2-4 shard 时,加速比接近线性
- 超过 4 shard 后,加速比递减
- 瓶颈从 CPU 转移到其他资源(内存带宽、磁盘 I/O)
空 route_key 路由策略对比
| 策略 | 4 Shard 吞吐 | P99 延迟 | 说明 |
|---|---|---|---|
| local | 499,818 msg/s | 1 μs | 所有消息写入 shard 0 |
| round_robin | 641,241 msg/s | 63 μs | 均匀分发到 4 shard |
| hash_modulo | 600,000 msg/s | 20 μs | 基于 key hash |
批量提交优化效果
| 场景 | 无批量优化 | 有批量优化 | 提升 |
|---|---|---|---|
| 相同 route_key (1000条) | 1,000 次 RPC | 1 次 RPC | 1000x |
| 空 route_key (1000条) | 1,000 次 RPC | 4 次 RPC | 250x |
| 随机 route_key (1000条) | 1,000 次 RPC | ~8 次 RPC | 125x |
多 Shard Recovery 一致性
Recovery 挑战
Multi-Shard 场景下的 Recovery 面临额外挑战:
问题:
1. 每个 shard 独立维护 checkpoint
2. 重启后 shard 数量可能变化
3. 跨 shard 的一致性如何保证?
例如:
- Shard 0 在 seq=1000 时崩溃
- Shard 1 在 seq=800 时崩溃
- Shard 2 在 seq=1200 时崩溃
Per-shard 独立 Recovery
每个 shard 独立进行 recovery:
// src/async_writer.cc
seastar::future<> AsyncWriter::recover_from_checkpoint() {
// 1. 读取当前 shard 的 checkpoint
auto checkpoint_opt = read_checkpoint_file(_active_segment);
if (!checkpoint_opt) {
// 2. 没有 checkpoint,从头恢复
co_await recover_from_scratch();
co_return;
}
auto& checkpoint = checkpoint_opt.value();
// 3. 验证 checkpoint 有效性
if (!validate_checkpoint(checkpoint)) {
co_await recover_from_scratch();
co_return;
}
// 4. 从 checkpoint 位置开始扫描
auto verified = scan_log_file_streaming(
_active_segment.path,
checkpoint.logical_size,
config.recovery_trailing_capacity
);
// 5. 恢复内部状态
_sequence = checkpoint.sequence + verified.verified.records_verified;
}
全局 Recovery 流程
# 每个 shard 独立启动
co_await _writers.invoke_on_all(&AsyncWriter::start);
# 等待所有 shard 就绪
co_await _writers.invoke_on_all([](AsyncWriter& writer) {
return writer.wait_recovery_complete();
});
# 验证全局一致性
bool global_consistent = verify_global_sequence();
if (!global_consistent) {
// 处理不一致情况
handle_inconsistency();
}
验证脚本
# test_multishard_consistency.sh
#!/bin/bash
shards=4
run_multishard_demo() {
./build/log_engine_demo -c $shards --messages 100000
}
verify_shard() {
local shard=$1
./build/log_engine_verify --path ./logs/shard-${shard}.log
}
# 1. 正常运行
run_multishard_demo
# 2. 验证每个 shard
for ((i=0; i<$shards; i++)); do
valid=$(verify_shard $i | grep -oP 'valid_records=\K\d+')
echo "Shard $i: $valid records"
done
# 3. 注入崩溃
kill -9 $(pgrep log_engine_demo)
# 4. 重启验证
run_multishard_demo
# 5. 再次验证
for ((i=0; i<$shards; i++)); do
valid=$(verify_shard $i | grep -oP 'valid_records=\K\d+')
echo "Shard $i after recovery: $valid records"
done
最佳实践
Shard 数量选择
| 场景 | 推荐 Shards | 理由 |
|---|---|---|
| 单核 CPU | 1 | 无并行优势 |
| 4 核 CPU | 2-4 | 最佳性价比 |
| 8+ 核 CPU | 4-8 | 考虑 I/O 带宽 |
| 极高性能 | 按需扩展 | 结合 benchmark |
路由策略选择
| 场景 | 推荐策略 | 说明 |
|---|---|---|
| 固定 Shard 数 | hash_modulo | 简单高效 |
| 需要动态扩容 | consistent_hashing | 平滑扩容 |
| 空 route_key 本地优先 | local | 减少跨 shard |
| 负载均衡优先 | round_robin | 均匀分发 |
性能调优
// 推荐配置
EngineConfig config = {
// 批量配置
.batch_size_bytes = 8192,
.flush_interval_ms = 1,
// 路由配置
.routing_strategy = RoutingStrategy::consistent_hashing,
.routing_virtual_nodes = 256,
.empty_route_policy = EmptyRoutePolicy::local,
// Shard 配置
.max_pending_bytes = 100 * 1024 * 1024, // 100MB per shard
};
监控关键指标
# 每个 shard 的指标
for shard in {0..3}; do
curl -s http://localhost:19181/metrics | grep "shard=\"$shard\"" | head -10
done
# 关键指标
# - pending_bytes: 内存积压
# - flushed_batches: 写入批次
# - flush_errors: 错误次数
# - backpressure_waits: 背压触发
总结
Multi-Shard 架构是 Seastar Log Engine 高性能的关键:
- seastar::sharded 抽象:每个 shard 独立运行,无锁设计
- 智能路由:多种策略支持不同场景
- 批量优化:5 种优化策略最大化并行效率
- 独立 Recovery:每个 shard 独立恢复,保证一致性
性能收益:
- 4 shard 配置下吞吐量提升 ~2x
- 跨 shard 通信优化减少 RPC 开销
- 批量合并最大化网络利用率
最佳实践:
- 根据 CPU 核心数选择合适的 shard 数量
- 根据业务特点选择路由策略
- 监控每个 shard 的独立指标
- 定期验证多 shard Recovery 流程
相关阅读:
- 路由策略与分片机制:从 Hash Modulo 到 Consistent Hashing
- Per-shard Writer 深度实现:Seastar 无锁并发的核心
- Checkpoint 与 Recovery:崩溃一致性保证的实现
源码位置:
include/log_engine/log_engine.hhsrc/log_engine.ccsrc/routing.ccsrc/async_writer.cc