PostgreSQL + Qdrant en la ingestión RAG: el cartero que sincroniza dos mundos

TL;DR — En un sistema RAG de producción, PostgreSQL guarda la verdad oficial de los documentos y Qdrant guarda los vectores para búsqueda. Mantenerlos sincronizados no es trivial: si borras un documento de Postgres y no invalidas sus chunks en Qdrant, el sistema devuelve respuestas de documentos fantasma. Hay dos patrones para evitarlo: el outbox pattern (transacción atómica + worker asíncrono, at-least-once) y CDC con Debezium (lectura directa del WAL de Postgres, baja latencia, mayor complejidad). Este artículo explica cuándo usar cada uno, cómo orquestarlos como microservicios y qué números esperar con bge-m3 en hardware on-premise.


La analogía del cartero y el registro civil

Imagina una ciudad con dos oficinas complementarias.

La primera es el Registro Civil: guarda el censo oficial. Cada vez que nace alguien, muere o cambia de domicilio, el Registro es el primero en saberlo. Es lento, estructurado, transaccional. Si el Registro dice que alguien existe, existe. Si dice que murió, está muerto. PostgreSQL es el Registro Civil de tus documentos.

La segunda es la libreta del cartero: una copia optimizada para encontrar a cualquier vecino en segundos, organizada por zonas, nombres fonéticos y rutas habituales. El cartero no puede actualizar el Registro, pero sí buscar a velocidades que el Registro jamás alcanzaría. Qdrant es la libreta del cartero.

El problema es la sincronización. Si el Registro anota un fallecimiento pero nadie avisa al cartero, este seguirá intentando entregar cartas a una dirección que ya no existe. En RAG, eso se traduce en chunks indexados de documentos que ya fueron eliminados, editados o reemplazados — documentos fantasma que contaminan los resultados.

¿Cómo avisa el Registro al cartero?

  • Outbox pattern: cada vez que el Registro actualiza su libro mayor, apunta el cambio en una hoja de salida (outbox). Un empleado mensajero lee esa hoja periódicamente y actualiza la libreta del cartero. Garantizado, asíncrono, tolerante a fallos.
  • CDC con Debezium: el cartero tiene un teléfono directo conectado al Registro. Cada vez que el escribano apunta algo nuevo, el teléfono suena y el cartero actualiza su libreta en tiempo casi real.

El problema del consistency gap

En una arquitectura RAG naive, el flujo es:

  1. El usuario sube un documento → se inserta en Postgres con metadatos.
  2. Un worker lo trocea en chunks, genera embeddings y hace upsert en Qdrant.
  3. El retrieval usa Qdrant para encontrar chunks relevantes y Postgres para hidratar metadatos.

Hasta aquí todo bien. El problema aparece en las actualizaciones y borrados:

  • El usuario edita un documento → Postgres actualiza el registro, pero los vectores de los chunks viejos siguen en Qdrant. El retrieval devuelve contexto obsoleto.
  • El usuario borra un documento → Postgres elimina la fila, pero los chunks permanecen en Qdrant. El retrieval devuelve chunks de un documento que ya no debería existir.
  • El sistema de permisos revoca el acceso de un tenant → Qdrant no tiene forma de saberlo si no hay sincronización explícita.

Esto no es un problema teórico. En corpus vivos (wikis corporativas, bases de conocimiento actualizadas diariamente), el consistency gap acumula ruido progresivamente. Un estudio interno en pipelines de producción muestra que sin reconciliación activa, el 3-8% de los chunks indexados corresponde a documentos que ya no existen en la fuente de verdad tras 30 días de operación.

La solución no es “reindexar todo cada noche”. Con 10M chunks y un modelo de embedding no trivial, eso cuesta horas de cómputo y provoca ventanas de indisponibilidad. La solución es propagación de cambios con garantías.


Outbox pattern: la hoja de salida

Mecanismo

El outbox pattern resuelve el problema de “escribir a dos sistemas en la misma operación” sin necesidad de transacciones distribuidas (que son caras y frágiles).

La idea es simple: PostgreSQL es el coordinador único. Cuando el microservicio de ingestión procesa un documento, realiza dos escrituras en la misma transacción local:

  1. Inserta o actualiza el documento en la tabla documents.
  2. Inserta un evento en la tabla outbox_events.

Si la transacción falla, ambas escrituras se deshacen. Si tiene éxito, ambas están comprometidas atomicamente. No hay estado intermedio inconsistente.

-- Tablas relevantes
CREATE TABLE documents (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id   TEXT NOT NULL,
    title       TEXT NOT NULL,
    content     TEXT NOT NULL,
    checksum    TEXT NOT NULL,
    updated_at  TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE outbox_events (
    id           BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,        -- document id
    event_type   TEXT NOT NULL,        -- 'document.created' | 'document.updated' | 'document.deleted'
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ DEFAULT now(),
    processed_at TIMESTAMPTZ           -- NULL = pendiente
);

-- Ejemplo de inserción atómica
BEGIN;
  INSERT INTO documents (tenant_id, title, content, checksum)
       VALUES ($1, $2, $3, $4)
  RETURNING id INTO _doc_id;

  INSERT INTO outbox_events (aggregate_id, event_type, payload)
       VALUES (_doc_id, 'document.created', jsonb_build_object(
                   'tenant_id', $1,
                   'title',     $2,
                   'checksum',  $4
               ));
COMMIT;

El worker de outbox

Un proceso separado (el outbox worker) hace polling de outbox_events donde processed_at IS NULL, procesa cada evento (chunking, embedding, upsert en Qdrant) y marca la fila como procesada:

UPDATE outbox_events
   SET processed_at = now()
 WHERE id = $1;

Garantía: at-least-once. Si el worker falla entre el upsert en Qdrant y el UPDATE, el evento se reprocesará. Qdrant tolera upserts idempotentes (misma id de punto = sobreescritura), así que el reprocesado no genera duplicados.

Latencia: depende del intervalo de polling. Con polling cada 500ms, la latencia p50 es ~250ms; p99, ~500ms. Aceptable para la mayoría de casos RAG donde el usuario no espera ver indexado un documento en menos de un segundo.


CDC con Debezium: el teléfono directo

Mecanismo

El Change Data Capture (CDC) lee el Write-Ahead Log (WAL) de PostgreSQL directamente. Postgres escribe cada cambio en el WAL antes de aplicarlo a las tablas — es el mecanismo que usa para replicación y recuperación. Debezium se suscribe a un slot de replicación lógica y convierte esos eventos en mensajes estructurados.

-- Habilitar replicación lógica en postgresql.conf
-- wal_level = logical

-- Crear slot de replicación para Debezium
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');

El flujo completo:

Postgres WAL → Debezium connector → Kafka/NATS → Indexer consumer → Qdrant

Debezium emite eventos con la estructura before/after del registro:

{
  "op": "d",
  "before": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "tenant_id": "acme",
    "checksum": "sha256:abc123"
  },
  "after": null
}

Con "op": "d" (delete), el consumer sabe que debe borrar todos los puntos en Qdrant cuyo payload contenga ese document_id.

# Consumer: borrado por filtro de payload
qdrant_client.delete(
    collection_name="corpus",
    points_selector=FilterSelector(
        filter=Filter(
            must=[
                FieldCondition(
                    key="document_id",
                    match=MatchValue(value=event["before"]["id"])
                )
            ]
        )
    )
)

Ventajas e inconvenientes

CDC elimina el polling y reduce la latencia a decenas de milisegundos (el tiempo de propagación del WAL más el procesamiento del consumer). Pero añade complejidad operacional: necesitas gestionar el slot de replicación (los slots no consumidos retienen WAL indefinidamente, lo que puede llenar el disco), el broker de mensajes y el estado del consumer offset.


Comparativa: outbox vs CDC

CriterioOutbox patternCDC con Debezium
Latencia típica250ms – 2s20ms – 200ms
Garantía de entregaAt-least-onceAt-least-once
Complejidad operacionalBaja (solo Postgres)Alta (Debezium + broker)
Riesgo de retención WALNingunoAlto si el slot se atasca
Idempotencia requeridaSí (en indexer)Sí (en consumer)
Soporte multi-tablaManualAutomático (cualquier tabla)
BackpressureNatural (polling)Requiere diseño explícito
Cuándo elegirCorpus < 100k docs/día, equipo pequeñoCorpus > 1M docs/día, baja latencia crítica

Regla práctica: empieza con outbox. Migra a CDC cuando el volumen de cambios supere los ~50k eventos/hora o cuando la latencia de segundos sea inaceptable para el caso de uso (e.g., indexación de noticias en tiempo real).


Arquitectura de microservicios

El pipeline de ingestión se compone de tres microservicios con responsabilidades bien separadas:

Arquitectura de microservicios de ingestión RAGDiagrama que muestra el flujo desde Postgres a través de Ingestor, Indexer y Reconciler hasta QdrantPostgreSQLdocumentsoutbox_eventsIngestorchunkingembeddingIndexeroutbox workerQdrant upsertReconcilerdiff periódicoQdrantcollectioncorpus4×H100 SXMbge-m3~2000 chunks/sKafka/NATS(CDC path)Flujo outbox:síncronoFlujo CDC:asyncReconciler:periódico

Microservicio 1: Ingestor

Responsabilidades: recibir documentos, trocearlos en chunks y solicitar embeddings. No escribe en Qdrant directamente.

# ingestor/main.py (simplificado)
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,       # tokens, no caracteres
    chunk_overlap=64,
    length_function=token_count,
)

def ingest_document(doc: Document, db: Session) -> None:
    chunks = splitter.split_text(doc.content)
    with db.begin():
        db.execute(
            "UPDATE documents SET checksum=$1 WHERE id=$2",
            [doc.checksum, doc.id]
        )
        db.execute(
            """INSERT INTO outbox_events (aggregate_id, event_type, payload)
               VALUES ($1, 'document.updated', $2)""",
            [doc.id, {"chunks": chunks, "tenant_id": doc.tenant_id,
                      "model": "bge-m3", "model_version": "1.0.0"}]
        )

Microservicio 2: Indexer

Lee la outbox, genera embeddings llamando al servidor de inferencia (vLLM o TEI) y hace upsert en Qdrant.

# indexer/worker.py
import asyncio
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, VectorParams, Distance

qdrant = QdrantClient(host="qdrant-service", port=6333)

async def process_event(event: dict) -> None:
    payload = event["payload"]
    chunks   = payload["chunks"]
    doc_id   = str(event["aggregate_id"])

    # Embedding batch en TEI (Text Embeddings Inference)
    embeddings = await embed_batch(chunks, model="bge-m3")

    points = [
        PointStruct(
            id=f"{doc_id}_{i}",
            vector=emb,
            payload={
                "document_id": doc_id,
                "tenant_id":   payload["tenant_id"],
                "chunk_index": i,
                "text":        chunks[i],
                "model_version": payload["model_version"],
            }
        )
        for i, emb in enumerate(embeddings)
    ]

    # Si es actualización, borrar chunks anteriores primero
    if event["event_type"] in ("document.updated", "document.deleted"):
        qdrant.delete(
            collection_name="corpus",
            points_selector=filter_by_doc_id(doc_id)
        )

    if event["event_type"] != "document.deleted":
        qdrant.upsert(collection_name="corpus", points=points)

Microservicio 3: Reconciler

El reconciler es la red de seguridad. Periódicamente (por ejemplo, cada hora) compara el conjunto de document_id en Postgres con el conjunto de document_id en Qdrant. Los IDs presentes en Qdrant pero ausentes en Postgres son fantasmas: se borran.

# reconciler/diff.py
async def reconcile(tenant_id: str) -> int:
    pg_ids    = set(await fetch_all_doc_ids(tenant_id))
    qdrant_ids = set(await scroll_all_doc_ids(tenant_id))  # scroll paginado

    orphans = qdrant_ids - pg_ids
    if orphans:
        logger.warning("Orphan chunks for %d documents", len(orphans))
        for doc_id in orphans:
            qdrant.delete("corpus", filter_by_doc_id(doc_id))

    return len(orphans)

Matemáticas de throughput e ingestión

Throughput de embedding

El modelo bge-m3 (1024 dimensiones, soporte denso + sparse + colbert) en un nodo con 4×H100 SXM (320 GB NVLink) ejecutado vía vLLM o HuggingFace TEI alcanza aproximadamente 2.000 chunks/segundo con batch size = 256 y secuencias de 512 tokens.

$$\text{throughput} = 4 \times 500 \text{ chunks/s/GPU} = 2{,}000 \text{ chunks/s}$$

La cifra de 500 chunks/s por GPU proviene de benchmarks públicos de TEI con bge-m3 en H100 SXM5, batch=256, seq_len=512 1.

Tiempo de re-ingestión total

Para un corpus de 10M chunks:

$$t = \frac{10{,}000{,}000 \text{ chunks}}{2{,}000 \text{ chunks/s}} = 5{,}000 \text{ s} \approx 83 \text{ minutos}$$

Esto es el tiempo puro de embedding. Añadiendo latencia de escritura en Qdrant (~0,5ms por upsert batch de 100 puntos):

$$t_{\text{qdrant}} = \frac{10{,}000{,}000}{100} \times 0.5\text{ ms} = 50{,}000 \text{ ms} = 50 \text{ s}$$

Total estimado para una re-ingestión completa: ~85-90 minutos en un nodo 4×H100.

Coste de almacenamiento en Qdrant

Cada vector de bge-m3 tiene 1024 dimensiones en float32 (4 bytes):

$$\text{tamaño por vector} = 1{,}024 \times 4 \text{ B} = 4{,}096 \text{ B} = 4 \text{ KB}$$

Para 10M chunks (solo vectores densos):

$$\text{total vectores} = 10^7 \times 4{,}096 \text{ B} = 40.96 \text{ GB}$$

Añadiendo payload JSON (estimado ~500 bytes/chunk):

$$\text{payload} = 10^7 \times 500 \text{ B} = 5 \text{ GB}$$

Índice HNSW (aproximadamente 1.2× el tamaño del vector para $m=16$):

$$\text{HNSW} \approx 40.96 \text{ GB} \times 1.2 = 49.15 \text{ GB}$$

Total estimado en disco: ~95 GB para 10M chunks con bge-m3 denso.

Con scalar quantization (int8), el tamaño del vector se reduce 4×:

$$\text{con quantización int8} \approx \frac{40.96}{4} + 5 + \frac{49.15}{4} \approx 27.5 \text{ GB}$$

ConfiguraciónVectoresHNSWPayloadTotal
float32, sin quantización40.96 GB49.15 GB5 GB~95 GB
int8 scalar quantization10.24 GB12.29 GB5 GB~28 GB
binary quantization1.28 GB1.54 GB5 GB~8 GB

La binary quantization pierde precisión de recall (~2-5% en NDCG@10), pero permite alojar corpus mucho mayores en RAM. Para producción con recall crítico, int8 es el punto de equilibrio habitual.


Hardware on-premise recomendado

Para un pipeline de ingestión continua en producción:

Nodo de embedding: 4×H100 SXM (320 GB, NVLink), 2× CPU 64-core (EPYC 9654), 1 TB RAM DDR5, 100 GbE. Ejecuta vLLM o TEI sirviendo bge-m3. Throughput sostenido: ~2.000 chunks/s con pipeline batch asíncrono.

Nodo Qdrant: CPU 32-core, 256 GB RAM (para mantener el índice HNSW en memoria con 10M chunks sin quantización), NVMe 2 TB (escritura de snapshots y WAL de Qdrant). Qdrant recomienda que el índice HNSW quepa en RAM para latencia p99 < 5ms.

Nodo PostgreSQL: CPU 16-core, 128 GB RAM, NVMe 4 TB para WAL (especialmente relevante si usas CDC con slot de replicación lógica; el slot retiene WAL hasta que Debezium lo consume).

Broker (si CDC): Kafka 3-broker con 500 GB NVMe por nodo, o NATS JetStream con 3 nodos para cargas más modestas.


Manifests de Kubernetes

Deployment del Indexer

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-indexer
  namespace: rag-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-indexer
  template:
    metadata:
      labels:
        app: rag-indexer
    spec:
      containers:
      - name: indexer
        image: registry.example.com/rag-indexer:1.0.0
        env:
        - name: POSTGRES_DSN
          valueFrom:
            secretKeyRef:
              name: pg-credentials
              key: dsn
        - name: QDRANT_HOST
          value: "qdrant-service.qdrant.svc.cluster.local"
        - name: EMBEDDING_ENDPOINT
          value: "http://tei-service.embeddings.svc.cluster.local:8080"
        - name: POLL_INTERVAL_MS
          value: "500"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2"

CronJob del Reconciler

apiVersion: batch/v1
kind: CronJob
metadata:
  name: rag-reconciler
  namespace: rag-pipeline
spec:
  schedule: "0 * * * *"   # cada hora
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: reconciler
            image: registry.example.com/rag-reconciler:1.0.0
            env:
            - name: POSTGRES_DSN
              valueFrom:
                secretKeyRef:
                  name: pg-credentials
                  key: dsn
            - name: QDRANT_HOST
              value: "qdrant-service.qdrant.svc.cluster.local"
            - name: SCROLL_PAGE_SIZE
              value: "1000"
            resources:
              requests:
                memory: "256Mi"
                cpu: "250m"
              limits:
                memory: "1Gi"
                cpu: "1"

Gotchas de producción

Reindexing al cambiar de modelo de embedding

Este es el problema más doloroso. Si pasas de bge-m3 a nomic-embed-text-v2, los vectores son incompatibles: están en espacios de embedding distintos y las distancias coseno entre ellos no tienen significado.

La solución es dual-index aliasing:

  1. Crea una colección nueva en Qdrant: corpus_v2.
  2. Re-embeds el corpus completo con el modelo nuevo y carga corpus_v2.
  3. Cuando la colección nueva está completa y validada (test de recall), cambia el alias de corpus_prod de corpus_v1 a corpus_v2.
  4. Borra corpus_v1 cuando el tráfico haya migrado.

Durante la migración, los dos índices coexisten. El retriever usa el alias, no el nombre directo de la colección.

Versioning del índice

Guarda model_version en el payload de cada punto en Qdrant. Esto permite:

  • Filtrar por versión durante el retrieval (útil en A/B testing de modelos).
  • El reconciler puede detectar puntos con versión antigua y reprocesarlos selectivamente.
  • Auditoría: saber con qué modelo se generó cada embedding.
# Filtro por versión de modelo en retrieval
results = qdrant.search(
    collection_name="corpus",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[
            FieldCondition(key="model_version", match=MatchValue(value="1.0.0")),
            FieldCondition(key="tenant_id", match=MatchValue(value=tenant_id)),
        ]
    ),
    limit=10
)

Namespace por tenant (multi-tenancy)

Hay dos estrategias en Qdrant:

EstrategiaProsContras
Colección por tenantAislamiento total, sin filtro extraN colecciones = N índices HNSW en RAM
Payload filter por tenantUna sola colección, menos RAMFiltro añade ~10-15% de latencia en búsqueda

Para menos de 100 tenants con corpus grandes (> 1M chunks/tenant), usa colección por tenant. Para cientos o miles de tenants con corpus pequeños, usa payload filter con tenant_id indexado:

qdrant.create_payload_index(
    collection_name="corpus",
    field_name="tenant_id",
    field_schema=PayloadSchemaType.KEYWORD
)

Lo que no hemos cubierto

  • Streaming corpus updates con CDC en near-real-time: invalidación selectiva de chunks cuando solo una sección de un documento cambia (chunking incremental basado en diff de contenido, no re-chunking completo).
  • Multi-tenant corpus isolation con ACLs por chunk: ir más allá del filtro por tenant_id para permisos a nivel de grupo, rol o incluso documento individual, aplicados en tiempo de retrieval.
  • Federated corpus: corpora distribuidos en silos cross-border donde las regulaciones (GDPR, CCPA) impiden centralizar los embeddings; patrones de federated search sin mover datos.
  • Reindexing incremental con zero-downtime usando dual-index aliasing: el protocolo completo de migración de modelo con rollback, test de regresión de recall y traffic splitting progresivo.

Ver también


Referencias


  1. HuggingFace Text Embeddings Inference — benchmarks oficiales con modelos de la familia bge en hardware A100/H100. https://github.com/huggingface/text-embeddings-inference ↩︎