Streaming Interactions with LLMs
1. Why Streaming?
Without streaming, the user waits for the entire LLM response before seeing anything. With streaming, the first tokens typically appear within a few hundred milliseconds, dramatically improving perceived responsiveness.
| Metric | Non-Streaming | Streaming |
|---|---|---|
| Time-to-first-token (TTFT) | Equal to total latency | ~200ms (model dependent) |
| Perceived responsiveness | Long blank wait | Response appears to "think" |
| Timeout risk | Higher (1 long request) | Lower (stream kept alive) |
| Implementation complexity | Simple | Moderate |
| Frontend requirements | None | SSE or WebSocket handling |
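A quick way to internalize the difference is to measure time-to-first-token yourself. A minimal sketch, assuming ChatOpenAI with a valid OpenAI API key; absolute numbers will vary by model and network:
import time
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
question = "Explain streaming in two sentences."

# Non-streaming: nothing is visible until the whole response has arrived.
start = time.perf_counter()
llm.invoke(question)
print(f"non-streaming total latency: {time.perf_counter() - start:.2f}s")

# Streaming: note how much sooner the first chunk arrives.
start = time.perf_counter()
for i, chunk in enumerate(llm.stream(question)):
    if i == 0:
        print(f"time-to-first-token: {time.perf_counter() - start:.2f}s")
print(f"streaming total latency: {time.perf_counter() - start:.2f}s")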
2. LangChain Streaming
All LangChain runnables support .stream() (sync) and .astream() (async). Chat models, and chains that end in one, yield AIMessageChunk objects containing partial content.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a concise technical writer."),
("human", "{topic}"),
])
chain = prompt | llm
# Sync streaming
for chunk in chain.stream({"topic": "Explain LangGraph in 3 sentences."}):
print(chunk.content, end="", flush=True)
print() # newline
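The chunks are more than plain strings: AIMessageChunk supports the + operator, so accumulating them reconstructs the full message (content plus metadata such as tool call fragments). A small sketch reusing the chain above:
# Accumulate chunks into a single AIMessageChunk while still printing tokens live.
full = None
for chunk in chain.stream({"topic": "Explain LangGraph in 3 sentences."}):
    full = chunk if full is None else full + chunk
    print(chunk.content, end="", flush=True)
print()
print(type(full).__name__, len(full.content))  # AIMessageChunk and total text length
The same chain can also be streamed asynchronously with .astream(), which is what you will typically use inside an async web framework: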
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{question}"),
])
chain = prompt | llm
async def stream_response(question: str):
full_response = ""
async for chunk in chain.astream({"question": question}):
token = chunk.content
full_response += token
print(token, end="", flush=True)
print()
return full_response
asyncio.run(stream_response("What are the key features of LangGraph?"))
For finer-grained visibility, .astream_events() emits a structured event for every step of the chain rather than only LLM tokens:
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = (
ChatPromptTemplate.from_messages([("human", "{q}")]) | llm
).with_config({"run_name": "my-chain"})
async def stream_events():
async for event in chain.astream_events({"q": "Explain RAG"}, version="v2"):
kind = event["event"]
if kind == "on_chat_model_stream":
# Token-level events from the LLM
chunk = event["data"]["chunk"]
print(chunk.content, end="", flush=True)
elif kind == "on_chain_start":
print(f"\n[START: {event['name']}]")
elif kind == "on_chain_end":
print(f"\n[END: {event['name']}]")
asyncio.run(stream_events())
version="v2" — the v1 format is deprecated. Events include chain start/end, LLM token chunks, tool call events, and retriever events, giving you full observability over the streaming pipeline.
3. LangGraph Streaming Modes
LangGraph compiled graphs expose a .stream() method whose stream_mode argument controls what gets yielded at each step.
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, BaseMessage
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
def agent(state: State) -> State:
return {"messages": [llm.invoke(state["messages"])]}
graph = StateGraph(State)
graph.add_node("agent", agent)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile()
initial = {"messages": [HumanMessage(content="Write a haiku about LangGraph.")]}
# --- stream_mode="values" — full state after every node ---
print("=== values mode ===")
for state in app.stream(initial, stream_mode="values"):
print(state["messages"][-1].content)
# --- stream_mode="updates" — only the delta from each node ---
print("\n=== updates mode ===")
for update in app.stream(initial, stream_mode="updates"):
for node_name, delta in update.items():
print(f"[{node_name}] {delta}")
# --- stream_mode="messages" — individual LLM token chunks ---
print("\n=== messages mode (tokens) ===")
for chunk, metadata in app.stream(initial, stream_mode="messages"):
if hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)
print()
When to use each mode:
- values — UI state sync (show the current full state after each node)
- updates — debugging (see exactly what each node changed)
- messages — real-time token rendering in chat UIs
- events — full observability (combine with astream_events)
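You can also request several modes in one call by passing a list, in which case each yielded item is a (mode, payload) tuple. A short sketch, assuming the compiled app from above:
# Combine per-node updates with token streaming in a single pass.
for mode, payload in app.stream(initial, stream_mode=["updates", "messages"]):
    if mode == "messages":
        chunk, metadata = payload
        if chunk.content:
            print(chunk.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n[update from: {list(payload.keys())}]")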
4. Server-Sent Events (SSE) with FastAPI
SSE is the simplest way to push streaming LLM tokens to a browser over HTTP/1.1. No WebSocket upgrade is needed — it's a standard GET/POST that stays open.
pip install fastapi uvicorn sse-starlette langchain-openai
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{question}"),
])
chain = prompt | llm
class ChatRequest(BaseModel):
question: str
async def token_generator(question: str):
async for chunk in chain.astream({"question": question}):
token = chunk.content
if token:
yield f"data: {token}\n\n" # SSE format
yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
return StreamingResponse(
token_generator(request.question),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # disable nginx buffering
},
)
# Run: uvicorn sse_server:app --reload
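The sse-starlette package from the install step can handle the data: framing and periodic keep-alive pings for you. A sketch of the same endpoint using EventSourceResponse; treat details such as ping interval defaults as version-dependent:
from sse_starlette.sse import EventSourceResponse

async def plain_token_generator(question: str):
    # EventSourceResponse adds the "data:" prefix and blank-line separators itself.
    async for chunk in chain.astream({"question": question}):
        if chunk.content:
            yield chunk.content
    yield "[DONE]"

@app.post("/chat/stream-sse")
async def stream_chat_sse(request: ChatRequest):
    return EventSourceResponse(plain_token_generator(request.question))
On the browser side, the stream from the /chat/stream endpoint can be consumed with fetch and a streaming reader: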
<div id="output"></div>
<script>
async function streamChat(question) {
const output = document.getElementById("output");
output.textContent = "";
const response = await fetch("/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ question }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split("\n");
for (const line of lines) {
if (line.startsWith("data: ")) {
const token = line.slice(6);
if (token === "[DONE]") return;
output.textContent += token;
}
}
}
}
streamChat("Explain LangGraph streaming modes.");
</script>
5. WebSocket Streaming
WebSockets provide full-duplex communication — ideal when clients need to send multiple messages and receive streaming responses in the same connection (e.g. multi-turn chat).
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{question}"),
])
chain = prompt | llm
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
history: list[BaseMessage] = []
try:
while True:
question = await websocket.receive_text()
full_response = ""
async for chunk in chain.astream({"question": question, "history": history}):
token = chunk.content
if token:
full_response += token
await websocket.send_text(token) # send each token immediately
await websocket.send_text("[DONE]") # signal completion
history.append(HumanMessage(content=question))
history.append(AIMessage(content=full_response))
except WebSocketDisconnect:
pass
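Before wiring up a browser client, the endpoint can be exercised directly from Python. A minimal sketch using the third-party websockets package (assumed installed), with the server assumed to be running on localhost:8000:
import asyncio
import websockets

async def chat_once(text: str):
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(text)
        while True:
            token = await ws.recv()
            if token == "[DONE]":  # completion marker sent by the server
                break
            print(token, end="", flush=True)
        print()

asyncio.run(chat_once("Tell me about RAG pipelines."))
A minimal browser client for the same endpoint: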
const ws = new WebSocket("ws://localhost:8000/ws/chat");
const output = document.getElementById("output");
ws.onmessage = (event) => {
if (event.data === "[DONE]") {
output.textContent += "\n"; // new line after response
return;
}
output.textContent += event.data;
};
function sendMessage(text) {
output.textContent = ""; // clear for new response
ws.send(text);
}
sendMessage("Tell me about RAG pipelines.");
6. Streaming a Full LangGraph Agent via SSE
Combine LangGraph's stream_mode="messages" with FastAPI SSE to stream agent thinking and tool calls in real time.
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
app = FastAPI()
@tool
def get_weather(city: str) -> str:
"""Get current weather for a city."""
return f"The weather in {city} is sunny, 25°C."
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
agent = create_react_agent(llm, tools=[get_weather])
class AgentRequest(BaseModel):
message: str
thread_id: str = "default"
async def agent_token_generator(message: str):
async for chunk, metadata in agent.astream(
{"messages": [HumanMessage(content=message)]},
stream_mode="messages",
):
if hasattr(chunk, "content") and chunk.content:
event = {"type": "token", "content": chunk.content}
yield f"data: {json.dumps(event)}\n\n"
elif hasattr(chunk, "tool_calls") and chunk.tool_calls:
for tc in chunk.tool_calls:
event = {"type": "tool_call", "tool": tc["name"], "args": tc["args"]}
yield f"data: {json.dumps(event)}\n\n"
yield "data: {\"type\": \"done\"}\n\n"
@app.post("/agent/stream")
async def stream_agent(request: AgentRequest):
return StreamingResponse(
agent_token_generator(request.message),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
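A Python client for this endpoint has to read the SSE lines and parse each data: payload back into JSON. A sketch using httpx (assumed installed), with the server assumed to be running on localhost:8000:
import asyncio
import json
import httpx

async def run_agent(message: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/agent/stream", json={"message": message}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank separator lines
                event = json.loads(line[len("data: "):])
                if event["type"] == "token":
                    print(event["content"], end="", flush=True)
                elif event["type"] == "tool_call":
                    print(f"\n[tool call: {event['tool']}({event['args']})]")
                elif event["type"] == "done":
                    print()

asyncio.run(run_agent("What's the weather in Paris?"))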
7. Backpressure & Error Handling in Streams
Streaming errors mid-response require special handling — the HTTP status 200 has already been sent, so you must signal errors in-band.
import json
import asyncio
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([("human", "{q}")])
chain = prompt | llm
async def safe_token_generator(question: str):
try:
        async with asyncio.timeout(30):  # 30s overall timeout (asyncio.timeout requires Python 3.11+)
async for chunk in chain.astream({"q": question}):
if chunk.content:
yield f"data: {json.dumps({'token': chunk.content})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
except asyncio.TimeoutError:
yield f"data: {json.dumps({'error': 'Request timed out'})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
{"error": "..."} events in your frontend. Once the SSE stream starts, HTTP errors can't be sent — errors MUST be delivered as data events.