implementing-threat-intelligence-lifecycle-management
Implement a structured threat intelligence lifecycle encompassing planning, collection, processing, analysis, dissemination, and feedback stages to produce actionable intelligence for organizational decision-making.
# Implementing Threat Intelligence Lifecycle Management
## Overview
The threat intelligence lifecycle is a structured, iterative process for transforming raw data into actionable intelligence. Based on the intelligence cycle used by military and government agencies, it comprises six phases: Direction (requirements gathering), Collection (data acquisition), Processing (normalization and deduplication), Analysis (contextualization and assessment), Dissemination (distribution to stakeholders), and Feedback (evaluation and refinement). This skill covers building each phase with tooling, metrics, and integration points for a mature CTI program.
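The Processing phase named above (normalization and deduplication) can be illustrated with a minimal sketch. The `Indicator` record, the defanging rules, and the feed names are assumptions made for illustration; they are not part of MISP, OpenCTI, or STIX.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Indicator:
    """Hypothetical minimal indicator record (not a STIX object)."""
    type: str    # e.g. "domain", "ipv4", "sha256"
    value: str
    source: str


def normalize(ind: Indicator) -> Indicator:
    """Trim whitespace, lowercase, and undo common defanging (hxxp, [.])."""
    value = ind.value.strip().lower()
    value = value.replace("[.]", ".").replace("hxxp", "http")
    return Indicator(ind.type.strip().lower(), value, ind.source)


def deduplicate(indicators):
    """Merge records that share (type, value), keeping the sources that reported each."""
    merged = {}
    for ind in map(normalize, indicators):
        merged.setdefault((ind.type, ind.value), set()).add(ind.source)
    return {key: sorted(sources) for key, sources in merged.items()}


raw = [
    Indicator("domain", "Evil[.]example.com ", "feed-a"),
    Indicator("Domain", "evil.example.com", "feed-b"),
    Indicator("ipv4", "203.0.113.7", "feed-a"),
]
unique = deduplicate(raw)
for (ioc_type, value), sources in unique.items():
    print(f"{ioc_type:8} {value:20} reported by {', '.join(sources)}")
```

Keeping the list of reporting sources per indicator, rather than discarding duplicates outright, preserves corroboration information that the Analysis phase can use.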
## When to Use
- When deploying or configuring threat intelligence lifecycle management capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving the security architecture that supports a CTI program
- When conducting security assessments that evaluate how threat intelligence is produced and used
## Prerequisites
- Python 3.9+ with `pymisp`, `stix2`, `requests`, `pandas` libraries
- MISP or OpenCTI as the threat intelligence platform
- Ticketing system (Jira, ServiceNow) for requirements management
- SIEM integration (Splunk, Elastic) for indicator operationalization
- Understanding of intelligence analysis techniques (ACH, Diamond Model)
## Key Concepts
### Intelligence Requirements (IR)
Priority Intelligence Requirements (PIRs) define what the organization needs to know. Examples: Which threat actors target our sector? What vulnerabilities are being actively exploited? Are our brand or credentials being traded on the dark web? PIRs drive collection planning and ensure intelligence production is relevant.
### Collection Management Framework
A collection management framework maps intelligence requirements to collection sources, tracks collection gaps, and ensures coverage across the threat landscape. Sources include OSINT, commercial feeds, ISAC sharing, internal telemetry, and human intelligence from industry contacts.
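One way to picture the gap-tracking part of such a framework is a small lookup that compares the sources each requirement plans to use against the sources actually onboarded. This is a sketch under stated assumptions: the function name `find_collection_gaps`, the dictionary layout, and the sample source lists are hypothetical.

```python
def find_collection_gaps(requirements, available_sources):
    """Return, per requirement ID, the planned sources that are not yet onboarded."""
    available = {s.lower() for s in available_sources}
    gaps = {}
    for req_id, planned in requirements.items():
        missing = [s for s in planned if s.lower() not in available]
        if missing:
            gaps[req_id] = missing
    return gaps


# Hypothetical mapping of requirement IDs to the sources they rely on
requirements = {
    "PIR-001": ["ISAC feeds", "Vendor reports"],
    "PIR-002": ["CISA KEV", "Shodan"],
}
# Hypothetical list of sources the team currently collects from
available_sources = ["CISA KEV", "ISAC feeds", "Vendor reports"]

gaps = find_collection_gaps(requirements, available_sources)
for req_id, missing in gaps.items():
    print(f"[!] {req_id} has no coverage from: {', '.join(missing)}")
```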
### Intelligence Levels
Strategic intelligence informs executive decision-making (threat landscape, risk trends, geopolitical context). Operational intelligence supports security operations (campaign tracking, actor TTPs, attack timing). Tactical intelligence enables immediate defense (IOCs, detection rules, blocklists).
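The three levels can be expressed as a simple lookup that tags an intelligence product with its level. The product names come from the examples above; the `INTELLIGENCE_LEVELS` table and the `level_for` helper are hypothetical names used only for this sketch.

```python
# Product types per level, taken from the examples in the paragraph above
INTELLIGENCE_LEVELS = {
    "strategic": ["threat landscape", "risk trends", "geopolitical context"],
    "operational": ["campaign tracking", "actor ttps", "attack timing"],
    "tactical": ["iocs", "detection rules", "blocklists"],
}


def level_for(product_type):
    """Return the intelligence level for a product type, or None if it is unknown."""
    key = product_type.strip().lower()
    for level, products in INTELLIGENCE_LEVELS.items():
        if key in products:
            return level
    return None


for product in ["IOCs", "Risk trends", "Actor TTPs"]:
    print(f"{product}: {level_for(product)}")
```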
## Workflow
### Step 1: Define Intelligence Requirements
```python
import json
from datetime import datetime
from enum import Enum


class Priority(Enum):
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4


class IntelligenceRequirement:
    """A single intelligence requirement (for example a PIR) and its metadata."""

    def __init__(self, requirement_id, question, priority, stakeholder,
                 intelligence_level, collection_sources=None):
        self.id = requirement_id
        self.question = question
        self.priority = priority
        self.stakeholder = stakeholder
        self.level = intelligence_level
        self.sources = collection_sources or []
        self.created = datetime.now().isoformat()
        self.status = "active"
        self.last_answered = None

    def to_dict(self):
        """Serialize to a JSON-friendly dict (priority is stored by name)."""
        return {
            "id": self.id,
            "question": self.question,
            "priority": self.priority.name,
            "stakeholder": self.stakeholder,
            "intelligence_level": self.level,
            "collection_sources": self.sources,
            "created": self.created,
            "status": self.status,
            "last_answered": self.last_answered,
        }


class RequirementsManager:
    def __init__(self):
        self.requirements = []

    def add_requirement(self, requirement):
        self.requirements.append(requirement)
        # The ID already carries its prefix (e.g. "PIR-001"), so print it as-is
        print(f"[+] Added {requirement.id}: {requirement.question[:60]}...")

    def get_active_requirements(self, priority=None, level=None):
        """Return active requirements, optionally filtered by priority and level."""
        filtered = [r for r in self.requirements if r.status == "active"]
        if priority is not None:
            filtered = [r for r in filtered if r.priority == priority]
        if level is not None:
            filtered = [r for r in filtered if r.level == level]
        return filtered

    def export_requirements(self, output_file="intelligence_requirements.json"):
        data = [r.to_dict() for r in self.requirements]
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f"[+] Exported {len(data)} requirements to {output_file}")


# Define organizational PIRs
mgr = RequirementsManager()
mgr.add_requirement(IntelligenceRequirement(
    "PIR-001", "Which threat actors are actively targeting our sector?",
    Priority.CRITICAL, "CISO", "strategic",
    ["MITRE ATT&CK", "ISAC feeds", "Vendor reports"],
))
mgr.add_requirement(IntelligenceRequirement(
    "PIR-002", "What vulnerabilities are being actively exploited in the wild?",
    Priority.CRITICAL, "Vulnerability Management", "operational",
    ["CISA KEV", "Exploit-DB", "VulnCheck", "Shodan"],
))
mgr.add_requirement(IntelligenceRequirement(
    "PIR-003", "Are any organization credentials or data exposed on the dark web?",
    Priority.HIGH, "SOC Manager", "tactical",
    ["Dark web monitoring", "Paste site monitoring", "Breach databases"],
))
mgr.add_requirement(IntelligenceRequirement(
    "PIR-004", "What are the emerging attack techniques against cloud infrastructure?",
    Priority.HIGH, "Cloud Security", "operational",
    ["ATT&CK Cloud matrix", "Vendor advisories", "ISAC bulletins"],
))
mgr.export_requirements()
```
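The exported records carry a `last_answered` field, which suggests a periodic review of requirements that have gone unanswered. The sketch below works on plain dicts in the export format; the `stale_requirements` helper, the 30-day threshold, and the sample records (including the `"retired"` status) are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def stale_requirements(records, now, max_age_days=30):
    """Return IDs of active requirements never answered or not answered recently."""
    cutoff = now - timedelta(days=max_age_days)
    stale = []
    for rec in records:
        if rec.get("status") != "active":
            continue
        answered = rec.get("last_answered")
        if answered is None or datetime.fromisoformat(answered) < cutoff:
            stale.append(rec["id"])
    return stale


# Hypothetical records shaped like the exported JSON
records = [
    {"id": "PIR-001", "status": "active", "last_answered": None},
    {"id": "PIR-002", "status": "active", "last_answered": "2024-05-20T09:00:00"},
    {"id": "PIR-003", "status": "active", "last_answered": "2024-03-01T09:00:00"},
    {"id": "PIR-004", "status": "retired", "last_answered": None},
]

stale = stale_requirements(records, now=datetime(2024, 6, 1))
print(f"[!] Requirements needing attention: {stale}")
```

Passing `now` explicitly keeps the check deterministic and easy to test; a scheduled job would pass `datetime.now()`.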
### Step 2: Build Collection Pipeline
```python
import requests
from datetime import datetime, timedelta


class CollectionPipeline:
    def __init__(self, config):
        self.config = config
        self.collected_data = []

    def collect_cisa_kev(self):
        """Collect CISA Known Exploited Vulnerabilities catalog."""
        url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
        resp = requests.get(url, timeout=30)
        if resp.status_code == 200:
            data = resp.json()
            vulns = data.get("vulnerabilities", [])
            self.collected_data.append({
                "source": "CISA KEV",
                "type": "vulnerability",
                "count": len(vulns),
                "collected_at": datetime.now().isoformat(),
                "data": vulns,
            })
            print(f"[+] CISA KEV: {len(vulns)} known exploited vulnerabilities")
            return vulns
        return []

    def collect_otx_pulses(self, api_key, days=7):
        """Collect recent OTX pulses."""
        headers = {"X-OTX-API-KEY": api_key}
        since = (datetime.now() - timedelta(days=days)).isoformat()
        url = "https://otx.alienvault.com/api/v1/pulses/subscribed"
        # Pass the timestamp via params so requests URL-encodes it
        resp = requests.get(url, headers=headers,
                            params={"modified_since": since}, timeout=30)
        if resp.status_code == 200:
            pulses = resp.json().get("results", [])
            self.collected_data.append({
                "source": "AlienVault OTX",
                "type": "threat_intelligence",
                "count": len(pulses),
                "collected_at": datetime.now().isoformat(),
            })
            print(f"[+] OTX: {len(pulses)} pulses in last {days} days")
            return pulses
        return []

    def collect_abuse_ch(self):
        """Collect recent malware samples from MalwareBazaar."""
        url = "https://mb-api.abuse.ch/api/v1/"
        # Note: abuse.ch may require an Auth-Key header for this API;
        # check the current MalwareBazaar documentation before relying on it.
        resp = requests.post(url, data={"query": "get_recent", "selector": "time"}, timeout=30)
        if resp.status_code == 200:
            # "data" may be absent or null when the query returns no samples
            data = resp.json().get("data") or []
            self.collected_data.append({
                "source": "MalwareBazaar",
                "type": "malware_samples",
                "count": len(data),
                "collected_at": datetime.now().isoformat(),
            })
            print(f"[+] MalwareBazaar: {len(data)} recent samples")
            return data
        return []

    # The source is truncated partway through the next method; the surviving
    # fragment is kept below, commented out so the block above still parses.
    # def get_collection_summary(self):
    #     summary = {
    #         "total_s
```