Coverage for src / local_deep_research / web_search_engines / engines / search_engine_google_pse.py: 98%
129 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1from loguru import logger
2import random
3import time
4from typing import Any, Dict, List, Optional
6from langchain_core.language_models import BaseLLM
7from requests.exceptions import RequestException
9from ...security.safe_requests import safe_get
10from ..rate_limiting import RateLimitError
11from ..search_engine_base import BaseSearchEngine
class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation.

    Queries the Google Custom Search JSON API (customsearch/v1) using an API
    key and a Programmable Search Engine ID, with retry/backoff and
    rate-limit handling built in.
    """

    # Mark as public search engine (results come from the public web).
    is_public = True
    # Mark as generic search engine (general web search, not domain-specific).
    is_generic = True
22 def __init__(
23 self,
24 max_results: int = 10,
25 region: str = "us",
26 safe_search: bool = True,
27 search_language: str = "English",
28 api_key: Optional[str] = None,
29 search_engine_id: Optional[str] = None,
30 llm: Optional[BaseLLM] = None,
31 include_full_content: bool = False,
32 max_filtered_results: Optional[int] = None,
33 settings_snapshot: Optional[Dict[str, Any]] = None,
34 max_retries: int = 3,
35 retry_delay: float = 2.0,
36 **kwargs,
37 ):
38 """
39 Initialize the Google Programmable Search Engine.
41 Args:
42 max_results: Maximum number of search results
43 region: Region code for search results
44 safe_search: Whether to enable safe search
45 search_language: Language for search results
46 api_key: Google API key (can also be set via LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_API_KEY env var or in UI settings)
47 search_engine_id: Google CSE ID (can also be set via LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_ENGINE_ID env var or in UI settings)
48 llm: Language model for relevance filtering
49 include_full_content: Whether to include full webpage content in results
50 max_filtered_results: Maximum number of results to keep after filtering
51 max_retries: Maximum number of retry attempts for API requests
52 retry_delay: Base delay in seconds between retry attempts
53 **kwargs: Additional parameters (ignored but accepted for compatibility)
54 """
55 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
56 super().__init__(
57 llm=llm,
58 max_filtered_results=max_filtered_results,
59 max_results=max_results,
60 include_full_content=include_full_content,
61 settings_snapshot=settings_snapshot,
62 **kwargs,
63 )
65 # Google PSE returns full content via its API (snippet + htmlSnippet),
66 # so _init_full_search() is intentionally not called here.
68 # Retry configuration
69 self.max_retries = max_retries
70 self.retry_delay = retry_delay
72 # Rate limiting - keep track of last request time
73 self.last_request_time: float = 0.0
74 self.min_request_interval = (
75 0.5 # Minimum time between requests in seconds
76 )
78 # Language code mapping
79 language_code_mapping = {
80 "english": "en",
81 "spanish": "es",
82 "french": "fr",
83 "german": "de",
84 "italian": "it",
85 "japanese": "ja",
86 "korean": "ko",
87 "portuguese": "pt",
88 "russian": "ru",
89 "chinese": "zh-CN",
90 }
92 # Get language code
93 search_language = search_language.lower()
94 self.language = language_code_mapping.get(search_language, "en")
96 # Safe search setting
97 self.safe = "active" if safe_search else "off"
99 # Region/Country setting
100 self.region = region
102 # API key and Search Engine ID - check params, env vars, or database
103 from ...config.thread_settings import (
104 get_setting_from_snapshot,
105 NoSettingsContextError,
106 )
108 self.api_key = api_key
109 if not self.api_key:
110 try:
111 self.api_key = get_setting_from_snapshot(
112 "search.engine.web.google_pse.api_key",
113 default=None,
114 settings_snapshot=self.settings_snapshot,
115 )
116 except NoSettingsContextError:
117 # No settings context available
118 logger.debug(
119 "No settings context available for Google PSE API key"
120 )
121 pass
123 self.search_engine_id = search_engine_id
124 if not self.search_engine_id:
125 try:
126 self.search_engine_id = get_setting_from_snapshot(
127 "search.engine.web.google_pse.engine_id",
128 default=None,
129 settings_snapshot=self.settings_snapshot,
130 )
131 except NoSettingsContextError:
132 # No settings context available
133 logger.debug(
134 "No settings context available for Google PSE engine ID"
135 )
136 pass
138 if not self.api_key:
139 raise ValueError(
140 "Google API key is required. Set it in the UI settings, use the api_key parameter, or set the LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_API_KEY environment variable."
141 )
142 if not self.search_engine_id:
143 raise ValueError(
144 "Google Search Engine ID is required. Set it in the UI settings, use the search_engine_id parameter, or set the LDR_SEARCH_ENGINE_WEB_GOOGLE_PSE_ENGINE_ID environment variable."
145 )
147 # Validate connection and credentials
148 self._validate_connection()
150 def _validate_connection(self):
151 """Test the connection to ensure API key and Search Engine ID are valid"""
152 try:
153 # Make a minimal test query
154 response = self._make_request("test")
156 # Check if we got a valid response
157 if response.get("error"):
158 error_msg = response["error"].get("message", "Unknown error")
159 raise ValueError(f"Google PSE API error: {error_msg}") # noqa: TRY301 — except only adds logging before re-raise
161 # If we get here, the connection is valid
162 logger.info("Google PSE connection validated successfully")
163 return True
165 except Exception:
166 # Log the error and re-raise
167 logger.exception("Error validating Google PSE connection")
168 raise
170 def _respect_rate_limit(self):
171 """Ensure we don't exceed rate limits by adding appropriate delay between requests"""
172 current_time = time.time()
173 elapsed = current_time - self.last_request_time
175 # If we've made a request recently, wait until the minimum interval has passed
176 if elapsed < self.min_request_interval:
177 sleep_time = self.min_request_interval - elapsed
178 logger.debug("Rate limiting: sleeping for {:.2f} s", sleep_time)
179 time.sleep(sleep_time)
181 # Update the last request time
182 self.last_request_time = time.time()
    def _make_request(self, query: str, start_index: int = 1) -> Dict:
        """
        Make a request to the Google PSE API with retry logic and rate limiting.

        Retries up to ``self.max_retries`` times with exponential backoff and
        random jitter. Error text matching quota/rate-limit patterns aborts the
        retry loop immediately by raising ``RateLimitError``.

        Args:
            query: Search query string
            start_index: Starting index for pagination (1-based, per the API)

        Returns:
            JSON response from the API

        Raises:
            RateLimitError: If the error message suggests a quota or rate
                limit was hit (propagates immediately, no further retries)
            RequestException: If all retry attempts fail
        """
        # Base URL for the API
        url = "https://www.googleapis.com/customsearch/v1"

        # Parameters for the request
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": min(10, self.max_results),  # Max 10 per request
            "start": start_index,
            "safe": self.safe,
            "lr": f"lang_{self.language}",
            "gl": self.region,
        }

        # Implement retry logic with exponential backoff
        attempt = 0
        last_exception: Exception | None = None

        while attempt < self.max_retries:
            try:
                # Add jitter to retries after the first attempt
                if attempt > 0:
                    # Security: random jitter for exponential backoff retry, not security-sensitive
                    jitter = random.uniform(0.5, 1.5)
                    # Backoff doubles each attempt: retry_delay * 2^(attempt-1).
                    sleep_time = (
                        self.retry_delay * (2 ** (attempt - 1)) * jitter
                    )
                    logger.info(
                        "Retry attempt {} / {} for query '{}'. Waiting {} s",
                        attempt + 1,
                        self.max_retries,
                        query,
                        f"{sleep_time:.2f}",
                    )
                    time.sleep(sleep_time)

                # Make the request
                logger.debug(
                    "Making request to Google PSE API: {} (start_index={})",
                    query,
                    start_index,
                )
                # Apply rate limiting before request.
                # NOTE(review): rate_tracker/engine_type presumably come from
                # BaseSearchEngine — confirm against the base class.
                self._last_wait_time = self.rate_tracker.apply_rate_limit(
                    self.engine_type
                )

                response = safe_get(url, params=params, timeout=10)

                # Check for HTTP errors
                response.raise_for_status()

                # Return the JSON response
                return response.json()  # type: ignore[no-any-return]

            except RequestException as e:
                error_msg = str(e)
                # Sanitize before logging so credentials in URLs don't leak.
                sanitized = self._sanitize_error_message(error_msg)
                logger.warning(
                    "Request error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    sanitized,
                )

                # Check for rate limiting patterns. 403 may also indicate an
                # auth problem, but it is treated as quota exhaustion here.
                if (
                    "quota" in error_msg.lower()
                    or "quotaExceeded" in error_msg
                    or "dailyLimitExceeded" in error_msg
                    or "rateLimitExceeded" in error_msg
                    or "429" in error_msg
                    or "403" in error_msg
                ):
                    # RateLimitError escapes the retry loop immediately.
                    raise RateLimitError(
                        f"Google PSE rate limit/quota exceeded: {sanitized}"
                    )

                last_exception = e
            except Exception as e:
                error_msg = str(e)
                sanitized = self._sanitize_error_message(error_msg)
                logger.warning(
                    "Error on attempt {} / {}: {}",
                    attempt + 1,
                    self.max_retries,
                    sanitized,
                )

                # Check for rate limiting patterns in general errors
                if "quota" in error_msg.lower() or "limit" in error_msg.lower():
                    raise RateLimitError(
                        f"Google PSE error (possible rate limit): {sanitized}"
                    )

                last_exception = e

            attempt += 1

        # If we get here, all retries failed
        error_msg = f"Failed to get response from Google PSE API after {self.max_retries} attempts"
        logger.error(error_msg)

        if last_exception:
            # Include the (sanitized) final failure cause in the raised error.
            raise RequestException(
                f"{error_msg}: {self._sanitize_error_message(str(last_exception))}"
            )
        raise RequestException(error_msg)
308 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
309 """Get search result previews/snippets"""
310 results = []
312 # Google PSE API returns a maximum of 10 results per request
313 # We may need to make multiple requests to get the desired number
314 start_index = 1
315 total_results = 0
317 while total_results < self.max_results: 317 ↛ 361line 317 didn't jump to line 361 because the condition on line 317 was always true
318 try:
319 response = self._make_request(query, start_index)
321 # Break if no items
322 if "items" not in response:
323 break
325 items = response.get("items", [])
327 # Process each result
328 for item in items:
329 title = item.get("title", "")
330 snippet = item.get("snippet", "")
331 url = item.get("link", "")
333 # Skip results without URL
334 if not url:
335 continue
337 results.append(
338 {
339 "title": title,
340 "snippet": snippet,
341 "link": url,
342 "source": "Google Programmable Search",
343 }
344 )
346 total_results += 1
347 if total_results >= self.max_results:
348 break
350 # Check if there are more results
351 if not items or total_results >= self.max_results:
352 break
354 # Update start index for next request
355 start_index += len(items)
357 except Exception:
358 logger.exception("Error getting search results")
359 break
361 logger.info(
362 "Retrieved {} search results for query: '{}'", len(results), query
363 )
364 return results
366 def _get_full_content(
367 self, relevant_items: List[Dict[str, Any]]
368 ) -> List[Dict[str, Any]]:
369 """Get full content for search results"""
370 # Use the BaseSearchEngine implementation
371 return super()._get_full_content(relevant_items)