Coverage for src / local_deep_research / security / security_headers.py: 79%

75 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1"""Security headers middleware for Flask applications. 

2 

3This module provides comprehensive HTTP security headers to protect against 

4common web vulnerabilities identified by OWASP ZAP and other security scanners. 

5""" 

6 

7from typing import Optional 

8 

9from flask import Flask, Response, request 

10from loguru import logger 

11 

12 

class SecurityHeaders:
    """Configure and apply security headers to Flask responses.

    Addresses security vulnerabilities:
    - CSP: Content Security Policy to prevent XSS
    - Clickjacking: X-Frame-Options protection
    - MIME sniffing: X-Content-Type-Options
    - Spectre: Cross-Origin policies
    - Feature abuse: Permissions-Policy
    - Information leakage: Server header removal
    - Protocol downgrade: HSTS (Strict-Transport-Security)
    """

    def __init__(self, app: "Optional[Flask]" = None) -> None:
        """Initialize security headers middleware.

        Args:
            app: Flask application instance (optional, can call init_app later)
        """
        # Always define the attribute so the deferred ``init_app`` pattern
        # leaves the instance in a well-defined state.
        self.app: "Optional[Flask]" = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app: "Flask") -> None:
        """Initialize the Flask application with security headers.

        Args:
            app: Flask application instance

        Raises:
            ValueError: If the CORS configuration is invalid
                (see ``_validate_cors_config``).

        Security Configuration:
            SECURITY_CSP_CONNECT_SRC: Restricts WebSocket/fetch connections
                (default: 'self' ws: wss:)
            SECURITY_CORS_ENABLED: Enable CORS for API routes (default: True)
            SECURITY_CORS_ALLOW_CREDENTIALS: Allow credentials in CORS
                (default: False)
            SECURITY_CORS_ALLOWED_ORIGINS: Allowed CORS origins,
                comma-separated for multiple (default: "" - requires explicit
                configuration). Multi-origin uses origin reflection.
            SECURITY_COEP_POLICY: Cross-Origin-Embedder-Policy applied
                globally to ALL routes (default: "credentialless").
                Options: require-corp, credentialless, unsafe-none

        Important Security Trade-offs:
            - CSP includes 'unsafe-inline' for Socket.IO compatibility
            - This reduces XSS protection but is necessary for real-time
              WebSocket functionality
            - 'unsafe-eval' has been removed to prevent eval() XSS attacks
            - If Socket.IO is removed, 'unsafe-inline' should also be
              tightened immediately
            - Monitor CSP violation reports to detect potential XSS attempts
            - COEP policy is applied globally, not per-route.
              'credentialless' allows cross-origin requests without
              credentials. Use 'require-corp' for stricter isolation.
        """
        # Set default security configuration.
        # connect-src uses 'self' + ws:/wss: to work from any origin
        # (localhost, LAN, Docker, internet).
        app.config.setdefault("SECURITY_CSP_CONNECT_SRC", "'self' ws: wss:")
        app.config.setdefault("SECURITY_CORS_ENABLED", True)
        app.config.setdefault("SECURITY_CORS_ALLOW_CREDENTIALS", False)
        # Default to empty (fail closed) - require explicit configuration for
        # CORS. Using "*" creates a security footgun - users must explicitly
        # allow origins.
        app.config.setdefault("SECURITY_CORS_ALLOWED_ORIGINS", "")
        app.config.setdefault("SECURITY_COEP_POLICY", "credentialless")

        self.app = app

        # Validate CORS configuration at startup, *before* registering the
        # hook, so a rejected configuration does not leave a hook behind.
        self._validate_cors_config()
        app.after_request(self.add_security_headers)

        logger.info("Security headers middleware initialized")
        logger.warning(
            "CSP configured with 'unsafe-inline' for Socket.IO compatibility. "
            "'unsafe-eval' removed for better XSS protection. Monitor for CSP violations."
        )

    def get_csp_policy(self) -> str:
        """Generate Content Security Policy header value.

        Returns:
            CSP policy string with directives for Socket.IO compatibility

        Note:
            - 'unsafe-inline' is required for Socket.IO compatibility
            - 'unsafe-eval' removed for better XSS protection (not needed for
              Socket.IO)
            - connect-src is read from SECURITY_CSP_CONNECT_SRC, which
              ``init_app`` defaults to ``'self' ws: wss:``; if the key is
              missing entirely the stricter ``'self'`` is used
        """
        connect_src = self.app.config.get("SECURITY_CSP_CONNECT_SRC", "'self'")
        return (
            "default-src 'self'; "
            f"connect-src {connect_src}; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "font-src 'self' data:; "
            "img-src 'self' data:; "
            "worker-src blob:; "
            "frame-src 'self'; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

    @staticmethod
    def get_permissions_policy() -> str:
        """Generate Permissions-Policy header value.

        Disables potentially dangerous browser features by default.

        Returns:
            Permissions-Policy string
        """
        return (
            "geolocation=(), "
            "midi=(), "
            "camera=(), "
            "usb=(), "
            "magnetometer=(), "
            "accelerometer=(), "
            "gyroscope=(), "
            "microphone=(), "
            "payment=(), "
            "sync-xhr=(), "
            "document-domain=()"
        )

    def add_security_headers(self, response: "Response") -> "Response":
        """Add comprehensive security headers to Flask response.

        Args:
            response: Flask response object

        Returns:
            Response object with security headers added

        Note:
            Cross-Origin-Embedder-Policy (COEP) is applied globally to all
            routes. Default is 'credentialless' which allows cross-origin
            requests without credentials. If stricter isolation is needed,
            configure SECURITY_COEP_POLICY to 'require-corp', but this may
            break cross-origin API access.
        """
        # Content Security Policy - prevents XSS and injection attacks
        response.headers["Content-Security-Policy"] = self.get_csp_policy()

        # Anti-clickjacking protection
        response.headers["X-Frame-Options"] = "SAMEORIGIN"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Spectre vulnerability mitigation - applied globally
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        coep_policy = self.app.config.get(
            "SECURITY_COEP_POLICY", "credentialless"
        )
        response.headers["Cross-Origin-Embedder-Policy"] = coep_policy
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Permissions Policy - disable dangerous features
        response.headers["Permissions-Policy"] = self.get_permissions_policy()

        # Remove server version information to prevent information disclosure
        response.headers.pop("Server", None)

        # Referrer Policy - control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS - enforce HTTPS connections (only when request is secure).
        # request.is_secure checks wsgi.url_scheme == 'https'.
        # ProxyFix (configured in app_factory) handles X-Forwarded-Proto for
        # reverse proxies.
        if request.is_secure:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        # Add CORS headers for API requests if enabled
        if self._is_api_route(request.path) and self.app.config.get(
            "SECURITY_CORS_ENABLED", True
        ):
            response = self._add_cors_headers(response)

        return response

    @staticmethod
    def _is_api_route(path: str) -> bool:
        """Check if the request path is an API route.

        Args:
            path: Request path to check

        Returns:
            True if path starts with ``/api/`` or ``/research/api/``
        """
        return path.startswith(("/api/", "/research/api/"))

    @staticmethod
    def _parse_origins(configured) -> list:
        """Normalize the SECURITY_CORS_ALLOWED_ORIGINS setting.

        Args:
            configured: Comma-separated origin string, an iterable of origin
                strings, or None/empty.

        Returns:
            Origins in configuration order, whitespace-stripped, with blank
            entries (e.g. from a trailing comma) and duplicates removed.
        """
        if not configured:
            return []
        parts = configured.split(",") if isinstance(configured, str) else configured
        origins = []
        for part in parts:
            origin = str(part).strip()
            if origin and origin not in origins:
                origins.append(origin)
        return origins

    @staticmethod
    def _select_cors_origin(allowed_origins, request_origin) -> str:
        """Pick the Access-Control-Allow-Origin value for a request.

        Args:
            allowed_origins: Ordered origins as returned by ``_parse_origins``
            request_origin: Value of the request's ``Origin`` header, or None

        Returns:
            - "" when nothing is configured (never matches a real origin, so
              browsers block the cross-origin read: fail closed)
            - "*" when a wildcard is configured
            - the request origin when it is whitelisted (origin reflection)
            - otherwise the first configured origin, chosen deterministically;
              the browser then rejects the response for the mismatched caller
        """
        if not allowed_origins:
            return ""
        if "*" in allowed_origins:
            return "*"
        if request_origin in allowed_origins:
            return request_origin
        return allowed_origins[0]

    def _validate_cors_config(self) -> None:
        """Validate CORS configuration at startup to catch misconfigurations early.

        Raises:
            ValueError: If credentials are enabled together with a wildcard
                origin (including a wildcard padded with whitespace or listed
                among other origins)
        """
        if not self.app.config.get("SECURITY_CORS_ENABLED", True):
            return

        configured_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )
        if not allow_credentials:
            return

        allowed_origins = self._parse_origins(configured_origins)

        # Credentials must never be combined with a wildcard origin
        if "*" in allowed_origins:
            raise ValueError(
                "CORS misconfiguration: Cannot use credentials with wildcard origin '*'. "
                "Set SECURITY_CORS_ALLOWED_ORIGINS to a specific origin or disable credentials."
            )

        # Multi-origin + credentials is allowed; just make it visible in logs
        if len(allowed_origins) > 1:
            logger.info(
                f"CORS configured with multiple origins and credentials enabled. "
                f"Origins: {configured_origins}. "
                f"Using origin reflection pattern for security."
            )

    def _add_cors_headers(self, response: "Response") -> "Response":
        """Add CORS headers for API routes using origin reflection for multi-origin support.

        Args:
            response: Flask response object to modify

        Returns:
            Response object with CORS headers added

        Security Note:
            - Uses "origin reflection" pattern for multi-origin support
              (comma-separated origins config)
            - Reflects the requesting origin back if it matches the whitelist
            - A non-whitelisted or absent Origin gets the first configured
              origin, so the header is always a single valid origin
            - ``Vary: Origin`` is added whenever the header depends on the
              request origin, so shared caches keep per-origin responses apart
            - Wildcard origin (*) disables credentials per CORS spec
            - Single origin or reflected origins allow credentials if configured
        """
        configured_origins = self.app.config.get(
            "SECURITY_CORS_ALLOWED_ORIGINS", ""
        )
        allow_credentials = self.app.config.get(
            "SECURITY_CORS_ALLOW_CREDENTIALS", False
        )

        allowed_origins = self._parse_origins(configured_origins)
        request_origin = request.headers.get("Origin")
        origin_to_send = self._select_cors_origin(allowed_origins, request_origin)

        # With several concrete origins the response depends on the caller
        if len(allowed_origins) > 1 and origin_to_send != "*":
            response.vary.add("Origin")
            if request_origin and request_origin not in allowed_origins:
                # Browsers will enforce CORS; this only logs for monitoring
                logger.warning(
                    f"CORS request from non-whitelisted origin: {request_origin}. "
                    f"Allowed origins: {configured_origins}"
                )

        response.headers["Access-Control-Allow-Origin"] = origin_to_send
        response.headers["Access-Control-Allow-Methods"] = (
            "GET, POST, PUT, DELETE, OPTIONS"
        )
        response.headers["Access-Control-Allow-Headers"] = (
            "Content-Type, Authorization, X-Requested-With, X-HTTP-Method-Override"
        )

        # Set credentials only for a concrete (non-empty, non-wildcard) origin.
        # Startup validation rejects credentials with a wildcard; credentials
        # with multiple origins are permitted and rely on origin reflection.
        if allow_credentials and origin_to_send and origin_to_send != "*":
            response.headers["Access-Control-Allow-Credentials"] = "true"

        response.headers["Access-Control-Max-Age"] = "3600"

        return response