detecting-modbus-command-injection-attacks
Detect command injection attacks against Modbus TCP/RTU protocol in ICS environments by monitoring for unauthorized write operations, anomalous function codes, malformed frames, and deviations from established communication baselines using ICS-aware IDS and protocol deep packet inspection.
What this skill does
# Detecting Modbus Command Injection Attacks
## When to Use
- When deploying intrusion detection for environments using Modbus TCP (port 502) or Modbus RTU
- When investigating suspected unauthorized modifications to PLC registers or coils
- When building detection analytics for an OT SOC that monitors Modbus-heavy environments
- When responding to FrostyGoop-style attacks that leverage Modbus TCP for operational impact
- When performing baseline validation after a suspected compromise of a Modbus master
**Do not use** for detecting attacks on non-Modbus protocols (see detecting-dnp3-protocol-anomalies for DNP3), for general IT network intrusion detection, or for Modbus device configuration (see performing-ot-vulnerability-scanning-safely).
## Prerequisites
- Network SPAN/TAP on the segment carrying Modbus TCP traffic (typically port 502)
- Baseline of normal Modbus communication patterns (masters, slaves, function codes, register ranges, polling intervals)
- Suricata, Zeek, or commercial OT IDS deployed with Modbus protocol parsers enabled
- Understanding of Modbus function codes used in the environment (read vs write operations)
- Access to PLC programming documentation to validate expected register ranges
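To make the read/write distinction above concrete, the following minimal sketch builds one Modbus TCP request by hand and decodes its header. It assumes the standard framing: a 7-byte MBAP header (transaction ID, protocol ID, length, unit ID) followed by the function code and data. The helper names `build_request` and `describe`, and the unit ID, address, and value used here, are hypothetical and chosen only for illustration.

```python
import struct

# Function codes that modify coils or registers (same set the detector uses).
WRITE_FUNCTION_CODES = {5, 6, 15, 16}


def build_request(transaction_id, unit_id, func_code, address, value):
    """Build a Modbus TCP request whose PDU is: function code, two 16-bit fields."""
    pdu = struct.pack(">BHH", func_code, address, value)
    # The MBAP length field counts the unit ID byte plus the PDU bytes.
    mbap = struct.pack(">HHHB", transaction_id, 0, len(pdu) + 1, unit_id)
    return mbap + pdu


def describe(frame):
    """Decode the MBAP header and function code from the first 8 bytes."""
    transaction_id, protocol_id, length, unit_id, func_code = struct.unpack(
        ">HHHBB", frame[:8]
    )
    return {
        "transaction_id": transaction_id,
        "protocol_id": protocol_id,  # 0 for Modbus
        "length": length,
        "unit_id": unit_id,
        "func_code": func_code,
        "is_write": func_code in WRITE_FUNCTION_CODES,
    }


# Hypothetical request: Write Single Register (FC 6), address 0x0010, value 1234.
frame = build_request(1, 17, 6, 0x0010, 1234)
print(frame.hex())
print(describe(frame))
```

A read request (function codes 1 to 4) has the same shape, with the second 16-bit field holding the quantity to read instead of a value to write.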
## Workflow
### Step 1: Build Modbus Communication Baseline
Capture and analyze normal Modbus traffic to establish what constitutes legitimate communication patterns.
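As a rough illustration of what such a baseline could look like on disk, the sketch below aggregates already-parsed request records into the JSON structure that the detector's `load_baseline` method reads (`modbus_baselines`, `allowed_function_codes`, `register_ranges`, `polling_interval_avg_sec`). The `build_baseline` helper, the observation tuple layout, the example addresses, and the choice to treat `end` as inclusive are all assumptions made for this example, not part of the original tooling.

```python
import json
from collections import defaultdict


def build_baseline(observations):
    """Aggregate observed requests into a baseline dictionary.

    observations: iterable of
        (timestamp_sec, src_ip, dst_ip, func_code, start_address, quantity)
    """
    sessions = defaultdict(lambda: {"fcs": set(), "ranges": set(), "times": []})
    for ts, src, dst, func_code, start, quantity in observations:
        session = sessions[f"{src}->{dst}"]
        session["fcs"].add(func_code)
        # Assumption: "end" is the last address touched (inclusive).
        session["ranges"].add((start, start + quantity - 1))
        session["times"].append(ts)

    baselines = {}
    for key, session in sessions.items():
        times = sorted(session["times"])
        gaps = [later - earlier for earlier, later in zip(times, times[1:])]
        baselines[key] = {
            "allowed_function_codes": sorted(session["fcs"]),
            "register_ranges": [
                {"start": start, "end": end}
                for start, end in sorted(session["ranges"])
            ],
            "polling_interval_avg_sec": (
                round(sum(gaps) / len(gaps), 3) if gaps else None
            ),
        }
    return {"modbus_baselines": baselines}


# Hypothetical observations from a known-good capture window.
observed = [
    (0.0, "192.0.2.10", "192.0.2.20", 3, 100, 10),
    (1.0, "192.0.2.10", "192.0.2.20", 3, 100, 10),
    (2.0, "192.0.2.10", "192.0.2.20", 3, 100, 10),
    (2.5, "192.0.2.10", "192.0.2.20", 6, 200, 1),
]
baseline = build_baseline(observed)
print(json.dumps(baseline, indent=2))
```

Writing the result with `json.dump` produces a file that can be passed as `baseline_file`. A real baseline should come from a capture window long enough to include infrequent but legitimate operations, such as operator setpoint changes.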
```python
#!/usr/bin/env python3
"""Modbus Command Injection Detector.

Monitors Modbus TCP traffic for unauthorized write operations, anomalous
function codes, and deviations from established communication baselines.
Detects attacks like FrostyGoop that use Modbus TCP for operational impact.
"""

import json
import struct
import sys
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

try:
    from scapy.all import sniff, IP, TCP
except ImportError:
    print("Install scapy: pip install scapy")
    sys.exit(1)

# Modbus function code definitions
MODBUS_READ_FUNCTIONS = {1, 2, 3, 4}
# FC 22 (Mask Write Register) and FC 23 (Read/Write Multiple Registers)
# also modify registers, so they are treated as write operations.
MODBUS_WRITE_FUNCTIONS = {5, 6, 15, 16, 22, 23}
MODBUS_DIAGNOSTIC_FUNCTIONS = {8, 17, 43}

MODBUS_FUNC_NAMES = {
    1: "Read Coils", 2: "Read Discrete Inputs",
    3: "Read Holding Registers", 4: "Read Input Registers",
    5: "Write Single Coil", 6: "Write Single Register",
    8: "Diagnostics", 15: "Write Multiple Coils",
    16: "Write Multiple Registers", 17: "Report Slave ID",
    22: "Mask Write Register", 23: "Read/Write Multiple Registers",
    43: "Encapsulated Interface Transport",
}


class ModbusAlert:
    """Represents a detected Modbus anomaly."""

    def __init__(self, severity: str, alert_type: str, src_ip: str,
                 dst_ip: str, unit_id: int, func_code: int,
                 description: str, mitre_technique: str = ""):
        self.timestamp = datetime.now().isoformat()
        self.severity = severity
        self.alert_type = alert_type
        self.src_ip = src_ip
        self.dst_ip = dst_ip
        self.unit_id = unit_id
        self.func_code = func_code
        self.func_name = MODBUS_FUNC_NAMES.get(func_code, f"Unknown FC {func_code}")
        self.description = description
        self.mitre_technique = mitre_technique

    def __str__(self):
        return (
            f"[{self.severity}] {self.alert_type} | {self.src_ip} -> {self.dst_ip} "
            f"| Unit {self.unit_id} | {self.func_name} | {self.description}"
        )


class ModbusInjectionDetector:
    """Detects Modbus command injection attacks."""

    def __init__(self, baseline_file: Optional[str] = None):
        self.alerts: List[ModbusAlert] = []
        self.packet_count = 0
        self.modbus_count = 0

        # Baseline data
        self.authorized_masters: Set[str] = set()
        self.authorized_pairs: Set[Tuple[str, str]] = set()
        self.allowed_write_sources: Set[str] = set()
        self.allowed_function_codes: Dict[str, Set[int]] = defaultdict(set)
        self.allowed_register_ranges: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
        self.polling_intervals: Dict[str, float] = {}
        self.last_seen: Dict[str, float] = {}

        # Counters for rate detection
        self.write_counts: Dict[str, List[float]] = defaultdict(list)

        if baseline_file:
            self.load_baseline(baseline_file)

    def load_baseline(self, filepath: str):
        """Load established Modbus communication baseline."""
        with open(filepath, "r") as f:
            baseline = json.load(f)

        for raw_key, data in baseline.get("modbus_baselines", {}).items():
            src, dst = (part.strip() for part in raw_key.split("->"))
            # Normalize the key so it matches the "src->dst" form built in
            # analyze_packet even if the baseline file has spaces around "->".
            session_key = f"{src}->{dst}"
            self.authorized_pairs.add((src, dst))
            self.authorized_masters.add(src)

            fc_set = set(data.get("allowed_function_codes", []))
            self.allowed_function_codes[session_key] = fc_set
            if fc_set & MODBUS_WRITE_FUNCTIONS:
                self.allowed_write_sources.add(src)

            for reg_range in data.get("register_ranges", []):
                self.allowed_register_ranges[session_key].append(
                    (reg_range["start"], reg_range["end"])
                )

            if data.get("polling_interval_avg_sec"):
                self.polling_intervals[session_key] = data["polling_interval_avg_sec"]

        print(f"[*] Baseline loaded: {len(self.authorized_pairs)} authorized pairs, "
              f"{len(self.allowed_write_sources)} authorized write sources")

    def parse_modbus_mbap(self, payload: bytes) -> Optional[dict]:
        """Parse Modbus TCP MBAP header and PDU."""
        # 7-byte MBAP header plus at least the 1-byte function code
        if len(payload) < 8:
            return None

        transaction_id = struct.unpack(">H", payload[0:2])[0]
        protocol_id = struct.unpack(">H", payload[2:4])[0]
        length = struct.unpack(">H", payload[4:6])[0]
        unit_id = payload[6]
        func_code = payload[7]

        if protocol_id != 0:  # Not Modbus
            return None

        result = {
            "transaction_id": transaction_id,
            "protocol_id": protocol_id,
            "length": length,
            "unit_id": unit_id,
            "func_code": func_code,
        }

        # Parse the address and the 16-bit field that follows it. For
        # FC 1-4, 15, and 16 that field is a quantity; for FC 5 and 6 it is
        # the value being written to a single coil or register.
        if len(payload) >= 12 and func_code in (1, 2, 3, 4, 5, 6, 15, 16):
            result["start_address"] = struct.unpack(">H", payload[8:10])[0]
            field = struct.unpack(">H", payload[10:12])[0]
            if func_code in (5, 6):
                result["quantity"] = 1
                result["value"] = field
            else:
                result["quantity"] = field

        return result

    def analyze_packet(self, pkt):
        """Analyze a network packet for Modbus command injection."""
        self.packet_count += 1

        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
            return

        tcp = pkt[TCP]
        if tcp.dport != 502 and tcp.sport != 502:
            return

        payload = bytes(tcp.payload)
        if not payload:
            return

        modbus = self.parse_modbus_mbap(payload)
        if not modbus:
            return

        self.modbus_count += 1
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst
        session_key = f"{src_ip}->{dst_ip}"
        now = time.time()

        # Detection Rule 1: Unauthorized Modbus master
        if self.authorized_masters and src_ip not in self.authorized_masters:
            if tcp.dport == 502:
                self.alerts.append(ModbusAlert(
                    severity="CRITICAL",
                    alert_type="UNAUTHORIZED_MASTER",
                    src_ip=src_ip, dst_ip=dst_ip,
                    unit_id=modbus["unit_id"],
                    func_code=modbus["func_code"],
                    description=f"Unauthorized device {src_ip} sending Modbus commands to {dst_ip}",
                    mitre_technique="T0843 - Program Download",
                ))

        # Detection Rule 2: Unauthorized write operation
        if modbus["func_code"] in MODBUS_WRITE_FUNCTIONS:
            if self.allowed_write_sources and src_ip not in self.allowed_write_s
```