Events System¶
Middleware-based event system for building extensible and type-safe plugins in the AMSDAL framework.
The Events system is a middleware chain pattern for event handling in the AMSDAL framework. It enables:
- Plugins to define their own events without modifying core code
- Type-safe event handling through generics
- Execution order control via dependencies and priority
- Context history tracking
- Flexible error handling through ErrorStrategy
- Both sync and async support
Architecture¶
Middleware Chain Pattern¶
Event → Listener 1 → Listener 2 → Listener 3 → Result
↓ ↓ ↓
next_fn() next_fn() next_fn()
Each listener:
1. Receives context and next_fn
2. Executes its logic
3. Either calls next_fn(context) to continue
4. Or raises exception to stop
Components¶
┌─────────────────────────────────────────────────┐
│ EventBus │
│ - subscribe() - emit() │
│ - unsubscribe() - aemit() │
└─────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Event │ │ EventContext │ │EventListener │
│ (generic) │ │ (history) │ │ (middleware) │
└──────────────┘ └──────────────┘ └──────────────┘
Topological Sort¶
Listeners are sorted considering:
1. Dependencies (after, before)
2. Priority (lower number = earlier)
3. Cycle detection (raises ValueError)
Registering Listeners¶
Listeners must be registered during application initialization — inside AppConfig.on_setup(). The @listen_to decorator and EventBus.subscribe() execute immediately when the code is evaluated, so they must run at the right time in the startup sequence.
AMSDAL loads configs in this order:
- Plugins (
AMSDAL_CONTRIBS) — each plugin'son_setup()is called - Application (
APP_CONFIG) — the main app'son_setup()is called
For end-user applications¶
Create an app.py file in your source directory (src/) with a MainAppConfig class:
# src/app.py
from amsdal.contrib.app_config import AppConfig
class MainAppConfig(AppConfig):
def on_setup(self) -> None:
from amsdal_utils.events import EventBus
from amsdal_server.apps.common.events.server import ServerStartupEvent
from myapp.listeners import MyStartupListener
EventBus.subscribe(ServerStartupEvent, MyStartupListener)
AMSDAL discovers this file automatically via the APP_CONFIG setting (default: app.MainAppConfig). The amsdal new CLI command generates this file for you.
Note
If app.py doesn't exist in your source directory, AMSDAL silently skips it — it's optional.
If you set a custom AMSDAL_APP_CONFIG path, the file must exist.
For plugins¶
Same pattern, but registered via AMSDAL_CONTRIBS:
# my_plugin/app.py
from amsdal.contrib.app_config import AppConfig
class MyPluginAppConfig(AppConfig):
def on_setup(self) -> None:
from amsdal_utils.events import EventBus
from amsdal_server.apps.common.events.server import RouterSetupEvent
from my_plugin.listeners import MyRouteListener
EventBus.subscribe(RouterSetupEvent, MyRouteListener)
AMSDAL_CONTRIBS="amsdal.contrib.auth.app.AuthAppConfig,...,my_plugin.app.MyPluginAppConfig"
Warning
Do not use @listen_to or EventBus.subscribe() at module top level or in model files. Listeners registered outside on_setup() may execute before the framework is ready, leading to import errors or missed events.
Quick Start¶
1. Define Context¶
from amsdal_utils.events import EventContext
class UserContext(EventContext):
user_id: str
username: str
authenticated: bool = False
2. Define Event¶
from amsdal_utils.events import Event
class AuthenticateEvent(Event[UserContext]):
"""Event emitted during authentication"""
pass
3. Create Listener¶
from amsdal_utils.events import EventListener, listen_to
@listen_to(AuthenticateEvent, priority=100)
class TokenValidator(EventListener[UserContext]):
def handle(self, context: UserContext, next_fn):
# Your logic
if not self._validate_token(context.user_id):
raise ValueError('Invalid token')
# Modify context
new_context = context.create_next(
listener_id=self.listener_id,
authenticated=True
)
# Continue chain
return next_fn(new_context)
async def ahandle(self, context: UserContext, next_fn):
# Async version
raise NotImplementedError
4. Emit Event¶
from amsdal_utils.events import EventBus
# Sync
context = UserContext(user_id='123', username='john')
result = EventBus.emit(AuthenticateEvent, context)
print(result.authenticated) # True
# Async
result = await EventBus.aemit(AuthenticateEvent, context)
Core Concepts¶
1. EventContext¶
Immutable Pydantic model with history tracking. The frozen=True config prevents accidental mutations.
from amsdal_utils.events import EventContext, EventBus
from pydantic import ValidationError
class MyContext(EventContext):
value: int
processed: bool = False
# Create initial context
context = MyContext(value=10)
# ✅ Correct: Create new version with create_next()
new_context = context.create_next(
listener_id='my.Listener',
value=context.value * 2,
processed=True
)
# ❌ Wrong: Direct mutation raises ValidationError
try:
context.value = 20 # Raises: ValidationError - "MyContext" is frozen
except ValidationError:
print("Mutation prevented!")
# After emitting event, access history from final context
result = EventBus.emit(MyEvent, context)
for entry in result.history:
print(f'{entry.listener_id}: {entry.context_snapshot.value}')
# Get specific version from history
old_version = result.get_by_listener('my.Listener')
Why Pydantic?
- Automatic immutability enforcement with frozen=True
- Type validation on field assignment
- No need for manual __post_init__ or defensive copying
- Better integration with serialization/deserialization
2. Event Types¶
Events can define default ErrorStrategy:
class CriticalEvent(Event[MyContext]):
"""Critical event - stop on error"""
default_error_strategy = ErrorStrategy.PROPAGATE
class NonCriticalEvent(Event[MyContext]):
"""Non-critical - log and continue"""
default_error_strategy = ErrorStrategy.LOG_AND_CONTINUE
3. Listener Ordering¶
Priority (basic sorting)¶
@listen_to(MyEvent, priority=100) # executes first
class FirstListener(EventListener[MyContext]):
pass
@listen_to(MyEvent, priority=200) # executes second
class SecondListener(EventListener[MyContext]):
pass
Dependencies (after/before)¶
You can specify dependencies using either class references (type-safe) or string paths:
@listen_to(MyEvent)
class AuthListener(EventListener[MyContext]):
pass
class MLListener(EventListener[MyContext]):
pass
# Option 1: Using class references (recommended - type-safe)
@listen_to(
MyEvent,
after=[AuthListener], # after auth
before=[MLListener] # before ML
)
class CustomListener(EventListener[MyContext]):
pass
# Option 2: Using string paths (for cross-module references)
@listen_to(
MyEvent,
after=['myapp.auth.AuthListener'], # fully-qualified module path
before=['myapp.ml.MLListener']
)
class CustomListener(EventListener[MyContext]):
pass
# Option 3: Mixed approach
@listen_to(
MyEvent,
after=[AuthListener], # local class
before=['myapp.ml.MLListener'] # remote class
)
class CustomListener(EventListener[MyContext]):
pass
String Path Validation:
- String paths must be fully-qualified module paths (e.g., "module.ClassName")
- The system validates paths by attempting to import them
- Invalid paths raise ValueError immediately to catch typos early
- Use class references when possible for better type safety
Topological Sort¶
The system automatically sorts listeners:
- Respects after/before dependencies
- Uses priority as tiebreaker
- Detects circular dependencies
# Circular dependency raises ValueError
@listen_to(MyEvent, after=['B'])
class A(EventListener[MyContext]): pass
@listen_to(MyEvent, after=['A'])
class B(EventListener[MyContext]): pass
# ValueError: Circular dependency detected among listeners: ['A', 'B']
4. Error Handling¶
class ErrorStrategy(Enum):
PROPAGATE = 'propagate' # raise exception, stop chain
LOG_AND_CONTINUE = 'log_and_continue' # log error, continue
SILENT = 'silent' # ignore errors
# Per-listener
@listen_to(MyEvent, error_strategy=ErrorStrategy.LOG_AND_CONTINUE)
class SafeListener(EventListener[MyContext]):
pass
# Per-event (default for all listeners)
class SafeEvent(Event[MyContext]):
default_error_strategy = ErrorStrategy.LOG_AND_CONTINUE
5. Sync vs Async¶
Listeners can be sync or async:
class MyListener(EventListener[MyContext]):
def handle(self, context, next_fn):
# Sync implementation
return next_fn(context)
async def ahandle(self, context, next_fn):
# Async implementation
await some_async_operation()
return await next_fn(context)
# Usage
result = EventBus.emit(MyEvent, context) # sync
result = await EventBus.aemit(MyEvent, context) # async
Usage Examples¶
Example 1: Authentication System¶
from amsdal_utils.events import Event, EventContext, EventListener, listen_to
class AuthContext(EventContext):
credentials: str
user: dict | None = None
authenticated: bool = False
class AuthenticateEvent(Event[AuthContext]):
pass
@listen_to(AuthenticateEvent, priority=100)
class TokenValidator(EventListener[AuthContext]):
def handle(self, context: AuthContext, next_fn):
user = self._validate_token(context.credentials)
if not user:
raise Unauthorized('Invalid credentials')
return next_fn(context.create_next(
listener_id=self.listener_id,
user=user,
authenticated=True
))
async def ahandle(self, context: AuthContext, next_fn):
user = await self._async_validate_token(context.credentials)
if not user:
raise Unauthorized('Invalid credentials')
return await next_fn(context.create_next(
listener_id=self.listener_id,
user=user,
authenticated=True
))
Example 2: Server Event with Permission Check¶
class ReadObjectsContext(EventContext):
model: type
filters: dict
user: dict
class PreReadObjectsEvent(Event[ReadObjectsContext]):
"""Emitted before reading objects"""
pass
# In server
async def read_objects_handler(request):
context = ReadObjectsContext(
model=request.model,
filters=request.filters,
user=request.user,
)
try:
result = await EventBus.aemit(PreReadObjectsEvent, context)
objects = await db.read(result.model, result.filters)
return objects
except PermissionDenied as e:
return Response(status=403, body=str(e))
# In auth plugin
@listen_to(PreReadObjectsEvent, priority=100)
class ReadPermissionChecker(EventListener[ReadObjectsContext]):
async def ahandle(self, context: ReadObjectsContext, next_fn):
if not self._can_read(context.user, context.model):
raise PermissionDenied('No read permission')
# Add tenant filter
new_filters = {**context.filters, 'tenant_id': context.user['tenant_id']}
return await next_fn(context.create_next(
listener_id=self.listener_id,
filters=new_filters
))
Example 3: Audit Logging¶
@listen_to(
PreReadObjectsEvent,
priority=50, # runs first
error_strategy=ErrorStrategy.LOG_AND_CONTINUE
)
class AuditLogger(EventListener[ReadObjectsContext]):
async def ahandle(self, context: ReadObjectsContext, next_fn):
start = time.time()
try:
result = await next_fn(context)
await self._log_audit(
user=context.user,
action='read',
model=context.model,
duration=time.time() - start,
status='success'
)
return result
except Exception as e:
await self._log_audit(
user=context.user,
action='read',
model=context.model,
duration=time.time() - start,
status='error',
error=str(e)
)
raise
API Reference¶
EventBus¶
class EventBus:
@classmethod
def subscribe(
cls,
event: type[Event[TContext]],
listener: type[EventListener[TContext]],
after: list[type[EventListener[Any]] | str] | None = None,
before: list[type[EventListener[Any]] | str] | None = None,
priority: int = 500,
error_strategy: ErrorStrategy | None = None,
) -> None:
"""Register listener for event"""
@classmethod
def unsubscribe(cls, event: type[Event[Any]], listener_id: str) -> bool:
"""Remove listener from event"""
@classmethod
def emit(cls, event: type[Event[TContext]], context: TContext) -> TContext:
"""Execute sync middleware chain"""
@classmethod
async def aemit(cls, event: type[Event[TContext]], context: TContext) -> TContext:
"""Execute async middleware chain"""
@classmethod
def reset(cls) -> None:
"""Clear all listeners (for testing)"""
@classmethod
def get_listeners(cls, event: type[Event[Any]]) -> list[str]:
"""Get listener IDs in execution order"""
Event¶
class Event(Generic[TContext]):
context_type: type[TContext] # auto-detected
default_error_strategy: ErrorStrategy = ErrorStrategy.PROPAGATE
EventContext¶
class EventContext(BaseModel, ABC):
def create_next(self: TContext, listener_id: str, **changes) -> TContext:
"""Create new context version with changes"""
@property
def history(self) -> list[ContextHistoryEntry]:
"""Get full history of mutations"""
def get_by_listener(self, listener_id: str) -> EventContext | None:
"""Get context version by listener ID"""
EventListener¶
class EventListener(ABC, Generic[TContext]):
listener_id: str # auto-generated
@abstractmethod
def handle(self, context: TContext, next_fn: NextFn[TContext]) -> TContext:
"""Sync handler"""
@abstractmethod
async def ahandle(self, context: TContext, next_fn: AsyncNextFn[TContext]) -> TContext:
"""Async handler"""
Decorators¶
def listen_to(
event: type[Event[TContext]],
after: list[type[EventListener[Any]] | str] | None = None,
before: list[type[EventListener[Any]] | str] | None = None,
priority: int = 500,
error_strategy: ErrorStrategy | None = None,
) -> Callable:
"""Decorator to auto-register listener"""
ErrorStrategy¶
class ErrorStrategy(Enum):
PROPAGATE = 'propagate' # raise exception, stop chain
LOG_AND_CONTINUE = 'log_and_continue' # log error, continue chain
SILENT = 'silent' # ignore errors completely
Best Practices¶
1. Naming¶
# ✅ Good
class AuthenticateEvent(Event[AuthContext])
class PreReadObjectsEvent(Event[ReadContext])
# ❌ Bad
class Event1(Event[Context1])
class MyEvent(Event[MyContext])
2. Context Design¶
# ✅ Good - immutable, use create_next
class MyContext(EventContext):
value: int
readonly_data: dict
new_context = context.create_next(
listener_id=self.listener_id,
value=new_value
)
# ❌ Bad - direct mutation
context.value = new_value # Don't do this!
3. Error Handling¶
# ✅ Good - specific exceptions
if not valid:
raise PermissionDenied('Reason here')
# ✅ Good - LOG_AND_CONTINUE for non-critical
@listen_to(MyEvent, error_strategy=ErrorStrategy.LOG_AND_CONTINUE)
# ❌ Bad - generic exceptions
raise Exception('Something wrong')
4. Testing¶
def test_my_listener():
# Isolate EventBus for tests
EventBus.reset()
# Register listeners
EventBus.subscribe(MyEvent, MyListener)
# Test
context = MyContext(value=10)
result = EventBus.emit(MyEvent, context)
assert result.value == 20
5. Dependencies¶
# ✅ Good - explicit dependencies
@listen_to(MyEvent, after=['auth.TokenValidator'])
# ❌ Bad - implicit order via priority
@listen_to(MyEvent, priority=150) # who knows what this means?
Troubleshooting¶
ValueError: Circular dependency detected¶
# Problem
@listen_to(MyEvent, after=['B'])
class A: pass
@listen_to(MyEvent, after=['A'])
class B: pass
# Solution: review dependencies
@listen_to(MyEvent, priority=100)
class A: pass
@listen_to(MyEvent, after=['A'], priority=200)
class B: pass
NotImplementedError: does not implement sync/async handling¶
# Problem - calling async but not implemented
await EventBus.aemit(MyEvent, context)
# Solution - implement both methods
class MyListener(EventListener[MyContext]):
def handle(self, context, next_fn):
return next_fn(context)
async def ahandle(self, context, next_fn):
return await next_fn(context)
Listener doesn't execute in correct order¶
# Check order
listeners = EventBus.get_listeners(MyEvent)
print(listeners) # ['first.Listener', 'second.Listener', ...]
# Check dependencies
@listen_to(MyEvent, after=['first.Listener'])
class SecondListener: pass