Coverage for src / local_deep_research / security / security_headers.py: 99%
84 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Security headers middleware for Flask applications.
3This module provides comprehensive HTTP security headers to protect against
4common web vulnerabilities identified by OWASP ZAP and other security scanners.
5"""
7from typing import Optional
9from flask import Flask, Response, request
10from loguru import logger
class SecurityHeaders:
    """Configure and apply security headers to Flask responses.

    Addresses security vulnerabilities:
    - CSP: Content Security Policy to prevent XSS
    - Clickjacking: X-Frame-Options protection
    - MIME sniffing: X-Content-Type-Options
    - Spectre: Cross-Origin policies
    - Feature abuse: Permissions-Policy
    - Information leakage: Server header removal
    - Protocol downgrade: HSTS (Strict-Transport-Security)
    """

    def __init__(self, app: Optional[Flask] = None) -> None:
        """Initialize security headers middleware.

        Args:
            app (Optional[Flask]): Flask application instance (optional, can call init_app later)
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the Flask application with security headers.

        Sets configuration defaults, pre-computes the static header values,
        registers ``add_security_headers`` as an ``after_request`` hook and
        validates the CORS configuration.

        Args:
            app: Flask application instance

        Raises:
            ValueError: If the CORS configuration is invalid
                (see ``_validate_cors_config``).

        Security Configuration:
            SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections (default: 'self')
            SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True)
            SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS (default: False)
            SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins, comma-separated for multiple
                (default: "" - requires explicit configuration).
                Multi-origin uses origin reflection.
            SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied globally to ALL routes
                (default: "credentialless"). Options: require-corp, credentialless, unsafe-none

        Important Security Trade-offs:
            - CSP includes 'unsafe-inline' for Socket.IO compatibility
            - This reduces XSS protection but is necessary for real-time WebSocket functionality
            - 'unsafe-eval' has been removed to prevent eval() XSS attacks
            - If Socket.IO is removed, 'unsafe-inline' should also be tightened immediately
            - Monitor CSP violation reports to detect potential XSS attempts
            - COEP policy is applied globally, not per-route. 'credentialless' allows cross-origin
              requests without credentials. Use 'require-corp' for stricter isolation.
        """
        # Set default security configuration
        # connect-src restricted to 'self' — covers same-origin HTTP requests.
        # Socket.IO uses HTTP long-polling transport with same-origin URLs
        # (baseUrl = window.location.protocol + '//' + window.location.host),
        # so explicit ws:/wss: schemes are not needed.
        app.config.setdefault(
            "SECURITY_CSP_CONNECT_SRC",
            "'self'",
        )
        app.config.setdefault("SECURITY_CORS_ENABLED", True)
        app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False)

        # CORS allowed origins: check env var first, then fall back to default
        # Env var: LDR_SECURITY_CORS_ALLOWED_ORIGINS
        # Default to empty (fail closed) - require explicit configuration for CORS
        from ..settings.env_registry import get_env_setting

        cors_env = get_env_setting("security.cors.allowed_origins")
        if cors_env is not None:
            # An explicit env value overrides whatever is already in app.config
            app.config["SECURITY_CORS_ALLOWED_ORIGINS"] = cors_env
        else:
            app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "")

        app.config.setdefault("SECURITY_COEP_POLICY", "credentialless")

        self.app = app

        # Pre-compute static header values once at startup to avoid
        # regenerating identical strings on every response
        self._cached_csp = self.get_csp_policy()
        self._cached_permissions_policy = self.get_permissions_policy()

        app.after_request(self.add_security_headers)

        # Validate CORS configuration at startup
        self._validate_cors_config()

        logger.info("Security headers middleware initialized")
        logger.warning(
            "CSP configured with 'unsafe-inline' for Socket.IO compatibility. "
            "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations."
        )

    def get_csp_policy(self) -> str:
        """Generate Content Security Policy header value.

        Returns:
            CSP policy string with directives for Socket.IO compatibility

        Note:
            - 'unsafe-inline' is required for Socket.IO compatibility; removing it
              would break real-time WebSocket functionality. If Socket.IO is removed
              in the future, 'unsafe-inline' should be tightened immediately.
            - 'unsafe-eval' removed for better XSS protection (not needed for Socket.IO)
            - connect-src is read from SECURITY_CSP_CONNECT_SRC and defaults to 'self'
        """
        connect_src = self.app.config.get("SECURITY_CSP_CONNECT_SRC", "'self'")
        return (
            "default-src 'self'; "
            f"connect-src {connect_src}; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "font-src 'self' data:; "
            "img-src 'self' data:; "
            "media-src 'self'; "
            "worker-src blob:; "
            "child-src 'self' blob:; "
            "frame-src 'self'; "
            "frame-ancestors 'self'; "
            "manifest-src 'self'; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

    @staticmethod
    def get_permissions_policy() -> str:
        """Generate Permissions-Policy header value.

        Disables potentially dangerous browser features by default.

        Returns:
            Permissions-Policy string
        """
        return (
            "geolocation=(), "
            "midi=(), "
            "camera=(), "
            "usb=(), "
            "magnetometer=(), "
            "accelerometer=(), "
            "gyroscope=(), "
            "microphone=(), "
            "payment=(), "
            "sync-xhr=(), "
            "document-domain=()"
        )

    def add_security_headers(self, response: Response) -> Response:
        """Add comprehensive security headers to Flask response.

        Args:
            response: Flask response object

        Returns:
            Response object with security headers added

        Note:
            Cross-Origin-Embedder-Policy (COEP) is applied globally to all routes.
            Default is 'credentialless' which allows cross-origin requests without
            credentials. If stricter isolation is needed, configure SECURITY_COEP_POLICY
            to 'require-corp', but this may break cross-origin API access.
        """
        # Content Security Policy - prevents XSS and injection attacks
        response.headers["Content-Security-Policy"] = self._cached_csp

        # Anti-clickjacking protection
        response.headers["X-Frame-Options"] = "SAMEORIGIN"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Spectre vulnerability mitigation - applied globally
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        coep_policy = self.app.config.get(
            "SECURITY_COEP_POLICY", "credentialless"
        )
        response.headers["Cross-Origin-Embedder-Policy"] = coep_policy
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Permissions Policy - disable dangerous features
        response.headers["Permissions-Policy"] = self._cached_permissions_policy

        # Remove server version information to prevent information disclosure
        response.headers.pop("Server", None)

        # Referrer Policy - control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS - enforce HTTPS connections (only when request is secure)
        # request.is_secure checks wsgi.url_scheme == 'https'
        # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for reverse proxies
        if request.is_secure:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Cache-Control - prevent caching of sensitive content
        # Static assets are handled separately by the static file route
        if not request.path.startswith("/static/"):
            response.headers["Cache-Control"] = (
                "no-store, no-cache, must-revalidate, max-age=0"
            )
            response.headers["Pragma"] = "no-cache"  # HTTP/1.0 compatibility
            response.headers["Expires"] = "0"

        # Add CORS headers for API requests if enabled
        if self._is_api_route(request.path) and self.app.config.get(
            "SECURITY_CORS_ENABLED", True
        ):
            response = self._add_cors_headers(response)

        return response

    @staticmethod
    def _is_api_route(path: str) -> bool:
        """Check if the request path is an API route.

        Args:
            path: Request path to check

        Returns:
            True if path starts with one of the API prefixes
        """
        # str.startswith accepts a tuple of prefixes: one call instead of an `or` chain
        return path.startswith(("/api/", "/research/api/", "/history/api"))

    def _validate_cors_config(self) -> None:
        """Validate CORS configuration at startup to catch misconfigurations early.

        Does nothing when SECURITY_CORS_ENABLED is falsy.

        Raises:
            ValueError: If credentials are enabled together with the wildcard origin '*'
        """
        if not self.app.config.get("SECURITY_CORS_ENABLED", True):
            return

        allowed_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )

        # Validate credentials with wildcard origin
        if allow_credentials and allowed_origins == "*":
            raise ValueError(
                "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. "
                "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials."
            )

        # Log info about multi-origin + credentials configuration
        if allow_credentials and "," in allowed_origins:
            logger.info(
                f"CORS configured with multiple origins and credentials enabled. "
                f"Origins: {allowed_origins}. "
                f"Using origin reflection pattern for security."
            )

    @staticmethod
    def _parse_allowed_origins(configured_origins: str) -> list[str]:
        """Split a comma-separated origins setting into an ordered list.

        Whitespace around each entry is stripped and empty entries (e.g. from
        a trailing comma) are dropped. Configuration order is preserved so
        that "first configured origin" is well defined.

        Args:
            configured_origins: Raw SECURITY_CORS_ALLOWED_ORIGINS value

        Returns:
            Origins in the order they were configured
        """
        stripped = (entry.strip() for entry in configured_origins.split(","))
        return [entry for entry in stripped if entry]

    @classmethod
    def _select_cors_origin(
        cls, configured_origins: str, request_origin: Optional[str]
    ) -> str:
        """Choose the Access-Control-Allow-Origin value for a request.

        Args:
            configured_origins: Raw SECURITY_CORS_ALLOWED_ORIGINS value
            request_origin: Value of the request's Origin header, if any

        Returns:
            - "*" when the wildcard is configured
            - the configured value unchanged when it contains no comma
              (single origin, or empty when CORS origins are not configured)
            - for a comma-separated list: the request origin when it is in
              the list, otherwise the first configured origin (empty string
              if the list has no non-empty entries)
        """
        if configured_origins == "*":
            return "*"
        if "," not in configured_origins:
            # Single origin configured - always use it (browser enforces CORS)
            return configured_origins

        allowed = cls._parse_allowed_origins(configured_origins)
        if request_origin and request_origin in allowed:
            return request_origin
        # Never emit the raw comma-separated list: the header carries a single
        # origin. Fall back to the first configured origin, deterministically.
        return allowed[0] if allowed else ""

    def _add_cors_headers(self, response: Response) -> Response:
        """Add CORS headers for API routes using origin reflection for multi-origin support.

        Args:
            response: Flask response object to modify

        Returns:
            Response object with CORS headers added

        Security Note:
            - Uses "origin reflection" pattern for multi-origin support (comma-separated origins config)
            - Reflects the requesting origin back if it matches the whitelist
            - A non-whitelisted or missing Origin falls back to the first configured origin
            - Wildcard origin (*) disables credentials per CORS spec
            - Single origin or reflected origins allow credentials if configured
        """
        configured_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )
        request_origin = request.headers.get("Origin")

        # Determine which origin to send back
        origin_to_send = self._select_cors_origin(
            configured_origins, request_origin
        )

        # In multi-origin mode a present-but-unreflected Origin means it was not
        # whitelisted. (Browsers will enforce CORS, this just logs for monitoring)
        if (
            "," in configured_origins
            and request_origin
            and origin_to_send != request_origin
        ):
            logger.warning(
                f"CORS request from non-whitelisted origin: {request_origin}. "
                f"Allowed origins: {configured_origins}"
            )

        response.headers["Access-Control-Allow-Origin"] = origin_to_send
        response.headers["Access-Control-Allow-Methods"] = (
            "GET, POST, PUT, DELETE, OPTIONS"
        )
        response.headers["Access-Control-Allow-Headers"] = (
            "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override"
        )

        # Set credentials if configured and not using wildcard
        # Note: Startup validation rejects credentials with wildcard origin
        if allow_credentials and origin_to_send != "*":
            response.headers["Access-Control-Allow-Credentials"] = "true"

        response.headers["Access-Control-Max-Age"] = "3600"

        return response