Enterprise Patterns & Production Readiness
1. Production Readiness Checklist
Multi-tenancy
Isolate tenant data, rate limits, and vector namespaces. Never let Tenant A see Tenant B's data.
Authentication
JWT-validated API endpoints. Embed tenant ID in token claims for downstream isolation.
Semantic Caching
Cache LLM responses by semantic similarity to reduce repeated API costs.
Observability
Prometheus metrics, structured JSON logs, distributed tracing (LangSmith/Langfuse).
Rate Limiting
Per-tenant token budgets enforced via Redis counters. Prevents runaway cost.
Containerisation
Multi-stage Docker build, Kubernetes Deployment + HPA, health check endpoints.
2. Multi-Tenancy with Namespace Isolation
Use a separate ChromaDB collection (or Pinecone namespace) per tenant so retrieval is always scoped to the correct tenant's data.
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
def get_tenant_store(tenant_id: str) -> Chroma:
"""Returns a vector store scoped to the tenant's collection."""
return Chroma(
collection_name=f"tenant_{tenant_id}", # isolated collection
embedding_function=embeddings,
persist_directory=f"./chroma_db/{tenant_id}",
)
def index_tenant_documents(tenant_id: str, documents: list):
store = get_tenant_store(tenant_id)
store.add_documents(documents)
def build_tenant_chain(tenant_id: str):
store = get_tenant_store(tenant_id)
retriever = store.as_retriever(search_kwargs={"k": 4})
prompt = ChatPromptTemplate.from_messages([
("system", "Answer using only the provided context. Tenant: {tenant_id}"),
("human", "Context: {context}\n\nQuestion: {question}"),
])
llm = ChatOpenAI(model="gpt-4o-mini")
return (
{
"context": retriever,
"question": RunnablePassthrough(),
"tenant_id": lambda _: tenant_id,
}
| prompt
| llm
)
# Each tenant gets an isolated retriever
chain_a = build_tenant_chain("tenant-a")
chain_b = build_tenant_chain("tenant-b")
print(chain_a.invoke("What is our refund policy?").content) # Only sees Tenant A docs
print(chain_b.invoke("What is our refund policy?").content) # Only sees Tenant B docs
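The same isolation pattern carries over to Pinecone, mentioned above: keep a single index and scope every operation to a per-tenant namespace. A minimal sketch, assuming the langchain-pinecone package, an existing index named tenant-docs, and PINECONE_API_KEY set in the environment:
from langchain_pinecone import PineconeVectorStore

def get_tenant_store_pinecone(tenant_id: str) -> PineconeVectorStore:
    """Same idea as get_tenant_store, but isolation comes from a Pinecone namespace."""
    return PineconeVectorStore(
        index_name="tenant-docs",          # shared index (assumed name)
        embedding=embeddings,
        namespace=f"tenant_{tenant_id}",   # per-tenant namespace
    )
Everything downstream (as_retriever, build_tenant_chain) stays the same; only the store factory changes.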
3. JWT Authentication Middleware
Protect your FastAPI LLM endpoints with JWT validation. Extract tenant ID from claims to scope all downstream operations.
pip install python-jose[cryptography] passlib[bcrypt]
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
SECRET_KEY = "your-256-bit-secret" # use os.environ in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
security = HTTPBearer()
def create_access_token(tenant_id: str, user_id: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": user_id,
"tenant_id": tenant_id,
"exp": expire,
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def get_current_tenant(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
tenant_id = payload.get("tenant_id")
user_id = payload.get("sub")
if not tenant_id or not user_id:
raise HTTPException(status_code=401, detail="Invalid token claims")
return {"tenant_id": tenant_id, "user_id": user_id}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from auth import get_current_tenant, create_access_token
from multi_tenant_rag import build_tenant_chain
app = FastAPI()
class ChatRequest(BaseModel):
question: str
@app.post("/chat")
async def chat(
request: ChatRequest,
tenant: dict = Depends(get_current_tenant), # validates JWT + extracts tenant
):
chain = build_tenant_chain(tenant["tenant_id"])
response = chain.invoke(request.question)
return {"answer": response.content, "tenant_id": tenant["tenant_id"]}
# Dev helper: generate a test token
@app.get("/dev/token")
def get_test_token(tenant_id: str, user_id: str):
return {"token": create_access_token(tenant_id, user_id)}
4. Semantic Caching
Semantic caching returns cached LLM responses for semantically similar (not just identical) queries. It dramatically reduces API cost for repeated or near-duplicate questions.
from langchain.globals import set_llm_cache
from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# Configure semantic cache backed by Redis + vector similarity
set_llm_cache(
RedisSemanticCache(
redis_url="redis://localhost:6379",
embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
        score_threshold=0.2,  # vector distance threshold: lower means a stricter match
)
)
llm = ChatOpenAI(model="gpt-4o-mini") # all calls go through cache automatically
prompt = ChatPromptTemplate.from_messages([("human", "{q}")])
chain = prompt | llm
# First call — hits API
r1 = chain.invoke({"q": "What is LangGraph?"})
# Second call (identical) — cache hit
r2 = chain.invoke({"q": "What is LangGraph?"})
# Third call (semantically similar): cache hit if the embedding distance is within the threshold
r3 = chain.invoke({"q": "Can you explain LangGraph?"})
print(r1.content[:100])
print("Cache working" if r1.content == r2.content else "Cache miss")
Raise score_threshold for broader cache hits (lower cost, slightly less accurate) or lower it toward zero to accept near-exact matches only. For RAG queries, cache the final answer but not intermediate retrieval steps.
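A simple way to confirm the cache is actually being hit is to time a repeated call; the cached response should come back in milliseconds rather than seconds (a rough sketch, exact timings will vary):
import time

start = time.time()
chain.invoke({"q": "What is LangGraph?"})
print(f"first call:  {time.time() - start:.2f}s")

start = time.time()
chain.invoke({"q": "What is LangGraph?"})
print(f"cached call: {time.time() - start:.2f}s")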
5. Prometheus Metrics
Expose LLM-specific metrics (latency, token usage, error rate, cache hit rate) to Prometheus for alerting and dashboards.
pip install prometheus-client
import time
from functools import wraps
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Define metrics
REQUEST_COUNT = Counter(
"llm_requests_total",
"Total LLM requests",
["tenant_id", "model", "status"],
)
REQUEST_LATENCY = Histogram(
"llm_request_latency_seconds",
"LLM request latency",
["tenant_id", "model"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)
TOKEN_USAGE = Counter(
"llm_tokens_total",
"Total tokens used",
["tenant_id", "model", "type"], # type: prompt | completion
)
CACHE_HITS = Counter("llm_cache_hits_total", "Semantic cache hits", ["tenant_id"])
def track_llm_call(tenant_id: str, model: str):
"""Decorator to track LLM call metrics."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
try:
result = await func(*args, **kwargs)
REQUEST_COUNT.labels(tenant_id=tenant_id, model=model, status="success").inc()
# Track token usage if available
                if getattr(result, "usage_metadata", None):  # guard against a missing or empty usage block
TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, type="prompt").inc(
result.usage_metadata.get("input_tokens", 0)
)
TOKEN_USAGE.labels(tenant_id=tenant_id, model=model, type="completion").inc(
result.usage_metadata.get("output_tokens", 0)
)
return result
except Exception as e:
REQUEST_COUNT.labels(tenant_id=tenant_id, model=model, status="error").inc()
raise
finally:
REQUEST_LATENCY.labels(tenant_id=tenant_id, model=model).observe(
time.time() - start
)
return wrapper
return decorator
# Start metrics server on port 8001
start_http_server(8001) # scrape at http://localhost:8001/metrics
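The decorator is easiest to see wrapped around a small async helper for the model client. A sketch (the helper and the hard-coded tenant value are illustrative; in a real API you would build the decorated function per request so tenant_id reflects the caller):
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

@track_llm_call(tenant_id="tenant-a", model="gpt-4o-mini")
async def answer(question: str):
    # usage_metadata on the returned AIMessage feeds the TOKEN_USAGE counter
    return await llm.ainvoke(question)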
6. Structured JSON Logging
JSON-formatted logs integrate with log aggregators (ELK, CloudWatch, Datadog) and enable structured queries.
import logging
import json
import time
from uuid import uuid4
class JSONFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
log = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"trace_id": getattr(record, "trace_id", None),
"tenant_id": getattr(record, "tenant_id", None),
"model": getattr(record, "model", None),
"latency_ms": getattr(record, "latency_ms", None),
}
if record.exc_info:
log["exception"] = self.formatException(record.exc_info)
return json.dumps({k: v for k, v in log.items() if v is not None})
def setup_logging():
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)
logger = logging.getLogger("llm-api")
def log_llm_request(tenant_id: str, model: str, latency_ms: float, tokens: int):
logger.info(
"LLM request completed",
extra={
"trace_id": str(uuid4()),
"tenant_id": tenant_id,
"model": model,
"latency_ms": round(latency_ms, 2),
"tokens": tokens,
},
)
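Call setup_logging() once at startup; each record is then emitted as a single JSON object per line, roughly like this (values illustrative):
setup_logging()
log_llm_request(tenant_id="tenant-a", model="gpt-4o-mini", latency_ms=842.7, tokens=950)
# {"timestamp": "2025-01-01 12:00:00,000", "level": "INFO", "logger": "llm-api",
#  "message": "LLM request completed", "trace_id": "7f3e...", "tenant_id": "tenant-a",
#  "model": "gpt-4o-mini", "latency_ms": 842.7, "tokens": 950}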
7. Docker & Kubernetes Deployment
# Multi-stage build for smaller image
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
FROM python:3.11-slim
WORKDIR /app
# Non-root user for security
RUN useradd -m appuser
# Copy installed packages into the non-root user's home so they stay readable after USER switches
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
COPY --chown=appuser:appuser . .
ENV PATH=/home/appuser/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1
USER appuser
EXPOSE 8000
# slim images ship without curl, so probe with Python's stdlib
HEALTHCHECK --interval=30s --timeout=5s CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-api
spec:
replicas: 3
selector:
matchLabels:
app: llm-api
template:
metadata:
labels:
app: llm-api
spec:
containers:
- name: llm-api
image: your-registry/llm-api:latest
ports:
- containerPort: 8000
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: llm-secrets
key: openai-api-key
resources:
requests:
cpu: "250m"
memory: "512Mi"
limits:
cpu: "1000m"
memory: "2Gi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: llm-api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: llm-api
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
8. Per-Tenant Rate Limiting & Cost Controls
from datetime import date

import redis.asyncio as redis
from fastapi import HTTPException
r = redis.from_url("redis://localhost:6379")
# Limits per tenant per day
DAILY_TOKEN_LIMITS = {
"free": 50_000,
"pro": 500_000,
"enterprise": 5_000_000,
}
async def check_and_consume_tokens(
tenant_id: str,
tier: str,
tokens_to_use: int,
) -> bool:
key = f"tokens:{tenant_id}:{__import__('datetime').date.today()}"
limit = DAILY_TOKEN_LIMITS.get(tier, DAILY_TOKEN_LIMITS["free"])
async with r.pipeline() as pipe:
await pipe.incrby(key, tokens_to_use)
await pipe.expire(key, 86400) # expire after 24h
results = await pipe.execute()
used = results[0]
if used > limit:
raise HTTPException(
status_code=429,
detail=f"Daily token limit exceeded ({limit:,} tokens). Used: {used:,}",
)
return True
# Example: call before invoking LLM
# Estimate tokens with tiktoken before calling, or use actual usage after
async def guarded_llm_call(tenant_id: str, tier: str, question: str):
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o-mini")
estimated_tokens = len(enc.encode(question)) + 500 # rough estimate
await check_and_consume_tokens(tenant_id, tier, estimated_tokens)
# ... proceed with LLM call
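Putting the pieces together, the /chat handler from section 3 can enforce the budget before running the tenant's chain. A sketch (the hard-coded tier is illustrative; in practice it would come from the JWT claims or a billing record):
import tiktoken

@app.post("/chat")
async def chat(
    request: ChatRequest,
    tenant: dict = Depends(get_current_tenant),
):
    tier = "pro"  # illustrative: resolve from token claims or your billing store
    enc = tiktoken.encoding_for_model("gpt-4o-mini")
    estimated_tokens = len(enc.encode(request.question)) + 500  # rough estimate
    await check_and_consume_tokens(tenant["tenant_id"], tier, estimated_tokens)

    chain = build_tenant_chain(tenant["tenant_id"])
    response = chain.invoke(request.question)
    return {"answer": response.content, "tenant_id": tenant["tenant_id"]}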