detecting-dnp3-protocol-anomalies
Detect anomalies in DNP3 (Distributed Network Protocol 3) communications used in SCADA systems by monitoring for unauthorized control commands, firmware update attempts, protocol violations, and deviations from baseline traffic patterns using deep packet inspection and machine learning approaches.
# Detecting DNP3 Protocol Anomalies
## When to Use
- When monitoring SCADA systems in the energy sector where DNP3 is the primary protocol
- When building detection rules for DNP3-based attacks against RTUs and substations
- When investigating suspected unauthorized control commands sent via DNP3
- When deploying IDS with DNP3 deep packet inspection at utility substations
- When responding to alerts from OT monitoring platforms about DNP3 traffic anomalies
**Do not use** for non-DNP3 protocol monitoring (see detecting-modbus-command-injection-attacks for Modbus), for DNP3 Secure Authentication configuration (separate implementation), or for protocol-agnostic network anomaly detection.
## Prerequisites
- Network TAP/SPAN on DNP3 communication segments (TCP port 20000 or serial)
- Baseline of normal DNP3 traffic patterns (masters, outstations, poll intervals, function codes)
- Suricata or Zeek with DNP3 protocol parser enabled
- Understanding of DNP3 function codes and object groups used in the environment
- DNP3 communication topology map (master-to-outstation relationships)
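
The detector in Step 1 reads its baseline through `load_baseline`, which expects a JSON document with an `authorized_communications` list whose entries carry `master_ip`, `outstation_ip`, and optionally `expected_function_codes`. A minimal sketch of building such a document follows. The helper `build_baseline`, the IP addresses, and the function-code sets are hypothetical placeholders chosen for illustration, not recommended values.

```python
import json


def build_baseline(pairs):
    """Build a baseline dict from (master_ip, outstation_ip, function_codes) tuples."""
    return {
        "authorized_communications": [
            {
                "master_ip": master,
                "outstation_ip": outstation,
                # JSON has no hex literals, so codes are stored as plain integers.
                "expected_function_codes": sorted(codes),
            }
            for master, outstation, codes in pairs
        ]
    }


# Hypothetical topology: 0 = Confirm, 1 = Read, 3 = Select, 4 = Operate.
baseline = build_baseline([
    ("10.0.0.5", "10.0.1.20", {0, 1}),        # polling only
    ("10.0.0.5", "10.0.1.21", {0, 1, 3, 4}),  # polling plus select-before-operate
])

# Serialize; writing this string to a file gives something load_baseline can read.
baseline_json = json.dumps(baseline, indent=2)
```

Keeping each master-to-outstation pair as its own entry lets the baseline record which function codes are expected on that specific link, so a control command on a polling-only link stands out.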
## Workflow
### Step 1: Analyze DNP3 Traffic for Anomalies
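
As orientation for the byte offsets the detector relies on, here is a minimal sketch that decodes one hand-built DNP3 frame. The function `decode_link_header` and the sample frame are illustrative assumptions. The CRC bytes in the sample are zeroed placeholders, so the frame would not pass real CRC validation, and the sketch does not verify CRCs or reassemble multi-block frames.

```python
import struct


def decode_link_header(frame: bytes) -> dict:
    """Decode the 10-byte DNP3 link header and, if present, the function code."""
    if len(frame) < 10 or frame[0:2] != b"\x05\x64":
        raise ValueError("not a DNP3 link-layer frame")
    length, control = frame[2], frame[3]
    dest, src = struct.unpack("<HH", frame[4:8])  # addresses are little-endian
    info = {
        "length": length,                     # control + addresses + user data, CRCs excluded
        "from_master": bool(control & 0x80),  # DIR bit
        "dest": dest,
        "src": src,
    }
    # Offsets 8-9 hold the header CRC (not verified here). User data starts at
    # offset 10: transport header, application control, then the function code.
    if len(frame) >= 13:
        info["function_code"] = frame[12]
    return info


# Hypothetical frame: master address 1 -> outstation address 10, Read (0x01)
# of Class 1 data (group 60, variation 2, qualifier 0x06).
sample = bytes([
    0x05, 0x64,              # start bytes
    0x0B,                    # length: 5 header bytes + 6 user-data bytes
    0xC4,                    # control: DIR=1, PRM=1, unconfirmed user data
    0x0A, 0x00,              # destination address 10
    0x01, 0x00,              # source address 1
    0x00, 0x00,              # header CRC (placeholder)
    0xC0,                    # transport header: FIR=1, FIN=1
    0xC0,                    # application control
    0x01,                    # function code: Read
    0x3C, 0x02, 0x06,        # object header
    0x00, 0x00,              # data block CRC (placeholder)
])
decoded = decode_link_header(sample)
```

The function code sits at offset 12 only in the first data block of a frame that carries application data; link-layer-only frames have a length field of 5 and no function code.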
```python
#!/usr/bin/env python3
"""DNP3 Protocol Anomaly Detector.

Monitors DNP3 communications for unauthorized control commands,
protocol violations, and deviations from established baselines.
Supports both TCP and serial DNP3 deployments.
"""
import struct
import sys
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Set

try:
    from scapy.all import rdpcap, IP, TCP
except ImportError:
    print("Install scapy: pip install scapy")
    sys.exit(1)

# DNP3 Function Codes
DNP3_FUNCTIONS = {
    0x00: "Confirm", 0x01: "Read", 0x02: "Write",
    0x03: "Select", 0x04: "Operate", 0x05: "Direct Operate",
    0x06: "Direct Operate No Ack", 0x07: "Immediate Freeze",
    0x08: "Immediate Freeze No Ack", 0x09: "Freeze and Clear",
    0x0A: "Freeze and Clear No Ack", 0x0B: "Freeze at Time",
    0x0C: "Freeze at Time No Ack", 0x0D: "Cold Restart",
    0x0E: "Warm Restart", 0x0F: "Initialize Data",
    0x10: "Initialize Application", 0x11: "Start Application",
    0x12: "Stop Application", 0x13: "Save Configuration",
    0x14: "Enable Unsolicited", 0x15: "Disable Unsolicited",
    0x16: "Assign Class", 0x17: "Delay Measurement",
    0x18: "Record Current Time", 0x19: "Open File",
    0x1A: "Close File", 0x1B: "Delete File",
    0x1C: "Get File Info", 0x1D: "Authenticate File",
    0x1E: "Abort File", 0x81: "Response", 0x82: "Unsolicited Response",
}

# High-risk function codes that should trigger alerts
DNP3_CRITICAL_FUNCTIONS = {
    0x02,                    # Write
    0x03, 0x04, 0x05, 0x06,  # Select/Operate/Direct Operate
    0x0D,                    # Cold Restart
    0x0E,                    # Warm Restart
    0x0F,                    # Initialize Data
    0x10,                    # Initialize Application
    0x12,                    # Stop Application
    0x19, 0x1A, 0x1B,        # File operations (firmware update)
}


class DNP3AnomalyDetector:
    """Detects anomalies in DNP3 protocol communications."""

    def __init__(self, baseline_file: Optional[str] = None):
        self.alerts = []
        self.sessions = defaultdict(lambda: {
            "packet_count": 0,
            "function_codes": defaultdict(int),
            "control_commands": 0,
            "file_operations": 0,
            "restarts": 0,
        })
        self.packet_count = 0
        self.dnp3_count = 0
        self.authorized_masters: Set[str] = set()
        self.authorized_pairs: Dict[str, Set[str]] = defaultdict(set)
        self.baseline_functions: Dict[str, Set[int]] = defaultdict(set)
        if baseline_file:
            self.load_baseline(baseline_file)

    def load_baseline(self, filepath: str):
        """Load DNP3 communication baseline."""
        with open(filepath, "r") as f:
            baseline = json.load(f)
        for entry in baseline.get("authorized_communications", []):
            master = entry["master_ip"]
            outstation = entry["outstation_ip"]
            self.authorized_masters.add(master)
            self.authorized_pairs[master].add(outstation)
            self.baseline_functions[f"{master}->{outstation}"] = set(
                entry.get("expected_function_codes", [0x00, 0x01])
            )

    def parse_dnp3_header(self, payload: bytes) -> Optional[dict]:
        """Parse DNP3 data link layer and transport/application headers."""
        if len(payload) < 10:
            return None
        # DNP3 Data Link Layer: start(2) + length(1) + control(1) + dest(2) + source(2) + crc(2)
        start_bytes = struct.unpack(">H", payload[0:2])[0]
        if start_bytes != 0x0564:
            return None
        length = payload[2]
        control = payload[3]
        dest_addr = struct.unpack("<H", payload[4:6])[0]
        source_addr = struct.unpack("<H", payload[6:8])[0]
        direction = "Master->Outstation" if (control & 0x80) else "Outstation->Master"
        result = {
            "length": length,
            "control": control,
            "direction": direction,
            "dest_addr": dest_addr,
            "source_addr": source_addr,
            "is_master": bool(control & 0x80),
        }
        # Parse transport and application layer (after CRC bytes)
        if len(payload) >= 12:
            transport_header = payload[10]
            if len(payload) >= 13:
                app_control = payload[11]
                func_code = payload[12]
                result["function_code"] = func_code
                result["function_name"] = DNP3_FUNCTIONS.get(
                    func_code, f"Unknown (0x{func_code:02x})"
                )
        return result

    def analyze_packet(self, pkt):
        """Analyze a packet for DNP3 anomalies."""
        self.packet_count += 1
        if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
            return
        tcp = pkt[TCP]
        if tcp.dport != 20000 and tcp.sport != 20000:
            return
        payload = bytes(tcp.payload)
        if not payload:
            return
        dnp3 = self.parse_dnp3_header(payload)
        if not dnp3:
            return
        self.dnp3_count += 1
        src_ip = pkt[IP].src
        dst_ip = pkt[IP].dst
        session_key = f"{src_ip}->{dst_ip}"
        session = self.sessions[session_key]
        session["packet_count"] += 1
        func_code = dnp3.get("function_code")
        if func_code is not None:
            session["function_codes"][func_code] += 1

        # Detection 1: Unauthorized DNP3 master
        if dnp3.get("is_master") and self.authorized_masters:
            if src_ip not in self.authorized_masters:
                self.alerts.append({
                    "severity": "CRITICAL",
                    "type": "UNAUTHORIZED_DNP3_MASTER",
                    "src": src_ip, "dst": dst_ip,
                    "function": dnp3.get("function_name"),
                    "description": f"Unauthorized DNP3 master {src_ip} communicating with outstation {dst_ip}",
                    "mitre": "T0869 - Standard Application Layer Protocol",
                })

        # Detection 2: Cold/Warm restart command
        if func_code in (0x0D, 0x0E):
            session["restarts"] += 1
            restart_type = "Cold" if func_code == 0x0D else "Warm"
            self.alerts.append({
                "severity": "CRITICAL",
                "type": "DNP3_RESTART_COMMAND",
                "src": src_ip, "dst": dst_ip,
                "function": f"{restart_type} Restart",
                "description": f"{restart_type} restart command sent to outstation {dst_ip} (addr {dnp3['dest_addr']})",
                "mitre": "T0816 - Device Restart/Shutdown",
            })

        # Detection 3: File operations (potential firmware update)
        if func_code in (0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E):
            session["file_operations"] += 1
```