Overview
The Emitter is a powerful event management and observability tool that allows you to track, monitor, and react to events happening within your AI agents and workflows.
This flexible event-driven mechanism provides the ability to:
Observe system events
Debug agent behaviors
Log and track agent interactions
Implement custom event handling
Supported in Python and TypeScript.
Basic usage
import asyncio
import json
import sys
import traceback
from typing import Any
from beeai_framework.emitter import Emitter, EventMeta
from beeai_framework.errors import FrameworkError
async def main() -> None:
    """Register a catch-all listener on the root emitter, emit two events, then remove the listener."""
    # Get the root emitter or create your own
    root = Emitter.root()

    # Listen to all events that will get emitted
    @root.on("*.*")
    async def handle_new_event(data: Any, event: EventMeta) -> None:
        """Print the name, path and JSON-serialized payload of each received event."""
        print(f"Received event '{event.name}' ({event.path}) with data {json.dumps(data)}")

    await root.emit("start", {"id": 123})
    await root.emit("end", {"id": 123})

    root.off(callback=handle_new_event)  # deregister a listener
# Script entry point: run the async example and report framework errors.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1.
        sys.exit(e.explain())
See all 32 lines
You can create your own emitter by instantiating the Emitter
class, but typically it’s better to use or fork the root one.
Key features
Event matching
Event matching allows you to:
Listen to specific event types
Use wildcard matching
Handle nested events
import asyncio
import re
import sys
import traceback
from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.backend import ChatModel
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError
async def main() -> None:
    """Demonstrate the matcher kinds accepted by ``Emitter.on``: name, wildcard, predicate and regex."""
    emitter = Emitter.root().child(namespace=["app"])
    model = OllamaChatModel()

    # Match events by a concrete name (strictly typed)
    emitter.on("update", lambda data, event: print(data, ": on update"))

    # Match all events emitted directly on the instance (not nested)
    emitter.on("*", lambda data, event: print(data, ": match all instance"))

    # Match all events (including nested)
    cleanup = Emitter.root().on("*.*", lambda data, event: print(data, ": match all nested"))

    # Match events by providing a filter function
    model.emitter.on(
        lambda event: isinstance(event.creator, ChatModel),
        lambda data, event: print(data, ": match ChatModel"),
    )

    # Match events by regex
    emitter.on(re.compile(r"watsonx"), lambda data, event: print(data, ": match regex"))

    await emitter.emit("update", "update")
    await Emitter.root().emit("root", "root")
    await model.emitter.emit("model", "model")

    cleanup()  # You can remove a listener from an emitter by calling the cleanup function it returns
# Script entry point: run the async example and report framework errors.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1.
        sys.exit(e.explain())
See all 46 lines
Event piping
Event piping enables:
Transferring events between emitters
Transforming events in transit
Creating complex event workflows
import asyncio
import sys
import traceback
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError
async def main() -> None:
    """Pipe events from one emitter into another, emit on both, then unpipe and emit again."""
    first: Emitter = Emitter(namespace=["app"])

    first.on(
        "*.*",
        lambda data, event: print(
            f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}"
        ),
    )

    second: Emitter = Emitter(namespace=["app", "llm"])

    second.on(
        "*.*",
        lambda data, event: print(
            f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}"
        ),
    )

    # Propagate all events from the 'second' emitter to the 'first' emitter
    unpipe = second.pipe(first)

    await first.emit("a", {})
    await second.emit("b", {})

    print("Unpipe")
    unpipe()

    # Emitted after unpiping, for comparison with the output of "a" and "b" above
    await first.emit("c", {})
    await second.emit("d", {})
# Script entry point: run the async example and report framework errors.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1.
        sys.exit(e.explain())
See all 47 lines
Framework usage
In the following section we will take a look at how to consume events from core modules in the framework.
The fastest way to see what is going on under the hood is to call instance.run(...).middleware(GlobalTrajectoryMiddleware).
Agent usage
Integrate emitters with agents to:
Track agent decision-making
Log agent interactions
Debug agent behaviors
import asyncio
import sys
import traceback
from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.agents.react import ReActAgent
from beeai_framework.errors import FrameworkError
from beeai_framework.memory import UnconstrainedMemory
async def main() -> None:
    """Attach listeners to a ReAct agent at the instance level and at the level of a single run."""
    agent = ReActAgent(
        llm=OllamaChatModel("llama3.1"),
        memory=UnconstrainedMemory(),
        tools=[],
    )

    # Matching events on the instance level
    agent.emitter.on("*.*", lambda data, event: None)

    # Matching events on the execution (run) level
    await agent.run("Hello agent!").observe(
        lambda emitter: emitter.on(
            "*.*",
            lambda data, event: print(f"RUN LOG: received event '{event.path}'"),
        )
    )
# Script entry point: run the async example and report framework errors.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1.
        sys.exit(e.explain())
See all 33 lines
Advanced usage
Advanced techniques include:
Custom event handlers
Complex event filtering
Performance optimization
import asyncio
import sys
import traceback
from typing import Any
from beeai_framework.emitter import Emitter, EventMeta
from beeai_framework.errors import FrameworkError
async def main() -> None:
    """Create a child emitter, register listeners via decorator and direct call, emit events, then clean up."""
    # Create an emitter
    emitter = Emitter.root().child(
        namespace=["bee", "demo"],
        creator={},  # typically a class
        context={},  # custom data (propagates to the event's context property)
        group_id=None,  # optional id for grouping common events (propagates to the event's groupId property)
    )

    # No matcher is passed to on() here.
    # NOTE(review): presumably the event name is derived from the function name ("on_start" -> "start"); confirm.
    @emitter.on()
    async def on_start(data: dict[str, Any], event: EventMeta) -> None:
        """Print the event name and the payload's id."""
        print(f"Received '{event.name}' event with id '{data['id']}'")

    # Listen for "update" event
    cleanup = emitter.on(
        "update",
        lambda data, event: print(
            f"Received '{event.name}' with id '{data['id']}' and data '{data['data']}'"
        ),
    )
    cleanup()  # deregister a listener

    # Listen for "success" event
    @emitter.on("success")
    async def custom_name(data: dict[str, Any], event: EventMeta) -> None:
        """Print the event name followed by the full payload."""
        print(f"Received '{event.name}' event with the following data", data)

    await emitter.emit("start", {"id": 123})
    await emitter.emit("update", {"id": 123, "data": "Hello Bee!"})
    await emitter.emit("success", {"id": 123, "result": "Hello world!"})

    emitter.off("success", custom_name)  # deregister a listener
# Script entry point: run the async example and report framework errors.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1.
        sys.exit(e.explain())
See all 47 lines
Examples