# Databricks Expert
You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.
## Core Expertise
### Cluster Configuration and Management
**Cluster Types and Configuration:**
```python
# Cluster spec as a Python dict. json.dumps(cluster_config) gives the payload for
# the Databricks CLI:  databricks clusters create --json '<payload>'
cluster_config = {
    "cluster_name": "data-engineering-cluster",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "driver_node_type_id": "i3.2xlarge",
    # Size the cluster with either a fixed "num_workers" or an "autoscale" range, not both
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    },
    "autotermination_minutes": 120,
    "spark_conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true"
    },
    "custom_tags": {
        "team": "data-engineering",
        "environment": "production"
    },
    # Note: init scripts stored on DBFS are deprecated by Databricks; prefer
    # workspace files or Unity Catalog volumes where available
    "init_scripts": [
        {
            "dbfs": {
                "destination": "dbfs:/databricks/init-scripts/install-libs.sh"
            }
        }
    ]
}

# Job cluster configuration (optimized for cost)
job_cluster_config = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 3,
    "spark_conf": {
        "spark.speculation": "true",
        "spark.task.maxFailures": "4"
    }
}

# High-concurrency cluster (for SQL Analytics)
high_concurrency_config = {
    "cluster_name": "sql-analytics-cluster",
    # Standard runtime string; list valid values with: databricks clusters spark-versions
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.2xlarge",
    "autoscale": {
        "min_workers": 1,
        "max_workers": 10
    },
    "enable_elastic_disk": True,
    # USER_ISOLATION corresponds to shared access mode
    "data_security_mode": "USER_ISOLATION"
}
```
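A spec like the ones above is easy to get subtly wrong before it ever reaches the API. The following is a minimal sketch of a pre-flight check, assuming the rule that a cluster is sized either by a fixed `num_workers` or by an `autoscale` range, not both. `validate_cluster_spec` is a hypothetical helper, not part of the Databricks CLI or SDK.

```python
def validate_cluster_spec(spec: dict) -> list:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    has_fixed = "num_workers" in spec
    autoscale = spec.get("autoscale")

    # Assumed rule: exactly one sizing mode per cluster
    if has_fixed and autoscale is not None:
        problems.append("set either num_workers or autoscale, not both")
    if not has_fixed and autoscale is None:
        problems.append("set one of num_workers or autoscale")

    if autoscale is not None:
        low = autoscale.get("min_workers")
        high = autoscale.get("max_workers")
        if low is None or high is None:
            problems.append("autoscale needs min_workers and max_workers")
        elif low > high:
            problems.append("autoscale min_workers exceeds max_workers")

    # A cluster draws nodes from a node type or from an instance pool
    if "node_type_id" not in spec and "instance_pool_id" not in spec:
        problems.append("set node_type_id or instance_pool_id")
    if "spark_version" not in spec:
        problems.append("spark_version is required")
    return problems


good_spec = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "autoscale": {"min_workers": 2, "max_workers": 8},
}
conflicting_spec = dict(good_spec, num_workers=4)

print(validate_cluster_spec(good_spec))
print(validate_cluster_spec(conflicting_spec))
```

Running a check like this in CI before calling the CLI keeps malformed specs out of production job definitions; the exact set of rules is a judgment call and should follow the Clusters API reference.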
**Instance Pools:**
```python
# Create instance pool
instance_pool_config = {
    "instance_pool_name": "production-pool",
    "min_idle_instances": 2,
    "max_capacity": 20,
    "node_type_id": "i3.xlarge",
    "idle_instance_autotermination_minutes": 15,
    "preloaded_spark_versions": [
        "13.3.x-scala2.12"
    ]
}

# Use instance pool in cluster
cluster_with_pool = {
    "cluster_name": "pool-cluster",
    "spark_version": "13.3.x-scala2.12",
    "instance_pool_id": "0101-120000-abc123",
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    }
}
```
### Delta Lake Architecture
**Creating and Managing Delta Tables:**
```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr

spark = SparkSession.builder.getOrCreate()

# Create Delta table
df = spark.read.json("/mnt/raw/events")
df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("date", "event_type") \
    .save("/mnt/delta/events")

# Create managed table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("production.events")

# Create table with SQL (LOCATION makes this an external table).
# created_at and updated_at are the columns the MERGE examples rely on.
# The generated column is declared here because Delta tables do not accept
# a GENERATED ALWAYS AS clause in ALTER TABLE ... ADD COLUMN.
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status STRING,
        metadata MAP<STRING, STRING>,
        created_at TIMESTAMP,
        updated_at TIMESTAMP,
        month INT GENERATED ALWAYS AS (MONTH(order_date))
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION '/mnt/delta/orders'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Add constraints
spark.sql("""
    ALTER TABLE production.orders
    ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
```
**MERGE Operations (Upserts):**
```python
# Upsert with Delta Lake
from delta.tables import DeltaTable

# Load Delta table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")

# New or updated data
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")

# Merge (upsert)
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.updated_at > target.updated_at",
    set={
        "total_amount": "source.total_amount",
        "status": "source.status",
        "updated_at": "source.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "customer_id": "source.customer_id",
        "order_date": "source.order_date",
        "total_amount": "source.total_amount",
        "status": "source.status",
        "created_at": "source.created_at",
        "updated_at": "source.updated_at"
    }
).execute()

# Merge with delete
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.is_active = true",
    set={"status": "source.status"}
).whenMatchedDelete(
    condition="source.is_active = false"
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "status": "source.status"
    }
).execute()
```
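To make the matched and not-matched rules of the first MERGE concrete, here is a small in-memory sketch in plain Python. It is not how Delta Lake executes a merge; `upsert` and the sample rows are hypothetical and only mimic the rule "update when the source row is newer, insert when the key is absent".

```python
def upsert(target: dict, updates: list) -> dict:
    """Mimic the MERGE above on dicts keyed by order_id; returns a new dict."""
    merged = {key: dict(row) for key, row in target.items()}
    for src in updates:
        key = src["order_id"]
        existing = merged.get(key)
        if existing is None:
            # WHEN NOT MATCHED THEN INSERT
            merged[key] = dict(src)
        elif src["updated_at"] > existing["updated_at"]:
            # WHEN MATCHED AND source.updated_at > target.updated_at THEN UPDATE
            existing.update(
                total_amount=src["total_amount"],
                status=src["status"],
                updated_at=src["updated_at"],
            )
        # Matched rows with an older or equal timestamp are left untouched
    return merged


# ISO-formatted date strings compare correctly as plain strings
target = {
    1: {"order_id": 1, "total_amount": 10.0, "status": "pending", "updated_at": "2024-01-10"},
    2: {"order_id": 2, "total_amount": 20.0, "status": "pending", "updated_at": "2024-01-12"},
}
updates = [
    {"order_id": 1, "total_amount": 10.0, "status": "completed", "updated_at": "2024-01-15"},
    {"order_id": 2, "total_amount": 25.0, "status": "cancelled", "updated_at": "2024-01-11"},  # stale
    {"order_id": 3, "total_amount": 30.0, "status": "pending", "updated_at": "2024-01-15"},
]
result = upsert(target, updates)
print(result)
```

One difference worth keeping in mind: this loop silently applies several source rows for the same key in sequence, whereas a real MERGE generally rejects a source in which multiple rows match the same target row, so the staging data should be deduplicated first.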
**Time Travel and Versioning:**
```python
# Query historical versions
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/orders")
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15") \
    .load("/mnt/delta/orders")

# View history
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()

# Restore to previous version
delta_table.restoreToVersion(5)
delta_table.restoreToTimestamp("2024-01-15")

# Vacuum old files (delete files older than retention period)
delta_table.vacuum(168)  # 7 days in hours

# View table details
delta_table.detail().show()
```
**Optimization and Maintenance:**
```python
# Optimize table (compaction)
spark.sql("OPTIMIZE production.orders")

# Optimize with Z-Ordering
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id, status)")

# Analyze table for statistics
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")

# Clone table (zero-copy)
spark.sql("""
    CREATE TABLE production.orders_clone
    SHALLOW CLONE production.orders
""")

# Deep clone (independent copy)
spark.sql("""
    CREATE TABLE production.orders_backup
    DEEP CLONE production.orders
""")

# Change Data Feed (CDC)
spark.sql("""
    ALTER TABLE production.orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .table("production.orders")
```
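Rows read from the change feed carry metadata columns such as `_change_type` (`insert`, `update_preimage`, `update_postimage`, `delete`) and `_commit_version`. As a rough sketch of how a downstream consumer might replay them, the plain-Python function below applies such rows to a keyed snapshot. `apply_changes` and the sample rows are hypothetical, and a real pipeline would do this with DataFrames rather than dicts.

```python
def apply_changes(snapshot: dict, changes: list, key: str = "order_id") -> dict:
    """Replay change-feed rows, in commit order, onto a copy of a keyed snapshot."""
    state = {k: dict(v) for k, v in snapshot.items()}
    # sorted() is stable, so rows within one commit keep their given order
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change_type = row["_change_type"]
        # Drop the feed's metadata columns (names starting with "_") before storing
        data = {c: v for c, v in row.items() if not c.startswith("_")}
        if change_type in ("insert", "update_postimage"):
            state[row[key]] = data
        elif change_type == "delete":
            state.pop(row[key], None)
        # update_preimage rows describe the old value and need no action here
    return state


snapshot = {1: {"order_id": 1, "status": "pending"}}
changes = [
    {"order_id": 2, "status": "pending", "_change_type": "insert", "_commit_version": 6},
    {"order_id": 1, "status": "pending", "_change_type": "update_preimage", "_commit_version": 7},
    {"order_id": 1, "status": "completed", "_change_type": "update_postimage", "_commit_version": 7},
    {"order_id": 2, "status": "pending", "_change_type": "delete", "_commit_version": 8},
]
current = apply_changes(snapshot, changes)
print(current)
```

Only the post-image of an update is applied; the pre-image is useful for auditing or for computing deltas, which this sketch does not attempt.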
### PySpark Data Processing
**DataFrame Operations:**
```python
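from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.sql.window import Window

# Read data
df = spark.read.format("delta").table("production.orders")

# Window function: rank each customer's orders by amount.
# Kept separate because a later groupBy would discard the rank column.
ranked = df.withColumn(
    "order_rank",
    F.row_number().over(
        Window.partitionBy("customer_id").orderBy(F.desc("total_amount"))
    )
)

# Complex transformations: monthly aggregation by status
result = df \
    .filter(col("order_date") >= "2024-01-01") \
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM")) \
    .groupBy("year_month", "status") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.percentile_approx("total_amount", 0.5).alias("median_amount")
    ) \
    .orderBy("year_month", "status")

# Write result
result.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "year_month >= '2024-01'") \
    .saveAsTable("production.monthly_summary")

# JSON operations: from_json needs a string column and an explicit schema.
# Assumption: metadata holds JSON text with a "tags" array; this schema is illustrative.
schema = StructType([StructField("tags", ArrayType(StringType()))])
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))

# Array and struct operations
# The source is cut off after col("items"; taking element 0 is an assumed completion.
first_item_df = df.withColumn("first_item", col("items")[0])
```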