Coverage for src/local_deep_research/web_search_engines/engines/search_engine_google_pse.py: 75%

128 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

from loguru import logger
import random
import time
from typing import Any, Dict, List, Optional

from langchain_core.language_models import BaseLLM
from requests.exceptions import RequestException

from ...security.safe_requests import safe_get
from ..rate_limiting import RateLimitError
from ..search_engine_base import BaseSearchEngine


class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation"""

    # Mark as public search engine
    is_public = True
    # Mark as generic search engine (general web search)
    is_generic = True

    def __init__(
        self,
        max_results: int = 10,
        region: str = "us",
        safe_search: bool = True,
        search_language: str = "English",
        api_key: Optional[str] = None,
        search_engine_id: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        include_full_content: bool = False,
        max_filtered_results: Optional[int] = None,
        max_retries: int = 3,
        retry_delay: float = 2.0,
        **kwargs,
    ):
        """
        Initialize the Google Programmable Search Engine.

        Args:
            max_results: Maximum number of search results
            region: Region code for search results
            safe_search: Whether to enable safe search
            search_language: Language for search results
            api_key: Google API key (can also be set in GOOGLE_PSE_API_KEY env)
            search_engine_id: Google CSE ID (can also be set in GOOGLE_PSE_ENGINE_ID env)
            llm: Language model for relevance filtering
            include_full_content: Whether to include full webpage content in results
            max_filtered_results: Maximum number of results to keep after filtering
            max_retries: Maximum number of retry attempts for API requests
            retry_delay: Base delay in seconds between retry attempts
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            **kwargs,
        )
        self.include_full_content = include_full_content

        # Retry configuration
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Rate limiting - keep track of last request time
        self.last_request_time = 0
        self.min_request_interval = (
            0.5  # Minimum time between requests in seconds
        )

        # Language code mapping
        language_code_mapping = {
            "english": "en",
            "spanish": "es",
            "french": "fr",
            "german": "de",
            "italian": "it",
            "japanese": "ja",
            "korean": "ko",
            "portuguese": "pt",
            "russian": "ru",
            "chinese": "zh-CN",
        }
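        # e.g. search_language="Spanish" maps to "es"; anything not in the
        # mapping falls back to "en" below.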

        # Get language code
        search_language = search_language.lower()
        self.language = language_code_mapping.get(search_language, "en")

        # Safe search setting
        self.safe = "active" if safe_search else "off"

        # Region/Country setting
        self.region = region

        # API key and Search Engine ID - check params, env vars, or database
        from ...config.thread_settings import (
            get_setting_from_snapshot,
            NoSettingsContextError,
        )

        self.api_key = api_key
        if not self.api_key:
            try:
                self.api_key = get_setting_from_snapshot(
                    "search.engine.web.google_pse.api_key",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available
                logger.debug(
                    "No settings context available for Google PSE API key"
                )

        self.search_engine_id = search_engine_id
        if not self.search_engine_id:
            try:
                self.search_engine_id = get_setting_from_snapshot(
                    "search.engine.web.google_pse.engine_id",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available
                logger.debug(
                    "No settings context available for Google PSE engine ID"
                )

        if not self.api_key:
            raise ValueError(
                "Google API key is required. Set it in the UI settings, use the api_key parameter, or set the GOOGLE_PSE_API_KEY environment variable."
            )
        if not self.search_engine_id:
            raise ValueError(
                "Google Search Engine ID is required. Set it in the UI settings, use the search_engine_id parameter, or set the GOOGLE_PSE_ENGINE_ID environment variable."
            )

        # Validate connection and credentials
        self._validate_connection()

    def _validate_connection(self):
        """Test the connection to ensure API key and Search Engine ID are valid"""
        try:
            # Make a minimal test query
            response = self._make_request("test")

            # Check if we got a valid response
            if response.get("error"):
                error_msg = response["error"].get("message", "Unknown error")
                raise ValueError(f"Google PSE API error: {error_msg}")

            # If we get here, the connection is valid
            logger.info("Google PSE connection validated successfully")
            return True

        except Exception as e:
            # Log the error and re-raise
            logger.exception(f"Error validating Google PSE connection: {e!s}")
            raise

    def _respect_rate_limit(self):
        """Ensure we don't exceed rate limits by adding appropriate delay between requests"""

        current_time = time.time()
        elapsed = current_time - self.last_request_time

        # If we've made a request recently, wait until the minimum interval has passed
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed
            logger.debug("Rate limiting: sleeping for {:.2f} s", sleep_time)
            time.sleep(sleep_time)

        # Update the last request time
        self.last_request_time = time.time()

    def _make_request(self, query: str, start_index: int = 1) -> Dict:
        """
        Make a request to the Google PSE API with retry logic and rate limiting

        Args:
            query: Search query string
            start_index: Starting index for pagination

        Returns:
            JSON response from the API

        Raises:
            RequestException: If all retry attempts fail
        """
        # Base URL for the API
        url = "https://www.googleapis.com/customsearch/v1"

        # Parameters for the request
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": min(10, self.max_results),  # Max 10 per request
            "start": start_index,
            "safe": self.safe,
            "lr": f"lang_{self.language}",
            "gl": self.region,
        }

        # Implement retry logic with exponential backoff
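        # With the default retry_delay=2.0, the waits before attempts 2 and 3
        # are roughly 2.0*jitter and 4.0*jitter seconds, with jitter drawn
        # from uniform(0.5, 1.5).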

        attempt = 0
        last_exception = None

        while attempt < self.max_retries:
            try:
                # Add jitter to retries after the first attempt
                if attempt > 0:
                    jitter = random.uniform(0.5, 1.5)
                    sleep_time = (
                        self.retry_delay * (2 ** (attempt - 1)) * jitter
                    )
                    logger.info(
                        "Retry attempt {} / {} for query '{}'. Waiting {:.2f} s",
                        attempt + 1,
                        self.max_retries,
                        query,
                        sleep_time,
                    )
                    time.sleep(sleep_time)

                # Make the request
                logger.debug(
                    "Making request to Google PSE API: {} (start_index={})",
                    query,
                    start_index,
                )
                # Apply rate limiting before request
                self._last_wait_time = self.rate_tracker.apply_rate_limit(
                    self.engine_type
                )

                response = safe_get(url, params=params, timeout=10)

                # Check for HTTP errors
                response.raise_for_status()

                # Return the JSON response
                return response.json()

            except RequestException as e:
                error_msg = str(e)
                logger.warning(
                    "Request error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    error_msg,
                )

                # Check for rate limiting patterns
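                # Note: a 403 can also indicate invalid credentials or a
                # disabled API; this check conservatively treats it as a
                # quota/rate-limit failure.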

                if (
                    "quota" in error_msg.lower()
                    or "quotaExceeded" in error_msg
                    or "dailyLimitExceeded" in error_msg
                    or "rateLimitExceeded" in error_msg
                    or "429" in error_msg
                    or "403" in error_msg
                ):
                    raise RateLimitError(
                        f"Google PSE rate limit/quota exceeded: {error_msg}"
                    )

                last_exception = e
            except Exception as e:
                error_msg = str(e)
                logger.warning(
                    "Error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    error_msg,
                )

                # Check for rate limiting patterns in general errors
                if "quota" in error_msg.lower() or "limit" in error_msg.lower():
                    raise RateLimitError(
                        f"Google PSE error (possible rate limit): {error_msg}"
                    )

                last_exception = e

            attempt += 1

        # If we get here, all retries failed
        error_msg = f"Failed to get response from Google PSE API after {self.max_retries} attempts"
        logger.error(error_msg)

        if last_exception:
            raise RequestException(f"{error_msg}: {last_exception!s}")
        else:
            raise RequestException(error_msg)

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """Get search result previews/snippets"""
        results = []

        # Google PSE API returns a maximum of 10 results per request
        # We may need to make multiple requests to get the desired number
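        # e.g. max_results=25 yields requests with start=1, 11, 21, since
        # "num" is capped at 10 per request in _make_request.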

        start_index = 1
        total_results = 0

        while total_results < self.max_results:
            try:
                response = self._make_request(query, start_index)

                # Break if no items
                if "items" not in response:
                    break

                items = response.get("items", [])

                # Process each result
                for item in items:
                    title = item.get("title", "")
                    snippet = item.get("snippet", "")
                    url = item.get("link", "")

                    # Skip results without URL
                    if not url:
                        continue

                    results.append(
                        {
                            "title": title,
                            "snippet": snippet,
                            "link": url,
                            "source": "Google Programmable Search",
                        }
                    )

                    total_results += 1
                    if total_results >= self.max_results:
                        break

                # Check if there are more results
                if not items or total_results >= self.max_results:
                    break

                # Update start index for next request
                start_index += len(items)

            except Exception as e:
                logger.exception("Error getting search results: {}", e)
                break

        logger.info(
            "Retrieved {} search results for query: '{}'", len(results), query
        )
        return results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Get full content for search results"""
        # Use the BaseSearchEngine implementation
        return super()._get_full_content(relevant_items)
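
# Usage sketch (illustrative only): the credential values below are
# placeholders, and note that instantiation makes a live validation query
# against the API via _validate_connection.
#
#     engine = GooglePSESearchEngine(
#         max_results=5,
#         api_key="YOUR_GOOGLE_PSE_API_KEY",
#         search_engine_id="YOUR_ENGINE_ID",
#     )
#     previews = engine._get_previews("open source deep research")
#     for p in previews:
#         print(p["title"], "->", p["link"])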