Claude
Skills
Sign in
Back

performing-ioc-enrichment-automation

Included with Lifetime
$97 forever

Automates Indicator of Compromise (IOC) enrichment by orchestrating lookups across VirusTotal, AbuseIPDB, Shodan, MISP, and other intelligence sources to provide contextual scoring and disposition recommendations. Use when SOC analysts need rapid multi-source enrichment of IPs, domains, URLs, and file hashes during alert triage or incident investigation.

Generalsociocenrichmentautomationvirustotalabuseipdbshodanthreat-intelligencescripts

What this skill does

# Performing IOC Enrichment Automation

## When to Use

Use this skill when:
- SOC analysts need to quickly enrich IOCs from multiple sources during alert triage
- High alert volumes require automated enrichment to reduce manual lookup time
- Incident investigations need comprehensive IOC context for scope assessment
- SOAR playbooks require enrichment actions as part of automated triage workflows

**Do not use** for bulk blocking decisions without analyst review — enrichment provides context, not definitive malicious/benign determination.

## Prerequisites

- API keys: VirusTotal (free or premium), AbuseIPDB, Shodan, URLScan.io, GreyNoise
- Python 3.8+ with `requests`, `vt-py`, `shodan` libraries
- MISP instance or TIP for cross-referencing organizational intelligence
- SOAR platform (optional) for workflow integration
- Rate limit awareness: VT free (4 req/min), AbuseIPDB (1000/day), Shodan (1 req/sec)

## Workflow

### Step 1: Build Unified Enrichment Engine

Create a multi-source enrichment pipeline:

```python
import requests
import vt
import shodan
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class EnrichmentResult:
    ioc_value: str
    ioc_type: str
    virustotal: dict = field(default_factory=dict)
    abuseipdb: dict = field(default_factory=dict)
    shodan_data: dict = field(default_factory=dict)
    greynoise: dict = field(default_factory=dict)
    urlscan: dict = field(default_factory=dict)
    misp_matches: list = field(default_factory=list)
    risk_score: float = 0.0
    disposition: str = "Unknown"

class IOCEnrichmentEngine:
    def __init__(self, config):
        self.vt_client = vt.Client(config["virustotal_key"])
        self.shodan_api = shodan.Shodan(config["shodan_key"])
        self.abuseipdb_key = config["abuseipdb_key"]
        self.greynoise_key = config["greynoise_key"]
        self.urlscan_key = config["urlscan_key"]

    def enrich_ip(self, ip_address):
        result = EnrichmentResult(ioc_value=ip_address, ioc_type="ip")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/ip_addresses/{ip_address}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "total_engines": sum(vt_obj.last_analysis_stats.values()),
                "reputation": vt_obj.reputation,
                "country": getattr(vt_obj, "country", "Unknown"),
                "as_owner": getattr(vt_obj, "as_owner", "Unknown")
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # AbuseIPDB
        try:
            response = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                headers={"Key": self.abuseipdb_key, "Accept": "application/json"},
                params={"ipAddress": ip_address, "maxAgeInDays": 90}
            )
            data = response.json()["data"]
            result.abuseipdb = {
                "confidence_score": data["abuseConfidenceScore"],
                "total_reports": data["totalReports"],
                "is_tor": data.get("isTor", False),
                "usage_type": data.get("usageType", "Unknown"),
                "isp": data.get("isp", "Unknown"),
                "domain": data.get("domain", "Unknown")
            }
        except Exception as e:
            result.abuseipdb = {"error": str(e)}

        # Shodan
        try:
            host = self.shodan_api.host(ip_address)
            result.shodan_data = {
                "ports": host.get("ports", []),
                "os": host.get("os", "Unknown"),
                "organization": host.get("org", "Unknown"),
                "isp": host.get("isp", "Unknown"),
                "vulns": host.get("vulns", []),
                "last_update": host.get("last_update", "Unknown")
            }
        except shodan.APIError:
            result.shodan_data = {"status": "Not found in Shodan"}

        # GreyNoise
        try:
            response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": self.greynoise_key}
            )
            gn_data = response.json()
            result.greynoise = {
                "classification": gn_data.get("classification", "unknown"),
                "noise": gn_data.get("noise", False),
                "riot": gn_data.get("riot", False),
                "name": gn_data.get("name", "Unknown")
            }
        except Exception as e:
            result.greynoise = {"error": str(e)}

        # Calculate composite risk score
        result.risk_score = self._calculate_ip_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_domain(self, domain):
        result = EnrichmentResult(ioc_value=domain, ioc_type="domain")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/domains/{domain}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "reputation": vt_obj.reputation,
                "creation_date": getattr(vt_obj, "creation_date", "Unknown"),
                "registrar": getattr(vt_obj, "registrar", "Unknown"),
                "categories": getattr(vt_obj, "categories", {})
            }
        except Exception as e:
            result.virustotal = {"error": str(e)}

        # URLScan.io
        try:
            response = requests.get(
                f"https://urlscan.io/api/v1/search/?q=domain:{domain}",
                headers={"API-Key": self.urlscan_key}
            )
            scans = response.json().get("results", [])
            result.urlscan = {
                "total_scans": len(scans),
                "verdicts": [s.get("verdicts", {}).get("overall", {}).get("malicious", False)
                            for s in scans[:5]],
                "last_scan": scans[0]["task"]["time"] if scans else "Never scanned"
            }
        except Exception as e:
            result.urlscan = {"error": str(e)}

        result.risk_score = self._calculate_domain_risk(result)
        result.disposition = self._determine_disposition(result.risk_score)
        return result

    def enrich_hash(self, file_hash):
        result = EnrichmentResult(ioc_value=file_hash, ioc_type="hash")

        # VirusTotal
        try:
            vt_obj = self.vt_client.get_object(f"/files/{file_hash}")
            result.virustotal = {
                "malicious": vt_obj.last_analysis_stats.get("malicious", 0),
                "suspicious": vt_obj.last_analysis_stats.get("suspicious", 0),
                "undetected": vt_obj.last_analysis_stats.get("undetected", 0),
                "total_engines": sum(vt_obj.last_analysis_stats.values()),
                "type_description": getattr(vt_obj, "type_description", "Unknown"),
                "popular_threat_name": getattr(vt_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "sandbox_verdicts": getattr(vt_obj, "sandbox_verdicts", {}),
                "first_seen": getattr(vt_obj, "first_submission_date", "Unknown")
            }
        except vt.APIError:
            result.virustotal = {"status": "Not found in VirusTotal"}

        # MalwareBazaar
        try:
            response = requests.post(
                "https://mb-api.abuse.ch/api/v1/",
                data={"query": "get_info", "hash": file_hash}
            )
            mb_data = response.json()
            if mb_data["query_status"] == "ok":
                entry = mb_data["data"][0]
                result.abuseipdb = {  # Reusing field for MalwareBazaar data
                    "malware_family": entry.get("signature

Related in General