
implementing-soar-automation-with-phantom


Implements Security Orchestration, Automation, and Response (SOAR) workflows using Splunk SOAR (formerly Phantom) to automate alert triage, IOC enrichment, containment actions, and incident response playbooks. Use when SOC teams need to reduce manual analyst work, standardize response procedures, or integrate multiple security tools into automated workflows.

Tags: Security, soc, soar, phantom, splunk-soar, automation, playbook, orchestration, incident-response, scripts

What this skill does

# Implementing SOAR Automation with Phantom

## When to Use

Use this skill when:
- SOC teams need to automate repetitive triage and enrichment tasks for high-volume alerts
- Manual response times exceed SLA requirements and automation can reduce MTTR
- Multiple security tools (SIEM, EDR, firewall, TIP) need orchestrated response actions
- Playbook standardization is required to ensure consistent analyst response across shifts

**Do not use** for fully autonomous containment without human approval gates — always include analyst decision points for high-impact actions like account disabling or host isolation.
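
One way to express this rule in playbook logic is a small gate that classifies an action before it runs. The sketch below is a hypothetical helper, not part of the Splunk SOAR API: `HIGH_IMPACT_ACTIONS` and `needs_approval` are names introduced here for illustration, and the action names are examples that would need to match the apps actually installed.

```python
from typing import Optional

# Hypothetical helper (not a Splunk SOAR API): decide whether an action
# must wait for an analyst before it runs. The action names are examples.
HIGH_IMPACT_ACTIONS = frozenset({
    "quarantine device",
    "disable user",
})


def needs_approval(action_name: str, approved_by: Optional[str] = None) -> bool:
    """Return True when a high-impact action has no recorded approver."""
    normalized = action_name.strip().lower()
    if normalized not in HIGH_IMPACT_ACTIONS:
        return False  # enrichment and other low-impact actions run freely
    return not approved_by
```

A playbook could call `needs_approval("quarantine device")` before issuing the action and route the container to an analyst whenever it returns `True`.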

## Prerequisites

- Splunk SOAR (Phantom) 6.x+ deployed with web interface access
- App connectors configured: VirusTotal, CrowdStrike, ServiceNow, Active Directory, Splunk ES
- Splunk ES integration for ingesting notable events as SOAR events
- API credentials for each integrated tool stored in SOAR asset configuration
- Python knowledge for custom playbook actions

## Workflow

### Step 1: Configure Asset Connections

Set up integrations with security tools via SOAR Apps:

**VirusTotal Asset Configuration:**
```json
{
  "app": "VirusTotal v3",
  "asset_name": "virustotal_prod",
  "configuration": {
    "api_key": "YOUR_VT_API_KEY",
    "rate_limit": true,
    "max_requests_per_minute": 4
  },
  "product_vendor": "VirusTotal",
  "product_name": "VirusTotal"
}
```
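
To see what a ceiling of 4 requests per minute means for playbook throughput, the following is a minimal sliding-window limiter sketch. `SlidingWindowLimiter` is a hypothetical class written for illustration; it is not how the VirusTotal app enforces `max_requests_per_minute`, and the app may apply its own limiting internally.

```python
from collections import deque
import time


class SlidingWindowLimiter:
    """Allow at most `max_calls` calls within any `window_seconds` span."""

    def __init__(self, max_calls=4, window_seconds=60.0, clock=time.monotonic):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock      # injectable so the limiter can be tested
        self._calls = deque()    # timestamps of recently allowed calls

    def try_acquire(self):
        """Record a call and return True if it fits in the current window."""
        now = self._clock()
        # Drop timestamps that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.window_seconds:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```

With the defaults, a fifth lookup inside the same minute is refused, so a container carrying many URL artifacts will take several minutes to enrich fully at this limit.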

**CrowdStrike Falcon Asset:**
```json
{
  "app": "CrowdStrike Falcon",
  "asset_name": "crowdstrike_prod",
  "configuration": {
    "client_id": "CS_CLIENT_ID",
    "client_secret": "CS_CLIENT_SECRET",
    "base_url": "https://api.crowdstrike.com"
  }
}
```

**Active Directory Asset:**
```json
{
  "app": "Active Directory",
  "asset_name": "ad_prod",
  "configuration": {
    "server": "dc01.company.com",
    "username": "SERVICE_ACCOUNT_USERNAME",
    "password": "SERVICE_ACCOUNT_PASSWORD",
    "ssl": true
  }
}
```
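
The asset definitions in this step carry placeholder credentials. A small pre-flight check can catch values that were never replaced before an asset goes live. This is a sketch under assumptions: `find_placeholder_fields`, `PLACEHOLDER_MARKERS`, and `SECRET_FIELDS` are hypothetical names, and the dictionary shape simply mirrors the JSON shown in this step rather than any SOAR export format.

```python
# Prefixes used by the placeholder values in this step (assumed markers).
PLACEHOLDER_MARKERS = ("YOUR_", "CS_CLIENT_", "SERVICE_ACCOUNT_")
# Configuration keys that are expected to hold real secrets.
SECRET_FIELDS = ("api_key", "client_id", "client_secret", "password")


def find_placeholder_fields(asset: dict) -> list:
    """Return secret fields that are empty or still hold a placeholder."""
    config = asset.get("configuration", {})
    flagged = []
    for field in SECRET_FIELDS:
        if field not in config:
            continue  # this asset type does not use the field
        value = str(config[field])
        if not value or value.startswith(PLACEHOLDER_MARKERS):
            flagged.append(field)
    return flagged
```

Running it over each asset dictionary and refusing to proceed when the returned list is non-empty keeps sample values out of production.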

### Step 2: Build Phishing Triage Playbook

Create an automated phishing response playbook in Python (Phantom playbook format):

```python
"""
Phishing Triage Automation Playbook
Trigger: New phishing email reported via Splunk ES notable or email ingestion
"""

import phantom.rules as phantom

def on_start(container):
    # Extract artifacts (URLs, file hashes, sender) from the container
    artifacts = phantom.get_artifacts(container_id=container["id"])

    for artifact in artifacts:
        artifact_type = artifact.get("cef", {}).get("type", "")

        if artifact_type == "url":
            phantom.act("url reputation", targets=artifact,
                        assets=["virustotal_prod"],
                        callback=url_reputation_callback,
                        name="url_reputation")

        elif artifact_type == "hash":
            phantom.act("file reputation", targets=artifact,
                        assets=["virustotal_prod"],
                        callback=hash_reputation_callback,
                        name="file_reputation")

        elif artifact_type == "ip":
            phantom.act("ip reputation", targets=artifact,
                        assets=["virustotal_prod"],
                        callback=ip_reputation_callback,
                        name="ip_reputation")

def url_reputation_callback(action, success, container, results, handle):
    if not success:
        phantom.comment(container, "URL reputation check failed")
        return

    for result in results:
        # "or [{}]" also covers a present-but-empty "data" list
        data = (result.get("data") or [{}])[0]
        malicious_count = data.get("summary", {}).get("malicious", 0)
        total_engines = data.get("summary", {}).get("total_engines", 0)

        if malicious_count > 5:
            # High confidence malicious — auto-block and escalate
            phantom.act("block url", targets=result,
                        assets=["palo_alto_prod"],
                        name="block_malicious_url")

            phantom.set_severity(container, "high")
            phantom.set_status(container, "open")
            phantom.comment(container,
                f"URL flagged by {malicious_count}/{total_engines} engines. "
                f"Blocked on firewall. Escalating to Tier 2.")

            # Create ServiceNow ticket
            phantom.act("create ticket", targets=container,
                        assets=["servicenow_prod"],
                        parameters=[{
                            "short_description": "Phishing - Malicious URL detected",
                            "urgency": "2",
                            "impact": "2"
                        }],
                        name="create_incident_ticket")

        elif malicious_count > 0:
            # Medium confidence — request analyst review
            phantom.promote(container, template="Phishing Investigation")
            phantom.comment(container,
                f"URL flagged by {malicious_count}/{total_engines} engines. "
                f"Requires analyst review.")

        else:
            # Clean — close with comment
            phantom.set_status(container, "closed")
            phantom.comment(container,
                f"URL clean: 0/{total_engines} engines flagged. Auto-closed.")

def hash_reputation_callback(action, success, container, results, handle):
    if not success:
        return

    for result in results:
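        # "or [{}]" also covers a present-but-empty "data" list
        data = (result.get("data") or [{}])[0]
        positives = data.get("summary", {}).get("positives", 0)

        if positives > 10:
            # Known malware: isolate the endpoint. Host isolation is a
            # high-impact action, so gate it behind analyst approval in
            # production, per the guidance under "When to Use".
            phantom.act("quarantine device", targets=result,
                        assets=["crowdstrike_prod"],
                        name="isolate_endpoint")
            phantom.set_severity(container, "high")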

def ip_reputation_callback(action, success, container, results, handle):
    if not success:
        return

    for result in results:
        # "or [{}]" also covers a present-but-empty "data" list
        data = (result.get("data") or [{}])[0]
        malicious = data.get("summary", {}).get("malicious", 0)

        if malicious > 3:
            phantom.act("block ip", targets=result,
                        assets=["palo_alto_prod"],
                        name="block_malicious_ip")
```
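
The URL verdict thresholds in this playbook (more than 5 engines, at least 1 engine, none) can be pulled out into a pure function, which makes them testable without a SOAR instance. The sketch below is illustrative only: `first_data_entry` and `classify_url_verdict` are hypothetical helpers, the result shape follows the `data` / `summary` / `malicious` keys used in the callbacks above, and it departs from the playbook in one deliberate way by sending results with no summary to analyst review instead of auto-closing them.

```python
def first_data_entry(result: dict) -> dict:
    """Return result["data"][0], or {} when "data" is missing or empty."""
    data = result.get("data") or [{}]
    return data[0]


def classify_url_verdict(result: dict) -> str:
    """Map a reputation result to a triage decision.

    Thresholds mirror the playbook: > 5 engines blocks and escalates,
    1 to 5 engines goes to an analyst, 0 engines auto-closes.
    """
    summary = first_data_entry(result).get("summary")
    if not summary:
        # Assumption: no data is not evidence of a clean URL.
        return "analyst_review"
    malicious = summary.get("malicious", 0)
    if malicious > 5:
        return "block_and_escalate"
    if malicious > 0:
        return "analyst_review"
    return "auto_close"
```

Keeping the decision separate from the `phantom.act` calls lets the thresholds be tuned and unit-tested before the playbook is promoted.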

### Step 3: Build Alert Enrichment Playbook

Automate enrichment for all incoming SIEM alerts:

```python
"""
Universal Alert Enrichment Playbook
Runs on every new event to add context before analyst review
"""

import phantom.rules as phantom

def on_start(container):
    # Get all artifacts
    success, message, artifacts = phantom.get_artifacts(
        container_id=container["id"], full_data=True
    )

    ip_artifacts = [a for a in artifacts if a.get("cef", {}).get("sourceAddress")]
    domain_artifacts = [a for a in artifacts if a.get("cef", {}).get("destinationDnsDomain")]

    # Enrich IPs in parallel
    for artifact in ip_artifacts:
        ip = artifact["cef"]["sourceAddress"]

        # VirusTotal lookup
        phantom.act("ip reputation",
                    parameters=[{"ip": ip}],
                    assets=["virustotal_prod"],
                    callback=enrich_ip_callback,
                    name=f"vt_ip_{ip}")

        # GeoIP lookup
        phantom.act("geolocate ip",
                    parameters=[{"ip": ip}],
                    assets=["maxmind_prod"],
                    callback=geoip_callback,
                    name=f"geo_{ip}")

        # Whois lookup
        phantom.act("whois ip",
                    parameters=[{"ip": ip}],
                    assets=["whois_prod"],
                    name=f"whois_{ip}")

    # Enrich domains
    for artifact in domain_artifacts:
        domain = artifact["cef"]["destinationDnsDomain"]
        phantom.act("domain reputation",
                    parameters=[{"domain": domain}],
                    assets=["virustotal_prod"],
                    name=f"vt_domain_{domain}")

def enrich_ip_callback(action, success, container, results, handle):
    """Update container with enrichment data"""
    if success:
        for result in results:
            summary = 

```