Tracing Haystack Pipelines: Observability for RAG and Document AI
Haystack (by deepset) builds AI pipelines from composable components — Embedder, Retriever, PromptBuilder, Generator. When a retriever returns empty results, an embedder cold-starts slowly, or prompt length creep degrades generation quality, you need per-component trace visibility to diagnose it. Here's how to instrument Haystack pipelines with Nexus.
What Haystack is
Haystack (by deepset) is a Python framework for building production-ready AI pipelines. Unlike agent frameworks that focus on autonomous decision-making, Haystack organizes work as a directed Pipeline of Components — each component handles a discrete step like document retrieval, embedding, re-ranking, or text generation. This makes Haystack particularly well-suited for RAG (retrieval-augmented generation) and document AI workloads where the data flow is deterministic and repeatable.
A typical Haystack RAG pipeline looks like this:
- Embedder — converts the user query into a vector embedding
- Retriever — fetches the top-k documents from a vector store
- PromptBuilder — assembles a prompt from the retrieved documents and query
- Generator — calls an LLM to produce the final answer
Each component runs synchronously in sequence, passing its output as the named input of the next. That determinism is a feature — but it also means pipeline failures are silent by default: if the retriever returns zero documents, the generator sees an empty context and quietly produces a hallucinated answer.
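The sequential handoff can be sketched with toy stand-in components (illustrative only — real Haystack components are classes whose run() methods return dicts, as shown later):

```python
# Toy sketch of Haystack-style sequential data flow. Each function stands in
# for a real component; each output dict becomes the named input of the next.
def embedder(text: str) -> dict:
    # Pretend embedding: one float per word (stand-in for a real model)
    return {"embedding": [float(len(w)) for w in text.split()]}

def retriever(query_embedding: list) -> dict:
    # Pretend retrieval: return documents only for non-empty embeddings
    docs = ["doc-1", "doc-2"] if query_embedding else []
    return {"documents": docs}

def prompt_builder(documents: list, query: str) -> dict:
    context = " ".join(documents)
    return {"prompt": f"Context: {context}\nQuestion: {query}"}

# Deterministic chaining: output keys feed the next component's named inputs
emb = embedder("what is haystack")
ret = retriever(query_embedding=emb["embedding"])
prompt = prompt_builder(documents=ret["documents"], query="what is haystack")
```

If `retriever` returned an empty list here, `prompt_builder` would still produce a syntactically valid prompt with no context — which is exactly the silent-failure mode described above.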
Observability blind spots in Haystack pipelines
Without instrumentation, three failure modes are invisible:
- Empty retrieval: The retriever returns zero documents because the query embedding is too far from your document corpus, or because the document store is stale. The generator then answers from parametric knowledge — which looks correct but isn’t grounded. Without a span recording retrieved_doc_count, you can’t distinguish grounded answers from hallucinations in aggregate.
- Slow components: A single slow re-ranker or embedding model adds hundreds of milliseconds to every request. Without per-component latency spans, you only see total pipeline latency — not which component is the bottleneck.
- Generation quality drift: The LLM’s output quality can degrade as your prompt template grows, as retrieved documents get noisier, or as the underlying model is updated. Without recording output length and metadata, quality drift is invisible until users complain.
Mapping Haystack components to Nexus spans
The cleanest tracing strategy for Haystack is to wrap the entire pipeline.run() call in a top-level trace and add one span per component with the component name, latency, and key output metadata.
Install the dependencies first:
pip install haystack-ai nexus-sdk
Here is a complete instrumented RAG pipeline:
import os
import time
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
from nexus_sdk import NexusClient
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
# Build pipeline
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="Haystack is a framework for building AI pipelines."),
Document(content="Nexus provides trace observability for AI systems."),
Document(content="RAG pipelines combine retrieval with generation."),
])
embedder = SentenceTransformersTextEmbedder()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(template="""
Given the context below, answer the question.
Context: {% for doc in documents %}{{ doc.content }}{% endfor %}
Question: {{ query }}
Answer:""")
generator = OpenAIGenerator(model="gpt-4o-mini")
pipeline = Pipeline()
pipeline.add_component("embedder", embedder)
pipeline.add_component("retriever", retriever)
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")
pipeline.warm_up()
def run_with_tracing(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id, "query": query[:300]},
    })
    trace_id = trace["trace_id"]
    pipeline_start = time.time()
    try:
        # Embedder span
        t0 = time.time()
        embedding_result = embedder.run(text=query)
        embed_ms = int((time.time() - t0) * 1000)
        nexus.add_span(trace_id, {
            "name": "component:embedder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": embed_ms,
            "metadata": {"model": "sentence-transformers", "embed_dim": len(embedding_result["embedding"])},
        })

        # Retriever span
        t0 = time.time()
        retrieval_result = retriever.run(query_embedding=embedding_result["embedding"])
        retrieve_ms = int((time.time() - t0) * 1000)
        docs = retrieval_result["documents"]
        nexus.add_span(trace_id, {
            "name": "component:retriever",
            "started_at": nexus.now(),
            "status": "success" if docs else "warning",
            "latency_ms": retrieve_ms,
            "metadata": {
                "retrieved_doc_count": len(docs),
                "top_score": round(docs[0].score, 4) if docs and docs[0].score else None,
            },
        })

        # PromptBuilder span
        t0 = time.time()
        prompt_result = prompt_builder.run(documents=docs, query=query)
        prompt_ms = int((time.time() - t0) * 1000)
        prompt_text = prompt_result["prompt"]
        nexus.add_span(trace_id, {
            "name": "component:prompt_builder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": prompt_ms,
            "metadata": {"prompt_length": len(prompt_text), "doc_count": len(docs)},
        })

        # Generator span
        t0 = time.time()
        gen_result = generator.run(prompt=prompt_text)
        gen_ms = int((time.time() - t0) * 1000)
        reply = gen_result["replies"][0] if gen_result.get("replies") else ""
        nexus.add_span(trace_id, {
            "name": "component:generator",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": gen_ms,
            "metadata": {
                "model": "gpt-4o-mini",
                "output_length": len(reply),
                "grounded": len(docs) > 0,
            },
        })

        total_ms = int((time.time() - pipeline_start) * 1000)
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": total_ms,
            "metadata": {"retrieved_doc_count": len(docs), "output_length": len(reply)},
        })
        return reply
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - pipeline_start) * 1000),
            "error": str(e),
        })
        raise
Each component records its own latency_ms, so the Nexus UI shows a breakdown like component:embedder → component:retriever → component:prompt_builder → component:generator within a single trace.
Using pipeline.run() directly
If you prefer Haystack’s built-in pipeline orchestration over calling components individually, you can still instrument at the pipeline level. Wrap pipeline.run() with a top-level trace and extract component timing from the result metadata:
def run_pipeline_with_tracing(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id, "query": query[:300]},
    })
    trace_id = trace["trace_id"]
    t0 = time.time()
    try:
        # By default Pipeline.run() only returns outputs of leaf components;
        # include_outputs_from surfaces the retriever's intermediate output too.
        result = pipeline.run(
            {
                "embedder": {"text": query},
                "prompt_builder": {"query": query},
            },
            include_outputs_from={"retriever"},
        )
        elapsed_ms = int((time.time() - t0) * 1000)
        replies = result.get("generator", {}).get("replies", [])
        reply = replies[0] if replies else ""
        docs = result.get("retriever", {}).get("documents", [])
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": elapsed_ms,
            "metadata": {
                "retrieved_doc_count": len(docs),
                "output_length": len(reply),
                "grounded": len(docs) > 0,
            },
        })
        return reply
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": str(e),
        })
        raise
This approach is simpler but gives you only one span per pipeline run. Choose the per-component approach if you need to diagnose which component is slow; use pipeline.run() wrapping when you want minimal code changes and only care about end-to-end latency.
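The per-component pattern repeats the same time-then-add_span boilerplate for every component. One way to factor it out is a small helper — a sketch only, which assumes the Nexus client exposes add_span() and now() as in the examples above; StubNexus and StubEmbedder below are hypothetical stand-ins so the helper can be exercised without Haystack or Nexus installed:

```python
import time

def timed_component(nexus, trace_id: str, name: str, component, **inputs) -> dict:
    """Run one component via run(**inputs), record a Nexus span, return its output."""
    t0 = time.time()
    result = component.run(**inputs)
    nexus.add_span(trace_id, {
        "name": f"component:{name}",
        "started_at": nexus.now(),
        "status": "success",
        "latency_ms": int((time.time() - t0) * 1000),
    })
    return result

# Minimal stand-ins for demonstration (not part of any real SDK)
class StubNexus:
    def __init__(self):
        self.spans = []
    def add_span(self, trace_id, span):
        self.spans.append(span)
    def now(self):
        return time.time()

class StubEmbedder:
    def run(self, text):
        return {"embedding": [0.1] * 4}

nexus_stub = StubNexus()
out = timed_component(nexus_stub, "trace-1", "embedder", StubEmbedder(), text="hi")
```

With a real client and real components, the embedder call in the earlier example would shrink to something like `embedding_result = timed_component(nexus, trace_id, "embedder", embedder, text=query)`.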
Monitoring retrieval quality
Retrieval quality is the most important signal in a RAG pipeline. Poor retrieval means your generator is working with the wrong context — and the answer will sound plausible while being wrong. Track three metrics in your retriever span:
- retrieved_doc_count: How many documents came back. Zero means the query is out-of-distribution or your document store is empty — alert on this immediately.
- top_score: The cosine similarity of the best match. If your retriever returns 5 documents but the top score is 0.45, the context is likely noisy. A threshold of ~0.70 is a reasonable floor for most embedding models.
- grounded flag: A boolean recorded in the trace that tells you whether the generator had any retrieved context. Correlate grounded: false traces with negative user feedback to quantify how often empty retrieval leads to bad answers.
# Extended retriever span with quality signals
docs = retrieval_result["documents"]
scores = [d.score for d in docs if d.score is not None]
nexus.add_span(trace_id, {
    "name": "component:retriever",
    "started_at": nexus.now(),
    "status": "success" if docs else "warning",
    "latency_ms": retrieve_ms,
    "metadata": {
        "retrieved_doc_count": len(docs),
        "top_score": round(max(scores), 4) if scores else None,
        "avg_score": round(sum(scores) / len(scores), 4) if scores else None,
        "low_quality": bool(scores and max(scores) < 0.70),
        "grounded": len(docs) > 0,
    },
})
Monitoring generation quality
Generation quality is harder to measure automatically, but two proxy signals are useful:
- output_length: Suspiciously short outputs (under 20 characters) often indicate a refusal, an empty response, or a model timeout. Track output_length < 20 as a quality failure signal.
- grounded correlation: Compare average output length between grounded: true and grounded: false traces. A large gap confirms that empty retrieval is degrading generation quality.
# Generator span with quality signals
reply = gen_result["replies"][0] if gen_result.get("replies") else ""
word_count = len(reply.split())
nexus.add_span(trace_id, {
    "name": "component:generator",
    "started_at": nexus.now(),
    "status": "success" if reply else "warning",
    "latency_ms": gen_ms,
    "metadata": {
        "model": "gpt-4o-mini",
        "output_length": len(reply),
        "word_count": word_count,
        "empty_response": len(reply.strip()) == 0,
        "short_response": word_count < 5,
        "grounded": len(docs) > 0,
    },
})
Debugging failed pipeline runs
When a Haystack pipeline fails, the exception often surfaces at the generator level but originates in an earlier component. Common failure patterns and how to spot them in your traces:
- Embedder timeout: The embedding model call hangs or times out. Look for traces where component:embedder latency exceeds your SLA (typically 2s for fast models). If the span is missing entirely, the embedder threw before Nexus could record it — check your exception handler.
- Retriever key error: If your document store schema changed but your retriever query didn’t, you get a KeyError inside retriever.run(). The trace will show an error status on the retriever span with the exception string in metadata.
- Generator rate limit: OpenAI and Anthropic rate-limit errors raise inside the generator. The trace status is error at both the generator span and the top-level trace. A spike in generator errors without a corresponding spike in retriever errors confirms the issue is downstream of retrieval.
def run_with_component_error_handling(query: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"user_id": user_id},
    })
    trace_id = trace["trace_id"]
    t0 = time.time()

    try:
        t_embed = time.time()
        embedding_result = embedder.run(text=query)
        nexus.add_span(trace_id, {
            "name": "component:embedder",
            "started_at": nexus.now(),
            "status": "success",
            "latency_ms": int((time.time() - t_embed) * 1000),
            "metadata": {},
        })
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"embedder failed: {e}",
        })
        raise

    try:
        t_ret = time.time()
        retrieval_result = retriever.run(query_embedding=embedding_result["embedding"])
        docs = retrieval_result["documents"]
        nexus.add_span(trace_id, {
            "name": "component:retriever",
            "started_at": nexus.now(),
            "status": "success" if docs else "warning",
            "latency_ms": int((time.time() - t_ret) * 1000),
            "metadata": {"retrieved_doc_count": len(docs)},
        })
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"retriever failed: {e}",
        })
        raise

    try:
        prompt_result = prompt_builder.run(documents=docs, query=query)
        t_gen = time.time()
        gen_result = generator.run(prompt=prompt_result["prompt"])
        reply = gen_result["replies"][0] if gen_result.get("replies") else ""
        nexus.add_span(trace_id, {
            "name": "component:generator",
            "started_at": nexus.now(),
            "status": "success" if reply else "warning",
            "latency_ms": int((time.time() - t_gen) * 1000),
            "metadata": {"output_length": len(reply)},
        })
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": int((time.time() - t0) * 1000),
            "metadata": {"retrieved_doc_count": len(docs), "output_length": len(reply)},
        })
        return reply
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": f"generation failed: {e}",
        })
        raise
With per-component error handling, a retriever failure produces a trace with component:embedder succeeding and no generator span — making the failure location unambiguous in the Nexus UI.
What to watch for in production
Once traces are flowing from your Haystack pipelines, three failure patterns appear most often:
- Retrieval quality degradation: As your document corpus grows or document embeddings age, retrieval scores drift downward. Set an alert on avg top_score < 0.65 over a rolling 100-trace window to catch this before users notice.
- Embedder cold start latency: Sentence-transformers models have a warm-up cost the first time they load. If your pipeline isn’t warmed up, the first few requests will spike. Look for component:embedder latency > 5x median as a cold start signal.
- Prompt length creep: As your retrieved context grows (larger top-k, longer documents), the prompt sent to the generator grows with it. Track prompt_length over time — once it approaches your model’s context limit, generation quality degrades and costs spike.
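The first alert can be sketched as a rolling window over recent traces' top_score values — a sketch only, assuming you feed it one score per trace; the 0.65 threshold and 100-trace window follow the suggestion above and should be tuned to your corpus:

```python
from collections import deque

class RetrievalScoreMonitor:
    """Fire an alert when the rolling average top_score drops below a floor."""

    def __init__(self, window: int = 100, threshold: float = 0.65):
        self.scores = deque(maxlen=window)  # deque discards oldest when full
        self.threshold = threshold

    def record(self, top_score: float) -> bool:
        """Record one trace's top_score; return True if the alert fires."""
        self.scores.append(top_score)
        if len(self.scores) < self.scores.maxlen:
            return False  # not enough data for a full window yet
        return sum(self.scores) / len(self.scores) < self.threshold

# Small window for illustration: the alert fires once the average of the
# most recent 5 scores falls below 0.65, and stays on as scores keep sliding.
monitor = RetrievalScoreMonitor(window=5, threshold=0.65)
fired = [monitor.record(s) for s in [0.9, 0.8, 0.6, 0.5, 0.4, 0.3]]
```

Holding off until the window is full avoids spurious alerts from the first handful of traces after a deploy; you could equally alert on a partial window if you prefer faster detection.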
Next steps
Haystack’s component model maps cleanly onto Nexus spans — one span per component, one trace per pipeline run. That structure gives you the retrieval and generation quality signals you need to debug grounding failures, diagnose slow components, and catch quality drift before it reaches users. Sign up for a free Nexus account to start capturing traces from your Haystack pipelines today.
Add observability to Haystack pipelines
Free tier, no credit card required. Full trace visibility in under 5 minutes.