Overview

Middleware in the BeeAI Framework is code that runs “in the middle” of an execution lifecycle—intercepting the flow between when a component (like an Agent, Tool, or Model) starts and when it finishes. As these components execute, they emit events at key moments, such as starting a task, calling a tool, or completing a response. Middleware hooks into these events to inject behaviors like logging, filtering, or safety checks—all without modifying the component’s core logic. This modular approach allows you to apply consistent policies across your entire system. You can use built-in tools like GlobalTrajectoryMiddleware for immediate debugging, or write custom middleware to handle complex needs like blocking unsafe content, enforcing rate limits, or managing authentication.
Note on Terminology: In this framework, Middleware refers to the classic software design pattern (pipeline interceptors) that runs between execution steps. This is distinct from the industry term “Agentic Middleware,” which typically refers to entire orchestration platforms.

Built-in Middleware

The following section showcases built-in middleware that you can start using right away.

Global Trajectory

The fastest way to understand your agent’s execution flow is by using the GlobalTrajectoryMiddleware. It captures all events, including deeply nested ones, and prints them to the console, using indentation to visualize the call stack.
Example
Python
from beeai_framework.agents.requirement import RequirementAgent
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.weather import OpenMeteoTool

agent = RequirementAgent(
    llm="ollama:granite3.3",
    tools=[OpenMeteoTool()],
)
await agent.run("What's the current weather in Miami?").middleware(
    GlobalTrajectoryMiddleware()
)
You can customize the output by passing parameters to the constructor:
| Parameter | Description |
| --- | --- |
| target | A file or stream to which the trajectory is written (pass False to disable). |
| included | List of classes to include in the trajectory. |
| excluded | List of classes to exclude from the trajectory. |
| pretty | Use pretty formatting for the trajectory. |
| prefix_by_type | Customize how instances of individual classes are printed. |
| exclude_none | Exclude None values from the output. |
| enabled | Enable or disable the logging. |
| match_nested | Whether to observe trajectories of nested run contexts. |
| emitter_priority | Setting a higher priority may result in capturing events without modifications from other middlewares. |
Example
Python
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.tool import Tool
from beeai_framework.backend.chat import ChatModel
from beeai_framework.tools.weather.openmeteo import OpenMeteoTool
from beeai_framework.agents.base import BaseAgent

# Log only tool calls
GlobalTrajectoryMiddleware(included=[Tool])

# Log only tool calls except OpenMeteoTool
GlobalTrajectoryMiddleware(included=[Tool], excluded=[OpenMeteoTool])

# Log only ChatModel events
GlobalTrajectoryMiddleware(included=[ChatModel])

# Use a Bee emoji for agents
GlobalTrajectoryMiddleware(prefix_by_type={BaseAgent: "🐝 "})
You can listen to events emitted throughout the execution to build your custom trajectory.
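For example, a minimal sketch of a custom trajectory built by collecting raw events from a run (reusing the agent from the example above; the trajectory list and collect_event helper are illustrative, not part of the framework):
Python
from typing import Any

from beeai_framework.emitter import EventMeta

trajectory: list[tuple[str, Any]] = []  # illustrative container for captured events

def collect_event(data: Any, meta: EventMeta) -> None:
    # record the event path and payload so the call flow can be inspected or replayed later
    trajectory.append((meta.path, data))

# a wildcard listener on the run captures every event emitted during this execution
await agent.run("What's the current weather in Miami?").on("*.*", collect_event)

for path, _data in trajectory:
    print(path)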

Tool Call Streaming

This middleware handles streaming tool calls in a ChatModel. It observes stream updates from the Chat Model and parses tool calls on demand so that they can be consumed immediately. It works even without streaming enabled, in which case it emits the update event at the end of the execution.
Example
Python
import asyncio

from beeai_framework.backend import ChatModel, UserMessage
from beeai_framework.emitter import EventMeta
from beeai_framework.middleware.stream_tool_call import StreamToolCallMiddleware, StreamToolCallMiddlewareUpdateEvent
from beeai_framework.tools.weather import OpenMeteoTool


async def main() -> None:
    llm = ChatModel.from_name("ollama:granite4:micro")
    weather_tool = OpenMeteoTool()
    middleware = StreamToolCallMiddleware(
        weather_tool,
        key="location_name",  # name is taken from the OpenMeteoToolInput schema
        match_nested=False,  # we are applying the middleware to the model directly
        force_streaming=True,  # we want to let middleware enable streaming on the model
    )

    @middleware.emitter.on("update")
    def log_thoughts(event: StreamToolCallMiddlewareUpdateEvent, meta: EventMeta) -> None:
        print(
            "Received update", event.delta, event.output_structured
        )  # event.delta contains an update of the 'location_name' field

    response = await llm.run([UserMessage("What's the current weather in New York?")], tools=[weather_tool]).middleware(
        middleware
    )
    print(response.get_tool_calls()[0].args)


if __name__ == "__main__":
    asyncio.run(main())

The following parameters can be passed to the constructor:
| Parameter | Description |
| --- | --- |
| target | The tool whose call the middleware waits for. |
| key | The name of the attribute in the tool’s schema that should be streamed. |
| match_nested | Whether to also match nested runs; set to False when the middleware is applied to the model directly. |
| force_streaming | Sets the stream flag on the ChatModel. |

Core Primitives

The BeeAI Framework middleware is built on an underlying system of primitives, which are described in this section. Understanding these primitives is helpful for building complex middleware.

Events

An event refers to an action initiated by a component. It carries the details of what just happened within the system. Every event has three key properties:
  • Name: A string identifier (e.g., start, success, error, or custom names like fetch_data).
  • Data payload: The content of the event, typically structured as a Pydantic model.
  • Metadata: Information about the context where the event was fired.
You process these events using callbacks that follow this structure:
Python
from beeai_framework.backend.chat import ChatModelStartEvent
from beeai_framework.emitter import EventMeta

async def handler(data: ChatModelStartEvent, meta: EventMeta) -> None:
  print(f"Received event {meta.name} ({meta.path}) at {meta.created_at}")
  print("-> created by", type(meta.creator))
  print("-> data payload", meta.model_dump())
  print("-> context", meta.context)
  print("-> trace", meta.trace)

Emitter

The Emitter is the core component that lets you send and watch for events. While it is typically attached to a specific class, you can also use it on its own. An emitter instance is typically the child of a root emitter to which all events are propagated. Emitters can be nested (one can be a child of another), hence they internally create a tree hierarchy. Every emitter instance has the following properties:
  • namespace in which the emitter operates (e.g., agents.requirement, tool.open_meteo, …).
  • creator class which the given emitter belongs to.
  • context (dictionary which is attached to all events emitted via the given emitter).
  • trace metadata (such as current id, run_id and parent_id)
It also gives you the following methods for managing listeners:
  • on for registering a new event listener
    • The method takes matcher (event name, callback, regex), callback (sync/async function), and options (priority, etc.)
    • The method can be used as a decorator or as a standalone function
  • off for deregistering an event listener
  • pipe for propagating all captured events to another emitter
  • child for creating a child emitter
The event’s path attribute is created by concatenating the namespace with the event name (e.g., backend.chat.ollama.start).
Example
The following example depicts a minimal application that does the following:
  1. defines a data object for the fetch_data event,
  2. creates an emitter from the root one,
  3. registers a callback listening to the fetch_data event, which modifies its content,
  4. fires the fetch_data event,
  5. logs the modified event’s data.
Python
from pydantic import BaseModel
from beeai_framework.emitter import EventMeta, Emitter

# Define a data model for the event
class FetchDataEvent(BaseModel):
    url: str

# Create an emitter and pass events
emitter = Emitter.root().child(
    namespace=["app"],
    events={"fetch_data": FetchDataEvent} # optional
)

# Listen to an event
@emitter.on("fetch_data")
def handle_event(data: FetchDataEvent, meta: EventMeta) -> None:
    print(f"Retrieved event {meta}")
    data.url = "https://mywebsite.com"

# Create and emit the event
data = FetchDataEvent(url="https://example.com")
await emitter.emit("fetch_data", data)
print(data.url) # "https://mywebsite.com"

# Deregister a callback (optional)
emitter.off(callback=handle_event)
emitter.on can also be called directly rather than used as a decorator, e.g., emitter.on("fetch_data", callback).
If you name your function either handle_{event_name} or on_{event_name}, you don’t need to provide the event name as a parameter; it is inferred automatically.

Run (Context)

The Run class acts as a wrapper of the target implementation with its own lifecycle (an emitter with a set of events) and context (data that gets propagated to all events). The RunContext class is a container that stores information about the current execution context. These abstractions allow you to:
  • modify input to the given target (listen to a start event and modify the content of the input property),
  • modify output from the given target (listen to a success event and modify the content of the output property),
  • stop the run early (listen to a start event and set the output property to a non-None value),
  • propagate context (dictionary) to any component of your system,
  • cancel the execution in an arbitrary place,
  • gain observability into runs via structured events (for logging, tracing, and debugging).
The Run and RunContext get created when the run method is called on a framework class that can be executed (e.g., ChatModel, Agent, …). The run object has the following methods:
  • The on method registers a callback on its emitter.
  • The middleware method registers middleware (a function that takes RunContext as its first parameter, or a class with a bind method that takes the RunContext as its first parameter).
  • The context method sets data for a given execution. That data is then propagated as metadata in every event that gets emitted.
The target implementation (handler) becomes part of the shared context (RunContext), which internally forms a hierarchical tree structure that shares the same context. In simpler terms, when you call one runnable (e.g., ChatModel) from within another runnable (e.g., Agent), the inner call (ChatModel) is attached to the context of the outer one (Agent).
The current execution context can be retrieved anytime by calling RunContext.get().
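For example, a minimal sketch of propagating context into a run and reading it back inside a callback (the user_id key is illustrative):
Python
from typing import Any

from beeai_framework.backend import ChatModel, UserMessage
from beeai_framework.context import RunContext
from beeai_framework.emitter import EventMeta

llm = ChatModel.from_name("ollama:granite3.3")

def log_user(data: Any, meta: EventMeta) -> None:
    # RunContext.get() works inside any callback executed within the run
    run = RunContext.get()
    print("event", meta.name, "for user", run.context.get("user_id"))

# .context() attaches data to this run; it is propagated to every emitted event
await llm.run([UserMessage("Hello!")]).on("*.*", log_user).context({"user_id": 123})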

Runnable

The Runnable[R] class unifies common objects that can be executed and observed. It is an abstract class with the following traits:
  • It has an abstract run method that executes the class and returns a Run[R] (R is bound to the RunnableOutput).
  • It has an abstract emitter getter.
  • It has a middlewares getter that lists the existing middlewares.
Invoking a Runnable
Every runnable takes a list of messages as its first (positional) parameter, followed by these optional keyword arguments (RunnableOptions):
  • signal (an instance of AbortSignal) — allows aborting the execution.
  • context (a dictionary) — used to propagate additional data.
You can also pass extra arguments that may or may not be processed by the given handler. The RunnableOutput has the following properties:
  • output: a list of messages (can be empty)
  • context: a dictionary that can store additional data
  • last_message (getter): returns the last message if it exists, or creates an empty AssistantMessage otherwise
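As a small illustration of these properties (constructing the output manually; the exact constructor usage is an assumption based on the custom Runnable example below):
Python
from beeai_framework.backend import AssistantMessage
from beeai_framework.runnable import RunnableOutput

# construct an output manually to inspect its helpers
out = RunnableOutput(output=[AssistantMessage("Hi!")])
print(out.last_message.text)  # "Hi!"
print(out.context)            # additional data attached to the output, empty unless provided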
Creating a custom Runnable
Python
import asyncio
from functools import cached_property
from typing import Unpack

from beeai_framework.backend import AnyMessage, AssistantMessage, UserMessage
from beeai_framework.context import RunContext
from beeai_framework.emitter import Emitter
from beeai_framework.runnable import Runnable, RunnableOptions, RunnableOutput, runnable_entry


class GreetingRunnable(Runnable[RunnableOutput]):
    @runnable_entry
    async def run(self, input: list[AnyMessage], /, **kwargs: Unpack[RunnableOptions]) -> RunnableOutput:
        # retrieves the current run context
        run = RunContext.get()

        response = f"Hello, {run.context.get('name', 'stranger')}!"

        # emits an event so that listeners can react to it (optional)
        await run.emitter.emit("before_send", response)

        return RunnableOutput(output=[AssistantMessage(response)])

    @cached_property
    def emitter(self) -> Emitter:
        return Emitter.root().child(namespace=["echo"])


async def main() -> None:
    echo = GreetingRunnable()
    response = await echo.run([UserMessage("Hello!")]).context({"name": "Alex"})
    print(response.last_message.text)


if __name__ == "__main__":
    asyncio.run(main())

Event Handling

Building robust agents requires precise control over the execution lifecycle. You need the ability to not only observe your agent’s behavior but also intercept and modify it at specific points. The following sections cover the mechanics of the BeeAI Framework event system and will enable you to manage:
  • Scopes: Deciding whether to listen globally, per instance, or for a single run.
  • Config: Controlling listener priority, persistence, and blocking behavior.
  • Lifecycle: Understanding the exact sequence of events that occur during execution.
  • Debugging: Inspecting raw event streams to see exactly what your agent is doing.
  • Piping: Linking emitters together via piping to create unified event streams.

Scopes

Events can be observed at three different levels.
1. Global Level
Every emitter provided by the out-of-the-box modules is a child of the root emitter. This means you can listen to all events directly from the root emitter.
Python
from beeai_framework.emitter import Emitter, EventMeta
from typing import Any

def log_all_events(data: Any, meta: EventMeta) -> None:
    print(
        f"Received event ({meta.id}) with name {meta.name} and path {meta.path}. "
        f"The event was created by {type(meta.creator)} and is a type of {type(data)}."
    )

root = Emitter.root()
root.on("*.*", log_all_events)
Listeners that are bound “closer” to the source are executed earlier. For those that reside at the same level, the order can be altered by setting a priority value which is part of the EmitterOptions class. A higher priority value means the listener will be executed earlier. The default priority is 0.
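For example, to make the global listener above run ahead of other same-level listeners, you could raise its priority (a small sketch using the EmitterOptions class covered in the Config section):
Python
from beeai_framework.emitter import EmitterOptions

# priority=10 makes this listener fire before same-level listeners with the default priority of 0
root.on("*.*", log_all_events, EmitterOptions(priority=10))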
2. Instance Level
You can also listen to events emitted by a specific instance of a class.
Python
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta):
    print(f"The chat model triggered a start event. Changing a temperature.")
    data.input.temperature = 0.5

model = ChatModel.from_name("ollama:granite3.3")
model.emitter.on("start", change_model_temperature)

await model.run([UserMessage("Hello!")])
This registers a callback on the instance’s emitter, so all matching events emitted by that instance are captured.
3. Run (Invocation) Level
Sometimes you may want to listen to events emitted by a single run of a class.
Python
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

model = ChatModel.from_name("ollama:granite4")

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta):
    print(f"The chat model triggered a start event. Changing a temperature.")
    data.input.temperature = 0.5

await model.run([UserMessage("Hello!")]).on("start", change_model_temperature)
Here, the callback is registered on the run instance (created by the run method). The run’s emitter is a child of the class emitter, allowing you to modify behavior for a single invocation without affecting others.

Config

When working with multiple callbacks, you may need to control execution order or ensure that some run exclusively. You can do this using the optional options argument of type EmitterOptions.
Example
Python
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent

# Creates a chat model instance
model = ChatModel.from_name("ollama:granite3.3", stream=True)

# Defines a callback that will be executed when a new token is emitted by the model
def cb(data: ChatModelNewTokenEvent, meta: EventMeta):
    print(f"[{meta.id}]: Received chunk", data.value.get_text_content())

# Creates a reference to the emitter
e = model.emitter

# will be executed only once and then gets unregistered (default is False)
e.on("new_token", cb, EmitterOptions(once=True))

# will not be deleted (default is False)
e.on("new_token", cb, EmitterOptions(persistent=True))

# will be executed before those with a lower priority (default is 0); priority can also be negative
e.on("new_token", cb, EmitterOptions(priority=1))

# runs before every other callback with the same priority (default is False)
e.on("new_token", cb, EmitterOptions(is_blocking=True))

# match events that are emitted by the same type of class but executed within a target
# (e.g., calling agent.run(...) inside another agent.run(...))
e.on("new_token", cb, EmitterOptions(match_nested=True))
Nested events
Based on the value of the matcher parameter (the one used to match the event), the framework decides whether to include or exclude nested events (events created by child emitters or received via piping). The default value of match_nested depends on the matcher value. Note that the value can also be set directly, as shown in the example above.
| Matcher Type | Default match_nested |
| --- | --- |
| String without . (event name) | False |
| String with . (event path) | True |
| "*" (match all top-level events) | False |
| "*.*" (match all events) | True |
| Regex | True |
| Function | False |
If two events have the same priority, they are executed in the order they were added.
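To illustrate the defaults from the table above, here is a small sketch (agent and cb are placeholders for any emitter owner and callback):
Python
# "start" has no dot, so by default only the agent's own start event is matched (match_nested=False)
agent.emitter.on("start", cb)

# "*.*" matches full event paths, so nested events (e.g., from the inner ChatModel or tools)
# are included by default (match_nested=True)
agent.emitter.on("*.*", cb)

# the default can always be overridden explicitly
agent.emitter.on("start", cb, EmitterOptions(match_nested=True))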

Lifecycle

When a framework component is executed, it creates a run context, which wraps the target handler and allows you to modify its input and output (Learn more in the Run (Context) section). Once a Run instance is executed (i.e., awaited), its lifecycle proceeds through the following steps:
  1. The start event is emitted.
  2. The target implementation is executed.
  3. Depending on the outcome, either a success or error event is emitted.
  4. Finally, the finish event is emitted.
The appropriate events are depicted in the following table:
| Event | Data Type | Description |
| --- | --- | --- |
| start | RunContextStartEvent | Triggered when the run starts. |
| success | RunContextSuccessEvent | Triggered when the run succeeds. |
| error | FrameworkError | Triggered when an error occurs. |
| finish | RunContextFinishEvent | Triggered when the run finishes. |
Below is an example showing how to listen to these events:
Python
import asyncio

from beeai_framework.backend import AnyMessage, AssistantMessage, ChatModelOutput
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.context import RunContextStartEvent, RunContextFinishEvent, RunContext
from beeai_framework.emitter import EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher

model = ChatModel.from_name("ollama:granite3.3", stream=True)

def change_temperature(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Modify the input of the model.run()"""

    print("debug: changing temperature to 0.5.\n")
    data.input["temperature"] = 0.5 # data.input contains all positional/keyword arguments of the called function


def premature_stop(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Checks whether the input contains malicious text.
    If so, we prevent the ChatModel from executing and immediately return a custom response.
    """

    print("debug: Checking for a malicious input")
    messages: list[AnyMessage] = data.input["input"]  # first parameter
    for message in messages:
        if "bomb" in message.text:
            print("debug: Premature stop detected.")
            data.output = ChatModelOutput(output=[AssistantMessage("Cannot answer that.")])
            break

def validate_new_token(data: ChatModelNewTokenEvent, meta: EventMeta) -> None:
    """Check if the stream contains a malicious word. If so, we abort the run."""

    run = RunContext.get()

    if "fuse" in data.value.get_text_content():
        print(f"Aborting run for user with ID {run.context.get('user_id')}")
        run._controller.abort("Policy violation. Aborting the run.")



response = await (
    model.run([UserMessage("How to make a bomb?")])
    .on("new_token", validate_new_token)
    .on(create_internal_event_matcher("start", model), change_temperature)
    .on(create_internal_event_matcher("start", model), premature_stop)
    .context({"user_id": 123})
)
print("Agent:", response.get_text_content())
In this example, create_internal_event_matcher ensures we correctly match the event.
You can retrieve the current run context at any time within a callback using RunContext.get().

Debugging

While the Global Trajectory middleware is excellent for visualizing the structural hierarchy of a run, sometimes you need to inspect the raw stream of events as they happen. To do this quickly without setting up a full middleware class, you can register a wildcard listener (*.*) directly on your run. This captures every single event emitted during that specific execution.
Python
agent = RequirementAgent("ollama:granite3.3", tools=[OpenMeteoTool()])
response = await agent.run("What's the current weather in Miami?").on(
    "*.*", lambda data, meta: print(meta.path, "by", type(meta.creator))
)

Piping

In some cases, one might want to propagate all events from one emitter to another (for instance, when creating a child emitter).
Python
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    first: Emitter = Emitter(namespace=["app"])

    first.on(
        "*.*",
        lambda data, event: print(
            f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}"
        ),
    )

    second: Emitter = Emitter(namespace=["app", "llm"])

    second.on(
        "*.*",
        lambda data, event: print(
            f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}"
        ),
    )

    # Propagate all events from the 'second' emitter to the 'first' emitter
    unpipe = second.pipe(first)

    await first.emit("a", {})
    await second.emit("b", {})

    print("Unpipe")
    unpipe()

    await first.emit("c", {})
    await second.emit("d", {})


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

Creating Custom Middleware

While you can register individual callbacks to handle specific events, this approach can become cluttered if you have complex logic. To make your event handling reusable and modular, the BeeAI Framework allows you to group listeners into a class called Middleware.

When to use Middleware vs. Callbacks

  • Use Callbacks (.on / .match): For simple, one-off logic, such as logging a specific event or debugging a single run.
  • Use Middleware: When the logic is complex, multi-step, or needs to be reused across different parts of your application.

The Middleware Protocol

A middleware component is defined by how it interacts with the RunContext. It can be structured in two ways:
  1. A Function: A simple function that accepts RunContext as its first parameter.
  2. A Class: A class that implements a bind method, which accepts RunContext as its first parameter.
The RunContext provides access to the emitter, the instance being run, and the shared memory for that specific execution.
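For example, a minimal sketch of the function form (the log_everything name and its behavior are illustrative):
Python
from typing import Any

from beeai_framework.context import RunContext
from beeai_framework.emitter import EventMeta


def log_everything(ctx: RunContext) -> None:
    """Function-style middleware: receives the RunContext of the run it is attached to."""

    def on_event(data: Any, meta: EventMeta) -> None:
        print(f"[{type(ctx.instance).__name__}] {meta.path}")

    # register a wildcard listener on the run's emitter
    ctx.emitter.on("*.*", on_event)


# attach it to a run just like a middleware class:
# await llm.run([UserMessage("Hello!")]).middleware(log_everything)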

Example: Intercepting and Overriding

A common use case for middleware is intercepting a request before the target component executes to modify the input or provide a mock response. The following example demonstrates a middleware that intercepts the start event. By setting the output property on the event data, the middleware effectively “mocks” the result, preventing the actual ChatModel from running.
Python
import asyncio
from typing import Any

from beeai_framework.backend import AssistantMessage, ChatModel, ChatModelOutput, UserMessage
from beeai_framework.context import RunContext, RunContextStartEvent, RunMiddlewareProtocol
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher


class OverrideResponseMiddleware(RunMiddlewareProtocol):
    """Middleware that sets the result value for a given runnable without executing it"""

    def __init__(self, result: Any) -> None:
        self._result = result

    def bind(self, ctx: RunContext) -> None:
        """Calls once the target is about to be run."""

        ctx.emitter.on(
            create_internal_event_matcher("start", ctx.instance),
            self._run,
            # ensures that this callback will be the first invoked
            EmitterOptions(is_blocking=True, priority=1),
        )

    async def _run(self, data: RunContextStartEvent, meta: EventMeta) -> None:
        """Set output property to the result which prevents an execution of the target handler."""

        data.output = self._result


async def main() -> None:
    middleware = OverrideResponseMiddleware(ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")]))
    response = await ChatModel.from_name("ollama:granite4:micro").run([UserMessage("Hello!")]).middleware(middleware)
    print(response.get_text_content())  # "BeeAI is the best!"


if __name__ == "__main__":
    asyncio.run(main())
Key Implementation Details:
  • create_internal_event_matcher: A helper used to ensure you are matching the specific internal event (like start / success / error / finish) for the correct component instance.
  • EmitterOptions: Used here to set priority=1 and is_blocking=True, ensuring this middleware executes early and takes precedence over other callbacks.
  • data.output: Setting this property during a start event signals the framework to skip the underlying execution (e.g., the LLM call) and return this value immediately.
Ensure that your mock response matches the expected output type of the component you are intercepting. For example, if you override a ChatModel, the return type must be ChatModelOutput.

Registering Middleware

Once defined, you can attach middleware to a component using the .middleware() method just before execution.
Python
import asyncio

from beeai_framework.backend import AssistantMessage, ChatModel, ChatModelOutput, UserMessage


async def main() -> None:
    # 1. Prepare the mock response
    mock_output = ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")])

    # 2. Initialize the middleware (defined in the previous example)
    middleware = OverrideResponseMiddleware(mock_output)

    # 3. Attach middleware to the run
    llm = ChatModel.from_name("ollama:granite4")
    response = await llm.run([UserMessage("Hello!")]).middleware(middleware)

    print(response.get_text_content())  # Output: "BeeAI is the best!"


if __name__ == "__main__":
    asyncio.run(main())
Note that middleware is applied to the Run instance (the result of calling .run()), not the standalone emitter class itself. However, in some cases middleware can be passed via the component’s constructor if supported, as sketched below.
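For instance, assuming the agent exposes a middlewares constructor parameter (this is an assumption; check the specific component’s signature), the trajectory middleware could be attached once at construction time:
Python
# assumes RequirementAgent accepts a `middlewares` constructor parameter
agent = RequirementAgent(
    llm="ollama:granite3.3",
    tools=[OpenMeteoTool()],
    middlewares=[GlobalTrajectoryMiddleware()],
)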

Events glossary

The following sections list all events that can be observed for built-in components. Note that your tools/agents/etc. can emit additional events.

Tools

The following events can be observed when calling Tool.run(...).
| Event | Data Type | Description |
| --- | --- | --- |
| start | ToolStartEvent | Triggered when a tool starts executing. |
| success | ToolSuccessEvent | Triggered when a tool completes execution successfully. |
| error | ToolErrorEvent | Triggered when a tool encounters an error. |
| retry | ToolRetryEvent | Triggered when a tool operation is being retried. |
| finish | None | Triggered when tool execution finishes (regardless of success or error). |
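For example, a small sketch of observing these events on a specific tool instance (using Any for the payload type to keep the sketch import-light):
Python
from typing import Any

from beeai_framework.emitter import EventMeta
from beeai_framework.tools.weather import OpenMeteoTool

tool = OpenMeteoTool()

def log_tool_event(data: Any, meta: EventMeta) -> None:
    print(f"tool event: {meta.name} ({meta.path})")

# instance-level listener: fires for every run of this tool instance
tool.emitter.on("*.*", log_tool_event)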

Chat Models

The following events can be observed when calling ChatModel.run(...).
| Event | Data Type | Description |
| --- | --- | --- |
| start | ChatModelStartEvent | Triggered when model generation begins. |
| new_token | ChatModelNewTokenEvent | Triggered when a new token is generated during streaming. Streaming must be enabled. |
| success | ChatModelSuccessEvent | Triggered when the model generation completes successfully. |
| error | ChatModelErrorEvent | Triggered when model generation encounters an error. |
| finish | None | Triggered when model generation finishes (regardless of success or error). |
Check out the in-code definition.

Requirement Agent

| Event | Data Type | Description |
| --- | --- | --- |
| start | RequirementAgentStartEvent | Triggered when the agent begins execution. |
| success | RequirementAgentSuccessEvent | Triggered when the agent successfully completes execution. |
| final_answer | RequirementAgentFinalAnswerEvent | Triggered with intermediate chunks of the final answer. |
Check out the in-code definition.

ToolCalling Agent

The following events can be observed by calling ToolCallingAgent.run(...).
| Event | Data Type | Description |
| --- | --- | --- |
| start | ToolCallingAgentStartEvent | Triggered when the agent begins execution. |
| success | ToolCallingAgentSuccessEvent | Triggered when the agent successfully completes execution. |
Check out the in-code definition.

ReAct Agent

The following events can be observed by calling ReActAgent.run(...).
| Event | Data Type | Description |
| --- | --- | --- |
| start | ReActAgentStartEvent | Triggered when the agent begins execution. |
| error | ReActAgentErrorEvent | Triggered when the agent encounters an error. |
| retry | ReActAgentRetryEvent | Triggered when the agent is retrying an operation. |
| success | ReActAgentSuccessEvent | Triggered when the agent successfully completes execution. |
| update and partial_update | ReActAgentUpdateEvent | Triggered when the agent updates its state. |
Check out the in-code definition.

Workflow

The following events can be observed when calling Workflow.run(...).
| Event | Data Type | Description |
| --- | --- | --- |
| start | WorkflowStartEvent | Triggered when a workflow step begins execution. |
| success | WorkflowSuccessEvent | Triggered when a workflow step completes successfully. |
| error | WorkflowErrorEvent | Triggered when a workflow step encounters an error. |
Check out the in-code definition.

LinePrefixParser

The following events are caught internally by the LinePrefixParser.
| Event | Data Type | Description |
| --- | --- | --- |
| update | LinePrefixParserUpdate | Triggered when an update occurs. |
| partial_update | LinePrefixParserUpdate | Triggered when a partial update occurs. |

StreamToolCallMiddleware

The following events are caught internally by the StreamToolCallMiddleware.
| Event | Data Type | Description |
| --- | --- | --- |
| update | StreamToolCallMiddlewareUpdateEvent | Triggered when an update occurs. |
Check out the in-code definition.

GlobalTrajectoryMiddleware

The following events are handled internally by the GlobalTrajectoryMiddleware:
| Event | Data Type | Description |
| --- | --- | --- |
| start | GlobalTrajectoryMiddlewareStartEvent | Triggered when a target begins execution. |
| success | GlobalTrajectoryMiddlewareSuccessEvent | Triggered when a target completes successfully. |
| error | GlobalTrajectoryMiddlewareErrorEvent | Triggered when an error occurs during target execution. |
| finish | GlobalTrajectoryMiddlewareFinishEvent | Triggered after a target has finished execution, regardless of success or failure. |
All events inherit from the GlobalTrajectoryMiddlewareEvent class.
Python
class GlobalTrajectoryMiddlewareEvent(BaseModel):
    message: str
    level: TraceLevel
    origin: tuple[Any, EventMeta]
The first element of the origin attribute is the original event (e.g., RunContextStartEvent) that comes from the RunContext.

RunContext

Special events that are emitted around the execution of the target’s handler. A run event contains .run. in its path and has internal set to true in its context object.
| Event | Data Type | Description |
| --- | --- | --- |
| start | RunContextStartEvent | Triggered when the run starts. Has an input property (the positional/keyword arguments the function was run with) and an output property. Set the output property to prevent the execution of the target handler. |
| success | RunContextSuccessEvent | Triggered when the run succeeds. |
| error | FrameworkError | Triggered when an error occurs. |
| finish | RunContextFinishEvent | Triggered when the run finishes. |
Check out the in-code definition.
Instead of matching these events manually, use the create_internal_event_matcher helper function.

Examples