Coverage for src / local_deep_research / security / notification_validator.py: 95%

62 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Security validation for notification service URLs. 

3 

4This module provides validation for user-configured notification service URLs 

5to prevent Server-Side Request Forgery (SSRF) attacks and other security issues. 

6""" 

7 

8import ipaddress 

9from typing import Optional, Tuple 

10from urllib.parse import urlparse 

11from loguru import logger 

12 

13 

14class NotificationURLValidationError(ValueError): 

15 """Raised when a notification service URL fails security validation.""" 

16 

17 pass 

18 

19 

20class NotificationURLValidator: 

21 """Validates notification service URLs to prevent SSRF and other attacks.""" 

22 

23 # Dangerous protocols that should never be used for notifications 

24 BLOCKED_SCHEMES = ( 

25 "file", # Local file access 

26 "ftp", # FTP can be abused for SSRF 

27 "ftps", # Secure FTP can be abused for SSRF 

28 "data", # Data URIs can leak sensitive data 

29 "javascript", # XSS/code execution 

30 "vbscript", # XSS/code execution 

31 "about", # Browser internal 

32 "blob", # Browser internal 

33 ) 

34 

35 # Allowed protocols for notification services 

36 ALLOWED_SCHEMES = ( 

37 "http", # Webhook services 

38 "https", # Webhook services (preferred) 

39 "mailto", # Email notifications 

40 "discord", # Discord webhooks 

41 "slack", # Slack webhooks 

42 "telegram", # Telegram bot API 

43 "gotify", # Gotify notifications 

44 "pushover", # Pushover notifications 

45 "ntfy", # ntfy.sh notifications 

46 "matrix", # Matrix protocol 

47 "mattermost", # Mattermost webhooks 

48 "rocketchat", # Rocket.Chat webhooks 

49 "teams", # Microsoft Teams 

50 "json", # Generic JSON webhooks 

51 "xml", # Generic XML webhooks 

52 "form", # Form-encoded webhooks 

53 ) 

54 

55 # Private IP ranges (RFC 1918 + loopback + link-local) 

56 PRIVATE_IP_RANGES = [ 

57 ipaddress.ip_network("127.0.0.0/8"), # Loopback 

58 ipaddress.ip_network("10.0.0.0/8"), # Private 

59 ipaddress.ip_network("172.16.0.0/12"), # Private 

60 ipaddress.ip_network("192.168.0.0/16"), # Private 

61 ipaddress.ip_network("169.254.0.0/16"), # Link-local 

62 ipaddress.ip_network("::1/128"), # IPv6 loopback 

63 ipaddress.ip_network("fc00::/7"), # IPv6 unique local 

64 ipaddress.ip_network("fe80::/10"), # IPv6 link-local 

65 ] 

66 

67 @staticmethod 

68 def _is_private_ip(hostname: str) -> bool: 

69 """ 

70 Check if hostname resolves to a private IP address. 

71 

72 Args: 

73 hostname: Hostname to check 

74 

75 Returns: 

76 True if hostname is a private IP or localhost 

77 """ 

78 # Check for localhost variations 

79 if hostname.lower() in ( 

80 "localhost", 

81 "127.0.0.1", 

82 "::1", 

83 "0.0.0.0", 

84 "::", 

85 ): 

86 return True 

87 

88 # Try to parse as IP address 

89 try: 

90 ip = ipaddress.ip_address(hostname) 

91 return any( 

92 ip in network 

93 for network in NotificationURLValidator.PRIVATE_IP_RANGES 

94 ) 

95 except ValueError: 

96 # Not a valid IP address, might be a hostname 

97 # For security, we don't resolve hostnames to avoid DNS rebinding attacks 

98 # Apprise services should be configured with public endpoints 

99 return False 

100 

101 @staticmethod 

102 def validate_service_url( 

103 url: str, allow_private_ips: bool = False 

104 ) -> Tuple[bool, Optional[str]]: 

105 """ 

106 Validate a notification service URL for security issues. 

107 

108 This function prevents SSRF attacks by validating that service URLs 

109 use safe protocols and don't target private/internal infrastructure. 

110 

111 Args: 

112 url: Service URL to validate (e.g., "discord://webhook_id/token") 

113 allow_private_ips: Whether to allow private IPs (default: False) 

114 Set to True for development/testing environments 

115 

116 Returns: 

117 Tuple of (is_valid, error_message) 

118 - is_valid: True if URL passes security checks 

119 - error_message: None if valid, error description if invalid 

120 

121 Examples: 

122 >>> validate_service_url("discord://webhook_id/token") 

123 (True, None) 

124 

125 >>> validate_service_url("file:///etc/passwd") 

126 (False, "Blocked unsafe protocol: file") 

127 

128 >>> validate_service_url("http://localhost:5000/webhook") 

129 (False, "Blocked private/internal IP address: localhost") 

130 """ 

131 if not url or not isinstance(url, str): 

132 return False, "Service URL must be a non-empty string" 

133 

134 # Strip whitespace 

135 url = url.strip() 

136 

137 # Parse URL 

138 try: 

139 parsed = urlparse(url) 

140 except Exception as e: 

141 logger.warning(f"Failed to parse service URL: {e}") 

142 return False, f"Invalid URL format: {e}" 

143 

144 # Check for scheme 

145 if not parsed.scheme: 

146 return False, "Service URL must have a protocol (e.g., https://)" 

147 

148 scheme = parsed.scheme.lower() 

149 

150 # Check for blocked schemes 

151 if scheme in NotificationURLValidator.BLOCKED_SCHEMES: 

152 logger.warning( 

153 f"Blocked unsafe notification protocol: {scheme} in URL: {url[:50]}..." 

154 ) 

155 return False, f"Blocked unsafe protocol: {scheme}" 

156 

157 # Check for allowed schemes 

158 if scheme not in NotificationURLValidator.ALLOWED_SCHEMES: 

159 logger.warning( 

160 f"Unknown notification protocol: {scheme} in URL: {url[:50]}..." 

161 ) 

162 return ( 

163 False, 

164 f"Unsupported protocol: {scheme}. " 

165 f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...", 

166 ) 

167 

168 # For HTTP/HTTPS, check for private IPs (SSRF prevention) 

169 if scheme in ("http", "https") and not allow_private_ips: 

170 if parsed.hostname: 170 ↛ 182line 170 didn't jump to line 182 because the condition on line 170 was always true

171 if NotificationURLValidator._is_private_ip(parsed.hostname): 

172 logger.warning( 

173 f"Blocked private/internal IP in notification URL: " 

174 f"{parsed.hostname}" 

175 ) 

176 return ( 

177 False, 

178 f"Blocked private/internal IP address: {parsed.hostname}", 

179 ) 

180 

181 # Passed all security checks 

182 return True, None 

183 

184 @staticmethod 

185 def validate_service_url_strict( 

186 url: str, allow_private_ips: bool = False 

187 ) -> bool: 

188 """ 

189 Strict validation that raises an exception on invalid URLs. 

190 

191 Args: 

192 url: Service URL to validate 

193 allow_private_ips: Whether to allow private IPs (default: False) 

194 

195 Returns: 

196 True if valid 

197 

198 Raises: 

199 NotificationURLValidationError: If URL fails security validation 

200 """ 

201 is_valid, error_message = NotificationURLValidator.validate_service_url( 

202 url, allow_private_ips 

203 ) 

204 

205 if not is_valid: 

206 raise NotificationURLValidationError( 

207 f"Notification service URL validation failed: {error_message}" 

208 ) 

209 

210 return True 

211 

212 @staticmethod 

213 def validate_multiple_urls( 

214 urls: str, allow_private_ips: bool = False, separator: str = "," 

215 ) -> Tuple[bool, Optional[str]]: 

216 """ 

217 Validate multiple comma-separated service URLs. 

218 

219 Args: 

220 urls: Comma-separated service URLs 

221 allow_private_ips: Whether to allow private IPs (default: False) 

222 separator: URL separator (default: ",") 

223 

224 Returns: 

225 Tuple of (all_valid, error_message) 

226 - all_valid: True if all URLs pass validation 

227 - error_message: None if all valid, first error if any invalid 

228 """ 

229 if not urls or not isinstance(urls, str): 

230 return False, "Service URLs must be a non-empty string" 

231 

232 # Split by separator and strip whitespace 

233 url_list = [url.strip() for url in urls.split(separator) if url.strip()] 

234 

235 if not url_list: 

236 return False, "No valid URLs found after parsing" 

237 

238 # Validate each URL 

239 for url in url_list: 

240 is_valid, error_message = ( 

241 NotificationURLValidator.validate_service_url( 

242 url, allow_private_ips 

243 ) 

244 ) 

245 

246 if not is_valid: 

247 # Return first error found 

248 return False, f"Invalid URL '{url[:50]}...': {error_message}" 

249 

250 # All URLs passed validation 

251 return True, None