Coverage for src / local_deep_research / report_generator.py: 98%
176 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import importlib
2from typing import Any, Dict, List, Optional
3from datetime import datetime, UTC
5from langchain_core.language_models import BaseChatModel
6from loguru import logger
8# Fix circular import by importing directly from source modules
9from .config.llm_config import get_llm
10from .config.thread_settings import get_setting_from_snapshot
11from .search_system import AdvancedSearchSystem
12from .utilities import search_utilities
# Fallback defaults for context accumulation, used when no settings
# snapshot provides a value. Keeping them module-level avoids repetition.
# How many previously generated sections to carry forward as context.
DEFAULT_MAX_CONTEXT_SECTIONS = 3
# Upper bound on context characters (kept small so the context stays
# safe for smaller local models).
DEFAULT_MAX_CONTEXT_CHARS = 4000
def get_report_generator(search_system=None):
    """Build and return a report generator configured with defaults.

    Args:
        search_system: Optional existing AdvancedSearchSystem to use
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator
33class IntegratedReportGenerator:
34 def __init__(
35 self,
36 searches_per_section: int = 2,
37 search_system=None,
38 llm: BaseChatModel | None = None,
39 settings_snapshot: Optional[Dict] = None,
40 ):
41 """
42 Args:
43 searches_per_section: Number of searches to perform for each
44 section in the report.
45 search_system: Custom search system to use, otherwise just uses
46 the default.
47 llm: Custom LLM to use. Required if search_system is not provided.
48 settings_snapshot: Optional settings snapshot for configurable values.
50 """
51 # If search_system is provided, use its LLM; otherwise use the provided LLM
52 self._owns_llm = False
53 if search_system:
54 self.search_system = search_system
55 self.model = llm or search_system.model
56 elif llm:
57 self.model = llm
58 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg]
59 else:
60 # Fallback for backwards compatibility - will only work with auth
61 self._owns_llm = True
62 self.model = get_llm()
63 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg]
65 self.searches_per_section = (
66 searches_per_section # Control search depth per section
67 )
69 # Load context settings from snapshot or use defaults
70 self.max_context_sections = get_setting_from_snapshot(
71 "report.max_context_sections",
72 default=DEFAULT_MAX_CONTEXT_SECTIONS,
73 settings_snapshot=settings_snapshot,
74 )
75 self.max_context_chars = get_setting_from_snapshot(
76 "report.max_context_chars",
77 default=DEFAULT_MAX_CONTEXT_CHARS,
78 settings_snapshot=settings_snapshot,
79 )
81 def close(self) -> None:
82 """Close the LLM client if this instance created it."""
83 from .utilities.resource_utils import safe_close
85 if self._owns_llm:
86 safe_close(self.model, "report generator LLM")
88 def generate_report(
89 self,
90 initial_findings: Dict,
91 query: str,
92 progress_callback=None,
93 ) -> Dict:
94 """Generate a complete research report with section-specific research.
96 Args:
97 initial_findings: Results from initial research phase.
98 query: Original user query.
99 progress_callback: Optional callable(message, progress_percent, metadata)
100 for reporting progress (0-100%) and checking cancellation.
101 """
103 # Step 1: Determine structure
104 if progress_callback:
105 progress_callback(
106 "Determining report structure",
107 0,
108 {"phase": "report_structure"},
109 )
110 structure = self._determine_report_structure(initial_findings, query)
112 # Step 2: Research and generate content for each section in one step
113 sections = self._research_and_generate_sections(
114 initial_findings,
115 structure,
116 query,
117 progress_callback=progress_callback,
118 )
120 # Step 3: Format final report
121 if progress_callback:
122 progress_callback(
123 "Formatting final report",
124 90,
125 {"phase": "report_formatting"},
126 )
127 report = self._format_final_report(sections, structure, query)
129 if progress_callback:
130 progress_callback(
131 "Report complete", 100, {"phase": "report_complete"}
132 )
134 return report
136 def _determine_report_structure(
137 self, findings: Dict, query: str
138 ) -> List[Dict]:
139 """Analyze content and determine optimal report structure."""
140 combined_content = findings["current_knowledge"]
141 prompt = f"""
142 Analyze this research content about: {query}
144 Content Summary:
145 {combined_content[:1000]}... [truncated]
147 Determine the most appropriate report structure by:
148 1. Analyzing the type of content (technical, business, academic, etc.)
149 2. Identifying main themes and logical groupings
150 3. Considering the depth and breadth of the research
152 Return a table of contents structure in this exact format:
153 STRUCTURE
154 1. [Section Name]
155 - [Subsection] | [purpose]
156 2. [Section Name]
157 - [Subsection] | [purpose]
158 ...
159 END_STRUCTURE
161 Make the structure specific to the content, not generic.
162 Each subsection must include its purpose after the | symbol.
163 DO NOT include sections about sources, citations, references, or methodology.
164 """
166 response = search_utilities.remove_think_tags(
167 str(self.model.invoke(prompt).content)
168 )
170 # Parse the structure
171 structure: List[Dict[str, Any]] = []
172 current_section: Optional[Dict[str, Any]] = None
174 for line in response.split("\n"):
175 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]:
176 continue
178 if line.strip().startswith(tuple("123456789")):
179 # Main section
180 section_name = line.split(".")[1].strip()
181 current_section = {"name": section_name, "subsections": []}
182 structure.append(current_section)
183 elif line.strip().startswith("-") and current_section:
184 # Subsection with or without purpose
185 parts = line.strip("- ").split(
186 "|", 1
187 ) # Only split on first pipe
188 if len(parts) == 2:
189 current_section["subsections"].append(
190 {"name": parts[0].strip(), "purpose": parts[1].strip()}
191 )
192 elif len(parts) == 1 and parts[0].strip(): 192 ↛ 174line 192 didn't jump to line 174 because the condition on line 192 was always true
193 # Subsection without purpose - add default
194 current_section["subsections"].append(
195 {
196 "name": parts[0].strip(),
197 "purpose": f"Provide detailed information about {parts[0].strip()}",
198 }
199 )
201 # Check if the last section is source-related and remove it
202 if structure:
203 last_section = structure[-1]
204 section_name_lower = last_section["name"].lower()
205 source_keywords = [
206 "source",
207 "citation",
208 "reference",
209 "bibliography",
210 ]
212 # Only check the last section for source-related content
213 if any(
214 keyword in section_name_lower for keyword in source_keywords
215 ):
216 logger.info(
217 f"Removed source-related last section: {last_section['name']}"
218 )
219 structure = structure[:-1]
221 return structure
223 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str:
224 """Truncate text at a sentence boundary to preserve readability.
226 Attempts to cut at the last sentence-ending punctuation (.!?) before
227 the limit. If no suitable boundary is found within 80% of the limit,
228 falls back to hard truncation.
230 Args:
231 text: The text to truncate
232 max_chars: Maximum characters allowed
234 Returns:
235 Truncated text with [...truncated] marker if truncation occurred
236 """
237 if len(text) <= max_chars:
238 return text
240 truncated = text[:max_chars]
242 # Look for sentence boundaries (. ! ?) followed by space or newline
243 # Search backwards from the end for the last complete sentence
244 last_sentence_end = -1
245 for i in range(len(truncated) - 1, -1, -1):
246 if truncated[i] in ".!?" and (
247 i + 1 >= len(truncated) or truncated[i + 1] in " \n"
248 ):
249 last_sentence_end = i + 1
250 break
252 # Only use sentence boundary if it preserves at least 80% of content
253 min_acceptable = int(max_chars * 0.8)
254 if last_sentence_end > min_acceptable:
255 return truncated[:last_sentence_end] + "\n[...truncated]"
257 # Fallback to hard truncation
258 return truncated + "\n[...truncated]"
260 def _build_previous_context(self, accumulated_findings: List[str]) -> str:
261 """Build context block from previously generated sections.
263 Creates a formatted context block containing content from the last
264 N sections (defined by self.max_context_sections) with explicit instructions
265 not to repeat this content. Context is truncated if it exceeds
266 self.max_context_chars to stay safe for smaller local models.
268 Args:
269 accumulated_findings: List of previously generated section content,
270 each formatted as "[Section > Subsection]\\n{content}"
272 Returns:
273 Formatted context block with delimiters, or empty string if no
274 previous findings exist
275 """
276 if not accumulated_findings:
277 return ""
279 recent_findings = accumulated_findings[-self.max_context_sections :]
280 previous_context = "\n\n---\n\n".join(recent_findings)
282 # Truncate at sentence boundary if too long
283 if len(previous_context) > self.max_context_chars:
284 previous_context = self._truncate_at_sentence_boundary(
285 previous_context, self.max_context_chars
286 )
288 return (
289 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n"
290 f"{previous_context}\n"
291 f"=== END OF PREVIOUS CONTENT ===\n\n"
292 f"CRITICAL: The above content has already been written. Do NOT repeat "
293 f"these points, examples, or explanations. Focus on NEW information "
294 f"not covered above.\n"
295 )
297 def _research_and_generate_sections(
298 self,
299 initial_findings: Dict,
300 structure: List[Dict],
301 query: str,
302 progress_callback=None,
303 ) -> Dict[str, str]:
304 """Research and generate content for each section in one step.
306 This method processes sections sequentially, accumulating generated
307 content as it goes. For each new section/subsection, it passes context
308 from the last few previously generated sections to help the LLM avoid
309 repetition.
311 The context accumulation mechanism:
312 - Tracks all generated content in accumulated_findings list
313 - Before generating each section, builds context from recent findings
314 - Uses self.max_context_sections (configurable, default: 3) to limit context size
315 - Truncates context to self.max_context_chars (configurable, default: 4000) for safety
316 - Includes explicit "DO NOT REPEAT" instructions with actual content
318 Args:
319 initial_findings: Results from initial research phase, may contain
320 questions_by_iteration to preserve search continuity
321 structure: List of section definitions, each with name and subsections
322 query: Original user query for context
324 Returns:
325 Dict mapping section names to their generated markdown content
326 """
327 sections = {}
329 # Accumulate content from previous sections to avoid repetition
330 accumulated_findings: List[str] = []
332 # Count total subsections for progress tracking
333 total_subsections = sum(
334 max(len(section.get("subsections", [])), 1) for section in structure
335 )
336 completed_subsections = 0
338 # Preserve questions from initial research to avoid repetition
339 # This follows the same pattern as citation tracking (all_links_of_system)
340 existing_questions = initial_findings.get("questions_by_iteration", {})
341 if existing_questions:
342 # Set questions on both search system and its strategy
343 if hasattr(self.search_system, "questions_by_iteration"): 343 ↛ 349line 343 didn't jump to line 349 because the condition on line 343 was always true
344 self.search_system.questions_by_iteration = (
345 existing_questions.copy()
346 )
348 # More importantly, set it on the strategy which actually uses it
349 if hasattr(self.search_system, "strategy") and hasattr( 349 ↛ 359line 349 didn't jump to line 359 because the condition on line 349 was always true
350 self.search_system.strategy, "questions_by_iteration"
351 ):
352 self.search_system.strategy.questions_by_iteration = (
353 existing_questions.copy()
354 )
355 logger.info(
356 f"Initialized strategy with {len(existing_questions)} iterations of previous questions"
357 )
359 for section in structure:
360 logger.info(f"Processing section: {section['name']}")
361 section_content = []
363 section_content.append(f"# {section['name']}\n")
365 # If section has no subsections, create one from the section itself
366 if not section["subsections"]:
367 # Parse section name for purpose
368 if "|" in section["name"]:
369 parts = section["name"].split("|", 1)
370 section["subsections"] = [
371 {"name": parts[0].strip(), "purpose": parts[1].strip()}
372 ]
373 else:
374 # No purpose provided - use section name as subsection
375 section["subsections"] = [
376 {
377 "name": section["name"],
378 "purpose": f"Provide comprehensive content for {section['name']}",
379 }
380 ]
382 # Process each subsection by directly researching it
383 for subsection in section["subsections"]:
384 # Only add subsection header if there are multiple subsections
385 if len(section["subsections"]) > 1:
386 section_content.append(f"## {subsection['name']}\n")
387 section_content.append(f"_{subsection['purpose']}_\n\n")
389 # Get other subsections in this section for context
390 other_subsections = [
391 f"- {s['name']}: {s['purpose']}"
392 for s in section["subsections"]
393 if s["name"] != subsection["name"]
394 ]
395 other_subsections_text = (
396 "\n".join(other_subsections)
397 if other_subsections
398 else "None"
399 )
401 # Get all other sections for broader context
402 other_sections = [
403 f"- {s['name']}"
404 for s in structure
405 if s["name"] != section["name"]
406 ]
407 other_sections_text = (
408 "\n".join(other_sections) if other_sections else "None"
409 )
411 # Check if this is actually a section-level content (only one subsection, likely auto-created)
412 is_section_level = len(section["subsections"]) == 1
414 # Build context from previously generated sections to avoid repetition
415 previous_context_section = self._build_previous_context(
416 accumulated_findings
417 )
419 # Generate appropriate search query
420 if is_section_level:
421 # Section-level prompt - more comprehensive
422 subsection_query = (
423 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. "
424 f"Section purpose: {subsection['purpose']} "
425 f"\n"
426 f"Other sections in the report:\n{other_sections_text}\n"
427 f"{previous_context_section}"
428 f"This is a standalone section requiring comprehensive coverage of its topic. "
429 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. "
430 f"Include unique insights, specific examples, and concrete data. "
431 f"Use tables to organize information where applicable. "
432 f"For conclusion sections: synthesize key findings and provide forward-looking insights. "
433 f"Build upon the research findings from earlier sections to create a cohesive narrative."
434 )
435 else:
436 # Subsection-level prompt - more focused
437 subsection_query = (
438 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. "
439 f"This subsection's purpose: {subsection['purpose']} "
440 f"Part of section: '{section['name']}' "
441 f"\n"
442 f"Other sections in the report:\n{other_sections_text}\n"
443 f"\n"
444 f"Other subsections in this section will cover:\n{other_subsections_text}\n"
445 f"{previous_context_section}"
446 f"Focus ONLY on information specific to your subsection's purpose. "
447 f"Include unique details, specific examples, and concrete data. "
448 f"Use tables to organize information where applicable. "
449 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. "
450 f"Previous research exists - find specific angles for this subsection."
451 )
453 logger.info(
454 f"Researching subsection: {subsection['name']} with query: {subsection_query}"
455 )
457 # Report progress and check for cancellation
458 if progress_callback:
459 pct = int(
460 10
461 + (completed_subsections / max(total_subsections, 1))
462 * 80
463 )
464 progress_callback(
465 f"Researching: {section['name']} > {subsection['name']}",
466 pct,
467 {
468 "phase": "report_section_research",
469 "subsection": subsection["name"],
470 },
471 )
473 # Fix iteration override: modify strategy's settings_snapshot
474 # which is read dynamically via get_setting()
475 strategy = self.search_system.strategy
476 original_iterations = strategy.settings_snapshot.get(
477 "search.iterations"
478 )
479 had_iterations_key = (
480 "search.iterations" in strategy.settings_snapshot
481 )
482 strategy.settings_snapshot["search.iterations"] = 1
483 # Belt-and-suspenders: also override max_iterations for
484 # strategies that cache it at __init__ time
485 original_max_iter = getattr(strategy, "max_iterations", None)
486 strategy.max_iterations = 1
488 try:
489 # Perform search for this subsection
490 subsection_results = self.search_system.analyze_topic(
491 subsection_query
492 )
493 finally:
494 # Restore original iteration settings
495 if had_iterations_key:
496 strategy.settings_snapshot["search.iterations"] = (
497 original_iterations
498 )
499 else:
500 strategy.settings_snapshot.pop(
501 "search.iterations", None
502 )
503 if original_max_iter is not None: 503 ↛ 506line 503 didn't jump to line 506 because the condition on line 503 was always true
504 strategy.max_iterations = original_max_iter
506 completed_subsections += 1
508 # Add the researched content for this subsection
509 if subsection_results.get("current_knowledge"):
510 generated_content = subsection_results["current_knowledge"]
511 section_content.append(generated_content)
512 # Accumulate for context in subsequent sections
513 accumulated_findings.append(
514 f"[{section['name']} > {subsection['name']}]\n{generated_content}"
515 )
516 else:
517 section_content.append(
518 "*Limited information was found for this subsection.*\n"
519 )
521 section_content.append("\n\n")
523 # Combine all content for this section
524 sections[section["name"]] = "\n".join(section_content)
526 return sections
528 def _generate_sections(
529 self,
530 initial_findings: Dict,
531 _section_research: Dict[str, List[Dict]],
532 structure: List[Dict],
533 query: str,
534 ) -> Dict[str, str]:
535 """
536 This method is kept for compatibility but no longer used.
537 The functionality has been moved to _research_and_generate_sections.
538 """
539 return {}
541 def _format_final_report(
542 self,
543 sections: Dict[str, str],
544 structure: List[Dict],
545 query: str,
546 ) -> Dict:
547 """Format the final report with table of contents and sections."""
548 # Generate TOC
549 toc = ["# Table of Contents\n"]
550 for i, section in enumerate(structure, 1):
551 toc.append(f"{i}. **{section['name']}**")
552 for j, subsection in enumerate(section["subsections"], 1):
553 toc.append(
554 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_"
555 )
557 # Combine TOC and sections
558 report_parts = ["\n".join(toc), ""]
560 # Add a summary of the research
561 report_parts.append("# Research Summary")
562 report_parts.append(
563 "This report was researched using an advanced search system."
564 )
565 report_parts.append(
566 "Research included targeted searches for each section and subsection."
567 )
568 report_parts.append("\n---\n")
570 # Add each section's content
571 for section in structure:
572 if section["name"] in sections:
573 report_parts.append(sections[section["name"]])
574 report_parts.append("")
576 # Format links from search system
577 # Get utilities module dynamically to avoid circular imports
578 utilities = importlib.import_module("local_deep_research.utilities")
579 formatted_all_links = (
580 utilities.search_utilities.format_links_to_markdown(
581 all_links=self.search_system.all_links_of_system
582 )
583 )
585 # Create final report with all parts
586 final_report_content = "\n\n".join(report_parts)
587 final_report_content = (
588 final_report_content + "\n\n## Sources\n\n" + formatted_all_links
589 )
591 # Create metadata dictionary
592 metadata = {
593 "generated_at": datetime.now(UTC).isoformat(),
594 "initial_sources": len(self.search_system.all_links_of_system),
595 "sections_researched": len(structure),
596 "searches_per_section": self.searches_per_section,
597 "query": query,
598 }
600 # Return both content and metadata
601 return {"content": final_report_content, "metadata": metadata}
603 def _generate_error_report(self, query: str, error_msg: str) -> str:
604 return f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"