Integrity Failures in Flask
How Integrity Failures Manifest in Flask
Integrity Failures in Flask APIs occur when data modifications bypass authorization checks, allowing attackers to tamper with resources they shouldn't access. In Flask's context, this manifests through several specific patterns that exploit the framework's flexibility and common development practices.
The most prevalent pattern involves improper session handling. Flask's default session stores its data in a client-side cookie that is signed but not encrypted: the client can read everything in it, and anyone who learns or guesses the secret key can forge it. Storing authorization tokens or user roles there is therefore risky, especially when the key is weak or hard-coded. Consider this vulnerable pattern:
from flask import Flask, session, request

app = Flask(__name__)
app.secret_key = 'dev_secret_key'  # Hard-coded, guessable signing key

@app.route('/update_profile', methods=['POST'])
def update_profile():
    if session.get('user_id') != request.form['user_id']:
        return 'Unauthorized', 403
    # Direct database update without proper authorization
    db.update_user(request.form['user_id'], request.form)
    return 'Success'

The vulnerability here is that the only authorization check compares the session user_id against the submitted user_id, so the endpoint is exactly as trustworthy as the session cookie. With a guessable key such as 'dev_secret_key', an attacker can forge a cookie carrying any user_id, submit the matching user_id parameter, and modify any account. The handler also passes the entire form to db.update_user, so the client decides which fields are written.
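A handler like this is safer when it never takes the target account from the request at all and copies only an allowlist of fields. The sketch below is framework-agnostic so it can be exercised without a running app; `ALLOWED_PROFILE_FIELDS` and `build_profile_update` are hypothetical names for illustration, not part of Flask.

```python
# Fields a user may change on their own profile (illustrative allowlist).
ALLOWED_PROFILE_FIELDS = frozenset({"email", "display_name"})


def build_profile_update(session_user_id, form):
    """Return (target_user_id, changes) for a profile update.

    The target is always the authenticated user from the session. Any
    user_id sent by the client is ignored, as is every field that is not
    on the allowlist (for example is_admin or role).
    """
    if session_user_id is None:
        raise PermissionError("not authenticated")
    changes = {
        key: value
        for key, value in form.items()
        if key in ALLOWED_PROFILE_FIELDS
    }
    return session_user_id, changes
```

In a Flask view this could be called as `build_profile_update(session.get('user_id'), request.form)`, with the result handed to the data layer. It does not fix a weak secret key; that still has to come from a secure source.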
Flask's decorator-based routing creates another attack surface. Developers often use custom decorators for authorization but fail to apply them consistently:
from functools import wraps
from flask import abort, session

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if session.get('role') != 'admin':
            abort(403)  # Caller lacks the admin role
        return f(*args, **kwargs)
    return decorated_function

@app.route('/delete_user/<user_id>')
def delete_user(user_id):
    db.delete_user(user_id)  # Missing admin_required decorator!
    return 'Deleted'

Missing decorators are a common Flask-specific failure: the absence of a single decorator bypasses all authorization logic.
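One way to catch a forgotten decorator is to have the decorator leave a marker on the view and then audit every registered view for it. This is a minimal sketch under that assumption; `requires_auth`, the `_requires_auth` attribute, and `unprotected_endpoints` are hypothetical names, and the permission check itself is left out.

```python
from functools import wraps


def requires_auth(view):
    """Mark a view as protected; the real permission check goes in wrapper."""
    @wraps(view)
    def wrapper(*args, **kwargs):
        # A real decorator would verify the session or token here.
        return view(*args, **kwargs)
    wrapper._requires_auth = True  # marker read by the audit below
    return wrapper


def unprotected_endpoints(view_functions, exempt=("static",)):
    """Return the sorted endpoint names whose view lacks the marker."""
    return sorted(
        name
        for name, view in view_functions.items()
        if name not in exempt and not getattr(view, "_requires_auth", False)
    )
```

Flask keeps its registered views in the `app.view_functions` dictionary, so a unit test could assert that `unprotected_endpoints(app.view_functions)` contains only endpoints that are meant to be public.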
Property-level integrity failures occur when Flask applications expose object properties through APIs without proper access controls. A typical Flask SQLAlchemy pattern:
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True)
    is_admin = db.Column(db.Boolean, default=False)

@app.route('/users/<int:user_id>')
def get_user(user_id):
    user = User.query.get(user_id)
    # to_dict() is assumed to serialize every column
    return jsonify(user=user.to_dict())  # Exposes all properties!

Without property-level filtering, this exposes sensitive fields like is_admin to anyone who can guess user IDs; as written, the route does not even require authentication.
Flask's flexibility with request data formats also enables integrity bypass. The framework exposes form data, JSON bodies, and query parameters through separate attributes (request.form, request.json, request.args), and developers often validate only one of them:
@app.route('/update_settings', methods=['POST'])
def update_settings():
    if 'user_id' in request.form:
        user_id = request.form['user_id']
    else:
        user_id = request.json.get('user_id', session['user_id'])
    # Inconsistent validation creates bypass opportunities
    if user_id == session['user_id']:
        db.update_settings(user_id, request.json)
    return 'Updated'

Attackers can exploit this by switching between request formats so that the value that is validated is not the value that is acted on. Here the identifier may come from the form while the update payload always comes from request.json, and the handler returns 'Updated' even when the check fails.
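A handler is easier to reason about when it accepts exactly one format and rejects everything else before any authorization decision is made. The following sketch assumes a JSON-only endpoint; `RejectedRequest` and `parse_settings_update` are hypothetical names, and the function is kept free of Flask imports so it can be tested in isolation.

```python
class RejectedRequest(Exception):
    """Carries the HTTP status a view should return."""

    def __init__(self, status, reason):
        super().__init__(reason)
        self.status = status


def parse_settings_update(mimetype, json_body, form, args, session_user_id):
    """Accept a JSON-only settings update and return (user_id, settings)."""
    if session_user_id is None:
        raise RejectedRequest(401, "not authenticated")
    if mimetype != "application/json":
        raise RejectedRequest(415, "JSON body required")
    if form or args:
        raise RejectedRequest(400, "unexpected form or query parameters")
    if not isinstance(json_body, dict):
        raise RejectedRequest(400, "JSON object required")
    claimed = json_body.get("user_id", session_user_id)
    # Normalize to strings so the check does not depend on whether the
    # client sent 42 or "42".
    if str(claimed) != str(session_user_id):
        raise RejectedRequest(403, "cannot update another user's settings")
    settings = {k: v for k, v in json_body.items() if k != "user_id"}
    return session_user_id, settings
```

Inside a view, the arguments would come from `request.mimetype`, `request.get_json(silent=True)`, `request.form`, `request.args`, and `session.get('user_id')`. The returned user ID is always the session's, never the client's.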
Flask-Specific Detection
Detecting Integrity Failures in Flask requires understanding both the framework's patterns and the specific attack vectors. Manual detection starts with examining route handlers for proper authorization checks before any data modification operations.
Code review should focus on these Flask-specific patterns:
# Check for missing decorators
@app.route('/sensitive_endpoint')
def sensitive_endpoint():
    # No @login_required or @admin_required - immediate red flag
    perform_sensitive_operation()

# Check for direct database operations without authorization
@app.route('/update/<resource_id>')
def update_resource(resource_id):
    data = request.json
    db.session.query(Resource).filter_by(id=resource_id).update(data)
    db.session.commit()  # No authorization check!

Flask's debug mode can also expose integrity vulnerabilities. When enabled in production, it exposes the Werkzeug interactive debugger, which lets anyone who can reach it and get past its PIN execute arbitrary Python code on the server, bypassing authentication entirely:

app.run(debug=True)  # Never in production!

Automated detection with middleBrick specifically targets Flask patterns through its black-box scanning approach. The scanner identifies:
- Missing authentication on data modification endpoints
- Property exposure through API responses
- Inconsistent authorization across similar endpoints
- Session manipulation opportunities
- Parameter tampering in URL routes
For OpenAPI spec analysis, middleBrick resolves $ref references and identifies parameter definitions that could be exploited for integrity bypass:
# middleBrick CLI scan
middlebrick scan https://api.example.com --format json

# Output includes integrity findings with severity and remediation
{
  "findings": [
    {
      "severity": "high",
      "category": "BOLA/IDOR",
      "description": "Endpoint /api/users/{id} allows modification without proper authorization checks",
      "remediation": "Implement authorization middleware to verify user permissions before data modification"
    }
  ]
}

The scanner also tests for parameter pollution and type confusion that Flask's flexible request parsing enables.
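On the application side, both kinds of input can be rejected before they reach any authorization logic. A minimal sketch using only the standard library; `single_int_param` is a hypothetical helper, and it says nothing about how any scanner performs its tests.

```python
from urllib.parse import parse_qs


def single_int_param(query_string, name):
    """Return the integer value of `name`, or raise ValueError.

    Rejects a missing parameter, a repeated parameter (pollution), and any
    value that is not a plain run of ASCII digits (type confusion).
    """
    values = parse_qs(query_string, keep_blank_values=True).get(name, [])
    if len(values) != 1:
        raise ValueError(f"expected exactly one {name!r}, got {len(values)}")
    raw = values[0]
    if not (raw.isascii() and raw.isdigit()):
        raise ValueError(f"{name!r} must be a non-negative integer")
    return int(raw)
```

In a Flask view the same rule can be applied to `request.args.getlist(name)`, which returns every value sent for a repeated parameter rather than only the first one.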
Flask-Specific Remediation
Remediating Integrity Failures in Flask requires implementing defense-in-depth strategies that leverage Flask's extensibility while addressing its unique vulnerabilities. The foundation is consistent authorization middleware.
Create a reusable authorization decorator that validates permissions before any data operation:
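from functools import wraps
from flask import abort, g, jsonify, request

def authorize_resource(resource_type, operation, id_param='resource_id'):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # g.current_user is assumed to be set by an authentication hook
            user = g.current_user
            # Read the ID from the route argument named by id_param
            resource_id = kwargs.get(id_param)
            if not user.can(resource_type, operation, resource_id):
                abort(403)
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route('/users/<int:user_id>', methods=['PUT'])
@authorize_resource('user', 'update', id_param='user_id')
def update_user(user_id):
    data = request.json
    user = User.query.get(user_id)
    if user is None:
        abort(404)
    user.update_from_dict(data)
    db.session.commit()
    return jsonify(user.to_dict())

This pattern ensures authorization is checked before any database operation, and the decorator can be applied consistently across all data modification endpoints. The id_param argument tells the decorator which route variable holds the resource ID; reading a fixed 'resource_id' key on a route that names its variable user_id would pass None to the permission check.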
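The decorator relies on a `can` method on the current user that the snippet does not define. A minimal deny-by-default sketch of what such a method might look like; the `CurrentUser` class, its `role` values, and the operation names are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CurrentUser:
    id: int
    role: str = "user"

    def can(self, resource_type, operation, resource_id):
        """Return True if this user may perform `operation` on the resource."""
        if self.role == "admin":
            return True
        # Non-admins may only read or update their own user record.
        if resource_type == "user" and operation in {"read", "update"}:
            return resource_id is not None and resource_id == self.id
        return False  # anything not explicitly allowed is denied
```

Denying by default means that a new resource type or operation stays inaccessible until a rule is added for it, and a missing resource ID never matches.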
For session security, load the signing key from the environment, harden the session cookie, and avoid storing sensitive authorization data client-side:

import os
from flask import abort, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity

app.secret_key = os.environ['FLASK_SECRET_KEY']  # Fail at startup if the key is missing
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'

# Use server-side session storage for sensitive data.
# RedisSessionInterface is assumed to be a SessionInterface implementation defined elsewhere.
session_store = RedisSessionInterface()
app.session_interface = session_store

# Never store authorization in session
# Instead, use JWT tokens with proper claims verification
jwt = JWTManager(app)

@app.route('/login', methods=['POST'])
def login():
    # authenticate() is a hypothetical helper that returns a user or None
    user = authenticate(request.get_json())
    if user is None:
        abort(401)
    access_token = create_access_token(identity=str(user.id),
                                       additional_claims={'role': user.role})
    return jsonify(access_token=access_token)

Property-level filtering prevents data exposure through API responses:
class User:
    # Model columns omitted for brevity
    def to_dict(self, include_sensitive=False):
        data = {
            'id': self.id,
            'email': self.email,
            'created_at': self.created_at
        }
        if include_sensitive:
            data.update({
                'is_admin': self.is_admin,
                'last_login': self.last_login
            })
        return data

# login_required is assumed to authenticate the caller and set g.current_user
@app.route('/users/<int:user_id>')
@login_required
def get_user(user_id):
    user = User.query.get(user_id)
    if not user:
        abort(404)
    # Only include sensitive data if user is viewing their own profile
    include_sensitive = (g.current_user.id == user_id)
    return jsonify(user=user.to_dict(include_sensitive))

Input validation should be strict and format-specific:
from flask import abort, g, jsonify, request
from marshmallow import Schema, fields, ValidationError

class UserUpdateSchema(Schema):
    user_id = fields.Int(required=True)
    email = fields.Email(allow_none=True)
    name = fields.Str(allow_none=True)

@app.route('/update', methods=['POST'])
def update():
    try:
        data = UserUpdateSchema().load(request.json)
    except ValidationError as err:
        return jsonify(err.messages), 400
    # Verify user owns the resource being updated
    if data['user_id'] != g.current_user.id:
        abort(403)
    db.update_user(data['user_id'], data)
    return 'Updated'

Finally, implement comprehensive logging for all data modification operations to detect and investigate potential integrity violations:
import logging
from datetime import datetime

audit_logger = logging.getLogger('audit')

class AuditedModel:
    def save(self):
        db.session.add(self)
        db.session.commit()
        audit_logger.info(f"{datetime.now()} - {g.current_user.id} modified {self.__class__.__name__} {self.id}")

# Use in models
class User(db.Model, AuditedModel):
    # ... model definition
    pass
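Free-text log lines are hard to search during an investigation, so it can help to emit each audit event as a structured record. The sketch below is one possible shape, not a prescribed format; the `audit` function and its field names are hypothetical, and the timestamp is left to the logging formatter.

```python
import json
import logging

audit_logger = logging.getLogger("audit")


def audit(actor_id, action, model, object_id, changed_fields=()):
    """Emit one JSON audit record and return it as a dict."""
    record = {
        "actor_id": actor_id,
        "action": action,
        "model": model,
        "object_id": object_id,
        # Log field names only, so values such as passwords stay out of the log.
        "changed_fields": sorted(changed_fields),
    }
    audit_logger.info(json.dumps(record, sort_keys=True))
    return record
```

A `save` method could call `audit(g.current_user.id, 'update', type(self).__name__, self.id, changed)` after a successful commit, where `changed` is whatever set of field names the application tracks.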