Coverage for src/local_deep_research/web_search_engines/engines/meta_search_engine.py: 80%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1from typing import Any, Dict, List, Optional 

2 

3from loguru import logger 

4 

5from ...config.search_config import get_setting_from_snapshot 

6from ...web.services.socket_service import SocketIOService 

7from ..search_engine_base import BaseSearchEngine 

8from ..search_engine_factory import create_search_engine 

9from .search_engine_wikipedia import WikipediaSearchEngine 

10 

11 

class MetaSearchEngine(BaseSearchEngine):
    """
    LLM-powered meta search engine that intelligently selects and uses
    the appropriate search engines based on query analysis
    """

    def __init__(
        self,
        llm,
        max_results: int = 10,
        use_api_key_services: bool = True,
        max_engines_to_try: int = 3,
        max_filtered_results: Optional[int] = None,
        _engine_selection_callback=None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        programmatic_mode: bool = False,
        **kwargs,
    ):
        """
        Initialize the meta search engine.

        Args:
            llm: Language model instance for query classification and relevance filtering
            max_results: Maximum number of search results to return
            use_api_key_services: Whether to include services that require API keys
            max_engines_to_try: Maximum number of engines to try before giving up
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
            programmatic_mode: If True, disables database operations and metrics tracking
            **kwargs: Additional parameters (ignored but accepted for compatibility)
        """
        # Let the base class wire up the LLM, result limits, snapshot and mode.
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )

        self.use_api_key_services = use_api_key_services
        self.max_engines_to_try = max_engines_to_try
        self.settings_snapshot = settings_snapshot

        # Lazily-built engine instances, keyed by engine name.
        self.engine_cache = {}

        # Engines eligible for auto search ('meta' and 'auto' are excluded).
        self.available_engines = self._get_available_engines()
        logger.info(
            f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
        )

        # Last-resort engine used when every selected engine fails.
        self.fallback_engine = WikipediaSearchEngine(
            max_results=self.max_results,
            llm=llm,
            max_filtered_results=max_filtered_results,
        )

72 def _get_search_config(self) -> Dict[str, Any]: 

73 """Get search config from settings_snapshot or return empty dict.""" 

74 if self.settings_snapshot: 

75 # Extract search engine configs from settings snapshot 

76 config_data = {} 

77 for key, value in self.settings_snapshot.items(): 

78 if key.startswith("search.engine.web."): 

79 parts = key.split(".") 

80 if len(parts) >= 4: 80 ↛ 77line 80 didn't jump to line 77 because the condition on line 80 was always true

81 engine_name = parts[3] 

82 if engine_name not in config_data: 

83 config_data[engine_name] = {} 

84 remaining_key = ( 

85 ".".join(parts[4:]) if len(parts) > 4 else "" 

86 ) 

87 if remaining_key: 87 ↛ 77line 87 didn't jump to line 77 because the condition on line 87 was always true

88 config_data[engine_name][remaining_key] = ( 

89 value.get("value") 

90 if isinstance(value, dict) 

91 else value 

92 ) 

93 

94 # Also check for auto engine 

95 if "search.engine.auto.class_name" in self.settings_snapshot: 

96 config_data["auto"] = {} 

97 for key, value in self.settings_snapshot.items(): 

98 if key.startswith("search.engine.auto."): 

99 remaining_key = key.replace("search.engine.auto.", "") 

100 config_data["auto"][remaining_key] = ( 

101 value.get("value") 

102 if isinstance(value, dict) 

103 else value 

104 ) 

105 return config_data 

106 else: 

107 logger.warning( 

108 "No settings_snapshot provided to MetaSearchEngine, " 

109 "returning empty search config" 

110 ) 

111 return {} 

112 

113 def _get_available_engines(self) -> List[str]: 

114 """Get list of available engines, excluding 'meta' and 'auto', based on user settings""" 

115 # Filter out 'meta' and 'auto' and check API key availability 

116 available = [] 

117 

118 # Get search config using helper method 

119 config_data = self._get_search_config() 

120 

121 for name, config_ in config_data.items(): 

122 if name in ["meta", "auto"]: 

123 continue 

124 

125 # Determine if this is a local engine (starts with "local.") 

126 is_local_engine = name.startswith("local.") 

127 

128 # Determine the appropriate setting path based on engine type 

129 if is_local_engine: 

130 # Format: search.engine.local.{engine_name}.use_in_auto_search 

131 local_name = name.replace("local.", "") 

132 auto_search_setting = ( 

133 f"search.engine.local.{local_name}.use_in_auto_search" 

134 ) 

135 else: 

136 # Format: search.engine.web.{engine_name}.use_in_auto_search 

137 auto_search_setting = ( 

138 f"search.engine.web.{name}.use_in_auto_search" 

139 ) 

140 

141 # Get setting from database, default to False if not found 

142 use_in_auto_search = get_setting_from_snapshot( 

143 auto_search_setting, 

144 False, 

145 settings_snapshot=self.settings_snapshot, 

146 ) 

147 

148 # Skip engines that aren't enabled for auto search 

149 if not use_in_auto_search: 

150 logger.info( 

151 f"Skipping {name} engine because it's not enabled for auto search" 

152 ) 

153 continue 

154 

155 # Skip engines that require API keys if we don't want to use them 

156 if ( 

157 config_.get("requires_api_key", False) 

158 and not self.use_api_key_services 

159 ): 

160 continue 

161 

162 # Skip engines that require API keys if the key is not available 

163 if config_.get("requires_api_key", False): 

164 api_key = config_.get("api_key") 

165 if not api_key: 

166 continue 

167 

168 available.append(name) 

169 

170 # If no engines are available, raise an error instead of falling back silently 

171 if not available: 

172 error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings." 

173 logger.error(error_msg) 

174 raise RuntimeError(error_msg) 

175 

176 return available 

177 

178 def analyze_query(self, query: str) -> List[str]: 

179 """ 

180 Analyze the query to determine the best search engines to use. 

181 Prioritizes SearXNG for general queries, but selects specialized engines 

182 for domain-specific queries (e.g., scientific papers, code). 

183 

184 Args: 

185 query: The search query 

186 

187 Returns: 

188 List of search engine names sorted by suitability 

189 """ 

190 try: 

191 # First check if this is a specialized query that should use specific engines 

192 specialized_domains = { 

193 "scientific paper": ["arxiv", "pubmed", "wikipedia"], 

194 "medical research": ["pubmed", "searxng"], 

195 "clinical": ["pubmed", "searxng"], 

196 "github": ["github", "searxng"], 

197 "repository": ["github", "searxng"], 

198 "code": ["github", "searxng"], 

199 "programming": ["github", "searxng"], 

200 } 

201 

202 # Quick heuristic check for specialized queries 

203 query_lower = query.lower() 

204 for term, engines in specialized_domains.items(): 

205 if term in query_lower: 

206 valid_engines = [] 

207 for engine in engines: 

208 if engine in self.available_engines: 

209 valid_engines.append(engine) 

210 

211 if valid_engines: 211 ↛ 204line 211 didn't jump to line 204 because the condition on line 211 was always true

212 logger.info( 

213 f"Detected specialized query type: {term}, using engines: {valid_engines}" 

214 ) 

215 return valid_engines 

216 

217 # For searches containing "arxiv", prioritize the arxiv engine 

218 if "arxiv" in query_lower and "arxiv" in self.available_engines: 

219 return ["arxiv"] + [ 

220 e for e in self.available_engines if e != "arxiv" 

221 ] 

222 

223 # For searches containing "pubmed", prioritize the pubmed engine 

224 if "pubmed" in query_lower and "pubmed" in self.available_engines: 

225 return ["pubmed"] + [ 

226 e for e in self.available_engines if e != "pubmed" 

227 ] 

228 

229 # Check if SearXNG is available and prioritize it for general queries 

230 if "searxng" in self.available_engines: 

231 # For general queries, return SearXNG first followed by reliability-ordered engines 

232 engines_without_searxng = [ 

233 e for e in self.available_engines if e != "searxng" 

234 ] 

235 reliability_sorted = sorted( 

236 engines_without_searxng, 

237 key=lambda x: self._get_search_config() 

238 .get(x, {}) 

239 .get("reliability", 0), 

240 reverse=True, 

241 ) 

242 return ["searxng"] + reliability_sorted 

243 

244 # If LLM is not available or SearXNG is not available, fall back to reliability 

245 if not self.llm or "searxng" not in self.available_engines: 245 ↛ 259line 245 didn't jump to line 259 because the condition on line 245 was always true

246 logger.warning( 

247 "No LLM available or SearXNG not available, using reliability-based engines" 

248 ) 

249 # Return engines sorted by reliability 

250 return sorted( 

251 self.available_engines, 

252 key=lambda x: self._get_search_config() 

253 .get(x, {}) 

254 .get("reliability", 0), 

255 reverse=True, 

256 ) 

257 

258 # Create a prompt that outlines the available search engines and their strengths 

259 engines_info = [] 

260 for engine_name in self.available_engines: 

261 try: 

262 if engine_name in self._get_search_config(): 

263 strengths = self._get_search_config()[engine_name].get( 

264 "strengths", "General search" 

265 ) 

266 weaknesses = self._get_search_config()[engine_name].get( 

267 "weaknesses", "None specified" 

268 ) 

269 description = self._get_search_config()[ 

270 engine_name 

271 ].get("description", engine_name) 

272 engines_info.append( 

273 f"- {engine_name}: {description}\n Strengths: {strengths}\n Weaknesses: {weaknesses}" 

274 ) 

275 except KeyError: 

276 logger.exception(f"Missing key for engine {engine_name}") 

277 

278 # Only proceed if we have engines available to choose from 

279 if not engines_info: 

280 logger.warning( 

281 "No engine information available for prompt, using reliability-based sorting instead" 

282 ) 

283 return sorted( 

284 self.available_engines, 

285 key=lambda x: self._get_search_config() 

286 .get(x, {}) 

287 .get("reliability", 0), 

288 reverse=True, 

289 ) 

290 

291 # Use a stronger prompt that emphasizes SearXNG preference for general queries 

292 prompt = f"""You are a search query analyst. Consider this search query: 

293 

294QUERY: {query} 

295 

296I have these search engines available: 

297{chr(10).join(engines_info)} 

298 

299Determine which search engines would be most appropriate for answering this query. 

300First analyze the nature of the query: Is it factual, scientific, code-related, medical, etc.? 

301 

302IMPORTANT GUIDELINES: 

303- Use SearXNG for most general queries as it combines results from multiple search engines 

304- For academic/scientific searches, prefer arXiv 

305- For medical research, prefer PubMed 

306- For code repositories and programming, prefer GitHub 

307- For every other query type, SearXNG is usually the best option 

308 

309Output ONLY a comma-separated list of 1-3 search engine names in order of most appropriate to least appropriate. 

310Example output: searxng,wikipedia,brave""" 

311 

312 # Get analysis from LLM 

313 response = self.llm.invoke(prompt) 

314 

315 # Handle different response formats 

316 if hasattr(response, "content"): 

317 content = response.content.strip() 

318 else: 

319 content = str(response).strip() 

320 

321 # Extract engine names 

322 valid_engines = [] 

323 for engine_name in content.split(","): 

324 cleaned_name = engine_name.strip().lower() 

325 if cleaned_name in self.available_engines: 

326 valid_engines.append(cleaned_name) 

327 

328 # If SearXNG is available but not selected by the LLM, add it as a fallback 

329 if ( 

330 "searxng" in self.available_engines 

331 and "searxng" not in valid_engines 

332 ): 

333 # Add it as the last option if the LLM selected others 

334 if valid_engines: 

335 valid_engines.append("searxng") 

336 # Use it as the first option if no valid engines were selected 

337 else: 

338 valid_engines = ["searxng"] 

339 

340 # If still no valid engines, use reliability-based ordering 

341 if not valid_engines: 

342 valid_engines = sorted( 

343 self.available_engines, 

344 key=lambda x: self._get_search_config() 

345 .get(x, {}) 

346 .get("reliability", 0), 

347 reverse=True, 

348 ) 

349 

350 return valid_engines 

351 except Exception: 

352 logger.exception("Error analyzing query with LLM") 

353 # Fall back to SearXNG if available, then reliability-based ordering 

354 if "searxng" in self.available_engines: 

355 return ["searxng"] + sorted( 

356 [e for e in self.available_engines if e != "searxng"], 

357 key=lambda x: self._get_search_config() 

358 .get(x, {}) 

359 .get("reliability", 0), 

360 reverse=True, 

361 ) 

362 else: 

363 return sorted( 

364 self.available_engines, 

365 key=lambda x: self._get_search_config() 

366 .get(x, {}) 

367 .get("reliability", 0), 

368 reverse=True, 

369 ) 

370 

371 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

372 """ 

373 Get preview information by selecting the best search engine for this query. 

374 

375 Args: 

376 query: The search query 

377 

378 Returns: 

379 List of preview dictionaries 

380 """ 

381 # Get ranked list of engines for this query 

382 ranked_engines = self.analyze_query(query) 

383 

384 if not ranked_engines: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true

385 logger.warning( 

386 "No suitable search engines found for query, using fallback engine" 

387 ) 

388 return self.fallback_engine._get_previews(query) 

389 

390 # Limit the number of engines to try 

391 engines_to_try = ranked_engines[: self.max_engines_to_try] 

392 logger.info( 

393 f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}" 

394 ) 

395 

396 all_errors = [] 

397 # Try each engine in order 

398 for engine_name in engines_to_try: 

399 logger.info(f"Trying search engine: {engine_name}") 

400 

401 # Get or create the engine instance 

402 engine = self._get_engine_instance(engine_name) 

403 

404 if not engine: 

405 logger.warning(f"Failed to initialize {engine_name}, skipping") 

406 all_errors.append(f"Failed to initialize {engine_name}") 

407 continue 

408 

409 try: 

410 # Get previews from this engine 

411 previews = engine._get_previews(query) 

412 

413 # If search was successful, return results 

414 if previews and len(previews) > 0: 

415 logger.info(f"ENGINE_SELECTED: {engine_name}") 

416 logger.info( 

417 f"Successfully got {len(previews)} preview results from {engine_name}" 

418 ) 

419 # Store selected engine for later use 

420 self._selected_engine = engine 

421 self._selected_engine_name = engine_name 

422 

423 # Emit a socket event to inform about the selected engine 

424 try: 

425 SocketIOService().emit_socket_event( 

426 "search_engine_selected", 

427 { 

428 "engine": engine_name, 

429 "result_count": len(previews), 

430 }, 

431 ) 

432 except Exception: 

433 logger.exception("Socket emit error (non-critical)") 

434 

435 return previews 

436 

437 logger.info(f"{engine_name} returned no previews") 

438 all_errors.append(f"{engine_name} returned no previews") 

439 

440 except Exception as e: 

441 error_msg = f"Error getting previews from {engine_name}: {e!s}" 

442 logger.exception(error_msg) 

443 all_errors.append(error_msg) 

444 

445 # If we reach here, all engines failed, use fallback 

446 logger.warning( 

447 f"All engines failed or returned no preview results: {', '.join(all_errors)}" 

448 ) 

449 logger.info("Using fallback Wikipedia engine for previews") 

450 self._selected_engine = self.fallback_engine 

451 self._selected_engine_name = "wikipedia" 

452 return self.fallback_engine._get_previews(query) 

453 

454 def _get_full_content( 

455 self, relevant_items: List[Dict[str, Any]] 

456 ) -> List[Dict[str, Any]]: 

457 """ 

458 Get full content using the engine that provided the previews. 

459 

460 Args: 

461 relevant_items: List of relevant preview dictionaries 

462 

463 Returns: 

464 List of result dictionaries with full content 

465 """ 

466 # Check if we should get full content 

467 if get_setting_from_snapshot( 

468 "search.snippets_only", 

469 True, 

470 settings_snapshot=self.settings_snapshot, 

471 ): 

472 logger.info("Snippet-only mode, skipping full content retrieval") 

473 return relevant_items 

474 

475 logger.info("Getting full content for relevant items") 

476 

477 # Use the selected engine to get full content 

478 if hasattr(self, "_selected_engine"): 

479 try: 

480 logger.info( 

481 f"Using {self._selected_engine_name} to get full content" 

482 ) 

483 return self._selected_engine._get_full_content(relevant_items) 

484 except Exception: 

485 logger.exception( 

486 f"Error getting full content from {self._selected_engine_name}" 

487 ) 

488 # Fall back to returning relevant items without full content 

489 return relevant_items 

490 else: 

491 logger.warning( 

492 "No engine was selected during preview phase, returning relevant items as-is" 

493 ) 

494 return relevant_items 

495 

496 def _get_engine_instance( 

497 self, engine_name: str 

498 ) -> Optional[BaseSearchEngine]: 

499 """Get or create an instance of the specified search engine""" 

500 # Return cached instance if available 

501 if engine_name in self.engine_cache: 

502 return self.engine_cache[engine_name] 

503 

504 # Create a new instance 

505 engine = None 

506 try: 

507 # Only pass parameters that all engines accept 

508 common_params = {"llm": self.llm, "max_results": self.max_results} 

509 

510 # Add max_filtered_results if specified 

511 if self.max_filtered_results is not None: 511 ↛ 516line 511 didn't jump to line 516 because the condition on line 511 was always true

512 common_params["max_filtered_results"] = ( 

513 self.max_filtered_results 

514 ) 

515 

516 engine = create_search_engine( 

517 engine_name, 

518 settings_snapshot=self.settings_snapshot, 

519 programmatic_mode=self.programmatic_mode, 

520 **common_params, 

521 ) 

522 except Exception: 

523 logger.exception( 

524 f"Error creating engine instance for {engine_name}" 

525 ) 

526 return None 

527 

528 if engine: 528 ↛ 532line 528 didn't jump to line 532 because the condition on line 528 was always true

529 # Cache the instance 

530 self.engine_cache[engine_name] = engine 

531 

532 return engine 

533 

534 def invoke(self, query: str) -> List[Dict[str, Any]]: 

535 """Compatibility method for LangChain tools""" 

536 return self.run(query)