# Apache Airflow Skill
Master Apache Airflow for workflow orchestration, data pipeline automation, and scheduled task management. This skill covers DAG authoring, operators, sensors, hooks, XComs, variables, connections, and deployment patterns.
## When to Use This Skill
### USE when:
- Building complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows
- Scheduling recurring batch jobs
- Managing workflows with retries and error handling
- Coordinating tasks across multiple systems
- Inspecting workflow execution history
- Keeping audit trails and tracking lineage
- Orchestrating ML pipelines
### DON'T USE when:
- Real-time streaming data (use Kafka, Flink)
- Simple cron jobs (use systemd timers, crontab)
- CI/CD pipelines (use GitHub Actions, Jenkins)
- Low-latency requirements (the Airflow scheduler adds startup latency to every task)
- Simple single-task automation (Airflow is overkill)
- Visual workflow design for non-developers (use n8n)
## Prerequisites
### Installation Options
**Option 1: pip (Development)**
```bash
# Create virtual environment
python -m venv airflow-env
source airflow-env/bin/activate
# Set Airflow home
export AIRFLOW_HOME=~/airflow
# Install Airflow with constraints
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
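# Initialize the metadata database (`airflow db init` is deprecated since 2.7)
airflow db migrate
# Create admin user (development credentials only; the email is a placeholder)
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin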
# Start services
airflow webserver --port 8080 &
airflow scheduler &
```
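The constraints URL above is assembled from the Airflow version and the interpreter's major.minor version. As a minimal sketch, the same string can be built in Python, which helps when the install is scripted rather than typed into a shell. `constraints_url` is a hypothetical helper name, not part of Airflow.

```python
import sys
from typing import Optional


def constraints_url(airflow_version: str, python_version: Optional[str] = None) -> str:
    """Build the constraints-file URL used by the pip install command."""
    if python_version is None:
        # Default to the running interpreter, e.g. "3.11"
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


if __name__ == "__main__":
    # Prints the URL for the interpreter running this script
    print(constraints_url("2.8.1"))
```

Passing `python_version` explicitly is useful when the target environment differs from the interpreter that runs the script.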
**Option 2: Docker Compose (Recommended)**
```bash
# Download official docker-compose
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'
# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize
docker compose up airflow-init
# Start services
docker compose up -d
# Access UI at http://localhost:8080 (airflow/airflow)
```
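Once the containers are up, the webserver exposes a `/health` endpoint that reports component status as JSON. The sketch below shows one way to check it from Python. The helper names are hypothetical, the URL assumes the default port mapping shown above, and the sample payload only illustrates the general shape of the response rather than output captured from a real deployment.

```python
import json
from typing import List
from urllib.request import urlopen


def unhealthy_components(payload: dict) -> List[str]:
    """Return the names of components that report a non-healthy status."""
    return sorted(
        name
        for name, info in payload.items()
        # Assumption: a null status means the component is not deployed,
        # so it is skipped instead of being flagged.
        if isinstance(info, dict) and info.get("status") not in ("healthy", None)
    )


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch and decode the health report (needs a running webserver)."""
    with urlopen(f"{base_url}/health", timeout=10) as response:
        return json.load(response)


# Illustrative payload shape, not captured from a real deployment
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}
print(unhealthy_components(sample))  # ['scheduler']
```

With the stack running, `unhealthy_components(fetch_health())` should return an empty list before any DAGs are triggered.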
**Option 3: Kubernetes with Helm**
```bash
# Add Airflow Helm repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Install Airflow
helm install airflow apache-airflow/airflow \
    --namespace airflow \
    --create-namespace \
    --set executor=KubernetesExecutor
# Read the webserver secret key. This key signs Flask sessions; it is not the
# UI login password, which defaults to admin/admin in the official chart.
kubectl get secret --namespace airflow airflow-webserver-secret-key -o jsonpath="{.data.webserver-secret-key}" | base64 --decode
```
### Development Setup
```bash
# Install development dependencies
pip install "apache-airflow[dev,postgres,celery,kubernetes]"
# Install testing tools
pip install pytest pytest-airflow
# Install linting
pip install ruff
```
## Core Capabilities
### 1. Basic DAG Structure
```python
# dags/basic_dag.py
"""
Basic DAG demonstrating core Airflow concepts.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-team@example.com'],  # placeholder address
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
}

# DAG definition
with DAG(
    dag_id='basic_etl_pipeline',
    default_args=default_args,
    description='Basic ETL pipeline demonstrating core patterns',
    schedule='0 6 * * *',  # Daily at 06:00 UTC (`schedule_interval` is deprecated since 2.4)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'production'],
    doc_md="""
    ## Basic ETL Pipeline

    This DAG demonstrates:

    - Task dependencies
    - Python and Bash operators
    - Error handling with retries
    - Task documentation

    **Owner**: data-team

    **Schedule**: Daily at 6 AM UTC
    """,
) as dag:

    # Start marker
    start = EmptyOperator(
        task_id='start',
        doc='Pipeline start marker',
    )

    # Extract task
    def extract_data(**context):
        """Extract data from source systems."""
        import logging
        logger = logging.getLogger(__name__)

        # 'ds' is the run's logical date as a YYYY-MM-DD string
        execution_date = context['ds']
        logger.info(f"Extracting data for {execution_date}")

        # Simulated extraction
        data = {
            'records': 1000,
            'source': 'database',
            'execution_date': execution_date,
        }

        # The return value is pushed to XCom automatically
        return data

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        doc='Extract data from source database',
    )

    # Transform task
    def transform_data(**context):
        """Transform extracted data."""
        import logging
        logger = logging.getLogger(__name__)

        # Pull the upstream task's return value via XCom
        ti = context['ti']
        extracted_data = ti.xcom_pull(task_ids='extract_data')
        logger.info(f"Transforming {extracted_data['records']} records")

        # Simulated transformation
        transformed = {
            **extracted_data,
            'records_transformed': extracted_data['records'],
            'quality_score': 0.95,
        }
        return transformed

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        doc='Apply transformations to extracted data',
    )

    # Load task using Bash; the command is a Jinja template rendered at run time
    load = BashOperator(
        task_id='load_data',
        bash_command='''
        echo "Loading data to warehouse"
        echo "Execution date: {{ ds }}"
        echo "Previous task output: {{ ti.xcom_pull(task_ids='transform_data') }}"
        ''',
        doc='Load transformed data to data warehouse',
    )

    # End marker
    end = EmptyOperator(
        task_id='end',
        doc='Pipeline end marker',
        trigger_rule='all_success',
    )

    # Define dependencies
    start >> extract >> transform >> load >> end
```
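Because `extract_data` and `transform_data` are plain Python callables, their XCom hand-off can be exercised without a scheduler. The sketch below re-declares simplified copies of the two functions and uses a hypothetical `FakeTaskInstance` stand-in (not an Airflow class) that only mimics `xcom_pull` by task id. Real task instances do considerably more, so treat this as a quick logic check, not a substitute for running the DAG.

```python
class FakeTaskInstance:
    """Hypothetical stand-in that stores return values keyed by task id."""

    def __init__(self):
        self._xcoms = {}

    def record(self, task_id, value):
        # Airflow stores a callable's return value as an XCom; mimic that here
        self._xcoms[task_id] = value

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def extract_data(**context):
    """Simplified copy of the DAG's extract callable."""
    return {'records': 1000, 'source': 'database', 'execution_date': context['ds']}


def transform_data(**context):
    """Simplified copy of the DAG's transform callable."""
    extracted = context['ti'].xcom_pull(task_ids='extract_data')
    return {**extracted, 'records_transformed': extracted['records'], 'quality_score': 0.95}


# Run the two callables in dependency order with a hand-built context
ti = FakeTaskInstance()
context = {'ds': '2026-01-01', 'ti': ti}
ti.record('extract_data', extract_data(**context))
result = transform_data(**context)
print(result['records_transformed'], result['execution_date'])  # 1000 2026-01-01
```

Keeping the callables free of scheduler-specific state beyond `context` is what makes this kind of check possible.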
### 2. Advanced Operators
```python
# dags/advanced_operators.py
"""
DAG demonstrating advanced operator patterns.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='advanced_operators_demo',
    default_args=default_args,
    schedule=None,  # Manual trigger only
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['demo', 'advanced'],
) as dag:

    # BranchPythonOperator for conditional logic
    def choose_branch(**context):
        """Decide which branch to execute based on data."""
        import random

        # Simulated decision logic
        data_volume = random.randint(0, 1000)
        context['ti'].xcom_push(key='data_volume', value=data_volume)

        # Return the task_id of the branch to follow; the other branch is skipped
        if data_volume > 500:
            return 'process_large_dataset'
        else:
            return 'process_small_dataset'

    branch_task = BranchPythonOperator(
        task_id='branch_on_data_volume',
        python_callable=choose_branch,
    )

    process_large = PythonOperator(
        task_id='process_large_dataset',
        python_callable=lambda: print("Processing large dataset with parallel workers"),
    )

    process_small = PythonOperator(
        task_id='process_small_dataset',
        # The source listing is cut off after task_id; this callable is an
        # assumption that mirrors process_large.
        python_callable=lambda: print("Processing small dataset"),
    )
```