Coverage for src / local_deep_research / report_generator.py: 98%

176 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import importlib 

2from typing import Any, Dict, List, Optional 

3from datetime import datetime, UTC 

4 

5from langchain_core.language_models import BaseChatModel 

6from loguru import logger 

7 

8# Fix circular import by importing directly from source modules 

9from .config.llm_config import get_llm 

10from .config.thread_settings import get_setting_from_snapshot 

11from .search_system import AdvancedSearchSystem 

12from .utilities import search_utilities 

13 

14# Default constants for context accumulation to avoid repetition 

15# These are used as fallbacks when settings are not available 

# Fallback limits for context accumulation, used when no settings are available.
DEFAULT_MAX_CONTEXT_SECTIONS = 3  # Number of previous sections to include as context
DEFAULT_MAX_CONTEXT_CHARS = 4000  # Max characters for context (safe for smaller local models)

22 

23 

def get_report_generator(search_system=None):
    """Create a report generator configured with default settings.

    Args:
        search_system: Optional existing AdvancedSearchSystem to reuse.

    Returns:
        A new IntegratedReportGenerator instance.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator

31 

32 

class IntegratedReportGenerator:
    def __init__(
        self,
        searches_per_section: int = 2,
        search_system=None,
        llm: BaseChatModel | None = None,
        settings_snapshot: Optional[Dict] = None,
    ):
        """
        Args:
            searches_per_section: Number of searches to perform for each
                section in the report.
            search_system: Custom search system to use, otherwise just uses
                the default.
            llm: Custom LLM to use. Required if search_system is not provided.
            settings_snapshot: Optional settings snapshot for configurable values.

        """
        # Track whether this instance created the LLM itself, so that
        # close() knows whether it is responsible for shutting it down.
        self._owns_llm = False

        if search_system:
            # Reuse the caller's search system; prefer an explicit LLM,
            # otherwise borrow the system's model.
            self.search_system = search_system
            self.model = llm or search_system.model
        elif llm:
            # Caller supplied only an LLM: build a search system around it.
            self.model = llm
            self.search_system = AdvancedSearchSystem(llm=self.model)  # type: ignore[call-arg]
        else:
            # Fallback for backwards compatibility - will only work with auth
            self._owns_llm = True
            self.model = get_llm()
            self.search_system = AdvancedSearchSystem(llm=self.model)  # type: ignore[call-arg]

        # Control search depth per section
        self.searches_per_section = searches_per_section

        # Load context-accumulation limits from the snapshot (or defaults).
        for attr_name, setting_key, fallback in (
            (
                "max_context_sections",
                "report.max_context_sections",
                DEFAULT_MAX_CONTEXT_SECTIONS,
            ),
            (
                "max_context_chars",
                "report.max_context_chars",
                DEFAULT_MAX_CONTEXT_CHARS,
            ),
        ):
            setattr(
                self,
                attr_name,
                get_setting_from_snapshot(
                    setting_key,
                    default=fallback,
                    settings_snapshot=settings_snapshot,
                ),
            )

80 

81 def close(self) -> None: 

82 """Close the LLM client if this instance created it.""" 

83 from .utilities.resource_utils import safe_close 

84 

85 if self._owns_llm: 

86 safe_close(self.model, "report generator LLM") 

87 

88 def generate_report( 

89 self, 

90 initial_findings: Dict, 

91 query: str, 

92 progress_callback=None, 

93 ) -> Dict: 

94 """Generate a complete research report with section-specific research. 

95 

96 Args: 

97 initial_findings: Results from initial research phase. 

98 query: Original user query. 

99 progress_callback: Optional callable(message, progress_percent, metadata) 

100 for reporting progress (0-100%) and checking cancellation. 

101 """ 

102 

103 # Step 1: Determine structure 

104 if progress_callback: 

105 progress_callback( 

106 "Determining report structure", 

107 0, 

108 {"phase": "report_structure"}, 

109 ) 

110 structure = self._determine_report_structure(initial_findings, query) 

111 

112 # Step 2: Research and generate content for each section in one step 

113 sections = self._research_and_generate_sections( 

114 initial_findings, 

115 structure, 

116 query, 

117 progress_callback=progress_callback, 

118 ) 

119 

120 # Step 3: Format final report 

121 if progress_callback: 

122 progress_callback( 

123 "Formatting final report", 

124 90, 

125 {"phase": "report_formatting"}, 

126 ) 

127 report = self._format_final_report(sections, structure, query) 

128 

129 if progress_callback: 

130 progress_callback( 

131 "Report complete", 100, {"phase": "report_complete"} 

132 ) 

133 

134 return report 

135 

136 def _determine_report_structure( 

137 self, findings: Dict, query: str 

138 ) -> List[Dict]: 

139 """Analyze content and determine optimal report structure.""" 

140 combined_content = findings["current_knowledge"] 

141 prompt = f""" 

142 Analyze this research content about: {query} 

143 

144 Content Summary: 

145 {combined_content[:1000]}... [truncated] 

146 

147 Determine the most appropriate report structure by: 

148 1. Analyzing the type of content (technical, business, academic, etc.) 

149 2. Identifying main themes and logical groupings 

150 3. Considering the depth and breadth of the research 

151 

152 Return a table of contents structure in this exact format: 

153 STRUCTURE 

154 1. [Section Name] 

155 - [Subsection] | [purpose] 

156 2. [Section Name] 

157 - [Subsection] | [purpose] 

158 ... 

159 END_STRUCTURE 

160 

161 Make the structure specific to the content, not generic. 

162 Each subsection must include its purpose after the | symbol. 

163 DO NOT include sections about sources, citations, references, or methodology. 

164 """ 

165 

166 response = search_utilities.remove_think_tags( 

167 str(self.model.invoke(prompt).content) 

168 ) 

169 

170 # Parse the structure 

171 structure: List[Dict[str, Any]] = [] 

172 current_section: Optional[Dict[str, Any]] = None 

173 

174 for line in response.split("\n"): 

175 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]: 

176 continue 

177 

178 if line.strip().startswith(tuple("123456789")): 

179 # Main section 

180 section_name = line.split(".")[1].strip() 

181 current_section = {"name": section_name, "subsections": []} 

182 structure.append(current_section) 

183 elif line.strip().startswith("-") and current_section: 

184 # Subsection with or without purpose 

185 parts = line.strip("- ").split( 

186 "|", 1 

187 ) # Only split on first pipe 

188 if len(parts) == 2: 

189 current_section["subsections"].append( 

190 {"name": parts[0].strip(), "purpose": parts[1].strip()} 

191 ) 

192 elif len(parts) == 1 and parts[0].strip(): 192 ↛ 174line 192 didn't jump to line 174 because the condition on line 192 was always true

193 # Subsection without purpose - add default 

194 current_section["subsections"].append( 

195 { 

196 "name": parts[0].strip(), 

197 "purpose": f"Provide detailed information about {parts[0].strip()}", 

198 } 

199 ) 

200 

201 # Check if the last section is source-related and remove it 

202 if structure: 

203 last_section = structure[-1] 

204 section_name_lower = last_section["name"].lower() 

205 source_keywords = [ 

206 "source", 

207 "citation", 

208 "reference", 

209 "bibliography", 

210 ] 

211 

212 # Only check the last section for source-related content 

213 if any( 

214 keyword in section_name_lower for keyword in source_keywords 

215 ): 

216 logger.info( 

217 f"Removed source-related last section: {last_section['name']}" 

218 ) 

219 structure = structure[:-1] 

220 

221 return structure 

222 

223 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str: 

224 """Truncate text at a sentence boundary to preserve readability. 

225 

226 Attempts to cut at the last sentence-ending punctuation (.!?) before 

227 the limit. If no suitable boundary is found within 80% of the limit, 

228 falls back to hard truncation. 

229 

230 Args: 

231 text: The text to truncate 

232 max_chars: Maximum characters allowed 

233 

234 Returns: 

235 Truncated text with [...truncated] marker if truncation occurred 

236 """ 

237 if len(text) <= max_chars: 

238 return text 

239 

240 truncated = text[:max_chars] 

241 

242 # Look for sentence boundaries (. ! ?) followed by space or newline 

243 # Search backwards from the end for the last complete sentence 

244 last_sentence_end = -1 

245 for i in range(len(truncated) - 1, -1, -1): 

246 if truncated[i] in ".!?" and ( 

247 i + 1 >= len(truncated) or truncated[i + 1] in " \n" 

248 ): 

249 last_sentence_end = i + 1 

250 break 

251 

252 # Only use sentence boundary if it preserves at least 80% of content 

253 min_acceptable = int(max_chars * 0.8) 

254 if last_sentence_end > min_acceptable: 

255 return truncated[:last_sentence_end] + "\n[...truncated]" 

256 

257 # Fallback to hard truncation 

258 return truncated + "\n[...truncated]" 

259 

260 def _build_previous_context(self, accumulated_findings: List[str]) -> str: 

261 """Build context block from previously generated sections. 

262 

263 Creates a formatted context block containing content from the last 

264 N sections (defined by self.max_context_sections) with explicit instructions 

265 not to repeat this content. Context is truncated if it exceeds 

266 self.max_context_chars to stay safe for smaller local models. 

267 

268 Args: 

269 accumulated_findings: List of previously generated section content, 

270 each formatted as "[Section > Subsection]\\n{content}" 

271 

272 Returns: 

273 Formatted context block with delimiters, or empty string if no 

274 previous findings exist 

275 """ 

276 if not accumulated_findings: 

277 return "" 

278 

279 recent_findings = accumulated_findings[-self.max_context_sections :] 

280 previous_context = "\n\n---\n\n".join(recent_findings) 

281 

282 # Truncate at sentence boundary if too long 

283 if len(previous_context) > self.max_context_chars: 

284 previous_context = self._truncate_at_sentence_boundary( 

285 previous_context, self.max_context_chars 

286 ) 

287 

288 return ( 

289 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n" 

290 f"{previous_context}\n" 

291 f"=== END OF PREVIOUS CONTENT ===\n\n" 

292 f"CRITICAL: The above content has already been written. Do NOT repeat " 

293 f"these points, examples, or explanations. Focus on NEW information " 

294 f"not covered above.\n" 

295 ) 

296 

    def _research_and_generate_sections(
        self,
        initial_findings: Dict,
        structure: List[Dict],
        query: str,
        progress_callback=None,
    ) -> Dict[str, str]:
        """Research and generate content for each section in one step.

        This method processes sections sequentially, accumulating generated
        content as it goes. For each new section/subsection, it passes context
        from the last few previously generated sections to help the LLM avoid
        repetition.

        The context accumulation mechanism:
        - Tracks all generated content in accumulated_findings list
        - Before generating each section, builds context from recent findings
        - Uses self.max_context_sections (configurable, default: 3) to limit context size
        - Truncates context to self.max_context_chars (configurable, default: 4000) for safety
        - Includes explicit "DO NOT REPEAT" instructions with actual content

        Args:
            initial_findings: Results from initial research phase, may contain
                questions_by_iteration to preserve search continuity
            structure: List of section definitions, each with name and subsections
            query: Original user query for context
            progress_callback: Optional callable(message, percent, metadata)
                invoked before each subsection search; percent spans 10-90.

        Returns:
            Dict mapping section names to their generated markdown content
        """
        sections: Dict[str, str] = {}

        # Accumulate content from previous sections to avoid repetition
        accumulated_findings: List[str] = []

        # Count total subsections for progress tracking
        # (sections with no declared subsections count as 1, since one
        # synthetic subsection is created for them below).
        total_subsections = sum(
            max(len(section.get("subsections", [])), 1) for section in structure
        )
        completed_subsections = 0

        # Preserve questions from initial research to avoid repetition
        # This follows the same pattern as citation tracking (all_links_of_system)
        existing_questions = initial_findings.get("questions_by_iteration", {})
        if existing_questions:
            # Set questions on both search system and its strategy
            if hasattr(self.search_system, "questions_by_iteration"):
                self.search_system.questions_by_iteration = (
                    existing_questions.copy()
                )

            # More importantly, set it on the strategy which actually uses it
            if hasattr(self.search_system, "strategy") and hasattr(
                self.search_system.strategy, "questions_by_iteration"
            ):
                self.search_system.strategy.questions_by_iteration = (
                    existing_questions.copy()
                )
                logger.info(
                    f"Initialized strategy with {len(existing_questions)} iterations of previous questions"
                )

        for section in structure:
            logger.info(f"Processing section: {section['name']}")
            section_content = []

            section_content.append(f"# {section['name']}\n")

            # If section has no subsections, create one from the section itself
            # NOTE: this mutates the section dict in place, so the synthetic
            # subsection is also visible to the final TOC formatting.
            if not section["subsections"]:
                # Parse section name for purpose
                if "|" in section["name"]:
                    parts = section["name"].split("|", 1)
                    section["subsections"] = [
                        {"name": parts[0].strip(), "purpose": parts[1].strip()}
                    ]
                else:
                    # No purpose provided - use section name as subsection
                    section["subsections"] = [
                        {
                            "name": section["name"],
                            "purpose": f"Provide comprehensive content for {section['name']}",
                        }
                    ]

            # Process each subsection by directly researching it
            for subsection in section["subsections"]:
                # Only add subsection header if there are multiple subsections
                if len(section["subsections"]) > 1:
                    section_content.append(f"## {subsection['name']}\n")
                    section_content.append(f"_{subsection['purpose']}_\n\n")

                # Get other subsections in this section for context
                other_subsections = [
                    f"- {s['name']}: {s['purpose']}"
                    for s in section["subsections"]
                    if s["name"] != subsection["name"]
                ]
                other_subsections_text = (
                    "\n".join(other_subsections)
                    if other_subsections
                    else "None"
                )

                # Get all other sections for broader context
                other_sections = [
                    f"- {s['name']}"
                    for s in structure
                    if s["name"] != section["name"]
                ]
                other_sections_text = (
                    "\n".join(other_sections) if other_sections else "None"
                )

                # Check if this is actually a section-level content (only one subsection, likely auto-created)
                is_section_level = len(section["subsections"]) == 1

                # Build context from previously generated sections to avoid repetition
                previous_context_section = self._build_previous_context(
                    accumulated_findings
                )

                # Generate appropriate search query
                if is_section_level:
                    # Section-level prompt - more comprehensive
                    subsection_query = (
                        f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. "
                        f"Section purpose: {subsection['purpose']} "
                        f"\n"
                        f"Other sections in the report:\n{other_sections_text}\n"
                        f"{previous_context_section}"
                        f"This is a standalone section requiring comprehensive coverage of its topic. "
                        f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. "
                        f"Include unique insights, specific examples, and concrete data. "
                        f"Use tables to organize information where applicable. "
                        f"For conclusion sections: synthesize key findings and provide forward-looking insights. "
                        f"Build upon the research findings from earlier sections to create a cohesive narrative."
                    )
                else:
                    # Subsection-level prompt - more focused
                    subsection_query = (
                        f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. "
                        f"This subsection's purpose: {subsection['purpose']} "
                        f"Part of section: '{section['name']}' "
                        f"\n"
                        f"Other sections in the report:\n{other_sections_text}\n"
                        f"\n"
                        f"Other subsections in this section will cover:\n{other_subsections_text}\n"
                        f"{previous_context_section}"
                        f"Focus ONLY on information specific to your subsection's purpose. "
                        f"Include unique details, specific examples, and concrete data. "
                        f"Use tables to organize information where applicable. "
                        f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. "
                        f"Previous research exists - find specific angles for this subsection."
                    )

                logger.info(
                    f"Researching subsection: {subsection['name']} with query: {subsection_query}"
                )

                # Report progress and check for cancellation
                if progress_callback:
                    # Map subsection completion onto the 10-90% band; 0-10%
                    # and 90-100% are used by generate_report's own phases.
                    pct = int(
                        10
                        + (completed_subsections / max(total_subsections, 1))
                        * 80
                    )
                    progress_callback(
                        f"Researching: {section['name']} > {subsection['name']}",
                        pct,
                        {
                            "phase": "report_section_research",
                            "subsection": subsection["name"],
                        },
                    )

                # Fix iteration override: modify strategy's settings_snapshot
                # which is read dynamically via get_setting()
                strategy = self.search_system.strategy
                original_iterations = strategy.settings_snapshot.get(
                    "search.iterations"
                )
                # Remember whether the key existed at all so the restore in
                # the finally block can distinguish "was None" from "absent".
                had_iterations_key = (
                    "search.iterations" in strategy.settings_snapshot
                )
                strategy.settings_snapshot["search.iterations"] = 1
                # Belt-and-suspenders: also override max_iterations for
                # strategies that cache it at __init__ time
                original_max_iter = getattr(strategy, "max_iterations", None)
                strategy.max_iterations = 1

                try:
                    # Perform search for this subsection
                    subsection_results = self.search_system.analyze_topic(
                        subsection_query
                    )
                finally:
                    # Restore original iteration settings
                    if had_iterations_key:
                        strategy.settings_snapshot["search.iterations"] = (
                            original_iterations
                        )
                    else:
                        strategy.settings_snapshot.pop(
                            "search.iterations", None
                        )
                    if original_max_iter is not None:
                        strategy.max_iterations = original_max_iter

                completed_subsections += 1

                # Add the researched content for this subsection
                if subsection_results.get("current_knowledge"):
                    generated_content = subsection_results["current_knowledge"]
                    section_content.append(generated_content)
                    # Accumulate for context in subsequent sections
                    accumulated_findings.append(
                        f"[{section['name']} > {subsection['name']}]\n{generated_content}"
                    )
                else:
                    section_content.append(
                        "*Limited information was found for this subsection.*\n"
                    )

                section_content.append("\n\n")

            # Combine all content for this section
            sections[section["name"]] = "\n".join(section_content)

        return sections

527 

528 def _generate_sections( 

529 self, 

530 initial_findings: Dict, 

531 _section_research: Dict[str, List[Dict]], 

532 structure: List[Dict], 

533 query: str, 

534 ) -> Dict[str, str]: 

535 """ 

536 This method is kept for compatibility but no longer used. 

537 The functionality has been moved to _research_and_generate_sections. 

538 """ 

539 return {} 

540 

541 def _format_final_report( 

542 self, 

543 sections: Dict[str, str], 

544 structure: List[Dict], 

545 query: str, 

546 ) -> Dict: 

547 """Format the final report with table of contents and sections.""" 

548 # Generate TOC 

549 toc = ["# Table of Contents\n"] 

550 for i, section in enumerate(structure, 1): 

551 toc.append(f"{i}. **{section['name']}**") 

552 for j, subsection in enumerate(section["subsections"], 1): 

553 toc.append( 

554 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_" 

555 ) 

556 

557 # Combine TOC and sections 

558 report_parts = ["\n".join(toc), ""] 

559 

560 # Add a summary of the research 

561 report_parts.append("# Research Summary") 

562 report_parts.append( 

563 "This report was researched using an advanced search system." 

564 ) 

565 report_parts.append( 

566 "Research included targeted searches for each section and subsection." 

567 ) 

568 report_parts.append("\n---\n") 

569 

570 # Add each section's content 

571 for section in structure: 

572 if section["name"] in sections: 

573 report_parts.append(sections[section["name"]]) 

574 report_parts.append("") 

575 

576 # Format links from search system 

577 # Get utilities module dynamically to avoid circular imports 

578 utilities = importlib.import_module("local_deep_research.utilities") 

579 formatted_all_links = ( 

580 utilities.search_utilities.format_links_to_markdown( 

581 all_links=self.search_system.all_links_of_system 

582 ) 

583 ) 

584 

585 # Create final report with all parts 

586 final_report_content = "\n\n".join(report_parts) 

587 final_report_content = ( 

588 final_report_content + "\n\n## Sources\n\n" + formatted_all_links 

589 ) 

590 

591 # Create metadata dictionary 

592 metadata = { 

593 "generated_at": datetime.now(UTC).isoformat(), 

594 "initial_sources": len(self.search_system.all_links_of_system), 

595 "sections_researched": len(structure), 

596 "searches_per_section": self.searches_per_section, 

597 "query": query, 

598 } 

599 

600 # Return both content and metadata 

601 return {"content": final_report_content, "metadata": metadata} 

602 

603 def _generate_error_report(self, query: str, error_msg: str) -> str: 

604 return f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"