Claude
Skills
Sign in
Back

implementing-api-rate-limiting-and-throttling

Included with Lifetime
$97 forever

Implements API rate limiting and throttling controls using token bucket, sliding window, and fixed window algorithms to protect against brute force attacks, credential stuffing, resource exhaustion, and API abuse. The engineer configures per-user, per-IP, and per-endpoint rate limits using Redis-backed counters, API gateway plugins, or application middleware, and implements proper HTTP 429 responses with Retry-After headers. Activates for requests involving rate limiting implementation, API throttling setup, request quota management, or API abuse prevention.

Backend & APIsapi-securityrate-limitingthrottlingredistoken-bucketabuse-preventionscripts

What this skill does

# Implementing API Rate Limiting and Throttling

## When to Use

- Protecting authentication endpoints against brute force and credential stuffing attacks
- Preventing API abuse and resource exhaustion from automated scripts and bots
- Implementing fair usage quotas for different API consumer tiers (free, premium, enterprise)
- Defending against denial-of-service attacks at the application layer
- Meeting compliance requirements that mandate API abuse prevention controls

**Do not use** rate limiting as the sole defense against attacks. Combine with authentication, authorization, and WAF rules.

## Prerequisites

- Redis 6.0+ for distributed rate limit counters (or in-memory for single-instance deployments)
- API framework (Express.js, FastAPI, Spring Boot, or Django REST Framework)
- Monitoring system for rate limit metrics (Prometheus, CloudWatch, Datadog)
- Understanding of the API's normal traffic patterns and peak usage
- Load testing tool (k6, Gatling, or Locust) for validating rate limit behavior

## Workflow

### Step 1: Rate Limiting Strategy Design

Define rate limits per endpoint category and user tier:

```python
# Rate limit configuration
RATE_LIMITS = {
    # Authentication endpoints (most restrictive)
    "auth": {
        "login": {"requests": 5, "window_seconds": 60, "by": "ip"},
        "register": {"requests": 3, "window_seconds": 300, "by": "ip"},
        "forgot_password": {"requests": 3, "window_seconds": 3600, "by": "ip"},
        "verify_mfa": {"requests": 5, "window_seconds": 300, "by": "user"},
    },
    # Standard API endpoints
    "api": {
        "free": {"requests": 60, "window_seconds": 60, "by": "user"},
        "premium": {"requests": 300, "window_seconds": 60, "by": "user"},
        "enterprise": {"requests": 1000, "window_seconds": 60, "by": "user"},
    },
    # Resource-intensive endpoints
    "expensive": {
        "search": {"requests": 10, "window_seconds": 60, "by": "user"},
        "export": {"requests": 5, "window_seconds": 3600, "by": "user"},
        "bulk_import": {"requests": 2, "window_seconds": 3600, "by": "user"},
    },
    # Global limits
    "global": {
        "per_ip": {"requests": 1000, "window_seconds": 60, "by": "ip"},
        "per_user": {"requests": 5000, "window_seconds": 3600, "by": "user"},
    },
}
```

### Step 2: Sliding Window Rate Limiter (Redis)

```python
import redis
import time
import hashlib
from functools import wraps
from flask import Flask, request, jsonify, g

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class SlidingWindowRateLimiter:
    """Sliding window rate limiter using Redis sorted sets."""

    def __init__(self, redis_conn):
        self.redis = redis_conn

    def is_allowed(self, key, max_requests, window_seconds):
        """Check if request is allowed and record it."""
        now = time.time()
        window_start = now - window_seconds
        pipe = self.redis.pipeline()

        # Remove expired entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests in current window
        pipe.zcard(key)
        # Add current request
        pipe.zadd(key, {f"{now}:{hashlib.md5(str(now).encode()).hexdigest()[:8]}": now})
        # Set TTL on the key
        pipe.expire(key, window_seconds + 1)

        results = pipe.execute()
        current_count = results[1]

        if current_count >= max_requests:
            # Calculate retry-after
            oldest = self.redis.zrange(key, 0, 0, withscores=True)
            if oldest:
                retry_after = int(oldest[0][1] + window_seconds - now) + 1
            else:
                retry_after = window_seconds
            return False, current_count, max_requests, retry_after

        return True, current_count + 1, max_requests, 0

rate_limiter = SlidingWindowRateLimiter(redis_client)

def rate_limit(max_requests, window_seconds, key_func=None):
    """Decorator for rate limiting API endpoints."""
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # Determine the rate limit key
            if key_func:
                identifier = key_func()
            elif hasattr(g, 'user_id'):
                identifier = f"user:{g.user_id}"
            else:
                identifier = f"ip:{request.remote_addr}"

            key = f"ratelimit:{request.endpoint}:{identifier}"
            allowed, current, limit, retry_after = rate_limiter.is_allowed(
                key, max_requests, window_seconds)

            # Always set rate limit headers
            headers = {
                "X-RateLimit-Limit": str(limit),
                "X-RateLimit-Remaining": str(max(0, limit - current)),
                "X-RateLimit-Reset": str(int(time.time()) + window_seconds),
            }

            if not allowed:
                headers["Retry-After"] = str(retry_after)
                response = jsonify({
                    "error": "rate_limit_exceeded",
                    "message": "Too many requests. Please try again later.",
                    "retry_after": retry_after
                })
                response.status_code = 429
                for h, v in headers.items():
                    response.headers[h] = v
                return response

            response = f(*args, **kwargs)
            for h, v in headers.items():
                response.headers[h] = v
            return response
        return wrapped
    return decorator

# Apply rate limiting to endpoints
@app.route('/api/v1/auth/login', methods=['POST'])
@rate_limit(max_requests=5, window_seconds=60,
            key_func=lambda: f"ip:{request.remote_addr}")
def login():
    # Login logic
    return jsonify({"message": "Login successful"})

@app.route('/api/v1/users/me', methods=['GET'])
@rate_limit(max_requests=60, window_seconds=60)
def get_profile():
    # Profile logic
    return jsonify({"user": "data"})

@app.route('/api/v1/search', methods=['GET'])
@rate_limit(max_requests=10, window_seconds=60)
def search():
    # Search logic
    return jsonify({"results": []})
```

### Step 3: Token Bucket Rate Limiter

```python
import redis
import time

class TokenBucketRateLimiter:
    """Token bucket rate limiter allowing burst traffic within limits."""

    def __init__(self, redis_conn):
        self.redis = redis_conn

    def is_allowed(self, key, max_tokens, refill_rate, refill_interval=1):
        """
        Token bucket algorithm:
        - max_tokens: Maximum burst capacity
        - refill_rate: Tokens added per refill_interval
        - refill_interval: Seconds between refills
        """
        now = time.time()
        bucket_key = f"tb:{key}"

        # Lua script for atomic token bucket operation
        lua_script = """
        local key = KEYS[1]
        local max_tokens = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local refill_interval = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1])
        local last_refill = tonumber(bucket[2])

        if tokens == nil then
            tokens = max_tokens
            last_refill = now
        end

        -- Refill tokens
        local elapsed = now - last_refill
        local refills = math.floor(elapsed / refill_interval)
        if refills > 0 then
            tokens = math.min(max_tokens, tokens + (refills * refill_rate))
            last_refill = last_refill + (refills * refill_interval)
        end

        local allowed = 0
        if tokens >= 1 then
            tokens = tokens - 1
            allowed = 1
        end

        redis.call('hmset', key, 'tokens', tokens, 'last_refill', last_refill)
        redis.call('expire', key, math.ceil(max_tokens / refill_rate * refill_interval) + 10)

        return {allowed, tokens, max_tokens}
        """

        result = self.redis.eval(lua_script, 1, bucket_key,
         

Related in Backend & APIs