Coverage for src / local_deep_research / security / notification_validator.py: 95%
62 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
Security validation for notification service URLs.

This module provides validation for user-configured notification service URLs
to prevent Server-Side Request Forgery (SSRF) attacks and other security issues.
6"""
8import ipaddress
9from typing import Optional, Tuple
10from urllib.parse import urlparse
11from loguru import logger
class NotificationURLValidationError(ValueError):
    """Raised when a notification service URL fails security validation.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    for malformed input keep working.
    """
class NotificationURLValidator:
    """Validate notification service URLs to prevent SSRF and other attacks.

    The class is a namespace: every method is static and the class-level
    tuples/lists below are the policy tables the methods consult.
    """

    # Dangerous protocols that should never be used for notifications
    BLOCKED_SCHEMES = (
        "file",  # Local file access
        "ftp",  # FTP can be abused for SSRF
        "ftps",  # Secure FTP can be abused for SSRF
        "data",  # Data URIs can leak sensitive data
        "javascript",  # XSS/code execution
        "vbscript",  # XSS/code execution
        "about",  # Browser internal
        "blob",  # Browser internal
    )

    # Allowed protocols for notification services
    ALLOWED_SCHEMES = (
        "http",  # Webhook services
        "https",  # Webhook services (preferred)
        "mailto",  # Email notifications
        "discord",  # Discord webhooks
        "slack",  # Slack webhooks
        "telegram",  # Telegram bot API
        "gotify",  # Gotify notifications
        "pushover",  # Pushover notifications
        "ntfy",  # ntfy.sh notifications
        "matrix",  # Matrix protocol
        "mattermost",  # Mattermost webhooks
        "rocketchat",  # Rocket.Chat webhooks
        "teams",  # Microsoft Teams
        "json",  # Generic JSON webhooks
        "xml",  # Generic XML webhooks
        "form",  # Form-encoded webhooks
    )

    # Private IP ranges (RFC 1918 + loopback + link-local)
    PRIVATE_IP_RANGES = [
        ipaddress.ip_network("127.0.0.0/8"),  # Loopback
        ipaddress.ip_network("10.0.0.0/8"),  # Private
        ipaddress.ip_network("172.16.0.0/12"),  # Private
        ipaddress.ip_network("192.168.0.0/16"),  # Private
        ipaddress.ip_network("169.254.0.0/16"),  # Link-local
        ipaddress.ip_network("::1/128"),  # IPv6 loopback
        ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
        ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
    ]

    @staticmethod
    def _is_private_ip(hostname: str) -> bool:
        """
        Check whether a hostname is a private/internal IP literal or localhost.

        Hostnames are never resolved; only well-known localhost names and
        literal IP addresses are classified.

        Args:
            hostname: Hostname to check

        Returns:
            True if hostname is a private IP or localhost, False otherwise
            (including for any hostname that is not an IP literal)
        """
        # A trailing root dot ("localhost.") names the same host, so drop it
        # before comparing.
        host = hostname.lower().rstrip(".")

        # Check for localhost variations
        if host in ("localhost", "127.0.0.1", "::1", "0.0.0.0", "::"):
            return True

        # An IPv6 zone ID ("fe80::1%eth0") does not change which network the
        # address belongs to; strip it so parsing does not depend on it.
        if ":" in host:
            host = host.split("%", 1)[0]

        # Try to parse as IP address
        try:
            ip = ipaddress.ip_address(host)
        except ValueError:
            # Not a valid IP address, might be a hostname
            # For security, we don't resolve hostnames to avoid DNS rebinding attacks
            # Apprise services should be configured with public endpoints
            return False

        # "::ffff:a.b.c.d" carries an IPv4 address inside an IPv6 literal.
        # Membership tests between different IP versions are always False,
        # so unwrap it to compare against the IPv4 ranges.
        if (
            isinstance(ip, ipaddress.IPv6Address)
            and ip.ipv4_mapped is not None
        ):
            ip = ip.ipv4_mapped

        # Catches every spelling of 0.0.0.0 / :: (e.g. "0:0:0:0:0:0:0:0"),
        # not just the two literal strings listed above.
        if ip.is_unspecified:
            return True

        return any(
            ip in network
            for network in NotificationURLValidator.PRIVATE_IP_RANGES
        )

    @staticmethod
    def validate_service_url(
        url: str, allow_private_ips: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a notification service URL for security issues.

        The URL must be a non-empty string with a scheme that is not in
        BLOCKED_SCHEMES and is in ALLOWED_SCHEMES. For http/https URLs the
        host must additionally not be a private/internal address unless
        ``allow_private_ips`` is True.

        Args:
            url: Service URL to validate (e.g., "discord://webhook_id/token")
            allow_private_ips: Whether to allow private IPs (default: False)
                Set to True for development/testing environments

        Returns:
            Tuple of (is_valid, error_message)
            - is_valid: True if URL passes security checks
            - error_message: None if valid, error description if invalid

        Examples:
            >>> NotificationURLValidator.validate_service_url(
            ...     "discord://webhook_id/token")
            (True, None)

            >>> NotificationURLValidator.validate_service_url(
            ...     "file:///etc/passwd")
            (False, 'Blocked unsafe protocol: file')

            >>> NotificationURLValidator.validate_service_url(
            ...     "http://localhost:5000/webhook")
            (False, 'Blocked private/internal IP address: localhost')
        """
        if not url or not isinstance(url, str):
            return False, "Service URL must be a non-empty string"

        # Strip whitespace
        url = url.strip()

        # Parse URL; urlparse signals malformed input with ValueError
        try:
            parsed = urlparse(url)
        except ValueError as e:
            logger.warning(f"Failed to parse service URL: {e}")
            return False, f"Invalid URL format: {e}"

        # Check for scheme
        if not parsed.scheme:
            return False, "Service URL must have a protocol (e.g., https://)"

        scheme = parsed.scheme.lower()

        # Check for blocked schemes
        if scheme in NotificationURLValidator.BLOCKED_SCHEMES:
            logger.warning(
                f"Blocked unsafe notification protocol: {scheme} in URL: {url[:50]}..."
            )
            return False, f"Blocked unsafe protocol: {scheme}"

        # Check for allowed schemes
        if scheme not in NotificationURLValidator.ALLOWED_SCHEMES:
            logger.warning(
                f"Unknown notification protocol: {scheme} in URL: {url[:50]}..."
            )
            return (
                False,
                f"Unsupported protocol: {scheme}. "
                f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...",
            )

        # For HTTP/HTTPS, check for private IPs (SSRF prevention)
        # NOTE(review): other allowed schemes (e.g. json://, ntfy://) may
        # also carry a caller-chosen host but are not checked here - confirm
        # whether they need the same restriction.
        if scheme in ("http", "https") and not allow_private_ips:
            host = parsed.hostname
            if host and NotificationURLValidator._is_private_ip(host):
                logger.warning(
                    f"Blocked private/internal IP in notification URL: "
                    f"{parsed.hostname}"
                )
                return (
                    False,
                    f"Blocked private/internal IP address: {parsed.hostname}",
                )

        # Passed all security checks
        return True, None

    @staticmethod
    def validate_service_url_strict(
        url: str, allow_private_ips: bool = False
    ) -> bool:
        """
        Strict validation that raises an exception on invalid URLs.

        Thin wrapper around ``validate_service_url`` that converts a failed
        result into an exception.

        Args:
            url: Service URL to validate
            allow_private_ips: Whether to allow private IPs (default: False)

        Returns:
            True if valid

        Raises:
            NotificationURLValidationError: If URL fails security validation
        """
        is_valid, error_message = NotificationURLValidator.validate_service_url(
            url, allow_private_ips
        )

        if not is_valid:
            raise NotificationURLValidationError(
                f"Notification service URL validation failed: {error_message}"
            )

        return True

    @staticmethod
    def validate_multiple_urls(
        urls: str, allow_private_ips: bool = False, separator: str = ","
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate multiple separator-delimited service URLs.

        Empty segments (e.g. from a trailing separator) are ignored.
        Validation stops at the first URL that fails.

        Args:
            urls: Service URLs joined by ``separator``
            allow_private_ips: Whether to allow private IPs (default: False)
            separator: URL separator (default: ",")

        Returns:
            Tuple of (all_valid, error_message)
            - all_valid: True if all URLs pass validation
            - error_message: None if all valid, first error if any invalid
        """
        if not urls or not isinstance(urls, str):
            return False, "Service URLs must be a non-empty string"

        # Split by separator and strip whitespace, dropping empty segments
        url_list = [
            url.strip() for url in urls.split(separator) if url.strip()
        ]

        if not url_list:
            return False, "No valid URLs found after parsing"

        # Validate each URL
        for url in url_list:
            is_valid, error_message = (
                NotificationURLValidator.validate_service_url(
                    url, allow_private_ips
                )
            )

            if not is_valid:
                # Return first error found
                return False, f"Invalid URL '{url[:50]}...': {error_message}"

        # All URLs passed validation
        return True, None