路由策略与分片机制:从 Hash Modulo 到 Consistent Hashing

深入分析 Seastar Log Engine 的路由策略,包括 Hash Modulo 和 Consistent Hashing 的实现原理、性能对比和适用场景。

路由的本质

问题场景

在分布式系统中,如何将日志消息分配到不同的 shard?

消息1: route_key="user:123" → 应该写入哪个 shard?
消息2: route_key="user:456" → 应该写入哪个 shard?
消息3: route_key="order:789" → 应该写入哪个 shard?

路由目标

  1. 负载均衡:每个 shard 的写入量大致相等
  2. 数据局部性:相同 route_key 的消息写入同一个 shard
  3. 扩展性:增删 shard 时影响最小

Hash Modulo:简单取模

算法原理

size_t hash_modulo_route(const std::string& key, size_t shard_count) {
    if (key.empty()) {
        return seastar::this_shard_id();  // 空键回退本地
    }

    std::hash<std::string> hasher;
    size_t hash_value = hasher(key);
    return hash_value % shard_count;
}

示例

size_t shard_count = 4;

hash_modulo_route("user:123", shard_count)  // → 2
hash_modulo_route("user:456", shard_count)  // → 1
hash_modulo_route("order:789", shard_count) // → 3

优点

  1. 实现简单:一行代码即可
  2. 性能极高:只有一次 hash 操作
  3. 内存占用小:不需要额外数据结构

缺点:扩容问题

当 shard 数量从 4 变为 6 时:

原路由:
"user:123" → shard 2 (hash % 4)

扩容后:
"user:123" → shard 5 (hash % 6)

结果:所有消息需要重新路由!

影响

  • 需要移动大量数据
  • 查询索引失效
  • 运维成本高

适用场景

  • 固定规模:shard 数量基本不变
  • 小规模系统:< 10 个 shard
  • 快速原型:开发阶段验证功能

Consistent Hashing:一致性哈希

算法原理

一致性哈希通过虚拟节点实现平滑扩容:

物理环:
shard 0 ────────────────── shard 1
    |                       |
    |                       |
shard 3 ────────────────── shard 2

虚拟节点映射:
虚拟节点1, 3, 5 → shard 0
虚拟节点2, 4, 6 → shard 1
...

核心实现

class ConsistentHashRouter {
public:
    void configure(size_t shard_count, size_t virtual_nodes) {
        _shard_count = shard_count;
        _virtual_nodes = virtual_nodes;

        setup_hash_ring();
    }

    size_t route(const std::string& key) const {
        if (key.empty()) {
            return seastar::this_shard_id();
        }

        size_t hash_value = std::hash<std::string>{}(key);

        // 在哈希环上找到第一个 >= hash_value 的节点
        auto it = _hash_ring.lower_bound(hash_value);

        if (it == _hash_ring.end()) {
            // 环绕到第一个节点
            return _hash_ring.begin()->second;
        }

        return it->second;
    }

private:
    void setup_hash_ring() {
        _hash_ring.clear();

        for (size_t shard = 0; shard < _shard_count; ++shard) {
            for (size_t vnode = 0; vnode < _virtual_nodes; ++vnode) {
                // 虚拟节点名:shard:vnode
                std::string vnode_key = fmt::format("{}:{}", shard, vnode);
                size_t vnode_hash = std::hash<std::string>{}(vnode_key);

                _hash_ring[vnode_hash] = shard;
            }
        }
    }

private:
    size_t _shard_count;
    size_t _virtual_nodes;
    std::map<size_t, size_t> _hash_ring;  // hash_value → shard
};

示例

// 初始化:4 个 shard,每个 shard 256 个虚拟节点
router.configure(4, 256);

// 路由
router.route("user:123")  // → shard 2
router.route("user:456")  // → shard 1

// 扩容到 6 个 shard
router.configure(6, 256);

// 路由:大部分 key 路由不变
router.route("user:123")  // → shard 2 (不变!)
router.route("user:456")  // → shard 1 (不变!)

扩容影响分析

假设原来有 4 个 shard,现在扩容到 6 个 shard:

每个 shard 负责的 key 数量:
扩容前:每个 shard 25%
扩容后:每个 shard 16.7%

数据移动:
shard 0 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 1 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 2 → 需要移动 (25% - 16.7%) = 8.3% 的数据
shard 3 → 需要移动 (25% - 16.7%) = 8.3% 的数据

总移动量:4 × 8.3% = 33.3%

对比 Hash Modulo:
总移动量:100%(所有 key 重新路由)

虚拟节点数量选择

虚拟节点数负载均衡度内存占用计算开销
16极小极低
64中等
256中等中等
1024很好

推荐配置

// 默认推荐
config.routing_virtual_nodes = 256;

// 小规模系统(< 10 shards)
config.routing_virtual_nodes = 64;

// 大规模系统(> 100 shards)
config.routing_virtual_nodes = 512;

Seastar Log Engine 的路由实现

RoutingEngine 类

class RoutingEngine {
public:
    enum class Strategy {
        hash_modulo,
        consistent_hashing
    };

    enum class EmptyRoutePolicy {
        local,          // 空键写入当前 shard
        round_robin     // 空键轮询所有 shard
    };

    void configure(Strategy strategy, size_t virtual_nodes, size_t shard_count);
    void set_empty_route_policy(EmptyRoutePolicy policy);
    size_t route(const std::string& key) const;

private:
    size_t route_hash_modulo(const std::string& key) const;
    size_t route_consistent_hashing(const std::string& key) const;
    size_t route_empty_key() const;

private:
    Strategy _strategy = Strategy::hash_modulo;
    size_t _virtual_nodes = 256;
    size_t _shard_count = 1;
    EmptyRoutePolicy _empty_route_policy = EmptyRoutePolicy::local;

    std::map<size_t, size_t> _hash_ring;
    std::atomic<size_t> _round_robin_counter{0};
};

路由策略选择

size_t RoutingEngine::route(const std::string& key) const {
    if (key.empty()) {
        return route_empty_key();
    }

    switch (_strategy) {
    case Strategy::hash_modulo:
        return route_hash_modulo(key);
    case Strategy::consistent_hashing:
        return route_consistent_hashing(key);
    default:
        return route_hash_modulo(key);
    }
}

空 route_key 处理

策略一:local(默认)

size_t RoutingEngine::route_empty_key() const {
    if (_empty_route_policy == EmptyRoutePolicy::local) {
        return seastar::this_shard_id();
    }
    // ...
}

特性

  • 空键写入当前 shard
  • 避免跨 shard 通信
  • 适合单机多场景

策略二:round_robin

size_t RoutingEngine::route_empty_key() const {
    if (_empty_route_policy == EmptyRoutePolicy::round_robin) {
        size_t counter = _round_robin_counter.fetch_add(1);
        return counter % _shard_count;
    }
    return seastar::this_shard_id();
}

特性

  • 空键轮询所有 shard
  • 负载更均衡
  • 可能跨 shard 通信

批量写入的路由优化

问题

批量写入时,如果消息属于不同 shard,需要多次跨 shard 通信:

// 低效方式
for (const auto& msg : messages) {
    auto shard = router.route(msg.route_key);
    co_await writers.invoke_on(shard, [msg](AsyncWriter& writer) {
        return writer.submit(msg);
    });
    // 每次消息都跨 shard 通信!
}

优化一:按 shard 分组

seastar::future<> LogEngine::append_batch(std::vector<LogMessage> messages) {
    // 按 shard 分组
    std::map<size_t, std::vector<LogMessage>> grouped;
    for (auto& msg : messages) {
        auto shard = router.route(msg.route_key);
        grouped[shard].push_back(std::move(msg));
    }

    // 每个 shard 一次性提交
    std::vector<seastar::future<>> futures;
    for (auto& [shard, batch] : grouped) {
        futures.push_back(
            _writers.invoke_on(shard, [batch = std::move(batch)](AsyncWriter& writer) mutable {
                return writer.submit_many(std::move(batch));
            })
        );
    }

    co_await seastar::when_all_succeed(futures.begin(), futures.end());
}

效果

  • 跨 shard 通信次数:N → 分组数(通常 << N)
  • 批量写入效率提升

优化二:单 shard 快路径

seastar::future<> LogEngine::append_batch(std::vector<LogMessage> messages) {
    if (messages.empty()) {
        co_return;
    }

    // 检查是否所有消息都指向同一个 shard
    const auto& first_key = messages[0].route_key;
    bool all_same_shard = true;
    size_t target_shard = router.route(first_key);

    for (size_t i = 1; i < messages.size(); ++i) {
        if (router.route(messages[i].route_key) != target_shard) {
            all_same_shard = false;
            break;
        }
    }

    // 快路径:所有消息指向同一个 shard
    if (all_same_shard) {
        if (target_shard == seastar::this_shard_id()) {
            co_await _writers.local().submit_many(std::move(messages));
        } else {
            co_await _writers.invoke_on(target_shard, [batch = std::move(messages)](AsyncWriter& writer) mutable {
                return writer.submit_many(std::move(batch));
            });
        }
        co_return;
    }

    // 慢路径:按 shard 分组
    // ...
}

性能对比

测试环境

  • Shard 数:4
  • 消息数:1,000,000
  • 消息大小:256 bytes

测试结果

路由策略吞吐量P99 延迟CPU 开销
Hash Modulo1.8M msg/s12μs
Consistent Hashing (256 vnode)1.6M msg/s15μs中等

分析

  • Hash Modulo 性能略好,但差异不大
  • Consistent Hashing 的内存和计算开销可以接受

使用建议

选择 Hash Modulo 的场景

  1. 固定规模系统:shard 数量长期不变
  2. 性能优先:追求极致性能
  3. 简单场景:不需要复杂的路由逻辑
config.routing_strategy = RoutingStrategy::hash_modulo;

选择 Consistent Hashing 的场景

  1. 动态扩容:预期会增删 shard
  2. 生产环境:需要平滑升级
  3. 多数据中心:shard 分布在不同机器
config.routing_strategy = RoutingStrategy::consistent_hashing;
config.routing_virtual_nodes = 256;

空 route_key 处理建议

// 单机场景:使用 local
config.empty_route_policy = EmptyRoutePolicy::local;

// 分布式场景:使用 round_robin
config.empty_route_policy = EmptyRoutePolicy::round_robin;

监控指标

路由均衡度

class RoutingEngine {
public:
    std::map<size_t, size_t> get_route_stats() const {
        std::map<size_t, size_t> stats;
        for (size_t i = 0; i < _shard_count; ++i) {
            stats[i] = _route_counts[i].load();
        }
        return stats;
    }

private:
    std::vector<std::atomic<uint64_t>> _route_counts;
};

告警规则

# 任何 shard 的负载超过平均值的 2 倍
if max(route_counts) > 2 * avg(route_counts):
    alert("路由不均衡!")

总结

路由策略的选择是性能和灵活性的权衡:

维度Hash ModuloConsistent Hashing
性能最高略低
扩容成本高(100% 数据迁移)低(~33% 数据迁移)
实现复杂度简单中等
适用场景固定规模动态扩容

推荐配置

// 生产环境默认推荐
config.routing_strategy = RoutingStrategy::consistent_hashing;
config.routing_virtual_nodes = 256;
config.empty_route_policy = EmptyRoutePolicy::local;

下一篇:《数据一致性与恢复模型:Checkpoint 与 Crash Recovery 的实现》

相关阅读