Coverage for src / local_deep_research / security / ssrf_validator.py: 85%
79 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2URL Validator for SSRF Prevention
4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks
5by blocking requests to internal/private networks and enforcing safe schemes.
6"""
8import ipaddress
9import os
10import socket
11from urllib.parse import urlparse
12from typing import Optional
13from loguru import logger
15from ..settings.env_registry import get_env_setting
16from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
# AWS metadata endpoint (commonly targeted in SSRF attacks).
# is_ip_blocked() compares the string form of each parsed address against
# this value before consulting any allow flag, so it is rejected even when
# link-local addresses are otherwise permitted.
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# Allowed URL schemes. validate_url() lowercases the parsed scheme and rejects
# the URL when the result is not a member of this set.
ALLOWED_SCHEMES = {"http", "https"}
# Loopback ranges that can be allowed for trusted internal services.
# Built once at import time instead of on every call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# Private/internal network ranges - allowed with allow_private_ips=True.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    # RFC1918 Private Ranges
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
    # Container/Virtual Network Ranges
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT - Podman/rootless containers
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata blocked separately)
    # IPv6 Private Ranges
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
    ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
)


def _is_exempt(ip, allow_localhost: bool, allow_private_ips: bool) -> bool:
    """Return True if the allow flags waive a blocked-range hit for *ip*."""
    if allow_private_ips:
        # allow_private_ips covers loopback as well as the private ranges.
        return any(ip in net for net in _LOOPBACK_RANGES + _PRIVATE_RANGES)
    if allow_localhost:
        return any(ip in net for net in _LOOPBACK_RANGES)
    return False


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are checked both as
    written and as the IPv4 address they embed, so the mapped notation
    cannot be used to slip past the AWS metadata check or the IPv4 ranges.
    The address is blocked if either form is blocked.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if IP is blocked, False otherwise (including when ``ip_str``
        is not a valid IP address).
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address
        return False

    # Evaluate the address as given plus, for IPv4-mapped IPv6, the embedded
    # IPv4 address. IPv4Address has no ``ipv4_mapped`` attribute, hence getattr.
    candidates = [ip]
    embedded_v4 = getattr(ip, "ipv4_mapped", None)
    if embedded_v4 is not None:
        candidates.append(embedded_v4)

    for candidate in candidates:
        # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft
        if str(candidate) == AWS_METADATA_IP:
            return True

        # A hit in any blocked range blocks the address unless the allow
        # flags explicitly exempt it (loopback / private ranges only).
        for blocked_range in BLOCKED_IP_RANGES:
            if candidate in blocked_range and not _is_exempt(
                candidate, allow_localhost, allow_private_ips
            ):
                return True

    return False
def validate_url(
    url: str,
    allow_redirects: bool = True,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether a URL is safe to request without risking SSRF.

    A URL is accepted only when all of the following hold:
        1. Its scheme is one of ALLOWED_SCHEMES (http/https).
        2. It has a hostname, and if that hostname is an IP literal the
           address is not blocked.
        3. Every address the hostname resolves to is not blocked.

    Args:
        url: URL to validate
        allow_redirects: Whether to allow redirects (future use; not read
            by this function)
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise. Resolution failures and any
        unexpected error are treated as unsafe.

    Note:
        SSRF validation can be disabled for testing by setting environment variables:
        - TESTING=true
        - PYTEST_CURRENT_TEST (automatically set by pytest)
        - LDR_SECURITY_SSRF_DISABLE_VALIDATION=true
    """
    # The bypass switches are read on every call (not at import time) so
    # that pytest's PYTEST_CURRENT_TEST is seen when tests actually run.
    validation_disabled = get_env_setting(
        "security.ssrf.disable_validation", default=False
    )
    testing_flag = os.environ.get("TESTING", "").lower() in ("true", "1", "yes")
    under_pytest = os.environ.get("PYTEST_CURRENT_TEST")
    if any((testing_flag, under_pytest, validation_disabled)):
        logger.debug(f"SSRF validation bypassed in test mode for URL: {url}")
        return True

    # Both IP checks below use the same allow flags.
    ip_policy = {
        "allow_localhost": allow_localhost,
        "allow_private_ips": allow_private_ips,
    }

    try:
        parsed = urlparse(url)

        # Scheme gate
        if parsed.scheme.lower() not in ALLOWED_SCHEMES:
            logger.warning(
                f"Blocked URL with invalid scheme: {parsed.scheme} - {url}"
            )
            return False

        hostname = parsed.hostname
        if not hostname:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # Literal-IP gate: a ValueError means the host is a name, not an
        # address, and is left to the DNS gate below.
        try:
            literal = ipaddress.ip_address(hostname)
            if is_ip_blocked(str(literal), **ip_policy):
                logger.warning(
                    f"Blocked URL with internal/private IP: {hostname} - {url}"
                )
                return False
        except ValueError:
            pass

        # DNS gate: every resolved address must pass the same policy.
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )

            # Each entry is (family, type, proto, canonname, sockaddr);
            # the address string is the first element of sockaddr.
            for *_, sockaddr in resolved:
                ip_str = sockaddr[0]
                if is_ip_blocked(ip_str, **ip_policy):
                    logger.warning(
                        f"Blocked URL - hostname {hostname} resolves to "
                        f"internal/private IP: {ip_str} - {url}"
                    )
                    return False

        except socket.gaierror as e:
            logger.warning(f"Failed to resolve hostname {hostname}: {e}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # URL passes all checks
        return True

    except Exception:
        logger.exception(f"Error validating URL {url}")
        return False
def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return the URL when it passes validation, otherwise the fallback.

    Args:
        url: URL to validate; a falsy value (None or "") yields ``default``
            without any validation being attempted
        default: Value returned when the URL is missing or unsafe

    Returns:
        ``url`` if validate_url() accepts it, ``default`` otherwise
    """
    # Nothing to validate
    if not url:
        return default

    # Reject first, so the accepted URL is the final fall-through
    if not validate_url(url):
        logger.warning(f"Unsafe URL rejected: {url}")
        return default

    return url