# Module 11: Multi-Agent Orchestration
Coordinating multiple specialised agents to tackle complex tasks that no single agent can handle alone.
## Why Multiple Agents?
A single AI agent can handle simple, well-scoped tasks. But as complexity grows — multi-step research, code generation with testing, customer support across domains — a single agent hits its limits. It struggles to maintain focus, its system prompt becomes bloated, and error rates climb.
Multi-agent systems solve this by decomposing work across specialised agents. Each agent has a focused system prompt, a targeted set of tools, and a clear area of responsibility. This mirrors how human organisations work: you don't ask your accountant to write marketing copy.
The benefits of multi-agent architectures include:
- Specialisation — each agent excels at a narrow task with optimised prompts and tools
- Modularity — swap, upgrade, or debug individual agents without touching others
- Scalability — run agents in parallel for throughput; add new agents for new capabilities
- Reliability — isolate failures to a single agent rather than losing the whole pipeline
- Cost efficiency — use cheaper models for simple sub-tasks, expensive models only where needed
Think of a newsroom. An editor-in-chief (orchestrator) assigns stories to reporters (research agents), who hand drafts to editors (writing agents), then to fact-checkers (validation agents). No single person does everything.
## Agent Communication
Before agents can collaborate, they need a way to exchange information. The simplest approach is message passing — one agent's output becomes another agent's input. More sophisticated systems use shared memory, event buses, or structured protocols.
### Direct Message Passing

The most straightforward pattern: Agent A produces output, which is injected into Agent B's prompt as context. This is synchronous and easy to debug.

```python
import anthropic

client = anthropic.Anthropic()

def run_agent(system_prompt: str, task: str, context: str = "") -> str:
    """Run a single agent with optional context from previous agents."""
    content = f"{task}\n\nContext:\n{context}" if context else task
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system_prompt,
        messages=[{"role": "user", "content": content}],
    )
    return response.content[0].text

# Agent A produces research
research = run_agent(
    system_prompt="You are a market research analyst. Provide data-driven insights.",
    task="Analyse the current state of the AI agent market in 2025.",
)

# Agent B consumes research to write a report
report = run_agent(
    system_prompt="You are a technical writer. Write clear, structured reports.",
    task="Write an executive summary based on this research.",
    context=research,
)
```
### Shared State via a Blackboard

For more complex workflows, agents can read from and write to a shared data structure. This blackboard pattern allows agents to operate independently while staying coordinated.

```python
import time

class Blackboard:
    """Shared state that all agents can read and write."""

    def __init__(self):
        self.state = {}
        self.history = []

    def write(self, agent_name: str, key: str, value: str):
        self.state[key] = value
        self.history.append({"agent": agent_name, "key": key, "timestamp": time.time()})

    def read(self, key: str) -> str:
        return self.state.get(key, "")

    def get_summary(self) -> str:
        return "\n".join(f"{k}: {v[:200]}..." for k, v in self.state.items())

# Usage (research_output and analyst_output come from earlier agent runs)
board = Blackboard()
board.write("researcher", "market_data", research_output)
board.write("analyst", "trends", analyst_output)

# Writer agent reads everything it needs
summary = board.get_summary()
```
Start with direct message passing. Only introduce shared state when you have 3+ agents that need to read each other's outputs in non-linear ways.
| Communication Pattern | Complexity | Best For | Drawback |
|---|---|---|---|
| Direct message passing | Low | Linear pipelines (A → B → C) | Rigid ordering |
| Shared blackboard | Medium | Collaborative analysis, parallel work | Potential conflicts |
| Event bus / pub-sub | High | Reactive systems, real-time workflows | Debugging complexity |
| Structured protocol (A2A) | High | Cross-organisation, interoperability | Setup overhead |
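As a sketch of the event-bus row, a minimal in-process pub/sub might look like this. The `EventBus` class is illustrative, not a standard API; production systems would typically sit on a message broker or task queue instead.

```python
from collections import defaultdict
from typing import Callable

class EventBus:
    """Minimal in-process pub/sub: agents subscribe to topics and react to events."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]):
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict):
        # Deliver the event to every handler registered for this topic
        for handler in self.subscribers[topic]:
            handler(event)

# A writer agent reacts whenever research is published, without the
# researcher knowing who is listening
bus = EventBus()
drafts = []
bus.subscribe("research.done", lambda e: drafts.append(f"Draft based on: {e['summary']}"))
bus.publish("research.done", {"summary": "AI agent market grew in 2025"})
```

The decoupling is the point: publishers never name their consumers, which is what makes these systems flexible and, as the table notes, harder to debug.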
## Orchestrator Pattern
The orchestrator pattern is the most common multi-agent architecture. A central orchestrator agent receives a task, decomposes it into sub-tasks, delegates each to a specialist agent, and assembles the results. The orchestrator acts as a project manager.
This pattern is powerful because the orchestrator can dynamically decide which agents to call, in what order, and how to combine their outputs. It can also handle errors by retrying or re-routing tasks.
```python
import json
import anthropic
from dataclasses import dataclass

@dataclass
class Agent:
    name: str
    system_prompt: str
    description: str  # Used by orchestrator to decide delegation

    def run(self, task: str) -> str:
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=self.system_prompt,
            messages=[{"role": "user", "content": task}],
        )
        return response.content[0].text

class Orchestrator:
    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()

    def plan(self, task: str) -> list[dict]:
        """Ask the LLM to decompose a task into sub-tasks."""
        agent_descriptions = "\n".join(
            f"- {a.name}: {a.description}" for a in self.agents.values()
        )
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system="You are a task planner. Decompose tasks into steps. "
                   "Return only JSON: [{\"agent\": \"name\", \"task\": \"description\"}]",
            messages=[{"role": "user",
                       "content": f"Task: {task}\n\nAvailable agents:\n{agent_descriptions}"}],
        )
        # Assumes the model returns bare JSON; harden this parse for production use
        return json.loads(response.content[0].text)

    def execute(self, task: str) -> dict[str, str]:
        plan = self.plan(task)
        results = {}
        for step in plan:
            agent = self.agents[step["agent"]]
            context = "\n".join(f"[{k}]: {v}" for k, v in results.items())
            result = agent.run(f"{step['task']}\n\nPrevious results:\n{context}")
            results[step["agent"]] = result
        return results

# Create specialist agents
researcher = Agent("Researcher",
                   "You are a research specialist. Find facts, data, and evidence.",
                   "Gathers information and data on any topic")
writer = Agent("Writer",
               "You are a technical writer. Write clear, well-structured content.",
               "Writes reports, summaries, and documentation")
critic = Agent("Critic",
               "You are a critical reviewer. Find flaws, gaps, and improvements.",
               "Reviews and critiques content for quality")

# Run orchestrated workflow
orchestrator = Orchestrator([researcher, writer, critic])
results = orchestrator.execute("Write a market analysis report on AI agents in 2025")
```
The orchestrator itself is an LLM call. It uses the agent descriptions to decide delegation. This means your agent descriptions matter — write them like job postings so the orchestrator can route correctly.
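A related practical point: planner replies are often wrapped in prose or markdown code fences rather than bare JSON, so a production `plan` step needs a tolerant parser. A hedged sketch of one (the `extract_json` name and regex approach are illustrative, not part of any SDK):

```python
import json
import re

def extract_json(text: str) -> list:
    """Pull the first JSON array out of a model reply, tolerating prose and code fences."""
    # Strip markdown code fences if present
    text = re.sub(r"```(?:json)?", "", text)
    # Find the outermost [...] span and parse it
    match = re.search(r"\[.*\]", text, re.DOTALL)
    if match is None:
        raise ValueError("No JSON array found in model output")
    return json.loads(match.group(0))

# A typical chatty planner reply still parses cleanly
reply = 'Here is the plan:\n```json\n[{"agent": "Researcher", "task": "Gather data"}]\n```'
plan = extract_json(reply)
```

For stricter guarantees, tool use with a JSON schema is usually a better fit than free-text parsing.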
## Delegation Patterns
There are several well-established patterns for how agents delegate work to each other. Each suits different problem structures.
- Sequential Pipeline — A → B → C. Each agent processes and passes forward. Best for linear workflows like research → draft → review → publish.
- Hub-and-Spoke — a central orchestrator delegates to specialists and collects results. Best when sub-tasks are independent and can run in parallel.
- Debate / Adversarial — two agents argue opposing positions, and a judge agent selects the best. Great for decision-making and reducing bias.
- Iterative Refinement — draft → critique → revise → critique → revise. Loop until a quality threshold is met. Ideal for content generation.
- Hierarchical — manager agents delegate to team leads, who delegate to workers. Mirrors org charts. Scales to very complex tasks.
- Voting / Ensemble — multiple agents solve the same task independently, and the results are aggregated (majority vote, best-of-N). Improves reliability.
### Sequential Pipeline Example

```python
def pipeline(task: str, agents: list[Agent]) -> str:
    """Run agents in sequence, each building on the previous output."""
    result = task
    for agent in agents:
        result = agent.run(result)
        print(f"[{agent.name}] completed")
    return result

# Research -> Write -> Edit pipeline
final = pipeline(
    "Analyse the impact of AI on healthcare",
    [researcher, writer, critic],
)
```
### Debate Pattern Example

```python
def debate(topic: str, rounds: int = 2) -> str:
    """Two agents debate, a judge picks the winner."""
    optimist = Agent("Optimist",
                     "You argue the positive case. Be persuasive with evidence.",
                     "Argues for")
    pessimist = Agent("Pessimist",
                      "You argue the negative case. Be critical with evidence.",
                      "Argues against")
    judge = Agent("Judge",
                  "You evaluate both arguments fairly and pick the stronger one. "
                  "Explain your reasoning.",
                  "Judges debates")
    history = []
    for round_num in range(rounds):
        arg_for = optimist.run(
            f"Topic: {topic}\nRound {round_num + 1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"FOR: {arg_for}")
        arg_against = pessimist.run(
            f"Topic: {topic}\nRound {round_num + 1}. Previous arguments:\n" +
            "\n".join(history)
        )
        history.append(f"AGAINST: {arg_against}")
    # Judge rules once, after all rounds are complete
    verdict = judge.run(f"Topic: {topic}\n\nFull debate:\n" + "\n\n".join(history))
    return verdict
```
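The iterative-refinement pattern from the list above can be sketched the same way. Here plain callables stand in for the writer and critic agents so the control flow is visible; `refine`, `write`, `critique`, and `accept` are illustrative names, and in practice the callables would wrap real `Agent.run` calls:

```python
from typing import Callable

def refine(task: str,
           write: Callable[[str], str],
           critique: Callable[[str], str],
           accept: Callable[[str], bool],
           max_rounds: int = 3) -> str:
    """Draft -> critique -> revise loop that stops once the critic accepts."""
    draft = write(task)
    for _ in range(max_rounds):
        feedback = critique(draft)
        if accept(feedback):  # quality threshold met, stop early
            break
        draft = write(f"{task}\n\nRevise this draft:\n{draft}\n\nFeedback:\n{feedback}")
    return draft

# Toy stand-ins: the critic demands a conclusion, the writer adds one on revision
write = lambda prompt: "Intro. Body. Conclusion." if "Revise" in prompt else "Intro. Body."
critique = lambda draft: "OK" if "Conclusion" in draft else "Missing a conclusion."
final = refine("Write about AI agents", write, critique, accept=lambda fb: fb == "OK")
```

The `max_rounds` cap matters: without it, a critic that never accepts turns the loop into an unbounded token spend.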
More agents does not always mean better results. Each agent adds latency, cost, and potential for miscommunication. Start with the minimum number of agents needed and add more only when a single agent genuinely cannot handle the complexity.
## Supervisor Architecture
The supervisor architecture extends the orchestrator pattern by adding monitoring, error recovery, and quality control. The supervisor not only delegates tasks but also watches for failures, validates outputs, and can reassign work.
This is critical for production systems where you cannot afford silent failures. A supervisor checks each agent's output against expected criteria before passing it along.
```python
class Supervisor:
    """Manages agents with quality checks and error recovery."""

    def __init__(self, agents: list[Agent]):
        self.agents = {a.name: a for a in agents}
        self.client = anthropic.Anthropic()
        self.max_retries = 2

    def validate_output(self, output: str, criteria: str) -> bool:
        """Use an LLM call to check if output meets criteria."""
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=100,
            system="You are a quality checker. Reply YES if the output meets "
                   "the criteria, NO if it does not. One word only.",
            messages=[{"role": "user",
                       "content": f"Criteria: {criteria}\n\nOutput: {output[:2000]}"}],
        )
        return "YES" in response.content[0].text.upper()

    def delegate(self, agent_name: str, task: str, criteria: str) -> str:
        """Delegate with validation and retry logic."""
        agent = self.agents[agent_name]
        for attempt in range(self.max_retries + 1):
            result = agent.run(task)
            if self.validate_output(result, criteria):
                return result
            print(f"[Supervisor] {agent_name} output failed validation "
                  f"(attempt {attempt + 1}), retrying...")
            task = f"{task}\n\nPrevious attempt was rejected. Improve quality."
        return result  # Return last attempt even if not perfect

# Usage
supervisor = Supervisor([researcher, writer, critic])
data = supervisor.delegate(
    "Researcher",
    "Find 5 statistics about AI agent adoption in enterprise",
    "Must contain at least 5 specific numerical statistics with sources",
)
report = supervisor.delegate(
    "Writer",
    f"Write an executive summary using this data:\n{data}",
    "Must be 200-400 words, professional tone, structured with bullet points",
)
```
The validation step is itself an LLM call, so factor the cost into your budget. For cheaper validation, use regex checks or JSON schema validation for structured outputs before escalating to LLM-based validation.
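A minimal sketch of that cheap-first idea, assuming simple word-count, JSON, and regex criteria. The `cheap_validate` helper is hypothetical; the point is that these checks cost nothing and can short-circuit before any LLM validator runs:

```python
import json
import re

def cheap_validate(output: str,
                   min_words: int = 0,
                   require_json: bool = False,
                   required_patterns: tuple[str, ...] = ()) -> bool:
    """Free structural checks to run before escalating to an LLM-based validator."""
    if len(output.split()) < min_words:
        return False
    if require_json:
        try:
            json.loads(output)
        except json.JSONDecodeError:
            return False
    # e.g. r"\d" requires at least one numerical figure in the output
    return all(re.search(p, output) for p in required_patterns)

ok = cheap_validate("Adoption grew 45% in 2025.", min_words=3, required_patterns=(r"\d",))
```

Only outputs that pass these gates need to reach the `validate_output` LLM call, which keeps validation spend proportional to borderline cases.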
## Collaborative Workflows
Real-world applications typically combine multiple patterns. Here is a practical example: a content creation pipeline that uses research, writing, fact-checking, and editorial review agents working together.
```python
import asyncio
import anthropic

async def run_agent_async(name: str, system: str, task: str) -> dict:
    """Run an agent asynchronously for parallel execution."""
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": task}],
    )
    return {"agent": name, "output": response.content[0].text}

async def content_pipeline(topic: str) -> str:
    # Step 1: Parallel research from multiple angles
    research_tasks = [
        run_agent_async("Data Researcher",
                        "Find statistics and market data. Cite sources.",
                        f"Research data about: {topic}"),
        run_agent_async("Trend Analyst",
                        "Identify emerging trends and predictions.",
                        f"Analyse trends in: {topic}"),
        run_agent_async("Expert Finder",
                        "Find expert quotes and opinions from industry leaders.",
                        f"Find expert perspectives on: {topic}"),
    ]
    research_results = await asyncio.gather(*research_tasks)
    combined_research = "\n\n".join(
        f"[{r['agent']}]\n{r['output']}" for r in research_results
    )

    # Step 2: Writer creates a draft using all research
    draft = await run_agent_async("Writer",
        "You are a skilled technical writer. Create engaging, well-structured content.",
        f"Write a comprehensive article about {topic}.\n\nResearch:\n{combined_research}")

    # Step 3: Parallel review
    review_tasks = [
        run_agent_async("Fact Checker",
                        "Verify claims and flag any unsubstantiated statements.",
                        f"Review this article for factual accuracy:\n{draft['output']}"),
        run_agent_async("Editor",
                        "Improve clarity, flow, and style. Suggest specific edits.",
                        f"Edit this article for quality:\n{draft['output']}"),
    ]
    reviews = await asyncio.gather(*review_tasks)
    feedback = "\n\n".join(f"[{r['agent']}]\n{r['output']}" for r in reviews)

    # Step 4: Final revision incorporating feedback
    final = await run_agent_async("Writer",
        "You are a skilled technical writer. Revise based on editorial feedback.",
        f"Revise this article:\n{draft['output']}\n\nFeedback:\n{feedback}")
    return final["output"]

# Run the pipeline
# result = asyncio.run(content_pipeline("AI agents in enterprise software"))
```
Using asyncio for parallel agent calls can cut total execution time significantly. In the example above, three research agents run simultaneously instead of sequentially, reducing wait time from 3x to roughly 1x the latency of a single call.
## Putting It All Together
When designing multi-agent systems, follow these principles:
- Start simple — begin with 2 agents before building a fleet of 10
- Define clear interfaces — what each agent expects as input and produces as output
- Log everything — trace each agent's input, output, tokens used, and latency
- Handle failures gracefully — if one agent fails, the system should degrade, not crash
- Monitor costs — each agent call is an API call; multi-agent systems multiply costs quickly
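The logging principle above can be sketched as a thin wrapper around any agent call. `logged_call` is an illustrative name, and a stub callable stands in for a real API-backed agent here; with the real SDK you would also record the token counts from `response.usage`:

```python
import time

def logged_call(agent_name: str, call, task: str, log: list) -> str:
    """Wrap any agent call, recording input, output size, and latency for tracing."""
    start = time.time()
    output = call(task)
    log.append({
        "agent": agent_name,
        "task": task,
        "output_chars": len(output),
        "latency_s": round(time.time() - start, 3),
    })
    return output

log = []
result = logged_call("Researcher", lambda t: f"Findings for: {t}", "AI agent market", log)
```

A shared `log` list works for a demo; production systems would emit these records to structured logging or a tracing backend instead.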
| Pattern | Agents | Latency | Best Use Case |
|---|---|---|---|
| Sequential Pipeline | 2-5 | Sum of all agents | Document processing, ETL |
| Hub-and-Spoke | 3-10 | Orchestrator + slowest agent | Research, analysis |
| Debate | 3 | 2x rounds x agent latency | Decision support, risk assessment |
| Iterative Refinement | 2-3 | Rounds x 2 agents | Content creation, code generation |
| Hierarchical | 5-20+ | Depth x slowest branch | Complex enterprise workflows |
Use different models for different agents. A cheap, fast model (like Claude Haiku) works well for simple classification or routing agents, while more capable models (like Claude Sonnet) handle complex reasoning. This can reduce costs by 5-10x without sacrificing quality.
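One way to sketch model tiering is a role-to-model lookup that each agent consults. The cheap-model identifier below is a placeholder, not a real model name (check Anthropic's current model list for the Haiku id); the Sonnet id matches the one used throughout this module:

```python
# Placeholder id for the cheap tier -- substitute the current Haiku model name
CHEAP_MODEL = "claude-haiku-placeholder"
CAPABLE_MODEL = "claude-sonnet-4-20250514"

# Simple roles (routing, classification) get the cheap tier;
# anything requiring reasoning gets the capable tier
MODEL_FOR_ROLE = {
    "router": CHEAP_MODEL,
    "classifier": CHEAP_MODEL,
    "researcher": CAPABLE_MODEL,
    "writer": CAPABLE_MODEL,
}

def pick_model(role: str) -> str:
    """Fall back to the capable model for unknown roles, trading cost for safety."""
    return MODEL_FOR_ROLE.get(role, CAPABLE_MODEL)
```

The fallback direction is a deliberate choice: an unknown role gets the stronger model, so a routing mistake costs money rather than quality.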