Coverage for src/local_deep_research/web/services/research_service.py: 48%

676 statements

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

import hashlib
import json
import re
import threading
from datetime import datetime, UTC
from pathlib import Path

from flask import g, session
from loguru import logger

from ...config.llm_config import get_llm

# Output directory for research results
from ...config.paths import get_research_outputs_directory
from ...config.search_config import get_search
from ...database.models import ResearchHistory, ResearchStrategy
from ...database.session_context import get_user_db_session
from ...error_handling.report_generator import ErrorReportGenerator
from ...utilities.thread_context import set_search_context
from ...report_generator import IntegratedReportGenerator
from ...search_system import AdvancedSearchSystem
from ...text_optimization import CitationFormatter, CitationMode
from ...utilities.log_utils import log_for_research
from ...utilities.search_utilities import extract_links_from_search_results
from ...utilities.threading_utils import thread_context, thread_with_app_context
from ..models.database import calculate_duration
from .socket_service import SocketIOService

OUTPUT_DIR = get_research_outputs_directory()


def get_citation_formatter():
    """Get citation formatter with settings from thread context."""
    # Import here to avoid circular imports
    from ...config.search_config import get_setting_from_snapshot

    citation_format = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    mode_map = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    mode = mode_map.get(citation_format, CitationMode.NUMBER_HYPERLINKS)
    return CitationFormatter(mode=mode)
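
# Usage sketch (illustrative): the returned formatter is applied to a
# finished markdown document, as this module does further below:
#
#     formatter = get_citation_formatter()
#     formatted = formatter.format_document(markdown_text)
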

def export_report_to_memory(
    markdown_content: str, format: str, title: str = None
):
    """
    Export a markdown report to different formats in memory.

    Args:
        markdown_content: The markdown content to export
        format: Export format ('latex', 'quarto', 'ris', or 'pdf')
        title: Optional title for the document

    Returns:
        Tuple of (content_bytes, filename, mimetype)
    """
    if format == "pdf":
        # Use WeasyPrint for PDF generation
        from .pdf_service import get_pdf_service

        pdf_service = get_pdf_service()

        # Add title as H1 at the top if provided and not already present
        if title and not markdown_content.startswith(f"# {title}"):
            # Check if the content starts with any H1
            if not markdown_content.startswith("#"):
                markdown_content = f"# {title}\n\n{markdown_content}"

        # Pass the title if provided, but don't add duplicate content
        pdf_bytes = pdf_service.markdown_to_pdf(
            markdown_content,
            title=title,  # Use the title from the research record (for HTML metadata)
            metadata=None,  # Don't add extra metadata section
        )

        # Generate a filename based on title or use default
        safe_title = (
            re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
            if title
            else "research_report"
        )
        filename = f"{safe_title}.pdf"

        logger.info(f"Generated PDF in memory, size: {len(pdf_bytes)} bytes")
        return pdf_bytes, filename, "application/pdf"

    elif format == "latex":
        from ...text_optimization.citation_formatter import LaTeXExporter

        exporter = LaTeXExporter()
        exported_content = exporter.export_to_latex(markdown_content)

        safe_title = (
            re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
            if title
            else "research_report"
        )
        filename = f"{safe_title}.tex"

        logger.info("Generated LaTeX in memory")
        return exported_content.encode("utf-8"), filename, "text/plain"

    elif format == "quarto":
        import zipfile
        import io
        from ...text_optimization.citation_formatter import QuartoExporter

        exporter = QuartoExporter()
        # Extract title from markdown if not provided
        if not title:
            title_match = re.search(
                r"^#\s+(.+)$", markdown_content, re.MULTILINE
            )
            title = title_match.group(1) if title_match else "Research Report"
        exported_content = exporter.export_to_quarto(markdown_content, title)

        # Generate bibliography
        bib_content = exporter._generate_bibliography(markdown_content)

        safe_title = (
            re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
            if title
            else "research_report"
        )

        # Create a zip file in memory containing both files
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
            # Add the Quarto document
            zipf.writestr(f"{safe_title}.qmd", exported_content)
            # Add the bibliography file
            zipf.writestr("references.bib", bib_content)

        zip_bytes = zip_buffer.getvalue()
        filename = f"{safe_title}_quarto.zip"

        logger.info("Generated Quarto with bibliography in memory (zip)")
        return zip_bytes, filename, "application/zip"

    elif format == "ris":
        from ...text_optimization.citation_formatter import RISExporter

        exporter = RISExporter()
        exported_content = exporter.export_to_ris(markdown_content)

        safe_title = (
            re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
            if title
            else "research_report"
        )
        filename = f"{safe_title}.ris"

        logger.info("Generated RIS in memory")
        return exported_content.encode("utf-8"), filename, "text/plain"

    else:
        raise ValueError(f"Unsupported export format: {format}")
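
# Usage sketch (illustrative; `md` is any markdown report string and the
# file-writing step is a hypothetical way to consume the result):
#
#     content, filename, mimetype = export_report_to_memory(
#         md, "quarto", title="Example Report"
#     )
#     Path(filename).write_bytes(content)
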

def save_research_strategy(research_id, strategy_name, username=None):
    """
    Save the strategy used for a research to the database.

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            try:
                # Check if a strategy already exists for this research
                existing_strategy = (
                    session.query(ResearchStrategy)
                    .filter_by(research_id=research_id)
                    .first()
                )

                if existing_strategy:
                    # Update existing strategy
                    existing_strategy.strategy_name = strategy_name
                    logger.debug(
                        f"Updating existing strategy for research {research_id}"
                    )
                else:
                    # Create new strategy record
                    new_strategy = ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                    session.add(new_strategy)
                    logger.debug(
                        f"Creating new strategy record for research {research_id}"
                    )

                session.commit()
                logger.info(
                    f"Saved strategy '{strategy_name}' for research {research_id}"
                )
            finally:
                session.close()
    except Exception:
        logger.exception("Error saving research strategy")


def get_research_strategy(research_id, username=None):
    """
    Get the strategy used for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name or None if not found
    """
    try:
        with get_user_db_session(username) as session:
            try:
                strategy = (
                    session.query(ResearchStrategy)
                    .filter_by(research_id=research_id)
                    .first()
                )

                return strategy.strategy_name if strategy else None
            finally:
                session.close()
    except Exception:
        logger.exception("Error getting research strategy")
        return None
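
# Usage sketch (illustrative; "alice" and the strategy name are placeholder
# values):
#
#     save_research_strategy(research_id, "source-based", username="alice")
#     assert get_research_strategy(research_id, username="alice") == "source-based"
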

def start_research_process(
    research_id,
    query,
    mode,
    active_research,
    termination_flags,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research
    """
    # Pass the app context to the thread.
    run_research_callback = thread_with_app_context(run_research_callback)

    # Start research process in a background thread
    thread = threading.Thread(
        target=run_research_callback,
        args=(
            thread_context(),
            research_id,
            query,
            mode,
            active_research,
            termination_flags,
        ),
        kwargs=kwargs,
    )
    thread.daemon = True
    thread.start()

    active_research[research_id] = {
        "thread": thread,
        "progress": 0,
        "status": "in_progress",
        "log": [],
        "settings": kwargs,  # Store settings for reference
    }

    return thread
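
# Usage sketch (illustrative): wiring this module's own runner into a
# background thread; the query and username are placeholder values, and
# `username` is required by run_research_process:
#
#     active_research, termination_flags = {}, {}
#     thread = start_research_process(
#         research_id,
#         "example query",
#         "quick",
#         active_research,
#         termination_flags,
#         run_research_process,
#         username="alice",
#     )
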

def _generate_report_path(query: str) -> Path:
    """
    Generates a path for a new report file based on the query.

    Args:
        query: The query used for the report.

    Returns:
        The generated report path.

    """
    # Generate a unique filename that does not contain
    # non-alphanumeric characters.
    query_hash = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()[:10]
    return OUTPUT_DIR / (
        f"research_report_{query_hash}_{int(datetime.now(UTC).timestamp())}.md"
    )
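
# Worked example (illustrative): for query "example" the MD5 prefix is
# "1a79a4d60d", so a call at Unix time 1700000000 would return
# OUTPUT_DIR / "research_report_1a79a4d60d_1700000000.md".
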

@log_for_research
def run_research_process(
    research_id, query, mode, active_research, termination_flags, **kwargs
):
    """
    Run the research process in the background for a given research ID.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.)
            MUST include 'username' for database access
    """

    # Extract username - required for database access
    username = kwargs.get("username")
    logger.info(f"Research thread started with username: {username}")
    if not username:
        logger.error("No username provided to research thread")
        raise ValueError("Username is required for research process")
    try:
        # Check if this research has been terminated before we even start
        if termination_flags.get(research_id):
            logger.info(
                f"Research {research_id} was terminated before starting"
            )
            cleanup_research_resources(
                research_id, active_research, termination_flags, username
            )
            return

        logger.info(
            f"Starting research process for ID {research_id}, query: {query}"
        )

        # Extract key parameters
        model_provider = kwargs.get("model_provider")
        model = kwargs.get("model")
        custom_endpoint = kwargs.get("custom_endpoint")
        search_engine = kwargs.get("search_engine")
        max_results = kwargs.get("max_results")
        time_period = kwargs.get("time_period")
        iterations = kwargs.get("iterations")
        questions_per_iteration = kwargs.get("questions_per_iteration")
        strategy = kwargs.get(
            "strategy", "source-based"
        )  # Default to source-based
        settings_snapshot = kwargs.get(
            "settings_snapshot", {}
        )  # Complete settings snapshot

        # Log settings snapshot to debug
        from ...settings.logger import log_settings

        log_settings(settings_snapshot, "Settings snapshot received in thread")

        # Strategy should already be saved in the database before thread starts
        logger.info(f"Research strategy: {strategy}")

        # Log all parameters for debugging
        logger.info(
            f"Research parameters: provider={model_provider}, model={model}, "
            f"search_engine={search_engine}, max_results={max_results}, "
            f"time_period={time_period}, iterations={iterations}, "
            f"questions_per_iteration={questions_per_iteration}, "
            f"custom_endpoint={custom_endpoint}, strategy={strategy}"
        )

        # Set up the AI Context Manager
        output_dir = OUTPUT_DIR / f"research_{research_id}"
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create a settings context backed by the settings snapshot. Threads
        # never fall back to the database, which keeps the research
        # independent of the database during execution.

        class SettingsContext:
            def __init__(self, snapshot, username):
                self.snapshot = snapshot or {}
                self.username = username
                # Extract values from setting objects if needed
                self.values = {}
                for key, setting in self.snapshot.items():
                    if isinstance(setting, dict) and "value" in setting:
                        # It's a full setting object, extract the value
                        self.values[key] = setting["value"]
                    else:
                        # It's already just a value
                        self.values[key] = setting

            def get_setting(self, key, default=None):
                """Get setting from snapshot only - no database access in threads"""
                if key in self.values:
                    return self.values[key]
                # No fallback to database - threads must use snapshot only
                logger.debug(
                    f"Setting '{key}' not found in snapshot, using default"
                )
                return default

        settings_context = SettingsContext(settings_snapshot, username)

        # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var
        from ...settings.logger import log_settings

        log_settings(
            settings_context.values, "SettingsContext values extracted"
        )

        # Set the settings context for this thread
        from ...config.thread_settings import set_settings_context

        set_settings_context(settings_context)

        # Get user password if provided
        user_password = kwargs.get("user_password")

        # Create shared research context that can be updated during research
        shared_research_context = {
            "research_id": research_id,
            "research_query": query,
            "research_mode": mode,
            "research_phase": "init",
            "search_iteration": 0,
            "search_engines_planned": None,
            "search_engine_selected": search_engine,
            "username": username,  # Add username for queue operations
            "user_password": user_password,  # Add password for metrics access
        }

        # If this is a follow-up research, include the parent context
        if "research_context" in kwargs and kwargs["research_context"]:
            logger.info(
                f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings"
            )
            shared_research_context.update(kwargs["research_context"])

        # Do not log context keys as they may contain sensitive information
        logger.info(f"Created shared_research_context for user: {username}")

        # Set search context for search tracking
        set_search_context(shared_research_context)

        # Set up progress callback
        def progress_callback(message, progress_percent, metadata):
            # Frequent termination check
            if termination_flags.get(research_id):
                handle_termination(
                    research_id, active_research, termination_flags, username
                )
                raise Exception("Research was terminated by user")

            # Bind research_id to logger for this specific log
            bound_logger = logger.bind(research_id=research_id)
            bound_logger.log("MILESTONE", message)

            if "SEARCH_PLAN:" in message:
                engines = message.split("SEARCH_PLAN:")[1].strip()
                metadata["planned_engines"] = engines
                metadata["phase"] = "search_planning"  # Use existing phase
                # Update shared context for token tracking
                shared_research_context["search_engines_planned"] = engines
                shared_research_context["research_phase"] = "search_planning"

            if "ENGINE_SELECTED:" in message:
                engine = message.split("ENGINE_SELECTED:")[1].strip()
                metadata["selected_engine"] = engine
                metadata["phase"] = "search"  # Use existing 'search' phase
                # Update shared context for token tracking
                shared_research_context["search_engine_selected"] = engine
                shared_research_context["research_phase"] = "search"

            # Capture other research phases for better context tracking
            if metadata.get("phase"):
                shared_research_context["research_phase"] = metadata["phase"]

            # Update search iteration if available
            if "iteration" in metadata:
                shared_research_context["search_iteration"] = metadata[
                    "iteration"
                ]

            # Adjust progress based on research mode
            adjusted_progress = progress_percent
            if (
                mode == "detailed"
                and metadata.get("phase") == "output_generation"
            ):
                # For detailed mode, adjust the progress range for output generation
                adjusted_progress = min(80, progress_percent)
            elif (
                mode == "detailed"
                and metadata.get("phase") == "report_generation"
            ):
                # Scale the progress from 80% to 95% for the report generation phase
                if progress_percent is not None:
                    normalized = progress_percent / 100
                    adjusted_progress = 80 + (normalized * 15)
            elif (
                mode == "quick" and metadata.get("phase") == "output_generation"
            ):
                # For quick mode, ensure we're at least at 85% during output generation
                adjusted_progress = max(85, progress_percent)
                # Map any further progress within output_generation to 85-95% range
                if progress_percent is not None and progress_percent > 0:
                    normalized = progress_percent / 100
                    adjusted_progress = 85 + (normalized * 10)
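
            # Worked example (illustrative): in quick mode during
            # output_generation with progress_percent=50, the result is
            # 85 + (50 / 100) * 10 = 90.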

            # Don't let progress go backwards
            if research_id in active_research and adjusted_progress is not None:
                current_progress = active_research[research_id].get(
                    "progress", 0
                )
                adjusted_progress = max(current_progress, adjusted_progress)

            # Update active research record
            if research_id in active_research:
                if adjusted_progress is not None:
                    active_research[research_id]["progress"] = adjusted_progress

            # Queue the progress update to be processed in main thread
            if adjusted_progress is not None:
                from ..queue.processor_v2 import queue_processor

                if username:
                    queue_processor.queue_progress_update(
                        username, research_id, adjusted_progress
                    )
                else:
                    logger.warning(
                        f"Cannot queue progress update for research {research_id} - no username available"
                    )

            # Emit a socket event
            try:
                # Basic event data
                event_data = {"progress": adjusted_progress}

                SocketIOService().emit_to_subscribers(
                    "progress", research_id, event_data
                )
            except Exception:
                logger.exception("Socket emit error (non-critical)")

        # Function to check termination during long-running operations
        def check_termination():
            if termination_flags.get(research_id):
                handle_termination(
                    research_id, active_research, termination_flags, username
                )
                raise Exception(
                    "Research was terminated by user during long-running operation"
                )
            return False  # Not terminated
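
        # Usage sketch (illustrative): intended for periodic checks inside
        # long-running loops, e.g.
        #
        #     for step in steps:       # `steps` is a hypothetical iterable
        #         check_termination()
        #         process(step)        # `process` is a hypothetical helper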


        # Configure the system with the specified parameters
        use_llm = None
        if model or search_engine or model_provider:
            # Log that we're overriding system settings
            logger.info(
                f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}"
            )

        # Override LLM if model or model_provider specified
        if model or model_provider:
            try:
                # Get LLM with the overridden settings
                # Use the shared_research_context which includes username
                use_llm = get_llm(
                    model_name=model,
                    provider=model_provider,
                    openai_endpoint_url=custom_endpoint,
                    research_id=research_id,
                    research_context=shared_research_context,
                )

                logger.info(
                    f"Successfully set LLM to: provider={model_provider}, model={model}"
                )
            except Exception:
                logger.exception(
                    f"Error setting LLM provider={model_provider}, model={model}"
                )

        # Create search engine first if specified, to avoid default creation without username
        use_search = None
        if search_engine:
            try:
                # Create a new search object with these settings
                use_search = get_search(
                    search_tool=search_engine,
                    llm_instance=use_llm,
                    username=username,
                    settings_snapshot=settings_snapshot,
                )
                logger.info(
                    f"Successfully created search engine: {search_engine}"
                )
            except Exception:
                logger.exception(
                    f"Error creating search engine {search_engine}"
                )

        # Set the progress callback in the system
        system = AdvancedSearchSystem(
            llm=use_llm,
            search=use_search,
            strategy_name=strategy,
            max_iterations=iterations,
            questions_per_iteration=questions_per_iteration,
            username=username,
            settings_snapshot=settings_snapshot,
            research_id=research_id,
            research_context=shared_research_context,
        )
        system.set_progress_callback(progress_callback)

        # Run the search
        progress_callback("Starting research process", 5, {"phase": "init"})

        try:
            results = system.analyze_topic(query)
            if mode == "quick":
                progress_callback(
                    "Search complete, preparing to generate summary...",
                    85,
                    {"phase": "output_generation"},
                )
            else:
                progress_callback(
                    "Search complete, generating output",
                    80,
                    {"phase": "output_generation"},
                )
        except Exception as search_error:
            # Better handling of specific search errors
            error_message = str(search_error)
            error_type = "unknown"

            # Extract error details for common issues
            if "status code: 503" in error_message:
                error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system."
                error_type = "ollama_unavailable"
            elif "status code: 404" in error_message:
                error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model."
                error_type = "model_not_found"
            elif "status code:" in error_message:
                # Extract the status code for other HTTP errors
                status_code = error_message.split("status code:")[1].strip()
                error_message = f"API request failed with status code {status_code}. Please check your configuration."
                error_type = "api_error"
            elif "connection" in error_message.lower():
                error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible."
                error_type = "connection_error"

            # Raise with improved error message
            raise Exception(f"{error_message} (Error type: {error_type})")
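
            # Worked example (illustrative): an underlying failure whose text
            # contains "status code: 503" is re-raised as "Ollama AI service
            # is unavailable (HTTP 503). ... (Error type: ollama_unavailable)".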


        # Generate output based on mode
        if mode == "quick":
            # Quick Summary
            if results.get("findings") or results.get("formatted_findings"):
                raw_formatted_findings = results["formatted_findings"]

                # Check if formatted_findings contains an error message
                if isinstance(
                    raw_formatted_findings, str
                ) and raw_formatted_findings.startswith("Error:"):

                    # No exception is active here, so log at error level
                    # (logger.exception would attach an empty traceback)
                    logger.error(
                        f"Detected error in formatted findings: {raw_formatted_findings[:100]}..."
                    )


                    # Determine error type for better user feedback
                    error_type = "unknown"
                    error_message = raw_formatted_findings.lower()

                    if (
                        "token limit" in error_message
                        or "context length" in error_message
                    ):
                        error_type = "token_limit"
                        # Log specific error type
                        logger.warning(
                            "Detected token limit error in synthesis"
                        )

                        # Update progress with specific error type
                        progress_callback(
                            "Synthesis hit token limits. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": error_type,
                            },
                        )
                    elif (
                        "timeout" in error_message
                        or "timed out" in error_message
                    ):
                        error_type = "timeout"
                        logger.warning("Detected timeout error in synthesis")
                        progress_callback(
                            "Synthesis timed out. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": error_type,
                            },
                        )
                    elif "rate limit" in error_message:
                        error_type = "rate_limit"
                        logger.warning("Detected rate limit error in synthesis")
                        progress_callback(
                            "LLM rate limit reached. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": error_type,
                            },
                        )
                    elif (
                        "connection" in error_message
                        or "network" in error_message
                    ):
                        error_type = "connection"
                        logger.warning("Detected connection error in synthesis")
                        progress_callback(
                            "Connection issue with LLM. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": error_type,
                            },
                        )
                    elif (
                        "llm error" in error_message
                        or "final answer synthesis fail" in error_message
                    ):
                        error_type = "llm_error"
                        logger.warning(
                            "Detected general LLM error in synthesis"
                        )
                        progress_callback(
                            "LLM error during synthesis. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": error_type,
                            },
                        )
                    else:
                        # Generic error
                        logger.warning("Detected unknown error in synthesis")
                        progress_callback(
                            "Error during synthesis. Attempting fallback...",
                            87,
                            {
                                "phase": "synthesis_error",
                                "error_type": "unknown",
                            },
                        )

                    # Extract synthesized content from findings if available
                    synthesized_content = ""
                    for finding in results.get("findings", []):
                        if finding.get("phase") == "Final synthesis":
                            synthesized_content = finding.get("content", "")
                            break

                    # Use synthesized content as fallback
                    if (
                        synthesized_content
                        and not synthesized_content.startswith("Error:")
                    ):
                        logger.info(
                            "Using existing synthesized content as fallback"
                        )
                        raw_formatted_findings = synthesized_content

                    # Or use current_knowledge as another fallback
                    elif results.get("current_knowledge"):
                        logger.info("Using current_knowledge as fallback")
                        raw_formatted_findings = results["current_knowledge"]

                    # Or combine all finding contents as last resort
                    elif results.get("findings"):
                        logger.info("Combining all findings as fallback")
                        # First try to use any findings that are not errors
                        valid_findings = [
                            f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
                            for finding in results.get("findings", [])
                            if finding.get("content")
                            and not finding.get("content", "").startswith(
                                "Error:"
                            )
                        ]


                        if valid_findings:
                            # Preserve the original error text before it is
                            # overwritten by the fallback content
                            original_error = raw_formatted_findings
                            raw_formatted_findings = (
                                "# Research Results (Fallback Mode)\n\n"
                            )
                            raw_formatted_findings += "\n\n".join(
                                valid_findings
                            )
                            raw_formatted_findings += (
                                f"\n\n## Error Information\n{original_error}"
                            )

                        else:
                            # Last resort: use everything including errors
                            raw_formatted_findings = (
                                "# Research Results (Emergency Fallback)\n\n"
                            )
                            raw_formatted_findings += "The system encountered errors during final synthesis.\n\n"
                            raw_formatted_findings += "\n\n".join(
                                f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
                                for finding in results.get("findings", [])
                                if finding.get("content")
                            )

                    progress_callback(
                        f"Using fallback synthesis due to {error_type} error",
                        88,
                        {
                            "phase": "synthesis_fallback",
                            "error_type": error_type,
                        },
                    )

                logger.info(
                    "Found formatted_findings of length: %s",
                    len(str(raw_formatted_findings)),
                )

                try:
                    # Check if we have an error in the findings and use enhanced error handling
                    if isinstance(
                        raw_formatted_findings, str
                    ) and raw_formatted_findings.startswith("Error:"):
                        logger.info(
                            "Generating enhanced error report using ErrorReportGenerator"
                        )

                        # Get LLM for error explanation if available
                        try:
                            llm = get_llm(
                                research_id=research_id,
                                research_context=shared_research_context,
                            )
                        except Exception:
                            llm = None
                            logger.warning(
                                "Could not get LLM for error explanation"
                            )

                        # Generate comprehensive error report
                        error_generator = ErrorReportGenerator(llm)
                        clean_markdown = error_generator.generate_error_report(
                            error_message=raw_formatted_findings,
                            query=query,
                            partial_results=results,
                            search_iterations=results.get("iterations", 0),
                            research_id=research_id,
                        )

                        logger.info(
                            "Generated enhanced error report with %d characters",
                            len(clean_markdown),
                        )
                    else:
                        # Get the synthesized content from the LLM directly
                        clean_markdown = raw_formatted_findings

                    # Extract all sources from findings to add them to the summary
                    all_links = []
                    for finding in results.get("findings", []):
                        search_results = finding.get("search_results", [])
                        if search_results:
                            try:
                                links = extract_links_from_search_results(
                                    search_results
                                )
                                all_links.extend(links)
                            except Exception:
                                logger.exception(
                                    "Error processing search results/links"
                                )

                    logger.info(
                        "Successfully converted to clean markdown of length: %s",
                        len(clean_markdown),
                    )

                    # First send a progress update for generating the summary
                    progress_callback(
                        "Generating clean summary from research data...",
                        90,
                        {"phase": "output_generation"},
                    )

                    # Send progress update for saving report
                    progress_callback(
                        "Saving research report to database...",
                        95,
                        {"phase": "report_complete"},
                    )

                    # Format citations in the markdown content
                    formatter = get_citation_formatter()
                    formatted_content = formatter.format_document(
                        clean_markdown
                    )

                    # Prepare complete report content
                    full_report_content = f"""{formatted_content}

## Research Metrics
- Search Iterations: {results["iterations"]}
- Generated at: {datetime.now(UTC).isoformat()}
"""

                    # Save sources to database
                    from .research_sources_service import ResearchSourcesService

                    sources_service = ResearchSourcesService()
                    if all_links:
                        logger.info(
                            f"Quick summary: Saving {len(all_links)} sources to database"
                        )
                        sources_saved = sources_service.save_research_sources(
                            research_id=research_id,
                            sources=all_links,
                            username=username,
                        )
                        logger.info(
                            f"Quick summary: Saved {sources_saved} sources for research {research_id}"
                        )

                    # Save report using storage abstraction
                    from ...storage import get_report_storage

                    with get_user_db_session(username) as db_session:
                        storage = get_report_storage(session=db_session)

                        # Prepare metadata
                        metadata = {
                            "iterations": results["iterations"],
                            "generated_at": datetime.now(UTC).isoformat(),
                        }

                        # Save report using storage abstraction
                        success = storage.save_report(
                            research_id=research_id,
                            content=full_report_content,
                            metadata=metadata,
                            username=username,
                        )

                        if not success:
                            raise Exception("Failed to save research report")

                        logger.info(
                            f"Report saved for research_id: {research_id}"
                        )

                    # Skip export to additional formats - we're storing in database only

                    # Update research status in database
                    completed_at = datetime.now(UTC).isoformat()

                    with get_user_db_session(username) as db_session:
                        research = (
                            db_session.query(ResearchHistory)
                            .filter_by(id=research_id)
                            .first()
                        )

                        # Preserve existing metadata and update with new values
                        logger.info(
                            f"Existing research_meta type: {type(research.research_meta)}"
                        )

                        # Handle both dict and string types for research_meta
                        if isinstance(research.research_meta, dict):
                            metadata = dict(research.research_meta)
                        elif isinstance(research.research_meta, str):
                            try:
                                metadata = json.loads(research.research_meta)
                            except json.JSONDecodeError:
                                logger.exception(
                                    f"Failed to parse research_meta as JSON: {research.research_meta}"
                                )
                                metadata = {}
                        else:
                            metadata = {}
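
                        # Illustrative: research_meta may arrive as a dict
                        # ({"is_news_search": False}) or as its JSON-encoded
                        # string ('{"is_news_search": false}'); both normalize
                        # to a plain dict above.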


                        metadata.update(
                            {
                                "iterations": results["iterations"],
                                "generated_at": datetime.now(UTC).isoformat(),
                            }
                        )

                        # Use the helper function for consistent duration calculation
                        duration_seconds = calculate_duration(
                            research.created_at, completed_at
                        )

                        research.status = "completed"
                        research.completed_at = completed_at
                        research.duration_seconds = duration_seconds
                        # Note: report_content is saved by CachedResearchService
                        # report_path is not used in encrypted database version

                        # Generate headline and topics only for news searches
                        if (
                            metadata.get("is_news_search")
                            or metadata.get("search_type") == "news_analysis"
                        ):
                            try:
                                from ...news.utils.headline_generator import (
                                    generate_headline,
                                )
                                from ...news.utils.topic_generator import (
                                    generate_topics,
                                )

                                # Get the report content from database for better headline/topic generation
                                report_content = ""
                                try:
                                    research = (
                                        db_session.query(ResearchHistory)
                                        .filter_by(id=research_id)
                                        .first()
                                    )
                                    if research and research.report_content:
                                        report_content = research.report_content
                                        logger.info(
                                            f"Retrieved {len(report_content)} chars from database for headline generation"
                                        )
                                    else:
                                        logger.warning(
                                            f"No report content found in database for research_id: {research_id}"
                                        )
                                except Exception as e:
                                    logger.warning(
                                        f"Could not retrieve report content from database: {e}"
                                    )

                                # Generate headline
                                logger.info(
                                    f"Generating headline for query: {query[:100]}"
                                )
                                headline = generate_headline(
                                    query, report_content
                                )
                                metadata["generated_headline"] = headline

                                # Generate topics
                                logger.info(
                                    f"Generating topics with category: {metadata.get('category', 'News')}"
                                )
                                topics = generate_topics(
                                    query=query,
                                    findings=report_content,
                                    category=metadata.get("category", "News"),
                                    max_topics=6,
                                )
                                metadata["generated_topics"] = topics

                                logger.info(f"Generated headline: {headline}")
                                logger.info(f"Generated topics: {topics}")

                            except Exception as e:
                                logger.warning(
                                    f"Could not generate headline/topics: {e}"
                                )

                        research.research_meta = metadata

                        db_session.commit()
                        logger.info(
                            f"Database commit completed for research_id: {research_id}"
                        )

                        # Update subscription if this was triggered by a subscription
                        if metadata.get("subscription_id"):
                            try:
                                from ...news.subscription_manager.storage import (
                                    SQLSubscriptionStorage,
                                )
                                from datetime import (
                                    datetime as dt,
                                    timezone,
                                    timedelta,
                                )

                                sub_storage = SQLSubscriptionStorage()
                                subscription_id = metadata["subscription_id"]

                                # Get subscription to find refresh interval
                                subscription = sub_storage.get(subscription_id)
                                if subscription:
                                    refresh_minutes = subscription.get(
                                        "refresh_minutes", 240
                                    )
                                    now = dt.now(timezone.utc)
                                    next_refresh = now + timedelta(
                                        minutes=refresh_minutes
                                    )

                                    # Update refresh times
                                    sub_storage.update_refresh_time(
                                        subscription_id=subscription_id,
                                        last_refresh=now,
                                        next_refresh=next_refresh,
                                    )

                                    # Increment stats
                                    sub_storage.increment_stats(
                                        subscription_id, 1
                                    )

                                    logger.info(
                                        f"Updated subscription {subscription_id} refresh times"
                                    )
                            except Exception as e:
                                logger.warning(
                                    f"Could not update subscription refresh time: {e}"
                                )

                        logger.info(
                            f"Database updated successfully for research_id: {research_id}"
                        )

                    # Send the final completion message
                    progress_callback(
                        "Research completed successfully",
                        100,
                        {"phase": "complete"},
                    )

                    # Clean up resources
                    logger.info(
                        "Cleaning up resources for research_id: %s", research_id
                    )
                    cleanup_research_resources(
                        research_id, active_research, termination_flags
                    )
                    logger.info(
                        "Resources cleaned up for research_id: %s", research_id
                    )

                except Exception as inner_e:
                    logger.exception("Error during quick summary generation")
                    raise Exception(
                        f"Error generating quick summary: {inner_e!s}"
                    )
            else:
                raise Exception(
                    "No research findings were generated. Please try again."
                )
        else:
            # Full Report
            progress_callback(
                "Generating detailed report...",
                85,
                {"phase": "report_generation"},
            )

            # Extract the search system from the results if available
            search_system = results.get("search_system", None)

            # Pass the existing search system to maintain citation indices
            report_generator = IntegratedReportGenerator(
                search_system=search_system
            )
            final_report = report_generator.generate_report(results, query)

            progress_callback(
                "Report generation complete", 95, {"phase": "report_complete"}
            )

            # Format citations in the report content
            formatter = get_citation_formatter()
            formatted_content = formatter.format_document(
                final_report["content"]
            )

            # Save sources to database
            from .research_sources_service import ResearchSourcesService

            sources_service = ResearchSourcesService()
            if (
                hasattr(search_system, "all_links_of_system")
                and search_system.all_links_of_system
            ):
                logger.info(
                    f"Saving {len(search_system.all_links_of_system)} sources to database"
                )
                sources_saved = sources_service.save_research_sources(
                    research_id=research_id,
                    sources=search_system.all_links_of_system,
                    username=username,
                )
                logger.info(
                    f"Saved {sources_saved} sources for research {research_id}"
                )

            # Save report to database
            with get_user_db_session(username) as db_session:
                # Update metadata
                metadata = final_report["metadata"]
                metadata["iterations"] = results["iterations"]

                # Save report to database
                try:
                    research = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=research_id)
                        .first()
                    )

                    if not research:
                        logger.error(f"Research {research_id} not found")
                        success = False
                    else:
                        research.report_content = formatted_content
                        if research.research_meta:
                            research.research_meta.update(metadata)
                        else:
                            research.research_meta = metadata
                        db_session.commit()
                        success = True
                        logger.info(
                            f"Saved report for research {research_id} to database"
                        )
                except Exception:
                    logger.exception("Error saving report to database")
                    db_session.rollback()
                    success = False

            if not success:
                raise Exception("Failed to save research report")

            logger.info(
                f"Report saved to database for research_id: {research_id}"
            )

            # Update research status in database
            completed_at = datetime.now(UTC).isoformat()

            with get_user_db_session(username) as db_session:
                research = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )

                # Preserve existing metadata and merge with report metadata
                logger.info(
                    f"Full report - Existing research_meta type: {type(research.research_meta)}"
                )

                # Handle both dict and string types for research_meta
                if isinstance(research.research_meta, dict):
                    metadata = dict(research.research_meta)
                elif isinstance(research.research_meta, str):
                    try:
                        metadata = json.loads(research.research_meta)
                    except json.JSONDecodeError:
                        logger.exception(
                            f"Failed to parse research_meta as JSON: {research.research_meta}"
                        )
                        metadata = {}
                else:
                    metadata = {}

                metadata.update(final_report["metadata"])
                metadata["iterations"] = results["iterations"]

                # Use the helper function for consistent duration calculation
                duration_seconds = calculate_duration(
                    research.created_at, completed_at
                )

                research.status = "completed"
                research.completed_at = completed_at
                research.duration_seconds = duration_seconds
                # Note: report_content is saved by CachedResearchService
                # report_path is not used in encrypted database version

                # Generate headline and topics only for news searches
                if (
                    metadata.get("is_news_search")
                    or metadata.get("search_type") == "news_analysis"
                ):
                    try:

                        # Import from the top-level news package, matching the
                        # quick-summary path above
                        from ...news.utils.headline_generator import (
                            generate_headline,
                        )
                        from ...news.utils.topic_generator import (
                            generate_topics,
                        )


1322 # Get the report content from database for better headline/topic generation 

1323 report_content = "" 

1324 try: 

1325 research = ( 

1326 db_session.query(ResearchHistory) 

1327 .filter_by(id=research_id) 

1328 .first() 

1329 ) 

1330 if research and research.report_content: 

1331 report_content = research.report_content 

1332 else: 

1333 logger.warning( 

1334 f"No report content found in database for research_id: {research_id}" 

1335 ) 

1336 except Exception as e: 

1337 logger.warning( 

1338 f"Could not retrieve report content from database: {e}" 

1339 ) 

1340 

1341 # Generate headline 

1342 headline = generate_headline(query, report_content) 

1343 metadata["generated_headline"] = headline 

1344 

1345 # Generate topics 

1346 topics = generate_topics( 

1347 query=query, 

1348 findings=report_content, 

1349 category=metadata.get("category", "News"), 

1350 max_topics=6, 

1351 ) 

1352 metadata["generated_topics"] = topics 

1353 

1354 logger.info(f"Generated headline: {headline}") 

1355 logger.info(f"Generated topics: {topics}") 

1356 

1357 except Exception as e: 

1358 logger.warning( 

1359 f"Could not generate headline/topics: {e}" 

1360 ) 

1361 

1362 research.research_meta = metadata 

1363 

1364 db_session.commit() 

1365 

1366 # Update subscription if this was triggered by a subscription 

1367 if metadata.get("subscription_id"): 

1368 try: 

1369 from ...news.subscription_manager.storage import ( 

1370 SQLSubscriptionStorage, 

1371 ) 

1372 from datetime import datetime as dt, timezone, timedelta 

1373 

1374 sub_storage = SQLSubscriptionStorage() 

1375 subscription_id = metadata["subscription_id"] 

1376 

1377 # Get subscription to find refresh interval 

1378 subscription = sub_storage.get(subscription_id) 

1379 if subscription: 

1380 refresh_minutes = subscription.get( 

1381 "refresh_minutes", 240 

1382 ) 

1383 now = dt.now(timezone.utc) 

1384 next_refresh = now + timedelta( 

1385 minutes=refresh_minutes 

1386 ) 

1387 

1388 # Update refresh times 

1389 sub_storage.update_refresh_time( 

1390 subscription_id=subscription_id, 

1391 last_refresh=now, 

1392 next_refresh=next_refresh, 

1393 ) 

1394 

1395 # Increment stats 

1396 sub_storage.increment_stats(subscription_id, 1) 

1397 

1398 logger.info( 

1399 f"Updated subscription {subscription_id} refresh times" 

1400 ) 

1401 except Exception as e: 

1402 logger.warning( 

1403 f"Could not update subscription refresh time: {e}" 

1404 ) 

1405 

1406 progress_callback( 

1407 "Research completed successfully", 

1408 100, 

1409 {"phase": "complete"}, 

1410 ) 

1411 

1412 # Clean up resources 

1413 cleanup_research_resources( 

1414 research_id, active_research, termination_flags, username 

1415 ) 

1416 

1417 except Exception as e: 

1418 # Handle error 

1419 error_message = f"Research failed: {e!s}" 

1420 logger.exception(error_message) 

1421 

1422 try: 

1423 # Check for common Ollama error patterns in the exception and provide more user-friendly errors 

1424 user_friendly_error = str(e) 

1425 error_context = {} 

1426 

1427 if "Error type: ollama_unavailable" in user_friendly_error: 1427 ↛ 1428line 1427 didn't jump to line 1428 because the condition on line 1427 was never true

1428 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system." 

1429 error_context = { 

1430 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly." 

1431 } 

1432 elif "Error type: model_not_found" in user_friendly_error: 1432 ↛ 1433line 1432 didn't jump to line 1433 because the condition on line 1432 was never true

1433 user_friendly_error = "Required Ollama model not found. Please pull the model first." 

1434 error_context = { 

1435 "solution": "Run 'ollama pull mistral' to download the required model." 

1436 } 

1437 elif "Error type: connection_error" in user_friendly_error: 1437 ↛ 1438line 1437 didn't jump to line 1438 because the condition on line 1437 was never true

1438 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running." 

1439 error_context = { 

1440 "solution": "Ensure Ollama or your API service is running and accessible." 

1441 } 

1442 elif "Error type: api_error" in user_friendly_error: 1442 ↛ 1444line 1442 didn't jump to line 1444 because the condition on line 1442 was never true

1443 # Keep the original error message as it's already improved 

1444 error_context = { 

1445 "solution": "Check API configuration and credentials." 

1446 } 

1447 

1448 # Generate enhanced error report for failed research 

1449 enhanced_report_content = None 

1450 try: 

1451 # Get LLM for error explanation if available 

1452 try: 

1453 llm = get_llm( 

1454 research_id=research_id, 

1455 research_context=shared_research_context, 

1456 ) 

1457 except Exception: 

1458 llm = None 

1459 logger.warning( 

1460 "Could not get LLM for error explanation in failure handler" 

1461 ) 

1462 

1463 # Get partial results if they exist 

1464 partial_results = results if "results" in locals() else None 

1465 search_iterations = ( 

1466 results.get("iterations", 0) if partial_results else 0 

1467 ) 

1468 

1469 # Generate comprehensive error report 

1470 error_generator = ErrorReportGenerator(llm) 

1471 enhanced_report_content = error_generator.generate_error_report( 

1472 error_message=f"Research failed: {e!s}", 

1473 query=query, 

1474 partial_results=partial_results, 

1475 search_iterations=search_iterations, 

1476 research_id=research_id, 

1477 ) 

1478 

1479 logger.info( 

1480 "Generated enhanced error report for failed research (length: %d)", 

1481 len(enhanced_report_content), 

1482 ) 

1483 

1484 # Save enhanced error report to encrypted database 

1485 try: 

1486 # Get username from the research context 

1487 username = getattr(g, "username", None) or session.get( 

1488 "username" 

1489 ) 

1490 if username: 1490 ↛ 1512line 1490 didn't jump to line 1512 because the condition on line 1490 was always true

1491 from ...storage import get_report_storage 

1492 

1493 with get_user_db_session(username) as db_session: 

1494 storage = get_report_storage(session=db_session) 

1495 success = storage.save_report( 

1496 research_id=research_id, 

1497 content=enhanced_report_content, 

1498 metadata={"error_report": True}, 

1499 username=username, 

1500 ) 

1501 if success: 1501 ↛ 1507line 1501 didn't jump to line 1507 because the condition on line 1501 was always true

1502 logger.info( 

1503 "Saved enhanced error report to encrypted database for research %s", 

1504 research_id, 

1505 ) 

1506 else: 

1507 logger.warning( 

1508 "Failed to save enhanced error report to database for research %s", 

1509 research_id, 

1510 ) 

1511 else: 

1512 logger.warning( 

1513 "Cannot save error report: username not available" 

1514 ) 

1515 

1516 except Exception as report_error: 

1517 logger.exception( 

1518 "Failed to save enhanced error report: %s", report_error 

1519 ) 

1520 

1521 except Exception as error_gen_error: 

1522 logger.exception( 

1523 "Failed to generate enhanced error report: %s", 

1524 error_gen_error, 

1525 ) 

1526 enhanced_report_content = None 

1527 

1528 # Get existing metadata from database first 

1529 existing_metadata = {} 

1530 try: 

1531 # Get username from the research context 

1532 username = getattr(g, "username", None) or session.get( 

1533 "username" 

1534 ) 

1535 if username: 1535 ↛ 1548line 1535 didn't jump to line 1548 because the condition on line 1535 was always true

1536 with get_user_db_session(username) as db_session: 

1537 research = ( 

1538 db_session.query(ResearchHistory) 

1539 .filter_by(id=research_id) 

1540 .first() 

1541 ) 

1542 if research and research.research_meta: 1542 ↛ 1548line 1542 didn't jump to line 1548

1543 existing_metadata = dict(research.research_meta) 

1544 except Exception: 

1545 logger.exception("Failed to get existing metadata") 

1546 

1547 # Update metadata with more context about the error while preserving existing values 

1548 metadata = existing_metadata 

1549 metadata.update({"phase": "error", "error": user_friendly_error}) 

1550 if error_context: 1550 ↛ 1551line 1550 didn't jump to line 1551 because the condition on line 1550 was never true

1551 metadata.update(error_context) 

1552 if enhanced_report_content: 1552 ↛ 1556line 1552 didn't jump to line 1556 because the condition on line 1552 was always true

1553 metadata["has_enhanced_report"] = True 

1554 

1555 # If we still have an active research record, update its log 

1556 if research_id in active_research: 

1557 progress_callback(user_friendly_error, None, metadata) 

1558 

1559 # If termination was requested, mark as suspended instead of failed 

1560 status = ( 

1561 "suspended" 

1562 if (termination_flags.get(research_id)) 

1563 else "failed" 

1564 ) 

1565 message = ( 

1566 "Research was terminated by user" 

1567 if status == "suspended" 

1568 else user_friendly_error 

1569 ) 

1570 

1571 # Calculate duration up to termination point - using UTC consistently 

1572 now = datetime.now(UTC) 

1573 completed_at = now.isoformat() 

1574 

1575 # NOTE: Database updates from threads are handled by queue processor 

1576 # The queue_processor.queue_error_update() method is already being used below 

1577 # to safely update the database from the main thread 

1578 

1579 # Queue the error update to be processed in main thread 

1580 # Using the queue processor v2 system 

1581 from ..queue.processor_v2 import queue_processor 

1582 

1583 if username: 1583 ↛ 1597line 1583 didn't jump to line 1597 because the condition on line 1583 was always true

1584 queue_processor.queue_error_update( 

1585 username=username, 

1586 research_id=research_id, 

1587 status=status, 

1588 error_message=message, 

1589 metadata=metadata, 

1590 completed_at=completed_at, 

1591 report_path=None, 

1592 ) 

1593 logger.info( 

1594 f"Queued error update for research {research_id} with status '{status}'" 

1595 ) 

1596 else: 

1597 logger.error( 

1598 f"Cannot queue error update for research {research_id} - no username provided. " 

1599 f"Status: '{status}', Message: {message}" 

1600 ) 

1601 

1602 try: 

1603 SocketIOService().emit_to_subscribers( 

1604 "research_progress", 

1605 research_id, 

1606 {"status": status, "error": message}, 

1607 ) 

1608 except Exception: 

1609 logger.exception("Failed to emit error via socket") 

1610 

1611 except Exception: 

1612 logger.exception("Error in error handler") 

1613 

1614 # Clean up resources 

1615 cleanup_research_resources( 

1616 research_id, active_research, termination_flags, username 

1617 ) 

1618 

1619 
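The error path above never touches the database from the worker thread; it hands the update to the queue processor, which applies it on the main thread. A minimal sketch of that pattern, assuming the queue_processor singleton from ..queue.processor_v2 and only the queue_error_update keyword arguments shown above (the queue_failure helper itself is hypothetical):

from datetime import datetime, UTC

def queue_failure(queue_processor, username, research_id, error_message):
    # Hypothetical helper mirroring the error handler above: build a UTC
    # timestamp and enqueue the failure so the database write is performed
    # by the main-thread processor rather than this background thread.
    completed_at = datetime.now(UTC).isoformat()
    queue_processor.queue_error_update(
        username=username,
        research_id=research_id,
        status="failed",
        error_message=error_message,
        metadata={"phase": "error", "error": error_message},
        completed_at=completed_at,
        report_path=None,
    )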

1620 def cleanup_research_resources( 

1621 research_id, active_research, termination_flags, username=None 

1622): 

1623 """ 

1624 Clean up resources for a completed research. 

1625 

1626 Args: 

1627 research_id: The ID of the research 

1628 active_research: Dictionary of active research processes 

1629 termination_flags: Dictionary of termination flags 

1630 username: The username for database access (required for thread context) 

1631 """ 

1632 logger.info("Cleaning up resources for research {}", research_id) 

1633 

1634 # For testing: Add a small delay to simulate research taking time 

1635 # This helps test concurrent research limits 

1636 from ...settings.env_registry import is_test_mode 

1637 

1638 if is_test_mode(): 1638 ↛ 1639 line 1638 didn't jump to line 1639 because the condition on line 1638 was never true

1639 import time 

1640 

1641 logger.info( 

1642 f"Test mode: Adding 5 second delay before cleanup for {research_id}" 

1643 ) 

1644 time.sleep(5) 

1645 

1646 # Status used for the final subscriber message; defaults to "completed" (database status updates are handled by the queue processor) 

1647 current_status = "completed" # Default 

1648 

1649 # NOTE: Queue processor already handles database updates from the main thread 

1650 # The notify_research_completed() method is called at the end of this function 

1651 # which safely updates the database status 

1652 

1653 # Notify queue processor that research completed 

1654 # This uses processor_v2 which handles database updates in the main thread 

1655 # avoiding the Flask request context issues that occur in background threads 

1656 from ..queue.processor_v2 import queue_processor 

1657 

1658 if username: 

1659 queue_processor.notify_research_completed(username, research_id) 

1660 logger.info( 

1661 f"Notified queue processor of completion for research {research_id} (user: {username})" 

1662 ) 

1663 else: 

1664 logger.warning( 

1665 f"Cannot notify completion for research {research_id} - no username provided" 

1666 ) 

1667 

1668 # Remove from active research 

1669 if research_id in active_research: 

1670 del active_research[research_id] 

1671 

1672 # Remove from termination flags 

1673 if research_id in termination_flags: 

1674 del termination_flags[research_id] 

1675 

1676 # Send a final message to subscribers 

1677 try: 

1678 # Import here to avoid circular imports 

1679 from ..routes.globals import get_globals 

1680 

1681 globals_dict = get_globals() 

1682 socket_subscriptions = globals_dict.get("socket_subscriptions", {}) 

1683 

1684 # Send a final message to any remaining subscribers with explicit status 

1685 if socket_subscriptions.get(research_id): 1685 ↛ 1687 line 1685 didn't jump to line 1687 because the condition on line 1685 was never true

1686 # Use the proper status message based on database status 

1687 if current_status in ("suspended", "failed"): 

1688 final_message = { 

1689 "status": current_status, 

1690 "message": f"Research was {current_status}", 

1691 "progress": 0, # For suspended research, show 0% not 100% 

1692 } 

1693 else: 

1694 final_message = { 

1695 "status": "completed", 

1696 "message": "Research process has ended and resources have been cleaned up", 

1697 "progress": 100, 

1698 } 

1699 

1700 logger.info( 

1701 "Sending final %s socket message for research %s", 

1702 current_status, 

1703 research_id, 

1704 ) 

1705 

1706 SocketIOService().emit_to_subscribers( 

1707 "research_progress", research_id, final_message 

1708 ) 

1709 

1710 except Exception: 

1711 logger.exception("Error sending final cleanup message") 

1712 

1713 
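The final payload in cleanup_research_resources keys its progress value off the status so a suspended run is not rendered as 100% complete. A sketch of that payload shape, using only the fields emitted above (build_final_message is a hypothetical helper, not part of this module):

def build_final_message(current_status: str) -> dict:
    # Suspended or failed runs report 0% so the UI does not show a full bar.
    if current_status in ("suspended", "failed"):
        return {
            "status": current_status,
            "message": f"Research was {current_status}",
            "progress": 0,
        }
    # Any other terminal state is treated as a normal completion.
    return {
        "status": "completed",
        "message": "Research process has ended and resources have been cleaned up",
        "progress": 100,
    }

SocketIOService().emit_to_subscribers("research_progress", research_id, build_final_message(current_status)) would then deliver it, matching the emit call above.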

1714 def handle_termination( 

1715 research_id, active_research, termination_flags, username=None 

1716): 

1717 """ 

1718 Handle the termination of a research process. 

1719 

1720 Args: 

1721 research_id: The ID of the research 

1722 active_research: Dictionary of active research processes 

1723 termination_flags: Dictionary of termination flags 

1724 username: The username for database access (required for thread context) 

1725 """ 

1726 logger.info(f"Handling termination for research {research_id}") 

1727 

1728 # Queue the status update to be processed in the main thread 

1729 # This avoids Flask request context errors in background threads 

1730 try: 

1731 from ..queue.processor_v2 import queue_processor 

1732 

1733 now = datetime.now(UTC) 

1734 completed_at = now.isoformat() 

1735 

1736 # Queue the suspension update 

1737 queue_processor.queue_error_update( 

1738 username=username, 

1739 research_id=research_id, 

1740 status="suspended", 

1741 error_message="Research was terminated by user", 

1742 metadata={"terminated_at": completed_at}, 

1743 completed_at=completed_at, 

1744 report_path=None, 

1745 ) 

1746 

1747 logger.info(f"Queued suspension update for research {research_id}") 

1748 except Exception: 

1749 logger.exception( 

1750 f"Error queueing termination update for research {research_id}" 

1751 ) 

1752 

1753 # Clean up resources (this already handles things properly) 

1754 cleanup_research_resources( 

1755 research_id, active_research, termination_flags, username 

1756 ) 

1757 

1758 
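handle_termination assumes the caller has already set the termination flag; cancel_research below does exactly that for active runs. A sketch of that calling convention, assuming the shared active_research and termination_flags dicts from routes.globals (the stop_research wrapper is hypothetical):

def stop_research(research_id, username, active_research, termination_flags):
    # Hypothetical wrapper: set the flag first so the worker loop can see it,
    # then let handle_termination queue the suspension and clean up.
    termination_flags[research_id] = True
    if research_id in active_research:
        handle_termination(
            research_id, active_research, termination_flags, username
        )
        return True
    return False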

1759 def cancel_research(research_id, username=None): 

1760 """ 

1761 Cancel/terminate a research process using ORM. 

1762 

1763 Args: 

1764 research_id: The ID of the research to cancel 

1765 username: The username of the user cancelling the research (optional, will try to get from session if not provided) 

1766 

1767 Returns: 

1768 bool: True if the research was found and cancelled, False otherwise 

1769 """ 

1770 try: 

1771 # Import globals from research routes 

1772 from ..routes.globals import get_globals 

1773 

1774 globals_dict = get_globals() 

1775 active_research = globals_dict["active_research"] 

1776 termination_flags = globals_dict["termination_flags"] 

1777 

1778 # Set termination flag 

1779 termination_flags[research_id] = True 

1780 

1781 # Check if the research is active 

1782 if research_id in active_research: 1782 ↛ 1791line 1782 didn't jump to line 1791 because the condition on line 1782 was always true

1783 # Call handle_termination to update database 

1784 handle_termination( 

1785 research_id, active_research, termination_flags, username 

1786 ) 

1787 return True 

1788 else: 

1789 # Update database directly if not found in active_research 

1790 # Get username from parameter or session 

1791 if not username: 

1792 from flask import session 

1793 

1794 username = session.get("username") 

1795 

1796 if not username: 

1797 logger.warning( 

1798 f"No username available for cancelling research {research_id}" 

1799 ) 

1800 return False 

1801 

1802 try: 

1803 with get_user_db_session(username) as db_session: 

1804 research = ( 

1805 db_session.query(ResearchHistory) 

1806 .filter_by(id=research_id) 

1807 .first() 

1808 ) 

1809 if not research: 

1810 logger.info( 

1811 f"Research {research_id} not found in database" 

1812 ) 

1813 return False 

1814 

1815 # Check if already completed or suspended 

1816 if research.status in ["completed", "suspended", "error"]: 

1817 logger.info( 

1818 f"Research {research_id} already in terminal state: {research.status}" 

1819 ) 

1820 return True # Consider this a success since it's already stopped 

1821 

1822 # If it exists but isn't in active_research, still update status 

1823 research.status = "suspended" 

1824 db_session.commit() 

1825 logger.info( 

1826 f"Successfully suspended research {research_id}" 

1827 ) 

1828 except Exception as e: 

1829 logger.exception( 

1830 f"Error accessing database for research {research_id}: {e}" 

1831 ) 

1832 return False 

1833 

1834 return True 

1835 except Exception as e: 

1836 logger.exception( 

1837 f"Unexpected error in cancel_research for {research_id}: {e}" 

1838 ) 

1839 return False
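A usage sketch for cancel_research; the research ID and username are illustrative, and passing username explicitly avoids the session fallback inside the function:

# Illustrative values; cancel_research returns True when the research was
# cancelled or was already in a terminal state.
if not cancel_research("res-123", username="alice"):
    logger.warning("Could not cancel research res-123")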