
implementing-api-key-security-controls


Implements secure API key generation, storage, rotation, and revocation controls to protect API authentication credentials from leakage, brute force, and abuse. The engineer designs API key formats with sufficient entropy, implements secure hashing for storage, enforces per-key scoping and rate limiting, monitors for leaked keys in public repositories, and builds key rotation workflows. Activates for requests involving API key management, API key security, key rotation policy, or API credential protection.

Backend & APIs: api-security, api-keys, credential-management, key-rotation, secret-management, scripts

What this skill does

# Implementing API Key Security Controls

## When to Use

- Designing secure API key generation with sufficient entropy and identifiable prefixes for leak detection
- Implementing server-side API key hashing (never storing keys in plaintext) with SHA-256 or bcrypt
- Building key rotation workflows that allow zero-downtime key replacement for API consumers
- Configuring per-key scoping to limit each API key to specific endpoints, IP ranges, and rate limits
- Setting up automated monitoring for API key leakage in GitHub repos, logs, and client-side code

**Do not use** API keys as the sole authentication mechanism for user-facing applications. API keys are best suited for server-to-server communication and developer access.
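Identifiable prefixes pay off when scanning for leaks, because a key-shaped string can be found with a simple pattern. The following is a minimal sketch, not a replacement for GitHub secret scanning, truffleHog, or gitleaks. It assumes the `sk_`/`pk_` plus `live_`/`test_` prefixes used later in this skill and a 43-character base64url body, which is what `secrets.token_urlsafe(32)` produces. The names `API_KEY_PATTERN` and `find_leaked_keys` are hypothetical.

```python
import re

# Assumed key shape: one of the four prefixes used in this skill, followed by
# the 43 URL-safe characters that secrets.token_urlsafe(32) produces.
API_KEY_PATTERN = re.compile(r"\b(?:sk|pk)_(?:live|test)_[A-Za-z0-9_-]{43}")


def find_leaked_keys(text):
    """Return (line_number, redacted_key) pairs for every key-shaped string."""
    findings = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        for match in API_KEY_PATTERN.finditer(line):
            key = match.group(0)
            # Redact before reporting so the scanner's own output is not a leak
            findings.append((line_number, key[:12] + "..."))
    return findings
```

Running this over commit diffs or log lines gives a cheap first pass; a dedicated scanner remains the better choice for full repository history.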

## Prerequisites

- Secure random number generator (os.urandom, secrets module) for key generation
- Database with proper encryption at rest for storing hashed API keys
- Redis or similar store for key-to-metadata caching and rate limiting
- Secret scanning tools (GitHub secret scanning, truffleHog, gitleaks)
- Monitoring and alerting infrastructure for key usage anomalies
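To illustrate the rate-limiting prerequisite, here is a minimal fixed-window counter. It is an in-memory stand-in for the increment-and-expire counter a Redis-backed limiter would typically use, so it only works within a single process. The class name `FixedWindowLimiter` and the injectable `clock` parameter are assumptions made for this sketch; the `requests` and `window` arguments mirror the `rate_limit` metadata used in Step 1.

```python
import time


class FixedWindowLimiter:
    """In-memory fixed-window rate limiter (single process only)."""

    def __init__(self, clock=time.time):
        self._clock = clock    # injectable so the limiter can be tested
        self._counters = {}    # key_hash -> (window_index, request_count)

    def allow(self, key_hash, requests, window):
        """Return True if this request fits in the current window's budget."""
        window_index = int(self._clock() // window)
        stored_index, count = self._counters.get(key_hash, (window_index, 0))
        if stored_index != window_index:
            count = 0  # a new window has started, so the budget resets
        if count >= requests:
            return False
        self._counters[key_hash] = (window_index, count + 1)
        return True
```

A fixed window is the simplest scheme but allows a burst of up to twice the budget across a window boundary; a sliding window or token bucket smooths that out at the cost of more state.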

## Workflow

### Step 1: Secure API Key Generation

```python
import secrets
import hashlib
import hmac
import time
import json
from datetime import datetime, timedelta

class APIKeyManager:
    """Manages secure API key lifecycle: generation, storage, validation, rotation."""

    # Key format: prefix_base64random (e.g., sk_live_a1b2c3d4e5f6...)
    # Prefix identifies the key type and environment for leak detection
    KEY_PREFIXES = {
        "live_secret": "sk_live_",
        "test_secret": "sk_test_",
        "live_public": "pk_live_",
        "test_public": "pk_test_",
    }

    def __init__(self, db_connection, redis_connection):
        self.db = db_connection
        self.redis = redis_connection

    def generate_key(self, key_type="live_secret", owner_id=None, scopes=None,
                     rate_limit=None, ip_allowlist=None, expires_days=365):
        """Generate a new API key with metadata."""
        prefix = self.KEY_PREFIXES.get(key_type, "sk_live_")

        # Generate 32 bytes (256 bits) of randomness, base64url-encoded (43 characters)
        key_body = secrets.token_urlsafe(32)

        # Full API key that the client receives (shown only once)
        full_key = f"{prefix}{key_body}"

        # Hash the key for storage (never store the raw key)
        key_hash = hashlib.sha256(full_key.encode()).hexdigest()

        # Create a short key ID for reference (first 8 chars)
        key_id = f"{prefix}{key_body[:8]}..."

        # Store the hashed key with metadata
        key_metadata = {
            "key_hash": key_hash,
            "key_id": key_id,
            "key_type": key_type,
            "owner_id": owner_id,
            "scopes": scopes or ["read"],
            "rate_limit": rate_limit or {"requests": 1000, "window": 3600},
            "ip_allowlist": ip_allowlist or [],
            "created_at": datetime.utcnow().isoformat(),
            "expires_at": (datetime.utcnow() + timedelta(days=expires_days)).isoformat(),
            "last_used": None,
            "is_active": True,
            "usage_count": 0,
        }

        # Store in database
        self.db.execute(
            "INSERT INTO api_keys (key_hash, key_id, metadata) VALUES (?, ?, ?)",
            (key_hash, key_id, json.dumps(key_metadata))
        )

        # Cache in Redis for fast validation
        self.redis.setex(
            f"apikey:{key_hash}",
            86400,  # 24-hour cache TTL
            json.dumps(key_metadata)
        )

        return {
            "api_key": full_key,       # Show to user ONCE
            "key_id": key_id,          # For reference/management
            "scopes": key_metadata["scopes"],
            "expires_at": key_metadata["expires_at"],
        }

    def validate_key(self, api_key):
        """Validate an API key and return its metadata."""
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        # Check Redis cache first
        cached = self.redis.get(f"apikey:{key_hash}")
        if cached:
            metadata = json.loads(cached)
        else:
            # Fall back to database
            row = self.db.execute(
                "SELECT metadata FROM api_keys WHERE key_hash = ?",
                (key_hash,)
            ).fetchone()
            if not row:
                return None, "invalid_key"
            metadata = json.loads(row[0])
            # Refresh cache
            self.redis.setex(f"apikey:{key_hash}", 86400, row[0])

        # Validation checks
        if not metadata.get("is_active"):
            return None, "key_revoked"

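        if metadata.get("expires_at"):
            if datetime.fromisoformat(metadata["expires_at"]) < datetime.utcnow():
                return None, "key_expired"

        # Honor the grace-period deadline that rotate_key() records for the old key
        scheduled = metadata.get("scheduled_revocation")
        if scheduled and datetime.fromisoformat(scheduled) < datetime.utcnow():
            return None, "key_revoked"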

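        # Update usage stats in the cache only: they are not persisted to the
        # database here, and this read-modify-write is not atomic under concurrency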
        metadata["last_used"] = datetime.utcnow().isoformat()
        metadata["usage_count"] = metadata.get("usage_count", 0) + 1
        self.redis.setex(f"apikey:{key_hash}", 86400, json.dumps(metadata))

        return metadata, "valid"

    def revoke_key(self, key_id):
        """Immediately revoke an API key."""
        row = self.db.execute(
            "SELECT key_hash, metadata FROM api_keys WHERE key_id = ?",
            (key_id,)
        ).fetchone()
        if row:
            key_hash = row[0]
            metadata = json.loads(row[1])
            metadata["is_active"] = False
            metadata["revoked_at"] = datetime.utcnow().isoformat()

            self.db.execute(
                "UPDATE api_keys SET metadata = ? WHERE key_id = ?",
                (json.dumps(metadata), key_id)
            )
            # Invalidate cache immediately
            self.redis.delete(f"apikey:{key_hash}")
            return True
        return False

    def rotate_key(self, old_key_id, grace_period_hours=24):
        """Rotate an API key with a grace period where both old and new keys work."""
        old_row = self.db.execute(
            "SELECT key_hash, metadata FROM api_keys WHERE key_id = ?",
            (old_key_id,)
        ).fetchone()
        if not old_row:
            return None, "key_not_found"

        old_metadata = json.loads(old_row[1])

        # Generate new key with same settings
        new_key_data = self.generate_key(
            key_type=old_metadata["key_type"],
            owner_id=old_metadata["owner_id"],
            scopes=old_metadata["scopes"],
            rate_limit=old_metadata["rate_limit"],
            ip_allowlist=old_metadata["ip_allowlist"],
        )

        # Schedule old key revocation after grace period
        revoke_at = datetime.utcnow() + timedelta(hours=grace_period_hours)
        old_metadata["scheduled_revocation"] = revoke_at.isoformat()
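        self.db.execute(
            "UPDATE api_keys SET metadata = ? WHERE key_id = ?",
            (json.dumps(old_metadata), old_key_id)
        )
        # Drop the cached copy so validation reloads the updated metadata from the database
        self.redis.delete(f"apikey:{old_row[0]}")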

        return {
            "new_key": new_key_data,
            "old_key_id": old_key_id,
            "old_key_revokes_at": revoke_at.isoformat(),
            "message": f"Old key will be revoked in {grace_period_hours} hours"
        }, "success"
```
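The `validate_key()` method above returns the stored `scopes` and `ip_allowlist` but does not itself enforce them. A minimal sketch of that enforcement follows, assuming allowlist entries are single addresses or CIDR ranges and that an empty allowlist means no IP restriction. The function name `check_request_allowed` and the reason strings are hypothetical.

```python
import ipaddress


def check_request_allowed(metadata, client_ip, required_scopes=None):
    """Return (allowed, reason) for a request made with an already-validated key."""
    allowlist = metadata.get("ip_allowlist") or []
    if allowlist:
        address = ipaddress.ip_address(client_ip)
        # strict=False accepts ranges written with host bits set, e.g. "10.0.0.5/8"
        networks = [ipaddress.ip_network(entry, strict=False) for entry in allowlist]
        if not any(address in network for network in networks):
            return False, "ip_not_allowed"

    # Every required scope must be present on the key
    granted = set(metadata.get("scopes") or [])
    missing = set(required_scopes or []) - granted
    if missing:
        return False, "insufficient_scope"
    return True, "ok"
```

When the service sits behind a proxy or load balancer, `client_ip` should come from a trusted source rather than from a header the client can set freely.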

### Step 2: API Key Validation Middleware

```python
from flask import Flask, request, jsonify, g
from functools import wraps

app = Flask(__name__)

def require_api_key(required_scopes=None):
    """Middleware to validate API key and check scopes."""
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Extract API key from header
            api_key = request.headers.get("X-API-Key")
            if not api_key:
                # Also check Authorization: Bearer <key>
                auth_header = request.headers.get("Authorization", "")
                if auth_header.startswith("Bearer "):
                    api_key = auth_header[7:]

            if not api_key:
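                return jsonify({"error": "missing_api_key"}), 401

            # NOTE: the source is truncated at this point. The rest of this
            # function is an assumed completion; `key_manager` is a hypothetical
            # module-level APIKeyManager instance, not shown in the original.
            metadata, status = key_manager.validate_key(api_key)
            if metadata is None:
                return jsonify({"error": status}), 401

            # Reject keys that lack any of the scopes this route requires
            missing = set(required_scopes or []) - set(metadata.get("scopes", []))
            if missing:
                return jsonify({"error": "insufficient_scope"}), 403

            # Expose the key's metadata to the view through Flask's request context
            g.api_key = metadata
            return f(*args, **kwargs)
        return wrapped
    return decorator
```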
