查询接口设计:HTTP/gRPC 双协议支持

深入分析 Seastar Log Engine 的查询接口设计,包括 HTTP 和 gRPC 双协议支持、查询优化和安全考虑。

查询接口的重要性

为什么需要查询接口?

虽然日志系统的核心是写入,但在实际生产环境中,查询能力同样重要:

  1. 问题排查:快速定位日志中的错误信息
  2. 数据分析:统计和分析日志中的模式
  3. 运维监控:实时监控系统状态
  4. 审计追踪:记录关键操作日志

查询场景

场景查询模式典型需求
实时监控状态查询当前系统状态、健康检查
问题排查按时间查询查询特定时间段的日志
数据分析按序列号查询查询特定范围的记录
路由调试路由查询验证消息路由规则

双协议设计

为什么支持双协议?

HTTP 和 gRPC 各有优势,适合不同场景:

协议优点缺点适用场景
HTTP简单易用、广泛支持、调试方便性能相对较低、类型检查弱快速调试、脚本自动化
gRPC高性能、类型安全、流式支持复杂度较高、需要代码生成高频查询、微服务调用

协议选择建议

// 使用场景决策树
if (需要快速调试) {
    使用 HTTP;
} else if (高频调用) {
    使用 gRPC;
} else if (需要流式数据) {
    使用 gRPC streaming;
} else {
    两者都支持,根据客户端选择;
}

HTTP 查询接口

端点设计

Seastar Log Engine 提供 4 个核心 HTTP 端点:

GET  /v1/status          # 获取系统状态
GET  /v1/route           # 查询路由规则
GET  /v1/records         # 查询日志记录
GET  /metrics            # Prometheus 指标

1. Status 端点

端点GET /v1/status

用途:获取系统整体状态和健康检查

返回示例

{
  "health": "healthy",
  "health_reason": "no_recent_errors",
  "health_reason_basis": "no_errors_in_window",
  "recovery_fallback_reason": "none",
  "routing_strategy": "consistent_hashing",
  "routing_shards": 4,
  "routing_virtual_nodes": 256,
  "ring_size": 1024,
  "log_dir": "./logs",
  "archive_dir": "./archive",
  "shard_file_prefix": "shard",
  "reader_stats": {
    "segments_read": 1024,
    "archive_segments_read": 256,
    "active_segments_read": 768,
    "records_returned": 50000,
    "corrupted_segments": 0,
    "corrupted_lines": 5,
    "gzip_read_errors": 2
  },
  "log_manager_stats": {
    "rotate_operations": 128,
    "checkpoint_write_successes": 953,
    "checkpoint_write_failures": 0,
    "recovery_fallbacks": 1,
    "recovery_fallback_incomplete_checkpoint": 0,
    "recovery_fallback_stale_checkpoint": 1,
    "gzip_archive_successes": 120,
    "gzip_archive_failures": 0
  },
  "health_recent_errors": {
    "reader_corrupted_segments": 0,
    "reader_corrupted_lines": 5,
    "reader_gzip_read_errors": 2,
    "log_manager_checkpoint_failures": 0,
    "log_manager_gzip_failures": 0,
    "log_manager_recovery_fallbacks": 1
  }
}

使用示例

# 健康检查
curl http://localhost:18080/v1/status

# 解析健康状态
health=$(curl -s http://localhost:18080/v1/status | jq -r '.health')
if [ "$health" != "healthy" ]; then
    echo "System unhealthy!"
fi

2. Route 端点

端点GET /v1/route?key=<route_key>

用途:查询消息应该路由到哪个 shard

返回示例

{
  "route_key": "user:123",
  "shard": 2,
  "hash": 1234567890,
  "token": 54321,
  "used_local_fallback": false
}

使用场景

# 验证路由规则
curl http://localhost:18080/v1/route?key=user:123

# 批量验证
for key in "user:123" "user:456" "order:789"; do
    echo "$key -> $(curl -s "http://localhost:18080/v1/route?key=$key" | jq -r '.shard')"
done

3. Records 端点

端点GET /v1/records?<params>

参数

参数类型必填说明
shardint指定 shard,不指定则查询所有
seq_fromint起始序列号
seq_toint结束序列号
time_fromstring起始时间 (RFC3339)
time_tostring结束时间 (RFC3339)
limitint返回记录数限制
include_archivebool是否包含归档文件

返回示例

{
  "records": [
    {
      "crc": 1234567890,
      "timestamp": "2026-05-15T10:30:00Z",
      "shard": 2,
      "has_sequence": true,
      "sequence": 100,
      "level": "INFO",
      "payload": "Processing request user:123",
      "raw_line": "ts=2026-05-15T10:30:00Z shard=2 seq=100 level=INFO crc=a1b2c3d4 Processing request user:123"
    }
  ]
}

使用示例

# 查询最近的 10 条记录
curl "http://localhost:18080/v1/records?limit=10"

# 查询特定 shard 的记录
curl "http://localhost:18080/v1/records?shard=2&limit=100"

# 按序列号范围查询
curl "http://localhost:18080/v1/records?seq_from=100&seq_to=200&limit=50"

# 按时间范围查询
curl "http://localhost:18080/v1/records?time_from=2026-05-15T10:00:00Z&time_to=2026-05-15T11:00:00Z&limit=100"

# 包含归档文件
curl "http://localhost:18080/v1/records?include_archive=true&limit=100"

4. Metrics 端点

端点GET /metrics

用途:Prometheus 格式的指标导出

返回示例

# log_engine_writer metrics
log_engine_writer_submitted_messages{shard="0"} 125000
log_engine_writer_submitted_bytes{shard="0"} 32000000
log_engine_writer_flushed_batches{shard="0"} 1250
log_engine_writer_flushed_bytes{shard="0"} 31750000
log_engine_writer_flush_errors{shard="0"} 0
log_engine_writer_backpressure_waits{shard="0"} 0
log_engine_writer_pending_entries{shard="0"} 5
log_engine_writer_pending_bytes{shard="0"} 12800

# log_engine_reader metrics
log_engine_reader_segments_read{shard="0"} 256
log_engine_reader_archive_segments_read{shard="0"} 64
log_engine_reader_active_segments_read{shard="0"} 192
log_engine_reader_records_returned{shard="0"} 12500
log_engine_reader_corrupted_segments{shard="0"} 0
log_engine_reader_corrupted_lines{shard="0"} 2
log_engine_reader_gzip_read_errors{shard="0"} 0

gRPC 查询接口

Protocol Buffers 定义

syntax = "proto3";

package logengine.query.v1;

message Empty {}

message StatusReply {
  string routing_strategy = 1;
  uint32 routing_shards = 2;
  uint64 routing_virtual_nodes = 3;
  // ... 其他字段
}

message RouteRequest {
  string route_key = 1;
}

message RouteReply {
  string route_key = 1;
  uint32 shard = 2;
  uint64 hash = 3;
  uint64 token = 4;
  bool used_local_fallback = 5;
}

message QueryRecordsRequest {
  optional uint32 shard = 1;
  optional uint64 seq_from = 2;
  optional uint64 seq_to = 3;
  optional string time_from = 4;
  optional string time_to = 5;
  uint64 limit = 6;
  optional bool include_archive = 7;
}

message QueryRecord {
  uint32 crc = 1;
  string timestamp = 2;
  uint32 shard = 3;
  bool has_sequence = 4;
  uint64 sequence = 5;
  string level = 6;
  string payload = 7;
  string raw_line = 8;
}

message QueryRecordsReply {
  repeated QueryRecord records = 1;
}

service QueryService {
  rpc GetStatus(Empty) returns (StatusReply);
  rpc Route(RouteRequest) returns (RouteReply);
  rpc QueryRecords(QueryRecordsRequest) returns (QueryRecordsReply);
}

Client 使用示例

C++ Client

#include "log_engine/query_client.hh"

// 创建客户端
QueryClient client("127.0.0.1:19090");

// 查询状态
auto status = client.get_status();
std::cout << "Health: " << status.health() << std::endl;

// 查询路由
auto route = client.route("user:123");
std::cout << "Route: " << route.route_key() << " -> shard " << route.shard() << std::endl;

// 查询记录
QueryRecordsRequest request;
request.set_limit(100);
request.set_shard(2);

auto reply = client.query_records(request);
for (const auto& record : reply.records()) {
    std::cout << record.payload() << std::endl;
}

Python Client

import grpc
from logengine_query_pb2 import *
from logengine_query_pb2_grpc import *

# 创建客户端
channel = grpc.insecure_channel('127.0.0.1:19090')
stub = QueryServiceStub(channel)

# 查询状态
status = stub.GetStatus(Empty())
print(f"Health: {status.health}")

# 查询路由
route_request = RouteRequest(route_key="user:123")
route = stub.Route(route_request)
print(f"Route: {route.route_key} -> shard {route.shard}")

# 查询记录
query_request = QueryRecordsRequest(
    shard=2,
    limit=100
)
reply = stub.QueryRecords(query_request)
for record in reply.records:
    print(record.payload)

查询优化

1. 索引优化

// 按序列号查询 - 直接定位
if (query.seq_from.has_value()) {
    // 直接跳到目标序列号附近
    offset = estimate_offset_by_sequence(query.seq_from.value());
}

// 按时间查询 - 使用时间索引
if (query.time_from.has_value()) {
    // 使用时间戳二分查找
    offset = binary_search_by_time(query.time_from.value());
}

2. 缓存策略

class QueryCache {
private:
    std::unordered_map<QueryKey, CachedResult> _cache;
    size_t _max_size = 1000;

public:
    std::optional<QueryResult> get(const QueryKey& key) {
        auto it = _cache.find(key);
        if (it != _cache.end()) {
            if (is_valid(it->second)) {
                return it->second.result;
            }
            _cache.erase(it);
        }
        return std::nullopt;
    }

    void put(const QueryKey& key, const QueryResult& result) {
        if (_cache.size() >= _max_size) {
            evict_oldest();
        }
        _cache[key] = {result, current_time()};
    }
};

3. 分页优化

// 支持游标分页
QueryResult query_with_cursor(const Query& query, const std::string& cursor) {
    size_t offset = decode_cursor(cursor);
    auto records = read_records(query, offset);

    std::string next_cursor;
    if (records.size() == query.limit) {
        next_cursor = encode_cursor(offset + records.size());
    }

    return {records, next_cursor};
}

安全考虑

1. 访问控制

// 简单的 IP 白名单
class AccessControl {
private:
    std::unordered_set<std::string> _whitelist = {
        "127.0.0.1",
        "10.0.0.0/8"
    };

public:
    bool is_allowed(const std::string& ip) {
        // 检查白名单
        for (const auto& allowed : _whitelist) {
            if (ip_match(ip, allowed)) {
                return true;
            }
        }
        return false;
    }
};

2. 查询限制

// 限制查询范围和结果数量
void validate_query(const QueryRecordsRequest& request) {
    // 限制返回数量
    if (request.limit() > 10000) {
        throw std::runtime_error("Limit too large, max 10000");
    }

    // 限制时间范围
    if (request.time_from().has_value() && request.time_to().has_value()) {
        auto duration = parse_time(request.time_to()) -
                       parse_time(request.time_from());
        if (duration > std::chrono::hours(24)) {
            throw std::runtime_error("Time range too large, max 24 hours");
        }
    }
}

3. 审计日志

// 记录查询操作
void audit_query(const std::string& client_ip, const QueryRecordsRequest& request) {
    AuditLog entry{
        .timestamp = current_time(),
        .client_ip = client_ip,
        .query_type = "QueryRecords",
        .parameters = serialize_query(request)
    };
    audit_logger.log(entry);
}

性能基准

HTTP vs gRPC 性能对比

测试环境

  • 消息数:10,000
  • 并发度:10
  • Payload 大小:256 bytes

测试结果

操作HTTPgRPC提升
GetStatus50 ops/s200 ops/s4x
Route100 ops/s400 ops/s4x
QueryRecords (100)20 ops/s80 ops/s4x
QueryRecords (1000)5 ops/s20 ops/s4x

结论:gRPC 性能显著优于 HTTP,适合高频查询场景。

部署建议

1. 独立部署查询服务

# 查询服务独立进程
./build/log_engine_query_server \
  --log-dir ./logs \
  --archive-dir ./archive \
  --http-address 0.0.0.0 \
  --http-port 18080 \
  --grpc-address 0.0.0.0 \
  --grpc-port 19090 \
  --metrics-address 0.0.0.0 \
  --metrics-port 19181

2. 负载均衡

upstream query_backend {
    server 127.0.0.1:19090;
    server 127.0.0.1:19091;
    server 127.0.0.1:19092;
}

server {
    listen 80;

    location /v1/ {
        proxy_pass http://query_backend;
        proxy_set_header Host $host;
    }
}

3. 监控告警

# 查询延迟监控
histogram_histogram(query_duration_seconds, "Query duration")

# 查询频率监控
counter(queries_total, "Total queries", ["method"])

# 错误率监控
counter(query_errors_total, "Total query errors", ["method", "error_type"])

总结

查询接口设计要点:

  1. 双协议支持:HTTP 简单易用,gRPC 高性能
  2. 丰富的查询能力:状态、路由、记录查询
  3. 性能优化:索引、缓存、分页
  4. 安全考虑:访问控制、查询限制、审计日志
  5. 可观测性:Prometheus 指标导出

最佳实践

  • 快速调试使用 HTTP
  • 高频查询使用 gRPC
  • 合理设置查询限制,避免过载
  • 启用审计日志,追踪查询行为
  • 监控查询性能,及时发现瓶颈

下一篇:《故障注入与容错设计:系统健壮性验证》

相关阅读