Coverage for src / local_deep_research / web_search_engines / engines / search_engine_google_pse.py: 80%
128 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1from loguru import logger
2import random
3import time
4from typing import Any, Dict, List, Optional
6from langchain_core.language_models import BaseLLM
7from requests.exceptions import RequestException
9from ...security.safe_requests import safe_get
10from ..rate_limiting import RateLimitError
11from ..search_engine_base import BaseSearchEngine
class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation"""

    # Mark as public search engine (results come from the public Google PSE API)
    is_public = True
    # Mark as generic search engine (general web search, not domain-specific)
    is_generic = True
22 def __init__(
23 self,
24 max_results: int = 10,
25 region: str = "us",
26 safe_search: bool = True,
27 search_language: str = "English",
28 api_key: Optional[str] = None,
29 search_engine_id: Optional[str] = None,
30 llm: Optional[BaseLLM] = None,
31 include_full_content: bool = False,
32 max_filtered_results: Optional[int] = None,
33 max_retries: int = 3,
34 retry_delay: float = 2.0,
35 **kwargs,
36 ):
37 """
38 Initialize the Google Programmable Search Engine.
40 Args:
41 max_results: Maximum number of search results
42 region: Region code for search results
43 safe_search: Whether to enable safe search
44 search_language: Language for search results
45 api_key: Google API key (can also be set in GOOGLE_PSE_API_KEY env)
46 search_engine_id: Google CSE ID (can also be set in GOOGLE_PSE_ENGINE_ID env)
47 llm: Language model for relevance filtering
48 include_full_content: Whether to include full webpage content in results
49 max_filtered_results: Maximum number of results to keep after filtering
50 max_retries: Maximum number of retry attempts for API requests
51 retry_delay: Base delay in seconds between retry attempts
52 **kwargs: Additional parameters (ignored but accepted for compatibility)
53 """
54 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
55 super().__init__(
56 llm=llm,
57 max_filtered_results=max_filtered_results,
58 max_results=max_results,
59 **kwargs,
60 )
61 self.include_full_content = include_full_content
63 # Retry configuration
64 self.max_retries = max_retries
65 self.retry_delay = retry_delay
67 # Rate limiting - keep track of last request time
68 self.last_request_time = 0
69 self.min_request_interval = (
70 0.5 # Minimum time between requests in seconds
71 )
73 # Language code mapping
74 language_code_mapping = {
75 "english": "en",
76 "spanish": "es",
77 "french": "fr",
78 "german": "de",
79 "italian": "it",
80 "japanese": "ja",
81 "korean": "ko",
82 "portuguese": "pt",
83 "russian": "ru",
84 "chinese": "zh-CN",
85 }
87 # Get language code
88 search_language = search_language.lower()
89 self.language = language_code_mapping.get(search_language, "en")
91 # Safe search setting
92 self.safe = "active" if safe_search else "off"
94 # Region/Country setting
95 self.region = region
97 # API key and Search Engine ID - check params, env vars, or database
98 from ...config.thread_settings import (
99 get_setting_from_snapshot,
100 NoSettingsContextError,
101 )
103 self.api_key = api_key
104 if not self.api_key:
105 try:
106 self.api_key = get_setting_from_snapshot(
107 "search.engine.web.google_pse.api_key",
108 default=None,
109 settings_snapshot=self.settings_snapshot,
110 )
111 except NoSettingsContextError:
112 # No settings context available
113 logger.debug(
114 "No settings context available for Google PSE API key"
115 )
116 pass
118 self.search_engine_id = search_engine_id
119 if not self.search_engine_id:
120 try:
121 self.search_engine_id = get_setting_from_snapshot(
122 "search.engine.web.google_pse.engine_id",
123 default=None,
124 settings_snapshot=self.settings_snapshot,
125 )
126 except NoSettingsContextError:
127 # No settings context available
128 logger.debug(
129 "No settings context available for Google PSE engine ID"
130 )
131 pass
133 if not self.api_key:
134 raise ValueError(
135 "Google API key is required. Set it in the UI settings, use the api_key parameter, or set the GOOGLE_PSE_API_KEY environment variable."
136 )
137 if not self.search_engine_id:
138 raise ValueError(
139 "Google Search Engine ID is required. Set it in the UI settings, use the search_engine_id parameter, or set the GOOGLE_PSE_ENGINE_ID environment variable."
140 )
142 # Validate connection and credentials
143 self._validate_connection()
145 def _validate_connection(self):
146 """Test the connection to ensure API key and Search Engine ID are valid"""
147 try:
148 # Make a minimal test query
149 response = self._make_request("test")
151 # Check if we got a valid response
152 if response.get("error"):
153 error_msg = response["error"].get("message", "Unknown error")
154 raise ValueError(f"Google PSE API error: {error_msg}")
156 # If we get here, the connection is valid
157 logger.info("Google PSE connection validated successfully")
158 return True
160 except Exception:
161 # Log the error and re-raise
162 logger.exception("Error validating Google PSE connection")
163 raise
165 def _respect_rate_limit(self):
166 """Ensure we don't exceed rate limits by adding appropriate delay between requests"""
167 current_time = time.time()
168 elapsed = current_time - self.last_request_time
170 # If we've made a request recently, wait until the minimum interval has passed
171 if elapsed < self.min_request_interval:
172 sleep_time = self.min_request_interval - elapsed
173 logger.debug("Rate limiting: sleeping for %.2f s", sleep_time)
174 time.sleep(sleep_time)
176 # Update the last request time
177 self.last_request_time = time.time()
    def _make_request(self, query: str, start_index: int = 1) -> Dict[str, Any]:
        """
        Make a request to the Google PSE API with retry logic and rate limiting

        Args:
            query: Search query string
            start_index: Starting index for pagination (1-based, per the API)

        Returns:
            JSON response from the API

        Raises:
            RateLimitError: If the error text matches quota/rate-limit patterns
                (raised immediately, bypassing further retries)
            RequestException: If all retry attempts fail
        """
        # Base URL for the API
        url = "https://www.googleapis.com/customsearch/v1"

        # Parameters for the request
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": min(10, self.max_results),  # Max 10 per request
            "start": start_index,
            "safe": self.safe,
            "lr": f"lang_{self.language}",
            "gl": self.region,
        }

        # Implement retry logic with exponential backoff
        attempt = 0
        last_exception = None

        while attempt < self.max_retries:
            try:
                # Add jitter to retries after the first attempt
                if attempt > 0:
                    # Security: random jitter for exponential backoff retry, not security-sensitive
                    jitter = random.uniform(0.5, 1.5)
                    # Backoff doubles per retry: retry_delay * 2^(attempt-1), scaled by jitter
                    sleep_time = (
                        self.retry_delay * (2 ** (attempt - 1)) * jitter
                    )
                    logger.info(
                        "Retry attempt %s / %s for query '%s'. Waiting %s s",
                        attempt + 1,
                        self.max_retries,
                        query,
                        f"{sleep_time:.2f}",
                    )
                    time.sleep(sleep_time)

                # Make the request
                logger.debug(
                    "Making request to Google PSE API: %s (start_index=%s)",
                    query,
                    start_index,
                )
                # Apply rate limiting before request (shared tracker keyed by
                # engine type; both attrs come from elsewhere in the class
                # hierarchy — not visible in this block)
                self._last_wait_time = self.rate_tracker.apply_rate_limit(
                    self.engine_type
                )

                response = safe_get(url, params=params, timeout=10)

                # Check for HTTP errors
                response.raise_for_status()

                # Return the JSON response
                return response.json()

            except RequestException as e:
                error_msg = str(e)
                logger.warning(
                    "Request error on attempt %s / %s: %s",
                    attempt + 1,
                    self.max_retries,
                    error_msg,
                )

                # Check for rate limiting patterns
                # NOTE(review): "403" is treated as a quota problem here —
                # presumably because PSE reports quota exhaustion as 403;
                # plain auth failures would also match. Confirm intent.
                if (
                    "quota" in error_msg.lower()
                    or "quotaExceeded" in error_msg
                    or "dailyLimitExceeded" in error_msg
                    or "rateLimitExceeded" in error_msg
                    or "429" in error_msg
                    or "403" in error_msg
                ):
                    # RateLimitError propagates immediately — no further retries
                    raise RateLimitError(
                        f"Google PSE rate limit/quota exceeded: {error_msg}"
                    )

                last_exception = e
            except Exception as e:
                error_msg = str(e)
                logger.warning(
                    "Error on attempt %s / %s: %s",
                    attempt + 1,
                    self.max_retries,
                    error_msg,
                )

                # Check for rate limiting patterns in general errors
                if "quota" in error_msg.lower() or "limit" in error_msg.lower():
                    raise RateLimitError(
                        f"Google PSE error (possible rate limit): {error_msg}"
                    )

                last_exception = e

            attempt += 1

        # If we get here, all retries failed
        error_msg = f"Failed to get response from Google PSE API after {self.max_retries} attempts"
        logger.error(error_msg)

        if last_exception:
            # Preserve the final underlying error text in the raised message
            raise RequestException(f"{error_msg}: {last_exception!s}")
        else:
            raise RequestException(error_msg)
300 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
301 """Get search result previews/snippets"""
302 results = []
304 # Google PSE API returns a maximum of 10 results per request
305 # We may need to make multiple requests to get the desired number
306 start_index = 1
307 total_results = 0
309 while total_results < self.max_results: 309 ↛ 353line 309 didn't jump to line 353 because the condition on line 309 was always true
310 try:
311 response = self._make_request(query, start_index)
313 # Break if no items
314 if "items" not in response:
315 break
317 items = response.get("items", [])
319 # Process each result
320 for item in items:
321 title = item.get("title", "")
322 snippet = item.get("snippet", "")
323 url = item.get("link", "")
325 # Skip results without URL
326 if not url:
327 continue
329 results.append(
330 {
331 "title": title,
332 "snippet": snippet,
333 "link": url,
334 "source": "Google Programmable Search",
335 }
336 )
338 total_results += 1
339 if total_results >= self.max_results:
340 break
342 # Check if there are more results
343 if not items or total_results >= self.max_results:
344 break
346 # Update start index for next request
347 start_index += len(items)
349 except Exception:
350 logger.exception("Error getting search results")
351 break
353 logger.info(
354 "Retrieved %s search results for query: '%s'", len(results), query
355 )
356 return results
358 def _get_full_content(
359 self, relevant_items: List[Dict[str, Any]]
360 ) -> List[Dict[str, Any]]:
361 """Get full content for search results"""
362 # Use the BaseSearchEngine implementation
363 return super()._get_full_content(relevant_items)