# ETL with AMSDAL Glue
This notebook demonstrates how to use AMSDAL Glue to extract, transform, and load (ETL) data from two sources (SQLite databases) into a third one (also an SQLite database).
The data sources are:
- `data/countries.db` - contains the `countries` table with countries and their population;
- `data/cities.db` - contains the `cities` table with cities and their population.
The structure of the `countries` table is:
Field | Type |
---|---|
country_code | TEXT |
population | INTEGER |
The structure of the `cities` table is:
Field | Type |
---|---|
Country | TEXT |
City | TEXT |
CityPopulation | INTEGER |
The resulting database will be `summary.db`, and it will contain the following `summary` table:
Field | Type |
---|---|
country_name | TEXT |
city_population | INTEGER |
non_city_population | INTEGER |
So, `country_name` and `city_population` can be derived from the `cities` table (the latter by summing the populations of all cities of a country). However, `non_city_population` has to be calculated as the difference between the total population of the country (taken from the `countries` table) and the sum of the populations of all cities in that country.
Let's start by installing `amsdal-glue`:
from tempfile import TemporaryDirectory
!pip install "amsdal-glue"
## Extract the data
The first step is to extract the data from the `countries` and `cities` tables. To do this, we need to initialize AMSDAL Glue and set up connections to the sources.
import os
from pathlib import Path

from amsdal_glue import (
    Container,
    ConnectionManager,
    DefaultConnectionPool,
    SqliteConnection,
    init_default_containers,
)

# Fill the global ``Container`` with AMSDAL Glue's default services/managers.
# Presumably required before the Container.managers/services lookups used in
# this notebook — NOTE(review): confirm in the amsdal_glue docs.
init_default_containers()

BASE_DIR = Path(os.getcwd())
_DATA_DIR = BASE_DIR / "data"

# summary.db is written into a throw-away directory: it goes away when
# ``temp_dir`` is cleaned up (explicitly, on garbage collection, or at exit).
temp_dir = TemporaryDirectory()
temp_path = Path(temp_dir.name)


def _sqlite_pool(db_path):
    """Return a ``DefaultConnectionPool`` of ``SqliteConnection`` objects for *db_path*."""
    return DefaultConnectionPool(SqliteConnection, db_path=db_path)


countries_db_pool = _sqlite_pool(_DATA_DIR / "countries.db")
cities_db_pool = _sqlite_pool(_DATA_DIR / "cities.db")
summary_db_pool = _sqlite_pool(temp_path / "summary.db")

connection_mng = Container.managers.get(ConnectionManager)

# Route the "countries" and "cities" schemas to their own source databases.
for _pool, _schema_name in ((countries_db_pool, "countries"), (cities_db_pool, "cities")):
    connection_mng.register_connection_pool(_pool, schema_name=_schema_name)

# Registered without a schema_name: presumably the fallback pool for every other
# schema (countries_map, summary) — NOTE(review): confirm.
connection_mng.register_connection_pool(summary_db_pool)

print('Done!')
Done!
## Transform the data
The next step in ETL is to transform the data. But first, let's investigate the data in the `countries` and `cities` tables to understand its structure.
from pprint import pprint

from amsdal_glue import Container, DataQueryOperation, QueryStatement, SchemaReference, Version
from amsdal_glue.interfaces import DataQueryService

service = Container.services.get(DataQueryService)

# Queries without ``only``/joins: all columns of each source table are returned.
countries_query = QueryStatement(
    table=SchemaReference(name="countries", version=Version.LATEST),
)
cities_query = QueryStatement(
    table=SchemaReference(name="cities", version=Version.LATEST),
)

countries_result = service.execute(DataQueryOperation(query=countries_query))
cities_result = service.execute(DataQueryOperation(query=cities_query))

# Check ``success`` before slicing ``data``, as the other query cells do;
# on failure ``data`` presumably holds no records, only ``message`` is useful.
print('Countries (first 5 records):')
if countries_result.success:
    pprint(countries_result.data[:5])
else:
    print('Error:', countries_result.message)

print('\nCities (first 5 records):')
if cities_result.success:
    pprint(cities_result.data[:5])
else:
    print('Error:', cities_result.message)
Countries (first 5 records): [Data(data={'country_code': 'CN', 'population': 1392730000}, metadata=None), Data(data={'country_code': 'IN', 'population': 1352617328}, metadata=None), Data(data={'country_code': 'US', 'population': 326687501}, metadata=None), Data(data={'country_code': 'ID', 'population': 267663435}, metadata=None), Data(data={'country_code': 'PK', 'population': 212215030}, metadata=None)] Cities (first 5 records): [Data(data={'City': 'Acheng', 'CityPopulation': 144665, 'Country': 'China'}, metadata=None), Data(data={'City': 'Aksu', 'CityPopulation': 340020, 'Country': 'China'}, metadata=None), Data(data={'City': 'Altay', 'CityPopulation': 139341, 'Country': 'China'}, metadata=None), Data(data={'City': 'Anbu', 'CityPopulation': 162964, 'Country': 'China'}, metadata=None), Data(data={'City': 'Anda', 'CityPopulation': 181271, 'Country': 'China'}, metadata=None)]
Perfect!
We can see that the `countries` table contains the country code, whereas the `cities` table contains the country name. In order to join the tables, we need to transform the `cities` table so that it has the same country code as the `countries` table.
### Create a country code mapping
Let's create a `countries_map` table that will hold the mapping between country codes and country names from the `countries.json` file.
from amsdal_glue import Container, SchemaCommand, RegisterSchema, Version, IndexSchema
from amsdal_glue.interfaces import SchemaCommandService
from amsdal_glue import Schema, PropertySchema, PrimaryKeyConstraint

# Both mapping columns are mandatory strings, declared in this order.
_map_columns = ("country_code", "country_name")

countries_map_schema = Schema(
    name="countries_map",
    version=Version.LATEST,
    properties=[
        PropertySchema(name=_column, type=str, required=True)
        for _column in _map_columns
    ],
    # Composite primary key over both columns.
    constraints=[
        PrimaryKeyConstraint(name="pk_country_map", fields=list(_map_columns)),
    ],
    # The transform step joins cities to this table on country_name.
    indexes=[
        IndexSchema(name="idx_country_name", fields=["country_name"]),
    ],
)

service = Container.services.get(SchemaCommandService)
_register_command = SchemaCommand(mutations=[RegisterSchema(schema=countries_map_schema)])
result = service.execute(_register_command)

print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
Success: True
And now let's fill this table with data from the `countries.json` file.
import json
import os
from pathlib import Path

from amsdal_glue import Container, DataCommand, InsertData, Version, Data, SchemaReference
from amsdal_glue.interfaces import DataCommandService

BASE_DIR = Path(os.getcwd())

# JSON text is UTF-8 (RFC 8259). Pass the encoding explicitly so loading does
# not depend on the platform's default locale encoding (e.g. cp1252 on Windows).
with open(BASE_DIR / "data/countries.json", encoding="utf-8") as f:
    # Expected shape (see the .items() unpacking below): {"<code>": "<name>", ...}
    _countries_map = json.load(f)

# One countries_map row per (code, name) pair from the JSON file.
data = [
    Data(
        data={
            "country_code": _code,
            "country_name": _name,
        }
    )
    for _code, _name in _countries_map.items()
]

service = Container.services.get(DataCommandService)
result = service.execute(
    DataCommand(
        mutations=[
            InsertData(
                schema=SchemaReference(name="countries_map", version=Version.LATEST),
                data=data,
            ),
        ],
    ),
)
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
Success: True
OK, now we are ready to transform the `cities` table so that it has the country code column:
from amsdal_glue import (
    QueryStatement,
    Version,
    SchemaReference,
    JoinQuery,
    JoinType,
    FieldReference,
    FieldReferenceAliased,
    Field,
    Conditions,
    Condition,
    FieldLookup,
)
from amsdal_glue_core.common.expressions.field_reference import FieldReferenceExpression


def _column_expr(name, table_name):
    """Wrap the column ``table_name.name`` as an expression for use in a Condition."""
    return FieldReferenceExpression(
        field_reference=FieldReference(field=Field(name=name), table_name=table_name),
    )


# (table alias, source column, output alias) for each selected column, in output order.
_city_columns = [
    ("cm", "country_code", "country_code"),
    ("c", "Country", "country_name"),
    ("c", "City", "city_name"),
    ("c", "CityPopulation", "city_population"),
]

# cm.country_name = c.Country
_on_country_name = Conditions(
    Condition(
        left=_column_expr("country_name", "cm"),
        lookup=FieldLookup.EQ,
        right=_column_expr("Country", "c"),
    ),
)

# LEFT JOIN keeps every city, even one whose country name is missing from countries_map.
cities_with_country_code_query = QueryStatement(
    only=[
        FieldReferenceAliased(field=Field(name=column), table_name=table, alias=alias)
        for table, column, alias in _city_columns
    ],
    table=SchemaReference(name="cities", alias="c", version=Version.LATEST),
    joins=[
        JoinQuery(
            table=SchemaReference(name="countries_map", alias="cm", version=Version.LATEST),
            join_type=JoinType.LEFT,
            on=_on_country_name,
        ),
    ],
)
print("Query is defined!")
Query is defined!
Let's check the result of the query:
from amsdal_glue import Container, DataQueryOperation
from amsdal_glue.interfaces import DataQueryService

service = Container.services.get(DataQueryService)
_operation = DataQueryOperation(query=cities_with_country_code_query)
result = service.execute(_operation)

print('Success:', result.success)
if result.success:
    # Preview only: the query yields one row per city.
    print('Found records (the first 5):', result.data[:5])
else:
    print('Error:', result.message)
Success: True Found records (the first 5): [Data(data={'country_code': 'CN', 'country_name': 'China', 'city_name': 'Acheng', 'city_population': 144665}, metadata=None), Data(data={'country_code': 'CN', 'country_name': 'China', 'city_name': 'Aksu', 'city_population': 340020}, metadata=None), Data(data={'country_code': 'CN', 'country_name': 'China', 'city_name': 'Altay', 'city_population': 139341}, metadata=None), Data(data={'country_code': 'CN', 'country_name': 'China', 'city_name': 'Anbu', 'city_population': 162964}, metadata=None), Data(data={'country_code': 'CN', 'country_name': 'China', 'city_name': 'Anda', 'city_population': 181271}, metadata=None)]
Perfect! Now, let's build a query that uses it as a subquery to calculate the SUM of the city populations for each country:
from amsdal_glue import (
    QueryStatement,
    FieldReference,
    Field,
    Conditions,
    Condition,
    FieldLookup,
    SubQueryStatement,
    AggregationQuery,
    Sum,
    GroupByQuery,
)


def _ref(name, table_name):
    """Return a FieldReference to the column ``table_name.name``."""
    return FieldReference(field=Field(name=name), table_name=table_name)


# The per-city rows from the previous query, exposed as the derived table "cts_codes".
_cities_subquery = SubQueryStatement(query=cities_with_country_code_query, alias="cts_codes")

# cts_codes.country_code = cnt.country_code
_on_country_code = Conditions(
    Condition(
        left=FieldReferenceExpression(field_reference=_ref("country_code", "cts_codes")),
        lookup=FieldLookup.EQ,
        right=FieldReferenceExpression(field_reference=_ref("country_code", "cnt")),
    ),
)

# NOTE(review): with a LEFT JOIN from countries, a country that has no matching
# city rows gets NULL country_name / city_population here, while the summary
# table declares those columns as required — worth checking against the data.
sum_city_population_query = QueryStatement(
    only=[
        FieldReferenceAliased(field=Field(name="country_name"), table_name="cts_codes", alias="country_name"),
        FieldReferenceAliased(field=Field(name="population"), table_name="cnt", alias="country_population"),
    ],
    # SUM(cts_codes.city_population) per group, exposed as "city_population".
    aggregations=[
        AggregationQuery(
            expression=Sum(field=_ref("city_population", "cts_codes")),
            alias="city_population",
        ),
    ],
    table=SchemaReference(name="countries", alias="cnt", version=Version.LATEST),
    joins=[
        JoinQuery(table=_cities_subquery, join_type=JoinType.LEFT, on=_on_country_code),
    ],
    # Group by both non-aggregated output columns.
    group_by=[
        GroupByQuery(field=_ref(_column, _table))
        for _column, _table in (("country_name", "cts_codes"), ("population", "cnt"))
    ],
)
print("Query is defined!")
Query is defined!
Let's execute this query to see the results:
from amsdal_glue import Container, DataQueryOperation
from amsdal_glue.interfaces import DataQueryService

service = Container.services.get(DataQueryService)
_operation = DataQueryOperation(query=sum_city_population_query)
result = service.execute(_operation)

print('Success:', result.success)
if result.success:
    # Preview only: one aggregated row per group.
    print('Found records (the first 5):', result.data[:5])
else:
    print('Error:', result.message)
Success: True Found records (the first 5): [Data(data={'country_name': 'Bolivia', 'country_population': 11353142, 'city_population': 4370486}, metadata=None), Data(data={'country_name': 'Japan', 'country_population': 126529100, 'city_population': 94943369}, metadata=None), Data(data={'country_name': 'Mali', 'country_population': 19077690, 'city_population': 2446134}, metadata=None), Data(data={'country_name': 'Canada', 'country_population': 37057765, 'city_population': 28317916}, metadata=None), Data(data={'country_name': 'Cambodia', 'country_population': 16249798, 'city_population': 3640169}, metadata=None)]
As we can see, we now have the `country_population` and `city_population` columns for each country. The last transformation step is to calculate the `non_city_population` column as the difference between `country_population` and `city_population`.
from amsdal_glue import (
    QueryStatement,
    FieldReferenceAliased,
    FieldReference,
    Field,
    AnnotationQuery,
    SubQueryStatement,
    ExpressionAnnotation,
)

# Columns of the aggregated sub-query (aliased "pop") that feed the computed column.
_country_population = FieldReference(field=Field(name='country_population'), table_name="pop")
_city_population = FieldReference(field=Field(name='city_population'), table_name="pop")

sum_non_city_population_query = QueryStatement(
    # Pass-through columns, selected under their original names.
    only=[
        FieldReferenceAliased(field=Field(name=_name), table_name="pop", alias=_name)
        for _name in ("country_name", "city_population")
    ],
    # Computed column: pop.country_population - pop.city_population.
    annotations=[
        AnnotationQuery(
            value=ExpressionAnnotation(
                expression=_country_population - _city_population,
                alias='non_city_population',
            ),
        ),
    ],
    table=SubQueryStatement(query=sum_city_population_query, alias='pop'),
)
print("Query is defined!")
Query is defined!
Let's execute this query to see the results:
from amsdal_glue import Container, DataQueryOperation
from amsdal_glue.interfaces import DataQueryService

service = Container.services.get(DataQueryService)
_operation = DataQueryOperation(query=sum_non_city_population_query)
# Kept as ``summary_result``: the load step below inserts its ``data``.
summary_result = service.execute(_operation)

print('Success:', summary_result.success)
if summary_result.success:
    print('Found records (the first 5):', summary_result.data[:5])
else:
    print('Error:', summary_result.message)
Success: True Found records (the first 5): [Data(data={'country_name': 'Ivory Coast', 'city_population': 8431368, 'non_city_population': 16637861}, metadata=None), Data(data={'country_name': 'France', 'city_population': 55397172, 'non_city_population': 11579935}, metadata=None), Data(data={'country_name': 'Tanzania', 'city_population': 11497775, 'non_city_population': 44820573}, metadata=None), Data(data={'country_name': 'Kazakhstan', 'city_population': 9838011, 'non_city_population': 8434419}, metadata=None), Data(data={'country_name': 'Somalia', 'city_population': 5091026, 'non_city_population': 9917128}, metadata=None)]
Perfect! Now we are ready for the last step of ETL: loading the data into the `summary` table.
## Load the data into the summary table
In order to load the data into the `summary` table, we need to create the table first.
from amsdal_glue import Container, SchemaCommand, RegisterSchema, Version
from amsdal_glue.interfaces import SchemaCommandService
from amsdal_glue import Schema, PropertySchema, PrimaryKeyConstraint

# (column name, Python type) of the target table; every column is required.
_summary_columns = (
    ("country_name", str),
    ("city_population", int),
    ("non_city_population", int),
)

summary_schema = Schema(
    name="summary",
    version=Version.LATEST,
    properties=[
        PropertySchema(name=_name, type=_type, required=True)
        for _name, _type in _summary_columns
    ],
    # country_name is the primary key: one summary row per country name.
    constraints=[
        PrimaryKeyConstraint(name="pk_country_name", fields=["country_name"]),
    ],
)

service = Container.services.get(SchemaCommandService)
result = service.execute(SchemaCommand(mutations=[RegisterSchema(schema=summary_schema)]))

print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
Success: True
Now, let's load the data into the `summary` table:
# Imported here as well so this cell does not depend on a cell far above it.
from amsdal_glue import Container, DataCommand, InsertData, SchemaReference, Version
from amsdal_glue.interfaces import DataCommandService

# Insert the rows produced by sum_non_city_population_query (``summary_result``)
# into the summary table.
service = Container.services.get(DataCommandService)
result = service.execute(
    DataCommand(
        mutations=[
            InsertData(
                schema=SchemaReference(name="summary", version=Version.LATEST),
                data=summary_result.data,
            ),
        ],
    ),
)

# Report the outcome like the other cells do. Otherwise a failed insert (e.g. a
# row violating the table's required/primary-key constraints) goes unnoticed.
print('Success:', result.success)
if not result.success:
    print('Error details:', result.message)
That's it! The data is loaded into the `summary` table. Note that in this demo `summary.db` is created in a temporary directory (`temp_path`), so it is removed once that directory is cleaned up.
By default, AMSDAL Glue uses sub-processes to execute the sub-queries in parallel. However, you can always implement your own logic to execute the queries the way you need, e.g. by using a queue to make it more reliable.
## SQL queries for transformation
By using the `amsdal-glue-sql-parser` package, it's possible to do the same transformation as above using raw SQL. Let's try it. First of all, we need to install the package:
!pip install "amsdal-glue-sql-parser[sqloxide]"
Now, let's define the SQL queries for the transformation:
# Step 1: every city with its country code, looked up in countries_map by
# country name. It is a LEFT JOIN, so a city without a mapping gets a NULL CountryCode.
cities_with_country_code_sql = """
SELECT
cm.country_code AS CountryCode,
c.Country AS CountryName,
c.CityPopulation AS CityPopulation
FROM
cities AS c
LEFT JOIN
countries_map AS cm
ON c.Country = cm.country_name
"""
# Step 2: summed city population per country. The step-1 SQL is interpolated
# as the derived table `cts_codes`. The interpolated text is the literal defined
# above, not external input.
sum_city_population_sql = f"""
SELECT
SUM(cts_codes.CityPopulation) AS city_population,
cts_codes.CountryCode AS country_code,
cts_codes.CountryName AS country_name,
cnt.population AS country_population
FROM
countries AS cnt
LEFT JOIN ({cities_with_country_code_sql}) AS cts_codes
ON cts_codes.CountryCode = cnt.country_code
GROUP BY
cts_codes.CountryCode, cts_codes.CountryName, cnt.population
"""
# Step 3: final summary rows, with non_city_population computed as the country
# population minus the summed city population. The step-2 SQL is interpolated as `pop`.
summary_sql = f"""
SELECT
pop.country_name,
pop.city_population,
(pop.country_population - pop.city_population) as non_city_population
FROM ({sum_city_population_sql}) as pop
"""
print('SQL queries are defined!')
SQL queries are defined!
Now, let's execute the `summary_sql` query:
from amsdal_glue import Container
from amsdal_glue.interfaces import DataQueryService
from amsdal_glue_sql_parser.parsers.base import SqlParserBase
from amsdal_glue_sql_parser.parsers.sqloxide_parser import SqlOxideParser

# Register SqlOxideParser as the service behind SqlParserBase lookups.
Container.services.register(SqlParserBase, SqlOxideParser)

parser = Container.services.get(SqlParserBase)
service = Container.services.get(DataQueryService)

# parse_sql yields a sequence of operations; summary_sql is a single SELECT,
# so only the first one is executed.
_parsed_operations = parser.parse_sql(summary_sql)
op = _parsed_operations[0]
summary_result = service.execute(query_op=op)

print('Success:', summary_result.success)
if summary_result.success:
    print('Found records (the first 5):', summary_result.data[:5])
else:
    print('Error:', summary_result.message)
Success: True Found records (the first 5): [Data(data={'country_name': 'Dominican Republic', 'city_population': 7914717, 'non_city_population': 2712448}, metadata=None), Data(data={'country_name': 'Iraq', 'city_population': 20846671, 'non_city_population': 17586929}, metadata=None), Data(data={'country_name': 'Belarus', 'city_population': 7107860, 'non_city_population': 2375639}, metadata=None), Data(data={'country_name': 'Kenya', 'city_population': 6541149, 'non_city_population': 44851861}, metadata=None), Data(data={'country_name': 'China', 'city_population': 376255246, 'non_city_population': 1016474754}, metadata=None)]
As we can see, the SQL query produces the same summary as the AMSDAL Glue query objects did. The row order differs because neither query specifies an `ORDER BY` clause.