多Agent协作模式深入解析

多Agent协作是实现复杂AI应用的核心机制。本文深入探讨四种主流的协作模式:监督者-工作者模式、辩论模式、审查模式和流水线模式,分析各自的适用场景和实现细节。

多Agent协作模式深入解析

在多Agent系统中,协作模式决定了Agent之间如何交互、协调和共同完成任务。选择合适的协作模式对于系统性能、可靠性和可维护性至关重要。不同的协作模式适用于不同的应用场景,理解它们的特性和实现细节是设计高效多Agent系统的关键。

概览与动机

随着AI应用复杂度的提升,单个Agent难以应对多样化的任务需求。多Agent协作通过让多个专业化的Agent协同工作,能够实现比单Agent更强大的智能和更高效的任务执行。然而,Agent之间的协作并非自动发生的,需要设计合理的协作模式来指导Agent之间的交互和协调。

协作模式定义了Agent之间的角色分工、通信协议、决策机制和结果聚合方式。一个好的协作模式能够:

  • 明确各Agent的职责边界
  • 优化任务执行流程
  • 提高系统容错能力
  • 增强整体智能水平
  • 便于系统扩展和维护

本文将深入分析四种主流的协作模式:监督者-工作者模式、辩论模式、审查模式和流水线模式。每种模式都有其独特的优势和适用场景,通过代码示例和实际案例,我们将展示如何在Python中实现这些协作模式。

核心概念与架构设计

协作模式分类框架

多Agent协作模式可以从多个维度进行分类:

Rendering diagram...

模式选择决策树

选择合适的协作模式需要考虑多个因素:

Rendering diagram...

监督者-工作者模式

监督者-工作者模式是最常见的协作模式,包含一个中央协调器(监督者)和多个工作Agent(工作者)。监督者负责任务分解、分配和结果聚合,工作者专注于具体任务的执行。

核心特点

  • 集中式控制,易于管理和监控
  • 职责清晰,各司其职
  • 灵活的任务分配策略
  • 适合可分解的复杂任务

适用场景

  • 大规模数据处理
  • 并行计算任务
  • 需要全局最优的任务
  • 资源管理和调度

辩论模式

辩论模式允许多个Agent从不同角度分析同一问题,通过观点碰撞和辩论达成更好的决策。

核心特点

  • 多角度视角分析
  • 观点碰撞和论证
  • 共识达成机制
  • 决策质量高

适用场景

  • 复杂决策问题
  • 需要多维度分析的任务
  • 风险评估和预测
  • 战略规划

审查模式

审查模式将任务生成和任务审查分离,通过独立的质量检查确保输出质量。

核心特点

  • 生成-审查分离
  • 多轮迭代优化
  • 质量保证机制
  • 错误检测和纠正

适用场景

  • 代码生成和审查
  • 文档质量检查
  • 内容审核
  • 风险控制

流水线模式

流水线模式将任务分解为多个顺序步骤,每个步骤由专门的Agent负责处理。

核心特点

  • 顺序处理流程
  • 专业化分工
  • 流水线优化
  • 吞吐量高

适用场景

  • 数据处理流水线
  • 内容创作流程
  • 软件开发流程
  • 客户服务流程

关键技术实现

监督者-工作者模式实现

import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import logging
from datetime import datetime
import json
import hashlib

class WorkerStatus(Enum):
    """工作者状态"""
    IDLE = "idle"
    WORKING = "working"
    ERROR = "error"

@dataclass
class WorkerTask:
    """工作者任务"""
    task_id: str
    worker_id: str
    data: Dict[str, Any]
    priority: int = 0
    status: str = "pending"
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

class WorkerAgent:
    """工作者Agent基类"""
    
    def __init__(self, worker_id: str, capabilities: List[str], 
                 processing_capacity: int = 1):
        self.worker_id = worker_id
        self.capabilities = capabilities
        self.processing_capacity = processing_capacity
        self.status = WorkerStatus.IDLE
        self.current_tasks: List[WorkerTask] = []
        self.completed_tasks: List[WorkerTask] = []
        self.logger = logging.getLogger(f"Worker.{worker_id}")
        self.performance_stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'average_processing_time': 0.0
        }
        
    def can_handle(self, task_type: str) -> bool:
        """检查是否能处理指定类型的任务"""
        return task_type in self.capabilities
        
    def is_available(self) -> bool:
        """检查是否有空闲处理能力"""
        return (self.status == WorkerStatus.IDLE and 
                len(self.current_tasks) < self.processing_capacity)
        
    async def process_task(self, task: WorkerTask) -> Any:
        """处理任务,子类实现"""
        raise NotImplementedError("Subclasses must implement process_task")
        
    async def execute_task(self, task: WorkerTask) -> None:
        """执行任务"""
        try:
            self.status = WorkerStatus.WORKING
            task.status = "running"
            task.started_at = datetime.now()
            
            self.current_tasks.append(task)
            
            # 执行任务
            result = await asyncio.wait_for(
                self.process_task(task),
                timeout=300  # 5分钟超时
            )
            
            task.result = result
            task.status = "completed"
            task.completed_at = datetime.now()
            
            # 更新性能统计
            self.performance_stats['total_tasks'] += 1
            self.performance_stats['successful_tasks'] += 1
            processing_time = (task.completed_at - task.started_at).total_seconds()
            self._update_average_time(processing_time)
            
            self.logger.info(f"Task {task.task_id} completed successfully")
            
        except asyncio.TimeoutError:
            task.status = "timeout"
            task.error = "Task processing timeout"
            self._handle_task_failure(task)
            
        except Exception as e:
            task.status = "error"
            task.error = str(e)
            self._handle_task_failure(task)
            
        finally:
            self.current_tasks.remove(task)
            self.completed_tasks.append(task)
            if not self.current_tasks:
                self.status = WorkerStatus.IDLE
                
    def _handle_task_failure(self, task: WorkerTask) -> None:
        """处理任务失败"""
        self.performance_stats['total_tasks'] += 1
        self.performance_stats['failed_tasks'] += 1
        self.logger.error(f"Task {task.task_id} failed: {task.error}")
        
    def _update_average_time(self, new_time: float) -> None:
        """更新平均处理时间"""
        total = self.performance_stats['total_tasks']
        current_avg = self.performance_stats['average_processing_time']
        self.performance_stats['average_processing_time'] = (
            (current_avg * (total - 1) + new_time) / total
        )

class SupervisorAgent:
    """监督者Agent"""
    
    def __init__(self, supervisor_id: str):
        self.supervisor_id = supervisor_id
        self.workers: Dict[str, WorkerAgent] = {}
        self.task_queue = asyncio.Queue()
        self.completed_tasks: Dict[str, WorkerTask] = {}
        self.failed_tasks: Dict[str, WorkerTask] = {}
        self.logger = logging.getLogger(f"Supervisor.{supervisor_id}")
        self._running = False
        self.task_dispatcher = None
        
    def register_worker(self, worker: WorkerAgent) -> None:
        """注册工作者"""
        self.workers[worker.worker_id] = worker
        self.logger.info(f"Registered worker: {worker.worker_id}")
        
    def unregister_worker(self, worker_id: str) -> None:
        """注销工作者"""
        if worker_id in self.workers:
            del self.workers[worker_id]
            self.logger.info(f"Unregistered worker: {worker_id}")
            
    async def submit_task(self, task_type: str, data: Dict[str, Any], 
                         priority: int = 0) -> str:
        """提交任务"""
        task_id = self._generate_task_id(task_type, data)
        task = WorkerTask(
            task_id=task_id,
            worker_id="",  # 将由分配器设置
            data=data,
            priority=priority
        )
        
        await self.task_queue.put((task_type, task))
        self.logger.info(f"Task submitted: {task_id}")
        return task_id
        
    def _generate_task_id(self, task_type: str, data: Dict[str, Any]) -> str:
        """生成任务ID"""
        content = json.dumps({
            'type': task_type,
            'data': data,
            'timestamp': datetime.now().isoformat()
        })
        return hashlib.md5(content.encode()).hexdigest()[:12]
        
    async def find_available_worker(self, task_type: str) -> Optional[WorkerAgent]:
        """查找可用的工作者"""
        available_workers = [
            worker for worker in self.workers.values()
            if worker.can_handle(task_type) and worker.is_available()
        ]
        
        if not available_workers:
            return None
            
        # 选择负载最低的工作者
        return min(available_workers, key=lambda w: len(w.current_tasks))
        
    async def dispatch_tasks(self) -> None:
        """任务分发循环"""
        while self._running:
            try:
                task_type, task = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=1.0
                )
                
                worker = await self.find_available_worker(task_type)
                
                if worker:
                    task.worker_id = worker.worker_id
                    asyncio.create_task(worker.execute_task(task))
                    self.logger.info(f"Task {task.task_id} dispatched to {worker.worker_id}")
                else:
                    # 没有可用工作者,重新放回队列
                    await self.task_queue.put((task_type, task))
                    await asyncio.sleep(1)  # 等待一秒再试
                    
            except asyncio.TimeoutError:
                continue
                
    async def start(self) -> None:
        """启动监督者"""
        self._running = True
        self.task_dispatcher = asyncio.create_task(self.dispatch_tasks())
        self.logger.info("Supervisor started")
        
    async def stop(self) -> None:
        """停止监督者"""
        self._running = False
        if self.task_dispatcher:
            self.task_dispatcher.cancel()
            try:
                await self.task_dispatcher
            except asyncio.CancelledError:
                pass
        self.logger.info("Supervisor stopped")
        
    def get_system_status(self) -> Dict[str, Any]:
        """获取系统状态"""
        total_workers = len(self.workers)
        active_workers = sum(
            1 for w in self.workers.values() if w.status == WorkerStatus.WORKING
        )
        
        total_completed_tasks = sum(
            len(w.completed_tasks) for w in self.workers.values()
        )
        
        return {
            'supervisor_id': self.supervisor_id,
            'total_workers': total_workers,
            'active_workers': active_workers,
            'idle_workers': total_workers - active_workers,
            'pending_tasks': self.task_queue.qsize(),
            'completed_tasks': total_completed_tasks,
            'worker_stats': {
                worker_id: {
                    'status': worker.status.value,
                    'current_tasks': len(worker.current_tasks),
                    'completed_tasks': len(worker.completed_tasks),
                    'performance': worker.performance_stats
                }
                for worker_id, worker in self.workers.items()
            }
        }
        
    async def aggregate_results(self, task_ids: List[str]) -> Dict[str, Any]:
        """聚合任务结果"""
        results = []
        errors = []
        
        for task_id in task_ids:
            # 在所有工作者的已完成任务中查找
            for worker in self.workers.values():
                for task in worker.completed_tasks:
                    if task.task_id == task_id:
                        if task.status == "completed":
                            results.append(task.result)
                        else:
                            errors.append({
                                'task_id': task_id,
                                'error': task.error
                            })
                        break
                        
        return {
            'successful_tasks': len(results),
            'failed_tasks': len(errors),
            'results': results,
            'errors': errors
        }

辩论模式实现

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import asyncio
import logging

@dataclass
class DebatePosition:
    """辩论立场"""
    position_id: str
    agent_id: str
    viewpoint: str
    arguments: List[str]
    supporting_evidence: List[Dict[str, Any]]
    confidence: float
    timestamp: datetime

class DebateAgent:
    """辩论Agent"""
    
    def __init__(self, agent_id: str, perspective: str):
        self.agent_id = agent_id
        self.perspective = perspective
        self.logger = logging.getLogger(f"DebateAgent.{agent_id}")
        
    async def analyze_issue(self, issue: str, context: Dict[str, Any]) -> DebatePosition:
        """分析问题并生成立场"""
        self.logger.info(f"Analyzing issue from {self.perspective} perspective")
        
        # 生成分析观点
        viewpoint = await self._generate_viewpoint(issue, context)
        arguments = await self._generate_arguments(issue, viewpoint)
        evidence = await self._gather_evidence(issue, arguments)
        confidence = await self._assess_confidence(viewpoint, evidence)
        
        return DebatePosition(
            position_id=f"{self.agent_id}_{datetime.now().timestamp()}",
            agent_id=self.agent_id,
            viewpoint=viewpoint,
            arguments=arguments,
            supporting_evidence=evidence,
            confidence=confidence,
            timestamp=datetime.now()
        )
        
    async def _generate_viewpoint(self, issue: str, 
                                  context: Dict[str, Any]) -> str:
        """生成观点"""
        await asyncio.sleep(1.0)  # 模拟分析过程
        
        if self.perspective == "optimistic":
            return f"从积极角度看,{issue}提供了巨大的机会和价值"
        elif self.perspective == "pessimistic":
            return f"从风险角度看,{issue}存在严重的潜在问题"
        elif self.perspective == "technical":
            return f"从技术角度看,{issue}需要考虑架构、性能和可扩展性"
        else:
            return f"从{self.perspective}角度分析{issue}"
            
    async def _generate_arguments(self, issue: str, 
                                  viewpoint: str) -> List[str]:
        """生成论证"""
        await asyncio.sleep(0.8)
        
        return [
            f"关于{issue},第一,{viewpoint}",
            f"第二,相关因素包括{self.perspective}相关的多个方面",
            f"第三,基于经验和数据分析,我们的判断是..."
        ]
        
    async def _gather_evidence(self, issue: str, 
                               arguments: List[str]) -> List[Dict[str, Any]]:
        """收集证据"""
        await asyncio.sleep(0.5)
        
        return [
            {
                'source': '历史数据',
                'content': f"关于{issue}的历史数据显示相关趋势",
                'relevance': 0.85
            },
            {
                'source': '专家意见',
                'content': f"领域专家对{issue}的观点支持当前立场",
                'relevance': 0.78
            }
        ]
        
    async def _assess_confidence(self, viewpoint: str, 
                                 evidence: List[Dict[str, Any]]) -> float:
        """评估信心度"""
        await asyncio.sleep(0.3)
        
        # 基于证据相关性和数量计算信心度
        avg_relevance = sum(e['relevance'] for e in evidence) / len(evidence)
        return min(avg_relevance * 0.9 + 0.1, 1.0)
        
    async def respond_to_counterarguments(self, counterarguments: List[str],
                                         current_position: DebatePosition) -> DebatePosition:
        """回应反方论点"""
        self.logger.info("Formulating counterarguments")
        
        # 分析反方论点并生成回应
        responses = []
        for counterarg in counterarguments:
            response = await self._generate_response(counterarg)
            responses.append(response)
            
        # 更新立场
        updated_position = DebatePosition(
            position_id=f"{self.agent_id}_{datetime.now().timestamp()}",
            agent_id=self.agent_id,
            viewpoint=current_position.viewpoint,
            arguments=current_position.arguments + responses,
            supporting_evidence=current_position.supporting_evidence,
            confidence=current_position.confidence * 0.95,  # 稍微降低信心度
            timestamp=datetime.now()
        )
        
        return updated_position
        
    async def _generate_response(self, counterargument: str) -> str:
        """生成回应"""
        await asyncio.sleep(0.7)
        return f"针对'{counterargument}'的反驳,我们的立场是..."

class DebateModerator:
    """辩论主持人"""
    
    def __init__(self, moderator_id: str):
        self.moderator_id = moderator_id
        self.agents: Dict[str, DebateAgent] = {}
        self.debate_history: List[Dict[str, Any]] = []
        self.logger = logging.getLogger(f"DebateModerator.{moderator_id}")
        
    def register_agent(self, agent: DebateAgent) -> None:
        """注册辩论Agent"""
        self.agents[agent.agent_id] = agent
        self.logger.info(f"Registered debate agent: {agent.agent_id}")
        
    async def conduct_debate(self, issue: str, 
                            context: Dict[str, Any],
                            max_rounds: int = 3) -> Dict[str, Any]:
        """主持辩论"""
        self.logger.info(f"Starting debate on: {issue}")
        
        # 第一轮:各Agent发表初始立场
        positions = {}
        for agent_id, agent in self.agents.items():
            position = await agent.analyze_issue(issue, context)
            positions[agent_id] = position
            
        self.debate_history.append({
            'round': 1,
            'positions': positions,
            'timestamp': datetime.now().isoformat()
        })
        
        # 后续轮:互相辩论
        for round_num in range(2, max_rounds + 1):
            self.logger.info(f"Starting debate round {round_num}")
            
            # 每个Agent回应其他Agent的论点
            for agent_id, agent in self.agents.items():
                other_positions = [
                    pos for other_id, pos in positions.items() 
                    if other_id != agent_id
                ]
                
                counterarguments = []
                for other_pos in other_positions:
                    counterarguments.extend(other_pos.arguments)
                    
                updated_position = await agent.respond_to_counterarguments(
                    counterarguments, positions[agent_id]
                )
                positions[agent_id] = updated_position
                
            self.debate_history.append({
                'round': round_num,
                'positions': positions,
                'timestamp': datetime.now().isoformat()
            })
            
            # 检查是否达成共识
            if await self._check_consensus(positions):
                self.logger.info("Consensus reached, ending debate")
                break
                
        # 总结辩论结果
        return await self._summarize_debate(issue, positions)
        
    async def _check_consensus(self, positions: Dict[str, DebatePosition]) -> bool:
        """检查是否达成共识"""
        if len(positions) < 2:
            return True
            
        # 计算观点相似度(简化版本)
        positions_list = list(positions.values())
        first_position = positions_list[0]
        
        for position in positions_list[1:]:
            # 简化:如果信心度都很高,认为达成共识
            if position.confidence > 0.8 and first_position.confidence > 0.8:
                continue
            else:
                return False
                
        return True
        
    async def _summarize_debate(self, issue: str, 
                               positions: Dict[str, DebatePosition]) -> Dict[str, Any]:
        """总结辩论结果"""
        self.logger.info("Summarizing debate results")
        
        # 分析最终立场
        positions_list = list(positions.values())
        
        # 找出最强立场
        strongest_position = max(positions_list, key=lambda p: p.confidence)
        
        # 汇总所有论点
        all_arguments = []
        for position in positions_list:
            all_arguments.extend(position.arguments)
            
        # 生成综合结论
        conclusion = await self._generate_conclusion(issue, positions_list)
        
        return {
            'issue': issue,
            'strongest_position': {
                'agent_id': strongest_position.agent_id,
                'viewpoint': strongest_position.viewpoint,
                'confidence': strongest_position.confidence
            },
            'all_arguments': all_arguments,
            'conclusion': conclusion,
            'debate_rounds': len(self.debate_history),
            'timestamp': datetime.now().isoformat()
        }
        
    async def _generate_conclusion(self, issue: str, 
                                  positions: List[DebatePosition]) -> str:
        """生成综合结论"""
        await asyncio.sleep(1.0)
        
        if len(positions) == 0:
            return "无法形成结论"
            
        # 简化版本:基于最高信心度的立场
        strongest = max(positions, key=lambda p: p.confidence)
        
        return f"经过多轮辩论,综合各Agent的观点,关于'{issue}'的结论是:{strongest.viewpoint},信心度:{strongest.confidence:.2f}"

审查模式实现

@dataclass
class ReviewResult:
    """审查结果"""
    reviewer_id: str
    content: Any
    approved: bool
    comments: List[str]
    suggestions: List[str]
    score: float
    timestamp: datetime

class GeneratorAgent:
    """生成者Agent"""
    
    def __init__(self, agent_id: str, capability: str):
        self.agent_id = agent_id
        self.capability = capability
        self.logger = logging.getLogger(f"GeneratorAgent.{agent_id}")
        
    async def generate(self, request: Dict[str, Any]) -> Any:
        """生成内容"""
        self.logger.info(f"Generating {self.capability}")
        await asyncio.sleep(1.5)  # 模拟生成过程
        
        # 简化版本:返回示例内容
        return {
            'content': f"生成的{self.capability}内容",
            'metadata': {
                'generator_id': self.agent_id,
                'timestamp': datetime.now().isoformat()
            }
        }

class ReviewerAgent:
    """审查者Agent"""
    
    def __init__(self, agent_id: str, review_criteria: List[str]):
        self.agent_id = agent_id
        self.review_criteria = review_criteria
        self.logger = logging.getLogger(f"ReviewerAgent.{agent_id}")
        
    async def review(self, content: Any, context: Dict[str, Any]) -> ReviewResult:
        """审查内容"""
        self.logger.info(f"Reviewing content")
        
        # 基于审查标准进行评估
        comments = []
        suggestions = []
        score = 0.0
        
        for criterion in self.review_criteria:
            criterion_result = await self._evaluate_criterion(
                content, criterion
            )
            comments.append(criterion_result['comment'])
            suggestions.extend(criterion_result['suggestions'])
            score += criterion_result['score']
            
        # 计算最终分数
        final_score = score / len(self.review_criteria)
        approved = final_score >= 0.7
        
        return ReviewResult(
            reviewer_id=self.agent_id,
            content=content,
            approved=approved,
            comments=comments,
            suggestions=suggestions,
            score=final_score,
            timestamp=datetime.now()
        )
        
    async def _evaluate_criterion(self, content: Any, 
                                 criterion: str) -> Dict[str, Any]:
        """评估单一标准"""
        await asyncio.sleep(0.3)
        
        # 简化版本:模拟评估
        return {
            'comment': f"关于'{criterion}'的审查结果",
            'suggestions': [f"改进建议:{criterion}"],
            'score': 0.8
        }

class ReviewOrchestrator:
    """审查编排器"""
    
    def __init__(self, orchestrator_id: str):
        self.orchestrator_id = orchestrator_id
        self.generators: Dict[str, GeneratorAgent] = {}
        self.reviewers: Dict[str, ReviewerAgent] = {}
        self.review_history: List[Dict[str, Any]] = []
        self.logger = logging.getLogger(f"ReviewOrchestrator.{orchestrator_id}")
        
    def register_generator(self, generator: GeneratorAgent) -> None:
        """注册生成者"""
        self.generators[generator.agent_id] = generator
        self.logger.info(f"Registered generator: {generator.agent_id}")
        
    def register_reviewer(self, reviewer: ReviewerAgent) -> None:
        """注册审查者"""
        self.reviewers[reviewer.agent_id] = reviewer
        self.logger.info(f"Registered reviewer: {reviewer.agent_id}")
        
    async def generate_and_review(self, request: Dict[str, Any],
                                  max_iterations: int = 3) -> Dict[str, Any]:
        """生成并审查内容"""
        generator_id = request.get('generator_id')
        content_request = request.get('content_request', {})
        
        if generator_id not in self.generators:
            raise ValueError(f"Unknown generator: {generator_id}")
            
        generator = self.generators[generator_id]
        
        # 迭代生成和审查
        for iteration in range(max_iterations):
            self.logger.info(f"Iteration {iteration + 1}/{max_iterations}")
            
            # 生成内容
            generated_content = await generator.generate(content_request)
            
            # 并行审查
            review_tasks = [
                reviewer.review(generated_content, request)
                for reviewer in self.reviewers.values()
            ]
            
            review_results = await asyncio.gather(*review_tasks)
            
            # 分析审查结果
            all_approved = all(result.approved for result in review_results)
            average_score = sum(result.score for result in review_results) / len(review_results)
            
            self.logger.info(f"Iteration {iteration + 1}: Score={average_score:.2f}, Approved={all_approved}")
            
            # 记录审查历史
            self.review_history.append({
                'iteration': iteration + 1,
                'generated_content': generated_content,
                'review_results': review_results,
                'average_score': average_score,
                'all_approved': all_approved,
                'timestamp': datetime.now().isoformat()
            })
            
            # 如果所有审查都通过,返回结果
            if all_approved:
                return {
                    'success': True,
                    'final_content': generated_content,
                    'review_results': review_results,
                    'iterations': iteration + 1,
                    'final_score': average_score
                }
                
            # 否则,基于审查反馈改进生成
            content_request = await self._improve_based_on_feedback(
                content_request, review_results
            )
            
        # 达到最大迭代次数仍未通过
        return {
            'success': False,
            'final_content': generated_content,
            'review_results': review_results,
            'iterations': max_iterations,
            'final_score': average_score,
            'error': 'Maximum iterations reached without approval'
        }
        
    async def _improve_based_on_feedback(self, original_request: Dict[str, Any],
                                         review_results: List[ReviewResult]) -> Dict[str, Any]:
        """基于审查反馈改进请求"""
        self.logger.info("Improving based on review feedback")
        
        # 收集所有建议
        all_suggestions = []
        for result in review_results:
            all_suggestions.extend(result.suggestions)
            
        # 改进原始请求
        improved_request = original_request.copy()
        improved_request['improvement_suggestions'] = all_suggestions
        improved_request['iteration_context'] = 'improvement'
        
        return improved_request
        
    def get_review_statistics(self) -> Dict[str, Any]:
        """获取审查统计"""
        if not self.review_history:
            return {
                'total_reviews': 0,
                'average_iterations': 0,
                'success_rate': 0.0
            }
            
        successful_reviews = sum(
            1 for history in self.review_history
            if history['all_approved']
        )
        
        total_iterations = len(self.review_history)
        
        return {
            'total_reviews': total_iterations,
            'successful_reviews': successful_reviews,
            'success_rate': successful_reviews / total_iterations if total_iterations > 0 else 0.0,
            'average_iterations': sum(
                h['iteration'] for h in self.review_history
            ) / total_iterations,
            'average_score': sum(
                h['average_score'] for h in self.review_history
            ) / total_iterations
        }

流水线模式实现

class PipelineAgent:
    """流水线Agent"""
    
    def __init__(self, agent_id: str, stage_name: str, 
                 processing_function: Callable):
        self.agent_id = agent_id
        self.stage_name = stage_name
        self.processing_function = processing_function
        self.input_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()
        self.logger = logging.getLogger(f"PipelineAgent.{agent_id}")
        self._running = False
        self.stats = {
            'processed_items': 0,
            'failed_items': 0,
            'average_processing_time': 0.0
        }
        
    async def start(self) -> None:
        """启动流水线Agent"""
        self._running = True
        asyncio.create_task(self._process_loop())
        self.logger.info(f"Pipeline agent {self.agent_id} started")
        
    async def stop(self) -> None:
        """停止流水线Agent"""
        self._running = False
        self.logger.info(f"Pipeline agent {self.agent_id} stopped")
        
    async def _process_loop(self) -> None:
        """处理循环"""
        while self._running:
            try:
                item = await asyncio.wait_for(
                    self.input_queue.get(),
                    timeout=1.0
                )
                
                start_time = datetime.now()
                
                try:
                    # 处理项目
                    processed_item = await self.processing_function(item)
                    
                    # 传递到下一阶段
                    await self.output_queue.put(processed_item)
                    
                    # 更新统计
                    self.stats['processed_items'] += 1
                    processing_time = (datetime.now() - start_time).total_seconds()
                    self._update_avg_time(processing_time)
                    
                except Exception as e:
                    self.logger.error(f"Error processing item: {e}")
                    self.stats['failed_items'] += 1
                    
            except asyncio.TimeoutError:
                continue
                
    def _update_avg_time(self, new_time: float) -> None:
        """更新平均处理时间"""
        processed = self.stats['processed_items']
        current_avg = self.stats['average_processing_time']
        self.stats['average_processing_time'] = (
            (current_avg * (processed - 1) + new_time) / processed
        )
        
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            'agent_id': self.agent_id,
            'stage_name': self.stage_name,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize(),
            'stats': self.stats.copy()
        }

class PipelineOrchestrator:
    """流水线编排器"""
    
    def __init__(self, orchestrator_id: str):
        self.orchestrator_id = orchestr_id
        self.stages: List[PipelineAgent] = []
        self.logger = logging.getLogger(f"PipelineOrchestrator.{orchestrator_id}")
        self._running = False
        self.input_queue = None
        self.output_queue = None
        
    def add_stage(self, agent: PipelineAgent) -> None:
        """添加流水线阶段"""
        if self.stages:
            # 连接到上一个阶段的输出队列
            previous_stage = self.stages[-1]
            agent.input_queue = previous_stage.output_queue
        self.stages.append(agent)
        self.logger.info(f"Added stage: {agent.stage_name}")
        
    async def start(self) -> None:
        """启动流水线"""
        if not self.stages:
            raise ValueError("No stages configured")
            
        self.input_queue = self.stages[0].input_queue
        self.output_queue = self.stages[-1].output_queue
        
        # 启动所有阶段
        for stage in self.stages:
            await stage.start()
            
        self._running = True
        self.logger.info("Pipeline started")
        
    async def stop(self) -> None:
        """停止流水线"""
        self._running = False
        for stage in self.stages:
            await stage.stop()
        self.logger.info("Pipeline stopped")
        
    async def process_item(self, item: Any) -> Any:
        """处理单个项目"""
        if not self._running:
            raise RuntimeError("Pipeline not running")
            
        await self.input_queue.put(item)
        
        # 等待项目处理完成
        try:
            result = await asyncio.wait_for(
                self.output_queue.get(),
                timeout=300  # 5分钟超时
            )
            return result
        except asyncio.TimeoutError:
            raise TimeoutError("Pipeline processing timeout")
            
    async def process_batch(self, items: List[Any]) -> List[Any]:
        """批量处理项目"""
        tasks = [
            self.process_item(item) for item in items
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 处理异常
        successful_results = []
        errors = []
        
        for result in results:
            if isinstance(result, Exception):
                errors.append(result)
            else:
                successful_results.append(result)
                
        return {
            'successful_results': successful_results,
            'errors': errors,
            'total_items': len(items),
            'success_count': len(successful_results),
            'error_count': len(errors)
        }
        
    def get_pipeline_status(self) -> Dict[str, Any]:
        """获取流水线状态"""
        return {
            'orchestrator_id': self.orchestrator_id,
            'running': self._running,
            'total_stages': len(self.stages),
            'stages': [stage.get_stats() for stage in self.stages],
            'input_queue_size': self.input_queue.qsize() if self.input_queue else 0,
            'output_queue_size': self.output_queue.qsize() if self.output_queue else 0
        }

综合协作示例

async def comprehensive_collaboration_demo():
    """综合协作演示"""
    
    # 初始化日志
    logging.basicConfig(level=logging.INFO)
    
    # 1. 监督者-工作者模式示例
    print("=== 监督者-工作者模式演示 ===")
    supervisor = SupervisorAgent("supervisor_1")
    
    # 创建专业工作者
    data_worker = WorkerAgent("data_worker", ["data_processing", "data_analysis"])
    code_worker = WorkerAgent("code_worker", ["code_generation", "code_optimization"])
    test_worker = WorkerAgent("test_worker", ["unit_testing", "integration_testing"])
    
    supervisor.register_worker(data_worker)
    supervisor.register_worker(code_worker)
    supervisor.register_worker(test_worker)
    
    await supervisor.start()
    
    # 提交任务
    task1_id = await supervisor.submit_task("data_processing", {
        'data': "sample_data",
        'operation': "transform"
    })
    
    task2_id = await supervisor.submit_task("code_generation", {
        'description': "创建数据处理函数",
        'language': "python"
    })
    
    # 等待任务完成
    await asyncio.sleep(5)
    
    print(f"监督者-工作者模式状态: {supervisor.get_system_status()}")
    
    await supervisor.stop()
    
    # 2. 辩论模式示例
    print("\n=== 辩论模式演示 ===")
    debate_moderator = DebateModerator("moderator_1")
    
    # 创建不同视角的辩论Agent
    optimistic_agent = DebateAgent("agent_optimistic", "optimistic")
    pessimistic_agent = DebateAgent("agent_pessimistic", "pessimistic")
    technical_agent = DebateAgent("agent_technical", "technical")
    
    debate_moderator.register_agent(optimistic_agent)
    debate_moderator.register_agent(pessimistic_agent)
    debate_moderator.register_agent(technical_agent)
    
    # 主持辩论
    debate_issue = "是否应该采用新的AI技术栈"
    debate_result = await debate_moderator.conduct_debate(
        debate_issue,
        context={'project_type': '大型企业应用', 'team_size': 20},
        max_rounds=2
    )
    
    print(f"辩论结果: {debate_result['conclusion']}")
    print(f"最强立场: {debate_result['strongest_position']}")
    
    # 3. 审查模式示例
    print("\n=== 审查模式演示 ===")
    review_orchestrator = ReviewOrchestrator("review_orch_1")
    
    # 创建生成者和审查者
    code_generator = GeneratorAgent("code_gen_1", "代码生成")
    security_reviewer = ReviewerAgent("sec_rev_1", ["安全性", "漏洞检测"])
    quality_reviewer = ReviewerAgent("quality_rev_1", ["代码质量", "可维护性"])
    performance_reviewer = ReviewerAgent("perf_rev_1", ["性能", "效率"])
    
    review_orchestrator.register_generator(code_generator)
    review_orchestrator.register_reviewer(security_reviewer)
    review_orchestrator.register_reviewer(quality_reviewer)
    review_orchestrator.register_reviewer(performance_reviewer)
    
    # 生成和审查
    code_request = {
        'generator_id': 'code_gen_1',
        'content_request': {
            'type': 'api_endpoint',
            'requirements': ['安全性', '高性能']
        }
    }
    
    review_result = await review_orchestrator.generate_and_review(code_request)
    
    print(f"审查成功: {review_result['success']}")
    print(f"最终分数: {review_result.get('final_score', 0):.2f}")
    print(f"迭代次数: {review_result.get('iterations', 0)}")
    
    print(f"审查统计: {review_orchestrator.get_review_statistics()}")
    
    # 4. 流水线模式示例
    print("\n=== 流水线模式演示 ===")
    pipeline_orchestrator = PipelineOrchestrator("pipeline_orch_1")
    
    # 定义流水线处理函数
    async def data_extraction_stage(item):
        """数据提取阶段"""
        await asyncio.sleep(0.5)
        return {**item, 'extracted': True, 'stage': 'extraction'}
    
    async def data_transformation_stage(item):
        """数据转换阶段"""
        await asyncio.sleep(0.8)
        return {**item, 'transformed': True, 'stage': 'transformation'}
    
    async def data_validation_stage(item):
        """数据验证阶段"""
        await asyncio.sleep(0.3)
        return {**item, 'validated': True, 'stage': 'validation'}
    
    # 创建流水线阶段
    extraction_agent = PipelineAgent("stage_1", "extraction", data_extraction_stage)
    transformation_agent = PipelineAgent("stage_2", "transformation", data_transformation_stage)
    validation_agent = PipelineAgent("stage_3", "validation", data_validation_stage)
    
    pipeline_orchestrator.add_stage(extraction_agent)
    pipeline_orchestrator.add_stage(transformation_agent)
    pipeline_orchestrator.add_stage(validation_agent)
    
    await pipeline_orchestrator.start()
    
    # 处理项目
    test_items = [
        {'id': 1, 'data': 'test_data_1'},
        {'id': 2, 'data': 'test_data_2'},
        {'id': 3, 'data': 'test_data_3'}
    ]
    
    batch_result = await pipeline_orchestrator.process_batch(test_items)
    
    print(f"流水线处理结果: {batch_result['success_count']}/{batch_result['total_items']} 成功")
    print(f"流水线状态: {pipeline_orchestrator.get_pipeline_status()}")
    
    await pipeline_orchestrator.stop()

# 运行综合演示
if __name__ == "__main__":
    asyncio.run(comprehensive_collaboration_demo())

最佳实践与常见陷阱

协作模式选择指南

  1. 监督者-工作者模式最佳实践

    • 合理设计任务粒度,避免任务过于细碎
    • 实现动态负载均衡,根据Agent能力分配任务
    • 建立完善的监控和告警机制
    • 考虑工作者故障时的任务重分配
  2. 辩论模式最佳实践

    • 确保Agent视角的多样性
    • 设计合理的共识达成机制
    • 控制辩论轮数,避免无限循环
    • 记录完整的辩论过程,便于追溯
  3. 审查模式最佳实践

    • 明确审查标准和评分机制
    • 设置合理的迭代次数限制
    • 平衡审查严格度和效率
    • 提供具体的改进建议,而非简单的通过/不通过
  4. 流水线模式最佳实践

    • 合理划分流水线阶段
    • 考虑阶段间的并行化机会
    • 实现背压机制,避免队列积压
    • 设计优雅的错误处理和恢复

常见陷阱与避免方法

  1. 协作模式选择错误

    • 问题:为不适合的场景选择错误的协作模式
    • 避免方法:仔细分析任务特点,参考决策树,必要时进行原型验证
  2. Agent能力不匹配

    • 问题:Agent的能力与分配的任务不匹配
    • 避免方法:完善Agent能力描述,实现智能任务匹配算法
  3. 通信开销过大

    • 问题:Agent间频繁通信导致性能下降
    • 避免方法:优化通信协议,减少不必要的消息传递,使用批量处理
  4. 状态同步问题

    • 问题:多Agent之间的状态同步困难
    • 避免方法:明确状态所有权,使用分布式状态管理,设计合理的同步策略
  5. 错误处理不当

    • 问题:单个Agent的错误影响整个系统
    • 避免方法:实现错误隔离,设计重试和恢复机制,建立完善的监控

性能优化建议

  1. 并行处理优化

    • 最大化并行度,减少串行等待
    • 合理设置Agent数量和并发度
    • 考虑任务依赖关系和执行顺序
  2. 资源利用优化

    • 根据任务类型选择合适的Agent
    • 实现资源池管理和复用
    • 动态调整Agent数量和处理能力
  3. 通信优化

    • 使用高效的序列化格式
    • 实现消息批处理和压缩
    • 考虑使用消息队列进行解耦

性能优化考虑

协作模式性能对比

class CollaborationPatternBenchmark:
    """协作模式性能基准测试"""
    
    def __init__(self):
        self.results = {}
        
    async def benchmark_supervisor_worker(self, num_tasks: int, num_workers: int) -> Dict[str, float]:
        """基准测试监督者-工作者模式"""
        print(f"Testing supervisor-worker pattern: {num_tasks} tasks, {num_workers} workers")
        
        supervisor = SupervisorAgent("benchmark_supervisor")
        
        for i in range(num_workers):
            worker = WorkerAgent(f"worker_{i}", ["general_task"])
            supervisor.register_worker(worker)
            
        await supervisor.start()
        
        start_time = datetime.now()
        
        # 提交任务
        task_ids = []
        for i in range(num_tasks):
            task_id = await supervisor.submit_task("general_task", {'data': f'task_{i}'})
            task_ids.append(task_id)
            
        # 等待所有任务完成
        await asyncio.sleep(max(num_tasks / num_workers * 2, 5))
        
        end_time = datetime.now()
        total_time = (end_time - start_time).total_seconds()
        
        await supervisor.stop()
        
        return {
            'total_time': total_time,
            'throughput': num_tasks / total_time,
            'latency': total_time / num_tasks
        }
        
    async def benchmark_pipeline(self, num_items: int, num_stages: int) -> Dict[str, float]:
        """基准测试流水线模式"""
        print(f"Testing pipeline pattern: {num_items} items, {num_stages} stages")
        
        pipeline_orchestrator = PipelineOrchestrator("benchmark_pipeline")
        
        # 创建流水线阶段
        for i in range(num_stages):
            async def stage_processor(item, stage_num=i):
                await asyncio.sleep(0.5)  # 每个阶段0.5秒
                return {**item, f'stage_{stage_num}_processed': True}
                
            stage_agent = PipelineAgent(f"stage_{i}", f"stage_{i}", stage_processor)
            pipeline_orchestrator.add_stage(stage_agent)
            
        await pipeline_orchestrator.start()
        
        start_time = datetime.now()
        
        # 处理项目
        items = [{'id': i, 'data': f'item_{i}'} for i in range(num_items)]
        result = await pipeline_orchestrator.process_batch(items)
        
        end_time = datetime.now()
        total_time = (end_time - start_time).total_seconds()
        
        await pipeline_orchestrator.stop()
        
        return {
            'total_time': total_time,
            'throughput': num_items / total_time,
            'latency': total_time / num_items,
            'success_rate': result['success_count'] / num_items
        }
        
    async def run_comprehensive_benchmark(self) -> Dict[str, Dict[str, float]]:
        """运行综合基准测试"""
        print("=== 协作模式性能基准测试 ===")
        
        # 测试监督者-工作者模式
        supervisor_results = await self.benchmark_supervisor_worker(
            num_tasks=20, num_workers=5
        )
        
        # 测试流水线模式
        pipeline_results = await self.benchmark_pipeline(
            num_items=20, num_stages=3
        )
        
        self.results = {
            'supervisor_worker': supervisor_results,
            'pipeline': pipeline_results
        }
        
        return self.results
        
    def generate_report(self) -> str:
        """生成性能报告"""
        report = []
        report.append("协作模式性能对比报告")
        report.append("=" * 50)
        
        for pattern, results in self.results.items():
            report.append(f"\n{pattern}:")
            report.append(f"  总时间: {results['total_time']:.2f}s")
            report.append(f"  吞吐量: {results['throughput']:.2f} items/s")
            report.append(f"  平均延迟: {results['latency']:.4f}s")
            if 'success_rate' in results:
                report.append(f"  成功率: {results['success_rate']:.2%}")
                
        # 性能分析
        report.append("\n性能分析:")
        supervisor_throughput = self.results['supervisor_worker']['throughput']
        pipeline_throughput = self.results['pipeline']['throughput']
        
        if supervisor_throughput > pipeline_throughput:
            report.append(f"监督者-工作者模式比流水线模式快 {supervisor_throughput/pipeline_throughput:.2f}x")
        else:
            report.append(f"流水线模式比监督者-工作者模式快 {pipeline_throughput/supervisor_throughput:.2f}x")
            
        return "\n".join(report)

# 运行基准测试
async def run_benchmark_demo():
    """运行基准测试演示"""
    benchmark = CollaborationPatternBenchmark()
    results = await benchmark.run_comprehensive_benchmark()
    report = benchmark.generate_report()
    print(report)

if __name__ == "__main__":
    asyncio.run(run_benchmark_demo())

参考资源

官方文档

学术论文

  • "A Survey of Multi-Agent Reinforcement Learning" - IEEE Transactions
  • "Coordination and Cooperation in Multi-Agent Systems" - ACM Computing Surveys
  • "Debate-based Consensus in Multi-Agent Systems" - AAAI Conference

开源项目

相关工具

进一步阅读

  • "Multi-Agent Systems: An Introduction to Distributed Artificial Intelligence" - Jacques Ferber
  • "Collaborative Intelligence: Using Multi-Agent Systems to Solve Complex Problems" - 实战指南
  • "Design Patterns for Multi-Agent Systems" - 架构模式大全

通过本文的深入分析,我们可以看到多Agent协作模式的强大之处。选择合适的协作模式、设计合理的交互机制、实现高效的处理流程,是构建成功多Agent系统的关键。在实际应用中,往往需要结合多种协作模式,根据具体需求和约束条件进行灵活设计和优化。