Coverage for src / local_deep_research / security / notification_validator.py: 96%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Security validation for notification service URLs. 

3 

4This module provides validation for user-configured notification service URLs 

5to prevent Server-Side Request Forgery (SSRF) attacks and other security issues. 

6""" 

7 

8import ipaddress 

9from typing import Optional, Tuple 

10from urllib.parse import urlparse 

11from loguru import logger 

12 

13from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES 

14 

15 

16class NotificationURLValidationError(ValueError): 

17 """Raised when a notification service URL fails security validation.""" 

18 

19 pass 

20 

21 

22class NotificationURLValidator: 

23 """Validates notification service URLs to prevent SSRF and other attacks.""" 

24 

25 # Dangerous protocols that should never be used for notifications 

26 BLOCKED_SCHEMES = ( 

27 "file", # Local file access 

28 "ftp", # FTP can be abused for SSRF 

29 "ftps", # Secure FTP can be abused for SSRF 

30 "data", # Data URIs can leak sensitive data 

31 "javascript", # XSS/code execution 

32 "vbscript", # XSS/code execution 

33 "about", # Browser internal 

34 "blob", # Browser internal 

35 ) 

36 

37 # Allowed protocols for notification services 

38 ALLOWED_SCHEMES = ( 

39 "http", # Webhook services 

40 "https", # Webhook services (preferred) 

41 "mailto", # Email notifications 

42 "discord", # Discord webhooks 

43 "slack", # Slack webhooks 

44 "telegram", # Telegram bot API 

45 "gotify", # Gotify notifications 

46 "pushover", # Pushover notifications 

47 "ntfy", # ntfy.sh notifications 

48 "matrix", # Matrix protocol 

49 "mattermost", # Mattermost webhooks 

50 "rocketchat", # Rocket.Chat webhooks 

51 "teams", # Microsoft Teams 

52 "json", # Generic JSON webhooks 

53 "xml", # Generic XML webhooks 

54 "form", # Form-encoded webhooks 

55 ) 

56 

57 # Reuse shared private IP range definitions 

58 PRIVATE_IP_RANGES = _PRIVATE_IP_RANGES 

59 

60 @staticmethod 

61 def _is_private_ip(hostname: str) -> bool: 

62 """ 

63 Check if hostname resolves to a private IP address. 

64 

65 Args: 

66 hostname: Hostname to check 

67 

68 Returns: 

69 True if hostname is a private IP or localhost 

70 """ 

71 # Check for localhost variations 

72 if hostname.lower() in ( 

73 "localhost", 

74 "127.0.0.1", 

75 "::1", 

76 "0.0.0.0", 

77 "::", 

78 ): 

79 return True 

80 

81 # Try to parse as IP address 

82 try: 

83 ip = ipaddress.ip_address(hostname) 

84 return any( 

85 ip in network 

86 for network in NotificationURLValidator.PRIVATE_IP_RANGES 

87 ) 

88 except ValueError: 

89 # Not a valid IP address, might be a hostname 

90 # For security, we don't resolve hostnames to avoid DNS rebinding attacks 

91 # Apprise services should be configured with public endpoints 

92 return False 

93 

94 @staticmethod 

95 def validate_service_url( 

96 url: str, allow_private_ips: bool = False 

97 ) -> Tuple[bool, Optional[str]]: 

98 """ 

99 Validate a notification service URL for security issues. 

100 

101 This function prevents SSRF attacks by validating that service URLs 

102 use safe protocols and don't target private/internal infrastructure. 

103 

104 Args: 

105 url: Service URL to validate (e.g., "discord://webhook_id/token") 

106 allow_private_ips: Whether to allow private IPs (default: False) 

107 Set to True for development/testing environments 

108 

109 Returns: 

110 Tuple of (is_valid, error_message) 

111 - is_valid: True if URL passes security checks 

112 - error_message: None if valid, error description if invalid 

113 

114 Examples: 

115 >>> validate_service_url("discord://webhook_id/token") 

116 (True, None) 

117 

118 >>> validate_service_url("file:///etc/passwd") 

119 (False, "Blocked unsafe protocol: file") 

120 

121 >>> validate_service_url("http://localhost:5000/webhook") 

122 (False, "Blocked private/internal IP address: localhost") 

123 """ 

124 if not url or not isinstance(url, str): 

125 return False, "Service URL must be a non-empty string" 

126 

127 # Strip whitespace 

128 url = url.strip() 

129 

130 # Parse URL 

131 try: 

132 parsed = urlparse(url) 

133 except Exception as e: 

134 logger.warning(f"Failed to parse service URL: {e}") 

135 return False, f"Invalid URL format: {e}" 

136 

137 # Check for scheme 

138 if not parsed.scheme: 

139 return False, "Service URL must have a protocol (e.g., https://)" 

140 

141 scheme = parsed.scheme.lower() 

142 

143 # Check for blocked schemes 

144 if scheme in NotificationURLValidator.BLOCKED_SCHEMES: 

145 logger.warning( 

146 f"Blocked unsafe notification protocol: {scheme} in URL: {url[:50]}..." 

147 ) 

148 return False, f"Blocked unsafe protocol: {scheme}" 

149 

150 # Check for allowed schemes 

151 if scheme not in NotificationURLValidator.ALLOWED_SCHEMES: 

152 logger.warning( 

153 f"Unknown notification protocol: {scheme} in URL: {url[:50]}..." 

154 ) 

155 return ( 

156 False, 

157 f"Unsupported protocol: {scheme}. " 

158 f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...", 

159 ) 

160 

161 # For HTTP/HTTPS, check for private IPs (SSRF prevention) 

162 if scheme in ("http", "https") and not allow_private_ips: 

163 if parsed.hostname: 163 ↛ 175line 163 didn't jump to line 175 because the condition on line 163 was always true

164 if NotificationURLValidator._is_private_ip(parsed.hostname): 

165 logger.warning( 

166 f"Blocked private/internal IP in notification URL: " 

167 f"{parsed.hostname}" 

168 ) 

169 return ( 

170 False, 

171 f"Blocked private/internal IP address: {parsed.hostname}", 

172 ) 

173 

174 # Passed all security checks 

175 return True, None 

176 

177 @staticmethod 

178 def validate_service_url_strict( 

179 url: str, allow_private_ips: bool = False 

180 ) -> bool: 

181 """ 

182 Strict validation that raises an exception on invalid URLs. 

183 

184 Args: 

185 url: Service URL to validate 

186 allow_private_ips: Whether to allow private IPs (default: False) 

187 

188 Returns: 

189 True if valid 

190 

191 Raises: 

192 NotificationURLValidationError: If URL fails security validation 

193 """ 

194 is_valid, error_message = NotificationURLValidator.validate_service_url( 

195 url, allow_private_ips 

196 ) 

197 

198 if not is_valid: 

199 raise NotificationURLValidationError( 

200 f"Notification service URL validation failed: {error_message}" 

201 ) 

202 

203 return True 

204 

205 @staticmethod 

206 def validate_multiple_urls( 

207 urls: str, allow_private_ips: bool = False, separator: str = "," 

208 ) -> Tuple[bool, Optional[str]]: 

209 """ 

210 Validate multiple comma-separated service URLs. 

211 

212 Args: 

213 urls: Comma-separated service URLs 

214 allow_private_ips: Whether to allow private IPs (default: False) 

215 separator: URL separator (default: ",") 

216 

217 Returns: 

218 Tuple of (all_valid, error_message) 

219 - all_valid: True if all URLs pass validation 

220 - error_message: None if all valid, first error if any invalid 

221 """ 

222 if not urls or not isinstance(urls, str): 

223 return False, "Service URLs must be a non-empty string" 

224 

225 # Split by separator and strip whitespace 

226 url_list = [url.strip() for url in urls.split(separator) if url.strip()] 

227 

228 if not url_list: 

229 return False, "No valid URLs found after parsing" 

230 

231 # Validate each URL 

232 for url in url_list: 

233 is_valid, error_message = ( 

234 NotificationURLValidator.validate_service_url( 

235 url, allow_private_ips 

236 ) 

237 ) 

238 

239 if not is_valid: 

240 # Return first error found 

241 return False, f"Invalid URL '{url[:50]}...': {error_message}" 

242 

243 # All URLs passed validation 

244 return True, None