
Extract Transform Load (ETL) Examples and Tutorial

Task Orchestration

Apache Airflow

Installation

To install Apache Airflow, run the following command:

pip install apache-airflow

Configuration

First, we need to set the Airflow home directory. We can do this by running the following command:

export AIRFLOW_HOME=~/airflow

Now, we need to generate a configuration file that we can modify. We can do this by running the following command:

airflow config list --defaults > "${AIRFLOW_HOME}/airflow.cfg"

Before initializing the database, let’s explicitly set the path to the database file. To do this, find the line with sql_alchemy_conn in the newly created airflow.cfg file and change it to the following (adjust the path so it matches your AIRFLOW_HOME):

sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.db

Now, we can initialize the database by running the following command:

airflow db init

Define a DAG

Now, let’s define a simple DAG that runs a small ETL pipeline. The pipeline will consist of the following steps:

  • Create an initial dataframe
  • Process the data with two different functions: one adds the two columns, the other multiplies them
  • Merge the dataframes
  • Save the final dataframe to a CSV file

Let’s create a file called tutorial.py inside the ${AIRFLOW_HOME}/dags directory with the following content:

from datetime import datetime, timedelta
from airflow.models.dag import DAG
import pandas as pd
from airflow.operators.python import get_current_context
from airflow.operators.python import PythonOperator

def initial_function() -> dict[str, pd.DataFrame]:
    a = [i for i in range(10000000)]
    b = [i * 2 for i in a]
    df = pd.DataFrame({"a": a, "b": b})
    return {"initial_df": df}

def process_data() -> dict[str, pd.DataFrame]:
    context = get_current_context()
    ti = context["ti"]
    initial_df = ti.xcom_pull(task_ids="initial_function", key="return_value")["initial_df"]
    initial_df["c"] = initial_df["a"] + initial_df["b"]
    return {"df1": initial_df}

def process_data2() -> dict[str, pd.DataFrame]:
    context = get_current_context()
    ti = context["ti"]
    initial_df = ti.xcom_pull(task_ids="initial_function", key="return_value")["initial_df"]
    initial_df["d"] = initial_df["a"] * initial_df["b"]
    return {"df2": initial_df}

def merge_data() -> dict[str, pd.DataFrame]:
    context = get_current_context()
    ti = context["ti"]
    df1: pd.DataFrame = ti.xcom_pull(task_ids="process_data", key="return_value")["df1"]
    df2: pd.DataFrame = ti.xcom_pull(task_ids="process_data2", key="return_value")["df2"]
    df = pd.merge(df1, df2, on=["a", "b"], how="inner")
    return {"merged_df": df}

def save_data():
    context = get_current_context()
    ti = context["ti"]
    merged_df = ti.xcom_pull(task_ids="merge_data", key="return_value")["merged_df"]
    merged_df.to_csv("data.csv", index=False)

with DAG(
    "MyTestTutorial",
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    init_function_op = PythonOperator(
        task_id="initial_function",
        python_callable=initial_function,
    )
    process_data_op = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
    )
    process_data2_op = PythonOperator(
        task_id="process_data2",
        python_callable=process_data2,
    )
    merge_data_op = PythonOperator(
        task_id="merge_data",
        python_callable=merge_data,
    )
    save_data_op = PythonOperator(
        task_id="save_data",
        python_callable=save_data,
    )
    (
        init_function_op
        >> [process_data_op, process_data2_op]
        >> merge_data_op
        >> save_data_op
    )

Start the server

To start the server, run the following command:

airflow webserver --port 8080

Now you can go to http://localhost:8080/ to see the web interface, trigger the defined DAG, and follow its progress. Note that the Airflow scheduler (started with airflow scheduler in a separate terminal) must also be running for the tasks to actually execute.
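
Alternatively, you can exercise the DAG without the scheduler or webserver at all: Airflow 2.5+ lets you run it in a single process via dag.test(). A minimal sketch, reusing the dag object defined in tutorial.py above:

# Append to the bottom of tutorial.py, then run the file with plain Python
# to execute all tasks of the DAG in one local process (Airflow 2.5+).
if __name__ == "__main__":
    dag.test()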

Luigi

Installation

pip install luigi[toml]

Define a Task

You can define multiple independent tasks with Luigi and run them in parallel by building the pipeline with more than one worker. Here is an example of how you can do this:

import luigi
import pandas as pd
from io import StringIO

class InitialTask(luigi.Task):
    def run(self):
        a = [i for i in range(10000000)]
        b = [i * 2 for i in a]
        df = pd.DataFrame({"a": a, "b": b})
        with self.output().open("w") as f:
            f.write(df.to_csv(index=False))
    def output(self):
        return luigi.LocalTarget("output.csv")

class ProcessData1Task(luigi.Task):
    def requires(self):
        return InitialTask()
    def run(self):
        with self.input().open("r") as f:
            initial_df = pd.read_csv(StringIO(f.read()))
        initial_df["c"] = initial_df["a"] + initial_df["b"]
        with self.output().open("w") as f:
            f.write(initial_df.to_csv(index=False))
    def output(self):
        return luigi.LocalTarget("output1.csv")

class ProcessData2Task(luigi.Task):
    def requires(self):
        return InitialTask()
    def run(self):
        with self.input().open("r") as f:
            initial_df = pd.read_csv(StringIO(f.read()))
        initial_df["d"] = initial_df["a"] * initial_df["b"]
        with self.output().open("w") as f:
            f.write(initial_df.to_csv(index=False))
    def output(self):
        return luigi.LocalTarget("output2.csv")

class MergeTask(luigi.Task):
    def requires(self):
        return {
            "process_data1": ProcessData1Task(),
            "process_data2": ProcessData2Task(),
        }
    def run(self):
        with self.input()["process_data1"].open("r") as f:
            df1 = pd.read_csv(StringIO(f.read()))
        with self.input()["process_data2"].open("r") as f:
            df2 = pd.read_csv(StringIO(f.read()))
        df = pd.merge(df1, df2, on=["a", "b"], how="inner")
        with self.output().open("w") as f:
            f.write(df.to_csv(index=False))
    def output(self):
        return luigi.LocalTarget("output3.csv")

class SaveDataTask(luigi.Task):
    def requires(self):
        return MergeTask()
    def run(self):
        with self.input().open("r") as f:
            merged_df = pd.read_csv(StringIO(f.read()))
        merged_df.to_csv("data.csv", index=False)
    def output(self):
        return luigi.LocalTarget("data.csv")

if __name__ == "__main__":
    # workers=2 lets the two independent processing tasks run in parallel
    luigi.build([SaveDataTask()], local_scheduler=True, workers=2)

Run the Task

To run the task, save the code above as example.py and run the following command:

python example.py

Start the server

To start the server, run the following command:

luigid

Now you can go to http://localhost:8082/ to see the web interface.
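
To have the pipeline report to this central scheduler instead of the in-process local scheduler, point luigi.build at it by disabling the local scheduler. A minimal sketch, assuming luigid is running on the default port 8082:

# Use the central scheduler (luigid on localhost:8082) instead of the
# in-process local scheduler; the run then shows up in the web UI.
luigi.build([SaveDataTask()], local_scheduler=False, workers=2)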

Prefect

Installation

pip install prefect

Define a Task

from prefect import flow, task
from prefect.futures import PrefectFuture
import pandas as pd

@task(name="Process Data 1")
def process_data1(initial_df: pd.DataFrame) -> pd.DataFrame:
    initial_df["c"] = initial_df["a"] + initial_df["b"]
    return initial_df

@task(name="Process Data 2")
def process_data2(initial_df: pd.DataFrame) -> pd.DataFrame:
    initial_df["d"] = initial_df["a"] * initial_df["b"]
    return initial_df

@task(name="Merge Data")
def merge_data(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    df = pd.merge(df1, df2, on=["a", "b"], how="inner")
    return df

@task(name="Save Data")
def save_data(merged_df: pd.DataFrame) -> None:
    merged_df.to_csv("data.csv", index=False)

@flow(name="Process CSV")
def initial_function() -> None:
    a = [i for i in range(10000000)]
    b = [i * 2 for i in a]
    df = pd.DataFrame({"a": a, "b": b})
    future1: PrefectFuture = process_data1.submit(df)
    future2: PrefectFuture = process_data2.submit(df)
    save_data(merge_data(future1.result(), future2.result()))

if __name__ == "__main__":
    initial_function()

Run the Task

To run the task, save the code above as main.py and run the following command:

python main.py

Start the server

To start the server, run the following command:

prefect server start

Now you can go to http://localhost:4200/ to see the web interface.
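
Flow runs started with python main.py will also show up in this UI as long as the client points at the server (for example by setting PREFECT_API_URL to http://127.0.0.1:4200/api). If you additionally want the server to trigger the flow for you, recent Prefect releases let you serve it directly from Python; a minimal sketch, assuming flow.serve is available in your Prefect version:

# Replace the __main__ block in main.py: register the flow with the running
# server under a named deployment and keep polling for scheduled/manual runs.
if __name__ == "__main__":
    initial_function.serve(name="process-csv")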

Data Processing

pandas

Installation

pip install pandas

Example

import sqlite3
import os
from pathlib import Path
import pandas as pd
from pprint import pprint

BASE_DIR = Path(os.getcwd())

# Source databases and the destination (summary) database
countries_con = sqlite3.connect(BASE_DIR / "data/countries.db")
cities_con = sqlite3.connect(BASE_DIR / "data/cities.db")
summary_con = sqlite3.connect(BASE_DIR / "dist/summary.db")

# Extract: load the source tables into pandas dataframes
countries_df = pd.read_sql_query("SELECT * FROM countries", countries_con)
cities_df = pd.read_sql_query("SELECT * FROM cities", cities_con)

print('Countries (first 5 records):')
pprint(countries_df.head())
print('\nCities (first 5 records):')
pprint(cities_df.head())

import json

# Load the country_code -> country_name mapping and store it
# in the summary database as a lookup table
with open(BASE_DIR / "data/countries.json") as f:
    _countries_map = json.load(f)
    map_df = pd.DataFrame(
        [
            {
                "country_code": _code,
                "country_name": _name,
            }
            for _code, _name in _countries_map.items()
        ]
    )

map_df.to_sql("countries_map", summary_con, if_exists="replace", index=False)

# Attach the country_code to every city via the mapping table
cities_with_country_code_df = pd.merge(
    cities_df,
    map_df,
    left_on="Country",
    right_on='country_name',
)[['Country', 'City', 'CityPopulation', 'country_code']]
cities_with_country_code_df.head()

# Sum the city populations per country (the result column is named TotalPopulation)
sum_country_population_df = pd.merge(
    countries_df,
    cities_with_country_code_df,
    left_on='country_code',
    right_on='country_code',
)[['Country', 'CityPopulation']].groupby('Country').sum().reset_index().rename(columns={'CityPopulation': 'TotalPopulation'})
sum_country_population_df.head()

# For every city, subtract its own population from the country total
# to get the NonCityPopulation column
sum_non_city_population_df = pd.merge(
    cities_df,
    sum_country_population_df,
    left_on='Country',
    right_on='Country',
)[['Country', 'City', 'TotalPopulation', 'CityPopulation']].assign(
    NonCityPopulation=lambda x: x['TotalPopulation'] - x['CityPopulation']
)[['Country', 'City', 'CityPopulation', 'NonCityPopulation']]
sum_non_city_population_df.head()

# Load: write the result into the summary database
sum_non_city_population_df.to_sql("summary", summary_con, if_exists="replace", index=False)
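
To verify the load step, you can read the summary table back from dist/summary.db with the same connection; a quick sketch:

# Read the freshly written table back to confirm the load succeeded
check_df = pd.read_sql_query("SELECT * FROM summary", summary_con)
print(check_df.head())

# Close the connections once we are done
countries_con.close()
cities_con.close()
summary_con.close()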

Dask

Installation

pip install dask[complete]

Define and run tasks

You can define tasks that run in parallel using Dask. Here is an example of how you can do this:

from copy import copy

import dask
import dask.bag as db
import pandas as pd

def initial_function(i):
    return {
        "a": i,
        "b": i * 2,
    }

def process_data(initial_df: dict[str, int]) -> dict[str, int]:
    initial_df = copy(initial_df)
    initial_df["c"] = initial_df["a"] + initial_df["b"]
    return initial_df

def process_data2(initial_df: dict[str, int]) -> dict[str, int]:
    initial_df = copy(initial_df)
    initial_df["d"] = initial_df["a"] * initial_df["b"]
    return initial_df

@dask.delayed
def save_data(merged_df: list[dict[str, int]]):
    pd.DataFrame(merged_df).to_csv("data.csv", index=False)

# Build the lazy task graph: a bag of 100000 records in 1000 partitions,
# mapped through the processing functions and handed to the delayed save task
tasks = db.from_sequence(range(100000), npartitions=1000)
tasks = tasks.map(initial_function)
tasks = tasks.map(process_data)
tasks = tasks.map(process_data2)
final_task = save_data(tasks)

# Execution: nothing runs until compute() is called
dask.compute(final_task)

Here, we use dask.bag to apply our functions in batches across partitions, and at the end we call the final task save_data, which is wrapped in the dask.delayed decorator and therefore returns a lazy delayed object; nothing runs until dask.compute is called.
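
If you prefer to build the graph explicitly rather than through a bag, the same pipeline can be written with dask.delayed alone. A minimal sketch reusing the functions above; note that one delayed task per record carries more scheduling overhead than the partitioned bag:

# Each call only builds a lazy task; nothing executes until dask.compute()
records = [dask.delayed(initial_function)(i) for i in range(1000)]
records = [dask.delayed(process_data)(r) for r in records]
records = [dask.delayed(process_data2)(r) for r in records]
dask.compute(save_data(records))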

Another approach is to use the dask.dataframe API to process the data. Here is an example of how you can do this:

import dask.dataframe as dd
import pandas as pd
pandas_df = pd.DataFrame(
    [
        {
            "a": i,
            "b": i * 2,
        }
        for i in range(100000)
    ]
)
dask_df = dd.from_pandas(pandas_df, npartitions=4)
dask_df["c"] = dask_df["a"] + dask_df["b"]
dask_df["d"] = dask_df["a"] * dask_df["b"]
dask_df.to_csv("data.csv", index=False, single_file=True)

Note how we just pass the pandas dataframe (together with the desired number of partitions) to the dd.from_pandas function to create a Dask dataframe, and then use the Dask dataframe API to process the data.
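
For data that does not fit in memory, you would normally skip the intermediate pandas dataframe and let Dask read the input lazily. A minimal sketch, assuming the input already exists as one or more CSV files matching the hypothetical pattern data-*.csv:

import dask.dataframe as dd

# Lazily read many CSV files into one Dask dataframe; work is only
# executed when to_csv() (or .compute()) is called.
dask_df = dd.read_csv("data-*.csv")
dask_df["c"] = dask_df["a"] + dask_df["b"]
dask_df["d"] = dask_df["a"] * dask_df["b"]
dask_df.to_csv("out-*.csv", index=False)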

Pypeln

Installation

pip install pypeln

Define a Task

We can define tasks that run in parallel using Pypeln. Here is an example of how you can do this:

import pandas as pd
import pypeln as pl

def initial_function() -> list[dict[str, int]]:
    return [
        {
            "a": i,
            "b": i * 2,
        }
        for i in range(100000)
    ]

def process_data(initial_df: dict[str, int]) -> dict[str, int]:
    initial_df["c"] = initial_df["a"] + initial_df["b"]
    return initial_df

def process_data2(initial_df: dict[str, int]) -> dict[str, int]:
    initial_df["d"] = initial_df["a"] * initial_df["b"]
    return initial_df

def save_data(merged_df: list[dict[str, int]]):
    pd.DataFrame(merged_df).to_csv("data.csv", index=False)

stage = (
    initial_function()
    | pl.process.map(process_data, workers=10, maxsize=100)
    | pl.process.map(process_data2, workers=10, maxsize=100)
)
save_data(list(stage))

This creates a pipeline that processes the data in parallel with Pypeln: pl.process.map builds a stage that applies a function across a pool of worker processes, the workers argument sets how many processes are used, and maxsize bounds the queue between stages so faster stages do not run ahead unbounded.
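
pl.process.map runs the stages in separate worker processes, which suits CPU-bound steps. Pypeln exposes the same interface for thread- and asyncio-based workers as well; a minimal sketch of the thread variant, reusing the functions above:

# Same pipeline with thread-based workers: cheaper to start than processes
# and a better fit when the steps are mostly I/O bound.
stage = (
    initial_function()
    | pl.thread.map(process_data, workers=10, maxsize=100)
    | pl.thread.map(process_data2, workers=10, maxsize=100)
)
save_data(list(stage))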

PySpark

Let's see how we can use PySpark to build an ETL pipeline for the following case:

  1. We have two sources of data (for simplicity, two CSV files; a sketch of what they might contain follows right after this list).
  2. The first source contains a list of countries with country_code and country_population columns.
  3. The second source contains a list of cities with City, Country, and Population columns, where Country stores the country code and Population is the city population.
  4. We want to extract the data, transform it, and load it with country_code, city_population, and non_city_population columns into a separate data source.
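
For reference, here is a minimal sketch that generates hypothetical sample files with the columns described above (the values are made up):

import pandas as pd

# Hypothetical sample inputs matching the columns described above
pd.DataFrame(
    [
        {"country_code": "FR", "country_population": 68000000},
        {"country_code": "DE", "country_population": 84000000},
    ]
).to_csv("countries.csv", index=False)

pd.DataFrame(
    [
        {"City": "Paris", "Country": "FR", "Population": 2100000},
        {"City": "Berlin", "Country": "DE", "Population": 3700000},
    ]
).to_csv("cities.csv", index=False)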

Installation

pip install pyspark

Define ETL Pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()

# Load data
countries = spark.read.csv("countries.csv", header=True)
cities = spark.read.csv("cities.csv", header=True)

# Transform data
cities = (
    cities
    .withColumn("Population", col("Population").cast("int"))  # Ensure Population is numeric
    .groupBy("Country")
    .sum("Population")
    .withColumnRenamed("sum(Population)", "city_population")
)
data = countries.join(cities, countries.country_code == cities.Country, "inner")
data = data.withColumn("non_city_population", data["country_population"] - data["city_population"])
data = data.drop("country_population")
data = data.drop("Country")

# Save data
data.write.csv("output.csv", header=True)

ETL in Python and SQL

Another approach is to use SQL to process the data. Here is an example of how you can do this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()

# Load data
countries = spark.read.csv("countries.csv", header=True)
cities = spark.read.csv("cities.csv", header=True)

# Create temporary views
countries.createOrReplaceTempView("countries")
cities.createOrReplaceTempView("cities")

# Transform data
data = spark.sql("""
    SELECT
        c.country_code,
        SUM(CAST(ci.Population AS INT)) AS city_population,
        c.country_population - SUM(CAST(ci.Population AS INT)) AS non_city_population
    FROM
        countries c
    JOIN
        cities ci
    ON
        c.country_code = ci.Country
    GROUP BY
        c.country_code, c.country_population
""")

# Save data
data.write.csv("output.csv", header=True)

AMSDAL Glue

Installation

pip install amsdal-glue

Define ETL Pipeline

We assume that our data sources are stored as SQLite databases. So, let's start by initializing the default containers:

from amsdal_glue import init_default_containers

init_default_containers()

Now, let's create our connections:

from amsdal_glue import Container, ConnectionManager, DefaultConnectionPool, SqliteConnection

countries_db_pool = DefaultConnectionPool(
    SqliteConnection,
    db_path="data/countries.db",
)

cities_db_pool = DefaultConnectionPool(
    SqliteConnection,
    db_path="data/cities.db",
)

summary_db_pool = DefaultConnectionPool(
    SqliteConnection,
    db_path="dist/summary.db",
)

connection_mng = Container.managers.get(ConnectionManager)
connection_mng.register_connection_pool(countries_db_pool, schema_name="countries")
connection_mng.register_connection_pool(cities_db_pool, schema_name="cities")
connection_mng.register_connection_pool(summary_db_pool)

Now, we are ready to define transformations:

from amsdal_glue import (
    QueryStatement,
    Version,
    SchemaReference,
    JoinQuery,
    JoinType,
    FieldReference,
    Field,
    Conditions,
    Condition,
    FieldLookup,
    SubQueryStatement,
    AggregationQuery,
    Sum,
    GroupByQuery,
    AnnotationQuery,
    ExpressionAnnotation,
)

# Define the query to sum city population by country
sum_city_population_query = QueryStatement(
    only=[
        FieldReference(field=Field(name="country_code"), table_name="c"),
        FieldReference(field=Field(name="country_population"), table_name="c"),
    ],
    aggregations=[
        AggregationQuery(
            expression=Sum(field=FieldReference(field=Field(name="Population"), table_name="ci")),
            alias="city_population",
        ),
    ],
    table=SchemaReference(name="countries", alias="c", version=Version.LATEST),
    joins=[
        JoinQuery(
            table=SchemaReference(name="cities", alias="ci", version=Version.LATEST),
            join_type=JoinType.LEFT,
            on=Conditions(
                Condition(
                    field=FieldReference(field=Field(name="country_code"), table_name="c"),
                    lookup=FieldLookup.EQ,
                    value=FieldReference(field=Field(name="Country"), table_name="ci"),
                ),
            ),
        ),
    ],
    group_by=[
        GroupByQuery(
            field=FieldReference(field=Field(name="country_code"), table_name="c"),
        ),
        GroupByQuery(
            field=FieldReference(field=Field(name="country_population"), table_name="c"),
        ),
    ],
)

# Define the query to calculate non city population
final_query = QueryStatement(
    only=[
        FieldReference(field=Field(name="country_code"), table_name="c"),
        FieldReference(field=Field(name="city_population"), table_name="c"),
    ],
    annotations=[
        AnnotationQuery(
            value=ExpressionAnnotation(
                expression=FieldReference(
                    field=Field(name='country_population'),
                    table_name="c"
                ) - FieldReference(
                    field=Field(name='city_population'),
                    table_name="c"
                ),
                alias='non_city_population',
            ),
        ),
    ],
    table=SubQueryStatement(
        query=sum_city_population_query,
        alias="c",
    ),
)

Now we can execute this query and save the results:

from amsdal_glue import Container, DataQueryOperation, DataCommand, InsertData, SchemaReference, Version
from amsdal_glue.interfaces import DataQueryService, DataCommandService

service = Container.services.get(DataQueryService)
summary_result = service.execute(DataQueryOperation(query=final_query))

print('Success:', summary_result.success)
print('Data (the first 5):', summary_result.data[:5])

# Store results (we assume the destination table is already created)
service = Container.services.get(DataCommandService)
result = service.execute(
    DataCommand(
        mutations=[
            InsertData(
                schema=SchemaReference(name="summary", version=Version.LATEST),
                data=summary_result.data,
            ),
        ],
    ),
)
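
The command service also returns a result object, so you can check the outcome of the insert the same way as for the query above (assuming it exposes the same success flag):

print('Insert success:', result.success)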

ETL in Python and SQL

AMSDAL Glue also supports SQL queries. Here is an example of how you can do this:

from amsdal_glue_sql_parser.parsers.base import SqlParserBase
from amsdal_glue_sql_parser.parsers.sqloxide_parser import SqlOxideParser
from amsdal_glue import Container, DataQueryOperation, DataCommand, InsertData, SchemaReference, Version
from amsdal_glue.interfaces import DataQueryService

# Register parser
Container.services.register(SqlParserBase, SqlOxideParser)

subquery = """
    SELECT 
        c.country_code, 
        c.country_population, 
        SUM(ci.Population) as city_population
    FROM countries as c
    LEFT JOIN cities as ci
        ON c.country_name = ci.Country
    GROUP BY c.country_code, c.country_population
"""
summary_sql = f"""
    SELECT
        c.country_code,
        c.city_population,
        c.country_population - c.city_population AS non_city_population
    FROM
        ({subquery}) c
"""

parser = Container.services.get(SqlParserBase)
service = Container.services.get(DataQueryService)
summary_result = service.execute(DataQueryOperation(query=parser.parse_sql(summary_sql)[0]))

print('Success:', summary_result.success)
print('Data (the first 5):', summary_result.data[:5])

Note that AMSDAL Glue's default services, executors, and utilities are designed to be easily extended and customized. The library is still in active development, with more features and integrations with other tools on the way.