2026-06-03 · 7 min read

Observability for CrewAI Flows: Tracing State Machines and Conditional Routes with Nexus

CrewAI Flows (introduced in v0.60) bring structured, event-driven workflow orchestration to CrewAI — @start methods, @listen handlers, and @router decorators for conditional branching. Unlike unstructured Crew runs, Flows are deterministic state machines that need different observability: tracking state transitions, detecting unexpected routing branches, and recording what triggered each step. Here's how to wrap every Flow method with Nexus spans.

CrewAI Flows vs. Crews: Why They Need Different Observability

CrewAI has two distinct abstractions for multi-agent work:

Flows need different observability than Crews because the failure modes are different:

Nexus solves this by creating one span per Flow method — recording the state at each transition point, the routing decision made, and the step latency. See also: Nexus docs for CrewAI integration.

The Core Pattern: One Span Per Flow Step

The pattern is: start a trace in your @start method, store it on the flow instance, then create a span at the beginning of each @listen and @router method and end it before returning. End the trace in the terminal step.

from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
import os
from nexus_client import NexusClient

nexus = NexusClient(
    api_key=os.environ["NEXUS_API_KEY"],
    agent_id="my-crewai-flow",
)

class ContentFlowState(BaseModel):
    topic: str = ""
    research_result: str = ""
    draft: str = ""
    routed_to: str = ""

class ContentFlow(Flow[ContentFlowState]):

    @start()
    def gather_topic(self):
        trace = nexus.start_trace(
            name="flow: content-generation",
            metadata={"flow": "ContentFlow", "step": "gather_topic"},
        )
        # Store trace on the flow instance so downstream steps can access it
        self._nexus_trace = trace

        span = trace.add_span(
            name="gather_topic",
            input={"topic": self.state.topic},
        )
        # ... actual logic here ...
        self.state.topic = "AI agent observability"
        span.end(status="ok", output={"topic": self.state.topic})

    @listen(gather_topic)
    def research(self):
        span = self._nexus_trace.add_span(
            name="research",
            input={"topic": self.state.topic},
        )
        # ... call your research crew or tool here ...
        self.state.research_result = f"Research findings on {self.state.topic}"
        span.end(
            status="ok",
            output={"research_length": len(self.state.research_result)},
        )

    @router(research)
    def route_by_topic(self):
        span = self._nexus_trace.add_span(
            name="route_by_topic",
            input={"topic": self.state.topic},
        )
        if "AI" in self.state.topic:
            self.state.routed_to = "technical_draft"
            span.end(status="ok", output={"route": "technical_draft"})
            return "technical_draft"
        else:
            self.state.routed_to = "general_draft"
            span.end(status="ok", output={"route": "general_draft"})
            return "general_draft"

    @listen("technical_draft")
    def write_technical(self):
        span = self._nexus_trace.add_span(
            name="write_technical",
            input={"routed_to": self.state.routed_to},
        )
        self.state.draft = f"Technical article: {self.state.research_result}"
        span.end(status="ok", output={"draft_length": len(self.state.draft)})
        self._nexus_trace.end(status="success")

    @listen("general_draft")
    def write_general(self):
        span = self._nexus_trace.add_span(
            name="write_general",
            input={"routed_to": self.state.routed_to},
        )
        self.state.draft = f"General article: {self.state.research_result}"
        span.end(status="ok", output={"draft_length": len(self.state.draft)})
        self._nexus_trace.end(status="success")

Key points:

Detecting Routing Failures

The most dangerous failure mode in Flows is a routing failure: a @router method returns a string that no @listen handler matches. The flow halts silently — no exception, no error message, just a stopped workflow. Detecting this requires validating the routing return value before returning it:

class InstrumentedFlow(Flow[ContentFlowState]):

    @start()
    def gather_topic(self):
        trace = nexus.start_trace(
            name="flow: content-generation",
            metadata={"flow": self.__class__.__name__},
        )
        self._nexus_trace = trace
        span = trace.add_span(name="gather_topic", input={})
        try:
            # ... your logic ...
            span.end(status="ok", output={"topic": self.state.topic})
        except Exception as e:
            span.end(status="error", output={"error": str(e)})
            trace.end(status="error")
            raise

    @router(gather_topic)
    def validate_and_route(self):
        span = self._nexus_trace.add_span(
            name="validate_and_route",
            input={"topic": self.state.topic},
        )
        # Detect unexpected routing — if topic is empty, the router has
        # nowhere valid to go; record it as an error span rather than
        # letting the flow silently proceed to a wrong branch
        if not self.state.topic.strip():
            span.end(
                status="error",
                output={"error": "empty_topic", "possible_routes": ["detailed", "brief"]},
            )
            self._nexus_trace.end(status="error")
            raise ValueError("Cannot route: topic is empty")

        route = "detailed" if len(self.state.topic) > 20 else "brief"
        span.end(status="ok", output={"route": route})
        return route

This pattern ensures that every routing failure shows up as a red span in Nexus with the exact state at the time of failure — so you can reproduce and debug it.

Capturing Flow State at Transition Points

For complex flows where you need to reproduce the exact state that led to a routing decision, snapshot the flow state in the span input using Pydantic's model_dump():

# Capture full flow state at key transition points
# This lets you replay the exact state that led to a routing decision

@router(research)
def route_by_quality(self):
    # Snapshot current state in the span input
    state_snapshot = self.state.model_dump()
    span = self._nexus_trace.add_span(
        name="route_by_quality",
        input={
            "state": state_snapshot,
            "research_length": len(self.state.research_result),
        },
    )

    quality_score = len(self.state.research_result) / 100  # simplified
    if quality_score > 5:
        route = "high_quality"
    elif quality_score > 2:
        route = "needs_review"
    else:
        route = "retry_research"

    span.end(
        status="ok" if route != "retry_research" else "error",
        output={
            "route": route,
            "quality_score": quality_score,
            "state_at_decision": state_snapshot,
        },
    )
    return route

This gives you a complete audit trail: every routing decision is recorded with the full state that triggered it. If a flow takes an unexpected route in production, you can open the trace in Nexus, look at the route_by_quality span, and see exactly what the state looked like at that moment.

What You'll See in the Nexus Dashboard

Once instrumented, each Flow execution appears as a trace. The waterfall view shows the full step sequence with individual step latencies, and you can filter by route metadata to find all traces that took a specific branch. Error spans from routing failures or step exceptions stand out immediately without scanning logs.

Get Started

Install the Nexus Python client (pip install nexus-client), create a free account at nexus.keylightdigital.dev/pricing, and add the tracing pattern to your existing Flow class. The CrewAI integration guide covers both Crews and Flows with additional examples.

Ready to trace your CrewAI Flows step by step?

Start free — no credit card required. Up to 10,000 spans/month on the free tier.

Start monitoring for free →
← Back to blog