Coverage for src/local_deep_research/report_generator.py: 98%
180 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import importlib
2from typing import Any, Dict, List, Optional
3from datetime import datetime, UTC
5from langchain_core.language_models import BaseChatModel
6from loguru import logger
8# Fix circular import by importing directly from source modules
9from .config.llm_config import get_llm
10from .config.thread_settings import get_setting_from_snapshot
11from .search_system import AdvancedSearchSystem
12from .utilities.json_utils import get_llm_response_text
# Fallback limits for the context that is carried from already-written
# sections into later prompts (to curb repetition). They only apply when
# the settings snapshot does not supply a value.

# How many previously generated sections are replayed as context.
DEFAULT_MAX_CONTEXT_SECTIONS = 3

# Character budget for that context (kept safe for smaller local models).
DEFAULT_MAX_CONTEXT_CHARS = 4000
def get_report_generator(search_system=None):
    """Build an ``IntegratedReportGenerator`` that uses its default settings.

    Args:
        search_system: Existing AdvancedSearchSystem to reuse, if any. It is
            forwarded unchanged to the generator's constructor.

    Returns:
        The newly constructed ``IntegratedReportGenerator``.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator
33class IntegratedReportGenerator:
34 def __init__(
35 self,
36 searches_per_section: int = 2,
37 search_system=None,
38 llm: BaseChatModel | None = None,
39 settings_snapshot: Optional[Dict] = None,
40 ):
41 """
42 Args:
43 searches_per_section: Number of searches to perform for each
44 section in the report.
45 search_system: Custom search system to use, otherwise just uses
46 the default.
47 llm: Custom LLM to use. Required if search_system is not provided.
48 settings_snapshot: Optional settings snapshot for configurable values.
50 """
51 # If search_system is provided, use its LLM; otherwise use the provided LLM
52 self._owns_llm = False
53 if search_system:
54 self.search_system = search_system
55 self.model = llm or search_system.model
56 elif llm:
57 self.model = llm
58 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg]
59 else:
60 # Fallback for backwards compatibility - will only work with auth
61 self._owns_llm = True
62 self.model = get_llm()
63 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg]
65 self.searches_per_section = (
66 searches_per_section # Control search depth per section
67 )
69 # Load context settings from snapshot or use defaults
70 self.max_context_sections = get_setting_from_snapshot(
71 "report.max_context_sections",
72 default=DEFAULT_MAX_CONTEXT_SECTIONS,
73 settings_snapshot=settings_snapshot,
74 )
75 self.max_context_chars = get_setting_from_snapshot(
76 "report.max_context_chars",
77 default=DEFAULT_MAX_CONTEXT_CHARS,
78 settings_snapshot=settings_snapshot,
79 )
81 def close(self) -> None:
82 """Close the LLM client if this instance created it."""
83 from .utilities.resource_utils import safe_close
85 if self._owns_llm:
86 safe_close(self.model, "report generator LLM")
88 def generate_report(
89 self,
90 initial_findings: Dict,
91 query: str,
92 progress_callback=None,
93 ) -> Dict:
94 """Generate a complete research report with section-specific research.
96 Args:
97 initial_findings: Results from initial research phase.
98 query: Original user query.
99 progress_callback: Optional callable(message, progress_percent, metadata)
100 for reporting progress (0-100%) and checking cancellation.
101 """
103 # Step 1: Determine structure
104 if progress_callback:
105 progress_callback(
106 "Determining report structure",
107 0,
108 {"phase": "report_structure"},
109 )
110 structure = self._determine_report_structure(initial_findings, query)
112 # Step 2: Research and generate content for each section in one step
113 sections = self._research_and_generate_sections(
114 initial_findings,
115 structure,
116 query,
117 progress_callback=progress_callback,
118 )
120 # Step 3: Format final report
121 if progress_callback:
122 progress_callback(
123 "Formatting final report",
124 90,
125 {"phase": "report_formatting"},
126 )
127 report = self._format_final_report(sections, structure, query)
129 if progress_callback:
130 progress_callback(
131 "Report complete", 100, {"phase": "report_complete"}
132 )
134 return report
136 def _determine_report_structure(
137 self, findings: Dict, query: str
138 ) -> List[Dict]:
139 """Analyze content and determine optimal report structure."""
140 combined_content = findings["current_knowledge"]
141 prompt = f"""
142 Analyze this research content about: {query}
144 Content Summary:
145 {combined_content[:1000]}... [truncated]
147 Determine the most appropriate report structure by:
148 1. Analyzing the type of content (technical, business, academic, etc.)
149 2. Identifying main themes and logical groupings
150 3. Considering the depth and breadth of the research
152 Return a table of contents structure in this exact format:
153 STRUCTURE
154 1. [Section Name]
155 - [Subsection] | [purpose]
156 2. [Section Name]
157 - [Subsection] | [purpose]
158 ...
159 END_STRUCTURE
161 Make the structure specific to the content, not generic.
162 Each subsection must include its purpose after the | symbol.
163 DO NOT include sections about sources, citations, references, or methodology.
164 """
166 response = get_llm_response_text(self.model.invoke(prompt))
168 # Parse the structure
169 structure: List[Dict[str, Any]] = []
170 current_section: Optional[Dict[str, Any]] = None
172 for line in response.split("\n"):
173 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]:
174 continue
176 if line.strip().startswith(tuple("123456789")):
177 # Main section — require a dot-delimited name (e.g. "1. Intro").
178 parts = line.split(".", 1)
179 if len(parts) < 2 or not parts[1].strip():
180 continue
181 section_name = parts[1].strip()
182 current_section = {"name": section_name, "subsections": []}
183 structure.append(current_section)
184 elif line.strip().startswith("-") and current_section:
185 # Subsection with or without purpose
186 parts = line.strip("- ").split(
187 "|", 1
188 ) # Only split on first pipe
189 if len(parts) == 2:
190 current_section["subsections"].append(
191 {"name": parts[0].strip(), "purpose": parts[1].strip()}
192 )
193 elif len(parts) == 1 and parts[0].strip(): 193 ↛ 172line 193 didn't jump to line 172 because the condition on line 193 was always true
194 # Subsection without purpose - add default
195 current_section["subsections"].append(
196 {
197 "name": parts[0].strip(),
198 "purpose": f"Provide detailed information about {parts[0].strip()}",
199 }
200 )
202 # Check if the last section is source-related and remove it
203 if structure:
204 last_section = structure[-1]
205 section_name_lower = last_section["name"].lower()
206 source_keywords = [
207 "source",
208 "citation",
209 "reference",
210 "bibliography",
211 ]
213 # Only check the last section for source-related content
214 if any(
215 keyword in section_name_lower for keyword in source_keywords
216 ):
217 logger.info(
218 f"Removed source-related last section: {last_section['name']}"
219 )
220 structure = structure[:-1]
222 return structure
224 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str:
225 """Truncate text at a sentence boundary to preserve readability.
227 Attempts to cut at the last sentence-ending punctuation (.!?) before
228 the limit. If no suitable boundary is found within 80% of the limit,
229 falls back to hard truncation.
231 Args:
232 text: The text to truncate
233 max_chars: Maximum characters allowed
235 Returns:
236 Truncated text with [...truncated] marker if truncation occurred
237 """
238 if len(text) <= max_chars:
239 return text
241 truncated = text[:max_chars]
243 # Look for sentence boundaries (. ! ?) followed by space or newline
244 # Search backwards from the end for the last complete sentence
245 last_sentence_end = -1
246 for i in range(len(truncated) - 1, -1, -1):
247 if truncated[i] in ".!?" and (
248 i + 1 >= len(truncated) or truncated[i + 1] in " \n"
249 ):
250 last_sentence_end = i + 1
251 break
253 # Only use sentence boundary if it preserves at least 80% of content
254 min_acceptable = int(max_chars * 0.8)
255 if last_sentence_end > min_acceptable:
256 return truncated[:last_sentence_end] + "\n[...truncated]"
258 # Fallback to hard truncation
259 return truncated + "\n[...truncated]"
261 def _build_previous_context(self, accumulated_findings: List[str]) -> str:
262 """Build context block from previously generated sections.
264 Creates a formatted context block containing content from the last
265 N sections (defined by self.max_context_sections) with explicit instructions
266 not to repeat this content. Context is truncated if it exceeds
267 self.max_context_chars to stay safe for smaller local models.
269 Args:
270 accumulated_findings: List of previously generated section content,
271 each formatted as "[Section > Subsection]\\n{content}"
273 Returns:
274 Formatted context block with delimiters, or empty string if no
275 previous findings exist
276 """
277 if not accumulated_findings:
278 return ""
280 recent_findings = accumulated_findings[-self.max_context_sections :]
281 previous_context = "\n\n---\n\n".join(recent_findings)
283 # Truncate at sentence boundary if too long
284 if len(previous_context) > self.max_context_chars:
285 previous_context = self._truncate_at_sentence_boundary(
286 previous_context, self.max_context_chars
287 )
289 return (
290 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n"
291 f"{previous_context}\n"
292 f"=== END OF PREVIOUS CONTENT ===\n\n"
293 f"CRITICAL: The above content has already been written. Do NOT repeat "
294 f"these points, examples, or explanations. Focus on NEW information "
295 f"not covered above.\n"
296 )
298 def _research_and_generate_sections(
299 self,
300 initial_findings: Dict,
301 structure: List[Dict],
302 query: str,
303 progress_callback=None,
304 ) -> Dict[str, str]:
305 """Research and generate content for each section in one step.
307 This method processes sections sequentially, accumulating generated
308 content as it goes. For each new section/subsection, it passes context
309 from the last few previously generated sections to help the LLM avoid
310 repetition.
312 The context accumulation mechanism:
313 - Tracks all generated content in accumulated_findings list
314 - Before generating each section, builds context from recent findings
315 - Uses self.max_context_sections (configurable, default: 3) to limit context size
316 - Truncates context to self.max_context_chars (configurable, default: 4000) for safety
317 - Includes explicit "DO NOT REPEAT" instructions with actual content
319 Args:
320 initial_findings: Results from initial research phase, may contain
321 questions_by_iteration to preserve search continuity
322 structure: List of section definitions, each with name and subsections
323 query: Original user query for context
325 Returns:
326 Dict mapping section names to their generated markdown content
327 """
328 sections = {}
330 # Accumulate content from previous sections to avoid repetition
331 accumulated_findings: List[str] = []
333 # Count total subsections for progress tracking
334 total_subsections = sum(
335 max(len(section.get("subsections", [])), 1) for section in structure
336 )
337 completed_subsections = 0
339 # Preserve questions from initial research to avoid repetition
340 # This follows the same pattern as citation tracking (all_links_of_system)
341 existing_questions = initial_findings.get("questions_by_iteration", {})
342 if existing_questions:
343 # Set questions on both search system and its strategy
344 if hasattr(self.search_system, "questions_by_iteration"): 344 ↛ 350line 344 didn't jump to line 350 because the condition on line 344 was always true
345 self.search_system.questions_by_iteration = (
346 existing_questions.copy()
347 )
349 # More importantly, set it on the strategy which actually uses it
350 if hasattr(self.search_system, "strategy") and hasattr( 350 ↛ 360line 350 didn't jump to line 360 because the condition on line 350 was always true
351 self.search_system.strategy, "questions_by_iteration"
352 ):
353 self.search_system.strategy.questions_by_iteration = (
354 existing_questions.copy()
355 )
356 logger.info(
357 f"Initialized strategy with {len(existing_questions)} iterations of previous questions"
358 )
360 for i, section in enumerate(structure, 1):
361 logger.info(f"Processing section: {section['name']}")
362 section_content = []
364 section_content.append(f"# {i}. {section['name']}\n")
366 # If section has no subsections, create one from the section itself
367 if not section["subsections"]:
368 # Parse section name for purpose
369 if "|" in section["name"]:
370 parts = section["name"].split("|", 1)
371 section["subsections"] = [
372 {"name": parts[0].strip(), "purpose": parts[1].strip()}
373 ]
374 else:
375 # No purpose provided - use section name as subsection
376 section["subsections"] = [
377 {
378 "name": section["name"],
379 "purpose": f"Provide comprehensive content for {section['name']}",
380 }
381 ]
383 # Process each subsection by directly researching it
384 for j, subsection in enumerate(section["subsections"], 1):
385 # Only add subsection header if there are multiple subsections
386 if len(section["subsections"]) > 1:
387 section_content.append(f"## {i}.{j} {subsection['name']}\n")
388 section_content.append(f"_{subsection['purpose']}_\n\n")
390 # Get other subsections in this section for context
391 other_subsections = [
392 f"- {s['name']}: {s['purpose']}"
393 for s in section["subsections"]
394 if s["name"] != subsection["name"]
395 ]
396 other_subsections_text = (
397 "\n".join(other_subsections)
398 if other_subsections
399 else "None"
400 )
402 # Get all other sections for broader context
403 other_sections = [
404 f"- {s['name']}"
405 for s in structure
406 if s["name"] != section["name"]
407 ]
408 other_sections_text = (
409 "\n".join(other_sections) if other_sections else "None"
410 )
412 # Check if this is actually a section-level content (only one subsection, likely auto-created)
413 is_section_level = len(section["subsections"]) == 1
415 # Build context from previously generated sections to avoid repetition
416 previous_context_section = self._build_previous_context(
417 accumulated_findings
418 )
420 # Generate appropriate search query
421 if is_section_level:
422 # Section-level prompt - more comprehensive
423 subsection_query = (
424 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. "
425 f"Section purpose: {subsection['purpose']} "
426 f"\n"
427 f"Other sections in the report:\n{other_sections_text}\n"
428 f"{previous_context_section}"
429 f"This is a standalone section requiring comprehensive coverage of its topic. "
430 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. "
431 f"Include unique insights, specific examples, and concrete data. "
432 f"Use tables to organize information where applicable. "
433 f"For conclusion sections: synthesize key findings and provide forward-looking insights. "
434 f"Build upon the research findings from earlier sections to create a cohesive narrative."
435 )
436 else:
437 # Subsection-level prompt - more focused
438 subsection_query = (
439 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. "
440 f"This subsection's purpose: {subsection['purpose']} "
441 f"Part of section: '{section['name']}' "
442 f"\n"
443 f"Other sections in the report:\n{other_sections_text}\n"
444 f"\n"
445 f"Other subsections in this section will cover:\n{other_subsections_text}\n"
446 f"{previous_context_section}"
447 f"Focus ONLY on information specific to your subsection's purpose. "
448 f"Include unique details, specific examples, and concrete data. "
449 f"Use tables to organize information where applicable. "
450 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. "
451 f"Previous research exists - find specific angles for this subsection."
452 )
454 logger.info(
455 f"Researching subsection: {subsection['name']} with query: {subsection_query}"
456 )
458 # Report progress and check for cancellation
459 if progress_callback:
460 pct = int(
461 10
462 + (completed_subsections / max(total_subsections, 1))
463 * 80
464 )
465 progress_callback(
466 f"Researching: {section['name']} > {subsection['name']}",
467 pct,
468 {
469 "phase": "report_section_research",
470 "subsection": subsection["name"],
471 },
472 )
474 # Fix iteration override: modify strategy's settings_snapshot
475 # which is read dynamically via get_setting()
476 strategy = self.search_system.strategy
477 original_iterations = strategy.settings_snapshot.get(
478 "search.iterations"
479 )
480 had_iterations_key = (
481 "search.iterations" in strategy.settings_snapshot
482 )
483 strategy.settings_snapshot["search.iterations"] = 1
484 # Belt-and-suspenders: also override max_iterations for
485 # strategies that cache it at __init__ time
486 original_max_iter = getattr(strategy, "max_iterations", None)
487 strategy.max_iterations = 1
489 try:
490 # Perform search for this subsection
491 subsection_results = self.search_system.analyze_topic(
492 subsection_query
493 )
494 finally:
495 # Restore original iteration settings
496 if had_iterations_key:
497 strategy.settings_snapshot["search.iterations"] = (
498 original_iterations
499 )
500 else:
501 strategy.settings_snapshot.pop(
502 "search.iterations", None
503 )
504 if original_max_iter is not None: 504 ↛ 507line 504 didn't jump to line 507 because the condition on line 504 was always true
505 strategy.max_iterations = original_max_iter
507 completed_subsections += 1
509 # Add the researched content for this subsection
510 if subsection_results.get("current_knowledge"):
511 generated_content = subsection_results["current_knowledge"]
512 section_content.append(generated_content)
513 # Accumulate for context in subsequent sections
514 accumulated_findings.append(
515 f"[{section['name']} > {subsection['name']}]\n{generated_content}"
516 )
517 else:
518 section_content.append(
519 "*Limited information was found for this subsection.*\n"
520 )
522 section_content.append("\n\n")
524 # Combine all content for this section
525 sections[section["name"]] = "\n".join(section_content)
527 return sections
529 def _generate_sections(
530 self,
531 initial_findings: Dict,
532 _section_research: Dict[str, List[Dict]],
533 structure: List[Dict],
534 query: str,
535 ) -> Dict[str, str]:
536 """
537 This method is kept for compatibility but no longer used.
538 The functionality has been moved to _research_and_generate_sections.
539 """
540 return {}
542 def _format_final_report(
543 self,
544 sections: Dict[str, str],
545 structure: List[Dict],
546 query: str,
547 ) -> Dict:
548 """Format the final report with table of contents and sections."""
549 # Generate TOC
550 toc = ["# Table of Contents\n"]
551 for i, section in enumerate(structure, 1):
552 toc.append(f"{i}. **{section['name']}**")
553 if len(section["subsections"]) > 1:
554 for j, subsection in enumerate(section["subsections"], 1):
555 toc.append(
556 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_"
557 )
559 # Combine TOC and sections
560 report_parts = ["\n".join(toc), ""]
562 # Add a summary of the research
563 report_parts.append("# Research Summary")
564 report_parts.append(
565 "This report was researched using an advanced search system."
566 )
567 report_parts.append(
568 "Research included targeted searches for each section and subsection."
569 )
570 report_parts.append("\n---\n")
572 # Add each section's content
573 for section in structure:
574 if section["name"] in sections:
575 report_parts.append(sections[section["name"]])
576 report_parts.append("")
578 # Format links from search system
579 # Get utilities module dynamically to avoid circular imports
580 utilities = importlib.import_module("local_deep_research.utilities")
581 formatted_all_links = (
582 utilities.search_utilities.format_links_to_markdown(
583 all_links=self.search_system.all_links_of_system
584 )
585 )
587 # Create final report with all parts. The Sources tail is
588 # kept here so in-memory consumers (MCP `generate_report`,
589 # programmatic API) get the full assembled blob unchanged.
590 # The DB save site (research_service.py) strips this Sources
591 # section via format_document_split before persisting, so the
592 # answer-only invariant on report_content still holds.
593 final_report_content = "\n\n".join(report_parts)
594 # Explicit "\n\n" separator: downstream regex consumers
595 # (_SOURCES_SECTION_PATTERNS in text_optimization/citation_formatter.py
596 # and _LEGACY_SOURCES_RE in web/services/report_assembly_service.py)
597 # use line-anchored `re.MULTILINE` matching. Today the trailing
598 # newlines produced by `"\n\n".join` happen to keep `## Sources`
599 # at the start of a line, but that is incidental; an explicit
600 # separator preserves the invariant against future section
601 # template changes.
602 final_report_content += "\n\n## Sources\n\n" + formatted_all_links
604 # Create metadata dictionary
605 metadata = {
606 "generated_at": datetime.now(UTC).isoformat(),
607 "initial_sources": len(self.search_system.all_links_of_system),
608 "sections_researched": len(structure),
609 "searches_per_section": self.searches_per_section,
610 "query": query,
611 }
613 # Return both content and metadata
614 return {"content": final_report_content, "metadata": metadata}
616 def _generate_error_report(self, query: str, error_msg: str) -> str:
617 return f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"