代码评审 Agent:静态分析与代码质量评估

代码评审 Agent 结合静态分析工具和 LLM 的代码理解能力,提供多维度的代码质量评估、安全漏洞检测和最佳实践检查,成为现代开发团队的质量把关人。

代码评审 Agent:静态分析与代码质量评估

概述与动机

在软件开发的生命周期中,代码评审是保证代码质量、发现潜在问题、促进知识传承的关键环节。然而,传统的人工代码评审面临着效率低、标准不一、容易疲劳等挑战。特别是在快速迭代的团队中,评审者往往需要在有限的时间内审查大量代码,导致评审深度不足或遗漏重要问题。

代码评审 Agent 通过结合传统静态分析工具的精确性和 LLM 的语义理解能力,为团队提供自动化、智能化的代码审查解决方案。它不仅能检测代码中的潜在 bug、安全漏洞和性能问题,还能从代码风格、架构设计、可维护性等多个维度给出改进建议。更重要的是,Agent 能够理解业务上下文,提供更有针对性的审查意见,大大提升评审的准确性和实用性。

从工程价值角度看,一个优秀的代码评审 Agent 能够将重复性的审查工作自动化,让开发者专注于更有价值的架构设计和业务逻辑讨论。它能作为编码阶段的第一道防线,在代码合并之前就发现问题,避免后期修复的高昂成本。同时,通过标准化的审查规则和量化的质量指标,帮助团队建立统一的代码质量标准,提升整体代码健康度。

核心概念与架构设计

代码理解与 AST 解析

代码评审 Agent 的基础是深度的代码理解能力。不同于简单的文本匹配,它需要解析代码的抽象语法树(AST),理解代码的结构、依赖关系和控制流。AST 解析让 Agent 能够识别出代码的语义信息,比如变量作用域、函数调用关系、类型推断结果等,这些信息对于发现深层问题至关重要。

AST 解析的一个关键优势是语言无关性。无论是对 Python、Java、TypeScript 还是 Go,Agent 都能通过相应的解析器提取出结构化的代码信息。这种结构化表示为后续的分析提供了可靠的数据基础,比如构建调用图、计算圈复杂度、分析数据流等。

多维度质量评估

代码质量是一个多维度概念,包括正确性、可读性、可维护性、性能和安全性等多个方面。代码评审 Agent 需要建立完善的评估体系,从不同角度对代码进行打分和分析。

正确性评估主要关注潜在的运行时错误,比如空指针解引用、数组越界、并发问题等。这些错误往往在运行时才会暴露,但在代码阶段就能通过静态分析和代码理解发现端倪。

可读性评估关注代码的命名规范、注释质量、代码长度和复杂度等方面。命名不清晰、函数过长、嵌套过深等问题都会影响代码的可维护性,Agent 需要识别这些模式并给出改进建议。

安全性评估是代码评审的另一个重要维度。Agent 需要检测常见的安全漏洞,比如 SQL 注入、XSS、硬编码凭证、不安全的随机数生成等。这些漏洞可能导致严重的安全后果,因此在代码评审阶段就需要严格把关。

LLM 增强的代码理解

传统静态分析工具基于规则和模式匹配,擅长检测已知的代码反模式,但在理解代码意图、发现逻辑错误方面能力有限。LLM 的引入弥补了这一不足,它能够理解代码的业务逻辑,发现更深层次的问题。

LLM 增强的代码理解体现在几个方面。首先是上下文感知能力,Agent 能结合需求文档、设计文档和历史代码,理解代码的设计意图和业务背景,从而给出更准确的评审意见。其次是模式识别能力,LLM 能从大量代码中学习常见的代码模式和最佳实践,识别出偏离这些模式的代码。最后是生成能力,Agent 不仅能发现问题,还能直接生成修复建议代码,大大提升问题修复的效率。

Agent 架构设计

代码评审 Agent 的架构采用模块化设计,各组件职责清晰,便于扩展和维护。核心组件包括代码解析模块、规则引擎、LLM 分析模块、结果聚合模块和报告生成模块。

Rendering diagram...

代码解析模块负责将代码转换为结构化的表示,包括 AST、符号表、调用图等。规则引擎执行预定义的静态分析规则,检查代码中的反模式和潜在问题。LLM 分析模块对代码进行语义理解,识别规则引擎难以发现的逻辑问题和设计问题。结果聚合模块整合来自不同源的分析结果,去除重复项,建立问题之间的关联关系。质量评分引擎根据分析结果对代码进行多维度的评分,帮助团队快速了解代码质量状况。报告生成模块将分析结果以清晰的格式呈现给开发者,支持多种输出格式如 Markdown、JSON、HTML 等。

关键技术实现

代码解析与 AST 处理

代码解析是代码评审 Agent 的基础功能。我们需要使用各语言的解析器将代码转换为 AST,并提取出有用的代码信息。Python 有 ast 模块,Java 可以用 JavaParser,TypeScript 可以用 TypeScript Compiler API。

import ast
from typing import Dict, List, Optional
import inspect

class CodeParser:
    """代码解析器,支持多种语言的 AST 解析"""

    def __init__(self, language: str = "python"):
        self.language = language
        self.parsers = {
            "python": self._parse_python,
            # 可以扩展其他语言
        }

    def _parse_python(self, code: str) -> Dict:
        """解析 Python 代码为 AST"""
        try:
            tree = ast.parse(code)

            # 提取函数定义
            functions = []
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    functions.append({
                        "name": node.name,
                        "lineno": node.lineno,
                        "args": [arg.arg for arg in node.args.args],
                        "docstring": ast.get_docstring(node),
                        "body_start": node.body[0].lineno if node.body else None,
                        "decorators": [self._get_decorator_name(d) for d in node.decorator_list]
                    })

            # 提取类定义
            classes = []
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    methods = [
                        n.name for n in node.body
                        if isinstance(n, ast.FunctionDef)
                    ]
                    classes.append({
                        "name": node.name,
                        "lineno": node.lineno,
                        "methods": methods,
                        "bases": [self._get_name(base) for base in node.bases]
                    })

            # 提取导入语句
            imports = []
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    imports.extend([alias.name for alias in node.names])
                elif isinstance(node, ast.ImportFrom):
                    module = node.module or ""
                    imports.extend([
                        f"{module}.{alias.name}" for alias in node.names
                    ])

            return {
                "ast": tree,
                "functions": functions,
                "classes": classes,
                "imports": imports,
                "lines": code.split('\n')
            }

        except SyntaxError as e:
            return {
                "error": f"语法错误: {e}",
                "lineno": e.lineno
            }

    def _get_decorator_name(self, decorator) -> str:
        """获取装饰器名称"""
        if isinstance(decorator, ast.Name):
            return decorator.id
        elif isinstance(decorator, ast.Attribute):
            return self._get_name(decorator)
        elif isinstance(decorator, ast.Call):
            return self._get_name(decorator.func)
        return "unknown"

    def _get_name(self, node) -> str:
        """递归获取节点名称"""
        if isinstance(node, ast.Name):
            return node.id
        elif isinstance(node, ast.Attribute):
            return f"{self._get_name(node.value)}.{node.attr}"
        return ""

    def parse(self, code: str) -> Dict:
        """解析代码"""
        parser = self.parsers.get(self.language)
        if parser:
            return parser(code)
        else:
            return {"error": f"不支持的语言: {self.language}"}

这个代码解析器能够提取代码的结构信息,包括函数、类、导入语句等。这些信息为后续的静态分析和 LLM 理解提供了基础数据。解析器还处理了语法错误的情况,确保 Agent 在面对不合法代码时能够优雅地降级。

静态分析规则引擎

静态分析规则引擎基于 AST 和代码信息,执行预定义的检查规则。这些规则覆盖常见的代码问题和最佳实践,比如复杂的函数、未使用的变量、重复的代码片段等。

from typing import Dict, List, Callable
import re

class StaticAnalysisRules:
    """静态分析规则引擎"""

    def __init__(self):
        self.rules = [
            self.check_function_length,
            self.check_function_complexity,
            self.check_unused_imports,
            self.check_naming_convention,
            self.check_duplicate_code,
            self.check_security_issues,
            self.check_hardcoded_values,
        ]

    def check_function_length(self, code_info: Dict) -> List[Dict]:
        """检查函数长度"""
        issues = []
        lines = code_info.get("lines", [])

        for func in code_info.get("functions", []):
            if func.get("body_start") and func.get("lineno"):
                func_lines = func["body_start"] - func["lineno"]
                if func_lines > 50:
                    issues.append({
                        "type": "code_smell",
                        "severity": "medium",
                        "message": f"函数 {func['name']} 过长 ({func_lines} 行),建议拆分为更小的函数",
                        "location": f"Line {func['lineno']}",
                        "suggestion": "将函数拆分为多个职责单一的小函数"
                    })

        return issues

    def check_function_complexity(self, code_info: Dict) -> List[Dict]:
        """检查函数复杂度(简化版圈复杂度)"""
        issues = []

        for func in code_info.get("functions", []):
            func_lines = code_info.get("lines", [])
            if func.get("lineno"):
                func_code = '\n'.join(func_lines[func["lineno"]-1:func.get("body_start", func["lineno"])])

                # 简化的复杂度计算:统计 if/for/while/try 的数量
                complexity = len(re.findall(r'\b(if|elif|for|while|try|except|with)\b', func_code))
                if complexity > 10:
                    issues.append({
                        "type": "code_smell",
                        "severity": "high",
                        "message": f"函数 {func['name']} 复杂度过高 (圈复杂度约 {complexity})",
                        "location": f"Line {func['lineno']}",
                        "suggestion": "使用策略模式、状态机或拆分函数来降低复杂度"
                    })

        return issues

    def check_unused_imports(self, code_info: Dict) -> List[Dict]:
        """检查未使用的导入"""
        issues = []
        imports = set(code_info.get("imports", []))

        # 提取代码中实际使用的标识符
        lines = code_info.get("lines", [])
        used_identifiers = set()

        for line in lines:
            # 匹配标识符使用(简化版)
            matches = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', line)
            used_identifiers.update(matches)

        # 检查未使用的导入
        for imp in imports:
            module_name = imp.split('.')[-1] if '.' in imp else imp
            if module_name not in used_identifiers:
                issues.append({
                    "type": "code_smell",
                    "severity": "low",
                    "message": f"导入 {imp} 未使用",
                    "location": "Import statement",
                    "suggestion": "删除未使用的导入以保持代码整洁"
                })

        return issues

    def check_naming_convention(self, code_info: Dict) -> List[Dict]:
        """检查命名规范"""
        issues = []

        # 检查函数命名(小写蛇形)
        for func in code_info.get("functions", []):
            name = func.get("name", "")
            if name and not re.match(r'^[a-z][a-z0-9_]*$', name):
                issues.append({
                    "type": "code_style",
                    "severity": "low",
                    "message": f"函数名 {name} 不符合 Python 命名规范",
                    "location": f"Line {func['lineno']}",
                    "suggestion": "函数名应使用小写蛇形命名法,如 get_user_data"
                })

        # 检查类命名(驼峰)
        for cls in code_info.get("classes", []):
            name = cls.get("name", "")
            if name and not re.match(r'^[A-Z][a-zA-Z0-9]*$', name):
                issues.append({
                    "type": "code_style",
                    "severity": "low",
                    "message": f"类名 {name} 不符合 Python 命名规范",
                    "location": f"Line {cls['lineno']}",
                    "suggestion": "类名应使用帕斯卡命名法,如 UserData"
                })

        return issues

    def check_duplicate_code(self, code_info: Dict) -> List[Dict]:
        """检查重复代码(简化版)"""
        issues = []
        lines = code_info.get("lines", [])

        # 简单的字符串匹配检测重复
        from collections import Counter
        code_blocks = []

        # 滑动窗口提取代码块
        window_size = 5
        for i in range(len(lines) - window_size + 1):
            block = '\n'.join(lines[i:i+window_size])
            code_blocks.append(block)

        # 统计重复
        block_counts = Counter(code_blocks)
        for block, count in block_counts.items():
            if count > 1:
                issues.append({
                    "type": "code_smell",
                    "severity": "medium",
                    "message": "发现重复代码块",
                    "location": "Multiple locations",
                    "suggestion": "将重复代码提取为函数或方法以遵循 DRY 原则"
                })
                break  # 只报告一个例子

        return issues

    def check_security_issues(self, code_info: Dict) -> List[Dict]:
        """检查安全问题"""
        issues = []
        lines = code_info.get("lines", [])

        security_patterns = {
            r'eval\s*\(': "避免使用 eval(),可能导致代码注入漏洞",
            r'exec\s*\(': "避免使用 exec(),可能导致代码注入漏洞",
            r'pickle\.loads?': "使用 pickle 可能导致对象注入攻击",
            r'subprocess\.call\(.*shell=True': "shell=True 可能导致命令注入",
            r'os\.system\s*\(': "os.system() 存在命令注入风险",
            r'password\s*=\s*["\']': "发现可能硬编码的密码",
            r'api_key\s*=\s*["\']': "发现可能硬编码的 API 密钥",
            r'secret\s*=\s*["\']': "发现可能硬编码的密钥",
        }

        for idx, line in enumerate(lines, 1):
            for pattern, message in security_patterns.items():
                if re.search(pattern, line):
                    issues.append({
                        "type": "security",
                        "severity": "high",
                        "message": f"安全问题: {message}",
                        "location": f"Line {idx}",
                        "suggestion": "使用环境变量或配置管理工具存储敏感信息"
                    })

        return issues

    def check_hardcoded_values(self, code_info: Dict) -> List[Dict]:
        """检查硬编码值"""
        issues = []
        lines = code_info.get("lines", [])

        # 检查硬编码的数字(排除常见的 0, 1, -1)
        for idx, line in enumerate(lines, 1):
            matches = re.findall(r'\b(?!0|1|-1)(\d{2,})\b', line)
            if matches and not line.strip().startswith('#'):
                issues.append({
                    "type": "code_smell",
                    "severity": "low",
                    "message": f"发现硬编码值: {matches[0]}",
                    "location": f"Line {idx}",
                    "suggestion": "考虑将硬编码值提取为命名常量"
                })
                break  # 只报告一个例子

        return issues

    def run_all_rules(self, code_info: Dict) -> List[Dict]:
        """运行所有规则"""
        all_issues = []
        for rule in self.rules:
            issues = rule(code_info)
            all_issues.extend(issues)

        # 按严重程度排序
        severity_order = {"high": 0, "medium": 1, "low": 2}
        all_issues.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))

        return all_issues

这个规则引擎实现了多种检查规则,覆盖了代码质量、安全性和最佳实践等多个方面。每个规则返回一个问题列表,包含问题类型、严重程度、描述、位置和建议修复方案。规则引擎可以轻松扩展新的规则,团队可以根据自己的编码标准定制规则集。

LLM 增强的代码分析

LLM 分析模块利用大语言模型的语义理解能力,发现规则引擎难以检测的深层问题。它能理解代码的意图,发现逻辑错误,并提供更具洞察力的评审建议。

import openai
from typing import Dict, List, Optional
import json

class LLMCodeAnalyzer:
    """LLM 增强的代码分析器"""

    def __init__(self, api_key: str, model: str = "gpt-4"):
        openai.api_key = api_key
        self.model = model
        self.analysis_prompts = {
            "logic_review": """请分析以下代码的逻辑问题。关注:
1. 边界条件处理是否完整
2. 异常情况是否被正确处理
3. 逻辑是否存在明显的错误或矛盾
4. 是否有潜在的死循环或无限递归风险

代码如下:

{code}


请以 JSON 格式返回分析结果,格式如下:
{{
  "issues": [
    {{
      "type": "logic_error",
      "severity": "high|medium|low",
      "message": "问题描述",
      "location": "代码位置描述",
      "suggestion": "改进建议"
    }}
  ],
  "summary": "整体评价"
}}""",

            "design_review": """请从软件工程角度评估以下代码的设计质量:
1. 单一职责原则:函数/类是否职责单一
2. 开闭原则:代码是否易于扩展,对修改关闭
3. 依赖倒置:是否依赖抽象而非具体实现
4. 接口隔离:接口是否设计合理
5. 代码复用性:是否存在重复逻辑

代码如下:

{code}


请以 JSON 格式返回评估结果:
{{
  "scores": {{
    "single_responsibility": 1-10,
    "open_closed": 1-10,
    "dependency_inversion": 1-10,
    "interface_segregation": 1-10,
    "reusability": 1-10
  }},
  "suggestions": [
    "设计改进建议1",
    "设计改进建议2"
  ]
}}""",

            "documentation_review": """评估以下代码的文档质量:
1. 函数/类是否有清晰的文档字符串
2. 参数和返回值是否有说明
3. 复杂逻辑是否有注释
4. 注释是否准确且有用

代码如下:

{code}


请以 JSON 格式返回评估结果:
{{
  "issues": [
    {{
      "location": "代码位置",
      "message": "文档问题描述"
    }}
  ],
  "improvements": [
    "文档改进建议"
  ]
}}"""
        }

    async def analyze_code_logic(self, code: str) -> Dict:
        """分析代码逻辑问题"""
        prompt = self.analysis_prompts["logic_review"].format(code=code)

        try:
            response = await openai.ChatCompletion.acreate(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个经验丰富的代码评审专家。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = json.loads(response.choices[0].message.content)
            return result

        except Exception as e:
            return {
                "error": f"LLM 分析失败: {str(e)}",
                "issues": [],
                "summary": "分析失败"
            }

    async def evaluate_design(self, code: str) -> Dict:
        """评估代码设计质量"""
        prompt = self.analysis_prompts["design_review"].format(code=code)

        try:
            response = await openai.ChatCompletion.acreate(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个软件架构专家。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = json.loads(response.choices[0].message.content)
            return result

        except Exception as e:
            return {
                "error": f"设计评估失败: {str(e)}",
                "scores": {},
                "suggestions": []
            }

    async def review_documentation(self, code: str) -> Dict:
        """评审代码文档"""
        prompt = self.analysis_prompts["documentation_review"].format(code=code)

        try:
            response = await openai.ChatCompletion.acreate(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个技术文档专家。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = json.loads(response.choices[0].message.content)
            return result

        except Exception as e:
            return {
                "error": f"文档评审失败: {str(e)}",
                "issues": [],
                "improvements": []
            }

    async def comprehensive_analysis(self, code: str) -> Dict:
        """综合分析"""
        logic_result = await self.analyze_code_logic(code)
        design_result = await self.evaluate_design(code)
        doc_result = await self.review_documentation(code)

        return {
            "logic_issues": logic_result.get("issues", []),
            "logic_summary": logic_result.get("summary", ""),
            "design_scores": design_result.get("scores", {}),
            "design_suggestions": design_result.get("suggestions", []),
            "documentation_issues": doc_result.get("issues", []),
            "documentation_improvements": doc_result.get("improvements", [])
        }

LLM 分析器通过精心设计的 Prompt 引导模型关注代码的不同方面,包括逻辑正确性、设计质量和文档完整性。每个分析都有明确的目标和输出格式,确保结果的一致性和可用性。LLM 能够发现规则引擎难以识别的问题,比如逻辑错误、设计缺陷和文档不足,为代码评审提供更全面的视角。

结果聚合与质量评分

结果聚合模块将来自静态分析规则引擎和 LLM 分析器的结果整合在一起,进行去重、关联和优先级排序,为开发者提供清晰、可操作的评审报告。

from typing import Dict, List
from collections import defaultdict

class ReviewResultAggregator:
    """评审结果聚合器"""

    def __init__(self):
        self.severity_weights = {
            "high": 10,
            "medium": 5,
            "low": 1
        }

    def aggregate_results(self,
                          static_issues: List[Dict],
                          llm_results: Dict) -> Dict:
        """聚合静态分析和 LLM 分析结果"""
        all_issues = []

        # 聚合静态分析问题
        for issue in static_issues:
            all_issues.append({
                **issue,
                "source": "static_analysis"
            })

        # 聚合 LLM 分析的逻辑问题
        for issue in llm_results.get("logic_issues", []):
            all_issues.append({
                **issue,
                "source": "llm_analysis"
            })

        # 聚合文档问题
        for issue in llm_results.get("documentation_issues", []):
            all_issues.append({
                "type": "documentation",
                "severity": "low",
                "message": issue.get("message", ""),
                "location": issue.get("location", ""),
                "suggestion": "补充或改进文档",
                "source": "llm_analysis"
            })

        # 去重和分类
        deduplicated_issues = self._deduplicate_issues(all_issues)
        categorized_issues = self._categorize_issues(deduplicated_issues)

        # 计算质量评分
        quality_scores = self._calculate_quality_scores(
            static_issues,
            llm_results
        )

        return {
            "issues": categorized_issues,
            "quality_scores": quality_scores,
            "summary": self._generate_summary(categorized_issues, quality_scores),
            "improvement_suggestions": self._aggregate_suggestions(llm_results)
        }

    def _deduplicate_issues(self, issues: List[Dict]) -> List[Dict]:
        """去重问题"""
        seen = set()
        unique_issues = []

        for issue in issues:
            # 使用类型和消息作为去重键
            key = (issue.get("type"), issue.get("message"))
            if key not in seen:
                seen.add(key)
                unique_issues.append(issue)

        return unique_issues

    def _categorize_issues(self, issues: List[Dict]) -> Dict:
        """按类别分类问题"""
        categories = defaultdict(list)

        for issue in issues:
            issue_type = issue.get("type", "unknown")
            categories[issue_type].append(issue)

        return dict(categories)

    def _calculate_quality_scores(self,
                                   static_issues: List[Dict],
                                   llm_results: Dict) -> Dict:
        """计算质量评分"""
        scores = {}

        # 基于问题严重程度计算基础分
        penalty = 0
        for issue in static_issues:
            severity = issue.get("severity", "low")
            penalty += self.severity_weights.get(severity, 1)

        # LLM 逻辑问题影响更大
        for issue in llm_results.get("logic_issues", []):
            severity = issue.get("severity", "medium")
            penalty += self.severity_weights.get(severity, 5) * 2

        # 基础分(满分 100)
        base_score = max(0, 100 - penalty)

        # 设计质量评分
        design_scores = llm_results.get("design_scores", {})
        avg_design_score = sum(design_scores.values()) / len(design_scores) if design_scores else 7

        # 文档质量评分
        doc_issues_count = len(llm_results.get("documentation_issues", []))
        doc_score = max(0, 10 - doc_issues_count)

        # 综合评分
        scores = {
            "overall": int((base_score * 0.5 + avg_design_score * 10 * 0.3 + doc_score * 2) / 100 * 100),
            "correctness": base_score,
            "design_quality": int(avg_design_score * 10),
            "documentation": doc_score,
            "maintainability": int((base_score + avg_design_score * 10) / 2)
        }

        return scores

    def _generate_summary(self,
                          categorized_issues: Dict,
                          quality_scores: Dict) -> str:
        """生成评审摘要"""
        total_issues = sum(len(issues) for issues in categorized_issues.values())
        high_severity = len([
            issue for issues in categorized_issues.values()
            for issue in issues
            if issue.get("severity") == "high"
        ])

        summary = f"代码评审完成。发现 {total_issues} 个问题,其中高严重性问题 {high_severity} 个。"
        summary += f"\n整体质量评分: {quality_scores.get('overall', 0)}/100"

        if high_severity > 0:
            summary += "\n建议优先处理高严重性问题。"

        return summary

    def _aggregate_suggestions(self, llm_results: Dict) -> List[str]:
        """聚合改进建议"""
        suggestions = []

        # 设计建议
        suggestions.extend(llm_results.get("design_suggestions", []))

        # 文档改进建议
        suggestions.extend(llm_results.get("documentation_improvements", []))

        return suggestions

结果聚合器负责将不同来源的分析结果整合到一起,并进行智能的去重和分类。它还计算多维度的质量评分,帮助开发者快速了解代码的整体质量状况。聚合器还会生成简洁明了的摘要和改进建议,让开发者能够快速抓住重点。

完整的代码评审 Agent

将所有组件组合起来,创建一个完整的代码评审 Agent:

import asyncio
from typing import Dict, List, Optional

class CodeReviewAgent:
    """代码评审 Agent"""

    def __init__(self, llm_api_key: str, llm_model: str = "gpt-4"):
        self.parser = CodeParser(language="python")
        self.static_analyzer = StaticAnalysisRules()
        self.llm_analyzer = LLMCodeAnalyzer(llm_api_key, llm_model)
        self.aggregator = ReviewResultAggregator()

    async def review_code(self,
                         code: str,
                         filename: str = "unknown.py") -> Dict:
        """评审代码"""
        print(f"开始评审代码: {filename}")

        # 1. 解析代码
        print("步骤 1/4: 解析代码...")
        code_info = self.parser.parse(code)
        if "error" in code_info:
            return {
                "error": code_info["error"],
                "filename": filename
            }

        # 2. 静态分析
        print("步骤 2/4: 执行静态分析...")
        static_issues = self.static_analyzer.run_all_rules(code_info)

        # 3. LLM 分析
        print("步骤 3/4: LLM 语义分析...")
        llm_results = await self.llm_analyzer.comprehensive_analysis(code)

        # 4. 结果聚合
        print("步骤 4/4: 聚合结果...")
        review_result = self.aggregator.aggregate_results(
            static_issues,
            llm_results
        )

        review_result["filename"] = filename
        review_result["code_info"] = {
            "functions": len(code_info.get("functions", [])),
            "classes": len(code_info.get("classes", [])),
            "lines": len(code_info.get("lines", []))
        }

        print(f"评审完成! 发现 {review_result.get('summary', '')}")
        return review_result

    def generate_markdown_report(self, review_result: Dict) -> str:
        """生成 Markdown 格式的评审报告"""
        report = f"""# 代码评审报告

**文件**: {review_result.get('filename', 'unknown')}
**评审时间**: {review_result.get('timestamp', 'Unknown')}

## 代码统计
- 函数数量: {review_result.get('code_info', {}).get('functions', 0)}
- 类数量: {review_result.get('code_info', {}).get('classes', 0)}
- 代码行数: {review_result.get('code_info', {}).get('lines', 0)}

## 评审摘要

{review_result.get('summary', '')}

## 质量评分

"""

        # 质量评分表格
        scores = review_result.get('quality_scores', {})
        report += "| 维度 | 评分 |\n"
        report += "|------|------|\n"
        for key, value in scores.items():
            report += f"| {key} | {value}/100 |\n"
        report += "\n"

        # 分类问题
        issues = review_result.get('issues', {})
        for category, category_issues in issues.items():
            if category_issues:
                report += f"## {category.replace('_', ' ').title()} ({len(category_issues)} 个问题)\n\n"

                for issue in category_issues:
                    severity_icon = {
                        "high": "🔴",
                        "medium": "🟡",
                        "low": "🟢"
                    }.get(issue.get("severity", "low"), "⚪")

                    report += f"### {severity_icon} {issue.get('message', 'No message')}\n\n"
                    report += f"- **位置**: {issue.get('location', 'Unknown')}\n"
                    report += f"- **严重程度**: {issue.get('severity', 'low')}\n"
                    report += f"- **来源**: {issue.get('source', 'Unknown')}\n"
                    report += f"- **建议**: {issue.get('suggestion', 'No suggestion')}\n\n"

        # 改进建议
        suggestions = review_result.get('improvement_suggestions', [])
        if suggestions:
            report += "## 改进建议\n\n"
            for i, suggestion in enumerate(suggestions, 1):
                report += f"{i}. {suggestion}\n"
            report += "\n"

        return report

# 使用示例
async def main():
    # 示例代码
    sample_code = '''
import os
import sys

def process_data(data_list, threshold):
    result = []
    for item in data_list:
        if item > threshold:
            result.append(item * 2)
        elif item < 0:
            result.append(0)
        else:
            result.append(item)
    return result

def save_to_file(filename, data):
    with open(filename, 'w') as f:
        for item in data:
            f.write(str(item) + '\\n')

def load_config():
    return {
        'max_size': 100,
        'api_key': 'sk-1234567890abcdef',
        'timeout': 30
    }

def complex_function(data):
    result = []
    for item in data:
        if item > 0:
            if item < 10:
                if item % 2 == 0:
                    result.append(item * 2)
                else:
                    result.append(item)
            else:
                if item % 3 == 0:
                    result.append(item * 3)
                else:
                    result.append(item)
        else:
            result.append(0)
    return result
'''

    # 创建评审 Agent
    agent = CodeReviewAgent(llm_api_key="your-api-key-here")

    # 执行评审
    result = await agent.review_code(sample_code, filename="example.py")

    # 生成报告
    report = agent.generate_markdown_report(result)
    print(report)

    # 保存报告
    with open('/Users/liuyutao/Desktop/workspace/code_review_report.md', 'w') as f:
        f.write(report)

if __name__ == "__main__":
    asyncio.run(main())

这个完整的代码评审 Agent 整合了代码解析、静态分析、LLM 语义分析和结果聚合等所有组件,提供端到端的代码评审功能。它能够生成详细的 Markdown 报告,帮助开发者全面了解代码质量状况和改进方向。

最佳实践与常见陷阱

静态分析规则配置

静态分析规则是代码评审 Agent 的基础,配置得当的规则能够准确捕获代码问题。团队应该根据自己的编码标准和技术栈定制规则集,避免一刀切的通用配置。

规则配置的一个常见陷阱是过于严格或过于宽松。过于严格的规则会产生大量误报,淹没真正的问题,导致开发者对 Agent 的结果失去信任。过于宽松的规则则无法提供有效的质量保证。最佳实践是逐步调整规则,根据团队的实际情况找到平衡点。

另一个重要考虑是规则的分类和优先级。不同类型的规则应该有不同的处理方式,比如安全类问题应该高优先级处理,而代码风格问题可以作为低优先级提醒。Agent 应该支持灵活的规则配置和调整,让团队能够根据项目的不同阶段调整评审标准。

LLM Prompt 工程

LLM 的分析质量很大程度上取决于 Prompt 的设计。好的 Prompt 应该明确、具体、有上下文,引导模型关注正确的方面。

Prompt 工程的一个关键原则是角色设定。给 LLM 分配明确的角色,比如"资深代码评审专家"或"软件架构师",能够引导模型采用合适的视角和方法。另一个重要原则是任务分解,将复杂的评审任务分解为多个子任务,比如逻辑评审、设计评审、文档评审,每个子任务有专门优化的 Prompt。

另一个常见陷阱是 Prompt 过于冗长或包含冲突的指令。简洁明确、目标一致的 Prompt 通常效果更好。同时,应该为 LLM 提供充足的上下文信息,比如代码的用途、相关的设计文档、项目的编码规范等,这些信息能帮助 LLM 做出更准确的判断。

结果过滤和误报处理

代码评审 Agent 难免会产生误报,如何有效过滤误报是一个重要挑战。完全避免误报是不现实的,但可以通过多种方法减少误报的影响。

一种方法是建立白名单机制,允许开发者标记已知的误报,避免重复报告。另一种方法是学习机制,Agent 可以从开发者的反馈中学习,逐步降低特定规则的误报率。第三种方法是置信度评分,为每个问题分配置信度分数,让开发者优先处理高置信度的问题。

误报处理的另一个考虑是问题关联性。多个相关的问题应该被合并为一个更大的问题,而不是零散地报告多个小问题。比如一个函数过长导致的多个相关问题,应该作为一个整体问题报告,并给出重构建议。

集成到开发流程

代码评审 Agent 的价值只有在集成到实际开发流程中才能充分体现。常见的集成点包括 Git 提交钩子、代码评审平台(如 GitHub Pull Request)、CI/CD 流水线等。

集成的一个关键考虑是性能。代码评审可能需要较长时间,特别是在使用 LLM 的情况下。应该根据场景选择合适的策略,比如对小型变更进行实时评审,对大型变更使用异步评审。另一个考虑是用户体验,评审结果应该以开发者友好的方式呈现,比如在代码编辑器中直接显示问题,或者在 Pull Request 中提供清晰的问题列表和修复建议。

集成的另一个最佳实践是渐进式引入。可以先从简单的静态分析规则开始,逐步增加 LLM 分析功能。同时,应该给开发者提供配置选项,允许他们根据自己的偏好调整评审的严格程度和范围。

性能优化考虑

静态分析性能优化

静态分析的性能直接影响 Agent 的响应速度。对于大型代码库,静态分析可能成为性能瓶颈。优化静态分析性能的几种方法包括增量分析、并行处理和缓存。

增量分析只分析变更的代码部分,而不是重新分析整个代码库。这需要维护代码的依赖关系和变更历史,但能够显著提升性能。并行处理则是将分析任务分解为多个子任务,利用多核 CPU 并行执行。缓存则是缓存分析结果,对于未变更的代码部分直接使用缓存结果。

另一个优化方向是规则执行的优化。不是所有规则都需要对所有代码执行,可以根据代码的类型、上下文智能选择适用的规则。比如安全规则更相关于用户输入处理代码,性能规则更相关于计算密集型代码。

LLM 调用优化

LLM 调用是 Agent 的另一个性能瓶颈。优化 LLM 调用的方法包括批处理、缓存、模型选择和 Prompt 压缩。

批处理是将多个小的代码分析请求合并为一个大的请求,减少 API 调用次数。缓存则是缓存常见的代码模式和分析结果,对于相似代码直接使用缓存结果。模型选择是根据任务的复杂程度选择合适的模型,比如简单的规则检查使用较小的模型,复杂的语义分析使用较大的模型。Prompt 压缩则是减少 Prompt 的长度,只包含必要的信息,减少 token 使用量和响应时间。

另一个优化方向是异步处理。LLM 调用可以异步执行,不阻塞其他任务。对于大型代码评审,可以分批处理,边分析边返回结果,而不是等待所有分析完成才返回。

结果缓存和增量更新

代码评审的结果应该被缓存,避免重复分析。同时,应该支持增量更新,只重新分析变更的部分。

缓存的设计需要考虑缓存键和失效策略。缓存键可以基于代码的哈希值、文件路径和时间戳等。失效策略可以是时间失效、内容失效或手动失效。增量更新则需要跟踪代码的变更历史,只分析变更的代码部分。

另一个考虑是缓存的大小管理。代码评审的结果可能占用大量存储空间,需要定期清理过期的缓存,或者使用 LRU 策略管理缓存大小。

成本优化

LLM 调用的成本是代码评审 Agent 运行成本的重要组成部分。优化成本的方法包括智能规则选择、代码预处理和结果复用。

智能规则选择是只在必要时使用 LLM 分析,对于能够通过静态分析解决的问题就不调用 LLM。代码预处理是在调用 LLM 之前对代码进行简化,比如移除注释、格式化代码、提取关键部分等,减少 token 使用量。结果复用是缓存 LLM 的分析结果,对于相似代码复用分析结果。

另一个成本优化方向是使用更经济的模型。对于简单的分析任务,可以使用较小的模型;对于复杂的语义分析,才使用较大的模型。同时,可以考虑使用开源模型或自部署模型,降低调用成本。

参考资源

官方文档和工具

学术论文和研究

  • "Automated Code Review with Machine Learning" - 探讨机器学习在代码评审中的应用
  • "Large Language Models for Code Understanding" - 研究 LLM 在代码理解中的能力
  • "Semantic Code Analysis using Transformer Models" - 基于 Transformer 的代码语义分析

实践指南

通过合理的设计和实现,代码评审 Agent 能够显著提升代码评审的效率和质量,成为开发团队不可或缺的质量把关工具。