LangGraph: Stateful Agentic Workflows
Why LangGraph?
LCEL chains are linear: they flow from left to right and terminate. Real-world agents need cycles (retry loops), branching (route to different tools), and state (remember what happened across steps). LangGraph models this as an explicit directed graph.
Core Concepts: State, Nodes, Edges
┌────────────────────────────────────────────────────┐
│                     StateGraph                     │
│                                                    │
│  START → [node_a] → [node_b] → [node_c] → END      │
│             ↑                     │                │
│             └─────────────────────┘  (cycle)       │
│                                                    │
│  State: { messages: [...], next: str, count: int } │
└────────────────────────────────────────────────────┘
Node = Python function that reads state, returns state update
Edge = connection between nodes (unconditional or conditional)
State = typed dict shared by all nodes; mutations are logged
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
# TypedDict defines the shape of the shared state
class AgentState(TypedDict):
    # add_messages is a reducer: appends new messages rather than replacing
    messages: Annotated[list[BaseMessage], add_messages]
    # Other fields replace their previous value on update
    next_action: str
    iteration: int
# MessagesState is a convenience state with just `messages` + add_messages
from langgraph.graph import MessagesState # equivalent shorthand
Without a reducer annotation, a node's returned value replaces the field. With add_messages, returned messages are appended to the list. You can write custom reducers for any merge logic (e.g., keep only the last N items, deduplicate by ID).
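For example, here is a minimal sketch of a custom reducer that caps the history at the last five messages (keep_last_five and TrimmedState are illustrative names, not part of LangGraph):
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage

# A reducer receives (current value, update) and returns the merged value
def keep_last_five(existing: list[BaseMessage], new: list[BaseMessage]) -> list[BaseMessage]:
    return (existing + new)[-5:]

class TrimmedState(TypedDict):
    messages: Annotated[list[BaseMessage], keep_last_five]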
Building Your First Agent
The classic agent pattern is ReAct: Reason (decide what to do) → Act (call a tool) → Observe (process result) → repeat. Here's a complete implementation from scratch in LangGraph:
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import BaseMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
# ── 1. State ──
class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
# ── 2. Tools ──
@tool
def search_web(query: str) -> str:
    """Search the web for current information about the given query."""
    # In production, use Tavily or SerpAPI
    return f"[Simulated result for: {query}] Python 3.12 was released in October 2023."
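# A real search tool could be swapped in here, e.g. Tavily from the
# langchain-community package (a sketch; assumes TAVILY_API_KEY is set):
#   from langchain_community.tools.tavily_search import TavilySearchResults
#   search_web = TavilySearchResults(max_results=3)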
@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression like '2 ** 32' or '100 / 7'."""
    try:
        result = eval(expression, {"__builtins__": {}})  # noqa: S307
        return str(result)
    except Exception as e:
        return f"Error: {e}"
tools = [search_web, calculator]
# ── 3. Model with tools bound ──
model = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
# ── 4. Nodes ──
def call_model(state: State) -> dict:
    """The LLM node: reason about what to do next."""
    response = model.invoke(state["messages"])
    return {"messages": [response]}  # add_messages appends this
# ToolNode handles running whichever tools the model requests
tool_node = ToolNode(tools)
# ── 5. Build graph ──
graph_builder = StateGraph(State)
graph_builder.add_node("agent", call_model)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "agent")
# Conditional edge: if the model called a tool → go to tools node
# Otherwise → end
graph_builder.add_conditional_edges(
    "agent",
    tools_condition,  # built-in: checks for tool_calls in last AIMessage
    {"tools": "tools", END: END},
)
# After tools run, always go back to agent
graph_builder.add_edge("tools", "agent")
graph = graph_builder.compile()
# ── 6. Run ──
from langchain_core.messages import HumanMessage
result = graph.invoke({
    "messages": [HumanMessage("What is 2 to the power of 32, and when was Python 3.12 released?")]
})
print(result["messages"][-1].content)
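To double-check the wiring, you can render the compiled graph's topology; get_graph().draw_mermaid() returns a Mermaid diagram as a string:
# Prints nodes (agent, tools) and edges, including the tools → agent cycle
print(graph.get_graph().draw_mermaid())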
Persistence & Checkpointing
By default, each graph.invoke() call is stateless. Adding a checkpointer makes the graph durable: the shared state is saved after every step, so runs can be paused, resumed, or rewound.
from langgraph.checkpoint.memory import MemorySaver
# Attach checkpointer at compile time
checkpointer = MemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)
# thread_id scopes the memory to one conversation / user session
config = {"configurable": {"thread_id": "user-42-session-1"}}
# Turn 1
result1 = graph.invoke(
    {"messages": [HumanMessage("My name is Alice.")]},
    config=config,
)
# Turn 2 — messages from turn 1 are automatically included
result2 = graph.invoke(
    {"messages": [HumanMessage("What's my name?")]},
    config=config,
)
print(result2["messages"][-1].content) # → "Your name is Alice."
# Inspect the full conversation history
state = graph.get_state(config)
print([m.type for m in state.values["messages"]])
# → ['human', 'ai', 'human', 'ai']
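The "rewound" part works because every step is stored as a checkpoint. A sketch of time travel via get_state_history (each snapshot's config carries the checkpoint_id to resume from):
# Snapshots are yielded newest-first
for snapshot in graph.get_state_history(config):
    print(snapshot.config["configurable"]["checkpoint_id"],
          len(snapshot.values["messages"]))

# Replay from an early checkpoint by invoking with that snapshot's config
past = list(graph.get_state_history(config))[-2]
replayed = graph.invoke(None, config=past.config)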
from langgraph.checkpoint.sqlite import SqliteSaver
# SQLite — great for development and single-instance production
with SqliteSaver.from_conn_string("./checkpoints.db") as saver:
    graph = graph_builder.compile(checkpointer=saver)
    config = {"configurable": {"thread_id": "user-42"}}
    result = graph.invoke({"messages": [HumanMessage("Hello")]}, config=config)
# For production, use PostgresSaver from langgraph-checkpoint-postgres
# (from_conn_string is likewise a context manager):
# from langgraph.checkpoint.postgres import PostgresSaver
# with PostgresSaver.from_conn_string("postgresql://user:pass@host:5432/db") as saver:
#     graph = graph_builder.compile(checkpointer=saver)
Human-in-the-Loop (HITL)
HITL lets a human review and approve or modify agent actions at a specific point in the graph. The graph pauses, a human makes a decision, then execution resumes from exactly where it stopped.
from langchain_core.messages import AIMessage
from langgraph.checkpoint.memory import MemorySaver
@tool
def delete_record(record_id: str) -> str:
    """Permanently delete a database record. This cannot be undone."""
    # ... database deletion logic
    return f"Record {record_id} deleted."

# Assume delete_record has been appended to `tools`, bound to the model,
# and the graph rebuilt as in the earlier example.
# Compile with interrupt_before the tools node
graph = graph_builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["tools"],  # pause BEFORE executing any tool call
)
config = {"configurable": {"thread_id": "admin-session-1"}}
# Agent runs, decides to call delete_record, then PAUSES
graph.invoke(
    {"messages": [HumanMessage("Delete record R-9912")]},
    config=config,
)
# Inspect the pending tool call
state = graph.get_state(config)
pending_tool = state.values["messages"][-1].tool_calls[0]
print(f"Agent wants to call: {pending_tool['name']}({pending_tool['args']})")
# Human decision: approve by resuming, or cancel by updating state
human_approved = True # In real code, show a UI prompt here
if human_approved:
    # Resume from the interrupt by passing None as the input
    result = graph.invoke(None, config=config)
    print(result["messages"][-1].content)
else:
    # Cancel: append an AIMessage as if the agent had answered without
    # tool calls; the conditional edge then routes to END instead of tools
    graph.update_state(
        config,
        {"messages": [AIMessage(content="Action cancelled by operator.")]},
        as_node="agent",
    )
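Recent LangGraph releases also offer dynamic interrupts: the interrupt() function pauses inside a node, and Command(resume=...) feeds the human's answer back in. A sketch, assuming a version that exports both from langgraph.types:
from langgraph.types import Command, interrupt

def approval_node(state: State) -> dict:
    # interrupt() pauses the run and surfaces this payload to the caller;
    # on resume it returns the value passed via Command(resume=...)
    decision = interrupt({"question": "Approve this destructive action?"})
    return {"messages": [AIMessage(content=f"Operator decision: {decision}")]}

# After the graph pauses inside approval_node, resume it with:
# graph.invoke(Command(resume="approved"), config=config)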
Streaming Agent Output
LangGraph supports multiple streaming modes so users see progress in real time rather than waiting for the full run to complete.
from langchain_core.messages import HumanMessage
config = {"configurable": {"thread_id": "stream-test"}}
input_msg = {"messages": [HumanMessage("Research quantum computing and summarise.")]}
# ── stream_mode="updates" — see each node's state delta as it runs ──
for event in graph.stream(input_msg, config, stream_mode="updates"):
    node_name = list(event.keys())[0]
    print(f"[{node_name}] updated state keys: {list(event[node_name].keys())}")
# ── stream_mode="messages" — token-by-token LLM output ──
async def stream_tokens():
    async for msg, metadata in graph.astream(
        input_msg, config, stream_mode="messages"
    ):
        if hasattr(msg, "content") and msg.content:
            print(msg.content, end="", flush=True)
import asyncio
asyncio.run(stream_tokens())
In a chat application, stream_mode="messages" gives you token-level events as the LLM generates, enabling a ChatGPT-like typing effect. stream_mode="updates" is better for showing progress indicators ("Agent is searching...") between steps.
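You can also request both modes at once by passing a list, in which case each event arrives as a (mode, payload) tuple; a sketch reusing the graph and config from above:
for mode, chunk in graph.stream(input_msg, config, stream_mode=["updates", "messages"]):
    if mode == "messages":
        token, metadata = chunk  # same (message, metadata) pairs as above
        if getattr(token, "content", None):
            print(token.content, end="", flush=True)
    else:  # "updates"
        print(f"\n[progress] node finished: {list(chunk.keys())[0]}")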