Coverage for src / local_deep_research / security / url_validator.py: 94%

163 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Centralized URL validation utilities for security. 

3 

4This module provides secure URL validation to prevent XSS attacks, 

5data exfiltration, and other URL-based security vulnerabilities. 

6""" 

7 

8import re 

9from typing import Optional, List 

10from urllib.parse import urlparse, urljoin, unquote 

11from loguru import logger 

12 

13 

class URLValidationError(ValueError):
    """Raised when URL construction or validation fails.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    for malformed input keep working.
    """

18 

19 

class URLValidator:
    """Centralized URL validation for security.

    Every method is a ``@staticmethod``; the class is used purely as a
    namespace and is never instantiated by the code in this module.
    """

    # Unsafe URL schemes that could lead to XSS or data exfiltration
    UNSAFE_SCHEMES = (
        "javascript",
        "data",
        "vbscript",
        "about",
        "blob",
        "file",
    )

    # Safe schemes for external links
    SAFE_SCHEMES = ("http", "https", "ftp", "ftps")

    # Email scheme
    EMAIL_SCHEME = "mailto"

    # Common academic/research domains that should be allowed
    TRUSTED_ACADEMIC_DOMAINS = (
        "arxiv.org",
        "pubmed.ncbi.nlm.nih.gov",
        "ncbi.nlm.nih.gov",
        "biorxiv.org",
        "medrxiv.org",
        "doi.org",
        "nature.com",
        "science.org",
        "sciencedirect.com",
        "springer.com",
        "wiley.com",
        "plos.org",
        "pnas.org",
        "ieee.org",
        "acm.org",
    )

    # Patterns treated as suspicious inside a URL, compiled once at class
    # creation instead of on every call.
    _SUSPICIOUS_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            # Double encoding
            r"%25[0-9a-fA-F]{2}",
            # Null bytes
            r"%00",
            # Unicode encoding bypass attempts
            r"\\u[0-9a-fA-F]{4}",
            # HTML entity encoding
            r"&(#x?[0-9a-fA-F]+|[a-zA-Z]+);",
        )
    )

    # Browsers remove ASCII tab/LF/CR anywhere in a URL and strip leading
    # C0 control characters and spaces before parsing it (WHATWG URL
    # Standard), so the scheme check has to do the same.
    _TAB_NEWLINE_CR = str.maketrans("", "", "\t\n\r")
    _LEADING_CONTROL_OR_SPACE = re.compile(r"^[\s\x00-\x20]+")

    @staticmethod
    def _normalize_for_scheme_check(url: str) -> str:
        """Return ``url`` as a browser would see it for scheme detection.

        Removes every tab, line feed and carriage return, strips leading
        whitespace and C0 control characters, and lowercases the result.
        """
        without_breaks = url.translate(URLValidator._TAB_NEWLINE_CR)
        trimmed = URLValidator._LEADING_CONTROL_OR_SPACE.sub(
            "", without_breaks
        )
        return trimmed.lower()

    @staticmethod
    def _hostname_matches(hostname: Optional[str], domains) -> bool:
        """Return True if ``hostname`` equals or is a subdomain of a domain.

        The comparison is case-insensitive and ignores a trailing dot
        (``arxiv.org.`` is the fully-qualified form of ``arxiv.org``).
        A missing hostname never matches, and empty domain entries are
        skipped so they cannot match everything.
        """
        if not hostname:
            return False

        host = hostname.lower().rstrip(".")
        for domain in domains:
            candidate = domain.lower().strip(".")
            if candidate and (
                host == candidate or host.endswith(f".{candidate}")
            ):
                return True
        return False

    @staticmethod
    def is_unsafe_scheme(url: str) -> bool:
        """
        Check if a URL uses an unsafe scheme.

        The URL is normalized the way browsers normalize it before the
        check, so ``"java\\tscript:..."`` or a URL prefixed with a control
        character is still recognized.

        Args:
            url: The URL to check

        Returns:
            True if the URL uses an unsafe scheme, False otherwise
            (including for empty or non-string input)
        """
        if not url or not isinstance(url, str):
            return False

        normalized_url = URLValidator._normalize_for_scheme_check(url)

        for scheme in URLValidator.UNSAFE_SCHEMES:
            if normalized_url.startswith(f"{scheme}:"):
                logger.warning(
                    f"Unsafe URL scheme detected: {scheme} in URL: {url[:100]}"
                )
                return True

        return False

    @staticmethod
    def is_safe_url(
        url: str,
        require_scheme: bool = True,
        allow_fragments: bool = True,
        allow_mailto: bool = False,
        trusted_domains: Optional[List[str]] = None,
    ) -> bool:
        """
        Validate if a URL is safe to use.

        Args:
            url: The URL to validate
            require_scheme: Whether to require an explicit scheme
            allow_fragments: Whether to allow fragment identifiers (#)
            allow_mailto: Whether to allow mailto: links
            trusted_domains: Optional list of trusted domains. When given,
                the URL must have a hostname that equals, or is a subdomain
                of, one of them; a URL without a hostname is rejected.

        Returns:
            True if the URL is safe, False otherwise
        """
        if not url or not isinstance(url, str):
            return False

        # Check for unsafe schemes first
        if URLValidator.is_unsafe_scheme(url):
            return False

        # Handle fragment-only URLs
        if url.startswith("#"):
            return allow_fragments

        # Both parse calls live inside the try block: urlparse raises
        # ValueError on e.g. an unterminated IPv6 literal.
        try:
            parsed = urlparse(url)
            if not parsed.scheme:
                if require_scheme:
                    return False
                # No scheme required: assume http so a hostname can be
                # extracted. A protocol-relative URL already carries "//".
                prefix = "http:" if url.startswith("//") else "http://"
                parsed = urlparse(f"{prefix}{url}")  # DevSkim: ignore DS137138
            hostname = parsed.hostname
        except ValueError:
            logger.warning(f"Failed to parse URL '{url[:100]}'")
            return False

        scheme_lower = parsed.scheme.lower()

        # Check if it's a mailto link
        if scheme_lower == URLValidator.EMAIL_SCHEME:
            return allow_mailto

        # Check if it's a safe scheme
        if scheme_lower not in URLValidator.SAFE_SCHEMES:
            logger.warning(f"Unsafe URL scheme: {scheme_lower}")
            return False

        # Validate domain if trusted domains are specified. Fail closed when
        # there is no hostname: browsers resolve "https:///evil.com" to
        # evil.com, so skipping the check would bypass the allow-list.
        if trusted_domains and not URLValidator._hostname_matches(
            hostname, trusted_domains
        ):
            logger.warning(f"URL domain not in trusted list: {hostname}")
            return False

        # Check for suspicious patterns in the URL
        if URLValidator._has_suspicious_patterns(url):
            return False

        return True

    @staticmethod
    def _has_suspicious_patterns(url: str) -> bool:
        """
        Check for suspicious patterns in URLs that might indicate attacks.

        Args:
            url: The URL to check

        Returns:
            True if suspicious patterns are found, False otherwise
        """
        for compiled in URLValidator._SUSPICIOUS_PATTERNS:
            if compiled.search(url):
                logger.warning(
                    f"Suspicious pattern found in URL: {compiled.pattern}"
                )
                return True

        return False

    @staticmethod
    def sanitize_url(url: str, default_scheme: str = "https") -> Optional[str]:
        """
        Sanitize a URL by adding a scheme if missing and validating it.

        Args:
            url: The URL to sanitize
            default_scheme: The default scheme to add if missing

        Returns:
            Sanitized URL, or None if the URL is empty, whitespace-only,
            not a string, or unsafe
        """
        if not url or not isinstance(url, str):
            return None

        # Check for unsafe schemes
        if URLValidator.is_unsafe_scheme(url):
            return None

        # Strip whitespace; nothing left means there is nothing to sanitize
        # (otherwise the result would be a bare "https://").
        url = url.strip()
        if not url:
            return None

        try:
            has_scheme = bool(urlparse(url).scheme)
        except ValueError:
            logger.warning(f"Failed to sanitize URL '{url[:100]}'")
            return None

        # Add scheme if missing. A protocol-relative URL already starts with
        # "//", so only the scheme and colon are prepended.
        if not has_scheme:
            if url.startswith("//"):
                url = f"{default_scheme}:{url}"
            else:
                url = f"{default_scheme}://{url}"

        # Validate the final URL
        if URLValidator.is_safe_url(url, require_scheme=True):
            return url

        return None

    @staticmethod
    def is_academic_url(url: str) -> bool:
        """
        Check if a URL is from a known academic/research domain.

        Args:
            url: The URL to check

        Returns:
            True if the URL's hostname equals, or is a subdomain of, an
            entry in TRUSTED_ACADEMIC_DOMAINS; False otherwise (including
            for non-string or unparseable input)
        """
        if not isinstance(url, str):
            return False

        try:
            hostname = urlparse(url).hostname
        except ValueError:
            # loguru attaches the traceback via opt(exception=True); a
            # stdlib-style exc_info=True keyword is not interpreted by it.
            logger.opt(exception=True).debug(
                "URL parsing may fail on malformed input; treat as non-academic"
            )
            return False

        return URLValidator._hostname_matches(
            hostname, URLValidator.TRUSTED_ACADEMIC_DOMAINS
        )

    @staticmethod
    def extract_doi(url: str) -> Optional[str]:
        """
        Extract DOI from a URL if present.

        Trailing sentence punctuation (``. , ; :``) and an unbalanced
        closing parenthesis are trimmed, so a DOI quoted at the end of a
        sentence or inside parentheses comes back clean.

        Args:
            url: The URL (or free text) to extract DOI from

        Returns:
            The DOI if found, None otherwise (including for empty or
            non-string input)
        """
        if not url or not isinstance(url, str):
            return None

        # Common DOI patterns with explicit pattern identification
        doi_patterns = [
            (
                r"10\.\d{4,}(?:\.\d+)*\/[-._;()\/:a-zA-Z0-9]+",
                0,
            ),  # Direct DOI, group 0
            (r"doi\.org\/(10\.\d{4,}[^\s]*)", 1),  # doi.org URL, group 1
        ]

        for pattern, group_index in doi_patterns:
            match = re.search(pattern, url, re.IGNORECASE)
            if not match:
                continue

            doi = match.group(group_index)
            # Both patterns are greedy, so drop characters that belong to
            # the surrounding text rather than to the DOI itself.
            while doi:
                last = doi[-1]
                unbalanced_paren = last == ")" and doi.count(")") > doi.count(
                    "("
                )
                if last in ".,;:" or unbalanced_paren:
                    doi = doi[:-1]
                else:
                    break
            return doi

        return None

    @staticmethod
    def validate_http_url(url: str) -> bool:
        """
        Validate that a callback URL is well-formed and safe for HTTP/HTTPS use.

        This is stricter than is_safe_url() and specifically validates HTTP/HTTPS
        URLs for use as application callbacks (e.g., in notifications, redirects).
        It does NOT validate Apprise service URLs which use other protocols.

        Args:
            url: HTTP/HTTPS callback URL to validate

        Returns:
            True if valid

        Raises:
            URLValidationError: If URL is invalid
        """
        if not url or not isinstance(url, str):
            raise URLValidationError("URL must be a non-empty string")

        try:
            parsed = urlparse(url)
        except ValueError as e:
            raise URLValidationError(f"Failed to validate URL: {e}") from e

        # Must have a scheme
        if not parsed.scheme:
            raise URLValidationError("URL must have a scheme (http or https)")

        # Must be http or https (callback URLs only)
        if parsed.scheme not in ("http", "https"):
            raise URLValidationError(
                f"URL scheme must be http or https, got: {parsed.scheme}"
            )

        # Use the general security validator for additional safety
        if not URLValidator.is_safe_url(url, require_scheme=True):
            raise URLValidationError(f"URL failed security validation: {url}")

        # Must have a hostname. The netloc alone is not enough: ":8080" or
        # "user@" are non-empty netlocs that carry no host.
        hostname = parsed.hostname
        if not parsed.netloc or not hostname:
            raise URLValidationError("URL must have a hostname")

        # Check for obvious hostname issues (on the hostname too, so a port
        # or userinfo in the netloc cannot mask a leading/trailing dot).
        if (
            parsed.netloc.startswith(".")
            or parsed.netloc.endswith(".")
            or hostname.startswith(".")
            or hostname.endswith(".")
        ):
            raise URLValidationError(f"Invalid hostname: {parsed.netloc}")

        # Reading .port makes urllib validate it (non-numeric/out of range).
        try:
            _ = parsed.port
        except ValueError as e:
            raise URLValidationError(
                f"Invalid port in URL: {parsed.netloc}"
            ) from e

        # Path should be valid if present. With a non-empty netloc urlparse
        # yields a path that is empty or starts with "/", so this is a
        # purely defensive check.
        if parsed.path and not parsed.path.startswith("/"):
            raise URLValidationError(
                f"URL path must start with /: {parsed.path}"
            )

        return True

    @staticmethod
    def _redirect_rejection_reason(target: str) -> Optional[str]:
        """Return a log message template if ``target`` must be rejected.

        Performs the string-level redirect checks (no URL resolution).
        The returned template contains at most one ``{}`` placeholder,
        which the caller fills with the target. Returns None when no
        check fires.
        """
        # Prevent CRLF injection by rejecting URLs with CR or LF characters
        if "\r" in target or "\n" in target:
            return "CRLF injection attempt detected in redirect URL"

        # Decode once to detect encoded bypass attempts such as %2f%2f
        decoded_target = unquote(target)

        # Check for encoded CRLF injection (e.g. %0d%0a)
        if "\r" in decoded_target or "\n" in decoded_target:
            return "Encoded CRLF injection attempt detected in redirect URL"

        # Backslash is treated as forward slash by some browsers
        # (prevents \evil.com and \\evil.com bypasses)
        if "\\" in decoded_target:
            return "Backslash detected in redirect URL"

        # Protocol-relative URLs (//evil.com) are absolute for urljoin().
        # Browsers strip leading spaces first, so " //evil.com" counts too.
        if decoded_target.lstrip(" ").startswith("//"):
            return "Protocol-relative URL detected in redirect"

        # Block null bytes (truncation attacks in some URL processors)
        if "\x00" in decoded_target:
            return "Null byte detected in redirect URL"

        # Browsers drop tabs and strip other control characters, which can
        # turn "/\t/evil.com" into "//evil.com"; reject them outright.
        if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in decoded_target):
            return "Control character detected in redirect URL"

        # Block path traversal patterns (/../, /.. at end, ../ at start)
        if re.search(r"(^|/)\.\.(/|$|\?|#)", decoded_target):
            return "Path traversal detected in redirect URL: {}"

        return None

    @staticmethod
    def is_safe_redirect_url(target: str, host_url: str) -> bool:
        """
        Validate that a redirect target is safe (same host).

        Prevents open redirect attacks by ensuring the target URL
        is either relative or points to the same host as the application.
        Also prevents CRLF injection attacks by rejecting URLs containing
        carriage return or line feed characters.

        Uses the standard Flask pattern from:
        https://github.com/fengsp/flask-snippets/blob/master/security/redirect_back.py

        Security protections implemented:
        - CRLF injection prevention
        - Protocol-relative URL bypass (//evil.com), also behind a leading space
        - Triple-slash bypass (///evil.com)
        - URL-encoded bypass attempts
        - Backslash bypass (treated as forward slash by some browsers)
        - Null bytes and other ASCII control characters
        - Path traversal detection

        Args:
            target: The redirect URL to validate (can be relative or absolute)
            host_url: The application's host URL (e.g., request.host_url)

        Returns:
            True if the URL is safe to redirect to, False otherwise
        """
        if not target or not isinstance(target, str):
            return False

        reason = URLValidator._redirect_rejection_reason(target)
        if reason is not None:
            # loguru fills "{}" from the positional argument; templates
            # without a placeholder are emitted unchanged.
            logger.warning(reason, target)
            return False

        # Fail closed if either URL cannot be parsed (urlparse/urljoin raise
        # ValueError for e.g. a malformed IPv6 literal).
        try:
            ref_url = urlparse(host_url)
            test_url = urlparse(urljoin(host_url, target))
        except ValueError:
            logger.warning("Unparseable redirect URL: {}", target)
            return False

        if test_url.path.startswith("//"):
            logger.warning("Double-slash path in redirect URL: {}", target)
            return False

        return (
            test_url.scheme in ("http", "https")
            and ref_url.netloc == test_url.netloc
        )

    @staticmethod
    def get_safe_redirect_path(target: str, host_url: str) -> Optional[str]:
        """
        Validate a redirect target and return its path-only form.

        Combines is_safe_redirect_url validation with defense-in-depth path
        extraction. The target is resolved against host_url via urljoin, then
        only the path (with any ``;params``), query, and fragment are
        returned. This ensures that even a hypothetical validator bypass
        cannot cause an external redirect, since path-only URLs always
        resolve to the same host.

        Args:
            target: The redirect URL to validate (can be relative or absolute)
            host_url: The application's host URL (e.g., request.host_url)

        Returns:
            The safe path string (e.g., "/dashboard?tab=settings") if the
            target is valid, or None if the target is unsafe.
        """
        if not URLValidator.is_safe_redirect_url(target, host_url):
            return None

        parsed = urlparse(urljoin(host_url, target))
        safe_path = parsed.path or "/"
        # urlparse splits ";params" off the last path segment; put it back
        # so the returned path is not silently truncated.
        if parsed.params:
            safe_path += ";" + parsed.params
        if parsed.query:
            safe_path += "?" + parsed.query
        if parsed.fragment:
            safe_path += "#" + parsed.fragment

        # Final assertion: the constructed path must be relative (no netloc).
        # This is always true by construction but guards against future bugs.
        if urlparse(safe_path).netloc:
            return None
        return safe_path

461 

462 

def get_javascript_url_validator() -> str:
    """
    Get JavaScript code for URL validation modeled on the Python URLValidator.

    The returned snippet defines a global ``URLValidator`` object with
    ``isUnsafeScheme``, ``isSafeUrl`` and ``sanitizeUrl``. ``isSafeUrl``
    honors ``requireScheme``: scheme-less input is rejected unless the
    caller passes ``requireScheme: false``, in which case it is resolved
    against ``window.location.href``.

    Note: the JavaScript side does not implement the Python validator's
    suspicious-pattern check (double encoding, null bytes, HTML entities).

    Returns:
        JavaScript code as a string that can be embedded in web pages
    """
    return r"""
    // URL validation utilities matching Python URLValidator
    const URLValidator = {
        UNSAFE_SCHEMES: ['javascript', 'data', 'vbscript', 'about', 'blob', 'file'],
        SAFE_SCHEMES: ['http', 'https', 'ftp', 'ftps'],
        EMAIL_SCHEME: 'mailto',

        isUnsafeScheme: function(url) {
            if (!url) return false;

            const normalizedUrl = url.trim().toLowerCase();

            for (const scheme of this.UNSAFE_SCHEMES) {
                if (normalizedUrl.startsWith(scheme + ':')) {
                    console.warn(`Unsafe URL scheme detected: ${scheme}`);
                    return true;
                }
            }

            return false;
        },

        isSafeUrl: function(url, options = {}) {
            const {
                requireScheme = true,
                allowFragments = true,
                allowMailto = false,
                trustedDomains = []
            } = options;

            if (!url || typeof url !== 'string') {
                return false;
            }

            // Check for unsafe schemes first
            if (this.isUnsafeScheme(url)) {
                return false;
            }

            // Handle fragment-only URLs
            if (url.startsWith('#')) {
                return allowFragments;
            }

            // Reject scheme-less input when a scheme is required; otherwise
            // new URL() would silently resolve it against the current page
            const hasScheme = /^[a-zA-Z][a-zA-Z\d+\-.]*:/.test(url.trim());
            if (requireScheme && !hasScheme) {
                return false;
            }

            // Parse the URL
            try {
                const parsed = new URL(url, window.location.href);
                const scheme = parsed.protocol.slice(0, -1).toLowerCase(); // Remove trailing ':'

                // Check if it's a mailto link
                if (scheme === this.EMAIL_SCHEME) {
                    return allowMailto;
                }

                // Check if it's a safe scheme
                if (!this.SAFE_SCHEMES.includes(scheme)) {
                    console.warn(`Unsafe URL scheme: ${scheme}`);
                    return false;
                }

                // Validate domain if trusted domains are specified
                if (trustedDomains.length > 0 && parsed.hostname) {
                    const hostname = parsed.hostname.toLowerCase();
                    const isTrusted = trustedDomains.some(domain =>
                        hostname === domain.toLowerCase() ||
                        hostname.endsWith('.' + domain.toLowerCase())
                    );

                    if (!isTrusted) {
                        console.warn(`URL domain not in trusted list: ${parsed.hostname}`);
                        return false;
                    }
                }

                return true;
            } catch (e) {
                console.warn(`Failed to parse URL: ${e.message}`);
                return false;
            }
        },

        sanitizeUrl: function(url, defaultScheme = 'https') {
            if (!url) return null;

            // Check for unsafe schemes
            if (this.isUnsafeScheme(url)) {
                return null;
            }

            // Strip whitespace
            url = url.trim();

            // Add scheme if missing
            if (!url.match(/^[a-zA-Z][a-zA-Z\d+\-.]*:/)) {
                url = `${defaultScheme}://${url}`;
            }

            // Validate the final URL
            if (this.isSafeUrl(url, { requireScheme: true })) {
                return url;
            }

            return null;
        }
    };
    """