2026-04-19 · 9 min read

Tracing DSPy Programs: Observability for Prompt Optimization Pipelines

DSPy replaces hand-written prompts with compiled LM programs — but when an optimizer iteration degrades performance, a multi-hop retrieval chain produces irrelevant context, or production inputs diverge from your training set, you need trace visibility to diagnose what's happening. Here's how to instrument DSPy programs with Nexus.

What DSPy adds

DSPy flips the prompting model: instead of writing prompt strings by hand, you define LM programs using declarative modules like dspy.ChainOfThought, dspy.Predict, and dspy.Retrieve. Then an optimizer (like MIPRO or BootstrapFewShot) compiles the program by running it on training examples and tuning the prompts to maximize a metric.

This approach creates new observability challenges that don’t exist with traditional prompting: an optimizer iteration can silently degrade performance, a multi-hop chain can retrieve irrelevant context, and a program compiled against your dev set can drift away from what production inputs actually look like.

Tracing program.forward() calls

Every DSPy program inherits from dspy.Module and implements a forward() method. Wrapping forward calls in Nexus traces gives you end-to-end latency, per-call metadata, and error capture:

import os
import time
import dspy
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# Configure DSPy with your LM
lm = dspy.LM("openai/gpt-4o", api_key=os.environ["OPENAI_API_KEY"])
dspy.configure(lm=lm)

class RAGProgram(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

rag = RAGProgram()

def run_with_tracing(question: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "dspy-rag-program",
        "name": f"rag: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "user_id": user_id,
            "question_length": len(question),
            "program": "RAGProgram",
            "environment": os.environ.get("APP_ENV", "dev"),
        },
    })
    trace_id = trace["trace_id"]

    t0 = time.time()
    try:
        result = rag(question=question)
        elapsed_ms = int((time.time() - t0) * 1000)

        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": elapsed_ms,
            "metadata": {
                "answer_length": len(result.answer),
                "reasoning_steps": result.reasoning.count("\n") if hasattr(result, "reasoning") else None,
            },
        })
        return result.answer

    except Exception as e:
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.time() - t0) * 1000),
            "error": str(e),
        })
        raise

Tracing multi-hop programs

Multi-hop programs (where the output of one module feeds into the retrieval query of the next) are the hardest DSPy programs to debug. Emit a span for each retrieval hop so you can see what was retrieved at each step and whether the refined query produced better results:

class MultiHopRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.refine_query = dspy.Predict("context, question -> refined_query")
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question, trace_id: str | None = None):
        # Pass the caller's trace_id through so per-hop spans are emitted
        return self._forward_with_spans(question, trace_id)

    def _forward_with_spans(self, question: str, trace_id: str | None):
        # Hop 1: retrieve on original question
        t0 = time.time()
        hop1 = self.retrieve(question).passages
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "retrieve:hop1",
                "status": "success",
                "latency_ms": int((time.time() - t0) * 1000),
                "metadata": {"passages": len(hop1), "query": question[:80]},
            })

        # Refine the query based on initial context
        t1 = time.time()
        refined = self.refine_query(context=hop1, question=question)
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "refine_query",
                "status": "success",
                "latency_ms": int((time.time() - t1) * 1000),
                "metadata": {"refined_query": refined.refined_query[:80]},
            })

        # Hop 2: retrieve on refined query
        t2 = time.time()
        hop2 = self.retrieve(refined.refined_query).passages
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "retrieve:hop2",
                "status": "success",
                "latency_ms": int((time.time() - t2) * 1000),
                "metadata": {"passages": len(hop2), "query": refined.refined_query[:80]},
            })

        all_context = hop1 + hop2
        return self.generate(context=all_context, question=question)
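One detail worth watching in the final step: hop1 + hop2 can contain the same passage twice when both hops hit the same documents, which wastes context-window budget and skews the passage counts in your spans. An order-preserving dedupe before generation keeps the context tight; this is a plain-Python sketch independent of any SDK:

```python
def dedupe_passages(passages: list[str]) -> list[str]:
    """Drop duplicate passages while preserving retrieval order."""
    seen: set[str] = set()
    unique: list[str] = []
    for p in passages:
        if p not in seen:
            seen.add(p)
            unique.append(p)
    return unique
```

In MultiHopRAG, you would pass `dedupe_passages(hop1 + hop2)` to self.generate instead of the raw concatenation.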

Tracing optimizer iterations

DSPy’s MIPRO and BootstrapFewShot optimizers run your program many times on training examples. Capturing optimizer metadata alongside each run lets you correlate which training iteration produced a latency or quality regression:

import itertools
import time

import dspy
from dspy.teleprompt import MIPROv2

def optimize_with_tracing(program, trainset, metric, program_name: str):
    """Run MIPRO optimization and trace each evaluation call."""

    # itertools.count is atomic under CPython's GIL, so the counter stays
    # correct when MIPRO evaluates examples across num_threads workers
    iteration_counter = itertools.count(1)

    # Wrap the metric to capture per-iteration metadata
    def traced_metric(example, prediction, trace=None):
        iteration = next(iteration_counter)
        
        t0 = time.time()
        score = metric(example, prediction, trace)
        elapsed_ms = int((time.time() - t0) * 1000)
        
        # Record each evaluation as its own trace; a sub-threshold score is
        # marked "error" so regressed iterations are easy to filter
        nexus.start_trace({
            "agent_id": f"dspy-optimizer-{program_name}",
            "name": f"optimize:iter-{iteration}",
            "status": "success" if score > 0.5 else "error",
            "started_at": nexus.now(),
            "metadata": {
                "iteration": iteration,
                "score": score,
                "latency_ms": elapsed_ms,
                "question": str(example.question)[:80] if hasattr(example, "question") else None,
            },
        })
        return score
    
    optimizer = MIPROv2(metric=traced_metric, num_candidates=5, num_threads=4)
    compiled_program = optimizer.compile(program, trainset=trainset, num_trials=20)
    return compiled_program
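The metric you pass into optimize_with_tracing follows DSPy's (example, prediction, trace) convention. As a concrete illustration, a minimal exact-match metric, assuming both objects expose an answer attribute as in the RAG signature above, might look like this:

```python
def exact_match(example, prediction, trace=None) -> float:
    """Return 1.0 when the predicted answer matches the gold answer,
    ignoring case and surrounding whitespace."""
    gold = str(example.answer).strip().lower()
    pred = str(prediction.answer).strip().lower()
    return float(gold == pred)
```

You would then call `optimize_with_tracing(rag, trainset, exact_match, "rag")` and watch the per-iteration traces accumulate in Nexus.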

Production vs. compiled-program divergence

One of the most common DSPy production issues: a program compiled on your dev set underperforms on production inputs because the few-shot examples the optimizer selected don’t generalize. Add metadata to distinguish compiled vs. uncompiled runs and track this in Nexus:

trace = nexus.start_trace({
    "agent_id": "dspy-rag-program",
    "name": f"rag: {question[:60]}",
    "status": "running",
    "started_at": nexus.now(),
    "metadata": {
        "compiled": True,
        "optimizer": "MIPROv2",
        "num_demos": len(program.generate.demos) if hasattr(program.generate, "demos") else 0,
        "model": "gpt-4o",
        "environment": os.environ.get("APP_ENV", "dev"),
        "user_id": user_id,
    },
})

With this metadata, you can filter Nexus traces to compare error rates and latency between compiled and uncompiled runs — or between programs compiled with different optimizers or training set sizes.
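If you export traces for offline analysis (here assumed to be a list of dicts shaped like the payloads above; the export mechanism itself is not shown), the comparison is a few lines of plain Python:

```python
def error_rate_by_flag(traces: list[dict], flag: str = "compiled") -> dict[bool, float]:
    """Group traces by a boolean metadata flag and compute each group's error rate."""
    totals: dict[bool, int] = {}
    errors: dict[bool, int] = {}
    for t in traces:
        key = bool(t.get("metadata", {}).get(flag, False))
        totals[key] = totals.get(key, 0) + 1
        if t.get("status") == "error":
            errors[key] = errors.get(key, 0) + 1
    return {k: errors.get(k, 0) / totals[k] for k in totals}
```

A large gap between the True and False buckets is the signature of few-shot demos that don't generalize to production inputs.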

What to alert on

Once traces are flowing into Nexus, set up alerts for these DSPy-specific failure patterns:

- Retrieval spans reporting zero passages, which usually means a refined query drifted off-topic
- Error rate or latency on compiled runs diverging from uncompiled baselines
- Optimizer iterations scoring below your metric threshold
- p95 forward() latency regressing after recompiling a program
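Some checks are also cheap to run client-side before traces ever leave your service. For example, flagging retrieval hops that came back empty, using the span payload shape from the multi-hop example above:

```python
def zero_passage_hops(spans: list[dict]) -> list[str]:
    """Return the names of retrieval spans that reported zero passages."""
    return [
        s["name"]
        for s in spans
        if s["name"].startswith("retrieve:") and s.get("metadata", {}).get("passages") == 0
    ]
```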

Next steps

DSPy makes prompt optimization systematic — and with Nexus traces, you make the optimization loop itself observable. You can see which iterations improve the metric, which production inputs fall outside the compiled program’s coverage, and where latency is being spent in multi-hop retrieval chains.

Sign up for a free Nexus account and start tracing your DSPy programs today.
