Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 47%
222 statements
« prev ^ index » next — coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import importlib
2import inspect
3from typing import Any, Dict, Optional
5from loguru import logger
7from ..utilities.enums import SearchMode
8from .retriever_registry import retriever_registry
9from .search_engine_base import BaseSearchEngine
10from .search_engines_config import search_config
def _resolve_display_label(engine_name: str, config: Dict[str, Any]) -> str:
    """Map a display label back to its config key (deprecated fallback).

    Display labels have the format ``"{icon} {base_name} ({category})"``,
    e.g. ``"🔬 OpenAlex (Scientific)"``. Callers should pass config keys
    directly; this exists only for backward compatibility.

    Args:
        engine_name: The name/label to resolve.
        config: Mapping of config keys to engine config dicts.

    Returns:
        The matching config key, or ``engine_name`` unchanged if no entry's
        ``display_name`` matches the extracted base name.
    """
    # String operations instead of regex to avoid any ReDoS risk.
    if not (" (" in engine_name and engine_name.endswith(")")):
        return engine_name

    # Split on the LAST " (" so base names containing parentheses still work.
    parts = engine_name.rsplit(" (", 1)
    if len(parts) != 2:
        return engine_name

    before_paren = parts[0]
    space_idx = before_paren.find(" ")
    if space_idx > 0:
        # Drop the leading icon (the first whitespace-separated token).
        base_name = before_paren[space_idx + 1 :].strip()
    else:
        # No icon prefix (e.g. "OpenAlex (Scientific)") - use the whole prefix.
        base_name = before_paren.strip()
    if not base_name:
        return engine_name

    logger.info(f"Extracted base name '{base_name}' from label '{engine_name}'")

    # Search for a config entry with a matching display_name.
    for config_key, config_data in config.items():
        if isinstance(config_data, dict):
            display_name = config_data.get("display_name", config_key)
            if display_name == base_name:
                logger.info(
                    f"Matched label to config key: '{engine_name}' -> '{config_key}'"
                )
                return config_key
    return engine_name


def _resolve_api_key(
    engine_name: str,
    engine_config: Dict[str, Any],
    settings_snapshot: Optional[Dict[str, Any]],
) -> Optional[str]:
    """Look up an engine's API key: settings snapshot first, then engine config.

    Args:
        engine_name: Config key of the engine.
        engine_config: The engine's config dict (may carry an ``api_key``).
        settings_snapshot: Settings snapshot; entries may be plain values or
            ``{"value": ...}`` dicts.

    Returns:
        The API key, or None if not configured anywhere.
    """
    api_key = None
    api_key_path = f"search.engine.web.{engine_name}.api_key"

    if settings_snapshot:
        api_key_setting = settings_snapshot.get(api_key_path)
        if api_key_setting:
            # Settings entries may be plain values or {"value": ...} dicts.
            api_key = (
                api_key_setting.get("value")
                if isinstance(api_key_setting, dict)
                else api_key_setting
            )

    # Still try to get it from the engine config if not found in settings.
    if not api_key:
        api_key = engine_config.get("api_key")

    return api_key


def _should_enable_llm_filter(
    engine_name: str,
    engine_class,
    settings_snapshot: Optional[Dict[str, Any]],
) -> bool:
    """Decide whether LLM relevance filtering should be enabled for an engine.

    Priority: per-engine setting > auto-detection by engine type; a global
    ``search.skip_relevance_filter`` setting then force-disables either result.

    Rationale:
    - Academic engines (arXiv, Semantic Scholar) use simple keyword matching
      and benefit significantly from LLM-based relevance filtering.
    - Generic engines (Google, Brave, SearXNG) already use semantic search
      and LLM filtering is redundant/wasteful.
    - CrossEngineFilter still ranks combined results at the strategy level.

    Args:
        engine_name: Config key of the engine.
        engine_class: The engine class (checked for ``is_scientific`` /
            ``is_generic`` class attributes).
        settings_snapshot: Settings snapshot to consult.

    Returns:
        True if LLM relevance filtering should be enabled.
    """
    should_filter = False

    # Per-engine setting has the highest priority.
    per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
    if settings_snapshot and per_engine_key in settings_snapshot:
        per_engine_setting = settings_snapshot[per_engine_key]
        should_filter = (
            per_engine_setting.get("value", False)
            if isinstance(per_engine_setting, dict)
            else per_engine_setting
        )
        logger.info(
            f"Using per-engine setting for {engine_name}: "
            f"enable_llm_relevance_filter={should_filter}"
        )
    else:
        # Auto-detection based on engine type (medium priority).
        if hasattr(engine_class, "is_scientific") and engine_class.is_scientific:
            should_filter = True
            logger.info(
                f"Auto-enabling LLM filtering for scientific engine: {engine_name}"
            )
        elif hasattr(engine_class, "is_generic") and engine_class.is_generic:
            should_filter = False
            logger.debug(
                f"Auto-disabling LLM filtering for generic engine: {engine_name} "
                f"(already semantic)"
            )

    # Global skip override: force-disables filtering when set.
    if settings_snapshot and "search.skip_relevance_filter" in settings_snapshot:
        skip_filter_setting = settings_snapshot["search.skip_relevance_filter"]
        skip_filter = (
            skip_filter_setting.get("value", False)
            if isinstance(skip_filter_setting, dict)
            else skip_filter_setting
        )
        if skip_filter:
            should_filter = False
            logger.info(
                f"Global skip_relevance_filter=True overrides for {engine_name}"
            )

    return should_filter


def create_search_engine(
    engine_name: str,
    llm=None,
    username: str = None,
    settings_snapshot: Dict[str, Any] = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Create a search engine instance based on the engine name.

    Args:
        engine_name: Name of the search engine to create (a config key; a
            deprecated display-label fallback is also supported)
        llm: Language model instance (required for some engines like meta)
        username: Passed through to search_config() when extracting configs
        settings_snapshot: Settings snapshot (required; used for engine
            configs, API keys, and filtering decisions)
        programmatic_mode: If True, disables database operations and metrics tracking
        **kwargs: Additional parameters to override defaults

    Returns:
        Initialized search engine instance or None if creation failed

    Raises:
        RuntimeError: If ``settings_snapshot`` is not provided.
    """
    # Debug logging
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # Handle special parallel search engine variants.
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.SCIENTIFIC,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
    elif engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=SearchMode.ALL,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # Registered LangChain retrievers take precedence over configured engines.
    retriever = retriever_registry.get(engine_name)
    if retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Extract search engine configs from the settings snapshot.
    if settings_snapshot:
        config = search_config(
            username=username, settings_snapshot=settings_snapshot
        )
        logger.debug(
            f"Extracted search engines from snapshot: {list(config.keys())}"
        )
    else:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )

    if engine_name not in config:
        # NOTE: This display-label fallback is deprecated - callers should
        # pass config keys directly.
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )
        engine_name = _resolve_display_label(engine_name, config)

        # If still not found, fall back to 'auto' when available.
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            if "auto" in config:
                engine_name = "auto"
            else:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None

    # Get engine configuration.
    engine_config = config[engine_name]

    # Default max_results from settings (or 20) when the caller did not set it.
    if "max_results" not in kwargs:
        if settings_snapshot and "search.max_results" in settings_snapshot:
            max_results = (
                settings_snapshot["search.max_results"].get("value", 20)
                if isinstance(settings_snapshot["search.max_results"], dict)
                else settings_snapshot["search.max_results"]
            )
        else:
            max_results = 20
        kwargs["max_results"] = max_results

    # Check for API key requirements.
    api_key = None
    if engine_config.get("requires_api_key", False):
        api_key = _resolve_api_key(engine_name, engine_config, settings_snapshot)
        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None
        # Pass the API key in kwargs for engines that need it.
        kwargs["api_key"] = api_key

    # Check for LLM requirements.
    if engine_config.get("requires_llm", False) and not llm:
        logger.info(
            f"Engine {engine_name} requires an LLM instance but none was provided"
        )
        return None

    try:
        # Load the engine class.
        module_path = engine_config["module_path"]
        class_name = engine_config["class_name"]

        package = None
        if module_path.startswith("."):
            # Relative import: resolve against the `web_search_engines` package.
            package = "local_deep_research.web_search_engines"
        module = importlib.import_module(module_path, package=package)
        engine_class = getattr(module, class_name)

        # Only forward parameters the engine's __init__ actually accepts;
        # 'self' is always the first parameter, so skip it.
        engine_init_params = list(
            inspect.signature(engine_class.__init__).parameters.keys()
        )[1:]

        # Combine default parameters with provided ones (kwargs win).
        all_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            k: v for k, v in all_params.items() if k in engine_init_params
        }

        # Always pass settings_snapshot if the engine accepts it.
        if "settings_snapshot" in engine_init_params and settings_snapshot:
            filtered_params["settings_snapshot"] = settings_snapshot

        # Pass programmatic_mode if the engine accepts it.
        if "programmatic_mode" in engine_init_params:
            filtered_params["programmatic_mode"] = programmatic_mode

        # Add LLM if required OR if provided and the engine accepts it.
        if engine_config.get("requires_llm", False):
            filtered_params["llm"] = llm
        elif (
            "llm" in engine_init_params
            and llm
            and "llm" not in filtered_params
        ):
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # Add API key if required and not already in filtered_params.
        if (
            engine_config.get("requires_api_key", False)
            and "api_key" not in filtered_params
        ):
            # Use the api_key we resolved earlier from settings.
            if api_key:
                filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        # Create the engine instance with filtered parameters.
        engine = engine_class(**filtered_params)

        # Decide and apply LLM relevance filtering for this engine.
        should_filter = _should_enable_llm_filter(
            engine_name, engine_class, settings_snapshot
        )
        if should_filter and hasattr(engine, "llm") and engine.llm:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif not hasattr(engine, "llm") or not engine.llm:
            logger.debug(
                f"LLM relevance filtering not available for {engine_name} (no LLM)"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Check if we need to wrap with full search capabilities.
        if kwargs.get("use_full_search", False) and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name, engine, llm, kwargs, username, settings_snapshot
            )

        return engine

    except Exception:
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None
def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    llm,
    params: Dict[str, Any],
    username: str = None,
    settings_snapshot: Dict[str, Any] = None,
) -> Optional[BaseSearchEngine]:
    """Create a full search wrapper for the base engine if supported.

    Looks up the engine's ``full_search_module`` / ``full_search_class``
    config, imports the wrapper class, and instantiates it with parameters
    filtered against the wrapper's ``__init__`` signature.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: The already-created snippet-level engine instance.
        llm: Language model instance; passed through if the wrapper accepts it.
        params: Original creation parameters; filtered per the wrapper signature.
        username: Used only by the legacy search_config() fallback path.
        settings_snapshot: Settings snapshot to read engine config and API keys.

    Returns:
        The wrapper instance, or ``base_engine`` unchanged if wrapping is not
        configured or creation fails (this function never raises).
    """
    try:
        # Extract search engine config from settings snapshot
        if settings_snapshot:
            config = {}

            # Rebuild a {engine: {option: value}} mapping from flat snapshot
            # keys of the form "search.engine.web.<engine>.<option...>".
            web_engines = {}
            for key, value in settings_snapshot.items():
                if key.startswith("search.engine.web."):
                    # Extract engine name from key like "search.engine.web.searxng.class_name"
                    parts = key.split(".")
                    if len(parts) >= 4:
                        engine_name_from_key = parts[3]
                        if engine_name_from_key not in web_engines:
                            web_engines[engine_name_from_key] = {}
                        # Store the config value; snapshot entries may be
                        # plain values or {"value": ...} dicts.
                        remaining_key = (
                            ".".join(parts[4:]) if len(parts) > 4 else ""
                        )
                        if remaining_key:
                            web_engines[engine_name_from_key][remaining_key] = (
                                value.get("value")
                                if isinstance(value, dict)
                                else value
                            )

            config.update(web_engines)
        else:
            # Fallback to search_config if no snapshot (not recommended for threads)
            config = search_config(
                username=username, settings_snapshot=settings_snapshot
            )

        if engine_name not in config:
            logger.warning(f"Engine config for {engine_name} not found")
            return base_engine

        engine_config = config[engine_name]

        # Get full search class details
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")

        if not module_path or not class_name:
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        # Import the full search class
        module = importlib.import_module(module_path)
        full_search_class = getattr(module, class_name)

        # Get the wrapper's __init__ parameters to filter out unsupported ones
        wrapper_init_signature = inspect.signature(full_search_class.__init__)
        wrapper_init_params = list(wrapper_init_signature.parameters.keys())[
            1:
        ]  # Skip 'self'

        # Extract relevant parameters for the full search wrapper
        wrapper_params = {
            k: v for k, v in params.items() if k in wrapper_init_params
        }

        # Special case for SerpAPI which needs the API key directly
        if (
            engine_name == "serpapi"
            and "serpapi_api_key" in wrapper_init_params
        ):
            # Check settings snapshot for API key
            serpapi_api_key = None
            if settings_snapshot:
                serpapi_setting = settings_snapshot.get(
                    "search.engine.web.serpapi.api_key"
                )
                if serpapi_setting:
                    # Snapshot entry may be a plain value or a {"value": ...} dict.
                    serpapi_api_key = (
                        serpapi_setting.get("value")
                        if isinstance(serpapi_setting, dict)
                        else serpapi_setting
                    )
            if serpapi_api_key:
                wrapper_params["serpapi_api_key"] = serpapi_api_key

        # Map some parameter names to what the wrapper expects
        # NOTE(review): this guard checks "search_language" in `params`, not in
        # `wrapper_params` - presumably to skip mapping when the caller already
        # supplied a search_language. Confirm intent before changing.
        if (
            "language" in params
            and "search_language" not in params
            and "language" in wrapper_init_params
        ):
            wrapper_params["language"] = params["language"]

        if (
            "safesearch" not in wrapper_params
            and "safe_search" in params
            and "safesearch" in wrapper_init_params
        ):
            wrapper_params["safesearch"] = (
                "active" if params["safe_search"] else "off"
            )

        # Special case for Brave which needs the API key directly
        if engine_name == "brave" and "api_key" in wrapper_init_params:
            # Check settings snapshot for API key
            brave_api_key = None
            if settings_snapshot:
                brave_setting = settings_snapshot.get(
                    "search.engine.web.brave.api_key"
                )
                if brave_setting:
                    # Snapshot entry may be a plain value or a {"value": ...} dict.
                    brave_api_key = (
                        brave_setting.get("value")
                        if isinstance(brave_setting, dict)
                        else brave_setting
                    )

            if brave_api_key:
                wrapper_params["api_key"] = brave_api_key

        # Map some parameter names to what the wrapper expects
        # NOTE(review): this block duplicates the mapping above; because the
        # earlier block already sets "safesearch" when applicable, the
        # "moderate" branch here appears unreachable. Looks like the two
        # blocks were meant to be engine-specific (serpapi vs brave) - verify
        # before consolidating.
        if (
            "language" in params
            and "search_language" not in params
            and "language" in wrapper_init_params
        ):
            wrapper_params["language"] = params["language"]

        if (
            "safesearch" not in wrapper_params
            and "safe_search" in params
            and "safesearch" in wrapper_init_params
        ):
            wrapper_params["safesearch"] = (
                "moderate" if params["safe_search"] else "off"
            )

        # Always include llm if it's a parameter
        if "llm" in wrapper_init_params:
            wrapper_params["llm"] = llm

        # If the wrapper needs the base engine and has a parameter for it
        if "web_search" in wrapper_init_params:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        # Create the full search wrapper with filtered parameters
        full_search = full_search_class(**wrapper_params)

        return full_search

    except Exception:
        # Never propagate: fall back to the unwrapped base engine.
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine
def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Dict[str, Any] = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance
        max_results: Maximum number of search results
        region: Search region/locale
        time_period: Time period for search results
        safe_search: Whether to enable safe search
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings snapshot forwarded to create_search_engine
        programmatic_mode: If True, disables database operations and metrics tracking

    Returns:
        Initialized search engine instance, or None if creation failed
    """
    # Common parameters
    params = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ["duckduckgo", "serpapi", "google_pse", "brave"]:
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ["serpapi", "brave", "google_pse", "wikinews"]:
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # BUGFIX: guard against settings_snapshot being None (its default),
        # and accept both plain values and {"value": ...} dict entries,
        # consistent with how other settings are read in this module.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        params["adaptive_search"] = bool(
            adaptive_setting.get("value", True)
            if isinstance(adaptive_setting, dict)
            else adaptive_setting
        )

    if search_tool in ["serpapi", "wikinews"]:
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(f"Params being passed to create_search_engine: {params}")

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Add debugging to check if engine is None
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine