Coverage for src / local_deep_research / web_search_engines / engines / search_engine_google_pse.py: 98%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1from loguru import logger 

2import random 

3import time 

4from typing import Any, Dict, List, Optional 

5 

6from langchain_core.language_models import BaseLLM 

7from requests.exceptions import RequestException 

8 

9from ...security.safe_requests import safe_get 

10from ..rate_limiting import RateLimitError 

11from ..search_engine_base import BaseSearchEngine 

12 

13 

class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation."""

    # Mark as public search engine
    is_public = True
    # Mark as generic search engine (general web search)
    is_generic = True

    def __init__(
        self,
        max_results: int = 10,
        region: str = "us",
        safe_search: bool = True,
        search_language: str = "English",
        api_key: Optional[str] = None,
        search_engine_id: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        include_full_content: bool = False,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        retry_delay: float = 2.0,
        **kwargs,
    ):
        """
        Initialize the Google Programmable Search Engine.

        Args:
            max_results: Maximum number of search results
            region: Region code for search results
            safe_search: Whether to enable safe search
            search_language: Language for search results
            api_key: Google API key (can also be set via LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_API_KEY env var or in UI settings)
            search_engine_id: Google CSE ID (can also be set via LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_ENGINE_ID env var or in UI settings)
            llm: Language model for relevance filtering
            include_full_content: Whether to include full webpage content in results
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot used to resolve credentials
                when they are not passed explicitly
            max_retries: Maximum number of retry attempts for API requests
            retry_delay: Base delay in seconds between retry attempts
            **kwargs: Additional parameters (ignored but accepted for compatibility)

        Raises:
            ValueError: If the API key or search engine ID cannot be resolved
                from parameters, settings snapshot, or environment, or if the
                validation request reports an API error.
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            include_full_content=include_full_content,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        # Google PSE returns full content via its API (snippet + htmlSnippet),
        # so _init_full_search() is intentionally not called here.

        # Retry configuration
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Rate limiting - keep track of last request time
        self.last_request_time: float = 0.0
        self.min_request_interval = (
            0.5  # Minimum time between requests in seconds
        )

        # Language code mapping (lowercase language name -> Google "lr" code)
        language_code_mapping = {
            "english": "en",
            "spanish": "es",
            "french": "fr",
            "german": "de",
            "italian": "it",
            "japanese": "ja",
            "korean": "ko",
            "portuguese": "pt",
            "russian": "ru",
            "chinese": "zh-CN",
        }

        # Get language code; unknown languages fall back to English
        search_language = search_language.lower()
        self.language = language_code_mapping.get(search_language, "en")

        # Safe search setting
        self.safe = "active" if safe_search else "off"

        # Region/Country setting
        self.region = region

        # API key and Search Engine ID - check params, env vars, or database
        from ...config.thread_settings import (
            get_setting_from_snapshot,
            NoSettingsContextError,
        )

        self.api_key = api_key
        if not self.api_key:
            try:
                self.api_key = get_setting_from_snapshot(
                    "search.engine.web.google_pse.api_key",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available; the explicit check below
                # raises a descriptive ValueError if the key stays unset.
                logger.debug(
                    "No settings context available for Google PSE API key"
                )

        self.search_engine_id = search_engine_id
        if not self.search_engine_id:
            try:
                self.search_engine_id = get_setting_from_snapshot(
                    "search.engine.web.google_pse.engine_id",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available; the explicit check below
                # raises a descriptive ValueError if the ID stays unset.
                logger.debug(
                    "No settings context available for Google PSE engine ID"
                )

        if not self.api_key:
            raise ValueError(
                "Google API key is required. Set it in the UI settings, use the api_key parameter, or set the LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_API_KEY environment variable."
            )
        if not self.search_engine_id:
            raise ValueError(
                "Google Search Engine ID is required. Set it in the UI settings, use the search_engine_id parameter, or set the LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_ENGINE_ID environment variable."
            )

        # Validate connection and credentials (performs a live test query)
        self._validate_connection()

    def _validate_connection(self):
        """Test the connection to ensure API key and Search Engine ID are valid.

        Issues a minimal live query; raises ValueError on an API-reported
        error and re-raises any transport-level failure from _make_request.
        """
        try:
            # Make a minimal test query
            response = self._make_request("test")

            # Check if we got a valid response
            if response.get("error"):
                error_msg = response["error"].get("message", "Unknown error")
                raise ValueError(f"Google PSE API error: {error_msg}")  # noqa: TRY301 — except only adds logging before re-raise

            # If we get here, the connection is valid
            logger.info("Google PSE connection validated successfully")
            return True

        except Exception:
            # Log the error and re-raise
            logger.exception("Error validating Google PSE connection")
            raise

    def _respect_rate_limit(self):
        """Ensure we don't exceed rate limits by adding appropriate delay between requests."""
        current_time = time.time()
        elapsed = current_time - self.last_request_time

        # If we've made a request recently, wait until the minimum interval has passed
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed
            logger.debug("Rate limiting: sleeping for {:.2f} s", sleep_time)
            time.sleep(sleep_time)

        # Update the last request time
        self.last_request_time = time.time()

    def _make_request(self, query: str, start_index: int = 1) -> Dict:
        """
        Make a request to the Google PSE API with retry logic and rate limiting

        Args:
            query: Search query string
            start_index: Starting index for pagination

        Returns:
            JSON response from the API

        Raises:
            RateLimitError: If the error message indicates a quota/rate limit
                (including HTTP 429/403 responses).
            RequestException: If all retry attempts fail
        """
        # Base URL for the API
        url = "https://www.googleapis.com/customsearch/v1"

        # Parameters for the request
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": min(10, self.max_results),  # Max 10 per request
            "start": start_index,
            "safe": self.safe,
            "lr": f"lang_{self.language}",  # restrict results to language
            "gl": self.region,  # geolocation/country bias
        }

        # Implement retry logic with exponential backoff
        attempt = 0
        last_exception: Optional[Exception] = None

        while attempt < self.max_retries:
            try:
                # Add jitter to retries after the first attempt
                if attempt > 0:
                    # Security: random jitter for exponential backoff retry, not security-sensitive
                    jitter = random.uniform(0.5, 1.5)
                    sleep_time = (
                        self.retry_delay * (2 ** (attempt - 1)) * jitter
                    )
                    logger.info(
                        "Retry attempt {} / {} for query '{}'. Waiting {} s",
                        attempt + 1,
                        self.max_retries,
                        query,
                        f"{sleep_time:.2f}",
                    )
                    time.sleep(sleep_time)

                # Make the request
                logger.debug(
                    "Making request to Google PSE API: {} (start_index={})",
                    query,
                    start_index,
                )
                # Apply rate limiting before request
                self._last_wait_time = self.rate_tracker.apply_rate_limit(
                    self.engine_type
                )

                response = safe_get(url, params=params, timeout=10)

                # Check for HTTP errors
                response.raise_for_status()

                # Return the JSON response
                return response.json()  # type: ignore[no-any-return]

            except RequestException as e:
                error_msg = str(e)
                sanitized = self._sanitize_error_message(error_msg)
                logger.warning(
                    "Request error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    sanitized,
                )

                # Check for rate limiting patterns; 403 is treated as a
                # quota signal because Google reports daily-limit errors
                # with that status.
                if (
                    "quota" in error_msg.lower()
                    or "quotaExceeded" in error_msg
                    or "dailyLimitExceeded" in error_msg
                    or "rateLimitExceeded" in error_msg
                    or "429" in error_msg
                    or "403" in error_msg
                ):
                    raise RateLimitError(
                        f"Google PSE rate limit/quota exceeded: {sanitized}"
                    )

                last_exception = e
            except Exception as e:
                error_msg = str(e)
                sanitized = self._sanitize_error_message(error_msg)
                logger.warning(
                    "Error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    sanitized,
                )

                # Check for rate limiting patterns in general errors
                if "quota" in error_msg.lower() or "limit" in error_msg.lower():
                    raise RateLimitError(
                        f"Google PSE error (possible rate limit): {sanitized}"
                    )

                last_exception = e

            attempt += 1

        # If we get here, all retries failed
        error_msg = f"Failed to get response from Google PSE API after {self.max_retries} attempts"
        logger.error(error_msg)

        if last_exception:
            raise RequestException(
                f"{error_msg}: {self._sanitize_error_message(str(last_exception))}"
            )
        raise RequestException(error_msg)

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """Get search result previews/snippets.

        Paginates through the API (max 10 results per request) until
        max_results previews are collected or no more items are returned.
        Errors during pagination are logged and end the loop, returning
        whatever was collected so far (best-effort).
        """
        results = []

        # Google PSE API returns a maximum of 10 results per request
        # We may need to make multiple requests to get the desired number
        start_index = 1
        total_results = 0

        while total_results < self.max_results:
            try:
                response = self._make_request(query, start_index)

                # Break if no items
                if "items" not in response:
                    break

                items = response.get("items", [])

                # Process each result
                for item in items:
                    title = item.get("title", "")
                    snippet = item.get("snippet", "")
                    url = item.get("link", "")

                    # Skip results without URL
                    if not url:
                        continue

                    results.append(
                        {
                            "title": title,
                            "snippet": snippet,
                            "link": url,
                            "source": "Google Programmable Search",
                        }
                    )

                    total_results += 1
                    if total_results >= self.max_results:
                        break

                # Check if there are more results
                if not items or total_results >= self.max_results:
                    break

                # Update start index for next request
                start_index += len(items)

            except Exception:
                logger.exception("Error getting search results")
                break

        logger.info(
            "Retrieved {} search results for query: '{}'", len(results), query
        )
        return results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Get full content for search results"""
        # Use the BaseSearchEngine implementation
        return super()._get_full_content(relevant_items)