clickhouse-webhooks-events
Ingest data into ClickHouse from webhooks, Kafka, and streaming sources with batching, dedup, and exactly-once patterns. Use when building data ingestion pipelines, consuming webhook payloads, or integrating Kafka topics into ClickHouse. Trigger: "clickhouse ingestion", "clickhouse webhook", "clickhouse Kafka", "stream data to clickhouse", "clickhouse data pipeline".
# ClickHouse Data Ingestion
## Overview
Build data ingestion pipelines into ClickHouse from HTTP webhooks, Kafka,
and streaming sources with proper batching, deduplication, and error handling.
## Prerequisites
- ClickHouse table with appropriate engine (see `clickhouse-core-workflow-a`)
- `@clickhouse/client` installed and able to connect to your ClickHouse server
## Instructions
### Step 1: Webhook Receiver with Batched Inserts
```typescript
import express from 'express';
import { createClient } from '@clickhouse/client';

const client = createClient({ url: process.env.CLICKHOUSE_HOST! });
const app = express();
app.use(express.json());

// Buffer rows for batching: ClickHouse handles one-row-at-a-time inserts poorly
const buffer: Record<string, unknown>[] = [];
const BATCH_SIZE = 5_000;
const FLUSH_INTERVAL_MS = 5_000;

async function flushBuffer() {
  if (buffer.length === 0) return;
  // Detach the pending rows first so overlapping flushes never send the same row twice
  const batch = buffer.splice(0, buffer.length);
  try {
    await client.insert({
      table: 'analytics.events',
      values: batch,
      format: 'JSONEachRow',
    });
    console.log(`Flushed ${batch.length} events to ClickHouse`);
  } catch (err) {
    console.error('Insert failed, re-queuing:', (err as Error).message);
    // Put the failed rows back at the front for retry. Avoid unshift(...batch):
    // spreading a very large array can exceed the engine's argument limit.
    const newer = buffer.splice(0, buffer.length);
    for (const row of batch) buffer.push(row);
    for (const row of newer) buffer.push(row);
  }
}

// Flush periodically so a slow trickle of events is not held indefinitely
setInterval(flushBuffer, FLUSH_INTERVAL_MS);

// Webhook endpoint
app.post('/ingest', async (req, res) => {
  const events = Array.isArray(req.body) ? req.body : [req.body];

  for (const event of events) {
    buffer.push({
      event_type: event.type ?? 'unknown',
      user_id: event.userId ?? 0,
      properties: JSON.stringify(event.properties ?? {}),
      // Server receive time in UTC, formatted as 'YYYY-MM-DD HH:MM:SS'
      created_at: new Date().toISOString().replace('T', ' ').slice(0, 19),
    });
  }

  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }

  // 202 Accepted: rows are queued in memory, not yet durable in ClickHouse
  res.status(202).json({ queued: events.length, buffer_size: buffer.length });
});

// Port 3000 is an assumed default; set PORT to override
app.listen(Number(process.env.PORT ?? 3000));
```
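The receiver re-queues a failed batch, so its in-memory buffer can grow without limit if ClickHouse stays unreachable. One way to bound that is sketched below. `BoundedBuffer` is a hypothetical helper, not part of `@clickhouse/client`, and it assumes that dropping the oldest rows is acceptable once a cap is reached; a pipeline that cannot lose events would spill to disk or a queue instead.

```typescript
// Hypothetical helper: a FIFO row buffer with a hard cap on queued rows.
class BoundedBuffer<T> {
  private rows: T[] = [];
  private dropped = 0;
  private readonly maxRows: number;

  constructor(maxRows: number) {
    this.maxRows = maxRows;
  }

  /** Append rows, discarding the oldest ones once the cap is exceeded. */
  push(items: T[]): void {
    for (const item of items) this.rows.push(item);
    this.trim();
  }

  /** Put a failed batch back at the front so it is retried first. */
  requeue(batch: T[]): void {
    this.rows = batch.concat(this.rows);
    this.trim();
  }

  /** Remove and return up to `limit` rows from the front. */
  take(limit: number): T[] {
    return this.rows.splice(0, limit);
  }

  get size(): number {
    return this.rows.length;
  }

  /** Total rows discarded so far; worth exporting as a metric. */
  get droppedCount(): number {
    return this.dropped;
  }

  private trim(): void {
    const overflow = this.rows.length - this.maxRows;
    if (overflow > 0) {
      this.rows.splice(0, overflow); // drop the oldest rows
      this.dropped += overflow;
    }
  }
}
```

In the receiver, `take(BATCH_SIZE)` would replace the `splice` call and `requeue(batch)` would replace the re-queue in the `catch` branch. The cap itself is a tuning decision that depends on row size and available memory.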
### Step 2: Kafka Table Engine (Server-Side Ingestion)
```sql
-- Create a Kafka engine table (a consumer, not storage: each message is read once)
CREATE TABLE analytics.events_kafka (
    event_type  String,
    user_id     UInt64,
    properties  String,
    timestamp   DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,
    kafka_max_block_size = 65536;

-- Materialized view pipes Kafka → MergeTree automatically
CREATE MATERIALIZED VIEW analytics.events_kafka_mv
TO analytics.events
AS SELECT
    event_type,
    user_id,
    properties,
    timestamp AS created_at
FROM analytics.events_kafka;

-- Once the materialized view exists, ClickHouse consumes from Kafka continuously.
-- Inspect consumer state (assignments, current offsets, recent exceptions);
-- this system table is only present in recent ClickHouse releases:
SELECT * FROM system.kafka_consumers;
```
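With `kafka_format = 'JSONEachRow'`, each Kafka message needs to be a JSON object whose keys match the column names of `analytics.events_kafka`. A minimal sketch of the producer-side serialization follows. The `ProducerEvent` shape and both function names are assumptions for illustration, and the timestamp is written in UTC, which only round-trips correctly if the server or column time zone is also UTC.

```typescript
// Hypothetical event shape on the producer side.
interface ProducerEvent {
  type: string;
  userId: number;
  properties?: Record<string, unknown>;
  occurredAt: Date;
}

/** Format a Date as 'YYYY-MM-DD HH:MM:SS' in UTC for a DateTime column. */
function toClickHouseDateTime(d: Date): string {
  return d.toISOString().replace('T', ' ').slice(0, 19);
}

/** Serialize one event as a single-line JSON object keyed by the Kafka table's column names. */
function toKafkaMessage(e: ProducerEvent): string {
  return JSON.stringify({
    event_type: e.type,
    user_id: e.userId,
    // `properties` is a String column, so nested data is stored as a JSON string
    properties: JSON.stringify(e.properties ?? {}),
    timestamp: toClickHouseDateTime(e.occurredAt),
  });
}

const message = toKafkaMessage({
  type: 'page_view',
  userId: 42,
  properties: { path: '/pricing' },
  occurredAt: new Date(Date.UTC(2024, 0, 15, 9, 30, 0)),
});
console.log(message);
```

The resulting string is what a Kafka producer library would send as the message value; which library to use is left open here.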
### Step 3: ClickPipes (ClickHouse Cloud Managed Ingestion)
ClickHouse Cloud offers **ClickPipes** — a managed ingestion service that
connects to Kafka, Confluent, Amazon MSK, S3, and GCS without code.
```
ClickPipes Configuration (Cloud Console):
1. Source: Amazon MSK / Confluent Cloud / Apache Kafka
2. Topic: events
3. Format: JSONEachRow
4. Target: analytics.events
5. Scaling: 2 consumers (auto-scales)
```
### Step 4: HTTP Interface Bulk Insert
```text
# Insert from CSV file via HTTP (no client needed)
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+CSVWithNames' \
--data-binary @events.csv
# Insert from NDJSON file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+JSONEachRow' \
--data-binary @events.ndjson
# Insert from Parquet file
curl 'http://localhost:8123/?query=INSERT+INTO+analytics.events+FORMAT+Parquet' \
--data-binary @events.parquet
# The statements below are SQL: run them in clickhouse-client, not in the shell.
# Insert from remote URL (ClickHouse fetches it)
INSERT INTO analytics.events
SELECT * FROM url('https://data.example.com/events.csv', CSVWithNames);
# Insert from S3 ('ACCESS_KEY' and 'SECRET_KEY' are placeholders)
INSERT INTO analytics.events
SELECT * FROM s3(
    'https://my-bucket.s3.amazonaws.com/events/*.parquet',
    'ACCESS_KEY', 'SECRET_KEY',
    'Parquet'
);
```
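The same HTTP endpoint can be called from application code without a client library. The sketch below mirrors the NDJSON `curl` call using the global `fetch` available in Node.js 18 and later. The function names are illustrative, and it assumes an unauthenticated default user, as in the `curl` examples.

```typescript
/** Build the HTTP-interface URL for an INSERT. Table and format are interpolated as-is, so pass trusted values only. */
function buildInsertUrl(baseUrl: string, table: string, format: string): string {
  const url = new URL(baseUrl);
  url.searchParams.set('query', `INSERT INTO ${table} FORMAT ${format}`);
  return url.toString();
}

/** Serialize rows as newline-delimited JSON, one object per line. */
function toNdjson(rows: Record<string, unknown>[]): string {
  return rows.map((row) => JSON.stringify(row)).join('\n') + '\n';
}

/** POST rows to ClickHouse; throws with the server's message on a non-2xx response. */
async function insertRows(
  baseUrl: string,
  table: string,
  rows: Record<string, unknown>[],
): Promise<void> {
  if (rows.length === 0) return; // nothing to send
  const res = await fetch(buildInsertUrl(baseUrl, table, 'JSONEachRow'), {
    method: 'POST',
    body: toNdjson(rows),
  });
  if (!res.ok) {
    throw new Error(`ClickHouse insert failed (${res.status}): ${await res.text()}`);
  }
}
```

A call such as `await insertRows('http://localhost:8123', 'analytics.events', rows)` sends the same request as the NDJSON `curl` example, with the body built in memory instead of read from a file.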
### Step 5: Deduplication with ReplacingMergeTree
```sql
-- For idempotent ingestion (webhook retries, Kafka reprocessing)
CREATE TABLE analytics.events_dedup (
    event_id    String,                  -- Unique event identifier
    event_type  LowCardinality(String),
    user_id     UInt64,
    properties  String,
    created_at  DateTime,
    _version    UInt64 DEFAULT toUnixTimestamp(now())
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY event_id;                       -- Dedup key

-- Duplicate-safe inserts: for rows with the same event_id, the highest _version wins.
-- Duplicates are only collapsed during background merges, so they can remain
-- visible for a while. Query with FINAL to deduplicate at read time:
SELECT * FROM analytics.events_dedup FINAL
WHERE created_at >= today() - 7;
```
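Deduplication on `event_id` only works if a retried delivery produces the same `event_id` as the first attempt. When the sender supplies no identifier, one option is to derive it from the payload. The sketch below uses SHA-256 from Node's `crypto` module; `canonicalJson` and `deriveEventId` are hypothetical helpers, and the approach assumes that two byte-for-byte identical payloads really are the same event.

```typescript
import { createHash } from 'node:crypto';

/** Serialize a JSON-compatible value with object keys sorted, so key order does not affect the hash. */
function canonicalJson(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map((item) => canonicalJson(item)).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalJson(obj[key])}`);
    return `{${entries.join(',')}}`;
  }
  return JSON.stringify(value);
}

/** Derive a stable event_id: prefer the sender's delivery ID if there is one, else hash the payload. */
function deriveEventId(payload: unknown, deliveryId?: string): string {
  const source = deliveryId ?? canonicalJson(payload);
  return createHash('sha256').update(source).digest('hex');
}

// The same payload with a different key order maps to the same ID
const first = deriveEventId({ type: 'signup', userId: 7 });
const retry = deriveEventId({ userId: 7, type: 'signup' });
console.log(first === retry);
```

If the webhook provider sends its own delivery or event ID, passing it as `deliveryId` is usually the safer choice, because two legitimately distinct events can carry identical payloads.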
### Step 6: Insert Monitoring
```sql
-- Track insert throughput
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS inserts,
    sum(written_rows) AS rows_inserted,
    formatReadableSize(sum(written_bytes)) AS bytes_inserted
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Check for insert errors
SELECT event_time, exception, substring(query, 1, 200)
FROM system.query_log
WHERE type = 'ExceptionWhileProcessing'
  AND query_kind = 'Insert'
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time DESC;
```
## Insert Best Practices
| Practice | Why |
|----------|-----|
| Batch at least 1K rows per INSERT, ideally 10K-100K | Fewer parts, faster merges |
| Buffer 1-5 seconds for real-time | Balances latency vs throughput |
| Use `JSONEachRow` format | Client handles serialization |
| Compress on the wire (for example `zstd` or `gzip` via the HTTP `Content-Encoding` header) | Reduces network transfer |
| Use `ReplacingMergeTree` for retries | Handles duplicate delivery |
| Use `async_insert=1` for small batches | Server-side batching |
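Two of the practices above, wire compression and `async_insert`, can be combined in a single HTTP request. The sketch below prepares such a request using only Node's standard library. It swaps in `gzip` for `ZSTD` because `gzip` is available in every supported Node.js release, and `prepareAsyncInsert` is an illustrative name, not a client API.

```typescript
import { gzipSync } from 'node:zlib';

interface PreparedInsert {
  url: string;
  headers: Record<string, string>;
  body: Buffer;
}

/** Prepare a gzip-compressed JSONEachRow insert that asks the server to batch it asynchronously. */
function prepareAsyncInsert(
  baseUrl: string,
  table: string,
  rows: Record<string, unknown>[],
): PreparedInsert {
  const url = new URL(baseUrl);
  url.searchParams.set('query', `INSERT INTO ${table} FORMAT JSONEachRow`);
  // Settings can be passed as URL parameters on the HTTP interface
  url.searchParams.set('async_insert', '1');
  // Respond only after the server has flushed the buffered rows
  url.searchParams.set('wait_for_async_insert', '1');

  const ndjson = rows.map((row) => JSON.stringify(row)).join('\n') + '\n';
  return {
    url: url.toString(),
    // Tells ClickHouse that the POST body is gzip-compressed
    headers: { 'Content-Encoding': 'gzip' },
    body: gzipSync(Buffer.from(ndjson, 'utf8')),
  };
}

// Usage (Node.js 18+):
//   const req = prepareAsyncInsert('http://localhost:8123', 'analytics.events', rows);
//   await fetch(req.url, { method: 'POST', headers: req.headers, body: req.body });
```

Setting `wait_for_async_insert` to `0` lowers latency, but the response then arrives before the rows are written, so a failed flush goes unnoticed by the caller.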
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `Too many parts` | Single-row inserts | Batch inserts (10K+ rows) |
| `Cannot parse input` | Wrong format | Match format to data structure |
| `TIMEOUT` on large insert | Slow network | Enable compression, split batch |
| Duplicate events | Webhook retries | Use ReplacingMergeTree + event_id |
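Transient failures such as timeouts are usually handled by retrying with a growing delay rather than immediately. A minimal sketch of exponential backoff follows. `backoffDelayMs` and `withRetry` are hypothetical helpers, and the base delay, cap, and attempt count are assumed values to tune per deployment.

```typescript
/** Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at capMs. */
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 30_000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/** Run `operation` up to `maxAttempts` times, waiting between failures; rethrows the last error. */
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 5,
  wait: (ms: number) => Promise<void> = sleep,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      // No wait after the final attempt
      if (attempt < maxAttempts - 1) await wait(backoffDelayMs(attempt));
    }
  }
  throw lastError;
}
```

A retried insert may already have been written by an earlier attempt, so this pattern fits best with the `ReplacingMergeTree` + `event_id` approach from Step 5. Errors like `Cannot parse input` will fail the same way on every attempt and are better surfaced than retried.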
## Resources
- [Kafka Integration](https://clickhouse.com/docs/integrations/kafka)
- [ClickPipes](https://clickhouse.com/cloud/clickpipes)
- [HTTP Interface](https://clickhouse.com/docs/interfaces/http)
- [S3 Table Function](https://clickhouse.com/docs/sql-reference/table-functions/s3)
## Next Steps
For query and server performance, see `clickhouse-performance-tuning`.