Coverage for src / local_deep_research / security / ssrf_validator.py: 77%

79 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2URL Validator for SSRF Prevention 

3 

4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks 

5by blocking requests to internal/private networks and enforcing safe schemes. 

6""" 

7 

8import ipaddress 

9import os 

10import socket 

11from urllib.parse import urlparse 

12from typing import Optional 

13from loguru import logger 

14 

15from ..settings.env_registry import get_env_setting 

16 

17 

# Blocked IP ranges (RFC1918 private networks, localhost, link-local, etc.)
# nosec B104 - These hardcoded IPs are intentional for SSRF prevention (blocking private networks)
BLOCKED_IP_RANGES = [
    ipaddress.ip_network("127.0.0.0/8"),  # Loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("10.0.0.0/8"),  # Private network
    ipaddress.ip_network("172.16.0.0/12"),  # Private network
    ipaddress.ip_network("192.168.0.0/16"),  # Private network
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
    ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    ipaddress.ip_network("0.0.0.0/8"),  # "This" network
    ipaddress.ip_network("100.64.0.0/10"),  # Shared address space
]

# AWS metadata endpoint (commonly targeted in SSRF attacks)
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# Allowed URL schemes
ALLOWED_SCHEMES = {"http", "https"}

# Loopback ranges that can be allowed for trusted internal services.
# Built once at import time instead of on every is_ip_blocked() call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# RFC1918 private network ranges - allowed with allow_private_ips=True
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
)


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    IPv4-mapped IPv6 addresses (e.g. ``::ffff:127.0.0.1``) are unwrapped to
    their embedded IPv4 address before any check, so they are judged exactly
    like the plain IPv4 form and cannot be used to sidestep the IPv4 ranges.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
            192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if IP is blocked, False otherwise. A string that is not a valid
        IP address is reported as not blocked (False).
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address
        return False

    # An IPv6Address never compares as a member of an IPv4 network, so an
    # IPv4-mapped address must be reduced to its IPv4 form first; otherwise
    # "::ffff:169.254.169.254" would pass every check below.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped

    # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft
    if str(ip) == AWS_METADATA_IP:
        return True

    # Addresses outside every blocked range are public and always allowed.
    if not any(ip in blocked_range for blocked_range in BLOCKED_IP_RANGES):
        return False

    is_loopback = any(ip in lr for lr in _LOOPBACK_RANGES)

    # allow_private_ips exempts RFC1918 ranges and loopback.
    if allow_private_ips and (
        is_loopback or any(ip in pr for pr in _PRIVATE_RANGES)
    ):
        return False

    # allow_localhost exempts loopback only.
    if allow_localhost and is_loopback:
        return False

    # Everything else in a blocked range (link-local, unique-local, shared
    # address space, ...) stays blocked regardless of the allow flags.
    return True

100 

101 

def validate_url(
    url: str,
    allow_redirects: bool = True,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether a URL is safe to request from the server (SSRF guard).

    A URL is accepted only when all of the following hold:
        1. Its scheme is one of ALLOWED_SCHEMES (http/https).
        2. It has a hostname, and a literal-IP hostname is not blocked.
        3. Every address the hostname resolves to is not blocked.

    Args:
        url: URL to validate
        allow_redirects: Accepted for future use; not consulted by this function.
        allow_localhost: Permit loopback addresses. Intended for trusted
            internal services such as a self-hosted search engine
            (e.g., searxng). Default False.
        allow_private_ips: Permit RFC1918 private addresses (10.x,
            172.16-31.x, 192.168.x) as well as loopback, for trusted services
            on another machine on the local network.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise. Any unexpected error while
        validating is logged and treated as unsafe (False).

    Note:
        Validation is skipped entirely (returns True) when any of these is set:
            - TESTING=true
            - PYTEST_CURRENT_TEST (automatically set by pytest)
            - LDR_SECURITY_SSRF_DISABLE_VALIDATION=true
    """
    # The environment is read on every call rather than at import time so
    # that pytest's PYTEST_CURRENT_TEST is seen while tests are running.
    bypass_flags = (
        get_env_setting("security.ssrf.disable_validation", default=False),
        os.environ.get("TESTING", "").lower() in ("true", "1", "yes"),
        os.environ.get("PYTEST_CURRENT_TEST"),
    )
    if any(bypass_flags):
        logger.debug(f"SSRF validation bypassed in test mode for URL: {url}")
        return True

    # Keyword arguments forwarded unchanged to every is_ip_blocked() call.
    allow_flags = {
        "allow_localhost": allow_localhost,
        "allow_private_ips": allow_private_ips,
    }

    try:
        parts = urlparse(url)

        # Step 1: scheme allowlist.
        scheme = parts.scheme
        if scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(f"Blocked URL with invalid scheme: {scheme} - {url}")
            return False

        # Step 2: a hostname must be present.
        host = parts.hostname
        if not host:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # Step 2b: when the hostname is itself an IP literal, check it directly.
        try:
            literal_ip = str(ipaddress.ip_address(host))
            if is_ip_blocked(literal_ip, **allow_flags):
                logger.warning(
                    f"Blocked URL with internal/private IP: {host} - {url}"
                )
                return False
        except ValueError:
            # Not an IP literal; the DNS check below covers it.
            pass

        # Step 3: resolve the hostname and reject if any address is blocked.
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                host, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
            for entry in resolved:
                # entry[4] is the socket address; its first item is the IP.
                resolved_ip = entry[4][0]
                if not is_ip_blocked(resolved_ip, **allow_flags):
                    continue
                logger.warning(
                    f"Blocked URL - hostname {host} resolves to "
                    f"internal/private IP: {resolved_ip} - {url}"
                )
                return False
        except socket.gaierror as e:
            logger.warning(f"Failed to resolve hostname {host}: {e}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # Nothing objected: the URL is considered safe.
        return True

    except Exception:
        logger.exception(f"Error validating URL {url}")
        return False

214 

215 

def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return ``url`` when it passes validation, otherwise ``default``.

    Args:
        url: URL to validate; an empty or missing value yields ``default``
            without any validation or logging.
        default: Value to hand back when ``url`` is missing or unsafe

    Returns:
        ``url`` if validate_url() accepts it, ``default`` otherwise
    """
    # Nothing to validate: fall back silently.
    if not url:
        return default

    if not validate_url(url):
        logger.warning(f"Unsafe URL rejected: {url}")
        return default

    return url
236 return default