Coverage for src / local_deep_research / security / safe_requests.py: 95%

63 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Safe HTTP Requests Wrapper 

3 

4Wraps requests library to add SSRF protection and security best practices. 

5""" 

6 

7import requests 

8from typing import Any, Optional 

9from loguru import logger 

10 

11from .ssrf_validator import validate_url 

12 

13 

# Timeout, in seconds, applied to every HTTP request so a call cannot hang
DEFAULT_TIMEOUT = 30

# Largest response size accepted, in bytes (10MB), to prevent memory exhaustion
MAX_RESPONSE_SIZE = 10 * 1024**2

19 

20 

def safe_get(
    url: str,
    params: Optional[dict] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
    **kwargs,
) -> requests.Response:
    """
    Make a safe HTTP GET request with SSRF protection.

    Args:
        url: URL to request
        params: URL parameters
        timeout: Request timeout in seconds
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
            192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG
            that may be running on a different machine on the local network.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        **kwargs: Additional arguments to pass to requests.get().
            ``allow_redirects`` defaults to False unless given here.

    Returns:
        Response object

    Raises:
        ValueError: If URL fails SSRF validation, or if the response declares
            a Content-Length greater than MAX_RESPONSE_SIZE
        requests.RequestException: If request fails
    """
    # Validate URL to prevent SSRF
    if not validate_url(
        url,
        allow_localhost=allow_localhost,
        allow_private_ips=allow_private_ips,
    ):
        raise ValueError(
            f"URL failed security validation (possible SSRF): {url}"
        )

    # Disable redirects by default to prevent SSRF bypass via redirect chains
    # Redirects could point to internal services, bypassing initial URL validation
    # Callers can explicitly enable redirects if needed and trust the redirect target
    kwargs.setdefault("allow_redirects", False)

    try:
        # ``timeout`` is a named parameter, so it can never arrive through
        # ``kwargs``; pass it explicitly so every request is bounded.
        response = requests.get(url, params=params, timeout=timeout, **kwargs)
    except requests.Timeout:
        logger.warning(f"Request timeout after {timeout}s: {url}")
        raise
    except requests.RequestException as e:
        logger.warning(f"Request failed for {url}: {e}")
        raise

    # Check the response size declared by the Content-Length header
    content_length = response.headers.get("Content-Length")
    if content_length:
        try:
            declared_size = int(content_length)
        except (ValueError, TypeError):
            # Ignore if Content-Length is not a valid number (e.g., in mocks)
            declared_size = None

        # The comparison lives outside the try block on purpose: raising the
        # ValueError inside it would be swallowed by the handler above and
        # the size limit would never be enforced.
        if declared_size is not None and declared_size > MAX_RESPONSE_SIZE:
            raise ValueError(
                f"Response too large: {content_length} bytes "
                f"(max {MAX_RESPONSE_SIZE})"
            )

    return response

96 

97 

def safe_post(
    url: str,
    data: Optional[Any] = None,
    json: Optional[dict] = None,
    timeout: int = DEFAULT_TIMEOUT,
    allow_localhost: bool = False,
    allow_private_ips: bool = False,
    **kwargs,
) -> requests.Response:
    """
    Make a safe HTTP POST request with SSRF protection.

    Args:
        url: URL to request
        data: Data to send in request body
        json: JSON data to send in request body
        timeout: Request timeout in seconds
        allow_localhost: Whether to allow localhost/loopback addresses.
            Set to True for trusted internal services like self-hosted
            search engines (e.g., searxng). Default False.
        allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
            192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG
            that may be running on a different machine on the local network.
            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        **kwargs: Additional arguments to pass to requests.post().
            ``allow_redirects`` defaults to False unless given here.

    Returns:
        Response object

    Raises:
        ValueError: If URL fails SSRF validation, or if the response declares
            a Content-Length greater than MAX_RESPONSE_SIZE
        requests.RequestException: If request fails
    """
    # Validate URL to prevent SSRF
    if not validate_url(
        url,
        allow_localhost=allow_localhost,
        allow_private_ips=allow_private_ips,
    ):
        raise ValueError(
            f"URL failed security validation (possible SSRF): {url}"
        )

    # Disable redirects by default to prevent SSRF bypass via redirect chains
    # Redirects could point to internal services, bypassing initial URL validation
    # Callers can explicitly enable redirects if needed and trust the redirect target
    kwargs.setdefault("allow_redirects", False)

    try:
        # ``timeout`` is a named parameter, so it can never arrive through
        # ``kwargs``; pass it explicitly so every request is bounded.
        response = requests.post(
            url, data=data, json=json, timeout=timeout, **kwargs
        )
    except requests.Timeout:
        logger.warning(f"Request timeout after {timeout}s: {url}")
        raise
    except requests.RequestException as e:
        logger.warning(f"Request failed for {url}: {e}")
        raise

    # Check the response size declared by the Content-Length header
    content_length = response.headers.get("Content-Length")
    if content_length:
        try:
            declared_size = int(content_length)
        except (ValueError, TypeError):
            # Ignore if Content-Length is not a valid number (e.g., in mocks)
            declared_size = None

        # The comparison lives outside the try block on purpose: raising the
        # ValueError inside it would be swallowed by the handler above and
        # the size limit would never be enforced.
        if declared_size is not None and declared_size > MAX_RESPONSE_SIZE:
            raise ValueError(
                f"Response too large: {content_length} bytes "
                f"(max {MAX_RESPONSE_SIZE})"
            )

    return response

175 

176 

177# Create a safe session class 

class SafeSession(requests.Session):
    """
    A requests.Session that runs SSRF validation on every request URL.

    Usage:
        with SafeSession() as session:
            response = session.get(url)

        # Trusted service on localhost (e.g., searxng):
        with SafeSession(allow_localhost=True) as session:
            response = session.get(url)

        # Trusted service on any private network IP:
        with SafeSession(allow_private_ips=True) as session:
            response = session.get(url)
    """

    def __init__(
        self, allow_localhost: bool = False, allow_private_ips: bool = False
    ):
        """
        Create the session and remember its SSRF allowances.

        Args:
            allow_localhost: Whether to allow localhost/loopback addresses.
            allow_private_ips: Whether to allow all RFC1918 private IPs (10.x, 172.16-31.x,
                192.168.x) plus localhost. Use for trusted self-hosted services like SearXNG.
                Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
        """
        super().__init__()
        self.allow_localhost = allow_localhost
        self.allow_private_ips = allow_private_ips

    def request(self, method: str, url: str, **kwargs) -> requests.Response:
        """
        Validate the URL, apply the default timeout, then send the request.

        Raises:
            ValueError: If URL fails SSRF validation
        """
        # NOTE(review): only the URL passed in here is validated; this method
        # does not change ``allow_redirects``, so redirect targets do not
        # appear to be re-checked - confirm whether that is intended.
        url_is_allowed = validate_url(
            url,
            allow_localhost=self.allow_localhost,
            allow_private_ips=self.allow_private_ips,
        )
        if not url_is_allowed:
            raise ValueError(
                f"URL failed security validation (possible SSRF): {url}"
            )

        # Fall back to the module default only when the caller gave no timeout
        kwargs.setdefault("timeout", DEFAULT_TIMEOUT)

        return super().request(method, url, **kwargs)