Coverage for src / local_deep_research / security / safe_requests.py: 95%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Safe HTTP Requests Wrapper 

3 

4Wraps requests library to add SSRF protection and security best practices. 

5""" 

6 

7import requests 

8from typing import Any, Optional 

9from loguru import logger 

10 

11from .ssrf_validator import validate_url 

12 

13 

# Timeout, in seconds, applied to every HTTP request so a stalled peer
# cannot hang the caller indefinitely.
DEFAULT_TIMEOUT = 30

# Upper bound on an acceptable response size (10 MiB), used to guard
# against memory exhaustion.
MAX_RESPONSE_SIZE = 10 * 1024**2

19 

20 

def safe_get(
    url: str,
    params: Optional[dict] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
    **kwargs,
) -> requests.Response:
    """
    Make a safe HTTP GET request with SSRF protection.

    Args:
        url: URL to request
        params: URL parameters
        timeout: Request timeout in seconds
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        **kwargs: Additional arguments to pass to requests.get()

    Returns:
        Response object

    Raises:
        ValueError: If URL fails SSRF validation, or if the response declares
            a Content-Length larger than MAX_RESPONSE_SIZE
        requests.RequestException: If request fails
    """
    # Validate URL to prevent SSRF
    if not validate_url(
        url,
        allow_localhost=allow_localhost,
        allow_private_ips=allow_private_ips,
    ):
        raise ValueError(
            f"URL failed security validation (possible SSRF): {url}"
        )

    # Disable redirects by default to prevent SSRF bypass via redirect chains.
    # Redirects could point to internal services, bypassing initial URL validation.
    # Callers can explicitly enable redirects if needed and trust the redirect target.
    kwargs.setdefault("allow_redirects", False)

    try:
        # `timeout` is a named parameter, so it can never also appear in
        # **kwargs; passing it explicitly guarantees a timeout is always set.
        response = requests.get(url, params=params, timeout=timeout, **kwargs)
    except requests.Timeout:
        logger.warning(f"Request timeout after {timeout}s: {url}")
        raise
    except requests.RequestException as e:
        logger.warning(f"Request failed for {url}: {e}")
        raise

    # Check the declared response size. Only the Content-Length header is
    # inspected, so responses that omit it are not limited here.
    content_length = response.headers.get("Content-Length")
    if content_length:
        try:
            declared_size = int(content_length)
        except (ValueError, TypeError):
            # Ignore if Content-Length is not a valid number (e.g., in mocks)
            declared_size = None

        # The comparison lives outside the try block on purpose: raising the
        # size error inside it would be swallowed by the handler above.
        if declared_size is not None and declared_size > MAX_RESPONSE_SIZE:
            response.close()
            raise ValueError(
                f"Response too large: {content_length} bytes "
                f"(max {MAX_RESPONSE_SIZE})"
            )

    return response

98 

99 

def safe_post(
    url: str,
    data: Optional[Any] = None,
    json: Optional[dict] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
    **kwargs,
) -> requests.Response:
    """
    Make a safe HTTP POST request with SSRF protection.

    Args:
        url: URL to request
        data: Data to send in request body
        json: JSON data to send in request body
        timeout: Request timeout in seconds
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all private/internal IPs plus localhost.
            This includes RFC1918 (10.x, 172.16-31.x, 192.168.x), CGNAT (100.64.x.x
            used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
            private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
            like SearXNG or Ollama in containerized environments.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        **kwargs: Additional arguments to pass to requests.post()

    Returns:
        Response object

    Raises:
        ValueError: If URL fails SSRF validation, or if the response declares
            a Content-Length larger than MAX_RESPONSE_SIZE
        requests.RequestException: If request fails
    """
    # Validate URL to prevent SSRF
    if not validate_url(
        url,
        allow_localhost=allow_localhost,
        allow_private_ips=allow_private_ips,
    ):
        raise ValueError(
            f"URL failed security validation (possible SSRF): {url}"
        )

    # Disable redirects by default to prevent SSRF bypass via redirect chains.
    # Redirects could point to internal services, bypassing initial URL validation.
    # Callers can explicitly enable redirects if needed and trust the redirect target.
    kwargs.setdefault("allow_redirects", False)

    try:
        # `timeout` is a named parameter, so it can never also appear in
        # **kwargs; passing it explicitly guarantees a timeout is always set.
        response = requests.post(
            url, data=data, json=json, timeout=timeout, **kwargs
        )
    except requests.Timeout:
        logger.warning(f"Request timeout after {timeout}s: {url}")
        raise
    except requests.RequestException as e:
        logger.warning(f"Request failed for {url}: {e}")
        raise

    # Check the declared response size. Only the Content-Length header is
    # inspected, so responses that omit it are not limited here.
    content_length = response.headers.get("Content-Length")
    if content_length:
        try:
            declared_size = int(content_length)
        except (ValueError, TypeError):
            # Ignore if Content-Length is not a valid number (e.g., in mocks)
            declared_size = None

        # The comparison lives outside the try block on purpose: raising the
        # size error inside it would be swallowed by the handler above.
        if declared_size is not None and declared_size > MAX_RESPONSE_SIZE:
            response.close()
            raise ValueError(
                f"Response too large: {content_length} bytes "
                f"(max {MAX_RESPONSE_SIZE})"
            )

    return response

179 

180 

181# Create a safe session class 

class SafeSession(requests.Session):
    """
    Session with built-in SSRF protection.

    The URL passed to request() is validated before anything is sent, and
    every prepared request handed to send() is validated as well, so the
    target of each followed redirect is checked too.

    Usage:
        with SafeSession() as session:
            response = session.get(url)

        # For trusted internal services (e.g., searxng on localhost):
        with SafeSession(allow_localhost=True) as session:
            response = session.get(url)

        # For trusted internal services on any private network IP:
        with SafeSession(allow_private_ips=True) as session:
            response = session.get(url)
    """

    def __init__(
        self, allow_localhost: bool = False, allow_private_ips: bool = False
    ):
        """
        Initialize SafeSession.

        Args:
            allow_localhost: Whether to allow localhost/loopback addresses.
            allow_private_ips: Whether to allow all private/internal IPs plus localhost.
                This includes RFC1918, CGNAT (100.64.x.x used by Podman), link-local, and
                IPv6 private ranges. Use for trusted self-hosted services like SearXNG or
                Ollama in containerized environments.
                Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        """
        super().__init__()
        self.allow_localhost = allow_localhost
        self.allow_private_ips = allow_private_ips

    def _ensure_url_allowed(self, url) -> None:
        """Raise ValueError if *url* fails validation under this session's policy."""
        if not validate_url(
            url,
            allow_localhost=self.allow_localhost,
            allow_private_ips=self.allow_private_ips,
        ):
            raise ValueError(
                f"URL failed security validation (possible SSRF): {url}"
            )

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Override request method to add SSRF validation.

        Raises:
            ValueError: If URL (or a redirect target) fails SSRF validation
        """
        # Validate the caller-supplied URL before the request is prepared
        self._ensure_url_allowed(url)

        # Ensure timeout is set
        kwargs.setdefault("timeout", DEFAULT_TIMEOUT)

        return super().request(method, url, **kwargs)

    def send(self, request, **kwargs) -> requests.Response:
        """
        Validate the prepared request's URL, then send it.

        requests sends each followed redirect through Session.send(), so
        validating here stops a redirect chain from reaching a host that the
        initial URL check would have rejected. The first URL of a request()
        call is consequently validated twice (once there, once here).

        Raises:
            ValueError: If the request URL fails SSRF validation
        """
        self._ensure_url_allowed(request.url)
        return super().send(request, **kwargs)

232 

233 return super().request(method, url, **kwargs)