Coverage for src / local_deep_research / security / rate_limiter.py: 99%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
Rate limiting utility for HTTP endpoints.
Provides a global limiter instance that can be imported by blueprints.

Rate limits are configurable via environment variables (LDR_SECURITY_RATE_LIMIT_*).
Legacy server_config.json values are honored during the deprecation period.
Changes require a server restart to take effect.

Note: This is designed for single-instance local deployments. For multi-worker
production deployments, configure Redis storage via RATELIMIT_STORAGE_URL.
11"""
13from flask import g, request, session as flask_session
14from flask_limiter import Limiter
15from flask_limiter.util import get_remote_address
16from loguru import logger
18from ..settings.env_registry import is_rate_limiting_enabled
19from ..web.server_config import load_server_config
# Rate-limit strings are read once, at import time, from the server config.
# A single value may carry several limits separated by semicolons
# (e.g., "5000 per hour;50000 per day").
_config = load_server_config()

# Limit applied to every route that does not declare its own.
DEFAULT_RATE_LIMIT = _config["rate_limit_default"]

# Limits for the authentication endpoints.
LOGIN_RATE_LIMIT = _config["rate_limit_login"]
REGISTRATION_RATE_LIMIT = _config["rate_limit_registration"]

# Limit for settings-modification endpoints, to prevent abuse.
SETTINGS_RATE_LIMIT = _config["rate_limit_settings"]
def get_client_ip():
    """Return the client IP address used as the rate-limit key.

    Sources are tried in this order; the first one that yields a non-blank
    value wins:

    1. The first entry of the ``X-Forwarded-For`` header.
    2. The ``X-Real-IP`` header.
    3. The direct remote address from ``get_remote_address()``.

    A header that is present but blank once stripped (for example ``" "``
    or ``", 10.0.0.1"``) is skipped instead of being returned as an empty
    string, so such requests are not all lumped under one empty key.

    NOTE(review): both headers are request-supplied values. This assumes a
    trusted proxy sets or overwrites them before the request reaches the
    app -- confirm for the target deployment.

    Returns:
        The IP address string chosen by the lookup order above.
    """
    # X-Forwarded-For is a comma-separated chain; the first entry is taken
    # as the originating client.
    forwarded_for = request.environ.get("HTTP_X_FORWARDED_FOR")
    if forwarded_for:
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop

    # X-Real-IP is an alternative single-value proxy header.
    real_ip = request.environ.get("HTTP_X_REAL_IP")
    if real_ip:
        stripped_ip = real_ip.strip()
        if stripped_ip:
            return stripped_ip

    # No usable proxy header: fall back to the direct remote address.
    return get_remote_address()
# Module-wide limiter instance shared by all blueprints; it is initialized
# against the Flask app in app_factory.
#
# Enforcement is controlled by is_rate_limiting_enabled(): rate limiting is
# disabled in CI unless ENABLE_RATE_LIMITING=true, which lets the rate
# limiting test run with limits switched on.
#
# Storage is in-memory, which suits single-instance deployments. For
# multi-instance production deployments behind a load balancer, Redis storage
# is meant to be configured via the RATELIMIT_STORAGE_URL environment
# variable:
#     export RATELIMIT_STORAGE_URL="redis://localhost:6379"
# NOTE(review): storage_uri is hard-coded to "memory://" below -- confirm
# that the setting above actually takes precedence over this argument.
limiter = Limiter(
    enabled=is_rate_limiting_enabled(),
    key_func=get_client_ip,
    default_limits=[DEFAULT_RATE_LIMIT],
    headers_enabled=True,
    storage_uri="memory://",
)
# Shared rate-limit decorators for the authentication and settings endpoints.
# Import them and apply them directly to routes; each decorator is tied to
# its own scope name.
login_limit = limiter.shared_limit(LOGIN_RATE_LIMIT, scope="login")

registration_limit = limiter.shared_limit(
    REGISTRATION_RATE_LIMIT, scope="registration"
)

settings_limit = limiter.shared_limit(SETTINGS_RATE_LIMIT, scope="settings")

# Password changes reuse the login limit string but are counted under a
# separate scope.
password_change_limit = limiter.shared_limit(
    LOGIN_RATE_LIMIT, scope="password_change"
)
93# ---------------------------------------------------------------------------
94# Shared helpers
95# ---------------------------------------------------------------------------
def get_current_username():
    """Return the authenticated username, or None when there is none.

    ``g.current_user`` is the preferred source; it is set by the
    inject_current_user before_request handler. When it is missing or falsy
    (e.g., tests, CLI contexts), the ``"username"`` entry of the Flask
    session is returned instead.
    """
    current_user = getattr(g, "current_user", None)
    if current_user:
        return current_user
    return flask_session.get("username")
110# ---------------------------------------------------------------------------
111# API v1 rate limiting (per-user, configurable via DB setting)
112# ---------------------------------------------------------------------------
API_RATE_LIMIT_DEFAULT = 60  # requests per minute


def _get_user_api_rate_limit():
    """Return the current user's API rate limit, cached on ``flask.g``.

    The value is read from the ``"app.api_rate_limit"`` setting in the user's
    database. ``API_RATE_LIMIT_DEFAULT`` is used when there is no current
    username, when no database session is available, or when reading the
    setting raises. The result is stored on ``g._api_rate_limit`` so repeated
    calls reuse it instead of reading the setting again.

    Returns:
        The configured limit as returned by the settings manager, or
        ``API_RATE_LIMIT_DEFAULT``.
    """
    if hasattr(g, "_api_rate_limit"):
        return g._api_rate_limit

    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from ..database.session_context import get_user_db_session
    from ..utilities.db_utils import get_settings_manager

    username = get_current_username()

    rate_limit = API_RATE_LIMIT_DEFAULT
    if username:
        try:
            with get_user_db_session(username) as db_session:
                if db_session:
                    sm = get_settings_manager(db_session, username)
                    rate_limit = sm.get_setting(
                        "app.api_rate_limit", API_RATE_LIMIT_DEFAULT
                    )
        except Exception:
            # Best effort: keep the default limit, but record the failure.
            # loguru does not honor the stdlib ``exc_info=True`` keyword, so
            # the traceback has to be attached with ``opt(exception=True)``.
            logger.opt(exception=True).debug(
                "Failed to read API rate limit setting"
            )

    g._api_rate_limit = rate_limit
    return rate_limit
def _get_api_rate_limit_string():
    """Build the Flask-Limiter limit string for the current user's API limit.

    The per-user value from ``_get_user_api_rate_limit()`` is rendered as
    ``"<value> per minute"``.
    """
    per_minute = _get_user_api_rate_limit()
    return f"{per_minute} per minute"
def _is_api_rate_limit_exempt():
    """Tell whether the current request skips API rate limiting.

    Two cases are exempt: unauthenticated requests (the auth decorator
    handles their rejection) and users whose rate limit is falsy, i.e. set
    to 0 to disable limiting.
    """
    if get_current_username():
        # Authenticated: exempt only when the user's limit is disabled.
        return not _get_user_api_rate_limit()
    return True
def _get_api_user_key():
    """Return the API rate-limit key, built from the authenticated username.

    Unauthenticated requests are exempt via _is_api_rate_limit_exempt and
    rejected by api_access_control, so this is only expected to run for
    authenticated users.
    """
    username = get_current_username()
    return f"api_user:{username}"
# Decorator for API v1 routes: the limit string is computed per request, the
# counter is keyed by username, and exempt requests are skipped.
api_rate_limit = limiter.shared_limit(
    _get_api_rate_limit_string,
    scope="api_v1",
    exempt_when=_is_api_rate_limit_exempt,
    key_func=_get_api_user_key,
)
174# ---------------------------------------------------------------------------
175# File upload rate limiting (dual-keyed: per-user AND per-IP)
176# ---------------------------------------------------------------------------
# Two limits in one string, separated by a semicolon.
_UPLOAD_RATE_LIMIT = "10 per minute;100 per hour"


def _get_upload_user_key():
    """Return the key for the per-user upload rate limit.

    The key is built from the authenticated username when there is one;
    otherwise it falls back to the client IP from ``get_client_ip()``.
    """
    username = get_current_username()
    if not username:
        return f"upload_ip:{get_client_ip()}"
    return f"upload_user:{username}"
# Upload decorators. Both use the same limit string under separate scopes.
# The first is keyed by _get_upload_user_key.
upload_rate_limit_user = limiter.shared_limit(
    _UPLOAD_RATE_LIMIT, scope="upload_user", key_func=_get_upload_user_key
)

# No key_func is passed here, so presumably the limiter's own key function
# (get_client_ip) applies -- verify against Flask-Limiter's behavior.
upload_rate_limit_ip = limiter.shared_limit(
    _UPLOAD_RATE_LIMIT, scope="upload_ip"
)