Coverage for src/local_deep_research/web_search_engines/search_engine_base.py: 91%
257 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import json
2import time
3from abc import ABC, abstractmethod
4from datetime import datetime, UTC
5from typing import Any, Dict, List, Optional
7from langchain_core.language_models import BaseLLM
8from loguru import logger
9from tenacity import (
10 RetryError,
11 retry,
12 retry_if_exception_type,
13 stop_after_attempt,
14)
15from tenacity.wait import wait_base
17from ..advanced_search_system.filters.base_filter import BaseFilter
18from ..utilities.thread_context import set_search_context
20# Lazy import for metrics to avoid database dependencies in programmatic mode
21# from ..metrics.search_tracker import get_search_tracker
22from .rate_limiting import RateLimitError, get_tracker
25class AdaptiveWait(wait_base):
26 """Custom wait strategy that uses adaptive rate limiting."""
28 def __init__(self, get_wait_func):
29 self.get_wait_func = get_wait_func
31 def __call__(self, retry_state):
32 return self.get_wait_func()
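# Illustrative sketch (editor-added, not part of this module): tenacity invokes a
# wait strategy with the current retry state and sleeps for the returned number of
# seconds, so AdaptiveWait can be dropped into any @retry decorator. The fixed
# 2.0-second wait below is a made-up value purely for demonstration.
#
#     from tenacity import retry, stop_after_attempt
#
#     @retry(stop=stop_after_attempt(3), wait=AdaptiveWait(lambda: 2.0))
#     def flaky_call():
#         ...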
35class BaseSearchEngine(ABC):
36 """
37 Abstract base class for search engines with two-phase retrieval capability.
38 Handles common parameters and implements the two-phase search approach.
39 """
41 # Class attribute to indicate if this engine searches public internet sources
42 # Should be overridden by subclasses - defaults to False for safety
43 is_public = False
45 # Class attribute to indicate if this is a generic search engine (vs specialized)
46 # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed)
47 is_generic = False
49 # Class attribute to indicate if this is a scientific/academic search engine
50 # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
51 is_scientific = False
53 # Class attribute to indicate if this is a local RAG/document search engine
54 # Local engines search private document collections stored locally
55 is_local = False
57 # Class attribute to indicate if this is a news search engine
58 # News engines specialize in news articles and current events
59 is_news = False
61 # Class attribute to indicate if this is a code search engine
62 # Code engines specialize in searching code repositories
63 is_code = False
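# Editor-added illustration: a hypothetical general-purpose web engine would
# typically override these flags as
#     is_public = True
#     is_generic = True
# whereas a private document/RAG engine would instead set is_local = True.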
65 @classmethod
66 def _load_engine_class(cls, name: str, config: Dict[str, Any]):
67 """
68 Helper method to load an engine class dynamically.
70 Args:
71 name: Engine name
72 config: Engine configuration dict with module_path and class_name
74 Returns:
75 Tuple of (success: bool, engine_class or None, error_msg or None)
76 """
77 import importlib
79 try:
80 module_path = config.get("module_path")
81 class_name = config.get("class_name")
83 if not module_path or not class_name:
84 return (
85 False,
86 None,
87 f"Missing module_path or class_name for {name}",
88 )
90 # Import the module
91 package = None
92 if module_path.startswith("."):  # coverage: 92 ↛ 95, didn't jump to line 95 because the condition on line 92 was always true
93 # This is a relative import
94 package = "local_deep_research.web_search_engines"
95 module = importlib.import_module(module_path, package=package)
96 engine_class = getattr(module, class_name)
98 return True, engine_class, None
100 except Exception as e:
101 return False, None, f"Could not load engine class for {name}: {e}"
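# Editor-added sketch of the expected config shape; the module path and class
# name below are hypothetical, not real engines shipped with this package:
#
#     ok, engine_cls, err = BaseSearchEngine._load_engine_class(
#         "example_engine",
#         {
#             "module_path": ".engines.search_engine_example",  # relative import
#             "class_name": "ExampleSearchEngine",
#         },
#     )
#     # ok is False and err holds the message if the import or lookup fails.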
103 @classmethod
104 def _check_api_key_availability(
105 cls, name: str, config: Dict[str, Any]
106 ) -> bool:
107 """
108 Helper method to check if an engine's API key is available and valid.
110 Args:
111 name: Engine name
112 config: Engine configuration dict
114 Returns:
115 True if API key is not required or is available and valid
116 """
117 from loguru import logger
119 if not config.get("requires_api_key", False):
120 return True
122 api_key = config.get("api_key", "").strip()
124 # Check for common placeholder values
125 if (
126 not api_key
127 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"]
128 or api_key.endswith(
129 "_API_KEY"
130 ) # Default placeholders like BRAVE_API_KEY
131 or api_key.startswith("YOUR_")
132 or api_key == "null"
133 ):
134 logger.debug(
135 f"Skipping {name} - requires API key but none configured"
136 )
137 return False
139 return True
141 def __init__(
142 self,
143 llm: Optional[BaseLLM] = None,
144 max_filtered_results: Optional[int] = None,
145 max_results: Optional[int] = 10, # Default value if not provided
146 preview_filters: List[BaseFilter] | None = None,
147 content_filters: List[BaseFilter] | None = None,
148 search_snippets_only: bool = True, # New parameter with default
149 settings_snapshot: Optional[Dict[str, Any]] = None,
150 programmatic_mode: bool = False,
151 **kwargs,
152 ):
153 """
154 Initialize the search engine with common parameters.
156 Args:
157 llm: Optional language model for relevance filtering
158 max_filtered_results: Maximum number of results to keep after filtering
159 max_results: Maximum number of search results to return
160 preview_filters: Filters that will be applied to all previews
161 produced by the search engine, before relevancy checks.
162 content_filters: Filters that will be applied to the full content
163 produced by the search engine, after relevancy checks.
164 search_snippets_only: Whether to return only snippets or full content
165 settings_snapshot: Settings snapshot for configuration
166 programmatic_mode: If True, disables database operations and uses memory-only tracking
167 **kwargs: Additional engine-specific parameters
168 """
169 if max_filtered_results is None:
170 max_filtered_results = 5
171 if max_results is None:
172 max_results = 10
173 self._preview_filters: List[BaseFilter] = preview_filters
174 if self._preview_filters is None:
175 self._preview_filters = []
176 self._content_filters: List[BaseFilter] = content_filters
177 if self._content_filters is None:
178 self._content_filters = []
180 self.llm = llm # LLM for relevance filtering
181 self._max_filtered_results = int(
182 max_filtered_results
183 ) # Ensure it's an integer
184 self._max_results = max(
185 1, int(max_results)
186 ) # Ensure it's a positive integer
187 self.search_snippets_only = search_snippets_only # Store the setting
188 self.settings_snapshot = (
189 settings_snapshot or {}
190 ) # Store settings snapshot
191 self.programmatic_mode = programmatic_mode
193 # Rate limiting attributes
194 self.engine_type = self.__class__.__name__
195 # Create a tracker with our settings if in programmatic mode
196 if self.programmatic_mode:
197 from .rate_limiting.tracker import AdaptiveRateLimitTracker
199 self.rate_tracker = AdaptiveRateLimitTracker(
200 settings_snapshot=self.settings_snapshot,
201 programmatic_mode=self.programmatic_mode,
202 )
203 else:
204 self.rate_tracker = get_tracker()
205 self._last_wait_time = (
206 0.0 # Default to 0 for successful searches without rate limiting
207 )
208 self._last_results_count = 0
210 @property
211 def max_filtered_results(self) -> int:
212 """Get the maximum number of filtered results."""
213 return self._max_filtered_results
215 @max_filtered_results.setter
216 def max_filtered_results(self, value: int) -> None:
217 """Set the maximum number of filtered results."""
218 if value is None:
219 value = 5
220 logger.warning("Setting max_filtered_results to 5")
221 self._max_filtered_results = int(value)
223 @property
224 def max_results(self) -> int:
225 """Get the maximum number of search results."""
226 return self._max_results
228 @max_results.setter
229 def max_results(self, value: int) -> None:
230 """Set the maximum number of search results."""
231 if value is None:
232 value = 10
233 self._max_results = max(1, int(value))
235 def _get_adaptive_wait(self) -> float:
236 """Get adaptive wait time from tracker."""
237 wait_time = self.rate_tracker.get_wait_time(self.engine_type)
238 self._last_wait_time = wait_time
239 logger.debug(
240 f"{self.engine_type} waiting {wait_time:.2f}s before retry"
241 )
242 return wait_time
244 def _record_retry_outcome(self, retry_state) -> None:
245 """Record outcome after retry completes."""
246 success = (
247 not retry_state.outcome.failed if retry_state.outcome else False
248 )
249 self.rate_tracker.record_outcome(
250 self.engine_type,
251 self._last_wait_time or 0,
252 success,
253 retry_state.attempt_number,
254 error_type="RateLimitError" if not success else None,
255 search_result_count=self._last_results_count if success else 0,
256 )
258 def run(
259 self, query: str, research_context: Dict[str, Any] | None = None
260 ) -> List[Dict[str, Any]]:
261 """
262 Run the search engine with a given query, retrieving and filtering results.
263 This implements a two-phase retrieval approach:
264 1. Get preview information for many results
265 2. Filter the previews for relevance
266 3. Get full content for only the relevant results
268 Args:
269 query: The search query
270 research_context: Context from previous research to use.
272 Returns:
273 List of search results with full content (if available)
274 """
275 # Track search call for metrics (if available and not in programmatic mode)
276 tracker = None
277 if not self.programmatic_mode:
278 from ..metrics.search_tracker import get_search_tracker
280 # For thread-safe context propagation: if we have research_context parameter, use it
281 # Otherwise, try to inherit from current thread context (normal case)
282 # This allows strategies running in threads to explicitly pass context when needed
283 if research_context:
284 # Explicit context provided - use it and set it for this thread
285 set_search_context(research_context)
287 # Get tracker after context is set (either from parameter or thread)
288 tracker = get_search_tracker()
290 engine_name = self.__class__.__name__.replace(
291 "SearchEngine", ""
292 ).lower()
293 start_time = time.time()
295 success = True
296 error_message = None
297 results_count = 0
299 # Define the core search function with retry logic
300 if self.rate_tracker.enabled:
301 # Rate limiting enabled - use retry with adaptive wait
302 @retry(
303 stop=stop_after_attempt(3),
304 wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
305 retry=retry_if_exception_type((RateLimitError,)),
306 after=self._record_retry_outcome,
307 reraise=True,
308 )
309 def _run_with_retry():
310 nonlocal success, error_message, results_count
311 return _execute_search()
312 else:
313 # Rate limiting disabled - run without retry
314 def _run_with_retry():
315 nonlocal success, error_message, results_count
316 return _execute_search()
318 def _execute_search():
319 nonlocal success, error_message, results_count
321 try:
322 # Step 1: Get preview information for items
323 previews = self._get_previews(query)
324 if not previews:
325 logger.info(
326 f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
327 )
328 results_count = 0
329 return []
331 for preview_filter in self._preview_filters:
332 previews = preview_filter.filter_results(previews, query)
334 # Step 2: Filter previews for relevance with LLM
335 # Check if LLM relevance filtering should be enabled
336 enable_llm_filter = getattr(
337 self, "enable_llm_relevance_filter", False
338 )
340 logger.info(
341 f"BaseSearchEngine: Relevance filter check - enable_llm_relevance_filter={enable_llm_filter}, has_llm={self.llm is not None}, engine_type={type(self).__name__}"
342 )
344 if enable_llm_filter and self.llm:
345 logger.info(
346 f"Applying LLM relevance filter to {len(previews)} previews"
347 )
348 filtered_items = self._filter_for_relevance(previews, query)
349 logger.info(
350 f"LLM filter kept {len(filtered_items)} of {len(previews)} results"
351 )
352 else:
353 filtered_items = previews
354 if not enable_llm_filter:  # coverage: 354 ↛ 358, didn't jump to line 358 because the condition on line 354 was always true
355 logger.info(
356 f"LLM relevance filtering disabled (enable_llm_relevance_filter={enable_llm_filter}) - returning all {len(previews)} previews"
357 )
358 elif not self.llm:
359 logger.info(
360 f"No LLM available for relevance filtering - returning all {len(previews)} previews"
361 )
363 # Step 3: Get full content for filtered items
364 if self.search_snippets_only:
365 logger.info("Returning snippet-only results as per config")
366 results = filtered_items
367 else:
368 results = self._get_full_content(filtered_items)
370 for content_filter in self._content_filters:
371 results = content_filter.filter_results(results, query)
373 results_count = len(results)
374 self._last_results_count = results_count
376 # Record success if we get here and rate limiting is enabled
377 if self.rate_tracker.enabled:
378 logger.info(
379 f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
380 )
381 self.rate_tracker.record_outcome(
382 self.engine_type,
383 self._last_wait_time,
384 success=True,
385 retry_count=1, # First attempt succeeded
386 search_result_count=results_count,
387 )
388 else:
389 logger.info(
390 f"Rate limiting disabled, not recording search for {self.engine_type}"
391 )
393 return results
395 except RateLimitError:
396 # Only re-raise if rate limiting is enabled
397 if self.rate_tracker.enabled:  # coverage: 397 ↛ 398, didn't jump to line 398 because the condition on line 397 was never true
398 raise
399 else:
400 # If rate limiting is disabled, treat as regular error
401 success = False
402 error_message = "Rate limit hit but rate limiting disabled"
403 logger.warning(
404 f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
405 )
406 results_count = 0
407 return []
408 except Exception as e:
409 # Other errors - don't retry
410 success = False
411 error_message = str(e)
412 logger.exception(
413 f"Search engine {self.__class__.__name__} failed"
414 )
415 results_count = 0
416 return []
418 try:
419 return _run_with_retry()
420 except RetryError as e:
421 # All retries exhausted
422 success = False
423 error_message = f"Rate limited after all retries: {e}"
424 logger.exception(
425 f"{self.__class__.__name__} failed after all retries"
426 )
427 return []
428 except Exception as e:
429 success = False
430 error_message = str(e)
431 logger.exception(f"Search engine {self.__class__.__name__} error")
432 return []
433 finally:
434 # Record search metrics (if tracking is available)
435 if tracker is not None:
436 response_time_ms = int((time.time() - start_time) * 1000)
437 tracker.record_search(
438 engine_name=engine_name,
439 query=query,
440 results_count=results_count,
441 response_time_ms=response_time_ms,
442 success=success,
443 error_message=error_message,
444 )
446 def invoke(self, query: str) -> List[Dict[str, Any]]:
447 """Compatibility method for LangChain tools"""
448 return self.run(query)
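# Editor-added usage sketch; ExampleSearchEngine is a hypothetical subclass, and
# the keys read from each result match what _get_previews is documented to return:
#
#     engine = ExampleSearchEngine(max_results=5, search_snippets_only=True)
#     results = engine.run("adaptive rate limiting strategies")
#     for item in results:
#         print(item.get("title"), item.get("snippet"))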
450 def _filter_for_relevance(
451 self, previews: List[Dict[str, Any]], query: str
452 ) -> List[Dict[str, Any]]:
453 """
454 Filter search results by relevance to the query using the LLM.
456 Args:
457 previews: List of preview dictionaries
458 query: The original search query
460 Returns:
461 Filtered list of preview dictionaries
462 """
463 # If no LLM or too few previews, return all
464 if not self.llm or len(previews) <= 1:
465 return previews
467 # Log the number of previews we're processing
468 logger.info(f"Filtering {len(previews)} previews for relevance")
470 # Create a simple context for LLM
471 preview_context = []
472 indices_used = []
473 for i, preview in enumerate(previews):
474 title = preview.get("title", "Untitled").strip()
475 snippet = preview.get("snippet", "").strip()
476 url = preview.get("url", "").strip()
478 # Clean up snippet if too long
479 if len(snippet) > 300:
480 snippet = snippet[:300] + "..."
482 preview_context.append(
483 f"[{i}] Title: {title}\nURL: {url}\nSnippet: {snippet}"
484 )
485 indices_used.append(i)
487 # Log the indices we're presenting to the LLM
488 logger.info(
489 f"Created preview context with indices 0-{len(previews) - 1}"
490 )
491 logger.info(
492 f"First 5 indices in prompt: {indices_used[:5]}, Last 5: {indices_used[-5:] if len(indices_used) > 5 else 'N/A'}"
493 )
495 # Join all previews with clear separation
496 preview_text = "\n\n".join(preview_context)
498 # Log a sample of what we're sending to the LLM
499 logger.debug(
500 f"First preview in prompt: {preview_context[0] if preview_context else 'None'}"
501 )
502 if len(preview_context) > 1:  # coverage: 502 ↛ 506, didn't jump to line 506 because the condition on line 502 was always true
503 logger.debug(f"Last preview in prompt: {preview_context[-1]}")
505 # Set a reasonable limit on context length
506 current_date = datetime.now(UTC).strftime("%Y-%m-%d")
507 prompt = f"""Analyze these search results and select the most relevant ones for the query.
509Query: "{query}"
510Current date: {current_date}
511Total results: {len(previews)}
513Criteria for selection (in order of importance):
5141. Direct relevance - MUST directly address the specific query topic, not just mention keywords
5152. Quality - from reputable sources with substantive information about the query
5163. Recency - prefer recent information when relevant
518Search results to evaluate:
519{preview_text}
521Return a JSON array of indices (0-based) for results that are highly relevant to the query.
522Valid indices are 0 to {len(previews) - 1}.
523Only include results that directly help answer the specific question asked.
524Be selective - it's better to return fewer high-quality results than many mediocre ones.
525Maximum results to return: {self.max_filtered_results}
526Example response: [4, 0, 2, 7]
528Respond with ONLY the JSON array, no other text."""
530 try:
531 # Get LLM's evaluation
532 response = self.llm.invoke(prompt)
534 # Log the raw response for debugging
535 logger.info(f"Raw LLM response for relevance filtering: {response}")
537 # Handle different response formats
538 response_text = ""
539 if hasattr(response, "content"):  # coverage: 539 ↛ 542, didn't jump to line 542 because the condition on line 539 was always true
540 response_text = response.content
541 else:
542 response_text = str(response)
544 # Clean up response
545 response_text = response_text.strip()
546 logger.debug(f"Cleaned response text: {response_text}")
548 # Find JSON array in response
549 start_idx = response_text.find("[")
550 end_idx = response_text.rfind("]")
552 if start_idx >= 0 and end_idx > start_idx:
553 array_text = response_text[start_idx : end_idx + 1]
554 try:
555 ranked_indices = json.loads(array_text)
556 logger.info(f"LLM returned indices: {ranked_indices}")
558 # Validate that ranked_indices is a list of integers
559 if not isinstance(ranked_indices, list):  # coverage: 559 ↛ 560, didn't jump to line 560 because the condition on line 559 was never true
560 logger.warning(
561 "LLM response is not a list, returning empty results"
562 )
563 return []
565 if not all(isinstance(idx, int) for idx in ranked_indices):
566 logger.warning(
567 "LLM response contains non-integer indices, returning empty results"
568 )
569 return []
571 # Log analysis of the indices
572 max_index = max(ranked_indices) if ranked_indices else -1
573 min_index = min(ranked_indices) if ranked_indices else -1
574 logger.info(
575 f"Index analysis: min={min_index}, max={max_index}, "
576 f"valid_range=0-{len(previews) - 1}, count={len(ranked_indices)}"
577 )
579 # Return the results in ranked order
580 ranked_results = []
581 out_of_range = []
582 for idx in ranked_indices:
583 if 0 <= idx < len(previews):
584 ranked_results.append(previews[idx])
585 else:
586 out_of_range.append(idx)
587 logger.warning(
588 f"Index {idx} out of range (valid: 0-{len(previews) - 1}), skipping"
589 )
591 if out_of_range:
592 logger.error(
593 f"Out of range indices: {out_of_range}. "
594 f"Total previews: {len(previews)}, "
595 f"All returned indices: {ranked_indices}"
596 )
598 # Limit to max_filtered_results if specified
599 if (
600 self.max_filtered_results
601 and len(ranked_results) > self.max_filtered_results
602 ):
603 logger.info(
604 f"Limiting filtered results to top {self.max_filtered_results}"
605 )
606 return ranked_results[: self.max_filtered_results]
608 return ranked_results
610 except json.JSONDecodeError as e:
611 logger.warning(
612 f"Failed to parse JSON from LLM response: {e}"
613 )
614 logger.debug(f"Problematic JSON text: {array_text}")
615 return []
616 else:
617 logger.warning(
618 "Could not find JSON array in response, returning original previews"
619 )
620 logger.debug(
621 f"Response text without JSON array: {response_text}"
622 )
623 return previews[: min(5, len(previews))]
625 except Exception:
626 logger.exception("Relevance filtering error")
627 # Fall back to returning top results on error
628 return previews[: min(5, len(previews))]
630 @abstractmethod
631 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
632 """
633 Get preview information (titles, summaries) for initial search results.
635 Args:
636 query: The search query
638 Returns:
639 List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
640 """
641 pass
643 @abstractmethod
644 def _get_full_content(
645 self, relevant_items: List[Dict[str, Any]]
646 ) -> List[Dict[str, Any]]:
647 """
648 Get full content for the relevant items.
650 Args:
651 relevant_items: List of relevant preview dictionaries
653 Returns:
654 List of result dictionaries with full content
655 """
656 pass
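
# Editor-added sketch, not part of this module: a minimal subclass wiring the
# two-phase retrieval together. The data is hard-coded purely to show the
# expected dictionary shapes, and the "full_content" key name is illustrative.
#
#     class StaticSearchEngine(BaseSearchEngine):
#         """Toy engine that 'searches' a fixed in-memory list."""
#
#         is_local = True
#
#         def _get_previews(self, query):
#             # Phase 1: cheap preview records ('id', 'title', 'snippet').
#             return [
#                 {
#                     "id": "doc-1",
#                     "title": "Example document",
#                     "snippet": f"Matches query: {query}",
#                     "url": "https://example.invalid/doc-1",
#                 }
#             ]
#
#         def _get_full_content(self, relevant_items):
#             # Phase 3: enrich only the items that survived relevance filtering.
#             for item in relevant_items:
#                 item["full_content"] = "Full text of " + item["id"]
#             return relevant_items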