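Overall Architecture and Core Modules: A Deep Dive into the Component Design of Seastar Log Engine

This article breaks down the overall architecture of Seastar Log Engine at the module level, covering the design rationale and responsibility boundaries of core components such as the Per-shard Writer, the routing engine, and the log manager.

Architecture Overview

Seastar Log Engine uses a clearly layered architecture in which each component has a single responsibility, which makes it easier to test and maintain: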

┌─────────────────────────────────────────────────────┐
│                    Application Layer                 │
│            (compat_glog / query_server)              │
├─────────────────────────────────────────────────────┤
│                      LogEngine                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   Router    │  │   Config    │  │  Metrics    │  │
│  │   Engine    │  │   Loader    │  │  Exporter   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│            seastar::sharded<AsyncWriter>             │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐   │
│  │ Shard 0 │ │ Shard 1 │ │ Shard 2 │ │ Shard N │   │
│  └─────────┘ └─────────┘ └─────────┘ └─────────┘   │
├─────────────────────────────────────────────────────┤
│                   Storage Layer                      │
│  ┌───────────────────────────────────────────────┐  │
│  │  Active Logs + Archived Logs + Checkpoint     │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

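Core Components in Detail

1. LogEngine: The Facade Layer

Responsibility: expose a unified write interface to callers and coordinate the internal components.

Core interface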

class LogEngine {
public:
    seastar::future<> start(EngineConfig config);
    seastar::future<> stop();
    seastar::future<> append(LogMessage message);
    seastar::future<> append_batch(std::vector<LogMessage> messages);

private:
    seastar::sharded<AsyncWriter> _writers;
    RoutingEngine _router;
    EngineConfig _config;
    bool _started = false;
};

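Design points

  1. Up-front configuration validation: the configuration is validated in start, before any writer is started.
  2. State management: the _started flag guards against repeated start/stop calls.
  3. Batch optimization: append_batch provides a batch path that reduces cross-shard communication.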
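
The second design point can be illustrated in isolation. The following is a minimal sketch, assuming that a repeated start is treated as an error and a repeated stop as a no-op; LifecycleGuard and its methods are hypothetical names, not part of the engine.

```cpp
#include <stdexcept>

// Hypothetical stand-in for the engine's start/stop bookkeeping.
class LifecycleGuard {
public:
    // Marks the component as started; rejects a second start.
    void start() {
        if (_started) {
            throw std::logic_error("already started");
        }
        _started = true;
    }

    // Marks the component as stopped; returns false if it was not running,
    // so a duplicate stop is a harmless no-op.
    bool stop() {
        if (!_started) {
            return false;
        }
        _started = false;
        return true;
    }

    bool started() const { return _started; }

private:
    bool _started = false;
};
```

Whether a duplicate start should throw or be silently ignored is a policy choice; throwing makes misuse visible early.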

Startup flow

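seastar::future<> LogEngine::start(EngineConfig config) {
    if (_started) {
        // Guard against a repeated start via the _started flag
        throw std::logic_error("LogEngine already started");
    }

    _config = std::move(config);
    _config.validate();  // validate the configuration before touching any shard

    co_await _writers.start();  // construct an AsyncWriter on every shard

    // Initialize the AsyncWriter on each shard; each call receives its own copy of the config
    co_await _writers.invoke_on_all([cfg = _config](AsyncWriter& writer) {
        return writer.start(cfg);
    });

    // Configure the routing engine
    _router.configure(
        _config.routing_strategy,
        _config.routing_virtual_nodes,
        seastar::smp::count
    );

    _started = true;
}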

2. AsyncWriter: The Per-shard Write Core

Responsibility: manage the write logic of a single shard, including batching, flushing to disk, and log rotation.

Internal state

class AsyncWriter {
private:
    std::deque<LogMessage> _pending_queue;  // messages waiting to be written
    size_t _pending_bytes = 0;              // bytes waiting to be written
    AppendWriter _append_writer;            // the underlying writer
    seastar::timer<> _flush_timer;          // periodic flush timer

    // Configuration parameters
    size_t _batch_size_bytes;
    uint64_t _flush_interval_ms;
    AckMode _ack_mode;

    // Watermark control
    size_t _max_pending_bytes;
    size_t _pending_bytes_low_watermark;
};

Write flow

submit() → enqueue in pending_queue → check watermark → batch/flush triggered → write to disk
    ↓                                                                              ↓
  (waits while the queue is full)                                          (returns a future)

Key code

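seastar::future<> AsyncWriter::submit(LogMessage message) {
    // Watermark check: wait while the queue is full to keep memory bounded
    while (_pending_bytes >= _max_pending_bytes) {
        _backpressure_waits++;
        co_await seastar::yield();
    }

    // Read the size before moving the message; reading it after std::move
    // would inspect a moved-from object
    const size_t message_bytes = message.payload.size();
    _pending_queue.push_back(std::move(message));
    _pending_bytes += message_bytes;

    // Trigger a batch write once enough bytes have accumulated
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
}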
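
The internal state lists both _max_pending_bytes and _pending_bytes_low_watermark, while the submit code above only consults the high limit. One common way to use such a pair is hysteresis: start blocking at the high watermark and resume only once the queue drains to the low one. The sketch below shows that bookkeeping on its own, without Seastar; WatermarkGate and its method names are assumptions made for illustration, not the engine's actual implementation.

```cpp
#include <cstddef>

// Hypothetical helper: tracks pending bytes and decides when submitters must wait.
class WatermarkGate {
public:
    WatermarkGate(std::size_t high, std::size_t low) : _high(high), _low(low) {}

    // Called when a message is queued.
    void on_enqueue(std::size_t bytes) {
        _pending += bytes;
        if (_pending >= _high) {
            _blocked = true;   // reached the high watermark: start applying backpressure
        }
    }

    // Called when a flush has written `bytes` to disk.
    void on_flushed(std::size_t bytes) {
        _pending = bytes > _pending ? 0 : _pending - bytes;
        if (_blocked && _pending <= _low) {
            _blocked = false;  // drained to the low watermark: admit submitters again
        }
    }

    bool blocked() const { return _blocked; }
    std::size_t pending() const { return _pending; }

private:
    std::size_t _high;
    std::size_t _low;
    std::size_t _pending = 0;
    bool _blocked = false;
};
```

The gap between the two thresholds keeps submitters from flapping between blocked and unblocked when the queue hovers around the limit.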

3. RoutingEngine: Smart Routing Decisions

Responsibility: decide which shard a message should be written to, based on its route_key.

Supported routing strategies

class RoutingEngine {
public:
    enum class Strategy {
        hash_modulo,           // simple modulo
        consistent_hashing     // consistent hashing
    };

    void configure(Strategy strategy, size_t virtual_nodes, size_t shard_count);
    size_t route(const std::string& key) const;

private:
    Strategy _strategy;
    size_t _virtual_nodes;
    size_t _shard_count;
    std::map<size_t, size_t> _hash_ring;  // consistent-hash ring: position -> shard
};

Hash Modulo implementation

size_t RoutingEngine::route_hash_modulo(const std::string& key) const {
    if (key.empty()) {
        return seastar::this_shard_id();  // an empty key falls back to the local shard
    }

    std::hash<std::string> hasher;
    return hasher(key) % _shard_count;
}

Consistent Hashing implementation

void RoutingEngine::setup_consistent_hashing() {
    _hash_ring.clear();  // drop entries left over from a previous configure() call
    for (size_t shard = 0; shard < _shard_count; ++shard) {
        for (size_t vnode = 0; vnode < _virtual_nodes; ++vnode) {
            // Virtual node name: "shard:vnode"
            std::string vnode_key = fmt::format("{}:{}", shard, vnode);
            size_t hash = std::hash<std::string>{}(vnode_key);
            _hash_ring[hash] = shard;
        }
    }
}

size_t RoutingEngine::route_consistent_hashing(const std::string& key) const {
    if (key.empty()) {
        return seastar::this_shard_id();
    }

    size_t hash = std::hash<std::string>{}(key);

    // Find the first node on the hash ring whose position is >= hash
    auto it = _hash_ring.lower_bound(hash);
    if (it == _hash_ring.end()) {
        // Wrap around to the first node
        return _hash_ring.begin()->second;
    }

    return it->second;
}
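
The lower_bound lookup and its wraparound case are easier to see on a ring with hand-picked positions. The following is a small standalone sketch; HashRing, lookup, and make_demo_ring are illustrative names, and the positions 100/200/300 stand in for real hash values.

```cpp
#include <cstddef>
#include <map>

// Hash ring: position on the ring -> shard id.
using HashRing = std::map<std::size_t, std::size_t>;

// Returns the shard owning `hash`: the first ring entry at or after it,
// wrapping around to the first entry when `hash` is past the last one.
// The ring must not be empty.
std::size_t lookup(const HashRing& ring, std::size_t hash) {
    auto it = ring.lower_bound(hash);
    if (it == ring.end()) {
        it = ring.begin();
    }
    return it->second;
}

// A tiny ring with hand-picked positions instead of real hash values.
HashRing make_demo_ring() {
    return HashRing{{100, 0}, {200, 1}, {300, 2}};
}
```

On this ring, a hash of 150 lands on shard 1 (position 200), a hash of exactly 300 lands on shard 2, and a hash of 301 wraps around to shard 0.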

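Routing strategy comparison

Strategy             Pros                   Cons                               Best suited for
Hash Modulo          Simple and efficient   Scaling requires re-routing keys   Fixed shard count
Consistent Hashing   Smooth scaling         Virtual nodes must be maintained   Dynamic sharding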
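
The "scaling" row of the comparison can be checked with a small experiment: grow from N to N+1 shards and count how many keys change owner under each strategy. The sketch below is standalone and makes several assumptions: it uses a fixed FNV-1a based hash (stable_hash) instead of std::hash so that results are reproducible, and all function and type names are hypothetical. With hash modulo, most keys are expected to move; on the ring, only keys captured by the new shard's virtual nodes move, which is on the order of 1/(N+1) of them.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// FNV-1a followed by a mixing step; a fixed hash keeps the demo reproducible.
// (An assumption for this sketch; the article's code uses std::hash.)
std::uint64_t stable_hash(const std::string& s) {
    std::uint64_t h = 14695981039346656037ull;
    for (unsigned char c : s) {
        h ^= c;
        h *= 1099511628211ull;
    }
    h ^= h >> 33;
    h *= 0xff51afd7ed558ccdull;
    h ^= h >> 33;
    return h;
}

using Ring = std::map<std::uint64_t, std::size_t>;

// Adds `vnodes` virtual nodes for `shard`; existing ring entries are never overwritten.
void add_shard(Ring& ring, std::size_t shard, std::size_t vnodes) {
    for (std::size_t v = 0; v < vnodes; ++v) {
        ring.emplace(stable_hash(std::to_string(shard) + ":" + std::to_string(v)), shard);
    }
}

std::size_t route_ring(const Ring& ring, const std::string& key) {
    auto it = ring.lower_bound(stable_hash(key));
    return it == ring.end() ? ring.begin()->second : it->second;
}

std::size_t route_modulo(const std::string& key, std::size_t shards) {
    return stable_hash(key) % shards;
}

struct RemapStats {
    std::size_t moved_modulo = 0;     // keys whose shard changed under hash modulo
    std::size_t moved_ring = 0;       // keys whose shard changed on the ring
    std::size_t moved_elsewhere = 0;  // ring keys that moved to a shard other than the new one
};

// Grows the cluster from `shards` to `shards + 1` and counts remapped keys.
// Requires shards >= 1 and vnodes >= 1.
RemapStats compare_remapping(std::size_t shards, std::size_t vnodes, std::size_t keys) {
    Ring before;
    for (std::size_t s = 0; s < shards; ++s) add_shard(before, s, vnodes);
    Ring after = before;
    add_shard(after, shards, vnodes);

    RemapStats stats;
    for (std::size_t i = 0; i < keys; ++i) {
        const std::string key = "key-" + std::to_string(i);
        if (route_modulo(key, shards) != route_modulo(key, shards + 1)) ++stats.moved_modulo;
        const std::size_t old_shard = route_ring(before, key);
        const std::size_t new_shard = route_ring(after, key);
        if (old_shard != new_shard) {
            ++stats.moved_ring;
            if (new_shard != shards) ++stats.moved_elsewhere;
        }
    }
    return stats;
}
```

Calling compare_remapping(4, 256, 10000) and comparing moved_modulo with moved_ring gives a feel for the difference. The moved_elsewhere counter stays at zero by construction: a key on the ring either keeps its owner or is taken over by the newly added shard.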

4. LogManager: Lifecycle Management

Responsibility: coordinate lifecycle operations such as rotate, archive, and checkpoint.

Core functionality

class LogManager {
public:
    seastar::future<> check_rotate(const AppendWriter& writer);
    seastar::future<> archive_old_logs();
    seastar::future<> create_checkpoint();
    seastar::future<> recovery_from_checkpoint();

private:
    std::string _log_dir;
    std::string _archive_dir;
    size_t _rotate_size_bytes;
    uint64_t _rotate_interval_seconds;
    bool _compress_archives;
};

Rotate logic

seastar::future<> LogManager::check_rotate(const AppendWriter& writer) {
    const auto& stats = writer.get_stats();

    // Size-based trigger
    if (stats.logical_size_bytes >= _rotate_size_bytes) {
        co_await rotate_log(writer);
        co_return;
    }

    // Time-based trigger. Compare durations directly: casting to a signed
    // count and comparing it with the unsigned interval would turn a negative
    // elapsed time (clock stepped backwards) into a huge positive value.
    auto now = seastar::lowres_system_clock::now();
    auto elapsed = now - stats.rotate_time;

    if (elapsed >= std::chrono::seconds(_rotate_interval_seconds)) {
        co_await rotate_log(writer);
    }
}
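
The rotate decision itself is pure logic and can be separated from the I/O so that it is easy to unit test. The following is a minimal sketch of that idea using std::chrono only; RotatePolicy, RotateReason, and should_rotate are hypothetical names that mirror rotate_size_bytes and rotate_interval_seconds.

```cpp
#include <chrono>
#include <cstdint>

enum class RotateReason { none, size, age };

// Hypothetical policy object mirroring rotate_size_bytes / rotate_interval_seconds.
struct RotatePolicy {
    std::uint64_t max_size_bytes;
    std::chrono::seconds max_age;
};

// Decides whether a segment should be rotated. Size is checked first,
// matching the order used in check_rotate.
RotateReason should_rotate(const RotatePolicy& policy,
                           std::uint64_t logical_size_bytes,
                           std::chrono::system_clock::time_point opened_at,
                           std::chrono::system_clock::time_point now) {
    if (logical_size_bytes >= policy.max_size_bytes) {
        return RotateReason::size;
    }
    if (now - opened_at >= policy.max_age) {
        return RotateReason::age;
    }
    return RotateReason::none;
}
```

Because durations are compared as signed std::chrono values, a clock that steps backwards yields a negative elapsed time and simply does not trigger a rotate.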

Archive logic

seastar::future<> LogManager::archive_old_logs() {
    auto entries = co_await list_log_files();

    for (const auto& entry : entries) {
        if (should_archive(entry)) {
            if (_compress_archives) {
                co_await gzip_compress(entry.path);
            }
            co_await move_to_archive(entry.path);
        }
    }

    co_await cleanup_old_archives();
}
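
The article does not show cleanup_old_archives, but the configuration exposes max_archive_count and max_archive_age_seconds, which suggests a retention rule along these lines. The sketch below is one possible interpretation, not the engine's actual policy: it deletes everything older than the age limit and then the oldest remaining archives until the count limit is met. ArchiveEntry and select_expired are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical description of one archived segment.
struct ArchiveEntry {
    std::string path;
    std::uint64_t age_seconds;  // time since the segment was archived
};

// Returns the archives to delete: everything older than max_age_seconds,
// plus the oldest of the rest until at most max_count remain.
std::vector<ArchiveEntry> select_expired(std::vector<ArchiveEntry> archives,
                                         std::size_t max_count,
                                         std::uint64_t max_age_seconds) {
    // Newest first, so the entries to keep form a prefix.
    std::sort(archives.begin(), archives.end(),
              [](const ArchiveEntry& a, const ArchiveEntry& b) {
                  return a.age_seconds < b.age_seconds;
              });

    std::vector<ArchiveEntry> expired;
    std::size_t kept = 0;
    for (auto& entry : archives) {
        if (entry.age_seconds > max_age_seconds || kept >= max_count) {
            expired.push_back(std::move(entry));
        } else {
            ++kept;
        }
    }
    return expired;
}
```

Keeping the selection separate from the file deletion makes the policy testable without touching the filesystem.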

5. AppendWriter: The Low-level Writer

Responsibility: perform the actual disk I/O and keep writes DMA-aligned.

Key characteristics

class AppendWriter {
public:
    seastar::future<> append(const std::vector<char>& buffer);
    seastar::future<> flush();
    uint64_t get_logical_size() const;

private:
    seastar::file _file;
    uint64_t _logical_size = 0;
    uint64_t _file_offset = 0;
    bool _dma_aligned = true;

    static constexpr size_t ALIGNMENT = 4096;  // 4 KiB alignment
};

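DMA-aligned writes

seastar::future<> AppendWriter::append(const std::vector<char>& buffer) {
    // Seastar's dma_write() needs the file offset, the source address, and the
    // length to all be aligned, so a std::vector cannot be written directly.
    // Stage the payload in an aligned buffer whose length is rounded up to
    // ALIGNMENT; the zero-filled tail plays the role of the padding, and every
    // record still starts on an aligned offset.
    size_t write_len = buffer.size();
    if (_dma_aligned) {
        write_len = (buffer.size() + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }
    if (write_len == 0) {
        co_return;
    }

    auto staging = seastar::temporary_buffer<char>::aligned(ALIGNMENT, write_len);
    std::copy(buffer.begin(), buffer.end(), staging.get_write());
    std::fill(staging.get_write() + buffer.size(), staging.get_write() + write_len, 0);

    // Write payload and padding in a single aligned request
    co_await _file.dma_write(_file_offset, staging.get(), write_len);

    _file_offset += write_len;
    _logical_size += buffer.size();  // logical size excludes the padding
}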

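Why is DMA alignment needed?

  • It avoids the extra copy through the kernel page cache
  • It reduces the number of system calls
  • It lets the hardware move data with direct memory access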
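
The padding arithmetic behind aligned appends can be worked through with plain integers. The following is a minimal sketch, assuming a 4096-byte alignment as in ALIGNMENT above; align_up, padding_for, and record_offsets are illustrative helper names.

```cpp
#include <cstddef>
#include <vector>

constexpr std::size_t kAlignment = 4096;  // same value as ALIGNMENT above

// Rounds `n` up to the next multiple of kAlignment.
constexpr std::size_t align_up(std::size_t n) {
    return (n + kAlignment - 1) / kAlignment * kAlignment;
}

// Number of padding bytes needed to move `offset` to the next aligned position.
constexpr std::size_t padding_for(std::size_t offset) {
    return (kAlignment - (offset % kAlignment)) % kAlignment;
}

// Start offset of each record when every record begins on an aligned boundary.
std::vector<std::size_t> record_offsets(const std::vector<std::size_t>& sizes) {
    std::vector<std::size_t> offsets;
    std::size_t pos = 0;
    for (std::size_t len : sizes) {
        pos += padding_for(pos);
        offsets.push_back(pos);
        pos += len;
    }
    return offsets;
}
```

For example, an offset of 5000 needs 3192 bytes of padding to reach 8192, and records of 100, 200, and 5000 bytes start at offsets 0, 4096, and 8192. The outer modulo in padding_for matters: without it, an already aligned offset would receive a full 4096 bytes of unnecessary padding.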

Component Interaction Flows

Typical write flow

Application
    ↓
LogEngine::append(message)
    ↓
RoutingEngine::route(key) → picks the shard
    ↓
AsyncWriter::submit(message) → enqueued in pending_queue
    ↓
Batch/flush triggered
    ↓
RecordCodec::encode(message) → encoded to binary
    ↓
AppendWriter::append(buffer) → DMA-aligned write
    ↓
LogManager::check_rotate() → checks whether a rotate is needed

Batch-optimized flow

Application
    ↓
LogEngine::append_batch(messages)
    ↓
Group by route_key
    ↓
Call submit_many() once per shard
    ↓
AsyncWriter::submit_many(batch)
    ↓
Encode the whole batch in one pass
    ↓
AppendWriter::append(batch_buffer)
    ↓
Fewer cross-shard calls
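
The grouping step is what turns one cross-shard call per message into one call per shard. The sketch below shows only that step, as a standalone function using hash modulo routing; the LogMessage fields and the group_by_shard name are assumptions made for this example, and the real engine would hand each bucket to submit_many() on its shard.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for the engine's message type (fields are assumptions).
struct LogMessage {
    std::string route_key;
    std::string payload;
};

// Splits a batch into one bucket per shard using hash modulo routing,
// preserving the original order of messages within each bucket.
// Requires shard_count >= 1.
std::vector<std::vector<LogMessage>> group_by_shard(std::vector<LogMessage> messages,
                                                    std::size_t shard_count) {
    std::vector<std::vector<LogMessage>> buckets(shard_count);
    std::hash<std::string> hasher;
    for (auto& message : messages) {
        const std::size_t shard = hasher(message.route_key) % shard_count;
        buckets[shard].push_back(std::move(message));
    }
    return buckets;
}
```

Messages that share a route_key always land in the same bucket and keep their relative order, so per-key ordering survives the batch path.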

Configuration Management

EngineConfig structure

struct EngineConfig {
    // Routing
    RoutingStrategy routing_strategy = RoutingStrategy::hash_modulo;
    size_t routing_virtual_nodes = 256;
    EmptyRoutePolicy empty_route_policy = EmptyRoutePolicy::local;

    // Batching
    size_t batch_size_bytes = 8192;
    uint64_t flush_interval_ms = 1;

    // Rotate
    size_t rotate_size_bytes = 104857600;  // 100 MiB
    uint64_t rotate_interval_seconds = 86400;  // 1 day

    // Archive
    bool compress_archives = true;
    size_t max_archive_count = 100;
    uint64_t max_archive_age_seconds = 2592000;  // 30 days

    // Watermark control
    size_t max_pending_bytes = 104857600;  // 100 MiB
    size_t pending_bytes_low_watermark = 52428800;  // 50 MiB

    // Checkpoint
    bool checkpoint_enabled = true;
    uint64_t checkpoint_interval_seconds = 60;

    // Acknowledgement semantics
    AckMode ack_mode = AckMode::write_ack;

    void validate() const;
};
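
The body of validate() is not shown in the article. The following sketch illustrates the kind of cross-field checks such a method might perform on a subset of the fields above. The specific rules, the EngineConfigSketch type, and the is_valid helper are assumptions for illustration only.

```cpp
#include <cstddef>
#include <stdexcept>

// Subset of the configuration fields shown above, with the same defaults.
struct EngineConfigSketch {
    std::size_t batch_size_bytes = 8192;
    std::size_t max_pending_bytes = 104857600;
    std::size_t pending_bytes_low_watermark = 52428800;
    std::size_t routing_virtual_nodes = 256;

    // Throws std::invalid_argument on the first violated constraint.
    // The specific rules are assumptions for illustration.
    void validate() const {
        if (batch_size_bytes == 0) {
            throw std::invalid_argument("batch_size_bytes must be > 0");
        }
        if (max_pending_bytes < batch_size_bytes) {
            throw std::invalid_argument("max_pending_bytes must be >= batch_size_bytes");
        }
        if (pending_bytes_low_watermark >= max_pending_bytes) {
            throw std::invalid_argument("low watermark must be below max_pending_bytes");
        }
        if (routing_virtual_nodes == 0) {
            throw std::invalid_argument("routing_virtual_nodes must be > 0");
        }
    }
};

// Convenience wrapper: true when validate() does not throw.
bool is_valid(const EngineConfigSketch& config) {
    try {
        config.validate();
        return true;
    } catch (const std::invalid_argument&) {
        return false;
    }
}
```

Failing fast in start, before any shard is touched, means a bad configuration never leaves the engine half-initialized.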

Configuration file format

[log]
log_dir = "./logs"
archive_dir = "./archive"

[routing]
strategy = "consistent_hashing"
virtual_nodes = 256
empty_route_policy = "local"

[batch]
size_bytes = 8192
flush_interval_ms = 1

[rotate]
size_bytes = 104857600
interval_seconds = 86400

[watermark]
max_pending_bytes = 104857600
low_watermark_bytes = 52428800

[checkpoint]
enabled = true
interval_seconds = 60

Metrics Export

Writer Metrics

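namespace sm = seastar::metrics;

// _metrics is assumed to be a seastar::metrics::metric_groups member of AsyncWriter
_metrics.add_group("log_engine_writer", {
    sm::make_counter("submitted_messages", _submitted_messages),
    sm::make_counter("submitted_bytes", _submitted_bytes),
    sm::make_counter("flushed_batches", _flushed_batches),
    sm::make_counter("flushed_bytes", _flushed_bytes),
    sm::make_counter("flush_errors", _flush_errors),
    sm::make_counter("backpressure_waits", _backpressure_waits),
    // Gauges backed by a lambda are re-evaluated each time the metric is read
    sm::make_gauge("pending_entries", [this] { return _pending_queue.size(); }),
    sm::make_gauge("pending_bytes", _pending_bytes),
    sm::make_gauge("waiting_submitters", _waiting_submitters),
    sm::make_gauge("logical_size_bytes", [this] { return _append_writer.get_logical_size(); }),
});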

Reader Metrics

namespace sm = seastar::metrics;

// _metrics is assumed to be a seastar::metrics::metric_groups member of the reader
_metrics.add_group("log_engine_reader", {
    sm::make_counter("segments_read", _segments_read),
    sm::make_counter("archive_segments_read", _archive_segments_read),
    sm::make_counter("active_segments_read", _active_segments_read),
    sm::make_counter("records_returned", _records_returned),
    sm::make_counter("corrupted_segments", _corrupted_segments),
    sm::make_counter("corrupted_lines", _corrupted_lines),
    sm::make_counter("gzip_read_errors", _gzip_read_errors),
});

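Summary

The architecture of Seastar Log Engine follows these principles:

  1. Single responsibility: each component focuses on one functional area
  2. Lock-free concurrency: Seastar's sharded mechanism avoids lock contention
  3. Async first: all I/O is asynchronous and never blocks the reactor
  4. Strong observability: the key counters and gauges are exported as metrics

Later articles will cover the implementation details of each core component:

  • Per-shard Writer: batching and watermark control in AsyncWriter
  • DMA-aligned path: low-level I/O optimization in AppendWriter
  • Routing strategy implementation: consistent hashing in engineering practice

Next: "The Asynchronous Batch Write Model: A Deep Dive into the Per-shard Writer Design"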
