Python DAG workflow orchestration using Apache Airflow for data pipelines, ETL processes, and scheduled task automation

Tags: automation, airflow, dag, workflow, orchestration, etl, data-pipeline, scheduling, python

# Apache Airflow Skill

Master Apache Airflow for workflow orchestration, data pipeline automation, and scheduled task management. This skill covers DAG authoring, operators, sensors, hooks, XComs, variables, connections, and deployment patterns.

## When to Use This Skill

### USE when:
- Building complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows
- Scheduling recurring batch jobs
- Managing workflows with retries and error handling
- Coordinating tasks across multiple systems
- Needing visibility into workflow execution history
- Requiring audit trails and lineage tracking
- Orchestrating ML pipelines

### DON'T USE when:
- Real-time streaming data (use Kafka, Flink)
- Simple cron jobs (use systemd timers, crontab)
- CI/CD pipelines (use GitHub Actions, Jenkins)
- Low-latency requirements (Airflow has scheduler overhead)
- Simple single-task automation (overkill)
- Non-developers need to design workflows visually (use n8n)
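The first "use when" criterion, task dependencies, is the core idea behind a DAG. Under the default `all_success` trigger rule, a task becomes runnable only after every upstream task has succeeded, and tasks with no dependency on each other can run in parallel. The sketch below illustrates that ordering with Python's standard-library `graphlib` (Python 3.9+) rather than with Airflow itself. The task names and the `execution_waves` helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: keys are tasks, values are the upstream tasks
# that must finish first. Two extracts fan in to a single transform.
pipeline = {
    "extract_orders": {"start"},
    "extract_customers": {"start"},
    "transform": {"extract_orders", "extract_customers"},
    "load": {"transform"},
    "end": {"load"},
}


def execution_waves(graph):
    """Group tasks into waves. Every task in a wave could run in parallel,
    because all of its upstream tasks finished in earlier waves."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for index, wave in enumerate(execution_waves(pipeline)):
    print(index, wave)
```

A cycle, such as `a` depending on `b` while `b` depends on `a`, makes `prepare()` raise `CycleError`. Airflow likewise rejects DAG files that contain cyclic dependencies.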

## Prerequisites

### Installation Options

**Option 1: pip (Development)**
```bash
# Create virtual environment
python -m venv airflow-env
source airflow-env/bin/activate

# Set Airflow home
export AIRFLOW_HOME=~/airflow

# Install Airflow with constraints
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialize the metadata database (`airflow db init` is deprecated since Airflow 2.7)
airflow db migrate

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Start services
airflow webserver --port 8080 &
airflow scheduler &
```
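Two values select the constraints file: the Airflow release and the Python major.minor version. That is why the snippet above strips the patch level from `python --version`. Constraints files exist only for the Python versions a given release supports. Provisioning scripts written in Python can build the same URL. `constraint_url` below is a hypothetical helper that simply mirrors the shell logic.

```python
import sys


def constraint_url(airflow_version, python_version=None):
    """Build the constraints-file URL for an Airflow release.

    Mirrors the shell snippet: the git ref is constraints-<airflow_version>
    and the file name uses the Python major.minor version only.
    """
    if python_version is None:
        # Default to the interpreter running this script, e.g. "3.11".
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


print(constraint_url("2.8.1", "3.11"))
```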

**Option 2: Docker Compose (Recommended)**
```bash
# Download official docker-compose
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'

# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize
docker compose up airflow-init

# Start services
docker compose up -d

# Access UI at http://localhost:8080 (airflow/airflow)
```
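Once the containers are running, the Airflow 2.x webserver exposes an unauthenticated `/health` endpoint. It returns JSON with a status entry per component, such as `metadatabase` and `scheduler`. Below is a minimal stdlib sketch of a readiness check that assumes the default `http://localhost:8080` address. Both helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def unhealthy_components(payload):
    """Return the names of components whose status is not 'healthy'.

    Components that report no status at all (None) are included too.
    """
    return sorted(
        name
        for name, info in payload.items()
        if isinstance(info, dict) and info.get("status") != "healthy"
    )


def check_airflow_health(base_url="http://localhost:8080", timeout=5.0):
    """Fetch <base_url>/health and return the unhealthy component names.

    Assumes an Airflow 2.x webserver. Raises urllib.error.URLError if the
    webserver cannot be reached.
    """
    with urlopen(f"{base_url}/health", timeout=timeout) as response:
        payload = json.load(response)
    return unhealthy_components(payload)
```

`check_airflow_health()` returns an empty list when every component reports `healthy`. A component that has never started, such as a triggerer in a deployment that does not run one, may report no status and will also be listed. The endpoint path may differ in Airflow 3.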

**Option 3: Kubernetes with Helm**
```bash
# Add Airflow Helm repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Install Airflow
helm install airflow apache-airflow/airflow \
    --namespace airflow \
    --create-namespace \
    --set executor=KubernetesExecutor

# Airflow 2.x chart defaults: the UI user is admin/admin (webserver.defaultUser); change it for production
# Forward the webserver port, then open http://localhost:8080
kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow
```

### Development Setup
```bash
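# Install Airflow with extra providers, pinned by the same constraints file
# (reuses AIRFLOW_VERSION and CONSTRAINT_URL from Option 1; quotes keep zsh from expanding the brackets)
pip install "apache-airflow[postgres,celery,cncf.kubernetes]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Install testing tools (DAGs can be tested with plain pytest)
pip install pytest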

# Install linting
pip install ruff
```
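With pytest installed, a typical DAG integrity test loads the DAG folder with `airflow.models.DagBag` and asserts that its `import_errors` is empty. Where Airflow is not installed, as in a lightweight pre-commit hook, a stdlib-only check can still catch syntax errors. The sketch below is a hypothetical helper. It only checks files that mention both "airflow" and "dag", which is similar in spirit to Airflow's safe-mode DAG discovery. Because it parses files rather than importing them, it cannot catch import-time failures.

```python
import ast
from pathlib import Path


def check_dag_files(dags_folder):
    """Return {path: problem} for DAG files under dags_folder that fail to parse.

    Hypothetical pre-commit helper: it parses without importing, so it
    catches syntax errors but not import-time failures (DagBag does that).
    """
    problems = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        source = path.read_text(encoding="utf-8")
        lowered = source.lower()
        # Only check files that mention both "airflow" and "dag".
        if "airflow" not in lowered or "dag" not in lowered:
            continue
        try:
            ast.parse(source, filename=str(path))
        except SyntaxError as exc:
            problems[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return problems
```

In a pytest test, `assert check_dag_files("dags") == {}` fails with the offending paths and line numbers listed in the assertion message.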

## Core Capabilities

### 1. Basic DAG Structure

```python
# dags/basic_dag.py
"""
Basic DAG demonstrating core Airflow concepts.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

# Default arguments for all tasks
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
}

# DAG definition
with DAG(
    dag_id='basic_etl_pipeline',
    default_args=default_args,
    description='Basic ETL pipeline demonstrating core patterns',
    schedule='0 6 * * *',  # Daily at 6 AM UTC (`schedule_interval` is deprecated since Airflow 2.4)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production'],
    doc_md="""
    ## Basic ETL Pipeline

    This DAG demonstrates:
    - Task dependencies
    - Python and Bash operators
    - Error handling with retries
    - Task documentation

    **Owner**: data-team
    **Schedule**: Daily at 6 AM UTC
    """,
) as dag:

    # Start marker
    start = EmptyOperator(
        task_id='start',
        doc='Pipeline start marker',
    )

    # Extract task
    def extract_data(**context):
        """Extract data from source systems."""
        import logging
        logger = logging.getLogger(__name__)

        # Access execution context
        execution_date = context['ds']
        logger.info(f"Extracting data for {execution_date}")

        # Simulated extraction
        data = {
            'records': 1000,
            'source': 'database',
            'execution_date': execution_date,
        }

        # Return value available via XCom
        return data

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        doc='Extract data from source database',
    )

    # Transform task
    def transform_data(**context):
        """Transform extracted data."""
        import logging
        logger = logging.getLogger(__name__)

        # Pull data from previous task via XCom
        ti = context['ti']
        extracted_data = ti.xcom_pull(task_ids='extract_data')

        logger.info(f"Transforming {extracted_data['records']} records")

        # Simulated transformation
        transformed = {
            **extracted_data,
            'records_transformed': extracted_data['records'],
            'quality_score': 0.95,
        }

        return transformed

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        doc='Apply transformations to extracted data',
    )

    # Load task using Bash
    load = BashOperator(
        task_id='load_data',
        bash_command='''
            echo "Loading data to warehouse"
            echo "Execution date: {{ ds }}"
            echo "Previous task output: {{ ti.xcom_pull(task_ids='transform_data') }}"
        ''',
        doc='Load transformed data to data warehouse',
    )

    # End marker
    end = EmptyOperator(
        task_id='end',
        doc='Pipeline end marker',
        trigger_rule='all_success',
    )

    # Define dependencies
    start >> extract >> transform >> load >> end
```
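With the `'0 6 * * *'` schedule above, each run covers one day-long data interval. Airflow stamps a run with the interval's start, the logical date exposed as `{{ ds }}` and `context['ds']`, but triggers it only after the interval ends. So the run with `ds = 2026-01-01` starts at about 06:00 UTC on 2026-01-02. The stdlib model below illustrates that rule. It assumes UTC and ignores `catchup`, paused DAGs, and timezone-aware schedules. `daily_runs` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta, timezone


def daily_runs(start_date, hour, count):
    """Model the first `count` runs of a daily '0 <hour> * * *' schedule.

    Each run is stamped with its data interval start (the date behind `ds`)
    and triggered only once that interval has ended.
    """
    first = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if first < start_date:
        # The first interval begins at the first cron tick at or after start_date.
        first += timedelta(days=1)
    runs = []
    for i in range(count):
        interval_start = first + timedelta(days=i)
        runs.append({
            "ds": interval_start.strftime("%Y-%m-%d"),
            "data_interval_start": interval_start,
            "triggered_at": interval_start + timedelta(days=1),
        })
    return runs


for run in daily_runs(datetime(2026, 1, 1, tzinfo=timezone.utc), hour=6, count=2):
    print(run["ds"], "->", run["triggered_at"].isoformat())
```

With `catchup=False`, as set in the DAG above, the scheduler creates only the most recent due interval. It does not backfill every interval since `start_date`.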

### 2. Advanced Operators

```python
# dags/advanced_operators.py
"""
DAG demonstrating advanced operator patterns.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='advanced_operators_demo',
    default_args=default_args,
    schedule=None,  # Manual trigger only
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['demo', 'advanced'],
) as dag:

    # BranchPythonOperator for conditional logic
    def choose_branch(**context):
        """Decide which branch to execute based on data."""
        import random

        # Simulated decision logic
        data_volume = random.randint(0, 1000)
        context['ti'].xcom_push(key='data_volume', value=data_volume)

        if data_volume > 500:
            return 'process_large_dataset'
        else:
            return 'process_small_dataset'

    branch_task = BranchPythonOperator(
        task_id='branch_on_data_volume',
        python_callable=choose_branch,
    )

    process_large = PythonOperator(
        task_id='process_large_dataset',
        python_callable=lambda: print("Processing large dataset with parallel workers"),
    )

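    process_small = PythonOperator(
        task_id='process_small_dataset',
        python_callable=lambda: print("Processing small dataset on a single worker"),
    )

    # Assumed completion: join the two branches. Only one branch runs and the
    # other is skipped, so the join cannot use the default all_success rule,
    # which would skip the join as well.
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    branch_task >> [process_large, process_small] >> join
```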
