Coverage for src / local_deep_research / web_search_engines / engines / meta_search_engine.py: 78%

163 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1from typing import Any, Dict, List, Optional 

2 

3from loguru import logger 

4 

5from ...config.search_config import get_setting_from_snapshot 

6from ...web.services.socket_service import SocketIOService 

7from ..search_engine_base import BaseSearchEngine 

8from ..search_engine_factory import create_search_engine 

9from ..search_engines_config import get_available_engines 

10from .search_engine_wikipedia import WikipediaSearchEngine 

11 

12 

class MetaSearchEngine(BaseSearchEngine):
    """
    Meta search engine that selects suitable child search engines for a
    query, tries them in order, and returns the first non-empty result set.

    Engine selection is heuristic: keyword matching for specialized topics,
    SearXNG first for general queries (when available), and the configured
    ``reliability`` value of each engine as the ordering criterion otherwise.
    A Wikipedia engine is kept as a last-resort fallback.
    """

    # Query keyword -> engines to prefer, in priority order. Matching is a
    # plain substring test against the lower-cased query, and the first
    # keyword (in this insertion order) that yields an available engine wins.
    _SPECIALIZED_DOMAINS: Dict[str, List[str]] = {
        "scientific paper": ["arxiv", "pubmed", "wikipedia"],
        "medical research": ["pubmed", "searxng"],
        "clinical": ["pubmed", "searxng"],
        "github": ["github", "searxng"],
        "repository": ["github", "searxng"],
        "code": ["github", "searxng"],
        "programming": ["github", "searxng"],
    }

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_try: int = 3,
        max_filtered_results: Optional[int] = None,
        _engine_selection_callback=None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the meta search engine.

        Args:
            llm: Language model instance; forwarded to the base class, to the
                child engines and to the fallback engine
            max_results: Maximum number of search results to return
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_try: Maximum number of engines to try before giving up
            max_filtered_results: Maximum number of results to keep after filtering
            _engine_selection_callback: Accepted for compatibility; not
                referenced anywhere in this class
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            **kwargs: Additional parameters (ignored but accepted for compatibility)

        Raises:
            RuntimeError: If no search engine is enabled.
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_try = max_engines_to_try
        self.settings_snapshot = settings_snapshot or {}

        # Cache of already-created child engine instances, keyed by name.
        # Assigned before anything below can raise so that close() can
        # always iterate it.
        self.engine_cache: Dict[str, Any] = {}

        # Names of all engines that may be used for this instance.
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

        # Last-resort engine used when every selected engine fails.
        self.fallback_engine = WikipediaSearchEngine(
            max_results=self.max_results,
            llm=llm,
            max_filtered_results=max_filtered_results,
        )

    def _get_search_config(self) -> Dict[str, Any]:
        """Get search config for available engines (used for reliability lookups)."""
        return get_available_engines(
            settings_snapshot=self.settings_snapshot,
            use_api_key_services=self.use_api_key_services,
        )

    def _get_available_engines(self) -> List[str]:
        """
        Get the names of the available engines based on user settings.

        Raises:
            RuntimeError: If the engine configuration is empty.
        """
        available = self._get_search_config()

        if not available:
            error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings."
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        return list(available)

    def _sort_by_reliability(self, engines: List[str]) -> List[str]:
        """
        Return ``engines`` ordered by configured reliability, highest first.

        Engines without a config entry or without a ``reliability`` value
        count as 0. The sort is stable, so ties keep their input order.
        """
        if not engines:
            # Nothing to rank, so skip the config lookup entirely.
            return []

        # Fetch the config once instead of once per compared element.
        config = self._get_search_config()
        return sorted(
            engines,
            key=lambda name: config.get(name, {}).get("reliability", 0),
            reverse=True,
        )

    def _searxng_first(self) -> List[str]:
        """Return SearXNG followed by all other engines sorted by reliability."""
        others = [e for e in self.available_engines if e != "searxng"]
        return ["searxng"] + self._sort_by_reliability(others)

    def analyze_query(self, query: str) -> List[str]:
        """
        Analyze the query to determine the best search engines to use.
        Prioritizes SearXNG for general queries, but selects specialized engines
        for domain-specific queries (e.g., scientific papers, code).

        NOTE: selection is purely heuristic; this method never calls the LLM.

        Args:
            query: The search query

        Returns:
            List of search engine names sorted by suitability
        """
        try:
            query_lower = query.lower()

            # 1. Specialized topics: the first keyword with at least one
            #    available engine decides the result.
            for term, engines in self._SPECIALIZED_DOMAINS.items():
                if term not in query_lower:
                    continue
                valid_engines = [
                    e for e in engines if e in self.available_engines
                ]
                if valid_engines:
                    logger.info(
                        f"Detected specialized query type: {term}, using engines: {valid_engines}"
                    )
                    return valid_engines

            # 2. An engine named explicitly in the query goes first; the
            #    remaining engines keep their configured order.
            for preferred in ("arxiv", "pubmed"):
                if (
                    preferred in query_lower
                    and preferred in self.available_engines
                ):
                    return [preferred] + [
                        e for e in self.available_engines if e != preferred
                    ]

            # 3. General queries: SearXNG first when it is available.
            if "searxng" in self.available_engines:
                return self._searxng_first()

            # 4. Without SearXNG, rank everything by reliability.
            logger.warning(
                "No LLM available or SearXNG not available, using reliability-based engines"
            )
            return self._sort_by_reliability(self.available_engines)
        except Exception:
            logger.exception("Error analyzing query")
            # Fall back to SearXNG if available, then reliability-based ordering
            if "searxng" in self.available_engines:
                return self._searxng_first()
            return self._sort_by_reliability(self.available_engines)

    def _get_fallback_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get previews from the fallback engine and record it as the selected
        engine so that _get_full_content() uses the matching engine.
        """
        self._selected_engine = self.fallback_engine
        self._selected_engine_name = "wikipedia"
        return self.fallback_engine._get_previews(query)

    def _notify_engine_selected(
        self, engine_name: str, result_count: int
    ) -> None:
        """Emit a best-effort socket event naming the engine that delivered results."""
        try:
            SocketIOService().emit_socket_event(
                "search_engine_selected",
                {
                    "engine": engine_name,
                    "result_count": result_count,
                },
            )
        except Exception:
            # The notification is informational only; never fail the search.
            logger.exception("Socket emit error (non-critical)")

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information by selecting the best search engine for this query.

        Engines are tried in ranked order (at most ``max_engines_to_try``);
        the first one returning a non-empty result wins. If none does, the
        fallback engine is used.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        ranked_engines = self.analyze_query(query)

        if not ranked_engines:
            logger.warning(
                "No suitable search engines found for query, using fallback engine"
            )
            return self._get_fallback_previews(query)

        engines_to_try = ranked_engines[: self.max_engines_to_try]
        logger.info(
            f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}"
        )

        all_errors = []
        for engine_name in engines_to_try:
            logger.info(f"Trying search engine: {engine_name}")

            engine = self._get_engine_instance(engine_name)
            if not engine:
                logger.warning(f"Failed to initialize {engine_name}, skipping")
                all_errors.append(f"Failed to initialize {engine_name}")
                continue

            try:
                previews = engine._get_previews(query)

                if previews:
                    result_count = len(previews)
                    logger.info(f"ENGINE_SELECTED: {engine_name}")
                    logger.info(
                        f"Successfully got {result_count} preview results from {engine_name}"
                    )
                    # Remember which engine produced the previews so the
                    # full-content phase can use the same one.
                    self._selected_engine = engine
                    self._selected_engine_name = engine_name

                    self._notify_engine_selected(engine_name, result_count)
                    return previews

                logger.info(f"{engine_name} returned no previews")
                all_errors.append(f"{engine_name} returned no previews")

            except Exception as e:
                # A failing engine must not abort the search; try the next one.
                error_msg = f"Error getting previews from {engine_name}: {e!s}"
                logger.exception(error_msg)
                all_errors.append(error_msg)

        # If we reach here, all engines failed, use fallback
        logger.warning(
            f"All engines failed or returned no preview results: {', '.join(all_errors)}"
        )
        logger.info("Using fallback Wikipedia engine for previews")
        return self._get_fallback_previews(query)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content using the engine that provided the previews.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content. The input items
            are returned unchanged in snippet-only mode, when no engine was
            selected, or when the selected engine raises.
        """
        if get_setting_from_snapshot(
            "search.snippets_only",
            True,
            settings_snapshot=self.settings_snapshot,
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant items")

        selected_engine = getattr(self, "_selected_engine", None)
        if selected_engine is None:
            logger.warning(
                "No engine was selected during preview phase, returning relevant items as-is"
            )
            return relevant_items

        # Read the name defensively so that logging can never raise,
        # including inside the except handler below.
        selected_name = getattr(self, "_selected_engine_name", "unknown")
        try:
            logger.info(f"Using {selected_name} to get full content")
            return selected_engine._get_full_content(relevant_items)
        except Exception:
            logger.exception(
                f"Error getting full content from {selected_name}"
            )
            # Fall back to returning relevant items without full content
            return relevant_items

    def _get_engine_instance(
        self, engine_name: str
    ) -> Optional[BaseSearchEngine]:
        """
        Get or create an instance of the specified search engine.

        Returns:
            The cached or newly created engine, or None if creation failed.
            Only successfully created engines are cached.
        """
        if engine_name in self.engine_cache:
            return self.engine_cache[engine_name]  # type: ignore[no-any-return]

        try:
            # Only pass parameters that all engines accept
            common_params = {"llm": self.llm, "max_results": self.max_results}

            if self.max_filtered_results is not None:
                common_params["max_filtered_results"] = (
                    self.max_filtered_results
                )

            engine = create_search_engine(
                engine_name,
                settings_snapshot=self.settings_snapshot,
                programmatic_mode=self.programmatic_mode,
                **common_params,  # type: ignore[arg-type]
            )
        except Exception:
            logger.exception(
                f"Error creating engine instance for {engine_name}"
            )
            return None

        if engine:
            self.engine_cache[engine_name] = engine

        return engine

    def close(self):
        """Close all cached child search engines, fallback engine, and own resources."""
        from ...utilities.resource_utils import safe_close

        for engine in self.engine_cache.values():
            safe_close(engine, "child search engine")
        self.engine_cache.clear()

        # __init__ can raise before fallback_engine is assigned, so the
        # attribute may be missing on a partially constructed instance.
        fallback = getattr(self, "fallback_engine", None)
        if fallback is not None:
            safe_close(fallback, "fallback search engine")

        super().close()

    def invoke(self, query: str) -> List[Dict[str, Any]]:
        """Compatibility method for LangChain tools"""
        return self.run(query)