Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 47%

222 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1import importlib 

2import inspect 

3from typing import Any, Dict, Optional 

4 

5from loguru import logger 

6 

7from ..utilities.enums import SearchMode 

8from .retriever_registry import retriever_registry 

9from .search_engine_base import BaseSearchEngine 

10from .search_engines_config import search_config 

11 

12 

def _unwrap_setting(setting: Any, default: Any = None) -> Any:
    """Return the payload of a settings-snapshot entry.

    This module reads snapshot entries that are either a dict carrying the
    payload under ``"value"`` or the raw payload itself.

    Args:
        setting: The snapshot entry (dict wrapper or raw value).
        default: Returned when ``setting`` is a dict without a ``"value"`` key.

    Returns:
        ``setting["value"]`` (or ``default``) for dict entries, otherwise
        ``setting`` unchanged.
    """
    if isinstance(setting, dict):
        return setting.get("value", default)
    return setting


def _config_key_from_label(
    label: str, config: Dict[str, Any]
) -> Optional[str]:
    """Translate a UI display label into a config key (deprecated fallback).

    Display labels have the format ``"{icon} {base_name} ({category})"``,
    e.g. ``"🔬 OpenAlex (Scientific)"``. Plain string operations are used
    instead of a regex to avoid ReDoS.

    Args:
        label: The value that was not found among the config keys.
        config: Mapping of config key -> engine configuration.

    Returns:
        The config key whose ``display_name`` (falling back to the key itself)
        equals the label's base name, or None when nothing matches.
    """
    if " (" not in label or not label.endswith(")"):
        return None

    # Drop the trailing "(category)" part, then the leading icon word.
    before_paren = label.rsplit(" (", 1)[0]
    space_idx = before_paren.find(" ")
    if space_idx <= 0:
        return None

    base_name = before_paren[space_idx + 1 :].strip()
    logger.info(f"Extracted base name '{base_name}' from label '{label}'")

    for config_key, config_data in config.items():
        if not isinstance(config_data, dict):
            continue
        if config_data.get("display_name", config_key) == base_name:
            logger.info(
                f"Matched label to config key: '{label}' -> '{config_key}'"
            )
            return config_key
    return None


def _wants_llm_relevance_filter(
    engine_name: str, engine_class: Any, settings_snapshot: Dict[str, Any]
) -> Any:
    """Decide whether LLM relevance filtering should be enabled for an engine.

    Priority: per-engine setting > auto-detection > global setting.

    Rationale:
    - Academic engines (arXiv, Semantic Scholar) use simple keyword matching
      and benefit significantly from LLM-based relevance filtering
    - Generic engines (Google, Brave, SearXNG) already use semantic search
      and LLM filtering is redundant/wasteful
    - CrossEngineFilter still ranks combined results at the strategy level

    Args:
        engine_name: Config key of the engine (used to build the setting key).
        engine_class: The engine class; its ``is_scientific`` / ``is_generic``
            attributes drive auto-detection when present.
        settings_snapshot: Settings snapshot to read the overrides from.

    Returns:
        The per-engine setting's payload as stored (only its truthiness is
        used by the caller), or a bool from auto-detection / global override.
    """
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if settings_snapshot and per_engine_key in settings_snapshot:
        # Highest priority: an explicit per-engine setting wins outright.
        wanted = _unwrap_setting(settings_snapshot[per_engine_key], False)
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={wanted}"
        )
        return wanted

    # Medium priority: auto-detection based on the engine type.
    wanted = False
    if getattr(engine_class, "is_scientific", False):
        wanted = True
        logger.info(
            f"Auto-enabling LLM filtering for scientific engine: {engine_name}"
        )
    elif getattr(engine_class, "is_generic", False):
        logger.debug(
            f"Auto-disabling LLM filtering for generic engine: {engine_name} "
            f"(already semantic)"
        )

    # Lowest priority, but it still overrides auto-detection.
    if (
        settings_snapshot
        and "search.skip_relevance_filter" in settings_snapshot
        and _unwrap_setting(
            settings_snapshot["search.skip_relevance_filter"], False
        )
    ):
        wanted = False
        logger.info(
            f"Global skip_relevance_filter=True overrides for {engine_name}"
        )
    return wanted


def create_search_engine(
    engine_name: str,
    llm=None,
    username: Optional[str] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Resolution order: the special ``"parallel"`` / ``"parallel_scientific"``
    names, then retrievers registered in ``retriever_registry``, then the
    engines described by ``search_config`` for the given settings snapshot.

    Args:
        engine_name: Name of the search engine to create
        llm: Language model instance (required for some engines like meta)
        username: Passed through to ``search_config`` and the full-search wrapper
        settings_snapshot: Settings snapshot; required for every engine that is
            neither a parallel variant nor a registered retriever
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance or None if creation failed
        (unknown engine with no ``"auto"`` fallback, missing required API key
        or LLM, or any exception while loading/instantiating the engine).

    Raises:
        RuntimeError: If a config-based engine is requested without a
            ``settings_snapshot``.
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants
    parallel_mode = None
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        parallel_mode = SearchMode.SCIENTIFIC
    elif engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        parallel_mode = SearchMode.ALL

    if parallel_mode is not None:
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=parallel_mode,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # Check if this is a registered retriever first
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Everything below is driven by the settings snapshot.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name not in config:
        # NOTE: This fallback is deprecated - callers should pass config keys directly
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        matched_key = _config_key_from_label(engine_name, config)
        if matched_key is not None:
            engine_name = matched_key

    # If still not found, use default
    if engine_name not in config:
        logger.warning(
            f"Search engine '{engine_name}' not found in config, using default"
        )
        # Try to use 'auto' as default if available
        if "auto" not in config:
            logger.error(
                f"No default search engine available. Available engines: {list(config.keys())}"
            )
            return None
        engine_name = "auto"

    # Get engine configuration
    engine_config = config[engine_name]
    requires_api_key = engine_config.get("requires_api_key", False)
    requires_llm = engine_config.get("requires_llm", False)

    # Set default max_results from the snapshot if not provided in kwargs
    if "max_results" not in kwargs:
        kwargs["max_results"] = _unwrap_setting(
            settings_snapshot.get("search.max_results", 20), 20
        )

    # Check for API key requirements: snapshot first, then engine config.
    api_key = None
    if requires_api_key:
        api_key = _unwrap_setting(
            settings_snapshot.get(f"search.engine.web.{engine_name}.api_key")
        ) or engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Pass the API key in kwargs for engines that need it
        kwargs["api_key"] = api_key

    # Check for LLM requirements
    if requires_llm and not llm:
        logger.info(
            f"Engine {engine_name} requires an LLM instance but none was provided"
        )
        return None

    try:
        # Load the engine class
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]

        # A leading dot marks a relative import, which is assumed to be
        # relative to `web_search_engines`.
        package = (
            "local_deep_research.web_search_engines"
            if module_path.startswith(".")
            else None
        )
        module = importlib.import_module(module_path, package=package)
        engine_class = getattr(module, class_name)

        # Names accepted by the engine's __init__; the first parameter
        # ('self') is skipped.
        accepted_params = set(
            list(inspect.signature(engine_class.__init__).parameters)[1:]
        )

        # Combine default parameters with provided ones, then drop whatever
        # the engine class does not accept.
        all_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            k: v for k, v in all_params.items() if k in accepted_params
        }

        # Always pass settings_snapshot if the engine accepts it
        if "settings_snapshot" in accepted_params:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it
        if "programmatic_mode" in accepted_params:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and engine accepts it
        if requires_llm:
            filtered_params["llm"] = llm
        elif (
            "llm" in accepted_params
            and llm
            and "llm" not in filtered_params
        ):
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # Add API key if required and not already in filtered_params. This is
        # done even when 'api_key' is not among the accepted parameter names.
        if requires_api_key and "api_key" not in filtered_params and api_key:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters
        engine = engine_class(**filtered_params)

        should_filter = _wants_llm_relevance_filter(
            engine_name, engine_class, settings_snapshot
        )

        # Filtering can only be switched on when the engine holds an LLM.
        engine_llm = getattr(engine, "llm", None)
        if not engine_llm:
            logger.debug(
                f"LLM relevance filtering not available for {engine_name} (no LLM)"
            )
        elif should_filter:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name, engine, llm, kwargs, username, settings_snapshot
            )

        return engine

    except Exception:
        # Deliberate best-effort boundary: log with traceback, signal failure.
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None

346 

347 

def _snapshot_api_key(
    settings_snapshot: Optional[Dict[str, Any]], setting_key: str
) -> Any:
    """Read an API key from the settings snapshot.

    Args:
        settings_snapshot: Settings snapshot, may be None/empty.
        setting_key: Full snapshot key, e.g. ``"search.engine.web.brave.api_key"``.

    Returns:
        The entry's ``"value"`` when the entry is a dict, the entry itself
        otherwise, or None when the snapshot or the entry is missing/empty.
    """
    if not settings_snapshot:
        return None
    setting = settings_snapshot.get(setting_key)
    if not setting:
        return None
    return setting.get("value") if isinstance(setting, dict) else setting


def _apply_wrapper_param_mappings(
    wrapper_params: Dict[str, Any],
    params: Dict[str, Any],
    wrapper_init_params: list,
    safesearch_on: str,
) -> None:
    """Map caller parameter names onto the names a full-search wrapper expects.

    Mutates ``wrapper_params`` in place.

    Args:
        wrapper_params: Keyword arguments being assembled for the wrapper.
        params: Parameters the base engine was created with.
        wrapper_init_params: Parameter names accepted by the wrapper's __init__.
        safesearch_on: String stored under ``"safesearch"`` when
            ``params["safe_search"]`` is truthy (``"off"`` is used otherwise).
    """
    if (
        "language" in params
        and "search_language" not in params
        and "language" in wrapper_init_params
    ):
        wrapper_params["language"] = params["language"]

    if (
        "safesearch" not in wrapper_params
        and "safe_search" in params
        and "safesearch" in wrapper_init_params
    ):
        wrapper_params["safesearch"] = (
            safesearch_on if params["safe_search"] else "off"
        )


def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    llm,
    params: Dict[str, Any],
    username: Optional[str] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
) -> Optional[BaseSearchEngine]:
    """Create a full search wrapper for the base engine if supported.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: Already-created engine; handed to the wrapper as
            ``web_search`` when the wrapper accepts that parameter.
        llm: Language model instance; passed when the wrapper accepts ``llm``.
        params: Parameters the base engine was created with; those accepted by
            the wrapper's ``__init__`` are forwarded.
        username: Forwarded to ``search_config`` when no snapshot is given.
        settings_snapshot: Settings snapshot to read engine config and API
            keys from.

    Returns:
        The wrapper instance, or ``base_engine`` unchanged when the engine has
        no full-search configuration or wrapper creation fails for any reason.
    """
    try:
        if settings_snapshot:
            # Rebuild per-engine config from flat snapshot keys such as
            # "search.engine.web.searxng.class_name".
            config: Dict[str, Any] = {}
            for key, value in settings_snapshot.items():
                if not key.startswith("search.engine.web."):
                    continue
                parts = key.split(".")
                if len(parts) < 4:
                    continue
                engine_settings = config.setdefault(parts[3], {})
                # Everything after the engine name is the setting's own key.
                remaining_key = ".".join(parts[4:])
                if remaining_key:
                    engine_settings[remaining_key] = (
                        value.get("value")
                        if isinstance(value, dict)
                        else value
                    )
        else:
            # Fallback to search_config if no snapshot (not recommended for threads)
            config = search_config(
                username=username, settings_snapshot=settings_snapshot
            )

        if engine_name not in config:
            logger.warning(f"Engine config for {engine_name} not found")
            return base_engine

        engine_config = config[engine_name]

        # Get full search class details
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")

        if not module_path or not class_name:
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        # Import the full search class. Relative module paths are anchored to
        # `web_search_engines`, the same way create_search_engine loads engine
        # classes; without an anchor import_module rejects relative names.
        package = (
            "local_deep_research.web_search_engines"
            if module_path.startswith(".")
            else None
        )
        module = importlib.import_module(module_path, package=package)
        full_search_class = getattr(module, class_name)

        # Get the wrapper's __init__ parameters to filter out unsupported ones
        wrapper_init_params = list(
            inspect.signature(full_search_class.__init__).parameters.keys()
        )[1:]  # Skip 'self'

        # Extract relevant parameters for the full search wrapper
        wrapper_params = {
            k: v for k, v in params.items() if k in wrapper_init_params
        }

        # Special case for SerpAPI which needs the API key directly
        if (
            engine_name == "serpapi"
            and "serpapi_api_key" in wrapper_init_params
        ):
            serpapi_api_key = _snapshot_api_key(
                settings_snapshot, "search.engine.web.serpapi.api_key"
            )
            if serpapi_api_key:
                wrapper_params["serpapi_api_key"] = serpapi_api_key

            _apply_wrapper_param_mappings(
                wrapper_params, params, wrapper_init_params, "active"
            )

        # Special case for Brave which needs the API key directly
        if engine_name == "brave" and "api_key" in wrapper_init_params:
            brave_api_key = _snapshot_api_key(
                settings_snapshot, "search.engine.web.brave.api_key"
            )
            if brave_api_key:
                wrapper_params["api_key"] = brave_api_key

            _apply_wrapper_param_mappings(
                wrapper_params, params, wrapper_init_params, "moderate"
            )

        # Always include llm if it's a parameter
        if "llm" in wrapper_init_params:
            wrapper_params["llm"] = llm

        # If the wrapper needs the base engine and has a parameter for it
        if "web_search" in wrapper_init_params:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        # Create the full search wrapper with filtered parameters
        return full_search_class(**wrapper_params)

    except Exception:
        # Best effort: a broken wrapper must not cost the caller the base engine.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine

515 

516 

def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Builds the keyword arguments appropriate for ``search_tool`` and delegates
    to ``create_search_engine``.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings snapshot forwarded to ``create_search_engine``
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None when
        ``create_search_engine`` could not create one.
    """
    # Common parameters
    params = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ("duckduckgo", "serpapi", "google_pse", "brave"):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # The snapshot may be absent, and an entry may be stored either as a
        # {"value": ...} dict or as the raw value; accept all of these rather
        # than failing on `.get`. A missing entry defaults to True.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", True
        )
        if isinstance(adaptive_setting, dict):
            adaptive_setting = adaptive_setting.get("value", True)
        params["adaptive_search"] = bool(adaptive_setting)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(f"Params being passed to create_search_engine: {params}")

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Diagnostics only: the engine (or None) is returned unchanged either way.
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
        return engine

    engine_type = type(engine).__name__
    logger.info(f"Successfully created search engine of type: {engine_type}")

    # Check if the engine has run method
    if hasattr(engine, "run"):
        logger.info(f"Engine has 'run' method: {engine.run}")
    else:
        logger.error("Engine does NOT have 'run' method!")

    # For SearxNG, check availability flag
    if hasattr(engine, "is_available"):
        logger.info(f"Engine availability flag: {engine.is_available}")

    return engine