Coverage for src/local_deep_research/web_search_engines/engines/parallel_search_engine.py: 17%

292 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1 import json

2 import os

3 import concurrent.futures

4 from typing import Any, Dict, List, Optional

5 from threading import Lock

6 import atexit

7 

8 from loguru import logger

9 

10 from ...config.search_config import get_setting_from_snapshot

11 from ...utilities.enums import SearchMode

12 from ...utilities.thread_context import preserve_research_context

13 from ...web.services.socket_service import SocketIOService

14 from ..search_engine_base import BaseSearchEngine

15 from ..search_engine_factory import create_search_engine

16 

17 # Global thread pool shared by all ParallelSearchEngine instances

18 # This prevents creating multiple thread pools and having more threads than expected

19 _global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None

20 _global_executor_lock = Lock()

21 

22 

23 def _get_global_executor(

24 max_workers: Optional[int] = None,

25 ) -> Optional[concurrent.futures.ThreadPoolExecutor]:

26 """ 

27 Get or initialize the global thread pool executor. 

28 Thread-safe lazy initialization ensures only one pool is created. 

29 

30 Args: 

31 max_workers: Number of worker threads. If None, uses Python's recommended 

32 formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations. 

33 Only used on first initialization; subsequent calls ignore this parameter. 

34 

35 Returns: 

36 The global ThreadPoolExecutor instance, or None if initialization fails 

37 """ 

38 global _global_executor 

39 

40 with _global_executor_lock: 

41 if _global_executor is None: 

42 if max_workers is None:  [42 ↛ 45: line 42 didn't jump to line 45 because the condition on line 42 was always true]

43 max_workers = min(32, (os.cpu_count() or 1) + 4) 

44 

45 try: 

46 _global_executor = concurrent.futures.ThreadPoolExecutor( 

47 max_workers=max_workers, 

48 thread_name_prefix="parallel_search_", 

49 ) 

50 logger.info( 

51 f"Initialized global ThreadPool with {max_workers} workers " 

52 f"(shared by all ParallelSearchEngine instances)" 

53 ) 

54 except Exception: 

55 logger.exception( 

56 "Failed to create global ThreadPoolExecutor, parallel search will not work" 

57 ) 

58 return None 

59 

60 return _global_executor 
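
A minimal illustration of the two behaviors documented above: the I/O-bound sizing formula, and the fact that max_workers only matters on the first call (the variable names and assert below are ours, not part of the module):

    import os
    workers = min(32, (os.cpu_count() or 1) + 4)
    # 4 CPUs -> 8 workers; 8 CPUs -> 12 workers; 28+ CPUs -> capped at 32

    pool_a = _get_global_executor(max_workers=4)   # first call sizes the pool
    pool_b = _get_global_executor(max_workers=99)  # pool already exists, 99 is ignored
    assert pool_a is pool_b                        # every caller shares one executor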

61 

62 

63 def shutdown_global_executor(wait: bool = True):

64 """ 

65 Shutdown the global thread pool executor. 

66 

67 This is called automatically at process exit via atexit. 

68 After calling this, any new ParallelSearchEngine instances will create a new pool. 

69 

70 Args: 

71 wait: If True, wait for all threads to complete before returning 

72 """ 

73 global _global_executor 

74 

75 with _global_executor_lock: 

76 if _global_executor is not None: 

77 try: 

78 _global_executor.shutdown(wait=wait) 

79 logger.info("Global ThreadPool shutdown complete") 

80 except Exception: 

81 logger.exception("Error shutting down global ThreadPool") 

82 finally: 

83 _global_executor = None 

84 

85 

86 # Register automatic cleanup at process exit

87 atexit.register(lambda: shutdown_global_executor(wait=True))
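
A long-lived process can rely solely on the atexit hook, but a test or embedding application may recycle the pool explicitly. A sketch of that lifecycle, using only the two module functions defined above:

    shutdown_global_executor(wait=True)   # joins all worker threads
    fresh_pool = _get_global_executor()   # next call lazily creates a brand-new pool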

88 

89 

90 class ParallelSearchEngine(BaseSearchEngine):

91 """ 

92 Parallel search engine that executes multiple search engines simultaneously. 

93 Uses an LLM to select appropriate engines based on the query, then runs them all in parallel.

94 

95 Thread Pool Management: 

96 All instances share a single global thread pool to prevent thread proliferation. 

97 The pool is automatically cleaned up at process exit. 

98 

99 Usage: 

100 engine = ParallelSearchEngine(llm) 

101 results = engine.run("query") 

102 # No manual cleanup needed - global pool is shared and cleaned up automatically 

103 """ 

104 

105 def __init__( 

106 self, 

107 llm, 

108 max_results: int = 10, 

109 use_api_key_services: bool = True, 

110 max_engines_to_select: int = 100, # Allow selecting all available engines 

111 allow_local_engines: bool = False, # Disabled by default for privacy 

112 include_generic_engines: bool = True, # Always include generic search engines 

113 search_mode: SearchMode = SearchMode.ALL, 

114 max_filtered_results: Optional[int] = None, 

115 settings_snapshot: Optional[Dict[str, Any]] = None, 

116 programmatic_mode: bool = False, 

117 max_workers: Optional[ 

118 int 

119 ] = None, # Thread pool size (None = auto-detect) 

120 **kwargs, 

121 ): 

122 """ 

123 Initialize the parallel search engine. 

124 

125 All instances share a global thread pool. The first instance created will 

126 initialize the pool with the specified max_workers (or auto-detected value). 

127 Subsequent instances reuse the existing pool. 

128 

129 Args: 

130 llm: Language model instance for query classification 

131 max_results: Maximum number of search results to return per engine 

132 use_api_key_services: Whether to include services that require API keys 

133 max_engines_to_select: Maximum number of engines to select for parallel execution 

134 allow_local_engines: Whether to include local/private engines (WARNING: may expose personal data to the web)

135 include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.) 

136 search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only 

137 max_filtered_results: Maximum number of results to keep after filtering 

138 settings_snapshot: Settings snapshot for thread context 

139 programmatic_mode: If True, disables database operations and metrics tracking 

140 max_workers: Thread pool size for the FIRST instance only. If None, uses Python's 

141 recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations. 

142 Ignored if the global pool is already initialized. 

143 **kwargs: Additional parameters (ignored but accepted for compatibility) 

144 """ 

145 # Override max_filtered_results to be much higher for parallel search 

146 # If not explicitly set, use 50 instead of the default 5-20 

147 if max_filtered_results is None:  [147 ↛ 153: line 147 didn't jump to line 153 because the condition on line 147 was always true]

148 max_filtered_results = 50 

149 logger.info( 

150 f"Setting max_filtered_results to {max_filtered_results} for parallel search" 

151 ) 

152 

153 super().__init__( 

154 llm=llm, 

155 max_filtered_results=max_filtered_results, 

156 max_results=max_results, 

157 settings_snapshot=settings_snapshot, 

158 programmatic_mode=programmatic_mode, 

159 ) 

160 

161 self.use_api_key_services = use_api_key_services 

162 self.max_engines_to_select = max_engines_to_select 

163 self.allow_local_engines = allow_local_engines 

164 self.include_generic_engines = include_generic_engines 

165 self.search_mode = search_mode 

166 self.settings_snapshot = settings_snapshot 

167 

168 # Disable LLM relevance filtering at the parallel level by default 

169 # Individual engines can still filter their own results 

170 # Double filtering (engines + parallel) is too aggressive 

171 self.enable_llm_relevance_filter = kwargs.get( 

172 "enable_llm_relevance_filter", False 

173 ) 

174 

175 # Cache for engine instances 

176 self.engine_cache = {} 

177 self.cache_lock = Lock() 

178 

179 # Initialize global thread pool (thread-safe, only happens once) 

180 # All instances share the same pool to prevent thread proliferation 

181 _get_global_executor(max_workers) 

182 

183 # Get available engines (excluding 'meta', 'auto', and 'parallel') 

184 self.available_engines = self._get_available_engines() 

185 logger.info( 

186 f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}" 

187 ) 

188 

189 def _check_api_key_availability( 

190 self, name: str, config_: Dict[str, Any] 

191 ) -> bool: 

192 """ 

193 Check if API keys are available for engines that require them. 

194 

195 Args: 

196 name: Engine name 

197 config_: Engine configuration 

198 

199 Returns: 

200 True if the engine can be used (API key available or not required) 

201 """ 

202 # If engine doesn't require API key, it's available 

203 if not config_.get("requires_api_key", False): 

204 return True 

205 

206 # Check if API key is configured 

207 api_key_setting = config_.get("api_key_setting") 

208 if api_key_setting and self.settings_snapshot: 

209 api_key = self.settings_snapshot.get(api_key_setting) 

210 if api_key and str(api_key).strip(): 

211 return True 

212 logger.debug(f"Skipping {name} - API key not configured") 

213 return False 

214 

215 # No API key setting defined, assume it's available 

216 return True 
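
A standalone mirror of this check, handy for seeing all three outcomes without constructing an engine (the function name and setting key below are made up for illustration):

    def api_key_ok(config_: dict, snapshot: dict) -> bool:
        # Illustrative re-implementation of _check_api_key_availability
        if not config_.get("requires_api_key", False):
            return True                            # no key required
        setting = config_.get("api_key_setting")
        if setting and snapshot:
            key = snapshot.get(setting)
            return bool(key and str(key).strip())  # False for missing/blank keys
        return True                                # no setting name defined: assume available

    api_key_ok({"requires_api_key": False}, {})                                         # True
    api_key_ok({"requires_api_key": True, "api_key_setting": "x.key"}, {"x.key": " "})  # False
    api_key_ok({"requires_api_key": True}, {"other": 1})                                # True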

217 

218 def _get_search_config(self) -> Dict[str, Any]: 

219 """Get search config from settings_snapshot""" 

220 if self.settings_snapshot:  [220 ↛ 241: line 220 didn't jump to line 241 because the condition on line 220 was always true]

221 # Extract search engine configs from settings snapshot 

222 config_data = {} 

223 for key, value in self.settings_snapshot.items(): 

224 if key.startswith("search.engine.web."):  [224 ↛ 225: line 224 didn't jump to line 225 because the condition on line 224 was never true]

225 parts = key.split(".") 

226 if len(parts) >= 4: 

227 engine_name = parts[3] 

228 if engine_name not in config_data: 

229 config_data[engine_name] = {} 

230 remaining_key = ( 

231 ".".join(parts[4:]) if len(parts) > 4 else "" 

232 ) 

233 if remaining_key: 

234 config_data[engine_name][remaining_key] = ( 

235 value.get("value") 

236 if isinstance(value, dict) 

237 else value 

238 ) 

239 return config_data 

240 else: 

241 return {} 
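
The loop above expects keys of the form search.engine.web.<engine>.<rest>, unwrapping dict values through their "value" field. A worked example with hypothetical snapshot keys:

    snapshot = {
        "search.engine.web.brave.api_key": {"value": "sk-123"},    # dict: unwrapped
        "search.engine.web.brave.requires_api_key": True,          # plain value: kept as-is
        "search.engine.web.arxiv.default_params.sort_by": {"value": "relevance"},
        "llm.provider": "openai",                                  # wrong prefix: ignored
    }
    # _get_search_config() on an instance holding this snapshot would return:
    # {
    #     "brave": {"api_key": "sk-123", "requires_api_key": True},
    #     "arxiv": {"default_params.sort_by": "relevance"},
    # }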

242 

243 def _get_available_engines(self) -> List[str]: 

244 """Get list of available engines based on is_public flag and API key availability""" 

245 available = [] 

246 config_data = self._get_search_config() 

247 

248 for name, config_ in config_data.items():  [248 ↛ 250: line 248 didn't jump to line 250 because the loop on line 248 never started]

249 # Skip meta, auto, and parallel engines (special engines) 

250 if name in ["meta", "auto", "parallel"]: 

251 continue 

252 

253 # Try to get the engine class to check is_public flag 

254 success, engine_class, error_msg = ( 

255 BaseSearchEngine._load_engine_class(name, config_) 

256 ) 

257 

258 if not success: 

259 logger.debug(error_msg) 

260 continue 

261 

262 # Check if engine is public or if local engines are allowed 

263 if hasattr(engine_class, "is_public"): 

264 if not engine_class.is_public and not self.allow_local_engines: 

265 logger.debug(f"Skipping local/private engine: {name}") 

266 continue 

267 elif not engine_class.is_public and self.allow_local_engines: 

268 logger.warning( 

269 f"Including local/private engine {name} - data may be exposed" 

270 ) 

271 else: 

272 # No is_public flag - assume it's private/local for safety 

273 if not self.allow_local_engines: 

274 logger.debug( 

275 f"Skipping engine {name} - no is_public flag and local engines not allowed" 

276 ) 

277 continue 

278 

279 # Apply scientific mode filtering if enabled 

280 if self.search_mode == SearchMode.SCIENTIFIC: 

281 # In scientific mode: include scientific engines AND generic engines 

282 # Exclude non-scientific specialized engines (like Guardian, Wayback) 

283 is_scientific = getattr(engine_class, "is_scientific", False) 

284 is_generic = getattr(engine_class, "is_generic", False) 

285 

286 if not (is_scientific or is_generic): 

287 logger.debug( 

288 f"Skipping {name} in scientific mode (not scientific or generic)" 

289 ) 

290 continue 

291 

292 logger.debug( 

293 f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})" 

294 ) 

295 # Skip engines that require API keys if we don't want to use them 

296 if ( 

297 config_.get("requires_api_key", False) 

298 and not self.use_api_key_services 

299 ): 

300 logger.debug( 

301 f"Skipping {name} - requires API key and use_api_key_services is False" 

302 ) 

303 continue 

304 

305 # Check API key availability 

306 if not self._check_api_key_availability(name, config_): 

307 continue 

308 

309 available.append(name) 

310 

311 return available 
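
Everything in this filter is driven by three class-level flags. A hypothetical engine class showing the attributes it inspects (the class is invented; a real subclass would also implement the BaseSearchEngine hooks):

    class ExampleJournalEngine(BaseSearchEngine):
        is_public = True        # safe for the open web, kept even when allow_local_engines is False
        is_scientific = True    # survives SearchMode.SCIENTIFIC filtering
        is_generic = False      # not force-included by include_generic_engines
        # _get_previews() / _get_full_content() omitted in this sketch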

312 

313 def _get_available_generic_engines(self) -> List[str]: 

314 """Get list of available generic search engines that pass API key checks""" 

315 generic_engines = [] 

316 config_data = self._get_search_config() 

317 

318 for name, config_ in config_data.items(): 

319 # Skip if not in available engines (already filtered for API keys, etc.)

320 if name not in self.available_engines: 

321 continue 

322 

323 # Load the engine class to check is_generic flag 

324 success, engine_class, error_msg = ( 

325 BaseSearchEngine._load_engine_class(name, config_) 

326 ) 

327 

328 if not success: 

329 logger.debug( 

330 f"Could not check if {name} is generic: {error_msg}" 

331 ) 

332 continue 

333 

334 # Check if engine is generic 

335 if getattr(engine_class, "is_generic", False): 

336 generic_engines.append(name) 

337 logger.debug(f"Found generic engine: {name}") 

338 

339 return generic_engines 

340 

341 def select_engines(self, query: str) -> List[str]: 

342 """ 

343 Use the LLM to select appropriate search engines based only on their names.

344 

345 Args: 

346 query: The search query 

347 

348 Returns: 

349 List of selected engine names 

350 """ 

351 if not self.llm or not self.available_engines: 

352 logger.warning( 

353 "No LLM or no available engines, using all available" 

354 ) 

355 return self.available_engines[: self.max_engines_to_select] 

356 

357 try: 

358 # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added) 

359 engines_for_selection = self.available_engines.copy() 

360 generic_engines = [] 

361 

362 if self.include_generic_engines: 

363 generic_engines = self._get_available_generic_engines() 

364 # Remove generic engines from selection since they'll be added automatically 

365 engines_for_selection = [ 

366 e for e in engines_for_selection if e not in generic_engines 

367 ] 

368 logger.debug( 

369 f"Excluding generic engines from LLM selection: {generic_engines}" 

370 ) 

371 

372 # If no specialized engines available, just return the generic ones 

373 if not engines_for_selection: 

374 logger.info( 

375 f"No specialized engines available, using generic engines: {generic_engines}" 

376 ) 

377 return generic_engines 

378 

379 # Create a simple prompt with just non-generic engine names 

380 engine_list = "\n".join( 

381 [ 

382 f"[{i}] {name}" 

383 for i, name in enumerate(engines_for_selection) 

384 ] 

385 ) 

386 

387 logger.debug(f"Engines for LLM selection: {engines_for_selection}") 

388 

389 prompt = f"""Query: {query} 

390 

391 Available search engines:

392 {engine_list}

393 

394 Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.

395 Example: [0, 2, 5]

396 

397 Select up to {self.max_engines_to_select} engines that would best answer this query."""

398 

399 logger.debug("Sending prompt to LLM for engine selection") 

400 # Get LLM response 

401 response = self.llm.invoke(prompt) 

402 

403 # Handle different response formats 

404 if hasattr(response, "content"): 

405 content = response.content.strip() 

406 else: 

407 content = str(response).strip() 

408 

409 # Extract JSON array 

410 start_idx = content.find("[") 

411 end_idx = content.rfind("]") 

412 

413 if start_idx >= 0 and end_idx > start_idx: 

414 array_text = content[start_idx : end_idx + 1] 

415 indices = json.loads(array_text) 

416 

417 # Convert indices to engine names (from the filtered list) 

418 selected_engines = [] 

419 for idx in indices: 

420 if isinstance(idx, int) and 0 <= idx < len( 

421 engines_for_selection 

422 ): 

423 selected_engines.append(engines_for_selection[idx]) 

424 

425 if selected_engines: 

426 # Add generic search engines (they were excluded from selection) 

427 if self.include_generic_engines: 

428 for engine in generic_engines: 

429 if engine not in selected_engines: 

430 selected_engines.append(engine) 

431 logger.debug(f"Added generic engine: {engine}") 

432 

433 logger.info(f"Final selected engines: {selected_engines}") 

434 return selected_engines 

435 

436 # Fallback if parsing fails - return generic engines plus some specialized ones 

437 logger.warning( 

438 "Failed to parse LLM response, using generic engines + top specialized" 

439 ) 

440 result = ( 

441 generic_engines.copy() if self.include_generic_engines else [] 

442 ) 

443 for engine in self.available_engines[: self.max_engines_to_select]: 

444 if engine not in result: 

445 result.append(engine) 

446 return result 

447 

448 except Exception: 

449 logger.exception("Error selecting engines with LLM") 

450 # Fallback to using generic engines + available engines 

451 if self.include_generic_engines: 

452 generic_engines = self._get_available_generic_engines() 

453 result = generic_engines.copy() 

454 for engine in self.available_engines[ 

455 : self.max_engines_to_select 

456 ]: 

457 if engine not in result: 

458 result.append(engine) 

459 return result 

460 return self.available_engines[: self.max_engines_to_select] 
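
The parsing above tolerates chatty LLM output by slicing out the outermost JSON array and discarding non-integer or out-of-range entries. The same recovery step as a self-contained helper (the function name is ours):

    import json

    def extract_indices(content: str, n_engines: int) -> list:
        start, end = content.find("["), content.rfind("]")
        if start < 0 or end <= start:
            return []                    # no array found: caller falls back
        try:
            indices = json.loads(content[start : end + 1])
        except json.JSONDecodeError:
            return []
        return [i for i in indices if isinstance(i, int) and 0 <= i < n_engines]

    extract_indices("Sure! Here you go: [0, 2, 5]", 6)   # -> [0, 2, 5]
    extract_indices('[0, 99, "x"]', 6)                   # -> [0]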

461 

462 def _get_engine_instance( 

463 self, engine_name: str 

464 ) -> Optional[BaseSearchEngine]: 

465 """Get or create an instance of the specified search engine""" 

466 with self.cache_lock: 

467 # Return cached instance if available 

468 if engine_name in self.engine_cache: 

469 return self.engine_cache[engine_name] 

470 

471 # Create a new instance 

472 engine = None 

473 try: 

474 # Only pass parameters that all engines accept 

475 common_params = { 

476 "llm": self.llm, 

477 "max_results": self.max_results, 

478 } 

479 

480 # Add max_filtered_results if specified 

481 if self.max_filtered_results is not None: 

482 common_params["max_filtered_results"] = ( 

483 self.max_filtered_results 

484 ) 

485 

486 engine = create_search_engine( 

487 engine_name, 

488 settings_snapshot=self.settings_snapshot, 

489 programmatic_mode=self.programmatic_mode, 

490 **common_params, 

491 ) 

492 

493 # Individual engines use their auto-detected filtering settings 

494 # The factory enables LLM filtering for scientific engines (arXiv, etc.) 

495 # and disables it for generic engines (Google, Brave, etc.) 

496 except Exception: 

497 logger.exception( 

498 f"Error creating engine instance for {engine_name}" 

499 ) 

500 return None 

501 

502 if engine: 

503 # Cache the instance 

504 self.engine_cache[engine_name] = engine 

505 

506 return engine 

507 

508 @preserve_research_context 

509 def _execute_single_engine( 

510 self, engine_name: str, query: str 

511 ) -> Dict[str, Any]: 

512 """ 

513 Execute a single search engine and return results. 

514 

515 Note: This method is decorated with @preserve_research_context to ensure 

516 rate limiting context is properly propagated to child threads. 

517 

518 Args: 

519 engine_name: Name of the engine to execute 

520 query: The search query 

521 

522 Returns: 

523 Dictionary with engine name and results or error 

524 """ 

525 logger.info(f"Executing search on {engine_name}") 

526 

527 engine = self._get_engine_instance(engine_name) 

528 if not engine: 

529 return { 

530 "engine": engine_name, 

531 "success": False, 

532 "error": f"Failed to initialize {engine_name}", 

533 "results": [], 

534 } 

535 

536 try: 

537 # Run the engine properly through its run() method 

538 # This ensures proper filter application, context propagation, etc. 

539 results = engine.run(query) 

540 

541 if results and len(results) > 0: 

542 logger.info(f"Got {len(results)} results from {engine_name}") 

543 return { 

544 "engine": engine_name, 

545 "success": True, 

546 "results": results, 

547 "count": len(results), 

548 } 

549 else: 

550 return { 

551 "engine": engine_name, 

552 "success": False, 

553 "error": "No results", 

554 "results": [], 

555 } 

556 

557 except Exception: 

558 logger.exception(f"Error executing {engine_name}") 

559 return { 

560 "engine": engine_name, 

561 "success": False, 

562 "error": f"Engine {engine_name} failed", 

563 "results": [], 

564 } 

565 

566 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

567 """ 

568 Get preview information by executing selected engines in parallel. 

569 

570 Args: 

571 query: The search query 

572 

573 Returns: 

574 Combined list of preview dictionaries from all successful engines 

575 """ 

576 # Select engines for this query 

577 selected_engines = self.select_engines(query) 

578 

579 if not selected_engines: 

580 logger.warning("No engines selected") 

581 return [] 

582 

583 logger.info( 

584 f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}" 

585 ) 

586 

587 # Emit socket event about selected engines 

588 try: 

589 SocketIOService().emit_socket_event( 

590 "parallel_search_started", 

591 {"engines": selected_engines, "query": query}, 

592 ) 

593 except Exception: 

594 logger.exception("Socket emit error (non-critical)") 

595 

596 # Execute all engines in parallel using persistent thread pool 

597 all_results = [] 

598 engine_results = {} 

599 

600 # Get the global thread pool 

601 executor = _get_global_executor() 

602 if executor is None: 

603 logger.error( 

604 "Global thread pool not available, cannot execute parallel search" 

605 ) 

606 return [] 

607 

608 # Submit all tasks to the global executor 

609 future_to_engine = { 

610 executor.submit( 

611 self._execute_single_engine, engine_name, query 

612 ): engine_name 

613 for engine_name in selected_engines 

614 } 

615 

616 # Collect results as they complete 

617 for future in concurrent.futures.as_completed(future_to_engine): 

618 engine_name = future_to_engine[future] 

619 try: 

620 result = future.result() 

621 engine_results[engine_name] = result 

622 

623 if result["success"]: 

624 # Add source information to each result 

625 for item in result["results"]: 

626 item["search_engine"] = engine_name 

627 all_results.extend(result["results"]) 

628 

629 # Emit success event 

630 try: 

631 SocketIOService().emit_socket_event( 

632 "engine_completed", 

633 { 

634 "engine": engine_name, 

635 "success": True, 

636 "count": result["count"], 

637 }, 

638 ) 

639 except Exception: 

640 logger.debug("Socket emit error (non-critical)") 

641 else: 

642 # Emit failure event 

643 try: 

644 SocketIOService().emit_socket_event( 

645 "engine_completed", 

646 { 

647 "engine": engine_name, 

648 "success": False, 

649 "error": result.get("error", "Unknown error"), 

650 }, 

651 ) 

652 except Exception: 

653 logger.debug("Socket emit error (non-critical)") 

654 

655 except Exception: 

656 logger.exception(f"Failed to get result from {engine_name}") 

657 engine_results[engine_name] = { 

658 "engine": engine_name, 

659 "success": False, 

660 "error": "Search execution failed", 

661 "results": [], 

662 } 

663 

664 # Log summary 

665 successful_engines = [ 

666 name for name, res in engine_results.items() if res["success"] 

667 ] 

668 failed_engines = [ 

669 name for name, res in engine_results.items() if not res["success"] 

670 ] 

671 

672 logger.info( 

673 f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed" 

674 ) 

675 if successful_engines: 

676 logger.info(f"Successful engines: {', '.join(successful_engines)}") 

677 if failed_engines: 

678 logger.info(f"Failed engines: {', '.join(failed_engines)}") 

679 

680 # Log sample result to understand structure 

681 if all_results: 

682 logger.debug( 

683 f"Sample result keys from first result: {list(all_results[0].keys())}" 

684 ) 

685 logger.debug(f"Sample result: {str(all_results[0])[:500]}") 

686 

687 logger.info(f"Total results from all engines: {len(all_results)}") 

688 

689 # Store the engine results for potential use in _get_full_content 

690 self._engine_results = engine_results 

691 self._successful_engines = successful_engines 

692 

693 return all_results 
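
The submit/as_completed pattern used above is what keeps one slow engine from delaying the rest: results are handled in completion order, and an exception raised in one future never aborts the batch. A self-contained miniature (local pool and stub function; the real method reuses the global executor):

    import concurrent.futures

    def fetch(name: str) -> str:
        return f"results from {name}"    # stand-in for _execute_single_engine

    engines = ["arxiv", "brave", "pubmed"]
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        future_to_name = {pool.submit(fetch, n): n for n in engines}
        for fut in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[fut]
            try:
                print(name, "->", fut.result())
            except Exception:
                print(name, "-> failed")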

694 

695 def _get_full_content( 

696 self, relevant_items: List[Dict[str, Any]] 

697 ) -> List[Dict[str, Any]]: 

698 """ 

699 Get full content for the relevant items. 

700 Since we ran multiple engines, we'll group items by their source engine 

701 and get full content from each engine for its own results. 

702 

703 Args: 

704 relevant_items: List of relevant preview dictionaries 

705 

706 Returns: 

707 List of result dictionaries with full content 

708 """ 

709 # Check if we should get full content 

710 if get_setting_from_snapshot( 

711 "search.snippets_only", 

712 True, 

713 settings_snapshot=self.settings_snapshot, 

714 ): 

715 logger.info("Snippet-only mode, skipping full content retrieval") 

716 return relevant_items 

717 

718 logger.info("Getting full content for relevant items") 

719 

720 # Group items by their source engine 

721 items_by_engine = {} 

722 for item in relevant_items: 

723 engine_name = item.get("search_engine") 

724 if engine_name: 

725 if engine_name not in items_by_engine: 

726 items_by_engine[engine_name] = [] 

727 items_by_engine[engine_name].append(item) 

728 

729 # Get full content from each engine for its items 

730 all_full_content = [] 

731 

732 for engine_name, items in items_by_engine.items(): 

733 engine = self._get_engine_instance(engine_name) 

734 if engine: 

735 try: 

736 logger.info( 

737 f"Getting full content from {engine_name} for {len(items)} items" 

738 ) 

739 full_content = engine._get_full_content(items) 

740 all_full_content.extend(full_content) 

741 except Exception: 

742 logger.exception( 

743 f"Error getting full content from {engine_name}" 

744 ) 

745 # Fall back to returning items without full content 

746 all_full_content.extend(items) 

747 else: 

748 # No engine available, return items as-is 

749 all_full_content.extend(items) 

750 

751 return all_full_content 
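
The grouping step at the top of this method is a plain dict bucket-sort keyed on the search_engine tag that _get_previews attached to every result. In miniature, with made-up items:

    items = [
        {"title": "A", "search_engine": "arxiv"},
        {"title": "B", "search_engine": "brave"},
        {"title": "C", "search_engine": "arxiv"},
    ]
    items_by_engine = {}
    for item in items:
        name = item.get("search_engine")
        if name:
            items_by_engine.setdefault(name, []).append(item)
    # -> {"arxiv": [A, C], "brave": [B]}; each engine then fetches
    #    full content only for its own items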

752 

753 def invoke(self, query: str) -> List[Dict[str, Any]]: 

754 """Compatibility method for LangChain tools""" 

755 return self.run(query) 

756 

757 # Note: No shutdown() or context manager methods are needed

758 # The global thread pool is automatically cleaned up at process exit via atexit