Claude
Skills
Sign in
Back

implementing-dragos-platform-for-ot-monitoring

Included with Lifetime
$97 forever

Deploy and configure the Dragos Platform for OT network monitoring, leveraging its 600+ industrial protocol parsers, intelligence-driven threat detection analytics, and asset visibility capabilities to protect ICS environments against threat groups like VOLTZITE, GRAPHITE, and BAUXITE.

Cloud & DevOps · ot-security · ics · dragos · threat-detection · ot-monitoring · scada · threat-intelligence · ndr · scripts

What this skill does


# Implementing Dragos Platform for OT Monitoring

## When to Use

- When deploying an OT-specific network detection and response (NDR) solution for industrial environments
- When needing threat intelligence-driven detection against known ICS threat groups (VOLTZITE, CHERNOVITE, KAMACITE)
- When building an OT SOC capability with purpose-built industrial security tooling
- When requiring asset discovery and vulnerability management alongside threat detection in a single platform
- When integrating OT security monitoring with an enterprise SIEM (Splunk, Sentinel, QRadar)

**Do not use** for IT-only network monitoring without ICS components, for endpoint detection and response (EDR) on OT workstations, or for environments standardized on Claroty or Nozomi (see respective skills).

## Prerequisites

- Dragos Platform license and deployment package
- Network TAP or SPAN port at OT network boundaries (one sensor per monitored segment)
- Dragos sensor hardware (physical appliance) or virtual appliance meeting minimum specifications
- Firewall rules allowing sensor-to-Dragos-SiteStore communication (encrypted, outbound only from OT)
- Dragos Knowledge Pack subscription for threat intelligence updates

## Workflow

### Step 1: Deploy Dragos Sensors and Configure Monitoring

```python
#!/usr/bin/env python3
"""Dragos Platform Deployment Validator and Integration Tool.

Validates Dragos sensor deployment, checks connectivity, and
configures integration with enterprise SIEM for OT alert forwarding.
"""

import json
import sys
import csv
from datetime import datetime
from typing import Optional, List, Dict

try:
    import requests
except ImportError:
    print("Install requests: pip install requests")
    sys.exit(1)


class DragosPlatformManager:
    """Interface with Dragos Platform API for OT monitoring management.

    Wraps a ``requests.Session`` that carries the API credentials as
    headers and exposes read-only helpers for sensors, assets,
    notifications, vulnerabilities and threat groups, plus a console
    deployment report and a SIEM configuration generator.

    NOTE(review): the endpoint paths, header names and response keys used
    here are taken as-is from this script; confirm them against the API
    documentation of the Dragos Platform version you are running.
    """

    def __init__(self, base_url: str, api_key: str, api_secret: str,
                 verify_ssl: bool = True, timeout: Optional[float] = 30.0):
        """Create an authenticated session against the platform.

        Args:
            base_url: Root URL of the platform; trailing slashes are removed.
            api_key: Value sent in the ``API-Key`` header.
            api_secret: Value sent in the ``API-Secret`` header.
            verify_ssl: Passed to ``Session.verify`` (TLS certificate check).
            timeout: Seconds to wait on each request before giving up.
                Pass ``None`` to wait indefinitely (the previous behaviour).
        """
        self.base_url = base_url.rstrip("/")
        # Without a timeout, requests blocks forever on an unreachable host.
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "API-Key": api_key,
            "API-Secret": api_secret,
            "Content-Type": "application/json",
        })
        self.session.verify = verify_ssl

    def _get_list(self, path: str, key: str,
                  params: Optional[Dict] = None) -> List[Dict]:
        """GET ``path`` and return the list stored under ``key`` in the JSON body.

        Returns an empty list when the key is missing or null.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
        """
        resp = self.session.get(
            f"{self.base_url}{path}", params=params, timeout=self.timeout
        )
        resp.raise_for_status()
        # `or []` also covers an explicit JSON null under the key.
        return resp.json().get(key) or []

    def get_sensors(self) -> List[Dict]:
        """Retrieve all deployed Dragos sensors and their status."""
        return self._get_list("/api/v1/sensors", "sensors")

    def get_assets(self, asset_type: Optional[str] = None) -> List[Dict]:
        """Retrieve OT assets discovered by Dragos.

        Args:
            asset_type: When given (and non-empty), sent as the ``type``
                query parameter to filter the result.
        """
        params = {"type": asset_type} if asset_type else {}
        return self._get_list("/api/v1/assets", "assets", params)

    def get_notifications(self, severity: str = "high", limit: int = 50) -> List[Dict]:
        """Retrieve threat detection notifications.

        Args:
            severity: Sent as the ``min_severity`` query parameter.
            limit: Sent as the ``limit`` query parameter.
        """
        params = {"min_severity": severity, "limit": limit}
        return self._get_list("/api/v1/notifications", "notifications", params)

    def get_vulnerabilities(self, severity: str = "critical") -> List[Dict]:
        """Retrieve OT vulnerabilities with Dragos-specific context.

        Args:
            severity: Sent as the ``min_severity`` query parameter.
        """
        params = {"min_severity": severity}
        return self._get_list("/api/v1/vulnerabilities", "vulnerabilities", params)

    def get_threat_groups(self) -> List[Dict]:
        """Retrieve tracked ICS threat group activity relevant to the environment."""
        return self._get_list("/api/v1/threat-groups", "threat_groups")

    def validate_deployment(self):
        """Print a deployment report: sensor status, asset visibility, threat groups.

        Fetches sensors, assets and threat groups from the API and writes a
        human-readable summary to stdout. Returns nothing.

        Raises:
            requests.HTTPError: If any of the underlying API calls fails.
        """
        sensors = self.get_sensors()
        assets = self.get_assets()

        print(f"\n{'='*65}")
        print("DRAGOS PLATFORM DEPLOYMENT VALIDATION")
        print(f"{'='*65}")
        print(f"Validation Time: {datetime.now().isoformat()}")

        print("\n--- SENSOR STATUS ---")
        healthy_sensors = 0
        for sensor in sensors:
            status = sensor.get("status", "unknown")
            # Only the exact status "connected" counts as operational.
            connected = status == "connected"
            icon = "[OK]" if connected else "[!!]"
            print(f"  {icon} {sensor.get('name', 'Unknown')} | Status: {status}")
            print(f"      IP: {sensor.get('ip_address')} | Segment: {sensor.get('monitored_segment')}")
            print(f"      Last Seen: {sensor.get('last_seen')} | Packets/sec: {sensor.get('pps', 0)}")
            print(f"      Knowledge Pack: {sensor.get('knowledge_pack_version', 'N/A')}")
            if connected:
                healthy_sensors += 1

        print(f"\n  Sensor Health: {healthy_sensors}/{len(sensors)} operational")

        print("\n--- ASSET VISIBILITY ---")
        print(f"  Total Assets Discovered: {len(assets)}")
        asset_types = {}
        protocols = set()
        for asset in assets:
            atype = asset.get("type", "Unknown")
            asset_types[atype] = asset_types.get(atype, 0) + 1
            # A null "protocols" field is treated the same as a missing one.
            protocols.update(asset.get("protocols") or [])
        # Most common asset types first; ties keep first-seen order.
        for atype, count in sorted(asset_types.items(), key=lambda x: -x[1]):
            print(f"    {atype}: {count}")
        print(f"  Protocols Observed: {', '.join(sorted(protocols))}")

        print("\n--- THREAT INTELLIGENCE ---")
        groups = self.get_threat_groups()
        print(f"  Relevant Threat Groups: {len(groups)}")
        for group in groups:
            # `or` guards against explicit nulls, which .get() defaults miss.
            description = group.get("description") or ""
            sectors = group.get("target_sectors") or []
            print(f"    - {group.get('name')}: {description[:80]}")
            print(f"      Targets: {', '.join(sectors)}")
            print(f"      Activity Level: {group.get('activity_level', 'Unknown')}")

    def generate_siem_integration_config(self, siem_type: str = "splunk"):
        """Generate SIEM integration configuration for Dragos alerts.

        Prints the selected configuration as indented JSON and returns it.

        Args:
            siem_type: ``"splunk"`` or ``"sentinel"``. Any other value falls
                back to the Splunk configuration; a notice is printed and
                the header names the configuration actually used.

        Returns:
            The configuration dict for the effective SIEM type.
        """
        configs = {
            "splunk": {
                "syslog_format": "CEF",
                "syslog_port": 514,
                "severity_mapping": {
                    "critical": 10,
                    "high": 7,
                    "medium": 5,
                    "low": 3,
                    "info": 1,
                },
                "index": "ot_security",
                "sourcetype": "dragos:notification",
                "fields": [
                    "notification_id", "severity", "category", "source_ip",
                    "destination_ip", "asset_name", "protocol", "description",
                    "mitre_ics_technique", "threat_group",
                ],
            },
            "sentinel": {
                "connector_type": "Syslog-CEF",
                "workspace_id": "<workspace-id>",
                "log_analytics_table": "DragosOTAlerts_CL",
                "severity_mapping": {
                    "critical": "High",
                    "high": "High",
                    "medium": "Medium",
                    "low": "Low",
                    "info": "Informational",
                },
            },
        }

        effective_type = siem_type if siem_type in configs else "splunk"
        if effective_type != siem_type:
            # Keep the historical fallback, but never label Splunk settings
            # with the name of a SIEM that is not actually supported here.
            print(f"\n[!!] Unsupported SIEM type {siem_type!r}; falling back to splunk")

        config = configs[effective_type]
        print(f"\n--- {effective_type.upper()} INTEGRATION CONFIG ---")
        print(json.dumps(config, indent=2))
        return config


if __name__ == "__main__":
    manager = DragosPlatformManager(
        base_url="https://dragos-sitestore.plant.local",
        api_key="your-api-key",
        api_secret="your-api-secret",
        verify_ssl=True,
    )

    manager.validate_deployment()
    manager.generate_siem_integration_config("splunk")

    print(f"\n--- RECENT HIGH-SEVERITY NOTIFICATIONS ---")
    notifications = manager.get_notifications(severity="high", limit=10)
    for n in notifications:
        pri

Related in Cloud & DevOps