
NoSQL Injection in FastAPI

How NoSQL Injection Manifests in FastAPI

NoSQL injection in FastAPI applications typically occurs when user input is incorporated into NoSQL database queries without validation. FastAPI is often paired with MongoDB through async drivers such as Motor, and a few recurring patterns in that stack create attack vectors that developers must understand.

The most common scenario involves request parameters that FastAPI binds to handler arguments and that are then passed directly to database methods. Consider this FastAPI endpoint, which places user input straight into a filter:

from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient

app = FastAPI()
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.my_database

@app.get("/users")
async def get_users(name: str):
    query = {"name": name}  # Direct user input
    results = await db.users.find(query).to_list(10)
    return results

An attacker will typically probe such an endpoint with requests like:

GET /users?name[$ne]=1 HTTP/1.1
Host: localhost:8000

GET /users?name[$ne]=1&age[$lt]=20 HTTP/1.1
Host: localhost:8000

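In stacks that expand bracketed keys into nested objects, these requests turn name into {"$ne": 1}, and MongoDB's $ne operator then matches every document whose name is not 1, effectively dumping the entire collection. FastAPI does not expand brackets: a parameter declared as str always arrives as a string, so the endpoint above rejects these requests with a 422 error because the name parameter is missing. The same payload becomes effective as soon as a handler accepts untyped input, for example a JSON body declared as dict or Any, or copies request.query_params wholesale into a filter.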
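Whether a bracketed key ever becomes an operator depends on how the query string is parsed. The sketch below uses only the standard library and assumes flat parsing with no bracket expansion, which is how Python's urllib.parse behaves; parse_flat and to_filter are hypothetical helpers, not part of FastAPI or Motor.

```python
from urllib.parse import parse_qsl


def parse_flat(query_string: str) -> dict:
    """Parse a query string into a flat dict; bracketed keys stay literal."""
    return dict(parse_qsl(query_string, keep_blank_values=True))


def to_filter(name) -> dict:
    """Build the same filter shape as the endpoint above (hypothetical helper)."""
    return {"name": name}


flat = parse_flat("name[$ne]=1")
# The key is the literal text "name[$ne]"; there is no "name" key at all.
print(flat)

# A plain string produces an equality match on that exact value.
print(to_filter("alice"))

# The dangerous case is a value that is already a dict, for example one
# decoded from a JSON body: MongoDB reads "$ne" as an operator.
print(to_filter({"$ne": 1}))
```

The last call shows the filter shape an attacker is aiming for; the first shows why a flat query string alone does not produce it.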

FastAPI's Pydantic model binding can also introduce vulnerabilities when a whole model is turned into a query:

from fastapi import Depends
from pydantic import BaseModel

class UserQuery(BaseModel):
    name: str
    age: int

@app.get("/users")
async def get_users(query: UserQuery = Depends()):
    results = await db.users.find(query.dict()).to_list(10)
    return results

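Pydantic validates types, not intent. With the fields declared as str and int, each value reaches MongoDB as a scalar and is compared literally, so the model above is only as safe as its annotations. If a field is loosened to dict or Any, an attacker can submit an object such as {"$ne": null} that Pydantic accepts and the database interprets as an operator.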
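The difference between a scalar and an operator object is easiest to see on a decoded JSON body. The following is a rough standard-library stand-in for what a strict str annotation enforces; require_str is a hypothetical helper and the payloads are made up for illustration.

```python
import json


def require_str(value, field: str) -> str:
    """Reject anything that is not a plain string (hypothetical helper)."""
    if not isinstance(value, str):
        raise ValueError(f"{field} must be a string")
    return value


benign = json.loads('{"name": "alice"}')
hostile = json.loads('{"name": {"$ne": null}}')

# A plain string passes through unchanged.
print(require_str(benign["name"], "name"))

# The hostile body decodes to a dict, which the type check refuses.
try:
    require_str(hostile["name"], "name")
    rejected = False
except ValueError as exc:
    rejected = True
    print(f"rejected: {exc}")
```

A field typed as dict or Any would skip this check entirely and hand the operator object to the driver.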

Another FastAPI-specific pattern involves path parameters used in queries:

@app.get("/users/{user_id}")
async def get_user(user_id: str):
    query = {"_id": user_id}
    user = await db.users.find_one(query)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

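Because user_id is declared as str, a path such as /users/[$ne=null] reaches MongoDB as a literal string rather than an operator object, so on its own it will not match arbitrary documents. The pattern is still a red flag: the ID is never validated or converted to an ObjectId, and it becomes exploitable if the value is later parsed as JSON or interpolated into a $where expression.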

FastAPI-Specific Detection

Detecting NoSQL injection in FastAPI requires understanding both the framework's patterns and the underlying database interactions. Here's how to identify these vulnerabilities in your FastAPI applications.

Code Review Patterns: Look for these red flags in your FastAPI codebase:

# VULNERABLE - Direct query construction
query = {"field": request_data}
await db.collection.find(query).to_list(10)

# VULNERABLE - Operator query built from unvalidated values
query = {"$or": [{ "field": value }, { "field2": value2 }]}

# VULNERABLE - Path parameter injection
@app.get("/items/{item_id}")
async def get_item(item_id: str):
    await db.items.find_one({"_id": item_id})

Automated Scanning with middleBrick: middleBrick's black-box scanning approach is particularly effective for FastAPI applications because it tests the actual runtime behavior without requiring source code access.

The scanner sends crafted payloads to FastAPI endpoints and analyzes responses for NoSQL injection indicators:

# Install and use middleBrick CLI
npm install -g middlebrick
middlebrick scan http://localhost:8000/api/users

# Output includes:
# - NoSQL injection vulnerability detection
# - Risk score (A-F) for each endpoint
# - Specific findings with remediation guidance
# - API inventory showing all discovered endpoints

middleBrick tests 12 security categories including NoSQL injection by sending payloads with MongoDB operators like $ne, $gt, $lt, $regex, and $where. It analyzes whether the application returns unexpected data or error messages that reveal database structure.
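For manual testing, operator probes of this kind can be generated by hand. The sketch below is a hand-rolled illustration, not middleBrick's implementation; build_probe_urls is a hypothetical helper, and the base URL and field name are assumptions taken from the earlier examples.

```python
from urllib.parse import urlencode

# Operators named in the text above.
OPERATORS = ["$ne", "$gt", "$lt", "$regex", "$where"]


def build_probe_urls(base_url: str, field: str) -> list:
    """Return one URL per operator, using bracketed keys such as name[$ne]."""
    urls = []
    for op in OPERATORS:
        # urlencode percent-encodes the brackets and the dollar sign.
        query = urlencode({f"{field}[{op}]": "1"})
        urls.append(f"{base_url}?{query}")
    return urls


probe_urls = build_probe_urls("http://localhost:8000/users", "name")
for url in probe_urls:
    print(url)
```

A well-protected endpoint should answer each of these with a 4xx status rather than with data.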

Runtime Monitoring: For FastAPI applications in production, implement request logging and anomaly detection:

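import logging

from fastapi import Request

logger = logging.getLogger("nosql_guard")

# Substrings that indicate a MongoDB operator in a key or value
SUSPICIOUS_OPERATORS = ["$ne", "$gt", "$lt", "$regex", "$where"]

async def nosql_injection_middleware(request: Request, call_next):
    # Inspect keys as well as values: bracket payloads such as name[$ne]=1
    # carry the operator in the key
    query = dict(request.query_params)
    if any(op in str(query) for op in SUSPICIOUS_OPERATORS):
        # Log and alert
        logger.warning("Potential NoSQL injection attempt: %s", query)
    response = await call_next(request)
    return response

# Function-style middleware is registered with app.middleware("http");
# add_middleware() expects an ASGI middleware class instead
app.middleware("http")(nosql_injection_middleware)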
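A slightly more precise variant of the same check parses the raw query string and extracts operator-like tokens instead of searching for fixed substrings. This is a standard-library sketch; find_operator_tokens is a hypothetical name, and the pattern (a dollar sign followed by letters) is an assumption about what counts as suspicious.

```python
import re
from urllib.parse import parse_qsl

# Matches "$" followed by letters, e.g. $ne or $where.
_OPERATOR_RE = re.compile(r"\$[a-zA-Z]+")


def find_operator_tokens(query_string: str) -> list:
    """Return operator-like tokens found in query-string keys or values."""
    tokens = []
    # parse_qsl percent-decodes, so encoded payloads are caught as well.
    for key, value in parse_qsl(query_string, keep_blank_values=True):
        tokens.extend(_OPERATOR_RE.findall(key))
        tokens.extend(_OPERATOR_RE.findall(value))
    return tokens


print(find_operator_tokens("name[$ne]=1&age[$lt]=20"))
print(find_operator_tokens("name%5B%24ne%5D=1"))
print(find_operator_tokens("name=alice"))
```

Returning the matched tokens, rather than a boolean, makes the resulting log entries easier to triage.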

FastAPI-Specific Remediation

Remediating NoSQL injection in FastAPI requires a defense-in-depth approach. Here are FastAPI-specific techniques to secure your NoSQL database interactions.

1. Input Validation with Pydantic Models: Create strict validation models that reject special characters:

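from typing import Optional

from fastapi import Depends
# Pydantic v1-style API; in v2, use field_validator and model_dump() instead
from pydantic import BaseModel, constr, validator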

class UserQuery(BaseModel):
    name: constr(min_length=1, max_length=50, strip_whitespace=True)
    age: Optional[int] = None
    
    @validator('name')
    def name_cannot_contain_operators(cls, v):
        if any(char in v for char in '$(){}[].'):
            raise ValueError('Invalid characters in name')
        return v

@app.get("/users")
async def get_users(query: UserQuery = Depends()):
    # Safe query construction
    query_dict = query.dict(exclude_unset=True)
    results = await db.users.find(query_dict).to_list(10)
    return results

2. Whitelist-Based Query Building: Only allow specific fields and operators:

from fastapi import Request

ALLOWED_FIELDS = {"name", "age", "email"}
ALLOWED_OPERATORS = {"eq", "gt", "lt"}  # Custom operator names; this handler only builds equality matches

@app.get("/users")
async def get_users(request: Request):
    # FastAPI does not bind **kwargs, so read the raw query parameters
    query = {}
    for field, value in request.query_params.items():
        if field not in ALLOWED_FIELDS:
            raise HTTPException(status_code=400, detail="Invalid field")
        # Query-string values are always strings; reject operator-like text
        if "$" in value:
            raise HTTPException(status_code=400, detail="Invalid operator")
        query[field] = value
    # Exclude _id so the documents are JSON-serializable
    results = await db.users.find(query, {"_id": 0}).to_list(10)
    return results
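If comparison operators are needed, the custom operator names can be mapped to MongoDB operators on the server so that clients never send a "$" themselves. The sketch below is one possible design, not an established convention of FastAPI or Motor: the "field__op" key syntax, the build_query helper, and the per-field type table are all assumptions made for illustration.

```python
# Allowed fields and the Python type each value is converted to (assumed schema).
ALLOWED_FIELDS = {"name": str, "age": int, "email": str}
# Custom operator names mapped to the MongoDB operators they stand for.
OPERATOR_MAP = {"eq": "$eq", "gt": "$gt", "lt": "$lt"}


def build_query(params: dict) -> dict:
    """Translate keys like 'age__gt' into a MongoDB filter, or raise ValueError."""
    query = {}
    for raw_key, raw_value in params.items():
        field, _, op = raw_key.partition("__")
        op = op or "eq"  # a bare field name means equality
        if field not in ALLOWED_FIELDS:
            raise ValueError(f"field not allowed: {field}")
        if op not in OPERATOR_MAP:
            raise ValueError(f"operator not allowed: {op}")
        try:
            value = ALLOWED_FIELDS[field](raw_value)
        except ValueError:
            raise ValueError(f"bad value for {field}") from None
        query.setdefault(field, {})[OPERATOR_MAP[op]] = value
    return query


print(build_query({"age__gt": "20", "name": "alice"}))
```

Because the "$" operators are only ever written by the server-side map, a key such as name[$ne] or age__ne fails the whitelist and never reaches the database.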

3. Use FastAPI Dependency Injection for Security: Create reusable security dependencies:

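from typing import Any, Dict

from fastapi import Body, Depends, HTTPException

def validate_nosql_query(query: Dict[str, Any] = Body(...)) -> Dict[str, Any]:
    """Dependency to validate NoSQL queries read from the JSON request body"""
    for key, value in query.items():
        # Reject top-level operators such as $where or $or
        if key.startswith('$'):
            raise HTTPException(status_code=400, detail=f"Operator {key} not allowed")
        if isinstance(value, dict):
            for operator in value.keys():
                if operator.startswith('$'):
                    raise HTTPException(
                        status_code=400,
                        detail=f"Operator {operator} not allowed"
                    )
        if isinstance(value, str):
            if any(char in value for char in '$(){}[].'):
                raise HTTPException(
                    status_code=400,
                    detail="Invalid characters in query"
                )
    return query

# A dict parameter is read from the request body, so this route uses POST
# (the /users/search path is an illustrative choice)
@app.post("/users/search")
async def search_users(query: Dict[str, Any] = Depends(validate_nosql_query)):
    # Exclude _id so the documents are JSON-serializable
    results = await db.users.find(query, {"_id": 0}).to_list(10)
    return results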
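The dependency above inspects only one level of nesting, so an operator buried deeper in the document, or inside a list, would pass. A recursive check closes that gap. This is a minimal standard-library sketch; find_operator_keys is a hypothetical helper name.

```python
def find_operator_keys(value, path=""):
    """Return the paths of every dict key that starts with '$', at any depth."""
    found = []
    if isinstance(value, dict):
        for key, child in value.items():
            child_path = f"{path}.{key}" if path else str(key)
            if isinstance(key, str) and key.startswith("$"):
                found.append(child_path)
            found.extend(find_operator_keys(child, child_path))
    elif isinstance(value, list):
        # Lists appear inside operators such as $or and $in.
        for index, child in enumerate(value):
            found.extend(find_operator_keys(child, f"{path}[{index}]"))
    return found


print(find_operator_keys({"name": {"$ne": None}}))
print(find_operator_keys({"$or": [{"a": 1}, {"b": {"$gt": 2}}]}))
print(find_operator_keys({"name": "alice"}))
```

A dependency could call this helper and return a 400 response whenever the resulting list is non-empty.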

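4. Parameterized Queries with PyMongo: PyMongo, and Motor which builds on it, has no equivalent of SQL-style parameterized statements, but you can write small query helpers that validate and convert input before it reaches the database: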

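from bson import ObjectId
from bson.errors import InvalidId
from fastapi import HTTPException

async def safe_find_one(collection, _id: str):
    try:
        # Validate ObjectId format
        obj_id = ObjectId(_id)
    except (InvalidId, TypeError):
        # Only conversion failures are client errors; database errors propagate
        raise HTTPException(status_code=400, detail="Invalid ID format")
    return await collection.find_one({"_id": obj_id})

@app.get("/users/{user_id}")
async def get_user(user_id: str):
    user = await safe_find_one(db.users, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # ObjectId is not JSON-serializable, so return it as a string
    user["_id"] = str(user["_id"])
    return user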

5. Rate Limiting and Monitoring: Combine rate limiting (here the third-party slowapi package, since FastAPI has no built-in limiter) with security monitoring:

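from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from limits import parse_many
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# slowapi applies limits through route decorators. For the ad hoc check in
# the middleware, this sketch calls the `limits` package (which slowapi
# builds on) directly, with in-memory storage.
suspicious_limiter = MovingWindowRateLimiter(MemoryStorage())
SUSPICIOUS_LIMITS = parse_many("10/minute;60/hour")

@app.middleware("http")
async def security_middleware(request: Request, call_next):
    # Check keys and values for suspicious characters. str(dict) is avoided
    # because it always contains braces.
    text = "".join(k + v for k, v in request.query_params.multi_items())
    if any(char in text for char in '$(){}[].'):
        # Rate limit suspicious requests per client address
        client = get_remote_address(request)
        allowed = all(
            suspicious_limiter.hit(limit, "security_check", client)
            for limit in SUSPICIOUS_LIMITS
        )
        if not allowed:
            # Return the response directly: an HTTPException raised inside
            # middleware bypasses FastAPI's exception handlers
            return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
    response = await call_next(request)
    return response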
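The idea behind a per-client limit on suspicious requests can be illustrated without any third-party package. The class below is a simplified in-memory sliding-window counter, written only to show the mechanism; SlidingWindowLimiter is a hypothetical name, and a production deployment would normally rely on a maintained library with shared storage.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most max_hits calls per key within window_seconds."""

    def __init__(self, max_hits: int, window_seconds: float):
        self.max_hits = max_hits
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # key -> timestamps of accepted hits

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_hits:
            return False
        hits.append(now)
        return True


demo = SlidingWindowLimiter(max_hits=2, window_seconds=60)
# Explicit timestamps keep the demonstration deterministic.
decisions = [demo.allow("10.0.0.1", now=t) for t in (0, 1, 2, 61)]
print(decisions)
```

The third call is refused because two hits already sit inside the window; by the fourth call both have expired, so the client is allowed again.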

Frequently Asked Questions

How can I test my FastAPI application for NoSQL injection vulnerabilities?
Use middleBrick's automated scanning by running middlebrick scan <your-api-url>. The scanner sends crafted payloads with MongoDB operators to test for injection vulnerabilities. It analyzes responses for data leakage and provides a security score with specific findings. For manual testing, try requests with operators like $ne, $gt, and $regex to see if they're properly rejected.

Does FastAPI's Pydantic validation prevent NoSQL injection?
Not by itself. Pydantic validates data types and constraints, so a field declared as str reaches the query as a plain string that MongoDB compares literally. That protection disappears when a field is typed as dict or Any, when a raw request body or request.query_params is passed into a filter, or when strings are interpolated into $where or $regex expressions. Keep models strictly typed and add explicit checks that reject operators and special characters in any free-form input.