Coverage for src/local_deep_research/security/notification_validator.py: 93%
102 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Security validation for notification service URLs.
4This module provides validation for user-configured notification service URLs
5to prevent Server-Side Request Forgery (SSRF) attacks and other security issues.
6"""
8import ipaddress
9import socket
10from concurrent.futures import ThreadPoolExecutor, TimeoutError
11from typing import Optional, Tuple
12from urllib.parse import urlparse
13from loguru import logger
14from urllib3.exceptions import LocationParseError
15from urllib3.util import parse_url
17from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES
18from .ssrf_validator import RFC_FORBIDDEN_URL_CHARS_RE, redact_url_for_log
class NotificationURLValidationError(ValueError):
    """Signal that a notification service URL was rejected by security checks.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    for malformed input also catch validation failures.
    """
class NotificationURLValidator:
    """Validates notification service URLs to prevent SSRF and other attacks.

    Every method is a ``@staticmethod``; the class acts as a namespace that
    groups the scheme allow/block lists with the validation entry points:

    - ``validate_service_url``: validate one URL, returning
      ``(is_valid, error_message)``.
    - ``validate_service_url_strict``: same check, but raises
      ``NotificationURLValidationError`` on failure.
    - ``validate_multiple_urls``: validate a separator-delimited list of URLs.
    """

    # Dangerous protocols that should never be used for notifications
    BLOCKED_SCHEMES = (
        "file",  # Local file access
        "ftp",  # FTP can be abused for SSRF
        "ftps",  # Secure FTP can be abused for SSRF
        "data",  # Data URIs can leak sensitive data
        "javascript",  # XSS/code execution
        "vbscript",  # XSS/code execution
        "about",  # Browser internal
        "blob",  # Browser internal
    )

    # Allowed protocols for notification services
    ALLOWED_SCHEMES = (
        "http",  # Webhook services
        "https",  # Webhook services (preferred)
        "mailto",  # Email notifications
        "discord",  # Discord webhooks
        "slack",  # Slack webhooks
        "telegram",  # Telegram bot API
        "gotify",  # Gotify notifications
        "pushover",  # Pushover notifications
        "ntfy",  # ntfy.sh notifications (http)
        "ntfys",  # ntfy.sh notifications (https)
        "signal",  # Signal via signal-api-rest container
        "matrix",  # Matrix protocol
        "mattermost",  # Mattermost webhooks
        "rocketchat",  # Rocket.Chat webhooks
        "teams",  # Microsoft Teams
        "json",  # Generic JSON webhooks
        "xml",  # Generic XML webhooks
        "form",  # Form-encoded webhooks
    )

    # Reuse shared private IP range definitions
    PRIVATE_IP_RANGES = _PRIVATE_IP_RANGES

    @staticmethod
    def _ip_matches_blocked_range(ip, allow_private_ips: bool = False) -> bool:
        """Block-decision for a parsed IP, delegating to
        ``ssrf_validator.is_ip_blocked`` so the two validators share a
        single source of truth.

        Honors:
          - ALWAYS_BLOCKED_METADATA_IPS (cloud metadata, absolute)
          - is_nat64_wrapped_metadata_ip (NAT64-wrapped IMDS, absolute)
          - security.allow_nat64 env carve-out for the two NAT64 prefixes
          - allow_private_ips: when True, RFC1918 / CGNAT / loopback /
            link-local / IPv6 ULA are allowed BUT the two absolute checks
            above still fire. This closes the historical bypass where
            ``allow_private_ips=True`` skipped the host check entirely
            and let metadata IPs through the notification path.

        Args:
            ip: Parsed IP address object (or anything whose ``str()`` is an
                IP literal); it is stringified before delegation.
            allow_private_ips: Forwarded unchanged to ``is_ip_blocked``.

        Returns:
            Whatever ``is_ip_blocked`` returns for the stringified address.
        """
        from .ssrf_validator import is_ip_blocked

        return is_ip_blocked(str(ip), allow_private_ips=allow_private_ips)

    @staticmethod
    def _is_private_ip(hostname: str, allow_private_ips: bool = False) -> bool:
        """
        Check if hostname resolves to a private IP address.

        Args:
            hostname: Hostname to check
            allow_private_ips: When True, RFC1918 / CGNAT / loopback /
                link-local / IPv6 ULA are NOT considered private. Cloud
                metadata IPs and NAT64-wrapped metadata IPs are blocked
                regardless — the operator opt-in cannot license IMDS
                exposure.

        Returns:
            True if hostname is a private IP or localhost (subject to
            allow_private_ips), or wraps a metadata IP unconditionally.
            False otherwise — including when DNS resolution fails or times
            out, in which case the decision is logged and the host is
            allowed (fail-open, best-effort check).
        """
        # Localhost-string shortcuts only apply when the operator hasn't
        # opted into private-IP reachability. With allow_private_ips=True
        # we let the IP path (DNS-resolved or literal) make the decision
        # so metadata-IP literals like "169.254.169.254" still block.
        if not allow_private_ips and hostname.lower() in (
            "localhost",
            "127.0.0.1",
            "::1",
            "0.0.0.0",
            "::",
        ):
            return True

        # Try to parse as IP address
        try:
            ip = ipaddress.ip_address(hostname)
            return NotificationURLValidator._ip_matches_blocked_range(
                ip, allow_private_ips=allow_private_ips
            )
        except ValueError:
            # Hostname - resolve to IP and check.
            #
            # NOTE: This is a best-effort, validation-time check. Apprise
            # re-resolves the hostname when it actually sends the request
            # (via requests/urllib3), so an attacker controlling DNS can
            # serve a public IP here and a private IP at send time -- a
            # classic DNS rebinding TOCTOU window. Apprise exposes no
            # Session/adapter/DNS hook to close this in code without
            # fragile monkey-patching of its plugin internals.
            #
            # Because the window cannot be closed cleanly in code, the
            # whole outbound-notification path is gated behind an
            # env-only master switch (LDR_NOTIFICATIONS_ALLOW_OUTBOUND,
            # default off); turning it on is the operator's explicit
            # risk-acceptance. See SECURITY.md "Notification Webhook
            # SSRF" for details.
            # Operators wanting to avoid the window entirely should
            # prefer plugin schemes (discord://, slack://, ntfy://, ntfys://,
            # gotify://, telegram://, mattermost://, etc.) that hardcode
            # their endpoints instead of raw http(s):// webhooks.
            #
            # Use concurrent.futures for thread-safe timeout instead of
            # socket.setdefaulttimeout() which is process-global and not
            # thread-safe.
            try:
                executor = ThreadPoolExecutor(max_workers=1)
                try:
                    future = executor.submit(
                        socket.getaddrinfo,
                        hostname,
                        None,
                        socket.AF_UNSPEC,
                        socket.SOCK_STREAM,
                    )
                    resolved_ips = future.result(timeout=5)
                finally:
                    # Deliberately NOT a ``with`` block: the context
                    # manager would call shutdown(wait=True) and block
                    # until getaddrinfo returns, defeating the 5 s timeout.
                    executor.shutdown(wait=False, cancel_futures=True)
                # Any single blocked address among the resolved set is
                # enough to reject the hostname.
                for _family, _, _, _, sockaddr in resolved_ips:
                    ip = ipaddress.ip_address(sockaddr[0])
                    if NotificationURLValidator._ip_matches_blocked_range(
                        ip, allow_private_ips=allow_private_ips
                    ):
                        return True
            except (socket.gaierror, OSError, TimeoutError):
                # Fail-open by design: resolution problems are logged and
                # the hostname is treated as not private.
                logger.warning(
                    "DNS resolution failed for hostname {} — "
                    "allowing request (unable to determine if private)",
                    hostname,
                )
            return False

    @staticmethod
    def validate_service_url(
        url: str, allow_private_ips: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a notification service URL for security issues.

        This function prevents SSRF attacks by validating that service URLs
        use safe protocols and don't target private/internal infrastructure.

        Args:
            url: Service URL to validate (e.g., "discord://webhook_id/token")
            allow_private_ips: Whether to allow private IPs (default: False)
                Set to True for development/testing environments

        Returns:
            Tuple of (is_valid, error_message)
            - is_valid: True if URL passes security checks
            - error_message: None if valid, error description if invalid

        Examples:
            >>> validate_service_url("discord://webhook_id/token")
            (True, None)

            >>> validate_service_url("file:///etc/passwd")
            (False, "Blocked unsafe protocol: file")

            >>> validate_service_url("http://localhost:5000/webhook")
            (False, "Blocked private/internal IP address: localhost")
        """
        if not url or not isinstance(url, str):
            return False, "Service URL must be a non-empty string"

        # Strip whitespace (must run before the RFC-illegal char check
        # so legitimate URLs with surrounding whitespace are not rejected).
        url = url.strip()

        # Reject URLs containing characters that drive parser-differential
        # SSRF bypasses (backslash, whitespace, control bytes) — see
        # GHSA-g23j-2vwm-5c25. The URL is omitted from the log line because
        # userinfo (RFC 3986 §3.2.1) may contain credentials and rejected
        # URLs are by definition adversarial-shaped.
        if RFC_FORBIDDEN_URL_CHARS_RE.search(url):
            logger.warning(
                "Blocked notification URL containing RFC-illegal characters"
            )
            return (
                False,
                "URL contains characters that are not allowed (whitespace, backslash, or control bytes)",
            )

        # Parse URL
        try:
            parsed = urlparse(url)
        except Exception as e:
            logger.warning("Failed to parse service URL")
            return False, f"Invalid URL format: {e}"

        # Check for scheme
        if not parsed.scheme:
            return False, "Service URL must have a protocol (e.g., https://)"

        scheme = parsed.scheme.lower()

        # Check for blocked schemes
        if scheme in NotificationURLValidator.BLOCKED_SCHEMES:
            logger.warning(
                f"Blocked unsafe notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
            )
            return False, f"Blocked unsafe protocol: {scheme}"

        # Check for allowed schemes
        if scheme not in NotificationURLValidator.ALLOWED_SCHEMES:
            logger.warning(
                f"Unknown notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
            )
            return (
                False,
                f"Unsupported protocol: {scheme}. "
                f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...",
            )

        # Extract the host for any allowed scheme. We use urllib3 (the
        # parser ``requests`` uses internally) instead of urlparse —
        # urlparse is vulnerable to parser-differential bypasses like
        # ``http://127.0.0.1\@1.1.1.1`` (GHSA-g23j-2vwm-5c25).
        #
        # Per-scheme policy applied below:
        #   - http/https: full ``_is_private_ip`` check, honoring the
        #     operator ``allow_private_ips`` opt-in. RFC1918 / loopback
        #     are allowed through with the flag, but cloud-metadata and
        #     NAT64-wrapped metadata always block.
        #   - Apprise plugin schemes (discord, slack, signal, gotify,
        #     ntfy/ntfys, mattermost, rocketchat, matrix, teams, mailto,
        #     json, xml, form): private-IP reachability is intentionally
        #     allowed (these are typically self-hosted on a LAN), but the
        #     absolute cloud-metadata block still applies. Apprise
        #     translates these to HTTP requests against the URL host
        #     (e.g. ``signal://169.254.169.254/...`` → ``POST
        #     http://169.254.169.254/v2/send``), so without this guard
        #     the plugin schemes would bypass the IMDS protection that
        #     http/https has.
        try:
            u3 = parse_url(url)
        except LocationParseError:
            logger.warning(
                "Blocked notification URL: urllib3 parser rejected it"
            )
            return False, "Invalid URL format (parser rejected)"
        hostname = u3.host
        # Authority must be ASCII printable (forward-defence vs urllib3
        # ever loosening its IDN handling).
        if hostname and any(ord(c) < 0x20 or ord(c) > 0x7E for c in hostname):
            logger.warning(
                "Blocked notification URL with non-ASCII / control bytes in host"
            )
            return False, "URL host contains disallowed characters"
        # Drop the brackets around an IPv6 literal so ``ipaddress`` can
        # parse it.
        if hostname and hostname.startswith("[") and hostname.endswith("]"):
            hostname = hostname[1:-1]
        # Drop trailing dots so a fully-qualified spelling such as
        # "localhost." matches the same checks as "localhost".
        if hostname:
            hostname = hostname.rstrip(".")

        if scheme in ("http", "https"):
            if hostname and NotificationURLValidator._is_private_ip(
                hostname, allow_private_ips=allow_private_ips
            ):
                logger.warning(
                    f"Blocked private/internal IP in notification URL: "
                    f"{hostname}"
                )
                return (
                    False,
                    f"Blocked private/internal IP address: {hostname}",
                )
        else:
            # Plugin-scheme IMDS guard. ``allow_private_ips=True`` leaves
            # ALWAYS_BLOCKED_METADATA_IPS and NAT64-wrapped metadata as
            # the only active blocks in ``_is_private_ip`` — exactly the
            # set we want to enforce regardless of operator flags.
            #
            # NOTE(review): for some plugin schemes the authority may be
            # an identifier/token rather than a network host (cf. the
            # docstring example "discord://webhook_id/token"). That value
            # is still handed to DNS resolution in ``_is_private_ip`` and
            # can appear in its warning log — confirm this is acceptable.
            if hostname and NotificationURLValidator._is_private_ip(
                hostname, allow_private_ips=True
            ):
                logger.warning(
                    f"Blocked cloud-metadata IP in notification URL: {hostname}"
                )
                return (
                    False,
                    f"Blocked cloud-metadata IP address: {hostname}",
                )

        # Passed all security checks
        return True, None

    @staticmethod
    def validate_service_url_strict(
        url: str, allow_private_ips: bool = False
    ) -> bool:
        """
        Strict validation that raises an exception on invalid URLs.

        Args:
            url: Service URL to validate
            allow_private_ips: Whether to allow private IPs (default: False)

        Returns:
            True if valid

        Raises:
            NotificationURLValidationError: If URL fails security validation
        """
        is_valid, error_message = NotificationURLValidator.validate_service_url(
            url, allow_private_ips
        )

        if not is_valid:
            raise NotificationURLValidationError(
                f"Notification service URL validation failed: {error_message}"
            )

        return True

    @staticmethod
    def validate_multiple_urls(
        urls: str, allow_private_ips: bool = False, separator: str = ","
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate multiple separator-delimited service URLs.

        Args:
            urls: Service URLs joined by ``separator``
            allow_private_ips: Whether to allow private IPs (default: False)
            separator: URL separator (default: ","); must be a non-empty
                string

        Returns:
            Tuple of (all_valid, error_message)
            - all_valid: True if all URLs pass validation
            - error_message: None if all valid, first error if any invalid
        """
        if not urls or not isinstance(urls, str):
            return False, "Service URLs must be a non-empty string"

        # ``str.split`` raises ValueError on an empty separator and
        # TypeError on a non-string one; report that through the normal
        # (False, message) channel instead of crashing the caller.
        if not isinstance(separator, str) or not separator:
            return False, "URL separator must be a non-empty string"

        # Split by separator and strip whitespace
        url_list = [url.strip() for url in urls.split(separator) if url.strip()]

        if not url_list:
            return False, "No valid URLs found after parsing"

        # Validate each URL
        for url in url_list:
            is_valid, error_message = (
                NotificationURLValidator.validate_service_url(
                    url, allow_private_ips
                )
            )

            if not is_valid:
                # Return first error found; the URL is redacted because it
                # may embed credentials or tokens.
                return (
                    False,
                    f"Invalid URL '{redact_url_for_log(url)}': {error_message}",
                )

        # All URLs passed validation
        return True, None