EventBus & EventListener

Event bus and listener classes for the events system.

amsdal_utils.events.bus.EventBus

Central event bus for middleware-style event handling.

Features:
- Type-safe event/context binding
- Middleware chain pattern
- Dependency-based ordering
- Context history tracking
- Caching of sorted listeners for performance

Example:

# Register listener
EventBus.subscribe(MyEvent, MyListener, priority=100)

# Emit event
context = MyContext(value=10)
result = EventBus.emit(MyEvent, context)

# Or async
result = await EventBus.aemit(MyEvent, context)
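The middleware chain pattern named in the feature list can be illustrated without the library. The following is a minimal standalone sketch, not the EventBus implementation: `run_chain`, `add_one`, `double`, and the use of a plain `dict` as the context are all hypothetical stand-ins chosen for illustration.

```python
from typing import Callable, Sequence

Context = dict  # stand-in for a real context class (assumption)
NextFn = Callable[[Context], Context]
Handler = Callable[[Context, NextFn], Context]


def run_chain(handlers: Sequence[Handler], context: Context) -> Context:
    """Run handlers in order; each handler decides whether to call the next one."""

    def dispatch(index: int, ctx: Context) -> Context:
        if index == len(handlers):
            return ctx  # end of chain: the current context is the result
        # Give the handler a next function bound to the following position.
        return handlers[index](ctx, lambda next_ctx: dispatch(index + 1, next_ctx))

    return dispatch(0, context)


def add_one(ctx: Context, next_fn: NextFn) -> Context:
    # Build a new context instead of mutating the one received.
    return next_fn({**ctx, "value": ctx["value"] + 1})


def double(ctx: Context, next_fn: NextFn) -> Context:
    return next_fn({**ctx, "value": ctx["value"] * 2})


result = run_chain([add_one, double], {"value": 10})
print(result)  # {'value': 22}
```

Because each handler receives the continuation explicitly, it can run logic before and after the rest of the chain, or stop the chain by raising instead of calling `next_fn`.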

subscribe classmethod

subscribe(
    event,
    listener,
    after=None,
    before=None,
    priority=500,
    error_strategy=None,
)

Subscribe listener to event.

Args:
    event: Event class to listen to.
    listener: Listener class (will be instantiated per execution).
    after: List of listeners or listener_id strings this should run after. Strings must be fully-qualified paths (e.g., "module.ClassName").
    before: List of listeners or listener_id strings this should run before. Strings must be fully-qualified paths (e.g., "module.ClassName").
    priority: Numeric priority (0-999, lower = earlier).
    error_strategy: How to handle errors (None = use event's default).

Raises:
    ValueError: If string path cannot be imported (typo in listener reference).
    TypeError: If listener reference is not an EventListener subclass.
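One way a fully-qualified string reference could be resolved and validated is sketched below. This is an assumption about the general technique, not the library's code: `resolve_listener` and `SketchListener` are hypothetical names, and the standard-library classes in the usage lines merely stand in for real listeners.

```python
import importlib


class SketchListener:
    """Stand-in base class (assumption); the real base is EventListener."""


def resolve_listener(ref, base=SketchListener):
    """Turn a class or a "module.ClassName" string into a validated class."""
    if isinstance(ref, str):
        module_path, _, class_name = ref.rpartition(".")
        if not module_path:
            raise ValueError(f"Not a fully-qualified path: {ref!r}")
        try:
            module = importlib.import_module(module_path)
            ref = getattr(module, class_name)
        except (ImportError, AttributeError) as exc:
            # A typo in the module or class name ends up here.
            raise ValueError(f"Cannot import listener {ref!r}") from exc
    if not (isinstance(ref, type) and issubclass(ref, base)):
        raise TypeError(f"{ref!r} is not a {base.__name__} subclass")
    return ref


# Usage with standard-library names standing in for listeners:
resolved = resolve_listener("collections.OrderedDict", base=dict)
print(resolved.__name__)  # OrderedDict
```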

Example:

# Using class references (type-safe)
EventBus.subscribe(MyEvent, ListenerA, after=[ListenerB])

# Using string paths (for cross-module references)
EventBus.subscribe(MyEvent, ListenerA, after=["other.module.ListenerB"])

# Mixed approach
EventBus.subscribe(MyEvent, ListenerC, after=[ListenerA, "other.module.ListenerB"])
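How `after`, `before`, and `priority` might combine into one execution order can be sketched as a topological sort in which priority breaks ties among listeners whose dependencies are already satisfied. This is one plausible scheme, not necessarily what EventBus does; `Registration` and `sort_listeners` are hypothetical names, and silently ignoring references to unregistered listeners is an assumption of the sketch.

```python
import heapq
from dataclasses import dataclass, field


@dataclass
class Registration:
    listener_id: str
    priority: int = 500
    after: list = field(default_factory=list)
    before: list = field(default_factory=list)


def sort_listeners(registrations):
    """Kahn's algorithm; among ready listeners the lowest priority value runs first."""
    by_id = {reg.listener_id: reg for reg in registrations}
    position = {reg.listener_id: i for i, reg in enumerate(registrations)}
    successors = {lid: set() for lid in by_id}
    blockers = {lid: 0 for lid in by_id}

    def add_edge(first, second):
        # Assumption: references to listeners not registered here are ignored.
        if first in by_id and second in by_id and second not in successors[first]:
            successors[first].add(second)
            blockers[second] += 1

    for reg in registrations:
        for dep in reg.after:
            add_edge(dep, reg.listener_id)
        for dep in reg.before:
            add_edge(reg.listener_id, dep)

    # Heap entries: (priority, registration order, listener_id).
    ready = [
        (by_id[lid].priority, position[lid], lid)
        for lid in by_id
        if blockers[lid] == 0
    ]
    heapq.heapify(ready)
    ordered = []
    while ready:
        _, _, lid = heapq.heappop(ready)
        ordered.append(lid)
        for nxt in successors[lid]:
            blockers[nxt] -= 1
            if blockers[nxt] == 0:
                heapq.heappush(ready, (by_id[nxt].priority, position[nxt], nxt))
    if len(ordered) != len(by_id):
        raise ValueError("Circular dependency between listeners")
    return ordered


regs = [
    Registration("app.ListenerA", priority=100, after=["app.ListenerB"]),
    Registration("app.ListenerB", priority=900),
    Registration("app.ListenerC", priority=500),
]
print(sort_listeners(regs))
# ['app.ListenerC', 'app.ListenerB', 'app.ListenerA']
```

In this sketch a dependency always wins over priority: ListenerA has the lowest priority value but still runs last because it must follow ListenerB.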

unsubscribe classmethod

unsubscribe(event, listener_id)

Unsubscribe listener from event.

Args:
    event: Event type.
    listener_id: Listener ID to remove.

Returns: True if listener was found and removed

emit classmethod

emit(event, context)

Emit event synchronously and execute middleware chain.

Args:
    event: Event type to emit.
    context: Initial context.

Returns: Final context after all listeners

Raises: Any exception raised by listeners (depends on error_strategy)

aemit async classmethod

aemit(event, context)

Emit event asynchronously and execute middleware chain.

Args:
    event: Event type to emit.
    context: Initial context.

Returns: Final context after all listeners

Raises: Any exception raised by listeners (depends on error_strategy)
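The asynchronous variant of a middleware chain follows the same shape, with every step awaited. The sketch below is a standalone illustration under assumptions, not the `aemit` implementation: `arun_chain`, `tag`, and `reject_negative` are hypothetical, and a plain `dict` stands in for the context.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

Context = dict  # stand-in for a real context class (assumption)
AsyncNextFn = Callable[[Context], Awaitable[Context]]
AsyncHandler = Callable[[Context, AsyncNextFn], Awaitable[Context]]


async def arun_chain(handlers: Sequence[AsyncHandler], context: Context) -> Context:
    """Await handlers in order; each handler awaits next_fn to continue."""

    async def dispatch(index: int, ctx: Context) -> Context:
        if index == len(handlers):
            return ctx  # end of chain

        async def next_fn(next_ctx: Context) -> Context:
            return await dispatch(index + 1, next_ctx)

        return await handlers[index](ctx, next_fn)

    return await dispatch(0, context)


async def tag(ctx: Context, next_fn: AsyncNextFn) -> Context:
    return await next_fn({**ctx, "tagged": True})


async def reject_negative(ctx: Context, next_fn: AsyncNextFn) -> Context:
    # Raising instead of calling next_fn stops the chain.
    if ctx["value"] < 0:
        raise ValueError("value must be non-negative")
    return await next_fn(ctx)


result = asyncio.run(arun_chain([tag, reject_negative], {"value": 10}))
print(result)  # {'value': 10, 'tagged': True}
```

In this sketch an exception raised by a handler propagates straight to the caller; the documented `error_strategy` setting may change that behavior in the real bus.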

reset classmethod

reset()

Clear all listeners and cache (useful for testing)

get_listeners classmethod

get_listeners(event)

Get list of listener IDs for event (for introspection).

Args:
    event: Event type.

Returns: List of listener IDs in execution order

amsdal_utils.events.listener.EventListener

Bases: ABC, Generic[TContext]

Base event listener acting as middleware.

Each listener receives the context and a next() function. It must call next(context) to continue the chain, or raise an exception to stop it.

The listener_id is auto-generated from module and class name.

Example:

class MyListener(EventListener[MyContext]):
    def handle(self, context: MyContext, next: NextFn) -> MyContext:
        # Your logic
        if not self.is_valid(context):
            raise ValueError("Invalid context")

        # Modify context
        new_ctx = context.create_next(
            listener_id=self.listener_id,
            some_field="new_value",
        )

        # Continue chain
        return next(new_ctx)
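The immutable-context idea behind `create_next` can be sketched with a frozen dataclass. This is an illustration only: `SketchContext`, its `changed_by` and `previous` fields, and the `history` helper are hypothetical and are not the library's context class; only the `create_next(listener_id=..., some_field=...)` call shape comes from the example above.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class SketchContext:
    some_field: str = "old_value"
    changed_by: Optional[str] = None  # listener that produced this context
    previous: Optional["SketchContext"] = None  # link used for history

    def create_next(self, listener_id: str, **changes) -> "SketchContext":
        """Return a new context; the current one is left untouched."""
        return replace(self, changed_by=listener_id, previous=self, **changes)

    def history(self) -> List["SketchContext"]:
        """Return all contexts in the chain, oldest first."""
        chain = []
        node = self
        while node is not None:
            chain.append(node)
            node = node.previous
        return chain[::-1]


original = SketchContext()
updated = original.create_next(
    listener_id="app.listeners.MyListener",
    some_field="new_value",
)
print(original.some_field)  # old_value
print(updated.some_field)  # new_value
print([ctx.changed_by for ctx in updated.history()])
# [None, 'app.listeners.MyListener']

try:
    updated.some_field = "mutated"  # direct assignment is rejected
except FrozenInstanceError:
    print("context is immutable")
```

Keeping a link to the previous context is one simple way to support history tracking; each listener's change stays attributable to its `listener_id`.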

handle abstractmethod

handle(context, next_fn)

Sync handler (default).

Args:
    context: Event context (immutable - create new with context.create_next()).
    next_fn: Function to call next listener.

Returns: Context (original or modified)

Raises:
    Any exception to stop chain execution.
    NotImplementedError: If only async is supported.

ahandle abstractmethod async

ahandle(context, next_fn)

Async handler.

Args:
    context: Event context (immutable - create new with context.create_next()).
    next_fn: Async function to call next listener.

Returns: Context (original or modified)

Raises:
    Any exception to stop chain execution.
    NotImplementedError: If only sync is supported.