Coverage for src/local_deep_research/report_generator.py: 98%

180 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import importlib 

2from typing import Any, Dict, List, Optional 

3from datetime import datetime, UTC 

4 

5from langchain_core.language_models import BaseChatModel 

6from loguru import logger 

7 

8# Fix circular import by importing directly from source modules 

9from .config.llm_config import get_llm 

10from .config.thread_settings import get_setting_from_snapshot 

11from .search_system import AdvancedSearchSystem 

12from .utilities.json_utils import get_llm_response_text 

13 

# Fallback limits for the "previously written content" context that is fed
# into each section prompt to discourage repetition. They apply only when the
# corresponding values are not available from settings.

# How many of the most recent sections are replayed as context.
DEFAULT_MAX_CONTEXT_SECTIONS = 3

# Character budget for that context (kept small so it is safe for smaller
# local models).
DEFAULT_MAX_CONTEXT_CHARS = 4000

22 

23 

def get_report_generator(search_system=None):
    """Create a report generator that uses its default settings.

    Args:
        search_system: Optional existing AdvancedSearchSystem to reuse. It is
            forwarded unchanged to the generator's constructor.

    Returns:
        A new IntegratedReportGenerator instance.
    """
    generator = IntegratedReportGenerator(search_system=search_system)
    return generator

31 

32 

33class IntegratedReportGenerator: 

34 def __init__( 

35 self, 

36 searches_per_section: int = 2, 

37 search_system=None, 

38 llm: BaseChatModel | None = None, 

39 settings_snapshot: Optional[Dict] = None, 

40 ): 

41 """ 

42 Args: 

43 searches_per_section: Number of searches to perform for each 

44 section in the report. 

45 search_system: Custom search system to use, otherwise just uses 

46 the default. 

47 llm: Custom LLM to use. Required if search_system is not provided. 

48 settings_snapshot: Optional settings snapshot for configurable values. 

49 

50 """ 

51 # If search_system is provided, use its LLM; otherwise use the provided LLM 

52 self._owns_llm = False 

53 if search_system: 

54 self.search_system = search_system 

55 self.model = llm or search_system.model 

56 elif llm: 

57 self.model = llm 

58 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg] 

59 else: 

60 # Fallback for backwards compatibility - will only work with auth 

61 self._owns_llm = True 

62 self.model = get_llm() 

63 self.search_system = AdvancedSearchSystem(llm=self.model) # type: ignore[call-arg] 

64 

65 self.searches_per_section = ( 

66 searches_per_section # Control search depth per section 

67 ) 

68 

69 # Load context settings from snapshot or use defaults 

70 self.max_context_sections = get_setting_from_snapshot( 

71 "report.max_context_sections", 

72 default=DEFAULT_MAX_CONTEXT_SECTIONS, 

73 settings_snapshot=settings_snapshot, 

74 ) 

75 self.max_context_chars = get_setting_from_snapshot( 

76 "report.max_context_chars", 

77 default=DEFAULT_MAX_CONTEXT_CHARS, 

78 settings_snapshot=settings_snapshot, 

79 ) 

80 

81 def close(self) -> None: 

82 """Close the LLM client if this instance created it.""" 

83 from .utilities.resource_utils import safe_close 

84 

85 if self._owns_llm: 

86 safe_close(self.model, "report generator LLM") 

87 

88 def generate_report( 

89 self, 

90 initial_findings: Dict, 

91 query: str, 

92 progress_callback=None, 

93 ) -> Dict: 

94 """Generate a complete research report with section-specific research. 

95 

96 Args: 

97 initial_findings: Results from initial research phase. 

98 query: Original user query. 

99 progress_callback: Optional callable(message, progress_percent, metadata) 

100 for reporting progress (0-100%) and checking cancellation. 

101 """ 

102 

103 # Step 1: Determine structure 

104 if progress_callback: 

105 progress_callback( 

106 "Determining report structure", 

107 0, 

108 {"phase": "report_structure"}, 

109 ) 

110 structure = self._determine_report_structure(initial_findings, query) 

111 

112 # Step 2: Research and generate content for each section in one step 

113 sections = self._research_and_generate_sections( 

114 initial_findings, 

115 structure, 

116 query, 

117 progress_callback=progress_callback, 

118 ) 

119 

120 # Step 3: Format final report 

121 if progress_callback: 

122 progress_callback( 

123 "Formatting final report", 

124 90, 

125 {"phase": "report_formatting"}, 

126 ) 

127 report = self._format_final_report(sections, structure, query) 

128 

129 if progress_callback: 

130 progress_callback( 

131 "Report complete", 100, {"phase": "report_complete"} 

132 ) 

133 

134 return report 

135 

136 def _determine_report_structure( 

137 self, findings: Dict, query: str 

138 ) -> List[Dict]: 

139 """Analyze content and determine optimal report structure.""" 

140 combined_content = findings["current_knowledge"] 

141 prompt = f""" 

142 Analyze this research content about: {query} 

143 

144 Content Summary: 

145 {combined_content[:1000]}... [truncated] 

146 

147 Determine the most appropriate report structure by: 

148 1. Analyzing the type of content (technical, business, academic, etc.) 

149 2. Identifying main themes and logical groupings 

150 3. Considering the depth and breadth of the research 

151 

152 Return a table of contents structure in this exact format: 

153 STRUCTURE 

154 1. [Section Name] 

155 - [Subsection] | [purpose] 

156 2. [Section Name] 

157 - [Subsection] | [purpose] 

158 ... 

159 END_STRUCTURE 

160 

161 Make the structure specific to the content, not generic. 

162 Each subsection must include its purpose after the | symbol. 

163 DO NOT include sections about sources, citations, references, or methodology. 

164 """ 

165 

166 response = get_llm_response_text(self.model.invoke(prompt)) 

167 

168 # Parse the structure 

169 structure: List[Dict[str, Any]] = [] 

170 current_section: Optional[Dict[str, Any]] = None 

171 

172 for line in response.split("\n"): 

173 if line.strip() in ["STRUCTURE", "END_STRUCTURE"]: 

174 continue 

175 

176 if line.strip().startswith(tuple("123456789")): 

177 # Main section — require a dot-delimited name (e.g. "1. Intro"). 

178 parts = line.split(".", 1) 

179 if len(parts) < 2 or not parts[1].strip(): 

180 continue 

181 section_name = parts[1].strip() 

182 current_section = {"name": section_name, "subsections": []} 

183 structure.append(current_section) 

184 elif line.strip().startswith("-") and current_section: 

185 # Subsection with or without purpose 

186 parts = line.strip("- ").split( 

187 "|", 1 

188 ) # Only split on first pipe 

189 if len(parts) == 2: 

190 current_section["subsections"].append( 

191 {"name": parts[0].strip(), "purpose": parts[1].strip()} 

192 ) 

193 elif len(parts) == 1 and parts[0].strip(): 193 ↛ 172line 193 didn't jump to line 172 because the condition on line 193 was always true

194 # Subsection without purpose - add default 

195 current_section["subsections"].append( 

196 { 

197 "name": parts[0].strip(), 

198 "purpose": f"Provide detailed information about {parts[0].strip()}", 

199 } 

200 ) 

201 

202 # Check if the last section is source-related and remove it 

203 if structure: 

204 last_section = structure[-1] 

205 section_name_lower = last_section["name"].lower() 

206 source_keywords = [ 

207 "source", 

208 "citation", 

209 "reference", 

210 "bibliography", 

211 ] 

212 

213 # Only check the last section for source-related content 

214 if any( 

215 keyword in section_name_lower for keyword in source_keywords 

216 ): 

217 logger.info( 

218 f"Removed source-related last section: {last_section['name']}" 

219 ) 

220 structure = structure[:-1] 

221 

222 return structure 

223 

224 def _truncate_at_sentence_boundary(self, text: str, max_chars: int) -> str: 

225 """Truncate text at a sentence boundary to preserve readability. 

226 

227 Attempts to cut at the last sentence-ending punctuation (.!?) before 

228 the limit. If no suitable boundary is found within 80% of the limit, 

229 falls back to hard truncation. 

230 

231 Args: 

232 text: The text to truncate 

233 max_chars: Maximum characters allowed 

234 

235 Returns: 

236 Truncated text with [...truncated] marker if truncation occurred 

237 """ 

238 if len(text) <= max_chars: 

239 return text 

240 

241 truncated = text[:max_chars] 

242 

243 # Look for sentence boundaries (. ! ?) followed by space or newline 

244 # Search backwards from the end for the last complete sentence 

245 last_sentence_end = -1 

246 for i in range(len(truncated) - 1, -1, -1): 

247 if truncated[i] in ".!?" and ( 

248 i + 1 >= len(truncated) or truncated[i + 1] in " \n" 

249 ): 

250 last_sentence_end = i + 1 

251 break 

252 

253 # Only use sentence boundary if it preserves at least 80% of content 

254 min_acceptable = int(max_chars * 0.8) 

255 if last_sentence_end > min_acceptable: 

256 return truncated[:last_sentence_end] + "\n[...truncated]" 

257 

258 # Fallback to hard truncation 

259 return truncated + "\n[...truncated]" 

260 

261 def _build_previous_context(self, accumulated_findings: List[str]) -> str: 

262 """Build context block from previously generated sections. 

263 

264 Creates a formatted context block containing content from the last 

265 N sections (defined by self.max_context_sections) with explicit instructions 

266 not to repeat this content. Context is truncated if it exceeds 

267 self.max_context_chars to stay safe for smaller local models. 

268 

269 Args: 

270 accumulated_findings: List of previously generated section content, 

271 each formatted as "[Section > Subsection]\\n{content}" 

272 

273 Returns: 

274 Formatted context block with delimiters, or empty string if no 

275 previous findings exist 

276 """ 

277 if not accumulated_findings: 

278 return "" 

279 

280 recent_findings = accumulated_findings[-self.max_context_sections :] 

281 previous_context = "\n\n---\n\n".join(recent_findings) 

282 

283 # Truncate at sentence boundary if too long 

284 if len(previous_context) > self.max_context_chars: 

285 previous_context = self._truncate_at_sentence_boundary( 

286 previous_context, self.max_context_chars 

287 ) 

288 

289 return ( 

290 f"\n\n=== CONTENT ALREADY WRITTEN (DO NOT REPEAT) ===\n" 

291 f"{previous_context}\n" 

292 f"=== END OF PREVIOUS CONTENT ===\n\n" 

293 f"CRITICAL: The above content has already been written. Do NOT repeat " 

294 f"these points, examples, or explanations. Focus on NEW information " 

295 f"not covered above.\n" 

296 ) 

297 

298 def _research_and_generate_sections( 

299 self, 

300 initial_findings: Dict, 

301 structure: List[Dict], 

302 query: str, 

303 progress_callback=None, 

304 ) -> Dict[str, str]: 

305 """Research and generate content for each section in one step. 

306 

307 This method processes sections sequentially, accumulating generated 

308 content as it goes. For each new section/subsection, it passes context 

309 from the last few previously generated sections to help the LLM avoid 

310 repetition. 

311 

312 The context accumulation mechanism: 

313 - Tracks all generated content in accumulated_findings list 

314 - Before generating each section, builds context from recent findings 

315 - Uses self.max_context_sections (configurable, default: 3) to limit context size 

316 - Truncates context to self.max_context_chars (configurable, default: 4000) for safety 

317 - Includes explicit "DO NOT REPEAT" instructions with actual content 

318 

319 Args: 

320 initial_findings: Results from initial research phase, may contain 

321 questions_by_iteration to preserve search continuity 

322 structure: List of section definitions, each with name and subsections 

323 query: Original user query for context 

324 

325 Returns: 

326 Dict mapping section names to their generated markdown content 

327 """ 

328 sections = {} 

329 

330 # Accumulate content from previous sections to avoid repetition 

331 accumulated_findings: List[str] = [] 

332 

333 # Count total subsections for progress tracking 

334 total_subsections = sum( 

335 max(len(section.get("subsections", [])), 1) for section in structure 

336 ) 

337 completed_subsections = 0 

338 

339 # Preserve questions from initial research to avoid repetition 

340 # This follows the same pattern as citation tracking (all_links_of_system) 

341 existing_questions = initial_findings.get("questions_by_iteration", {}) 

342 if existing_questions: 

343 # Set questions on both search system and its strategy 

344 if hasattr(self.search_system, "questions_by_iteration"): 344 ↛ 350line 344 didn't jump to line 350 because the condition on line 344 was always true

345 self.search_system.questions_by_iteration = ( 

346 existing_questions.copy() 

347 ) 

348 

349 # More importantly, set it on the strategy which actually uses it 

350 if hasattr(self.search_system, "strategy") and hasattr( 350 ↛ 360line 350 didn't jump to line 360 because the condition on line 350 was always true

351 self.search_system.strategy, "questions_by_iteration" 

352 ): 

353 self.search_system.strategy.questions_by_iteration = ( 

354 existing_questions.copy() 

355 ) 

356 logger.info( 

357 f"Initialized strategy with {len(existing_questions)} iterations of previous questions" 

358 ) 

359 

360 for i, section in enumerate(structure, 1): 

361 logger.info(f"Processing section: {section['name']}") 

362 section_content = [] 

363 

364 section_content.append(f"# {i}. {section['name']}\n") 

365 

366 # If section has no subsections, create one from the section itself 

367 if not section["subsections"]: 

368 # Parse section name for purpose 

369 if "|" in section["name"]: 

370 parts = section["name"].split("|", 1) 

371 section["subsections"] = [ 

372 {"name": parts[0].strip(), "purpose": parts[1].strip()} 

373 ] 

374 else: 

375 # No purpose provided - use section name as subsection 

376 section["subsections"] = [ 

377 { 

378 "name": section["name"], 

379 "purpose": f"Provide comprehensive content for {section['name']}", 

380 } 

381 ] 

382 

383 # Process each subsection by directly researching it 

384 for j, subsection in enumerate(section["subsections"], 1): 

385 # Only add subsection header if there are multiple subsections 

386 if len(section["subsections"]) > 1: 

387 section_content.append(f"## {i}.{j} {subsection['name']}\n") 

388 section_content.append(f"_{subsection['purpose']}_\n\n") 

389 

390 # Get other subsections in this section for context 

391 other_subsections = [ 

392 f"- {s['name']}: {s['purpose']}" 

393 for s in section["subsections"] 

394 if s["name"] != subsection["name"] 

395 ] 

396 other_subsections_text = ( 

397 "\n".join(other_subsections) 

398 if other_subsections 

399 else "None" 

400 ) 

401 

402 # Get all other sections for broader context 

403 other_sections = [ 

404 f"- {s['name']}" 

405 for s in structure 

406 if s["name"] != section["name"] 

407 ] 

408 other_sections_text = ( 

409 "\n".join(other_sections) if other_sections else "None" 

410 ) 

411 

412 # Check if this is actually a section-level content (only one subsection, likely auto-created) 

413 is_section_level = len(section["subsections"]) == 1 

414 

415 # Build context from previously generated sections to avoid repetition 

416 previous_context_section = self._build_previous_context( 

417 accumulated_findings 

418 ) 

419 

420 # Generate appropriate search query 

421 if is_section_level: 

422 # Section-level prompt - more comprehensive 

423 subsection_query = ( 

424 f"Research task: Create comprehensive content for the '{subsection['name']}' section in a report about '{query}'. " 

425 f"Section purpose: {subsection['purpose']} " 

426 f"\n" 

427 f"Other sections in the report:\n{other_sections_text}\n" 

428 f"{previous_context_section}" 

429 f"This is a standalone section requiring comprehensive coverage of its topic. " 

430 f"Provide a thorough exploration that may include synthesis of information from previous sections where relevant. " 

431 f"Include unique insights, specific examples, and concrete data. " 

432 f"Use tables to organize information where applicable. " 

433 f"For conclusion sections: synthesize key findings and provide forward-looking insights. " 

434 f"Build upon the research findings from earlier sections to create a cohesive narrative." 

435 ) 

436 else: 

437 # Subsection-level prompt - more focused 

438 subsection_query = ( 

439 f"Research task: Create content for subsection '{subsection['name']}' in a report about '{query}'. " 

440 f"This subsection's purpose: {subsection['purpose']} " 

441 f"Part of section: '{section['name']}' " 

442 f"\n" 

443 f"Other sections in the report:\n{other_sections_text}\n" 

444 f"\n" 

445 f"Other subsections in this section will cover:\n{other_subsections_text}\n" 

446 f"{previous_context_section}" 

447 f"Focus ONLY on information specific to your subsection's purpose. " 

448 f"Include unique details, specific examples, and concrete data. " 

449 f"Use tables to organize information where applicable. " 

450 f"IMPORTANT: Avoid repeating information that would logically be covered in other sections - focus on what makes this subsection unique. " 

451 f"Previous research exists - find specific angles for this subsection." 

452 ) 

453 

454 logger.info( 

455 f"Researching subsection: {subsection['name']} with query: {subsection_query}" 

456 ) 

457 

458 # Report progress and check for cancellation 

459 if progress_callback: 

460 pct = int( 

461 10 

462 + (completed_subsections / max(total_subsections, 1)) 

463 * 80 

464 ) 

465 progress_callback( 

466 f"Researching: {section['name']} > {subsection['name']}", 

467 pct, 

468 { 

469 "phase": "report_section_research", 

470 "subsection": subsection["name"], 

471 }, 

472 ) 

473 

474 # Fix iteration override: modify strategy's settings_snapshot 

475 # which is read dynamically via get_setting() 

476 strategy = self.search_system.strategy 

477 original_iterations = strategy.settings_snapshot.get( 

478 "search.iterations" 

479 ) 

480 had_iterations_key = ( 

481 "search.iterations" in strategy.settings_snapshot 

482 ) 

483 strategy.settings_snapshot["search.iterations"] = 1 

484 # Belt-and-suspenders: also override max_iterations for 

485 # strategies that cache it at __init__ time 

486 original_max_iter = getattr(strategy, "max_iterations", None) 

487 strategy.max_iterations = 1 

488 

489 try: 

490 # Perform search for this subsection 

491 subsection_results = self.search_system.analyze_topic( 

492 subsection_query 

493 ) 

494 finally: 

495 # Restore original iteration settings 

496 if had_iterations_key: 

497 strategy.settings_snapshot["search.iterations"] = ( 

498 original_iterations 

499 ) 

500 else: 

501 strategy.settings_snapshot.pop( 

502 "search.iterations", None 

503 ) 

504 if original_max_iter is not None: 504 ↛ 507line 504 didn't jump to line 507 because the condition on line 504 was always true

505 strategy.max_iterations = original_max_iter 

506 

507 completed_subsections += 1 

508 

509 # Add the researched content for this subsection 

510 if subsection_results.get("current_knowledge"): 

511 generated_content = subsection_results["current_knowledge"] 

512 section_content.append(generated_content) 

513 # Accumulate for context in subsequent sections 

514 accumulated_findings.append( 

515 f"[{section['name']} > {subsection['name']}]\n{generated_content}" 

516 ) 

517 else: 

518 section_content.append( 

519 "*Limited information was found for this subsection.*\n" 

520 ) 

521 

522 section_content.append("\n\n") 

523 

524 # Combine all content for this section 

525 sections[section["name"]] = "\n".join(section_content) 

526 

527 return sections 

528 

529 def _generate_sections( 

530 self, 

531 initial_findings: Dict, 

532 _section_research: Dict[str, List[Dict]], 

533 structure: List[Dict], 

534 query: str, 

535 ) -> Dict[str, str]: 

536 """ 

537 This method is kept for compatibility but no longer used. 

538 The functionality has been moved to _research_and_generate_sections. 

539 """ 

540 return {} 

541 

542 def _format_final_report( 

543 self, 

544 sections: Dict[str, str], 

545 structure: List[Dict], 

546 query: str, 

547 ) -> Dict: 

548 """Format the final report with table of contents and sections.""" 

549 # Generate TOC 

550 toc = ["# Table of Contents\n"] 

551 for i, section in enumerate(structure, 1): 

552 toc.append(f"{i}. **{section['name']}**") 

553 if len(section["subsections"]) > 1: 

554 for j, subsection in enumerate(section["subsections"], 1): 

555 toc.append( 

556 f" {i}.{j} {subsection['name']} | _{subsection['purpose']}_" 

557 ) 

558 

559 # Combine TOC and sections 

560 report_parts = ["\n".join(toc), ""] 

561 

562 # Add a summary of the research 

563 report_parts.append("# Research Summary") 

564 report_parts.append( 

565 "This report was researched using an advanced search system." 

566 ) 

567 report_parts.append( 

568 "Research included targeted searches for each section and subsection." 

569 ) 

570 report_parts.append("\n---\n") 

571 

572 # Add each section's content 

573 for section in structure: 

574 if section["name"] in sections: 

575 report_parts.append(sections[section["name"]]) 

576 report_parts.append("") 

577 

578 # Format links from search system 

579 # Get utilities module dynamically to avoid circular imports 

580 utilities = importlib.import_module("local_deep_research.utilities") 

581 formatted_all_links = ( 

582 utilities.search_utilities.format_links_to_markdown( 

583 all_links=self.search_system.all_links_of_system 

584 ) 

585 ) 

586 

587 # Create final report with all parts. The Sources tail is 

588 # kept here so in-memory consumers (MCP `generate_report`, 

589 # programmatic API) get the full assembled blob unchanged. 

590 # The DB save site (research_service.py) strips this Sources 

591 # section via format_document_split before persisting, so the 

592 # answer-only invariant on report_content still holds. 

593 final_report_content = "\n\n".join(report_parts) 

594 # Explicit "\n\n" separator: downstream regex consumers 

595 # (_SOURCES_SECTION_PATTERNS in text_optimization/citation_formatter.py 

596 # and _LEGACY_SOURCES_RE in web/services/report_assembly_service.py) 

597 # use line-anchored `re.MULTILINE` matching. Today the trailing 

598 # newlines produced by `"\n\n".join` happen to keep `## Sources` 

599 # at the start of a line, but that is incidental; an explicit 

600 # separator preserves the invariant against future section 

601 # template changes. 

602 final_report_content += "\n\n## Sources\n\n" + formatted_all_links 

603 

604 # Create metadata dictionary 

605 metadata = { 

606 "generated_at": datetime.now(UTC).isoformat(), 

607 "initial_sources": len(self.search_system.all_links_of_system), 

608 "sections_researched": len(structure), 

609 "searches_per_section": self.searches_per_section, 

610 "query": query, 

611 } 

612 

613 # Return both content and metadata 

614 return {"content": final_report_content, "metadata": metadata} 

615 

616 def _generate_error_report(self, query: str, error_msg: str) -> str: 

617 return f"=== ERROR REPORT ===\nQuery: {query}\nError: {error_msg}"