Coverage report for src/local_deep_research/web_search_engines/search_engine_base.py: 91% of 257 statements covered (coverage.py v7.12.0, generated 2026-01-11 00:51 +0000).

1import json 

2import time 

3from abc import ABC, abstractmethod 

4from datetime import datetime, UTC 

5from typing import Any, Dict, List, Optional 

6 

7from langchain_core.language_models import BaseLLM 

8from loguru import logger 

9from tenacity import ( 

10 RetryError, 

11 retry, 

12 retry_if_exception_type, 

13 stop_after_attempt, 

14) 

15from tenacity.wait import wait_base 

16 

17from ..advanced_search_system.filters.base_filter import BaseFilter 

18from ..utilities.thread_context import set_search_context 

19 

20# Lazy import for metrics to avoid database dependencies in programmatic mode 

21# from ..metrics.search_tracker import get_search_tracker 

22from .rate_limiting import RateLimitError, get_tracker 

23 

24 

class AdaptiveWait(wait_base):
    """Tenacity wait strategy that delegates to an adaptive rate limiter.

    Rather than a fixed or exponential backoff, each retry asks a callable
    (typically backed by the engine's rate tracker) how long to sleep.
    """

    def __init__(self, get_wait_func):
        # Callable returning the number of seconds to wait before a retry.
        self.get_wait_func = get_wait_func

    def __call__(self, retry_state):
        # Tenacity passes the retry state, but the wait time is driven
        # entirely by the tracker, so the state is ignored.
        return self.get_wait_func()

33 

34 

class BaseSearchEngine(ABC):
    """
    Abstract base class for search engines with two-phase retrieval capability.
    Handles common parameters and implements the two-phase search approach.
    """

    # Capability flags. Subclasses override these to describe the kind of
    # sources they search; everything defaults to False for safety.

    # True when the engine searches public internet sources.
    is_public = False

    # True for generic web search (Google, Bing, etc.) as opposed to
    # specialized engines (arXiv, PubMed, ...).
    is_generic = False

    # True for scientific/academic engines (arXiv, PubMed, Semantic
    # Scholar, etc.).
    is_scientific = False

    # True for local RAG/document engines that search private document
    # collections stored locally.
    is_local = False

    # True for engines specializing in news articles and current events.
    is_news = False

    # True for engines specializing in searching code repositories.
    is_code = False

65 @classmethod 

66 def _load_engine_class(cls, name: str, config: Dict[str, Any]): 

67 """ 

68 Helper method to load an engine class dynamically. 

69 

70 Args: 

71 name: Engine name 

72 config: Engine configuration dict with module_path and class_name 

73 

74 Returns: 

75 Tuple of (success: bool, engine_class or None, error_msg or None) 

76 """ 

77 import importlib 

78 

79 try: 

80 module_path = config.get("module_path") 

81 class_name = config.get("class_name") 

82 

83 if not module_path or not class_name: 

84 return ( 

85 False, 

86 None, 

87 f"Missing module_path or class_name for {name}", 

88 ) 

89 

90 # Import the module 

91 package = None 

92 if module_path.startswith("."): 92 ↛ 95line 92 didn't jump to line 95 because the condition on line 92 was always true

93 # This is a relative import 

94 package = "local_deep_research.web_search_engines" 

95 module = importlib.import_module(module_path, package=package) 

96 engine_class = getattr(module, class_name) 

97 

98 return True, engine_class, None 

99 

100 except Exception as e: 

101 return False, None, f"Could not load engine class for {name}: {e}" 

102 

103 @classmethod 

104 def _check_api_key_availability( 

105 cls, name: str, config: Dict[str, Any] 

106 ) -> bool: 

107 """ 

108 Helper method to check if an engine's API key is available and valid. 

109 

110 Args: 

111 name: Engine name 

112 config: Engine configuration dict 

113 

114 Returns: 

115 True if API key is not required or is available and valid 

116 """ 

117 from loguru import logger 

118 

119 if not config.get("requires_api_key", False): 

120 return True 

121 

122 api_key = config.get("api_key", "").strip() 

123 

124 # Check for common placeholder values 

125 if ( 

126 not api_key 

127 or api_key in ["", "None", "PLACEHOLDER", "YOUR_API_KEY_HERE"] 

128 or api_key.endswith( 

129 "_API_KEY" 

130 ) # Default placeholders like BRAVE_API_KEY 

131 or api_key.startswith("YOUR_") 

132 or api_key == "null" 

133 ): 

134 logger.debug( 

135 f"Skipping {name} - requires API key but none configured" 

136 ) 

137 return False 

138 

139 return True 

140 

141 def __init__( 

142 self, 

143 llm: Optional[BaseLLM] = None, 

144 max_filtered_results: Optional[int] = None, 

145 max_results: Optional[int] = 10, # Default value if not provided 

146 preview_filters: List[BaseFilter] | None = None, 

147 content_filters: List[BaseFilter] | None = None, 

148 search_snippets_only: bool = True, # New parameter with default 

149 settings_snapshot: Optional[Dict[str, Any]] = None, 

150 programmatic_mode: bool = False, 

151 **kwargs, 

152 ): 

153 """ 

154 Initialize the search engine with common parameters. 

155 

156 Args: 

157 llm: Optional language model for relevance filtering 

158 max_filtered_results: Maximum number of results to keep after filtering 

159 max_results: Maximum number of search results to return 

160 preview_filters: Filters that will be applied to all previews 

161 produced by the search engine, before relevancy checks. 

162 content_filters: Filters that will be applied to the full content 

163 produced by the search engine, after relevancy checks. 

164 search_snippets_only: Whether to return only snippets or full content 

165 settings_snapshot: Settings snapshot for configuration 

166 programmatic_mode: If True, disables database operations and uses memory-only tracking 

167 **kwargs: Additional engine-specific parameters 

168 """ 

169 if max_filtered_results is None: 

170 max_filtered_results = 5 

171 if max_results is None: 

172 max_results = 10 

173 self._preview_filters: List[BaseFilter] = preview_filters 

174 if self._preview_filters is None: 

175 self._preview_filters = [] 

176 self._content_filters: List[BaseFilter] = content_filters 

177 if self._content_filters is None: 

178 self._content_filters = [] 

179 

180 self.llm = llm # LLM for relevance filtering 

181 self._max_filtered_results = int( 

182 max_filtered_results 

183 ) # Ensure it's an integer 

184 self._max_results = max( 

185 1, int(max_results) 

186 ) # Ensure it's a positive integer 

187 self.search_snippets_only = search_snippets_only # Store the setting 

188 self.settings_snapshot = ( 

189 settings_snapshot or {} 

190 ) # Store settings snapshot 

191 self.programmatic_mode = programmatic_mode 

192 

193 # Rate limiting attributes 

194 self.engine_type = self.__class__.__name__ 

195 # Create a tracker with our settings if in programmatic mode 

196 if self.programmatic_mode: 

197 from .rate_limiting.tracker import AdaptiveRateLimitTracker 

198 

199 self.rate_tracker = AdaptiveRateLimitTracker( 

200 settings_snapshot=self.settings_snapshot, 

201 programmatic_mode=self.programmatic_mode, 

202 ) 

203 else: 

204 self.rate_tracker = get_tracker() 

205 self._last_wait_time = ( 

206 0.0 # Default to 0 for successful searches without rate limiting 

207 ) 

208 self._last_results_count = 0 

209 

210 @property 

211 def max_filtered_results(self) -> int: 

212 """Get the maximum number of filtered results.""" 

213 return self._max_filtered_results 

214 

215 @max_filtered_results.setter 

216 def max_filtered_results(self, value: int) -> None: 

217 """Set the maximum number of filtered results.""" 

218 if value is None: 

219 value = 5 

220 logger.warning("Setting max_filtered_results to 5") 

221 self._max_filtered_results = int(value) 

222 

223 @property 

224 def max_results(self) -> int: 

225 """Get the maximum number of search results.""" 

226 return self._max_results 

227 

228 @max_results.setter 

229 def max_results(self, value: int) -> None: 

230 """Set the maximum number of search results.""" 

231 if value is None: 

232 value = 10 

233 self._max_results = max(1, int(value)) 

234 

235 def _get_adaptive_wait(self) -> float: 

236 """Get adaptive wait time from tracker.""" 

237 wait_time = self.rate_tracker.get_wait_time(self.engine_type) 

238 self._last_wait_time = wait_time 

239 logger.debug( 

240 f"{self.engine_type} waiting {wait_time:.2f}s before retry" 

241 ) 

242 return wait_time 

243 

244 def _record_retry_outcome(self, retry_state) -> None: 

245 """Record outcome after retry completes.""" 

246 success = ( 

247 not retry_state.outcome.failed if retry_state.outcome else False 

248 ) 

249 self.rate_tracker.record_outcome( 

250 self.engine_type, 

251 self._last_wait_time or 0, 

252 success, 

253 retry_state.attempt_number, 

254 error_type="RateLimitError" if not success else None, 

255 search_result_count=self._last_results_count if success else 0, 

256 ) 

257 

258 def run( 

259 self, query: str, research_context: Dict[str, Any] | None = None 

260 ) -> List[Dict[str, Any]]: 

261 """ 

262 Run the search engine with a given query, retrieving and filtering results. 

263 This implements a two-phase retrieval approach: 

264 1. Get preview information for many results 

265 2. Filter the previews for relevance 

266 3. Get full content for only the relevant results 

267 

268 Args: 

269 query: The search query 

270 research_context: Context from previous research to use. 

271 

272 Returns: 

273 List of search results with full content (if available) 

274 """ 

275 # Track search call for metrics (if available and not in programmatic mode) 

276 tracker = None 

277 if not self.programmatic_mode: 

278 from ..metrics.search_tracker import get_search_tracker 

279 

280 # For thread-safe context propagation: if we have research_context parameter, use it 

281 # Otherwise, try to inherit from current thread context (normal case) 

282 # This allows strategies running in threads to explicitly pass context when needed 

283 if research_context: 

284 # Explicit context provided - use it and set it for this thread 

285 set_search_context(research_context) 

286 

287 # Get tracker after context is set (either from parameter or thread) 

288 tracker = get_search_tracker() 

289 

290 engine_name = self.__class__.__name__.replace( 

291 "SearchEngine", "" 

292 ).lower() 

293 start_time = time.time() 

294 

295 success = True 

296 error_message = None 

297 results_count = 0 

298 

299 # Define the core search function with retry logic 

300 if self.rate_tracker.enabled: 

301 # Rate limiting enabled - use retry with adaptive wait 

302 @retry( 

303 stop=stop_after_attempt(3), 

304 wait=AdaptiveWait(lambda: self._get_adaptive_wait()), 

305 retry=retry_if_exception_type((RateLimitError,)), 

306 after=self._record_retry_outcome, 

307 reraise=True, 

308 ) 

309 def _run_with_retry(): 

310 nonlocal success, error_message, results_count 

311 return _execute_search() 

312 else: 

313 # Rate limiting disabled - run without retry 

314 def _run_with_retry(): 

315 nonlocal success, error_message, results_count 

316 return _execute_search() 

317 

318 def _execute_search(): 

319 nonlocal success, error_message, results_count 

320 

321 try: 

322 # Step 1: Get preview information for items 

323 previews = self._get_previews(query) 

324 if not previews: 

325 logger.info( 

326 f"Search engine {self.__class__.__name__} returned no preview results for query: {query}" 

327 ) 

328 results_count = 0 

329 return [] 

330 

331 for preview_filter in self._preview_filters: 

332 previews = preview_filter.filter_results(previews, query) 

333 

334 # Step 2: Filter previews for relevance with LLM 

335 # Check if LLM relevance filtering should be enabled 

336 enable_llm_filter = getattr( 

337 self, "enable_llm_relevance_filter", False 

338 ) 

339 

340 logger.info( 

341 f"BaseSearchEngine: Relevance filter check - enable_llm_relevance_filter={enable_llm_filter}, has_llm={self.llm is not None}, engine_type={type(self).__name__}" 

342 ) 

343 

344 if enable_llm_filter and self.llm: 

345 logger.info( 

346 f"Applying LLM relevance filter to {len(previews)} previews" 

347 ) 

348 filtered_items = self._filter_for_relevance(previews, query) 

349 logger.info( 

350 f"LLM filter kept {len(filtered_items)} of {len(previews)} results" 

351 ) 

352 else: 

353 filtered_items = previews 

354 if not enable_llm_filter: 354 ↛ 358line 354 didn't jump to line 358 because the condition on line 354 was always true

355 logger.info( 

356 f"LLM relevance filtering disabled (enable_llm_relevance_filter={enable_llm_filter}) - returning all {len(previews)} previews" 

357 ) 

358 elif not self.llm: 

359 logger.info( 

360 f"No LLM available for relevance filtering - returning all {len(previews)} previews" 

361 ) 

362 

363 # Step 3: Get full content for filtered items 

364 if self.search_snippets_only: 

365 logger.info("Returning snippet-only results as per config") 

366 results = filtered_items 

367 else: 

368 results = self._get_full_content(filtered_items) 

369 

370 for content_filter in self._content_filters: 

371 results = content_filter.filter_results(results, query) 

372 

373 results_count = len(results) 

374 self._last_results_count = results_count 

375 

376 # Record success if we get here and rate limiting is enabled 

377 if self.rate_tracker.enabled: 

378 logger.info( 

379 f"Recording successful search for {self.engine_type}: wait_time={self._last_wait_time}s, results={results_count}" 

380 ) 

381 self.rate_tracker.record_outcome( 

382 self.engine_type, 

383 self._last_wait_time, 

384 success=True, 

385 retry_count=1, # First attempt succeeded 

386 search_result_count=results_count, 

387 ) 

388 else: 

389 logger.info( 

390 f"Rate limiting disabled, not recording search for {self.engine_type}" 

391 ) 

392 

393 return results 

394 

395 except RateLimitError: 

396 # Only re-raise if rate limiting is enabled 

397 if self.rate_tracker.enabled: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true

398 raise 

399 else: 

400 # If rate limiting is disabled, treat as regular error 

401 success = False 

402 error_message = "Rate limit hit but rate limiting disabled" 

403 logger.warning( 

404 f"Rate limit hit on {self.__class__.__name__} but rate limiting is disabled" 

405 ) 

406 results_count = 0 

407 return [] 

408 except Exception as e: 

409 # Other errors - don't retry 

410 success = False 

411 error_message = str(e) 

412 logger.exception( 

413 f"Search engine {self.__class__.__name__} failed" 

414 ) 

415 results_count = 0 

416 return [] 

417 

418 try: 

419 return _run_with_retry() 

420 except RetryError as e: 

421 # All retries exhausted 

422 success = False 

423 error_message = f"Rate limited after all retries: {e}" 

424 logger.exception( 

425 f"{self.__class__.__name__} failed after all retries" 

426 ) 

427 return [] 

428 except Exception as e: 

429 success = False 

430 error_message = str(e) 

431 logger.exception(f"Search engine {self.__class__.__name__} error") 

432 return [] 

433 finally: 

434 # Record search metrics (if tracking is available) 

435 if tracker is not None: 

436 response_time_ms = int((time.time() - start_time) * 1000) 

437 tracker.record_search( 

438 engine_name=engine_name, 

439 query=query, 

440 results_count=results_count, 

441 response_time_ms=response_time_ms, 

442 success=success, 

443 error_message=error_message, 

444 ) 

445 

446 def invoke(self, query: str) -> List[Dict[str, Any]]: 

447 """Compatibility method for LangChain tools""" 

448 return self.run(query) 

449 

450 def _filter_for_relevance( 

451 self, previews: List[Dict[str, Any]], query: str 

452 ) -> List[Dict[str, Any]]: 

453 """ 

454 Filter search results by relevance to the query using the LLM. 

455 

456 Args: 

457 previews: List of preview dictionaries 

458 query: The original search query 

459 

460 Returns: 

461 Filtered list of preview dictionaries 

462 """ 

463 # If no LLM or too few previews, return all 

464 if not self.llm or len(previews) <= 1: 

465 return previews 

466 

467 # Log the number of previews we're processing 

468 logger.info(f"Filtering {len(previews)} previews for relevance") 

469 

470 # Create a simple context for LLM 

471 preview_context = [] 

472 indices_used = [] 

473 for i, preview in enumerate(previews): 

474 title = preview.get("title", "Untitled").strip() 

475 snippet = preview.get("snippet", "").strip() 

476 url = preview.get("url", "").strip() 

477 

478 # Clean up snippet if too long 

479 if len(snippet) > 300: 

480 snippet = snippet[:300] + "..." 

481 

482 preview_context.append( 

483 f"[{i}] Title: {title}\nURL: {url}\nSnippet: {snippet}" 

484 ) 

485 indices_used.append(i) 

486 

487 # Log the indices we're presenting to the LLM 

488 logger.info( 

489 f"Created preview context with indices 0-{len(previews) - 1}" 

490 ) 

491 logger.info( 

492 f"First 5 indices in prompt: {indices_used[:5]}, Last 5: {indices_used[-5:] if len(indices_used) > 5 else 'N/A'}" 

493 ) 

494 

495 # Join all previews with clear separation 

496 preview_text = "\n\n".join(preview_context) 

497 

498 # Log a sample of what we're sending to the LLM 

499 logger.debug( 

500 f"First preview in prompt: {preview_context[0] if preview_context else 'None'}" 

501 ) 

502 if len(preview_context) > 1: 502 ↛ 506line 502 didn't jump to line 506 because the condition on line 502 was always true

503 logger.debug(f"Last preview in prompt: {preview_context[-1]}") 

504 

505 # Set a reasonable limit on context length 

506 current_date = datetime.now(UTC).strftime("%Y-%m-%d") 

507 prompt = f"""Analyze these search results and select the most relevant ones for the query. 

508 

509Query: "{query}" 

510Current date: {current_date} 

511Total results: {len(previews)} 

512 

513Criteria for selection (in order of importance): 

5141. Direct relevance - MUST directly address the specific query topic, not just mention keywords 

5152. Quality - from reputable sources with substantive information about the query 

5163. Recency - prefer recent information when relevant 

517 

518Search results to evaluate: 

519{preview_text} 

520 

521Return a JSON array of indices (0-based) for results that are highly relevant to the query. 

522Valid indices are 0 to {len(previews) - 1}. 

523Only include results that directly help answer the specific question asked. 

524Be selective - it's better to return fewer high-quality results than many mediocre ones. 

525Maximum results to return: {self.max_filtered_results} 

526Example response: [4, 0, 2, 7] 

527 

528Respond with ONLY the JSON array, no other text.""" 

529 

530 try: 

531 # Get LLM's evaluation 

532 response = self.llm.invoke(prompt) 

533 

534 # Log the raw response for debugging 

535 logger.info(f"Raw LLM response for relevance filtering: {response}") 

536 

537 # Handle different response formats 

538 response_text = "" 

539 if hasattr(response, "content"): 539 ↛ 542line 539 didn't jump to line 542 because the condition on line 539 was always true

540 response_text = response.content 

541 else: 

542 response_text = str(response) 

543 

544 # Clean up response 

545 response_text = response_text.strip() 

546 logger.debug(f"Cleaned response text: {response_text}") 

547 

548 # Find JSON array in response 

549 start_idx = response_text.find("[") 

550 end_idx = response_text.rfind("]") 

551 

552 if start_idx >= 0 and end_idx > start_idx: 

553 array_text = response_text[start_idx : end_idx + 1] 

554 try: 

555 ranked_indices = json.loads(array_text) 

556 logger.info(f"LLM returned indices: {ranked_indices}") 

557 

558 # Validate that ranked_indices is a list of integers 

559 if not isinstance(ranked_indices, list): 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 logger.warning( 

561 "LLM response is not a list, returning empty results" 

562 ) 

563 return [] 

564 

565 if not all(isinstance(idx, int) for idx in ranked_indices): 

566 logger.warning( 

567 "LLM response contains non-integer indices, returning empty results" 

568 ) 

569 return [] 

570 

571 # Log analysis of the indices 

572 max_index = max(ranked_indices) if ranked_indices else -1 

573 min_index = min(ranked_indices) if ranked_indices else -1 

574 logger.info( 

575 f"Index analysis: min={min_index}, max={max_index}, " 

576 f"valid_range=0-{len(previews) - 1}, count={len(ranked_indices)}" 

577 ) 

578 

579 # Return the results in ranked order 

580 ranked_results = [] 

581 out_of_range = [] 

582 for idx in ranked_indices: 

583 if 0 <= idx < len(previews): 

584 ranked_results.append(previews[idx]) 

585 else: 

586 out_of_range.append(idx) 

587 logger.warning( 

588 f"Index {idx} out of range (valid: 0-{len(previews) - 1}), skipping" 

589 ) 

590 

591 if out_of_range: 

592 logger.error( 

593 f"Out of range indices: {out_of_range}. " 

594 f"Total previews: {len(previews)}, " 

595 f"All returned indices: {ranked_indices}" 

596 ) 

597 

598 # Limit to max_filtered_results if specified 

599 if ( 

600 self.max_filtered_results 

601 and len(ranked_results) > self.max_filtered_results 

602 ): 

603 logger.info( 

604 f"Limiting filtered results to top {self.max_filtered_results}" 

605 ) 

606 return ranked_results[: self.max_filtered_results] 

607 

608 return ranked_results 

609 

610 except json.JSONDecodeError as e: 

611 logger.warning( 

612 f"Failed to parse JSON from LLM response: {e}" 

613 ) 

614 logger.debug(f"Problematic JSON text: {array_text}") 

615 return [] 

616 else: 

617 logger.warning( 

618 "Could not find JSON array in response, returning original previews" 

619 ) 

620 logger.debug( 

621 f"Response text without JSON array: {response_text}" 

622 ) 

623 return previews[: min(5, len(previews))] 

624 

625 except Exception: 

626 logger.exception("Relevance filtering error") 

627 # Fall back to returning top results on error 

628 return previews[: min(5, len(previews))] 

629 

630 @abstractmethod 

631 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

632 """ 

633 Get preview information (titles, summaries) for initial search results. 

634 

635 Args: 

636 query: The search query 

637 

638 Returns: 

639 List of preview dictionaries with at least 'id', 'title', and 'snippet' keys 

640 """ 

641 pass 

642 

643 @abstractmethod 

644 def _get_full_content( 

645 self, relevant_items: List[Dict[str, Any]] 

646 ) -> List[Dict[str, Any]]: 

647 """ 

648 Get full content for the relevant items. 

649 

650 Args: 

651 relevant_items: List of relevant preview dictionaries 

652 

653 Returns: 

654 List of result dictionaries with full content 

655 """ 

656 pass