Claude
Skills
Sign in
Back

databricks-expert

Included with Lifetime
$97 forever

Expert-level guidance on the Databricks platform: Apache Spark, Delta Lake, MLflow, notebooks, and cluster management.

data, databricks, spark, delta-lake, mlflow, lakehouse, pyspark

What this skill does


# Databricks Expert

You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.

## Core Expertise

### Cluster Configuration and Management

**Cluster Types and Configuration:**
```python
# Interactive (all-purpose) cluster.
# This dict is the JSON payload for the Clusters API (POST /api/2.0/clusters/create).
# Serialize it with json.dumps() and hand it to the Databricks CLI
# (`databricks clusters create --json ...`) or to the Databricks SDK.
# It is kept as a Python dict because a raw shell command is not valid inside a
# Python snippet.
interactive_cluster_config = {
    "cluster_name": "data-engineering-cluster",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "driver_node_type_id": "i3.2xlarge",
    # Specify either a fixed "num_workers" or an "autoscale" range, not both.
    # With autoscale, the worker count floats between min_workers and max_workers.
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8,
    },
    "autotermination_minutes": 120,
    "spark_conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
    },
    "custom_tags": {
        "team": "data-engineering",
        "environment": "production",
    },
    # Init scripts stored on DBFS are deprecated.
    # Workspace files (or Unity Catalog volumes) are the supported locations.
    "init_scripts": [
        {
            "workspace": {
                "destination": "/Shared/init-scripts/install-libs.sh"
            }
        }
    ],
}

# Job cluster configuration (optimized for cost).
# It has a fixed size and exists only for the lifetime of the job run.
# Speculation re-launches slow tasks; maxFailures bounds per-task retries.
job_cluster_config = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 3,
    "spark_conf": {
        "spark.speculation": "true",
        "spark.task.maxFailures": "4"
    }
}

# Shared cluster for concurrent SQL users.
# USER_ISOLATION is the "shared" access mode, which isolates users from one another.
# NOTE(review): confirm this runtime key is listed by
# GET /api/2.0/clusters/spark-versions before using it.
high_concurrency_config = {
    "cluster_name": "sql-analytics-cluster",
    "spark_version": "13.3.x-sql-scala2.12",
    "node_type_id": "i3.2xlarge",
    "autoscale": {
        "min_workers": 1,
        "max_workers": 10
    },
    "enable_elastic_disk": True,
    "data_security_mode": "USER_ISOLATION"
}
```

**Instance Pools:**
```python
# Instance pool definition.
# - Keeps at least two idle i3.xlarge nodes ready.
# - Caps the pool at 20 instances.
# - Releases idle instances beyond the minimum after 15 minutes.
# - Lists the runtime to preload on pooled nodes.
instance_pool_config = dict(
    instance_pool_name="production-pool",
    min_idle_instances=2,
    max_capacity=20,
    node_type_id="i3.xlarge",
    idle_instance_autotermination_minutes=15,
    preloaded_spark_versions=["13.3.x-scala2.12"],
)

# Cluster that draws its nodes from a pool, referenced by ID.
# The ID below is a placeholder; substitute the one returned when the pool is created.
# The node type comes from the pool, so no node_type_id is set here.
pool_autoscale = dict(min_workers=2, max_workers=8)
cluster_with_pool = dict(
    cluster_name="pool-cluster",
    spark_version="13.3.x-scala2.12",
    instance_pool_id="0101-120000-abc123",
    autoscale=pool_autoscale,
)
```

### Delta Lake Architecture

**Creating and Managing Delta Tables:**
```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr

spark = SparkSession.builder.getOrCreate()

# Create a path-based Delta table from raw JSON events, partitioned by date and
# event type. overwriteSchema lets this overwrite also replace the schema of an
# existing table at that path.
df = spark.read.json("/mnt/raw/events")
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("date", "event_type") \
    .save("/mnt/delta/events")

# Create a managed table.
# saveAsTable is given no explicit path, so the metastore decides where the data lives.
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("production.events")

# Create table with SQL.
# - created_at / updated_at are declared here because the MERGE examples for
#   production.orders read and write those columns.
# - Generated columns must be part of CREATE TABLE. Delta Lake does not support
#   adding one to an existing table with
#   ALTER TABLE ... ADD COLUMN ... GENERATED ALWAYS AS.
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status STRING,
        metadata MAP<STRING, STRING>,
        created_at TIMESTAMP,
        updated_at TIMESTAMP,
        month INT GENERATED ALWAYS AS (MONTH(order_date))
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION '/mnt/delta/orders'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Add a CHECK constraint.
# Later writes with a status outside this set are rejected.
spark.sql("""
    ALTER TABLE production.orders
    ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
```

**MERGE Operations (Upserts):**
```python
# Upsert with Delta Lake
from delta.tables import DeltaTable

# Load the orders Delta table by path
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")

# New or updated rows, staged as Parquet.
# NOTE: each order_id should appear at most once in this data. Delta's MERGE
# fails when several source rows match the same target row, so deduplicate the
# staging data first if it can contain repeats.
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")

# Merge (upsert).
# - An existing order is updated only when the incoming row is newer.
#   If target.updated_at is NULL, the comparison is not true and the row is left unchanged.
# - Orders not yet in the table are inserted.
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.updated_at > target.updated_at",
    set={
        "total_amount": "source.total_amount",
        "status": "source.status",
        "updated_at": "source.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "customer_id": "source.customer_id",
        "order_date": "source.order_date",
        "total_amount": "source.total_amount",
        "status": "source.status",
        "created_at": "source.created_at",
        "updated_at": "source.updated_at"
    }
).execute()

# Merge with delete, driven by the source's is_active flag.
# - Active rows update or insert.
# - Inactive rows delete their matching order.
# The insert clause is also restricted to active rows. Without that condition,
# an inactive row with no match would be inserted, re-creating an order that
# is meant to be removed.
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.is_active = true",
    set={"status": "source.status"}
).whenMatchedDelete(
    condition="source.is_active = false"
).whenNotMatchedInsert(
    condition="source.is_active = true",
    values={
        "order_id": "source.order_id",
        "status": "source.status"
    }
).execute()
```

**Time Travel and Versioning:**
```python
# Query historical versions, either by version number or by timestamp
# (the snapshot current as of 2024-01-15).
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/orders")
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("/mnt/delta/orders")

# View history (one row per commit)
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()

# Restore to a previous state.
# A restore is itself a new commit, so choose ONE target. Running both calls
# back to back would make the first restore pointless.
delta_table.restoreToVersion(5)
# delta_table.restoreToTimestamp("2024-01-15")  # alternative: restore by timestamp

# Vacuum: delete data files no longer referenced by the table and older than
# the retention period (168 hours = 7 days). Once those files are gone, versions
# that depended on them can no longer be read with time travel.
delta_table.vacuum(168)

# View table details
delta_table.detail().show()
```

**Optimization and Maintenance:**
```python
# Optimize table (compact small files into larger ones)
spark.sql("OPTIMIZE production.orders")

# Optimize with Z-ordering.
# Z-ordering pays off on high-cardinality columns that appear in filters. A
# column with only a handful of values, such as status (three allowed values),
# adds little data skipping, so only customer_id is used. order_date is the
# partition column and cannot be Z-ordered.
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id)")

# Analyze table for statistics
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")

# Shallow clone: copies only table metadata and references the source table's
# data files, so it is cheap to create. Running VACUUM on the source can remove
# files the clone still points to.
spark.sql("""
    CREATE TABLE production.orders_clone
    SHALLOW CLONE production.orders
""")

# Deep clone (independent copy of metadata and data files)
spark.sql("""
    CREATE TABLE production.orders_backup
    DEEP CLONE production.orders
""")

# Change Data Feed (CDC)
spark.sql("""
    ALTER TABLE production.orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes.
# NOTE(review): startingVersion must not be earlier than the version at which
# the change data feed was enabled (check DESCRIBE HISTORY). Reading from
# before that point raises an error.
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .table("production.orders")
```

### PySpark Data Processing

**DataFrame Operations:**
```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.sql.window import Window

# Read data. spark.read.table resolves a metastore table directly;
# a .format("delta") call is not needed for a named table.
df = spark.read.table("production.orders")

# Window function example: rank each customer's orders by amount (1 = largest).
# This is kept separate from the aggregation below: groupBy would drop the
# per-row order_rank column, so computing it there only adds a shuffle.
ranked_orders = df.withColumn(
    "order_rank",
    F.row_number().over(
        Window.partitionBy("customer_id").orderBy(F.desc("total_amount"))
    ),
)

# Monthly order statistics per status, from 2024 onward.
# No orderBy before the write: a global sort here is extra work, and the
# table does not preserve row order for readers anyway.
result = (
    df
    .filter(F.col("order_date") >= "2024-01-01")
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM"))
    .groupBy("year_month", "status")
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.percentile_approx("total_amount", 0.5).alias("median_amount"),
    )
)

# Write result. replaceWhere overwrites only rows matching the predicate.
# Every row in `result` satisfies it, because of the order_date filter above.
result.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "year_month >= '2024-01'") \
    .saveAsTable("production.monthly_summary")

# JSON operations: from_json parses a JSON *string* column against an explicit schema.
# NOTE(review): production.orders.metadata is declared MAP<STRING, STRING>, not
# a JSON string. This example assumes a DataFrame whose "metadata" column holds
# raw JSON text such as '{"tags": ["a", "b"]}'; confirm against the real source.
schema = StructType([StructField("tags", ArrayType(StringType()))])
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))

# Array and struct operations
df.withColumn("first_item", col("items"

Related in data