查询接口设计:HTTP/gRPC 双协议支持
深入分析 Seastar Log Engine 的查询接口设计,包括 HTTP 和 gRPC 双协议支持、查询优化和安全考虑。
查询接口的重要性
为什么需要查询接口?
虽然日志系统的核心是写入,但在实际生产环境中,查询能力同样重要:
- 问题排查:快速定位日志中的错误信息
- 数据分析:统计和分析日志中的模式
- 运维监控:实时监控系统状态
- 审计追踪:记录关键操作日志
查询场景
| 场景 | 查询模式 | 典型需求 |
|---|---|---|
| 实时监控 | 状态查询 | 当前系统状态、健康检查 |
| 问题排查 | 按时间查询 | 查询特定时间段的日志 |
| 数据分析 | 按序列号查询 | 查询特定范围的记录 |
| 路由调试 | 路由查询 | 验证消息路由规则 |
双协议设计
为什么支持双协议?
HTTP 和 gRPC 各有优势,适合不同场景:
| 协议 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| HTTP | 简单易用、广泛支持、调试方便 | 性能相对较低、类型检查弱 | 快速调试、脚本自动化 |
| gRPC | 高性能、类型安全、流式支持 | 复杂度较高、需要代码生成 | 高频查询、微服务调用 |
协议选择建议
// 使用场景决策树
if (需要快速调试) {
使用 HTTP;
} else if (高频调用) {
使用 gRPC;
} else if (需要流式数据) {
使用 gRPC streaming;
} else {
两者都支持,根据客户端选择;
}
HTTP 查询接口
端点设计
Seastar Log Engine 提供 4 个核心 HTTP 端点:
GET /v1/status # 获取系统状态
GET /v1/route # 查询路由规则
GET /v1/records # 查询日志记录
GET /metrics # Prometheus 指标
1. Status 端点
端点:GET /v1/status
用途:获取系统整体状态和健康检查
返回示例:
{
"health": "healthy",
"health_reason": "no_recent_errors",
"health_reason_basis": "no_errors_in_window",
"recovery_fallback_reason": "none",
"routing_strategy": "consistent_hashing",
"routing_shards": 4,
"routing_virtual_nodes": 256,
"ring_size": 1024,
"log_dir": "./logs",
"archive_dir": "./archive",
"shard_file_prefix": "shard",
"reader_stats": {
"segments_read": 1024,
"archive_segments_read": 256,
"active_segments_read": 768,
"records_returned": 50000,
"corrupted_segments": 0,
"corrupted_lines": 5,
"gzip_read_errors": 2
},
"log_manager_stats": {
"rotate_operations": 128,
"checkpoint_write_successes": 953,
"checkpoint_write_failures": 0,
"recovery_fallbacks": 1,
"recovery_fallback_incomplete_checkpoint": 0,
"recovery_fallback_stale_checkpoint": 1,
"gzip_archive_successes": 120,
"gzip_archive_failures": 0
},
"health_recent_errors": {
"reader_corrupted_segments": 0,
"reader_corrupted_lines": 5,
"reader_gzip_read_errors": 2,
"log_manager_checkpoint_failures": 0,
"log_manager_gzip_failures": 0,
"log_manager_recovery_fallbacks": 1
}
}
使用示例:
# 健康检查
curl http://localhost:18080/v1/status
# 解析健康状态
health=$(curl -s http://localhost:18080/v1/status | jq -r '.health')
if [ "$health" != "healthy" ]; then
echo "System unhealthy!"
fi
2. Route 端点
端点:GET /v1/route?key=<route_key>
用途:查询消息应该路由到哪个 shard
返回示例:
{
"route_key": "user:123",
"shard": 2,
"hash": 1234567890,
"token": 54321,
"used_local_fallback": false
}
使用场景:
# 验证路由规则
curl http://localhost:18080/v1/route?key=user:123
# 批量验证
for key in "user:123" "user:456" "order:789"; do
echo "$key -> $(curl -s "http://localhost:18080/v1/route?key=$key" | jq -r '.shard')"
done
3. Records 端点
端点:GET /v1/records?<params>
参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
shard | int | 否 | 指定 shard,不指定则查询所有 |
seq_from | int | 否 | 起始序列号 |
seq_to | int | 否 | 结束序列号 |
time_from | string | 否 | 起始时间 (RFC3339) |
time_to | string | 否 | 结束时间 (RFC3339) |
limit | int | 是 | 返回记录数限制 |
include_archive | bool | 否 | 是否包含归档文件 |
返回示例:
{
"records": [
{
"crc": 1234567890,
"timestamp": "2026-05-15T10:30:00Z",
"shard": 2,
"has_sequence": true,
"sequence": 100,
"level": "INFO",
"payload": "Processing request user:123",
"raw_line": "ts=2026-05-15T10:30:00Z shard=2 seq=100 level=INFO crc=a1b2c3d4 Processing request user:123"
}
]
}
使用示例:
# 查询最近的 10 条记录
curl "http://localhost:18080/v1/records?limit=10"
# 查询特定 shard 的记录
curl "http://localhost:18080/v1/records?shard=2&limit=100"
# 按序列号范围查询
curl "http://localhost:18080/v1/records?seq_from=100&seq_to=200&limit=50"
# 按时间范围查询
curl "http://localhost:18080/v1/records?time_from=2026-05-15T10:00:00Z&time_to=2026-05-15T11:00:00Z&limit=100"
# 包含归档文件
curl "http://localhost:18080/v1/records?include_archive=true&limit=100"
4. Metrics 端点
端点:GET /metrics
用途:Prometheus 格式的指标导出
返回示例:
# log_engine_writer metrics
log_engine_writer_submitted_messages{shard="0"} 125000
log_engine_writer_submitted_bytes{shard="0"} 32000000
log_engine_writer_flushed_batches{shard="0"} 1250
log_engine_writer_flushed_bytes{shard="0"} 31750000
log_engine_writer_flush_errors{shard="0"} 0
log_engine_writer_backpressure_waits{shard="0"} 0
log_engine_writer_pending_entries{shard="0"} 5
log_engine_writer_pending_bytes{shard="0"} 12800
# log_engine_reader metrics
log_engine_reader_segments_read{shard="0"} 256
log_engine_reader_archive_segments_read{shard="0"} 64
log_engine_reader_active_segments_read{shard="0"} 192
log_engine_reader_records_returned{shard="0"} 12500
log_engine_reader_corrupted_segments{shard="0"} 0
log_engine_reader_corrupted_lines{shard="0"} 2
log_engine_reader_gzip_read_errors{shard="0"} 0
gRPC 查询接口
Protocol Buffers 定义
syntax = "proto3";
package logengine.query.v1;
message Empty {}
message StatusReply {
string routing_strategy = 1;
uint32 routing_shards = 2;
uint64 routing_virtual_nodes = 3;
// ... 其他字段
}
message RouteRequest {
string route_key = 1;
}
message RouteReply {
string route_key = 1;
uint32 shard = 2;
uint64 hash = 3;
uint64 token = 4;
bool used_local_fallback = 5;
}
message QueryRecordsRequest {
optional uint32 shard = 1;
optional uint64 seq_from = 2;
optional uint64 seq_to = 3;
optional string time_from = 4;
optional string time_to = 5;
uint64 limit = 6;
optional bool include_archive = 7;
}
message QueryRecord {
uint32 crc = 1;
string timestamp = 2;
uint32 shard = 3;
bool has_sequence = 4;
uint64 sequence = 5;
string level = 6;
string payload = 7;
string raw_line = 8;
}
message QueryRecordsReply {
repeated QueryRecord records = 1;
}
service QueryService {
rpc GetStatus(Empty) returns (StatusReply);
rpc Route(RouteRequest) returns (RouteReply);
rpc QueryRecords(QueryRecordsRequest) returns (QueryRecordsReply);
}
Client 使用示例
C++ Client:
#include "log_engine/query_client.hh"
// 创建客户端
QueryClient client("127.0.0.1:19090");
// 查询状态
auto status = client.get_status();
std::cout << "Health: " << status.health() << std::endl;
// 查询路由
auto route = client.route("user:123");
std::cout << "Route: " << route.route_key() << " -> shard " << route.shard() << std::endl;
// 查询记录
QueryRecordsRequest request;
request.set_limit(100);
request.set_shard(2);
auto reply = client.query_records(request);
for (const auto& record : reply.records()) {
std::cout << record.payload() << std::endl;
}
Python Client:
import grpc
from logengine_query_pb2 import *
from logengine_query_pb2_grpc import *
# 创建客户端
channel = grpc.insecure_channel('127.0.0.1:19090')
stub = QueryServiceStub(channel)
# 查询状态
status = stub.GetStatus(Empty())
print(f"Health: {status.health}")
# 查询路由
route_request = RouteRequest(route_key="user:123")
route = stub.Route(route_request)
print(f"Route: {route.route_key} -> shard {route.shard}")
# 查询记录
query_request = QueryRecordsRequest(
shard=2,
limit=100
)
reply = stub.QueryRecords(query_request)
for record in reply.records:
print(record.payload)
查询优化
1. 索引优化
// 按序列号查询 - 直接定位
if (query.seq_from.has_value()) {
// 直接跳到目标序列号附近
offset = estimate_offset_by_sequence(query.seq_from.value());
}
// 按时间查询 - 使用时间索引
if (query.time_from.has_value()) {
// 使用时间戳二分查找
offset = binary_search_by_time(query.time_from.value());
}
2. 缓存策略
class QueryCache {
private:
std::unordered_map<QueryKey, CachedResult> _cache;
size_t _max_size = 1000;
public:
std::optional<QueryResult> get(const QueryKey& key) {
auto it = _cache.find(key);
if (it != _cache.end()) {
if (is_valid(it->second)) {
return it->second.result;
}
_cache.erase(it);
}
return std::nullopt;
}
void put(const QueryKey& key, const QueryResult& result) {
if (_cache.size() >= _max_size) {
evict_oldest();
}
_cache[key] = {result, current_time()};
}
};
3. 分页优化
// 支持游标分页
QueryResult query_with_cursor(const Query& query, const std::string& cursor) {
size_t offset = decode_cursor(cursor);
auto records = read_records(query, offset);
std::string next_cursor;
if (records.size() == query.limit) {
next_cursor = encode_cursor(offset + records.size());
}
return {records, next_cursor};
}
安全考虑
1. 访问控制
// 简单的 IP 白名单
class AccessControl {
private:
std::unordered_set<std::string> _whitelist = {
"127.0.0.1",
"10.0.0.0/8"
};
public:
bool is_allowed(const std::string& ip) {
// 检查白名单
for (const auto& allowed : _whitelist) {
if (ip_match(ip, allowed)) {
return true;
}
}
return false;
}
};
2. 查询限制
// 限制查询范围和结果数量
void validate_query(const QueryRecordsRequest& request) {
// 限制返回数量
if (request.limit() > 10000) {
throw std::runtime_error("Limit too large, max 10000");
}
// 限制时间范围
if (request.time_from().has_value() && request.time_to().has_value()) {
auto duration = parse_time(request.time_to()) -
parse_time(request.time_from());
if (duration > std::chrono::hours(24)) {
throw std::runtime_error("Time range too large, max 24 hours");
}
}
}
3. 审计日志
// 记录查询操作
void audit_query(const std::string& client_ip, const QueryRecordsRequest& request) {
AuditLog entry{
.timestamp = current_time(),
.client_ip = client_ip,
.query_type = "QueryRecords",
.parameters = serialize_query(request)
};
audit_logger.log(entry);
}
性能基准
HTTP vs gRPC 性能对比
测试环境:
- 消息数:10,000
- 并发度:10
- Payload 大小:256 bytes
测试结果:
| 操作 | HTTP | gRPC | 提升 |
|---|---|---|---|
| GetStatus | 50 ops/s | 200 ops/s | 4x |
| Route | 100 ops/s | 400 ops/s | 4x |
| QueryRecords (100) | 20 ops/s | 80 ops/s | 4x |
| QueryRecords (1000) | 5 ops/s | 20 ops/s | 4x |
结论:gRPC 性能显著优于 HTTP,适合高频查询场景。
部署建议
1. 独立部署查询服务
# 查询服务独立进程
./build/log_engine_query_server \
--log-dir ./logs \
--archive-dir ./archive \
--http-address 0.0.0.0 \
--http-port 18080 \
--grpc-address 0.0.0.0 \
--grpc-port 19090 \
--metrics-address 0.0.0.0 \
--metrics-port 19181
2. 负载均衡
upstream query_backend {
server 127.0.0.1:19090;
server 127.0.0.1:19091;
server 127.0.0.1:19092;
}
server {
listen 80;
location /v1/ {
proxy_pass http://query_backend;
proxy_set_header Host $host;
}
}
3. 监控告警
# 查询延迟监控
histogram_histogram(query_duration_seconds, "Query duration")
# 查询频率监控
counter(queries_total, "Total queries", ["method"])
# 错误率监控
counter(query_errors_total, "Total query errors", ["method", "error_type"])
总结
查询接口设计要点:
- 双协议支持:HTTP 简单易用,gRPC 高性能
- 丰富的查询能力:状态、路由、记录查询
- 性能优化:索引、缓存、分页
- 安全考虑:访问控制、查询限制、审计日志
- 可观测性:Prometheus 指标导出
最佳实践:
- 快速调试使用 HTTP
- 高频查询使用 gRPC
- 合理设置查询限制,避免过载
- 启用审计日志,追踪查询行为
- 监控查询性能,及时发现瓶颈
下一篇:《故障注入与容错设计:系统健壮性验证》
相关阅读: