Extract Transform Load Examples and Tutorial¶
Task Orchestration¶
Apache Airflow¶
Installation
To install Apache Airflow, run the following command:
pip install apache-airflow
Configuration
First, we need to set up the Airflow home directory. We can do this by running the following command:
export AIRFLOW_HOME=~/airflow
Now, we need to generate a configuration file that we can modify. We can do this by running the following command:
airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"
Before initializing the database, let’s explicitly specify the path to the database file. To do this, open the newly created airflow.cfg
file, find the line with sql_alchemy_conn
(depending on your Airflow version it sits in the [core] or [database] section) and change it to the following:
sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.db
Now, we can initialize the database by running the following command:
airflow db init
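Depending on your Airflow version and authentication setup, you may also need to create a user to log in to the web UI; for example (the credentials below are placeholders):
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com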
Define a DAG
Now, let’s define a simple DAG that runs a small ETL pipeline. The pipeline will consist of the following steps:
- Create an initial dataframe
- Process the data by two different functions that will add and multiply two columns
- Merge the dataframes
- Save the final dataframe to a CSV file
Let’s create a file called tutorial.py
inside the ${AIRFLOW_HOME}/dags
directory with the following content:
from datetime import datetime, timedelta
from airflow.models.dag import DAG
import pandas as pd
from airflow.operators.python import get_current_context
from airflow.operators.python import PythonOperator
def initial_function() -> dict[str, pd.DataFrame]:
a = [i for i in range(10000000)]
b = [i * 2 for i in a]
df = pd.DataFrame({"a": a, "b": b})
return {"initial_df": df}
def process_data() -> dict[str, pd.DataFrame]:
context = get_current_context()
ti = context["ti"]
initial_df = ti.xcom_pull(task_ids="initial_function", key="return_value")[
"initial_df" ]
initial_df["c"] = initial_df["a"] + initial_df["b"]
return {"df1": initial_df}
def process_data2() -> dict[str, pd.DataFrame]:
context = get_current_context()
ti = context["ti"]
initial_df = ti.xcom_pull(task_ids="initial_function", key="return_value")[
"initial_df" ]
initial_df["d"] = initial_df["a"] * initial_df["b"]
return {"df2": initial_df}
def merge_data() -> dict[str, pd.DataFrame]:
context = get_current_context()
ti = context["ti"]
df1: pd.DataFrame = ti.xcom_pull(task_ids="process_data", key="return_value")["df1"]
df2: pd.DataFrame = ti.xcom_pull(task_ids="process_data2", key="return_value")[
"df2" ]
df = pd.merge(df1, df2, on=["a", "b"], how="inner")
return {"merged_df": df}
def save_data():
context = get_current_context()
ti = context["ti"]
merged_df = ti.xcom_pull(task_ids="merge_data", key="return_value")["merged_df"]
merged_df.to_csv("data.csv", index=False)
with DAG(
"MyTestTutorial",
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
init_function_op = PythonOperator(
task_id="initial_function",
python_callable=initial_function,
)
process_data_op = PythonOperator(
task_id="process_data",
python_callable=process_data,
)
process_data2_op = PythonOperator(
task_id="process_data2",
python_callable=process_data2,
)
merge_data_op = PythonOperator(
task_id="merge_data",
python_callable=merge_data,
)
save_data_op = PythonOperator(
task_id="save_data",
python_callable=save_data,
)
(
init_function_op
>> [process_data_op, process_data2_op]
>> merge_data_op
>> save_data_op
)
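Before starting the web server, you can optionally check the DAG by running a single local test run from the command line (this executes the tasks in-process, without the scheduler):
airflow dags test MyTestTutorial 2021-01-01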
Start the server
To start the server, run the following command:
airflow webserver --port 8080
Now you can go to http://localhost:8080/
to see the web interface, trigger the defined DAG, and view its progress. Note that the Airflow scheduler must also be running (start it with airflow scheduler in a separate terminal) for the DAG to actually execute.
Luigi¶
Installation
pip install luigi[toml]
Define a Task
You can define multiple tasks that will work in parallel using Luigi. Here is an example of how you can do this:
import luigi
import pandas as pd
from io import StringIO
class InitialTask(luigi.Task):
def run(self):
a = [i for i in range(10000000)]
b = [i * 2 for i in a]
df = pd.DataFrame({"a": a, "b": b})
with self.output().open("w") as f:
f.write(df.to_csv())
def output(self):
return luigi.LocalTarget("output.csv")
class ProcessData1Task(luigi.Task):
def requires(self):
return InitialTask()
def run(self):
with self.input().open("r") as f:
initial_df = pd.read_csv(StringIO(f.read()))
initial_df["c"] = initial_df["a"] + initial_df["b"]
with self.output().open("w") as f:
f.write(initial_df.to_csv())
def output(self):
return luigi.LocalTarget("output1.csv")
class ProcessData2Task(luigi.Task):
def requires(self):
return InitialTask()
def run(self):
with self.input().open("r") as f:
initial_df = pd.read_csv(StringIO(f.read()))
initial_df["d"] = initial_df["a"] * initial_df["b"]
with self.output().open("w") as f:
f.write(initial_df.to_csv())
def output(self):
return luigi.LocalTarget("output2.csv")
class MergeTask(luigi.Task):
def requires(self):
return {
"process_data1": ProcessData1Task(),
"process_data2": ProcessData2Task(),
}
def run(self):
with self.input()["process_data1"].open("r") as f:
df1 = pd.read_csv(StringIO(f.read()))
with self.input()["process_data2"].open("r") as f:
df2 = pd.read_csv(StringIO(f.read()))
df = pd.merge(df1, df2, on=["a", "b"], how="inner")
with self.output().open("w") as f:
f.write(df.to_csv())
def output(self):
return luigi.LocalTarget("output3.csv")
class SaveDataTask(luigi.Task):
def requires(self):
return MergeTask()
def run(self):
with self.input().open("r") as f:
merged_df = pd.read_csv(StringIO(f.read()))
merged_df.to_csv("data.csv", index=False)
def output(self):
return luigi.LocalTarget("data.csv")
if __name__ == "__main__":
luigi.build([SaveDataTask()], local_scheduler=True)
Run the Task
To run the task, save the code above as example.py and run the following command:
python example.py
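Alternatively, assuming the code above is saved as example.py, you can launch the task through Luigi's command-line interface:
python -m luigi --module example SaveDataTask --local-scheduler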
Start the server
To start the server, run the following command:
luigid
Now you can go to http://localhost:8082/
to see the web interface.
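By default, the example above uses the local scheduler, so its runs will not appear in this web interface. To have tasks report to the central scheduler started by luigid (assuming the default address localhost:8082), simply drop local_scheduler=True:
luigi.build([SaveDataTask()])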
Prefect¶
Installation
pip install prefect
Define a Task
from prefect import flow, task
from prefect.futures import PrefectFuture
import pandas as pd
@task(name="Process Data 1")
def process_data1(initial_df: pd.DataFrame) -> pd.DataFrame:
initial_df["c"] = initial_df["a"] + initial_df["b"]
return initial_df
@task(name="Process Data 2")
def process_data2(initial_df: pd.DataFrame) -> pd.DataFrame:
initial_df["d"] = initial_df["a"] * initial_df["b"]
return initial_df
@task(name="Merge Data")
def merge_data(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
df = pd.merge(df1, df2, on=["a", "b"], how="inner")
return df
@task(name="Save Data")
def save_data(merged_df: pd.DataFrame) -> None:
merged_df.to_csv("data.csv", index=False)
@flow(name="Process CSV")
def initial_function() -> None:
a = [i for i in range(10000000)]
b = [i * 2 for i in a]
df = pd.DataFrame({"a": a, "b": b})
    # submit both processing tasks so they run concurrently; .submit() returns a PrefectFuture
    future1: PrefectFuture = process_data1.submit(df)
    future2: PrefectFuture = process_data2.submit(df)
    save_data(merge_data(future1.result(), future2.result()))
if __name__ == "__main__":
initial_function()
Run the Task
To run the task, save the code above as main.py and run the following command:
python main.py
Start the server
To start the server, run the following command:
prefect server start
Now you can go to http://localhost:4200/
to see the web interface.
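If your flow runs do not show up in the UI, make sure the client is pointed at the local server's API before running the flow; for example:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api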
Data Processing¶
pandas¶
Installation
pip install pandas
Example
import sqlite3
import os
from pathlib import Path
import pandas as pd
from pprint import pprint
BASE_DIR = Path(os.getcwd())
countries_con = sqlite3.connect(BASE_DIR / "data/countries.db")
cities_con = sqlite3.connect(BASE_DIR / "data/cities.db")
summary_con = sqlite3.connect(BASE_DIR / "dist/summary.db")
countries_df = pd.read_sql_query("SELECT * FROM countries", countries_con)
cities_df = pd.read_sql_query("SELECT * FROM cities", cities_con)
print('Countries (first 5 records):')
pprint(countries_df.head())
print('\nCities (first 5 records):')
pprint(cities_df.head())
import json
with open(BASE_DIR / "data/countries.json") as f:
_countries_map = json.load(f)
map_df = pd.DataFrame(
[
{
"country_code": _code,
"country_name": _name,
}
for _code, _name in _countries_map.items()
]
)
map_df.to_sql("countries_map", summary_con, if_exists="replace", index=False)
cities_with_country_code_df = pd.merge(
cities_df,
map_df,
left_on="Country",
right_on='country_name',
)[['Country', 'City', 'CityPopulation', 'country_code']]
cities_with_country_code_df.head()
sum_country_population_df = pd.merge(
countries_df,
cities_with_country_code_df,
left_on='country_code',
right_on='country_code',
)[['Country', 'CityPopulation']].groupby('Country').sum().reset_index().rename(columns={'CityPopulation': 'TotalPopulation'})
sum_country_population_df.head()
sum_non_city_population_df = pd.merge(
cities_df,
sum_country_population_df,
left_on='Country',
right_on='Country',
)[['Country', 'City', 'TotalPopulation', 'CityPopulation']].assign(
NonCityPopulation=lambda x: x['TotalPopulation'] - x['CityPopulation']
)[['Country', 'City', 'CityPopulation', 'NonCityPopulation']]
sum_non_city_population_df.head()
sum_non_city_population_df.to_sql("summary", summary_con, if_exists="replace", index=False)
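As a quick sanity check, you can read the summary table back from the destination database using the same summary_con connection:
print(pd.read_sql_query("SELECT * FROM summary", summary_con).head())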
Dask¶
Installation
pip install dask[complete]
Define and run tasks
You can define a task that will work in parallel using Dask. Here is an example of how you can do this:
from copy import copy
import dask
import dask.bag as db
import pandas as pd
def initial_function(i):
return {
"a": i,
"b": i * 2,
}
def process_data(initial_df: dict[str, int]) -> dict[str, int]:
initial_df = copy(initial_df)
initial_df["c"] = initial_df["a"] + initial_df["b"]
return initial_df
def process_data2(initial_df: dict[str, int]) -> dict[str, int]:
initial_df = copy(initial_df)
initial_df["d"] = initial_df["a"] * initial_df["b"]
return initial_df
@dask.delayed
def save_data(merged_df: list[dict[str, int]]):
pd.DataFrame(merged_df).to_csv("data.csv", index=False)
tasks = db.from_sequence(range(100000), npartitions=1000)
tasks = tasks.map(initial_function)
tasks = tasks.map(process_data)
tasks = tasks.map(process_data2)
final_task = save_data(tasks)
# Execution
dask.compute(final_task)
Here, we use dask.bag
to automatically batch the application of our functions, and at the end we call the final task save_data
, which is wrapped in the dask.delayed
decorator and therefore returns a delayed object.
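dask.compute also accepts a scheduler argument if you want to control how the graph is executed; for example, to force the multiprocessing scheduler for these CPU-bound functions:
dask.compute(final_task, scheduler="processes")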
Another approach is to use the dask.dataframe
API to process the data. Here is an example of how you can do this:
import dask.dataframe as dd
import pandas as pd
pandas_df = pd.DataFrame(
[
{
"a": i,
"b": i * 2,
}
for i in range(100000)
]
)
dask_df = dd.from_pandas(pandas_df, npartitions=4)  # split into 4 partitions (older Dask versions require npartitions)
dask_df["c"] = dask_df["a"] + dask_df["b"]
dask_df["d"] = dask_df["a"] * dask_df["b"]
dask_df.to_csv("data.csv", index=False, single_file=True)
Note how we just pass the pandas dataframe to the dd.from_pandas
function to create a Dask dataframe, and then we can use the dask dataframe API to process the data.
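The npartitions argument of dd.from_pandas controls how many pieces the data is split into for parallel processing; you can inspect and change the partitioning afterwards as well, for example:
print(dask_df.npartitions)  # number of partitions
dask_df = dask_df.repartition(npartitions=8)  # repartition if needed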
Pypeln¶
Installation
pip install pypeln
Define a Task
We can define a task that will work in parallel using Pypeln. Here is an example of how you can do this:
import pandas as pd
import pypeln as pl
def initial_function() -> list[dict[str, int]]:
return [
{
"a": i,
"b": i * 2,
}
for i in range(100000)
]
def process_data(initial_df: dict[str, int]) -> dict[str, int]:
initial_df["c"] = initial_df["a"] + initial_df["b"]
return initial_df
def process_data2(initial_df: dict[str, int]) -> dict[str, int]:
initial_df["d"] = initial_df["a"] * initial_df["b"]
return initial_df
def save_data(merged_df: list[dict[str, int]]):
pd.DataFrame(merged_df).to_csv("data.csv", index=False)
stage = (
initial_function()
| pl.process.map(process_data, workers=10, maxsize=100)
| pl.process.map(process_data2, workers=10, maxsize=100)
)
save_data(list(stage))
This code creates a pipeline that processes the data in parallel using Pypeln. The pl.process.map
function creates a stage that runs the given function across multiple worker processes; the workers
argument sets the number of worker processes, and maxsize limits how many items are buffered between stages.
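Pypeln also provides thread- and asyncio-based stages with the same map interface, which can be a better fit for I/O-bound work; a minimal sketch of the thread-based variant:
stage = (
    initial_function()
    | pl.thread.map(process_data, workers=10, maxsize=100)
    | pl.thread.map(process_data2, workers=10, maxsize=100)
)
save_data(list(stage))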
PySpark¶
Let's see how we can use PySpark to build an ETL pipeline for the following case:
- We have two sources of data (for simplicity, two CSV files).
- The first source contains a list of countries with country_code and country_population columns.
- The second source contains a list of cities with City, Country, and Population columns, where Country stores the country code and Population is the city population.
- We want to extract, transform, and load the data into a separate data source with country_code, city_population, and non_city_population columns.
Installation
pip install pyspark
Define ETL Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()
# Load data
countries = spark.read.csv("countries.csv", header=True)
cities = spark.read.csv("cities.csv", header=True)
# Transform data
cities = (
cities
.withColumn("Population", col("Population").cast("int")) # Ensure Population is numeric
.groupBy("Country")
.sum("Population")
.withColumnRenamed("sum(Population)", "city_population")
)
data = countries.join(cities, countries.country_code == cities.Country, "inner")
data = data.withColumn("non_city_population", data["country_population"] - data["city_population"])
data = data.drop("country_population")
data = data.drop("Country")
# Save data
data.write.csv("output.csv", header=True)
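Note that spark.read.csv loads every column as a string by default, which is why Population is cast explicitly above; alternatively, you can ask Spark to infer the column types at load time:
countries = spark.read.csv("countries.csv", header=True, inferSchema=True)
cities = spark.read.csv("cities.csv", header=True, inferSchema=True)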
ETL in Python and SQL
Another approach is to use SQL to process the data. Here is an example of how you can do this:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()
# Load data
countries = spark.read.csv("countries.csv", header=True)
cities = spark.read.csv("cities.csv", header=True)
# Create temporary views
countries.createOrReplaceTempView("countries")
cities.createOrReplaceTempView("cities")
# Transform data
data = spark.sql("""
SELECT
c.country_code,
SUM(CAST(ci.Population AS INT)) AS city_population,
c.country_population - SUM(CAST(ci.Population AS INT)) AS non_city_population
FROM
countries c
JOIN
cities ci
ON
c.country_code = ci.Country
GROUP BY
c.country_code, c.country_population
""")
# Save data
data.write.csv("output.csv", header=True)
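Since data.write.csv produces a directory of part files, you may want to coalesce the result into a single partition and allow overwriting on re-runs; for example:
data.coalesce(1).write.mode("overwrite").csv("output.csv", header=True)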
AMSDAL Glue¶
Installation
pip install amsdal-glue
Define ETL Pipeline
We assume that our data sources are stored as SQLite databases. So, let's start by initializing the default containers:
from amsdal_glue import init_default_containers
init_default_containers()
Now, let's create our connections:
from amsdal_glue import Container, ConnectionManager, DefaultConnectionPool, SqliteConnection
countries_db_pool = DefaultConnectionPool(
SqliteConnection,
db_path="data/countries.db",
)
cities_db_pool = DefaultConnectionPool(
SqliteConnection,
db_path="data/cities.db",
)
summary_db_pool = DefaultConnectionPool(
SqliteConnection,
db_path="dist/summary.db",
)
connection_mng = Container.managers.get(ConnectionManager)
connection_mng.register_connection_pool(countries_db_pool, schema_name="countries")
connection_mng.register_connection_pool(cities_db_pool, schema_name="cities")
connection_mng.register_connection_pool(summary_db_pool)
Now, we are ready to define transformations:
from amsdal_glue import (
QueryStatement,
Version,
SchemaReference,
JoinQuery,
JoinType,
FieldReference,
Field,
Conditions,
Condition,
FieldLookup,
SubQueryStatement,
AggregationQuery,
Sum,
GroupByQuery,
AnnotationQuery,
ExpressionAnnotation,
)
# Define the query to sum city population by country
sum_city_population_query = QueryStatement(
only=[
FieldReference(field=Field(name="country_code"), table_name="c"),
FieldReference(field=Field(name="country_population"), table_name="c"),
],
aggregations=[
AggregationQuery(
expression=Sum(field=FieldReference(field=Field(name="Population"), table_name="ci")),
alias="city_population",
),
],
table=SchemaReference(name="countries", alias="c", version=Version.LATEST),
joins=[
JoinQuery(
table=SchemaReference(name="cities", alias="ci", version=Version.LATEST),
join_type=JoinType.LEFT,
on=Conditions(
Condition(
field=FieldReference(field=Field(name="country_code"), table_name="c"),
lookup=FieldLookup.EQ,
value=FieldReference(field=Field(name="Country"), table_name="ci"),
),
),
),
],
group_by=[
GroupByQuery(
field=FieldReference(field=Field(name="country_code"), table_name="c"),
),
GroupByQuery(
field=FieldReference(field=Field(name="country_population"), table_name="c"),
),
],
)
# Define the query to calculate non city population
final_query = QueryStatement(
only=[
FieldReference(field=Field(name="country_code"), table_name="c"),
FieldReference(field=Field(name="city_population"), table_name="c"),
],
annotations=[
AnnotationQuery(
value=ExpressionAnnotation(
expression=FieldReference(
field=Field(name='country_population'),
table_name="c"
) - FieldReference(
field=Field(name='city_population'),
table_name="c"
),
alias='non_city_population',
),
),
],
table=SubQueryStatement(
query=sum_city_population_query,
alias="c",
),
)
Now we can execute this query and save the results:
from amsdal_glue import Container, DataQueryOperation, DataCommand, InsertData, SchemaReference, Version
from amsdal_glue.interfaces import DataQueryService, DataCommandService
service = Container.services.get(DataQueryService)
summary_result = service.execute(DataQueryOperation(query=final_query))
print('Success:', summary_result.success)
print('Data (the first 5):', summary_result.data[:5])
# Store results (we assume the destination table is already created)
service = Container.services.get(DataCommandService)
result = service.execute(
DataCommand(
mutations=[
InsertData(
schema=SchemaReference(name="summary", version=Version.LATEST),
data=summary_result.data,
),
],
),
)
ETL in Python and SQL
AMSDAL Glue also supports SQL queries. Here is an example of how you can do this:
from amsdal_glue_sql_parser.parsers.base import SqlParserBase
from amsdal_glue_sql_parser.parsers.sqloxide_parser import SqlOxideParser
from amsdal_glue import Container, DataQueryOperation, DataCommand, InsertData, SchemaReference, Version
from amsdal_glue.interfaces import DataQueryService
# Register parser
Container.services.register(SqlParserBase, SqlOxideParser)
subquery = """
SELECT
c.country_code,
c.country_population,
SUM(ci.Population) as city_population
FROM countries as c
LEFT JOIN cities as ci
ON c.country_name = ci.Country
GROUP BY c.country_code, c.country_population
"""
summary_sql = f"""
SELECT
c.country_code,
c.city_population,
c.country_population - c.city_population AS non_city_population
FROM
({subquery}) c
"""
parser = Container.services.get(SqlParserBase)
service = Container.services.get(DataQueryService)
summary_result = service.execute(DataQueryOperation(query=parser.parse_sql(summary_sql)[0]))
print('Success:', summary_result.success)
print('Data (the first 5):', summary_result.data[:5])
Note that AMSDAL Glue's default services, executors, and utilities are designed to be easily extended and customized. The library is still in active development and will continue to add features and integrations with other tools.