Rotate 与 Archive 设计:日志生命周期管理

深入分析日志文件的 Rotate(滚动)和 Archive(归档)机制,包括触发策略、归档格式、清理策略等生命周期管理设计。

日志生命周期

问题场景

长期运行的日志系统会产生无限的日志文件:

时间线:
T0:    shard-0.log (100MB)
T1:    shard-0.log (200MB)
T2:    shard-0.log (300MB)
T3:    shard-0.log (400MB)
...
问题:文件无限增长,磁盘空间耗尽

解决方案:生命周期管理

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│   Write  │ ──→ │  Rotate  │ ──→ │  Archive │ ──→ │ Cleanup  │
│  (Active)│     │ (滚动)   │     │  (归档)  │     │  (清理)  │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
                      ↓                  ↓                  ↓
                旧文件归档          压缩存储           清理过期文件
                创建新文件          释放空间          控制总容量

Rotate:日志滚动

触发条件

struct RotateConfig {
    // 按大小触发
    size_t rotate_size_bytes = 100 * 1024 * 1024;  // 100MB

    // 按时间触发
    uint64_t rotate_interval_seconds = 86400;  // 1天
};

触发检查

seastar::future<> AsyncWriter::check_rotation() {
    const auto& stats = _append_writer.get_stats();

    // 1. 按大小触发
    if (stats.logical_size >= _config.rotate_size_bytes) {
        co_await rotate_log();
        co_return;
    }

    // 2. 按时间触发
    auto now = std::chrono::system_clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
        now - stats.opened_at
    ).count();

    if (elapsed >= _config.rotate_interval_seconds) {
        co_await rotate_log();
    }
}

Rotate 流程

1. 检查触发条件
        ↓
2. flush_tail() 确保所有数据落盘
        ↓
3. 创建 checkpoint
        ↓
4. 重命名 active log → archive log
        ↓
5. 创建新的 active log
        ↓
6. 更新 rotation_index

实现代码

seastar::future<> AsyncWriter::rotate_log() {
    // 1. 确保所有数据落盘
    co_await _append_writer.flush_tail(true);

    // 2. 创建 checkpoint
    if (_config.checkpoint_enabled) {
        co_await persist_checkpoint();
    }

    // 3. 生成归档文件名
    auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch()
    ).count();

    auto shard_id = seastar::this_shard_id();
    std::string archive_path = fmt::format(
        "{}/{}-{}.{}.{}.log",
        _config.archive_dir,
        _config.shard_file_prefix,
        shard_id,
        timestamp,
        _rotation_index
    );

    // 4. 重命名 active log → archive log
    std::string active_path = layout::active_log_path(_config, shard_id);
    co_await seastar::rename_file(active_path, archive_path);

    // 5. 创建新的 active log
    co_await _append_writer.start(_config, active_path);

    // 6. 更新 rotation index
    _rotation_index++;
    g_rotate_operations++;

    mgrlog.info("Rotated log shard-{}: {} → {}",
        shard_id, active_path, archive_path);
}

文件命名格式

Active Log:
shard-0.log

Archive Log:
shard-0.1715424000000.0.log    # shard_id.timestamp.rotation_index.log
shard-0.1715424000000.1.log
shard-0.1715424000000.2.log

Compressed Archive:
shard-0.1715424000000.0.log.gz
shard-0.1715424000000.1.log.gz

Archive:日志归档

归档触发

struct ArchiveConfig {
    bool compress_archives = true;           // 是否压缩
    size_t max_archive_count = 100;         // 最大归档文件数
    uint64_t max_archive_age_seconds = 2592000;  // 最大保留时间(30天)
};

归档流程

1. 扫描 archive 目录
        ↓
2. 按时间排序
        ↓
3. 检查是否需要压缩
        ↓
4. 检查是否需要清理
        ↓
5. 执行压缩/清理

Gzip 压缩实现

seastar::future<> LogManager::gzip_compress(const std::string& path) {
    // 1. 读取原始文件
    std::vector<char> data;
    auto file = co_await seastar::open_file_dma(path, seastar::open_flags::ro);
    auto size = co_await file.size();
    data.resize(size);
    co_await file.dma_read(0, data.data(), size);
    co_await file.close();

    // 2. 压缩数据
    std::vector<char> compressed;
    compressed.reserve(data.size());

    z_stream stream{};
    stream.next_in = reinterpret_cast<Bytef*>(data.data());
    stream.avail_in = data.size();
    stream.total_in = data.size();

    // 初始化压缩流
    if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
        15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
        throw std::runtime_error("Failed to initialize deflate");
    }

    // 执行压缩
    std::array<char, 8192> buffer{};
    int ret;
    do {
        stream.next_out = reinterpret_cast<Bytef*>(buffer.data());
        stream.avail_out = buffer.size();

        ret = deflate(&stream, Z_FINISH);
        if (ret < 0 && ret != Z_STREAM_END) {
            deflateEnd(&stream);
            throw std::runtime_error("Compression failed");
        }

        size_t have = buffer.size() - stream.avail_out;
        compressed.insert(compressed.end(), buffer.begin(), buffer.begin() + have);
    } while (ret != Z_STREAM_END);

    deflateEnd(&stream);

    // 3. 写入临时文件(原子操作)
    std::string temp_path = path + ".gz.tmp";
    auto out_file = co_await seastar::open_file_dma(
        temp_path,
        seastar::open_flags::wo | seastar::open_flags::create
    );
    co_await out_file.dma_write(0, compressed.data(), compressed.size());
    co_await out_file.flush();
    co_await out_file.close();

    // 4. 重命名临时文件
    std::string final_path = path + ".gz";
    co_await seastar::rename_file(temp_path, final_path);

    // 5. 删除原始文件
    remove_file_if_exists(path);

    g_gzip_archive_successes++;
    mgrlog.info("Compressed: {} → {} ({} → {} bytes)",
        path, final_path, data.size(), compressed.size());
}

压缩效果

文件类型原始大小压缩后大小压缩率
文本日志100MB15MB85%
JSON 日志100MB20MB80%
二进制日志100MB60MB40%

Cleanup:日志清理

清理策略

seastar::future<> LogManager::cleanup_archives() {
    // 1. 收集所有归档文件
    auto segments = co_await collect_archive_segments(_config, std::nullopt);

    // 2. 按时间排序(从旧到新)
    layout::sort_segments(segments);

    auto now = std::chrono::system_clock::now();
    std::vector<std::string> to_remove;

    // 3. 检查是否需要清理
    for (const auto& segment : segments) {
        bool should_remove = false;

        // 按数量清理
        if (segments.size() > _config.max_archive_count) {
            should_remove = true;
        }

        // 按时间清理
        auto segment_time = std::chrono::system_clock::time_point(
            std::chrono::milliseconds(segment.timestamp_ms)
        );
        auto age = std::chrono::duration_cast<std::chrono::seconds>(
            now - segment_time
        ).count();

        if (age >= _config.max_archive_age_seconds) {
            should_remove = true;
        }

        if (should_remove) {
            to_remove.push_back(segment.path);
        }
    }

    // 4. 执行清理
    for (const auto& path : to_remove) {
        remove_file_if_exists(path);
        mgrlog.info("Removed archive: {}", path);
    }
}

清理策略选择

策略优点缺点适用场景
按数量简单可控可能保留过久磁盘空间有限
按时间更精确需要时间戳合规要求场景
组合策略灵活复杂度高生产环境推荐

组合策略

// 保留最多 100 个文件,或 30 天内的文件(以更严格的为准)
config.max_archive_count = 100;
config.max_archive_age_seconds = 2592000;  // 30天

文件布局管理

目录结构

log_dir/                              # 活跃日志目录
├── log_engine-0.log                  # shard 0 活跃日志
├── log_engine-0.log.checkpoint      # shard 0 checkpoint
├── log_engine-1.log                  # shard 1 活跃日志
├── log_engine-1.log.checkpoint      # shard 1 checkpoint
└── ...

archive_dir/                          # 归档日志目录
├── log_engine-0.1715424000000.0.log   # shard 0 归档
├── log_engine-0.1715424000000.0.log.gz
├── log_engine-0.1715424000000.1.log
├── log_engine-0.1715424000000.1.log.gz
└── ...

文件描述符

struct SegmentDescriptor {
    std::string path;              // 文件路径
    unsigned shard_id;             // shard ID
    bool archived;                 // 是否已归档
    bool compressed;               // 是否已压缩
    std::uint64_t timestamp_ms;    // 时间戳
    std::uint64_t rotation_index;  // 旋转索引
};

路径生成

namespace log_engine::layout {

// 活跃日志路径
std::string active_log_path(const EngineConfig& config, unsigned shard_id) {
    return config.log_dir + "/" + shard_prefix(config, shard_id) + ".log";
}

// Checkpoint 路径
std::string checkpoint_path(const SegmentDescriptor& active_segment) {
    return active_segment.path + ".checkpoint";
}

// 归档日志路径
std::string archive_log_path(
    const EngineConfig& config,
    unsigned shard_id,
    std::uint64_t timestamp_ms,
    std::uint64_t rotation_index,
    bool compressed) {
    auto filename = shard_prefix(config, shard_id)
        + "." + std::to_string(timestamp_ms)
        + "." + std::to_string(rotation_index)
        + ".log";
    if (compressed) {
        filename += ".gz";
    }
    return config.archive_dir + "/" + filename;
}

}  // namespace log_engine::layout

文件扫描与排序

扫描活跃文件

std::vector<SegmentDescriptor> collect_active_segments(
    const EngineConfig& config,
    const std::optional<unsigned>& shard
) {
    namespace fs = std::filesystem;
    std::vector<SegmentDescriptor> segments;

    if (!fs::exists(config.log_dir)) {
        return segments;
    }

    for (const auto& entry : fs::directory_iterator(config.log_dir)) {
        if (!entry.is_regular_file() || entry.path().extension() != ".log") {
            continue;
        }

        const auto segment = describe_path(config, entry.path().string());
        if (!segment || segment->archived || !matches_query_shard(*segment, shard)) {
            continue;
        }

        segments.push_back(*segment);
    }

    sort_segments(segments);
    return segments;
}

扫描归档文件

std::vector<SegmentDescriptor> collect_archive_segments(
    const EngineConfig& config,
    const std::optional<unsigned>& shard
) {
    namespace fs = std::filesystem;
    std::vector<SegmentDescriptor> segments;

    if (!fs::exists(config.archive_dir)) {
        return segments;
    }

    for (const auto& entry : fs::directory_iterator(config.archive_dir)) {
        if (!entry.is_regular_file()) {
            continue;
        }

        const auto segment = describe_path(config, entry.path().string());
        if (!segment || !segment->archived || !matches_query_shard(*segment, shard)) {
            continue;
        }

        segments.push_back(*segment);
    }

    sort_segments(segments);
    return segments;
}

排序策略

void sort_segments(std::vector<SegmentDescriptor>& segments) {
    // 按 shard_id → timestamp → rotation_index 排序
    std::sort(segments.begin(), segments.end(),
        [](const SegmentDescriptor& lhs, const SegmentDescriptor& rhs) {
            if (lhs.shard_id != rhs.shard_id) {
                return lhs.shard_id < rhs.shard_id;
            }
            if (lhs.timestamp_ms != rhs.timestamp_ms) {
                return lhs.timestamp_ms < rhs.timestamp_ms;
            }
            return lhs.rotation_index < rhs.rotation_index;
        });
}

原子操作保证

文件写入原子性

// 错误方式:直接写入
write_file("archive.log.gz", compressed_data);
// 如果进程在写入中崩溃,文件损坏

// 正确方式:写入临时文件 + 重命名
write_file("archive.log.gz.tmp", compressed_data);  // 写入临时文件
rename_file("archive.log.gz.tmp", "archive.log.gz");  // 原子重命名

Checkpoint 原子写入

seastar::future<> AsyncWriter::persist_checkpoint() {
    CheckpointState checkpoint{
        .logical_size = _append_writer.get_logical_size(),
        .sequence = _sequence,
        .rotation_index = _rotation_index
    };

    std::string checkpoint_path = layout::checkpoint_path(_active_segment);

    // 1. 写入临时文件
    std::string temp_path = checkpoint_path + ".tmp";
    std::ofstream out(temp_path, std::ios::binary);
    out << "logical_size=" << checkpoint.logical_size << "\n";
    out << "sequence=" << checkpoint.sequence << "\n";
    out << "rotation_index=" << checkpoint.rotation_index << "\n";
    out.close();

    // 2. 原子重命名
    co_await seastar::rename_file(temp_path, checkpoint_path);

    g_checkpoint_write_successes++;
}

清理临时文件

void cleanup_temporary_sidecars(const EngineConfig& config) {
    namespace fs = std::filesystem;
    size_t removed = 0;

    // 清理 checkpoint.tmp
    if (fs::exists(config.log_dir)) {
        for (const auto& entry : fs::directory_iterator(config.log_dir)) {
            if (entry.path().extension() == ".tmp") {
                fs::remove(entry.path());
                removed++;
            }
        }
    }

    // 清理 .gz.tmp
    if (fs::exists(config.archive_dir)) {
        for (const auto& entry : fs::directory_iterator(config.archive_dir)) {
            if (entry.path().extension() == ".tmp") {
                fs::remove(entry.path());
                removed++;
            }
        }
    }

    if (removed > 0) {
        mgrlog.warn("Cleaned up {} temporary files", removed);
    }
}

性能优化

异步压缩

seastar::future<> LogManager::compress_archives_async() {
    auto segments = co_await collect_archive_segments(_config, std::nullopt);

    // 找出未压缩的归档文件
    std::vector<std::string> to_compress;
    for (const auto& segment : segments) {
        if (!segment.compressed) {
            to_compress.push_back(segment.path);
        }
    }

    // 并发压缩
    std::vector<seastar::future<>> futures;
    for (const auto& path : to_compress) {
        futures.push_back(
            seastar::async([this, path] {
                return gzip_compress(path);
            })
        );
    }

    co_await seastar::when_all_succeed(futures.begin(), futures.end());
}

批量清理

seastar::future<> LogManager::cleanup_archives_batch() {
    auto segments = co_await collect_archive_segments(_config, std::nullopt);
    layout::sort_segments(segments);

    std::vector<std::string> to_remove;
    // ... 判断哪些文件需要清理 ...

    // 批量删除
    co_await seastar::async([to_remove] {
        for (const auto& path : to_remove) {
            remove_file_if_exists(path);
        }
    });
}

故障处理

压缩失败处理

seastar::future<> LogManager::gzip_compress(const std::string& path) {
    try {
        // ... 压缩逻辑 ...
        g_gzip_archive_successes++;
    } catch (const std::exception& e) {
        g_gzip_archive_failures++;
        mgrlog.error("Failed to compress {}: {}", path, e.what());

        // 清理临时文件
        std::string temp_path = path + ".gz.tmp";
        remove_file_if_exists(temp_path);

        throw;
    }
}

归档文件损坏处理

seastar::future<> LogManager::verify_archives() {
    auto segments = co_await collect_archive_segments(_config, std::nullopt);

    for (const auto& segment : segments) {
        try {
            // 验证文件完整性
            if (segment.compressed) {
                co_await verify_gzip(segment.path);
            } else {
                co_await verify_log(segment.path);
            }
        } catch (const std::exception& e) {
            mgrlog.error("Archive verification failed {}: {}", segment.path, e.what());
            // 标记为损坏,后续处理
        }
    }
}

指标监控

Rotate 指标

seastar::metrics::group("log_engine_log_manager")
    .make_counter("rotate_operations", g_rotate_operations)
    .make_gauge("current_rotation_index", _rotation_index)
    .make_gauge("active_log_size_bytes", _append_writer.get_logical_size());

Archive 指标

seastar::metrics::group("log_engine_log_manager")
    .make_counter("gzip_archive_successes", g_gzip_archive_successes)
    .make_counter("gzip_archive_failures", g_gzip_archive_failures)
    .make_gauge("archive_count", get_archive_count())
    .make_gauge("archive_total_bytes", get_archive_total_bytes());

监控建议

# Rotate 频率异常
if rotate_operations > expected * 2:
    alert("Rotate frequency too high, check rotate_size_bytes")

# 归档文件数量过多
if archive_count > max_archive_count * 0.9:
    alert("Archive count approaching limit")

# 压缩失败
if gzip_archive_failures > 0:
    alert("Gzip compression failures detected")

配置建议

Rotate 配置

// 小规模系统(日志量小)
config.rotate_size_bytes = 50 * 1024 * 1024;     // 50MB
config.rotate_interval_seconds = 86400;          // 1天

// 中等规模系统
config.rotate_size_bytes = 100 * 1024 * 1024;    // 100MB
config.rotate_interval_seconds = 43200;          // 12小时

// 大规模系统(日志量大)
config.rotate_size_bytes = 200 * 1024 * 1024;    // 200MB
config.rotate_interval_seconds = 21600;          // 6小时

Archive 配置

// 开发环境
config.compress_archives = false;                // 不压缩
config.max_archive_count = 50;
config.max_archive_age_seconds = 604800;         // 7天

// 生产环境(推荐)
config.compress_archives = true;                 // 压缩
config.max_archive_count = 100;
config.max_archive_age_seconds = 2592000;        // 30天

// 合规要求场景
config.compress_archives = true;
config.max_archive_count = 1000;                 // 保留更多文件
config.max_archive_age_seconds = 7776000;        // 90天

总结

Rotate 与 Archive 机制通过以下方式管理日志生命周期:

  1. Rotate:按大小/时间滚动,防止单文件过大
  2. Archive:压缩存储,节省磁盘空间
  3. Cleanup:按数量/时间清理,控制总容量
  4. 原子操作:确保进程崩溃时数据一致
  5. 性能优化:异步处理,减少阻塞

最佳实践

  • 默认配置:100MB rotate + gzip 压缩 + 100 个归档文件 + 30 天保留
  • 定期监控归档文件数量和磁盘使用情况
  • 定期验证归档文件完整性
  • 根据日志量调整 rotate 和 archive 策略

系列 2 完结

下一步:系列 3 《性能优化实践》

相关阅读