Skip to content

Transactions

The @transaction decorator ensures atomicity: all operations within the decorated function either commit on success or roll back on exception.

Basic Usage

from amsdal.transactions import transaction

@transaction
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person
from amsdal.transactions import async_transaction

@async_transaction
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()
    return person

If the function raises an exception, all database changes made inside it are rolled back automatically.

Parameters

name

Add name for database logging. Each top-level transaction is recorded in the database and linked to the metadata of all objects created or modified within it.

from amsdal.transactions import transaction

@transaction(name="Create Person")
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person
from amsdal.transactions import async_transaction

@async_transaction(name="Create Person")
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()
    return person

Nested Transactions

Nested transactions are independent. If an inner transaction fails and the outer one catches the exception, only the inner transaction is rolled back:

from amsdal.transactions import transaction

@transaction
def inner():
    Person(first_name="John", last_name="Doe").save()
    raise ValueError("something went wrong")

@transaction
def outer():
    Person(first_name="Jane", last_name="Doe").save()

    try:
        inner()  # rolled back
    except ValueError:
        pass
    # Jane is still saved
from amsdal.transactions import async_transaction

@async_transaction
async def inner():
    await Person(first_name="John", last_name="Doe").asave()
    raise ValueError("something went wrong")

@async_transaction
async def outer():
    await Person(first_name="Jane", last_name="Doe").asave()

    try:
        await inner()  # rolled back
    except ValueError:
        pass
    # Jane is still saved

Background Transactions

To execute transactions in the background, configure a worker connection and use the .submit() method.

Background transactions use Celery under the hood, which requires a message broker (e.g., RabbitMQ or Redis).

AMSDAL Cloud

When deployed to AMSDAL Cloud, the broker and worker infrastructure is configured automatically — no extra setup needed. Just write your transaction code and deploy.

For local development, you need to run a broker yourself (e.g., via Docker, a system package, or a managed service) and configure it in config.yml.

Worker Configuration

Add a worker connection to your config.yml:

connections:
  - name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.CeleryConnection
    credentials:
      - broker_url: amqp://guest:guest@localhost:5672/

resources_config:
  worker: celery_worker
async_mode: true

connections:
  - name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.AsyncCeleryConnection
    credentials:
      - broker_url: amqp://guest:guest@localhost:5672/

resources_config:
  worker: celery_worker

Submitting Background Work

from amsdal.transactions import transaction

@transaction
def create_person_and_notify(first_name: str, last_name: str, email: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()

    send_email.submit(email)  # runs in background

    return person

@transaction
def send_email(email: str) -> None:
    # send email logic
    pass
from amsdal.transactions import async_transaction

@async_transaction
async def create_person_and_notify(first_name: str, last_name: str, email: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()

    await send_email.submit(email)  # runs in background

    return person

@async_transaction
async def send_email(email: str) -> None:
    # send email logic
    pass

Start the Celery worker to process background transactions:

amsdal worker run

This connects to the broker specified in config.yml and starts consuming tasks from the queue.

Scheduled Transactions

Schedule transactions to run periodically. Requires a worker connection that supports scheduling (e.g., Celery).

from datetime import timedelta
from amsdal.transactions import transaction, Crontab, ScheduleConfig

# Run every 10 minutes (interval in seconds)
@transaction(schedule=60 * 10)
def cleanup_expired() -> None:
    pass

# Run every midnight (crontab)
@transaction(schedule_config=ScheduleConfig(schedule=Crontab(minute=0, hour=0)))
def daily_report() -> None:
    pass

# Run every 3 days with arguments
@transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3},
    )
)
def periodic_task(a: int, b: int, c: int) -> None:
    pass
from datetime import timedelta
from amsdal.transactions import async_transaction, Crontab, ScheduleConfig

@async_transaction(schedule=60 * 10)
async def cleanup_expired() -> None:
    pass

@async_transaction(schedule_config=ScheduleConfig(schedule=Crontab(minute=0, hour=0)))
async def daily_report() -> None:
    pass

@async_transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3},
    )
)
async def periodic_task(a: int, b: int, c: int) -> None:
    pass

Start the worker in scheduler or hybrid mode:

# Scheduler only — puts tasks in the queue
amsdal worker run --mode scheduler

# Hybrid — schedules and processes tasks
amsdal worker run --mode hybrid

Rollback

AMSDAL records a full history of changes. You can roll back to a previous state by timestamp or by transaction ID. Rolling back creates new versions of affected objects (it does not delete history).

By Timestamp

from amsdal.utils.rollback import rollback_to_timestamp

# Roll back all changes made after the given timestamp
rollback_to_timestamp(c2.get_metadata().updated_at)
from amsdal.utils.rollback import async_rollback_to_timestamp

await async_rollback_to_timestamp((await c2.aget_metadata()).updated_at)

By Transaction ID

from amsdal.utils.rollback import rollback_transaction

# Roll back all changes made by transactions after the given one
rollback_transaction(c2.get_metadata().transaction.ref.object_id)
from amsdal.utils.rollback import async_rollback_transaction

await async_rollback_transaction(
    (await c2.aget_metadata()).transaction.ref.object_id
)

Limitations

You cannot roll back to a timestamp that falls in the middle of a transaction. If two objects were created within the same transaction, you cannot roll back to a point between them — an AmsdalTransactionError is raised.

from amsdal_data.transactions.errors import AmsdalTransactionError

@transaction
def create_two():
    Company(name='first').save()
    Company(name='second').save()

create_two()

# This fails — both were created in the same transaction
try:
    rollback_to_timestamp(first_company.get_metadata().updated_at)
except AmsdalTransactionError:
    pass  # cannot roll back to mid-transaction point

Code Organization

When transactions grow complex, extract shared logic into a separate module. The src directory serves as the root for custom code:

📁 src
├── 📁 transactions
│   ├── 📁 common
│   │   ├── __init__.py
│   │   └── utils.py
│   └── create_person.py
# transactions/create_person.py
from amsdal.transactions import transaction
from transactions.common.utils import authorize_user

@transaction
def create_person(user: User, first_name: str, last_name: str) -> Person:
    authorize_user(user, "create_person")
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person