Coverage for src / local_deep_research / security / ssrf_validator.py: 85%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2URL Validator for SSRF Prevention 

3 

4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks 

5by blocking requests to internal/private networks and enforcing safe schemes. 

6""" 

7 

8import ipaddress 

9import os 

10import socket 

11from urllib.parse import urlparse 

12from typing import Optional 

13from loguru import logger 

14 

15from ..settings.env_registry import get_env_setting 

16from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES 

17 

# Cloud instance-metadata address; SSRF attacks commonly aim at it.
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# URL schemes that are accepted; anything else is rejected.
ALLOWED_SCHEMES = {"http", "https"}

24 

25 

# Loopback networks that allow_localhost / allow_private_ips may exempt.
# Built once at import time instead of on every call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# Private/internal networks that allow_private_ips may exempt.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    # RFC1918 private ranges
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
    # Container/virtual network ranges
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT - Podman/rootless containers
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata blocked separately)
    # IPv6 private ranges
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
    ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
)


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    An address is blocked when it falls inside any network listed in
    BLOCKED_IP_RANGES, unless one of the allow flags exempts it.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
            (127.0.0.0/8 and ::1).
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked,
            including when written as an IPv4-mapped IPv6 address
            (::ffff:169.254.169.254).

    Returns:
        True if IP is blocked, False otherwise. A string that cannot be parsed
        as an IP address returns False, so a False result must not be read as
        "safe" for input that is not a valid address.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address
        return False

    # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft.
    # str() of an IPv4-mapped IPv6 address is "::ffff:a9fe:a9fe", which would slip
    # past a plain string comparison, so the embedded IPv4 address is checked too.
    # IPv4Address objects have no ipv4_mapped attribute, hence getattr.
    mapped_v4 = getattr(ip, "ipv4_mapped", None)
    if str(ip) == AWS_METADATA_IP or (
        mapped_v4 is not None and str(mapped_v4) == AWS_METADATA_IP
    ):
        return True

    # Addresses outside every blocked range are never blocked.
    if not any(ip in blocked_range for blocked_range in BLOCKED_IP_RANGES):
        return False

    # The exemptions depend only on the address, not on which blocked range
    # matched, so they are evaluated once here rather than per matching range.
    is_loopback = any(ip in net for net in _LOOPBACK_RANGES)
    if is_loopback and (allow_localhost or allow_private_ips):
        return False
    if allow_private_ips and any(ip in net for net in _PRIVATE_RANGES):
        return False

    return True

99 

100 

def validate_url(
    url: str,
    allow_redirects: bool = True,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether a URL is safe to request with respect to SSRF.

    The URL is rejected unless all of the following hold:
        1. Its scheme is one of ALLOWED_SCHEMES (http/https only)
        2. It has a hostname
        3. A literal-IP hostname is not blocked by is_ip_blocked
        4. Every address the hostname resolves to is not blocked by is_ip_blocked

    Args:
        url: URL to validate
        allow_redirects: Whether to allow redirects (future use; not read here)
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise. Resolution failures and any
        unexpected error also yield False.

    Note:
        SSRF validation can be disabled for testing by setting environment variables:
        - TESTING=true
        - PYTEST_CURRENT_TEST (automatically set by pytest)
        - LDR_SECURITY_SSRF_DISABLE_VALIDATION=true
    """
    # The bypass switches are read on every call (not at import time) so that
    # pytest's PYTEST_CURRENT_TEST is picked up while tests are actually running.
    ssrf_disabled = get_env_setting(
        "security.ssrf.disable_validation", default=False
    )
    in_testing_env = os.environ.get("TESTING", "").lower() in (
        "true",
        "1",
        "yes",
    )
    running_under_pytest = os.environ.get("PYTEST_CURRENT_TEST")
    if in_testing_env or running_under_pytest or ssrf_disabled:
        logger.debug(f"SSRF validation bypassed in test mode for URL: {url}")
        return True

    try:
        parts = urlparse(url)

        # Scheme must be explicitly allowed.
        if parts.scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(
                f"Blocked URL with invalid scheme: {parts.scheme} - {url}"
            )
            return False

        host = parts.hostname
        if not host:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # A hostname written as a literal IP is checked directly. ValueError
        # means it is a name rather than an IP, which the DNS step handles.
        try:
            literal_is_blocked = is_ip_blocked(
                str(ipaddress.ip_address(host)),
                allow_localhost=allow_localhost,
                allow_private_ips=allow_private_ips,
            )
        except ValueError:
            literal_is_blocked = False
        if literal_is_blocked:
            logger.warning(
                f"Blocked URL with internal/private IP: {host} - {url}"
            )
            return False

        # Resolve the hostname and reject if any resulting address is blocked.
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                host, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )

            # entry[4] is the sockaddr tuple; its first item is the IP string.
            offending_ip = next(
                (
                    candidate
                    for candidate in (entry[4][0] for entry in resolved)
                    if is_ip_blocked(
                        candidate,
                        allow_localhost=allow_localhost,
                        allow_private_ips=allow_private_ips,
                    )
                ),
                None,
            )
            if offending_ip is not None:
                logger.warning(
                    f"Blocked URL - hostname {host} resolves to "
                    f"internal/private IP: {offending_ip} - {url}"
                )
                return False

        except socket.gaierror as exc:
            logger.warning(f"Failed to resolve hostname {host}: {exc}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # Nothing objected: the URL passes all checks.
        return True

    except Exception:
        logger.exception(f"Error validating URL {url}")
        return False

215 

216 

def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return the URL when it passes validation, else the fallback value.

    The URL is checked with validate_url using its default flags. An empty
    or missing URL returns the fallback without being validated or logged.

    Args:
        url: URL to validate
        default: Value returned when the URL is empty or judged unsafe

    Returns:
        The URL itself if it is safe, otherwise default
    """
    # Nothing to validate.
    if not url:
        return default

    if not validate_url(url):
        logger.warning(f"Unsafe URL rejected: {url}")
        return default

    return url