路由策略与分片机制:从 Hash Modulo 到 Consistent Hashing
深入分析 Seastar Log Engine 的路由策略,包括 Hash Modulo 和 Consistent Hashing 的实现原理、性能对比和适用场景。
路由的本质
问题场景
在分布式系统中,如何将日志消息分配到不同的 shard?
消息1: route_key="user:123" → 应该写入哪个 shard?
消息2: route_key="user:456" → 应该写入哪个 shard?
消息3: route_key="order:789" → 应该写入哪个 shard?
路由目标
- 负载均衡:每个 shard 的写入量大致相等
- 数据局部性:相同 route_key 的消息写入同一个 shard
- 扩展性:增删 shard 时影响最小
Hash Modulo:简单取模
算法原理
size_t hash_modulo_route(const std::string& key, size_t shard_count) {
if (key.empty()) {
return seastar::this_shard_id(); // 空键回退本地
}
std::hash<std::string> hasher;
size_t hash_value = hasher(key);
return hash_value % shard_count;
}
示例
size_t shard_count = 4;
hash_modulo_route("user:123", shard_count) // → 2
hash_modulo_route("user:456", shard_count) // → 1
hash_modulo_route("order:789", shard_count) // → 3
优点
- 实现简单:一行代码即可
- 性能极高:只有一次 hash 操作
- 内存占用小:不需要额外数据结构
缺点:扩容问题
当 shard 数量从 4 变为 6 时:
原路由:
"user:123" → shard 2 (hash % 4)
扩容后:
"user:123" → shard 5 (hash % 6)
结果:所有消息需要重新路由!
影响:
- 需要移动大量数据
- 查询索引失效
- 运维成本高
适用场景
- 固定规模:shard 数量基本不变
- 小规模系统:< 10 个 shard
- 快速原型:开发阶段验证功能
Consistent Hashing:一致性哈希
算法原理
一致性哈希通过虚拟节点实现平滑扩容:
物理环:
shard 0 ────────────────── shard 1
| |
| |
shard 3 ────────────────── shard 2
虚拟节点映射:
虚拟节点1, 3, 5 → shard 0
虚拟节点2, 4, 6 → shard 1
...
核心实现
class ConsistentHashRouter {
public:
void configure(size_t shard_count, size_t virtual_nodes) {
_shard_count = shard_count;
_virtual_nodes = virtual_nodes;
setup_hash_ring();
}
size_t route(const std::string& key) const {
if (key.empty()) {
return seastar::this_shard_id();
}
size_t hash_value = std::hash<std::string>{}(key);
// 在哈希环上找到第一个 >= hash_value 的节点
auto it = _hash_ring.lower_bound(hash_value);
if (it == _hash_ring.end()) {
// 环绕到第一个节点
return _hash_ring.begin()->second;
}
return it->second;
}
private:
void setup_hash_ring() {
_hash_ring.clear();
for (size_t shard = 0; shard < _shard_count; ++shard) {
for (size_t vnode = 0; vnode < _virtual_nodes; ++vnode) {
// 虚拟节点名:shard:vnode
std::string vnode_key = fmt::format("{}:{}", shard, vnode);
size_t vnode_hash = std::hash<std::string>{}(vnode_key);
_hash_ring[vnode_hash] = shard;
}
}
}
private:
size_t _shard_count;
size_t _virtual_nodes;
std::map<size_t, size_t> _hash_ring; // hash_value → shard
};
示例
// 初始化:4 个 shard,每个 shard 256 个虚拟节点
router.configure(4, 256);
// 路由
router.route("user:123") // → shard 2
router.route("user:456") // → shard 1
// 扩容到 6 个 shard
router.configure(6, 256);
// 路由:大部分 key 路由不变
router.route("user:123") // → shard 2 (不变!)
router.route("user:456") // → shard 1 (不变!)
扩容影响分析
假设原来有 4 个 shard,现在扩容到 6 个 shard:
每个 shard 负责的 key 数量:
扩容前:每个 shard 25%
扩容后:每个 shard 16.7%
数据移动:
shard 0 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 1 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 2 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 3 → 需要移动 (25% - 16.7%) = 8.3% 的数据
总移动量:4 × 8.3% = 33.3%
对比 Hash Modulo:
总移动量:100%(所有 key 重新路由)
虚拟节点数量选择
| 虚拟节点数 | 负载均衡度 | 内存占用 | 计算开销 |
|---|---|---|---|
| 16 | 差 | 极小 | 极低 |
| 64 | 中等 | 小 | 低 |
| 256 | 好 | 中等 | 中等 |
| 1024 | 很好 | 大 | 高 |
推荐配置:
// 默认推荐
config.routing_virtual_nodes = 256;
// 小规模系统(< 10 shards)
config.routing_virtual_nodes = 64;
// 大规模系统(> 100 shards)
config.routing_virtual_nodes = 512;
Seastar Log Engine 的路由实现
RoutingEngine 类
class RoutingEngine {
public:
enum class Strategy {
hash_modulo,
consistent_hashing
};
enum class EmptyRoutePolicy {
local, // 空键写入当前 shard
round_robin // 空键轮询所有 shard
};
void configure(Strategy strategy, size_t virtual_nodes, size_t shard_count);
void set_empty_route_policy(EmptyRoutePolicy policy);
size_t route(const std::string& key) const;
private:
size_t route_hash_modulo(const std::string& key) const;
size_t route_consistent_hashing(const std::string& key) const;
size_t route_empty_key() const;
private:
Strategy _strategy = Strategy::hash_modulo;
size_t _virtual_nodes = 256;
size_t _shard_count = 1;
EmptyRoutePolicy _empty_route_policy = EmptyRoutePolicy::local;
std::map<size_t, size_t> _hash_ring;
std::atomic<size_t> _round_robin_counter{0};
};
路由策略选择
size_t RoutingEngine::route(const std::string& key) const {
if (key.empty()) {
return route_empty_key();
}
switch (_strategy) {
case Strategy::hash_modulo:
return route_hash_modulo(key);
case Strategy::consistent_hashing:
return route_consistent_hashing(key);
default:
return route_hash_modulo(key);
}
}
空 route_key 处理
策略一:local(默认)
size_t RoutingEngine::route_empty_key() const {
if (_empty_route_policy == EmptyRoutePolicy::local) {
return seastar::this_shard_id();
}
// ...
}
特性:
- 空键写入当前 shard
- 避免跨 shard 通信
- 适合单机多场景
策略二:round_robin
size_t RoutingEngine::route_empty_key() const {
if (_empty_route_policy == EmptyRoutePolicy::round_robin) {
size_t counter = _round_robin_counter.fetch_add(1);
return counter % _shard_count;
}
return seastar::this_shard_id();
}
特性:
- 空键轮询所有 shard
- 负载更均衡
- 可能跨 shard 通信
批量写入的路由优化
问题
批量写入时,如果消息属于不同 shard,需要多次跨 shard 通信:
// 低效方式
for (const auto& msg : messages) {
auto shard = router.route(msg.route_key);
co_await writers.invoke_on(shard, [msg](AsyncWriter& writer) {
return writer.submit(msg);
});
// 每次消息都跨 shard 通信!
}
优化一:按 shard 分组
seastar::future<> LogEngine::append_batch(std::vector<LogMessage> messages) {
// 按 shard 分组
std::map<size_t, std::vector<LogMessage>> grouped;
for (auto& msg : messages) {
auto shard = router.route(msg.route_key);
grouped[shard].push_back(std::move(msg));
}
// 每个 shard 一次性提交
std::vector<seastar::future<>> futures;
for (auto& [shard, batch] : grouped) {
futures.push_back(
_writers.invoke_on(shard, [batch = std::move(batch)](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
})
);
}
co_await seastar::when_all_succeed(futures.begin(), futures.end());
}
效果:
- 跨 shard 通信次数:N → 分组数(通常 << N)
- 批量写入效率提升
优化二:单 shard 快路径
seastar::future<> LogEngine::append_batch(std::vector<LogMessage> messages) {
if (messages.empty()) {
co_return;
}
// 检查是否所有消息都指向同一个 shard
const auto& first_key = messages[0].route_key;
bool all_same_shard = true;
size_t target_shard = router.route(first_key);
for (size_t i = 1; i < messages.size(); ++i) {
if (router.route(messages[i].route_key) != target_shard) {
all_same_shard = false;
break;
}
}
// 快路径:所有消息指向同一个 shard
if (all_same_shard) {
if (target_shard == seastar::this_shard_id()) {
co_await _writers.local().submit_many(std::move(messages));
} else {
co_await _writers.invoke_on(target_shard, [batch = std::move(messages)](AsyncWriter& writer) mutable {
return writer.submit_many(std::move(batch));
});
}
co_return;
}
// 慢路径:按 shard 分组
// ...
}
性能对比
测试环境
- Shard 数:4
- 消息数:1,000,000
- 消息大小:256 bytes
测试结果
| 路由策略 | 吞吐量 | P99 延迟 | CPU 开销 |
|---|---|---|---|
| Hash Modulo | 1.8M msg/s | 12μs | 低 |
| Consistent Hashing (256 vnode) | 1.6M msg/s | 15μs | 中等 |
分析:
- Hash Modulo 性能略好,但差异不大
- Consistent Hashing 的内存和计算开销可以接受
使用建议
选择 Hash Modulo 的场景
- 固定规模系统:shard 数量长期不变
- 性能优先:追求极致性能
- 简单场景:不需要复杂的路由逻辑
config.routing_strategy = RoutingStrategy::hash_modulo;
选择 Consistent Hashing 的场景
- 动态扩容:预期会增删 shard
- 生产环境:需要平滑升级
- 多数据中心:shard 分布在不同机器
config.routing_strategy = RoutingStrategy::consistent_hashing;
config.routing_virtual_nodes = 256;
空 route_key 处理建议
// 单机场景:使用 local
config.empty_route_policy = EmptyRoutePolicy::local;
// 分布式场景:使用 round_robin
config.empty_route_policy = EmptyRoutePolicy::round_robin;
监控指标
路由均衡度
class RoutingEngine {
public:
std::map<size_t, size_t> get_route_stats() const {
std::map<size_t, size_t> stats;
for (size_t i = 0; i < _shard_count; ++i) {
stats[i] = _route_counts[i].load();
}
return stats;
}
private:
std::vector<std::atomic<uint64_t>> _route_counts;
};
告警规则:
# 任何 shard 的负载超过平均值的 2 倍
if max(route_counts) > 2 * avg(route_counts):
alert("路由不均衡!")
总结
路由策略的选择是性能和灵活性的权衡:
| 维度 | Hash Modulo | Consistent Hashing |
|---|---|---|
| 性能 | 最高 | 略低 |
| 扩容成本 | 高(100% 数据迁移) | 低(~33% 数据迁移) |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 固定规模 | 动态扩容 |
推荐配置:
// 生产环境默认推荐
config.routing_strategy = RoutingStrategy::consistent_hashing;
config.routing_virtual_nodes = 256;
config.empty_route_policy = EmptyRoutePolicy::local;
下一篇:《数据一致性与恢复模型:Checkpoint 与 Crash Recovery 的实现》
相关阅读: