Web Connection
This example shows how to build a connection to the Daily Treasury web service using AMSDAL Glue.
This example demonstrates how to:
- Initialize AMSDAL Glue
- Define and register a connection to the Daily Treasury web service
- Fetch all schemas/tables from connection
- Fetch all data from the "daily_treasury_real_long_term" table
- Fetch data by date from the "daily_treasury_yield_curve" table
Databases and data overview
The data for this example comes from the U.S. Department of the Treasury's daily interest rate data sets, such as the Daily Treasury Par Yield Curve Rates and the Daily Treasury Real Long-Term Rates: https://home.treasury.gov/resource-center/data-chart-center/interest-rates/TextView?type=daily_treasury_yield_curve&field_tdr_date_value_month=202407
Each type of interest rate data is represented as a separate table in the database, because the data sets have different structures.
This example is not optimized for performance, but rather to demonstrate the capabilities of AMSDAL Glue.
This example uses the `httpx` library to fetch data from the web service.
Quick start
Step 1: Install the required packages
Let's install the `amsdal-glue` and `httpx` packages:
!pip install amsdal-glue httpx
Defining the `DailyTreasureWebConnection` class
We are going to define a connection class that inherits from `amsdal_glue.interfaces.ConnectionBase` and implements the required methods to fetch data from the web service.
import logging
import re
import xml.etree.ElementTree as ElementTree
from datetime import datetime
from functools import lru_cache
from typing import Any
from typing import ClassVar
import httpx
import polars as pl
import amsdal_glue as glue
from amsdal_glue.queries.polars_operator_constructor import polars_operator_constructor
from amsdal_glue_core.commands.lock_command_node import ExecutionLockCommand
from amsdal_glue_core.common.operations.mutations.data import DataMutation
from amsdal_glue_core.queries.final_query_statement import QueryStatementNode
from amsdal_glue_connections.sql.connections.postgres_connection import get_pg_transform
# Module logger; the connection's lock/transaction stubs report through it.
logger = logging.getLogger(__name__)
# Upper bound on the number of result pages `_load_all` requests per table.
# Rows on pages beyond this limit are never loaded.
MAX_PAGES = 5
class DailyTreasureWebCache:
    """Process-wide singleton holding the DataFrames loaded from the Treasury feed.

    Every ``DailyTreasureWebCache()`` call returns the same object, so all
    connections share a single ``cache`` dict (table name -> DataFrame). Each
    table therefore only has to be downloaded once per process.
    """

    # The single shared instance; None until the first construction.
    _instance: "DailyTreasureWebCache | None" = None

    def __new__(cls, *args, **kwargs):
        # Test the very attribute that is assigned below. Checking a differently
        # named attribute would create a fresh object on every call and defeat
        # the singleton.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # Python runs __init__ on every DailyTreasureWebCache() call, even when
        # __new__ handed back the existing instance. Only create the dict the
        # first time so already-cached tables are not discarded.
        if not hasattr(self, "cache"):
            self.cache: dict[str, pl.DataFrame] = {}
class DailyTreasureWebConnection(glue.interfaces.ConnectionBase):
BASE_URL: ClassVar[str] = (
"https://home.treasury.gov/resource-center/data-chart-center/interest-rates/pages/xml"
)
TABLES: ClassVar[list[str]] = [
"daily_treasury_yield_curve",
"daily_treasury_bill_rates",
"daily_treasury_long_term_rate",
"daily_treasury_real_yield_curve",
"daily_treasury_real_long_term",
]
TABLES_CONTEXT: ClassVar[pl.SQLContext] = pl.SQLContext(
frames={
"schemas": pl.DataFrame({"name": TABLES}),
},
)
def __init__(self):
self.cache_service: DailyTreasureWebCache = DailyTreasureWebCache()
def query(self, query: glue.QueryStatement) -> list[glue.Data]:
tables = query.get_related_tables()
for table in tables:
if table in self.cache_service.cache:
continue
self.cache_service.cache[table] = self._load_all(table)
sql = self._build_sql(query)
sql_context = pl.SQLContext(frames=self.cache_service.cache)
return self.process_results(
sql_context.execute(sql).collect().to_dicts(), # type: ignore[attr-defined]
)
def query_schema(self, filters: glue.Conditions | None = None) -> list[glue.Schema]:
_statement = "SELECT name FROM schemas"
if filters:
_statement += f" WHERE {self._sql_build_conditions(filters)}"
_schemas = self.TABLES_CONTEXT.execute(_statement).collect().to_dicts() # type: ignore[attr-defined]
return [
self.extract_schema(
name=_schema["name"],
xml_content=self._request_xml(_schema["name"], 2024),
)
for _schema in _schemas
]
def run_mutations(self, mutations: list[DataMutation]) -> list[list[glue.Data] | None]:
msg = f"Mutations are not supported"
raise NotImplementedError(msg)
def acquire_lock(self, lock: ExecutionLockCommand) -> Any:
logger.warning("This connection does not support locks")
def release_lock(self, lock: ExecutionLockCommand) -> Any:
logger.warning("This connection does not support locks")
def commit_transaction(self, transaction: glue.TransactionCommand | str | None) -> Any:
logger.warning("This connection does not support transactions")
def rollback_transaction(self, transaction: glue.TransactionCommand | str | None) -> Any:
logger.warning("This connection does not support transactions")
def begin_transaction(self, transaction: glue.TransactionCommand | str | None) -> Any:
logger.warning("This connection does not support transactions")
def revert_transaction(self, transaction: glue.TransactionCommand | str | None) -> Any:
logger.warning("This connection does not support transactions")
def run_schema_command(self, command: glue.SchemaCommand) -> list[glue.Schema | None]:
msg = f"Schema commands are not supported"
raise NotImplementedError(msg)
def connect(self, *args: Any, **kwargs: Any) -> None:
self._client = httpx.Client(timeout=120)
def disconnect(self) -> None:
self._client.close()
@property
def is_connected(self) -> bool:
return not self._client.is_closed
def process_results(self, data: list[dict[str, Any]]) -> list[glue.Data]:
_first_item = data[0] if data else {}
# check columns duplications
for key in _first_item:
if ":" in key:
msg = f"Column name {key.split(':', 1)[1]} is duplicated"
raise ValueError(msg)
return [glue.Data(data=_item) for _item in data] if data is not None else []
def _build_field_reference_stmt(
self,
field: glue.FieldReference | glue.FieldReferenceAliased,
) -> str:
_item_stmt = f"{field.table_name}.{self._build_field(field.field)}"
if isinstance(field, glue.FieldReferenceAliased):
_item_stmt += f" AS {field.alias}"
return _item_stmt
def _build_field(self, field: glue.Field) -> str:
parts = []
while field:
parts.append(field.name)
field = field.child # type: ignore[assignment]
return "__".join(parts)
def extract_schema(self, name: str, xml_content: str) -> glue.Schema:
root = ElementTree.fromstring(xml_content)
namespaces = dict(re.findall(r'xmlns:?(\w*?)=["\'](.*?)["\']', xml_content))
first_entry = root.find(".//{http://www.w3.org/2005/Atom}entry", namespaces)
if first_entry is None:
msg = "No schema found"
raise ValueError(msg)
_properties_section = first_entry.find(".//m:properties", namespaces)
_properties: list[glue.PropertySchema] = []
for _property in _properties_section:
tag_name = _property.tag.split("}")[-1] # Remove namespace
_type = _property.attrib.get("{" + namespaces["m"] + "}type")
_properties.append(
glue.PropertySchema(
name=tag_name,
type=self._xml_to_python_type(_type),
required=False,
),
)
return glue.Schema(
name=name,
properties=_properties,
)
@staticmethod
def _xml_to_python_type(xml_type: str | None) -> Any:
match xml_type:
case "Edm.DateTime":
return datetime
case "Edm.Double":
return float
case "Edm.Int32":
return int
case "Edm.String":
return str
case _:
return str
@lru_cache(maxsize=None)
def _request_xml(
self,
name: str,
period_year: int | str,
period_month: int | None = None,
page: int = 0,
) -> str:
period_param = (
"field_tdr_date_value_month" if period_month else "field_tdr_date_value"
)
period_value = (
f"{period_year}{period_month:02d}" if period_month else str(period_year)
)
params = {
"data": name,
period_param: period_value,
}
if page:
params["page"] = page
response = self._client.get(
url=self.BASE_URL,
params=params,
)
response.raise_for_status()
return response.text
def _load_all(self, table_name: str) -> pl.DataFrame:
page = 0
all_pages = []
while data := self._load_data(self._request_xml(table_name, "all", page=page)):
all_pages.extend(data)
page += 1
if page >= MAX_PAGES:
break
return pl.DataFrame(all_pages)
def _load_data(self, xml_content: str) -> list[dict[str, Any]] | None:
root = ElementTree.fromstring(xml_content)
namespaces = dict(re.findall(r'xmlns:?(\w*?)=["\'](.*?)["\']', xml_content))
entries = root.findall(".//{" + namespaces[""] + "}entry", namespaces)
if not entries:
return None
data: list[dict[str, Any]] = []
for entry in entries:
_properties = entry.find(".//m:properties", namespaces)
_item = {}
for _property in _properties:
tag_name = _property.tag.split("}")[-1] # Remove namespace
_type = _property.attrib.get("{" + namespaces["m"] + "}type")
_value_raw = _property.text
_python_type = self._xml_to_python_type(_type)
if _python_type is datetime:
_value = datetime.fromisoformat(_value_raw)
else:
_value = _python_type(_value_raw)
_item[tag_name] = _value
data.append(_item)
return data
def _build_sql(self, query: glue.QueryStatement) -> str:
_sql: list[str] = [
"SELECT",
]
_selection_stmt = self._sql_build_selection_stmt(
query.only,
query.annotations,
query.aggregations,
)
_sql.append(_selection_stmt or "*")
_sql.append(f"FROM {query.table.alias or query.table.name}")
_sql.append(self._sql_build_joins(query.joins))
_sql.append(self._sql_build_where(query.where))
_sql.append(self._sql_build_group_by(query.group_by))
_sql.append(self._sql_build_order_by(query.order_by))
_sql.append(self._sql_build_limit(query.limit))
return " ".join(filter(None, _sql))
def _sql_build_selection_stmt(
self,
only: list[glue.FieldReference | glue.FieldReferenceAliased] | None,
annotations: list[glue.AnnotationQuery] | None,
aggregations: list[glue.AggregationQuery] | None,
) -> str:
_stmt = [self._build_field_reference_stmt(_item) for _item in only or []]
for annotation in annotations or []:
if isinstance(annotation.value, QueryStatementNode):
msg = "PolarsFinalQueryExecutor does not support subquery annotations"
raise TypeError(msg)
_val = repr(annotation.value.value)
_stmt.append(f"{_val} AS {annotation.value.alias}")
for aggregation in aggregations or []:
_aggr_field = self._build_field_reference_stmt(aggregation.field)
_stmt.append(
f"{aggregation.expression.name}({_aggr_field}) AS {aggregation.alias}"
)
return ", ".join(filter(None, _stmt))
def _sql_build_joins(
self,
joins: list[glue.JoinQuery] | None,
) -> str:
if not joins:
return ""
_stmt = []
for join in joins:
_conditions = self._sql_build_conditions(join.on)
_stmt.append(
f"{join.join_type.value} JOIN {join.table.alias} ON {_conditions}"
)
return " ".join(_stmt)
def _sql_build_where(
self,
where: glue.Conditions | None,
) -> str:
if not where:
return ""
return f"WHERE {self._sql_build_conditions(where)}"
def _sql_build_conditions(
self,
conditions: glue.Conditions,
) -> str:
_stmt = []
for condition in conditions.children:
if isinstance(condition, glue.Conditions):
_stmt.append(f"({self._sql_build_conditions(condition)})")
continue
_stmt.append(
polars_operator_constructor(
left=condition.left,
lookup=condition.lookup,
right=condition.right,
transform=get_pg_transform(),
)
)
return f" {conditions.connector.value} ".join(_stmt)
def _sql_build_group_by(
self,
group_by: list[glue.GroupByQuery] | None,
) -> str:
if not group_by:
return ""
_stmt = [self._build_field_reference_stmt(_item.field) for _item in group_by]
return f"GROUP BY {', '.join(_stmt)}"
def _sql_build_order_by(
self,
order_by: list[glue.OrderByQuery] | None,
) -> str:
if not order_by:
return ""
_stmt = []
for field in order_by:
_item_stmt = self._build_field_reference_stmt(field.field)
_stmt.append(f"{_item_stmt} {field.direction.value}")
return f"ORDER BY {', '.join(_stmt)}"
def _sql_build_limit(
self,
limit: glue.LimitQuery | None,
) -> str:
if not limit:
return ""
_stmt = f"LIMIT {limit.limit}"
if limit.offset:
_stmt += f" OFFSET {limit.offset}"
return _stmt
def queries(self) -> list[str]:
return []
@property
def is_alive(self) -> bool:
return True
Initialize AMSDAL Glue
Let's initialize AMSDAL Glue:
# Init default containers. This presumably populates glue.Container with the
# default managers and services; the glue.Container.managers / .services
# lookups in the cells below depend on it, so run this cell first.
glue.init_default_containers()
Register the connection
Now we need to register the connection in AMSDAL Glue.
# Wrap the connection class (the class itself, not an instance) in a pool;
# presumably the pool instantiates and connects DailyTreasureWebConnection on demand.
web_treasure_pool = glue.DefaultConnectionPool(DailyTreasureWebConnection)
# Register the pool with the ConnectionManager so that the query services used
# below can reach the Treasury tables.
connection_mng = glue.Container.managers.get(glue.interfaces.ConnectionManager)
connection_mng.register_connection_pool(web_treasure_pool)
Work with the connection
That's it. Now we are ready to run queries against our connection.
# List every schema (table) through the SchemaQueryService; report failures first.
query_service = glue.Container.services.get(glue.interfaces.SchemaQueryService)
result = query_service.execute(glue.SchemaQueryOperation())
if not result.success:
    print('Error:', result.message)
else:
    print('Found schemas:', len(result.schemas))
    for schema in result.schemas:
        print(f" - {schema.name}")
Found schemas: 5
 - daily_treasury_yield_curve
 - daily_treasury_bill_rates
 - daily_treasury_long_term_rate
 - daily_treasury_real_yield_curve
 - daily_treasury_real_long_term
As you can see above, it found 5 schemas (tables).
Now let's query all "Daily Treasury Real Long-Term Rates":
from pprint import pprint

# Unfiltered query: every row of the `daily_treasury_real_long_term` schema.
query = glue.QueryStatement(
    table=glue.SchemaReference(name="daily_treasury_real_long_term"),
)
# Data queries go through the DataQueryService rather than the schema service.
query_service = glue.Container.services.get(glue.interfaces.DataQueryService)
result = query_service.execute(glue.DataQueryOperation(query=query))
if not result.success:
    print('Error:', result.message)
else:
    print('Found data:', len(result.data))
    print('The first 5 records:')
    for data in result.data[:5]:
        pprint(data)
Found data: 1500 The first 5 records: Data(data={'QUOTE_DATE': datetime.datetime(2000, 1, 3, 0, 0), 'RATE': 4.3}, metadata=None) Data(data={'QUOTE_DATE': datetime.datetime(2000, 1, 4, 0, 0), 'RATE': 4.3}, metadata=None) Data(data={'QUOTE_DATE': datetime.datetime(2000, 1, 5, 0, 0), 'RATE': 4.3}, metadata=None) Data(data={'QUOTE_DATE': datetime.datetime(2000, 1, 6, 0, 0), 'RATE': 4.32}, metadata=None) Data(data={'QUOTE_DATE': datetime.datetime(2000, 1, 7, 0, 0), 'RATE': 4.32}, metadata=None)
Perfect!
Now let's run a filtered query against the "Daily Treasury Par Yield Curve Rates" for 1995-12-22:
# Newest NEW_DATE first.
newest_first = glue.OrderByQuery(
    field=glue.FieldReference(field=glue.Field(name='NEW_DATE'), table_name='daily_treasury_yield_curve'),
    direction=glue.OrderDirection.DESC,
)
# Keep only the rows whose NEW_DATE equals 1995-12-22.
on_new_date = glue.Condition(
    left=glue.FieldReferenceExpression(
        field_reference=glue.FieldReference(
            field=glue.Field(name="NEW_DATE"),
            table_name="daily_treasury_yield_curve",
        ),
    ),
    lookup=glue.FieldLookup.EQ,
    right=glue.Value('1995-12-22'),
)
query = glue.QueryStatement(
    table=glue.SchemaReference(name="daily_treasury_yield_curve"),
    order_by=[newest_first],
    where=glue.Conditions(on_new_date),
)
# Reuse the DataQueryService obtained in the previous cell.
result = query_service.execute(glue.DataQueryOperation(query=query))
if not result.success:
    print('Error:', result.message)
else:
    print('Found data:', len(result.data))
    for data in result.data:
        pprint(data)
Found data: 1 Data(data={'BC_10YEAR': 5.71, 'BC_1YEAR': 5.26, 'BC_2YEAR': 5.28, 'BC_30YEAR': 6.06, 'BC_30YEARDISPLAY': 0.0, 'BC_3MONTH': 5.04, 'BC_3YEAR': 5.37, 'BC_5YEAR': 5.5, 'BC_6MONTH': 5.27, 'BC_7YEAR': 5.62, 'Id': 609, 'NEW_DATE': datetime.datetime(1995, 12, 22, 0, 0)}, metadata=None)
Amazing!
Conclusion
In this example, we demonstrated how to build a connection to the Daily Treasury web service using AMSDAL Glue.
We defined a connection class that implements the required methods to fetch data from the web service, and registered it in AMSDAL Glue. This is the most complex part: you need to implement the `glue.interfaces.ConnectionBase` interface.
This example is not optimized for performance, but rather to demonstrate the capabilities of AMSDAL Glue. You can implement a connection to any source you want, as long as you implement the required methods.