AWS Bedrock Agents Integration
AWS Bedrock Agents lets you build managed AI agents backed by foundation models, knowledge bases, and custom action groups. This guide shows how to add full observability with Nexus: per-invocation traces, knowledge base retrieval spans, ActionGroup tool call spans, token usage tracking, and finish reason capture.
Installation
pip install keylightdigital-nexus boto3
Get your Nexus API key from Dashboard → API Keys and set environment variables:
export NEXUS_API_KEY="nxs_your_key_here"
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_DEFAULT_REGION="us-east-1"
Find your Agent ID and Alias ID in the AWS Bedrock console under Agents. Create a named alias (e.g., PROD) or use the default draft alias TSTALIASID for testing.
Basic invocation trace
invoke_agent() returns a streaming EventStream under the completion key. Consume all events to assemble the response text, then end the trace with status and metadata:
import os
import uuid

import boto3
from nexus_sdk import NexusClient

nexus = NexusClient(api_key=os.environ["NEXUS_API_KEY"])
bedrock_agent = boto3.client(
    "bedrock-agent-runtime",
    region_name=os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
)

AGENT_ID = "YOUR_AGENT_ID"
AGENT_ALIAS_ID = "YOUR_ALIAS_ID"


def invoke(prompt: str, session_id: str | None = None) -> str:
    # Reuse the caller's session ID, or start a fresh session
    sid = session_id or str(uuid.uuid4())
    trace = nexus.start_trace({
        "agent_id": "bedrock-agent",
        "name": "invoke: " + prompt[:60],
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {
            "prompt": prompt[:200],
            "agent_id": AGENT_ID,
            "alias_id": AGENT_ALIAS_ID,
            "session_id": sid,
        },
    })
    try:
        response = bedrock_agent.invoke_agent(
            agentId=AGENT_ID,
            agentAliasId=AGENT_ALIAS_ID,
            sessionId=sid,
            inputText=prompt,
        )
        full_text = ""
        # Chunk events carry no explicit stop reason; a fully drained
        # stream is recorded as a normal end of turn.
        stop_reason = "end_turn"
        num_references = 0
        for event in response["completion"]:
            if "chunk" in event:
                chunk = event["chunk"]
                full_text += chunk.get("bytes", b"").decode("utf-8")
                # Attribution lists the knowledge base citations for this chunk
                for citation in chunk.get("attribution", {}).get("citations", []):
                    num_references += len(citation.get("retrievedReferences", []))
        nexus.end_trace(trace["trace_id"], {
            "status": "success",
            "metadata": {
                "stop_reason": stop_reason,
                "num_references": num_references,
                "response_length": len(full_text),
            },
        })
        return full_text
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise
Use a stable session_id across multiple turns to maintain conversation context. Bedrock keeps the conversation history for a session until the session has been idle for longer than the agent's configured idle session timeout.
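A minimal sketch of pinning one session ID across turns. Conversation is a hypothetical helper, not part of boto3 or the Nexus SDK. It accepts any callable with the same (prompt, session_id) signature as invoke() above, so a stub stands in for the real call here:

```python
import uuid
from typing import Callable, List, Optional


class Conversation:
    """Hypothetical helper: reuses one session ID for every turn."""

    def __init__(self, invoke_fn: Callable[[str, Optional[str]], str]):
        self._invoke = invoke_fn
        self.session_id = str(uuid.uuid4())  # generated once, reused per turn
        self.turns = 0

    def ask(self, prompt: str) -> str:
        self.turns += 1
        return self._invoke(prompt, self.session_id)


# Stub standing in for invoke(); it records the session IDs it receives.
seen_sessions: List[Optional[str]] = []


def fake_invoke(prompt: str, session_id: Optional[str] = None) -> str:
    seen_sessions.append(session_id)
    return "echo: " + prompt


chat = Conversation(fake_invoke)
first = chat.ask("What is our refund policy?")
second = chat.ask("Does it apply to digital goods?")
```

In real use you would pass invoke instead of fake_invoke. Both turns then reach Bedrock with the same sessionId, so the second question can refer back to the first.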
Knowledge base retrieval spans
When an agent queries a knowledge base, Bedrock emits trace events in the EventStream. Parse these to record each retrieval as a Nexus span, capturing the query, number of results, and source documents:
def invoke_with_kb_spans(prompt: str) -> str:
    sid = str(uuid.uuid4())
    trace = nexus.start_trace({
        "agent_id": "bedrock-agent-kb",
        "name": "invoke: " + prompt[:60],
        "status": "running",
        "started_at": nexus.now(),
    })
    kb_span = None  # the retrieval span that is currently open, if any
    try:
        response = bedrock_agent.invoke_agent(
            agentId=AGENT_ID,
            agentAliasId=AGENT_ALIAS_ID,
            sessionId=sid,
            inputText=prompt,
            enableTrace=True,  # required to receive trace events
        )
        full_text = ""
        for event in response["completion"]:
            if "chunk" in event:
                full_text += event["chunk"].get("bytes", b"").decode("utf-8")
            elif "trace" in event:
                orch = event["trace"].get("trace", {}).get("orchestrationTrace", {})
                # Knowledge base retrieval input (nested under invocationInput)
                kb_input = orch.get("invocationInput", {}).get("knowledgeBaseLookupInput")
                if kb_input:
                    kb_span = nexus.start_span(trace["trace_id"], {
                        "name": "kb:lookup",
                        "type": "retrieval",
                        "metadata": {
                            "kb_id": kb_input.get("knowledgeBaseId"),
                            "query": kb_input.get("text", "")[:200],
                        },
                    })
                # Knowledge base retrieval output (nested under observation)
                kb_output = orch.get("observation", {}).get("knowledgeBaseLookupOutput")
                if kb_output and kb_span is not None:
                    refs = kb_output.get("retrievedReferences", [])
                    nexus.end_span(kb_span["id"], {
                        "output": str(len(refs)) + " references retrieved",
                        "metadata": {
                            "num_references": len(refs),
                            "sources": [
                                r.get("location", {}).get("s3Location", {}).get("uri", "")
                                for r in refs[:5]
                            ],
                        },
                    })
                    kb_span = None  # never end the same span twice
        nexus.end_trace(trace["trace_id"], {"status": "success"})
        return full_text
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise
Pass enableTrace=True to invoke_agent() to receive orchestration trace events. Without it, knowledgeBaseLookupInput and knowledgeBaseLookupOutput events are suppressed.
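The reference handling can be pulled into a small pure function, which makes it easy to test without calling Bedrock. This is a sketch: summarize_references is a hypothetical name, and the sample references are invented to mirror the fields the code above reads (location.s3Location.uri). References stored outside S3 are assumed to lack that field and are skipped from the source list.

```python
from typing import Any, Dict, List


def summarize_references(refs: List[Dict[str, Any]], limit: int = 5) -> Dict[str, Any]:
    """Reduce a retrievedReferences list to span-friendly metadata."""
    sources = []
    for ref in refs[:limit]:
        uri = ref.get("location", {}).get("s3Location", {}).get("uri")
        if uri:  # skip references that have no S3 location
            sources.append(uri)
    return {"num_references": len(refs), "sources": sources}


# Invented sample data for illustration only.
sample_refs = [
    {
        "content": {"text": "Refunds are accepted within 30 days."},
        "location": {"type": "S3", "s3Location": {"uri": "s3://docs/refunds.md"}},
    },
    {
        "content": {"text": "Snippet from a crawled page."},
        "location": {"type": "WEB"},
    },
]
summary = summarize_references(sample_refs)
```

The returned dict can be passed as the metadata of end_span(), which keeps empty strings out of the sources list when a reference has no S3 URI.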
ActionGroup tool call spans
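ActionGroups are the Bedrock equivalent of function calling. What you see in the stream depends on how the action group is fulfilled. An action group configured to return control emits a returnControl event and pauses the agent until your application supplies the result. An action group backed by a Lambda function is executed by Bedrock itself, and its input and output surface as orchestration trace events. Record each invocation as a span: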
def invoke_with_action_spans(prompt: str) -> str:
    sid = str(uuid.uuid4())
    trace = nexus.start_trace({
        "agent_id": "bedrock-agent-actions",
        "name": "invoke: " + prompt[:60],
        "status": "running",
        "started_at": nexus.now(),
    })
    action_span = None  # open span for an action Bedrock is executing
    try:
        response = bedrock_agent.invoke_agent(
            agentId=AGENT_ID,
            agentAliasId=AGENT_ALIAS_ID,
            sessionId=sid,
            inputText=prompt,
            enableTrace=True,
        )
        full_text = ""
        for event in response["completion"]:
            if "chunk" in event:
                full_text += event["chunk"].get("bytes", b"").decode("utf-8")
            elif "returnControl" in event:
                # Return-control ActionGroup: the agent is paused, waiting for
                # the caller to submit results in a follow-up invoke_agent()
                rc = event["returnControl"]
                invocation_id = rc.get("invocationId", "")
                for inv in rc.get("invocationInputs", []):
                    # Each input is either a function call or an API call
                    call = inv.get("functionInvocationInput") or inv.get("apiInvocationInput") or {}
                    name = call.get("function") or call.get("apiPath") or "unknown"
                    span = nexus.start_span(trace["trace_id"], {
                        "name": "action:" + name,
                        "type": "tool",
                        "metadata": {
                            "action_group": call.get("actionGroup"),
                            "function": name,
                            "invocation_id": invocation_id,
                            "parameters": call.get("parameters", []),
                        },
                    })
                    # This function does not run the action, so close the span now
                    nexus.end_span(span["id"], {
                        "output": "control returned to caller",
                        "metadata": {"invocation_id": invocation_id},
                    })
            elif "trace" in event:
                orch = event["trace"].get("trace", {}).get("orchestrationTrace", {})
                # Lambda-backed action: the input arrives under invocationInput
                ag_input = orch.get("invocationInput", {}).get("actionGroupInvocationInput")
                # Return-control inputs are already recorded in the branch above
                if ag_input and ag_input.get("executionType") != "RETURN_CONTROL":
                    name = ag_input.get("function") or ag_input.get("apiPath") or "unknown"
                    action_span = nexus.start_span(trace["trace_id"], {
                        "name": "action:" + name,
                        "type": "tool",
                        "metadata": {
                            "action_group": ag_input.get("actionGroupName"),
                            "function": name,
                            "parameters": ag_input.get("parameters", []),
                        },
                    })
                # The matching output arrives under observation
                action_output = orch.get("observation", {}).get("actionGroupInvocationOutput")
                if action_output and action_span is not None:
                    nexus.end_span(action_span["id"], {
                        "output": action_output.get("text", "")[:500],
                        "metadata": {"action_group_output": True},
                    })
                    action_span = None  # never end the same span twice
        nexus.end_trace(trace["trace_id"], {"status": "success"})
        return full_text
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise
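When an action group returns control, run the action in your application and submit the result with a second invoke_agent() call on the same session, passing sessionState.invocationId and sessionState.returnControlInvocationResults. Lambda-backed action groups are executed by Bedrock, so their results appear directly in trace events without a round-trip.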
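The follow-up call takes a sessionState payload. Below is a sketch of building it for function-style actions. build_return_control_state and the sample values are hypothetical, and the field names (functionResult, responseBody, TEXT) follow my reading of the InvokeAgent request shape, so check them against the API reference before relying on them:

```python
from typing import Any, Dict, List


def build_return_control_state(
    invocation_id: str, results: List[Dict[str, str]]
) -> Dict[str, Any]:
    """Build the sessionState dict for the follow-up invoke_agent() call.

    Each item in `results` is a plain dict with the keys
    "action_group", "function", and "body" (an assumed local convention).
    """
    return {
        "invocationId": invocation_id,
        "returnControlInvocationResults": [
            {
                "functionResult": {
                    "actionGroup": r["action_group"],
                    "function": r["function"],
                    "responseBody": {"TEXT": {"body": r["body"]}},
                }
            }
            for r in results
        ],
    }


# Invented sample values for illustration only.
state = build_return_control_state(
    "inv-123",
    [{"action_group": "orders", "function": "get_order_status", "body": "shipped"}],
)

# Follow-up call on the same session (not executed here):
# bedrock_agent.invoke_agent(
#     agentId=AGENT_ID,
#     agentAliasId=AGENT_ALIAS_ID,
#     sessionId=sid,
#     sessionState=state,
# )
```

The invocation_id must be the invocationId from the returnControl event, and the follow-up call must reuse the original sessionId so Bedrock can resume the paused turn.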
Token usage monitoring
Token usage is embedded in the orchestration trace under modelInvocationOutput.metadata.usage. Accumulate these across all model invocations in a single agent call to get total prompt and completion tokens:
def invoke_with_token_tracking(prompt: str) -> str:
    sid = str(uuid.uuid4())
    trace = nexus.start_trace({
        "agent_id": "bedrock-agent-tokens",
        "name": "invoke: " + prompt[:60],
        "status": "running",
        "started_at": nexus.now(),
        "metadata": {"prompt": prompt[:200]},
    })
    total_input_tokens = 0
    total_output_tokens = 0
    try:
        response = bedrock_agent.invoke_agent(
            agentId=AGENT_ID,
            agentAliasId=AGENT_ALIAS_ID,
            sessionId=sid,
            inputText=prompt,
            enableTrace=True,
        )
        full_text = ""
        for event in response["completion"]:
            if "chunk" in event:
                full_text += event["chunk"].get("bytes", b"").decode("utf-8")
            elif "trace" in event:
                orch = event["trace"].get("trace", {}).get("orchestrationTrace", {})
                # Usage is reported once per model invocation
                model_output = orch.get("modelInvocationOutput", {})
                usage = model_output.get("metadata", {}).get("usage", {})
                total_input_tokens += usage.get("inputTokens", 0)
                total_output_tokens += usage.get("outputTokens", 0)
        nexus.end_trace(trace["trace_id"], {
            "status": "success",
            "metadata": {
                "input_tokens": total_input_tokens,
                "output_tokens": total_output_tokens,
                "total_tokens": total_input_tokens + total_output_tokens,
            },
        })
        return full_text
    except Exception as e:
        nexus.end_trace(trace["trace_id"], {
            "status": "error",
            "metadata": {"error": str(e)},
        })
        raise
A single invoke_agent() call may trigger multiple model invocations internally (one per reasoning step). Accumulate across all modelInvocationOutput events to get the true total for the request.
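To make the accumulation concrete, here is a sketch that sums usage over a list of already-collected events. sum_token_usage and _usage_event are hypothetical helpers, and the token counts are invented; the event layout mirrors the path read by the code above.

```python
from typing import Any, Dict, Iterable, Tuple


def sum_token_usage(events: Iterable[Dict[str, Any]]) -> Tuple[int, int]:
    """Return (input_tokens, output_tokens) summed over all trace events."""
    total_in = 0
    total_out = 0
    for event in events:
        orch = event.get("trace", {}).get("trace", {}).get("orchestrationTrace", {})
        usage = orch.get("modelInvocationOutput", {}).get("metadata", {}).get("usage", {})
        # Events without usage (chunks, rationale, and so on) contribute zero
        total_in += usage.get("inputTokens", 0)
        total_out += usage.get("outputTokens", 0)
    return total_in, total_out


def _usage_event(input_tokens: int, output_tokens: int) -> Dict[str, Any]:
    """Build a fake trace event carrying one model invocation's usage."""
    usage = {"inputTokens": input_tokens, "outputTokens": output_tokens}
    model_output = {"metadata": {"usage": usage}}
    return {"trace": {"trace": {"orchestrationTrace": {"modelInvocationOutput": model_output}}}}


# Two reasoning steps with a text chunk in between (invented numbers).
events = [
    _usage_event(1200, 80),
    {"chunk": {"bytes": b"partial answer"}},
    _usage_event(1500, 140),
]
totals = sum_token_usage(events)
```

Here the two model invocations add up to 2700 input tokens and 220 output tokens for one agent call, which is the figure to compare against billing rather than either step alone.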
TypeScript
Use @aws-sdk/client-bedrock-agent-runtime with the Nexus TypeScript SDK. The async iterator on response.completion streams events just like the Python EventStream:
npm install @keylightdigital/nexus @aws-sdk/client-bedrock-agent-runtime
import {
  BedrockAgentRuntimeClient,
  InvokeAgentCommand,
} from '@aws-sdk/client-bedrock-agent-runtime';
import NexusClient from '@keylightdigital/nexus';
import { randomUUID } from 'crypto';

const nexus = new NexusClient(process.env.NEXUS_API_KEY!);
const client = new BedrockAgentRuntimeClient({
  region: process.env.AWS_DEFAULT_REGION ?? 'us-east-1',
});

const AGENT_ID = process.env.BEDROCK_AGENT_ID!;
const AGENT_ALIAS_ID = process.env.BEDROCK_AGENT_ALIAS_ID!;

async function invoke(prompt: string, sessionId?: string): Promise<string> {
  const sid = sessionId ?? randomUUID();
  const trace = await nexus.startTrace({
    agent_id: 'bedrock-agent-ts',
    name: 'invoke: ' + prompt.slice(0, 60),
    status: 'running',
    started_at: new Date().toISOString(),
    metadata: { prompt: prompt.slice(0, 200), session_id: sid },
  });

  let inputTokens = 0;
  let outputTokens = 0;
  let fullText = '';

  try {
    const response = await client.send(
      new InvokeAgentCommand({
        agentId: AGENT_ID,
        agentAliasId: AGENT_ALIAS_ID,
        sessionId: sid,
        inputText: prompt,
        enableTrace: true,
      }),
    );

    for await (const event of response.completion ?? []) {
      // Response text arrives as byte chunks
      if (event.chunk?.bytes) {
        fullText += Buffer.from(event.chunk.bytes).toString('utf-8');
      }
      // Token usage arrives in orchestration trace events
      if (event.trace?.trace?.orchestrationTrace) {
        const orch = event.trace.trace.orchestrationTrace;
        const usage = orch.modelInvocationOutput?.metadata?.usage;
        if (usage) {
          inputTokens += usage.inputTokens ?? 0;
          outputTokens += usage.outputTokens ?? 0;
        }
      }
    }

    await nexus.endTrace(trace.id, {
      status: 'success',
      metadata: {
        input_tokens: inputTokens,
        output_tokens: outputTokens,
        total_tokens: inputTokens + outputTokens,
      },
    });
    return fullText;
  } catch (err: unknown) {
    const msg = err instanceof Error ? err.message : String(err);
    await nexus.endTrace(trace.id, { status: 'error', metadata: { error: msg } });
    throw err;
  }
}
Debugging patterns
EventStream closes before full response
Iterate the completion EventStream inside a try/except block. If you break out of the loop early or don't consume all events, the stream closes and the agent invocation may be billed but the response truncated. Always drain the full stream before calling end_trace().
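One way to guarantee a full drain is to keep the loop free of break and return statements and to collect everything before doing any other work. The sketch below uses a hypothetical drain_completion helper and a fake in-memory stream in place of response["completion"]:

```python
from typing import Any, Dict, Iterable, List, Tuple


def drain_completion(
    stream: Iterable[Dict[str, Any]]
) -> Tuple[str, List[Dict[str, Any]]]:
    """Consume every event; return the response text and the trace events."""
    parts: List[bytes] = []
    traces: List[Dict[str, Any]] = []
    for event in stream:  # no break or return inside the loop
        if "chunk" in event:
            parts.append(event["chunk"].get("bytes", b""))
        elif "trace" in event:
            traces.append(event["trace"])
    # Join the raw bytes first so a multi-byte character split across
    # two chunks still decodes correctly.
    return b"".join(parts).decode("utf-8"), traces


# Fake stream standing in for response["completion"].
fake_stream = iter([
    {"chunk": {"bytes": b"Hello, "}},
    {"trace": {"trace": {"orchestrationTrace": {}}}},
    {"chunk": {"bytes": b"world"}},
])
text, traces = drain_completion(fake_stream)
```

After the helper returns, the iterator is exhausted, so it is safe to call end_trace() and process the collected trace events afterwards.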
No trace events appearing
Token usage and knowledge base events only appear when enableTrace=True is passed; without it the stream yields only chunk events. enableTrace is a per-request parameter, so check every call site that invokes the agent, including retries and follow-up calls that submit return-control results.
returnControl event not consumed
If your agent uses an action group configured to return control and you receive a returnControl event, you must submit results back by calling invoke_agent() again with sessionState.invocationId and sessionState.returnControlInvocationResults. Ignoring it leaves the agent session in a pending state that eventually times out.
High token counts per invocation
Bedrock Agents include the full agent instruction, conversation history, and knowledge base context in every model invocation. If input_tokens spikes, check whether the session history is growing unbounded. Start a new session periodically or use sessionState.sessionAttributes to pass only the context needed.
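A sketch of periodic session rotation driven by the input token count. SessionRotator is a hypothetical helper, and the 8000-token budget is an arbitrary example value to tune against your model's context window and cost targets:

```python
import uuid


class SessionRotator:
    """Hypothetical helper: start a new session once input tokens pass a budget."""

    def __init__(self, max_input_tokens: int = 8000):
        self.max_input_tokens = max_input_tokens
        self.session_id = str(uuid.uuid4())

    def record(self, input_tokens: int) -> bool:
        """Record the last call's input tokens; return True if the session rotated."""
        if input_tokens > self.max_input_tokens:
            self.session_id = str(uuid.uuid4())  # next call starts with empty history
            return True
        return False


rotator = SessionRotator(max_input_tokens=8000)
first_session = rotator.session_id

rotated_small = rotator.record(3000)   # under budget: keep the session
same_session = rotator.session_id == first_session

rotated_large = rotator.record(9500)   # over budget: switch to a new session
```

Pass rotator.session_id as the sessionId of each call and feed record() with the input token total from the trace. Rotation discards the conversation history, so carry over anything the agent still needs through sessionState.sessionAttributes.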