Module 12

Streaming Interactions with LLMs

⏱ ~3.5 hours ❓ 10-question quiz 🎯 Unlock Module 13

1. Why Streaming?

Without streaming, the user waits for the entire LLM response before seeing anything. With streaming, the first tokens appear within a few hundred milliseconds, dramatically improving perceived responsiveness.

Metric                     | Non-Streaming             | Streaming
Time-to-first-token (TTFT) | Equal to total latency    | ~200 ms (model dependent)
Perceived responsiveness   | Long blank wait           | Response appears to "think"
Timeout risk               | Higher (one long request) | Lower (stream kept alive)
Implementation complexity  | Simple                    | Moderate
Frontend requirements      | None                      | SSE or WebSocket handling
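
To make the TTFT gap concrete, here is a rough timing sketch (the model name and exact numbers are illustrative; results depend on the provider, prompt, and network):

ttft_compare.py — comparing invoke() and stream() latency
import time
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
question = "Explain streaming in one paragraph."

# Non-streaming: nothing is visible until the whole response has arrived
start = time.perf_counter()
llm.invoke(question)
print(f"invoke(): first visible output after {time.perf_counter() - start:.2f}s (equals total latency)")

# Streaming: the first chunk arrives well before the response is complete
start = time.perf_counter()
ttft = None
for chunk in llm.stream(question):
    if ttft is None and chunk.content:
        ttft = time.perf_counter() - start
total = time.perf_counter() - start
ttft = ttft if ttft is not None else total   # fallback if no content chunks arrived
print(f"stream(): first token after {ttft:.2f}s, full response after {total:.2f}s")
The absolute numbers will vary per run, but the first-token time should be a small fraction of the total.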

2. LangChain Streaming

All LangChain runnables support .stream() (sync) and .astream() (async). For chat-model chains like the ones below, the stream yields AIMessageChunk objects containing partial content.

langchain_stream.py — sync streaming
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise technical writer."),
    ("human", "{topic}"),
])
chain = prompt | llm

# Sync streaming
for chunk in chain.stream({"topic": "Explain LangGraph in 3 sentences."}):
    print(chunk.content, end="", flush=True)
print()  # newline
langchain_astream.py — async streaming
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])
chain = prompt | llm

async def stream_response(question: str):
    full_response = ""
    async for chunk in chain.astream({"question": question}):
        token = chunk.content
        full_response += token
        print(token, end="", flush=True)
    print()
    return full_response

asyncio.run(stream_response("What are the key features of LangGraph?"))
astream_events.py — fine-grained event streaming
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = (
    ChatPromptTemplate.from_messages([("human", "{q}")]) | llm
).with_config({"run_name": "my-chain"})

async def stream_events():
    async for event in chain.astream_events({"q": "Explain RAG"}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # Token-level events from the LLM
            chunk = event["data"]["chunk"]
            print(chunk.content, end="", flush=True)
        elif kind == "on_chain_start":
            print(f"\n[START: {event['name']}]")
        elif kind == "on_chain_end":
            print(f"\n[END: {event['name']}]")

asyncio.run(stream_events())
astream_events v2: Use version="v2" — the v1 format is deprecated. Events include chain start/end, LLM token chunks, tool call events, and retriever events, giving you full observability over the streaming pipeline.
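
Because the event stream is verbose, astream_events also accepts include/exclude filters (include_names, include_types, include_tags and their exclude_* counterparts). A minimal sketch, assuming we only want chat-model token events from the same kind of chain:

filter_events.py — keeping only chat-model events
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

chain = ChatPromptTemplate.from_messages([("human", "{q}")]) | ChatOpenAI(
    model="gpt-4o-mini", streaming=True
)

async def stream_tokens_only():
    async for event in chain.astream_events(
        {"q": "Explain RAG"},
        version="v2",
        include_types=["chat_model"],   # drop chain/tool/retriever events
    ):
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
    print()

asyncio.run(stream_tokens_only())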

3. LangGraph Streaming Modes

LangGraph graphs expose .stream() and .astream() methods whose stream_mode argument controls what gets yielded at each step.

langgraph_streaming.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, BaseMessage
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

def agent(state: State) -> State:
    return {"messages": [llm.invoke(state["messages"])]}

graph = StateGraph(State)
graph.add_node("agent", agent)
graph.set_entry_point("agent")
graph.add_edge("agent", END)
app = graph.compile()

initial = {"messages": [HumanMessage(content="Write a haiku about LangGraph.")]}

# --- stream_mode="values" — full state after every node ---
print("=== values mode ===")
for state in app.stream(initial, stream_mode="values"):
    print(state["messages"][-1].content)

# --- stream_mode="updates" — only the delta from each node ---
print("\n=== updates mode ===")
for update in app.stream(initial, stream_mode="updates"):
    for node_name, delta in update.items():
        print(f"[{node_name}] {delta}")

# --- stream_mode="messages" — individual LLM token chunks ---
print("\n=== messages mode (tokens) ===")
for chunk, metadata in app.stream(initial, stream_mode="messages"):
    if hasattr(chunk, "content") and chunk.content:
        print(chunk.content, end="", flush=True)
print()
Choosing a mode:
  • values — UI state sync (show the current full state after each node)
  • updates — debugging (see exactly what each node changed)
  • messages — real-time token rendering in chat UIs
  • events — full observability via the separate astream_events API (see the sketch below)
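
To illustrate the last bullet: a compiled LangGraph graph is itself a runnable, so the astream_events API from Section 2 works on it directly. A minimal sketch, reusing app and initial from langgraph_streaming.py above (so not fully self-contained):

graph_events.py — astream_events over a compiled graph
import asyncio

async def stream_graph_events():
    # `app` and `initial` are the compiled graph and input from langgraph_streaming.py
    async for event in app.astream_events(initial, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind in ("on_chain_start", "on_chain_end"):
            # node boundaries surface as chain events named after the node
            print(f"\n[{kind}: {event['name']}]")

asyncio.run(stream_graph_events())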

4. Server-Sent Events (SSE) with FastAPI

SSE is the simplest way to push streaming LLM tokens to a browser over HTTP/1.1. No WebSocket upgrade is needed — it's a standard GET/POST that stays open.

bash
pip install fastapi uvicorn langchain-openai
sse_server.py
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])
chain = prompt | llm

class ChatRequest(BaseModel):
    question: str

async def token_generator(question: str):
    async for chunk in chain.astream({"question": question}):
        token = chunk.content
        if token:
            yield f"data: {token}\n\n"   # SSE format
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        token_generator(request.question),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",   # disable nginx buffering
        },
    )

# Run: uvicorn sse_server:app --reload
sse_client.html — browser SSE consumer (fetch + ReadableStream; EventSource only supports GET, and this endpoint is a POST)
<div id="output"></div>
<script>
async function streamChat(question) {
  const output = document.getElementById("output");
  output.textContent = "";

  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value, { stream: true });  // stream: true keeps multi-byte characters intact across chunks
    const lines = text.split("\n");
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const token = line.slice(6);
        if (token === "[DONE]") return;
        output.textContent += token;
      }
    }
  }
}

streamChat("Explain LangGraph streaming modes.");
</script>

5. WebSocket Streaming

WebSockets provide full-duplex communication — ideal when clients need to send multiple messages and receive streaming responses in the same connection (e.g. multi-turn chat).

websocket_server.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{question}"),
])
chain = prompt | llm

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    history: list[BaseMessage] = []
    try:
        while True:
            question = await websocket.receive_text()
            full_response = ""
            async for chunk in chain.astream({"question": question, "history": history}):
                token = chunk.content
                if token:
                    full_response += token
                    await websocket.send_text(token)   # send each token immediately
            await websocket.send_text("[DONE]")         # signal completion
            history.append(HumanMessage(content=question))
            history.append(AIMessage(content=full_response))
    except WebSocketDisconnect:
        pass
ws_client.js — browser WebSocket consumer
const ws = new WebSocket("ws://localhost:8000/ws/chat");
const output = document.getElementById("output");

ws.onmessage = (event) => {
  if (event.data === "[DONE]") {
    output.textContent += "\n";   // new line after response
    return;
  }
  output.textContent += event.data;
};

function sendMessage(text) {
  output.textContent = "";    // clear for new response
  ws.send(text);
}

// send only after the connection is open; ws.send() before that would throw
ws.onopen = () => sendMessage("Tell me about RAG pipelines.");
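
To exercise the endpoint without a browser, here is a minimal sketch using the third-party websockets package (an assumption; any WebSocket client library works):

ws_client.py — Python test client for /ws/chat
import asyncio
import websockets   # pip install websockets

async def chat(question: str) -> str:
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(question)
        full_response = ""
        while True:
            token = await ws.recv()
            if token == "[DONE]":            # completion sentinel from the server
                break
            full_response += token
            print(token, end="", flush=True)
        print()
        return full_response

asyncio.run(chat("Tell me about RAG pipelines."))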

6. Streaming a Full LangGraph Agent via SSE

Combine LangGraph's stream_mode="messages" with FastAPI SSE to stream agent thinking and tool calls in real time.

agent_sse.py
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage

app = FastAPI()

@tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"The weather in {city} is sunny, 25°C."

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
agent = create_react_agent(llm, tools=[get_weather])

class AgentRequest(BaseModel):
    message: str
    thread_id: str = "default"

async def agent_token_generator(message: str):
    async for chunk, metadata in agent.astream(
        {"messages": [HumanMessage(content=message)]},
        stream_mode="messages",
    ):
        if hasattr(chunk, "content") and chunk.content:
            event = {"type": "token", "content": chunk.content}
            yield f"data: {json.dumps(event)}\n\n"
        elif hasattr(chunk, "tool_calls") and chunk.tool_calls:
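            # note: with stream_mode="messages", tool-call arguments can arrive
            # incrementally, so an individual chunk may carry a partial tool call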
            for tc in chunk.tool_calls:
                event = {"type": "tool_call", "tool": tc["name"], "args": tc["args"]}
                yield f"data: {json.dumps(event)}\n\n"
    yield "data: {\"type\": \"done\"}\n\n"

@app.post("/agent/stream")
async def stream_agent(request: AgentRequest):
    return StreamingResponse(
        agent_token_generator(request.message),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

7. Backpressure & Error Handling in Streams

Streaming errors mid-response require special handling — the HTTP status 200 has already been sent, so you must signal errors in-band.

robust_stream.py
import json
import asyncio
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
prompt = ChatPromptTemplate.from_messages([("human", "{q}")])
chain = prompt | llm

async def safe_token_generator(question: str):
    try:
        async with asyncio.timeout(30):   # 30s overall timeout (asyncio.timeout requires Python 3.11+)
            async for chunk in chain.astream({"q": question}):
                if chunk.content:
                    yield f"data: {json.dumps({'token': chunk.content})}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"
    except asyncio.TimeoutError:
        yield f"data: {json.dumps({'error': 'Request timed out'})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
Client-side handling: Always listen for {"error": "..."} events in your frontend. Once the SSE stream has started, the server can no longer change the already-sent 200 status, so errors must be delivered as data events.
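
For reference, a minimal Python consumer sketch (using httpx, which is an assumption, and assuming safe_token_generator is wired to POST /chat/stream with a {"question": ...} body as in sse_server.py) that treats error and done as in-band events:

robust_client.py — consuming the stream and handling in-band errors
import asyncio
import json
import httpx   # pip install httpx

async def consume(question: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/chat/stream", json={"question": question}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                event = json.loads(line[len("data: "):])
                if "error" in event:
                    print(f"\n[stream error] {event['error']}")
                    return
                if event.get("done"):
                    print("\n[done]")
                    return
                print(event["token"], end="", flush=True)

asyncio.run(consume("Explain backpressure in streaming APIs."))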

📝 Knowledge Check

Module 12 — Quiz

Score 80% or higher (8 out of 10) to unlock Module 13.
