Coverage for src / local_deep_research / report_generator.py: 77%
120 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import importlib
2from typing import Dict, List
3from datetime import datetime, UTC
5from langchain_core.language_models import BaseChatModel
6from loguru import logger
8# Fix circular import by importing directly from source modules
9from .config.llm_config import get_llm
10from .search_system import AdvancedSearchSystem
11from .utilities import search_utilities
def get_report_generator(search_system=None):
    """Build an ``IntegratedReportGenerator`` that otherwise uses its defaults.

    Args:
        search_system: Optional existing AdvancedSearchSystem to use; it is
            passed through unchanged to the generator's constructor.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator
23class IntegratedReportGenerator:
24 def __init__(
25 self,
26 searches_per_section: int = 2,
27 search_system=None,
28 llm: BaseChatModel | None = None,
29 ):
30 """
31 Args:
32 searches_per_section: Number of searches to perform for each
33 section in the report.
34 search_system: Custom search system to use, otherwise just uses
35 the default.
36 llm: Custom LLM to use. Required if search_system is not provided.
38 """
39 # If search_system is provided, use its LLM; otherwise use the provided LLM
40 if search_system:
41 self.search_system = search_system
42 self.model = llm or search_system.model
43 elif llm: 43 ↛ 44line 43 didn't jump to line 44 because the condition on line 43 was never true
44 self.model = llm
45 self.search_system = AdvancedSearchSystem(llm=self.model)
46 else:
47 # Fallback for backwards compatibility - will only work with auth
48 self.model = get_llm()
49 self.search_system = AdvancedSearchSystem(llm=self.model)
51 self.searches_per_section = (
52 searches_per_section # Control search depth per section
53 )
55 def generate_report(self, initial_findings: Dict, query: str) -> Dict:
56 """Generate a complete research report with section-specific research."""
58 # Step 1: Determine structure
59 structure = self._determine_report_structure(initial_findings, query)
61 # Step 2: Research and generate content for each section in one step
62 sections = self._research_and_generate_sections(
63 initial_findings, structure, query
64 )
66 # Step 3: Format final report
67 report = self._format_final_report(sections, structure, query)
69 return report
71 def _determine_report_structure(
72 self, findings: Dict, query: str
73 ) -> List[Dict]:
74 """Analyze content and determine optimal report structure."""
75 combined_content = findings["current_knowledge"]
76 prompt = f"""
77 Analyze this research content about: {query}
79 Content Summary:
80 {combined_content[:1000]}... [truncated]
82 Determine the most appropriate report structure by:
83 1. Analyzing the type of content (technical, business, academic, etc.)
84 2. Identifying main themes and logical groupings
85 3. Considering the depth and breadth of the research
87 Return a table of contents structure in this exact format:
88 STRUCTURE
89 1. [Section Name]
90 - [Subsection] | [purpose]
91 2. [Section Name]
92 - [Subsection] | [purpose]
93 ...
94 END_STRUCTURE
96 Make the structure specific to the content, not generic.
97 Each subsection must include its purpose after the | symbol.
98 DO NOT include sections about sources, citations, references, or methodology.
99 """
101 response = search_utilities.remove_think_tags(
102 self.model.invoke(prompt).content
103 )
105 # Parse the structure
106 structure = []
107 current_section = None
109 for line in response.split("\n"):
110 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]:
111 continue
113 if line.strip().startswith(tuple("123456789")):
114 # Main section
115 section_name = line.split(".")[1].strip()
116 current_section = {"name": section_name, "subsections": []}
117 structure.append(current_section)
118 elif line.strip().startswith("-") and current_section:
119 # Subsection with or without purpose
120 parts = line.strip("- ").split("|")
121 if len(parts) == 2: 121 ↛ 125line 121 didn't jump to line 125 because the condition on line 121 was always true
122 current_section["subsections"].append(
123 {"name": parts[0].strip(), "purpose": parts[1].strip()}
124 )
125 elif len(parts) == 1 and parts[0].strip():
126 # Subsection without purpose - add default
127 current_section["subsections"].append(
128 {
129 "name": parts[0].strip(),
130 "purpose": f"Provide detailed information about {parts[0].strip()}",
131 }
132 )
134 # Check if the last section is source-related and remove it
135 if structure:
136 last_section = structure[-1]
137 section_name_lower = last_section["name"].lower()
138 source_keywords = [
139 "source",
140 "citation",
141 "reference",
142 "bibliography",
143 ]
145 # Only check the last section for source-related content
146 if any( 146 ↛ 149line 146 didn't jump to line 149 because the condition on line 146 was never true
147 keyword in section_name_lower for keyword in source_keywords
148 ):
149 logger.info(
150 f"Removed source-related last section: {last_section['name']}"
151 )
152 structure = structure[:-1]
154 return structure
156 def _research_and_generate_sections(
157 self,
158 initial_findings: Dict,
159 structure: List[Dict],
160 query: str,
161 ) -> Dict[str, str]:
162 """Research and generate content for each section in one step."""
163 sections = {}
165 # Preserve questions from initial research to avoid repetition
166 # This follows the same pattern as citation tracking (all_links_of_system)
167 existing_questions = initial_findings.get("questions_by_iteration", {})
168 if existing_questions: 168 ↛ 170line 168 didn't jump to line 170 because the condition on line 168 was never true
169 # Set questions on both search system and its strategy
170 if hasattr(self.search_system, "questions_by_iteration"):
171 self.search_system.questions_by_iteration = (
172 existing_questions.copy()
173 )
175 # More importantly, set it on the strategy which actually uses it
176 if hasattr(self.search_system, "strategy") and hasattr(
177 self.search_system.strategy, "questions_by_iteration"
178 ):
179 self.search_system.strategy.questions_by_iteration = (
180 existing_questions.copy()
181 )
182 logger.info(
183 f"Initialized strategy with {len(existing_questions)} iterations of previous questions"
184 )
186 for section in structure:
187 logger.info(f"Processing section: {section['name']}")
188 section_content = []
190 section_content.append(f"# {section['name']}\n")
192 # If section has no subsections, create one from the section itself
193 if not section["subsections"]: 193 ↛ 195line 193 didn't jump to line 195 because the condition on line 193 was never true
194 # Parse section name for purpose
195 if "|" in section["name"]:
196 parts = section["name"].split("|", 1)
197 section["subsections"] = [
198 {"name": parts[0].strip(), "purpose": parts[1].strip()}
199 ]
200 else:
201 # No purpose provided - use section name as subsection
202 section["subsections"] = [
203 {
204 "name": section["name"],
205 "purpose": f"Provide comprehensive content for {section['name']}",
206 }
207 ]
209 # Process each subsection by directly researching it
210 for subsection in section["subsections"]:
211 # Only add subsection header if there are multiple subsections
212 if len(section["subsections"]) > 1: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true
213 section_content.append(f"## {subsection['name']}\n")
214 section_content.append(f"_{subsection['purpose']}_\n\n")
216 # Get other subsections in this section for context
217 other_subsections = [
218 f"- {s['name']}: {s['purpose']}"
219 for s in section["subsections"]
220 if s["name"] != subsection["name"]
221 ]
222 other_subsections_text = (
223 "\n".join(other_subsections)
224 if other_subsections
225 else "None"
226 )
228 # Get all other sections for broader context
229 other_sections = [
230 f"- {s['name']}"
231 for s in structure
232 if s["name"] != section["name"]
233 ]
234 other_sections_text = (
235 "\n".join(other_sections) if other_sections else "None"
236 )
238 # Check if this is actually a section-level content (only one subsection, likely auto-created)
239 is_section_level = len(section["subsections"]) == 1
241 # Generate appropriate search query
242 if is_section_level: 242 ↛ 259line 242 didn't jump to line 259 because the condition on line 242 was always true
243 # Section-level prompt - more comprehensive
244 subsection_query = (
245 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. "
246 f"Section purpose: {subsection['purpose']} "
247 f"\n"
248 f"Other sections in the report:\n{other_sections_text}\n"
249 f"\n"
250 f"This is a standalone section requiring comprehensive coverage of its topic. "
251 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. "
252 f"Include unique insights, specific examples, and concrete data. "
253 f"Use tables to organize information where applicable. "
254 f"For conclusion sections: synthesize key findings and provide forward-looking insights. "
255 f"Build upon the research findings from earlier sections to create a cohesive narrative."
256 )
257 else:
258 # Subsection-level prompt - more focused
259 subsection_query = (
260 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. "
261 f"This subsection's purpose: {subsection['purpose']} "
262 f"Part of section: '{section['name']}' "
263 f"\n"
264 f"Other sections in the report:\n{other_sections_text}\n"
265 f"\n"
266 f"Other subsections in this section will cover:\n{other_subsections_text}\n"
267 f"\n"
268 f"Focus ONLY on information specific to your subsection's purpose. "
269 f"Include unique details, specific examples, and concrete data. "
270 f"Use tables to organize information where applicable. "
271 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. "
272 f"Previous research exists - find specific angles for this subsection."
273 )
275 logger.info(
276 f"Researching subsection: {subsection['name']} with query: {subsection_query}"
277 )
279 # Configure search system for focused search
280 original_max_iterations = self.search_system.max_iterations
281 self.search_system.max_iterations = 1 # Keep search focused
283 # Perform search for this subsection
284 subsection_results = self.search_system.analyze_topic(
285 subsection_query
286 )
288 # Restore original iterations setting
289 self.search_system.max_iterations = original_max_iterations
291 # Add the researched content for this subsection
292 if subsection_results.get("current_knowledge"): 292 ↛ 297line 292 didn't jump to line 297 because the condition on line 292 was always true
293 section_content.append(
294 subsection_results["current_knowledge"]
295 )
296 else:
297 section_content.append(
298 "*Limited information was found for this subsection.*\n"
299 )
301 section_content.append("\n\n")
303 # Combine all content for this section
304 sections[section["name"]] = "\n".join(section_content)
306 return sections
308 def _generate_sections(
309 self,
310 initial_findings: Dict,
311 section_research: Dict[str, List[Dict]],
312 structure: List[Dict],
313 query: str,
314 ) -> Dict[str, str]:
315 """
316 This method is kept for compatibility but no longer used.
317 The functionality has been moved to _research_and_generate_sections.
318 """
319 return {}
321 def _format_final_report(
322 self,
323 sections: Dict[str, str],
324 structure: List[Dict],
325 query: str,
326 ) -> Dict:
327 """Format the final report with table of contents and sections."""
328 # Generate TOC
329 toc = ["# Table of Contents\n"]
330 for i, section in enumerate(structure, 1):
331 toc.append(f"{i}. **{section['name']}**")
332 for j, subsection in enumerate(section["subsections"], 1):
333 toc.append(
334 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_"
335 )
337 # Combine TOC and sections
338 report_parts = ["\n".join(toc), ""]
340 # Add a summary of the research
341 report_parts.append("# Research Summary")
342 report_parts.append(
343 "This report was researched using an advanced search system."
344 )
345 report_parts.append(
346 "Research included targeted searches for each section and subsection."
347 )
348 report_parts.append("\n---\n")
350 # Add each section's content
351 for section in structure:
352 if section["name"] in sections: 352 ↛ 351line 352 didn't jump to line 351 because the condition on line 352 was always true
353 report_parts.append(sections[section["name"]])
354 report_parts.append("")
356 # Format links from search system
357 # Get utilities module dynamically to avoid circular imports
358 utilities = importlib.import_module("local_deep_research.utilities")
359 formatted_all_links = (
360 utilities.search_utilities.format_links_to_markdown(
361 all_links=self.search_system.all_links_of_system
362 )
363 )
365 # Create final report with all parts
366 final_report_content = "\n\n".join(report_parts)
367 final_report_content = (
368 final_report_content + "\n\n## Sources\n\n" + formatted_all_links
369 )
371 # Create metadata dictionary
372 metadata = {
373 "generated_at": datetime.now(UTC).isoformat(),
374 "initial_sources": len(self.search_system.all_links_of_system),
375 "sections_researched": len(structure),
376 "searches_per_section": self.searches_per_section,
377 "query": query,
378 }
380 # Return both content and metadata
381 return {"content": final_report_content, "metadata": metadata}
383 def _generate_error_report(self, query: str, error_msg: str) -> str:
384 error_report = (
385 f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"
386 )
387 return error_report