performing-s7comm-protocol-security-analysis
Perform security analysis of Siemens S7comm and S7CommPlus protocols used by SIMATIC S7 PLCs to identify vulnerabilities including replay attacks, integrity bypass, unauthorized CPU stop commands, and program download manipulation exploiting weaknesses in S7-300, S7-400, S7-1200, and S7-1500 controllers.
What this skill does
# Performing S7comm Protocol Security Analysis
## When to Use
- When assessing the security posture of Siemens SIMATIC S7 PLC environments
- When building detection rules for S7comm-based attacks against S7-300/400/1200/1500 controllers
- When performing a security audit of Siemens Step 7/TIA Portal communications
- When investigating suspected unauthorized access to Siemens PLC programs
- When evaluating S7CommPlus integrity mechanisms and their bypass potential
**Do not use** for scanning production Siemens PLCs without authorization and a test plan (this can crash controllers), for non-Siemens protocol analysis (see detecting-modbus-command-injection-attacks for Modbus), or for modifying PLC programs in a production environment.
## Prerequisites
- Network access to the S7comm communication segment (TCP port 102)
- Wireshark with S7comm dissector or Zeek with S7comm protocol analyzer
- Authorized access for security testing (never scan production PLCs without authorization)
- Knowledge of the Siemens PLC models and firmware versions in scope
- Understanding of S7comm protocol structure (COTP, S7 PDU, function codes)
## Workflow
### Step 1: Analyze S7comm Traffic and Identify Vulnerabilities
```python
#!/usr/bin/env python3
"""S7comm Protocol Security Analyzer.
Analyzes Siemens S7comm protocol traffic to identify security
vulnerabilities, unauthorized access patterns, and potential
attack indicators against SIMATIC S7 PLCs.
"""
import struct
import sys
import json
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
try:
from scapy.all import rdpcap, IP, TCP
except ImportError:
print("Install scapy: pip install scapy")
sys.exit(1)
# S7comm ROSCTR (PDU type) definitions
S7_ROSCTR = {
0x01: "Job (Request)",
0x02: "Ack",
0x03: "Ack_Data (Response)",
0x07: "Userdata",
}
# S7comm function codes
S7_FUNCTIONS = {
0x00: "CPU services",
0x04: "Read Variable",
0x05: "Write Variable",
0x1A: "Request Download (Program)",
0x1B: "Download Block",
0x1C: "Download Ended",
0x1D: "Start Upload (Read Program)",
0x1E: "Upload Block",
0x1F: "Upload Ended",
0x28: "PI Service (Start/Stop CPU)",
0x29: "PLC Stop",
0xF0: "Setup Communication",
}
# Critical security-relevant operations
CRITICAL_FUNCTIONS = {0x1A, 0x1B, 0x1C, 0x28, 0x29, 0x05}
PROGRAM_FUNCTIONS = {0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F}
class S7commSecurityFinding:
"""Represents a security finding in S7comm traffic."""
def __init__(self, severity: str, finding_type: str, src_ip: str,
dst_ip: str, function: str, description: str,
cve: str = "", recommendation: str = ""):
self.timestamp = datetime.now().isoformat()
self.severity = severity
self.finding_type = finding_type
self.src_ip = src_ip
self.dst_ip = dst_ip
self.function = function
self.description = description
self.cve = cve
self.recommendation = recommendation
class S7commAnalyzer:
"""Analyzes S7comm protocol traffic for security vulnerabilities."""
def __init__(self):
self.findings: List[S7commSecurityFinding] = []
self.sessions: Dict[str, dict] = defaultdict(lambda: {
"packets": 0,
"functions_seen": set(),
"writes": 0,
"program_downloads": 0,
"cpu_commands": 0,
"first_seen": None,
"last_seen": None,
})
self.authorized_engineering: set = set()
self.packet_count = 0
def set_authorized_stations(self, ips: List[str]):
"""Set list of authorized engineering workstation IPs."""
self.authorized_engineering = set(ips)
def parse_s7comm(self, payload: bytes) -> Optional[dict]:
"""Parse S7comm protocol data from TCP payload."""
# TPKT header: version(1) + reserved(1) + length(2)
if len(payload) < 4:
return None
tpkt_version = payload[0]
if tpkt_version != 3:
return None
tpkt_length = struct.unpack(">H", payload[2:4])[0]
# COTP header follows TPKT
if len(payload) < 7:
return None
cotp_length = payload[4]
cotp_type = payload[5]
# S7comm starts after COTP
s7_offset = 4 + 1 + cotp_length
if len(payload) < s7_offset + 10:
return None
# S7comm header
protocol_id = payload[s7_offset]
if protocol_id != 0x32: # S7comm magic byte
return None
rosctr = payload[s7_offset + 1]
redundancy = struct.unpack(">H", payload[s7_offset + 2:s7_offset + 4])[0]
pdu_ref = struct.unpack(">H", payload[s7_offset + 4:s7_offset + 6])[0]
param_length = struct.unpack(">H", payload[s7_offset + 6:s7_offset + 8])[0]
data_length = struct.unpack(">H", payload[s7_offset + 8:s7_offset + 10])[0]
result = {
"rosctr": rosctr,
"rosctr_name": S7_ROSCTR.get(rosctr, f"Unknown (0x{rosctr:02x})"),
"pdu_ref": pdu_ref,
"param_length": param_length,
"data_length": data_length,
}
# Parse function code from parameters
param_offset = s7_offset + 10
if rosctr in (0x01, 0x03) and param_length > 0 and len(payload) > param_offset:
func_code = payload[param_offset]
result["function_code"] = func_code
result["function_name"] = S7_FUNCTIONS.get(func_code, f"Unknown (0x{func_code:02x})")
return result
def analyze_packet(self, pkt):
"""Analyze a packet for S7comm security issues."""
self.packet_count += 1
if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
return
tcp = pkt[TCP]
if tcp.dport != 102 and tcp.sport != 102:
return
payload = bytes(tcp.payload)
if not payload:
return
s7 = self.parse_s7comm(payload)
if not s7:
return
src_ip = pkt[IP].src
dst_ip = pkt[IP].dst
session_key = f"{src_ip}->{dst_ip}"
session = self.sessions[session_key]
session["packets"] += 1
if session["first_seen"] is None:
session["first_seen"] = float(pkt.time)
session["last_seen"] = float(pkt.time)
func_code = s7.get("function_code")
if func_code is not None:
session["functions_seen"].add(func_code)
# Check 1: Unauthorized engineering station
if tcp.dport == 102 and func_code in CRITICAL_FUNCTIONS:
if self.authorized_engineering and src_ip not in self.authorized_engineering:
self.findings.append(S7commSecurityFinding(
severity="CRITICAL",
finding_type="UNAUTHORIZED_ENGINEERING_ACCESS",
src_ip=src_ip, dst_ip=dst_ip,
function=s7.get("function_name", "Unknown"),
description=(
f"Critical S7comm operation from unauthorized source {src_ip}. "
f"Function: {s7.get('function_name')}. Only authorized TIA Portal "
f"workstations should issue these commands."
),
recommendation="Block unauthorized sources at industrial firewall. Investigate source host for compromise.",
))
# Check 2: CPU Stop command
if func_code == 0x29:
session["cpu_commands"] += 1
self.findings.append(S7commSecurityFinding(
severity="CRITICAL",
finding_type="CPU_STOP_COMMAND",
src_ip=src_ip, dst_ip=dst_ip,
function="PLC CPU Stop (0x29)",
description=f"CPU STOP command sent to PLC at {dst_ip}. This halts PRelated in Security
mac-ops
IncludedComprehensive macOS workstation operations — diagnose kernel panics, identify failing drives, audit launchd startup items, decode wake reasons, triage TCC permission denials, manage APFS snapshots, recover from no-boot. Use for: Mac is slow, slow bootup, won't boot, kernel panic, kernel_task hot, mds_stores CPU, photoanalysisd, cloudd, login loop, gray screen, sleep wake failure, drive failing, IO errors, APFS snapshots eating space, Time Machine local snapshots, Spotlight indexing, launchd, LaunchAgent, LaunchDaemon, login items, TCC permissions, Full Disk Access, Screen Recording denied, Gatekeeper, quarantine, com.apple.quarantine, app is damaged, helper tool, /Library/PrivilegedHelperTools, pmset, wake reasons, dark wake, sysdiagnose, panic.ips, DiagnosticReports, configuration profile, MDM profile, remote diagnostics over SSH.
a11y-audit
IncludedRun accessibility audits on web projects combining automated scanning (axe-core, Lighthouse) with WCAG 2.1 AA compliance mapping, manual check guidance, and structured reporting. Output is configurable: markdown report only, markdown plus machine-readable JSON, or markdown plus issue tracker integration. Use this skill whenever the user mentions "accessibility audit", "a11y audit", "WCAG audit", "accessibility check", "compliance scan", or asks to check a web project for accessibility issues. Also trigger when the user wants to verify WCAG conformance or map findings to a specific standard (CAN-ASC-6.2, EN 301 549, ADA/AODA).
erpclaw
IncludedAI-native ERP system with self-extending OS. Full accounting, invoicing, inventory, purchasing, tax, billing, HR, payroll, advanced accounting (ASC 606/842, intercompany, consolidation), and financial reporting. 413 actions across 14 domains, 43 expansion modules. Constitutional guardrails, adversarial audit, schema migration. Double-entry GL, immutable audit trail, US GAAP.
assess
IncludedAssesses and rates quality 0-10 across multiple dimensions (correctness, maintainability, security, performance, testability, simplicity) with pros/cons analysis. Compares against project conventions and prior decisions from memory. Produces structured evaluation reports with actionable improvement suggestions. Use when evaluating code, designs, architectures, or comparing alternative approaches.
spring-boot-security-jwt
IncludedProvides JWT authentication and authorization patterns for Spring Boot 3.5.x covering token generation with JJWT, Bearer/cookie authentication, database/OAuth2 integration, and RBAC/permission-based access control using Spring Security 6.x. Use when implementing authentication or authorization in Spring Boot applications.
code-hardcode-audit
IncludedDetect hardcoded values, magic numbers, and leaked secrets. TRIGGERS - hardcode audit, magic numbers, PLR2004, secret scanning.