Coverage for src/local_deep_research/web_search_engines/search_engine_base.py: 97%

335 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import json 

2import re 

3import time 

4from abc import ABC, abstractmethod 

5from typing import Any, Dict, List, Optional, Set, Union 

6 

7from langchain_core.language_models import BaseLLM 

8from loguru import logger 

9from tenacity import ( 

10 RetryError, 

11 retry, 

12 retry_if_exception_type, 

13 stop_after_attempt, 

14) 

15from tenacity.wait import wait_base 

16 

17from ..advanced_search_system.filters.base_filter import BaseFilter 

18from ..config.constants import DEFAULT_MAX_FILTERED_RESULTS 

19from ..config.thread_settings import get_setting_from_snapshot 

20from ..utilities.thread_context import clear_search_context, set_search_context 

21 

22from .rate_limiting import RateLimitError, get_tracker 

23 

24 

25class AdaptiveWait(wait_base): 

26 """Custom wait strategy that uses adaptive rate limiting.""" 

27 

28 def __init__(self, get_wait_func): 

29 self.get_wait_func = get_wait_func 

30 

31 def __call__(self, retry_state): 

32 return self.get_wait_func() 

33 

34 

35class BaseSearchEngine(ABC): 

36 """ 

37 Abstract base class for search engines with two-phase retrieval capability. 

38 Handles common parameters and implements the two-phase search approach. 

39 

40 Subclass contract for ``__init__`` 

41 --------------------------------- 

42 Concrete engines should forward ``settings_snapshot`` and 

43 ``programmatic_mode`` to ``super().__init__`` so the base class can wire 

44 them up correctly. The cleanest way is to declare ``**kwargs`` and pass 

45 it along:: 

46 

47 def __init__(self, max_results=10, *, my_param=None, **kwargs): 

48 super().__init__(max_results=max_results, **kwargs) 

49 self.my_param = my_param 

50 

51 Many existing engines accept ``**kwargs`` but don't forward — that 

52 silently drops ``programmatic_mode`` (and used to bind the wrong rate 

53 tracker). The factory has a post-construction safety net that calls 

54 ``_configure_programmatic_mode`` when the engine's mode doesn't match 

55 what the caller asked for, but new engines should not rely on it: the 

56 safety net only covers ``programmatic_mode``, not ``settings_snapshot`` 

57 or other base kwargs. 

58 """ 

59 

60 # Class attribute to indicate if this engine searches public internet sources 

61 # Should be overridden by subclasses - defaults to False for safety 

62 is_public = False 

63 

64 # Class attribute to indicate if this is a generic search engine (vs specialized) 

65 # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed). 

66 # Note: generic does NOT imply good native ranking — see is_lexical. 

67 is_generic = False 

68 

69 # Class attribute to indicate if this is a scientific/academic search engine 

70 # Scientific engines include arXiv, PubMed, Semantic Scholar, etc. 

71 is_scientific = False 

72 

73 # Class attribute to indicate if this is a local RAG/document search engine 

74 # Local engines search private document collections stored locally 

75 is_local = False 

76 

77 # Class attribute to indicate if this is a news search engine 

78 # News engines specialize in news articles and current events 

79 is_news = False 

80 

81 # Class attribute to indicate if this is a code search engine 

82 # Code engines specialize in searching code repositories 

83 is_code = False 

84 

85 # Class attribute to indicate if this is a book/literature search engine 

86 # Book engines search libraries and literary archives 

87 is_books = False 

88 

89 # Classification: does this engine use lexical/keyword-based search? 

90 # Lexical engines (arXiv, PubMed, Wikipedia, Mojeek, etc.) match results by 

91 # keywords without ML-based ranking. This is an informational flag that can 

92 # drive multiple behaviors (query optimization, result deduplication, UI hints). 

93 # For LLM relevance filtering specifically, see needs_llm_relevance_filter. 

94 is_lexical = False 

95 

96 # Behavioral: should the factory auto-enable LLM relevance filtering? 

97 # When True, the factory sets enable_llm_relevance_filter=True on the engine 

98 # instance, causing _filter_for_relevance() to run after previews are fetched. 

99 # Typically set alongside is_lexical=True, but can be set independently — 

100 # e.g. a non-lexical engine with noisy results could opt in. 

101 needs_llm_relevance_filter = False 

102 

103 # Tuning for the LLM relevance filter (only applies when the filter 

104 # is active for this engine). 

105 # 

106 # relevance_filter_batch_size: split previews into chunks of this many 

107 # before sending to the LLM. Smaller batches are faster per call and 

108 # more reliable on weaker models which struggle with many indices in 

109 # one context. None or 0 = single-call mode (no batching). 

110 # 

111 # relevance_filter_max_parallel_batches: number of batches to dispatch 

112 # concurrently against the LLM. 1 = sequential. Most providers handle 

113 # parallel requests fine (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI, 

114 # Anthropic). 

115 relevance_filter_batch_size: Optional[int] = 5 

116 relevance_filter_max_parallel_batches: int = 10 

117 

118 # Class attribute for rate limit detection patterns 

119 # Subclasses can override to add engine-specific patterns 

120 rate_limit_patterns: Set[str] = { 

121 "rate limit", 

122 "rate_limit", 

123 "ratelimit", 

124 "too many requests", 

125 "throttl", 

126 "quota exceeded", 

127 "quota_exceeded", 

128 "limit exceeded", 

129 "request limit", 

130 "api limit", 

131 "usage limit", 

132 } 

133 

134 @staticmethod 

135 def _ensure_list(value, *, default=None): 

136 """Normalize a value that should be a list. 

137 

138 Handles JSON-encoded strings, comma-separated strings, and 

139 already-parsed lists. Returns *default* (empty list when not 

140 supplied) for ``None`` or empty/unparseable input. 

141 """ 

142 if default is None: 

143 default = [] 

144 if value is None: 

145 return default 

146 if isinstance(value, list): 

147 return value 

148 if isinstance(value, str): 

149 stripped = value.strip() 

150 if not stripped: 

151 return default 

152 if stripped.startswith("["): 

153 try: 

154 parsed = json.loads(stripped) 

155 if isinstance(parsed, list): 155 ↛ 159line 155 didn't jump to line 159 because the condition on line 155 was always true

156 return [str(item) for item in parsed] 

157 except (json.JSONDecodeError, ValueError, RecursionError): 

158 pass 

159 return [ 

160 item.strip() for item in stripped.split(",") if item.strip() 

161 ] 

162 return default 

163 

164 @classmethod 

165 def _load_engine_class(cls, name: str, config: Dict[str, Any]): 

166 """ 

167 Helper method to load an engine class dynamically. 

168 

169 Args: 

170 name: Engine name 

171 config: Engine configuration dict with module_path and class_name 

172 

173 Returns: 

174 Tuple of (success: bool, engine_class or None, error_msg or None) 

175 """ 

176 from ..security.module_whitelist import ( 

177 ModuleNotAllowedError, 

178 get_safe_module_class, 

179 ) 

180 

181 try: 

182 module_path = config.get("module_path") 

183 class_name = config.get("class_name") 

184 

185 if not module_path or not class_name: 

186 return ( 

187 False, 

188 None, 

189 f"Missing module_path or class_name for {name}", 

190 ) 

191 

192 # Use whitelist-validated safe import 

193 engine_class = get_safe_module_class(module_path, class_name) 

194 

195 return True, engine_class, None 

196 

197 except ModuleNotAllowedError as e: 

198 return ( 

199 False, 

200 None, 

201 f"Security error loading engine class for {name}: {e}", 

202 ) 

203 except Exception as e: 

204 return False, None, f"Could not load engine class for {name}: {e}" 

205 

206 @classmethod 

207 def _check_api_key_availability( 

208 cls, name: str, config: Dict[str, Any] 

209 ) -> bool: 

210 """ 

211 Helper method to check if an engine's API key is available and valid. 

212 

213 Args: 

214 name: Engine name 

215 config: Engine configuration dict 

216 

217 Returns: 

218 True if API key is not required or is available and valid 

219 """ 

220 from loguru import logger 

221 

222 if not config.get("requires_api_key", False): 

223 return True 

224 

225 api_key = config.get("api_key", "").strip() 

226 

227 # Check for common placeholder values 

228 if ( 

229 not api_key 

230 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"] 

231 or api_key.endswith( 

232 "_API_KEY" 

233 ) # Default placeholders like BRAVE_API_KEY 

234 or api_key.startswith("YOUR_") 

235 or api_key == "null" 

236 ): 

237 logger.debug( 

238 f"Skipping {name} - requires API key but none configured" 

239 ) 

240 return False 

241 

242 return True 

243 

244 def __init__( 

245 self, 

246 llm: Optional[BaseLLM] = None, 

247 max_filtered_results: Optional[int] = None, 

248 max_results: Optional[int] = 10, # Default value if not provided 

249 preview_filters: List[BaseFilter] | None = None, 

250 content_filters: List[BaseFilter] | None = None, 

251 search_snippets_only: bool = True, # New parameter with default 

252 include_full_content: bool = False, 

253 settings_snapshot: Optional[Dict[str, Any]] = None, 

254 programmatic_mode: bool = False, 

255 **kwargs, 

256 ): 

257 """ 

258 Initialize the search engine with common parameters. 

259 

260 Args: 

261 llm: Optional language model for relevance filtering 

262 max_filtered_results: Maximum number of results to keep after filtering 

263 max_results: Maximum number of search results to return 

264 preview_filters: Filters that will be applied to all previews 

265 produced by the search engine, before relevancy checks. 

266 content_filters: Filters that will be applied to the full content 

267 produced by the search engine, after relevancy checks. 

268 search_snippets_only: Whether to return only snippets or full content 

269 include_full_content: Whether to use FullSearchResults for full webpage content 

270 settings_snapshot: Settings snapshot for configuration 

271 programmatic_mode: If True, disables database operations and uses memory-only tracking 

272 **kwargs: Additional engine-specific parameters 

273 """ 

274 if max_filtered_results is None: 

275 max_filtered_results = DEFAULT_MAX_FILTERED_RESULTS 

276 if max_results is None: 

277 max_results = 10 

278 self._preview_filters: List[BaseFilter] = ( 

279 preview_filters if preview_filters is not None else [] 

280 ) 

281 self._content_filters: List[BaseFilter] = ( 

282 content_filters if content_filters is not None else [] 

283 ) 

284 

285 self.llm = llm # LLM for relevance filtering 

286 self._max_filtered_results = int( 

287 max_filtered_results 

288 ) # Ensure it's an integer 

289 self._max_results = max( 

290 1, int(max_results) 

291 ) # Ensure it's a positive integer 

292 self.search_snippets_only = search_snippets_only # Store the setting 

293 self.include_full_content = include_full_content 

294 self.settings_snapshot = ( 

295 settings_snapshot or {} 

296 ) # Store settings snapshot 

297 

298 self.engine_type = self.__class__.__name__ 

299 self._configure_programmatic_mode(programmatic_mode) 

300 self._last_wait_time = ( 

301 0.0 # Default to 0 for successful searches without rate limiting 

302 ) 

303 self._last_results_count = 0 

304 

305 def _configure_programmatic_mode(self, programmatic_mode: bool) -> None: 

306 """Set ``programmatic_mode`` and (re)bind the matching rate tracker. 

307 

308 Called from ``__init__`` and from the factory as a fallback when an 

309 engine subclass swallows the ``programmatic_mode`` kwarg without 

310 forwarding it to ``super().__init__``. Safe to call after init — 

311 rebinding ``rate_tracker`` discards the previous tracker (no 

312 resources to release) and the new one starts with empty in-memory 

313 caches. 

314 """ 

315 self.programmatic_mode = programmatic_mode 

316 if programmatic_mode: 

317 from .rate_limiting.tracker import AdaptiveRateLimitTracker 

318 

319 self.rate_tracker = AdaptiveRateLimitTracker( 

320 settings_snapshot=self.settings_snapshot, 

321 programmatic_mode=programmatic_mode, 

322 ) 

323 else: 

324 self.rate_tracker = get_tracker() 

325 

326 @property 

327 def max_filtered_results(self) -> int: 

328 """Get the maximum number of filtered results.""" 

329 return self._max_filtered_results 

330 

331 @max_filtered_results.setter 

332 def max_filtered_results(self, value: int) -> None: 

333 """Set the maximum number of filtered results.""" 

334 if value is None: 

335 value = DEFAULT_MAX_FILTERED_RESULTS 

336 logger.warning( 

337 f"Setting max_filtered_results to {DEFAULT_MAX_FILTERED_RESULTS}" 

338 ) 

339 self._max_filtered_results = int(value) 

340 

341 @property 

342 def max_results(self) -> int: 

343 """Get the maximum number of search results.""" 

344 return self._max_results 

345 

346 @max_results.setter 

347 def max_results(self, value: int) -> None: 

348 """Set the maximum number of search results.""" 

349 if value is None: 

350 value = 10 

351 self._max_results = max(1, int(value)) 

352 

353 def _get_adaptive_wait(self) -> float: 

354 """Get adaptive wait time from tracker.""" 

355 wait_time = self.rate_tracker.get_wait_time(self.engine_type) 

356 self._last_wait_time = wait_time 

357 logger.debug( 

358 f"{self.engine_type} waiting {wait_time:.2f}s before retry" 

359 ) 

360 return wait_time 

361 

362 def _record_retry_outcome(self, retry_state) -> None: 

363 """Record outcome after retry completes.""" 

364 success = ( 

365 not retry_state.outcome.failed if retry_state.outcome else False 

366 ) 

367 self.rate_tracker.record_outcome( 

368 self.engine_type, 

369 self._last_wait_time or 0, 

370 success, 

371 retry_state.attempt_number, 

372 error_type="RateLimitError" if not success else None, 

373 search_result_count=self._last_results_count if success else 0, 

374 ) 

375 

376 def run( 

377 self, query: str, research_context: Dict[str, Any] | None = None 

378 ) -> List[Dict[str, Any]]: 

379 """ 

380 Run the search engine with a given query, retrieving and filtering results. 

381 This implements a two-phase retrieval approach: 

382 1. Get preview information for many results 

383 2. Filter the previews for relevance 

384 3. Get full content for only the relevant results 

385 

386 Args: 

387 query: The search query 

388 research_context: Context from previous research to use. 

389 

390 Returns: 

391 List of search results with full content (if available) 

392 """ 

393 logger.info(f"---Execute a search using {self.__class__.__name__}---") 

394 

395 # Track search call for metrics (if available and not in programmatic mode) 

396 should_record_metrics = False 

397 context_was_set = False 

398 if not self.programmatic_mode: 

399 from ..metrics.search_tracker import SearchTracker 

400 

401 should_record_metrics = True 

402 

403 # For thread-safe context propagation: if we have research_context parameter, use it 

404 # Otherwise, try to inherit from current thread context (normal case) 

405 # This allows strategies running in threads to explicitly pass context when needed 

406 if research_context: 

407 # Explicit context provided - use it and set it for this thread 

408 set_search_context(research_context) 

409 context_was_set = True 

410 

411 engine_name = self.__class__.__name__.replace( 

412 "SearchEngine", "" 

413 ).lower() 

414 start_time = time.time() 

415 

416 success = True 

417 error_message = None 

418 results_count = 0 

419 

420 # Define the core search function with retry logic 

421 if self.rate_tracker.enabled: 

422 # Rate limiting enabled - use retry with adaptive wait 

423 @retry( 

424 stop=stop_after_attempt(3), 

425 wait=AdaptiveWait(lambda: self._get_adaptive_wait()), 

426 retry=retry_if_exception_type((RateLimitError,)), 

427 after=self._record_retry_outcome, 

428 reraise=True, 

429 ) 

430 def _run_with_retry(): 

431 nonlocal success, error_message, results_count 

432 return _execute_search() 

433 else: 

434 # Rate limiting disabled - run without retry 

435 def _run_with_retry(): 

436 nonlocal success, error_message, results_count 

437 return _execute_search() 

438 

439 def _execute_search(): 

440 nonlocal success, error_message, results_count 

441 

442 try: 

443 # Step 1: Get preview information for items 

444 previews = self._get_previews(query) 

445 if not previews: 

446 logger.info( 

447 f"Search engine {self.__class__.__name__} returned no preview results for query: {query}" 

448 ) 

449 results_count = 0 

450 return [] 

451 

452 # Pre-filter enrichment: resolve DOIs to OpenAlex source 

453 # IDs BEFORE the preview filters run. The 

454 # JournalReputationFilter is registered as a preview 

455 # filter and uses ``result["openalex_source_id"]`` for 

456 # Tier 2 lookups; running enrichment afterwards (as the 

457 # old pipeline did) left the field empty at filter time, 

458 # silently forcing a fragile-name-match fallback. Only 

459 # for scientific engines whose results carry DOIs. 

460 if getattr(self, "is_scientific", False): 

461 try: 

462 from ..utilities.openalex_enrichment import ( 

463 enrich_results_with_source_ids, 

464 ) 

465 

466 email = getattr(self, "email", None) 

467 previews = enrich_results_with_source_ids( 

468 previews, email=email 

469 ) 

470 except Exception: 

471 logger.debug( 

472 "DOI enrichment skipped (import or call failed)" 

473 ) 

474 

475 for preview_filter in self._preview_filters: 

476 previews = preview_filter.filter_results(previews, query) 

477 

478 # Step 2: Filter previews for relevance with LLM 

479 enable_llm_filter = getattr( 

480 self, "enable_llm_relevance_filter", False 

481 ) 

482 

483 if enable_llm_filter and self.llm: 

484 filtered_items = self._filter_for_relevance(previews, query) 

485 else: 

486 filtered_items = previews 

487 logger.debug( 

488 f"[{type(self).__name__}] Relevance filter skipped " 

489 f"(enabled={enable_llm_filter}, " 

490 f"llm={'yes' if self.llm else 'no'})" 

491 ) 

492 

493 # Step 3: Get full content for filtered items 

494 if self.search_snippets_only: 

495 logger.info("Returning snippet-only results as per config") 

496 results = filtered_items 

497 else: 

498 results = self._get_full_content(filtered_items) 

499 

500 for content_filter in self._content_filters: 

501 results = content_filter.filter_results(results, query) 

502 

503 results_count = len(results) 

504 self._last_results_count = results_count 

505 

506 # Record success if we get here and rate limiting is enabled 

507 if self.rate_tracker.enabled: 

508 logger.info( 

509 f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}" 

510 ) 

511 self.rate_tracker.record_outcome( 

512 self.engine_type, 

513 self._last_wait_time, 

514 success=True, 

515 retry_count=1, # First attempt succeeded 

516 search_result_count=results_count, 

517 ) 

518 else: 

519 logger.info( 

520 f"Rate limiting disabled, not recording search for {self.engine_type}" 

521 ) 

522 

523 return results 

524 

525 except RateLimitError: 

526 # Only re-raise if rate limiting is enabled 

527 if self.rate_tracker.enabled: 

528 raise 

529 # If rate limiting is disabled, treat as regular error 

530 success = False 

531 error_message = "Rate limit hit but rate limiting disabled" 

532 logger.warning( 

533 f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled" 

534 ) 

535 results_count = 0 

536 return [] 

537 except Exception as e: 

538 # Other errors - don't retry 

539 success = False 

540 error_message = str(e) 

541 logger.exception( 

542 f"Search engine {self.__class__.__name__} failed" 

543 ) 

544 results_count = 0 

545 return [] 

546 

547 try: 

548 return _run_with_retry() # type: ignore[no-any-return] 

549 except RetryError as e: 

550 # All retries exhausted 

551 success = False 

552 error_message = f"Rate limited after all retries: {e}" 

553 logger.exception( 

554 f"{self.__class__.__name__} failed after all retries" 

555 ) 

556 return [] 

557 except Exception as e: 

558 success = False 

559 error_message = str(e) 

560 logger.exception(f"Search engine {self.__class__.__name__} error") 

561 return [] 

562 finally: 

563 try: 

564 # Record search metrics BEFORE clearing context (record_search needs it) 

565 if should_record_metrics: 

566 response_time_ms = int((time.time() - start_time) * 1000) 

567 SearchTracker.record_search( 

568 engine_name=engine_name, 

569 query=query, 

570 results_count=results_count, 

571 response_time_ms=response_time_ms, 

572 success=success, 

573 error_message=error_message, 

574 ) 

575 finally: 

576 # Clean up temporary search result storage 

577 for attr in self._temp_attributes(): 

578 if hasattr(self, attr): 

579 delattr(self, attr) 

580 # ALWAYS clean up search context, even if recording fails 

581 if context_was_set: 

582 clear_search_context() 

583 

584 def invoke(self, query: str) -> List[Dict[str, Any]]: 

585 """Compatibility method for LangChain tools""" 

586 return self.run(query) 

587 

588 def _filter_for_relevance( 

589 self, previews: List[Dict[str, Any]], query: str 

590 ) -> List[Dict[str, Any]]: 

591 """ 

592 Filter search results by relevance using the LLM. 

593 

594 Delegates to the ``relevance_filter`` module, which prompts the 

595 LLM for a plain-text list of relevant indices and parses them 

596 with a regex (no structured output). 

597 

598 Args: 

599 previews: List of preview dictionaries 

600 query: The original search query 

601 

602 Returns: 

603 Filtered list of preview dictionaries 

604 """ 

605 engine_name = type(self).__name__ 

606 

607 if not self.llm or len(previews) <= 1: 

608 logger.debug( 

609 f"[{engine_name}] Skipping relevance filter " 

610 f"(llm={'yes' if self.llm else 'no'}, " 

611 f"previews={len(previews)})" 

612 ) 

613 return previews 

614 

615 from .relevance_filter import filter_previews_for_relevance 

616 

617 return filter_previews_for_relevance( 

618 llm=self.llm, 

619 previews=previews, 

620 query=query, 

621 max_filtered_results=self.max_filtered_results, 

622 engine_name=engine_name, 

623 batch_size=self.relevance_filter_batch_size, 

624 max_parallel_batches=self.relevance_filter_max_parallel_batches, 

625 ) 

626 

627 # ========================================================================= 

628 # Shared Helper Methods for Subclasses 

629 # ========================================================================= 

630 

631 @staticmethod 

632 def _is_valid_api_key(api_key: Optional[str]) -> bool: 

633 """ 

634 Check if an API key is valid (not a placeholder value). 

635 

636 Args: 

637 api_key: The API key to validate 

638 

639 Returns: 

640 True if the key appears to be a real API key, False if it's a placeholder 

641 

642 Example: 

643 >>> BaseSearchEngine._is_valid_api_key("sk-abc123") 

644 True 

645 >>> BaseSearchEngine._is_valid_api_key("YOUR_API_KEY_HERE") 

646 False 

647 """ 

648 if not api_key: 

649 return False 

650 

651 api_key = api_key.strip() 

652 

653 # Empty or whitespace-only 

654 if not api_key: 

655 return False 

656 

657 # Common placeholder values 

658 placeholders = { 

659 "", 

660 "None", 

661 "null", 

662 "PLACEHOLDER", 

663 "YOUR_API_KEY_HERE", 

664 "YOUR_API_KEY", 

665 "API_KEY", 

666 "your_api_key", 

667 "your-api-key", 

668 } 

669 

670 if api_key in placeholders: 

671 return False 

672 

673 # Patterns that indicate placeholders 

674 if api_key.endswith("_API_KEY"): 

675 return False 

676 if api_key.startswith("YOUR_"): 

677 return False 

678 if api_key.startswith("<") and api_key.endswith(">"): 

679 return False 

680 if api_key.startswith("${") and api_key.endswith("}"): 

681 return False 

682 

683 return True 

684 

685 def _resolve_api_key( 

686 self, 

687 api_key: Optional[str], 

688 setting_key: str, 

689 engine_name: str = "search engine", 

690 settings_snapshot: Optional[Dict[str, Any]] = None, 

691 ) -> str: 

692 """ 

693 Resolve an API key from multiple sources with priority order. 

694 

695 Environment variables are handled automatically by SettingsManager 

696 when building the settings snapshot, so they don't need to be 

697 checked separately here. 

698 

699 Priority order: 

700 1. Direct parameter (api_key argument) 

701 2. Settings snapshot (via setting_key) 

702 

703 Args: 

704 api_key: API key passed directly as parameter 

705 setting_key: Key to look up in settings snapshot (e.g., "search.brave_api_key") 

706 engine_name: Human-readable engine name for error messages 

707 settings_snapshot: Optional settings snapshot dict (uses self.settings_snapshot if not provided) 

708 

709 Returns: 

710 The resolved API key string 

711 

712 Raises: 

713 ValueError: If no valid API key is found from any source 

714 

715 Example: 

716 >>> engine._resolve_api_key( 

717 ... api_key=None, 

718 ... setting_key="search.brave_api_key", 

719 ... engine_name="Brave Search" 

720 ... ) 

721 "sk-abc123..." 

722 """ 

723 # Use instance settings snapshot if not provided 

724 if settings_snapshot is None: 

725 settings_snapshot = self.settings_snapshot 

726 

727 # Priority 1: Direct parameter 

728 if self._is_valid_api_key(api_key) and api_key is not None: 

729 return api_key.strip() 

730 

731 # Priority 2: Settings snapshot (includes env var overrides via SettingsManager) 

732 if settings_snapshot: 

733 settings_value = get_setting_from_snapshot( 

734 setting_key, 

735 default=None, 

736 settings_snapshot=settings_snapshot, 

737 ) 

738 if self._is_valid_api_key(settings_value): 

739 return settings_value.strip() if settings_value else "" 

740 

741 # No valid API key found 

742 masked_key = self._mask_api_key(str(api_key)) if api_key else "None" 

743 raise ValueError( 

744 f"No valid API key found for {engine_name}. " 

745 f"Checked: direct parameter ({masked_key}), " 

746 f"settings key '{setting_key}'. " 

747 f"Please provide a valid API key." 

748 ) 

749 

750 def _is_rate_limit_error( 

751 self, 

752 error: Union[Exception, str, int], 

753 additional_patterns: Optional[Set[str]] = None, 

754 ) -> bool: 

755 """ 

756 Detect if an error is a rate limit error. 

757 

758 Checks multiple sources for rate limit indicators: 

759 - HTTP status code 429 

760 - HTTPError response objects 

761 - Error messages containing rate limit phrases 

762 

763 Args: 

764 error: The error to check (Exception, string, or HTTP status code) 

765 additional_patterns: Optional set of additional patterns to match 

766 

767 Returns: 

768 True if the error appears to be a rate limit error 

769 

770 Example: 

771 >>> engine._is_rate_limit_error(429) 

772 True 

773 >>> engine._is_rate_limit_error("Rate limit exceeded") 

774 True 

775 >>> engine._is_rate_limit_error(ValueError("Invalid input")) 

776 False 

777 """ 

778 # Combine default and additional patterns 

779 patterns = self.rate_limit_patterns.copy() 

780 if additional_patterns: 

781 patterns.update(additional_patterns) 

782 

783 # Check integer status code directly 

784 if isinstance(error, int): 

785 return error == 429 

786 

787 # Convert to string for pattern matching 

788 error_str = "" 

789 status_code = None 

790 

791 if isinstance(error, str): 

792 error_str = error 

793 elif isinstance(error, Exception): 

794 error_str = str(error) 

795 

796 # Check for HTTP status code in common HTTP error types 

797 if hasattr(error, "status_code"): 

798 status_code = error.status_code 

799 elif hasattr(error, "response"): 

800 response = error.response 

801 if hasattr(response, "status_code"): 

802 status_code = response.status_code 

803 

804 # Check status code first 

805 if status_code == 429: 

806 return True 

807 

808 # Case-insensitive pattern matching 

809 error_lower = error_str.lower() 

810 for pattern in patterns: 

811 if pattern.lower() in error_lower: 

812 return True 

813 

814 return False 

815 

816 def _raise_if_rate_limit( 

817 self, 

818 error: Union[Exception, str, int], 

819 additional_patterns: Optional[Set[str]] = None, 

820 ) -> None: 

821 """ 

822 Raise RateLimitError if the given error is a rate limit error. 

823 

824 Convenience method that combines _is_rate_limit_error check with 

825 raising RateLimitError. 

826 

827 Args: 

828 error: The error to check 

829 additional_patterns: Optional set of additional patterns to match 

830 

831 Raises: 

832 RateLimitError: If the error is detected as a rate limit error 

833 

834 Example: 

835 >>> try: 

836 ... response = make_api_call() 

837 ... except Exception as e: 

838 ... engine._raise_if_rate_limit(e) 

839 """ 

840 if self._is_rate_limit_error(error, additional_patterns): 

841 error_msg = str(error) if not isinstance(error, str) else error 

842 raise RateLimitError( 

843 f"Rate limit detected: {self._sanitize_error_message(error_msg)}" 

844 ) 

845 

846 def _extract_full_result(self, item: Dict[str, Any]) -> Dict[str, Any]: 

847 """ 

848 Extract the full result from an item that may contain a _full_result key. 

849 

850 This is a helper for the default _get_full_content implementation. 

851 It extracts data from the _full_result key if present, otherwise uses 

852 the item directly, and removes the internal _full_result key. 

853 

854 Args: 

855 item: A search result item that may contain a _full_result key 

856 

857 Returns: 

858 A dictionary with the full result data, without the _full_result key 

859 

860 Example: 

861 >>> engine._extract_full_result({"title": "A", "_full_result": {"title": "A", "content": "Full"}}) 

862 {"title": "A", "content": "Full"} 

863 """ 

864 source = item.get("_full_result") 

865 if source is None: 

866 source = item 

867 return {k: v for k, v in source.items() if k != "_full_result"} 

868 

869 def _get_full_content( 

870 self, relevant_items: List[Dict[str, Any]] 

871 ) -> List[Dict[str, Any]]: 

872 """ 

873 Get full content for the relevant items. 

874 

875 Default implementation extracts data from _full_result keys if present. 

876 Subclasses can override this method to fetch additional content from 

877 external sources (e.g., web scraping, API calls). 

878 

879 Args: 

880 relevant_items: List of relevant preview dictionaries 

881 

882 Returns: 

883 List of result dictionaries with full content 

884 

885 Example: 

886 >>> engine._get_full_content([ 

887 ... {"title": "A", "_full_result": {"title": "A", "content": "Full A"}}, 

888 ... {"title": "B"} 

889 ... ]) 

890 [{"title": "A", "content": "Full A"}, {"title": "B"}] 

891 """ 

892 if not relevant_items: 

893 return [] 

894 return [self._extract_full_result(item) for item in relevant_items] 

895 

896 def _init_full_search( 

897 self, 

898 web_search=None, 

899 language="en", 

900 max_results=10, 

901 region=None, 

902 time_period=None, 

903 safe_search=None, 

904 ): 

905 """Initialize FullSearchResults if include_full_content is True. 

906 

907 Call this at the end of your __init__ after setting up your search wrapper. 

908 

909 Args: 

910 web_search: The search wrapper/engine to pass to FullSearchResults 

911 language: Language for search results 

912 max_results: Maximum number of results 

913 region: Region/country code for results 

914 time_period: Time period filter 

915 safe_search: Safe search setting (string value for FullSearchResults) 

916 """ 

917 if self.include_full_content and self.llm: 

918 try: 

919 from .engines.full_search import FullSearchResults 

920 

921 self.full_search = FullSearchResults( 

922 llm=self.llm, 

923 web_search=web_search, 

924 language=language, 

925 max_results=max_results, 

926 region=region, 

927 time=time_period, 

928 safesearch=safe_search, 

929 settings_snapshot=self.settings_snapshot, 

930 ) 

931 except ImportError: 

932 logger.warning( 

933 "FullSearchResults not available. " 

934 "Full content retrieval disabled." 

935 ) 

936 self.include_full_content = False 

937 

938 def _temp_attributes(self): 

939 """Return list of temporary attribute names to clean up after run(). 

940 

941 Override in subclasses that store additional temporary data. 

942 """ 

943 return ["_search_results"] 

944 

945 def _sanitize_error_message(self, message: str) -> str: 

946 """ 

947 Remove/mask API keys, tokens, and secrets from error messages. 

948 

949 Uses pattern matching for common credential formats. 

950 

951 Args: 

952 message: The error message to sanitize 

953 

954 Returns: 

955 Sanitized message with sensitive data redacted 

956 

957 Example: 

958 >>> engine._sanitize_error_message("Error with key sk-abc123xyz") 

959 "Error with key [REDACTED]" 

960 """ 

961 if not message: 

962 return message 

963 

964 sanitized = message 

965 

966 # Additional regex patterns for common credential formats 

967 patterns = [ 

968 # Bearer tokens 

969 (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", "Bearer [REDACTED]"), 

970 # API keys in URLs 

971 ( 

972 r"([?&])(api_key|apikey|key|token|secret)=([A-Za-z0-9\-._~+/]+)", 

973 r"\1\2=[REDACTED]", 

974 ), 

975 # URL credentials (user:pass@host) 

976 (r"(https?://)([^:\s]+):([^@\s]+)@", r"\1[REDACTED]:[REDACTED]@"), 

977 # Common API key patterns (sk-*, pk-*, etc.) 

978 (r"\b(sk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"), 

979 (r"\b(pk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"), 

980 ] 

981 

982 for pattern, replacement in patterns: 

983 sanitized = re.sub(pattern, replacement, sanitized) 

984 

985 return sanitized 

986 

987 def _mask_api_key(self, api_key: str, visible_chars: int = 4) -> str: 

988 """ 

989 Mask an API key for safe logging, showing only first and last characters. 

990 

991 Args: 

992 api_key: The API key to mask 

993 visible_chars: Number of characters to show at start and end 

994 

995 Returns: 

996 Masked API key in format "sk-1...nop" or "***" for short keys 

997 

998 Example: 

999 >>> engine._mask_api_key("sk-abcdefghijklmnop123456") 

1000 "sk-a...3456" 

1001 >>> engine._mask_api_key("short") 

1002 "***" 

1003 """ 

1004 if not api_key: 

1005 return "***" 

1006 

1007 api_key = str(api_key).strip() 

1008 

1009 if len(api_key) <= visible_chars * 2: 

1010 return "***" 

1011 

1012 return f"{api_key[:visible_chars]}...{api_key[-visible_chars:]}" 

1013 

1014 @abstractmethod 

1015 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

1016 """ 

1017 Get preview information (titles, summaries) for initial search results. 

1018 

1019 Args: 

1020 query: The search query 

1021 

1022 Returns: 

1023 List of preview dictionaries with at least 'id', 'title', and 'snippet' keys 

1024 """ 

1025 pass 

1026 

1027 def close(self) -> None: 

1028 """ 

1029 Close any resources held by this search engine. 

1030 

1031 Subclasses with HTTP sessions or other resources should override this. 

1032 The base implementation safely closes any 'session' attribute if present 

1033 and closes both preview and content filters that hold resources. 

1034 """ 

1035 from ..utilities.resource_utils import safe_close 

1036 

1037 if hasattr(self, "session") and self.session is not None: 

1038 safe_close(self.session, "HTTP session") 

1039 # Close preview filters as well as content filters — the journal 

1040 # reputation filter is registered as a preview filter on academic 

1041 # engines (arxiv, pubmed, openalex, nasa_ads, semantic_scholar) 

1042 # and holds a SearXNG engine + LLM client that need releasing. 

1043 if hasattr(self, "_preview_filters"): 1043 ↛ 1046line 1043 didn't jump to line 1046 because the condition on line 1043 was always true

1044 for preview_filter in self._preview_filters: 1044 ↛ 1045line 1044 didn't jump to line 1045 because the loop on line 1044 never started

1045 safe_close(preview_filter, "preview filter") 

1046 if hasattr(self, "_content_filters"): 1046 ↛ exitline 1046 didn't return from function 'close' because the condition on line 1046 was always true

1047 for content_filter in self._content_filters: 1047 ↛ 1048line 1047 didn't jump to line 1048 because the loop on line 1047 never started

1048 safe_close(content_filter, "content filter") 

1049 

1050 def __enter__(self): 

1051 """Support context manager usage.""" 

1052 return self 

1053 

1054 def __exit__(self, exc_type, exc_val, exc_tb): 

1055 """Cleanup on context exit.""" 

1056 self.close() 

1057 return False