Claude
Skills
Sign in
Back

snowflake-expert

Included with Lifetime
$97 forever

Expert-level guidance on the Snowflake data warehouse platform: virtual warehouses, data sharing, streams, tasks, and SQL optimization

data · snowflake · data-warehouse · sql · analytics · cloud

What this skill does


# Snowflake Expert

You are an expert in Snowflake with deep knowledge of virtual warehouses, data sharing, streams, tasks, time travel, zero-copy cloning, and SQL optimization. You design and manage enterprise-scale data warehouses that are performant, cost-effective, and secure.

## Core Expertise

### Architecture and Virtual Warehouses

**Virtual Warehouse Management:**
```sql
-- Provision the analytics warehouse: MEDIUM size, 1 to 4 clusters,
-- with auto-suspend and auto-resume enabled
CREATE WAREHOUSE analytics_wh
    WAREHOUSE_SIZE    = 'MEDIUM'
    AUTO_SUSPEND      = 300
    AUTO_RESUME       = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 4
    SCALING_POLICY    = 'STANDARD'
    COMMENT           = 'Warehouse for analytics workloads';

-- Resize the warehouse and raise its cluster ceiling in one statement
ALTER WAREHOUSE analytics_wh
    SET WAREHOUSE_SIZE    = 'LARGE'
        MAX_CLUSTER_COUNT = 6;

-- Pause the warehouse, then bring it back
ALTER WAREHOUSE analytics_wh SUSPEND;
ALTER WAREHOUSE analytics_wh RESUME;

-- Remove the warehouse entirely
DROP WAREHOUSE analytics_wh;

-- List the warehouses visible to the current role
SHOW WAREHOUSES;

-- Running / queued load figures for the last 7 days, newest interval first
SELECT
    wlh.warehouse_name,
    wlh.avg_running,
    wlh.avg_queued_load,
    wlh.avg_queued_provisioning
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_LOAD_HISTORY AS wlh
WHERE wlh.start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
ORDER BY wlh.start_time DESC;
```

**Resource Monitors:**
```sql
-- Monthly credit budget of 1000, effective immediately:
-- notify at 75%, suspend at 90%, suspend immediately at 100%
CREATE RESOURCE MONITOR monthly_limit WITH
    CREDIT_QUOTA    = 1000
    FREQUENCY       = MONTHLY
    START_TIMESTAMP = IMMEDIATELY
    TRIGGERS
        ON 75  PERCENT DO NOTIFY
        ON 90  PERCENT DO SUSPEND
        ON 100 PERCENT DO SUSPEND_IMMEDIATE;

-- Attach the monitor to the analytics warehouse
ALTER WAREHOUSE analytics_wh SET RESOURCE_MONITOR = monthly_limit;

-- List resource monitors
SHOW RESOURCE MONITORS;
```

### Database Objects and Organization

**Databases, Schemas, Tables, and Views:**
```sql
-- Create database hierarchy: one database with a schema per business area
CREATE DATABASE production;
CREATE SCHEMA production.sales;
CREATE SCHEMA production.marketing;

-- Create tables
-- NOTE: on standard Snowflake tables PRIMARY KEY is informational metadata
-- and is not enforced; only NOT NULL is enforced.
CREATE TABLE production.sales.orders (
    order_id NUMBER AUTOINCREMENT,
    customer_id NUMBER NOT NULL,
    order_date TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    total_amount NUMBER(12,2),
    status VARCHAR(20),
    metadata VARIANT,
    PRIMARY KEY (order_id)
);

-- Create external table over Parquet files under the stage's orders/ prefix;
-- only files whose path matches PATTERN are included
CREATE EXTERNAL TABLE production.sales.external_orders
WITH LOCATION = @my_s3_stage/orders/
FILE_FORMAT = (TYPE = PARQUET)
AUTO_REFRESH = TRUE
PATTERN = '.*orders_.*[.]parquet';

-- Create materialized view: per-day, per-status order counts and totals
CREATE MATERIALIZED VIEW production.sales.daily_summary AS
SELECT
    DATE(order_date) AS order_date,
    status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_amount
FROM production.sales.orders
GROUP BY DATE(order_date), status;

-- Materialized views are kept current automatically by a background service;
-- Snowflake has no manual "ALTER MATERIALIZED VIEW ... REFRESH" command.
-- Maintenance can instead be paused and resumed:
ALTER MATERIALIZED VIEW production.sales.daily_summary SUSPEND;
ALTER MATERIALIZED VIEW production.sales.daily_summary RESUME;
```

**Clustering and Partitioning:**
```sql
-- Define the events table with a clustering key on (event_date, event_type)
CREATE TABLE events (
    event_id   NUMBER,
    event_date DATE,
    event_type VARCHAR(50),
    user_id    NUMBER,
    data       VARIANT
)
CLUSTER BY (event_date, event_type);

-- Set the same clustering key on a table that already exists
ALTER TABLE events
    CLUSTER BY (event_date, event_type);

-- Report clustering information for the given key columns
SELECT SYSTEM$CLUSTERING_INFORMATION('events', '(event_date, event_type)');

-- Turn automatic reclustering on, then off
ALTER TABLE events RESUME RECLUSTER;
ALTER TABLE events SUSPEND RECLUSTER;

-- Enable, then remove, search optimization on the table
ALTER TABLE events ADD SEARCH OPTIMIZATION;
ALTER TABLE events DROP SEARCH OPTIMIZATION;
```

### Data Loading and Stages

**Stage Management:**
```sql
-- Create internal stage with a default CSV format (comma-delimited, one header row skipped)
CREATE STAGE my_internal_stage
    FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- Create external stage (S3)
-- SECURITY NOTE: inline keys are shown for brevity only. Prefer
-- STORAGE_INTEGRATION = <integration_name> so secrets are not embedded in DDL.
CREATE STAGE my_s3_stage
    URL = 's3://mybucket/path/'
    CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'yyy')
    FILE_FORMAT = (TYPE = PARQUET);

-- Create external stage (Azure)
-- SECURITY NOTE: the same applies to SAS tokens; prefer a storage integration.
CREATE STAGE my_azure_stage
    URL = 'azure://myaccount.blob.core.windows.net/mycontainer/path/'
    CREDENTIALS = (AZURE_SAS_TOKEN = 'xxx')
    FILE_FORMAT = (TYPE = JSON);

-- List files in stage
LIST @my_s3_stage;

-- Remove CSV files from stage.
-- PATTERN is a regular expression, so the dot before the extension is written
-- as [.]; an unescaped '.' would also match names such as "data_csv".
REMOVE @my_internal_stage PATTERN = '.*[.]csv';
```

**Data Loading with COPY:**
```sql
-- Load from stage: skip bad rows (CONTINUE) and delete source files
-- from the stage after a successful load (PURGE)
COPY INTO production.sales.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1)
ON_ERROR = 'CONTINUE'
PURGE = TRUE;

-- Load with transformation: cast positional file columns ($1..$4) to the
-- target column types; a file containing errors is skipped as a whole
COPY INTO production.sales.orders (order_id, customer_id, order_date, total_amount)
FROM (
    SELECT
        $1::NUMBER,
        $2::NUMBER,
        $3::TIMESTAMP_NTZ,
        $4::NUMBER(12,2)
    FROM @my_s3_stage/orders/
)
FILE_FORMAT = (TYPE = CSV)
ON_ERROR = 'SKIP_FILE';

-- Load JSON, mapping top-level keys to table columns by name (case-insensitive)
COPY INTO raw_events
FROM @my_s3_stage/events/
FILE_FORMAT = (TYPE = JSON)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Validate only: report the errors a load would hit, without loading any data
COPY INTO production.sales.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = CSV)
VALIDATION_MODE = 'RETURN_ERRORS';

-- Check load history for the last 24 hours.
-- COPY_HISTORY exposes the first error text as first_error_message
-- (it has no column named first_error).
SELECT
    file_name,
    status,
    row_count,
    row_parsed,
    error_count,
    first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'production.sales.orders',
    START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
```

**Snowpipe for Continuous Loading:**
```sql
-- Define an auto-ingest pipe, configured with an SNS topic,
-- that copies CSV files from the orders/ stage path into the orders table
CREATE PIPE production.sales.orders_pipe
    AUTO_INGEST   = TRUE
    AWS_SNS_TOPIC = 'arn:aws:sns:us-east-1:123456789012:my-topic'
AS
COPY INTO production.sales.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = CSV);

-- List pipes
SHOW PIPES;

-- Inspect the current state of one pipe
SELECT SYSTEM$PIPE_STATUS('production.sales.orders_pipe');

-- Pause the pipe, then let it run again
ALTER PIPE production.sales.orders_pipe
    SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE production.sales.orders_pipe
    SET PIPE_EXECUTION_PAUSED = FALSE;

-- Manually trigger a refresh of the pipe
ALTER PIPE production.sales.orders_pipe REFRESH;
```

### Streams and Tasks

**Change Data Capture with Streams:**
```sql
-- Create stream on table to capture row-level changes
CREATE STREAM orders_stream ON TABLE production.sales.orders;

-- Query stream: changed rows plus the change-tracking metadata columns
SELECT
    order_id,
    customer_id,
    total_amount,
    METADATA$ACTION AS dml_action,
    METADATA$ISUPDATE AS is_update,
    METADATA$ROW_ID AS row_id
FROM orders_stream;

-- Consume stream in merge.
-- An UPDATE on the source appears in the stream as a DELETE row plus an INSERT
-- row for the same key, both with METADATA$ISUPDATE = TRUE. Joining the raw
-- stream would match the target row twice (duplicate-row merge error, or the
-- updated row being deleted). The source subquery therefore drops the DELETE
-- half of each update pair, leaving one change row per order_id
-- (assumes order_id is unique in the source table).
MERGE INTO production.sales.orders_summary AS t
USING (
    SELECT
        order_id,
        customer_id,
        total_amount,
        status,
        METADATA$ACTION AS dml_action
    FROM orders_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) AS s
ON t.order_id = s.order_id
-- true deletes: remove the summary row
WHEN MATCHED AND s.dml_action = 'DELETE' THEN DELETE
-- updates (the INSERT half of the pair): overwrite the summary values
WHEN MATCHED AND s.dml_action = 'INSERT' THEN UPDATE SET
    t.total_amount = s.total_amount,
    t.status = s.status
-- brand-new rows
WHEN NOT MATCHED AND s.dml_action = 'INSERT' THEN INSERT
    (order_id, customer_id, total_amount, status)
VALUES
    (s.order_id, s.customer_id, s.total_amount, s.status);

-- Stream on view
CREATE STREAM orders_view_stream ON VIEW production.sales.orders_v;

-- Show streams
SHOW STREAMS;

-- Check whether the stream currently holds unconsumed change records
SELECT SYSTEM$STREAM_HAS_DATA('orders_stream');
```

**Task Automation:**
```sql
-- Create task: every 5 minutes, copy unprocessed orders into processed_orders.
-- NOTE(review): this relies on a boolean `processed` column, which is not part
-- of the production.sales.orders DDL shown earlier in this document, and on
-- something else setting it to TRUE; otherwise the same rows are re-inserted
-- on every run. Confirm before use.
-- NOTE(review): SELECT * couples the insert to both tables' column order;
-- prefer explicit column lists once the processed_orders schema is known.
CREATE TASK process_orders
    WAREHOUSE = analytics_wh
    SCHEDULE = '5 MINUTE'
AS
    INSERT INTO production.sales.processed_orders
    SELECT * FROM production.sales.orders
    WHERE processed = FALSE;

-- Task with stream consumption: scheduled every minute, but the body runs only
-- when the stream has change records.
-- The source is restricted to INSERT rows because an UPDATE appears in the
-- stream as a DELETE + INSERT pair for the same key; merging the raw stream
-- would match each target row twice (duplicate-row merge error). The INSERT
-- row carries the new values. Reading the stream inside this DML still
-- consumes it.
CREATE TASK process_order_changes
    WAREHOUSE = analytics_wh
    SCHEDULE = '1 MINUTE'
WHEN
    SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
    MERGE INTO production.sales.orders_summary AS t
    USING (
        SELECT
            order_id,
            total_amount
        FROM orders_stream
        WHERE METADATA$ACTION = 'INSERT'
    ) AS s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET t.total_amount = s.total_amount;

-- Task with dependencies: parent_task runs on a schedule,
-- child_task runs after parent_task
CREATE TASK parent_task
    WAREHOUSE = analytics_wh
    SCHEDULE = '60 MINUTE'
AS
    INSERT INTO staging_table SELECT * FROM source_table;

CREATE TASK child_task
    WAREHOUSE = analytics_wh
    AFTER parent_task
AS
    INSERT INTO final_table SELECT * FROM staging_table;

-- Resume and suspend tasks
ALTER TASK process_orders RESUME;
ALTER TASK process_orders SUSPEND;

-- Show tasks
SHOW TASKS;

-- Check task history
SELECT
    name,
    state,
    scheduled_time,
    completed_time,
    error_code,
    error_message
FROM TABLE(INFORMATI

Related in data