Ran Wei/ AI智能体/模块12
EN
AI智能体系列 — Ran Wei

模块12: A2A协议

理解Agent-to-Agent协议,实现跨平台的标准化智能体间通信与协作。

1

互操作性挑战

在模块9中,我们学习了MCP (Model Context Protocol)如何让智能体通过标准化接口连接工具和数据源。但当智能体需要与其他智能体通信时会怎样?如果你的研究智能体是用LangChain构建的,而写作智能体运行在自定义框架上,它们如何通信?

没有标准协议的话,每个智能体到智能体的集成都变成了自定义的、脆弱的点对点连接。如果你有5个智能体且它们都需要相互通信,那就是10个自定义集成。再加一个第6个智能体,你就需要15个。这无法扩展。

类比

把MCP想象成让每个员工都能使用相同的办公工具(打印机、数据库、文件系统)。A2A则是通用语言和会议协议,让来自不同公司的员工可以在共享项目上协作——即使他们接受了不同的培训,使用不同的内部术语。

行业需要一种标准方式让智能体能够:

2

什么是A2A?

Agent-to-Agent (A2A) 协议是Google于2025年4月推出的开放标准,用于智能体间通信。它是与50多家技术公司合作开发的,设计为框架无关——使用任何技术栈构建的智能体都可以互操作。

A2A定义了四个核心能力:

发现

智能体发布机器可读的Agent Card,描述其技能、端点和认证要求。其他智能体可以自动发现它们。

任务委派

客户端智能体创建一个Task并发送给远程智能体。任务包含一条消息和请求,远程智能体处理它。

流式传输

对于长时间运行的任务,智能体可以通过Server-Sent Events (SSE)流式传输进度更新,让客户端智能体实时监控状态。

工件交换

智能体以Artifacts的形式交换结构化数据——文件、JSON对象、图像或任何内容类型。这超越了纯文本,支持丰富的数据载荷。

注意

A2A在核心上是传输无关的,但将HTTP + JSON-RPC定义为主要传输方式。智能体通过标准Web协议通信,使其与现有基础设施(如负载均衡器、API网关和认证系统)兼容。

3

Agent Card

每个符合A2A标准的智能体都会发布一个Agent Card——一个JSON文档,描述智能体是谁、能做什么以及如何与之交互。Agent Card托管在一个已知的URL:/.well-known/agent.json

将Agent Card视为智能体的API规范。它告诉其他智能体(或编排器)决定是否委派任务以及如何格式化请求所需的一切信息。

{
  "name": "Invoice Processor",
  "description": "Extracts structured data from invoice documents (PDF, images).",
  "url": "https://agents.example.com/invoice",
  "version": "1.0.0",
  "protocol": "a2a/1.0",
  "authentication": {
    "schemes": ["bearer"]
  },
  "skills": [
    {
      "id": "extract_invoice_data",
      "name": "Extract Invoice Data",
      "description": "Parse an invoice and return structured line items, totals, and vendor info.",
      "inputModes": ["application/pdf", "image/png", "image/jpeg"],
      "outputModes": ["application/json"]
    },
    {
      "id": "validate_invoice",
      "name": "Validate Invoice",
      "description": "Cross-check invoice data against purchase orders.",
      "inputModes": ["application/json"],
      "outputModes": ["application/json"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  }
}

发现智能体

客户端智能体可以通过获取Agent Card来发现可用的智能体。在典型的企业环境中,注册中心或目录服务会索引可用的智能体。

import httpx

async def discover_agent(base_url: str) -> dict:
    """Fetch and parse an Agent Card from a remote agent."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{base_url}/.well-known/agent.json")
        response.raise_for_status()
        card = response.json()
        print(f"Discovered: {card['name']}")
        print(f"Skills: {[s['id'] for s in card['skills']]}")
        return card

async def find_agent_for_skill(registry_urls: list[str], skill_needed: str) -> dict:
    """Search a list of known agents for one that has a specific skill."""
    for url in registry_urls:
        try:
            card = await discover_agent(url)
            skill_ids = [s["id"] for s in card.get("skills", [])]
            if skill_needed in skill_ids:
                return card
        except httpx.HTTPError:
            continue
    raise ValueError(f"No agent found with skill: {skill_needed}")
提示

在生产环境中,维护一个带有TTL(生存时间)的Agent Card本地缓存。每次请求都重新获取卡片会增加延迟。定期刷新缓存,或在收到版本不匹配错误时刷新。

4

任务生命周期

A2A中的通信围绕Task展开。客户端智能体创建一个任务,将其发送给远程智能体,远程智能体通过定义良好的生命周期来处理它。

状态描述可转换为
submitted远程智能体已收到任务,排队等待处理working, failed
working智能体正在积极处理任务completed, failed, input-needed
input-needed智能体需要来自客户端的额外信息working, failed
completed任务成功完成,工件可用(终态)
failed任务失败,附带错误消息(终态)

创建和发送任务

import httpx, uuid

async def send_task(agent_url: str, skill_id: str, message: str,
                    auth_token: str) -> dict:
    """Send a task to a remote A2A agent."""
    task_id = str(uuid.uuid4())
    payload = {
        "jsonrpc": "2.0",
        "method": "tasks/send",
        "id": task_id,
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            },
            "metadata": {
                "skill_id": skill_id
            }
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            agent_url,
            json=payload,
            headers={"Authorization": f"Bearer {auth_token}"}
        )
        return response.json()

# Example: delegate invoice processing to a remote agent
# result = await send_task(
#     "https://agents.example.com/invoice",
#     "extract_invoice_data",
#     "Extract all line items from the attached invoice.",
#     auth_token="your-token-here"
# )

轮询任务状态

import asyncio

async def poll_task(agent_url: str, task_id: str, auth_token: str,
                    interval: float = 2.0) -> dict:
    """Poll a task until it reaches a terminal state."""
    terminal_states = {"completed", "failed"}
    async with httpx.AsyncClient() as client:
        while True:
            response = await client.post(agent_url, json={
                "jsonrpc": "2.0",
                "method": "tasks/get",
                "id": "poll-1",
                "params": {"id": task_id}
            }, headers={"Authorization": f"Bearer {auth_token}"})

            result = response.json()["result"]
            status = result["status"]["state"]
            print(f"Task {task_id}: {status}")

            if status in terminal_states:
                return result
            await asyncio.sleep(interval)

使用Server-Sent Events进行流式传输

对于长时间运行的任务,A2A支持通过SSE进行流式更新。客户端订阅一个流,在智能体工作时接收事件,而不是轮询。

import httpx

async def stream_task(agent_url: str, task_id: str, skill_id: str,
                      message: str, auth_token: str):
    """Subscribe to streaming updates from a remote agent."""
    payload = {
        "jsonrpc": "2.0",
        "method": "tasks/sendSubscribe",
        "id": task_id,
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
    }
    async with httpx.AsyncClient() as client:
        async with client.stream("POST", agent_url, json=payload,
                                  headers={"Authorization": f"Bearer {auth_token}"}) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data:"):
                    event_data = json.loads(line[5:].strip())
                    print(f"Event: {event_data}")
陷阱

并非所有A2A智能体都支持流式传输。在尝试使用 tasks/sendSubscribe 之前,务必检查Agent Card中的 capabilities.streaming 字段。如果流式传输不可用,请回退到轮询。

5

A2A vs MCP

A2A和MCP经常被混淆,因为两者都是"AI智能体的协议"。区别很简单:MCP将智能体连接到工具和数据,而A2A将智能体连接到其他智能体。它们在不同层面运作,设计为协同工作。

方面MCPA2A
目的智能体 ↔ 工具与数据智能体 ↔ 智能体
发起方AnthropicGoogle
传输层stdio / HTTP+SSEHTTP + JSON-RPC
发现服务器能力列表Agent Card(已知URL)
交互模型工具调用(请求/响应)带生命周期状态的任务
流式传输SSE用于资源SSE用于任务进度
最佳用途连接数据库、API、文件、服务在自主智能体之间委派工作
不透明执行否——客户端控制工具调用是——远程智能体决定如何执行
注意

一个关键的架构区别:使用MCP时,客户端智能体决定调用哪些工具以及何时调用。使用A2A时,客户端委派任务,远程智能体自主决定如何完成。A2A将远程智能体视为不透明的协作者,而非被控制的工具。

结合使用两者

在生产环境的多智能体系统中,典型架构同时使用两种协议:

# An orchestrator agent that uses both MCP and A2A
#
# MCP: Connect to local tools (database, file system, APIs)
# A2A: Delegate to remote specialist agents

# 1. Use MCP to read customer data from a database
customer_data = await mcp_client.call_tool("database_query", {
    "sql": "SELECT * FROM customers WHERE id = 42"
})

# 2. Use A2A to delegate analysis to a remote specialist agent
analysis_result = await send_task(
    agent_url="https://agents.analytics-team.com/customer-insights",
    skill_id="analyse_customer",
    message=f"Analyse this customer profile and suggest retention strategies: "
            f"{customer_data}"
)

# 3. Use MCP to write the result back to a local system
await mcp_client.call_tool("database_insert", {
    "table": "customer_insights",
    "data": analysis_result
})
6

实现A2A服务器

要让你的智能体通过A2A对其他智能体可用,你需要实现一个符合A2A标准的服务器。Google A2A SDK提供了参考实现,下面是一个展示核心概念的简化示例。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uuid

app = FastAPI()

# In-memory task store (use a database in production)
tasks = {}

@app.get("/.well-known/agent.json")
async def agent_card():
    """Serve the Agent Card for discovery."""
    return {
        "name": "Summary Agent",
        "description": "Summarises documents and articles.",
        "url": "https://my-agent.example.com",
        "version": "1.0.0",
        "protocol": "a2a/1.0",
        "skills": [{
            "id": "summarise",
            "name": "Summarise Text",
            "description": "Create a concise summary of any text input.",
            "inputModes": ["text/plain"],
            "outputModes": ["text/plain"]
        }],
        "capabilities": {"streaming": False}
    }

@app.post("/")
async def handle_jsonrpc(request: Request):
    """Handle A2A JSON-RPC requests."""
    body = await request.json()
    method = body.get("method")

    if method == "tasks/send":
        return await handle_task_send(body)
    elif method == "tasks/get":
        return await handle_task_get(body)
    else:
        return JSONResponse({"jsonrpc": "2.0", "error": {
            "code": -32601, "message": f"Unknown method: {method}"
        }, "id": body.get("id")})

async def handle_task_send(body: dict) -> JSONResponse:
    """Process a new task."""
    params = body["params"]
    task_id = params.get("id", str(uuid.uuid4()))
    message_text = params["message"]["parts"][0]["text"]

    # Process the task (in production, do this async with a queue)
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-20250514", max_tokens=1024,
        system="You create clear, concise summaries.",
        messages=[{"role": "user", "content": f"Summarise:\n{message_text}"}]
    )
    summary = response.content[0].text

    task_result = {
        "id": task_id,
        "status": {"state": "completed"},
        "artifacts": [{
            "parts": [{"type": "text", "text": summary}]
        }]
    }
    tasks[task_id] = task_result
    return JSONResponse({"jsonrpc": "2.0", "result": task_result, "id": body["id"]})

async def handle_task_get(body: dict) -> JSONResponse:
    """Return task status and results."""
    task_id = body["params"]["id"]
    if task_id in tasks:
        return JSONResponse({"jsonrpc": "2.0", "result": tasks[task_id], "id": body["id"]})
    return JSONResponse({"jsonrpc": "2.0", "error": {
        "code": -32602, "message": "Task not found"
    }, "id": body["id"]})
提示

对于生产级A2A服务器,建议使用Google官方的 a2a-python SDK,而不是从头构建。它开箱即用地处理了JSON-RPC解析、SSE流式传输、任务状态管理和认证。

7

未来

MCP和A2A的结合为开放的智能体生态系统奠定了基础。使用任何框架构建的智能体可以互相发现、委派任务和交换结果——就像今天的Web服务通过REST API通信一样。

值得关注的趋势:

注意

A2A仍然是一个不断演进的标准。截至2026年初,核心协议已稳定,但计费、信任和高级发现的扩展仍在开发中。请关注A2A官方仓库获取更新。

下一模块

模块13 — 安全与护栏