Ingestion¶
Ingestion is the process of converting your data into vector embeddings and storing them for semantic search. amsdal_ml provides two approaches: model ingestion (for AMSDAL model objects) and document ingestion (for files like PDFs and text).
Model Ingestion¶
Indexing a Single Object¶
Use OpenAIIngesting to embed individual AMSDAL model objects:
```python
from amsdal_ml.ml_ingesting.openai_ingesting import OpenAIIngesting

ingesting = OpenAIIngesting(tags=['customers'])

# Async (recommended)
records = await ingesting.agenerate_embeddings(my_object)
await ingesting.asave(records, my_object)

# Sync
records = ingesting.generate_embeddings(my_object)
ingesting.save(records, my_object)
```
The ingestion process walks the object's fields recursively, generates structured text, splits it into chunks, embeds each chunk via OpenAI, and stores the results as `EmbeddingModel` records.
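The "walk fields recursively, generate structured text" step can be pictured as flattening nested fields into labeled `path: value` lines before chunking. This is an illustrative sketch only, not the library's actual implementation; `flatten_fields` is a hypothetical helper:

```python
from typing import Any

def flatten_fields(obj: Any, prefix: str = '') -> list[str]:
    """Recursively walk a dict-like object and emit 'path: value' lines."""
    lines: list[str] = []
    items = obj.items() if isinstance(obj, dict) else vars(obj).items()
    for key, value in items:
        path = f'{prefix}{key}'
        if isinstance(value, dict) or hasattr(value, '__dict__'):
            # Nested structure: recurse with a dotted prefix
            lines.extend(flatten_fields(value, prefix=f'{path}.'))
        else:
            lines.append(f'{path}: {value}')
    return lines

customer = {'name': 'Acme', 'contact': {'email': 'ops@acme.test', 'phone': '555-0100'}}
lines = flatten_fields(customer)
```

The resulting lines would then be joined, chunked, and embedded.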
Batch Ingestion with ModelIngester¶
ModelIngester processes collections of model objects — iterables, async iterables, or QuerySets:
```python
from amsdal_ml.ml_ingesting.model_ingester import ModelIngester
from amsdal_ml.ml_ingesting.pipeline import DefaultIngestionPipeline
from amsdal_ml.ml_ingesting.loaders.pdf_loader import PdfLoader
from amsdal_ml.ml_ingesting.processors.cleaner import TextCleaner
from amsdal_ml.ml_ingesting.splitters.token_splitter import TokenSplitter
from amsdal_ml.ml_ingesting.embedders.openai_embedder import OpenAIEmbedder
from amsdal_ml.ml_ingesting.stores.embedding_data import EmbeddingDataStore

pipeline = DefaultIngestionPipeline(
    loader=PdfLoader(),
    cleaner=TextCleaner(),
    splitter=TokenSplitter(max_tokens=800, overlap_tokens=100),
    embedder=OpenAIEmbedder(),
    store=EmbeddingDataStore(),
)

ingester = ModelIngester(pipeline=pipeline)

# From a QuerySet
await ingester.aingest(MyModel.objects.all(), tags=['bulk-import'])

# From a list
await ingester.aingest(my_objects, tags=['manual'])
```
ModelIngester automatically:
- Detects file fields (`File`, bytes, strings) on each object
- Passes file content through the document ingestion pipeline
- Logs progress and skips objects that fail, recording the errors
- Links each embedding back to the source object
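The file-field detection described above can be sketched as a simple type heuristic. This is an illustrative stand-in for how such detection might work, not `ModelIngester`'s actual logic; the function name is ours:

```python
import io

def looks_like_file_field(value) -> bool:
    """Heuristic: treat raw bytes, file-like objects, and
    path-like strings with known extensions as file fields."""
    if isinstance(value, (bytes, bytearray)):
        return True
    if hasattr(value, 'read'):  # file-like object (open file, BytesIO, ...)
        return True
    if isinstance(value, str) and value.lower().endswith(('.pdf', '.txt')):
        return True
    return False
```

Fields matching such a check would be routed through the document ingestion pipeline instead of the plain text path.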
Tags¶
Tags are string labels attached to embeddings. They're useful for filtering during search:
```python
# Tag by data source
ingesting = OpenAIIngesting(tags=['customers', 'active'])
records = await ingesting.agenerate_embeddings(customer)
await ingesting.asave(records, customer)

# Later, search only within tagged embeddings
results = await retriever.asimilarity_search(
    'find VIP customers',
    include_tags=['customers'],
)
```
Document Ingestion Pipeline¶
For files (PDFs, text documents), use DefaultIngestionPipeline. The pipeline chains these stages:
Loader → Cleaner → Splitter → Embedder → Store
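The chain above is an ordinary sequential composition: each stage's output feeds the next. A minimal generic sketch of the pattern, with toy string stages standing in for the real loader, cleaner, and splitter (these are not the library's interfaces):

```python
from typing import Any, Callable

def run_pipeline(data: Any, stages: list[Callable]) -> Any:
    """Feed the output of each stage into the next, in order."""
    for stage in stages:
        data = stage(data)
    return data

# Toy stages: normalize whitespace, lowercase, then split into tokens
stages = [str.strip, str.lower, str.split]
result = run_pipeline('  Hello World  ', stages)
```

`DefaultIngestionPipeline` plays the role of `run_pipeline` here, with the loader, cleaner, splitter, embedder, and store as the stages.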
Built-in Loaders¶
| Loader | Description |
|---|---|
| `PdfLoader` | Extracts text from PDF files using `pypdf`. Handles noisy pages, spaced characters, and page metadata. |
| `TextLoader` | Reads plain text files (configurable encoding). |
| `FolderLoader` | Standalone utility that recursively loads files from a directory using a delegated loader. Not a `Loader` subclass; use via `load_all()`/`aload_all()`. |
| `PdfFolderLoader` | `FolderLoader` pre-configured with `PdfLoader`. |
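`FolderLoader`'s recursive traversal can be pictured as a `Path.rglob` walk that delegates each matching file to an inner loader. An illustrative stand-in (not the library's code; here the "loader" is a plain text read):

```python
import tempfile
from pathlib import Path

def load_all(root: str, pattern: str = '*.txt') -> dict[str, str]:
    """Recursively collect files under root, delegating each file
    to a loader (plain text read here), keyed by relative path."""
    root_path = Path(root)
    return {
        str(p.relative_to(root_path)): p.read_text(encoding='utf-8')
        for p in sorted(root_path.rglob(pattern))
    }

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / 'sub').mkdir()
    (Path(tmp) / 'a.txt').write_text('alpha', encoding='utf-8')
    (Path(tmp) / 'sub' / 'b.txt').write_text('beta', encoding='utf-8')
    docs = load_all(tmp)
```

The real `FolderLoader` would hand each file to its delegated loader (e.g. `PdfLoader`) rather than reading it as text.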
Pipeline Example¶
```python
from amsdal_ml.ml_ingesting.pipeline import DefaultIngestionPipeline
from amsdal_ml.ml_ingesting.loaders.pdf_loader import PdfLoader
from amsdal_ml.ml_ingesting.processors.cleaner import TextCleaner
from amsdal_ml.ml_ingesting.splitters.token_splitter import TokenSplitter
from amsdal_ml.ml_ingesting.embedders.openai_embedder import OpenAIEmbedder
from amsdal_ml.ml_ingesting.stores.embedding_data import EmbeddingDataStore
from amsdal_ml.ml_ingesting.types import IngestionSource

pipeline = DefaultIngestionPipeline(
    loader=PdfLoader(),
    cleaner=TextCleaner(),
    splitter=TokenSplitter(max_tokens=800, overlap_tokens=100),
    embedder=OpenAIEmbedder(),
    store=EmbeddingDataStore(),
)

# Ingest a PDF file
source = IngestionSource(
    object_class='Contract',
    object_id='1',
    tags=['contracts', 'legal'],
    metadata={'source': 'contract.pdf'},
)

with open('contract.pdf', 'rb') as f:
    await pipeline.arun(f, filename='contract.pdf', tags=['contracts', 'legal'], source=source)
```
Pipeline Stages¶
- Loader — reads the file and produces a `LoadedDocument` (a list of `LoadedPage` entries with text and metadata)
- Cleaner — normalizes and cleans text (whitespace, encoding issues)
- Splitter — splits text into `TextChunk` objects with configurable token limits and overlap
- Embedder — generates vector embeddings for each chunk via OpenAI
- Store — saves embeddings as `EmbeddingModel` records in the database
Tags and metadata are merged at each stage, so you can tag at the source level and at the chunk level.
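"Merged" here plausibly means a union of tags and a dict-style override for metadata. An illustrative sketch of how source-level and chunk-level values might combine (the function names and exact merge rules are our assumptions, not the library's documented behavior):

```python
def merge_tags(source_tags: list[str], chunk_tags: list[str]) -> list[str]:
    """Union of both tag lists, preserving first-seen order."""
    return list(dict.fromkeys(source_tags + chunk_tags))

def merge_metadata(source_meta: dict, chunk_meta: dict) -> dict:
    """Chunk-level keys override source-level keys."""
    return {**source_meta, **chunk_meta}

tags = merge_tags(['contracts', 'legal'], ['legal', 'page-3'])
meta = merge_metadata({'source': 'contract.pdf'}, {'page': 3})
```

Under these rules, a chunk inherits everything set on its `IngestionSource` and can add or override values of its own.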
Custom Ingesting¶
Subclass MLIngesting for a custom implementation:
```python
from collections.abc import Sequence
from typing import Any

from amsdal_ml.ml_ingesting.ingesting import MLIngesting


class MyIngesting(MLIngesting):
    def generate_text(self, instance: Any) -> str:
        # Custom text generation from the object
        ...

    async def agenerate_text(self, instance: Any) -> str:
        ...

    def get_tags(self) -> list[str]:
        return ['my-tag']

    async def aget_tags(self) -> list[str]:
        return ['my-tag']

    def generate_embeddings(self, instance: Any, embed_func=None) -> list[EmbeddingData]:
        # Custom embedding generation
        ...

    async def agenerate_embeddings(self, instance: Any, embed_func=None) -> list[EmbeddingData]:
        ...

    def save(self, records: Sequence[EmbeddingData], instance: Any) -> list[EmbeddingData]:
        # Custom save logic
        ...

    async def asave(self, records: Sequence[EmbeddingData], instance: Any) -> list[EmbeddingData]:
        ...
```
Register via config:
```shell
export ML_INGESTING_CLASS='myapp.ingesting.MyIngesting'
```