Coverage for src/local_deep_research/web_search_engines/engines/parallel_search_engine.py: 96%
274 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import os
2import concurrent.futures
3from typing import Any, Dict, List, Optional
4from threading import Lock
5import atexit
7from loguru import logger
9from ...config.search_config import get_setting_from_snapshot
10from ...database.thread_local_session import cleanup_current_thread
11from ...utilities.enums import SearchMode
12from ...utilities.json_utils import extract_json, get_llm_response_text
13from ...utilities.thread_context import (
14 clear_search_context,
15 get_search_context,
16 set_search_context,
17)
18from ...web.services.socket_service import SocketIOService
19from ..search_engine_base import BaseSearchEngine
20from ..search_engine_factory import create_search_engine
21from ..search_engines_config import get_available_engines
# Global thread pool shared by all ParallelSearchEngine instances.
# This prevents creating multiple thread pools and having more threads than expected.
# It starts out as None, is created lazily by _get_global_executor() and is
# reset to None again by shutdown_global_executor().
_global_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None
# Lock taken by both functions above whenever they read or rebind _global_executor.
_global_executor_lock = Lock()
def _get_global_executor(
    max_workers: Optional[int] = None,
) -> Optional[concurrent.futures.ThreadPoolExecutor]:
    """
    Return the process-wide thread pool, creating it on first use.

    Creation happens under ``_global_executor_lock`` so that concurrent
    callers end up sharing a single pool.

    Args:
        max_workers: Number of worker threads for the pool. When None, the
            size is ``min(32, (os.cpu_count() or 1) + 4)``. Only honoured by
            the call that actually creates the pool; ignored afterwards.

    Returns:
        The shared ThreadPoolExecutor, or None if it could not be created.
    """
    global _global_executor

    with _global_executor_lock:
        # Fast path: the pool already exists, max_workers is irrelevant.
        if _global_executor is not None:
            return _global_executor

        worker_count = max_workers
        if worker_count is None:
            worker_count = min(32, (os.cpu_count() or 1) + 4)

        try:
            _global_executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=worker_count,
                thread_name_prefix="parallel_search_",
            )
            logger.info(
                f"Initialized global ThreadPool with {worker_count} workers "
                f"(shared by all ParallelSearchEngine instances)"
            )
        except Exception:
            logger.exception(
                "Failed to create global ThreadPoolExecutor, parallel search will not work"
            )
            return None

        return _global_executor
def shutdown_global_executor(wait: bool = True):
    """
    Shutdown the global thread pool executor.

    This is called automatically at process exit via atexit.
    After calling this, any new ParallelSearchEngine instances will create a new pool.

    The pool is detached from the module global while the lock is held, but
    the actual shutdown runs after the lock has been released. With
    ``wait=True`` the shutdown blocks until every running task has finished;
    holding the lock for that long would stall every concurrent
    ``_get_global_executor()`` caller and would deadlock if one of the
    in-flight tasks itself needed the lock.

    Args:
        wait: If True, wait for all threads to complete before returning
    """
    global _global_executor

    # Swap the reference out atomically; callers arriving from now on see
    # None and lazily build a fresh pool instead of reusing a dying one.
    with _global_executor_lock:
        executor = _global_executor
        _global_executor = None

    if executor is None:
        return

    try:
        executor.shutdown(wait=wait)
        logger.info("Global ThreadPool shutdown complete")
    except Exception:
        # Best effort: shutdown errors are logged, never raised to the caller.
        logger.exception("Error shutting down global ThreadPool")
# Register automatic cleanup at process exit.
# The lambda looks up shutdown_global_executor by name when the hook fires
# and asks it to wait for running tasks (wait=True).
atexit.register(lambda: shutdown_global_executor(wait=True))
class ParallelSearchEngine(BaseSearchEngine):
    """
    Parallel search engine that executes multiple search engines simultaneously.
    Uses LLM to select appropriate engines based on query, then runs them all in parallel.

    Thread Pool Management:
        All instances share a single global thread pool to prevent thread proliferation.
        The pool is automatically cleaned up at process exit.

    Usage:
        engine = ParallelSearchEngine(llm)
        results = engine.run("query")
        # No manual cleanup needed - global pool is shared and cleaned up automatically
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_select: int = 100,  # Allow selecting all available engines
        allow_local_engines: bool = False,  # Disabled by default for privacy
        include_generic_engines: bool = True,  # Always include generic search engines
        search_mode: SearchMode = SearchMode.ALL,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        max_workers: Optional[
            int
        ] = None,  # Thread pool size (None = auto-detect)
        **kwargs,
    ):
        """
        Initialize the parallel search engine.

        All instances share a global thread pool. The first instance created will
        initialize the pool with the specified max_workers (or auto-detected value).
        Subsequent instances reuse the existing pool.

        Args:
            llm: Language model instance for query classification
            max_results: Maximum number of search results to return per engine
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_select: Maximum number of engines to select for parallel execution
            allow_local_engines: Whether to include local/private engines (WARNING: May expose personal data to web)
            include_generic_engines: Always include generic search engines (searxng, brave, ddg, etc.)
            search_mode: SearchMode enum value - ALL for all engines, SCIENTIFIC for scientific + generic engines only
            max_filtered_results: Maximum number of results to keep after filtering
                (defaults to 50 for parallel search when None)
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            max_workers: Thread pool size for the FIRST instance only. If None, uses Python's
                recommended formula: min(32, (os.cpu_count() or 1) + 4) for I/O-bound operations.
                Ignored if the global pool is already initialized.
            **kwargs: Additional parameters accepted for compatibility. Only
                ``enable_llm_relevance_filter`` is read; everything else is ignored.
        """
        # Parallel search aggregates results from multiple engines, so
        # it runs with a higher post-filter cap than a single-engine
        # search would get from DEFAULT_MAX_FILTERED_RESULTS.
        if max_filtered_results is None:
            max_filtered_results = 50
            logger.info(
                f"Setting max_filtered_results to {max_filtered_results} for parallel search"
            )

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_select = max_engines_to_select
        self.allow_local_engines = allow_local_engines
        self.include_generic_engines = include_generic_engines
        self.search_mode = search_mode
        self.settings_snapshot = settings_snapshot or {}

        # Disable LLM relevance filtering at the parallel level by default
        # Individual engines can still filter their own results
        # Double filtering (engines + parallel) is too aggressive
        self.enable_llm_relevance_filter = kwargs.get(
            "enable_llm_relevance_filter", False
        )

        # Cache for engine instances
        self.engine_cache: Dict[str, Any] = {}
        self.cache_lock = Lock()

        # Per-query bookkeeping written by _get_previews. Initialised here so
        # the attributes exist even if no search has completed yet.
        self._engine_results: Dict[str, Dict[str, Any]] = {}
        self._successful_engines: List[str] = []

        # Initialize global thread pool (thread-safe, only happens once)
        # All instances share the same pool to prevent thread proliferation
        _get_global_executor(max_workers)

        # Get available engines (excluding 'meta', 'auto', and 'parallel')
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Parallel Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config for available engines."""
        return get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines, applying parallel-specific filters
        (is_public, search mode) on top of the shared base filter."""
        available: List[str] = []

        for name, config_ in self._get_search_config().items():
            # Try to get the engine class to check is_public flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )
            if not success:
                logger.debug(error_msg)
                continue

            # Check if engine is public or if local engines are allowed.
            # An engine without an is_public flag is assumed to be
            # private/local for safety.
            has_public_flag = hasattr(engine_class, "is_public")
            is_public = has_public_flag and bool(engine_class.is_public)
            if not is_public:
                if not self.allow_local_engines:
                    if has_public_flag:
                        logger.debug(f"Skipping local/private engine: {name}")
                    else:
                        logger.debug(
                            f"Skipping engine {name} - no is_public flag and local engines not allowed"
                        )
                    continue
                if has_public_flag:
                    logger.warning(
                        f"Including local/private engine {name} - data may be exposed"
                    )

            # Apply scientific mode filtering if enabled
            if self.search_mode == SearchMode.SCIENTIFIC:
                is_scientific = getattr(engine_class, "is_scientific", False)
                is_generic = getattr(engine_class, "is_generic", False)

                if not (is_scientific or is_generic):
                    logger.debug(
                        f"Skipping {name} in scientific mode (not scientific or generic)"
                    )
                    continue

                logger.debug(
                    f"Including {name} in scientific mode (scientific={is_scientific}, generic={is_generic})"
                )

            available.append(name)

        return available

    def _get_available_generic_engines(self) -> List[str]:
        """Get list of available generic search engines that pass API key checks"""
        generic_engines: List[str] = []
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            # Skip if not in available engines (already filtered for API keys etc)
            if name not in self.available_engines:
                continue

            # Load the engine class to check is_generic flag
            success, engine_class, error_msg = (
                BaseSearchEngine._load_engine_class(name, config_)
            )
            if not success:
                logger.debug(
                    f"Could not check if {name} is generic: {error_msg}"
                )
                continue

            # Check if engine is generic
            if getattr(engine_class, "is_generic", False):
                generic_engines.append(name)
                logger.debug(f"Found generic engine: {name}")

        return generic_engines

    def _fallback_engine_selection(
        self, generic_engines: List[str]
    ) -> List[str]:
        """Build the fallback selection: the given generic engines followed by
        the first ``max_engines_to_select`` available engines, de-duplicated."""
        result = list(generic_engines)
        for engine in self.available_engines[: self.max_engines_to_select]:
            if engine not in result:
                result.append(engine)
        return result

    def select_engines(self, query: str) -> List[str]:
        """
        Use LLM to select appropriate search engines based only on names.

        Generic engines (when ``include_generic_engines`` is set) are never
        offered to the LLM; they are appended to whatever it picks. Indices
        returned by the LLM are validated: non-integers, booleans,
        out-of-range values and repeats are ignored, and at most
        ``max_engines_to_select`` specialized engines are kept.

        Args:
            query: The search query

        Returns:
            List of selected engine names (no duplicates)
        """
        if not self.llm or not self.available_engines:
            logger.warning(
                "No LLM or no available engines, using all available"
            )
            return self.available_engines[: self.max_engines_to_select]

        # None means "not looked up yet"; lets the error fallback reuse a
        # list that was already computed instead of recomputing it.
        generic_engines: Optional[List[str]] = None

        try:
            # Get list of engines for LLM to select from (exclude generic ones if they'll be auto-added)
            engines_for_selection = self.available_engines.copy()

            if self.include_generic_engines:
                generic_engines = self._get_available_generic_engines()
                # Remove generic engines from selection since they'll be added automatically
                engines_for_selection = [
                    e for e in engines_for_selection if e not in generic_engines
                ]
                logger.debug(
                    f"Excluding generic engines from LLM selection: {generic_engines}"
                )
            else:
                generic_engines = []

            # If no specialized engines available, just return the generic ones
            if not engines_for_selection:
                logger.info(
                    f"No specialized engines available, using generic engines: {generic_engines}"
                )
                return generic_engines

            # Create a simple prompt with just non-generic engine names
            engine_list = "\n".join(
                f"[{i}] {name}" for i, name in enumerate(engines_for_selection)
            )

            logger.debug(f"Engines for LLM selection: {engines_for_selection}")

            prompt = f"""Query: {query}

Available search engines:
{engine_list}

Select the most appropriate search engines for this query. Return ONLY a JSON array of indices.
Example: [0, 2, 5]

Select up to {self.max_engines_to_select} engines that would best answer this query."""

            logger.debug("Sending prompt to LLM for engine selection")
            # Get LLM response
            response = self.llm.invoke(prompt)
            content = get_llm_response_text(response)

            indices = extract_json(content, expected_type=list)

            if indices is not None:
                # Convert indices to engine names (from the filtered list)
                selected_engines: List[str] = []
                for idx in indices:
                    # bool is a subclass of int; a JSON true/false is not an index
                    if isinstance(idx, bool) or not isinstance(idx, int):
                        continue
                    if not 0 <= idx < len(engines_for_selection):
                        continue
                    name = engines_for_selection[idx]
                    # A repeated index would run the same engine twice and
                    # duplicate its results
                    if name in selected_engines:
                        continue
                    # Enforce the cap the prompt asks the LLM to respect
                    if len(selected_engines) >= self.max_engines_to_select:
                        break
                    selected_engines.append(name)

                if selected_engines:
                    # Add generic search engines (they were excluded from selection)
                    if self.include_generic_engines:
                        for engine in generic_engines:
                            if engine not in selected_engines:
                                selected_engines.append(engine)
                                logger.debug(f"Added generic engine: {engine}")

                    logger.info(f"Final selected engines: {selected_engines}")
                    return selected_engines

            # Fallback if parsing fails - return generic engines plus some specialized ones
            logger.warning(
                "Failed to parse LLM response, using generic engines + top specialized"
            )
            return self._fallback_engine_selection(
                generic_engines if self.include_generic_engines else []
            )

        except Exception:
            logger.exception("Error selecting engines with LLM")
            # Fallback to using generic engines + available engines
            if not self.include_generic_engines:
                return self.available_engines[: self.max_engines_to_select]

            if generic_engines is None:
                # The generic lookup itself may be what failed; never let a
                # second failure escape from the fallback path.
                try:
                    generic_engines = self._get_available_generic_engines()
                except Exception:
                    logger.exception(
                        "Error determining generic engines for fallback"
                    )
                    generic_engines = []
            return self._fallback_engine_selection(generic_engines)

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine.

        Instances are cached per engine name. The cache lock is held for the
        whole lookup-or-create sequence so that each engine is created at
        most once even when several workers ask for it concurrently.

        Returns:
            The engine instance, or None if it could not be created.
        """
        with self.cache_lock:
            # Return cached instance if available
            cached = self.engine_cache.get(engine_name)
            if engine_name in self.engine_cache:
                return cached  # type: ignore[no-any-return]

            # Only pass parameters that all engines accept
            common_params: Dict[str, Any] = {
                "llm": self.llm,
                "max_results": self.max_results,
            }
            # Add max_filtered_results if specified
            if self.max_filtered_results is not None:
                common_params["max_filtered_results"] = (
                    self.max_filtered_results
                )

            try:
                # Individual engines use their auto-detected filtering settings
                # The factory enables LLM filtering for scientific engines (arXiv, etc.)
                # and disables it for generic engines (Google, Brave, etc.)
                engine = create_search_engine(
                    engine_name,
                    settings_snapshot=self.settings_snapshot,
                    programmatic_mode=self.programmatic_mode,
                    **common_params,  # type: ignore[arg-type]
                )
            except Exception:
                logger.exception(
                    f"Error creating engine instance for {engine_name}"
                )
                return None

            if engine:
                # Cache the instance
                self.engine_cache[engine_name] = engine

            return engine

    def _execute_single_engine(
        self, engine_name: str, query: str
    ) -> Dict[str, Any]:
        """
        Execute a single search engine and return results.

        Context propagation and thread-local cleanup are handled by
        ``_run_with_context`` at the submit site in ``_get_previews``,
        which captures the submitter's context once per request.

        Args:
            engine_name: Name of the engine to execute
            query: The search query

        Returns:
            Dictionary with keys ``engine``, ``success`` and ``results``,
            plus ``count`` on success or ``error`` on failure. This method
            never raises; failures are reported through the dictionary.
        """
        logger.info(f"Executing search on {engine_name}")

        engine = self._get_engine_instance(engine_name)
        if not engine:
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Failed to initialize {engine_name}",
                "results": [],
            }

        try:
            # Run the engine properly through its run() method
            # This ensures proper filter application, context propagation, etc.
            results = engine.run(query)
        except Exception:
            logger.exception(f"Error executing {engine_name}")
            return {
                "engine": engine_name,
                "success": False,
                "error": f"Engine {engine_name} failed",
                "results": [],
            }

        if not results:
            return {
                "engine": engine_name,
                "success": False,
                "error": "No results",
                "results": [],
            }

        logger.info(f"Got {len(results)} results from {engine_name}")
        return {
            "engine": engine_name,
            "success": True,
            "results": results,
            "count": len(results),
        }

    def _emit_socket_event(
        self,
        event: str,
        payload: Dict[str, Any],
        log_traceback: bool = False,
    ) -> None:
        """Emit a socket event, never letting an emit failure propagate.

        Args:
            event: Socket event name
            payload: Event data
            log_traceback: Log a failure with traceback at error level
                instead of a plain debug message
        """
        try:
            SocketIOService().emit_socket_event(event, payload)
        except Exception:
            if log_traceback:
                logger.exception("Socket emit error (non-critical)")
            else:
                logger.debug("Socket emit error (non-critical)")

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by executing selected engines in parallel.

        Each result item is tagged with a ``search_engine`` key naming the
        engine that produced it. Per-engine outcomes are stored on
        ``self._engine_results`` / ``self._successful_engines``.

        Args:
            query: The search query

        Returns:
            Combined list of preview dictionaries from all successful engines
        """
        # Select engines for this query
        selected_engines = self.select_engines(query)

        if not selected_engines:
            logger.warning("No engines selected")
            return []

        logger.info(
            f"PARALLEL_SEARCH: Executing {len(selected_engines)} engines in parallel: {', '.join(selected_engines)}"
        )

        # Emit socket event about selected engines
        self._emit_socket_event(
            "parallel_search_started",
            {"engines": selected_engines, "query": query},
            log_traceback=True,
        )

        # Execute all engines in parallel using persistent thread pool
        all_results: List[Dict[str, Any]] = []
        engine_results: Dict[str, Dict[str, Any]] = {}

        # Get the global thread pool
        executor = _get_global_executor()
        if executor is None:
            logger.error(
                "Global thread pool not available, cannot execute parallel search"
            )
            return []

        # Capture submitter's research context ONCE (in the request thread,
        # where it is populated by the strategy's caller). Workers in the
        # global pool do not inherit thread-local state, so we must re-apply
        # it per task and clear it (+ clean up thread-local DB/session/password
        # state) in a finally block so nothing leaks to the next task that
        # lands on the same worker.
        submitter_ctx = get_search_context()

        def _run_with_context(fn, *fargs):
            if submitter_ctx is not None:
                set_search_context(submitter_ctx)
            try:
                return fn(*fargs)
            finally:
                clear_search_context()
                try:
                    cleanup_current_thread()
                except Exception:
                    # loguru does not honour the stdlib ``exc_info=True``
                    # keyword; opt(exception=True) attaches the traceback.
                    logger.opt(exception=True).debug(
                        "parallel_search submit wrapper: cleanup_current_thread failed"
                    )

        # Submit all tasks to the global executor
        future_to_engine = {
            executor.submit(
                _run_with_context,
                self._execute_single_engine,
                engine_name,
                query,
            ): engine_name
            for engine_name in selected_engines
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(future_to_engine):
            engine_name = future_to_engine[future]
            try:
                result = future.result()
                engine_results[engine_name] = result

                if result["success"]:
                    # Add source information to each result
                    for item in result["results"]:
                        item["search_engine"] = engine_name
                    all_results.extend(result["results"])

                    # Emit success event
                    self._emit_socket_event(
                        "engine_completed",
                        {
                            "engine": engine_name,
                            "success": True,
                            "count": result.get(
                                "count", len(result["results"])
                            ),
                        },
                    )
                else:
                    # Emit failure event
                    self._emit_socket_event(
                        "engine_completed",
                        {
                            "engine": engine_name,
                            "success": False,
                            "error": result.get("error", "Unknown error"),
                        },
                    )

            except Exception:
                logger.exception(f"Failed to get result from {engine_name}")
                engine_results[engine_name] = {
                    "engine": engine_name,
                    "success": False,
                    "error": "Search execution failed",
                    "results": [],
                }

        # Log summary
        successful_engines = [
            name for name, res in engine_results.items() if res["success"]
        ]
        failed_engines = [
            name for name, res in engine_results.items() if not res["success"]
        ]

        logger.info(
            f"PARALLEL_SEARCH_COMPLETE: {len(successful_engines)} succeeded, {len(failed_engines)} failed"
        )
        if successful_engines:
            logger.info(f"Successful engines: {', '.join(successful_engines)}")
        if failed_engines:
            logger.info(f"Failed engines: {', '.join(failed_engines)}")

        # Log sample result to understand structure
        if all_results:
            logger.debug(
                f"Sample result keys from first result: {list(all_results[0].keys())}"
            )
            logger.debug(f"Sample result: {str(all_results[0])[:500]}")

        logger.info(f"Total results from all engines: {len(all_results)}")

        # Store the engine results for potential use in _get_full_content
        self._engine_results = engine_results
        self._successful_engines = successful_engines

        return all_results

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant items.
        Since we ran multiple engines, we'll group items by their source engine
        and get full content from each engine for its own results.

        Items that carry no ``search_engine`` tag cannot be routed to an
        engine; they are returned unchanged after the grouped items rather
        than being dropped.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content, grouped by engine
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Group items by their source engine
        items_by_engine: Dict[str, List[Dict[str, Any]]] = {}
        untagged_items: List[Dict[str, Any]] = []
        for item in relevant_items:
            engine_name = item.get("search_engine")
            if engine_name:
                items_by_engine.setdefault(engine_name, []).append(item)
            else:
                untagged_items.append(item)

        # Get full content from each engine for its items
        all_full_content: List[Dict[str, Any]] = []

        for engine_name, items in items_by_engine.items():
            engine = self._get_engine_instance(engine_name)
            if not engine:
                # No engine available, return items as-is
                all_full_content.extend(items)
                continue

            try:
                logger.info(
                    f"Getting full content from {engine_name} for {len(items)} items"
                )
                full_content = engine._get_full_content(items)
                all_full_content.extend(full_content)
            except Exception:
                logger.exception(
                    f"Error getting full content from {engine_name}"
                )
                # Fall back to returning items without full content
                all_full_content.extend(items)

        if untagged_items:
            logger.debug(
                f"Keeping {len(untagged_items)} items without a source engine as-is"
            )
            all_full_content.extend(untagged_items)

        return all_full_content

    def close(self):
        """Close all cached child search engines and own resources."""
        from ...utilities.resource_utils import safe_close

        # Iterate over a snapshot so the cache can be cleared safely afterwards
        for engine in list(self.engine_cache.values()):
            safe_close(engine, "child search engine")
        self.engine_cache.clear()
        super().close()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)

    # Note: No shutdown() or context manager methods needed
    # The global thread pool is automatically cleaned up at process exit via atexit