snowflake-sdk-patterns
Apply production-ready Snowflake SDK patterns for snowflake-sdk and snowflake-connector-python. Use when implementing connection pooling, async execute wrappers, streaming results, or establishing team coding standards for Snowflake. Trigger with phrases like "snowflake SDK patterns", "snowflake best practices", "snowflake code patterns", "idiomatic snowflake", "snowflake connection pool".
What this skill does
# Snowflake SDK Patterns
## Overview
Production-ready patterns for `snowflake-sdk` (Node.js) and `snowflake-connector-python` using real driver APIs.
## Prerequisites
- Completed `snowflake-install-auth` setup
- Understanding of callback-to-promise conversion patterns
- Familiarity with Snowflake's callback-based Node.js API
## Instructions
### Step 1: Connection Pool (Node.js)
```typescript
// src/snowflake/pool.ts
import snowflake from 'snowflake-sdk';
/** Tuning options for {@link SnowflakePool}. */
interface PoolConfig {
  /** Upper bound on open connections, including ones that are still connecting. */
  max: number;
  /**
   * How long (in milliseconds) a released connection may sit idle and still be
   * reused. Older idle connections are closed the next time `acquire()` runs.
   * A value of 0 or less disables idle expiry.
   */
  idleTimeoutMs: number;
}

/** A caller parked inside `acquire()` while the pool is at capacity. */
interface PoolWaiter {
  resolve: (conn: snowflake.Connection) => void;
  reject: (reason: unknown) => void;
}

/**
 * Minimal connection pool on top of the callback-based `snowflake-sdk` API.
 *
 * Connections are opened lazily up to `config.max`; once the limit is reached,
 * `acquire()` callers queue (FIFO) until a connection is released.
 */
class SnowflakePool {
  /** Every open connection owned by the pool, whether checked out or idle. */
  private pool: snowflake.Connection[] = [];
  /** Idle connections ready for reuse; the most recently released is last. */
  private available: snowflake.Connection[] = [];
  /** Callers waiting for capacity, oldest first. */
  private waiting: PoolWaiter[] = [];
  private config: PoolConfig;
  /** Connections currently being opened; they already count against `max`. */
  private connecting = 0;
  /** `Date.now()` at the moment each idle connection was released. */
  private idleSince = new Map<snowflake.Connection, number>();

  /**
   * @param connConfig Options passed to `snowflake.createConnection` for every connection.
   * @param config Optional overrides; defaults are `max: 10`, `idleTimeoutMs: 60000`.
   */
  constructor(
    private connConfig: snowflake.ConnectionOptions,
    config: Partial<PoolConfig> = {}
  ) {
    // `??` rather than a spread so an explicit `undefined` cannot erase a default.
    this.config = {
      max: config.max ?? 10,
      idleTimeoutMs: config.idleTimeoutMs ?? 60000,
    };
  }

  /**
   * Checks a connection out of the pool.
   *
   * Reuses a healthy idle connection when one exists, opens a new one while
   * under `max`, and otherwise waits for a release.
   *
   * @returns A connected `snowflake.Connection`; hand it back with `release()`.
   * @throws Whatever error `connect` reports when a new connection cannot be opened.
   */
  async acquire(): Promise<snowflake.Connection> {
    // Prefer an idle connection, closing any that expired or went down while idle.
    let idle = this.available.pop();
    while (idle !== undefined) {
      if (this.isReusable(idle)) {
        this.idleSince.delete(idle);
        return idle;
      }
      this.discard(idle);
      idle = this.available.pop();
    }

    // In-flight connects are counted so concurrent callers cannot overshoot `max`.
    if (this.pool.length + this.connecting < this.config.max) {
      return this.openConnection();
    }

    // At capacity: wait for a release (or for capacity to free up).
    return new Promise<snowflake.Connection>((resolve, reject) => {
      this.waiting.push({ resolve, reject });
    });
  }

  /**
   * Returns a connection to the pool.
   *
   * A healthy connection goes to the longest-waiting caller, or back to the
   * idle list. A connection that is no longer up is closed instead, and a
   * waiting caller (if any) gets a fresh connection.
   */
  release(conn: snowflake.Connection): void {
    if (!conn.isUp()) {
      this.discard(conn);
      this.serveNextWaiter();
      return;
    }
    const next = this.waiting.shift();
    if (next) {
      next.resolve(conn);
      return;
    }
    this.available.push(conn);
    this.idleSince.set(conn, Date.now());
  }

  /**
   * Runs `fn` with a pooled connection and always releases it afterwards,
   * whether `fn` resolves or rejects.
   *
   * @returns The value `fn` resolves with.
   */
  async withConnection<T>(fn: (conn: snowflake.Connection) => Promise<T>): Promise<T> {
    const conn = await this.acquire();
    try {
      return await fn(conn);
    } finally {
      this.release(conn);
    }
  }

  /** Opens one connection, reserving its slot before the asynchronous connect starts. */
  private async openConnection(): Promise<snowflake.Connection> {
    this.connecting += 1;
    let conn: snowflake.Connection;
    try {
      conn = snowflake.createConnection(this.connConfig);
      const pendingConn = conn;
      await new Promise<void>((resolve, reject) => {
        pendingConn.connect((err) => (err ? reject(err) : resolve()));
      });
    } catch (err) {
      this.connecting -= 1;
      // The reserved slot is free again; give a queued caller a chance to use it.
      this.serveNextWaiter();
      throw err;
    }
    this.connecting -= 1;
    this.pool.push(conn);
    return conn;
  }

  /** True when an idle connection is still up and has not exceeded `idleTimeoutMs`. */
  private isReusable(conn: snowflake.Connection): boolean {
    const releasedAt = this.idleSince.get(conn);
    const limit = this.config.idleTimeoutMs;
    const expired =
      releasedAt !== undefined && limit > 0 && Date.now() - releasedAt > limit;
    return !expired && conn.isUp();
  }

  /** Removes a connection from the pool and closes it on a best-effort basis. */
  private discard(conn: snowflake.Connection): void {
    this.idleSince.delete(conn);
    this.pool = this.pool.filter((candidate) => candidate !== conn);
    try {
      // Errors are ignored on purpose: the connection is being thrown away anyway.
      conn.destroy(() => undefined);
    } catch {
      // Nothing further to clean up if the driver refuses to destroy it.
    }
  }

  /** Lets the oldest queued caller retry `acquire()` after capacity was freed. */
  private serveNextWaiter(): void {
    const next = this.waiting.shift();
    if (next) {
      this.acquire().then(next.resolve, next.reject);
    }
  }
}
// Singleton pool
// Connection settings are read from the environment once, at module load.
// Warehouse and schema fall back to 'COMPUTE_WH' and 'PUBLIC' when unset or empty.
const env = process.env;

/** Shared pool instance for the whole process. */
export const pool = new SnowflakePool({
  account: env.SNOWFLAKE_ACCOUNT!,
  username: env.SNOWFLAKE_USER!,
  password: env.SNOWFLAKE_PASSWORD!,
  warehouse: env.SNOWFLAKE_WAREHOUSE || 'COMPUTE_WH',
  database: env.SNOWFLAKE_DATABASE!,
  schema: env.SNOWFLAKE_SCHEMA || 'PUBLIC',
});
```
### Step 2: Promise-Based Query Helper
```typescript
// src/snowflake/query.ts
import snowflake from 'snowflake-sdk';
/**
 * Outcome of a successfully completed statement.
 *
 * @typeParam T Shape of one result row; defaults to a loose column-name map.
 */
interface QueryResult<T = Record<string, any>> {
  /** The SQL text that was executed. */
  sqlText: string;
  /** Rows returned by the statement (empty when the driver reported none). */
  rows: T[];
  /** The driver statement object passed to the `complete` callback. */
  statement: snowflake.Statement;
}
/**
 * Promise wrapper around the callback-based `conn.execute`.
 *
 * @param conn Connection to run the statement on.
 * @param sqlText SQL to execute.
 * @param binds Optional bind values forwarded to the driver unchanged.
 * @returns The rows (an empty array when the driver passes none), the driver
 *   statement, and the SQL text.
 * @throws The driver's error object, with `sqlText` added to it.
 */
export function query<T = Record<string, any>>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds
): Promise<QueryResult<T>> {
  return new Promise<QueryResult<T>>((fulfil, fail) => {
    conn.execute({
      sqlText,
      binds,
      complete(error, statement, resultRows) {
        if (error) {
          // Attach the SQL to the same error object so callers can log what failed.
          fail(Object.assign(error, { sqlText }));
          return;
        }
        const rows = (resultRows || []) as T[];
        fulfil({ rows, statement, sqlText });
      },
    });
  });
}
// Multi-statement execution
/**
 * Runs several statements on one connection, strictly one after another.
 *
 * Each statement starts only after the previous one has completed. The first
 * failure rejects the returned promise and the remaining statements are not run.
 *
 * @param conn Connection shared by all statements.
 * @param statements SQL texts, executed in array order.
 * @returns One result per statement, in the same order.
 */
export async function multiQuery(
  conn: snowflake.Connection,
  statements: string[]
): Promise<QueryResult[]> {
  const collected: QueryResult[] = [];
  let index = 0;
  while (index < statements.length) {
    const outcome = await query(conn, statements[index]);
    collected.push(outcome);
    index += 1;
  }
  return collected;
}
```
### Step 3: Streaming for Large Result Sets
```typescript
// src/snowflake/stream.ts
/**
 * Executes a statement with `streamResult: true` and yields its rows one by one.
 *
 * Rows are read from `stmt.streamRows()` and yielded as they arrive instead of
 * being collected into an array first.
 *
 * @param conn Connection to run the statement on.
 * @param sqlText SQL to execute.
 * @param binds Optional bind values forwarded to the driver unchanged.
 * @returns An async generator over the result rows.
 * @throws The driver's error object, with `sqlText` added to it, when the
 *   statement fails to execute; errors raised by the row stream propagate as-is.
 */
export async function* streamQuery<T = Record<string, any>>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds
): AsyncGenerator<T> {
  const stmt = await new Promise<snowflake.Statement>((resolve, reject) => {
    conn.execute({
      sqlText,
      binds,
      streamResult: true,
      complete: (err, stmt) => {
        if (err) {
          // Tag the failure with the SQL text, as the promise-based `query` helper does.
          reject(Object.assign(err, { sqlText }));
        } else {
          resolve(stmt);
        }
      },
    });
  });

  for await (const row of stmt.streamRows()) {
    yield row as T;
  }
}
// Usage: process millions of rows without memory pressure
// for await (const row of streamQuery(conn, 'SELECT * FROM big_table')) {
// await processRow(row);
// }
```
### Step 4: Python Context Manager Pattern
```python
# src/snowflake_pool.py
import os
from contextlib import contextmanager
from typing import Generator, Any

import snowflake.connector
class SnowflakePool:
    """Context-manager helpers around ``snowflake.connector``.

    Despite the name, nothing is reused between calls: every ``connection()``
    (and therefore every ``cursor()``, ``execute()`` and ``execute_many()``)
    opens a new connection with the stored parameters and closes it on exit.
    """

    def __init__(self, **conn_params):
        """Store the keyword arguments later passed to ``snowflake.connector.connect``."""
        self._params = conn_params

    @contextmanager
    def connection(self) -> Generator[snowflake.connector.SnowflakeConnection, None, None]:
        """Yield a freshly opened connection and close it when the block exits."""
        conn = snowflake.connector.connect(**self._params)
        try:
            yield conn
        finally:
            conn.close()

    @contextmanager
    def cursor(self) -> Generator[snowflake.connector.cursor.SnowflakeCursor, None, None]:
        """Yield a cursor on a new connection; both are closed when the block exits."""
        with self.connection() as conn:
            cur = conn.cursor()
            try:
                yield cur
            finally:
                cur.close()

    def execute(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
        """Run one statement and return all rows as dicts keyed by column name.

        Args:
            sql: Statement text, using the connector's bind placeholders.
            params: Bind values; leave empty for statements without binds.

        Returns:
            One dict per row; an empty list when the statement returns no rows.
        """
        with self.cursor() as cur:
            # Pass None instead of an empty sequence so a statement with nothing
            # to bind is never run through client-side parameter interpolation.
            cur.execute(sql, params or None)
            columns = [desc[0] for desc in cur.description] if cur.description else []
            return [dict(zip(columns, row)) for row in cur.fetchall()]

    def execute_many(self, sql: str, params_list: list[tuple]) -> int:
        """Batch insert — much faster than individual inserts.

        Args:
            sql: Statement text with bind placeholders.
            params_list: One tuple of bind values per row.

        Returns:
            The cursor's ``rowcount`` after the batch, or 0 when there was
            nothing to execute or the cursor reports no count.
        """
        if not params_list:
            # Nothing to send; avoid opening a connection for an empty batch.
            return 0
        with self.cursor() as cur:
            cur.executemany(sql, params_list)
            # rowcount may be None; keep the declared int return type.
            return cur.rowcount or 0
# Usage
# Credentials come from the environment (a missing variable raises KeyError);
# warehouse, database and schema are fixed example values.
connection_settings = {
    'account': os.environ['SNOWFLAKE_ACCOUNT'],
    'user': os.environ['SNOWFLAKE_USER'],
    'password': os.environ['SNOWFLAKE_PASSWORD'],
    'warehouse': 'COMPUTE_WH',
    'database': 'MY_DB',
    'schema': 'PUBLIC',
}
pool = SnowflakePool(**connection_settings)

# The status value is passed as a bind parameter, not formatted into the SQL.
status_filter = ('active',)
users = pool.execute("SELECT * FROM users WHERE status = %s", status_filter)
```
### Step 5: Error Handling Wrapper
```typescript
// src/snowflake/errors.ts
/**
 * Error describing a failed statement, including whether the failure was
 * classified as retryable.
 */
export class SnowflakeQueryError extends Error {
  /** SQLSTATE value taken from the underlying error. */
  public readonly sqlState: string;
  /** Numeric error code taken from the underlying error. */
  public readonly code: number;
  /** The SQL text that failed. */
  public readonly sqlText: string;
  /** Whether the error code was classified as transient. */
  public readonly retryable: boolean;

  constructor(
    message: string,
    sqlState: string,
    code: number,
    sqlText: string,
    retryable: boolean
  ) {
    super(message);
    this.sqlState = sqlState;
    this.code = code;
    this.sqlText = sqlText;
    this.retryable = retryable;
    this.name = 'SnowflakeQueryError';
  }
}
// Connection token expired — reconnect
const TOKEN_EXPIRED_CODE = 390114;
// Service unavailable
const SERVICE_UNAVAILABLE_CODE = 390503;

/** Error codes that are treated as transient and therefore worth retrying. */
const RETRYABLE_CODES = new Set([TOKEN_EXPIRED_CODE, SERVICE_UNAVAILABLE_CODE]);
/**
 * Runs a statement and retries it with exponential backoff when the failure
 * code is listed in `RETRYABLE_CODES`.
 *
 * Every attempt reuses the same `conn`; this function does not reconnect.
 * The waits between attempts are 1s, 2s, 4s, and so on.
 *
 * @param conn Connection to run the statement on.
 * @param sqlText SQL to execute.
 * @param binds Optional bind values forwarded to `query`.
 * @param maxRetries Total number of attempts (default 3). Values below 1, or
 *   NaN, are treated as 1 so the statement always runs at least once;
 *   fractional values are rounded down.
 * @returns The rows of the first successful attempt.
 * @throws SnowflakeQueryError when the failure is not retryable or the final
 *   attempt fails.
 */
export async function safeQuery<T>(
  conn: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds,
  maxRetries = 3
): Promise<T[]> {
  // Without this clamp a value below 1 skipped the loop entirely and surfaced
  // the misleading 'Unreachable' error without ever running the statement.
  const attempts = maxRetries >= 1 ? Math.floor(maxRetries) : 1;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      const { rows } = await query<T>(conn, sqlText, binds);
      return rows;
    } catch (err: any) {
      const retryable = RETRYABLE_CODES.has(err.code);
      if (!retryable || attempt >= attempts) {
        throw new SnowflakeQueryError(
          err.message, err.sqlState, err.code, sqlText, retryable
        );
      }
      // Exponential backoff: 1000ms, 2000ms, 4000ms, ...
      const delay = 1000 * Math.pow(2, attempt - 1);
      await new Promise((r) => setTimeout(r, delay));
    }
  }
  // The loop always returns or throws; this only satisfies the compiler.
  throw new Error('Unreachable');
}
```
## Pattern Summary
| Pattern | Use Case | Benefit |
|---------|----------|---------|
| Connection pool | High concurrency apps | Reuses connections, prevents exhaustion |
| Promise wrapper | All Node.js code | Clean async/await instead of callbacks |
| Streaming | Large result sets (>100K rows) | Constant memory usage |
| Context manager | All Python code | Guarantees connection cleanup |
| Retry with backoff | Transient failures | Handles token expiry, service blips |
## Resources
- [Node.js Driver Execute](https://docs.snowflake.com/en/developer-guide/node-js/nodejs-driver-execute)
- [Node.js Consuming Results](https://docs.snowflake.com/en/developer-guide/node-js/nodejs-driver-consume)
- [Python Connector API](https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api)
## Next Steps
Apply patterns in `snowflake-core-workflow-a` for real-world data pipeline usage.
Related in Backend & APIs
jfrog
Included — Interact with the JFrog Platform via the JFrog CLI and REST/GraphQL APIs. Use this skill when the user wants to manage Artifactory repositories, upload or download artifacts, manage builds, configure permissions, manage users and groups, work with access tokens, configure JFrog CLI servers, search artifacts, manage properties, set up replication, manage JFrog Projects, run security audits or scans, look up CVE details, query exposures scan results from JFrog Advanced Security, manage release bundles and lifecycle operations, aggregate or export platform data, or perform any JFrog Platform administration task. Also use when the user mentions jf, jfrog, artifactory, xray, distribution, evidence, apptrust, onemodel, graphql, workers, mission control, curation, advanced security, exposures, or any JFrog product name.
cupynumeric-migration-readiness
Included — Pre-migration readiness assessor for porting NumPy to cuPyNumeric. Use BEFORE substantial porting work begins when the user asks whether code will scale on GPU, whether they should migrate to cuPyNumeric, which NumPy patterns transfer cleanly, what must be refactored before porting, or mentions pre-port assessment, scaling analysis, or refactor planning. Inspect the user's source code, look up NumPy usage, cross-reference the cuPyNumeric API support manifest, and distinguish distributed-scaling-friendly patterns from blockers such as unsupported APIs, scalar synchronization, host round-trips, Python/object-heavy control flow, shape/data-dependent branching, and in-place mutation hazards. Produce a verdict of READY, LIGHT REFACTOR, SIGNIFICANT REFACTOR, or NOT RECOMMENDED, with concrete refactor pointers.
alibabacloud-data-agent-skill
Included — Invoke Alibaba Cloud Apsara Data Agent for Analytics via CLI to perform natural language-driven data analysis on enterprise databases. Data Agent for Analytics is an intelligent data analysis agent developed by Alibaba Cloud Database team for enterprise users. It automatically completes requirement analysis, data understanding, analysis insights, and report generation based on natural language descriptions. This tool supports: discovering data resources (instances/databases/tables) managed in DMS, initiating query or deep analysis sessions, real-time progress tracking, and retrieving analysis conclusions and generated reports. Use this Skill when users need to query databases, analyze data trends, generate data reports, ask questions in natural language, or mention "Data Agent", "data analysis", "database query", "SQL analysis", "data insights".
token-optimizer
Included — Reduce OpenClaw token usage and API costs through smart model routing, heartbeat optimization, budget tracking, and native 2026.2.15 features (session pruning, bootstrap size limits, cache TTL alignment). Use when token costs are high, API rate limits are being hit, or hosting multiple agents at scale. The 4 executable scripts (context_optimizer, model_router, heartbeat_optimizer, token_tracker) are local-only — no network requests, no subprocess calls, no system modifications. Reference files (PROVIDERS.md, config-patches.json) document optional multi-provider strategies that require external API keys and network access if you choose to use them. See SECURITY.md for full breakdown.
resend-cli
Included — Use this skill when the task is specifically about operating Resend from an AI agent, terminal session, or CI job via the official resend CLI: installing/authenticating the CLI, sending/listing/updating/cancelling emails, batch sends, domains and DNS, webhooks and local listeners, inbound receiving, contacts, topics, segments, broadcasts, templates, API keys, profiles, or debugging Resend CLI/API failures. Trigger on mentions of Resend CLI, `resend`, `resend doctor`, `resend emails send`, `resend domains`, `resend webhooks listen`, `resend emails receiving`, or agent-friendly terminal automation.
alibabacloud-odps-maxframe-coding
Included — Use this skill for MaxFrame SDK development and documentation navigation on Alibaba Cloud MaxCompute (ODPS). Helps answer MaxFrame API, concept, official example, and supported pandas API questions; create data processing programs; read/write MaxCompute tables; debug jobs (remote or local); and build custom DPE runtime images. Trigger when users mention MaxFrame, MaxCompute with MaxFrame, ODPS table processing, DPE runtime, MaxFrame docs/examples, DataFrame/Tensor operations, or GPU runtime setup. Works for both English and Chinese queries about Alibaba Cloud data processing with MaxFrame.