代码评审 Agent:静态分析与代码质量评估
代码评审 Agent 结合静态分析工具和 LLM 的代码理解能力,提供多维度的代码质量评估、安全漏洞检测和最佳实践检查,成为现代开发团队的质量把关人。
代码评审 Agent:静态分析与代码质量评估
概述与动机
在软件开发的生命周期中,代码评审是保证代码质量、发现潜在问题、促进知识传承的关键环节。然而,传统的人工代码评审面临着效率低、标准不一、容易疲劳等挑战。特别是在快速迭代的团队中,评审者往往需要在有限的时间内审查大量代码,导致评审深度不足或遗漏重要问题。
代码评审 Agent 通过结合传统静态分析工具的精确性和 LLM 的语义理解能力,为团队提供自动化、智能化的代码审查解决方案。它不仅能检测代码中的潜在 bug、安全漏洞和性能问题,还能从代码风格、架构设计、可维护性等多个维度给出改进建议。更重要的是,Agent 能够理解业务上下文,提供更有针对性的审查意见,大大提升评审的准确性和实用性。
从工程价值角度看,一个优秀的代码评审 Agent 能够将重复性的审查工作自动化,让开发者专注于更有价值的架构设计和业务逻辑讨论。它能作为编码阶段的第一道防线,在代码合并之前就发现问题,避免后期修复的高昂成本。同时,通过标准化的审查规则和量化的质量指标,帮助团队建立统一的代码质量标准,提升整体代码健康度。
核心概念与架构设计
代码理解与 AST 解析
代码评审 Agent 的基础是深度的代码理解能力。不同于简单的文本匹配,它需要解析代码的抽象语法树(AST),理解代码的结构、依赖关系和控制流。AST 解析让 Agent 能够识别出代码的语义信息,比如变量作用域、函数调用关系、类型推断结果等,这些信息对于发现深层问题至关重要。
AST 解析的一个关键优势是语言无关性。无论是对 Python、Java、TypeScript 还是 Go,Agent 都能通过相应的解析器提取出结构化的代码信息。这种结构化表示为后续的分析提供了可靠的数据基础,比如构建调用图、计算圈复杂度、分析数据流等。
多维度质量评估
代码质量是一个多维度概念,包括正确性、可读性、可维护性、性能和安全性等多个方面。代码评审 Agent 需要建立完善的评估体系,从不同角度对代码进行打分和分析。
正确性评估主要关注潜在的运行时错误,比如空指针解引用、数组越界、并发问题等。这些错误往往在运行时才会暴露,但在代码阶段就能通过静态分析和代码理解发现端倪。
可读性评估关注代码的命名规范、注释质量、代码长度和复杂度等方面。命名不清晰、函数过长、嵌套过深等问题都会影响代码的可维护性,Agent 需要识别这些模式并给出改进建议。
安全性评估是代码评审的另一个重要维度。Agent 需要检测常见的安全漏洞,比如 SQL 注入、XSS、硬编码凭证、不安全的随机数生成等。这些漏洞可能导致严重的安全后果,因此在代码评审阶段就需要严格把关。
LLM 增强的代码理解
传统静态分析工具基于规则和模式匹配,擅长检测已知的代码反模式,但在理解代码意图、发现逻辑错误方面能力有限。LLM 的引入弥补了这一不足,它能够理解代码的业务逻辑,发现更深层次的问题。
LLM 增强的代码理解体现在几个方面。首先是上下文感知能力,Agent 能结合需求文档、设计文档和历史代码,理解代码的设计意图和业务背景,从而给出更准确的评审意见。其次是模式识别能力,LLM 能从大量代码中学习常见的代码模式和最佳实践,识别出偏离这些模式的代码。最后是生成能力,Agent 不仅能发现问题,还能直接生成修复建议代码,大大提升问题修复的效率。
Agent 架构设计
代码评审 Agent 的架构采用模块化设计,各组件职责清晰,便于扩展和维护。核心组件包括代码解析模块、规则引擎、LLM 分析模块、结果聚合模块和报告生成模块。
代码解析模块负责将代码转换为结构化的表示,包括 AST、符号表、调用图等。规则引擎执行预定义的静态分析规则,检查代码中的反模式和潜在问题。LLM 分析模块对代码进行语义理解,识别规则引擎难以发现的逻辑问题和设计问题。结果聚合模块整合来自不同源的分析结果,去除重复项,建立问题之间的关联关系。质量评分引擎根据分析结果对代码进行多维度的评分,帮助团队快速了解代码质量状况。报告生成模块将分析结果以清晰的格式呈现给开发者,支持多种输出格式如 Markdown、JSON、HTML 等。
关键技术实现
代码解析与 AST 处理
代码解析是代码评审 Agent 的基础功能。我们需要使用各语言的解析器将代码转换为 AST,并提取出有用的代码信息。Python 有 ast 模块,Java 可以用 JavaParser,TypeScript 可以用 TypeScript Compiler API。
import ast
from typing import Dict, List, Optional
import inspect
class CodeParser:
"""代码解析器,支持多种语言的 AST 解析"""
def __init__(self, language: str = "python"):
self.language = language
self.parsers = {
"python": self._parse_python,
# 可以扩展其他语言
}
def _parse_python(self, code: str) -> Dict:
"""解析 Python 代码为 AST"""
try:
tree = ast.parse(code)
# 提取函数定义
functions = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
functions.append({
"name": node.name,
"lineno": node.lineno,
"args": [arg.arg for arg in node.args.args],
"docstring": ast.get_docstring(node),
"body_start": node.body[0].lineno if node.body else None,
"decorators": [self._get_decorator_name(d) for d in node.decorator_list]
})
# 提取类定义
classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
methods = [
n.name for n in node.body
if isinstance(n, ast.FunctionDef)
]
classes.append({
"name": node.name,
"lineno": node.lineno,
"methods": methods,
"bases": [self._get_name(base) for base in node.bases]
})
# 提取导入语句
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
imports.extend([alias.name for alias in node.names])
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
imports.extend([
f"{module}.{alias.name}" for alias in node.names
])
return {
"ast": tree,
"functions": functions,
"classes": classes,
"imports": imports,
"lines": code.split('\n')
}
except SyntaxError as e:
return {
"error": f"语法错误: {e}",
"lineno": e.lineno
}
def _get_decorator_name(self, decorator) -> str:
"""获取装饰器名称"""
if isinstance(decorator, ast.Name):
return decorator.id
elif isinstance(decorator, ast.Attribute):
return self._get_name(decorator)
elif isinstance(decorator, ast.Call):
return self._get_name(decorator.func)
return "unknown"
def _get_name(self, node) -> str:
"""递归获取节点名称"""
if isinstance(node, ast.Name):
return node.id
elif isinstance(node, ast.Attribute):
return f"{self._get_name(node.value)}.{node.attr}"
return ""
def parse(self, code: str) -> Dict:
"""解析代码"""
parser = self.parsers.get(self.language)
if parser:
return parser(code)
else:
return {"error": f"不支持的语言: {self.language}"}
这个代码解析器能够提取代码的结构信息,包括函数、类、导入语句等。这些信息为后续的静态分析和 LLM 理解提供了基础数据。解析器还处理了语法错误的情况,确保 Agent 在面对不合法代码时能够优雅地降级。
静态分析规则引擎
静态分析规则引擎基于 AST 和代码信息,执行预定义的检查规则。这些规则覆盖常见的代码问题和最佳实践,比如复杂的函数、未使用的变量、重复的代码片段等。
from typing import Dict, List, Callable
import re
class StaticAnalysisRules:
"""静态分析规则引擎"""
def __init__(self):
self.rules = [
self.check_function_length,
self.check_function_complexity,
self.check_unused_imports,
self.check_naming_convention,
self.check_duplicate_code,
self.check_security_issues,
self.check_hardcoded_values,
]
def check_function_length(self, code_info: Dict) -> List[Dict]:
"""检查函数长度"""
issues = []
lines = code_info.get("lines", [])
for func in code_info.get("functions", []):
if func.get("body_start") and func.get("lineno"):
func_lines = func["body_start"] - func["lineno"]
if func_lines > 50:
issues.append({
"type": "code_smell",
"severity": "medium",
"message": f"函数 {func['name']} 过长 ({func_lines} 行),建议拆分为更小的函数",
"location": f"Line {func['lineno']}",
"suggestion": "将函数拆分为多个职责单一的小函数"
})
return issues
def check_function_complexity(self, code_info: Dict) -> List[Dict]:
"""检查函数复杂度(简化版圈复杂度)"""
issues = []
for func in code_info.get("functions", []):
func_lines = code_info.get("lines", [])
if func.get("lineno"):
func_code = '\n'.join(func_lines[func["lineno"]-1:func.get("body_start", func["lineno"])])
# 简化的复杂度计算:统计 if/for/while/try 的数量
complexity = len(re.findall(r'\b(if|elif|for|while|try|except|with)\b', func_code))
if complexity > 10:
issues.append({
"type": "code_smell",
"severity": "high",
"message": f"函数 {func['name']} 复杂度过高 (圈复杂度约 {complexity})",
"location": f"Line {func['lineno']}",
"suggestion": "使用策略模式、状态机或拆分函数来降低复杂度"
})
return issues
def check_unused_imports(self, code_info: Dict) -> List[Dict]:
"""检查未使用的导入"""
issues = []
imports = set(code_info.get("imports", []))
# 提取代码中实际使用的标识符
lines = code_info.get("lines", [])
used_identifiers = set()
for line in lines:
# 匹配标识符使用(简化版)
matches = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', line)
used_identifiers.update(matches)
# 检查未使用的导入
for imp in imports:
module_name = imp.split('.')[-1] if '.' in imp else imp
if module_name not in used_identifiers:
issues.append({
"type": "code_smell",
"severity": "low",
"message": f"导入 {imp} 未使用",
"location": "Import statement",
"suggestion": "删除未使用的导入以保持代码整洁"
})
return issues
def check_naming_convention(self, code_info: Dict) -> List[Dict]:
"""检查命名规范"""
issues = []
# 检查函数命名(小写蛇形)
for func in code_info.get("functions", []):
name = func.get("name", "")
if name and not re.match(r'^[a-z][a-z0-9_]*$', name):
issues.append({
"type": "code_style",
"severity": "low",
"message": f"函数名 {name} 不符合 Python 命名规范",
"location": f"Line {func['lineno']}",
"suggestion": "函数名应使用小写蛇形命名法,如 get_user_data"
})
# 检查类命名(驼峰)
for cls in code_info.get("classes", []):
name = cls.get("name", "")
if name and not re.match(r'^[A-Z][a-zA-Z0-9]*$', name):
issues.append({
"type": "code_style",
"severity": "low",
"message": f"类名 {name} 不符合 Python 命名规范",
"location": f"Line {cls['lineno']}",
"suggestion": "类名应使用帕斯卡命名法,如 UserData"
})
return issues
def check_duplicate_code(self, code_info: Dict) -> List[Dict]:
"""检查重复代码(简化版)"""
issues = []
lines = code_info.get("lines", [])
# 简单的字符串匹配检测重复
from collections import Counter
code_blocks = []
# 滑动窗口提取代码块
window_size = 5
for i in range(len(lines) - window_size + 1):
block = '\n'.join(lines[i:i+window_size])
code_blocks.append(block)
# 统计重复
block_counts = Counter(code_blocks)
for block, count in block_counts.items():
if count > 1:
issues.append({
"type": "code_smell",
"severity": "medium",
"message": "发现重复代码块",
"location": "Multiple locations",
"suggestion": "将重复代码提取为函数或方法以遵循 DRY 原则"
})
break # 只报告一个例子
return issues
def check_security_issues(self, code_info: Dict) -> List[Dict]:
"""检查安全问题"""
issues = []
lines = code_info.get("lines", [])
security_patterns = {
r'eval\s*\(': "避免使用 eval(),可能导致代码注入漏洞",
r'exec\s*\(': "避免使用 exec(),可能导致代码注入漏洞",
r'pickle\.loads?': "使用 pickle 可能导致对象注入攻击",
r'subprocess\.call\(.*shell=True': "shell=True 可能导致命令注入",
r'os\.system\s*\(': "os.system() 存在命令注入风险",
r'password\s*=\s*["\']': "发现可能硬编码的密码",
r'api_key\s*=\s*["\']': "发现可能硬编码的 API 密钥",
r'secret\s*=\s*["\']': "发现可能硬编码的密钥",
}
for idx, line in enumerate(lines, 1):
for pattern, message in security_patterns.items():
if re.search(pattern, line):
issues.append({
"type": "security",
"severity": "high",
"message": f"安全问题: {message}",
"location": f"Line {idx}",
"suggestion": "使用环境变量或配置管理工具存储敏感信息"
})
return issues
def check_hardcoded_values(self, code_info: Dict) -> List[Dict]:
"""检查硬编码值"""
issues = []
lines = code_info.get("lines", [])
# 检查硬编码的数字(排除常见的 0, 1, -1)
for idx, line in enumerate(lines, 1):
matches = re.findall(r'\b(?!0|1|-1)(\d{2,})\b', line)
if matches and not line.strip().startswith('#'):
issues.append({
"type": "code_smell",
"severity": "low",
"message": f"发现硬编码值: {matches[0]}",
"location": f"Line {idx}",
"suggestion": "考虑将硬编码值提取为命名常量"
})
break # 只报告一个例子
return issues
def run_all_rules(self, code_info: Dict) -> List[Dict]:
"""运行所有规则"""
all_issues = []
for rule in self.rules:
issues = rule(code_info)
all_issues.extend(issues)
# 按严重程度排序
severity_order = {"high": 0, "medium": 1, "low": 2}
all_issues.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))
return all_issues
这个规则引擎实现了多种检查规则,覆盖了代码质量、安全性和最佳实践等多个方面。每个规则返回一个问题列表,包含问题类型、严重程度、描述、位置和建议修复方案。规则引擎可以轻松扩展新的规则,团队可以根据自己的编码标准定制规则集。
LLM 增强的代码分析
LLM 分析模块利用大语言模型的语义理解能力,发现规则引擎难以检测的深层问题。它能理解代码的意图,发现逻辑错误,并提供更具洞察力的评审建议。
import openai
from typing import Dict, List, Optional
import json
class LLMCodeAnalyzer:
"""LLM 增强的代码分析器"""
def __init__(self, api_key: str, model: str = "gpt-4"):
openai.api_key = api_key
self.model = model
self.analysis_prompts = {
"logic_review": """请分析以下代码的逻辑问题。关注:
1. 边界条件处理是否完整
2. 异常情况是否被正确处理
3. 逻辑是否存在明显的错误或矛盾
4. 是否有潜在的死循环或无限递归风险
代码如下:
{code}
请以 JSON 格式返回分析结果,格式如下:
{{
"issues": [
{{
"type": "logic_error",
"severity": "high|medium|low",
"message": "问题描述",
"location": "代码位置描述",
"suggestion": "改进建议"
}}
],
"summary": "整体评价"
}}""",
"design_review": """请从软件工程角度评估以下代码的设计质量:
1. 单一职责原则:函数/类是否职责单一
2. 开闭原则:代码是否易于扩展,对修改关闭
3. 依赖倒置:是否依赖抽象而非具体实现
4. 接口隔离:接口是否设计合理
5. 代码复用性:是否存在重复逻辑
代码如下:
{code}
请以 JSON 格式返回评估结果:
{{
"scores": {{
"single_responsibility": 1-10,
"open_closed": 1-10,
"dependency_inversion": 1-10,
"interface_segregation": 1-10,
"reusability": 1-10
}},
"suggestions": [
"设计改进建议1",
"设计改进建议2"
]
}}""",
"documentation_review": """评估以下代码的文档质量:
1. 函数/类是否有清晰的文档字符串
2. 参数和返回值是否有说明
3. 复杂逻辑是否有注释
4. 注释是否准确且有用
代码如下:
{code}
请以 JSON 格式返回评估结果:
{{
"issues": [
{{
"location": "代码位置",
"message": "文档问题描述"
}}
],
"improvements": [
"文档改进建议"
]
}}"""
}
async def analyze_code_logic(self, code: str) -> Dict:
"""分析代码逻辑问题"""
prompt = self.analysis_prompts["logic_review"].format(code=code)
try:
response = await openai.ChatCompletion.acreate(
model=self.model,
messages=[
{"role": "system", "content": "你是一个经验丰富的代码评审专家。"},
{"role": "user", "content": prompt}
],
temperature=0.3,
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result
except Exception as e:
return {
"error": f"LLM 分析失败: {str(e)}",
"issues": [],
"summary": "分析失败"
}
async def evaluate_design(self, code: str) -> Dict:
"""评估代码设计质量"""
prompt = self.analysis_prompts["design_review"].format(code=code)
try:
response = await openai.ChatCompletion.acreate(
model=self.model,
messages=[
{"role": "system", "content": "你是一个软件架构专家。"},
{"role": "user", "content": prompt}
],
temperature=0.3,
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result
except Exception as e:
return {
"error": f"设计评估失败: {str(e)}",
"scores": {},
"suggestions": []
}
async def review_documentation(self, code: str) -> Dict:
"""评审代码文档"""
prompt = self.analysis_prompts["documentation_review"].format(code=code)
try:
response = await openai.ChatCompletion.acreate(
model=self.model,
messages=[
{"role": "system", "content": "你是一个技术文档专家。"},
{"role": "user", "content": prompt}
],
temperature=0.3,
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result
except Exception as e:
return {
"error": f"文档评审失败: {str(e)}",
"issues": [],
"improvements": []
}
async def comprehensive_analysis(self, code: str) -> Dict:
"""综合分析"""
logic_result = await self.analyze_code_logic(code)
design_result = await self.evaluate_design(code)
doc_result = await self.review_documentation(code)
return {
"logic_issues": logic_result.get("issues", []),
"logic_summary": logic_result.get("summary", ""),
"design_scores": design_result.get("scores", {}),
"design_suggestions": design_result.get("suggestions", []),
"documentation_issues": doc_result.get("issues", []),
"documentation_improvements": doc_result.get("improvements", [])
}
LLM 分析器通过精心设计的 Prompt 引导模型关注代码的不同方面,包括逻辑正确性、设计质量和文档完整性。每个分析都有明确的目标和输出格式,确保结果的一致性和可用性。LLM 能够发现规则引擎难以识别的问题,比如逻辑错误、设计缺陷和文档不足,为代码评审提供更全面的视角。
结果聚合与质量评分
结果聚合模块将来自静态分析规则引擎和 LLM 分析器的结果整合在一起,进行去重、关联和优先级排序,为开发者提供清晰、可操作的评审报告。
from typing import Dict, List
from collections import defaultdict
class ReviewResultAggregator:
"""评审结果聚合器"""
def __init__(self):
self.severity_weights = {
"high": 10,
"medium": 5,
"low": 1
}
def aggregate_results(self,
static_issues: List[Dict],
llm_results: Dict) -> Dict:
"""聚合静态分析和 LLM 分析结果"""
all_issues = []
# 聚合静态分析问题
for issue in static_issues:
all_issues.append({
**issue,
"source": "static_analysis"
})
# 聚合 LLM 分析的逻辑问题
for issue in llm_results.get("logic_issues", []):
all_issues.append({
**issue,
"source": "llm_analysis"
})
# 聚合文档问题
for issue in llm_results.get("documentation_issues", []):
all_issues.append({
"type": "documentation",
"severity": "low",
"message": issue.get("message", ""),
"location": issue.get("location", ""),
"suggestion": "补充或改进文档",
"source": "llm_analysis"
})
# 去重和分类
deduplicated_issues = self._deduplicate_issues(all_issues)
categorized_issues = self._categorize_issues(deduplicated_issues)
# 计算质量评分
quality_scores = self._calculate_quality_scores(
static_issues,
llm_results
)
return {
"issues": categorized_issues,
"quality_scores": quality_scores,
"summary": self._generate_summary(categorized_issues, quality_scores),
"improvement_suggestions": self._aggregate_suggestions(llm_results)
}
def _deduplicate_issues(self, issues: List[Dict]) -> List[Dict]:
"""去重问题"""
seen = set()
unique_issues = []
for issue in issues:
# 使用类型和消息作为去重键
key = (issue.get("type"), issue.get("message"))
if key not in seen:
seen.add(key)
unique_issues.append(issue)
return unique_issues
def _categorize_issues(self, issues: List[Dict]) -> Dict:
"""按类别分类问题"""
categories = defaultdict(list)
for issue in issues:
issue_type = issue.get("type", "unknown")
categories[issue_type].append(issue)
return dict(categories)
def _calculate_quality_scores(self,
static_issues: List[Dict],
llm_results: Dict) -> Dict:
"""计算质量评分"""
scores = {}
# 基于问题严重程度计算基础分
penalty = 0
for issue in static_issues:
severity = issue.get("severity", "low")
penalty += self.severity_weights.get(severity, 1)
# LLM 逻辑问题影响更大
for issue in llm_results.get("logic_issues", []):
severity = issue.get("severity", "medium")
penalty += self.severity_weights.get(severity, 5) * 2
# 基础分(满分 100)
base_score = max(0, 100 - penalty)
# 设计质量评分
design_scores = llm_results.get("design_scores", {})
avg_design_score = sum(design_scores.values()) / len(design_scores) if design_scores else 7
# 文档质量评分
doc_issues_count = len(llm_results.get("documentation_issues", []))
doc_score = max(0, 10 - doc_issues_count)
# 综合评分
scores = {
"overall": int((base_score * 0.5 + avg_design_score * 10 * 0.3 + doc_score * 2) / 100 * 100),
"correctness": base_score,
"design_quality": int(avg_design_score * 10),
"documentation": doc_score,
"maintainability": int((base_score + avg_design_score * 10) / 2)
}
return scores
def _generate_summary(self,
categorized_issues: Dict,
quality_scores: Dict) -> str:
"""生成评审摘要"""
total_issues = sum(len(issues) for issues in categorized_issues.values())
high_severity = len([
issue for issues in categorized_issues.values()
for issue in issues
if issue.get("severity") == "high"
])
summary = f"代码评审完成。发现 {total_issues} 个问题,其中高严重性问题 {high_severity} 个。"
summary += f"\n整体质量评分: {quality_scores.get('overall', 0)}/100"
if high_severity > 0:
summary += "\n建议优先处理高严重性问题。"
return summary
def _aggregate_suggestions(self, llm_results: Dict) -> List[str]:
"""聚合改进建议"""
suggestions = []
# 设计建议
suggestions.extend(llm_results.get("design_suggestions", []))
# 文档改进建议
suggestions.extend(llm_results.get("documentation_improvements", []))
return suggestions
结果聚合器负责将不同来源的分析结果整合到一起,并进行智能的去重和分类。它还计算多维度的质量评分,帮助开发者快速了解代码的整体质量状况。聚合器还会生成简洁明了的摘要和改进建议,让开发者能够快速抓住重点。
完整的代码评审 Agent
将所有组件组合起来,创建一个完整的代码评审 Agent:
import asyncio
from typing import Dict, List, Optional
class CodeReviewAgent:
"""代码评审 Agent"""
def __init__(self, llm_api_key: str, llm_model: str = "gpt-4"):
self.parser = CodeParser(language="python")
self.static_analyzer = StaticAnalysisRules()
self.llm_analyzer = LLMCodeAnalyzer(llm_api_key, llm_model)
self.aggregator = ReviewResultAggregator()
async def review_code(self,
code: str,
filename: str = "unknown.py") -> Dict:
"""评审代码"""
print(f"开始评审代码: {filename}")
# 1. 解析代码
print("步骤 1/4: 解析代码...")
code_info = self.parser.parse(code)
if "error" in code_info:
return {
"error": code_info["error"],
"filename": filename
}
# 2. 静态分析
print("步骤 2/4: 执行静态分析...")
static_issues = self.static_analyzer.run_all_rules(code_info)
# 3. LLM 分析
print("步骤 3/4: LLM 语义分析...")
llm_results = await self.llm_analyzer.comprehensive_analysis(code)
# 4. 结果聚合
print("步骤 4/4: 聚合结果...")
review_result = self.aggregator.aggregate_results(
static_issues,
llm_results
)
review_result["filename"] = filename
review_result["code_info"] = {
"functions": len(code_info.get("functions", [])),
"classes": len(code_info.get("classes", [])),
"lines": len(code_info.get("lines", []))
}
print(f"评审完成! 发现 {review_result.get('summary', '')}")
return review_result
def generate_markdown_report(self, review_result: Dict) -> str:
"""生成 Markdown 格式的评审报告"""
report = f"""# 代码评审报告
**文件**: {review_result.get('filename', 'unknown')}
**评审时间**: {review_result.get('timestamp', 'Unknown')}
## 代码统计
- 函数数量: {review_result.get('code_info', {}).get('functions', 0)}
- 类数量: {review_result.get('code_info', {}).get('classes', 0)}
- 代码行数: {review_result.get('code_info', {}).get('lines', 0)}
## 评审摘要
{review_result.get('summary', '')}
## 质量评分
"""
# 质量评分表格
scores = review_result.get('quality_scores', {})
report += "| 维度 | 评分 |\n"
report += "|------|------|\n"
for key, value in scores.items():
report += f"| {key} | {value}/100 |\n"
report += "\n"
# 分类问题
issues = review_result.get('issues', {})
for category, category_issues in issues.items():
if category_issues:
report += f"## {category.replace('_', ' ').title()} ({len(category_issues)} 个问题)\n\n"
for issue in category_issues:
severity_icon = {
"high": "🔴",
"medium": "🟡",
"low": "🟢"
}.get(issue.get("severity", "low"), "⚪")
report += f"### {severity_icon} {issue.get('message', 'No message')}\n\n"
report += f"- **位置**: {issue.get('location', 'Unknown')}\n"
report += f"- **严重程度**: {issue.get('severity', 'low')}\n"
report += f"- **来源**: {issue.get('source', 'Unknown')}\n"
report += f"- **建议**: {issue.get('suggestion', 'No suggestion')}\n\n"
# 改进建议
suggestions = review_result.get('improvement_suggestions', [])
if suggestions:
report += "## 改进建议\n\n"
for i, suggestion in enumerate(suggestions, 1):
report += f"{i}. {suggestion}\n"
report += "\n"
return report
# 使用示例
async def main():
# 示例代码
sample_code = '''
import os
import sys
def process_data(data_list, threshold):
result = []
for item in data_list:
if item > threshold:
result.append(item * 2)
elif item < 0:
result.append(0)
else:
result.append(item)
return result
def save_to_file(filename, data):
with open(filename, 'w') as f:
for item in data:
f.write(str(item) + '\\n')
def load_config():
return {
'max_size': 100,
'api_key': 'sk-1234567890abcdef',
'timeout': 30
}
def complex_function(data):
result = []
for item in data:
if item > 0:
if item < 10:
if item % 2 == 0:
result.append(item * 2)
else:
result.append(item)
else:
if item % 3 == 0:
result.append(item * 3)
else:
result.append(item)
else:
result.append(0)
return result
'''
# 创建评审 Agent
agent = CodeReviewAgent(llm_api_key="your-api-key-here")
# 执行评审
result = await agent.review_code(sample_code, filename="example.py")
# 生成报告
report = agent.generate_markdown_report(result)
print(report)
# 保存报告
with open('/Users/liuyutao/Desktop/workspace/code_review_report.md', 'w') as f:
f.write(report)
if __name__ == "__main__":
asyncio.run(main())
这个完整的代码评审 Agent 整合了代码解析、静态分析、LLM 语义分析和结果聚合等所有组件,提供端到端的代码评审功能。它能够生成详细的 Markdown 报告,帮助开发者全面了解代码质量状况和改进方向。
最佳实践与常见陷阱
静态分析规则配置
静态分析规则是代码评审 Agent 的基础,配置得当的规则能够准确捕获代码问题。团队应该根据自己的编码标准和技术栈定制规则集,避免一刀切的通用配置。
规则配置的一个常见陷阱是过于严格或过于宽松。过于严格的规则会产生大量误报,淹没真正的问题,导致开发者对 Agent 的结果失去信任。过于宽松的规则则无法提供有效的质量保证。最佳实践是逐步调整规则,根据团队的实际情况找到平衡点。
另一个重要考虑是规则的分类和优先级。不同类型的规则应该有不同的处理方式,比如安全类问题应该高优先级处理,而代码风格问题可以作为低优先级提醒。Agent 应该支持灵活的规则配置和调整,让团队能够根据项目的不同阶段调整评审标准。
LLM Prompt 工程
LLM 的分析质量很大程度上取决于 Prompt 的设计。好的 Prompt 应该明确、具体、有上下文,引导模型关注正确的方面。
Prompt 工程的一个关键原则是角色设定。给 LLM 分配明确的角色,比如"资深代码评审专家"或"软件架构师",能够引导模型采用合适的视角和方法。另一个重要原则是任务分解,将复杂的评审任务分解为多个子任务,比如逻辑评审、设计评审、文档评审,每个子任务有专门优化的 Prompt。
另一个常见陷阱是 Prompt 过于冗长或包含冲突的指令。简洁明确、目标一致的 Prompt 通常效果更好。同时,应该为 LLM 提供充足的上下文信息,比如代码的用途、相关的设计文档、项目的编码规范等,这些信息能帮助 LLM 做出更准确的判断。
结果过滤和误报处理
代码评审 Agent 难免会产生误报,如何有效过滤误报是一个重要挑战。完全避免误报是不现实的,但可以通过多种方法减少误报的影响。
一种方法是建立白名单机制,允许开发者标记已知的误报,避免重复报告。另一种方法是学习机制,Agent 可以从开发者的反馈中学习,逐步降低特定规则的误报率。第三种方法是置信度评分,为每个问题分配置信度分数,让开发者优先处理高置信度的问题。
误报处理的另一个考虑是问题关联性。多个相关的问题应该被合并为一个更大的问题,而不是零散地报告多个小问题。比如一个函数过长导致的多个相关问题,应该作为一个整体问题报告,并给出重构建议。
集成到开发流程
代码评审 Agent 的价值只有在集成到实际开发流程中才能充分体现。常见的集成点包括 Git 提交钩子、代码评审平台(如 GitHub Pull Request)、CI/CD 流水线等。
集成的一个关键考虑是性能。代码评审可能需要较长时间,特别是在使用 LLM 的情况下。应该根据场景选择合适的策略,比如对小型变更进行实时评审,对大型变更使用异步评审。另一个考虑是用户体验,评审结果应该以开发者友好的方式呈现,比如在代码编辑器中直接显示问题,或者在 Pull Request 中提供清晰的问题列表和修复建议。
集成的另一个最佳实践是渐进式引入。可以先从简单的静态分析规则开始,逐步增加 LLM 分析功能。同时,应该给开发者提供配置选项,允许他们根据自己的偏好调整评审的严格程度和范围。
性能优化考虑
静态分析性能优化
静态分析的性能直接影响 Agent 的响应速度。对于大型代码库,静态分析可能成为性能瓶颈。优化静态分析性能的几种方法包括增量分析、并行处理和缓存。
增量分析只分析变更的代码部分,而不是重新分析整个代码库。这需要维护代码的依赖关系和变更历史,但能够显著提升性能。并行处理则是将分析任务分解为多个子任务,利用多核 CPU 并行执行。缓存则是缓存分析结果,对于未变更的代码部分直接使用缓存结果。
另一个优化方向是规则执行的优化。不是所有规则都需要对所有代码执行,可以根据代码的类型、上下文智能选择适用的规则。比如安全规则更相关于用户输入处理代码,性能规则更相关于计算密集型代码。
LLM 调用优化
LLM 调用是 Agent 的另一个性能瓶颈。优化 LLM 调用的方法包括批处理、缓存、模型选择和 Prompt 压缩。
批处理是将多个小的代码分析请求合并为一个大的请求,减少 API 调用次数。缓存则是缓存常见的代码模式和分析结果,对于相似代码直接使用缓存结果。模型选择是根据任务的复杂程度选择合适的模型,比如简单的规则检查使用较小的模型,复杂的语义分析使用较大的模型。Prompt 压缩则是减少 Prompt 的长度,只包含必要的信息,减少 token 使用量和响应时间。
另一个优化方向是异步处理。LLM 调用可以异步执行,不阻塞其他任务。对于大型代码评审,可以分批处理,边分析边返回结果,而不是等待所有分析完成才返回。
结果缓存和增量更新
代码评审的结果应该被缓存,避免重复分析。同时,应该支持增量更新,只重新分析变更的部分。
缓存的设计需要考虑缓存键和失效策略。缓存键可以基于代码的哈希值、文件路径和时间戳等。失效策略可以是时间失效、内容失效或手动失效。增量更新则需要跟踪代码的变更历史,只分析变更的代码部分。
另一个考虑是缓存的大小管理。代码评审的结果可能占用大量存储空间,需要定期清理过期的缓存,或者使用 LRU 策略管理缓存大小。
成本优化
LLM 调用的成本是代码评审 Agent 运行成本的重要组成部分。优化成本的方法包括智能规则选择、代码预处理和结果复用。
智能规则选择是只在必要时使用 LLM 分析,对于能够通过静态分析解决的问题就不调用 LLM。代码预处理是在调用 LLM 之前对代码进行简化,比如移除注释、格式化代码、提取关键部分等,减少 token 使用量。结果复用是缓存 LLM 的分析结果,对于相似代码复用分析结果。
另一个成本优化方向是使用更经济的模型。对于简单的分析任务,可以使用较小的模型;对于复杂的语义分析,才使用较大的模型。同时,可以考虑使用开源模型或自部署模型,降低调用成本。
参考资源
官方文档和工具
- AST 模块文档 - Python 官方 AST 解析文档
- ESLint - JavaScript 静态分析工具
- Pylint - Python 静态分析工具
- SonarQube - 持续代码质量平台
- DeepCode - AI 增强的代码分析工具
学术论文和研究
- "Automated Code Review with Machine Learning" - 探讨机器学习在代码评审中的应用
- "Large Language Models for Code Understanding" - 研究 LLM 在代码理解中的能力
- "Semantic Code Analysis using Transformer Models" - 基于 Transformer 的代码语义分析
实践指南
- Google Code Review Guide - Google 代码评审最佳实践
- Effective Code Review - 高效代码评审实践
- Code Review Checklist - 代码评审检查清单
通过合理的设计和实现,代码评审 Agent 能够显著提升代码评审的效率和质量,成为开发团队不可或缺的质量把关工具。