apify-core-workflow-b
Manage Apify datasets, key-value stores, and request queues programmatically. Use when reading/writing datasets, exporting data, managing Actor storage, or orchestrating multi-Actor pipelines. Trigger: "apify dataset", "apify key-value store", "apify storage", "export apify data", "apify pipeline", "apify request queue".
What this skill does
# Apify Core Workflow B — Storage & Pipelines
## Overview
Manage Apify's three storage types (datasets, key-value stores, request queues) and orchestrate multi-Actor pipelines. Covers CRUD operations, data export, pagination, and chaining Actors together.
## Prerequisites
- `apify-client` installed and authenticated
- Familiarity with `apify-core-workflow-a`
## Storage Types at a Glance
| Storage | Best For | Analogy | Retention |
|---------|----------|---------|-----------|
| Dataset | Lists of similar items (products, pages) | Append-only table | 7 days (unnamed) |
| Key-Value Store | Config, screenshots, summaries, any file | S3 bucket | 7 days (unnamed) |
| Request Queue | URLs to crawl (managed by Crawlee) | Job queue | 7 days (unnamed) |
Named storages persist indefinitely. Unnamed (default run) storages expire after 7 days.
## Instructions
### Step 1: Dataset Operations
```typescript
import { ApifyClient } from 'apify-client';
const client = new ApifyClient({ token: process.env.APIFY_TOKEN });
// Create a named dataset (persists indefinitely)
const dataset = await client.datasets().getOrCreate('product-catalog');
const dsClient = client.dataset(dataset.id);
// Push items (single or batch)
await dsClient.pushItems([
{ sku: 'ABC123', name: 'Widget', price: 9.99 },
{ sku: 'DEF456', name: 'Gadget', price: 19.99 },
]);
// List items with pagination
const page1 = await dsClient.listItems({ limit: 100, offset: 0 });
console.log(`Total items: ${page1.total}, this page: ${page1.items.length}`);
// Iterate all items (handles pagination automatically)
let offset = 0;
const limit = 1000;
const allItems = [];
while (true) {
const { items } = await dsClient.listItems({ limit, offset });
if (items.length === 0) break;
allItems.push(...items);
offset += items.length;
}
// Download in various formats
const csvBuffer = await dsClient.downloadItems('csv');
const jsonBuffer = await dsClient.downloadItems('json');
const xlsxBuffer = await dsClient.downloadItems('xlsx');
// Download filtered/transformed
const filtered = await dsClient.downloadItems('json', {
fields: ['sku', 'name', 'price'], // Only these fields
unwind: 'variants', // Flatten nested arrays
desc: true, // Reverse order
});
// Get dataset info (item count, size)
const info = await dsClient.get();
console.log(`${info.itemCount} items, ${info.actSize} bytes`);
```
### Step 2: Key-Value Store Operations
```typescript
// Create a named store
const store = await client.keyValueStores().getOrCreate('scraper-config');
const kvClient = client.keyValueStore(store.id);
// Store JSON config
await kvClient.setRecord({
key: 'settings',
value: { maxRetries: 3, proxy: 'residential', country: 'US' },
contentType: 'application/json',
});
// Store binary data (screenshot, PDF)
import { readFileSync } from 'fs';
await kvClient.setRecord({
key: 'report.pdf',
value: readFileSync('report.pdf'),
contentType: 'application/pdf',
});
// Retrieve a record
const record = await kvClient.getRecord('settings');
console.log(record.value); // { maxRetries: 3, proxy: 'residential', ... }
// List all keys in the store
const { items: keys } = await kvClient.listKeys();
keys.forEach(k => console.log(`${k.key} (${k.size} bytes)`));
// Delete a record
await kvClient.deleteRecord('old-config');
// Access an Actor run's default stores
const run = await client.actor('apify/web-scraper').call(input);
const runKv = client.keyValueStore(run.defaultKeyValueStoreId);
const output = await runKv.getRecord('OUTPUT');
```
### Step 3: Request Queue Management
```typescript
// Create a named request queue (useful for resumable crawls)
const queue = await client.requestQueues().getOrCreate('my-crawl-queue');
const rqClient = client.requestQueue(queue.id);
// Add requests
await rqClient.addRequest({ url: 'https://example.com/page1', uniqueKey: 'page1' });
// Batch add (up to 25 per call)
await rqClient.batchAddRequests([
{ url: 'https://example.com/page2', uniqueKey: 'page2' },
{ url: 'https://example.com/page3', uniqueKey: 'page3' },
]);
// Get queue info
const queueInfo = await rqClient.get();
console.log(`Pending: ${queueInfo.pendingRequestCount}, Handled: ${queueInfo.handledRequestCount}`);
```
### Step 4: Multi-Actor Pipeline
```typescript
// Pipeline: Scrape -> Transform -> Export
async function runPipeline(urls: string[]) {
const client = new ApifyClient({ token: process.env.APIFY_TOKEN });
// Stage 1: Scrape raw data
console.log('Stage 1: Scraping...');
const scrapeRun = await client.actor('username/product-scraper').call({
startUrls: urls.map(url => ({ url })),
maxItems: 1000,
});
const { items: rawData } = await client
.dataset(scrapeRun.defaultDatasetId)
.listItems();
console.log(`Scraped ${rawData.length} items`);
// Stage 2: Transform (using a data-processing Actor)
console.log('Stage 2: Transforming...');
const transformRun = await client.actor('username/data-transformer').call({
datasetId: scrapeRun.defaultDatasetId,
transformations: {
dedup: { field: 'sku' },
filter: { field: 'price', operator: 'gt', value: 0 },
},
});
// Stage 3: Export to named dataset for long-term storage
console.log('Stage 3: Exporting...');
const { items: cleanData } = await client
.dataset(transformRun.defaultDatasetId)
.listItems();
const exportDs = await client.datasets().getOrCreate('product-catalog-clean');
await client.dataset(exportDs.id).pushItems(cleanData);
console.log(`Pipeline complete. ${cleanData.length} clean items stored.`);
return exportDs.id;
}
```
### Step 5: Monitor Actor Runs
```typescript
// List recent runs for an Actor
const { items: runs } = await client.actor('username/my-actor').runs().list({
limit: 10,
desc: true,
});
runs.forEach(run => {
console.log(`${run.id} | ${run.status} | ${run.startedAt} | ${run.usageTotalUsd?.toFixed(4)} USD`);
});
// Get detailed run info
const runDetail = await client.run('RUN_ID').get();
console.log({
status: runDetail.status,
statusMessage: runDetail.statusMessage,
datasetItems: runDetail.stats?.datasetItemCount,
computeUnits: runDetail.usage?.ACTOR_COMPUTE_UNITS,
durationSecs: runDetail.stats?.runTimeSecs,
});
// Abort a running Actor
await client.run('RUN_ID').abort();
```
## Data Flow Diagram
```
Actor Run
├── Default Dataset ← Actor.pushData() writes here
├── Default KV Store ← Actor.setValue() writes here
│ ├── INPUT ← Input passed at run start
│ └── OUTPUT ← Convention for main output
└── Default Request Queue ← Crawlee manages this
```
## Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| `Dataset not found` | Expired (unnamed, >7 days) | Use named datasets for persistence |
| `Record too large` | KV store 9MB record limit | Split into multiple records |
| `Push failed` | Dataset items >9MB batch | Push in smaller batches |
| `Request already exists` | Duplicate uniqueKey | Expected behavior, queue deduplicates |
## Resources
- [Dataset Documentation](https://docs.apify.com/platform/storage/dataset)
- [Key-Value Store Documentation](https://docs.apify.com/platform/storage/key-value-store)
- [Request Queue Documentation](https://docs.apify.com/platform/storage/request-queue)
- [JS Client API Reference](https://docs.apify.com/api/client/js/reference)
## Next Steps
For common errors, see `apify-common-errors`.
Related in Writing & Docs
jax-development
IncludedUse this skill when the user is writing, debugging, profiling, refactoring, reviewing, benchmarking, parallelising, exporting, or explaining JAX code, or when they mention JAX, jax.numpy, jit, grad, value_and_grad, vmap, scan, lax, random keys, pytrees, jax.Array, sharding, Mesh, PartitionSpec, NamedSharding, pmap, shard_map, Pallas, XLA, StableHLO, checkify, profiler, or the JAX repo. It helps turn NumPy or PyTorch-style code into pure functional JAX, fix tracer/control-flow/shape/PRNG bugs, remove recompiles and host-device syncs, choose transforms and sharding strategies, inspect jaxpr/lowering/IR, and benchmark compiled code correctly.
nature-article-writer
IncludedDrafts, rewrites, diagnostically critiques, and style-calibrates primary research manuscripts for Nature and Nature Portfolio journals. Use when the user wants a Nature-style title, summary paragraph or abstract, introduction, results, discussion, methods, figure legends, presubmission enquiry, cover letter, reviewer response, or when a scientific draft sounds generic, jargon-heavy, structurally weak, or AI-ish and needs precise, broad-reader-friendly prose without inventing data, analyses, or references. Best for primary research articles and letters rather than reviews or press releases unless explicitly adapting one.
deckrd
IncludedDocument-driven framework that derives requirements, specifications, implementation plans, and executable tasks from goals through structured AI dialogue. Use when user says "write requirements", "create spec", "plan implementation", "derive tasks", "structure this feature", "break down into tasks", or "document this module". Also use for reverse engineering existing code into docs (/deckrd rev). Do NOT use for direct code writing — use /deckrd-coder after tasks are generated. Do NOT use when the user only wants to run or fix existing code without planning.
clinical-decision-support
IncludedGenerate professional clinical decision support (CDS) documents for pharmaceutical and clinical research settings, including patient cohort analyses (biomarker-stratified with outcomes) and treatment recommendation reports (evidence-based guidelines with decision algorithms). Supports GRADE evidence grading, statistical analysis (hazard ratios, survival curves, waterfall plots), biomarker integration, and regulatory compliance. Outputs publication-ready LaTeX/PDF format optimized for drug development, clinical research, and evidence synthesis.
handling-sf-data
IncludedSalesforce data operations with 130-point scoring. Use this skill to create, update, delete, bulk import/export, generate test data, and clean up org records using sf CLI and anonymous Apex. TRIGGER when: user creates test data, performs bulk import/export, uses sf data CLI commands, needs data factory patterns for Apex tests, or needs to seed/clean records in a Salesforce org. DO NOT TRIGGER when: SOQL query writing only (use querying-soql), Apex test execution (use running-apex-tests), or metadata deployment (use deploying-metadata).
accelint-ac-to-playwright
IncludedConvert and validate acceptance criteria for Playwright test automation. Use when user asks to (1) review/evaluate/check if AC are ready for automation, (2) assess if AC can be converted as-is, (3) validate AC quality for Playwright, (4) turn AC into tests, (5) generate tests from acceptance criteria, (6) convert .md bullets or .feature Gherkin files to Playwright specs, (7) create test automation from requirements. Handles both bullet-style markdown and Gherkin syntax with JSON test plan generation and validation.