Coverage for src/local_deep_research/web_search_engines/engines/search_engine_google_pse.py: 80%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1from loguru import logger 

2import random 

3import time 

4from typing import Any, Dict, List, Optional 

5 

6from langchain_core.language_models import BaseLLM 

7from requests.exceptions import RequestException 

8 

9from ...security.safe_requests import safe_get 

10from ..rate_limiting import RateLimitError 

11from ..search_engine_base import BaseSearchEngine 

12 

13 

class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation."""

    # Mark as public search engine
    is_public = True
    # Mark as generic search engine (general web search)
    is_generic = True

    def _load_credential_setting(self, key: str, description: str) -> Optional[str]:
        """Look up *key* in the settings snapshot.

        Returns the stored value, or None when the setting is unset or no
        settings context is available (e.g. when running outside the web app).
        """
        from ...config.thread_settings import (
            get_setting_from_snapshot,
            NoSettingsContextError,
        )

        try:
            return get_setting_from_snapshot(
                key,
                default=None,
                settings_snapshot=self.settings_snapshot,
            )
        except NoSettingsContextError:
            logger.debug(f"No settings context available for {description}")
            return None

    def __init__(
        self,
        max_results: int = 10,
        region: str = "us",
        safe_search: bool = True,
        search_language: str = "English",
        api_key: Optional[str] = None,
        search_engine_id: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        include_full_content: bool = False,
        max_filtered_results: Optional[int] = None,
        max_retries: int = 3,
        retry_delay: float = 2.0,
        **kwargs,
    ):
        """
        Initialize the Google Programmable Search Engine.

        Args:
            max_results: Maximum number of search results
            region: Region code for search results
            safe_search: Whether to enable safe search
            search_language: Language for search results
            api_key: Google API key (can also be set in GOOGLE_PSE_API_KEY env)
            search_engine_id: Google CSE ID (can also be set in GOOGLE_PSE_ENGINE_ID env)
            llm: Language model for relevance filtering
            include_full_content: Whether to include full webpage content in results
            max_filtered_results: Maximum number of results to keep after filtering
            max_retries: Maximum number of retry attempts for API requests
            retry_delay: Base delay in seconds between retry attempts
            **kwargs: Additional parameters (ignored but accepted for compatibility)

        Raises:
            ValueError: If the API key or search engine ID cannot be resolved,
                or if the validation query against the API fails.
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            **kwargs,
        )
        self.include_full_content = include_full_content

        # Retry configuration
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Rate limiting - keep track of last request time
        self.last_request_time = 0
        # Minimum time between requests in seconds
        self.min_request_interval = 0.5

        # Map human-readable language names to codes understood by the API;
        # unknown languages fall back to English.
        language_code_mapping = {
            "english": "en",
            "spanish": "es",
            "french": "fr",
            "german": "de",
            "italian": "it",
            "japanese": "ja",
            "korean": "ko",
            "portuguese": "pt",
            "russian": "ru",
            "chinese": "zh-CN",
        }
        self.language = language_code_mapping.get(search_language.lower(), "en")

        # Safe search setting ("active"/"off" as expected by the API)
        self.safe = "active" if safe_search else "off"

        # Region/Country setting
        self.region = region

        # Credentials: explicit parameters win; otherwise fall back to the
        # settings snapshot (which may itself be fed by env vars).
        self.api_key = api_key or self._load_credential_setting(
            "search.engine.web.google_pse.api_key",
            "Google PSE API key",
        )
        self.search_engine_id = search_engine_id or self._load_credential_setting(
            "search.engine.web.google_pse.engine_id",
            "Google PSE engine ID",
        )

        if not self.api_key:
            raise ValueError(
                "Google API key is required. Set it in the UI settings, use the api_key parameter, or set the GOOGLE_PSE_API_KEY environment variable."
            )
        if not self.search_engine_id:
            raise ValueError(
                "Google Search Engine ID is required. Set it in the UI settings, use the search_engine_id parameter, or set the GOOGLE_PSE_ENGINE_ID environment variable."
            )

        # Validate connection and credentials
        self._validate_connection()

145 def _validate_connection(self): 

146 """Test the connection to ensure API key and Search Engine ID are valid""" 

147 try: 

148 # Make a minimal test query 

149 response = self._make_request("test") 

150 

151 # Check if we got a valid response 

152 if response.get("error"): 

153 error_msg = response["error"].get("message", "Unknown error") 

154 raise ValueError(f"Google PSE API error: {error_msg}") 

155 

156 # If we get here, the connection is valid 

157 logger.info("Google PSE connection validated successfully") 

158 return True 

159 

160 except Exception: 

161 # Log the error and re-raise 

162 logger.exception("Error validating Google PSE connection") 

163 raise 

164 

165 def _respect_rate_limit(self): 

166 """Ensure we don't exceed rate limits by adding appropriate delay between requests""" 

167 current_time = time.time() 

168 elapsed = current_time - self.last_request_time 

169 

170 # If we've made a request recently, wait until the minimum interval has passed 

171 if elapsed < self.min_request_interval: 

172 sleep_time = self.min_request_interval - elapsed 

173 logger.debug("Rate limiting: sleeping for %.2f s", sleep_time) 

174 time.sleep(sleep_time) 

175 

176 # Update the last request time 

177 self.last_request_time = time.time() 

178 

179 def _make_request(self, query: str, start_index: int = 1) -> Dict: 

180 """ 

181 Make a request to the Google PSE API with retry logic and rate limiting 

182 

183 Args: 

184 query: Search query string 

185 start_index: Starting index for pagination 

186 

187 Returns: 

188 JSON response from the API 

189 

190 Raises: 

191 RequestException: If all retry attempts fail 

192 """ 

193 # Base URL for the API 

194 url = "https://www.googleapis.com/customsearch/v1" 

195 

196 # Parameters for the request 

197 params = { 

198 "key": self.api_key, 

199 "cx": self.search_engine_id, 

200 "q": query, 

201 "num": min(10, self.max_results), # Max 10 per request 

202 "start": start_index, 

203 "safe": self.safe, 

204 "lr": f"lang_{self.language}", 

205 "gl": self.region, 

206 } 

207 

208 # Implement retry logic with exponential backoff 

209 attempt = 0 

210 last_exception = None 

211 

212 while attempt < self.max_retries: 

213 try: 

214 # Add jitter to retries after the first attempt 

215 if attempt > 0: 215 ↛ 217line 215 didn't jump to line 217 because the condition on line 215 was never true

216 # Security: random jitter for exponential backoff retry, not security-sensitive 

217 jitter = random.uniform(0.5, 1.5) 

218 sleep_time = ( 

219 self.retry_delay * (2 ** (attempt - 1)) * jitter 

220 ) 

221 logger.info( 

222 "Retry attempt %s / %s for query '%s'. Waiting %s s", 

223 attempt + 1, 

224 self.max_retries, 

225 query, 

226 f"{sleep_time:.2f}", 

227 ) 

228 time.sleep(sleep_time) 

229 

230 # Make the request 

231 logger.debug( 

232 "Making request to Google PSE API: %s (start_index=%s)", 

233 query, 

234 start_index, 

235 ) 

236 # Apply rate limiting before request 

237 self._last_wait_time = self.rate_tracker.apply_rate_limit( 

238 self.engine_type 

239 ) 

240 

241 response = safe_get(url, params=params, timeout=10) 

242 

243 # Check for HTTP errors 

244 response.raise_for_status() 

245 

246 # Return the JSON response 

247 return response.json() 

248 

249 except RequestException as e: 

250 error_msg = str(e) 

251 logger.warning( 

252 "Request error on attempt %s / %s: %s", 

253 attempt + 1, 

254 self.max_retries, 

255 error_msg, 

256 ) 

257 

258 # Check for rate limiting patterns 

259 if ( 

260 "quota" in error_msg.lower() 

261 or "quotaExceeded" in error_msg 

262 or "dailyLimitExceeded" in error_msg 

263 or "rateLimitExceeded" in error_msg 

264 or "429" in error_msg 

265 or "403" in error_msg 

266 ): 

267 raise RateLimitError( 

268 f"Google PSE rate limit/quota exceeded: {error_msg}" 

269 ) 

270 

271 last_exception = e 

272 except Exception as e: 

273 error_msg = str(e) 

274 logger.warning( 

275 "Error on attempt %s / %s: %s", 

276 attempt + 1, 

277 self.max_retries, 

278 error_msg, 

279 ) 

280 

281 # Check for rate limiting patterns in general errors 

282 if "quota" in error_msg.lower() or "limit" in error_msg.lower(): 

283 raise RateLimitError( 

284 f"Google PSE error (possible rate limit): {error_msg}" 

285 ) 

286 

287 last_exception = e 

288 

289 attempt += 1 

290 

291 # If we get here, all retries failed 

292 error_msg = f"Failed to get response from Google PSE API after {self.max_retries} attempts" 

293 logger.error(error_msg) 

294 

295 if last_exception: 295 ↛ 298line 295 didn't jump to line 298 because the condition on line 295 was always true

296 raise RequestException(f"{error_msg}: {last_exception!s}") 

297 else: 

298 raise RequestException(error_msg) 

299 

300 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

301 """Get search result previews/snippets""" 

302 results = [] 

303 

304 # Google PSE API returns a maximum of 10 results per request 

305 # We may need to make multiple requests to get the desired number 

306 start_index = 1 

307 total_results = 0 

308 

309 while total_results < self.max_results: 309 ↛ 353line 309 didn't jump to line 353 because the condition on line 309 was always true

310 try: 

311 response = self._make_request(query, start_index) 

312 

313 # Break if no items 

314 if "items" not in response: 

315 break 

316 

317 items = response.get("items", []) 

318 

319 # Process each result 

320 for item in items: 

321 title = item.get("title", "") 

322 snippet = item.get("snippet", "") 

323 url = item.get("link", "") 

324 

325 # Skip results without URL 

326 if not url: 

327 continue 

328 

329 results.append( 

330 { 

331 "title": title, 

332 "snippet": snippet, 

333 "link": url, 

334 "source": "Google Programmable Search", 

335 } 

336 ) 

337 

338 total_results += 1 

339 if total_results >= self.max_results: 

340 break 

341 

342 # Check if there are more results 

343 if not items or total_results >= self.max_results: 

344 break 

345 

346 # Update start index for next request 

347 start_index += len(items) 

348 

349 except Exception: 

350 logger.exception("Error getting search results") 

351 break 

352 

353 logger.info( 

354 "Retrieved %s search results for query: '%s'", len(results), query 

355 ) 

356 return results 

357 

358 def _get_full_content( 

359 self, relevant_items: List[Dict[str, Any]] 

360 ) -> List[Dict[str, Any]]: 

361 """Get full content for search results""" 

362 # Use the BaseSearchEngine implementation 

363 return super()._get_full_content(relevant_items)