Race Condition in Fastapi with Hmac Signatures
Race Condition in Fastapi with Hmac Signatures — how this specific combination creates or exposes the vulnerability
A race condition in FastAPI involving HMAC signatures typically arises when signature validation and the subsequent state-modifying action are not performed as an atomic check. Consider an endpoint that accepts a JSON payload containing an identifier, a timestamp, and an HMAC signature over these fields. If the application first verifies the HMAC and then, in a separate step, checks whether the resource identified by the identifier is already processed, an attacker can exploit timing differences or concurrent requests.
For example, an attacker may rapidly submit multiple requests with the same valid HMAC but different identifiers or operations. Between signature verification and resource state updates, concurrent execution may allow more than one request to pass the signature check and proceed to modify shared state, bypassing intended one-time or single-owner guarantees. This pattern is relevant to replay-like scenarios or authorization checks that are not tied to the signed context.
In FastAPI, if the signature validation logic does not incorporate the business-state check within the same critical section or does not use deterministic, constant-time comparison for signature validation, the unauthenticated attack surface includes timing discrepancies across requests. The scanner’s BOLA/IDOR and Authentication checks may surface findings where endpoints accept valid HMACs but lack proper synchronization or idempotency controls, indicating a potential race condition risk in the application flow.
An illustrative vulnerable FastAPI route (no middleware or database transaction coordination):
import time
import hmac
import hashlib
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
app = FastAPI()
SECRET = b"super-secret-key"
class Payload(BaseModel):
resource_id: str
action: str
timestamp: str
signature: str
def verify_signature(data: dict, sig: str) -> bool:
message = f"{data['resource_id']}{data['action']}{data['timestamp']}".encode()
expected = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
# Potential timing issue if not constant-time
return hmac.compare_digest(expected, sig)
@app.post("/operate")
async def operate(payload: Payload, x_request_id: str = Header(None)):
data = payload.dict()
if not verify_signature(data, payload.signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# Race condition: state check and update are not atomic
# An attacker could trigger this concurrently and bypass intended one-time behavior
if resource_already_processed(data["resource_id"]):
raise HTTPException(status_code=409, detail="Already processed")
process_resource(data)
return {"status": "ok"}
def resource_already_processed(rid: str) -> bool:
# Placeholder for actual storage check
return False
def process_resource(data: dict):
# Placeholder for actual processing
pass
Hmac Signatures-Specific Remediation in Fastapi — concrete code fixes
To mitigate race conditions with HMAC signatures in FastAPI, ensure that verification and the subsequent state change are treated as a single logical unit. Use server-side locking or database transactions with appropriate isolation levels so that concurrent requests for the same resource serialize correctly. Additionally, enforce replay protection by storing processed identifiers with expiration and use constant-time comparisons for signatures.
Below is a hardened example that includes a lock file pattern using asyncio locks for in-process serialization per resource_id and constant-time HMAC verification. For distributed systems, replace the asyncio lock with a distributed lock (e.g., using Redis or your database row-level locking).
import asyncio
import time
import hmac
import hashlib
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from typing import Dict
app = FastAPI()
SECRET = b"super-secret-key"
processed_locks: Dict[str, asyncio.Lock] = {}
processed_set = set() # Replace with persistent store and TTL in production
class Payload(BaseModel):
resource_id: str
action: str
timestamp: str
signature: str
def verify_signature_constant_time(data: dict, sig: str) -> bool:
message = f"{data['resource_id']}{data['action']}{data['timestamp']}".encode()
expected = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig)
async def get_lock(resource_id: str) -> asyncio.Lock:
if resource_id not in processed_locks:
processed_locks[resource_id] = asyncio.Lock()
return processed_locks[resource_id]
async def is_already_processed(rid: str) -> bool:
# Replace with a persistent, TTL-backed store in production
return rid in processed_set
async def mark_processed(rid: str):
# Replace with persistent store; ensure TTL to avoid unbounded growth
processed_set.add(rid)
@app.post("/operate")
async def operate(payload: Payload, x_request_id: str = Header(None)):
if not verify_signature_constant_time(payload.dict(), payload.signature):
raise HTTPException(status_code=401, detail="Invalid signature")
rid = payload.resource_id
lock = await get_lock(rid)
async with lock:
if await is_already_processed(rid):
raise HTTPException(status_code=409, detail="Already processed")
process_resource(payload.dict())
await mark_processed(rid)
return {"status": "ok"}
def process_resource(data: dict):
# Actual business logic here
pass
Key remediation points:
- Use
hmac.compare_digestfor constant-time signature comparison to mitigate timing attacks on the HMAC itself. - Serialize access per resource using per-resource locks (or distributed locks) so that signature validation and state mutation occur atomically from the perspective of concurrency control.
- Store processed identifiers in a persistent store with TTL to prevent indefinite growth and to handle restarts; this also supports idempotency across deployments.
- Ensure the signed message includes all contextual fields that must be bound (e.g., timestamp, resource_id, action) to prevent substitution attacks that could exploit missing bindings.
These changes reduce the race condition surface by ensuring that a valid HMAC alone does not implicitly authorize state changes; the combined check is protected by synchronization and replay-awareness, aligning the implementation with secure concurrency practices.