detecting-attacks-on-scada-systems
This skill covers detecting cyber attacks targeting Supervisory Control and Data Acquisition (SCADA) systems including man-in-the-middle attacks on industrial protocols, unauthorized command injection into PLCs, HMI compromise, historian data manipulation, and denial-of-service against control system communications. It leverages OT-specific intrusion detection systems, industrial protocol anomaly detection, and process data analytics to identify attacks that traditional IT security tools miss.
What this skill does
# Detecting Attacks on SCADA Systems
## When to Use
- When deploying intrusion detection capabilities in a SCADA environment for the first time
- When investigating suspected cyber attacks against industrial control systems
- When building detection rules for OT-specific attack patterns (Stuxnet, TRITON, Industroyer)
- When integrating OT network monitoring with an enterprise SOC for unified threat visibility
- When responding to alerts from OT security monitoring tools (Dragos, Nozomi, Claroty)
**Do not use** for detecting attacks on IT-only networks without SCADA/ICS components, for building generic network IDS rules (see building-detection-rules-with-sigma), or for incident response procedures after an attack is confirmed (see performing-ot-incident-response).
## Prerequisites
- Passive network monitoring sensors deployed on SPAN/TAP ports at OT network boundaries
- OT intrusion detection system (Dragos Platform, Nozomi Guardian, Claroty xDome, or Suricata with OT rulesets)
- Understanding of industrial protocols in use (Modbus, DNP3, OPC UA, EtherNet/IP, S7comm)
- Baseline of normal SCADA communication patterns (polling intervals, function codes, register ranges)
- Access to process historian data for physical process anomaly correlation
## Workflow
### Step 1: Establish SCADA Communication Baselines
Before detecting anomalies, establish what normal SCADA traffic looks like. Industrial protocols are highly deterministic - the same master polls the same slaves at the same intervals reading the same registers.
```python
#!/usr/bin/env python3
"""SCADA Communication Baseline Builder.
Analyzes OT network traffic to establish deterministic baselines for
Modbus/TCP, DNP3, EtherNet/IP, and S7comm communications.
"""
import json
import sys
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev
try:
from scapy.all import rdpcap, IP, TCP, UDP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
MODBUS_FUNC_NAMES = {
1: "Read Coils", 2: "Read Discrete Inputs",
3: "Read Holding Registers", 4: "Read Input Registers",
5: "Write Single Coil", 6: "Write Single Register",
8: "Diagnostics", 15: "Write Multiple Coils",
16: "Write Multiple Registers", 17: "Report Slave ID",
22: "Mask Write Register", 23: "Read/Write Multiple Registers",
43: "Encapsulated Interface Transport",
}
class SCADABaselineBuilder:
"""Builds deterministic baselines from SCADA traffic captures."""
def __init__(self):
self.modbus_sessions = defaultdict(lambda: {
"func_codes": defaultdict(int),
"register_ranges": set(),
"intervals": [],
"last_seen": None,
"request_count": 0,
})
self.communication_pairs = defaultdict(lambda: {
"protocols": set(),
"packet_count": 0,
"first_seen": None,
"last_seen": None,
})
def process_pcap(self, pcap_file):
"""Process pcap file to build SCADA baselines."""
packets = rdpcap(pcap_file)
print(f"[*] Processing {len(packets)} packets for baseline...")
for pkt in packets:
if not pkt.haslayer(IP):
continue
src = pkt[IP].src
dst = pkt[IP].dst
ts = float(pkt.time)
# Track communication pairs
pair_key = f"{src}->{dst}"
pair = self.communication_pairs[pair_key]
pair["packet_count"] += 1
if pair["first_seen"] is None:
pair["first_seen"] = ts
pair["last_seen"] = ts
# Analyze Modbus/TCP
if pkt.haslayer(TCP) and pkt[TCP].dport == 502:
self._analyze_modbus(pkt, src, dst, ts)
def _analyze_modbus(self, pkt, src, dst, timestamp):
"""Extract Modbus function codes and register ranges."""
payload = bytes(pkt[TCP].payload)
if len(payload) < 8:
return
# MBAP header: transaction_id(2) + protocol_id(2) + length(2) + unit_id(1) + func_code(1)
func_code = payload[7]
session_key = f"{src}->{dst}"
session = self.modbus_sessions[session_key]
session["func_codes"][func_code] += 1
session["request_count"] += 1
session["protocols"] = {"Modbus/TCP"}
# Track polling intervals
if session["last_seen"] is not None:
interval = timestamp - session["last_seen"]
if 0.01 < interval < 60: # Reasonable polling interval
session["intervals"].append(interval)
session["last_seen"] = timestamp
# Extract register range for read/write operations
if len(payload) >= 12 and func_code in (1, 2, 3, 4, 5, 6, 15, 16):
start_register = (payload[8] << 8) | payload[9]
if func_code in (1, 2, 3, 4, 15, 16) and len(payload) >= 12:
count = (payload[10] << 8) | payload[11]
session["register_ranges"].add((func_code, start_register, start_register + count))
def generate_baseline(self):
"""Generate the baseline profile from collected data."""
baseline = {
"generated": datetime.now().isoformat(),
"modbus_baselines": {},
"communication_pairs": {},
}
for session_key, session in self.modbus_sessions.items():
avg_interval = mean(session["intervals"]) if session["intervals"] else 0
interval_std = stdev(session["intervals"]) if len(session["intervals"]) > 1 else 0
baseline["modbus_baselines"][session_key] = {
"allowed_function_codes": list(session["func_codes"].keys()),
"function_code_distribution": {
MODBUS_FUNC_NAMES.get(k, f"FC{k}"): v
for k, v in session["func_codes"].items()
},
"polling_interval_avg_sec": round(avg_interval, 3),
"polling_interval_stddev": round(interval_std, 3),
"register_ranges": [
{"func_code": r[0], "start": r[1], "end": r[2]}
for r in session["register_ranges"]
],
"total_requests": session["request_count"],
}
return baseline
def export_baseline(self, output_file):
"""Export baseline to JSON file."""
baseline = self.generate_baseline()
with open(output_file, "w") as f:
json.dump(baseline, f, indent=2)
print(f"[*] Baseline saved to: {output_file}")
# Print summary
print(f"\n{'='*60}")
print("SCADA COMMUNICATION BASELINE SUMMARY")
print(f"{'='*60}")
for session, data in baseline["modbus_baselines"].items():
print(f"\n Session: {session}")
print(f" Function Codes: {data['allowed_function_codes']}")
print(f" Polling Interval: {data['polling_interval_avg_sec']}s (+/- {data['polling_interval_stddev']}s)")
print(f" Register Ranges: {len(data['register_ranges'])}")
print(f" Total Requests: {data['total_requests']}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python scada_baseline.py <pcap_file> [output.json]")
sys.exit(1)
builder = SCADABaselineBuilder()
builder.process_pcap(sys.argv[1])
output = sys.argv[2] if len(sys.argv) > 2 else "scada_baseline.json"
builder.export_baseline(output)
```
### Step 2: Deploy OT-Specific Detection Rules
Create detection rules for known SCADA attack patterns including those used by TRITON, Industroyer/CrashOverride, and PIPEDREAM/INCONTROLLER.
```yaml
# Suricata Rules for SCADA Attack Detection
# Deploy on IDS sensor monitoring OT network SPAN port
# --- Modbus Attack Detection ---
# Unauthorized Modbus write to PLC from non-engineering workstation
alert modbus any any -> $OT_PLC_SUBNET 502 (
msg:"OT-DRelated in Security
mac-ops
IncludedComprehensive macOS workstation operations — diagnose kernel panics, identify failing drives, audit launchd startup items, decode wake reasons, triage TCC permission denials, manage APFS snapshots, recover from no-boot. Use for: Mac is slow, slow bootup, won't boot, kernel panic, kernel_task hot, mds_stores CPU, photoanalysisd, cloudd, login loop, gray screen, sleep wake failure, drive failing, IO errors, APFS snapshots eating space, Time Machine local snapshots, Spotlight indexing, launchd, LaunchAgent, LaunchDaemon, login items, TCC permissions, Full Disk Access, Screen Recording denied, Gatekeeper, quarantine, com.apple.quarantine, app is damaged, helper tool, /Library/PrivilegedHelperTools, pmset, wake reasons, dark wake, sysdiagnose, panic.ips, DiagnosticReports, configuration profile, MDM profile, remote diagnostics over SSH.
a11y-audit
IncludedRun accessibility audits on web projects combining automated scanning (axe-core, Lighthouse) with WCAG 2.1 AA compliance mapping, manual check guidance, and structured reporting. Output is configurable: markdown report only, markdown plus machine-readable JSON, or markdown plus issue tracker integration. Use this skill whenever the user mentions "accessibility audit", "a11y audit", "WCAG audit", "accessibility check", "compliance scan", or asks to check a web project for accessibility issues. Also trigger when the user wants to verify WCAG conformance or map findings to a specific standard (CAN-ASC-6.2, EN 301 549, ADA/AODA).
erpclaw
IncludedAI-native ERP system with self-extending OS. Full accounting, invoicing, inventory, purchasing, tax, billing, HR, payroll, advanced accounting (ASC 606/842, intercompany, consolidation), and financial reporting. 413 actions across 14 domains, 43 expansion modules. Constitutional guardrails, adversarial audit, schema migration. Double-entry GL, immutable audit trail, US GAAP.
assess
IncludedAssesses and rates quality 0-10 across multiple dimensions (correctness, maintainability, security, performance, testability, simplicity) with pros/cons analysis. Compares against project conventions and prior decisions from memory. Produces structured evaluation reports with actionable improvement suggestions. Use when evaluating code, designs, architectures, or comparing alternative approaches.
spring-boot-security-jwt
IncludedProvides JWT authentication and authorization patterns for Spring Boot 3.5.x covering token generation with JJWT, Bearer/cookie authentication, database/OAuth2 integration, and RBAC/permission-based access control using Spring Security 6.x. Use when implementing authentication or authorization in Spring Boot applications.
code-hardcode-audit
IncludedDetect hardcoded values, magic numbers, and leaked secrets. TRIGGERS - hardcode audit, magic numbers, PLR2004, secret scanning.