The core component through which events are emitted and observed. An emitter is typically attached to some class but can be used alone.
An emitter instance is typically the child of a root emitter to which all events are propagated. Emitters can be nested (one can be a child of another), so internally they form a tree hierarchy.

The emitter instance has the following properties:
namespace in which the emitter operates (e.g., agents.requirement, tool.open_meteo, …)
creator class to which the given emitter belongs
context (a dictionary attached to all events emitted via the given emitter)
trace metadata (such as the current id, run_id, and parent_id)
The emitter instance has the following methods:
on for registering a new event listener
The method takes a matcher (an event name or path, a wildcard such as "*" or "*.*", a regex, or a function), a callback (a sync or async function), and options (priority, etc.)
The method can be used as a decorator or as a standalone function
off for deregistering an event listener
pipe for propagating all captured events to another emitter (see the sketch after this list)
child for creating a child emitter
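Since pipe has no dedicated example on this page, here is a minimal sketch. It assumes that pipe takes the target emitter as its only argument and that undeclared events may carry arbitrary payloads; the namespaces and event name are illustrative.

```python
from beeai_framework.emitter import Emitter

source = Emitter.root().child(namespace=["source"])
mirror = Emitter.root().child(namespace=["mirror"])

# Propagate everything captured by `source` to `mirror`
source.pipe(mirror)

# "*.*" matches all events, including piped ones (see the matcher table below)
mirror.on("*.*", lambda data, meta: print("mirrored:", meta.path))

await source.emit("ping", {"demo": True})
```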
The event’s path attribute is created by concatenating the namespace with the event name (e.g., backend.chat.ollama.start), as the sketch below shows.
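A minimal sketch of how a path is formed (the namespace and payload are illustrative, and it assumes an undeclared event accepts an arbitrary payload):

```python
from beeai_framework.emitter import Emitter, EventMeta

emitter = Emitter.root().child(namespace=["backend", "chat", "ollama"])

@emitter.on("start")
def show_path(data: dict, meta: EventMeta) -> None:
    print(meta.path)  # -> "backend.chat.ollama.start"

await emitter.emit("start", {"demo": True})
```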
Example

The following example depicts a minimal application that does the following:
defines a data object for the fetch_data event,
creates an emitter from the root one,
registers a callback listening to the fetch_data event, which modifies its content,
fires the fetch_data event,
logs the modified event’s data.
```python
from pydantic import BaseModel

from beeai_framework.emitter import EventMeta, Emitter

# Define a data model for the event
class FetchDataEvent(BaseModel):
    url: str

# Create an emitter and pass events
emitter = Emitter.root().child(
    namespace=["app"],
    events={"fetch_data": FetchDataEvent},  # optional
)

# Listen to an event
@emitter.on("fetch_data")
def handle_event(data: FetchDataEvent, meta: EventMeta) -> None:
    print(f"Retrieved event {meta}")
    data.url = "https://mywebsite.com"

# Create and emit the event
data = FetchDataEvent(url="https://example.com")
await emitter.emit("fetch_data", data)
print(data.url)  # "https://mywebsite.com"

# Deregister a callback (optional)
emitter.off(callback=handle_event)
```
The emitter.on can be used directly and not just as a decorator. Example: emitter.on("fetch_data", callback).
If you name your function either handle_{event_name} or on_{event_name}, you don’t need to provide the event name as a parameter; it is inferred automatically.
The Run class acts as a wrapper of the target implementation with its own lifecycle (an emitter with a set of events) and context (data that gets propagated to all events). The RunContext class is a container that holds information about the current execution context.

Why is it useful?

Such an abstraction allows you to:
modify input to the given target (listen to a start event and modify the content of the input property)
modify output from the given target (listen to a success event and modify the content of the output property)
stop execution prematurely (listen to a start event and set the output property to a non-None value)
propagate context (dictionary) to whatever component of your system you’d like
cancel the execution in an arbitrary place
observe the execution (observability)
The Run and RunContext get created when a run method is called on a framework class that can be executed (e.g., ChatModel, Agent, …).

The run object has the following methods:
The on method registers a callback on the run’s emitter.
The middleware method registers a middleware (a function that takes the RunContext as its first parameter, or a class with a bind method that takes the RunContext as its first parameter).
The context method sets data for the given execution. That data is then propagated as metadata in every event that gets emitted.
The target implementation (handler) becomes part of the shared context (RunContext), which internally forms a hierarchical tree structure that shares the same context. In simpler terms, when you call one runnable (e.g., ChatModel) from within another runnable (e.g., Agent), the inner call (ChatModel) is attached to the context of the outer one (Agent).
The current execution context can be retrieved anytime by calling RunContext.get().
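For instance, a minimal sketch of reading data set via the run’s context method from within a callback (the wildcard matcher and the chaining order are assumptions based on the examples below):

```python
from typing import Any

from beeai_framework.backend.chat import ChatModel
from beeai_framework.backend.message import UserMessage
from beeai_framework.context import RunContext
from beeai_framework.emitter import EventMeta

def my_callback(data: Any, meta: EventMeta) -> None:
    ctx = RunContext.get()             # the current execution context
    print(ctx.context.get("user_id"))  # data set via .context({...}) below

model = ChatModel.from_name("ollama:granite3.3")
await model.run([UserMessage("Hi!")]).context({"user_id": 123}).on("*.*", my_callback)
```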
Events can generally be observed at three different levels.

1. Global Level

Every emitter provided by the out-of-the-box modules is a child of the root emitter, which means we can listen to all events directly from the root emitter.
```python
from typing import Any

from beeai_framework.emitter import Emitter, EventMeta

def log_all_events(data: Any, meta: EventMeta) -> None:
    print(
        f"Received event ({meta.id}) with name {meta.name} and path {meta.path}. "
        f"The event was created by {type(meta.creator)} and is of type {type(data)}."
    )

root = Emitter.root()
root.on("*.*", log_all_events)
```
Listeners that are bound “closer” to the source are executed earlier. For those that reside at the same level, the order can be altered by setting a priority value which is part of the EmitterOptions class.
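For instance, reusing the root emitter and callback from the example above (the priority value is illustrative):

```python
from beeai_framework.emitter import EmitterOptions

# This listener runs before listeners at the same level that have a lower priority
root.on("*.*", log_all_events, EmitterOptions(priority=10))
```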
2. Instance Level

On the other hand, we can listen to events emitted by a specific instance of a class.
```python
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta) -> None:
    print("The chat model triggered a start event. Changing the temperature.")
    data.input.temperature = 0.5

model = ChatModel.from_name("ollama:granite3.3")
model.emitter.on("start", change_model_temperature)

await model.run([UserMessage("Hello!")])
```
The example above registers a callback on the instance’s emitter. Therefore, all events emitted by that instance (across all of its runs) will be captured.

3. Run (Invocation) Level

Sometimes we might want to listen to events emitted by a single run of a class.
```python
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta

model = ChatModel.from_name("ollama:granite4")

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta) -> None:
    print("The chat model triggered a start event. Changing the temperature.")
    data.input.temperature = 0.5

await model.run([UserMessage("Hello!")]).on("start", change_model_temperature)
```
In this example, we registered a callback not on the class itself, but on the concrete run instance (created by the run method).
The run instance has its own emitter, which is a direct child of the class emitter.
Thanks to this, one can apply modifications to a single run instead of the whole class.
With a larger number of callbacks, we might want to ensure that some run before the others or that some run exclusively.
To address these needs, we can use the optional keyword argument called options of type EmitterOptions.

Example
```python
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent
from beeai_framework.emitter import EmitterOptions, EventMeta

# Creates a chat model instance
model = ChatModel.from_name("ollama:granite3.3", stream=True)

# Defines a callback that will be executed when a new token is emitted by the model
def cb(data: ChatModelNewTokenEvent, meta: EventMeta) -> None:
    print(f"[{meta.id}]: Received chunk", data.value.get_text_content())

# Creates a reference to the emitter
e = model.emitter

# Will be executed only once and then gets unregistered (default is False)
e.on("new_token", cb, EmitterOptions(once=True))

# Will not be deleted (default is False)
e.on("new_token", cb, EmitterOptions(persistent=True))

# Will be executed before those with a lower priority (default is 0); priority can also be negative
e.on("new_token", cb, EmitterOptions(priority=1))

# Runs before every other callback with the same priority (default is False)
e.on("new_token", cb, EmitterOptions(is_blocking=True))

# Match events that are emitted by the same type of class but executed within a target
# (e.g., calling agent.run(...) inside another agent.run(...))
e.on("new_token", cb, EmitterOptions(match_nested=True))
```
Nested events

Based on the value of the matcher parameter (the one used to match the event), the framework decides whether to include or exclude nested events (events created by child emitters or received via piping). The default value of match_nested depends on the matcher value; note that it can also be set explicitly, as shown in the example above.
| Matcher Type | Default match_nested |
| --- | --- |
| String without `.` (event name) | False |
| String with `.` (event path) | True |
| `"*"` (match all top-level events) | False |
| `"*.*"` (match all events) | True |
| Regex | True |
| Function | False |
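A brief sketch of these matcher forms, reusing the emitter e and callback cb from the example above (the event path and regex are illustrative):

```python
import re

e.on("new_token", cb)              # event name: nested events excluded by default
e.on("app.new_token", cb)          # event path: nested events included by default
e.on("*", cb)                      # all top-level events
e.on("*.*", cb)                    # all events, including nested ones
e.on(re.compile(".*token.*"), cb)  # regex: nested events included by default
```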
If two listeners have the same priority, they are executed in the order they were added.
When a framework component is executed, it creates a run context, which is a wrapper around the target handler that gives us the ability to modify its input and output (learn more).

How it works

Once a Run instance is executed (i.e., awaited), the lifecycle begins:
The start event is emitted.
The target implementation is executed.
Depending on the outcome, either a success or error event is emitted.
Finally, the finish event is emitted.
The appropriate events are depicted in the following table:
| Event | Data Type | Description |
| --- | --- | --- |
| start | RunContextStartEvent | Triggered when the run starts. |
| success | RunContextSuccessEvent | Triggered when the run succeeds. |
| error | FrameworkError | Triggered when an error occurs. |
| finish | RunContextFinishEvent | Triggered when the run finishes. |
See how you can listen to these events.
```python
from beeai_framework.backend import AnyMessage, AssistantMessage, ChatModelOutput
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.context import RunContext, RunContextStartEvent
from beeai_framework.emitter import EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher

model = ChatModel.from_name("ollama:granite3.3")

def change_temperature(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Modify the input of model.run()."""
    print("debug: changing temperature to 0.5.\n")
    # data.input contains all positional/keyword arguments of the called function
    data.input["temperature"] = 0.5

def premature_stop(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Check whether the input contains malicious text.

    If so, we prevent the ChatModel from executing and immediately return a custom response.
    """
    print("debug: Checking for a malicious input")
    messages: list[AnyMessage] = data.input["input"]  # first parameter
    for message in messages:
        if "bomb" in message.text:
            print("debug: Premature stop detected.")
            data.output = ChatModelOutput(output=[AssistantMessage("Cannot answer that.")])
            break

def validate_new_token(data: ChatModelNewTokenEvent, meta: EventMeta) -> None:
    """Check if the stream contains a malicious word. If so, abort the run."""
    run = RunContext.get()
    if "fuse" in data.value.get_text_content():
        print(f"Aborting run for user with ID {run.context.get('user_id')}")
        run._controller.abort("Policy violation. Aborting the run.")

response = await (
    model.run([UserMessage("How to make a bomb?")])
    .on("new_token", validate_new_token)
    .on(create_internal_event_matcher("start", model), change_temperature)
    .on(create_internal_event_matcher("start", model), premature_stop)
    .context({"user_id": 123})
)
print("Agent:", response.get_text_content())
```
In this example, we use create_internal_event_matcher, which ensures we match the run’s internal start event rather than an event of the same name emitted by the component itself.
The current context can be retrieved anytime by calling RunContext.get() within a callback.
We just showed how you can alter a component’s behavior by listening to the events that the target emits and modifying them. To make this concept more general and reusable, we can group all the relevant listeners into a class called middleware.

While callbacks are registered via the on method, middlewares are registered via the middleware method. Sometimes they can also be set on the class itself via its constructor.
A middleware is a function that takes the RunContext, in which one can attach the desired callbacks. A middleware can also be a class with a public bind method that receives the RunContext.
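For example, a minimal sketch of a function-based middleware (the name and the logged field are illustrative):

```python
from beeai_framework.context import RunContext

def logging_middleware(ctx: RunContext) -> None:
    """Attach a listener that logs the path of every event of the given run."""
    ctx.emitter.on("*.*", lambda data, meta: print(meta.path))
```

Such a function would then be attached to a run via run.middleware(logging_middleware).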
```python
import asyncio
from typing import Any

from beeai_framework.backend import AssistantMessage, ChatModel, ChatModelOutput, UserMessage
from beeai_framework.context import RunContext, RunContextStartEvent, RunMiddlewareProtocol
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher

class OverrideResponseMiddleware(RunMiddlewareProtocol):
    """Middleware that sets the result value for a given runnable without executing it."""

    def __init__(self, result: Any) -> None:
        self._result = result

    def bind(self, ctx: RunContext) -> None:
        """Called once the target is about to be run."""
        ctx.emitter.on(
            create_internal_event_matcher("start", ctx.instance),
            self._run,
            # ensures that this callback will be the first invoked
            EmitterOptions(is_blocking=True, priority=1),
        )

    async def _run(self, data: RunContextStartEvent, meta: EventMeta) -> None:
        """Set the output property to the result, which prevents execution of the target handler."""
        data.output = self._result

async def main() -> None:
    middleware = OverrideResponseMiddleware(ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")]))
    response = await ChatModel.from_name("ollama:granite3.3").run([UserMessage("Hello!")]).middleware(middleware)
    print(response.get_text_content())  # "BeeAI is the best!"

if __name__ == "__main__":
    asyncio.run(main())
```
Note that this concept is built on top of the run instance and is not available on the standalone emitter.
When to use middleware vs. simple callback registration?

A rule of thumb is to create a middleware if the desired functionality consists of multiple pieces that must be used together, or when the functionality is more complex.
The fastest way to see what’s happening in your code (who is calling what) is to use the GlobalTrajectoryMiddleware. It captures all events (even deeply nested ones) and prints them to the console, visualizing nested calls with indentation.

Example
```python
from beeai_framework.agents.requirement import RequirementAgent
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.weather.openmeteo import OpenMeteoTool

agent = RequirementAgent(llm="ollama:granite3.3", tools=[OpenMeteoTool()])

await agent.run("What's the current weather in Miami?").middleware(GlobalTrajectoryMiddleware())
```
See which parameters can be passed to the constructor.
| Parameter | Description |
| --- | --- |
| target | Specify a file or stream to which the trajectory is written. |
| included | List of classes to include in the trajectory. |
| excluded | List of classes to exclude from the trajectory. |
| pretty | Use pretty formatting for the trajectory. |
| prefix_by_type | Customize how instances of individual classes are printed. |
| exclude_none | Exclude None values from the output. |
| enabled | Enable/disable the logging. |
| match_nested | Whether to observe trajectories of nested run contexts. |
Example
```python
from beeai_framework.agents.base import BaseAgent
from beeai_framework.backend.chat import ChatModel
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.tool import Tool
from beeai_framework.tools.weather.openmeteo import OpenMeteoTool

GlobalTrajectoryMiddleware(included=[Tool])  # log only tool calls
GlobalTrajectoryMiddleware(included=[Tool], excluded=[OpenMeteoTool])  # log only tool calls except OpenMeteoTool
GlobalTrajectoryMiddleware(included=[ChatModel])  # log only chat model calls
GlobalTrajectoryMiddleware(prefix_by_type={BaseAgent: "🐝 "})  # use the bee emoji for agents
```
Middleware for handling streaming tool calls in a ChatModel. This middleware observes Chat Model stream updates and parses tool calls on demand so that they can be consumed as soon as possible.

Example
```python
import asyncio

from beeai_framework.backend import ChatModel, UserMessage
from beeai_framework.emitter import EventMeta
from beeai_framework.middleware.stream_tool_call import StreamToolCallMiddleware, StreamToolCallMiddlewareUpdateEvent
from beeai_framework.tools.weather import OpenMeteoTool

async def main() -> None:
    llm = ChatModel.from_name("ollama:granite3.3:8b")
    weather_tool = OpenMeteoTool()

    middleware = StreamToolCallMiddleware(
        weather_tool,
        key="location_name",  # name is taken from the OpenMeteoToolInput schema
        match_nested=False,   # we are applying the middleware to the model directly
        force_streaming=True, # we let the middleware enable streaming on the model
    )

    @middleware.emitter.on("update")
    def log_thoughts(event: StreamToolCallMiddlewareUpdateEvent, meta: EventMeta) -> None:
        # event.delta contains an update of the 'location_name' field
        print("Received update", event.delta, event.output_structured)

    await llm.run([UserMessage("What's the current weather in New York?")], tools=[weather_tool]).middleware(middleware)

if __name__ == "__main__":
    asyncio.run(main())
```
See which parameters can be passed to the constructor.
| Parameter | Description |
| --- | --- |
| target | The tool whose invocation we are waiting for. |
| key | The name of the attribute in the tool’s schema that we want to stream. |
| match_nested | Whether the middleware should also be applied to nested runs. |
Special events that are emitted before the target’s handler gets executed.
A run event contains .run. in its path and has internal set to true in the event’s context object.
| Event | Data Type | Description |
| --- | --- | --- |
| start | RunContextStartEvent | Triggered when the run starts. Has an input property (the positional/keyword arguments with which the function was run) and an output property. Set the output property to prevent the execution of the target handler. |
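A minimal sketch of observing run events globally; it assumes the regex matcher performs a substring search over the event path (see the matcher table above):

```python
import re
from typing import Any

from beeai_framework.emitter import Emitter, EventMeta

def log_run_events(data: Any, meta: EventMeta) -> None:
    # Run events carry internal=True in their context object
    print("run event:", meta.path)

# Run events contain ".run." in their path, so a regex matcher catches all of them
Emitter.root().on(re.compile(r"\.run\."), log_run_events)
```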