模块12: A2A协议
理解Agent-to-Agent协议,实现跨平台的标准化智能体间通信与协作。
互操作性挑战
在模块9中,我们学习了MCP (Model Context Protocol)如何让智能体通过标准化接口连接工具和数据源。但当智能体需要与其他智能体通信时会怎样?如果你的研究智能体是用LangChain构建的,而写作智能体运行在自定义框架上,它们如何通信?
没有标准协议的话,每个智能体到智能体的集成都变成了自定义的、脆弱的点对点连接。如果你有5个智能体且它们都需要相互通信,那就是10个自定义集成。再加一个第6个智能体,你就需要15个。这无法扩展。
把MCP想象成让每个员工都能使用相同的办公工具(打印机、数据库、文件系统)。A2A则是通用语言和会议协议,让来自不同公司的员工可以在共享项目上协作——即使他们接受了不同的培训,使用不同的内部术语。
行业需要一种标准方式让智能体能够:
- 发现其他智能体能做什么
- 委派任务给最合适的智能体
- 追踪委派工作的进度
- 交换结构化的结果和工件
什么是A2A?
Agent-to-Agent (A2A) 协议是Google于2025年4月推出的开放标准,用于智能体间通信。它是与50多家技术公司合作开发的,设计为框架无关——使用任何技术栈构建的智能体都可以互操作。
A2A定义了四个核心能力:
发现
智能体发布机器可读的Agent Card,描述其技能、端点和认证要求。其他智能体可以自动发现它们。
任务委派
客户端智能体创建一个Task并发送给远程智能体。任务包含一条消息和请求,远程智能体处理它。
流式传输
对于长时间运行的任务,智能体可以通过Server-Sent Events (SSE)流式传输进度更新,让客户端智能体实时监控状态。
工件交换
智能体以Artifacts的形式交换结构化数据——文件、JSON对象、图像或任何内容类型。这超越了纯文本,支持丰富的数据载荷。
A2A在核心上是传输无关的,但将HTTP + JSON-RPC定义为主要传输方式。智能体通过标准Web协议通信,使其与现有基础设施(如负载均衡器、API网关和认证系统)兼容。
Agent Card
每个符合A2A标准的智能体都会发布一个Agent Card——一个JSON文档,描述智能体是谁、能做什么以及如何与之交互。Agent Card托管在一个已知的URL:/.well-known/agent.json。
将Agent Card视为智能体的API规范。它告诉其他智能体(或编排器)决定是否委派任务以及如何格式化请求所需的一切信息。
{
"name": "Invoice Processor",
"description": "Extracts structured data from invoice documents (PDF, images).",
"url": "https://agents.example.com/invoice",
"version": "1.0.0",
"protocol": "a2a/1.0",
"authentication": {
"schemes": ["bearer"]
},
"skills": [
{
"id": "extract_invoice_data",
"name": "Extract Invoice Data",
"description": "Parse an invoice and return structured line items, totals, and vendor info.",
"inputModes": ["application/pdf", "image/png", "image/jpeg"],
"outputModes": ["application/json"]
},
{
"id": "validate_invoice",
"name": "Validate Invoice",
"description": "Cross-check invoice data against purchase orders.",
"inputModes": ["application/json"],
"outputModes": ["application/json"]
}
],
"capabilities": {
"streaming": true,
"pushNotifications": false
}
}
发现智能体
客户端智能体可以通过获取Agent Card来发现可用的智能体。在典型的企业环境中,注册中心或目录服务会索引可用的智能体。
import httpx
async def discover_agent(base_url: str) -> dict:
"""Fetch and parse an Agent Card from a remote agent."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{base_url}/.well-known/agent.json")
response.raise_for_status()
card = response.json()
print(f"Discovered: {card['name']}")
print(f"Skills: {[s['id'] for s in card['skills']]}")
return card
async def find_agent_for_skill(registry_urls: list[str], skill_needed: str) -> dict:
"""Search a list of known agents for one that has a specific skill."""
for url in registry_urls:
try:
card = await discover_agent(url)
skill_ids = [s["id"] for s in card.get("skills", [])]
if skill_needed in skill_ids:
return card
except httpx.HTTPError:
continue
raise ValueError(f"No agent found with skill: {skill_needed}")
在生产环境中,维护一个带有TTL(生存时间)的Agent Card本地缓存。每次请求都重新获取卡片会增加延迟。定期刷新缓存,或在收到版本不匹配错误时刷新。
任务生命周期
A2A中的通信围绕Task展开。客户端智能体创建一个任务,将其发送给远程智能体,远程智能体通过定义良好的生命周期来处理它。
| 状态 | 描述 | 可转换为 |
|---|---|---|
submitted | 远程智能体已收到任务,排队等待处理 | working, failed |
working | 智能体正在积极处理任务 | completed, failed, input-needed |
input-needed | 智能体需要来自客户端的额外信息 | working, failed |
completed | 任务成功完成,工件可用 | (终态) |
failed | 任务失败,附带错误消息 | (终态) |
创建和发送任务
import httpx, uuid
async def send_task(agent_url: str, skill_id: str, message: str,
auth_token: str) -> dict:
"""Send a task to a remote A2A agent."""
task_id = str(uuid.uuid4())
payload = {
"jsonrpc": "2.0",
"method": "tasks/send",
"id": task_id,
"params": {
"id": task_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
},
"metadata": {
"skill_id": skill_id
}
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
agent_url,
json=payload,
headers={"Authorization": f"Bearer {auth_token}"}
)
return response.json()
# Example: delegate invoice processing to a remote agent
# result = await send_task(
# "https://agents.example.com/invoice",
# "extract_invoice_data",
# "Extract all line items from the attached invoice.",
# auth_token="your-token-here"
# )
轮询任务状态
import asyncio
async def poll_task(agent_url: str, task_id: str, auth_token: str,
interval: float = 2.0) -> dict:
"""Poll a task until it reaches a terminal state."""
terminal_states = {"completed", "failed"}
async with httpx.AsyncClient() as client:
while True:
response = await client.post(agent_url, json={
"jsonrpc": "2.0",
"method": "tasks/get",
"id": "poll-1",
"params": {"id": task_id}
}, headers={"Authorization": f"Bearer {auth_token}"})
result = response.json()["result"]
status = result["status"]["state"]
print(f"Task {task_id}: {status}")
if status in terminal_states:
return result
await asyncio.sleep(interval)
使用Server-Sent Events进行流式传输
对于长时间运行的任务,A2A支持通过SSE进行流式更新。客户端订阅一个流,在智能体工作时接收事件,而不是轮询。
import httpx
async def stream_task(agent_url: str, task_id: str, skill_id: str,
message: str, auth_token: str):
"""Subscribe to streaming updates from a remote agent."""
payload = {
"jsonrpc": "2.0",
"method": "tasks/sendSubscribe",
"id": task_id,
"params": {
"id": task_id,
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
async with httpx.AsyncClient() as client:
async with client.stream("POST", agent_url, json=payload,
headers={"Authorization": f"Bearer {auth_token}"}) as resp:
async for line in resp.aiter_lines():
if line.startswith("data:"):
event_data = json.loads(line[5:].strip())
print(f"Event: {event_data}")
并非所有A2A智能体都支持流式传输。在尝试使用 tasks/sendSubscribe 之前,务必检查Agent Card中的 capabilities.streaming 字段。如果流式传输不可用,请回退到轮询。
A2A vs MCP
A2A和MCP经常被混淆,因为两者都是"AI智能体的协议"。区别很简单:MCP将智能体连接到工具和数据,而A2A将智能体连接到其他智能体。它们在不同层面运作,设计为协同工作。
| 方面 | MCP | A2A |
|---|---|---|
| 目的 | 智能体 ↔ 工具与数据 | 智能体 ↔ 智能体 |
| 发起方 | Anthropic | |
| 传输层 | stdio / HTTP+SSE | HTTP + JSON-RPC |
| 发现 | 服务器能力列表 | Agent Card(已知URL) |
| 交互模型 | 工具调用(请求/响应) | 带生命周期状态的任务 |
| 流式传输 | SSE用于资源 | SSE用于任务进度 |
| 最佳用途 | 连接数据库、API、文件、服务 | 在自主智能体之间委派工作 |
| 不透明执行 | 否——客户端控制工具调用 | 是——远程智能体决定如何执行 |
一个关键的架构区别:使用MCP时,客户端智能体决定调用哪些工具以及何时调用。使用A2A时,客户端委派任务,远程智能体自主决定如何完成。A2A将远程智能体视为不透明的协作者,而非被控制的工具。
结合使用两者
在生产环境的多智能体系统中,典型架构同时使用两种协议:
# An orchestrator agent that uses both MCP and A2A
#
# MCP: Connect to local tools (database, file system, APIs)
# A2A: Delegate to remote specialist agents
# 1. Use MCP to read customer data from a database
customer_data = await mcp_client.call_tool("database_query", {
"sql": "SELECT * FROM customers WHERE id = 42"
})
# 2. Use A2A to delegate analysis to a remote specialist agent
analysis_result = await send_task(
agent_url="https://agents.analytics-team.com/customer-insights",
skill_id="analyse_customer",
message=f"Analyse this customer profile and suggest retention strategies: "
f"{customer_data}"
)
# 3. Use MCP to write the result back to a local system
await mcp_client.call_tool("database_insert", {
"table": "customer_insights",
"data": analysis_result
})
实现A2A服务器
要让你的智能体通过A2A对其他智能体可用,你需要实现一个符合A2A标准的服务器。Google A2A SDK提供了参考实现,下面是一个展示核心概念的简化示例。
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uuid
app = FastAPI()
# In-memory task store (use a database in production)
tasks = {}
@app.get("/.well-known/agent.json")
async def agent_card():
"""Serve the Agent Card for discovery."""
return {
"name": "Summary Agent",
"description": "Summarises documents and articles.",
"url": "https://my-agent.example.com",
"version": "1.0.0",
"protocol": "a2a/1.0",
"skills": [{
"id": "summarise",
"name": "Summarise Text",
"description": "Create a concise summary of any text input.",
"inputModes": ["text/plain"],
"outputModes": ["text/plain"]
}],
"capabilities": {"streaming": False}
}
@app.post("/")
async def handle_jsonrpc(request: Request):
"""Handle A2A JSON-RPC requests."""
body = await request.json()
method = body.get("method")
if method == "tasks/send":
return await handle_task_send(body)
elif method == "tasks/get":
return await handle_task_get(body)
else:
return JSONResponse({"jsonrpc": "2.0", "error": {
"code": -32601, "message": f"Unknown method: {method}"
}, "id": body.get("id")})
async def handle_task_send(body: dict) -> JSONResponse:
"""Process a new task."""
params = body["params"]
task_id = params.get("id", str(uuid.uuid4()))
message_text = params["message"]["parts"][0]["text"]
# Process the task (in production, do this async with a queue)
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4-20250514", max_tokens=1024,
system="You create clear, concise summaries.",
messages=[{"role": "user", "content": f"Summarise:\n{message_text}"}]
)
summary = response.content[0].text
task_result = {
"id": task_id,
"status": {"state": "completed"},
"artifacts": [{
"parts": [{"type": "text", "text": summary}]
}]
}
tasks[task_id] = task_result
return JSONResponse({"jsonrpc": "2.0", "result": task_result, "id": body["id"]})
async def handle_task_get(body: dict) -> JSONResponse:
"""Return task status and results."""
task_id = body["params"]["id"]
if task_id in tasks:
return JSONResponse({"jsonrpc": "2.0", "result": tasks[task_id], "id": body["id"]})
return JSONResponse({"jsonrpc": "2.0", "error": {
"code": -32602, "message": "Task not found"
}, "id": body["id"]})
对于生产级A2A服务器,建议使用Google官方的 a2a-python SDK,而不是从头构建。它开箱即用地处理了JSON-RPC解析、SSE流式传输、任务状态管理和认证。
未来
MCP和A2A的结合为开放的智能体生态系统奠定了基础。使用任何框架构建的智能体可以互相发现、委派任务和交换结果——就像今天的Web服务通过REST API通信一样。
值得关注的趋势:
- 智能体市场 — 组织发布符合A2A标准的智能体的目录,类似于API市场
- 跨组织协作 — 来自不同公司的智能体在共享工作流上合作(例如,供应链协调)
- 智能体身份和信任 — 可验证的智能体凭证和能力证明,确保你委派给值得信赖的智能体
- 标准化计费 — 智能体之间协商和支付服务费用的协议
A2A仍然是一个不断演进的标准。截至2026年初,核心协议已稳定,但计费、信任和高级发现的扩展仍在开发中。请关注A2A官方仓库获取更新。