AI Agent Series — Ran Wei

Module 12: A2A Protocol

Understanding the Agent-to-Agent protocol for standardised inter-agent communication and collaboration across platforms.

1

The Interoperability Challenge

In Module 9, we learned how MCP (Model Context Protocol) lets agents connect to tools and data sources through a standardised interface. But what happens when agents need to talk to each other? If your research agent is built with LangChain and your writing agent runs on a custom framework, how do they communicate?

Without a standard protocol, every agent-to-agent integration becomes a custom, fragile point-to-point connection. If you have 5 agents and they all need to talk to each other, that is 10 custom integrations. Add a 6th agent and you need 15. This does not scale.
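The arithmetic behind those numbers is the count of distinct pairs among n agents, n(n-1)/2. A minimal sketch (the function name `pairwise_integrations` is made up for this illustration) shows how quickly it grows:

```python
def pairwise_integrations(n_agents: int) -> int:
    """Number of distinct agent pairs: n * (n - 1) / 2."""
    if n_agents < 0:
        raise ValueError("n_agents must be non-negative")
    return n_agents * (n_agents - 1) // 2


# Print the integration count for a few fleet sizes.
for n in (5, 6, 10, 20):
    print(f"{n} agents -> {pairwise_integrations(n)} point-to-point integrations")
```

With a shared protocol, each agent implements one interface instead of one per peer, so the effort grows linearly with the number of agents rather than quadratically.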

ANALOGY

Think of MCP as giving every worker access to the same office tools (printers, databases, file systems). A2A is the common language and meeting protocol that lets workers from different companies collaborate on shared projects, even if they were trained differently and use different internal jargon.

The industry needed a standard way for agents to discover each other, delegate tasks, and exchange results.

2

What is A2A?

The Agent-to-Agent (A2A) protocol is an open standard introduced by Google in April 2025 for inter-agent communication. It was developed in collaboration with over 50 technology companies and is designed to be framework-agnostic — agents built with any stack can interoperate.

A2A defines four core capabilities:

Discovery

Agents publish machine-readable Agent Cards describing their skills, endpoints, and authentication requirements. Other agents can discover them automatically.

Task Delegation

A client agent creates a Task and sends it to a remote agent. The task contains a message with the request, and the remote agent processes it.

Streaming

For long-running tasks, agents can stream progress updates via Server-Sent Events (SSE), letting the client agent monitor status in real time.

Artifact Exchange

Agents exchange structured data as Artifacts — files, JSON objects, images, or any content type. This goes beyond plain text to rich data payloads.

NOTE

A2A is transport-agnostic at its core but defines HTTP + JSON-RPC as the primary transport. Agents communicate over standard web protocols, making them compatible with existing infrastructure like load balancers, API gateways, and authentication systems.

3

Agent Cards

Every A2A-compliant agent publishes an Agent Card — a JSON document that describes who the agent is, what it can do, and how to interact with it. Agent Cards are hosted at a well-known URL: /.well-known/agent.json.

Think of an Agent Card as an API specification for an agent. It tells other agents (or orchestrators) everything they need to know to decide whether to delegate a task and how to format the request.

{
  "name": "Invoice Processor",
  "description": "Extracts structured data from invoice documents (PDF, images).",
  "url": "https://agents.example.com/invoice",
  "version": "1.0.0",
  "protocol": "a2a/1.0",
  "authentication": {
    "schemes": ["bearer"]
  },
  "skills": [
    {
      "id": "extract_invoice_data",
      "name": "Extract Invoice Data",
      "description": "Parse an invoice and return structured line items, totals, and vendor info.",
      "inputModes": ["application/pdf", "image/png", "image/jpeg"],
      "outputModes": ["application/json"]
    },
    {
      "id": "validate_invoice",
      "name": "Validate Invoice",
      "description": "Cross-check invoice data against purchase orders.",
      "inputModes": ["application/json"],
      "outputModes": ["application/json"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  }
}
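As a rough illustration of how a client might use a card like the one above, the sketch below filters skills by the content type they accept. The helper name `skills_accepting` is hypothetical, and the card is a trimmed copy of the example with the field names used in this module.

```python
def skills_accepting(card: dict, mime_type: str) -> list[str]:
    """Return the ids of skills whose inputModes include mime_type."""
    return [
        skill["id"]
        for skill in card.get("skills", [])
        if mime_type in skill.get("inputModes", [])
    ]


# Trimmed copy of the Invoice Processor card shown above.
card = {
    "name": "Invoice Processor",
    "skills": [
        {
            "id": "extract_invoice_data",
            "inputModes": ["application/pdf", "image/png", "image/jpeg"],
            "outputModes": ["application/json"],
        },
        {
            "id": "validate_invoice",
            "inputModes": ["application/json"],
            "outputModes": ["application/json"],
        },
    ],
}

print(skills_accepting(card, "application/pdf"))
```

A client holding a PDF would pick `extract_invoice_data`; one holding already-parsed JSON would pick `validate_invoice`.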

Discovering Agents

A client agent can discover available agents by fetching their Agent Cards. In a typical enterprise setup, a registry or directory service indexes available agents.

import httpx

async def discover_agent(base_url: str) -> dict:
    """Fetch and parse an Agent Card from a remote agent."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/.well-known/agent.json")
        response.raise_for_status()
        card = response.json()
        print(f"Discovered: {card['name']}")
        print(f"Skills: {[s['id'] for s in card['skills']]}")
        return card

async def find_agent_for_skill(registry_urls: list[str], skill_needed: str) -> dict:
    """Search a list of known agents for one that has a specific skill."""
    for url in registry_urls:
        try:
            card = await discover_agent(url)
            skill_ids = [s["id"] for s in card.get("skills", [])]
            if skill_needed in skill_ids:
                return card
        except httpx.HTTPError:
            continue
    raise ValueError(f"No agent found with skill: {skill_needed}")
TIP

In production, maintain a local cache of Agent Cards with a TTL (time-to-live). Re-fetching cards on every request adds latency. Refresh the cache periodically or when you get a version mismatch error.
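One way to sketch such a cache is shown below. The class name `AgentCardCache` and its parameters are assumptions made for illustration, not part of any A2A SDK. The fetch function is injected, so a coroutine such as `discover_agent` from above could be passed in.

```python
import time
from typing import Awaitable, Callable

Fetcher = Callable[[str], Awaitable[dict]]


class AgentCardCache:
    """Cache Agent Cards per base URL and re-fetch once the TTL expires."""

    def __init__(self, fetch: Fetcher, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        # base_url -> (time the card was fetched, card)
        self._entries: dict[str, tuple[float, dict]] = {}

    async def get(self, base_url: str) -> dict:
        """Return a cached card if it is still fresh, otherwise fetch it."""
        now = self._clock()
        entry = self._entries.get(base_url)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        card = await self._fetch(base_url)
        self._entries[base_url] = (now, card)
        return card

    def invalidate(self, base_url: str) -> None:
        """Drop a cached card, e.g. after a version mismatch error."""
        self._entries.pop(base_url, None)
```

Usage would look like `cache = AgentCardCache(discover_agent, ttl_seconds=600)` followed by `card = await cache.get(base_url)`. The injectable clock exists only to make expiry testable without sleeping.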

4

Task Lifecycle

Communication in A2A revolves around Tasks. A client agent creates a task, sends it to a remote agent, and the remote agent processes it through a well-defined lifecycle.

| State | Description | Transitions To |
|---|---|---|
| submitted | Task received by remote agent, queued for processing | working, failed |
| working | Agent is actively processing the task | completed, failed, input-required |
| input-required | Agent needs additional information from the client | working, failed |
| completed | Task finished successfully, artifacts available | (terminal) |
| failed | Task failed with an error message | (terminal) |
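The lifecycle table can be encoded as a small state machine so that a server rejects illegal transitions. This is a sketch of the table only, not of the full A2A specification. The names `ALLOWED_TRANSITIONS`, `is_terminal`, and `advance` are hypothetical, and the state in which the agent waits for the client is spelled `input-required`, the name used in the A2A specification.

```python
# Allowed transitions, copied from the lifecycle table.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "submitted": {"working", "failed"},
    "working": {"completed", "failed", "input-required"},
    "input-required": {"working", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}


def is_terminal(state: str) -> bool:
    """A state is terminal when no transition leads out of it."""
    return not ALLOWED_TRANSITIONS[state]


def advance(current: str, new: str) -> str:
    """Return the new state, or raise if the table does not allow the move."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")
    return new


# Walk one valid path through the lifecycle.
state = "submitted"
for next_state in ("working", "input-required", "working", "completed"):
    state = advance(state, next_state)
print(state, is_terminal(state))
```

Keeping the table in one dictionary means the polling client and the server can share a single definition of which states end a task.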

Creating and Sending a Task

import httpx, uuid

async def send_task(agent_url: str, skill_id: str, message: str,
                    auth_token: str) -> dict:
    """Send a task to a remote A2A agent."""
    task_id = str(uuid.uuid4())
    payload = {
        "jsonrpc": "2.0",
        "method": "tasks/send",
        "id": task_id,
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            },
            "metadata": {
                "skill_id": skill_id
            }
        }
    }
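    async with httpx.AsyncClient() as client:
        response = await client.post(
            agent_url,
            json=payload,
            headers={"Authorization": f"Bearer {auth_token}"}
        )
        # Surface HTTP-level failures (401, 500, ...) instead of parsing an error page
        response.raise_for_status()
        return response.json()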

# Example: delegate invoice processing to a remote agent
# result = await send_task(
#     "https://agents.example.com/invoice",
#     "extract_invoice_data",
#     "Extract all line items from the attached invoice.",
#     auth_token="your-token-here"
# )

Polling for Task Status

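import asyncio

import httpx

async def poll_task(agent_url: str, task_id: str, auth_token: str,
                    interval: float = 2.0) -> dict:
    """Poll a task until it completes, fails, or needs input from the client."""
    # Besides the terminal states, stop when the agent is waiting for client
    # input ("input-required" in the A2A specification); polling further
    # would loop forever because the task cannot progress on its own.
    stop_states = {"completed", "failed", "input-required"}
    async with httpx.AsyncClient() as client:
        while True:
            response = await client.post(agent_url, json={
                "jsonrpc": "2.0",
                "method": "tasks/get",
                "id": "poll-1",
                "params": {"id": task_id}
            }, headers={"Authorization": f"Bearer {auth_token}"})
            response.raise_for_status()

            body = response.json()
            if "error" in body:
                # JSON-RPC errors carry no "result" key
                raise RuntimeError(f"tasks/get failed: {body['error']}")
            result = body["result"]
            status = result["status"]["state"]
            print(f"Task {task_id}: {status}")

            if status in stop_states:
                return result
            await asyncio.sleep(interval)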
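Once a task has completed, the client still has to pull the payload out of the returned artifacts. The helper below is a minimal sketch that assumes the result shape used in this module's examples (an `artifacts` list whose `parts` carry `type` and `text` fields). The name `artifact_texts` is hypothetical.

```python
def artifact_texts(task_result: dict) -> list[str]:
    """Collect the text of every text part across all artifacts."""
    texts = []
    for artifact in task_result.get("artifacts", []):
        for part in artifact.get("parts", []):
            # Skip non-text parts (files, structured data) in this sketch.
            if part.get("type") == "text":
                texts.append(part.get("text", ""))
    return texts


# Example result in the shape assumed above.
example_result = {
    "id": "task-123",
    "status": {"state": "completed"},
    "artifacts": [
        {"parts": [{"type": "text", "text": "Total: 120.00 EUR"}]},
        {"parts": [{"type": "data", "data": {"lines": 3}},
                   {"type": "text", "text": "Vendor: ACME"}]},
    ],
}

print(artifact_texts(example_result))
```

Non-text parts are ignored here. A real client would dispatch on the part type and on the `outputModes` advertised in the Agent Card.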

Streaming with Server-Sent Events

For long-running tasks, A2A supports streaming updates via SSE. Instead of polling, the client subscribes to a stream and receives events as the agent works.

import json

import httpx

async def stream_task(agent_url: str, task_id: str, skill_id: str,
                      message: str, auth_token: str):
    """Subscribe to streaming updates from a remote agent."""
    payload = {
        "jsonrpc": "2.0",
        "method": "tasks/sendSubscribe",
        "id": task_id,
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            },
            "metadata": {
                "skill_id": skill_id
            }
        }
    }
    # timeout=None: httpx's default 5-second read timeout would otherwise
    # cut off a stream that stays idle between events.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", agent_url, json=payload,
                                  headers={"Authorization": f"Bearer {auth_token}"}) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                # SSE payload lines are prefixed with "data:"
                if line.startswith("data:"):
                    event_data = json.loads(line[5:].strip())
                    print(f"Event: {event_data}")
PITFALL

Not all A2A agents support streaming. Always check the capabilities.streaming field in the Agent Card before attempting to use tasks/sendSubscribe. Fall back to polling if streaming is unavailable.
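The capability check described above can be sketched as a small helper that picks the JSON-RPC method from the Agent Card. The function name `choose_send_method` is an assumption for illustration. The method names are the ones used in this module.

```python
def choose_send_method(card: dict) -> str:
    """Pick the streaming method only if the card advertises streaming."""
    streaming = card.get("capabilities", {}).get("streaming", False)
    return "tasks/sendSubscribe" if streaming else "tasks/send"


streaming_card = {"capabilities": {"streaming": True, "pushNotifications": False}}
polling_card = {"capabilities": {"streaming": False}}
bare_card = {"name": "No capabilities declared"}

for card in (streaming_card, polling_card, bare_card):
    print(choose_send_method(card))
```

A card that omits the `capabilities` block is treated as non-streaming, which is the conservative default: the client sends with `tasks/send` and then polls.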

5

A2A vs MCP

A2A and MCP are frequently confused because both are "protocols for AI agents." The distinction is straightforward: MCP connects agents to tools and data, while A2A connects agents to other agents. They operate at different layers and are designed to work together.

| Aspect | MCP | A2A |
|---|---|---|
| Purpose | Agent ↔ Tools & Data | Agent ↔ Agent |
| Originated by | Anthropic | Google |
| Transport | stdio / HTTP+SSE | HTTP + JSON-RPC |
| Discovery | Server capabilities list | Agent Cards at well-known URL |
| Interaction model | Tool calls (request/response) | Tasks with lifecycle states |
| Streaming | SSE for resources | SSE for task progress |
| Best for | Connecting to databases, APIs, files, services | Delegating work between autonomous agents |
| Opaque execution | No: client controls tool calls | Yes: remote agent decides how to execute |
NOTE

A key architectural difference: with MCP, the client agent decides which tools to call and when. With A2A, the client delegates a task and the remote agent autonomously decides how to accomplish it. A2A treats remote agents as opaque collaborators, not as tools to be controlled.

Using Both Together

In a production multi-agent system, a typical architecture uses both protocols:

# An orchestrator agent that uses both MCP and A2A
#
# MCP: Connect to local tools (database, file system, APIs)
# A2A: Delegate to remote specialist agents

# 1. Use MCP to read customer data from a database
customer_data = await mcp_client.call_tool("database_query", {
    "sql": "SELECT * FROM customers WHERE id = 42"
})

# 2. Use A2A to delegate analysis to a remote specialist agent
#    (auth_token is assumed to have been loaded from your configuration)
analysis_result = await send_task(
    agent_url="https://agents.analytics-team.com/customer-insights",
    skill_id="analyse_customer",
    message=f"Analyse this customer profile and suggest retention strategies: "
            f"{customer_data}",
    auth_token=auth_token
)

# 3. Use MCP to write the result back to a local system
await mcp_client.call_tool("database_insert", {
    "table": "customer_insights",
    "data": analysis_result
})
6

Implementing an A2A Server

To make your agent available to other agents via A2A, you need to implement an A2A-compliant server. The Google A2A SDK provides reference implementations, but here is a simplified example showing the core concepts.

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uuid

app = FastAPI()

# In-memory task store (use a database in production)
tasks = {}

@app.get("/.well-known/agent.json")
async def agent_card():
    """Serve the Agent Card for discovery."""
    return {
        "name": "Summary Agent",
        "description": "Summarises documents and articles.",
        "url": "https://my-agent.example.com",
        "version": "1.0.0",
        "protocol": "a2a/1.0",
        "skills": [{
            "id": "summarise",
            "name": "Summarise Text",
            "description": "Create a concise summary of any text input.",
            "inputModes": ["text/plain"],
            "outputModes": ["text/plain"]
        }],
        "capabilities": {"streaming": False}
    }

@app.post("/")
async def handle_jsonrpc(request: Request):
    """Handle A2A JSON-RPC requests."""
    body = await request.json()
    method = body.get("method")

    if method == "tasks/send":
        return await handle_task_send(body)
    elif method == "tasks/get":
        return await handle_task_get(body)
    else:
        return JSONResponse({"jsonrpc": "2.0", "error": {
            "code": -32601, "message": f"Unknown method: {method}"
        }, "id": body.get("id")})

async def handle_task_send(body: dict) -> JSONResponse:
    """Process a new task."""
    params = body["params"]
    task_id = params.get("id", str(uuid.uuid4()))
    message_text = params["message"]["parts"][0]["text"]

    # Process the task (in production, do this async with a queue).
    # AsyncAnthropic keeps the model call from blocking the server's event loop.
    import anthropic
    client = anthropic.AsyncAnthropic()
    response = await client.messages.create(
        model="claude-sonnet-4-20250514", max_tokens=1024,
        system="You create clear, concise summaries.",
        messages=[{"role": "user", "content": f"Summarise:\n{message_text}"}]
    )
    summary = response.content[0].text

    task_result = {
        "id": task_id,
        "status": {"state": "completed"},
        "artifacts": [{
            "parts": [{"type": "text", "text": summary}]
        }]
    }
    tasks[task_id] = task_result
    return JSONResponse({"jsonrpc": "2.0", "result": task_result, "id": body["id"]})

async def handle_task_get(body: dict) -> JSONResponse:
    """Return task status and results."""
    task_id = body["params"]["id"]
    if task_id in tasks:
        return JSONResponse({"jsonrpc": "2.0", "result": tasks[task_id], "id": body["id"]})
    return JSONResponse({"jsonrpc": "2.0", "error": {
        "code": -32602, "message": "Task not found"
    }, "id": body["id"]})
TIP

For production A2A servers, use the official a2a-python SDK from Google rather than building from scratch. It handles JSON-RPC parsing, SSE streaming, task state management, and authentication out of the box.
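If you do hand-roll a server while learning, it helps to centralise the JSON-RPC envelope handling that the example above repeats inline. The sketch below uses only the standard JSON-RPC 2.0 error codes. The helper names and the `SUPPORTED_METHODS` set are assumptions for this illustration and are not taken from any SDK.

```python
from typing import Any, Optional

SUPPORTED_METHODS = {"tasks/send", "tasks/get"}


def jsonrpc_result(request_id: Any, result: Any) -> dict:
    """Build a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "result": result, "id": request_id}


def jsonrpc_error(request_id: Any, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error envelope."""
    return {"jsonrpc": "2.0",
            "error": {"code": code, "message": message},
            "id": request_id}


def validate_request(body: Any) -> Optional[dict]:
    """Return an error envelope for a malformed request, or None if it is usable."""
    if not isinstance(body, dict) or body.get("jsonrpc") != "2.0":
        request_id = body.get("id") if isinstance(body, dict) else None
        return jsonrpc_error(request_id, -32600, "Invalid Request")
    request_id = body.get("id")
    method = body.get("method")
    if not isinstance(method, str) or method not in SUPPORTED_METHODS:
        return jsonrpc_error(request_id, -32601, f"Method not found: {method}")
    if not isinstance(body.get("params"), dict):
        return jsonrpc_error(request_id, -32602, "Invalid params")
    return None


print(validate_request({"jsonrpc": "2.0", "method": "tasks/cancel", "id": 1}))
```

A handler could call `validate_request(body)` first and return the envelope immediately when it is not `None`, which keeps malformed input from raising `KeyError` inside the task handlers.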

7

The Future

The combination of MCP and A2A creates the foundation for an open agent ecosystem. Agents built with any framework can discover each other, delegate tasks, and exchange results — just as web services communicate via REST APIs today.

Key trends to watch:

NOTE

A2A is still an evolving standard. As of early 2026, the core protocol is stable, but extensions for billing, trust, and advanced discovery are still being developed. Follow the official A2A repository for updates.

Up Next

Module 13 — Safety & Guardrails