Ingestion

Ingestion is the process of converting your data into vector embeddings and storing them for semantic search. amsdal_ml provides two approaches: model ingestion (for AMSDAL model objects) and document ingestion (for files such as PDFs and plain text).

Model Ingestion

Indexing a Single Object

Use OpenAIIngesting to embed individual AMSDAL model objects:

from amsdal_ml.ml_ingesting.openai_ingesting import OpenAIIngesting

ingesting = OpenAIIngesting(tags=['customers'])

# Async (recommended)
records = await ingesting.agenerate_embeddings(my_object)
await ingesting.asave(records, my_object)

# Sync
records = ingesting.generate_embeddings(my_object)
ingesting.save(records, my_object)

The ingestion process walks the object's fields recursively, generates structured text, splits it into chunks, embeds each chunk via OpenAI, and stores the results as EmbeddingModel records.
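
The field walk can be pictured with a small standalone sketch. The helper below is illustrative only, not the library's internal implementation: it recursively flattens nested fields into "path: value" lines of the kind a text generator might produce before chunking.

```python
from dataclasses import dataclass, field
from typing import Any


def object_to_structured_text(obj: Any, prefix: str = "") -> list[str]:
    """Recursively walk an object's fields and emit 'path: value' lines."""
    lines: list[str] = []
    if hasattr(obj, "__dict__") and obj.__dict__:
        for name, value in vars(obj).items():
            lines.extend(object_to_structured_text(value, f"{prefix}{name}."))
    elif isinstance(obj, dict):
        for key, value in obj.items():
            lines.extend(object_to_structured_text(value, f"{prefix}{key}."))
    elif isinstance(obj, (list, tuple)):
        for i, value in enumerate(obj):
            lines.extend(object_to_structured_text(value, f"{prefix}{i}."))
    else:
        lines.append(f"{prefix.rstrip('.')}: {obj}")
    return lines


@dataclass
class Address:
    city: str = "Berlin"

@dataclass
class Customer:
    name: str = "Ada"
    address: Address = field(default_factory=Address)

text = "\n".join(object_to_structured_text(Customer()))
print(text)
# name: Ada
# address.city: Berlin
```

The resulting text is what gets split into chunks and embedded.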

Batch Ingestion with ModelIngester

ModelIngester processes collections of model objects — iterables, async iterables, or QuerySets:

from amsdal_ml.ml_ingesting.model_ingester import ModelIngester
from amsdal_ml.ml_ingesting.pipeline import DefaultIngestionPipeline
from amsdal_ml.ml_ingesting.loaders.pdf_loader import PdfLoader
from amsdal_ml.ml_ingesting.processors.cleaner import TextCleaner
from amsdal_ml.ml_ingesting.splitters.token_splitter import TokenSplitter
from amsdal_ml.ml_ingesting.embedders.openai_embedder import OpenAIEmbedder
from amsdal_ml.ml_ingesting.stores.embedding_data import EmbeddingDataStore

pipeline = DefaultIngestionPipeline(
    loader=PdfLoader(),
    cleaner=TextCleaner(),
    splitter=TokenSplitter(max_tokens=800, overlap_tokens=100),
    embedder=OpenAIEmbedder(),
    store=EmbeddingDataStore(),
)

ingester = ModelIngester(pipeline=pipeline)

# From a QuerySet
await ingester.aingest(MyModel.objects.all(), tags=['bulk-import'])

# From a list
await ingester.aingest(my_objects, tags=['manual'])

ModelIngester automatically:

  • Detects file fields (File, bytes, strings) on each object
  • Passes file content through the document ingestion pipeline
  • Logs progress and skips objects whose ingestion fails, recording the error
  • Links each embedding back to the source object
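
The file-field detection step can be sketched conceptually like this (a simplified stand-in, not the library's actual heuristics, which also recognize its File type):

```python
import io
from typing import Any


def find_file_fields(obj: Any) -> dict[str, Any]:
    """Return fields whose values look like file content: raw bytes,
    file-like objects, or strings that look like document paths."""
    candidates: dict[str, Any] = {}
    for name, value in vars(obj).items():
        if isinstance(value, (bytes, bytearray)):
            candidates[name] = value
        elif hasattr(value, "read"):          # file-like object
            candidates[name] = value
        elif isinstance(value, str) and value.lower().endswith((".pdf", ".txt")):
            candidates[name] = value          # path-like string
    return candidates


class Record:
    def __init__(self):
        self.title = "Q3 report"              # plain data, ignored
        self.attachment = b"%PDF-1.7 ..."     # raw bytes, picked up
        self.scan = io.BytesIO(b"raw bytes")  # file-like, picked up

fields = find_file_fields(Record())
print(sorted(fields))  # ['attachment', 'scan']
```

Fields detected this way are routed through the document ingestion pipeline instead of the plain text generator.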

Tags

Tags are string labels attached to embeddings. They're useful for filtering during search:

# Tag by data source
ingesting = OpenAIIngesting(tags=['customers', 'active'])
records = await ingesting.agenerate_embeddings(customer)
await ingesting.asave(records, customer)

# Later, search only within tagged embeddings
results = await retriever.asimilarity_search(
    'find VIP customers',
    include_tags=['customers'],
)
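
The effect of include_tags can be pictured as an intersection check over stored records. This is purely illustrative; the real filtering happens inside the retriever and its storage backend:

```python
from dataclasses import dataclass

@dataclass
class StoredEmbedding:
    text: str
    tags: list[str]

records = [
    StoredEmbedding("Alice, VIP since 2020", ["customers", "active"]),
    StoredEmbedding("Invoice #42", ["invoices"]),
]

def filter_by_tags(records: list[StoredEmbedding], include_tags: list[str]) -> list[StoredEmbedding]:
    # Keep records that share at least one tag with include_tags.
    wanted = set(include_tags)
    return [r for r in records if wanted & set(r.tags)]

hits = filter_by_tags(records, ["customers"])
print([r.text for r in hits])  # ['Alice, VIP since 2020']
```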

Document Ingestion Pipeline

For files (PDFs, text documents), use DefaultIngestionPipeline. The pipeline chains these stages:

Loader → Cleaner → Splitter → Embedder → Store
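
The chain can be sketched with stub stages to show how each stage consumes the previous stage's output. The stage order mirrors the pipeline above; the bodies are deliberately fake placeholders, not the real implementations:

```python
class StubPipeline:
    """Toy pipeline: each stage consumes the previous stage's output."""

    def run(self, raw: bytes) -> list[dict]:
        text = raw.decode("utf-8")                  # Loader: file -> text
        text = " ".join(text.split())               # Cleaner: normalize whitespace
        chunks = [text[i:i + 20]                    # Splitter: fixed-size chunks
                  for i in range(0, len(text), 20)]
        embedded = [{"chunk": c, "vector": [float(len(c))]}  # Embedder (fake vectors)
                    for c in chunks]
        store: list[dict] = []
        store.extend(embedded)                      # Store: persist records
        return store

records = StubPipeline().run(b"Some   example document text")
print(len(records))  # 2
```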

Built-in Loaders

  • PdfLoader: Extracts text from PDF files using pypdf. Handles noisy pages, spaced characters, and page metadata.
  • TextLoader: Reads plain text files (configurable encoding).
  • FolderLoader: Standalone utility that recursively loads files from a directory using a delegated loader. Not a Loader subclass; use it via load_all()/aload_all().
  • PdfFolderLoader: A FolderLoader pre-configured with PdfLoader.

Pipeline Example

from amsdal_ml.ml_ingesting.pipeline import DefaultIngestionPipeline
from amsdal_ml.ml_ingesting.loaders.pdf_loader import PdfLoader
from amsdal_ml.ml_ingesting.processors.cleaner import TextCleaner
from amsdal_ml.ml_ingesting.splitters.token_splitter import TokenSplitter
from amsdal_ml.ml_ingesting.embedders.openai_embedder import OpenAIEmbedder
from amsdal_ml.ml_ingesting.stores.embedding_data import EmbeddingDataStore
from amsdal_ml.ml_ingesting.types import IngestionSource

pipeline = DefaultIngestionPipeline(
    loader=PdfLoader(),
    cleaner=TextCleaner(),
    splitter=TokenSplitter(max_tokens=800, overlap_tokens=100),
    embedder=OpenAIEmbedder(),
    store=EmbeddingDataStore(),
)

# Ingest a PDF file
source = IngestionSource(
    object_class='Contract',
    object_id='1',
    tags=['contracts', 'legal'],
    metadata={'source': 'contract.pdf'},
)
with open('contract.pdf', 'rb') as f:
    await pipeline.arun(f, filename='contract.pdf', tags=['contracts', 'legal'], source=source)

Pipeline Stages

  1. Loader — reads the file and produces a LoadedDocument (list of LoadedPage with text and metadata)
  2. Cleaner — normalizes and cleans text (whitespace, encoding issues)
  3. Splitter — splits text into TextChunk objects with configurable token limits and overlap
  4. Embedder — generates vector embeddings for each chunk via OpenAI
  5. Store — saves embeddings as EmbeddingModel records in the database
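
The splitting step (stage 3) can be sketched with words as stand-in "tokens". TokenSplitter counts real model tokens, but the sliding-window-with-overlap idea is the same:

```python
def split_with_overlap(text: str, max_tokens: int, overlap_tokens: int) -> list[str]:
    """Slide a window of max_tokens words, keeping overlap_tokens
    words of context between consecutive chunks."""
    words = text.split()
    step = max_tokens - overlap_tokens
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_tokens]))
        if start + max_tokens >= len(words):
            break
    return chunks

chunks = split_with_overlap(
    "one two three four five six seven eight nine ten",
    max_tokens=4, overlap_tokens=1,
)
print(chunks)
# ['one two three four', 'four five six seven', 'seven eight nine ten']
```

Note how each chunk repeats the last word of the previous one; overlap preserves context across chunk boundaries so that a sentence cut in half is still searchable.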

Tags and metadata are merged at each stage, so you can tag at the source level and at the chunk level.
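
The merging behavior can be pictured as follows. This is an illustrative sketch; the exact precedence rules are up to the pipeline:

```python
def merge_tags_and_metadata(source_tags, chunk_tags, source_meta, chunk_meta):
    # Tags: order-preserving union; metadata: chunk-level keys win.
    tags = list(dict.fromkeys([*source_tags, *chunk_tags]))
    metadata = {**source_meta, **chunk_meta}
    return tags, metadata

tags, metadata = merge_tags_and_metadata(
    ["contracts", "legal"], ["legal", "page-3"],
    {"source": "contract.pdf"}, {"page": 3},
)
print(tags)      # ['contracts', 'legal', 'page-3']
print(metadata)  # {'source': 'contract.pdf', 'page': 3}
```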

Custom Ingesting

Subclass MLIngesting for a custom implementation:

from collections.abc import Sequence
from typing import Any

from amsdal_ml.ml_ingesting.ingesting import MLIngesting

class MyIngesting(MLIngesting):
    def generate_text(self, instance: Any) -> str:
        # Custom text generation from object
        ...

    async def agenerate_text(self, instance: Any) -> str:
        ...

    def get_tags(self) -> list[str]:
        return ['my-tag']

    async def aget_tags(self) -> list[str]:
        return ['my-tag']

    def generate_embeddings(self, instance: Any, embed_func=None) -> list[EmbeddingData]:
        # Custom embedding generation
        ...

    async def agenerate_embeddings(self, instance: Any, embed_func=None) -> list[EmbeddingData]:
        ...

    def save(self, records: Sequence[EmbeddingData], instance: Any) -> list[EmbeddingData]:
        # Custom save logic
        ...

    async def asave(self, records: Sequence[EmbeddingData], instance: Any) -> list[EmbeddingData]:
        ...

Register via config:

export ML_INGESTING_CLASS='myapp.ingesting.MyIngesting'