langchain-langgraph-streaming

Pick the correct LangGraph 1.0 stream_mode ("messages" vs "updates" vs "values"), wire it into SSE or WebSocket without proxy-buffering gotchas, and filter astream_events(v2) server-side before forwarding to the browser. Use when building a live-token chat UI, a per-node progress bar, a debug/time-travel view, or diagnosing a LangGraph stream that hangs over a production proxy. Trigger with "langgraph streaming", "stream_mode messages", "stream_mode updates", "stream_mode values", "langgraph SSE", "langgraph astream_events", "SSE hangs behind nginx", "cloud run streaming".

Tags: Design, saas, langchain, langgraph, python, langchain-1.0, streaming, sse, websocket

What this skill does

# LangGraph Streaming (Python)

## Overview

An engineer ships `stream_mode="values"` to a token-level chat UI because it
"seemed the most complete." Every single token causes the full graph state —
message history, scratchpad, plan — to be re-sent and re-rendered. At ~60
tokens/sec the browser overdraws, the React reconciler can't keep up, the tab
freezes, and users blame the model. The correct answer was `stream_mode="messages"`,
which emits an `AIMessageChunk` delta per token (typically 5-50 bytes) — one
token's worth of DOM work. This is pain-catalog entry **P19** and it is the #1
LangGraph integration mistake in the 1.0 generation.

Then the same UI ships to Cloud Run and hangs forever. No error. No logs. The
server is emitting tokens; they just never reach the browser. Default proxy
buffering (Nginx, Cloud Run's HTTP/1.1 path, Cloudflare Free) holds the last
chunk waiting for more bytes. This is **P46** — SSE streams from LangGraph
drop the final `end` event over proxies that buffer — and the fix is three
headers: `X-Accel-Buffering: no`, `Cache-Control: no-cache`, `Connection: keep-alive`.

And then the debug view starts crashing browser tabs on long runs. The engineer
forwarded `astream_events(version="v2")` raw to the client because "it has more
detail" — but v2 emits thousands of events per invocation (per-token, per-node,
per-runnable lifecycle), and a 60-second agent run easily hits 3,000 events.
Browsers freeze on the JSON deserialize queue. This is **P47** — filter
server-side, forward only `on_chat_model_stream` tokens (and optionally
`on_tool_start` / `on_tool_end`).

This skill ships the decision matrix, a production-grade FastAPI SSE endpoint
with the anti-buffering headers and a 15-second heartbeat, a server-side v2
event filter that drops ~90% of noise, and a WebSocket variant with
reconnect-by-`thread_id` that resumes from the LangGraph checkpointer. Pin:
`langgraph 1.0.x`, `langchain-core 1.0.x`. Pain-catalog anchors: **P19, P46,
P47, P48, P67**, plus P16 for the `thread_id` rule and P22 for checkpointer
persistence.

## Prerequisites

- Python 3.10+
- `langgraph >= 1.0, < 2.0`, `langchain-core >= 1.0, < 2.0`
- `fastapi >= 0.110`, `uvicorn[standard]` (for SSE/WebSocket hosting)
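- A checkpointer: `langgraph.checkpoint.memory.MemorySaver` for dev, or
  `langgraph.checkpoint.postgres.aio.AsyncPostgresSaver` (from the
  `langgraph-checkpoint-postgres` package) for prod; the async variant is
  the one to use here because every example calls `astream`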
- Access to deploy behind your actual proxy (Nginx / Cloud Run / Cloudflare) —
  localhost does not reproduce the buffering class of bugs

## Instructions

### Step 1 — Pick the right `stream_mode` for your UI

The three modes emit fundamentally different payloads. Match the mode to the
UI shape **before** writing any server code.

| UI type | `stream_mode` | Payload each tick | Emit rate | Overdraw risk | Typical bandwidth per 5s run |
|---|---|---|---|---|---|
| Live-token chat | `"messages"` | `(AIMessageChunk, metadata)` delta | ~30-80 tokens/sec | Low | ~5-15 KB |
| Per-node progress bar / status line | `"updates"` | `{node_name: state_diff}` | 1 per node (~2-20 per run) | Low | ~1-5 KB |
| Debug / time-travel / state replay | `"values"` | Entire graph state dict | 1 per node (~2-20 per run) | **High** (state size × steps) | ~20 KB to MBs |
| Hybrid (progress + tokens) | `["updates", "messages"]` | `(mode, payload)` interleaved | Sum of above | Depends on inner modes | Sum |
| Non-browser observability | `astream_events(v2)` + filter | Filtered dicts | Depends on filter | Low (server-controlled) | Controlled |

Decision tree:

```
Do you need LLM tokens rendered live in the UI?
├── Yes → stream_mode="messages"
│         (add "updates" to the list if you also want per-node progress)
└── No, I need per-step progress
    ├── Full state for debug/replay? → stream_mode="values"
    └── Just what changed (most UIs)  → stream_mode="updates"
```
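
The decision tree above can be sketched as a small helper. This is only an illustration: `pick_stream_mode` and its parameter names are hypothetical and not part of LangGraph; the return value is what you would pass as `stream_mode`.

```python
from typing import List, Union


def pick_stream_mode(
    live_tokens: bool,
    node_progress: bool = False,
    full_state: bool = False,
) -> Union[str, List[str]]:
    """Hypothetical helper mirroring the decision tree above."""
    if live_tokens:
        # Tokens rendered live; optionally combine with per-node progress.
        return ["updates", "messages"] if node_progress else "messages"
    # No live tokens: choose between full state (debug/replay) and diffs.
    return "values" if full_state else "updates"
```

Keeping the choice in one function makes it easy to review the mode each endpoint uses before any server code is written.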

Full payload samples and combined-mode examples are in
[Stream Mode Comparison](references/stream-mode-comparison.md).
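
For the hybrid row of the table, each item arrives as `(mode, payload)`, so the consumer has to branch on the mode. A minimal sketch, assuming the payload shapes listed in the table; `to_ui_event` and the output dict layout are hypothetical, and `SimpleNamespace` stands in for an `AIMessageChunk` so the example runs without LangGraph installed.

```python
from types import SimpleNamespace
from typing import Any, Optional


def to_ui_event(mode: str, payload: Any) -> Optional[dict]:
    """Map one (mode, payload) item to a compact UI event (hypothetical shape)."""
    if mode == "messages":
        chunk, metadata = payload  # (AIMessageChunk, metadata) delta
        text = chunk.content if isinstance(chunk.content, str) else None
        if not text:
            return None  # skip empty or non-text chunks
        return {"type": "token", "text": text,
                "node": metadata.get("langgraph_node")}
    if mode == "updates":
        # {node_name: state_diff}; forward only which nodes just finished.
        return {"type": "progress", "nodes": sorted(payload)}
    return None


# Intended use (not executed here), with `graph` as in your own app:
#   async for mode, payload in graph.astream(
#       inputs, config=config, stream_mode=["updates", "messages"]
#   ):
#       event = to_ui_event(mode, payload)

fake_chunk = SimpleNamespace(content="Hi")  # stand-in for AIMessageChunk
token_event = to_ui_event("messages", (fake_chunk, {"langgraph_node": "agent"}))
progress_event = to_ui_event("updates", {"agent": {"messages": []}})
```

Forwarding only node names for `"updates"` keeps the progress channel small; send the diff itself only if the UI actually renders it.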

### Step 2 — Wire a minimal SSE endpoint

```python
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from app.graph import build_graph

app = FastAPI()
graph = build_graph(checkpointer=MemorySaver())


def sse(event: str, data: dict) -> str:
    return f"event: {event}\ndata: {json.dumps(data, default=str)}\n\n"


async def stream_tokens(thread_id: str, user_input: str):
    config = {"configurable": {"thread_id": thread_id}}
    async for chunk, metadata in graph.astream(
        {"messages": [HumanMessage(user_input)]},
        config=config,
        stream_mode="messages",
    ):
        # chunk.content may be list[dict] on Claude tool-use turns (P02)
        text = chunk.text if hasattr(chunk, "text") else (
            chunk.content if isinstance(chunk.content, str) else None
        )
        if text:
            yield sse("token", {"text": text, "node": metadata.get("langgraph_node")})
    yield sse("done", {"thread_id": thread_id})
```

Always use `graph.astream(...)` (async). Never call `graph.stream(...)` (sync)
from inside an async handler — it blocks the event loop and one slow request
blocks every other connection (P48).
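
The effect is easy to reproduce without LangGraph. In the sketch below, `sync_tokens` and `async_tokens` are hypothetical stand-ins for `graph.stream(...)` and `graph.astream(...)`, and `other_request` plays the role of a second connection sharing the same event loop.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Iterator, List


def sync_tokens(n: int) -> Iterator[str]:
    """Stand-in for graph.stream(...): never yields control to the loop."""
    for i in range(n):
        yield f"tok{i}"


async def async_tokens(n: int) -> AsyncIterator[str]:
    """Stand-in for graph.astream(...): awaits between tokens."""
    for i in range(n):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield f"tok{i}"


async def other_request(log: List[str]) -> None:
    """A second connection that wants a turn on the same loop."""
    for _ in range(3):
        log.append("other")
        await asyncio.sleep(0)


async def blocking_handler(log: List[str]) -> None:
    for tok in sync_tokens(3):  # sync iteration inside an async handler
        log.append(tok)


async def cooperative_handler(log: List[str]) -> None:
    async for tok in async_tokens(3):
        log.append(tok)


def run(handler: Callable[[List[str]], Awaitable[None]]) -> List[str]:
    """Run one handler next to other_request and return the event order."""
    async def main() -> List[str]:
        log: List[str] = []
        await asyncio.gather(handler(log), other_request(log))
        return log
    return asyncio.run(main())
```

With `blocking_handler`, every token is logged before `other_request` gets a single turn; with `cooperative_handler`, the two interleave. A real `graph.stream(...)` call is worse than this stand-in because each step also blocks for the duration of the model call.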

### Step 3 — Set the anti-buffering headers

```python
@app.get("/stream")
async def stream(thread_id: str, q: str):
    return StreamingResponse(
        stream_tokens(thread_id, q),
        media_type="text/event-stream",
        headers={
            "X-Accel-Buffering": "no",    # Nginx / Cloud Run / Cloudflare
            "Cache-Control": "no-cache",  # Block intermediate caches
            "Connection": "keep-alive",   # Hold the TCP connection
        },
    )
```

These three headers are non-negotiable in production. Without them, your
stream works on localhost and hangs on Cloud Run. See [SSE Endpoint Template](references/sse-endpoint-template.md)
for the full template with a 15-second heartbeat (required to survive Cloud
Run's 60s idle timeout and corporate-proxy timeouts) plus reverse-proxy
snippets for Nginx, Traefik, and Cloud Run.
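
One way to sketch the heartbeat is a wrapper that emits an SSE comment frame whenever the wrapped stream stays silent for `interval` seconds. This is not the referenced template: `with_heartbeat`, `HEARTBEAT`, and `_next_or_done` are hypothetical names, and the only assumption about the source is that it is an async iterator of already-framed SSE strings, such as `stream_tokens(...)` above.

```python
import asyncio
from typing import Any, AsyncIterator

HEARTBEAT = ": ping\n\n"  # SSE comment frame; EventSource clients ignore it
_DONE = object()          # sentinel marking the end of the source stream


async def _next_or_done(iterator: AsyncIterator[str]) -> Any:
    """Fetch the next item, or return _DONE when the source is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def with_heartbeat(
    source: AsyncIterator[str], interval: float = 15.0
) -> AsyncIterator[str]:
    iterator = source.__aiter__()
    pending = asyncio.create_task(_next_or_done(iterator))
    try:
        while True:
            # Wait for the next item without cancelling the fetch on timeout.
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield HEARTBEAT  # source is idle: keep the connection warm
                continue
            item = pending.result()  # re-raises errors from the source
            if item is _DONE:
                return
            yield item
            pending = asyncio.create_task(_next_or_done(iterator))
    finally:
        pending.cancel()  # no-op if the fetch already completed
```

In the endpoint above it would be used as `StreamingResponse(with_heartbeat(stream_tokens(thread_id, q)), ...)`. The fetch runs as a task and is polled with `asyncio.wait` rather than `asyncio.wait_for`, because `wait_for` cancels the in-flight fetch on timeout, which would interrupt the underlying stream.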

### Step 4 — Filter `astream_events(version="v2")` server-side

If your UI needs richer events than `"messages"` provides — tool start/end,
progress markers, retrieval events — do **not** forward `astream_events`
raw. A single 60-second agent run can emit 3,000+ events. Filter on the
server and forward only what the browser uses.

```python
# Event kinds worth forwarding; everything else is dropped server-side (P47).
FORWARD = {"on_chat_model_stream", "on_tool_start", "on_tool_end"}


async def filtered(graph, inputs, config):
    """Yield compact dicts for the browser instead of raw v2 events."""
    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event["event"]
        if kind not in FORWARD:
            # Drop: on_chain_*, on_parser_*, on_prompt_*, on_retriever_* (P47)
            continue
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            text = chunk.text if hasattr(chunk, "text") else None
            if text:  # skip empty chunks (e.g. tool-call deltas with no text)
                yield {"type": "token", "text": text,
                       "node": event["metadata"].get("langgraph_node")}
        elif kind == "on_tool_start":
            yield {"type": "tool_start", "tool": event["name"]}
        elif kind == "on_tool_end":
            yield {"type": "tool_end", "tool": event["name"]}
```

Never use `astream_log()` in new code: it is soft-deprecated in 1.0 (P67) and
scheduled for removal in 2.0. Use `astream_events(version="v2")` instead. The
full event taxonomy and compression/backpressure patterns are in
[Astream Events Filtering](references/astream-events-filtering.md).
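
To put the filtered events on the wire, each dict still has to become an SSE frame. A minimal sketch, assuming events shaped like `{"type": ..., ...}` as in the filter above; `sse_frame` mirrors the `sse` helper from Step 2, and `to_sse` and `fake_events` are hypothetical names used only for this illustration.

```python
import asyncio
import json
from typing import AsyncIterator, List


def sse_frame(event: str, data: dict) -> str:
    """Format one SSE frame: an event name, one JSON data line, a blank line."""
    return f"event: {event}\ndata: {json.dumps(data, default=str)}\n\n"


async def to_sse(events: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Turn filtered event dicts into SSE frames, then signal completion."""
    async for item in events:
        # Use "type" as the SSE event name and send the rest as the payload.
        payload = {k: v for k, v in item.items() if k != "type"}
        yield sse_frame(item["type"], payload)
    yield sse_frame("done", {})


async def fake_events() -> AsyncIterator[dict]:
    """Stand-in for the filtered generator so the sketch runs on its own."""
    yield {"type": "token", "text": "Hi", "node": "agent"}
    yield {"type": "tool_start", "tool": "search"}


async def collect() -> List[str]:
    return [frame async for frame in to_sse(fake_events())]


frames = asyncio.run(collect())
```

Mapping `type` to the SSE `event:` field lets the browser register one `addEventListener` per event kind instead of switching on a field inside every JSON payload.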

### Step 5 — WebSocket variant with reconnect

Use WebSocket instead of SSE when the user may cancel, interrupt, or send
follow-up messages mid-stream. WebSocket also sidesteps Cloudflare Free's
default response buffering.
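
The cancel case can be sketched independently of FastAPI. The helper below forwards tokens until either the stream ends or a cancel signal fires; `pump_until_cancelled` is a hypothetical name, `send` stands in for something like `websocket.send_json`, and `cancel` is an `asyncio.Event` that whatever task reads client messages would set. It is an illustration under those assumptions, not this skill's WebSocket handler.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def pump_until_cancelled(
    tokens: AsyncIterator[str],
    send: Callable[[str], Awaitable[None]],
    cancel: asyncio.Event,
) -> bool:
    """Forward tokens via send() until the stream ends or cancel is set.

    Returns True if the stream finished, False if it was cancelled first.
    """
    async def pump() -> None:
        async for token in tokens:
            await send(token)

    pump_task = asyncio.create_task(pump())
    cancel_task = asyncio.create_task(cancel.wait())
    await asyncio.wait(
        {pump_task, cancel_task}, return_when=asyncio.FIRST_COMPLETED
    )
    finished = pump_task.done()
    # Stop whichever task is still running; cancelling a done task is a no-op.
    pump_task.cancel()
    cancel_task.cancel()
    await asyncio.gather(pump_task, cancel_task, return_exceptions=True)
    if finished:
        pump_task.result()  # re-raise any error from the stream or send()
    return finished
```

Because the graph run is keyed by `thread_id`, a cancelled stream can be followed by a new run on the same thread, which is what makes reconnecting through the checkpointer possible.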

```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{thread_id}")
async def ws(websocket: WebSocket, thread_id: str):
    await websocket.accept()
    config = {"configurable": {"thread_id": thread_id}}  # P16
```