CQRS AMSDAL Glue¶
This notebook is a guide to use the AMSDAL Glue with the CQRS pattern. We will use docker compose to run two PostgreSQL containers, one for the read model and another for the write model.
Prerequisites¶
We will run two postgres databases in docker containers, so make sure you have installed Docker locally.
# Functions to manage Docker containers
def start_containers():
# Start existing db container
!docker run -d \
--name write_db \
-e POSTGRES_DB=write_db \
-e POSTGRES_USER=db_user \
-e POSTGRES_PASSWORD=db_password \
-p 5433:5432 \
library/postgres:16
# Start new db container
!docker run -d \
--name read_db \
-e POSTGRES_DB=read_db \
-e POSTGRES_USER=db_user \
-e POSTGRES_PASSWORD=db_password \
-p 5434:5432 \
library/postgres:16
print("Containers started!")
def stop_containers():
!docker stop write_db read_db
!docker rm write_db read_db
print("Containers stopped and removed.")
def check_status():
!docker ps -a | grep "write_db\|read_db"
start_containers()
e8af7aa9483b88046bfd56d49c2a3dbb378e6457612ab74ae10c1250f4b53945
81a2ae395583aba5bc5ae518a8522ee738d9e446dc2337c598963e1f7f031b9c
Containers started!
As you can see, we are running two containers, one for the write model and another for the read model.
Install AMSDAL Glue¶
!pip install "amsdal-glue[postgres-binary]"
Initialize the AMSDAL Glue¶
Now let's initialize the default containers and configure our connections to databases. We will use the CQRSApplication class, that init required containers and configures the pipelines of execution.
from amsdal_glue import DefaultConnectionPool, PostgresConnection
from amsdal_glue.applications.cqrs import CQRSApplication
# init CQRS application
app = CQRSApplication()
# configure connections
app.query_connection_manager.register_connection_pool(
DefaultConnectionPool(
PostgresConnection,
dsn="postgres://db_user:db_password@localhost:5434/read_db",
),
)
app.command_connection_manager.register_connection_pool(
DefaultConnectionPool(
PostgresConnection,
dsn="postgres://db_user:db_password@localhost:5433/write_db",
),
)
print('Connections are configured!')
Connections are configured!
That's it! Now we are ready to use the AMSDAL Glue with the CQRS pattern.
Let's register a new schema/table. By CQRS pattern it means, this command will send to write_db
and then the read model will be updated as well in the background.
Register a schema¶
The first of all, let's define a schema for the user table:
from amsdal_glue import (
CheckConstraint,
Condition,
Conditions,
Field,
FieldLookup,
FieldReference,
IndexSchema,
PrimaryKeyConstraint,
PropertySchema,
Schema,
UniqueConstraint,
Value,
Version,
)
from amsdal_glue_core.common.expressions.field_reference import FieldReferenceExpression
user_schema = Schema(
name='user',
version=Version.LATEST,
properties=[
PropertySchema(
name='id',
type=int,
required=True,
),
PropertySchema(
name='email',
type=str,
required=True,
),
PropertySchema(
name='age',
type=int,
required=True,
),
PropertySchema(
name='first_name',
type=str,
required=False,
),
PropertySchema(
name='last_name',
type=str,
required=False,
),
],
constraints=[
PrimaryKeyConstraint(name='pk_user', fields=['id']),
UniqueConstraint(name='uk_user_email', fields=['email'], condition=None),
CheckConstraint(
name='ck_user_age',
condition=Conditions(
Condition(
left=FieldReferenceExpression(field_reference=FieldReference(field=Field(name='age'), table_name='user')),
lookup=FieldLookup.GT,
right=Value(value=18),
),
),
),
],
indexes=[
IndexSchema(name='idx_user_email', fields=['first_name', 'last_name']),
],
)
print('Schema is defined!')
Schema is defined!
Now, let's execute it:
from amsdal_glue import Container, SchemaCommand, RegisterSchema
from amsdal_glue.interfaces import SchemaCommandService
service = Container.services.get(SchemaCommandService)
result = service.execute(
SchemaCommand(
mutations=[
RegisterSchema(schema=user_schema),
],
),
)
if result.success:
print('Schema is registered!')
else:
raise Exception(result.message) from result.exception
# We need to call shutdown emulating the end of the application to wait for the background process to finish.
app.shutdown(skip_close_connections=True)
Schema is registered!
By CQRS pattern, the schema is registered in the write model and then the read model is updated in the background. Let's check the schema in the read model:
from amsdal_glue import Container, SchemaQueryOperation
from amsdal_glue.interfaces import SchemaQueryService
query_service = Container.services.get(SchemaQueryService)
result = query_service.execute(
SchemaQueryOperation(filters=None),
)
if result.success:
print('Schema:', result.schemas)
else:
raise Exception(result.message) from result.exception
Schema: [Schema<.user_v_Version.LATEST:None:[PropertySchema<id:<class 'int'>:True:None:None>, PropertySchema<age:<class 'int'>:True:None:None>, PropertySchema<email:<class 'str'>:True:None:None>, PropertySchema<first_name:<class 'str'>:False:None:None>, PropertySchema<last_name:<class 'str'>:False:None:None>]:[PrimaryKeyConstraint(name='pk_user', fields=['id']), UniqueConstraint(name='uk_user_email', fields=['email'], condition=None)]:[IndexSchema<idx_user_email:['first_name', 'last_name']:None>]>]
Here, the actual query is executed in the read database, and the schema is returned.
That's it! Now you can use the AMSDAL Glue with the CQRS pattern.
Clean up¶
stop_containers()
write_db
read_db
write_db read_db
Containers stopped and removed.