Per-shard Writer 深度实现:Seastar 无锁并发的核心

深入剖析 AsyncWriter 的内部实现机制,包括队列管理、水位控制、批量处理以及 Seastar sharded 模式的无锁并发原理。

Sharded 模式的魅力

传统并发模型的问题

在传统的多线程日志系统中,我们通常使用全局锁:

// 传统方式:全局锁保护写入
std::mutex global_lock;

void log_message(const std::string& msg) {
    std::lock_guard<std::mutex> lock(global_lock);
    writer.write(msg);
}

性能瓶颈

  1. 锁竞争:多个线程竞争同一个锁,成为新的瓶颈
  2. 上下文切换:线程在等待锁时被调度出去
  3. 缓存失效:多个核心修改同一块缓存行

Seastar Sharded 模式

Seastar 提供了完全不同的并发模型:

// Seastar 方式:sharded 无锁
seastar::sharded<AsyncWriter> _writers;

// 每个 CPU 核心运行独立的 writer
_writers.invoke_on_all([](AsyncWriter& writer) {
    return writer.start(config);
});

核心优势

  • 完全无共享:每个 shard 独立运行,无锁竞争
  • CPU 亲和:每个 shard 绑定到特定核心
  • 零拷贝通信:跨 shard 通信使用消息传递

AsyncWriter 核心数据结构

状态定义

class AsyncWriter {
private:
    // 配置
    EngineConfig _config;

    // 待写入队列
    std::deque<LogMessage> _pending;
    std::atomic<uint64_t> _pending_bytes{0};

    // 序列号管理
    std::atomic<uint64_t> _sequence{0};
    uint64_t _last_flushed_sequence = 0;

    // 批量配置
    size_t _batch_size_bytes = 8192;
    uint64_t _flush_interval_ms = 1;

    // 水位控制
    size_t _max_pending_bytes = 100 * 1024 * 1024;  // 100MB
    size_t _pending_bytes_low_watermark = 50 * 1024 * 1024;  // 50MB
    std::atomic<bool> _backpressure_active{false};

    // 底层组件
    AppendWriter _append_writer;
    LogManager _log_manager;

    // 定时器
    seastar::timer<> _flush_timer;
    seastar::timer<> _checkpoint_timer;

    // 状态控制
    bool _started = false;
    bool _stopping = false;
    bool _flush_in_progress = false;
    seastar::gate _gate;

    // 等待者队列
    std::vector<seastar::promise<>> _backpressure_waiters;
};

内存布局

┌─────────────────────────────────────────────────────┐
│  AsyncWriter Memory Layout (Shard N)               │
├─────────────────────────────────────────────────────┤
│  _pending: deque<LogMessage>                        │
│  ┌─────────────────────────────────────────────┐   │
│  │ Msg 1 │ Msg 2 │ Msg 3 │ ... │ Msg N        │   │
│  │ 2KB   │ 2KB   │ 2KB   │      │ 2KB          │   │
│  └─────────────────────────────────────────────┘   │
│  ↑                                        ↑          │
│  front                                back          │
├─────────────────────────────────────────────────────┤
│  _pending_bytes: 16,384 (原子变量)                  │
├─────────────────────────────────────────────────────┤
│  _sequence: 100 (原子变量)                          │
├─────────────────────────────────────────────────────┤
│  _backpressure_waiters: vector<promise<>>           │
│  ┌─────────────────────────────────────────────┐   │
│  │ promise1 │ promise2 │ ... │ promiseM        │   │
│  └─────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────┘

启动流程

初始化序列

seastar::future<> AsyncWriter::start(EngineConfig config) {
    // 1. 保存配置并验证
    _config = std::move(config);
    _config.validate();

    // 2. 准备目录结构
    co_await _log_manager.prepare(_config);

    // 3. 重置内部状态
    _pending.clear();
    _pending_bytes = 0;
    _sequence = 0;
    _rotation_index = 0;
    reset_metrics();

    // 4. 设置状态标志
    _stopping = false;
    _flush_in_progress = false;
    _started = true;

    // 5. 创建日志目录
    auto shard_id = seastar::this_shard_id();
    std::filesystem::create_directories(_config.log_dir);

    // 6. 初始化 active segment
    _active_segment = layout::active_segment(_config, shard_id);

    // 7. 启动底层写入器
    co_await _append_writer.start(_config, _active_segment.path);

    // 8. 恢复或创建 checkpoint
    if (!_config.truncate_on_start) {
        co_await recover_from_checkpoint();
    } else if (_config.checkpoint_enabled) {
        co_await persist_checkpoint();
    }

    // 9. 启动定时器
    if (_config.flush_interval_ms > 0) {
        _flush_timer.arm_periodic(
            std::chrono::milliseconds(_config.flush_interval_ms)
        );
    }
}

目录结构创建

namespace layout {

std::string active_segment(const EngineConfig& config, size_t shard_id) {
    return fmt::format("{}/shard-{}.log", config.log_dir, shard_id);
}

std::string archive_segment(
    const EngineConfig& config,
    size_t shard_id,
    uint64_t rotation_index
) {
    return fmt::format(
        "{}/shard-{}.{}{}.log",
        config.archive_dir,
        shard_id,
        rotation_index,
        config.compress_archives ? ".gz" : ""
    );
}

std::string checkpoint_file(const EngineConfig& config, size_t shard_id) {
    return fmt::format("{}/shard-{}.checkpoint", config.log_dir, shard_id);
}

}  // namespace layout

消息提交

submit 流程

seastar::future<> AsyncWriter::submit(LogMessage message) {
    // 检查是否在停止状态
    if (_stopping) {
        throw std::runtime_error("Writer is stopping");
    }

    // 背压检查
    co_await check_backpressure();

    // 分配序列号
    uint64_t seq = _sequence.fetch_add(1);

    // 构造 pending entry
    PendingEntry entry{
        .message = std::move(message),
        .sequence = seq,
        .submitted_at = std::chrono::system_clock::now()
    };

    // 加入队列
    {
        std::lock_guard<std::mutex> lock(_pending_mutex);
        _pending.push_back(std::move(entry));
    }

    // 更新字节计数
    size_t msg_bytes = entry.message.payload.size();
    _pending_bytes += msg_bytes;

    // 检查是否需要立即 flush
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_once(false, false);
    }
}

submit_many 批量优化

seastar::future<> AsyncWriter::submit_many(std::vector<LogMessage> messages) {
    if (messages.empty()) {
        co_return;
    }

    // 背压检查
    co_await check_backpressure();

    // 批量分配序列号
    uint64_t start_seq = _sequence.fetch_add(messages.size());

    // 构造批量 entry
    std::vector<PendingEntry> entries;
    entries.reserve(messages.size());

    for (size_t i = 0; i < messages.size(); ++i) {
        entries.push_back(PendingEntry{
            .message = std::move(messages[i]),
            .sequence = start_seq + i,
            .submitted_at = std::chrono::system_clock::now()
        });
    }

    // 批量加入队列
    {
        std::lock_guard<std::mutex> lock(_pending_mutex);
        for (auto& entry : entries) {
            _pending.push_back(std::move(entry));
        }
    }

    // 更新字节计数
    size_t total_bytes = 0;
    for (const auto& entry : entries) {
        total_bytes += entry.message.payload.size();
    }
    _pending_bytes += total_bytes;

    // 检查是否需要 flush
    if (_pending_bytes >= _batch_size_bytes) {
        co_await flush_once(false, false);
    }
}

背压机制

水位检测

seastar::future<> AsyncWriter::check_backpressure() {
    // 检查是否达到高水位
    if (_pending_bytes < _max_pending_bytes) {
        co_return;
    }

    // 标记背压激活
    if (!_backpressure_active.exchange(true)) {
        _backpressure_events++;
    }

    // 创建 promise 并加入等待队列
    seastar::promise<> waiter;
    auto fut = waiter.get_future();

    {
        std::lock_guard<std::mutex> lock(_waiters_mutex);
        _backpressure_waiters.push_back(std::move(waiter));
    }

    // 等待水位下降
    co_await fut;
}

通知等待者

void AsyncWriter::notify_backpressure_waiters() {
    // 检查是否降到低水位以下
    if (_pending_bytes > _pending_bytes_low_watermark) {
        return;
    }

    // 如果背压已激活,取消激活
    if (!_backpressure_active.exchange(false)) {
        return;
    }

    // 唤醒所有等待者
    std::vector<seastar::promise<>> waiters;
    {
        std::lock_guard<std::mutex> lock(_waiters_mutex);
        waiters = std::move(_backpressure_waiters);
        _backpressure_waiters.clear();
    }

    for (auto& waiter : waiters) {
        waiter.set_value();
    }
}

水位状态机

正常状态: pending_bytes < low_watermark
    ↓ submit() 添加数据
pending_bytes 上升
    ↓ pending_bytes >= high_watermark
背压状态: 生产者阻塞等待
    ↓ flush() 消费数据
pending_bytes 下降
    ↓ pending_bytes < low_watermark
恢复到正常状态

批量处理

flush_once 实现

seastar::future<> AsyncWriter::flush_once(bool force, bool sync) {
    // 防止并发 flush
    if (_flush_in_progress.exchange(true)) {
        co_return;
    }

    auto flush_guard = seastar::defer([this] {
        _flush_in_progress = false;
    });

    // 检查是否有数据
    {
        std::lock_guard<std::mutex> lock(_pending_mutex);
        if (_pending.empty() && !force) {
            co_return;
        }
    }

    // 提取待写入数据
    std::vector<PendingEntry> to_flush;
    {
        std::lock_guard<std::mutex> lock(_pending_mutex);
        size_t bytes = 0;

        while (!_pending.empty()) {
            auto& entry = _pending.front();

            // 非强制模式下,检查批量大小
            if (!force && bytes >= _batch_size_bytes) {
                break;
            }

            bytes += entry.message.payload.size();
            to_flush.push_back(std::move(_pending.front()));
            _pending.pop_front();
        }

        _pending_bytes -= bytes;
    }

    // 通知背压等待者
    notify_backpressure_waiters();

    if (to_flush.empty()) {
        co_return;
    }

    // 编码为二进制格式
    std::deque<seastar::temporary_buffer<char>> encoded;
    for (const auto& entry : to_flush) {
        auto buffer = RecordCodec::encode(entry);
        encoded.push_back(std::move(buffer));
    }

    // 写入磁盘
    try {
        co_await _append_writer.append_batch(encoded, sync);
        _last_flushed_sequence = to_flush.back().sequence;
    } catch (...) {
        // 写入失败,重新放回队列
        {
            std::lock_guard<std::mutex> lock(_pending_mutex);
            for (auto& entry : to_flush) {
                _pending.push_front(std::move(entry));
            }
        }
        _pending_bytes += /* total_bytes */;
        throw;
    }

    // 更新指标
    _flushed_batches++;
    for (const auto& entry : to_flush) {
        _flushed_bytes += entry.message.payload.size();
    }
}

后台 flush

seastar::future<> AsyncWriter::flush_background(bool force, bool sync) {
    return seastar::with_gate(_gate, [this, force, sync] {
        return flush_once(force, sync);
    });
}

Gate 机制

// 在启动时创建 gate
seastar::gate _gate;

// 在停止时关闭 gate 并等待所有操作完成
seastar::future<> AsyncWriter::stop() {
    _stopping = true;
    _flush_timer.cancel();

    // 关闭 gate,不允许新的后台操作
    co_await _gate.close();

    // 等待所有在 gate 中的操作完成
    co_await _gate.wait();
}

停止流程

优雅关闭

seastar::future<> AsyncWriter::stop() {
    if (!_started) {
        co_return;
    }

    // 1. 标记停止状态
    _stopping = true;

    // 2. 取消定时器
    _flush_timer.cancel();

    // 3. 唤醒所有背压等待者
    notify_backpressure_waiters();

    // 4. 等待所有后台操作完成
    co_await seastar::with_gate(_gate, [this] {
        return seastar::repeat([this] {
            // 检查是否还有待写入数据
            if (pending_entries() == 0) {
                return seastar::make_ready_future<seastar::stop_iteration>(
                    seastar::stop_iteration::yes
                );
            }

            // 继续 flush
            return flush_once(true, true).then([] {
                return seastar::stop_iteration::no;
            });
        });
    });

    // 5. 关闭 gate
    co_await _gate.close();

    // 6. 强制 flush 底层写入器
    co_await _append_writer.flush_tail(true);

    // 7. 保存最终 checkpoint
    if (_config.checkpoint_enabled) {
        co_await persist_checkpoint();
    }

    _started = false;
}

指标收集

Seastar Metrics 集成

void AsyncWriter::setup_metrics() {
    namespace sm = seastar::metrics;

    sm::group("log_engine_writer")
        .make_counter("submitted_messages", _submitted_messages,
            sm::description("Total submitted messages"))
        .make_counter("submitted_bytes", _submitted_bytes,
            sm::description("Total submitted bytes"))
        .make_counter("flushed_batches", _flushed_batches,
            sm::description("Total flushed batches"))
        .make_counter("flushed_bytes", _flushed_bytes,
            sm::description("Total flushed bytes"))
        .make_counter("flush_errors", _flush_errors,
            sm::description("Total flush errors"))
        .make_counter("backpressure_waits", _backpressure_waits,
            sm::description("Total backpressure wait events"))
        .make_gauge("pending_entries", _pending.size(),
            sm::description("Current pending entries in queue"))
        .make_gauge("pending_bytes", _pending_bytes,
            sm::description("Current pending bytes in queue"))
        .make_gauge("logical_size_bytes", _logical_size,
            sm::description("Logical size of active log"));
}

Prometheus 指标导出

# 访问 /metrics 端点
curl http://localhost:19181/metrics

# 输出示例
log_engine_writer_submitted_messages{shard="0"} 1234567
log_engine_writer_submitted_bytes{shard="0"} 345678901
log_engine_writer_flushed_batches{shard="0"} 12345
log_engine_writer_pending_bytes{shard="0"} 123456

性能分析

批量大小影响

批量大小吞吐量 (msg/s)P99 延迟 (μs)CPU 使用率
2KB500K3060%
4KB800K2070%
8KB1.2M1580%
16KB1.5M1285%
64KB1.8M1090%

水位控制效果

策略内存占用P99 延迟影响吞吐量影响
无水位无限增长最终降为 0
100MB 最多≤ 100MB尖峰轻微下降
50MB 最多≤ 50MB尖峰中等下降

最佳实践

配置建议

// 高吞吐场景
config.batch_size_bytes = 65536;        // 64KB
config.flush_interval_ms = 10;           // 10ms
config.max_pending_bytes = 200 * 1024 * 1024;  // 200MB

// 低延迟场景
config.batch_size_bytes = 4096;          // 4KB
config.flush_interval_ms = 1;            // 1ms
config.max_pending_bytes = 50 * 1024 * 1024;   // 50MB

// 平衡场景(推荐)
config.batch_size_bytes = 8192;          // 8KB
config.flush_interval_ms = 1;            // 1ms
config.max_pending_bytes = 100 * 1024 * 1024;  // 100MB

监控建议

# 关键指标告警

# 待写入字节数告警
if pending_bytes > 80MB:
    alert("pending_bytes 过高,可能存在性能瓶颈")

# 背压事件告警
if backpressure_waits_delta > 100:
    alert("背压频繁触发,考虑调大批量大小")

# Flush 错误告警
if flush_errors_delta > 10:
    alert("Flush 错误过多,检查磁盘健康")

总结

AsyncWriter 通过以下机制实现高性能无锁并发:

  1. Seastar Sharded 模式:每个 shard 独立运行,无锁竞争
  2. 智能背压控制:水位机制防止内存溢出
  3. 批量处理优化:减少 I/O 次数
  4. 异步定时器:保证延迟上限
  5. 优雅停止:Gate 机制确保数据不丢失

核心设计理念

  • 无共享状态:每个 shard 独立管理自己的状态
  • 异步优先:所有操作都是异步的,不阻塞 reactor
  • 可观测性:完善的指标系统便于监控和调试

下一篇:《DMA 对齐写入路径:AppendWriter 的底层 I/O 优化》

相关阅读