Coverage for src / local_deep_research / web / services / research_service.py: 41%

642 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import hashlib 

2import json 

3import threading 

4from datetime import datetime, UTC 

5from pathlib import Path 

6 

7from loguru import logger 

8 

9from ...config.llm_config import get_llm 

10 

11# Output directory for research results 

12from ...config.paths import get_research_outputs_directory 

13from ...config.search_config import get_search 

14from ...constants import ResearchStatus 

15from ...database.models import ResearchHistory, ResearchStrategy 

16from ...database.session_context import get_user_db_session 

17from ...error_handling.report_generator import ErrorReportGenerator 

18from ...utilities.thread_context import set_search_context 

19from ...report_generator import IntegratedReportGenerator 

20from ...search_system import AdvancedSearchSystem 

21from ...text_optimization import CitationFormatter, CitationMode 

22from ...utilities.log_utils import log_for_research 

23from ...utilities.search_utilities import extract_links_from_search_results 

24from ...utilities.threading_utils import thread_context, thread_with_app_context 

25from ..models.database import calculate_duration 

26from .socket_service import SocketIOService 

27 

28OUTPUT_DIR = get_research_outputs_directory() 

29 

30 

31def _parse_research_metadata(research_meta) -> dict: 

32 """Parse research_meta into a dict, handling both dict and JSON string types.""" 

33 if isinstance(research_meta, dict): 

34 return dict(research_meta) 

35 if isinstance(research_meta, str): 

36 try: 

37 return json.loads(research_meta) 

38 except json.JSONDecodeError: 

39 logger.exception("Failed to parse research_meta as JSON") 

40 return {} 

41 return {} 

42 

43 

def get_citation_formatter():
    """Build a ``CitationFormatter`` from the thread's settings snapshot.

    Reads the ``report.citation_format`` setting (default
    ``"number_hyperlinks"``) and maps it to the corresponding
    ``CitationMode``; unrecognized values fall back to
    ``NUMBER_HYPERLINKS``.
    """
    # Import here to avoid circular imports
    from ...config.search_config import get_setting_from_snapshot

    format_key = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    modes = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    selected = modes.get(format_key, CitationMode.NUMBER_HYPERLINKS)
    return CitationFormatter(mode=selected)

61 

62 

def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
) -> tuple:
    """
    Export a markdown report to a different format in memory.

    Uses the modular exporter registry to support multiple formats.
    Available formats can be queried with
    ``ExporterRegistry.get_available_formats()``.

    Args:
        markdown_content: The markdown content to export.
        format: Export format (e.g., 'pdf', 'odt', 'latex', 'quarto',
            'ris'). NOTE: this parameter shadows the ``format`` builtin;
            the name is kept for backward compatibility with keyword
            callers.
        title: Optional title for the document.

    Returns:
        Tuple of (content_bytes, filename, mimetype).

    Raises:
        ValueError: If ``format`` is not a registered export format.
    """
    from ...exporters import ExporterRegistry, ExportOptions

    # Normalize format for case-insensitive lookup.
    format_lower = format.lower()

    # Get exporter from registry
    exporter = ExporterRegistry.get_exporter(format_lower)

    if exporter is None:
        available = ExporterRegistry.get_available_formats()
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {', '.join(available)}"
        )

    # Title prepending is handled by each exporter via _prepend_title_if_needed()
    # PDF and ODT exporters prepend titles; RIS and other formats ignore them

    options = ExportOptions(title=title)

    result = exporter.export(markdown_content, options)

    logger.info(
        f"Generated {format_lower} in memory, size: {len(result.content)} bytes"
    )

    return result.content, result.filename, result.mimetype

109 

110 

def save_research_strategy(research_id, strategy_name, username=None):
    """
    Save (insert or update) the strategy used for a research run.

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)

    Any database error is logged and swallowed, so strategy bookkeeping
    can never abort the research itself.
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        # The context manager owns the session lifecycle; the original
        # code also called session.close() in a finally block, which
        # double-managed the session and has been removed.
        with get_user_db_session(username) as session:
            # Check if a strategy already exists for this research
            existing_strategy = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )

            if existing_strategy:
                # Update existing strategy in place (one row per research).
                existing_strategy.strategy_name = strategy_name
                logger.debug(
                    f"Updating existing strategy for research {research_id}"
                )
            else:
                # Create new strategy record
                new_strategy = ResearchStrategy(
                    research_id=research_id, strategy_name=strategy_name
                )
                session.add(new_strategy)
                logger.debug(
                    f"Creating new strategy record for research {research_id}"
                )

            session.commit()
            logger.info(
                f"Saved strategy '{strategy_name}' for research {research_id}"
            )
    except Exception:
        logger.exception("Error saving research strategy")

157 

158 

def get_research_strategy(research_id, username=None):
    """
    Get the strategy used for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name, or None if not found or on database error.
    """
    try:
        # The context manager owns the session lifecycle; the original
        # code also called session.close() in a finally block, which
        # double-managed the session and has been removed.
        with get_user_db_session(username) as session:
            strategy = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )

            return strategy.strategy_name if strategy else None
    except Exception:
        # Lookup failures degrade to "no strategy" rather than raising.
        logger.exception("Error getting research strategy")
        return None

185 

186 

def start_research_process(
    research_id,
    query,
    mode,
    active_research,
    termination_flags,
    run_research_callback,
    **kwargs,
):
    """
    Launch a research run on a daemon background thread.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters forwarded to the research process
            (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research
    """
    # Wrap the callback so the app context travels into the worker thread.
    wrapped_callback = thread_with_app_context(run_research_callback)

    # daemon=True: the worker must not keep the process alive on shutdown.
    worker = threading.Thread(
        target=wrapped_callback,
        args=(
            thread_context(),
            research_id,
            query,
            mode,
            active_research,
            termination_flags,
        ),
        kwargs=kwargs,
        daemon=True,
    )
    worker.start()

    # Register the run so progress and termination can be tracked by ID.
    active_research[research_id] = {
        "thread": worker,
        "progress": 0,
        "status": ResearchStatus.IN_PROGRESS,
        "log": [],
        "settings": kwargs,  # Store settings for reference
    }

    return worker

239 

240 

def _generate_report_path(query: str) -> Path:
    """
    Build a unique output path for a new report file.

    The filename combines a short MD5 digest of the query (explicitly
    non-cryptographic — used only to produce a compact alphanumeric tag)
    with the current UTC timestamp, so it never contains characters that
    would be unsafe in a filename.

    Args:
        query: The query used for the report.

    Returns:
        The path that it generated.
    """
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    timestamp = int(datetime.now(UTC).timestamp())
    filename = f"research_report_{digest[:10]}_{timestamp}.md"
    return OUTPUT_DIR / filename

260 

261 

262@log_for_research 

263def run_research_process( 

264 research_id, query, mode, active_research, termination_flags, **kwargs 

265): 

266 """ 

267 Run the research process in the background for a given research ID. 

268 

269 Args: 

270 research_id: The ID of the research 

271 query: The research query 

272 mode: The research mode (quick/detailed) 

273 active_research: Dictionary of active research processes 

274 termination_flags: Dictionary of termination flags 

275 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.) 

276 MUST include 'username' for database access 

277 """ 

278 

279 # Extract username - required for database access 

280 username = kwargs.get("username") 

281 logger.info(f"Research thread started with username: {username}") 

282 if not username: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 logger.error("No username provided to research thread") 

284 raise ValueError("Username is required for research process") 

285 try: 

286 # Check if this research has been terminated before we even start 

287 if termination_flags.get(research_id): 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 logger.info( 

289 f"Research {research_id} was terminated before starting" 

290 ) 

291 cleanup_research_resources( 

292 research_id, active_research, termination_flags, username 

293 ) 

294 return 

295 

296 logger.info( 

297 f"Starting research process for ID {research_id}, query: {query}" 

298 ) 

299 

300 # Extract key parameters 

301 model_provider = kwargs.get("model_provider") 

302 model = kwargs.get("model") 

303 custom_endpoint = kwargs.get("custom_endpoint") 

304 search_engine = kwargs.get("search_engine") 

305 max_results = kwargs.get("max_results") 

306 time_period = kwargs.get("time_period") 

307 iterations = kwargs.get("iterations") 

308 questions_per_iteration = kwargs.get("questions_per_iteration") 

309 strategy = kwargs.get( 

310 "strategy", "source-based" 

311 ) # Default to source-based 

312 settings_snapshot = kwargs.get( 

313 "settings_snapshot", {} 

314 ) # Complete settings snapshot 

315 

316 # Log settings snapshot to debug 

317 from ...settings.logger import log_settings 

318 

319 log_settings(settings_snapshot, "Settings snapshot received in thread") 

320 

321 # Strategy should already be saved in the database before thread starts 

322 logger.info(f"Research strategy: {strategy}") 

323 

324 # Log all parameters for debugging 

325 logger.info( 

326 f"Research parameters: provider={model_provider}, model={model}, " 

327 f"search_engine={search_engine}, max_results={max_results}, " 

328 f"time_period={time_period}, iterations={iterations}, " 

329 f"questions_per_iteration={questions_per_iteration}, " 

330 f"custom_endpoint={custom_endpoint}, strategy={strategy}" 

331 ) 

332 

333 # Set up the AI Context Manager 

334 output_dir = OUTPUT_DIR / f"research_{research_id}" 

335 output_dir.mkdir(parents=True, exist_ok=True) 

336 

337 # Create a settings context that uses snapshot if available, otherwise falls back to database 

338 # This allows the research to be independent of database during execution 

339 class SettingsContext: 

340 def __init__(self, snapshot, username): 

341 self.snapshot = snapshot or {} 

342 self.username = username 

343 # Extract values from setting objects if needed 

344 self.values = {} 

345 for key, setting in self.snapshot.items(): 

346 if isinstance(setting, dict) and "value" in setting: 346 ↛ 351line 346 didn't jump to line 351 because the condition on line 346 was always true

347 # It's a full setting object, extract the value 

348 self.values[key] = setting["value"] 

349 else: 

350 # It's already just a value 

351 self.values[key] = setting 

352 

353 def get_setting(self, key, default=None): 

354 """Get setting from snapshot only - no database access in threads""" 

355 if key in self.values: 355 ↛ 358line 355 didn't jump to line 358 because the condition on line 355 was always true

356 return self.values[key] 

357 # No fallback to database - threads must use snapshot only 

358 logger.debug( 

359 f"Setting '{key}' not found in snapshot, using default" 

360 ) 

361 return default 

362 

363 settings_context = SettingsContext(settings_snapshot, username) 

364 

365 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var 

366 from ...settings.logger import log_settings 

367 

368 log_settings( 

369 settings_context.values, "SettingsContext values extracted" 

370 ) 

371 

372 # Set the settings context for this thread 

373 from ...config.thread_settings import ( 

374 clear_settings_context, 

375 set_settings_context, 

376 ) 

377 

378 set_settings_context(settings_context) 

379 

380 # Get user password if provided 

381 user_password = kwargs.get("user_password") 

382 

383 # Create shared research context that can be updated during research 

384 shared_research_context = { 

385 "research_id": research_id, 

386 "research_query": query, 

387 "research_mode": mode, 

388 "research_phase": "init", 

389 "search_iteration": 0, 

390 "search_engines_planned": None, 

391 "search_engine_selected": search_engine, 

392 "username": username, # Add username for queue operations 

393 "user_password": user_password, # Add password for metrics access 

394 } 

395 

396 # If this is a follow-up research, include the parent context 

397 if "research_context" in kwargs and kwargs["research_context"]: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true

398 logger.info( 

399 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings" 

400 ) 

401 shared_research_context.update(kwargs["research_context"]) 

402 

403 # Do not log context keys as they may contain sensitive information 

404 logger.info(f"Created shared_research_context for user: {username}") 

405 

406 # Set search context for search tracking 

407 set_search_context(shared_research_context) 

408 

409 # Set up progress callback 

410 def progress_callback(message, progress_percent, metadata): 

411 # Frequent termination check 

412 if termination_flags.get(research_id): 

413 handle_termination( 

414 research_id, active_research, termination_flags, username 

415 ) 

416 raise Exception("Research was terminated by user") 

417 

418 # Bind research_id to logger for this specific log 

419 bound_logger = logger.bind(research_id=research_id) 

420 bound_logger.log("MILESTONE", message) 

421 

422 if "SEARCH_PLAN:" in message: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true

423 engines = message.split("SEARCH_PLAN:")[1].strip() 

424 metadata["planned_engines"] = engines 

425 metadata["phase"] = "search_planning" # Use existing phase 

426 # Update shared context for token tracking 

427 shared_research_context["search_engines_planned"] = engines 

428 shared_research_context["research_phase"] = "search_planning" 

429 

430 if "ENGINE_SELECTED:" in message: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true

431 engine = message.split("ENGINE_SELECTED:")[1].strip() 

432 metadata["selected_engine"] = engine 

433 metadata["phase"] = "search" # Use existing 'search' phase 

434 # Update shared context for token tracking 

435 shared_research_context["search_engine_selected"] = engine 

436 shared_research_context["research_phase"] = "search" 

437 

438 # Capture other research phases for better context tracking 

439 if metadata.get("phase"): 439 ↛ 443line 439 didn't jump to line 443 because the condition on line 439 was always true

440 shared_research_context["research_phase"] = metadata["phase"] 

441 

442 # Update search iteration if available 

443 if "iteration" in metadata: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true

444 shared_research_context["search_iteration"] = metadata[ 

445 "iteration" 

446 ] 

447 

448 # Adjust progress based on research mode 

449 adjusted_progress = progress_percent 

450 if ( 450 ↛ 455line 450 didn't jump to line 455 because the condition on line 450 was never true

451 mode == "detailed" 

452 and metadata.get("phase") == "output_generation" 

453 ): 

454 # For detailed mode, adjust the progress range for output generation 

455 adjusted_progress = min(80, progress_percent) 

456 elif ( 456 ↛ 461line 456 didn't jump to line 461 because the condition on line 456 was never true

457 mode == "detailed" 

458 and metadata.get("phase") == "report_generation" 

459 ): 

460 # Scale the progress from 80% to 95% for the report generation phase 

461 if progress_percent is not None: 

462 normalized = progress_percent / 100 

463 adjusted_progress = 80 + (normalized * 15) 

464 elif ( 464 ↛ 468line 464 didn't jump to line 468 because the condition on line 464 was never true

465 mode == "quick" and metadata.get("phase") == "output_generation" 

466 ): 

467 # For quick mode, ensure we're at least at 85% during output generation 

468 adjusted_progress = max(85, progress_percent) 

469 # Map any further progress within output_generation to 85-95% range 

470 if progress_percent is not None and progress_percent > 0: 

471 normalized = progress_percent / 100 

472 adjusted_progress = 85 + (normalized * 10) 

473 

474 # Don't let progress go backwards 

475 if research_id in active_research and adjusted_progress is not None: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true

476 current_progress = active_research[research_id].get( 

477 "progress", 0 

478 ) 

479 adjusted_progress = max(current_progress, adjusted_progress) 

480 

481 # Update active research record 

482 if research_id in active_research: 482 ↛ exitline 482 didn't return from function 'progress_callback' because the condition on line 482 was always true

483 if adjusted_progress is not None: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 active_research[research_id]["progress"] = adjusted_progress 

485 

486 # Queue the progress update to be processed in main thread 

487 if adjusted_progress is not None: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true

488 from ..queue.processor_v2 import queue_processor 

489 

490 if username: 

491 queue_processor.queue_progress_update( 

492 username, research_id, adjusted_progress 

493 ) 

494 else: 

495 logger.warning( 

496 f"Cannot queue progress update for research {research_id} - no username available" 

497 ) 

498 

499 # Emit a socket event 

500 try: 

501 # Basic event data 

502 event_data = {"progress": adjusted_progress} 

503 

504 SocketIOService().emit_to_subscribers( 

505 "progress", research_id, event_data 

506 ) 

507 except Exception: 

508 logger.exception("Socket emit error (non-critical)") 

509 

510 # Function to check termination during long-running operations 

511 def check_termination(): 

512 if termination_flags.get(research_id): 

513 handle_termination( 

514 research_id, active_research, termination_flags, username 

515 ) 

516 raise Exception( 

517 "Research was terminated by user during long-running operation" 

518 ) 

519 return False # Not terminated 

520 

521 # Configure the system with the specified parameters 

522 use_llm = None 

523 if model or search_engine or model_provider: 523 ↛ 530line 523 didn't jump to line 530 because the condition on line 523 was always true

524 # Log that we're overriding system settings 

525 logger.info( 

526 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}" 

527 ) 

528 

529 # Override LLM if model or model_provider specified 

530 if model or model_provider: 530 ↛ 573line 530 didn't jump to line 573 because the condition on line 530 was always true

531 try: 

532 # Get LLM with the overridden settings 

533 # Use the shared_research_context which includes username 

534 use_llm = get_llm( 

535 model_name=model, 

536 provider=model_provider, 

537 openai_endpoint_url=custom_endpoint, 

538 research_id=research_id, 

539 research_context=shared_research_context, 

540 ) 

541 

542 logger.info( 

543 f"Successfully set LLM to: provider={model_provider}, model={model}" 

544 ) 

545 except Exception as e: 

546 logger.exception( 

547 f"Error setting LLM provider={model_provider}, model={model}" 

548 ) 

549 error_msg = str(e) 

550 # Surface configuration errors to user instead of silently continuing 

551 config_error_keywords = [ 

552 "model path", 

553 "llamacpp", 

554 "cannot connect", 

555 "server", 

556 "not configured", 

557 "not responding", 

558 "directory", 

559 ".gguf", 

560 ] 

561 if any( 561 ↛ 566line 561 didn't jump to line 566 because the condition on line 561 was never true

562 keyword in error_msg.lower() 

563 for keyword in config_error_keywords 

564 ): 

565 # This is a configuration error the user can fix 

566 raise ValueError( 

567 f"LLM Configuration Error: {error_msg}" 

568 ) from e 

569 # For other errors, re-raise to avoid silent failures 

570 raise 

571 

572 # Create search engine first if specified, to avoid default creation without username 

573 use_search = None 

574 if search_engine: 

575 try: 

576 # Create a new search object with these settings 

577 use_search = get_search( 

578 search_tool=search_engine, 

579 llm_instance=use_llm, 

580 username=username, 

581 settings_snapshot=settings_snapshot, 

582 ) 

583 logger.info( 

584 f"Successfully created search engine: {search_engine}" 

585 ) 

586 except Exception as e: 

587 logger.exception( 

588 f"Error creating search engine {search_engine}" 

589 ) 

590 error_msg = str(e) 

591 # Surface configuration errors to user instead of silently continuing 

592 config_error_keywords = [ 

593 "searxng", 

594 "instance_url", 

595 "api_key", 

596 "cannot connect", 

597 "connection", 

598 "timeout", 

599 "not configured", 

600 ] 

601 if any( 

602 keyword in error_msg.lower() 

603 for keyword in config_error_keywords 

604 ): 

605 # This is a configuration error the user can fix 

606 raise ValueError( 

607 f"Search Engine Configuration Error ({search_engine}): {error_msg}" 

608 ) from e 

609 # For other errors, re-raise to avoid silent failures 

610 raise 

611 

612 # Set the progress callback in the system 

613 system = AdvancedSearchSystem( 

614 llm=use_llm, 

615 search=use_search, 

616 strategy_name=strategy, 

617 max_iterations=iterations, 

618 questions_per_iteration=questions_per_iteration, 

619 username=username, 

620 settings_snapshot=settings_snapshot, 

621 research_id=research_id, 

622 research_context=shared_research_context, 

623 ) 

624 system.set_progress_callback(progress_callback) 

625 

626 # Run the search 

627 progress_callback("Starting research process", 5, {"phase": "init"}) 

628 

629 try: 

630 results = system.analyze_topic(query) 

631 if mode == "quick": 

632 progress_callback( 

633 "Search complete, preparing to generate summary...", 

634 85, 

635 {"phase": "output_generation"}, 

636 ) 

637 else: 

638 progress_callback( 

639 "Search complete, generating output", 

640 80, 

641 {"phase": "output_generation"}, 

642 ) 

643 except Exception as search_error: 

644 # Better handling of specific search errors 

645 error_message = str(search_error) 

646 error_type = "unknown" 

647 

648 # Extract error details for common issues 

649 if "status code: 503" in error_message: 

650 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system." 

651 error_type = "ollama_unavailable" 

652 elif "status code: 404" in error_message: 

653 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model." 

654 error_type = "model_not_found" 

655 elif "status code:" in error_message: 

656 # Extract the status code for other HTTP errors 

657 status_code = error_message.split("status code:")[1].strip() 

658 error_message = f"API request failed with status code {status_code}. Please check your configuration." 

659 error_type = "api_error" 

660 elif "connection" in error_message.lower(): 

661 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible." 

662 error_type = "connection_error" 

663 

664 # Raise with improved error message 

665 raise Exception(f"{error_message} (Error type: {error_type})") 

666 

667 # Generate output based on mode 

668 if mode == "quick": 

669 # Quick Summary 

670 if results.get("findings") or results.get("formatted_findings"): 

671 raw_formatted_findings = results["formatted_findings"] 

672 

673 # Check if formatted_findings contains an error message 

674 if isinstance( 

675 raw_formatted_findings, str 

676 ) and raw_formatted_findings.startswith("Error:"): 

677 logger.exception( 

678 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..." 

679 ) 

680 

681 # Determine error type for better user feedback 

682 error_type = "unknown" 

683 error_message = raw_formatted_findings.lower() 

684 

685 if ( 

686 "token limit" in error_message 

687 or "context length" in error_message 

688 ): 

689 error_type = "token_limit" 

690 # Log specific error type 

691 logger.warning( 

692 "Detected token limit error in synthesis" 

693 ) 

694 

695 # Update progress with specific error type 

696 progress_callback( 

697 "Synthesis hit token limits. Attempting fallback...", 

698 87, 

699 { 

700 "phase": "synthesis_error", 

701 "error_type": error_type, 

702 }, 

703 ) 

704 elif ( 

705 "timeout" in error_message 

706 or "timed out" in error_message 

707 ): 

708 error_type = "timeout" 

709 logger.warning("Detected timeout error in synthesis") 

710 progress_callback( 

711 "Synthesis timed out. Attempting fallback...", 

712 87, 

713 { 

714 "phase": "synthesis_error", 

715 "error_type": error_type, 

716 }, 

717 ) 

718 elif "rate limit" in error_message: 

719 error_type = "rate_limit" 

720 logger.warning("Detected rate limit error in synthesis") 

721 progress_callback( 

722 "LLM rate limit reached. Attempting fallback...", 

723 87, 

724 { 

725 "phase": "synthesis_error", 

726 "error_type": error_type, 

727 }, 

728 ) 

729 elif ( 

730 "connection" in error_message 

731 or "network" in error_message 

732 ): 

733 error_type = "connection" 

734 logger.warning("Detected connection error in synthesis") 

735 progress_callback( 

736 "Connection issue with LLM. Attempting fallback...", 

737 87, 

738 { 

739 "phase": "synthesis_error", 

740 "error_type": error_type, 

741 }, 

742 ) 

743 elif ( 

744 "llm error" in error_message 

745 or "final answer synthesis fail" in error_message 

746 ): 

747 error_type = "llm_error" 

748 logger.warning( 

749 "Detected general LLM error in synthesis" 

750 ) 

751 progress_callback( 

752 "LLM error during synthesis. Attempting fallback...", 

753 87, 

754 { 

755 "phase": "synthesis_error", 

756 "error_type": error_type, 

757 }, 

758 ) 

759 else: 

760 # Generic error 

761 logger.warning("Detected unknown error in synthesis") 

762 progress_callback( 

763 "Error during synthesis. Attempting fallback...", 

764 87, 

765 { 

766 "phase": "synthesis_error", 

767 "error_type": "unknown", 

768 }, 

769 ) 

770 

771 # Extract synthesized content from findings if available 

772 synthesized_content = "" 

773 for finding in results.get("findings", []): 

774 if finding.get("phase") == "Final synthesis": 

775 synthesized_content = finding.get("content", "") 

776 break 

777 

778 # Use synthesized content as fallback 

779 if ( 

780 synthesized_content 

781 and not synthesized_content.startswith("Error:") 

782 ): 

783 logger.info( 

784 "Using existing synthesized content as fallback" 

785 ) 

786 raw_formatted_findings = synthesized_content 

787 

788 # Or use current_knowledge as another fallback 

789 elif results.get("current_knowledge"): 

790 logger.info("Using current_knowledge as fallback") 

791 raw_formatted_findings = results["current_knowledge"] 

792 

793 # Or combine all finding contents as last resort 

794 elif results.get("findings"): 

795 logger.info("Combining all findings as fallback") 

796 # First try to use any findings that are not errors 

797 valid_findings = [ 

798 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

799 for finding in results.get("findings", []) 

800 if finding.get("content") 

801 and not finding.get("content", "").startswith( 

802 "Error:" 

803 ) 

804 ] 

805 

806 if valid_findings: 

807 raw_formatted_findings = ( 

808 "# Research Results (Fallback Mode)\n\n" 

809 ) 

810 raw_formatted_findings += "\n\n".join( 

811 valid_findings 

812 ) 

813 raw_formatted_findings += f"\n\n## Error Information\n{raw_formatted_findings}" 

814 else: 

815 # Last resort: use everything including errors 

816 raw_formatted_findings = ( 

817 "# Research Results (Emergency Fallback)\n\n" 

818 ) 

819 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n" 

820 raw_formatted_findings += "\n\n".join( 

821 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

822 for finding in results.get("findings", []) 

823 if finding.get("content") 

824 ) 

825 

826 progress_callback( 

827 f"Using fallback synthesis due to {error_type} error", 

828 88, 

829 { 

830 "phase": "synthesis_fallback", 

831 "error_type": error_type, 

832 }, 

833 ) 

834 

835 logger.info( 

836 "Found formatted_findings of length: %s", 

837 len(str(raw_formatted_findings)), 

838 ) 

839 

840 try: 

841 # Check if we have an error in the findings and use enhanced error handling 

842 if isinstance( 

843 raw_formatted_findings, str 

844 ) and raw_formatted_findings.startswith("Error:"): 

845 logger.info( 

846 "Generating enhanced error report using ErrorReportGenerator" 

847 ) 

848 

849 # Get LLM for error explanation if available 

850 try: 

851 llm = get_llm( 

852 research_id=research_id, 

853 research_context=shared_research_context, 

854 ) 

855 except Exception: 

856 llm = None 

857 logger.warning( 

858 "Could not get LLM for error explanation" 

859 ) 

860 

861 # Generate comprehensive error report 

862 error_generator = ErrorReportGenerator(llm) 

863 clean_markdown = error_generator.generate_error_report( 

864 error_message=raw_formatted_findings, 

865 query=query, 

866 partial_results=results, 

867 search_iterations=results.get("iterations", 0), 

868 research_id=research_id, 

869 ) 

870 

871 logger.info( 

872 "Generated enhanced error report with %d characters", 

873 len(clean_markdown), 

874 ) 

875 else: 

876 # Get the synthesized content from the LLM directly 

877 clean_markdown = raw_formatted_findings 

878 

879 # Extract all sources from findings to add them to the summary 

880 all_links = [] 

881 for finding in results.get("findings", []): 

882 search_results = finding.get("search_results", []) 

883 if search_results: 

884 try: 

885 links = extract_links_from_search_results( 

886 search_results 

887 ) 

888 all_links.extend(links) 

889 except Exception: 

890 logger.exception( 

891 "Error processing search results/links" 

892 ) 

893 

894 logger.info( 

895 "Successfully converted to clean markdown of length: %s", 

896 len(clean_markdown), 

897 ) 

898 

899 # First send a progress update for generating the summary 

900 progress_callback( 

901 "Generating clean summary from research data...", 

902 90, 

903 {"phase": "output_generation"}, 

904 ) 

905 

906 # Send progress update for saving report 

907 progress_callback( 

908 "Saving research report to database...", 

909 95, 

910 {"phase": "report_complete"}, 

911 ) 

912 

913 # Format citations in the markdown content 

914 formatter = get_citation_formatter() 

915 formatted_content = formatter.format_document( 

916 clean_markdown 

917 ) 

918 

919 # Prepare complete report content 

920 full_report_content = f"""{formatted_content} 

921 

922## Research Metrics 

923- Search Iterations: {results["iterations"]} 

924- Generated at: {datetime.now(UTC).isoformat()} 

925""" 

926 

927 # Save sources to database 

928 from .research_sources_service import ResearchSourcesService 

929 

930 sources_service = ResearchSourcesService() 

931 if all_links: 

932 logger.info( 

933 f"Quick summary: Saving {len(all_links)} sources to database" 

934 ) 

935 sources_saved = sources_service.save_research_sources( 

936 research_id=research_id, 

937 sources=all_links, 

938 username=username, 

939 ) 

940 logger.info( 

941 f"Quick summary: Saved {sources_saved} sources for research {research_id}" 

942 ) 

943 

944 # Save report using storage abstraction 

945 from ...storage import get_report_storage 

946 

947 with get_user_db_session(username) as db_session: 

948 storage = get_report_storage(session=db_session) 

949 

950 # Prepare metadata 

951 metadata = { 

952 "iterations": results["iterations"], 

953 "generated_at": datetime.now(UTC).isoformat(), 

954 } 

955 

956 # Save report using storage abstraction 

957 success = storage.save_report( 

958 research_id=research_id, 

959 content=full_report_content, 

960 metadata=metadata, 

961 username=username, 

962 ) 

963 

964 if not success: 

965 raise Exception("Failed to save research report") 

966 

967 logger.info( 

968 f"Report saved for research_id: {research_id}" 

969 ) 

970 

971 # Skip export to additional formats - we're storing in database only 

972 

973 # Update research status in database 

974 completed_at = datetime.now(UTC).isoformat() 

975 

976 with get_user_db_session(username) as db_session: 

977 research = ( 

978 db_session.query(ResearchHistory) 

979 .filter_by(id=research_id) 

980 .first() 

981 ) 

982 

983 # Preserve existing metadata and update with new values 

984 metadata = _parse_research_metadata( 

985 research.research_meta 

986 ) 

987 

988 metadata.update( 

989 { 

990 "iterations": results["iterations"], 

991 "generated_at": datetime.now(UTC).isoformat(), 

992 } 

993 ) 

994 

995 # Use the helper function for consistent duration calculation 

996 duration_seconds = calculate_duration( 

997 research.created_at, completed_at 

998 ) 

999 

1000 research.status = ResearchStatus.COMPLETED 

1001 research.completed_at = completed_at 

1002 research.duration_seconds = duration_seconds 

1003 # Note: report_content is saved by CachedResearchService 

1004 # report_path is not used in encrypted database version 

1005 

1006 # Generate headline and topics only for news searches 

1007 if ( 

1008 metadata.get("is_news_search") 

1009 or metadata.get("search_type") == "news_analysis" 

1010 ): 

1011 try: 

1012 from ...news.utils.headline_generator import ( 

1013 generate_headline, 

1014 ) 

1015 from ...news.utils.topic_generator import ( 

1016 generate_topics, 

1017 ) 

1018 

1019 # Get the report content from database for better headline/topic generation 

1020 report_content = "" 

1021 try: 

1022 research = ( 

1023 db_session.query(ResearchHistory) 

1024 .filter_by(id=research_id) 

1025 .first() 

1026 ) 

1027 if research and research.report_content: 

1028 report_content = research.report_content 

1029 logger.info( 

1030 f"Retrieved {len(report_content)} chars from database for headline generation" 

1031 ) 

1032 else: 

1033 logger.warning( 

1034 f"No report content found in database for research_id: {research_id}" 

1035 ) 

1036 except Exception as e: 

1037 logger.warning( 

1038 f"Could not retrieve report content from database: {e}" 

1039 ) 

1040 

1041 # Generate headline 

1042 logger.info( 

1043 f"Generating headline for query: {query[:100]}" 

1044 ) 

1045 headline = generate_headline( 

1046 query, report_content 

1047 ) 

1048 metadata["generated_headline"] = headline 

1049 

1050 # Generate topics 

1051 logger.info( 

1052 f"Generating topics with category: {metadata.get('category', 'News')}" 

1053 ) 

1054 topics = generate_topics( 

1055 query=query, 

1056 findings=report_content, 

1057 category=metadata.get("category", "News"), 

1058 max_topics=6, 

1059 ) 

1060 metadata["generated_topics"] = topics 

1061 

1062 logger.info(f"Generated headline: {headline}") 

1063 logger.info(f"Generated topics: {topics}") 

1064 

1065 except Exception as e: 

1066 logger.warning( 

1067 f"Could not generate headline/topics: {e}" 

1068 ) 

1069 

1070 research.research_meta = metadata 

1071 

1072 db_session.commit() 

1073 logger.info( 

1074 f"Database commit completed for research_id: {research_id}" 

1075 ) 

1076 

1077 # Update subscription if this was triggered by a subscription 

1078 if metadata.get("subscription_id"): 

1079 try: 

1080 from ...news.subscription_manager.storage import ( 

1081 SQLSubscriptionStorage, 

1082 ) 

1083 from datetime import ( 

1084 datetime as dt, 

1085 timezone, 

1086 timedelta, 

1087 ) 

1088 

1089 sub_storage = SQLSubscriptionStorage() 

1090 subscription_id = metadata["subscription_id"] 

1091 

1092 # Get subscription to find refresh interval 

1093 subscription = sub_storage.get(subscription_id) 

1094 if subscription: 

1095 refresh_minutes = subscription.get( 

1096 "refresh_minutes", 240 

1097 ) 

1098 now = dt.now(timezone.utc) 

1099 next_refresh = now + timedelta( 

1100 minutes=refresh_minutes 

1101 ) 

1102 

1103 # Update refresh times 

1104 sub_storage.update_refresh_time( 

1105 subscription_id=subscription_id, 

1106 last_refresh=now, 

1107 next_refresh=next_refresh, 

1108 ) 

1109 

1110 # Increment stats 

1111 sub_storage.increment_stats( 

1112 subscription_id, 1 

1113 ) 

1114 

1115 logger.info( 

1116 f"Updated subscription {subscription_id} refresh times" 

1117 ) 

1118 except Exception as e: 

1119 logger.warning( 

1120 f"Could not update subscription refresh time: {e}" 

1121 ) 

1122 

1123 logger.info( 

1124 f"Database updated successfully for research_id: {research_id}" 

1125 ) 

1126 

1127 # Send the final completion message 

1128 progress_callback( 

1129 "Research completed successfully", 

1130 100, 

1131 {"phase": "complete"}, 

1132 ) 

1133 

1134 # Clean up resources 

1135 logger.info( 

1136 "Cleaning up resources for research_id: %s", research_id 

1137 ) 

1138 cleanup_research_resources( 

1139 research_id, 

1140 active_research, 

1141 termination_flags, 

1142 username, 

1143 ) 

1144 logger.info( 

1145 "Resources cleaned up for research_id: %s", research_id 

1146 ) 

1147 

1148 except Exception as inner_e: 

1149 logger.exception("Error during quick summary generation") 

1150 raise Exception( 

1151 f"Error generating quick summary: {inner_e!s}" 

1152 ) 

1153 else: 

1154 raise Exception( 

1155 "No research findings were generated. Please try again." 

1156 ) 

1157 else: 

1158 # Full Report 

1159 progress_callback( 

1160 "Generating detailed report...", 

1161 85, 

1162 {"phase": "report_generation"}, 

1163 ) 

1164 

1165 # Extract the search system from the results if available 

1166 search_system = results.get("search_system", None) 

1167 

1168 # Pass the existing search system to maintain citation indices 

1169 report_generator = IntegratedReportGenerator( 

1170 search_system=search_system, 

1171 settings_snapshot=settings_snapshot, 

1172 ) 

1173 final_report = report_generator.generate_report(results, query) 

1174 

1175 progress_callback( 

1176 "Report generation complete", 95, {"phase": "report_complete"} 

1177 ) 

1178 

1179 # Format citations in the report content 

1180 formatter = get_citation_formatter() 

1181 formatted_content = formatter.format_document( 

1182 final_report["content"] 

1183 ) 

1184 

1185 # Save sources to database 

1186 from .research_sources_service import ResearchSourcesService 

1187 

1188 sources_service = ResearchSourcesService() 

1189 if ( 

1190 hasattr(search_system, "all_links_of_system") 

1191 and search_system.all_links_of_system 

1192 ): 

1193 logger.info( 

1194 f"Saving {len(search_system.all_links_of_system)} sources to database" 

1195 ) 

1196 sources_saved = sources_service.save_research_sources( 

1197 research_id=research_id, 

1198 sources=search_system.all_links_of_system, 

1199 username=username, 

1200 ) 

1201 logger.info( 

1202 f"Saved {sources_saved} sources for research {research_id}" 

1203 ) 

1204 

1205 # Save report to database 

1206 with get_user_db_session(username) as db_session: 

1207 # Update metadata 

1208 metadata = final_report["metadata"] 

1209 metadata["iterations"] = results["iterations"] 

1210 

1211 # Save report to database 

1212 try: 

1213 research = ( 

1214 db_session.query(ResearchHistory) 

1215 .filter_by(id=research_id) 

1216 .first() 

1217 ) 

1218 

1219 if not research: 

1220 logger.error(f"Research {research_id} not found") 

1221 success = False 

1222 else: 

1223 research.report_content = formatted_content 

1224 if research.research_meta: 

1225 research.research_meta.update(metadata) 

1226 else: 

1227 research.research_meta = metadata 

1228 db_session.commit() 

1229 success = True 

1230 logger.info( 

1231 f"Saved report for research {research_id} to database" 

1232 ) 

1233 except Exception: 

1234 logger.exception("Error saving report to database") 

1235 db_session.rollback() 

1236 success = False 

1237 

1238 if not success: 

1239 raise Exception("Failed to save research report") 

1240 

1241 logger.info( 

1242 f"Report saved to database for research_id: {research_id}" 

1243 ) 

1244 

1245 # Update research status in database 

1246 completed_at = datetime.now(UTC).isoformat() 

1247 

1248 with get_user_db_session(username) as db_session: 

1249 research = ( 

1250 db_session.query(ResearchHistory) 

1251 .filter_by(id=research_id) 

1252 .first() 

1253 ) 

1254 

1255 # Preserve existing metadata and merge with report metadata 

1256 metadata = _parse_research_metadata(research.research_meta) 

1257 

1258 metadata.update(final_report["metadata"]) 

1259 metadata["iterations"] = results["iterations"] 

1260 

1261 # Use the helper function for consistent duration calculation 

1262 duration_seconds = calculate_duration( 

1263 research.created_at, completed_at 

1264 ) 

1265 

1266 research.status = ResearchStatus.COMPLETED 

1267 research.completed_at = completed_at 

1268 research.duration_seconds = duration_seconds 

1269 # Note: report_content is saved by CachedResearchService 

1270 # report_path is not used in encrypted database version 

1271 

1272 # Generate headline and topics only for news searches 

1273 if ( 

1274 metadata.get("is_news_search") 

1275 or metadata.get("search_type") == "news_analysis" 

1276 ): 

1277 try: 

1278 from ..news.utils.headline_generator import ( 

1279 generate_headline, 

1280 ) 

1281 from ..news.utils.topic_generator import ( 

1282 generate_topics, 

1283 ) 

1284 

1285 # Get the report content from database for better headline/topic generation 

1286 report_content = "" 

1287 try: 

1288 research = ( 

1289 db_session.query(ResearchHistory) 

1290 .filter_by(id=research_id) 

1291 .first() 

1292 ) 

1293 if research and research.report_content: 

1294 report_content = research.report_content 

1295 else: 

1296 logger.warning( 

1297 f"No report content found in database for research_id: {research_id}" 

1298 ) 

1299 except Exception as e: 

1300 logger.warning( 

1301 f"Could not retrieve report content from database: {e}" 

1302 ) 

1303 

1304 # Generate headline 

1305 headline = generate_headline(query, report_content) 

1306 metadata["generated_headline"] = headline 

1307 

1308 # Generate topics 

1309 topics = generate_topics( 

1310 query=query, 

1311 findings=report_content, 

1312 category=metadata.get("category", "News"), 

1313 max_topics=6, 

1314 ) 

1315 metadata["generated_topics"] = topics 

1316 

1317 logger.info(f"Generated headline: {headline}") 

1318 logger.info(f"Generated topics: {topics}") 

1319 

1320 except Exception as e: 

1321 logger.warning( 

1322 f"Could not generate headline/topics: {e}" 

1323 ) 

1324 

1325 research.research_meta = metadata 

1326 

1327 db_session.commit() 

1328 

1329 # Update subscription if this was triggered by a subscription 

1330 if metadata.get("subscription_id"): 

1331 try: 

1332 from ...news.subscription_manager.storage import ( 

1333 SQLSubscriptionStorage, 

1334 ) 

1335 from datetime import datetime as dt, timezone, timedelta 

1336 

1337 sub_storage = SQLSubscriptionStorage() 

1338 subscription_id = metadata["subscription_id"] 

1339 

1340 # Get subscription to find refresh interval 

1341 subscription = sub_storage.get(subscription_id) 

1342 if subscription: 

1343 refresh_minutes = subscription.get( 

1344 "refresh_minutes", 240 

1345 ) 

1346 now = dt.now(timezone.utc) 

1347 next_refresh = now + timedelta( 

1348 minutes=refresh_minutes 

1349 ) 

1350 

1351 # Update refresh times 

1352 sub_storage.update_refresh_time( 

1353 subscription_id=subscription_id, 

1354 last_refresh=now, 

1355 next_refresh=next_refresh, 

1356 ) 

1357 

1358 # Increment stats 

1359 sub_storage.increment_stats(subscription_id, 1) 

1360 

1361 logger.info( 

1362 f"Updated subscription {subscription_id} refresh times" 

1363 ) 

1364 except Exception as e: 

1365 logger.warning( 

1366 f"Could not update subscription refresh time: {e}" 

1367 ) 

1368 

1369 progress_callback( 

1370 "Research completed successfully", 

1371 100, 

1372 {"phase": "complete"}, 

1373 ) 

1374 

1375 # Clean up resources 

1376 cleanup_research_resources( 

1377 research_id, active_research, termination_flags, username 

1378 ) 

1379 

1380 except Exception as e: 

1381 # Handle error 

1382 error_message = f"Research failed: {e!s}" 

1383 logger.exception(error_message) 

1384 

1385 try: 

1386 # Check for common Ollama error patterns in the exception and provide more user-friendly errors 

1387 user_friendly_error = str(e) 

1388 error_context = {} 

1389 

1390 if "Error type: ollama_unavailable" in user_friendly_error: 1390 ↛ 1391line 1390 didn't jump to line 1391 because the condition on line 1390 was never true

1391 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system." 

1392 error_context = { 

1393 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly." 

1394 } 

1395 elif "Error type: model_not_found" in user_friendly_error: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true

1396 user_friendly_error = "Required Ollama model not found. Please pull the model first." 

1397 error_context = { 

1398 "solution": "Run 'ollama pull mistral' to download the required model." 

1399 } 

1400 elif "Error type: connection_error" in user_friendly_error: 1400 ↛ 1401line 1400 didn't jump to line 1401 because the condition on line 1400 was never true

1401 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running." 

1402 error_context = { 

1403 "solution": "Ensure Ollama or your API service is running and accessible." 

1404 } 

1405 elif "Error type: api_error" in user_friendly_error: 1405 ↛ 1407line 1405 didn't jump to line 1407 because the condition on line 1405 was never true

1406 # Keep the original error message as it's already improved 

1407 error_context = { 

1408 "solution": "Check API configuration and credentials." 

1409 } 

1410 

1411 # Generate enhanced error report for failed research 

1412 enhanced_report_content = None 

1413 try: 

1414 # Get LLM for error explanation if available 

1415 try: 

1416 llm = get_llm( 

1417 research_id=research_id, 

1418 research_context=shared_research_context, 

1419 ) 

1420 except Exception: 

1421 llm = None 

1422 logger.warning( 

1423 "Could not get LLM for error explanation in failure handler" 

1424 ) 

1425 

1426 # Get partial results if they exist 

1427 partial_results = results if "results" in locals() else None 

1428 search_iterations = ( 

1429 results.get("iterations", 0) if partial_results else 0 

1430 ) 

1431 

1432 # Generate comprehensive error report 

1433 error_generator = ErrorReportGenerator(llm) 

1434 enhanced_report_content = error_generator.generate_error_report( 

1435 error_message=f"Research failed: {e!s}", 

1436 query=query, 

1437 partial_results=partial_results, 

1438 search_iterations=search_iterations, 

1439 research_id=research_id, 

1440 ) 

1441 

1442 logger.info( 

1443 "Generated enhanced error report for failed research (length: %d)", 

1444 len(enhanced_report_content), 

1445 ) 

1446 

1447 # Save enhanced error report to encrypted database 

1448 try: 

1449 # username already available from function scope (line 281) 

1450 if username: 1450 ↛ 1472line 1450 didn't jump to line 1472 because the condition on line 1450 was always true

1451 from ...storage import get_report_storage 

1452 

1453 with get_user_db_session(username) as db_session: 

1454 storage = get_report_storage(session=db_session) 

1455 success = storage.save_report( 

1456 research_id=research_id, 

1457 content=enhanced_report_content, 

1458 metadata={"error_report": True}, 

1459 username=username, 

1460 ) 

1461 if success: 1461 ↛ 1467line 1461 didn't jump to line 1467 because the condition on line 1461 was always true

1462 logger.info( 

1463 "Saved enhanced error report to encrypted database for research %s", 

1464 research_id, 

1465 ) 

1466 else: 

1467 logger.warning( 

1468 "Failed to save enhanced error report to database for research %s", 

1469 research_id, 

1470 ) 

1471 else: 

1472 logger.warning( 

1473 "Cannot save error report: username not available" 

1474 ) 

1475 

1476 except Exception as report_error: 

1477 logger.exception( 

1478 "Failed to save enhanced error report: %s", report_error 

1479 ) 

1480 

1481 except Exception as error_gen_error: 

1482 logger.exception( 

1483 "Failed to generate enhanced error report: %s", 

1484 error_gen_error, 

1485 ) 

1486 enhanced_report_content = None 

1487 

1488 # Get existing metadata from database first 

1489 existing_metadata = {} 

1490 try: 

1491 # username already available from function scope (line 281) 

1492 if username: 1492 ↛ 1505line 1492 didn't jump to line 1505 because the condition on line 1492 was always true

1493 with get_user_db_session(username) as db_session: 

1494 research = ( 

1495 db_session.query(ResearchHistory) 

1496 .filter_by(id=research_id) 

1497 .first() 

1498 ) 

1499 if research and research.research_meta: 1499 ↛ 1505line 1499 didn't jump to line 1505

1500 existing_metadata = dict(research.research_meta) 

1501 except Exception: 

1502 logger.exception("Failed to get existing metadata") 

1503 

1504 # Update metadata with more context about the error while preserving existing values 

1505 metadata = existing_metadata 

1506 metadata.update({"phase": "error", "error": user_friendly_error}) 

1507 if error_context: 1507 ↛ 1508line 1507 didn't jump to line 1508 because the condition on line 1507 was never true

1508 metadata.update(error_context) 

1509 if enhanced_report_content: 1509 ↛ 1513line 1509 didn't jump to line 1513 because the condition on line 1509 was always true

1510 metadata["has_enhanced_report"] = True 

1511 

1512 # If we still have an active research record, update its log 

1513 if research_id in active_research: 

1514 progress_callback(user_friendly_error, None, metadata) 

1515 

1516 # If termination was requested, mark as suspended instead of failed 

1517 status = ( 

1518 ResearchStatus.SUSPENDED 

1519 if (termination_flags.get(research_id)) 

1520 else ResearchStatus.FAILED 

1521 ) 

1522 message = ( 

1523 "Research was terminated by user" 

1524 if status == ResearchStatus.SUSPENDED 

1525 else user_friendly_error 

1526 ) 

1527 

1528 # Calculate duration up to termination point - using UTC consistently 

1529 now = datetime.now(UTC) 

1530 completed_at = now.isoformat() 

1531 

1532 # NOTE: Database updates from threads are handled by queue processor 

1533 # The queue_processor.queue_error_update() method is already being used below 

1534 # to safely update the database from the main thread 

1535 

1536 # Queue the error update to be processed in main thread 

1537 # Using the queue processor v2 system 

1538 from ..queue.processor_v2 import queue_processor 

1539 

1540 if username: 1540 ↛ 1554line 1540 didn't jump to line 1554 because the condition on line 1540 was always true

1541 queue_processor.queue_error_update( 

1542 username=username, 

1543 research_id=research_id, 

1544 status=status, 

1545 error_message=message, 

1546 metadata=metadata, 

1547 completed_at=completed_at, 

1548 report_path=None, 

1549 ) 

1550 logger.info( 

1551 f"Queued error update for research {research_id} with status '{status}'" 

1552 ) 

1553 else: 

1554 logger.error( 

1555 f"Cannot queue error update for research {research_id} - no username provided. " 

1556 f"Status: '{status}', Message: {message}" 

1557 ) 

1558 

1559 try: 

1560 SocketIOService().emit_to_subscribers( 

1561 "research_progress", 

1562 research_id, 

1563 {"status": status, "error": message}, 

1564 ) 

1565 except Exception: 

1566 logger.exception("Failed to emit error via socket") 

1567 

1568 except Exception: 

1569 logger.exception("Error in error handler") 

1570 

1571 # Clean up resources 

1572 cleanup_research_resources( 

1573 research_id, active_research, termination_flags, username 

1574 ) 

1575 

1576 finally: 

1577 # Clear thread-local contexts to prevent leaks when threads are reused 

1578 from ...utilities.thread_context import clear_search_context 

1579 from ...config.thread_settings import clear_settings_context 

1580 

1581 clear_search_context() 

1582 clear_settings_context() 

1583 

1584 

def cleanup_research_resources(
    research_id, active_research, termination_flags, username=None
):
    """
    Release in-memory tracking state for a finished research run and notify
    the queue processor and any remaining socket subscribers.

    Args:
        research_id: The ID of the research
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        username: The username for database access (required for thread context)
    """
    logger.info("Cleaning up resources for research %s", research_id)

    # Test-only delay: simulates a long-running research so concurrent
    # research limits can be exercised in integration tests.
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        import time

        logger.info(
            f"Test mode: Adding 5 second delay before cleanup for {research_id}"
        )
        time.sleep(5)

    # Final status used for the closing socket message.
    # NOTE(review): this default is never re-read from the database in this
    # function, so the suspended/failed branch below cannot currently fire —
    # confirm whether a DB lookup was intended here.
    current_status = ResearchStatus.COMPLETED

    # Queue processor v2 performs the actual database status update from the
    # main thread, avoiding Flask request-context issues in background threads.
    from ..queue.processor_v2 import queue_processor

    if not username:
        logger.warning(
            f"Cannot notify completion for research {research_id} - no username provided"
        )
    else:
        queue_processor.notify_research_completed(username, research_id)
        logger.info(
            f"Notified queue processor of completion for research {research_id} (user: {username})"
        )

    # Drop the in-memory tracking entries for this research, if present.
    active_research.pop(research_id, None)
    termination_flags.pop(research_id, None)

    # Emit one last progress message to any subscribers still listening.
    try:
        # Import here to avoid circular imports
        from ..routes.globals import get_globals

        subscriptions = get_globals().get("socket_subscriptions", {})

        if subscriptions.get(research_id):
            # Shape of the final message depends on whether the run ended
            # cleanly or was suspended/failed.
            if current_status in (
                ResearchStatus.SUSPENDED,
                ResearchStatus.FAILED,
            ):
                final_message = {
                    "status": current_status,
                    "message": f"Research was {current_status}",
                    "progress": 0,  # For suspended research, show 0% not 100%
                }
            else:
                final_message = {
                    "status": ResearchStatus.COMPLETED,
                    "message": "Research process has ended and resources have been cleaned up",
                    "progress": 100,
                }

            logger.info(
                "Sending final %s socket message for research %s",
                current_status,
                research_id,
            )
            SocketIOService().emit_to_subscribers(
                "research_progress", research_id, final_message
            )

    except Exception:
        logger.exception("Error sending final cleanup message")

1680 

1681 

def handle_termination(
    research_id, active_research, termination_flags, username=None
):
    """
    Handle the termination of a research process.

    Queues a SUSPENDED status update for the main thread, then releases the
    research's in-memory resources.

    Args:
        research_id: The ID of the research
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        username: The username for database access (required for thread context)
    """
    logger.info(f"Handling termination for research {research_id}")

    try:
        # Database writes must happen on the main thread; background threads
        # have no Flask request context, so route the update via the queue.
        from ..queue.processor_v2 import queue_processor

        terminated_at = datetime.now(UTC).isoformat()

        # Queue the suspension update
        queue_processor.queue_error_update(
            username=username,
            research_id=research_id,
            status=ResearchStatus.SUSPENDED,
            error_message="Research was terminated by user",
            metadata={"terminated_at": terminated_at},
            completed_at=terminated_at,
            report_path=None,
        )

        logger.info(f"Queued suspension update for research {research_id}")
    except Exception:
        logger.exception(
            f"Error queueing termination update for research {research_id}"
        )

    # Release tracking state and send the final subscriber notification.
    cleanup_research_resources(
        research_id, active_research, termination_flags, username
    )

1725 

1726 

def cancel_research(research_id, username=None):
    """
    Cancel/terminate a research process using ORM.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research (optional, will try to get from session if not provided)

    Returns:
        bool: True if the research was found and cancelled, False otherwise
    """
    try:
        # Import globals from research routes
        from ..routes.globals import get_globals

        shared = get_globals()
        running = shared["active_research"]
        flags = shared["termination_flags"]

        # Flag the research for termination regardless of its current state.
        flags[research_id] = True

        if research_id in running:
            # Actively running: delegate DB update and cleanup.
            handle_termination(research_id, running, flags, username)
            return True

        # Not tracked in memory: update the database record directly.
        # Resolve the username from the Flask session if not supplied.
        if not username:
            from flask import session

            username = session.get("username")

        if not username:
            logger.warning(
                f"No username available for cancelling research {research_id}"
            )
            return False

        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not record:
                    logger.info(
                        f"Research {research_id} not found in database"
                    )
                    return False

                # Nothing to do if it's already stopped one way or another.
                terminal_states = (
                    ResearchStatus.COMPLETED,
                    ResearchStatus.SUSPENDED,
                    ResearchStatus.FAILED,
                    ResearchStatus.ERROR,
                )
                if record.status in terminal_states:
                    logger.info(
                        f"Research {research_id} already in terminal state: {record.status}"
                    )
                    return True  # Consider this a success since it's already stopped

                # Exists but isn't in active_research; still flip its status.
                record.status = ResearchStatus.SUSPENDED
                db_session.commit()
                logger.info(
                    f"Successfully suspended research {research_id}"
                )
        except Exception:
            logger.exception(
                f"Error accessing database for research {research_id}"
            )
            return False

        return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False