Coverage for src / local_deep_research / security / security_headers.py: 99%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Security headers middleware for Flask applications. 

2 

3This module provides comprehensive HTTP security headers to protect against 

4common web vulnerabilities identified by OWASP ZAP and other security scanners. 

5""" 

6 

7from typing import Optional 

8 

9from flask import Flask, Response, request 

10from loguru import logger 

11 

12 

class SecurityHeaders:
    """Configure and apply security headers to Flask responses.

    Addresses security vulnerabilities:
    - CSP: Content Security Policy to prevent XSS
    - Clickjacking: X-Frame-Options protection
    - MIME sniffing: X-Content-Type-Options
    - Spectre: Cross-Origin policies
    - Feature abuse: Permissions-Policy
    - Information leakage: Server header removal
    - Protocol downgrade: HSTS (Strict-Transport-Security)
    """

    # Path prefixes whose responses are eligible for CORS headers.
    _API_ROUTE_PREFIXES = ("/api/", "/research/api/", "/history/api")

    def __init__(self, app: Optional[Flask] = None) -> None:
        """Initialize security headers middleware.

        Args:
            app (Optional[Flask]): Flask application instance (optional, can call init_app later)
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Initialize the Flask application with security headers.

        Registers ``add_security_headers`` as an ``after_request`` hook,
        fills in configuration defaults, and validates the CORS settings.

        Args:
            app: Flask application instance

        Raises:
            ValueError: If the CORS configuration is invalid
                (see ``_validate_cors_config``).

        Security Configuration:
            SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections (default: 'self')
            SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True)
            SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS (default: False)
            SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins, comma-separated for multiple
                (default: "" - requires explicit configuration).
                Multi-origin uses origin reflection.
            SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied globally to ALL routes
                (default: "credentialless"). Options: require-corp, credentialless, unsafe-none

        Important Security Trade-offs:
            - CSP includes 'unsafe-inline' for Socket.IO compatibility
            - This reduces XSS protection but is necessary for real-time WebSocket functionality
            - 'unsafe-eval' has been removed to prevent eval() XSS attacks
            - If Socket.IO is removed, 'unsafe-inline' should also be tightened immediately
            - Monitor CSP violation reports to detect potential XSS attempts
            - COEP policy is applied globally, not per-route. 'credentialless' allows cross-origin
              requests without credentials. Use 'require-corp' for stricter isolation.
        """
        # Set default security configuration
        # connect-src restricted to 'self' — covers same-origin HTTP requests.
        # Socket.IO uses HTTP long-polling transport with same-origin URLs
        # (baseUrl = window.location.protocol + '//' + window.location.host),
        # so explicit ws:/wss: schemes are not needed.
        app.config.setdefault(
            "SECURITY_CSP_CONNECT_SRC",
            "'self'",
        )
        app.config.setdefault("SECURITY_CORS_ENABLED", True)
        app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False)

        # CORS allowed origins: check env var first, then fall back to default
        # Env var: LDR_SECURITY_CORS_ALLOWED_ORIGINS
        # Default to empty (fail closed) - require explicit configuration for CORS
        from ..settings.env_registry import get_env_setting

        cors_env = get_env_setting("security.cors.allowed_origins")
        if cors_env is not None:
            # An explicit environment value overrides anything already in app.config.
            app.config["SECURITY_CORS_ALLOWED_ORIGINS"] = cors_env
        else:
            app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "")

        app.config.setdefault("SECURITY_COEP_POLICY", "credentialless")

        self.app = app

        # Pre-compute static header values once at startup to avoid
        # regenerating identical strings on every response
        self._cached_csp = self.get_csp_policy()
        self._cached_permissions_policy = self.get_permissions_policy()

        app.after_request(self.add_security_headers)

        # Validate CORS configuration at startup
        self._validate_cors_config()

        logger.info("Security headers middleware initialized")
        logger.warning(
            "CSP configured with 'unsafe-inline' for Socket.IO compatibility. "
            "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations."
        )

    def get_csp_policy(self) -> str:
        """Generate Content Security Policy header value.

        Returns:
            CSP policy string with directives for Socket.IO compatibility

        Note:
            - 'unsafe-inline' is required for Socket.IO compatibility; removing it
              would break real-time WebSocket functionality. If Socket.IO is removed
              in the future, 'unsafe-inline' should be tightened immediately.
            - 'unsafe-eval' removed for better XSS protection (not needed for Socket.IO)
            - connect-src defaults to 'self' (same-origin HTTP + WebSocket)
        """
        connect_src = self.app.config.get("SECURITY_CSP_CONNECT_SRC", "'self'")
        return (
            "default-src 'self'; "
            f"connect-src {connect_src}; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "font-src 'self' data:; "
            "img-src 'self' data:; "
            "media-src 'self'; "
            "worker-src blob:; "
            "child-src 'self' blob:; "
            "frame-src 'self'; "
            "frame-ancestors 'self'; "
            "manifest-src 'self'; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

    @staticmethod
    def get_permissions_policy() -> str:
        """Generate Permissions-Policy header value.

        Disables potentially dangerous browser features by default.

        Returns:
            Permissions-Policy string
        """
        return (
            "geolocation=(), "
            "midi=(), "
            "camera=(), "
            "usb=(), "
            "magnetometer=(), "
            "accelerometer=(), "
            "gyroscope=(), "
            "microphone=(), "
            "payment=(), "
            "sync-xhr=(), "
            "document-domain=()"
        )

    def add_security_headers(self, response: Response) -> Response:
        """Add comprehensive security headers to Flask response.

        Args:
            response: Flask response object

        Returns:
            Response object with security headers added

        Note:
            Cross-Origin-Embedder-Policy (COEP) is applied globally to all routes.
            Default is 'credentialless' which allows cross-origin requests without
            credentials. If stricter isolation is needed, configure SECURITY_COEP_POLICY
            to 'require-corp', but this may break cross-origin API access.
        """
        # Content Security Policy - prevents XSS and injection attacks
        response.headers["Content-Security-Policy"] = self._cached_csp

        # Anti-clickjacking protection
        response.headers["X-Frame-Options"] = "SAMEORIGIN"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Spectre vulnerability mitigation - applied globally
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        coep_policy = self.app.config.get(
            "SECURITY_COEP_POLICY", "credentialless"
        )
        response.headers["Cross-Origin-Embedder-Policy"] = coep_policy
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Permissions Policy - disable dangerous features
        response.headers["Permissions-Policy"] = self._cached_permissions_policy

        # Remove server version information to prevent information disclosure
        response.headers.pop("Server", None)

        # Referrer Policy - control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS - enforce HTTPS connections (only when request is secure)
        # request.is_secure checks wsgi.url_scheme == 'https'
        # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for reverse proxies
        if request.is_secure:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Cache-Control - prevent caching of sensitive content
        # Static assets are handled separately by the static file route
        if not request.path.startswith("/static/"):
            response.headers["Cache-Control"] = (
                "no-store, no-cache, must-revalidate, max-age=0"
            )
            response.headers["Pragma"] = "no-cache"  # HTTP/1.0 compatibility
            response.headers["Expires"] = "0"

        # Add CORS headers for API requests if enabled
        if self._is_api_route(request.path) and self.app.config.get(
            "SECURITY_CORS_ENABLED", True
        ):
            response = self._add_cors_headers(response)

        return response

    @staticmethod
    def _is_api_route(path: str) -> bool:
        """Check if the request path is an API route.

        Args:
            path: Request path to check

        Returns:
            True if path starts with one of the API route prefixes
        """
        return path.startswith(SecurityHeaders._API_ROUTE_PREFIXES)

    def _validate_cors_config(self) -> None:
        """Validate CORS configuration at startup to catch misconfigurations early.

        Does nothing when CORS is disabled.

        Raises:
            ValueError: If credentials are enabled together with the wildcard
                origin '*'
        """
        if not self.app.config.get("SECURITY_CORS_ENABLED", True):
            return

        allowed_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )

        # Validate credentials with wildcard origin. The value is stripped so a
        # whitespace-padded wildcard (e.g. from an env var) cannot slip past.
        if allow_credentials and allowed_origins.strip() == "*":
            raise ValueError(
                "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. "
                "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials."
            )

        # Log info about multi-origin + credentials configuration
        if allow_credentials and "," in allowed_origins:
            logger.info(
                "CORS configured with multiple origins and credentials enabled. "
                f"Origins: {allowed_origins}. "
                "Using origin reflection pattern for security."
            )

    @staticmethod
    def _select_cors_origin(
        configured_origins: str, request_origin: Optional[str]
    ) -> str:
        """Pick the single value to send in Access-Control-Allow-Origin.

        Args:
            configured_origins: Raw SECURITY_CORS_ALLOWED_ORIGINS value: "*",
                one origin, a comma-separated list of origins, or "".
            request_origin: Value of the request's Origin header, or None
                when the header is absent.

        Returns:
            - "*" when the wildcard is configured
            - the configured value (whitespace-trimmed) when it holds no comma
            - for a comma-separated list: the request origin if it is in the
              list, otherwise the first non-empty configured origin
            - "" when the list contains no non-empty entries
        """
        configured = configured_origins.strip()
        if configured == "*" or "," not in configured:
            return configured

        # Keep configuration order (a set would make the fallback arbitrary)
        # and drop empty entries produced by stray or trailing commas.
        allowed = [
            entry.strip() for entry in configured.split(",") if entry.strip()
        ]
        if not allowed:
            return ""
        if request_origin in allowed:
            return request_origin
        # Missing or non-whitelisted origin: the header must still carry exactly
        # one origin, so fall back to the first configured one.
        return allowed[0]

    def _add_cors_headers(self, response: Response) -> Response:
        """Add CORS headers for API routes using origin reflection for multi-origin support.

        Args:
            response: Flask response object to modify

        Returns:
            Response object with CORS headers added

        Security Note:
            - Uses "origin reflection" pattern for multi-origin support (comma-separated origins config)
            - Reflects the requesting origin back if it matches the whitelist
            - A missing or non-whitelisted request origin gets the first configured
              origin, so the header always carries a single value
            - Wildcard origin (*) disables credentials per CORS spec
            - Single origin or reflected origins allow credentials if configured
        """
        configured_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )

        request_origin = request.headers.get("Origin")
        origin_to_send = self._select_cors_origin(
            configured_origins, request_origin
        )

        # In multi-origin mode, record requests whose origin was not reflected.
        # (Browsers will enforce CORS, this just logs for monitoring)
        if (
            "," in configured_origins
            and request_origin
            and origin_to_send != request_origin
        ):
            logger.warning(
                f"CORS request from non-whitelisted origin: {request_origin}. "
                f"Allowed origins: {configured_origins}"
            )

        response.headers["Access-Control-Allow-Origin"] = origin_to_send
        response.headers["Access-Control-Allow-Methods"] = (
            "GET, POST, PUT, DELETE, OPTIONS"
        )
        response.headers["Access-Control-Allow-Headers"] = (
            "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override"
        )

        # Set credentials if configured and not using wildcard
        # Note: Startup validation rejects credentials with wildcard origin
        if allow_credentials and origin_to_send != "*":
            response.headers["Access-Control-Allow-Credentials"] = "true"

        response.headers["Access-Control-Max-Age"] = "3600"

        return response