Coverage for src/local_deep_research/web_search_engines/engines/meta_search_engine.py: 6% (193 statements)

from typing import Any, Dict, List, Optional

from loguru import logger

from ...config.search_config import get_setting_from_snapshot
from ...web.services.socket_service import SocketIOService
from ..search_engine_base import BaseSearchEngine
from ..search_engine_factory import create_search_engine
from .search_engine_wikipedia import WikipediaSearchEngine


class MetaSearchEngine(BaseSearchEngine):
    """
    LLM-powered meta search engine that intelligently selects and uses
    the appropriate search engines based on query analysis.
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_try: int = 3,
        max_filtered_results: Optional[int] = None,
        engine_selection_callback=None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the meta search engine.

        Args:
            llm: Language model instance for query classification and relevance filtering
            max_results: Maximum number of search results to return
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_try: Maximum number of engines to try before giving up
            max_filtered_results: Maximum number of results to keep after filtering
            engine_selection_callback: Optional callback for engine selection (accepted for compatibility; not used directly in this class)
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Initialize the BaseSearchEngine with the LLM, result limits, and runtime flags
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_try = max_engines_to_try
        self.settings_snapshot = settings_snapshot

        # Cache for engine instances
        self.engine_cache = {}

        # Get available engines (excluding 'meta' and 'auto')
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

        # Create a fallback engine in case everything else fails
        self.fallback_engine = WikipediaSearchEngine(
            max_results=self.max_results,
            llm=llm,
            max_filtered_results=max_filtered_results,
        )
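
    # Note: the Wikipedia engine is created eagerly here as a last-resort
    # fallback so that a search can still be attempted if every selected
    # engine fails or returns nothing during preview retrieval
    # (see _get_previews below).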

    def _get_search_config(self) -> Dict[str, Any]:
        """Get per-engine search config parsed from the settings snapshot (empty if no snapshot is available)."""
        if self.settings_snapshot:
            # Extract search engine configs from settings snapshot
            config_data = {}
            for key, value in self.settings_snapshot.items():
                if key.startswith("search.engine.web."):
                    parts = key.split(".")
                    if len(parts) >= 4:
                        engine_name = parts[3]
                        if engine_name not in config_data:
                            config_data[engine_name] = {}
                        remaining_key = (
                            ".".join(parts[4:]) if len(parts) > 4 else ""
                        )
                        if remaining_key:
                            config_data[engine_name][remaining_key] = (
                                value.get("value")
                                if isinstance(value, dict)
                                else value
                            )

            # Also check for auto engine
            if "search.engine.auto.class_name" in self.settings_snapshot:
                config_data["auto"] = {}
                for key, value in self.settings_snapshot.items():
                    if key.startswith("search.engine.auto."):
                        remaining_key = key.replace("search.engine.auto.", "")
                        config_data["auto"][remaining_key] = (
                            value.get("value")
                            if isinstance(value, dict)
                            else value
                        )
            return config_data
        else:
            # No settings snapshot available; there is no other config source
            # here, so return an empty config.
            return {}
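
    # Example (illustrative values): a snapshot entry such as
    #   {"search.engine.web.searxng.reliability": {"value": 0.9}}
    # is flattened by _get_search_config into
    #   {"searxng": {"reliability": 0.9}}
    # Any web engine follows the same "search.engine.web.<name>.<sub.key>" pattern.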

    def _get_available_engines(self) -> List[str]:
        """Get list of available engines, excluding 'meta' and 'auto', based on user settings."""
        # Filter out 'meta' and 'auto' and check API key availability
        available = []

        # Get search config using helper method
        config_data = self._get_search_config()

        for name, config_ in config_data.items():
            if name in ["meta", "auto"]:
                continue

            # Determine if this is a local engine (starts with "local.")
            is_local_engine = name.startswith("local.")

            # Determine the appropriate setting path based on engine type
            if is_local_engine:
                # Format: search.engine.local.{engine_name}.use_in_auto_search
                local_name = name.replace("local.", "")
                auto_search_setting = (
                    f"search.engine.local.{local_name}.use_in_auto_search"
                )
            else:
                # Format: search.engine.web.{engine_name}.use_in_auto_search
                auto_search_setting = (
                    f"search.engine.web.{name}.use_in_auto_search"
                )

            # Look up the auto-search setting, defaulting to False if not found
            use_in_auto_search = get_setting_from_snapshot(
                auto_search_setting,
                False,
                settings_snapshot=self.settings_snapshot,
            )

            # Skip engines that aren't enabled for auto search
            if not use_in_auto_search:
                logger.info(
                    f"Skipping {name} engine because it's not enabled for auto search"
                )
                continue

            # Skip engines that require API keys if we don't want to use them
            if (
                config_.get("requires_api_key", False)
                and not self.use_api_key_services
            ):
                continue

            # Skip engines that require API keys if the key is not available
            if config_.get("requires_api_key", False):
                api_key = config_.get("api_key")
                if not api_key:
                    continue

            available.append(name)

        # If no engines are available, raise an error instead of falling back silently
        if not available:
            error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings."
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        return available
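
    # Example (illustrative): an engine named "wikipedia" is only returned by
    # _get_available_engines when the snapshot holds a truthy
    # "search.engine.web.wikipedia.use_in_auto_search" setting and, if its
    # config sets "requires_api_key", a non-empty "api_key" as well.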

    def analyze_query(self, query: str) -> List[str]:
        """
        Analyze the query to determine the best search engines to use.
        Prioritizes SearXNG for general queries, but selects specialized engines
        for domain-specific queries (e.g., scientific papers, code).

        Args:
            query: The search query

        Returns:
            List of search engine names sorted by suitability
        """
        try:
            # First check if this is a specialized query that should use specific engines
            specialized_domains = {
                "scientific paper": ["arxiv", "pubmed", "wikipedia"],
                "medical research": ["pubmed", "searxng"],
                "clinical": ["pubmed", "searxng"],
                "github": ["github", "searxng"],
                "repository": ["github", "searxng"],
                "code": ["github", "searxng"],
                "programming": ["github", "searxng"],
            }

            # Quick heuristic check for specialized queries
            query_lower = query.lower()
            for term, engines in specialized_domains.items():
                if term in query_lower:
                    valid_engines = []
                    for engine in engines:
                        if engine in self.available_engines:
                            valid_engines.append(engine)

                    if valid_engines:
                        logger.info(
                            f"Detected specialized query type: {term}, using engines: {valid_engines}"
                        )
                        return valid_engines

            # For searches containing "arxiv", prioritize the arxiv engine
            if "arxiv" in query_lower and "arxiv" in self.available_engines:
                return ["arxiv"] + [
                    e for e in self.available_engines if e != "arxiv"
                ]

            # For searches containing "pubmed", prioritize the pubmed engine
            if "pubmed" in query_lower and "pubmed" in self.available_engines:
                return ["pubmed"] + [
                    e for e in self.available_engines if e != "pubmed"
                ]

            # Check if SearXNG is available and prioritize it for general queries
            if "searxng" in self.available_engines:
                # For general queries, return SearXNG first followed by reliability-ordered engines
                engines_without_searxng = [
                    e for e in self.available_engines if e != "searxng"
                ]
                reliability_sorted = sorted(
                    engines_without_searxng,
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )
                return ["searxng"] + reliability_sorted

            # If LLM is not available or SearXNG is not available, fall back to reliability
            if not self.llm or "searxng" not in self.available_engines:
                logger.warning(
                    "No LLM available or SearXNG not available, using reliability-based engines"
                )
                # Return engines sorted by reliability
                return sorted(
                    self.available_engines,
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )

            # Create a prompt that outlines the available search engines and their strengths
            engines_info = []
            for engine_name in self.available_engines:
                try:
                    if engine_name in self._get_search_config():
                        strengths = self._get_search_config()[engine_name].get(
                            "strengths", "General search"
                        )
                        weaknesses = self._get_search_config()[engine_name].get(
                            "weaknesses", "None specified"
                        )
                        description = self._get_search_config()[
                            engine_name
                        ].get("description", engine_name)
                        engines_info.append(
                            f"- {engine_name}: {description}\n Strengths: {strengths}\n Weaknesses: {weaknesses}"
                        )
                except KeyError:
                    logger.exception(f"Missing key for engine {engine_name}")

            # Only proceed if we have engines available to choose from
            if not engines_info:
                logger.warning(
                    "No engine information available for prompt, using reliability-based sorting instead"
                )
                return sorted(
                    self.available_engines,
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )

            # Use a stronger prompt that emphasizes SearXNG preference for general queries
            prompt = f"""You are a search query analyst. Consider this search query:

QUERY: {query}

I have these search engines available:
{chr(10).join(engines_info)}

Determine which search engines would be most appropriate for answering this query.
First analyze the nature of the query: Is it factual, scientific, code-related, medical, etc.?

IMPORTANT GUIDELINES:
- Use SearXNG for most general queries as it combines results from multiple search engines
- For academic/scientific searches, prefer arXiv
- For medical research, prefer PubMed
- For code repositories and programming, prefer GitHub
- For every other query type, SearXNG is usually the best option

Output ONLY a comma-separated list of 1-3 search engine names in order of most appropriate to least appropriate.
Example output: searxng,wikipedia,brave"""

            # Get analysis from LLM
            response = self.llm.invoke(prompt)

            # Handle different response formats
            if hasattr(response, "content"):
                content = response.content.strip()
            else:
                content = str(response).strip()

            # Extract engine names
            valid_engines = []
            for engine_name in content.split(","):
                cleaned_name = engine_name.strip().lower()
                if cleaned_name in self.available_engines:
                    valid_engines.append(cleaned_name)

            # If SearXNG is available but not selected by the LLM, add it as a fallback
            if (
                "searxng" in self.available_engines
                and "searxng" not in valid_engines
            ):
                # Add it as the last option if the LLM selected others
                if valid_engines:
                    valid_engines.append("searxng")
                # Use it as the first option if no valid engines were selected
                else:
                    valid_engines = ["searxng"]

            # If still no valid engines, use reliability-based ordering
            if not valid_engines:
                valid_engines = sorted(
                    self.available_engines,
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )

            return valid_engines
        except Exception:
            logger.exception("Error analyzing query with LLM")
            # Fall back to SearXNG if available, then reliability-based ordering
            if "searxng" in self.available_engines:
                return ["searxng"] + sorted(
                    [e for e in self.available_engines if e != "searxng"],
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )
            else:
                return sorted(
                    self.available_engines,
                    key=lambda x: self._get_search_config()
                    .get(x, {})
                    .get("reliability", 0),
                    reverse=True,
                )
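
    # Example (illustrative): for the query "Recent arXiv papers on diffusion
    # models", the "arxiv" keyword check above returns ["arxiv", ...] with the
    # remaining available engines appended; a query mentioning "clinical
    # trials" instead matches the specialized-domain table and returns
    # whichever of ["pubmed", "searxng"] are actually available.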

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by selecting the best search engine for this query.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        # Get ranked list of engines for this query
        ranked_engines = self.analyze_query(query)

        if not ranked_engines:
            logger.warning(
                "No suitable search engines found for query, using fallback engine"
            )
            return self.fallback_engine._get_previews(query)

        # Limit the number of engines to try
        engines_to_try = ranked_engines[: self.max_engines_to_try]
        logger.info(
            f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}"
        )

        all_errors = []
        # Try each engine in order
        for engine_name in engines_to_try:
            logger.info(f"Trying search engine: {engine_name}")

            # Get or create the engine instance
            engine = self._get_engine_instance(engine_name)

            if not engine:
                logger.warning(f"Failed to initialize {engine_name}, skipping")
                all_errors.append(f"Failed to initialize {engine_name}")
                continue

            try:
                # Get previews from this engine
                previews = engine._get_previews(query)

                # If search was successful, return results
                if previews and len(previews) > 0:
                    logger.info(f"ENGINE_SELECTED: {engine_name}")
                    logger.info(
                        f"Successfully got {len(previews)} preview results from {engine_name}"
                    )
                    # Store selected engine for later use
                    self._selected_engine = engine
                    self._selected_engine_name = engine_name

                    # Emit a socket event to inform about the selected engine
                    try:
                        SocketIOService().emit_socket_event(
                            "search_engine_selected",
                            {
                                "engine": engine_name,
                                "result_count": len(previews),
                            },
                        )
                    except Exception:
                        logger.exception("Socket emit error (non-critical)")

                    return previews

                logger.info(f"{engine_name} returned no previews")
                all_errors.append(f"{engine_name} returned no previews")

            except Exception as e:
                error_msg = f"Error getting previews from {engine_name}: {e!s}"
                logger.exception(error_msg)
                all_errors.append(error_msg)

        # If we reach here, all engines failed, use fallback
        logger.warning(
            f"All engines failed or returned no preview results: {', '.join(all_errors)}"
        )
        logger.info("Using fallback Wikipedia engine for previews")
        self._selected_engine = self.fallback_engine
        self._selected_engine_name = "wikipedia"
        return self.fallback_engine._get_previews(query)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content using the engine that provided the previews.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        # Use the selected engine to get full content
        if hasattr(self, "_selected_engine"):
            try:
                logger.info(
                    f"Using {self._selected_engine_name} to get full content"
                )
                return self._selected_engine._get_full_content(relevant_items)
            except Exception:
                logger.exception(
                    f"Error getting full content from {self._selected_engine_name}"
                )
                # Fall back to returning relevant items without full content
                return relevant_items
        else:
            logger.warning(
                "No engine was selected during preview phase, returning relevant items as-is"
            )
            return relevant_items

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """Get or create an instance of the specified search engine."""
        # Return cached instance if available
        if engine_name in self.engine_cache:
            return self.engine_cache[engine_name]

        # Create a new instance
        engine = None
        try:
            # Only pass parameters that all engines accept
            common_params = {"llm": self.llm, "max_results": self.max_results}

            # Add max_filtered_results if specified
            if self.max_filtered_results is not None:
                common_params["max_filtered_results"] = (
                    self.max_filtered_results
                )

            engine = create_search_engine(
                engine_name,
                settings_snapshot=self.settings_snapshot,
                programmatic_mode=self.programmatic_mode,
                **common_params,
            )
        except Exception:
            logger.exception(
                f"Error creating engine instance for {engine_name}"
            )
            return None

        if engine:
            # Cache the instance
            self.engine_cache[engine_name] = engine

        return engine
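
    # Illustrative note: instances are cached per engine name, so repeated
    # lookups reuse the same object (assuming the first creation succeeded;
    # `meta` below is a placeholder MetaSearchEngine instance):
    #
    #   first = meta._get_engine_instance("wikipedia")
    #   second = meta._get_engine_instance("wikipedia")
    #   assert first is second  # served from self.engine_cache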

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)
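

# Example usage (sketch; `my_llm` and `snapshot` are placeholders for a
# LangChain-compatible LLM exposing `invoke()` and a settings snapshot dict):
#
#   engine = MetaSearchEngine(llm=my_llm, settings_snapshot=snapshot)
#   results = engine.run("open source license comparison")
#   print(results)
#
# `run()` is assumed to be provided by BaseSearchEngine, since `invoke()`
# above delegates to it.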