Coverage for src / local_deep_research / report_generator.py: 98%
150 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import importlib
2from typing import Dict, List, Optional
3from datetime import datetime, UTC
5from langchain_core.language_models import BaseChatModel
6from loguru import logger
8# Fix circular import by importing directly from source modules
9from .config.llm_config import get_llm
10from .config.thread_settings import get_setting_from_snapshot
11from .search_system import AdvancedSearchSystem
12from .utilities import search_utilities
# Fallback limits for cross-section context accumulation. These apply only
# when no settings snapshot supplies "report.max_context_sections" /
# "report.max_context_chars" overrides.
DEFAULT_MAX_CONTEXT_SECTIONS = 3  # previous sections carried as context
DEFAULT_MAX_CONTEXT_CHARS = 4000  # context char cap, safe for smaller local models
def get_report_generator(search_system=None):
    """Create an ``IntegratedReportGenerator`` with default settings.

    Args:
        search_system: Optional existing AdvancedSearchSystem to reuse.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator
33class IntegratedReportGenerator:
34 def __init__(
35 self,
36 searches_per_section: int = 2,
37 search_system=None,
38 llm: BaseChatModel | None = None,
39 settings_snapshot: Optional[Dict] = None,
40 ):
41 """
42 Args:
43 searches_per_section: Number of searches to perform for each
44 section in the report.
45 search_system: Custom search system to use, otherwise just uses
46 the default.
47 llm: Custom LLM to use. Required if search_system is not provided.
48 settings_snapshot: Optional settings snapshot for configurable values.
50 """
51 # If search_system is provided, use its LLM; otherwise use the provided LLM
52 if search_system:
53 self.search_system = search_system
54 self.model = llm or search_system.model
55 elif llm:
56 self.model = llm
57 self.search_system = AdvancedSearchSystem(llm=self.model)
58 else:
59 # Fallback for backwards compatibility - will only work with auth
60 self.model = get_llm()
61 self.search_system = AdvancedSearchSystem(llm=self.model)
63 self.searches_per_section = (
64 searches_per_section # Control search depth per section
65 )
67 # Load context settings from snapshot or use defaults
68 self.max_context_sections = get_setting_from_snapshot(
69 "report.max_context_sections",
70 default=DEFAULT_MAX_CONTEXT_SECTIONS,
71 settings_snapshot=settings_snapshot,
72 )
73 self.max_context_chars = get_setting_from_snapshot(
74 "report.max_context_chars",
75 default=DEFAULT_MAX_CONTEXT_CHARS,
76 settings_snapshot=settings_snapshot,
77 )
79 def generate_report(self, initial_findings: Dict, query: str) -> Dict:
80 """Generate a complete research report with section-specific research."""
82 # Step 1: Determine structure
83 structure = self._determine_report_structure(initial_findings, query)
85 # Step 2: Research and generate content for each section in one step
86 sections = self._research_and_generate_sections(
87 initial_findings, structure, query
88 )
90 # Step 3: Format final report
91 report = self._format_final_report(sections, structure, query)
93 return report
95 def _determine_report_structure(
96 self, findings: Dict, query: str
97 ) -> List[Dict]:
98 """Analyze content and determine optimal report structure."""
99 combined_content = findings["current_knowledge"]
100 prompt = f"""
101 Analyze this research content about: {query}
103 Content Summary:
104 {combined_content[:1000]}... [truncated]
106 Determine the most appropriate report structure by:
107 1. Analyzing the type of content (technical, business, academic, etc.)
108 2. Identifying main themes and logical groupings
109 3. Considering the depth and breadth of the research
111 Return a table of contents structure in this exact format:
112 STRUCTURE
113 1. [Section Name]
114 - [Subsection] | [purpose]
115 2. [Section Name]
116 - [Subsection] | [purpose]
117 ...
118 END_STRUCTURE
120 Make the structure specific to the content, not generic.
121 Each subsection must include its purpose after the | symbol.
122 DO NOT include sections about sources, citations, references, or methodology.
123 """
125 response = search_utilities.remove_think_tags(
126 self.model.invoke(prompt).content
127 )
129 # Parse the structure
130 structure = []
131 current_section = None
133 for line in response.split("\n"):
134 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]:
135 continue
137 if line.strip().startswith(tuple("123456789")):
138 # Main section
139 section_name = line.split(".")[1].strip()
140 current_section = {"name": section_name, "subsections": []}
141 structure.append(current_section)
142 elif line.strip().startswith("-") and current_section:
143 # Subsection with or without purpose
144 parts = line.strip("- ").split(
145 "|", 1
146 ) # Only split on first pipe
147 if len(parts) == 2:
148 current_section["subsections"].append(
149 {"name": parts[0].strip(), "purpose": parts[1].strip()}
150 )
151 elif len(parts) == 1 and parts[0].strip(): 151 ↛ 133line 151 didn't jump to line 133 because the condition on line 151 was always true
152 # Subsection without purpose - add default
153 current_section["subsections"].append(
154 {
155 "name": parts[0].strip(),
156 "purpose": f"Provide detailed information about {parts[0].strip()}",
157 }
158 )
160 # Check if the last section is source-related and remove it
161 if structure:
162 last_section = structure[-1]
163 section_name_lower = last_section["name"].lower()
164 source_keywords = [
165 "source",
166 "citation",
167 "reference",
168 "bibliography",
169 ]
171 # Only check the last section for source-related content
172 if any(
173 keyword in section_name_lower for keyword in source_keywords
174 ):
175 logger.info(
176 f"Removed source-related last section: {last_section['name']}"
177 )
178 structure = structure[:-1]
180 return structure
182 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str:
183 """Truncate text at a sentence boundary to preserve readability.
185 Attempts to cut at the last sentence-ending punctuation (.!?) before
186 the limit. If no suitable boundary is found within 80% of the limit,
187 falls back to hard truncation.
189 Args:
190 text: The text to truncate
191 max_chars: Maximum characters allowed
193 Returns:
194 Truncated text with [...truncated] marker if truncation occurred
195 """
196 if len(text) <= max_chars:
197 return text
199 truncated = text[:max_chars]
201 # Look for sentence boundaries (. ! ?) followed by space or newline
202 # Search backwards from the end for the last complete sentence
203 last_sentence_end = -1
204 for i in range(len(truncated) - 1, -1, -1):
205 if truncated[i] in ".!?" and (
206 i + 1 >= len(truncated) or truncated[i + 1] in " \n"
207 ):
208 last_sentence_end = i + 1
209 break
211 # Only use sentence boundary if it preserves at least 80% of content
212 min_acceptable = int(max_chars * 0.8)
213 if last_sentence_end > min_acceptable:
214 return truncated[:last_sentence_end] + "\n[...truncated]"
216 # Fallback to hard truncation
217 return truncated + "\n[...truncated]"
219 def _build_previous_context(self, accumulated_findings: List[str]) -> str:
220 """Build context block from previously generated sections.
222 Creates a formatted context block containing content from the last
223 N sections (defined by self.max_context_sections) with explicit instructions
224 not to repeat this content. Context is truncated if it exceeds
225 self.max_context_chars to stay safe for smaller local models.
227 Args:
228 accumulated_findings: List of previously generated section content,
229 each formatted as "[Section > Subsection]\\n{content}"
231 Returns:
232 Formatted context block with delimiters, or empty string if no
233 previous findings exist
234 """
235 if not accumulated_findings:
236 return ""
238 recent_findings = accumulated_findings[-self.max_context_sections :]
239 previous_context = "\n\n---\n\n".join(recent_findings)
241 # Truncate at sentence boundary if too long
242 if len(previous_context) > self.max_context_chars:
243 previous_context = self._truncate_at_sentence_boundary(
244 previous_context, self.max_context_chars
245 )
247 return (
248 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n"
249 f"{previous_context}\n"
250 f"=== END OF PREVIOUS CONTENT ===\n\n"
251 f"CRITICAL: The above content has already been written. Do NOT repeat "
252 f"these points, examples, or explanations. Focus on NEW information "
253 f"not covered above.\n"
254 )
256 def _research_and_generate_sections(
257 self,
258 initial_findings: Dict,
259 structure: List[Dict],
260 query: str,
261 ) -> Dict[str, str]:
262 """Research and generate content for each section in one step.
264 This method processes sections sequentially, accumulating generated
265 content as it goes. For each new section/subsection, it passes context
266 from the last few previously generated sections to help the LLM avoid
267 repetition.
269 The context accumulation mechanism:
270 - Tracks all generated content in accumulated_findings list
271 - Before generating each section, builds context from recent findings
272 - Uses self.max_context_sections (configurable, default: 3) to limit context size
273 - Truncates context to self.max_context_chars (configurable, default: 4000) for safety
274 - Includes explicit "DO NOT REPEAT" instructions with actual content
276 Args:
277 initial_findings: Results from initial research phase, may contain
278 questions_by_iteration to preserve search continuity
279 structure: List of section definitions, each with name and subsections
280 query: Original user query for context
282 Returns:
283 Dict mapping section names to their generated markdown content
284 """
285 sections = {}
287 # Accumulate content from previous sections to avoid repetition
288 accumulated_findings: List[str] = []
290 # Preserve questions from initial research to avoid repetition
291 # This follows the same pattern as citation tracking (all_links_of_system)
292 existing_questions = initial_findings.get("questions_by_iteration", {})
293 if existing_questions:
294 # Set questions on both search system and its strategy
295 if hasattr(self.search_system, "questions_by_iteration"): 295 ↛ 301line 295 didn't jump to line 301 because the condition on line 295 was always true
296 self.search_system.questions_by_iteration = (
297 existing_questions.copy()
298 )
300 # More importantly, set it on the strategy which actually uses it
301 if hasattr(self.search_system, "strategy") and hasattr( 301 ↛ 311line 301 didn't jump to line 311 because the condition on line 301 was always true
302 self.search_system.strategy, "questions_by_iteration"
303 ):
304 self.search_system.strategy.questions_by_iteration = (
305 existing_questions.copy()
306 )
307 logger.info(
308 f"Initialized strategy with {len(existing_questions)} iterations of previous questions"
309 )
311 for section in structure:
312 logger.info(f"Processing section: {section['name']}")
313 section_content = []
315 section_content.append(f"# {section['name']}\n")
317 # If section has no subsections, create one from the section itself
318 if not section["subsections"]:
319 # Parse section name for purpose
320 if "|" in section["name"]:
321 parts = section["name"].split("|", 1)
322 section["subsections"] = [
323 {"name": parts[0].strip(), "purpose": parts[1].strip()}
324 ]
325 else:
326 # No purpose provided - use section name as subsection
327 section["subsections"] = [
328 {
329 "name": section["name"],
330 "purpose": f"Provide comprehensive content for {section['name']}",
331 }
332 ]
334 # Process each subsection by directly researching it
335 for subsection in section["subsections"]:
336 # Only add subsection header if there are multiple subsections
337 if len(section["subsections"]) > 1:
338 section_content.append(f"## {subsection['name']}\n")
339 section_content.append(f"_{subsection['purpose']}_\n\n")
341 # Get other subsections in this section for context
342 other_subsections = [
343 f"- {s['name']}: {s['purpose']}"
344 for s in section["subsections"]
345 if s["name"] != subsection["name"]
346 ]
347 other_subsections_text = (
348 "\n".join(other_subsections)
349 if other_subsections
350 else "None"
351 )
353 # Get all other sections for broader context
354 other_sections = [
355 f"- {s['name']}"
356 for s in structure
357 if s["name"] != section["name"]
358 ]
359 other_sections_text = (
360 "\n".join(other_sections) if other_sections else "None"
361 )
363 # Check if this is actually a section-level content (only one subsection, likely auto-created)
364 is_section_level = len(section["subsections"]) == 1
366 # Build context from previously generated sections to avoid repetition
367 previous_context_section = self._build_previous_context(
368 accumulated_findings
369 )
371 # Generate appropriate search query
372 if is_section_level:
373 # Section-level prompt - more comprehensive
374 subsection_query = (
375 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. "
376 f"Section purpose: {subsection['purpose']} "
377 f"\n"
378 f"Other sections in the report:\n{other_sections_text}\n"
379 f"{previous_context_section}"
380 f"This is a standalone section requiring comprehensive coverage of its topic. "
381 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. "
382 f"Include unique insights, specific examples, and concrete data. "
383 f"Use tables to organize information where applicable. "
384 f"For conclusion sections: synthesize key findings and provide forward-looking insights. "
385 f"Build upon the research findings from earlier sections to create a cohesive narrative."
386 )
387 else:
388 # Subsection-level prompt - more focused
389 subsection_query = (
390 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. "
391 f"This subsection's purpose: {subsection['purpose']} "
392 f"Part of section: '{section['name']}' "
393 f"\n"
394 f"Other sections in the report:\n{other_sections_text}\n"
395 f"\n"
396 f"Other subsections in this section will cover:\n{other_subsections_text}\n"
397 f"{previous_context_section}"
398 f"Focus ONLY on information specific to your subsection's purpose. "
399 f"Include unique details, specific examples, and concrete data. "
400 f"Use tables to organize information where applicable. "
401 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. "
402 f"Previous research exists - find specific angles for this subsection."
403 )
405 logger.info(
406 f"Researching subsection: {subsection['name']} with query: {subsection_query}"
407 )
409 # Configure search system for focused search
410 original_max_iterations = self.search_system.max_iterations
411 self.search_system.max_iterations = 1 # Keep search focused
413 # Perform search for this subsection
414 subsection_results = self.search_system.analyze_topic(
415 subsection_query
416 )
418 # Restore original iterations setting
419 self.search_system.max_iterations = original_max_iterations
421 # Add the researched content for this subsection
422 if subsection_results.get("current_knowledge"):
423 generated_content = subsection_results["current_knowledge"]
424 section_content.append(generated_content)
425 # Accumulate for context in subsequent sections
426 accumulated_findings.append(
427 f"[{section['name']} > {subsection['name']}]\n{generated_content}"
428 )
429 else:
430 section_content.append(
431 "*Limited information was found for this subsection.*\n"
432 )
434 section_content.append("\n\n")
436 # Combine all content for this section
437 sections[section["name"]] = "\n".join(section_content)
439 return sections
441 def _generate_sections(
442 self,
443 initial_findings: Dict,
444 _section_research: Dict[str, List[Dict]],
445 structure: List[Dict],
446 query: str,
447 ) -> Dict[str, str]:
448 """
449 This method is kept for compatibility but no longer used.
450 The functionality has been moved to _research_and_generate_sections.
451 """
452 return {}
454 def _format_final_report(
455 self,
456 sections: Dict[str, str],
457 structure: List[Dict],
458 query: str,
459 ) -> Dict:
460 """Format the final report with table of contents and sections."""
461 # Generate TOC
462 toc = ["# Table of Contents\n"]
463 for i, section in enumerate(structure, 1):
464 toc.append(f"{i}. **{section['name']}**")
465 for j, subsection in enumerate(section["subsections"], 1):
466 toc.append(
467 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_"
468 )
470 # Combine TOC and sections
471 report_parts = ["\n".join(toc), ""]
473 # Add a summary of the research
474 report_parts.append("# Research Summary")
475 report_parts.append(
476 "This report was researched using an advanced search system."
477 )
478 report_parts.append(
479 "Research included targeted searches for each section and subsection."
480 )
481 report_parts.append("\n---\n")
483 # Add each section's content
484 for section in structure:
485 if section["name"] in sections: 485 ↛ 484line 485 didn't jump to line 484 because the condition on line 485 was always true
486 report_parts.append(sections[section["name"]])
487 report_parts.append("")
489 # Format links from search system
490 # Get utilities module dynamically to avoid circular imports
491 utilities = importlib.import_module("local_deep_research.utilities")
492 formatted_all_links = (
493 utilities.search_utilities.format_links_to_markdown(
494 all_links=self.search_system.all_links_of_system
495 )
496 )
498 # Create final report with all parts
499 final_report_content = "\n\n".join(report_parts)
500 final_report_content = (
501 final_report_content + "\n\n## Sources\n\n" + formatted_all_links
502 )
504 # Create metadata dictionary
505 metadata = {
506 "generated_at": datetime.now(UTC).isoformat(),
507 "initial_sources": len(self.search_system.all_links_of_system),
508 "sections_researched": len(structure),
509 "searches_per_section": self.searches_per_section,
510 "query": query,
511 }
513 # Return both content and metadata
514 return {"content": final_report_content, "metadata": metadata}
516 def _generate_error_report(self, query: str, error_msg: str) -> str:
517 error_report = (
518 f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"
519 )
520 return error_report