Coverage for src / local_deep_research / security / ssrf_validator.py: 94%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2URL Validator for SSRF Prevention 

3 

4Validates URLs to prevent Server-Side Request Forgery (SSRF) attacks 

5by blocking requests to internal/private networks and enforcing safe schemes. 

6""" 

7 

8import ipaddress 

9import socket 

10from urllib.parse import urlparse 

11from typing import Optional 

12from loguru import logger 

13 

14from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES 

15 

# AWS metadata endpoint (commonly targeted in SSRF attacks)
# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
AWS_METADATA_IP = "169.254.169.254"

# Allowed URL schemes.
# Stored as a frozenset so the allowlist cannot be widened by accident at
# runtime (e.g. a stray ``ALLOWED_SCHEMES.add(...)``); membership tests and
# equality with a plain set behave exactly as before.
ALLOWED_SCHEMES = frozenset({"http", "https"})

22 

23 

# Loopback ranges that can be allowed for trusted internal services.
# Built once at import time instead of on every call.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_LOOPBACK_RANGES = (
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
)

# Private/internal network ranges - allowed with allow_private_ips=True.
# nosec B104 - These hardcoded IPs are intentional for SSRF allowlist
_PRIVATE_RANGES = (
    # RFC1918 Private Ranges
    ipaddress.ip_network("10.0.0.0/8"),  # Class A private
    ipaddress.ip_network("172.16.0.0/12"),  # Class B private
    ipaddress.ip_network("192.168.0.0/16"),  # Class C private
    # Container/Virtual Network Ranges
    ipaddress.ip_network("100.64.0.0/10"),  # CGNAT - used by Podman/rootless containers
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local (AWS metadata blocked separately)
    # IPv6 Private Ranges
    ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
    ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
)

# IPv6 address of the AWS instance metadata service. It lies inside fc00::/7,
# so without an explicit check allow_private_ips=True would exempt it even
# though the metadata endpoint is documented as ALWAYS blocked.
# Kept as a /128 network so the containment test ignores any IPv6 zone id.
_AWS_METADATA_IPV6 = ipaddress.ip_network("fd00:ec2::254/128")


def is_ip_blocked(
    ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
) -> bool:
    """
    Check if an IP address is in a blocked range.

    Args:
        ip_str: IP address as string
        allow_localhost: Whether to allow localhost/loopback addresses
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: the AWS metadata endpoint (169.254.169.254 and its IPv6
            form fd00:ec2::254) is ALWAYS blocked.

    Returns:
        True if IP is blocked, False otherwise. A string that does not parse
        as an IP address is reported as not blocked (False).
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Invalid IP address
        return False

    # Unwrap IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1 -> 127.0.0.1)
    # These bypass IPv4 range checks if not converted.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
        ip = ip.ipv4_mapped

    # ALWAYS block the AWS metadata endpoint (both address families) -
    # critical SSRF target for credential theft. No flag can exempt it.
    if ip in _AWS_METADATA_IPV6 or str(ip) == AWS_METADATA_IP:
        return True

    # Addresses outside every blocked range are public and allowed.
    if not any(ip in blocked_range for blocked_range in BLOCKED_IP_RANGES):
        return False

    # The IP is in a blocked range. Whether it is exempt depends only on the
    # IP itself, so the decision is made once rather than per blocked range.
    is_loopback = any(ip in loopback for loopback in _LOOPBACK_RANGES)
    if allow_private_ips:
        # Private + loopback are exempt; any other blocked range stays blocked.
        is_private = any(ip in private for private in _PRIVATE_RANGES)
        return not (is_loopback or is_private)
    if allow_localhost:
        # Loopback only is exempt.
        return not is_loopback
    return True

102 

103 

def validate_url(
    url: str,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
) -> bool:
    """
    Decide whether a URL is safe to fetch, guarding against SSRF.

    A URL is accepted only when all of the following hold:
        1. Its scheme is in ALLOWED_SCHEMES (http/https)
        2. It has a hostname, and a literal-IP hostname is not blocked
        3. Every address the hostname resolves to is not blocked

    Args:
        url: URL to validate
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.

    Returns:
        True if URL is safe, False otherwise. Resolution failures and any
        unexpected error are logged and reported as unsafe (False).
    """
    # The same exemption flags apply to literal IPs and to resolved addresses.
    policy = {
        "allow_localhost": allow_localhost,
        "allow_private_ips": allow_private_ips,
    }

    try:
        parts = urlparse(url)

        # Scheme allowlist
        scheme_ok = parts.scheme.lower() in ALLOWED_SCHEMES
        if not scheme_ok:
            logger.warning(
                f"Blocked URL with invalid scheme: {parts.scheme} - {url}"
            )
            return False

        hostname = parts.hostname
        if not hostname:
            logger.warning(f"Blocked URL with no hostname: {url}")
            return False

        # Literal IP in the host position: check it directly.
        try:
            literal_ip = ipaddress.ip_address(hostname)
            if is_ip_blocked(str(literal_ip), **policy):
                logger.warning(
                    f"Blocked URL with internal/private IP: {hostname} - {url}"
                )
                return False
        except ValueError:
            # Not an IP literal; the resolution step below covers it.
            pass

        # Resolve the hostname and reject on the first blocked address.
        try:
            # nosec B104 - DNS resolution is intentional for SSRF prevention (checking if hostname resolves to private IP)
            resolved = socket.getaddrinfo(
                hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM
            )
            # entry[4] is the sockaddr tuple; its first item is the address.
            addresses = (str(entry[4][0]) for entry in resolved)
            ip_str = next(
                (addr for addr in addresses if is_ip_blocked(addr, **policy)),
                None,
            )
            if ip_str is not None:
                logger.warning(
                    f"Blocked URL - hostname {hostname} resolves to "
                    f"internal/private IP: {ip_str} - {url}"
                )
                return False
        except socket.gaierror:
            logger.warning(f"Failed to resolve hostname {hostname}")
            return False
        except Exception:
            logger.exception("Error during hostname resolution")
            return False

        # URL passes all checks
        return True

    except Exception:
        logger.exception(f"Error validating URL {url}")
        return False

200 

201 

def get_safe_url(
    url: Optional[str], default: Optional[str] = None
) -> Optional[str]:
    """
    Return the URL when it passes validation, otherwise the fallback.

    Validation uses validate_url with its default (strict) settings.

    Args:
        url: URL to validate; None or an empty string yields the default
        default: Value returned when the URL is missing or unsafe

    Returns:
        URL if safe, default otherwise
    """
    # Nothing to validate.
    if not url:
        return default

    is_safe = validate_url(url)
    if not is_safe:
        logger.warning(f"Unsafe URL rejected: {url}")
        return default

    return url