HIGH race conditionfastapihmac signatures

Race Condition in Fastapi with Hmac Signatures

Race Condition in Fastapi with Hmac Signatures — how this specific combination creates or exposes the vulnerability

A race condition in FastAPI involving HMAC signatures typically arises when signature validation and the subsequent state-modifying action are not performed as an atomic check. Consider an endpoint that accepts a JSON payload containing an identifier, a timestamp, and an HMAC signature over these fields. If the application first verifies the HMAC and then, in a separate step, checks whether the resource identified by the identifier is already processed, an attacker can exploit timing differences or concurrent requests.

For example, an attacker may rapidly submit multiple requests with the same valid HMAC but different identifiers or operations. Between signature verification and resource state updates, concurrent execution may allow more than one request to pass the signature check and proceed to modify shared state, bypassing intended one-time or single-owner guarantees. This pattern is relevant to replay-like scenarios or authorization checks that are not tied to the signed context.

In FastAPI, if the signature validation logic does not incorporate the business-state check within the same critical section or does not use deterministic, constant-time comparison for signature validation, the unauthenticated attack surface includes timing discrepancies across requests. The scanner’s BOLA/IDOR and Authentication checks may surface findings where endpoints accept valid HMACs but lack proper synchronization or idempotency controls, indicating a potential race condition risk in the application flow.

An illustrative vulnerable FastAPI route (no middleware or database transaction coordination):

import time
import hmac
import hashlib
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel

app = FastAPI()

SECRET = b"super-secret-key"

class Payload(BaseModel):
    resource_id: str
    action: str
    timestamp: str
    signature: str

def verify_signature(data: dict, sig: str) -> bool:
    message = f"{data['resource_id']}{data['action']}{data['timestamp']}".encode()
    expected = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
    # Potential timing issue if not constant-time
    return hmac.compare_digest(expected, sig)

@app.post("/operate")
async def operate(payload: Payload, x_request_id: str = Header(None)):
    data = payload.dict()
    if not verify_signature(data, payload.signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    # Race condition: state check and update are not atomic
    # An attacker could trigger this concurrently and bypass intended one-time behavior
    if resource_already_processed(data["resource_id"]):
        raise HTTPException(status_code=409, detail="Already processed")
    process_resource(data)
    return {"status": "ok"}

def resource_already_processed(rid: str) -> bool:
    # Placeholder for actual storage check
    return False

def process_resource(data: dict):
    # Placeholder for actual processing
    pass

Hmac Signatures-Specific Remediation in Fastapi — concrete code fixes

To mitigate race conditions with HMAC signatures in FastAPI, ensure that verification and the subsequent state change are treated as a single logical unit. Use server-side locking or database transactions with appropriate isolation levels so that concurrent requests for the same resource serialize correctly. Additionally, enforce replay protection by storing processed identifiers with expiration and use constant-time comparisons for signatures.

Below is a hardened example that includes a lock file pattern using asyncio locks for in-process serialization per resource_id and constant-time HMAC verification. For distributed systems, replace the asyncio lock with a distributed lock (e.g., using Redis or your database row-level locking).

import asyncio
import time
import hmac
import hashlib
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel
from typing import Dict

app = FastAPI()

SECRET = b"super-secret-key"
processed_locks: Dict[str, asyncio.Lock] = {}
processed_set = set()  # Replace with persistent store and TTL in production

class Payload(BaseModel):
    resource_id: str
    action: str
    timestamp: str
    signature: str

def verify_signature_constant_time(data: dict, sig: str) -> bool:
    message = f"{data['resource_id']}{data['action']}{data['timestamp']}".encode()
    expected = hmac.new(SECRET, message, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig)

async def get_lock(resource_id: str) -> asyncio.Lock:
    if resource_id not in processed_locks:
        processed_locks[resource_id] = asyncio.Lock()
    return processed_locks[resource_id]

async def is_already_processed(rid: str) -> bool:
    # Replace with a persistent, TTL-backed store in production
    return rid in processed_set

async def mark_processed(rid: str):
    # Replace with persistent store; ensure TTL to avoid unbounded growth
    processed_set.add(rid)

@app.post("/operate")
async def operate(payload: Payload, x_request_id: str = Header(None)):
    if not verify_signature_constant_time(payload.dict(), payload.signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    rid = payload.resource_id
    lock = await get_lock(rid)
    async with lock:
        if await is_already_processed(rid):
            raise HTTPException(status_code=409, detail="Already processed")
        process_resource(payload.dict())
        await mark_processed(rid)
    return {"status": "ok"}

def process_resource(data: dict):
    # Actual business logic here
    pass

Key remediation points:

  • Use hmac.compare_digest for constant-time signature comparison to mitigate timing attacks on the HMAC itself.
  • Serialize access per resource using per-resource locks (or distributed locks) so that signature validation and state mutation occur atomically from the perspective of concurrency control.
  • Store processed identifiers in a persistent store with TTL to prevent indefinite growth and to handle restarts; this also supports idempotency across deployments.
  • Ensure the signed message includes all contextual fields that must be bound (e.g., timestamp, resource_id, action) to prevent substitution attacks that could exploit missing bindings.

These changes reduce the race condition surface by ensuring that a valid HMAC alone does not implicitly authorize state changes; the combined check is protected by synchronization and replay-awareness, aligning the implementation with secure concurrency practices.

Frequently Asked Questions

Can race conditions still occur if I use database transactions with HMAC validation in FastAPI?
Yes, if the transaction isolation level is insufficient or if the signature verification occurs outside the transaction boundary. Include the signature check and state update within the same transaction and use appropriate isolation (e.g., serializable) to prevent concurrent anomalies; otherwise a race condition may still be reachable.
Is using a lock per resource_id sufficient protection against all race conditions involving HMAC signatures?
Locks help serialize access for known resource IDs but do not protect against other vectors such as timing attacks on the HMAC or replay attacks without TTL-bound storage. Combine locks with constant-time HMAC verification and replay detection for comprehensive protection.