EventBus & EventListener
Event bus and listener classes for the events system.
amsdal_utils.events.bus.EventBus
Central event bus for middleware-style event handling.
Features:
- Type-safe event/context binding
- Middleware chain pattern
- Dependency-based ordering
- Context history tracking
- Caching of sorted listeners for performance
Example:
    # Register listener
    EventBus.subscribe(MyEvent, MyListener, priority=100)

    # Emit event
    context = MyContext(value=10)
    result = EventBus.emit(MyEvent, context)

    # Or async
    result = await EventBus.aemit(MyEvent, context)
subscribe
classmethod
subscribe(
event,
listener,
after=None,
before=None,
priority=500,
error_strategy=None,
)
Subscribe listener to event.
Args:
    event: Event class to listen to
    listener: Listener class (will be instantiated per execution)
    after: List of listeners or listener_id strings this should run after. Strings must be fully-qualified paths (e.g., "module.ClassName")
    before: List of listeners or listener_id strings this should run before. Strings must be fully-qualified paths (e.g., "module.ClassName")
    priority: Numeric priority (0-999, lower = earlier)
    error_strategy: How to handle errors (None = use event's default)
Raises:
    ValueError: If string path cannot be imported (typo in listener reference)
    TypeError: If listener reference is not an EventListener subclass
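The ValueError and TypeError cases above can be pictured with a small resolver. This is a sketch only, assuming a hypothetical helper named resolve_listener and using the standard library's importlib; it is not the library's code, and a stdlib class stands in for a real listener so the snippet runs on its own.

```python
import importlib


def resolve_listener(ref, base):
    """Turn a class or a "module.ClassName" string into a class.

    Hypothetical helper for illustration; `base` plays the role of EventListener.
    """
    if isinstance(ref, str):
        module_path, _, class_name = ref.rpartition(".")
        try:
            module = importlib.import_module(module_path)
            ref = getattr(module, class_name)
        except (ImportError, AttributeError, ValueError) as exc:
            # Covers a missing module, a missing attribute, and an empty module path.
            raise ValueError(f"Cannot import listener {ref!r}") from exc
    if not (isinstance(ref, type) and issubclass(ref, base)):
        raise TypeError(f"{ref!r} is not a subclass of {base.__name__}")
    return ref


# A stdlib class stands in for a listener; dict stands in for the base class.
resolved = resolve_listener("collections.OrderedDict", dict)
print(resolved.__name__)
```

A typo such as "collections.OrderedDic" would surface here as ValueError, while a path to a class with the wrong base (for example "collections.deque" against dict) would surface as TypeError.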
Example:
    # Using class references (type-safe)
    EventBus.subscribe(MyEvent, ListenerA, after=[ListenerB])

    # Using string paths (for cross-module references)
    EventBus.subscribe(MyEvent, ListenerA, after=["other.module.ListenerB"])

    # Mixed approach
    EventBus.subscribe(MyEvent, ListenerC, after=[ListenerA, "other.module.ListenerB"])
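How after/before constraints and priority combine is not spelled out in this reference. One plausible reading, sketched below, is a topological sort in which priority breaks ties among listeners whose prerequisites are already satisfied. The function order_listeners and its dictionary inputs are hypothetical and exist only for this illustration.

```python
import heapq


def order_listeners(priorities, after=None):
    """Return listener IDs in execution order (illustrative, not the library's algorithm).

    priorities: {listener_id: priority}, lower runs earlier.
    after: {listener_id: [listener_ids that must run first]}.
    """
    after = after or {}
    # Count unmet prerequisites and record who is waiting on whom.
    pending = {lid: 0 for lid in priorities}
    dependents = {lid: [] for lid in priorities}
    for lid, prerequisites in after.items():
        for pre in prerequisites:
            pending[lid] += 1
            dependents[pre].append(lid)

    # Among listeners with no unmet prerequisites, the lowest priority goes first.
    ready = [(priorities[lid], lid) for lid, count in pending.items() if count == 0]
    heapq.heapify(ready)
    ordered = []
    while ready:
        _, lid = heapq.heappop(ready)
        ordered.append(lid)
        for dep in dependents[lid]:
            pending[dep] -= 1
            if pending[dep] == 0:
                heapq.heappush(ready, (priorities[dep], dep))

    if len(ordered) != len(priorities):
        raise ValueError("Circular dependency between listeners")
    return ordered


print(order_listeners({"A": 100, "B": 500, "C": 50}, after={"A": ["B"]}))
# ['C', 'B', 'A']
```

In this sketch A has a lower priority number than B but still runs after it, because the dependency wins over the priority. A before constraint can be expressed in the same structure by recording it as an after entry on the other listener.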
unsubscribe
classmethod
unsubscribe(event, listener_id)
Unsubscribe listener from event.
Args:
    event: Event type
    listener_id: Listener ID to remove
Returns: True if listener was found and removed
emit
classmethod
emit(event, context)
Emit event synchronously and execute middleware chain.
Args:
    event: Event type to emit
    context: Initial context
Returns: Final context after all listeners
Raises: Any exception raised by listeners (depends on error_strategy)
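The middleware chain that emit executes can be modeled in a few lines. The sketch below is a toy: run_chain and the three listener functions are hypothetical, plain functions replace listener classes, and an integer replaces the context object. It only shows the control flow of "call next to continue, raise to stop".

```python
def run_chain(listeners, context):
    """Call each listener with the context and a next function (toy model)."""

    def call(index, ctx):
        if index == len(listeners):
            return ctx  # end of chain: this is the final context
        # Each listener gets a callable that hands control to the next listener.
        return listeners[index](ctx, lambda next_ctx: call(index + 1, next_ctx))

    return call(0, context)


def reject_negative(ctx, next_fn):
    if ctx < 0:
        raise ValueError("negative context")  # raising stops the chain
    return next_fn(ctx)


def add_one(ctx, next_fn):
    return next_fn(ctx + 1)  # changes the context before passing it on


def double_after(ctx, next_fn):
    return next_fn(ctx) * 2  # post-processes what the rest of the chain returned


print(run_chain([reject_negative, add_one, double_after], 10))
# 22
```

Because each listener decides when to call the next function, it can act before the rest of the chain, after it, or both. An exception raised anywhere propagates back to the caller in this toy; in the real bus that outcome depends on error_strategy.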
aemit
async
classmethod
aemit(event, context)
Emit event asynchronously and execute middleware chain.
Args:
    event: Event type to emit
    context: Initial context
Returns: Final context after all listeners
Raises: Any exception raised by listeners (depends on error_strategy)
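The asynchronous variant follows the same shape, with every step awaited. This is again a self-contained toy under stated assumptions: arun_chain, tag, and audit are hypothetical names, and a list stands in for the context.

```python
import asyncio


async def arun_chain(listeners, context):
    """Async counterpart of a middleware chain (toy model, not the library's code)."""

    async def call(index, ctx):
        if index == len(listeners):
            return ctx  # end of chain

        async def next_fn(next_ctx):
            return await call(index + 1, next_ctx)

        return await listeners[index](ctx, next_fn)

    return await call(0, context)


async def tag(ctx, next_fn):
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return await next_fn(ctx + ["tag"])


async def audit(ctx, next_fn):
    result = await next_fn(ctx + ["audit"])
    return result + ["done"]  # runs after the downstream listeners finish


print(asyncio.run(arun_chain([tag, audit], [])))
# ['tag', 'audit', 'done']
```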
get_listeners
classmethod
get_listeners(event)
Get list of listener IDs for event (for introspection).
Args:
    event: Event type
Returns: List of listener IDs in execution order
amsdal_utils.events.listener.EventListener
Bases: ABC, Generic[TContext]
Base event listener acting as middleware.
Each listener receives the context and a next() function. It must call next(context) to continue the chain or raise an exception to stop it.
The listener_id is auto-generated from module and class name.
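The exact format of the generated ID is not shown here. Given that string references elsewhere in this reference use the "module.ClassName" form, a reasonable guess is the dotted module path joined with the class name. The helper make_listener_id below is hypothetical and only illustrates that guess.

```python
from collections import OrderedDict


def make_listener_id(cls):
    """Build a "module.ClassName" identifier (assumed format, for illustration)."""
    return f"{cls.__module__}.{cls.__name__}"


# A stdlib class is used so the result does not depend on where the snippet runs.
print(make_listener_id(OrderedDict))
# collections.OrderedDict
```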
Example:
    class MyListener(EventListener[MyContext]):
        def handle(self, context: MyContext, next: NextFn) -> MyContext:
            # Your logic
            if not self.is_valid(context):
                raise ValueError("Invalid context")

            # Modify context
            new_ctx = context.create_next(
                listener_id=self.listener_id,
                some_field="new_value",
            )

            # Continue chain
            return next(new_ctx)
handle
abstractmethod
handle(context, next_fn)
Sync handler (default).
Args:
    context: Event context (immutable - create new with context.create_next())
    next_fn: Function to call next listener
Returns: Context (original or modified)
Raises:
    Any exception to stop chain execution
    NotImplementedError: If only async is supported
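The "immutable, create new with context.create_next()" contract can be illustrated with a frozen dataclass. Everything below is an assumption made for the sketch: SketchContext, its value field, and the choice to record listener IDs in a history tuple. The library's real context class may track history differently.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchContext:
    """Stand-in context; not the library's context class."""

    value: int
    history: tuple = ()

    def create_next(self, listener_id, **changes):
        """Return a new context with the changes applied; the original is untouched."""
        return replace(self, history=self.history + (listener_id,), **changes)


ctx = SketchContext(value=10)
nxt = ctx.create_next(listener_id="demo.MyListener", value=11)
print(ctx.value, nxt.value, nxt.history)
# 10 11 ('demo.MyListener',)
```

Since the dataclass is frozen, assigning to ctx.value raises an error, so a listener that wants different data has to go through create_next and pass the new object down the chain.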
ahandle
abstractmethod
async
ahandle(context, next_fn)
Async handler.
Args:
    context: Event context (immutable - create new with context.create_next())
    next_fn: Async function to call next listener
Returns: Context (original or modified)
Raises:
    Any exception to stop chain execution
    NotImplementedError: If only sync is supported