Coverage for src/local_deep_research/web_search_engines/engines/search_engine_google_pse.py: 75%
128 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
import random
import time
from typing import Any, Dict, List, Optional

from langchain_core.language_models import BaseLLM
from loguru import logger
from requests.exceptions import RequestException

from ...security.safe_requests import safe_get
from ..rate_limiting import RateLimitError
from ..search_engine_base import BaseSearchEngine

class GooglePSESearchEngine(BaseSearchEngine):
    """Google Programmable Search Engine implementation"""

    # Mark as public search engine
    is_public = True
    # Mark as generic search engine (general web search)
    is_generic = True

    def __init__(
        self,
        max_results: int = 10,
        region: str = "us",
        safe_search: bool = True,
        search_language: str = "English",
        api_key: Optional[str] = None,
        search_engine_id: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        include_full_content: bool = False,
        max_filtered_results: Optional[int] = None,
        max_retries: int = 3,
        retry_delay: float = 2.0,
        **kwargs,
    ):
        """
        Initialize the Google Programmable Search Engine.

        Args:
            max_results: Maximum number of search results
            region: Region code for search results
            safe_search: Whether to enable safe search
            search_language: Language for search results
            api_key: Google API key (can also be set in the GOOGLE_PSE_API_KEY env var)
            search_engine_id: Google CSE ID (can also be set in the GOOGLE_PSE_ENGINE_ID env var)
            llm: Language model for relevance filtering
            include_full_content: Whether to include full webpage content in results
            max_filtered_results: Maximum number of results to keep after filtering
            max_retries: Maximum number of retry attempts for API requests
            retry_delay: Base delay in seconds between retry attempts
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            **kwargs,
        )
        self.include_full_content = include_full_content

        # Retry configuration
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Rate limiting - keep track of the last request time
        self.last_request_time = 0
        # Minimum time between requests in seconds
        self.min_request_interval = 0.5

        # Language code mapping
        language_code_mapping = {
            "english": "en",
            "spanish": "es",
            "french": "fr",
            "german": "de",
            "italian": "it",
            "japanese": "ja",
            "korean": "ko",
            "portuguese": "pt",
            "russian": "ru",
            "chinese": "zh-CN",
        }

        # Get language code
        search_language = search_language.lower()
        self.language = language_code_mapping.get(search_language, "en")

        # Safe search setting
        self.safe = "active" if safe_search else "off"

        # Region/country setting
        self.region = region

        # API key and Search Engine ID - check params, env vars, or database
        from ...config.thread_settings import (
            get_setting_from_snapshot,
            NoSettingsContextError,
        )

        self.api_key = api_key
        if not self.api_key:
            try:
                self.api_key = get_setting_from_snapshot(
                    "search.engine.web.google_pse.api_key",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available
                logger.debug(
                    "No settings context available for Google PSE API key"
                )

        self.search_engine_id = search_engine_id
        if not self.search_engine_id:
            try:
                self.search_engine_id = get_setting_from_snapshot(
                    "search.engine.web.google_pse.engine_id",
                    default=None,
                    settings_snapshot=self.settings_snapshot,
                )
            except NoSettingsContextError:
                # No settings context available
                logger.debug(
                    "No settings context available for Google PSE engine ID"
                )

        if not self.api_key:
            raise ValueError(
                "Google API key is required. Set it in the UI settings, use the"
                " api_key parameter, or set the GOOGLE_PSE_API_KEY environment"
                " variable."
            )
        if not self.search_engine_id:
            raise ValueError(
                "Google Search Engine ID is required. Set it in the UI settings,"
                " use the search_engine_id parameter, or set the"
                " GOOGLE_PSE_ENGINE_ID environment variable."
            )

        # Validate connection and credentials
        self._validate_connection()

    def _validate_connection(self):
        """Test the connection to ensure the API key and Search Engine ID are valid"""
        try:
            # Make a minimal test query
            response = self._make_request("test")

            # Check if we got a valid response
            if response.get("error"):
                error_msg = response["error"].get("message", "Unknown error")
                raise ValueError(f"Google PSE API error: {error_msg}")

            # If we get here, the connection is valid
            logger.info("Google PSE connection validated successfully")
            return True

        except Exception as e:
            # Log the error and re-raise
            logger.exception(f"Error validating Google PSE connection: {e!s}")
            raise
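
    # Note (editor's annotation, not in the original source): because __init__
    # calls _validate_connection(), constructing this engine issues one live
    # test query against the API. Instantiation therefore fails fast on bad
    # credentials, but each construction also counts against the daily quota.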

    def _respect_rate_limit(self):
        """Ensure we don't exceed rate limits by adding an appropriate delay between requests"""
        current_time = time.time()
        elapsed = current_time - self.last_request_time

        # If we've made a request recently, wait until the minimum interval has passed
        if elapsed < self.min_request_interval:
            sleep_time = self.min_request_interval - elapsed
            logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} s")
            time.sleep(sleep_time)

        # Update the last request time
        self.last_request_time = time.time()
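
    # A worked example of the throttle above (illustrative, assuming the
    # default min_request_interval of 0.5 s): two back-to-back calls made
    # 0.1 s apart sleep for 0.5 - 0.1 = 0.4 s, capping throughput at roughly
    # two requests per second per engine instance.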

    def _make_request(self, query: str, start_index: int = 1) -> Dict:
        """
        Make a request to the Google PSE API with retry logic and rate limiting.

        Args:
            query: Search query string
            start_index: Starting index for pagination

        Returns:
            JSON response from the API

        Raises:
            RequestException: If all retry attempts fail
        """
        # Base URL for the API
        url = "https://www.googleapis.com/customsearch/v1"

        # Parameters for the request
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": min(10, self.max_results),  # Max 10 per request
            "start": start_index,
            "safe": self.safe,
            "lr": f"lang_{self.language}",
            "gl": self.region,
        }

        # Implement retry logic with exponential backoff
        attempt = 0
        last_exception = None

        while attempt < self.max_retries:
            try:
                # Add jitter to retries after the first attempt
                if attempt > 0:
                    jitter = random.uniform(0.5, 1.5)
                    sleep_time = (
                        self.retry_delay * (2 ** (attempt - 1)) * jitter
                    )
                    logger.info(
                        f"Retry attempt {attempt + 1}/{self.max_retries} for"
                        f" query '{query}'. Waiting {sleep_time:.2f} s"
                    )
                    time.sleep(sleep_time)

                # Make the request
                logger.debug(
                    f"Making request to Google PSE API: {query}"
                    f" (start_index={start_index})"
                )
                # Apply rate limiting before the request
                self._last_wait_time = self.rate_tracker.apply_rate_limit(
                    self.engine_type
                )

                response = safe_get(url, params=params, timeout=10)

                # Check for HTTP errors
                response.raise_for_status()

                # Return the JSON response
                return response.json()

            except RequestException as e:
                error_msg = str(e)
                logger.warning(
                    f"Request error on attempt {attempt + 1}/"
                    f"{self.max_retries}: {error_msg}"
                )

                # Check for rate limiting patterns
                if (
                    "quota" in error_msg.lower()
                    or "quotaExceeded" in error_msg
                    or "dailyLimitExceeded" in error_msg
                    or "rateLimitExceeded" in error_msg
                    or "429" in error_msg
                    or "403" in error_msg
                ):
                    raise RateLimitError(
                        f"Google PSE rate limit/quota exceeded: {error_msg}"
                    )

                last_exception = e
            except Exception as e:
                error_msg = str(e)
                logger.warning(
                    f"Error on attempt {attempt + 1}/{self.max_retries}:"
                    f" {error_msg}"
                )

                # Check for rate limiting patterns in general errors
                if "quota" in error_msg.lower() or "limit" in error_msg.lower():
                    raise RateLimitError(
                        f"Google PSE error (possible rate limit): {error_msg}"
                    )

                last_exception = e

            attempt += 1

        # If we get here, all retries failed
        error_msg = (
            f"Failed to get response from Google PSE API after"
            f" {self.max_retries} attempts"
        )
        logger.error(error_msg)

        if last_exception:
            raise RequestException(f"{error_msg}: {last_exception!s}")
        else:
            raise RequestException(error_msg)
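
    # Backoff schedule sketch (illustrative, using the defaults above):
    # sleep_time = retry_delay * 2 ** (attempt - 1) * jitter, jitter ~ U(0.5, 1.5)
    #   attempt 2 (attempt index 1): 2.0 * 2**0 * jitter -> 1.0 to 3.0 s
    #   attempt 3 (attempt index 2): 2.0 * 2**1 * jitter -> 2.0 to 6.0 s
    # Quota-style errors (429/403, "quotaExceeded", and similar) skip the
    # backoff entirely and surface immediately as RateLimitError.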

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """Get search result previews/snippets"""
        results = []

        # Google PSE API returns a maximum of 10 results per request,
        # so we may need to make multiple requests to get the desired number
        start_index = 1
        total_results = 0

        while total_results < self.max_results:
            try:
                response = self._make_request(query, start_index)

                # Break if no items
                if "items" not in response:
                    break

                items = response.get("items", [])

                # Process each result
                for item in items:
                    title = item.get("title", "")
                    snippet = item.get("snippet", "")
                    url = item.get("link", "")

                    # Skip results without a URL
                    if not url:
                        continue

                    results.append(
                        {
                            "title": title,
                            "snippet": snippet,
                            "link": url,
                            "source": "Google Programmable Search",
                        }
                    )

                    total_results += 1
                    if total_results >= self.max_results:
                        break

                # Check if there are more results
                if not items or total_results >= self.max_results:
                    break

                # Update start index for the next request
                start_index += len(items)

            except Exception as e:
                logger.exception(f"Error getting search results: {e!s}")
                break

        logger.info(
            f"Retrieved {len(results)} search results for query: '{query}'"
        )
        return results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Get full content for search results"""
        # Use the BaseSearchEngine implementation
        return super()._get_full_content(relevant_items)
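
# Minimal usage sketch (an editor's illustration, not part of the original
# module): construct the engine with real credentials and fetch previews.
# The environment variable names match those in the docstring above, but
# _get_previews is an internal hook, so the project's real entry point may
# differ (results normally flow through BaseSearchEngine).
if __name__ == "__main__":
    import os

    engine = GooglePSESearchEngine(
        api_key=os.environ.get("GOOGLE_PSE_API_KEY"),
        search_engine_id=os.environ.get("GOOGLE_PSE_ENGINE_ID"),
        max_results=5,
    )
    for item in engine._get_previews("open source deep research"):
        print(f"{item['title']} -> {item['link']}")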