Claude
Skills
Sign in
Back

podium-conversation-history-export

Included with Lifetime
$97 forever

Bulk-export Podium conversation, review, and contact history into a vector-store-ready corpus with cursor-paginated full crawls, CDC via updated_at watermarks, attachment URL refresh on expiry, windowed semantic chunking, and PII redaction before embedding. Use when standing up a RAG pipeline on a Podium org, building nightly incremental syncs, or hardening export jobs against attachment expiry. Trigger with "podium history export", "podium rag corpus", "podium cdc sync", "podium incremental export", "podium chunk for embedding".

AI Agentspodiumdata-exportcdcrag-pipelinechunkingpii-redactionscripts

What this skill does


# Podium Conversation History Export

## Overview

Bulk-export a Podium organization's historical conversations, reviews, and contacts into a corpus suitable for embedding into a vector store. This is the skill you run when the customer says "we have two years of knowledge in there" and the AI team wants every thread, every review, every contact note searchable by similarity. It is not a one-shot script — it is the full-export-plus-incremental-CDC pipeline that ingests the historical backlog once and then keeps the corpus current via nightly `updated_at` watermark passes.

The six production failures this skill prevents:

1. **Cursor pagination drift** — Podium's `next_cursor` is server-side state derived from a sort key plus a position. If a conversation is created, updated, or deleted mid-export, naive cursor walks duplicate records (an updated row reappears at the new position) or skip records (a row deleted between pages shifts the cursor's anchor). A correct walk pins the sort to a stable monotonic field and dedups on `id`.
2. **Incremental CDC gaps via the updated_at watermark** — naive `updated_at > $watermark` queries miss writes that happen at exactly the watermark second. Two writes within the same second on opposite sides of the boundary produce a permanent hole. Correct CDC uses `>=` with explicit overlap margin and dedups in the loader on `(id, updated_at)`.
3. **Attachment URL expiry mid-download** — Podium attachment URLs are pre-signed S3-style URLs that expire on the order of 15 minutes. A bulk exporter that takes an hour will get `403 SignatureDoesNotMatch` on every attachment whose URL was issued in the first quarter of the run. Correct downloaders detect the 403, fetch a fresh signed URL by `attachment_id`, and resume.
4. **Oversized thread chunking failures** — a 4000-message conversation thread (a long-running concierge thread for a high-touch RV dealer customer) blows the typical 8K-token embedding budget if naively concatenated. Chunking must be windowed with semantic boundaries (turn boundaries, day boundaries, idle gaps) and emit overlapping chunks for cross-window retrieval.
5. **PII not redacted before embedding** — vector stores are effectively eternal; once a customer's SSN, credit-card number, or address is embedded it cannot be unembedded without recomputing the index. Redact at chunk-emit time with the same PII pattern set used by `podium-call-transcript-pipeline`, before any vector is computed.
6. **Export OOM on long threads** — naively loading all messages of a 4000-message thread into memory before chunking blows the heap on the host running the export. Correct exports stream message-by-message into JSONL, then a separate pass streams JSONL into chunks. Memory cost stays O(window-size), not O(thread-size).

## Prerequisites

- Python 3.10+
- `podium-auth` (this skill assumes a `PodiumAuth` instance is available; do not re-implement OAuth here)
- `podium-rate-limit-survival` (this skill assumes the calling layer obeys per-endpoint quotas — bulk export is the most rate-limit-aggressive workload in the pack)
- A persistent CDC watermark store — SQLite is the default; `cdc_watermark.py` ships with it
- Local disk for streaming JSONL output, gzip-compressed (typical 2-year org: ~1–10 GB raw, 200 MB–2 GB gzipped)
- An attachment-download target directory (S3 bucket, GCS bucket, or local path)
- The PII redaction pattern set from `podium-call-transcript-pipeline` (reused; do not fork)

## Authentication

This skill does NOT implement OAuth. All HTTP calls flow through a `podium_get()` injected dependency that holds a `PodiumAuth` instance from the `podium-auth` skill — that layer handles token caching, 80%-TTL refresh, single-flight locks, and scope validation. Bootstrap by `Read`-ing your refresh-token file, instantiate `PodiumAuth(client_id, client_secret, refresh_token)`, then pass the instance to every export script via `--refresh-token-file` and the env-var credential flags. The five scripts in this skill never construct credentials in-process; they delegate.

If you need to harden the auth path itself (rotation, decay monitoring, multi-tenant routing), `Read` `podium-auth/SKILL.md` and stack that skill on top — this skill is the bulk-data layer, not the auth layer.

## Instructions

Step 1 → Step 6 below. Build in this order. Each step neutralizes one of the six production failure modes from the Overview, in the same order.

### Step 1. Cursor-paginated full crawl (neutralizes cursor drift)

Pin the sort to a stable monotonic field (`created_at` ascending) and dedup on `id` in the loader. Persist the cursor after every successful page so a crash mid-walk resumes at the last successful page boundary, not at the start.

```python
import asyncio, json, time
from pathlib import Path
from typing import AsyncIterator

CURSOR_PATH = Path("./.cursor.conversations.json")
PAGE_SIZE = 100   # Podium documents 100 as the max; do not exceed

async def crawl_conversations(podium_get, location_uid: str) -> AsyncIterator[dict]:
    """Yield conversations in created_at-ascending order. Resumable across crashes."""
    state = json.loads(CURSOR_PATH.read_text()) if CURSOR_PATH.exists() else {}
    cursor = state.get("cursor")
    seen_ids = set(state.get("seen_ids", []))   # bounded; trim periodically

    while True:
        params = {
            "location_uid": location_uid,
            "sort": "created_at:asc",
            "limit": PAGE_SIZE,
        }
        if cursor:
            params["cursor"] = cursor

        resp = await podium_get("/v4/conversations", params=params)
        body = resp.json()

        page = body.get("data", [])
        for row in page:
            if row["id"] in seen_ids:
                continue                 # dedup against mid-walk updates
            seen_ids.add(row["id"])
            yield row

        cursor = body.get("next_cursor")
        # Persist after EVERY page — crash resume must land on the last good cursor
        CURSOR_PATH.write_text(json.dumps({
            "cursor": cursor,
            "seen_ids": list(seen_ids)[-50_000:],   # keep last 50k ids only
            "updated_at": time.time(),
        }))

        if not cursor:
            return
```

The `seen_ids` set is bounded at 50k to prevent unbounded memory growth on a multi-million-row export. Tune the cap to roughly 5× `PAGE_SIZE × pages_in_one_hour` — large enough to dedup any reasonable update churn within the page-write window, small enough to fit in memory.

### Step 2. Incremental CDC via overlap-margin watermark (neutralizes boundary gaps)

A naive `updated_at > $watermark` query misses any row whose `updated_at` is exactly the watermark second. Use `>=` and dedup in the loader; advance the watermark only after the page is fully persisted.

```python
import sqlite3, time, json

WATERMARK_DB = "./watermarks.sqlite"

def get_watermark(resource: str) -> float:
    con = sqlite3.connect(WATERMARK_DB)
    cur = con.execute("SELECT watermark FROM cdc WHERE resource = ?", (resource,))
    row = cur.fetchone()
    con.close()
    return row[0] if row else 0.0

def advance_watermark(resource: str, new_watermark: float) -> None:
    con = sqlite3.connect(WATERMARK_DB)
    con.execute("""
        INSERT INTO cdc(resource, watermark, updated_at) VALUES(?, ?, ?)
        ON CONFLICT(resource) DO UPDATE SET watermark = excluded.watermark, updated_at = excluded.updated_at
    """, (resource, new_watermark, time.time()))
    con.commit()
    con.close()

async def incremental_pull(podium_get, resource: str, overlap_margin_s: int = 60):
    """Pull rows with updated_at >= (watermark - overlap_margin) and dedup."""
    watermark = get_watermark(resource)
    since = max(0, watermark - overlap_margin_s)        # explicit overlap

    cursor = None
    max_seen = watermark
    seen_keys = set()

    while True:
        params = {"updated_since": since, "sort": "updated_at:asc", "limit": 100}
        if cursor:
            params["cur

Related in AI Agents