Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 94%
194 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import inspect
2from typing import Any, Dict, Optional
4from loguru import logger
6from ..security.module_whitelist import get_safe_module_class
7from ..utilities.enums import SearchMode
8from .retriever_registry import retriever_registry
9from .search_engine_base import BaseSearchEngine
10from .search_engines_config import search_config
def _snapshot_setting_value(setting: Any, default: Any = None) -> Any:
    """
    Return the effective value of a settings-snapshot entry.

    This module reads snapshot entries in two forms: a dict carrying the
    value under the ``"value"`` key, or the raw value itself.

    Args:
        setting: The snapshot entry (dict wrapper or raw value)
        default: Value used when a dict entry has no ``"value"`` key

    Returns:
        The unwrapped value for dict entries, otherwise ``setting`` unchanged
    """
    if isinstance(setting, dict):
        return setting.get("value", default)
    return setting


def _match_display_label(label: str, config: Dict[str, Any]) -> Optional[str]:
    """
    Map a UI display label back to its config key.

    Display labels have the format "{icon} {base_name} ({category})",
    e.g. "🔬 OpenAlex (Scientific)".

    Args:
        label: The candidate display label
        config: Engine configs keyed by engine name

    Returns:
        The config key whose display name equals the label's base name, or
        None when the label does not have the expected shape or nothing matches
    """
    if " (" not in label or not label.endswith(")"):
        return None

    # To avoid ReDoS, plain string operations are used instead of regex.
    # Split on the last occurrence of ' (' and keep what precedes it.
    before_paren = label.rsplit(" (", 1)[0]

    # The icon is the first word; everything after the first space is the name.
    space_idx = before_paren.find(" ")
    if space_idx <= 0:
        return None

    base_name = before_paren[space_idx + 1 :].strip()
    logger.info(f"Extracted base name '{base_name}' from label '{label}'")

    # Search for a config entry with matching display_name
    for config_key, config_data in config.items():
        if not isinstance(config_data, dict):
            continue
        if config_data.get("display_name", config_key) == base_name:
            logger.info(
                f"Matched label to config key: '{label}' -> '{config_key}'"
            )
            return config_key
    return None


def _wants_llm_relevance_filter(
    engine_name: str,
    engine_class: Any,
    settings_snapshot: Dict[str, Any],
) -> Any:
    """
    Decide whether LLM relevance filtering is requested for an engine.

    Priority: per-engine setting > needs_llm_relevance_filter > global setting

    Rationale:
    - Engines with needs_llm_relevance_filter=True have poor native relevance
      ranking (keyword-only, no ML ranking) and benefit from LLM-based filtering
    - Well-ranked engines (Google, Brave) and semantic engines (Exa, Tavily)
      do not need this and should not waste LLM calls
    - The global skip_relevance_filter only affects unclassified engines
    - CrossEngineFilter still ranks combined results at the strategy level

    Args:
        engine_name: Config key of the engine
        engine_class: The engine class (inspected for the class attribute)
        settings_snapshot: Non-empty settings snapshot

    Returns:
        A truthy value when filtering is requested, falsy otherwise. The
        per-engine setting is returned as stored, so it may not be a bool.
    """
    # Check for per-engine setting first (highest priority)
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if per_engine_key in settings_snapshot:
        should_filter = _snapshot_setting_value(
            settings_snapshot[per_engine_key], False
        )
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={should_filter}"
        )
        return should_filter

    # Auto-detection based on engine attribute (medium priority)
    if getattr(engine_class, "needs_llm_relevance_filter", False):
        logger.info(
            f"Auto-enabling LLM filtering for {engine_name} "
            f"(needs_llm_relevance_filter=True)"
        )
        return True

    # Global override only applies to engines without needs_llm_relevance_filter.
    # Filtering is already off on this path, so the setting only affects logging.
    if "search.skip_relevance_filter" in settings_snapshot:
        skip_filter = _snapshot_setting_value(
            settings_snapshot["search.skip_relevance_filter"], False
        )
        if skip_filter:
            logger.debug(
                f"Global skip_relevance_filter=True applied "
                f"for {engine_name}"
            )
    return False


def create_search_engine(
    engine_name: str,
    llm=None,
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Resolution order: the special "parallel" / "parallel_scientific" variants,
    then registered retrievers, then engines described by the config built
    from ``settings_snapshot``.

    Args:
        engine_name: Name of the search engine to create. Display labels are
            still accepted through a deprecated fallback.
        llm: Language model instance (required for some engines like meta)
        username: Passed to ``search_config`` when loading engine configs
        settings_snapshot: Settings snapshot; required for every engine that
            is not a parallel variant or a registered retriever
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance or None if creation failed

    Raises:
        RuntimeError: If ``settings_snapshot`` is missing or empty when a
            config-based engine is requested
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.SCIENTIFIC,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
    if engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.ALL,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # Check if this is a registered retriever first
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Everything below reads from the snapshot, so it must be non-empty.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    # Extract search engine configs from settings snapshot
    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name not in config:
        # Check if engine_name might be a display label instead of a config key
        # NOTE: This fallback is deprecated - callers should pass config keys directly
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        matched_key = _match_display_label(engine_name, config)
        if matched_key is not None:
            engine_name = matched_key

        # If still not found, use default
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            # Try to use 'auto' as default if available
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    # Get engine configuration
    engine_config = config[engine_name]

    # Set default max_results from config if not provided in kwargs
    if "max_results" not in kwargs:
        if "search.max_results" in settings_snapshot:
            kwargs["max_results"] = _snapshot_setting_value(
                settings_snapshot["search.max_results"], 20
            )
        else:
            kwargs["max_results"] = 20

    # Check for API key requirements
    requires_api_key = engine_config.get("requires_api_key", False)
    api_key = None

    if requires_api_key:
        # Check the settings snapshot for the API key
        api_key = _snapshot_setting_value(
            settings_snapshot.get(f"search.engine.web.{engine_name}.api_key")
        )

        # Still try to get from engine config if not found
        if not api_key:
            api_key = engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Pass the API key in kwargs for engines that need it
        kwargs["api_key"] = api_key

    # Warn about missing LLM but allow engine creation in degraded mode.
    # All engines with requires_llm=True handle llm=None gracefully
    # (e.g. skipping query optimization, using reliability-based sorting).
    requires_llm = engine_config.get("requires_llm", False)
    if requires_llm and not llm:
        logger.warning(
            f"Engine '{engine_name}' is configured with requires_llm=True but no LLM provided. "
            f"Creating engine without LLM — some features (query optimization, relevance filtering) "
            f"may be unavailable."
        )

    try:
        # Load the engine class
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]
        engine_class = get_safe_module_class(module_path, class_name)

        # Names accepted by the engine's __init__, computed once.
        # 'self' is always the first parameter of instance methods, so skip it.
        init_signature = inspect.signature(engine_class.__init__)
        accepted_params = set(list(init_signature.parameters)[1:])

        # Combine default parameters with provided ones
        all_params = {**engine_config.get("default_params", {}), **kwargs}

        # Filter out parameters that aren't accepted by the engine class
        filtered_params = {
            k: v for k, v in all_params.items() if k in accepted_params
        }

        # Always pass settings_snapshot if the engine accepts it
        if "settings_snapshot" in accepted_params:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it
        if "programmatic_mode" in accepted_params:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and engine accepts it
        if requires_llm:
            filtered_params["llm"] = llm
        elif "llm" in accepted_params and llm and "llm" not in filtered_params:
            # If LLM was provided and engine accepts it, pass it through
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # Add API key if required and not already in filtered_params.
        # NOTE(review): only reachable when __init__ has no explicit api_key
        # parameter; presumably meant for engines taking **kwargs - confirm.
        if requires_api_key and "api_key" not in filtered_params and api_key:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters
        engine = engine_class(**filtered_params)

        # Determine if this engine should use LLM relevance filtering
        should_filter = _wants_llm_relevance_filter(
            engine_name, engine_class, settings_snapshot
        )

        # Apply the setting
        if should_filter and hasattr(engine, "llm") and engine.llm:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif should_filter:
            logger.warning(
                f"LLM relevance filtering requested for {engine_name} "
                f"but no LLM is available — filtering skipped"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name,
                engine,
                engine_config,
                llm,
                kwargs,
                username,
                settings_snapshot,
            )

        return engine  # type: ignore[no-any-return]

    except Exception:
        # Creation is best-effort: log the traceback and signal failure with None.
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None
def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    engine_config: Dict[str, Any],
    llm,
    params: Dict[str, Any],
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
) -> Optional[BaseSearchEngine]:
    """
    Wrap ``base_engine`` in its full-search class when one is configured.

    Args:
        engine_name: Config key of the engine being wrapped
        base_engine: The already-created engine; returned unchanged on any failure
        engine_config: Engine config providing ``full_search_module`` and
            ``full_search_class``
        llm: Language model handed to the wrapper when it accepts ``llm``
        params: Candidate keyword arguments; only those named in the
            wrapper's ``__init__`` are forwarded
        username: Accepted for interface compatibility; not used in this function
        settings_snapshot: Settings snapshot consulted for SerpAPI / Brave API keys

    Returns:
        The full search wrapper, or ``base_engine`` when the configuration is
        incomplete or wrapper creation raises
    """
    # Engines that need their API key passed directly to the wrapper:
    # engine name -> (wrapper kwarg for the key, snapshot key, safesearch "on" value)
    direct_key_engines = {
        "serpapi": (
            "serpapi_api_key",
            "search.engine.web.serpapi.api_key",
            "active",
        ),
        "brave": (
            "api_key",
            "search.engine.web.brave.api_key",
            "moderate",
        ),
    }

    try:
        # Full search class details come from engine_config (already has
        # registry-injected values from search_config()).
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")

        if not (module_path and class_name):
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        # Import the full search class
        full_search_class = get_safe_module_class(module_path, class_name)

        # Parameter names of the wrapper's __init__, without the leading 'self'
        signature = inspect.signature(full_search_class.__init__)
        accepted = list(signature.parameters)[1:]

        # Keep only the parameters the wrapper can actually receive
        wrapper_params = {}
        for key, value in params.items():
            if key in accepted:
                wrapper_params[key] = value

        quirks = direct_key_engines.get(engine_name)
        if quirks is not None and quirks[0] in accepted:
            key_param, snapshot_key, safesearch_on = quirks

            # Check settings snapshot for API key (dict wrapper or raw value)
            stored = (
                settings_snapshot.get(snapshot_key)
                if settings_snapshot
                else None
            )
            secret = stored.get("value") if isinstance(stored, dict) else stored
            if secret:
                wrapper_params[key_param] = secret

            # Map some parameter names to what the wrapper expects
            language_applies = (
                "language" in params
                and "search_language" not in params
                and "language" in accepted
            )
            if language_applies:
                wrapper_params["language"] = params["language"]

            safesearch_applies = (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in accepted
            )
            if safesearch_applies:
                if params["safe_search"]:
                    wrapper_params["safesearch"] = safesearch_on
                else:
                    wrapper_params["safesearch"] = "off"

        # Always include llm if it's a parameter
        if "llm" in accepted:
            wrapper_params["llm"] = llm

        # Hand over the base engine when the wrapper has a parameter for it
        if "web_search" in accepted:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        # Create the full search wrapper with filtered parameters
        service: BaseSearchEngine = full_search_class(**wrapper_params)
        return service

    except Exception:
        # Fall back to the unwrapped engine rather than failing the search.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine
def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Builds the keyword arguments relevant to ``search_tool`` and delegates
    the actual construction to ``create_search_engine``.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings snapshot forwarded to ``create_search_engine``;
            also read for the wikinews ``adaptive_search`` setting
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if creation failed
    """
    # Common parameters
    params = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in (
        "duckduckgo",
        "serpapi",
        "google_pse",
        "brave",
        "mojeek",
    ):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # The snapshot entry may be a {"value": ...} dict or a raw value, as
        # handled elsewhere in this module; chaining .get() on a raw value
        # would raise AttributeError. Defaults to True when absent.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        if isinstance(adaptive_setting, dict):
            adaptive_setting = adaptive_setting.get("value", True)
        params["adaptive_search"] = bool(adaptive_setting)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params.keys())}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Add debugging to check if engine is None
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine