Coverage for src/local_deep_research/web/services/research_service.py: 86%

669 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import hashlib 

2import json 

3import threading 

4import time 

5from datetime import datetime, UTC 

6from pathlib import Path 

7 

8from loguru import logger 

9 

10from ...exceptions import ResearchTerminatedException 

11from ...config.llm_config import get_llm 

12from ...settings.manager import SnapshotSettingsContext 

13 

14# Output directory for research results 

15from ...config.paths import get_research_outputs_directory 

16from ...config.search_config import get_search 

17from ...constants import ResearchStatus 

18from ...database.models import ResearchHistory, ResearchStrategy 

19from ...database.session_context import get_user_db_session 

20from ...database.thread_local_session import thread_cleanup 

21from ...error_handling.report_generator import ErrorReportGenerator 

22from ...utilities.thread_context import set_search_context 

23from ...report_generator import IntegratedReportGenerator 

24from ...search_system import AdvancedSearchSystem 

25from ...text_optimization import CitationFormatter, CitationMode 

26from ...utilities.log_utils import log_for_research 

27from ...utilities.search_utilities import extract_links_from_search_results 

28from ...utilities.threading_utils import thread_context, thread_with_app_context 

29from ..models.database import calculate_duration 

30from ...settings.env_registry import get_env_setting 

31from .socket_service import SocketIOService 

32 

# Base directory where generated research reports are written.
OUTPUT_DIR = get_research_outputs_directory()


# Global concurrent research limit (server-wide, across all users).
# Read once at import time from the environment-backed setting.
_MAX_GLOBAL_CONCURRENT = get_env_setting(
    "server.max_concurrent_research", default=10
)
# Counting semaphore that caps how many research callbacks run at once.
_global_research_semaphore = threading.Semaphore(_MAX_GLOBAL_CONCURRENT)

# Socket.IO emission throttling: minimum interval between progress emissions per research
_EMIT_THROTTLE_SECONDS = 0.2  # 200ms
_EMIT_TTL_SECONDS = 3600  # 1 hour — evict stale entries from orphaned research
# Incremented on each throttled emission; every 100th increment triggers a
# TTL sweep of _last_emit_times for orphaned research entries.
_emit_cleanup_counter = 0
# Maps research_id -> time.monotonic() timestamp of the last emitted progress event.
_last_emit_times: dict[str, float] = {}
# Guards _last_emit_times (and the cleanup counter) across research threads.
_last_emit_lock = threading.Lock()

48 

49 

50def _parse_research_metadata(research_meta) -> dict: 

51 """Parse research_meta into a dict, handling both dict and JSON string types.""" 

52 if isinstance(research_meta, dict): 

53 return dict(research_meta) 

54 if isinstance(research_meta, str): 

55 try: 

56 parsed = json.loads(research_meta) 

57 return dict(parsed) if isinstance(parsed, dict) else {} 

58 except json.JSONDecodeError: 

59 logger.exception("Failed to parse research_meta as JSON") 

60 return {} 

61 return {} 

62 

63 

def get_citation_formatter():
    """Build a CitationFormatter configured from the thread's settings snapshot.

    Falls back to NUMBER_HYPERLINKS when the configured format string is
    missing or unrecognized.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ...config.search_config import get_setting_from_snapshot

    configured = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    known_modes = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    selected = known_modes.get(configured, CitationMode.NUMBER_HYPERLINKS)
    return CitationFormatter(mode=selected)

81 

82 

def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
):
    """
    Export a markdown report to different formats in memory.

    Uses the modular exporter registry to support multiple formats.
    Available formats can be queried with ExporterRegistry.get_available_formats().

    Args:
        markdown_content: The markdown content to export
        format: Export format (e.g., 'pdf', 'odt', 'latex', 'quarto', 'ris')
        title: Optional title for the document

    Returns:
        Tuple of (content_bytes, filename, mimetype)

    Raises:
        ValueError: If no exporter is registered for the requested format.
    """
    from ...exporters import ExporterRegistry, ExportOptions

    # Look up the exporter by normalized (lowercase) format name.
    fmt = format.lower()
    exporter = ExporterRegistry.get_exporter(fmt)

    if exporter is None:
        supported = ", ".join(ExporterRegistry.get_available_formats())
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {supported}"
        )

    # Title prepending is handled by each exporter via _prepend_title_if_needed():
    # PDF and ODT exporters prepend titles; RIS and other formats ignore them.
    export_options = ExportOptions(title=title)

    result = exporter.export(markdown_content, export_options)

    logger.info(
        f"Generated {fmt} in memory, size: {len(result.content)} bytes"
    )

    return result.content, result.filename, result.mimetype

129 

130 

def save_research_strategy(research_id, strategy_name, username=None):
    """
    Save the strategy used for a research to the database.

    Performs an upsert: updates the existing ResearchStrategy row for the
    research if one exists, otherwise inserts a new one. Errors are logged
    and swallowed so a failed strategy save never aborts the research.

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            # Look for a previously saved strategy for this research.
            record = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )

            if record is None:
                # No prior record — insert a fresh one.
                session.add(
                    ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                )
                logger.debug(
                    f"Creating new strategy record for research {research_id}"
                )
            else:
                # Overwrite the stored name on the existing record.
                record.strategy_name = strategy_name
                logger.debug(
                    f"Updating existing strategy for research {research_id}"
                )

            session.commit()
            logger.info(
                f"Saved strategy '{strategy_name}' for research {research_id}"
            )
    except Exception:
        logger.exception("Error saving research strategy")

174 

175 

def get_research_strategy(research_id, username=None):
    """
    Look up the strategy name recorded for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name, or None if no record exists or the
        lookup fails (failures are logged, never raised).
    """
    try:
        with get_user_db_session(username) as session:
            row = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )
            if row is None:
                return None
            return row.strategy_name
    except Exception:
        logger.exception("Error getting research strategy")
        return None

199 

200 

def start_research_process(
    research_id,
    query,
    mode,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    The callback is wrapped twice before the thread starts: once to carry
    the Flask app context into the worker thread, and once to enforce the
    server-wide concurrent-research cap via the global semaphore. The new
    research is then registered as active for progress tracking.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research
    """
    from ..routes.globals import set_active_research

    # Carry the app context across the thread boundary.
    wrapped_callback = thread_with_app_context(run_research_callback)

    def _limited(*cb_args, **cb_kwargs):
        # Block until a global research slot frees up, then run.
        with _global_research_semaphore:
            return wrapped_callback(*cb_args, **cb_kwargs)

    # Launch the research on a daemon thread so it never blocks shutdown.
    thread = threading.Thread(
        target=_limited,
        args=(thread_context(), research_id, query, mode),
        kwargs=kwargs,
        daemon=True,
    )
    thread.start()

    set_active_research(
        research_id,
        {
            "thread": thread,
            "progress": 0,
            "status": ResearchStatus.IN_PROGRESS,
            "log": [],
            "settings": kwargs,  # Store settings for reference
        },
    )

    return thread

262 

263 

def _generate_report_path(query: str) -> Path:
    """
    Generate a unique, filesystem-safe path for a new report file.

    The filename combines a short MD5 digest of the query (not used for
    security — see the DevSkim suppression) with the current UTC
    timestamp, so it contains only alphanumeric characters and stays
    collision-free across runs.

    Args:
        query: The query used for the report.

    Returns:
        The path that it generated.
    """
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()[:10]
    timestamp = int(datetime.now(UTC).timestamp())
    return OUTPUT_DIR / f"research_report_{digest}_{timestamp}.md"

283 

284 

285@log_for_research 

286@thread_cleanup 

287def run_research_process(research_id, query, mode, **kwargs): 

288 """ 

289 Run the research process in the background for a given research ID. 

290 

291 Args: 

292 research_id: The ID of the research 

293 query: The research query 

294 mode: The research mode (quick/detailed) 

295 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.) 

296 MUST include 'username' for database access 

297 """ 

298 from ..routes.globals import ( 

299 is_research_active, 

300 is_termination_requested, 

301 update_progress_and_check_active, 

302 ) 

303 

304 # Extract username - required for database access 

305 username = kwargs.get("username") 

306 logger.info(f"Research thread started with username: {username}") 

307 if not username: 

308 logger.error("No username provided to research thread") 

309 raise ValueError("Username is required for research process") 

310 # Extract user_password early so it's available for all cleanup paths 

311 user_password = kwargs.get("user_password") 

312 

313 try: 

314 # Check if this research has been terminated before we even start 

315 if is_termination_requested(research_id): 

316 logger.info( 

317 f"Research {research_id} was terminated before starting" 

318 ) 

319 cleanup_research_resources( 

320 research_id, username, user_password=user_password 

321 ) 

322 return 

323 

324 logger.info( 

325 f"Starting research process for ID {research_id}, query: {query}" 

326 ) 

327 

328 # Extract key parameters 

329 model_provider = kwargs.get("model_provider") 

330 model = kwargs.get("model") 

331 custom_endpoint = kwargs.get("custom_endpoint") 

332 search_engine = kwargs.get("search_engine") 

333 max_results = kwargs.get("max_results") 

334 time_period = kwargs.get("time_period") 

335 iterations = kwargs.get("iterations") 

336 questions_per_iteration = kwargs.get("questions_per_iteration") 

337 strategy = kwargs.get( 

338 "strategy", "source-based" 

339 ) # Default to source-based 

340 settings_snapshot = kwargs.get( 

341 "settings_snapshot", {} 

342 ) # Complete settings snapshot 

343 

344 # Log settings snapshot to debug 

345 from ...settings.logger import log_settings 

346 

347 log_settings(settings_snapshot, "Settings snapshot received in thread") 

348 

349 # Strategy should already be saved in the database before thread starts 

350 logger.info(f"Research strategy: {strategy}") 

351 

352 # Log all parameters for debugging 

353 logger.info( 

354 f"Research parameters: provider={model_provider}, model={model}, " 

355 f"search_engine={search_engine}, max_results={max_results}, " 

356 f"time_period={time_period}, iterations={iterations}, " 

357 f"questions_per_iteration={questions_per_iteration}, " 

358 f"custom_endpoint={custom_endpoint}, strategy={strategy}" 

359 ) 

360 

361 # Set up the AI Context Manager 

362 output_dir = OUTPUT_DIR / f"research_{research_id}" 

363 output_dir.mkdir(parents=True, exist_ok=True) 

364 

365 # Create a settings context that uses snapshot - no database access in threads 

366 settings_context = SnapshotSettingsContext( 

367 settings_snapshot, username=username 

368 ) 

369 

370 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var 

371 from ...settings.logger import log_settings 

372 

373 log_settings( 

374 settings_context.values, "SettingsContext values extracted" 

375 ) 

376 

377 # Set the settings context for this thread 

378 from ...config.thread_settings import ( 

379 set_settings_context, 

380 ) 

381 

382 set_settings_context(settings_context) 

383 

384 # user_password already extracted above (before termination check) 

385 

386 # Create shared research context that can be updated during research 

387 shared_research_context = { 

388 "research_id": research_id, 

389 "research_query": query, 

390 "research_mode": mode, 

391 "research_phase": "init", 

392 "search_iteration": 0, 

393 "search_engines_planned": None, 

394 "search_engine_selected": search_engine, 

395 "username": username, # Add username for queue operations 

396 "user_password": user_password, # Add password for metrics access 

397 } 

398 

399 # If this is a follow-up research, include the parent context 

400 if "research_context" in kwargs and kwargs["research_context"]: 

401 logger.info( 

402 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings" 

403 ) 

404 shared_research_context.update(kwargs["research_context"]) 

405 

406 # Do not log context keys as they may contain sensitive information 

407 logger.info(f"Created shared_research_context for user: {username}") 

408 

409 # Set search context for search tracking 

410 set_search_context(shared_research_context) 

411 

412 # Set up progress callback 

413 def progress_callback(message, progress_percent, metadata): 

414 # Frequent termination check 

415 if is_termination_requested(research_id): 

416 handle_termination(research_id, username) 

417 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try 

418 "Research was terminated by user" 

419 ) 

420 

421 # Silent phase — no UI logging or socket emission needed 

422 if metadata.get("phase") == "termination_check": 

423 return 

424 

425 # Bind research_id to logger for this specific log 

426 bound_logger = logger.bind(research_id=research_id) 

427 bound_logger.log("MILESTONE", message) 

428 

429 if "SEARCH_PLAN:" in message: 429 ↛ 430line 429 didn't jump to line 430 because the condition on line 429 was never true

430 engines = message.split("SEARCH_PLAN:")[1].strip() 

431 metadata["planned_engines"] = engines 

432 metadata["phase"] = "search_planning" # Use existing phase 

433 # Update shared context for token tracking 

434 shared_research_context["search_engines_planned"] = engines 

435 shared_research_context["research_phase"] = "search_planning" 

436 

437 if "ENGINE_SELECTED:" in message: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 engine = message.split("ENGINE_SELECTED:")[1].strip() 

439 metadata["selected_engine"] = engine 

440 metadata["phase"] = "search" # Use existing 'search' phase 

441 # Update shared context for token tracking 

442 shared_research_context["search_engine_selected"] = engine 

443 shared_research_context["research_phase"] = "search" 

444 

445 # Capture other research phases for better context tracking 

446 if metadata.get("phase"): 446 ↛ 450line 446 didn't jump to line 450 because the condition on line 446 was always true

447 shared_research_context["research_phase"] = metadata["phase"] 

448 

449 # Update search iteration if available 

450 if "iteration" in metadata: 

451 shared_research_context["search_iteration"] = metadata[ 

452 "iteration" 

453 ] 

454 

455 # Adjust progress based on research mode 

456 adjusted_progress = progress_percent 

457 if ( 

458 mode == "detailed" 

459 and metadata.get("phase") == "output_generation" 

460 ): 

461 # For detailed mode, adjust the progress range for output generation 

462 adjusted_progress = min(80, progress_percent) 

463 elif ( 

464 mode == "detailed" 

465 and metadata.get("phase") == "report_generation" 

466 ): 

467 # Scale the progress from 80% to 95% for the report generation phase 

468 if progress_percent is not None: 468 ↛ 482line 468 didn't jump to line 482 because the condition on line 468 was always true

469 normalized = progress_percent / 100 

470 adjusted_progress = 80 + (normalized * 15) 

471 elif ( 

472 mode == "quick" and metadata.get("phase") == "output_generation" 

473 ): 

474 # For quick mode, ensure we're at least at 85% during output generation 

475 adjusted_progress = max(85, progress_percent) 

476 # Map any further progress within output_generation to 85-95% range 

477 if progress_percent is not None and progress_percent > 0: 477 ↛ 482line 477 didn't jump to line 482 because the condition on line 477 was always true

478 normalized = progress_percent / 100 

479 adjusted_progress = 85 + (normalized * 10) 

480 

481 # Atomically update progress and check if research is still active 

482 if adjusted_progress is not None: 

483 adjusted_progress, still_active = ( 

484 update_progress_and_check_active( 

485 research_id, adjusted_progress 

486 ) 

487 ) 

488 else: 

489 still_active = is_research_active(research_id) 

490 

491 if still_active: 

492 # Queue the progress update to be processed in main thread 

493 if adjusted_progress is not None: 

494 from ..queue.processor_v2 import queue_processor 

495 

496 if username: 496 ↛ 501line 496 didn't jump to line 501 because the condition on line 496 was always true

497 queue_processor.queue_progress_update( 

498 username, research_id, adjusted_progress 

499 ) 

500 else: 

501 logger.warning( 

502 f"Cannot queue progress update for research {research_id} - no username available" 

503 ) 

504 

505 # Emit a socket event (throttled to avoid event storms) 

506 try: 

507 # Always emit completion/error states immediately; 

508 # throttle intermediate progress updates 

509 phase = metadata.get("phase", "") 

510 is_final = ( 

511 phase 

512 in ( 

513 "complete", 

514 "error", 

515 "report_complete", 

516 ) 

517 or adjusted_progress == 100 

518 ) 

519 

520 should_emit = is_final 

521 if not is_final: 

522 now = time.monotonic() 

523 with _last_emit_lock: 

524 last = _last_emit_times.get(research_id, 0) 

525 if now - last >= _EMIT_THROTTLE_SECONDS: 

526 _last_emit_times[research_id] = now 

527 should_emit = True 

528 # Periodic TTL cleanup for orphaned entries 

529 global _emit_cleanup_counter # noqa: PLW0603 

530 _emit_cleanup_counter += 1 

531 if _emit_cleanup_counter % 100 == 0: 

532 stale = [ 

533 rid 

534 for rid, t in _last_emit_times.items() 

535 if now - t > _EMIT_TTL_SECONDS 

536 ] 

537 for rid in stale: 537 ↛ 538line 537 didn't jump to line 538 because the loop on line 537 never started

538 del _last_emit_times[rid] 

539 

540 if should_emit: 

541 # Basic event data - include message for display 

542 event_data = { 

543 "progress": adjusted_progress, 

544 "message": message, 

545 "phase": phase, 

546 } 

547 

548 # Include additional metadata for MCP/ReAct strategy display 

549 if metadata.get("thought"): 549 ↛ 550line 549 didn't jump to line 550 because the condition on line 549 was never true

550 event_data["thought"] = metadata["thought"] 

551 if metadata.get("tool"): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true

552 event_data["tool"] = metadata["tool"] 

553 if metadata.get("arguments"): 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true

554 event_data["arguments"] = metadata["arguments"] 

555 if metadata.get("iteration"): 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true

556 event_data["iteration"] = metadata["iteration"] 

557 if metadata.get("error"): 

558 event_data["error"] = metadata["error"] 

559 if metadata.get("content"): 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 event_data["content"] = metadata["content"] 

561 

562 SocketIOService().emit_to_subscribers( 

563 "progress", research_id, event_data 

564 ) 

565 except Exception: 

566 logger.exception("Socket emit error (non-critical)") 

567 

568 # Function to check termination during long-running operations 

569 def check_termination(): 

570 if is_termination_requested(research_id): 

571 handle_termination(research_id, username) 

572 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try 

573 "Research was terminated by user during long-running operation" 

574 ) 

575 return False # Not terminated 

576 

577 # Configure the system with the specified parameters 

578 use_llm = None 

579 if model or search_engine or model_provider: 

580 # Log that we're overriding system settings 

581 logger.info( 

582 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}" 

583 ) 

584 

585 # Override LLM if model or model_provider specified 

586 if model or model_provider: 

587 try: 

588 # Get LLM with the overridden settings 

589 # Use the shared_research_context which includes username 

590 use_llm = get_llm( 

591 model_name=model, 

592 provider=model_provider, 

593 openai_endpoint_url=custom_endpoint, 

594 research_id=research_id, 

595 research_context=shared_research_context, 

596 ) 

597 

598 logger.info( 

599 f"Successfully set LLM to: provider={model_provider}, model={model}" 

600 ) 

601 except Exception as e: 

602 logger.exception( 

603 f"Error setting LLM provider={model_provider}, model={model}" 

604 ) 

605 error_msg = str(e) 

606 # Surface configuration errors to user instead of silently continuing 

607 config_error_keywords = [ 

608 "model path", 

609 "llamacpp", 

610 "cannot connect", 

611 "server", 

612 "not configured", 

613 "not responding", 

614 "directory", 

615 ".gguf", 

616 ] 

617 if any( 

618 keyword in error_msg.lower() 

619 for keyword in config_error_keywords 

620 ): 

621 # This is a configuration error the user can fix 

622 raise ValueError( 

623 f"LLM Configuration Error: {error_msg}" 

624 ) from e 

625 # For other errors, re-raise to avoid silent failures 

626 raise 

627 

628 # Create search engine first if specified, to avoid default creation without username 

629 use_search = None 

630 if search_engine: 

631 try: 

632 # Create a new search object with these settings 

633 use_search = get_search( 

634 search_tool=search_engine, 

635 llm_instance=use_llm, 

636 username=username, 

637 settings_snapshot=settings_snapshot, 

638 ) 

639 logger.info( 

640 f"Successfully created search engine: {search_engine}" 

641 ) 

642 except Exception as e: 

643 logger.exception( 

644 f"Error creating search engine {search_engine}" 

645 ) 

646 error_msg = str(e) 

647 # Surface configuration errors to user instead of silently continuing 

648 config_error_keywords = [ 

649 "searxng", 

650 "instance_url", 

651 "api_key", 

652 "cannot connect", 

653 "connection", 

654 "timeout", 

655 "not configured", 

656 ] 

657 if any( 

658 keyword in error_msg.lower() 

659 for keyword in config_error_keywords 

660 ): 

661 # This is a configuration error the user can fix 

662 raise ValueError( 

663 f"Search Engine Configuration Error ({search_engine}): {error_msg}" 

664 ) from e 

665 # For other errors, re-raise to avoid silent failures 

666 raise 

667 

668 # Set the progress callback in the system 

669 system = AdvancedSearchSystem( 

670 llm=use_llm, # type: ignore[arg-type] 

671 search=use_search, # type: ignore[arg-type] 

672 strategy_name=strategy, 

673 max_iterations=iterations, 

674 questions_per_iteration=questions_per_iteration, 

675 username=username, 

676 settings_snapshot=settings_snapshot, 

677 research_id=research_id, 

678 research_context=shared_research_context, 

679 ) 

680 system.set_progress_callback(progress_callback) 

681 

682 # Run the search 

683 progress_callback("Starting research process", 5, {"phase": "init"}) 

684 

685 try: 

686 results = system.analyze_topic(query) 

687 if mode == "quick": 

688 progress_callback( 

689 "Search complete, preparing to generate summary...", 

690 85, 

691 {"phase": "output_generation"}, 

692 ) 

693 else: 

694 progress_callback( 

695 "Search complete, generating output", 

696 80, 

697 {"phase": "output_generation"}, 

698 ) 

699 except Exception as search_error: 

700 # Better handling of specific search errors 

701 error_message = str(search_error) 

702 error_type = "unknown" 

703 

704 # Extract error details for common issues 

705 if "status code: 503" in error_message: 

706 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system." 

707 error_type = "ollama_unavailable" 

708 elif "status code: 404" in error_message: 

709 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model." 

710 error_type = "model_not_found" 

711 elif "status code:" in error_message: 

712 # Extract the status code for other HTTP errors 

713 status_code = error_message.split("status code:")[1].strip() 

714 error_message = f"API request failed with status code {status_code}. Please check your configuration." 

715 error_type = "api_error" 

716 elif "connection" in error_message.lower(): 

717 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible." 

718 error_type = "connection_error" 

719 

720 # Raise with improved error message 

721 raise RuntimeError( 

722 f"{error_message} (Error type: {error_type})" 

723 ) from search_error 

724 

725 # Generate output based on mode 

726 if mode == "quick": 

727 # Quick Summary 

728 if results.get("findings") or results.get("formatted_findings"): 

729 raw_formatted_findings = results["formatted_findings"] 

730 

731 # Check if formatted_findings contains an error message 

732 if isinstance( 

733 raw_formatted_findings, str 

734 ) and raw_formatted_findings.startswith("Error:"): 

735 logger.exception( 

736 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..." 

737 ) 

738 

739 # Determine error type for better user feedback 

740 error_type = "unknown" 

741 error_message = raw_formatted_findings.lower() 

742 

743 if ( 

744 "token limit" in error_message 

745 or "context length" in error_message 

746 ): 

747 error_type = "token_limit" 

748 # Log specific error type 

749 logger.warning( 

750 "Detected token limit error in synthesis" 

751 ) 

752 

753 # Update progress with specific error type 

754 progress_callback( 

755 "Synthesis hit token limits. Attempting fallback...", 

756 87, 

757 { 

758 "phase": "synthesis_error", 

759 "error_type": error_type, 

760 }, 

761 ) 

762 elif ( 

763 "timeout" in error_message 

764 or "timed out" in error_message 

765 ): 

766 error_type = "timeout" 

767 logger.warning("Detected timeout error in synthesis") 

768 progress_callback( 

769 "Synthesis timed out. Attempting fallback...", 

770 87, 

771 { 

772 "phase": "synthesis_error", 

773 "error_type": error_type, 

774 }, 

775 ) 

776 elif "rate limit" in error_message: 

777 error_type = "rate_limit" 

778 logger.warning("Detected rate limit error in synthesis") 

779 progress_callback( 

780 "LLM rate limit reached. Attempting fallback...", 

781 87, 

782 { 

783 "phase": "synthesis_error", 

784 "error_type": error_type, 

785 }, 

786 ) 

787 elif ( 

788 "connection" in error_message 

789 or "network" in error_message 

790 ): 

791 error_type = "connection" 

792 logger.warning("Detected connection error in synthesis") 

793 progress_callback( 

794 "Connection issue with LLM. Attempting fallback...", 

795 87, 

796 { 

797 "phase": "synthesis_error", 

798 "error_type": error_type, 

799 }, 

800 ) 

801 elif ( 

802 "llm error" in error_message 

803 or "final answer synthesis fail" in error_message 

804 ): 

805 error_type = "llm_error" 

806 logger.warning( 

807 "Detected general LLM error in synthesis" 

808 ) 

809 progress_callback( 

810 "LLM error during synthesis. Attempting fallback...", 

811 87, 

812 { 

813 "phase": "synthesis_error", 

814 "error_type": error_type, 

815 }, 

816 ) 

817 else: 

818 # Generic error 

819 logger.warning("Detected unknown error in synthesis") 

820 progress_callback( 

821 "Error during synthesis. Attempting fallback...", 

822 87, 

823 { 

824 "phase": "synthesis_error", 

825 "error_type": "unknown", 

826 }, 

827 ) 

828 

829 # Extract synthesized content from findings if available 

830 synthesized_content = "" 

831 for finding in results.get("findings", []): 

832 if finding.get("phase") == "Final synthesis": 

833 synthesized_content = finding.get("content", "") 

834 break 

835 

836 # Use synthesized content as fallback 

837 if ( 

838 synthesized_content 

839 and not synthesized_content.startswith("Error:") 

840 ): 

841 logger.info( 

842 "Using existing synthesized content as fallback" 

843 ) 

844 raw_formatted_findings = synthesized_content 

845 

846 # Or use current_knowledge as another fallback 

847 elif results.get("current_knowledge"): 

848 logger.info("Using current_knowledge as fallback") 

849 raw_formatted_findings = results["current_knowledge"] 

850 

851 # Or combine all finding contents as last resort 

852 elif results.get("findings"): 

853 logger.info("Combining all findings as fallback") 

854 # First try to use any findings that are not errors 

855 valid_findings = [ 

856 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

857 for finding in results.get("findings", []) 

858 if finding.get("content") 

859 and not finding.get("content", "").startswith( 

860 "Error:" 

861 ) 

862 ] 

863 

864 if valid_findings: 

865 raw_formatted_findings = ( 

866 "# Research Results (Fallback Mode)\n\n" 

867 ) 

868 raw_formatted_findings += "\n\n".join( 

869 valid_findings 

870 ) 

871 raw_formatted_findings += f"\n\n## Error Information\n{raw_formatted_findings}" 

872 else: 

873 # Last resort: use everything including errors 

874 raw_formatted_findings = ( 

875 "# Research Results (Emergency Fallback)\n\n" 

876 ) 

877 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n" 

878 raw_formatted_findings += "\n\n".join( 

879 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}" 

880 for finding in results.get("findings", []) 

881 if finding.get("content") 

882 ) 

883 

884 progress_callback( 

885 f"Using fallback synthesis due to {error_type} error", 

886 88, 

887 { 

888 "phase": "synthesis_fallback", 

889 "error_type": error_type, 

890 }, 

891 ) 

892 

893 logger.info( 

894 "Found formatted_findings of length: {}", 

895 len(str(raw_formatted_findings)), 

896 ) 

897 

898 try: 

899 # Check if we have an error in the findings and use enhanced error handling 

900 if isinstance( 

901 raw_formatted_findings, str 

902 ) and raw_formatted_findings.startswith("Error:"): 

903 logger.info( 

904 "Generating enhanced error report using ErrorReportGenerator" 

905 ) 

906 

907 # Generate comprehensive error report 

908 # ErrorReportGenerator does not use LLM (kept for compat) 

909 error_generator = ErrorReportGenerator() 

910 clean_markdown = error_generator.generate_error_report( 

911 error_message=raw_formatted_findings, 

912 query=query, 

913 partial_results=results, 

914 search_iterations=results.get("iterations", 0), 

915 research_id=research_id, 

916 ) 

917 

918 logger.info( 

919 "Generated enhanced error report with {} characters", 

920 len(clean_markdown), 

921 ) 

922 else: 

923 # Get the synthesized content from the LLM directly 

924 clean_markdown = raw_formatted_findings 

925 

926 # Extract all sources from findings to add them to the summary 

927 all_links = [] 

928 for finding in results.get("findings", []): 

929 search_results = finding.get("search_results", []) 

930 if search_results: 

931 try: 

932 links = extract_links_from_search_results( 

933 search_results 

934 ) 

935 all_links.extend(links) 

936 except Exception: 

937 logger.exception( 

938 "Error processing search results/links" 

939 ) 

940 

941 logger.info( 

942 "Successfully converted to clean markdown of length: {}", 

943 len(clean_markdown), 

944 ) 

945 

946 # First send a progress update for generating the summary 

947 progress_callback( 

948 "Generating clean summary from research data...", 

949 90, 

950 {"phase": "output_generation"}, 

951 ) 

952 

953 # Send progress update for saving report 

954 progress_callback( 

955 "Saving research report to database...", 

956 95, 

957 {"phase": "report_complete"}, 

958 ) 

959 

960 # Format citations in the markdown content 

961 formatter = get_citation_formatter() 

962 formatted_content = formatter.format_document( 

963 clean_markdown 

964 ) 

965 

966 # Prepare complete report content 

967 full_report_content = f"""{formatted_content} 

968 

969## Research Metrics 

970- Search Iterations: {results["iterations"]} 

971- Generated at: {datetime.now(UTC).isoformat()} 

972""" 

973 

974 # Save sources to database (non-fatal - report should still 

975 # be saved even if source saving fails) 

976 try: 

977 from .research_sources_service import ( 

978 ResearchSourcesService, 

979 ) 

980 

981 sources_service = ResearchSourcesService() 

982 if all_links: 

983 logger.info( 

984 f"Quick summary: Saving {len(all_links)} sources to database" 

985 ) 

986 sources_saved = ( 

987 sources_service.save_research_sources( 

988 research_id=research_id, 

989 sources=all_links, 

990 username=username, 

991 ) 

992 ) 

993 logger.info( 

994 f"Quick summary: Saved {sources_saved} sources for research {research_id}" 

995 ) 

996 except Exception: 

997 logger.exception( 

998 f"Failed to save sources for research {research_id} (continuing with report save)" 

999 ) 

1000 

1001 # Save report using storage abstraction 

1002 from ...storage import get_report_storage 

1003 

1004 with get_user_db_session(username) as db_session: 

1005 storage = get_report_storage(session=db_session) 

1006 

1007 # Prepare metadata 

1008 metadata = { 

1009 "iterations": results["iterations"], 

1010 "generated_at": datetime.now(UTC).isoformat(), 

1011 } 

1012 

1013 # Save report using storage abstraction 

1014 success = storage.save_report( 

1015 research_id=research_id, 

1016 content=full_report_content, 

1017 metadata=metadata, 

1018 username=username, 

1019 ) 

1020 

1021 if not success: 

1022 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except 

1023 

1024 logger.info( 

1025 f"Report saved for research_id: {research_id}" 

1026 ) 

1027 

1028 # Skip export to additional formats - we're storing in database only 

1029 

1030 # Update research status in database 

1031 completed_at = datetime.now(UTC).isoformat() 

1032 

1033 with get_user_db_session(username) as db_session: 

1034 research = ( 

1035 db_session.query(ResearchHistory) 

1036 .filter_by(id=research_id) 

1037 .first() 

1038 ) 

1039 

1040 # Preserve existing metadata and update with new values 

1041 metadata = _parse_research_metadata( 

1042 research.research_meta 

1043 ) 

1044 

1045 metadata.update( 

1046 { 

1047 "iterations": results["iterations"], 

1048 "generated_at": datetime.now(UTC).isoformat(), 

1049 } 

1050 ) 

1051 

1052 # Use the helper function for consistent duration calculation 

1053 duration_seconds = calculate_duration( 

1054 research.created_at, completed_at 

1055 ) 

1056 

1057 research.status = ResearchStatus.COMPLETED 

1058 research.completed_at = completed_at 

1059 research.duration_seconds = duration_seconds 

1060 # Note: report_content is saved by CachedResearchService 

1061 # report_path is not used in encrypted database version 

1062 

1063 # Generate headline and topics only for news searches 

1064 if ( 

1065 metadata.get("is_news_search") 

1066 or metadata.get("search_type") == "news_analysis" 

1067 ): 

1068 try: 

1069 from ...news.utils.headline_generator import ( 

1070 generate_headline, 

1071 ) 

1072 from ...news.utils.topic_generator import ( 

1073 generate_topics, 

1074 ) 

1075 

1076 # Get the report content from database for better headline/topic generation 

1077 report_content = "" 

1078 try: 

1079 research = ( 

1080 db_session.query(ResearchHistory) 

1081 .filter_by(id=research_id) 

1082 .first() 

1083 ) 

1084 if research and research.report_content: 1084 ↛ 1090line 1084 didn't jump to line 1090 because the condition on line 1084 was always true

1085 report_content = research.report_content 

1086 logger.info( 

1087 f"Retrieved {len(report_content)} chars from database for headline generation" 

1088 ) 

1089 else: 

1090 logger.warning( 

1091 f"No report content found in database for research_id: {research_id}" 

1092 ) 

1093 except Exception: 

1094 logger.warning( 

1095 "Could not retrieve report content from database" 

1096 ) 

1097 

1098 # Generate headline 

1099 logger.info( 

1100 f"Generating headline for query: {query[:100]}" 

1101 ) 

1102 headline = generate_headline( 

1103 query, report_content 

1104 ) 

1105 metadata["generated_headline"] = headline 

1106 

1107 # Generate topics 

1108 logger.info( 

1109 f"Generating topics with category: {metadata.get('category', 'News')}" 

1110 ) 

1111 topics = generate_topics( 

1112 query=query, 

1113 findings=report_content, 

1114 category=metadata.get("category", "News"), 

1115 max_topics=6, 

1116 ) 

1117 metadata["generated_topics"] = topics 

1118 

1119 logger.info(f"Generated headline: {headline}") 

1120 logger.info(f"Generated topics: {topics}") 

1121 

1122 except Exception: 

1123 logger.warning( 

1124 "Could not generate headline/topics" 

1125 ) 

1126 

1127 research.research_meta = metadata 

1128 

1129 db_session.commit() 

1130 logger.info( 

1131 f"Database commit completed for research_id: {research_id}" 

1132 ) 

1133 

1134 # Update subscription if this was triggered by a subscription 

1135 if metadata.get("subscription_id"): 

1136 try: 

1137 from ...news.subscription_manager.storage import ( 

1138 SQLSubscriptionStorage, 

1139 ) 

1140 from datetime import ( 

1141 datetime as dt, 

1142 timezone, 

1143 timedelta, 

1144 ) 

1145 

1146 sub_storage = SQLSubscriptionStorage(db_session) 

1147 subscription_id = metadata["subscription_id"] 

1148 

1149 # Get subscription to find refresh interval 

1150 subscription = sub_storage.get(subscription_id) 

1151 if subscription: 1151 ↛ 1180line 1151 didn't jump to line 1180

1152 refresh_minutes = subscription.get( 

1153 "refresh_minutes", 240 

1154 ) 

1155 now = dt.now(timezone.utc) 

1156 next_refresh = now + timedelta( 

1157 minutes=refresh_minutes 

1158 ) 

1159 

1160 # Update refresh times 

1161 sub_storage.update_refresh_time( 

1162 subscription_id=subscription_id, 

1163 last_refresh=now, 

1164 next_refresh=next_refresh, 

1165 ) 

1166 

1167 # Increment stats 

1168 sub_storage.increment_stats( 

1169 subscription_id, 1 

1170 ) 

1171 

1172 logger.info( 

1173 f"Updated subscription {subscription_id} refresh times" 

1174 ) 

1175 except Exception: 

1176 logger.warning( 

1177 "Could not update subscription refresh time" 

1178 ) 

1179 

1180 logger.info( 

1181 f"Database updated successfully for research_id: {research_id}" 

1182 ) 

1183 

1184 # Send the final completion message 

1185 progress_callback( 

1186 "Research completed successfully", 

1187 100, 

1188 {"phase": "complete"}, 

1189 ) 

1190 

1191 # Clean up resources 

1192 logger.info( 

1193 "Cleaning up resources for research_id: {}", research_id 

1194 ) 

1195 cleanup_research_resources( 

1196 research_id, username, user_password=user_password 

1197 ) 

1198 logger.info( 

1199 "Resources cleaned up for research_id: {}", research_id 

1200 ) 

1201 

1202 except Exception as inner_e: 

1203 logger.exception("Error during quick summary generation") 

1204 raise RuntimeError( 

1205 f"Error generating quick summary: {inner_e!s}" 

1206 ) 

1207 else: 

1208 raise RuntimeError( # noqa: TRY301 — triggers research failure handling in outer except 

1209 "No research findings were generated. Please try again." 

1210 ) 

1211 else: 

1212 # Full Report 

1213 progress_callback( 

1214 "Generating detailed report...", 

1215 85, 

1216 {"phase": "report_generation"}, 

1217 ) 

1218 

1219 # Extract the search system from the results if available 

1220 search_system = results.get("search_system", None) 

1221 

1222 # Wrapper that maps report generator's 0-100% to 85-95% range 

1223 # and relays cancellation checks through the outer progress_callback 

1224 def report_progress_callback(message, progress_percent, metadata): 

1225 if progress_percent is not None: 

1226 adjusted = 85 + (progress_percent / 100) * 10 

1227 else: 

1228 adjusted = progress_percent 

1229 progress_callback(message, adjusted, metadata) 

1230 

1231 # Pass the existing search system to maintain citation indices 

1232 report_generator = IntegratedReportGenerator( 

1233 search_system=search_system, 

1234 settings_snapshot=settings_snapshot, 

1235 ) 

1236 final_report = report_generator.generate_report( 

1237 results, query, progress_callback=report_progress_callback 

1238 ) 

1239 

1240 progress_callback( 

1241 "Report generation complete", 95, {"phase": "report_complete"} 

1242 ) 

1243 

1244 # Format citations in the report content 

1245 formatter = get_citation_formatter() 

1246 formatted_content = formatter.format_document( 

1247 final_report["content"] 

1248 ) 

1249 

1250 # Save sources to database (non-fatal - report should still be saved 

1251 # even if source saving fails, e.g. due to expired session password) 

1252 try: 

1253 from .research_sources_service import ResearchSourcesService 

1254 

1255 sources_service = ResearchSourcesService() 

1256 all_links = getattr(search_system, "all_links_of_system", None) 

1257 if all_links: 

1258 logger.info(f"Saving {len(all_links)} sources to database") 

1259 sources_saved = sources_service.save_research_sources( 

1260 research_id=research_id, 

1261 sources=all_links, 

1262 username=username, 

1263 ) 

1264 logger.info( 

1265 f"Saved {sources_saved} sources for research {research_id}" 

1266 ) 

1267 except Exception: 

1268 logger.exception( 

1269 f"Failed to save sources for research {research_id} (continuing with report save)" 

1270 ) 

1271 

1272 # Save report to database 

1273 with get_user_db_session(username) as db_session: 

1274 # Update metadata 

1275 metadata = final_report["metadata"] 

1276 metadata["iterations"] = results["iterations"] 

1277 

1278 # Save report to database 

1279 try: 

1280 research = ( 

1281 db_session.query(ResearchHistory) 

1282 .filter_by(id=research_id) 

1283 .first() 

1284 ) 

1285 

1286 if not research: 1286 ↛ 1287line 1286 didn't jump to line 1287 because the condition on line 1286 was never true

1287 logger.error(f"Research {research_id} not found") 

1288 success = False 

1289 else: 

1290 research.report_content = formatted_content 

1291 if research.research_meta: 1291 ↛ 1292line 1291 didn't jump to line 1292 because the condition on line 1291 was never true

1292 research.research_meta.update(metadata) 

1293 else: 

1294 research.research_meta = metadata 

1295 db_session.commit() 

1296 success = True 

1297 logger.info( 

1298 f"Saved report for research {research_id} to database" 

1299 ) 

1300 except Exception: 

1301 logger.exception("Error saving report to database") 

1302 db_session.rollback() 

1303 success = False 

1304 

1305 if not success: 1305 ↛ 1306line 1305 didn't jump to line 1306 because the condition on line 1305 was never true

1306 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except 

1307 

1308 logger.info( 

1309 f"Report saved to database for research_id: {research_id}" 

1310 ) 

1311 

1312 # Update research status in database 

1313 completed_at = datetime.now(UTC).isoformat() 

1314 

1315 with get_user_db_session(username) as db_session: 

1316 research = ( 

1317 db_session.query(ResearchHistory) 

1318 .filter_by(id=research_id) 

1319 .first() 

1320 ) 

1321 

1322 # Preserve existing metadata and merge with report metadata 

1323 metadata = _parse_research_metadata(research.research_meta) 

1324 

1325 metadata.update(final_report["metadata"]) 

1326 metadata["iterations"] = results["iterations"] 

1327 

1328 # Use the helper function for consistent duration calculation 

1329 duration_seconds = calculate_duration( 

1330 research.created_at, completed_at 

1331 ) 

1332 

1333 research.status = ResearchStatus.COMPLETED 

1334 research.completed_at = completed_at 

1335 research.duration_seconds = duration_seconds 

1336 # Note: report_content is saved by CachedResearchService 

1337 # report_path is not used in encrypted database version 

1338 

1339 # Generate headline and topics only for news searches 

1340 if ( 1340 ↛ 1344line 1340 didn't jump to line 1344 because the condition on line 1340 was never true

1341 metadata.get("is_news_search") 

1342 or metadata.get("search_type") == "news_analysis" 

1343 ): 

1344 try: 

1345 from ..news.utils.headline_generator import ( 

1346 generate_headline, # type: ignore[no-redef] 

1347 ) 

1348 from ..news.utils.topic_generator import ( 

1349 generate_topics, # type: ignore[no-redef] 

1350 ) 

1351 

1352 # Get the report content from database for better headline/topic generation 

1353 report_content = "" 

1354 try: 

1355 research = ( 

1356 db_session.query(ResearchHistory) 

1357 .filter_by(id=research_id) 

1358 .first() 

1359 ) 

1360 if research and research.report_content: 

1361 report_content = research.report_content 

1362 else: 

1363 logger.warning( 

1364 f"No report content found in database for research_id: {research_id}" 

1365 ) 

1366 except Exception: 

1367 logger.warning( 

1368 "Could not retrieve report content from database" 

1369 ) 

1370 

1371 # Generate headline 

1372 headline = generate_headline(query, report_content) 

1373 metadata["generated_headline"] = headline 

1374 

1375 # Generate topics 

1376 topics = generate_topics( 

1377 query=query, 

1378 findings=report_content, 

1379 category=metadata.get("category", "News"), 

1380 max_topics=6, 

1381 ) 

1382 metadata["generated_topics"] = topics 

1383 

1384 logger.info(f"Generated headline: {headline}") 

1385 logger.info(f"Generated topics: {topics}") 

1386 

1387 except Exception: 

1388 logger.warning("Could not generate headline/topics") 

1389 

1390 research.research_meta = metadata 

1391 

1392 db_session.commit() 

1393 

1394 # Update subscription if this was triggered by a subscription 

1395 if metadata.get("subscription_id"): 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true

1396 try: 

1397 from ...news.subscription_manager.storage import ( 

1398 SQLSubscriptionStorage, 

1399 ) 

1400 from datetime import datetime as dt, timezone, timedelta 

1401 

1402 sub_storage = SQLSubscriptionStorage(db_session) 

1403 subscription_id = metadata["subscription_id"] 

1404 

1405 # Get subscription to find refresh interval 

1406 subscription = sub_storage.get(subscription_id) 

1407 if subscription: 

1408 refresh_minutes = subscription.get( 

1409 "refresh_minutes", 240 

1410 ) 

1411 now = dt.now(timezone.utc) 

1412 next_refresh = now + timedelta( 

1413 minutes=refresh_minutes 

1414 ) 

1415 

1416 # Update refresh times 

1417 sub_storage.update_refresh_time( 

1418 subscription_id=subscription_id, 

1419 last_refresh=now, 

1420 next_refresh=next_refresh, 

1421 ) 

1422 

1423 # Increment stats 

1424 sub_storage.increment_stats(subscription_id, 1) 

1425 

1426 logger.info( 

1427 f"Updated subscription {subscription_id} refresh times" 

1428 ) 

1429 except Exception: 

1430 logger.warning( 

1431 "Could not update subscription refresh time" 

1432 ) 

1433 

1434 progress_callback( 

1435 "Research completed successfully", 

1436 100, 

1437 {"phase": "complete"}, 

1438 ) 

1439 

1440 # Clean up resources 

1441 cleanup_research_resources( 

1442 research_id, username, user_password=user_password 

1443 ) 

1444 

1445 except ResearchTerminatedException: 

1446 logger.info(f"Research {research_id} terminated by user") 

1447 # handle_termination() was already called by progress_callback 

1448 # before raising, which: 

1449 # 1. Queued SUSPENDED status update via queue_processor 

1450 # 2. Called cleanup_research_resources() 

1451 # No additional cleanup needed here. 

1452 

1453 except Exception as e: 

1454 # Handle error 

1455 error_message = f"Research failed: {e!s}" 

1456 logger.exception(error_message) 

1457 

1458 try: 

1459 # Check for common Ollama error patterns in the exception and provide more user-friendly errors 

1460 user_friendly_error = str(e) 

1461 error_context = {} 

1462 

1463 if "Error type: ollama_unavailable" in user_friendly_error: 

1464 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system." 

1465 error_context = { 

1466 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly." 

1467 } 

1468 elif "Error type: model_not_found" in user_friendly_error: 

1469 user_friendly_error = "Required Ollama model not found. Please pull the model first." 

1470 error_context = { 

1471 "solution": "Run 'ollama pull mistral' to download the required model." 

1472 } 

1473 elif "Error type: connection_error" in user_friendly_error: 

1474 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running." 

1475 error_context = { 

1476 "solution": "Ensure Ollama or your API service is running and accessible." 

1477 } 

1478 elif "Error type: api_error" in user_friendly_error: 

1479 # Keep the original error message as it's already improved 

1480 error_context = { 

1481 "solution": "Check API configuration and credentials." 

1482 } 

1483 

1484 # Generate enhanced error report for failed research 

1485 enhanced_report_content = None 

1486 try: 

1487 # Get partial results if they exist 

1488 partial_results = results if "results" in locals() else None 

1489 search_iterations = ( 

1490 results.get("iterations", 0) if partial_results else 0 

1491 ) 

1492 

1493 # Generate comprehensive error report 

1494 # ErrorReportGenerator does not use LLM (kept for compat) 

1495 error_generator = ErrorReportGenerator() 

1496 enhanced_report_content = error_generator.generate_error_report( 

1497 error_message=f"Research failed: {e!s}", 

1498 query=query, 

1499 partial_results=partial_results, 

1500 search_iterations=search_iterations, 

1501 research_id=research_id, 

1502 ) 

1503 

1504 logger.info( 

1505 "Generated enhanced error report for failed research (length: {})", 

1506 len(enhanced_report_content), 

1507 ) 

1508 

1509 # Save enhanced error report to encrypted database 

1510 try: 

1511 # username already available from function scope (line 281) 

1512 if username: 1512 ↛ 1534line 1512 didn't jump to line 1534 because the condition on line 1512 was always true

1513 from ...storage import get_report_storage 

1514 

1515 with get_user_db_session(username) as db_session: 

1516 storage = get_report_storage(session=db_session) 

1517 success = storage.save_report( 

1518 research_id=research_id, 

1519 content=enhanced_report_content, 

1520 metadata={"error_report": True}, 

1521 username=username, 

1522 ) 

1523 if success: 

1524 logger.info( 

1525 "Saved enhanced error report to encrypted database for research {}", 

1526 research_id, 

1527 ) 

1528 else: 

1529 logger.warning( 

1530 "Failed to save enhanced error report to database for research {}", 

1531 research_id, 

1532 ) 

1533 else: 

1534 logger.warning( 

1535 "Cannot save error report: username not available" 

1536 ) 

1537 

1538 except Exception as report_error: 

1539 logger.exception( 

1540 "Failed to save enhanced error report: {}", report_error 

1541 ) 

1542 

1543 except Exception as error_gen_error: 

1544 logger.exception( 

1545 "Failed to generate enhanced error report: {}", 

1546 error_gen_error, 

1547 ) 

1548 enhanced_report_content = None 

1549 

1550 # Get existing metadata from database first 

1551 existing_metadata = {} 

1552 try: 

1553 # username already available from function scope (line 281) 

1554 if username: 1554 ↛ 1567line 1554 didn't jump to line 1567 because the condition on line 1554 was always true

1555 with get_user_db_session(username) as db_session: 

1556 research = ( 

1557 db_session.query(ResearchHistory) 

1558 .filter_by(id=research_id) 

1559 .first() 

1560 ) 

1561 if research and research.research_meta: 

1562 existing_metadata = dict(research.research_meta) 

1563 except Exception: 

1564 logger.exception("Failed to get existing metadata") 

1565 

1566 # Update metadata with more context about the error while preserving existing values 

1567 metadata = existing_metadata 

1568 metadata.update({"phase": "error", "error": user_friendly_error}) 

1569 if error_context: 

1570 metadata.update(error_context) 

1571 if enhanced_report_content: 1571 ↛ 1575line 1571 didn't jump to line 1575 because the condition on line 1571 was always true

1572 metadata["has_enhanced_report"] = True 

1573 

1574 # If we still have an active research record, update its log 

1575 if is_research_active(research_id): 

1576 progress_callback(user_friendly_error, None, metadata) 

1577 

1578 # If termination was requested, mark as suspended instead of failed 

1579 status = ( 

1580 ResearchStatus.SUSPENDED 

1581 if is_termination_requested(research_id) 

1582 else ResearchStatus.FAILED 

1583 ) 

1584 message = ( 

1585 "Research was terminated by user" 

1586 if status == ResearchStatus.SUSPENDED 

1587 else user_friendly_error 

1588 ) 

1589 

1590 # Calculate duration up to termination point - using UTC consistently 

1591 now = datetime.now(UTC) 

1592 completed_at = now.isoformat() 

1593 

1594 # NOTE: Database updates from threads are handled by queue processor 

1595 # The queue_processor.queue_error_update() method is already being used below 

1596 # to safely update the database from the main thread 

1597 

1598 # Queue the error update to be processed in main thread 

1599 # Using the queue processor v2 system 

1600 from ..queue.processor_v2 import queue_processor 

1601 

1602 if username: 1602 ↛ 1616line 1602 didn't jump to line 1616 because the condition on line 1602 was always true

1603 queue_processor.queue_error_update( 

1604 username=username, 

1605 research_id=research_id, 

1606 status=status, 

1607 error_message=message, 

1608 metadata=metadata, 

1609 completed_at=completed_at, 

1610 report_path=None, 

1611 ) 

1612 logger.info( 

1613 f"Queued error update for research {research_id} with status '{status}'" 

1614 ) 

1615 else: 

1616 logger.error( 

1617 f"Cannot queue error update for research {research_id} - no username provided. " 

1618 f"Status: '{status}', Message: {message}" 

1619 ) 

1620 

1621 try: 

1622 SocketIOService().emit_to_subscribers( 

1623 "progress", 

1624 research_id, 

1625 {"status": status, "error": message}, 

1626 ) 

1627 except Exception: 

1628 logger.exception("Failed to emit error via socket") 

1629 

1630 except Exception: 

1631 logger.exception("Error in error handler") 

1632 

1633 # Clean up resources 

1634 cleanup_research_resources( 

1635 research_id, username, user_password=user_password 

1636 ) 

1637 

1638 finally: 

1639 # RESOURCE CLEANUP: Close search engine HTTP sessions. 

1640 # 

1641 # Search engines (created via get_search()) may hold HTTP connection 

1642 # pools. Currently only SemanticScholarSearchEngine creates a 

1643 # persistent SafeSession; other engines use stateless safe_get()/ 

1644 # safe_post() utility functions. However, BaseSearchEngine.close() 

1645 # is safe to call on any engine — it checks for a 'session' 

1646 # attribute and is fully idempotent (SemanticScholar sets 

1647 # self.session = None after close). 

1648 # 

1649 # Neither @thread_cleanup nor cleanup_research_resources() close 

1650 # the search engine — @thread_cleanup only handles database sessions 

1651 # and context cleanup, and cleanup_research_resources() only handles 

1652 # status updates, notifications, and tracking dict removal. 

1653 # 

1654 # Without this explicit close, search engine sessions rely on 

1655 # Python's non-deterministic garbage collection (__del__) for 

1656 # cleanup, which can cause file descriptor exhaustion under 

1657 # sustained load. 

1658 from ...utilities.resource_utils import safe_close 

1659 

1660 if "use_search" in locals(): 

1661 safe_close(use_search, "research search engine") 

1662 # Close search system (cascades to strategy thread pools). 

1663 # See AdvancedSearchSystem.close() for details. 

1664 if "system" in locals(): 

1665 safe_close(system, "research system") 

1666 # Close the LLM instance created for model/provider overrides. 

1667 # system.close() does NOT close the LLM passed to it via system.model, 

1668 # so we must close it explicitly here. 

1669 if "use_llm" in locals(): 

1670 safe_close(use_llm, "research LLM") 

1671 

1672 

def cleanup_research_resources(research_id, username=None, user_password=None):
    """
    Clean up resources for a completed research.

    Queues the completion notification for the main-thread queue processor,
    removes in-memory tracking/throttle state, and sends a final progress
    message to socket subscribers before dropping their subscriptions.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)
        user_password: Optional password forwarded to the queue processor so
            it can access the user's encrypted database
    """
    from ..routes.globals import cleanup_research

    logger.info("Cleaning up resources for research {}", research_id)

    # For testing: Add a small delay to simulate research taking time
    # This helps test concurrent research limits
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        # 'time' is imported at module level; no local import needed.
        logger.info(
            f"Test mode: Adding 5 second delay before cleanup for {research_id}"
        )
        time.sleep(5)

    # Get the current status from the database to determine the final status message
    # NOTE(review): current_status is never reassigned below, so the
    # SUSPENDED/FAILED branch of the final socket message is currently
    # unreachable — confirm whether a DB status lookup was intended here.
    current_status = ResearchStatus.COMPLETED  # Default

    # NOTE: Queue processor already handles database updates from the main thread
    # The notify_research_completed() method is called at the end of this function
    # which safely updates the database status

    # Notify queue processor that research completed
    # This uses processor_v2 which handles database updates in the main thread
    # avoiding the Flask request context issues that occur in background threads
    from ..queue.processor_v2 import queue_processor

    if username:
        queue_processor.notify_research_completed(
            username, research_id, user_password=user_password
        )
        logger.info(
            f"Notified queue processor of completion for research {research_id} (user: {username})"
        )
    else:
        logger.warning(
            f"Cannot notify completion for research {research_id} - no username provided"
        )

    # Remove from active research and termination flags atomically
    cleanup_research(research_id)

    # Clean up throttle state for this research
    with _last_emit_lock:
        _last_emit_times.pop(research_id, None)

    # Send a final message to subscribers
    try:
        # Send a final message to any remaining subscribers with explicit status
        # Use the proper status message based on database status
        if current_status in (
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
        ):
            final_message = {
                "status": current_status,
                "message": f"Research was {current_status}",
                "progress": 0,  # For suspended research, show 0% not 100%
            }
        else:
            final_message = {
                "status": ResearchStatus.COMPLETED,
                "message": "Research process has ended and resources have been cleaned up",
                "progress": 100,
            }

        logger.info(
            "Sending final {} socket message for research {}",
            current_status,
            research_id,
        )

        SocketIOService().emit_to_subscribers(
            "progress", research_id, final_message
        )

        # Clean up socket subscriptions for this research
        SocketIOService().remove_subscriptions_for_research(research_id)

    except Exception:
        logger.exception("Error sending final cleanup message")

1763 

1764 

def handle_termination(research_id, username=None, user_password=None):
    """
    Handle the termination of a research process.

    Queues a SUSPENDED status update (processed in the main thread to avoid
    Flask request-context errors in background threads) and then releases
    the research's resources.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)
        user_password: Optional password forwarded to cleanup so the queue
            processor can access the user's encrypted database
    """
    logger.info(f"Handling termination for research {research_id}")

    # Queue the status update to be processed in the main thread
    # This avoids Flask request context errors in background threads
    try:
        from ..queue.processor_v2 import queue_processor

        now = datetime.now(UTC)
        completed_at = now.isoformat()

        # Queue the suspension update
        queue_processor.queue_error_update(
            username=username,
            research_id=research_id,
            status=ResearchStatus.SUSPENDED,
            error_message="Research was terminated by user",
            metadata={"terminated_at": completed_at},
            completed_at=completed_at,
            report_path=None,
        )

        logger.info(f"Queued suspension update for research {research_id}")
    except Exception:
        logger.exception(
            f"Error queueing termination update for research {research_id}"
        )

    # Clean up resources (this already handles things properly).
    # Forward the password so the completion notification can reach the
    # encrypted database (matches other cleanup_research_resources call sites).
    cleanup_research_resources(
        research_id, username, user_password=user_password
    )

1802 

1803 

def cancel_research(research_id, username):
    """
    Cancel/terminate a research process using ORM.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research

    Returns:
        bool: True if the research was found and cancelled, False otherwise
    """
    try:
        from ..routes.globals import is_research_active, set_termination_flag

        # Flag the research for termination regardless of its current state.
        set_termination_flag(research_id)

        # An actively running research goes through the normal termination
        # path, which queues the status update and cleans up resources.
        if is_research_active(research_id):
            handle_termination(research_id, username)
            return True

        # Not active: fall back to updating the database record directly.
        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if record is None:
                    logger.info(f"Research {research_id} not found in database")
                    return False

                # Nothing to do if the research has already stopped.
                terminal_states = (
                    ResearchStatus.COMPLETED,
                    ResearchStatus.SUSPENDED,
                    ResearchStatus.FAILED,
                    ResearchStatus.ERROR,
                )
                if record.status in terminal_states:
                    logger.info(
                        f"Research {research_id} already in terminal state: {record.status}"
                    )
                    return True  # Consider this a success since it's already stopped

                # Exists but wasn't tracked as active — mark it suspended anyway.
                record.status = ResearchStatus.SUSPENDED
                db_session.commit()
                logger.info(f"Successfully suspended research {research_id}")
        except Exception:
            logger.exception(
                f"Error accessing database for research {research_id}"
            )
            return False

        return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False