Claude
Skills
Sign in
Back

yaml-workflow-executor

Included with Lifetime
$97 forever

Execute data processing workflows defined in YAML configuration files. Supports data loading, transformation, validation, and reporting pipelines.

workspace-hub

What this skill does


# YAML Workflow Executor

> Execute standardized data processing workflows from YAML configuration files.

## Quick Start

```bash
# Execute workflow from YAML
/yaml-workflow-executor config/input/analysis.yaml

# Execute with output directory
/yaml-workflow-executor config/input/pipeline.yaml --output reports/

# Dry run (validate only)
/yaml-workflow-executor config/input/pipeline.yaml --dry-run
```

## When to Use

**USE when:**
- Running standardized analysis workflows
- Batch processing with different parameters
- Creating reproducible pipelines
- Separating configuration from code

**DON'T USE when:**
- One-off scripts
- Interactive exploration
- Configuration is simple (single parameter)

## Prerequisites

- Python 3.9+
- pyyaml>=6.0
- pydantic>=2.0 (for validation)
- Data files in expected locations

## Overview

Implements the YAML → Script → Report pattern used across workspace-hub:

1. **Load YAML** - Parse configuration file
2. **Validate** - Check required fields and types
3. **Execute** - Run processing pipeline
4. **Report** - Generate output and logs

## YAML Configuration Format

### Standard Structure

```yaml
# config/input/analysis_pipeline.yaml

# Metadata (required)
metadata:
  name: "data-analysis-pipeline"
  version: "1.0.0"
  created: "2026-01-14"
  author: "analyst"
  description: "Process and analyze CSV data"

# Input configuration
input:
  source:
    type: "csv"                    # csv, excel, json, parquet
    path: "data/raw/input.csv"     # Relative path
    encoding: "utf-8"

  validation:
    required_columns: ["id", "value", "date"]
    max_rows: 1000000
    max_size_mb: 100

# Processing steps
processing:
  steps:
    - name: "clean_data"
      operation: "remove_nulls"
      columns: ["value"]

    - name: "transform"
      operation: "calculate"
      expression: "value * 1.1"
      output_column: "adjusted_value"

    - name: "aggregate"
      operation: "group_by"
      by: ["category"]
      aggregations:
        value: "sum"
        count: "count"

# Output configuration
output:
  format: "html"                   # html, csv, json, excel
  path: "reports/analysis_report.html"
  include_plots: true
  plots:
    - type: "time_series"
      x: "date"
      y: ["value", "adjusted_value"]
    - type: "bar"
      x: "category"
      y: "sum_value"

# Execution settings
execution:
  log_level: "INFO"
  parallel: false
  timeout_minutes: 30
```

### Complete Example

```yaml
# config/input/bsee_analysis.yaml
metadata:
  name: "bsee-production-analysis"
  version: "2.0.0"
  created: "2026-01-14"
  author: "energy-analyst"
  description: "BSEE production data analysis with NPV calculation"

input:
  source:
    type: "csv"
    path: "data/raw/bsee_production.csv"
    date_columns: ["production_date"]
    parse_dates: true

  filters:
    - column: "field_name"
      operator: "in"
      values: ["JULIA", "ANCHOR", "JACK"]
    - column: "production_date"
      operator: ">="
      value: "2020-01-01"

  validation:
    required_columns:
      - "api_number"
      - "field_name"
      - "oil_bbl"
      - "gas_mcf"
      - "production_date"
    numeric_columns: ["oil_bbl", "gas_mcf", "water_bbl"]

processing:
  steps:
    - name: "clean"
      operation: "fillna"
      columns: ["water_bbl"]
      value: 0

    - name: "calculate_boe"
      operation: "add_column"
      expression: "oil_bbl + gas_mcf / 6"
      output_column: "boe"

    - name: "monthly_aggregate"
      operation: "resample"
      date_column: "production_date"
      frequency: "M"
      aggregations:
        oil_bbl: "sum"
        gas_mcf: "sum"
        boe: "sum"

    - name: "npv_calculation"
      operation: "npv"
      cash_flow_column: "revenue"
      discount_rates: [0.08, 0.10, 0.12]
      periods: 20

output:
  format: "html"
  path: "reports/bsee_analysis_{timestamp}.html"
  title: "BSEE Production Analysis"

  summary:
    include: true
    metrics:
      - "total_oil_bbl"
      - "total_gas_mcf"
      - "total_boe"
      - "npv_results"

  plots:
    - type: "time_series"
      title: "Monthly Production"
      x: "production_date"
      y: ["oil_bbl", "gas_mcf"]

    - type: "bar"
      title: "Production by Field"
      x: "field_name"
      y: "total_boe"

    - type: "line"
      title: "NPV Sensitivity"
      x: "discount_rate"
      y: "npv"

execution:
  log_level: "INFO"
  save_intermediate: true
  intermediate_path: "data/processed/"
  parallel: true
  n_workers: 4
```

## Core Implementation

### Workflow Executor Class

```python
"""
ABOUTME: YAML workflow executor for standardized data pipelines
ABOUTME: Executes configuration-driven processing workflows
"""

import yaml
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
from pydantic import BaseModel, validator


class InputConfig(BaseModel):
    """Input configuration model."""
    type: str = "csv"
    path: str
    encoding: str = "utf-8"
    date_columns: List[str] = []
    parse_dates: bool = True


class ProcessingStep(BaseModel):
    """Processing step configuration."""
    name: str
    operation: str
    columns: Optional[List[str]] = None
    expression: Optional[str] = None
    output_column: Optional[str] = None


class WorkflowConfig(BaseModel):
    """Complete workflow configuration."""
    metadata: Dict[str, Any]
    input: Dict[str, Any]
    processing: Dict[str, Any]
    output: Dict[str, Any]
    execution: Dict[str, Any] = {}


class YAMLWorkflowExecutor:
    """Execute workflows defined in YAML configuration."""

    def __init__(self, config_path: Path):
        """
        Initialize executor with configuration file.

        Args:
            config_path: Path to YAML configuration
        """
        self.config_path = Path(config_path)
        self.config: Optional[WorkflowConfig] = None
        self.data: Optional[pd.DataFrame] = None
        self.results: Dict[str, Any] = {}

        self._setup_logging()
        self._load_config()

    def _setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def _load_config(self):
        """Load and validate configuration."""
        if not self.config_path.exists():
            raise FileNotFoundError(f"Config not found: {self.config_path}")

        with open(self.config_path) as f:
            raw_config = yaml.safe_load(f)

        self.config = WorkflowConfig(**raw_config)
        self.logger.info(f"Loaded config: {self.config.metadata.get('name')}")

    def validate(self) -> bool:
        """
        Validate configuration without executing.

        Returns:
            True if valid, False otherwise
        """
        errors = []

        # Check input file exists
        input_path = Path(self.config.input['source']['path'])
        if not input_path.exists():
            errors.append(f"Input file not found: {input_path}")

        # Check output directory
        output_path = Path(self.config.output['path']).parent
        if not output_path.exists():
            self.logger.warning(f"Output directory will be created: {output_path}")

        # Validate processing steps
        for step in self.config.processing.get('steps', []):
            if 'name' not in step:
                errors.append("Processing step missing 'name'")
            if 'operation' not in step:
                errors.append(f"Step '{step.get('name')}' missing 'operation'")

        if errors:
            for error in errors:
                self.logger.error(error)
            return False

        self.logger.info("Configuration validated successfully")
        return True

    def execute(self, dry_run: bool = False) -> Dict[str, Any]:
        """
        Execute the workflow.

        Args:
            dry_ru

Related in workspace-hub