Coverage for src / local_deep_research / web / services / research_service.py: 86%
669 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import hashlib
2import json
3import threading
4import time
5from datetime import datetime, UTC
6from pathlib import Path
8from loguru import logger
10from ...exceptions import ResearchTerminatedException
11from ...config.llm_config import get_llm
12from ...settings.manager import SnapshotSettingsContext
14# Output directory for research results
15from ...config.paths import get_research_outputs_directory
16from ...config.search_config import get_search
17from ...constants import ResearchStatus
18from ...database.models import ResearchHistory, ResearchStrategy
19from ...database.session_context import get_user_db_session
20from ...database.thread_local_session import thread_cleanup
21from ...error_handling.report_generator import ErrorReportGenerator
22from ...utilities.thread_context import set_search_context
23from ...report_generator import IntegratedReportGenerator
24from ...search_system import AdvancedSearchSystem
25from ...text_optimization import CitationFormatter, CitationMode
26from ...utilities.log_utils import log_for_research
27from ...utilities.search_utilities import extract_links_from_search_results
28from ...utilities.threading_utils import thread_context, thread_with_app_context
29from ..models.database import calculate_duration
30from ...settings.env_registry import get_env_setting
31from .socket_service import SocketIOService
# Directory where generated research report files are written.
OUTPUT_DIR = get_research_outputs_directory()

# Global concurrent research limit (server-wide, across all users)
_MAX_GLOBAL_CONCURRENT = get_env_setting(
    "server.max_concurrent_research", default=10
)
# Semaphore gating how many research threads may run simultaneously.
_global_research_semaphore = threading.Semaphore(_MAX_GLOBAL_CONCURRENT)

# Socket.IO emission throttling: minimum interval between progress emissions per research
_EMIT_THROTTLE_SECONDS = 0.2  # 200ms
_EMIT_TTL_SECONDS = 3600  # 1 hour — evict stale entries from orphaned research
# Counter of throttle checks; every Nth check triggers TTL cleanup of _last_emit_times.
_emit_cleanup_counter = 0
# Maps research_id -> monotonic time of that research's last socket emission.
_last_emit_times: dict[str, float] = {}
# Guards _last_emit_times and _emit_cleanup_counter across research threads.
_last_emit_lock = threading.Lock()
50def _parse_research_metadata(research_meta) -> dict:
51 """Parse research_meta into a dict, handling both dict and JSON string types."""
52 if isinstance(research_meta, dict):
53 return dict(research_meta)
54 if isinstance(research_meta, str):
55 try:
56 parsed = json.loads(research_meta)
57 return dict(parsed) if isinstance(parsed, dict) else {}
58 except json.JSONDecodeError:
59 logger.exception("Failed to parse research_meta as JSON")
60 return {}
61 return {}
def get_citation_formatter():
    """Build a CitationFormatter configured from the thread's settings snapshot.

    Reads ``report.citation_format`` from the snapshot and maps it to a
    CitationMode, defaulting to NUMBER_HYPERLINKS for unknown values.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ...config.search_config import get_setting_from_snapshot

    configured = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    modes = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    selected = modes.get(configured, CitationMode.NUMBER_HYPERLINKS)
    return CitationFormatter(mode=selected)
def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
):
    """
    Export a markdown report to different formats in memory.

    Uses the modular exporter registry to support multiple formats.
    Available formats can be queried with ExporterRegistry.get_available_formats().

    Args:
        markdown_content: The markdown content to export
        format: Export format (e.g., 'pdf', 'odt', 'latex', 'quarto', 'ris')
        title: Optional title for the document

    Returns:
        Tuple of (content_bytes, filename, mimetype)

    Raises:
        ValueError: If no exporter is registered for the requested format.
    """
    from ...exporters import ExporterRegistry, ExportOptions

    # Format lookup is case-insensitive.
    fmt = format.lower()

    exporter = ExporterRegistry.get_exporter(fmt)
    if exporter is None:
        available = ExporterRegistry.get_available_formats()
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {', '.join(available)}"
        )

    # Title prepending is now handled by each exporter via _prepend_title_if_needed()
    # PDF and ODT exporters prepend titles; RIS and other formats ignore them
    result = exporter.export(markdown_content, ExportOptions(title=title))

    logger.info(
        f"Generated {fmt} in memory, size: {len(result.content)} bytes"
    )

    return result.content, result.filename, result.mimetype
def save_research_strategy(research_id, strategy_name, username=None):
    """
    Save the strategy used for a research to the database.

    Creates a ResearchStrategy row for the research if none exists,
    otherwise updates the existing row in place. Failures are logged
    and swallowed (best-effort persistence).

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            # Upsert: one strategy record per research_id.
            record = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )

            if record is None:
                session.add(
                    ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                )
                logger.debug(
                    f"Creating new strategy record for research {research_id}"
                )
            else:
                record.strategy_name = strategy_name
                logger.debug(
                    f"Updating existing strategy for research {research_id}"
                )

            session.commit()
            logger.info(
                f"Saved strategy '{strategy_name}' for research {research_id}"
            )
    except Exception:
        logger.exception("Error saving research strategy")
def get_research_strategy(research_id, username=None):
    """
    Get the strategy used for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name or None if not found (or on database error)
    """
    try:
        with get_user_db_session(username) as session:
            record = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )
            if record is None:
                return None
            return record.strategy_name
    except Exception:
        logger.exception("Error getting research strategy")
        return None
def start_research_process(
    research_id,
    query,
    mode,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    The callback is wrapped twice: once to carry the Flask app context into
    the thread, and once to respect the server-wide concurrency semaphore.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research
    """
    from ..routes.globals import set_active_research

    # Bind the app context so the callback can run outside the request thread.
    wrapped_callback = thread_with_app_context(run_research_callback)

    def _limited(*cb_args, **cb_kwargs):
        # Block until a slot is free under the global concurrency cap.
        with _global_research_semaphore:
            return wrapped_callback(*cb_args, **cb_kwargs)

    # Launch the research in a daemon thread so it never blocks shutdown.
    thread = threading.Thread(
        target=_limited,
        args=(thread_context(), research_id, query, mode),
        kwargs=kwargs,
        daemon=True,
    )
    thread.start()

    set_active_research(
        research_id,
        {
            "thread": thread,
            "progress": 0,
            "status": ResearchStatus.IN_PROGRESS,
            "log": [],
            "settings": kwargs,  # Store settings for reference
        },
    )

    return thread
def _generate_report_path(query: str) -> Path:
    """
    Generates a path for a new report file based on the query.

    Args:
        query: The query used for the report.

    Returns:
        The path that it generated.

    """
    # MD5 here only builds a short, filesystem-safe slug from the query;
    # it is explicitly marked as non-security usage.
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    timestamp = int(datetime.now(UTC).timestamp())
    return OUTPUT_DIR / f"research_report_{digest[:10]}_{timestamp}.md"
285@log_for_research
286@thread_cleanup
287def run_research_process(research_id, query, mode, **kwargs):
288 """
289 Run the research process in the background for a given research ID.
291 Args:
292 research_id: The ID of the research
293 query: The research query
294 mode: The research mode (quick/detailed)
295 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.)
296 MUST include 'username' for database access
297 """
298 from ..routes.globals import (
299 is_research_active,
300 is_termination_requested,
301 update_progress_and_check_active,
302 )
304 # Extract username - required for database access
305 username = kwargs.get("username")
306 logger.info(f"Research thread started with username: {username}")
307 if not username:
308 logger.error("No username provided to research thread")
309 raise ValueError("Username is required for research process")
310 # Extract user_password early so it's available for all cleanup paths
311 user_password = kwargs.get("user_password")
313 try:
314 # Check if this research has been terminated before we even start
315 if is_termination_requested(research_id):
316 logger.info(
317 f"Research {research_id} was terminated before starting"
318 )
319 cleanup_research_resources(
320 research_id, username, user_password=user_password
321 )
322 return
324 logger.info(
325 f"Starting research process for ID {research_id}, query: {query}"
326 )
328 # Extract key parameters
329 model_provider = kwargs.get("model_provider")
330 model = kwargs.get("model")
331 custom_endpoint = kwargs.get("custom_endpoint")
332 search_engine = kwargs.get("search_engine")
333 max_results = kwargs.get("max_results")
334 time_period = kwargs.get("time_period")
335 iterations = kwargs.get("iterations")
336 questions_per_iteration = kwargs.get("questions_per_iteration")
337 strategy = kwargs.get(
338 "strategy", "source-based"
339 ) # Default to source-based
340 settings_snapshot = kwargs.get(
341 "settings_snapshot", {}
342 ) # Complete settings snapshot
344 # Log settings snapshot to debug
345 from ...settings.logger import log_settings
347 log_settings(settings_snapshot, "Settings snapshot received in thread")
349 # Strategy should already be saved in the database before thread starts
350 logger.info(f"Research strategy: {strategy}")
352 # Log all parameters for debugging
353 logger.info(
354 f"Research parameters: provider={model_provider}, model={model}, "
355 f"search_engine={search_engine}, max_results={max_results}, "
356 f"time_period={time_period}, iterations={iterations}, "
357 f"questions_per_iteration={questions_per_iteration}, "
358 f"custom_endpoint={custom_endpoint}, strategy={strategy}"
359 )
361 # Set up the AI Context Manager
362 output_dir = OUTPUT_DIR / f"research_{research_id}"
363 output_dir.mkdir(parents=True, exist_ok=True)
365 # Create a settings context that uses snapshot - no database access in threads
366 settings_context = SnapshotSettingsContext(
367 settings_snapshot, username=username
368 )
370 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var
371 from ...settings.logger import log_settings
373 log_settings(
374 settings_context.values, "SettingsContext values extracted"
375 )
377 # Set the settings context for this thread
378 from ...config.thread_settings import (
379 set_settings_context,
380 )
382 set_settings_context(settings_context)
384 # user_password already extracted above (before termination check)
386 # Create shared research context that can be updated during research
387 shared_research_context = {
388 "research_id": research_id,
389 "research_query": query,
390 "research_mode": mode,
391 "research_phase": "init",
392 "search_iteration": 0,
393 "search_engines_planned": None,
394 "search_engine_selected": search_engine,
395 "username": username, # Add username for queue operations
396 "user_password": user_password, # Add password for metrics access
397 }
399 # If this is a follow-up research, include the parent context
400 if "research_context" in kwargs and kwargs["research_context"]:
401 logger.info(
402 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings"
403 )
404 shared_research_context.update(kwargs["research_context"])
406 # Do not log context keys as they may contain sensitive information
407 logger.info(f"Created shared_research_context for user: {username}")
409 # Set search context for search tracking
410 set_search_context(shared_research_context)
412 # Set up progress callback
413 def progress_callback(message, progress_percent, metadata):
414 # Frequent termination check
415 if is_termination_requested(research_id):
416 handle_termination(research_id, username)
417 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try
418 "Research was terminated by user"
419 )
421 # Silent phase — no UI logging or socket emission needed
422 if metadata.get("phase") == "termination_check":
423 return
425 # Bind research_id to logger for this specific log
426 bound_logger = logger.bind(research_id=research_id)
427 bound_logger.log("MILESTONE", message)
429 if "SEARCH_PLAN:" in message: 429 ↛ 430line 429 didn't jump to line 430 because the condition on line 429 was never true
430 engines = message.split("SEARCH_PLAN:")[1].strip()
431 metadata["planned_engines"] = engines
432 metadata["phase"] = "search_planning" # Use existing phase
433 # Update shared context for token tracking
434 shared_research_context["search_engines_planned"] = engines
435 shared_research_context["research_phase"] = "search_planning"
437 if "ENGINE_SELECTED:" in message: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 engine = message.split("ENGINE_SELECTED:")[1].strip()
439 metadata["selected_engine"] = engine
440 metadata["phase"] = "search" # Use existing 'search' phase
441 # Update shared context for token tracking
442 shared_research_context["search_engine_selected"] = engine
443 shared_research_context["research_phase"] = "search"
445 # Capture other research phases for better context tracking
446 if metadata.get("phase"): 446 ↛ 450line 446 didn't jump to line 450 because the condition on line 446 was always true
447 shared_research_context["research_phase"] = metadata["phase"]
449 # Update search iteration if available
450 if "iteration" in metadata:
451 shared_research_context["search_iteration"] = metadata[
452 "iteration"
453 ]
455 # Adjust progress based on research mode
456 adjusted_progress = progress_percent
457 if (
458 mode == "detailed"
459 and metadata.get("phase") == "output_generation"
460 ):
461 # For detailed mode, adjust the progress range for output generation
462 adjusted_progress = min(80, progress_percent)
463 elif (
464 mode == "detailed"
465 and metadata.get("phase") == "report_generation"
466 ):
467 # Scale the progress from 80% to 95% for the report generation phase
468 if progress_percent is not None: 468 ↛ 482line 468 didn't jump to line 482 because the condition on line 468 was always true
469 normalized = progress_percent / 100
470 adjusted_progress = 80 + (normalized * 15)
471 elif (
472 mode == "quick" and metadata.get("phase") == "output_generation"
473 ):
474 # For quick mode, ensure we're at least at 85% during output generation
475 adjusted_progress = max(85, progress_percent)
476 # Map any further progress within output_generation to 85-95% range
477 if progress_percent is not None and progress_percent > 0: 477 ↛ 482line 477 didn't jump to line 482 because the condition on line 477 was always true
478 normalized = progress_percent / 100
479 adjusted_progress = 85 + (normalized * 10)
481 # Atomically update progress and check if research is still active
482 if adjusted_progress is not None:
483 adjusted_progress, still_active = (
484 update_progress_and_check_active(
485 research_id, adjusted_progress
486 )
487 )
488 else:
489 still_active = is_research_active(research_id)
491 if still_active:
492 # Queue the progress update to be processed in main thread
493 if adjusted_progress is not None:
494 from ..queue.processor_v2 import queue_processor
496 if username: 496 ↛ 501line 496 didn't jump to line 501 because the condition on line 496 was always true
497 queue_processor.queue_progress_update(
498 username, research_id, adjusted_progress
499 )
500 else:
501 logger.warning(
502 f"Cannot queue progress update for research {research_id} - no username available"
503 )
505 # Emit a socket event (throttled to avoid event storms)
506 try:
507 # Always emit completion/error states immediately;
508 # throttle intermediate progress updates
509 phase = metadata.get("phase", "")
510 is_final = (
511 phase
512 in (
513 "complete",
514 "error",
515 "report_complete",
516 )
517 or adjusted_progress == 100
518 )
520 should_emit = is_final
521 if not is_final:
522 now = time.monotonic()
523 with _last_emit_lock:
524 last = _last_emit_times.get(research_id, 0)
525 if now - last >= _EMIT_THROTTLE_SECONDS:
526 _last_emit_times[research_id] = now
527 should_emit = True
528 # Periodic TTL cleanup for orphaned entries
529 global _emit_cleanup_counter # noqa: PLW0603
530 _emit_cleanup_counter += 1
531 if _emit_cleanup_counter % 100 == 0:
532 stale = [
533 rid
534 for rid, t in _last_emit_times.items()
535 if now - t > _EMIT_TTL_SECONDS
536 ]
537 for rid in stale: 537 ↛ 538line 537 didn't jump to line 538 because the loop on line 537 never started
538 del _last_emit_times[rid]
540 if should_emit:
541 # Basic event data - include message for display
542 event_data = {
543 "progress": adjusted_progress,
544 "message": message,
545 "phase": phase,
546 }
548 # Include additional metadata for MCP/ReAct strategy display
549 if metadata.get("thought"): 549 ↛ 550line 549 didn't jump to line 550 because the condition on line 549 was never true
550 event_data["thought"] = metadata["thought"]
551 if metadata.get("tool"): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true
552 event_data["tool"] = metadata["tool"]
553 if metadata.get("arguments"): 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true
554 event_data["arguments"] = metadata["arguments"]
555 if metadata.get("iteration"): 555 ↛ 556line 555 didn't jump to line 556 because the condition on line 555 was never true
556 event_data["iteration"] = metadata["iteration"]
557 if metadata.get("error"):
558 event_data["error"] = metadata["error"]
559 if metadata.get("content"): 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true
560 event_data["content"] = metadata["content"]
562 SocketIOService().emit_to_subscribers(
563 "progress", research_id, event_data
564 )
565 except Exception:
566 logger.exception("Socket emit error (non-critical)")
568 # Function to check termination during long-running operations
569 def check_termination():
570 if is_termination_requested(research_id):
571 handle_termination(research_id, username)
572 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try
573 "Research was terminated by user during long-running operation"
574 )
575 return False # Not terminated
577 # Configure the system with the specified parameters
578 use_llm = None
579 if model or search_engine or model_provider:
580 # Log that we're overriding system settings
581 logger.info(
582 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}"
583 )
585 # Override LLM if model or model_provider specified
586 if model or model_provider:
587 try:
588 # Get LLM with the overridden settings
589 # Use the shared_research_context which includes username
590 use_llm = get_llm(
591 model_name=model,
592 provider=model_provider,
593 openai_endpoint_url=custom_endpoint,
594 research_id=research_id,
595 research_context=shared_research_context,
596 )
598 logger.info(
599 f"Successfully set LLM to: provider={model_provider}, model={model}"
600 )
601 except Exception as e:
602 logger.exception(
603 f"Error setting LLM provider={model_provider}, model={model}"
604 )
605 error_msg = str(e)
606 # Surface configuration errors to user instead of silently continuing
607 config_error_keywords = [
608 "model path",
609 "llamacpp",
610 "cannot connect",
611 "server",
612 "not configured",
613 "not responding",
614 "directory",
615 ".gguf",
616 ]
617 if any(
618 keyword in error_msg.lower()
619 for keyword in config_error_keywords
620 ):
621 # This is a configuration error the user can fix
622 raise ValueError(
623 f"LLM Configuration Error: {error_msg}"
624 ) from e
625 # For other errors, re-raise to avoid silent failures
626 raise
628 # Create search engine first if specified, to avoid default creation without username
629 use_search = None
630 if search_engine:
631 try:
632 # Create a new search object with these settings
633 use_search = get_search(
634 search_tool=search_engine,
635 llm_instance=use_llm,
636 username=username,
637 settings_snapshot=settings_snapshot,
638 )
639 logger.info(
640 f"Successfully created search engine: {search_engine}"
641 )
642 except Exception as e:
643 logger.exception(
644 f"Error creating search engine {search_engine}"
645 )
646 error_msg = str(e)
647 # Surface configuration errors to user instead of silently continuing
648 config_error_keywords = [
649 "searxng",
650 "instance_url",
651 "api_key",
652 "cannot connect",
653 "connection",
654 "timeout",
655 "not configured",
656 ]
657 if any(
658 keyword in error_msg.lower()
659 for keyword in config_error_keywords
660 ):
661 # This is a configuration error the user can fix
662 raise ValueError(
663 f"Search Engine Configuration Error ({search_engine}): {error_msg}"
664 ) from e
665 # For other errors, re-raise to avoid silent failures
666 raise
668 # Set the progress callback in the system
669 system = AdvancedSearchSystem(
670 llm=use_llm, # type: ignore[arg-type]
671 search=use_search, # type: ignore[arg-type]
672 strategy_name=strategy,
673 max_iterations=iterations,
674 questions_per_iteration=questions_per_iteration,
675 username=username,
676 settings_snapshot=settings_snapshot,
677 research_id=research_id,
678 research_context=shared_research_context,
679 )
680 system.set_progress_callback(progress_callback)
682 # Run the search
683 progress_callback("Starting research process", 5, {"phase": "init"})
685 try:
686 results = system.analyze_topic(query)
687 if mode == "quick":
688 progress_callback(
689 "Search complete, preparing to generate summary...",
690 85,
691 {"phase": "output_generation"},
692 )
693 else:
694 progress_callback(
695 "Search complete, generating output",
696 80,
697 {"phase": "output_generation"},
698 )
699 except Exception as search_error:
700 # Better handling of specific search errors
701 error_message = str(search_error)
702 error_type = "unknown"
704 # Extract error details for common issues
705 if "status code: 503" in error_message:
706 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system."
707 error_type = "ollama_unavailable"
708 elif "status code: 404" in error_message:
709 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model."
710 error_type = "model_not_found"
711 elif "status code:" in error_message:
712 # Extract the status code for other HTTP errors
713 status_code = error_message.split("status code:")[1].strip()
714 error_message = f"API request failed with status code {status_code}. Please check your configuration."
715 error_type = "api_error"
716 elif "connection" in error_message.lower():
717 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible."
718 error_type = "connection_error"
720 # Raise with improved error message
721 raise RuntimeError(
722 f"{error_message} (Error type: {error_type})"
723 ) from search_error
725 # Generate output based on mode
726 if mode == "quick":
727 # Quick Summary
728 if results.get("findings") or results.get("formatted_findings"):
729 raw_formatted_findings = results["formatted_findings"]
731 # Check if formatted_findings contains an error message
732 if isinstance(
733 raw_formatted_findings, str
734 ) and raw_formatted_findings.startswith("Error:"):
735 logger.exception(
736 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..."
737 )
739 # Determine error type for better user feedback
740 error_type = "unknown"
741 error_message = raw_formatted_findings.lower()
743 if (
744 "token limit" in error_message
745 or "context length" in error_message
746 ):
747 error_type = "token_limit"
748 # Log specific error type
749 logger.warning(
750 "Detected token limit error in synthesis"
751 )
753 # Update progress with specific error type
754 progress_callback(
755 "Synthesis hit token limits. Attempting fallback...",
756 87,
757 {
758 "phase": "synthesis_error",
759 "error_type": error_type,
760 },
761 )
762 elif (
763 "timeout" in error_message
764 or "timed out" in error_message
765 ):
766 error_type = "timeout"
767 logger.warning("Detected timeout error in synthesis")
768 progress_callback(
769 "Synthesis timed out. Attempting fallback...",
770 87,
771 {
772 "phase": "synthesis_error",
773 "error_type": error_type,
774 },
775 )
776 elif "rate limit" in error_message:
777 error_type = "rate_limit"
778 logger.warning("Detected rate limit error in synthesis")
779 progress_callback(
780 "LLM rate limit reached. Attempting fallback...",
781 87,
782 {
783 "phase": "synthesis_error",
784 "error_type": error_type,
785 },
786 )
787 elif (
788 "connection" in error_message
789 or "network" in error_message
790 ):
791 error_type = "connection"
792 logger.warning("Detected connection error in synthesis")
793 progress_callback(
794 "Connection issue with LLM. Attempting fallback...",
795 87,
796 {
797 "phase": "synthesis_error",
798 "error_type": error_type,
799 },
800 )
801 elif (
802 "llm error" in error_message
803 or "final answer synthesis fail" in error_message
804 ):
805 error_type = "llm_error"
806 logger.warning(
807 "Detected general LLM error in synthesis"
808 )
809 progress_callback(
810 "LLM error during synthesis. Attempting fallback...",
811 87,
812 {
813 "phase": "synthesis_error",
814 "error_type": error_type,
815 },
816 )
817 else:
818 # Generic error
819 logger.warning("Detected unknown error in synthesis")
820 progress_callback(
821 "Error during synthesis. Attempting fallback...",
822 87,
823 {
824 "phase": "synthesis_error",
825 "error_type": "unknown",
826 },
827 )
829 # Extract synthesized content from findings if available
830 synthesized_content = ""
831 for finding in results.get("findings", []):
832 if finding.get("phase") == "Final synthesis":
833 synthesized_content = finding.get("content", "")
834 break
836 # Use synthesized content as fallback
837 if (
838 synthesized_content
839 and not synthesized_content.startswith("Error:")
840 ):
841 logger.info(
842 "Using existing synthesized content as fallback"
843 )
844 raw_formatted_findings = synthesized_content
846 # Or use current_knowledge as another fallback
847 elif results.get("current_knowledge"):
848 logger.info("Using current_knowledge as fallback")
849 raw_formatted_findings = results["current_knowledge"]
851 # Or combine all finding contents as last resort
852 elif results.get("findings"):
853 logger.info("Combining all findings as fallback")
854 # First try to use any findings that are not errors
855 valid_findings = [
856 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
857 for finding in results.get("findings", [])
858 if finding.get("content")
859 and not finding.get("content", "").startswith(
860 "Error:"
861 )
862 ]
864 if valid_findings:
865 raw_formatted_findings = (
866 "# Research Results (Fallback Mode)\n\n"
867 )
868 raw_formatted_findings += "\n\n".join(
869 valid_findings
870 )
871 raw_formatted_findings += f"\n\n## Error Information\n{raw_formatted_findings}"
872 else:
873 # Last resort: use everything including errors
874 raw_formatted_findings = (
875 "# Research Results (Emergency Fallback)\n\n"
876 )
877 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n"
878 raw_formatted_findings += "\n\n".join(
879 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
880 for finding in results.get("findings", [])
881 if finding.get("content")
882 )
884 progress_callback(
885 f"Using fallback synthesis due to {error_type} error",
886 88,
887 {
888 "phase": "synthesis_fallback",
889 "error_type": error_type,
890 },
891 )
893 logger.info(
894 "Found formatted_findings of length: {}",
895 len(str(raw_formatted_findings)),
896 )
898 try:
899 # Check if we have an error in the findings and use enhanced error handling
900 if isinstance(
901 raw_formatted_findings, str
902 ) and raw_formatted_findings.startswith("Error:"):
903 logger.info(
904 "Generating enhanced error report using ErrorReportGenerator"
905 )
907 # Generate comprehensive error report
908 # ErrorReportGenerator does not use LLM (kept for compat)
909 error_generator = ErrorReportGenerator()
910 clean_markdown = error_generator.generate_error_report(
911 error_message=raw_formatted_findings,
912 query=query,
913 partial_results=results,
914 search_iterations=results.get("iterations", 0),
915 research_id=research_id,
916 )
918 logger.info(
919 "Generated enhanced error report with {} characters",
920 len(clean_markdown),
921 )
922 else:
923 # Get the synthesized content from the LLM directly
924 clean_markdown = raw_formatted_findings
926 # Extract all sources from findings to add them to the summary
927 all_links = []
928 for finding in results.get("findings", []):
929 search_results = finding.get("search_results", [])
930 if search_results:
931 try:
932 links = extract_links_from_search_results(
933 search_results
934 )
935 all_links.extend(links)
936 except Exception:
937 logger.exception(
938 "Error processing search results/links"
939 )
941 logger.info(
942 "Successfully converted to clean markdown of length: {}",
943 len(clean_markdown),
944 )
946 # First send a progress update for generating the summary
947 progress_callback(
948 "Generating clean summary from research data...",
949 90,
950 {"phase": "output_generation"},
951 )
953 # Send progress update for saving report
954 progress_callback(
955 "Saving research report to database...",
956 95,
957 {"phase": "report_complete"},
958 )
960 # Format citations in the markdown content
961 formatter = get_citation_formatter()
962 formatted_content = formatter.format_document(
963 clean_markdown
964 )
966 # Prepare complete report content
967 full_report_content = f"""{formatted_content}
969## Research Metrics
970- Search Iterations: {results["iterations"]}
971- Generated at: {datetime.now(UTC).isoformat()}
972"""
974 # Save sources to database (non-fatal - report should still
975 # be saved even if source saving fails)
976 try:
977 from .research_sources_service import (
978 ResearchSourcesService,
979 )
981 sources_service = ResearchSourcesService()
982 if all_links:
983 logger.info(
984 f"Quick summary: Saving {len(all_links)} sources to database"
985 )
986 sources_saved = (
987 sources_service.save_research_sources(
988 research_id=research_id,
989 sources=all_links,
990 username=username,
991 )
992 )
993 logger.info(
994 f"Quick summary: Saved {sources_saved} sources for research {research_id}"
995 )
996 except Exception:
997 logger.exception(
998 f"Failed to save sources for research {research_id} (continuing with report save)"
999 )
1001 # Save report using storage abstraction
1002 from ...storage import get_report_storage
1004 with get_user_db_session(username) as db_session:
1005 storage = get_report_storage(session=db_session)
1007 # Prepare metadata
1008 metadata = {
1009 "iterations": results["iterations"],
1010 "generated_at": datetime.now(UTC).isoformat(),
1011 }
1013 # Save report using storage abstraction
1014 success = storage.save_report(
1015 research_id=research_id,
1016 content=full_report_content,
1017 metadata=metadata,
1018 username=username,
1019 )
1021 if not success:
1022 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except
1024 logger.info(
1025 f"Report saved for research_id: {research_id}"
1026 )
1028 # Skip export to additional formats - we're storing in database only
1030 # Update research status in database
1031 completed_at = datetime.now(UTC).isoformat()
1033 with get_user_db_session(username) as db_session:
1034 research = (
1035 db_session.query(ResearchHistory)
1036 .filter_by(id=research_id)
1037 .first()
1038 )
1040 # Preserve existing metadata and update with new values
1041 metadata = _parse_research_metadata(
1042 research.research_meta
1043 )
1045 metadata.update(
1046 {
1047 "iterations": results["iterations"],
1048 "generated_at": datetime.now(UTC).isoformat(),
1049 }
1050 )
1052 # Use the helper function for consistent duration calculation
1053 duration_seconds = calculate_duration(
1054 research.created_at, completed_at
1055 )
1057 research.status = ResearchStatus.COMPLETED
1058 research.completed_at = completed_at
1059 research.duration_seconds = duration_seconds
1060 # Note: report_content is saved by CachedResearchService
1061 # report_path is not used in encrypted database version
1063 # Generate headline and topics only for news searches
1064 if (
1065 metadata.get("is_news_search")
1066 or metadata.get("search_type") == "news_analysis"
1067 ):
1068 try:
1069 from ...news.utils.headline_generator import (
1070 generate_headline,
1071 )
1072 from ...news.utils.topic_generator import (
1073 generate_topics,
1074 )
1076 # Get the report content from database for better headline/topic generation
1077 report_content = ""
1078 try:
1079 research = (
1080 db_session.query(ResearchHistory)
1081 .filter_by(id=research_id)
1082 .first()
1083 )
1084 if research and research.report_content: 1084 ↛ 1090line 1084 didn't jump to line 1090 because the condition on line 1084 was always true
1085 report_content = research.report_content
1086 logger.info(
1087 f"Retrieved {len(report_content)} chars from database for headline generation"
1088 )
1089 else:
1090 logger.warning(
1091 f"No report content found in database for research_id: {research_id}"
1092 )
1093 except Exception:
1094 logger.warning(
1095 "Could not retrieve report content from database"
1096 )
1098 # Generate headline
1099 logger.info(
1100 f"Generating headline for query: {query[:100]}"
1101 )
1102 headline = generate_headline(
1103 query, report_content
1104 )
1105 metadata["generated_headline"] = headline
1107 # Generate topics
1108 logger.info(
1109 f"Generating topics with category: {metadata.get('category', 'News')}"
1110 )
1111 topics = generate_topics(
1112 query=query,
1113 findings=report_content,
1114 category=metadata.get("category", "News"),
1115 max_topics=6,
1116 )
1117 metadata["generated_topics"] = topics
1119 logger.info(f"Generated headline: {headline}")
1120 logger.info(f"Generated topics: {topics}")
1122 except Exception:
1123 logger.warning(
1124 "Could not generate headline/topics"
1125 )
1127 research.research_meta = metadata
1129 db_session.commit()
1130 logger.info(
1131 f"Database commit completed for research_id: {research_id}"
1132 )
1134 # Update subscription if this was triggered by a subscription
1135 if metadata.get("subscription_id"):
1136 try:
1137 from ...news.subscription_manager.storage import (
1138 SQLSubscriptionStorage,
1139 )
1140 from datetime import (
1141 datetime as dt,
1142 timezone,
1143 timedelta,
1144 )
1146 sub_storage = SQLSubscriptionStorage(db_session)
1147 subscription_id = metadata["subscription_id"]
1149 # Get subscription to find refresh interval
1150 subscription = sub_storage.get(subscription_id)
1151 if subscription: 1151 ↛ 1180line 1151 didn't jump to line 1180
1152 refresh_minutes = subscription.get(
1153 "refresh_minutes", 240
1154 )
1155 now = dt.now(timezone.utc)
1156 next_refresh = now + timedelta(
1157 minutes=refresh_minutes
1158 )
1160 # Update refresh times
1161 sub_storage.update_refresh_time(
1162 subscription_id=subscription_id,
1163 last_refresh=now,
1164 next_refresh=next_refresh,
1165 )
1167 # Increment stats
1168 sub_storage.increment_stats(
1169 subscription_id, 1
1170 )
1172 logger.info(
1173 f"Updated subscription {subscription_id} refresh times"
1174 )
1175 except Exception:
1176 logger.warning(
1177 "Could not update subscription refresh time"
1178 )
1180 logger.info(
1181 f"Database updated successfully for research_id: {research_id}"
1182 )
1184 # Send the final completion message
1185 progress_callback(
1186 "Research completed successfully",
1187 100,
1188 {"phase": "complete"},
1189 )
1191 # Clean up resources
1192 logger.info(
1193 "Cleaning up resources for research_id: {}", research_id
1194 )
1195 cleanup_research_resources(
1196 research_id, username, user_password=user_password
1197 )
1198 logger.info(
1199 "Resources cleaned up for research_id: {}", research_id
1200 )
1202 except Exception as inner_e:
1203 logger.exception("Error during quick summary generation")
1204 raise RuntimeError(
1205 f"Error generating quick summary: {inner_e!s}"
1206 )
1207 else:
1208 raise RuntimeError( # noqa: TRY301 — triggers research failure handling in outer except
1209 "No research findings were generated. Please try again."
1210 )
1211 else:
1212 # Full Report
1213 progress_callback(
1214 "Generating detailed report...",
1215 85,
1216 {"phase": "report_generation"},
1217 )
1219 # Extract the search system from the results if available
1220 search_system = results.get("search_system", None)
1222 # Wrapper that maps report generator's 0-100% to 85-95% range
1223 # and relays cancellation checks through the outer progress_callback
1224 def report_progress_callback(message, progress_percent, metadata):
1225 if progress_percent is not None:
1226 adjusted = 85 + (progress_percent / 100) * 10
1227 else:
1228 adjusted = progress_percent
1229 progress_callback(message, adjusted, metadata)
1231 # Pass the existing search system to maintain citation indices
1232 report_generator = IntegratedReportGenerator(
1233 search_system=search_system,
1234 settings_snapshot=settings_snapshot,
1235 )
1236 final_report = report_generator.generate_report(
1237 results, query, progress_callback=report_progress_callback
1238 )
1240 progress_callback(
1241 "Report generation complete", 95, {"phase": "report_complete"}
1242 )
1244 # Format citations in the report content
1245 formatter = get_citation_formatter()
1246 formatted_content = formatter.format_document(
1247 final_report["content"]
1248 )
1250 # Save sources to database (non-fatal - report should still be saved
1251 # even if source saving fails, e.g. due to expired session password)
1252 try:
1253 from .research_sources_service import ResearchSourcesService
1255 sources_service = ResearchSourcesService()
1256 all_links = getattr(search_system, "all_links_of_system", None)
1257 if all_links:
1258 logger.info(f"Saving {len(all_links)} sources to database")
1259 sources_saved = sources_service.save_research_sources(
1260 research_id=research_id,
1261 sources=all_links,
1262 username=username,
1263 )
1264 logger.info(
1265 f"Saved {sources_saved} sources for research {research_id}"
1266 )
1267 except Exception:
1268 logger.exception(
1269 f"Failed to save sources for research {research_id} (continuing with report save)"
1270 )
1272 # Save report to database
1273 with get_user_db_session(username) as db_session:
1274 # Update metadata
1275 metadata = final_report["metadata"]
1276 metadata["iterations"] = results["iterations"]
1278 # Save report to database
1279 try:
1280 research = (
1281 db_session.query(ResearchHistory)
1282 .filter_by(id=research_id)
1283 .first()
1284 )
1286 if not research: 1286 ↛ 1287line 1286 didn't jump to line 1287 because the condition on line 1286 was never true
1287 logger.error(f"Research {research_id} not found")
1288 success = False
1289 else:
1290 research.report_content = formatted_content
1291 if research.research_meta: 1291 ↛ 1292line 1291 didn't jump to line 1292 because the condition on line 1291 was never true
1292 research.research_meta.update(metadata)
1293 else:
1294 research.research_meta = metadata
1295 db_session.commit()
1296 success = True
1297 logger.info(
1298 f"Saved report for research {research_id} to database"
1299 )
1300 except Exception:
1301 logger.exception("Error saving report to database")
1302 db_session.rollback()
1303 success = False
1305 if not success: 1305 ↛ 1306line 1305 didn't jump to line 1306 because the condition on line 1305 was never true
1306 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except
1308 logger.info(
1309 f"Report saved to database for research_id: {research_id}"
1310 )
1312 # Update research status in database
1313 completed_at = datetime.now(UTC).isoformat()
1315 with get_user_db_session(username) as db_session:
1316 research = (
1317 db_session.query(ResearchHistory)
1318 .filter_by(id=research_id)
1319 .first()
1320 )
1322 # Preserve existing metadata and merge with report metadata
1323 metadata = _parse_research_metadata(research.research_meta)
1325 metadata.update(final_report["metadata"])
1326 metadata["iterations"] = results["iterations"]
1328 # Use the helper function for consistent duration calculation
1329 duration_seconds = calculate_duration(
1330 research.created_at, completed_at
1331 )
1333 research.status = ResearchStatus.COMPLETED
1334 research.completed_at = completed_at
1335 research.duration_seconds = duration_seconds
1336 # Note: report_content is saved by CachedResearchService
1337 # report_path is not used in encrypted database version
1339 # Generate headline and topics only for news searches
1340 if ( 1340 ↛ 1344line 1340 didn't jump to line 1344 because the condition on line 1340 was never true
1341 metadata.get("is_news_search")
1342 or metadata.get("search_type") == "news_analysis"
1343 ):
1344 try:
1345 from ..news.utils.headline_generator import (
1346 generate_headline, # type: ignore[no-redef]
1347 )
1348 from ..news.utils.topic_generator import (
1349 generate_topics, # type: ignore[no-redef]
1350 )
1352 # Get the report content from database for better headline/topic generation
1353 report_content = ""
1354 try:
1355 research = (
1356 db_session.query(ResearchHistory)
1357 .filter_by(id=research_id)
1358 .first()
1359 )
1360 if research and research.report_content:
1361 report_content = research.report_content
1362 else:
1363 logger.warning(
1364 f"No report content found in database for research_id: {research_id}"
1365 )
1366 except Exception:
1367 logger.warning(
1368 "Could not retrieve report content from database"
1369 )
1371 # Generate headline
1372 headline = generate_headline(query, report_content)
1373 metadata["generated_headline"] = headline
1375 # Generate topics
1376 topics = generate_topics(
1377 query=query,
1378 findings=report_content,
1379 category=metadata.get("category", "News"),
1380 max_topics=6,
1381 )
1382 metadata["generated_topics"] = topics
1384 logger.info(f"Generated headline: {headline}")
1385 logger.info(f"Generated topics: {topics}")
1387 except Exception:
1388 logger.warning("Could not generate headline/topics")
1390 research.research_meta = metadata
1392 db_session.commit()
1394 # Update subscription if this was triggered by a subscription
1395 if metadata.get("subscription_id"): 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true
1396 try:
1397 from ...news.subscription_manager.storage import (
1398 SQLSubscriptionStorage,
1399 )
1400 from datetime import datetime as dt, timezone, timedelta
1402 sub_storage = SQLSubscriptionStorage(db_session)
1403 subscription_id = metadata["subscription_id"]
1405 # Get subscription to find refresh interval
1406 subscription = sub_storage.get(subscription_id)
1407 if subscription:
1408 refresh_minutes = subscription.get(
1409 "refresh_minutes", 240
1410 )
1411 now = dt.now(timezone.utc)
1412 next_refresh = now + timedelta(
1413 minutes=refresh_minutes
1414 )
1416 # Update refresh times
1417 sub_storage.update_refresh_time(
1418 subscription_id=subscription_id,
1419 last_refresh=now,
1420 next_refresh=next_refresh,
1421 )
1423 # Increment stats
1424 sub_storage.increment_stats(subscription_id, 1)
1426 logger.info(
1427 f"Updated subscription {subscription_id} refresh times"
1428 )
1429 except Exception:
1430 logger.warning(
1431 "Could not update subscription refresh time"
1432 )
1434 progress_callback(
1435 "Research completed successfully",
1436 100,
1437 {"phase": "complete"},
1438 )
1440 # Clean up resources
1441 cleanup_research_resources(
1442 research_id, username, user_password=user_password
1443 )
1445 except ResearchTerminatedException:
1446 logger.info(f"Research {research_id} terminated by user")
1447 # handle_termination() was already called by progress_callback
1448 # before raising, which:
1449 # 1. Queued SUSPENDED status update via queue_processor
1450 # 2. Called cleanup_research_resources()
1451 # No additional cleanup needed here.
1453 except Exception as e:
1454 # Handle error
1455 error_message = f"Research failed: {e!s}"
1456 logger.exception(error_message)
1458 try:
1459 # Check for common Ollama error patterns in the exception and provide more user-friendly errors
1460 user_friendly_error = str(e)
1461 error_context = {}
1463 if "Error type: ollama_unavailable" in user_friendly_error:
1464 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system."
1465 error_context = {
1466 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly."
1467 }
1468 elif "Error type: model_not_found" in user_friendly_error:
1469 user_friendly_error = "Required Ollama model not found. Please pull the model first."
1470 error_context = {
1471 "solution": "Run 'ollama pull mistral' to download the required model."
1472 }
1473 elif "Error type: connection_error" in user_friendly_error:
1474 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running."
1475 error_context = {
1476 "solution": "Ensure Ollama or your API service is running and accessible."
1477 }
1478 elif "Error type: api_error" in user_friendly_error:
1479 # Keep the original error message as it's already improved
1480 error_context = {
1481 "solution": "Check API configuration and credentials."
1482 }
1484 # Generate enhanced error report for failed research
1485 enhanced_report_content = None
1486 try:
1487 # Get partial results if they exist
1488 partial_results = results if "results" in locals() else None
1489 search_iterations = (
1490 results.get("iterations", 0) if partial_results else 0
1491 )
1493 # Generate comprehensive error report
1494 # ErrorReportGenerator does not use LLM (kept for compat)
1495 error_generator = ErrorReportGenerator()
1496 enhanced_report_content = error_generator.generate_error_report(
1497 error_message=f"Research failed: {e!s}",
1498 query=query,
1499 partial_results=partial_results,
1500 search_iterations=search_iterations,
1501 research_id=research_id,
1502 )
1504 logger.info(
1505 "Generated enhanced error report for failed research (length: {})",
1506 len(enhanced_report_content),
1507 )
1509 # Save enhanced error report to encrypted database
1510 try:
1511 # username already available from function scope (line 281)
1512 if username: 1512 ↛ 1534line 1512 didn't jump to line 1534 because the condition on line 1512 was always true
1513 from ...storage import get_report_storage
1515 with get_user_db_session(username) as db_session:
1516 storage = get_report_storage(session=db_session)
1517 success = storage.save_report(
1518 research_id=research_id,
1519 content=enhanced_report_content,
1520 metadata={"error_report": True},
1521 username=username,
1522 )
1523 if success:
1524 logger.info(
1525 "Saved enhanced error report to encrypted database for research {}",
1526 research_id,
1527 )
1528 else:
1529 logger.warning(
1530 "Failed to save enhanced error report to database for research {}",
1531 research_id,
1532 )
1533 else:
1534 logger.warning(
1535 "Cannot save error report: username not available"
1536 )
1538 except Exception as report_error:
1539 logger.exception(
1540 "Failed to save enhanced error report: {}", report_error
1541 )
1543 except Exception as error_gen_error:
1544 logger.exception(
1545 "Failed to generate enhanced error report: {}",
1546 error_gen_error,
1547 )
1548 enhanced_report_content = None
1550 # Get existing metadata from database first
1551 existing_metadata = {}
1552 try:
1553 # username already available from function scope (line 281)
1554 if username: 1554 ↛ 1567line 1554 didn't jump to line 1567 because the condition on line 1554 was always true
1555 with get_user_db_session(username) as db_session:
1556 research = (
1557 db_session.query(ResearchHistory)
1558 .filter_by(id=research_id)
1559 .first()
1560 )
1561 if research and research.research_meta:
1562 existing_metadata = dict(research.research_meta)
1563 except Exception:
1564 logger.exception("Failed to get existing metadata")
1566 # Update metadata with more context about the error while preserving existing values
1567 metadata = existing_metadata
1568 metadata.update({"phase": "error", "error": user_friendly_error})
1569 if error_context:
1570 metadata.update(error_context)
1571 if enhanced_report_content: 1571 ↛ 1575line 1571 didn't jump to line 1575 because the condition on line 1571 was always true
1572 metadata["has_enhanced_report"] = True
1574 # If we still have an active research record, update its log
1575 if is_research_active(research_id):
1576 progress_callback(user_friendly_error, None, metadata)
1578 # If termination was requested, mark as suspended instead of failed
1579 status = (
1580 ResearchStatus.SUSPENDED
1581 if is_termination_requested(research_id)
1582 else ResearchStatus.FAILED
1583 )
1584 message = (
1585 "Research was terminated by user"
1586 if status == ResearchStatus.SUSPENDED
1587 else user_friendly_error
1588 )
1590 # Calculate duration up to termination point - using UTC consistently
1591 now = datetime.now(UTC)
1592 completed_at = now.isoformat()
1594 # NOTE: Database updates from threads are handled by queue processor
1595 # The queue_processor.queue_error_update() method is already being used below
1596 # to safely update the database from the main thread
1598 # Queue the error update to be processed in main thread
1599 # Using the queue processor v2 system
1600 from ..queue.processor_v2 import queue_processor
1602 if username: 1602 ↛ 1616line 1602 didn't jump to line 1616 because the condition on line 1602 was always true
1603 queue_processor.queue_error_update(
1604 username=username,
1605 research_id=research_id,
1606 status=status,
1607 error_message=message,
1608 metadata=metadata,
1609 completed_at=completed_at,
1610 report_path=None,
1611 )
1612 logger.info(
1613 f"Queued error update for research {research_id} with status '{status}'"
1614 )
1615 else:
1616 logger.error(
1617 f"Cannot queue error update for research {research_id} - no username provided. "
1618 f"Status: '{status}', Message: {message}"
1619 )
1621 try:
1622 SocketIOService().emit_to_subscribers(
1623 "progress",
1624 research_id,
1625 {"status": status, "error": message},
1626 )
1627 except Exception:
1628 logger.exception("Failed to emit error via socket")
1630 except Exception:
1631 logger.exception("Error in error handler")
1633 # Clean up resources
1634 cleanup_research_resources(
1635 research_id, username, user_password=user_password
1636 )
1638 finally:
1639 # RESOURCE CLEANUP: Close search engine HTTP sessions.
1640 #
1641 # Search engines (created via get_search()) may hold HTTP connection
1642 # pools. Currently only SemanticScholarSearchEngine creates a
1643 # persistent SafeSession; other engines use stateless safe_get()/
1644 # safe_post() utility functions. However, BaseSearchEngine.close()
1645 # is safe to call on any engine — it checks for a 'session'
1646 # attribute and is fully idempotent (SemanticScholar sets
1647 # self.session = None after close).
1648 #
1649 # Neither @thread_cleanup nor cleanup_research_resources() close
1650 # the search engine — @thread_cleanup only handles database sessions
1651 # and context cleanup, and cleanup_research_resources() only handles
1652 # status updates, notifications, and tracking dict removal.
1653 #
1654 # Without this explicit close, search engine sessions rely on
1655 # Python's non-deterministic garbage collection (__del__) for
1656 # cleanup, which can cause file descriptor exhaustion under
1657 # sustained load.
1658 from ...utilities.resource_utils import safe_close
1660 if "use_search" in locals():
1661 safe_close(use_search, "research search engine")
1662 # Close search system (cascades to strategy thread pools).
1663 # See AdvancedSearchSystem.close() for details.
1664 if "system" in locals():
1665 safe_close(system, "research system")
1666 # Close the LLM instance created for model/provider overrides.
1667 # system.close() does NOT close the LLM passed to it via system.model,
1668 # so we must close it explicitly here.
1669 if "use_llm" in locals():
1670 safe_close(use_llm, "research LLM")
def cleanup_research_resources(research_id, username=None, user_password=None):
    """
    Clean up resources for a completed research.

    Notifies the queue processor (which performs the actual database status
    update from the main thread), removes the research from the active
    tracking structures, clears this research's socket emission throttle
    state, and finally sends a closing progress message to any remaining
    socket subscribers before dropping their subscriptions.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)
        user_password: Optional password forwarded to the queue processor for
            encrypted-database access.
    """
    from ..routes.globals import cleanup_research

    logger.info("Cleaning up resources for research {}", research_id)

    # For testing: Add a small delay to simulate research taking time
    # This helps test concurrent research limits
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        logger.info(
            f"Test mode: Adding 5 second delay before cleanup for {research_id}"
        )
        # `time` is already imported at module level; no local re-import needed.
        time.sleep(5)

    # Final status used for the closing socket message.
    # NOTE(review): this value is never refreshed from the database despite the
    # original intent ("get the current status from the database"), so the
    # SUSPENDED/FAILED branch below is currently unreachable — confirm whether
    # a ResearchHistory status lookup was meant to happen here.
    current_status = ResearchStatus.COMPLETED  # Default

    # NOTE: Queue processor already handles database updates from the main thread
    # The notify_research_completed() method is called at the end of this function
    # which safely updates the database status

    # Notify queue processor that research completed
    # This uses processor_v2 which handles database updates in the main thread
    # avoiding the Flask request context issues that occur in background threads
    from ..queue.processor_v2 import queue_processor

    if username:
        queue_processor.notify_research_completed(
            username, research_id, user_password=user_password
        )
        logger.info(
            f"Notified queue processor of completion for research {research_id} (user: {username})"
        )
    else:
        logger.warning(
            f"Cannot notify completion for research {research_id} - no username provided"
        )

    # Remove from active research and termination flags atomically
    cleanup_research(research_id)

    # Clean up throttle state for this research so the module-level
    # emission-throttling dict does not grow unboundedly.
    with _last_emit_lock:
        _last_emit_times.pop(research_id, None)

    # Send a final message to subscribers
    try:
        # Send a final message to any remaining subscribers with explicit status
        # Use the proper status message based on database status
        if current_status in (
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
        ):
            final_message = {
                "status": current_status,
                "message": f"Research was {current_status}",
                "progress": 0,  # For suspended research, show 0% not 100%
            }
        else:
            final_message = {
                "status": ResearchStatus.COMPLETED,
                "message": "Research process has ended and resources have been cleaned up",
                "progress": 100,
            }

        logger.info(
            "Sending final {} socket message for research {}",
            current_status,
            research_id,
        )

        SocketIOService().emit_to_subscribers(
            "progress", research_id, final_message
        )

        # Clean up socket subscriptions for this research
        SocketIOService().remove_subscriptions_for_research(research_id)

    except Exception:
        # Best-effort: socket notification failures must not break cleanup.
        logger.exception("Error sending final cleanup message")
def handle_termination(research_id, username=None):
    """
    Handle the termination of a research process.

    Queues a SUSPENDED status update through the queue processor — so the
    database write happens in the main thread instead of this background
    thread — and then releases the research's resources.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)
    """
    logger.info(f"Handling termination for research {research_id}")

    # Route the status change through the queue processor to avoid Flask
    # request-context errors in background threads.
    try:
        from ..queue.processor_v2 import queue_processor

        terminated_at = datetime.now(UTC).isoformat()
        queue_processor.queue_error_update(
            username=username,
            research_id=research_id,
            status=ResearchStatus.SUSPENDED,
            error_message="Research was terminated by user",
            metadata={"terminated_at": terminated_at},
            completed_at=terminated_at,
            report_path=None,
        )
        logger.info(f"Queued suspension update for research {research_id}")
    except Exception:
        logger.exception(
            f"Error queueing termination update for research {research_id}"
        )

    # Resource cleanup removes tracking state and emits the final socket message.
    cleanup_research_resources(research_id, username)
def cancel_research(research_id, username):
    """
    Cancel/terminate a research process using ORM.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research

    Returns:
        bool: True if the research was found and cancelled, False otherwise
    """
    try:
        from ..routes.globals import is_research_active, set_termination_flag

        # Flag the research for termination first so a running worker thread
        # observes the request as early as possible.
        set_termination_flag(research_id)

        # An actively running research goes through the full termination path,
        # which also queues the database status update.
        if is_research_active(research_id):
            handle_termination(research_id, username)
            return True

        # Not active: update the stored record directly.
        terminal_states = (
            ResearchStatus.COMPLETED,
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
            ResearchStatus.ERROR,
        )
        try:
            with get_user_db_session(username) as db_session:
                record = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not record:
                    logger.info(f"Research {research_id} not found in database")
                    return False

                if record.status in terminal_states:
                    logger.info(
                        f"Research {research_id} already in terminal state: {record.status}"
                    )
                    return True  # Consider this a success since it's already stopped

                # If it exists but isn't in active_research, still update status
                record.status = ResearchStatus.SUSPENDED
                db_session.commit()
                logger.info(f"Successfully suspended research {research_id}")
        except Exception:
            logger.exception(
                f"Error accessing database for research {research_id}"
            )
            return False

        return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False