Coverage for src/local_deep_research/security/ssrf_validator.py: 98%
119 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2URL Validator for SSRF Prevention
4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks
5by blocking requests to internal/private networks and enforcing safe schemes.
6"""
8import ipaddress
9import re
10import socket
11from urllib.parse import urlparse
12from typing import Optional
13from loguru import logger
14from urllib3.exceptions import LocationParseError
15from urllib3.util import parse_url
17from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
18from .ip_ranges import NAT64_PREFIXES
# Cloud-provider metadata endpoints — always blocked, even with
# allow_localhost=True or allow_private_ips=True. These IPs expose IAM /
# instance-role credentials and are never legitimate destinations.
#
# Membership is tested with ``str(<ipaddress object>)``, so every entry must
# be written in the canonical text form ``ipaddress`` produces (dotted quad
# for IPv4, compressed lowercase for IPv6).
# nosec B104 - Hardcoded IPs are intentional for SSRF prevention
ALWAYS_BLOCKED_METADATA_IPS = frozenset(
    {
        "169.254.169.254",  # AWS IMDSv1/v2, Azure, OCI, DigitalOcean
        "169.254.170.2",  # AWS ECS task metadata v3
        "169.254.170.23",  # AWS ECS task metadata v4
        "169.254.0.23",  # Tencent Cloud
        "100.100.100.200",  # AlibabaCloud
        # AWS IMDS over IPv6. This address lies inside fc00::/7 (unique
        # local), which allow_private_ips=True exempts, so without an
        # explicit entry that flag would expose instance credentials.
        "fd00:ec2::254",
    }
)

# Allowed URL schemes
ALLOWED_SCHEMES = {"http", "https"}
def is_nat64_wrapped_metadata_ip(ip: ipaddress._BaseAddress) -> bool:
    """Report whether ``ip`` is a NAT64-wrapped cloud-metadata address.

    The answer is True only when ``ip`` is an ``IPv6Address`` that falls
    inside one of ``NAT64_PREFIXES`` and whose low 32 bits, read as an IPv4
    address, appear in ``ALWAYS_BLOCKED_METADATA_IPS``. Anything else —
    including IPv4 addresses and non-address objects — yields False.

    ``is_ip_blocked`` calls this before it looks at the
    ``security.allow_nat64`` opt-in, so that opt-in cannot re-open metadata
    access through an IPv6-wrapped destination. Keeping the extraction in a
    single helper lets other validators share exactly the same rule.

    NOTE(review): taking the low 32 bits matches the RFC 6052 layout for /96
    prefixes; shorter NAT64 prefixes embed the IPv4 address at a different
    offset. Confirm against the actual contents of ``NAT64_PREFIXES``.
    """
    if isinstance(ip, ipaddress.IPv6Address) and any(
        ip in prefix for prefix in NAT64_PREFIXES
    ):
        low_32_bits = int(ip) & 0xFFFFFFFF
        wrapped = ipaddress.IPv4Address(low_32_bits)
        return str(wrapped) in ALWAYS_BLOCKED_METADATA_IPS
    return False
# Characters RFC 3986 never permits in a URL. Seeing one is treated as a
# parser-differential attempt (GHSA-g23j-2vwm-5c25) rather than a typo.
#
#   \\          backslash — the load-bearing payload. Python's urlparse keeps
#               it as a literal character, while requests/urllib3 treat it as
#               a path delimiter, so ``http://127.0.0.1\@1.1.1.1`` would pass
#               a urlparse-based hostname check yet connect to 127.0.0.1.
#   \s          space, \t, \n, \r, \v, \f (and, because the pattern is a str
#               pattern, any other Unicode whitespace).
#   \x00-\x1f   C0 control characters.
#   \x7f        DEL.
RFC_FORBIDDEN_URL_CHARS_RE = re.compile(r"[\\\s\x00-\x1f\x7f]")
# Loopback ranges that can be allowed for trusted internal services.
# Built once at import time instead of on every is_ip_blocked() call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_NETWORKS = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# Private/internal network ranges - allowed with allow_private_ips=True.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_NETWORKS = (
    # RFC1918 Private Ranges
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
    # Container/Virtual Network Ranges
    ipaddress.ip_network(
        "100.64.0.0/10"
    ),  # CGNAT - used by Podman/rootless containers
    ipaddress.ip_network(
        "169.254.0.0/16"
    ),  # Link-local (cloud metadata IPs blocked separately via ALWAYS_BLOCKED_METADATA_IPS)
    # IPv6 Private Ranges
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
    ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
)


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    Evaluation order:

    1. An IPv4-mapped IPv6 address is unwrapped to its IPv4 form.
    2. Anything in ``ALWAYS_BLOCKED_METADATA_IPS``, directly or wrapped in a
       NAT64 prefix, is blocked regardless of every flag and setting.
    3. Loopback / private addresses are let through when the matching
       ``allow_*`` flag is set.
    4. Otherwise the address is blocked if it falls in any of
       ``BLOCKED_IP_RANGES``, except that NAT64 prefixes are skipped when
       the ``security.allow_nat64`` setting is truthy.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: cloud metadata endpoints in ``ALWAYS_BLOCKED_METADATA_IPS``
            are ALWAYS blocked regardless of these flags.

    Returns:
        True if IP is blocked, False otherwise. A string that does not parse
        as an IP address is reported as NOT blocked (False); callers that
        need fail-closed behaviour must reject unparseable input themselves.
    """
    # The whole body stays inside the try so that any ValueError — from
    # parsing or from anything called below — keeps mapping to False.
    try:
        ip = ipaddress.ip_address(ip_str)

        # Unwrap IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1 → 127.0.0.1)
        # These bypass IPv4 range checks if not converted.
        if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped

        # ALWAYS block cloud-metadata endpoints - critical SSRF target
        # for credential theft. These are never legitimate destinations
        # regardless of allow_localhost / allow_private_ips.
        if str(ip) in ALWAYS_BLOCKED_METADATA_IPS:
            return True

        # Also block metadata IPs reached via NAT64 wrap. NAT64 prefixes
        # embed the IPv4 destination in the low 32 bits; even when the
        # operator has set LDR_SECURITY_ALLOW_NAT64=true the metadata
        # block is "always" — an opt-in for IPv4 reachability does NOT
        # license IMDS exposure.
        if is_nat64_wrapped_metadata_ip(ip):
            return True

        # Operator escape hatch for IPv6-only deployments using DNS64+NAT64.
        # Read lazily (not at import) so test monkeypatching works and so the
        # value is not cached across env mutations. Cloud-metadata IPs are
        # ALWAYS blocked above, so this carve-out cannot reopen IMDS via
        # the IPv6-wrapped form.
        from ..settings.env_registry import get_env_setting

        nat64_allowed = bool(get_env_setting("security.allow_nat64", False))

        # Whether the caller's flags exempt this address depends only on the
        # address itself, so decide it once instead of per matching range.
        # allow_private_ips covers loopback too; allow_localhost covers
        # loopback only.
        if allow_private_ips:
            exempt = any(ip in net for net in _LOOPBACK_NETWORKS) or any(
                ip in net for net in _PRIVATE_NETWORKS
            )
        elif allow_localhost:
            exempt = any(ip in net for net in _LOOPBACK_NETWORKS)
        else:
            exempt = False
        if exempt:
            return False

        for blocked_range in BLOCKED_IP_RANGES:
            if ip not in blocked_range:
                continue
            # NAT64 carve-out: when the operator has opted in, the NAT64
            # prefixes don't block. 6to4 / Teredo / discard remain blocked
            # unconditionally.
            if nat64_allowed and blocked_range in NAT64_PREFIXES:
                continue
            return True

        return False

    except ValueError:
        # Invalid IP address
        return False
def validate_url(
    url: str,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether ``url`` may be fetched without enabling SSRF.

    The URL is accepted only if all of the following hold:

    1. It is a ``str`` and, after stripping surrounding whitespace, contains
       none of the characters matched by ``RFC_FORBIDDEN_URL_CHARS_RE``.
    2. Its scheme is one of ``ALLOWED_SCHEMES``.
    3. urllib3's ``parse_url`` accepts it and yields a non-empty,
       printable-ASCII host.
    4. If the host is an IP literal, ``is_ip_blocked`` does not block it.
    5. Every address ``socket.getaddrinfo`` returns for the host passes
       ``is_ip_blocked`` as well (this step also runs for IP literals).

    Args:
        url: URL to validate.
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus
            localhost (RFC1918, CGNAT 100.64.x.x, link-local 169.254.x.x,
            fc00::/7, fe80::/10). Use for trusted self-hosted services like
            SearXNG or Ollama in containerized environments. Cloud metadata
            endpoints in ``ALWAYS_BLOCKED_METADATA_IPS`` stay blocked
            regardless of either flag.

    Returns:
        True if URL is safe, False otherwise. Every failure — including
        resolution errors and unexpected exceptions — is logged and
        reported as False; the function does not raise.
    """
    if not isinstance(url, str):
        return False

    # Literal-IP and resolved-IP checks apply exactly the same policy.
    policy = {
        "allow_localhost": allow_localhost,
        "allow_private_ips": allow_private_ips,
    }

    try:
        url = url.strip()

        # Layer 1: refuse RFC-illegal characters (backslash, whitespace,
        # control bytes) that drive parser-differential attacks. The URL is
        # deliberately left out of this log line: userinfo (RFC 3986 §3.2.1)
        # may carry credentials, and a rejected URL is adversarial-shaped.
        if RFC_FORBIDDEN_URL_CHARS_RE.search(url) is not None:
            logger.warning("Blocked URL containing RFC-illegal characters")
            return False

        # Scheme allow-list.
        parsed = urlparse(url)
        if parsed.scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(
                f"Blocked URL with invalid scheme: {parsed.scheme} - {redact_url_for_log(url)}"
            )
            return False

        # Layer 2: take the host from urllib3, the parser ``requests`` uses
        # internally. urlparse and urllib3 disagree on inputs such as
        # ``http://127.0.0.1\@1.1.1.1`` (urlparse: ``1.1.1.1``; urllib3:
        # ``127.0.0.1``), so validating urllib3's answer keeps the validator
        # and the HTTP client agreed on the destination.
        try:
            target = parse_url(url)
        except LocationParseError:
            logger.warning("Blocked URL: urllib3 parser rejected it")
            return False

        hostname = target.host
        if hostname:
            # Authority must be printable ASCII. urllib3 currently rejects
            # non-ASCII itself, but this guard does not rely on that staying
            # true (CVE-2019-9636 showed the stdlib loosening a similar
            # rule). IPv6 brackets and colons are within 0x20-0x7e, so the
            # check is safe to run before the brackets are stripped.
            if not all(0x20 <= ord(ch) <= 0x7E for ch in hostname):
                logger.warning("Blocked URL with non-ASCII / control bytes in host")
                return False
            # Drop IPv6 brackets so ipaddress.ip_address can parse the host.
            if hostname.startswith("[") and hostname.endswith("]"):
                hostname = hostname[1:-1]
            # Trailing dots are ignored at resolution time, so ignore them
            # here too (matches getaddrinfo behaviour).
            hostname = hostname.rstrip(".")

        if not hostname:
            logger.warning(
                f"Blocked URL with no hostname: {redact_url_for_log(url)}"
            )
            return False

        # IP-literal host: check it directly.
        try:
            literal = ipaddress.ip_address(hostname)
            if is_ip_blocked(str(literal), **policy):
                logger.warning(
                    f"Blocked URL with internal/private IP: {hostname} - {redact_url_for_log(url)}"
                )
                return False
        except ValueError:
            # Not an IP literal; it is a name and is resolved below.
            pass

        # Resolve the host and check every address it maps to.
        #
        # NOTE: This is a best-effort, validation-time check. The caller
        # (typically safe_requests) hands the URL to requests/urllib3
        # afterwards, which resolves the hostname AGAIN at connect time --
        # a DNS rebinding TOCTOU window. Closing it would require pinning
        # the resolved IP into the outbound connection (HTTPAdapter shim
        # with server_hostname for SNI), which is HTTPS-only and doesn't
        # follow redirects cleanly. See SECURITY.md "Notification Webhook
        # SSRF" subsection for the accepted-risk rationale (the same
        # caveat applies here).
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
            for entry in resolved:
                # entry[4] is the sockaddr tuple; its first item is the IP.
                ip_str = str(entry[4][0])
                if is_ip_blocked(ip_str, **policy):
                    logger.warning(
                        f"Blocked URL - hostname {hostname} resolves to "
                        f"internal/private IP: {ip_str} - {redact_url_for_log(url)}"
                    )
                    return False
        except socket.gaierror:
            logger.warning(f"Failed to resolve hostname {hostname}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # Nothing objected: the URL passes all checks.
        return True

    except Exception:
        logger.exception(f"Error validating URL {redact_url_for_log(url)}")
        return False
def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return ``url`` when it passes ``validate_url``, else ``default``.

    Validation runs with the default (strictest) ``validate_url`` flags.

    Args:
        url: Candidate URL; ``None`` or an empty string is treated as
            "nothing supplied" and yields ``default`` without logging.
        default: Value handed back when ``url`` is missing or unsafe.

    Returns:
        ``url`` unchanged if it is safe, otherwise ``default``.
    """
    if not url:
        return default

    if not validate_url(url):
        # Only the redacted form is logged; the raw URL may hold credentials.
        logger.warning(f"Unsafe URL rejected: {redact_url_for_log(url)}")
        return default

    return url
def redact_url_for_log(url: str) -> str:
    """Return ``scheme://host:port`` (no userinfo, path, query, fragment).

    For log output only. Drops everything except scheme + authority host
    + port to minimise the chance of leaking credentials, tokens, or
    sensitive paths into logs while still giving operators enough to
    distinguish ``http://10.0.0.1:80`` from ``https://10.0.0.1:443``.

    RFC 3986 §3.2.1 allows credentials in URL userinfo
    (``http://user:pass@host/``). A rejected URL is by definition
    adversarial-shaped, but it may still carry the operator's real
    credentials if a misconfiguration produced it.

    Args:
        url: The URL to summarise.

    Returns:
        ``scheme://host`` or ``scheme://host:port``. A missing scheme is
        shown as ``?`` and a missing host as ``<no-host>``. If the URL
        cannot be parsed at all, the literal ``<unparseable>``.
    """
    try:
        u = parse_url(url)
    except (LocationParseError, ValueError):
        return "<unparseable>"

    scheme = u.scheme or "?"
    host = u.host or "<no-host>"
    # Compare against None rather than relying on truthiness: an explicit
    # port 0 is falsy but is still part of what the caller supplied, and
    # dropping it would make ``http://h:0`` indistinguishable from
    # ``http://h`` in the logs.
    host_port = host if u.port is None else f"{host}:{u.port}"
    return f"{scheme}://{host_port}"