Claude
Skills
Sign in
Back

implementing-stix-taxii-feed-integration

Included with Lifetime
$97 forever

STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence.

Securitythreat-intelligencectiiocmitre-attackstixtaxiifeed-integrationoasisscriptsassets

What this skill does

# Implementing STIX/TAXII Feed Integration

## Overview

STIX (Structured Threat Information eXpression) and TAXII (Trusted Automated eXchange of Intelligence Information) are OASIS open standards for representing and transporting cyber threat intelligence. This skill covers implementing a STIX/TAXII 2.1 feed consumer and producer using Python, configuring TAXII server discovery, collection management, polling for new intelligence, parsing STIX 2.1 objects, and integrating feeds into SIEM and TIP platforms.


## When to Use

- When deploying or configuring implementing stix taxii feed integration capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation

## Prerequisites

- Python 3.9+ with `taxii2-client`, `stix2`, `cti-taxii-client` libraries
- Understanding of STIX 2.1 data model (SDOs, SCOs, SROs)
- Understanding of TAXII 2.1 protocol (discovery, API roots, collections)
- Network access to TAXII servers (MITRE ATT&CK TAXII, Anomali STAXX)
- Optional: medallion for running a local TAXII 2.1 server

## Key Concepts

### TAXII 2.1 Architecture

TAXII defines a RESTful API with three service types:
- **Discovery**: Returns information about available API roots
- **API Root**: Contains collections and serves as the main interaction point
- **Collection**: A logical grouping of STIX objects accessible via GET/POST

### STIX 2.1 Object Model

STIX objects are categorized as:
- **SDOs (STIX Domain Objects)**: Indicator, Malware, Threat Actor, Campaign, Attack Pattern, Tool, Infrastructure, Vulnerability, Identity, Location, Note, Opinion, Report, Grouping
- **SCOs (STIX Cyber Observables)**: IPv4-Addr, Domain-Name, URL, File, Email-Addr, Process, Network-Traffic, Artifact
- **SROs (STIX Relationship Objects)**: Relationship, Sighting
- **Meta Objects**: Marking Definition (TLP), Language Content, Extension Definition

### STIX Bundle

A Bundle is a collection of STIX objects transmitted together. Bundles have a unique ID and contain an array of objects. TAXII collections serve bundles in response to GET requests.

## Workflow

### Step 1: TAXII Server Discovery

```python
from taxii2client.v21 import Server, Collection, as_pages

# Connect to MITRE ATT&CK TAXII server
server = Server("https://cti-taxii.mitre.org/taxii2/", user="", password="")

print(f"Title: {server.title}")
print(f"Description: {server.description}")

# List API roots
for api_root in server.api_roots:
    print(f"\nAPI Root: {api_root.title}")
    print(f"  URL: {api_root.url}")

    # List collections
    for collection in api_root.collections:
        print(f"  Collection: {collection.title} (ID: {collection.id})")
        print(f"    Can Read: {collection.can_read}")
        print(f"    Can Write: {collection.can_write}")
```

### Step 2: Fetch STIX Objects from Collection

```python
from taxii2client.v21 import Collection, as_pages
import json

# Connect to Enterprise ATT&CK collection
ENTERPRISE_ATTACK_ID = "95ecc380-afe9-11e4-9b6c-751b66dd541e"
collection = Collection(
    f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/",
    user="",
    password="",
)

print(f"Collection: {collection.title}")

# Fetch all objects (paginated)
all_objects = []
for envelope in as_pages(collection.get_objects, per_request=50):
    objects = envelope.get("objects", [])
    all_objects.extend(objects)
    print(f"  Fetched {len(objects)} objects (total: {len(all_objects)})")

print(f"\nTotal objects retrieved: {len(all_objects)}")

# Categorize by type
type_counts = {}
for obj in all_objects:
    obj_type = obj.get("type", "unknown")
    type_counts[obj_type] = type_counts.get(obj_type, 0) + 1

for obj_type, count in sorted(type_counts.items()):
    print(f"  {obj_type}: {count}")
```

### Step 3: Parse STIX 2.1 Objects with stix2 Library

```python
from stix2 import parse, Filter, MemoryStore

# Load objects into a MemoryStore for querying
store = MemoryStore(stix_data=all_objects)

# Query for all indicators
indicators = store.query([Filter("type", "=", "indicator")])
print(f"Indicators: {len(indicators)}")

for ind in indicators[:5]:
    print(f"  {ind.name}: {ind.pattern}")

# Query for malware
malware_list = store.query([Filter("type", "=", "malware")])
print(f"\nMalware families: {len(malware_list)}")

# Query for threat actors
actors = store.query([Filter("type", "=", "intrusion-set")])
print(f"Threat actors: {len(actors)}")

# Find relationships for a specific object
def get_related(store, source_id):
    relationships = store.query([
        Filter("type", "=", "relationship"),
        Filter("source_ref", "=", source_id),
    ])
    return relationships

# Example: Get all techniques used by APT28
apt28 = store.query([
    Filter("type", "=", "intrusion-set"),
    Filter("name", "=", "APT28"),
])
if apt28:
    rels = get_related(store, apt28[0].id)
    for rel in rels:
        target = store.get(rel.target_ref)
        if target:
            print(f"  {rel.relationship_type} -> {target.name} ({target.type})")
```

### Step 4: Implement Custom TAXII Consumer

```python
from taxii2client.v21 import Collection, as_pages
from stix2 import parse, Bundle
from datetime import datetime, timedelta
import json

class TAXIIConsumer:
    """Consume STIX/TAXII 2.1 feeds and extract IOCs."""

    def __init__(self, collection_url, user="", password=""):
        self.collection = Collection(collection_url, user=user, password=password)
        self.last_poll = None

    def poll_new_objects(self, added_after=None):
        """Poll for objects added after a specific timestamp."""
        if added_after is None:
            added_after = (
                self.last_poll or
                (datetime.utcnow() - timedelta(days=1)).strftime(
                    "%Y-%m-%dT%H:%M:%S.000Z"
                )
            )

        all_objects = []
        kwargs = {"added_after": added_after}

        for envelope in as_pages(
            self.collection.get_objects, per_request=100, **kwargs
        ):
            objects = envelope.get("objects", [])
            all_objects.extend(objects)

        self.last_poll = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        return all_objects

    def extract_indicators(self, objects):
        """Extract actionable indicators from STIX objects."""
        indicators = []
        for obj in objects:
            if obj.get("type") == "indicator":
                indicators.append({
                    "id": obj.get("id"),
                    "name": obj.get("name", ""),
                    "pattern": obj.get("pattern", ""),
                    "pattern_type": obj.get("pattern_type", ""),
                    "valid_from": obj.get("valid_from", ""),
                    "valid_until": obj.get("valid_until", ""),
                    "indicator_types": obj.get("indicator_types", []),
                    "confidence": obj.get("confidence", 0),
                    "labels": obj.get("labels", []),
                })
        return indicators

    def extract_observables(self, objects):
        """Extract STIX Cyber Observables."""
        observables = []
        observable_types = {
            "ipv4-addr", "ipv6-addr", "domain-name", "url",
            "file", "email-addr", "network-traffic",
        }
        for obj in objects:
            if obj.get("type") in observable_types:
                observables.append({
                    "type": obj["type"],
                    "value": obj.get("value", ""),
                    "id": obj.get("id"),
                })
        return observables


# Usage
consumer = TAXIIConsumer(
    f"https://cti-taxii.mitre.org/stix/collections/{ENTERPRISE_ATTACK_ID}/"
)
new_objects = consumer.poll_new_objects()
indicators = consumer.extract_indicators(new_objects)
print(f"New indicators: {len(indicators)}")
```

### Step 5: Set 

Related in Security