Coverage for src / local_deep_research / web_search_engines / engines / parallel_search_engine.py: 85%
287 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import os
2import concurrent.futures
3from typing import Any, Dict, List, Optional
4from threading import Lock
5import atexit
7from loguru import logger
9from ...config.search_config import get_setting_from_snapshot
10from ...utilities.enums import SearchMode
11from ...utilities.json_utils import extract_json, get_llm_response_text
12from ...utilities.thread_context import preserve_research_context
13from ...web.services.socket_service import SocketIOService
14from ..search_engine_base import BaseSearchEngine
15from ..search_engine_factory import create_search_engine
# Module-wide thread pool shared by every ParallelSearchEngine instance.
# Sharing one pool keeps the total thread count bounded instead of growing
# with each engine instance that gets created.
_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
_global_executor_lock = Lock()


def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the shared thread pool, creating it on first use.

    Initialization is lazy and guarded by a lock, so exactly one pool is
    ever built no matter how many threads race here.

    Args:
        max_workers: Worker-thread count for the pool. When None, falls back
            to Python's I/O-bound heuristic: min(32, (os.cpu_count() or 1) + 4).
            Honored only by the call that actually creates the pool; later
            calls ignore it.

    Returns:
        The shared ThreadPoolExecutor, or None if pool creation failed.
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: pool already exists, hand it back unchanged.
        if _global_executor is not None:
            return _global_executor

        worker_count = (
            max_workers
            if max_workers is not None
            else min(32, (os.cpu_count() or 1) + 4)
        )
        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {worker_count} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None
        return _global_executor
def shutdown_global_executor(wait: bool = True):
    """
    Tear down the shared thread pool.

    Invoked automatically at interpreter exit via the atexit hook below.
    Once this runs, the next ParallelSearchEngine will lazily build a
    fresh pool.

    Args:
        wait: When True, block until all worker threads have finished.
    """
    global _global_executor

    with _global_executor_lock:
        # Nothing to do if the pool was never created (or already torn down).
        if _global_executor is None:
            return
        pool = _global_executor
        try:
            pool.shutdown(wait=wait)
            logger.info("Global ThreadPool shutdown complete")
        except Exception:
            logger.exception("Error shutting down global ThreadPool")
        finally:
            # Always drop the reference so a later call can rebuild the pool.
            _global_executor = None


# Make sure the shared pool is cleaned up when the process exits.
atexit.register(lambda: shutdown_global_executor(wait=True))
class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.
    Uses LLM to select appropriate engines based on query, then runs them all in parallel.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread proliferation.
        The pool is automatically cleaned up at process exit.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_select: int = 100,  # Allow selecting all available engines
        allow_local_engines: bool = False,  # Disabled by default for privacy
        include_generic_engines: bool = True,  # Always include generic search engines
        search_mode: SearchMode = SearchMode.ALL,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        max_workers: Optional[
            int
        ] = None,  # Thread pool size (None = auto-detect)
        **kwargs,
    ):
        """
        Initialize the parallel search engine.

        All instances share a global thread pool. The first instance created will
        initialize the pool with the specified max_workers (or auto-detected value).
        Subsequent instances reuse the existing pool.

        Args:
            llm: Language model instance for query classification
            max_results: Maximum number of search results to return per engine
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_select: Maximum number of engines to select for parallel execution
            allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web)
            include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
            search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
                recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
                Ignored if the global pool is already initialized.
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Override max_filtered_results to be much higher for parallel search
        # If not explicitly set, use 50 instead of the default 5-20
        if max_filtered_results is None:
            max_filtered_results = 50
            logger.info(
                f"Setting max_filtered_results to {max_filtered_results} for parallel search"
            )

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_select = max_engines_to_select
        self.allow_local_engines = allow_local_engines
        self.include_generic_engines = include_generic_engines
        self.search_mode = search_mode
        # NOTE(review): also stored by super().__init__ above — presumably kept
        # here for direct access; confirm the base class exposes the same name.
        self.settings_snapshot = settings_snapshot

        # Disable LLM relevance filtering at the parallel level by default
        # Individual engines can still filter their own results
        # Double filtering (engines + parallel) is too aggressive
        self.enable_llm_relevance_filter = kwargs.get(
            "enable_llm_relevance_filter", False
        )

        # Cache for engine instances, guarded by cache_lock since worker
        # threads call _get_engine_instance concurrently
        self.engine_cache: Dict[str, BaseSearchEngine] = {}
        self.cache_lock = Lock()

        # Initialize global thread pool (thread-safe, only happens once)
        # All instances share the same pool to prevent thread proliferation
        _get_global_executor(max_workers)

        # Get available engines (excluding 'meta', 'auto', and 'parallel')
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

    def _check_api_key_availability(
        self, name: str, config_: Dict[str, Any]
    ) -> bool:
        """
        Check if API keys are available for engines that require them.

        Args:
            name: Engine name
            config_: Engine configuration

        Returns:
            True if the engine can be used (API key available or not required)
        """
        # If engine doesn't require API key, it's available
        if not config_.get("requires_api_key", False):
            return True

        # Check if API key is configured
        api_key_setting = config_.get("api_key_setting")
        if api_key_setting and self.settings_snapshot:
            api_key = self.settings_snapshot.get(api_key_setting)
            if api_key and str(api_key).strip():
                return True
            logger.debug(f"Skipping {name} - API key not configured")
            return False

        # No API key setting defined, assume it's available
        # NOTE(review): this branch is also hit when settings_snapshot is
        # None even though the engine requires a key — confirm intentional.
        return True

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config from settings_snapshot.

        Flattened keys of the form ``search.engine.web.<engine>.<rest>`` are
        regrouped into ``{engine: {rest: value}}``. Snapshot values that are
        dicts are unwrapped via their "value" key.
        """
        if self.settings_snapshot:
            # Extract search engine configs from settings snapshot
            config_data = {}
            for key, value in self.settings_snapshot.items():
                if key.startswith("search.engine.web."):
                    parts = key.split(".")
                    if len(parts) >= 4:
                        engine_name = parts[3]
                        if engine_name not in config_data:
                            config_data[engine_name] = {}
                        remaining_key = (
                            ".".join(parts[4:]) if len(parts) > 4 else ""
                        )
                        # A key with no remainder (exactly 4 parts) only
                        # registers the engine; it stores no setting.
                        if remaining_key:
                            config_data[engine_name][remaining_key] = (
                                value.get("value")
                                if isinstance(value, dict)
                                else value
                            )
            return config_data
        else:
            return {}

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines based on is_public flag and API key availability"""
        available = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip meta, auto, and parallel engines (special engines)
            if name in ["meta", "auto", "parallel"]:
                continue

            # Try to get the engine class to check is_public flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(error_msg)
                continue

            # Check if engine is public or if local engines are allowed
            if hasattr(engine_class, "is_public"):
                if not engine_class.is_public and not self.allow_local_engines:
                    logger.debug(f"Skipping local/private engine: {name}")
                    continue
                elif not engine_class.is_public and self.allow_local_engines:
                    logger.warning(
                        f"Including local/private engine {name} - data may be exposed"
                    )
            else:
                # No is_public flag - assume it's private/local for safety
                if not self.allow_local_engines:
                    logger.debug(
                        f"Skipping engine {name} - no is_public flag and local engines not allowed"
                    )
                    continue

            # Apply scientific mode filtering if enabled
            if self.search_mode == SearchMode.SCIENTIFIC:
                # In scientific mode: include scientific engines AND generic engines
                # Exclude non-scientific specialized engines (like Guardian, Wayback)
                is_scientific = getattr(engine_class, "is_scientific", False)
                is_generic = getattr(engine_class, "is_generic", False)

                if not (is_scientific or is_generic):
                    logger.debug(
                        f"Skipping {name} in scientific mode (not scientific or generic)"
                    )
                    continue

                logger.debug(
                    f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
                )
            # Skip engines that require API keys if we don't want to use them
            if (
                config_.get("requires_api_key", False)
                and not self.use_api_key_services
            ):
                logger.debug(
                    f"Skipping {name} - requires API key and use_api_key_services is False"
                )
                continue

            # Check API key availability
            if not self._check_api_key_availability(name, config_):
                continue

            available.append(name)

        return available

    def _get_available_generic_engines(self) -> List[str]:
        """Get list of available generic search engines that pass API key checks"""
        generic_engines = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip if not in available engines (already filtered for API keys etc)
            if name not in self.available_engines:
                continue

            # Load the engine class to check is_generic flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(
                    f"Could not check if {name} is generic: {error_msg}"
                )
                continue

            # Check if engine is generic
            if getattr(engine_class, "is_generic", False):
                generic_engines.append(name)
                logger.debug(f"Found generic engine: {name}")

        return generic_engines

    def select_engines(self, query: str) -> List[str]:
        """
        Use LLM to select appropriate search engines based only on names.

        Generic engines are excluded from the LLM's choices and appended
        afterwards (when include_generic_engines is set). Any LLM/parsing
        failure falls back to generic engines plus the first
        max_engines_to_select available engines.

        Args:
            query: The search query

        Returns:
            List of selected engine names
        """
        if not self.llm or not self.available_engines:
            logger.warning(
                "No LLM or no available engines, using all available"
            )
            return self.available_engines[: self.max_engines_to_select]

        try:
            # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
            engines_for_selection = self.available_engines.copy()
            generic_engines = []

            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                # Remove generic engines from selection since they'll be added automatically
                engines_for_selection = [
                    e for e in engines_for_selection if e not in generic_engines
                ]
                logger.debug(
                    f"Excluding generic engines from LLM selection: {generic_engines}"
                )

                # If no specialized engines available, just return the generic ones
                if not engines_for_selection:
                    logger.info(
                        f"No specialized engines available, using generic engines: {generic_engines}"
                    )
                    return generic_engines

            # Create a simple prompt with just non-generic engine names
            engine_list = "\n".join(
                [
                    f"[{i}] {name}"
                    for i, name in enumerate(engines_for_selection)
                ]
            )

            logger.debug(f"Engines for LLM selection: {engines_for_selection}")

            prompt = f"""Query: {query}

Available search engines:
{engine_list}

Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
Example: [0, 2, 5]

Select up to {self.max_engines_to_select} engines that would best answer this query."""

            logger.debug("Sending prompt to LLM for engine selection")
            # Get LLM response
            response = self.llm.invoke(prompt)
            content = get_llm_response_text(response)

            indices = extract_json(content, expected_type=list)

            if indices is not None:
                # Convert indices to engine names (from the filtered list)
                # Out-of-range or non-int entries from the LLM are dropped.
                selected_engines = []
                for idx in indices:
                    if isinstance(idx, int) and 0 <= idx < len(
                        engines_for_selection
                    ):
                        selected_engines.append(engines_for_selection[idx])

                if selected_engines:
                    # Add generic search engines (they were excluded from selection)
                    if self.include_generic_engines:
                        for engine in generic_engines:
                            if engine not in selected_engines:
                                selected_engines.append(engine)
                                logger.debug(f"Added generic engine: {engine}")

                    logger.info(f"Final selected engines: {selected_engines}")
                    return selected_engines

            # Fallback if parsing fails - return generic engines plus some specialized ones
            logger.warning(
                "Failed to parse LLM response, using generic engines + top specialized"
            )
            result = (
                generic_engines.copy() if self.include_generic_engines else []
            )
            for engine in self.available_engines[: self.max_engines_to_select]:
                if engine not in result:
                    result.append(engine)
            return result

        except Exception:
            logger.exception("Error selecting engines with LLM")
            # Fallback to using generic engines + available engines
            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                result = generic_engines.copy()
                for engine in self.available_engines[
                    : self.max_engines_to_select
                ]:
                    if engine not in result:
                        result.append(engine)
                return result
            return self.available_engines[: self.max_engines_to_select]

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine.

        Instances are cached per engine name; the whole lookup-or-create is
        done under cache_lock, so concurrent callers never build duplicates.
        Returns None if the engine cannot be created.
        """
        with self.cache_lock:
            # Return cached instance if available
            if engine_name in self.engine_cache:
                return self.engine_cache[engine_name]

            # Create a new instance
            engine = None
            try:
                # Only pass parameters that all engines accept
                common_params = {
                    "llm": self.llm,
                    "max_results": self.max_results,
                }

                # Add max_filtered_results if specified
                if self.max_filtered_results is not None:
                    common_params["max_filtered_results"] = (
                        self.max_filtered_results
                    )

                engine = create_search_engine(
                    engine_name,
                    settings_snapshot=self.settings_snapshot,
                    programmatic_mode=self.programmatic_mode,
                    **common_params,
                )

                # Individual engines use their auto-detected filtering settings
                # The factory enables LLM filtering for scientific engines (arXiv, etc.)
                # and disables it for generic engines (Google, Brave, etc.)
            except Exception:
                logger.exception(
                    f"Error creating engine instance for {engine_name}"
                )
                return None

            if engine:
                # Cache the instance
                self.engine_cache[engine_name] = engine

            return engine

    @preserve_research_context
    def _execute_single_engine(
        self, engine_name: str, query: str
    ) -> Dict[str, Any]:
        """
        Execute a single search engine and return results.

        Note: This method is decorated with @preserve_research_context to ensure
        rate limiting context is properly propagated to child threads.

        Args:
            engine_name: Name of the engine to execute
            query: The search query

        Returns:
            Dictionary with keys "engine", "success", "results", and either
            "count" (on success) or "error" (on failure). Never raises.
        """
        logger.info(f"Executing search on {engine_name}")

        engine = self._get_engine_instance(engine_name)
        if not engine:
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Failed to initialize {engine_name}",
                "results": [],
            }

        try:
            # Run the engine properly through its run() method
            # This ensures proper filter application, context propagation, etc.
            results = engine.run(query)

            if results and len(results) > 0:
                logger.info(f"Got {len(results)} results from {engine_name}")
                return {
                    "engine": engine_name,
                    "success": True,
                    "results": results,
                    "count": len(results),
                }
            else:
                # An empty result set is reported as a failure for this engine.
                return {
                    "engine": engine_name,
                    "success": False,
                    "error": "No results",
                    "results": [],
                }

        except Exception:
            logger.exception(f"Error executing {engine_name}")
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Engine {engine_name} failed",
                "results": [],
            }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by executing selected engines in parallel.

        Fans out one task per selected engine on the shared global pool,
        tags each result dict with its "search_engine", and emits socket
        progress events (all socket failures are non-fatal).

        Args:
            query: The search query

        Returns:
            Combined list of preview dictionaries from all successful engines
        """
        # Select engines for this query
        selected_engines = self.select_engines(query)

        if not selected_engines:
            logger.warning("No engines selected")
            return []

        logger.info(
            f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
        )

        # Emit socket event about selected engines
        try:
            SocketIOService().emit_socket_event(
                "parallel_search_started",
                {"engines": selected_engines, "query": query},
            )
        except Exception:
            logger.exception("Socket emit error (non-critical)")

        # Execute all engines in parallel using persistent thread pool
        all_results = []
        engine_results = {}

        # Get the global thread pool
        executor = _get_global_executor()
        if executor is None:
            logger.error(
                "Global thread pool not available, cannot execute parallel search"
            )
            return []

        # Submit all tasks to the global executor
        future_to_engine = {
            executor.submit(
                self._execute_single_engine, engine_name, query
            ): engine_name
            for engine_name in selected_engines
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_engine):
            engine_name = future_to_engine[future]
            try:
                result = future.result()
                engine_results[engine_name] = result

                if result["success"]:
                    # Add source information to each result
                    for item in result["results"]:
                        item["search_engine"] = engine_name
                    all_results.extend(result["results"])

                    # Emit success event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": True,
                                "count": result["count"],
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")
                else:
                    # Emit failure event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": False,
                                "error": result.get("error", "Unknown error"),
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")

            except Exception:
                # _execute_single_engine is designed not to raise, so this
                # guards against unexpected future/executor failures.
                logger.exception(f"Failed to get result from {engine_name}")
                engine_results[engine_name] = {
                    "engine": engine_name,
                    "success": False,
                    "error": "Search execution failed",
                    "results": [],
                }

        # Log summary
        successful_engines = [
            name for name, res in engine_results.items() if res["success"]
        ]
        failed_engines = [
            name for name, res in engine_results.items() if not res["success"]
        ]

        logger.info(
            f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
        )
        if successful_engines:
            logger.info(f"Successful engines: {', '.join(successful_engines)}")
        if failed_engines:
            logger.info(f"Failed engines: {', '.join(failed_engines)}")

        # Log sample result to understand structure
        if all_results:
            logger.debug(
                f"Sample result keys from first result: {list(all_results[0].keys())}"
            )
            logger.debug(f"Sample result: {str(all_results[0])[:500]}")

        logger.info(f"Total results from all engines: {len(all_results)}")

        # Store the engine results for potential use in _get_full_content
        # NOTE(review): per-call state on self — assumes one search at a time
        # per instance; confirm instances aren't shared across concurrent runs.
        self._engine_results = engine_results
        self._successful_engines = successful_engines

        return all_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.
        Since we ran multiple engines, we'll group items by their source engine
        and get full content from each engine for its own results.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Group items by their source engine (tag added in _get_previews);
        # items without a "search_engine" key are silently dropped here.
        items_by_engine = {}
        for item in relevant_items:
            engine_name = item.get("search_engine")
            if engine_name:
                if engine_name not in items_by_engine:
                    items_by_engine[engine_name] = []
                items_by_engine[engine_name].append(item)

        # Get full content from each engine for its items
        all_full_content = []

        for engine_name, items in items_by_engine.items():
            engine = self._get_engine_instance(engine_name)
            if engine:
                try:
                    logger.info(
                        f"Getting full content from {engine_name} for {len(items)} items"
                    )
                    full_content = engine._get_full_content(items)
                    all_full_content.extend(full_content)
                except Exception:
                    logger.exception(
                        f"Error getting full content from {engine_name}"
                    )
                    # Fall back to returning items without full content
                    all_full_content.extend(items)
            else:
                # No engine available, return items as-is
                all_full_content.extend(items)

        return all_full_content

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    # Note: No shutdown() or context manager methods needed
    # The global thread pool is automatically cleaned up at process exit via atexit