Skip to content

Transactions

Basic usage

Any python function that is decorated with amsdal_data.transactions.decorators.transaction is transaction function.

For example:

from amsdal_data.transactions import transaction
from models.user.person import Person

@transaction
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()

    return person

The @transaction decorator ensures atomicity of operations inside decorated function.

from amsdal_data.transactions import async_transaction
from models.user.person import Person

@async_transaction
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()

    return person

The @async_transaction decorator ensures atomicity of operations inside decorated function.

Moreover, when you create the application using AMSDAL CLI and place your transaction functions inside transactions directory (see details here), it will be possible to execute these transaction functions via API endpoint of AMSDAL Server.

Nested transactions

You can use nested transactions, and they will work on each level of nesting. In the following example, internal_transaction is called inside external_transaction. internal_transaction raises an exception and is rolled back, but external_transaction is not affected, because it catches the exception and do not propagate it.

from amsdal_data.transactions import transaction
from models.user.person import Person

@transaction
def internal_transaction():
    person = Person(first_name="John", last_name="Doe")
    person.save()

    raise Exception("This is an exception")


@transaction
def external_transaction():
    person = Person(first_name="Jane", last_name="Doe")
    person.save()

    try:
        internal_transaction()
    except Exception as e:
        pass
from amsdal_data.transactions import async_transaction
from models.user.person import Person

@async_transaction
async def internal_transaction():
    person = Person(first_name="John", last_name="Doe")
    await person.asave()

    raise Exception("This is an exception")


@async_transaction
async def external_transaction():
    person = Person(first_name="Jane", last_name="Doe")
    await person.asave()

    try:
        await internal_transaction()
    except Exception as e:
        pass

Database logging

Each top level transaction function is logged in the database, and added to the metadata of objects that are created or modified inside the transaction. You can add name and tags optional arguments to the transcation decorator, and they will be used in that logging.

from amsdal_data.transactions import transaction
from models.user.person import Person

@transaction(name="Create Person", tags=["person", "create"])
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()

    return person
from amsdal_data.transactions import async_transaction
from models.user.person import Person

@async_transaction(name="Create Person", tags=["person", "create"])
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()

    return person

Complex structure of transactions

In most cases, you want to use transactions in more complex scenarios, with usage of other transaction functions, models, enums etc., and put it all together in one file is not a good idea. For that, just develop your code as usual, and use src directory as your root directory for custom code.

- 📁 src
  - 📁 transactions
    - 📁 common
      - 📄 __init__.py
      - 📄 utils.py
    - 📄 create_person.py

create_person.py

from amsdal_data.transactions import transaction
from models.user.person import Person
from models.user.user import User

from transactions.common.utils import authorize_user

@transaction
def create_person(user: User, first_name: str, last_name: str) -> Person:
    authorize_user(user, "create_person")
    person = Person(first_name=first_name, last_name=last_name)
    person.save()

    return person
from amsdal_data.transactions import async_transaction
from models.user.person import Person
from models.user.user import User

from transactions.common.utils import authorize_user

@async_transaction
async def create_person(user: User, first_name: str, last_name: str) -> Person:
    authorize_user(user, "create_person")
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()

    return person

common/utils.py

def authorize_user(user: User, permission: str) -> bool:
    if not User.has_permission(user, permission):
        msg = f"User {user} does not have permission {permission}"
        raise Exception(msg)

Background transactions

Sometimes your transactions can take some time to execute, and you do not want to block the main process. For that, you can define worker connection in your config.yml file. After that, you can use .submit method of Transaction object to execute the transaction in the background.

config.yml

application_name: application_with_worker
connections:
...
- name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.CeleryConnection
    credentials:
    - broker_url: amqp://guest:guest@localhost:5672/
...
resources_config:
...
worker: celery_worker
...
application_name: application_with_worker
async_mode: true
connections:
...
- name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.AsyncCeleryConnection
    credentials:
    - broker_url: amqp://guest:guest@localhost:5672/
...
resources_config:
...
worker: celery_worker
...

person_management.py

from amsdal_data.transactions import transaction
from models.user.person import Person

@transaction
def create_person_and_send_email(
    first_name: str,
    last_name: str,
    email: str,
) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()

    # Send email in the background
    send_email.submit(email)

    return person

@transaction
def send_email(email: str) -> None:
    # Send email
    pass
from amsdal_data.transactions import async_transaction
from models.user.person import Person

@async_transaction
async def create_person_and_send_email(
    first_name: str,
    last_name: str,
    email: str,
) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()

    # Send email in the background
    await send_email.submit(email)

    return person

@async_transaction
async def send_email(email: str) -> None:
    # Send email
    pass

To process background transactions, you need to start the worker. To do this, you can use the following command:

amsdal worker run

That's it! Now, when you call create_person_and_send_email transaction, the send_email transaction will be executed in the background.

Scheduled transactions

You can schedule your transactions to be executed in the future. For that, you need to use worker that supports scheduling, like amsdal_data.transactions.background.connections.celery_connection.CeleryConnection.

Then, in your file with transaction, just add schedule or schedule_config argument to the @transaction decorator.

scheduled_transactions.py

from datetime import timedelta
from amsdal_data.transactions import transaction
from amsdal_data.transactions.background.schedule import Crontab
from amsdal_data.transactions.background.schedule import ScheduleConfig

@transaction(schedule=60 * 10)
def run_every_10_minutes() -> None:
    # This transaction will be executed every 10 minutes
    pass


@transaction(
    schedule_config=ScheduleConfig(
        schedule=Crontab(minute=0, hour=0)
    )
)
def run_every_midnight() -> None:
    # This transaction will be executed every midnight
    pass


@transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3}
    )
)
def run_every_3_days(a: int, b: int, c: int) -> None:
    # This transaction will be executed every 3 days with 
    # arguments (1, 2) and keyword arguments {"c": 3}
    pass
from datetime import timedelta
from amsdal_data.transactions import async_transaction
from amsdal_data.transactions.background.schedule import Crontab
from amsdal_data.transactions.background.schedule import ScheduleConfig

@async_transaction(schedule=60 * 10)
async def run_every_10_minutes() -> None:
    # This transaction will be executed every 10 minutes
    pass


@async_transaction(
    schedule_config=ScheduleConfig(
        schedule=Crontab(minute=0, hour=0)
    )
)
async def run_every_midnight() -> None:
    # This transaction will be executed every midnight
    pass


@async_transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3}
    )
)
async def run_every_3_days(a: int, b: int, c: int) -> None:
    # This transaction will be executed every 3 days with 
    # arguments (1, 2) and keyword arguments {"c": 3}
    pass

To process scheduled transactions, you need to start the worker in the scheduler mode. To do this, you can use one of the following commands:

amsdal worker run --mode scheduler

or

amsdal worker run --mode hybrid

The main difference between these two modes is:

  • In scheduler mode, the worker will only put tasks in the queue.
  • In hybrid mode, the worker will also process tasks from the queue.

Transaction rollback

Sometimes you want to rollback to the specific point in time. For that, you can use rollback_to_timestamp function. In the following example, we create three companies, and then rollback to the timestamp of the second company. This will mark the third company as deleted. After that, we rollback to the timestamp of the first company. This will mark the second company as deleted. So rollback actually rollbacks to the state of that timestamp, creating a new version of the objects.

from amsdal.utils.rollback import rollback_to_timestamp

from models.user.company import Company

c1 = Company(name='a1').save()
c2 = Company(name='a2').save()
Company(name='a3').save()

assert 3 == Company.objects.all().count().execute()
assert 3 == Company.objects.all().latest().count().execute()

rollback_to_timestamp(c2.get_metadata().updated_at)

assert 4 == Company.objects.all().count().execute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a2', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': c.get_metadata().is_deleted,
    }
    for c in Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).execute()
]

rollback_to_timestamp(c1.get_metadata().updated_at)

assert 5 == Company.objects.all().count().execute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
    {'name': 'a2', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': c.get_metadata().is_deleted,
    }
    for c in Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).execute()
]
from amsdal.utils.rollback import async_rollback_to_timestamp

from models.user.company import Company

c1 = await Company(name='a1').asave()
c2 = await Company(name='a2').asave()
await Company(name='a3').asave()

assert 3 == await Company.objects.all().count().aexecute()
assert 3 == await Company.objects.all().latest().count().aexecute()

await async_rollback_to_timestamp((await c2.aget_metadata()).updated_at)

assert 4 == await Company.objects.all().count().aexecute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a2', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': (await c.aget_metadata()).is_deleted,
    }
    for c in await Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).aexecute()
]

await async_rollback_to_timestamp((await c1.aget_metadata()).updated_at)

assert 5 == await Company.objects.all().count().aexecute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
    {'name': 'a2', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': (await c.aget_metadata()).is_deleted,
    }
    for c in await Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).aexecute()
]

Please note that rollback is not possible to the timestamp that is in the middle of some transaction. In the example below, we try to rollback to the timestamp of the first company, but it raises an exception, because the second company is created in the same transaction.

import pytest
from amsdal.utils.rollback import rollback_to_timestamp

from models.user.company import Company

@transaction
def create_companies(name_prefix: str = 'a') -> tuple['Company', 'Company']:
    a1 = Company(name=f'{name_prefix}1').save()
    a2 = Company(name=f'{name_prefix}2').save()

    return a1, a2


c1, c2 = create_companies()

assert 2 == Company.objects.all().count().execute()
assert 2 == Company.objects.all().latest().count().execute()

with pytest.raises(AmsdalTransactionError):
    rollback_to_timestamp(c1.get_metadata().updated_at)

assert 2 == Company.objects.all().count().execute()
assert 2 == Company.objects.all().latest().count().execute()
import pytest
from amsdal.utils.rollback import async_rollback_to_timestamp

from models.user.company import Company

@async_transaction
async def create_companies(name_prefix: str = 'a') -> tuple['Company', 'Company']:
    a1 = await Company(name=f'{name_prefix}1').asave()
    a2 = await Company(name=f'{name_prefix}2').asave()

    return a1, a2


c1, c2 = await create_companies()

assert 2 == await Company.objects.all().count().aexecute()
assert 2 == await Company.objects.all().latest().count().aexecute()

with pytest.raises(AmsdalTransactionError):
    await async_rollback_to_timestamp((await c1.aget_metadata()).updated_at)

assert 2 == await Company.objects.all().count().aexecute()
assert 2 == await Company.objects.all().latest().count().aexecute()

You can also rollback to the specific transaction object id using rollback_transaction function. This works almost the same as rollback to the timestamp, but you need to provide the transaction object id.

from amsdal.utils.rollback import rollback_transaction

from models.user.company import Company

c1 = Company(name='a1').save()
c2 = Company(name='a2').save()
Company(name='a3').save()

assert 3 == Company.objects.all().count().execute()
assert 3 == Company.objects.all().latest().count().execute()

rollback_transaction(c2.get_metadata().transaction.object_id)

assert 4 == Company.objects.all().count().execute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a2', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': c.get_metadata().is_deleted,
    }
    for c in Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).execute()
]

rollback_transaction(c1.get_metadata().transaction.object_id)

assert 5 == Company.objects.all().count().execute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
    {'name': 'a2', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': c.get_metadata().is_deleted,
    }
    for c in Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).execute()
]
from amsdal.utils.rollback import async_rollback_transaction

from models.user.company import Company

c1 = await Company(name='a1').asave()
c2 = await Company(name='a2').asave()
await Company(name='a3').asave()

assert 3 == await Company.objects.all().count().aexecute()
assert 3 == await Company.objects.all().latest().count().aexecute()

await async_rollback_transaction((await c2.aget_metadata()).transaction.object_id)

assert 4 == await Company.objects.all().count().aexecute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a2', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': (await c.aget_metadata()).is_deleted,
    }
    for c in await Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).aexecute()
]

await async_rollback_transaction((await c1.aget_metadata()).transaction.object_id)

assert 5 == await Company.objects.all().count().aexecute()

assert [
    {'name': 'a1', 'is_deleted': False},
    {'name': 'a3', 'is_deleted': True},
    {'name': 'a2', 'is_deleted': True},
] == [
    {
        'name': c.name,
        'is_deleted': (await c.aget_metadata()).is_deleted,
    }
    for c in await Company.objects.all().latest().order_by(
        '_metadata__updated_at'
    ).aexecute()
]