Coverage for src / local_deep_research / web_search_engines / engines / meta_search_engine.py: 6%

193 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1from typing import Any, Dict, List, Optional 

2 

3from loguru import logger 

4 

5from ...config.search_config import get_setting_from_snapshot 

6from ...web.services.socket_service import SocketIOService 

7from ..search_engine_base import BaseSearchEngine 

8from ..search_engine_factory import create_search_engine 

9from .search_engine_wikipedia import WikipediaSearchEngine 

10 

11 

12class MetaSearchEngine(BaseSearchEngine): 

13 """ 

14 LLM-powered meta search engine that intelligently selects and uses 

15 the appropriate search engines based on query analysis 

16 """ 

17 

def __init__(
    self,
    llm,
    max_results: int = 10,
    use_api_key_services: bool = True,
    max_engines_to_try: int = 3,
    max_filtered_results: Optional[int] = None,
    engine_selection_callback=None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
    **kwargs,
):
    """
    Set up the meta search engine and discover the engines it can delegate to.

    Args:
        llm: Language model instance for query classification and relevance filtering
        max_results: Maximum number of search results to return
        use_api_key_services: Whether to include services that require API keys
        max_engines_to_try: Maximum number of engines to try before giving up
        max_filtered_results: Maximum number of results to keep after filtering
        engine_selection_callback: Accepted for compatibility; this constructor does not use it
        settings_snapshot: Settings snapshot for thread context
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters (ignored but accepted for compatibility)

    Raises:
        RuntimeError: Propagated from _get_available_engines() when no engine
            is enabled for auto search.
    """
    # Hand the shared options over to BaseSearchEngine in one go.
    base_options = {
        "llm": llm,
        "max_results": max_results,
        "max_filtered_results": max_filtered_results,
        "settings_snapshot": settings_snapshot,
        "programmatic_mode": programmatic_mode,
    }
    super().__init__(**base_options)

    # Instance state. The snapshot has to be in place before engine discovery
    # below, because _get_available_engines() reads it.
    self.settings_snapshot = settings_snapshot
    self.max_engines_to_try = max_engines_to_try
    self.use_api_key_services = use_api_key_services

    # Engine instances created on demand, keyed by engine name.
    self.engine_cache = {}

    # Engines usable for automatic selection ('meta' and 'auto' are excluded).
    self.available_engines = self._get_available_engines()
    logger.info(
        f"Meta Search Engine initialized with {len(self.available_engines)} available engines: {', '.join(self.available_engines)}"
    )

    # Last-resort engine used when every selected engine fails or returns nothing.
    self.fallback_engine = WikipediaSearchEngine(
        llm=llm,
        max_results=self.max_results,
        max_filtered_results=max_filtered_results,
    )

71 

def _get_search_config(self) -> Dict[str, Any]:
    """
    Build a per-engine configuration mapping from the settings snapshot.

    Snapshot keys of the form ``search.engine.web.<engine>.<option>`` are
    collected as ``config[<engine>][<option>]``; ``<option>`` may itself
    contain dots. A key that names only the engine creates an empty entry for
    it. When ``search.engine.auto.class_name`` is present, every
    ``search.engine.auto.<option>`` key is collected under ``config["auto"]``.
    Snapshot values that are dicts are unwrapped to their ``"value"`` entry;
    any other value is used as-is.

    Returns:
        Mapping of engine name to its option dict. Empty when there is no
        settings snapshot (None or an empty mapping).
    """
    snapshot = self.settings_snapshot
    if not snapshot:
        # There is no other configuration source available here. The previous
        # fallback called this same method again, which never terminated and
        # surfaced as a RecursionError.
        return {}

    web_prefix = "search.engine.web."
    auto_prefix = "search.engine.auto."

    def unwrap(raw):
        # Snapshot entries may be wrapped as {"value": ...} or stored bare.
        return raw.get("value") if isinstance(raw, dict) else raw

    config_data: Dict[str, Any] = {}
    for key, value in snapshot.items():
        if not key.startswith(web_prefix):
            continue
        # The prefix guarantees at least four dot-separated parts:
        # "search", "engine", "web", <engine>, [<option> ...]
        parts = key.split(".")
        engine_options = config_data.setdefault(parts[3], {})
        option_name = ".".join(parts[4:])
        if option_name:
            engine_options[option_name] = unwrap(value)

    # The auto engine lives outside the "web" namespace; it is only included
    # when its class name is configured.
    if "search.engine.auto.class_name" in snapshot:
        config_data["auto"] = {
            # Strip only the leading prefix from the key.
            key[len(auto_prefix):]: unwrap(value)
            for key, value in snapshot.items()
            if key.startswith(auto_prefix)
        }

    return config_data

109 

def _get_available_engines(self) -> List[str]:
    """
    Collect the engines that may take part in automatic engine selection.

    An engine from _get_search_config() is kept when all of these hold:
      * it is not 'meta' or 'auto';
      * its ``use_in_auto_search`` setting is truthy (default False);
      * if its config has a truthy ``requires_api_key``, API-key services
        are allowed and the config has a non-empty ``api_key``.

    Returns:
        Engine names in the iteration order of the search config.

    Raises:
        RuntimeError: If no engine passes the filters.
    """
    selected = []

    for name, engine_config in self._get_search_config().items():
        if name in ("meta", "auto"):
            continue

        # Local engines are named "local.<engine>" and keep their settings
        # under search.engine.local.*, everything else under search.engine.web.*
        # NOTE(review): _get_search_config() as written only yields dot-free
        # web engine names plus "auto", so this branch looks unreachable - confirm.
        if name.startswith("local."):
            local_name = name.replace("local.", "")
            auto_search_setting = (
                f"search.engine.local.{local_name}.use_in_auto_search"
            )
        else:
            auto_search_setting = f"search.engine.web.{name}.use_in_auto_search"

        # Engines are opt-in: a missing setting counts as disabled.
        enabled = get_setting_from_snapshot(
            auto_search_setting,
            False,
            settings_snapshot=self.settings_snapshot,
        )
        if not enabled:
            logger.info(
                f"Skipping {name} engine because it's not enabled for auto search"
            )
            continue

        # Engines needing an API key are dropped when such services are
        # disallowed or when no key is configured.
        if engine_config.get("requires_api_key", False) and (
            not self.use_api_key_services or not engine_config.get("api_key")
        ):
            continue

        selected.append(name)

    if selected:
        return selected

    # Fail loudly rather than silently falling back to some default engine.
    error_msg = "No search engines enabled for auto search. Please enable at least one engine in settings."
    logger.error(error_msg)
    raise RuntimeError(error_msg)

174 

def analyze_query(self, query: str) -> List[str]:
    """
    Rank the available search engines for a query.

    Rules, applied in order; the first that matches decides the result:
      1. The query contains a specialized-domain phrase (e.g. "scientific
         paper", "github", "code") and at least one engine listed for that
         phrase is available: return those engines, in the listed order.
      2. The query mentions "arxiv" (then "pubmed") and that engine is
         available: return it first, followed by all other engines.
      3. SearXNG is available: return it first, followed by the remaining
         engines ordered by their configured ``reliability`` (highest first,
         missing values count as 0).
      4. Otherwise: return all engines ordered by reliability.

    The LLM is not consulted. The previous implementation carried an LLM
    prompt after step 4, but it could never run: step 3 returns whenever
    SearXNG is available, and the following guard returned whenever it is
    not. That unreachable code has been removed; results are unchanged.

    Args:
        query: The search query

    Returns:
        List of search engine names sorted by suitability
    """

    def by_reliability(engines):
        # Build the config once per sort rather than once per engine compared.
        config = self._get_search_config()
        return sorted(
            engines,
            key=lambda name: config.get(name, {}).get("reliability", 0),
            reverse=True,
        )

    # Phrase -> engines to prefer, matched as a plain substring of the query.
    specialized_domains = {
        "scientific paper": ["arxiv", "pubmed", "wikipedia"],
        "medical research": ["pubmed", "searxng"],
        "clinical": ["pubmed", "searxng"],
        "github": ["github", "searxng"],
        "repository": ["github", "searxng"],
        "code": ["github", "searxng"],
        "programming": ["github", "searxng"],
    }

    try:
        query_lower = query.lower()
        available = self.available_engines

        # 1. Specialized-domain phrases
        for term, engines in specialized_domains.items():
            if term not in query_lower:
                continue
            valid_engines = [e for e in engines if e in available]
            if valid_engines:
                logger.info(
                    f"Detected specialized query type: {term}, using engines: {valid_engines}"
                )
                return valid_engines

        # 2. Engine explicitly named in the query (arxiv takes precedence)
        for preferred in ("arxiv", "pubmed"):
            if preferred in query_lower and preferred in available:
                return [preferred] + [e for e in available if e != preferred]

        # 3. General queries: SearXNG first, the rest by reliability
        if "searxng" in available:
            return ["searxng"] + by_reliability(
                [e for e in available if e != "searxng"]
            )

        # 4. No SearXNG: reliability order only (log text kept unchanged)
        logger.warning(
            "No LLM available or SearXNG not available, using reliability-based engines"
        )
        return by_reliability(available)
    except Exception:
        # Log text kept unchanged so existing log searches still match.
        logger.exception("Error analyzing query with LLM")
        # Fall back to SearXNG if available, then reliability-based ordering
        if "searxng" in self.available_engines:
            return ["searxng"] + by_reliability(
                [e for e in self.available_engines if e != "searxng"]
            )
        return by_reliability(self.available_engines)

367 

def _get_previews(self, query: str) -> List[Dict[str, Any]]:
    """
    Get preview information by selecting the best search engine for this query.

    Engines ranked by analyze_query() are tried in order, at most
    ``max_engines_to_try`` of them. The first one returning a non-empty
    preview list wins. If none does (or nothing was ranked), the fallback
    engine is used. In every case the engine that produced the previews is
    recorded in ``_selected_engine`` / ``_selected_engine_name`` so that
    _get_full_content() uses the same engine.

    Exceptions raised by the fallback engine are not caught here.

    Args:
        query: The search query

    Returns:
        List of preview dictionaries
    """
    # Get ranked list of engines for this query
    ranked_engines = self.analyze_query(query)

    if not ranked_engines:
        logger.warning(
            "No suitable search engines found for query, using fallback engine"
        )
        # Record the fallback as the selected engine. Previously this branch
        # left any engine chosen for an earlier query in place, so the full
        # content step could run against the wrong engine.
        self._selected_engine = self.fallback_engine
        self._selected_engine_name = "wikipedia"
        return self.fallback_engine._get_previews(query)

    # Limit the number of engines to try
    engines_to_try = ranked_engines[: self.max_engines_to_try]
    logger.info(
        f"SEARCH_PLAN: Will try these engines in order: {', '.join(engines_to_try)}"
    )

    all_errors = []
    for engine_name in engines_to_try:
        logger.info(f"Trying search engine: {engine_name}")

        # Get or create the engine instance
        engine = self._get_engine_instance(engine_name)
        if not engine:
            logger.warning(f"Failed to initialize {engine_name}, skipping")
            all_errors.append(f"Failed to initialize {engine_name}")
            continue

        try:
            previews = engine._get_previews(query)

            if previews:
                logger.info(f"ENGINE_SELECTED: {engine_name}")
                logger.info(
                    f"Successfully got {len(previews)} preview results from {engine_name}"
                )
                # Store selected engine for later use
                self._selected_engine = engine
                self._selected_engine_name = engine_name

                # Inform listeners about the selected engine; a failure here
                # must not discard the search results.
                try:
                    SocketIOService().emit_socket_event(
                        "search_engine_selected",
                        {
                            "engine": engine_name,
                            "result_count": len(previews),
                        },
                    )
                except Exception:
                    logger.exception("Socket emit error (non-critical)")

                return previews

            logger.info(f"{engine_name} returned no previews")
            all_errors.append(f"{engine_name} returned no previews")

        except Exception as e:
            # One failing engine must not stop the remaining candidates.
            error_msg = f"Error getting previews from {engine_name}: {e!s}"
            logger.exception(error_msg)
            all_errors.append(error_msg)

    # If we reach here, all engines failed, use fallback
    logger.warning(
        f"All engines failed or returned no preview results: {', '.join(all_errors)}"
    )
    logger.info("Using fallback Wikipedia engine for previews")
    self._selected_engine = self.fallback_engine
    self._selected_engine_name = "wikipedia"
    return self.fallback_engine._get_previews(query)

450 

def _get_full_content(
    self, relevant_items: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Fetch full content for the given previews via the engine that produced them.

    The items are returned unchanged when the ``search.snippets_only``
    setting is truthy (default True), when no engine was recorded by the
    preview phase, or when the selected engine raises.

    Args:
        relevant_items: List of relevant preview dictionaries

    Returns:
        List of result dictionaries with full content
    """
    snippets_only = get_setting_from_snapshot(
        "search.snippets_only",
        True,
        settings_snapshot=self.settings_snapshot,
    )
    if snippets_only:
        logger.info("Snippet-only mode, skipping full content retrieval")
        return relevant_items

    logger.info("Getting full content for relevant items")

    # _selected_engine only exists once _get_previews() has picked an engine.
    if not hasattr(self, "_selected_engine"):
        logger.warning(
            "No engine was selected during preview phase, returning relevant items as-is"
        )
        return relevant_items

    try:
        logger.info(f"Using {self._selected_engine_name} to get full content")
        return self._selected_engine._get_full_content(relevant_items)
    except Exception:
        logger.exception(
            f"Error getting full content from {self._selected_engine_name}"
        )
        # Best effort: hand back the previews without full content.
        return relevant_items

492 

def _get_engine_instance(
    self, engine_name: str
) -> Optional[BaseSearchEngine]:
    """
    Return the engine registered under ``engine_name``, creating it on first use.

    Created engines are stored in ``engine_cache`` and reused on later calls.
    If creation raises, the error is logged and None is returned; nothing is
    cached in that case, so the next call tries again.
    """
    # Serve from the cache when this engine was built before.
    try:
        return self.engine_cache[engine_name]
    except KeyError:
        pass

    try:
        # Only pass parameters that all engines accept
        creation_kwargs = {"llm": self.llm, "max_results": self.max_results}
        if self.max_filtered_results is not None:
            creation_kwargs["max_filtered_results"] = self.max_filtered_results

        instance = create_search_engine(
            engine_name,
            settings_snapshot=self.settings_snapshot,
            programmatic_mode=self.programmatic_mode,
            **creation_kwargs,
        )
    except Exception:
        logger.exception(f"Error creating engine instance for {engine_name}")
        return None

    # Only remember engines that were actually created.
    if instance:
        self.engine_cache[engine_name] = instance
    return instance

530 

def invoke(self, query: str) -> List[Dict[str, Any]]:
    """Run the search for ``query``; thin alias of ``run`` kept as a compatibility method for LangChain tools."""
    results = self.run(query)
    return results