Dictionary Attack in Axum
How Dictionary Attacks Manifest in Axum
Dictionary attacks against Axum applications exploit weak authentication by systematically trying common passwords, usernames, or API keys. Because Axum serves requests asynchronously and handles many concurrent connections cheaply, an unthrottled endpoint will process guesses about as fast as an attacker can send them.
The most common manifestation occurs in login endpoints where attackers iterate through password dictionaries against valid usernames. In Axum, this often targets routes handling authentication middleware, session management, or API key validation. The attack can be amplified when Axum applications use shared connection pools or when authentication state is stored in memory without proper rate limiting.
Consider a typical Axum login endpoint:
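// Assumes `AppState { db: sqlx::PgPool }`, a `User` row type deriving `sqlx::FromRow`,
// and an application `Error` type that converts from `sqlx::Error`.
use argon2::{Argon2, PasswordHash, PasswordVerifier};
use axum::{extract::State, Json};

async fn login(
    State(state): State<AppState>,
    // The body extractor must be the last argument.
    Json(credentials): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, Error> {
    let user: User = sqlx::query_as("SELECT * FROM users WHERE username = $1")
        .bind(&credentials.username)
        .fetch_one(&state.db)
        .await?;

    // Verify the submitted password against the stored PHC-format hash.
    let hash = PasswordHash::new(&user.password_hash).map_err(|_| Error::Unauthorized)?;
    let valid = Argon2::default()
        .verify_password(credentials.password.as_bytes(), &hash)
        .is_ok();

    if valid {
        Ok(Json(LoginResponse { token: "..." }))
    } else {
        Err(Error::Unauthorized)
    }
}
An attacker can send thousands of requests per second to this endpoint, cycling through common passwords. Without rate limiting, the Tokio runtime behind Axum will process every one of them, potentially exhausting the database connection pool or memory.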
Dictionary attacks also target API key validation in Axum middleware:
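// Assumes the same `AppState` and `Error` types as the login example.
use axum::http::HeaderMap;

// Only the headers are needed, so the helper borrows the header map.
async fn validate_api_key(headers: &HeaderMap, state: &AppState) -> Result<(), Error> {
    // A missing or non-UTF-8 header is treated the same as an invalid key.
    let Some(key) = headers.get("x-api-key").and_then(|v| v.to_str().ok()) else {
        return Err(Error::Unauthorized);
    };
    let row = sqlx::query("SELECT 1 FROM api_keys WHERE key = $1 AND active = true")
        .bind(key)
        .fetch_optional(&state.db)
        .await?;
    if row.is_some() {
        Ok(())
    } else {
        Err(Error::Unauthorized)
    }
}
Without rate limiting or IP-based restrictions, attackers can brute-force API keys by sending requests with different key values, potentially discovering valid keys through timing analysis or variations in error messages.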
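Where a key or token is compared in application code rather than in the database, the timing signal mentioned above can be reduced with a comparison that does not stop at the first mismatching byte. The following is a minimal std-only sketch; `constant_time_eq` is a hypothetical helper name, and an optimizing compiler is not guaranteed to preserve the timing property, so production code would more likely rely on a vetted crate such as `subtle`.

```rust
/// Compares two byte strings without returning early at the first mismatch,
/// so the running time depends on the input length rather than on how many
/// leading bytes happen to match.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    // Lengths are not treated as secret here; unequal lengths fail immediately.
    if a.len() != b.len() {
        return false;
    }
    // Accumulate the XOR of every byte pair; any difference sets a bit in `diff`.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```

A caller would pass the presented key and the expected key as byte slices, for example `constant_time_eq(presented.as_bytes(), expected.as_bytes())`, and return the same unauthorized error for every failure so the response body does not reveal which check failed.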
Axum-Specific Detection
Detecting dictionary attacks in Axum requires monitoring authentication endpoints for anomalous request patterns. The framework's middleware system makes it straightforward to implement detection logic that tracks failed authentication attempts.
A sketch using a function middleware registered with `axum::middleware::from_fn_with_state` (Axum has no `Middleware` trait to implement):
// Written against axum 0.7. Serve the app with
// `into_make_service_with_connect_info::<SocketAddr>()` so `ConnectInfo` is populated.
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use axum::extract::{ConnectInfo, Request, State};
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};

struct DictionaryAttackDetector {
    attempts: HashMap<IpAddr, Vec<Instant>>,
    max_attempts: usize,
    window: Duration,
}

// Shared, mutable state passed to the middleware.
type SharedDetector = Arc<Mutex<DictionaryAttackDetector>>;

async fn detect_dictionary_attack(
    State(detector): State<SharedDetector>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    req: Request,
    next: Next,
) -> Response {
    let now = Instant::now();
    // Keep the lock in its own scope so it is released before the `.await` below.
    let over_limit = {
        let mut guard = detector.lock().unwrap();
        let (window, max_attempts) = (guard.window, guard.max_attempts);
        let attempts = guard.attempts.entry(addr.ip()).or_default();
        // Drop timestamps that have aged out of the window, then record this request.
        attempts.retain(|t| now.duration_since(*t) < window);
        attempts.push(now);
        attempts.len() > max_attempts
    };
    if over_limit {
        return (StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded").into_response();
    }
    next.run(req).await
}
This middleware counts every request per IP address on the routes it wraps and rejects clients that exceed the threshold within the window, so it should be attached only to authentication routes. It does not distinguish failed from successful logins, and behind a reverse proxy the peer address is the proxy's rather than the client's. For more sophisticated detection, you can integrate with external services or use Axum's extractors to analyze request patterns.
middleBrick scanning provides comprehensive detection by testing authentication endpoints with common password dictionaries and analyzing response patterns. The scanner identifies vulnerable endpoints, measures response time variations that could indicate successful authentication, and checks for rate limiting implementation.
middleBrick's LLM security checks also detect if authentication endpoints are exposed to AI models, which could allow attackers to use language models to generate and test credential variations more efficiently.
Axum-Specific Remediation
Remediating dictionary attacks in Axum requires a multi-layered approach combining rate limiting, authentication hardening, and monitoring. Axum's middleware system and extractors provide native capabilities for implementing these protections.
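Rate limiting with tower's `RateLimitLayer` (tower-http does not provide a rate limiter):
use std::time::Duration;

use axum::{error_handling::HandleErrorLayer, http::StatusCode, routing::post, BoxError, Router};
use tower::{buffer::BufferLayer, limit::RateLimitLayer, ServiceBuilder};

let app = Router::new()
    .route("/login", post(login_handler))
    .layer(
        ServiceBuilder::new()
            // Axum requires infallible services, so map layer errors to a response.
            .layer(HandleErrorLayer::new(|err: BoxError| async move {
                (StatusCode::INTERNAL_SERVER_ERROR, format!("Unhandled error: {}", err))
            }))
            // `RateLimit` is not `Clone`; wrapping it in a buffer makes it usable as a router layer.
            .layer(BufferLayer::new(1024))
            // 100 requests per 60-second window, shared by all clients.
            .layer(RateLimitLayer::new(100, Duration::from_secs(60))),
    );
This caps the total request rate for the routes it wraps. The limit is shared by all clients, and requests over the limit wait rather than being rejected, so it protects backend resources but does not single out an attacker. For authentication-specific rate limiting, track failed attempts per client, for example in the login handler: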
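// Assumes axum 0.7, a `Clone`-able `AppState { db: sqlx::PgPool, attempt_tracker: Arc<Mutex<AttemptTracker>> }`,
// and the `User`, `Error`, `LoginRequest`, and `LoginResponse` types from the earlier example.
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use argon2::{Argon2, PasswordHash, PasswordVerifier};
use axum::extract::{ConnectInfo, State};
use axum::{routing::post, Json, Router};

async fn login(
    State(state): State<AppState>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    Json(credentials): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, Error> {
    let ip = addr.ip();
    if state.attempt_tracker.lock().unwrap().is_rate_limited(ip) {
        return Err(Error::TooManyRequests);
    }

    // `fetch_optional` lets an unknown username count as a failed attempt
    // instead of returning early through `?`.
    let user: Option<User> = sqlx::query_as("SELECT * FROM users WHERE username = $1")
        .bind(&credentials.username)
        .fetch_optional(&state.db)
        .await?;

    let valid = user.is_some_and(|u| {
        PasswordHash::new(&u.password_hash).is_ok_and(|hash| {
            Argon2::default()
                .verify_password(credentials.password.as_bytes(), &hash)
                .is_ok()
        })
    });

    // No `.await` follows, so holding the lock to the end of the function is safe.
    let mut tracker = state.attempt_tracker.lock().unwrap();
    if valid {
        tracker.reset_attempts(ip);
        Ok(Json(LoginResponse { token: "..." }))
    } else {
        tracker.record_failed_attempt(ip);
        Err(Error::Unauthorized)
    }
}

struct AttemptTracker {
    failed_attempts: HashMap<IpAddr, usize>,
    last_attempt: HashMap<IpAddr, Instant>,
    max_attempts: usize,
    window: Duration,
}

impl AttemptTracker {
    // Limited only while the most recent failure is still inside the window.
    fn is_rate_limited(&self, ip: IpAddr) -> bool {
        let recent = self.last_attempt.get(&ip).is_some_and(|t| t.elapsed() < self.window);
        recent && self.failed_attempts.get(&ip).is_some_and(|c| *c >= self.max_attempts)
    }

    fn record_failed_attempt(&mut self, ip: IpAddr) {
        // Start a fresh count if the previous failure fell outside the window.
        if self.last_attempt.get(&ip).is_some_and(|t| t.elapsed() >= self.window) {
            self.failed_attempts.remove(&ip);
        }
        *self.failed_attempts.entry(ip).or_insert(0) += 1;
        self.last_attempt.insert(ip, Instant::now());
    }

    fn reset_attempts(&mut self, ip: IpAddr) {
        self.failed_attempts.remove(&ip);
        self.last_attempt.remove(&ip);
    }
}

// `db` and `attempt_tracker` are constructed elsewhere. Serve with
// `app.into_make_service_with_connect_info::<SocketAddr>()` so `ConnectInfo` is populated.
let app = Router::new()
    .route("/login", post(login))
    .with_state(AppState { db, attempt_tracker: Arc::new(Mutex::new(attempt_tracker)) });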
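A fixed failure threshold can be complemented by exponential backoff, where the lockout grows with each consecutive failure. The following std-only sketch shows one way to compute such a delay; `backoff_delay`, its parameters, and the exponent cap of 16 are illustrative assumptions rather than part of Axum or of the tracker above.

```rust
use std::time::Duration;

/// Returns how long a client should wait after `failures` consecutive failed
/// attempts: `base * 2^(failures - 1)`, never more than `max`.
fn backoff_delay(failures: u32, base: Duration, max: Duration) -> Duration {
    if failures == 0 {
        return Duration::ZERO;
    }
    // Cap the exponent so the shift below cannot overflow a u32.
    let exponent = (failures - 1).min(16);
    let factor = 1u32 << exponent;
    // `checked_mul` returns `None` on overflow; fall back to `max` in that case.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a one-second base and a sixty-second cap, the first failure yields a one-second wait, the fourth yields eight seconds, and anything from the seventh failure onward is held at the cap. A tracker could store the failure count per client and refuse attempts until `backoff_delay` has elapsed since the last failure.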
For API key validation, apply the same failed-attempt lockout; lengthening the lockout with each consecutive failure adds exponential backoff:
// Assumes `state.attempt_tracker` is an `Arc<Mutex<AttemptTracker>>` whose methods
// take an `IpAddr`, and that the caller supplies the client IP (for example from `ConnectInfo`).
use std::net::IpAddr;

use axum::http::HeaderMap;

async fn validate_api_key(
    headers: &HeaderMap,
    state: &AppState,
    ip: IpAddr,
) -> Result<(), Error> {
    if state.attempt_tracker.lock().unwrap().is_rate_limited(ip) {
        return Err(Error::TooManyRequests);
    }

    // A missing or non-UTF-8 header falls through and is counted as a failure.
    let key = headers.get("x-api-key").and_then(|v| v.to_str().ok());
    if let Some(key) = key {
        let row = sqlx::query("SELECT 1 FROM api_keys WHERE key = $1 AND active = true")
            .bind(key)
            .fetch_optional(&state.db)
            .await?;
        if row.is_some() {
            state.attempt_tracker.lock().unwrap().reset_attempts(ip);
            return Ok(());
        }
    }

    state.attempt_tracker.lock().unwrap().record_failed_attempt(ip);
    Err(Error::Unauthorized)
}
middleBrick's CLI tool can scan your Axum application for dictionary attack vulnerabilities before deployment:
npm install -g middlebrick
middlebrick scan http://localhost:3000/login --type=api
The scanner tests authentication endpoints with common password dictionaries and provides specific remediation guidance for Axum applications, including recommended rate limiting configurations and authentication hardening techniques.