Module 12: A2A Protocol
Understanding the Agent-to-Agent protocol for standardised inter-agent communication and collaboration across platforms.
The Interoperability Challenge
In Module 9, we learned how MCP (Model Context Protocol) lets agents connect to tools and data sources through a standardised interface. But what happens when agents need to talk to each other? If your research agent is built with LangChain and your writing agent runs on a custom framework, how do they communicate?
Without a standard protocol, every agent-to-agent integration becomes a custom, fragile point-to-point connection. If you have 5 agents and they all need to talk to each other, that is 10 custom integrations. Add a 6th agent and you need 15. This does not scale.
Think of MCP as giving every worker access to the same office tools (printers, databases, file systems). A2A is the common language and meeting protocol that lets workers from different companies collaborate on shared projects — even if they were trained differently and speak different internal jargons.
The industry needed a standard way for agents to:
- Discover what other agents can do
- Delegate tasks to the best-suited agent
- Track progress on delegated work
- Exchange structured results and artifacts
What is A2A?
The Agent-to-Agent (A2A) protocol is an open standard introduced by Google in April 2025 for inter-agent communication. It was developed in collaboration with over 50 technology companies and is designed to be framework-agnostic — agents built with any stack can interoperate.
A2A defines four core capabilities:
Discovery
Agents publish machine-readable Agent Cards describing their skills, endpoints, and authentication requirements. Other agents can discover them automatically.
Task Delegation
A client agent creates a Task and sends it to a remote agent. The task contains a message with the request, and the remote agent processes it.
Streaming
For long-running tasks, agents can stream progress updates via Server-Sent Events (SSE), letting the client agent monitor status in real time.
Artifact Exchange
Agents exchange structured data as Artifacts — files, JSON objects, images, or any content type. This goes beyond plain text to rich data payloads.
A2A is transport-agnostic at its core but defines HTTP + JSON-RPC as the primary transport. Agents communicate over standard web protocols, making them compatible with existing infrastructure like load balancers, API gateways, and authentication systems.
Agent Cards
Every A2A-compliant agent publishes an Agent Card — a JSON document that describes who the agent is, what it can do, and how to interact with it. Agent Cards are hosted at a well-known URL: /.well-known/agent.json.
Think of an Agent Card as an API specification for an agent. It tells other agents (or orchestrators) everything they need to know to decide whether to delegate a task and how to format the request.
{
"name": "Invoice Processor",
"description": "Extracts structured data from invoice documents (PDF, images).",
"url": "https://agents.example.com/invoice",
"version": "1.0.0",
"protocol": "a2a/1.0",
"authentication": {
"schemes": ["bearer"]
},
"skills": [
{
"id": "extract_invoice_data",
"name": "Extract Invoice Data",
"description": "Parse an invoice and return structured line items, totals, and vendor info.",
"inputModes": ["application/pdf", "image/png", "image/jpeg"],
"outputModes": ["application/json"]
},
{
"id": "validate_invoice",
"name": "Validate Invoice",
"description": "Cross-check invoice data against purchase orders.",
"inputModes": ["application/json"],
"outputModes": ["application/json"]
}
],
"capabilities": {
"streaming": true,
"pushNotifications": false
}
}
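Before delegating to a discovered agent, it is worth sanity-checking the card you fetched. A minimal validation sketch, assuming the field layout shown in the example card above (the exact required-field set is an assumption, not taken from the spec):

```python
# Minimal sketch: check that a fetched Agent Card has the fields a client
# needs before delegating. Field names follow the example card above.

REQUIRED_FIELDS = {"name", "url", "version", "skills"}

def validate_agent_card(card: dict) -> list[str]:
    """Return a list of problems found in an Agent Card (empty = usable)."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in card]
    for i, skill in enumerate(card.get("skills", [])):
        if "id" not in skill:
            problems.append(f"skill {i} has no 'id'")
    return problems
```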
Discovering Agents
A client agent can discover available agents by fetching their Agent Cards. In a typical enterprise setup, a registry or directory service indexes available agents.
import httpx
async def discover_agent(base_url: str) -> dict:
"""Fetch and parse an Agent Card from a remote agent."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{base_url}/.well-known/agent.json")
response.raise_for_status()
card = response.json()
print(f"Discovered: {card['name']}")
print(f"Skills: {[s['id'] for s in card['skills']]}")
return card
async def find_agent_for_skill(registry_urls: list[str], skill_needed: str) -> dict:
"""Search a list of known agents for one that has a specific skill."""
for url in registry_urls:
try:
card = await discover_agent(url)
skill_ids = [s["id"] for s in card.get("skills", [])]
if skill_needed in skill_ids:
return card
except httpx.HTTPError:
continue
raise ValueError(f"No agent found with skill: {skill_needed}")
In production, maintain a local cache of Agent Cards with a TTL (time-to-live). Re-fetching cards on every request adds latency. Refresh the cache periodically or when you get a version mismatch error.
Task Lifecycle
Communication in A2A revolves around Tasks. A client agent creates a task, sends it to a remote agent, and the remote agent processes it through a well-defined lifecycle.
| State | Description | Transitions To |
|---|---|---|
| submitted | Task received by remote agent, queued for processing | working, failed |
| working | Agent is actively processing the task | completed, failed, input-required |
| input-required | Agent needs additional information from the client | working, failed |
| completed | Task finished successfully, artifacts available | (terminal) |
| failed | Task failed with an error message | (terminal) |
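The lifecycle table can be expressed directly as a transition map, which is handy for validating state updates on either side of the protocol. A sketch (state names follow the table; this omits optional states like cancellation):

```python
# The task lifecycle as a transition map. Terminal states have no
# outgoing transitions.

TRANSITIONS = {
    "submitted": {"working", "failed"},
    "working": {"completed", "failed", "input-required"},
    "input-required": {"working", "failed"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
}

def is_valid_transition(current: str, new: str) -> bool:
    """True if a task may move from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())
```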
Creating and Sending a Task
import httpx, uuid
async def send_task(agent_url: str, skill_id: str, message: str,
auth_token: str) -> dict:
"""Send a task to a remote A2A agent."""
task_id = str(uuid.uuid4())
payload = {
"jsonrpc": "2.0",
"method": "tasks/send",
"id": task_id,
"params": {
"id": task_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
},
"metadata": {
"skill_id": skill_id
}
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
agent_url,
json=payload,
headers={"Authorization": f"Bearer {auth_token}"}
)
return response.json()
# Example: delegate invoice processing to a remote agent
# result = await send_task(
# "https://agents.example.com/invoice",
# "extract_invoice_data",
# "Extract all line items from the attached invoice.",
# auth_token="your-token-here"
# )
Polling for Task Status
import asyncio, httpx
async def poll_task(agent_url: str, task_id: str, auth_token: str,
interval: float = 2.0) -> dict:
"""Poll a task until it reaches a terminal state."""
terminal_states = {"completed", "failed"}
async with httpx.AsyncClient() as client:
while True:
response = await client.post(agent_url, json={
"jsonrpc": "2.0",
"method": "tasks/get",
"id": "poll-1",
"params": {"id": task_id}
}, headers={"Authorization": f"Bearer {auth_token}"})
result = response.json()["result"]
status = result["status"]["state"]
print(f"Task {task_id}: {status}")
if status in terminal_states:
return result
await asyncio.sleep(interval)
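Once poll_task returns a completed task, the useful output lives in its artifacts. A small helper to pull out the text parts; the artifact shape mirrors the examples in this module:

```python
# Extract all text parts from a completed task's artifacts, following
# the artifact structure used in the examples in this module.

def extract_artifact_text(task_result: dict) -> str:
    """Concatenate the text parts of every artifact in a task result."""
    texts = []
    for artifact in task_result.get("artifacts", []):
        for part in artifact.get("parts", []):
            if part.get("type") == "text":
                texts.append(part.get("text", ""))
    return "\n".join(texts)
```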
Streaming with Server-Sent Events
For long-running tasks, A2A supports streaming updates via SSE. Instead of polling, the client subscribes to a stream and receives events as the agent works.
import httpx, json
async def stream_task(agent_url: str, task_id: str, skill_id: str,
message: str, auth_token: str):
"""Subscribe to streaming updates from a remote agent."""
payload = {
"jsonrpc": "2.0",
"method": "tasks/sendSubscribe",
"id": task_id,
"params": {
"id": task_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
async with httpx.AsyncClient() as client:
async with client.stream("POST", agent_url, json=payload,
headers={"Authorization": f"Bearer {auth_token}"}) as resp:
async for line in resp.aiter_lines():
if line.startswith("data:"):
event_data = json.loads(line[5:].strip())
print(f"Event: {event_data}")
Not all A2A agents support streaming. Always check the capabilities.streaming field in the Agent Card before attempting to use tasks/sendSubscribe. Fall back to polling if streaming is unavailable.
A2A vs MCP
A2A and MCP are frequently confused because both are "protocols for AI agents." The distinction is straightforward: MCP connects agents to tools and data, while A2A connects agents to other agents. They operate at different layers and are designed to work together.
| Aspect | MCP | A2A |
|---|---|---|
| Purpose | Agent ↔ Tools & Data | Agent ↔ Agent |
| Originated by | Anthropic | Google |
| Transport | stdio / HTTP+SSE | HTTP + JSON-RPC |
| Discovery | Server capabilities list | Agent Cards at well-known URL |
| Interaction model | Tool calls (request/response) | Tasks with lifecycle states |
| Streaming | SSE for resources | SSE for task progress |
| Best for | Connecting to databases, APIs, files, services | Delegating work between autonomous agents |
| Opaque execution | No — client controls tool calls | Yes — remote agent decides how to execute |
A key architectural difference: with MCP, the client agent decides which tools to call and when. With A2A, the client delegates a task and the remote agent autonomously decides how to accomplish it. A2A treats remote agents as opaque collaborators, not as tools to be controlled.
Using Both Together
In a production multi-agent system, a typical architecture uses both protocols:
# An orchestrator agent that uses both MCP and A2A
#
# MCP: Connect to local tools (database, file system, APIs)
# A2A: Delegate to remote specialist agents
# 1. Use MCP to read customer data from a database
customer_data = await mcp_client.call_tool("database_query", {
"sql": "SELECT * FROM customers WHERE id = 42"
})
# 2. Use A2A to delegate analysis to a remote specialist agent
analysis_result = await send_task(
agent_url="https://agents.analytics-team.com/customer-insights",
skill_id="analyse_customer",
message=f"Analyse this customer profile and suggest retention strategies: "
f"{customer_data}"
)
# 3. Use MCP to write the result back to a local system
await mcp_client.call_tool("database_insert", {
"table": "customer_insights",
"data": analysis_result
})
Implementing an A2A Server
To make your agent available to other agents via A2A, you need to implement an A2A-compliant server. The Google A2A SDK provides reference implementations, but here is a simplified example showing the core concepts.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uuid
app = FastAPI()
# In-memory task store (use a database in production)
tasks = {}
@app.get("/.well-known/agent.json")
async def agent_card():
"""Serve the Agent Card for discovery."""
return {
"name": "Summary Agent",
"description": "Summarises documents and articles.",
"url": "https://my-agent.example.com",
"version": "1.0.0",
"protocol": "a2a/1.0",
"skills": [{
"id": "summarise",
"name": "Summarise Text",
"description": "Create a concise summary of any text input.",
"inputModes": ["text/plain"],
"outputModes": ["text/plain"]
}],
"capabilities": {"streaming": False}
}
@app.post("/")
async def handle_jsonrpc(request: Request):
"""Handle A2A JSON-RPC requests."""
body = await request.json()
method = body.get("method")
if method == "tasks/send":
return await handle_task_send(body)
elif method == "tasks/get":
return await handle_task_get(body)
else:
return JSONResponse({"jsonrpc": "2.0", "error": {
"code": -32601, "message": f"Unknown method: {method}"
}, "id": body.get("id")})
async def handle_task_send(body: dict) -> JSONResponse:
"""Process a new task."""
params = body["params"]
task_id = params.get("id", str(uuid.uuid4()))
message_text = params["message"]["parts"][0]["text"]
# Process the task (in production, do this async with a queue)
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4-20250514", max_tokens=1024,
system="You create clear, concise summaries.",
messages=[{"role": "user", "content": f"Summarise:\n{message_text}"}]
)
summary = response.content[0].text
task_result = {
"id": task_id,
"status": {"state": "completed"},
"artifacts": [{
"parts": [{"type": "text", "text": summary}]
}]
}
tasks[task_id] = task_result
return JSONResponse({"jsonrpc": "2.0", "result": task_result, "id": body["id"]})
async def handle_task_get(body: dict) -> JSONResponse:
"""Return task status and results."""
task_id = body["params"]["id"]
if task_id in tasks:
return JSONResponse({"jsonrpc": "2.0", "result": tasks[task_id], "id": body["id"]})
return JSONResponse({"jsonrpc": "2.0", "error": {
"code": -32602, "message": "Task not found"
}, "id": body["id"]})
For production A2A servers, use the official a2a-python SDK from Google rather than building from scratch. It handles JSON-RPC parsing, SSE streaming, task state management, and authentication out of the box.
The Future
The combination of MCP and A2A creates the foundation for an open agent ecosystem. Agents built with any framework can discover each other, delegate tasks, and exchange results — just as web services communicate via REST APIs today.
Key trends to watch:
- Agent marketplaces — directories where organisations publish A2A-compliant agents, similar to API marketplaces
- Cross-organisation collaboration — agents from different companies working together on shared workflows (e.g., supply chain coordination)
- Agent identity and trust — verifiable agent credentials and capability attestations to ensure you are delegating to a trustworthy agent
- Standardised billing — protocols for agents to negotiate and pay for services rendered by other agents
A2A is still an evolving standard. As of early 2026, the core protocol is stable, but extensions for billing, trust, and advanced discovery are still being developed. Follow the official A2A repository for updates.