Module 11: Multi-Agent Orchestration
Coordinate multiple specialised agents to tackle complex tasks that no single agent can complete on its own.
Why Multiple Agents?
A single AI agent can handle simple, well-scoped tasks. But as complexity grows (multi-step research, code generation with tests, customer support spanning several domains), a single agent hits a wall: it struggles to stay focused, its system prompt bloats, and its error rate climbs.
Multi-agent systems solve this by splitting the work across several specialised agents. Each agent gets a focused system prompt, a targeted toolset, and a clear scope of responsibility. This mirrors how human organisations work: you don't ask your accountant to write marketing copy.
The advantages of a multi-agent architecture include:
- Specialisation: each agent excels at a narrow task, with an optimised prompt and toolset
- Modularity: individual agents can be swapped, upgraded, or debugged without affecting the others
- Scalability: run agents in parallel for throughput; add new agents for new capabilities
- Reliability: failures stay isolated inside a single agent instead of crashing the whole pipeline
- Cost efficiency: use cheap models for simple sub-tasks and expensive models only where needed
Picture a newsroom. The editor-in-chief (orchestrator) assigns stories to reporters (research agents), reporters hand drafts to editors (writing agents), who pass them to fact-checkers (verification agents). No single person does everything.
Agent Communication
Before agents can collaborate, they need a way to exchange information. The simplest approach is message passing: one agent's output becomes another agent's input. More sophisticated systems use shared memory, event buses, or structured protocols.
Direct Message Passing
The most straightforward pattern: agent A produces output, which is injected as context into agent B's prompt. It is synchronous and easy to debug.
```python
import anthropic

client = anthropic.Anthropic()

def run_agent(system_prompt: str, task: str, context: str = "") -> str:
    """Run a single agent with optional context from previous agents."""
    messages = [{"role": "user", "content": f"{task}\n\nContext:\n{context}" if context else task}]
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system_prompt,
        messages=messages
    )
    return response.content[0].text
```
```python
# Agent A produces research
research = run_agent(
    system_prompt="You are a market research analyst. Provide data-driven insights.",
    task="Analyse the current state of the AI agent market in 2025."
)

# Agent B consumes research to write a report
report = run_agent(
    system_prompt="You are a technical writer. Write clear, structured reports.",
    task="Write an executive summary based on this research.",
    context=research
)
```
Shared State via a Blackboard
For more complex workflows, agents can read from and write to a shared data structure. This blackboard pattern lets agents operate independently while staying coordinated.
```python
import time

class Blackboard:
    """Shared state that all agents can read and write."""
    def __init__(self):
        self.state = {}
        self.history = []

    def write(self, agent_name: str, key: str, value: str):
        self.state[key] = value
        self.history.append({"agent": agent_name, "key": key, "timestamp": time.time()})

    def read(self, key: str) -> str:
        return self.state.get(key, "")

    def get_summary(self) -> str:
        return "\n".join(f"{k}: {v[:200]}..." for k, v in self.state.items())
```
```python
# Usage
board = Blackboard()
board.write("researcher", "market_data", research_output)
board.write("analyst", "trends", analyst_output)

# Writer agent reads everything it needs
summary = board.get_summary()
```
Start with direct message passing. Only introduce shared state once you have three or more agents that need to read each other's output in a non-linear way.
| Communication pattern | Complexity | Best for | Drawbacks |
|---|---|---|---|
| Direct message passing | Low | Linear pipelines (A → B → C) | Fixed ordering |
| Shared blackboard | Medium | Collaborative analysis, parallel work | Potential conflicts |
| Event bus / pub-sub | High | Reactive systems, real-time workflows | Hard to debug |
| Structured protocols (A2A) | High | Cross-organisation interoperability | Setup overhead |
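The event-bus row can be sketched as a minimal publish-subscribe hub. The `EventBus` class and topic names below are illustrative assumptions, not part of any library; real systems would add queuing and error handling:

```python
from collections import defaultdict
from typing import Callable

class EventBus:
    """Minimal pub/sub hub: agents subscribe to topics and react to events."""
    def __init__(self):
        self.subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]):
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict):
        # Every subscriber to this topic is invoked; agents never call each other directly.
        for handler in self.subscribers[topic]:
            handler(event)

# Illustrative wiring: a writer agent reacts whenever research is published
bus = EventBus()
received = []
bus.subscribe("research.done", lambda e: received.append(e["summary"]))
bus.publish("research.done", {"summary": "AI agent market grew in 2025"})
```

The decoupling is the point: the publisher does not know who is listening, which is what makes these systems reactive but hard to trace.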
The Orchestrator Pattern
The orchestrator pattern is the most common multi-agent architecture. A central orchestrator agent receives a task, decomposes it into sub-tasks, delegates those to specialist agents, and assembles the results. The orchestrator acts as a project manager.
The power of this pattern is that the orchestrator can decide dynamically which agents to call, in what order, and how to combine their outputs. It can also handle errors by retrying or re-routing tasks.
```python
import anthropic
import json
from dataclasses import dataclass

@dataclass
class Agent:
    name: str
    system_prompt: str
    description: str  # Used by orchestrator to decide delegation

    def run(self, task: str) -> str:
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=self.system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text

class Orchestrator:
    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()

    def plan(self, task: str) -> list[dict]:
        """Ask the LLM to decompose a task into sub-tasks."""
        agent_descriptions = "\n".join(
            f"- {a.name}: {a.description}" for a in self.agents.values()
        )
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system="You are a task planner. Decompose tasks into steps. "
                   "Return JSON: [{\"agent\": \"name\", \"task\": \"description\"}]",
            messages=[{"role": "user",
                       "content": f"Task: {task}\n\nAvailable agents:\n{agent_descriptions}"}]
        )
        return json.loads(response.content[0].text)

    def execute(self, task: str) -> dict[str, str]:
        plan = self.plan(task)
        results = {}
        for step in plan:
            agent = self.agents[step["agent"]]
            context = "\n".join(f"[{k}]: {v}" for k, v in results.items())
            result = agent.run(f"{step['task']}\n\nPrevious results:\n{context}")
            results[step["agent"]] = result
        return results
```
```python
# Create specialist agents
researcher = Agent("Researcher",
                   "You are a research specialist. Find facts, data, and evidence.",
                   "Gathers information and data on any topic")
writer = Agent("Writer",
               "You are a technical writer. Write clear, well-structured content.",
               "Writes reports, summaries, and documentation")
critic = Agent("Critic",
               "You are a critical reviewer. Find flaws, gaps, and improvements.",
               "Reviews and critiques content for quality")

# Run orchestrated workflow
orchestrator = Orchestrator([researcher, writer, critic])
results = orchestrator.execute("Write a market analysis report on AI agents in 2025")
```
The orchestrator is itself an LLM call. It uses the agent descriptions to decide where to delegate, which means those descriptions matter: write them like job postings so the orchestrator can route tasks correctly.
Delegation Patterns
There are several established patterns for delegating work between agents. Each suits a different problem structure.
Sequential Pipeline
A → B → C. Each agent processes the work and passes it forward. Best for linear workflows such as: research → draft → review → publish.
Hub-and-Spoke
A central orchestrator delegates to specialists and collects their results. Best when sub-tasks are independent and can run in parallel.
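The hub-and-spoke fan-out can be sketched without any LLM calls; `fan_out` is an illustrative helper and the stub lambdas stand in for real `agent.run` invocations:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

def fan_out(task: str, specialists: dict[str, Callable[[str], str]]) -> dict[str, str]:
    """Hub-and-spoke: dispatch one task to every specialist in parallel,
    then collect results keyed by agent name."""
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, task) for name, fn in specialists.items()}
        # result() blocks until that specialist finishes
        return {name: f.result() for name, f in futures.items()}

# Stub specialists standing in for LLM-backed agents
results = fan_out("AI agents", {
    "researcher": lambda t: f"data on {t}",
    "analyst": lambda t: f"trends in {t}",
})
```

Because agent calls are I/O-bound API requests, a thread pool (or asyncio, shown later in this module) gets real concurrency despite the GIL.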
Debate / Adversarial
Two agents argue opposing positions and a judge agent picks the stronger case. Great for decision-making and reducing bias.
Iterative Refinement
Draft → critique → revise → critique → revise. Loop until a quality threshold is met. Ideal for content generation.
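The refinement loop reduces to a few lines of control flow. This is a sketch under stated assumptions: `refine` is an illustrative helper, and the three callables (writer, reviewer, scorer) stand in for LLM-backed agents:

```python
from typing import Callable

def refine(draft_fn: Callable[[str], str],
           review_fn: Callable[[str], str],
           score_fn: Callable[[str], float],
           threshold: float = 0.8,
           max_rounds: int = 3) -> str:
    """Draft -> critique -> revise loop that stops at a quality threshold
    or after max_rounds, whichever comes first."""
    draft = draft_fn("")                  # initial draft, no feedback yet
    for _ in range(max_rounds):
        if score_fn(draft) >= threshold:
            break
        feedback = review_fn(draft)
        draft = draft_fn(feedback)        # revise using the critic's feedback
    return draft

# Stub agents: the writer versions its drafts, the scorer approves v2
rounds = {"n": 0}
def stub_writer(feedback: str) -> str:
    rounds["n"] += 1
    return f"draft v{rounds['n']}"

result = refine(
    stub_writer,
    review_fn=lambda d: "tighten the prose",
    score_fn=lambda d: 0.9 if d.endswith("v2") else 0.5,
)
```

The `max_rounds` cap matters in practice: without it, a scorer that never crosses the threshold would loop (and bill) forever.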
Hierarchical
Manager agents delegate to team leads, who delegate to workers. Like an org chart. Scales to very complex tasks.
Voting / Ensemble
Multiple agents solve the same task independently and the results are aggregated (majority vote, best-of-N). Improves reliability.
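The majority-vote aggregation step is simple enough to sketch directly; `majority_vote` is an illustrative helper and the lambdas stand in for independent agent calls:

```python
from collections import Counter
from typing import Callable

def majority_vote(task: str, solvers: list[Callable[[str], str]]) -> str:
    """Ensemble pattern: run every solver on the same task and return the
    most common answer. Ties resolve to the first answer seen."""
    answers = [solve(task) for solve in solvers]
    winner, _count = Counter(answers).most_common(1)[0]
    return winner

# Two of three stub agents agree, so their answer wins
answer = majority_vote("2+2?", [lambda t: "4", lambda t: "4", lambda t: "5"])
```

Voting only helps when agent errors are uncorrelated, so it pays to vary the prompt, temperature, or model across the ensemble.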
Sequential pipeline example
```python
def pipeline(task: str, agents: list[Agent]) -> str:
    """Run agents in sequence, each building on the previous output."""
    result = task
    for agent in agents:
        result = agent.run(result)
        print(f"[{agent.name}] completed")
    return result

# Research -> Write -> Edit pipeline
final = pipeline(
    "Analyse the impact of AI on healthcare",
    [researcher, writer, critic]
)
```
Debate pattern example
```python
def debate(topic: str, rounds: int = 2) -> str:
    """Two agents debate, a judge picks the winner."""
    optimist = Agent("Optimist",
                     "You argue the positive case. Be persuasive with evidence.",
                     "Argues for")
    pessimist = Agent("Pessimist",
                      "You argue the negative case. Be critical with evidence.",
                      "Argues against")
    judge = Agent("Judge",
                  "You evaluate both arguments fairly and pick the stronger one. "
                  "Explain your reasoning.",
                  "Judges debates")
    history = []
    for round_num in range(rounds):
        arg_for = optimist.run(
            f"Topic: {topic}\nRound {round_num+1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"FOR: {arg_for}")
        arg_against = pessimist.run(
            f"Topic: {topic}\nRound {round_num+1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"AGAINST: {arg_against}")
    verdict = judge.run(f"Topic: {topic}\n\nFull debate:\n" + "\n\n".join(history))
    return verdict
```
More agents do not always mean better results. Every agent adds latency, cost, and potential communication errors. Start with the minimum number of agents the task requires, and add more only when a single agent genuinely cannot handle the complexity.
The Supervisor Architecture
The supervisor architecture extends the orchestrator pattern with monitoring, error recovery, and quality control. A supervisor doesn't just delegate: it watches for failures, validates outputs, and can reassign work.
This is essential for production systems, where you cannot afford silent failures. The supervisor checks each agent's output against expected criteria before passing it on.
```python
class Supervisor:
    """Manages agents with quality checks and error recovery."""
    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()
        self.max_retries = 2

    def validate_output(self, output: str, criteria: str) -> bool:
        """Use an LLM call to check if output meets criteria."""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=100,
            system="You are a quality checker. Reply YES if the output meets "
                   "the criteria, NO if it does not. One word only.",
            messages=[{"role": "user",
                       "content": f"Criteria: {criteria}\n\nOutput: {output[:2000]}"}]
        )
        return "YES" in response.content[0].text.upper()

    def delegate(self, agent_name: str, task: str, criteria: str) -> str:
        """Delegate with validation and retry logic."""
        agent = self.agents[agent_name]
        for attempt in range(self.max_retries + 1):
            result = agent.run(task)
            if self.validate_output(result, criteria):
                return result
            print(f"[Supervisor] {agent_name} output failed validation "
                  f"(attempt {attempt + 1}), retrying...")
            task = f"{task}\n\nPrevious attempt was rejected. Improve quality."
        return result  # Return last attempt even if not perfect

# Usage
supervisor = Supervisor([researcher, writer, critic])
data = supervisor.delegate(
    "Researcher",
    "Find 5 statistics about AI agent adoption in enterprise",
    "Must contain at least 5 specific numerical statistics with sources"
)
report = supervisor.delegate(
    "Writer",
    f"Write an executive summary using this data:\n{data}",
    "Must be 200-400 words, professional tone, structured with bullet points"
)
```
The validation step is itself an LLM call, so budget for its cost. For cheaper validation, run regex checks or JSON-schema validation on structured output first, and only escalate to LLM-based validation afterwards.
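Those cheap pre-checks can look like this. The helper names, the regex, and the threshold are illustrative assumptions; the idea is simply to reject malformed output for free before spending an LLM call on it:

```python
import json
import re

def structural_precheck(output: str, min_stats: int = 5) -> bool:
    """Cheap validation before any LLM judge: count numeric figures
    (integers, decimals, percentages) with a regex."""
    numbers = re.findall(r"\d+(?:\.\d+)?%?", output)
    return len(numbers) >= min_stats

def json_precheck(output: str, required_keys: set[str]) -> bool:
    """For structured outputs: parse as JSON and confirm the expected keys exist."""
    try:
        data = json.loads(output)
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and required_keys <= set(data)

# A research snippet with six numeric figures passes the 5-statistic bar
ok = structural_precheck(
    "Adoption hit 40% in 2024, up from 25% in 2023; 3 of 5 firms report gains."
)
```

A natural wiring is to call these first inside `Supervisor.validate_output` and only fall through to the LLM check when they pass.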
Collaborative Workflows
Real-world applications usually combine several patterns. Below is a practical example: a content-creation pipeline in which research, writing, fact-checking, and editorial-review agents work together.
```python
import asyncio
import anthropic

async def run_agent_async(name: str, system: str, task: str) -> dict:
    """Run an agent asynchronously for parallel execution."""
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": task}]
    )
    return {"agent": name, "output": response.content[0].text}

async def content_pipeline(topic: str) -> str:
    # Step 1: Parallel research from multiple angles
    research_tasks = [
        run_agent_async("Data Researcher",
                        "Find statistics and market data. Cite sources.",
                        f"Research data about: {topic}"),
        run_agent_async("Trend Analyst",
                        "Identify emerging trends and predictions.",
                        f"Analyse trends in: {topic}"),
        run_agent_async("Expert Finder",
                        "Find expert quotes and opinions from industry leaders.",
                        f"Find expert perspectives on: {topic}")
    ]
    research_results = await asyncio.gather(*research_tasks)
    combined_research = "\n\n".join(
        f"[{r['agent']}]\n{r['output']}" for r in research_results
    )

    # Step 2: Writer creates draft using all research
    draft = await run_agent_async("Writer",
        "You are a skilled technical writer. Create engaging, well-structured content.",
        f"Write a comprehensive article about {topic}.\n\nResearch:\n{combined_research}")

    # Step 3: Parallel review
    review_tasks = [
        run_agent_async("Fact Checker",
                        "Verify claims and flag any unsubstantiated statements.",
                        f"Review this article for factual accuracy:\n{draft['output']}"),
        run_agent_async("Editor",
                        "Improve clarity, flow, and style. Suggest specific edits.",
                        f"Edit this article for quality:\n{draft['output']}")
    ]
    reviews = await asyncio.gather(*review_tasks)
    feedback = "\n\n".join(f"[{r['agent']}]\n{r['output']}" for r in reviews)

    # Step 4: Final revision incorporating feedback
    final = await run_agent_async("Writer",
        "You are a skilled technical writer. Revise based on editorial feedback.",
        f"Revise this article:\n{draft['output']}\n\nFeedback:\n{feedback}")
    return final["output"]

# Run the pipeline
# result = asyncio.run(content_pipeline("AI agents in enterprise software"))
```
Parallel agent calls with asyncio significantly reduce total execution time. In the example above, the three research agents run concurrently instead of sequentially, cutting the wait from roughly three times a single call's latency down to about one.
Summary
When designing multi-agent systems, follow these principles:
- Start simple: get two agents working before building a ten-agent team
- Define clear interfaces: be explicit about what inputs each agent expects and what outputs it produces
- Log everything: trace each agent's inputs, outputs, token usage, and latency
- Fail gracefully: if one agent fails, the system should degrade, not crash
- Watch costs: every agent invocation is an API call, and multi-agent systems multiply spend quickly
| Pattern | Agents | Latency | Best for |
|---|---|---|---|
| Sequential pipeline | 2-5 | Sum of all agents | Document processing, ETL |
| Hub-and-spoke | 3-10 | Orchestrator + slowest agent | Research, analysis |
| Debate | 3 | 2 x rounds x agent latency | Decision support, risk assessment |
| Iterative refinement | 2-3 | Rounds x 2 agents | Content creation, code generation |
| Hierarchical | 5-20+ | Depth x slowest branch | Complex enterprise workflows |
Use different models for different agents. Cheap, fast models (such as Claude Haiku) are ideal for simple classification or routing agents, while stronger models (such as Claude Sonnet) handle complex reasoning. This can cut costs by 5-10x without sacrificing quality.
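That routing decision can live in a small lookup table. The Sonnet ID below matches the one used throughout this module; the Haiku ID and the role names are placeholder assumptions, so substitute the model IDs available to your account:

```python
CHEAP_MODEL = "claude-3-5-haiku-latest"    # placeholder: a fast, cheap model
STRONG_MODEL = "claude-sonnet-4-20250514"  # the model used in the examples above

ROLE_MODELS = {
    "router": CHEAP_MODEL,       # simple classification: a cheap model suffices
    "validator": CHEAP_MODEL,    # YES/NO quality checks don't need deep reasoning
    "researcher": STRONG_MODEL,
    "writer": STRONG_MODEL,
}

def model_for(role: str) -> str:
    """Pick the model for an agent role, defaulting to the strong model
    so unknown roles fail safe on quality rather than cost."""
    return ROLE_MODELS.get(role, STRONG_MODEL)
```

The `model_for` result would then be passed as the `model` argument in each agent's `messages.create` call.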