Multiple Postgres Connections
This notebook demonstrates how to use AMSDAL Glue to connect to two Postgres databases: one with existing tables and records, and another with no tables.
This example demonstrates how to:
- Initialize AMSDAL Glue
- Register connections
- Register a new schema/table in PostgreSQL database
- Fetch all schemas/tables from both connections
- Insert multiple records into the `shipping` table
- Fetch data with a single command that joins tables from different databases
Databases and data overview
The existing database will contain the following tables and data:
Table customers
customer_id | first_name | last_name | age | country |
---|---|---|---|---|
1 | John | Doe | 31 | USA |
2 | Robert | Luna | 22 | USA |
3 | David | Robinson | 22 | UK |
4 | John | Reinhardt | 25 | UK |
5 | Betty | Doe | 28 | UAE |
Table orders
order_id | item | amount | customer_id |
---|---|---|---|
1 | Keyboard | 400 | 4 |
2 | Mouse | 300 | 4 |
3 | Monitor | 12000 | 3 |
4 | Keyboard | 400 | 1 |
5 | Mousepad | 250 | 2 |
In this example we will also create a new `shipping` table with the following data in a second database:
Table shipping
shipping_id | status | customer_id |
---|---|---|
1 | Pending | 2 |
2 | Pending | 4 |
3 | Delivered | 3 |
4 | Pending | 5 |
5 | Delivered | 1 |
Prerequisites
We will run two PostgreSQL databases in Docker containers, so make sure Docker is installed locally.
# Functions to manage Docker containers
def start_containers():
    # Start the existing db container; the mounted dump runs on first start
    !docker run -d \
        --name db_existing \
        --volume ./existing-db-dump.sql:/docker-entrypoint-initdb.d/1.sql \
        -e POSTGRES_DB=db_name_1 \
        -e POSTGRES_USER=db_user \
        -e POSTGRES_PASSWORD=db_password \
        -p 5433:5432 \
        library/postgres:16
    # Start the new (empty) db container
    !docker run -d \
        --name db_new \
        -e POSTGRES_DB=db_name_2 \
        -e POSTGRES_USER=db_user \
        -e POSTGRES_PASSWORD=db_password \
        -p 5434:5432 \
        library/postgres:16
    print("Containers started!")

def stop_containers():
    !docker stop db_existing db_new
    !docker rm db_existing db_new
    print("Containers stopped and removed.")

def check_status():
    !docker ps -a | grep "db_existing\|db_new"

start_containers()
Let's check if the containers are running:
check_status()
b3064332a9d8 postgres:16 "docker-entrypoint.s…" 1 second ago Up Less than a second 0.0.0.0:5434->5432/tcp, [::]:5434->5432/tcp db_new
9f76f440fddd postgres:16 "docker-entrypoint.s…" 1 second ago Up Less than a second 0.0.0.0:5433->5432/tcp, [::]:5433->5432/tcp db_existing
We can see two running `postgres:16` containers.
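A running container is not necessarily ready to accept connections yet: on first start, the postgres image runs the scripts in `/docker-entrypoint-initdb.d` (here, the dump) before the server becomes available. Below is a minimal polling sketch, assuming readiness is checked by running `pg_isready` inside each container; the helper names `wait_until` and `postgres_ready` are hypothetical and not part of AMSDAL Glue.

```python
import subprocess
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Call `check` repeatedly until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def postgres_ready(container: str, user: str, db: str) -> bool:
    """Assumed readiness check: run pg_isready inside the container."""
    # -h localhost forces a TCP check; as we understand the image's entrypoint,
    # its temporary server used during initialization does not listen on TCP.
    proc = subprocess.run(
        ["docker", "exec", container, "pg_isready", "-h", "localhost", "-U", user, "-d", db],
        capture_output=True,
    )
    return proc.returncode == 0


# Usage in the notebook (requires Docker and the containers started above):
#     wait_until(lambda: postgres_ready("db_existing", "db_user", "db_name_1"))
#     wait_until(lambda: postgres_ready("db_new", "db_user", "db_name_2"))
```

Polling with a deadline keeps the notebook from hanging forever if a container fails to start; the return value tells you which case you hit.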
Install AMSDAL Glue
Now we can install AMSDAL Glue.
!pip install "amsdal-glue[postgres-binary]"
AMSDAL Glue also supports installing the `psycopg` package from source, which is recommended for production but requires system build tools and extra system libraries (see the psycopg installation documentation for details). To install it, run the following command:
pip install "amsdal-glue[postgres]"
Initialize default AMSDAL Glue services and containers
AMSDAL Glue provides a set of ready-to-use services and containers for connecting to databases and executing queries.
Let's initialize the default AMSDAL Glue services and containers.
from amsdal_glue import init_default_containers
init_default_containers()
print("That's it! AMSDAL Glue is ready to use.")
That's it! AMSDAL Glue is ready to use.
Register connections
Now it's time to register connections to the databases.
from amsdal_glue import Container, ConnectionManager, DefaultConnectionPool, PostgresConnection
existing_db_pool = DefaultConnectionPool(
PostgresConnection,
dsn="postgres://db_user:db_password@localhost:5433/db_name_1",
)
new_db_pool = DefaultConnectionPool(
PostgresConnection,
dsn="postgres://db_user:db_password@localhost:5434/db_name_2",
)
connection_mng = Container.managers.get(ConnectionManager)
connection_mng.register_connection_pool(existing_db_pool)
connection_mng.register_connection_pool(new_db_pool, schema_name="shipping")
print('Connections are registered.')
Connections are registered.
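Each DSN packs the connection settings into a single URL of the form `postgres://user:password@host:port/dbname`. As an illustration only, Python's standard `urllib.parse.urlsplit` can split the two DSNs above into their parts, which makes it easy to confirm that the ports match the ones published by the containers (5433 and 5434). The `describe` helper is hypothetical.

```python
from urllib.parse import urlsplit

dsns = {
    "existing": "postgres://db_user:db_password@localhost:5433/db_name_1",
    "new": "postgres://db_user:db_password@localhost:5434/db_name_2",
}


def describe(dsn: str) -> dict:
    """Split a postgres:// DSN into its components (password omitted)."""
    parts = urlsplit(dsn)
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,  # parsed as an int
        "dbname": parts.path.lstrip("/"),  # path is "/<dbname>"
    }


for label, dsn in dsns.items():
    print(label, describe(dsn))
```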
Note that above we created two Postgres connection pools and registered `existing_db_pool` as the default one, while `new_db_pool` is linked only to the `shipping` schema name. This means any query against the `shipping` schema is executed using the `new_db_pool` connection.
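Conceptually, the routing described above behaves like a lookup table keyed by schema name, with a fallback to the default pool. The sketch below is a toy model for intuition only; `PoolRouter` and its methods are hypothetical and do not describe how AMSDAL Glue's `ConnectionManager` is implemented.

```python
from typing import Optional


class PoolRouter:
    """Toy model: map schema names to pools, falling back to a default pool."""

    def __init__(self) -> None:
        self._default: Optional[str] = None
        self._by_schema: dict[str, str] = {}

    def register(self, pool: str, schema_name: Optional[str] = None) -> None:
        if schema_name is None:
            self._default = pool  # no schema name: this becomes the default pool
        else:
            self._by_schema[schema_name] = pool

    def pool_for(self, schema_name: str) -> str:
        pool = self._by_schema.get(schema_name, self._default)
        if pool is None:
            raise LookupError(f"no connection pool for schema {schema_name!r}")
        return pool


router = PoolRouter()
router.register("existing_db_pool")
router.register("new_db_pool", schema_name="shipping")

for table in ("customers", "orders", "shipping"):
    print(table, "->", router.pool_for(table))
```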
Now we can fetch all schemas/tables from both connections
from amsdal_glue import Container, SchemaQueryOperation
from amsdal_glue.interfaces import SchemaQueryService
query_service = Container.services.get(SchemaQueryService)
result = query_service.execute(SchemaQueryOperation(filters=None))
print('Success:', result.success)
print('Error details:', result.message)
print('Found schemas:', len(result.schemas))
for idx, schema in enumerate(result.schemas or []):
    print(f'Schema {idx + 1}:', schema, end='\n\n')
Success: True
Error details: None
Found schemas: 2
Schema 1: Schema<.customers_v_Version.LATEST:None:[PropertySchema<customer_id:<class 'int'>:True:None:nextval('customers_customer_id_seq'::regclass)>, PropertySchema<age:<class 'int'>:False:None:None>, PropertySchema<first_name:<class 'str'>:True:None:None>, PropertySchema<last_name:<class 'str'>:True:None:None>, PropertySchema<country:<class 'str'>:False:None:None>]:[PrimaryKeyConstraint(name='customers_pkey', fields=['customer_id'])]:[IndexSchema<idx_full_name:['first_name', 'last_name']:None>]>

Schema 2: Schema<.orders_v_Version.LATEST:None:[PropertySchema<order_id:<class 'int'>:True:None:nextval('orders_order_id_seq'::regclass)>, PropertySchema<amount:<class 'float'>:True:None:None>, PropertySchema<customer_id:<class 'int'>:False:None:None>, PropertySchema<item:<class 'str'>:True:None:None>]:[PrimaryKeyConstraint(name='orders_pkey', fields=['order_id']), ForeignKeyConstraint(name='orders_customer_id_fkey', fields=['customer_id'], reference_schema=SchemaReference<None.customers__v__Version.LATEST:None>, reference_fields=['customer_id'])]:[]>
As we can see, we got two tables from the existing database. Now let's create a new table in the second database.
Create the shipping table
from amsdal_glue import Container, SchemaCommand, RegisterSchema, Version
from amsdal_glue.interfaces import SchemaCommandService
from amsdal_glue import Schema, PropertySchema, PrimaryKeyConstraint
shipping_schema = Schema(
name="shipping",
version=Version.LATEST,
properties=[
PropertySchema(
name="shipping_id",
type=int,
required=True,
),
PropertySchema(
name="status",
type=str,
required=True,
),
PropertySchema(
name="customer_id",
type=int,
required=True,
),
],
constraints=[
PrimaryKeyConstraint(name="pk_shipping", fields=["shipping_id"]),
],
)
service = Container.services.get(SchemaCommandService)
result = service.execute(
SchemaCommand(
mutations=[
RegisterSchema(schema=shipping_schema),
]
),
)
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
Success: True
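For comparison, the `shipping` schema corresponds roughly to a table you could create by hand with SQL. The sketch below renders a simplified version of the schema as a `CREATE TABLE` statement and runs it against an in-memory SQLite database only to check that it is valid SQL. The type mapping (`int` to `INTEGER`, `str` to `TEXT`) and the helper `create_table_sql` are assumptions for illustration; the DDL AMSDAL Glue actually emits for PostgreSQL may differ.

```python
import sqlite3

# Simplified stand-in for shipping_schema: (name, Python type, required).
PROPERTIES = [
    ("shipping_id", int, True),
    ("status", str, True),
    ("customer_id", int, True),
]
PRIMARY_KEY = ("pk_shipping", ["shipping_id"])

# Assumed Python-type to SQL-type mapping, for illustration only.
SQL_TYPES = {int: "INTEGER", str: "TEXT", float: "REAL"}


def create_table_sql(table, properties, primary_key):
    """Render a CREATE TABLE statement from (name, type, required) tuples."""
    columns = [
        f"{name} {SQL_TYPES[py_type]}" + (" NOT NULL" if required else "")
        for name, py_type, required in properties
    ]
    pk_name, pk_fields = primary_key
    columns.append(f"CONSTRAINT {pk_name} PRIMARY KEY ({', '.join(pk_fields)})")
    return f"CREATE TABLE {table} (\n    " + ",\n    ".join(columns) + "\n)"


ddl = create_table_sql("shipping", PROPERTIES, PRIMARY_KEY)
print(ddl)

# Execute in an in-memory SQLite database; invalid SQL would raise here.
conn = sqlite3.connect(":memory:")
conn.execute(ddl)
column_names = [row[1] for row in conn.execute("PRAGMA table_info(shipping)")]
print(column_names)
```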
Let's run the schema query again to check if the new table was created.
from amsdal_glue import Container, SchemaQueryOperation
from amsdal_glue.interfaces import SchemaQueryService
query_service = Container.services.get(SchemaQueryService)
result = query_service.execute(SchemaQueryOperation(filters=None))
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
else:
    print('Found schemas:', len(result.schemas))
    for idx, schema in enumerate(result.schemas or []):
        print(f'Schema {idx + 1}:', schema, end='\n\n')
Success: True
Found schemas: 3
Schema 1: Schema<.customers_v_Version.LATEST:None:[PropertySchema<customer_id:<class 'int'>:True:None:nextval('customers_customer_id_seq'::regclass)>, PropertySchema<age:<class 'int'>:False:None:None>, PropertySchema<first_name:<class 'str'>:True:None:None>, PropertySchema<last_name:<class 'str'>:True:None:None>, PropertySchema<country:<class 'str'>:False:None:None>]:[PrimaryKeyConstraint(name='customers_pkey', fields=['customer_id'])]:[IndexSchema<idx_full_name:['first_name', 'last_name']:None>]>

Schema 2: Schema<.orders_v_Version.LATEST:None:[PropertySchema<order_id:<class 'int'>:True:None:nextval('orders_order_id_seq'::regclass)>, PropertySchema<amount:<class 'float'>:True:None:None>, PropertySchema<customer_id:<class 'int'>:False:None:None>, PropertySchema<item:<class 'str'>:True:None:None>]:[PrimaryKeyConstraint(name='orders_pkey', fields=['order_id']), ForeignKeyConstraint(name='orders_customer_id_fkey', fields=['customer_id'], reference_schema=SchemaReference<None.customers__v__Version.LATEST:None>, reference_fields=['customer_id'])]:[]>

Schema 3: Schema<.shipping_v_Version.LATEST:None:[PropertySchema<shipping_id:<class 'int'>:True:None:None>, PropertySchema<customer_id:<class 'int'>:True:None:None>, PropertySchema<status:<class 'str'>:True:None:None>]:[PrimaryKeyConstraint(name='pk_shipping', fields=['shipping_id'])]:[]>
Perfect! Now we have two tables in `existing_db_pool` and one table in `new_db_pool`. It's time to insert some records into the `shipping` table.
Insert multiple records into the shipping table
from amsdal_glue import Container, DataCommand, InsertData, Version, Data, SchemaReference
from amsdal_glue.interfaces import DataCommandService
service = Container.services.get(DataCommandService)
result = service.execute(
DataCommand(
mutations=[
InsertData(
schema=SchemaReference(name="shipping", version=Version.LATEST),
data=[
Data(
data={
"shipping_id": 1,
"status": "Pending",
"customer_id": 2,
}
),
Data(
data={
"shipping_id": 2,
"status": "Pending",
"customer_id": 4,
}
),
Data(
data={
"shipping_id": 3,
"status": "Delivered",
"customer_id": 3,
}
),
Data(
data={
"shipping_id": 4,
"status": "Pending",
"customer_id": 5,
}
),
Data(
data={
"shipping_id": 5,
"status": "Delivered",
"customer_id": 1,
}
),
],
),
],
),
)
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
Success: True
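Writing out each `Data(...)` entry by hand gets verbose. A list comprehension can build the same payloads from rows of tuples. This is plain Python; the only AMSDAL Glue call it relies on is the `Data(data=...)` constructor already used above, shown in a comment so the sketch runs without AMSDAL Glue installed. The names `SHIPPING_COLUMNS` and `shipping_payloads` are illustrative.

```python
SHIPPING_COLUMNS = ("shipping_id", "status", "customer_id")

shipping_rows = [
    (1, "Pending", 2),
    (2, "Pending", 4),
    (3, "Delivered", 3),
    (4, "Pending", 5),
    (5, "Delivered", 1),
]

# One dict per row, keyed by column name, matching the payloads inserted above.
shipping_payloads = [dict(zip(SHIPPING_COLUMNS, row)) for row in shipping_rows]

# With AMSDAL Glue, these could be passed to InsertData as:
#     data=[Data(data=payload) for payload in shipping_payloads]
for payload in shipping_payloads:
    print(payload)
```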
Now let's fetch data with a single command that joins tables from different databases.
Fetch customers with their shipping status
First, let's define the query to fetch the data:
from amsdal_glue import (
QueryStatement,
Version,
SchemaReference,
JoinQuery,
JoinType,
FieldReference,
Field,
Conditions,
Condition,
FieldLookup,
OrderByQuery,
OrderDirection
)
from amsdal_glue_core.common.expressions.field_reference import FieldReferenceExpression
query = QueryStatement(
only=[
FieldReference(field=Field(name="customer_id"), table_name="c"),
FieldReference(field=Field(name="first_name"), table_name="c"),
FieldReference(field=Field(name="status"), table_name="s"),
],
table=SchemaReference(name="customers", alias="c", version=Version.LATEST),
joins=[
JoinQuery(
table=SchemaReference(
name="shipping", alias="s", version=Version.LATEST
),
on=Conditions(
Condition(
left=FieldReferenceExpression(
field_reference=FieldReference(
field=Field(name="customer_id"), table_name="s"
)
),
lookup=FieldLookup.EQ,
right=FieldReferenceExpression(
field_reference=FieldReference(
field=Field(name="customer_id"), table_name="c"
)
),
),
),
join_type=JoinType.INNER,
),
],
order_by=[
OrderByQuery(
field=FieldReference(field=Field(name="customer_id"), table_name="c"),
direction=OrderDirection.ASC,
),
OrderByQuery(
field=FieldReference(field=Field(name="shipping_id"), table_name="s"),
direction=OrderDirection.ASC,
),
],
)
print("Query is defined:", query)
Query is defined: QueryStatement(table=SchemaReference<None.customers__v__Version.LATEST:c>, only=[c.customer_id, c.first_name, s.status], distinct=False, annotations=None, aggregations=None, joins=[JoinQuery(table=SchemaReference<None.shipping__v__Version.LATEST:s>, on=(Condition(left=s.customer_id, lookup===, right=c.customer_id, negate=False)), join_type=<JoinType.INNER: 'INNER'>)], where=None, group_by=None, order_by=[OrderByQuery(field=c.customer_id, direction=<OrderDirection.ASC: 'ASC'>), OrderByQuery(field=s.shipping_id, direction=<OrderDirection.ASC: 'ASC'>)], limit=None)
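Read as SQL, the `QueryStatement` above is an inner join of `customers` (alias `c`) and `shipping` (alias `s`) on `customer_id`, ordered by `c.customer_id` and then `s.shipping_id`. To check that reading, the sketch below loads the demo rows into a single in-memory SQLite database and runs the equivalent SQL. It is only a reference point: in this notebook the two tables live on separate PostgreSQL servers, which is exactly the gap AMSDAL Glue bridges.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Only the columns the query needs are created here.
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, first_name TEXT)")
conn.execute("CREATE TABLE shipping (shipping_id INTEGER PRIMARY KEY, status TEXT, customer_id INTEGER)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [(1, "John"), (2, "Robert"), (3, "David"), (4, "John"), (5, "Betty")],
)
conn.executemany(
    "INSERT INTO shipping VALUES (?, ?, ?)",
    [(1, "Pending", 2), (2, "Pending", 4), (3, "Delivered", 3), (4, "Pending", 5), (5, "Delivered", 1)],
)

JOIN_SQL = """
SELECT c.customer_id, c.first_name, s.status
FROM customers AS c
INNER JOIN shipping AS s ON s.customer_id = c.customer_id
ORDER BY c.customer_id ASC, s.shipping_id ASC
"""

rows = conn.execute(JOIN_SQL).fetchall()
for row in rows:
    print(row)
```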
Now we can execute the query:
from amsdal_glue import Container, DataQueryOperation
from amsdal_glue.interfaces import DataQueryService
service = Container.services.get(DataQueryService)
result = service.execute(DataQueryOperation(query=query))
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
else:
    print('Found records:', len(result.data))
    print('Customers report:')
    for row in result.data:
        print(
            f'{row.data["first_name"]} (ID: {row.data["customer_id"]}) - Shipping status: {row.data["status"]}'
        )
Success: True
Found records: 5
Customers report:
John (ID: 1) - Shipping status: Delivered
Robert (ID: 2) - Shipping status: Pending
David (ID: 3) - Shipping status: Delivered
John (ID: 4) - Shipping status: Pending
Betty (ID: 5) - Shipping status: Pending
That's it! We have successfully fetched the data from two different databases using a single query.
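When the joined tables live on different servers, neither database can run the join on its own, so the rows have to be combined elsewhere. One common strategy is a hash join in the application: fetch each side, index one side by the join key, then probe that index with the other side. The sketch below shows the idea on the demo data, with plain lists standing in for the two databases. It illustrates the general technique and is not a description of how AMSDAL Glue actually plans or executes this query; `inner_hash_join` is a hypothetical helper.

```python
from collections import defaultdict

# Rows as they might come back from each database (only the needed columns).
customers = [  # from the existing database
    {"customer_id": 1, "first_name": "John"},
    {"customer_id": 2, "first_name": "Robert"},
    {"customer_id": 3, "first_name": "David"},
    {"customer_id": 4, "first_name": "John"},
    {"customer_id": 5, "first_name": "Betty"},
]
shipping = [  # from the new database
    {"shipping_id": 1, "status": "Pending", "customer_id": 2},
    {"shipping_id": 2, "status": "Pending", "customer_id": 4},
    {"shipping_id": 3, "status": "Delivered", "customer_id": 3},
    {"shipping_id": 4, "status": "Pending", "customer_id": 5},
    {"shipping_id": 5, "status": "Delivered", "customer_id": 1},
]


def inner_hash_join(left, right, key):
    """Yield (left_row, right_row) pairs whose `key` values are equal."""
    # Build phase: index the right side by the join key.
    index = defaultdict(list)
    for right_row in right:
        index[right_row[key]].append(right_row)
    # Probe phase: look up each left row's key in the index.
    for left_row in left:
        for right_row in index.get(left_row[key], []):
            yield left_row, right_row


# Same ordering as the query: customer_id, then shipping_id.
joined = sorted(
    inner_hash_join(customers, shipping, "customer_id"),
    key=lambda pair: (pair[0]["customer_id"], pair[1]["shipping_id"]),
)
report = [
    f'{c["first_name"]} (ID: {c["customer_id"]}) - Shipping status: {s["status"]}'
    for c, s in joined
]
print("\n".join(report))
```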
SQL Parser
AMSDAL Glue uses Python objects to define queries. However, you can also install the `amsdal-glue-sql-parser` package, which parses SQL queries into AMSDAL Glue objects.
!pip install "amsdal-glue-sql-parser[sqloxide]"
We also need to register this parser:
from amsdal_glue import Container
from amsdal_glue_sql_parser.parsers.base import SqlParserBase
from amsdal_glue_sql_parser.parsers.sqloxide_parser import SqlOxideParser
Container.services.register(SqlParserBase, SqlOxideParser)
print('Done!')
Done!
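`Container.services.register(SqlParserBase, SqlOxideParser)` binds an interface to a concrete implementation, and `Container.services.get(SqlParserBase)` later returns an object you can call `parse_sql` on. A toy model of that register/get pattern is sketched below. `ServiceRegistry`, `Parser`, and `EchoParser` are hypothetical and only convey the idea; AMSDAL Glue's `Container` may differ in details such as whether instances are cached.

```python
from abc import ABC, abstractmethod


class ServiceRegistry:
    """Toy register/get container keyed by interface (abstract base class)."""

    def __init__(self) -> None:
        self._impls: dict[type, type] = {}
        self._instances: dict[type, object] = {}

    def register(self, interface: type, implementation: type) -> None:
        if not issubclass(implementation, interface):
            raise TypeError(f"{implementation.__name__} does not implement {interface.__name__}")
        self._impls[interface] = implementation
        self._instances.pop(interface, None)  # forget any instance of an earlier binding

    def get(self, interface: type):
        # Create the implementation lazily and reuse it on later calls.
        if interface not in self._instances:
            self._instances[interface] = self._impls[interface]()
        return self._instances[interface]


class Parser(ABC):  # hypothetical interface, standing in for SqlParserBase
    @abstractmethod
    def parse_sql(self, sql: str) -> list: ...


class EchoParser(Parser):  # hypothetical implementation
    def parse_sql(self, sql: str) -> list:
        return [sql]


services = ServiceRegistry()
services.register(Parser, EchoParser)
parser = services.get(Parser)
print(type(parser).__name__, parser.parse_sql("SELECT 1"))
```

Callers depend only on the interface, so swapping the implementation means changing a single `register` call.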
Now we can use raw SQL to define and execute queries:
from amsdal_glue import Container
from amsdal_glue.interfaces import DataQueryService
from amsdal_glue_sql_parser.parsers.base import SqlParserBase
service = Container.services.get(DataQueryService)
parser = Container.services.get(SqlParserBase)
raw_sql = """SELECT * FROM customers"""
result = service.execute(parser.parse_sql(raw_sql)[0])
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
else:
    print('Found records:', len(result.data))
    for row in result.data:
        print(row.data)
Success: True
Found records: 5
{'customer_id': 1, 'first_name': 'John', 'last_name': 'Doe', 'age': 31, 'country': 'USA'}
{'customer_id': 2, 'first_name': 'Robert', 'last_name': 'Luna', 'age': 22, 'country': 'USA'}
{'customer_id': 3, 'first_name': 'David', 'last_name': 'Robinson', 'age': 22, 'country': 'UK'}
{'customer_id': 4, 'first_name': 'John', 'last_name': 'Reinhardt', 'age': 25, 'country': 'UK'}
{'customer_id': 5, 'first_name': 'Betty', 'last_name': 'Doe', 'age': 28, 'country': 'UAE'}
Perfect! You can use the SQL parser to convert SQL queries (SELECT, INSERT, DELETE, etc.) into AMSDAL Glue objects and execute them.
Note that `parser.parse_sql()` returns a list of parsed query objects, because a single SQL string may contain multiple queries separated by `;`.
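To see why a list comes back, here is a deliberately small sketch that splits a SQL string on `;` while ignoring semicolons inside single-quoted string literals. It is only an illustration: a real SQL parser, such as the sqloxide-based one used here, also deals with comments, quoted identifiers, dialect-specific quoting and much more, and produces structured query objects rather than text. `split_statements` is a hypothetical helper.

```python
def split_statements(sql: str) -> list[str]:
    """Split `sql` on semicolons that are not inside single-quoted literals."""
    statements, current, in_string = [], [], False
    for ch in sql:
        if ch == "'":
            # Toggle on every quote; an escaped quote ('') toggles twice,
            # so the literal state ends up unchanged.
            in_string = not in_string
        if ch == ";" and not in_string:
            statements.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    statements.append("".join(current).strip())
    # Drop empty pieces, e.g. the one after a trailing semicolon.
    return [s for s in statements if s]


print(split_statements("SELECT * FROM customers; SELECT * FROM orders WHERE item = 'a;b';"))
```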
Cleanup
Finally, let's stop and remove the containers.
stop_containers()
db_existing
db_new
db_existing db_new
Containers stopped and removed.
Conclusion
In this notebook we demonstrated how to use AMSDAL Glue to connect to two Postgres databases: one with existing tables and records, and another with no tables. We also created a new table in the second database, inserted multiple records into it, and fetched data with a single command that joins tables from different databases. This is a powerful feature of AMSDAL Glue: it lets you work with multiple databases and schemas in a single query.