Basic usage¶
This notebook contains examples of how to use the AMSDAL Glue library.
We will cover the following topics:
- Initialize the Glue library and configure connections
- Create database tables
- Query database tables
- Query data
- Manage data
Installation¶
First of all, let's install the AMSDAL Glue library. We are going to use the CSV connection later in this example, so we will install the csv extra as well:
!pip install "amsdal-glue[csv]"
Initialize the Glue library¶
The AMSDAL Glue library is built on top of the amsdal-glue-core library, a set of interfaces and base classes that makes it possible to implement any part of AMSDAL Glue yourself.
The amsdal-glue package includes the default implementations of the interfaces and base classes from its core library. Implemented services, managers, and other components are registered through the Container class.
Let's register the default implementations:
import amsdal_glue as glue
# Initialize the default containers
glue.init_default_containers()
That's it! Now we can use the Glue library.
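To make the registration idea concrete, here is a minimal sketch of an interface-to-implementation registry. It is not the library's Container: TinyContainer, Greeter, and EnglishGreeter are hypothetical names invented for illustration, and the real class may work quite differently.

```python
from typing import Any, Callable


class TinyContainer:
    """Hypothetical registry that maps an interface class to a factory."""

    def __init__(self) -> None:
        self._factories: dict[type, Callable[[], Any]] = {}

    def register(self, interface: type, factory: Callable[[], Any]) -> None:
        # Remember which factory builds the implementation for this interface
        self._factories[interface] = factory

    def get(self, interface: type) -> Any:
        # Build and return the implementation registered for the interface
        return self._factories[interface]()


class Greeter:
    """Stands in for an interface from a core package."""

    def greet(self) -> str:
        raise NotImplementedError


class EnglishGreeter(Greeter):
    """Stands in for a default implementation."""

    def greet(self) -> str:
        return "hello"


container = TinyContainer()
container.register(Greeter, EnglishGreeter)
greeting = container.get(Greeter).greet()
print(greeting)
```

Callers ask for the interface and receive whatever implementation was registered, which is the property that lets default implementations be swapped for custom ones.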
Register SQLite connection¶
Now, let's register the SQLite connection. The sqlite3 module ships with Python, so we don't need to install anything. Connecting to SQLite is as simple as:
import tempfile
from pathlib import Path
# Create temporary directory
temp_dir = tempfile.TemporaryDirectory()
working_directory = Path(temp_dir.name)
# Create connection pool
connection_pool = glue.DefaultConnectionPool(
    glue.SqliteConnection,
    db_path=working_directory / 'db.sqlite',
)
# Register the connection pool
connection_manager: glue.interfaces.ConnectionManager = glue.Container.managers.get(glue.interfaces.ConnectionManager)
connection_manager.register_connection_pool(connection_pool)
Now we are ready to use AMSDAL Glue to create tables in our SQLite database.
Create database tables¶
We are going to create two tables: courses and grades.
The courses table:
Column Name | Type |
---|---|
course_id | int |
name | str |
The grades table:
Column Name | Type |
---|---|
grade_id | int |
course_id | int |
member_email | str |
grade | str |
We will use the Glue Schema class to describe the tables:
courses_schema = glue.Schema(
    name="courses",
    properties=[
        glue.PropertySchema(
            name="course_id",
            type=int,
            required=True,
        ),
        glue.PropertySchema(
            name="name",
            type=str,
            required=True,
        ),
    ],
    constraints=[
        glue.PrimaryKeyConstraint(name="pk_course", fields=["course_id"]),
    ],
)
grades_schema = glue.Schema(
    name="grades",
    properties=[
        glue.PropertySchema(
            name="grade_id",
            type=int,
            required=True,
        ),
        glue.PropertySchema(
            name="course_id",
            type=int,
            required=True,
        ),
        glue.PropertySchema(
            name="member_email",
            type=str,
            required=True,
        ),
        glue.PropertySchema(
            name="grade",
            type=str,
            required=True,
        ),
    ],
    constraints=[
        glue.PrimaryKeyConstraint(name="pk_grade", fields=["grade_id"]),
    ],
)
And now we can use the SchemaCommandService to create the tables:
service = glue.Container.services.get(glue.interfaces.SchemaCommandService)
result = service.execute(
    glue.SchemaCommand(
        mutations=[
            glue.RegisterSchema(schema=courses_schema),
            glue.RegisterSchema(schema=grades_schema),
        ],
    ),
)
assert result.success is True, result.message
print('Success:', result.success)
Success: True
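For orientation, the two schemas correspond roughly to the DDL below, run here with Python's built-in sqlite3 module against an in-memory database. This is a hand-written sketch: the exact statements AMSDAL Glue generates, and the mapping of int/str to INTEGER/TEXT, are assumptions.

```python
import sqlite3

# In-memory database, independent of the notebook's db.sqlite file
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE courses (
        course_id INTEGER NOT NULL,
        name TEXT NOT NULL,
        CONSTRAINT pk_course PRIMARY KEY (course_id)
    );
    CREATE TABLE grades (
        grade_id INTEGER NOT NULL,
        course_id INTEGER NOT NULL,
        member_email TEXT NOT NULL,
        grade TEXT NOT NULL,
        CONSTRAINT pk_grade PRIMARY KEY (grade_id)
    );
    """
)

# PRAGMA table_info yields one row per column; index 1 is the column name
grade_columns = [row[1] for row in conn.execute("PRAGMA table_info(grades)")]
print(grade_columns)
```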
Query database tables¶
Let's check that the tables were created by using the SchemaQueryService:
query = glue.SchemaQueryOperation()
query_service = glue.Container.services.get(glue.interfaces.SchemaQueryService)
result = query_service.execute(query)
print('Found total schemas:', len(result.schemas))
print('List of schemas:')
for schema in result.schemas:
    print(' - ', schema.name)
Found total schemas: 2
List of schemas:
 -  courses
 -  grades
We can use SchemaQueryOperation to filter the schemas:
# Query tables that start with "gra"
query = glue.SchemaQueryOperation(
    filters=glue.Conditions(
        glue.Condition(
            left=glue.FieldReferenceExpression(
                field_reference=glue.FieldReference(
                    field=glue.Field(name='name'),
                    table_name=glue.SCHEMA_REGISTRY_TABLE,
                ),
            ),
            lookup=glue.FieldLookup.STARTSWITH,
            right=glue.Value('gra'),
        ),
    ),
)
result = query_service.execute(query)
if not result.schemas:
    raise Exception('No schemas found')
print('Found total schemas:', len(result.schemas))
print('List of schemas:')
for schema in result.schemas:
    print(' - ', schema.name)
Found total schemas: 1
List of schemas:
 -  grades
Note that glue.SCHEMA_REGISTRY_TABLE is a special uniform table name that you use to query the schemas in a database. It is needed because each database stores information about its tables in a different way and in different system tables.
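To illustrate why a uniform name helps, this is how the same "tables starting with gra" lookup looks when written directly against SQLite's own catalog, sqlite_master, using the standard sqlite3 module. Other databases expose this information elsewhere (for example, information_schema.tables in PostgreSQL and MySQL). Whether Glue issues a query like this internally is an assumption; the sketch only shows the backend-specific detail being hidden.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE courses (course_id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE grades (grade_id INTEGER PRIMARY KEY, grade TEXT)")

# sqlite_master is SQLite-specific; the table name would differ on other backends
rows = conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE 'gra%'"
).fetchall()
matching = [name for (name,) in rows]
print(matching)
```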
Insert data into tables¶
Let's populate the courses and grades tables with some data:
# Our raw data
course_data = [
    {'course_id': 1, 'name': 'Python'},
    {'course_id': 2, 'name': 'Java'},
    {'course_id': 3, 'name': 'C++'},
    {'course_id': 4, 'name': 'JavaScript'},
]
grades_data = [
    {'grade_id': 1, 'course_id': 1, 'member_email': 'joe@amsdal.com', 'grade': 'B'},
    {'grade_id': 2, 'course_id': 1, 'member_email': 'annet@amsdal.com', 'grade': 'B'},
    {'grade_id': 3, 'course_id': 2, 'member_email': 'mark@amsdal.com', 'grade': 'A'},
    {'grade_id': 4, 'course_id': 2, 'member_email': 'susan@amsdal.com', 'grade': 'B'},
    {'grade_id': 5, 'course_id': 3, 'member_email': 'john@amsdal.com', 'grade': 'C'},
    {'grade_id': 6, 'course_id': 3, 'member_email': 'emma@amsdal.com', 'grade': 'A'},
    {'grade_id': 7, 'course_id': 4, 'member_email': 'paul@amsdal.com', 'grade': 'B'},
    {'grade_id': 8, 'course_id': 4, 'member_email': 'lisa@amsdal.com', 'grade': 'A'},
    {'grade_id': 9, 'course_id': 1, 'member_email': 'dave@amsdal.com', 'grade': 'C'},
    {'grade_id': 10, 'course_id': 2, 'member_email': 'amy@amsdal.com', 'grade': 'B'},
]
# Build insert mutations
courses_mutation = glue.InsertData(
    schema=glue.SchemaReference(name='courses'),
    data=[
        glue.Data(data=course) for course in course_data
    ],
)
grades_mutation = glue.InsertData(
    schema=glue.SchemaReference(name='grades'),
    data=[
        glue.Data(data=grade) for grade in grades_data
    ],
)
# Get data command service
data_command_service = glue.Container.services.get(glue.interfaces.DataCommandService)
# Execute the mutations
result = data_command_service.execute(
    glue.DataCommand(
        mutations=[
            courses_mutation,
            grades_mutation,
        ],
    ),
)
print('Inserted successfully:', result.success)
Inserted successfully: True
Perfect!
Query data¶
Now, let's query the data from the courses and grades tables.
Let's get the count of courses:
# Build query
query = glue.QueryStatement(
    table=glue.SchemaReference(name='courses'),
    aggregations=[
        glue.AggregationQuery(
            expression=glue.Count(field=glue.FieldReference(field=glue.Field(name='*'), table_name='courses')),
            alias='total_count',
        ),
    ],
)
# Get data query service
query_service = glue.Container.services.get(glue.interfaces.DataQueryService)
# Execute the query
result = query_service.execute(
    glue.DataQueryOperation(
        query=query,
    ),
)
if not result.success:
    raise Exception('Failed to get courses count:', result.message)
print('Total courses:', result.data[0].data['total_count'])
Total courses: 4
Let's now build a query with a JOIN statement to get all grades with course names:
# Build query
query = glue.QueryStatement(
    only=[
        glue.FieldReference(field=glue.Field(name='grade_id'), table_name='grades'),
        glue.FieldReference(field=glue.Field(name='name'), table_name='courses'),
        glue.FieldReference(field=glue.Field(name='member_email'), table_name='grades'),
        glue.FieldReference(field=glue.Field(name='grade'), table_name='grades'),
    ],
    table=glue.SchemaReference(name='grades'),
    joins=[
        glue.JoinQuery(
            table=glue.SchemaReference(name='courses'),
            on=glue.Conditions(
                glue.Condition(
                    left=glue.FieldReferenceExpression(
                        field_reference=glue.FieldReference(
                            field=glue.Field(name='course_id'),
                            table_name='grades',
                        ),
                    ),
                    lookup=glue.FieldLookup.EQ,
                    right=glue.FieldReferenceExpression(
                        field_reference=glue.FieldReference(
                            field=glue.Field(name='course_id'),
                            table_name='courses',
                        ),
                    ),
                ),
            ),
            join_type=glue.JoinType.INNER,
        ),
    ],
)
# Execute the query
result = query_service.execute(
    glue.DataQueryOperation(
        query=query,
    ),
)
if not result.success:
    raise Exception('Failed to get grades:', result.message)
print('Found total grades:', len(result.data))
print('List of grades:')
for item in result.data:
    print(' - ', item.data)
Found total grades: 10
List of grades:
 -  {'grade_id': 1, 'name': 'Python', 'member_email': 'joe@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 2, 'name': 'Python', 'member_email': 'annet@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 3, 'name': 'Java', 'member_email': 'mark@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 4, 'name': 'Java', 'member_email': 'susan@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 5, 'name': 'C++', 'member_email': 'john@amsdal.com', 'grade': 'C'}
 -  {'grade_id': 6, 'name': 'C++', 'member_email': 'emma@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 7, 'name': 'JavaScript', 'member_email': 'paul@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 8, 'name': 'JavaScript', 'member_email': 'lisa@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 9, 'name': 'Python', 'member_email': 'dave@amsdal.com', 'grade': 'C'}
 -  {'grade_id': 10, 'name': 'Java', 'member_email': 'amy@amsdal.com', 'grade': 'B'}
Note that we also specified the only parameter to select just the columns we want. The only parameter is optional; if it is not specified, all columns are selected.
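For readers who think in SQL, the join query with its column selection reads roughly like the statement in the sketch below, run with the standard sqlite3 module on a two-row subset of the sample data. The SQL that Glue actually emits for SQLite is an assumption; the ORDER BY clause is added here only to make the output deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
conn.executescript(
    """
    CREATE TABLE courses (course_id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE grades (
        grade_id INTEGER PRIMARY KEY,
        course_id INTEGER NOT NULL,
        member_email TEXT NOT NULL,
        grade TEXT NOT NULL
    );
    INSERT INTO courses VALUES (1, 'Python'), (2, 'Java');
    INSERT INTO grades VALUES
        (1, 1, 'joe@amsdal.com', 'B'),
        (3, 2, 'mark@amsdal.com', 'A');
    """
)

# The column list plays the role of `only`; the ON clause mirrors the EQ condition
rows = [
    dict(row)
    for row in conn.execute(
        "SELECT grades.grade_id, courses.name, grades.member_email, grades.grade "
        "FROM grades "
        "INNER JOIN courses ON grades.course_id = courses.course_id "
        "ORDER BY grades.grade_id"
    )
]
for row in rows:
    print(row)
```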
Update data¶
Let's update the grades table to set the grade to A for all members whose email starts with jo:
# Build update mutation
update_mutation = glue.UpdateData(
    schema=glue.SchemaReference(name='grades'),
    query=glue.Conditions(
        glue.Condition(
            left=glue.FieldReferenceExpression(
                field_reference=glue.FieldReference(
                    field=glue.Field(name='member_email'),
                    table_name='grades',
                ),
            ),
            lookup=glue.FieldLookup.STARTSWITH,
            right=glue.Value('jo'),
        ),
    ),
    data=glue.Data(data={'grade': 'A'}),
)
# Execute the mutation
result = data_command_service.execute(
    glue.DataCommand(
        mutations=[
            update_mutation,
        ],
    ),
)
if not result.success:
    raise Exception('Failed to update grades:', result.message)
print('Updated successfully:', result.success)
Updated successfully: True
Let's now run a query to check that the update was successful. We will use QueryStatement with a WHERE clause to filter the data:
# Build query
query = glue.QueryStatement(
    table=glue.SchemaReference(name='grades'),
    where=glue.Conditions(
        glue.Condition(
            left=glue.FieldReferenceExpression(
                field_reference=glue.FieldReference(
                    field=glue.Field(name='member_email'),
                    table_name='grades',
                ),
            ),
            lookup=glue.FieldLookup.STARTSWITH,
            right=glue.Value('jo'),
        ),
    ),
)
# Execute the query
result = query_service.execute(
    glue.DataQueryOperation(
        query=query,
    ),
)
if not result.success:
    raise Exception('Failed to get grades:', result.message)
print('Found total grades:', len(result.data))
print('List of grades:')
for item in result.data:
    print(' - ', item.data)
Found total grades: 2
List of grades:
 -  {'grade_id': 1, 'course_id': 1, 'member_email': 'joe@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 5, 'course_id': 3, 'member_email': 'john@amsdal.com', 'grade': 'A'}
Perfect! You can see that both records whose email starts with jo now have grade A.
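In plain SQL, a "starts with" filter is commonly written as a LIKE pattern with a trailing wildcard. The sketch below applies that idea with the standard sqlite3 module to a three-row subset of the grades data. How Glue translates FieldLookup.STARTSWITH for each backend is an assumption here; note also that SQLite's LIKE is case-insensitive for ASCII by default, which may or may not match Glue's behaviour.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE grades (grade_id INTEGER PRIMARY KEY, member_email TEXT, grade TEXT)"
)
conn.executemany(
    "INSERT INTO grades VALUES (?, ?, ?)",
    [
        (1, "joe@amsdal.com", "B"),
        (3, "mark@amsdal.com", "A"),
        (5, "john@amsdal.com", "C"),
    ],
)

# 'jo%' matches every email that begins with "jo"
cursor = conn.execute(
    "UPDATE grades SET grade = 'A' WHERE member_email LIKE ?", ("jo%",)
)
updated_count = cursor.rowcount

after = conn.execute(
    "SELECT member_email, grade FROM grades ORDER BY grade_id"
).fetchall()
print(updated_count, after)
```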
Multiple connections¶
Let's now see how we can register multiple connections and use them in queries.
We will use CsvConnection to register a CSV connection and query data from it.
First of all, let's create the CSV file:
raw_data = '''email,rating,comment
"john@amsdal.com",4.4,"Good"
"annet@amsdal.com",4.5,"Good"
"susan@amsdal.com",4,"Good"
"amy@amsdal.com",4.5,"Good"
"dave@amsdal.com",3.1,"OK"
"paul@amsdal.com",5,"Excellent"'''
csv_file = working_directory / 'rating.csv'
csv_file.write_text(raw_data)
194
And now, let's register the CSV connection:
connection_manager.register_connection_pool(
    glue.DefaultConnectionPool(
        glue.CsvConnection,
        db_path=csv_file,
    ),
    schema_name='rating',
)
Note that we specified the schema_name parameter to link this connection to a specific schema/table name. This means that queries against the rating table will use this connection.
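The routing idea can be pictured as a lookup keyed by schema name with a default fallback. The sketch below is purely illustrative: TinyConnectionRegistry and its methods are hypothetical names, connections are represented by plain strings, and the real ConnectionManager may resolve connections differently.

```python
from typing import Optional


class TinyConnectionRegistry:
    """Hypothetical router: schema name -> connection label, with a default."""

    def __init__(self) -> None:
        self._default: Optional[str] = None
        self._by_schema: dict[str, str] = {}

    def register(self, connection: str, schema_name: Optional[str] = None) -> None:
        # Without a schema name the connection becomes the default one
        if schema_name is None:
            self._default = connection
        else:
            self._by_schema[schema_name] = connection

    def resolve(self, schema_name: str) -> Optional[str]:
        # Prefer a schema-specific connection, otherwise fall back to the default
        return self._by_schema.get(schema_name, self._default)


registry = TinyConnectionRegistry()
registry.register("sqlite")
registry.register("csv", schema_name="rating")
routes = {name: registry.resolve(name) for name in ("grades", "courses", "rating")}
print(routes)
```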
Now we have two connections: one for SQLite and one for CSV. We can use both of them in our queries. Let's run a query with a JOIN statement to get all members with their grades and ratings:
# Build query
query = glue.QueryStatement(
    only=[
        glue.FieldReference(
            field=glue.Field(name='member_email'),
            table_name='grades',
        ),
        glue.FieldReference(
            field=glue.Field(name='grade'),
            table_name='grades',
        ),
        glue.FieldReference(
            field=glue.Field(name='rating'),
            table_name='rating',
        ),
    ],
    table=glue.SchemaReference(name='grades'),
    joins=[
        glue.JoinQuery(
            table=glue.SchemaReference(name='rating'),
            join_type=glue.JoinType.LEFT,
            on=glue.Conditions(
                glue.Condition(
                    left=glue.FieldReferenceExpression(
                        field_reference=glue.FieldReference(
                            field=glue.Field(name='member_email'),
                            table_name='grades',
                        ),
                    ),
                    lookup=glue.FieldLookup.EQ,
                    right=glue.FieldReferenceExpression(
                        field_reference=glue.FieldReference(
                            field=glue.Field(name='email'),
                            table_name='rating',
                        ),
                    ),
                ),
            ),
        ),
    ],
)
# Execute the query
result = query_service.execute(
    glue.DataQueryOperation(
        query=query,
    ),
)
if not result.success:
    raise Exception('Failed to get grades:', result.message)
print('Found total grades:', len(result.data))
print('List of grades:')
for item in result.data:
    print(' - ', item.data)
Found total grades: 10
List of grades:
 -  {'member_email': 'joe@amsdal.com', 'grade': 'A', 'rating': None}
 -  {'member_email': 'annet@amsdal.com', 'grade': 'B', 'rating': 4.5}
 -  {'member_email': 'mark@amsdal.com', 'grade': 'A', 'rating': None}
 -  {'member_email': 'susan@amsdal.com', 'grade': 'B', 'rating': 4.0}
 -  {'member_email': 'john@amsdal.com', 'grade': 'A', 'rating': 4.4}
 -  {'member_email': 'emma@amsdal.com', 'grade': 'A', 'rating': None}
 -  {'member_email': 'paul@amsdal.com', 'grade': 'B', 'rating': 5.0}
 -  {'member_email': 'lisa@amsdal.com', 'grade': 'A', 'rating': None}
 -  {'member_email': 'dave@amsdal.com', 'grade': 'C', 'rating': 3.1}
 -  {'member_email': 'amy@amsdal.com', 'grade': 'B', 'rating': 4.5}
As you can see, we can use both connections, even if they are different types of connections. The Glue library will take care of the connection management and use the right connection for each query.
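To appreciate what is being handled for us, here is a hand-rolled equivalent of a LEFT join across a SQLite table and CSV data, using only the standard sqlite3 and csv modules on a two-row subset. It is a sketch of the general technique (load one side into a dictionary, then look up each row from the other side), not a description of how AMSDAL Glue executes such queries internally.

```python
import csv
import io
import sqlite3

# SQLite side: a small subset of the grades table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE grades (member_email TEXT, grade TEXT)")
conn.executemany(
    "INSERT INTO grades VALUES (?, ?)",
    [("joe@amsdal.com", "A"), ("annet@amsdal.com", "B")],
)

# CSV side: parsed from an in-memory string instead of a file
csv_text = 'email,rating,comment\n"annet@amsdal.com",4.5,"Good"\n'
ratings = {
    row["email"]: float(row["rating"])
    for row in csv.DictReader(io.StringIO(csv_text))
}

# LEFT join: keep every grade row; rating is None when the CSV has no match
joined = [
    {"member_email": email, "grade": grade, "rating": ratings.get(email)}
    for email, grade in conn.execute(
        "SELECT member_email, grade FROM grades ORDER BY rowid"
    )
]
for row in joined:
    print(row)
```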
SQL Parser¶
The AMSDAL Glue library also provides a SQL parser that can parse SQL queries and convert them to corresponding AMSDAL Glue objects. This allows you to use SQL syntax to query the data, instead of using the Glue API.
In order to use this feature, you need to install the following library:
!pip install "amsdal-glue-sql-parser[sqloxide]"
Next we need to create a parser object:
from amsdal_glue_sql_parser.parsers.sqloxide_parser import SqlOxideParser
parser = SqlOxideParser()
Now we are ready to use the parser. Let's run queries similar to the earlier ones, but using SQL syntax. First, let's get the total count of courses:
result = query_service.execute(
    parser.parse_sql("SELECT COUNT(*) AS total_count FROM courses")[0],
)
if not result.success:
    raise Exception('Failed to get courses count:', result.message)
print('Total courses:', result.data[0].data['total_count'])
Total courses: 4
Note that the parse_sql method returns a list, because a single SQL string can contain multiple statements. In this case we have only one statement, so we take the first item with [0].
Now, let's run a query with a JOIN statement to get all grades with course names:
result = query_service.execute(
    parser.parse_sql(
        "SELECT grades.grade_id, courses.name, grades.member_email, grades.grade "
        "FROM grades "
        "INNER JOIN courses ON grades.course_id = courses.course_id"
    )[0],
)
if not result.success:
    raise Exception('Failed to get grades:', result.message)
print('Found total grades:', len(result.data))
print('List of grades:')
for item in result.data:
    print(' - ', item.data)
Found total grades: 10
List of grades:
 -  {'grade_id': 1, 'name': 'Python', 'member_email': 'joe@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 2, 'name': 'Python', 'member_email': 'annet@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 3, 'name': 'Java', 'member_email': 'mark@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 4, 'name': 'Java', 'member_email': 'susan@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 5, 'name': 'C++', 'member_email': 'john@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 6, 'name': 'C++', 'member_email': 'emma@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 7, 'name': 'JavaScript', 'member_email': 'paul@amsdal.com', 'grade': 'B'}
 -  {'grade_id': 8, 'name': 'JavaScript', 'member_email': 'lisa@amsdal.com', 'grade': 'A'}
 -  {'grade_id': 9, 'name': 'Python', 'member_email': 'dave@amsdal.com', 'grade': 'C'}
 -  {'grade_id': 10, 'name': 'Java', 'member_email': 'amy@amsdal.com', 'grade': 'B'}
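As you can see, we get the same rows as with the Glue API query, this time using SQL syntax. The only differences are the grades of joe and john, which reflect the update we made earlier.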
Let's now insert new records into the courses table using SQL syntax:
result = data_command_service.execute(
    parser.parse_sql(
        "INSERT INTO courses (course_id, name) VALUES "
        "(5, 'Ruby'), "
        "(6, 'Go'), "
        "(7, 'PHP')"
    )[0],
)
if not result.success:
    raise Exception('Failed to insert courses:', result.message)
print('Inserted successfully:', result.success)
Inserted successfully: True
Conclusion¶
In this notebook, we have covered the basic usage of the AMSDAL Glue library.
We have seen how to initialize the library, create database tables, query data, and manage data. We also saw how to use multiple connections and the SQL parser to query data using SQL syntax.
The AMSDAL Glue library is a powerful tool that allows you to work with different data sources in a unified way.
It provides a simple and intuitive API to work with data, and it is easy to extend and customize.