Coverage for src / local_deep_research / security / ssrf_validator.py: 77%
79 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2URL Validator for SSRF Prevention
4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks
5by blocking requests to internal/private networks and enforcing safe schemes.
6"""
8import ipaddress
9import os
10import socket
11from urllib.parse import urlparse
12from typing import Optional
13from loguru import logger
15from ..settings.env_registry import get_env_setting
# Blocked IP ranges (RFC1918 private networks, localhost, link-local, etc.)
# nosec B104 - These hardcoded IPs are intentional for SSRF prevention (blocking private networks)
BLOCKED_IP_RANGES = [
    ipaddress.ip_network("127.0.0.0/8"),  # Loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("10.0.0.0/8"),  # Private network
    ipaddress.ip_network("172.16.0.0/12"),  # Private network
    ipaddress.ip_network("192.168.0.0/16"),  # Private network
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local
    ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
    ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    ipaddress.ip_network("0.0.0.0/8"),  # "This" network
    ipaddress.ip_network("::/128"),  # IPv6 unspecified (counterpart of 0.0.0.0)
    ipaddress.ip_network("100.64.0.0/10"),  # Shared address space
]

# AWS metadata endpoint (commonly targeted in SSRF attacks)
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# Allowed URL schemes
ALLOWED_SCHEMES = {"http", "https"}

# Loopback ranges that can be re-allowed for trusted internal services.
# Built once at import time instead of on every is_ip_blocked() call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# RFC1918 private network ranges - re-allowed with allow_private_ips=True.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
)


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped to the
    embedded IPv4 address before any comparison. Without this, a literal such
    as ``::ffff:127.0.0.1`` or ``::ffff:169.254.169.254`` would be an
    ``IPv6Address`` that matches none of the IPv4 networks in
    ``BLOCKED_IP_RANGES`` and would slip through.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
            192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if IP is blocked, False otherwise. A string that cannot be parsed
        as an IP address also returns False ("not a blocked IP"); this function
        never raises for bad input.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address - nothing to compare against the blocklist.
        return False

    # Normalise ::ffff:a.b.c.d to a.b.c.d so the IPv4 rules below apply to it.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped

    # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft
    if str(ip) == AWS_METADATA_IP:
        return True

    # Addresses outside every blocked range are allowed.
    if not any(ip in blocked_range for blocked_range in BLOCKED_IP_RANGES):
        return False

    # The address is in a blocked range; see whether a caller opt-in exempts it.
    is_loopback = any(ip in lr for lr in _LOOPBACK_RANGES)
    if is_loopback and (allow_localhost or allow_private_ips):
        return False
    if allow_private_ips and any(ip in pr for pr in _PRIVATE_RANGES):
        return False

    # Link-local, unique-local, CGNAT, unspecified, ... are never exempted.
    return True
def validate_url(
    url: str,
    allow_redirects: bool = True,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether a URL is safe to request from an SSRF point of view.

    A URL is accepted only when all of the following hold:
        1. Its scheme is one of ALLOWED_SCHEMES (http/https).
        2. It has a hostname.
        3. If the hostname is an IP literal, is_ip_blocked() does not block it.
        4. No address that the hostname resolves to is blocked by is_ip_blocked().

    Args:
        url: URL to validate
        allow_redirects: Reserved for future use; not read by this function.
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
            192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG
            that may be running on a different machine on the local network.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise. Resolution failures and any
        unexpected error are treated as unsafe (False).

    Note:
        SSRF validation can be disabled for testing by setting environment variables:
        - TESTING=true
        - PYTEST_CURRENT_TEST (automatically set by pytest)
        - LDR_SECURITY_SSRF_DISABLE_VALIDATION=true
    """
    # The bypass switches are read on every call (not at import time) so that
    # pytest's PYTEST_CURRENT_TEST is seen when the test is actually running.
    ssrf_disabled = get_env_setting(
        "security.ssrf.disable_validation", default=False
    )
    in_testing_mode = os.environ.get("TESTING", "").lower() in (
        "true",
        "1",
        "yes",
    )
    under_pytest = os.environ.get("PYTEST_CURRENT_TEST")
    if in_testing_mode or under_pytest or ssrf_disabled:
        logger.debug(f"SSRF validation bypassed in test mode for URL: {url}")
        return True

    # The same opt-in flags are forwarded to every is_ip_blocked() call.
    ip_policy = {
        "allow_localhost": allow_localhost,
        "allow_private_ips": allow_private_ips,
    }

    try:
        parts = urlparse(url)

        # 1. Scheme allowlist
        scheme = parts.scheme
        if scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(f"Blocked URL with invalid scheme: {scheme} - {url}")
            return False

        # 2. A hostname must be present
        host = parts.hostname
        if not host:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # 3. Hostname given directly as an IP literal
        try:
            literal = ipaddress.ip_address(host)
            if is_ip_blocked(str(literal), **ip_policy):
                logger.warning(
                    f"Blocked URL with internal/private IP: {host} - {url}"
                )
                return False
        except ValueError:
            # Not an IP literal - it is a name and is checked via resolution below.
            pass

        # 4. Every address the hostname resolves to must be acceptable
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                host, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
            for entry in resolved:
                address = entry[4][0]  # sockaddr is the 5th field; IP is its first item
                if is_ip_blocked(address, **ip_policy):
                    logger.warning(
                        f"Blocked URL - hostname {host} resolves to "
                        f"internal/private IP: {address} - {url}"
                    )
                    return False
        except socket.gaierror as exc:
            logger.warning(f"Failed to resolve hostname {host}: {exc}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # URL passes all checks
        return True

    except Exception:
        # Fail closed on anything unexpected (e.g. a malformed URL).
        logger.exception(f"Error validating URL {url}")
        return False
def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return ``url`` when it passes validate_url(), otherwise ``default``.

    Args:
        url: URL to validate. ``None`` or an empty string yields ``default``
            without running validation or logging.
        default: Value returned when ``url`` is missing or rejected.

    Returns:
        ``url`` if it is safe, ``default`` otherwise.
    """
    # Nothing to validate.
    if not url:
        return default

    is_safe = validate_url(url)
    if not is_safe:
        logger.warning(f"Unsafe URL rejected: {url}")
        return default

    return url