Coverage for src/local_deep_research/security/ssrf_validator.py: 98%

119 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2URL Validator for SSRF Prevention 

3 

4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks 

5by blocking requests to internal/private networks and enforcing safe schemes. 

6""" 

7 

8import ipaddress 

9import re 

10import socket 

11from urllib.parse import urlparse 

12from typing import Optional 

13from loguru import logger 

14from urllib3.exceptions import LocationParseError 

15from urllib3.util import parse_url 

16 

17from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES 

18from .ip_ranges import NAT64_PREFIXES 

19 

# Cloud-provider metadata endpoints — always blocked, even with
# allow_localhost=True or allow_private_ips=True. These IPs expose IAM /
# instance-role credentials and are never legitimate destinations.
#
# Entries are plain dotted-quad strings because the validators in this
# module test membership with ``str(ip)`` of an already-parsed address;
# keep new entries in that canonical textual form or the lookup will miss.
# nosec B104 - Hardcoded IPs are intentional for SSRF prevention
ALWAYS_BLOCKED_METADATA_IPS = frozenset(
    {
        "169.254.169.254",  # AWS IMDSv1/v2, Azure, OCI, DigitalOcean
        "169.254.170.2",  # AWS ECS task metadata v3
        "169.254.170.23",  # AWS ECS task metadata v4
        "169.254.0.23",  # Tencent Cloud
        "100.100.100.200",  # AlibabaCloud
    }
)

# Allowed URL schemes. ``validate_url`` lowercases the parsed scheme before
# the membership test, so entries here must be lowercase.
ALLOWED_SCHEMES = {"http", "https"}

36 

37 

def is_nat64_wrapped_metadata_ip(ip: ipaddress._BaseAddress) -> bool:
    """Report whether ``ip`` is a NAT64-wrapped cloud-metadata address.

    An address qualifies when it is an IPv6 address that falls inside one
    of ``NAT64_PREFIXES`` and the IPv4 address formed from its low 32 bits
    is listed in ``ALWAYS_BLOCKED_METADATA_IPS``.

    Both ``is_ip_blocked`` and
    ``NotificationURLValidator._ip_matches_blocked_range`` consult this
    before honoring the ``security.allow_nat64`` operator opt-in, so that
    cloud-metadata access cannot be re-opened through an IPv6-wrapped
    destination on a NAT64-equipped host. Keeping the extraction in one
    place prevents the two validators from drifting apart.

    Args:
        ip: A parsed address object. Anything that is not an
            ``ipaddress.IPv6Address`` yields ``False``.

    Returns:
        True if the embedded IPv4 address is a blocked metadata endpoint,
        False otherwise.
    """
    # IPv4 addresses, and IPv6 addresses outside every NAT64 prefix, cannot
    # be NAT64-wrapped.
    if not isinstance(ip, ipaddress.IPv6Address) or not any(
        ip in prefix for prefix in NAT64_PREFIXES
    ):
        return False

    # The embedded IPv4 destination is read from the low 32 bits.
    # NOTE(review): this matches /96-style embedding; confirm it is the
    # intended layout for every prefix in NAT64_PREFIXES.
    low_32_bits = int(ip) & 0xFFFFFFFF
    wrapped_v4 = ipaddress.IPv4Address(low_32_bits)
    return str(wrapped_v4) in ALWAYS_BLOCKED_METADATA_IPS

55 

56 

# RFC 3986 forbids these characters in URLs; their presence in a URL signals
# a parser-differential attempt (GHSA-g23j-2vwm-5c25). \s covers space, \t,
# \n, \r, \v, \f — and, because this is a str (not bytes) pattern, other
# Unicode whitespace as well. \x00-\x1f and \x7f add the remaining ASCII
# control characters (NUL through US, plus DEL). Backslash is the
# load-bearing payload — Python's urlparse
# treats it as a literal char while requests/urllib3 treat it as a path
# delimiter, so a crafted URL like ``http://127.0.0.1\@1.1.1.1`` would
# pass the urlparse-based hostname check but actually connect to 127.0.0.1.
#
# ``validate_url`` applies this pattern with ``search`` after ``str.strip()``,
# so leading/trailing whitespace is tolerated while any interior occurrence
# rejects the URL.
RFC_FORBIDDEN_URL_CHARS_RE = re.compile(r"[\\\s\x00-\x1f\x7f]")

64 

65 

def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Decide whether an IP address must be refused as a request destination.

    Evaluation order:
        1. Parse ``ip_str``; IPv4-mapped IPv6 addresses are unwrapped to
           their IPv4 form first.
        2. Cloud-metadata endpoints (direct or NAT64-wrapped) are blocked
           unconditionally.
        3. Otherwise the address is blocked if it falls in any of
           ``BLOCKED_IP_RANGES``, unless a carve-out applies (the NAT64
           operator opt-in, ``allow_private_ips``, or ``allow_localhost``).

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Takes precedence over ``allow_localhost`` when both are set.
            Note: cloud metadata endpoints in ``ALWAYS_BLOCKED_METADATA_IPS``
            (AWS / Azure / OCI / DigitalOcean / AlibabaCloud / Tencent / ECS)
            are ALWAYS blocked regardless of these flags.

    Returns:
        True if IP is blocked, False otherwise. A string that cannot be
        parsed as an IP address yields False (it is not treated as blocked).
    """
    # Loopback ranges that can be allowed for trusted internal services
    # nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
    loopback_nets = [
        ipaddress.ip_network(cidr)
        for cidr in (
            "127.0.0.0/8",  # IPv4 loopback
            "::1/128",  # IPv6 loopback
        )
    ]

    # Private/internal network ranges - allowed with allow_private_ips=True
    # nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
    private_nets = [
        ipaddress.ip_network(cidr)
        for cidr in (
            # RFC1918 Private Ranges
            "10.0.0.0/8",  # Class A private
            "172.16.0.0/12",  # Class B private
            "192.168.0.0/16",  # Class C private
            # Container/Virtual Network Ranges
            "100.64.0.0/10",  # CGNAT - used by Podman/rootless containers
            # Link-local (cloud metadata IPs blocked separately via
            # ALWAYS_BLOCKED_METADATA_IPS)
            "169.254.0.0/16",
            # IPv6 Private Ranges
            "fc00::/7",  # IPv6 Unique Local Addresses
            "fe80::/10",  # IPv6 Link-Local
        )
    ]

    try:
        addr = ipaddress.ip_address(ip_str)

        # Unwrap IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1 → 127.0.0.1)
        # These bypass IPv4 range checks if not converted.
        if isinstance(addr, ipaddress.IPv6Address):
            mapped_v4 = addr.ipv4_mapped
            if mapped_v4:
                addr = mapped_v4

        # ALWAYS block cloud-metadata endpoints - critical SSRF target for
        # credential theft - whether addressed directly or through a NAT64
        # wrap. Even with LDR_SECURITY_ALLOW_NAT64=true the metadata block
        # is "always": an opt-in for IPv4 reachability does NOT license
        # IMDS exposure.
        if str(
            addr
        ) in ALWAYS_BLOCKED_METADATA_IPS or is_nat64_wrapped_metadata_ip(addr):
            return True

        # Operator escape hatch for IPv6-only deployments using DNS64+NAT64.
        # Read lazily (not at import) so test monkeypatching works and so the
        # value is not cached across env mutations. Metadata IPs were already
        # rejected above, so this carve-out cannot reopen IMDS.
        from ..settings.env_registry import get_env_setting

        honor_nat64 = bool(get_env_setting("security.allow_nat64", False))

        # Whether the caller's flags exempt this address depends only on the
        # address itself, so work it out once rather than per blocked range.
        in_loopback = any(addr in net for net in loopback_nets)
        if allow_private_ips:
            exempt = in_loopback or any(addr in net for net in private_nets)
        elif allow_localhost:
            exempt = in_loopback
        else:
            exempt = False

        for candidate in BLOCKED_IP_RANGES:
            if addr not in candidate:
                continue
            # NAT64 carve-out: when the operator has opted in, the NAT64
            # prefixes don't block. 6to4 / Teredo / discard remain blocked.
            if honor_nat64 and candidate in NAT64_PREFIXES:
                continue
            if exempt:
                continue
            return True

        return False

    except ValueError:
        # Invalid IP address
        return False

171 

172 

def validate_url(
    url: str,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Validate URL to prevent SSRF attacks.

    Checks, in order:
        1. The value is a string with no RFC-illegal characters
           (backslash, whitespace, control bytes) after stripping.
        2. URL scheme is allowed (http/https only).
        3. urllib3 can parse the URL and yields a non-empty, printable-ASCII
           host.
        4. If the host is an IP literal, it is not a blocked address.
        5. Every address the host resolves to is not a blocked address.

    Any unexpected error during validation is logged and treated as
    "unsafe".

    Args:
        url: URL to validate
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: cloud metadata endpoints in ``ALWAYS_BLOCKED_METADATA_IPS``
            (AWS / Azure / OCI / DigitalOcean / AlibabaCloud / Tencent / ECS)
            are ALWAYS blocked regardless of these flags.

    Returns:
        True if URL is safe, False otherwise
    """
    if not isinstance(url, str):
        return False
    try:
        url = url.strip()

        # Layer 1: reject RFC-illegal characters that drive parser-differential
        # attacks (backslash, whitespace, control bytes). The URL is omitted
        # from this log line because userinfo (RFC 3986 §3.2.1) may contain
        # credentials and rejected URLs are by definition adversarial-shaped.
        if RFC_FORBIDDEN_URL_CHARS_RE.search(url) is not None:
            logger.warning("Blocked URL containing RFC-illegal characters")
            return False

        # Scheme allowlist (compared case-insensitively).
        parsed = urlparse(url)
        if parsed.scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(
                f"Blocked URL with invalid scheme: {parsed.scheme} - {redact_url_for_log(url)}"
            )
            return False

        # Layer 2: extract host using urllib3, the same parser ``requests``
        # uses internally. ``urlparse`` and urllib3 disagree on URLs like
        # ``http://127.0.0.1\@1.1.1.1`` — urlparse says ``1.1.1.1``,
        # urllib3 says ``127.0.0.1``. Validating against urllib3 means the
        # validator and the HTTP client cannot disagree on destination.
        try:
            u3 = parse_url(url)
        except LocationParseError:
            logger.warning("Blocked URL: urllib3 parser rejected it")
            return False

        hostname = u3.host
        if hostname:
            # Authority must be ASCII printable. urllib3 currently rejects
            # non-ASCII via LocationParseError, but this guard keeps us
            # independent of that staying constant — CVE-2019-9636 showed
            # Python's stdlib loosened a similar restriction previously.
            # Brackets/colon used in IPv6 hosts are within 0x20-0x7e, so
            # this runs cleanly before bracket-strip.
            if not all(0x20 <= ord(ch) <= 0x7E for ch in hostname):
                logger.warning(
                    "Blocked URL with non-ASCII / control bytes in host"
                )
                return False
            # Strip IPv6 brackets so ipaddress.ip_address can parse the host.
            if hostname.startswith("[") and hostname.endswith("]"):
                hostname = hostname[1:-1]
            # rstrip(".") matches getaddrinfo behaviour — trailing dots are
            # ignored at resolution time.
            hostname = hostname.rstrip(".")

        if not hostname:
            logger.warning(
                f"Blocked URL with no hostname: {redact_url_for_log(url)}"
            )
            return False

        # IP-literal hosts are checked directly; a ValueError simply means
        # the host is a name and must be resolved below.
        try:
            literal_ip = ipaddress.ip_address(hostname)
            if is_ip_blocked(
                str(literal_ip),
                allow_localhost=allow_localhost,
                allow_private_ips=allow_private_ips,
            ):
                logger.warning(
                    f"Blocked URL with internal/private IP: {hostname} - {redact_url_for_log(url)}"
                )
                return False
        except ValueError:
            pass

        # Resolve hostname to IP and check.
        #
        # NOTE: This is a best-effort, validation-time check. The caller
        # (typically safe_requests) hands the URL to requests/urllib3
        # afterwards, which resolves the hostname AGAIN at connect time --
        # a DNS rebinding TOCTOU window. Closing it would require pinning
        # the resolved IP into the outbound connection (HTTPAdapter shim
        # with server_hostname for SNI), which is HTTPS-only and doesn't
        # follow redirects cleanly. See SECURITY.md "Notification Webhook
        # SSRF" subsection for the accepted-risk rationale (the same
        # caveat applies here).
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )

            # A single blocked answer is enough to reject the URL.
            for info in resolved:
                ip_str = str(info[4][0])  # sockaddr's first field is the IP
                if not is_ip_blocked(
                    ip_str,
                    allow_localhost=allow_localhost,
                    allow_private_ips=allow_private_ips,
                ):
                    continue
                logger.warning(
                    f"Blocked URL - hostname {hostname} resolves to "
                    f"internal/private IP: {ip_str} - {redact_url_for_log(url)}"
                )
                return False

        except socket.gaierror:
            logger.warning(f"Failed to resolve hostname {hostname}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # URL passes all checks
        return True

    except Exception:
        logger.exception(f"Error validating URL {redact_url_for_log(url)}")
        return False

320 

321 

def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return ``url`` when it passes ``validate_url``, else ``default``.

    Validation uses the strict defaults of ``validate_url`` (neither
    localhost nor private IPs are allowed). A rejected, non-empty URL is
    logged in redacted form.

    Args:
        url: URL to validate. ``None`` or an empty value short-circuits to
            ``default`` without validation or logging.
        default: Default value if URL is unsafe

    Returns:
        URL if safe, default otherwise
    """
    if not url:
        return default

    if not validate_url(url):
        logger.warning(f"Unsafe URL rejected: {redact_url_for_log(url)}")
        return default

    return url

343 

344 

def redact_url_for_log(url: str) -> str:
    """Reduce a URL to ``scheme://host[:port]`` for safe logging.

    For log output only. Userinfo, path, query and fragment are all
    dropped to minimise the chance of leaking credentials, tokens, or
    sensitive paths into logs, while still giving operators enough to
    distinguish ``http://10.0.0.1:80`` from ``https://10.0.0.1:443``.

    RFC 3986 §3.2.1 allows credentials in URL userinfo
    (``http://user:pass@host/``). A rejected URL is by definition
    adversarial-shaped, but it may still carry the operator's real
    credentials if a misconfiguration produced it.

    Args:
        url: The URL to redact.

    Returns:
        The redacted form. A missing scheme is rendered as ``?``, a
        missing host as ``<no-host>``, and the port is appended only when
        one is present. If the URL cannot be parsed, ``<unparseable>``.
    """
    try:
        u = parse_url(url)
    except (LocationParseError, ValueError):
        return "<unparseable>"

    scheme = u.scheme if u.scheme else "?"
    host = u.host if u.host else "<no-host>"
    if u.port:
        host_port = f"{host}:{u.port}"
    else:
        host_port = host
    return f"{scheme}://{host_port}"