Coverage for src / local_deep_research / report_generator.py: 98%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import importlib 

2from typing import Dict, List, Optional 

3from datetime import datetime, UTC 

4 

5from langchain_core.language_models import BaseChatModel 

6from loguru import logger 

7 

8# Fix circular import by importing directly from source modules 

9from .config.llm_config import get_llm 

10from .config.thread_settings import get_setting_from_snapshot 

11from .search_system import AdvancedSearchSystem 

12from .utilities import search_utilities 

13 

# Fallback limits for the context carried over between report sections to
# avoid repetition. They apply only when the corresponding settings are not
# available.
DEFAULT_MAX_CONTEXT_SECTIONS = 3  # Number of previous sections to include as context
DEFAULT_MAX_CONTEXT_CHARS = 4000  # Max characters for context (safe for smaller local models)

22 

23 

def get_report_generator(search_system=None):
    """Create an ``IntegratedReportGenerator`` using its default settings.

    Args:
        search_system: Optional existing AdvancedSearchSystem to hand to the
            generator. When omitted, the generator's constructor decides
            which search system to use.

    Returns:
        A newly constructed ``IntegratedReportGenerator``.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator

31 

32 

33class IntegratedReportGenerator: 

34 def __init__( 

35 self, 

36 searches_per_section: int = 2, 

37 search_system=None, 

38 llm: BaseChatModel | None = None, 

39 settings_snapshot: Optional[Dict] = None, 

40 ): 

41 """ 

42 Args: 

43 searches_per_section: Number of searches to perform for each 

44 section in the report. 

45 search_system: Custom search system to use, otherwise just uses 

46 the default. 

47 llm: Custom LLM to use. Required if search_system is not provided. 

48 settings_snapshot: Optional settings snapshot for configurable values. 

49 

50 """ 

51 # If search_system is provided, use its LLM; otherwise use the provided LLM 

52 if search_system: 

53 self.search_system = search_system 

54 self.model = llm or search_system.model 

55 elif llm: 

56 self.model = llm 

57 self.search_system = AdvancedSearchSystem(llm=self.model) 

58 else: 

59 # Fallback for backwards compatibility - will only work with auth 

60 self.model = get_llm() 

61 self.search_system = AdvancedSearchSystem(llm=self.model) 

62 

63 self.searches_per_section = ( 

64 searches_per_section # Control search depth per section 

65 ) 

66 

67 # Load context settings from snapshot or use defaults 

68 self.max_context_sections = get_setting_from_snapshot( 

69 "report.max_context_sections", 

70 default=DEFAULT_MAX_CONTEXT_SECTIONS, 

71 settings_snapshot=settings_snapshot, 

72 ) 

73 self.max_context_chars = get_setting_from_snapshot( 

74 "report.max_context_chars", 

75 default=DEFAULT_MAX_CONTEXT_CHARS, 

76 settings_snapshot=settings_snapshot, 

77 ) 

78 

79 def generate_report(self, initial_findings: Dict, query: str) -> Dict: 

80 """Generate a complete research report with section-specific research.""" 

81 

82 # Step 1: Determine structure 

83 structure = self._determine_report_structure(initial_findings, query) 

84 

85 # Step 2: Research and generate content for each section in one step 

86 sections = self._research_and_generate_sections( 

87 initial_findings, structure, query 

88 ) 

89 

90 # Step 3: Format final report 

91 report = self._format_final_report(sections, structure, query) 

92 

93 return report 

94 

95 def _determine_report_structure( 

96 self, findings: Dict, query: str 

97 ) -> List[Dict]: 

98 """Analyze content and determine optimal report structure.""" 

99 combined_content = findings["current_knowledge"] 

100 prompt = f""" 

101 Analyze this research content about: {query} 

102 

103 Content Summary: 

104 {combined_content[:1000]}... [truncated] 

105 

106 Determine the most appropriate report structure by: 

107 1. Analyzing the type of content (technical, business, academic, etc.) 

108 2. Identifying main themes and logical groupings 

109 3. Considering the depth and breadth of the research 

110 

111 Return a table of contents structure in this exact format: 

112 STRUCTURE 

113 1. [Section Name] 

114 - [Subsection] | [purpose] 

115 2. [Section Name] 

116 - [Subsection] | [purpose] 

117 ... 

118 END_STRUCTURE 

119 

120 Make the structure specific to the content, not generic. 

121 Each subsection must include its purpose after the | symbol. 

122 DO NOT include sections about sources, citations, references, or methodology. 

123 """ 

124 

125 response = search_utilities.remove_think_tags( 

126 self.model.invoke(prompt).content 

127 ) 

128 

129 # Parse the structure 

130 structure = [] 

131 current_section = None 

132 

133 for line in response.split("\n"): 

134 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]: 

135 continue 

136 

137 if line.strip().startswith(tuple("123456789")): 

138 # Main section 

139 section_name = line.split(".")[1].strip() 

140 current_section = {"name": section_name, "subsections": []} 

141 structure.append(current_section) 

142 elif line.strip().startswith("-") and current_section: 

143 # Subsection with or without purpose 

144 parts = line.strip("- ").split( 

145 "|", 1 

146 ) # Only split on first pipe 

147 if len(parts) == 2: 

148 current_section["subsections"].append( 

149 {"name": parts[0].strip(), "purpose": parts[1].strip()} 

150 ) 

151 elif len(parts) == 1 and parts[0].strip(): 151 ↛ 133line 151 didn't jump to line 133 because the condition on line 151 was always true

152 # Subsection without purpose - add default 

153 current_section["subsections"].append( 

154 { 

155 "name": parts[0].strip(), 

156 "purpose": f"Provide detailed information about {parts[0].strip()}", 

157 } 

158 ) 

159 

160 # Check if the last section is source-related and remove it 

161 if structure: 

162 last_section = structure[-1] 

163 section_name_lower = last_section["name"].lower() 

164 source_keywords = [ 

165 "source", 

166 "citation", 

167 "reference", 

168 "bibliography", 

169 ] 

170 

171 # Only check the last section for source-related content 

172 if any( 

173 keyword in section_name_lower for keyword in source_keywords 

174 ): 

175 logger.info( 

176 f"Removed source-related last section: {last_section['name']}" 

177 ) 

178 structure = structure[:-1] 

179 

180 return structure 

181 

182 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str: 

183 """Truncate text at a sentence boundary to preserve readability. 

184 

185 Attempts to cut at the last sentence-ending punctuation (.!?) before 

186 the limit. If no suitable boundary is found within 80% of the limit, 

187 falls back to hard truncation. 

188 

189 Args: 

190 text: The text to truncate 

191 max_chars: Maximum characters allowed 

192 

193 Returns: 

194 Truncated text with [...truncated] marker if truncation occurred 

195 """ 

196 if len(text) <= max_chars: 

197 return text 

198 

199 truncated = text[:max_chars] 

200 

201 # Look for sentence boundaries (. ! ?) followed by space or newline 

202 # Search backwards from the end for the last complete sentence 

203 last_sentence_end = -1 

204 for i in range(len(truncated) - 1, -1, -1): 

205 if truncated[i] in ".!?" and ( 

206 i + 1 >= len(truncated) or truncated[i + 1] in " \n" 

207 ): 

208 last_sentence_end = i + 1 

209 break 

210 

211 # Only use sentence boundary if it preserves at least 80% of content 

212 min_acceptable = int(max_chars * 0.8) 

213 if last_sentence_end > min_acceptable: 

214 return truncated[:last_sentence_end] + "\n[...truncated]" 

215 

216 # Fallback to hard truncation 

217 return truncated + "\n[...truncated]" 

218 

219 def _build_previous_context(self, accumulated_findings: List[str]) -> str: 

220 """Build context block from previously generated sections. 

221 

222 Creates a formatted context block containing content from the last 

223 N sections (defined by self.max_context_sections) with explicit instructions 

224 not to repeat this content. Context is truncated if it exceeds 

225 self.max_context_chars to stay safe for smaller local models. 

226 

227 Args: 

228 accumulated_findings: List of previously generated section content, 

229 each formatted as "[Section > Subsection]\\n{content}" 

230 

231 Returns: 

232 Formatted context block with delimiters, or empty string if no 

233 previous findings exist 

234 """ 

235 if not accumulated_findings: 

236 return "" 

237 

238 recent_findings = accumulated_findings[-self.max_context_sections :] 

239 previous_context = "\n\n---\n\n".join(recent_findings) 

240 

241 # Truncate at sentence boundary if too long 

242 if len(previous_context) > self.max_context_chars: 

243 previous_context = self._truncate_at_sentence_boundary( 

244 previous_context, self.max_context_chars 

245 ) 

246 

247 return ( 

248 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n" 

249 f"{previous_context}\n" 

250 f"=== END OF PREVIOUS CONTENT ===\n\n" 

251 f"CRITICAL: The above content has already been written. Do NOT repeat " 

252 f"these points, examples, or explanations. Focus on NEW information " 

253 f"not covered above.\n" 

254 ) 

255 

256 def _research_and_generate_sections( 

257 self, 

258 initial_findings: Dict, 

259 structure: List[Dict], 

260 query: str, 

261 ) -> Dict[str, str]: 

262 """Research and generate content for each section in one step. 

263 

264 This method processes sections sequentially, accumulating generated 

265 content as it goes. For each new section/subsection, it passes context 

266 from the last few previously generated sections to help the LLM avoid 

267 repetition. 

268 

269 The context accumulation mechanism: 

270 - Tracks all generated content in accumulated_findings list 

271 - Before generating each section, builds context from recent findings 

272 - Uses self.max_context_sections (configurable, default: 3) to limit context size 

273 - Truncates context to self.max_context_chars (configurable, default: 4000) for safety 

274 - Includes explicit "DO NOT REPEAT" instructions with actual content 

275 

276 Args: 

277 initial_findings: Results from initial research phase, may contain 

278 questions_by_iteration to preserve search continuity 

279 structure: List of section definitions, each with name and subsections 

280 query: Original user query for context 

281 

282 Returns: 

283 Dict mapping section names to their generated markdown content 

284 """ 

285 sections = {} 

286 

287 # Accumulate content from previous sections to avoid repetition 

288 accumulated_findings: List[str] = [] 

289 

290 # Preserve questions from initial research to avoid repetition 

291 # This follows the same pattern as citation tracking (all_links_of_system) 

292 existing_questions = initial_findings.get("questions_by_iteration", {}) 

293 if existing_questions: 

294 # Set questions on both search system and its strategy 

295 if hasattr(self.search_system, "questions_by_iteration"): 295 ↛ 301line 295 didn't jump to line 301 because the condition on line 295 was always true

296 self.search_system.questions_by_iteration = ( 

297 existing_questions.copy() 

298 ) 

299 

300 # More importantly, set it on the strategy which actually uses it 

301 if hasattr(self.search_system, "strategy") and hasattr( 301 ↛ 311line 301 didn't jump to line 311 because the condition on line 301 was always true

302 self.search_system.strategy, "questions_by_iteration" 

303 ): 

304 self.search_system.strategy.questions_by_iteration = ( 

305 existing_questions.copy() 

306 ) 

307 logger.info( 

308 f"Initialized strategy with {len(existing_questions)} iterations of previous questions" 

309 ) 

310 

311 for section in structure: 

312 logger.info(f"Processing section: {section['name']}") 

313 section_content = [] 

314 

315 section_content.append(f"# {section['name']}\n") 

316 

317 # If section has no subsections, create one from the section itself 

318 if not section["subsections"]: 

319 # Parse section name for purpose 

320 if "|" in section["name"]: 

321 parts = section["name"].split("|", 1) 

322 section["subsections"] = [ 

323 {"name": parts[0].strip(), "purpose": parts[1].strip()} 

324 ] 

325 else: 

326 # No purpose provided - use section name as subsection 

327 section["subsections"] = [ 

328 { 

329 "name": section["name"], 

330 "purpose": f"Provide comprehensive content for {section['name']}", 

331 } 

332 ] 

333 

334 # Process each subsection by directly researching it 

335 for subsection in section["subsections"]: 

336 # Only add subsection header if there are multiple subsections 

337 if len(section["subsections"]) > 1: 

338 section_content.append(f"## {subsection['name']}\n") 

339 section_content.append(f"_{subsection['purpose']}_\n\n") 

340 

341 # Get other subsections in this section for context 

342 other_subsections = [ 

343 f"- {s['name']}: {s['purpose']}" 

344 for s in section["subsections"] 

345 if s["name"] != subsection["name"] 

346 ] 

347 other_subsections_text = ( 

348 "\n".join(other_subsections) 

349 if other_subsections 

350 else "None" 

351 ) 

352 

353 # Get all other sections for broader context 

354 other_sections = [ 

355 f"- {s['name']}" 

356 for s in structure 

357 if s["name"] != section["name"] 

358 ] 

359 other_sections_text = ( 

360 "\n".join(other_sections) if other_sections else "None" 

361 ) 

362 

363 # Check if this is actually a section-level content (only one subsection, likely auto-created) 

364 is_section_level = len(section["subsections"]) == 1 

365 

366 # Build context from previously generated sections to avoid repetition 

367 previous_context_section = self._build_previous_context( 

368 accumulated_findings 

369 ) 

370 

371 # Generate appropriate search query 

372 if is_section_level: 

373 # Section-level prompt - more comprehensive 

374 subsection_query = ( 

375 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. " 

376 f"Section purpose: {subsection['purpose']} " 

377 f"\n" 

378 f"Other sections in the report:\n{other_sections_text}\n" 

379 f"{previous_context_section}" 

380 f"This is a standalone section requiring comprehensive coverage of its topic. " 

381 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. " 

382 f"Include unique insights, specific examples, and concrete data. " 

383 f"Use tables to organize information where applicable. " 

384 f"For conclusion sections: synthesize key findings and provide forward-looking insights. " 

385 f"Build upon the research findings from earlier sections to create a cohesive narrative." 

386 ) 

387 else: 

388 # Subsection-level prompt - more focused 

389 subsection_query = ( 

390 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. " 

391 f"This subsection's purpose: {subsection['purpose']} " 

392 f"Part of section: '{section['name']}' " 

393 f"\n" 

394 f"Other sections in the report:\n{other_sections_text}\n" 

395 f"\n" 

396 f"Other subsections in this section will cover:\n{other_subsections_text}\n" 

397 f"{previous_context_section}" 

398 f"Focus ONLY on information specific to your subsection's purpose. " 

399 f"Include unique details, specific examples, and concrete data. " 

400 f"Use tables to organize information where applicable. " 

401 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. " 

402 f"Previous research exists - find specific angles for this subsection." 

403 ) 

404 

405 logger.info( 

406 f"Researching subsection: {subsection['name']} with query: {subsection_query}" 

407 ) 

408 

409 # Configure search system for focused search 

410 original_max_iterations = self.search_system.max_iterations 

411 self.search_system.max_iterations = 1 # Keep search focused 

412 

413 # Perform search for this subsection 

414 subsection_results = self.search_system.analyze_topic( 

415 subsection_query 

416 ) 

417 

418 # Restore original iterations setting 

419 self.search_system.max_iterations = original_max_iterations 

420 

421 # Add the researched content for this subsection 

422 if subsection_results.get("current_knowledge"): 

423 generated_content = subsection_results["current_knowledge"] 

424 section_content.append(generated_content) 

425 # Accumulate for context in subsequent sections 

426 accumulated_findings.append( 

427 f"[{section['name']} > {subsection['name']}]\n{generated_content}" 

428 ) 

429 else: 

430 section_content.append( 

431 "*Limited information was found for this subsection.*\n" 

432 ) 

433 

434 section_content.append("\n\n") 

435 

436 # Combine all content for this section 

437 sections[section["name"]] = "\n".join(section_content) 

438 

439 return sections 

440 

441 def _generate_sections( 

442 self, 

443 initial_findings: Dict, 

444 _section_research: Dict[str, List[Dict]], 

445 structure: List[Dict], 

446 query: str, 

447 ) -> Dict[str, str]: 

448 """ 

449 This method is kept for compatibility but no longer used. 

450 The functionality has been moved to _research_and_generate_sections. 

451 """ 

452 return {} 

453 

454 def _format_final_report( 

455 self, 

456 sections: Dict[str, str], 

457 structure: List[Dict], 

458 query: str, 

459 ) -> Dict: 

460 """Format the final report with table of contents and sections.""" 

461 # Generate TOC 

462 toc = ["# Table of Contents\n"] 

463 for i, section in enumerate(structure, 1): 

464 toc.append(f"{i}. **{section['name']}**") 

465 for j, subsection in enumerate(section["subsections"], 1): 

466 toc.append( 

467 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_" 

468 ) 

469 

470 # Combine TOC and sections 

471 report_parts = ["\n".join(toc), ""] 

472 

473 # Add a summary of the research 

474 report_parts.append("# Research Summary") 

475 report_parts.append( 

476 "This report was researched using an advanced search system." 

477 ) 

478 report_parts.append( 

479 "Research included targeted searches for each section and subsection." 

480 ) 

481 report_parts.append("\n---\n") 

482 

483 # Add each section's content 

484 for section in structure: 

485 if section["name"] in sections: 485 ↛ 484line 485 didn't jump to line 484 because the condition on line 485 was always true

486 report_parts.append(sections[section["name"]]) 

487 report_parts.append("") 

488 

489 # Format links from search system 

490 # Get utilities module dynamically to avoid circular imports 

491 utilities = importlib.import_module("local_deep_research.utilities") 

492 formatted_all_links = ( 

493 utilities.search_utilities.format_links_to_markdown( 

494 all_links=self.search_system.all_links_of_system 

495 ) 

496 ) 

497 

498 # Create final report with all parts 

499 final_report_content = "\n\n".join(report_parts) 

500 final_report_content = ( 

501 final_report_content + "\n\n## Sources\n\n" + formatted_all_links 

502 ) 

503 

504 # Create metadata dictionary 

505 metadata = { 

506 "generated_at": datetime.now(UTC).isoformat(), 

507 "initial_sources": len(self.search_system.all_links_of_system), 

508 "sections_researched": len(structure), 

509 "searches_per_section": self.searches_per_section, 

510 "query": query, 

511 } 

512 

513 # Return both content and metadata 

514 return {"content": final_report_content, "metadata": metadata} 

515 

516 def _generate_error_report(self, query: str, error_msg: str) -> str: 

517 error_report = ( 

518 f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}" 

519 ) 

520 return error_report