Coverage for src / local_deep_research / web_search_engines / search_engine_factory.py: 70%
217 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import inspect
2from typing import Any, Dict, Optional
4from loguru import logger
6from ..security.module_whitelist import get_safe_module_class
7from ..utilities.enums import SearchMode
8from .retriever_registry import retriever_registry
9from .search_engine_base import BaseSearchEngine
10from .search_engines_config import search_config
def create_search_engine(
    engine_name: str,
    llm=None,
    username: str = None,
    settings_snapshot: Dict[str, Any] = None,
    programmatic_mode: bool = False,
    **kwargs,
) -> Optional[BaseSearchEngine]:
    """
    Build the search engine identified by ``engine_name``.

    Resolution order: the two built-in parallel variants, then a retriever
    registered under that name, then the engines described by the settings
    snapshot (falling back to ``auto`` when the name is unknown).

    Args:
        engine_name: Config key of the engine to create. A display label of
            the form "{icon} {base_name} ({category})" is still accepted as a
            deprecated fallback.
        llm: Language model instance. Engines whose config sets
            ``requires_llm`` cannot be created without it.
        username: Forwarded to ``search_config`` and to the full-search wrapper.
        settings_snapshot: Settings mapping. Must be non-empty for every
            engine that is resolved through the config.
        programmatic_mode: Passed to engines whose constructor accepts it.
        **kwargs: Additional parameters to override the engine defaults.

    Returns:
        Initialized search engine instance, or None if creation failed.

    Raises:
        RuntimeError: If a config-driven engine is requested without a
            settings snapshot.
    """
    logger.info(
        f"create_search_engine called with engine_name={engine_name} (type: {type(engine_name)})"
    )

    # The parallel variants bypass the config lookup entirely; they differ
    # only in the search mode handed to ParallelSearchEngine.
    parallel_mode = None
    if engine_name == "parallel_scientific":
        logger.info("Creating scientific parallel search engine")
        parallel_mode = SearchMode.SCIENTIFIC
    elif engine_name == "parallel":
        logger.info("Creating standard parallel search engine")
        parallel_mode = SearchMode.ALL
    if parallel_mode is not None:
        from .engines.parallel_search_engine import ParallelSearchEngine

        return ParallelSearchEngine(
            llm=llm,
            search_mode=parallel_mode,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

    # A registered retriever takes precedence over configured engines.
    registered_retriever = retriever_registry.get(engine_name)
    if registered_retriever:
        logger.info(f"Using registered LangChain retriever: {engine_name}")
        from .engines.search_engine_retriever import RetrieverSearchEngine

        return RetrieverSearchEngine(
            retriever=registered_retriever,
            name=engine_name,
            max_results=kwargs.get("max_results", 10),
        )

    # Everything below is driven by the settings snapshot.
    if not settings_snapshot:
        raise RuntimeError(
            "settings_snapshot is required for search engine creation in threads"
        )
    config = search_config(
        username=username, settings_snapshot=settings_snapshot
    )
    logger.debug(
        f"Extracted search engines from snapshot: {list(config.keys())}"
    )

    if engine_name not in config:
        # The name may be a display label such as "🔬 OpenAlex (Scientific)"
        # rather than a config key. This fallback is deprecated.
        logger.warning(
            f"Engine '{engine_name}' not found in config - attempting display label fallback. "
            "This is deprecated; callers should pass the config key directly."
        )

        # Plain string operations (no regex) are used to avoid ReDoS.
        if " (" in engine_name and engine_name.endswith(")"):
            # Drop the trailing " (category)" part, then the leading icon word.
            label_head = engine_name.rsplit(" (", 1)[0]
            icon, separator, remainder = label_head.partition(" ")
            if separator and icon:
                base_name = remainder.strip()
                logger.info(
                    f"Extracted base name '{base_name}' from label '{engine_name}'"
                )

                # First config entry whose display name equals the base name.
                matched_key = next(
                    (
                        key
                        for key, entry in config.items()
                        if isinstance(entry, dict)
                        and entry.get("display_name", key) == base_name
                    ),
                    None,
                )
                if matched_key is not None:
                    logger.info(
                        f"Matched label to config key: '{engine_name}' -> '{matched_key}'"
                    )
                    engine_name = matched_key

        # Still unknown: fall back to 'auto' when it is configured.
        if engine_name not in config:
            logger.warning(
                f"Search engine '{engine_name}' not found in config, using default"
            )
            if "auto" not in config:
                logger.error(
                    f"No default search engine available. Available engines: {list(config.keys())}"
                )
                return None
            engine_name = "auto"

    engine_config = config[engine_name]

    # Default max_results comes from the snapshot (or 20) unless the caller set it.
    if "max_results" not in kwargs:
        default_max_results = 20
        if settings_snapshot and "search.max_results" in settings_snapshot:
            raw_max = settings_snapshot["search.max_results"]
            default_max_results = (
                raw_max.get("value", 20) if isinstance(raw_max, dict) else raw_max
            )
        kwargs["max_results"] = default_max_results

    # Resolve the API key: snapshot first, then the engine config itself.
    needs_api_key = engine_config.get("requires_api_key", False)
    api_key = None
    if needs_api_key:
        key_setting = (
            settings_snapshot.get(f"search.engine.web.{engine_name}.api_key")
            if settings_snapshot
            else None
        )
        if key_setting:
            api_key = (
                key_setting.get("value")
                if isinstance(key_setting, dict)
                else key_setting
            )
        api_key = api_key or engine_config.get("api_key")

        if not api_key:
            logger.info(
                f"Required API key for {engine_name} not found in settings."
            )
            return None

        # Hand the key on through kwargs for engines that accept it.
        kwargs["api_key"] = api_key

    needs_llm = engine_config.get("requires_llm", False)
    if needs_llm and not llm:
        logger.info(
            f"Engine {engine_name} requires an LLM instance but none was provided"
        )
        return None

    try:
        engine_class = get_safe_module_class(
            engine_config["module_path"], engine_config["class_name"]
        )

        # Constructor parameter names without the leading 'self'; used to
        # drop anything the engine would not accept.
        accepted = list(inspect.signature(engine_class.__init__).parameters)[1:]

        # Caller-supplied values override the configured defaults.
        merged_params = {**engine_config.get("default_params", {}), **kwargs}
        filtered_params = {
            name: value
            for name, value in merged_params.items()
            if name in accepted
        }

        if settings_snapshot and "settings_snapshot" in accepted:
            filtered_params["settings_snapshot"] = settings_snapshot

        if "programmatic_mode" in accepted:
            filtered_params["programmatic_mode"] = programmatic_mode

        # The LLM is forced in when required, otherwise passed only if the
        # engine accepts it and nothing has supplied one already.
        if needs_llm:
            filtered_params["llm"] = llm
        elif "llm" in accepted and llm and "llm" not in filtered_params:
            filtered_params["llm"] = llm
            logger.info(
                f"Passing LLM to {engine_name} (engine accepts it and LLM was provided)"
            )

        # A required API key is added even if the signature filter dropped it.
        if needs_api_key and "api_key" not in filtered_params and api_key:
            filtered_params["api_key"] = api_key

        logger.info(
            f"Creating {engine_name} with filtered parameters: {filtered_params.keys()}"
        )

        engine = engine_class(**filtered_params)

        # Decide whether LLM relevance filtering applies.
        # Priority: per-engine setting > auto-detection > global setting
        #
        # Rationale:
        # - Academic engines (arXiv, Semantic Scholar) use simple keyword matching
        #   and benefit significantly from LLM-based relevance filtering
        # - Generic engines (Google, Brave, SearXNG) already use semantic search
        #   and LLM filtering is redundant/wasteful
        # - CrossEngineFilter still ranks combined results at the strategy level
        should_filter = False

        per_engine_key = f"search.engine.web.{engine_name}.default_params.enable_llm_relevance_filter"
        if settings_snapshot and per_engine_key in settings_snapshot:
            # An explicit per-engine setting wins outright.
            explicit = settings_snapshot[per_engine_key]
            should_filter = (
                explicit.get("value", False)
                if isinstance(explicit, dict)
                else explicit
            )
            logger.info(
                f"Using per-engine setting for {engine_name}: "
                f"enable_llm_relevance_filter={should_filter}"
            )
        else:
            # Auto-detect from the class flags.
            if getattr(engine_class, "is_scientific", False):
                should_filter = True
                logger.info(
                    f"Auto-enabling LLM filtering for scientific engine: {engine_name}"
                )
            elif getattr(engine_class, "is_generic", False):
                should_filter = False
                logger.debug(
                    f"Auto-disabling LLM filtering for generic engine: {engine_name} "
                    f"(already semantic)"
                )

            # The global skip flag overrides auto-detection only.
            if (
                settings_snapshot
                and "search.skip_relevance_filter" in settings_snapshot
            ):
                skip_setting = settings_snapshot["search.skip_relevance_filter"]
                skip_filter = (
                    skip_setting.get("value", False)
                    if isinstance(skip_setting, dict)
                    else skip_setting
                )
                if skip_filter:
                    should_filter = False
                    logger.info(
                        f"Global skip_relevance_filter=True overrides for {engine_name}"
                    )

        # Filtering can only be switched on for engines that hold an LLM.
        engine_has_llm = bool(getattr(engine, "llm", None))
        if should_filter and engine_has_llm:
            engine.enable_llm_relevance_filter = True
            logger.info(f"✓ Enabled LLM relevance filtering for {engine_name}")
        elif not engine_has_llm:
            logger.debug(
                f"LLM relevance filtering not available for {engine_name} (no LLM)"
            )
        else:
            logger.debug(f"LLM relevance filtering disabled for {engine_name}")

        # Optionally wrap the engine with its full-search variant.
        wants_full_search = kwargs.get("use_full_search", False)
        if wants_full_search and engine_config.get(
            "supports_full_search", False
        ):
            return _create_full_search_wrapper(
                engine_name, engine, llm, kwargs, username, settings_snapshot
            )

        return engine

    except Exception:
        logger.exception(f"Failed to create search engine '{engine_name}'")
        return None
def _create_full_search_wrapper(
    engine_name: str,
    base_engine: BaseSearchEngine,
    llm,
    params: Dict[str, Any],
    username: str = None,
    settings_snapshot: Dict[str, Any] = None,
) -> Optional[BaseSearchEngine]:
    """
    Wrap ``base_engine`` in its configured full-search class when possible.

    The wrapper class is named by the ``full_search_module`` and
    ``full_search_class`` entries of the engine's config. Only parameters
    accepted by the wrapper's constructor are passed to it.

    Args:
        engine_name: Config key of the engine being wrapped.
        base_engine: Engine instance to wrap; also the fallback return value.
        llm: Language model passed to the wrapper if it accepts ``llm``.
        params: Candidate constructor parameters for the wrapper.
        username: Used only when the config is loaded via ``search_config``.
        settings_snapshot: Settings mapping used to derive the engine config
            and API keys.

    Returns:
        The full-search wrapper, or ``base_engine`` unchanged when the config
        is missing or incomplete or when any step raises.
    """
    try:
        if settings_snapshot:
            # Group "search.engine.web.<engine>.<rest>" settings per engine.
            config = {}
            for setting_key, setting in settings_snapshot.items():
                if not setting_key.startswith("search.engine.web."):
                    continue
                segments = setting_key.split(".")
                if len(segments) < 4:
                    continue
                engine_entry = config.setdefault(segments[3], {})
                # Whatever follows the engine name is the per-engine key,
                # e.g. "class_name" or "default_params.x".
                sub_key = ".".join(segments[4:])
                if sub_key:
                    engine_entry[sub_key] = (
                        setting.get("value")
                        if isinstance(setting, dict)
                        else setting
                    )
        else:
            # Fallback to search_config if no snapshot (not recommended for threads)
            config = search_config(
                username=username, settings_snapshot=settings_snapshot
            )

        if engine_name not in config:
            logger.warning(f"Engine config for {engine_name} not found")
            return base_engine

        engine_config = config[engine_name]
        module_path = engine_config.get("full_search_module")
        class_name = engine_config.get("full_search_class")
        if not (module_path and class_name):
            logger.warning(
                f"Full search configuration missing for {engine_name}"
            )
            return base_engine

        full_search_class = get_safe_module_class(module_path, class_name)

        # Constructor parameter names of the wrapper, without 'self'.
        wrapper_init_params = list(
            inspect.signature(full_search_class.__init__).parameters
        )[1:]

        wrapper_params = {
            name: value
            for name, value in params.items()
            if name in wrapper_init_params
        }

        def snapshot_value(setting_path):
            # Read one snapshot entry, unwrapping {"value": ...} dicts;
            # None when the snapshot or the entry is missing or empty.
            if not settings_snapshot:
                return None
            entry = settings_snapshot.get(setting_path)
            if not entry:
                return None
            return entry.get("value") if isinstance(entry, dict) else entry

        def map_shared_names(safe_on_value):
            # Translate generic parameter names to what the wrapper expects.
            if (
                "language" in params
                and "search_language" not in params
                and "language" in wrapper_init_params
            ):
                wrapper_params["language"] = params["language"]

            if (
                "safesearch" not in wrapper_params
                and "safe_search" in params
                and "safesearch" in wrapper_init_params
            ):
                wrapper_params["safesearch"] = (
                    safe_on_value if params["safe_search"] else "off"
                )

        # Special case for SerpAPI which needs the API key directly
        if (
            engine_name == "serpapi"
            and "serpapi_api_key" in wrapper_init_params
        ):
            serpapi_api_key = snapshot_value(
                "search.engine.web.serpapi.api_key"
            )
            if serpapi_api_key:
                wrapper_params["serpapi_api_key"] = serpapi_api_key
            map_shared_names("active")

        # Special case for Brave which needs the API key directly
        if engine_name == "brave" and "api_key" in wrapper_init_params:
            brave_api_key = snapshot_value("search.engine.web.brave.api_key")
            if brave_api_key:
                wrapper_params["api_key"] = brave_api_key
            map_shared_names("moderate")

        # Always include llm if it's a parameter
        if "llm" in wrapper_init_params:
            wrapper_params["llm"] = llm

        # The wrapper receives the base engine under 'web_search' if it has
        # such a parameter.
        if "web_search" in wrapper_init_params:
            wrapper_params["web_search"] = base_engine

        logger.debug(
            f"Creating full search wrapper for {engine_name} with filtered parameters: {wrapper_params.keys()}"
        )

        return full_search_class(**wrapper_params)

    except Exception:
        logger.exception(
            f"Failed to create full search wrapper for {engine_name}"
        )
        return base_engine
def get_search(
    search_tool: str,
    llm_instance,
    max_results: int = 10,
    region: str = "us",
    time_period: str = "y",
    safe_search: bool = True,
    search_snippets_only: bool = False,
    search_language: str = "English",
    max_filtered_results: Optional[int] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    programmatic_mode: bool = False,
):
    """
    Get search tool instance based on the provided parameters.

    Assembles the keyword arguments relevant to ``search_tool`` and delegates
    to ``create_search_engine``.

    Args:
        search_tool: Name of the search engine to use
        llm_instance: Language model instance, passed on as ``llm``
        max_results: Maximum number of search results
        region: Search region/locale (only forwarded for duckduckgo, serpapi,
            google_pse, brave and mojeek)
        time_period: Time period for search results (only forwarded for
            serpapi and wikinews)
        safe_search: Whether to enable safe search (same engines as ``region``)
        search_snippets_only: Whether to return just snippets (vs. full content)
        search_language: Language for search results (only forwarded for
            serpapi, brave, google_pse and wikinews)
        max_filtered_results: Maximum number of results to keep after filtering
        settings_snapshot: Settings mapping passed to ``create_search_engine``;
            also read for the wikinews ``adaptive_search`` flag
        programmatic_mode: Passed through to ``create_search_engine``

    Returns:
        Initialized search engine instance, or None if creation failed
    """
    # Common parameters
    params = {
        "max_results": max_results,
        "llm": llm_instance,  # Only used by engines that need it
    }

    # Add max_filtered_results if provided
    if max_filtered_results is not None:
        params["max_filtered_results"] = max_filtered_results

    # Add engine-specific parameters
    if search_tool in ("duckduckgo", "serpapi", "google_pse", "brave", "mojeek"):
        params.update(
            {
                "region": region,
                "safe_search": safe_search,
                "use_full_search": not search_snippets_only,
            }
        )

    if search_tool in ("serpapi", "brave", "google_pse", "wikinews"):
        params["search_language"] = search_language

    if search_tool == "wikinews":
        params["search_snippets_only"] = search_snippets_only
        # The snapshot may be absent, and an entry may be either a
        # {"value": ...} dict or a raw value; accept every form instead of
        # calling .get() on something that is not a dict.
        adaptive_setting = (settings_snapshot or {}).get(
            "search.engine.web.wikinews.adaptive_search", {}
        )
        adaptive_search = (
            adaptive_setting.get("value", True)
            if isinstance(adaptive_setting, dict)
            else adaptive_setting
        )
        params["adaptive_search"] = bool(adaptive_search)

    if search_tool in ("serpapi", "wikinews"):
        params["time_period"] = time_period

    # Create and return the search engine
    logger.info(
        f"Creating search engine for tool: {search_tool} (type: {type(search_tool)}) with params: {params.keys()}"
    )
    logger.info(
        f"About to call create_search_engine with search_tool={search_tool}, settings_snapshot type={type(settings_snapshot)}"
    )
    logger.info(
        f"Params being passed to create_search_engine: {list(params.keys())}"
    )

    engine = create_search_engine(
        search_tool,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        **params,
    )

    # Log the outcome to make a None result easy to diagnose
    if engine is None:
        logger.error(
            f"Failed to create search engine for {search_tool} - returned None"
        )
    else:
        engine_type = type(engine).__name__
        logger.info(
            f"Successfully created search engine of type: {engine_type}"
        )
        # Check if the engine has run method
        if hasattr(engine, "run"):
            logger.info(f"Engine has 'run' method: {engine.run}")
        else:
            logger.error("Engine does NOT have 'run' method!")

        # For SearxNG, check availability flag
        if hasattr(engine, "is_available"):
            logger.info(f"Engine availability flag: {engine.is_available}")

    return engine