Tracing DSPy Programs: Observability for Prompt Optimization Pipelines
DSPy replaces hand-written prompts with compiled LM programs — but when an optimizer iteration degrades performance, a multi-hop retrieval chain produces irrelevant context, or production inputs diverge from your training set, you need trace visibility to diagnose what's happening. Here's how to instrument DSPy programs with Nexus.
What DSPy adds
DSPy flips the prompting model: instead of writing prompt strings by hand, you define LM programs using declarative modules like dspy.ChainOfThought, dspy.Predict, and dspy.Retrieve. Then an optimizer (like MIPRO or BootstrapFewShot) compiles the program by running it on training examples and tuning the prompts to maximize a metric.
This approach creates new observability challenges that don’t exist with traditional prompting:
- Optimizer iterations are opaque: MIPRO runs dozens of forward passes to find good few-shot examples and instructions. You can’t tell which iteration produced the metric improvement or which failed.
- Module-level latency is invisible: A compiled program that runs ChainOfThought + Retrieve in sequence has no built-in way to attribute latency to each module.
- Retrieval quality drift: Multi-hop programs (Retrieve → reason → Retrieve again) can degrade silently when the retrieved context at hop 2 is irrelevant to the refined query.
- Compiled vs. uncompiled divergence: A program that scores well on your dev set after compilation may behave differently in production with out-of-distribution inputs.
Tracing program.forward() calls
Every DSPy program inherits from dspy.Module and implements a forward() method. Wrapping forward calls in Nexus traces gives you end-to-end latency, per-call metadata, and error capture:
import os
import time

import dspy
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# Configure DSPy with your LM
lm = dspy.LM("openai/gpt-4o", api_key=os.environ["OPENAI_API_KEY"])
dspy.configure(lm=lm)

class RAGProgram(dspy.Module):
    def __init__(self, num_passages=3):
        super().__init__()
        # dspy.Retrieve uses the retrieval model set via dspy.configure(rm=...)
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

rag = RAGProgram()

def run_with_tracing(question: str, user_id: str) -> str:
    trace = nexus.start_trace({
        "agent_id": "dspy-rag-program",
        "name": f"rag: {question[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "user_id": user_id,
            "question_length": len(question),
            "program": "RAGProgram",
            "environment": os.environ.get("APP_ENV", "dev"),
        },
    })
    trace_id = trace["trace_id"]
    # perf_counter is monotonic, so elapsed time is unaffected by clock changes
    t0 = time.perf_counter()
    try:
        # Call the module itself rather than forward() so DSPy's call hooks run
        result = rag(question=question)
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        nexus.end_trace(trace_id, {
            "status": "success",
            "latency_ms": elapsed_ms,
            "metadata": {
                "answer_length": len(result.answer),
                # Rough proxy: number of line breaks in the reasoning text
                "reasoning_steps": result.reasoning.count("\n") if hasattr(result, "reasoning") else None,
            },
        })
        return result.answer
    except Exception as e:
        # Record the failure, then re-raise so callers still see the error
        nexus.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.perf_counter() - t0) * 1000),
            "error": str(e),
        })
        raise
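The try/except pattern above can be factored into a reusable context manager. The following is a minimal sketch, assuming only that the client exposes the `now`, `start_trace`, and `end_trace` calls used in this article; `traced_call` and its parameters are hypothetical names, not part of the Nexus SDK.

```python
import time
from contextlib import contextmanager

@contextmanager
def traced_call(client, agent_id, name, metadata=None):
    """Open a trace, yield a dict for result metadata, and always close the trace.

    `client` is assumed to expose now(), start_trace(payload) returning
    {"trace_id": ...}, and end_trace(trace_id, payload), as in the example above.
    """
    trace = client.start_trace({
        "agent_id": agent_id,
        "name": name,
        "status": "running",
        "started_at": client.now(),
        "metadata": metadata or {},
    })
    trace_id = trace["trace_id"]
    result_metadata = {}  # the caller fills this in inside the with-block
    t0 = time.perf_counter()
    try:
        yield result_metadata
    except Exception as exc:
        # Close the trace as failed, then let the exception propagate
        client.end_trace(trace_id, {
            "status": "error",
            "latency_ms": int((time.perf_counter() - t0) * 1000),
            "error": str(exc),
        })
        raise
    else:
        client.end_trace(trace_id, {
            "status": "success",
            "latency_ms": int((time.perf_counter() - t0) * 1000),
            "metadata": result_metadata,
        })

# Hypothetical usage with the objects defined earlier:
#   with traced_call(nexus, "dspy-rag-program", f"rag: {question[:60]}") as meta:
#       result = rag(question=question)
#       meta["answer_length"] = len(result.answer)
```

Keeping the timing and error handling in one place means every traced program reports latency and failures the same way.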
Tracing multi-hop programs
Multi-hop programs (where the output of one module feeds into the retrieval query of the next) are the hardest DSPy programs to debug. Emit a span for each retrieval hop so you can see what was retrieved at each step and whether the refined query produced better results:
class MultiHopRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.refine_query = dspy.Predict("context, question -> refined_query")
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question, trace_id=None):
        # Pass a Nexus trace_id to emit spans; omit it to run untraced
        # (for example, while an optimizer is compiling the program).
        return self._forward_with_spans(question, trace_id)

    def _forward_with_spans(self, question: str, trace_id: str | None):
        # Hop 1: retrieve on original question
        t0 = time.perf_counter()
        hop1 = self.retrieve(question).passages
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "retrieve:hop1",
                "status": "success",
                "latency_ms": int((time.perf_counter() - t0) * 1000),
                "metadata": {"passages": len(hop1), "query": question[:80]},
            })

        # Refine the query based on initial context
        t1 = time.perf_counter()
        refined = self.refine_query(context=hop1, question=question)
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "refine_query",
                "status": "success",
                "latency_ms": int((time.perf_counter() - t1) * 1000),
                "metadata": {"refined_query": refined.refined_query[:80]},
            })

        # Hop 2: retrieve on refined query
        t2 = time.perf_counter()
        hop2 = self.retrieve(refined.refined_query).passages
        if trace_id:
            nexus.add_span(trace_id, {
                "name": "retrieve:hop2",
                "status": "success",
                "latency_ms": int((time.perf_counter() - t2) * 1000),
                "metadata": {"passages": len(hop2), "query": refined.refined_query[:80]},
            })

        # Generate the answer from the combined context of both hops
        all_context = hop1 + hop2
        return self.generate(context=all_context, question=question)
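To make hop-2 drift easier to spot, one option is to attach a cheap overlap score between the two hops to the `retrieve:hop2` span metadata. The sketch below is an assumption of this walkthrough, not a DSPy or Nexus feature: `hop_overlap` is a hypothetical helper that treats passages as plain strings and reports the fraction of hop-2 passages already returned by hop 1.

```python
def hop_overlap(hop1, hop2):
    """Fraction of hop-2 passages that already appeared in hop 1.

    Assumes passages are plain strings; comparison ignores case and
    surrounding whitespace.
    """
    if not hop2:
        return 0.0
    seen = {p.strip().lower() for p in hop1}
    repeated = sum(1 for p in hop2 if p.strip().lower() in seen)
    return repeated / len(hop2)

hop1 = ["Paris is the capital of France.", "France is in Europe."]
hop2 = ["France is in Europe.", "The Seine flows through Paris."]
print(hop_overlap(hop1, hop2))  # 0.5
```

A value near 1.0 suggests the refined query retrieved nothing new. The score says nothing about relevance on its own, so treat it as one signal to chart alongside answer quality.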
Tracing optimizer iterations
DSPy’s MIPRO and BootstrapFewShot optimizers run your program many times on training examples. Capturing metadata for each metric evaluation lets you pinpoint where in the optimization run a quality regression appeared:
import threading
import time

import dspy
from dspy.teleprompt import MIPROv2

def optimize_with_tracing(program, trainset, metric, program_name: str):
    """Run MIPRO optimization and trace each evaluation call."""
    iteration_counter = [0]
    # The optimizer evaluates on several threads, so guard the counter
    counter_lock = threading.Lock()

    # Wrap the metric to capture per-evaluation metadata
    def traced_metric(example, prediction, trace=None):
        with counter_lock:
            iteration_counter[0] += 1
            iteration = iteration_counter[0]
        t0 = time.perf_counter()
        score = metric(example, prediction, trace)
        # This times the metric computation only, not the program's forward pass
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        nexus.start_trace({
            "agent_id": f"dspy-optimizer-{program_name}",
            "name": f"optimize:iter-{iteration}",
            "status": "success" if score > 0.5 else "error",
            "started_at": nexus.now(),
            "metadata": {
                "iteration": iteration,
                "score": score,
                "latency_ms": elapsed_ms,
                "question": str(example.question)[:80] if hasattr(example, "question") else None,
            },
        })
        return score

    # Depending on your DSPy version, MIPROv2 may require auto=None when
    # num_candidates and num_trials are set explicitly; check the docs for your release.
    optimizer = MIPROv2(metric=traced_metric, num_candidates=5, num_threads=4)
    compiled_program = optimizer.compile(program, trainset=trainset, num_trials=20)
    return compiled_program
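The wrapper expects a metric with DSPy's usual `(example, prediction, trace=None)` signature. Here is a minimal sketch of such a metric, assuming both the example and the prediction carry an `answer` field (an assumption; adapt the field name to your signature). `exact_match` is an illustrative name.

```python
def exact_match(example, prediction, trace=None):
    """Return 1.0 when the predicted answer matches the gold answer, else 0.0.

    Comparison ignores case and surrounding whitespace. Returning a float
    keeps the `score > 0.5` check in the traced wrapper meaningful.
    """
    gold = example.answer.strip().lower()
    predicted = prediction.answer.strip().lower()
    return float(gold == predicted)
```

With a metric like this, the call would look like `optimize_with_tracing(rag, trainset, exact_match, "rag")`.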
Production vs. compiled-program divergence
One of the most common DSPy production issues: a program compiled on your dev set underperforms on production inputs because the few-shot examples the optimizer selected don’t generalize. Add metadata to distinguish compiled vs. uncompiled runs and track this in Nexus:
trace = nexus.start_trace({
    "agent_id": "dspy-rag-program",
    "name": f"rag: {question[:60]}",
    "status": "running",
    "started_at": nexus.now(),
    "metadata": {
        "compiled": True,
        "optimizer": "MIPROv2",
        # Count few-shot demos across every predictor in the program; this
        # also covers modules that keep demos on an inner Predict.
        "num_demos": sum(len(p.demos) for p in program.predictors()),
        "model": "gpt-4o",
        "environment": os.environ.get("APP_ENV", "dev"),
        "user_id": user_id,
    },
})
With this metadata, you can filter Nexus traces to compare error rates and latency between compiled and uncompiled runs — or between programs compiled with different optimizers or training set sizes.
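If you export traces for offline analysis, the same comparison can be sketched in a few lines. This assumes trace records are available as dicts with the `status`, `latency_ms`, and `metadata` fields used in this article; the export step is not shown, and `summarize_by_compiled` is a hypothetical helper.

```python
from collections import defaultdict

def summarize_by_compiled(traces):
    """Group trace records by metadata['compiled'] and compute basic stats."""
    groups = defaultdict(list)
    for t in traces:
        compiled = bool(t.get("metadata", {}).get("compiled", False))
        groups[compiled].append(t)

    summary = {}
    for compiled, items in groups.items():
        errors = sum(1 for t in items if t["status"] == "error")
        summary[compiled] = {
            "count": len(items),
            "error_rate": errors / len(items),
            "mean_latency_ms": sum(t["latency_ms"] for t in items) / len(items),
        }
    return summary

# Made-up records for illustration only
traces = [
    {"status": "success", "latency_ms": 1200, "metadata": {"compiled": True}},
    {"status": "error", "latency_ms": 3000, "metadata": {"compiled": True}},
    {"status": "success", "latency_ms": 900, "metadata": {"compiled": False}},
]
summary = summarize_by_compiled(traces)
print(summary[True]["error_rate"])  # 0.5
```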
What to alert on
Once traces are flowing into Nexus, set up alerts for these DSPy-specific failure patterns:
- Error rate spike on the production program: if your compiled program’s error rate exceeds 5% in a 1-hour window, the few-shot examples may no longer match production inputs.
- Latency above 8s: multi-hop retrieval with ChainOfThought synthesis is slow by design; spikes above your baseline indicate retrieval bottlenecks or LLM rate limiting.
- Optimizer run with zero improvement: if 20 MIPRO iterations all score below your baseline, the training set may need expansion or the metric definition needs revisiting.
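These thresholds can be prototyped offline before you configure alerts in Nexus. The following is a rough sketch, assuming one time window of exported trace records as dicts with `status` and `latency_ms`; the record shape and both function names are hypothetical, and the code does not use any Nexus alerting API.

```python
def check_alerts(traces, max_error_rate=0.05, max_latency_ms=8000):
    """Return alert messages for one window of trace records."""
    alerts = []
    if not traces:
        return alerts
    error_rate = sum(1 for t in traces if t["status"] == "error") / len(traces)
    if error_rate > max_error_rate:
        alerts.append(f"error rate {error_rate:.1%} exceeds {max_error_rate:.0%}")
    slow = [t for t in traces if t["latency_ms"] > max_latency_ms]
    if slow:
        alerts.append(f"{len(slow)} trace(s) above {max_latency_ms} ms")
    return alerts

def optimizer_stalled(scores, baseline):
    """True when there is at least one score and every score is below the baseline."""
    return bool(scores) and all(s < baseline for s in scores)

# Made-up window for illustration only
window = [
    {"status": "error", "latency_ms": 9000},
    {"status": "success", "latency_ms": 1000},
]
for message in check_alerts(window):
    print(message)
```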
Next steps
DSPy makes prompt optimization systematic — and with Nexus traces, you make the optimization loop itself observable. You can see which iterations improve the metric, which production inputs fall outside the compiled program’s coverage, and where latency is being spent in multi-hop retrieval chains.
Sign up for a free Nexus account and start tracing your DSPy programs today.