Coverage for src/local_deep_research/web_search_engines/engines/parallel_search_engine.py: 96%

274 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import os 

2import concurrent.futures 

3from typing import Any, Dict, List, Optional 

4from threading import Lock 

5import atexit 

6 

7from loguru import logger 

8 

9from ...config.search_config import get_setting_from_snapshot 

10from ...database.thread_local_session import cleanup_current_thread 

11from ...utilities.enums import SearchMode 

12from ...utilities.json_utils import extract_json, get_llm_response_text 

13from ...utilities.thread_context import ( 

14 clear_search_context, 

15 get_search_context, 

16 set_search_context, 

17) 

18from ...web.services.socket_service import SocketIOService 

19from ..search_engine_base import BaseSearchEngine 

20from ..search_engine_factory import create_search_engine 

21from ..search_engines_config import get_available_engines 

22 

23# Global thread pool shared by all ParallelSearchEngine instances 

24# This prevents creating multiple thread pools and having more threads than expected 

25_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None 

26_global_executor_lock = Lock() 

27 

28 

def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the shared thread pool, creating it on first use.

    Creation happens under ``_global_executor_lock`` so that concurrent
    callers end up with the same pool.

    Args:
        max_workers: Worker count used only when the pool is created by this
            call. ``None`` selects ``min(32, (os.cpu_count() or 1) + 4)``.
            Ignored when a pool already exists.

    Returns:
        The shared ThreadPoolExecutor, or None when it could not be created.
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: somebody already built the pool.
        if _global_executor is not None:
            return _global_executor

        worker_count = max_workers
        if worker_count is None:
            worker_count = min(32, (os.cpu_count() or 1) + 4)

        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {worker_count} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None

        return _global_executor

67 

68 

def shutdown_global_executor(wait: bool = True):
    """
    Tear down the shared thread pool, if one exists.

    The pool reference is always reset to ``None`` afterwards (even when the
    shutdown call itself raises), so a later ``_get_global_executor`` call
    builds a fresh pool.

    Args:
        wait: Passed through to ``ThreadPoolExecutor.shutdown``; when True the
            call blocks until the pool's pending work has finished.
    """
    global _global_executor

    with _global_executor_lock:
        # Nothing to do when no pool was ever created (or it is already gone).
        if _global_executor is None:
            return

        try:
            _global_executor.shutdown(wait=wait)
            logger.info("Global ThreadPool shutdown complete")
        except Exception:
            logger.exception("Error shutting down global ThreadPool")
        finally:
            _global_executor = None

90 

91 

# Make sure the shared pool is drained and released when the interpreter exits.
def _shutdown_global_executor_at_exit() -> None:
    """Exit hook: shut the shared thread pool down, waiting for workers."""
    shutdown_global_executor(wait=True)


atexit.register(_shutdown_global_executor_at_exit)

94 

95 

class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.
    Uses LLM to select appropriate engines based on query, then runs them all in parallel.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread proliferation.
        The pool is automatically cleaned up at process exit.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_select: int = 100,  # Allow selecting all available engines
        allow_local_engines: bool = False,  # Disabled by default for privacy
        include_generic_engines: bool = True,  # Always include generic search engines
        search_mode: SearchMode = SearchMode.ALL,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        max_workers: Optional[
            int
        ] = None,  # Thread pool size (None = auto-detect)
        **kwargs,
    ):
        """
        Initialize the parallel search engine.

        All instances share a global thread pool. The first instance created will
        initialize the pool with the specified max_workers (or auto-detected value).
        Subsequent instances reuse the existing pool.

        Args:
            llm: Language model instance for query classification
            max_results: Maximum number of search results to return per engine
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_select: Maximum number of engines to select for parallel execution
            allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web)
            include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
            search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
                recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
                Ignored if the global pool is already initialized.
            **kwargs: Additional parameters. Only ``enable_llm_relevance_filter`` is read;
                everything else is accepted for compatibility and ignored.
        """
        # Parallel search aggregates results from multiple engines, so
        # it runs with a higher post-filter cap than a single-engine
        # search would get from DEFAULT_MAX_FILTERED_RESULTS.
        if max_filtered_results is None:
            max_filtered_results = 50
            logger.info(
                f"Setting max_filtered_results to {max_filtered_results} for parallel search"
            )

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_select = max_engines_to_select
        self.allow_local_engines = allow_local_engines
        self.include_generic_engines = include_generic_engines
        self.search_mode = search_mode
        self.settings_snapshot = settings_snapshot or {}

        # Disable LLM relevance filtering at the parallel level by default
        # Individual engines can still filter their own results
        # Double filtering (engines + parallel) is too aggressive
        self.enable_llm_relevance_filter = kwargs.get(
            "enable_llm_relevance_filter", False
        )

        # Cache for engine instances
        self.engine_cache: Dict[str, Any] = {}
        self.cache_lock = Lock()

        # Per-run bookkeeping written by _get_previews. Created here so the
        # attributes exist even before the first search has been executed.
        self._engine_results: Dict[str, Dict[str, Any]] = {}
        self._successful_engines: List[str] = []

        # Initialize global thread pool (thread-safe, only happens once)
        # All instances share the same pool to prevent thread proliferation
        _get_global_executor(max_workers)

        # Engines that passed the shared base filter plus the parallel-specific
        # filters applied in _get_available_engines.
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config for available engines."""
        return get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines, applying parallel-specific filters
        (is_public, search mode) on top of the shared base filter."""
        base_available = get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

        available = []
        for name, config_ in base_available.items():
            # Try to get the engine class to check is_public flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(error_msg)
                continue

            # Check if engine is public or if local engines are allowed
            if hasattr(engine_class, "is_public"):
                if not engine_class.is_public and not self.allow_local_engines:
                    logger.debug(f"Skipping local/private engine: {name}")
                    continue
                if not engine_class.is_public and self.allow_local_engines:
                    logger.warning(
                        f"Including local/private engine {name} - data may be exposed"
                    )
            else:
                # No is_public flag - assume it's private/local for safety
                if not self.allow_local_engines:
                    logger.debug(
                        f"Skipping engine {name} - no is_public flag and local engines not allowed"
                    )
                    continue

            # Apply scientific mode filtering if enabled
            if self.search_mode == SearchMode.SCIENTIFIC:
                is_scientific = getattr(engine_class, "is_scientific", False)
                is_generic = getattr(engine_class, "is_generic", False)

                if not (is_scientific or is_generic):
                    logger.debug(
                        f"Skipping {name} in scientific mode (not scientific or generic)"
                    )
                    continue

                logger.debug(
                    f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
                )

            available.append(name)

        return available

    def _get_available_generic_engines(self) -> List[str]:
        """Get list of available generic search engines that pass API key checks"""
        generic_engines = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip if not in available engines (already filtered for API keys etc)
            if name not in self.available_engines:
                continue

            # Load the engine class to check is_generic flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(
                    f"Could not check if {name} is generic: {error_msg}"
                )
                continue

            # Check if engine is generic
            if getattr(engine_class, "is_generic", False):
                generic_engines.append(name)
                logger.debug(f"Found generic engine: {name}")

        return generic_engines

    def _fallback_engine_selection(
        self, generic_engines: List[str]
    ) -> List[str]:
        """Build the fallback selection used when the LLM cannot be relied on.

        Starts from the generic engines (when ``include_generic_engines`` is
        set) and appends the first ``max_engines_to_select`` available engines
        that are not already in the list.
        """
        result = generic_engines.copy() if self.include_generic_engines else []
        for engine in self.available_engines[: self.max_engines_to_select]:
            if engine not in result:
                result.append(engine)
        return result

    def select_engines(self, query: str) -> List[str]:
        """
        Use LLM to select appropriate search engines based only on names.

        The LLM is shown only the non-generic engines (generic ones are added
        automatically when ``include_generic_engines`` is set) and is asked to
        return a JSON array of indices. Indices that are out of range, not
        integers, or repeated are ignored, and at most
        ``max_engines_to_select`` LLM-chosen engines are kept.

        Args:
            query: The search query

        Returns:
            List of selected engine names
        """
        if not self.llm or not self.available_engines:
            logger.warning(
                "No LLM or no available engines, using all available"
            )
            return self.available_engines[: self.max_engines_to_select]

        try:
            # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
            engines_for_selection = self.available_engines.copy()
            generic_engines: List[str] = []

            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                # Remove generic engines from selection since they'll be added automatically
                engines_for_selection = [
                    e for e in engines_for_selection if e not in generic_engines
                ]
                logger.debug(
                    f"Excluding generic engines from LLM selection: {generic_engines}"
                )

            # If no specialized engines available, just return the generic ones
            if not engines_for_selection:
                logger.info(
                    f"No specialized engines available, using generic engines: {generic_engines}"
                )
                return generic_engines

            # Create a simple prompt with just non-generic engine names
            engine_list = "\n".join(
                [
                    f"[{i}] {name}"
                    for i, name in enumerate(engines_for_selection)
                ]
            )

            logger.debug(f"Engines for LLM selection: {engines_for_selection}")

            prompt = f"""Query: {query}

Available search engines:
{engine_list}

Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
Example: [0, 2, 5]

Select up to {self.max_engines_to_select} engines that would best answer this query."""

            logger.debug("Sending prompt to LLM for engine selection")
            # Get LLM response
            response = self.llm.invoke(prompt)
            content = get_llm_response_text(response)

            indices = extract_json(content, expected_type=list)

            if indices is not None:
                # Convert indices to engine names (from the filtered list)
                selected_engines: List[str] = []
                for idx in indices:
                    # Honour the cap the prompt asked the LLM to respect.
                    if len(selected_engines) >= self.max_engines_to_select:
                        break
                    # bool is a subclass of int, so JSON true/false would
                    # otherwise be treated as indices 1/0.
                    if isinstance(idx, bool) or not isinstance(idx, int):
                        continue
                    if not 0 <= idx < len(engines_for_selection):
                        continue
                    name = engines_for_selection[idx]
                    # A repeated index would run the same engine twice and
                    # duplicate its results.
                    if name not in selected_engines:
                        selected_engines.append(name)

                if selected_engines:
                    # Add generic search engines (they were excluded from selection)
                    if self.include_generic_engines:
                        for engine in generic_engines:
                            if engine not in selected_engines:
                                selected_engines.append(engine)
                                logger.debug(f"Added generic engine: {engine}")

                    logger.info(f"Final selected engines: {selected_engines}")
                    return selected_engines

            # Fallback if parsing fails - return generic engines plus some specialized ones
            logger.warning(
                "Failed to parse LLM response, using generic engines + top specialized"
            )
            return self._fallback_engine_selection(generic_engines)

        except Exception:
            logger.exception("Error selecting engines with LLM")
            # Fallback to using generic engines + available engines
            fallback_generic: List[str] = []
            if self.include_generic_engines:
                try:
                    fallback_generic = self._get_available_generic_engines()
                except Exception:
                    # The generic lookup may be what failed in the first
                    # place; degrade to the plain available-engine list
                    # instead of raising out of the fallback path.
                    logger.exception(
                        "Error determining generic engines for fallback"
                    )
            return self._fallback_engine_selection(fallback_generic)

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine.

        Instances are cached per engine name; lookup and creation both happen
        while holding ``cache_lock``. Returns None when creation fails.
        """
        with self.cache_lock:
            # Return cached instance if available
            if engine_name in self.engine_cache:
                return self.engine_cache[engine_name]  # type: ignore[no-any-return]

            # Create a new instance
            engine = None
            try:
                # Only pass parameters that all engines accept
                common_params = {
                    "llm": self.llm,
                    "max_results": self.max_results,
                }

                # Add max_filtered_results if specified
                if self.max_filtered_results is not None:
                    common_params["max_filtered_results"] = (
                        self.max_filtered_results
                    )

                engine = create_search_engine(
                    engine_name,
                    settings_snapshot=self.settings_snapshot,
                    programmatic_mode=self.programmatic_mode,
                    **common_params,  # type: ignore[arg-type]
                )

                # Individual engines use their auto-detected filtering settings
                # The factory enables LLM filtering for scientific engines (arXiv, etc.)
                # and disables it for generic engines (Google, Brave, etc.)
            except Exception:
                logger.exception(
                    f"Error creating engine instance for {engine_name}"
                )
                return None

            if engine:
                # Cache the instance
                self.engine_cache[engine_name] = engine

            return engine

    def _execute_single_engine(
        self, engine_name: str, query: str
    ) -> Dict[str, Any]:
        """
        Execute a single search engine and return results.

        Context propagation and thread-local cleanup are handled by
        ``_run_with_context`` at the submit site in ``_get_previews``,
        which captures the submitter's context once per request.

        Args:
            engine_name: Name of the engine to execute
            query: The search query

        Returns:
            Dictionary with engine name and results or error
        """
        logger.info(f"Executing search on {engine_name}")

        engine = self._get_engine_instance(engine_name)
        if not engine:
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Failed to initialize {engine_name}",
                "results": [],
            }

        try:
            # Run the engine properly through its run() method
            # This ensures proper filter application, context propagation, etc.
            results = engine.run(query)

            if results and len(results) > 0:
                logger.info(f"Got {len(results)} results from {engine_name}")
                return {
                    "engine": engine_name,
                    "success": True,
                    "results": results,
                    "count": len(results),
                }
            return {
                "engine": engine_name,
                "success": False,
                "error": "No results",
                "results": [],
            }

        except Exception:
            logger.exception(f"Error executing {engine_name}")
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Engine {engine_name} failed",
                "results": [],
            }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by executing selected engines in parallel.

        Each successful result item is tagged with a ``search_engine`` key
        naming the engine that produced it. Results are collected in
        completion order.

        Args:
            query: The search query

        Returns:
            Combined list of preview dictionaries from all successful engines
        """
        # Select engines for this query
        selected_engines = self.select_engines(query)

        if not selected_engines:
            logger.warning("No engines selected")
            return []

        logger.info(
            f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
        )

        # Emit socket event about selected engines
        try:
            SocketIOService().emit_socket_event(
                "parallel_search_started",
                {"engines": selected_engines, "query": query},
            )
        except Exception:
            logger.exception("Socket emit error (non-critical)")

        # Execute all engines in parallel using persistent thread pool
        all_results = []
        engine_results = {}

        # Get the global thread pool
        executor = _get_global_executor()
        if executor is None:
            logger.error(
                "Global thread pool not available, cannot execute parallel search"
            )
            return []

        # Capture submitter's research context ONCE (in the request thread,
        # where it is populated by the strategy's caller). Workers in the
        # global pool do not inherit thread-local state, so we must re-apply
        # it per task and clear it (+ clean up thread-local DB/session/password
        # state) in a finally block so nothing leaks to the next task that
        # lands on the same worker.
        submitter_ctx = get_search_context()

        def _run_with_context(fn, *fargs):
            if submitter_ctx is not None:
                set_search_context(submitter_ctx)
            try:
                return fn(*fargs)
            finally:
                clear_search_context()
                try:
                    cleanup_current_thread()
                except Exception:
                    logger.debug(
                        "parallel_search submit wrapper: cleanup_current_thread failed",
                        exc_info=True,
                    )

        # Submit all tasks to the global executor
        future_to_engine = {
            executor.submit(
                _run_with_context,
                self._execute_single_engine,
                engine_name,
                query,
            ): engine_name
            for engine_name in selected_engines
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_engine):
            engine_name = future_to_engine[future]
            try:
                result = future.result()
                engine_results[engine_name] = result

                if result["success"]:
                    # Add source information to each result
                    for item in result["results"]:
                        item["search_engine"] = engine_name
                    all_results.extend(result["results"])

                    # Emit success event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": True,
                                "count": result["count"],
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")
                else:
                    # Emit failure event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": False,
                                "error": result.get("error", "Unknown error"),
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")

            except Exception:
                logger.exception(f"Failed to get result from {engine_name}")
                engine_results[engine_name] = {
                    "engine": engine_name,
                    "success": False,
                    "error": "Search execution failed",
                    "results": [],
                }

        # Log summary
        successful_engines = [
            name for name, res in engine_results.items() if res["success"]
        ]
        failed_engines = [
            name for name, res in engine_results.items() if not res["success"]
        ]

        logger.info(
            f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
        )
        if successful_engines:
            logger.info(f"Successful engines: {', '.join(successful_engines)}")
        if failed_engines:
            logger.info(f"Failed engines: {', '.join(failed_engines)}")

        # Log sample result to understand structure
        if all_results:
            logger.debug(
                f"Sample result keys from first result: {list(all_results[0].keys())}"
            )
            logger.debug(f"Sample result: {str(all_results[0])[:500]}")

        logger.info(f"Total results from all engines: {len(all_results)}")

        # Store the engine results for potential use in _get_full_content
        self._engine_results = engine_results
        self._successful_engines = successful_engines

        return all_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.
        Since we ran multiple engines, we'll group items by their source engine
        and get full content from each engine for its own results.

        Items that carry no ``search_engine`` tag cannot be routed to an
        engine; they are returned unchanged (after the routed items) rather
        than being dropped.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Group items by their source engine
        items_by_engine: Dict[str, List[Dict[str, Any]]] = {}
        untagged_items: List[Dict[str, Any]] = []
        for item in relevant_items:
            engine_name = item.get("search_engine")
            if engine_name:
                items_by_engine.setdefault(engine_name, []).append(item)
            else:
                untagged_items.append(item)

        # Get full content from each engine for its items
        all_full_content = []

        for engine_name, items in items_by_engine.items():
            engine = self._get_engine_instance(engine_name)
            if engine:
                try:
                    logger.info(
                        f"Getting full content from {engine_name} for {len(items)} items"
                    )
                    full_content = engine._get_full_content(items)
                    all_full_content.extend(full_content)
                except Exception:
                    logger.exception(
                        f"Error getting full content from {engine_name}"
                    )
                    # Fall back to returning items without full content
                    all_full_content.extend(items)
            else:
                # No engine available, return items as-is
                all_full_content.extend(items)

        if untagged_items:
            # Without a source engine there is nobody to ask for full content;
            # keep the previews so relevant results are not silently lost.
            logger.debug(
                f"Returning {len(untagged_items)} items without a source engine as-is"
            )
            all_full_content.extend(untagged_items)

        return all_full_content

    def close(self):
        """Close all cached child search engines and own resources."""
        from ...utilities.resource_utils import safe_close

        for engine in self.engine_cache.values():
            safe_close(engine, "child search engine")
        self.engine_cache.clear()
        super().close()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    # Note: No shutdown() or context manager methods needed
    # The global thread pool is automatically cleaned up at process exit via atexit