Haystack Integration
Haystack (deepset) is a Python framework for building production-ready RAG pipelines and document AI applications. This guide shows how to add full distributed tracing with Nexus: per-run traces, per-component spans (Embedder, Retriever, PromptBuilder, Generator), retrieval quality monitoring, and debugging failed pipeline runs.
Installation
pip install keylightdigital-nexus haystack-ai
Get your API key from Dashboard → API Keys and set it as an environment variable:
export NEXUS_API_KEY="nxs_your_key_here"
Basic pipeline trace
Wrap each pipeline.run() call in a Nexus trace. Every run appears in the dashboard with its status, duration, and metadata:
import os
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
# Build a simple pipeline: render the question into a prompt with
# PromptBuilder, then send the rendered prompt to the LLM generator.
pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer this question: {{question}}"))
pipeline.add_component("generator", OpenAIGenerator(model="gpt-4o-mini"))
# Wire the builder's output into the generator's input.
pipeline.connect("prompt_builder", "generator")
def run_pipeline(question: str) -> str:
    """Run the QA pipeline inside a Nexus trace and return the answer.

    The trace is always ended — with "success" after a clean run, or with
    "error" (and the exception message in metadata) before re-raising —
    so no run is left dangling in the dashboard.
    """
    trace_payload = {
        "agent_id": "haystack-qa-pipeline",
        "name": f"pipeline: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            # Truncate to keep trace metadata compact.
            "question": question[:200],
            "model": "gpt-4o-mini",
        },
    }
    trace = nexus.start_trace(trace_payload)
    trace_id = trace["trace_id"]
    try:
        output = pipeline.run({"prompt_builder": {"question": question}})
        answer = output["generator"]["replies"][0]
        nexus.end_trace(trace_id, {"status": "success"})
    except Exception as exc:
        nexus.end_trace(trace_id, {
            "status": "error",
            "metadata": {"error": str(exc)},
        })
        raise
    return answer
Component spans
Haystack pipelines are composed of discrete components. Wrap each component invocation in a Nexus span to measure per-step latency and capture inputs and outputs. The pattern is to run components manually within spans rather than letting the pipeline auto-run them:
import os
from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
# Set up components. They are run manually below (one Nexus span each)
# instead of being wired into a Pipeline.
embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()  # load model weights once, up front
document_store = InMemoryDocumentStore()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
# Jinja template: inline each retrieved document, then ask the question.
prompt_builder = PromptBuilder(
    template="Given these documents:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\nAnswer: {{question}}"
)
generator = OpenAIGenerator(model="gpt-4o-mini")
def run_rag_with_spans(question: str, trace_id: str) -> str:
    """Run the RAG components manually, with one Nexus span per component.

    Fixes the span-leak noted under "Component spans not closing on
    failure": every component call is wrapped so its span is ended with
    error details if the component raises, instead of hanging open.

    Args:
        question: The user question to answer.
        trace_id: ID of an already-started Nexus trace to attach spans to.

    Returns:
        The generated answer string.

    Raises:
        Whatever the failing component raises; the component's span is
        closed with error metadata before the exception propagates.
    """

    def _traced(name: str, metadata: dict, call, describe):
        # Start a span, run the component, and guarantee the span closes:
        # on failure it ends with the error message, on success with the
        # end-payload produced by `describe(result)`.
        span = nexus.start_span(trace_id, {
            "name": name,
            "type": "tool",
            "metadata": metadata,
        })
        try:
            result = call()
        except Exception as exc:
            nexus.end_span(span["id"], {
                "output": f"error: {exc}",
                "metadata": {"error": str(exc)},
            })
            raise
        nexus.end_span(span["id"], describe(result))
        return result

    # Embedder: turn the question into a query vector.
    embedding_result = _traced(
        "component:embedder",
        {"query": question[:200], "model": "all-MiniLM-L6-v2"},
        lambda: embedder.run(text=question),
        lambda r: {"output": f"embedding dim={len(r['embedding'])}"},
    )

    # Retriever: fetch the top-3 most similar documents.
    retrieval_result = _traced(
        "component:retriever",
        {"top_k": 3},
        lambda: retriever.run(query_embedding=embedding_result["embedding"], top_k=3),
        lambda r: {
            "output": f"retrieved {len(r['documents'])} documents",
            "metadata": {"doc_count": len(r["documents"])},
        },
    )
    docs = retrieval_result["documents"]

    # PromptBuilder: render the template with the retrieved context.
    prompt_result = _traced(
        "component:prompt_builder",
        {"doc_count": len(docs)},
        lambda: prompt_builder.run(question=question, documents=docs),
        lambda r: {"output": f"prompt length={len(r['prompt'])} chars"},
    )

    # Generator: produce the final answer from the rendered prompt.
    gen_result = _traced(
        "component:generator",
        {"model": "gpt-4o-mini"},
        lambda: generator.run(prompt=prompt_result["prompt"]),
        lambda r: {"output": r["replies"][0][:500]},  # truncate long answers
    )
    return gen_result["replies"][0]
Monitoring retrieval quality
Record retrieved document count and relevance scores as span metadata to monitor retrieval quality over time. Low scores often explain poor answers before you even look at the generated text:
import os
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from nexus_sdk import NexusClient
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
# In-memory document store plus embedding retriever, used by the
# quality-monitoring helper below.
document_store = InMemoryDocumentStore()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
def retrieve_with_quality_span(query_embedding: list, trace_id: str, top_k: int = 5):
    """Run the retriever inside a Nexus span annotated with quality metrics.

    Records doc count, average and top relevance score, and a boolean
    `low_quality` flag (avg < 0.5) in the span metadata so poor retrieval
    can be filtered for in the dashboard. Unlike the naive version, the
    span is also closed with error details if the retriever raises, so it
    never hangs open in the "running" state.

    Args:
        query_embedding: Query vector produced by the text embedder.
        trace_id: ID of an already-started Nexus trace.
        top_k: Number of documents to retrieve.

    Returns:
        The list of retrieved Haystack Documents.

    Raises:
        Whatever the retriever raises, after the span is closed.
    """
    retriever_span = nexus.start_span(trace_id, {
        "name": "component:retriever",
        "type": "tool",
        "metadata": {"top_k": top_k},
    })
    try:
        result = retriever.run(query_embedding=query_embedding, top_k=top_k)
    except Exception as exc:
        # Close the span on failure so it doesn't stay open indefinitely.
        nexus.end_span(retriever_span["id"], {
            "output": f"error: {exc}",
            "metadata": {"error": str(exc)},
        })
        raise
    docs = result["documents"]
    # Some stores return documents without scores; skip those when averaging.
    scores = [doc.score for doc in docs if doc.score is not None]
    avg_score = sum(scores) / len(scores) if scores else 0.0
    top_score = max(scores) if scores else 0.0
    nexus.end_span(retriever_span["id"], {
        "output": f"retrieved {len(docs)} docs, avg_score={avg_score:.3f}",
        "metadata": {
            "doc_count": len(docs),
            "avg_score": round(avg_score, 4),
            "top_score": round(top_score, 4),
            # Heuristic threshold for dashboard filtering.
            "low_quality": avg_score < 0.5,
        },
    })
    return docs
Filter by low_quality: true in the Nexus dashboard to surface runs where retrieval underperformed — these are usually your worst answers.
Full example
A complete RAG pipeline combining a trace, per-component spans, and retrieval quality monitoring in a single runnable script:
import os
from haystack import Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
# ----- Setup -----
document_store = InMemoryDocumentStore()
# Index some documents. NOTE: the documents are embedded with the same
# model as the query embedder below — a mismatch silently breaks retrieval.
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder.warm_up()
docs = [
    Document(content="Haystack is an open-source framework for building NLP pipelines."),
    Document(content="Nexus provides observability for AI agents and LLM pipelines."),
    Document(content="RAG combines retrieval with generation for grounded answers."),
]
embedded_docs = doc_embedder.run(documents=docs)
document_store.write_documents(embedded_docs["documents"])
# Query-side components, run manually in run() below.
text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
text_embedder.warm_up()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(
    template="Context:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\nQuestion: {{question}}\nAnswer:"
)
generator = OpenAIGenerator(model="gpt-4o-mini")
# ----- Traced runner -----
def run(question: str) -> str:
    """Answer *question* with the RAG components, fully traced in Nexus.

    Opens one trace, emits one span per component (embedder, retriever,
    prompt builder, generator), records retrieval-quality metadata, and
    ends the trace with "success" or "error". Fixes the span-leak noted
    under "Component spans not closing on failure": a span is closed with
    error details if its component raises, instead of hanging open.

    Args:
        question: The user question to answer.

    Returns:
        The generated answer string.

    Raises:
        Whatever a component raises; both the failing span and the trace
        are closed with error metadata before the exception propagates.
    """
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"question": question[:200]},
    })
    trace_id = trace["trace_id"]

    def _traced(name: str, metadata: dict, call, describe):
        # Start a span, run the component, and guarantee the span closes:
        # with error details on failure, or `describe(result)` on success.
        span = nexus.start_span(trace_id, {
            "name": name,
            "type": "tool",
            "metadata": metadata,
        })
        try:
            result = call()
        except Exception as exc:
            nexus.end_span(span["id"], {
                "output": f"error: {exc}",
                "metadata": {"error": str(exc)},
            })
            raise
        nexus.end_span(span["id"], describe(result))
        return result

    def _describe_retrieval(r):
        # Build the retriever span's end payload, including quality metrics.
        retrieved = r["documents"]
        scores = [d.score for d in retrieved if d.score is not None]
        avg_score = sum(scores) / len(scores) if scores else 0.0
        return {
            "output": f"{len(retrieved)} docs, avg_score={avg_score:.3f}",
            "metadata": {"doc_count": len(retrieved), "avg_score": round(avg_score, 4)},
        }

    try:
        embed_result = _traced(
            "component:embedder",
            {"query": question[:200]},
            lambda: text_embedder.run(text=question),
            lambda r: {"output": f"dim={len(r['embedding'])}"},
        )
        retrieval_result = _traced(
            "component:retriever",
            {"top_k": 3},
            lambda: retriever.run(query_embedding=embed_result["embedding"], top_k=3),
            _describe_retrieval,
        )
        docs = retrieval_result["documents"]
        prompt_result = _traced(
            "component:prompt_builder",
            {},
            lambda: prompt_builder.run(question=question, documents=docs),
            lambda r: {"output": f"len={len(r['prompt'])}"},
        )
        gen_result = _traced(
            "component:generator",
            {"model": "gpt-4o-mini"},
            lambda: generator.run(prompt=prompt_result["prompt"]),
            lambda r: {"output": r["replies"][0][:500]},
        )
        answer = gen_result["replies"][0]
        nexus.end_trace(trace_id, {"status": "success"})
        return answer
    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise
if __name__ == "__main__":
    # Smoke-test the traced pipeline with a couple of sample questions.
    for sample_question in ("What is Haystack?", "How does RAG work?"):
        print(run(sample_question))
Debugging patterns
Pipeline returns empty or irrelevant answers
Check the component:retriever span in the trace waterfall. If doc_count is 0 or avg_score is below 0.3, the embedder or document store is the culprit — not the generator. Check that documents were embedded with the same model used for query embedding.
Component spans not closing on failure
A span stays open if an exception is raised before nexus.end_span() runs. Always wrap each component call in its own try/except, or use a context-manager helper, so the span is closed with an error status instead of hanging indefinitely.
Traces stuck in “running” state
A trace stays running if nexus.end_trace() is never called. Always wrap pipeline.run() in a try/except/finally block so the trace closes even when an exception propagates out of a component.
Ready to instrument your Haystack pipelines?
Start for free — no credit card required. See traces in under 5 minutes.
Start free →