Coverage for src/local_deep_research/web_search_engines/search_engine_factory.py: 94%

198 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import inspect 

2from typing import Any, Dict, Optional 

3 

4from loguru import logger 

5 

6from ..security.module_whitelist import get_safe_module_class 

7from ..utilities.enums import SearchMode 

8from .retriever_registry import retriever_registry 

9from .search_engine_base import BaseSearchEngine 

10from .search_engines_config import search_config 

11 

12 

def _unwrap_setting(setting: Any, default: Any = None) -> Any:
    """Return the effective value of one settings-snapshot entry.

    Entries are handled in two shapes throughout this module: a dict that
    carries the value under ``"value"``, or the raw value itself.

    Args:
        setting: The snapshot entry (dict or raw value).
        default: Returned when ``setting`` is a dict without a ``"value"`` key.
    """
    if isinstance(setting, dict):
        return setting.get("value", default)
    return setting


def _match_display_label(engine_name: str, config: Dict[str, Any]) -> str:
    """Translate a UI display label into its config key, if one matches.

    Display labels have the format ``"{icon} {base_name} ({category})"``,
    e.g. ``"🔬 OpenAlex (Scientific)"``. Plain string operations are used
    instead of a regex to avoid ReDoS.

    Returns:
        The matching config key, or ``engine_name`` unchanged when the name
        is not label-shaped or no entry has a matching ``display_name``.
    """
    if not (" (" in engine_name and engine_name.endswith(")")):
        return engine_name

    # Everything before the last " (" is "{icon} {base_name}".
    before_paren = engine_name.rsplit(" (", 1)[0]

    # Drop the icon (first word); a label without a space has no base name.
    space_idx = before_paren.find(" ")
    if space_idx <= 0:
        return engine_name

    base_name = before_paren[space_idx + 1 :].strip()
    logger.info(f"Extracted base name '{base_name}' from label '{engine_name}'")

    for config_key, config_data in config.items():
        if not isinstance(config_data, dict):
            continue
        if config_data.get("display_name", config_key) == base_name:
            logger.info(
                f"Matched label to config key: '{engine_name}' -> '{config_key}'"
            )
            return config_key

    return engine_name


def _wants_llm_relevance_filter(
    engine_name: str,
    engine_class: Any,
    settings_snapshot: Dict[str, Any] | None,
) -> Any:
    """Decide whether LLM relevance filtering is requested for an engine.

    Priority: per-engine setting > ``needs_llm_relevance_filter`` class
    attribute > global ``search.skip_relevance_filter`` setting.

    Rationale:
    - Engines with ``needs_llm_relevance_filter=True`` have poor native
      relevance ranking and benefit from LLM-based filtering.
    - Well-ranked and semantic engines do not need it and should not waste
      LLM calls.
    - The global skip setting only affects unclassified engines, for which
      filtering is off anyway; it is evaluated only for the debug log.

    Returns:
        The per-engine setting value when present (used for truthiness by the
        caller), otherwise ``True`` or ``False``.
    """
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if settings_snapshot and per_engine_key in settings_snapshot:
        should_filter = _unwrap_setting(settings_snapshot[per_engine_key], False)
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={should_filter}"
        )
        return should_filter

    if getattr(engine_class, "needs_llm_relevance_filter", False):
        logger.info(
            f"Auto-enabling LLM filtering for {engine_name} "
            f"(needs_llm_relevance_filter=True)"
        )
        return True

    if settings_snapshot and "search.skip_relevance_filter" in settings_snapshot:
        if _unwrap_setting(
            settings_snapshot["search.skip_relevance_filter"], False
        ):
            logger.debug(
                f"Global skip_relevance_filter=True applied "
                f"for {engine_name}"
            )

    return False


def create_search_engine(
    engine_name: str,
    llm=None,
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Resolution order: the special ``parallel`` / ``parallel_scientific``
    variants, then registered LangChain retrievers, then engines found in the
    configuration derived from ``settings_snapshot``.

    Args:
        engine_name: Name of the search engine to create
        llm: Language model instance (required for some engines like meta)
        username: Forwarded to ``search_config`` and to the full search wrapper
        settings_snapshot: Settings used to build the engine configuration;
            required for every engine except the parallel variants and
            registered retrievers
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance or None if creation failed
        (unknown engine with no ``auto`` fallback, missing required API key,
        or an exception while loading/instantiating the engine class)

    Raises:
        RuntimeError: If ``settings_snapshot`` is missing or empty for a
            config-based engine.
        ValueError: If ``engine_name`` is the literal string ``"none"``.
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.SCIENTIFIC,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
            **kwargs,
        )
    if engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.ALL,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
            **kwargs,
        )

    # Check if this is a registered retriever first
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
            programmatic_mode=programmatic_mode,
        )

    # Everything below is config-driven and needs the snapshot.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name == "none":
        # Reject the literal string "none". Historically this silently fell
        # through to the "auto" engine and hit live networks — callers that
        # wanted an offline pipeline were unknowingly doing real searches.
        raise ValueError(
            "search.tool='none' is not a valid engine. Register a LangChain "
            "retriever via `retrievers={...}` (see "
            "examples/llm_integration/mock_llm_example.py) or pick a real "
            "engine. Previously this silently fell back to 'auto'."
        )

    if engine_name not in config:
        # NOTE: This fallback is deprecated - callers should pass config keys directly
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        engine_name = _match_display_label(engine_name, config)

        # If still not found, use 'auto' as the default when available
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    # Get engine configuration
    engine_config = config[engine_name]

    # Set default max_results from settings if not provided in kwargs
    if "max_results" not in kwargs:
        if "search.max_results" in settings_snapshot:
            kwargs["max_results"] = _unwrap_setting(
                settings_snapshot["search.max_results"], 20
            )
        else:
            kwargs["max_results"] = 20

    # Check for API key requirements
    requires_api_key = engine_config.get("requires_api_key", False)
    api_key = None

    if requires_api_key:
        # Settings snapshot first, then the engine config as a fallback.
        api_key = _unwrap_setting(
            settings_snapshot.get(f"search.engine.web.{engine_name}.api_key")
        ) or engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Pass the API key in kwargs for engines that need it
        kwargs["api_key"] = api_key

    # Warn about missing LLM but allow engine creation in degraded mode.
    # All engines with requires_llm=True handle llm=None gracefully
    # (e.g. skipping query optimization, using reliability-based sorting).
    if engine_config.get("requires_llm", False) and not llm:
        logger.warning(
            f"Engine '{engine_name}' is configured with requires_llm=True but no LLM provided. "
            f"Creating engine without LLM — some features (query optimization, relevance filtering) "
            f"may be unavailable."
        )

    try:
        # Load the engine class
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]
        engine_class = get_safe_module_class(module_path, class_name)

        # Names accepted by the engine's __init__, skipping the leading
        # 'self'. Computed once; used to drop unsupported parameters.
        accepted = set(
            list(inspect.signature(engine_class.__init__).parameters)[1:]
        )

        # Combine default parameters with provided ones (kwargs win)
        all_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            k: v for k, v in all_params.items() if k in accepted
        }

        # Always pass settings_snapshot if the engine accepts it
        if "settings_snapshot" in accepted:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it
        if "programmatic_mode" in accepted:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and engine accepts it
        if engine_config.get("requires_llm", False):
            filtered_params["llm"] = llm
        elif "llm" in accepted and llm and "llm" not in filtered_params:
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # A required API key is passed even when __init__ does not name
        # ``api_key`` and the filter above dropped it. api_key is always
        # truthy here: the function returned earlier otherwise.
        if requires_api_key and "api_key" not in filtered_params:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters
        engine = engine_class(**filtered_params)

        # Most engine subclasses do not name ``programmatic_mode`` in their
        # signature (or accept it via **kwargs without forwarding to
        # ``super().__init__``), so the constructor often falls back to the
        # BaseSearchEngine default of False even when the API caller asked
        # for True. Apply the requested mode post-construction so the
        # engine's rate tracker matches.
        if isinstance(engine, BaseSearchEngine) and (
            engine.programmatic_mode != programmatic_mode
        ):
            engine._configure_programmatic_mode(programmatic_mode)

        # Determine if this engine should use LLM relevance filtering.
        # CrossEngineFilter still ranks combined results at the strategy level.
        should_filter = _wants_llm_relevance_filter(
            engine_name, engine_class, settings_snapshot
        )

        # Apply the setting; filtering needs an LLM on the engine itself
        if should_filter and hasattr(engine, "llm") and engine.llm:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif should_filter:
            logger.warning(
                f"LLM relevance filtering requested for {engine_name} "
                f"but no LLM is available — filtering skipped"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name,
                engine,
                engine_config,
                llm,
                kwargs,
                username,
                settings_snapshot,
            )

        return engine  # type: ignore[no-any-return]

    except Exception:
        # Factory boundary: log with traceback and signal failure via None.
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None

368 

369 

def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    engine_config: Dict[str, Any],
    llm,
    params: Dict[str, Any],
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
) -> Optional[BaseSearchEngine]:
    """Create a full search wrapper for the base engine if supported.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: Already constructed engine; returned as-is whenever the
            wrapper cannot be built.
        engine_config: Engine configuration; ``full_search_module`` and
            ``full_search_class`` identify the wrapper class.
        llm: Language model passed to the wrapper if its ``__init__`` names
            an ``llm`` parameter.
        params: Candidate keyword arguments; only those named by the
            wrapper's ``__init__`` are forwarded.
        username: Accepted for signature compatibility; not used here.
        settings_snapshot: Source of the SerpAPI / Brave API keys.

    Returns:
        The wrapper instance, or ``base_engine`` when the full search
        configuration is missing or building the wrapper raises.
    """
    try:
        # Get full search class details from engine_config (already has
        # registry-injected values from search_config()).
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")

        if not module_path or not class_name:
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        # Import the full search class
        full_search_class = get_safe_module_class(module_path, class_name)

        # Parameter names accepted by the wrapper's __init__ (skip 'self')
        wrapper_init_params = list(
            inspect.signature(full_search_class.__init__).parameters
        )[1:]

        # Keep only the parameters the wrapper accepts. This already copies
        # ``language`` when both sides have it, so no explicit mapping of
        # that name is needed below.
        wrapper_params = {
            k: v for k, v in params.items() if k in wrapper_init_params
        }

        # Engines whose wrapper needs the API key directly:
        # engine name -> (wrapper key parameter, safesearch value when on)
        special_cases = {
            "serpapi": ("serpapi_api_key", "active"),
            "brave": ("api_key", "moderate"),
        }
        for special_engine, (key_param, safe_on) in special_cases.items():
            if (
                engine_name != special_engine
                or key_param not in wrapper_init_params
            ):
                continue

            # Check settings snapshot for the API key (dict-with-value or
            # raw value)
            api_key = None
            if settings_snapshot:
                key_setting = settings_snapshot.get(
                    f"search.engine.web.{special_engine}.api_key"
                )
                if key_setting:
                    api_key = (
                        key_setting.get("value")
                        if isinstance(key_setting, dict)
                        else key_setting
                    )
            if api_key:
                wrapper_params[key_param] = api_key

            # Map the boolean ``safe_search`` to the wrapper's string-valued
            # ``safesearch`` unless it was supplied explicitly.
            if (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in wrapper_init_params
            ):
                wrapper_params["safesearch"] = (
                    safe_on if params["safe_search"] else "off"
                )

        # Always include llm if it's a parameter
        if "llm" in wrapper_init_params:
            wrapper_params["llm"] = llm

        # If the wrapper needs the base engine and has a parameter for it
        if "web_search" in wrapper_init_params:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        # Create the full search wrapper with filtered parameters
        service: BaseSearchEngine = full_search_class(**wrapper_params)
        return service

    except Exception:
        # Best effort: fall back to the unwrapped engine rather than failing.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine

499 

500 

def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings forwarded to ``create_search_engine``;
            also read for the wikinews ``adaptive_search`` option
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if
        ``create_search_engine`` could not create one
    """
    # Common parameters
    params: Dict[str, Any] = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ("duckduckgo", "serpapi", "google_pse", "brave", "mojeek"):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # The snapshot entry may be a dict carrying "value" or the raw value
        # itself, as for every other setting read in this module; a missing
        # entry means adaptive search stays enabled.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        params["adaptive_search"] = bool(
            adaptive_setting.get("value", True)
            if isinstance(adaptive_setting, dict)
            else adaptive_setting
        )

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params.keys())}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Diagnostics only: the engine (or None) is returned unchanged.
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine