Coverage for src/local_deep_research/web_search_engines/search_engine_base.py: 97%
335 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import json
2import re
3import time
4from abc import ABC, abstractmethod
5from typing import Any, Dict, List, Optional, Set, Union
7from langchain_core.language_models import BaseLLM
8from loguru import logger
9from tenacity import (
10 RetryError,
11 retry,
12 retry_if_exception_type,
13 stop_after_attempt,
14)
15from tenacity.wait import wait_base
17from ..advanced_search_system.filters.base_filter import BaseFilter
18from ..config.constants import DEFAULT_MAX_FILTERED_RESULTS
19from ..config.thread_settings import get_setting_from_snapshot
20from ..utilities.thread_context import clear_search_context, set_search_context
22from .rate_limiting import RateLimitError, get_tracker
class AdaptiveWait(wait_base):
    """Tenacity wait strategy whose delay comes from a user-supplied callable.

    Instead of a fixed or exponential schedule, every wait asks
    ``get_wait_func`` for the number of seconds to sleep, which lets the
    delay track an adaptive rate limiter.
    """

    def __init__(self, get_wait_func):
        # Zero-argument callable; its return value is used as the wait.
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        # retry_state is deliberately ignored: the delay depends only on
        # what get_wait_func reports right now.
        delay = self.get_wait_func()
        return delay
class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach.

    Subclass contract for ``__init__``
    ----------------------------------
    Concrete engines should forward ``settings_snapshot`` and
    ``programmatic_mode`` to ``super().__init__`` so the base class can wire
    them up correctly. The cleanest way is to declare ``**kwargs`` and pass
    it along::

        def __init__(self, max_results=10, *, my_param=None, **kwargs):
            super().__init__(max_results=max_results, **kwargs)
            self.my_param = my_param

    Many existing engines accept ``**kwargs`` but don't forward — that
    silently drops ``programmatic_mode`` (and used to bind the wrong rate
    tracker). The factory has a post-construction safety net that calls
    ``_configure_programmatic_mode`` when the engine's mode doesn't match
    what the caller asked for, but new engines should not rely on it: the
    safety net only covers ``programmatic_mode``, not ``settings_snapshot``
    or other base kwargs.
    """

    # Class attribute to indicate if this engine searches public internet sources
    # Should be overridden by subclasses - defaults to False for safety
    is_public = False

    # Class attribute to indicate if this is a generic search engine (vs specialized)
    # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed).
    # Note: generic does NOT imply good native ranking — see is_lexical.
    is_generic = False

    # Class attribute to indicate if this is a scientific/academic search engine
    # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
    # When True, run() enriches previews with OpenAlex source IDs before
    # the preview filters execute.
    is_scientific = False

    # Class attribute to indicate if this is a local RAG/document search engine
    # Local engines search private document collections stored locally
    is_local = False

    # Class attribute to indicate if this is a news search engine
    # News engines specialize in news articles and current events
    is_news = False

    # Class attribute to indicate if this is a code search engine
    # Code engines specialize in searching code repositories
    is_code = False

    # Class attribute to indicate if this is a book/literature search engine
    # Book engines search libraries and literary archives
    is_books = False

    # Classification: does this engine use lexical/keyword-based search?
    # Lexical engines (arXiv, PubMed, Wikipedia, Mojeek, etc.) match results by
    # keywords without ML-based ranking. This is an informational flag that can
    # drive multiple behaviors (query optimization, result deduplication, UI hints).
    # For LLM relevance filtering specifically, see needs_llm_relevance_filter.
    is_lexical = False

    # Behavioral: should the factory auto-enable LLM relevance filtering?
    # When True, the factory sets enable_llm_relevance_filter=True on the engine
    # instance, causing _filter_for_relevance() to run after previews are fetched.
    # Typically set alongside is_lexical=True, but can be set independently —
    # e.g. a non-lexical engine with noisy results could opt in.
    needs_llm_relevance_filter = False

    # Tuning for the LLM relevance filter (only applies when the filter
    # is active for this engine).
    #
    # relevance_filter_batch_size: split previews into chunks of this many
    # before sending to the LLM. Smaller batches are faster per call and
    # more reliable on weaker models which struggle with many indices in
    # one context. None or 0 = single-call mode (no batching).
    #
    # relevance_filter_max_parallel_batches: number of batches to dispatch
    # concurrently against the LLM. 1 = sequential. Most providers handle
    # parallel requests fine (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
    # Anthropic).
    relevance_filter_batch_size: Optional[int] = 5
    relevance_filter_max_parallel_batches: int = 10

    # Class attribute for rate limit detection patterns
    # Subclasses can override to add engine-specific patterns.
    # Matched case-insensitively as substrings by _is_rate_limit_error().
    rate_limit_patterns: Set[str] = {
        "rate limit",
        "rate_limit",
        "ratelimit",
        "too many requests",
        "throttl",
        "quota exceeded",
        "quota_exceeded",
        "limit exceeded",
        "request limit",
        "api limit",
        "usage limit",
    }

    @staticmethod
    def _ensure_list(value, *, default=None):
        """Normalize a value that should be a list.

        Handles JSON-encoded strings, comma-separated strings, and
        already-parsed lists:

        - ``None``, blank strings and unsupported types yield *default*
          (an empty list when not supplied).
        - Lists are returned unchanged (same object, items untouched).
        - A string starting with ``[`` is first parsed as a JSON array whose
          items are converted with ``str``; if parsing fails it falls back
          to comma splitting.
        - Other strings are split on commas, items stripped, empties dropped.
        """
        if default is None:
            default = []
        if value is None:
            return default
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            stripped = value.strip()
            if not stripped:
                return default
            if stripped.startswith("["):
                try:
                    parsed = json.loads(stripped)
                    if isinstance(parsed, list):
                        return [str(item) for item in parsed]
                except (json.JSONDecodeError, ValueError, RecursionError):
                    # Not valid JSON - fall through to comma splitting.
                    pass
            return [
                item.strip() for item in stripped.split(",") if item.strip()
            ]
        return default

    @classmethod
    def _load_engine_class(cls, name: str, config: Dict[str, Any]):
        """
        Helper method to load an engine class dynamically.

        The import goes through the module whitelist, so only approved
        modules can be loaded.

        Args:
            name: Engine name
            config: Engine configuration dict with module_path and class_name

        Returns:
            Tuple of (success: bool, engine_class or None, error_msg or None)
        """
        from ..security.module_whitelist import (
            ModuleNotAllowedError,
            get_safe_module_class,
        )

        try:
            module_path = config.get("module_path")
            class_name = config.get("class_name")

            if not module_path or not class_name:
                return (
                    False,
                    None,
                    f"Missing module_path or class_name for {name}",
                )

            # Use whitelist-validated safe import
            engine_class = get_safe_module_class(module_path, class_name)

            return True, engine_class, None

        except ModuleNotAllowedError as e:
            return (
                False,
                None,
                f"Security error loading engine class for {name}: {e}",
            )
        except Exception as e:
            return False, None, f"Could not load engine class for {name}: {e}"

    @classmethod
    def _check_api_key_availability(
        cls, name: str, config: Dict[str, Any]
    ) -> bool:
        """
        Helper method to check if an engine's API key is available and valid.

        Placeholder detection is delegated to ``_is_valid_api_key`` so this
        availability check and ``_resolve_api_key`` agree on what counts as
        a real key.

        Args:
            name: Engine name
            config: Engine configuration dict

        Returns:
            True if API key is not required or is available and valid
        """
        if not config.get("requires_api_key", False):
            return True

        # ``config.get("api_key", "")`` still returns None when the key is
        # present with a null value; normalize so validation cannot crash.
        raw_key = config.get("api_key")
        api_key = "" if raw_key is None else str(raw_key)

        if not cls._is_valid_api_key(api_key):
            logger.debug(
                f"Skipping {name} - requires API key but none configured"
            )
            return False

        return True

    def __init__(
        self,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        max_results: Optional[int] = 10,  # Default value if not provided
        preview_filters: List[BaseFilter] | None = None,
        content_filters: List[BaseFilter] | None = None,
        search_snippets_only: bool = True,  # New parameter with default
        include_full_content: bool = False,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the search engine with common parameters.

        Args:
            llm: Optional language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after
                filtering (None -> DEFAULT_MAX_FILTERED_RESULTS)
            max_results: Maximum number of search results to return
                (None -> 10; values below 1 are clamped to 1)
            preview_filters: Filters that will be applied to all previews
                produced by the search engine, before relevancy checks.
            content_filters: Filters that will be applied to the full content
                produced by the search engine, after relevancy checks.
            search_snippets_only: Whether to return only snippets or full content
            include_full_content: Whether to use FullSearchResults for full webpage content
            settings_snapshot: Settings snapshot for configuration
            programmatic_mode: If True, disables database operations and uses memory-only tracking
            **kwargs: Additional engine-specific parameters (accepted and
                ignored by the base class)
        """
        if max_filtered_results is None:
            max_filtered_results = DEFAULT_MAX_FILTERED_RESULTS
        if max_results is None:
            max_results = 10
        self._preview_filters: List[BaseFilter] = (
            preview_filters if preview_filters is not None else []
        )
        self._content_filters: List[BaseFilter] = (
            content_filters if content_filters is not None else []
        )

        self.llm = llm  # LLM for relevance filtering
        self._max_filtered_results = int(max_filtered_results)
        # Clamp to at least one result.
        self._max_results = max(1, int(max_results))
        self.search_snippets_only = search_snippets_only
        self.include_full_content = include_full_content
        self.settings_snapshot = settings_snapshot or {}

        self.engine_type = self.__class__.__name__
        # Must run after settings_snapshot is stored: the programmatic
        # tracker is built from it.
        self._configure_programmatic_mode(programmatic_mode)
        # Default to 0 for successful searches without rate limiting
        self._last_wait_time = 0.0
        self._last_results_count = 0

    def _configure_programmatic_mode(self, programmatic_mode: bool) -> None:
        """Set ``programmatic_mode`` and (re)bind the matching rate tracker.

        Called from ``__init__`` and from the factory as a fallback when an
        engine subclass swallows the ``programmatic_mode`` kwarg without
        forwarding it to ``super().__init__``. Safe to call after init —
        rebinding ``rate_tracker`` discards the previous tracker (no
        resources to release) and the new one starts with empty in-memory
        caches.
        """
        self.programmatic_mode = programmatic_mode
        if programmatic_mode:
            from .rate_limiting.tracker import AdaptiveRateLimitTracker

            self.rate_tracker = AdaptiveRateLimitTracker(
                settings_snapshot=self.settings_snapshot,
                programmatic_mode=programmatic_mode,
            )
        else:
            self.rate_tracker = get_tracker()

    @property
    def max_filtered_results(self) -> int:
        """Get the maximum number of filtered results."""
        return self._max_filtered_results

    @max_filtered_results.setter
    def max_filtered_results(self, value: int) -> None:
        """Set the maximum number of filtered results (None -> default)."""
        if value is None:
            value = DEFAULT_MAX_FILTERED_RESULTS
            logger.warning(
                f"Setting max_filtered_results to {DEFAULT_MAX_FILTERED_RESULTS}"
            )
        self._max_filtered_results = int(value)

    @property
    def max_results(self) -> int:
        """Get the maximum number of search results."""
        return self._max_results

    @max_results.setter
    def max_results(self, value: int) -> None:
        """Set the maximum number of search results (None -> 10, min 1)."""
        if value is None:
            value = 10
        self._max_results = max(1, int(value))

    def _get_adaptive_wait(self) -> float:
        """Get adaptive wait time from tracker.

        The value is also remembered in ``_last_wait_time`` so the outcome
        recorded afterwards reports the wait that was actually used.
        """
        wait_time = self.rate_tracker.get_wait_time(self.engine_type)
        self._last_wait_time = wait_time
        logger.debug(
            f"{self.engine_type} waiting {wait_time:.2f}s before retry"
        )
        return wait_time

    def _record_retry_outcome(self, retry_state) -> None:
        """Record an attempt's outcome with the rate tracker.

        Installed as the tenacity ``after`` hook in ``run``.
        """
        success = (
            not retry_state.outcome.failed if retry_state.outcome else False
        )
        self.rate_tracker.record_outcome(
            self.engine_type,
            self._last_wait_time or 0,
            success,
            retry_state.attempt_number,
            error_type="RateLimitError" if not success else None,
            search_result_count=self._last_results_count if success else 0,
        )

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Run the search engine with a given query, retrieving and filtering results.

        Pipeline:
        1. Get preview information for many results (``_get_previews``)
        2. Filter the previews: registered preview filters, then the
           optional LLM relevance filter
        3. Get full content for only the relevant results (skipped when
           ``search_snippets_only``), then apply content filters

        Search errors are logged and an empty list is returned instead of
        raising. When the rate tracker is enabled, an attempt that raises
        ``RateLimitError`` is retried (up to 3 attempts) with an adaptive
        wait.

        Args:
            query: The search query
            research_context: Context from previous research to use. Outside
                programmatic mode it is installed as this thread's search
                context for the duration of the call.

        Returns:
            List of search results with full content (if available)
        """
        logger.info(f"---Execute a search using {self.__class__.__name__}---")

        # Track search call for metrics (if available and not in programmatic mode)
        should_record_metrics = False
        context_was_set = False
        if not self.programmatic_mode:
            from ..metrics.search_tracker import SearchTracker

            should_record_metrics = True

            # For thread-safe context propagation: if we have research_context parameter, use it
            # Otherwise, try to inherit from current thread context (normal case)
            # This allows strategies running in threads to explicitly pass context when needed
            if research_context:
                # Explicit context provided - use it and set it for this thread
                set_search_context(research_context)
                context_was_set = True

        engine_name = self.__class__.__name__.replace(
            "SearchEngine", ""
        ).lower()
        start_time = time.time()

        success = True
        error_message = None
        results_count = 0

        def _execute_search():
            """One full search attempt; updates the enclosing metrics vars."""
            nonlocal success, error_message, results_count

            try:
                # Step 1: Get preview information for items
                previews = self._get_previews(query)
                if not previews:
                    logger.info(
                        f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
                    )
                    results_count = 0
                    return []

                # Pre-filter enrichment: resolve DOIs to OpenAlex source
                # IDs BEFORE the preview filters run. The
                # JournalReputationFilter is registered as a preview
                # filter and uses ``result["openalex_source_id"]`` for
                # Tier 2 lookups; running enrichment afterwards (as the
                # old pipeline did) left the field empty at filter time,
                # silently forcing a fragile-name-match fallback. Only
                # for scientific engines whose results carry DOIs.
                if getattr(self, "is_scientific", False):
                    try:
                        from ..utilities.openalex_enrichment import (
                            enrich_results_with_source_ids,
                        )

                        email = getattr(self, "email", None)
                        previews = enrich_results_with_source_ids(
                            previews, email=email
                        )
                    except Exception:
                        # Best-effort: enrichment failure must not fail
                        # the search.
                        logger.debug(
                            "DOI enrichment skipped (import or call failed)"
                        )

                for preview_filter in self._preview_filters:
                    previews = preview_filter.filter_results(previews, query)

                # Step 2: Filter previews for relevance with LLM
                enable_llm_filter = getattr(
                    self, "enable_llm_relevance_filter", False
                )

                if enable_llm_filter and self.llm:
                    filtered_items = self._filter_for_relevance(previews, query)
                else:
                    filtered_items = previews
                    logger.debug(
                        f"[{type(self).__name__}] Relevance filter skipped "
                        f"(enabled={enable_llm_filter}, "
                        f"llm={'yes' if self.llm else 'no'})"
                    )

                # Step 3: Get full content for filtered items
                if self.search_snippets_only:
                    logger.info("Returning snippet-only results as per config")
                    results = filtered_items
                else:
                    results = self._get_full_content(filtered_items)

                for content_filter in self._content_filters:
                    results = content_filter.filter_results(results, query)

                results_count = len(results)
                self._last_results_count = results_count

                # Record success if we get here and rate limiting is enabled
                if self.rate_tracker.enabled:
                    logger.info(
                        f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
                    )
                    self.rate_tracker.record_outcome(
                        self.engine_type,
                        self._last_wait_time,
                        success=True,
                        retry_count=1,  # First attempt succeeded
                        search_result_count=results_count,
                    )
                else:
                    logger.info(
                        f"Rate limiting disabled, not recording search for {self.engine_type}"
                    )

                return results

            except RateLimitError:
                # Only re-raise if rate limiting is enabled (so the retry
                # wrapper can back off and try again)
                if self.rate_tracker.enabled:
                    raise
                # If rate limiting is disabled, treat as regular error
                success = False
                error_message = "Rate limit hit but rate limiting disabled"
                logger.warning(
                    f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
                )
                results_count = 0
                return []
            except Exception as e:
                # Other errors - don't retry
                success = False
                error_message = str(e)
                logger.exception(
                    f"Search engine {self.__class__.__name__} failed"
                )
                results_count = 0
                return []

        # Define the core search function with retry logic
        if self.rate_tracker.enabled:
            # Rate limiting enabled - use retry with adaptive wait
            @retry(
                stop=stop_after_attempt(3),
                wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
                retry=retry_if_exception_type((RateLimitError,)),
                after=self._record_retry_outcome,
                reraise=True,
            )
            def _run_with_retry():
                return _execute_search()
        else:
            # Rate limiting disabled - run without retry
            def _run_with_retry():
                return _execute_search()

        try:
            return _run_with_retry()  # type: ignore[no-any-return]
        except (RetryError, RateLimitError) as e:
            # All retries exhausted. With ``reraise=True`` tenacity re-raises
            # the last RateLimitError itself instead of wrapping it in a
            # RetryError, so both must be treated as "out of retries".
            success = False
            error_message = f"Rate limited after all retries: {e}"
            logger.exception(
                f"{self.__class__.__name__} failed after all retries"
            )
            return []
        except Exception as e:
            success = False
            error_message = str(e)
            logger.exception(f"Search engine {self.__class__.__name__} error")
            return []
        finally:
            try:
                # Record search metrics BEFORE clearing context (record_search needs it)
                if should_record_metrics:
                    response_time_ms = int((time.time() - start_time) * 1000)
                    SearchTracker.record_search(
                        engine_name=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success=success,
                        error_message=error_message,
                    )
            finally:
                try:
                    # Clean up temporary search result storage
                    for attr in self._temp_attributes():
                        if hasattr(self, attr):
                            try:
                                delattr(self, attr)
                            except AttributeError:
                                # hasattr() is also true for class-level
                                # attributes and read-only properties,
                                # which cannot be deleted per instance.
                                pass
                finally:
                    # ALWAYS clean up search context, even if recording
                    # or attribute cleanup fails
                    if context_was_set:
                        clear_search_context()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    def _filter_for_relevance(
        self, previews: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Filter search results by relevance using the LLM.

        Delegates to the ``relevance_filter`` module, which prompts the
        LLM for a plain-text list of relevant indices and parses them
        with a regex (no structured output). Without an LLM, or with at
        most one preview, the previews are returned unchanged.

        Args:
            previews: List of preview dictionaries
            query: The original search query

        Returns:
            Filtered list of preview dictionaries
        """
        engine_name = type(self).__name__

        if not self.llm or len(previews) <= 1:
            logger.debug(
                f"[{engine_name}] Skipping relevance filter "
                f"(llm={'yes' if self.llm else 'no'}, "
                f"previews={len(previews)})"
            )
            return previews

        from .relevance_filter import filter_previews_for_relevance

        return filter_previews_for_relevance(
            llm=self.llm,
            previews=previews,
            query=query,
            max_filtered_results=self.max_filtered_results,
            engine_name=engine_name,
            batch_size=self.relevance_filter_batch_size,
            max_parallel_batches=self.relevance_filter_max_parallel_batches,
        )

    # =========================================================================
    # Shared Helper Methods for Subclasses
    # =========================================================================

    @staticmethod
    def _is_valid_api_key(api_key: Optional[str]) -> bool:
        """
        Check if an API key is valid (not a placeholder value).

        Args:
            api_key: The API key to validate

        Returns:
            True if the key appears to be a real API key, False if it's a placeholder

        Example:
            >>> BaseSearchEngine._is_valid_api_key("sk-abc123")
            True
            >>> BaseSearchEngine._is_valid_api_key("YOUR_API_KEY_HERE")
            False
        """
        if not api_key:
            return False

        api_key = api_key.strip()

        # Empty or whitespace-only
        if not api_key:
            return False

        # Common placeholder values
        placeholders = {
            "",
            "None",
            "null",
            "PLACEHOLDER",
            "YOUR_API_KEY_HERE",
            "YOUR_API_KEY",
            "API_KEY",
            "your_api_key",
            "your-api-key",
        }

        if api_key in placeholders:
            return False

        # Patterns that indicate placeholders
        if api_key.endswith("_API_KEY"):  # e.g. BRAVE_API_KEY
            return False
        if api_key.startswith("YOUR_"):
            return False
        if api_key.startswith("<") and api_key.endswith(">"):
            return False
        if api_key.startswith("${") and api_key.endswith("}"):
            return False

        return True

    def _resolve_api_key(
        self,
        api_key: Optional[str],
        setting_key: str,
        engine_name: str = "search engine",
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Resolve an API key from multiple sources with priority order.

        Environment variables are handled automatically by SettingsManager
        when building the settings snapshot, so they don't need to be
        checked separately here.

        Priority order:
        1. Direct parameter (api_key argument)
        2. Settings snapshot (via setting_key)

        Args:
            api_key: API key passed directly as parameter
            setting_key: Key to look up in settings snapshot (e.g., "search.brave_api_key")
            engine_name: Human-readable engine name for error messages
            settings_snapshot: Optional settings snapshot dict (uses self.settings_snapshot if not provided)

        Returns:
            The resolved API key string, whitespace-stripped

        Raises:
            ValueError: If no valid API key is found from any source

        Example:
            >>> engine._resolve_api_key(
            ...     api_key=None,
            ...     setting_key="search.brave_api_key",
            ...     engine_name="Brave Search"
            ... )
            "sk-abc123..."
        """
        # Use instance settings snapshot if not provided
        if settings_snapshot is None:
            settings_snapshot = self.settings_snapshot

        # Priority 1: Direct parameter
        if self._is_valid_api_key(api_key) and api_key is not None:
            return api_key.strip()

        # Priority 2: Settings snapshot (includes env var overrides via SettingsManager)
        if settings_snapshot:
            settings_value = get_setting_from_snapshot(
                setting_key,
                default=None,
                settings_snapshot=settings_snapshot,
            )
            if self._is_valid_api_key(settings_value):
                return settings_value.strip() if settings_value else ""

        # No valid API key found; mask the rejected value before it reaches
        # the error message.
        masked_key = self._mask_api_key(str(api_key)) if api_key else "None"
        raise ValueError(
            f"No valid API key found for {engine_name}. "
            f"Checked: direct parameter ({masked_key}), "
            f"settings key '{setting_key}'. "
            f"Please provide a valid API key."
        )

    def _is_rate_limit_error(
        self,
        error: Union[Exception, str, int],
        additional_patterns: Optional[Set[str]] = None,
    ) -> bool:
        """
        Detect if an error is a rate limit error.

        Checks multiple sources for rate limit indicators:
        - HTTP status code 429 (int input, ``error.status_code``, or
          ``error.response.status_code``)
        - Error messages containing rate limit phrases (case-insensitive)

        Args:
            error: The error to check (Exception, string, or HTTP status code)
            additional_patterns: Optional set of additional patterns to match

        Returns:
            True if the error appears to be a rate limit error

        Example:
            >>> engine._is_rate_limit_error(429)
            True
            >>> engine._is_rate_limit_error("Rate limit exceeded")
            True
            >>> engine._is_rate_limit_error(ValueError("Invalid input"))
            False
        """
        # Check integer status code directly
        if isinstance(error, int):
            return error == 429

        # Combine default and additional patterns (copy so the class-level
        # set is never mutated)
        patterns = self.rate_limit_patterns.copy()
        if additional_patterns:
            patterns.update(additional_patterns)

        # Convert to string for pattern matching
        error_str = ""
        status_code = None

        if isinstance(error, str):
            error_str = error
        elif isinstance(error, Exception):
            error_str = str(error)

            # Check for HTTP status code in common HTTP error types. Some
            # exceptions define ``status_code`` but leave it None while the
            # real code lives on ``response``, so fall back in that case.
            status_code = getattr(error, "status_code", None)
            if status_code is None:
                response = getattr(error, "response", None)
                status_code = getattr(response, "status_code", None)

        # Check status code first
        if status_code == 429:
            return True

        # Case-insensitive pattern matching
        error_lower = error_str.lower()
        return any(pattern.lower() in error_lower for pattern in patterns)

    def _raise_if_rate_limit(
        self,
        error: Union[Exception, str, int],
        additional_patterns: Optional[Set[str]] = None,
    ) -> None:
        """
        Raise RateLimitError if the given error is a rate limit error.

        Convenience method that combines _is_rate_limit_error check with
        raising RateLimitError. The message is passed through
        ``_sanitize_error_message`` so credentials are not leaked.

        Args:
            error: The error to check
            additional_patterns: Optional set of additional patterns to match

        Raises:
            RateLimitError: If the error is detected as a rate limit error

        Example:
            >>> try:
            ...     response = make_api_call()
            ... except Exception as e:
            ...     engine._raise_if_rate_limit(e)
        """
        if self._is_rate_limit_error(error, additional_patterns):
            error_msg = str(error) if not isinstance(error, str) else error
            raise RateLimitError(
                f"Rate limit detected: {self._sanitize_error_message(error_msg)}"
            )

    def _extract_full_result(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract the full result from an item that may contain a _full_result key.

        This is a helper for the default _get_full_content implementation.
        It extracts data from the _full_result key if present, otherwise uses
        the item directly, and removes the internal _full_result key.

        Args:
            item: A search result item that may contain a _full_result key

        Returns:
            A new dictionary with the full result data, without the _full_result key

        Example:
            >>> engine._extract_full_result({"title": "A", "_full_result": {"title": "A", "content": "Full"}})
            {"title": "A", "content": "Full"}
        """
        source = item.get("_full_result")
        if source is None:
            source = item
        return {k: v for k, v in source.items() if k != "_full_result"}

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.

        Default implementation extracts data from _full_result keys if present.
        Subclasses can override this method to fetch additional content from
        external sources (e.g., web scraping, API calls).

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content

        Example:
            >>> engine._get_full_content([
            ...     {"title": "A", "_full_result": {"title": "A", "content": "Full A"}},
            ...     {"title": "B"}
            ... ])
            [{"title": "A", "content": "Full A"}, {"title": "B"}]
        """
        if not relevant_items:
            return []
        return [self._extract_full_result(item) for item in relevant_items]

    def _init_full_search(
        self,
        web_search=None,
        language="en",
        max_results=10,
        region=None,
        time_period=None,
        safe_search=None,
    ):
        """Initialize FullSearchResults if include_full_content is True.

        Only takes effect when an LLM is also configured. If the
        FullSearchResults module cannot be imported, full content retrieval
        is switched off (``include_full_content`` is set to False).

        Call this at the end of your __init__ after setting up your search wrapper.

        Args:
            web_search: The search wrapper/engine to pass to FullSearchResults
            language: Language for search results
            max_results: Maximum number of results
            region: Region/country code for results
            time_period: Time period filter
            safe_search: Safe search setting (string value for FullSearchResults)
        """
        if self.include_full_content and self.llm:
            try:
                from .engines.full_search import FullSearchResults

                self.full_search = FullSearchResults(
                    llm=self.llm,
                    web_search=web_search,
                    language=language,
                    max_results=max_results,
                    region=region,
                    time=time_period,
                    safesearch=safe_search,
                    settings_snapshot=self.settings_snapshot,
                )
            except ImportError:
                logger.warning(
                    "FullSearchResults not available. "
                    "Full content retrieval disabled."
                )
                self.include_full_content = False

    def _temp_attributes(self):
        """Return list of temporary attribute names to clean up after run().

        Override in subclasses that store additional temporary data.
        """
        return ["_search_results"]

    def _sanitize_error_message(self, message: str) -> str:
        """
        Remove/mask API keys, tokens, and secrets from error messages.

        Uses pattern matching for common credential formats: Bearer tokens,
        ``api_key``/``apikey``/``key``/``token``/``secret`` URL query
        parameters, ``user:pass@`` URL credentials, and ``sk-``/``pk-``
        keys with at least 20 alphanumeric characters after the prefix.

        Args:
            message: The error message to sanitize

        Returns:
            Sanitized message with sensitive data redacted (falsy input is
            returned unchanged)

        Example:
            >>> engine._sanitize_error_message(
            ...     "Error with key sk-abcdefghijklmnopqrstuvwxyz"
            ... )
            'Error with key [REDACTED_KEY]'
        """
        if not message:
            return message

        sanitized = message

        # Regex patterns for common credential formats
        patterns = [
            # Bearer tokens
            (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", "Bearer [REDACTED]"),
            # API keys in URLs
            (
                r"([?&])(api_key|apikey|key|token|secret)=([A-Za-z0-9\-._~+/]+)",
                r"\1\2=[REDACTED]",
            ),
            # URL credentials (user:pass@host)
            (r"(https?://)([^:\s]+):([^@\s]+)@", r"\1[REDACTED]:[REDACTED]@"),
            # Common API key patterns (sk-*, pk-*, etc.)
            (r"\b(sk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"),
            (r"\b(pk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"),
        ]

        for pattern, replacement in patterns:
            sanitized = re.sub(pattern, replacement, sanitized)

        return sanitized

    def _mask_api_key(self, api_key: str, visible_chars: int = 4) -> str:
        """
        Mask an API key for safe logging, showing only first and last characters.

        Args:
            api_key: The API key to mask
            visible_chars: Number of characters to show at start and end

        Returns:
            Masked API key such as "sk-a...3456" (``visible_chars``
            characters on each side), or "***" for empty keys and keys of
            at most ``2 * visible_chars`` characters

        Example:
            >>> engine._mask_api_key("sk-abcdefghijklmnop123456")
            "sk-a...3456"
            >>> engine._mask_api_key("short")
            "***"
        """
        if not api_key:
            return "***"

        api_key = str(api_key).strip()

        # Too short to reveal anything without exposing most of the key.
        if len(api_key) <= visible_chars * 2:
            return "***"

        return f"{api_key[:visible_chars]}...{api_key[-visible_chars:]}"

    @abstractmethod
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information (titles, summaries) for initial search results.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
        """
        pass

    def close(self) -> None:
        """
        Close any resources held by this search engine.

        Subclasses with HTTP sessions or other resources should override this.
        The base implementation safely closes any 'session' attribute if present
        and closes both preview and content filters that hold resources.
        """
        from ..utilities.resource_utils import safe_close

        if hasattr(self, "session") and self.session is not None:
            safe_close(self.session, "HTTP session")
        # Close preview filters as well as content filters — the journal
        # reputation filter is registered as a preview filter on academic
        # engines (arxiv, pubmed, openalex, nasa_ads, semantic_scholar)
        # and holds a SearXNG engine + LLM client that need releasing.
        if hasattr(self, "_preview_filters"):
            for preview_filter in self._preview_filters:
                safe_close(preview_filter, "preview filter")
        if hasattr(self, "_content_filters"):
            for content_filter in self._content_filters:
                safe_close(content_filter, "content filter")

    def __enter__(self):
        """Support context manager usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup on context exit; exceptions are not suppressed."""
        self.close()
        return False