Claude
Skills
Sign in
Back

detecting-attacks-on-scada-systems

Included with Lifetime
$97 forever

This skill covers detecting cyber attacks targeting Supervisory Control and Data Acquisition (SCADA) systems including man-in-the-middle attacks on industrial protocols, unauthorized command injection into PLCs, HMI compromise, historian data manipulation, and denial-of-service against control system communications. It leverages OT-specific intrusion detection systems, industrial protocol anomaly detection, and process data analytics to identify attacks that traditional IT security tools miss.

Securityot-securityicsscadaindustrial-controliec62443intrusion-detectionthreat-detectionscripts

What this skill does


# Detecting Attacks on SCADA Systems

## When to Use

- When deploying intrusion detection capabilities in a SCADA environment for the first time
- When investigating suspected cyber attacks against industrial control systems
- When building detection rules for OT-specific attack patterns (Stuxnet, TRITON, Industroyer)
- When integrating OT network monitoring with an enterprise SOC for unified threat visibility
- When responding to alerts from OT security monitoring tools (Dragos, Nozomi, Claroty)

**Do not use** for detecting attacks on IT-only networks without SCADA/ICS components, for building generic network IDS rules (see building-detection-rules-with-sigma), or for incident response procedures after an attack is confirmed (see performing-ot-incident-response).

## Prerequisites

- Passive network monitoring sensors deployed on SPAN/TAP ports at OT network boundaries
- OT intrusion detection system (Dragos Platform, Nozomi Guardian, Claroty xDome, or Suricata with OT rulesets)
- Understanding of industrial protocols in use (Modbus, DNP3, OPC UA, EtherNet/IP, S7comm)
- Baseline of normal SCADA communication patterns (polling intervals, function codes, register ranges)
- Access to process historian data for physical process anomaly correlation

## Workflow

### Step 1: Establish SCADA Communication Baselines

Before detecting anomalies, establish what normal SCADA traffic looks like. Industrial protocols are highly deterministic - the same master polls the same slaves at the same intervals reading the same registers.

```python
#!/usr/bin/env python3
"""SCADA Communication Baseline Builder.

Analyzes OT network traffic to establish deterministic baselines for
Modbus/TCP, DNP3, EtherNet/IP, and S7comm communications.
"""

import json
import sys
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev

try:
    from scapy.all import rdpcap, IP, TCP, UDP
except ImportError:
    print("Install scapy: pip install scapy")
    sys.exit(1)

MODBUS_FUNC_NAMES = {
    1: "Read Coils", 2: "Read Discrete Inputs",
    3: "Read Holding Registers", 4: "Read Input Registers",
    5: "Write Single Coil", 6: "Write Single Register",
    8: "Diagnostics", 15: "Write Multiple Coils",
    16: "Write Multiple Registers", 17: "Report Slave ID",
    22: "Mask Write Register", 23: "Read/Write Multiple Registers",
    43: "Encapsulated Interface Transport",
}


class SCADABaselineBuilder:
    """Builds deterministic baselines from SCADA traffic captures."""

    def __init__(self):
        self.modbus_sessions = defaultdict(lambda: {
            "func_codes": defaultdict(int),
            "register_ranges": set(),
            "intervals": [],
            "last_seen": None,
            "request_count": 0,
        })
        self.communication_pairs = defaultdict(lambda: {
            "protocols": set(),
            "packet_count": 0,
            "first_seen": None,
            "last_seen": None,
        })

    def process_pcap(self, pcap_file):
        """Process pcap file to build SCADA baselines."""
        packets = rdpcap(pcap_file)
        print(f"[*] Processing {len(packets)} packets for baseline...")

        for pkt in packets:
            if not pkt.haslayer(IP):
                continue

            src = pkt[IP].src
            dst = pkt[IP].dst
            ts = float(pkt.time)

            # Track communication pairs
            pair_key = f"{src}->{dst}"
            pair = self.communication_pairs[pair_key]
            pair["packet_count"] += 1
            if pair["first_seen"] is None:
                pair["first_seen"] = ts
            pair["last_seen"] = ts

            # Analyze Modbus/TCP
            if pkt.haslayer(TCP) and pkt[TCP].dport == 502:
                self._analyze_modbus(pkt, src, dst, ts)

    def _analyze_modbus(self, pkt, src, dst, timestamp):
        """Extract Modbus function codes and register ranges."""
        payload = bytes(pkt[TCP].payload)
        if len(payload) < 8:
            return

        # MBAP header: transaction_id(2) + protocol_id(2) + length(2) + unit_id(1) + func_code(1)
        func_code = payload[7]
        session_key = f"{src}->{dst}"
        session = self.modbus_sessions[session_key]

        session["func_codes"][func_code] += 1
        session["request_count"] += 1
        session["protocols"] = {"Modbus/TCP"}

        # Track polling intervals
        if session["last_seen"] is not None:
            interval = timestamp - session["last_seen"]
            if 0.01 < interval < 60:  # Reasonable polling interval
                session["intervals"].append(interval)
        session["last_seen"] = timestamp

        # Extract register range for read/write operations
        if len(payload) >= 12 and func_code in (1, 2, 3, 4, 5, 6, 15, 16):
            start_register = (payload[8] << 8) | payload[9]
            if func_code in (1, 2, 3, 4, 15, 16) and len(payload) >= 12:
                count = (payload[10] << 8) | payload[11]
                session["register_ranges"].add((func_code, start_register, start_register + count))

    def generate_baseline(self):
        """Generate the baseline profile from collected data."""
        baseline = {
            "generated": datetime.now().isoformat(),
            "modbus_baselines": {},
            "communication_pairs": {},
        }

        for session_key, session in self.modbus_sessions.items():
            avg_interval = mean(session["intervals"]) if session["intervals"] else 0
            interval_std = stdev(session["intervals"]) if len(session["intervals"]) > 1 else 0

            baseline["modbus_baselines"][session_key] = {
                "allowed_function_codes": list(session["func_codes"].keys()),
                "function_code_distribution": {
                    MODBUS_FUNC_NAMES.get(k, f"FC{k}"): v
                    for k, v in session["func_codes"].items()
                },
                "polling_interval_avg_sec": round(avg_interval, 3),
                "polling_interval_stddev": round(interval_std, 3),
                "register_ranges": [
                    {"func_code": r[0], "start": r[1], "end": r[2]}
                    for r in session["register_ranges"]
                ],
                "total_requests": session["request_count"],
            }

        return baseline

    def export_baseline(self, output_file):
        """Export baseline to JSON file."""
        baseline = self.generate_baseline()
        with open(output_file, "w") as f:
            json.dump(baseline, f, indent=2)
        print(f"[*] Baseline saved to: {output_file}")

        # Print summary
        print(f"\n{'='*60}")
        print("SCADA COMMUNICATION BASELINE SUMMARY")
        print(f"{'='*60}")
        for session, data in baseline["modbus_baselines"].items():
            print(f"\n  Session: {session}")
            print(f"    Function Codes: {data['allowed_function_codes']}")
            print(f"    Polling Interval: {data['polling_interval_avg_sec']}s (+/- {data['polling_interval_stddev']}s)")
            print(f"    Register Ranges: {len(data['register_ranges'])}")
            print(f"    Total Requests: {data['total_requests']}")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python scada_baseline.py <pcap_file> [output.json]")
        sys.exit(1)

    builder = SCADABaselineBuilder()
    builder.process_pcap(sys.argv[1])
    output = sys.argv[2] if len(sys.argv) > 2 else "scada_baseline.json"
    builder.export_baseline(output)
```

### Step 2: Deploy OT-Specific Detection Rules

Create detection rules for known SCADA attack patterns including those used by TRITON, Industroyer/CrashOverride, and PIPEDREAM/INCONTROLLER.

```yaml
# Suricata Rules for SCADA Attack Detection
# Deploy on IDS sensor monitoring OT network SPAN port

# --- Modbus Attack Detection ---

# Unauthorized Modbus write to PLC from non-engineering workstation
alert modbus any any -> $OT_PLC_SUBNET 502 (
  msg:"OT-D

Related in Security