Coverage for src/local_deep_research/search_system.py: 95%

101 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1# src/local_deep_research/search_system.py 

2from typing import Any, Callable, Dict 

3 

4from langchain_core.language_models import BaseChatModel 

5from loguru import logger 

6 

7from .advanced_search_system.findings.repository import FindingsRepository 

8from .advanced_search_system.questions.standard_question import ( 

9 StandardQuestionGenerator, 

10) 

11from .advanced_search_system.strategies.followup.enhanced_contextual_followup import ( 

12 EnhancedContextualFollowUpStrategy, 

13) 

14 

15# StandardSearchStrategy imported lazily to avoid database access during module import 

16from .citation_handler import CitationHandler 

17from .web_search_engines.search_engine_base import BaseSearchEngine 

18 

19 

class AdvancedSearchSystem:
    """
    Advanced search system that coordinates different search strategies.

    The system builds one strategy object (through ``create_strategy`` or,
    for the contextual follow-up names, ``EnhancedContextualFollowUpStrategy``),
    forwards progress callbacks to it, and exposes :meth:`analyze_topic` as
    the single entry point for running a query.
    """

    # Strategy names (compared lower-cased) that bypass the factory and are
    # wrapped in EnhancedContextualFollowUpStrategy around a delegate strategy.
    _FOLLOWUP_STRATEGY_NAMES = frozenset(
        {
            "enhanced-contextual-followup",
            "enhanced_contextual_followup",
            "contextual-followup",
            "contextual_followup",
        }
    )

    def __init__(
        self,
        llm: BaseChatModel,
        search: BaseSearchEngine,
        strategy_name: str = "source-based",  # Default to comprehensive research strategy
        include_text_content: bool = True,
        use_cross_engine_filter: bool = True,
        max_iterations: int | None = None,
        questions_per_iteration: int | None = None,
        use_atomic_facts: bool = False,
        username: str | None = None,
        settings_snapshot: dict | None = None,
        research_id: str | None = None,
        research_context: dict | None = None,
        programmatic_mode: bool = False,
        search_original_query: bool = True,
    ):
        """Initialize the advanced search system.

        Args:
            llm: LLM to use for the search strategy.
            search: Search engine to use for queries.
            strategy_name: The name of the search strategy to use. Options:
                - "standard": Basic iterative search strategy
                - "iterdrag": Iterative Dense Retrieval Augmented Generation
                - "source-based": Focuses on finding and extracting from sources
                - "parallel": Runs multiple search queries in parallel
                - "rapid": Quick single-pass search
                - "recursive": Recursive decomposition of complex queries
                - "iterative": Loop-based reasoning with persistent knowledge
                - "adaptive": Adaptive step-by-step reasoning
                - "smart": Automatically chooses best strategy based on query
                - "browsecomp": Optimized for BrowseComp-style puzzle queries
                - "evidence": Enhanced evidence-based verification with improved candidate discovery
                - "constrained": Progressive constraint-based search that narrows candidates step by step
                - "parallel-constrained": Parallel constraint-based search with combined constraint execution
                - "early-stop-constrained": Parallel constraint search with immediate evaluation and early stopping at 99% confidence
                - "dual-confidence": Dual confidence scoring with positive/negative/uncertainty
                - "dual-confidence-with-rejection": Dual confidence with early rejection of poor candidates
                - "concurrent-dual-confidence": Concurrent search & evaluation with progressive constraint relaxation
                - "modular": Modular architecture using constraint checking and candidate exploration modules
                - "browsecomp-entity": Entity-focused search for BrowseComp questions with knowledge graph building
                - "iterative-refinement": Iteratively refines results using LLM evaluation and follow-up queries
                The contextual follow-up names ("enhanced-contextual-followup",
                "contextual-followup" and their underscore spellings) are
                handled separately: they wrap a delegate strategy whose name
                is read from ``research_context["delegate_strategy"]``
                (default "source-based").
            include_text_content: If False, only includes metadata and links in search results
            use_cross_engine_filter: Whether to filter results across search
                engines.
            max_iterations: The maximum number of search iterations to
                perform. Will be read from the settings if not specified.
            questions_per_iteration: The number of questions to include in
                each iteration. Will be read from the settings if not specified.
            use_atomic_facts: Whether to use atomic fact decomposition for
                complex queries when using the source-based strategy.
            username: Stored on the instance as ``self.username``; not
                otherwise used by this class.
            settings_snapshot: Mapping of setting keys to either raw values
                or ``{"value": ...}`` dicts. ``None`` is treated as an empty
                mapping. Passed on to the citation handler and the strategy.
            research_id: Stored on the instance as ``self.research_id``.
            research_context: Optional context dict. Supplies
                ``"delegate_strategy"`` for the follow-up strategy and is
                passed to that strategy as-is.
            programmatic_mode: If True, disables database operations and metrics tracking.
                This is useful for running searches without database dependencies.
            search_original_query: Whether to include the original query in the first iteration
                of search. Set to False for news searches to avoid sending long subscription
                prompts to search engines.

        """
        # Store research context for strategies
        self.research_id = research_id
        self.research_context = research_context
        self.username = username

        # Store required components
        self.model = llm
        self.search = search

        # Store settings snapshot (None becomes an empty dict)
        self.settings_snapshot = settings_snapshot or {}

        # Store programmatic mode
        self.programmatic_mode = programmatic_mode

        # Store search original query setting
        self.search_original_query = search_original_query

        # Log if running in programmatic mode
        if self.programmatic_mode:
            logger.warning(
                "Running in programmatic mode - database operations and metrics tracking disabled. "
                "Rate limiting, search metrics, and persistence features will not be available."
            )

        # Explicit arguments win; otherwise fall back to the snapshot, then
        # to the hard-coded defaults (1 iteration, 3 questions).
        self.max_iterations = (
            max_iterations
            if max_iterations is not None
            else self._setting_value(
                self.settings_snapshot, "search.iterations", 1
            )
        )
        self.questions_per_iteration = (
            questions_per_iteration
            if questions_per_iteration is not None
            else self._setting_value(
                self.settings_snapshot, "search.questions_per_iteration", 3
            )
        )

        # Log the strategy name that's being used
        logger.info(
            f"Initializing AdvancedSearchSystem with strategy_name='{strategy_name}'"
        )

        # Initialize components
        self.citation_handler = CitationHandler(
            self.model, settings_snapshot=self.settings_snapshot
        )
        self.question_generator = StandardQuestionGenerator(self.model)
        self.findings_repository = FindingsRepository(self.model)
        # For backward compatibility
        self.questions_by_iteration: dict[Any, Any] = {}
        self.progress_callback = lambda _1, _2, _3: None
        self.all_links_of_system: list[dict[Any, Any]] = []

        # Initialize strategy using factory (imported lazily, inside __init__)
        from .search_system_factory import create_strategy

        # Special handling for follow-up strategy which needs different logic
        if strategy_name.lower() in self._FOLLOWUP_STRATEGY_NAMES:
            logger.info("Creating EnhancedContextualFollowUpStrategy instance")
            # Get delegate strategy from research context
            # This should be the user's preferred strategy from settings
            delegate_strategy_name = (
                self.research_context.get("delegate_strategy", "source-based")
                if self.research_context
                else "source-based"
            )

            # The delegate receives its own fresh link list, not the
            # system-wide one handed to the wrapping strategy below.
            delegate = create_strategy(
                strategy_name=delegate_strategy_name,
                model=self.model,
                search=self.search,
                all_links_of_system=[],
                settings_snapshot=self.settings_snapshot,
                knowledge_accumulation_mode=True,
                search_original_query=self.search_original_query,
            )

            # Create the contextual follow-up strategy with the delegate
            self.strategy = EnhancedContextualFollowUpStrategy(
                model=self.model,
                search=self.search,
                delegate_strategy=delegate,
                all_links_of_system=self.all_links_of_system,
                settings_snapshot=self.settings_snapshot,
                research_context=self.research_context,
            )
        else:
            # Use factory for all other strategies
            logger.info(f"Creating {strategy_name} strategy using factory")
            self.strategy = create_strategy(
                strategy_name=strategy_name,
                model=self.model,
                search=self.search,
                all_links_of_system=self.all_links_of_system,
                settings_snapshot=self.settings_snapshot,
                # Pass strategy-specific parameters
                include_text_content=include_text_content,
                use_cross_engine_filter=use_cross_engine_filter,
                use_atomic_facts=use_atomic_facts,
                max_iterations=self.max_iterations,
                questions_per_iteration=self.questions_per_iteration,
                # Special parameters for iterative strategy
                search_iterations_per_round=self.max_iterations or 1,
                questions_per_search=self.questions_per_iteration,
                # Special parameters for adaptive strategy
                max_steps=self.max_iterations,
                source_questions_per_iteration=self.questions_per_iteration,
                # Special parameters for evidence and constrained strategies
                max_search_iterations=self.max_iterations,
                # Special parameters for focused iteration
                use_browsecomp_optimization=True,
                # Pass search original query parameter
                search_original_query=self.search_original_query,
                # Forwarded so strategies that create engines per tool call
                # (e.g. langgraph-agent) can match the system's mode.
                programmatic_mode=self.programmatic_mode,
            )

        # Log the actual strategy class
        logger.info(f"Created strategy of type: {type(self.strategy).__name__}")

        # progress_callback was assigned a no-op lambda above, so it always
        # exists and is never None here; hand it to the strategy directly.
        self.strategy.set_progress_callback(self.progress_callback)

    @staticmethod
    def _setting_value(snapshot: dict, key: str, default: Any) -> Any:
        """Return the effective value of ``key`` in a settings snapshot.

        Snapshot entries are either raw values or dicts wrapping the value
        under a ``"value"`` key.

        Args:
            snapshot: The settings mapping to read from.
            key: The setting key to look up.
            default: Returned when ``key`` is absent, or when the entry is a
                dict that has no ``"value"`` key.

        Returns:
            The unwrapped ``"value"`` for dict entries, the entry itself for
            non-dict entries, otherwise ``default``.
        """
        if key not in snapshot:
            return default
        entry = snapshot[key]
        if isinstance(entry, dict):
            return entry.get("value", default)
        return entry

    def close(self):
        """Close resources held by the search system.

        Cascades close to the strategy, which may hold persistent
        ThreadPoolExecutor instances (e.g. ConstraintParallelStrategy holds
        search_executor and evaluation_executor,
        ConcurrentDualConfidenceStrategy holds evaluation_executor).

        NOTE: Does NOT close self.search (the search engine) — the caller
        (run_research_process) manages search engine lifecycle separately
        because the search engine may be shared or reused.

        Most strategies (including the default source-based) use
        context-managed ThreadPoolExecutors that clean up automatically.
        The close() call here is a safety net for the two constraint-based
        strategies that hold persistent executors in __init__. Those
        strategies also shut down their executors in
        find_relevant_information()'s finally block, so this is a second
        line of defense for the edge case where the method is never called.
        """
        from .utilities.resource_utils import safe_close

        # __init__ may have raised before self.strategy was assigned.
        if hasattr(self, "strategy"):
            safe_close(self.strategy, "search strategy")

    def _progress_callback(
        self, message: str, progress: int, metadata: dict
    ) -> None:
        """Log a progress update and relay it to ``self.progress_callback``."""
        logger.info(f"Progress: {progress}% - {message}")
        if hasattr(self, "progress_callback"):
            self.progress_callback(message, progress, metadata)

    def set_progress_callback(
        self, callback: Callable[[str, int, dict], None]
    ) -> None:
        """Set a callback function to receive progress updates.

        The callback is stored on the system and, when a strategy has
        already been created, installed on the strategy as well.
        """
        self.progress_callback = callback
        if hasattr(self, "strategy"):
            self.strategy.set_progress_callback(callback)

    def analyze_topic(
        self,
        query: str,
        is_user_search: bool = True,
        is_news_search: bool = False,
        user_id: str = "anonymous",
        search_id: str | None = None,
        **kwargs,
    ) -> Dict:
        """Analyze a topic using the current strategy.

        Args:
            query: The research query to analyze
            is_user_search: Whether this is a user-initiated search
            is_news_search: Whether this is a news search
            user_id: The user ID for tracking
            search_id: The search ID (auto-generated if not provided)
            **kwargs: Additional arguments. Accepted for call compatibility;
                they are not forwarded anywhere by this method.

        Returns:
            The result dict produced by :meth:`_perform_search`.
        """
        # Generate search ID if not provided
        if search_id is None:
            import uuid

            search_id = str(uuid.uuid4())

        # Perform the search
        return self._perform_search(
            query, search_id, is_user_search, is_news_search, user_id
        )

    def _perform_search(
        self,
        query: str,
        search_id: str,
        is_user_search: bool,
        is_news_search: bool,
        user_id: str,
    ) -> Dict:
        """Perform the actual search.

        Emits two setup progress messages (LLM info, search tool), runs the
        strategy, mirrors the strategy's questions and links onto this
        object, augments the strategy's result dict, and finally invokes the
        news callback on a best-effort basis.

        Returns:
            The strategy's result dict with ``"search_system"``,
            ``"all_links_of_system"``, ``"questions_by_iteration"`` and (if
            missing) ``"query"`` added.
        """
        # Values reported when there is no settings snapshot at all.
        llm_provider = "unknown"
        llm_model = "unknown"
        search_tool = "unknown"

        if self.settings_snapshot:
            # Extract values from settings snapshot
            llm_provider = self._setting_value(
                self.settings_snapshot, "llm.provider", "unknown"
            )
            llm_model = self._setting_value(
                self.settings_snapshot, "llm.model", "unknown"
            )
            search_tool = self._setting_value(
                self.settings_snapshot, "search.tool", "searxng"
            )

        # Send progress message with LLM info
        self.progress_callback(
            f"Using {llm_provider} model: {llm_model}",
            1,  # Low percentage to show this as an early step
            {
                "phase": "setup",
                "llm_info": {
                    "name": llm_model,
                    "provider": llm_provider,
                },
            },
        )
        # Send progress message with search strategy info
        # NOTE(review): 1.5 is a float although set_progress_callback types
        # the progress argument as int; kept as-is so consumers see the same
        # value as before.
        self.progress_callback(
            f"Using search tool: {search_tool}",
            1.5,  # Between setup and processing steps
            {
                "phase": "setup",
                "search_info": {
                    "tool": search_tool,
                },
            },
        )

        # Use the strategy to analyze the topic
        result = self.strategy.analyze_topic(query)

        # Update our attributes for backward compatibility
        self.questions_by_iteration = (
            self.strategy.questions_by_iteration.copy()
        )

        # Only extend if they're different objects in memory to avoid duplication
        # This check prevents doubling the list when they reference the same object
        # Fix for issue #301: "too many links in detailed report mode"
        if self.all_links_of_system is not self.strategy.all_links_of_system:
            self.all_links_of_system.extend(self.strategy.all_links_of_system)

        # Include the search system instance for access to citations
        result["search_system"] = self
        result["all_links_of_system"] = self.all_links_of_system

        # Ensure query is included in the result
        if "query" not in result:
            result["query"] = query
        result["questions_by_iteration"] = self.questions_by_iteration

        # Call news callback; failures are logged and never fail the search.
        try:
            from .news.core.search_integration import NewsSearchCallback

            callback = NewsSearchCallback()
            context = {
                "is_user_search": is_user_search,
                "is_news_search": is_news_search,
                "user_id": user_id,
                "search_id": search_id,
            }
            callback(query, result, context)
        except Exception:
            logger.exception("Error in news callback")

        return result