Coverage for src / local_deep_research / security / notification_validator.py: 96%
63 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Security validation for notification service URLs.
4This module provides validation for user-configured notification service URLs
5to prevent Server-Side Request Forgery (SSRF) attacks and other security issues.
6"""
8import ipaddress
9from typing import Optional, Tuple
10from urllib.parse import urlparse
11from loguru import logger
13from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES
class NotificationURLValidationError(ValueError):
    """Signal that a notification service URL was rejected by validation.

    Derives from ``ValueError`` so callers that already handle bad-value
    errors also catch rejected notification URLs.
    """
class NotificationURLValidator:
    """Validates notification service URLs to prevent SSRF and other attacks.

    All checks are purely syntactic: the URL is parsed and its scheme and
    host are inspected, but no DNS lookup or network request is made.
    """

    # Dangerous protocols that should never be used for notifications
    BLOCKED_SCHEMES = (
        "file",  # Local file access
        "ftp",  # FTP can be abused for SSRF
        "ftps",  # Secure FTP can be abused for SSRF
        "data",  # Data URIs can leak sensitive data
        "javascript",  # XSS/code execution
        "vbscript",  # XSS/code execution
        "about",  # Browser internal
        "blob",  # Browser internal
    )

    # Allowed protocols for notification services
    ALLOWED_SCHEMES = (
        "http",  # Webhook services
        "https",  # Webhook services (preferred)
        "mailto",  # Email notifications
        "discord",  # Discord webhooks
        "slack",  # Slack webhooks
        "telegram",  # Telegram bot API
        "gotify",  # Gotify notifications
        "pushover",  # Pushover notifications
        "ntfy",  # ntfy.sh notifications
        "matrix",  # Matrix protocol
        "mattermost",  # Mattermost webhooks
        "rocketchat",  # Rocket.Chat webhooks
        "teams",  # Microsoft Teams
        "json",  # Generic JSON webhooks
        "xml",  # Generic XML webhooks
        "form",  # Form-encoded webhooks
    )

    # Allowed schemes whose authority component can name the server that
    # will be contacted. The private-host check applies to all of them, not
    # just http/https; otherwise "json://127.0.0.1/" would bypass it.
    # Token-style schemes (discord, slack, telegram, ...) are left out
    # because their "host" part is an identifier, not a network address.
    HOST_BASED_SCHEMES = (
        "http",
        "https",
        "gotify",
        "ntfy",
        "matrix",
        "mattermost",
        "rocketchat",
        "json",
        "xml",
        "form",
    )

    # Reuse shared private IP range definitions
    PRIVATE_IP_RANGES = _PRIVATE_IP_RANGES

    @staticmethod
    def _parse_legacy_ipv4(host: str) -> Optional[ipaddress.IPv4Address]:
        """
        Interpret legacy numeric IPv4 spellings such as "2130706433".

        ``ipaddress.ip_address`` only accepts strict dotted-quad notation,
        while ``inet_aton`` also understands decimal, hexadecimal, octal and
        shortened forms ("127.1"). No DNS lookup is performed.

        Args:
            host: Candidate host string

        Returns:
            The parsed IPv4 address, or None if host is not a numeric form
        """
        # Local import keeps this block self-contained; socket is stdlib.
        import socket

        try:
            packed = socket.inet_aton(host)
        except (OSError, ValueError):
            # OSError: not an address; ValueError: e.g. embedded NUL
            return None
        return ipaddress.IPv4Address(packed)

    @staticmethod
    def _is_private_ip(hostname: str) -> bool:
        """
        Check if hostname denotes a private, loopback or unspecified host.

        Only the literal text is inspected; hostnames are never resolved.

        Args:
            hostname: Hostname to check

        Returns:
            True if hostname is a private IP or localhost
        """
        # Normalise case and drop the trailing dot of a fully-qualified
        # name so "LOCALHOST." and "127.0.0.1." are recognised.
        host = hostname.strip().lower().rstrip(".")

        # Check for localhost variations (including "*.localhost" names)
        if host in (
            "localhost",
            "127.0.0.1",
            "::1",
            "0.0.0.0",
            "::",
        ) or host.endswith(".localhost"):
            return True

        # Try to parse as IP address
        try:
            ip = ipaddress.ip_address(host)
        except ValueError:
            ip = NotificationURLValidator._parse_legacy_ipv4(host)
            if ip is None:
                # Not a valid IP address, might be a hostname
                # For security, we don't resolve hostnames to avoid DNS
                # rebinding attacks
                # Apprise services should be configured with public endpoints
                return False

        # "::ffff:10.0.0.1" targets 10.0.0.1; judge the embedded address.
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            ip = mapped

        if ip.is_loopback or ip.is_unspecified:
            return True

        return any(
            ip in network
            for network in NotificationURLValidator.PRIVATE_IP_RANGES
        )

    @staticmethod
    def validate_service_url(
        url: str, allow_private_ips: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a notification service URL for security issues.

        This function prevents SSRF attacks by validating that service URLs
        use safe protocols and don't target private/internal infrastructure.
        The private-host check applies to every scheme listed in
        HOST_BASED_SCHEMES.

        Args:
            url: Service URL to validate (e.g., "discord://webhook_id/token")
            allow_private_ips: Whether to allow private IPs (default: False)
                Set to True for development/testing environments

        Returns:
            Tuple of (is_valid, error_message)
            - is_valid: True if URL passes security checks
            - error_message: None if valid, error description if invalid

        Examples:
            >>> validate_service_url("discord://webhook_id/token")
            (True, None)

            >>> validate_service_url("file:///etc/passwd")
            (False, "Blocked unsafe protocol: file")

            >>> validate_service_url("http://localhost:5000/webhook")
            (False, "Blocked private/internal IP address: localhost")
        """
        if not url or not isinstance(url, str):
            return False, "Service URL must be a non-empty string"

        # Strip whitespace
        url = url.strip()

        # Parse URL. The broad catch is deliberate: any parser failure is
        # reported to the caller as an invalid URL instead of propagating.
        try:
            parsed = urlparse(url)
        except Exception as e:
            logger.warning(f"Failed to parse service URL: {e}")
            return False, f"Invalid URL format: {e}"

        # Check for scheme
        if not parsed.scheme:
            return False, "Service URL must have a protocol (e.g., https://)"

        scheme = parsed.scheme.lower()

        # Check for blocked schemes
        if scheme in NotificationURLValidator.BLOCKED_SCHEMES:
            logger.warning(
                f"Blocked unsafe notification protocol: {scheme} in URL: {url[:50]}..."
            )
            return False, f"Blocked unsafe protocol: {scheme}"

        # Check for allowed schemes
        if scheme not in NotificationURLValidator.ALLOWED_SCHEMES:
            logger.warning(
                f"Unknown notification protocol: {scheme} in URL: {url[:50]}..."
            )
            return (
                False,
                f"Unsupported protocol: {scheme}. "
                f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...",
            )

        # For host-based schemes, check for private IPs (SSRF prevention)
        if (
            not allow_private_ips
            and scheme in NotificationURLValidator.HOST_BASED_SCHEMES
        ):
            host = parsed.hostname
            if host and NotificationURLValidator._is_private_ip(host):
                logger.warning(
                    f"Blocked private/internal IP in notification URL: "
                    f"{parsed.hostname}"
                )
                return (
                    False,
                    f"Blocked private/internal IP address: {parsed.hostname}",
                )

        # Passed all security checks
        return True, None

    @staticmethod
    def validate_service_url_strict(
        url: str, allow_private_ips: bool = False
    ) -> bool:
        """
        Strict validation that raises an exception on invalid URLs.

        Args:
            url: Service URL to validate
            allow_private_ips: Whether to allow private IPs (default: False)

        Returns:
            True if valid

        Raises:
            NotificationURLValidationError: If URL fails security validation
        """
        is_valid, error_message = NotificationURLValidator.validate_service_url(
            url, allow_private_ips
        )

        if not is_valid:
            raise NotificationURLValidationError(
                f"Notification service URL validation failed: {error_message}"
            )

        return True

    @staticmethod
    def validate_multiple_urls(
        urls: str, allow_private_ips: bool = False, separator: str = ","
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate multiple separator-delimited service URLs.

        Args:
            urls: Service URLs joined by separator (comma by default)
            allow_private_ips: Whether to allow private IPs (default: False)
            separator: URL separator (default: ","); must be non-empty

        Returns:
            Tuple of (all_valid, error_message)
            - all_valid: True if all URLs pass validation
            - error_message: None if all valid, first error if any invalid
        """
        if not urls or not isinstance(urls, str):
            return False, "Service URLs must be a non-empty string"

        # str.split("") raises ValueError; report it like any other failure
        # so this function keeps its "never raises" tuple contract.
        if not separator or not isinstance(separator, str):
            return False, "URL separator must be a non-empty string"

        # Split by separator and strip whitespace, dropping empty entries
        url_list = [url.strip() for url in urls.split(separator) if url.strip()]

        if not url_list:
            return False, "No valid URLs found after parsing"

        # Validate each URL
        for url in url_list:
            is_valid, error_message = (
                NotificationURLValidator.validate_service_url(
                    url, allow_private_ips
                )
            )

            if not is_valid:
                # Return first error found
                return False, f"Invalid URL '{url[:50]}...': {error_message}"

        # All URLs passed validation
        return True, None