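Overall Architecture and Core Modules: A Deep Dive into the Component Design of Seastar Log Engine
This article breaks down the architecture of Seastar Log Engine module by module, covering the design rationale and responsibility boundaries of core components such as the per-shard writer, the routing engine, and the log manager.
Architecture Overview
Seastar Log Engine uses a layered architecture in which each component has a single responsibility, which makes the system easier to test and maintain: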
┌─────────────────────────────────────────────────────┐
│                  Application Layer                  │
│            (compat_glog / query_server)             │
├─────────────────────────────────────────────────────┤
│                      LogEngine                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   Router    │  │   Config    │  │   Metrics   │  │
│  │   Engine    │  │   Loader    │  │  Exporter   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│            seastar::sharded<AsyncWriter>            │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐ │
│  │ Shard 0 │  │ Shard 1 │  │ Shard 2 │  │ Shard N │ │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘ │
├─────────────────────────────────────────────────────┤
│                    Storage Layer                    │
│  ┌───────────────────────────────────────────────┐  │
│  │   Active Logs + Archived Logs + Checkpoint    │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘
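Core Components in Detail
1. LogEngine: the facade layer
Responsibility: expose a unified write interface and coordinate the internal components.
Core interface: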
class LogEngine {
public:
    seastar::future<> start(EngineConfig config);
    seastar::future<> stop();
    seastar::future<> append(LogMessage message);
    seastar::future<> append_batch(std::vector<LogMessage> messages);
private:
    seastar::sharded<AsyncWriter> _writers;
    RoutingEngine _router;
    EngineConfig _config;
    bool _started = false;
};
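Design highlights:
- Up-front configuration validation: the configuration is validated in start, before any shard is started.
- State management: the _started flag guards against repeated start/stop calls.
- Batch optimization: append_batch provides a batch path that reduces cross-shard communication.
Startup sequence: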
seastar::future<> LogEngine::start(EngineConfig config) {
    _config = std::move(config);
    _config.validate();          // validate the configuration first
    co_await _writers.start();   // start the writer on every shard
    // Initialize the AsyncWriter on each shard
    co_await _writers.invoke_on_all([cfg = _config](AsyncWriter& writer) mutable {
        return writer.start(std::move(cfg));
    });
    // Configure the routing engine
    _router.configure(
        _config.routing_strategy,
        _config.routing_virtual_nodes,
        seastar::smp::count
    );
    _started = true;
}
2. AsyncWriter: the per-shard write core
Responsibility: manage the write path of a single shard, including batching, flushing, and log rotation.
Internal state:
class AsyncWriter {
private:
    std::deque<LogMessage> _pending_queue;   // messages waiting to be written
    size_t _pending_bytes = 0;               // bytes waiting to be written
    AppendWriter _append_writer;             // performs the actual writes
    seastar::timer<> _flush_timer;           // periodic flush timer
    // Configuration parameters
    size_t _batch_size_bytes;
    uint64_t _flush_interval_ms;
    AckMode _ack_mode;
    // Watermark control
    size_t _max_pending_bytes;
    size_t _pending_bytes_low_watermark;
};
Write flow:
submit() → check watermark → enqueue in pending_queue → batch/flush trigger → write to disk
                  ↓                                              ↓
   (waits while the queue is full)                      (returns a future)
Key code:
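seastar::future<> AsyncWriter::submit(LogMessage message) {
    // Watermark check: keep memory bounded by waiting while the queue is full
    while (_pending_bytes >= _max_pending_bytes) {
        _backpressure_waits++;
        co_await seastar::yield();
    }
    // Record the size before the message is moved into the queue;
    // reading message.payload after std::move would be a use-after-move.
    const size_t message_bytes = message.payload.size();
    _pending_queue.push_back(std::move(message));
    _pending_bytes += message_bytes;
    // Trigger a batch write once enough bytes have accumulated
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_batch();
    }
}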
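The submit path above waits while the pending bytes are at or above the maximum, and the writer also carries a _pending_bytes_low_watermark. The article does not show how the low watermark is consumed; one common pattern is hysteresis, where producers are throttled once the high watermark is reached and released only after the queue drains to the low watermark. The following is a minimal, Seastar-free sketch under that assumption. BackpressureGate and its methods are hypothetical names, not part of the engine.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not from the engine): tracks pending bytes and decides
// whether new submissions should wait, using high/low watermark hysteresis.
class BackpressureGate {
public:
    BackpressureGate(std::size_t high, std::size_t low) : _high(high), _low(low) {
        assert(low < high);  // the low watermark must sit below the high one
    }

    // Called when a message of `bytes` is enqueued.
    void on_enqueue(std::size_t bytes) {
        _pending += bytes;
        if (_pending >= _high) {
            _throttled = true;   // start pushing back on producers
        }
    }

    // Called after `bytes` have been flushed to disk.
    void on_flush(std::size_t bytes) {
        _pending -= (bytes > _pending ? _pending : bytes);
        if (_pending <= _low) {
            _throttled = false;  // drained far enough, release producers
        }
    }

    bool must_wait() const { return _throttled; }
    std::size_t pending() const { return _pending; }

private:
    std::size_t _high;
    std::size_t _low;
    std::size_t _pending = 0;
    bool _throttled = false;
};
```

The gap between the two thresholds keeps producers from being woken and blocked again on every small flush, at the cost of holding them back slightly longer.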
3. RoutingEngine: routing decisions
Responsibility: decide which shard a message is written to, based on its route_key.
Supported routing strategies:
class RoutingEngine {
public:
    enum class Strategy {
        hash_modulo,         // simple modulo
        consistent_hashing   // consistent hashing
    };
    void configure(Strategy strategy, size_t virtual_nodes, size_t shard_count);
    size_t route(const std::string& key) const;
private:
    Strategy _strategy;
    size_t _virtual_nodes;
    size_t _shard_count;
    std::map<size_t, size_t> _hash_ring;   // consistent-hash ring: point hash -> shard
};
Hash Modulo implementation:
size_t RoutingEngine::route_hash_modulo(const std::string& key) const {
    if (key.empty()) {
        return seastar::this_shard_id();   // an empty key falls back to the local shard
    }
    std::hash<std::string> hasher;
    return hasher(key) % _shard_count;
}
Consistent Hashing implementation:
void RoutingEngine::setup_consistent_hashing() {
    for (size_t shard = 0; shard < _shard_count; ++shard) {
        for (size_t vnode = 0; vnode < _virtual_nodes; ++vnode) {
            // Virtual node name: "shard:vnode"
            std::string vnode_key = fmt::format("{}:{}", shard, vnode);
            size_t hash = std::hash<std::string>{}(vnode_key);
            _hash_ring[hash] = shard;
        }
    }
}
size_t RoutingEngine::route_consistent_hashing(const std::string& key) const {
    if (key.empty()) {
        return seastar::this_shard_id();
    }
    size_t hash = std::hash<std::string>{}(key);
    // Find the first node on the ring whose hash is >= the key's hash
    auto it = _hash_ring.lower_bound(hash);
    if (it == _hash_ring.end()) {
        // Wrap around to the first node
        return _hash_ring.begin()->second;
    }
    return it->second;
}
Routing strategy comparison:
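| Strategy | Pros | Cons | Best suited for |
|---|---|---|---|
| Hash Modulo | Simple and fast | Changing the shard count remaps most keys | Fixed shard count |
| Consistent Hashing | Adding a shard remaps only a fraction of keys | Must maintain a ring of virtual nodes | Dynamic sharding |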
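To make the trade-off in the table concrete, the sketch below grows the shard set by one and counts how many keys change owner under each strategy. It rebuilds the ring the same way setup_consistent_hashing() does (points named "shard:vnode", hashed with std::hash), but uses std::to_string instead of fmt::format to stay dependency-free. RemapStats and compare_remap are hypothetical helpers for illustration, and the synthetic "key-N" keys are an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>

// Every shard contributes `virtual_nodes` points named "shard:vnode".
std::map<std::size_t, std::size_t> build_ring(std::size_t shard_count,
                                              std::size_t virtual_nodes) {
    std::map<std::size_t, std::size_t> ring;
    for (std::size_t shard = 0; shard < shard_count; ++shard) {
        for (std::size_t vnode = 0; vnode < virtual_nodes; ++vnode) {
            std::string name = std::to_string(shard) + ":" + std::to_string(vnode);
            ring[std::hash<std::string>{}(name)] = shard;
        }
    }
    return ring;
}

// First ring point with hash >= hash(key), wrapping around at the end.
std::size_t route_ring(const std::map<std::size_t, std::size_t>& ring,
                       const std::string& key) {
    assert(!ring.empty());
    auto it = ring.lower_bound(std::hash<std::string>{}(key));
    return it == ring.end() ? ring.begin()->second : it->second;
}

std::size_t route_modulo(const std::string& key, std::size_t shard_count) {
    return std::hash<std::string>{}(key) % shard_count;
}

struct RemapStats {
    std::size_t moved_modulo = 0;        // keys whose modulo route changed
    std::size_t moved_ring = 0;          // keys whose ring route changed
    std::size_t moved_to_old_shard = 0;  // ring moves that did not land on the new shard
};

// Compare routing before and after adding one shard (index old_shards).
RemapStats compare_remap(std::size_t old_shards, std::size_t virtual_nodes,
                         std::size_t key_count) {
    const auto old_ring = build_ring(old_shards, virtual_nodes);
    const auto new_ring = build_ring(old_shards + 1, virtual_nodes);
    RemapStats stats;
    for (std::size_t i = 0; i < key_count; ++i) {
        const std::string key = "key-" + std::to_string(i);
        if (route_modulo(key, old_shards) != route_modulo(key, old_shards + 1)) {
            ++stats.moved_modulo;
        }
        const std::size_t before = route_ring(old_ring, key);
        const std::size_t after = route_ring(new_ring, key);
        if (before != after) {
            ++stats.moved_ring;
            if (after != old_shards) {
                ++stats.moved_to_old_shard;
            }
        }
    }
    return stats;
}
```

Calling compare_remap(4, 256, 10000) should report far fewer moved keys for the ring than for modulo. By construction, every key that moves on the ring moves to the newly added shard, so moved_to_old_shard stays at zero; existing shards never trade keys with each other.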
4. LogManager: lifecycle management
Responsibility: coordinate lifecycle operations such as rotate, archive, and checkpoint.
Core functionality:
class LogManager {
public:
    seastar::future<> check_rotate(const AppendWriter& writer);
    seastar::future<> archive_old_logs();
    seastar::future<> create_checkpoint();
    seastar::future<> recovery_from_checkpoint();
private:
    std::string _log_dir;
    std::string _archive_dir;
    size_t _rotate_size_bytes;
    uint64_t _rotate_interval_seconds;
    bool _compress_archives;
};
Rotate logic:
seastar::future<> LogManager::check_rotate(const AppendWriter& writer) {
    const auto& stats = writer.get_stats();
    // Size-based trigger
    if (stats.logical_size_bytes >= _rotate_size_bytes) {
        co_await rotate_log(writer);
        co_return;
    }
    // Time-based trigger
    auto now = seastar::lowres_system_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
        now - stats.rotate_time
    ).count();
    // elapsed is signed and can be negative if the system clock steps backwards;
    // check the sign before comparing against the unsigned interval.
    if (elapsed >= 0 && static_cast<uint64_t>(elapsed) >= _rotate_interval_seconds) {
        co_await rotate_log(writer);
    }
}
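The rotate decision itself does not need any I/O, so it can be isolated as a pure function and unit-tested without a reactor. The sketch below shows one way to do that in plain C++. RotatePolicy, RotateReason, and rotate_reason are hypothetical names introduced for illustration, and std::chrono::system_clock stands in for seastar::lowres_system_clock.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical, I/O-free view of the two rotate thresholds.
struct RotatePolicy {
    std::uint64_t rotate_size_bytes;
    std::chrono::seconds rotate_interval;
};

enum class RotateReason { none, size, age };

// Mirrors the order used in check_rotate(): size first, then elapsed time.
RotateReason rotate_reason(const RotatePolicy& policy,
                           std::uint64_t logical_size_bytes,
                           std::chrono::system_clock::time_point last_rotate,
                           std::chrono::system_clock::time_point now) {
    assert(policy.rotate_size_bytes > 0);
    if (logical_size_bytes >= policy.rotate_size_bytes) {
        return RotateReason::size;
    }
    if (now < last_rotate) {
        // The wall clock stepped backwards; do not treat that as elapsed time.
        return RotateReason::none;
    }
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::seconds>(now - last_rotate);
    return elapsed >= policy.rotate_interval ? RotateReason::age : RotateReason::none;
}
```

Returning the reason rather than a plain bool makes it easy to label a rotation counter by trigger, should such a metric be wanted.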
Archive logic:
seastar::future<> LogManager::archive_old_logs() {
    auto entries = co_await list_log_files();
    for (const auto& entry : entries) {
        if (should_archive(entry)) {
            if (_compress_archives) {
                co_await gzip_compress(entry.path);
            }
            co_await move_to_archive(entry.path);
        }
    }
    co_await cleanup_old_archives();
}
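The body of cleanup_old_archives() is not shown in this article. Given that the configuration exposes max_archive_count and max_archive_age_seconds, a retention pass could plausibly select archives as in the sketch below: anything older than the age limit goes, and so does anything beyond the newest max_count entries. ArchiveEntry and select_expired are hypothetical names, and the real policy may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical description of one archived segment.
struct ArchiveEntry {
    std::string path;
    std::uint64_t age_seconds;  // time since the segment was archived
};

// Returns the paths a retention pass would delete, newest candidates first.
std::vector<std::string> select_expired(std::vector<ArchiveEntry> archives,
                                        std::size_t max_count,
                                        std::uint64_t max_age_seconds) {
    // Sort newest first, so every index >= max_count is over the count limit.
    std::sort(archives.begin(), archives.end(),
              [](const ArchiveEntry& a, const ArchiveEntry& b) {
                  return a.age_seconds < b.age_seconds;
              });
    std::vector<std::string> expired;
    for (std::size_t i = 0; i < archives.size(); ++i) {
        const bool over_count = i >= max_count;
        const bool too_old = archives[i].age_seconds > max_age_seconds;
        if (over_count || too_old) {
            expired.push_back(archives[i].path);
        }
    }
    return expired;
}
```

Keeping the selection separate from the deletion makes the policy testable on plain data, while the actual file removal stays in the asynchronous path.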
5. AppendWriter: the low-level writer
Responsibility: perform the actual disk I/O and keep writes DMA-aligned.
Key characteristics:
class AppendWriter {
public:
    seastar::future<> append(const std::vector<char>& buffer);
    seastar::future<> flush();
    uint64_t get_logical_size() const;
private:
    seastar::file _file;
    uint64_t _logical_size = 0;
    uint64_t _file_offset = 0;
    bool _dma_aligned = true;
    static constexpr size_t ALIGNMENT = 4096;   // 4 KiB alignment
};
DMA-aligned write:
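seastar::future<> AppendWriter::append(const std::vector<char>& buffer) {
    size_t offset = _file_offset;
    // Compute the padding needed to bring the offset up to an aligned boundary
    size_t padding = 0;
    if (_dma_aligned) {
        padding = (ALIGNMENT - (offset % ALIGNMENT)) % ALIGNMENT;
    }
    // Write the padding (DMA mode only)
    if (padding > 0) {
        std::vector<char> padding_buf(padding, 0);
        co_await _file.dma_write(offset, padding_buf.data(), padding);
        offset += padding;
    }
    // Write the actual data.
    // Note: this listing is simplified. Seastar documents that the position, the
    // buffer address, and the length passed to dma_write should all be aligned,
    // which a std::vector<char> of arbitrary size does not guarantee.
    co_await _file.dma_write(offset, buffer.data(), buffer.size());
    _file_offset = offset + buffer.size();
    _logical_size += buffer.size();   // logical size excludes padding
}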
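Why is DMA alignment needed?
- It avoids an extra copy through the kernel page cache
- It reduces the number of system calls
- It lets the hardware transfer data via direct memory access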
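The alignment arithmetic used by the writer can be checked in isolation. The sketch below reproduces the padding formula from append() and adds the usual round-up helper, plus a small accounting struct that shows how the physical file offset and the logical size drift apart as padding accumulates. kAlignment mirrors AppendWriter::ALIGNMENT; WriteAccounting is a hypothetical name used only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kAlignment = 4096;  // same value as AppendWriter::ALIGNMENT

// Bytes needed to advance `offset` to the next multiple of `alignment`.
constexpr std::size_t padding_for(std::uint64_t offset,
                                  std::size_t alignment = kAlignment) {
    return static_cast<std::size_t>((alignment - (offset % alignment)) % alignment);
}

// Round `size` up to a multiple of `alignment` (must be a power of two).
constexpr std::uint64_t align_up(std::uint64_t size,
                                 std::size_t alignment = kAlignment) {
    return (size + alignment - 1) & ~static_cast<std::uint64_t>(alignment - 1);
}

// Tracks the physical offset (with padding) and the logical size (without).
struct WriteAccounting {
    std::uint64_t file_offset = 0;
    std::uint64_t logical_size = 0;

    void record_append(std::size_t payload_bytes) {
        file_offset += padding_for(file_offset);  // pad up to the boundary first
        assert(file_offset % kAlignment == 0);
        file_offset += payload_bytes;
        logical_size += payload_bytes;
    }
};
```

For example, two appends of 100 and 200 bytes leave the logical size at 300 while the file offset is 4296, because the second record starts at the 4096-byte boundary.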
Component Interaction Flows
Typical write flow
Application
↓
LogEngine::append(message)
↓
RoutingEngine::route(key) → pick the target shard
↓
AsyncWriter::submit(message) → enqueue in pending_queue
↓
Batch/flush trigger
↓
RecordCodec::encode(message) → encode to binary
↓
AppendWriter::append(buffer) → DMA-aligned write
↓
LogManager::check_rotate() → check whether a rotate is needed
Batch-optimized flow
Application
↓
LogEngine::append_batch(messages)
↓
Group messages by route_key (target shard)
↓
Call submit_many() once per shard
↓
AsyncWriter::submit_many(batch)
↓
Encode the whole batch at once
↓
AppendWriter::append(batch_buffer)
↓
Result: fewer cross-shard calls
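The grouping step in the batch flow can be sketched as a plain function that splits a batch into one bucket per shard. This is an illustration under assumptions: LogMessage here is a minimal stand-in with only route_key and payload fields, routing uses the hash-modulo rule, and empty-key handling is left out. In the engine, each non-empty bucket would presumably be handed to its shard with a single cross-shard call (for example via seastar::sharded's invoke_on).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for the engine's LogMessage (assumed fields).
struct LogMessage {
    std::string route_key;
    std::string payload;
};

// Split a batch into one bucket per shard, preserving per-shard arrival order.
std::vector<std::vector<LogMessage>> group_by_shard(std::vector<LogMessage> messages,
                                                    std::size_t shard_count) {
    assert(shard_count > 0);
    std::vector<std::vector<LogMessage>> buckets(shard_count);
    for (auto& message : messages) {
        // Compute the shard before moving the message into its bucket.
        const std::size_t shard =
            std::hash<std::string>{}(message.route_key) % shard_count;
        buckets[shard].push_back(std::move(message));
    }
    return buckets;
}
```

With N messages spread over S shards, this turns up to N cross-shard submissions into at most S, which is the saving the batch path is after.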
Configuration Management
EngineConfig structure
struct EngineConfig {
    // Routing
    RoutingStrategy routing_strategy = RoutingStrategy::hash_modulo;
    size_t routing_virtual_nodes = 256;
    EmptyRoutePolicy empty_route_policy = EmptyRoutePolicy::local;
    // Batching
    size_t batch_size_bytes = 8192;
    uint64_t flush_interval_ms = 1;
    // Rotate
    size_t rotate_size_bytes = 104857600;            // 100 MiB
    uint64_t rotate_interval_seconds = 86400;        // 1 day
    // Archive
    bool compress_archives = true;
    size_t max_archive_count = 100;
    uint64_t max_archive_age_seconds = 2592000;      // 30 days
    // Watermark control
    size_t max_pending_bytes = 104857600;            // 100 MiB
    size_t pending_bytes_low_watermark = 52428800;   // 50 MiB
    // Checkpoint
    bool checkpoint_enabled = true;
    uint64_t checkpoint_interval_seconds = 60;
    // Acknowledgement semantics
    AckMode ack_mode = AckMode::write_ack;
    void validate() const;
};
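The article declares validate() but does not list what it checks. As a sketch of the kind of cross-field invariants such a method might enforce, the example below validates a subset of the fields above and throws std::invalid_argument on the first violation. EngineLimits is a hypothetical cut-down struct, and the specific rules are assumptions rather than the engine's actual checks.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical subset of EngineConfig, with the same defaults as above.
struct EngineLimits {
    std::size_t batch_size_bytes = 8192;
    std::uint64_t flush_interval_ms = 1;
    std::size_t max_pending_bytes = 104857600;
    std::size_t pending_bytes_low_watermark = 52428800;
    std::size_t rotate_size_bytes = 104857600;

    // Assumed invariants; throws std::invalid_argument on the first violation.
    void validate() const {
        if (batch_size_bytes == 0) {
            throw std::invalid_argument("batch_size_bytes must be > 0");
        }
        if (flush_interval_ms == 0) {
            throw std::invalid_argument("flush_interval_ms must be > 0");
        }
        if (pending_bytes_low_watermark >= max_pending_bytes) {
            throw std::invalid_argument(
                "pending_bytes_low_watermark must be below max_pending_bytes");
        }
        if (batch_size_bytes > max_pending_bytes) {
            throw std::invalid_argument(
                "batch_size_bytes must not exceed max_pending_bytes");
        }
        if (rotate_size_bytes < batch_size_bytes) {
            throw std::invalid_argument(
                "rotate_size_bytes must be at least one batch");
        }
    }
};
```

Because LogEngine::start() is a coroutine, an exception thrown by validate() there would surface to the caller as a failed future rather than a synchronous throw.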
Configuration file format
[log]
log_dir = "./logs"
archive_dir = "./archive"
[routing]
strategy = "consistent_hashing"
virtual_nodes = 256
empty_route_policy = "local"
[batch]
size_bytes = 8192
flush_interval_ms = 1
[rotate]
size_bytes = 104857600
interval_seconds = 86400
[watermark]
max_pending_bytes = 104857600
low_watermark_bytes = 52428800
[checkpoint]
enabled = true
interval_seconds = 60
Metrics Export
Writer Metrics
namespace sm = seastar::metrics;
// Assumes a seastar::metrics::metric_groups member named _metrics on the writer.
_metrics.add_group("log_engine_writer", {
    sm::make_counter("submitted_messages", _submitted_messages),
    sm::make_counter("submitted_bytes", _submitted_bytes),
    sm::make_counter("flushed_batches", _flushed_batches),
    sm::make_counter("flushed_bytes", _flushed_bytes),
    sm::make_counter("flush_errors", _flush_errors),
    sm::make_counter("backpressure_waits", _backpressure_waits),
    // A computed value is registered as a callable so it is sampled at scrape time
    sm::make_gauge("pending_entries", [this] { return _pending_queue.size(); }),
    sm::make_gauge("pending_bytes", _pending_bytes),
    sm::make_gauge("waiting_submitters", _waiting_submitters),
    sm::make_gauge("logical_size_bytes", _logical_size),
});
Reader Metrics
namespace sm = seastar::metrics;
// Assumes a seastar::metrics::metric_groups member named _metrics on the reader.
_metrics.add_group("log_engine_reader", {
    sm::make_counter("segments_read", _segments_read),
    sm::make_counter("archive_segments_read", _archive_segments_read),
    sm::make_counter("active_segments_read", _active_segments_read),
    sm::make_counter("records_returned", _records_returned),
    sm::make_counter("corrupted_segments", _corrupted_segments),
    sm::make_counter("corrupted_lines", _corrupted_lines),
    sm::make_counter("gzip_read_errors", _gzip_read_errors),
});
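Summary
The architecture of Seastar Log Engine follows these principles:
- Single responsibility: each component focuses on one functional area
- Lock-free concurrency: Seastar's sharded mechanism avoids lock contention
- Async first: all I/O is asynchronous and never blocks the reactor
- Strong observability: the key indicators are all exported as metrics
Upcoming articles will cover the implementation details of each core component:
- Per-shard Writer: a closer look at batching and watermark control in AsyncWriter
- DMA alignment path: low-level I/O optimization in AppendWriter
- Routing strategy implementation: consistent hashing in engineering practice
Next: "The Asynchronous Batched Write Model: Inside the Per-shard Writer Design"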