langchain-webhooks-events

Dispatch LangChain 1.0 chain/agent events to external systems — webhooks, Kafka, Redis Streams, SNS — via async fire-and-forget callbacks, subgraph-aware wiring, and HMAC-signed delivery with idempotency keys. Use when firing webhooks on tool calls, pushing telemetry to Kafka / Redis Streams, or fanning progress to multiple subscribers without blocking the chain. Trigger with "langchain webhook", "langchain event dispatch", "langchain callback kafka", "langchain pubsub", "langchain per-tool webhook", "BaseCallbackHandler webhook", "on_tool_end webhook", "langchain analytics event".

Backend & APIs, saas, langchain, langgraph, python, langchain-1.0, webhooks, async, events

What this skill does

# LangChain Webhooks and Event Dispatch (Python)

## Overview

A team wires per-tool webhook dispatch from their LangChain agent via FastAPI
`BackgroundTasks` — analytics is always N seconds late because `BackgroundTasks`
fire **after** the HTTP response closes, not during the stream (P60). Worse:
the `BaseCallbackHandler` they attached via `.with_config(callbacks=[h])`
fires on the outer agent but is dark on the subagent's tool calls — custom
callbacks are **not** inherited by LangGraph subgraphs (P28), they must be
passed via `config["callbacks"]` at invoke time.

Pain-catalog anchors handled here:

- P28 — Callbacks via `with_config` don't propagate to subgraphs
- P46 — SSE streams dropped by buffering proxies (see `langchain-langgraph-streaming`)
- P47 — `astream_events(v2)` emits thousands of events; never forward raw
- P48 — Sync `invoke()` inside async endpoint blocks the event loop
- P60 — `BackgroundTasks` fire post-response; wrong for per-event dispatch

This skill walks through an async `AsyncCallbackHandler` with fire-and-forget
dispatch, per-target sinks for HTTP / Kafka / Redis Streams / SNS, HMAC-signed
delivery with 1s/5s/30s retry and DLQ, idempotency keys = `run_id + event_type
+ step_index`, and `config["callbacks"]` wiring that makes subagent calls visible.
Typical webhook latency budget: <500ms per event. Pin: `langchain-core 1.0.x`,
`langgraph 1.0.x`. Scope: server-to-server dispatch only — UI streaming is in
`langchain-langgraph-streaming`.

## Prerequisites

- Python 3.10+
- `langchain-core >= 1.0, < 2.0`, `langgraph >= 1.0, < 2.0`
- `httpx >= 0.27` for async HTTP (or `aiohttp`)
- One of: `aiokafka`, `redis[hiredis] >= 5`, `aioboto3` (per target)
- An event sink — a webhook endpoint, Kafka topic, Redis Stream, or SNS topic
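- A shared secret (for HMAC) stored in your secret manager, not a plain environment variable (the Step 3 example reads `WEBHOOK_SIGNING_SECRET` from the environment for brevity; swap in a secret-manager lookup in production)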

## Instructions

### Step 1 — Write an async handler that fire-and-forget dispatches

Sync dispatch from a callback blocks the chain — a slow HTTP POST during
`on_tool_end` serializes all downstream tokens behind it (P48). Use
`asyncio.create_task(...)` so the dispatch runs alongside the chain:

```python
import asyncio
import itertools
import uuid
from typing import Any
from langchain_core.callbacks import AsyncCallbackHandler

class EventDispatchHandler(AsyncCallbackHandler):
    """Fire-and-forget dispatch to external sinks.

    IMPORTANT: subclass AsyncCallbackHandler (not BaseCallbackHandler) so
    on_* methods are awaited. Mixing sync and async handlers is a silent
    footgun: sync on_* blocks the event loop (P48).
    """

    # 4000 char cap keeps the payload under typical webhook body limits
    # while preserving enough context for downstream analytics.
    MAX_OUTPUT_CHARS = 4000

    def __init__(self, sink, *, run_id: str | None = None):
        self.sink = sink                      # dispatch target, see Step 3
        self.run_id = run_id or str(uuid.uuid4())
        self._tasks: set[asyncio.Task] = set()
        # Monotonic per-handler counter: every event gets a distinct
        # step_index, so idempotency keys never collide within a run.
        self._steps = itertools.count()
        self._names: dict[Any, str] = {}      # callback run_id -> run name

    def _dispatch(self, event_type: str, payload: dict) -> None:
        # Fire-and-forget. Keep a strong reference so the task isn't GC'd
        # mid-flight (the event loop only holds weak references to tasks).
        task = asyncio.create_task(
            self.sink.send(
                idempotency_key=f"{self.run_id}:{event_type}:{next(self._steps)}",
                event_type=event_type,
                payload=payload,
            )
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def on_tool_end(self, output: Any, *, run_id, parent_run_id=None, **kwargs):
        self._dispatch(
            "tool_end",
            {"output": str(output)[: self.MAX_OUTPUT_CHARS], "run_id": str(run_id)},
        )

    async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        # The run name arrives with the start event; remember it for on_chain_end.
        if kwargs.get("name"):
            self._names[run_id] = kwargs["name"]

    async def on_chain_error(self, error, *, run_id, **kwargs):
        self._names.pop(run_id, None)         # don't leak names of failed runs

    async def on_chain_end(self, outputs: dict, *, run_id, **kwargs):
        # Only named chains; skip the unnamed LCEL inner nodes (P47)
        name = kwargs.get("name") or self._names.pop(run_id, None)
        if not name or name.startswith("RunnableLambda"):
            return
        self._dispatch("chain_end", {"name": name, "run_id": str(run_id)})

    async def drain(self, timeout: float = 5.0) -> None:
        """Call before process exit so in-flight dispatches complete."""
        if self._tasks:
            await asyncio.wait(self._tasks, timeout=timeout)
```

See [Async Callback Handler](references/async-callback-handler.md) for the full
handler — `on_llm_end`, filtering, sync-vs-async decision.
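
To exercise the fire-and-forget pattern without LangChain or a network, an in-memory sink can stand in for the real target. The sketch below is illustrative only: `RecordingSink`, `fire_and_forget`, and `demo` are hypothetical names, and the only thing taken from the handler above is the `send(idempotency_key=..., event_type=..., payload=...)` signature and the task-set idea behind `drain()`.

```python
import asyncio
import time


class RecordingSink:
    """Hypothetical test double with the same send() signature as a real sink."""

    def __init__(self, delay_s: float = 0.05):
        self.delay_s = delay_s
        self.events: list[tuple[str, str, dict]] = []

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        await asyncio.sleep(self.delay_s)  # simulate a slow downstream
        self.events.append((idempotency_key, event_type, payload))


def fire_and_forget(tasks: set, coro) -> None:
    """Schedule coro and hold a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    tasks.add(task)
    task.add_done_callback(tasks.discard)


async def demo() -> tuple[float, list]:
    sink = RecordingSink()
    tasks: set[asyncio.Task] = set()
    start = time.perf_counter()
    for step in range(3):
        fire_and_forget(
            tasks,
            sink.send(
                idempotency_key=f"run-1:tool_end:{step}",
                event_type="tool_end",
                payload={"step": step},
            ),
        )
    scheduling_s = time.perf_counter() - start  # time spent on the "chain" side
    if tasks:
        await asyncio.wait(tasks, timeout=5.0)  # same idea as drain()
    return scheduling_s, sink.events


if __name__ == "__main__":
    scheduling_s, events = asyncio.run(demo())
    print(f"scheduled in {scheduling_s * 1000:.2f} ms, delivered {len(events)} events")
```

Scheduling returns before any delivery completes, which is the property the chain relies on; the final wait plays the role of `drain()` at shutdown.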

### Step 2 — Pass callbacks via `config` so subgraphs inherit them

P28: `Runnable.with_config(callbacks=[h])` binds at definition and is **not**
inherited by LangGraph subgraphs. Pass callbacks via `config` at invocation:

```python
# WRONG — subagent tool calls never fire the handler
agent_with_handler = agent.with_config({"callbacks": [handler]})
await agent_with_handler.ainvoke({"messages": [...]})

# RIGHT — callbacks in config propagate into subgraphs
await agent.ainvoke(
    {"messages": [...]},
    config={"callbacks": [handler], "configurable": {"thread_id": "t1"}},
)
```

Validate propagation with a probe that counts events by `kwargs["name"]` and
asserts the subagent's name appears. See [Subgraph Propagation](references/subgraph-propagation.md).
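
One way to write such a probe is sketched below. `NameProbe` and the subagent name in the usage comment are hypothetical; the sketch assumes the run name arrives either as a `name` keyword argument or inside `serialized`, so it checks both. The `ImportError` fallback only keeps the snippet importable where LangChain is not installed.

```python
from collections import Counter

try:
    from langchain_core.callbacks import AsyncCallbackHandler
except ImportError:  # lets the sketch import where LangChain is not installed
    AsyncCallbackHandler = object


class NameProbe(AsyncCallbackHandler):
    """Hypothetical probe: counts chain and tool starts by run name."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: Counter = Counter()

    def _record(self, serialized, kwargs) -> None:
        # Assumption: the name is in kwargs["name"] or serialized["name"].
        name = kwargs.get("name") or (serialized or {}).get("name") or "<unnamed>"
        self.seen[name] += 1

    async def on_chain_start(self, serialized, inputs, **kwargs) -> None:
        self._record(serialized, kwargs)

    async def on_tool_start(self, serialized, input_str, **kwargs) -> None:
        self._record(serialized, kwargs)

    def assert_seen(self, name: str) -> None:
        assert self.seen[name] > 0, f"{name!r} never fired; saw {sorted(self.seen)}"


# Usage (inside an async test; "research_subagent" is a made-up name):
#   probe = NameProbe()
#   await agent.ainvoke({"messages": [...]}, config={"callbacks": [probe]})
#   probe.assert_seen("research_subagent")
```

Running the same probe with the `with_config` wiring should make the assertion fail, which gives a regression test for P28.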

### Step 3 — Pick a dispatch target by delivery semantics

Match the event to the transport:

| Target | Delivery | Typical latency | Use when | Failure mode |
|---|---|---|---|---|
| HTTP webhook | At-least-once (with retry) | 50-500ms | Partner integrations, Zapier/Make, customer-owned endpoints | Endpoint 5xx → retry 1s/5s/30s → DLQ |
| Kafka (aiokafka) | At-least-once (idempotent producer) | 5-20ms intra-region | High-volume telemetry, analytics fan-in | Broker unavailable → retry + local buffer |
| Redis Streams (`XADD`) | At-least-once (consumer groups) | 1-5ms | Near-realtime worker queues, progress fan-out | Redis down → retry or spill to disk |
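| SNS | At-least-once to subscribers on standard topics (best-effort ordering) | 10-100ms | Fan-out to multiple SQS/Lambda subscribers | Duplicates and reordering possible; dedupe on the idempotency key, or use a FIFO topic with SQS FIFO when order matters |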

Handler stays provider-agnostic; only the `sink` changes. A minimal HTTP sink:

```python
import asyncio
import hashlib
import hmac
import json
import logging
import os

import httpx

logger = logging.getLogger(__name__)

WEBHOOK_URL = os.environ["WEBHOOK_URL"]
SIGNING_SECRET = os.environ["WEBHOOK_SIGNING_SECRET"].encode()

class WebhookSink:
    # HMAC-SHA256: the same signature scheme GitHub and Stripe webhooks use
    SIG_ALG = hashlib.sha256
    # Retry schedule: 1s absorbs transient blips, 5s absorbs brief 503s,
    # 30s absorbs autoscaler / cold-start incidents. Beyond 30s = stale event.
    RETRY_DELAYS = (1, 5, 30)
    REQUEST_TIMEOUT_S = 5.0

    def __init__(self, client: httpx.AsyncClient):
        self.client = client

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        body = json.dumps({"event": event_type, "data": payload}, sort_keys=True).encode()
        sig = hmac.new(SIGNING_SECRET, body, self.SIG_ALG).hexdigest()
        headers = {
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
            "X-Signature-256": f"sha256={sig}",
        }
        # One immediate attempt, then one retry after each delay, so the
        # last delay is followed by a real attempt rather than a wasted sleep.
        for delay in (0, *self.RETRY_DELAYS):
            if delay:
                await asyncio.sleep(delay)
            try:
                resp = await self.client.post(WEBHOOK_URL, content=body, headers=headers, timeout=self.REQUEST_TIMEOUT_S)
                if 200 <= resp.status_code < 300:
                    return
                if resp.status_code < 500 and resp.status_code != 429:
                    return  # 4xx (except 429) is not retryable
            except (httpx.TimeoutException, httpx.TransportError):
                pass  # network failure: fall through to the next attempt
        await self._dead_letter(idempotency_key, event_type, payload)

    async def _dead_letter(self, idempotency_key: str, event_type: str, payload: dict) -> None:
        # Placeholder that only logs; swap in a real DLQ (see Dispatch Targets).
        logger.error("webhook dead-lettered: %s (%s)", idempotency_key, event_type)
```
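
On the receiving side, the two headers set by the sink are only useful if the endpoint checks them. The following is a minimal sketch of that check, not a prescribed implementation: `WebhookReceiver` and its return strings are hypothetical, the in-memory set of seen keys stands in for whatever shared store a real deployment would use, and the header names are the ones the sink above sends.

```python
import hashlib
import hmac
import json


class WebhookReceiver:
    """Hypothetical receiver-side check for X-Signature-256 and Idempotency-Key."""

    def __init__(self, secret: bytes):
        self.secret = secret
        # Assumption: single process. Across processes this would need a
        # shared store with an atomic "insert if absent" operation.
        self._seen_keys: set[str] = set()
        self.events: list[dict] = []

    def verify(self, body: bytes, signature_header: str) -> bool:
        # Sign the raw request bytes, exactly as received, before any parsing.
        expected = "sha256=" + hmac.new(self.secret, body, hashlib.sha256).hexdigest()
        # Constant-time comparison avoids leaking how many characters matched.
        return hmac.compare_digest(expected.encode(), signature_header.encode())

    def handle(self, body: bytes, headers: dict) -> str:
        if not self.verify(body, headers.get("X-Signature-256", "")):
            return "rejected"
        key = headers.get("Idempotency-Key", "")
        if key in self._seen_keys:
            return "duplicate"  # a retry of an event already processed
        self._seen_keys.add(key)
        self.events.append(json.loads(body))
        return "accepted"
```

Because the sink retries on timeouts, the same event can arrive more than once; answering a duplicate with a 2xx (rather than an error) stops the sender from retrying further.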

See [Dispatch Targets](references/dispatch-targets.md) for Kafka / Redis Streams
/ SNS sinks and per-target DLQ patterns.
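
Since the handler only depends on a `send(...)` coroutine, fanning one event out to several targets can be done with a composite sink. The sketch below assumes that interface; `EventSink` and `FanOutSink` are hypothetical names, and collecting failures in a list is a placeholder for real error handling.

```python
import asyncio
from typing import Protocol


class EventSink(Protocol):
    """Structural type for anything the handler can dispatch to."""

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None: ...


class FanOutSink:
    """Hypothetical composite: forwards each event to every child sink concurrently."""

    def __init__(self, *sinks: EventSink):
        self.sinks = sinks
        self.failures: list[tuple[str, BaseException]] = []

    async def send(self, *, idempotency_key: str, event_type: str, payload: dict) -> None:
        # return_exceptions=True keeps one failing target from cancelling the rest.
        results = await asyncio.gather(
            *(
                sink.send(idempotency_key=idempotency_key, event_type=event_type, payload=payload)
                for sink in self.sinks
            ),
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, BaseException):
                self.failures.append((idempotency_key, result))
```

Every child receives the same idempotency key, so each downstream can deduplicate independently.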

### Step 4 — Filter events so you don't saturate the downstream

`astream_events(version="v2")` emit
