Docs AG2

AG2 (AutoGen v2) Integration

Add distributed tracing to AG2 multi-agent conversations. Instrument ConversableAgent turns, GroupChat coordination, and function tool calls to debug handoff bugs, context drift, and silent tool failures.

Installation

pip install keylightdigital-nexus ag2

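Get your API key from Dashboard → API Keys and export it as NEXUS_API_KEY. The examples below read it from the environment, together with OPENAI_API_KEY for the model.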

Basic conversation trace

Wrap an entire initiate_chat call in a Nexus trace to capture the full conversation as a single observable unit:

import os
import ag2
from nexus_sdk import NexusClient

# Nexus client shared by every example; the key is read from the environment.
nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])

# LLM settings reused by all agents: one gpt-4o entry keyed by OPENAI_API_KEY.
llm_config = dict(
    config_list=[
        dict(model="gpt-4o", api_key=os.environ["OPENAI_API_KEY"]),
    ],
)

# NOTE(review): the ag2 distribution is commonly imported as `autogen` —
# confirm that `import ag2` resolves in your environment.
assistant = ag2.AssistantAgent(
    name="assistant",
    llm_config=llm_config,
)

# The proxy side of the conversation: no human input, at most five
# consecutive automatic replies.
user_proxy = ag2.UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=5,
)

def run_conversation(query: str) -> str:
    """Run one user_proxy -> assistant chat inside a single Nexus trace.

    A trace is opened before the chat starts. It is closed with status
    "success" once the chat has finished, or with status "error" (plus the
    exception text) if the chat raises.

    Args:
        query: The message that starts the conversation. Only the first 60
            characters are used in the trace name and the first 200 in the
            trace metadata.

    Returns:
        The content of the assistant's last message, or "" when there is no
        last message or its content is missing or None.

    Raises:
        Exception: Whatever the chat raised, re-raised after the trace has
            been closed with status "error".
    """
    trace = nexus.start_trace({
        "agent_id": "ag2-assistant",
        "name": f"conversation: {query[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "query": query[:200],
            "environment": os.environ.get("APP_ENV", "dev"),
            "model": "gpt-4o",
        },
    })
    trace_id = trace["trace_id"]

    try:
        user_proxy.initiate_chat(
            assistant,
            message=query,
            max_turns=10,
        )
        last_message = assistant.last_message()
    except Exception as e:
        nexus.end_trace(trace_id, {"status": "error", "metadata": {"error": str(e)}})
        raise
    else:
        # Closed outside the try body so that a failure while reporting
        # success is not re-reported as a failed conversation.
        nexus.end_trace(trace_id, {"status": "success"})

    # Honour the declared str return type even when there is no usable text.
    content = (last_message or {}).get("content")
    return content if content is not None else ""

Per-message spans with reply hook

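Use AG2's reply hook to record each agent turn as a Nexus span. This gives you a span per message in the conversation waterfall. The snippet assumes trace_id already holds the ID returned by nexus.start_trace(...) for the conversation being recorded: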

def make_reply_hook(nexus: NexusClient, trace_id: str, agent_name: str):
    """Build a reply hook that records one Nexus span per agent turn.

    Args:
        nexus: Client used to open and close the spans.
        trace_id: ID of the trace the spans are attached to.
        agent_name: Name of the agent the hook is registered on; it appears
            in the span name and in the span metadata.

    Returns:
        A callable ``hook(reply, sender, config)``. For a ``None`` reply it
        returns ``(False, None)`` without recording anything; otherwise it
        records a span and returns ``(False, reply)`` with the reply
        unchanged.
    """
    def hook(reply: str | dict | None, sender: ag2.ConversableAgent,
             config: dict | None) -> tuple[bool, str | dict | None]:
        # NOTE(review): AG2 documents functions registered through
        # register_reply as (recipient, messages, sender, config) — confirm
        # this three-argument signature against the AG2 version in use.
        if reply is None:
            return False, None

        raw = reply if isinstance(reply, str) else reply.get("content")
        # The content of a dict reply may be missing, None, or not a string;
        # normalise it so len() and slicing below cannot fail.
        if raw is None:
            content = ""
        elif isinstance(raw, str):
            content = raw
        else:
            content = str(raw)

        span = nexus.start_span(trace_id, {
            "name": f"turn:{agent_name}",
            "type": "llm",
            "metadata": {
                "agent": agent_name,
                "sender": sender.name if sender else "unknown",
                "reply_length": len(content),
            },
        })
        # Only the first 500 characters are stored as the span output.
        nexus.end_span(span["id"], {"output": content[:500]})
        # The reply object itself is handed back untouched.
        return False, reply
    return hook

# Attach a span-recording hook to each agent. trace_id must already hold the
# ID of the trace these spans belong to.
for agent, label in ((assistant, "assistant"), (user_proxy, "user_proxy")):
    agent.register_reply(
        [ag2.ConversableAgent, None],
        make_reply_hook(nexus, trace_id, label),
        position=0,
    )

GroupChat tracing

For GroupChat workflows, record which agent was selected at each turn as span metadata:

# One system prompt per role; the agents are created in this order.
role_prompts = {
    "researcher": "You find and cite facts. Be concise.",
    "writer": "You write clear summaries based on facts provided.",
    "critic": "You review summaries for accuracy. Flag any errors.",
}
researcher, writer, critic = (
    ag2.AssistantAgent(name=role, llm_config=llm_config, system_message=prompt)
    for role, prompt in role_prompts.items()
)

# The group chat starts with an empty history and is capped at 12 rounds.
groupchat = ag2.GroupChat(
    agents=[researcher, writer, critic],
    messages=[],
    max_round=12,
)
manager = ag2.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

def run_groupchat(task: str) -> str:
    """Run the group chat on ``task`` inside a single Nexus trace.

    While the chat runs, ``groupchat.select_speaker`` is temporarily replaced
    by a wrapper that records one span per speaker selection (round number,
    previous speaker, selected speaker). The original selector is put back
    when the chat ends, whether it succeeded or raised, so repeated calls do
    not stack wrappers or write spans into an earlier trace.

    Args:
        task: The message that starts the group chat. Only the first 60
            characters are used in the trace name; its length is stored in
            the trace metadata.

    Returns:
        The content of the last group chat message, or "" when there are no
        messages or the last message has no content.

    Raises:
        Exception: Whatever the chat raised, re-raised after the trace has
            been closed with status "error".
    """
    trace = nexus.start_trace({
        "agent_id": "ag2-groupchat",
        "name": f"groupchat: {task[:60]}",
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "agents": ["researcher", "writer", "critic"],
            "max_round": 12,
            "task_length": len(task),
        },
    })
    trace_id = trace["trace_id"]

    round_number = 0
    original_select = groupchat.select_speaker

    def traced_select(last_speaker, selector):
        """Delegate to the original selector and record the outcome as a span."""
        nonlocal round_number
        result = original_select(last_speaker, selector)
        round_number += 1
        span = nexus.start_span(trace_id, {
            "name": f"groupchat:round_{round_number}",
            "type": "tool",
            "metadata": {
                "round": round_number,
                "last_speaker": last_speaker.name if last_speaker else "none",
                "selected": result.name if result else "none",
            },
        })
        nexus.end_span(span["id"], {"output": result.name if result else "done"})
        return result

    groupchat.select_speaker = traced_select

    try:
        user_proxy.initiate_chat(manager, message=task, max_turns=12)
    except Exception as e:
        nexus.end_trace(trace_id, {"status": "error", "metadata": {"error": str(e)}})
        raise
    else:
        # Closed outside the try body so that a failure while reporting
        # success is not re-reported as a failed chat.
        nexus.end_trace(trace_id, {"status": "success"})
    finally:
        # Always undo the patch; otherwise the next call would wrap this
        # wrapper and keep emitting spans for this (already closed) trace.
        groupchat.select_speaker = original_select

    if not groupchat.messages:
        return ""
    # Honour the declared str return type even when the content is absent.
    content = groupchat.messages[-1].get("content")
    return content if content is not None else ""

Tool call spans

Wrap AG2 function tools with Nexus spans to capture tool inputs, outputs, and errors:

def traced_tool(nexus: NexusClient, trace_id: str, tool_name: str):
    """Return a decorator that wraps a tool function in a Nexus span.

    Each call of the decorated function opens a span, runs the function, and
    closes the span with either the (truncated) result or the error text.

    Args:
        nexus: Client used to open and close the spans.
        trace_id: ID of the trace the spans are attached to.
        tool_name: Name recorded in the span name and metadata.

    Returns:
        A decorator. The wrapped function keeps the original's name,
        docstring and signature metadata, returns the original's result, and
        re-raises any exception the original raises after recording it.
    """
    import functools  # local import keeps this snippet self-contained

    def decorator(fn):
        # functools.wraps copies __name__, __doc__ and the rest of the
        # function metadata, and sets __wrapped__ for signature inspection.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            span = nexus.start_span(trace_id, {
                "name": f"tool:{tool_name}",
                "type": "tool",
                "metadata": {
                    "tool": tool_name,
                    "args": str(args)[:200],
                    # Without this, a tool invoked with keyword arguments
                    # only would be recorded with empty inputs.
                    "kwargs": str(kwargs)[:200],
                },
            })
            try:
                result = fn(*args, **kwargs)
            except Exception as e:
                nexus.end_span(span["id"], {"error": str(e)})
                raise
            # Reported outside the try body so that a failure while closing
            # the span is not mistaken for a tool failure.
            nexus.end_span(span["id"], {"output": str(result)[:500]})
            return result
        return wrapper
    return decorator

# Usage: trace_id must already be in scope when the decorator is applied,
# because traced_tool closes over it.
@traced_tool(nexus, trace_id, "web_search")
def web_search(query: str) -> str:
    """Search the web for information."""
    # Placeholder result; a real tool would perform the search here.
    results = "Search results for: {}".format(query)
    return results

# Make the traced tool callable by name from the proxy agent.
tool_map = {"web_search": web_search}
user_proxy.register_function(tool_map)

Debugging patterns

Context drift over long conversations

If early spans show concise replies but later spans show wandering output, the conversation history is growing too large for the model context. Track reply_length across turns — a spike means the model is padding.

Stuck GroupChat round-robin

If the same agent is selected every round, check the selected field in the metadata of the GroupChat round spans. A stuck selector often means the system prompts are ambiguous about agent roles.

Silent tool failures

A tool span that has an error field with no corresponding error in the conversation means AG2 swallowed the exception. Raise explicit, descriptive errors in your tool functions and check the span's error field.

Ready to instrument your AG2 agents?

Start for free — no credit card required. See traces in under 5 minutes.

Start free →