Module 06

Resiliency & Fault Tolerance

⏱ ~4 hours ❓ 12-question quiz 🎯 Unlock Module 07

Failure Taxonomy for LLM Applications

The type of failure you're handling determines the right fix. Applying a retry strategy to a semantic failure wastes money; ignoring a transient network error degrades reliability.

Type       | Examples                                               | Strategy
Transient  | Rate limit (429), network timeout, upstream 503        | Retry with exponential backoff
Semantic   | Hallucination, wrong tool selected, bad output format  | Fallback chain, guardrail re-ask, reflection loop
Structural | Infinite agent loop, context overflow, OOM             | Max iterations, context compression, circuit breaker
Cascade    | One agent's failure breaking the whole workflow        | Isolation, circuit breaker, dead-letter queue
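
To make the taxonomy concrete, here is a minimal sketch of a dispatcher that maps a raised exception to one of the four categories. The FailureType enum and classify_failure helper are illustrative names for this module, not part of any library.

python Illustrative failure classifier (hypothetical helper)
import openai
from enum import Enum

class FailureType(Enum):
    TRANSIENT  = "transient"
    SEMANTIC   = "semantic"
    STRUCTURAL = "structural"
    CASCADE    = "cascade"

def classify_failure(exc: Exception) -> FailureType:
    """Map an exception to a failure category from the table above."""
    if isinstance(exc, (openai.RateLimitError,
                        openai.APITimeoutError,
                        openai.APIConnectionError,
                        openai.InternalServerError)):
        return FailureType.TRANSIENT    # worth retrying with backoff
    if isinstance(exc, (RecursionError, MemoryError)):
        return FailureType.STRUCTURAL   # cap iterations, compress context
    # Semantic failures usually surface as validation errors you raise
    # yourself (e.g. a guardrail rejecting malformed output); cascades are
    # a workflow-level property detected by the orchestrator, not from a
    # single exception.
    return FailureType.SEMANTIC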

Retry Strategies

Use tenacity for fine-grained retry control, or LangChain's built-in with_retry() wrapper for simple cases. Always add jitter to prevent a thundering herd when multiple agents retry simultaneously.

python Tenacity retry with exponential backoff + jitter
import openai
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
)
import logging

logger = logging.getLogger(__name__)

# Retry only on transient errors; fail fast on auth errors
RETRYABLE = (
    openai.RateLimitError,
    openai.APIConnectionError,
    openai.APITimeoutError,
    openai.InternalServerError,
)

@retry(
    retry=retry_if_exception_type(RETRYABLE),
    wait=wait_exponential_jitter(initial=1, max=60, jitter=2),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
async def resilient_llm_call(model, messages: list) -> str:
    """LLM call with automatic retry on transient failures."""
    response = await model.ainvoke(messages)
    return response.content

# ── LangChain built-in with_retry() wrapper ──
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

resilient_model = model.with_retry(
    retry_if_exception_type=(
        openai.RateLimitError,
        openai.APIConnectionError,
    ),
    wait_exponential_jitter=True,
    stop_after_attempt=4,
)

# Drop in as a direct replacement — same .invoke() / .stream() interface
result = resilient_model.invoke("What is RAG?")
⚠️
Don't retry on all errors

Retrying an AuthenticationError (invalid API key) or a BadRequestError (malformed prompt) will never succeed and will waste your retry budget. Only retry on errors that might succeed if you wait: rate limits (429), server errors (500, 503), and network timeouts. Fail fast on everything else.

Fallback Chains

with_fallbacks() chains models by priority. When the primary model raises (after any configured retries are exhausted), the next model in the chain is tried automatically.

python Tiered model fallback chain
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# Primary: most capable model
primary   = ChatOpenAI(model="gpt-4o", temperature=0)
# Fallback 1: cheaper / different provider
fallback1 = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Fallback 2: completely different provider
fallback2 = ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0)
# Fallback 3: static response when all LLMs fail
static_fallback = RunnableLambda(
    lambda _: "I'm temporarily unavailable. Please try again in a few minutes."
)

# Build the fallback chain
resilient_model = primary.with_fallbacks(
    [fallback1, fallback2, static_fallback],
    exceptions_to_handle=(
        Exception,  # catch everything; the static fallback is the safety net
    ),
)

prompt  = ChatPromptTemplate.from_template("Answer: {question}")
parser  = StrOutputParser()
chain   = prompt | resilient_model | parser

# This chain never raises — worst case returns the static message
result = chain.invoke({"question": "What is LangChain?"})
print(result)
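
Retries and fallbacks compose cleanly, because with_retry() returns a Runnable that with_fallbacks() can wrap. A sketch of the combined pattern, reusing the models defined above: each model retries transient errors a few times before the chain falls through to the next provider.

python Retries nested inside a fallback chain
resilient_model = (
    primary.with_retry(stop_after_attempt=3)
    .with_fallbacks([
        fallback1.with_retry(stop_after_attempt=3),
        fallback2.with_retry(stop_after_attempt=2),
        static_fallback,
    ])
)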

Circuit Breaker Pattern

A circuit breaker prevents an agent from repeatedly calling a failing service. After N consecutive failures it opens (rejects calls immediately), after a cooldown it transitions to half-open (lets probe calls through), and it returns to closed (normal operation) once enough probes succeed.

python Circuit breaker as a LangGraph node
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable

class CircuitState(Enum):
    CLOSED    = "closed"      # normal operation
    OPEN      = "open"        # rejecting calls
    HALF_OPEN = "half_open"   # testing if service recovered

@dataclass
class CircuitBreaker:
    failure_threshold:  int   = 5    # failures before opening
    recovery_timeout:   float = 60.0 # seconds before half-open probe
    success_threshold:  int   = 2    # successes in half-open to close

    # Internal state
    state:              CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count:      int          = field(default=0, init=False)
    success_count:      int          = field(default=0, init=False)
    last_failure_time:  float        = field(default=0.0, init=False)

    def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise RuntimeError(
                    f"Circuit OPEN — service unavailable. "
                    f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
                )

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            self.failure_count = max(0, self.failure_count - 1)

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # In HALF_OPEN the count is still at or above the threshold from the
        # original outage, so a single failed probe re-opens the circuit.
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# ── Using the circuit breaker in a LangGraph node ──
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage

model = ChatOpenAI(model="gpt-4o-mini")
openai_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def resilient_llm_node(state: dict) -> dict:
    try:
        result = openai_breaker.call(
            model.invoke,
            state["messages"]
        )
        return {"messages": [result]}
    except RuntimeError as e:
        # Circuit is open — use cached response or escalate
        return {"messages": [AIMessage(content=f"Service temporarily unavailable: {e}")],
                "error": "circuit_open"}

Timeout Management

A hung LLM call can stall an entire graph. Wrap each node's LLM call in asyncio.wait_for so a slow request fails fast and the node returns a graceful fallback instead of blocking the workflow.

python Per-node async timeouts in LangGraph
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage

model = ChatOpenAI(model="gpt-4o", temperature=0)
LLM_TIMEOUT_SECONDS = 30

async def timeout_guarded_node(state: dict) -> dict:
    """LLM node that times out and returns a graceful fallback."""
    try:
        response = await asyncio.wait_for(
            model.ainvoke(state["messages"]),
            timeout=LLM_TIMEOUT_SECONDS,
        )
        return {"messages": [response]}
    except asyncio.TimeoutError:
        return {
            "messages": [AIMessage(
                content="The request timed out. Please try again with a simpler query."
            )],
            "error": "timeout",
            "timed_out": True,
        }

# Compile with the async node (StateGraph needs an annotated state schema,
# so use a TypedDict rather than a plain dict)
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class GraphState(TypedDict, total=False):
    messages: list
    error: str
    timed_out: bool

builder = StateGraph(GraphState)
builder.add_node("llm", timeout_guarded_node)
builder.add_edge(START, "llm")
builder.add_edge("llm", END)
graph = builder.compile()
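
The node is async, so invoke the compiled graph with ainvoke:

python Invoking the timeout-guarded graph
import asyncio
from langchain_core.messages import HumanMessage

out = asyncio.run(graph.ainvoke(
    {"messages": [HumanMessage(content="Summarise this report")]}
))
print(out.get("timed_out", False))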

Idempotency & Safe Replay

An idempotent operation has the same effect whether it runs once or many times. This is critical for tools with side effects: you don't want to charge a credit card twice because a retry fired after the first call had already succeeded.

python Idempotent tool call with Redis deduplication key
import hashlib, json, redis
from langchain_core.tools import tool
from datetime import timedelta

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = timedelta(hours=24)

def idempotency_key(tool_name: str, args: dict) -> str:
    """Deterministic key based on tool name + sorted args."""
    payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return f"idem:{hashlib.sha256(payload.encode()).hexdigest()}"

@tool
def send_invoice(customer_id: str, amount_cents: int) -> dict:
    """Send an invoice to a customer. Safe to retry — deduplicated by args."""
    key = idempotency_key("send_invoice", {"customer_id": customer_id, "amount_cents": amount_cents})

    # Check if this exact call was already completed
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)   # return the original result
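
    # NOTE: this get-then-set sequence is not atomic; two concurrent retries
    # could both miss the cache and execute the operation twice. For strict
    # exactly-once behaviour, reserve the key up front with SET key NX instead.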

    # Execute the side-effectful operation
    invoice_id = f"INV-{customer_id}-{amount_cents}"
    result = {"status": "sent", "invoice_id": invoice_id}

    # Cache with TTL so retries within 24h return the same result
    redis_client.setex(key, IDEMPOTENCY_TTL, json.dumps(result))
    return result
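
A quick check of the deduplication behaviour (assumes a Redis instance on localhost; the argument values are made up):

python Replayed call returns the cached result
first  = send_invoice.invoke({"customer_id": "C42", "amount_cents": 5000})
second = send_invoice.invoke({"customer_id": "C42", "amount_cents": 5000})
assert first == second  # second call is served from the idempotency cache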

Backpressure & Rate Limiting

A semaphore is the simplest form of backpressure: it caps how many LLM calls run concurrently, so bursts of work queue up instead of blowing through provider rate limits.

python asyncio.Semaphore to cap concurrent LLM calls
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

model = ChatOpenAI(model="gpt-4o-mini")

# Limit to 5 concurrent LLM calls regardless of how many are queued
MAX_CONCURRENT = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def rate_limited_call(query: str) -> str:
    async with semaphore:
        response = await model.ainvoke([HumanMessage(content=query)])
        return response.content

async def process_batch(queries: list[str]) -> list[str]:
    """Process many queries with automatic concurrency limiting."""
    tasks = [rate_limited_call(q) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Handle individual failures without failing the whole batch
    return [
        r if not isinstance(r, Exception) else f"Error: {r}"
        for r in results
    ]

# Process 50 queries but never more than 5 LLM calls at once
queries = [f"Summarise document {i}" for i in range(50)]
results = asyncio.run(process_batch(queries))
💡
Layer your resiliency mechanisms

Use the mechanisms in this module together: a semaphore limits concurrent calls → retries handle transient failures → fallbacks handle persistent primary failures → a circuit breaker stops cascading failures. Timeouts bound every call, and idempotency keys make the retries safe. Each layer catches what the previous layer misses.
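
As a sketch, here is how the layers can nest in one call path. It reuses the semaphore, openai_breaker, and the retry + fallback resilient_model defined earlier; the exact wiring is illustrative, not the only valid order.

python Layering semaphore, retry, fallback, and circuit breaker
async def guarded_call(messages: list) -> str:
    async with semaphore:                       # 1. backpressure caps concurrency
        try:
            # 2 + 3. retries and fallbacks live inside resilient_model;
            # 4. the breaker stops calling once the whole stack keeps failing.
            result = await asyncio.to_thread(   # keep the sync stack off the event loop
                openai_breaker.call, resilient_model.invoke, messages
            )
            return result if isinstance(result, str) else result.content
        except RuntimeError:
            # Circuit open: fail fast without touching the provider at all
            return "Service temporarily unavailable. Please try again later."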


📝 Knowledge Check

Module 06 — Quiz

Score 80% or higher (10 out of 12) to unlock Module 07.
