HIGH | replay attack | fastapi | jwt tokens

Replay Attack in FastAPI with JWT Tokens

A replay attack occurs when an attacker intercepts a valid request and retransmits it to reproduce the original effect. In FastAPI applications that rely on JWT tokens for authentication, this risk exists primarily at the application and transport layers rather than within the JWT specification itself. JWTs can carry an issued-at timestamp (iat) and usually an expiration (exp), but these claims only bound a token's lifetime. Unless the server also checks each request against a per-request or per-session context, a captured token can be reused until it expires.

Without additional protections, an attacker who observes a legitimate request (such as an order submission, a payment confirmation, or a state-changing PATCH/PUT) can replay the same HTTP method, path, headers, and body to perform unauthorized actions. This is especially relevant for non-idempotent endpoints, where repeated execution leads to duplicate records, double charges, or unintended state transitions. In FastAPI, common contributing factors include missing nonce or replay-cache mechanisms, long token lifetimes, and endpoints that do not bind the request to a unique context such as a client identifier or timestamp.

Transport layer protections like TLS reduce the likelihood of interception, but they do not prevent a legitimately captured token from being reused from the same or a different location if other defenses are absent. JWTs signed with strong algorithms (e.g., RS256) guarantee integrity and authenticity to the server, but they do not guarantee freshness. Therefore, replay resistance must be implemented explicitly by the API designer, for example by requiring short-lived tokens, one-time nonces, server-side replay caches, or per-request timestamps with strict tolerances.
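
As a rough illustration of the "per-request timestamps with strict tolerances" idea, the sketch below signs the method, path, timestamp, and body hash with an HMAC and rejects requests whose timestamp falls outside a small window. The function names, the 30-second tolerance, and the message layout are assumptions made for this example, not part of FastAPI or any JWT library. A timestamp window only narrows the replay window, so it would normally be combined with a nonce or replay cache.

```python
import hashlib
import hmac
import time
from typing import Optional

MAX_SKEW_SECONDS = 30  # hypothetical tolerance; tune to your clients' clock accuracy


def sign_request(secret: bytes, method: str, path: str, timestamp: int, body: bytes = b"") -> str:
    """Compute an HMAC-SHA256 over the parts of the request that must not change."""
    message = f"{method.upper()}\n{path}\n{timestamp}\n".encode() + hashlib.sha256(body).digest()
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def is_fresh_and_authentic(
    secret: bytes,
    method: str,
    path: str,
    timestamp: int,
    body: bytes,
    signature: str,
    now: Optional[float] = None,
) -> bool:
    """Accept a request only if its timestamp is recent and its signature matches."""
    if now is None:
        now = time.time()
    # Reject stale (or far-future) timestamps before doing any other work.
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False
    expected = sign_request(secret, method, path, timestamp, body)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, signature)
```

Because the timestamp is part of the signed message, an attacker cannot simply refresh it on a captured request without invalidating the signature.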

In practice, an attacker may exploit replay weaknesses in unauthenticated or partially authenticated attack surfaces that middleBrick scans as part of its 12 security checks, including Authentication, BOLA/IDOR, and Rate Limiting. While middleBrick does not fix or block attacks, its findings can highlight missing freshness controls and recommend specific remediations aligned with frameworks such as OWASP API Security Top 10 and relevant compliance mappings.

JWT Token-Specific Remediation in FastAPI

To mitigate replay risks in FastAPI with JWT tokens, implement per-request freshness checks and ensure tokens are short-lived and bound to contextual data. Use HTTPS consistently, set conservative expiration times, and incorporate a server-side mechanism to detect reused requests. Below are concrete code examples illustrating these practices.

First, configure JWT token validation in FastAPI with strict exp and nbf (not before) validation, and prefer asymmetric keys (RS256) for better key management. Enforce token expiration and issue timestamps, and bind tokens to a per-user or per-client claim to reduce the value of a captured token.

from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel

# RS256 is asymmetric: the issuer signs with a private key and this service
# verifies with the public key (see load_public_key), so no shared secret is defined here.
ALGORITHM = "RS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class TokenData(BaseModel):
    sub: Optional[str] = None
    jti: Optional[str] = None  # JWT ID for replay detection
    exp: Optional[int] = None

# In production, load public key from a secure source
def load_public_key():
    # Placeholder: in practice, read PEM from file or KMS
    return "-----BEGIN PUBLIC KEY-----..."

def verify_token(token: str = Depends(oauth2_scheme)):
    public_key = load_public_key()
    try:
        payload = jwt.decode(token, public_key, algorithms=[ALGORITHM], options={"verify_exp": True, "verify_nbf": True})
        sub: str = payload.get("sub")
        jti: str = payload.get("jti")
        exp: int = payload.get("exp")
        if sub is None or jti is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token claims")
        return TokenData(sub=sub, jti=jti, exp=exp)
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate token")
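
The verifier above expects sub, jti, and exp claims, so the issuing side has to produce them. The following is a minimal sketch of building such a claim set with only the standard library; the helper name build_access_claims and the "cid" claim used to bind the token to a client are assumptions for illustration. Signing is left out: with python-jose it would typically be a call along the lines of jwt.encode(claims, private_key, algorithm="RS256").

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_access_claims(
    subject: str,
    client_id: str,
    lifetime_minutes: int = 15,
    now: Optional[datetime] = None,
) -> dict:
    """Build a short-lived claim set with a unique jti for replay detection."""
    now = now or datetime.now(timezone.utc)
    issued_at = int(now.timestamp())
    return {
        "sub": subject,
        "cid": client_id,  # hypothetical claim name binding the token to one client
        "jti": uuid.uuid4().hex,  # unique token ID the server can record as "seen"
        "iat": issued_at,
        "nbf": issued_at,  # token is not valid before it was issued
        "exp": int((now + timedelta(minutes=lifetime_minutes)).timestamp()),
    }
```

Each call yields a fresh jti, which is what lets a server-side cache distinguish a new token from a replayed one.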

Second, implement a lightweight server-side replay cache using an in-memory store for demonstration; in production, use a distributed store with TTL slightly longer than your token lifetime to handle clock skew and ensure idempotency. Include the jti (JWT ID), the HTTP method, and the canonicalized request path (and optionally the request body hash for non-GET requests) as the replay key.

import time
from typing import Dict, Tuple

from fastapi import Request
from fastapi.responses import JSONResponse
from jose import JWTError, jwt

# Simple in-memory replay cache; replace with Redis or similar in production
replay_cache: Dict[Tuple[str, str, str], float] = {}
REPLAY_TTL_SECONDS = ACCESS_TOKEN_EXPIRE_MINUTES * 60 + 60  # slightly longer than token lifetime

def is_replayed(key: Tuple[str, str, str], now: float) -> bool:
    """Return True if this key was already recorded within the TTL."""
    entry = replay_cache.get(key)
    if entry is None:
        return False
    if now - entry > REPLAY_TTL_SECONDS:
        # Entry is older than the TTL: forget it and treat the request as new
        del replay_cache[key]
        return False
    return True

@app.middleware("http")
async def replay_protection_middleware(request: Request, call_next):
    if request.method in {"POST", "PUT", "PATCH", "DELETE"}:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            # Decoded again here for illustration; verify_token performs the full validation
            try:
                payload = jwt.decode(auth_header[7:], load_public_key(), algorithms=[ALGORITHM], options={"verify_exp": True})
            except JWTError:
                # Leave invalid tokens to the downstream authentication dependency
                payload = {}
            jti = payload.get("jti")
            if jti:
                # Each jti is accepted once per method and path, so clients need a
                # fresh token (new jti) to repeat the same state-changing request
                key = (jti, request.method, request.url.path)
                now = time.time()
                if is_replayed(key, now):
                    # Return the response directly: an HTTPException raised inside
                    # middleware is not handled by FastAPI's exception handlers
                    return JSONResponse(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        content={"detail": "Request replay detected"},
                    )
                replay_cache[key] = now
    return await call_next(request)
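
The replay key described above can optionally include a hash of the request body, and an in-memory cache needs to drop old entries so it does not grow without bound. The sketch below shows one way to do both with the standard library; make_replay_key and ReplayCache are hypothetical names introduced for this example. In a distributed deployment, an atomic "set if absent with expiry" operation in a shared store (for example Redis SET with the NX and EX options) would play the role of check_and_store.

```python
import hashlib
import threading
import time
from typing import Dict, Optional, Tuple

ReplayKey = Tuple[str, str, str, str]


def make_replay_key(jti: str, method: str, path: str, body: bytes = b"") -> ReplayKey:
    """Combine token ID, method, canonicalized path, and body hash into one key."""
    canonical_path = path.rstrip("/") or "/"
    body_hash = hashlib.sha256(body).hexdigest()
    return (jti, method.upper(), canonical_path, body_hash)


class ReplayCache:
    """Thread-safe in-memory record of recently seen replay keys."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._seen: Dict[ReplayKey, float] = {}
        self._lock = threading.Lock()

    def check_and_store(self, key: ReplayKey, now: Optional[float] = None) -> bool:
        """Return True and record the key if it is new; return False on a replay."""
        if now is None:
            now = time.time()
        with self._lock:
            # Purge expired entries so the cache stays bounded by the TTL window.
            expired = [k for k, seen_at in self._seen.items() if now - seen_at > self._ttl]
            for k in expired:
                del self._seen[k]
            if key in self._seen:
                return False
            self._seen[key] = now
            return True
```

Checking and storing under a single lock avoids the race in which two concurrent copies of the same request both pass the check before either is recorded.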

Third, prefer short token lifetimes and refresh token rotation to limit the window for replay. Combine these controls with rate limiting on sensitive endpoints to further reduce risk. Findings from scans such as those performed by middleBrick can help identify missing freshness controls and support remediation aligned with frameworks like the OWASP API Security Top 10 and compliance requirements.
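
Refresh token rotation can be sketched as follows: every refresh token is single-use, and presenting an already-used token is treated as a sign of theft, which revokes the user's remaining tokens. This is a minimal in-memory illustration under those assumptions; RefreshTokenStore and its methods are hypothetical names, and a real deployment would persist this state and store token hashes rather than raw tokens.

```python
import secrets
from typing import Dict, Optional


class RefreshTokenStore:
    """Single-use refresh tokens with reuse detection (in-memory sketch)."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}  # token -> user ID
        self._used: Dict[str, str] = {}  # already-rotated token -> user ID

    def issue(self, user_id: str) -> str:
        """Create and record a new opaque refresh token for the user."""
        token = secrets.token_urlsafe(32)
        self._active[token] = user_id
        return token

    def rotate(self, token: str) -> Optional[str]:
        """Exchange a valid token for a new one; return None if it is rejected."""
        user_id = self._active.pop(token, None)
        if user_id is not None:
            self._used[token] = user_id
            return self.issue(user_id)
        reused_by = self._used.get(token)
        if reused_by is not None:
            # A rotated token was presented again: assume it was stolen and
            # revoke every active token belonging to that user.
            self._active = {t: u for t, u in self._active.items() if u != reused_by}
        return None
```

With this scheme, a stolen refresh token is useful at most once, and its second use invalidates the attacker's and the legitimate client's sessions alike, forcing re-authentication.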

Frequently Asked Questions

Can a JWT be safely reused if it has not expired?
Not for state-changing requests, unless the server can detect reuse. A bearer JWT is accepted on every request until it expires, so a captured token can be replayed within that window. Use per-request identifiers (e.g., jti) and server-side replay detection to ensure freshness and prevent reuse.

Does enabling HTTPS alone prevent replay attacks with JWT tokens?
No. HTTPS protects against token interception in transit, but it does not prevent an attacker from replaying a token obtained some other way. Additional controls such as short lifetimes, nonces, and replay caches are required.