Coverage for src / local_deep_research / web_search_engines / engines / parallel_search_engine.py: 95%
263 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import os
2import concurrent.futures
3from typing import Any, Dict, List, Optional
4from threading import Lock
5import atexit
7from loguru import logger
9from ...config.search_config import get_setting_from_snapshot
10from ...utilities.enums import SearchMode
11from ...utilities.json_utils import extract_json, get_llm_response_text
12from ...utilities.thread_context import preserve_research_context
13from ...web.services.socket_service import SocketIOService
14from ..search_engine_base import BaseSearchEngine
15from ..search_engine_factory import create_search_engine
16from ..search_engines_config import get_available_engines
# Global thread pool shared by all ParallelSearchEngine instances
# This prevents creating multiple thread pools and having more threads than expected
_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Guards lazy creation and teardown of the pool so only one is ever live
_global_executor_lock = Lock()
def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the shared thread pool, creating it lazily on first use.

    Creation is serialized with a lock so concurrent callers can never
    end up with two pools.

    Args:
        max_workers: Worker-thread count for the pool. When None, the
            standard I/O-bound heuristic min(32, (os.cpu_count() or 1) + 4)
            is used. Honored only by the call that actually creates the
            pool; every later call reuses the existing pool unchanged.

    Returns:
        The shared ThreadPoolExecutor instance, or None if creating it failed
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: pool already exists, hand it back untouched.
        if _global_executor is not None:
            return _global_executor

        workers = (
            min(32, (os.cpu_count() or 1) + 4)
            if max_workers is None
            else max_workers
        )

        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=workers,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {workers} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None

        return _global_executor
def shutdown_global_executor(wait: bool = True):
    """
    Tear down the shared thread pool.

    Registered with atexit, so it runs automatically when the process exits.
    Once shut down, the next ParallelSearchEngine instance lazily builds a
    fresh pool.

    Args:
        wait: If True, block until all worker threads have completed
    """
    global _global_executor

    with _global_executor_lock:
        # Nothing to do if no pool was ever created (or it is already gone).
        if _global_executor is None:
            return

        try:
            _global_executor.shutdown(wait=wait)
            logger.info("Global ThreadPool shutdown complete")
        except Exception:
            logger.exception("Error shutting down global ThreadPool")
        finally:
            # Clear the reference even on failure so a new pool can be made.
            _global_executor = None
# Register automatic cleanup at process exit.
# wait=True ensures in-flight searches on pool threads finish before exit.
atexit.register(lambda: shutdown_global_executor(wait=True))
class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.
    Uses LLM to select appropriate engines based on query, then runs them all in parallel.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread proliferation.
        The pool is automatically cleaned up at process exit.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_select: int = 100,  # Allow selecting all available engines
        allow_local_engines: bool = False,  # Disabled by default for privacy
        include_generic_engines: bool = True,  # Always include generic search engines
        search_mode: SearchMode = SearchMode.ALL,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        max_workers: Optional[
            int
        ] = None,  # Thread pool size (None = auto-detect)
        **kwargs,
    ):
        """
        Initialize the parallel search engine.

        All instances share a global thread pool. The first instance created will
        initialize the pool with the specified max_workers (or auto-detected value).
        Subsequent instances reuse the existing pool.

        Args:
            llm: Language model instance for query classification
            max_results: Maximum number of search results to return per engine
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_select: Maximum number of engines to select for parallel execution
            allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web)
            include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
            search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
                recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
                Ignored if the global pool is already initialized.
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Override max_filtered_results to be much higher for parallel search
        # If not explicitly set, use 50 instead of the default 5-20
        if max_filtered_results is None:
            max_filtered_results = 50
            logger.info(
                f"Setting max_filtered_results to {max_filtered_results} for parallel search"
            )

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_select = max_engines_to_select
        self.allow_local_engines = allow_local_engines
        self.include_generic_engines = include_generic_engines
        self.search_mode = search_mode
        self.settings_snapshot = settings_snapshot or {}

        # Disable LLM relevance filtering at the parallel level by default
        # Individual engines can still filter their own results
        # Double filtering (engines + parallel) is too aggressive
        self.enable_llm_relevance_filter = kwargs.get(
            "enable_llm_relevance_filter", False
        )

        # Cache for engine instances
        self.engine_cache: Dict[str, Any] = {}
        self.cache_lock = Lock()

        # Initialize global thread pool (thread-safe, only happens once)
        # All instances share the same pool to prevent thread proliferation
        _get_global_executor(max_workers)

        # Get available engines (excluding 'meta', 'auto', and 'parallel')
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config for available engines."""
        return get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines, applying parallel-specific filters
        (is_public, search mode) on top of the shared base filter."""
        base_available = get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

        available = []
        for name, config_ in base_available.items():
            # Try to get the engine class to check is_public flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(error_msg)
                continue

            # Check if engine is public or if local engines are allowed
            if hasattr(engine_class, "is_public"):
                if not engine_class.is_public and not self.allow_local_engines:
                    logger.debug(f"Skipping local/private engine: {name}")
                    continue
                if not engine_class.is_public and self.allow_local_engines:
                    logger.warning(
                        f"Including local/private engine {name} - data may be exposed"
                    )
            else:
                # No is_public flag - assume it's private/local for safety
                if not self.allow_local_engines:
                    logger.debug(
                        f"Skipping engine {name} - no is_public flag and local engines not allowed"
                    )
                    continue

            # Apply scientific mode filtering if enabled
            if self.search_mode == SearchMode.SCIENTIFIC:
                is_scientific = getattr(engine_class, "is_scientific", False)
                is_generic = getattr(engine_class, "is_generic", False)

                if not (is_scientific or is_generic):
                    logger.debug(
                        f"Skipping {name} in scientific mode (not scientific or generic)"
                    )
                    continue

                logger.debug(
                    f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
                )

            available.append(name)

        return available

    def _get_available_generic_engines(self) -> List[str]:
        """Get list of available generic search engines that pass API key checks"""
        generic_engines = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip if not in available engines (already filtered for API keys etc)
            if name not in self.available_engines:
                continue

            # Load the engine class to check is_generic flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )

            if not success:
                logger.debug(
                    f"Could not check if {name} is generic: {error_msg}"
                )
                continue

            # Check if engine is generic
            if getattr(engine_class, "is_generic", False):
                generic_engines.append(name)
                logger.debug(f"Found generic engine: {name}")

        return generic_engines

    def select_engines(self, query: str) -> List[str]:
        """
        Use LLM to select appropriate search engines based only on names.

        Args:
            query: The search query

        Returns:
            List of selected engine names
        """
        if not self.llm or not self.available_engines:
            logger.warning(
                "No LLM or no available engines, using all available"
            )
            return self.available_engines[: self.max_engines_to_select]

        try:
            # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
            engines_for_selection = self.available_engines.copy()
            generic_engines = []

            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                # Remove generic engines from selection since they'll be added automatically
                engines_for_selection = [
                    e for e in engines_for_selection if e not in generic_engines
                ]
                logger.debug(
                    f"Excluding generic engines from LLM selection: {generic_engines}"
                )

            # If no specialized engines available, just return the generic ones
            if not engines_for_selection:
                logger.info(
                    f"No specialized engines available, using generic engines: {generic_engines}"
                )
                return generic_engines

            # Create a simple prompt with just non-generic engine names
            engine_list = "\n".join(
                [
                    f"[{i}] {name}"
                    for i, name in enumerate(engines_for_selection)
                ]
            )

            logger.debug(f"Engines for LLM selection: {engines_for_selection}")

            prompt = f"""Query: {query}

Available search engines:
{engine_list}

Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
Example: [0, 2, 5]

Select up to {self.max_engines_to_select} engines that would best answer this query."""

            logger.debug("Sending prompt to LLM for engine selection")
            # Get LLM response
            response = self.llm.invoke(prompt)
            content = get_llm_response_text(response)

            indices = extract_json(content, expected_type=list)

            if indices is not None:
                # Convert indices to engine names (from the filtered list)
                selected_engines = []
                for idx in indices:
                    if isinstance(idx, int) and 0 <= idx < len(
                        engines_for_selection
                    ):
                        selected_engines.append(engines_for_selection[idx])

                if selected_engines:
                    # Add generic search engines (they were excluded from selection)
                    if self.include_generic_engines:
                        for engine in generic_engines:
                            if engine not in selected_engines:
                                selected_engines.append(engine)
                                logger.debug(f"Added generic engine: {engine}")

                    logger.info(f"Final selected engines: {selected_engines}")
                    return selected_engines

            # Fallback if parsing fails - return generic engines plus some specialized ones
            logger.warning(
                "Failed to parse LLM response, using generic engines + top specialized"
            )
            result = (
                generic_engines.copy() if self.include_generic_engines else []
            )
            for engine in self.available_engines[: self.max_engines_to_select]:
                if engine not in result:
                    result.append(engine)
            return result

        except Exception:
            logger.exception("Error selecting engines with LLM")
            # Fallback to using generic engines + available engines
            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                result = generic_engines.copy()
                for engine in self.available_engines[
                    : self.max_engines_to_select
                ]:
                    if engine not in result:
                        result.append(engine)
                return result
            return self.available_engines[: self.max_engines_to_select]

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine"""
        with self.cache_lock:
            # Return cached instance if available
            if engine_name in self.engine_cache:
                return self.engine_cache[engine_name]  # type: ignore[no-any-return]

            # Create a new instance
            engine = None
            try:
                # Only pass parameters that all engines accept
                common_params = {
                    "llm": self.llm,
                    "max_results": self.max_results,
                }

                # Add max_filtered_results if specified
                if self.max_filtered_results is not None:
                    common_params["max_filtered_results"] = (
                        self.max_filtered_results
                    )

                engine = create_search_engine(
                    engine_name,
                    settings_snapshot=self.settings_snapshot,
                    programmatic_mode=self.programmatic_mode,
                    **common_params,  # type: ignore[arg-type]
                )

                # Individual engines use their auto-detected filtering settings
                # The factory enables LLM filtering for scientific engines (arXiv, etc.)
                # and disables it for generic engines (Google, Brave, etc.)
            except Exception:
                logger.exception(
                    f"Error creating engine instance for {engine_name}"
                )
                return None

            if engine:
                # Cache the instance
                self.engine_cache[engine_name] = engine

            return engine

    @preserve_research_context
    def _execute_single_engine(
        self, engine_name: str, query: str
    ) -> Dict[str, Any]:
        """
        Execute a single search engine and return results.

        Note: This method is decorated with @preserve_research_context to ensure
        rate limiting context is properly propagated to child threads.

        Args:
            engine_name: Name of the engine to execute
            query: The search query

        Returns:
            Dictionary with engine name and results or error
        """
        logger.info(f"Executing search on {engine_name}")

        engine = self._get_engine_instance(engine_name)
        if not engine:
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Failed to initialize {engine_name}",
                "results": [],
            }

        try:
            # Run the engine properly through its run() method
            # This ensures proper filter application, context propagation, etc.
            results = engine.run(query)

            if results and len(results) > 0:
                logger.info(f"Got {len(results)} results from {engine_name}")
                return {
                    "engine": engine_name,
                    "success": True,
                    "results": results,
                    "count": len(results),
                }
            return {
                "engine": engine_name,
                "success": False,
                "error": "No results",
                "results": [],
            }

        except Exception:
            logger.exception(f"Error executing {engine_name}")
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Engine {engine_name} failed",
                "results": [],
            }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by executing selected engines in parallel.

        Args:
            query: The search query

        Returns:
            Combined list of preview dictionaries from all successful engines
        """
        # Select engines for this query
        selected_engines = self.select_engines(query)

        if not selected_engines:
            logger.warning("No engines selected")
            return []

        logger.info(
            f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
        )

        # Emit socket event about selected engines
        try:
            SocketIOService().emit_socket_event(
                "parallel_search_started",
                {"engines": selected_engines, "query": query},
            )
        except Exception:
            logger.exception("Socket emit error (non-critical)")

        # Execute all engines in parallel using persistent thread pool
        all_results = []
        engine_results = {}

        # Get the global thread pool
        executor = _get_global_executor()
        if executor is None:
            logger.error(
                "Global thread pool not available, cannot execute parallel search"
            )
            return []

        # Submit all tasks to the global executor
        future_to_engine = {
            executor.submit(
                self._execute_single_engine, engine_name, query
            ): engine_name
            for engine_name in selected_engines
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_engine):
            engine_name = future_to_engine[future]
            try:
                result = future.result()
                engine_results[engine_name] = result

                if result["success"]:
                    # Add source information to each result
                    for item in result["results"]:
                        item["search_engine"] = engine_name
                    all_results.extend(result["results"])

                    # Emit success event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": True,
                                "count": result["count"],
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")
                else:
                    # Emit failure event
                    try:
                        SocketIOService().emit_socket_event(
                            "engine_completed",
                            {
                                "engine": engine_name,
                                "success": False,
                                "error": result.get("error", "Unknown error"),
                            },
                        )
                    except Exception:
                        logger.debug("Socket emit error (non-critical)")

            except Exception:
                logger.exception(f"Failed to get result from {engine_name}")
                engine_results[engine_name] = {
                    "engine": engine_name,
                    "success": False,
                    "error": "Search execution failed",
                    "results": [],
                }

        # Log summary
        successful_engines = [
            name for name, res in engine_results.items() if res["success"]
        ]
        failed_engines = [
            name for name, res in engine_results.items() if not res["success"]
        ]

        logger.info(
            f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
        )
        if successful_engines:
            logger.info(f"Successful engines: {', '.join(successful_engines)}")
        if failed_engines:
            logger.info(f"Failed engines: {', '.join(failed_engines)}")

        # Log sample result to understand structure
        if all_results:
            logger.debug(
                f"Sample result keys from first result: {list(all_results[0].keys())}"
            )
            logger.debug(f"Sample result: {str(all_results[0])[:500]}")

        logger.info(f"Total results from all engines: {len(all_results)}")

        # Store the engine results for potential use in _get_full_content
        self._engine_results = engine_results
        self._successful_engines = successful_engines

        return all_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.

        Since we ran multiple engines, we'll group items by their source engine
        and get full content from each engine for its own results.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Group items by their source engine
        items_by_engine: Dict[str, List[Dict[str, Any]]] = {}
        for item in relevant_items:
            engine_name = item.get("search_engine")
            if engine_name:
                if engine_name not in items_by_engine:
                    items_by_engine[engine_name] = []
                items_by_engine[engine_name].append(item)

        # Get full content from each engine for its items
        all_full_content = []

        for engine_name, items in items_by_engine.items():
            engine = self._get_engine_instance(engine_name)
            if engine:
                try:
                    logger.info(
                        f"Getting full content from {engine_name} for {len(items)} items"
                    )
                    full_content = engine._get_full_content(items)
                    all_full_content.extend(full_content)
                except Exception:
                    logger.exception(
                        f"Error getting full content from {engine_name}"
                    )
                    # Fall back to returning items without full content
                    all_full_content.extend(items)
            else:
                # No engine available, return items as-is
                all_full_content.extend(items)

        return all_full_content

    def close(self):
        """Close all cached child search engines and own resources."""
        from ...utilities.resource_utils import safe_close

        for engine in self.engine_cache.values():
            safe_close(engine, "child search engine")
        self.engine_cache.clear()
        super().close()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    # Note: No shutdown() or context manager methods needed
    # The global thread pool is automatically cleaned up at process exit via atexit
695 # The global thread pool is automatically cleaned up at process exit via atexit