airflow-expert
Expert-level Apache Airflow orchestration, DAGs, operators, sensors, XComs, task dependencies, and scheduling
Tags: data, airflow, orchestration, dag, workflow, scheduling, data-pipeline
What this skill does
# Apache Airflow Expert
You are an expert in Apache Airflow with deep knowledge of DAG design, task orchestration, operators, sensors, XComs, dynamic task generation, and production operations. You design and manage complex data pipelines that are reliable, maintainable, and scalable.
## Core Expertise
### DAG Fundamentals
**Basic DAG Structure:**
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email': ['data-alerts@example.com'],  # placeholder address
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# Define DAG
dag = DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='0 2 * * *',  # 2 AM UTC daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production'],
    doc_md="""
## ETL Pipeline
This pipeline extracts data from source systems,
transforms it, and loads it into the data warehouse.
### Schedule
Runs daily at 2 AM UTC
### Owner
Data Engineering Team
""",
)


# Define task callables
def extract_data(**context):
    """Extract data from source systems"""
    # 'logical_date' replaces the deprecated 'execution_date' context key
    logical_date = context['logical_date']
    print(f"Extracting data for {logical_date}")
    return {'rows_extracted': 10000}  # pushed to XCom as 'return_value'


def transform_data(**context):
    """Transform extracted data"""
    ti = context['ti']
    extracted = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {extracted['rows_extracted']} rows")
    return {'rows_transformed': 9950}


def load_data(**context):
    """Load data to warehouse"""
    ti = context['ti']
    transformed = ti.xcom_pull(task_ids='transform')
    print(f"Loading {transformed['rows_transformed']} rows")


# Create tasks
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Set dependencies
extract_task >> transform_task >> load_task
```
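Scheduling semantics are easy to get wrong here: with `schedule='0 2 * * *'`, each run covers a data interval, is stamped with the interval's *start* as its logical date, and is only triggered once that interval has closed. The stdlib-only sketch below models which intervals get queued for a daily 02:00 cron with and without `catchup`. It assumes UTC, ignores DST and Airflow's real timetable classes, and the helper names (`daily_intervals`, `runs_to_create`) are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Tuple

Interval = Tuple[datetime, datetime]
DAILY = timedelta(days=1)


def daily_intervals(start_date: datetime, now: datetime, hour: int = 2) -> Iterator[Interval]:
    """Yield closed (start, end) intervals of a daily cron at `hour`:00.

    Hypothetical helper: the first interval begins at the first cron tick
    at or after start_date, and an interval is only yielded once its end
    is <= now (i.e. once the run would actually be triggered).
    """
    first = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if first < start_date:
        first += DAILY
    interval_start = first
    while interval_start + DAILY <= now:
        yield interval_start, interval_start + DAILY
        interval_start += DAILY


def runs_to_create(start_date: datetime, now: datetime, catchup: bool) -> List[Interval]:
    """Simplified model of which data intervals receive a DAG run."""
    intervals = list(daily_intervals(start_date, now))
    if catchup:
        return intervals  # backfill every missed interval
    return intervals[-1:]  # only the most recent closed interval


if __name__ == "__main__":
    utc = timezone.utc
    start = datetime(2024, 1, 1, tzinfo=utc)
    now = datetime(2024, 1, 4, 3, 0, tzinfo=utc)
    for begin, end in runs_to_create(start, now, catchup=True):
        # logical_date is the interval start; the run fires at the interval end
        print(f"logical_date={begin:%Y-%m-%d %H:%M}  runs_at={end:%Y-%m-%d %H:%M}")
```

With a start date three days in the past, `catchup=True` would backfill three runs in this model, while `catchup=False` (the setting used in the DAG above) schedules only the latest closed interval.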
**TaskFlow API (Recommended):**
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='etl_pipeline_taskflow',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
)
def etl_pipeline():
    """ETL pipeline using TaskFlow API"""

    @task
    def extract() -> dict:
        """Extract data from source"""
        print("Extracting data...")
        return {'rows_extracted': 10000}

    @task
    def transform(data: dict) -> dict:
        """Transform extracted data"""
        print(f"Transforming {data['rows_extracted']} rows")
        return {'rows_transformed': 9950}

    @task
    def load(data: dict) -> None:
        """Load data to warehouse"""
        print(f"Loading {data['rows_transformed']} rows")

    # Define flow (return values are passed between tasks via XCom automatically)
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)


# Instantiate DAG
dag = etl_pipeline()
```
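TaskFlow hides the `xcom_pull` calls from the classic example: a `@task` return value is stored as an XCom under the key `return_value`, and passing `extracted_data` into `transform()` both sets the dependency and pulls that value at runtime. The toy model below mimics that push/pull contract in plain Python so the data flow can be traced without a scheduler. `XComStore` and `run_chain` are hypothetical; real XComs live in the metadata database by default, so they suit small payloads like these row counts rather than whole datasets.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class XComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom storage.

    Values are keyed by (run_id, task_id, key); TaskFlow stores a task's
    return value under the key 'return_value'.
    """

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str, str], Any] = {}

    def push(self, run_id: str, task_id: str, value: Any, key: str = "return_value") -> None:
        self._data[(run_id, task_id, key)] = value

    def pull(self, run_id: str, task_id: str, key: str = "return_value") -> Any:
        # A missing entry yields None, like ti.xcom_pull for a single task_id
        return self._data.get((run_id, task_id, key))


def run_chain(store: XComStore, run_id: str, steps: Dict[str, Callable[..., Any]]) -> None:
    """Run steps in order, feeding each one the previous step's return value."""
    previous: Optional[str] = None
    for task_id, fn in steps.items():
        result = fn() if previous is None else fn(store.pull(run_id, previous))
        if result is not None:  # nothing to share when a step returns None
            store.push(run_id, task_id, result)
        previous = task_id


def extract() -> dict:
    return {"rows_extracted": 10000}


def transform(data: dict) -> dict:
    return {"rows_transformed": data["rows_extracted"] - 50}


def load(data: dict) -> None:
    print(f"Loading {data['rows_transformed']} rows")


if __name__ == "__main__":
    store = XComStore()
    run_chain(store, "manual__2024-01-01", {"extract": extract, "transform": transform, "load": load})
    # prints: Loading 9950 rows
```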
### Task Dependencies and Branching
**Complex Dependencies:**
```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(
    dag_id='complex_dependencies',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def complex_pipeline():
    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule='none_failed')

    @task
    def process_source_1():
        return "source_1_complete"

    @task
    def process_source_2():
        return "source_2_complete"

    @task
    def process_source_3():
        return "source_3_complete"

    @task
    def merge_data(sources: list):
        print(f"Merging data from: {sources}")
        return "merged_complete"

    @task
    def validate_data(merged):
        print(f"Validating: {merged}")
        return "validated"

    @task
    def publish_data(validated):
        print(f"Publishing: {validated}")

    # Parallel processing, then merge (passing XComArgs also sets the dependencies)
    source_1 = process_source_1()
    source_2 = process_source_2()
    source_3 = process_source_3()
    merged = merge_data([source_1, source_2, source_3])
    validated = validate_data(merged)
    published = publish_data(validated)

    # Explicit dependencies for the non-TaskFlow operators
    start >> [source_1, source_2, source_3]
    published >> end


dag = complex_pipeline()
```
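The scheduler only queues a task once its upstream tasks are done, so the graph above executes in "waves": the three `process_source_*` tasks can run concurrently, and everything from `merge_data` onward is strictly serial. The sketch below uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) to compute those waves from a hand-copied version of the dependency edges. It models ordering only, not Airflow's executor, pools, or concurrency limits, and `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each key lists the tasks it waits on (its upstreams), mirroring
# `start >> [sources]`, `merge_data([...])`, and `published >> end`.
DEPENDENCIES = {
    "process_source_1": {"start"},
    "process_source_2": {"start"},
    "process_source_3": {"start"},
    "merge_data": {"process_source_1", "process_source_2", "process_source_3"},
    "validate_data": {"merge_data"},
    "publish_data": {"validate_data"},
    "end": {"publish_data"},
}


def execution_waves(deps):
    """Group tasks into waves whose members could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for i, wave in enumerate(execution_waves(DEPENDENCIES), start=1):
        print(f"wave {i}: {', '.join(wave)}")
```

Widening the middle wave (more sources) adds parallelism, while every extra serial step after the merge adds to the run's wall-clock time.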
**Branching Logic:**
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_example():
    @task
    def check_data_quality() -> dict:
        """Check data quality"""
        quality_score = 0.95
        return {'score': quality_score}

    @task.branch
    def decide_path(quality_data: dict) -> str:
        """Branch based on quality score; returns the task_id to follow"""
        if quality_data['score'] >= 0.9:
            return 'high_quality_path'
        elif quality_data['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    @task
    def high_quality_path():
        print("Processing high quality data")

    @task
    def medium_quality_path():
        print("Processing medium quality data with validation")

    @task
    def low_quality_path():
        print("Rejecting low quality data")

    # Runs as long as nothing failed and at least one branch succeeded
    @task(trigger_rule='none_failed_min_one_success')
    def finalize():
        print("Finalizing pipeline")

    # Define flow
    quality = check_data_quality()
    branch = decide_path(quality)
    high = high_quality_path()
    medium = medium_quality_path()
    low = low_quality_path()
    final = finalize()

    branch >> [high, medium, low] >> final


dag = branching_example()
```
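Why does `finalize` need a non-default trigger rule? When `decide_path` returns `'high_quality_path'`, Airflow marks the other two branch tasks as skipped, and under the default `all_success` rule a skipped upstream makes the downstream task skip as well. The plain-Python sketch below evaluates just the two rules involved against the upstream states of `finalize`. It is a simplified model for reasoning, not Airflow's scheduler code, and the helper names are hypothetical.

```python
from typing import Dict, Iterable, List

SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = "success", "skipped", "failed", "upstream_failed"
BRANCHES = ["high_quality_path", "medium_quality_path", "low_quality_path"]


def decide_path(score: float) -> str:
    """Same thresholds as the @task.branch callable above."""
    if score >= 0.9:
        return "high_quality_path"
    elif score >= 0.7:
        return "medium_quality_path"
    return "low_quality_path"


def branch_states(chosen: str, branches: Iterable[str]) -> Dict[str, str]:
    """Assume the followed branch succeeds; every other branch is skipped."""
    return {b: (SUCCESS if b == chosen else SKIPPED) for b in branches}


def all_success(states: Iterable[str]) -> bool:
    """Default rule: every upstream must have succeeded."""
    return all(s == SUCCESS for s in states)


def none_failed_min_one_success(states: Iterable[str]) -> bool:
    """No upstream failed (or upstream_failed) and at least one succeeded."""
    values = list(states)
    no_failures = not any(s in (FAILED, UPSTREAM_FAILED) for s in values)
    return no_failures and SUCCESS in values


if __name__ == "__main__":
    upstream: List[str] = list(branch_states(decide_path(0.95), BRANCHES).values())
    print(upstream)  # ['success', 'skipped', 'skipped']
    print(all_success(upstream))  # False -> finalize would be skipped
    print(none_failed_min_one_success(upstream))  # True -> finalize runs
```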
### Dynamic Task Generation
**Dynamic Tasks with expand():**
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='dynamic_tasks',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_pipeline():
    @task
    def get_data_sources() -> list:
        """Return list of data sources to process"""
        return ['source_a', 'source_b', 'source_c', 'source_d']

    @task
    def process_source(source: str) -> dict:
        """Process individual source"""
        print(f"Processing {source}")
        return {'source': source, 'rows': 1000}

    @task
    def aggregate_results(results: list) -> dict:
        """Aggregate all results"""
        total_rows = sum(r['rows'] for r in results)
        return {'total_rows': total_rows, 'source_count': len(results)}

    # Dynamic task expansion: one mapped task instance per source, decided at runtime
    sources = get_data_sources()
    processed = process_source.expand(source=sources)
    aggregate_results(processed)


dag = dynamic_pipeline()
```
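`expand()` creates one mapped task instance per element at runtime, each identified by a `map_index`, and the downstream task receives the collected results. When `expand()` is given several keyword arguments, Airflow maps over their cross product. The stdlib sketch below models only that fan-out so you can predict how many instances a run will create; `plan_mapped_instances` and the second mapped argument `region` are hypothetical, and the model ignores `partial()` arguments and the `[core] max_map_length` cap.

```python
from itertools import product
from typing import Any, Dict, List, Sequence


def plan_mapped_instances(**mapped: Sequence[Any]) -> List[Dict[str, Any]]:
    """Return the kwargs each mapped task instance would receive.

    Hypothetical model of expand(): one instance per element of the cross
    product of all mapped arguments. The sequential map_index numbering
    here is illustrative.
    """
    names = list(mapped)
    combos = product(*(mapped[name] for name in names))
    return [{"map_index": i, **dict(zip(names, values))} for i, values in enumerate(combos)]


def aggregate_results(results: list) -> dict:
    """Same reduction as the aggregate_results task above."""
    total_rows = sum(r["rows"] for r in results)
    return {"total_rows": total_rows, "source_count": len(results)}


if __name__ == "__main__":
    sources = ["source_a", "source_b", "source_c", "source_d"]
    plan = plan_mapped_instances(source=sources)
    print(len(plan))  # 4 mapped instances
    # 'region' is a hypothetical second mapped argument: 4 x 2 = 8 instances
    print(len(plan_mapped_instances(source=sources, region=["eu", "us"])))  # 8
    results = [{"source": p["source"], "rows": 1000} for p in plan]
    print(aggregate_results(results))  # {'total_rows': 4000, 'source_count': 4}
```

Because the instance count multiplies with each mapped argument, constant parameters are better passed through `partial()` than through `expand()`.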
**Dynamic Task Generation (Legacy):**
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_file(file_name):
    """Process individual file"""
    print(f"Processing {file_name}")


with DAG(
    dag_id='dynamic_file_processing',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Generate tasks in a loop at DAG-parse time; the file list is fixed per parse
    files = ['file_1.csv', 'file_2.csv', 'file_3.csv']
    for file_name in files:
        task = PythonOperator(
            task_id=f'process_{file_name.replace(".", "_")}',
            python_callable=process_file,
            op_kwargs={'file_name': file_name},
        )
```
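Because this loop runs every time the scheduler parses the file, the list driving it should be cheap to compute and deterministic, and every generated `task_id` must be unique and valid. Airflow accepts IDs made of alphanumeric characters, underscores, dots, and dashes, but dots also serve as the separator in TaskGroup-prefixed IDs, which is one reason to replace them as the example does. The helpers below are a hypothetical sanitiser that strips other characters, enforces a 250-character limit (assumed to match Airflow's ID length cap), and fails fast on collisions.

```python
import re
from typing import Iterable, List

MAX_ID_LEN = 250  # assumed to mirror Airflow's task_id length limit
_INVALID = re.compile(r"[^A-Za-z0-9_-]")  # deliberately also replaces '.'


def safe_task_id(prefix: str, name: str) -> str:
    """Build a task_id like 'process_file_1_csv' from an arbitrary name."""
    task_id = f"{prefix}_{_INVALID.sub('_', name)}"
    if len(task_id) > MAX_ID_LEN:
        raise ValueError(f"task_id too long ({len(task_id)} chars): {task_id[:40]}...")
    return task_id


def task_ids_for(files: Iterable[str], prefix: str = "process") -> List[str]:
    """Generate one id per file in a stable order, rejecting duplicates."""
    ids: List[str] = []
    seen = set()
    for name in sorted(files):  # deterministic order across parses
        tid = safe_task_id(prefix, name)
        if tid in seen:
            raise ValueError(f"duplicate task_id {tid!r} generated from {name!r}")
        seen.add(tid)
        ids.append(tid)
    return ids


if __name__ == "__main__":
    print(task_ids_for(["file_1.csv", "file_2.csv", "file_3.csv"]))
    # ['process_file_1_csv', 'process_file_2_csv', 'process_file_3_csv']
```

Sanitising can map two different names to the same ID (`a.csv` and `a_csv` both become `process_a_csv`), which is why the collision check matters.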
### Operators