building-threat-intelligence-feed-integration
Builds automated threat intelligence feed integration pipelines connecting STIX/TAXII feeds, open-source threat intel, and commercial TI platforms into SIEM and security tools for real-time IOC matching and alerting. Use when SOC teams need to operationalize threat intelligence by automating feed ingestion, normalization, scoring, and distribution to detection systems.
What this skill does
# Building Threat Intelligence Feed Integration
## When to Use
Use this skill when:
- SOC teams need automated ingestion of threat intelligence feeds into SIEM platforms
- Multiple TI sources require normalization into a common format (STIX 2.1)
- Detection systems need real-time IOC matching against network and endpoint telemetry
- TI feed quality assessment and deduplication processes need to be established
**Do not use** for manual IOC lookup — use dedicated enrichment tools (VirusTotal, AbuseIPDB) for ad-hoc queries.
## Prerequisites
- MISP instance or Threat Intelligence Platform (TIP) for feed aggregation
- STIX/TAXII client library (`taxii2-client`, `stix2` Python packages)
- SIEM platform (Splunk ES, Elastic Security, or Sentinel) with TI framework configured
- API keys for commercial and open-source feeds (AlienVault OTX, Abuse.ch, CISA AIS)
- Python 3.8+ for feed processing automation
## Workflow
### Step 1: Identify and Catalog Intelligence Sources
Map available feeds by type, format, and update frequency:
| Feed Source | Format | IOC Types | Update Freq | Cost |
|-------------|--------|-----------|-------------|------|
| AlienVault OTX | STIX/JSON | IP, Domain, Hash, URL | Real-time | Free |
| Abuse.ch URLhaus | CSV/JSON | URL, Domain | Every 5 min | Free |
| Abuse.ch MalwareBazaar | JSON API | File Hash | Real-time | Free |
| CISA AIS | STIX/TAXII 2.1 | All types | Daily | Free (US Gov) |
| CrowdStrike Intel | STIX/JSON | All types + Actor TTP | Real-time | Commercial |
| Mandiant Advantage | STIX 2.1 | All types + Reports | Real-time | Commercial |
### Step 2: Ingest STIX/TAXII Feeds
Connect to a TAXII 2.1 server and download indicators:
```python
from taxii2client.v21 import Server, Collection
from stix2 import parse
# Connect to TAXII server (example: CISA AIS)
server = Server(
"https://taxii.cisa.gov/taxii2/",
user="your_username",
password="your_password"
)
# List available collections
for api_root in server.api_roots:
print(f"API Root: {api_root.title}")
for collection in api_root.collections:
print(f" Collection: {collection.title} (ID: {collection.id})")
# Fetch indicators from a collection
collection = Collection(
"https://taxii.cisa.gov/taxii2/collections/COLLECTION_ID/",
user="your_username",
password="your_password"
)
# Get indicators added in last 24 hours
from datetime import datetime, timedelta
added_after = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
response = collection.get_objects(added_after=added_after, type=["indicator"])
for obj in response.get("objects", []):
indicator = parse(obj)
print(f"Type: {indicator.type}")
print(f"Pattern: {indicator.pattern}")
print(f"Valid Until: {indicator.valid_until}")
print(f"Confidence: {indicator.confidence}")
print("---")
```
### Step 3: Ingest Open-Source Feeds
**Abuse.ch URLhaus Feed:**
```python
import requests
import csv
from io import StringIO
# Download URLhaus recent URLs
response = requests.get("https://urlhaus.abuse.ch/downloads/csv_recent/")
reader = csv.reader(StringIO(response.text), delimiter=',')
indicators = []
for row in reader:
if row[0].startswith("#"):
continue
indicators.append({
"id": row[0],
"dateadded": row[1],
"url": row[2],
"url_status": row[3],
"threat": row[5],
"tags": row[6]
})
print(f"Ingested {len(indicators)} URLs from URLhaus")
# Filter for active threats only
active = [i for i in indicators if i["url_status"] == "online"]
print(f"Active threats: {len(active)}")
```
**AlienVault OTX Pulse Feed:**
```python
from OTXv2 import OTXv2, IndicatorTypes
otx = OTXv2("YOUR_OTX_API_KEY")
# Get subscribed pulses (last 24 hours)
pulses = otx.getall(modified_since="2024-03-14T00:00:00")
for pulse in pulses:
print(f"Pulse: {pulse['name']}")
print(f"Tags: {pulse['tags']}")
for indicator in pulse["indicators"]:
print(f" IOC: {indicator['indicator']} ({indicator['type']})")
```
**Abuse.ch Feodo Tracker (C2 IPs):**
```python
response = requests.get("https://feodotracker.abuse.ch/downloads/ipblocklist_recommended.json")
c2_data = response.json()
for entry in c2_data:
print(f"IP: {entry['ip_address']}:{entry['port']}")
print(f"Malware: {entry['malware']}")
print(f"First Seen: {entry['first_seen']}")
print(f"Last Online: {entry['last_online']}")
```
### Step 4: Normalize and Deduplicate
Convert all feeds to STIX 2.1 format for standardization:
```python
from stix2 import Indicator, Bundle
import hashlib
def create_stix_indicator(ioc_value, ioc_type, source, confidence=50):
"""Convert raw IOC to STIX 2.1 indicator"""
pattern_map = {
"ipv4": f"[ipv4-addr:value = '{ioc_value}']",
"domain": f"[domain-name:value = '{ioc_value}']",
"url": f"[url:value = '{ioc_value}']",
"sha256": f"[file:hashes.'SHA-256' = '{ioc_value}']",
"md5": f"[file:hashes.MD5 = '{ioc_value}']",
}
return Indicator(
name=f"{ioc_type}: {ioc_value}",
pattern=pattern_map[ioc_type],
pattern_type="stix",
valid_from="2024-03-15T00:00:00Z",
confidence=confidence,
labels=[source],
custom_properties={"x_source_feed": source}
)
# Deduplicate across sources
seen_iocs = set()
unique_indicators = []
for ioc in all_collected_iocs:
ioc_hash = hashlib.sha256(f"{ioc['type']}:{ioc['value']}".encode()).hexdigest()
if ioc_hash not in seen_iocs:
seen_iocs.add(ioc_hash)
unique_indicators.append(
create_stix_indicator(ioc["value"], ioc["type"], ioc["source"])
)
bundle = Bundle(objects=unique_indicators)
print(f"Unique indicators: {len(unique_indicators)}")
```
### Step 5: Push to SIEM Threat Intelligence Framework
**Push to Splunk ES Threat Intelligence:**
```python
import requests
splunk_url = "https://splunk.company.com:8089"
headers = {"Authorization": f"Bearer {splunk_token}"}
for indicator in unique_indicators:
# Extract IOC value from STIX pattern
ioc_value = indicator.pattern.split("'")[1]
# Upload to Splunk ES threat intel collection
data = {
"ip": ioc_value,
"description": indicator.name,
"weight": indicator.confidence // 10,
"threat_key": indicator.id,
"source_feed": indicator.get("x_source_feed", "unknown")
}
requests.post(
f"{splunk_url}/services/data/threat_intel/item/ip_intel",
headers=headers, data=data,
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
)
```
**Push to MISP for centralized management:**
```python
from pymisp import PyMISP, MISPEvent, MISPAttribute
misp = PyMISP("https://misp.company.com", "YOUR_MISP_API_KEY")
# Create event for feed batch
event = MISPEvent()
event.info = f"TI Feed Import - {datetime.now().strftime('%Y-%m-%d')}"
event.threat_level_id = 2 # Medium
event.analysis = 2 # Completed
# Add indicators as attributes
for ioc in unique_indicators:
attr = MISPAttribute()
attr.type = "ip-dst" if "ipv4" in ioc.pattern else "domain"
attr.value = ioc.pattern.split("'")[1]
attr.to_ids = True
attr.comment = f"Source: {ioc.get('x_source_feed', 'mixed')}"
event.add_attribute(**attr)
result = misp.add_event(event)
print(f"MISP Event created: {result['Event']['id']}")
```
### Step 6: Monitor Feed Health and Quality
Track feed effectiveness metrics:
```spl
index=threat_intel sourcetype="threat_intel_manager"
| stats count AS total_iocs,
dc(threat_key) AS unique_iocs,
dc(source_feed) AS feed_count
by source_feed
| join source_feed [
search index=notable source="Threat Intelligence"
| stats count AS matches by source_feed
]
| eval match_rate = round(matches / unique_iocs * 100, 2)
| sort - match_rate
| table source_feed, unique_iocs, matches, match_rate
```
## Key Related in Security
mac-ops
IncludedComprehensive macOS workstation operations — diagnose kernel panics, identify failing drives, audit launchd startup items, decode wake reasons, triage TCC permission denials, manage APFS snapshots, recover from no-boot. Use for: Mac is slow, slow bootup, won't boot, kernel panic, kernel_task hot, mds_stores CPU, photoanalysisd, cloudd, login loop, gray screen, sleep wake failure, drive failing, IO errors, APFS snapshots eating space, Time Machine local snapshots, Spotlight indexing, launchd, LaunchAgent, LaunchDaemon, login items, TCC permissions, Full Disk Access, Screen Recording denied, Gatekeeper, quarantine, com.apple.quarantine, app is damaged, helper tool, /Library/PrivilegedHelperTools, pmset, wake reasons, dark wake, sysdiagnose, panic.ips, DiagnosticReports, configuration profile, MDM profile, remote diagnostics over SSH.
a11y-audit
IncludedRun accessibility audits on web projects combining automated scanning (axe-core, Lighthouse) with WCAG 2.1 AA compliance mapping, manual check guidance, and structured reporting. Output is configurable: markdown report only, markdown plus machine-readable JSON, or markdown plus issue tracker integration. Use this skill whenever the user mentions "accessibility audit", "a11y audit", "WCAG audit", "accessibility check", "compliance scan", or asks to check a web project for accessibility issues. Also trigger when the user wants to verify WCAG conformance or map findings to a specific standard (CAN-ASC-6.2, EN 301 549, ADA/AODA).
erpclaw
IncludedAI-native ERP system with self-extending OS. Full accounting, invoicing, inventory, purchasing, tax, billing, HR, payroll, advanced accounting (ASC 606/842, intercompany, consolidation), and financial reporting. 413 actions across 14 domains, 43 expansion modules. Constitutional guardrails, adversarial audit, schema migration. Double-entry GL, immutable audit trail, US GAAP.
assess
IncludedAssesses and rates quality 0-10 across multiple dimensions (correctness, maintainability, security, performance, testability, simplicity) with pros/cons analysis. Compares against project conventions and prior decisions from memory. Produces structured evaluation reports with actionable improvement suggestions. Use when evaluating code, designs, architectures, or comparing alternative approaches.
spring-boot-security-jwt
IncludedProvides JWT authentication and authorization patterns for Spring Boot 3.5.x covering token generation with JJWT, Bearer/cookie authentication, database/OAuth2 integration, and RBAC/permission-based access control using Spring Security 6.x. Use when implementing authentication or authorization in Spring Boot applications.
code-hardcode-audit
IncludedDetect hardcoded values, magic numbers, and leaked secrets. TRIGGERS - hardcode audit, magic numbers, PLR2004, secret scanning.