Module 09

Langfuse: Open-Source Observability

⏱ ~3.5 hours ❓ 10-question quiz 🎯 Unlock Module 10

1. Why Langfuse?

Langfuse is a fully open-source LLM engineering platform. Unlike fully-managed SaaS, Langfuse can be self-hosted on your own infrastructure so telemetry data never leaves your network — a hard requirement for many regulated industries.

| Capability | LangSmith | Langfuse |
| --- | --- | --- |
| Open source | No | Yes (MIT) |
| Self-hostable | Enterprise only | Docker / Kubernetes |
| Trace storage | LangSmith cloud | Postgres (v2); v3 adds ClickHouse, Redis, and S3/MinIO |
| Prompt versioning | Hub (pull/push) | Built-in prompt management |
| Dataset / evals | Yes | Yes |
| Cost tracking | Limited | Native token + cost dashboards |
| LangChain integration | LANGCHAIN_TRACING_V2 | CallbackHandler |

When to choose Langfuse: Data residency requirements, on-prem deployments, cost-sensitive teams, or when you want to extend observability with custom plugins.

2. Self-Hosting with Docker Compose

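The fastest way to run Langfuse locally or on a VM is the official Docker Compose stack. The minimal v2 stack below runs two containers, the Langfuse web server and Postgres; Langfuse v3 adds a background worker, ClickHouse, Redis, and S3-compatible blob storage.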

docker-compose.yml (minimal)
version: "3.9"
services:
  langfuse-server:
    image: langfuse/langfuse:2
    depends_on:
      - db
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://langfuse:langfuse@db:5432/langfuse
      NEXTAUTH_SECRET: change_me_in_production
      SALT: change_me_in_production
      NEXTAUTH_URL: http://localhost:3000
      TELEMETRY_ENABLED: "false"   # opt-out of usage telemetry

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: langfuse
      POSTGRES_PASSWORD: langfuse
      POSTGRES_DB: langfuse
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
shell
# Start the stack
docker compose up -d

# Access UI at http://localhost:3000
# Create an organisation → project → copy API keys
Production hardening: Replace NEXTAUTH_SECRET and SALT with long random strings, add an Nginx/Caddy reverse proxy with TLS, and mount a persistent volume for Postgres data.
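One way to produce the long random strings mentioned above is Python's standard `secrets` module. This is a minimal sketch: the helper name `make_secret` and the 32-byte length are illustrative choices, not Langfuse requirements.

```python
import secrets


def make_secret(num_bytes: int = 32) -> str:
    """Return a URL-safe random string built from num_bytes of OS randomness."""
    return secrets.token_urlsafe(num_bytes)


# Print lines that can be pasted into the compose file's environment section.
for name in ("NEXTAUTH_SECRET", "SALT"):
    print(f"{name}: {make_secret()}")
```

Generate the values once and store them somewhere durable (a secrets manager or an untracked env file) rather than regenerating them on each deploy.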

3. Python SDK & Environment Setup

bash
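# The examples in this module use the v2 Python SDK (langfuse.callback, langfuse.decorators).
# SDK v3 moved these imports, so pin the major version.
pip install "langfuse>=2,<3" langchain-openai langchain langgraph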
.env
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_HOST=http://localhost:3000   # or https://cloud.langfuse.com

OPENAI_API_KEY=sk-...
python — verify connection
from langfuse import Langfuse

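# Langfuse() reads the LANGFUSE_* values from the process environment, so export
# the variables from .env first (for example via your shell or python-dotenv).
lf = Langfuse()
lf.auth_check()   # raises if credentials are wrong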
print("Langfuse connection OK")
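Before constructing the client, it can help to confirm that the variables from the `.env` listing actually reached the process. The sketch below uses only the standard library. The helper name `missing_langfuse_vars` is hypothetical, and treating `LANGFUSE_HOST` as required is a choice made here for self-hosted setups.

```python
import os

REQUIRED_VARS = ("LANGFUSE_SECRET_KEY", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_HOST")


def missing_langfuse_vars(env=None) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_langfuse_vars()
if missing:
    print("Missing:", ", ".join(missing))
else:
    print("All Langfuse variables are set")
```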

4. LangChain CallbackHandler — Zero-Code Tracing

Drop the CallbackHandler into any LangChain call and every chain step, LLM call, retriever invocation, and tool call is automatically captured as a trace in Langfuse.

langchain_callback.py
from langfuse.callback import CallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# Create a handler — one per "session" or "user request"
handler = CallbackHandler(
    user_id="user-42",
    session_id="session-abc",
    metadata={"environment": "staging", "feature": "support-bot"},
    tags=["support", "gpt-4o-mini"],
)

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful support agent."),
    ("human", "{question}"),
])
chain = prompt | llm

# Pass handler in config — all child spans inherit it
response = chain.invoke(
    {"question": "How do I reset my password?"},
    config={"callbacks": [handler]},
)

# IMPORTANT: flush before process exits
handler.flush()
print(response.content)
Automatic attributes captured: model name, temperature, prompt tokens, completion tokens, latency per step, total cost (using built-in pricing tables for OpenAI/Anthropic/etc.), and any metadata/tags you attach.
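The flush call at the end of the script matters because the SDK queues events and sends them in batches in the background, so a short-lived process can exit before the last batch goes out. The toy class below illustrates that general buffering pattern only. It is not Langfuse's implementation, and `BufferedSender` and its `send` callback are hypothetical names.

```python
from typing import Callable


class BufferedSender:
    """Toy event buffer: sends full batches, keeps the remainder until flush()."""

    def __init__(self, send: Callable[[list[dict]], None], batch_size: int = 3):
        self._send = send
        self._batch_size = batch_size
        self._buffer: list[dict] = []

    def add(self, event: dict) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is still buffered; safe to call when empty."""
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []


sent_batches: list[list[dict]] = []
sender = BufferedSender(sent_batches.append, batch_size=3)
for i in range(4):
    sender.add({"event": i})

print(len(sent_batches))   # one full batch sent, one event still buffered
sender.flush()
print(len(sent_batches))   # the remainder went out on flush
```

Without the final `flush()`, the fourth event would never be delivered, which is the failure mode the "flush before process exits" comment guards against.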

5. The @observe Decorator — Structured Spans

Use @observe to wrap your own Python functions as named spans in the trace tree. This gives you end-to-end visibility across application logic, not just LLM calls.

observe_decorator.py
from langfuse.decorators import langfuse_context, observe
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer concisely."),
    ("human", "{question}"),
])

@observe(name="retrieve-context")
def retrieve_context(query: str) -> str:
    # Simulate a retriever
    return f"[Doc about: {query[:30]}]"

@observe(name="generate-answer")
def generate_answer(question: str, context: str) -> str:
    chain = prompt | llm
    handler = langfuse_context.get_current_langchain_handler()
    result = chain.invoke(
        {"question": f"Context: {context}\n\nQuestion: {question}"},
        config={"callbacks": [handler]},
    )
    return result.content

@observe(name="rag-pipeline")
def rag_pipeline(question: str) -> str:
    # Attach metadata to the root span
    langfuse_context.update_current_observation(
        metadata={"question_length": len(question)},
        tags=["rag", "production"],
    )
    ctx = retrieve_context(question)
    answer = generate_answer(question, ctx)

    # Score the root trace inline
    langfuse_context.score_current_trace(
        name="auto-relevance",
        value=0.9,
        comment="High overlap between question and context",
    )
    return answer

if __name__ == "__main__":
    ans = rag_pipeline("What is LangGraph?")
    print(ans)
Nesting works automatically: Any @observe-decorated function called inside another creates a child span. The SDK tracks the span stack via Python contextvars — no manual parent ID passing required.

6. Trace Scoring

Scores attach human or automated quality signals to traces. They power dashboards and regression alerts.

scoring.py
from langfuse import Langfuse

lf = Langfuse()

# Human-in-the-loop: a reviewer rates a trace after the fact
lf.score(
    trace_id="trace-uuid-from-ui",
    name="human-rating",
    value=4,        # numeric 1–5
    comment="Accurate and concise",
    data_type="NUMERIC",
)

# Boolean pass/fail
lf.score(
    trace_id="trace-uuid-from-ui",
    name="pii-free",
    value=1,        # 1 = pass
    data_type="BOOLEAN",
)

# Categorical
lf.score(
    trace_id="trace-uuid-from-ui",
    name="tone",
    value="professional",
    data_type="CATEGORICAL",
)

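# LLM-as-judge: score many traces in a loop
def run_llm_judge(output) -> float:
    """Placeholder evaluator; replace with a real LLM-as-judge call."""
    return 1.0 if output else 0.0

traces = lf.client.trace.list(limit=20)
for trace in traces.data:
    score_value = run_llm_judge(trace.output)   # your evaluator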
    lf.score(
        trace_id=trace.id,
        name="llm-judge-correctness",
        value=score_value,
        data_type="NUMERIC",
    )

lf.flush()
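As a rough illustration of how scores can feed a regression alert, the sketch below averages scores per name and flags names whose mean dropped between two periods. It works on plain tuples rather than the Langfuse API. The function names, the tolerance, and the sample numbers are all made up for the example.

```python
from collections import defaultdict
from statistics import mean


def mean_by_name(scores):
    """Group (name, value) pairs by name and return the mean value per name."""
    grouped = defaultdict(list)
    for name, value in scores:
        grouped[name].append(value)
    return {name: mean(values) for name, values in grouped.items()}


def regressions(baseline, current, tolerance=0.05):
    """Return score names whose mean dropped by more than `tolerance`."""
    return sorted(
        name for name, value in current.items()
        if name in baseline and baseline[name] - value > tolerance
    )


# Made-up numbers for illustration only.
before = mean_by_name([("llm-judge-correctness", 0.9), ("llm-judge-correctness", 0.8), ("pii-free", 1)])
after = mean_by_name([("llm-judge-correctness", 0.6), ("llm-judge-correctness", 0.7), ("pii-free", 1)])
print(regressions(before, after))
```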

7. Prompt Management & Versioning

Langfuse has a built-in prompt registry. Prompts are versioned, can be labelled (production/staging), and are fetched at runtime so you can A/B test or hot-swap prompts without redeploying code.

prompt_management.py
from langfuse import Langfuse
from langfuse.callback import CallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

lf = Langfuse()

# --- Push a new prompt version (or use the UI) ---
lf.create_prompt(
    name="support-reply-v2",
    prompt="You are a {{tone}} support agent. Answer: {{question}}",
    labels=["production"],
    config={"model": "gpt-4o-mini", "temperature": 0.3},
)

# --- Fetch prompt at runtime (always latest "production" label) ---
lf_prompt = lf.get_prompt("support-reply-v2", label="production")
print(f"Using prompt version: {lf_prompt.version}")

# get_langchain_prompt() converts {{var}} to LangChain's {var} syntax and, for a
# text prompt, returns a plain string, so wrap it in a ChatPromptTemplate.
# Passing the prompt object in metadata links generations to this prompt version
# (supported in recent v2 SDK releases).
lc_prompt = ChatPromptTemplate.from_template(
    lf_prompt.get_langchain_prompt(),
    metadata={"langfuse_prompt": lf_prompt},
)

# Build chain
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
chain = lc_prompt | llm

handler = CallbackHandler()
result = chain.invoke(
    {"tone": "friendly", "question": "Where is my order?"},
    config={"callbacks": [handler]},
)
print(result.content)
lf.flush()
Why this matters: Every trace in Langfuse UI shows which exact prompt version was used. When you roll back a bad prompt, old traces still reference the version that produced them — perfect audit trail.
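Langfuse stores template variables in double braces (`{{tone}}`), while LangChain templates use single braces (`{tone}`). The sketch below shows roughly what such a conversion involves, using a regular expression. It is an illustration and not the SDK's code: `to_single_braces` is a hypothetical helper, and it ignores edge cases such as literal braces in JSON examples.

```python
import re

# Matches {{name}} with optional whitespace inside the braces.
_MUSTACHE_VAR = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def to_single_braces(template: str) -> str:
    """Rewrite {{name}} placeholders as {name}."""
    return _MUSTACHE_VAR.sub(r"{\1}", template)


stored = "You are a {{tone}} support agent. Answer: {{question}}"
converted = to_single_braces(stored)
print(converted)
print(converted.format(tone="friendly", question="Where is my order?"))
```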

8. Datasets & Evaluation

Langfuse datasets let you curate golden test cases from real traces and run batch evaluations, similar to LangSmith datasets but available on a self-hosted instance.

dataset_eval.py
from langfuse import Langfuse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

lf = Langfuse()

# 1. Create dataset
dataset = lf.create_dataset(name="support-qa-golden")

# 2. Add items (input / expected output)
examples = [
    ("How do I reset my password?", "Navigate to Settings → Security → Reset password."),
    ("What payment methods do you accept?", "We accept Visa, Mastercard, PayPal, and bank transfer."),
    ("How long does shipping take?", "Standard shipping is 3–5 business days."),
]
for inp, expected in examples:
    lf.create_dataset_item(
        dataset_name="support-qa-golden",
        input={"question": inp},
        expected_output={"answer": expected},
    )

# 3. Run evaluation loop
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a support agent."),
    ("human", "{question}"),
])
chain = prompt | llm

dataset_items = lf.get_dataset("support-qa-golden")
for item in dataset_items.items:
    handler = item.get_langchain_handler(run_name="eval-run-v1")
    result = chain.invoke(item.input, config={"callbacks": [handler]})

    # get_langchain_handler() has already linked this trace to the run,
    # so only the score needs to be recorded here.
    lf.score(
        trace_id=handler.get_trace_id(),
        name="exact-match",
        value=1 if result.content.strip() == item.expected_output["answer"] else 0,
        data_type="BOOLEAN",
    )

lf.flush()
print("Evaluation complete — check Langfuse UI for results")
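Exact string match is a strict metric for free-form LLM answers: a correct reply that is worded differently scores 0. One common softer alternative is token-overlap F1, sketched below with the standard library only. The helper names and the example predictions are made up, and the result could be recorded as a score with data_type="NUMERIC" instead of the boolean exact match.

```python
import re
from collections import Counter


def normalize(text: str) -> list[str]:
    """Lowercase, drop punctuation, and split into word tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def token_f1(prediction: str, reference: str) -> float:
    """Harmonic mean of token precision and recall (0.0 when nothing overlaps)."""
    pred, ref = normalize(prediction), normalize(reference)
    overlap = sum((Counter(pred) & Counter(ref)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(ref)
    return 2 * precision * recall / (precision + recall)


reference = "Standard shipping is 3–5 business days."
print(token_f1("Standard shipping is 3-5 business days", reference))
print(token_f1("Shipping usually takes 3 to 5 business days.", reference))
```

The first prediction differs from the reference only in punctuation and scores full marks, while the paraphrase gets partial credit instead of a flat failure.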

9. Cost & Latency Dashboards

Langfuse calculates token costs for supported models from its built-in pricing table (updated regularly) and surfaces them in the Analytics tab, with no extra configuration needed.

Cost by Model

Bar chart showing total spend per model over time. Drill down by user, session, or tag.

Latency Percentiles

p50 / p95 / p99 latency per trace name. Spot slow chains or outlier requests immediately.

Token Usage

Stacked chart of prompt vs completion tokens. Useful for prompt engineering — shorter prompts = lower cost.

Score Trends

Line chart of average scores per score name over time — track quality regressions after deployments.

User Analytics

Cost and usage broken down by user_id — identify top consumers or suspicious spikes.

Session Replay

View the full conversation thread for any session_id with all turns and metadata.
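For readers unfamiliar with the percentile notation above, the sketch below computes p50, p95, and p99 with the nearest-rank method on a made-up latency sample. Langfuse may use a different percentile definition internally, so this is for intuition only.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of the data at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


latencies_ms = [120, 135, 150, 160, 180, 210, 240, 300, 450, 1900]  # made-up sample
for pct in (50, 95, 99):
    print(f"p{pct}: {percentile(latencies_ms, pct)} ms")
```

A single slow request (1900 ms here) barely moves the median but dominates p95 and p99, which is why the tail percentiles are the ones to watch for outliers.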

Custom model pricing: If you use a fine-tuned or locally-hosted model, add it to Langfuse's Models settings page with a per-token price and it will appear in cost dashboards automatically.
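The arithmetic behind a per-token price is simple: each token count is multiplied by its unit price and the results are summed. The sketch below shows that calculation in isolation. The model name and prices are hypothetical, and `ModelPrice` and `generation_cost` are not part of the Langfuse SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """Price in USD per single token (hypothetical numbers, not real pricing)."""
    input_per_token: float
    output_per_token: float


PRICES = {
    "my-finetuned-model": ModelPrice(input_per_token=0.000002, output_per_token=0.000006),
}


def generation_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one generation: tokens of each kind times that kind's unit price."""
    price = PRICES[model]
    return prompt_tokens * price.input_per_token + completion_tokens * price.output_per_token


print(f"{generation_cost('my-finetuned-model', 1200, 300):.6f} USD")
```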

10. Integrating with LangGraph

LangGraph graphs invoke LangChain runnables internally, so passing a Langfuse callback through RunnableConfig traces every node automatically.

langgraph_langfuse.py
from langfuse.callback import CallbackHandler
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

llm = ChatOpenAI(model="gpt-4o-mini")

def chatbot(state: State) -> State:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.set_entry_point("chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create a Langfuse handler per invocation
handler = CallbackHandler(
    session_id="langgraph-session-001",
    user_id="user-123",
    tags=["langgraph", "chatbot"],
)

result = app.invoke(
    {"messages": [HumanMessage(content="Explain LangGraph in one sentence.")]},
    config={"callbacks": [handler]},
)

handler.flush()
print(result["messages"][-1].content)
Multi-step traces: Each LangGraph node that calls an LLM produces a child span inside the same trace. You'll see the full DAG timeline in Langfuse — which nodes were slow, how many tokens each consumed, and what each returned.

📝 Knowledge Check

Module 09 — Quiz

Score 80% or higher (8 out of 10) to unlock Module 10.
