klingai-async-workflows
Build async video generation workflows with Kling AI using queues, state machines, and event-driven patterns. Trigger with phrases like 'klingai async', 'kling ai workflow', 'klingai pipeline', 'async video generation'.
# Kling AI Async Workflows
## Overview
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
## Core Pattern: Submit + Callback
```python
import os
import time

import jwt  # PyJWT
import requests

BASE = "https://api.klingai.com/v1"

def get_headers():
    """Build auth headers with a short-lived HS256 JWT signed by the secret key."""
    ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
    now = int(time.time())
    token = jwt.encode(
        {"iss": ak, "exp": now + 1800, "nbf": now - 5},  # valid for 30 minutes
        sk, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"},
    )
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

def submit_async(prompt, callback_url=None, **kwargs):
    """Submit a text-to-video task and return its task_id without waiting."""
    body = {
        "model_name": kwargs.get("model", "kling-v2-master"),
        "prompt": prompt,
        "duration": str(kwargs.get("duration", 5)),
        "mode": kwargs.get("mode", "standard"),
    }
    if callback_url:
        body["callback_url"] = callback_url
    resp = requests.post(
        f"{BASE}/videos/text2video", headers=get_headers(), json=body, timeout=30
    )
    resp.raise_for_status()  # surface HTTP errors instead of a KeyError below
    return resp.json()["data"]["task_id"]
```
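`get_headers()` signs a new JWT on every call, and a polling worker calls it once per request. One possible refinement is to reuse a token until shortly before it expires. The sketch below is an illustration only: `TokenCache` and `make_token` are hypothetical names, and `ttl` is assumed to match the `exp` offset used when signing (1800 seconds above).

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Reuse a signed token until shortly before it expires (hypothetical helper)."""

    def __init__(
        self,
        make_token: Callable[[], str],
        ttl: float = 1800.0,    # assumed to equal the JWT's exp offset
        margin: float = 60.0,   # refresh this many seconds early
        clock: Callable[[], float] = time.time,
    ):
        self._make_token = make_token
        self._ttl = ttl
        self._margin = margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Sign a new token on first use or once we are inside the refresh margin.
        if self._token is None or now >= self._expires_at - self._margin:
            self._token = self._make_token()
            self._expires_at = now + self._ttl
        return self._token
```

To use it, wrap the `jwt.encode(...)` call in a zero-argument function, pass that as `make_token`, and have `get_headers()` call `cache.get()`. The injectable `clock` exists only to make the expiry logic easy to test.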
## Redis Queue Workflow
```python
import json
import time
import uuid

import redis
import requests

# Reuses BASE, get_headers() and submit_async() from the Core Pattern block.
r = redis.Redis()

# Producer: enqueue video generation requests
def enqueue_video_job(prompt, metadata=None):
    job = {
        # uuid avoids collisions when two jobs are enqueued in the same millisecond
        "id": f"job_{uuid.uuid4().hex}",
        "prompt": prompt,
        "metadata": metadata or {},
        "status": "queued",
        "created_at": time.time(),
    }
    r.lpush("kling:jobs:pending", json.dumps(job))
    return job["id"]

# Worker: process jobs from queue (LPUSH + RPOP gives FIFO order)
def process_jobs(max_concurrent=3):
    active_tasks = {}
    while True:
        # Submit new jobs if under concurrency limit
        while len(active_tasks) < max_concurrent:
            raw = r.rpop("kling:jobs:pending")
            if not raw:
                break
            job = json.loads(raw)
            try:
                task_id = submit_async(job["prompt"])
            except Exception as exc:
                # Record the failure instead of silently dropping the popped job
                job["status"], job["error"] = "failed", str(exc)
                r.lpush("kling:jobs:failed", json.dumps(job))
                continue
            active_tasks[task_id] = job
            r.hset("kling:jobs:active", task_id, json.dumps(job))
        # Check active tasks
        completed = []
        for task_id, job in active_tasks.items():
            result = requests.get(
                f"{BASE}/videos/text2video/{task_id}", headers=get_headers(), timeout=30
            ).json()
            status = result["data"]["task_status"]
            if status == "succeed":
                job["status"] = "completed"
                job["video_url"] = result["data"]["task_result"]["videos"][0]["url"]
                r.lpush("kling:jobs:completed", json.dumps(job))
                completed.append(task_id)
            elif status == "failed":
                job["status"] = "failed"
                job["error"] = result["data"].get("task_status_msg")
                r.lpush("kling:jobs:failed", json.dumps(job))
                completed.append(task_id)
        # Remove finished tasks after the loop so the dict is not mutated mid-iteration
        for tid in completed:
            active_tasks.pop(tid)
            r.hdel("kling:jobs:active", tid)
        time.sleep(10)
```
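The worker mirrors in-flight tasks into the `kling:jobs:active` hash but tracks them in an in-memory dict, so a restart would lose sight of tasks that Kling is still processing. A minimal sketch of rebuilding that dict at startup follows. `load_active_tasks` is a hypothetical helper; it assumes the hash holds the JSON written by `process_jobs` and that the client returns bytes, which is redis-py's default without `decode_responses=True`.

```python
import json
from typing import Any, Dict


def load_active_tasks(client) -> Dict[str, Dict[str, Any]]:
    """Rebuild the task_id -> job mapping from the kling:jobs:active hash.

    `client` is any object with a redis-py style hgetall(); hypothetical helper.
    """
    active: Dict[str, Dict[str, Any]] = {}
    for task_id, raw in client.hgetall("kling:jobs:active").items():
        # Keys arrive as bytes unless the client was created with decode_responses=True
        if isinstance(task_id, bytes):
            task_id = task_id.decode()
        active[task_id] = json.loads(raw)
    return active
```

A worker could seed `active_tasks = load_active_tasks(r)` before entering its loop, so tasks submitted before a crash are still polled to completion.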
## State Machine Pattern
```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class JobState(Enum):
    QUEUED = "queued"
    SUBMITTING = "submitting"
    PROCESSING = "processing"
    DOWNLOADING = "downloading"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

@dataclass
class VideoJob:
    prompt: str
    state: JobState = JobState.QUEUED
    task_id: Optional[str] = None
    video_url: Optional[str] = None
    error: Optional[str] = None
    attempts: int = 0  # not updated by transition(); the caller counts submissions
    max_attempts: int = 3

    def can_retry(self) -> bool:
        return self.state == JobState.FAILED and self.attempts < self.max_attempts

    def transition(self, new_state: JobState):
        """Move to new_state, or raise ValueError if the move is not allowed."""
        valid = {
            JobState.QUEUED: {JobState.SUBMITTING},
            JobState.SUBMITTING: {JobState.PROCESSING, JobState.FAILED},
            JobState.PROCESSING: {JobState.DOWNLOADING, JobState.FAILED},
            JobState.DOWNLOADING: {JobState.COMPLETED, JobState.FAILED},
            JobState.FAILED: {JobState.RETRYING},
            JobState.RETRYING: {JobState.SUBMITTING},
        }
        # COMPLETED has no entry, so it is terminal
        if new_state not in valid.get(self.state, set()):
            raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
        self.state = new_state
```
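The transition table only validates moves; something still has to drive a job through them and count attempts. The sketch below shows one way to do that. `run_job`, `submit`, and `wait` are hypothetical names: `submit(prompt)` is assumed to return a task id, `wait(task_id)` a video URL, and both are assumed to raise on failure. The enum is read from the job itself so the snippet carries no import of its own.

```python
from typing import Callable


def run_job(job, submit: Callable[[str], str], wait: Callable[[str], str]) -> None:
    """Drive one VideoJob-like object to COMPLETED or a final FAILED state."""
    states = type(job.state)  # the JobState enum, taken from the job itself
    while True:
        job.transition(states.SUBMITTING)
        job.attempts += 1  # the dataclass does not count attempts on its own
        try:
            job.task_id = submit(job.prompt)
            job.transition(states.PROCESSING)
            job.video_url = wait(job.task_id)
        except Exception as exc:  # broad on purpose: this is only a sketch
            job.error = str(exc)
            job.transition(states.FAILED)
            if not job.can_retry():
                return
            job.transition(states.RETRYING)
            continue
        # The actual file download is elided here; only the state changes are shown.
        job.transition(states.DOWNLOADING)
        job.transition(states.COMPLETED)
        return
```

With `max_attempts=3`, a job whose submission always fails ends in `FAILED` after exactly three submissions, because `can_retry()` turns false once `attempts` reaches the limit.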
## Multi-Step Pipeline
```python
import os

def video_pipeline(prompt, steps=None):
    """Chain: generate -> extend (optional) -> download (optional).

    Declared as a plain function: every call below blocks, so nothing is awaited.
    Reuses BASE, get_headers() and submit_async() from the Core Pattern block.
    """
    steps = steps or ["generate", "extend", "download"]
    # Step 1: Generate
    task_id = submit_async(prompt, duration=5)
    result = poll_task("/videos/text2video", task_id)  # from job-monitoring skill
    video_url = result["videos"][0]["url"]
    # Step 2: Extend (optional)
    if "extend" in steps:
        ext_resp = requests.post(f"{BASE}/videos/video-extend", headers=get_headers(), json={
            "task_id": task_id,
            "prompt": f"Continue: {prompt}",
            "duration": "5",
        }, timeout=30)
        ext_resp.raise_for_status()
        ext_r = ext_resp.json()
        ext_result = poll_task("/videos/video-extend", ext_r["data"]["task_id"])
        video_url = ext_result["videos"][0]["url"]
    # Step 3: Download
    if "download" in steps:
        os.makedirs("output", exist_ok=True)  # open() fails if the directory is missing
        filepath = f"output/{task_id}.mp4"
        download = requests.get(video_url, timeout=120)
        download.raise_for_status()
        with open(filepath, "wb") as f:
            f.write(download.content)
        return filepath
    return video_url
```
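The pipeline relies on `poll_task`, which lives in the job-monitoring skill and is not shown here. As a rough stand-in, the sketch below shows the general shape of such a helper under stated assumptions: `poll_until_done` and `TaskFailed` are hypothetical names, `fetch(endpoint, task_id)` is an injected callable returning the parsed JSON response, and the response is assumed to have the same `data.task_status` / `data.task_result` layout used in the Redis worker above. The real helper may differ.

```python
import time
from typing import Any, Callable, Dict


class TaskFailed(RuntimeError):
    """Raised when the API reports task_status == "failed"."""


def poll_until_done(
    fetch: Callable[[str, str], Dict[str, Any]],
    endpoint: str,
    task_id: str,
    interval: float = 10.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll until the task succeeds, fails, or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        data = fetch(endpoint, task_id)["data"]
        status = data["task_status"]
        if status == "succeed":
            return data["task_result"]
        if status == "failed":
            raise TaskFailed(data.get("task_status_msg") or "task failed")
        # Any other status is treated as still in progress.
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} not finished after {timeout}s")
        sleep(interval)
```

In this document's setup, `fetch` could be a small function that calls `requests.get(f"{BASE}{endpoint}/{task_id}", headers=get_headers()).json()`. Injecting it, along with `sleep` and `clock`, keeps the loop testable without network access.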
## Event-Driven with Webhook
```python
# Use callback_url to avoid polling entirely
task_id = submit_async(
    "Sunset over ocean with sailboats",
    callback_url="https://your-app.com/webhooks/kling",
)
# Your webhook handler triggers next pipeline step
# See klingai-webhook-config skill for receiver implementation
```
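Once a callback arrives, the handler has to decide which pipeline step to trigger. The sketch below covers only that dispatch logic, independent of any web framework. It rests on assumptions that should be checked against the API reference: the callback payload is assumed to carry `task_id`, `task_status`, `task_result`, and `task_status_msg` in the same shape as the polling response's `data` object, and `handle_kling_callback`, `on_success`, and `on_failure` are hypothetical names.

```python
from typing import Any, Callable, Dict, Set


def handle_kling_callback(
    payload: Dict[str, Any],
    on_success: Callable[[str, str], None],
    on_failure: Callable[[str, str], None],
    seen: Set[str],
) -> str:
    """Route one callback payload to the next step; returns what was done."""
    task_id = payload.get("task_id")
    status = payload.get("task_status")
    if not task_id:
        return "ignored"
    if status not in ("succeed", "failed"):
        return "pending"  # intermediate notification, nothing to trigger yet
    if task_id in seen:
        return "duplicate"  # callbacks may be redelivered; act only once
    seen.add(task_id)
    if status == "succeed":
        videos = (payload.get("task_result") or {}).get("videos") or []
        if videos:
            on_success(task_id, videos[0]["url"])
            return "completed"
        on_failure(task_id, "succeeded without a video URL")
        return "failed"
    on_failure(task_id, payload.get("task_status_msg") or "unknown error")
    return "failed"
```

In a multi-process deployment the in-memory `seen` set would need to be replaced by shared storage, for example a Redis set, so that duplicate deliveries are detected across workers.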
## Resources
- [API Reference](https://app.klingai.com/global/dev/document-api/apiReference/model/textToVideo)
- [Developer Portal](https://app.klingai.com/global/dev)