Coverage for src / local_deep_research / security / ssrf_validator.py: 94%
73 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
URL Validator for SSRF Prevention

Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks
by blocking requests to internal/private networks and enforcing safe schemes.
"""
8import ipaddress
9import socket
10from urllib.parse import urlparse
11from typing import Optional
12from loguru import logger
14from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
# AWS metadata endpoint (commonly targeted in SSRF attacks)
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# Allowed URL schemes
ALLOWED_SCHEMES = {"http", "https"}
# Loopback ranges that can be allowed for trusted internal services.
# Built once at import time rather than on every call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# Private/internal network ranges - allowed with allow_private_ips=True.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    # RFC1918 Private Ranges
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
    # Container/Virtual Network Ranges
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT - used by Podman/rootless containers
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata blocked separately)
    # IPv6 Private Ranges
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
    ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
)

# IPv6 address of the AWS instance metadata service. It lies inside fc00::/7,
# so without an explicit check it would be let through by allow_private_ips.
# nosec B104 - Hardcoded IP is intentional for SSRF prevention
_AWS_METADATA_IPV6 = ipaddress.ip_address("fd00:ec2::254")


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    Args:
        ip_str: IP address as string. An IPv6 zone id ("%eth0") is ignored.
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: the AWS metadata endpoint (169.254.169.254 and its IPv6
            address fd00:ec2::254) is ALWAYS blocked.

    Returns:
        True if IP is blocked, False otherwise. A string that cannot be
        parsed as an IP address returns False; callers must not treat that
        result as proof that the target is safe.
    """
    # Strip an IPv6 zone id (e.g. "fe80::1%eth0"). Older Python versions
    # reject it with ValueError, which would fall into the "not blocked"
    # branch below, and it never affects which range the address is in.
    if isinstance(ip_str, str):
        ip_str = ip_str.partition("%")[0]

    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address
        return False

    # Unwrap IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1 -> 127.0.0.1)
    # These bypass IPv4 range checks if not converted.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped

    # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft
    if ip == _AWS_METADATA_IPV6 or str(ip) == AWS_METADATA_IP:
        return True

    # Addresses outside every blocked range are allowed regardless of flags.
    if not any(ip in blocked_range for blocked_range in BLOCKED_IP_RANGES):
        return False

    # The address is in a blocked range; apply the opt-in exemptions.
    # allow_private_ips covers loopback as well as the private ranges.
    is_loopback = any(ip in loopback for loopback in _LOOPBACK_RANGES)
    if is_loopback and (allow_localhost or allow_private_ips):
        return False
    if allow_private_ips and any(ip in private for private in _PRIVATE_RANGES):
        return False

    return True
def validate_url(
    url: str,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Validate URL to prevent SSRF attacks.

    Checks:
    1. URL scheme is allowed (http/https only)
    2. Hostname is not an internal/private IP address
    3. Hostname does not resolve to an internal/private IP

    Any failure along the way (unparseable URL, unresolvable hostname,
    unexpected error) is treated as unsafe.

    Args:
        url: URL to validate
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise
    """
    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(
                f"Blocked URL with invalid scheme: {parsed.scheme} - {url}"
            )
            return False

        hostname = parsed.hostname
        if not hostname:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # Check if hostname is an IP address
        try:
            literal_ip = ipaddress.ip_address(hostname)
        except ValueError:
            # Not an IP address, it's a hostname - need to resolve it
            literal_ip = None

        if literal_ip is not None and is_ip_blocked(
            str(literal_ip),
            allow_localhost=allow_localhost,
            allow_private_ips=allow_private_ips,
        ):
            logger.warning(
                f"Blocked URL with internal/private IP: {hostname} - {url}"
            )
            return False

        # Resolve hostname to IP and check
        try:
            # Get all IP addresses for hostname
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            addr_info = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
        except (socket.gaierror, UnicodeError):
            # UnicodeError is raised when the hostname cannot be IDNA-encoded
            # (e.g. an over-long or empty label); that is bad input, not an
            # internal failure, so it is handled like a failed lookup.
            logger.warning(f"Failed to resolve hostname {hostname}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # Fail closed: with no resolved address there is nothing to vet.
        if not addr_info:
            logger.warning(f"Failed to resolve hostname {hostname}")
            return False

        # Every resolved address must pass, not just the first one.
        for info in addr_info:
            ip_str = str(info[4][0])  # Extract IP address from addr_info tuple

            if is_ip_blocked(
                ip_str,
                allow_localhost=allow_localhost,
                allow_private_ips=allow_private_ips,
            ):
                logger.warning(
                    f"Blocked URL - hostname {hostname} resolves to "
                    f"internal/private IP: {ip_str} - {url}"
                )
                return False

        # URL passes all checks
        return True

    except Exception:
        # Top-level boundary: anything unexpected means the URL is rejected.
        logger.exception(f"Error validating URL {url}")
        return False
def get_safe_url(
    url: Optional[str],
    default: Optional[str] = None,
    *,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> Optional[str]:
    """
    Get URL if it's safe, otherwise return default.

    Args:
        url: URL to validate. None or an empty string yields ``default``
            without any validation being attempted.
        default: Default value if URL is unsafe
        allow_localhost: Passed through to ``validate_url``; allows
            localhost/loopback addresses. Default False.
        allow_private_ips: Passed through to ``validate_url``; allows
            private/internal IPs plus localhost. Default False.

    Returns:
        URL if safe, default otherwise
    """
    if not url:
        return default

    if validate_url(
        url,
        allow_localhost=allow_localhost,
        allow_private_ips=allow_private_ips,
    ):
        return url

    logger.warning(f"Unsafe URL rejected: {url}")
    return default