Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 94%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import inspect 

2from typing import Any, Dict, Optional 

3 

4from loguru import logger 

5 

6from ..security.module_whitelist import get_safe_module_class 

7from ..utilities.enums import SearchMode 

8from .retriever_registry import retriever_registry 

9from .search_engine_base import BaseSearchEngine 

10from .search_engines_config import search_config 

11 

12 

def _setting_value(setting: Any, default: Any = None) -> Any:
    """Unwrap a settings-snapshot entry.

    Snapshot entries are handled in two forms: a ``{"value": ...}`` dict or a
    bare value. ``default`` is used only when a dict entry has no ``"value"``
    key.
    """
    if isinstance(setting, dict):
        return setting.get("value", default)
    return setting


def _resolve_display_label(engine_name: str, config: Dict[str, Any]) -> str:
    """Translate a UI display label into its config key, if one matches.

    Returns ``engine_name`` unchanged when it does not look like a label or
    when no config entry has a matching ``display_name``.
    """
    # Labels look like "{icon} {base_name} ({category})",
    # e.g. "🔬 OpenAlex (Scientific)". Plain string operations are used
    # instead of a regex to avoid ReDoS.
    if " (" not in engine_name or not engine_name.endswith(")"):
        return engine_name

    # Split on the last " (" to drop the category, then drop the icon
    # (the first space-delimited word).
    before_paren = engine_name.rsplit(" (", 1)[0]
    icon, separator, remainder = before_paren.partition(" ")
    if not (separator and icon):
        return engine_name

    base_name = remainder.strip()
    logger.info(
        f"Extracted base name '{base_name}' from label '{engine_name}'"
    )

    # Search for a config entry with matching display_name
    for config_key, config_data in config.items():
        if not isinstance(config_data, dict):
            continue
        if config_data.get("display_name", config_key) == base_name:
            logger.info(
                f"Matched label to config key: '{engine_name}' -> '{config_key}'"
            )
            return config_key
    return engine_name


def _resolve_api_key(
    engine_name: str,
    engine_config: Dict[str, Any],
    settings_snapshot: Dict[str, Any],
) -> Any:
    """Look up an engine's API key: settings snapshot first, then engine config."""
    api_key = None
    api_key_setting = settings_snapshot.get(
        f"search.engine.web.{engine_name}.api_key"
    )
    if api_key_setting:
        api_key = _setting_value(api_key_setting)
    # Still try to get from engine config if not found
    return api_key or engine_config.get("api_key")


def _wants_relevance_filter(
    engine_name: str, engine_class: Any, settings_snapshot: Dict[str, Any]
) -> Any:
    """Decide whether LLM relevance filtering is requested for an engine.

    Priority: per-engine setting > needs_llm_relevance_filter > global setting.

    Rationale:
    - Engines with needs_llm_relevance_filter=True have poor native relevance
      ranking (keyword-only, no ML ranking) and benefit from LLM-based filtering
    - Well-ranked engines (Google, Brave) and semantic engines (Exa, Tavily)
      do not need this and should not waste LLM calls
    - The global skip_relevance_filter only affects unclassified engines
    - CrossEngineFilter still ranks combined results at the strategy level
    """
    # Per-engine setting first (highest priority)
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if per_engine_key in settings_snapshot:
        should_filter = _setting_value(settings_snapshot[per_engine_key], False)
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={should_filter}"
        )
        return should_filter

    # Auto-detection based on engine attribute (medium priority)
    if getattr(engine_class, "needs_llm_relevance_filter", False):
        logger.info(
            f"Auto-enabling LLM filtering for {engine_name} "
            f"(needs_llm_relevance_filter=True)"
        )
        return True

    # Global override only applies to engines without
    # needs_llm_relevance_filter. Filtering is already off on this path, so
    # the global flag only changes what gets logged.
    if "search.skip_relevance_filter" in settings_snapshot and _setting_value(
        settings_snapshot["search.skip_relevance_filter"], False
    ):
        logger.debug(
            f"Global skip_relevance_filter=True applied "
            f"for {engine_name}"
        )
    return False


def create_search_engine(
    engine_name: str,
    llm=None,
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Resolution order: the special names ``"parallel_scientific"`` and
    ``"parallel"``, then retrievers registered in ``retriever_registry``,
    then engines described by ``search_config()``.

    Args:
        engine_name: Name (config key) of the search engine to create. A UI
            display label such as "🔬 OpenAlex (Scientific)" is still accepted
            as a deprecated fallback; unknown names fall back to "auto".
        llm: Language model instance (required for some engines like meta)
        username: Forwarded to ``search_config()``
        settings_snapshot: Settings mapping whose entries are either
            ``{"value": ...}`` dicts or bare values. Must be non-empty unless
            a parallel engine or registered retriever is requested.
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance, or None if creation failed
        (engine and "auto" default both missing from the config, required
        API key not found, or the engine could not be constructed).

    Raises:
        RuntimeError: If a config-based engine is requested without a
            settings snapshot.
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.SCIENTIFIC,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
    if engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.ALL,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # Check if this is a registered retriever first
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Everything below reads from the snapshot, so it is mandatory from here
    # on (and therefore known to be truthy in the rest of the function).
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    # Extract search engine configs from settings snapshot
    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name not in config:
        # engine_name might be a display label instead of a config key.
        # NOTE: This fallback is deprecated - callers should pass config keys directly
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        engine_name = _resolve_display_label(engine_name, config)

        # If still not found, use default
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            # Try to use 'auto' as default if available
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    # Get engine configuration
    engine_config = config[engine_name]

    # Set default max_results from config if not provided in kwargs
    if "max_results" not in kwargs:
        if "search.max_results" in settings_snapshot:
            kwargs["max_results"] = _setting_value(
                settings_snapshot["search.max_results"], 20
            )
        else:
            kwargs["max_results"] = 20

    # Check for API key requirements
    api_key = None
    if engine_config.get("requires_api_key", False):
        api_key = _resolve_api_key(engine_name, engine_config, settings_snapshot)
        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Pass the API key in kwargs for engines that need it
        kwargs["api_key"] = api_key

    # Warn about missing LLM but allow engine creation in degraded mode.
    # All engines with requires_llm=True handle llm=None gracefully
    # (e.g. skipping query optimization, using reliability-based sorting).
    if engine_config.get("requires_llm", False) and not llm:
        logger.warning(
            f"Engine '{engine_name}' is configured with requires_llm=True but no LLM provided. "
            f"Creating engine without LLM — some features (query optimization, relevance filtering) "
            f"may be unavailable."
        )

    try:
        # Load the engine class
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]
        engine_class = get_safe_module_class(module_path, class_name)

        # Names accepted by the engine's __init__; the first parameter
        # ('self') is skipped. Used to filter out unsupported parameters.
        accepted = set(
            list(inspect.signature(engine_class.__init__).parameters)[1:]
        )

        # Combine default parameters with provided ones (kwargs win)
        all_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            k: v for k, v in all_params.items() if k in accepted
        }

        # Always pass settings_snapshot if the engine accepts it
        if "settings_snapshot" in accepted:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it
        if "programmatic_mode" in accepted:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and engine accepts it
        if engine_config.get("requires_llm", False):
            filtered_params["llm"] = llm
        elif "llm" in accepted and llm and "llm" not in filtered_params:
            # If LLM was provided and engine accepts it, pass it through
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # api_key is only set when the engine requires one. Re-add it if the
        # signature filter dropped it (presumably for engines that take it
        # through **kwargs).
        if api_key and "api_key" not in filtered_params:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters
        engine = engine_class(**filtered_params)

        # Determine if this engine should use LLM relevance filtering, and
        # apply it only when the engine actually holds an LLM.
        should_filter = _wants_relevance_filter(
            engine_name, engine_class, settings_snapshot
        )
        if should_filter and getattr(engine, "llm", None):
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif should_filter:
            logger.warning(
                f"LLM relevance filtering requested for {engine_name} "
                f"but no LLM is available — filtering skipped"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name,
                engine,
                engine_config,
                llm,
                kwargs,
                username,
                settings_snapshot,
            )

        return engine  # type: ignore[no-any-return]

    except Exception:
        # Boundary handler: any failure while loading or constructing the
        # engine is logged with its traceback and reported as None.
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None

343 

344 

# Engines whose full-search wrapper needs the API key passed directly:
# engine name -> (constructor argument that receives the key,
#                 "safesearch" value used when safe search is enabled).
_FULL_SEARCH_SPECIAL_CASES: Dict[str, Any] = {
    "serpapi": ("serpapi_api_key", "active"),
    "brave": ("api_key", "moderate"),
}


def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    engine_config: Dict[str, Any],
    llm,
    params: Dict[str, Any],
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
) -> Optional[BaseSearchEngine]:
    """Create a full search wrapper for the base engine if supported.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: Already-constructed engine; handed to the wrapper as
            ``web_search`` when the wrapper accepts that argument.
        engine_config: Engine configuration providing ``full_search_module``
            and ``full_search_class``.
        llm: Language model instance, passed when the wrapper accepts ``llm``.
        params: Candidate constructor arguments; only those named in the
            wrapper's ``__init__`` signature are forwarded.
        username: Accepted for call compatibility; not used here.
        settings_snapshot: Settings mapping consulted for the SerpAPI and
            Brave API keys.

    Returns:
        The wrapper instance, or ``base_engine`` unchanged when the full
        search configuration is missing or the wrapper cannot be created.
    """
    try:
        # Get full search class details from engine_config (already has
        # registry-injected values from search_config()).
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")

        if not module_path or not class_name:
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        # Import the full search class
        full_search_class = get_safe_module_class(module_path, class_name)

        # Get the wrapper's __init__ parameters (skipping 'self') to filter
        # out unsupported ones
        wrapper_init_params = list(
            inspect.signature(full_search_class.__init__).parameters
        )[1:]

        # Extract relevant parameters for the full search wrapper. Same-named
        # parameters such as "language" are carried over by this filter.
        wrapper_params = {
            k: v for k, v in params.items() if k in wrapper_init_params
        }

        # Special cases (SerpAPI, Brave) which need the API key directly
        special_case = _FULL_SEARCH_SPECIAL_CASES.get(engine_name)
        if special_case is not None and special_case[0] in wrapper_init_params:
            key_param, safe_on_value = special_case

            # Check settings snapshot for API key; the entry may be a
            # {"value": ...} dict or a bare value.
            engine_api_key = None
            if settings_snapshot:
                key_setting = settings_snapshot.get(
                    f"search.engine.web.{engine_name}.api_key"
                )
                if key_setting:
                    engine_api_key = (
                        key_setting.get("value")
                        if isinstance(key_setting, dict)
                        else key_setting
                    )
            if engine_api_key:
                wrapper_params[key_param] = engine_api_key

            # Map the boolean safe_search flag to the string value the
            # wrapper expects, unless a safesearch value was given explicitly.
            if (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in wrapper_init_params
            ):
                wrapper_params["safesearch"] = (
                    safe_on_value if params["safe_search"] else "off"
                )

        # Always include llm if it's a parameter
        if "llm" in wrapper_init_params:
            wrapper_params["llm"] = llm

        # If the wrapper needs the base engine and has a parameter for it
        if "web_search" in wrapper_init_params:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        # Create the full search wrapper with filtered parameters
        service: BaseSearchEngine = full_search_class(**wrapper_params)
        return service

    except Exception:
        # Best effort: on any failure fall back to the unwrapped engine.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine

474 

475 

def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
) -> Optional[BaseSearchEngine]:
    """
    Get search tool instance based on the provided parameters.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings mapping forwarded to create_search_engine;
            also read here for the wikinews ``adaptive_search`` setting
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if
        create_search_engine() could not create one
    """
    # Common parameters
    params: Dict[str, Any] = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ("duckduckgo", "serpapi", "google_pse", "brave", "mojeek"):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only

        # The snapshot entry may be a {"value": ...} dict or a bare value;
        # a missing entry (or a bare None) means the default, True.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        if isinstance(adaptive_setting, dict):
            adaptive_search = adaptive_setting.get("value", True)
        elif adaptive_setting is None:
            adaptive_search = True
        else:
            adaptive_search = adaptive_setting
        params["adaptive_search"] = bool(adaptive_search)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params)}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Add debugging to check if engine is None
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine