Claude
Skills
Sign in
Back

detecting-anomalous-authentication-patterns

Included with Lifetime
$97 forever

Detects anomalous authentication patterns using UEBA analytics, statistical baselines, and machine learning models to identify impossible travel, credential stuffing, brute force, password spraying, and compromised account behaviors across authentication logs. Activates for requests involving authentication anomaly detection, login behavior analysis, UEBA implementation, or suspicious sign-in investigation.

Data & AnalyticsUEBAauthentication-anomalyimpossible-travelbrute-forcecredential-stuffingbehavioral-analyticsscripts

What this skill does


# Detecting Anomalous Authentication Patterns

## When to Use

- Security operations needs to identify compromised accounts from authentication log analysis
- Implementing impossible travel detection to flag geographically inconsistent logins
- Detecting brute force, password spraying, and credential stuffing attacks in real time
- Building behavioral baselines for users to identify deviations indicating account compromise
- Correlating authentication anomalies with threat intelligence for lateral movement detection
- Investigating alerts from SIEM or IdP for suspicious sign-in activity

**Do not use** for static rule-based alerting on single failed logins; anomaly detection requires statistical baselines across time and entity dimensions to reduce false positives.

## Prerequisites

- Authentication log sources (Azure AD/Entra ID sign-in logs, Okta system logs, Active Directory event logs 4624/4625/4648/4768/4771)
- SIEM platform (Splunk, Microsoft Sentinel, Elastic SIEM) with at least 90 days of baseline data
- GeoIP database for location-based anomaly detection (MaxMind GeoLite2 or IP2Location)
- Python 3.9+ with pandas, scikit-learn, and scipy for custom analytics
- User identity context (department, role, typical work hours, location)

## Workflow

### Step 1: Collect and Normalize Authentication Logs

Aggregate authentication events from all identity sources:

```python
import pandas as pd
import json
from datetime import datetime, timedelta
from collections import defaultdict

# Parse authentication logs from multiple sources
def normalize_auth_logs(log_source, raw_logs):
    """Normalize authentication events to a common schema."""
    normalized = []

    for event in raw_logs:
        if log_source == "azure_ad":
            normalized.append({
                "timestamp": event["createdDateTime"],
                "user": event["userPrincipalName"],
                "source_ip": event["ipAddress"],
                "location": {
                    "city": event.get("location", {}).get("city"),
                    "state": event.get("location", {}).get("state"),
                    "country": event.get("location", {}).get("countryOrRegion"),
                    "lat": event.get("location", {}).get("geoCoordinates", {}).get("latitude"),
                    "lon": event.get("location", {}).get("geoCoordinates", {}).get("longitude")
                },
                "result": "success" if event["status"]["errorCode"] == 0 else "failure",
                "failure_reason": event["status"].get("failureReason", ""),
                "app": event.get("appDisplayName", "Unknown"),
                "device": event.get("deviceDetail", {}).get("operatingSystem", "Unknown"),
                "browser": event.get("deviceDetail", {}).get("browser", "Unknown"),
                "mfa_result": event.get("authenticationDetails", [{}])[0].get("succeeded", None),
                "risk_level": event.get("riskLevelDuringSignIn", "none"),
                "client_app": event.get("clientAppUsed", "Unknown"),
                "source": "azure_ad"
            })
        elif log_source == "okta":
            normalized.append({
                "timestamp": event["published"],
                "user": event["actor"]["alternateId"],
                "source_ip": event["client"]["ipAddress"],
                "location": {
                    "city": event["client"].get("geographicalContext", {}).get("city"),
                    "state": event["client"].get("geographicalContext", {}).get("state"),
                    "country": event["client"].get("geographicalContext", {}).get("country"),
                    "lat": event["client"].get("geographicalContext", {}).get("geolocation", {}).get("lat"),
                    "lon": event["client"].get("geographicalContext", {}).get("geolocation", {}).get("lon")
                },
                "result": "success" if event["outcome"]["result"] == "SUCCESS" else "failure",
                "failure_reason": event["outcome"].get("reason", ""),
                "app": event.get("target", [{}])[0].get("displayName", "Unknown"),
                "device": event["client"].get("device", "Unknown"),
                "browser": event["client"].get("userAgent", {}).get("browser", "Unknown"),
                "source": "okta"
            })
        elif log_source == "windows_ad":
            normalized.append({
                "timestamp": event["TimeCreated"],
                "user": event["TargetUserName"],
                "source_ip": event.get("IpAddress", ""),
                "location": None,  # Requires GeoIP enrichment
                "result": "success" if event["EventId"] in [4624, 4648] else "failure",
                "failure_reason": event.get("FailureReason", ""),
                "logon_type": event.get("LogonType", ""),
                "source": "windows_ad"
            })

    return pd.DataFrame(normalized)

# Enrich with GeoIP data for Windows AD logs missing location
import geoip2.database

def enrich_geoip(df, geoip_db_path="/opt/geoip/GeoLite2-City.mmdb"):
    """Add geolocation data to events missing location information."""
    reader = geoip2.database.Reader(geoip_db_path)

    for idx, row in df.iterrows():
        if row["location"] is None and row["source_ip"]:
            try:
                response = reader.city(row["source_ip"])
                df.at[idx, "location"] = {
                    "city": response.city.name,
                    "country": response.country.iso_code,
                    "lat": response.location.latitude,
                    "lon": response.location.longitude
                }
            except Exception:
                pass

    reader.close()
    return df
```

### Step 2: Detect Impossible Travel Anomalies

Identify logins from geographically impossible locations:

```python
from math import radians, sin, cos, sqrt, atan2
from datetime import datetime

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate great-circle distance between two points in km."""
    R = 6371  # Earth's radius in kilometers

    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return R * c

def detect_impossible_travel(df, max_speed_kmh=900):
    """
    Detect impossible travel events where a user authenticates from
    two locations faster than physically possible.

    max_speed_kmh: Maximum realistic travel speed (900 km/h ~= commercial flight)
    """
    alerts = []

    # Sort by user and timestamp
    df_sorted = df.sort_values(["user", "timestamp"])

    for user, user_events in df_sorted.groupby("user"):
        successful_events = user_events[user_events["result"] == "success"]

        for i in range(1, len(successful_events)):
            prev = successful_events.iloc[i-1]
            curr = successful_events.iloc[i]

            # Skip if location data is missing
            if not prev.get("location") or not curr.get("location"):
                continue
            if not prev["location"].get("lat") or not curr["location"].get("lat"):
                continue

            # Calculate distance and time delta
            distance_km = haversine_distance(
                prev["location"]["lat"], prev["location"]["lon"],
                curr["location"]["lat"], curr["location"]["lon"]
            )

            time_diff = (pd.Timestamp(curr["timestamp"]) -
                        pd.Timestamp(prev["timestamp"])).total_seconds() / 3600

            if time_diff <= 0:
                continue

            required_speed = distance_km / time_diff

            # Flag if required speed exceeds maximum realistic travel
            if required_speed > max_speed_kmh and distance_km > 100:
                alerts.append({
                    "alert_type": "IMPOSSIBLE_TRAVEL",
                    "severity": "HIGH",
                   

Related in Data & Analytics