Coverage for src / local_deep_research / security / security_headers.py: 79%
75 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""Security headers middleware for Flask applications.
3This module provides comprehensive HTTP security headers to protect against
4common web vulnerabilities identified by OWASP ZAP and other security scanners.
5"""
7from typing import Optional
9from flask import Flask, Response, request
10from loguru import logger
class SecurityHeaders:
    """Configure and apply security headers to Flask responses.

    Addresses security vulnerabilities:
    - CSP: Content Security Policy to prevent XSS
    - Clickjacking: X-Frame-Options protection
    - MIME sniffing: X-Content-Type-Options
    - Spectre: Cross-Origin policies
    - Feature abuse: Permissions-Policy
    - Information leakage: Server header removal
    - Protocol downgrade: HSTS (Strict-Transport-Security)
    """

    # Single source of truth for defaults that are read in more than one
    # place (init_app's setdefault and the per-request config lookups).
    _DEFAULT_CSP_CONNECT_SRC = "'self' ws: wss:"
    _DEFAULT_COEP_POLICY = "credentialless"

    def __init__(self, app: Optional[Flask] = None) -> None:
        """Initialize security headers middleware.

        Args:
            app: Flask application instance (optional, can call init_app later)
        """
        # Always define the attribute so an instance created without an app
        # fails with a clear None rather than an AttributeError later on.
        self.app = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the Flask application with security headers.

        Registers :meth:`add_security_headers` as an ``after_request`` hook,
        fills in default configuration values and validates the CORS settings.

        Args:
            app: Flask application instance

        Raises:
            ValueError: If the CORS configuration is invalid
                (see :meth:`_validate_cors_config`).

        Security Configuration:
            SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections
                (default: 'self' ws: wss:)
            SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True)
            SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS (default: False)
            SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins, comma-separated for
                multiple (default: "" - no CORS headers are sent until origins are
                explicitly configured). Multi-origin uses origin reflection.
            SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied globally to ALL
                routes (default: "credentialless"). Options: require-corp,
                credentialless, unsafe-none

        Important Security Trade-offs:
            - CSP includes 'unsafe-inline' for Socket.IO compatibility
            - This reduces XSS protection but is necessary for real-time WebSocket
              functionality
            - 'unsafe-eval' has been removed to prevent eval() XSS attacks
            - If Socket.IO is removed, 'unsafe-inline' should also be tightened
              immediately
            - Monitor CSP violation reports to detect potential XSS attempts
            - COEP policy is applied globally, not per-route. 'credentialless' allows
              cross-origin requests without credentials. Use 'require-corp' for
              stricter isolation.
        """
        # Set default security configuration.
        # connect-src uses 'self' + ws:/wss: to work from any origin
        # (localhost, LAN, Docker, internet).
        app.config.setdefault(
            "SECURITY_CSP_CONNECT_SRC", self._DEFAULT_CSP_CONNECT_SRC
        )
        app.config.setdefault("SECURITY_CORS_ENABLED", True)
        app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False)
        # Default to empty (fail closed) - require explicit configuration for CORS.
        # Using "*" creates a security footgun - users must explicitly allow origins.
        app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "")
        app.config.setdefault("SECURITY_COEP_POLICY", self._DEFAULT_COEP_POLICY)

        self.app = app
        app.after_request(self.add_security_headers)

        # Validate CORS configuration at startup
        self._validate_cors_config()

        logger.info("Security headers middleware initialized")
        logger.warning(
            "CSP configured with 'unsafe-inline' for Socket.IO compatibility. "
            "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations."
        )

    def get_csp_policy(self) -> str:
        """Generate Content Security Policy header value.

        Returns:
            CSP policy string with directives for Socket.IO compatibility

        Note:
            - 'unsafe-inline' is required for Socket.IO compatibility
            - 'unsafe-eval' removed for better XSS protection (not needed for Socket.IO)
            - connect-src uses 'self' ws: wss: by default (works from any origin)
        """
        # Fall back to the same default init_app() installs, so the policy is
        # identical whether or not the key is present in the config.
        connect_src = self.app.config.get(
            "SECURITY_CSP_CONNECT_SRC", self._DEFAULT_CSP_CONNECT_SRC
        )
        return (
            "default-src 'self'; "
            f"connect-src {connect_src}; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "font-src 'self' data:; "
            "img-src 'self' data:; "
            "worker-src blob:; "
            "frame-src 'self'; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

    @staticmethod
    def get_permissions_policy() -> str:
        """Generate Permissions-Policy header value.

        Disables potentially dangerous browser features by default.

        Returns:
            Permissions-Policy string
        """
        return (
            "geolocation=(), "
            "midi=(), "
            "camera=(), "
            "usb=(), "
            "magnetometer=(), "
            "accelerometer=(), "
            "gyroscope=(), "
            "microphone=(), "
            "payment=(), "
            "sync-xhr=(), "
            "document-domain=()"
        )

    def add_security_headers(self, response: Response) -> Response:
        """Add comprehensive security headers to Flask response.

        Args:
            response: Flask response object

        Returns:
            Response object with security headers added

        Note:
            Cross-Origin-Embedder-Policy (COEP) is applied globally to all routes.
            Default is 'credentialless' which allows cross-origin requests without
            credentials. If stricter isolation is needed, configure SECURITY_COEP_POLICY
            to 'require-corp', but this may break cross-origin API access.
        """
        # Content Security Policy - prevents XSS and injection attacks
        response.headers["Content-Security-Policy"] = self.get_csp_policy()

        # Anti-clickjacking protection
        response.headers["X-Frame-Options"] = "SAMEORIGIN"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Spectre vulnerability mitigation - applied globally
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = self.app.config.get(
            "SECURITY_COEP_POLICY", self._DEFAULT_COEP_POLICY
        )
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Permissions Policy - disable dangerous features
        response.headers["Permissions-Policy"] = self.get_permissions_policy()

        # Remove server version information to prevent information disclosure
        response.headers.pop("Server", None)

        # Referrer Policy - control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS - enforce HTTPS connections (only when request is secure).
        # request.is_secure checks wsgi.url_scheme == 'https'.
        # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for
        # reverse proxies.
        if request.is_secure:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Add CORS headers for API requests if enabled
        if self._is_api_route(request.path) and self.app.config.get(
            "SECURITY_CORS_ENABLED", True
        ):
            response = self._add_cors_headers(response)

        return response

    @staticmethod
    def _is_api_route(path: str) -> bool:
        """Check if the request path is an API route.

        Args:
            path: Request path to check

        Returns:
            True if path starts with ``/api/`` or ``/research/api/``
        """
        return path.startswith(("/api/", "/research/api/"))

    @staticmethod
    def _parse_allowed_origins(configured_origins: Optional[str]) -> list:
        """Split the comma-separated origin setting into a clean, ordered list.

        Args:
            configured_origins: Raw SECURITY_CORS_ALLOWED_ORIGINS value

        Returns:
            Origins in configuration order, whitespace-stripped, with blank
            entries (empty setting, trailing or doubled commas) removed
        """
        if not configured_origins:
            return []
        stripped = (entry.strip() for entry in configured_origins.split(","))
        return [entry for entry in stripped if entry]

    @staticmethod
    def _resolve_cors_origin(
        allowed_origins: list, request_origin: Optional[str]
    ) -> Optional[str]:
        """Pick the Access-Control-Allow-Origin value for one request.

        Args:
            allowed_origins: Parsed whitelist from :meth:`_parse_allowed_origins`
            request_origin: Value of the request's ``Origin`` header, if any

        Returns:
            ``None`` when no origin is configured (no CORS headers should be
            sent), ``"*"`` when the whitelist is exactly the wildcard, the
            request origin when it is whitelisted, and otherwise the first
            configured origin.
        """
        if not allowed_origins:
            return None
        if allowed_origins == ["*"]:
            return "*"
        if request_origin and request_origin in allowed_origins:
            return request_origin
        # A list keeps configuration order, so "first configured origin" is
        # deterministic (indexing into a set is not). The browser still blocks
        # the response because the value does not match the caller's origin.
        return allowed_origins[0]

    def _validate_cors_config(self) -> None:
        """Validate CORS configuration at startup to catch misconfigurations early.

        Raises:
            ValueError: If credentials are enabled together with the wildcard
                origin '*'
        """
        if not self.app.config.get("SECURITY_CORS_ENABLED", True):
            return

        allowed_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )
        if not allow_credentials:
            return

        # Compare the parsed value so surrounding whitespace (" * ") cannot
        # slip past the wildcard check.
        parsed_origins = self._parse_allowed_origins(allowed_origins)

        # Validate credentials with wildcard origin
        if parsed_origins == ["*"]:
            raise ValueError(
                "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. "
                "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials."
            )

        # Log info about multi-origin + credentials configuration
        if len(parsed_origins) > 1:
            logger.info(
                f"CORS configured with multiple origins and credentials enabled. "
                f"Origins: {allowed_origins}. "
                f"Using origin reflection pattern for security."
            )

    def _add_cors_headers(self, response: Response) -> Response:
        """Add CORS headers for API routes using origin reflection for multi-origin support.

        Args:
            response: Flask response object to modify

        Returns:
            Response object with CORS headers added; returned unchanged when
            no allowed origin is configured

        Security Note:
            - Uses "origin reflection" pattern for multi-origin support
              (comma-separated origins config)
            - Reflects the requesting origin back if it matches the whitelist
            - Adds ``Vary: Origin`` whenever the answer depends on the request
              origin, so caches do not reuse it across origins
            - Wildcard origin (*) disables credentials per CORS spec
            - Single origin or reflected origins allow credentials if configured
        """
        configured_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )
        request_origin = request.headers.get("Origin")

        allowed_origins = self._parse_allowed_origins(configured_origins)
        origin_to_send = self._resolve_cors_origin(
            allowed_origins, request_origin
        )

        # Fail closed: with nothing configured, emit no CORS headers at all
        # instead of an empty (invalid) Access-Control-Allow-Origin.
        if origin_to_send is None:
            return response

        if len(allowed_origins) > 1:
            # The header value now depends on the request's Origin header.
            response.vary.add("Origin")
            if request_origin and request_origin not in allowed_origins:
                # Browsers enforce CORS; this only logs for monitoring.
                logger.warning(
                    f"CORS request from non-whitelisted origin: {request_origin}. "
                    f"Allowed origins: {configured_origins}"
                )

        response.headers["Access-Control-Allow-Origin"] = origin_to_send
        response.headers["Access-Control-Allow-Methods"] = (
            "GET, POST, PUT, DELETE, OPTIONS"
        )
        response.headers["Access-Control-Allow-Headers"] = (
            "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override"
        )

        # Set credentials if configured and not using wildcard.
        # Startup validation rejects credentials + wildcard; the explicit
        # check here guards against config changes made after startup.
        if allow_credentials and origin_to_send != "*":
            response.headers["Access-Control-Allow-Credentials"] = "true"

        response.headers["Access-Control-Max-Age"] = "3600"

        return response