HIGH ldap injectionfastapi

Ldap Injection in Fastapi

How Ldap Injection Manifests in Fastapi

LDAP injection in Fastapi applications typically occurs when user input is directly incorporated into LDAP queries without proper sanitization. Fastapi's async nature and common authentication patterns create specific vulnerabilities that attackers can exploit.

The most common scenario involves Fastapi's dependency injection system combined with LDAP-based authentication. Consider this vulnerable pattern:

from fastapi import FastAPI, Depends, HTTPException
from ldap3 import Server, Connection, ALL

app = FastAPI()

def get_ldap_connection():
    server = Server('ldap://your-ldap-server.com', get_info=ALL)
    conn = Connection(server, user='cn=admin,dc=example,dc=com', password='admin_password')
    if not conn.bind():
        raise HTTPException(status_code=500, detail='LDAP connection failed')
    return conn

@app.post('/login')
async def login(username: str, password: str, conn: Connection = Depends(get_ldap_connection)):
    search_filter = f'(uid={username})'
    conn.search(search_base='dc=example,dc=com', search_filter=search_filter)
    
    if not conn.entries:
        raise HTTPException(status_code=401, detail='Invalid credentials')
    
    user_dn = conn.entries[0].entry_dn
    return conn.rebind(user=user_dn, password=password)

The vulnerability lies in the search_filter = f'(uid={username})' line. An attacker can inject LDAP filter operators like *, |, or & to bypass authentication or extract sensitive directory information.

Common Fastapi LDAP injection payloads:

  • username=admin*)(|(uid=* - bypasses authentication by creating an always-true filter
  • username=admin*)(!uid=* - excludes valid users
  • username=admin*)(uid=admin* - wildcard search for admin accounts
  • username=admin*)(objectClass=* - retrieves all directory entries
  • username=admin*)(g=* - causes LDAP server errors revealing internal structure

Fastapi's async dependency injection can exacerbate these issues. If the LDAP connection is shared across requests without proper isolation, an attacker might cause race conditions or timing attacks to extract information about valid usernames.

Another Fastapi-specific pattern involves using Pydantic models for input validation, which can create a false sense of security:

from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

@app.post('/login')
async def login(login_data: LoginRequest, conn: Connection = Depends(get_ldap_connection)):
    # Still vulnerable despite Pydantic validation
    search_filter = f'(uid={login_data.username})'
    # ... rest of vulnerable code

Pydantic validates type and format but doesn't sanitize LDAP injection characters, leaving the application vulnerable.

Fastapi-Specific Detection

Detecting LDAP injection in Fastapi requires both static code analysis and runtime scanning. middleBrick's API security scanner includes specialized checks for LDAP vulnerabilities in Fastapi applications.

middleBrick's LDAP injection detection for Fastapi applications includes:

  • Pattern matching for LDAP query construction in Fastapi route handlers
  • Detection of direct string interpolation in search filters
  • Analysis of dependency injection patterns that might expose LDAP connections
  • Scanning for common LDAP injection payloads in request parameters
  • Checking for improper error handling that might leak directory information

The scanner tests your Fastapi endpoints with a battery of LDAP injection payloads, monitoring for:

  • Authentication bypass (returning success with invalid credentials)
  • Information disclosure (revealing valid usernames or directory structure)
  • Server errors that expose LDAP schema or configuration
  • Timing differences that indicate successful injection

middleBrick's LLM/AI security module also checks for AI-specific LDAP injection scenarios, such as when Fastapi applications use AI models to generate LDAP queries or when AI-generated code contains LDAP injection vulnerabilities.

For manual testing, you can use tools like Burp Suite or OWASP ZAP with these Fastapi-specific LDAP payloads:

# Test authentication bypass
curl -X POST 'https://your-fastapi-app.com/login' \
  -H 'Content-Type: application/json' \
  -d '{"username": "admin*)(|(uid=*&password="invalid"}'

middleBrick's CLI tool provides Fastapi-specific reporting:

middlebrick scan https://your-fastapi-app.com --fastapi --ldap

# Or integrate into Fastapi development workflow
middlebrick scan http://localhost:8000 --fastapi --ldap --fail-on-high-risk

The scanner's OpenAPI analysis feature examines your Fastapi's auto-generated OpenAPI spec to identify endpoints that handle user input potentially used in LDAP queries, then correlates this with runtime scanning results.

Fastapi-Specific Remediation

Remediating LDAP injection in Fastapi requires a defense-in-depth approach using Fastapi's native features and proper LDAP query construction techniques.

The most effective approach uses parameterized LDAP queries with Fastapi's dependency injection:

from fastapi import FastAPI, Depends, HTTPException
from ldap3 import Server, Connection, ALL, SUBTREE, NTLM
from ldap3.core.exceptions import LDAPException
from ldap3.utils.conv import escape_filter_chars

app = FastAPI()

class LDAPConnection:
    def __init__(self):
        self.server = Server('ldap://your-ldap-server.com', get_info=ALL)
        self.admin_conn = None
    
    async def __aenter__(self):
        self.admin_conn = Connection(self.server, user='cn=admin,dc=example,dc=com', password='admin_password', authentication=NTLM)
        if not self.admin_conn.bind():
            raise HTTPException(status_code=500, detail='LDAP connection failed')
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        if self.admin_conn:
            self.admin_conn.unbind()
    
    def search_user(self, username: str):
        # Critical: escape LDAP filter characters
        safe_username = escape_filter_chars(username)
        search_filter = f'(uid={safe_username})'
        
        try:
            self.admin_conn.search(
                search_base='dc=example,dc=com',
                search_filter=search_filter,
                search_scope=SUBTREE,
                attributes=['dn', 'uid', 'cn', 'mail']
            )
            return self.admin_conn.entries
        except LDAPException as e:
            raise HTTPException(status_code=500, detail='LDAP search failed')

@app.post('/login')
async def login(username: str, password: str, ldap: LDAPConnection = Depends()):
    entries = ldap.search_user(username)
    
    if not entries:
        raise HTTPException(status_code=401, detail='Invalid credentials')
    
    user_dn = entries[0].entry_dn
    
    try:
        user_conn = Connection(ldap.server, user=user_dn, password=password, authentication=NTLM)
        if not user_conn.bind():
            raise HTTPException(status_code=401, detail='Invalid credentials')
        return {'message': 'Login successful'}
    except LDAPException:
        raise HTTPException(status_code=401, detail='Invalid credentials')

Key Fastapi-specific remediation techniques:

  1. Use context managers: The LDAPConnection class uses async context management to ensure connections are properly closed, preventing resource exhaustion attacks.
  2. Escape filter characters: Always use escape_filter_chars() from ldap3.utils.conv on user input before incorporating it into LDAP filters.
  3. Implement proper error handling: Don't reveal whether a username exists or provide detailed LDAP error messages to attackers.
  4. Use dependency injection: Fastapi's dependency system ensures proper connection lifecycle management.
  5. Add rate limiting: Use Fastapi's rate limiting middleware to prevent brute-force LDAP injection attempts.

Additional Fastapi-specific security measures:

from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Add trusted host middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=['your-domain.com', 'localhost']
)

# Add rate limiting
limiter = Limiter(key_func=get_remote_address)
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.state.limiter = limiter

@app.post('/login')
@limiter.limit('5/minute')
async def login(username: str, password: str, ldap: LDAPConnection = Depends()):
    # ... secure LDAP code
    pass

For comprehensive protection, integrate middleBrick's continuous monitoring into your Fastapi CI/CD pipeline:

# .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]

jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run middleBrick LDAP injection scan
        run: |
          npm install -g middlebrick
          middlebrick scan ${{ secrets.API_URL }} \
            --fastapi \
            --ldap \
            --fail-on-high-risk \
            --output json > security-report.json
      - name: Upload report
        uses: actions/upload-artifact@v3
        with:
          name: security-report
          path: security-report.json

Frequently Asked Questions

Why doesn't Pydantic validation prevent LDAP injection in Fastapi?
Pydantic only validates data types and formats (e.g., string length, regex patterns), but doesn't sanitize LDAP-specific injection characters like *, (, ), &, |, or null bytes. An attacker can craft payloads that pass Pydantic validation but still inject LDAP filters. Always use ldap3.utils.conv.escape_filter_chars() in addition to Pydantic validation.
Can I use FastAPI's dependency injection system to prevent LDAP injection?
FastAPI's dependency injection helps manage LDAP connection lifecycles and ensures proper resource cleanup, but it doesn't prevent injection attacks. The dependency system can be combined with parameterized queries and input sanitization to create a secure architecture. Use dependency injection for connection management, but always escape user input before using it in LDAP filters.