Coverage for src / local_deep_research / web_search_engines / search_engine_base.py: 95%

281 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import json 

2import time 

3from abc import ABC, abstractmethod 

4from datetime import datetime, UTC 

5from typing import Any, Dict, List, Optional 

6 

7from langchain_core.language_models import BaseLLM 

8from loguru import logger 

9from tenacity import ( 

10 RetryError, 

11 retry, 

12 retry_if_exception_type, 

13 stop_after_attempt, 

14) 

15from tenacity.wait import wait_base 

16 

17from ..advanced_search_system.filters.base_filter import BaseFilter 

18from ..utilities.json_utils import extract_json, get_llm_response_text 

19from ..utilities.thread_context import clear_search_context, set_search_context 

20 

21# Lazy import for metrics to avoid database dependencies in programmatic mode 

22# from ..metrics.search_tracker import get_search_tracker 

23from .rate_limiting import RateLimitError, get_tracker 

24 

25 

class AdaptiveWait(wait_base):
    """Custom wait strategy that uses adaptive rate limiting.

    Instead of a fixed/exponential backoff, the sleep interval is computed
    on demand by a caller-supplied function, so it can adapt to the rate
    limiting actually observed for a given engine.
    """

    def __init__(self, get_wait_func):
        """
        Args:
            get_wait_func: Zero-argument callable returning the number of
                seconds to wait before the next retry attempt.
        """
        # Cooperative init: keep the tenacity wait_base chain intact.
        super().__init__()
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        """Return the adaptive wait time in seconds.

        ``retry_state`` is accepted per the tenacity wait protocol but is
        not consulted; the delegate decides the interval on its own.
        """
        return self.get_wait_func()

34 

35 

class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach:
    fetch cheap previews first, optionally filter them for relevance, then
    fetch full content only for the surviving results.
    """

    # Class attribute to indicate if this engine searches public internet sources
    # Should be overridden by subclasses - defaults to False for safety
    is_public: bool = False

    # Class attribute to indicate if this is a generic search engine (vs specialized)
    # Generic engines are general web search (Google, Bing, etc) vs specialized (arXiv, PubMed)
    is_generic: bool = False

    # Class attribute to indicate if this is a scientific/academic search engine
    # Scientific engines include arXiv, PubMed, Semantic Scholar, etc.
    is_scientific: bool = False

    # Class attribute to indicate if this is a local RAG/document search engine
    # Local engines search private document collections stored locally
    is_local: bool = False

    # Class attribute to indicate if this is a news search engine
    # News engines specialize in news articles and current events
    is_news: bool = False

    # Class attribute to indicate if this is a code search engine
    # Code engines specialize in searching code repositories
    is_code: bool = False

65 

66 @staticmethod 

67 def _ensure_list(value, *, default=None): 

68 """Normalize a value that should be a list. 

69 

70 Handles JSON-encoded strings, comma-separated strings, and 

71 already-parsed lists. Returns *default* (empty list when not 

72 supplied) for ``None`` or empty/unparseable input. 

73 """ 

74 if default is None: 

75 default = [] 

76 if value is None: 

77 return default 

78 if isinstance(value, list): 

79 return value 

80 if isinstance(value, str): 

81 stripped = value.strip() 

82 if not stripped: 

83 return default 

84 if stripped.startswith("["): 

85 try: 

86 parsed = json.loads(stripped) 

87 if isinstance(parsed, list): 87 ↛ 91line 87 didn't jump to line 91 because the condition on line 87 was always true

88 return [str(item) for item in parsed] 

89 except (json.JSONDecodeError, ValueError, RecursionError): 

90 pass 

91 return [ 

92 item.strip() for item in stripped.split(",") if item.strip() 

93 ] 

94 return default 

95 

96 @classmethod 

97 def _load_engine_class(cls, name: str, config: Dict[str, Any]): 

98 """ 

99 Helper method to load an engine class dynamically. 

100 

101 Args: 

102 name: Engine name 

103 config: Engine configuration dict with module_path and class_name 

104 

105 Returns: 

106 Tuple of (success: bool, engine_class or None, error_msg or None) 

107 """ 

108 from ..security.module_whitelist import ( 

109 ModuleNotAllowedError, 

110 get_safe_module_class, 

111 ) 

112 

113 try: 

114 module_path = config.get("module_path") 

115 class_name = config.get("class_name") 

116 

117 if not module_path or not class_name: 

118 return ( 

119 False, 

120 None, 

121 f"Missing module_path or class_name for {name}", 

122 ) 

123 

124 # Use whitelist-validated safe import 

125 engine_class = get_safe_module_class(module_path, class_name) 

126 

127 return True, engine_class, None 

128 

129 except ModuleNotAllowedError as e: 

130 return ( 

131 False, 

132 None, 

133 f"Security error loading engine class for {name}: {e}", 

134 ) 

135 except Exception as e: 

136 return False, None, f"Could not load engine class for {name}: {e}" 

137 

138 @classmethod 

139 def _check_api_key_availability( 

140 cls, name: str, config: Dict[str, Any] 

141 ) -> bool: 

142 """ 

143 Helper method to check if an engine's API key is available and valid. 

144 

145 Args: 

146 name: Engine name 

147 config: Engine configuration dict 

148 

149 Returns: 

150 True if API key is not required or is available and valid 

151 """ 

152 from loguru import logger 

153 

154 if not config.get("requires_api_key", False): 

155 return True 

156 

157 api_key = config.get("api_key", "").strip() 

158 

159 # Check for common placeholder values 

160 if ( 

161 not api_key 

162 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"] 

163 or api_key.endswith( 

164 "_API_KEY" 

165 ) # Default placeholders like BRAVE_API_KEY 

166 or api_key.startswith("YOUR_") 

167 or api_key == "null" 

168 ): 

169 logger.debug( 

170 f"Skipping {name} - requires API key but none configured" 

171 ) 

172 return False 

173 

174 return True 

175 

176 def __init__( 

177 self, 

178 llm: Optional[BaseLLM] = None, 

179 max_filtered_results: Optional[int] = None, 

180 max_results: Optional[int] = 10, # Default value if not provided 

181 preview_filters: List[BaseFilter] | None = None, 

182 content_filters: List[BaseFilter] | None = None, 

183 search_snippets_only: bool = True, # New parameter with default 

184 settings_snapshot: Optional[Dict[str, Any]] = None, 

185 programmatic_mode: bool = False, 

186 **kwargs, 

187 ): 

188 """ 

189 Initialize the search engine with common parameters. 

190 

191 Args: 

192 llm: Optional language model for relevance filtering 

193 max_filtered_results: Maximum number of results to keep after filtering 

194 max_results: Maximum number of search results to return 

195 preview_filters: Filters that will be applied to all previews 

196 produced by the search engine, before relevancy checks. 

197 content_filters: Filters that will be applied to the full content 

198 produced by the search engine, after relevancy checks. 

199 search_snippets_only: Whether to return only snippets or full content 

200 settings_snapshot: Settings snapshot for configuration 

201 programmatic_mode: If True, disables database operations and uses memory-only tracking 

202 **kwargs: Additional engine-specific parameters 

203 """ 

204 if max_filtered_results is None: 

205 max_filtered_results = 5 

206 if max_results is None: 

207 max_results = 10 

208 self._preview_filters: List[BaseFilter] = preview_filters 

209 if self._preview_filters is None: 

210 self._preview_filters = [] 

211 self._content_filters: List[BaseFilter] = content_filters 

212 if self._content_filters is None: 

213 self._content_filters = [] 

214 

215 self.llm = llm # LLM for relevance filtering 

216 self._max_filtered_results = int( 

217 max_filtered_results 

218 ) # Ensure it's an integer 

219 self._max_results = max( 

220 1, int(max_results) 

221 ) # Ensure it's a positive integer 

222 self.search_snippets_only = search_snippets_only # Store the setting 

223 self.settings_snapshot = ( 

224 settings_snapshot or {} 

225 ) # Store settings snapshot 

226 self.programmatic_mode = programmatic_mode 

227 

228 # Rate limiting attributes 

229 self.engine_type = self.__class__.__name__ 

230 # Create a tracker with our settings if in programmatic mode 

231 if self.programmatic_mode: 

232 from .rate_limiting.tracker import AdaptiveRateLimitTracker 

233 

234 self.rate_tracker = AdaptiveRateLimitTracker( 

235 settings_snapshot=self.settings_snapshot, 

236 programmatic_mode=self.programmatic_mode, 

237 ) 

238 else: 

239 self.rate_tracker = get_tracker() 

240 self._last_wait_time = ( 

241 0.0 # Default to 0 for successful searches without rate limiting 

242 ) 

243 self._last_results_count = 0 

244 

245 @property 

246 def max_filtered_results(self) -> int: 

247 """Get the maximum number of filtered results.""" 

248 return self._max_filtered_results 

249 

250 @max_filtered_results.setter 

251 def max_filtered_results(self, value: int) -> None: 

252 """Set the maximum number of filtered results.""" 

253 if value is None: 

254 value = 5 

255 logger.warning("Setting max_filtered_results to 5") 

256 self._max_filtered_results = int(value) 

257 

258 @property 

259 def max_results(self) -> int: 

260 """Get the maximum number of search results.""" 

261 return self._max_results 

262 

263 @max_results.setter 

264 def max_results(self, value: int) -> None: 

265 """Set the maximum number of search results.""" 

266 if value is None: 

267 value = 10 

268 self._max_results = max(1, int(value)) 

269 

270 def _get_adaptive_wait(self) -> float: 

271 """Get adaptive wait time from tracker.""" 

272 wait_time = self.rate_tracker.get_wait_time(self.engine_type) 

273 self._last_wait_time = wait_time 

274 logger.debug( 

275 f"{self.engine_type} waiting {wait_time:.2f}s before retry" 

276 ) 

277 return wait_time 

278 

279 def _record_retry_outcome(self, retry_state) -> None: 

280 """Record outcome after retry completes.""" 

281 success = ( 

282 not retry_state.outcome.failed if retry_state.outcome else False 

283 ) 

284 self.rate_tracker.record_outcome( 

285 self.engine_type, 

286 self._last_wait_time or 0, 

287 success, 

288 retry_state.attempt_number, 

289 error_type="RateLimitError" if not success else None, 

290 search_result_count=self._last_results_count if success else 0, 

291 ) 

292 

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Run the search engine with a given query, retrieving and filtering results.
        This implements a two-phase retrieval approach:
        1. Get preview information for many results
        2. Filter the previews for relevance
        3. Get full content for only the relevant results

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results with full content (if available)
        """
        # Track search call for metrics (if available and not in programmatic mode)
        tracker = None
        context_was_set = False
        if not self.programmatic_mode:
            # Lazy import: metrics pull in database dependencies that
            # programmatic mode must avoid.
            from ..metrics.search_tracker import get_search_tracker

            # For thread-safe context propagation: if we have research_context parameter, use it
            # Otherwise, try to inherit from current thread context (normal case)
            # This allows strategies running in threads to explicitly pass context when needed
            if research_context:
                # Explicit context provided - use it and set it for this thread
                set_search_context(research_context)
                context_was_set = True

            # Get tracker after context is set (either from parameter or thread)
            tracker = get_search_tracker()

        # e.g. "BraveSearchEngine" -> "brave"; used only for metrics labels.
        engine_name = self.__class__.__name__.replace(
            "SearchEngine", ""
        ).lower()
        start_time = time.time()

        # These are mutated by the nested closures below via `nonlocal` and
        # read in the `finally` block when recording metrics.
        success = True
        error_message = None
        results_count = 0

        # Define the core search function with retry logic
        if self.rate_tracker.enabled:
            # Rate limiting enabled - use retry with adaptive wait
            @retry(
                stop=stop_after_attempt(3),
                wait=AdaptiveWait(lambda: self._get_adaptive_wait()),
                retry=retry_if_exception_type((RateLimitError,)),
                after=self._record_retry_outcome,
                reraise=True,
            )
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()
        else:
            # Rate limiting disabled - run without retry
            def _run_with_retry():
                nonlocal success, error_message, results_count
                return _execute_search()

        # Note: _execute_search is referenced above but defined here; that is
        # fine because _run_with_retry is only *called* after this binding.
        def _execute_search():
            nonlocal success, error_message, results_count

            try:
                # Step 1: Get preview information for items
                previews = self._get_previews(query)
                if not previews:
                    logger.info(
                        f"Search engine {self.__class__.__name__} returned no preview results for query: {query}"
                    )
                    results_count = 0
                    return []

                # Apply engine-level preview filters before any LLM filtering.
                for preview_filter in self._preview_filters:
                    previews = preview_filter.filter_results(previews, query)

                # Step 2: Filter previews for relevance with LLM
                # Check if LLM relevance filtering should be enabled
                enable_llm_filter = getattr(
                    self, "enable_llm_relevance_filter", False
                )

                logger.info(
                    f"BaseSearchEngine: Relevance filter check - enable_llm_relevance_filter={enable_llm_filter}, has_llm={self.llm is not None}, engine_type={type(self).__name__}"
                )

                if enable_llm_filter and self.llm:
                    logger.info(
                        f"Applying LLM relevance filter to {len(previews)} previews"
                    )
                    filtered_items = self._filter_for_relevance(previews, query)
                    logger.info(
                        f"LLM filter kept {len(filtered_items)} of {len(previews)} results"
                    )
                else:
                    # Filtering skipped: pass previews through unchanged.
                    filtered_items = previews
                    if not enable_llm_filter:
                        logger.info(
                            f"LLM relevance filtering disabled (enable_llm_relevance_filter={enable_llm_filter}) - returning all {len(previews)} previews"
                        )
                    elif not self.llm:
                        logger.info(
                            f"No LLM available for relevance filtering - returning all {len(previews)} previews"
                        )

                # Step 3: Get full content for filtered items
                if self.search_snippets_only:
                    logger.info("Returning snippet-only results as per config")
                    results = filtered_items
                else:
                    results = self._get_full_content(filtered_items)

                # Content filters run after relevance checks, on final results.
                for content_filter in self._content_filters:
                    results = content_filter.filter_results(results, query)

                results_count = len(results)
                self._last_results_count = results_count

                # Record success if we get here and rate limiting is enabled
                if self.rate_tracker.enabled:
                    logger.info(
                        f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}"
                    )
                    self.rate_tracker.record_outcome(
                        self.engine_type,
                        self._last_wait_time,
                        success=True,
                        retry_count=1,  # First attempt succeeded
                        search_result_count=results_count,
                    )
                else:
                    logger.info(
                        f"Rate limiting disabled, not recording search for {self.engine_type}"
                    )

                return results

            except RateLimitError:
                # Only re-raise if rate limiting is enabled
                if self.rate_tracker.enabled:
                    raise
                else:
                    # If rate limiting is disabled, treat as regular error
                    success = False
                    error_message = "Rate limit hit but rate limiting disabled"
                    logger.warning(
                        f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled"
                    )
                    results_count = 0
                    return []
            except Exception as e:
                # Other errors - don't retry
                success = False
                error_message = str(e)
                logger.exception(
                    f"Search engine {self.__class__.__name__} failed"
                )
                results_count = 0
                return []

        try:
            return _run_with_retry()
        except RetryError as e:
            # All retries exhausted
            success = False
            error_message = f"Rate limited after all retries: {e}"
            logger.exception(
                f"{self.__class__.__name__} failed after all retries"
            )
            return []
        except Exception as e:
            # Reached when reraise=True surfaces the final RateLimitError,
            # or on unexpected failures outside _execute_search's handlers.
            success = False
            error_message = str(e)
            logger.exception(f"Search engine {self.__class__.__name__} error")
            return []
        finally:
            try:
                # Record search metrics BEFORE clearing context (record_search needs it)
                if tracker is not None:
                    response_time_ms = int((time.time() - start_time) * 1000)
                    tracker.record_search(
                        engine_name=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success=success,
                        error_message=error_message,
                    )
            finally:
                # ALWAYS clean up search context, even if recording fails
                if context_was_set:
                    clear_search_context()

487 

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools.

        Delegates to :meth:`run` with no explicit research context.
        """
        return self.run(query)

491 

    def _filter_for_relevance(
        self, previews: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Filter search results by relevance to the query using the LLM.

        The LLM is asked to return a JSON array of 0-based indices of the
        relevant previews, in ranked order. On malformed/invalid responses
        the method degrades gracefully: it returns either an empty list
        (invalid index types) or the first up-to-5 previews (no JSON array
        found, or any exception).

        Args:
            previews: List of preview dictionaries
            query: The original search query

        Returns:
            Filtered list of preview dictionaries
        """
        # If no LLM or too few previews, return all
        if not self.llm or len(previews) <= 1:
            return previews

        # Log the number of previews we're processing
        logger.info(f"Filtering {len(previews)} previews for relevance")

        # Create a simple context for LLM
        preview_context = []
        indices_used = []
        for i, preview in enumerate(previews):
            title = preview.get("title", "Untitled").strip()
            snippet = preview.get("snippet", "").strip()
            url = preview.get("url", "").strip()

            # Clean up snippet if too long
            if len(snippet) > 300:
                snippet = snippet[:300] + "..."

            preview_context.append(
                f"[{i}] Title: {title}\nURL: {url}\nSnippet: {snippet}"
            )
            indices_used.append(i)

        # Log the indices we're presenting to the LLM
        logger.info(
            f"Created preview context with indices 0-{len(previews) - 1}"
        )
        logger.info(
            f"First 5 indices in prompt: {indices_used[:5]}, Last 5: {indices_used[-5:] if len(indices_used) > 5 else 'N/A'}"
        )

        # Join all previews with clear separation
        preview_text = "\n\n".join(preview_context)

        # Log a sample of what we're sending to the LLM
        logger.debug(
            f"First preview in prompt: {preview_context[0] if preview_context else 'None'}"
        )
        if len(preview_context) > 1:
            logger.debug(f"Last preview in prompt: {preview_context[-1]}")

        # Set a reasonable limit on context length
        current_date = datetime.now(UTC).strftime("%Y-%m-%d")
        prompt = f"""Analyze these search results and select the most relevant ones for the query.

Query: "{query}"
Current date: {current_date}
Total results: {len(previews)}

Criteria for selection (in order of importance):
1. Direct relevance - MUST directly address the specific query topic, not just mention keywords
2. Quality - from reputable sources with substantive information about the query
3. Recency - prefer recent information when relevant

Search results to evaluate:
{preview_text}

Return a JSON array of indices (0-based) for results that are highly relevant to the query.
Valid indices are 0 to {len(previews) - 1}.
Only include results that directly help answer the specific question asked.
Be selective - it's better to return fewer high-quality results than many mediocre ones.
Maximum results to return: {self.max_filtered_results}
Example response: [4, 0, 2, 7]

Respond with ONLY the JSON array, no other text."""

        try:
            # Get LLM's evaluation
            response = self.llm.invoke(prompt)

            # Log the raw response for debugging
            logger.info(f"Raw LLM response for relevance filtering: {response}")

            response_text = get_llm_response_text(response)
            logger.debug(f"Cleaned response text: {response_text}")

            ranked_indices = extract_json(response_text, expected_type=list)

            if ranked_indices is not None:
                logger.info(f"LLM returned indices: {ranked_indices}")

                # Validate that ranked_indices is a list of integers
                # (defensive: extract_json was asked for a list already)
                if not isinstance(ranked_indices, list):
                    logger.warning(
                        "LLM response is not a list, returning empty results"
                    )
                    return []

                if not all(isinstance(idx, int) for idx in ranked_indices):
                    logger.warning(
                        "LLM response contains non-integer indices, returning empty results"
                    )
                    return []

                # Log analysis of the indices
                max_index = max(ranked_indices) if ranked_indices else -1
                min_index = min(ranked_indices) if ranked_indices else -1
                logger.info(
                    f"Index analysis: min={min_index}, max={max_index}, "
                    f"valid_range=0-{len(previews) - 1}, count={len(ranked_indices)}"
                )

                # Return the results in ranked order; out-of-range indices
                # are skipped (logged) rather than failing the whole filter.
                ranked_results = []
                out_of_range = []
                for idx in ranked_indices:
                    if 0 <= idx < len(previews):
                        ranked_results.append(previews[idx])
                    else:
                        out_of_range.append(idx)
                        logger.warning(
                            f"Index {idx} out of range (valid: 0-{len(previews) - 1}), skipping"
                        )

                if out_of_range:
                    logger.error(
                        f"Out of range indices: {out_of_range}. "
                        f"Total previews: {len(previews)}, "
                        f"All returned indices: {ranked_indices}"
                    )

                # Limit to max_filtered_results if specified
                if (
                    self.max_filtered_results
                    and len(ranked_results) > self.max_filtered_results
                ):
                    logger.info(
                        f"Limiting filtered results to top {self.max_filtered_results}"
                    )
                    return ranked_results[: self.max_filtered_results]

                return ranked_results
            else:
                logger.warning(
                    "Could not find JSON array in response, returning original previews"
                )
                logger.debug(
                    f"Response text without JSON array: {response_text}"
                )
                return previews[: min(5, len(previews))]

        except Exception:
            logger.exception("Relevance filtering error")
            # Fall back to returning top results on error
            return previews[: min(5, len(previews))]

651 

    @abstractmethod
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information (titles, summaries) for initial search results.

        Called by :meth:`run` as phase one of the two-phase retrieval;
        subclasses implement the engine-specific request here.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries with at least 'id', 'title', and 'snippet' keys
        """
        pass

664 

    @abstractmethod
    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.

        Called by :meth:`run` only when ``search_snippets_only`` is False,
        after preview filtering has reduced the candidate set.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        pass

679 

680 def close(self) -> None: 

681 """ 

682 Close any resources held by this search engine. 

683 

684 Subclasses with HTTP sessions or other resources should override this. 

685 The base implementation safely closes any 'session' attribute if present. 

686 """ 

687 if hasattr(self, "session") and self.session is not None: 

688 try: 

689 self.session.close() 

690 except Exception: 

691 pass # Safe cleanup - don't raise on close 

692 

    def __enter__(self):
        """Support context manager usage; yields the engine itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup on context exit; returning False never suppresses exceptions."""
        self.close()
        return False

700 return False