“一个人走得快,一群人走得远。Agent 也是如此。”


TL;DR

Multi-Agent 系统通过任务并行、专业分工、容错冗余实现规模化智能。三种核心协作模式:Supervisor-Workers(主管-工作者)、Peer-to-Peer(对等协作)、Hierarchical(层级组织)。核心挑战在于通信协议设计、任务分配策略、冲突解决机制和一致性保证。实战模式包括 Map-Reduce、Debate、Voting、Pipeline 等。


📋 本文结构


为什么需要 Multi-Agent?

单 Agent 的局限

场景:复杂销售流程

一个 enterprise sale 涉及:

  • 线索挖掘(Research)
  • 初步接触(Outreach)
  • 需求分析(Discovery)
  • 方案设计(Solutioning)
  • 谈判(Negotiation)
  • 合同(Contracting)
  • 交付协调(Delivery)

单 Agent 尝试处理所有环节:

  • Context window 爆炸
  • 专业能力稀释
  • 错误率累积
  • 响应时间不可接受

Multi-Agent 的优势

优势 说明 效果
并行处理 多个 Agent 同时工作 效率提升 5-10x
专业分工 每个 Agent 专注一个领域 准确率提升
容错冗余 单点失败不影响整体 系统可靠性提升
可扩展性 按需增减 Agent 弹性应对负载
模块化 独立开发、测试、部署 开发效率提升

三种协作架构模式

模式 1:Supervisor-Workers(主管-工作者)

结构:

flowchart TB
    Supervisor["Supervisor Agent<br/>协调者:任务分解、分配、汇总"]
    
    Worker1["Worker 1"]
    Worker2["Worker 2"]
    Worker3["Worker 3"]
    
    Supervisor --> Worker1
    Supervisor --> Worker2
    Supervisor --> Worker3
    
    style Supervisor fill:#fef3c7,stroke:#d97706,stroke-width:2px
    style Worker1 fill:#dbeafe,stroke:#2563eb
    style Worker2 fill:#dbeafe,stroke:#2563eb
    style Worker3 fill:#dbeafe,stroke:#2563eb

适用场景:

  • 任务可明确分解为独立子任务
  • 需要统一协调和汇总
  • 子任务之间依赖较少

示例:市场调研报告

class MarketResearchOrchestrator:
    """
    主管 Agent:协调多个研究员 Agent 生成市场报告
    """
    
    def __init__(self):
        self.supervisor = SupervisorAgent()
        self.researchers = {
            "industry": ResearcherAgent(expertise="industry_analysis"),
            "competitor": ResearcherAgent(expertise="competitive_analysis"),
            "customer": ResearcherAgent(expertise="customer_research"),
            "trend": ResearcherAgent(expertise="trend_forecasting")
        }
    
    async def generate_report(self, topic: str) -> Report:
        # 1. 主管分解任务
        subtasks = self.supervisor.decompose_task(topic)
        
        # 2. 并行分配给研究员
        results = await asyncio.gather(*[
            self.researchers[task.type].research(task)
            for task in subtasks
        ])
        
        # 3. 主管汇总整合
        report = self.supervisor.synthesize(results)
        
        return report

模式 2:Peer-to-Peer(对等协作)

结构:

flowchart TB
    AgentA["Agent A"]
    AgentB["Agent B"]
    AgentC["Agent C"]
    AgentD["Agent D"]
    
    AgentA <--> AgentB
    AgentA <--> AgentC
    AgentB <--> AgentD
    AgentC <--> AgentD
    
    style AgentA fill:#dbeafe,stroke:#2563eb
    style AgentB fill:#dbeafe,stroke:#2563eb
    style AgentC fill:#dbeafe,stroke:#2563eb
    style AgentD fill:#dbeafe,stroke:#2563eb

适用场景:

  • 任务需要多轮协商
  • Agent 之间有平等话语权
  • 需要头脑风暴或辩论

示例:代码评审委员会

class CodeReviewCommittee:
    """
    多个专家 Agent 共同评审代码
    """
    
    def __init__(self):
        self.reviewers = [
            SecurityReviewer(),
            PerformanceReviewer(),
            MaintainabilityReviewer(),
            BusinessLogicReviewer()
        ]
    
    async def review(self, code: Code) -> ReviewDecision:
        # 每个评审员独立评审
        reviews = await asyncio.gather(*[
            reviewer.review(code) for reviewer in self.reviewers
        ])
        
        # 讨论和协商
        consensus = await self.discuss_and_resolve(reviews)
        
        return consensus
    
    async def discuss_and_resolve(self, reviews: List[Review]) -> ReviewDecision:
        """通过多轮讨论达成共识"""
        discussion = Discussion()
        
        for round in range(max_rounds):
            # 每个评审员回应其他人的观点
            for reviewer in self.reviewers:
                response = await reviewer.respond_to(reviews, discussion)
                discussion.add(response)
            
            # 检查是否达成共识
            if self.check_consensus(discussion):
                break
        
        return self.compile_decision(discussion)

模式 3:Hierarchical(层级组织)

结构:

flowchart TB
    CEO["CEO Agent 战略决策"]
    
    VPSales["VP Sales 战术规划"]
    VPMkt["VP Marketing 战术规划"]
    VPProd["VP Product 战术规划"]
    
    Mgr1["Manager 执行管理"]
    Mgr2["Manager 执行管理"]
    Mgr3["Manager 执行管理"]
    
    Worker1["Worker 具体执行"]
    Worker2["Worker 具体执行"]
    Worker3["Worker 具体执行"]
    
    CEO --> VPSales
    CEO --> VPMkt
    CEO --> VPProd
    
    VPSales --> Mgr1
    VPMkt --> Mgr2
    VPProd --> Mgr3
    
    Mgr1 --> Worker1
    Mgr2 --> Worker2
    Mgr3 --> Worker3
    
    style CEO fill:#fef3c7,stroke:#d97706,stroke-width:3px
    style VPSales fill:#fed7aa,stroke:#ea580c,stroke-width:2px
    style VPMkt fill:#fed7aa,stroke:#ea580c,stroke-width:2px
    style VPProd fill:#fed7aa,stroke:#ea580c,stroke-width:2px
    style Mgr1 fill:#dbeafe,stroke:#2563eb
    style Mgr2 fill:#dbeafe,stroke:#2563eb
    style Mgr3 fill:#dbeafe,stroke:#2563eb
    style Worker1 fill:#d1fae5,stroke:#059669
    style Worker2 fill:#d1fae5,stroke:#059669
    style Worker3 fill:#d1fae5,stroke:#059669

适用场景:

  • 复杂组织模拟
  • 多层次决策
  • 需要战略到执行的完整链条

示例:虚拟销售组织

class VirtualSalesOrganization:
    """
    模拟完整销售组织的 Multi-Agent 系统
    """
    
    def __init__(self):
        # 顶层:战略层
        self.ceo = CEOAgent()
        
        # 中层:部门层
        self.vps = {
            "sales": VPSales(),
            "marketing": VPMarketing(),
            "customer_success": VPCustomerSuccess()
        }
        
        # 基层:执行层
        self.sales_team = [SalesRepAgent() for _ in range(10)]
        self.marketing_team = [MarketingSpecialistAgent() for _ in range(5)]
    
    async def quarterly_planning(self):
        # CEO 制定战略目标
        strategy = await self.ceo.set_quarterly_strategy()
        
        # VP 们制定战术计划
        plans = await asyncio.gather(*[
            vp.create_plan(strategy) for vp in self.vps.values()
        ])
        
        # 执行团队落实
        execution_results = await self.execute_plans(plans)
        
        return execution_results

核心机制:通信、分配、协调

1. 通信协议

Message Bus 模式:

class MessageBus:
    """
    Agent 间通信的总线
    """
    
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.message_history = []
    
    def subscribe(self, topic: str, agent: Agent):
        """订阅主题"""
        self.subscribers[topic].append(agent)
    
    def publish(self, message: Message):
        """发布消息"""
        self.message_history.append(message)
        
        # 广播给订阅者
        for agent in self.subscribers.get(message.topic, []):
            if agent.id != message.sender_id:  # 不发给发送者自己
                asyncio.create_task(agent.receive(message))
    
    def send_direct(self, message: Message, recipient_id: str):
        """点对点发送"""
        recipient = self.get_agent(recipient_id)
        asyncio.create_task(recipient.receive(message))

消息格式:

@dataclass
class Message:
    id: str
    sender_id: str
    recipient_id: Optional[str]  # None 表示广播
    topic: str
    content: str
    message_type: str  # task, response, notification, request_help
    priority: int  # 1-5
    timestamp: datetime
    context: dict  # 关联的上下文信息

2. 任务分配

策略 1:基于能力的分配

class CapabilityBasedAssignment:
    """
    根据 Agent 能力匹配任务
    """
    
    def assign(self, task: Task, available_agents: List[Agent]) -> Agent:
        # 计算每个 Agent 的能力匹配度
        scores = []
        for agent in available_agents:
            score = self.calculate_match_score(task, agent)
            scores.append((agent, score))
        
        # 选择匹配度最高的
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[0][0]
    
    def calculate_match_score(self, task: Task, agent: Agent) -> float:
        score = 0.0
        
        # 技能匹配
        for required_skill in task.required_skills:
            if required_skill in agent.skills:
                score += agent.skills[required_skill]
        
        # 负载均衡(避免过度分配给同一个 Agent)
        current_load = agent.get_current_load()
        score *= (1 - current_load)
        
        # 历史表现
        past_performance = agent.get_performance_on(task.type)
        score *= past_performance
        
        return score

策略 2:拍卖机制

class AuctionBasedAssignment:
    """
    Agent 竞拍任务,出价基于能力和负载
    """
    
    async def assign_via_auction(self, task: Task, agents: List[Agent]) -> Agent:
        # 收集投标
        bids = await asyncio.gather(*[
            agent.submit_bid(task) for agent in agents
        ])
        
        # 选择最优投标
        # 考虑:价格、质量、交付时间
        winner = self.select_winner(bids)
        
        return winner

3. 冲突解决

常见冲突类型:

  1. 资源竞争:多个 Agent 需要同一资源
  2. 决策分歧:对同一问题有不同解决方案
  3. 优先级冲突:任务排序不一致
  4. 数据不一致:基于不同数据做出判断

解决机制:

class ConflictResolver:
    """
    Multi-Agent 冲突解决器
    """
    
    def resolve(self, conflict: Conflict) -> Resolution:
        if conflict.type == "RESOURCE_CONTENTION":
            return self.resolve_resource_contention(conflict)
        elif conflict.type == "DECISION_DISAGREEMENT":
            return self.resolve_decision_disagreement(conflict)
        elif conflict.type == "PRIORITY":
            return self.resolve_priority_conflict(conflict)
    
    def resolve_decision_disagreement(self, conflict: Conflict) -> Resolution:
        """
        决策分歧:通过投票或上级裁决
        """
        agents = conflict.involved_agents
        
        # 尝试达成共识
        consensus = self.attempt_consensus(agents)
        if consensus:
            return Resolution(type="CONSENSUS", decision=consensus)
        
        # 无法达成共识,升级给上级
        if conflict.supervisor:
            decision = conflict.supervisor.arbitrate(conflict)
            return Resolution(type="ARBITRATION", decision=decision)
        
        # 最终手段:投票
        votes = self.collect_votes(agents)
        winner = max(votes, key=votes.get)
        return Resolution(type="VOTING", decision=winner)

实战协作模式

模式 1:Map-Reduce(分治聚合)

class MapReducePattern:
    """
    将大任务分解为小任务并行处理,然后汇总结果
    """
    
    async def execute(self, task: LargeTask, workers: List[Agent]) -> Result:
        # Map 阶段:分解任务
        subtasks = self.decompose(task)
        
        # 并行执行
        partial_results = await asyncio.gather(*[
            workers[i % len(workers)].execute(subtask)
            for i, subtask in enumerate(subtasks)
        ])
        
        # Reduce 阶段:汇总结果
        final_result = self.aggregate(partial_results)
        
        return final_result

适用场景:

  • 大规模数据处理
  • 批量文档分析
  • 分布式研究

模式 2:Debate(辩论)

class DebatePattern:
    """
    多个 Agent 就一个问题展开辩论,最后达成共识或投票
    """
    
    async def debate(self, topic: str, debaters: List[Agent], rounds: int = 3):
        discussion = []
        
        for round in range(rounds):
            round_arguments = []
            
            for debater in debaters:
                # 每个 debater 基于之前的讨论发表观点
                argument = await debater.argue(topic, discussion)
                round_arguments.append(argument)
            
            discussion.extend(round_arguments)
            
            # 检查是否达成共识
            if self.check_consensus(discussion):
                break
        
        return self.compile_conclusion(discussion)

适用场景:

  • 方案评估
  • 风险评估
  • 创意生成

模式 3:Voting(投票)

class VotingPattern:
    """
    多个 Agent 对选项进行投票
    """
    
    async def vote(self, options: List[Option], voters: List[Agent]) -> Option:
        votes = defaultdict(int)
        
        for voter in voters:
            vote = await voter.vote(options)
            votes[vote] += voter.weight  # 权重可以基于专业能力
        
        winner = max(votes, key=votes.get)
        return winner

适用场景:

  • 快速决策
  • 分类任务
  • 质量评估

模式 4:Pipeline(流水线)

class PipelinePattern:
    """
    Agent 按顺序处理,每个 Agent 的输出是下一个的输入
    """
    
    async def execute(self, input_data: Any, stages: List[Agent]) -> Any:
        data = input_data
        
        for stage in stages:
            data = await stage.process(data)
            
            # 质量检查
            if not self.quality_check(data):
                # 回退或重试
                data = await self.handle_failure(stage, data)
        
        return data

适用场景:

  • 文档处理流水线
  • 内容审核
  • 数据处理流程

Multi-Agent 的边界与陷阱

什么时候不该用 Multi-Agent?

场景 建议 理由
简单任务 单 Agent 增加复杂度没有收益
低延迟要求 单 Agent 协调开销不可接受
强一致性要求 单 Agent 分布式一致性难度大
资源受限 单 Agent Multi-Agent 成本高

常见陷阱

陷阱 1:过度分解

❌ 错误:把简单任务拆给 10 个 Agent
✅ 正确:分解的粒度应该与收益匹配

陷阱 2:通信风暴

❌ 错误:N 个 Agent 两两通信,复杂度 O(N²)
✅ 正确:使用 Pub/Sub 或层级通信

陷阱 3:级联失败

❌ 错误:一个 Agent 失败导致整个系统崩溃
✅ 正确:设计熔断、降级、重试机制

陷阱 4:调试地狱

❌ 错误:多个 Agent 的行为难以追踪
✅ 正确:完善的日志、追踪、可视化工具

从 Multi-Agent 到 Agent Organization

数字组织的诞生

当 Multi-Agent 系统足够复杂时,它开始像一个”组织”:

  • 角色定义:每个 Agent 有明确的角色和职责
  • 流程规范:协作遵循既定流程
  • 绩效评估:评估 Agent 的表现
  • 演进优化:根据反馈调整组织结构

隐喻:Agent 即员工

组织概念 Agent 世界 示例
招聘 Agent 实例化 根据负载自动创建新 Agent
培训 Fine-tuning 针对特定任务优化 Agent
绩效考核 Metrics 准确率、延迟、用户满意度
晋升 升级模型 表现好的 Agent 使用更强的模型
解雇 销毁实例 表现差的 Agent 被回收
组织架构 Topology 扁平 vs 层级

未来:AI-Native 组织

传统组织:
人类员工 + 软件工具

AI-Native 组织:
人类决策者 + AI 管理者 + Agent 员工

这种组织的特点是:

  • 7x24 小时运作
  • 秒级响应
  • 无限可扩展
  • 持续自我优化

写在最后

Multi-Agent 不是万能的,但在合适的场景下,它能带来数量级的提升。

关键成功因素:

  1. 正确选择架构模式:没有最好的模式,只有最适合的模式
  2. 设计良好的通信机制:通信是协作的基础
  3. 处理好冲突和一致性:这是分布式系统的核心挑战
  4. 监控和可观测性:你需要知道系统内部发生了什么

最后的话:

单个 Agent 是聪明的个体,Multi-Agent 是智慧的群体。

当我们学会让 Agent 协作时,我们就在创造一种新型的数字生命形式。


📚 延伸阅读

本系列文章

外部资源


Agent OS 系列 - 第 6 篇 由 @postcodeeng 整理发布

Published on 2026-04-14 阅读时间:约 15 分钟

下一篇预告:《CRM 的 Agent 化重构》