Resiliency & Fault Tolerance
Failure Taxonomy for LLM Applications
Understanding which type of failure you're handling determines the right fix: applying a retry strategy to a semantic failure wastes money, while ignoring a transient network error degrades reliability. The table below maps each failure type to its mitigation; a classifier sketch follows it.
| Type | Examples | Strategy |
|---|---|---|
| Transient | Rate limit (429), network timeout, upstream 503 | Retry with exponential backoff |
| Semantic | Hallucination, wrong tool selected, bad output format | Fallback chain, guardrail re-ask, reflection loop |
| Structural | Infinite agent loop, context overflow, OOM | Max iterations, context compression, circuit breaker |
| Cascade | One agent's failure breaking the whole workflow | Isolation, circuit breaker, dead-letter queue |
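For the failure modes that surface as exceptions, the classification can be encoded directly. Here is a minimal sketch using real `openai` SDK exception types; note that semantic and cascade failures show up in outputs and workflow state rather than as exceptions, so they need validators and guardrails, not exception handlers:

```python
import openai

# Transient per the table above: worth retrying with backoff
TRANSIENT = (
    openai.RateLimitError,
    openai.APIConnectionError,
    openai.APITimeoutError,
    openai.InternalServerError,
)

def classify_failure(exc: Exception) -> str:
    """Map an exception to a row of the taxonomy table."""
    if isinstance(exc, TRANSIENT):
        return "transient"   # retry with exponential backoff
    if isinstance(exc, openai.BadRequestError):
        # A 400 often means context overflow ("maximum context length"),
        # which the table classifies as structural
        return "structural"
    return "unknown"         # fail fast and inspect before choosing a strategy
```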
Retry Strategies
Use tenacity for fine-grained retry control, or LangChain's built-in with_retry() wrapper for simple cases. Always add jitter to prevent a thundering herd when multiple agents retry simultaneously.
```python
import logging

import openai
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)

# Retry only on transient errors; fail fast on auth errors
RETRYABLE = (
    openai.RateLimitError,
    openai.APIConnectionError,
    openai.APITimeoutError,
    openai.InternalServerError,
)

@retry(
    retry=retry_if_exception_type(RETRYABLE),
    wait=wait_exponential_jitter(initial=1, max=60, jitter=2),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
async def resilient_llm_call(model, messages: list) -> str:
    """LLM call with automatic retry on transient failures."""
    response = await model.ainvoke(messages)
    return response.content

# ── LangChain built-in with_retry() wrapper ──
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")
resilient_model = model.with_retry(
    retry_if_exception_type=(
        openai.RateLimitError,
        openai.APIConnectionError,
    ),
    wait_exponential_jitter=True,
    stop_after_attempt=4,
)

# Drop in as a direct replacement — same .invoke() / .stream() interface
result = resilient_model.invoke("What is RAG?")
```
Retrying an AuthenticationError (invalid API key) or a BadRequestError (malformed prompt) will never succeed and will waste your retry budget. Only retry on errors that might succeed if you wait: rate limits (429), server errors (500, 503), and network timeouts. Fail fast on everything else.
Fallback Chains
with_fallbacks() chains models by priority. When the primary model fails (after retries are exhausted), the next model in the chain is tried automatically.
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# Primary: most capable model
primary = ChatOpenAI(model="gpt-4o", temperature=0)

# Fallback 1: cheaper model, same provider
fallback1 = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Fallback 2: completely different provider
fallback2 = ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0)

# Fallback 3: static response when all LLMs fail
static_fallback = RunnableLambda(
    lambda _: "I'm temporarily unavailable. Please try again in a few minutes."
)

# Build the fallback chain
resilient_model = primary.with_fallbacks(
    [fallback1, fallback2, static_fallback],
    exceptions_to_handle=(
        Exception,  # catch everything; the static fallback is the safety net
    ),
)

prompt = ChatPromptTemplate.from_template("Answer: {question}")
parser = StrOutputParser()
chain = prompt | resilient_model | parser

# This chain never raises — worst case returns the static message
result = chain.invoke({"question": "What is LangChain?"})
print(result)
```
Circuit Breaker Pattern
A circuit breaker prevents an agent from repeatedly calling a failing service. After N consecutive failures it opens (rejects calls immediately); once a recovery timeout elapses it transitions to half-open (allows a single probe call), and it returns to closed (normal operation) when enough probes succeed.
```python
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # rejecting calls
    HALF_OPEN = "half_open"  # testing if service recovered

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5      # failures before opening
    recovery_timeout: float = 60.0  # seconds before half-open probe
    success_threshold: int = 2      # successes in half-open to close

    # Internal state
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0.0, init=False)

    def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise RuntimeError(
                    f"Circuit OPEN — service unavailable. "
                    f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
                )
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            self.failure_count = max(0, self.failure_count - 1)

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# ── Using the circuit breaker in a LangGraph node ──
from langchain_core.messages import AIMessage
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")
openai_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def resilient_llm_node(state: dict) -> dict:
    try:
        result = openai_breaker.call(
            model.invoke,
            state["messages"],
        )
        return {"messages": [result]}
    except RuntimeError as e:
        # Circuit is open — use cached response or escalate
        return {
            "messages": [AIMessage(content=f"Service temporarily unavailable: {e}")],
            "error": "circuit_open",
        }
```
Timeout Management
A hung LLM call can stall an entire workflow, so wrap each call in a hard deadline with asyncio.wait_for and return a graceful fallback when it expires.
```python
import asyncio
from typing import TypedDict

from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage

model = ChatOpenAI(model="gpt-4o", temperature=0)
LLM_TIMEOUT_SECONDS = 30

class State(TypedDict, total=False):
    messages: list
    error: str
    timed_out: bool

async def timeout_guarded_node(state: State) -> dict:
    """LLM node that times out and returns a graceful fallback."""
    try:
        response = await asyncio.wait_for(
            model.ainvoke(state["messages"]),
            timeout=LLM_TIMEOUT_SECONDS,
        )
        return {"messages": [response]}
    except asyncio.TimeoutError:
        return {
            "messages": [AIMessage(
                content="The request timed out. Please try again with a simpler query."
            )],
            "error": "timeout",
            "timed_out": True,
        }

# Compile with the async node
from langgraph.graph import StateGraph, START, END

builder = StateGraph(State)
builder.add_node("llm", timeout_guarded_node)
builder.add_edge(START, "llm")
builder.add_edge("llm", END)
graph = builder.compile()
```
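Invoking the compiled graph works like any other graph; a timed-out run returns the fallback message with the timed_out flag set (the query text is illustrative):

```python
from langchain_core.messages import HumanMessage

result = asyncio.run(
    graph.ainvoke({"messages": [HumanMessage(content="Summarise this 40-page contract")]})
)
if result.get("timed_out"):
    print(result["messages"][-1].content)  # the graceful fallback text
```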
Idempotency & Safe Replay
An idempotent operation produces the same result when called multiple times. This is critical for tools that have side effects — you don't want to charge a credit card twice because a retry triggered after the first call succeeded.
```python
import hashlib
import json
from datetime import timedelta

import redis
from langchain_core.tools import tool

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = timedelta(hours=24)

def idempotency_key(tool_name: str, args: dict) -> str:
    """Deterministic key based on tool name + sorted args."""
    payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return f"idem:{hashlib.sha256(payload.encode()).hexdigest()}"

@tool
def send_invoice(customer_id: str, amount_cents: int) -> dict:
    """Send an invoice to a customer. Safe to retry — deduplicated by args."""
    key = idempotency_key(
        "send_invoice",
        {"customer_id": customer_id, "amount_cents": amount_cents},
    )

    # Check if this exact call was already completed
    # (check-then-set is racy under concurrency; a production version
    # would use SET NX or a lock)
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)  # return the original result

    # Execute the side-effectful operation
    invoice_id = f"INV-{customer_id}-{amount_cents}"
    result = {"status": "sent", "invoice_id": invoice_id}

    # Cache with TTL so retries within 24h return the same result
    redis_client.setex(key, IDEMPOTENCY_TTL, json.dumps(result))
    return result
```
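A duplicate call with identical arguments then returns the cached first result instead of sending a second invoice (the customer ID is illustrative):

```python
first = send_invoice.invoke({"customer_id": "C42", "amount_cents": 9900})
second = send_invoice.invoke({"customer_id": "C42", "amount_cents": 9900})
assert first == second  # the retry was deduplicated; no double-send
```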
Backpressure & Rate Limiting
Bound concurrency at the source rather than letting every queued task hit the provider and retry; an asyncio.Semaphore is the simplest backpressure mechanism.
```python
import asyncio

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

model = ChatOpenAI(model="gpt-4o-mini")

# Limit to 5 concurrent LLM calls regardless of how many are queued
MAX_CONCURRENT = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def rate_limited_call(query: str) -> str:
    async with semaphore:
        response = await model.ainvoke([HumanMessage(content=query)])
        return response.content

async def process_batch(queries: list[str]) -> list[str]:
    """Process many queries with automatic concurrency limiting."""
    tasks = [rate_limited_call(q) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Handle individual failures without failing the whole batch
    return [
        r if not isinstance(r, Exception) else f"Error: {r}"
        for r in results
    ]

# Process 50 queries but never more than 5 LLM calls at once
queries = [f"Summarise document {i}" for i in range(50)]
results = asyncio.run(process_batch(queries))
```
Use all four mechanisms together: Semaphore limits concurrent calls → Retry handles transient failures → Fallback handles persistent primary failures → Circuit breaker stops cascade failures. Each layer catches what the previous layer misses.
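Here is a minimal sketch of the composed stack, assuming the CircuitBreaker class defined earlier in this section; the model names and limits are illustrative, and the synchronous breaker.call is kept for simplicity (an async-aware breaker would await the wrapped coroutine instead):

```python
import asyncio

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

semaphore = asyncio.Semaphore(5)                    # 1. backpressure

primary = ChatOpenAI(model="gpt-4o").with_retry(    # 2. retry on transient errors
    stop_after_attempt=3,
)
resilient = primary.with_fallbacks(                 # 3. fallback provider
    [ChatAnthropic(model="claude-haiku-4-5-20251001")]
)
breaker = CircuitBreaker(failure_threshold=3)       # 4. stop cascade failures

async def guarded_call(query: str) -> str:
    async with semaphore:
        # breaker.call is synchronous, so use the sync .invoke here
        result = breaker.call(resilient.invoke, [HumanMessage(content=query)])
        return result.content
```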