Coverage for src/local_deep_research/api/research_functions.py: 66%
182 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2API module for Local Deep Research.
3Provides programmatic access to search and research capabilities.
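Example (a minimal usage sketch; the provider and API key values shown are placeholders):
    from local_deep_research.api.research_functions import quick_summary
    result = quick_summary(
        "What is quantum computing?",
        provider="openai",
        api_key="sk-...",
    )
    print(result["summary"])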
4"""
6from datetime import datetime, UTC
7from typing import Any, Callable, Dict, Optional, Union
9from loguru import logger
10from local_deep_research.settings.logger import log_settings
12from ..config.llm_config import get_llm
13from ..config.search_config import get_search
14from ..config.thread_settings import get_setting_from_snapshot
15from ..report_generator import IntegratedReportGenerator
16from ..search_system import AdvancedSearchSystem
17from ..utilities.db_utils import no_db_settings
18from ..utilities.thread_context import set_search_context
19from ..utilities.search_utilities import remove_think_tags
20from .settings_utils import create_settings_snapshot
23def _init_search_system(
24 model_name: str | None = None,
25 temperature: float = 0.7,
26 provider: str | None = None,
27 openai_endpoint_url: str | None = None,
28 progress_callback: Callable[[str, int, dict], None] | None = None,
29 search_tool: Optional[str] = None,
30 search_strategy: str = "source_based",
31 iterations: int = 1,
32 questions_per_iteration: int = 1,
33 retrievers: Optional[Dict[str, Any]] = None,
34 llms: Optional[Dict[str, Any]] = None,
35 username: Optional[str] = None,
36 research_id: Optional[Union[int, str]] = None,
37 research_context: Optional[Dict[str, Any]] = None,
38 programmatic_mode: bool = True,
39 search_original_query: bool = True,
40 settings_snapshot: Optional[Dict[str, Any]] = None,
41 **kwargs: Any,
42) -> AdvancedSearchSystem:
43 """
44 Initializes the advanced search system with the specified parameters. This function
45 sets up and returns an AdvancedSearchSystem instance using the provided configuration
46 options, such as the model name, LLM temperature, provider, custom endpoint URL,
47 and an optional search tool.
49 Args:
50 model_name: Name of the model to use (if None, uses database setting)
51 temperature: LLM temperature for generation
52 provider: Provider to use (if None, uses database setting)
53 openai_endpoint_url: Custom endpoint URL to use (if None, uses database
54 setting)
55 progress_callback: Optional callback function to receive progress updates
56 search_tool: Search engine to use (auto, wikipedia, arxiv, etc.). If None, uses default
57 search_strategy: Search strategy to use (modular, source_based, etc.). Defaults to "source_based"
58 iterations: Number of research cycles to perform
59 questions_per_iteration: Number of questions to generate per cycle
61 retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
62 llms: Optional dictionary of {name: llm} pairs to use as language models
63 programmatic_mode: If True, disables database operations and metrics tracking
64 search_original_query: Whether to include the original query in the first iteration of search
66 Returns:
67 AdvancedSearchSystem: An instance of the configured AdvancedSearchSystem.
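    Example (illustrative sketch; the settings snapshot mirrors the pattern used by the
    public API functions, and the search tool, iteration count, and query are placeholders):
        settings = create_settings_snapshot({"programmatic_mode": True})
        system = _init_search_system(
            search_tool="wikipedia",
            iterations=2,
            settings_snapshot=settings,
        )
        results = system.analyze_topic("History of the transistor")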
69 """
70 # Register retrievers if provided
71 if retrievers:
72 from ..web_search_engines.retriever_registry import retriever_registry
74 retriever_registry.register_multiple(retrievers)
75 logger.info(
76 f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
77 )
79 # Register LLMs if provided
80 if llms:
81 from ..llm import register_llm
83 for name, llm_instance in llms.items():
84 register_llm(name, llm_instance)
85 logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")
87 # Use settings_snapshot from parameter, or fall back to kwargs
88 if settings_snapshot is None:  # coverage: 88 ↛ 89 (the condition on line 88 was never true)
89 settings_snapshot = kwargs.get("settings_snapshot")
91 # Get language model with custom temperature
92 llm = get_llm(
93 temperature=temperature,
94 openai_endpoint_url=openai_endpoint_url,
95 model_name=model_name,
96 provider=provider,
97 research_id=research_id,
98 research_context=research_context,
99 settings_snapshot=settings_snapshot,
100 )
102 # Set the search engine if specified or get from settings
103 search_engine = None
105 # If no search_tool provided, get from settings_snapshot
106 if not search_tool and settings_snapshot:  # coverage: 106 ↛ 107 (the condition on line 106 was never true)
107 search_tool = get_setting_from_snapshot(
108 "search.tool", settings_snapshot=settings_snapshot
109 )
111 if search_tool:  # coverage: 111 ↛ 125 (the condition on line 111 was always true)
112 search_engine = get_search(
113 search_tool,
114 llm_instance=llm,
115 username=username,
116 settings_snapshot=settings_snapshot,
117 programmatic_mode=programmatic_mode,
118 )
119 if search_engine is None:
120 logger.warning(
121 f"Could not create search engine '{search_tool}', using default."
122 )
124 # Create search system with custom parameters
125 logger.info("Search strategy: {}", search_strategy)
126 system = AdvancedSearchSystem(
127 llm=llm,
128 search=search_engine,
129 strategy_name=search_strategy,
130 username=username,
131 research_id=research_id,
132 research_context=research_context,
133 settings_snapshot=settings_snapshot,
134 programmatic_mode=programmatic_mode,
135 search_original_query=search_original_query,
136 )
138 # Override default settings with user-provided values
139 system.max_iterations = iterations
140 system.questions_per_iteration = questions_per_iteration
142 # Set progress callback if provided
143 if progress_callback:
144 system.set_progress_callback(progress_callback)
146 return system
149 @no_db_settings
150 def quick_summary(
151 query: str,
152 research_id: Optional[Union[int, str]] = None,
153 retrievers: Optional[Dict[str, Any]] = None,
154 llms: Optional[Dict[str, Any]] = None,
155 username: Optional[str] = None,
156 provider: Optional[str] = None,
157 api_key: Optional[str] = None,
158 temperature: Optional[float] = None,
159 max_search_results: Optional[int] = None,
160 settings: Optional[Dict[str, Any]] = None,
161 settings_override: Optional[Dict[str, Any]] = None,
162 search_original_query: bool = True,
163 **kwargs: Any,
164) -> Dict[str, Any]:
165 """
166 Generate a quick research summary for a given query.
168 Args:
169 query: The research query to analyze
170 research_id: Optional research ID (int or UUID string) for tracking metrics
171 retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
172 llms: Optional dictionary of {name: llm} pairs to use as language models
173 provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
174 api_key: API key for the provider. For programmatic API only.
175 temperature: LLM temperature (0.0-1.0). For programmatic API only.
176 max_search_results: Maximum number of search results to return. For programmatic API only.
177 settings: Base settings dict to use instead of defaults. For programmatic API only.
178 settings_override: Dictionary of settings to override (e.g., {"llm.max_tokens": 4000}). For programmatic API only.
179 search_original_query: Whether to include the original query in the first iteration of search.
180 Set to False for news searches to avoid sending long subscription prompts to search engines.
181 **kwargs: Additional configuration for the search system. Will be forwarded to
182 `_init_search_system()`.
184 Returns:
185 Dictionary containing the research results with keys:
186 - 'summary': The generated summary text
187 - 'findings': List of detailed findings from each search
188 - 'iterations': Number of iterations performed
189 - 'questions': Questions generated during research
 - 'formatted_findings': Findings formatted for presentation
 - 'sources': List of source links collected during research
191 Examples:
192 # Simple usage with defaults
193 result = quick_summary("What is quantum computing?")
195 # With custom provider
196 result = quick_summary(
197 "What is quantum computing?",
198 provider="anthropic",
199 api_key="sk-ant-..."
200 )
202 # With advanced settings
203 result = quick_summary(
204 "What is quantum computing?",
205 temperature=0.2,
206 settings_override={"search.engines.arxiv.enabled": True}
207 )
208 """
209 logger.info("Generating quick summary for query: %s", query)
211 # Only create settings snapshot if not already provided (programmatic API)
212 if "settings_snapshot" not in kwargs:
213 # Build kwargs for create_settings_snapshot from explicit parameters
214 snapshot_kwargs = {}
215 if provider is not None:
216 snapshot_kwargs["provider"] = provider
217 if api_key is not None:
218 snapshot_kwargs["api_key"] = api_key
219 if temperature is not None:
220 snapshot_kwargs["temperature"] = temperature
221 if max_search_results is not None:  # coverage: 221 ↛ 222 (the condition on line 221 was never true)
222 snapshot_kwargs["max_search_results"] = max_search_results
224 # Create settings snapshot for programmatic use (only if not already provided)
225 if "settings_snapshot" not in kwargs: 225 ↛ 236line 225 didn't jump to line 236 because the condition on line 225 was always true
226 kwargs["settings_snapshot"] = create_settings_snapshot(
227 base_settings=settings,
228 overrides=settings_override,
229 **snapshot_kwargs,
230 )
231 log_settings(
232 kwargs["settings_snapshot"],
233 "Created settings snapshot for programmatic API",
234 )
235 else:
236 log_settings(
237 kwargs["settings_snapshot"],
238 "Using provided settings snapshot for programmatic API",
239 )
241 # Generate a research_id if none provided
242 if research_id is None:
243 import uuid
245 research_id = str(uuid.uuid4())
246 logger.debug(f"Generated research_id: {research_id}")
248 # Register retrievers if provided
249 if retrievers:
250 from ..web_search_engines.retriever_registry import retriever_registry
252 retriever_registry.register_multiple(retrievers)
253 logger.info(
254 f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
255 )
257 # Register LLMs if provided
258 if llms:
259 from ..llm import register_llm
261 for name, llm_instance in llms.items():
262 register_llm(name, llm_instance)
263 logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")
265 search_context = {
266 "research_id": research_id, # Pass UUID or integer directly
267 "research_query": query,
268 "research_mode": kwargs.get("research_mode", "quick"),
269 "research_phase": "init",
270 "search_iteration": 0,
271 "search_engine_selected": kwargs.get("search_tool"),
272 "username": username, # Include username for metrics tracking
273 "user_password": kwargs.get(
274 "user_password"
275 ), # Include password for metrics tracking
276 }
277 set_search_context(search_context)
279 # Remove research_mode from kwargs before passing to _init_search_system
280 init_kwargs = {k: v for k, v in kwargs.items() if k != "research_mode"}
281 # Make sure username is passed to the system
282 init_kwargs["username"] = username
283 init_kwargs["research_id"] = research_id
284 init_kwargs["research_context"] = search_context
285 init_kwargs["search_original_query"] = search_original_query
286 system = _init_search_system(llms=llms, **init_kwargs)
288 # Perform the search and analysis
289 results = system.analyze_topic(query)
291 # Extract the summary from the current knowledge
292 if results and "current_knowledge" in results:  # coverage: 292 ↛ 295 (the condition on line 292 was always true)
293 summary = results["current_knowledge"]
294 else:
295 summary = "Unable to generate summary for the query."
297 # Prepare the return value
298 return {
299 "summary": summary,
300 "findings": results.get("findings", []),
301 "iterations": results.get("iterations", 0),
302 "questions": results.get("questions", {}),
303 "formatted_findings": results.get("formatted_findings", ""),
304 "sources": results.get("all_links_of_system", []),
305 }
308 @no_db_settings
309 def generate_report(
310 query: str,
311 output_file: Optional[str] = None,
312 progress_callback: Optional[Callable] = None,
313 searches_per_section: int = 2,
314 retrievers: Optional[Dict[str, Any]] = None,
315 llms: Optional[Dict[str, Any]] = None,
316 provider: Optional[str] = None,
317 api_key: Optional[str] = None,
318 temperature: Optional[float] = None,
319 max_search_results: Optional[int] = None,
320 settings: Optional[Dict[str, Any]] = None,
321 settings_override: Optional[Dict[str, Any]] = None,
322 **kwargs: Any,
323) -> Dict[str, Any]:
324 """
325 Generate a comprehensive, structured research report for a given query.
327 Args:
328 query: The research query to analyze
329 output_file: Optional path to save report markdown file
330 progress_callback: Optional callback function to receive progress updates
331 searches_per_section: The number of searches to perform for each
332 section in the report.
333 retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
334 llms: Optional dictionary of {name: llm} pairs to use as language models
335 provider: LLM provider to use (e.g., 'openai', 'anthropic'). For programmatic API only.
336 api_key: API key for the provider. For programmatic API only.
337 temperature: LLM temperature (0.0-1.0). For programmatic API only.
338 max_search_results: Maximum number of search results to return. For programmatic API only.
339 settings: Base settings dict to use instead of defaults. For programmatic API only.
340 settings_override: Dictionary of settings to override. For programmatic API only.
341 **kwargs: Additional configuration for the search system.
343 Returns:
344 Dictionary containing the research report with keys:
345 - 'content': The full report content in markdown format
346 - 'metadata': Report metadata including generated timestamp and query
347 - 'file_path': Path to saved file (if output_file was provided)
349 Examples:
350 # Simple usage with settings snapshot
351 from local_deep_research.api.settings_utils import create_settings_snapshot
352 settings = create_settings_snapshot({"programmatic_mode": True})
353 result = generate_report("AI research", settings_snapshot=settings)
355 # Save to file
356 result = generate_report(
357 "AI research",
358 output_file="report.md",
359 settings_snapshot=settings
360 )
361 """
362 logger.info("Generating comprehensive research report for query: %s", query)
364 # Only create settings snapshot if not already provided (programmatic API)
365 if "settings_snapshot" not in kwargs:
366 # Build kwargs for create_settings_snapshot from explicit parameters
367 snapshot_kwargs = {}
368 if provider is not None:
369 snapshot_kwargs["provider"] = provider
370 if api_key is not None:  # coverage: 370 ↛ 371 (the condition on line 370 was never true)
371 snapshot_kwargs["api_key"] = api_key
372 if temperature is not None:  # coverage: 372 ↛ 373 (the condition on line 372 was never true)
373 snapshot_kwargs["temperature"] = temperature
374 if max_search_results is not None:  # coverage: 374 ↛ 375 (the condition on line 374 was never true)
375 snapshot_kwargs["max_search_results"] = max_search_results
377 # Create settings snapshot for programmatic use (only if not already provided)
378 if "settings_snapshot" not in kwargs: 378 ↛ 389line 378 didn't jump to line 389 because the condition on line 378 was always true
379 kwargs["settings_snapshot"] = create_settings_snapshot(
380 base_settings=settings,
381 overrides=settings_override,
382 **snapshot_kwargs,
383 )
384 log_settings(
385 kwargs["settings_snapshot"],
386 "Created settings snapshot for programmatic API",
387 )
388 else:
389 log_settings(
390 kwargs["settings_snapshot"],
391 "Using provided settings snapshot for programmatic API",
392 )
394 # Register retrievers if provided
395 if retrievers:
396 from ..web_search_engines.retriever_registry import retriever_registry
398 retriever_registry.register_multiple(retrievers)
399 logger.info(
400 f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
401 )
403 # Register LLMs if provided
404 if llms:
405 from ..llm import register_llm
407 for name, llm_instance in llms.items():
408 register_llm(name, llm_instance)
409 logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")
411 system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)
413 # Set progress callback if provided
414 if progress_callback:  # coverage: 414 ↛ 415 (the condition on line 414 was never true)
415 system.set_progress_callback(progress_callback)
417 # Perform the initial research
418 initial_findings = system.analyze_topic(query)
420 # Generate the structured report
421 report_generator = IntegratedReportGenerator(
422 search_system=system,
423 llm=system.model,
424 searches_per_section=searches_per_section,
425 )
426 report = report_generator.generate_report(initial_findings, query)
428 # Save report to file if path is provided
429 if output_file and report and "content" in report:  # coverage: 429 ↛ 430 (the condition on line 429 was never true)
430 from ..security.file_write_verifier import write_file_verified
432 write_file_verified(
433 output_file,
434 report["content"],
435 "api.allow_file_output",
436 context="API research report",
437 settings_snapshot=kwargs.get("settings_snapshot"),
438 )
439 logger.info(f"Report saved to {output_file}")
440 report["file_path"] = output_file
441 return report
444 @no_db_settings
445 def detailed_research(
446 query: str,
447 research_id: Optional[Union[int, str]] = None,
448 retrievers: Optional[Dict[str, Any]] = None,
449 llms: Optional[Dict[str, Any]] = None,
450 **kwargs: Any,
451) -> Dict[str, Any]:
452 """
453 Perform detailed research with comprehensive analysis.
455 Similar to generate_report but returns structured data instead of markdown.
457 Args:
458 query: The research query to analyze
459 research_id: Optional research ID (int or UUID string) for tracking metrics
460 retrievers: Optional dictionary of {name: retriever} pairs to use as search engines
461 llms: Optional dictionary of {name: llm} pairs to use as language models
462 **kwargs: Configuration for the search system
464 Returns:
465 Dictionary containing detailed research results
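    Examples:
        # Illustrative sketch; the query is a placeholder and the settings snapshot
        # follows the same pattern as in the generate_report examples
        from local_deep_research.api.settings_utils import create_settings_snapshot
        settings = create_settings_snapshot({"programmatic_mode": True})
        result = detailed_research(
            "Impact of solid-state batteries on EV range",
            search_tool="wikipedia",
            settings_snapshot=settings,
        )
        print(result["summary"])
        print(result["metadata"]["strategy"])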
466 """
467 logger.info("Performing detailed research for query: %s", query)
469 # Generate a research_id if none provided
470 if research_id is None:
471 import uuid
473 research_id = str(uuid.uuid4())
474 logger.debug(f"Generated research_id: {research_id}")
476 # Register retrievers if provided
477 if retrievers:
478 from ..web_search_engines.retriever_registry import retriever_registry
480 retriever_registry.register_multiple(retrievers)
481 logger.info(
482 f"Registered {len(retrievers)} retrievers: {list(retrievers.keys())}"
483 )
485 # Register LLMs if provided
486 if llms:
487 from ..llm import register_llm
489 for name, llm_instance in llms.items():
490 register_llm(name, llm_instance)
491 logger.info(f"Registered {len(llms)} LLMs: {list(llms.keys())}")
493 search_context = {
494 "research_id": research_id,
495 "research_query": query,
496 "research_mode": "detailed",
497 "research_phase": "init",
498 "search_iteration": 0,
499 "search_engine_selected": kwargs.get("search_tool"),
500 }
501 set_search_context(search_context)
503 # Initialize system
504 system = _init_search_system(retrievers=retrievers, llms=llms, **kwargs)
506 # Perform detailed research
507 results = system.analyze_topic(query)
509 # Return comprehensive results
510 return {
511 "query": query,
512 "research_id": research_id,
513 "summary": results.get("current_knowledge", ""),
514 "findings": results.get("findings", []),
515 "iterations": results.get("iterations", 0),
516 "questions": results.get("questions", {}),
517 "formatted_findings": results.get("formatted_findings", ""),
518 "sources": results.get("all_links_of_system", []),
519 "metadata": {
520 "timestamp": datetime.now(UTC).isoformat(),
521 "search_tool": kwargs.get("search_tool", "auto"),
522 "iterations_requested": kwargs.get("iterations", 1),
523 "strategy": kwargs.get("search_strategy", "source_based"),
524 },
525 }
528 @no_db_settings
529 def analyze_documents(
530 query: str,
531 collection_name: str,
532 max_results: int = 10,
533 temperature: float = 0.7,
534 force_reindex: bool = False,
535 output_file: Optional[str] = None,
536) -> Dict[str, Any]:
537 """
538 Search and analyze documents in a specific local collection.
540 Args:
541 query: The search query
542 collection_name: Name of the local document collection to search
543 max_results: Maximum number of results to return
544 temperature: LLM temperature for summary generation
545 force_reindex: Whether to force reindexing the collection
546 output_file: Optional path to save analysis results to a file
548 Returns:
549 Dictionary containing:
550 - 'summary': Summary of the findings
551 - 'documents': List of matching documents with content and metadata
 - 'collection': Name of the collection that was searched
 - 'document_count': Number of matching documents
 - 'file_path': Path to the saved analysis (only if output_file was provided)
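    Examples:
        # Illustrative sketch; "project_docs" is a placeholder collection name
        result = analyze_documents(
            "deployment configuration",
            collection_name="project_docs",
            max_results=5,
        )
        print(result["summary"])
        print(result["document_count"])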
552 """
553 logger.info(
554 f"Analyzing documents in collection '{collection_name}' for query: {query}"
555 )
557 # Get language model with custom temperature
558 llm = get_llm(temperature=temperature)
560 # Get search engine for the specified collection
561 search = get_search(collection_name, llm_instance=llm)
563 if not search:
564 return {
565 "summary": f"Error: Collection '{collection_name}' not found or not properly configured.",
566 "documents": [],
567 }
569 # Set max results
570 search.max_results = max_results
572 # Force reindex if requested
573 if force_reindex and hasattr(search, "embedding_manager"):
574 for folder_path in search.folder_paths:
575 search.embedding_manager.index_folder(
576 folder_path, force_reindex=True
577 )
579 # Perform the search
580 results = search.run(query)
582 if not results:
583 return {
584 "summary": f"No documents found in collection '{collection_name}' for query: '{query}'",
585 "documents": [],
586 }
588 # Get LLM to generate a summary of the results
590 docs_text = "\n\n".join(
591 [
592 f"Document {i + 1}:"
593 f" {doc.get('content', doc.get('snippet', ''))[:1000]}"
594 for i, doc in enumerate(results[:5])
595 ]
596 ) # Limit to first 5 docs and 1000 chars each
598 summary_prompt = f"""Analyze these document excerpts related to the query: "{query}"
600 {docs_text}
602 Provide a concise summary of the key information found in these documents related to the query.
603 """
605 import time
607 llm_start_time = time.time()
608 logger.info(
609 f"Starting LLM summary generation (prompt length: {len(summary_prompt)} chars)..."
610 )
612 summary_response = llm.invoke(summary_prompt)
614 llm_elapsed = time.time() - llm_start_time
615 logger.info(f"LLM summary generation completed in {llm_elapsed:.2f}s")
617 if hasattr(summary_response, "content"):
618 summary = remove_think_tags(summary_response.content)
619 else:
620 summary = str(summary_response)
622 # Create result dictionary
623 analysis_result = {
624 "summary": summary,
625 "documents": results,
626 "collection": collection_name,
627 "document_count": len(results),
628 }
630 # Save to file if requested
631 if output_file:
632 from ..security.file_write_verifier import write_file_verified
634 content = f"# Document Analysis: {query}\n\n"
635 content += f"## Summary\n\n{summary}\n\n"
636 content += f"## Documents Found: {len(results)}\n\n"
638 for i, doc in enumerate(results):
639 content += (
640 f"### Document {i + 1}: {doc.get('title', 'Untitled')}\n\n"
641 )
642 content += f"**Source:** {doc.get('link', 'Unknown')}\n\n"
643 content += f"**Content:**\n\n{doc.get('content', doc.get('snippet', 'No content available'))[:1000]}...\n\n"
644 content += "---\n\n"
646 write_file_verified(
647 output_file,
648 content,
649 "api.allow_file_output",
650 context="API document analysis",
651 settings_snapshot=None, # analyze_documents doesn't support programmatic mode yet
652 )
654 analysis_result["file_path"] = output_file
655 logger.info(f"Analysis saved to {output_file}")
657 return analysis_result