Coverage for src / local_deep_research / api / research_functions.py: 93%
186 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2API module for Local Deep Research.
3Provides programmatic access to search and research capabilities.
4"""
6from datetime import datetime, UTC
7from typing import Any, Callable
9from loguru import logger
10from local_deep_research.settings.logger import log_settings
12from ..config.llm_config import get_llm
13from ..config.search_config import get_search
14from ..config.thread_settings import get_setting_from_snapshot
15from ..report_generator import IntegratedReportGenerator
16from ..search_system import AdvancedSearchSystem
17from ..utilities.db_utils import no_db_settings
18from ..utilities.thread_context import clear_search_context, set_search_context
19from ..utilities.search_utilities import remove_think_tags
20from .settings_utils import create_settings_snapshot
def _init_search_system(
    model_name: str | None = None,
    temperature: float = 0.7,
    provider: str | None = None,
    openai_endpoint_url: str | None = None,
    progress_callback: Callable[[str, int, dict], None] | None = None,
    search_tool: str | None = None,
    search_strategy: str = "source_based",
    iterations: int = 1,
    questions_per_iteration: int = 1,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    username: str | None = None,
    research_id: str | None = None,
    research_context: dict[str, Any] | None = None,
    programmatic_mode: bool = True,
    search_original_query: bool = True,
    settings_snapshot: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AdvancedSearchSystem:
    """
    Initialize the advanced search system with the specified parameters.

    Sets up and returns an instance of ``AdvancedSearchSystem`` using the
    provided configuration options such as model name, temperature for
    randomness in responses, provider service details, endpoint URL, and an
    optional search tool.

    Args:
        model_name: Name of the model to use (if None, uses database setting)
        temperature: LLM temperature for generation
        provider: Provider to use (if None, uses database setting)
        openai_endpoint_url: Custom endpoint URL to use (if None, uses database
            setting)
        progress_callback: Optional callback function to receive progress updates
        search_tool: Search engine to use (auto, wikipedia, arxiv, etc.). If
            None, falls back to the "search.tool" setting from the snapshot.
        search_strategy: Name of the search strategy to use (modular,
            source_based, etc.)
        iterations: Number of research cycles to perform
        questions_per_iteration: Number of questions to generate per cycle
        retrievers: Optional dictionary of {name: retriever} pairs to use as
            search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        username: Optional username, forwarded for per-user search engines and
            metrics tracking
        research_id: Optional research ID (UUID string) for tracking metrics
        research_context: Optional context dict associated with this research run
        programmatic_mode: If True, disables database operations and metrics
            tracking
        search_original_query: Whether to include the original query in the
            first iteration of search
        settings_snapshot: Settings snapshot to use; if None, a
            "settings_snapshot" entry in ``kwargs`` is used as a fallback
        **kwargs: May carry "settings_snapshot" as a fallback (see above);
            other entries are ignored.

    Returns:
        AdvancedSearchSystem: An instance of the configured AdvancedSearchSystem.
    """
    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    # Use settings_snapshot from parameter, or fall back to kwargs
    if settings_snapshot is None:
        settings_snapshot = kwargs.get("settings_snapshot")

    # Get language model with custom temperature
    llm = get_llm(
        temperature=temperature,
        openai_endpoint_url=openai_endpoint_url,
        model_name=model_name,
        provider=provider,
        research_id=research_id,
        research_context=research_context,
        settings_snapshot=settings_snapshot,
    )

    # Set the search engine if specified or get from settings
    search_engine = None

    # If no search_tool provided, get from settings_snapshot
    if not search_tool and settings_snapshot:
        search_tool = get_setting_from_snapshot(
            "search.tool", settings_snapshot=settings_snapshot
        )

    if search_tool:
        search_engine = get_search(
            search_tool,
            llm_instance=llm,
            username=username,
            settings_snapshot=settings_snapshot,
            programmatic_mode=programmatic_mode,
        )
        if search_engine is None:
            logger.warning(
                f"Could not create search engine '{search_tool}', using default."
            )

    # Create search system with custom parameters
    logger.info("Search strategy: {}", search_strategy)
    system = AdvancedSearchSystem(
        llm=llm,
        search=search_engine,
        strategy_name=search_strategy,
        username=username,
        research_id=research_id,
        research_context=research_context,
        settings_snapshot=settings_snapshot,
        programmatic_mode=programmatic_mode,
        search_original_query=search_original_query,
    )

    # Override default settings with user-provided values
    system.max_iterations = iterations
    system.questions_per_iteration = questions_per_iteration

    # Set progress callback if provided
    if progress_callback:
        system.set_progress_callback(progress_callback)

    return system
@no_db_settings
def quick_summary(
    query: str,
    research_id: str | None = None,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    username: str | None = None,
    provider: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
    max_search_results: int | None = None,
    settings: dict[str, Any] | None = None,
    settings_override: dict[str, Any] | None = None,
    search_original_query: bool = True,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Generate a quick research summary for a given query.

    Args:
        query: The research query to analyze
        research_id: Optional research ID (int or UUID string) for tracking metrics
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        username: Optional username, included in the search context for metrics
        provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
        api_key: API key for the provider. For programmatic API only.
        temperature: LLM temperature (0.0-1.0). For programmatic API only.
        max_search_results: Maximum number of search results to return. For programmatic API only.
        settings: Base settings dict to use instead of defaults. For programmatic API only.
        settings_override: Dictionary of settings to override (e.g., {"llm.max_tokens": 4000}). For programmatic API only.
        search_original_query: Whether to include the original query in the first iteration of search.
            Set to False for news searches to avoid sending long subscription prompts to search engines.
        **kwargs: Additional configuration for the search system. Will be forwarded to
            `_init_search_system()`.

    Returns:
        Dictionary containing the research results with keys:
        - 'summary': The generated summary text
        - 'findings': List of detailed findings from each search
        - 'iterations': Number of iterations performed
        - 'questions': Questions generated during research

    Examples:
        # Simple usage with defaults
        result = quick_summary("What is quantum computing?")

        # With custom provider
        result = quick_summary(
            "What is quantum computing?",
            provider="anthropic",
            api_key="sk-ant-..."
        )

        # With advanced settings
        result = quick_summary(
            "What is quantum computing?",
            temperature=0.2,
            settings_override={"search.engines.arxiv.enabled": True}
        )
    """
    # loguru formats with {}-style placeholders ("%s" would be logged
    # literally and the query dropped from the message).
    logger.info("Generating quick summary for query: {}", query)

    # Only create settings snapshot if not already provided (programmatic API)
    if "settings_snapshot" not in kwargs:
        # Build kwargs for create_settings_snapshot from explicit parameters
        snapshot_kwargs = {}
        if provider is not None:
            snapshot_kwargs["provider"] = provider
        if api_key is not None:
            snapshot_kwargs["api_key"] = api_key
        if temperature is not None:
            snapshot_kwargs["temperature"] = temperature
        if max_search_results is not None:
            snapshot_kwargs["max_search_results"] = max_search_results

        # Create settings snapshot for programmatic use
        kwargs["settings_snapshot"] = create_settings_snapshot(
            base_settings=settings,
            overrides=settings_override,
            **snapshot_kwargs,
        )
        log_settings(
            kwargs["settings_snapshot"],
            "Created settings snapshot for programmatic API",
        )
    else:
        log_settings(
            kwargs["settings_snapshot"],
            "Using provided settings snapshot for programmatic API",
        )

    # Generate a research_id if none provided
    if research_id is None:
        import uuid

        research_id = str(uuid.uuid4())
        logger.debug(f"Generated research_id: {research_id}")

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    search_context = {
        "research_id": research_id,  # Pass UUID or integer directly
        "research_query": query,
        "research_mode": kwargs.get("research_mode", "quick"),
        "research_phase": "init",
        "search_iteration": 0,
        "search_engine_selected": kwargs.get("search_tool"),
        "username": username,  # Include username for metrics tracking
        "user_password": kwargs.get(
            "user_password"
        ),  # Include password for metrics tracking
    }
    set_search_context(search_context)

    try:
        # Remove research_mode from kwargs before passing to _init_search_system
        init_kwargs = {k: v for k, v in kwargs.items() if k != "research_mode"}
        # Make sure username is passed to the system
        init_kwargs["username"] = username
        init_kwargs["research_id"] = research_id
        init_kwargs["research_context"] = search_context
        init_kwargs["search_original_query"] = search_original_query
        system = _init_search_system(llms=llms, **init_kwargs)

        # Perform the search and analysis
        results = system.analyze_topic(query)

        # Extract the summary from the current knowledge
        if results and "current_knowledge" in results:
            summary = results["current_knowledge"]
        else:
            summary = "Unable to generate summary for the query."

        # Prepare the return value
        return {
            "research_id": research_id,
            "summary": summary,
            "findings": results.get("findings", []),
            "iterations": results.get("iterations", 0),
            "questions": results.get("questions", {}),
            "formatted_findings": results.get("formatted_findings", ""),
            "sources": results.get("all_links_of_system", []),
        }
    finally:
        clear_search_context()
@no_db_settings
def generate_report(
    query: str,
    output_file: str | None = None,
    progress_callback: Callable | None = None,
    searches_per_section: int = 2,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    provider: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
    max_search_results: int | None = None,
    settings: dict[str, Any] | None = None,
    settings_override: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Generate a comprehensive, structured research report for a given query.

    Args:
        query: The research query to analyze
        output_file: Optional path to save report markdown file
        progress_callback: Optional callback function to receive progress updates
        searches_per_section: The number of searches to perform for each
            section in the report.
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
        api_key: API key for the provider. For programmatic API only.
        temperature: LLM temperature (0.0-1.0). For programmatic API only.
        max_search_results: Maximum number of search results to return. For programmatic API only.
        settings: Base settings dict to use instead of defaults. For programmatic API only.
        settings_override: Dictionary of settings to override. For programmatic API only.
        **kwargs: Additional configuration for the search system.

    Returns:
        Dictionary containing the research report with keys:
        - 'content': The full report content in markdown format
        - 'metadata': Report metadata including generated timestamp and query
        - 'file_path': Path to saved file (if output_file was provided)

    Examples:
        # Simple usage with settings snapshot
        from local_deep_research.api.settings_utils import create_settings_snapshot
        settings = create_settings_snapshot({"programmatic_mode": True})
        result = generate_report("AI research", settings_snapshot=settings)

        # Save to file
        result = generate_report(
            "AI research",
            output_file="report.md",
            settings_snapshot=settings
        )
    """
    # loguru formats with {}-style placeholders ("%s" would be logged
    # literally and the query dropped from the message).
    logger.info("Generating comprehensive research report for query: {}", query)

    # Only create settings snapshot if not already provided (programmatic API)
    if "settings_snapshot" not in kwargs:
        # Build kwargs for create_settings_snapshot from explicit parameters
        snapshot_kwargs = {}
        if provider is not None:
            snapshot_kwargs["provider"] = provider
        if api_key is not None:
            snapshot_kwargs["api_key"] = api_key
        if temperature is not None:
            snapshot_kwargs["temperature"] = temperature
        if max_search_results is not None:
            snapshot_kwargs["max_search_results"] = max_search_results

        # Create settings snapshot for programmatic use
        kwargs["settings_snapshot"] = create_settings_snapshot(
            base_settings=settings,
            overrides=settings_override,
            **snapshot_kwargs,
        )
        log_settings(
            kwargs["settings_snapshot"],
            "Created settings snapshot for programmatic API",
        )
    else:
        log_settings(
            kwargs["settings_snapshot"],
            "Using provided settings snapshot for programmatic API",
        )

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)

    # Set progress callback if provided
    if progress_callback:
        system.set_progress_callback(progress_callback)

    # Perform the initial research
    initial_findings = system.analyze_topic(query)

    # Generate the structured report
    report_generator = IntegratedReportGenerator(
        search_system=system,
        llm=system.model,
        searches_per_section=searches_per_section,
        settings_snapshot=kwargs.get("settings_snapshot"),
    )
    report = report_generator.generate_report(initial_findings, query)

    # Save report to file if path is provided
    if output_file and report and "content" in report:
        from ..security.file_write_verifier import write_file_verified

        write_file_verified(
            output_file,
            report["content"],
            "api.allow_file_output",
            context="API research report",
            settings_snapshot=kwargs.get("settings_snapshot"),
        )
        logger.info(f"Report saved to {output_file}")
        report["file_path"] = output_file
    return report
@no_db_settings
def detailed_research(
    query: str,
    research_id: str | None = None,
    retrievers: dict[str, Any] | None = None,
    llms: dict[str, Any] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Perform detailed research with comprehensive analysis.

    Similar to generate_report but returns structured data instead of markdown.

    Args:
        query: The research query to analyze
        research_id: Optional research ID (int or UUID string) for tracking metrics
        retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
        llms: Optional dictionary of {name: llm} pairs to use as language models
        **kwargs: Configuration for the search system

    Returns:
        Dictionary containing detailed research results
    """
    # loguru formats with {}-style placeholders ("%s" would be logged
    # literally and the query dropped from the message).
    logger.info("Performing detailed research for query: {}", query)

    # Generate a research_id if none provided
    if research_id is None:
        import uuid

        research_id = str(uuid.uuid4())
        logger.debug(f"Generated research_id: {research_id}")

    # Register retrievers if provided
    if retrievers:
        from ..web_search_engines.retriever_registry import retriever_registry

        retriever_registry.register_multiple(retrievers)
        logger.info(
            f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
        )

    # Register LLMs if provided
    if llms:
        from ..llm import register_llm

        for name, llm_instance in llms.items():
            register_llm(name, llm_instance)
        logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")

    search_context = {
        "research_id": research_id,
        "research_query": query,
        "research_mode": "detailed",
        "research_phase": "init",
        "search_iteration": 0,
        "search_engine_selected": kwargs.get("search_tool"),
    }
    set_search_context(search_context)

    try:
        # Initialize system
        system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)

        # Perform detailed research
        results = system.analyze_topic(query)

        # Return comprehensive results
        return {
            "query": query,
            "research_id": research_id,
            "summary": results.get("current_knowledge", ""),
            "findings": results.get("findings", []),
            "iterations": results.get("iterations", 0),
            "questions": results.get("questions", {}),
            "formatted_findings": results.get("formatted_findings", ""),
            "sources": results.get("all_links_of_system", []),
            "metadata": {
                "timestamp": datetime.now(UTC).isoformat(),
                "search_tool": kwargs.get("search_tool", "auto"),
                "iterations_requested": kwargs.get("iterations", 1),
                "strategy": kwargs.get("search_strategy", "source_based"),
            },
        }
    finally:
        clear_search_context()
@no_db_settings
def analyze_documents(
    query: str,
    collection_name: str,
    max_results: int = 10,
    temperature: float = 0.7,
    force_reindex: bool = False,
    output_file: str | None = None,
) -> dict[str, Any]:
    """
    Search and analyze documents in a specific local collection.

    Args:
        query: The search query
        collection_name: Name of the local document collection to search
        max_results: Maximum number of results to return
        temperature: LLM temperature for summary generation
        force_reindex: Whether to force reindexing the collection
        output_file: Optional path to save analysis results to a file

    Returns:
        Dictionary containing:
        - 'summary': Summary of the findings
        - 'documents': List of matching documents with content and metadata
    """
    logger.info(
        f"Analyzing documents in collection '{collection_name}' for query: {query}"
    )

    # Get language model with custom temperature
    llm = get_llm(temperature=temperature)

    # Get search engine for the specified collection
    search = get_search(collection_name, llm_instance=llm)

    if not search:
        return {
            "summary": f"Error: Collection '{collection_name}' not found or not properly configured.",
            "documents": [],
        }

    # Set max results
    search.max_results = max_results

    # Force reindex if requested
    if force_reindex and hasattr(search, "embedding_manager"):
        for folder_path in search.folder_paths:
            search.embedding_manager.index_folder(
                folder_path, force_reindex=True
            )

    # Perform the search
    results = search.run(query)

    if not results:
        return {
            "summary": f"No documents found in collection '{collection_name}' for query: '{query}'",
            "documents": [],
        }

    # Get LLM to generate a summary of the results.
    # Limit to first 5 docs and 1000 chars each to keep the prompt bounded.
    docs_text = "\n\n".join(
        [
            f"Document {i + 1}:"
            f" {doc.get('content', doc.get('snippet', ''))[:1000]}"
            for i, doc in enumerate(results[:5])
        ]
    )

    summary_prompt = f"""Analyze these document excerpts related to the query: "{query}"

{docs_text}

Provide a concise summary of the key information found in these documents related to the query.
"""

    import time

    # Use a monotonic clock for the duration measurement so wall-clock
    # adjustments (NTP, DST) cannot skew or negate the elapsed time.
    llm_start_time = time.monotonic()
    logger.info(
        f"Starting LLM summary generation (prompt length: {len(summary_prompt)} chars)..."
    )

    summary_response = llm.invoke(summary_prompt)

    llm_elapsed = time.monotonic() - llm_start_time
    logger.info(f"LLM summary generation completed in {llm_elapsed:.2f}s")

    if hasattr(summary_response, "content"):
        summary = remove_think_tags(summary_response.content)
    else:
        summary = str(summary_response)

    # Create result dictionary
    analysis_result = {
        "summary": summary,
        "documents": results,
        "collection": collection_name,
        "document_count": len(results),
    }

    # Save to file if requested
    if output_file:
        from ..security.file_write_verifier import write_file_verified

        content = f"# Document Analysis: {query}\n\n"
        content += f"## Summary\n\n{summary}\n\n"
        content += f"## Documents Found: {len(results)}\n\n"

        for i, doc in enumerate(results):
            content += (
                f"### Document {i + 1}: {doc.get('title', 'Untitled')}\n\n"
            )
            content += f"**Source:** {doc.get('link', 'Unknown')}\n\n"
            content += f"**Content:**\n\n{doc.get('content', doc.get('snippet', 'No content available'))[:1000]}...\n\n"
            content += "---\n\n"

        write_file_verified(
            output_file,
            content,
            "api.allow_file_output",
            context="API document analysis",
            settings_snapshot=None,  # analyze_documents doesn't support programmatic mode yet
        )

        analysis_result["file_path"] = output_file
        logger.info(f"Analysis saved to {output_file}")

    return analysis_result