Claude
Skills
Sign in
Back

workflow-builder

Included with Lifetime
$97 forever

Design and implement multi-step automation workflows

Design

What this skill does



# Workflow Builder Skill

> Build the YAML workflow engine for multi-step agent automation.

## Overview

The workflow engine provides declarative automation capabilities:
- YAML-based workflow definitions
- Variable interpolation between steps
- Conditional execution and error handling
- Multi-agent orchestration

## Prerequisites

```bash
pip install pyyaml
```

## Build Steps

### Step 1: Create the Workflow Executor

**File: `workflows/executor.py`**

```python
#!/usr/bin/env python3
"""
Workflow Executor - Parse and execute YAML workflow definitions.

Provides step-by-step execution with variable interpolation,
conditional logic, and error handling.
"""

import re
import yaml
import json
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from enum import Enum
from datetime import datetime


class StepStatus(Enum):
    """Lifecycle status of a workflow step.

    The same enum is used for the overall status of a workflow
    (``WorkflowResult.status``). The string values are what
    ``WorkflowResult.to_dict`` emits for ``status`` fields.
    """
    # NOTE(review): the code that assigns these states is not visible here;
    # the per-member meanings below are inferred from the names - confirm.
    PENDING = "pending"    # presumably: not started yet
    RUNNING = "running"    # presumably: currently executing
    SUCCESS = "success"    # VariableResolver reports `steps.<id>.success` as True only for this state
    FAILED = "failed"      # presumably: finished with an error
    SKIPPED = "skipped"    # presumably: deliberately not executed


class ErrorAction(Enum):
    """Policy describing what to do when a step fails."""
    # NOTE(review): the code that interprets these policies is not visible
    # here; the meanings below are inferred from the names - confirm.
    FAIL = "fail"      # presumably: stop the workflow
    SKIP = "skip"      # presumably: ignore the failure and move on
    RETRY = "retry"    # presumably: run the step again


@dataclass
class StepResult:
    """Result of executing a single workflow step.

    ``VariableResolver`` stores these keyed on ``step_id`` and exposes their
    fields to templates as ``{{steps.<step_id>.<field>}}``;
    ``WorkflowResult.to_dict`` serializes every field.
    """
    # Identifier of the step; the key under which the result is stored.
    step_id: str
    # Final state of the step; SUCCESS is what makes `steps.<id>.success` True.
    status: StepStatus
    # Raw value exposed to templates as `steps.<id>.output`.
    output: Any = None
    # Named values exposed to templates as `steps.<id>.outputs.<name>`.
    outputs: Dict[str, Any] = field(default_factory=dict)
    # Error text exposed to templates as `steps.<id>.error`; None by default.
    error: Optional[str] = None
    # Step duration - presumably in milliseconds, going by the name; TODO confirm
    # against the code that fills it in.
    duration_ms: int = 0


@dataclass
class WorkflowResult:
    """Aggregate outcome of one workflow run.

    Holds the overall status, the per-step results in the order they were
    recorded, the workflow-level outputs and the start/end timestamps.
    """
    workflow_name: str
    status: StepStatus
    step_results: List[StepResult] = field(default_factory=list)
    outputs: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @property
    def duration_ms(self) -> int:
        """Elapsed time between start and completion in whole milliseconds.

        Returns 0 when either timestamp is missing. The fractional part is
        truncated by ``int()``.
        """
        if not (self.started_at and self.completed_at):
            return 0
        elapsed = self.completed_at - self.started_at
        return int(elapsed.total_seconds() * 1000)

    def to_dict(self) -> dict:
        """Return the result as a plain dictionary.

        Enum statuses are replaced by their string ``value``. The timestamps
        themselves are not included, only the derived ``duration_ms``.
        """
        serialized_steps = []
        for step in self.step_results:
            serialized_steps.append({
                "step_id": step.step_id,
                "status": step.status.value,
                "output": step.output,
                "outputs": step.outputs,
                "error": step.error,
                "duration_ms": step.duration_ms,
            })

        summary = {
            "workflow_name": self.workflow_name,
            "status": self.status.value,
            "step_results": serialized_steps,
        }
        summary["outputs"] = self.outputs
        summary["error"] = self.error
        summary["duration_ms"] = self.duration_ms
        return summary


class VariableResolver:
    """Resolve ``{{ ... }}`` variable references in workflow definitions.

    Supported reference forms:

    * ``workflow.inputs.<name>``      - a workflow input value
    * ``steps.<id>.output``           - the raw output of a recorded step
    * ``steps.<id>.outputs.<name>``   - a named output of a recorded step
    * ``steps.<id>.success``          - True if the step status is SUCCESS
    * ``steps.<id>.error``            - the step's error value
    * ``env.<NAME>``                  - an environment variable (snapshot
      taken when the resolver is constructed)

    ``<name>`` may contain dots. The dotted text is first tried as a single
    key; if no such key exists it is walked through nested dictionaries, so
    ``workflow.inputs.user.name`` finds ``{"user": {"name": ...}}``.

    Unknown or malformed references resolve to ``None``.
    """

    PATTERN = re.compile(r'\{\{([^}]+)\}\}')

    def __init__(self):
        self.workflow_inputs: Dict[str, Any] = {}
        self.step_results: Dict[str, StepResult] = {}
        # Copy so later changes to os.environ do not alter resolution.
        self.env_vars: Dict[str, str] = dict(os.environ)

    def set_inputs(self, inputs: Dict[str, Any]):
        """Set workflow input values.

        ``None`` is treated as "no inputs" so a later lookup returns ``None``
        instead of raising.
        """
        self.workflow_inputs = {} if inputs is None else inputs

    def add_step_result(self, result: StepResult):
        """Record a step result under its ``step_id`` (replacing any previous one)."""
        self.step_results[result.step_id] = result

    def resolve(self, value: Any) -> Any:
        """Resolve variables in a value.

        Strings are interpolated; dicts and lists are rebuilt with every
        contained value resolved recursively (dict keys are left untouched);
        anything else is returned unchanged.
        """
        if isinstance(value, str):
            return self._resolve_string(value)
        if isinstance(value, dict):
            return {k: self.resolve(v) for k, v in value.items()}
        if isinstance(value, list):
            return [self.resolve(v) for v in value]
        return value

    def _resolve_string(self, text: str) -> Any:
        """Resolve variables in a string.

        If the whole string (ignoring surrounding whitespace) is one
        reference, the referenced value is returned with its original type.
        Otherwise each reference is replaced by ``str(value)``, with ``None``
        rendered as an empty string.
        """
        match = self.PATTERN.fullmatch(text.strip())
        if match:
            return self._get_value(match.group(1).strip())

        def replacer(m):
            val = self._get_value(m.group(1).strip())
            return "" if val is None else str(val)

        return self.PATTERN.sub(replacer, text)

    @staticmethod
    def _lookup(mapping: Any, dotted_key: str) -> Any:
        """Look up ``dotted_key`` in ``mapping``.

        The exact key wins (this keeps flat keys that contain dots working);
        otherwise the key is split on ``.`` and walked through nested dicts.
        Returns ``None`` when the path cannot be followed.
        """
        if dotted_key in mapping:
            return mapping[dotted_key]

        current = mapping
        for segment in dotted_key.split('.'):
            if not isinstance(current, dict) or segment not in current:
                return None
            current = current[segment]
        return current

    def _get_value(self, path: str) -> Any:
        """Return the value for a variable path, or ``None`` if it is unknown."""
        parts = path.split('.')

        # Every valid reference has at least "<root>.<something>".
        if len(parts) < 2:
            return None

        root = parts[0]

        if root == 'workflow' and len(parts) >= 3 and parts[1] == 'inputs':
            return self._lookup(self.workflow_inputs, '.'.join(parts[2:]))

        if root == 'steps' and len(parts) >= 3:
            result = self.step_results.get(parts[1])
            if result is None:
                return None

            # Named `attr`, not `field`, to avoid shadowing dataclasses.field.
            attr = parts[2]
            if attr == 'output':
                return result.output
            if attr == 'outputs' and len(parts) >= 4:
                return self._lookup(result.outputs, '.'.join(parts[3:]))
            if attr == 'success':
                return result.status == StepStatus.SUCCESS
            if attr == 'error':
                return result.error
            return None

        if root == 'env':
            return self.env_vars.get('.'.join(parts[1:]))

        return None


class WorkflowExecutor:
    """Execute workflow definitions."""

    def __init__(self, agent_executor=None):
        self.agent_executor = agent_executor or self._mock_executor
        self.definitions_path = Path(__file__).parent / "definitions"

    def _mock_executor(self, agent: str, action: str, inputs: Dict[str, Any]) -> Any:
        """Mock executor for testing."""
        return {
            "agent": agent,
            "action_summary": action[:100] + "..." if len(action) > 100 else action,
            "inputs_received": list(inputs.keys()),
            "mock": True
        }

    def load_workflow(self, name: str) -> dict:
        """Load a workflow definition by name."""
        paths = [
            self.definitions_path / f"{name}.yaml",
            self.definitions_path / f"{name}.yml",
            self.definitions_path / name
        ]

        for path in paths:
            if path.exists():
                with open(path) as f:
                    return yaml.safe_load(f)

        raise FileNotFoundError(f"Workflow not found: {name}")

    def list_workflows(self) -> List[dict]:
        """List all available workflows."""
        workflows = []
        if self.definitions_path.exists():
            for path in self.definitions_path.glob("*.yaml"):
                try:
                    with open(path) as f:
                        data = yaml.safe_load(f)
                        workflows.append({
                            "name": data.get("name", path.stem),
                            "description": data.get("description", ""),
                            "file": path.name,
                            "inputs": [
                                {"name": i.get("name"), "required": i.get("required", False)}
                                for i in data.get("inputs", [])
                            ]
                        })
                except Exception:
                    pass
        return workflows

    def validate_workflow(self, workflow: dict) -> List[str]:
        """Validate a workflow definition. Returns list of errors."""
        errors = []

        if "name" not in workflow:
            errors.append("Missing required field: name")
        if "steps" not in workflow:
            errors.append("Missing required field: steps")
        elif not isinstance(workflow["steps"], list):
            errors.append("steps must be a list")
        elif len(workflow["steps"]) == 0:
            errors.append("steps cannot be empty")

        step_ids = set()
        for i, step in enumerate(workflow.get("steps", [])):
            step_num = i + 1
            if "id" not in step:
                errors.append(f"Step {step_num}: missing required field 'id'")
            elif step["id"] in step_ids:
                errors.append(f"Step {st

Related in Design