Coverage for src/local_deep_research/web/services/research_service.py: 48%
676 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import hashlib
2import json
3import re
4import threading
5from datetime import datetime, UTC
6from pathlib import Path
8from flask import g, session
9from loguru import logger
11from ...config.llm_config import get_llm
13# Output directory for research results
14from ...config.paths import get_research_outputs_directory
15from ...config.search_config import get_search
16from ...database.models import ResearchHistory, ResearchStrategy
17from ...database.session_context import get_user_db_session
18from ...error_handling.report_generator import ErrorReportGenerator
19from ...utilities.thread_context import set_search_context
20from ...report_generator import IntegratedReportGenerator
21from ...search_system import AdvancedSearchSystem
22from ...text_optimization import CitationFormatter, CitationMode
23from ...utilities.log_utils import log_for_research
24from ...utilities.search_utilities import extract_links_from_search_results
25from ...utilities.threading_utils import thread_context, thread_with_app_context
26from ..models.database import calculate_duration
27from .socket_service import SocketIOService
29OUTPUT_DIR = get_research_outputs_directory()
32def get_citation_formatter():
33 """Get citation formatter with settings from thread context."""
34 # Import here to avoid circular imports
35 from ...config.search_config import get_setting_from_snapshot
37 citation_format = get_setting_from_snapshot(
38 "report.citation_format", "number_hyperlinks"
39 )
40 mode_map = {
41 "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
42 "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
43 "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
44 "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
45 "no_hyperlinks": CitationMode.NO_HYPERLINKS,
46 }
47 mode = mode_map.get(citation_format, CitationMode.NUMBER_HYPERLINKS)
48 return CitationFormatter(mode=mode)
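# Editor's sketch (assumption, not part of the measured source): with a thread
# settings context already set (see set_settings_context in run_research_process
# below), an unknown or missing "report.citation_format" value falls back to
# CitationMode.NUMBER_HYPERLINKS.
def _example_citation_formatter_sketch():
    formatter = get_citation_formatter()
    # format_document rewrites citations in a markdown report, as done for
    # clean_markdown / final_report["content"] later in this module.
    return formatter.format_document(
        "Result [1]\n\n## Sources\n[1] https://example.com"
    )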
51def export_report_to_memory(
52 markdown_content: str, format: str, title: str = None
53):
54 """
55 Export a markdown report to different formats in memory.
57 Args:
58 markdown_content: The markdown content to export
59 format: Export format ('latex', 'quarto', 'ris', or 'pdf')
60 title: Optional title for the document
62 Returns:
63 Tuple of (content_bytes, filename, mimetype)
64 """
65 if format == "pdf": 65 ↛ 67line 65 didn't jump to line 67 because the condition on line 65 was never true
66 # Use WeasyPrint for PDF generation
67 from .pdf_service import get_pdf_service
69 pdf_service = get_pdf_service()
71 # Add title as H1 at the top if provided and not already present
72 if title and not markdown_content.startswith(f"# {title}"):
73 # Check if the content starts with any H1
74 if not markdown_content.startswith("#"):
75 markdown_content = f"# {title}\n\n{markdown_content}"
77 # Pass the title if provided, but don't add duplicate content
78 pdf_bytes = pdf_service.markdown_to_pdf(
79 markdown_content,
80 title=title, # Use the title from the research record (for HTML metadata)
81 metadata=None, # Don't add extra metadata section
82 )
84 # Generate a filename based on title or use default
85 safe_title = (
86 re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
87 if title
88 else "research_report"
89 )
90 filename = f"{safe_title}.pdf"
92 logger.info(f"Generated PDF in memory, size: {len(pdf_bytes)} bytes")
93 return pdf_bytes, filename, "application/pdf"
95 elif format == "latex":
96 from ...text_optimization.citation_formatter import LaTeXExporter
98 exporter = LaTeXExporter()
99 exported_content = exporter.export_to_latex(markdown_content)
101 safe_title = (
102 re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
103 if title
104 else "research_report"
105 )
106 filename = f"{safe_title}.tex"
108 logger.info("Generated LaTeX in memory")
109 return exported_content.encode("utf-8"), filename, "text/plain"
111 elif format == "quarto":
112 import zipfile
113 import io
114 from ...text_optimization.citation_formatter import QuartoExporter
116 exporter = QuartoExporter()
117 # Extract title from markdown if not provided
118 if not title:  # 118 ↛ 119: condition was never true
119 title_match = re.search(
120 r"^#\s+(.+)$", markdown_content, re.MULTILINE
121 )
122 title = title_match.group(1) if title_match else "Research Report"
123 exported_content = exporter.export_to_quarto(markdown_content, title)
125 # Generate bibliography
126 bib_content = exporter._generate_bibliography(markdown_content)
128 safe_title = (
129 re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
130 if title
131 else "research_report"
132 )
134 # Create a zip file in memory containing both files
135 zip_buffer = io.BytesIO()
136 with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
137 # Add the Quarto document
138 zipf.writestr(f"{safe_title}.qmd", exported_content)
139 # Add the bibliography file
140 zipf.writestr("references.bib", bib_content)
142 zip_bytes = zip_buffer.getvalue()
143 filename = f"{safe_title}_quarto.zip"
145 logger.info("Generated Quarto with bibliography in memory (zip)")
146 return zip_bytes, filename, "application/zip"
148 elif format == "ris": 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true
149 from ...text_optimization.citation_formatter import RISExporter
151 exporter = RISExporter()
152 exported_content = exporter.export_to_ris(markdown_content)
154 safe_title = (
155 re.sub(r"[^\w\s-]", "", title).strip().replace(" ", "_")[:50]
156 if title
157 else "research_report"
158 )
159 filename = f"{safe_title}.ris"
161 logger.info("Generated RIS in memory")
162 return exported_content.encode("utf-8"), filename, "text/plain"
164 else:
165 raise ValueError(f"Unsupported export format: {format}")
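# Editor's sketch (assumption, not part of the measured source): turning the
# (content_bytes, filename, mimetype) tuple into a Flask download response.
# make_response is standard Flask; the input markdown and title are illustrative.
def _example_export_download_sketch():
    from flask import make_response

    content, filename, mimetype = export_report_to_memory(
        "# My Report\n\nBody text [1]", format="latex", title="My Report"
    )
    response = make_response(content)
    response.headers["Content-Type"] = mimetype
    response.headers["Content-Disposition"] = f"attachment; filename={filename}"
    return response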
168def save_research_strategy(research_id, strategy_name, username=None):
169 """
170 Save the strategy used for a research to the database.
172 Args:
173 research_id: The ID of the research
174 strategy_name: The name of the strategy used
175 username: The username for database access (required for thread context)
176 """
177 try:
178 logger.debug(
179 f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
180 )
181 with get_user_db_session(username) as session:
182 try:
183 # Check if a strategy already exists for this research
184 existing_strategy = (
185 session.query(ResearchStrategy)
186 .filter_by(research_id=research_id)
187 .first()
188 )
190 if existing_strategy:  # 190 ↛ 192: condition was never true
191 # Update existing strategy
192 existing_strategy.strategy_name = strategy_name
193 logger.debug(
194 f"Updating existing strategy for research {research_id}"
195 )
196 else:
197 # Create new strategy record
198 new_strategy = ResearchStrategy(
199 research_id=research_id, strategy_name=strategy_name
200 )
201 session.add(new_strategy)
202 logger.debug(
203 f"Creating new strategy record for research {research_id}"
204 )
206 session.commit()
207 logger.info(
208 f"Saved strategy '{strategy_name}' for research {research_id}"
209 )
210 finally:
211 session.close()
212 except Exception:
213 logger.exception("Error saving research strategy")
216def get_research_strategy(research_id, username=None):
217 """
218 Get the strategy used for a research.
220 Args:
221 research_id: The ID of the research
222 username: The username for database access (required for thread context)
224 Returns:
225 str: The strategy name or None if not found
226 """
227 try:
228 with get_user_db_session(username) as session:
229 try:
230 strategy = (
231 session.query(ResearchStrategy)
232 .filter_by(research_id=research_id)
233 .first()
234 )
236 return strategy.strategy_name if strategy else None
237 finally:
238 session.close()
239 except Exception:
240 logger.exception("Error getting research strategy")
241 return None
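# Editor's sketch (assumption, not part of the measured source): round trip of
# the two helpers above. "res-123" and "alice" are hypothetical; the user's
# encrypted database must already exist.
def _example_strategy_round_trip_sketch():
    save_research_strategy("res-123", "source-based", username="alice")
    assert get_research_strategy("res-123", username="alice") == "source-based"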
244def start_research_process(
245 research_id,
246 query,
247 mode,
248 active_research,
249 termination_flags,
250 run_research_callback,
251 **kwargs,
252):
253 """
254 Start a research process in a background thread.
256 Args:
257 research_id: The ID of the research
258 query: The research query
259 mode: The research mode (quick/detailed)
260 active_research: Dictionary of active research processes
261 termination_flags: Dictionary of termination flags
262 run_research_callback: The callback function to run the research
263 **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)
265 Returns:
266 threading.Thread: The thread running the research
267 """
268 # Pass the app context to the thread.
269 run_research_callback = thread_with_app_context(run_research_callback)
271 # Start research process in a background thread
272 thread = threading.Thread(
273 target=run_research_callback,
274 args=(
275 thread_context(),
276 research_id,
277 query,
278 mode,
279 active_research,
280 termination_flags,
281 ),
282 kwargs=kwargs,
283 )
284 thread.daemon = True
285 thread.start()
287 active_research[research_id] = {
288 "thread": thread,
289 "progress": 0,
290 "status": "in_progress",
291 "log": [],
292 "settings": kwargs, # Store settings for reference
293 }
295 return thread
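# Editor's sketch (assumption, not part of the measured source): spawning a
# research thread with run_research_process as the callback, assuming
# thread_with_app_context consumes the leading thread_context() argument.
def _example_start_research_sketch():
    active_research, termination_flags = {}, {}
    thread = start_research_process(
        "res-123",  # hypothetical id
        "history of solar panels",
        "quick",
        active_research,
        termination_flags,
        run_research_process,
        username="alice",  # required by run_research_process
        settings_snapshot={},
    )
    assert active_research["res-123"]["thread"] is thread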
298def _generate_report_path(query: str) -> Path:
299 """
300 Generates a path for a new report file based on the query.
302 Args:
303 query: The query used for the report.
305 Returns:
306 The generated report path.
308 """
309 # Generate a unique filename that does not contain
310 # non-alphanumeric characters.
311 query_hash = hashlib.md5( # DevSkim: ignore DS126858
312 query.encode("utf-8"), usedforsecurity=False
313 ).hexdigest()[:10]
314 return OUTPUT_DIR / (
315 f"research_report_{query_hash}_{int(datetime.now(UTC).timestamp())}.md"
316 )
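# Editor's sketch (illustrative values, not computed): the helper yields names
# like OUTPUT_DIR / "research_report_<10-hex-chars>_<unix-timestamp>.md".
def _example_report_path_sketch():
    return _generate_report_path("history of solar panels")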
319@log_for_research
320def run_research_process(
321 research_id, query, mode, active_research, termination_flags, **kwargs
322):
323 """
324 Run the research process in the background for a given research ID.
326 Args:
327 research_id: The ID of the research
328 query: The research query
329 mode: The research mode (quick/detailed)
330 active_research: Dictionary of active research processes
331 termination_flags: Dictionary of termination flags
332 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.)
333 MUST include 'username' for database access
334 """
336 # Extract username - required for database access
337 username = kwargs.get("username")
338 logger.info(f"Research thread started with username: {username}")
339 if not username:  # 339 ↛ 340: condition was never true
340 logger.error("No username provided to research thread")
341 raise ValueError("Username is required for research process")
342 try:
343 # Check if this research has been terminated before we even start
344 if termination_flags.get(research_id):  # 344 ↛ 345: condition was never true
345 logger.info(
346 f"Research {research_id} was terminated before starting"
347 )
348 cleanup_research_resources(
349 research_id, active_research, termination_flags, username
350 )
351 return
353 logger.info(
354 f"Starting research process for ID {research_id}, query: {query}"
355 )
357 # Extract key parameters
358 model_provider = kwargs.get("model_provider")
359 model = kwargs.get("model")
360 custom_endpoint = kwargs.get("custom_endpoint")
361 search_engine = kwargs.get("search_engine")
362 max_results = kwargs.get("max_results")
363 time_period = kwargs.get("time_period")
364 iterations = kwargs.get("iterations")
365 questions_per_iteration = kwargs.get("questions_per_iteration")
366 strategy = kwargs.get(
367 "strategy", "source-based"
368 ) # Default to source-based
369 settings_snapshot = kwargs.get(
370 "settings_snapshot", {}
371 ) # Complete settings snapshot
373 # Log the settings snapshot for debugging
374 from ...settings.logger import log_settings
376 log_settings(settings_snapshot, "Settings snapshot received in thread")
378 # Strategy should already be saved in the database before thread starts
379 logger.info(f"Research strategy: {strategy}")
381 # Log all parameters for debugging
382 logger.info(
383 f"Research parameters: provider={model_provider}, model={model}, "
384 f"search_engine={search_engine}, max_results={max_results}, "
385 f"time_period={time_period}, iterations={iterations}, "
386 f"questions_per_iteration={questions_per_iteration}, "
387 f"custom_endpoint={custom_endpoint}, strategy={strategy}"
388 )
390 # Set up the AI Context Manager
391 output_dir = OUTPUT_DIR / f"research_{research_id}"
392 output_dir.mkdir(parents=True, exist_ok=True)
394 # Create a settings context that uses snapshot if available, otherwise falls back to database
395 # This allows the research to be independent of database during execution
396 class SettingsContext:
397 def __init__(self, snapshot, username):
398 self.snapshot = snapshot or {}
399 self.username = username
400 # Extract values from setting objects if needed
401 self.values = {}
402 for key, setting in self.snapshot.items():
403 if isinstance(setting, dict) and "value" in setting:  # 403 ↛ 408: condition was always true
404 # It's a full setting object, extract the value
405 self.values[key] = setting["value"]
406 else:
407 # It's already just a value
408 self.values[key] = setting
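# e.g. a snapshot entry {"llm.temperature": {"value": 0.7}} stores
# values["llm.temperature"] = 0.7, while a bare {"llm.temperature": 0.7}
# is stored unchanged (illustrative keys).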
410 def get_setting(self, key, default=None):
411 """Get setting from snapshot only - no database access in threads"""
412 if key in self.values:
413 return self.values[key]
414 # No fallback to database - threads must use snapshot only
415 logger.debug(
416 f"Setting '{key}' not found in snapshot, using default"
417 )
418 return default
420 settings_context = SettingsContext(settings_snapshot, username)
422 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var
423 from ...settings.logger import log_settings
425 log_settings(
426 settings_context.values, "SettingsContext values extracted"
427 )
429 # Set the settings context for this thread
430 from ...config.thread_settings import set_settings_context
432 set_settings_context(settings_context)
434 # Get user password if provided
435 user_password = kwargs.get("user_password")
437 # Create shared research context that can be updated during research
438 shared_research_context = {
439 "research_id": research_id,
440 "research_query": query,
441 "research_mode": mode,
442 "research_phase": "init",
443 "search_iteration": 0,
444 "search_engines_planned": None,
445 "search_engine_selected": search_engine,
446 "username": username, # Add username for queue operations
447 "user_password": user_password, # Add password for metrics access
448 }
450 # If this is a follow-up research, include the parent context
451 if "research_context" in kwargs and kwargs["research_context"]: 451 ↛ 452line 451 didn't jump to line 452 because the condition on line 451 was never true
452 logger.info(
453 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings"
454 )
455 shared_research_context.update(kwargs["research_context"])
457 # Do not log context keys as they may contain sensitive information
458 logger.info(f"Created shared_research_context for user: {username}")
460 # Set search context for search tracking
461 set_search_context(shared_research_context)
463 # Set up progress callback
464 def progress_callback(message, progress_percent, metadata):
465 # Frequent termination check
466 if termination_flags.get(research_id):
467 handle_termination(
468 research_id, active_research, termination_flags, username
469 )
470 raise Exception("Research was terminated by user")
472 # Bind research_id to logger for this specific log
473 bound_logger = logger.bind(research_id=research_id)
474 bound_logger.log("MILESTONE", message)
476 if "SEARCH_PLAN:" in message: 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true
477 engines = message.split("SEARCH_PLAN:")[1].strip()
478 metadata["planned_engines"] = engines
479 metadata["phase"] = "search_planning" # Use existing phase
480 # Update shared context for token tracking
481 shared_research_context["search_engines_planned"] = engines
482 shared_research_context["research_phase"] = "search_planning"
484 if "ENGINE_SELECTED:" in message: 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true
485 engine = message.split("ENGINE_SELECTED:")[1].strip()
486 metadata["selected_engine"] = engine
487 metadata["phase"] = "search" # Use existing 'search' phase
488 # Update shared context for token tracking
489 shared_research_context["search_engine_selected"] = engine
490 shared_research_context["research_phase"] = "search"
492 # Capture other research phases for better context tracking
493 if metadata.get("phase"): 493 ↛ 497line 493 didn't jump to line 497 because the condition on line 493 was always true
494 shared_research_context["research_phase"] = metadata["phase"]
496 # Update search iteration if available
497 if "iteration" in metadata:
498 shared_research_context["search_iteration"] = metadata[
499 "iteration"
500 ]
502 # Adjust progress based on research mode
503 adjusted_progress = progress_percent
504 if (  # 504 ↛ 509: condition was never true
505 mode == "detailed"
506 and metadata.get("phase") == "output_generation"
507 ):
508 # For detailed mode, adjust the progress range for output generation
509 adjusted_progress = min(80, progress_percent)
510 elif (  # 510 ↛ 515: condition was never true
511 mode == "detailed"
512 and metadata.get("phase") == "report_generation"
513 ):
514 # Scale the progress from 80% to 95% for the report generation phase
515 if progress_percent is not None:
516 normalized = progress_percent / 100
517 adjusted_progress = 80 + (normalized * 15)
518 elif (
519 mode == "quick" and metadata.get("phase") == "output_generation"
520 ):
521 # For quick mode, ensure we're at least at 85% during output generation
522 adjusted_progress = max(85, progress_percent)
523 # Map any further progress within output_generation to 85-95% range
524 if progress_percent is not None and progress_percent > 0:  # 524 ↛ 529: condition was always true
525 normalized = progress_percent / 100
526 adjusted_progress = 85 + (normalized * 10)
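# e.g. progress_percent=60 -> 85 + 0.6 * 10 = 91, keeping quick-mode
# output generation within the 85-95 band.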
528 # Don't let progress go backwards
529 if research_id in active_research and adjusted_progress is not None:
530 current_progress = active_research[research_id].get(
531 "progress", 0
532 )
533 adjusted_progress = max(current_progress, adjusted_progress)
535 # Update active research record
536 if research_id in active_research:
537 if adjusted_progress is not None:
538 active_research[research_id]["progress"] = adjusted_progress
540 # Queue the progress update to be processed in main thread
541 if adjusted_progress is not None:
542 from ..queue.processor_v2 import queue_processor
544 if username:  # 544 ↛ 549: condition was always true
545 queue_processor.queue_progress_update(
546 username, research_id, adjusted_progress
547 )
548 else:
549 logger.warning(
550 f"Cannot queue progress update for research {research_id} - no username available"
551 )
553 # Emit a socket event
554 try:
555 # Basic event data
556 event_data = {"progress": adjusted_progress}
558 SocketIOService().emit_to_subscribers(
559 "progress", research_id, event_data
560 )
561 except Exception:
562 logger.exception("Socket emit error (non-critical)")
564 # Function to check termination during long-running operations
565 def check_termination():
566 if termination_flags.get(research_id):
567 handle_termination(
568 research_id, active_research, termination_flags, username
569 )
570 raise Exception(
571 "Research was terminated by user during long-running operation"
572 )
573 return False # Not terminated
575 # Configure the system with the specified parameters
576 use_llm = None
577 if model or search_engine or model_provider:  # 577 ↛ 584: condition was always true
578 # Log that we're overriding system settings
579 logger.info(
580 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}"
581 )
583 # Override LLM if model or model_provider specified
584 if model or model_provider:  # 584 ↛ 605: condition was always true
585 try:
586 # Get LLM with the overridden settings
587 # Use the shared_research_context which includes username
588 use_llm = get_llm(
589 model_name=model,
590 provider=model_provider,
591 openai_endpoint_url=custom_endpoint,
592 research_id=research_id,
593 research_context=shared_research_context,
594 )
596 logger.info(
597 f"Successfully set LLM to: provider={model_provider}, model={model}"
598 )
599 except Exception:
600 logger.exception(
601 f"Error setting LLM provider={model_provider}, model={model}"
602 )
604 # Create search engine first if specified, to avoid default creation without username
605 use_search = None
606 if search_engine:  # 606 ↛ 624: condition was always true
607 try:
608 # Create a new search object with these settings
609 use_search = get_search(
610 search_tool=search_engine,
611 llm_instance=use_llm,
612 username=username,
613 settings_snapshot=settings_snapshot,
614 )
615 logger.info(
616 f"Successfully created search engine: {search_engine}"
617 )
618 except Exception:
619 logger.exception(
620 f"Error creating search engine {search_engine}"
621 )
623 # Set the progress callback in the system
624 system = AdvancedSearchSystem(
625 llm=use_llm,
626 search=use_search,
627 strategy_name=strategy,
628 max_iterations=iterations,
629 questions_per_iteration=questions_per_iteration,
630 username=username,
631 settings_snapshot=settings_snapshot,
632 research_id=research_id,
633 research_context=shared_research_context,
634 )
635 system.set_progress_callback(progress_callback)
637 # Run the search
638 progress_callback("Starting research process", 5, {"phase": "init"})
640 try:
641 results = system.analyze_topic(query)
642 if mode == "quick": 642 ↛ 649line 642 didn't jump to line 649 because the condition on line 642 was always true
643 progress_callback(
644 "Search complete, preparing to generate summary...",
645 85,
646 {"phase": "output_generation"},
647 )
648 else:
649 progress_callback(
650 "Search complete, generating output",
651 80,
652 {"phase": "output_generation"},
653 )
654 except Exception as search_error:
655 # Better handling of specific search errors
656 error_message = str(search_error)
657 error_type = "unknown"
659 # Extract error details for common issues
660 if "status code: 503" in error_message:
661 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system."
662 error_type = "ollama_unavailable"
663 elif "status code: 404" in error_message:
664 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model."
665 error_type = "model_not_found"
666 elif "status code:" in error_message:
667 # Extract the status code for other HTTP errors
668 status_code = error_message.split("status code:")[1].strip()
669 error_message = f"API request failed with status code {status_code}. Please check your configuration."
670 error_type = "api_error"
671 elif "connection" in error_message.lower():
672 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible."
673 error_type = "connection_error"
675 # Raise with improved error message
676 raise Exception(f"{error_message} (Error type: {error_type})")
678 # Generate output based on mode
679 if mode == "quick": 679 ↛ 1181line 679 didn't jump to line 1181 because the condition on line 679 was always true
680 # Quick Summary
681 if results.get("findings") or results.get("formatted_findings"):  # 681 ↛ 1176: condition was always true
682 raw_formatted_findings = results["formatted_findings"]
684 # Check if formatted_findings contains an error message
685 if isinstance(  # 685 ↛ 688: condition was never true
686 raw_formatted_findings, str
687 ) and raw_formatted_findings.startswith("Error:"):
688 logger.exception(
689 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..."
690 )
692 # Determine error type for better user feedback
693 error_type = "unknown"
694 error_message = raw_formatted_findings.lower()
696 if (
697 "token limit" in error_message
698 or "context length" in error_message
699 ):
700 error_type = "token_limit"
701 # Log specific error type
702 logger.warning(
703 "Detected token limit error in synthesis"
704 )
706 # Update progress with specific error type
707 progress_callback(
708 "Synthesis hit token limits. Attempting fallback...",
709 87,
710 {
711 "phase": "synthesis_error",
712 "error_type": error_type,
713 },
714 )
715 elif (
716 "timeout" in error_message
717 or "timed out" in error_message
718 ):
719 error_type = "timeout"
720 logger.warning("Detected timeout error in synthesis")
721 progress_callback(
722 "Synthesis timed out. Attempting fallback...",
723 87,
724 {
725 "phase": "synthesis_error",
726 "error_type": error_type,
727 },
728 )
729 elif "rate limit" in error_message:
730 error_type = "rate_limit"
731 logger.warning("Detected rate limit error in synthesis")
732 progress_callback(
733 "LLM rate limit reached. Attempting fallback...",
734 87,
735 {
736 "phase": "synthesis_error",
737 "error_type": error_type,
738 },
739 )
740 elif (
741 "connection" in error_message
742 or "network" in error_message
743 ):
744 error_type = "connection"
745 logger.warning("Detected connection error in synthesis")
746 progress_callback(
747 "Connection issue with LLM. Attempting fallback...",
748 87,
749 {
750 "phase": "synthesis_error",
751 "error_type": error_type,
752 },
753 )
754 elif (
755 "llm error" in error_message
756 or "final answer synthesis fail" in error_message
757 ):
758 error_type = "llm_error"
759 logger.warning(
760 "Detected general LLM error in synthesis"
761 )
762 progress_callback(
763 "LLM error during synthesis. Attempting fallback...",
764 87,
765 {
766 "phase": "synthesis_error",
767 "error_type": error_type,
768 },
769 )
770 else:
771 # Generic error
772 logger.warning("Detected unknown error in synthesis")
773 progress_callback(
774 "Error during synthesis. Attempting fallback...",
775 87,
776 {
777 "phase": "synthesis_error",
778 "error_type": "unknown",
779 },
780 )
782 # Extract synthesized content from findings if available
783 synthesized_content = ""
784 for finding in results.get("findings", []):
785 if finding.get("phase") == "Final synthesis":
786 synthesized_content = finding.get("content", "")
787 break
789 # Use synthesized content as fallback
790 if (
791 synthesized_content
792 and not synthesized_content.startswith("Error:")
793 ):
794 logger.info(
795 "Using existing synthesized content as fallback"
796 )
797 raw_formatted_findings = synthesized_content
799 # Or use current_knowledge as another fallback
800 elif results.get("current_knowledge"):
801 logger.info("Using current_knowledge as fallback")
802 raw_formatted_findings = results["current_knowledge"]
804 # Or combine all finding contents as last resort
805 elif results.get("findings"):
806 logger.info("Combining all findings as fallback")
807 # First try to use any findings that are not errors
808 valid_findings = [
809 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
810 for finding in results.get("findings", [])
811 if finding.get("content")
812 and not finding.get("content", "").startswith(
813 "Error:"
814 )
815 ]
817 if valid_findings:
818 raw_formatted_findings = (
819 "# Research Results (Fallback Mode)\n\n"
820 )
821 raw_formatted_findings += "\n\n".join(
822 valid_findings
823 )
824 raw_formatted_findings += f"\n\n## Error Information\n{error_message}"  # append the saved error text (lowercased copy from line 694), not the string being rebuilt
825 else:
826 # Last resort: use everything including errors
827 raw_formatted_findings = (
828 "# Research Results (Emergency Fallback)\n\n"
829 )
830 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n"
831 raw_formatted_findings += "\n\n".join(
832 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
833 for finding in results.get("findings", [])
834 if finding.get("content")
835 )
837 progress_callback(
838 f"Using fallback synthesis due to {error_type} error",
839 88,
840 {
841 "phase": "synthesis_fallback",
842 "error_type": error_type,
843 },
844 )
846 logger.info(
847 "Found formatted_findings of length: %s",
848 len(str(raw_formatted_findings)),
849 )
851 try:
852 # Check if we have an error in the findings and use enhanced error handling
853 if isinstance(  # 853 ↛ 856: condition was never true
854 raw_formatted_findings, str
855 ) and raw_formatted_findings.startswith("Error:"):
856 logger.info(
857 "Generating enhanced error report using ErrorReportGenerator"
858 )
860 # Get LLM for error explanation if available
861 try:
862 llm = get_llm(
863 research_id=research_id,
864 research_context=shared_research_context,
865 )
866 except Exception:
867 llm = None
868 logger.warning(
869 "Could not get LLM for error explanation"
870 )
872 # Generate comprehensive error report
873 error_generator = ErrorReportGenerator(llm)
874 clean_markdown = error_generator.generate_error_report(
875 error_message=raw_formatted_findings,
876 query=query,
877 partial_results=results,
878 search_iterations=results.get("iterations", 0),
879 research_id=research_id,
880 )
882 logger.info(
883 "Generated enhanced error report with %d characters",
884 len(clean_markdown),
885 )
886 else:
887 # Get the synthesized content from the LLM directly
888 clean_markdown = raw_formatted_findings
890 # Extract all sources from findings to add them to the summary
891 all_links = []
892 for finding in results.get("findings", []):
893 search_results = finding.get("search_results", [])
894 if search_results:
895 try:
896 links = extract_links_from_search_results(
897 search_results
898 )
899 all_links.extend(links)
900 except Exception:
901 logger.exception(
902 "Error processing search results/links"
903 )
905 logger.info(
906 "Successfully converted to clean markdown of length: %s",
907 len(clean_markdown),
908 )
910 # First send a progress update for generating the summary
911 progress_callback(
912 "Generating clean summary from research data...",
913 90,
914 {"phase": "output_generation"},
915 )
917 # Send progress update for saving report
918 progress_callback(
919 "Saving research report to database...",
920 95,
921 {"phase": "report_complete"},
922 )
924 # Format citations in the markdown content
925 formatter = get_citation_formatter()
926 formatted_content = formatter.format_document(
927 clean_markdown
928 )
930 # Prepare complete report content
931 full_report_content = f"""{formatted_content}
933## Research Metrics
934- Search Iterations: {results["iterations"]}
935- Generated at: {datetime.now(UTC).isoformat()}
936"""
938 # Save sources to database
939 from .research_sources_service import ResearchSourcesService
941 sources_service = ResearchSourcesService()
942 if all_links:
943 logger.info(
944 f"Quick summary: Saving {len(all_links)} sources to database"
945 )
946 sources_saved = sources_service.save_research_sources(
947 research_id=research_id,
948 sources=all_links,
949 username=username,
950 )
951 logger.info(
952 f"Quick summary: Saved {sources_saved} sources for research {research_id}"
953 )
955 # Save report using storage abstraction
956 from ...storage import get_report_storage
958 with get_user_db_session(username) as db_session:
959 storage = get_report_storage(session=db_session)
961 # Prepare metadata
962 metadata = {
963 "iterations": results["iterations"],
964 "generated_at": datetime.now(UTC).isoformat(),
965 }
967 # Save report using storage abstraction
968 success = storage.save_report(
969 research_id=research_id,
970 content=full_report_content,
971 metadata=metadata,
972 username=username,
973 )
975 if not success:  # 975 ↛ 976: condition was never true
976 raise Exception("Failed to save research report")
978 logger.info(
979 f"Report saved for research_id: {research_id}"
980 )
982 # Skip export to additional formats - we're storing in database only
984 # Update research status in database
985 completed_at = datetime.now(UTC).isoformat()
987 with get_user_db_session(username) as db_session:
988 research = (
989 db_session.query(ResearchHistory)
990 .filter_by(id=research_id)
991 .first()
992 )
994 # Preserve existing metadata and update with new values
995 logger.info(
996 f"Existing research_meta type: {type(research.research_meta)}"
997 )
999 # Handle both dict and string types for research_meta
1000 if isinstance(research.research_meta, dict):  # 1000 ↛ 1002: condition was always true
1001 metadata = dict(research.research_meta)
1002 elif isinstance(research.research_meta, str):
1003 try:
1004 metadata = json.loads(research.research_meta)
1005 except json.JSONDecodeError:
1006 logger.exception(
1007 f"Failed to parse research_meta as JSON: {research.research_meta}"
1008 )
1009 metadata = {}
1010 else:
1011 metadata = {}
1013 metadata.update(
1014 {
1015 "iterations": results["iterations"],
1016 "generated_at": datetime.now(UTC).isoformat(),
1017 }
1018 )
1020 # Use the helper function for consistent duration calculation
1021 duration_seconds = calculate_duration(
1022 research.created_at, completed_at
1023 )
1025 research.status = "completed"
1026 research.completed_at = completed_at
1027 research.duration_seconds = duration_seconds
1028 # Note: report_content is saved by CachedResearchService
1029 # report_path is not used in encrypted database version
1031 # Generate headline and topics only for news searches
1032 if (  # 1032 ↛ 1036: condition was never true
1033 metadata.get("is_news_search")
1034 or metadata.get("search_type") == "news_analysis"
1035 ):
1036 try:
1037 from ...news.utils.headline_generator import (
1038 generate_headline,
1039 )
1040 from ...news.utils.topic_generator import (
1041 generate_topics,
1042 )
1044 # Get the report content from database for better headline/topic generation
1045 report_content = ""
1046 try:
1047 research = (
1048 db_session.query(ResearchHistory)
1049 .filter_by(id=research_id)
1050 .first()
1051 )
1052 if research and research.report_content:
1053 report_content = research.report_content
1054 logger.info(
1055 f"Retrieved {len(report_content)} chars from database for headline generation"
1056 )
1057 else:
1058 logger.warning(
1059 f"No report content found in database for research_id: {research_id}"
1060 )
1061 except Exception as e:
1062 logger.warning(
1063 f"Could not retrieve report content from database: {e}"
1064 )
1066 # Generate headline
1067 logger.info(
1068 f"Generating headline for query: {query[:100]}"
1069 )
1070 headline = generate_headline(
1071 query, report_content
1072 )
1073 metadata["generated_headline"] = headline
1075 # Generate topics
1076 logger.info(
1077 f"Generating topics with category: {metadata.get('category', 'News')}"
1078 )
1079 topics = generate_topics(
1080 query=query,
1081 findings=report_content,
1082 category=metadata.get("category", "News"),
1083 max_topics=6,
1084 )
1085 metadata["generated_topics"] = topics
1087 logger.info(f"Generated headline: {headline}")
1088 logger.info(f"Generated topics: {topics}")
1090 except Exception as e:
1091 logger.warning(
1092 f"Could not generate headline/topics: {e}"
1093 )
1095 research.research_meta = metadata
1097 db_session.commit()
1098 logger.info(
1099 f"Database commit completed for research_id: {research_id}"
1100 )
1102 # Update subscription if this was triggered by a subscription
1103 if metadata.get("subscription_id"): 1103 ↛ 1104line 1103 didn't jump to line 1104 because the condition on line 1103 was never true
1104 try:
1105 from ...news.subscription_manager.storage import (
1106 SQLSubscriptionStorage,
1107 )
1108 from datetime import (
1109 datetime as dt,
1110 timezone,
1111 timedelta,
1112 )
1114 sub_storage = SQLSubscriptionStorage()
1115 subscription_id = metadata["subscription_id"]
1117 # Get subscription to find refresh interval
1118 subscription = sub_storage.get(subscription_id)
1119 if subscription:
1120 refresh_minutes = subscription.get(
1121 "refresh_minutes", 240
1122 )
1123 now = dt.now(timezone.utc)
1124 next_refresh = now + timedelta(
1125 minutes=refresh_minutes
1126 )
1128 # Update refresh times
1129 sub_storage.update_refresh_time(
1130 subscription_id=subscription_id,
1131 last_refresh=now,
1132 next_refresh=next_refresh,
1133 )
1135 # Increment stats
1136 sub_storage.increment_stats(
1137 subscription_id, 1
1138 )
1140 logger.info(
1141 f"Updated subscription {subscription_id} refresh times"
1142 )
1143 except Exception as e:
1144 logger.warning(
1145 f"Could not update subscription refresh time: {e}"
1146 )
1148 logger.info(
1149 f"Database updated successfully for research_id: {research_id}"
1150 )
1152 # Send the final completion message
1153 progress_callback(
1154 "Research completed successfully",
1155 100,
1156 {"phase": "complete"},
1157 )
1159 # Clean up resources
1160 logger.info(
1161 "Cleaning up resources for research_id: %s", research_id
1162 )
1163 cleanup_research_resources(
1164 research_id, active_research, termination_flags
1165 )
1166 logger.info(
1167 "Resources cleaned up for research_id: %s", research_id
1168 )
1170 except Exception as inner_e:
1171 logger.exception("Error during quick summary generation")
1172 raise Exception(
1173 f"Error generating quick summary: {inner_e!s}"
1174 )
1175 else:
1176 raise Exception(
1177 "No research findings were generated. Please try again."
1178 )
1179 else:
1180 # Full Report
1181 progress_callback(
1182 "Generating detailed report...",
1183 85,
1184 {"phase": "report_generation"},
1185 )
1187 # Extract the search system from the results if available
1188 search_system = results.get("search_system", None)
1190 # Pass the existing search system to maintain citation indices
1191 report_generator = IntegratedReportGenerator(
1192 search_system=search_system
1193 )
1194 final_report = report_generator.generate_report(results, query)
1196 progress_callback(
1197 "Report generation complete", 95, {"phase": "report_complete"}
1198 )
1200 # Format citations in the report content
1201 formatter = get_citation_formatter()
1202 formatted_content = formatter.format_document(
1203 final_report["content"]
1204 )
1206 # Save sources to database
1207 from .research_sources_service import ResearchSourcesService
1209 sources_service = ResearchSourcesService()
1210 if (
1211 hasattr(search_system, "all_links_of_system")
1212 and search_system.all_links_of_system
1213 ):
1214 logger.info(
1215 f"Saving {len(search_system.all_links_of_system)} sources to database"
1216 )
1217 sources_saved = sources_service.save_research_sources(
1218 research_id=research_id,
1219 sources=search_system.all_links_of_system,
1220 username=username,
1221 )
1222 logger.info(
1223 f"Saved {sources_saved} sources for research {research_id}"
1224 )
1226 # Save report to database
1227 with get_user_db_session(username) as db_session:
1228 # Update metadata
1229 metadata = final_report["metadata"]
1230 metadata["iterations"] = results["iterations"]
1232 # Save report to database
1233 try:
1234 research = (
1235 db_session.query(ResearchHistory)
1236 .filter_by(id=research_id)
1237 .first()
1238 )
1240 if not research:
1241 logger.error(f"Research {research_id} not found")
1242 success = False
1243 else:
1244 research.report_content = formatted_content
1245 if research.research_meta:
1246 research.research_meta.update(metadata)
1247 else:
1248 research.research_meta = metadata
1249 db_session.commit()
1250 success = True
1251 logger.info(
1252 f"Saved report for research {research_id} to database"
1253 )
1254 except Exception:
1255 logger.exception("Error saving report to database")
1256 db_session.rollback()
1257 success = False
1259 if not success:
1260 raise Exception("Failed to save research report")
1262 logger.info(
1263 f"Report saved to database for research_id: {research_id}"
1264 )
1266 # Update research status in database
1267 completed_at = datetime.now(UTC).isoformat()
1269 with get_user_db_session(username) as db_session:
1270 research = (
1271 db_session.query(ResearchHistory)
1272 .filter_by(id=research_id)
1273 .first()
1274 )
1276 # Preserve existing metadata and merge with report metadata
1277 logger.info(
1278 f"Full report - Existing research_meta type: {type(research.research_meta)}"
1279 )
1281 # Handle both dict and string types for research_meta
1282 if isinstance(research.research_meta, dict):
1283 metadata = dict(research.research_meta)
1284 elif isinstance(research.research_meta, str):
1285 try:
1286 metadata = json.loads(research.research_meta)
1287 except json.JSONDecodeError:
1288 logger.exception(
1289 f"Failed to parse research_meta as JSON: {research.research_meta}"
1290 )
1291 metadata = {}
1292 else:
1293 metadata = {}
1295 metadata.update(final_report["metadata"])
1296 metadata["iterations"] = results["iterations"]
1298 # Use the helper function for consistent duration calculation
1299 duration_seconds = calculate_duration(
1300 research.created_at, completed_at
1301 )
1303 research.status = "completed"
1304 research.completed_at = completed_at
1305 research.duration_seconds = duration_seconds
1306 # Note: report_content is saved by CachedResearchService
1307 # report_path is not used in encrypted database version
1309 # Generate headline and topics only for news searches
1310 if (
1311 metadata.get("is_news_search")
1312 or metadata.get("search_type") == "news_analysis"
1313 ):
1314 try:
1315 from ...news.utils.headline_generator import (  # three-dot depth, matching the quick-summary branch above
1316 generate_headline,
1317 )
1318 from ...news.utils.topic_generator import (
1319 generate_topics,
1320 )
1322 # Get the report content from database for better headline/topic generation
1323 report_content = ""
1324 try:
1325 research = (
1326 db_session.query(ResearchHistory)
1327 .filter_by(id=research_id)
1328 .first()
1329 )
1330 if research and research.report_content:
1331 report_content = research.report_content
1332 else:
1333 logger.warning(
1334 f"No report content found in database for research_id: {research_id}"
1335 )
1336 except Exception as e:
1337 logger.warning(
1338 f"Could not retrieve report content from database: {e}"
1339 )
1341 # Generate headline
1342 headline = generate_headline(query, report_content)
1343 metadata["generated_headline"] = headline
1345 # Generate topics
1346 topics = generate_topics(
1347 query=query,
1348 findings=report_content,
1349 category=metadata.get("category", "News"),
1350 max_topics=6,
1351 )
1352 metadata["generated_topics"] = topics
1354 logger.info(f"Generated headline: {headline}")
1355 logger.info(f"Generated topics: {topics}")
1357 except Exception as e:
1358 logger.warning(
1359 f"Could not generate headline/topics: {e}"
1360 )
1362 research.research_meta = metadata
1364 db_session.commit()
1366 # Update subscription if this was triggered by a subscription
1367 if metadata.get("subscription_id"):
1368 try:
1369 from ...news.subscription_manager.storage import (
1370 SQLSubscriptionStorage,
1371 )
1372 from datetime import datetime as dt, timezone, timedelta
1374 sub_storage = SQLSubscriptionStorage()
1375 subscription_id = metadata["subscription_id"]
1377 # Get subscription to find refresh interval
1378 subscription = sub_storage.get(subscription_id)
1379 if subscription:
1380 refresh_minutes = subscription.get(
1381 "refresh_minutes", 240
1382 )
1383 now = dt.now(timezone.utc)
1384 next_refresh = now + timedelta(
1385 minutes=refresh_minutes
1386 )
1388 # Update refresh times
1389 sub_storage.update_refresh_time(
1390 subscription_id=subscription_id,
1391 last_refresh=now,
1392 next_refresh=next_refresh,
1393 )
1395 # Increment stats
1396 sub_storage.increment_stats(subscription_id, 1)
1398 logger.info(
1399 f"Updated subscription {subscription_id} refresh times"
1400 )
1401 except Exception as e:
1402 logger.warning(
1403 f"Could not update subscription refresh time: {e}"
1404 )
1406 progress_callback(
1407 "Research completed successfully",
1408 100,
1409 {"phase": "complete"},
1410 )
1412 # Clean up resources
1413 cleanup_research_resources(
1414 research_id, active_research, termination_flags, username
1415 )
1417 except Exception as e:
1418 # Handle error
1419 error_message = f"Research failed: {e!s}"
1420 logger.exception(error_message)
1422 try:
1423 # Check for common Ollama error patterns in the exception and provide more user-friendly errors
1424 user_friendly_error = str(e)
1425 error_context = {}
1427 if "Error type: ollama_unavailable" in user_friendly_error: 1427 ↛ 1428line 1427 didn't jump to line 1428 because the condition on line 1427 was never true
1428 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system."
1429 error_context = {
1430 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly."
1431 }
1432 elif "Error type: model_not_found" in user_friendly_error: 1432 ↛ 1433line 1432 didn't jump to line 1433 because the condition on line 1432 was never true
1433 user_friendly_error = "Required Ollama model not found. Please pull the model first."
1434 error_context = {
1435 "solution": "Run 'ollama pull mistral' to download the required model."
1436 }
1437 elif "Error type: connection_error" in user_friendly_error: 1437 ↛ 1438line 1437 didn't jump to line 1438 because the condition on line 1437 was never true
1438 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running."
1439 error_context = {
1440 "solution": "Ensure Ollama or your API service is running and accessible."
1441 }
1442 elif "Error type: api_error" in user_friendly_error: 1442 ↛ 1444line 1442 didn't jump to line 1444 because the condition on line 1442 was never true
1443 # Keep the original error message as it's already improved
1444 error_context = {
1445 "solution": "Check API configuration and credentials."
1446 }
1448 # Generate enhanced error report for failed research
1449 enhanced_report_content = None
1450 try:
1451 # Get LLM for error explanation if available
1452 try:
1453 llm = get_llm(
1454 research_id=research_id,
1455 research_context=shared_research_context,
1456 )
1457 except Exception:
1458 llm = None
1459 logger.warning(
1460 "Could not get LLM for error explanation in failure handler"
1461 )
1463 # Get partial results if they exist
1464 partial_results = results if "results" in locals() else None
1465 search_iterations = (
1466 results.get("iterations", 0) if partial_results else 0
1467 )
1469 # Generate comprehensive error report
1470 error_generator = ErrorReportGenerator(llm)
1471 enhanced_report_content = error_generator.generate_error_report(
1472 error_message=f"Research failed: {e!s}",
1473 query=query,
1474 partial_results=partial_results,
1475 search_iterations=search_iterations,
1476 research_id=research_id,
1477 )
1479 logger.info(
1480 "Generated enhanced error report for failed research (length: %d)",
1481 len(enhanced_report_content),
1482 )
1484 # Save enhanced error report to encrypted database
1485 try:
1486 # Get username from the research context
1487 username = getattr(g, "username", None) or session.get(
1488 "username"
1489 )
1490 if username:  # 1490 ↛ 1512: condition was always true
1491 from ...storage import get_report_storage
1493 with get_user_db_session(username) as db_session:
1494 storage = get_report_storage(session=db_session)
1495 success = storage.save_report(
1496 research_id=research_id,
1497 content=enhanced_report_content,
1498 metadata={"error_report": True},
1499 username=username,
1500 )
1501 if success:  # 1501 ↛ 1507: condition was always true
1502 logger.info(
1503 "Saved enhanced error report to encrypted database for research %s",
1504 research_id,
1505 )
1506 else:
1507 logger.warning(
1508 "Failed to save enhanced error report to database for research %s",
1509 research_id,
1510 )
1511 else:
1512 logger.warning(
1513 "Cannot save error report: username not available"
1514 )
1516 except Exception as report_error:
1517 logger.exception(
1518 "Failed to save enhanced error report: %s", report_error
1519 )
1521 except Exception as error_gen_error:
1522 logger.exception(
1523 "Failed to generate enhanced error report: %s",
1524 error_gen_error,
1525 )
1526 enhanced_report_content = None
1528 # Get existing metadata from database first
1529 existing_metadata = {}
1530 try:
1531 # Get username from the research context
1532 username = getattr(g, "username", None) or session.get(
1533 "username"
1534 )
1535 if username:  # 1535 ↛ 1548: condition was always true
1536 with get_user_db_session(username) as db_session:
1537 research = (
1538 db_session.query(ResearchHistory)
1539 .filter_by(id=research_id)
1540 .first()
1541 )
1542 if research and research.research_meta:  # 1542 ↛ 1548: branch not taken
1543 existing_metadata = dict(research.research_meta)
1544 except Exception:
1545 logger.exception("Failed to get existing metadata")
1547 # Update metadata with more context about the error while preserving existing values
1548 metadata = existing_metadata
1549 metadata.update({"phase": "error", "error": user_friendly_error})
1550 if error_context:  # 1550 ↛ 1551: condition was never true
1551 metadata.update(error_context)
1552 if enhanced_report_content:  # 1552 ↛ 1556: condition was always true
1553 metadata["has_enhanced_report"] = True
1555 # If we still have an active research record, update its log
1556 if research_id in active_research:
1557 progress_callback(user_friendly_error, None, metadata)
1559 # If termination was requested, mark as suspended instead of failed
1560 status = (
1561 "suspended"
1562 if (termination_flags.get(research_id))
1563 else "failed"
1564 )
1565 message = (
1566 "Research was terminated by user"
1567 if status == "suspended"
1568 else user_friendly_error
1569 )
1571 # Calculate duration up to termination point - using UTC consistently
1572 now = datetime.now(UTC)
1573 completed_at = now.isoformat()
1575 # NOTE: Database updates from threads are handled by queue processor
1576 # The queue_processor.queue_error_update() method is already being used below
1577 # to safely update the database from the main thread
1579 # Queue the error update to be processed in main thread
1580 # Using the queue processor v2 system
1581 from ..queue.processor_v2 import queue_processor
1583 if username:  # 1583 ↛ 1597: condition was always true
1584 queue_processor.queue_error_update(
1585 username=username,
1586 research_id=research_id,
1587 status=status,
1588 error_message=message,
1589 metadata=metadata,
1590 completed_at=completed_at,
1591 report_path=None,
1592 )
1593 logger.info(
1594 f"Queued error update for research {research_id} with status '{status}'"
1595 )
1596 else:
1597 logger.error(
1598 f"Cannot queue error update for research {research_id} - no username provided. "
1599 f"Status: '{status}', Message: {message}"
1600 )
1602 try:
1603 SocketIOService().emit_to_subscribers(
1604 "research_progress",
1605 research_id,
1606 {"status": status, "error": message},
1607 )
1608 except Exception:
1609 logger.exception("Failed to emit error via socket")
1611 except Exception:
1612 logger.exception("Error in error handler")
1614 # Clean up resources
1615 cleanup_research_resources(
1616 research_id, active_research, termination_flags, username
1617 )
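# Editor's sketch (assumption, not part of the measured source): minimal kwargs
# for run_research_process; every key shown is read via kwargs.get(...) above,
# but all values are illustrative.
def _example_run_research_kwargs_sketch():
    return dict(
        username="alice",  # required; the thread raises without it
        model_provider="ollama",
        model="mistral",
        search_engine="searxng",
        iterations=2,
        questions_per_iteration=3,
        strategy="source-based",
        settings_snapshot={
            "report.citation_format": {"value": "number_hyperlinks"},
        },
    )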
1620def cleanup_research_resources(
1621 research_id, active_research, termination_flags, username=None
1622):
1623 """
1624 Clean up resources for a completed research.
1626 Args:
1627 research_id: The ID of the research
1628 active_research: Dictionary of active research processes
1629 termination_flags: Dictionary of termination flags
1630 username: The username for database access (required for thread context)
1631 """
1632 logger.info("Cleaning up resources for research %s", research_id)
1634 # For testing: Add a small delay to simulate research taking time
1635 # This helps test concurrent research limits
1636 from ...settings.env_registry import is_test_mode
1638 if is_test_mode():  # 1638 ↛ 1639: condition was never true
1639 import time
1641 logger.info(
1642 f"Test mode: Adding 5 second delay before cleanup for {research_id}"
1643 )
1644 time.sleep(5)
1646 # Status used for the final socket message; the queue processor below owns the authoritative database update
1647 current_status = "completed" # Default
1649 # NOTE: Queue processor already handles database updates from the main thread
1650 # The notify_research_completed() method is called at the end of this function
1651 # which safely updates the database status
1653 # Notify queue processor that research completed
1654 # This uses processor_v2 which handles database updates in the main thread
1655 # avoiding the Flask request context issues that occur in background threads
1656 from ..queue.processor_v2 import queue_processor
1658 if username:
1659 queue_processor.notify_research_completed(username, research_id)
1660 logger.info(
1661 f"Notified queue processor of completion for research {research_id} (user: {username})"
1662 )
1663 else:
1664 logger.warning(
1665 f"Cannot notify completion for research {research_id} - no username provided"
1666 )
1668 # Remove from active research
1669 if research_id in active_research:
1670 del active_research[research_id]
1672 # Remove from termination flags
1673 if research_id in termination_flags:
1674 del termination_flags[research_id]
1676 # Send a final message to subscribers
1677 try:
1678 # Import here to avoid circular imports
1679 from ..routes.globals import get_globals
1681 globals_dict = get_globals()
1682 socket_subscriptions = globals_dict.get("socket_subscriptions", {})
1684 # Send a final message to any remaining subscribers with explicit status
1685 if socket_subscriptions.get(research_id):  # 1685 ↛ 1687: condition was never true
1686 # Use the proper status message based on database status
1687 if current_status == "suspended" or current_status == "failed":
1688 final_message = {
1689 "status": current_status,
1690 "message": f"Research was {current_status}",
1691 "progress": 0, # For suspended research, show 0% not 100%
1692 }
1693 else:
1694 final_message = {
1695 "status": "completed",
1696 "message": "Research process has ended and resources have been cleaned up",
1697 "progress": 100,
1698 }
1700 logger.info(
1701 "Sending final %s socket message for research %s",
1702 current_status,
1703 research_id,
1704 )
1706 SocketIOService().emit_to_subscribers(
1707 "research_progress", research_id, final_message
1708 )
1710 except Exception:
1711 logger.exception("Error sending final cleanup message")
1714def handle_termination(
1715 research_id, active_research, termination_flags, username=None
1716):
1717 """
1718 Handle the termination of a research process.
1720 Args:
1721 research_id: The ID of the research
1722 active_research: Dictionary of active research processes
1723 termination_flags: Dictionary of termination flags
1724 username: The username for database access (required for thread context)
1725 """
1726 logger.info(f"Handling termination for research {research_id}")
1728 # Queue the status update to be processed in the main thread
1729 # This avoids Flask request context errors in background threads
1730 try:
1731 from ..queue.processor_v2 import queue_processor
1733 now = datetime.now(UTC)
1734 completed_at = now.isoformat()
1736 # Queue the suspension update
1737 queue_processor.queue_error_update(
1738 username=username,
1739 research_id=research_id,
1740 status="suspended",
1741 error_message="Research was terminated by user",
1742 metadata={"terminated_at": completed_at},
1743 completed_at=completed_at,
1744 report_path=None,
1745 )
1747 logger.info(f"Queued suspension update for research {research_id}")
1748 except Exception:
1749 logger.exception(
1750 f"Error queueing termination update for research {research_id}"
1751 )
1753 # Clean up resources (this already handles things properly)
1754 cleanup_research_resources(
1755 research_id, active_research, termination_flags, username
1756 )
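# Editor's sketch (assumption, not part of the measured source): cooperative
# cancellation. Nothing kills the worker thread; once the flag is set, the next
# progress_callback invocation raises and cleanup runs.
def _example_termination_flow_sketch(active_research, termination_flags):
    termination_flags["res-123"] = True  # hypothetical id
    handle_termination(
        "res-123", active_research, termination_flags, username="alice"
    )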
1759def cancel_research(research_id, username=None):
1760 """
1761 Cancel/terminate a research process using ORM.
1763 Args:
1764 research_id: The ID of the research to cancel
1765 username: The username of the user cancelling the research (optional, will try to get from session if not provided)
1767 Returns:
1768 bool: True if the research was found and cancelled, False otherwise
1769 """
1770 try:
1771 # Import globals from research routes
1772 from ..routes.globals import get_globals
1774 globals_dict = get_globals()
1775 active_research = globals_dict["active_research"]
1776 termination_flags = globals_dict["termination_flags"]
1778 # Set termination flag
1779 termination_flags[research_id] = True
1781 # Check if the research is active
1782 if research_id in active_research:  # 1782 ↛ 1791: condition was always true
1783 # Call handle_termination to update database
1784 handle_termination(
1785 research_id, active_research, termination_flags, username
1786 )
1787 return True
1788 else:
1789 # Update database directly if not found in active_research
1790 # Get username from parameter or session
1791 if not username:
1792 from flask import session
1794 username = session.get("username")
1796 if not username:
1797 logger.warning(
1798 f"No username available for cancelling research {research_id}"
1799 )
1800 return False
1802 try:
1803 with get_user_db_session(username) as db_session:
1804 research = (
1805 db_session.query(ResearchHistory)
1806 .filter_by(id=research_id)
1807 .first()
1808 )
1809 if not research:
1810 logger.info(
1811 f"Research {research_id} not found in database"
1812 )
1813 return False
1815 # Check if already completed or suspended
1816 if research.status in ["completed", "suspended", "error"]:
1817 logger.info(
1818 f"Research {research_id} already in terminal state: {research.status}"
1819 )
1820 return True # Consider this a success since it's already stopped
1822 # If it exists but isn't in active_research, still update status
1823 research.status = "suspended"
1824 db_session.commit()
1825 logger.info(
1826 f"Successfully suspended research {research_id}"
1827 )
1828 except Exception as e:
1829 logger.exception(
1830 f"Error accessing database for research {research_id}: {e}"
1831 )
1832 return False
1834 return True
1835 except Exception as e:
1836 logger.exception(
1837 f"Unexpected error in cancel_research for {research_id}: {e}"
1838 )
1839 return False
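# Editor's sketch (assumption, not part of the measured source): route-level
# usage. cancel_research returns True both when it suspends a live run and when
# the record is already terminal ("completed", "suspended", "error").
def _example_cancel_sketch():
    if cancel_research("res-123", username="alice"):  # hypothetical values
        logger.info("Research res-123 cancelled or already stopped")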