Debugging & Troubleshooting Agent Behaviour
The Debugging Mindset for Agents
Debugging agents is fundamentally different from debugging deterministic code. The same input can produce different outputs, and an agent might fail on turn 7 of a 10-turn conversation after working fine on turns 1–6. Three questions to ask every time: what exact messages did the model see at the failing step, what was in the graph state at that moment, and what did the model decide to do with that context (a tool call, a route, or a final answer)?
Adding LangSmith/Langfuse tracing, structured logging, and correlation IDs from day one is far cheaper than retrofitting them when a production incident occurs. Every hour spent on observability infrastructure saves 10 hours of future debugging.
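Turning tracing on is a three-variable change. A minimal sketch, with a placeholder API key and an illustrative project name:

import os

# Enable LangSmith tracing from day one, not after the first incident
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "lsv2_..."           # placeholder key
os.environ["LANGCHAIN_PROJECT"] = "my-enterprise-app"  # groups traces by app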
LangGraph Studio
LangGraph Studio is a visual IDE for inspecting, stepping through, and replaying LangGraph runs. It is a developer tool — not for end users — that dramatically reduces the time needed to understand agent behaviour.
LangGraph Studio features:
┌─────────────────────┬────────────────────────────┐
│ Graph View          │ State Inspector            │
│ • Visual node map   │ • Full state at each step  │
│ • Active node       │ • Field-by-field diff      │
│   highlighted       │   between checkpoints      │
├─────────────────────┼────────────────────────────┤
│ Execution Controls  │ Message Thread             │
│ • Step forward      │ • All messages rendered    │
│ • Step backward     │ • Tool calls visible       │
│ • Replay from any   │ • Streaming support        │
│   checkpoint        │                            │
└─────────────────────┴────────────────────────────┘
# Install the LangGraph CLI (the in-memory extra enables `langgraph dev`)
pip install -U "langgraph-cli[inmem]"

# Declare your graph in langgraph.json, e.g.:
#   {"dependencies": ["."], "graphs": {"agent": "./my_module.py:graph"}}

# Start the local dev server; Studio opens in your browser
langgraph dev

# The API serves at http://127.0.0.1:2024 by default
# Set LANGSMITH_API_KEY so runs are traced and Studio can attach
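Studio's time travel is also available programmatically: compile the graph with a checkpointer, run it, list the saved checkpoints, then rewind and fork from any of them.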
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage

# Compile with a checkpointer so every step of the run is saved
checkpointer = MemorySaver()
graph = your_graph.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug-session"}}

# Run the graph
result = graph.invoke({"messages": [HumanMessage("Complex query...")]}, config)

# List all saved checkpoints for this thread (newest first)
history = list(graph.get_state_history(config))
print(f"Found {len(history)} checkpoints")
for i, checkpoint in enumerate(history):
    print(f"Step {i}: source={checkpoint.metadata.get('source')}, "
          f"keys={list(checkpoint.values.keys())}")

# Rewind to the checkpoint before the bad decision; remember that
# index 0 is the most recent checkpoint, so higher indices are earlier
checkpoint_to_rewind = history[3]
rewind_config = {
    "configurable": {
        "thread_id": "debug-session",
        "checkpoint_id": checkpoint_to_rewind.config["configurable"]["checkpoint_id"],
    }
}

# Invoking with new input from an old checkpoint forks a new branch
# at that point, replaying the run with the modified prompt
replay_result = graph.invoke(
    {"messages": [HumanMessage("Same query but rephrased differently...")]},
    config=rewind_config,
)
LangSmith Trace Analysis
LangSmith records every run automatically when LANGCHAIN_TRACING_V2=true is set. Learning to read a trace efficiently is one of the highest-leverage debugging skills.
from langsmith import traceable, Client
from langchain_core.runnables import RunnableConfig

ls_client = Client()
# @traceable wraps any function and makes it appear in the trace tree
@traceable(name="rag_retrieval", tags=["rag", "retrieval"])
def retrieve_documents(query: str, k: int = 4) -> list:
"""Retrieval step — now visible as its own span in LangSmith."""
return retriever.invoke(query)[:k]
# Injecting metadata into a LangGraph run
def my_agent_node(state: dict, config: RunnableConfig) -> dict:
    # Attach custom metadata and tags; both become searchable
    # filters in the LangSmith UI and the list_runs API
response = model.invoke(
state["messages"],
config=RunnableConfig(
metadata={
"user_id": state.get("user_id"),
"session_id": state.get("session_id"),
"node_name": "my_agent_node",
"iteration": state.get("iteration", 0),
},
tags=["production", "customer-service"]
)
)
return {"messages": [response]}
# Search and filter traces programmatically
runs = ls_client.list_runs(
project_name="my-enterprise-app",
filter='and(eq(status, "error"), gte(start_time, "2025-01-01"))',
limit=50,
)
for run in runs:
print(f"{run.id}: {run.name} | error: {run.error}")
Structured Logging Inside Agents
Application-level logs complement LangSmith traces. Structure your logs as JSON so they can be queried by correlation ID across distributed systems.
import logging, json, uuid, time

# ── JSON formatter ──
class JSONFormatter(logging.Formatter):
    # Attributes present on every LogRecord; anything beyond these
    # arrived via the `extra` kwarg and should be emitted as JSON fields
    _RESERVED = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        }
        # logging merges the `extra` dict into the record's __dict__ rather
        # than storing it under one attribute, so recover it by exclusion
        log_obj.update(
            {k: v for k, v in vars(record).items() if k not in self._RESERVED}
        )
        return json.dumps(log_obj, default=str)

logger = logging.getLogger("agent")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# ── LangGraph node with structured logging ──
def instrumented_node(state: dict) -> dict:
start = time.time()
correlation_id = state.get("correlation_id", str(uuid.uuid4()))
logger.info("node_entered", extra={
"correlation_id": correlation_id,
"node": "instrumented_node",
"iteration": state.get("iteration", 0),
"message_count": len(state.get("messages", [])),
})
try:
response = model.invoke(state["messages"])
logger.info("node_completed", extra={
"correlation_id": correlation_id,
"node": "instrumented_node",
"duration_ms": int((time.time() - start) * 1000),
"output_tokens": response.usage_metadata.get("output_tokens", 0),
"has_tool_calls": bool(response.tool_calls),
})
return {"messages": [response], "correlation_id": correlation_id}
except Exception as e:
logger.error("node_failed", extra={
"correlation_id": correlation_id,
"node": "instrumented_node",
"error_type": type(e).__name__,
"error_message": str(e),
"duration_ms": int((time.time() - start) * 1000),
})
raise
State Inspection & Snapshot Diffing
Checkpoint histories pair naturally with a structural diff: comparing the saved state at two steps shows exactly which fields changed in between, which is often the fastest way to find an unexpected mutation.
import deepdiff
def diff_checkpoints(graph, config: dict, step_a: int, step_b: int) -> dict:
    """Compare state between two checkpoints to find unexpected mutations.

    Note: get_state_history returns checkpoints newest-first, so step 0
    is the most recent checkpoint and higher indices are earlier.
    """
    history = list(graph.get_state_history(config))
    state_a = history[step_a].values
    state_b = history[step_b].values
    # DeepDiff handles nested dicts, lists, and objects
    diff = deepdiff.DeepDiff(state_a, state_b, ignore_order=True)
if diff:
print(f"\nState changed from step {step_a} to step {step_b}:")
for change_type, changes in diff.items():
print(f"\n {change_type}:")
for path, val in (changes.items() if isinstance(changes, dict) else enumerate(changes)):
print(f" {path}: {val}")
else:
print("No state changes between the two checkpoints.")
return diff
# Usage: compare what changed between step 2 and step 5
diff = diff_checkpoints(graph, config, step_a=2, step_b=5)
Common Failure Patterns & Fixes
| Symptom | Root Cause | Diagnosis | Fix |
|---|---|---|---|
| Agent loops forever | Missing stopping condition or bad routing | Check tools_condition edge; inspect iteration count in state | Add MAX_ITERATIONS counter to state; add fallback END route |
| Tool called 100+ times | Model misreads tool output and calls it again | LangSmith trace — look for repeated tool_calls in messages | Improve tool docstring; add tool result validation |
| Context window overflow | Message history grows without trimming | Log len(messages) at each node | Add a message trimmer that keeps the last N messages (see the sketch after the loop-guard example below) |
| Agent ignores tool result | Tool returns wrong type; reducer not appending | Check ToolMessage is appended before next LLM call | Verify ToolMessage.tool_call_id matches AIMessage.tool_calls[n].id |
| Wrong agent selected | Supervisor prompt too vague | Trace supervisor's routing decision in LangSmith | Add examples to supervisor prompt; use structured output |
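The first fix in the table, a hard iteration cap carried in the graph state, takes only a few lines: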
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage, AIMessage
from langgraph.graph.message import add_messages
MAX_AGENT_ITERATIONS = 10
class SafeAgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
iteration: int # incremented by every agent node call
def safe_agent_node(state: SafeAgentState) -> dict:
    # Hard stop on runaway loops; pair this with a conditional edge
    # that routes to END once the safeguard message is emitted
    if state["iteration"] >= MAX_AGENT_ITERATIONS:
return {
"messages": [AIMessage(
content=f"[SafeGuard] Reached maximum iterations ({MAX_AGENT_ITERATIONS}). "
"Terminating to prevent infinite loop."
)],
"iteration": state["iteration"] + 1,
}
response = model.invoke(state["messages"])
return {
"messages": [response],
"iteration": state["iteration"] + 1,
}
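The context-overflow row gets the same treatment. Below is a minimal sketch using langchain_core's trim_messages helper; the node name, the 4000-token budget, and the reuse of SafeAgentState are illustrative assumptions, not fixed choices:

from langchain_core.messages import trim_messages

def trimming_agent_node(state: SafeAgentState) -> dict:
    # Keep only the most recent messages that fit the token budget
    trimmed = trim_messages(
        state["messages"],
        strategy="last",        # keep the tail of the conversation
        token_counter=model,    # reuse the chat model's token counting
        max_tokens=4000,        # illustrative budget; tune per model
        start_on="human",       # avoid starting mid tool-call exchange
        include_system=True,    # always retain the system prompt
    )
    response = model.invoke(trimmed)
    return {"messages": [response], "iteration": state["iteration"] + 1}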
Evaluation-Driven Debugging
When you find a bug, the first thing to do is reproduce it as a test case. This prevents regression and gives you an automated way to verify the fix.
from langsmith import Client
from langsmith.evaluation import evaluate, LangChainStringEvaluator
client = Client()
# Step 1: capture the failing query as a dataset entry
dataset = client.create_dataset("bug-regression-suite")
client.create_example(
inputs={"question": "What is our refund policy for enterprise plans?"},
outputs={"answer": "Enterprise plan refunds are handled within 14 days per section 4.2."},
dataset_id=dataset.id,
)
# Step 2: define your RAG chain as a target function
def rag_target(inputs: dict) -> dict:
answer = rag_chain.invoke(inputs["question"])
return {"answer": answer}
# Step 3: run evaluation and assert quality threshold
results = evaluate(
rag_target,
data=dataset.name,
    evaluators=[
        LangChainStringEvaluator("qa"),  # is the answer correct?
        LangChainStringEvaluator(
            "criteria",  # is the answer grounded?
            config={"criteria": {"grounded": "Does the answer cite a source?"}},
        ),
    ],
)
# Step 4: assert minimum quality in CI
# The result object is iterable; each row bundles the run, the example,
# and the evaluator feedback for that example
scores = [
    feedback.score
    for row in results
    for feedback in row["evaluation_results"]["results"]
    if feedback.score is not None
]
avg_score = sum(scores) / len(scores)
assert avg_score >= 0.8, f"Regression detected: avg score {avg_score:.2f} < 0.8"