Rotate 与 Archive 设计:日志生命周期管理
深入分析日志文件的 Rotate(滚动)和 Archive(归档)机制,包括触发策略、归档格式、清理策略等生命周期管理设计。
日志生命周期
问题场景
长期运行的日志系统会产生无限的日志文件:
时间线:
T0: shard-0.log (100MB)
T1: shard-0.log (200MB)
T2: shard-0.log (300MB)
T3: shard-0.log (400MB)
...
问题:文件无限增长,磁盘空间耗尽
解决方案:生命周期管理
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Write │ ──→ │ Rotate │ ──→ │ Archive │ ──→ │ Cleanup │
│ (Active)│ │ (滚动) │ │ (归档) │ │ (清理) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
↓ ↓ ↓
旧文件归档 压缩存储 清理过期文件
创建新文件 释放空间 控制总容量
Rotate:日志滚动
触发条件
struct RotateConfig {
// 按大小触发
size_t rotate_size_bytes = 100 * 1024 * 1024; // 100MB
// 按时间触发
uint64_t rotate_interval_seconds = 86400; // 1天
};
触发检查
seastar::future<> AsyncWriter::check_rotation() {
const auto& stats = _append_writer.get_stats();
// 1. 按大小触发
if (stats.logical_size >= _config.rotate_size_bytes) {
co_await rotate_log();
co_return;
}
// 2. 按时间触发
auto now = std::chrono::system_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - stats.opened_at
).count();
if (elapsed >= _config.rotate_interval_seconds) {
co_await rotate_log();
}
}
Rotate 流程
1. 检查触发条件
↓
2. flush_tail() 确保所有数据落盘
↓
3. 创建 checkpoint
↓
4. 重命名 active log → archive log
↓
5. 创建新的 active log
↓
6. 更新 rotation_index
实现代码
seastar::future<> AsyncWriter::rotate_log() {
// 1. 确保所有数据落盘
co_await _append_writer.flush_tail(true);
// 2. 创建 checkpoint
if (_config.checkpoint_enabled) {
co_await persist_checkpoint();
}
// 3. 生成归档文件名
auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
auto shard_id = seastar::this_shard_id();
std::string archive_path = fmt::format(
"{}/{}-{}.{}.{}.log",
_config.archive_dir,
_config.shard_file_prefix,
shard_id,
timestamp,
_rotation_index
);
// 4. 重命名 active log → archive log
std::string active_path = layout::active_log_path(_config, shard_id);
co_await seastar::rename_file(active_path, archive_path);
// 5. 创建新的 active log
co_await _append_writer.start(_config, active_path);
// 6. 更新 rotation index
_rotation_index++;
g_rotate_operations++;
mgrlog.info("Rotated log shard-{}: {} → {}",
shard_id, active_path, archive_path);
}
文件命名格式
Active Log:
shard-0.log
Archive Log:
shard-0.1715424000000.0.log # shard_id.timestamp.rotation_index.log
shard-0.1715424000000.1.log
shard-0.1715424000000.2.log
Compressed Archive:
shard-0.1715424000000.0.log.gz
shard-0.1715424000000.1.log.gz
Archive:日志归档
归档触发
struct ArchiveConfig {
bool compress_archives = true; // 是否压缩
size_t max_archive_count = 100; // 最大归档文件数
uint64_t max_archive_age_seconds = 2592000; // 最大保留时间(30天)
};
归档流程
1. 扫描 archive 目录
↓
2. 按时间排序
↓
3. 检查是否需要压缩
↓
4. 检查是否需要清理
↓
5. 执行压缩/清理
Gzip 压缩实现
seastar::future<> LogManager::gzip_compress(const std::string& path) {
// 1. 读取原始文件
std::vector<char> data;
auto file = co_await seastar::open_file_dma(path, seastar::open_flags::ro);
auto size = co_await file.size();
data.resize(size);
co_await file.dma_read(0, data.data(), size);
co_await file.close();
// 2. 压缩数据
std::vector<char> compressed;
compressed.reserve(data.size());
z_stream stream{};
stream.next_in = reinterpret_cast<Bytef*>(data.data());
stream.avail_in = data.size();
stream.total_in = data.size();
// 初始化压缩流
if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
throw std::runtime_error("Failed to initialize deflate");
}
// 执行压缩
std::array<char, 8192> buffer{};
int ret;
do {
stream.next_out = reinterpret_cast<Bytef*>(buffer.data());
stream.avail_out = buffer.size();
ret = deflate(&stream, Z_FINISH);
if (ret < 0 && ret != Z_STREAM_END) {
deflateEnd(&stream);
throw std::runtime_error("Compression failed");
}
size_t have = buffer.size() - stream.avail_out;
compressed.insert(compressed.end(), buffer.begin(), buffer.begin() + have);
} while (ret != Z_STREAM_END);
deflateEnd(&stream);
// 3. 写入临时文件(原子操作)
std::string temp_path = path + ".gz.tmp";
auto out_file = co_await seastar::open_file_dma(
temp_path,
seastar::open_flags::wo | seastar::open_flags::create
);
co_await out_file.dma_write(0, compressed.data(), compressed.size());
co_await out_file.flush();
co_await out_file.close();
// 4. 重命名临时文件
std::string final_path = path + ".gz";
co_await seastar::rename_file(temp_path, final_path);
// 5. 删除原始文件
remove_file_if_exists(path);
g_gzip_archive_successes++;
mgrlog.info("Compressed: {} → {} ({} → {} bytes)",
path, final_path, data.size(), compressed.size());
}
压缩效果
| 文件类型 | 原始大小 | 压缩后大小 | 压缩率 |
|---|---|---|---|
| 文本日志 | 100MB | 15MB | 85% |
| JSON 日志 | 100MB | 20MB | 80% |
| 二进制日志 | 100MB | 60MB | 40% |
Cleanup:日志清理
清理策略
seastar::future<> LogManager::cleanup_archives() {
// 1. 收集所有归档文件
auto segments = co_await collect_archive_segments(_config, std::nullopt);
// 2. 按时间排序(从旧到新)
layout::sort_segments(segments);
auto now = std::chrono::system_clock::now();
std::vector<std::string> to_remove;
// 3. 检查是否需要清理
for (const auto& segment : segments) {
bool should_remove = false;
// 按数量清理
if (segments.size() > _config.max_archive_count) {
should_remove = true;
}
// 按时间清理
auto segment_time = std::chrono::system_clock::time_point(
std::chrono::milliseconds(segment.timestamp_ms)
);
auto age = std::chrono::duration_cast<std::chrono::seconds>(
now - segment_time
).count();
if (age >= _config.max_archive_age_seconds) {
should_remove = true;
}
if (should_remove) {
to_remove.push_back(segment.path);
}
}
// 4. 执行清理
for (const auto& path : to_remove) {
remove_file_if_exists(path);
mgrlog.info("Removed archive: {}", path);
}
}
清理策略选择
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 按数量 | 简单可控 | 可能保留过久 | 磁盘空间有限 |
| 按时间 | 更精确 | 需要时间戳 | 合规要求场景 |
| 组合策略 | 灵活 | 复杂度高 | 生产环境推荐 |
组合策略:
// 保留最多 100 个文件,或 30 天内的文件(以更严格的为准)
config.max_archive_count = 100;
config.max_archive_age_seconds = 2592000; // 30天
文件布局管理
目录结构
log_dir/ # 活跃日志目录
├── log_engine-0.log # shard 0 活跃日志
├── log_engine-0.log.checkpoint # shard 0 checkpoint
├── log_engine-1.log # shard 1 活跃日志
├── log_engine-1.log.checkpoint # shard 1 checkpoint
└── ...
archive_dir/ # 归档日志目录
├── log_engine-0.1715424000000.0.log # shard 0 归档
├── log_engine-0.1715424000000.0.log.gz
├── log_engine-0.1715424000000.1.log
├── log_engine-0.1715424000000.1.log.gz
└── ...
文件描述符
struct SegmentDescriptor {
std::string path; // 文件路径
unsigned shard_id; // shard ID
bool archived; // 是否已归档
bool compressed; // 是否已压缩
std::uint64_t timestamp_ms; // 时间戳
std::uint64_t rotation_index; // 旋转索引
};
路径生成
namespace log_engine::layout {
// 活跃日志路径
std::string active_log_path(const EngineConfig& config, unsigned shard_id) {
return config.log_dir + "/" + shard_prefix(config, shard_id) + ".log";
}
// Checkpoint 路径
std::string checkpoint_path(const SegmentDescriptor& active_segment) {
return active_segment.path + ".checkpoint";
}
// 归档日志路径
std::string archive_log_path(
const EngineConfig& config,
unsigned shard_id,
std::uint64_t timestamp_ms,
std::uint64_t rotation_index,
bool compressed) {
auto filename = shard_prefix(config, shard_id)
+ "." + std::to_string(timestamp_ms)
+ "." + std::to_string(rotation_index)
+ ".log";
if (compressed) {
filename += ".gz";
}
return config.archive_dir + "/" + filename;
}
} // namespace log_engine::layout
文件扫描与排序
扫描活跃文件
std::vector<SegmentDescriptor> collect_active_segments(
const EngineConfig& config,
const std::optional<unsigned>& shard
) {
namespace fs = std::filesystem;
std::vector<SegmentDescriptor> segments;
if (!fs::exists(config.log_dir)) {
return segments;
}
for (const auto& entry : fs::directory_iterator(config.log_dir)) {
if (!entry.is_regular_file() || entry.path().extension() != ".log") {
continue;
}
const auto segment = describe_path(config, entry.path().string());
if (!segment || segment->archived || !matches_query_shard(*segment, shard)) {
continue;
}
segments.push_back(*segment);
}
sort_segments(segments);
return segments;
}
扫描归档文件
std::vector<SegmentDescriptor> collect_archive_segments(
const EngineConfig& config,
const std::optional<unsigned>& shard
) {
namespace fs = std::filesystem;
std::vector<SegmentDescriptor> segments;
if (!fs::exists(config.archive_dir)) {
return segments;
}
for (const auto& entry : fs::directory_iterator(config.archive_dir)) {
if (!entry.is_regular_file()) {
continue;
}
const auto segment = describe_path(config, entry.path().string());
if (!segment || !segment->archived || !matches_query_shard(*segment, shard)) {
continue;
}
segments.push_back(*segment);
}
sort_segments(segments);
return segments;
}
排序策略
void sort_segments(std::vector<SegmentDescriptor>& segments) {
// 按 shard_id → timestamp → rotation_index 排序
std::sort(segments.begin(), segments.end(),
[](const SegmentDescriptor& lhs, const SegmentDescriptor& rhs) {
if (lhs.shard_id != rhs.shard_id) {
return lhs.shard_id < rhs.shard_id;
}
if (lhs.timestamp_ms != rhs.timestamp_ms) {
return lhs.timestamp_ms < rhs.timestamp_ms;
}
return lhs.rotation_index < rhs.rotation_index;
});
}
原子操作保证
文件写入原子性
// 错误方式:直接写入
write_file("archive.log.gz", compressed_data);
// 如果进程在写入中崩溃,文件损坏
// 正确方式:写入临时文件 + 重命名
write_file("archive.log.gz.tmp", compressed_data); // 写入临时文件
rename_file("archive.log.gz.tmp", "archive.log.gz"); // 原子重命名
Checkpoint 原子写入
seastar::future<> AsyncWriter::persist_checkpoint() {
CheckpointState checkpoint{
.logical_size = _append_writer.get_logical_size(),
.sequence = _sequence,
.rotation_index = _rotation_index
};
std::string checkpoint_path = layout::checkpoint_path(_active_segment);
// 1. 写入临时文件
std::string temp_path = checkpoint_path + ".tmp";
std::ofstream out(temp_path, std::ios::binary);
out << "logical_size=" << checkpoint.logical_size << "\n";
out << "sequence=" << checkpoint.sequence << "\n";
out << "rotation_index=" << checkpoint.rotation_index << "\n";
out.close();
// 2. 原子重命名
co_await seastar::rename_file(temp_path, checkpoint_path);
g_checkpoint_write_successes++;
}
清理临时文件
void cleanup_temporary_sidecars(const EngineConfig& config) {
namespace fs = std::filesystem;
size_t removed = 0;
// 清理 checkpoint.tmp
if (fs::exists(config.log_dir)) {
for (const auto& entry : fs::directory_iterator(config.log_dir)) {
if (entry.path().extension() == ".tmp") {
fs::remove(entry.path());
removed++;
}
}
}
// 清理 .gz.tmp
if (fs::exists(config.archive_dir)) {
for (const auto& entry : fs::directory_iterator(config.archive_dir)) {
if (entry.path().extension() == ".tmp") {
fs::remove(entry.path());
removed++;
}
}
}
if (removed > 0) {
mgrlog.warn("Cleaned up {} temporary files", removed);
}
}
性能优化
异步压缩
seastar::future<> LogManager::compress_archives_async() {
auto segments = co_await collect_archive_segments(_config, std::nullopt);
// 找出未压缩的归档文件
std::vector<std::string> to_compress;
for (const auto& segment : segments) {
if (!segment.compressed) {
to_compress.push_back(segment.path);
}
}
// 并发压缩
std::vector<seastar::future<>> futures;
for (const auto& path : to_compress) {
futures.push_back(
seastar::async([this, path] {
return gzip_compress(path);
})
);
}
co_await seastar::when_all_succeed(futures.begin(), futures.end());
}
批量清理
seastar::future<> LogManager::cleanup_archives_batch() {
auto segments = co_await collect_archive_segments(_config, std::nullopt);
layout::sort_segments(segments);
std::vector<std::string> to_remove;
// ... 判断哪些文件需要清理 ...
// 批量删除
co_await seastar::async([to_remove] {
for (const auto& path : to_remove) {
remove_file_if_exists(path);
}
});
}
故障处理
压缩失败处理
seastar::future<> LogManager::gzip_compress(const std::string& path) {
try {
// ... 压缩逻辑 ...
g_gzip_archive_successes++;
} catch (const std::exception& e) {
g_gzip_archive_failures++;
mgrlog.error("Failed to compress {}: {}", path, e.what());
// 清理临时文件
std::string temp_path = path + ".gz.tmp";
remove_file_if_exists(temp_path);
throw;
}
}
归档文件损坏处理
seastar::future<> LogManager::verify_archives() {
auto segments = co_await collect_archive_segments(_config, std::nullopt);
for (const auto& segment : segments) {
try {
// 验证文件完整性
if (segment.compressed) {
co_await verify_gzip(segment.path);
} else {
co_await verify_log(segment.path);
}
} catch (const std::exception& e) {
mgrlog.error("Archive verification failed {}: {}", segment.path, e.what());
// 标记为损坏,后续处理
}
}
}
指标监控
Rotate 指标
seastar::metrics::group("log_engine_log_manager")
.make_counter("rotate_operations", g_rotate_operations)
.make_gauge("current_rotation_index", _rotation_index)
.make_gauge("active_log_size_bytes", _append_writer.get_logical_size());
Archive 指标
seastar::metrics::group("log_engine_log_manager")
.make_counter("gzip_archive_successes", g_gzip_archive_successes)
.make_counter("gzip_archive_failures", g_gzip_archive_failures)
.make_gauge("archive_count", get_archive_count())
.make_gauge("archive_total_bytes", get_archive_total_bytes());
监控建议
# Rotate 频率异常
if rotate_operations > expected * 2:
alert("Rotate frequency too high, check rotate_size_bytes")
# 归档文件数量过多
if archive_count > max_archive_count * 0.9:
alert("Archive count approaching limit")
# 压缩失败
if gzip_archive_failures > 0:
alert("Gzip compression failures detected")
配置建议
Rotate 配置
// 小规模系统(日志量小)
config.rotate_size_bytes = 50 * 1024 * 1024; // 50MB
config.rotate_interval_seconds = 86400; // 1天
// 中等规模系统
config.rotate_size_bytes = 100 * 1024 * 1024; // 100MB
config.rotate_interval_seconds = 43200; // 12小时
// 大规模系统(日志量大)
config.rotate_size_bytes = 200 * 1024 * 1024; // 200MB
config.rotate_interval_seconds = 21600; // 6小时
Archive 配置
// 开发环境
config.compress_archives = false; // 不压缩
config.max_archive_count = 50;
config.max_archive_age_seconds = 604800; // 7天
// 生产环境(推荐)
config.compress_archives = true; // 压缩
config.max_archive_count = 100;
config.max_archive_age_seconds = 2592000; // 30天
// 合规要求场景
config.compress_archives = true;
config.max_archive_count = 1000; // 保留更多文件
config.max_archive_age_seconds = 7776000; // 90天
总结
Rotate 与 Archive 机制通过以下方式管理日志生命周期:
- Rotate:按大小/时间滚动,防止单文件过大
- Archive:压缩存储,节省磁盘空间
- Cleanup:按数量/时间清理,控制总容量
- 原子操作:确保进程崩溃时数据一致
- 性能优化:异步处理,减少阻塞
最佳实践:
- 默认配置:100MB rotate + gzip 压缩 + 100 个归档文件 + 30 天保留
- 定期监控归档文件数量和磁盘使用情况
- 定期验证归档文件完整性
- 根据日志量调整 rotate 和 archive 策略
系列 2 完结
下一步:系列 3 《性能优化实践》
相关阅读: