Coverage for src / local_deep_research / search_system.py: 95%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1# src/local_deep_research/search_system.py
2from typing import Any, Callable, Dict
4from langchain_core.language_models import BaseChatModel
5from loguru import logger
7from .advanced_search_system.findings.repository import FindingsRepository
8from .advanced_search_system.questions.standard_question import (
9 StandardQuestionGenerator,
10)
11from .advanced_search_system.strategies.followup.enhanced_contextual_followup import (
12 EnhancedContextualFollowUpStrategy,
13)
15# StandardSearchStrategy imported lazily to avoid database access during module import
16from .citation_handler import CitationHandler
17from .web_search_engines.search_engine_base import BaseSearchEngine
class AdvancedSearchSystem:
    """
    Advanced search system that coordinates different search strategies.

    The system builds one strategy object (via the strategy factory, or an
    ``EnhancedContextualFollowUpStrategy`` wrapping a delegate strategy for
    follow-up research), forwards progress callbacks to it, and post-processes
    the strategy's result dictionary in ``analyze_topic``.
    """

    # Accepted spellings of the contextual follow-up strategy. That strategy is
    # not built by the factory directly: it wraps a factory-built delegate.
    _FOLLOWUP_STRATEGY_NAMES = frozenset(
        {
            "enhanced-contextual-followup",
            "enhanced_contextual_followup",
            "contextual-followup",
            "contextual_followup",
        }
    )

    @staticmethod
    def _setting_value(snapshot: dict, key: str, default: Any) -> Any:
        """Read one setting from a settings snapshot.

        Snapshot entries are either raw values or dictionaries that wrap the
        value under a ``"value"`` key.

        Args:
            snapshot: The settings snapshot to read from.
            key: Dotted setting name, e.g. ``"search.iterations"``.
            default: Returned when the key is absent, or when the entry is a
                dictionary that has no ``"value"`` key.

        Returns:
            The unwrapped setting value, or ``default``.
        """
        if key not in snapshot:
            return default
        entry = snapshot[key]
        if isinstance(entry, dict):
            # A wrapper dict without "value" is malformed; fall back to the
            # default instead of leaking the dict into numeric parameters.
            return entry.get("value", default)
        return entry

    def __init__(
        self,
        llm: BaseChatModel,
        search: BaseSearchEngine,
        strategy_name: str = "source-based",  # Default to comprehensive research strategy
        include_text_content: bool = True,
        use_cross_engine_filter: bool = True,
        max_iterations: int | None = None,
        questions_per_iteration: int | None = None,
        use_atomic_facts: bool = False,
        username: str | None = None,
        settings_snapshot: dict | None = None,
        research_id: str | None = None,
        research_context: dict | None = None,
        programmatic_mode: bool = False,
        search_original_query: bool = True,
    ):
        """Initialize the advanced search system.

        Args:
            llm: LLM to use for the search strategy.
            search: Search engine to use for queries.
            strategy_name: The name of the search strategy to use. Options:
                - "standard": Basic iterative search strategy
                - "iterdrag": Iterative Dense Retrieval Augmented Generation
                - "source-based": Focuses on finding and extracting from sources
                - "parallel": Runs multiple search queries in parallel
                - "rapid": Quick single-pass search
                - "recursive": Recursive decomposition of complex queries
                - "iterative": Loop-based reasoning with persistent knowledge
                - "adaptive": Adaptive step-by-step reasoning
                - "smart": Automatically chooses best strategy based on query
                - "browsecomp": Optimized for BrowseComp-style puzzle queries
                - "evidence": Enhanced evidence-based verification with improved candidate discovery
                - "constrained": Progressive constraint-based search that narrows candidates step by step
                - "parallel-constrained": Parallel constraint-based search with combined constraint execution
                - "early-stop-constrained": Parallel constraint search with immediate evaluation and early stopping at 99% confidence
                - "dual-confidence": Dual confidence scoring with positive/negative/uncertainty
                - "dual-confidence-with-rejection": Dual confidence with early rejection of poor candidates
                - "concurrent-dual-confidence": Concurrent search & evaluation with progressive constraint relaxation
                - "modular": Modular architecture using constraint checking and candidate exploration modules
                - "browsecomp-entity": Entity-focused search for BrowseComp questions with knowledge graph building
                - "iterative-refinement": Iteratively refines results using LLM evaluation and follow-up queries
                The follow-up aliases in ``_FOLLOWUP_STRATEGY_NAMES`` select
                the contextual follow-up strategy instead of a factory one.
            include_text_content: If False, only includes metadata and links in search results
            use_cross_engine_filter: Whether to filter results across search
                engines.
            max_iterations: The maximum number of search iterations to
                perform. Will be read from the settings if not specified.
            questions_per_iteration: The number of questions to include in
                each iteration. Will be read from the settings if not specified.
            use_atomic_facts: Whether to use atomic fact decomposition for
                complex queries when using the source-based strategy.
            username: Stored on the instance as ``self.username``.
            settings_snapshot: Settings used for the iteration defaults and
                passed on to the citation handler and the strategy. ``None``
                is treated as an empty snapshot.
            research_id: Stored on the instance as ``self.research_id``.
            research_context: Stored for strategies. For the follow-up
                strategy its ``"delegate_strategy"`` entry names the delegate
                (default ``"source-based"``).
            programmatic_mode: If True, disables database operations and metrics tracking.
                This is useful for running searches without database dependencies.
            search_original_query: Whether to include the original query in the first iteration
                of search. Set to False for news searches to avoid sending long subscription
                prompts to search engines.
        """
        # Store research context for strategies
        self.research_id = research_id
        self.research_context = research_context
        self.username = username

        # Store required components and plain configuration flags
        self.model = llm
        self.search = search
        self.settings_snapshot = settings_snapshot or {}
        self.programmatic_mode = programmatic_mode
        self.search_original_query = search_original_query

        # Log if running in programmatic mode
        if self.programmatic_mode:
            logger.warning(
                "Running in programmatic mode - database operations and metrics tracking disabled. "
                "Rate limiting, search metrics, and persistence features will not be available."
            )

        # Explicit arguments win; otherwise fall back to the snapshot, then to
        # the built-in defaults (1 iteration, 3 questions per iteration).
        if max_iterations is None:
            max_iterations = self._setting_value(
                self.settings_snapshot, "search.iterations", 1
            )
        self.max_iterations = max_iterations

        if questions_per_iteration is None:
            questions_per_iteration = self._setting_value(
                self.settings_snapshot, "search.questions_per_iteration", 3
            )
        self.questions_per_iteration = questions_per_iteration

        # Log the strategy name that's being used
        logger.info(
            f"Initializing AdvancedSearchSystem with strategy_name='{strategy_name}'"
        )

        # Initialize components
        self.citation_handler = CitationHandler(
            self.model, settings_snapshot=self.settings_snapshot
        )
        self.question_generator = StandardQuestionGenerator(self.model)
        self.findings_repository = FindingsRepository(self.model)
        # For backward compatibility
        self.questions_by_iteration: dict[Any, Any] = {}
        self.progress_callback = lambda _1, _2, _3: None
        self.all_links_of_system: list[dict[Any, Any]] = []

        # Imported here rather than at module level (the module deliberately
        # defers strategy imports until a system is constructed).
        from .search_system_factory import create_strategy

        if strategy_name.lower() in self._FOLLOWUP_STRATEGY_NAMES:
            # Special handling for follow-up strategy which needs different logic
            logger.info("Creating EnhancedContextualFollowUpStrategy instance")

            # Get delegate strategy from research context.
            # This should be the user's preferred strategy from settings.
            if self.research_context:
                delegate_strategy_name = self.research_context.get(
                    "delegate_strategy", "source-based"
                )
            else:
                delegate_strategy_name = "source-based"

            # The delegate gets its own link list, not the system-wide one.
            delegate = create_strategy(
                strategy_name=delegate_strategy_name,
                model=self.model,
                search=self.search,
                all_links_of_system=[],
                settings_snapshot=self.settings_snapshot,
                knowledge_accumulation_mode=True,
                search_original_query=self.search_original_query,
            )

            # Create the contextual follow-up strategy with the delegate
            self.strategy = EnhancedContextualFollowUpStrategy(
                model=self.model,
                search=self.search,
                delegate_strategy=delegate,
                all_links_of_system=self.all_links_of_system,
                settings_snapshot=self.settings_snapshot,
                research_context=self.research_context,
            )
        else:
            # Use factory for all other strategies
            logger.info(f"Creating {strategy_name} strategy using factory")
            self.strategy = create_strategy(
                strategy_name=strategy_name,
                model=self.model,
                search=self.search,
                all_links_of_system=self.all_links_of_system,
                settings_snapshot=self.settings_snapshot,
                # Pass strategy-specific parameters
                include_text_content=include_text_content,
                use_cross_engine_filter=use_cross_engine_filter,
                use_atomic_facts=use_atomic_facts,
                max_iterations=self.max_iterations,
                questions_per_iteration=self.questions_per_iteration,
                # Special parameters for iterative strategy
                search_iterations_per_round=self.max_iterations or 1,
                questions_per_search=self.questions_per_iteration,
                # Special parameters for adaptive strategy
                max_steps=self.max_iterations,
                source_questions_per_iteration=self.questions_per_iteration,
                # Special parameters for evidence and constrained strategies
                max_search_iterations=self.max_iterations,
                # Special parameters for focused iteration
                use_browsecomp_optimization=True,
                # Pass search original query parameter
                search_original_query=self.search_original_query,
            )

        # Log the actual strategy class
        logger.info(f"Created strategy of type: {type(self.strategy).__name__}")

        # Hand the (no-op by default) progress callback to the strategy. The
        # attribute was assigned above, so no existence/None check is needed.
        self.strategy.set_progress_callback(self.progress_callback)

    def close(self):
        """Close resources held by the search system.

        Cascades close to the strategy, which may hold persistent
        ThreadPoolExecutor instances (e.g. ConstraintParallelStrategy holds
        search_executor and evaluation_executor,
        ConcurrentDualConfidenceStrategy holds evaluation_executor).

        NOTE: Does NOT close self.search (the search engine) — the caller
        (run_research_process) manages search engine lifecycle separately
        because the search engine may be shared or reused.

        Most strategies (including the default source-based) use
        context-managed ThreadPoolExecutors that clean up automatically.
        The close() call here is a safety net for the two constraint-based
        strategies that hold persistent executors in __init__. Those
        strategies also shut down their executors in
        find_relevant_information()'s finally block, so this is a second
        line of defense for the edge case where the method is never called.
        """
        from .utilities.resource_utils import safe_close

        # __init__ may have failed before the strategy was assigned.
        if hasattr(self, "strategy"):
            safe_close(self.strategy, "search strategy")

    def _progress_callback(
        self, message: str, progress: int, metadata: dict
    ) -> None:
        """Handle progress updates from the strategy.

        Logs the update and forwards it to the registered progress callback,
        if one has been set on this instance.
        """
        logger.info(f"Progress: {progress}% - {message}")
        if hasattr(self, "progress_callback"):
            self.progress_callback(message, progress, metadata)

    def set_progress_callback(
        self, callback: Callable[[str, int, dict], None]
    ) -> None:
        """Set a callback function to receive progress updates.

        The callback is stored on the system and, when a strategy has already
        been created, registered on the strategy as well.
        """
        self.progress_callback = callback
        if hasattr(self, "strategy"):
            self.strategy.set_progress_callback(callback)

    def analyze_topic(
        self,
        query: str,
        is_user_search: bool = True,
        is_news_search: bool = False,
        user_id: str = "anonymous",
        search_id: str | None = None,
        **kwargs,
    ) -> Dict:
        """Analyze a topic using the current strategy.

        Args:
            query: The research query to analyze
            is_user_search: Whether this is a user-initiated search
            is_news_search: Whether this is a news search
            user_id: The user ID for tracking
            search_id: The search ID (auto-generated if not provided)
            **kwargs: Additional arguments (accepted but not used here)

        Returns:
            The strategy's result dictionary, augmented by ``_perform_search``.
        """
        # Generate search ID if not provided
        if search_id is None:
            import uuid

            search_id = str(uuid.uuid4())

        # Perform the search
        return self._perform_search(
            query, search_id, is_user_search, is_news_search, user_id
        )

    def _perform_search(
        self,
        query: str,
        search_id: str,
        is_user_search: bool,
        is_news_search: bool,
        user_id: str,
    ) -> Dict:
        """Perform the actual search.

        Emits two setup progress messages (LLM info, search tool info), runs
        the strategy, copies the strategy's bookkeeping onto this instance
        for backward compatibility, enriches the result dictionary, and
        finally invokes the news callback on a best-effort basis.

        Args:
            query: The research query passed to the strategy.
            search_id: Identifier passed to the news callback context.
            is_user_search: Passed to the news callback context.
            is_news_search: Passed to the news callback context.
            user_id: Passed to the news callback context.

        Returns:
            The strategy result with ``search_system``, ``all_links_of_system``,
            ``questions_by_iteration`` and (if missing) ``query`` added.
        """
        # With an empty snapshot everything is reported as "unknown"; with a
        # non-empty snapshot a missing search tool is reported as "searxng".
        llm_provider = "unknown"
        llm_model = "unknown"
        search_tool = "unknown"

        if self.settings_snapshot:
            llm_provider = self._setting_value(
                self.settings_snapshot, "llm.provider", "unknown"
            )
            llm_model = self._setting_value(
                self.settings_snapshot, "llm.model", "unknown"
            )
            search_tool = self._setting_value(
                self.settings_snapshot, "search.tool", "searxng"
            )

        # Send progress message with LLM info
        self.progress_callback(
            f"Using {llm_provider} model: {llm_model}",
            1,  # Low percentage to show this as an early step
            {
                "phase": "setup",
                "llm_info": {
                    "name": llm_model,
                    "provider": llm_provider,
                },
            },
        )
        # Send progress message with search tool info.
        # NOTE(review): 1.5 is a float although the callback type is annotated
        # with an int progress value; kept as-is because callers receive it.
        self.progress_callback(
            f"Using search tool: {search_tool}",
            1.5,  # Between setup and processing steps
            {
                "phase": "setup",
                "search_info": {
                    "tool": search_tool,
                },
            },
        )

        # Use the strategy to analyze the topic
        result = self.strategy.analyze_topic(query)

        # Update our attributes for backward compatibility
        self.questions_by_iteration = (
            self.strategy.questions_by_iteration.copy()
        )

        # Only extend if they're different objects in memory to avoid duplication.
        # This check prevents doubling the list when they reference the same object.
        # Fix for issue #301: "too many links in detailed report mode"
        strategy_links = self.strategy.all_links_of_system
        if self.all_links_of_system is not strategy_links:
            self.all_links_of_system.extend(strategy_links)

        # Include the search system instance for access to citations
        result["search_system"] = self
        result["all_links_of_system"] = self.all_links_of_system

        # Ensure query is included in the result
        if "query" not in result:
            result["query"] = query
        result["questions_by_iteration"] = self.questions_by_iteration

        # Call news callback. This is deliberately best-effort: any failure is
        # logged and must not prevent the search result from being returned.
        try:
            from .news.core.search_integration import NewsSearchCallback

            callback = NewsSearchCallback()
            context = {
                "is_user_search": is_user_search,
                "is_news_search": is_news_search,
                "user_id": user_id,
                "search_id": search_id,
            }
            callback(query, result, context)
        except Exception:
            logger.exception("Error in news callback")

        return result