# Detecting Anomalous Authentication Patterns
Detects anomalous authentication patterns using UEBA analytics, statistical baselines, and machine learning models to identify impossible travel, credential stuffing, brute force, password spraying, and compromised account behaviors across authentication logs. Activates for requests involving authentication anomaly detection, login behavior analysis, UEBA implementation, or suspicious sign-in investigation.
## When to Use
- Security operations needs to identify compromised accounts from authentication log analysis
- Implementing impossible travel detection to flag geographically inconsistent logins
- Detecting brute force, password spraying, and credential stuffing attacks in real time
- Building behavioral baselines for users to identify deviations indicating account compromise
- Correlating authentication anomalies with threat intelligence for lateral movement detection
- Investigating alerts from SIEM or IdP for suspicious sign-in activity
**Do not use** for static rule-based alerting on single failed logins; anomaly detection requires statistical baselines across time and entity dimensions to reduce false positives.
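To make the point about baselines concrete, here is a minimal sketch of a per-user baseline check. Everything in it is an illustrative assumption rather than part of this skill: the function name `failure_zscores`, the column names, the sample counts, the z-score threshold of 3, and the floor of 1.0 on the standard deviation.

```python
import pandas as pd


def failure_zscores(history: pd.DataFrame, today: pd.DataFrame) -> pd.DataFrame:
    """Score today's failed-login count against each user's own history.

    Both frames are assumed to have `user` and `failures` columns
    (hypothetical schema, one row per user per day).
    """
    stats = history.groupby("user")["failures"].agg(["mean", "std"]).reset_index()
    scored = today.merge(stats, on="user", how="left")
    # Floor the standard deviation so very quiet baselines do not
    # produce enormous scores from a single extra failure.
    scored["z"] = (scored["failures"] - scored["mean"]) / scored["std"].clip(lower=1.0)
    return scored


# Made-up sample data: alice rarely fails, bob's service account fails often.
history = pd.DataFrame({
    "user": ["alice"] * 5 + ["bob"] * 5,
    "failures": [0, 1, 0, 2, 1, 10, 12, 9, 11, 13],
})
today = pd.DataFrame({"user": ["alice", "bob"], "failures": [12, 12]})

scored = failure_zscores(history, today)
flagged = scored[scored["z"] > 3.0]["user"].tolist()
print(flagged)
```

Both users have 12 failures today, so a static rule such as "more than 10 failures" would alert on both. Against their own baselines, only alice's count is unusual.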
## Prerequisites
- Authentication log sources (Azure AD/Entra ID sign-in logs, Okta system logs, Active Directory event logs 4624/4625/4648/4768/4771)
- SIEM platform (Splunk, Microsoft Sentinel, Elastic SIEM) with at least 90 days of baseline data
- GeoIP database for location-based anomaly detection (MaxMind GeoLite2 or IP2Location)
- Python 3.9+ with pandas, scikit-learn, and scipy for custom analytics, plus the geoip2 package for GeoIP lookups
- User identity context (department, role, typical work hours, location)
## Workflow
### Step 1: Collect and Normalize Authentication Logs
Aggregate authentication events from all identity sources:
```python
import json
from collections import defaultdict
from datetime import datetime, timedelta

import geoip2.database
import geoip2.errors
import pandas as pd


# Parse authentication logs from multiple sources
def normalize_auth_logs(log_source, raw_logs):
    """Normalize authentication events from one source to a common schema."""
    normalized = []
    for event in raw_logs:
        if log_source == "azure_ad":
            # Nested objects may be absent or null, so fall back to empty values.
            loc = event.get("location") or {}
            coords = loc.get("geoCoordinates") or {}
            device = event.get("deviceDetail") or {}
            auth_details = event.get("authenticationDetails") or [{}]
            normalized.append({
                "timestamp": event["createdDateTime"],
                "user": event["userPrincipalName"],
                "source_ip": event["ipAddress"],
                "location": {
                    "city": loc.get("city"),
                    "state": loc.get("state"),
                    "country": loc.get("countryOrRegion"),
                    "lat": coords.get("latitude"),
                    "lon": coords.get("longitude"),
                },
                "result": "success" if event["status"]["errorCode"] == 0 else "failure",
                "failure_reason": event["status"].get("failureReason", ""),
                "app": event.get("appDisplayName", "Unknown"),
                "device": device.get("operatingSystem", "Unknown"),
                "browser": device.get("browser", "Unknown"),
                "mfa_result": auth_details[0].get("succeeded"),
                "risk_level": event.get("riskLevelDuringSignIn", "none"),
                "client_app": event.get("clientAppUsed", "Unknown"),
                "source": "azure_ad",
            })
        elif log_source == "okta":
            client = event["client"]
            geo = client.get("geographicalContext") or {}
            point = geo.get("geolocation") or {}
            user_agent = client.get("userAgent") or {}
            targets = event.get("target") or [{}]
            normalized.append({
                "timestamp": event["published"],
                "user": event["actor"]["alternateId"],
                "source_ip": client["ipAddress"],
                "location": {
                    "city": geo.get("city"),
                    "state": geo.get("state"),
                    "country": geo.get("country"),
                    "lat": point.get("lat"),
                    "lon": point.get("lon"),
                },
                "result": "success" if event["outcome"]["result"] == "SUCCESS" else "failure",
                "failure_reason": event["outcome"].get("reason", ""),
                "app": targets[0].get("displayName", "Unknown"),
                "device": client.get("device", "Unknown"),
                "browser": user_agent.get("browser", "Unknown"),
                "source": "okta",
            })
        elif log_source == "windows_ad":
            # Every event ID other than 4624/4648 is treated as a failure.
            # 4768 can also record a successful ticket request, so inspect
            # its status field separately if those events are included.
            normalized.append({
                "timestamp": event["TimeCreated"],
                "user": event["TargetUserName"],
                "source_ip": event.get("IpAddress", ""),
                "location": None,  # Requires GeoIP enrichment
                "result": "success" if event["EventId"] in [4624, 4648] else "failure",
                "failure_reason": event.get("FailureReason", ""),
                "logon_type": event.get("LogonType", ""),
                "source": "windows_ad",
            })
        else:
            raise ValueError(f"Unsupported log source: {log_source}")
    return pd.DataFrame(normalized)


# Enrich with GeoIP data for Windows AD logs missing location
def enrich_geoip(df, geoip_db_path="/opt/geoip/GeoLite2-City.mmdb"):
    """Add geolocation data to events missing location information."""
    with geoip2.database.Reader(geoip_db_path) as reader:
        for idx, row in df.iterrows():
            if row["location"] is None and row["source_ip"]:
                try:
                    response = reader.city(row["source_ip"])
                except (geoip2.errors.AddressNotFoundError, ValueError):
                    # Private, unlisted, or malformed addresses stay unenriched.
                    continue
                df.at[idx, "location"] = {
                    "city": response.city.name,
                    "country": response.country.iso_code,
                    "lat": response.location.latitude,
                    "lon": response.location.longitude,
                }
    return df
```
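Each source reports timestamps as strings in its own format, and later steps sort and subtract them, so it can help to convert them to timezone-aware UTC values before combining sources. The following is a minimal sketch of that idea. The helper name `to_utc` and the sample rows are made up for illustration, and each source is parsed separately on the assumption that its timestamp format is consistent within the source.

```python
import pandas as pd


def to_utc(df: pd.DataFrame, column: str = "timestamp") -> pd.DataFrame:
    """Return a copy of df with `column` parsed as timezone-aware UTC datetimes."""
    out = df.copy()
    out[column] = pd.to_datetime(out[column], utc=True)
    return out


# Made-up events in the normalized schema (only the columns needed here).
azure = pd.DataFrame({
    "timestamp": ["2024-03-01T10:15:00Z", "2024-03-01T08:00:00Z"],
    "user": ["alice@example.com", "alice@example.com"],
    "source": ["azure_ad", "azure_ad"],
})
okta = pd.DataFrame({
    "timestamp": ["2024-03-01T09:30:00.000Z"],
    "user": ["alice@example.com"],
    "source": ["okta"],
})
windows = pd.DataFrame({
    "timestamp": ["2024-03-01T04:45:00-05:00"],  # 09:45 UTC
    "user": ["alice@example.com"],
    "source": ["windows_ad"],
})

# Parse per source, then merge into one chronologically ordered frame.
combined = (
    pd.concat([to_utc(azure), to_utc(okta), to_utc(windows)], ignore_index=True)
    .sort_values(["user", "timestamp"])
    .reset_index(drop=True)
)
print(combined[["timestamp", "source"]])
```

Sorting the raw strings instead would compare them character by character, which places the Windows event with its `-05:00` offset first even though it occurred after two of the others.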
### Step 2: Detect Impossible Travel Anomalies
Identify logins from geographically impossible locations:
```python
from math import radians, sin, cos, sqrt, atan2

import pandas as pd


def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate great-circle distance between two points in km."""
    R = 6371  # Earth's radius in kilometers
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c


def detect_impossible_travel(df, max_speed_kmh=900):
    """
    Detect impossible travel events where a user authenticates from
    two locations faster than physically possible.
    max_speed_kmh: Maximum realistic travel speed (900 km/h ~= commercial flight)
    """
    alerts = []
    # Sort by user and timestamp
    df_sorted = df.sort_values(["user", "timestamp"])
    for user, user_events in df_sorted.groupby("user"):
        successful_events = user_events[user_events["result"] == "success"]
        for i in range(1, len(successful_events)):
            prev = successful_events.iloc[i - 1]
            curr = successful_events.iloc[i]
            # Skip if location data is missing
            prev_loc, curr_loc = prev["location"], curr["location"]
            if not isinstance(prev_loc, dict) or not isinstance(curr_loc, dict):
                continue
            coords = (prev_loc.get("lat"), prev_loc.get("lon"),
                      curr_loc.get("lat"), curr_loc.get("lon"))
            # Compare against None so that a valid coordinate of 0.0 is kept
            if any(value is None for value in coords):
                continue
            # Calculate distance and time delta (in hours)
            distance_km = haversine_distance(*coords)
            time_diff = (pd.Timestamp(curr["timestamp"]) -
                         pd.Timestamp(prev["timestamp"])).total_seconds() / 3600
            if time_diff <= 0:
                continue
            required_speed = distance_km / time_diff
            # Flag if required speed exceeds maximum realistic travel
            if required_speed > max_speed_kmh and distance_km > 100:
                alerts.append({
                    "alert_type": "IMPOSSIBLE_TRAVEL",
                    "severity": "HIGH",