The Agent2Agent (A2A) Protocol, developed by Google, is an open standard that lets AI agents communicate and collaborate across platforms and frameworks, regardless of the underlying technologies they use.
Supported in Python only.

Prerequisites

  • BeeAI Framework installed with pip install beeai-framework
  • BeeAI Framework extension for A2A installed with pip install 'beeai-framework[a2a]'

A2A Agent (Client)

A2AAgent lets you easily connect with external agents using the A2A protocol.
import asyncio
import sys
import traceback

from beeai_framework.adapters.a2a.agents import A2AAgent
from beeai_framework.errors import FrameworkError
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory
from examples.helpers.io import ConsoleReader


async def main() -> None:
    """Chat with a remote A2A agent from the console.

    Every prompt yielded by the ``ConsoleReader`` is sent to the agent at
    ``http://127.0.0.1:9999``. Payloads of "update" events are echoed as debug
    lines, and the text of the awaited result is printed afterwards.
    """
    console = ConsoleReader()
    remote_agent = A2AAgent(url="http://127.0.0.1:9999", memory=UnconstrainedMemory())

    def show_update(data, event):
        # Echo the payload of each "update" event as a debug line.
        return console.write("Agent 🤖 (debug) : ", data)

    for user_prompt in console:
        # Start the run, subscribe to its "update" events, then await the result.
        run = remote_agent.run(user_prompt)
        response = await run.on("update", show_update)

        console.write("Agent 🤖 : ", response.result.text)


if __name__ == "__main__":
    # Script entry point: run the async client to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback, then exit passing the framework's
        # explanation of the error to sys.exit.
        traceback.print_exc()
        sys.exit(err.explain())

A2A Server

A2AServer lets you expose agents built in the BeeAI framework via the A2A protocol.
from beeai_framework.adapters.a2a import A2AServer, A2AServerConfig
from beeai_framework.agents.experimental import RequirementAgent
from beeai_framework.backend import ChatModel
from beeai_framework.memory import UnconstrainedMemory
from beeai_framework.tools.search.duckduckgo import DuckDuckGoSearchTool
from beeai_framework.tools.weather import OpenMeteoTool


def main() -> None:
    """Expose a BeeAI ``RequirementAgent`` through an A2A server on port 9999.

    The agent uses the ``ollama:granite3.3:8b`` chat model, the DuckDuckGo
    search and OpenMeteo weather tools, and unconstrained memory.
    """
    agent = RequirementAgent(
        llm=ChatModel.from_name("ollama:granite3.3:8b"),
        tools=[DuckDuckGoSearchTool(), OpenMeteoTool()],
        memory=UnconstrainedMemory(),
    )

    server = A2AServer(config=A2AServerConfig(port=9999))

    # Register the agent with the A2A server and run the HTTP server.
    # No factory method is specified for the RequirementAgent here; the
    # A2AServer is expected to have one registered for it already.
    server.register(agent).serve()


# Start the A2A server only when this file is executed as a script.
if __name__ == "__main__":
    main()