Coverage for src / local_deep_research / web_search_engines / engines / parallel_search_engine.py: 85%

287 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import os 

2import concurrent.futures 

3from typing import Any, Dict, List, Optional 

4from threading import Lock 

5import atexit 

6 

7from loguru import logger 

8 

9from ...config.search_config import get_setting_from_snapshot 

10from ...utilities.enums import SearchMode 

11from ...utilities.json_utils import extract_json, get_llm_response_text 

12from ...utilities.thread_context import preserve_research_context 

13from ...web.services.socket_service import SocketIOService 

14from ..search_engine_base import BaseSearchEngine 

15from ..search_engine_factory import create_search_engine 

16 

# Global thread pool shared by all ParallelSearchEngine instances.
# This prevents creating multiple thread pools and having more threads than expected.
# None until the first call to _get_global_executor() lazily creates the pool.
_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Guards lazy creation (in _get_global_executor) and teardown
# (in shutdown_global_executor) of the shared pool above.
_global_executor_lock = Lock()

21 

22 

def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the process-wide thread pool, creating it on first use.

    Lazy initialization is performed under ``_global_executor_lock`` so that
    concurrent callers can never create more than one pool.

    Args:
        max_workers: Worker-thread count for the pool. When None, falls back
            to Python's recommended I/O-bound sizing:
            ``min(32, (os.cpu_count() or 1) + 4)``. The value only matters on
            the call that actually creates the pool; later calls ignore it.

    Returns:
        The shared ThreadPoolExecutor, or None when pool creation failed.
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: pool already exists, the argument is ignored.
        if _global_executor is not None:
            return _global_executor

        worker_count = (
            max_workers
            if max_workers is not None
            else min(32, (os.cpu_count() or 1) + 4)
        )

        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {worker_count} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None

        return _global_executor

61 

62 

def shutdown_global_executor(wait: bool = True):
    """
    Tear down the process-wide thread pool.

    Registered with ``atexit`` so it runs automatically at interpreter
    shutdown. Afterwards the module-level handle is reset to None, so any
    later ParallelSearchEngine will lazily create a fresh pool.

    Args:
        wait: When True, block until all in-flight worker threads finish.
    """
    global _global_executor

    with _global_executor_lock:
        executor = _global_executor
        if executor is None:
            return

        try:
            executor.shutdown(wait=wait)
            logger.info("Global ThreadPool shutdown complete")
        except Exception:
            logger.exception("Error shutting down global ThreadPool")
        finally:
            # Always drop the handle, even if shutdown itself raised.
            _global_executor = None

84 

85 

# Register automatic cleanup at process exit.
# Wrapped in a lambda so the shutdown_global_executor name is resolved at
# exit time (and the wait=True default is pinned explicitly).
atexit.register(lambda: shutdown_global_executor(wait=True))

88 

89 

class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.

    Uses the LLM to select appropriate engines based on the query
    (see ``select_engines``), then runs all of them concurrently and merges
    their results.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread
        proliferation. The pool is created lazily on first use and is
        automatically cleaned up at process exit via ``atexit``.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

104 

105 def __init__( 

106 self, 

107 llm, 

108 max_results: int = 10, 

109 use_api_key_services: bool = True, 

110 max_engines_to_select: int = 100, # Allow selecting all available engines 

111 allow_local_engines: bool = False, # Disabled by default for privacy 

112 include_generic_engines: bool = True, # Always include generic search engines 

113 search_mode: SearchMode = SearchMode.ALL, 

114 max_filtered_results: Optional[int] = None, 

115 settings_snapshot: Optional[Dict[str, Any]] = None, 

116 programmatic_mode: bool = False, 

117 max_workers: Optional[ 

118 int 

119 ] = None, # Thread pool size (None = auto-detect) 

120 **kwargs, 

121 ): 

122 """ 

123 Initialize the parallel search engine. 

124 

125 All instances share a global thread pool. The first instance created will 

126 initialize the pool with the specified max_workers (or auto-detected value). 

127 Subsequent instances reuse the existing pool. 

128 

129 Args: 

130 llm: Language model instance for query classification 

131 max_results: Maximum number of search results to return per engine 

132 use_api_key_services: Whether to include services that require API keys 

133 max_engines_to_select: Maximum number of engines to select for parallel execution 

134 allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web) 

135 include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.) 

136 search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only 

137 max_filtered_results: Maximum number of results to keep after filtering 

138 settings_snapshot: Settings snapshot for thread context 

139 programmatic_mode: If True, disables database operations and metrics tracking 

140 max_workers: Thread pool size for the FIRST instance only. If None, uses Python's 

141 recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations. 

142 Ignored if the global pool is already initialized. 

143 **kwargs: Additional parameters (ignored but accepted for compatibility) 

144 """ 

145 # Override max_filtered_results to be much higher for parallel search 

146 # If not explicitly set, use 50 instead of the default 5-20 

147 if max_filtered_results is None: 

148 max_filtered_results = 50 

149 logger.info( 

150 f"Setting max_filtered_results to {max_filtered_results} for parallel search" 

151 ) 

152 

153 super().__init__( 

154 llm=llm, 

155 max_filtered_results=max_filtered_results, 

156 max_results=max_results, 

157 settings_snapshot=settings_snapshot, 

158 programmatic_mode=programmatic_mode, 

159 ) 

160 

161 self.use_api_key_services = use_api_key_services 

162 self.max_engines_to_select = max_engines_to_select 

163 self.allow_local_engines = allow_local_engines 

164 self.include_generic_engines = include_generic_engines 

165 self.search_mode = search_mode 

166 self.settings_snapshot = settings_snapshot 

167 

168 # Disable LLM relevance filtering at the parallel level by default 

169 # Individual engines can still filter their own results 

170 # Double filtering (engines + parallel) is too aggressive 

171 self.enable_llm_relevance_filter = kwargs.get( 

172 "enable_llm_relevance_filter", False 

173 ) 

174 

175 # Cache for engine instances 

176 self.engine_cache = {} 

177 self.cache_lock = Lock() 

178 

179 # Initialize global thread pool (thread-safe, only happens once) 

180 # All instances share the same pool to prevent thread proliferation 

181 _get_global_executor(max_workers) 

182 

183 # Get available engines (excluding 'meta', 'auto', and 'parallel') 

184 self.available_engines = self._get_available_engines() 

185 logger.info( 

186 f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}" 

187 ) 

188 

189 def _check_api_key_availability( 

190 self, name: str, config_: Dict[str, Any] 

191 ) -> bool: 

192 """ 

193 Check if API keys are available for engines that require them. 

194 

195 Args: 

196 name: Engine name 

197 config_: Engine configuration 

198 

199 Returns: 

200 True if the engine can be used (API key available or not required) 

201 """ 

202 # If engine doesn't require API key, it's available 

203 if not config_.get("requires_api_key", False): 

204 return True 

205 

206 # Check if API key is configured 

207 api_key_setting = config_.get("api_key_setting") 

208 if api_key_setting and self.settings_snapshot: 

209 api_key = self.settings_snapshot.get(api_key_setting) 

210 if api_key and str(api_key).strip(): 

211 return True 

212 logger.debug(f"Skipping {name} - API key not configured") 

213 return False 

214 

215 # No API key setting defined, assume it's available 

216 return True 

217 

218 def _get_search_config(self) -> Dict[str, Any]: 

219 """Get search config from settings_snapshot""" 

220 if self.settings_snapshot: 

221 # Extract search engine configs from settings snapshot 

222 config_data = {} 

223 for key, value in self.settings_snapshot.items(): 

224 if key.startswith("search.engine.web."): 

225 parts = key.split(".") 

226 if len(parts) >= 4: 226 ↛ 223line 226 didn't jump to line 223 because the condition on line 226 was always true

227 engine_name = parts[3] 

228 if engine_name not in config_data: 

229 config_data[engine_name] = {} 

230 remaining_key = ( 

231 ".".join(parts[4:]) if len(parts) > 4 else "" 

232 ) 

233 if remaining_key: 

234 config_data[engine_name][remaining_key] = ( 

235 value.get("value") 

236 if isinstance(value, dict) 

237 else value 

238 ) 

239 return config_data 

240 else: 

241 return {} 

242 

243 def _get_available_engines(self) -> List[str]: 

244 """Get list of available engines based on is_public flag and API key availability""" 

245 available = [] 

246 config_data = self._get_search_config() 

247 

248 for name, config_ in config_data.items(): 

249 # Skip meta, auto, and parallel engines (special engines) 

250 if name in ["meta", "auto", "parallel"]: 

251 continue 

252 

253 # Try to get the engine class to check is_public flag 

254 success, engine_class, error_msg = ( 

255 BaseSearchEngine._load_engine_class(name, config_) 

256 ) 

257 

258 if not success: 

259 logger.debug(error_msg) 

260 continue 

261 

262 # Check if engine is public or if local engines are allowed 

263 if hasattr(engine_class, "is_public"): 263 ↛ 273line 263 didn't jump to line 273 because the condition on line 263 was always true

264 if not engine_class.is_public and not self.allow_local_engines: 

265 logger.debug(f"Skipping local/private engine: {name}") 

266 continue 

267 elif not engine_class.is_public and self.allow_local_engines: 267 ↛ 268line 267 didn't jump to line 268 because the condition on line 267 was never true

268 logger.warning( 

269 f"Including local/private engine {name} - data may be exposed" 

270 ) 

271 else: 

272 # No is_public flag - assume it's private/local for safety 

273 if not self.allow_local_engines: 

274 logger.debug( 

275 f"Skipping engine {name} - no is_public flag and local engines not allowed" 

276 ) 

277 continue 

278 

279 # Apply scientific mode filtering if enabled 

280 if self.search_mode == SearchMode.SCIENTIFIC: 

281 # In scientific mode: include scientific engines AND generic engines 

282 # Exclude non-scientific specialized engines (like Guardian, Wayback) 

283 is_scientific = getattr(engine_class, "is_scientific", False) 

284 is_generic = getattr(engine_class, "is_generic", False) 

285 

286 if not (is_scientific or is_generic): 

287 logger.debug( 

288 f"Skipping {name} in scientific mode (not scientific or generic)" 

289 ) 

290 continue 

291 

292 logger.debug( 

293 f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})" 

294 ) 

295 # Skip engines that require API keys if we don't want to use them 

296 if ( 296 ↛ 300line 296 didn't jump to line 300 because the condition on line 296 was never true

297 config_.get("requires_api_key", False) 

298 and not self.use_api_key_services 

299 ): 

300 logger.debug( 

301 f"Skipping {name} - requires API key and use_api_key_services is False" 

302 ) 

303 continue 

304 

305 # Check API key availability 

306 if not self._check_api_key_availability(name, config_): 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true

307 continue 

308 

309 available.append(name) 

310 

311 return available 

312 

313 def _get_available_generic_engines(self) -> List[str]: 

314 """Get list of available generic search engines that pass API key checks""" 

315 generic_engines = [] 

316 config_data = self._get_search_config() 

317 

318 for name, config_ in config_data.items(): 318 ↛ 320line 318 didn't jump to line 320 because the loop on line 318 never started

319 # Skip if not in available engines (already filtered for API keys etc) 

320 if name not in self.available_engines: 

321 continue 

322 

323 # Load the engine class to check is_generic flag 

324 success, engine_class, error_msg = ( 

325 BaseSearchEngine._load_engine_class(name, config_) 

326 ) 

327 

328 if not success: 

329 logger.debug( 

330 f"Could not check if {name} is generic: {error_msg}" 

331 ) 

332 continue 

333 

334 # Check if engine is generic 

335 if getattr(engine_class, "is_generic", False): 

336 generic_engines.append(name) 

337 logger.debug(f"Found generic engine: {name}") 

338 

339 return generic_engines 

340 

341 def select_engines(self, query: str) -> List[str]: 

342 """ 

343 Use LLM to select appropriate search engines based only on names. 

344 

345 Args: 

346 query: The search query 

347 

348 Returns: 

349 List of selected engine names 

350 """ 

351 if not self.llm or not self.available_engines: 

352 logger.warning( 

353 "No LLM or no available engines, using all available" 

354 ) 

355 return self.available_engines[: self.max_engines_to_select] 

356 

357 try: 

358 # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added) 

359 engines_for_selection = self.available_engines.copy() 

360 generic_engines = [] 

361 

362 if self.include_generic_engines: 

363 generic_engines = self._get_available_generic_engines() 

364 # Remove generic engines from selection since they'll be added automatically 

365 engines_for_selection = [ 

366 e for e in engines_for_selection if e not in generic_engines 

367 ] 

368 logger.debug( 

369 f"Excluding generic engines from LLM selection: {generic_engines}" 

370 ) 

371 

372 # If no specialized engines available, just return the generic ones 

373 if not engines_for_selection: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true

374 logger.info( 

375 f"No specialized engines available, using generic engines: {generic_engines}" 

376 ) 

377 return generic_engines 

378 

379 # Create a simple prompt with just non-generic engine names 

380 engine_list = "\n".join( 

381 [ 

382 f"[{i}] {name}" 

383 for i, name in enumerate(engines_for_selection) 

384 ] 

385 ) 

386 

387 logger.debug(f"Engines for LLM selection: {engines_for_selection}") 

388 

389 prompt = f"""Query: {query} 

390 

391Available search engines: 

392{engine_list} 

393 

394Select the most appropriate search engines for this query. Return ONLY a JSON array of indices. 

395Example: [0, 2, 5] 

396 

397Select up to {self.max_engines_to_select} engines that would best answer this query.""" 

398 

399 logger.debug("Sending prompt to LLM for engine selection") 

400 # Get LLM response 

401 response = self.llm.invoke(prompt) 

402 content = get_llm_response_text(response) 

403 

404 indices = extract_json(content, expected_type=list) 

405 

406 if indices is not None: 

407 # Convert indices to engine names (from the filtered list) 

408 selected_engines = [] 

409 for idx in indices: 

410 if isinstance(idx, int) and 0 <= idx < len( 

411 engines_for_selection 

412 ): 

413 selected_engines.append(engines_for_selection[idx]) 

414 

415 if selected_engines: 415 ↛ 427line 415 didn't jump to line 427 because the condition on line 415 was always true

416 # Add generic search engines (they were excluded from selection) 

417 if self.include_generic_engines: 

418 for engine in generic_engines: 

419 if engine not in selected_engines: 419 ↛ 418line 419 didn't jump to line 418 because the condition on line 419 was always true

420 selected_engines.append(engine) 

421 logger.debug(f"Added generic engine: {engine}") 

422 

423 logger.info(f"Final selected engines: {selected_engines}") 

424 return selected_engines 

425 

426 # Fallback if parsing fails - return generic engines plus some specialized ones 

427 logger.warning( 

428 "Failed to parse LLM response, using generic engines + top specialized" 

429 ) 

430 result = ( 

431 generic_engines.copy() if self.include_generic_engines else [] 

432 ) 

433 for engine in self.available_engines[: self.max_engines_to_select]: 

434 if engine not in result: 

435 result.append(engine) 

436 return result 

437 

438 except Exception: 

439 logger.exception("Error selecting engines with LLM") 

440 # Fallback to using generic engines + available engines 

441 if self.include_generic_engines: 441 ↛ 450line 441 didn't jump to line 450 because the condition on line 441 was always true

442 generic_engines = self._get_available_generic_engines() 

443 result = generic_engines.copy() 

444 for engine in self.available_engines[ 

445 : self.max_engines_to_select 

446 ]: 

447 if engine not in result: 447 ↛ 444line 447 didn't jump to line 444 because the condition on line 447 was always true

448 result.append(engine) 

449 return result 

450 return self.available_engines[: self.max_engines_to_select] 

451 

452 def _get_engine_instance( 

453 self, engine_name: str 

454 ) -> Optional[BaseSearchEngine]: 

455 """Get or create an instance of the specified search engine""" 

456 with self.cache_lock: 

457 # Return cached instance if available 

458 if engine_name in self.engine_cache: 

459 return self.engine_cache[engine_name] 

460 

461 # Create a new instance 

462 engine = None 

463 try: 

464 # Only pass parameters that all engines accept 

465 common_params = { 

466 "llm": self.llm, 

467 "max_results": self.max_results, 

468 } 

469 

470 # Add max_filtered_results if specified 

471 if self.max_filtered_results is not None: 471 ↛ 476line 471 didn't jump to line 476 because the condition on line 471 was always true

472 common_params["max_filtered_results"] = ( 

473 self.max_filtered_results 

474 ) 

475 

476 engine = create_search_engine( 

477 engine_name, 

478 settings_snapshot=self.settings_snapshot, 

479 programmatic_mode=self.programmatic_mode, 

480 **common_params, 

481 ) 

482 

483 # Individual engines use their auto-detected filtering settings 

484 # The factory enables LLM filtering for scientific engines (arXiv, etc.) 

485 # and disables it for generic engines (Google, Brave, etc.) 

486 except Exception: 

487 logger.exception( 

488 f"Error creating engine instance for {engine_name}" 

489 ) 

490 return None 

491 

492 if engine: 492 ↛ 496line 492 didn't jump to line 496 because the condition on line 492 was always true

493 # Cache the instance 

494 self.engine_cache[engine_name] = engine 

495 

496 return engine 

497 

498 @preserve_research_context 

499 def _execute_single_engine( 

500 self, engine_name: str, query: str 

501 ) -> Dict[str, Any]: 

502 """ 

503 Execute a single search engine and return results. 

504 

505 Note: This method is decorated with @preserve_research_context to ensure 

506 rate limiting context is properly propagated to child threads. 

507 

508 Args: 

509 engine_name: Name of the engine to execute 

510 query: The search query 

511 

512 Returns: 

513 Dictionary with engine name and results or error 

514 """ 

515 logger.info(f"Executing search on {engine_name}") 

516 

517 engine = self._get_engine_instance(engine_name) 

518 if not engine: 

519 return { 

520 "engine": engine_name, 

521 "success": False, 

522 "error": f"Failed to initialize {engine_name}", 

523 "results": [], 

524 } 

525 

526 try: 

527 # Run the engine properly through its run() method 

528 # This ensures proper filter application, context propagation, etc. 

529 results = engine.run(query) 

530 

531 if results and len(results) > 0: 

532 logger.info(f"Got {len(results)} results from {engine_name}") 

533 return { 

534 "engine": engine_name, 

535 "success": True, 

536 "results": results, 

537 "count": len(results), 

538 } 

539 else: 

540 return { 

541 "engine": engine_name, 

542 "success": False, 

543 "error": "No results", 

544 "results": [], 

545 } 

546 

547 except Exception: 

548 logger.exception(f"Error executing {engine_name}") 

549 return { 

550 "engine": engine_name, 

551 "success": False, 

552 "error": f"Engine {engine_name} failed", 

553 "results": [], 

554 } 

555 

556 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

557 """ 

558 Get preview information by executing selected engines in parallel. 

559 

560 Args: 

561 query: The search query 

562 

563 Returns: 

564 Combined list of preview dictionaries from all successful engines 

565 """ 

566 # Select engines for this query 

567 selected_engines = self.select_engines(query) 

568 

569 if not selected_engines: 

570 logger.warning("No engines selected") 

571 return [] 

572 

573 logger.info( 

574 f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}" 

575 ) 

576 

577 # Emit socket event about selected engines 

578 try: 

579 SocketIOService().emit_socket_event( 

580 "parallel_search_started", 

581 {"engines": selected_engines, "query": query}, 

582 ) 

583 except Exception: 

584 logger.exception("Socket emit error (non-critical)") 

585 

586 # Execute all engines in parallel using persistent thread pool 

587 all_results = [] 

588 engine_results = {} 

589 

590 # Get the global thread pool 

591 executor = _get_global_executor() 

592 if executor is None: 

593 logger.error( 

594 "Global thread pool not available, cannot execute parallel search" 

595 ) 

596 return [] 

597 

598 # Submit all tasks to the global executor 

599 future_to_engine = { 

600 executor.submit( 

601 self._execute_single_engine, engine_name, query 

602 ): engine_name 

603 for engine_name in selected_engines 

604 } 

605 

606 # Collect results as they complete 

607 for future in concurrent.futures.as_completed(future_to_engine): 

608 engine_name = future_to_engine[future] 

609 try: 

610 result = future.result() 

611 engine_results[engine_name] = result 

612 

613 if result["success"]: 613 ↛ 633line 613 didn't jump to line 633 because the condition on line 613 was always true

614 # Add source information to each result 

615 for item in result["results"]: 

616 item["search_engine"] = engine_name 

617 all_results.extend(result["results"]) 

618 

619 # Emit success event 

620 try: 

621 SocketIOService().emit_socket_event( 

622 "engine_completed", 

623 { 

624 "engine": engine_name, 

625 "success": True, 

626 "count": result["count"], 

627 }, 

628 ) 

629 except Exception: 

630 logger.debug("Socket emit error (non-critical)") 

631 else: 

632 # Emit failure event 

633 try: 

634 SocketIOService().emit_socket_event( 

635 "engine_completed", 

636 { 

637 "engine": engine_name, 

638 "success": False, 

639 "error": result.get("error", "Unknown error"), 

640 }, 

641 ) 

642 except Exception: 

643 logger.debug("Socket emit error (non-critical)") 

644 

645 except Exception: 

646 logger.exception(f"Failed to get result from {engine_name}") 

647 engine_results[engine_name] = { 

648 "engine": engine_name, 

649 "success": False, 

650 "error": "Search execution failed", 

651 "results": [], 

652 } 

653 

654 # Log summary 

655 successful_engines = [ 

656 name for name, res in engine_results.items() if res["success"] 

657 ] 

658 failed_engines = [ 

659 name for name, res in engine_results.items() if not res["success"] 

660 ] 

661 

662 logger.info( 

663 f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed" 

664 ) 

665 if successful_engines: 665 ↛ 667line 665 didn't jump to line 667 because the condition on line 665 was always true

666 logger.info(f"Successful engines: {', '.join(successful_engines)}") 

667 if failed_engines: 

668 logger.info(f"Failed engines: {', '.join(failed_engines)}") 

669 

670 # Log sample result to understand structure 

671 if all_results: 671 ↛ 677line 671 didn't jump to line 677 because the condition on line 671 was always true

672 logger.debug( 

673 f"Sample result keys from first result: {list(all_results[0].keys())}" 

674 ) 

675 logger.debug(f"Sample result: {str(all_results[0])[:500]}") 

676 

677 logger.info(f"Total results from all engines: {len(all_results)}") 

678 

679 # Store the engine results for potential use in _get_full_content 

680 self._engine_results = engine_results 

681 self._successful_engines = successful_engines 

682 

683 return all_results 

684 

685 def _get_full_content( 

686 self, relevant_items: List[Dict[str, Any]] 

687 ) -> List[Dict[str, Any]]: 

688 """ 

689 Get full content for the relevant items. 

690 Since we ran multiple engines, we'll group items by their source engine 

691 and get full content from each engine for its own results. 

692 

693 Args: 

694 relevant_items: List of relevant preview dictionaries 

695 

696 Returns: 

697 List of result dictionaries with full content 

698 """ 

699 # Check if we should get full content 

700 if get_setting_from_snapshot( 

701 "search.snippets_only", 

702 True, 

703 settings_snapshot=self.settings_snapshot, 

704 ): 

705 logger.info("Snippet-only mode, skipping full content retrieval") 

706 return relevant_items 

707 

708 logger.info("Getting full content for relevant items") 

709 

710 # Group items by their source engine 

711 items_by_engine = {} 

712 for item in relevant_items: 

713 engine_name = item.get("search_engine") 

714 if engine_name: 714 ↛ 712line 714 didn't jump to line 712 because the condition on line 714 was always true

715 if engine_name not in items_by_engine: 715 ↛ 717line 715 didn't jump to line 717 because the condition on line 715 was always true

716 items_by_engine[engine_name] = [] 

717 items_by_engine[engine_name].append(item) 

718 

719 # Get full content from each engine for its items 

720 all_full_content = [] 

721 

722 for engine_name, items in items_by_engine.items(): 

723 engine = self._get_engine_instance(engine_name) 

724 if engine: 724 ↛ 739line 724 didn't jump to line 739 because the condition on line 724 was always true

725 try: 

726 logger.info( 

727 f"Getting full content from {engine_name} for {len(items)} items" 

728 ) 

729 full_content = engine._get_full_content(items) 

730 all_full_content.extend(full_content) 

731 except Exception: 

732 logger.exception( 

733 f"Error getting full content from {engine_name}" 

734 ) 

735 # Fall back to returning items without full content 

736 all_full_content.extend(items) 

737 else: 

738 # No engine available, return items as-is 

739 all_full_content.extend(items) 

740 

741 return all_full_content 

742 

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools.

        Delegates directly to run() and returns its merged result list.
        """
        return self.run(query)

746 

747 # Note: No shutdown() or context manager methods needed 

748 # The global thread pool is automatically cleaned up at process exit via atexit