Coverage for src / local_deep_research / web_search_engines / search_engine_base.py: 98%

322 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import json 

2import re 

3import time 

4from abc import ABC, abstractmethod 

5from typing import Any, Dict, List, Optional, Set, Union 

6 

7from langchain_core.language_models import BaseLLM 

8from loguru import logger 

9from tenacity import ( 

10 RetryError, 

11 retry, 

12 retry_if_exception_type, 

13 stop_after_attempt, 

14) 

15from tenacity.wait import wait_base 

16 

17from ..advanced_search_system.filters.base_filter import BaseFilter 

18from ..config.thread_settings import get_setting_from_snapshot 

19from ..utilities.thread_context import clear_search_context, set_search_context 

20 

21from .rate_limiting import RateLimitError, get_tracker 

22 

23 

class AdaptiveWait(wait_base):
    """Tenacity wait strategy that delegates the delay to a callable.

    Rather than a fixed backoff schedule, every retry asks
    ``get_wait_func`` for the next delay, so an adaptive rate-limit
    tracker can drive the timing.
    """

    def __init__(self, get_wait_func):
        # Callable returning the number of seconds to sleep before the
        # next retry attempt.
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        # Tenacity supplies the retry state, but the delay depends only
        # on the adaptive tracker, so the state is ignored.
        return self.get_wait_func()

32 

33 

class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach:
    fetch inexpensive previews first, filter them for relevance, then
    retrieve full content only for the surviving results.
    """

    # Class attribute to indicate if this engine searches public internet sources.
    # Should be overridden by subclasses - defaults to False for safety.
    is_public = False

    # Class attribute to indicate if this is a generic search engine (vs specialized).
    # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed).
    # Note: generic does NOT imply good native ranking — see is_lexical.
    is_generic = False

    # Class attribute to indicate if this is a scientific/academic search engine.
    # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
    is_scientific = False

    # Class attribute to indicate if this is a local RAG/document search engine.
    # Local engines search private document collections stored locally.
    is_local = False

    # Class attribute to indicate if this is a news search engine.
    # News engines specialize in news articles and current events.
    is_news = False

    # Class attribute to indicate if this is a code search engine.
    # Code engines specialize in searching code repositories.
    is_code = False

    # Class attribute to indicate if this is a book/literature search engine.
    # Book engines search libraries and literary archives.
    is_books = False

    # Classification: does this engine use lexical/keyword-based search?
    # Lexical engines (arXiv, PubMed, Wikipedia, Mojeek, etc.) match results by
    # keywords without ML-based ranking. This is an informational flag that can
    # drive multiple behaviors (query optimization, result deduplication, UI hints).
    # For LLM relevance filtering specifically, see needs_llm_relevance_filter.
    is_lexical = False

    # Behavioral: should the factory auto-enable LLM relevance filtering?
    # When True, the factory sets enable_llm_relevance_filter=True on the engine
    # instance, causing _filter_for_relevance() to run after previews are fetched.
    # Typically set alongside is_lexical=True, but can be set independently —
    # e.g. a non-lexical engine with noisy results could opt in.
    needs_llm_relevance_filter = False

    # Tuning for the LLM relevance filter (only applies when the filter
    # is active for this engine).
    #
    # relevance_filter_batch_size: split previews into chunks of this many
    # before sending to the LLM. Smaller batches are faster per call and
    # more reliable on weaker models which struggle with many indices in
    # one context. None or 0 = single-call mode (no batching).
    #
    # relevance_filter_max_parallel_batches: number of batches to dispatch
    # concurrently against the LLM. 1 = sequential. Most providers handle
    # parallel requests fine (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
    # Anthropic).
    relevance_filter_batch_size: Optional[int] = 5
    relevance_filter_max_parallel_batches: int = 10

    # Class attribute for rate limit detection patterns; matched
    # case-insensitively against error text by _is_rate_limit_error.
    # Subclasses can override to add engine-specific patterns.
    rate_limit_patterns: Set[str] = {
        "rate limit",
        "rate_limit",
        "ratelimit",
        "too many requests",
        "throttl",
        "quota exceeded",
        "quota_exceeded",
        "limit exceeded",
        "request limit",
        "api limit",
        "usage limit",
    }

113 

114 @staticmethod 

115 def _ensure_list(value, *, default=None): 

116 """Normalize a value that should be a list. 

117 

118 Handles JSON-encoded strings, comma-separated strings, and 

119 already-parsed lists. Returns *default* (empty list when not 

120 supplied) for ``None`` or empty/unparseable input. 

121 """ 

122 if default is None: 

123 default = [] 

124 if value is None: 

125 return default 

126 if isinstance(value, list): 

127 return value 

128 if isinstance(value, str): 

129 stripped = value.strip() 

130 if not stripped: 

131 return default 

132 if stripped.startswith("["): 

133 try: 

134 parsed = json.loads(stripped) 

135 if isinstance(parsed, list): 135 ↛ 139line 135 didn't jump to line 139 because the condition on line 135 was always true

136 return [str(item) for item in parsed] 

137 except (json.JSONDecodeError, ValueError, RecursionError): 

138 pass 

139 return [ 

140 item.strip() for item in stripped.split(",") if item.strip() 

141 ] 

142 return default 

143 

144 @classmethod 

145 def _load_engine_class(cls, name: str, config: Dict[str, Any]): 

146 """ 

147 Helper method to load an engine class dynamically. 

148 

149 Args: 

150 name: Engine name 

151 config: Engine configuration dict with module_path and class_name 

152 

153 Returns: 

154 Tuple of (success: bool, engine_class or None, error_msg or None) 

155 """ 

156 from ..security.module_whitelist import ( 

157 ModuleNotAllowedError, 

158 get_safe_module_class, 

159 ) 

160 

161 try: 

162 module_path = config.get("module_path") 

163 class_name = config.get("class_name") 

164 

165 if not module_path or not class_name: 

166 return ( 

167 False, 

168 None, 

169 f"Missing module_path or class_name for {name}", 

170 ) 

171 

172 # Use whitelist-validated safe import 

173 engine_class = get_safe_module_class(module_path, class_name) 

174 

175 return True, engine_class, None 

176 

177 except ModuleNotAllowedError as e: 

178 return ( 

179 False, 

180 None, 

181 f"Security error loading engine class for {name}: {e}", 

182 ) 

183 except Exception as e: 

184 return False, None, f"Could not load engine class for {name}: {e}" 

185 

186 @classmethod 

187 def _check_api_key_availability( 

188 cls, name: str, config: Dict[str, Any] 

189 ) -> bool: 

190 """ 

191 Helper method to check if an engine's API key is available and valid. 

192 

193 Args: 

194 name: Engine name 

195 config: Engine configuration dict 

196 

197 Returns: 

198 True if API key is not required or is available and valid 

199 """ 

200 from loguru import logger 

201 

202 if not config.get("requires_api_key", False): 

203 return True 

204 

205 api_key = config.get("api_key", "").strip() 

206 

207 # Check for common placeholder values 

208 if ( 

209 not api_key 

210 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"] 

211 or api_key.endswith( 

212 "_API_KEY" 

213 ) # Default placeholders like BRAVE_API_KEY 

214 or api_key.startswith("YOUR_") 

215 or api_key == "null" 

216 ): 

217 logger.debug( 

218 f"Skipping {name} - requires API key but none configured" 

219 ) 

220 return False 

221 

222 return True 

223 

224 def __init__( 

225 self, 

226 llm: Optional[BaseLLM] = None, 

227 max_filtered_results: Optional[int] = None, 

228 max_results: Optional[int] = 10, # Default value if not provided 

229 preview_filters: List[BaseFilter] | None = None, 

230 content_filters: List[BaseFilter] | None = None, 

231 search_snippets_only: bool = True, # New parameter with default 

232 include_full_content: bool = False, 

233 settings_snapshot: Optional[Dict[str, Any]] = None, 

234 programmatic_mode: bool = False, 

235 **kwargs, 

236 ): 

237 """ 

238 Initialize the search engine with common parameters. 

239 

240 Args: 

241 llm: Optional language model for relevance filtering 

242 max_filtered_results: Maximum number of results to keep after filtering 

243 max_results: Maximum number of search results to return 

244 preview_filters: Filters that will be applied to all previews 

245 produced by the search engine, before relevancy checks. 

246 content_filters: Filters that will be applied to the full content 

247 produced by the search engine, after relevancy checks. 

248 search_snippets_only: Whether to return only snippets or full content 

249 include_full_content: Whether to use FullSearchResults for full webpage content 

250 settings_snapshot: Settings snapshot for configuration 

251 programmatic_mode: If True, disables database operations and uses memory-only tracking 

252 **kwargs: Additional engine-specific parameters 

253 """ 

254 if max_filtered_results is None: 

255 max_filtered_results = 5 

256 if max_results is None: 

257 max_results = 10 

258 self._preview_filters: List[BaseFilter] = ( 

259 preview_filters if preview_filters is not None else [] 

260 ) 

261 self._content_filters: List[BaseFilter] = ( 

262 content_filters if content_filters is not None else [] 

263 ) 

264 

265 self.llm = llm # LLM for relevance filtering 

266 self._max_filtered_results = int( 

267 max_filtered_results 

268 ) # Ensure it's an integer 

269 self._max_results = max( 

270 1, int(max_results) 

271 ) # Ensure it's a positive integer 

272 self.search_snippets_only = search_snippets_only # Store the setting 

273 self.include_full_content = include_full_content 

274 self.settings_snapshot = ( 

275 settings_snapshot or {} 

276 ) # Store settings snapshot 

277 self.programmatic_mode = programmatic_mode 

278 

279 # Rate limiting attributes 

280 self.engine_type = self.__class__.__name__ 

281 # Create a tracker with our settings if in programmatic mode 

282 if self.programmatic_mode: 

283 from .rate_limiting.tracker import AdaptiveRateLimitTracker 

284 

285 self.rate_tracker = AdaptiveRateLimitTracker( 

286 settings_snapshot=self.settings_snapshot, 

287 programmatic_mode=self.programmatic_mode, 

288 ) 

289 else: 

290 self.rate_tracker = get_tracker() 

291 self._last_wait_time = ( 

292 0.0 # Default to 0 for successful searches without rate limiting 

293 ) 

294 self._last_results_count = 0 

295 

296 @property 

297 def max_filtered_results(self) -> int: 

298 """Get the maximum number of filtered results.""" 

299 return self._max_filtered_results 

300 

301 @max_filtered_results.setter 

302 def max_filtered_results(self, value: int) -> None: 

303 """Set the maximum number of filtered results.""" 

304 if value is None: 

305 value = 5 

306 logger.warning("Setting max_filtered_results to 5") 

307 self._max_filtered_results = int(value) 

308 

309 @property 

310 def max_results(self) -> int: 

311 """Get the maximum number of search results.""" 

312 return self._max_results 

313 

314 @max_results.setter 

315 def max_results(self, value: int) -> None: 

316 """Set the maximum number of search results.""" 

317 if value is None: 

318 value = 10 

319 self._max_results = max(1, int(value)) 

320 

321 def _get_adaptive_wait(self) -> float: 

322 """Get adaptive wait time from tracker.""" 

323 wait_time = self.rate_tracker.get_wait_time(self.engine_type) 

324 self._last_wait_time = wait_time 

325 logger.debug( 

326 f"{self.engine_type} waiting {wait_time:.2f}s before retry" 

327 ) 

328 return wait_time 

329 

330 def _record_retry_outcome(self, retry_state) -> None: 

331 """Record outcome after retry completes.""" 

332 success = ( 

333 not retry_state.outcome.failed if retry_state.outcome else False 

334 ) 

335 self.rate_tracker.record_outcome( 

336 self.engine_type, 

337 self._last_wait_time or 0, 

338 success, 

339 retry_state.attempt_number, 

340 error_type="RateLimitError" if not success else None, 

341 search_result_count=self._last_results_count if success else 0, 

342 ) 

343 

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Run the search engine with a given query, retrieving and filtering results.
        This implements a two-phase retrieval approach:
        1. Get preview information for many results
        2. Filter the previews for relevance
        3. Get full content for only the relevant results

        Rate-limit retries (up to 3 attempts with adaptive waits) are applied
        only when the tracker is enabled; metrics recording is skipped in
        programmatic mode.

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results with full content (if available); an
            empty list on any failure.
        """
        logger.info(f"---Execute a search using {self.__class__.__name__}---")

        # Track search call for metrics (if available and not in programmatic mode)
        should_record_metrics = False
        context_was_set = False
        if not self.programmatic_mode:
            from ..metrics.search_tracker import SearchTracker

            should_record_metrics = True

        # For thread-safe context propagation: if we have research_context parameter, use it
        # Otherwise, try to inherit from current thread context (normal case)
        # This allows strategies running in threads to explicitly pass context when needed
        if research_context:
            # Explicit context provided - use it and set it for this thread
            set_search_context(research_context)
            context_was_set = True

        # Metrics label, e.g. "BraveSearchEngine" -> "brave".
        engine_name = self.__class__.__name__.replace(
            "SearchEngine", ""
        ).lower()
        start_time = time.time()

        # Outcome state shared (via nonlocal) with the nested closures below
        # so the finally-block can record accurate metrics.
        success = True
        error_message = None
        results_count = 0

        # Define the core search function with retry logic
        if self.rate_tracker.enabled:
            # Rate limiting enabled - use retry with adaptive wait
            @retry(
                stop=stop_after_attempt(3),
                wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
                retry=retry_if_exception_type((RateLimitError,)),
                after=self._record_retry_outcome,
                reraise=True,
            )
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()
        else:
            # Rate limiting disabled - run without retry
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()

        def _execute_search():
            nonlocal success, error_message, results_count

            try:
                # Step 1: Get preview information for items
                previews = self._get_previews(query)
                if not previews:
                    logger.info(
                        f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
                    )
                    results_count = 0
                    return []

                for preview_filter in self._preview_filters:
                    previews = preview_filter.filter_results(previews, query)

                # Step 2: Filter previews for relevance with LLM
                enable_llm_filter = getattr(
                    self, "enable_llm_relevance_filter", False
                )

                if enable_llm_filter and self.llm:
                    filtered_items = self._filter_for_relevance(previews, query)
                else:
                    filtered_items = previews
                    logger.debug(
                        f"[{type(self).__name__}] Relevance filter skipped "
                        f"(enabled={enable_llm_filter}, "
                        f"llm={'yes' if self.llm else 'no'})"
                    )

                # Step 3: Get full content for filtered items
                if self.search_snippets_only:
                    logger.info("Returning snippet-only results as per config")
                    results = filtered_items
                else:
                    results = self._get_full_content(filtered_items)

                for content_filter in self._content_filters:
                    results = content_filter.filter_results(results, query)

                results_count = len(results)
                self._last_results_count = results_count

                # Record success if we get here and rate limiting is enabled
                if self.rate_tracker.enabled:
                    logger.info(
                        f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
                    )
                    self.rate_tracker.record_outcome(
                        self.engine_type,
                        self._last_wait_time,
                        success=True,
                        retry_count=1,  # First attempt succeeded
                        search_result_count=results_count,
                    )
                else:
                    logger.info(
                        f"Rate limiting disabled, not recording search for {self.engine_type}"
                    )

                return results

            except RateLimitError:
                # Only re-raise if rate limiting is enabled
                if self.rate_tracker.enabled:
                    raise
                # If rate limiting is disabled, treat as regular error
                success = False
                error_message = "Rate limit hit but rate limiting disabled"
                logger.warning(
                    f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
                )
                results_count = 0
                return []
            except Exception as e:
                # Other errors - don't retry
                success = False
                error_message = str(e)
                logger.exception(
                    f"Search engine {self.__class__.__name__} failed"
                )
                results_count = 0
                return []

        try:
            return _run_with_retry()  # type: ignore[no-any-return]
        except RetryError as e:
            # All retries exhausted
            success = False
            error_message = f"Rate limited after all retries: {e}"
            logger.exception(
                f"{self.__class__.__name__} failed after all retries"
            )
            return []
        except Exception as e:
            success = False
            error_message = str(e)
            logger.exception(f"Search engine {self.__class__.__name__} error")
            return []
        finally:
            try:
                # Record search metrics BEFORE clearing context (record_search needs it)
                if should_record_metrics:
                    response_time_ms = int((time.time() - start_time) * 1000)
                    SearchTracker.record_search(
                        engine_name=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success=success,
                        error_message=error_message,
                    )
            finally:
                # Clean up temporary search result storage
                for attr in self._temp_attributes():
                    if hasattr(self, attr):
                        delattr(self, attr)
                # ALWAYS clean up search context, even if recording fails
                if context_was_set:
                    clear_search_context()

528 

529 def invoke(self, query: str) -> List[Dict[str, Any]]: 

530 """Compatibility method for LangChain tools""" 

531 return self.run(query) 

532 

533 def _filter_for_relevance( 

534 self, previews: List[Dict[str, Any]], query: str 

535 ) -> List[Dict[str, Any]]: 

536 """ 

537 Filter search results by relevance using the LLM. 

538 

539 Delegates to the ``relevance_filter`` module, which prompts the 

540 LLM for a plain-text list of relevant indices and parses them 

541 with a regex (no structured output). 

542 

543 Args: 

544 previews: List of preview dictionaries 

545 query: The original search query 

546 

547 Returns: 

548 Filtered list of preview dictionaries 

549 """ 

550 engine_name = type(self).__name__ 

551 

552 if not self.llm or len(previews) <= 1: 

553 logger.debug( 

554 f"[{engine_name}] Skipping relevance filter " 

555 f"(llm={'yes' if self.llm else 'no'}, " 

556 f"previews={len(previews)})" 

557 ) 

558 return previews 

559 

560 from .relevance_filter import filter_previews_for_relevance 

561 

562 return filter_previews_for_relevance( 

563 llm=self.llm, 

564 previews=previews, 

565 query=query, 

566 max_filtered_results=self.max_filtered_results, 

567 engine_name=engine_name, 

568 batch_size=self.relevance_filter_batch_size, 

569 max_parallel_batches=self.relevance_filter_max_parallel_batches, 

570 ) 

571 

572 # ========================================================================= 

573 # Shared Helper Methods for Subclasses 

574 # ========================================================================= 

575 

576 @staticmethod 

577 def _is_valid_api_key(api_key: Optional[str]) -> bool: 

578 """ 

579 Check if an API key is valid (not a placeholder value). 

580 

581 Args: 

582 api_key: The API key to validate 

583 

584 Returns: 

585 True if the key appears to be a real API key, False if it's a placeholder 

586 

587 Example: 

588 >>> BaseSearchEngine._is_valid_api_key("sk-abc123") 

589 True 

590 >>> BaseSearchEngine._is_valid_api_key("YOUR_API_KEY_HERE") 

591 False 

592 """ 

593 if not api_key: 

594 return False 

595 

596 api_key = api_key.strip() 

597 

598 # Empty or whitespace-only 

599 if not api_key: 

600 return False 

601 

602 # Common placeholder values 

603 placeholders = { 

604 "", 

605 "None", 

606 "null", 

607 "PLACEHOLDER", 

608 "YOUR_API_KEY_HERE", 

609 "YOUR_API_KEY", 

610 "API_KEY", 

611 "your_api_key", 

612 "your-api-key", 

613 } 

614 

615 if api_key in placeholders: 

616 return False 

617 

618 # Patterns that indicate placeholders 

619 if api_key.endswith("_API_KEY"): 

620 return False 

621 if api_key.startswith("YOUR_"): 

622 return False 

623 if api_key.startswith("<") and api_key.endswith(">"): 

624 return False 

625 if api_key.startswith("${") and api_key.endswith("}"): 

626 return False 

627 

628 return True 

629 

630 def _resolve_api_key( 

631 self, 

632 api_key: Optional[str], 

633 setting_key: str, 

634 engine_name: str = "search engine", 

635 settings_snapshot: Optional[Dict[str, Any]] = None, 

636 ) -> str: 

637 """ 

638 Resolve an API key from multiple sources with priority order. 

639 

640 Environment variables are handled automatically by SettingsManager 

641 when building the settings snapshot, so they don't need to be 

642 checked separately here. 

643 

644 Priority order: 

645 1. Direct parameter (api_key argument) 

646 2. Settings snapshot (via setting_key) 

647 

648 Args: 

649 api_key: API key passed directly as parameter 

650 setting_key: Key to look up in settings snapshot (e.g., "search.brave_api_key") 

651 engine_name: Human-readable engine name for error messages 

652 settings_snapshot: Optional settings snapshot dict (uses self.settings_snapshot if not provided) 

653 

654 Returns: 

655 The resolved API key string 

656 

657 Raises: 

658 ValueError: If no valid API key is found from any source 

659 

660 Example: 

661 >>> engine._resolve_api_key( 

662 ... api_key=None, 

663 ... setting_key="search.brave_api_key", 

664 ... engine_name="Brave Search" 

665 ... ) 

666 "sk-abc123..." 

667 """ 

668 # Use instance settings snapshot if not provided 

669 if settings_snapshot is None: 

670 settings_snapshot = self.settings_snapshot 

671 

672 # Priority 1: Direct parameter 

673 if self._is_valid_api_key(api_key) and api_key is not None: 

674 return api_key.strip() 

675 

676 # Priority 2: Settings snapshot (includes env var overrides via SettingsManager) 

677 if settings_snapshot: 

678 settings_value = get_setting_from_snapshot( 

679 setting_key, 

680 default=None, 

681 settings_snapshot=settings_snapshot, 

682 ) 

683 if self._is_valid_api_key(settings_value): 

684 return settings_value.strip() if settings_value else "" 

685 

686 # No valid API key found 

687 masked_key = self._mask_api_key(str(api_key)) if api_key else "None" 

688 raise ValueError( 

689 f"No valid API key found for {engine_name}. " 

690 f"Checked: direct parameter ({masked_key}), " 

691 f"settings key '{setting_key}'. " 

692 f"Please provide a valid API key." 

693 ) 

694 

695 def _is_rate_limit_error( 

696 self, 

697 error: Union[Exception, str, int], 

698 additional_patterns: Optional[Set[str]] = None, 

699 ) -> bool: 

700 """ 

701 Detect if an error is a rate limit error. 

702 

703 Checks multiple sources for rate limit indicators: 

704 - HTTP status code 429 

705 - HTTPError response objects 

706 - Error messages containing rate limit phrases 

707 

708 Args: 

709 error: The error to check (Exception, string, or HTTP status code) 

710 additional_patterns: Optional set of additional patterns to match 

711 

712 Returns: 

713 True if the error appears to be a rate limit error 

714 

715 Example: 

716 >>> engine._is_rate_limit_error(429) 

717 True 

718 >>> engine._is_rate_limit_error("Rate limit exceeded") 

719 True 

720 >>> engine._is_rate_limit_error(ValueError("Invalid input")) 

721 False 

722 """ 

723 # Combine default and additional patterns 

724 patterns = self.rate_limit_patterns.copy() 

725 if additional_patterns: 

726 patterns.update(additional_patterns) 

727 

728 # Check integer status code directly 

729 if isinstance(error, int): 

730 return error == 429 

731 

732 # Convert to string for pattern matching 

733 error_str = "" 

734 status_code = None 

735 

736 if isinstance(error, str): 

737 error_str = error 

738 elif isinstance(error, Exception): 

739 error_str = str(error) 

740 

741 # Check for HTTP status code in common HTTP error types 

742 if hasattr(error, "status_code"): 

743 status_code = error.status_code 

744 elif hasattr(error, "response"): 

745 response = error.response 

746 if hasattr(response, "status_code"): 

747 status_code = response.status_code 

748 

749 # Check status code first 

750 if status_code == 429: 

751 return True 

752 

753 # Case-insensitive pattern matching 

754 error_lower = error_str.lower() 

755 for pattern in patterns: 

756 if pattern.lower() in error_lower: 

757 return True 

758 

759 return False 

760 

761 def _raise_if_rate_limit( 

762 self, 

763 error: Union[Exception, str, int], 

764 additional_patterns: Optional[Set[str]] = None, 

765 ) -> None: 

766 """ 

767 Raise RateLimitError if the given error is a rate limit error. 

768 

769 Convenience method that combines _is_rate_limit_error check with 

770 raising RateLimitError. 

771 

772 Args: 

773 error: The error to check 

774 additional_patterns: Optional set of additional patterns to match 

775 

776 Raises: 

777 RateLimitError: If the error is detected as a rate limit error 

778 

779 Example: 

780 >>> try: 

781 ... response = make_api_call() 

782 ... except Exception as e: 

783 ... engine._raise_if_rate_limit(e) 

784 """ 

785 if self._is_rate_limit_error(error, additional_patterns): 

786 error_msg = str(error) if not isinstance(error, str) else error 

787 raise RateLimitError( 

788 f"Rate limit detected: {self._sanitize_error_message(error_msg)}" 

789 ) 

790 

791 def _extract_full_result(self, item: Dict[str, Any]) -> Dict[str, Any]: 

792 """ 

793 Extract the full result from an item that may contain a _full_result key. 

794 

795 This is a helper for the default _get_full_content implementation. 

796 It extracts data from the _full_result key if present, otherwise uses 

797 the item directly, and removes the internal _full_result key. 

798 

799 Args: 

800 item: A search result item that may contain a _full_result key 

801 

802 Returns: 

803 A dictionary with the full result data, without the _full_result key 

804 

805 Example: 

806 >>> engine._extract_full_result({"title": "A", "_full_result": {"title": "A", "content": "Full"}}) 

807 {"title": "A", "content": "Full"} 

808 """ 

809 source = item.get("_full_result") 

810 if source is None: 

811 source = item 

812 return {k: v for k, v in source.items() if k != "_full_result"} 

813 

814 def _get_full_content( 

815 self, relevant_items: List[Dict[str, Any]] 

816 ) -> List[Dict[str, Any]]: 

817 """ 

818 Get full content for the relevant items. 

819 

820 Default implementation extracts data from _full_result keys if present. 

821 Subclasses can override this method to fetch additional content from 

822 external sources (e.g., web scraping, API calls). 

823 

824 Args: 

825 relevant_items: List of relevant preview dictionaries 

826 

827 Returns: 

828 List of result dictionaries with full content 

829 

830 Example: 

831 >>> engine._get_full_content([ 

832 ... {"title": "A", "_full_result": {"title": "A", "content": "Full A"}}, 

833 ... {"title": "B"} 

834 ... ]) 

835 [{"title": "A", "content": "Full A"}, {"title": "B"}] 

836 """ 

837 if not relevant_items: 

838 return [] 

839 return [self._extract_full_result(item) for item in relevant_items] 

840 

841 def _init_full_search( 

842 self, 

843 web_search=None, 

844 language="en", 

845 max_results=10, 

846 region=None, 

847 time_period=None, 

848 safe_search=None, 

849 ): 

850 """Initialize FullSearchResults if include_full_content is True. 

851 

852 Call this at the end of your __init__ after setting up your search wrapper. 

853 

854 Args: 

855 web_search: The search wrapper/engine to pass to FullSearchResults 

856 language: Language for search results 

857 max_results: Maximum number of results 

858 region: Region/country code for results 

859 time_period: Time period filter 

860 safe_search: Safe search setting (string value for FullSearchResults) 

861 """ 

862 if self.include_full_content and self.llm: 

863 try: 

864 from .engines.full_search import FullSearchResults 

865 

866 self.full_search = FullSearchResults( 

867 llm=self.llm, 

868 web_search=web_search, 

869 language=language, 

870 max_results=max_results, 

871 region=region, 

872 time=time_period, 

873 safesearch=safe_search, 

874 ) 

875 except ImportError: 

876 logger.warning( 

877 "FullSearchResults not available. " 

878 "Full content retrieval disabled." 

879 ) 

880 self.include_full_content = False 

881 

882 def _temp_attributes(self): 

883 """Return list of temporary attribute names to clean up after run(). 

884 

885 Override in subclasses that store additional temporary data. 

886 """ 

887 return ["_search_results"] 

888 

889 def _sanitize_error_message(self, message: str) -> str: 

890 """ 

891 Remove/mask API keys, tokens, and secrets from error messages. 

892 

893 Uses pattern matching for common credential formats. 

894 

895 Args: 

896 message: The error message to sanitize 

897 

898 Returns: 

899 Sanitized message with sensitive data redacted 

900 

901 Example: 

902 >>> engine._sanitize_error_message("Error with key sk-abc123xyz") 

903 "Error with key [REDACTED]" 

904 """ 

905 if not message: 

906 return message 

907 

908 sanitized = message 

909 

910 # Additional regex patterns for common credential formats 

911 patterns = [ 

912 # Bearer tokens 

913 (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", "Bearer [REDACTED]"), 

914 # API keys in URLs 

915 ( 

916 r"([?&])(api_key|apikey|key|token|secret)=([A-Za-z0-9\-._~+/]+)", 

917 r"\1\2=[REDACTED]", 

918 ), 

919 # URL credentials (user:pass@host) 

920 (r"(https?://)([^:\s]+):([^@\s]+)@", r"\1[REDACTED]:[REDACTED]@"), 

921 # Common API key patterns (sk-*, pk-*, etc.) 

922 (r"\b(sk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"), 

923 (r"\b(pk-[A-Za-z0-9]{20,})\b", "[REDACTED_KEY]"), 

924 ] 

925 

926 for pattern, replacement in patterns: 

927 sanitized = re.sub(pattern, replacement, sanitized) 

928 

929 return sanitized 

930 

931 def _mask_api_key(self, api_key: str, visible_chars: int = 4) -> str: 

932 """ 

933 Mask an API key for safe logging, showing only first and last characters. 

934 

935 Args: 

936 api_key: The API key to mask 

937 visible_chars: Number of characters to show at start and end 

938 

939 Returns: 

940 Masked API key in format "sk-1...nop" or "***" for short keys 

941 

942 Example: 

943 >>> engine._mask_api_key("sk-abcdefghijklmnop123456") 

944 "sk-a...3456" 

945 >>> engine._mask_api_key("short") 

946 "***" 

947 """ 

948 if not api_key: 

949 return "***" 

950 

951 api_key = str(api_key).strip() 

952 

953 if len(api_key) <= visible_chars * 2: 

954 return "***" 

955 

956 return f"{api_key[:visible_chars]}...{api_key[-visible_chars:]}" 

957 

    @abstractmethod
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information (titles, summaries) for initial search results.

        This is phase one of the two-phase retrieval implemented by run():
        previews are fetched cheaply first, filtered for relevance, and only
        then is full content retrieved for the surviving items.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
        """
        pass

970 

971 def close(self) -> None: 

972 """ 

973 Close any resources held by this search engine. 

974 

975 Subclasses with HTTP sessions or other resources should override this. 

976 The base implementation safely closes any 'session' attribute if present 

977 and closes content filters that hold resources. 

978 """ 

979 from ..utilities.resource_utils import safe_close 

980 

981 if hasattr(self, "session") and self.session is not None: 

982 safe_close(self.session, "HTTP session") 

983 if hasattr(self, "_content_filters"): 983 ↛ exitline 983 didn't return from function 'close' because the condition on line 983 was always true

984 for content_filter in self._content_filters: 984 ↛ 985line 984 didn't jump to line 985 because the loop on line 984 never started

985 safe_close(content_filter, "content filter") 

986 

987 def __enter__(self): 

988 """Support context manager usage.""" 

989 return self 

990 

991 def __exit__(self, exc_type, exc_val, exc_tb): 

992 """Cleanup on context exit.""" 

993 self.close() 

994 return False