Coverage for src/local_deep_research/web_search_engines/engines/meta_search_engine.py: 78%
163 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1from typing import Any, Dict, List, Optional
3from loguru import logger
5from ...config.search_config import get_setting_from_snapshot
6from ...web.services.socket_service import SocketIOService
7from ..search_engine_base import BaseSearchEngine
8from ..search_engine_factory import create_search_engine
9from ..search_engines_config import get_available_engines
10from .search_engine_wikipedia import WikipediaSearchEngine
class MetaSearchEngine(BaseSearchEngine):
    """
    LLM-powered meta search engine that intelligently selects and uses
    the appropriate search engines based on query analysis.
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_try: int = 3,
        max_filtered_results: Optional[int] = None,
        _engine_selection_callback=None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the meta search engine.

        Args:
            llm: Language model instance for query classification and
                relevance filtering.
            max_results: Maximum number of search results to return.
            use_api_key_services: Whether to include services that require
                API keys.
            max_engines_to_try: Maximum number of engines to try before
                giving up.
            max_filtered_results: Maximum number of results to keep after
                filtering.
            settings_snapshot: Settings snapshot for thread context.
            programmatic_mode: If True, disables database operations and
                metrics tracking.
            **kwargs: Additional parameters (ignored but accepted for
                compatibility).
        """
        # Let the base class wire up the LLM, result limits, and settings.
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.settings_snapshot = settings_snapshot or {}
        self.use_api_key_services = use_api_key_services
        self.max_engines_to_try = max_engines_to_try

        # Engine instances are created lazily and cached here by name.
        self.engine_cache: Dict[str, Any] = {}

        # Engines the user has enabled (excludes 'meta' and 'auto').
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

        # Wikipedia acts as the last-resort engine if everything else fails.
        self.fallback_engine = WikipediaSearchEngine(
            max_results=self.max_results,
            llm=llm,
            max_filtered_results=max_filtered_results,
        )
73 def _get_search_config(self) -> Dict[str, Any]:
74 """Get search config for available engines (used for reliability/strengths lookups)."""
75 return get_available_engines(
76 settings_snapshot=self.settings_snapshot,
77 use_api_key_services=self.use_api_key_services,
78 )
80 def _get_available_engines(self) -> List[str]:
81 """Get list of available engines based on user settings."""
82 available = get_available_engines(
83 settings_snapshot=self.settings_snapshot,
84 use_api_key_services=self.use_api_key_services,
85 )
87 if not available:
88 error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings."
89 logger.error(error_msg)
90 raise RuntimeError(error_msg)
92 return list(available.keys())
94 def analyze_query(self, query: str) -> List[str]:
95 """
96 Analyze the query to determine the best search engines to use.
97 Prioritizes SearXNG for general queries, but selects specialized engines
98 for domain-specific queries (e.g., scientific papers, code).
100 Args:
101 query: The search query
103 Returns:
104 List of search engine names sorted by suitability
105 """
106 try:
107 # First check if this is a specialized query that should use specific engines
108 specialized_domains = {
109 "scientific paper": ["arxiv", "pubmed", "wikipedia"],
110 "medical research": ["pubmed", "searxng"],
111 "clinical": ["pubmed", "searxng"],
112 "github": ["github", "searxng"],
113 "repository": ["github", "searxng"],
114 "code": ["github", "searxng"],
115 "programming": ["github", "searxng"],
116 }
118 # Quick heuristic check for specialized queries
119 query_lower = query.lower()
120 for term, engines in specialized_domains.items():
121 if term in query_lower:
122 valid_engines = []
123 for engine in engines:
124 if engine in self.available_engines:
125 valid_engines.append(engine)
127 if valid_engines:
128 logger.info(
129 f"Detected specialized query type: {term}, using engines: {valid_engines}"
130 )
131 return valid_engines
133 # For searches containing "arxiv", prioritize the arxiv engine
134 if "arxiv" in query_lower and "arxiv" in self.available_engines:
135 return ["arxiv"] + [
136 e for e in self.available_engines if e != "arxiv"
137 ]
139 # For searches containing "pubmed", prioritize the pubmed engine
140 if "pubmed" in query_lower and "pubmed" in self.available_engines:
141 return ["pubmed"] + [
142 e for e in self.available_engines if e != "pubmed"
143 ]
145 # Check if SearXNG is available and prioritize it for general queries
146 if "searxng" in self.available_engines:
147 # For general queries, return SearXNG first followed by reliability-ordered engines
148 engines_without_searxng = [
149 e for e in self.available_engines if e != "searxng"
150 ]
151 reliability_sorted = sorted(
152 engines_without_searxng,
153 key=lambda x: (
154 self._get_search_config()
155 .get(x, {})
156 .get("reliability", 0)
157 ),
158 reverse=True,
159 )
160 return ["searxng"] + reliability_sorted
162 # If LLM is not available or SearXNG is not available, fall back to reliability
163 if not self.llm or "searxng" not in self.available_engines: 163 ↛ 179line 163 didn't jump to line 179 because the condition on line 163 was always true
164 logger.warning(
165 "No LLM available or SearXNG not available, using reliability-based engines"
166 )
167 # Return engines sorted by reliability
168 return sorted(
169 self.available_engines,
170 key=lambda x: (
171 self._get_search_config()
172 .get(x, {})
173 .get("reliability", 0)
174 ),
175 reverse=True,
176 )
178 # Create a prompt that outlines the available search engines and their strengths
179 engines_info = []
180 for engine_name in self.available_engines:
181 try:
182 if engine_name in self._get_search_config():
183 strengths = self._get_search_config()[engine_name].get(
184 "strengths", "General search"
185 )
186 weaknesses = self._get_search_config()[engine_name].get(
187 "weaknesses", "None specified"
188 )
189 description = self._get_search_config()[
190 engine_name
191 ].get("description", engine_name)
192 engines_info.append(
193 f"- {engine_name}: {description}\n Strengths: {strengths}\n Weaknesses: {weaknesses}"
194 )
195 except KeyError:
196 logger.exception(f"Missing key for engine {engine_name}")
198 # Only proceed if we have engines available to choose from
199 if not engines_info:
200 logger.warning(
201 "No engine information available for prompt, using reliability-based sorting instead"
202 )
203 return sorted(
204 self.available_engines,
205 key=lambda x: (
206 self._get_search_config()
207 .get(x, {})
208 .get("reliability", 0)
209 ),
210 reverse=True,
211 )
213 # Use a stronger prompt that emphasizes SearXNG preference for general queries
214 prompt = f"""You are a search query analyst. Consider this search query:
216QUERY: {query}
218I have these search engines available:
219{chr(10).join(engines_info)}
221Determine which search engines would be most appropriate for answering this query.
222First analyze the nature of the query: Is it factual, scientific, code-related, medical, etc.?
224IMPORTANT GUIDELINES:
225- Use SearXNG for most general queries as it combines results from multiple search engines
226- For academic/scientific searches, prefer arXiv
227- For medical research, prefer PubMed
228- For code repositories and programming, prefer GitHub
229- For every other query type, SearXNG is usually the best option
231Output ONLY a comma-separated list of 1-3 search engine names in order of most appropriate to least appropriate.
232Example output: searxng,wikipedia,brave"""
234 # Get analysis from LLM
235 response = self.llm.invoke(prompt)
237 # Handle different response formats
238 if hasattr(response, "content"):
239 content = response.content.strip()
240 else:
241 content = str(response).strip()
243 # Extract engine names
244 valid_engines = []
245 for engine_name in content.split(","):
246 cleaned_name = engine_name.strip().lower()
247 if cleaned_name in self.available_engines:
248 valid_engines.append(cleaned_name)
250 # If SearXNG is available but not selected by the LLM, add it as a fallback
251 if (
252 "searxng" in self.available_engines
253 and "searxng" not in valid_engines
254 ):
255 # Add it as the last option if the LLM selected others
256 if valid_engines:
257 valid_engines.append("searxng")
258 # Use it as the first option if no valid engines were selected
259 else:
260 valid_engines = ["searxng"]
262 # If still no valid engines, use reliability-based ordering
263 if not valid_engines:
264 valid_engines = sorted(
265 self.available_engines,
266 key=lambda x: (
267 self._get_search_config()
268 .get(x, {})
269 .get("reliability", 0)
270 ),
271 reverse=True,
272 )
274 return valid_engines
275 except Exception:
276 logger.exception("Error analyzing query with LLM")
277 # Fall back to SearXNG if available, then reliability-based ordering
278 if "searxng" in self.available_engines:
279 return ["searxng"] + sorted(
280 [e for e in self.available_engines if e != "searxng"],
281 key=lambda x: (
282 self._get_search_config()
283 .get(x, {})
284 .get("reliability", 0)
285 ),
286 reverse=True,
287 )
288 return sorted(
289 self.available_engines,
290 key=lambda x: (
291 self._get_search_config().get(x, {}).get("reliability", 0)
292 ),
293 reverse=True,
294 )
296 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
297 """
298 Get preview information by selecting the best search engine for this query.
300 Args:
301 query: The search query
303 Returns:
304 List of preview dictionaries
305 """
306 # Get ranked list of engines for this query
307 ranked_engines = self.analyze_query(query)
309 if not ranked_engines:
310 logger.warning(
311 "No suitable search engines found for query, using fallback engine"
312 )
313 return self.fallback_engine._get_previews(query)
315 # Limit the number of engines to try
316 engines_to_try = ranked_engines[: self.max_engines_to_try]
317 logger.info(
318 f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}"
319 )
321 all_errors = []
322 # Try each engine in order
323 for engine_name in engines_to_try:
324 logger.info(f"Trying search engine: {engine_name}")
326 # Get or create the engine instance
327 engine = self._get_engine_instance(engine_name)
329 if not engine:
330 logger.warning(f"Failed to initialize {engine_name}, skipping")
331 all_errors.append(f"Failed to initialize {engine_name}")
332 continue
334 try:
335 # Get previews from this engine
336 previews = engine._get_previews(query)
338 # If search was successful, return results
339 if previews and len(previews) > 0:
340 logger.info(f"ENGINE_SELECTED: {engine_name}")
341 logger.info(
342 f"Successfully got {len(previews)} preview results from {engine_name}"
343 )
344 # Store selected engine for later use
345 self._selected_engine = engine
346 self._selected_engine_name = engine_name
348 # Emit a socket event to inform about the selected engine
349 try:
350 SocketIOService().emit_socket_event(
351 "search_engine_selected",
352 {
353 "engine": engine_name,
354 "result_count": len(previews),
355 },
356 )
357 except Exception:
358 logger.exception("Socket emit error (non-critical)")
360 return previews
362 logger.info(f"{engine_name} returned no previews")
363 all_errors.append(f"{engine_name} returned no previews")
365 except Exception as e:
366 error_msg = f"Error getting previews from {engine_name}: {e!s}"
367 logger.exception(error_msg)
368 all_errors.append(error_msg)
370 # If we reach here, all engines failed, use fallback
371 logger.warning(
372 f"All engines failed or returned no preview results: {', '.join(all_errors)}"
373 )
374 logger.info("Using fallback Wikipedia engine for previews")
375 self._selected_engine = self.fallback_engine
376 self._selected_engine_name = "wikipedia"
377 return self.fallback_engine._get_previews(query)
379 def _get_full_content(
380 self, relevant_items: List[Dict[str, Any]]
381 ) -> List[Dict[str, Any]]:
382 """
383 Get full content using the engine that provided the previews.
385 Args:
386 relevant_items: List of relevant preview dictionaries
388 Returns:
389 List of result dictionaries with full content
390 """
391 # Check if we should get full content
392 if get_setting_from_snapshot(
393 "search.snippets_only",
394 True,
395 settings_snapshot=self.settings_snapshot,
396 ):
397 logger.info("Snippet-only mode, skipping full content retrieval")
398 return relevant_items
400 logger.info("Getting full content for relevant items")
402 # Use the selected engine to get full content
403 if hasattr(self, "_selected_engine"):
404 try:
405 logger.info(
406 f"Using {self._selected_engine_name} to get full content"
407 )
408 return self._selected_engine._get_full_content(relevant_items)
409 except Exception:
410 logger.exception(
411 f"Error getting full content from {self._selected_engine_name}"
412 )
413 # Fall back to returning relevant items without full content
414 return relevant_items
415 else:
416 logger.warning(
417 "No engine was selected during preview phase, returning relevant items as-is"
418 )
419 return relevant_items
421 def _get_engine_instance(
422 self, engine_name: str
423 ) -> Optional[BaseSearchEngine]:
424 """Get or create an instance of the specified search engine"""
425 # Return cached instance if available
426 if engine_name in self.engine_cache:
427 return self.engine_cache[engine_name] # type: ignore[no-any-return]
429 # Create a new instance
430 engine = None
431 try:
432 # Only pass parameters that all engines accept
433 common_params = {"llm": self.llm, "max_results": self.max_results}
435 # Add max_filtered_results if specified
436 if self.max_filtered_results is not None: 436 ↛ 441line 436 didn't jump to line 441 because the condition on line 436 was always true
437 common_params["max_filtered_results"] = (
438 self.max_filtered_results
439 )
441 engine = create_search_engine(
442 engine_name,
443 settings_snapshot=self.settings_snapshot,
444 programmatic_mode=self.programmatic_mode,
445 **common_params, # type: ignore[arg-type]
446 )
447 except Exception:
448 logger.exception(
449 f"Error creating engine instance for {engine_name}"
450 )
451 return None
453 if engine:
454 # Cache the instance
455 self.engine_cache[engine_name] = engine
457 return engine
459 def close(self):
460 """Close all cached child search engines, fallback engine, and own resources."""
461 from ...utilities.resource_utils import safe_close
463 for engine in self.engine_cache.values():
464 safe_close(engine, "child search engine")
465 self.engine_cache.clear()
466 if (
467 hasattr(self, "fallback_engine")
468 and self.fallback_engine is not None
469 ):
470 safe_close(self.fallback_engine, "fallback search engine")
471 super().close()
473 def invoke(self, query: str) -> List[Dict[str, Any]]:
474 """Compatibility method for LangChain tools"""
475 return self.run(query)