Coverage for src / local_deep_research / web / services / research_service.py: 41%
642 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import hashlib
2import json
3import threading
4from datetime import datetime, UTC
5from pathlib import Path
7from loguru import logger
9from ...config.llm_config import get_llm
11# Output directory for research results
12from ...config.paths import get_research_outputs_directory
13from ...config.search_config import get_search
14from ...constants import ResearchStatus
15from ...database.models import ResearchHistory, ResearchStrategy
16from ...database.session_context import get_user_db_session
17from ...error_handling.report_generator import ErrorReportGenerator
18from ...utilities.thread_context import set_search_context
19from ...report_generator import IntegratedReportGenerator
20from ...search_system import AdvancedSearchSystem
21from ...text_optimization import CitationFormatter, CitationMode
22from ...utilities.log_utils import log_for_research
23from ...utilities.search_utilities import extract_links_from_search_results
24from ...utilities.threading_utils import thread_context, thread_with_app_context
25from ..models.database import calculate_duration
26from .socket_service import SocketIOService
# Directory where generated research report files are written
# (used by _generate_report_path and the per-research output dirs).
OUTPUT_DIR = get_research_outputs_directory()
31def _parse_research_metadata(research_meta) -> dict:
32 """Parse research_meta into a dict, handling both dict and JSON string types."""
33 if isinstance(research_meta, dict):
34 return dict(research_meta)
35 if isinstance(research_meta, str):
36 try:
37 return json.loads(research_meta)
38 except json.JSONDecodeError:
39 logger.exception("Failed to parse research_meta as JSON")
40 return {}
41 return {}
def get_citation_formatter():
    """Build a CitationFormatter configured from thread-context settings."""
    # Imported lazily to avoid a circular import at module load time.
    from ...config.search_config import get_setting_from_snapshot

    configured = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    known_modes = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    # Unknown setting values fall back to the numbered-hyperlink default.
    selected = known_modes.get(configured, CitationMode.NUMBER_HYPERLINKS)
    return CitationFormatter(mode=selected)
def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
) -> tuple[bytes, str, str]:
    """
    Export a markdown report to different formats in memory.

    Uses the modular exporter registry to support multiple formats.
    Available formats can be queried with ExporterRegistry.get_available_formats().

    Args:
        markdown_content: The markdown content to export
        format: Export format (e.g., 'pdf', 'odt', 'latex', 'quarto', 'ris')
        title: Optional title for the document

    Returns:
        Tuple of (content_bytes, filename, mimetype)

    Raises:
        ValueError: If ``format`` does not match any registered exporter.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ...exporters import ExporterRegistry, ExportOptions

    # Exporters are registered under lowercase keys, so normalize the lookup.
    format_lower = format.lower()

    exporter = ExporterRegistry.get_exporter(format_lower)

    if exporter is None:
        available = ExporterRegistry.get_available_formats()
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {', '.join(available)}"
        )

    # Title prepending is now handled by each exporter via _prepend_title_if_needed()
    # PDF and ODT exporters prepend titles; RIS and other formats ignore them
    options = ExportOptions(title=title)

    result = exporter.export(markdown_content, options)

    logger.info(
        f"Generated {format_lower} in memory, size: {len(result.content)} bytes"
    )

    return result.content, result.filename, result.mimetype
def save_research_strategy(research_id, strategy_name, username=None):
    """
    Save the strategy used for a research to the database.

    Performs an upsert: updates the existing ResearchStrategy row for
    ``research_id`` if one exists, otherwise inserts a new row. This is
    best-effort — any failure is logged and swallowed rather than raised.

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            try:
                # Check if a strategy already exists for this research
                existing_strategy = (
                    session.query(ResearchStrategy)
                    .filter_by(research_id=research_id)
                    .first()
                )

                if existing_strategy:
                    # Update existing strategy
                    existing_strategy.strategy_name = strategy_name
                    logger.debug(
                        f"Updating existing strategy for research {research_id}"
                    )
                else:
                    # Create new strategy record
                    new_strategy = ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                    session.add(new_strategy)
                    logger.debug(
                        f"Creating new strategy record for research {research_id}"
                    )

                # Commit happens before the explicit close below; the close in
                # finally runs even if commit raises.
                session.commit()
                logger.info(
                    f"Saved strategy '{strategy_name}' for research {research_id}"
                )
            finally:
                # NOTE(review): explicit close inside the `with` block looks
                # redundant if get_user_db_session already closes the session
                # on exit — confirm against session_context before removing.
                session.close()
    except Exception:
        # Deliberate best-effort: strategy persistence must not abort research.
        logger.exception("Error saving research strategy")
def get_research_strategy(research_id, username=None):
    """
    Get the strategy used for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name or None if not found
    """
    try:
        with get_user_db_session(username) as session:
            try:
                record = (
                    session.query(ResearchStrategy)
                    .filter_by(research_id=research_id)
                    .first()
                )
                if record:
                    return record.strategy_name
                return None
            finally:
                session.close()
    except Exception:
        # Best-effort lookup: failures are logged and reported as "not found".
        logger.exception("Error getting research strategy")
        return None
def start_research_process(
    research_id,
    query,
    mode,
    active_research,
    termination_flags,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research
    """
    # Wrap the callback so the app context travels into the worker thread.
    callback_with_context = thread_with_app_context(run_research_callback)

    worker_args = (
        thread_context(),
        research_id,
        query,
        mode,
        active_research,
        termination_flags,
    )
    worker = threading.Thread(
        target=callback_with_context,
        args=worker_args,
        kwargs=kwargs,
    )
    # Daemonize so a lingering research thread never blocks interpreter exit.
    worker.daemon = True
    worker.start()

    # Register the running research so other components can track/terminate it.
    active_research[research_id] = {
        "thread": worker,
        "progress": 0,
        "status": ResearchStatus.IN_PROGRESS,
        "log": [],
        "settings": kwargs,  # Store settings for reference
    }

    return worker
241def _generate_report_path(query: str) -> Path:
242 """
243 Generates a path for a new report file based on the query.
245 Args:
246 query: The query used for the report.
248 Returns:
249 The path that it generated.
251 """
252 # Generate a unique filename that does not contain
253 # non-alphanumeric characters.
254 query_hash = hashlib.md5( # DevSkim: ignore DS126858
255 query.encode("utf-8"), usedforsecurity=False
256 ).hexdigest()[:10]
257 return OUTPUT_DIR / (
258 f"research_report_{query_hash}_{int(datetime.now(UTC).timestamp())}.md"
259 )
262@log_for_research
263def run_research_process(
264 research_id, query, mode, active_research, termination_flags, **kwargs
265):
266 """
267 Run the research process in the background for a given research ID.
269 Args:
270 research_id: The ID of the research
271 query: The research query
272 mode: The research mode (quick/detailed)
273 active_research: Dictionary of active research processes
274 termination_flags: Dictionary of termination flags
275 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.)
276 MUST include 'username' for database access
277 """
279 # Extract username - required for database access
280 username = kwargs.get("username")
281 logger.info(f"Research thread started with username: {username}")
282 if not username: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 logger.error("No username provided to research thread")
284 raise ValueError("Username is required for research process")
285 try:
286 # Check if this research has been terminated before we even start
287 if termination_flags.get(research_id): 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true
288 logger.info(
289 f"Research {research_id} was terminated before starting"
290 )
291 cleanup_research_resources(
292 research_id, active_research, termination_flags, username
293 )
294 return
296 logger.info(
297 f"Starting research process for ID {research_id}, query: {query}"
298 )
300 # Extract key parameters
301 model_provider = kwargs.get("model_provider")
302 model = kwargs.get("model")
303 custom_endpoint = kwargs.get("custom_endpoint")
304 search_engine = kwargs.get("search_engine")
305 max_results = kwargs.get("max_results")
306 time_period = kwargs.get("time_period")
307 iterations = kwargs.get("iterations")
308 questions_per_iteration = kwargs.get("questions_per_iteration")
309 strategy = kwargs.get(
310 "strategy", "source-based"
311 ) # Default to source-based
312 settings_snapshot = kwargs.get(
313 "settings_snapshot", {}
314 ) # Complete settings snapshot
316 # Log settings snapshot to debug
317 from ...settings.logger import log_settings
319 log_settings(settings_snapshot, "Settings snapshot received in thread")
321 # Strategy should already be saved in the database before thread starts
322 logger.info(f"Research strategy: {strategy}")
324 # Log all parameters for debugging
325 logger.info(
326 f"Research parameters: provider={model_provider}, model={model}, "
327 f"search_engine={search_engine}, max_results={max_results}, "
328 f"time_period={time_period}, iterations={iterations}, "
329 f"questions_per_iteration={questions_per_iteration}, "
330 f"custom_endpoint={custom_endpoint}, strategy={strategy}"
331 )
333 # Set up the AI Context Manager
334 output_dir = OUTPUT_DIR / f"research_{research_id}"
335 output_dir.mkdir(parents=True, exist_ok=True)
337 # Create a settings context that uses snapshot if available, otherwise falls back to database
338 # This allows the research to be independent of database during execution
339 class SettingsContext:
340 def __init__(self, snapshot, username):
341 self.snapshot = snapshot or {}
342 self.username = username
343 # Extract values from setting objects if needed
344 self.values = {}
345 for key, setting in self.snapshot.items():
346 if isinstance(setting, dict) and "value" in setting: 346 ↛ 351line 346 didn't jump to line 351 because the condition on line 346 was always true
347 # It's a full setting object, extract the value
348 self.values[key] = setting["value"]
349 else:
350 # It's already just a value
351 self.values[key] = setting
353 def get_setting(self, key, default=None):
354 """Get setting from snapshot only - no database access in threads"""
355 if key in self.values: 355 ↛ 358line 355 didn't jump to line 358 because the condition on line 355 was always true
356 return self.values[key]
357 # No fallback to database - threads must use snapshot only
358 logger.debug(
359 f"Setting '{key}' not found in snapshot, using default"
360 )
361 return default
363 settings_context = SettingsContext(settings_snapshot, username)
365 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var
366 from ...settings.logger import log_settings
368 log_settings(
369 settings_context.values, "SettingsContext values extracted"
370 )
372 # Set the settings context for this thread
373 from ...config.thread_settings import (
374 clear_settings_context,
375 set_settings_context,
376 )
378 set_settings_context(settings_context)
380 # Get user password if provided
381 user_password = kwargs.get("user_password")
383 # Create shared research context that can be updated during research
384 shared_research_context = {
385 "research_id": research_id,
386 "research_query": query,
387 "research_mode": mode,
388 "research_phase": "init",
389 "search_iteration": 0,
390 "search_engines_planned": None,
391 "search_engine_selected": search_engine,
392 "username": username, # Add username for queue operations
393 "user_password": user_password, # Add password for metrics access
394 }
396 # If this is a follow-up research, include the parent context
397 if "research_context" in kwargs and kwargs["research_context"]: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true
398 logger.info(
399 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings"
400 )
401 shared_research_context.update(kwargs["research_context"])
403 # Do not log context keys as they may contain sensitive information
404 logger.info(f"Created shared_research_context for user: {username}")
406 # Set search context for search tracking
407 set_search_context(shared_research_context)
409 # Set up progress callback
410 def progress_callback(message, progress_percent, metadata):
411 # Frequent termination check
412 if termination_flags.get(research_id):
413 handle_termination(
414 research_id, active_research, termination_flags, username
415 )
416 raise Exception("Research was terminated by user")
418 # Bind research_id to logger for this specific log
419 bound_logger = logger.bind(research_id=research_id)
420 bound_logger.log("MILESTONE", message)
422 if "SEARCH_PLAN:" in message: 422 ↛ 423line 422 didn't jump to line 423 because the condition on line 422 was never true
423 engines = message.split("SEARCH_PLAN:")[1].strip()
424 metadata["planned_engines"] = engines
425 metadata["phase"] = "search_planning" # Use existing phase
426 # Update shared context for token tracking
427 shared_research_context["search_engines_planned"] = engines
428 shared_research_context["research_phase"] = "search_planning"
430 if "ENGINE_SELECTED:" in message: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true
431 engine = message.split("ENGINE_SELECTED:")[1].strip()
432 metadata["selected_engine"] = engine
433 metadata["phase"] = "search" # Use existing 'search' phase
434 # Update shared context for token tracking
435 shared_research_context["search_engine_selected"] = engine
436 shared_research_context["research_phase"] = "search"
438 # Capture other research phases for better context tracking
439 if metadata.get("phase"): 439 ↛ 443line 439 didn't jump to line 443 because the condition on line 439 was always true
440 shared_research_context["research_phase"] = metadata["phase"]
442 # Update search iteration if available
443 if "iteration" in metadata: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true
444 shared_research_context["search_iteration"] = metadata[
445 "iteration"
446 ]
448 # Adjust progress based on research mode
449 adjusted_progress = progress_percent
450 if ( 450 ↛ 455line 450 didn't jump to line 455 because the condition on line 450 was never true
451 mode == "detailed"
452 and metadata.get("phase") == "output_generation"
453 ):
454 # For detailed mode, adjust the progress range for output generation
455 adjusted_progress = min(80, progress_percent)
456 elif ( 456 ↛ 461line 456 didn't jump to line 461 because the condition on line 456 was never true
457 mode == "detailed"
458 and metadata.get("phase") == "report_generation"
459 ):
460 # Scale the progress from 80% to 95% for the report generation phase
461 if progress_percent is not None:
462 normalized = progress_percent / 100
463 adjusted_progress = 80 + (normalized * 15)
464 elif ( 464 ↛ 468line 464 didn't jump to line 468 because the condition on line 464 was never true
465 mode == "quick" and metadata.get("phase") == "output_generation"
466 ):
467 # For quick mode, ensure we're at least at 85% during output generation
468 adjusted_progress = max(85, progress_percent)
469 # Map any further progress within output_generation to 85-95% range
470 if progress_percent is not None and progress_percent > 0:
471 normalized = progress_percent / 100
472 adjusted_progress = 85 + (normalized * 10)
474 # Don't let progress go backwards
475 if research_id in active_research and adjusted_progress is not None: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 current_progress = active_research[research_id].get(
477 "progress", 0
478 )
479 adjusted_progress = max(current_progress, adjusted_progress)
481 # Update active research record
482 if research_id in active_research: 482 ↛ exitline 482 didn't return from function 'progress_callback' because the condition on line 482 was always true
483 if adjusted_progress is not None: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 active_research[research_id]["progress"] = adjusted_progress
486 # Queue the progress update to be processed in main thread
487 if adjusted_progress is not None: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 from ..queue.processor_v2 import queue_processor
490 if username:
491 queue_processor.queue_progress_update(
492 username, research_id, adjusted_progress
493 )
494 else:
495 logger.warning(
496 f"Cannot queue progress update for research {research_id} - no username available"
497 )
499 # Emit a socket event
500 try:
501 # Basic event data
502 event_data = {"progress": adjusted_progress}
504 SocketIOService().emit_to_subscribers(
505 "progress", research_id, event_data
506 )
507 except Exception:
508 logger.exception("Socket emit error (non-critical)")
510 # Function to check termination during long-running operations
511 def check_termination():
512 if termination_flags.get(research_id):
513 handle_termination(
514 research_id, active_research, termination_flags, username
515 )
516 raise Exception(
517 "Research was terminated by user during long-running operation"
518 )
519 return False # Not terminated
521 # Configure the system with the specified parameters
522 use_llm = None
523 if model or search_engine or model_provider: 523 ↛ 530line 523 didn't jump to line 530 because the condition on line 523 was always true
524 # Log that we're overriding system settings
525 logger.info(
526 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}"
527 )
529 # Override LLM if model or model_provider specified
530 if model or model_provider: 530 ↛ 573line 530 didn't jump to line 573 because the condition on line 530 was always true
531 try:
532 # Get LLM with the overridden settings
533 # Use the shared_research_context which includes username
534 use_llm = get_llm(
535 model_name=model,
536 provider=model_provider,
537 openai_endpoint_url=custom_endpoint,
538 research_id=research_id,
539 research_context=shared_research_context,
540 )
542 logger.info(
543 f"Successfully set LLM to: provider={model_provider}, model={model}"
544 )
545 except Exception as e:
546 logger.exception(
547 f"Error setting LLM provider={model_provider}, model={model}"
548 )
549 error_msg = str(e)
550 # Surface configuration errors to user instead of silently continuing
551 config_error_keywords = [
552 "model path",
553 "llamacpp",
554 "cannot connect",
555 "server",
556 "not configured",
557 "not responding",
558 "directory",
559 ".gguf",
560 ]
561 if any( 561 ↛ 566line 561 didn't jump to line 566 because the condition on line 561 was never true
562 keyword in error_msg.lower()
563 for keyword in config_error_keywords
564 ):
565 # This is a configuration error the user can fix
566 raise ValueError(
567 f"LLM Configuration Error: {error_msg}"
568 ) from e
569 # For other errors, re-raise to avoid silent failures
570 raise
572 # Create search engine first if specified, to avoid default creation without username
573 use_search = None
574 if search_engine:
575 try:
576 # Create a new search object with these settings
577 use_search = get_search(
578 search_tool=search_engine,
579 llm_instance=use_llm,
580 username=username,
581 settings_snapshot=settings_snapshot,
582 )
583 logger.info(
584 f"Successfully created search engine: {search_engine}"
585 )
586 except Exception as e:
587 logger.exception(
588 f"Error creating search engine {search_engine}"
589 )
590 error_msg = str(e)
591 # Surface configuration errors to user instead of silently continuing
592 config_error_keywords = [
593 "searxng",
594 "instance_url",
595 "api_key",
596 "cannot connect",
597 "connection",
598 "timeout",
599 "not configured",
600 ]
601 if any(
602 keyword in error_msg.lower()
603 for keyword in config_error_keywords
604 ):
605 # This is a configuration error the user can fix
606 raise ValueError(
607 f"Search Engine Configuration Error ({search_engine}): {error_msg}"
608 ) from e
609 # For other errors, re-raise to avoid silent failures
610 raise
612 # Set the progress callback in the system
613 system = AdvancedSearchSystem(
614 llm=use_llm,
615 search=use_search,
616 strategy_name=strategy,
617 max_iterations=iterations,
618 questions_per_iteration=questions_per_iteration,
619 username=username,
620 settings_snapshot=settings_snapshot,
621 research_id=research_id,
622 research_context=shared_research_context,
623 )
624 system.set_progress_callback(progress_callback)
626 # Run the search
627 progress_callback("Starting research process", 5, {"phase": "init"})
629 try:
630 results = system.analyze_topic(query)
631 if mode == "quick":
632 progress_callback(
633 "Search complete, preparing to generate summary...",
634 85,
635 {"phase": "output_generation"},
636 )
637 else:
638 progress_callback(
639 "Search complete, generating output",
640 80,
641 {"phase": "output_generation"},
642 )
643 except Exception as search_error:
644 # Better handling of specific search errors
645 error_message = str(search_error)
646 error_type = "unknown"
648 # Extract error details for common issues
649 if "status code: 503" in error_message:
650 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system."
651 error_type = "ollama_unavailable"
652 elif "status code: 404" in error_message:
653 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model."
654 error_type = "model_not_found"
655 elif "status code:" in error_message:
656 # Extract the status code for other HTTP errors
657 status_code = error_message.split("status code:")[1].strip()
658 error_message = f"API request failed with status code {status_code}. Please check your configuration."
659 error_type = "api_error"
660 elif "connection" in error_message.lower():
661 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible."
662 error_type = "connection_error"
664 # Raise with improved error message
665 raise Exception(f"{error_message} (Error type: {error_type})")
667 # Generate output based on mode
668 if mode == "quick":
669 # Quick Summary
670 if results.get("findings") or results.get("formatted_findings"):
671 raw_formatted_findings = results["formatted_findings"]
673 # Check if formatted_findings contains an error message
674 if isinstance(
675 raw_formatted_findings, str
676 ) and raw_formatted_findings.startswith("Error:"):
677 logger.exception(
678 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..."
679 )
681 # Determine error type for better user feedback
682 error_type = "unknown"
683 error_message = raw_formatted_findings.lower()
685 if (
686 "token limit" in error_message
687 or "context length" in error_message
688 ):
689 error_type = "token_limit"
690 # Log specific error type
691 logger.warning(
692 "Detected token limit error in synthesis"
693 )
695 # Update progress with specific error type
696 progress_callback(
697 "Synthesis hit token limits. Attempting fallback...",
698 87,
699 {
700 "phase": "synthesis_error",
701 "error_type": error_type,
702 },
703 )
704 elif (
705 "timeout" in error_message
706 or "timed out" in error_message
707 ):
708 error_type = "timeout"
709 logger.warning("Detected timeout error in synthesis")
710 progress_callback(
711 "Synthesis timed out. Attempting fallback...",
712 87,
713 {
714 "phase": "synthesis_error",
715 "error_type": error_type,
716 },
717 )
718 elif "rate limit" in error_message:
719 error_type = "rate_limit"
720 logger.warning("Detected rate limit error in synthesis")
721 progress_callback(
722 "LLM rate limit reached. Attempting fallback...",
723 87,
724 {
725 "phase": "synthesis_error",
726 "error_type": error_type,
727 },
728 )
729 elif (
730 "connection" in error_message
731 or "network" in error_message
732 ):
733 error_type = "connection"
734 logger.warning("Detected connection error in synthesis")
735 progress_callback(
736 "Connection issue with LLM. Attempting fallback...",
737 87,
738 {
739 "phase": "synthesis_error",
740 "error_type": error_type,
741 },
742 )
743 elif (
744 "llm error" in error_message
745 or "final answer synthesis fail" in error_message
746 ):
747 error_type = "llm_error"
748 logger.warning(
749 "Detected general LLM error in synthesis"
750 )
751 progress_callback(
752 "LLM error during synthesis. Attempting fallback...",
753 87,
754 {
755 "phase": "synthesis_error",
756 "error_type": error_type,
757 },
758 )
759 else:
760 # Generic error
761 logger.warning("Detected unknown error in synthesis")
762 progress_callback(
763 "Error during synthesis. Attempting fallback...",
764 87,
765 {
766 "phase": "synthesis_error",
767 "error_type": "unknown",
768 },
769 )
771 # Extract synthesized content from findings if available
772 synthesized_content = ""
773 for finding in results.get("findings", []):
774 if finding.get("phase") == "Final synthesis":
775 synthesized_content = finding.get("content", "")
776 break
778 # Use synthesized content as fallback
779 if (
780 synthesized_content
781 and not synthesized_content.startswith("Error:")
782 ):
783 logger.info(
784 "Using existing synthesized content as fallback"
785 )
786 raw_formatted_findings = synthesized_content
788 # Or use current_knowledge as another fallback
789 elif results.get("current_knowledge"):
790 logger.info("Using current_knowledge as fallback")
791 raw_formatted_findings = results["current_knowledge"]
793 # Or combine all finding contents as last resort
794 elif results.get("findings"):
795 logger.info("Combining all findings as fallback")
796 # First try to use any findings that are not errors
797 valid_findings = [
798 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
799 for finding in results.get("findings", [])
800 if finding.get("content")
801 and not finding.get("content", "").startswith(
802 "Error:"
803 )
804 ]
806 if valid_findings:
807 raw_formatted_findings = (
808 "# Research Results (Fallback Mode)\n\n"
809 )
810 raw_formatted_findings += "\n\n".join(
811 valid_findings
812 )
813 raw_formatted_findings += f"\n\n## Error Information\n{raw_formatted_findings}"
814 else:
815 # Last resort: use everything including errors
816 raw_formatted_findings = (
817 "# Research Results (Emergency Fallback)\n\n"
818 )
819 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n"
820 raw_formatted_findings += "\n\n".join(
821 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
822 for finding in results.get("findings", [])
823 if finding.get("content")
824 )
826 progress_callback(
827 f"Using fallback synthesis due to {error_type} error",
828 88,
829 {
830 "phase": "synthesis_fallback",
831 "error_type": error_type,
832 },
833 )
835 logger.info(
836 "Found formatted_findings of length: %s",
837 len(str(raw_formatted_findings)),
838 )
840 try:
841 # Check if we have an error in the findings and use enhanced error handling
842 if isinstance(
843 raw_formatted_findings, str
844 ) and raw_formatted_findings.startswith("Error:"):
845 logger.info(
846 "Generating enhanced error report using ErrorReportGenerator"
847 )
849 # Get LLM for error explanation if available
850 try:
851 llm = get_llm(
852 research_id=research_id,
853 research_context=shared_research_context,
854 )
855 except Exception:
856 llm = None
857 logger.warning(
858 "Could not get LLM for error explanation"
859 )
861 # Generate comprehensive error report
862 error_generator = ErrorReportGenerator(llm)
863 clean_markdown = error_generator.generate_error_report(
864 error_message=raw_formatted_findings,
865 query=query,
866 partial_results=results,
867 search_iterations=results.get("iterations", 0),
868 research_id=research_id,
869 )
871 logger.info(
872 "Generated enhanced error report with %d characters",
873 len(clean_markdown),
874 )
875 else:
876 # Get the synthesized content from the LLM directly
877 clean_markdown = raw_formatted_findings
879 # Extract all sources from findings to add them to the summary
880 all_links = []
881 for finding in results.get("findings", []):
882 search_results = finding.get("search_results", [])
883 if search_results:
884 try:
885 links = extract_links_from_search_results(
886 search_results
887 )
888 all_links.extend(links)
889 except Exception:
890 logger.exception(
891 "Error processing search results/links"
892 )
894 logger.info(
895 "Successfully converted to clean markdown of length: %s",
896 len(clean_markdown),
897 )
899 # First send a progress update for generating the summary
900 progress_callback(
901 "Generating clean summary from research data...",
902 90,
903 {"phase": "output_generation"},
904 )
906 # Send progress update for saving report
907 progress_callback(
908 "Saving research report to database...",
909 95,
910 {"phase": "report_complete"},
911 )
913 # Format citations in the markdown content
914 formatter = get_citation_formatter()
915 formatted_content = formatter.format_document(
916 clean_markdown
917 )
919 # Prepare complete report content
920 full_report_content = f"""{formatted_content}
922## Research Metrics
923- Search Iterations: {results["iterations"]}
924- Generated at: {datetime.now(UTC).isoformat()}
925"""
927 # Save sources to database
928 from .research_sources_service import ResearchSourcesService
930 sources_service = ResearchSourcesService()
931 if all_links:
932 logger.info(
933 f"Quick summary: Saving {len(all_links)} sources to database"
934 )
935 sources_saved = sources_service.save_research_sources(
936 research_id=research_id,
937 sources=all_links,
938 username=username,
939 )
940 logger.info(
941 f"Quick summary: Saved {sources_saved} sources for research {research_id}"
942 )
944 # Save report using storage abstraction
945 from ...storage import get_report_storage
947 with get_user_db_session(username) as db_session:
948 storage = get_report_storage(session=db_session)
950 # Prepare metadata
951 metadata = {
952 "iterations": results["iterations"],
953 "generated_at": datetime.now(UTC).isoformat(),
954 }
956 # Save report using storage abstraction
957 success = storage.save_report(
958 research_id=research_id,
959 content=full_report_content,
960 metadata=metadata,
961 username=username,
962 )
964 if not success:
965 raise Exception("Failed to save research report")
967 logger.info(
968 f"Report saved for research_id: {research_id}"
969 )
971 # Skip export to additional formats - we're storing in database only
973 # Update research status in database
974 completed_at = datetime.now(UTC).isoformat()
976 with get_user_db_session(username) as db_session:
977 research = (
978 db_session.query(ResearchHistory)
979 .filter_by(id=research_id)
980 .first()
981 )
983 # Preserve existing metadata and update with new values
984 metadata = _parse_research_metadata(
985 research.research_meta
986 )
988 metadata.update(
989 {
990 "iterations": results["iterations"],
991 "generated_at": datetime.now(UTC).isoformat(),
992 }
993 )
995 # Use the helper function for consistent duration calculation
996 duration_seconds = calculate_duration(
997 research.created_at, completed_at
998 )
1000 research.status = ResearchStatus.COMPLETED
1001 research.completed_at = completed_at
1002 research.duration_seconds = duration_seconds
1003 # Note: report_content is saved by CachedResearchService
1004 # report_path is not used in encrypted database version
1006 # Generate headline and topics only for news searches
1007 if (
1008 metadata.get("is_news_search")
1009 or metadata.get("search_type") == "news_analysis"
1010 ):
1011 try:
1012 from ...news.utils.headline_generator import (
1013 generate_headline,
1014 )
1015 from ...news.utils.topic_generator import (
1016 generate_topics,
1017 )
1019 # Get the report content from database for better headline/topic generation
1020 report_content = ""
1021 try:
1022 research = (
1023 db_session.query(ResearchHistory)
1024 .filter_by(id=research_id)
1025 .first()
1026 )
1027 if research and research.report_content:
1028 report_content = research.report_content
1029 logger.info(
1030 f"Retrieved {len(report_content)} chars from database for headline generation"
1031 )
1032 else:
1033 logger.warning(
1034 f"No report content found in database for research_id: {research_id}"
1035 )
1036 except Exception as e:
1037 logger.warning(
1038 f"Could not retrieve report content from database: {e}"
1039 )
1041 # Generate headline
1042 logger.info(
1043 f"Generating headline for query: {query[:100]}"
1044 )
1045 headline = generate_headline(
1046 query, report_content
1047 )
1048 metadata["generated_headline"] = headline
1050 # Generate topics
1051 logger.info(
1052 f"Generating topics with category: {metadata.get('category', 'News')}"
1053 )
1054 topics = generate_topics(
1055 query=query,
1056 findings=report_content,
1057 category=metadata.get("category", "News"),
1058 max_topics=6,
1059 )
1060 metadata["generated_topics"] = topics
1062 logger.info(f"Generated headline: {headline}")
1063 logger.info(f"Generated topics: {topics}")
1065 except Exception as e:
1066 logger.warning(
1067 f"Could not generate headline/topics: {e}"
1068 )
1070 research.research_meta = metadata
1072 db_session.commit()
1073 logger.info(
1074 f"Database commit completed for research_id: {research_id}"
1075 )
1077 # Update subscription if this was triggered by a subscription
1078 if metadata.get("subscription_id"):
1079 try:
1080 from ...news.subscription_manager.storage import (
1081 SQLSubscriptionStorage,
1082 )
1083 from datetime import (
1084 datetime as dt,
1085 timezone,
1086 timedelta,
1087 )
1089 sub_storage = SQLSubscriptionStorage()
1090 subscription_id = metadata["subscription_id"]
1092 # Get subscription to find refresh interval
1093 subscription = sub_storage.get(subscription_id)
1094 if subscription:
1095 refresh_minutes = subscription.get(
1096 "refresh_minutes", 240
1097 )
1098 now = dt.now(timezone.utc)
1099 next_refresh = now + timedelta(
1100 minutes=refresh_minutes
1101 )
1103 # Update refresh times
1104 sub_storage.update_refresh_time(
1105 subscription_id=subscription_id,
1106 last_refresh=now,
1107 next_refresh=next_refresh,
1108 )
1110 # Increment stats
1111 sub_storage.increment_stats(
1112 subscription_id, 1
1113 )
1115 logger.info(
1116 f"Updated subscription {subscription_id} refresh times"
1117 )
1118 except Exception as e:
1119 logger.warning(
1120 f"Could not update subscription refresh time: {e}"
1121 )
1123 logger.info(
1124 f"Database updated successfully for research_id: {research_id}"
1125 )
1127 # Send the final completion message
1128 progress_callback(
1129 "Research completed successfully",
1130 100,
1131 {"phase": "complete"},
1132 )
1134 # Clean up resources
1135 logger.info(
1136 "Cleaning up resources for research_id: %s", research_id
1137 )
1138 cleanup_research_resources(
1139 research_id,
1140 active_research,
1141 termination_flags,
1142 username,
1143 )
1144 logger.info(
1145 "Resources cleaned up for research_id: %s", research_id
1146 )
1148 except Exception as inner_e:
1149 logger.exception("Error during quick summary generation")
1150 raise Exception(
1151 f"Error generating quick summary: {inner_e!s}"
1152 )
1153 else:
1154 raise Exception(
1155 "No research findings were generated. Please try again."
1156 )
1157 else:
1158 # Full Report
1159 progress_callback(
1160 "Generating detailed report...",
1161 85,
1162 {"phase": "report_generation"},
1163 )
1165 # Extract the search system from the results if available
1166 search_system = results.get("search_system", None)
1168 # Pass the existing search system to maintain citation indices
1169 report_generator = IntegratedReportGenerator(
1170 search_system=search_system,
1171 settings_snapshot=settings_snapshot,
1172 )
1173 final_report = report_generator.generate_report(results, query)
1175 progress_callback(
1176 "Report generation complete", 95, {"phase": "report_complete"}
1177 )
1179 # Format citations in the report content
1180 formatter = get_citation_formatter()
1181 formatted_content = formatter.format_document(
1182 final_report["content"]
1183 )
1185 # Save sources to database
1186 from .research_sources_service import ResearchSourcesService
1188 sources_service = ResearchSourcesService()
1189 if (
1190 hasattr(search_system, "all_links_of_system")
1191 and search_system.all_links_of_system
1192 ):
1193 logger.info(
1194 f"Saving {len(search_system.all_links_of_system)} sources to database"
1195 )
1196 sources_saved = sources_service.save_research_sources(
1197 research_id=research_id,
1198 sources=search_system.all_links_of_system,
1199 username=username,
1200 )
1201 logger.info(
1202 f"Saved {sources_saved} sources for research {research_id}"
1203 )
1205 # Save report to database
1206 with get_user_db_session(username) as db_session:
1207 # Update metadata
1208 metadata = final_report["metadata"]
1209 metadata["iterations"] = results["iterations"]
1211 # Save report to database
1212 try:
1213 research = (
1214 db_session.query(ResearchHistory)
1215 .filter_by(id=research_id)
1216 .first()
1217 )
1219 if not research:
1220 logger.error(f"Research {research_id} not found")
1221 success = False
1222 else:
1223 research.report_content = formatted_content
1224 if research.research_meta:
1225 research.research_meta.update(metadata)
1226 else:
1227 research.research_meta = metadata
1228 db_session.commit()
1229 success = True
1230 logger.info(
1231 f"Saved report for research {research_id} to database"
1232 )
1233 except Exception:
1234 logger.exception("Error saving report to database")
1235 db_session.rollback()
1236 success = False
1238 if not success:
1239 raise Exception("Failed to save research report")
1241 logger.info(
1242 f"Report saved to database for research_id: {research_id}"
1243 )
1245 # Update research status in database
1246 completed_at = datetime.now(UTC).isoformat()
1248 with get_user_db_session(username) as db_session:
1249 research = (
1250 db_session.query(ResearchHistory)
1251 .filter_by(id=research_id)
1252 .first()
1253 )
1255 # Preserve existing metadata and merge with report metadata
1256 metadata = _parse_research_metadata(research.research_meta)
1258 metadata.update(final_report["metadata"])
1259 metadata["iterations"] = results["iterations"]
1261 # Use the helper function for consistent duration calculation
1262 duration_seconds = calculate_duration(
1263 research.created_at, completed_at
1264 )
1266 research.status = ResearchStatus.COMPLETED
1267 research.completed_at = completed_at
1268 research.duration_seconds = duration_seconds
1269 # Note: report_content is saved by CachedResearchService
1270 # report_path is not used in encrypted database version
1272 # Generate headline and topics only for news searches
1273 if (
1274 metadata.get("is_news_search")
1275 or metadata.get("search_type") == "news_analysis"
1276 ):
1277 try:
1278 from ..news.utils.headline_generator import (
1279 generate_headline,
1280 )
1281 from ..news.utils.topic_generator import (
1282 generate_topics,
1283 )
1285 # Get the report content from database for better headline/topic generation
1286 report_content = ""
1287 try:
1288 research = (
1289 db_session.query(ResearchHistory)
1290 .filter_by(id=research_id)
1291 .first()
1292 )
1293 if research and research.report_content:
1294 report_content = research.report_content
1295 else:
1296 logger.warning(
1297 f"No report content found in database for research_id: {research_id}"
1298 )
1299 except Exception as e:
1300 logger.warning(
1301 f"Could not retrieve report content from database: {e}"
1302 )
1304 # Generate headline
1305 headline = generate_headline(query, report_content)
1306 metadata["generated_headline"] = headline
1308 # Generate topics
1309 topics = generate_topics(
1310 query=query,
1311 findings=report_content,
1312 category=metadata.get("category", "News"),
1313 max_topics=6,
1314 )
1315 metadata["generated_topics"] = topics
1317 logger.info(f"Generated headline: {headline}")
1318 logger.info(f"Generated topics: {topics}")
1320 except Exception as e:
1321 logger.warning(
1322 f"Could not generate headline/topics: {e}"
1323 )
1325 research.research_meta = metadata
1327 db_session.commit()
1329 # Update subscription if this was triggered by a subscription
1330 if metadata.get("subscription_id"):
1331 try:
1332 from ...news.subscription_manager.storage import (
1333 SQLSubscriptionStorage,
1334 )
1335 from datetime import datetime as dt, timezone, timedelta
1337 sub_storage = SQLSubscriptionStorage()
1338 subscription_id = metadata["subscription_id"]
1340 # Get subscription to find refresh interval
1341 subscription = sub_storage.get(subscription_id)
1342 if subscription:
1343 refresh_minutes = subscription.get(
1344 "refresh_minutes", 240
1345 )
1346 now = dt.now(timezone.utc)
1347 next_refresh = now + timedelta(
1348 minutes=refresh_minutes
1349 )
1351 # Update refresh times
1352 sub_storage.update_refresh_time(
1353 subscription_id=subscription_id,
1354 last_refresh=now,
1355 next_refresh=next_refresh,
1356 )
1358 # Increment stats
1359 sub_storage.increment_stats(subscription_id, 1)
1361 logger.info(
1362 f"Updated subscription {subscription_id} refresh times"
1363 )
1364 except Exception as e:
1365 logger.warning(
1366 f"Could not update subscription refresh time: {e}"
1367 )
1369 progress_callback(
1370 "Research completed successfully",
1371 100,
1372 {"phase": "complete"},
1373 )
1375 # Clean up resources
1376 cleanup_research_resources(
1377 research_id, active_research, termination_flags, username
1378 )
1380 except Exception as e:
1381 # Handle error
1382 error_message = f"Research failed: {e!s}"
1383 logger.exception(error_message)
1385 try:
1386 # Check for common Ollama error patterns in the exception and provide more user-friendly errors
1387 user_friendly_error = str(e)
1388 error_context = {}
1390 if "Error type: ollama_unavailable" in user_friendly_error: 1390 ↛ 1391line 1390 didn't jump to line 1391 because the condition on line 1390 was never true
1391 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system."
1392 error_context = {
1393 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly."
1394 }
1395 elif "Error type: model_not_found" in user_friendly_error: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true
1396 user_friendly_error = "Required Ollama model not found. Please pull the model first."
1397 error_context = {
1398 "solution": "Run 'ollama pull mistral' to download the required model."
1399 }
1400 elif "Error type: connection_error" in user_friendly_error: 1400 ↛ 1401line 1400 didn't jump to line 1401 because the condition on line 1400 was never true
1401 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running."
1402 error_context = {
1403 "solution": "Ensure Ollama or your API service is running and accessible."
1404 }
1405 elif "Error type: api_error" in user_friendly_error: 1405 ↛ 1407line 1405 didn't jump to line 1407 because the condition on line 1405 was never true
1406 # Keep the original error message as it's already improved
1407 error_context = {
1408 "solution": "Check API configuration and credentials."
1409 }
1411 # Generate enhanced error report for failed research
1412 enhanced_report_content = None
1413 try:
1414 # Get LLM for error explanation if available
1415 try:
1416 llm = get_llm(
1417 research_id=research_id,
1418 research_context=shared_research_context,
1419 )
1420 except Exception:
1421 llm = None
1422 logger.warning(
1423 "Could not get LLM for error explanation in failure handler"
1424 )
1426 # Get partial results if they exist
1427 partial_results = results if "results" in locals() else None
1428 search_iterations = (
1429 results.get("iterations", 0) if partial_results else 0
1430 )
1432 # Generate comprehensive error report
1433 error_generator = ErrorReportGenerator(llm)
1434 enhanced_report_content = error_generator.generate_error_report(
1435 error_message=f"Research failed: {e!s}",
1436 query=query,
1437 partial_results=partial_results,
1438 search_iterations=search_iterations,
1439 research_id=research_id,
1440 )
1442 logger.info(
1443 "Generated enhanced error report for failed research (length: %d)",
1444 len(enhanced_report_content),
1445 )
1447 # Save enhanced error report to encrypted database
1448 try:
1449 # username already available from function scope (line 281)
1450 if username: 1450 ↛ 1472line 1450 didn't jump to line 1472 because the condition on line 1450 was always true
1451 from ...storage import get_report_storage
1453 with get_user_db_session(username) as db_session:
1454 storage = get_report_storage(session=db_session)
1455 success = storage.save_report(
1456 research_id=research_id,
1457 content=enhanced_report_content,
1458 metadata={"error_report": True},
1459 username=username,
1460 )
1461 if success: 1461 ↛ 1467line 1461 didn't jump to line 1467 because the condition on line 1461 was always true
1462 logger.info(
1463 "Saved enhanced error report to encrypted database for research %s",
1464 research_id,
1465 )
1466 else:
1467 logger.warning(
1468 "Failed to save enhanced error report to database for research %s",
1469 research_id,
1470 )
1471 else:
1472 logger.warning(
1473 "Cannot save error report: username not available"
1474 )
1476 except Exception as report_error:
1477 logger.exception(
1478 "Failed to save enhanced error report: %s", report_error
1479 )
1481 except Exception as error_gen_error:
1482 logger.exception(
1483 "Failed to generate enhanced error report: %s",
1484 error_gen_error,
1485 )
1486 enhanced_report_content = None
1488 # Get existing metadata from database first
1489 existing_metadata = {}
1490 try:
1491 # username already available from function scope (line 281)
1492 if username: 1492 ↛ 1505line 1492 didn't jump to line 1505 because the condition on line 1492 was always true
1493 with get_user_db_session(username) as db_session:
1494 research = (
1495 db_session.query(ResearchHistory)
1496 .filter_by(id=research_id)
1497 .first()
1498 )
1499 if research and research.research_meta: 1499 ↛ 1505line 1499 didn't jump to line 1505
1500 existing_metadata = dict(research.research_meta)
1501 except Exception:
1502 logger.exception("Failed to get existing metadata")
1504 # Update metadata with more context about the error while preserving existing values
1505 metadata = existing_metadata
1506 metadata.update({"phase": "error", "error": user_friendly_error})
1507 if error_context: 1507 ↛ 1508line 1507 didn't jump to line 1508 because the condition on line 1507 was never true
1508 metadata.update(error_context)
1509 if enhanced_report_content: 1509 ↛ 1513line 1509 didn't jump to line 1513 because the condition on line 1509 was always true
1510 metadata["has_enhanced_report"] = True
1512 # If we still have an active research record, update its log
1513 if research_id in active_research:
1514 progress_callback(user_friendly_error, None, metadata)
1516 # If termination was requested, mark as suspended instead of failed
1517 status = (
1518 ResearchStatus.SUSPENDED
1519 if (termination_flags.get(research_id))
1520 else ResearchStatus.FAILED
1521 )
1522 message = (
1523 "Research was terminated by user"
1524 if status == ResearchStatus.SUSPENDED
1525 else user_friendly_error
1526 )
1528 # Calculate duration up to termination point - using UTC consistently
1529 now = datetime.now(UTC)
1530 completed_at = now.isoformat()
1532 # NOTE: Database updates from threads are handled by queue processor
1533 # The queue_processor.queue_error_update() method is already being used below
1534 # to safely update the database from the main thread
1536 # Queue the error update to be processed in main thread
1537 # Using the queue processor v2 system
1538 from ..queue.processor_v2 import queue_processor
1540 if username: 1540 ↛ 1554line 1540 didn't jump to line 1554 because the condition on line 1540 was always true
1541 queue_processor.queue_error_update(
1542 username=username,
1543 research_id=research_id,
1544 status=status,
1545 error_message=message,
1546 metadata=metadata,
1547 completed_at=completed_at,
1548 report_path=None,
1549 )
1550 logger.info(
1551 f"Queued error update for research {research_id} with status '{status}'"
1552 )
1553 else:
1554 logger.error(
1555 f"Cannot queue error update for research {research_id} - no username provided. "
1556 f"Status: '{status}', Message: {message}"
1557 )
1559 try:
1560 SocketIOService().emit_to_subscribers(
1561 "research_progress",
1562 research_id,
1563 {"status": status, "error": message},
1564 )
1565 except Exception:
1566 logger.exception("Failed to emit error via socket")
1568 except Exception:
1569 logger.exception("Error in error handler")
1571 # Clean up resources
1572 cleanup_research_resources(
1573 research_id, active_research, termination_flags, username
1574 )
1576 finally:
1577 # Clear thread-local contexts to prevent leaks when threads are reused
1578 from ...utilities.thread_context import clear_search_context
1579 from ...config.thread_settings import clear_settings_context
1581 clear_search_context()
1582 clear_settings_context()
def cleanup_research_resources(
    research_id, active_research, termination_flags, username=None
):
    """
    Release all in-memory tracking state for a finished research run.

    Notifies the queue processor, removes the research from the active
    bookkeeping dictionaries, and sends a final status message to any
    remaining socket subscribers.

    Args:
        research_id: The ID of the research
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        username: The username for database access (required for thread context)
    """
    logger.info("Cleaning up resources for research %s", research_id)

    # In test mode, pause briefly so tests can exercise concurrent-research
    # limits while this research still counts as active.
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        import time

        logger.info(
            f"Test mode: Adding 5 second delay before cleanup for {research_id}"
        )
        time.sleep(5)

    # Status reported in the final socket message. NOTE(review): this value is
    # never refreshed from the database, so the SUSPENDED/FAILED branch below
    # is currently unreachable — confirm whether a DB lookup was intended here.
    current_status = ResearchStatus.COMPLETED

    # processor_v2 performs the actual database update from the main thread,
    # avoiding Flask request-context issues in background threads.
    from ..queue.processor_v2 import queue_processor

    if username:
        queue_processor.notify_research_completed(username, research_id)
        logger.info(
            f"Notified queue processor of completion for research {research_id} (user: {username})"
        )
    else:
        logger.warning(
            f"Cannot notify completion for research {research_id} - no username provided"
        )

    # Drop the per-research bookkeeping entries (no-ops if already absent).
    active_research.pop(research_id, None)
    termination_flags.pop(research_id, None)

    # Best-effort: tell any subscribers still listening that the run is over.
    try:
        # Local import to avoid a circular dependency.
        from ..routes.globals import get_globals

        subscriptions = get_globals().get("socket_subscriptions", {})

        if subscriptions.get(research_id):
            ended_abnormally = current_status in (
                ResearchStatus.SUSPENDED,
                ResearchStatus.FAILED,
            )
            if ended_abnormally:
                # Suspended/failed runs report 0% progress rather than 100%.
                payload = {
                    "status": current_status,
                    "message": f"Research was {current_status}",
                    "progress": 0,
                }
            else:
                payload = {
                    "status": ResearchStatus.COMPLETED,
                    "message": "Research process has ended and resources have been cleaned up",
                    "progress": 100,
                }

            logger.info(
                "Sending final %s socket message for research %s",
                current_status,
                research_id,
            )

            SocketIOService().emit_to_subscribers(
                "research_progress", research_id, payload
            )

    except Exception:
        logger.exception("Error sending final cleanup message")
def handle_termination(
    research_id, active_research, termination_flags, username=None
):
    """
    Mark a research run as suspended after a user-requested termination.

    Queues the status update for the main thread, then tears down the
    run's in-memory resources.

    Args:
        research_id: The ID of the research
        active_research: Dictionary of active research processes
        termination_flags: Dictionary of termination flags
        username: The username for database access (required for thread context)
    """
    logger.info(f"Handling termination for research {research_id}")

    # Route the database write through the queue processor so it runs on the
    # main thread — background threads lack a Flask request context.
    try:
        from ..queue.processor_v2 import queue_processor

        terminated_at = datetime.now(UTC).isoformat()

        queue_processor.queue_error_update(
            username=username,
            research_id=research_id,
            status=ResearchStatus.SUSPENDED,
            error_message="Research was terminated by user",
            metadata={"terminated_at": terminated_at},
            completed_at=terminated_at,
            report_path=None,
        )

        logger.info(f"Queued suspension update for research {research_id}")
    except Exception:
        logger.exception(
            f"Error queueing termination update for research {research_id}"
        )

    # Release in-memory tracking and notify any remaining subscribers.
    cleanup_research_resources(
        research_id, active_research, termination_flags, username
    )
def cancel_research(research_id, username=None):
    """
    Cancel/terminate a research process using ORM.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research (optional, will try to get from session if not provided)

    Returns:
        bool: True if the research was found and cancelled, False otherwise
    """
    try:
        # Local import to avoid a circular dependency with the routes module.
        from ..routes.globals import get_globals

        shared_state = get_globals()
        active_research = shared_state["active_research"]
        termination_flags = shared_state["termination_flags"]

        # Signal the running worker (if any) that it should stop.
        termination_flags[research_id] = True

        if research_id in active_research:
            # In-flight run: the termination handler queues the database
            # update and cleans up in-memory state.
            handle_termination(
                research_id, active_research, termination_flags, username
            )
            return True

        # Not actively running — update the database record directly.
        # Fall back to the Flask session when no username was supplied.
        if not username:
            from flask import session

            username = session.get("username")

        if not username:
            logger.warning(
                f"No username available for cancelling research {research_id}"
            )
            return False

        terminal_states = (
            ResearchStatus.COMPLETED,
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
            ResearchStatus.ERROR,
        )
        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not record:
                    logger.info(
                        f"Research {research_id} not found in database"
                    )
                    return False

                if record.status in terminal_states:
                    # Already stopped one way or another — count as success.
                    logger.info(
                        f"Research {research_id} already in terminal state: {record.status}"
                    )
                    return True

                # Exists but is not tracked as active: suspend it anyway.
                record.status = ResearchStatus.SUSPENDED
                db_session.commit()
                logger.info(
                    f"Successfully suspended research {research_id}"
                )
        except Exception:
            logger.exception(
                f"Error accessing database for research {research_id}"
            )
            return False

        return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False