Coverage for src / local_deep_research / api / research_functions.py: 93%

186 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2API module for Local Deep Research. 

3Provides programmatic access to search and research capabilities. 

4""" 

5 

6from datetime import datetime, UTC 

7from typing import Any, Callable 

8 

9from loguru import logger 

10from local_deep_research.settings.logger import log_settings 

11 

12from ..config.llm_config import get_llm 

13from ..config.search_config import get_search 

14from ..config.thread_settings import get_setting_from_snapshot 

15from ..report_generator import IntegratedReportGenerator 

16from ..search_system import AdvancedSearchSystem 

17from ..utilities.db_utils import no_db_settings 

18from ..utilities.thread_context import clear_search_context, set_search_context 

19from ..utilities.search_utilities import remove_think_tags 

20from .settings_utils import create_settings_snapshot 

21 

22 

def _init_search_system(
    model_name: str | None = None,
    temperature: float = 0.7,
    provider: str | None = None,
    openai_endpoint_url: str | None = None,
    progress_callback: Callable[[str, int, dict], None] | None = None,
    search_tool: str | None = None,
    search_strategy: str = "source_based",
    iterations: int = 1,
    questions_per_iteration: int = 1,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    username: str | None = None,
    research_id: str | None = None,
    research_context: dict[str, Any] | None = None,
    programmatic_mode: bool = True,
    search_original_query: bool = True,
    settings_snapshot: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AdvancedSearchSystem:
    """
    Initialize and return a configured AdvancedSearchSystem.

    Registers any caller-supplied retrievers and LLMs, builds the language
    model and (optionally) a search engine, then wires both into an
    AdvancedSearchSystem instance with the requested strategy and limits.

    Args:
        model_name: Name of the model to use (if None, uses database setting)
        temperature: LLM temperature for generation
        provider: Provider to use (if None, uses database setting)
        openai_endpoint_url: Custom endpoint URL to use (if None, uses database
            setting)
        progress_callback: Optional callback function to receive progress updates
        search_tool: Search engine to use (auto, wikipedia, arxiv, etc.). If None,
            falls back to the "search.tool" value in the settings snapshot.
        search_strategy: Search strategy to use (modular, source_based, etc.)
        iterations: Number of research cycles to perform
        questions_per_iteration: Number of questions to generate per cycle
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        username: Optional username forwarded to the search engine and system
        research_id: Optional research ID (int or UUID string) for tracking metrics
        research_context: Optional context dict forwarded to the LLM and system
        programmatic_mode: If True, disables database operations and metrics tracking
        search_original_query: Whether to include the original query in the first iteration of search
        settings_snapshot: Optional settings snapshot; if None, a
            "settings_snapshot" entry in **kwargs is used instead.
        **kwargs: Extra options; only "settings_snapshot" is consulted here.

    Returns:
        AdvancedSearchSystem: An instance of the configured AdvancedSearchSystem.
    """
    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    # Use settings_snapshot from parameter, or fall back to kwargs
    # (callers such as quick_summary() pass it through **kwargs)
    if settings_snapshot is None:
        settings_snapshot = kwargs.get("settings_snapshot")

    # Get language model with custom temperature
    llm = get_llm(
        temperature=temperature,
        openai_endpoint_url=openai_endpoint_url,
        model_name=model_name,
        provider=provider,
        research_id=research_id,
        research_context=research_context,
        settings_snapshot=settings_snapshot,
    )

    # Set the search engine if specified or get from settings
    search_engine = None

    # If no search_tool provided, get from settings_snapshot
    if not search_tool and settings_snapshot:
        search_tool = get_setting_from_snapshot(
            "search.tool", settings_snapshot=settings_snapshot
        )

    if search_tool:
        search_engine = get_search(
            search_tool,
            llm_instance=llm,
            username=username,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )
        # A None search_engine is tolerated: AdvancedSearchSystem falls back
        # to its own default when `search` is None.
        if search_engine is None:
            logger.warning(
                f"Could not create search engine '{search_tool}', using default."
            )

    # Create search system with custom parameters
    logger.info("Search strategy: {}", search_strategy)
    system = AdvancedSearchSystem(
        llm=llm,
        search=search_engine,
        strategy_name=search_strategy,
        username=username,
        research_id=research_id,
        research_context=research_context,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        search_original_query=search_original_query,
    )

    # Override default settings with user-provided values
    system.max_iterations = iterations
    system.questions_per_iteration = questions_per_iteration

    # Set progress callback if provided
    if progress_callback:
        system.set_progress_callback(progress_callback)

    return system

147 

148 

@no_db_settings
def quick_summary(
    query: str,
    research_id: str | None = None,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    username: str | None = None,
    provider: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
    max_search_results: int | None = None,
    settings: dict[str, Any] | None = None,
    settings_override: dict[str, Any] | None = None,
    search_original_query: bool = True,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Generate a quick research summary for a given query.

    Args:
        query: The research query to analyze
        research_id: Optional research ID (int or UUID string) for tracking metrics
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        username: Optional username included in the search context for metrics tracking
        provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
        api_key: API key for the provider. For programmatic API only.
        temperature: LLM temperature (0.0-1.0). For programmatic API only.
        max_search_results: Maximum number of search results to return. For programmatic API only.
        settings: Base settings dict to use instead of defaults. For programmatic API only.
        settings_override: Dictionary of settings to override (e.g., {"llm.max_tokens": 4000}). For programmatic API only.
        search_original_query: Whether to include the original query in the first iteration of search.
            Set to False for news searches to avoid sending long subscription prompts to search engines.
        **kwargs: Additional configuration for the search system. Will be forwarded to
            `_init_search_system()`.

    Returns:
        Dictionary containing the research results with keys:
        - 'research_id': The research ID used for this run
        - 'summary': The generated summary text
        - 'findings': List of detailed findings from each search
        - 'iterations': Number of iterations performed
        - 'questions': Questions generated during research
        - 'formatted_findings': Pre-formatted findings text
        - 'sources': All links collected by the search system

    Examples:
        # Simple usage with defaults
        result = quick_summary("What is quantum computing?")

        # With custom provider
        result = quick_summary(
            "What is quantum computing?",
            provider="anthropic",
            api_key="sk-ant-..."
        )

        # With advanced settings
        result = quick_summary(
            "What is quantum computing?",
            temperature=0.2,
            settings_override={"search.engines.arxiv.enabled": True}
        )
    """
    # loguru interpolates "{}" placeholders (str.format style), not "%s";
    # the previous "%s" form logged the literal placeholder without the query.
    logger.info("Generating quick summary for query: {}", query)

    # Only create settings snapshot if not already provided (programmatic API)
    if "settings_snapshot" not in kwargs:
        # Build kwargs for create_settings_snapshot from explicit parameters
        snapshot_kwargs = {}
        if provider is not None:
            snapshot_kwargs["provider"] = provider
        if api_key is not None:
            snapshot_kwargs["api_key"] = api_key
        if temperature is not None:
            snapshot_kwargs["temperature"] = temperature
        if max_search_results is not None:
            snapshot_kwargs["max_search_results"] = max_search_results

        kwargs["settings_snapshot"] = create_settings_snapshot(
            base_settings=settings,
            overrides=settings_override,
            **snapshot_kwargs,
        )
        log_settings(
            kwargs["settings_snapshot"],
            "Created settings snapshot for programmatic API",
        )
    else:
        # This branch was previously dead code: it was nested inside the
        # `not in kwargs` check above, so a provided snapshot was never logged.
        log_settings(
            kwargs["settings_snapshot"],
            "Using provided settings snapshot for programmatic API",
        )

    # Generate a research_id if none provided
    if research_id is None:
        import uuid

        research_id = str(uuid.uuid4())
        logger.debug(f"Generated research_id: {research_id}")

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    search_context = {
        "research_id": research_id,  # Pass UUID or integer directly
        "research_query": query,
        "research_mode": kwargs.get("research_mode", "quick"),
        "research_phase": "init",
        "search_iteration": 0,
        "search_engine_selected": kwargs.get("search_tool"),
        "username": username,  # Include username for metrics tracking
        "user_password": kwargs.get(
            "user_password"
        ),  # Include password for metrics tracking
    }
    set_search_context(search_context)

    try:
        # Remove research_mode from kwargs before passing to _init_search_system
        init_kwargs = {k: v for k, v in kwargs.items() if k != "research_mode"}
        # Make sure username is passed to the system
        init_kwargs["username"] = username
        init_kwargs["research_id"] = research_id
        init_kwargs["research_context"] = search_context
        init_kwargs["search_original_query"] = search_original_query
        system = _init_search_system(llms=llms, **init_kwargs)

        # Perform the search and analysis
        results = system.analyze_topic(query)

        # Extract the summary from the current knowledge
        if results and "current_knowledge" in results:
            summary = results["current_knowledge"]
        else:
            summary = "Unable to generate summary for the query."

        # Prepare the return value
        return {
            "research_id": research_id,
            "summary": summary,
            "findings": results.get("findings", []),
            "iterations": results.get("iterations", 0),
            "questions": results.get("questions", {}),
            "formatted_findings": results.get("formatted_findings", ""),
            "sources": results.get("all_links_of_system", []),
        }
    finally:
        # Always clear the thread-local context, even on failure
        clear_search_context()

310 

311 

@no_db_settings
def generate_report(
    query: str,
    output_file: str | None = None,
    progress_callback: Callable | None = None,
    searches_per_section: int = 2,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    provider: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
    max_search_results: int | None = None,
    settings: dict[str, Any] | None = None,
    settings_override: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Generate a comprehensive, structured research report for a given query.

    Args:
        query: The research query to analyze
        output_file: Optional path to save report markdown file
        progress_callback: Optional callback function to receive progress updates
        searches_per_section: The number of searches to perform for each
            section in the report.
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
        api_key: API key for the provider. For programmatic API only.
        temperature: LLM temperature (0.0-1.0). For programmatic API only.
        max_search_results: Maximum number of search results to return. For programmatic API only.
        settings: Base settings dict to use instead of defaults. For programmatic API only.
        settings_override: Dictionary of settings to override. For programmatic API only.
        **kwargs: Additional configuration for the search system.

    Returns:
        Dictionary containing the research report with keys:
        - 'content': The full report content in markdown format
        - 'metadata': Report metadata including generated timestamp and query
        - 'file_path': Path to saved file (if output_file was provided)

    Examples:
        # Simple usage with settings snapshot
        from local_deep_research.api.settings_utils import create_settings_snapshot
        settings = create_settings_snapshot({"programmatic_mode": True})
        result = generate_report("AI research", settings_snapshot=settings)

        # Save to file
        result = generate_report(
            "AI research",
            output_file="report.md",
            settings_snapshot=settings
        )
    """
    # loguru interpolates "{}" placeholders (str.format style), not "%s";
    # the previous "%s" form logged the literal placeholder without the query.
    logger.info("Generating comprehensive research report for query: {}", query)

    # Only create settings snapshot if not already provided (programmatic API)
    if "settings_snapshot" not in kwargs:
        # Build kwargs for create_settings_snapshot from explicit parameters
        snapshot_kwargs = {}
        if provider is not None:
            snapshot_kwargs["provider"] = provider
        if api_key is not None:
            snapshot_kwargs["api_key"] = api_key
        if temperature is not None:
            snapshot_kwargs["temperature"] = temperature
        if max_search_results is not None:
            snapshot_kwargs["max_search_results"] = max_search_results

        kwargs["settings_snapshot"] = create_settings_snapshot(
            base_settings=settings,
            overrides=settings_override,
            **snapshot_kwargs,
        )
        log_settings(
            kwargs["settings_snapshot"],
            "Created settings snapshot for programmatic API",
        )
    else:
        # This branch was previously dead code: it was nested inside the
        # `not in kwargs` check above, so a provided snapshot was never logged.
        log_settings(
            kwargs["settings_snapshot"],
            "Using provided settings snapshot for programmatic API",
        )

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)

    # Set progress callback if provided
    if progress_callback:
        system.set_progress_callback(progress_callback)

    # Perform the initial research
    initial_findings = system.analyze_topic(query)

    # Generate the structured report
    report_generator = IntegratedReportGenerator(
        search_system=system,
        llm=system.model,
        searches_per_section=searches_per_section,
        settings_snapshot=kwargs.get("settings_snapshot"),
    )
    report = report_generator.generate_report(initial_findings, query)

    # Save report to file if path is provided
    if output_file and report and "content" in report:
        from ..security.file_write_verifier import write_file_verified

        write_file_verified(
            output_file,
            report["content"],
            "api.allow_file_output",
            context="API research report",
            settings_snapshot=kwargs.get("settings_snapshot"),
        )
        logger.info(f"Report saved to {output_file}")
        report["file_path"] = output_file
    return report

447 

448 

@no_db_settings
def detailed_research(
    query: str,
    research_id: str | None = None,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Perform detailed research with comprehensive analysis.

    Similar to generate_report but returns structured data instead of markdown.

    Args:
        query: The research query to analyze
        research_id: Optional research ID (int or UUID string) for tracking metrics
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        **kwargs: Configuration for the search system. Forwarded to
            `_init_search_system()`; "search_tool", "iterations", and
            "search_strategy" are also echoed back in the result metadata.

    Returns:
        Dictionary containing detailed research results with keys:
        - 'query', 'research_id', 'summary', 'findings', 'iterations',
          'questions', 'formatted_findings', 'sources', and a 'metadata'
          dict (timestamp, search_tool, iterations_requested, strategy).
    """
    # loguru interpolates "{}" placeholders (str.format style), not "%s";
    # the previous "%s" form logged the literal placeholder without the query.
    logger.info("Performing detailed research for query: {}", query)

    # Generate a research_id if none provided
    if research_id is None:
        import uuid

        research_id = str(uuid.uuid4())
        logger.debug(f"Generated research_id: {research_id}")

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    search_context = {
        "research_id": research_id,
        "research_query": query,
        "research_mode": "detailed",
        "research_phase": "init",
        "search_iteration": 0,
        "search_engine_selected": kwargs.get("search_tool"),
    }
    set_search_context(search_context)

    try:
        # Initialize system
        system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)

        # Perform detailed research
        results = system.analyze_topic(query)

        # Return comprehensive results
        return {
            "query": query,
            "research_id": research_id,
            "summary": results.get("current_knowledge", ""),
            "findings": results.get("findings", []),
            "iterations": results.get("iterations", 0),
            "questions": results.get("questions", {}),
            "formatted_findings": results.get("formatted_findings", ""),
            "sources": results.get("all_links_of_system", []),
            "metadata": {
                # Timezone-aware UTC timestamp of when the research finished
                "timestamp": datetime.now(UTC).isoformat(),
                "search_tool": kwargs.get("search_tool", "auto"),
                "iterations_requested": kwargs.get("iterations", 1),
                "strategy": kwargs.get("search_strategy", "source_based"),
            },
        }
    finally:
        # Always clear the thread-local context, even on failure
        clear_search_context()

534 

535 

@no_db_settings
def analyze_documents(
    query: str,
    collection_name: str,
    max_results: int = 10,
    temperature: float = 0.7,
    force_reindex: bool = False,
    output_file: str | None = None,
) -> dict[str, Any]:
    """
    Search a local document collection and summarize the matches with an LLM.

    Args:
        query: The search query
        collection_name: Name of the local document collection to search
        max_results: Maximum number of results to return
        temperature: LLM temperature for summary generation
        force_reindex: Whether to force reindexing the collection
        output_file: Optional path to save analysis results to a file

    Returns:
        Dictionary with 'summary' (LLM summary of the findings) and
        'documents' (matching documents with content and metadata); on
        success also 'collection', 'document_count', and — when a file was
        written — 'file_path'.
    """
    logger.info(
        f"Analyzing documents in collection '{collection_name}' for query: {query}"
    )

    # Build the LLM first; the collection search engine needs it.
    llm = get_llm(temperature=temperature)
    search = get_search(collection_name, llm_instance=llm)

    if not search:
        return {
            "summary": f"Error: Collection '{collection_name}' not found or not properly configured.",
            "documents": [],
        }

    search.max_results = max_results

    # Rebuild the collection index on demand (only engines backed by an
    # embedding manager support this).
    if force_reindex and hasattr(search, "embedding_manager"):
        for folder_path in search.folder_paths:
            search.embedding_manager.index_folder(
                folder_path, force_reindex=True
            )

    matches = search.run(query)

    if not matches:
        return {
            "summary": f"No documents found in collection '{collection_name}' for query: '{query}'",
            "documents": [],
        }

    # Excerpt the top 5 matches, 1000 chars each, for the summary prompt.
    docs_text = "\n\n".join(
        f"Document {idx}: {doc.get('content', doc.get('snippet', ''))[:1000]}"
        for idx, doc in enumerate(matches[:5], start=1)
    )

    summary_prompt = f"""Analyze these document excerpts related to the query: "{query}"

{docs_text}

Provide a concise summary of the key information found in these documents related to the query.
"""

    import time

    llm_start_time = time.time()
    logger.info(
        f"Starting LLM summary generation (prompt length: {len(summary_prompt)} chars)..."
    )

    summary_response = llm.invoke(summary_prompt)

    llm_elapsed = time.time() - llm_start_time
    logger.info(f"LLM summary generation completed in {llm_elapsed:.2f}s")

    # Chat models return a message object with .content; fall back to str().
    summary = (
        remove_think_tags(summary_response.content)
        if hasattr(summary_response, "content")
        else str(summary_response)
    )

    analysis_result = {
        "summary": summary,
        "documents": matches,
        "collection": collection_name,
        "document_count": len(matches),
    }

    if output_file:
        from ..security.file_write_verifier import write_file_verified

        # Assemble the markdown report, then write it in one verified call.
        sections = [
            f"# Document Analysis: {query}\n\n",
            f"## Summary\n\n{summary}\n\n",
            f"## Documents Found: {len(matches)}\n\n",
        ]
        for idx, doc in enumerate(matches, start=1):
            sections.append(
                f"### Document {idx}: {doc.get('title', 'Untitled')}\n\n"
            )
            sections.append(f"**Source:** {doc.get('link', 'Unknown')}\n\n")
            sections.append(
                f"**Content:**\n\n{doc.get('content', doc.get('snippet', 'No content available'))[:1000]}...\n\n"
            )
            sections.append("---\n\n")

        write_file_verified(
            output_file,
            "".join(sections),
            "api.allow_file_output",
            context="API document analysis",
            settings_snapshot=None,  # analyze_documents doesn't support programmatic mode yet
        )

        analysis_result["file_path"] = output_file
        logger.info(f"Analysis saved to {output_file}")

    return analysis_result