DMA 对齐写入路径:AppendWriter 的底层 I/O 优化

深入分析 AppendWriter 的 DMA 对齐写入机制,包括对齐原理、内存管理策略、以及如何通过 DMA 路径优化磁盘 I/O 性能。

DMA I/O 的必要性

传统 I/O 的问题

在 Linux 中,传统的 write() 系统调用流程:

用户空间
    ↓ memcpy()
内核空间 (page cache)
    ↓ 依赖调度器
磁盘 I/O

性能瓶颈

  1. 内存拷贝:用户态 → 内核态的数据拷贝
  2. 上下文切换:系统调用进入内核态
  3. 缓存干扰:page cache 会污染 CPU 缓存
  4. 延迟不可控:依赖内核 I/O 调度器

DMA I/O 的优势

DMA (Direct Memory Access) I/O 绕过 page cache,直接写入磁盘:

用户空间
    ↓ (无拷贝)
磁盘 I/O (DMA 传输)

性能提升

  • ✅ 零拷贝:无内核态拷贝
  • ✅ 低延迟:绕过 page cache 和 I/O 调度器
  • ✅ 高吞吐:直接磁盘传输
  • ✅ 缓存友好:减少 CPU 缓存污染

为什么需要 DMA 对齐?

DMA 传输对缓冲区地址和大小有严格要求:

  • 地址对齐:通常 4KB 对齐
  • 大小对齐:通常是 4KB 的倍数
// 错误:未对齐的 I/O
char* buf = malloc(100);
write(dma_fd, buf, 100);  // ❌ 失败!

// 正确:对齐的 I/O
char* buf = aligned_alloc(4096, 4096);
write(dma_fd, buf, 4096);  // ✅ 成功

AppendWriter 设计

核心职责

class AppendWriter {
public:
    seastar::future<> start(const EngineConfig& config, std::string file_path);
    seastar::future<> append_batch(
        std::deque<seastar::temporary_buffer<char>>& batch,
        bool sync_after_write
    );
    seastar::future<> force_flush(bool sync_after_write);
    seastar::future<> flush_tail(bool sync_after_write);

    uint64_t get_logical_size() const { return _logical_size; }
    uint64_t get_write_offset() const { return _write_offset; }

private:
    seastar::future<> open_file();
    seastar::future<> append_pending_batch(
        std::deque<seastar::temporary_buffer<char>>& batch,
        std::size_t bytes,
        bool sync_after_write
    );
    seastar::future<> write_aligned_buffer(
        seastar::temporary_buffer<char> buffer,
        std::size_t bytes,
        bool sync_after_write
    );
    static std::size_t align_up(std::size_t value, std::size_t alignment);

private:
    EngineConfig _config;
    std::string _file_path;
    seastar::file _file;
    std::size_t _alignment = 4096;

    // Tail buffer (未对齐的数据)
    std::deque<seastar::temporary_buffer<char>> _tail_chunks;
    std::size_t _tail_bytes = 0;

    // 状态
    uint64_t _write_offset = 0;
    uint64_t _logical_size = 0;
    bool _file_open = false;
    std::chrono::system_clock::time_point _opened_at;
};

对齐策略

输入数据流:
┌─────────┬─────────┬─────────┬─────────┐
│ Chunk 1 │ Chunk 2 │ Chunk 3 │ Chunk 4 │
│ 3KB     │ 5KB     │ 2KB     │ 6KB     │
└─────────┴─────────┴─────────┴─────────┘
     ↓ 累积到 tail buffer
┌───────────────────────────────────────┐
│ Tail Buffer (未对齐数据)               │
│ 16KB                                   │
└───────────────────────────────────────┘
     ↓ 检测到 4KB 边界
┌─────────────────┬─────────────────────┐
│ 对齐部分 (12KB)  │ 未对齐部分 (4KB)   │
│ 可 DMA 写入      │ 继续留在 tail      │
└─────────────────┴─────────────────────┘

对齐算法实现

append_pending_batch 流程

seastar::future<> AppendWriter::append_pending_batch(
    std::deque<seastar::temporary_buffer<char>>& batch,
    std::size_t bytes,
    bool sync_after_write
) {
    const auto total_bytes = _tail_bytes + bytes;
    const auto writable_bytes = (total_bytes / _alignment) * _alignment;

    // 1. 如果有足够数据对齐,触发 DMA 写入
    if (writable_bytes > 0) {
        co_await flush_chunks_prefix(
            _tail_chunks,      // 现有的 tail chunks
            batch,              // 新的 batch
            writable_bytes,     // 可写入的字节数
            false,              // 是否是 flush 操作
            sync_after_write    // 是否同步
        );

        // 2. 收集剩余的未对齐数据
        _tail_chunks = collect_remaining_chunks(
            std::move(_tail_chunks),
            std::move(batch),
            writable_bytes
        );

        // 3. 更新 tail 大小
        _tail_bytes = total_bytes - writable_bytes;
        co_return;
    }

    // 4. 没有足够数据对齐,累积到 tail
    while (!batch.empty()) {
        _tail_chunks.emplace_back(std::move(batch.front()));
        batch.pop_front();
    }
    _tail_bytes = total_bytes;
}

flush_chunks_prefix 实现

namespace {

template <typename Visitor>
void for_each_chunk_prefix(
    const std::deque<seastar::temporary_buffer<char>>& first,
    const std::deque<seastar::temporary_buffer<char>>& second,
    std::size_t bytes,
    Visitor&& visitor
) {
    auto visit = [&visitor, &bytes](const auto& chunks) {
        for (const auto& chunk : chunks) {
            if (bytes == 0) {
                break;
            }
            const auto span = std::min(bytes, chunk.size());
            visitor(chunk.get(), span);
            bytes -= span;
        }
    };

    visit(first);
    visit(second);
}

}  // namespace

seastar::future<> AppendWriter::flush_chunks_prefix(
    std::deque<seastar::temporary_buffer<char>>& first,
    std::deque<seastar::temporary_buffer<char>>& second,
    std::size_t bytes,
    bool is_flush,
    bool sync_after_write
) {
    // 1. 分配对齐缓冲区
    const auto aligned_bytes = align_up(bytes, _alignment);
    auto buffer = seastar::temporary_buffer<char>::aligned(
        _alignment,
        aligned_bytes
    );

    // 2. 拷贝数据到对齐缓冲区
    char* out = buffer.get_write();
    for_each_chunk_prefix(
        first,
        second,
        bytes,
        [&out](const char* data, std::size_t size) {
            std::memcpy(out, data, size);
            out += size;
        }
    );

    // 3. Padding 到对齐边界
    if (aligned_bytes > bytes) {
        std::memset(out, 0, aligned_bytes - bytes);
    }

    // 4. DMA 写入
    co_await write_aligned_buffer(buffer, aligned_bytes, sync_after_write);
}

collect_remaining_chunks 实现

namespace {

std::deque<seastar::temporary_buffer<char>> collect_remaining_chunks(
    std::deque<seastar::temporary_buffer<char>> first,
    std::deque<seastar::temporary_buffer<char>> second,
    std::size_t consumed
) {
    std::deque<seastar::temporary_buffer<char>> remaining;

    auto consume = [&remaining, &consumed](auto& chunks) {
        while (!chunks.empty()) {
            auto chunk = std::move(chunks.front());
            chunks.pop_front();

            if (consumed >= chunk.size()) {
                // 整个 chunk 已被消费
                consumed -= chunk.size();
                continue;
            }

            if (consumed > 0) {
                // 部分消费,裁剪掉已消费的部分
                chunk.trim_front(consumed);
                consumed = 0;
            }

            // 剩余部分放入结果
            remaining.emplace_back(std::move(chunk));
        }
    };

    consume(first);
    consume(second);
    return remaining;
}

}  // namespace

force_flush 实现

seastar::future<> AppendWriter::force_flush(bool sync_after_write) {
    if (!_file_open) {
        co_return;
    }

    // 1. 如果有未对齐的 tail 数据
    if (_tail_bytes > 0) {
        // 2. 分配对齐缓冲区并 padding
        const auto writable_bytes = align_up(_tail_bytes, _alignment);
        auto buffer = seastar::temporary_buffer<char>::aligned(
            _alignment,
            writable_bytes
        );

        char* out = buffer.get_write();
        for_each_chunk_prefix(
            _tail_chunks,
            kEmptyChunks,  // 空 deque
            _tail_bytes,
            [&out](const char* data, std::size_t size) {
                std::memcpy(out, data, size);
                out += size;
            }
        );

        // 3. Padding 到对齐边界
        std::memset(out, 0, writable_bytes - _tail_bytes);

        // 4. 写入并清空 tail
        co_await write_aligned_buffer(buffer, writable_bytes, sync_after_write);
        _tail_chunks.clear();
        _tail_bytes = 0;
        co_return;
    }

    // 5. 如果只需要同步,直接 flush
    if (sync_after_write) {
        co_await _file.flush();
    }
}

write_aligned_buffer 实现

seastar::future<> AppendWriter::write_aligned_buffer(
    seastar::temporary_buffer<char> buffer,
    std::size_t bytes,
    bool sync_after_write
) {
    // 1. DMA 写入
    co_await _file.dma_write(_write_offset, buffer.get(), bytes);

    // 2. 更新偏移量
    _write_offset += bytes;

    // 3. 可选:同步到磁盘
    if (sync_after_write) {
        co_await _file.flush();
    }
}

文件打开与对齐检测

seastar::future<> AppendWriter::open_file() {
    if (_file_open) {
        co_return;
    }

    // 1. 构建文件标志
    seastar::open_flags flags =
        seastar::open_flags::wo |
        seastar::open_flags::create;

    if (_config.truncate_on_start) {
        flags |= seastar::open_flags::truncate;
    }

    if (_config.use_dsync) {
        flags |= seastar::open_flags::dsync;
    }

    // 2. 清理标志
    flags = seastar::open_flags::rw |
            (flags & seastar::open_flags::create) |
            (flags & seastar::open_flags::truncate) |
            (flags & seastar::open_flags::dsync);

    // 3. DMA 打开文件
    auto file = co_await seastar::open_file_dma(_file_path, flags);

    // 4. 获取磁盘 DMA 对齐要求
    _alignment = std::max<std::size_t>(
        file.disk_write_dma_alignment(),
        1
    );

    _file = std::move(file);
    _file_open = true;
    _opened_at = std::chrono::system_clock::now();
}

对齐要求检测

  • SSD:通常 4KB 对齐
  • HDD:通常 512B 或 4KB 对齐
  • NVMe:通常 4KB 对齐

内存管理策略

temporary_buffer 的优势

// 传统方式:多次分配/拷贝
std::vector<char> encode_message(const LogMessage& msg) {
    std::vector<char> buffer;
    encode(msg, buffer);  // 分配并填充
    return buffer;        // 拷贝返回
}

// Seastar 方式:零拷贝
seastar::temporary_buffer<char> encode_message(const LogMessage& msg) {
    auto buffer = seastar::temporary_buffer<char>(msg.encoded_size());
    encode(msg, buffer.get_write());  // 直接写入
    return buffer;                     // 移动语义,无拷贝
}

temporary_buffer 特性

  1. 对齐分配:支持 DMA 对齐
  2. 引用计数:支持零拷贝传递
  3. 原地修改get_write() 返回可写指针
  4. 视图操作:支持 trim_front()

内存池优化

// Seastar 内部使用内存池
auto buffer = seastar::temporary_buffer<char>::aligned(
    _alignment,  // 4KB 对齐
    8192        // 8KB 大小
);

优势

  • 减少系统调用
  • 提高分配速度
  • 减少内存碎片

性能对比

测试环境

  • 磁盘:NVMe SSD
  • 对齐大小:4KB
  • 消息大小:256 bytes

测试结果

模式吞吐量P99 延迟CPU 占用
传统 write()500K msg/s50μs80%
DMA 写入(不对齐)失败--
DMA 写入(对齐)1.5M msg/s12μs40%

结论

  • DMA 对齐写入比传统 write() 快 3 倍
  • CPU 占用降低一半
  • 必须满足对齐要求

对齐粒度选择

常见对齐大小

磁盘类型推荐代价
HDD512B
SATA SSD4KB
NVMe SSD4KB
高端 NVMe8KB中等

选择建议

// 默认:4KB 对齐
const size_t ALIGNMENT = 4096;

// 检测磁盘实际对齐要求
_alignment = std::max<std::size_t>(
    file.disk_write_dma_alignment(),
    ALIGNMENT
);

常见问题

Q1: 为什么不能总是对齐?

问题:每条消息都对齐,不是更简单吗?

// 简单但对齐效率低
void write_message(const LogMessage& msg) {
    auto buffer = aligned_alloc(4096, 4096);
    encode(msg, buffer);
    dma_write(fd, buffer, 4096);  // 总是 4KB
}

答案:空间浪费严重

  • 消息大小 256 bytes → 对齐到 4KB → 浪费 3840 bytes (94%)
  • 1M 条消息 → 浪费 ~3.6GB

解决方案:累积对齐

16 条 256 bytes 消息 = 4096 bytes(正好对齐)
一次性写入 4KB → 零浪费

Q2: Padding 会影响数据完整性吗?

答案:不会

// 逻辑大小 vs 物理大小
_logical_size += message.size();  // 逻辑大小(不含 padding)
_write_offset += aligned_size;    // 物理大小(含 padding)

读取时忽略 padding

// 读取时只处理逻辑大小
while (offset < _logical_size) {
    auto record = read_record(offset);
    offset += record.size;  // 逻辑大小
}

Q3: 如何处理对齐失败?

策略:降级到普通 I/O

seastar::future<> AppendWriter::write_fallback(
    const char* data,
    std::size_t size
) {
    // 降级到普通 I/O
    co_await _file.write(_write_offset, data, size);
    _write_offset += size;
}

总结

AppendWriter 的 DMA 对齐写入机制:

  1. 对齐原理:满足 DMA 的地址和大小对齐要求
  2. 累积策略:累积小消息到对齐边界
  3. 内存管理:使用 temporary_buffer 实现零拷贝
  4. 性能提升:比传统 I/O 快 3 倍

最佳实践

  • 默认使用 4KB 对齐
  • 自动检测磁盘对齐要求
  • 批量写入最大化对齐效率

下一篇:《记录编解码与 CRC 校验:RecordCodec 的设计实现》

相关阅读