implementing-stix-taxii-feed-integration
STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence.
What this skill does
# Implementing STIX/TAXII Feed Integration
## Overview
STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence. This skill covers implementing a STIX/TAXII 2.1 feed consumer and producer using Python, configuring TAXII server discovery, collection management, polling for new intelligence, parsing STIX 2.1 objects, and integrating feeds into SIEM and TIP platforms.
## When to Use
- When deploying or configuring implementing stix taxii feed integration capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation
## Prerequisites
- Python 3.9+ with `taxii2-client`, `stix2`, `cti-taxii-client` libraries
- Understanding of STIX 2.1 data model (SDOs, SCOs, SROs)
- Understanding of TAXII 2.1 protocol (discovery, API roots, collections)
- Network access to TAXII servers (MITRE ATT&CK TAXII, Anomali STAXX)
- Optional: medallion for running a local TAXII 2.1 server
## Key Concepts
### TAXII 2.1 Architecture
TAXII defines a RESTful API with three service types:
- **Discovery**: Returns information about available API roots
- **API Root**: Contains collections and serves as the main interaction point
- **Collection**: A logical grouping of STIX objects accessible via GET/POST
### STIX 2.1 Object Model
STIX objects are categorized as:
- **SDOs (STIX Domain Objects)**: Indicator, Malware, Threat Actor, Campaign, Attack Pattern, Tool, Infrastructure, Vulnerability, Identity, Location, Note, Opinion, Report, Grouping
- **SCOs (STIX Cyber Observables)**: IPv4-Addr, Domain-Name, URL, File, Email-Addr, Process, Network-Traffic, Artifact
- **SROs (STIX Relationship Objects)**: Relationship, Sighting
- **Meta Objects**: Marking Definition (TLP), Language Content, Extension Definition
### STIX Bundle
A Bundle is a collection of STIX objects transmitted together. Bundles have a unique ID and contain an array of objects. TAXII collections serve bundles in response to GET requests.
## Workflow
### Step 1: TAXII Server Discovery
```python
from taxii2client.v21 import Server, Collection, as_pages
# Connect to MITRE ATT&CK TAXII server
server = Server("https://cti-taxii.mitre.org/taxii2/", user="", password="")
print(f"Title: {server.title}")
print(f"Description: {server.description}")
# List API roots
for api_root in server.api_roots:
print(f"\nAPI Root: {api_root.title}")
print(f" URL: {api_root.url}")
# List collections
for collection in api_root.collections:
print(f" Collection: {collection.title} (ID: {collection.id})")
print(f" Can Read: {collection.can_read}")
print(f" Can Write: {collection.can_write}")
```
### Step 2: Fetch STIX Objects from Collection
```python
from taxii2client.v21 import Collection, as_pages
import json
# Connect to Enterprise ATT&CK collection
ENTERPRISE_ATTACK_ID = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
collection = Collection(
f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/",
user="",
password="",
)
print(f"Collection: {collection.title}")
# Fetch all objects (paginated)
all_objects = []
for envelope in as_pages(collection.get_objects, per_request=50):
objects = envelope.get("objects", [])
all_objects.extend(objects)
print(f" Fetched {len(objects)} objects (total: {len(all_objects)})")
print(f"\nTotal objects retrieved: {len(all_objects)}")
# Categorize by type
type_counts = {}
for obj in all_objects:
obj_type = obj.get("type", "unknown")
type_counts[obj_type] = type_counts.get(obj_type, 0) + 1
for obj_type, count in sorted(type_counts.items()):
print(f" {obj_type}: {count}")
```
### Step 3: Parse STIX 2.1 Objects with stix2 Library
```python
from stix2 import parse, Filter, MemoryStore
# Load objects into a MemoryStore for querying
store = MemoryStore(stix_data=all_objects)
# Query for all indicators
indicators = store.query([Filter("type", "=", "indicator")])
print(f"Indicators: {len(indicators)}")
for ind in indicators[:5]:
print(f" {ind.name}: {ind.pattern}")
# Query for malware
malware_list = store.query([Filter("type", "=", "malware")])
print(f"\nMalware families: {len(malware_list)}")
# Query for threat actors
actors = store.query([Filter("type", "=", "intrusion-set")])
print(f"Threat actors: {len(actors)}")
# Find relationships for a specific object
def get_related(store, source_id):
relationships = store.query([
Filter("type", "=", "relationship"),
Filter("source_ref", "=", source_id),
])
return relationships
# Example: Get all techniques used by APT28
apt28 = store.query([
Filter("type", "=", "intrusion-set"),
Filter("name", "=", "APT28"),
])
if apt28:
rels = get_related(store, apt28[0].id)
for rel in rels:
target = store.get(rel.target_ref)
if target:
print(f" {rel.relationship_type} -> {target.name} ({target.type})")
```
### Step 4: Implement Custom TAXII Consumer
```python
from taxii2client.v21 import Collection, as_pages
from stix2 import parse, Bundle
from datetime import datetime, timedelta
import json
class TAXIIConsumer:
"""Consume STIX/TAXII 2.1 feeds and extract IOCs."""
def __init__(self, collection_url, user="", password=""):
self.collection = Collection(collection_url, user=user, password=password)
self.last_poll = None
def poll_new_objects(self, added_after=None):
"""Poll for objects added after a specific timestamp."""
if added_after is None:
added_after = (
self.last_poll or
(datetime.utcnow() - timedelta(days=1)).strftime(
"%Y-%m-%dT%H:%M:%S.000Z"
)
)
all_objects = []
kwargs = {"added_after": added_after}
for envelope in as_pages(
self.collection.get_objects, per_request=100, **kwargs
):
objects = envelope.get("objects", [])
all_objects.extend(objects)
self.last_poll = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
return all_objects
def extract_indicators(self, objects):
"""Extract actionable indicators from STIX objects."""
indicators = []
for obj in objects:
if obj.get("type") == "indicator":
indicators.append({
"id": obj.get("id"),
"name": obj.get("name", ""),
"pattern": obj.get("pattern", ""),
"pattern_type": obj.get("pattern_type", ""),
"valid_from": obj.get("valid_from", ""),
"valid_until": obj.get("valid_until", ""),
"indicator_types": obj.get("indicator_types", []),
"confidence": obj.get("confidence", 0),
"labels": obj.get("labels", []),
})
return indicators
def extract_observables(self, objects):
"""Extract STIX Cyber Observables."""
observables = []
observable_types = {
"ipv4-addr", "ipv6-addr", "domain-name", "url",
"file", "email-addr", "network-traffic",
}
for obj in objects:
if obj.get("type") in observable_types:
observables.append({
"type": obj["type"],
"value": obj.get("value", ""),
"id": obj.get("id"),
})
return observables
# Usage
consumer = TAXIIConsumer(
f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
)
new_objects = consumer.poll_new_objects()
indicators = consumer.extract_indicators(new_objects)
print(f"New indicators: {len(indicators)}")
```
### Step 5: Set Related in Security
mac-ops
IncludedComprehensive macOS workstation operations — diagnose kernel panics, identify failing drives, audit launchd startup items, decode wake reasons, triage TCC permission denials, manage APFS snapshots, recover from no-boot. Use for: Mac is slow, slow bootup, won't boot, kernel panic, kernel_task hot, mds_stores CPU, photoanalysisd, cloudd, login loop, gray screen, sleep wake failure, drive failing, IO errors, APFS snapshots eating space, Time Machine local snapshots, Spotlight indexing, launchd, LaunchAgent, LaunchDaemon, login items, TCC permissions, Full Disk Access, Screen Recording denied, Gatekeeper, quarantine, com.apple.quarantine, app is damaged, helper tool, /Library/PrivilegedHelperTools, pmset, wake reasons, dark wake, sysdiagnose, panic.ips, DiagnosticReports, configuration profile, MDM profile, remote diagnostics over SSH.
a11y-audit
IncludedRun accessibility audits on web projects combining automated scanning (axe-core, Lighthouse) with WCAG 2.1 AA compliance mapping, manual check guidance, and structured reporting. Output is configurable: markdown report only, markdown plus machine-readable JSON, or markdown plus issue tracker integration. Use this skill whenever the user mentions "accessibility audit", "a11y audit", "WCAG audit", "accessibility check", "compliance scan", or asks to check a web project for accessibility issues. Also trigger when the user wants to verify WCAG conformance or map findings to a specific standard (CAN-ASC-6.2, EN 301 549, ADA/AODA).
erpclaw
IncludedAI-native ERP system with self-extending OS. Full accounting, invoicing, inventory, purchasing, tax, billing, HR, payroll, advanced accounting (ASC 606/842, intercompany, consolidation), and financial reporting. 413 actions across 14 domains, 43 expansion modules. Constitutional guardrails, adversarial audit, schema migration. Double-entry GL, immutable audit trail, US GAAP.
assess
IncludedAssesses and rates quality 0-10 across multiple dimensions (correctness, maintainability, security, performance, testability, simplicity) with pros/cons analysis. Compares against project conventions and prior decisions from memory. Produces structured evaluation reports with actionable improvement suggestions. Use when evaluating code, designs, architectures, or comparing alternative approaches.
spring-boot-security-jwt
IncludedProvides JWT authentication and authorization patterns for Spring Boot 3.5.x covering token generation with JJWT, Bearer/cookie authentication, database/OAuth2 integration, and RBAC/permission-based access control using Spring Security 6.x. Use when implementing authentication or authorization in Spring Boot applications.
code-hardcode-audit
IncludedDetect hardcoded values, magic numbers, and leaked secrets. TRIGGERS - hardcode audit, magic numbers, PLR2004, secret scanning.