Coverage for src/local_deep_research/web_search_engines/search_engine_factory.py: 94%
198 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import inspect
2from typing import Any, Dict, Optional
4from loguru import logger
6from ..security.module_whitelist import get_safe_module_class
7from ..utilities.enums import SearchMode
8from .retriever_registry import retriever_registry
9from .search_engine_base import BaseSearchEngine
10from .search_engines_config import search_config
def create_search_engine(
    engine_name: str,
    llm=None,
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Build the search engine identified by ``engine_name``.

    Resolution order: the two built-in parallel variants, then any retriever
    registered under that name, then the engine table derived from
    ``settings_snapshot``.

    Args:
        engine_name: Config key of the engine to create. A display label of
            the form "{icon} {base_name} ({category})" is still accepted as
            a deprecated fallback.
        llm: Language model instance (required for some engines like meta)
        username: Forwarded to ``search_config`` and to the full-search
            wrapper factory.
        settings_snapshot: Settings mapping; mandatory for every engine that
            is not a parallel variant or a registered retriever.
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance, or None when no usable engine
        could be resolved, a required API key is missing, or construction
        raised.

    Raises:
        RuntimeError: ``settings_snapshot`` is missing or empty.
        ValueError: ``engine_name`` is the literal string "none".
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # The parallel variants share one class and differ only in search mode.
    parallel_mode = None
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        parallel_mode = SearchMode.SCIENTIFIC
    elif engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        parallel_mode = SearchMode.ALL

    if parallel_mode is not None:
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=parallel_mode,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
            **kwargs,
        )

    # Registered retrievers take precedence over configured engines.
    registered = retriever_registry.get(engine_name)
    if registered:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=registered,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
            programmatic_mode=programmatic_mode,
        )

    # Everything below reads from the snapshot, so it must be present.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name == "none":
        # The literal "none" used to fall through to "auto" and perform live
        # searches; it is rejected so offline callers fail loudly instead.
        raise ValueError(
            "search.tool='none' is not a valid engine. Register a LangChain "
            "retriever via `retrievers={...}` (see "
            "examples/llm_integration/mock_llm_example.py) or pick a real "
            "engine. Previously this silently fell back to 'auto'."
        )

    if engine_name not in config:
        # Deprecated path: the caller may have passed a display label such as
        # "🔬 OpenAlex (Scientific)" instead of the config key.
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )

        # Plain string operations (no regex) to avoid ReDoS.
        if " (" in engine_name and engine_name.endswith(")"):
            # Drop the trailing " (category)" part, then the leading icon.
            label_body = engine_name.rpartition(" (")[0]
            icon, separator, remainder = label_body.partition(" ")
            if separator and icon:
                base_name = remainder.strip()
                logger.info(
                    f"Extracted base name '{base_name}' from label '{engine_name}'"
                )

                # First entry whose display name equals the base name wins.
                for candidate_key, candidate in config.items():
                    if not isinstance(candidate, dict):
                        continue
                    if candidate.get("display_name", candidate_key) != base_name:
                        continue
                    logger.info(
                        f"Matched label to config key: '{engine_name}' -> '{candidate_key}'"
                    )
                    engine_name = candidate_key
                    break

        # Label fallback did not resolve it either: use 'auto' if configured.
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    engine_config = config[engine_name]

    # Default max_results comes from the snapshot unless the caller set it.
    if "max_results" not in kwargs:
        result_limit = 20
        if "search.max_results" in settings_snapshot:
            raw_limit = settings_snapshot["search.max_results"]
            if isinstance(raw_limit, dict):
                result_limit = raw_limit.get("value", 20)
            else:
                result_limit = raw_limit
        kwargs["max_results"] = result_limit

    # Resolve the API key: snapshot first, engine config second.
    needs_api_key = engine_config.get("requires_api_key", False)
    api_key = None
    if needs_api_key:
        stored_key = settings_snapshot.get(
            f"search.engine.web.{engine_name}.api_key"
        )
        if stored_key:
            if isinstance(stored_key, dict):
                api_key = stored_key.get("value")
            else:
                api_key = stored_key

        api_key = api_key or engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Hand the key to the engine through the regular parameter path.
        kwargs["api_key"] = api_key

    # A missing LLM only degrades the engine (e.g. no query optimization),
    # so warn and carry on rather than refusing to build it.
    needs_llm = engine_config.get("requires_llm", False)
    if needs_llm and not llm:
        logger.warning(
            f"Engine '{engine_name}' is configured with requires_llm=True but no LLM provided. "
            f"Creating engine without LLM — some features (query optimization, relevance filtering) "
            f"may be unavailable."
        )

    try:
        engine_class = get_safe_module_class(
            engine_config["module_path"], engine_config["class_name"]
        )

        # Names the constructor declares, minus the leading 'self'.
        accepted = list(
            inspect.signature(engine_class.__init__).parameters.keys()
        )[1:]

        # Config defaults first so caller-supplied kwargs override them,
        # then keep only what the constructor actually declares.
        merged_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            name: merged_params[name]
            for name in merged_params
            if name in accepted
        }

        if "settings_snapshot" in accepted and settings_snapshot:
            filtered_params["settings_snapshot"] = settings_snapshot

        if "programmatic_mode" in accepted:
            filtered_params["programmatic_mode"] = programmatic_mode

        # LLM: always for engines flagged requires_llm, otherwise only when
        # one was supplied and the constructor has a slot for it.
        if needs_llm:
            filtered_params["llm"] = llm
        elif "llm" in accepted and llm and "llm" not in filtered_params:
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # Required key that the signature filter dropped: add it back.
        if needs_api_key and "api_key" not in filtered_params and api_key:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        engine = engine_class(**filtered_params)

        # Many subclasses never forward programmatic_mode to the base class,
        # so the instance can end up with the default; correct it afterwards
        # so the engine's rate tracker matches what the caller requested.
        if (
            isinstance(engine, BaseSearchEngine)
            and engine.programmatic_mode != programmatic_mode
        ):
            engine._configure_programmatic_mode(programmatic_mode)

        # LLM relevance filtering.
        # Priority: per-engine setting > needs_llm_relevance_filter > global.
        # Engines with weak native ranking opt in through the class
        # attribute; the global skip switch only matters for the rest.
        should_filter = False
        per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"

        if per_engine_key in settings_snapshot:
            explicit = settings_snapshot[per_engine_key]
            if isinstance(explicit, dict):
                should_filter = explicit.get("value", False)
            else:
                should_filter = explicit
            logger.info(
                f"Using per-engine setting for {engine_name}: "
                f"enable_llm_relevance_filter={should_filter}"
            )
        elif getattr(engine_class, "needs_llm_relevance_filter", False):
            should_filter = True
            logger.info(
                f"Auto-enabling LLM filtering for {engine_name} "
                f"(needs_llm_relevance_filter=True)"
            )
        elif "search.skip_relevance_filter" in settings_snapshot:
            global_skip = settings_snapshot["search.skip_relevance_filter"]
            if isinstance(global_skip, dict):
                global_skip = global_skip.get("value", False)
            if global_skip:
                should_filter = False
                logger.debug(
                    f"Global skip_relevance_filter=True applied "
                    f"for {engine_name}"
                )

        # Filtering can only be switched on when the engine holds an LLM.
        if not should_filter:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")
        elif getattr(engine, "llm", None):
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        else:
            logger.warning(
                f"LLM relevance filtering requested for {engine_name} "
                f"but no LLM is available — filtering skipped"
            )

        # Optionally wrap the engine with its full-search counterpart.
        wants_full_search = kwargs.get("use_full_search", False)
        if wants_full_search and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name,
                engine,
                engine_config,
                llm,
                kwargs,
                username,
                settings_snapshot,
            )

        return engine  # type: ignore[no-any-return]

    except Exception:
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None
def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    engine_config: Dict[str, Any],
    llm,
    params: Dict[str, Any],
    username: str | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
) -> Optional[BaseSearchEngine]:
    """Wrap ``base_engine`` in its full-search class when one is configured.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: Already constructed engine; returned unchanged whenever
            wrapping is not possible.
        engine_config: Engine configuration; ``full_search_module`` and
            ``full_search_class`` name the wrapper class.
        llm: Language model handed to the wrapper if it declares ``llm``.
        params: Candidate keyword arguments; only names declared by the
            wrapper's ``__init__`` are forwarded.
        username: Accepted for signature compatibility; not read here.
        settings_snapshot: Settings mapping consulted for the SerpAPI and
            Brave API keys.

    Returns:
        The full-search wrapper, or ``base_engine`` when the wrapper is not
        configured or building it raised.
    """
    try:
        # engine_config already carries the registry-injected values from
        # search_config().
        wrapper_module = engine_config.get("full_search_module")
        wrapper_class_name = engine_config.get("full_search_class")
        if not (wrapper_module and wrapper_class_name):
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        full_search_class = get_safe_module_class(
            wrapper_module, wrapper_class_name
        )

        # Names the wrapper constructor declares, minus the leading 'self'.
        accepted = list(
            inspect.signature(full_search_class.__init__).parameters.keys()
        )[1:]

        wrapper_params = {
            name: params[name] for name in params if name in accepted
        }

        # SerpAPI and Brave need their API key passed directly. They differ
        # in the keyword used for the key, the snapshot path holding it, and
        # the value that means "safe search on".
        if engine_name == "serpapi":
            key_spec = (
                "serpapi_api_key",
                "search.engine.web.serpapi.api_key",
                "active",
            )
        elif engine_name == "brave":
            key_spec = (
                "api_key",
                "search.engine.web.brave.api_key",
                "moderate",
            )
        else:
            key_spec = None

        if key_spec is not None and key_spec[0] in accepted:
            key_param, key_path, safe_on_value = key_spec

            # Snapshot entries may be {"value": ...} dicts or raw values.
            stored = (
                settings_snapshot.get(key_path) if settings_snapshot else None
            )
            resolved_key = None
            if stored:
                if isinstance(stored, dict):
                    resolved_key = stored.get("value")
                else:
                    resolved_key = stored
            if resolved_key:
                wrapper_params[key_param] = resolved_key

            # Map some parameter names to what the wrapper expects
            if (
                "language" in params
                and "search_language" not in params
                and "language" in accepted
            ):
                wrapper_params["language"] = params["language"]

            if (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in accepted
            ):
                if params["safe_search"]:
                    wrapper_params["safesearch"] = safe_on_value
                else:
                    wrapper_params["safesearch"] = "off"

        # Always include llm if it's a parameter
        if "llm" in accepted:
            wrapper_params["llm"] = llm

        # Hand over the base engine when the wrapper has a slot for it.
        if "web_search" in accepted:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        service: BaseSearchEngine = full_search_class(**wrapper_params)
        return service

    except Exception:
        # Best effort: fall back to the plain engine if wrapping fails.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine
def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Dict[str, Any] | None = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Assembles the keyword arguments relevant to ``search_tool`` and delegates
    construction to ``create_search_engine``.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings mapping forwarded to
            ``create_search_engine``; also read here for the wikinews
            ``adaptive_search`` flag.
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if
        ``create_search_engine`` returned None.
    """
    # Common parameters
    params: Dict[str, Any] = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in (
        "duckduckgo",
        "serpapi",
        "google_pse",
        "brave",
        "mojeek",
    ):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # Snapshot entries may be {"value": ...} dicts or raw values, as the
        # other snapshot reads in this module allow. Accept both, so a bare
        # bool no longer fails with AttributeError. Defaults to True when
        # the setting (or its "value") is absent.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", True
        )
        if isinstance(adaptive_setting, dict):
            adaptive_setting = adaptive_setting.get("value", True)
        params["adaptive_search"] = bool(adaptive_setting)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params.keys())}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Diagnostics only; the engine (or None) is returned unchanged.
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine