# YAML Workflow Executor
> Execute standardized data processing workflows from YAML configuration files. Supports data loading, transformation, validation, and reporting pipelines.
## Quick Start
```bash
# Execute workflow from YAML
/yaml-workflow-executor config/input/analysis.yaml
# Execute with output directory
/yaml-workflow-executor config/input/pipeline.yaml --output reports/
# Dry run (validate only)
/yaml-workflow-executor config/input/pipeline.yaml --dry-run
```
## When to Use
**USE when:**
- Running standardized analysis workflows
- Batch processing with different parameters
- Creating reproducible pipelines
- Separating configuration from code
**DON'T USE when:**
- One-off scripts
- Interactive exploration
- Configuration is simple (single parameter)
## Prerequisites
- Python 3.9+
- pyyaml>=6.0
- pydantic>=2.0 (for validation)
- Data files in expected locations
## Overview
Implements the YAML → Script → Report pattern used across workspace-hub:
1. **Load YAML** - Parse configuration file
2. **Validate** - Check required fields and types
3. **Execute** - Run processing pipeline
4. **Report** - Generate output and logs
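The four stages above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the skill's actual implementation: the function names (`validate`, `execute`, `report`, `run`), the `operations` registry, and the list-of-dicts row format are all hypothetical. The config is passed in as the mapping that `yaml.safe_load` would return (stage 1), so the sketch needs no third-party packages.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Operation = Callable[[List[Row], Dict[str, Any]], List[Row]]

# Assumed set of required top-level sections.
REQUIRED_SECTIONS = ("metadata", "input", "processing", "output")


def validate(config: Dict[str, Any]) -> List[str]:
    """Stage 2: return a list of problems; an empty list means the config is usable."""
    errors = [f"missing section: {name}" for name in REQUIRED_SECTIONS if name not in config]
    for i, step in enumerate(config.get("processing", {}).get("steps", [])):
        for key in ("name", "operation"):
            if key not in step:
                errors.append(f"step {i} missing '{key}'")
    return errors


def execute(config: Dict[str, Any], rows: List[Row], operations: Dict[str, Operation]) -> List[Row]:
    """Stage 3: apply each configured step in order."""
    for step in config["processing"]["steps"]:
        rows = operations[step["operation"]](rows, step)
    return rows


def report(config: Dict[str, Any], rows: List[Row]) -> str:
    """Stage 4: produce a one-line summary (a real report would write a file)."""
    return f"{config['metadata']['name']}: {len(rows)} rows"


def run(config: Dict[str, Any], rows: List[Row], operations: Dict[str, Operation]) -> str:
    """Validate first, then execute and report."""
    errors = validate(config)
    if errors:
        raise ValueError("; ".join(errors))
    return report(config, execute(config, rows, operations))


def remove_nulls(rows: List[Row], step: Dict[str, Any]) -> List[Row]:
    """Hypothetical handler: drop rows with None in any listed column."""
    return [r for r in rows if all(r.get(c) is not None for c in step["columns"])]


config = {
    "metadata": {"name": "demo"},
    "input": {"source": {"type": "csv", "path": "data/raw/input.csv"}},
    "processing": {
        "steps": [{"name": "clean_data", "operation": "remove_nulls", "columns": ["value"]}]
    },
    "output": {"format": "html", "path": "reports/analysis_report.html"},
}
rows = [{"value": 1}, {"value": None}, {"value": 3}]
summary = run(config, rows, {"remove_nulls": remove_nulls})
print(summary)
```

Running validation before execution means a malformed config fails fast, before any data is touched.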
## YAML Configuration Format
### Standard Structure
```yaml
# config/input/analysis_pipeline.yaml
# Metadata (required)
metadata:
  name: "data-analysis-pipeline"
  version: "1.0.0"
  created: "2026-01-14"
  author: "analyst"
  description: "Process and analyze CSV data"
# Input configuration
input:
  source:
    type: "csv" # csv, excel, json, parquet
    path: "data/raw/input.csv" # Relative path
    encoding: "utf-8"
  validation:
    required_columns: ["id", "value", "date"]
    max_rows: 1000000
    max_size_mb: 100
# Processing steps
processing:
  steps:
    - name: "clean_data"
      operation: "remove_nulls"
      columns: ["value"]
    - name: "transform"
      operation: "calculate"
      expression: "value * 1.1"
      output_column: "adjusted_value"
    - name: "aggregate"
      operation: "group_by"
      by: ["category"]
      aggregations:
        value: "sum"
        count: "count"
# Output configuration
output:
  format: "html" # html, csv, json, excel
  path: "reports/analysis_report.html"
  include_plots: true
  plots:
    - type: "time_series"
      x: "date"
      y: ["value", "adjusted_value"]
    - type: "bar"
      x: "category"
      y: "sum_value"
# Execution settings
execution:
  log_level: "INFO"
  parallel: false
  timeout_minutes: 30
```
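One way the `processing.steps` list could be mapped onto pandas calls is a dispatch table keyed by `operation`. The sketch below is an assumption about how such an executor might work, not the skill's actual code: the handler functions, the `OPERATIONS` registry, and `run_steps` are hypothetical names, and only the three operations from the example are covered. It also assumes that `aggregations` maps an existing column to a pandas aggregation name.

```python
from typing import Any, Callable, Dict, List

import pandas as pd

Step = Dict[str, Any]


def remove_nulls(df: pd.DataFrame, step: Step) -> pd.DataFrame:
    """Drop rows that have nulls in any of the listed columns."""
    return df.dropna(subset=step["columns"])


def calculate(df: pd.DataFrame, step: Step) -> pd.DataFrame:
    """Evaluate an arithmetic expression over columns into a new column."""
    return df.assign(**{step["output_column"]: df.eval(step["expression"])})


def group_by(df: pd.DataFrame, step: Step) -> pd.DataFrame:
    """Group by the given keys and aggregate the listed columns."""
    return df.groupby(step["by"], as_index=False).agg(step["aggregations"])


# Hypothetical registry: operation name from the YAML -> handler function.
OPERATIONS: Dict[str, Callable[[pd.DataFrame, Step], pd.DataFrame]] = {
    "remove_nulls": remove_nulls,
    "calculate": calculate,
    "group_by": group_by,
}


def run_steps(df: pd.DataFrame, steps: List[Step]) -> pd.DataFrame:
    """Apply each step in order, failing loudly on unknown operations."""
    for step in steps:
        handler = OPERATIONS.get(step["operation"])
        if handler is None:
            raise ValueError(f"Unknown operation: {step['operation']!r}")
        df = handler(df, step)
    return df


raw = pd.DataFrame({"category": ["a", "a", "b"], "value": [10.0, None, 30.0]})
steps = [
    {"name": "clean_data", "operation": "remove_nulls", "columns": ["value"]},
    {"name": "transform", "operation": "calculate",
     "expression": "value * 2", "output_column": "adjusted_value"},
    {"name": "aggregate", "operation": "group_by",
     "by": ["category"], "aggregations": {"adjusted_value": "sum"}},
]
result = run_steps(raw, steps)
print(result)
```

With a registry like this, supporting a new operation only requires adding one handler and one dictionary entry; the YAML schema and the loop stay unchanged.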
### Complete Example
```yaml
# config/input/bsee_analysis.yaml
metadata:
  name: "bsee-production-analysis"
  version: "2.0.0"
  created: "2026-01-14"
  author: "energy-analyst"
  description: "BSEE production data analysis with NPV calculation"
input:
  source:
    type: "csv"
    path: "data/raw/bsee_production.csv"
    date_columns: ["production_date"]
    parse_dates: true
  filters:
    - column: "field_name"
      operator: "in"
      values: ["JULIA", "ANCHOR", "JACK"]
    - column: "production_date"
      operator: ">="
      value: "2020-01-01"
  validation:
    required_columns:
      - "api_number"
      - "field_name"
      - "oil_bbl"
      - "gas_mcf"
      - "production_date"
    numeric_columns: ["oil_bbl", "gas_mcf", "water_bbl"]
processing:
  steps:
    - name: "clean"
      operation: "fillna"
      columns: ["water_bbl"]
      value: 0
    - name: "calculate_boe"
      operation: "add_column"
      expression: "oil_bbl + gas_mcf / 6"
      output_column: "boe"
    - name: "monthly_aggregate"
      operation: "resample"
      date_column: "production_date"
      frequency: "M"
      aggregations:
        oil_bbl: "sum"
        gas_mcf: "sum"
        boe: "sum"
    - name: "npv_calculation"
      operation: "npv"
      cash_flow_column: "revenue"
      discount_rates: [0.08, 0.10, 0.12]
      periods: 20
output:
  format: "html"
  path: "reports/bsee_analysis_{timestamp}.html"
  title: "BSEE Production Analysis"
  summary:
    include: true
    metrics:
      - "total_oil_bbl"
      - "total_gas_mcf"
      - "total_boe"
      - "npv_results"
  plots:
    - type: "time_series"
      title: "Monthly Production"
      x: "production_date"
      y: ["oil_bbl", "gas_mcf"]
    - type: "bar"
      title: "Production by Field"
      x: "field_name"
      y: "total_boe"
    - type: "line"
      title: "NPV Sensitivity"
      x: "discount_rate"
      y: "npv"
execution:
  log_level: "INFO"
  save_intermediate: true
  intermediate_path: "data/processed/"
  parallel: true
  n_workers: 4
```
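The `input.filters` entries in the example above could be applied with pandas roughly as follows. This is a hedged sketch, not the skill's implementation: `apply_filters` and `COMPARATORS` are hypothetical names, the set of supported operators beyond `in` and `>=` is an assumption, and multiple filters are assumed to combine with logical AND.

```python
import operator
from typing import Any, Callable, Dict, List

import pandas as pd

# Assumed mapping from the YAML operator strings to comparison functions.
COMPARATORS: Dict[str, Callable[[Any, Any], Any]] = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def apply_filters(df: pd.DataFrame, filters: List[Dict[str, Any]]) -> pd.DataFrame:
    """Keep only the rows that satisfy every filter."""
    mask = pd.Series(True, index=df.index)
    for f in filters:
        column = df[f["column"]]
        op = f["operator"]
        if op == "in":
            mask &= column.isin(f["values"])
        elif op in COMPARATORS:
            mask &= COMPARATORS[op](column, f["value"])
        else:
            raise ValueError(f"Unsupported filter operator: {op!r}")
    return df[mask]


production = pd.DataFrame(
    {
        "field_name": ["JULIA", "ANCHOR", "OTHER", "JACK"],
        "production_date": pd.to_datetime(
            ["2019-12-01", "2020-03-01", "2021-01-01", "2022-06-01"]
        ),
        "oil_bbl": [100, 200, 300, 400],
    }
)
filters = [
    {"column": "field_name", "operator": "in", "values": ["JULIA", "ANCHOR", "JACK"]},
    {"column": "production_date", "operator": ">=", "value": "2020-01-01"},
]
filtered = apply_filters(production, filters)
print(filtered)
```

Because `production_date` is parsed to a datetime column, pandas compares it against the ISO date string from the YAML directly. Here the JULIA row is dropped by the date filter and the OTHER row by the field filter.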
## Core Implementation
### Workflow Executor Class
```python
"""
ABOUTME: YAML workflow executor for standardized data pipelines
ABOUTME: Executes configuration-driven processing workflows
"""
import yaml
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime

import pandas as pd
from pydantic import BaseModel, validator


class InputConfig(BaseModel):
    """Input configuration model."""
    type: str = "csv"
    path: str
    encoding: str = "utf-8"
    date_columns: List[str] = []
    parse_dates: bool = True


class ProcessingStep(BaseModel):
    """Processing step configuration."""
    name: str
    operation: str
    columns: Optional[List[str]] = None
    expression: Optional[str] = None
    output_column: Optional[str] = None


class WorkflowConfig(BaseModel):
    """Complete workflow configuration."""
    metadata: Dict[str, Any]
    input: Dict[str, Any]
    processing: Dict[str, Any]
    output: Dict[str, Any]
    execution: Dict[str, Any] = {}


class YAMLWorkflowExecutor:
    """Execute workflows defined in YAML configuration."""

    def __init__(self, config_path: Path):
        """
        Initialize executor with configuration file.

        Args:
            config_path: Path to YAML configuration
        """
        self.config_path = Path(config_path)
        self.config: Optional[WorkflowConfig] = None
        self.data: Optional[pd.DataFrame] = None
        self.results: Dict[str, Any] = {}
        self._setup_logging()
        self._load_config()

    def _setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def _load_config(self):
        """Load and validate configuration."""
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config not found: {self.config_path}")
        with open(self.config_path) as f:
            raw_config = yaml.safe_load(f)
        # Pydantic raises a ValidationError if a required section is missing
        self.config = WorkflowConfig(**raw_config)
        self.logger.info(f"Loaded config: {self.config.metadata.get('name')}")

    def validate(self) -> bool:
        """
        Validate configuration without executing.

        Returns:
            True if valid, False otherwise
        """
        errors = []
        # Check input file exists
        input_path = Path(self.config.input['source']['path'])
        if not input_path.exists():
            errors.append(f"Input file not found: {input_path}")
        # Check output directory
        output_path = Path(self.config.output['path']).parent
        if not output_path.exists():
            self.logger.warning(f"Output directory will be created: {output_path}")
        # Validate processing steps
        for step in self.config.processing.get('steps', []):
            if 'name' not in step:
                errors.append("Processing step missing 'name'")
            if 'operation' not in step:
                errors.append(f"Step '{step.get('name')}' missing 'operation'")
        if errors:
            for error in errors:
                self.logger.error(error)
            return False
        self.logger.info("Configuration validated successfully")
        return True

    def execute(self, dry_run: bool = False) -> Dict[str, Any]:
        """
        Execute the workflow.

        Args:
            dry_run: If True, validate only
        """
        # ... (the remainder of this method is not included in this excerpt)
```