Events System

Middleware-based event system for building extensible and type-safe plugins in the AMSDAL framework.

The Events system applies the middleware chain pattern to event handling in the AMSDAL framework. It enables:

  • Plugins to define their own events without modifying core code
  • Type-safe event handling through generics
  • Execution order control via dependencies and priority
  • Context history tracking
  • Flexible error handling through ErrorStrategy
  • Both sync and async support

Architecture

Middleware Chain Pattern

Event → Listener 1 → Listener 2 → Listener 3 → Result
         ↓            ↓            ↓
      next_fn()    next_fn()    next_fn()

Each listener:

  1. Receives context and next_fn
  2. Executes its logic
  3. Either calls next_fn(context) to continue
  4. Or raises an exception to stop
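The listener steps above can be sketched in plain Python. This is a minimal illustration of the middleware chain idea, not AMSDAL's implementation: build_chain, add_user and require_user are hypothetical names, and a plain dict stands in for the context.

```python
from typing import Callable

Context = dict  # hypothetical stand-in for an EventContext
NextFn = Callable[[Context], Context]
Listener = Callable[[Context, NextFn], Context]


def build_chain(listeners: list[Listener]) -> NextFn:
    """Compose listeners so that each one receives the next as next_fn."""

    def terminal(context: Context) -> Context:
        # End of the chain: return whatever context reached this point.
        return context

    chain: NextFn = terminal
    # Wrap from the last listener backwards so the first listener runs first.
    for listener in reversed(listeners):
        def step(context: Context, _listener: Listener = listener, _next: NextFn = chain) -> Context:
            return _listener(context, _next)
        chain = step
    return chain


def add_user(context: Context, next_fn: NextFn) -> Context:
    # Continue the chain with an updated copy of the context.
    return next_fn({**context, 'user': 'john'})


def require_user(context: Context, next_fn: NextFn) -> Context:
    if 'user' not in context:
        raise ValueError('user missing')  # stops the chain
    return next_fn({**context, 'checked': True})


result = build_chain([add_user, require_user])({})
```

Here result holds both the user and checked keys. Swapping the two listeners makes require_user raise before add_user ever runs, which is the "raise to stop" branch.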

Components

┌─────────────────────────────────────────────────┐
│                   EventBus                       │
│  - subscribe()   - emit()                       │
│  - unsubscribe() - aemit()                      │
└─────────────────────────────────────────────────┘
                        │
        ┌───────────────┼───────────────┐
        ↓               ↓               ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Event      │ │ EventContext │ │EventListener │
│  (generic)   │ │  (history)   │ │ (middleware) │
└──────────────┘ └──────────────┘ └──────────────┘

Topological Sort

Listeners are sorted considering:

  1. Dependencies (after, before)
  2. Priority (lower number = earlier)
  3. Cycle detection (raises ValueError)
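One way to combine these three rules is Kahn's algorithm with a priority heap. The sketch below is an assumption about how such a sort could work, not the framework's actual code. sort_listeners and its arguments are hypothetical, and it assumes every dependency names a registered listener.

```python
import heapq


def sort_listeners(priorities: dict[str, int], after: dict[str, list[str]]) -> list[str]:
    """Order listener IDs by dependencies, using priority as the tiebreaker.

    after[x] lists the listeners that must run before x. A "before"
    constraint can be expressed by adding the reverse "after" entry.
    """
    # Count unmet dependencies and record who is waiting on whom.
    pending = {name: 0 for name in priorities}
    dependents: dict[str, list[str]] = {name: [] for name in priorities}
    for name, deps in after.items():
        for dep in deps:
            pending[name] += 1
            dependents[dep].append(name)

    # Among listeners whose dependencies are met, pick the lowest priority first.
    ready = [(priorities[n], n) for n, count in pending.items() if count == 0]
    heapq.heapify(ready)
    ordered: list[str] = []
    while ready:
        _, name = heapq.heappop(ready)
        ordered.append(name)
        for child in dependents[name]:
            pending[child] -= 1
            if pending[child] == 0:
                heapq.heappush(ready, (priorities[child], child))

    # Anything left over is part of (or blocked by) a cycle.
    if len(ordered) != len(priorities):
        stuck = sorted(set(priorities) - set(ordered))
        raise ValueError(f'Circular dependency detected among listeners: {stuck}')
    return ordered


order = sort_listeners(
    priorities={'Audit': 50, 'Auth': 100, 'Custom': 10},
    after={'Custom': ['Auth']},
)
```

Custom has the lowest priority number but still runs last, because its after dependency on Auth takes precedence over priority.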


Registering Listeners

Listeners must be registered during application initialization — inside AppConfig.on_setup(). The @listen_to decorator and EventBus.subscribe() execute immediately when the code is evaluated, so they must run at the right time in the startup sequence.

AMSDAL loads configs in this order:

  1. Plugins (AMSDAL_CONTRIBS) — each plugin's on_setup() is called
  2. Application (APP_CONFIG) — the main app's on_setup() is called

For end-user applications

Create an app.py file in your source directory (src/) with a MainAppConfig class:

# src/app.py
from amsdal.contrib.app_config import AppConfig


class MainAppConfig(AppConfig):
    def on_setup(self) -> None:
        from amsdal_utils.events import EventBus
        from amsdal_server.apps.common.events.server import ServerStartupEvent
        from myapp.listeners import MyStartupListener

        EventBus.subscribe(ServerStartupEvent, MyStartupListener)

AMSDAL discovers this file automatically via the APP_CONFIG setting (default: app.MainAppConfig). The amsdal new CLI command generates this file for you.

Note

If app.py doesn't exist in your source directory, AMSDAL silently skips it — it's optional. If you set a custom AMSDAL_APP_CONFIG path, the file must exist.

For plugins

Same pattern, but registered via AMSDAL_CONTRIBS:

# my_plugin/app.py
from amsdal.contrib.app_config import AppConfig


class MyPluginAppConfig(AppConfig):
    def on_setup(self) -> None:
        from amsdal_utils.events import EventBus
        from amsdal_server.apps.common.events.server import RouterSetupEvent
        from my_plugin.listeners import MyRouteListener

        EventBus.subscribe(RouterSetupEvent, MyRouteListener)

Then add the plugin's config class to AMSDAL_CONTRIBS:

AMSDAL_CONTRIBS="amsdal.contrib.auth.app.AuthAppConfig,...,my_plugin.app.MyPluginAppConfig"

Warning

Do not use @listen_to or EventBus.subscribe() at module top level or in model files. Listeners registered outside on_setup() may execute before the framework is ready, leading to import errors or missed events.
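The timing issue comes from how Python decorators work: a class decorator runs as soon as the class statement is evaluated. The sketch below illustrates this with a hypothetical registry (REGISTRY and listen_to_sketch are stand-ins, not AMSDAL APIs). Defining or importing the listener inside on_setup() defers registration until setup runs.

```python
REGISTRY: dict[str, list[str]] = {}  # hypothetical stand-in for the EventBus registry


def listen_to_sketch(event_name: str):
    """Register the decorated class the moment the decorator is applied."""

    def decorator(cls: type) -> type:
        REGISTRY.setdefault(event_name, []).append(cls.__name__)
        return cls

    return decorator


def on_setup() -> None:
    # The class statement (like an import of a listeners module) is only
    # evaluated when on_setup() is called, so registration happens here.
    @listen_to_sketch('ServerStartup')
    class MyStartupListener:
        pass


registered_before_setup = dict(REGISTRY)  # empty: no listener class evaluated yet
on_setup()
registered_after_setup = dict(REGISTRY)
```

This is why the examples above import the listener modules inside on_setup() rather than at the top of app.py.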


Quick Start

1. Define Context

from amsdal_utils.events import EventContext

class UserContext(EventContext):
    user_id: str
    username: str
    authenticated: bool = False

2. Define Event

from amsdal_utils.events import Event

class AuthenticateEvent(Event[UserContext]):
    """Event emitted during authentication"""
    pass

3. Create Listener

from amsdal_utils.events import EventListener, listen_to

@listen_to(AuthenticateEvent, priority=100)
class TokenValidator(EventListener[UserContext]):
    def handle(self, context: UserContext, next_fn):
        # Your logic (_validate_token is a placeholder for your own check)
        if not self._validate_token(context.user_id):
            raise ValueError('Invalid token')

        # Modify context
        new_context = context.create_next(
            listener_id=self.listener_id,
            authenticated=True
        )

        # Continue chain
        return next_fn(new_context)

    async def ahandle(self, context: UserContext, next_fn):
        # Async version
        raise NotImplementedError

4. Emit Event

from amsdal_utils.events import EventBus

# Sync
context = UserContext(user_id='123', username='john')
result = EventBus.emit(AuthenticateEvent, context)
print(result.authenticated)  # True

# Async
result = await EventBus.aemit(AuthenticateEvent, context)

Core Concepts

1. EventContext

Immutable Pydantic model with history tracking. The frozen=True config prevents accidental mutations.

from amsdal_utils.events import Event, EventBus, EventContext
from pydantic import ValidationError

class MyContext(EventContext):
    value: int
    processed: bool = False

class MyEvent(Event[MyContext]):
    """Event used when emitting below"""

# Create initial context
context = MyContext(value=10)

# ✅ Correct: Create new version with create_next()
new_context = context.create_next(
    listener_id='my.Listener',
    value=context.value * 2,
    processed=True
)

# ❌ Wrong: Direct mutation raises ValidationError
try:
    context.value = 20  # Raises: ValidationError - "MyContext" is frozen
except ValidationError:
    print("Mutation prevented!")

# After emitting event, access history from final context
result = EventBus.emit(MyEvent, context)
for entry in result.history:
    print(f'{entry.listener_id}: {entry.context_snapshot.value}')

# Get specific version from history
old_version = result.get_by_listener('my.Listener')

Why Pydantic?

  • Automatic immutability enforcement with frozen=True
  • Type validation on field assignment
  • No need for manual __post_init__ or defensive copying
  • Better integration with serialization/deserialization
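For readers who want to see the copy-on-write idea without Pydantic or AMSDAL installed, here is a standard-library analogue using a frozen dataclass. It is only an illustration: SketchContext is hypothetical, and storing the previous version as the history snapshot is an assumption about what a history entry contains.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace


@dataclass(frozen=True)
class SketchContext:
    value: int
    processed: bool = False
    # Each entry is (listener_id, snapshot of the context that listener received).
    history: 'tuple[tuple[str, SketchContext], ...]' = field(default=(), compare=False)

    def create_next(self, listener_id: str, **changes) -> 'SketchContext':
        """Return a modified copy and record the previous version in history."""
        entry = (listener_id, replace(self, history=()))
        return replace(self, history=self.history + (entry,), **changes)


first = SketchContext(value=10)
second = first.create_next('my.Listener', value=first.value * 2, processed=True)

try:
    first.value = 20  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

The original object is never changed: first.value stays 10, while second carries the new value plus one history entry.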

2. Event Types

Events can define a default ErrorStrategy:

class CriticalEvent(Event[MyContext]):
    """Critical event - stop on error"""
    default_error_strategy = ErrorStrategy.PROPAGATE

class NonCriticalEvent(Event[MyContext]):
    """Non-critical - log and continue"""
    default_error_strategy = ErrorStrategy.LOG_AND_CONTINUE

3. Listener Ordering

Priority (basic sorting)

@listen_to(MyEvent, priority=100)  # executes first
class FirstListener(EventListener[MyContext]):
    pass

@listen_to(MyEvent, priority=200)  # executes second
class SecondListener(EventListener[MyContext]):
    pass

Dependencies (after/before)

You can specify dependencies using either class references (type-safe) or string paths:

@listen_to(MyEvent)
class AuthListener(EventListener[MyContext]):
    pass

class MLListener(EventListener[MyContext]):
    pass

# Option 1: Using class references (recommended - type-safe)
@listen_to(
    MyEvent,
    after=[AuthListener],  # after auth
    before=[MLListener]     # before ML
)
class CustomListener(EventListener[MyContext]):
    pass

# Option 2: Using string paths (for cross-module references)
@listen_to(
    MyEvent,
    after=['myapp.auth.AuthListener'],  # fully-qualified module path
    before=['myapp.ml.MLListener']
)
class CustomListener(EventListener[MyContext]):
    pass

# Option 3: Mixed approach
@listen_to(
    MyEvent,
    after=[AuthListener],  # local class
    before=['myapp.ml.MLListener']  # remote class
)
class CustomListener(EventListener[MyContext]):
    pass

String Path Validation:

  • String paths must be fully-qualified module paths (e.g., "module.ClassName")
  • The system validates paths by attempting to import them
  • Invalid paths raise ValueError immediately to catch typos early
  • Use class references when possible for better type safety
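Validation by import, as described above, could look roughly like the following. This is a sketch built on the standard importlib module. resolve_listener_path is a hypothetical helper, and the framework's own checks and error messages may differ.

```python
import importlib


def resolve_listener_path(path: str) -> type:
    """Import 'package.module.ClassName' and return the class, or raise ValueError."""
    module_path, _, class_name = path.rpartition('.')
    if not module_path:
        raise ValueError(f'Not a fully-qualified path: {path!r}')
    try:
        module = importlib.import_module(module_path)
        target = getattr(module, class_name)
    except (ImportError, AttributeError) as exc:
        # A typo in either the module or the class name ends up here.
        raise ValueError(f'Cannot resolve listener path {path!r}') from exc
    if not isinstance(target, type):
        raise ValueError(f'{path!r} does not point to a class')
    return target


# A standard-library class is used here only so the example runs anywhere.
resolved = resolve_listener_path('collections.OrderedDict')
```

Failing at registration time means a misspelled dependency is reported when the listener is registered, not later when the event is emitted.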

Topological Sort

The system automatically sorts listeners:

  • Respects after/before dependencies
  • Uses priority as tiebreaker
  • Detects circular dependencies

# Circular dependency raises ValueError
# (paths shortened for brevity; real string paths must be fully qualified)
@listen_to(MyEvent, after=['B'])
class A(EventListener[MyContext]): pass

@listen_to(MyEvent, after=['A'])
class B(EventListener[MyContext]): pass

# ValueError: Circular dependency detected among listeners: ['A', 'B']

4. Error Handling

class ErrorStrategy(Enum):
    PROPAGATE = 'propagate'           # raise exception, stop chain
    LOG_AND_CONTINUE = 'log_and_continue'  # log error, continue
    SILENT = 'silent'                 # ignore errors

# Per-listener
@listen_to(MyEvent, error_strategy=ErrorStrategy.LOG_AND_CONTINUE)
class SafeListener(EventListener[MyContext]):
    pass

# Per-event (default for all listeners)
class SafeEvent(Event[MyContext]):
    default_error_strategy = ErrorStrategy.LOG_AND_CONTINUE
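To make the interaction between the per-listener and per-event settings concrete, here is a sketch of how a dispatcher might apply them. It rests on two assumptions: that the listener's own strategy, when set, overrides the event default, and that a failed listener is skipped while the chain resumes with the context it was given. Strategy and run_listener are hypothetical names.

```python
import logging
from enum import Enum
from typing import Callable, Optional

logger = logging.getLogger('events_sketch')


class Strategy(Enum):  # mirrors the three documented values
    PROPAGATE = 'propagate'
    LOG_AND_CONTINUE = 'log_and_continue'
    SILENT = 'silent'


def run_listener(
    listener: Callable[[dict, Callable[[dict], dict]], dict],
    context: dict,
    next_fn: Callable[[dict], dict],
    listener_strategy: Optional[Strategy],
    event_default: Strategy,
) -> dict:
    """Run one listener, falling back to the event default when no strategy is set."""
    strategy = listener_strategy or event_default
    try:
        return listener(context, next_fn)
    except Exception:
        if strategy is Strategy.PROPAGATE:
            raise  # stop the chain
        if strategy is Strategy.LOG_AND_CONTINUE:
            logger.exception('Listener failed; continuing chain')
        # Assumption: skip the failed listener and resume with its input context.
        return next_fn(context)


def failing(context, next_fn):
    raise RuntimeError('boom')


def identity(context):
    return context


continued = run_listener(failing, {'value': 1}, identity, Strategy.SILENT, Strategy.PROPAGATE)
```

A real dispatcher also has to tell errors raised by the listener itself apart from errors bubbling up from later listeners. This sketch does not make that distinction.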

5. Sync vs Async

Listeners can be sync or async:

class MyListener(EventListener[MyContext]):
    def handle(self, context, next_fn):
        # Sync implementation
        return next_fn(context)

    async def ahandle(self, context, next_fn):
        # Async implementation
        await some_async_operation()
        return await next_fn(context)

# Usage
result = EventBus.emit(MyEvent, context)      # sync
result = await EventBus.aemit(MyEvent, context)  # async
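The async path differs from the sync one mainly in that next_fn returns an awaitable, so each ahandle must await it. The sketch below shows that flow using only asyncio. aemit_sketch and the two listeners are hypothetical and use plain dicts as contexts.

```python
import asyncio


async def aemit_sketch(listeners, context):
    """Run async listeners in order; each one awaits next_fn to continue."""

    async def call(index, ctx):
        if index == len(listeners):
            return ctx  # end of chain
        # next_fn returns a coroutine, so the listener must await it.
        return await listeners[index](ctx, lambda next_ctx: call(index + 1, next_ctx))

    return await call(0, context)


async def load_user(context, next_fn):
    await asyncio.sleep(0)  # stands in for real async I/O
    return await next_fn({**context, 'user': 'john'})


async def mark_done(context, next_fn):
    return await next_fn({**context, 'done': True})


async_result = asyncio.run(aemit_sketch([load_user, mark_done], {}))
```

Forgetting the await in a listener would return a coroutine object instead of a context, which is a common source of confusing errors in async chains.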

Usage Examples

Example 1: Authentication System

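from amsdal_utils.events import Event, EventContext, EventListener, listen_to

class Unauthorized(Exception):
    """Application-defined error (not part of AMSDAL) for failed authentication"""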

class AuthContext(EventContext):
    credentials: str
    user: dict | None = None
    authenticated: bool = False

class AuthenticateEvent(Event[AuthContext]):
    pass

@listen_to(AuthenticateEvent, priority=100)
class TokenValidator(EventListener[AuthContext]):
    def handle(self, context: AuthContext, next_fn):
        user = self._validate_token(context.credentials)
        if not user:
            raise Unauthorized('Invalid credentials')

        return next_fn(context.create_next(
            listener_id=self.listener_id,
            user=user,
            authenticated=True
        ))

    async def ahandle(self, context: AuthContext, next_fn):
        user = await self._async_validate_token(context.credentials)
        if not user:
            raise Unauthorized('Invalid credentials')

        return await next_fn(context.create_next(
            listener_id=self.listener_id,
            user=user,
            authenticated=True
        ))

Example 2: Server Event with Permission Check

class ReadObjectsContext(EventContext):
    model: type
    filters: dict
    user: dict

class PreReadObjectsEvent(Event[ReadObjectsContext]):
    """Emitted before reading objects"""
    pass

# In server
async def read_objects_handler(request):
    context = ReadObjectsContext(
        model=request.model,
        filters=request.filters,
        user=request.user,
    )

    try:
        result = await EventBus.aemit(PreReadObjectsEvent, context)
        objects = await db.read(result.model, result.filters)
        return objects
    except PermissionDenied as e:
        return Response(status=403, body=str(e))

# In auth plugin
@listen_to(PreReadObjectsEvent, priority=100)
class ReadPermissionChecker(EventListener[ReadObjectsContext]):
    async def ahandle(self, context: ReadObjectsContext, next_fn):
        if not self._can_read(context.user, context.model):
            raise PermissionDenied('No read permission')

        # Add tenant filter
        new_filters = {**context.filters, 'tenant_id': context.user['tenant_id']}
        return await next_fn(context.create_next(
            listener_id=self.listener_id,
            filters=new_filters
        ))

Example 3: Audit Logging

import time

@listen_to(
    PreReadObjectsEvent,
    priority=50,  # lower than ReadPermissionChecker (100), so it runs first
    error_strategy=ErrorStrategy.LOG_AND_CONTINUE
)
class AuditLogger(EventListener[ReadObjectsContext]):
    async def ahandle(self, context: ReadObjectsContext, next_fn):
        start = time.time()

        try:
            result = await next_fn(context)
            await self._log_audit(
                user=context.user,
                action='read',
                model=context.model,
                duration=time.time() - start,
                status='success'
            )
            return result
        except Exception as e:
            await self._log_audit(
                user=context.user,
                action='read',
                model=context.model,
                duration=time.time() - start,
                status='error',
                error=str(e)
            )
            raise

API Reference

EventBus

class EventBus:
    @classmethod
    def subscribe(
        cls,
        event: type[Event[TContext]],
        listener: type[EventListener[TContext]],
        after: list[type[EventListener[Any]] | str] | None = None,
        before: list[type[EventListener[Any]] | str] | None = None,
        priority: int = 500,
        error_strategy: ErrorStrategy | None = None,
    ) -> None:
        """Register listener for event"""

    @classmethod
    def unsubscribe(cls, event: type[Event[Any]], listener_id: str) -> bool:
        """Remove listener from event"""

    @classmethod
    def emit(cls, event: type[Event[TContext]], context: TContext) -> TContext:
        """Execute sync middleware chain"""

    @classmethod
    async def aemit(cls, event: type[Event[TContext]], context: TContext) -> TContext:
        """Execute async middleware chain"""

    @classmethod
    def reset(cls) -> None:
        """Clear all listeners (for testing)"""

    @classmethod
    def get_listeners(cls, event: type[Event[Any]]) -> list[str]:
        """Get listener IDs in execution order"""

Event

class Event(Generic[TContext]):
    context_type: type[TContext]  # auto-detected
    default_error_strategy: ErrorStrategy = ErrorStrategy.PROPAGATE

EventContext

class EventContext(BaseModel, ABC):
    def create_next(self: TContext, listener_id: str, **changes) -> TContext:
        """Create new context version with changes"""

    @property
    def history(self) -> list[ContextHistoryEntry]:
        """Get full history of mutations"""

    def get_by_listener(self, listener_id: str) -> EventContext | None:
        """Get context version by listener ID"""

EventListener

class EventListener(ABC, Generic[TContext]):
    listener_id: str  # auto-generated

    @abstractmethod
    def handle(self, context: TContext, next_fn: NextFn[TContext]) -> TContext:
        """Sync handler"""

    @abstractmethod
    async def ahandle(self, context: TContext, next_fn: AsyncNextFn[TContext]) -> TContext:
        """Async handler"""

Decorators

def listen_to(
    event: type[Event[TContext]],
    after: list[type[EventListener[Any]] | str] | None = None,
    before: list[type[EventListener[Any]] | str] | None = None,
    priority: int = 500,
    error_strategy: ErrorStrategy | None = None,
) -> Callable:
    """Decorator to auto-register listener"""

ErrorStrategy

class ErrorStrategy(Enum):
    PROPAGATE = 'propagate'           # raise exception, stop chain
    LOG_AND_CONTINUE = 'log_and_continue'  # log error, continue chain
    SILENT = 'silent'                 # ignore errors completely

Best Practices

1. Naming

# ✅ Good - the name says what is happening
class AuthenticateEvent(Event[AuthContext]): ...
class PreReadObjectsEvent(Event[ReadContext]): ...

# ❌ Bad - the name says nothing about the event
class Event1(Event[Context1]): ...
class MyEvent(Event[MyContext]): ...

2. Context Design

# ✅ Good - immutable, use create_next
class MyContext(EventContext):
    value: int
    readonly_data: dict

new_context = context.create_next(
    listener_id=self.listener_id,
    value=new_value
)

# ❌ Bad - direct mutation
context.value = new_value  # Don't do this!

3. Error Handling

# ✅ Good - specific exceptions
if not valid:
    raise PermissionDenied('Reason here')

# ✅ Good - LOG_AND_CONTINUE for non-critical
@listen_to(MyEvent, error_strategy=ErrorStrategy.LOG_AND_CONTINUE)

# ❌ Bad - generic exceptions
raise Exception('Something wrong')

4. Testing

def test_my_listener():
    # Isolate EventBus for tests
    EventBus.reset()

    # Register listeners
    EventBus.subscribe(MyEvent, MyListener)

    # Test
    context = MyContext(value=10)
    result = EventBus.emit(MyEvent, context)

    assert result.value == 20  # assumes MyListener doubles value

5. Dependencies

# ✅ Good - explicit dependencies
@listen_to(MyEvent, after=['auth.TokenValidator'])

# ❌ Bad - implicit order via priority
@listen_to(MyEvent, priority=150)  # who knows what this means?

Troubleshooting

ValueError: Circular dependency detected

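# Problem - A waits for B and B waits for A
@listen_to(MyEvent, after=['B'])
class A(EventListener[MyContext]): pass

@listen_to(MyEvent, after=['A'])
class B(EventListener[MyContext]): pass

# Solution: remove one of the dependencies to break the cycle
@listen_to(MyEvent, priority=100)
class A(EventListener[MyContext]): pass

@listen_to(MyEvent, after=['A'], priority=200)
class B(EventListener[MyContext]): pass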

NotImplementedError: does not implement sync/async handling

# Problem - calling async but not implemented
await EventBus.aemit(MyEvent, context)

# Solution - implement both methods
class MyListener(EventListener[MyContext]):
    def handle(self, context, next_fn):
        return next_fn(context)

    async def ahandle(self, context, next_fn):
        return await next_fn(context)

Listener doesn't execute in correct order

# Check order
listeners = EventBus.get_listeners(MyEvent)
print(listeners)  # ['first.Listener', 'second.Listener', ...]

# Check dependencies
@listen_to(MyEvent, after=['first.Listener'])
class SecondListener: pass