Transactions¶
The @transaction decorator ensures atomicity: all operations within the decorated function either commit on success or roll back on exception.
Basic Usage¶
# Sync
from amsdal.transactions import transaction

@transaction
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person

# Async
from amsdal.transactions import async_transaction

@async_transaction
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()
    return person
If the function raises an exception, all database changes made inside it are rolled back automatically.
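To make the commit-or-rollback behaviour concrete, here is a minimal sketch of the general pattern using the standard-library sqlite3 module. It is not AMSDAL's implementation: the `atomic` decorator, the in-memory database, and the `fail` flag are all hypothetical and exist only for illustration.

```python
import functools
import sqlite3

# Stand-in database; isolation_level=None means we issue BEGIN/COMMIT ourselves.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE person (first_name TEXT, last_name TEXT)")


def atomic(func):
    """Hypothetical decorator: commit on success, roll back on any exception."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        conn.execute("BEGIN")
        try:
            result = func(*args, **kwargs)
        except Exception:
            conn.execute("ROLLBACK")  # discard everything done inside func
            raise
        conn.execute("COMMIT")
        return result
    return wrapper


@atomic
def create_person(first_name, last_name, fail=False):
    conn.execute("INSERT INTO person VALUES (?, ?)", (first_name, last_name))
    if fail:
        raise ValueError("something went wrong")


create_person("Jane", "Doe")
try:
    create_person("John", "Doe", fail=True)  # insert is rolled back
except ValueError:
    pass

rows = conn.execute("SELECT first_name FROM person").fetchall()
```

After both calls, only the row written by the successful call remains in the table.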
Parameters¶
name¶
Sets a name for the transaction, used for database logging. Each top-level transaction is recorded in the database and linked to the metadata of all objects created or modified within it.
# Sync
from amsdal.transactions import transaction

@transaction(name="Create Person")
def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person

# Async
from amsdal.transactions import async_transaction

@async_transaction(name="Create Person")
async def create_person(first_name: str, last_name: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()
    return person
Nested Transactions¶
Nested transactions are independent. If an inner transaction fails and the outer one catches the exception, only the inner transaction is rolled back:
# Sync
from amsdal.transactions import transaction

@transaction
def inner():
    Person(first_name="John", last_name="Doe").save()
    raise ValueError("something went wrong")

@transaction
def outer():
    Person(first_name="Jane", last_name="Doe").save()
    try:
        inner()  # rolled back
    except ValueError:
        pass
    # Jane is still saved

# Async
from amsdal.transactions import async_transaction

@async_transaction
async def inner():
    await Person(first_name="John", last_name="Doe").asave()
    raise ValueError("something went wrong")

@async_transaction
async def outer():
    await Person(first_name="Jane", last_name="Doe").asave()
    try:
        await inner()  # rolled back
    except ValueError:
        pass
    # Jane is still saved
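The same inner-rollback behaviour can be reproduced with SQL savepoints. The sketch below is an analogy built on the standard-library sqlite3 module, not a description of how AMSDAL implements nesting; the `nested_atomic` helper is hypothetical.

```python
import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE person (first_name TEXT)")

_depth = 0  # current nesting level, used to name savepoints


@contextmanager
def nested_atomic():
    """Hypothetical helper: each nesting level gets its own savepoint."""
    global _depth
    name = f"sp_{_depth}"
    _depth += 1
    conn.execute(f"SAVEPOINT {name}")
    try:
        yield
    except Exception:
        conn.execute(f"ROLLBACK TO {name}")  # undo only this level's work
        conn.execute(f"RELEASE {name}")
        raise
    else:
        conn.execute(f"RELEASE {name}")  # outermost release commits
    finally:
        _depth -= 1


def inner():
    with nested_atomic():
        conn.execute("INSERT INTO person VALUES ('John')")
        raise ValueError("something went wrong")


def outer():
    with nested_atomic():
        conn.execute("INSERT INTO person VALUES ('Jane')")
        try:
            inner()  # only the inner savepoint is rolled back
        except ValueError:
            pass


outer()
names = [row[0] for row in conn.execute("SELECT first_name FROM person")]
```

Because the outer level catches the exception, its own savepoint is released normally and Jane's row is committed.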
Background Transactions¶
To execute transactions in the background, configure a worker connection and use the .submit() method.
Background transactions use Celery under the hood, which requires a message broker (e.g., RabbitMQ or Redis).
On AMSDAL Cloud, the broker and worker infrastructure is configured automatically, so no extra setup is needed: write your transaction code and deploy.
For local development, you need to run a broker yourself (e.g., via Docker, a system package, or a managed service) and configure it in config.yml.
Worker Configuration¶
Add a worker connection to your config.yml:
# Sync
connections:
  - name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.CeleryConnection
    credentials:
      - broker_url: amqp://guest:guest@localhost:5672/
resources_config:
  worker: celery_worker

# Async
async_mode: true
connections:
  - name: celery_worker
    backend: amsdal_data.transactions.background.connections.celery_connection.AsyncCeleryConnection
    credentials:
      - broker_url: amqp://guest:guest@localhost:5672/
resources_config:
  worker: celery_worker
Submitting Background Work¶
# Sync
from amsdal.transactions import transaction

@transaction
def create_person_and_notify(first_name: str, last_name: str, email: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    send_email.submit(email)  # runs in background
    return person

@transaction
def send_email(email: str) -> None:
    # send email logic
    pass

# Async
from amsdal.transactions import async_transaction

@async_transaction
async def create_person_and_notify(first_name: str, last_name: str, email: str) -> Person:
    person = Person(first_name=first_name, last_name=last_name)
    await person.asave()
    await send_email.submit(email)  # runs in background
    return person

@async_transaction
async def send_email(email: str) -> None:
    # send email logic
    pass
Start the Celery worker to process background transactions:
amsdal worker run
This connects to the broker specified in config.yml and starts consuming tasks from the queue.
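The submit-then-consume flow can be illustrated with a toy model built on the standard library. Here a `queue.Queue` stands in for the message broker and a thread stands in for the worker process; the `BackgroundTask` class is hypothetical and says nothing about how AMSDAL or Celery are implemented.

```python
import queue
import threading

task_queue = queue.Queue()  # stand-in for the message broker
sent = []                   # records what the worker has processed


class BackgroundTask:
    """Hypothetical wrapper: call directly, or submit() for later execution."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def submit(self, *args, **kwargs):
        # Enqueue only; the caller does not wait for the result.
        task_queue.put((self.func, args, kwargs))


@BackgroundTask
def send_email(email):
    sent.append(email)


def worker():
    # Consume tasks until a None sentinel arrives.
    while True:
        item = task_queue.get()
        if item is None:
            break
        func, args, kwargs = item
        func(*args, **kwargs)


send_email.submit("jane@example.com")  # returns immediately
sent_before_worker = list(sent)        # nothing has run yet

thread = threading.Thread(target=worker)
thread.start()
task_queue.put(None)  # ask the worker to stop after draining the queue
thread.join()
```

The point of the sketch is that submitted work does nothing until a worker is running, which is why the worker must be started separately.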
Scheduled Transactions¶
Schedule transactions to run periodically. Requires a worker connection that supports scheduling (e.g., Celery).
# Sync
from datetime import timedelta
from amsdal.transactions import transaction, Crontab, ScheduleConfig

# Run every 10 minutes (interval in seconds)
@transaction(schedule=60 * 10)
def cleanup_expired() -> None:
    pass

# Run every midnight (crontab)
@transaction(schedule_config=ScheduleConfig(schedule=Crontab(minute=0, hour=0)))
def daily_report() -> None:
    pass

# Run every 3 days with arguments
@transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3},
    )
)
def periodic_task(a: int, b: int, c: int) -> None:
    pass

# Async
from datetime import timedelta
from amsdal.transactions import async_transaction, Crontab, ScheduleConfig

# Run every 10 minutes (interval in seconds)
@async_transaction(schedule=60 * 10)
async def cleanup_expired() -> None:
    pass

# Run every midnight (crontab)
@async_transaction(schedule_config=ScheduleConfig(schedule=Crontab(minute=0, hour=0)))
async def daily_report() -> None:
    pass

# Run every 3 days with arguments
@async_transaction(
    schedule_config=ScheduleConfig(
        schedule=timedelta(days=3),
        args=(1, 2),
        kwargs={"c": 3},
    )
)
async def periodic_task(a: int, b: int, c: int) -> None:
    pass
Start the worker in scheduler or hybrid mode:
# Scheduler only — puts tasks in the queue
amsdal worker run --mode scheduler
# Hybrid — schedules and processes tasks
amsdal worker run --mode hybrid
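The three schedule forms used above (a number of seconds, a timedelta, and a midnight crontab) can be related with a little arithmetic. The helpers below are hypothetical and only illustrate what each value means; they are not part of AMSDAL or Celery.

```python
from datetime import datetime, timedelta


def interval_seconds(schedule):
    """Hypothetical helper: normalize a number or timedelta to seconds."""
    if isinstance(schedule, timedelta):
        return schedule.total_seconds()
    return float(schedule)


def next_midnight(now):
    """Next time a minute=0, hour=0 crontab would fire, strictly after `now`."""
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_midnight + timedelta(days=1)


ten_minutes = interval_seconds(60 * 10)           # 600.0
three_days = interval_seconds(timedelta(days=3))  # 259200.0
next_run = next_midnight(datetime(2024, 1, 15, 13, 30))
```

An interval schedule repeats relative to the previous run, whereas a crontab schedule fires at fixed wall-clock times, so `next_run` here is midnight at the start of 16 January 2024.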
Rollback¶
AMSDAL records a full history of changes. You can roll back to a previous state by timestamp or by transaction ID. Rolling back creates new versions of affected objects (it does not delete history).
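The idea that a rollback adds a version instead of deleting history can be sketched with an append-only list. This is a conceptual model only: the `Version` class, the integer timestamps, and `rollback_to` are hypothetical and do not reflect AMSDAL's storage format.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    timestamp: int  # logical clock, for illustration only
    name: str


history = [Version(1, "Acme"), Version(2, "Acme Corp"), Version(3, "Acme Inc")]


def rollback_to(history, timestamp, now):
    """Append a copy of the latest version at or before `timestamp`."""
    target = max(
        (v for v in history if v.timestamp <= timestamp),
        key=lambda v: v.timestamp,
    )
    history.append(Version(now, target.name))  # history only ever grows
    return history[-1]


current = rollback_to(history, timestamp=2, now=4)
```

After the call, the current state matches the state at timestamp 2, and the three earlier versions are still present in `history`.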
By Timestamp¶
# Sync
from amsdal.utils.rollback import rollback_to_timestamp

# c2 is a previously saved object.
# Roll back all changes made after the given timestamp
rollback_to_timestamp(c2.get_metadata().updated_at)

# Async
from amsdal.utils.rollback import async_rollback_to_timestamp

await async_rollback_to_timestamp((await c2.aget_metadata()).updated_at)
By Transaction ID¶
# Sync
from amsdal.utils.rollback import rollback_transaction

# Roll back all changes made by transactions after the given one
rollback_transaction(c2.get_metadata().transaction.ref.object_id)

# Async
from amsdal.utils.rollback import async_rollback_transaction

await async_rollback_transaction(
    (await c2.aget_metadata()).transaction.ref.object_id
)
Limitations¶
You cannot roll back to a timestamp that falls in the middle of a transaction. For example, if two objects were created within the same transaction, rolling back to a point between them raises an AmsdalTransactionError.
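from amsdal.transactions import transaction
from amsdal.utils.rollback import rollback_to_timestamp
from amsdal_data.transactions.errors import AmsdalTransactionError

@transaction
def create_two() -> Company:
    first = Company(name='first')
    first.save()
    Company(name='second').save()
    return first  # keep a handle on the first object

first_company = create_two()

# This fails: both companies were created in the same transaction
try:
    rollback_to_timestamp(first_company.get_metadata().updated_at)
except AmsdalTransactionError:
    pass  # cannot roll back to a mid-transaction point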
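One way to picture this limitation is to check whether a timestamp would split any transaction's changes. The sketch below rests on assumptions: the `(timestamp, transaction_id)` records, `check_rollback_point`, and `MidTransactionError` are hypothetical stand-ins, and AMSDAL's actual check may work differently.

```python
class MidTransactionError(Exception):
    """Stand-in for AmsdalTransactionError in this sketch."""


# (timestamp, transaction_id) pairs for recorded changes, for illustration only.
changes = [(1, "tx-a"), (2, "tx-b"), (3, "tx-b"), (4, "tx-c")]


def check_rollback_point(changes, timestamp):
    """Raise if some transaction has changes both kept and undone."""
    kept = {tx for ts, tx in changes if ts <= timestamp}
    undone = {tx for ts, tx in changes if ts > timestamp}
    split = kept & undone
    if split:
        raise MidTransactionError(f"timestamp splits transaction(s): {sorted(split)}")


check_rollback_point(changes, 1)  # fine: falls on a transaction boundary

try:
    check_rollback_point(changes, 2)  # tx-b has changes at 2 and 3
    allowed = True
except MidTransactionError:
    allowed = False
```

Timestamp 1 sits between two transactions and is accepted, while timestamp 2 would keep half of `tx-b` and undo the other half, so it is rejected.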
Code Organization¶
When transactions grow complex, extract shared logic into a separate module. The src directory serves as the root for custom code:
📁 src
├── 📁 transactions
│   ├── 📁 common
│   │   ├── __init__.py
│   │   └── utils.py
│   └── create_person.py
# transactions/create_person.py
from amsdal.transactions import transaction
from transactions.common.utils import authorize_user

@transaction
def create_person(user: User, first_name: str, last_name: str) -> Person:
    authorize_user(user, "create_person")
    person = Person(first_name=first_name, last_name=last_name)
    person.save()
    return person
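The contents of transactions/common/utils.py are not shown in this guide. As a rough sketch, a shared helper such as authorize_user might look like the following; the `User` dataclass and its `permissions` field are assumptions made for illustration, not the real model.

```python
from dataclasses import dataclass, field


@dataclass
class User:
    """Hypothetical user model; the real User class is not shown in this guide."""
    name: str
    permissions: set = field(default_factory=set)


def authorize_user(user, action):
    """Raise PermissionError unless the user may perform the action."""
    if action not in user.permissions:
        raise PermissionError(f"{user.name} may not perform {action!r}")


admin = User("admin", {"create_person"})
guest = User("guest")

authorize_user(admin, "create_person")  # passes silently

try:
    authorize_user(guest, "create_person")
    denied = False
except PermissionError:
    denied = True
```

Raising from the helper fits the transaction model described earlier: an exception inside a decorated function rolls back any changes made in it.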