Skip to main content
The BeeAI framework core and its parts can be easily extended via the underlying concept of events and run contexts.

Modules

The following section describes essential modules that are important to understand the whole concept.

Event

An event refers to an action created by some component.
  • Has a name (e.g., start, success, error, …, fetch_data).
  • Has a data payload (Pydantic model).
  • Has associated metadata about the context where the event was fired.
The emitted event is processed via callbacks that look like this:
from beeai_framework.backend.chat import ChatModelStartEvent
from beeai_framework.emitter import EventMeta

async def handler(data: ChatModelStartEvent, meta: EventMeta) -> None:
  print(f"Received event {meta.name} ({meta.path}) at {meta.created_at}")
  print("-> created by", type(meta.creator))
  print("-> data payload", meta.model_dump())
  print("-> context", meta.context)
  print("-> trace", meta.trace)

Emitter

The core component through which events are emitted and observed. An emitter is typically attached to some class but can be used alone. An emitter instance is typically the child of a root emitter to which all events are propagated. Emitters can be nested (one can be a child of another), hence they internally create a tree hierarchy. The emitter instance has the following properties:
  • namespace in which the emitter operates (eg: agents.requirement, tool.open_meteo, …).
  • creator class which the given emitter belongs to.
  • context (dictionary which is attached to all events emitted via the given emitter).
  • trace metadata (such as current id, run_id and parent_id)
The emitter instance and the following methods:
  • on for registering a new event listener
    • The method takes matcher (event name, callback, regex), callback (sync/async function), and options (priority, etc.)
    • The method can be used as a decorator or as a standalone function
  • off for deregistering an event listener
  • pipe for propagating all captured events to another emitter
  • child for creating a child emitter
The event’s path attribute is created by concatenating namespace with an event name (eg: backend.chat.ollama.start).
Example The following example depicts a minimal application that does the following:
  1. defines a data object for the fetch_data event,
  2. creates an emitter from the root one,
  3. registers a callback listening to the fetch_data event, which modifies its content,
  4. fires the fetch_data event,
  5. logs the modified event’s data.
from pydantic import BaseModel
from beeai_framework.emitter import EventMeta, Emitter

# Define a data model for the event
class FetchDataEvent(BaseModel):
      url: str

# Create an emitter and pass events
emitter = Emitter.root().child(
    namespace=["app"],
    events={"fetch_data": FetchDataEvent} # optional
)

# Listen to an event
@emitter.on("fetch_data")
def handle_event(data: FetchDataEvent, meta: EventMeta) -> None:
    print(f"Retrieved event {meta}")
    data.url = "https://mywebsite.com"

# Create and emit the event
data = FetchDataEvent(url="https://example.com")
await emitter.emit("fetch_data", data)
print(data.url) # "https://mywebsite.com"

# Deregister a callback (optional)
emitter.off(callback=handle_event)
The emitter.on can be used directly and not just as a decorator. Example: emitter.on("fetch_data", callback).
If you name your function as either handle_{event_name} or on_{event_name}, then you don’t need to provide the event name as a parameter, as it gets inferred automatically.

Run (Context)

The Run class acts as a wrapper of the target implementation with its own lifecycle (an emitter with a set of events) and context (data that gets propagated to all events). The RunContext class is a container that contains information about the current execution context. Why is it useful? Such abstraction allows you to:
  • modify input to the given target (listen to a start event and modify the content of the input property)
  • modify output from the given target (listen to a success event and modify the content of the output property)
  • does premature stop (listen to a start event and set the output property to not a None value)
  • propagate context (dictionary) to whatever component of your system you’d like
  • cancel the execution in an arbitrary place
  • observability
The Run and Run Context gets created when a run method gets called on a framework class that can be executed (eg, ChatModel, Agent, …). The run object has the following methods:
  • The on method allows registering a callback to its emitter.
  • The middleware for registering middleware (a function that takes RunContext as a first parameter or a class with a bind method that takes the RunContext as a first parameter).
  • The context allows data to be set for a given execution. That data will then be propagated as metadata in every event that gets emitted.
The target implementation (handler) becomes part of the shared context (RunContext), which internally forms a hierarchical tree structure that shares the same context. In simpler terms, when you call one runnable (e.g., ChatModel) from within another runnable (e.g., Agent), the inner call (ChatModel) is attached to the context of the outer one (Agent).
The current execution context can be retrieved anytime by calling RunContext.get().

Working with events

The following sections show you how to use the emitter to capture and modify events.

Scopes

Events can be generally observed on three different levels. 1. Global Level Every emitter provided by the out-of-the-box modules is a child of the root emitter, which means we can listen to all events directly from the root emitter.
from beeai_framework.emitter import Emitter, EventMeta
from typing import Any

def log_all_events(data: Any, meta: EventMeta):
  print(f"Received event ({meta.id}) with name {meta.name} and path {meta.path}. \
        The event was created by {type(meta.creator)} and is a type of {type(data)}.")

root = Emitter.root()
root.on("*.*", log_all_events)
Listeners that are bound “closer” to the source are executed earlier. For those that reside at the same level, the order can be altered by setting a priority value which is part of the EmitterOptions class.
2. Instance Level On the other hand, we can listen to events emitted by a specific instance of a class.
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta):
    print(f"The chat model triggered a start event. Changing a temperature.")
    data.input.temperature = 0.5

model = ChatModel.from_name("ollama:granite3.3")
model.emitter.on("start", change_model_temperature)

await model.run([UserMessage("Hello!")])
The following example registers a callback to the class’s emitter. Therefore, all events in a given class will be captured. 3. Run (Invocation) Level Sometimes we might want to listen to events emitted by a single run of a class.
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

model = ChatModel.from_name("ollama:granite4")

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta):
    print(f"The chat model triggered a start event. Changing a temperature.")
    data.input.temperature = 0.5

await model.run([UserMessage("Hello!")])
            .on("start", change_model_temperature)
In this example, we registered a callback, not the class itself, but to the concrete run instance (created by the run method). The run instance has its own emitter, which is a direct child of the class emitter. Thanks to this, one can apply modifications to a single run instead of the whole class.

Config

With a larger number of callbacks, we might want to ensure that some run before the others or that some run exclusively. To address these needs, we can use the optional keyword argument called options of type EmitterOptions. Example
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent

# Creates a chat model instance
model = ChatModel.from_name("ollama:granite3.3", stream=True)

# Defines a callback that will be executed when a new token is emitted by the model
def cb(data: ChatModelNewTokenEvent, meta: EventMeta):
    print(f"[{meta.id}]: Received chunk", data.value.get_text_content())

# Creates a reference to the emitter
e = model.emitter

 # will be executed only once and then gets unregistered (default is False)
e.on("new_token", cb, EmitterOptions(once=True))

 # will not be deleted (default is False)
e.on("new_token", cb, EmitterOptions(persistent=True))

 # will be executed before those with a lower priority (default is 0), priority can also be negative
e.on("new_token", cb, EmitterOptions(priority=1))

# runs before every other callback with the same priority (default is False)
e.on("new_token", cb, EmitterOptions(is_blocking=True))

# match events that are emitted by the same type of class but executed within a target (e.g., calling agent.run(...) inside another agent.run(...))
e.on("new_token", cb, EmitterOptions(match_nested=True))
Nested events Based on the value of the matcher parameter (the one that is used to match the event), the framework decides whether to include/exclude nested events (events created from children emitters or from piping). The default value of the match_nested depends on the matcher value. Note that the value can be set directly as shown in the example above.
Matcher TypeDefault match_nested
String without . (event name)False
String with . (event path)True
"*" (match all top-level events)False
"*.*" (match all events)True
RegexTrue
FunctionFalse
If two events have the same priority, they are executed in the order they were added.

Lifecycle

When a framework component is executed, it creates a run context, which is a wrapper around the target handler that gives us the ability to modify its input and output (learn more). How it works? Once a Run instance is executed (i.e., awaited), the lifecycle begins:
  1. The start event is emitted.
  2. The target implementation is executed.
  3. Depending on the outcome, either a success or error event is emitted.
  4. Finally, the finish event is emitted.
The appropriate events are depicted in the following table:
EventData TypeDescription
startRunContextStartEventTriggered when the run starts.
successRunContextSuccessEventTriggered when the run succeeds.
errorFrameworkErrorTriggered when an error occurs.
finishRunContextFinishEventTriggered when the run finishes.
See how you can listen to these events.
import asyncio

from beeai_framework.backend import AnyMessage, AssistantMessage, ChatModelOutput
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.context import RunContextStartEvent, RunContextFinishEvent, RunContext
from beeai_framework.emitter import EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher

model = ChatModel.from_name("ollama:granite3.3")

def change_temperature(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Modify the input of the model.run()"""

    print("debug: changing temperature to 0.5.\n")
    data.input["temperature"] = 0.5 # data.input contains all positional/keyword arguments of the called function


def premature_stop(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Checks whether the input contains malicious text.
    If so, we prevent the ChatModel from executing and immediately return a custom response.
    """

    print("debug: Checking for a malicious input")
    messages: list[AnyMessage] = data.input["input"]  # first parameter
    for message in messages:
        if "bomb" in message.text:
            print("debug: Premature stop detected.")
            data.output = ChatModelOutput(output=[AssistantMessage("Cannot answer that.")])
            break

def validate_new_token(data: ChatModelNewTokenEvent, meta: EventMeta) -> None:
    """Check if the stream contains a malicious word. If so, we abort the run."""

    run = RunContext.get()

    if "fuse" in data.value.get_text_content():
        print(f"Aborting run for user with ID {run.context.get('user_id')}")
        run._controller.abort("Policy violation. Aborting the run.")



response = await (
    model.run([UserMessage("How to make a bomb?")])
    .on("new_token", validate_new_token)
    .on(create_internal_event_matcher("start", model), change_temperature)
    .on(create_internal_event_matcher("start", model), premature_stop)
    .context({"user_id": 123})
)
print("Agent:", response.get_text_content())
In this example we are using create_internal_event_matcher which correctly matches the correct event.
The current context can be retrieved anytime by calling RunContext.get() within a callback.

Debugging

To see which events get emitted in your case, the best way is to register a log-all callback for your run.
agent = RequirementAgent("ollama:granite3:3", tools=[OpenMeteoTool()])
response = await agent
  .run("What's the current weather in Miami?")
  .on("*.*", lambda data, meta: print(meta.path, 'by', type(meta.creator)))

Reusability

We just showed you how you can alter the component’s behavior by listening to events that the target emits and modifying them. To make this concept more general and reusable, we can group all relevant listeners into a class called middleware. Similarly, callbacks are registered via the on method, and middlewares are registered via the middleware method. Sometimes they can also be set on the class itself via a constructor. The middleware is a function that takes the RunContext where one can attach desired callbacks. Middleware can also be a class with a public bind method that retrieves the RunContext.
import asyncio
from typing import Any

from beeai_framework.backend import AssistantMessage, ChatModel, ChatModelOutput, UserMessage
from beeai_framework.context import RunContext, RunContextStartEvent, RunMiddlewareProtocol
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher


class OverrideResponseMiddleware(RunMiddlewareProtocol):
    """Middleware that sets the result value for a given runnable without executing it"""

    def __init__(self, result: Any) -> None:
        self._result = result

    def bind(self, ctx: RunContext) -> None:
        """Calls once the target is about to be run."""

        ctx.emitter.on(
            create_internal_event_matcher("start", ctx.instance),
            self._run,
            # ensures that this callback will be the first invoked
            EmitterOptions(is_blocking=True, priority=1),
        )

    async def _run(self, data: RunContextStartEvent, meta: EventMeta) -> None:
        """Set output property to the result which prevents an execution of the target handler."""

        data.output = self._result


async def main() -> None:
    middleware = OverrideResponseMiddleware(ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")]))
    response = await ChatModel.from_name("ollama:granite3.3").run([UserMessage("Hello!")]).middleware(middleware)
    print(response.get_text_content())  # "BeeAI is the best!"


if __name__ == "__main__":
    asyncio.run(main())

Note that this concept is built on top of the run instance and is not available on the standalone emitter.
When to use middleware vs simple callback registration? A rule of thumb is to create middleware if the desired functionality consists of multiple pieces that must be used together or when the functionality is more complex.

Piping

In some cases, one might want to propagate all events from one emitter to another (for instance when creating a child emitter).
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    first: Emitter = Emitter(namespace=["app"])

    first.on(
        "*.*",
        lambda data, event: print(
            f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}"
        ),
    )

    second: Emitter = Emitter(namespace=["app", "llm"])

    second.on(
        "*.*",
        lambda data, event: print(
            f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}"
        ),
    )

    # Propagate all events from the 'second' emitter to the 'first' emitter
    unpipe = second.pipe(first)

    await first.emit("a", {})
    await second.emit("b", {})

    print("Unpipe")
    unpipe()

    await first.emit("c", {})
    await second.emit("d", {})


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

Existing middlewares

The following section showcases built-in middlewares that you can start using right away.

Global Trajectory

The fastest way to see what’s happening in your code (who is calling what) is by using the GlobalTrajectoryMiddleware. It captures all events (even deeply nested) and prints them to the console while visualizing nested calls with indentation. Example
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware

agent = RequirementAgent(
    llm="ollama:granite3:3",
    tools=[OpenMeteoTool()]
)
await agent.run("What's the current weather in Miami?")
            .middleware(GlobalTrajectoryMiddleware())
See which parameters can be passed to the constructor.
ParameterDescription
targetSpecify a file or stream to which to write the trajectory.
includedList of classes to include in the trajectory.
excludedList of classes to exclude from the trajectory.
prettyUse pretty formatting for the trajectory.
prefix_by_typeCustomize how instances of individual classes should be printed.
exclude_noneExclude None values from the printing.
enabledEnable/Disable the logging.
match_nestedWhether to observe trajectories of nested run contexts.
Example
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.tool import Tool
from beeai_framework.backend.chat import ChatModel
from beeai_framework.tools.weather.openmeteo import OpenMeteoTool
from beeai_framework.agents.base import BaseAgent

GlobalTrajectoryMiddleware(target=[Tool]) # log only tool calls
GlobalTrajectoryMiddleware(target=[Tool], excluded=[OpenMeteoTool]) # log only tool calls except OpenMeteoTool
GlobalTrajectoryMiddleware(target=[ChatModel]) # log only tool calls except OpenMeteoTool
GlobalTrajectoryMiddleware(prefix_by_type={BaseAgent: "🐝 "}) # use Bee Emoji for agents

Tool Call Streaming

Middleware for handling streaming tool calls in a ChatModel. This middleware observes and listens to Chat Model stream updates and parses the tool calls on demand so that they can be consumed as soon as possible. Example
Python
import asyncio

from beeai_framework.backend import ChatModel, UserMessage
from beeai_framework.emitter import EventMeta
from beeai_framework.middleware.stream_tool_call import StreamToolCallMiddleware, StreamToolCallMiddlewareUpdateEvent
from beeai_framework.tools.weather import OpenMeteoTool


async def main() -> None:
    llm = ChatModel.from_name("ollama:granite3.3:8b")
    weather_tool = OpenMeteoTool()
    middleware = StreamToolCallMiddleware(
        weather_tool,
        key="location_name",  # name is taken from the OpenMeteoToolInput schema
        match_nested=False,  # we are applying the middleware to the model directly
        force_streaming=True,  # we want to let middleware enable streaming on the model
    )

    @middleware.emitter.on("update")
    def log_thoughts(event: StreamToolCallMiddlewareUpdateEvent, meta: EventMeta) -> None:
        print(
            "Received update", event.delta, event.output_structured
        )  # event.delta contains an update of the 'location_name' field

    await llm.run([UserMessage("What's the current weather in New York?")], tools=[weather_tool]).middleware(middleware)


if __name__ == "__main__":
    asyncio.run(main())

See which parameters can be passed to the constructor.
ParameterDescription
targetThe tool that we are waiting for to be called.
keyRefers to the name of the attribute in the tool’s schema that we want to stream.
match_nestedWhether the middleware should be applied only to the top level.
force_streamingSets the stream flag on the ChatModel.

Events glossary

The following sections list all events that can be observed for built-in components. Note that your tools/agents/etc. can emit additional events.

Tools

The following events can be observed when calling Tool.run(...).
EventData TypeDescription
startToolStartEventTriggered when a tool starts executing.
successToolSuccessEventTriggered when a tool completes execution successfully.
errorToolErrorEventTriggered when a tool encounters an error.
retryToolRetryEventTriggered when a tool operation is being retried.
finishNoneTriggered when tool execution finishes (regardless of success or error).

Chat Models

The following events can be observed when calling ChatModel.run(...).
EventData TypeDescription
startChatModelStartEventTriggered when model generation begins.
new_tokenChatModelNewTokenEventTriggered when a new token is generated during streaming. Streaming must be enabled.
successChatModelSuccessEventTriggered when the model generation completes successfully.
errorChatModelErrorEventTriggered when model generation encounters an error.
finishNoneTriggered when model generation finishes (regardless of success or error).
Check out the in-code definition

Requirement Agent

EventData TypeDescription
startRequirementAgentStartEventTriggered when the agent begins execution.
successRequirementAgentSuccessEventTriggered when the agent successfully completes execution.
final_answerRequirementAgentFinalAnswerEventTriggered with intermediate chunks of the final answer when streaming is enabled.
Check out the in-code definition.

ToolCalling Agent

The following events can be observed by calling ToolCallingAgent.run(...).
EventData TypeDescription
startToolCallingAgentStartEventTriggered when the agent begins execution.
successToolCallingAgentSuccessEventTriggered when the agent successfully completes execution.
Check out the in-code definition.

ReAct Agent

The following events can be observed by calling ReActAgent.run(...).
EventData TypeDescription
startReActAgentStartEventTriggered when the agent begins execution.
errorReActAgentErrorEventTriggered when the agent encounters an error.
retryReActAgentRetryEventTriggered when the agent is retrying an operation.
successReActAgentSuccessEventTriggered when the agent successfully completes execution.
update and partial_updateReActAgentUpdateEventTriggered when the agent updates its state.
Check out the in-code definition Check out the in-code definition.

Workflow

The following events can be observed when calling Workflow.run(...).
EventData TypeDescription
startWorkflowStartEventTriggered when a workflow step begins execution.
successWorkflowSuccessEventTriggered when a workflow step completes successfully.
errorWorkflowErrorEventTriggered when a workflow step encounters an error.
Check out the in-code definition.

LinePrefixParser

The following events are caught internally by the line prefix parser.
EventData TypeDescription
updateLinePrefixParserUpdateTriggered when an update occurs.
partial_updateLinePrefixParserUpdateTriggered when a partial update occurs.

StreamToolCallMiddleware

The following events are caught internally by the StreamToolCallMiddleware.
EventData TypeDescription
updateStreamToolCallMiddlewareUpdateEventTriggered when an update occurs.
Check out the in-code definition.

RunContext

Special events that are emitted before the target’s handler gets executed. A run event contains .run. in its event’s path and has internal set to true in the event’s context object.
EventData TypeDescription
startRunContextStartEventTriggered when the run starts. Has input (positional/keyword argument with which the function was run) and output property. Set the output property to prevent the execution of the target handler.
successRunContextSuccessEventTriggered when the run succeeds.
errorFrameworkErrorTriggered when an error occurs.
finishRunContextFinishEventTriggered when the run finishes.
Check out the in-code definition.
Instead of a manual matching, use create_internal_event_matcher helper function.

Examples

I