HIGH | dictionary attack | axum

Dictionary Attack in Axum

How Dictionary Attack Manifests in Axum

Dictionary attacks in Axum applications exploit weak authentication mechanisms by systematically trying common passwords, usernames, or API keys. Because Axum runs on a non-blocking async runtime (Tokio), a server readily accepts a large number of concurrent connections, so an attacker can test many credentials in parallel unless the application limits them.

The most common manifestation is a login endpoint where attackers iterate through password dictionaries against valid usernames. In Axum, the usual targets are login handlers, authentication middleware, session management, and API key validation. The impact grows when every guess triggers a query against a shared database connection pool, or when per-client authentication state is kept in memory without rate limiting, because a flood of guesses then competes with legitimate requests for the same connections and memory.

Consider a typical Axum login endpoint:

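use argon2::{Argon2, PasswordHash, PasswordVerifier};
use axum::{extract::State, Json};

// `AppState`, `User`, `LoginRequest`, `LoginResponse`, and `Error` are
// application types. `Error` must implement `IntoResponse` and
// `From<sqlx::Error>`, and `User` must derive `sqlx::FromRow`.
async fn login(
    State(state): State<AppState>,
    // The body extractor must be the last argument.
    Json(credentials): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, Error> {
    let user: User = sqlx::query_as("SELECT * FROM users WHERE username = $1")
        .bind(&credentials.username)
        .fetch_one(&state.db)
        .await?;

    // Parse the stored PHC-format hash, then verify the submitted password.
    let valid = PasswordHash::new(&user.password_hash)
        .map(|hash| {
            Argon2::default()
                .verify_password(credentials.password.as_bytes(), &hash)
                .is_ok()
        })
        .unwrap_or(false);

    if valid {
        Ok(Json(LoginResponse { token: "...".into() }))
    } else {
        Err(Error::Unauthorized)
    }
}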

An attacker can send thousands of requests per second to this endpoint, cycling through common passwords. Without rate limiting, the Tokio runtime behind Axum will process every one of them, and each request costs a database query plus an Argon2 verification, so the attack can exhaust database connections, CPU, or memory.
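To put rough numbers on this, the following sketch estimates how long it takes to try every entry in a dictionary at a given attempt rate. The function name `seconds_to_exhaust` and the dictionary size of 10,000 used in the note below are illustrative assumptions, not measurements from a real attack.

```rust
use std::time::Duration;

/// Estimates the seconds needed to try every entry of a dictionary when the
/// target accepts at most `attempts` guesses per `per` interval.
fn seconds_to_exhaust(dictionary_size: u64, attempts: u64, per: Duration) -> f64 {
    dictionary_size as f64 * per.as_secs_f64() / attempts as f64
}
```

Under these assumptions, an unthrottled endpoint that serves 1,000 attempts per second (`seconds_to_exhaust(10_000, 1_000, Duration::from_secs(1))`) is exhausted in 10 seconds. A limit of 5 attempts per 15 minutes stretches the same dictionary to 1,800,000 seconds, roughly 20.8 days, for each throttled client.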

Dictionary attacks also target API key validation in Axum middleware:

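use axum::http::HeaderMap;

// Takes `&HeaderMap` rather than `&Request`: the header value is held across
// an `.await`, and borrowing the whole request there can make the future
// non-`Send`, which Axum handlers and middleware require.
async fn validate_api_key(headers: &HeaderMap, state: &AppState) -> Result<(), Error> {
    // Header values are raw bytes; treat anything that is not valid visible
    // ASCII as a missing key.
    let key = headers.get("x-api-key").and_then(|v| v.to_str().ok());
    if let Some(key) = key {
        let row = sqlx::query("SELECT 1 FROM api_keys WHERE key = $1 AND active = true")
            .bind(key)
            .fetch_optional(&state.db)
            .await?;

        if row.is_some() {
            return Ok(());
        }
    }
    Err(Error::Unauthorized)
}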

Without rate limiting or IP-based restrictions, attackers can brute-force API keys by sending requests with different key values, potentially discovering valid keys through timing analysis or error message variations.
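Timing analysis becomes possible when a secret is compared byte by byte and the comparison stops at the first mismatch. If keys are ever compared in application code rather than in the database, a comparison whose running time does not depend on where the inputs differ removes that signal. The sketch below shows the idea; `constant_time_eq` is a hypothetical helper, and because compilers are free to optimize such loops, a vetted crate such as `subtle` is the safer choice in production.

```rust
/// Compares two byte strings without returning early at the first mismatch.
/// Only the lengths (not the contents) influence how long the call takes.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate the XOR of every byte pair; the result is zero only if all
    // pairs are equal.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```

Storing only a hash of each API key and looking up the hash of the presented key is another common way to keep the raw comparison out of the timing path.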

Axum-Specific Detection

Detecting dictionary attacks in Axum requires monitoring authentication endpoints for anomalous request patterns. The framework's middleware system makes it straightforward to implement detection logic that tracks failed authentication attempts.

Implementation using Axum's built-in `middleware::from_fn_with_state` (signatures as of axum 0.7):

use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use axum::extract::{ConnectInfo, Request, State};
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};

struct DictionaryAttackDetector {
    // Timestamps of recent requests, per client IP. The mutex lets the
    // detector be shared between concurrent requests.
    attempts: Mutex<HashMap<IpAddr, Vec<Instant>>>,
    max_attempts: usize,
    window: Duration,
}

async fn detect_dictionary_attack(
    State(detector): State<Arc<DictionaryAttackDetector>>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    req: Request,
    next: Next,
) -> Response {
    let now = Instant::now();

    // Keep the lock in its own scope so it is released before any `.await`.
    let over_limit = {
        let mut map = detector.attempts.lock().unwrap();
        let attempts = map.entry(addr.ip()).or_default();
        // Drop timestamps that have fallen out of the sliding window.
        attempts.retain(|t| now.duration_since(*t) < detector.window);
        attempts.push(now);
        attempts.len() > detector.max_attempts
    };

    if over_limit {
        (StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded").into_response()
    } else {
        next.run(req).await
    }
}

// Registration (assumes `detector: Arc<DictionaryAttackDetector>`):
//   Router::new()
//       .route("/login", post(login))
//       .route_layer(axum::middleware::from_fn_with_state(
//           detector.clone(),
//           detect_dictionary_attack,
//       ))
// `ConnectInfo` is only available when the app is served with
// `app.into_make_service_with_connect_info::<SocketAddr>()`.

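This middleware counts requests per IP address and rejects those that exceed the threshold within the window. It counts every request that passes through it, not only failed logins, so attach it to authentication routes rather than to the whole application. Behind a reverse proxy the peer address is the proxy's, so the client address must then come from a trusted forwarding header instead. For more sophisticated detection, you can integrate with external services or use Axum's extractors to analyze request patterns.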
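One pattern that per-IP counting misses is a distributed attack, where many addresses each make a few guesses against the same account. A possible signal is the number of recent failures per username together with how many distinct addresses produced them. The sketch below is standalone and framework-independent; `UsernameFailureMonitor`, `Alert`, and their fields are hypothetical names, and timestamps are passed in explicitly so the logic can be unit-tested.

```rust
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::time::{Duration, Instant};

/// Raised when one username accumulates too many recent failures.
#[derive(Debug, PartialEq)]
struct Alert {
    failures: usize,
    distinct_ips: usize,
}

struct UsernameFailureMonitor {
    window: Duration,
    max_failures: usize,
    // username -> (time, source address) of each recent failed login
    failures: HashMap<String, Vec<(Instant, IpAddr)>>,
}

impl UsernameFailureMonitor {
    fn new(window: Duration, max_failures: usize) -> Self {
        UsernameFailureMonitor {
            window,
            max_failures,
            failures: HashMap::new(),
        }
    }

    /// Records a failed login and returns an alert once the username has more
    /// than `max_failures` failures inside the sliding window.
    fn record_failure(&mut self, username: &str, ip: IpAddr, now: Instant) -> Option<Alert> {
        let window = self.window;
        let max_failures = self.max_failures;
        let entries = self.failures.entry(username.to_owned()).or_default();
        // Discard failures that are older than the window.
        entries.retain(|(t, _)| now.duration_since(*t) < window);
        entries.push((now, ip));
        if entries.len() <= max_failures {
            return None;
        }
        let distinct_ips = entries
            .iter()
            .map(|(_, addr)| *addr)
            .collect::<HashSet<_>>()
            .len();
        Some(Alert {
            failures: entries.len(),
            distinct_ips,
        })
    }
}
```

A high `distinct_ips` value alongside a high failure count suggests a distributed attempt against one account, which a per-IP limit alone would let through. In an Axum application a monitor like this would sit behind a `Mutex` in shared state and be called from the login handler on each failed attempt.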

middleBrick scanning provides comprehensive detection by testing authentication endpoints with common password dictionaries and analyzing response patterns. The scanner identifies vulnerable endpoints, measures response time variations that could indicate successful authentication, and checks for rate limiting implementation.

middleBrick's LLM security checks also detect if authentication endpoints are exposed to AI models, which could allow attackers to use language models to generate and test credential variations more efficiently.

Axum-Specific Remediation

Remediating dictionary attacks in Axum requires a multi-layered approach combining rate limiting, authentication hardening, and monitoring. Axum's middleware system and extractors provide native capabilities for implementing these protections.

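Rate limiting implementation using tower's RateLimitLayer (requires tower's "limit" and "buffer" features):

use std::time::Duration;

use axum::{error_handling::HandleErrorLayer, http::StatusCode, routing::post, BoxError, Router};
use tower::{buffer::BufferLayer, limit::RateLimitLayer, ServiceBuilder};

let app = Router::new()
    .route("/login", post(login_handler))
    .layer(
        ServiceBuilder::new()
            // Axum requires layered services to be infallible, so errors
            // from the buffer are converted into a response.
            .layer(HandleErrorLayer::new(|err: BoxError| async move {
                (StatusCode::INTERNAL_SERVER_ERROR, format!("Unhandled error: {err}"))
            }))
            // `RateLimit` is not `Clone`; the buffer makes it usable in a `Router`.
            .layer(BufferLayer::new(1024))
            // 100 requests per 60-second window.
            .layer(RateLimitLayer::new(100, Duration::from_secs(60))),
    );

This caps the layered routes at 100 requests per 60-second window across all clients combined. Requests over the limit wait for the next window rather than being rejected, so the layer bounds the load a dictionary attack can generate but does not single out the attacker. For authentication-specific rate limiting, track failed attempts per client in shared state: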

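use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use argon2::{Argon2, PasswordHash, PasswordVerifier};
use axum::extract::{ConnectInfo, State};
use axum::{routing::post, Json, Router};

// Assumed application state; the tracker is shared across requests via `Arc`.
#[derive(Clone)]
struct AppState {
    db: sqlx::PgPool,
    attempt_tracker: Arc<AttemptTracker>,
}

async fn login(
    State(state): State<AppState>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    Json(credentials): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, Error> {
    let ip = addr.ip();

    if state.attempt_tracker.is_rate_limited(ip) {
        return Err(Error::TooManyRequests);
    }

    // `fetch_optional` so that unknown usernames count as failures too.
    let user: Option<User> = sqlx::query_as("SELECT * FROM users WHERE username = $1")
        .bind(&credentials.username)
        .fetch_optional(&state.db)
        .await?;

    let valid = user.map_or(false, |user| {
        PasswordHash::new(&user.password_hash)
            .map(|hash| {
                Argon2::default()
                    .verify_password(credentials.password.as_bytes(), &hash)
                    .is_ok()
            })
            .unwrap_or(false)
    });

    if valid {
        state.attempt_tracker.reset_attempts(ip);
        Ok(Json(LoginResponse { token: "...".into() }))
    } else {
        state.attempt_tracker.record_failed_attempt(ip);
        Err(Error::Unauthorized)
    }
}

struct AttemptTracker {
    // Per IP: (failed attempt count, time of the most recent failure).
    failures: Mutex<HashMap<IpAddr, (usize, Instant)>>,
    max_attempts: usize,
    window: Duration,
}

impl AttemptTracker {
    fn is_rate_limited(&self, ip: IpAddr) -> bool {
        let mut failures = self.failures.lock().unwrap();
        // Forget failures older than `window` so that lockouts expire.
        let expired = failures
            .get(&ip)
            .map_or(false, |(_, last)| last.elapsed() >= self.window);
        if expired {
            failures.remove(&ip);
        }
        let limited = failures
            .get(&ip)
            .map_or(false, |(count, _)| *count >= self.max_attempts);
        limited
    }

    fn record_failed_attempt(&self, ip: IpAddr) {
        let mut failures = self.failures.lock().unwrap();
        let entry = failures.entry(ip).or_insert((0, Instant::now()));
        entry.0 += 1;
        entry.1 = Instant::now();
    }

    fn reset_attempts(&self, ip: IpAddr) {
        self.failures.lock().unwrap().remove(&ip);
    }
}

// `db` and `attempt_tracker` are constructed at startup. Serve the app with
// `into_make_service_with_connect_info::<SocketAddr>()` so that `ConnectInfo`
// is available to the handler.
let app = Router::new()
    .route("/login", post(login))
    .with_state(AppState { db, attempt_tracker });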

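For API key validation, apply the same failed-attempt tracking so that repeated invalid keys lock the client out; the lockout period can additionally grow exponentially with each repeated failure: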

use std::net::IpAddr;

use axum::http::HeaderMap;

// `ip` is the client address, for example taken from `ConnectInfo<SocketAddr>`.
// `state.attempt_tracker` is assumed to expose `&self` methods keyed by `IpAddr`.
async fn validate_api_key(
    headers: &HeaderMap,
    ip: IpAddr,
    state: &AppState,
) -> Result<(), Error> {
    if state.attempt_tracker.is_rate_limited(ip) {
        return Err(Error::TooManyRequests);
    }

    // Header values are raw bytes; treat non-ASCII values as a missing key.
    let key = headers.get("x-api-key").and_then(|v| v.to_str().ok());
    if let Some(key) = key {
        let row = sqlx::query("SELECT 1 FROM api_keys WHERE key = $1 AND active = true")
            .bind(key)
            .fetch_optional(&state.db)
            .await?;

        if row.is_some() {
            state.attempt_tracker.reset_attempts(ip);
            return Ok(());
        }
    }

    // A missing, malformed, or unknown key counts as a failed attempt.
    state.attempt_tracker.record_failed_attempt(ip);
    Err(Error::Unauthorized)
}
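Exponential backoff can be sketched independently of the web framework. In the version below, a key (an IP address, API key prefix, or account name) gets a few free failures, after which each further failure doubles the lockout up to a cap. `BackoffTracker`, its fields, and the chosen parameters are hypothetical, and the current time is passed in so that the behaviour can be tested deterministically.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Entry {
    failures: u32,
    locked_until: Option<Instant>,
}

struct BackoffTracker {
    base_delay: Duration,
    max_delay: Duration,
    free_attempts: u32,
    entries: HashMap<String, Entry>,
}

impl BackoffTracker {
    fn new(base_delay: Duration, max_delay: Duration, free_attempts: u32) -> Self {
        BackoffTracker {
            base_delay,
            max_delay,
            free_attempts,
            entries: HashMap::new(),
        }
    }

    /// Returns the remaining lockout for `key` at `now`, if any.
    fn locked_for(&self, key: &str, now: Instant) -> Option<Duration> {
        let until = self.entries.get(key)?.locked_until?;
        if until > now {
            Some(until - now)
        } else {
            None
        }
    }

    /// Records a failure and returns the lockout that now applies.
    fn record_failure(&mut self, key: &str, now: Instant) -> Duration {
        let (base, max, free) = (self.base_delay, self.max_delay, self.free_attempts);
        let entry = self.entries.entry(key.to_owned()).or_insert(Entry {
            failures: 0,
            locked_until: None,
        });
        entry.failures = entry.failures.saturating_add(1);
        let delay = if entry.failures <= free {
            Duration::ZERO
        } else {
            // Double the delay per failure beyond the free attempts; the
            // exponent is clamped so the shift cannot overflow.
            let exponent = (entry.failures - free - 1).min(20);
            base.checked_mul(1u32 << exponent).unwrap_or(max).min(max)
        };
        entry.locked_until = if delay == Duration::ZERO {
            None
        } else {
            Some(now + delay)
        };
        delay
    }

    /// Clears the history for `key` after a successful authentication.
    fn record_success(&mut self, key: &str) {
        self.entries.remove(key);
    }
}
```

With a base delay of 1 second, a cap of 8 seconds, and 3 free attempts, failures four through eight produce lockouts of 1, 2, 4, 8, and 8 seconds. A handler would call `locked_for` before checking credentials and reject the request while a lockout is active.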

middleBrick's CLI tool can scan your Axum application for dictionary attack vulnerabilities before deployment:

npm install -g middlebrick
middlebrick scan http://localhost:3000/login --type=api

The scanner tests authentication endpoints with common password dictionaries and provides specific remediation guidance for Axum applications, including recommended rate limiting configurations and authentication hardening techniques.

Frequently Asked Questions

How can I test my Axum application for dictionary attack vulnerabilities?
Use middleBrick's CLI tool to scan your authentication endpoints. The scanner tests with common password dictionaries, analyzes response patterns for timing attacks, and checks for rate limiting implementation. You can also implement the detection middleware shown above to monitor live traffic patterns.

What's the best way to implement rate limiting in Axum for authentication endpoints?
Use tower's RateLimitLayer for a basic global limit, combined with custom middleware or handler logic that tracks failed authentication attempts per IP address. Implement sliding window counters with configurable thresholds (e.g., 5 failed attempts per 15 minutes) and exponential backoff for repeated failures. Consider adding CAPTCHA challenges after multiple failed attempts.