Skip to content

Connections

amsdal_glue.PostgresConnection

Bases: PostgresConnectionMixin, ConnectionBase

PostgresConnection is responsible for managing connections and executing queries and commands on a PostgreSQL database.

Example

Here is example of how to create a connection to a PostgreSQL database:

from amsdal_glue_connections import PostgresConnection

connection = PostgresConnection()
connection.connect(
    dsn='postgresql://user:password@localhost:5432/mydatabase',
    schema='public',
    timezone='UTC',
)

Note, it's also possible to put any extra connection parameters as keyword arguments supported by psycopg. Also, be aware that the autocommit parameter is set to True by default.

Most of the time, you will use the ConnectionManager to manage connections instead of creating a connection directly.

is_connected property

is_connected

Checks if the connection to the PostgreSQL database is established.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

is_alive property

is_alive

Checks if the connection to the PostgreSQL database is alive.

Returns:

Name Type Description
bool bool

True if alive, False otherwise.

connection property

connection

Gets the current connection to the PostgreSQL database.

Returns:

Type Description
Any

psycopg.Connection: The current connection.

Raises:

Type Description
ConnectionError

If the connection is not established.

connect

connect(
    dsn="",
    schema=None,
    timezone="UTC",
    *,
    autocommit=True,
    **kwargs
)

Establishes a connection to the PostgreSQL database.

Parameters:

Name Type Description Default
dsn str

The Data Source Name for the connection.

''
schema str | None

The default schema to be used for the connection. If None, the default schema usually is 'public'.

None
timezone str

The timezone to be used for the connection.

'UTC'
autocommit bool

Whether to enable autocommit mode.

True
**kwargs Any

Additional connection parameters.

{}

Raises:

Type Description
ConnectionError

If the connection is already established.

ImportError

If the 'psycopg' package is not installed.

disconnect

disconnect()

Closes the connection to the PostgreSQL database.

query

query(query)

Executes a query on the PostgreSQL database.

Parameters:

Name Type Description Default
query QueryStatement

The query to be executed.

required

Returns:

Type Description
list[Data]

list[Data]: The result of the query execution.

Raises:

Type Description
ConnectionError

If there is an error executing the query.

query_schema

query_schema(filters=None)

Queries the schema of the PostgreSQL database.

Parameters:

Name Type Description Default
filters Conditions | None

The filters to be applied to the schema query.

None

Returns:

Type Description
list[Schema]

list[Schema]: The result of the schema query.

run_mutations

run_mutations(mutations)

Executes a list of data mutations on the PostgreSQL database.

Parameters:

Name Type Description Default
mutations list[DataMutation]

The list of data mutations to be executed.

required

Returns:

Type Description
list[list[Data] | None]

list[list[Data] | None]: The result of the data mutations execution.

run_schema_command

run_schema_command(command)

Executes a schema command on the PostgreSQL database.

Parameters:

Name Type Description Default
command SchemaCommand

The schema command to be executed.

required

Returns:

Type Description
list[Schema | None]

list[Schema | None]: The result of the schema command execution.

execute

execute(query, *args)

Executes a query on the PostgreSQL database.

Parameters:

Name Type Description Default
query str

The query to be executed.

required
*args Any

The query parameters.

()

Returns:

Type Description
Any

psycopg.Cursor: The cursor for the executed query.

Raises:

Type Description
ConnectionError

If there is an error executing the query.

get_table_info

get_table_info(table_name)

Gets the information of a table in the PostgreSQL database.

Parameters:

Name Type Description Default
table_name str

The name of the table.

required

Returns:

Type Description
tuple[list[PropertySchema], list[BaseConstraint], list[IndexSchema]]

tuple[list[PropertySchema], list[BaseConstraint], list[IndexSchema]]: The properties, constraints, and indexes of the table.

acquire_lock

acquire_lock(lock)

Acquires a lock on the PostgreSQL database.

Parameters:

Name Type Description Default
lock ExecutionLockCommand

The lock command to be executed.

required

Returns:

Name Type Description
Any Any

The result of the lock acquisition.

release_lock

release_lock(lock)

Releases a lock on the PostgreSQL database.

Parameters:

Name Type Description Default
lock ExecutionLockCommand

The lock command to be released.

required

Returns:

Name Type Description
Any Any

The result of the lock release.

commit_transaction

commit_transaction(transaction)

Commits a transaction on the PostgreSQL database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction to be committed.

required

Returns:

Name Type Description
Any Any

The result of the transaction commit.

rollback_transaction

rollback_transaction(transaction)

Rolls back a transaction on the PostgreSQL database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction to be rolled back.

required

Returns:

Name Type Description
Any Any

The result of the transaction rollback.

begin_transaction

begin_transaction(transaction)

Begins a transaction on the PostgreSQL database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction to be begun.

required

Returns:

Name Type Description
Any Any

The result of the transaction begin.

revert_transaction

revert_transaction(transaction)

Reverts a transaction on the PostgreSQL database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction to be reverted.

required

Returns:

Name Type Description
Any Any

The result of the transaction revert.

amsdal_glue.SqliteConnection

Bases: SqliteConnectionMixin, ConnectionBase

SqliteConnection is responsible for managing connections and executing queries and commands on a SQLite database.

Example

Here is example of how to create a connection to a SQlite database:

from amsdal_glue_connections import SqliteConnection

connection = SqliteConnection()
connection.connect(
    db_path='my_db.sqlite',
    check_same_thread=False,
)

Note, the check_same_thread parameter has True as default value. Although, it is required to set it to False due to using the ThreadParallelExecutor.

It's also possible to put any other parameters as a keyword arguments that are accepted by the sqlite3.connect function.

Most of the time, you will use the ConnectionManager to manage connections instead of creating a connection directly.

is_connected property

is_connected

Checks if the connection to the SQLite database is established.

Returns:

Name Type Description
bool bool

True if connected, False otherwise.

is_alive property

is_alive

Checks if the connection to the SQLite database is alive.

Returns:

Name Type Description
bool bool

True if alive, False otherwise.

connection property

connection

Gets the current SQLite connection.

Returns:

Type Description
Connection

sqlite3.Connection: The current SQLite connection.

Raises:

Type Description
ConnectionError

If the connection is not established.

connect

connect(db_path, *, check_same_thread=False, **kwargs)

Establishes a connection to the SQLite database.

Parameters:

Name Type Description Default
db_path Path

The path to the SQLite database file.

required
check_same_thread bool

Whether to check the same thread. Defaults to False.

False
**kwargs Any

Additional arguments for the SQLite connection.

{}

Raises:

Type Description
ConnectionError

If the connection is already established.

disconnect

disconnect()

Closes the connection to the SQLite database.

query

query(query)

Executes a query on the SQLite database.

Parameters:

Name Type Description Default
query QueryStatement

The query to be executed.

required

Returns:

Type Description
list[Data]

list[Data]: The result of the query execution.

Raises:

Type Description
ConnectionError

If there is an error executing the query.

ValueError

If a column name is duplicated.

query_schema

query_schema(filters=None)

Queries the schema of the SQLite database.

Parameters:

Name Type Description Default
filters Conditions

Filters to apply to the schema query. Defaults to None.

None

Returns:

Type Description
list[Schema]

list[Schema]: The list of schemas matching the filters.

run_mutations

run_mutations(mutations)

Runs a list of data mutations on the SQLite database.

Parameters:

Name Type Description Default
mutations list[DataMutation]

The list of data mutations to be executed.

required

Returns:

Type Description
list[list[Data] | None]

list[list[Data] | None]: The result of each mutation execution.

run_schema_command

run_schema_command(command)

Runs a schema command on the SQLite database.

Parameters:

Name Type Description Default
command SchemaCommand

The schema command to be executed.

required

Returns:

Type Description
list[Schema | None]

list[Schema | None]: The result of each schema mutation.

execute

execute(query, *args)

Executes a query on the SQLite database.

Parameters:

Name Type Description Default
query str

The query to be executed.

required
*args Any

The arguments for the query.

()

Returns:

Type Description
Cursor

sqlite3.Cursor: The cursor for the executed query.

Raises:

Type Description
ConnectionError

If there is an error executing the query.

get_table_info

get_table_info(table_name)

Gets the information of a table in the SQLite database.

Parameters:

Name Type Description Default
table_name str

The name of the table.

required

Returns:

Type Description
tuple[list[PropertySchema], list[BaseConstraint], list[IndexSchema]]

tuple[list[PropertySchema], list[BaseConstraint], list[IndexSchema]]: The properties, constraints, and indexes of the table.

acquire_lock

acquire_lock(lock)

Acquires a lock on the SQLite database.

Parameters:

Name Type Description Default
lock ExecutionLockCommand

The lock command.

required

Returns:

Name Type Description
Any Any

The result of the lock acquisition.

release_lock

release_lock(lock)

Releases a lock on the SQLite database.

Parameters:

Name Type Description Default
lock ExecutionLockCommand

The lock command.

required

Returns:

Name Type Description
Any Any

The result of the lock release.

commit_transaction

commit_transaction(transaction)

Commits a transaction on the SQLite database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction command or transaction ID.

required

Returns:

Name Type Description
Any Any

The result of the transaction commit.

rollback_transaction

rollback_transaction(transaction)

Rolls back a transaction on the SQLite database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction command or transaction ID.

required

Returns:

Name Type Description
Any Any

The result of the transaction rollback.

begin_transaction

begin_transaction(transaction)

Begins a transaction on the SQLite database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction command or transaction ID.

required

Returns:

Name Type Description
Any Any

The result of the transaction begin.

revert_transaction

revert_transaction(transaction)

Reverts a transaction on the SQLite database.

Parameters:

Name Type Description Default
transaction TransactionCommand | str | None

The transaction command or transaction ID.

required

Returns:

Name Type Description
Any Any

The result of the transaction revert.