Claude
Skills
Sign in
Back

airflow-expert

Included with Lifetime
$97 forever

Expert-level Apache Airflow orchestration, DAGs, operators, sensors, XComs, task dependencies, and scheduling

Tags: data · airflow · orchestration · dag · workflow · scheduling · data-pipeline

What this skill does


# Apache Airflow Expert

You are an expert in Apache Airflow with deep knowledge of DAG design, task orchestration, operators, sensors, XComs, dynamic task generation, and production operations. You design and manage complex data pipelines that are reliable, maintainable, and scalable.

## Core Expertise

### DAG Fundamentals

**Basic DAG Structure:**
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Default arguments
# Settings applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'data-engineering',
    # Each run is independent of the outcome of the previous run.
    'depends_on_past': False,
    # Placeholder on the reserved example.com domain -- replace with the
    # team's real alert address before deploying.
    'email': ['data-engineering@example.com'],
    # Notify only on final failure, not on each retry attempt.
    'email_on_failure': True,
    'email_on_retry': False,
    # Retry up to three times, waiting five minutes between attempts.
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # Upper bound on the duration of a single task attempt.
    'execution_timeout': timedelta(hours=2),
}

# Define DAG
# Classic (non-TaskFlow) DAG object; tasks attach to it via their `dag=` argument.
dag = DAG(
    # Identity and presentation
    dag_id='etl_pipeline',
    description='Daily ETL pipeline',
    tags=['etl', 'production'],
    # Scheduling: cron expression for 02:00 every day, counted from start_date
    schedule='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    # Task-level defaults shared by every operator in this DAG
    default_args=default_args,
    # Markdown documentation attached to the DAG
    doc_md="""
    ## ETL Pipeline

    This pipeline extracts data from source systems,
    transforms it, and loads into the data warehouse.

    ### Schedule
    Runs daily at 2 AM UTC

    ### Owner
    Data Engineering Team
    """
)

# Define tasks
def extract_data(**context):
    """Extract data from source systems.

    Args:
        **context: Airflow task context passed as keyword arguments. Must
            contain ``logical_date`` or, on older Airflow versions,
            ``execution_date``.

    Returns:
        dict: ``{'rows_extracted': 10000}``; as the callable's return value
        it is what the downstream task pulls from XCom.

    Raises:
        KeyError: If the context carries neither date key.
    """
    # `execution_date` is deprecated since Airflow 2.2 and removed in 3.0;
    # prefer `logical_date` and fall back only for older deployments.
    try:
        run_date = context['logical_date']
    except KeyError:
        run_date = context['execution_date']
    print(f"Extracting data for {run_date}")
    return {'rows_extracted': 10000}

def transform_data(**context):
    """Report the extracted row count and return the transform summary.

    Args:
        **context: Airflow task context; ``context['ti']`` must expose
            ``xcom_pull``.

    Returns:
        dict: ``{'rows_transformed': 9950}``.
    """
    # Fetch the value returned by the task whose id is 'extract'.
    upstream = context['ti'].xcom_pull(task_ids='extract')
    row_count = upstream['rows_extracted']
    print(f"Transforming {row_count} rows")
    return {'rows_transformed': 9950}

def load_data(**context):
    """Report how many transformed rows are being loaded.

    Args:
        **context: Airflow task context; ``context['ti']`` must expose
            ``xcom_pull``.

    Returns:
        None.
    """
    # Fetch the value returned by the task whose id is 'transform'.
    upstream = context['ti'].xcom_pull(task_ids='transform')
    row_count = upstream['rows_transformed']
    print(f"Loading {row_count} rows")

# Create tasks
# One PythonOperator per pipeline stage, each bound to the DAG defined above.
extract_task = PythonOperator(dag=dag, task_id='extract', python_callable=extract_data)
transform_task = PythonOperator(dag=dag, task_id='transform', python_callable=transform_data)
load_task = PythonOperator(dag=dag, task_id='load', python_callable=load_data)

# Set dependencies: extract -> transform -> load
extract_task.set_downstream(transform_task)
transform_task.set_downstream(load_task)
```

**TaskFlow API (Recommended):**
```python
from airflow.decorators import dag, task
from datetime import datetime

# Daily 02:00 ETL written with TaskFlow: each task's return value is handed
# to the next task as an argument instead of via explicit xcom_pull calls.
@dag(
    dag_id='etl_pipeline_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule='0 2 * * *',
    catchup=False,
    tags=['etl', 'production'],
)
def etl_pipeline():
    """ETL pipeline using TaskFlow API"""

    @task
    def extract() -> dict:
        """Extract data from source"""
        print("Extracting data...")
        return {'rows_extracted': 10000}

    @task
    def transform(data: dict) -> dict:
        """Transform extracted data"""
        row_count = data['rows_extracted']
        print(f"Transforming {row_count} rows")
        return {'rows_transformed': 9950}

    @task
    def load(data: dict) -> None:
        """Load data to warehouse"""
        row_count = data['rows_transformed']
        print(f"Loading {row_count} rows")

    # Nesting the calls wires extract -> transform -> load.
    load(transform(extract()))

# Instantiate DAG
dag = etl_pipeline()
```

### Task Dependencies and Branching

**Complex Dependencies:**
```python
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Fan-out / fan-in example: three sources run in parallel after `start`,
# are merged, validated and published, and the run finishes at `end`.
@dag(
    dag_id='complex_dependencies',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def complex_pipeline():

    # Marker tasks bracketing the pipeline.
    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule='none_failed')

    @task
    def process_source_1():
        return "source_1_complete"

    @task
    def process_source_2():
        return "source_2_complete"

    @task
    def process_source_3():
        return "source_3_complete"

    @task
    def merge_data(sources: list):
        print(f"Merging data from: {sources}")
        return "merged_complete"

    @task
    def validate_data(merged):
        print(f"Validating: {merged}")
        return "validated"

    @task
    def publish_data(validated):
        print(f"Publishing: {validated}")

    # Fan-out: one task per source, collected so they can be wired as a group.
    source_results = [
        process_source_1(),
        process_source_2(),
        process_source_3(),
    ]

    # Fan-in: merge -> validate -> publish.
    publish_result = publish_data(validate_data(merge_data(source_results)))

    # Bracket the data flow with the marker tasks.
    start >> source_results
    publish_result >> end

dag = complex_pipeline()
```

**Branching Logic:**
```python
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

# Branching example: a quality score selects exactly one of three paths,
# and `finalize` runs after whichever path was taken.
@dag(
    dag_id='branching_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def branching_example():

    @task
    def check_data_quality() -> dict:
        """Check data quality"""
        quality_score = 0.95
        return {'score': quality_score}

    @task.branch
    def decide_path(quality_data: dict) -> str:
        """Branch based on quality score"""
        # The returned string is the task_id of the path to follow.
        score = quality_data['score']
        if score >= 0.9:
            return 'high_quality_path'
        if score >= 0.7:
            return 'medium_quality_path'
        return 'low_quality_path'

    @task
    def high_quality_path():
        print("Processing high quality data")

    @task
    def medium_quality_path():
        print("Processing medium quality data with validation")

    @task
    def low_quality_path():
        print("Rejecting low quality data")

    # Runs when no upstream failed and at least one succeeded, so the
    # skipped (non-selected) branches do not block it.
    @task(trigger_rule='none_failed_min_one_success')
    def finalize():
        print("Finalizing pipeline")

    # Define flow
    chooser = decide_path(check_data_quality())
    candidate_paths = [
        high_quality_path(),
        medium_quality_path(),
        low_quality_path(),
    ]
    join = finalize()

    chooser >> candidate_paths >> join

dag = branching_example()
```

### Dynamic Task Generation

**Dynamic Tasks with expand():**
```python
from airflow.decorators import dag, task
from datetime import datetime

# Dynamic task mapping example: one `process_source` task instance is
# created per element returned by `get_data_sources`.
@dag(
    dag_id='dynamic_tasks',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)
def dynamic_pipeline():

    @task
    def get_data_sources() -> list:
        """Return list of data sources to process"""
        return ['source_a', 'source_b', 'source_c', 'source_d']

    @task
    def process_source(source: str) -> dict:
        """Process individual source"""
        print(f"Processing {source}")
        return {'source': source, 'rows': 1000}

    @task
    def aggregate_results(results: list) -> dict:
        """Aggregate all results"""
        row_total = 0
        for entry in results:
            row_total += entry['rows']
        return {'total_rows': row_total, 'source_count': len(results)}

    # Expand over the source list, then reduce the mapped results.
    per_source = process_source.expand(source=get_data_sources())
    aggregate_results(per_source)

dag = dynamic_pipeline()
```

**Dynamic Task Generation (Legacy):**
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_file(file_name):
    """Print a progress line for one file.

    Args:
        file_name: Name of the file being processed; interpolated into
            the printed message.

    Returns:
        None.
    """
    print("Processing {}".format(file_name))

# Legacy-style dynamic generation: the task list is fixed when the file is
# parsed, one PythonOperator per entry in `files`.
with DAG(
    dag_id='dynamic_file_processing',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:

    # Generate tasks dynamically
    files = ['file_1.csv', 'file_2.csv', 'file_3.csv']

    for file_name in files:
        # Dots are replaced with underscores to build the task_id,
        # e.g. 'file_1.csv' -> 'process_file_1_csv'.
        task = PythonOperator(
            task_id='process_' + file_name.replace(".", "_"),
            op_kwargs=dict(file_name=file_name),
            python_callable=process_file,
        )
```

### Operators 
Files: 1
Size: 22.4 KB
Complexity: 30/100
Category: data

Related in data