2026-04-21 · 9 min read

Tracing Haystack Pipelines: Observability for RAG and Document AI

Haystack (by deepset) builds AI pipelines from composable components — Embedder, Retriever, PromptBuilder, Generator. When a retriever returns empty results, an embedder cold-starts slowly, or prompt length creep degrades generation quality, you need per-component trace visibility to diagnose it. Here's how to instrument Haystack pipelines with Nexus.

What Haystack is

Haystack (by deepset) is a Python framework for building production-ready AI pipelines. Unlike agent frameworks that focus on autonomous decision-making, Haystack organizes work as a directed Pipeline of Components — each component handles a discrete step like document retrieval, embedding, re-ranking, or text generation. This makes Haystack particularly well-suited for RAG (retrieval-augmented generation) and document AI workloads where the data flow is deterministic and repeatable.

A typical Haystack RAG pipeline looks like this:

  1. Embedder — converts the user query into a vector embedding
  2. Retriever — fetches the top-k documents from a vector store
  3. PromptBuilder — assembles a prompt from the retrieved documents and query
  4. Generator — calls an LLM to produce the final answer

Each component runs synchronously in sequence, passing its output as the named input of the next. That determinism is a feature — but it also means pipeline failures are silent by default: if the retriever returns zero documents, the generator sees an empty context and quietly produces a hallucinated answer.
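A cheap defense against that silent failure is a guard between retrieval and generation. The helper below is a hypothetical sketch, not part of Haystack, that refuses to generate when the retrieved context is empty:

```python
def guard_empty_context(docs, fallback="I couldn't find relevant documents for that question."):
    """Return (should_generate, fallback_answer).

    If retrieval came back empty, skip generation and return a canned
    answer instead of letting the LLM hallucinate from no context.
    """
    if not docs:
        return False, fallback
    return True, None
```

Calling the generator only when should_generate is true turns the silent failure into an explicit, observable branch.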

Observability blind spots in Haystack pipelines

Without instrumentation, three failure modes are invisible:

  1. Empty retrieval: the retriever returns zero documents and the generator quietly hallucinates an ungrounded answer
  2. Embedder cold starts: the first queries after a deploy pay model-load latency that, from the outside, looks like slow generation
  3. Prompt length creep: retrieved context grows over time, inflating prompts until generation quality degrades

Mapping Haystack components to Nexus spans

The cleanest tracing strategy for Haystack is to wrap the entire pipeline.run() call in a top-level trace and add one span per component with the component name, latency, and key output metadata.

Install the dependencies first:

pip install haystack-ai nexus-sdk

Here is a complete instrumented RAG pipeline:

import os
import time
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# Build pipeline
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="Haystack is a framework for building AI pipelines."),
    Document(content="Nexus provides trace observability for AI systems."),
    Document(content="RAG pipelines combine retrieval with generation."),
])

embedder = SentenceTransformersTextEmbedder()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(template="""
Given the context below, answer the question.
Context: {% for doc in documents %}{{ doc.content }}{% endfor %}
Question: {{ query }}
Answer:""")
generator = OpenAIGenerator(model="gpt-4o-mini")

pipeline = Pipeline()
pipeline.add_component("embedder", embedder)
pipeline.add_component("retriever", retriever)
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")

pipeline.warm_up()

def run_with_tracing(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id, "query": query[:300]},
    })
    trace_id = trace["trace_id"]
    pipeline_start = time.time()

    try:
        # Embedder span
        t0 = time.time()
        embedding_result = embedder.run(text=query)
        embed_ms = int((time.time() - t0) * 1000)
        nexus.add_span(trace_id, {
            "name": "component:embedder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": embed_ms,
            "metadata": {"model": "sentence-transformers", "embed_dim": len(embedding_result["embedding"])},
        })

        # Retriever span
        t0 = time.time()
        retrieval_result = retriever.run(query_embedding=embedding_result["embedding"])
        retrieve_ms = int((time.time() - t0) * 1000)
        docs = retrieval_result["documents"]
        nexus.add_span(trace_id, {
            "name": "component:retriever",
            "started_at": nexus.now(),
            "status": "success" if docs else "warning",
            "latency_ms": retrieve_ms,
            "metadata": {
                "retrieved_doc_count": len(docs),
                "top_score": round(docs[0].score, 4) if docs and docs[0].score else None,
            },
        })

        # PromptBuilder span
        t0 = time.time()
        prompt_result = prompt_builder.run(documents=docs, query=query)
        prompt_ms = int((time.time() - t0) * 1000)
        prompt_text = prompt_result["prompt"]
        nexus.add_span(trace_id, {
            "name": "component:prompt_builder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": prompt_ms,
            "metadata": {"prompt_length": len(prompt_text), "doc_count": len(docs)},
        })

        # Generator span
        t0 = time.time()
        gen_result = generator.run(prompt=prompt_text)
        gen_ms = int((time.time() - t0) * 1000)
        reply = gen_result["replies"][0] if gen_result.get("replies") else ""
        nexus.add_span(trace_id, {
            "name": "component:generator",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": gen_ms,
            "metadata": {
                "model": "gpt-4o-mini",
                "output_length": len(reply),
                "grounded": len(docs) > 0,
            },
        })

        total_ms = int((time.time() - pipeline_start) * 1000)
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": total_ms,
            "metadata": {"retrieved_doc_count": len(docs), "output_length": len(reply)},
        })
        return reply

    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - pipeline_start) * 1000),
            "error": str(e),
        })
        raise

Each component records its own latency_ms, so the Nexus UI shows a breakdown like component:embedder → component:retriever → component:prompt_builder → component:generator within a single trace.
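For a quick sanity check outside the UI, the per-span latencies can also be summarized locally. A minimal sketch, assuming span dicts shaped like the ones emitted above (this helper is illustrative, not part of the Nexus SDK):

```python
def latency_breakdown(spans):
    """Map span name -> percent share of total pipeline latency.

    Expects span dicts carrying "name" and "latency_ms", as emitted
    by the instrumentation above.
    """
    total = sum(s["latency_ms"] for s in spans)
    if total == 0:
        return {s["name"]: 0.0 for s in spans}
    return {s["name"]: round(100 * s["latency_ms"] / total, 1) for s in spans}
```

A breakdown like {"component:generator": 75.0} immediately tells you where to spend optimization effort.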

Using pipeline.run() directly

If you prefer Haystack’s built-in pipeline orchestration over calling components individually, you can still instrument at the pipeline level. Wrap pipeline.run() with a top-level trace and record summary metadata from the result:

def run_pipeline_with_tracing(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id, "query": query[:300]},
    })
    trace_id = trace["trace_id"]
    t0 = time.time()

    try:
        result = pipeline.run(
            {
                "embedder": {"text": query},
                "prompt_builder": {"query": query},
            },
            # By default pipeline.run() only returns outputs that no other
            # component consumed, so ask for the retriever's output explicitly.
            include_outputs_from={"retriever"},
        )
        elapsed_ms = int((time.time() - t0) * 1000)

        replies = result.get("generator", {}).get("replies", [])
        reply = replies[0] if replies else ""
        docs = result.get("retriever", {}).get("documents", [])

        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": elapsed_ms,
            "metadata": {
                "retrieved_doc_count": len(docs),
                "output_length": len(reply),
                "grounded": len(docs) > 0,
            },
        })
        return reply

    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": str(e),
        })
        raise

This approach is simpler but gives you only one span per pipeline run. Choose the per-component approach if you need to diagnose which component is slow; use pipeline.run() wrapping when you want minimal code changes and only care about end-to-end latency.

Monitoring retrieval quality

Retrieval quality is the most important signal in a RAG pipeline. Poor retrieval means your generator is working with the wrong context, and the answer will sound plausible while being wrong. Track document count, top score, and average score in your retriever span, along with derived flags for alerting:

# Extended retriever span with quality signals
docs = retrieval_result["documents"]
scores = [d.score for d in docs if d.score is not None]

nexus.add_span(trace_id, {
    "name": "component:retriever",
    "started_at": nexus.now(),
    "status": "success" if docs else "warning",
    "latency_ms": retrieve_ms,
    "metadata": {
        "retrieved_doc_count": len(docs),
        "top_score": round(max(scores), 4) if scores else None,
        "avg_score": round(sum(scores) / len(scores), 4) if scores else None,
        "low_quality": bool(scores and max(scores) < 0.70),
        "grounded": len(docs) > 0,
    },
})
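The score math above can be factored into a small helper so the threshold is unit-testable. A sketch of a hypothetical function that mirrors the metadata fields used in the span:

```python
def retrieval_quality(scores, low_quality_threshold=0.70):
    """Compute retrieval-quality metadata fields from a list of
    similarity scores (None entries already filtered out)."""
    if not scores:
        return {"top_score": None, "avg_score": None, "low_quality": False}
    top = max(scores)
    return {
        "top_score": round(top, 4),
        "avg_score": round(sum(scores) / len(scores), 4),
        "low_quality": top < low_quality_threshold,
    }
```

The span metadata then becomes a single dict merge, and the 0.70 threshold lives in one place.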

Monitoring generation quality

Generation quality is harder to measure automatically, but two proxy signals are useful:

# Generator span with quality signals
reply = gen_result["replies"][0] if gen_result.get("replies") else ""
word_count = len(reply.split())

nexus.add_span(trace_id, {
    "name": "component:generator",
    "started_at": nexus.now(),
    "status": "success" if reply else "warning",
    "latency_ms": gen_ms,
    "metadata": {
        "model": "gpt-4o-mini",
        "output_length": len(reply),
        "word_count": word_count,
        "empty_response": len(reply.strip()) == 0,
        "short_response": word_count < 5,
        "grounded": len(docs) > 0,
    },
})
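As with retrieval, the generation signals can be pulled into a helper for testing. This is a hypothetical refactor of the metadata fields above, with the word-count threshold as a parameter:

```python
def generation_signals(reply, doc_count, short_word_limit=5):
    """Derive generation-quality metadata fields from the raw reply
    and the number of documents that grounded it."""
    words = reply.split()
    return {
        "output_length": len(reply),
        "word_count": len(words),
        "empty_response": len(reply.strip()) == 0,
        "short_response": len(words) < short_word_limit,
        "grounded": doc_count > 0,
    }
```

An empty or very short reply with grounded set to false is the classic signature of a retrieval failure masquerading as a generation problem.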

Debugging failed pipeline runs

When a Haystack pipeline fails, the exception often surfaces at the generator level but originates in an earlier component. Wrapping each component in its own try/except records exactly which stage failed, so the trace pinpoints the origin instead of the symptom:

def run_with_component_error_handling(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id},
    })
    trace_id = trace["trace_id"]
    t0 = time.time()

    try:
        t_embed = time.time()
        embedding_result = embedder.run(text=query)
        nexus.add_span(trace_id, {
            "name": "component:embedder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": int((time.time() - t_embed) * 1000),
            "metadata": {},
        })
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"embedder failed: {e}",
        })
        raise

    try:
        t_ret = time.time()
        retrieval_result = retriever.run(query_embedding=embedding_result["embedding"])
        docs = retrieval_result["documents"]
        nexus.add_span(trace_id, {
            "name": "component:retriever",
            "started_at": nexus.now(),
            "status": "success" if docs else "warning",
            "latency_ms": int((time.time() - t_ret) * 1000),
            "metadata": {"retrieved_doc_count": len(docs)},
        })
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"retriever failed: {e}",
        })
        raise

    try:
        t_prompt = time.time()
        prompt_result = prompt_builder.run(documents=docs, query=query)
        t_gen = time.time()
        gen_result = generator.run(prompt=prompt_result["prompt"])
        reply = gen_result["replies"][0] if gen_result.get("replies") else ""
        nexus.add_span(trace_id, {
            "name": "component:generator",
            "started_at": nexus.now(),
            "status": "success" if reply else "warning",
            "latency_ms": int((time.time() - t_gen) * 1000),
            "metadata": {"output_length": len(reply)},
        })
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": int((time.time() - t0) * 1000),
            "metadata": {"retrieved_doc_count": len(docs), "output_length": len(reply)},
        })
        return reply

    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"generation failed: {e}",
        })
        raise

With per-component error handling, a retriever failure produces a trace with component:embedder succeeding and no generator span — making the failure location unambiguous in the Nexus UI.
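The four near-identical timing-and-span blocks can be collapsed with a small context manager. This is illustrative sugar, not part of the Nexus SDK; record is any callable that ships the finished span, e.g. lambda span: nexus.add_span(trace_id, span):

```python
import time
from contextlib import contextmanager

@contextmanager
def component_span(record, name):
    """Time one component call and hand the finished span dict to
    `record`. On exception the span is recorded with status "error"
    before the exception propagates."""
    span = {"name": f"component:{name}", "status": "success", "metadata": {}}
    t0 = time.time()
    try:
        yield span  # caller fills span["metadata"], may downgrade status
    except Exception as e:
        span["status"] = "error"
        span["error"] = str(e)
        raise
    finally:
        span["latency_ms"] = int((time.time() - t0) * 1000)
        record(span)
```

Each component call then shrinks to a with block: enter the context, run the component, attach metadata to the yielded span, and the timing, status, and recording happen automatically, including on failure.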

What to watch for in production

Once traces are flowing from your Haystack pipelines, three failure patterns appear most often:

  1. Ungrounded answers: traces where grounded is false but the generator still produced a confident-sounding reply
  2. Retrieval score drift: top_score and avg_score trending downward as the document store grows or the query mix shifts
  3. Component latency regressions: a single component's latency_ms climbing, such as embedder cold starts after a redeploy

Next steps

Haystack’s component model maps cleanly onto Nexus spans — one span per component, one trace per pipeline run. That structure gives you the retrieval and generation quality signals you need to debug grounding failures, diagnose slow components, and catch quality drift before it reaches users. Sign up for a free Nexus account to start capturing traces from your Haystack pipelines today.

Add observability to Haystack pipelines

Free tier, no credit card required. Full trace visibility in under 5 minutes.