Coverage for src / local_deep_research / security / security_headers.py: 91%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1"""Security headers middleware for Flask applications. 

2 

3This module provides comprehensive HTTP security headers to protect against 

4common web vulnerabilities identified by OWASP ZAP and other security scanners. 

5""" 

6 

7from typing import Optional 

8 

9from flask import Flask, Response, request 

10from loguru import logger 

11 

12 

13class SecurityHeaders: 

14 """Configure and apply security headers to Flask responses. 

15 

16 Addresses security vulnerabilities: 

17 - CSP: Content Security Policy to prevent XSS 

18 - Clickjacking: X-Frame-Options protection 

19 - MIME sniffing: X-Content-Type-Options 

20 - Spectre: Cross-Origin policies 

21 - Feature abuse: Permissions-Policy 

22 - Information leakage: Server header removal 

23 - Protocol downgrade: HSTS (Strict-Transport-Security) 

24 """ 

25 

26 def __init__(self, app: Optional[Flask] = None) -> None: 

27 """Initialize security headers middleware. 

28 

29 Args: 

30 app (Optional[Flask]): Flask application instance (optional, can call init_app later) 

31 """ 

32 if app is not None: 

33 self.init_app(app) 

34 

35 def init_app(self, app: Flask): 

36 """Initialize the Flask application with security headers. 

37 

38 Args: 

39 app: Flask application instance 

40 

41 Security Configuration: 

42 SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections (default: 'self' ws: wss:) 

43 SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True) 

44 SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS (default: False) 

45 SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins, comma-separated for multiple 

46 (default: "" - requires explicit configuration). 

47 Multi-origin uses origin reflection. 

48 SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied globally to ALL routes 

49 (default: "credentialless"). Options: require-corp, credentialless, unsafe-none 

50 

51 Important Security Trade-offs: 

52 - CSP includes 'unsafe-inline' for Socket.IO compatibility 

53 - This reduces XSS protection but is necessary for real-time WebSocket functionality 

54 - 'unsafe-eval' has been removed to prevent eval() XSS attacks 

55 - If Socket.IO is removed, 'unsafe-inline' should also be tightened immediately 

56 - Monitor CSP violation reports to detect potential XSS attempts 

57 - COEP policy is applied globally, not per-route. 'credentialless' allows cross-origin 

58 requests without credentials. Use 'require-corp' for stricter isolation. 

59 """ 

60 # Set default security configuration 

61 # connect-src uses 'self' + ws:/wss: to work from any origin (localhost, LAN, Docker, internet) 

62 app.config.setdefault( 

63 "SECURITY_CSP_CONNECT_SRC", 

64 "'self' ws: wss:", 

65 ) 

66 app.config.setdefault("SECURITY_CORS_ENABLED", True) 

67 app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False) 

68 

69 # CORS allowed origins: check env var first, then fall back to default 

70 # Env var: LDR_SECURITY_CORS_ALLOWED_ORIGINS 

71 # Default to empty (fail closed) - require explicit configuration for CORS 

72 from ..settings.env_registry import get_env_setting 

73 

74 cors_env = get_env_setting("security.cors.allowed_origins") 

75 if cors_env is not None: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true

76 app.config["SECURITY_CORS_ALLOWED_ORIGINS"] = cors_env 

77 else: 

78 app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "") 

79 

80 app.config.setdefault("SECURITY_COEP_POLICY", "credentialless") 

81 

82 self.app = app 

83 app.after_request(self.add_security_headers) 

84 

85 # Validate CORS configuration at startup 

86 self._validate_cors_config() 

87 

88 logger.info("Security headers middleware initialized") 

89 logger.warning( 

90 "CSP configured with 'unsafe-inline' for Socket.IO compatibility. " 

91 "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations." 

92 ) 

93 

94 def get_csp_policy(self) -> str: 

95 """Generate Content Security Policy header value. 

96 

97 Returns: 

98 CSP policy string with directives for Socket.IO compatibility 

99 

100 Note: 

101 - 'unsafe-inline' is required for Socket.IO compatibility 

102 - 'unsafe-eval' removed for better XSS protection (not needed for Socket.IO) 

103 - connect-src uses 'self' ws: wss: by default (works from any origin) 

104 """ 

105 connect_src = self.app.config.get("SECURITY_CSP_CONNECT_SRC", "'self'") 

106 return ( 

107 "default-src 'self'; " 

108 f"connect-src {connect_src}; " 

109 "script-src 'self' 'unsafe-inline'; " 

110 "style-src 'self' 'unsafe-inline'; " 

111 "font-src 'self' data:; " 

112 "img-src 'self' data:; " 

113 "media-src 'self'; " 

114 "worker-src blob:; " 

115 "child-src 'self' blob:; " 

116 "frame-src 'self'; " 

117 "frame-ancestors 'self'; " 

118 "manifest-src 'self'; " 

119 "object-src 'none'; " 

120 "base-uri 'self'; " 

121 "form-action 'self';" 

122 ) 

123 

124 @staticmethod 

125 def get_permissions_policy() -> str: 

126 """Generate Permissions-Policy header value. 

127 

128 Disables potentially dangerous browser features by default. 

129 

130 Returns: 

131 Permissions-Policy string 

132 """ 

133 return ( 

134 "geolocation=(), " 

135 "midi=(), " 

136 "camera=(), " 

137 "usb=(), " 

138 "magnetometer=(), " 

139 "accelerometer=(), " 

140 "gyroscope=(), " 

141 "microphone=(), " 

142 "payment=(), " 

143 "sync-xhr=(), " 

144 "document-domain=()" 

145 ) 

146 

147 def add_security_headers(self, response: Response) -> Response: 

148 """Add comprehensive security headers to Flask response. 

149 

150 Args: 

151 response: Flask response object 

152 

153 Returns: 

154 Response object with security headers added 

155 

156 Note: 

157 Cross-Origin-Embedder-Policy (COEP) is applied globally to all routes. 

158 Default is 'credentialless' which allows cross-origin requests without 

159 credentials. If stricter isolation is needed, configure SECURITY_COEP_POLICY 

160 to 'require-corp', but this may break cross-origin API access. 

161 """ 

162 # Content Security Policy - prevents XSS and injection attacks 

163 csp = self.get_csp_policy() 

164 response.headers["Content-Security-Policy"] = csp 

165 

166 # Anti-clickjacking protection 

167 response.headers["X-Frame-Options"] = "SAMEORIGIN" 

168 

169 # Prevent MIME-type sniffing 

170 response.headers["X-Content-Type-Options"] = "nosniff" 

171 

172 # Spectre vulnerability mitigation - applied globally 

173 response.headers["Cross-Origin-Opener-Policy"] = "same-origin" 

174 coep_policy = self.app.config.get( 

175 "SECURITY_COEP_POLICY", "credentialless" 

176 ) 

177 response.headers["Cross-Origin-Embedder-Policy"] = coep_policy 

178 response.headers["Cross-Origin-Resource-Policy"] = "same-origin" 

179 

180 # Permissions Policy - disable dangerous features 

181 response.headers["Permissions-Policy"] = self.get_permissions_policy() 

182 

183 # Remove server version information to prevent information disclosure 

184 response.headers.pop("Server", None) 

185 

186 # Referrer Policy - control referrer information 

187 response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" 

188 

189 # HSTS - enforce HTTPS connections (only when request is secure) 

190 # request.is_secure checks wsgi.url_scheme == 'https' 

191 # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for reverse proxies 

192 if request.is_secure: 

193 response.headers["Strict-Transport-Security"] = ( 

194 "max-age=31536000; includeSubDomains" 

195 ) 

196 

197 # Cache-Control - prevent caching of sensitive content 

198 # Static assets are handled separately by the static file route 

199 if not request.path.startswith("/static/"): 

200 response.headers["Cache-Control"] = ( 

201 "no-store, no-cache, must-revalidate, max-age=0" 

202 ) 

203 response.headers["Pragma"] = "no-cache" # HTTP/1.0 compatibility 

204 response.headers["Expires"] = "0" 

205 

206 # Add CORS headers for API requests if enabled 

207 if self._is_api_route(request.path) and self.app.config.get( 

208 "SECURITY_CORS_ENABLED", True 

209 ): 

210 response = self._add_cors_headers(response) 

211 

212 return response 

213 

214 @staticmethod 

215 def _is_api_route(path: str) -> bool: 

216 """Check if the request path is an API route. 

217 

218 Args: 

219 path: Request path to check 

220 

221 Returns: 

222 True if path is an API route 

223 """ 

224 return ( 

225 path.startswith("/api/") 

226 or path.startswith("/research/api/") 

227 or path.startswith("/history/api") 

228 ) 

229 

230 def _validate_cors_config(self) -> None: 

231 """Validate CORS configuration at startup to catch misconfigurations early. 

232 

233 Raises: 

234 ValueError: If CORS configuration is invalid 

235 """ 

236 if not self.app.config.get("SECURITY_CORS_ENABLED", True): 

237 return 

238 

239 allowed_origins = self.app.config.get( 

240 "SECURITY_CORS_ALLOWED_ORIGINS", "" 

241 ) 

242 allow_credentials = self.app.config.get( 

243 "SECURITY_CORS_ALLOW_CREDENTIALS", False 

244 ) 

245 

246 # Validate credentials with wildcard origin 

247 if allow_credentials and allowed_origins == "*": 

248 raise ValueError( 

249 "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. " 

250 "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials." 

251 ) 

252 

253 # Log info about multi-origin + credentials configuration 

254 if allow_credentials and "," in allowed_origins: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true

255 logger.info( 

256 f"CORS configured with multiple origins and credentials enabled. " 

257 f"Origins: {allowed_origins}. " 

258 f"Using origin reflection pattern for security." 

259 ) 

260 

261 def _add_cors_headers(self, response: Response) -> Response: 

262 """Add CORS headers for API routes using origin reflection for multi-origin support. 

263 

264 Args: 

265 response: Flask response object to modify 

266 

267 Returns: 

268 Response object with CORS headers added 

269 

270 Security Note: 

271 - Uses "origin reflection" pattern for multi-origin support (comma-separated origins config) 

272 - Reflects the requesting origin back if it matches the whitelist 

273 - Wildcard origin (*) disables credentials per CORS spec 

274 - Single origin or reflected origins allow credentials if configured 

275 """ 

276 configured_origins = self.app.config.get( 

277 "SECURITY_CORS_ALLOWED_ORIGINS", "" 

278 ) 

279 allow_credentials = self.app.config.get( 

280 "SECURITY_CORS_ALLOW_CREDENTIALS", False 

281 ) 

282 

283 # Determine which origin to send back 

284 origin_to_send = configured_origins 

285 request_origin = request.headers.get("Origin") 

286 

287 # If wildcard, allow all origins 

288 if configured_origins == "*": 

289 origin_to_send = "*" 

290 # If multiple origins configured (comma-separated), use origin reflection 

291 elif "," in configured_origins: 

292 # Parse configured origins into a set 

293 allowed_origins_set = { 

294 origin.strip() for origin in configured_origins.split(",") 

295 } 

296 

297 # Reflect the request origin if it's in the whitelist (when present) 

298 if request_origin: 298 ↛ 314line 298 didn't jump to line 314 because the condition on line 298 was always true

299 if request_origin in allowed_origins_set: 299 ↛ 304line 299 didn't jump to line 304 because the condition on line 299 was always true

300 origin_to_send = request_origin 

301 else: 

302 # Request origin not in whitelist - log but still set configured origin 

303 # (Browsers will enforce CORS, this just logs for monitoring) 

304 logger.warning( 

305 f"CORS request from non-whitelisted origin: {request_origin}. " 

306 f"Allowed origins: {configured_origins}" 

307 ) 

308 # Use first configured origin for backward compatibility 

309 origin_to_send = list(allowed_origins_set)[0] 

310 # Single origin configured - always use it (browser enforces CORS) 

311 else: 

312 origin_to_send = configured_origins 

313 

314 response.headers["Access-Control-Allow-Origin"] = origin_to_send 

315 response.headers["Access-Control-Allow-Methods"] = ( 

316 "GET, POST, PUT, DELETE, OPTIONS" 

317 ) 

318 response.headers["Access-Control-Allow-Headers"] = ( 

319 "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override" 

320 ) 

321 

322 # Set credentials if configured and not using wildcard 

323 # Note: Startup validation ensures credentials is False with wildcard or multi-origin 

324 if allow_credentials and origin_to_send != "*": 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true

325 response.headers["Access-Control-Allow-Credentials"] = "true" 

326 

327 response.headers["Access-Control-Max-Age"] = "3600" 

328 

329 return response