Haystack Integration

Haystack, by deepset, is an open-source Python framework for building production-ready RAG pipelines and document AI applications. This guide shows how to add end-to-end tracing with Nexus: per-run traces, per-component spans (Embedder, Retriever, PromptBuilder, Generator), retrieval quality monitoring, and debugging of failed pipeline runs.

Installation

pip install keylightdigital-nexus haystack-ai

The embedding examples below use Haystack's SentenceTransformers components, which also require the sentence-transformers package (pip install sentence-transformers).

Get your Nexus API key from Dashboard → API Keys and set it as an environment variable. The generator examples also use Haystack's OpenAIGenerator, which reads OPENAI_API_KEY from the environment:

export NEXUS_API_KEY="nxs_your_key_here"
export OPENAI_API_KEY="sk-your-openai-key"

Basic pipeline trace

Wrap each pipeline.run() call in a Nexus trace. Every run appears in the dashboard with its status, duration, and metadata:

import os
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# Build a simple pipeline
pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer this question: {{question}}"))
pipeline.add_component("generator", OpenAIGenerator(model="gpt-4o-mini"))
pipeline.connect("prompt_builder", "generator")

def run_pipeline(question: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-qa-pipeline",
        "name": f"pipeline: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "question": question[:200],
            "model": "gpt-4o-mini",
        },
    })
    try:
        result = pipeline.run({"prompt_builder": {"question": question}})
        answer = result["generator"]["replies"][0]
        nexus.end_trace(trace["trace_id"], {"status": "success"})
        return answer
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise

Component spans

Haystack pipelines are composed of discrete components. Wrap each component invocation in a Nexus span to measure per-step latency and capture inputs and outputs. The pattern is to run components manually within spans rather than letting the pipeline auto-run them:

import os
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# Set up components
embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedder.warm_up()
document_store = InMemoryDocumentStore()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(
    template="Given these documents:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\nAnswer: {{question}}"
)
generator = OpenAIGenerator(model="gpt-4o-mini")

def run_rag_with_spans(question: str, trace_id: str) -> str:
    # Embedder span
    embed_span = nexus.start_span(trace_id, {
        "name": "component:embedder",
        "type": "tool",
        "metadata": {"query": question[:200], "model": "all-MiniLM-L6-v2"},
    })
    embedding_result = embedder.run(text=question)
    nexus.end_span(embed_span["id"], {
        "output": f"embedding dim={len(embedding_result['embedding'])}",
    })

    # Retriever span
    retriever_span = nexus.start_span(trace_id, {
        "name": "component:retriever",
        "type": "tool",
        "metadata": {"top_k": 3},
    })
    retrieval_result = retriever.run(query_embedding=embedding_result["embedding"], top_k=3)
    docs = retrieval_result["documents"]
    nexus.end_span(retriever_span["id"], {
        "output": f"retrieved {len(docs)} documents",
        "metadata": {"doc_count": len(docs)},
    })

    # PromptBuilder span
    prompt_span = nexus.start_span(trace_id, {
        "name": "component:prompt_builder",
        "type": "tool",
        "metadata": {"doc_count": len(docs)},
    })
    prompt_result = prompt_builder.run(question=question, documents=docs)
    nexus.end_span(prompt_span["id"], {
        "output": f"prompt length={len(prompt_result['prompt'])} chars",
    })

    # Generator span
    gen_span = nexus.start_span(trace_id, {
        "name": "component:generator",
        "type": "tool",
        "metadata": {"model": "gpt-4o-mini"},
    })
    gen_result = generator.run(prompt=prompt_result["prompt"])
    answer = gen_result["replies"][0]
    nexus.end_span(gen_span["id"], {
        "output": answer[:500],
    })

    return answer
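
To call this from a traced entry point, start a trace first and pass its ID through, mirroring the basic pipeline example above. This sketch reuses the nexus client and run_rag_with_spans defined earlier:

def answer(question: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"question": question[:200]},
    })
    try:
        result = run_rag_with_spans(question, trace["trace_id"])
        nexus.end_trace(trace["trace_id"], {"status": "success"})
        return result
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise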

Monitoring retrieval quality

Record retrieved document count and relevance scores as span metadata to monitor retrieval quality over time. Low scores often explain poor answers before you even look at the generated text:

import os
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
document_store = InMemoryDocumentStore()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)

def retrieve_with_quality_span(query_embedding: list, trace_id: str, top_k: int = 5):
    retriever_span = nexus.start_span(trace_id, {
        "name": "component:retriever",
        "type": "tool",
        "metadata": {"top_k": top_k},
    })
    result = retriever.run(query_embedding=query_embedding, top_k=top_k)
    docs = result["documents"]

    scores = [doc.score for doc in docs if doc.score is not None]
    avg_score = sum(scores) / len(scores) if scores else 0.0
    top_score = max(scores) if scores else 0.0

    nexus.end_span(retriever_span["id"], {
        "output": f"retrieved {len(docs)} docs, avg_score={avg_score:.3f}",
        "metadata": {
            "doc_count": len(docs),
            "avg_score": round(avg_score, 4),
            "top_score": round(top_score, 4),
            "low_quality": avg_score < 0.5,
        },
    })
    return docs

Filter by low_quality: true in the Nexus dashboard to surface runs where retrieval underperformed — these are usually your worst answers.

Full example

A complete RAG pipeline combining a trace, per-component spans, and retrieval quality monitoring in a single runnable script:

import os
from haystack import Document
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# ----- Setup -----

document_store = InMemoryDocumentStore()

# Index some documents
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder.warm_up()
docs = [
    Document(content="Haystack is an open-source framework for building NLP pipelines."),
    Document(content="Nexus provides observability for AI agents and LLM pipelines."),
    Document(content="RAG combines retrieval with generation for grounded answers."),
]
embedded_docs = doc_embedder.run(documents=docs)
document_store.write_documents(embedded_docs["documents"])

text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
text_embedder.warm_up()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(
    template="Context:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\nQuestion: {{question}}\nAnswer:"
)
generator = OpenAIGenerator(model="gpt-4o-mini")

# ----- Traced runner -----

def run(question: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-rag-pipeline",
        "name": f"rag: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"question": question[:200]},
    })
    try:
        embed_span = nexus.start_span(trace["trace_id"], {
            "name": "component:embedder", "type": "tool",
            "metadata": {"query": question[:200]},
        })
        embed_result = text_embedder.run(text=question)
        nexus.end_span(embed_span["id"], {"output": f"dim={len(embed_result['embedding'])}"})

        retriever_span = nexus.start_span(trace["trace_id"], {
            "name": "component:retriever", "type": "tool",
            "metadata": {"top_k": 3},
        })
        retrieval_result = retriever.run(query_embedding=embed_result["embedding"], top_k=3)
        docs = retrieval_result["documents"]
        scores = [d.score for d in docs if d.score is not None]
        avg_score = sum(scores) / len(scores) if scores else 0.0
        nexus.end_span(retriever_span["id"], {
            "output": f"{len(docs)} docs, avg_score={avg_score:.3f}",
            "metadata": {"doc_count": len(docs), "avg_score": round(avg_score, 4)},
        })

        prompt_span = nexus.start_span(trace["trace_id"], {
            "name": "component:prompt_builder", "type": "tool", "metadata": {},
        })
        prompt_result = prompt_builder.run(question=question, documents=docs)
        nexus.end_span(prompt_span["id"], {"output": f"len={len(prompt_result['prompt'])}"})

        gen_span = nexus.start_span(trace["trace_id"], {
            "name": "component:generator", "type": "tool",
            "metadata": {"model": "gpt-4o-mini"},
        })
        gen_result = generator.run(prompt=prompt_result["prompt"])
        answer = gen_result["replies"][0]
        nexus.end_span(gen_span["id"], {"output": answer[:500]})

        nexus.end_trace(trace["trace_id"], {"status": "success"})
        return answer
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise

if __name__ == "__main__":
    print(run("What is Haystack?"))
    print(run("How does RAG work?"))

Debugging patterns

Pipeline returns empty or irrelevant answers

Check the component:retriever span in the trace waterfall. If doc_count is 0 or avg_score is below 0.3, the embedder or document store is the culprit — not the generator. Check that documents were embedded with the same model used for query embedding.
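
One way to rule out a model mismatch is to define the embedding model name once and reuse it for both the document and the query embedders, as in this sketch:

from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)

# Index-time and query-time embeddings must come from the same model,
# otherwise retrieval scores are meaningless.
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
doc_embedder = SentenceTransformersDocumentEmbedder(model=EMBED_MODEL)
text_embedder = SentenceTransformersTextEmbedder(model=EMBED_MODEL)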

Component spans not closing on failure

A span stays open if an exception is thrown before nexus.end_span(). Always wrap each component call in its own try/except, or use a context-manager helper, so spans close with an error status rather than hanging indefinitely.
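
A minimal context-manager sketch that closes the span on both success and failure. It assumes nexus.end_span() accepts a status field the way nexus.end_trace() does; adjust the payload to whatever your SDK version records. The usage line assumes the trace, embedder, and question from the component spans example above:

from contextlib import contextmanager

@contextmanager
def component_span(trace_id: str, name: str, metadata: dict = None):
    # Reuses the nexus client created earlier in this guide
    span = nexus.start_span(trace_id, {
        "name": name,
        "type": "tool",
        "metadata": metadata or {},
    })
    try:
        yield span
        nexus.end_span(span["id"], {"status": "success"})  # assumed field, see note above
    except Exception as e:
        nexus.end_span(span["id"], {
            "status": "error",  # assumed field, see note above
            "metadata": {"error": str(e)},
        })
        raise

# Usage inside a traced run: the span closes even if the component raises
with component_span(trace["trace_id"], "component:embedder", {"query": question[:200]}):
    embedding_result = embedder.run(text=question)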

Traces stuck in “running” state

A trace stays running if nexus.end_trace() is never called. Always wrap pipeline.run() in a try/except/finally block so the trace closes even when an exception propagates out of a component.
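
For example, a try/finally variant of the basic runner guarantees end_trace() is called no matter how the run exits. This sketch reuses the pipeline and nexus client from the basic example:

def run_pipeline_safely(question: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "haystack-qa-pipeline",
        "name": f"pipeline: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
    })
    status, error = "error", None
    try:
        result = pipeline.run({"prompt_builder": {"question": question}})
        answer = result["generator"]["replies"][0]
        status = "success"
        return answer
    except Exception as e:
        error = str(e)
        raise
    finally:
        # Runs on success, on exception, and even on KeyboardInterrupt
        payload = {"status": status}
        if error:
            payload["metadata"] = {"error": error}
        nexus.end_trace(trace["trace_id"], payload)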

Ready to instrument your Haystack pipelines?

Start for free — no credit card required. See traces in under 5 minutes.

Start free →