Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 70%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import inspect 

2from typing import Any, Dict, Optional 

3 

4from loguru import logger 

5 

6from ..security.module_whitelist import get_safe_module_class 

7from ..utilities.enums import SearchMode 

8from .retriever_registry import retriever_registry 

9from .search_engine_base import BaseSearchEngine 

10from .search_engines_config import search_config 

11 

12 

def _unwrap_setting(setting: Any, default: Any = None) -> Any:
    """Return the plain value of a settings-snapshot entry.

    Snapshot entries are handled in two shapes throughout this module: a raw
    value, or a dict carrying the value under the ``"value"`` key.

    Args:
        setting: The raw snapshot entry.
        default: Returned only when ``setting`` is a dict without a
            ``"value"`` key.

    Returns:
        ``setting["value"]`` (or ``default``) for dict entries, otherwise
        ``setting`` unchanged.
    """
    if isinstance(setting, dict):
        return setting.get("value", default)
    return setting


def _config_key_from_display_label(
    label: str, config: Dict[str, Any]
) -> Optional[str]:
    """Map a UI display label back to its config key, if possible.

    Display labels have the format ``"{icon} {base_name} ({category})"``,
    e.g. ``"🔬 OpenAlex (Scientific)"``. Plain string operations are used
    instead of a regex to avoid ReDoS.

    Args:
        label: The string that was not found among the config keys.
        config: Mapping of engine config key -> engine config.

    Returns:
        The first config key whose ``display_name`` (falling back to the key
        itself) equals the extracted base name, or None when the label does
        not have the expected shape or nothing matches.
    """
    if " (" not in label or not label.endswith(")"):
        return None

    # Drop the trailing " (category)" part, splitting on the LAST " (".
    before_paren = label.rsplit(" (", 1)[0]

    # The icon is the first space-separated word; it must be non-empty.
    space_idx = before_paren.find(" ")
    if space_idx <= 0:
        return None

    base_name = before_paren[space_idx + 1 :].strip()
    logger.info(f"Extracted base name '{base_name}' from label '{label}'")

    for config_key, config_data in config.items():
        if not isinstance(config_data, dict):
            continue
        if config_data.get("display_name", config_key) == base_name:
            logger.info(
                f"Matched label to config key: '{label}' -> '{config_key}'"
            )
            return config_key
    return None


def _resolve_llm_relevance_filter(
    engine_name: str, engine_class: Any, settings_snapshot: Dict[str, Any]
) -> Any:
    """Decide whether LLM relevance filtering should be enabled for an engine.

    Precedence as implemented:

    1. A per-engine snapshot entry
       ``search.engine.web.<engine>.default_params.enable_llm_relevance_filter``
       wins outright when present.
    2. Otherwise the engine class is inspected: a truthy ``is_scientific``
       attribute enables filtering, a truthy ``is_generic`` leaves it off.
    3. Still within the "no per-engine entry" case, a truthy
       ``search.skip_relevance_filter`` snapshot entry forces filtering off.

    Args:
        engine_name: Config key of the engine (used for the setting path and
            log messages).
        engine_class: The loaded engine class.
        settings_snapshot: Non-empty settings snapshot.

    Returns:
        The per-engine setting value as stored (not coerced to bool), or a
        bool from auto-detection / the global override.
    """
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if per_engine_key in settings_snapshot:
        should_filter = _unwrap_setting(settings_snapshot[per_engine_key], False)
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={should_filter}"
        )
        return should_filter

    should_filter = False
    if getattr(engine_class, "is_scientific", False):
        should_filter = True
        logger.info(
            f"Auto-enabling LLM filtering for scientific engine: {engine_name}"
        )
    elif getattr(engine_class, "is_generic", False):
        logger.debug(
            f"Auto-disabling LLM filtering for generic engine: {engine_name} "
            f"(already semantic)"
        )

    # Global override: lowest priority, but it does override auto-detection.
    if "search.skip_relevance_filter" in settings_snapshot:
        skip_filter = _unwrap_setting(
            settings_snapshot["search.skip_relevance_filter"], False
        )
        if skip_filter:
            should_filter = False
            logger.info(
                f"Global skip_relevance_filter=True overrides for {engine_name}"
            )

    return should_filter


def create_search_engine(
    engine_name: str,
    llm=None,
    username: Optional[str] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Resolution order: the special ``"parallel"`` / ``"parallel_scientific"``
    names, then retrievers registered in ``retriever_registry``, then the
    engines described by ``search_config`` for the given settings snapshot.

    Args:
        engine_name: Name (config key) of the search engine to create. A UI
            display label is still accepted as a deprecated fallback.
        llm: Language model instance (required for some engines like meta)
        username: Forwarded to ``search_config`` and to the full-search
            wrapper factory.
        settings_snapshot: Settings snapshot the engine configuration, API
            keys and filter settings are read from. Required for every
            engine except the parallel variants and registered retrievers.
        programmatic_mode: If True, disables database operations and metrics
            tracking. Forwarded only to engines whose ``__init__`` accepts it.
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance or None if creation failed
        (unknown engine with no ``auto`` fallback, missing API key, missing
        LLM, or any exception while loading/instantiating the engine).

    Raises:
        RuntimeError: If ``settings_snapshot`` is missing or empty for an
            engine that is configured through the snapshot.
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.SCIENTIFIC,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
    if engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.ALL,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # Check if this is a registered retriever first
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Everything below is configured through the snapshot, so from here on
    # settings_snapshot is known to be non-empty.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name not in config:
        # NOTE: This fallback is deprecated - callers should pass config keys directly
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        resolved_key = _config_key_from_display_label(engine_name, config)
        if resolved_key is not None:
            engine_name = resolved_key

        # If still not found, use default
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    # Get engine configuration
    engine_config = config[engine_name]

    # Set default max_results from the snapshot if not provided in kwargs
    if "max_results" not in kwargs:
        if "search.max_results" in settings_snapshot:
            kwargs["max_results"] = _unwrap_setting(
                settings_snapshot["search.max_results"], 20
            )
        else:
            kwargs["max_results"] = 20

    # Check for API key requirements: snapshot first, engine config second
    requires_api_key = engine_config.get("requires_api_key", False)
    api_key = None
    if requires_api_key:
        api_key_path = f"search.engine.web.{engine_name}.api_key"
        api_key = _unwrap_setting(
            settings_snapshot.get(api_key_path)
        ) or engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Pass the API key in kwargs for engines that need it
        kwargs["api_key"] = api_key

    # Check for LLM requirements
    if engine_config.get("requires_llm", False) and not llm:
        logger.info(
            f"Engine {engine_name} requires an LLM instance but none was provided"
        )
        return None

    try:
        # Load the engine class
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]
        engine_class = get_safe_module_class(module_path, class_name)

        # Names accepted by the engine's __init__; the first parameter
        # ('self') is skipped. Computed once and reused for every check.
        init_signature = inspect.signature(engine_class.__init__)
        accepted_params = set(list(init_signature.parameters)[1:])

        # Combine default parameters with provided ones (kwargs win), then
        # drop everything the engine class does not accept.
        all_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            k: v for k, v in all_params.items() if k in accepted_params
        }

        # Always pass settings_snapshot if the engine accepts it
        if "settings_snapshot" in accepted_params:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it
        if "programmatic_mode" in accepted_params:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and engine accepts it
        if engine_config.get("requires_llm", False):
            filtered_params["llm"] = llm
        elif (
            "llm" in accepted_params
            and llm
            and "llm" not in filtered_params
        ):
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # Add API key if required and not already in filtered_params, i.e.
        # even when the signature filter above dropped it.
        if requires_api_key and "api_key" not in filtered_params and api_key:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters
        engine = engine_class(**filtered_params)

        # Rationale for the filter decision:
        # - Academic engines (arXiv, Semantic Scholar) use simple keyword matching
        #   and benefit significantly from LLM-based relevance filtering
        # - Generic engines (Google, Brave, SearXNG) already use semantic search
        #   and LLM filtering is redundant/wasteful
        # - CrossEngineFilter still ranks combined results at the strategy level
        should_filter = _resolve_llm_relevance_filter(
            engine_name, engine_class, settings_snapshot
        )

        # Apply the setting; filtering is only ever switched ON here and
        # only when the engine instance actually holds an LLM.
        engine_has_llm = bool(getattr(engine, "llm", None))
        if should_filter and engine_has_llm:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif not engine_has_llm:
            logger.debug(
                f"LLM relevance filtering not available for {engine_name} (no LLM)"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name, engine, llm, kwargs, username, settings_snapshot
            )

        return engine

    except Exception:
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None

340 

341 

def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    llm,
    params: Dict[str, Any],
    username: str = None,
    settings_snapshot: Dict[str, Any] = None,
) -> Optional[BaseSearchEngine]:
    """Wrap ``base_engine`` in its configured full-search class, if any.

    The wrapper class is looked up through the engine's ``full_search_module``
    / ``full_search_class`` config entries and built with only those
    ``params`` its ``__init__`` accepts. Whenever the config is missing or
    anything raises, ``base_engine`` is returned unchanged.
    """
    try:
        if settings_snapshot:
            # Rebuild per-engine config from flat snapshot keys such as
            # "search.engine.web.searxng.class_name".
            config = {}
            for setting_key, setting in settings_snapshot.items():
                if not setting_key.startswith("search.engine.web."):
                    continue
                segments = setting_key.split(".")
                if len(segments) < 4:
                    continue
                engine_options = config.setdefault(segments[3], {})
                # Everything after the engine name forms the option key.
                option_name = ".".join(segments[4:])
                if option_name:
                    engine_options[option_name] = (
                        setting.get("value")
                        if isinstance(setting, dict)
                        else setting
                    )
        else:
            # Fallback to search_config if no snapshot (not recommended for threads)
            config = search_config(
                username=username, settings_snapshot=settings_snapshot
            )

        if engine_name not in config:
            logger.warning(f"Engine config for {engine_name} not found")
            return base_engine

        engine_config = config[engine_name]
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")
        if not (module_path and class_name):
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        full_search_class = get_safe_module_class(module_path, class_name)

        # Parameter names of the wrapper's __init__, without 'self'.
        init_signature = inspect.signature(full_search_class.__init__)
        accepted = list(init_signature.parameters.keys())[1:]

        wrapper_params = {
            name: value for name, value in params.items() if name in accepted
        }

        def api_key_from_snapshot(setting_path):
            # Read an API key from the snapshot; None when absent or empty.
            entry = (
                settings_snapshot.get(setting_path)
                if settings_snapshot
                else None
            )
            if not entry:
                return None
            return entry.get("value") if isinstance(entry, dict) else entry

        def map_shared_params(safe_on_value):
            # Map some parameter names to what the wrapper expects.
            if (
                "language" in params
                and "search_language" not in params
                and "language" in accepted
            ):
                wrapper_params["language"] = params["language"]

            if (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in accepted
            ):
                wrapper_params["safesearch"] = (
                    safe_on_value if params["safe_search"] else "off"
                )

        # Special case for SerpAPI which needs the API key directly
        if engine_name == "serpapi" and "serpapi_api_key" in accepted:
            serpapi_api_key = api_key_from_snapshot(
                "search.engine.web.serpapi.api_key"
            )
            if serpapi_api_key:
                wrapper_params["serpapi_api_key"] = serpapi_api_key
            map_shared_params("active")

        # Special case for Brave which needs the API key directly
        if engine_name == "brave" and "api_key" in accepted:
            brave_api_key = api_key_from_snapshot(
                "search.engine.web.brave.api_key"
            )
            if brave_api_key:
                wrapper_params["api_key"] = brave_api_key
            map_shared_params("moderate")

        # Always include llm if it's a parameter
        if "llm" in accepted:
            wrapper_params["llm"] = llm

        # Hand over the base engine when the wrapper has a slot for it
        if "web_search" in accepted:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        return full_search_class(**wrapper_params)

    except Exception:
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine

508 

509 

def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Builds the keyword arguments appropriate for ``search_tool`` and
    delegates the actual construction to ``create_search_engine``.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale (only passed for duckduckgo, serpapi,
            google_pse, brave and mojeek)
        time_period: Time period for search results (only passed for serpapi
            and wikinews)
        safe_search: Whether to enable safe search (same engines as ``region``)
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results (only passed for
            serpapi, brave, google_pse and wikinews)
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings snapshot forwarded to
            ``create_search_engine``; also read here for the wikinews
            ``adaptive_search`` option.
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if
        ``create_search_engine`` could not create one.
    """
    # Common parameters
    params = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ("duckduckgo", "serpapi", "google_pse", "brave", "mojeek"):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # Tolerate a missing snapshot and accept both a raw value and a
        # {"value": ...} dict, like the other snapshot reads in this module.
        # The option defaults to True when it is not configured.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        if isinstance(adaptive_setting, dict):
            adaptive_setting = adaptive_setting.get("value", True)
        params["adaptive_search"] = bool(adaptive_setting)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params.keys())}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Diagnostics only; the engine (or None) is returned unchanged.
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
        return engine

    engine_type = type(engine).__name__
    logger.info(f"Successfully created search engine of type: {engine_type}")

    # Check if the engine has run method
    if hasattr(engine, "run"):
        logger.info(f"Engine has 'run' method: {engine.run}")
    else:
        logger.error("Engine does NOT have 'run' method!")

    # For SearxNG, check availability flag
    if hasattr(engine, "is_available"):
        logger.info(f"Engine availability flag: {engine.is_available}")

    return engine