Distributed Denial Of Service in Fastapi with Jwt Tokens
Distributed Denial Of Service in Fastapi with Jwt Tokens — how this specific combination creates or exposes the vulnerability
When Fastapi applications use JWT tokens for authentication, certain DDoS vectors become more nuanced because both protocol-layer floods and application-layer authentication costs interact. A common pattern is validating and decoding JWTs on every request, often with synchronous libraries or blocking calls to retrieve public keys or introspect token metadata. Under high request rates, CPU-bound verification (e.g., RSA verification) and Python’s Global Interpreter Lock can saturate worker processes, causing request queueing and increased latency. This is especially true when token validation is performed per-route rather than at a shared authentication layer, multiplying the compute cost across endpoints.
Another vector specific to JWT usage is token enumeration and rejection overhead. If endpoints accept malformed or unsigned tokens and trigger expensive exception handling or fallback logic, attackers can craft many unique tokens to force repeated decoding failures, consuming Python exception resources and event loop time. Similarly, endpoints that decode payloads and then re-encode or transform tokens can double the processing per request. Because Fastapi relies on async views for concurrency, blocking operations in token validation (such as synchronous cryptography or filesystem reads for key rotation) prevent the async runtime from efficiently servicing other requests, effectively reducing concurrency under load.
Moreover, if token validation depends on network calls (e.g., fetching JWKS from a remote HTTPS endpoint), DDoS amplification can occur: an attacker sends many requests with different tokens, each triggering an outbound HTTP fetch. This consumes thread or async connection pools and can exhaust upstream rate limits or network sockets, degrading availability for legitimate users. In a microservice chain where Fastapi services call each other with JWTs, a flood of authenticated requests can cascade latency across downstream services, compounding the denial-of-service effect.
These risks are detectable by security scans that test authentication endpoints under load and inspect token validation paths. middleBrick’s Authentication and Rate Limiting checks, for example, evaluate whether token validation is consistently applied and whether rate limits protect authentication-sensitive routes. The LLM/AI Security checks also probe for endpoints that may leak system information through timing differences or error messages when invalid JWTs are presented, which can aid attacker reconnaissance for resource-exhaustion attacks.
Because OpenAPI/Swagger specs often document JWT usage via securitySchemes (e.g., type: http, scheme: bearer), scanning can cross-reference runtime behavior against spec definitions to ensure token validation does not introduce unexpected synchronous work or missing protections. This helps teams identify whether authentication routes or token-intense endpoints are disproportionately costly, providing data to prioritize mitigation before an in-the-wild DDoS event.
Jwt Tokens-Specific Remediation in Fastapi — concrete code fixes
To reduce DDoS risk while keeping JWT authentication, minimize synchronous and blocking work in the token path. Prefer lightweight decoding, cache resolved public keys, and enforce rate limits on authentication endpoints. Below are concrete Fastapi patterns that address these concerns.
1. Use async-compatible JWT decoding and cache keys
Decode JWTs with fast, non-blocking libraries and avoid per-request network fetches. Cache JWKS keys with an appropriate TTL to eliminate remote calls under load.
import asyncio
import time
from typing import Optional
import httpx
from fastapi import Fastapi, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt # PyJWT
app = Fastapi()
security_scheme = HTTPBearer()
# In-memory cache for JWKS with TTL
_jwks_cache = {"keys": None, "fetched_at": 0, "ttl": 300}
async def fetch_jwks():
url = "https://auth.example.com/.well-known/jwks.json"
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=5.0)
resp.raise_for_status()
return resp.json()
def get_jwks():
now = time.time()
if _jwks_cache["keys"] is None or now - _jwks_cache["fetched_at"] > _jwks_cache["ttl"]:
_jwks_cache["keys"] = asyncio.run(fetch_jwks())
_jwks_cache["fetched_at"] = now
return _jwks_cache["keys"]
def decode_token(token: str) -> dict:
# Use PyJWT with cached keys; avoid remote call per request
jwks = get_jwks()
# Simplified: select correct key by kid; in practice use jwks["keys"] lookup
signing_key = jwks["keys"][0]["x5c"][0] if jwks.get("keys") else None
if not signing_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing jwks")
# Decode without expensive introspection; adjust algorithms as needed
return jwt.decode(token, signing_key, algorithms=["RS256"], audience="api", issuer="https://auth.example.com")
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security_scheme)):
try:
return decode_token(credentials.credentials)
except jwt.PyJWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"invalid token: {e}")
@app.get("/items")
async def list_items(user: dict = Depends(get_current_user)):
return {"user": user.get("sub"), "items": []}
2. Apply rate limits to authentication routes
Protect token validation and login endpoints with rate limits to curb brute-force and token enumeration. Fastapi does not include built-in rate limiting; integrate a lightweight in-memory or external store. The example below uses a simple in-memory sliding window; in production, use Redis or a shared store across workers.
from fastapi import Fastapi, Request, HTTPException, status
from time import time
app = Fastapi()
# Simple in-memory rate limiter (not distributed)
request_log = {} # key -> list of timestamps
def is_rate_limited(key: str, limit: int = 10, window: int = 60) -> bool:
now = time()
timestamps = request_log.get(key, [])
# Remove outdated entries
recent = [t for t in timestamps if now - t < window]
if len(recent) >= limit:
return True
request_log[key] = recent
return False
@app.post("/login")
async def login(request: Request, username: str, password: str):
client_ip = request.client.host
if is_rate_limited(f"login:{client_ip}", limit=5, window=30):
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="rate limit exceeded")
# Perform authentication (e.g., verify credentials and issue JWT)
return {"access_token": "fake-jwt", "token_type": "bearer"}
3. Centralize token validation and avoid per-route work
Define a single dependency for JWT validation and reuse it across routes. Avoid per-route decoding or extra transformation that multiplies CPU usage. If you must transform tokens, ensure it is done once per validated token.
from fastapi import Depends
from typing import Callable
def token_required(audience: str, issuer: str):
def dependency(request: Request):
auth = request.headers.get("authorization")
if not auth or not auth.startswith("Bearer "):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
token = auth.split(" ")[1]
return decode_token(token) # uses cached keys
return dependency
@app.get("/profile")
async def profile(user: dict = Depends(token_required(audience="api", issuer="https://auth.example.com"))):
return {"sub": user["sub"]}
4. Consider algorithm and key choices to reduce CPU load
Symmetric algorithms like HS256 are faster than asymmetric RS256/ES256, but key management differs. If you use asymmetric keys, ensure public keys are cached and verification is offloaded to optimized libraries. Avoid frequent key rotation that forces repeated fetches. For high-throughput scenarios, benchmark token validation under load and choose algorithms that balance security and performance without introducing blocking I/O.
5. Use dependency injection guards for optional tokens where appropriate
Not all endpoints require authentication. Use scope-based security to skip JWT validation for public routes, reducing unnecessary CPU cycles. This also reduces the attack surface for token-exhaustion attempts on endpoints that do not need authentication.
6. Monitor and test under realistic load
Validate that your mitigations hold under concurrency by performing load tests that include valid and invalid JWTs. Observe CPU, memory, and connection pool usage. Combine runtime monitoring with periodic security scans that verify rate limiting, authentication coverage, and absence of blocking calls in token validation paths.