
securing-historian-server-in-ot-environment


This skill covers hardening process historian servers (OSIsoft PI, Honeywell PHD, GE Proficy, AVEVA Historian) in OT environments. It addresses network placement across Purdue levels, access control for historian interfaces, data replication through a DMZ using data diodes or PI-to-PI connectors, SQL injection prevention in historian queries, and integrity protection of process data used for safety analysis, regulatory reporting, and process optimization.

Category: Backend & APIs

Tags: ot-security, ics, scada, industrial-control, iec62443, historian, osisoft-pi, data-integrity, scripts


# Securing Historian Server in OT Environment

## When to Use

- When deploying a new historian server in an OT environment and configuring it securely from the start
- When hardening an existing historian after a security assessment identified it as a high-risk target
- When designing historian data replication architecture through a DMZ for IT access to process data
- When implementing access controls to prevent unauthorized modification of historical process data
- When investigating suspected historian compromise or data integrity issues

**Do not use** for IT-only database security without OT data (see general database hardening), for real-time SCADA data transmission security (see detecting-attacks-on-scada-systems), or for historian selection and sizing decisions.

## Prerequisites

- Historian platform (OSIsoft PI, Honeywell PHD, GE Proficy, AVEVA Historian) installed and operational
- Network segmentation in place, with the historian in Level 3 (Site Operations) of the Purdue Model
- Understanding of data flows: field devices -> PLCs -> OPC servers -> historian
- Access to historian administration credentials
- DMZ infrastructure for IT-facing data replication
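
The Level 3 placement prerequisite can be spot-checked before starting the audit. The following is a minimal sketch, assuming a hypothetical zone-to-subnet map (`PURDUE_ZONES`) and hypothetical helpers (`purdue_zone`, `is_level3`). None of these names or address ranges come from a vendor tool, so substitute the ranges from your own network design.

```python
import ipaddress

# Hypothetical zone-to-subnet map; replace with the ranges from your site design.
PURDUE_ZONES = {
    "Level 2 (Area Supervisory)": ipaddress.ip_network("10.20.0.0/16"),
    "Level 3 (Site Operations)": ipaddress.ip_network("10.30.0.0/16"),
    "Level 3.5 (DMZ)": ipaddress.ip_network("10.35.0.0/16"),
    "Level 4 (Enterprise)": ipaddress.ip_network("10.40.0.0/16"),
}


def purdue_zone(ip):
    """Return the name of the zone containing ip, or None if it is unmapped."""
    addr = ipaddress.ip_address(ip)
    for zone, network in PURDUE_ZONES.items():
        if addr in network:
            return zone
    return None


def is_level3(ip):
    """True when the address falls inside the assumed Level 3 subnet."""
    return purdue_zone(ip) == "Level 3 (Site Operations)"


if __name__ == "__main__":
    # Example addresses are illustrative only.
    for host in ("10.30.5.10", "10.40.1.7"):
        print(host, "->", purdue_zone(host), "| Level 3:", is_level3(host))
```

An address that maps to no zone returns `None`, which is worth treating as a finding in its own right: a historian on an undocumented subnet usually means the segmentation design and the deployed network have drifted apart.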

## Workflow

### Step 1: Audit Current Historian Security Configuration

Evaluate the current security posture of the historian server including network exposure, authentication, and access controls.

```python
#!/usr/bin/env python3
"""Historian Security Audit Tool.

Evaluates the security configuration of process historian servers
including network exposure, authentication, access controls,
and data integrity protections.
"""

import json
import socket
import ssl
import subprocess
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime


@dataclass
class AuditFinding:
    finding_id: str
    severity: str
    category: str
    title: str
    detail: str
    remediation: str


class HistorianSecurityAudit:
    """Security audit for OT historian servers."""

    def __init__(self, historian_ip, historian_type="PI"):
        self.ip = historian_ip
        self.type = historian_type
        self.findings = []
        self.counter = 1

    def check_network_exposure(self):
        """Check which network services are exposed by the historian."""
        print(f"[*] Checking network exposure: {self.ip}")

        # Common historian ports
        ports_to_check = {
            5450: ("PI Data Archive", "PI SDK/API connections"),
            5457: ("PI AF Server", "PI Asset Framework"),
            5459: ("PI Notifications", "PI Notification Service"),
            443: ("HTTPS", "PI Vision / Web API"),
            80: ("HTTP", "Unsecured web interface"),
            1433: ("MS SQL Server", "Direct database access"),
            5432: ("PostgreSQL", "Direct database access"),
            3389: ("RDP", "Remote Desktop"),
            135: ("RPC", "Windows RPC"),
            445: ("SMB", "Windows File Sharing"),
            8080: ("HTTP Alt", "Alternative web interface"),
        }

        exposed = []
        for port, (service, desc) in ports_to_check.items():
            try:
                # The context manager closes the socket even if connect_ex raises
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(3)
                    if sock.connect_ex((self.ip, port)) == 0:
                        exposed.append({"port": port, "service": service, "description": desc})
            except OSError:
                # Treat socket and name-resolution errors as "not reachable"
                continue

        # Flag unnecessary exposed services. Port 80 is reported separately
        # below as an encryption finding, so it is not repeated here.
        for svc in exposed:
            if svc["port"] in (135, 445, 3389):
                self.findings.append(AuditFinding(
                    finding_id=f"HIST-{self.counter:03d}",
                    severity="high",
                    category="Network Exposure",
                    title=f"Unnecessary service exposed: {svc['service']} (port {svc['port']})",
                    detail=f"Port {svc['port']} ({svc['description']}) is accessible on historian",
                    remediation=f"Disable {svc['service']} or restrict via host firewall",
                ))
                self.counter += 1

        if any(s["port"] == 80 for s in exposed):
            self.findings.append(AuditFinding(
                finding_id=f"HIST-{self.counter:03d}",
                severity="high",
                category="Encryption",
                title="Historian web interface on unencrypted HTTP",
                detail="Port 80 (HTTP) is open, exposing credentials and data in cleartext",
                remediation="Redirect HTTP to HTTPS; disable port 80",
            ))
            self.counter += 1

        return exposed

    def check_authentication(self):
        """Record authentication items to verify on the historian.

        This method does not query the historian for these settings; each
        item is logged as a finding for manual review.
        """
        print("[*] Checking authentication configuration")

        # PI Trust is a legacy mechanism that authenticates by IP address
        # without credentials, so it is listed first.
        checks = [
            {
                "check": "PI Trust Authentication",
                "risk": "PI Trust allows connections based on IP address alone without credentials",
                "severity": "critical",
                "remediation": "Migrate all PI Trust connections to Windows Integrated Security",
            },
            {
                "check": "Default piadmin account",
                "risk": "Default PI administrator account may have default or weak password",
                "severity": "critical",
                "remediation": "Disable piadmin; use named Windows accounts with PI mappings",
            },
            {
                "check": "PI SDK anonymous access",
                "risk": "Anonymous PI SDK connections may be permitted",
                "severity": "high",
                "remediation": "Require authentication for all PI SDK connections",
            },
        ]

        for check in checks:
            self.findings.append(AuditFinding(
                finding_id=f"HIST-{self.counter:03d}",
                severity=check["severity"],
                category="Authentication",
                title=f"Check: {check['check']}",
                detail=check["risk"],
                remediation=check["remediation"],
            ))
            self.counter += 1

    def check_data_integrity(self):
        """Check data integrity protections."""
        print("[*] Checking data integrity protections")

        integrity_checks = [
            AuditFinding(
                finding_id=f"HIST-{self.counter:03d}",
                severity="high",
                category="Data Integrity",
                title="Verify historical data modification audit trail",
                detail="Modifications to historical process data should be logged with before/after values",
                remediation="Enable PI audit trail for all data modifications; restrict edit permissions",
            ),
            AuditFinding(
                finding_id=f"HIST-{self.counter + 1:03d}",
                severity="medium",
                category="Data Integrity",
                title="Verify backup integrity and recovery testing",
                detail="Historian backups should be tested regularly for recovery capability",
                remediation="Implement automated backup verification with quarterly recovery testing",
            ),
        ]
        self.findings.extend(integrity_checks)
        self.counter += len(integrity_checks)

    def generate_report(self):
        """Generate historian security audit report."""
        report = []
        report.append("=" * 70)
        report.append("HISTORIAN SECURITY AUDIT REPORT")
        report.append(f"Target: {self.ip} ({self.type})")
        report.append(f"Date: {datetime.now().isoformat()}")
        report.append("=" * 70)

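        for sev in ["critical", "high", "medium", "low"]:
            findings = [f for f in self.findings if f.severity == sev]
            if findings:
                # The source breaks off at this point. The remaining lines are
                # an assumed minimal completion that lists findings by severity.
                report.append(f"\n--- {sev.upper()} ({len(findings)}) ---")
                for f in findings:
                    report.append(f"  [{f.finding_id}] {f.title}")
                    report.append(f"    Detail: {f.detail}")
                    report.append(f"    Remediation: {f.remediation}")

        return "\n".join(report)
```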
