Coverage for src/local_deep_research/web_search_engines/search_engine_base.py: 95% (281 statements)
coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
import json
import time
from abc import ABC, abstractmethod
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional

from langchain_core.language_models import BaseLLM
from loguru import logger
from tenacity import (
    RetryError,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
)
from tenacity.wait import wait_base

from ..advanced_search_system.filters.base_filter import BaseFilter
from ..utilities.json_utils import extract_json, get_llm_response_text
from ..utilities.thread_context import clear_search_context, set_search_context

# Lazy import for metrics to avoid database dependencies in programmatic mode
# from ..metrics.search_tracker import get_search_tracker
from .rate_limiting import RateLimitError, get_tracker
class AdaptiveWait(wait_base):
    """Custom wait strategy that uses adaptive rate limiting."""

    def __init__(self, get_wait_func):
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        return self.get_wait_func()
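

# Illustrative sketch (assumptions flagged): tenacity consults the wait
# callable before every retry, so the delay can change between attempts.
# BaseSearchEngine.run() below builds exactly this kind of decorator; here
# `tracker` stands in for any object with a get_wait_time(engine) method:
#
#   @retry(
#       stop=stop_after_attempt(3),
#       wait=AdaptiveWait(lambda: tracker.get_wait_time("MyEngine")),
#       retry=retry_if_exception_type((RateLimitError,)),
#       reraise=True,
#   )
#   def call_search_api():
#       ...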
class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach.
    """

    # Class attribute to indicate if this engine searches public internet sources
    # Should be overridden by subclasses - defaults to False for safety
    is_public = False

    # Class attribute to indicate if this is a generic search engine (vs specialized)
    # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed)
    is_generic = False

    # Class attribute to indicate if this is a scientific/academic search engine
    # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
    is_scientific = False

    # Class attribute to indicate if this is a local RAG/document search engine
    # Local engines search private document collections stored locally
    is_local = False

    # Class attribute to indicate if this is a news search engine
    # News engines specialize in news articles and current events
    is_news = False

    # Class attribute to indicate if this is a code search engine
    # Code engines specialize in searching code repositories
    is_code = False
    @staticmethod
    def _ensure_list(value, *, default=None):
        """Normalize a value that should be a list.

        Handles JSON-encoded strings, comma-separated strings, and
        already-parsed lists. Returns *default* (empty list when not
        supplied) for ``None`` or empty/unparseable input.
        """
        if default is None:
            default = []
        if value is None:
            return default
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            stripped = value.strip()
            if not stripped:
                return default
            if stripped.startswith("["):
                try:
                    parsed = json.loads(stripped)
                    if isinstance(parsed, list):
                        return [str(item) for item in parsed]
                except (json.JSONDecodeError, ValueError, RecursionError):
                    pass
            return [
                item.strip() for item in stripped.split(",") if item.strip()
            ]
        return default
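
    # Hedged examples of the normalization above (inputs are illustrative):
    #   _ensure_list('["arxiv", "pubmed"]')   -> ["arxiv", "pubmed"]  (JSON string)
    #   _ensure_list("arxiv, pubmed")         -> ["arxiv", "pubmed"]  (comma split)
    #   _ensure_list("  ")                    -> []                   (empty input)
    #   _ensure_list(None, default=["web"])   -> ["web"]              (explicit default)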
    @classmethod
    def _load_engine_class(cls, name: str, config: Dict[str, Any]):
        """
        Helper method to load an engine class dynamically.

        Args:
            name: Engine name
            config: Engine configuration dict with module_path and class_name

        Returns:
            Tuple of (success: bool, engine_class or None, error_msg or None)
        """
        from ..security.module_whitelist import (
            ModuleNotAllowedError,
            get_safe_module_class,
        )

        try:
            module_path = config.get("module_path")
            class_name = config.get("class_name")

            if not module_path or not class_name:
                return (
                    False,
                    None,
                    f"Missing module_path or class_name for {name}",
                )

            # Use whitelist-validated safe import
            engine_class = get_safe_module_class(module_path, class_name)

            return True, engine_class, None

        except ModuleNotAllowedError as e:
            return (
                False,
                None,
                f"Security error loading engine class for {name}: {e}",
            )
        except Exception as e:
            return False, None, f"Could not load engine class for {name}: {e}"
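
    # Illustrative config shape this helper expects; the module and class
    # names below are hypothetical, not real engines in this package:
    #   ok, engine_cls, err = BaseSearchEngine._load_engine_class(
    #       "example",
    #       {
    #           "module_path": "package.web_search_engines.engines.example",
    #           "class_name": "ExampleSearchEngine",
    #       },
    #   )
    #   # ok is False and err is set if the module is not whitelisted.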
    @classmethod
    def _check_api_key_availability(
        cls, name: str, config: Dict[str, Any]
    ) -> bool:
        """
        Helper method to check if an engine's API key is available and valid.

        Args:
            name: Engine name
            config: Engine configuration dict

        Returns:
            True if API key is not required or is available and valid
        """
        from loguru import logger

        if not config.get("requires_api_key", False):
            return True

        api_key = config.get("api_key", "").strip()

        # Check for common placeholder values
        if (
            not api_key
            or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"]
            or api_key.endswith(
                "_API_KEY"
            )  # Default placeholders like BRAVE_API_KEY
            or api_key.startswith("YOUR_")
            or api_key == "null"
        ):
            logger.debug(
                f"Skipping {name} - requires API key but none configured"
            )
            return False

        return True
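
    # Hedged examples of the placeholder detection above (values invented):
    #   {"requires_api_key": False}                              -> True
    #   {"requires_api_key": True, "api_key": "BRAVE_API_KEY"}   -> False (placeholder)
    #   {"requires_api_key": True, "api_key": "YOUR_KEY"}        -> False (placeholder)
    #   {"requires_api_key": True, "api_key": "sk-abc123"}       -> True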
    def __init__(
        self,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        max_results: Optional[int] = 10,  # Default value if not provided
        preview_filters: List[BaseFilter] | None = None,
        content_filters: List[BaseFilter] | None = None,
        search_snippets_only: bool = True,  # New parameter with default
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the search engine with common parameters.

        Args:
            llm: Optional language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results to return
            preview_filters: Filters that will be applied to all previews
                produced by the search engine, before relevancy checks.
            content_filters: Filters that will be applied to the full content
                produced by the search engine, after relevancy checks.
            search_snippets_only: Whether to return only snippets or full content
            settings_snapshot: Settings snapshot for configuration
            programmatic_mode: If True, disables database operations and uses memory-only tracking
            **kwargs: Additional engine-specific parameters
        """
        if max_filtered_results is None:
            max_filtered_results = 5
        if max_results is None:
            max_results = 10
        self._preview_filters: List[BaseFilter] = preview_filters
        if self._preview_filters is None:
            self._preview_filters = []
        self._content_filters: List[BaseFilter] = content_filters
        if self._content_filters is None:
            self._content_filters = []

        self.llm = llm  # LLM for relevance filtering
        self._max_filtered_results = int(
            max_filtered_results
        )  # Ensure it's an integer
        self._max_results = max(
            1, int(max_results)
        )  # Ensure it's a positive integer
        self.search_snippets_only = search_snippets_only  # Store the setting
        self.settings_snapshot = (
            settings_snapshot or {}
        )  # Store settings snapshot
        self.programmatic_mode = programmatic_mode

        # Rate limiting attributes
        self.engine_type = self.__class__.__name__
        # Create a tracker with our settings if in programmatic mode
        if self.programmatic_mode:
            from .rate_limiting.tracker import AdaptiveRateLimitTracker

            self.rate_tracker = AdaptiveRateLimitTracker(
                settings_snapshot=self.settings_snapshot,
                programmatic_mode=self.programmatic_mode,
            )
        else:
            self.rate_tracker = get_tracker()
        self._last_wait_time = (
            0.0  # Default to 0 for successful searches without rate limiting
        )
        self._last_results_count = 0
    @property
    def max_filtered_results(self) -> int:
        """Get the maximum number of filtered results."""
        return self._max_filtered_results

    @max_filtered_results.setter
    def max_filtered_results(self, value: int) -> None:
        """Set the maximum number of filtered results."""
        if value is None:
            value = 5
            logger.warning("Setting max_filtered_results to 5")
        self._max_filtered_results = int(value)

    @property
    def max_results(self) -> int:
        """Get the maximum number of search results."""
        return self._max_results

    @max_results.setter
    def max_results(self, value: int) -> None:
        """Set the maximum number of search results."""
        if value is None:
            value = 10
        self._max_results = max(1, int(value))
    def _get_adaptive_wait(self) -> float:
        """Get adaptive wait time from tracker."""
        wait_time = self.rate_tracker.get_wait_time(self.engine_type)
        self._last_wait_time = wait_time
        logger.debug(
            f"{self.engine_type} waiting {wait_time:.2f}s before retry"
        )
        return wait_time

    def _record_retry_outcome(self, retry_state) -> None:
        """Record outcome after retry completes."""
        success = (
            not retry_state.outcome.failed if retry_state.outcome else False
        )
        self.rate_tracker.record_outcome(
            self.engine_type,
            self._last_wait_time or 0,
            success,
            retry_state.attempt_number,
            error_type="RateLimitError" if not success else None,
            search_result_count=self._last_results_count if success else 0,
        )
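
    # Sketch of one rate-limited retry cycle wiring these two hooks together
    # (timings illustrative): attempt 1 raises RateLimitError -> tenacity calls
    # AdaptiveWait -> _get_adaptive_wait() returns e.g. 2.5s and caches it in
    # _last_wait_time -> after sleeping, attempt 2 runs -> when retrying stops,
    # _record_retry_outcome() reports the cached wait, the attempt count, and
    # the result count back to the tracker so future waits can adapt.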
    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Run the search engine with a given query, retrieving and filtering results.
        This implements a two-phase retrieval approach:
        1. Get preview information for many results
        2. Filter the previews for relevance
        3. Get full content for only the relevant results

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results with full content (if available)
        """
        # Track search call for metrics (if available and not in programmatic mode)
        tracker = None
        context_was_set = False
        if not self.programmatic_mode:
            from ..metrics.search_tracker import get_search_tracker

            # For thread-safe context propagation: if we have research_context parameter, use it
            # Otherwise, try to inherit from current thread context (normal case)
            # This allows strategies running in threads to explicitly pass context when needed
            if research_context:
                # Explicit context provided - use it and set it for this thread
                set_search_context(research_context)
                context_was_set = True

            # Get tracker after context is set (either from parameter or thread)
            tracker = get_search_tracker()

        engine_name = self.__class__.__name__.replace(
            "SearchEngine", ""
        ).lower()
        start_time = time.time()

        success = True
        error_message = None
        results_count = 0

        # Define the core search function with retry logic
        if self.rate_tracker.enabled:
            # Rate limiting enabled - use retry with adaptive wait
            @retry(
                stop=stop_after_attempt(3),
                wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
                retry=retry_if_exception_type((RateLimitError,)),
                after=self._record_retry_outcome,
                reraise=True,
            )
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()
        else:
            # Rate limiting disabled - run without retry
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()

        def _execute_search():
            nonlocal success, error_message, results_count

            try:
                # Step 1: Get preview information for items
                previews = self._get_previews(query)
                if not previews:
                    logger.info(
                        f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
                    )
                    results_count = 0
                    return []

                for preview_filter in self._preview_filters:
                    previews = preview_filter.filter_results(previews, query)

                # Step 2: Filter previews for relevance with LLM
                # Check if LLM relevance filtering should be enabled
                enable_llm_filter = getattr(
                    self, "enable_llm_relevance_filter", False
                )

                logger.info(
                    f"BaseSearchEngine: Relevance filter check - enable_llm_relevance_filter={enable_llm_filter}, has_llm={self.llm is not None}, engine_type={type(self).__name__}"
                )

                if enable_llm_filter and self.llm:
                    logger.info(
                        f"Applying LLM relevance filter to {len(previews)} previews"
                    )
                    filtered_items = self._filter_for_relevance(previews, query)
                    logger.info(
                        f"LLM filter kept {len(filtered_items)} of {len(previews)} results"
                    )
                else:
                    filtered_items = previews
                    if not enable_llm_filter:
                        logger.info(
                            f"LLM relevance filtering disabled (enable_llm_relevance_filter={enable_llm_filter}) - returning all {len(previews)} previews"
                        )
                    elif not self.llm:
                        logger.info(
                            f"No LLM available for relevance filtering - returning all {len(previews)} previews"
                        )

                # Step 3: Get full content for filtered items
                if self.search_snippets_only:
                    logger.info("Returning snippet-only results as per config")
                    results = filtered_items
                else:
                    results = self._get_full_content(filtered_items)

                for content_filter in self._content_filters:
                    results = content_filter.filter_results(results, query)

                results_count = len(results)
                self._last_results_count = results_count

                # Record success if we get here and rate limiting is enabled
                if self.rate_tracker.enabled:
                    logger.info(
                        f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
                    )
                    self.rate_tracker.record_outcome(
                        self.engine_type,
                        self._last_wait_time,
                        success=True,
                        retry_count=1,  # First attempt succeeded
                        search_result_count=results_count,
                    )
                else:
                    logger.info(
                        f"Rate limiting disabled, not recording search for {self.engine_type}"
                    )

                return results

            except RateLimitError:
                # Only re-raise if rate limiting is enabled
                if self.rate_tracker.enabled:
                    raise
                else:
                    # If rate limiting is disabled, treat as regular error
                    success = False
                    error_message = "Rate limit hit but rate limiting disabled"
                    logger.warning(
                        f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
                    )
                    results_count = 0
                    return []
            except Exception as e:
                # Other errors - don't retry
                success = False
                error_message = str(e)
                logger.exception(
                    f"Search engine {self.__class__.__name__} failed"
                )
                results_count = 0
                return []

        try:
            return _run_with_retry()
        except RetryError as e:
            # All retries exhausted
            success = False
            error_message = f"Rate limited after all retries: {e}"
            logger.exception(
                f"{self.__class__.__name__} failed after all retries"
            )
            return []
        except Exception as e:
            success = False
            error_message = str(e)
            logger.exception(f"Search engine {self.__class__.__name__} error")
            return []
        finally:
            try:
                # Record search metrics BEFORE clearing context (record_search needs it)
                if tracker is not None:
                    response_time_ms = int((time.time() - start_time) * 1000)
                    tracker.record_search(
                        engine_name=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success=success,
                        error_message=error_message,
                    )
            finally:
                # ALWAYS clean up search context, even if recording fails
                if context_was_set:
                    clear_search_context()
    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)
    def _filter_for_relevance(
        self, previews: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Filter search results by relevance to the query using the LLM.

        Args:
            previews: List of preview dictionaries
            query: The original search query

        Returns:
            Filtered list of preview dictionaries
        """
        # If no LLM or too few previews, return all
        if not self.llm or len(previews) <= 1:
            return previews

        # Log the number of previews we're processing
        logger.info(f"Filtering {len(previews)} previews for relevance")

        # Create a simple context for the LLM
        preview_context = []
        indices_used = []
        for i, preview in enumerate(previews):
            title = preview.get("title", "Untitled").strip()
            snippet = preview.get("snippet", "").strip()
            url = preview.get("url", "").strip()

            # Truncate the snippet if it is too long
            if len(snippet) > 300:
                snippet = snippet[:300] + "..."

            preview_context.append(
                f"[{i}] Title: {title}\nURL: {url}\nSnippet: {snippet}"
            )
            indices_used.append(i)

        # Log the indices we're presenting to the LLM
        logger.info(
            f"Created preview context with indices 0-{len(previews) - 1}"
        )
        logger.info(
            f"First 5 indices in prompt: {indices_used[:5]}, Last 5: {indices_used[-5:] if len(indices_used) > 5 else 'N/A'}"
        )

        # Join all previews with clear separation
        preview_text = "\n\n".join(preview_context)

        # Log a sample of what we're sending to the LLM
        logger.debug(
            f"First preview in prompt: {preview_context[0] if preview_context else 'None'}"
        )
        if len(preview_context) > 1:
            logger.debug(f"Last preview in prompt: {preview_context[-1]}")

        current_date = datetime.now(UTC).strftime("%Y-%m-%d")
        prompt = f"""Analyze these search results and select the most relevant ones for the query.

Query: "{query}"
Current date: {current_date}
Total results: {len(previews)}

Criteria for selection (in order of importance):
1. Direct relevance - MUST directly address the specific query topic, not just mention keywords
2. Quality - from reputable sources with substantive information about the query
3. Recency - prefer recent information when relevant

Search results to evaluate:
{preview_text}

Return a JSON array of indices (0-based) for results that are highly relevant to the query.
Valid indices are 0 to {len(previews) - 1}.
Only include results that directly help answer the specific question asked.
Be selective - it's better to return fewer high-quality results than many mediocre ones.
Maximum results to return: {self.max_filtered_results}
Example response: [4, 0, 2, 7]

Respond with ONLY the JSON array, no other text."""

        try:
            # Get the LLM's evaluation
            response = self.llm.invoke(prompt)

            # Log the raw response for debugging
            logger.info(f"Raw LLM response for relevance filtering: {response}")

            response_text = get_llm_response_text(response)
            logger.debug(f"Cleaned response text: {response_text}")

            ranked_indices = extract_json(response_text, expected_type=list)

            if ranked_indices is not None:
                logger.info(f"LLM returned indices: {ranked_indices}")

                # Validate that ranked_indices is a list of integers
                if not isinstance(ranked_indices, list):
                    logger.warning(
                        "LLM response is not a list, returning empty results"
                    )
                    return []

                if not all(isinstance(idx, int) for idx in ranked_indices):
                    logger.warning(
                        "LLM response contains non-integer indices, returning empty results"
                    )
                    return []

                # Log analysis of the indices
                max_index = max(ranked_indices) if ranked_indices else -1
                min_index = min(ranked_indices) if ranked_indices else -1
                logger.info(
                    f"Index analysis: min={min_index}, max={max_index}, "
                    f"valid_range=0-{len(previews) - 1}, count={len(ranked_indices)}"
                )

                # Return the results in ranked order
                ranked_results = []
                out_of_range = []
                for idx in ranked_indices:
                    if 0 <= idx < len(previews):
                        ranked_results.append(previews[idx])
                    else:
                        out_of_range.append(idx)
                        logger.warning(
                            f"Index {idx} out of range (valid: 0-{len(previews) - 1}), skipping"
                        )

                if out_of_range:
                    logger.error(
                        f"Out of range indices: {out_of_range}. "
                        f"Total previews: {len(previews)}, "
                        f"All returned indices: {ranked_indices}"
                    )

                # Limit to max_filtered_results if specified
                if (
                    self.max_filtered_results
                    and len(ranked_results) > self.max_filtered_results
                ):
                    logger.info(
                        f"Limiting filtered results to top {self.max_filtered_results}"
                    )
                    return ranked_results[: self.max_filtered_results]

                return ranked_results
            else:
                logger.warning(
                    "Could not find JSON array in response, returning original previews"
                )
                logger.debug(
                    f"Response text without JSON array: {response_text}"
                )
                return previews[: min(5, len(previews))]

        except Exception:
            logger.exception("Relevance filtering error")
            # Fall back to returning top results on error
            return previews[: min(5, len(previews))]
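
    # Sketch of the expected exchange (responses are illustrative; assumes
    # extract_json pulls the first JSON value of the expected type from text):
    #   LLM: "[2, 0, 5]"          -> previews[2], previews[0], previews[5]
    #   LLM: "Sure! [2, 99]"      -> 99 is out of range and skipped with a warning
    #   LLM: "no relevant hits"   -> extract_json yields None; top previews returned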
    @abstractmethod
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information (titles, summaries) for initial search results.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
        """
        pass

    @abstractmethod
    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        pass
    def close(self) -> None:
        """
        Close any resources held by this search engine.

        Subclasses with HTTP sessions or other resources should override this.
        The base implementation safely closes any 'session' attribute if present.
        """
        if hasattr(self, "session") and self.session is not None:
            try:
                self.session.close()
            except Exception:
                pass  # Safe cleanup - don't raise on close

    def __enter__(self):
        """Support context manager usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup on context exit."""
        self.close()
        return False
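

# Illustrative only: a minimal concrete subclass sketch, assuming canned data.
# "StaticSearchEngine" and its results are invented for illustration and are
# not part of this module or package.
#
#   class StaticSearchEngine(BaseSearchEngine):
#       """Toy engine with fixed previews; shows the two required hooks."""
#
#       def _get_previews(self, query):
#           # Phase 1: cheap metadata only ('id', 'title', 'snippet' at minimum)
#           return [
#               {"id": "1", "title": "Hello", "snippet": "hello world", "url": ""}
#           ]
#
#       def _get_full_content(self, relevant_items):
#           # Phase 2: enrich only the items that survived filtering
#           for item in relevant_items:
#               item["full_content"] = item.get("snippet", "")
#           return relevant_items
#
# Context-manager usage guarantees close() runs even if run() raises:
#
#   with StaticSearchEngine(max_results=5) as engine:
#       results = engine.run("hello")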