AI Agents Series — Ran Wei

Module 11: Multi-Agent Orchestration

Coordinate multiple specialist agents to tackle complex tasks that no single agent can complete on its own.

1

Why Multi-Agent?

A single AI agent can handle simple, well-scoped tasks. But as complexity grows (multi-step research, code generation with tests, customer support spanning several domains), a single agent hits a wall: it struggles to stay focused, its system prompt bloats, and error rates climb.

Multi-agent systems solve this by splitting the work across several specialist agents. Each agent gets a focused system prompt, a targeted toolset, and a clearly bounded responsibility. This mirrors how human organisations work: you would not ask an accountant to write marketing copy.

The advantages of a multi-agent architecture include:

  1. Focused prompts: each agent's system prompt covers one job, so it stays short and precise
  2. Targeted tools: each agent sees only the tools relevant to its role
  3. Clear responsibilities: failures are easier to localise and debug
  4. Parallelism: independent sub-tasks can run concurrently

Analogy

Picture a newsroom. The editor-in-chief (the orchestrator) assigns stories to reporters (research agents), reporters hand drafts to editors (writing agents), who pass them to fact-checkers (verification agents). No one person does everything.

2

Agent Communication

Before agents can collaborate, they need a way to exchange information. The simplest approach is message passing: one agent's output becomes another agent's input. More sophisticated systems use shared memory, event buses, or structured protocols.

Direct Message Passing

The most straightforward pattern: agent A produces output, which is injected as context into agent B's prompt. It is synchronous and easy to debug.

import anthropic

client = anthropic.Anthropic()

def run_agent(system_prompt: str, task: str, context: str = "") -> str:
    """Run a single agent with optional context from previous agents."""
    messages = [{"role": "user", "content": f"{task}\n\nContext:\n{context}" if context else task}]
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system_prompt,
        messages=messages
    )
    return response.content[0].text

# Agent A produces research
research = run_agent(
    system_prompt="You are a market research analyst. Provide data-driven insights.",
    task="Analyse the current state of the AI agent market in 2025."
)

# Agent B consumes research to write a report
report = run_agent(
    system_prompt="You are a technical writer. Write clear, structured reports.",
    task="Write an executive summary based on this research.",
    context=research
)

Shared State via a Blackboard

For more complex workflows, agents can read from and write to a shared data structure. This blackboard pattern lets agents operate independently while staying coordinated.

import time

class Blackboard:
    """Shared state that all agents can read and write."""
    def __init__(self):
        self.state = {}
        self.history = []

    def write(self, agent_name: str, key: str, value: str):
        self.state[key] = value
        self.history.append({"agent": agent_name, "key": key, "timestamp": time.time()})

    def read(self, key: str) -> str:
        return self.state.get(key, "")

    def get_summary(self) -> str:
        return "\n".join(f"{k}: {v[:200]}..." for k, v in self.state.items())

# Usage (research_output and analyst_output are outputs from earlier agent runs)
board = Blackboard()
board.write("researcher", "market_data", research_output)
board.write("analyst", "trends", analyst_output)
# Writer agent reads everything it needs
summary = board.get_summary()

Tip

Start with direct message passing. Introduce shared state only once you have three or more agents that need to read each other's output in non-linear ways.

| Communication pattern | Best for | Drawback |
| --- | --- | --- |
| Direct message passing | Linear pipelines (A → B → C) | Fixed ordering |
| Shared blackboard | Collaborative analysis, parallel work | Potential conflicts |
| Event bus / pub-sub | Reactive systems, real-time workflows | Harder to debug |
| Structured protocols (A2A) | Cross-organisation interoperability | Configuration overhead |
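The event bus / pub-sub pattern mentioned above can be sketched as a minimal in-process implementation: agents subscribe to topics and react when another agent publishes, rather than being called directly. The `EventBus` class and the `research.done` topic are illustrative, not tied to any particular framework.

```python
from collections import defaultdict
from typing import Callable

class EventBus:
    """Minimal in-process pub/sub: handlers react to published events."""
    def __init__(self):
        self.subscribers: dict[str, list[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[str], None]) -> None:
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, payload: str) -> None:
        for handler in self.subscribers[topic]:
            handler(payload)

# Example wiring: a writer handler fires whenever research is published.
# In a real system the handler would call something like writer_agent.run(payload).
bus = EventBus()
drafts: list[str] = []
bus.subscribe("research.done", lambda text: drafts.append(f"DRAFT based on: {text}"))
bus.publish("research.done", "market grew 40% year on year")
```

The debugging drawback follows directly from this shape: control flow lives implicitly in the subscriptions rather than in any one function.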
3

The Orchestrator Pattern

The orchestrator pattern is the most common multi-agent architecture. A central orchestrator agent receives a task, decomposes it into sub-tasks, delegates them to specialist agents, and assembles the results. The orchestrator acts as a project manager.

The power of this pattern is that the orchestrator can decide dynamically which agents to call, in what order, and how to combine their outputs. It can also handle errors by retrying or re-routing tasks.

import anthropic
from dataclasses import dataclass

@dataclass
class Agent:
    name: str
    system_prompt: str
    description: str  # Used by orchestrator to decide delegation

    def run(self, task: str) -> str:
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=self.system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text

class Orchestrator:
    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()

    def plan(self, task: str) -> list[dict]:
        """Ask the LLM to decompose a task into sub-tasks."""
        agent_descriptions = "\n".join(
            f"- {a.name}: {a.description}" for a in self.agents.values()
        )
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system="You are a task planner. Decompose tasks into steps. "
                   "Return JSON: [{\"agent\": \"name\", \"task\": \"description\"}]",
            messages=[{"role": "user",
                       "content": f"Task: {task}\n\nAvailable agents:\n{agent_descriptions}"}]
        )
        import json
        return json.loads(response.content[0].text)

    def execute(self, task: str) -> dict[str, str]:
        plan = self.plan(task)
        results = {}
        for step in plan:
            agent = self.agents[step["agent"]]
            context = "\n".join(f"[{k}]: {v}" for k, v in results.items())
            result = agent.run(f"{step['task']}\n\nPrevious results:\n{context}")
            results[step["agent"]] = result
        return results

# Create specialist agents
researcher = Agent("Researcher",
    "You are a research specialist. Find facts, data, and evidence.",
    "Gathers information and data on any topic")
writer = Agent("Writer",
    "You are a technical writer. Write clear, well-structured content.",
    "Writes reports, summaries, and documentation")
critic = Agent("Critic",
    "You are a critical reviewer. Find flaws, gaps, and improvements.",
    "Reviews and critiques content for quality")

# Run orchestrated workflow
orchestrator = Orchestrator([researcher, writer, critic])
results = orchestrator.execute("Write a market analysis report on AI agents in 2025")

Note

The orchestrator is itself an LLM call that uses the agent descriptions to decide delegation. Your agent descriptions therefore matter: write them like job postings so the orchestrator can route tasks correctly.

4

Delegation Patterns

There are several established patterns for delegating work between agents. Each suits a different problem structure.

Sequential Pipeline

A → B → C. Each agent processes the work and passes it on. Best for linear workflows such as research → draft → review → publish.

Hub-and-Spoke

A central orchestrator delegates to specialists and collects the results. Best when sub-tasks are independent and can run in parallel.

Debate / Adversarial

Two agents argue opposing positions and a judge agent picks the stronger case. Well suited to decision-making and reducing bias.

Iterative Refinement

Draft → review → revise → review → revise, looping until a quality threshold is met. Well suited to content generation.

Hierarchical

A manager agent delegates to team leads, who delegate in turn to workers. Like an org chart; it scales to very complex tasks.

Voting / Ensemble

Several agents solve the same task independently and the results are aggregated (majority vote, best-of-N). Improves reliability.

Sequential Pipeline Example

def pipeline(task: str, agents: list[Agent]) -> str:
    """Run agents in sequence, each building on the previous output."""
    result = task
    for agent in agents:
        result = agent.run(result)
        print(f"[{agent.name}] completed")
    return result

# Research -> Write -> Edit pipeline
final = pipeline(
    "Analyse the impact of AI on healthcare",
    [researcher, writer, critic]
)
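The iterative-refinement pattern from the list above (draft → review → revise until a quality bar is met) can be sketched generically. Here `draft_fn` and `review_fn` stand in for agent calls such as `writer.run` and `critic.run`; the `APPROVED` stopping marker is an assumption about how the reviewer is prompted.

```python
from typing import Callable

def refine(task: str,
           draft_fn: Callable[[str], str],
           review_fn: Callable[[str], str],
           max_rounds: int = 3,
           done_marker: str = "APPROVED") -> str:
    """Loop draft -> review -> revise until the reviewer approves or rounds run out."""
    draft = draft_fn(task)
    for _ in range(max_rounds):
        feedback = review_fn(draft)
        if done_marker in feedback:  # reviewer signals the quality bar is met
            return draft
        draft = draft_fn(f"{task}\n\nRevise this draft:\n{draft}\n\nFeedback:\n{feedback}")
    return draft
```

With this module's agents, one might call `refine(task, writer.run, lambda d: critic.run(f"Reply APPROVED if this meets the bar, otherwise give feedback:\n{d}"))`.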

Debate Pattern Example

def debate(topic: str, rounds: int = 2) -> str:
    """Two agents debate, a judge picks the winner."""
    optimist = Agent("Optimist",
        "You argue the positive case. Be persuasive with evidence.",
        "Argues for")
    pessimist = Agent("Pessimist",
        "You argue the negative case. Be critical with evidence.",
        "Argues against")
    judge = Agent("Judge",
        "You evaluate both arguments fairly and pick the stronger one. "
        "Explain your reasoning.",
        "Judges debates")

    history = []
    for round_num in range(rounds):
        arg_for = optimist.run(
            f"Topic: {topic}\nRound {round_num+1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"FOR: {arg_for}")

        arg_against = pessimist.run(
            f"Topic: {topic}\nRound {round_num+1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"AGAINST: {arg_against}")

    verdict = judge.run(f"Topic: {topic}\n\nFull debate:\n" + "\n\n".join(history))
    return verdict
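The voting / ensemble pattern described earlier can be sketched as a simple majority-vote aggregator. `ask_fn` stands in for an independent agent call (e.g. `agent.run`); in practice each vote would be a separate LLM call, and disagreement between votes is exactly what the aggregation smooths over.

```python
from collections import Counter
from typing import Callable

def majority_vote(task: str, ask_fn: Callable[[str], str], n: int = 3) -> str:
    """Ask the same question n times independently and return the most common answer."""
    votes = [ask_fn(task).strip() for _ in range(n)]
    answer, count = Counter(votes).most_common(1)[0]
    print(f"{count}/{n} votes for: {answer}")
    return answer
```

Best-of-N is the same shape with a judge agent replacing the `Counter`.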

Pitfall

More agents do not automatically mean better results. Every agent adds latency, cost, and potential communication errors. Start with the fewest agents the task needs and add more only when a single agent genuinely cannot handle the complexity.

5

The Supervisor Architecture

The supervisor architecture extends the orchestrator pattern with monitoring, error recovery, and quality control. A supervisor does more than delegate: it watches for failures, validates outputs, and can reassign work.

This is essential for production systems, where you cannot afford silent failures. The supervisor checks each agent's output against expected criteria before passing it on.

class Supervisor:
    """Manages agents with quality checks and error recovery."""

    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()
        self.max_retries = 2

    def validate_output(self, output: str, criteria: str) -> bool:
        """Use an LLM call to check if output meets criteria."""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=100,
            system="You are a quality checker. Reply YES if the output meets "
                   "the criteria, NO if it does not. One word only.",
            messages=[{"role": "user",
                       "content": f"Criteria: {criteria}\n\nOutput: {output[:2000]}"}]
        )
        return "YES" in response.content[0].text.upper()

    def delegate(self, agent_name: str, task: str, criteria: str) -> str:
        """Delegate with validation and retry logic."""
        agent = self.agents[agent_name]
        for attempt in range(self.max_retries + 1):
            result = agent.run(task)
            if self.validate_output(result, criteria):
                return result
            print(f"[Supervisor] {agent_name} output failed validation "
                  f"(attempt {attempt + 1}), retrying...")
            task = f"{task}\n\nPrevious attempt was rejected. Improve quality."
        return result  # Return last attempt even if not perfect

# Usage
supervisor = Supervisor([researcher, writer, critic])
data = supervisor.delegate(
    "Researcher",
    "Find 5 statistics about AI agent adoption in enterprise",
    "Must contain at least 5 specific numerical statistics with sources"
)
report = supervisor.delegate(
    "Writer",
    f"Write an executive summary using this data:\n{data}",
    "Must be 200-400 words, professional tone, structured with bullet points"
)

Tip

The validation step is itself an LLM call, so budget for its cost. For cheaper validation, run regex checks or JSON-schema validation on structured output first, and escalate to LLM-based validation only when needed.
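The cheap pre-checks suggested in the tip can be plain Python. A sketch, loosely matching the criteria used in the Supervisor example (a minimum count of numeric statistics, or structured JSON output with required keys); the regex and key names are illustrative:

```python
import json
import re

def has_min_statistics(text: str, minimum: int = 5) -> bool:
    """Cheap regex check: does the text contain at least `minimum` number-like tokens?"""
    return len(re.findall(r"\d[\d,.]*\s*%?", text)) >= minimum

def is_valid_report_json(text: str, required_keys: tuple = ("title", "summary")) -> bool:
    """Cheap structural check: is the output parseable JSON with the expected keys?"""
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and all(k in data for k in required_keys)
```

One option is to run checks like these before `Supervisor.validate_output`, escalating to the LLM check only for qualitative criteria.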

6

Collaborative Workflows

Real-world applications usually combine several patterns. Below is a practical example: a content-creation pipeline in which research, writing, fact-checking, and editorial review agents work together.

import asyncio, anthropic

async def run_agent_async(name: str, system: str, task: str) -> dict:
    """Run an agent asynchronously for parallel execution."""
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": task}]
    )
    return {"agent": name, "output": response.content[0].text}

async def content_pipeline(topic: str) -> str:
    # Step 1: Parallel research from multiple angles
    research_tasks = [
        run_agent_async("Data Researcher",
            "Find statistics and market data. Cite sources.",
            f"Research data about: {topic}"),
        run_agent_async("Trend Analyst",
            "Identify emerging trends and predictions.",
            f"Analyse trends in: {topic}"),
        run_agent_async("Expert Finder",
            "Find expert quotes and opinions from industry leaders.",
            f"Find expert perspectives on: {topic}")
    ]
    research_results = await asyncio.gather(*research_tasks)
    combined_research = "\n\n".join(
        f"[{r['agent']}]\n{r['output']}" for r in research_results
    )

    # Step 2: Writer creates draft using all research
    draft = await run_agent_async("Writer",
        "You are a skilled technical writer. Create engaging, well-structured content.",
        f"Write a comprehensive article about {topic}.\n\nResearch:\n{combined_research}")

    # Step 3: Parallel review
    review_tasks = [
        run_agent_async("Fact Checker",
            "Verify claims and flag any unsubstantiated statements.",
            f"Review this article for factual accuracy:\n{draft['output']}"),
        run_agent_async("Editor",
            "Improve clarity, flow, and style. Suggest specific edits.",
            f"Edit this article for quality:\n{draft['output']}")
    ]
    reviews = await asyncio.gather(*review_tasks)
    feedback = "\n\n".join(f"[{r['agent']}]\n{r['output']}" for r in reviews)

    # Step 4: Final revision incorporating feedback
    final = await run_agent_async("Writer",
        "You are a skilled technical writer. Revise based on editorial feedback.",
        f"Revise this article:\n{draft['output']}\n\nFeedback:\n{feedback}")

    return final["output"]

# Run the pipeline
# result = asyncio.run(content_pipeline("AI agents in enterprise software"))

Note

Parallel agent calls via asyncio can cut total execution time substantially. In the example above, the three research agents run concurrently instead of sequentially, reducing the wait from roughly three times a single call's latency to roughly one.

7

Summary

When designing multi-agent systems, follow these principles:

  1. Start simple: two agents before a ten-agent team
  2. Define clear interfaces: specify what each agent expects as input and produces as output
  3. Log everything: track each agent's inputs, outputs, token usage, and latency
  4. Fail gracefully: if one agent fails, the system should degrade rather than crash
  5. Watch costs: every agent invocation is an API call, and multi-agent systems multiply spend quickly

| Pattern | Agents | Latency | Best for |
| --- | --- | --- | --- |
| Sequential pipeline | 2-5 | Sum of all agents | Document processing, ETL |
| Hub-and-spoke | 3-10 | Orchestrator + slowest agent | Research, analysis |
| Debate | 3 | 2 × rounds × agent latency | Decision support, risk assessment |
| Iterative refinement | 2-3 | Rounds × 2 agents | Content creation, code generation |
| Hierarchical | 5-20+ | Depth × slowest branch | Complex enterprise workflows |
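Principle 3 (log everything) can be sketched as a thin wrapper around an agent's run function that records input size, output size, and latency per call. The `call_log` structure and its field names are illustrative; real token counts would come from the API response's `usage` field rather than character lengths.

```python
import time
from typing import Callable

call_log: list[dict] = []

def logged(agent_name: str, run_fn: Callable[[str], str]) -> Callable[[str], str]:
    """Wrap an agent's run function so every call is recorded for inspection."""
    def wrapper(task: str) -> str:
        start = time.time()
        output = run_fn(task)
        call_log.append({
            "agent": agent_name,
            "input_chars": len(task),
            "output_chars": len(output),
            "latency_s": round(time.time() - start, 3),
        })
        return output
    return wrapper

# Usage: researcher_logged = logged("Researcher", researcher.run)
```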

Tip

Use different models for different agents. A cheap, fast model (such as Claude Haiku) is ideal for simple classification or routing agents, while a stronger model (such as Claude Sonnet) handles complex reasoning. This can cut costs 5-10x without sacrificing quality.
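The mixed-model tip can be sketched as a role-to-model lookup consulted when constructing each agent. The Sonnet id matches the one used throughout this module; the Haiku alias and the role-to-tier mapping are assumptions to verify against the current model list.

```python
# Cheap, fast model for routing/classification; stronger model for reasoning.
MODEL_FOR_ROLE = {
    "router": "claude-3-5-haiku-latest",       # assumed Haiku alias
    "classifier": "claude-3-5-haiku-latest",
    "researcher": "claude-sonnet-4-20250514",
    "writer": "claude-sonnet-4-20250514",
}

DEFAULT_MODEL = "claude-sonnet-4-20250514"

def model_for(role: str) -> str:
    """Pick a model id for an agent role, defaulting to the stronger tier."""
    return MODEL_FOR_ROLE.get(role, DEFAULT_MODEL)
```

An `Agent` could then carry a `model` field set to `model_for(role)` and pass it to `client.messages.create`.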

Next Module

Module 12: A2A Protocol