How to Build A2A Agents with Python: A Production Guide to Google's Agent-to-Agent Protocol

Build production-ready A2A agents in Python with Google's Agent-to-Agent protocol. Covers Agent Cards, executors, streaming, multi-agent orchestration, and MCP integration with working code examples.

If you've been building AI agents with MCP (Model Context Protocol), you've already tackled the vertical problem — connecting agents to tools and data. But what happens when your agents need to talk to each other? That's exactly the gap the Agent-to-Agent (A2A) protocol fills.

Google introduced A2A back in April 2025, and honestly, the adoption has been wild. By early 2026, it's been contributed to the Linux Foundation, merged with IBM's Agent Communication Protocol, and picked up by Microsoft, AWS, and dozens of other organizations. While MCP standardizes how agents access capabilities (tools, APIs, databases), A2A standardizes how agents discover, communicate, and collaborate with each other.

So, let's walk through building production-ready A2A agents in Python — from defining Agent Cards to implementing streaming executors, wiring up multi-agent orchestration, and combining A2A with MCP for a complete agent communication stack.

What Is the A2A Protocol and Why Should You Care?

The Agent-to-Agent (A2A) protocol is an open communication standard for AI agents. Think of it as HTTP for agent-to-agent interactions.

Just as REST APIs standardized how web services talk to each other, A2A standardizes how AI agents discover capabilities, delegate tasks, and exchange results — regardless of which framework built them.

A2A vs MCP: Complementary, Not Competing

The relationship between A2A and MCP clicks once you think about directionality:

  • MCP (vertical): Agent ↔ Tools/Data — how a single agent connects to external capabilities
  • A2A (horizontal): Agent ↔ Agent — how multiple agents communicate and collaborate

Here's a concrete example. Imagine a retail scenario: an inventory agent uses MCP to query a product database. When stock runs low, it uses A2A to notify a procurement agent, which in turn uses A2A to negotiate with external supplier agents. Each agent uses MCP for its own tools while A2A handles the inter-agent coordination.

They're two halves of the same puzzle.

Core A2A Concepts

Before we start writing code, let's cover the five building blocks you need to know:

  • Agent Card: A JSON metadata document describing an agent's identity, capabilities, skills, endpoint URL, and authentication requirements. Served at /.well-known/agent-card.json
  • Agent Skill: A discrete capability an agent exposes (e.g., "currency conversion", "sentiment analysis")
  • Task: A stateful unit of work with a lifecycle (submitted → working → completed/failed/canceled)
  • Message: A single communication turn carrying a role (user or agent) and one or more Parts
  • Part: A content container holding text, binary data, a URL, or structured JSON
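To make these concepts concrete, here's a rough sketch of the JSON an agent might serve at /.well-known/agent-card.json, written as a Python dict for illustration. The field set is trimmed and the values are hypothetical; see the spec for the full schema.

```python
# Hypothetical, trimmed-down Agent Card as served at
# /.well-known/agent-card.json, shown as a Python dict for illustration
agent_card_json = {
    "name": "NLP Analysis Agent",
    "description": "Sentiment analysis and text summarization",
    "url": "http://localhost:8000/",
    "version": "1.0.0",
    "capabilities": {"streaming": True},
    "defaultInputModes": ["text/plain"],
    "defaultOutputModes": ["text/plain", "application/json"],
    "skills": [
        {
            "id": "sentiment_analysis",
            "name": "Sentiment Analysis",
            "tags": ["nlp", "sentiment"],
        }
    ],
}

# A client agent inspects the card to decide whether to delegate here
skill_ids = [skill["id"] for skill in agent_card_json["skills"]]
print(skill_ids)  # ['sentiment_analysis']
```

Everything a client needs to route work to this agent — identity, endpoint, capabilities, skills — lives in this one document.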

Setting Up Your A2A Development Environment

The official A2A Python SDK requires Python 3.10 or higher. Install it with the extras you need:

# Core SDK
pip install a2a-sdk

# With HTTP server support (Starlette/FastAPI)
pip install "a2a-sdk[http-server]"

# With all extras (HTTP, gRPC, telemetry, SQL stores)
pip install "a2a-sdk[all]"

# For development/testing
pip install httpx uvicorn

The SDK supports multiple transports out of the box, which is great if your infrastructure already leans toward one protocol or another:

Transport            Client  Server
JSON-RPC over HTTP   Yes     Yes
HTTP + REST          Yes     Yes
gRPC                 Yes     Yes

Step 1: Define Agent Skills and the Agent Card

Every A2A agent starts with an AgentCard — essentially the "business card" that lets other agents discover what you can do. You define skills first, then compose them into a card:

from a2a.types import AgentCard, AgentCapabilities, AgentSkill

# Define what this agent can do
sentiment_skill = AgentSkill(
    id="sentiment_analysis",
    name="Sentiment Analysis",
    description="Analyzes text sentiment and returns a score with explanation",
    tags=["nlp", "sentiment", "text-analysis"],
    examples=["Analyze the sentiment of this review", "Is this feedback positive or negative?"],
    inputModes=["text/plain"],
    outputModes=["application/json"],
)

summarization_skill = AgentSkill(
    id="text_summarization",
    name="Text Summarization",
    description="Produces concise summaries of long-form text",
    tags=["nlp", "summarization", "text"],
    examples=["Summarize this article", "Give me a brief overview"],
    inputModes=["text/plain"],
    outputModes=["text/plain"],
)

# Compose the Agent Card
agent_card = AgentCard(
    name="NLP Analysis Agent",
    description="A production NLP agent for sentiment analysis and text summarization",
    url="http://localhost:8000/",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True),
    defaultInputModes=["text/plain"],
    defaultOutputModes=["text/plain", "application/json"],
    skills=[sentiment_skill, summarization_skill],
)

A few design tips for production Agent Cards:

  • Be specific with skill descriptions: Other agents (and LLMs selecting agents) use these descriptions to decide whether to delegate a task to you. Vague descriptions lead to bad routing.
  • Include realistic examples: These help client agents format their requests correctly
  • Declare capabilities honestly: If you support streaming, set streaming=True. If you handle long-running tasks, advertise push notifications
  • Version your cards: Clients may cache Agent Cards, so version bumps signal capability changes

Step 2: Implement the Agent Executor

The AgentExecutor is where your agent's actual logic lives. It bridges the A2A protocol layer with your AI/ML code. You need to implement two methods: execute for handling requests and cancel for stopping in-flight tasks.

This is probably the most important piece of the whole setup, so let's walk through it carefully:

import json
from typing_extensions import override
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
from a2a.types import (
    Message,    # Message, Part, and TextPart are used in the
    Part,       # error-handling example later in this guide
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
    TextPart,
)


class NLPAgent:
    """Core NLP logic, independent of the A2A protocol."""

    async def analyze_sentiment(self, text: str) -> dict:
        # In production, call your LLM or ML model here
        # This example uses a simplified approach
        positive_words = {"good", "great", "excellent", "amazing", "love", "fantastic"}
        negative_words = {"bad", "terrible", "awful", "hate", "horrible", "poor"}
        words = set(text.lower().split())
        pos = len(words & positive_words)
        neg = len(words & negative_words)
        if pos > neg:
            return {"sentiment": "positive", "score": 0.8, "explanation": "Text contains positive indicators"}
        elif neg > pos:
            return {"sentiment": "negative", "score": 0.3, "explanation": "Text contains negative indicators"}
        return {"sentiment": "neutral", "score": 0.5, "explanation": "No strong sentiment detected"}

    async def summarize(self, text: str) -> str:
        # In production, call your LLM here
        sentences = text.split(". ")
        if len(sentences) <= 2:
            return text
        return ". ".join(sentences[:2]) + "."


class NLPAgentExecutor(AgentExecutor):
    """A2A executor that routes requests to the NLP agent."""

    def __init__(self):
        self.agent = NLPAgent()

    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        # Extract the user's input text
        user_input = context.get_user_input()
        if not user_input:
            await event_queue.enqueue_event(
                new_agent_text_message("No input provided. Send text for analysis.")
            )
            return

        # Route to the appropriate skill based on input
        input_lower = user_input.lower()
        if any(kw in input_lower for kw in ["sentiment", "feeling", "tone", "positive", "negative"]):
            result = await self.agent.analyze_sentiment(user_input)
            await event_queue.enqueue_event(
                new_agent_text_message(json.dumps(result, indent=2))
            )
        elif any(kw in input_lower for kw in ["summarize", "summary", "brief", "overview"]):
            result = await self.agent.summarize(user_input)
            await event_queue.enqueue_event(
                new_agent_text_message(result)
            )
        else:
            # Default to sentiment analysis
            result = await self.agent.analyze_sentiment(user_input)
            await event_queue.enqueue_event(
                new_agent_text_message(json.dumps(result, indent=2))
            )

    @override
    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        # Signal that the task was canceled
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                taskId=context.task_id,
                status=TaskStatus(state=TaskState.canceled),
                final=True,
            )
        )

The RequestContext carries all incoming request data (user message, task details, context ID), while the EventQueue is your outbound channel. You enqueue Message, Task, TaskStatusUpdateEvent, or TaskArtifactUpdateEvent objects, and the DefaultRequestHandler delivers them to the client.
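To see the shape of this producer/consumer flow without the SDK, here's a minimal stdlib-only mimic: an "executor" pushes events onto an asyncio.Queue while a "handler" drains it and would forward each event to the client. The names `fake_execute` and `fake_handler` are illustrative stand-ins, not SDK APIs.

```python
import asyncio


async def fake_execute(event_queue: asyncio.Queue) -> None:
    # Stand-in for AgentExecutor.execute(): emit events, then a terminal marker
    await event_queue.put({"kind": "status", "state": "working"})
    await event_queue.put({"kind": "message", "text": "analysis complete"})
    await event_queue.put(None)  # sentinel: no more events


async def fake_handler() -> list[dict]:
    # Stand-in for DefaultRequestHandler: run the executor, drain the queue
    queue: asyncio.Queue = asyncio.Queue()
    executor_task = asyncio.create_task(fake_execute(queue))
    delivered = []
    while (event := await queue.get()) is not None:
        delivered.append(event)  # the real handler streams this to the client
    await executor_task
    return delivered


events = asyncio.run(fake_handler())
print(events)
```

The real SDK adds task persistence, status bookkeeping, and transport plumbing on top, but the core pattern is this same queue handoff.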

Step 3: Wire Up the A2A Server

Now connect your executor to an HTTP server using the SDK's Starlette integration. This part is surprisingly straightforward:

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
import uvicorn

# Create the request handler with your executor and a task store
request_handler = DefaultRequestHandler(
    agent_executor=NLPAgentExecutor(),
    task_store=InMemoryTaskStore(),
)

# Build the Starlette application
app = A2AStarletteApplication(
    agent_card=agent_card,
    http_handler=request_handler,
)

if __name__ == "__main__":
    uvicorn.run(app.build(), host="0.0.0.0", port=8000)

Once running, your agent automatically serves its Agent Card at http://localhost:8000/.well-known/agent-card.json and accepts A2A requests at the root endpoint. That's it — you've got a working A2A agent.

For production, you'll want to swap InMemoryTaskStore for a persistent store:

# Install the PostgreSQL extra first:
#   pip install "a2a-sdk[postgresql]"

from a2a.server.tasks import PostgreSQLTaskStore

task_store = PostgreSQLTaskStore(
    connection_string="postgresql://user:pass@localhost:5432/a2a_tasks"
)

Step 4: Build an A2A Client

On the client side, discovering and interacting with remote agents follows a pretty clean pattern:

import asyncio
import httpx
from a2a.client import A2AClient
from a2a.types import MessageSendParams, SendMessageRequest
from uuid import uuid4


async def call_nlp_agent():
    async with httpx.AsyncClient() as httpx_client:
        # Discover the agent by fetching its Agent Card
        client = await A2AClient.get_client_from_agent_card_url(
            httpx_client,
            "http://localhost:8000/",
        )

        # Send a task
        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message={
                    "role": "user",
                    "parts": [{"kind": "text", "text": "Analyze the sentiment: This product is amazing and I love it!"}],
                    "messageId": str(uuid4()),
                }
            ),
        )

        # Synchronous request/response
        response = await client.send_message(request)
        print("Response:", response)

        # Streaming request
        stream_request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message={
                    "role": "user",
                    "parts": [{"kind": "text", "text": "Summarize: The A2A protocol enables agents to discover and communicate with each other. It uses Agent Cards for discovery. Tasks are the unit of work. Messages carry the actual content between agents."}],
                    "messageId": str(uuid4()),
                }
            ),
        )
        async for event in client.send_message_streaming(stream_request):
            print("Stream event:", event)


asyncio.run(call_nlp_agent())

The client workflow boils down to three steps: discover (fetch the Agent Card to understand capabilities), select (match skills to your task), and send (dispatch messages via send_message or send_message_streaming).

Step 5: Multi-Agent Orchestration

This is where A2A really shines: orchestrating multiple agents that work together. Here's a pattern where an orchestrator agent delegates to specialized sub-agents:

import asyncio
import httpx
from a2a.client import A2AClient
from a2a.types import MessageSendParams, SendMessageRequest
from uuid import uuid4


class AgentOrchestrator:
    """Discovers and delegates to specialized A2A agents."""

    def __init__(self):
        self.agents: dict[str, A2AClient] = {}
        # Keep one AsyncClient alive for the orchestrator's lifetime. The
        # A2AClient instances hold a reference to it, so closing it right
        # after discovery (e.g. with `async with`) would break every
        # later delegate_task() call.
        self.httpx_client = httpx.AsyncClient()

    async def discover_agents(self, agent_urls: list[str]):
        """Fetch Agent Cards and register available agents."""
        for url in agent_urls:
            try:
                client = await A2AClient.get_client_from_agent_card_url(
                    self.httpx_client, url
                )
                # Use the agent name from the card as the registry key
                card = client.agent_card
                self.agents[card.name] = client
                print(f"Discovered: {card.name} - {card.description}")
                for skill in card.skills:
                    print(f"  Skill: {skill.name} ({skill.id})")
            except Exception as e:
                print(f"Failed to discover agent at {url}: {e}")

    async def delegate_task(self, agent_name: str, task_text: str) -> str:
        """Send a task to a specific agent and return the result."""
        client = self.agents.get(agent_name)
        if not client:
            raise ValueError(f"Agent '{agent_name}' not found in registry")

        request = SendMessageRequest(
            id=str(uuid4()),
            params=MessageSendParams(
                message={
                    "role": "user",
                    "parts": [{"kind": "text", "text": task_text}],
                    "messageId": str(uuid4()),
                }
            ),
        )
        response = await client.send_message(request)
        return str(response.result)

    async def run_pipeline(self, text: str):
        """Run a multi-agent analysis pipeline."""
        print(f"\n--- Analyzing: {text[:80]}... ---\n")

        # Step 1: Get sentiment from the NLP agent
        sentiment = await self.delegate_task(
            "NLP Analysis Agent",
            f"Analyze the sentiment: {text}"
        )
        print(f"Sentiment result: {sentiment}")

        # Step 2: Get a summary from the same or different agent
        summary = await self.delegate_task(
            "NLP Analysis Agent",
            f"Summarize: {text}"
        )
        print(f"Summary result: {summary}")

        return {"sentiment": sentiment, "summary": summary}


async def main():
    orchestrator = AgentOrchestrator()
    await orchestrator.discover_agents([
        "http://localhost:8000/",  # NLP Agent
        "http://localhost:8001/",  # Another agent
    ])
    result = await orchestrator.run_pipeline(
        "The new A2A protocol is fantastic for building multi-agent systems. "
        "It provides standardized communication and discovery mechanisms. "
        "However, the documentation could be more detailed in some areas."
    )
    print(f"\nPipeline result: {result}")

asyncio.run(main())

This orchestrator pattern is framework-agnostic, and that's the core value proposition of the protocol: your sub-agents can be built with LangGraph, CrewAI, PydanticAI, or plain Python, as long as each one exposes an A2A-compliant endpoint.

Combining A2A with MCP: The Full Agent Stack

In practice, most production agents need both protocols. MCP gives each agent access to its tools, while A2A enables cross-agent collaboration. Here's what the layered architecture looks like:

┌───────────────────────────────────────┐
│          Orchestrator Agent           │
│  ┌─────────────────────────────────┐  │
│  │        A2A Client Layer         │  │
│  │ (discover, delegate, coordinate)│  │
│  └────────┬───────────────┬────────┘  │
└───────────┼───────────────┼───────────┘
            │ A2A           │ A2A
            ▼               ▼
┌─────────────────┐  ┌─────────────────┐
│  NLP Agent      │  │  Data Agent     │
│  ┌───────────┐  │  │  ┌───────────┐  │
│  │ MCP Layer │  │  │  │ MCP Layer │  │
│  │ - LLM API │  │  │  │ - Database│  │
│  │ - Vectors │  │  │  │ - S3 Files│  │
│  └───────────┘  │  │  └───────────┘  │
└─────────────────┘  └─────────────────┘

Each agent uses MCP internally to connect to its own tools (LLM APIs, databases, file systems), while the orchestrator uses A2A to coordinate between them. This separation is really nice because you can swap out agents independently, scale them separately, and even replace the LLM framework behind one agent without touching the others.

Production Considerations

Task Store Selection

The InMemoryTaskStore is fine for development but loses all state on restart. For production, pick a persistent backend:

  • PostgreSQL: Best for most production workloads — full ACID compliance and battle-tested
  • SQLite: Good for single-node deployments and edge cases
  • MySQL: Viable if your infrastructure already runs MySQL

Observability with OpenTelemetry

The SDK has built-in OpenTelemetry support for tracing A2A requests across agent boundaries:

pip install "a2a-sdk[telemetry]"

# Traces propagate automatically through A2A calls,
# giving you distributed tracing across your agent mesh

This plugs into the same observability stack you might already be using for LLM monitoring (Langfuse, Jaeger, Grafana), giving you end-to-end visibility into multi-agent workflows. In my experience, this kind of tracing becomes essential the moment you have more than two agents talking to each other.

Authentication and Security

Production A2A deployments should declare authentication requirements in the Agent Card. Here's what that looks like with OAuth2:

agent_card = AgentCard(
    name="Secure Agent",
    description="An authenticated agent",
    url="https://agents.example.com/nlp/",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True),
    skills=[sentiment_skill],
    securitySchemes={
        "oauth2": {
            "type": "oauth2",
            "flows": {
                "clientCredentials": {
                    "tokenUrl": "https://auth.example.com/token",
                    "scopes": {
                        "agent:read": "Read agent capabilities",
                        "agent:execute": "Execute agent tasks"
                    }
                }
            }
        }
    },
    security=[{"oauth2": ["agent:read", "agent:execute"]}],
    supports_authenticated_extended_card=True,
)

Agent Cards can also be digitally signed using JSON Web Signature (JWS) per RFC 7515. This lets clients verify that a card hasn't been tampered with and actually originates from the claimed provider — which matters a lot when you're discovering agents across organizational boundaries.
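To demystify the JWS format, here's a stdlib-only sketch that splits a compact JWS into its three base64url segments per RFC 7515. This only decodes the structure; actually verifying a signature requires a JOSE library (e.g. PyJWT or python-jose) plus the provider's public key, and `decode_jws_segments` is a hypothetical helper, not part of the A2A SDK.

```python
import base64
import json


def decode_jws_segments(compact_jws: str) -> tuple[dict, dict, bytes]:
    """Split a compact JWS (RFC 7515) into header, payload, and signature.

    Decoding only -- signature verification needs a real JOSE library
    and the signer's key.
    """
    def b64url_decode(segment: str) -> bytes:
        padding = "=" * (-len(segment) % 4)  # restore stripped padding
        return base64.urlsafe_b64decode(segment + padding)

    header_b64, payload_b64, signature_b64 = compact_jws.split(".")
    header = json.loads(b64url_decode(header_b64))
    payload = json.loads(b64url_decode(payload_b64))
    return header, payload, b64url_decode(signature_b64)


def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


# Build a toy token (fake signature) just to show the three-part structure
token = ".".join([
    b64url(json.dumps({"alg": "ES256"}).encode()),
    b64url(json.dumps({"name": "NLP Analysis Agent"}).encode()),
    b64url(b"fake-signature"),
])
header, payload, sig = decode_jws_segments(token)
print(header["alg"], payload["name"])  # ES256 NLP Analysis Agent
```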

Error Handling and Resilience

Don't skip this part. Build resilience into your executor with proper error handling:

from uuid import uuid4  # the remaining names come from the Step 2 imports


class ResilientAgentExecutor(AgentExecutor):
    """Executor that reports progress and turns failures into readable messages."""

    @override
    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        try:
            # Signal that work has started
            if context.current_task:
                await event_queue.enqueue_event(
                    TaskStatusUpdateEvent(
                        taskId=context.current_task.id,
                        status=TaskStatus(
                            state=TaskState.working,
                            message=Message(
                                role="agent",
                                parts=[Part(root=TextPart(text="Processing your request..."))],
                                messageId=str(uuid4()),
                            ),
                        ),
                        final=False,
                    )
                )

            result = await self._do_work(context)
            await event_queue.enqueue_event(new_agent_text_message(result))

        except TimeoutError:
            await event_queue.enqueue_event(
                new_agent_text_message("Request timed out. Please try again with a shorter input.")
            )
        except Exception as e:
            # Log the full traceback server-side; avoid leaking internals to callers
            await event_queue.enqueue_event(
                new_agent_text_message(f"An error occurred: {e}")
            )

    async def _do_work(self, context: RequestContext) -> str:
        # Replace this stub with your agent's real logic (LLM call, pipeline, etc.)
        return f"Processed: {context.get_user_input()}"

    # Implement cancel() as shown in Step 2

A2A Protocol Ecosystem in 2026

The A2A ecosystem has grown fast since its introduction. Here's where things stand right now:

  • Linux Foundation governance: A2A was contributed to the Linux Foundation, ensuring vendor-neutral stewardship
  • SDK support: Official SDKs for Python and JavaScript, with community implementations for Java (Spring AI), Go, and Rust
  • Framework integration: LangChain/LangGraph, Google ADK, Spring AI, and others provide native A2A support
  • Protocol version: The current spec is v0.3.0, with active development toward 1.0
  • IBM ACP merger: IBM's Agent Communication Protocol merged into A2A, consolidating inter-agent communication standards

The protocol is built on existing, proven standards — HTTP, SSE (Server-Sent Events), and JSON-RPC — so there's no proprietary transport to learn. If you can build a REST API, you can build an A2A agent.
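Because the wire format is plain JSON-RPC, you can inspect it directly. Here's roughly what a message/send request body looks like; treat the exact field names as illustrative of the v0.x spec rather than normative.

```python
import json

# Roughly what an A2A message/send call looks like on the wire: a plain
# JSON-RPC 2.0 body, POSTed to the agent's URL as application/json.
# Field names are illustrative of the v0.x spec drafts.
request_body = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "message/send",
    "params": {
        "message": {
            "role": "user",
            "parts": [{"kind": "text", "text": "Analyze the sentiment: great product!"}],
            "messageId": "msg-1",
        }
    },
}

# Any HTTP client can send this; no proprietary transport involved
print(json.dumps(request_body)[:60])
```

Everything the SDK's `SendMessageRequest` builds for you in Step 4 ultimately serializes down to a payload like this.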

Frequently Asked Questions

Is A2A a replacement for MCP?

No, and it's not meant to be. A2A and MCP solve different problems and are designed to work together. MCP handles vertical communication (agent to tools/data), while A2A handles horizontal communication (agent to agent). Most production systems end up using both. Google explicitly positioned A2A as complementary to MCP when they announced the protocol.

Can I use A2A with agents built on different frameworks?

Yes — that's literally the whole point. An agent built with LangGraph can communicate with an agent built using CrewAI, PydanticAI, or vanilla Python, as long as each exposes an A2A-compliant endpoint with an Agent Card. The protocol is framework-agnostic by design.

What Python version does the A2A SDK require?

The official a2a-sdk requires Python 3.10 or higher. Some sample repositories recommend Python 3.12+ for the best compatibility with all features. The SDK is async-first, built on asyncio and httpx.

How does A2A handle long-running tasks?

A2A supports three interaction patterns: synchronous request/response for quick tasks, SSE streaming for real-time incremental results, and push notifications where the server sends updates to a client-provided webhook. Tasks have a full lifecycle with states (submitted, working, completed, failed, canceled), so clients can poll or subscribe for updates on long-running operations.
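The lifecycle above can be sketched as a tiny state machine. This is a simplified illustration of the states listed in the answer; the actual spec defines additional states such as input-required.

```python
# Simplified sketch of the task lifecycle: which state transitions are legal.
# (The real spec also has states like input-required; omitted here.)
ALLOWED_TRANSITIONS = {
    "submitted": {"working", "canceled"},
    "working": {"completed", "failed", "canceled"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
    "canceled": set(),   # terminal
}


def advance(state: str, new_state: str) -> str:
    if new_state not in ALLOWED_TRANSITIONS[state]:
        raise ValueError(f"Illegal transition: {state} -> {new_state}")
    return new_state


state = advance("submitted", "working")
state = advance(state, "completed")
print(state)  # completed
```

Terminal states have no outgoing transitions, which is why clients can stop polling once a task reports completed, failed, or canceled.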

Is A2A production-ready in 2026?

The spec is at v0.3.0 with active development toward 1.0. The official Python SDK supports multiple transports (HTTP, gRPC), persistent task stores (PostgreSQL, MySQL, SQLite), and OpenTelemetry tracing. Organizations like Google, Microsoft, and IBM are already using it in production. That said, expect some API changes as the specification evolves — pin your SDK versions and keep an eye on the release notes.

About the Author

Editorial Team: our team of expert writers and editors.
Our team of expert writers and editors.