Coverage for src / local_deep_research / security / url_validator.py: 94%
163 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Centralized URL validation utilities for security.
4This module provides secure URL validation to prevent XSS attacks,
5data exfiltration, and other URL-based security vulnerabilities.
6"""
8import re
9from typing import Optional, List
10from urllib.parse import urlparse, urljoin, unquote
11from loguru import logger
class URLValidationError(ValueError):
    """Raised when a URL fails construction or security validation."""
class URLValidator:
    """Centralized URL validation for security.

    All methods are static; the class acts as a namespace for URL
    safety checks used across the application: scheme validation,
    trusted-domain checks, DOI extraction, HTTP callback validation,
    and open-redirect prevention.
    """

    # Unsafe URL schemes that could lead to XSS or data exfiltration
    UNSAFE_SCHEMES = (
        "javascript",
        "data",
        "vbscript",
        "about",
        "blob",
        "file",
    )

    # Safe schemes for external links
    SAFE_SCHEMES = ("http", "https", "ftp", "ftps")

    # Email scheme
    EMAIL_SCHEME = "mailto"

    # Common academic/research domains that should be allowed
    TRUSTED_ACADEMIC_DOMAINS = (
        "arxiv.org",
        "pubmed.ncbi.nlm.nih.gov",
        "ncbi.nlm.nih.gov",
        "biorxiv.org",
        "medrxiv.org",
        "doi.org",
        "nature.com",
        "science.org",
        "sciencedirect.com",
        "springer.com",
        "wiley.com",
        "plos.org",
        "pnas.org",
        "ieee.org",
        "acm.org",
    )

    @staticmethod
    def is_unsafe_scheme(url: str) -> bool:
        """
        Check if a URL uses an unsafe scheme.

        Args:
            url: The URL to check

        Returns:
            True if the URL uses an unsafe scheme, False otherwise
        """
        if not url:
            return False

        # Normalize the URL - trim whitespace and convert to lowercase
        normalized_url = url.strip().lower()

        # Check for unsafe schemes
        for scheme in URLValidator.UNSAFE_SCHEMES:
            if normalized_url.startswith(f"{scheme}:"):
                logger.warning(
                    f"Unsafe URL scheme detected: {scheme} in URL: {url[:100]}"
                )
                return True

        return False

    @staticmethod
    def is_safe_url(
        url: str,
        require_scheme: bool = True,
        allow_fragments: bool = True,
        allow_mailto: bool = False,
        trusted_domains: Optional[List[str]] = None,
    ) -> bool:
        """
        Validate if a URL is safe to use.

        Args:
            url: The URL to validate
            require_scheme: Whether to require an explicit scheme
            allow_fragments: Whether to allow fragment identifiers (#)
            allow_mailto: Whether to allow mailto: links
            trusted_domains: Optional list of trusted domains

        Returns:
            True if the URL is safe, False otherwise
        """
        if not url or not isinstance(url, str):
            return False

        # Check for unsafe schemes first
        if URLValidator.is_unsafe_scheme(url):
            return False

        # Handle fragment-only URLs
        if url.startswith("#"):
            return allow_fragments

        # Parse the URL
        try:
            parsed = urlparse(url)
        except Exception:
            logger.warning(f"Failed to parse URL '{url[:100]}'")
            return False

        # Check scheme
        if not parsed.scheme:
            if require_scheme:
                return False
            # If no scheme is required, assume http/https for URL parsing
            parsed = urlparse(f"http://{url}")  # DevSkim: ignore DS137138

        scheme_lower = parsed.scheme.lower()

        # Check if it's a mailto link
        if scheme_lower == URLValidator.EMAIL_SCHEME:
            return allow_mailto

        # Check if it's a safe scheme
        if scheme_lower not in URLValidator.SAFE_SCHEMES:
            logger.warning(f"Unsafe URL scheme: {scheme_lower}")
            return False

        # Validate domain if trusted domains are specified
        if trusted_domains and parsed.hostname:
            hostname_lower = parsed.hostname.lower()
            # Normalize each trusted domain once instead of twice per
            # comparison inside the any() expression.
            lowered_domains = [domain.lower() for domain in trusted_domains]
            if not any(
                hostname_lower == domain
                or hostname_lower.endswith(f".{domain}")
                for domain in lowered_domains
            ):
                logger.warning(
                    f"URL domain not in trusted list: {parsed.hostname}"
                )
                return False

        # Check for suspicious patterns in the URL
        if URLValidator._has_suspicious_patterns(url):
            return False

        return True

    @staticmethod
    def _has_suspicious_patterns(url: str) -> bool:
        """
        Check for suspicious patterns in URLs that might indicate attacks.

        Args:
            url: The URL to check

        Returns:
            True if suspicious patterns are found, False otherwise
        """
        suspicious_patterns = [
            # Double encoding (e.g. %252e = encoded "%2e")
            r"%25[0-9a-fA-F]{2}",
            # Null bytes
            r"%00",
            # Unicode encoding bypass attempts (literal \uXXXX sequences)
            r"\\u[0-9a-fA-F]{4}",
            # HTML entity encoding (named or numeric entities)
            r"&(#x?[0-9a-fA-F]+|[a-zA-Z]+);",
        ]

        for pattern in suspicious_patterns:
            if re.search(pattern, url, re.IGNORECASE):
                logger.warning(f"Suspicious pattern found in URL: {pattern}")
                return True

        return False

    @staticmethod
    def sanitize_url(url: str, default_scheme: str = "https") -> Optional[str]:
        """
        Sanitize a URL by adding a scheme if missing and validating it.

        Args:
            url: The URL to sanitize
            default_scheme: The default scheme to add if missing

        Returns:
            Sanitized URL or None if the URL is unsafe
        """
        if not url:
            return None

        # Check for unsafe schemes
        if URLValidator.is_unsafe_scheme(url):
            return None

        # Strip whitespace
        url = url.strip()

        # Parse the URL
        try:
            parsed = urlparse(url)

            # Add scheme if missing
            if not parsed.scheme:
                url = f"{default_scheme}://{url}"
                parsed = urlparse(url)

            # Validate the final URL
            if URLValidator.is_safe_url(url, require_scheme=True):
                return url

        except Exception:
            logger.warning(f"Failed to sanitize URL '{url[:100]}'")

        return None

    @staticmethod
    def is_academic_url(url: str) -> bool:
        """
        Check if a URL is from a known academic/research domain.

        Args:
            url: The URL to check

        Returns:
            True if the URL is from an academic domain, False otherwise
        """
        try:
            parsed = urlparse(url)
            if parsed.hostname:
                hostname_lower = parsed.hostname.lower()
                # TRUSTED_ACADEMIC_DOMAINS entries are already lowercase,
                # so only the hostname needs normalizing.
                return any(
                    hostname_lower == domain
                    or hostname_lower.endswith(f".{domain}")
                    for domain in URLValidator.TRUSTED_ACADEMIC_DOMAINS
                )
        except Exception:
            logger.debug(
                "URL parsing may fail on malformed input; treat as non-academic",
                exc_info=True,
            )

        return False

    @staticmethod
    def extract_doi(url: str) -> Optional[str]:
        """
        Extract DOI from a URL if present.

        Args:
            url: The URL to extract DOI from

        Returns:
            The DOI if found, None otherwise
        """
        # Common DOI patterns with explicit pattern identification
        doi_patterns = [
            (
                r"10\.\d{4,}(?:\.\d+)*\/[-._;()\/:a-zA-Z0-9]+",
                0,
            ),  # Direct DOI, group 0
            (r"doi\.org\/(10\.\d{4,}[^\s]*)", 1),  # doi.org URL, group 1
        ]

        for pattern, group_index in doi_patterns:
            match = re.search(pattern, url, re.IGNORECASE)
            if match:
                return match.group(group_index)

        return None

    @staticmethod
    def validate_http_url(url: str) -> bool:
        """
        Validate that a callback URL is well-formed and safe for HTTP/HTTPS use.

        This is stricter than is_safe_url() and specifically validates HTTP/HTTPS
        URLs for use as application callbacks (e.g., in notifications, redirects).
        It does NOT validate Apprise service URLs which use other protocols.

        Args:
            url: HTTP/HTTPS callback URL to validate

        Returns:
            True if valid

        Raises:
            URLValidationError: If URL is invalid
        """
        if not url or not isinstance(url, str):
            raise URLValidationError("URL must be a non-empty string")

        # NOTE: urlparse can only raise generic exceptions here; a dead
        # "except URLValidationError: raise" clause was removed.
        try:
            parsed = urlparse(url)
        except Exception as e:
            raise URLValidationError(f"Failed to validate URL: {e}") from e

        # Must have a scheme
        if not parsed.scheme:
            raise URLValidationError("URL must have a scheme (http or https)")

        # Must be http or https (callback URLs only)
        if parsed.scheme not in ("http", "https"):
            raise URLValidationError(
                f"URL scheme must be http or https, got: {parsed.scheme}"
            )

        # Use the general security validator for additional safety
        if not URLValidator.is_safe_url(url, require_scheme=True):
            raise URLValidationError(f"URL failed security validation: {url}")

        # Must have a netloc (hostname)
        if not parsed.netloc:
            raise URLValidationError("URL must have a hostname")

        # Check for obvious hostname issues
        if parsed.netloc.startswith(".") or parsed.netloc.endswith("."):
            raise URLValidationError(f"Invalid hostname: {parsed.netloc}")

        # Path should be valid if present
        if parsed.path and not parsed.path.startswith("/"):
            raise URLValidationError(
                f"URL path must start with /: {parsed.path}"
            )

        return True

    @staticmethod
    def is_safe_redirect_url(target: str, host_url: str) -> bool:
        """
        Validate that a redirect target is safe (same host).

        Prevents open redirect attacks by ensuring the target URL
        is either relative or points to the same host as the application.
        Also prevents CRLF injection attacks by rejecting URLs containing
        carriage return or line feed characters.

        Uses the standard Flask pattern from:
        https://github.com/fengsp/flask-snippets/blob/master/security/redirect_back.py

        Security protections implemented:
        - CRLF injection prevention
        - Protocol-relative URL bypass (//evil.com)
        - Triple-slash bypass (///evil.com)
        - URL-encoded bypass attempts
        - Backslash bypass (treated as forward slash by some browsers)
        - Path traversal detection

        Args:
            target: The redirect URL to validate (can be relative or absolute)
            host_url: The application's host URL (e.g., request.host_url)

        Returns:
            True if the URL is safe to redirect to, False otherwise
        """
        if not target:
            return False

        # Prevent CRLF injection by rejecting URLs with CR or LF characters
        if "\r" in target or "\n" in target:
            logger.warning("CRLF injection attempt detected in redirect URL")
            return False

        # Normalize and decode URL to detect encoded bypass attempts
        # This catches %2f%2f (encoded //) and similar tricks
        decoded_target = unquote(target)

        # Check for encoded CRLF injection (e.g. %0d%0a)
        if "\r" in decoded_target or "\n" in decoded_target:
            logger.warning(
                "Encoded CRLF injection attempt detected in redirect URL"
            )
            return False

        # Check for backslash (treated as forward slash by some browsers)
        # Prevents \evil.com and \\evil.com bypasses
        if "\\" in decoded_target:
            logger.warning("Backslash detected in redirect URL")
            return False

        # Check for protocol-relative URLs before stripping slashes
        # //evil.com would be treated as absolute URL by urljoin()
        if decoded_target.startswith("//"):
            logger.warning("Protocol-relative URL detected in redirect")
            return False

        # Block null bytes (truncation attacks in some URL processors)
        if "\x00" in decoded_target:
            logger.warning("Null byte detected in redirect URL")
            return False

        # Block path traversal patterns (/../, /.. at end, ../ at start)
        if re.search(r"(^|/)\.\.(/|$|\?|#)", decoded_target):
            logger.warning(
                "Path traversal detected in redirect URL: {}", target
            )
            return False

        ref_url = urlparse(host_url)
        test_url = urlparse(urljoin(host_url, target))

        # urljoin keeps ///evil.com as a same-host path "//evil.com",
        # which browsers may still treat as protocol-relative — reject.
        if test_url.path.startswith("//"):
            logger.warning("Double-slash path in redirect URL: {}", target)
            return False

        return (
            test_url.scheme in ("http", "https")
            and ref_url.netloc == test_url.netloc
        )

    @staticmethod
    def get_safe_redirect_path(target: str, host_url: str) -> Optional[str]:
        """
        Validate a redirect target and return its path-only form.

        Combines is_safe_redirect_url validation with defense-in-depth path
        extraction. The target is resolved against host_url via urljoin, then
        only the path, query, and fragment are returned. This ensures that
        even a hypothetical validator bypass cannot cause an external redirect,
        since path-only URLs always resolve to the same host.

        Args:
            target: The redirect URL to validate (can be relative or absolute)
            host_url: The application's host URL (e.g., request.host_url)

        Returns:
            The safe path string (e.g., "/dashboard?tab=settings") if the
            target is valid, or None if the target is unsafe.
        """
        if not URLValidator.is_safe_redirect_url(target, host_url):
            return None

        resolved = urljoin(host_url, target)
        parsed = urlparse(resolved)
        safe_path = parsed.path or "/"
        if parsed.query:
            safe_path += "?" + parsed.query
        if parsed.fragment:
            safe_path += "#" + parsed.fragment

        # Final assertion: the constructed path must be relative (no netloc).
        # This is always true by construction but guards against future bugs.
        if urlparse(safe_path).netloc:
            return None
        return safe_path
def get_javascript_url_validator() -> str:
    """Return JavaScript source for client-side URL validation.

    The emitted script defines a ``URLValidator`` object whose
    ``isUnsafeScheme``, ``isSafeUrl``, and ``sanitizeUrl`` functions
    mirror the Python ``URLValidator`` logic so the same rules can be
    enforced in the browser.

    Returns:
        JavaScript code as a string that can be embedded in web pages
    """
    javascript_source = r"""
    // URL validation utilities matching Python URLValidator
    const URLValidator = {
        UNSAFE_SCHEMES: ['javascript', 'data', 'vbscript', 'about', 'blob', 'file'],
        SAFE_SCHEMES: ['http', 'https', 'ftp', 'ftps'],
        EMAIL_SCHEME: 'mailto',

        isUnsafeScheme: function(url) {
            if (!url) return false;

            const normalizedUrl = url.trim().toLowerCase();

            for (const scheme of this.UNSAFE_SCHEMES) {
                if (normalizedUrl.startsWith(scheme + ':')) {
                    console.warn(`Unsafe URL scheme detected: ${scheme}`);
                    return true;
                }
            }

            return false;
        },

        isSafeUrl: function(url, options = {}) {
            const {
                requireScheme = true,
                allowFragments = true,
                allowMailto = false,
                trustedDomains = []
            } = options;

            if (!url || typeof url !== 'string') {
                return false;
            }

            // Check for unsafe schemes first
            if (this.isUnsafeScheme(url)) {
                return false;
            }

            // Handle fragment-only URLs
            if (url.startsWith('#')) {
                return allowFragments;
            }

            // Parse the URL
            try {
                const parsed = new URL(url, window.location.href);
                const scheme = parsed.protocol.slice(0, -1).toLowerCase(); // Remove trailing ':'

                // Check if it's a mailto link
                if (scheme === this.EMAIL_SCHEME) {
                    return allowMailto;
                }

                // Check if it's a safe scheme
                if (!this.SAFE_SCHEMES.includes(scheme)) {
                    console.warn(`Unsafe URL scheme: ${scheme}`);
                    return false;
                }

                // Validate domain if trusted domains are specified
                if (trustedDomains.length > 0 && parsed.hostname) {
                    const hostname = parsed.hostname.toLowerCase();
                    const isTrusted = trustedDomains.some(domain =>
                        hostname === domain.toLowerCase() ||
                        hostname.endsWith('.' + domain.toLowerCase())
                    );

                    if (!isTrusted) {
                        console.warn(`URL domain not in trusted list: ${parsed.hostname}`);
                        return false;
                    }
                }

                return true;
            } catch (e) {
                console.warn(`Failed to parse URL: ${e.message}`);
                return false;
            }
        },

        sanitizeUrl: function(url, defaultScheme = 'https') {
            if (!url) return null;

            // Check for unsafe schemes
            if (this.isUnsafeScheme(url)) {
                return null;
            }

            // Strip whitespace
            url = url.trim();

            // Add scheme if missing
            if (!url.match(/^[a-zA-Z][a-zA-Z\d+\-.]*:/)) {
                url = `${defaultScheme}://${url}`;
            }

            // Validate the final URL
            if (this.isSafeUrl(url, { requireScheme: true })) {
                return url;
            }

            return null;
        }
    };
    """
    return javascript_source