Coverage for src/local_deep_research/web_search_engines/engines/parallel_search_engine.py: 17%
292 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import json
2import os
3import concurrent.futures
4from typing import Any, Dict, List, Optional
5from threading import Lock
6import atexit
8from loguru import logger
10from ...config.search_config import get_setting_from_snapshot
11from ...utilities.enums import SearchMode
12from ...utilities.thread_context import preserve_research_context
13from ...web.services.socket_service import SocketIOService
14from ..search_engine_base import BaseSearchEngine
15from ..search_engine_factory import create_search_engine
17# Global thread pool shared by all ParallelSearchEngine instances
18# This prevents creating multiple thread pools and having more threads than expected
19_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
20_global_executor_lock = Lock()
23def _get_global_executor(
24 max_workers: Optional[int] = None,
25) -> Optional[concurrent.futures.ThreadPoolExecutor]:
26 """
27 Get or initialize the global thread pool executor.
28 Thread-safe lazy initialization ensures only one pool is created.
30 Args:
31 max_workers: Number of worker threads. If None, uses Python's recommended
32 formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
33 Only used on first initialization; subsequent calls ignore this parameter.
35 Returns:
36 The global ThreadPoolExecutor instance, or None if initialization fails
37 """
38 global _global_executor
40 with _global_executor_lock:
41 if _global_executor is None:
42 if max_workers is None:  # coverage: 42 ↛ 45 (line 42 didn't jump to line 45 because the condition on line 42 was always true)
43 max_workers = min(32, (os.cpu_count() or 1) + 4)
45 try:
46 _global_executor = concurrent.futures.ThreadPoolExecutor(
47 max_workers=max_workers,
48 thread_name_prefix="parallel_search_",
49 )
50 logger.info(
51 f"Initialized global ThreadPool with {max_workers} workers "
52 f"(shared by all ParallelSearchEngine instances)"
53 )
54 except Exception:
55 logger.exception(
56 "Failed to create global ThreadPoolExecutor, parallel search will not work"
57 )
58 return None
60 return _global_executor
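# Illustrative arithmetic for the default sizing above (added for clarity, not part of
# the original source): with os.cpu_count() == 8 the pool gets min(32, 8 + 4) = 12
# workers, while on a 64-core machine the formula caps the pool at 32 workers.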
63def shutdown_global_executor(wait: bool = True):
64 """
65 Shutdown the global thread pool executor.
67 This is called automatically at process exit via atexit.
68 After calling this, any new ParallelSearchEngine instances will create a new pool.
70 Args:
71 wait: If True, wait for all threads to complete before returning
72 """
73 global _global_executor
75 with _global_executor_lock:
76 if _global_executor is not None:
77 try:
78 _global_executor.shutdown(wait=wait)
79 logger.info("Global ThreadPool shutdown complete")
80 except Exception:
81 logger.exception("Error shutting down global ThreadPool")
82 finally:
83 _global_executor = None
86# Register automatic cleanup at process exit
87atexit.register(lambda: shutdown_global_executor(wait=True))
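# A minimal sketch of the shared-pool lifecycle (the helper below is hypothetical and
# added only for illustration; it is never called by this module). It assumes the
# behavior of _get_global_executor and shutdown_global_executor as defined above.
def _example_shared_pool_lifecycle():
    pool = _get_global_executor(max_workers=4)  # first call lazily creates the pool
    same_pool = _get_global_executor(max_workers=16)  # pool already exists, so 16 is ignored
    assert pool is same_pool
    shutdown_global_executor(wait=True)  # clears the module-level reference
    fresh_pool = _get_global_executor()  # next call creates a new pool with the default size
    assert fresh_pool is not pool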
90class ParallelSearchEngine(BaseSearchEngine):
91 """
92 Parallel search engine that executes multiple search engines simultaneously.
93 Uses LLM to select appropriate engines based on query, then runs them all in parallel.
95 Thread Pool Management:
96 All instances share a single global thread pool to prevent thread proliferation.
97 The pool is automatically cleaned up at process exit.
99 Usage:
100 engine = ParallelSearchEngine(llm)
101 results = engine.run("query")
102 # No manual cleanup needed - global pool is shared and cleaned up automatically
103 """
105 def __init__(
106 self,
107 llm,
108 max_results: int = 10,
109 use_api_key_services: bool = True,
110 max_engines_to_select: int = 100, # Allow selecting all available engines
111 allow_local_engines: bool = False, # Disabled by default for privacy
112 include_generic_engines: bool = True, # Always include generic search engines
113 search_mode: SearchMode = SearchMode.ALL,
114 max_filtered_results: Optional[int] = None,
115 settings_snapshot: Optional[Dict[str, Any]] = None,
116 programmatic_mode: bool = False,
117 max_workers: Optional[
118 int
119 ] = None, # Thread pool size (None = auto-detect)
120 **kwargs,
121 ):
122 """
123 Initialize the parallel search engine.
125 All instances share a global thread pool. The first instance created will
126 initialize the pool with the specified max_workers (or auto-detected value).
127 Subsequent instances reuse the existing pool.
129 Args:
130 llm: Language model instance for query classification
131 max_results: Maximum number of search results to return per engine
132 use_api_key_services: Whether to include services that require API keys
133 max_engines_to_select: Maximum number of engines to select for parallel execution
134 allow_local_engines: Whether to include local/private engines (WARNING: may expose personal data to the web)
135 include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
136 search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
137 max_filtered_results: Maximum number of results to keep after filtering
138 settings_snapshot: Settings snapshot for thread context
139 programmatic_mode: If True, disables database operations and metrics tracking
140 max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
141 recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
142 Ignored if the global pool is already initialized.
143 **kwargs: Additional parameters (ignored but accepted for compatibility)
144 """
145 # Override max_filtered_results to be much higher for parallel search
146 # If not explicitly set, use 50 instead of the default 5-20
147 if max_filtered_results is None:  # coverage: 147 ↛ 153 (line 147 didn't jump to line 153 because the condition on line 147 was always true)
148 max_filtered_results = 50
149 logger.info(
150 f"Setting max_filtered_results to {max_filtered_results} for parallel search"
151 )
153 super().__init__(
154 llm=llm,
155 max_filtered_results=max_filtered_results,
156 max_results=max_results,
157 settings_snapshot=settings_snapshot,
158 programmatic_mode=programmatic_mode,
159 )
161 self.use_api_key_services = use_api_key_services
162 self.max_engines_to_select = max_engines_to_select
163 self.allow_local_engines = allow_local_engines
164 self.include_generic_engines = include_generic_engines
165 self.search_mode = search_mode
166 self.settings_snapshot = settings_snapshot
168 # Disable LLM relevance filtering at the parallel level by default
169 # Individual engines can still filter their own results
170 # Double filtering (engines + parallel) is too aggressive
171 self.enable_llm_relevance_filter = kwargs.get(
172 "enable_llm_relevance_filter", False
173 )
175 # Cache for engine instances
176 self.engine_cache = {}
177 self.cache_lock = Lock()
179 # Initialize global thread pool (thread-safe, only happens once)
180 # All instances share the same pool to prevent thread proliferation
181 _get_global_executor(max_workers)
183 # Get available engines (excluding 'meta', 'auto', and 'parallel')
184 self.available_engines = self._get_available_engines()
185 logger.info(
186 f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
187 )
189 def _check_api_key_availability(
190 self, name: str, config_: Dict[str, Any]
191 ) -> bool:
192 """
193 Check if API keys are available for engines that require them.
195 Args:
196 name: Engine name
197 config_: Engine configuration
199 Returns:
200 True if the engine can be used (API key available or not required)
201 """
202 # If engine doesn't require API key, it's available
203 if not config_.get("requires_api_key", False):
204 return True
206 # Check if API key is configured
207 api_key_setting = config_.get("api_key_setting")
208 if api_key_setting and self.settings_snapshot:
209 api_key = self.settings_snapshot.get(api_key_setting)
210 if api_key and str(api_key).strip():
211 return True
212 logger.debug(f"Skipping {name} - API key not configured")
213 return False
215 # No API key setting defined; assume the engine is available
216 return True
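# Illustrative only (the setting name below is hypothetical): with
#     config_ = {"requires_api_key": True,
#                "api_key_setting": "search.engine.web.brave.api_key"}
# the check above returns True when settings_snapshot["search.engine.web.brave.api_key"]
# holds a non-blank string, and False when that key is missing or empty.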
218 def _get_search_config(self) -> Dict[str, Any]:
219 """Get search config from settings_snapshot"""
220 if self.settings_snapshot:  # coverage: 220 ↛ 241 (line 220 didn't jump to line 241 because the condition on line 220 was always true)
221 # Extract search engine configs from settings snapshot
222 config_data = {}
223 for key, value in self.settings_snapshot.items():
224 if key.startswith("search.engine.web."):  # coverage: 224 ↛ 225 (line 224 didn't jump to line 225 because the condition on line 224 was never true)
225 parts = key.split(".")
226 if len(parts) >= 4:
227 engine_name = parts[3]
228 if engine_name not in config_data:
229 config_data[engine_name] = {}
230 remaining_key = (
231 ".".join(parts[4:]) if len(parts) > 4 else ""
232 )
233 if remaining_key:
234 config_data[engine_name][remaining_key] = (
235 value.get("value")
236 if isinstance(value, dict)
237 else value
238 )
239 return config_data
240 else:
241 return {}
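# Illustrative mapping (the sub-key and value are hypothetical): a snapshot entry such as
#     "search.engine.web.arxiv.use_in_auto_search": {"value": True}
# is split on ".", parts[3] ("arxiv") becomes the engine name, the remaining key
# "use_in_auto_search" is kept, and the method returns
#     {"arxiv": {"use_in_auto_search": True}}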
243 def _get_available_engines(self) -> List[str]:
244 """Get list of available engines based on is_public flag and API key availability"""
245 available = []
246 config_data = self._get_search_config()
248 for name, config_ in config_data.items():  # coverage: 248 ↛ 250 (line 248 didn't jump to line 250 because the loop on line 248 never started)
249 # Skip meta, auto, and parallel engines (special engines)
250 if name in ["meta", "auto", "parallel"]:
251 continue
253 # Try to get the engine class to check is_public flag
254 success, engine_class, error_msg = (
255 BaseSearchEngine._load_engine_class(name, config_)
256 )
258 if not success:
259 logger.debug(error_msg)
260 continue
262 # Check if engine is public or if local engines are allowed
263 if hasattr(engine_class, "is_public"):
264 if not engine_class.is_public and not self.allow_local_engines:
265 logger.debug(f"Skipping local/private engine: {name}")
266 continue
267 elif not engine_class.is_public and self.allow_local_engines:
268 logger.warning(
269 f"Including local/private engine {name} - data may be exposed"
270 )
271 else:
272 # No is_public flag - assume it's private/local for safety
273 if not self.allow_local_engines:
274 logger.debug(
275 f"Skipping engine {name} - no is_public flag and local engines not allowed"
276 )
277 continue
279 # Apply scientific mode filtering if enabled
280 if self.search_mode == SearchMode.SCIENTIFIC:
281 # In scientific mode: include scientific engines AND generic engines
282 # Exclude non-scientific specialized engines (like Guardian, Wayback)
283 is_scientific = getattr(engine_class, "is_scientific", False)
284 is_generic = getattr(engine_class, "is_generic", False)
286 if not (is_scientific or is_generic):
287 logger.debug(
288 f"Skipping {name} in scientific mode (not scientific or generic)"
289 )
290 continue
292 logger.debug(
293 f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
294 )
295 # Skip engines that require API keys if we don't want to use them
296 if (
297 config_.get("requires_api_key", False)
298 and not self.use_api_key_services
299 ):
300 logger.debug(
301 f"Skipping {name} - requires API key and use_api_key_services is False"
302 )
303 continue
305 # Check API key availability
306 if not self._check_api_key_availability(name, config_):
307 continue
309 available.append(name)
311 return available
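# Illustrative outcomes of the filtering above ("local_db" is a hypothetical name and
# the flag values shown are assumptions for the sake of the example):
#     searxng   is_public=True,  is_generic=True     -> included in ALL and SCIENTIFIC modes
#     arxiv     is_public=True,  is_scientific=True  -> included in ALL and SCIENTIFIC modes
#     guardian  is_public=True,  neither flag        -> included in ALL, skipped in SCIENTIFIC
#     local_db  is_public=False                      -> skipped unless allow_local_engines=True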
313 def _get_available_generic_engines(self) -> List[str]:
314 """Get list of available generic search engines that pass API key checks"""
315 generic_engines = []
316 config_data = self._get_search_config()
318 for name, config_ in config_data.items():
319 # Skip if not in available engines (already filtered for API keys, etc.)
320 if name not in self.available_engines:
321 continue
323 # Load the engine class to check is_generic flag
324 success, engine_class, error_msg = (
325 BaseSearchEngine._load_engine_class(name, config_)
326 )
328 if not success:
329 logger.debug(
330 f"Could not check if {name} is generic: {error_msg}"
331 )
332 continue
334 # Check if engine is generic
335 if getattr(engine_class, "is_generic", False):
336 generic_engines.append(name)
337 logger.debug(f"Found generic engine: {name}")
339 return generic_engines
341 def select_engines(self, query: str) -> List[str]:
342 """
343 Use the LLM to select appropriate search engines based only on their names.
345 Args:
346 query: The search query
348 Returns:
349 List of selected engine names
350 """
351 if not self.llm or not self.available_engines:
352 logger.warning(
353 "No LLM or no available engines, using all available"
354 )
355 return self.available_engines[: self.max_engines_to_select]
357 try:
358 # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
359 engines_for_selection = self.available_engines.copy()
360 generic_engines = []
362 if self.include_generic_engines:
363 generic_engines = self._get_available_generic_engines()
364 # Remove generic engines from selection since they'll be added automatically
365 engines_for_selection = [
366 e for e in engines_for_selection if e not in generic_engines
367 ]
368 logger.debug(
369 f"Excluding generic engines from LLM selection: {generic_engines}"
370 )
372 # If no specialized engines available, just return the generic ones
373 if not engines_for_selection:
374 logger.info(
375 f"No specialized engines available, using generic engines: {generic_engines}"
376 )
377 return generic_engines
379 # Create a simple prompt with just non-generic engine names
380 engine_list = "\n".join(
381 [
382 f"[{i}] {name}"
383 for i, name in enumerate(engines_for_selection)
384 ]
385 )
387 logger.debug(f"Engines for LLM selection: {engines_for_selection}")
389 prompt = f"""Query: {query}
391Available search engines:
392{engine_list}
394Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
395Example: [0, 2, 5]
397Select up to {self.max_engines_to_select} engines that would best answer this query."""
399 logger.debug("Sending prompt to LLM for engine selection")
400 # Get LLM response
401 response = self.llm.invoke(prompt)
403 # Handle different response formats
404 if hasattr(response, "content"):
405 content = response.content.strip()
406 else:
407 content = str(response).strip()
409 # Extract JSON array
410 start_idx = content.find("[")
411 end_idx = content.rfind("]")
413 if start_idx >= 0 and end_idx > start_idx:
414 array_text = content[start_idx : end_idx + 1]
415 indices = json.loads(array_text)
417 # Convert indices to engine names (from the filtered list)
418 selected_engines = []
419 for idx in indices:
420 if isinstance(idx, int) and 0 <= idx < len(
421 engines_for_selection
422 ):
423 selected_engines.append(engines_for_selection[idx])
425 if selected_engines:
426 # Add generic search engines (they were excluded from selection)
427 if self.include_generic_engines:
428 for engine in generic_engines:
429 if engine not in selected_engines:
430 selected_engines.append(engine)
431 logger.debug(f"Added generic engine: {engine}")
433 logger.info(f"Final selected engines: {selected_engines}")
434 return selected_engines
436 # Fallback if parsing fails - return generic engines plus some specialized ones
437 logger.warning(
438 "Failed to parse LLM response, using generic engines + top specialized"
439 )
440 result = (
441 generic_engines.copy() if self.include_generic_engines else []
442 )
443 for engine in self.available_engines[: self.max_engines_to_select]:
444 if engine not in result:
445 result.append(engine)
446 return result
448 except Exception:
449 logger.exception("Error selecting engines with LLM")
450 # Fallback to using generic engines + available engines
451 if self.include_generic_engines:
452 generic_engines = self._get_available_generic_engines()
453 result = generic_engines.copy()
454 for engine in self.available_engines[
455 : self.max_engines_to_select
456 ]:
457 if engine not in result:
458 result.append(engine)
459 return result
460 return self.available_engines[: self.max_engines_to_select]
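# Illustrative walk-through of the selection flow (engine names and the LLM reply
# are hypothetical):
#     engines_for_selection = ["arxiv", "pubmed", "github"]
#     generic_engines       = ["searxng", "brave"]
#     LLM reply: "Sure, here you go: [0, 2]"
#     -> the JSON array is located via find("[") / rfind("]") and parsed to [0, 2]
#     -> indices map to ["arxiv", "github"]; out-of-range or non-integer entries are dropped
#     -> generic engines are appended, giving ["arxiv", "github", "searxng", "brave"]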
462 def _get_engine_instance(
463 self, engine_name: str
464 ) -> Optional[BaseSearchEngine]:
465 """Get or create an instance of the specified search engine"""
466 with self.cache_lock:
467 # Return cached instance if available
468 if engine_name in self.engine_cache:
469 return self.engine_cache[engine_name]
471 # Create a new instance
472 engine = None
473 try:
474 # Only pass parameters that all engines accept
475 common_params = {
476 "llm": self.llm,
477 "max_results": self.max_results,
478 }
480 # Add max_filtered_results if specified
481 if self.max_filtered_results is not None:
482 common_params["max_filtered_results"] = (
483 self.max_filtered_results
484 )
486 engine = create_search_engine(
487 engine_name,
488 settings_snapshot=self.settings_snapshot,
489 programmatic_mode=self.programmatic_mode,
490 **common_params,
491 )
493 # Individual engines use their auto-detected filtering settings
494 # The factory enables LLM filtering for scientific engines (arXiv, etc.)
495 # and disables it for generic engines (Google, Brave, etc.)
496 except Exception:
497 logger.exception(
498 f"Error creating engine instance for {engine_name}"
499 )
500 return None
502 if engine:
503 # Cache the instance
504 self.engine_cache[engine_name] = engine
506 return engine
508 @preserve_research_context
509 def _execute_single_engine(
510 self, engine_name: str, query: str
511 ) -> Dict[str, Any]:
512 """
513 Execute a single search engine and return results.
515 Note: This method is decorated with @preserve_research_context to ensure
516 rate limiting context is properly propagated to child threads.
518 Args:
519 engine_name: Name of the engine to execute
520 query: The search query
522 Returns:
523 Dictionary with engine name and results or error
524 """
525 logger.info(f"Executing search on {engine_name}")
527 engine = self._get_engine_instance(engine_name)
528 if not engine:
529 return {
530 "engine": engine_name,
531 "success": False,
532 "error": f"Failed to initialize {engine_name}",
533 "results": [],
534 }
536 try:
537 # Run the engine properly through its run() method
538 # This ensures proper filter application, context propagation, etc.
539 results = engine.run(query)
541 if results and len(results) > 0:
542 logger.info(f"Got {len(results)} results from {engine_name}")
543 return {
544 "engine": engine_name,
545 "success": True,
546 "results": results,
547 "count": len(results),
548 }
549 else:
550 return {
551 "engine": engine_name,
552 "success": False,
553 "error": "No results",
554 "results": [],
555 }
557 except Exception:
558 logger.exception(f"Error executing {engine_name}")
559 return {
560 "engine": engine_name,
561 "success": False,
562 "error": f"Engine {engine_name} failed",
563 "results": [],
564 }
566 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
567 """
568 Get preview information by executing selected engines in parallel.
570 Args:
571 query: The search query
573 Returns:
574 Combined list of preview dictionaries from all successful engines
575 """
576 # Select engines for this query
577 selected_engines = self.select_engines(query)
579 if not selected_engines:
580 logger.warning("No engines selected")
581 return []
583 logger.info(
584 f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
585 )
587 # Emit socket event about selected engines
588 try:
589 SocketIOService().emit_socket_event(
590 "parallel_search_started",
591 {"engines": selected_engines, "query": query},
592 )
593 except Exception:
594 logger.exception("Socket emit error (non-critical)")
596 # Execute all engines in parallel using persistent thread pool
597 all_results = []
598 engine_results = {}
600 # Get the global thread pool
601 executor = _get_global_executor()
602 if executor is None:
603 logger.error(
604 "Global thread pool not available, cannot execute parallel search"
605 )
606 return []
608 # Submit all tasks to the global executor
609 future_to_engine = {
610 executor.submit(
611 self._execute_single_engine, engine_name, query
612 ): engine_name
613 for engine_name in selected_engines
614 }
616 # Collect results as they complete
617 for future in concurrent.futures.as_completed(future_to_engine):
618 engine_name = future_to_engine[future]
619 try:
620 result = future.result()
621 engine_results[engine_name] = result
623 if result["success"]:
624 # Add source information to each result
625 for item in result["results"]:
626 item["search_engine"] = engine_name
627 all_results.extend(result["results"])
629 # Emit success event
630 try:
631 SocketIOService().emit_socket_event(
632 "engine_completed",
633 {
634 "engine": engine_name,
635 "success": True,
636 "count": result["count"],
637 },
638 )
639 except Exception:
640 logger.debug("Socket emit error (non-critical)")
641 else:
642 # Emit failure event
643 try:
644 SocketIOService().emit_socket_event(
645 "engine_completed",
646 {
647 "engine": engine_name,
648 "success": False,
649 "error": result.get("error", "Unknown error"),
650 },
651 )
652 except Exception:
653 logger.debug("Socket emit error (non-critical)")
655 except Exception:
656 logger.exception(f"Failed to get result from {engine_name}")
657 engine_results[engine_name] = {
658 "engine": engine_name,
659 "success": False,
660 "error": "Search execution failed",
661 "results": [],
662 }
664 # Log summary
665 successful_engines = [
666 name for name, res in engine_results.items() if res["success"]
667 ]
668 failed_engines = [
669 name for name, res in engine_results.items() if not res["success"]
670 ]
672 logger.info(
673 f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
674 )
675 if successful_engines:
676 logger.info(f"Successful engines: {', '.join(successful_engines)}")
677 if failed_engines:
678 logger.info(f"Failed engines: {', '.join(failed_engines)}")
680 # Log sample result to understand structure
681 if all_results:
682 logger.debug(
683 f"Sample result keys from first result: {list(all_results[0].keys())}"
684 )
685 logger.debug(f"Sample result: {str(all_results[0])[:500]}")
687 logger.info(f"Total results from all engines: {len(all_results)}")
689 # Store the engine results for potential use in _get_full_content
690 self._engine_results = engine_results
691 self._successful_engines = successful_engines
693 return all_results
695 def _get_full_content(
696 self, relevant_items: List[Dict[str, Any]]
697 ) -> List[Dict[str, Any]]:
698 """
699 Get full content for the relevant items.
700 Since we ran multiple engines, we'll group items by their source engine
701 and get full content from each engine for its own results.
703 Args:
704 relevant_items: List of relevant preview dictionaries
706 Returns:
707 List of result dictionaries with full content
708 """
709 # Check if we should get full content
710 if get_setting_from_snapshot(
711 "search.snippets_only",
712 True,
713 settings_snapshot=self.settings_snapshot,
714 ):
715 logger.info("Snippet-only mode, skipping full content retrieval")
716 return relevant_items
718 logger.info("Getting full content for relevant items")
720 # Group items by their source engine
721 items_by_engine = {}
722 for item in relevant_items:
723 engine_name = item.get("search_engine")
724 if engine_name:
725 if engine_name not in items_by_engine:
726 items_by_engine[engine_name] = []
727 items_by_engine[engine_name].append(item)
729 # Get full content from each engine for its items
730 all_full_content = []
732 for engine_name, items in items_by_engine.items():
733 engine = self._get_engine_instance(engine_name)
734 if engine:
735 try:
736 logger.info(
737 f"Getting full content from {engine_name} for {len(items)} items"
738 )
739 full_content = engine._get_full_content(items)
740 all_full_content.extend(full_content)
741 except Exception:
742 logger.exception(
743 f"Error getting full content from {engine_name}"
744 )
745 # Fall back to returning items without full content
746 all_full_content.extend(items)
747 else:
748 # No engine available, return items as-is
749 all_full_content.extend(items)
751 return all_full_content
753 def invoke(self, query: str) -> List[Dict[str, Any]]:
754 """Compatibility method for LangChain tools"""
755 return self.run(query)
757 # Note: No shutdown() or context manager methods needed
758 # The global thread pool is automatically cleaned up at process exit via atexit
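# A minimal usage sketch (the helper below is hypothetical and added only for
# illustration; "llm" and "settings_snapshot" are assumed to be supplied by the caller):
def _example_parallel_search(llm, settings_snapshot):
    engine = ParallelSearchEngine(
        llm=llm,
        search_mode=SearchMode.SCIENTIFIC,  # scientific + generic engines only
        settings_snapshot=settings_snapshot,
    )
    results = engine.run("recent advances in protein structure prediction")
    # Each successful result carries a "search_engine" key naming the engine that produced it.
    by_engine = {}
    for item in results:
        by_engine.setdefault(item.get("search_engine"), []).append(item)
    # No manual cleanup is needed: the shared thread pool is shut down at process exit.
    return by_engine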