Coverage for src / local_deep_research / web_search_engines / engines / meta_search_engine.py: 80%
194 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1from typing import Any, Dict, List, Optional
3from loguru import logger
5from ...config.search_config import get_setting_from_snapshot
6from ...web.services.socket_service import SocketIOService
7from ..search_engine_base import BaseSearchEngine
8from ..search_engine_factory import create_search_engine
9from .search_engine_wikipedia import WikipediaSearchEngine
class MetaSearchEngine(BaseSearchEngine):
    """
    LLM-powered meta search engine that intelligently selects and uses
    the appropriate search engines based on query analysis
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_try: int = 3,
        max_filtered_results: Optional[int] = None,
        _engine_selection_callback=None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Set up the meta engine and discover which concrete engines it may use.

        Args:
            llm: Language model used for query classification and filtering.
            max_results: Maximum number of search results to return.
            use_api_key_services: Whether engines requiring API keys may be used.
            max_engines_to_try: How many candidate engines to attempt per query.
            max_filtered_results: Cap on results kept after relevance filtering.
            _engine_selection_callback: Accepted for compatibility; unused here.
            settings_snapshot: Settings snapshot for thread context.
            programmatic_mode: If True, disables database operations and
                metrics tracking.
            **kwargs: Extra parameters accepted (and ignored) for compatibility.

        Raises:
            RuntimeError: propagated from engine discovery when no engine is
                enabled for auto search.
        """
        # The base class wires up the LLM, result limits and settings context.
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_try = max_engines_to_try
        self.settings_snapshot = settings_snapshot

        # Engine instances are cached here so repeated searches reuse them.
        self.engine_cache = {}

        # Engines eligible for auto search ('meta' and 'auto' are excluded);
        # raises if none are enabled.
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

        # Last-resort engine used when every candidate engine fails.
        self.fallback_engine = WikipediaSearchEngine(
            max_results=self.max_results,
            llm=llm,
            max_filtered_results=max_filtered_results,
        )
72 def _get_search_config(self) -> Dict[str, Any]:
73 """Get search config from settings_snapshot or return empty dict."""
74 if self.settings_snapshot:
75 # Extract search engine configs from settings snapshot
76 config_data = {}
77 for key, value in self.settings_snapshot.items():
78 if key.startswith("search.engine.web."):
79 parts = key.split(".")
80 if len(parts) >= 4: 80 ↛ 77line 80 didn't jump to line 77 because the condition on line 80 was always true
81 engine_name = parts[3]
82 if engine_name not in config_data:
83 config_data[engine_name] = {}
84 remaining_key = (
85 ".".join(parts[4:]) if len(parts) > 4 else ""
86 )
87 if remaining_key: 87 ↛ 77line 87 didn't jump to line 77 because the condition on line 87 was always true
88 config_data[engine_name][remaining_key] = (
89 value.get("value")
90 if isinstance(value, dict)
91 else value
92 )
94 # Also check for auto engine
95 if "search.engine.auto.class_name" in self.settings_snapshot:
96 config_data["auto"] = {}
97 for key, value in self.settings_snapshot.items():
98 if key.startswith("search.engine.auto."):
99 remaining_key = key.replace("search.engine.auto.", "")
100 config_data["auto"][remaining_key] = (
101 value.get("value")
102 if isinstance(value, dict)
103 else value
104 )
105 return config_data
106 else:
107 logger.warning(
108 "No settings_snapshot provided to MetaSearchEngine, "
109 "returning empty search config"
110 )
111 return {}
113 def _get_available_engines(self) -> List[str]:
114 """Get list of available engines, excluding 'meta' and 'auto', based on user settings"""
115 # Filter out 'meta' and 'auto' and check API key availability
116 available = []
118 # Get search config using helper method
119 config_data = self._get_search_config()
121 for name, config_ in config_data.items():
122 if name in ["meta", "auto"]:
123 continue
125 # Determine if this is a local engine (starts with "local.")
126 is_local_engine = name.startswith("local.")
128 # Determine the appropriate setting path based on engine type
129 if is_local_engine:
130 # Format: search.engine.local.{engine_name}.use_in_auto_search
131 local_name = name.replace("local.", "")
132 auto_search_setting = (
133 f"search.engine.local.{local_name}.use_in_auto_search"
134 )
135 else:
136 # Format: search.engine.web.{engine_name}.use_in_auto_search
137 auto_search_setting = (
138 f"search.engine.web.{name}.use_in_auto_search"
139 )
141 # Get setting from database, default to False if not found
142 use_in_auto_search = get_setting_from_snapshot(
143 auto_search_setting,
144 False,
145 settings_snapshot=self.settings_snapshot,
146 )
148 # Skip engines that aren't enabled for auto search
149 if not use_in_auto_search:
150 logger.info(
151 f"Skipping {name} engine because it's not enabled for auto search"
152 )
153 continue
155 # Skip engines that require API keys if we don't want to use them
156 if (
157 config_.get("requires_api_key", False)
158 and not self.use_api_key_services
159 ):
160 continue
162 # Skip engines that require API keys if the key is not available
163 if config_.get("requires_api_key", False):
164 api_key = config_.get("api_key")
165 if not api_key:
166 continue
168 available.append(name)
170 # If no engines are available, raise an error instead of falling back silently
171 if not available:
172 error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings."
173 logger.error(error_msg)
174 raise RuntimeError(error_msg)
176 return available
178 def analyze_query(self, query: str) -> List[str]:
179 """
180 Analyze the query to determine the best search engines to use.
181 Prioritizes SearXNG for general queries, but selects specialized engines
182 for domain-specific queries (e.g., scientific papers, code).
184 Args:
185 query: The search query
187 Returns:
188 List of search engine names sorted by suitability
189 """
190 try:
191 # First check if this is a specialized query that should use specific engines
192 specialized_domains = {
193 "scientific paper": ["arxiv", "pubmed", "wikipedia"],
194 "medical research": ["pubmed", "searxng"],
195 "clinical": ["pubmed", "searxng"],
196 "github": ["github", "searxng"],
197 "repository": ["github", "searxng"],
198 "code": ["github", "searxng"],
199 "programming": ["github", "searxng"],
200 }
202 # Quick heuristic check for specialized queries
203 query_lower = query.lower()
204 for term, engines in specialized_domains.items():
205 if term in query_lower:
206 valid_engines = []
207 for engine in engines:
208 if engine in self.available_engines:
209 valid_engines.append(engine)
211 if valid_engines: 211 ↛ 204line 211 didn't jump to line 204 because the condition on line 211 was always true
212 logger.info(
213 f"Detected specialized query type: {term}, using engines: {valid_engines}"
214 )
215 return valid_engines
217 # For searches containing "arxiv", prioritize the arxiv engine
218 if "arxiv" in query_lower and "arxiv" in self.available_engines:
219 return ["arxiv"] + [
220 e for e in self.available_engines if e != "arxiv"
221 ]
223 # For searches containing "pubmed", prioritize the pubmed engine
224 if "pubmed" in query_lower and "pubmed" in self.available_engines:
225 return ["pubmed"] + [
226 e for e in self.available_engines if e != "pubmed"
227 ]
229 # Check if SearXNG is available and prioritize it for general queries
230 if "searxng" in self.available_engines:
231 # For general queries, return SearXNG first followed by reliability-ordered engines
232 engines_without_searxng = [
233 e for e in self.available_engines if e != "searxng"
234 ]
235 reliability_sorted = sorted(
236 engines_without_searxng,
237 key=lambda x: self._get_search_config()
238 .get(x, {})
239 .get("reliability", 0),
240 reverse=True,
241 )
242 return ["searxng"] + reliability_sorted
244 # If LLM is not available or SearXNG is not available, fall back to reliability
245 if not self.llm or "searxng" not in self.available_engines: 245 ↛ 259line 245 didn't jump to line 259 because the condition on line 245 was always true
246 logger.warning(
247 "No LLM available or SearXNG not available, using reliability-based engines"
248 )
249 # Return engines sorted by reliability
250 return sorted(
251 self.available_engines,
252 key=lambda x: self._get_search_config()
253 .get(x, {})
254 .get("reliability", 0),
255 reverse=True,
256 )
258 # Create a prompt that outlines the available search engines and their strengths
259 engines_info = []
260 for engine_name in self.available_engines:
261 try:
262 if engine_name in self._get_search_config():
263 strengths = self._get_search_config()[engine_name].get(
264 "strengths", "General search"
265 )
266 weaknesses = self._get_search_config()[engine_name].get(
267 "weaknesses", "None specified"
268 )
269 description = self._get_search_config()[
270 engine_name
271 ].get("description", engine_name)
272 engines_info.append(
273 f"- {engine_name}: {description}\n Strengths: {strengths}\n Weaknesses: {weaknesses}"
274 )
275 except KeyError:
276 logger.exception(f"Missing key for engine {engine_name}")
278 # Only proceed if we have engines available to choose from
279 if not engines_info:
280 logger.warning(
281 "No engine information available for prompt, using reliability-based sorting instead"
282 )
283 return sorted(
284 self.available_engines,
285 key=lambda x: self._get_search_config()
286 .get(x, {})
287 .get("reliability", 0),
288 reverse=True,
289 )
291 # Use a stronger prompt that emphasizes SearXNG preference for general queries
292 prompt = f"""You are a search query analyst. Consider this search query:
294QUERY: {query}
296I have these search engines available:
297{chr(10).join(engines_info)}
299Determine which search engines would be most appropriate for answering this query.
300First analyze the nature of the query: Is it factual, scientific, code-related, medical, etc.?
302IMPORTANT GUIDELINES:
303- Use SearXNG for most general queries as it combines results from multiple search engines
304- For academic/scientific searches, prefer arXiv
305- For medical research, prefer PubMed
306- For code repositories and programming, prefer GitHub
307- For every other query type, SearXNG is usually the best option
309Output ONLY a comma-separated list of 1-3 search engine names in order of most appropriate to least appropriate.
310Example output: searxng,wikipedia,brave"""
312 # Get analysis from LLM
313 response = self.llm.invoke(prompt)
315 # Handle different response formats
316 if hasattr(response, "content"):
317 content = response.content.strip()
318 else:
319 content = str(response).strip()
321 # Extract engine names
322 valid_engines = []
323 for engine_name in content.split(","):
324 cleaned_name = engine_name.strip().lower()
325 if cleaned_name in self.available_engines:
326 valid_engines.append(cleaned_name)
328 # If SearXNG is available but not selected by the LLM, add it as a fallback
329 if (
330 "searxng" in self.available_engines
331 and "searxng" not in valid_engines
332 ):
333 # Add it as the last option if the LLM selected others
334 if valid_engines:
335 valid_engines.append("searxng")
336 # Use it as the first option if no valid engines were selected
337 else:
338 valid_engines = ["searxng"]
340 # If still no valid engines, use reliability-based ordering
341 if not valid_engines:
342 valid_engines = sorted(
343 self.available_engines,
344 key=lambda x: self._get_search_config()
345 .get(x, {})
346 .get("reliability", 0),
347 reverse=True,
348 )
350 return valid_engines
351 except Exception:
352 logger.exception("Error analyzing query with LLM")
353 # Fall back to SearXNG if available, then reliability-based ordering
354 if "searxng" in self.available_engines:
355 return ["searxng"] + sorted(
356 [e for e in self.available_engines if e != "searxng"],
357 key=lambda x: self._get_search_config()
358 .get(x, {})
359 .get("reliability", 0),
360 reverse=True,
361 )
362 else:
363 return sorted(
364 self.available_engines,
365 key=lambda x: self._get_search_config()
366 .get(x, {})
367 .get("reliability", 0),
368 reverse=True,
369 )
371 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
372 """
373 Get preview information by selecting the best search engine for this query.
375 Args:
376 query: The search query
378 Returns:
379 List of preview dictionaries
380 """
381 # Get ranked list of engines for this query
382 ranked_engines = self.analyze_query(query)
384 if not ranked_engines: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true
385 logger.warning(
386 "No suitable search engines found for query, using fallback engine"
387 )
388 return self.fallback_engine._get_previews(query)
390 # Limit the number of engines to try
391 engines_to_try = ranked_engines[: self.max_engines_to_try]
392 logger.info(
393 f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}"
394 )
396 all_errors = []
397 # Try each engine in order
398 for engine_name in engines_to_try:
399 logger.info(f"Trying search engine: {engine_name}")
401 # Get or create the engine instance
402 engine = self._get_engine_instance(engine_name)
404 if not engine:
405 logger.warning(f"Failed to initialize {engine_name}, skipping")
406 all_errors.append(f"Failed to initialize {engine_name}")
407 continue
409 try:
410 # Get previews from this engine
411 previews = engine._get_previews(query)
413 # If search was successful, return results
414 if previews and len(previews) > 0:
415 logger.info(f"ENGINE_SELECTED: {engine_name}")
416 logger.info(
417 f"Successfully got {len(previews)} preview results from {engine_name}"
418 )
419 # Store selected engine for later use
420 self._selected_engine = engine
421 self._selected_engine_name = engine_name
423 # Emit a socket event to inform about the selected engine
424 try:
425 SocketIOService().emit_socket_event(
426 "search_engine_selected",
427 {
428 "engine": engine_name,
429 "result_count": len(previews),
430 },
431 )
432 except Exception:
433 logger.exception("Socket emit error (non-critical)")
435 return previews
437 logger.info(f"{engine_name} returned no previews")
438 all_errors.append(f"{engine_name} returned no previews")
440 except Exception as e:
441 error_msg = f"Error getting previews from {engine_name}: {e!s}"
442 logger.exception(error_msg)
443 all_errors.append(error_msg)
445 # If we reach here, all engines failed, use fallback
446 logger.warning(
447 f"All engines failed or returned no preview results: {', '.join(all_errors)}"
448 )
449 logger.info("Using fallback Wikipedia engine for previews")
450 self._selected_engine = self.fallback_engine
451 self._selected_engine_name = "wikipedia"
452 return self.fallback_engine._get_previews(query)
454 def _get_full_content(
455 self, relevant_items: List[Dict[str, Any]]
456 ) -> List[Dict[str, Any]]:
457 """
458 Get full content using the engine that provided the previews.
460 Args:
461 relevant_items: List of relevant preview dictionaries
463 Returns:
464 List of result dictionaries with full content
465 """
466 # Check if we should get full content
467 if get_setting_from_snapshot(
468 "search.snippets_only",
469 True,
470 settings_snapshot=self.settings_snapshot,
471 ):
472 logger.info("Snippet-only mode, skipping full content retrieval")
473 return relevant_items
475 logger.info("Getting full content for relevant items")
477 # Use the selected engine to get full content
478 if hasattr(self, "_selected_engine"):
479 try:
480 logger.info(
481 f"Using {self._selected_engine_name} to get full content"
482 )
483 return self._selected_engine._get_full_content(relevant_items)
484 except Exception:
485 logger.exception(
486 f"Error getting full content from {self._selected_engine_name}"
487 )
488 # Fall back to returning relevant items without full content
489 return relevant_items
490 else:
491 logger.warning(
492 "No engine was selected during preview phase, returning relevant items as-is"
493 )
494 return relevant_items
496 def _get_engine_instance(
497 self, engine_name: str
498 ) -> Optional[BaseSearchEngine]:
499 """Get or create an instance of the specified search engine"""
500 # Return cached instance if available
501 if engine_name in self.engine_cache:
502 return self.engine_cache[engine_name]
504 # Create a new instance
505 engine = None
506 try:
507 # Only pass parameters that all engines accept
508 common_params = {"llm": self.llm, "max_results": self.max_results}
510 # Add max_filtered_results if specified
511 if self.max_filtered_results is not None: 511 ↛ 516line 511 didn't jump to line 516 because the condition on line 511 was always true
512 common_params["max_filtered_results"] = (
513 self.max_filtered_results
514 )
516 engine = create_search_engine(
517 engine_name,
518 settings_snapshot=self.settings_snapshot,
519 programmatic_mode=self.programmatic_mode,
520 **common_params,
521 )
522 except Exception:
523 logger.exception(
524 f"Error creating engine instance for {engine_name}"
525 )
526 return None
528 if engine: 528 ↛ 532line 528 didn't jump to line 532 because the condition on line 528 was always true
529 # Cache the instance
530 self.engine_cache[engine_name] = engine
532 return engine
    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Run a search for *query*; alias kept for LangChain tool compatibility.

        Delegates to ``run`` — presumably defined on the base class, as it is
        not visible in this file (TODO confirm).
        """
        return self.run(query)