Coverage for src / local_deep_research / web_search_engines / engines / parallel_search_engine.py: 95%

263 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import os 

2import concurrent.futures 

3from typing import Any, Dict, List, Optional 

4from threading import Lock 

5import atexit 

6 

7from loguru import logger 

8 

9from ...config.search_config import get_setting_from_snapshot 

10from ...utilities.enums import SearchMode 

11from ...utilities.json_utils import extract_json, get_llm_response_text 

12from ...utilities.thread_context import preserve_research_context 

13from ...web.services.socket_service import SocketIOService 

14from ..search_engine_base import BaseSearchEngine 

15from ..search_engine_factory import create_search_engine 

16from ..search_engines_config import get_available_engines 

17 

# Global thread pool shared by all ParallelSearchEngine instances
# This prevents creating multiple thread pools and having more threads than expected
_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Guards lazy creation and teardown of _global_executor across threads.
_global_executor_lock = Lock()

22 

23 

def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the shared thread pool, creating it lazily on first use.

    Creation happens under a module-level lock so that at most one pool
    ever exists regardless of how many threads race into this function.

    Args:
        max_workers: Worker thread count for the pool. When None, falls back
            to Python's recommended I/O-bound formula:
            min(32, (os.cpu_count() or 1) + 4).
            Only honoured by the call that actually creates the pool;
            later calls ignore it.

    Returns:
        The shared ThreadPoolExecutor, or None if pool creation failed.
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: pool already exists, hand it back unchanged.
        if _global_executor is not None:
            return _global_executor

        worker_count = (
            min(32, (os.cpu_count() or 1) + 4)
            if max_workers is None
            else max_workers
        )

        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {worker_count} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None

        return _global_executor

62 

63 

def shutdown_global_executor(wait: bool = True):
    """
    Tear down the shared thread pool and reset the module-level handle.

    Registered with atexit so it runs automatically at interpreter exit.
    Any ParallelSearchEngine created afterwards triggers creation of a
    brand-new pool.

    Args:
        wait: When True, block until all worker threads have finished
    """
    global _global_executor

    with _global_executor_lock:
        executor = _global_executor
        if executor is None:
            return

        try:
            executor.shutdown(wait=wait)
            logger.info("Global ThreadPool shutdown complete")
        except Exception:
            logger.exception("Error shutting down global ThreadPool")
        finally:
            # Always clear the handle, even if shutdown raised, so a
            # fresh pool can be created later.
            _global_executor = None

85 

86 

# Register automatic cleanup at process exit.
# The lambda fixes wait=True so shutdown blocks until in-flight searches finish.
atexit.register(lambda: shutdown_global_executor(wait=True))

89 

90 

class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.
    Uses LLM to select appropriate engines based on query, then runs them all in parallel.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread proliferation.
        The pool is automatically cleaned up at process exit.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_select: int = 100,  # Allow selecting all available engines
        allow_local_engines: bool = False,  # Disabled by default for privacy
        include_generic_engines: bool = True,  # Always include generic search engines
        search_mode: SearchMode = SearchMode.ALL,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        max_workers: Optional[
            int
        ] = None,  # Thread pool size (None = auto-detect)
        **kwargs,
    ):
        """
        Initialize the parallel search engine.

        All instances share a global thread pool. The first instance created will
        initialize the pool with the specified max_workers (or auto-detected value).
        Subsequent instances reuse the existing pool.

        Args:
            llm: Language model instance for query classification
            max_results: Maximum number of search results to return per engine
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_select: Maximum number of engines to select for parallel execution
            allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web)
            include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
            search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
                recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
                Ignored if the global pool is already initialized.
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Override max_filtered_results to be much higher for parallel search
        # If not explicitly set, use 50 instead of the default 5-20
        if max_filtered_results is None:
            max_filtered_results = 50
            logger.info(
                f"Setting max_filtered_results to {max_filtered_results} for parallel search"
            )

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_select = max_engines_to_select
        self.allow_local_engines = allow_local_engines
        self.include_generic_engines = include_generic_engines
        self.search_mode = search_mode
        self.settings_snapshot = settings_snapshot or {}

        # Disable LLM relevance filtering at the parallel level by default
        # Individual engines can still filter their own results
        # Double filtering (engines + parallel) is too aggressive
        self.enable_llm_relevance_filter = kwargs.get(
            "enable_llm_relevance_filter", False
        )

        # Cache for engine instances. Guarded by cache_lock because engines
        # are created lazily from worker threads during parallel execution.
        self.engine_cache: Dict[str, Any] = {}
        self.cache_lock = Lock()

        # Initialize global thread pool (thread-safe, only happens once)
        # All instances share the same pool to prevent thread proliferation
        _get_global_executor(max_workers)

        # Get available engines (excluding 'meta', 'auto', and 'parallel')
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config for available engines."""
        return get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines, applying parallel-specific filters
        (is_public, search mode) on top of the shared base filter."""
        base_available = get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

        available = []
        for name, config_ in base_available.items():
            # Try to get the engine class to check is_public flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(error_msg)
                continue

            # Check if engine is public or if local engines are allowed
            if hasattr(engine_class, "is_public"):
                if not engine_class.is_public and not self.allow_local_engines:
                    logger.debug(f"Skipping local/private engine: {name}")
                    continue
                if not engine_class.is_public and self.allow_local_engines:
                    logger.warning(
                        f"Including local/private engine {name} - data may be exposed"
                    )
            else:
                # No is_public flag - assume it's private/local for safety
                if not self.allow_local_engines:
                    logger.debug(
                        f"Skipping engine {name} - no is_public flag and local engines not allowed"
                    )
                    continue

            # Apply scientific mode filtering if enabled
            if self.search_mode == SearchMode.SCIENTIFIC:
                is_scientific = getattr(engine_class, "is_scientific", False)
                is_generic = getattr(engine_class, "is_generic", False)

                # Scientific mode keeps only engines flagged scientific OR generic
                if not (is_scientific or is_generic):
                    logger.debug(
                        f"Skipping {name} in scientific mode (not scientific or generic)"
                    )
                    continue

                logger.debug(
                    f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
                )

            available.append(name)

        return available

    def _get_available_generic_engines(self) -> List[str]:
        """Get list of available generic search engines that pass API key checks"""
        generic_engines = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip if not in available engines (already filtered for API keys etc)
            if name not in self.available_engines:
                continue

            # Load the engine class to check is_generic flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(
                    f"Could not check if {name} is generic: {error_msg}"
                )
                continue

            # Check if engine is generic
            if getattr(engine_class, "is_generic", False):
                generic_engines.append(name)
                logger.debug(f"Found generic engine: {name}")

        return generic_engines

    def select_engines(self, query: str) -> List[str]:
        """
        Use LLM to select appropriate search engines based only on names.

        Args:
            query: The search query

        Returns:
            List of selected engine names
        """
        # Without an LLM (or any engines) there is nothing to rank - take the
        # first max_engines_to_select available engines as-is.
        if not self.llm or not self.available_engines:
            logger.warning(
                "No LLM or no available engines, using all available"
            )
            return self.available_engines[: self.max_engines_to_select]

        try:
            # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
            engines_for_selection = self.available_engines.copy()
            generic_engines = []

            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                # Remove generic engines from selection since they'll be added automatically
                engines_for_selection = [
                    e for e in engines_for_selection if e not in generic_engines
                ]
                logger.debug(
                    f"Excluding generic engines from LLM selection: {generic_engines}"
                )

            # If no specialized engines available, just return the generic ones
            if not engines_for_selection:
                logger.info(
                    f"No specialized engines available, using generic engines: {generic_engines}"
                )
                return generic_engines

            # Create a simple prompt with just non-generic engine names
            engine_list = "\n".join(
                [
                    f"[{i}] {name}"
                    for i, name in enumerate(engines_for_selection)
                ]
            )

            logger.debug(f"Engines for LLM selection: {engines_for_selection}")

            prompt = f"""Query: {query}

Available search engines:
{engine_list}

Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
Example: [0, 2, 5]

Select up to {self.max_engines_to_select} engines that would best answer this query."""

            logger.debug("Sending prompt to LLM for engine selection")
            # Get LLM response
            response = self.llm.invoke(prompt)
            content = get_llm_response_text(response)

            # Parse the JSON array of indices out of the (possibly noisy) reply
            indices = extract_json(content, expected_type=list)

            if indices is not None:
                # Convert indices to engine names (from the filtered list);
                # out-of-range or non-int entries are silently dropped
                selected_engines = []
                for idx in indices:
                    if isinstance(idx, int) and 0 <= idx < len(
                        engines_for_selection
                    ):
                        selected_engines.append(engines_for_selection[idx])

                if selected_engines:
                    # Add generic search engines (they were excluded from selection)
                    if self.include_generic_engines:
                        for engine in generic_engines:
                            if engine not in selected_engines:
                                selected_engines.append(engine)
                                logger.debug(f"Added generic engine: {engine}")

                    logger.info(f"Final selected engines: {selected_engines}")
                    return selected_engines

            # Fallback if parsing fails - return generic engines plus some specialized ones
            logger.warning(
                "Failed to parse LLM response, using generic engines + top specialized"
            )
            result = (
                generic_engines.copy() if self.include_generic_engines else []
            )
            for engine in self.available_engines[: self.max_engines_to_select]:
                if engine not in result:
                    result.append(engine)
            return result

        except Exception:
            logger.exception("Error selecting engines with LLM")
            # Fallback to using generic engines + available engines
            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                result = generic_engines.copy()
                for engine in self.available_engines[
                    : self.max_engines_to_select
                ]:
                    if engine not in result:
                        result.append(engine)
                return result
            return self.available_engines[: self.max_engines_to_select]

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine.

        Instances are cached per engine name; the whole lookup/create/cache
        sequence runs under cache_lock since this is called from worker threads.
        Returns None if the engine could not be created.
        """
        with self.cache_lock:
            # Return cached instance if available
            if engine_name in self.engine_cache:
                return self.engine_cache[engine_name]  # type: ignore[no-any-return]

            # Create a new instance
            engine = None
            try:
                # Only pass parameters that all engines accept
                common_params = {
                    "llm": self.llm,
                    "max_results": self.max_results,
                }

                # Add max_filtered_results if specified
                if self.max_filtered_results is not None:
                    common_params["max_filtered_results"] = (
                        self.max_filtered_results
                    )

                engine = create_search_engine(
                    engine_name,
                    settings_snapshot=self.settings_snapshot,
                    programmatic_mode=self.programmatic_mode,
                    **common_params,  # type: ignore[arg-type]
                )

                # Individual engines use their auto-detected filtering settings
                # The factory enables LLM filtering for scientific engines (arXiv, etc.)
                # and disables it for generic engines (Google, Brave, etc.)
            except Exception:
                logger.exception(
                    f"Error creating engine instance for {engine_name}"
                )
                return None

            if engine:
                # Cache the instance
                self.engine_cache[engine_name] = engine

            return engine

    @preserve_research_context
    def _execute_single_engine(
        self, engine_name: str, query: str
    ) -> Dict[str, Any]:
        """
        Execute a single search engine and return results.

        Note: This method is decorated with @preserve_research_context to ensure
        rate limiting context is properly propagated to child threads.

        Args:
            engine_name: Name of the engine to execute
            query: The search query

        Returns:
            Dictionary with engine name and results or error. Shape:
            {"engine": str, "success": bool, "results": list,
             "count": int (success only), "error": str (failure only)}
        """
        logger.info(f"Executing search on {engine_name}")

        engine = self._get_engine_instance(engine_name)
        if not engine:
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Failed to initialize {engine_name}",
                "results": [],
            }

        try:
            # Run the engine properly through its run() method
            # This ensures proper filter application, context propagation, etc.
            results = engine.run(query)

            if results and len(results) > 0:
                logger.info(f"Got {len(results)} results from {engine_name}")
                return {
                    "engine": engine_name,
                    "success": True,
                    "results": results,
                    "count": len(results),
                }
            # Empty result set is reported as a (non-exceptional) failure
            return {
                "engine": engine_name,
                "success": False,
                "error": "No results",
                "results": [],
            }

        except Exception:
            logger.exception(f"Error executing {engine_name}")
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Engine {engine_name} failed",
                "results": [],
            }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by executing selected engines in parallel.

        Args:
            query: The search query

        Returns:
            Combined list of preview dictionaries from all successful engines
        """
        # Select engines for this query
        selected_engines = self.select_engines(query)

        if not selected_engines:
            logger.warning("No engines selected")
            return []

        logger.info(
            f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
        )

        # Emit socket event about selected engines (best-effort; failures
        # are logged and must never abort the search)
        try:
            SocketIOService().emit_socket_event(
                "parallel_search_started",
                {"engines": selected_engines, "query": query},
            )
        except Exception:
            logger.exception("Socket emit error (non-critical)")

        # Execute all engines in parallel using persistent thread pool
        all_results = []
        engine_results = {}

        # Get the global thread pool
        executor = _get_global_executor()
        if executor is None:
            logger.error(
                "Global thread pool not available, cannot execute parallel search"
            )
            return []

        # Submit all tasks to the global executor
        future_to_engine = {
            executor.submit(
                self._execute_single_engine, engine_name, query
            ): engine_name
            for engine_name in selected_engines
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_engine):
            engine_name = future_to_engine[future]
            try:
                result = future.result()
                engine_results[engine_name] = result

                if result["success"]:
                    # Add source information to each result
                    for item in result["results"]:
                        item["search_engine"] = engine_name
                    all_results.extend(result["results"])

                    # Emit success event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": True,
                                "count": result["count"],
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")
                else:
                    # Emit failure event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": False,
                                "error": result.get("error", "Unknown error"),
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")

            except Exception:
                logger.exception(f"Failed to get result from {engine_name}")
                engine_results[engine_name] = {
                    "engine": engine_name,
                    "success": False,
                    "error": "Search execution failed",
                    "results": [],
                }

        # Log summary
        successful_engines = [
            name for name, res in engine_results.items() if res["success"]
        ]
        failed_engines = [
            name for name, res in engine_results.items() if not res["success"]
        ]

        logger.info(
            f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
        )
        if successful_engines:
            logger.info(f"Successful engines: {', '.join(successful_engines)}")
        if failed_engines:
            logger.info(f"Failed engines: {', '.join(failed_engines)}")

        # Log sample result to understand structure
        if all_results:
            logger.debug(
                f"Sample result keys from first result: {list(all_results[0].keys())}"
            )
            logger.debug(f"Sample result: {str(all_results[0])[:500]}")

        logger.info(f"Total results from all engines: {len(all_results)}")

        # Store the engine results for potential use in _get_full_content
        self._engine_results = engine_results
        self._successful_engines = successful_engines

        return all_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.
        Since we ran multiple engines, we'll group items by their source engine
        and get full content from each engine for its own results.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Group items by their source engine (tag set in _get_previews).
        # Items without a "search_engine" key are dropped here - they have no
        # engine to fetch full content from.
        items_by_engine: Dict[str, List[Dict[str, Any]]] = {}
        for item in relevant_items:
            engine_name = item.get("search_engine")
            if engine_name:
                if engine_name not in items_by_engine:
                    items_by_engine[engine_name] = []
                items_by_engine[engine_name].append(item)

        # Get full content from each engine for its items
        all_full_content = []

        for engine_name, items in items_by_engine.items():
            engine = self._get_engine_instance(engine_name)
            if engine:
                try:
                    logger.info(
                        f"Getting full content from {engine_name} for {len(items)} items"
                    )
                    full_content = engine._get_full_content(items)
                    all_full_content.extend(full_content)
                except Exception:
                    logger.exception(
                        f"Error getting full content from {engine_name}"
                    )
                    # Fall back to returning items without full content
                    all_full_content.extend(items)
            else:
                # No engine available, return items as-is
                all_full_content.extend(items)

        return all_full_content

    def close(self):
        """Close all cached child search engines and own resources."""
        from ...utilities.resource_utils import safe_close

        # NOTE(review): engine_cache is iterated without cache_lock here -
        # assumes close() runs only after all parallel searches finish; confirm.
        for engine in self.engine_cache.values():
            safe_close(engine, "child search engine")
        self.engine_cache.clear()
        super().close()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    # Note: No shutdown() or context manager methods needed
    # The global thread pool is automatically cleaned up at process exit via atexit