Coverage for src / local_deep_research / security / url_validator.py: 89%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Centralized URL validation utilities for security. 

3 

4This module provides secure URL validation to prevent XSS attacks, 

5data exfiltration, and other URL-based security vulnerabilities. 

6""" 

7 

8import re 

9from typing import Optional, List 

10from urllib.parse import urlparse, urljoin 

11from loguru import logger 

12 

13 

class URLValidationError(ValueError):
    """Signal that a URL could not be built or did not pass validation.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """

18 

19 

class URLValidator:
    """Centralized URL validation for security.

    All methods are static; the class is a namespace for scheme
    allow/deny lists and the checks built on top of them.
    """

    # Unsafe URL schemes that could lead to XSS or data exfiltration
    UNSAFE_SCHEMES = (
        "javascript",
        "data",
        "vbscript",
        "about",
        "blob",
        "file",
    )

    # Safe schemes for external links
    SAFE_SCHEMES = ("http", "https", "ftp", "ftps")

    # Email scheme
    EMAIL_SCHEME = "mailto"

    # Common academic/research domains that should be allowed
    TRUSTED_ACADEMIC_DOMAINS = (
        "arxiv.org",
        "pubmed.ncbi.nlm.nih.gov",
        "ncbi.nlm.nih.gov",
        "biorxiv.org",
        "medrxiv.org",
        "doi.org",
        "nature.com",
        "science.org",
        "sciencedirect.com",
        "springer.com",
        "wiley.com",
        "plos.org",
        "pnas.org",
        "ieee.org",
        "acm.org",
    )

    # Patterns that commonly indicate an encoding-based filter bypass.
    _SUSPICIOUS_PATTERNS = (
        # Double encoding
        r"%25[0-9a-fA-F]{2}",
        # Null bytes
        r"%00",
        # Unicode encoding bypass attempts
        r"\\u[0-9a-fA-F]{4}",
        # HTML entity encoding
        r"&(#x?[0-9a-fA-F]+|[a-zA-Z]+);",
    )

    # Browsers (WHATWG URL parsing) drop ASCII tab/CR/LF anywhere in a URL
    # and strip leading C0 control characters and spaces before reading the
    # scheme, so the scheme check has to do the same.
    _TAB_OR_NEWLINE_TABLE = str.maketrans("", "", "\t\n\r")
    _LEADING_IGNORED = re.compile(r"^[\s\x00-\x20]+")

    @staticmethod
    def _normalize_for_scheme_check(url: str) -> str:
        """Return ``url`` lower-cased with browser-ignored characters removed."""
        cleaned = url.translate(URLValidator._TAB_OR_NEWLINE_TABLE)
        cleaned = URLValidator._LEADING_IGNORED.sub("", cleaned, count=1)
        return cleaned.lower()

    @staticmethod
    def is_unsafe_scheme(url: str) -> bool:
        """
        Check if a URL uses an unsafe scheme.

        The comparison is case-insensitive and ignores the characters a
        browser would discard before parsing (leading whitespace/control
        characters, and tab/CR/LF anywhere), so ``"java\\tscript:..."`` is
        detected as ``javascript:``.

        Args:
            url: The URL to check

        Returns:
            True if the URL uses an unsafe scheme, False otherwise
            (including for empty or non-string input)
        """
        if not url or not isinstance(url, str):
            return False

        normalized_url = URLValidator._normalize_for_scheme_check(url)

        for scheme in URLValidator.UNSAFE_SCHEMES:
            if normalized_url.startswith(f"{scheme}:"):
                logger.warning(
                    f"Unsafe URL scheme detected: {scheme} in URL: {url[:100]}"
                )
                return True

        return False

    @staticmethod
    def is_safe_url(
        url: str,
        require_scheme: bool = True,
        allow_fragments: bool = True,
        allow_mailto: bool = False,
        trusted_domains: Optional[List[str]] = None,
    ) -> bool:
        """
        Validate if a URL is safe to use.

        Args:
            url: The URL to validate
            require_scheme: Whether to require an explicit scheme
            allow_fragments: Whether to allow fragment identifiers (#)
            allow_mailto: Whether to allow mailto: links
            trusted_domains: Optional list of trusted domains. When given,
                the URL must have a hostname equal to, or a subdomain of,
                one of these domains.

        Returns:
            True if the URL is safe, False otherwise
        """
        if not url or not isinstance(url, str):
            return False

        # Check for unsafe schemes first
        if URLValidator.is_unsafe_scheme(url):
            return False

        # Handle fragment-only URLs
        if url.startswith("#"):
            return allow_fragments

        # Parse the URL. Both parse attempts live inside the try block:
        # a scheme-less value such as "[bad" only fails once "http://" is
        # prepended and the bracket lands in the netloc.
        try:
            parsed = urlparse(url)
            if not parsed.scheme:
                if require_scheme:
                    return False
                # If no scheme is required, assume http/https for URL parsing
                parsed = urlparse(f"http://{url}")  # DevSkim: ignore DS137138
        except ValueError as e:
            logger.warning(f"Failed to parse URL '{url[:100]}': {e}")
            return False

        scheme_lower = parsed.scheme.lower()

        # Check if it's a mailto link
        if scheme_lower == URLValidator.EMAIL_SCHEME:
            return allow_mailto

        # Check if it's a safe scheme
        if scheme_lower not in URLValidator.SAFE_SCHEMES:
            logger.warning(f"Unsafe URL scheme: {scheme_lower}")
            return False

        # Validate domain if trusted domains are specified. A URL without a
        # hostname (e.g. "https:evil.com") must fail the allow-list rather
        # than skip it, because browsers still resolve it to a remote host.
        if trusted_domains:
            hostname_lower = (parsed.hostname or "").lower()
            allowed = [domain.lower() for domain in trusted_domains]
            is_trusted = bool(hostname_lower) and any(
                hostname_lower == domain
                or hostname_lower.endswith(f".{domain}")
                for domain in allowed
            )
            if not is_trusted:
                logger.warning(
                    f"URL domain not in trusted list: {parsed.hostname}"
                )
                return False

        # Check for suspicious patterns in the URL
        if URLValidator._has_suspicious_patterns(url):
            return False

        return True

    @staticmethod
    def _has_suspicious_patterns(url: str) -> bool:
        """
        Check for suspicious patterns in URLs that might indicate attacks.

        Args:
            url: The URL to check

        Returns:
            True if suspicious patterns are found, False otherwise
        """
        for pattern in URLValidator._SUSPICIOUS_PATTERNS:
            if re.search(pattern, url, re.IGNORECASE):
                logger.warning(f"Suspicious pattern found in URL: {pattern}")
                return True

        return False

    @staticmethod
    def sanitize_url(url: str, default_scheme: str = "https") -> Optional[str]:
        """
        Sanitize a URL by adding a scheme if missing and validating it.

        Args:
            url: The URL to sanitize
            default_scheme: The default scheme to add if missing

        Returns:
            Sanitized URL or None if the URL is unsafe, empty, or not a
            string
        """
        if not url or not isinstance(url, str):
            return None

        # Check for unsafe schemes
        if URLValidator.is_unsafe_scheme(url):
            return None

        # Strip whitespace; whitespace-only input has nothing to sanitize
        url = url.strip()
        if not url:
            return None

        try:
            parsed = urlparse(url)

            # Add scheme if missing
            if not parsed.scheme:
                if url.startswith("//"):
                    # Protocol-relative URL: it already carries the "//"
                    # authority marker, so only the scheme is prepended.
                    url = f"{default_scheme}:{url}"
                else:
                    url = f"{default_scheme}://{url}"

            # Validate the final URL
            if URLValidator.is_safe_url(url, require_scheme=True):
                return url

        except ValueError as e:
            logger.warning(f"Failed to sanitize URL '{url[:100]}': {e}")

        return None

    @staticmethod
    def is_academic_url(url: str) -> bool:
        """
        Check if a URL is from a known academic/research domain.

        Args:
            url: The URL to check

        Returns:
            True if the URL's hostname equals, or is a subdomain of, an
            entry in TRUSTED_ACADEMIC_DOMAINS; False otherwise (including
            for unparseable or non-string input)
        """
        if not isinstance(url, str):
            return False

        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # Malformed netloc (e.g. unbalanced IPv6 brackets)
            return False

        if not hostname:
            return False

        hostname_lower = hostname.lower()
        return any(
            hostname_lower == domain or hostname_lower.endswith(f".{domain}")
            for domain in URLValidator.TRUSTED_ACADEMIC_DOMAINS
        )

    @staticmethod
    def extract_doi(url: str) -> Optional[str]:
        """
        Extract DOI from a URL if present.

        Args:
            url: The URL to extract DOI from

        Returns:
            The DOI if found, None otherwise
        """
        if not url or not isinstance(url, str):
            return None

        # Common DOI patterns with explicit pattern identification
        doi_patterns = [
            (
                r"10\.\d{4,}(?:\.\d+)*\/[-._;()\/:a-zA-Z0-9]+",
                0,
            ),  # Direct DOI, group 0
            (r"doi\.org\/(10\.\d{4,}[^\s]*)", 1),  # doi.org URL, group 1
        ]

        for pattern, group_index in doi_patterns:
            match = re.search(pattern, url, re.IGNORECASE)
            if match:
                return match.group(group_index)

        return None

    @staticmethod
    def validate_http_url(url: str) -> bool:
        """
        Validate that a callback URL is well-formed and safe for HTTP/HTTPS use.

        This is stricter than is_safe_url() and specifically validates HTTP/HTTPS
        URLs for use as application callbacks (e.g., in notifications, redirects).
        It does NOT validate Apprise service URLs which use other protocols.

        Args:
            url: HTTP/HTTPS callback URL to validate

        Returns:
            True if valid

        Raises:
            URLValidationError: If URL is invalid
        """
        if not url or not isinstance(url, str):
            raise URLValidationError("URL must be a non-empty string")

        try:
            parsed = urlparse(url)

            # Must have a scheme
            if not parsed.scheme:
                raise URLValidationError(
                    "URL must have a scheme (http or https)"
                )

            # Must be http or https (callback URLs only)
            if parsed.scheme not in ("http", "https"):
                raise URLValidationError(
                    f"URL scheme must be http or https, got: {parsed.scheme}"
                )

            # Use the general security validator for additional safety
            if not URLValidator.is_safe_url(url, require_scheme=True):
                raise URLValidationError(
                    f"URL failed security validation: {url}"
                )

            # Must have a hostname. netloc alone is not enough: it is
            # non-empty for "http://:8080/" or "http://user@/" even though
            # no host is present.
            hostname = parsed.hostname
            if not hostname:
                raise URLValidationError("URL must have a hostname")

            # Check for obvious hostname issues
            if hostname.startswith(".") or hostname.endswith("."):
                raise URLValidationError(f"Invalid hostname: {parsed.netloc}")

            # Reading .port makes urllib validate it; a non-numeric or
            # out-of-range port raises ValueError, wrapped below.
            _ = parsed.port

            # No path check is needed: with a netloc present, urlparse only
            # yields an empty path or one starting with "/".
            return True

        except URLValidationError:
            raise
        except Exception as e:
            raise URLValidationError(f"Failed to validate URL: {e}") from e

    @staticmethod
    def is_safe_redirect_url(target: str, host_url: str) -> bool:
        """
        Validate that a redirect target is safe (same host).

        Prevents open redirect attacks by ensuring the target URL
        is either relative or points to the same host as the application.
        Also prevents CRLF injection attacks by rejecting URLs containing
        carriage return or line feed characters.

        Uses the standard Flask pattern from:
        https://github.com/fengsp/flask-snippets/blob/master/security/redirect_back.py

        Security protections implemented:
        - CRLF injection prevention
        - Protocol-relative URL bypass (//evil.com)
        - Triple-slash bypass (///evil.com)
        - URL-encoded bypass attempts
        - Backslash bypass (treated as forward slash by some browsers)
        - Path traversal blocking

        Args:
            target: The redirect URL to validate (can be relative or absolute)
            host_url: The application's host URL (e.g., request.host_url)

        Returns:
            True if the URL is safe to redirect to, False otherwise
            (including when the target cannot be parsed)
        """
        if not target or not isinstance(target, str):
            return False

        # Prevent CRLF injection by rejecting URLs with CR or LF characters
        if "\r" in target or "\n" in target:
            logger.warning("CRLF injection attempt detected in redirect URL")
            return False

        # Normalize and decode URL to detect encoded bypass attempts
        # This catches %2f%2f (encoded //) and similar tricks
        from urllib.parse import unquote

        decoded_target = unquote(target)

        # Check for backslash (treated as forward slash by some browsers)
        # Prevents \evil.com and \\evil.com bypasses
        if "\\" in decoded_target:
            logger.warning("Backslash detected in redirect URL")
            return False

        # Check for protocol-relative URLs before stripping slashes
        # //evil.com would be treated as absolute URL by urljoin()
        if decoded_target.startswith("//"):
            logger.warning("Protocol-relative URL detected in redirect")
            return False

        # An unparseable target (e.g. "http://[evil") is never a safe
        # redirect; report it as unsafe instead of letting ValueError escape.
        try:
            original_path = urlparse(target).path
            decoded_path = urlparse(decoded_target).path
            ref_url = urlparse(host_url)
            test_url = urlparse(urljoin(host_url, target))
        except ValueError as e:
            logger.warning(f"Failed to parse redirect URL: {e}")
            return False

        # Block path traversal patterns in redirect URL path
        # Check both original and decoded to catch single-encoded traversal (%2e%2e)
        if ".." in original_path or ".." in decoded_path:
            # f-string rather than a "%s" argument: loguru formats with
            # str.format-style braces, so "%s" would be logged literally.
            logger.warning(
                f"Path traversal detected in redirect URL: {target}"
            )
            return False

        return (
            test_url.scheme in ("http", "https")
            and ref_url.netloc == test_url.netloc
        )

418 

419 

def get_javascript_url_validator() -> str:
    """
    Build the client-side counterpart of the Python URLValidator.

    The returned snippet defines a global ``URLValidator`` object exposing
    ``isUnsafeScheme``, ``isSafeUrl`` and ``sanitizeUrl`` with the same
    scheme lists as the Python class.

    NOTE(review): in the JavaScript ``isSafeUrl`` the ``requireScheme``
    option is destructured but never read, and there is no equivalent of
    the Python suspicious-pattern check.

    Returns:
        JavaScript code as a string that can be embedded in web pages
    """
    script = r"""
    // URL validation utilities matching Python URLValidator
    const URLValidator = {
        UNSAFE_SCHEMES: ['javascript', 'data', 'vbscript', 'about', 'blob', 'file'],
        SAFE_SCHEMES: ['http', 'https', 'ftp', 'ftps'],
        EMAIL_SCHEME: 'mailto',

        isUnsafeScheme: function(url) {
            if (!url) return false;

            const normalizedUrl = url.trim().toLowerCase();

            for (const scheme of this.UNSAFE_SCHEMES) {
                if (normalizedUrl.startsWith(scheme + ':')) {
                    console.warn(`Unsafe URL scheme detected: ${scheme}`);
                    return true;
                }
            }

            return false;
        },

        isSafeUrl: function(url, options = {}) {
            const {
                requireScheme = true,
                allowFragments = true,
                allowMailto = false,
                trustedDomains = []
            } = options;

            if (!url || typeof url !== 'string') {
                return false;
            }

            // Check for unsafe schemes first
            if (this.isUnsafeScheme(url)) {
                return false;
            }

            // Handle fragment-only URLs
            if (url.startsWith('#')) {
                return allowFragments;
            }

            // Parse the URL
            try {
                const parsed = new URL(url, window.location.href);
                const scheme = parsed.protocol.slice(0, -1).toLowerCase(); // Remove trailing ':'

                // Check if it's a mailto link
                if (scheme === this.EMAIL_SCHEME) {
                    return allowMailto;
                }

                // Check if it's a safe scheme
                if (!this.SAFE_SCHEMES.includes(scheme)) {
                    console.warn(`Unsafe URL scheme: ${scheme}`);
                    return false;
                }

                // Validate domain if trusted domains are specified
                if (trustedDomains.length > 0 && parsed.hostname) {
                    const hostname = parsed.hostname.toLowerCase();
                    const isTrusted = trustedDomains.some(domain =>
                        hostname === domain.toLowerCase() ||
                        hostname.endsWith('.' + domain.toLowerCase())
                    );

                    if (!isTrusted) {
                        console.warn(`URL domain not in trusted list: ${parsed.hostname}`);
                        return false;
                    }
                }

                return true;
            } catch (e) {
                console.warn(`Failed to parse URL: ${e.message}`);
                return false;
            }
        },

        sanitizeUrl: function(url, defaultScheme = 'https') {
            if (!url) return null;

            // Check for unsafe schemes
            if (this.isUnsafeScheme(url)) {
                return null;
            }

            // Strip whitespace
            url = url.trim();

            // Add scheme if missing
            if (!url.match(/^[a-zA-Z][a-zA-Z\d+\-.]*:/)) {
                url = `${defaultScheme}://${url}`;
            }

            // Validate the final URL
            if (this.isSafeUrl(url, { requireScheme: true })) {
                return url;
            }

            return null;
        }
    };
    """
    return script