Coverage for src/local_deep_research/security/notification_validator.py: 93%

102 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Security validation for notification service URLs. 

3 

4This module provides validation for user-configured notification service URLs 

5to prevent Server-Side Request Forgery (SSRF) attacks and other security issues. 

6""" 

7 

8import ipaddress 

9import socket 

10from concurrent.futures import ThreadPoolExecutor, TimeoutError 

11from typing import Optional, Tuple 

12from urllib.parse import urlparse 

13from loguru import logger 

14from urllib3.exceptions import LocationParseError 

15from urllib3.util import parse_url 

16 

17from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES 

18from .ssrf_validator import RFC_FORBIDDEN_URL_CHARS_RE, redact_url_for_log 

19 

20 

class NotificationURLValidationError(ValueError):
    """Signal that a notification service URL was rejected by the security checks.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    keep working without a dedicated ``except`` clause.
    """

25 

26 

class NotificationURLValidator:
    """Validates notification service URLs to prevent SSRF and other attacks.

    All methods are static; the class is used as a namespace.

    Public entry points:
        - ``validate_service_url``: returns ``(is_valid, error_message)``.
        - ``validate_service_url_strict``: same checks, raises
          ``NotificationURLValidationError`` on failure.
        - ``validate_multiple_urls``: validates a separator-delimited list
          and reports the first failure.
    """

    # Dangerous protocols that should never be used for notifications
    BLOCKED_SCHEMES = (
        "file",  # Local file access
        "ftp",  # FTP can be abused for SSRF
        "ftps",  # Secure FTP can be abused for SSRF
        "data",  # Data URIs can leak sensitive data
        "javascript",  # XSS/code execution
        "vbscript",  # XSS/code execution
        "about",  # Browser internal
        "blob",  # Browser internal
    )

    # Allowed protocols for notification services
    ALLOWED_SCHEMES = (
        "http",  # Webhook services
        "https",  # Webhook services (preferred)
        "mailto",  # Email notifications
        "discord",  # Discord webhooks
        "slack",  # Slack webhooks
        "telegram",  # Telegram bot API
        "gotify",  # Gotify notifications
        "pushover",  # Pushover notifications
        "ntfy",  # ntfy.sh notifications (http)
        "ntfys",  # ntfy.sh notifications (https)
        "signal",  # Signal via signal-api-rest container
        "matrix",  # Matrix protocol
        "mattermost",  # Mattermost webhooks
        "rocketchat",  # Rocket.Chat webhooks
        "teams",  # Microsoft Teams
        "json",  # Generic JSON webhooks
        "xml",  # Generic XML webhooks
        "form",  # Form-encoded webhooks
    )

    # Reuse shared private IP range definitions
    PRIVATE_IP_RANGES = _PRIVATE_IP_RANGES

    @staticmethod
    def _ip_matches_blocked_range(ip, allow_private_ips: bool = False) -> bool:
        """Block-decision for a parsed IP, delegating to
        ``ssrf_validator.is_ip_blocked`` so the two validators share a
        single source of truth.

        Honors:
          - ALWAYS_BLOCKED_METADATA_IPS (cloud metadata, absolute)
          - is_nat64_wrapped_metadata_ip (NAT64-wrapped IMDS, absolute)
          - security.allow_nat64 env carve-out for the two NAT64 prefixes
          - allow_private_ips: when True, RFC1918 / CGNAT / loopback /
            link-local / IPv6 ULA are allowed BUT the two absolute checks
            above still fire. This closes the historical bypass where
            ``allow_private_ips=True`` skipped the host check entirely
            and let metadata IPs through the notification path.

        Args:
            ip: Parsed IP address object (or anything whose ``str()`` is
                an IP literal); it is stringified before delegation.
            allow_private_ips: Forwarded unchanged to ``is_ip_blocked``.

        Returns:
            Whatever ``is_ip_blocked`` returns for the stringified IP.
        """
        from .ssrf_validator import is_ip_blocked

        return is_ip_blocked(str(ip), allow_private_ips=allow_private_ips)

    @staticmethod
    def _is_private_ip(hostname: str, allow_private_ips: bool = False) -> bool:
        """
        Check if hostname resolves to a private IP address.

        Args:
            hostname: Hostname to check
            allow_private_ips: When True, RFC1918 / CGNAT / loopback /
                link-local / IPv6 ULA are NOT considered private. Cloud
                metadata IPs and NAT64-wrapped metadata IPs are blocked
                regardless — the operator opt-in cannot license IMDS
                exposure.

        Returns:
            True if hostname is a private IP or localhost (subject to
            allow_private_ips), or wraps a metadata IP unconditionally.
            False otherwise — including when DNS resolution fails or
            times out (best-effort: the failure is logged and the host
            is allowed).
        """
        # Localhost-string shortcuts only apply when the operator hasn't
        # opted into private-IP reachability. With allow_private_ips=True
        # we let the IP path (DNS-resolved or literal) make the decision
        # so metadata-IP literals like "169.254.169.254" still block.
        if not allow_private_ips and hostname.lower() in (
            "localhost",
            "127.0.0.1",
            "::1",
            "0.0.0.0",
            "::",
        ):
            return True

        # Try to parse as IP address
        try:
            ip = ipaddress.ip_address(hostname)
            return NotificationURLValidator._ip_matches_blocked_range(
                ip, allow_private_ips=allow_private_ips
            )
        except ValueError:
            # Hostname - resolve to IP and check.
            #
            # NOTE: This is a best-effort, validation-time check. Apprise
            # re-resolves the hostname when it actually sends the request
            # (via requests/urllib3), so an attacker controlling DNS can
            # serve a public IP here and a private IP at send time -- a
            # classic DNS rebinding TOCTOU window. Apprise exposes no
            # Session/adapter/DNS hook to close this in code without
            # fragile monkey-patching of its plugin internals.
            #
            # Because the window cannot be closed cleanly in code, the
            # whole outbound-notification path is gated behind an
            # env-only master switch (LDR_NOTIFICATIONS_ALLOW_OUTBOUND,
            # default off); turning it on is the operator's explicit
            # risk-acceptance. See SECURITY.md "Notification Webhook
            # SSRF" for details.
            # Operators wanting to avoid the window entirely should
            # prefer plugin schemes (discord://, slack://, ntfy://, ntfys://,
            # gotify://, telegram://, mattermost://, etc.) that hardcode
            # their endpoints instead of raw http(s):// webhooks.
            #
            # Use concurrent.futures for thread-safe timeout instead of
            # socket.setdefaulttimeout() which is process-global and not
            # thread-safe.
            try:
                executor = ThreadPoolExecutor(max_workers=1)
                try:
                    future = executor.submit(
                        socket.getaddrinfo,
                        hostname,
                        None,
                        socket.AF_UNSPEC,
                        socket.SOCK_STREAM,
                    )
                    # Bound the lookup so a slow resolver cannot stall
                    # validation indefinitely.
                    resolved_ips = future.result(timeout=5)
                finally:
                    # Do not wait for a still-running lookup; just stop
                    # accepting work and drop anything not yet started.
                    executor.shutdown(wait=False, cancel_futures=True)
                # Any single blocked address among the results blocks
                # the whole hostname.
                for _family, _, _, _, sockaddr in resolved_ips:
                    ip = ipaddress.ip_address(sockaddr[0])
                    if NotificationURLValidator._ip_matches_blocked_range(
                        ip, allow_private_ips=allow_private_ips
                    ):
                        return True
            except (socket.gaierror, OSError, TimeoutError):
                logger.warning(
                    "DNS resolution failed for hostname {} — "
                    "allowing request (unable to determine if private)",
                    hostname,
                )
            return False

    @staticmethod
    def validate_service_url(
        url: str, allow_private_ips: bool = False
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a notification service URL for security issues.

        This function prevents SSRF attacks by validating that service URLs
        use safe protocols and don't target private/internal infrastructure.

        Checks, in order: non-empty string; no RFC-illegal characters;
        parseable; scheme present, not blocked, and allowed; host (as
        seen by urllib3) is printable ASCII; then a per-scheme host
        policy. ``http``/``https`` URLs must carry a host — a URL from
        which no host can be extracted is rejected rather than waved
        through with the host checks skipped.

        Args:
            url: Service URL to validate (e.g., "discord://webhook_id/token")
            allow_private_ips: Whether to allow private IPs (default: False)
                              Set to True for development/testing environments

        Returns:
            Tuple of (is_valid, error_message)
            - is_valid: True if URL passes security checks
            - error_message: None if valid, error description if invalid

        Examples:
            >>> validate_service_url("discord://webhook_id/token")
            (True, None)

            >>> validate_service_url("file:///etc/passwd")
            (False, "Blocked unsafe protocol: file")

            >>> validate_service_url("http://localhost:5000/webhook")
            (False, "Blocked private/internal IP address: localhost")
        """
        if not url or not isinstance(url, str):
            return False, "Service URL must be a non-empty string"

        # Strip whitespace (must run before the RFC-illegal char check
        # so legitimate URLs with surrounding whitespace are not rejected).
        url = url.strip()

        # Reject URLs containing characters that drive parser-differential
        # SSRF bypasses (backslash, whitespace, control bytes) — see
        # GHSA-g23j-2vwm-5c25. The URL is omitted from the log line because
        # userinfo (RFC 3986 §3.2.1) may contain credentials and rejected
        # URLs are by definition adversarial-shaped.
        if RFC_FORBIDDEN_URL_CHARS_RE.search(url):
            logger.warning(
                "Blocked notification URL containing RFC-illegal characters"
            )
            return (
                False,
                "URL contains characters that are not allowed (whitespace, backslash, or control bytes)",
            )

        # Parse URL
        try:
            parsed = urlparse(url)
        except Exception as e:
            # Fail closed on any parser error; the URL itself is not logged.
            logger.warning("Failed to parse service URL")
            return False, f"Invalid URL format: {e}"

        # Check for scheme
        if not parsed.scheme:
            return False, "Service URL must have a protocol (e.g., https://)"

        scheme = parsed.scheme.lower()

        # Check for blocked schemes
        if scheme in NotificationURLValidator.BLOCKED_SCHEMES:
            logger.warning(
                f"Blocked unsafe notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
            )
            return False, f"Blocked unsafe protocol: {scheme}"

        # Check for allowed schemes
        if scheme not in NotificationURLValidator.ALLOWED_SCHEMES:
            logger.warning(
                f"Unknown notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
            )
            return (
                False,
                f"Unsupported protocol: {scheme}. "
                f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...",
            )

        # Extract the host for any allowed scheme. We use urllib3 (the
        # parser ``requests`` uses internally) instead of urlparse —
        # urlparse is vulnerable to parser-differential bypasses like
        # ``http://127.0.0.1\@1.1.1.1`` (GHSA-g23j-2vwm-5c25).
        #
        # Per-scheme policy applied below:
        #   - http/https: a host is mandatory, then the full
        #     ``_is_private_ip`` check, honoring the operator
        #     ``allow_private_ips`` opt-in. RFC1918 / loopback
        #     are allowed through with the flag, but cloud-metadata and
        #     NAT64-wrapped metadata always block.
        #   - Apprise plugin schemes (discord, slack, signal, gotify,
        #     ntfy/ntfys, mattermost, rocketchat, matrix, teams, mailto,
        #     json, xml, form): private-IP reachability is intentionally
        #     allowed (these are typically self-hosted on a LAN), but the
        #     absolute cloud-metadata block still applies. Apprise
        #     translates these to HTTP requests against the URL host
        #     (e.g. ``signal://169.254.169.254/...`` → ``POST
        #     http://169.254.169.254/v2/send``), so without this guard
        #     the plugin schemes would bypass the IMDS protection that
        #     http/https has.
        try:
            u3 = parse_url(url)
        except LocationParseError:
            logger.warning(
                "Blocked notification URL: urllib3 parser rejected it"
            )
            return False, "Invalid URL format (parser rejected)"
        hostname = u3.host
        # Authority must be ASCII printable (forward-defence vs urllib3
        # ever loosening its IDN handling).
        if hostname and any(ord(c) < 0x20 or ord(c) > 0x7E for c in hostname):
            logger.warning(
                "Blocked notification URL with non-ASCII / control bytes in host"
            )
            return False, "URL host contains disallowed characters"
        # Unwrap a bracketed IPv6 literal so it can be parsed as an IP.
        if hostname and hostname.startswith("[") and hostname.endswith("]"):
            hostname = hostname[1:-1]
        # Drop trailing dots so "localhost." is treated like "localhost".
        if hostname:
            hostname = hostname.rstrip(".")

        if scheme in ("http", "https"):
            # Fail closed: with no host there is nothing to run the SSRF
            # check against, and another URL parser downstream may still
            # extract a target from the same string.
            if not hostname:
                logger.warning(
                    "Blocked notification URL: http(s) URL has no host"
                )
                return False, "Service URL must include a host"
            if NotificationURLValidator._is_private_ip(
                hostname, allow_private_ips=allow_private_ips
            ):
                logger.warning(
                    f"Blocked private/internal IP in notification URL: "
                    f"{hostname}"
                )
                return (
                    False,
                    f"Blocked private/internal IP address: {hostname}",
                )
        else:
            # Plugin-scheme IMDS guard. ``allow_private_ips=True`` leaves
            # ALWAYS_BLOCKED_METADATA_IPS and NAT64-wrapped metadata as
            # the only active blocks in ``_is_private_ip`` — exactly the
            # set we want to enforce regardless of operator flags.
            # A missing host is tolerated here (e.g. ``mailto:`` URLs).
            if hostname and NotificationURLValidator._is_private_ip(
                hostname, allow_private_ips=True
            ):
                logger.warning(
                    f"Blocked cloud-metadata IP in notification URL: {hostname}"
                )
                return (
                    False,
                    f"Blocked cloud-metadata IP address: {hostname}",
                )

        # Passed all security checks
        return True, None

    @staticmethod
    def validate_service_url_strict(
        url: str, allow_private_ips: bool = False
    ) -> bool:
        """
        Strict validation that raises an exception on invalid URLs.

        Thin wrapper around ``validate_service_url`` that converts a
        failed result into an exception carrying the error message.

        Args:
            url: Service URL to validate
            allow_private_ips: Whether to allow private IPs (default: False)

        Returns:
            True if valid

        Raises:
            NotificationURLValidationError: If URL fails security validation
        """
        is_valid, error_message = NotificationURLValidator.validate_service_url(
            url, allow_private_ips
        )

        if not is_valid:
            raise NotificationURLValidationError(
                f"Notification service URL validation failed: {error_message}"
            )

        return True

    @staticmethod
    def validate_multiple_urls(
        urls: str, allow_private_ips: bool = False, separator: str = ","
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate multiple comma-separated service URLs.

        Empty entries (e.g. from a trailing separator) are ignored.
        Validation stops at the first URL that fails.

        Args:
            urls: Comma-separated service URLs
            allow_private_ips: Whether to allow private IPs (default: False)
            separator: URL separator (default: ",")

        Returns:
            Tuple of (all_valid, error_message)
            - all_valid: True if all URLs pass validation
            - error_message: None if all valid, first error if any invalid
              (the offending URL is redacted before being embedded)
        """
        if not urls or not isinstance(urls, str):
            return False, "Service URLs must be a non-empty string"

        # Split by separator and strip whitespace
        url_list = [url.strip() for url in urls.split(separator) if url.strip()]

        if not url_list:
            return False, "No valid URLs found after parsing"

        # Validate each URL
        for url in url_list:
            is_valid, error_message = (
                NotificationURLValidator.validate_service_url(
                    url, allow_private_ips
                )
            )

            if not is_valid:
                # Return first error found
                return (
                    False,
                    f"Invalid URL '{redact_url_for_log(url)}': {error_message}",
                )

        # All URLs passed validation
        return True, None