Coverage for src / local_deep_research / web_search_engines / search_engine_base.py: 98%
322 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import json
2import re
3import time
4from abc import ABC, abstractmethod
5from typing import Any, Dict, List, Optional, Set, Union
7from langchain_core.language_models import BaseLLM
8from loguru import logger
9from tenacity import (
10 RetryError,
11 retry,
12 retry_if_exception_type,
13 stop_after_attempt,
14)
15from tenacity.wait import wait_base
17from ..advanced_search_system.filters.base_filter import BaseFilter
18from ..config.thread_settings import get_setting_from_snapshot
19from ..utilities.thread_context import clear_search_context, set_search_context
21from .rate_limiting import RateLimitError, get_tracker
class AdaptiveWait(wait_base):
    """Custom wait strategy that uses adaptive rate limiting.

    Rather than a fixed or exponential backoff, each retry consults a
    caller-supplied function for the currently recommended delay.
    """

    def __init__(self, get_wait_func):
        # Zero-argument callable returning the delay in seconds.
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        # tenacity passes the RetryCallState, but the adaptive delay
        # does not depend on it.
        return self.get_wait_func()
class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach.
    """

    # Class attribute to indicate if this engine searches public internet sources
    # Should be overridden by subclasses - defaults to False for safety
    is_public: bool = False

    # Class attribute to indicate if this is a generic search engine (vs specialized)
    # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed).
    # Note: generic does NOT imply good native ranking — see is_lexical.
    is_generic: bool = False

    # Class attribute to indicate if this is a scientific/academic search engine
    # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
    is_scientific: bool = False

    # Class attribute to indicate if this is a local RAG/document search engine
    # Local engines search private document collections stored locally
    is_local: bool = False

    # Class attribute to indicate if this is a news search engine
    # News engines specialize in news articles and current events
    is_news: bool = False

    # Class attribute to indicate if this is a code search engine
    # Code engines specialize in searching code repositories
    is_code: bool = False

    # Class attribute to indicate if this is a book/literature search engine
    # Book engines search libraries and literary archives
    is_books: bool = False

    # Classification: does this engine use lexical/keyword-based search?
    # Lexical engines (arXiv, PubMed, Wikipedia, Mojeek, etc.) match results by
    # keywords without ML-based ranking. This is an informational flag that can
    # drive multiple behaviors (query optimization, result deduplication, UI hints).
    # For LLM relevance filtering specifically, see needs_llm_relevance_filter.
    is_lexical: bool = False

    # Behavioral: should the factory auto-enable LLM relevance filtering?
    # When True, the factory sets enable_llm_relevance_filter=True on the engine
    # instance, causing _filter_for_relevance() to run after previews are fetched.
    # Typically set alongside is_lexical=True, but can be set independently —
    # e.g. a non-lexical engine with noisy results could opt in.
    needs_llm_relevance_filter: bool = False

    # Tuning for the LLM relevance filter (only applies when the filter
    # is active for this engine).
    #
    # relevance_filter_batch_size: split previews into chunks of this many
    # before sending to the LLM. Smaller batches are faster per call and
    # more reliable on weaker models which struggle with many indices in
    # one context. None or 0 = single-call mode (no batching).
    #
    # relevance_filter_max_parallel_batches: number of batches to dispatch
    # concurrently against the LLM. 1 = sequential. Most providers handle
    # parallel requests fine (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
    # Anthropic).
    relevance_filter_batch_size: Optional[int] = 5
    relevance_filter_max_parallel_batches: int = 10

    # Class attribute for rate limit detection patterns
    # Subclasses can override to add engine-specific patterns.
    # Matched case-insensitively against error text in _is_rate_limit_error.
    rate_limit_patterns: Set[str] = {
        "rate limit",
        "rate_limit",
        "ratelimit",
        "too many requests",
        "throttl",
        "quota exceeded",
        "quota_exceeded",
        "limit exceeded",
        "request limit",
        "api limit",
        "usage limit",
    }
114 @staticmethod
115 def _ensure_list(value, *, default=None):
116 """Normalize a value that should be a list.
118 Handles JSON-encoded strings, comma-separated strings, and
119 already-parsed lists. Returns *default* (empty list when not
120 supplied) for ``None`` or empty/unparseable input.
121 """
122 if default is None:
123 default = []
124 if value is None:
125 return default
126 if isinstance(value, list):
127 return value
128 if isinstance(value, str):
129 stripped = value.strip()
130 if not stripped:
131 return default
132 if stripped.startswith("["):
133 try:
134 parsed = json.loads(stripped)
135 if isinstance(parsed, list): 135 ↛ 139line 135 didn't jump to line 139 because the condition on line 135 was always true
136 return [str(item) for item in parsed]
137 except (json.JSONDecodeError, ValueError, RecursionError):
138 pass
139 return [
140 item.strip() for item in stripped.split(",") if item.strip()
141 ]
142 return default
144 @classmethod
145 def _load_engine_class(cls, name: str, config: Dict[str, Any]):
146 """
147 Helper method to load an engine class dynamically.
149 Args:
150 name: Engine name
151 config: Engine configuration dict with module_path and class_name
153 Returns:
154 Tuple of (success: bool, engine_class or None, error_msg or None)
155 """
156 from ..security.module_whitelist import (
157 ModuleNotAllowedError,
158 get_safe_module_class,
159 )
161 try:
162 module_path = config.get("module_path")
163 class_name = config.get("class_name")
165 if not module_path or not class_name:
166 return (
167 False,
168 None,
169 f"Missing module_path or class_name for {name}",
170 )
172 # Use whitelist-validated safe import
173 engine_class = get_safe_module_class(module_path, class_name)
175 return True, engine_class, None
177 except ModuleNotAllowedError as e:
178 return (
179 False,
180 None,
181 f"Security error loading engine class for {name}: {e}",
182 )
183 except Exception as e:
184 return False, None, f"Could not load engine class for {name}: {e}"
186 @classmethod
187 def _check_api_key_availability(
188 cls, name: str, config: Dict[str, Any]
189 ) -> bool:
190 """
191 Helper method to check if an engine's API key is available and valid.
193 Args:
194 name: Engine name
195 config: Engine configuration dict
197 Returns:
198 True if API key is not required or is available and valid
199 """
200 from loguru import logger
202 if not config.get("requires_api_key", False):
203 return True
205 api_key = config.get("api_key", "").strip()
207 # Check for common placeholder values
208 if (
209 not api_key
210 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"]
211 or api_key.endswith(
212 "_API_KEY"
213 ) # Default placeholders like BRAVE_API_KEY
214 or api_key.startswith("YOUR_")
215 or api_key == "null"
216 ):
217 logger.debug(
218 f"Skipping {name} - requires API key but none configured"
219 )
220 return False
222 return True
    def __init__(
        self,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        max_results: Optional[int] = 10,  # Default value if not provided
        preview_filters: List[BaseFilter] | None = None,
        content_filters: List[BaseFilter] | None = None,
        search_snippets_only: bool = True,  # New parameter with default
        include_full_content: bool = False,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the search engine with common parameters.

        Args:
            llm: Optional language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results to return
            preview_filters: Filters that will be applied to all previews
                produced by the search engine, before relevancy checks.
            content_filters: Filters that will be applied to the full content
                produced by the search engine, after relevancy checks.
            search_snippets_only: Whether to return only snippets or full content
            include_full_content: Whether to use FullSearchResults for full webpage content
            settings_snapshot: Settings snapshot for configuration
            programmatic_mode: If True, disables database operations and uses memory-only tracking
            **kwargs: Additional engine-specific parameters (ignored here;
                consumed by subclasses)
        """
        # Normalize None to defaults so the int() conversions below are safe.
        if max_filtered_results is None:
            max_filtered_results = 5
        if max_results is None:
            max_results = 10
        self._preview_filters: List[BaseFilter] = (
            preview_filters if preview_filters is not None else []
        )
        self._content_filters: List[BaseFilter] = (
            content_filters if content_filters is not None else []
        )

        self.llm = llm  # LLM for relevance filtering
        self._max_filtered_results = int(
            max_filtered_results
        )  # Ensure it's an integer
        self._max_results = max(
            1, int(max_results)
        )  # Ensure it's a positive integer
        self.search_snippets_only = search_snippets_only  # Store the setting
        self.include_full_content = include_full_content
        self.settings_snapshot = (
            settings_snapshot or {}
        )  # Store settings snapshot
        self.programmatic_mode = programmatic_mode

        # Rate limiting attributes
        self.engine_type = self.__class__.__name__
        # Create a tracker with our settings if in programmatic mode
        # (memory-only, no database); otherwise use the shared tracker.
        if self.programmatic_mode:
            from .rate_limiting.tracker import AdaptiveRateLimitTracker

            self.rate_tracker = AdaptiveRateLimitTracker(
                settings_snapshot=self.settings_snapshot,
                programmatic_mode=self.programmatic_mode,
            )
        else:
            self.rate_tracker = get_tracker()
        self._last_wait_time = (
            0.0  # Default to 0 for successful searches without rate limiting
        )
        self._last_results_count = 0
    @property
    def max_filtered_results(self) -> int:
        """Get the maximum number of filtered results.

        Upper bound on results kept after LLM relevance filtering.
        """
        return self._max_filtered_results
301 @max_filtered_results.setter
302 def max_filtered_results(self, value: int) -> None:
303 """Set the maximum number of filtered results."""
304 if value is None:
305 value = 5
306 logger.warning("Setting max_filtered_results to 5")
307 self._max_filtered_results = int(value)
    @property
    def max_results(self) -> int:
        """Get the maximum number of search results.

        Always at least 1 (enforced by __init__ and the setter).
        """
        return self._max_results
314 @max_results.setter
315 def max_results(self, value: int) -> None:
316 """Set the maximum number of search results."""
317 if value is None:
318 value = 10
319 self._max_results = max(1, int(value))
321 def _get_adaptive_wait(self) -> float:
322 """Get adaptive wait time from tracker."""
323 wait_time = self.rate_tracker.get_wait_time(self.engine_type)
324 self._last_wait_time = wait_time
325 logger.debug(
326 f"{self.engine_type} waiting {wait_time:.2f}s before retry"
327 )
328 return wait_time
330 def _record_retry_outcome(self, retry_state) -> None:
331 """Record outcome after retry completes."""
332 success = (
333 not retry_state.outcome.failed if retry_state.outcome else False
334 )
335 self.rate_tracker.record_outcome(
336 self.engine_type,
337 self._last_wait_time or 0,
338 success,
339 retry_state.attempt_number,
340 error_type="RateLimitError" if not success else None,
341 search_result_count=self._last_results_count if success else 0,
342 )
    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Run the search engine with a given query, retrieving and filtering results.
        This implements a two-phase retrieval approach:
        1. Get preview information for many results
        2. Filter the previews for relevance
        3. Get full content for only the relevant results

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results with full content (if available).
            Returns an empty list on any failure (errors are logged,
            not raised).
        """
        logger.info(f"---Execute a search using {self.__class__.__name__}---")

        # Track search call for metrics (if available and not in programmatic mode)
        should_record_metrics = False
        context_was_set = False
        if not self.programmatic_mode:
            from ..metrics.search_tracker import SearchTracker

            should_record_metrics = True

        # For thread-safe context propagation: if we have research_context parameter, use it
        # Otherwise, try to inherit from current thread context (normal case)
        # This allows strategies running in threads to explicitly pass context when needed
        if research_context:
            # Explicit context provided - use it and set it for this thread
            set_search_context(research_context)
            context_was_set = True

        # e.g. "BraveSearchEngine" -> "brave"; used as the metrics label.
        engine_name = self.__class__.__name__.replace(
            "SearchEngine", ""
        ).lower()
        start_time = time.time()

        success = True
        error_message = None
        results_count = 0

        # Define the core search function with retry logic
        if self.rate_tracker.enabled:
            # Rate limiting enabled - use retry with adaptive wait
            @retry(
                stop=stop_after_attempt(3),
                wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
                retry=retry_if_exception_type((RateLimitError,)),
                after=self._record_retry_outcome,
                reraise=True,
            )
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()
        else:
            # Rate limiting disabled - run without retry
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()

        def _execute_search():
            # Closure mutates the enclosing bookkeeping vars so the
            # finally-block metrics recording sees the real outcome.
            nonlocal success, error_message, results_count

            try:
                # Step 1: Get preview information for items
                previews = self._get_previews(query)
                if not previews:
                    logger.info(
                        f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
                    )
                    results_count = 0
                    return []

                for preview_filter in self._preview_filters:
                    previews = preview_filter.filter_results(previews, query)

                # Step 2: Filter previews for relevance with LLM
                enable_llm_filter = getattr(
                    self, "enable_llm_relevance_filter", False
                )

                if enable_llm_filter and self.llm:
                    filtered_items = self._filter_for_relevance(previews, query)
                else:
                    filtered_items = previews
                    logger.debug(
                        f"[{type(self).__name__}] Relevance filter skipped "
                        f"(enabled={enable_llm_filter}, "
                        f"llm={'yes' if self.llm else 'no'})"
                    )

                # Step 3: Get full content for filtered items
                if self.search_snippets_only:
                    logger.info("Returning snippet-only results as per config")
                    results = filtered_items
                else:
                    results = self._get_full_content(filtered_items)

                for content_filter in self._content_filters:
                    results = content_filter.filter_results(results, query)

                results_count = len(results)
                self._last_results_count = results_count

                # Record success if we get here and rate limiting is enabled
                if self.rate_tracker.enabled:
                    logger.info(
                        f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
                    )
                    self.rate_tracker.record_outcome(
                        self.engine_type,
                        self._last_wait_time,
                        success=True,
                        retry_count=1,  # First attempt succeeded
                        search_result_count=results_count,
                    )
                else:
                    logger.info(
                        f"Rate limiting disabled, not recording search for {self.engine_type}"
                    )

                return results

            except RateLimitError:
                # Only re-raise if rate limiting is enabled
                # (the retry decorator will catch it and back off)
                if self.rate_tracker.enabled:
                    raise
                # If rate limiting is disabled, treat as regular error
                success = False
                error_message = "Rate limit hit but rate limiting disabled"
                logger.warning(
                    f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
                )
                results_count = 0
                return []
            except Exception as e:
                # Other errors - don't retry
                success = False
                error_message = str(e)
                logger.exception(
                    f"Search engine {self.__class__.__name__} failed"
                )
                results_count = 0
                return []

        try:
            return _run_with_retry()  # type: ignore[no-any-return]
        except RetryError as e:
            # All retries exhausted
            success = False
            error_message = f"Rate limited after all retries: {e}"
            logger.exception(
                f"{self.__class__.__name__} failed after all retries"
            )
            return []
        except Exception as e:
            success = False
            error_message = str(e)
            logger.exception(f"Search engine {self.__class__.__name__} error")
            return []
        finally:
            try:
                # Record search metrics BEFORE clearing context (record_search needs it)
                if should_record_metrics:
                    response_time_ms = int((time.time() - start_time) * 1000)
                    SearchTracker.record_search(
                        engine_name=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success=success,
                        error_message=error_message,
                    )
            finally:
                # Clean up temporary search result storage
                for attr in self._temp_attributes():
                    if hasattr(self, attr):
                        delattr(self, attr)
                # ALWAYS clean up search context, even if recording fails
                if context_was_set:
                    clear_search_context()
    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools; delegates to run()."""
        return self.run(query)
533 def _filter_for_relevance(
534 self, previews: List[Dict[str, Any]], query: str
535 ) -> List[Dict[str, Any]]:
536 """
537 Filter search results by relevance using the LLM.
539 Delegates to the ``relevance_filter`` module, which prompts the
540 LLM for a plain-text list of relevant indices and parses them
541 with a regex (no structured output).
543 Args:
544 previews: List of preview dictionaries
545 query: The original search query
547 Returns:
548 Filtered list of preview dictionaries
549 """
550 engine_name = type(self).__name__
552 if not self.llm or len(previews) <= 1:
553 logger.debug(
554 f"[{engine_name}] Skipping relevance filter "
555 f"(llm={'yes' if self.llm else 'no'}, "
556 f"previews={len(previews)})"
557 )
558 return previews
560 from .relevance_filter import filter_previews_for_relevance
562 return filter_previews_for_relevance(
563 llm=self.llm,
564 previews=previews,
565 query=query,
566 max_filtered_results=self.max_filtered_results,
567 engine_name=engine_name,
568 batch_size=self.relevance_filter_batch_size,
569 max_parallel_batches=self.relevance_filter_max_parallel_batches,
570 )
572 # =========================================================================
573 # Shared Helper Methods for Subclasses
574 # =========================================================================
576 @staticmethod
577 def _is_valid_api_key(api_key: Optional[str]) -> bool:
578 """
579 Check if an API key is valid (not a placeholder value).
581 Args:
582 api_key: The API key to validate
584 Returns:
585 True if the key appears to be a real API key, False if it's a placeholder
587 Example:
588 >>> BaseSearchEngine._is_valid_api_key("sk-abc123")
589 True
590 >>> BaseSearchEngine._is_valid_api_key("YOUR_API_KEY_HERE")
591 False
592 """
593 if not api_key:
594 return False
596 api_key = api_key.strip()
598 # Empty or whitespace-only
599 if not api_key:
600 return False
602 # Common placeholder values
603 placeholders = {
604 "",
605 "None",
606 "null",
607 "PLACEHOLDER",
608 "YOUR_API_KEY_HERE",
609 "YOUR_API_KEY",
610 "API_KEY",
611 "your_api_key",
612 "your-api-key",
613 }
615 if api_key in placeholders:
616 return False
618 # Patterns that indicate placeholders
619 if api_key.endswith("_API_KEY"):
620 return False
621 if api_key.startswith("YOUR_"):
622 return False
623 if api_key.startswith("<") and api_key.endswith(">"):
624 return False
625 if api_key.startswith("${") and api_key.endswith("}"):
626 return False
628 return True
630 def _resolve_api_key(
631 self,
632 api_key: Optional[str],
633 setting_key: str,
634 engine_name: str = "search engine",
635 settings_snapshot: Optional[Dict[str, Any]] = None,
636 ) -> str:
637 """
638 Resolve an API key from multiple sources with priority order.
640 Environment variables are handled automatically by SettingsManager
641 when building the settings snapshot, so they don't need to be
642 checked separately here.
644 Priority order:
645 1. Direct parameter (api_key argument)
646 2. Settings snapshot (via setting_key)
648 Args:
649 api_key: API key passed directly as parameter
650 setting_key: Key to look up in settings snapshot (e.g., "search.brave_api_key")
651 engine_name: Human-readable engine name for error messages
652 settings_snapshot: Optional settings snapshot dict (uses self.settings_snapshot if not provided)
654 Returns:
655 The resolved API key string
657 Raises:
658 ValueError: If no valid API key is found from any source
660 Example:
661 >>> engine._resolve_api_key(
662 ... api_key=None,
663 ... setting_key="search.brave_api_key",
664 ... engine_name="Brave Search"
665 ... )
666 "sk-abc123..."
667 """
668 # Use instance settings snapshot if not provided
669 if settings_snapshot is None:
670 settings_snapshot = self.settings_snapshot
672 # Priority 1: Direct parameter
673 if self._is_valid_api_key(api_key) and api_key is not None:
674 return api_key.strip()
676 # Priority 2: Settings snapshot (includes env var overrides via SettingsManager)
677 if settings_snapshot:
678 settings_value = get_setting_from_snapshot(
679 setting_key,
680 default=None,
681 settings_snapshot=settings_snapshot,
682 )
683 if self._is_valid_api_key(settings_value):
684 return settings_value.strip() if settings_value else ""
686 # No valid API key found
687 masked_key = self._mask_api_key(str(api_key)) if api_key else "None"
688 raise ValueError(
689 f"No valid API key found for {engine_name}. "
690 f"Checked: direct parameter ({masked_key}), "
691 f"settings key '{setting_key}'. "
692 f"Please provide a valid API key."
693 )
695 def _is_rate_limit_error(
696 self,
697 error: Union[Exception, str, int],
698 additional_patterns: Optional[Set[str]] = None,
699 ) -> bool:
700 """
701 Detect if an error is a rate limit error.
703 Checks multiple sources for rate limit indicators:
704 - HTTP status code 429
705 - HTTPError response objects
706 - Error messages containing rate limit phrases
708 Args:
709 error: The error to check (Exception, string, or HTTP status code)
710 additional_patterns: Optional set of additional patterns to match
712 Returns:
713 True if the error appears to be a rate limit error
715 Example:
716 >>> engine._is_rate_limit_error(429)
717 True
718 >>> engine._is_rate_limit_error("Rate limit exceeded")
719 True
720 >>> engine._is_rate_limit_error(ValueError("Invalid input"))
721 False
722 """
723 # Combine default and additional patterns
724 patterns = self.rate_limit_patterns.copy()
725 if additional_patterns:
726 patterns.update(additional_patterns)
728 # Check integer status code directly
729 if isinstance(error, int):
730 return error == 429
732 # Convert to string for pattern matching
733 error_str = ""
734 status_code = None
736 if isinstance(error, str):
737 error_str = error
738 elif isinstance(error, Exception):
739 error_str = str(error)
741 # Check for HTTP status code in common HTTP error types
742 if hasattr(error, "status_code"):
743 status_code = error.status_code
744 elif hasattr(error, "response"):
745 response = error.response
746 if hasattr(response, "status_code"):
747 status_code = response.status_code
749 # Check status code first
750 if status_code == 429:
751 return True
753 # Case-insensitive pattern matching
754 error_lower = error_str.lower()
755 for pattern in patterns:
756 if pattern.lower() in error_lower:
757 return True
759 return False
761 def _raise_if_rate_limit(
762 self,
763 error: Union[Exception, str, int],
764 additional_patterns: Optional[Set[str]] = None,
765 ) -> None:
766 """
767 Raise RateLimitError if the given error is a rate limit error.
769 Convenience method that combines _is_rate_limit_error check with
770 raising RateLimitError.
772 Args:
773 error: The error to check
774 additional_patterns: Optional set of additional patterns to match
776 Raises:
777 RateLimitError: If the error is detected as a rate limit error
779 Example:
780 >>> try:
781 ... response = make_api_call()
782 ... except Exception as e:
783 ... engine._raise_if_rate_limit(e)
784 """
785 if self._is_rate_limit_error(error, additional_patterns):
786 error_msg = str(error) if not isinstance(error, str) else error
787 raise RateLimitError(
788 f"Rate limit detected: {self._sanitize_error_message(error_msg)}"
789 )
791 def _extract_full_result(self, item: Dict[str, Any]) -> Dict[str, Any]:
792 """
793 Extract the full result from an item that may contain a _full_result key.
795 This is a helper for the default _get_full_content implementation.
796 It extracts data from the _full_result key if present, otherwise uses
797 the item directly, and removes the internal _full_result key.
799 Args:
800 item: A search result item that may contain a _full_result key
802 Returns:
803 A dictionary with the full result data, without the _full_result key
805 Example:
806 >>> engine._extract_full_result({"title": "A", "_full_result": {"title": "A", "content": "Full"}})
807 {"title": "A", "content": "Full"}
808 """
809 source = item.get("_full_result")
810 if source is None:
811 source = item
812 return {k: v for k, v in source.items() if k != "_full_result"}
814 def _get_full_content(
815 self, relevant_items: List[Dict[str, Any]]
816 ) -> List[Dict[str, Any]]:
817 """
818 Get full content for the relevant items.
820 Default implementation extracts data from _full_result keys if present.
821 Subclasses can override this method to fetch additional content from
822 external sources (e.g., web scraping, API calls).
824 Args:
825 relevant_items: List of relevant preview dictionaries
827 Returns:
828 List of result dictionaries with full content
830 Example:
831 >>> engine._get_full_content([
832 ... {"title": "A", "_full_result": {"title": "A", "content": "Full A"}},
833 ... {"title": "B"}
834 ... ])
835 [{"title": "A", "content": "Full A"}, {"title": "B"}]
836 """
837 if not relevant_items:
838 return []
839 return [self._extract_full_result(item) for item in relevant_items]
841 def _init_full_search(
842 self,
843 web_search=None,
844 language="en",
845 max_results=10,
846 region=None,
847 time_period=None,
848 safe_search=None,
849 ):
850 """Initialize FullSearchResults if include_full_content is True.
852 Call this at the end of your __init__ after setting up your search wrapper.
854 Args:
855 web_search: The search wrapper/engine to pass to FullSearchResults
856 language: Language for search results
857 max_results: Maximum number of results
858 region: Region/country code for results
859 time_period: Time period filter
860 safe_search: Safe search setting (string value for FullSearchResults)
861 """
862 if self.include_full_content and self.llm:
863 try:
864 from .engines.full_search import FullSearchResults
866 self.full_search = FullSearchResults(
867 llm=self.llm,
868 web_search=web_search,
869 language=language,
870 max_results=max_results,
871 region=region,
872 time=time_period,
873 safesearch=safe_search,
874 )
875 except ImportError:
876 logger.warning(
877 "FullSearchResults not available. "
878 "Full content retrieval disabled."
879 )
880 self.include_full_content = False
    def _temp_attributes(self):
        """Return list of temporary attribute names to clean up after run().

        Override in subclasses that store additional temporary data.
        """
        # run() deletes these in its finally block so stale results never
        # leak between consecutive searches on the same engine instance.
        return ["_search_results"]
889 def _sanitize_error_message(self, message: str) -> str:
890 """
891 Remove/mask API keys, tokens, and secrets from error messages.
893 Uses pattern matching for common credential formats.
895 Args:
896 message: The error message to sanitize
898 Returns:
899 Sanitized message with sensitive data redacted
901 Example:
902 >>> engine._sanitize_error_message("Error with key sk-abc123xyz")
903 "Error with key [REDACTED]"
904 """
905 if not message:
906 return message
908 sanitized = message
910 # Additional regex patterns for common credential formats
911 patterns = [
912 # Bearer tokens
913 (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", "Bearer [REDACTED]"),
914 # API keys in URLs
915 (
916 r"([?&])(api_key|apikey|key|token|secret)=([A-Za-z0-9\-._~+/]+)",
917 r"\1\2=[REDACTED]",
918 ),
919 # URL credentials (user:pass@host)
920 (r"(https?://)([^:\s]+):([^@\s]+)@", r"\1[REDACTED]:[REDACTED]@"),
921 # Common API key patterns (sk-*, pk-*, etc.)
922 (r"\b(sk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"),
923 (r"\b(pk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"),
924 ]
926 for pattern, replacement in patterns:
927 sanitized = re.sub(pattern, replacement, sanitized)
929 return sanitized
931 def _mask_api_key(self, api_key: str, visible_chars: int = 4) -> str:
932 """
933 Mask an API key for safe logging, showing only first and last characters.
935 Args:
936 api_key: The API key to mask
937 visible_chars: Number of characters to show at start and end
939 Returns:
940 Masked API key in format "sk-1...nop" or "***" for short keys
942 Example:
943 >>> engine._mask_api_key("sk-abcdefghijklmnop123456")
944 "sk-a...3456"
945 >>> engine._mask_api_key("short")
946 "***"
947 """
948 if not api_key:
949 return "***"
951 api_key = str(api_key).strip()
953 if len(api_key) <= visible_chars * 2:
954 return "***"
956 return f"{api_key[:visible_chars]}...{api_key[-visible_chars:]}"
    @abstractmethod
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information (titles, summaries) for initial search results.

        This is the first phase of the two-phase retrieval; run() calls it
        before relevance filtering and full-content fetching.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
        """
        pass
971 def close(self) -> None:
972 """
973 Close any resources held by this search engine.
975 Subclasses with HTTP sessions or other resources should override this.
976 The base implementation safely closes any 'session' attribute if present
977 and closes content filters that hold resources.
978 """
979 from ..utilities.resource_utils import safe_close
981 if hasattr(self, "session") and self.session is not None:
982 safe_close(self.session, "HTTP session")
983 if hasattr(self, "_content_filters"): 983 ↛ exitline 983 didn't return from function 'close' because the condition on line 983 was always true
984 for content_filter in self._content_filters: 984 ↛ 985line 984 didn't jump to line 985 because the loop on line 984 never started
985 safe_close(content_filter, "content filter")
    def __enter__(self):
        """Support context manager usage (``with engine: ...``)."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup on context exit."""
        self.close()
        # Returning False propagates any in-flight exception to the caller.
        return False