Coverage for src / local_deep_research / security / security_headers.py: 91%
83 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
"""Security headers middleware for Flask applications.

This module provides comprehensive HTTP security headers to protect against
common web vulnerabilities identified by OWASP ZAP and other security scanners.
"""
7from typing import Optional
9from flask import Flask, Response, request
10from loguru import logger
class SecurityHeaders:
    """Configure and apply security headers to Flask responses.

    Addresses security vulnerabilities:
        - CSP: Content Security Policy to prevent XSS
        - Clickjacking: X-Frame-Options protection
        - MIME sniffing: X-Content-Type-Options
        - Spectre: Cross-Origin policies
        - Feature abuse: Permissions-Policy
        - Information leakage: Server header removal
        - Protocol downgrade: HSTS (Strict-Transport-Security)
    """

    def __init__(self, app: Optional[Flask] = None) -> None:
        """Initialize security headers middleware.

        Args:
            app: Flask application instance (optional, can call init_app later)
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the Flask application with security headers.

        Sets configuration defaults, registers ``add_security_headers`` as an
        ``after_request`` hook and validates the CORS configuration.

        Args:
            app: Flask application instance

        Raises:
            ValueError: If the CORS configuration is invalid
                (see ``_validate_cors_config``).

        Security Configuration:
            SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections
                (default: 'self' ws: wss:)
            SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True)
            SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS
                (default: False)
            SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins, comma-separated
                for multiple (default: "" - requires explicit configuration;
                while empty, no CORS headers are emitted).
                Multi-origin uses origin reflection.
            SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied globally
                to ALL routes (default: "credentialless").
                Options: require-corp, credentialless, unsafe-none

        Important Security Trade-offs:
            - CSP includes 'unsafe-inline' for Socket.IO compatibility
            - This reduces XSS protection but is necessary for real-time
              WebSocket functionality
            - 'unsafe-eval' has been removed to prevent eval() XSS attacks
            - If Socket.IO is removed, 'unsafe-inline' should also be tightened
              immediately
            - Monitor CSP violation reports to detect potential XSS attempts
            - COEP policy is applied globally, not per-route. 'credentialless'
              allows cross-origin requests without credentials. Use
              'require-corp' for stricter isolation.
        """
        # Set default security configuration
        # connect-src uses 'self' + ws:/wss: to work from any origin
        # (localhost, LAN, Docker, internet)
        app.config.setdefault(
            "SECURITY_CSP_CONNECT_SRC",
            "'self' ws: wss:",
        )
        app.config.setdefault("SECURITY_CORS_ENABLED", True)
        app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False)

        # CORS allowed origins: check env var first, then fall back to default
        # Env var: LDR_SECURITY_CORS_ALLOWED_ORIGINS
        # Default to empty (fail closed) - require explicit configuration for CORS
        from ..settings.env_registry import get_env_setting

        cors_env = get_env_setting("security.cors.allowed_origins")
        if cors_env is not None:
            # The environment setting overrides any value already in app.config
            app.config["SECURITY_CORS_ALLOWED_ORIGINS"] = cors_env
        else:
            app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "")

        app.config.setdefault("SECURITY_COEP_POLICY", "credentialless")

        self.app = app
        app.after_request(self.add_security_headers)

        # Validate CORS configuration at startup
        self._validate_cors_config()

        logger.info("Security headers middleware initialized")
        logger.warning(
            "CSP configured with 'unsafe-inline' for Socket.IO compatibility. "
            "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations."
        )

    def get_csp_policy(self) -> str:
        """Generate Content Security Policy header value.

        Returns:
            CSP policy string with directives for Socket.IO compatibility

        Note:
            - 'unsafe-inline' is required for Socket.IO compatibility
            - 'unsafe-eval' removed for better XSS protection (not needed for
              Socket.IO)
            - connect-src is read from SECURITY_CSP_CONNECT_SRC, which
              init_app defaults to 'self' ws: wss: (works from any origin)
        """
        connect_src = self.app.config.get("SECURITY_CSP_CONNECT_SRC", "'self'")
        return (
            "default-src 'self'; "
            f"connect-src {connect_src}; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "font-src 'self' data:; "
            "img-src 'self' data:; "
            "media-src 'self'; "
            "worker-src blob:; "
            "child-src 'self' blob:; "
            "frame-src 'self'; "
            "frame-ancestors 'self'; "
            "manifest-src 'self'; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

    @staticmethod
    def get_permissions_policy() -> str:
        """Generate Permissions-Policy header value.

        Disables potentially dangerous browser features by default.

        Returns:
            Permissions-Policy string
        """
        return (
            "geolocation=(), "
            "midi=(), "
            "camera=(), "
            "usb=(), "
            "magnetometer=(), "
            "accelerometer=(), "
            "gyroscope=(), "
            "microphone=(), "
            "payment=(), "
            "sync-xhr=(), "
            "document-domain=()"
        )

    def add_security_headers(self, response: Response) -> Response:
        """Add comprehensive security headers to Flask response.

        Registered as an ``after_request`` hook by ``init_app``.

        Args:
            response: Flask response object

        Returns:
            Response object with security headers added

        Note:
            Cross-Origin-Embedder-Policy (COEP) is applied globally to all routes.
            Default is 'credentialless' which allows cross-origin requests without
            credentials. If stricter isolation is needed, configure
            SECURITY_COEP_POLICY to 'require-corp', but this may break
            cross-origin API access.
        """
        # Content Security Policy - prevents XSS and injection attacks
        response.headers["Content-Security-Policy"] = self.get_csp_policy()

        # Anti-clickjacking protection
        response.headers["X-Frame-Options"] = "SAMEORIGIN"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Spectre vulnerability mitigation - applied globally
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        coep_policy = self.app.config.get(
            "SECURITY_COEP_POLICY", "credentialless"
        )
        response.headers["Cross-Origin-Embedder-Policy"] = coep_policy
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Permissions Policy - disable dangerous features
        response.headers["Permissions-Policy"] = self.get_permissions_policy()

        # Remove server version information to prevent information disclosure
        response.headers.pop("Server", None)

        # Referrer Policy - control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS - enforce HTTPS connections (only when request is secure)
        # request.is_secure checks wsgi.url_scheme == 'https'
        # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for
        # reverse proxies
        if request.is_secure:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Cache-Control - prevent caching of sensitive content
        # Static assets are handled separately by the static file route
        if not request.path.startswith("/static/"):
            response.headers["Cache-Control"] = (
                "no-store, no-cache, must-revalidate, max-age=0"
            )
            response.headers["Pragma"] = "no-cache"  # HTTP/1.0 compatibility
            response.headers["Expires"] = "0"

        # Add CORS headers for API requests if enabled
        if self._is_api_route(request.path) and self.app.config.get(
            "SECURITY_CORS_ENABLED", True
        ):
            response = self._add_cors_headers(response)

        return response

    @staticmethod
    def _is_api_route(path: str) -> bool:
        """Check if the request path is an API route.

        Args:
            path: Request path to check

        Returns:
            True if path is an API route
        """
        # NOTE(review): "/history/api" has no trailing slash, unlike the other
        # two prefixes, so it also matches e.g. "/history/api_foo". Kept as-is
        # to preserve existing routing behaviour - confirm whether intended.
        return path.startswith(("/api/", "/research/api/", "/history/api"))

    @staticmethod
    def _parse_allowed_origins(configured_origins: str) -> list:
        """Split the comma-separated origins setting into an ordered list.

        Args:
            configured_origins: Raw SECURITY_CORS_ALLOWED_ORIGINS value

        Returns:
            Origins in configuration order, whitespace-stripped, with empty
            entries (e.g. from a trailing comma) removed. Empty list when
            nothing is configured.
        """
        if not configured_origins:
            return []
        stripped = (entry.strip() for entry in configured_origins.split(","))
        return [entry for entry in stripped if entry]

    @staticmethod
    def _select_cors_origin(
        allowed_origins: list, request_origin: Optional[str]
    ) -> Optional[str]:
        """Choose the Access-Control-Allow-Origin value for a request.

        Args:
            allowed_origins: Ordered list from ``_parse_allowed_origins``
            request_origin: Value of the request's Origin header, if any

        Returns:
            - None when no origin is configured (caller emits no CORS headers)
            - "*" when the configuration is exactly the wildcard
            - the request origin when it is whitelisted (origin reflection)
            - otherwise the first configured origin; it will not match a
              non-whitelisted requester, so the browser blocks the response
        """
        if not allowed_origins:
            return None
        if list(allowed_origins) == ["*"]:
            return "*"
        if request_origin in allowed_origins:
            return request_origin
        # Deterministic fallback: a list keeps configuration order, whereas
        # indexing into a set would pick an arbitrary element.
        return allowed_origins[0]

    def _validate_cors_config(self) -> None:
        """Validate CORS configuration at startup to catch misconfigurations early.

        Does nothing when SECURITY_CORS_ENABLED is false.

        Raises:
            ValueError: If credentials are enabled together with the wildcard
                origin '*'
        """
        if not self.app.config.get("SECURITY_CORS_ENABLED", True):
            return

        allowed_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )

        # Validate credentials with wildcard origin
        if allow_credentials and allowed_origins == "*":
            raise ValueError(
                "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. "
                "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials."
            )

        # Log info about multi-origin + credentials configuration
        if allow_credentials and "," in allowed_origins:
            logger.info(
                "CORS configured with multiple origins and credentials enabled. "
                f"Origins: {allowed_origins}. "
                "Using origin reflection pattern for security."
            )

    def _add_cors_headers(self, response: Response) -> Response:
        """Add CORS headers for API routes using origin reflection for multi-origin support.

        Args:
            response: Flask response object to modify

        Returns:
            Response object with CORS headers added. The response is returned
            unchanged when no allowed origin is configured (fail closed).

        Security Note:
            - Uses "origin reflection" pattern for multi-origin support
              (comma-separated origins config)
            - Reflects the requesting origin back if it matches the whitelist
            - Wildcard origin (*) disables credentials per CORS spec
            - Single origin or reflected origins allow credentials if configured
        """
        configured_origins = (
            self.app.config.get("SECURITY_CORS_ALLOWED_ORIGINS", "") or ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )
        request_origin = request.headers.get("Origin")

        allowed_origins = self._parse_allowed_origins(configured_origins)
        origin_to_send = self._select_cors_origin(
            allowed_origins, request_origin
        )
        if origin_to_send is None:
            # Nothing configured: emit no CORS headers rather than an empty
            # (invalid) Access-Control-Allow-Origin value.
            return response

        # Multi-origin mode only: record requests from origins outside the
        # whitelist (browsers enforce CORS, this is purely for monitoring).
        if (
            len(allowed_origins) > 1
            and request_origin
            and request_origin not in allowed_origins
        ):
            logger.warning(
                f"CORS request from non-whitelisted origin: {request_origin}. "
                f"Allowed origins: {configured_origins}"
            )

        response.headers["Access-Control-Allow-Origin"] = origin_to_send
        if origin_to_send != "*":
            # The allowed origin is specific to the requester, so caches must
            # key on the Origin request header.
            response.vary.add("Origin")

        response.headers["Access-Control-Allow-Methods"] = (
            "GET, POST, PUT, DELETE, OPTIONS"
        )
        response.headers["Access-Control-Allow-Headers"] = (
            "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override"
        )

        # Set credentials if configured and not using wildcard.
        # Startup validation rejects credentials + wildcard; credentials with a
        # single origin or a multi-origin whitelist are permitted.
        if allow_credentials and origin_to_send != "*":
            response.headers["Access-Control-Allow-Credentials"] = "true"

        response.headers["Access-Control-Max-Age"] = "3600"

        return response