Module 13

Enterprise Patterns & Production Readiness

⏱ ~4.5 hours ❓ 12-question quiz 🎯 Unlock Module 14

1. Production Readiness Checklist

Multi-tenancy

Isolate tenant data, rate limits, and vector namespaces. Never let Tenant A see Tenant B's data.

Authentication

JWT-validated API endpoints. Embed tenant ID in token claims for downstream isolation.

Semantic Caching

Cache LLM responses by semantic similarity to reduce repeated API costs.

Observability

Prometheus metrics, structured JSON logs, distributed tracing (LangSmith/Langfuse).

Rate Limiting

Per-tenant token budgets enforced via Redis counters. Prevents runaway cost.

Containerisation

Multi-stage Docker build, Kubernetes Deployment + HPA, health check endpoints.

2. Multi-Tenancy with Namespace Isolation

Use a separate ChromaDB collection (or Pinecone namespace) per tenant so retrieval is always scoped to the correct tenant's data.

multi_tenant_rag.py
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

def get_tenant_store(tenant_id: str) -> Chroma:
    """Return a vector store scoped to a single tenant.

    Each tenant gets its own Chroma collection *and* its own persistence
    directory, so retrieval can never read another tenant's documents.
    """
    collection = f"tenant_{tenant_id}"
    return Chroma(
        collection_name=collection,
        embedding_function=embeddings,
        persist_directory=f"./chroma_db/{tenant_id}",
    )

def index_tenant_document(tenant_id: str, documents: list):
    """Add documents to the calling tenant's isolated collection."""
    get_tenant_store(tenant_id).add_documents(documents)

def build_tenant_chain(tenant_id: str):
    """Build an LCEL RAG chain whose retriever only sees this tenant's data."""
    tenant_store = get_tenant_store(tenant_id)
    tenant_retriever = tenant_store.as_retriever(search_kwargs={"k": 4})

    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", "Answer using only the provided context. Tenant: {tenant_id}"),
        ("human", "Context: {context}\n\nQuestion: {question}"),
    ])
    model = ChatOpenAI(model="gpt-4o-mini")

    # The mapping step fans the raw question out: to the retriever (context),
    # through unchanged (question), and pins tenant_id as a constant.
    return (
        {
            "context": tenant_retriever,
            "question": RunnablePassthrough(),
            "tenant_id": lambda _: tenant_id,
        }
        | chat_prompt
        | model
    )

# Each tenant gets an isolated retriever — same question, different corpora.
chain_a = build_tenant_chain("tenant-a")
chain_b = build_tenant_chain("tenant-b")

print(chain_a.invoke("What is our refund policy?").content)   # Only sees Tenant A docs
print(chain_b.invoke("What is our refund policy?").content)   # Only sees Tenant B docs

3. JWT Authentication Middleware

Protect your FastAPI LLM endpoints with JWT validation. Extract tenant ID from claims to scope all downstream operations.

bash
pip install python-jose[cryptography] passlib[bcrypt]
auth.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

import os

# Read the signing secret from the environment; the hard-coded fallback is
# for local development only and must never reach production.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "your-256-bit-secret")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60

security = HTTPBearer()

def create_access_token(tenant_id: str, user_id: str) -> str:
    """Mint a signed JWT embedding the user ('sub') and tenant identity."""
    now = datetime.now(timezone.utc)
    claims = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

def get_current_tenant(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Validate the bearer JWT and return its identity claims.

    Responds 401 when the signature/expiry check fails or when the
    required tenant_id / sub claims are missing.
    """
    try:
        claims = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        tenant_id = claims.get("tenant_id")
        user_id = claims.get("sub")
        if not tenant_id or not user_id:
            # Not a JWTError, so this propagates past the except below.
            raise HTTPException(status_code=401, detail="Invalid token claims")
        return {"tenant_id": tenant_id, "user_id": user_id}
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        )
api.py — protected endpoint
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from auth import get_current_tenant, create_access_token
from multi_tenant_rag import build_tenant_chain

app = FastAPI()

class ChatRequest(BaseModel):
    """Body for POST /chat — the tenant is taken from the JWT, never the body."""
    question: str

@app.post("/chat")
async def chat(
    request: ChatRequest,
    tenant: dict = Depends(get_current_tenant),   # validates JWT + extracts tenant
):
    chain = build_tenant_chain(tenant["tenant_id"])
    response = chain.invoke(request.question)
    return {"answer": response.content, "tenant_id": tenant["tenant_id"]}

# Dev helper: generate a test token.
# NOTE(review): this endpoint mints valid JWTs without any authentication —
# it must be removed or gated (env flag / debug-only router) before deploy.
@app.get("/dev/token")
def get_test_token(tenant_id: str, user_id: str):
    """Issue a JWT for manually exercising the protected endpoints."""
    return {"token": create_access_token(tenant_id, user_id)}

4. Semantic Caching

Semantic caching returns cached LLM responses for semantically similar (not just identical) queries. It dramatically reduces API cost for repeated or near-duplicate questions.

semantic_cache.py
from langchain.globals import set_llm_cache
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# Configure semantic cache backed by Redis + vector similarity.
# set_llm_cache installs a process-global cache: every LangChain LLM call in
# this process consults it before hitting the provider API.
set_llm_cache(
    RedisSemanticCache(
        redis_url="redis://localhost:6379",
        embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
        score_threshold=0.95,   # cosine similarity threshold
    )
)

llm = ChatOpenAI(model="gpt-4o-mini")   # all calls go through cache automatically
prompt = ChatPromptTemplate.from_messages([("human", "{q}")])
chain = prompt | llm

# First call — cache miss, hits the OpenAI API
r1 = chain.invoke({"q": "What is LangGraph?"})

# Second call (identical text) — served from cache
r2 = chain.invoke({"q": "What is LangGraph?"})

# Third call (semantically similar) — cache hit when similarity >= score_threshold
r3 = chain.invoke({"q": "Can you explain LangGraph?"})

print(r1.content[:100])
print("Cache working" if r1.content == r2.content else "Cache miss")
Tuning and invalidation: a lower score_threshold (e.g. 0.90) yields broader cache hits (lower cost, slightly less accurate answers), while a higher one (0.98+) matches near-duplicates only. For RAG queries, cache the final answer but not intermediate retrieval steps — and flush the cache whenever the underlying documents change, or stale answers will keep being served.

5. Prometheus Metrics

Expose LLM-specific metrics (latency, token usage, error rate, cache hit rate) to Prometheus for alerting and dashboards.

bash
pip install prometheus-client
metrics.py
import time
from functools import wraps
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics. Label cardinality is tenant_id x model (x status/type), so
# keep the tenant population bounded — Prometheus stores one series per
# distinct label combination.
REQUEST_COUNT = Counter(
    "llm_requests_total",
    "Total LLM requests",
    ["tenant_id", "model", "status"],   # status: success | error
)
REQUEST_LATENCY = Histogram(
    "llm_request_latency_seconds",
    "LLM request latency",
    ["tenant_id", "model"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],   # LLM calls routinely take seconds
)
TOKEN_USAGE = Counter(
    "llm_tokens_total",
    "Total tokens used",
    ["tenant_id", "model", "type"],   # type: prompt | completion
)
CACHE_HITS = Counter("llm_cache_hits_total", "Semantic cache hits", ["tenant_id"])

def track_llm_call(tenant_id: str, model: str):
    """Decorator recording Prometheus metrics around an async LLM call.

    Tracks request count (success/error), per-call latency, and — when the
    result carries a LangChain-style ``usage_metadata`` mapping — prompt and
    completion token counts. The wrapped function must be a coroutine.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = await func(*args, **kwargs)
                REQUEST_COUNT.labels(tenant_id=tenant_id, model=model, status="success").inc()
                # usage_metadata may be absent OR set to None (e.g. streamed
                # or cached responses); hasattr alone would then crash on
                # None.get(...), so fetch defensively and truth-test.
                usage = getattr(result, "usage_metadata", None)
                if usage:
                    TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, type="prompt").inc(
                        usage.get("input_tokens", 0)
                    )
                    TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, type="completion").inc(
                        usage.get("output_tokens", 0)
                    )
                return result
            except Exception:
                REQUEST_COUNT.labels(tenant_id=tenant_id, model=model, status="error").inc()
                raise
            finally:
                # Latency is observed on both the success and error paths.
                REQUEST_LATENCY.labels(tenant_id=tenant_id, model=model).observe(
                    time.time() - start
                )
        return wrapper
    return decorator

# Start metrics server on port 8001. NOTE: this runs at import time as a
# module side effect — scrape at http://localhost:8001/metrics.
start_http_server(8001)

6. Structured JSON Logging

JSON-formatted logs integrate with log aggregators (ELK, CloudWatch, Datadog) and enable structured queries.

logging_config.py
import logging
import json
import time
from uuid import uuid4

class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    Always emits timestamp/level/logger/message; the optional context
    fields (trace_id, tenant_id, model, latency_ms) appear only when the
    caller supplied them via ``extra=``.
    """

    _OPTIONAL_FIELDS = ("trace_id", "tenant_id", "model", "latency_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in self._OPTIONAL_FIELDS:
            value = getattr(record, field, None)
            if value is not None:
                payload[field] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)

def setup_logging():
    """Install a JSON-formatting handler on the root logger (idempotent).

    Guarded so that calling this more than once — e.g. from several modules
    at import time — does not attach duplicate handlers, which would emit
    every log line multiple times.
    """
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    if any(isinstance(h.formatter, JSONFormatter) for h in root.handlers):
        return  # already configured
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root.addHandler(handler)

logger = logging.getLogger("llm-api")

def log_llm_request(tenant_id: str, model: str, latency_ms: float, tokens: int):
    """Emit one structured INFO record per completed LLM request.

    The fields travel via ``extra=`` so JSONFormatter can surface them as
    top-level JSON keys; trace_id is a fresh UUID per request.
    """
    context = {
        "trace_id": str(uuid4()),
        "tenant_id": tenant_id,
        "model": model,
        "latency_ms": round(latency_ms, 2),
        "tokens": tokens,
    }
    logger.info("LLM request completed", extra=context)

7. Docker & Kubernetes Deployment

Dockerfile
# Multi-stage build: dependencies are installed in the builder stage so the
# final image carries no build toolchain or pip cache.
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

FROM python:3.11-slim
WORKDIR /app

# Non-root user for security. Created before copying packages so they land
# somewhere the runtime user can read — /root is mode 700, so leaving them
# under /root/.local would make them unreadable once USER appuser applies.
RUN useradd -m appuser

COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
COPY --chown=appuser:appuser . .

ENV PATH=/home/appuser/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

USER appuser

EXPOSE 8000
# python:3.11-slim does not ship curl, so a `curl -f` healthcheck would fail
# on every probe; use the Python stdlib to hit the endpoint instead.
HEALTHCHECK --interval=30s --timeout=5s \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
k8s/deployment.yaml
# Deployment: the FastAPI LLM service. Baseline replica count is superseded
# by the HorizontalPodAutoscaler defined below.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-api
spec:
  replicas: 3   # initial count; the HPA scales between 2 and 10
  selector:
    matchLabels:
      app: llm-api
  template:
    metadata:
      labels:
        app: llm-api
    spec:
      containers:
        - name: llm-api
          image: your-registry/llm-api:latest
          ports:
            - containerPort: 8000
          env:
            # API key injected from a Secret — never baked into the image.
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: openai-api-key
          resources:
            # requests drive scheduling (and the HPA CPU percentage);
            # limits cap a runaway pod.
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "2Gi"
          # Liveness restarts a wedged container ...
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          # ... readiness gates traffic until the app can serve.
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
# HPA: scales the Deployment on average CPU utilization (relative to the
# container's CPU *request*, i.e. 70% of 250m here).
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

8. Per-Tenant Rate Limiting & Cost Controls

rate_limiter.py
import redis.asyncio as redis
from fastapi import HTTPException

# Shared async Redis client (manages a connection pool internally).
r = redis.from_url("redis://localhost:6379")

# Limits per tenant per day, keyed by pricing tier (tokens/tenant/day).
DAILY_TOKEN_LIMITS = {
    "free": 50_000,
    "pro": 500_000,
    "enterprise": 5_000_000,
}

async def check_and_consume_tokens(
    tenant_id: str,
    tier: str,
    tokens_to_use: int,
) -> bool:
    """Consume tokens from the tenant's daily budget, enforcing the tier limit.

    Increments a per-tenant, per-day Redis counter and raises HTTP 429 once
    the day's cumulative usage exceeds the tier's limit. Returns True when
    the request is within budget. Unknown tiers fall back to the free limit.
    """
    from datetime import date  # proper import instead of an opaque __import__ hack

    # One counter per tenant per calendar day; str(date.today()) is the
    # ISO form, so the key format matches the original.
    key = f"tokens:{tenant_id}:{date.today().isoformat()}"
    limit = DAILY_TOKEN_LIMITS.get(tier, DAILY_TOKEN_LIMITS["free"])

    async with r.pipeline() as pipe:
        await pipe.incrby(key, tokens_to_use)
        await pipe.expire(key, 86400)   # safety-net TTL; the dated key rolls over anyway
        results = await pipe.execute()

    used = results[0]
    if used > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Daily token limit exceeded ({limit:,} tokens). Used: {used:,}",
        )
    return True

# Example: call before invoking LLM.
# Estimate tokens with tiktoken before calling, or use actual usage after.
async def guarded_llm_call(tenant_id: str, tier: str, question: str):
    """Reserve an estimated token budget before making the LLM call.

    Raises HTTP 429 (via check_and_consume_tokens) when the tenant is over
    its daily limit, so the expensive API call never happens.
    """
    import tiktoken
    enc = tiktoken.encoding_for_model("gpt-4o-mini")
    estimated_tokens = len(enc.encode(question)) + 500   # prompt tokens + rough completion headroom
    await check_and_consume_tokens(tenant_id, tier, estimated_tokens)
    # ... proceed with LLM call

📝 Knowledge Check

Module 13 — Quiz

Score 80% or higher (10 out of 12) to unlock Module 14.

0 of 12 answered