Coverage for src/local_deep_research/web/services/research_service.py: 81%
916 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import hashlib
2import json
3import re
4import threading
5import time
6from datetime import datetime, UTC
7from pathlib import Path
9from loguru import logger
11from ...exceptions import DuplicateResearchError, ResearchTerminatedException
12from ...config.llm_config import get_llm
13from ...settings.manager import SnapshotSettingsContext
15# Output directory for research results
16from ...config.paths import get_research_outputs_directory
17from ...config.search_config import get_search
18from ...constants import ResearchStatus
19from ...database.models import ResearchHistory, ResearchStrategy
20from ...database.session_context import get_user_db_session
21from ...database.thread_local_session import thread_cleanup
22from ...error_handling.openai_compat_errors import (
23 friendly_openai_compatible_error,
24 is_openai_compat_runtime_error,
25)
26from ...error_handling.report_generator import ErrorReportGenerator
27from ...utilities.thread_context import set_search_context
28from ...report_generator import IntegratedReportGenerator
29from ...search_system import AdvancedSearchSystem
30from ...text_optimization import CitationFormatter, CitationMode
31from ...utilities.log_utils import log_for_research
32from ...utilities.search_utilities import extract_links_from_search_results
33from ...utilities.threading_utils import thread_context, thread_with_app_context
34from ..models.database import calculate_duration
35from ...settings.env_registry import get_env_setting
36from .socket_service import SocketIOService
# Root directory for research output; _generate_report_path builds report
# file paths under it and run_research_process creates a per-research
# "research_<id>" subdirectory in it.
OUTPUT_DIR = get_research_outputs_directory()

# Global concurrent research limit (server-wide, across all users).
# NOTE(review): the value is passed straight to threading.Semaphore, so this
# assumes get_env_setting returns an int (not a raw env string) — confirm.
_MAX_GLOBAL_CONCURRENT = get_env_setting(
    "server.max_concurrent_research", default=10
)
# One slot per running research thread. start_research_process acquires it
# non-blocking and the worker wrapper (or start_research_process itself on
# its failure paths) releases it.
_global_research_semaphore = threading.Semaphore(_MAX_GLOBAL_CONCURRENT)

# Socket.IO emission throttling: minimum interval between progress emissions per research
_EMIT_THROTTLE_SECONDS = 0.2  # 200ms
_EMIT_TTL_SECONDS = 3600  # 1 hour — evict stale entries from orphaned research

# Cap on the partial-content buffer kept server-side so chat-mode termination
# can persist whatever was already streamed. Bounded to keep memory predictable
# under pathologically long answers (typical answers are a few KB).
_MAX_PARTIAL_BUFFER_BYTES = 256 * 1024  # 256 KB
# Emit-throttle bookkeeping shared by research threads.
# NOTE(review): readers/writers are not in this section — presumably the
# progress-emit helper uses _last_emit_lock to guard the dict and counter.
_emit_cleanup_counter = 0
_last_emit_times: dict[str, float] = {}
_last_emit_lock = threading.Lock()

# Phases that produce user-visible step messages in chat mode (consulted by
# _chat_step_decision).
# "complete" is excluded — it fires AFTER the response message is written,
# which would create a step with a higher sequence_number than the response.
_STEP_PHASES = frozenset(
    {
        "init",
        "setup",
        "search_planning",
        "search",
        "observation",
        "output_generation",
        "synthesis_error",
        "synthesis_fallback",
        "report_generation",
        "report_complete",
        "error",
    }
)
def _chat_step_decision(
    phase: str | None,
    last_step_phase: str | None,
    is_final: bool,
) -> tuple[bool, bool]:
    """Decide whether a chat-mode progress event is persisted and/or emitted.

    The progress callback in run_research_process keeps the live socket
    stream and the ``chat_progress_steps`` rows that ``loadSession``
    replays on reload in lock-step. A back-to-back repeat of the same
    phase must therefore be dropped from BOTH outputs, not only the DB.

    Args:
        phase: Phase tag of the incoming event (e.g. "search").
        last_step_phase: Phase tag of the most recently persisted chat
            step, or None before the first one for this research.
        is_final: True for events the client needs in order to run its
            completion handler (complete | error | report_complete |
            progress==100).

    Returns:
        ``(persist, suppress_emit)``:
        - ``persist``: call add_progress_step for this event.
        - ``suppress_emit``: null the socket payload so nothing is sent.
          Only ever True for a non-final repeat; final events always go out.
    """
    # Events outside the step vocabulary (e.g. "complete") are never stored
    # and never silenced — control/completion events must reach the client.
    if phase not in _STEP_PHASES:
        return False, False

    # Consecutive duplicates are collapsed, except "observation", which is
    # allowed to repeat.
    is_repeat = phase == last_step_phase and phase != "observation"
    if not is_repeat:
        return True, False

    # Drop the repeat from the DB; also drop the emit unless it is final.
    return False, not is_final
def _parse_research_metadata(research_meta) -> dict:
    """Return ``research_meta`` as a fresh dict the caller may mutate.

    A dict input is shallow-copied. A string input is decoded as JSON and
    kept only when its top level is an object. Every other input — None,
    non-object JSON, or undecodable text — produces ``{}``; decode failures
    are logged with their traceback.
    """
    if isinstance(research_meta, str):
        try:
            decoded = json.loads(research_meta)
        except json.JSONDecodeError:
            logger.exception("Failed to parse research_meta as JSON")
            return {}
        return dict(decoded) if isinstance(decoded, dict) else {}
    if isinstance(research_meta, dict):
        return dict(research_meta)
    return {}
def _extract_synthesized_answer(results: dict) -> str:
    """Return only the LLM-synthesized answer from a strategy result dict.

    ``report_content`` must hold just the answer prose with its [N] inline
    citations. The strategy's ``formatted_findings`` is unsuitable: it also
    carries the markdown source list, ``## SEARCH QUESTIONS BY ITERATION``,
    ``## DETAILED FINDINGS`` and ``## ALL SOURCES``, and
    ``format_document_split`` can only strip ``## Sources`` headers, so
    saving it would leak those sections into ``report_content``.

    Lookup order:
      1. The first ``Final synthesis`` finding with non-empty content
         (source_based, parallel, rapid, focused_iteration, iterdrag).
      2. ``current_knowledge`` (where other strategies put the answer).
      3. ``""`` — the caller chooses any further fallback.
    """
    findings = results.get("findings") or []
    synthesized = next(
        (
            item.get("content")
            for item in findings
            if item.get("phase") == "Final synthesis" and item.get("content")
        ),
        None,
    )
    if synthesized:
        return synthesized
    return results.get("current_knowledge") or ""
def get_citation_formatter():
    """Return a CitationFormatter configured from the thread's settings.

    The ``report.citation_format`` value (default ``"number_hyperlinks"``)
    is read from the settings snapshot and mapped to a ``CitationMode``;
    unrecognised values fall back to ``CitationMode.NUMBER_HYPERLINKS``.
    """
    # Deferred import: a module-level import here would be circular.
    from ...config.search_config import get_setting_from_snapshot

    configured = get_setting_from_snapshot(
        "report.citation_format", "number_hyperlinks"
    )
    modes_by_name = {
        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
        "source_tagged_hyperlinks": CitationMode.SOURCE_TAGGED_HYPERLINKS,
        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
    }
    if configured in modes_by_name:
        selected_mode = modes_by_name[configured]
    else:
        selected_mode = CitationMode.NUMBER_HYPERLINKS
    return CitationFormatter(mode=selected_mode)
def export_report_to_memory(
    markdown_content: str, format: str, title: str | None = None
):
    """
    Render a markdown report into another format entirely in memory.

    The exporter is looked up in the modular ``ExporterRegistry``; call
    ``ExporterRegistry.get_available_formats()`` to list what is supported.

    Args:
        markdown_content: Markdown source of the report.
        format: Target format name, case-insensitive (e.g. 'pdf', 'odt',
            'latex', 'quarto', 'ris').
        title: Optional document title, handed to the exporter.

    Returns:
        Tuple of (content_bytes, filename, mimetype)

    Raises:
        ValueError: If no exporter is registered for ``format``.
    """
    from ...exporters import ExporterRegistry, ExportOptions

    normalized = format.lower()
    exporter = ExporterRegistry.get_exporter(normalized)
    if exporter is None:
        supported = ", ".join(ExporterRegistry.get_available_formats())
        raise ValueError(
            f"Unsupported export format: {format}. "
            f"Available formats: {supported}"
        )

    # Exporters decide themselves whether to prepend the title
    # (_prepend_title_if_needed): PDF/ODT do, RIS and others ignore it.
    exported = exporter.export(markdown_content, ExportOptions(title=title))

    logger.info(
        f"Generated {normalized} in memory, size: {len(exported.content)} bytes"
    )
    return exported.content, exported.filename, exported.mimetype
def save_research_strategy(research_id, strategy_name, username=None):
    """
    Insert or update the ResearchStrategy row for a research.

    Best-effort: any failure is logged with its traceback and swallowed.

    Args:
        research_id: The ID of the research
        strategy_name: The name of the strategy used
        username: The username for database access (required for thread context)
    """
    try:
        logger.debug(
            f"save_research_strategy called with research_id={research_id}, strategy_name={strategy_name}"
        )
        with get_user_db_session(username) as session:
            strategy_query = session.query(ResearchStrategy).filter_by(
                research_id=research_id
            )
            record = strategy_query.first()

            if not record:
                session.add(
                    ResearchStrategy(
                        research_id=research_id, strategy_name=strategy_name
                    )
                )
                logger.debug(
                    f"Creating new strategy record for research {research_id}"
                )
            else:
                record.strategy_name = strategy_name
                logger.debug(
                    f"Updating existing strategy for research {research_id}"
                )

            session.commit()
            logger.info(
                f"Saved strategy '{strategy_name}' for research {research_id}"
            )
    except Exception:
        logger.exception("Error saving research strategy")
def get_research_strategy(research_id, username=None):
    """
    Look up the strategy name recorded for a research.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)

    Returns:
        str: The strategy name, or None when no row exists or the lookup
        fails (failures are logged, not raised).
    """
    try:
        with get_user_db_session(username) as session:
            row = (
                session.query(ResearchStrategy)
                .filter_by(research_id=research_id)
                .first()
            )
            if not row:
                return None
            return row.strategy_name
    except Exception:
        logger.exception("Error getting research strategy")
        return None
def start_research_process(
    research_id,
    query,
    mode,
    run_research_callback,
    **kwargs,
):
    """
    Start a research process in a background thread.

    A slot on the server-wide concurrency semaphore is taken synchronously
    in the caller's thread. The callback is wrapped so the worker releases
    that slot when it exits. The prepared thread is then handed to
    ``check_and_start_research`` for an atomic check-and-start. Every
    failure before hand-off gives the slot back.

    Args:
        research_id: The ID of the research
        query: The research query
        mode: The research mode (quick/detailed)
        run_research_callback: The callback function to run the research
        **kwargs: Additional parameters to pass to the research process (model, search_engine, etc.)

    Returns:
        threading.Thread: The thread running the research

    Raises:
        SystemAtCapacityError: No global concurrency slot is free.
        DuplicateResearchError: ``research_id`` already has a live thread.
        Exception: Anything raised while preparing or registering the
            thread is re-raised after the slot has been released.
    """
    from ..routes.globals import check_and_start_research
    from ...exceptions import SystemAtCapacityError

    # Acquire the global concurrency semaphore SYNCHRONOUSLY in the caller's
    # thread. Previously this happened inside the worker after the HTTP route
    # had already returned 200 — at capacity, the worker parked and the user
    # saw an infinite thinking spinner with the partial unique in-progress
    # index blocking retries. Surfacing capacity as an exception lets the
    # route return HTTP 429 before committing any DB state.
    if not _global_research_semaphore.acquire(blocking=False):
        raise SystemAtCapacityError(
            f"At research capacity (max {_MAX_GLOBAL_CONCURRENT} concurrent)"
        )

    # Everything between the acquire above and the hand-off below must
    # release the slot on failure. thread_with_app_context() and
    # thread_context() used to run outside this guard, so an exception
    # from either (e.g. no app context) leaked a slot permanently and
    # shrank server capacity one failure at a time.
    try:
        # Pass the app context to the thread.
        app_callback = thread_with_app_context(run_research_callback)

        def _release_semaphore_on_exit(*args, **kw):
            # Runs in the worker thread: release the slot acquired above
            # however the research ends (success, error, termination).
            try:
                return app_callback(*args, **kw)
            finally:
                _global_research_semaphore.release()

        # Prepare (but do not start) the background thread.
        thread = threading.Thread(
            target=_release_semaphore_on_exit,
            args=(
                thread_context(),
                research_id,
                query,
                mode,
            ),
            kwargs=kwargs,
        )
        thread.daemon = True

        # Atomic check-and-start: refuses to spawn a second live thread
        # for the same research_id. Guards against the double-spawn window
        # where a post-spawn commit failure in the queue processor could
        # otherwise cause the retry loop to dispatch the same research twice.
        started = check_and_start_research(
            research_id,
            {
                "thread": thread,
                "progress": 0,
                "status": ResearchStatus.IN_PROGRESS,
                "log": [],
                "settings": kwargs,
            },
        )
    except Exception:
        # No worker ran, so _release_semaphore_on_exit never will — give
        # the slot back here before propagating.
        _global_research_semaphore.release()
        raise

    if not started:
        # No thread will run → no _release_semaphore_on_exit → release here.
        _global_research_semaphore.release()
        raise DuplicateResearchError(
            f"Research {research_id} already has a live thread"
        )

    return thread
def _generate_report_path(query: str) -> Path:
    """
    Build a new report file path under OUTPUT_DIR for ``query``.

    The file name is ``research_report_<hash>_<epoch>.md``. ``<hash>`` is
    the first 10 hex characters of the query's MD5 digest, used purely as
    a filesystem-safe fingerprint, not for security. ``<epoch>`` is the
    current UTC time in whole seconds.

    Args:
        query: The query used for the report.

    Returns:
        The generated path.
    """
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        query.encode("utf-8"), usedforsecurity=False
    ).hexdigest()
    epoch_seconds = int(datetime.now(UTC).timestamp())
    return OUTPUT_DIR / f"research_report_{digest[:10]}_{epoch_seconds}.md"
def _save_chat_message_and_context(
    chat_session_id,
    research_id,
    username,
    report_content,
    streaming_enabled,
    streaming_state,
    socket_service,
    settings_snapshot=None,
):
    """Save assistant message to chat and update accumulated context.

    Steps, in order:

    1. Persist ``report_content`` as an assistant ``response`` message.
       ``_NO_OUTPUT_MARKER`` is used when the content is empty, because
       ``chat_messages.content`` is NOT NULL.
    2. Set ``streaming_state['_persisted']`` so a Stop racing the final
       progress emit does not write a duplicate row.
    3. Best-effort refresh of the session's accumulated context.
    4. If any chunks were streamed, flush the citation carry buffer and
       emit the final empty ``response_chunk`` sentinel (best-effort).

    Args:
        chat_session_id: Chat session that receives the message.
        research_id: Research the message belongs to; also the key used
            for the ``response_chunk`` emits.
        username: Passed to ``ChatService``.
        report_content: Answer-only text to persist.
        streaming_enabled: Whether this research streamed chunks live.
        streaming_state: Shared streaming dict (the one
            ``_make_chat_stream_callback`` installs ``_flush_carry`` into),
            or None.
        socket_service: Object exposing ``emit_to_subscribers``.
        settings_snapshot: Optional settings forwarded to
            ``ChatContextManager``.

    Raises:
        Whatever ``ChatService.add_message`` raises. The message write is
        the only step that is not best-effort.
    """
    from ...chat.service import ChatService
    from ...chat.context import ChatContextManager

    chat_service = ChatService(username)
    # chat_messages.content is NOT NULL. Write report_content
    # inline (snapshot pattern). Falls back to a placeholder marker only
    # if report_content is itself empty — the same pattern used by the
    # terminate path's _STOPPED_BEFORE_OUTPUT_MARKER.
    snapshot_content = report_content or _NO_OUTPUT_MARKER
    # allow_archived=True: a multi-tab race can flip the session to
    # archived between research.status=COMPLETED commit and this write.
    # Losing the final assistant answer is worse than violating the
    # "no writes to archived sessions" rule for a system-generated row;
    # user-message writes from chat/routes.py still keep the default.
    chat_service.add_message(
        session_id=chat_session_id,
        role="assistant",
        content=snapshot_content,
        message_type="response",
        research_id=research_id,
        allow_archived=True,
    )
    # Mark the response row as persisted so the trailing
    # progress_callback("Research completed successfully", 100) — which
    # runs the termination check and could fire if the user clicks Stop
    # in the small window between this write and the final emit — does
    # NOT write a duplicate row via _save_partial_chat_message_on_terminate.
    if streaming_state is not None:
        streaming_state["_persisted"] = True
    # str(): the row is already written — a non-str session id must not
    # turn this log line into a crash.
    logger.info(f"Added research result to chat {str(chat_session_id)[:8]}...")

    try:
        # report_content is the answer-only string; no extraction needed.
        chat_content = report_content or ""
        ctx_manager = ChatContextManager(
            session_id=chat_session_id,
            messages=[],
            settings_snapshot=settings_snapshot,
        )
        updates = ctx_manager.extract_context_updates(new_content=chat_content)
        chat_service.update_accumulated_context(
            session_id=chat_session_id, **updates
        )
        logger.info(
            f"Updated accumulated context for chat {str(chat_session_id)[:8]}..."
        )
    except Exception:
        # Warning rather than debug: a failed accumulated-context update
        # silently degrades multi-turn context for the next follow-up in
        # this chat (entities/topics/source counts are missing), and ops
        # need visibility before users notice.
        logger.opt(exception=True).warning(
            "Could not update accumulated context"
        )

    # streaming_state is guarded like the _persisted write above; before,
    # streaming_enabled=True with streaming_state=None raised AttributeError
    # here, after the message had already been saved.
    if (
        streaming_enabled
        and streaming_state is not None
        and streaming_state.get("chunks_sent", 0) > 0
    ):
        # Flush any partial-bracket fragment held in the citation carry
        # buffer before sending the final empty sentinel. Without this,
        # a stream that ends mid-token (LLM emits "[12" as its last
        # bytes before closing) silently drops the leading "[" from
        # what the client renders.
        flush = streaming_state.get("_flush_carry")
        leftover = flush() if flush else ""
        if leftover:
            try:
                socket_service.emit_to_subscribers(
                    "response_chunk",
                    research_id,
                    {
                        "chunk": leftover,
                        "is_streaming": True,
                        "is_final": False,
                    },
                )
            except Exception:
                logger.debug("Carry-buffer flush emit failed (non-critical)")
        # Best-effort like every other emit in this module (and the final
        # emit on the terminate path): the answer is already persisted, so
        # a socket failure must not propagate out of a successful save.
        try:
            socket_service.emit_to_subscribers(
                "response_chunk",
                research_id,
                {"chunk": "", "is_streaming": True, "is_final": True},
            )
        except Exception:
            logger.debug("Final response_chunk emit failed (non-critical)")
# Saved as the assistant response when the user stops research before any
# chunk was streamed (see _save_partial_chat_message_on_terminate).
_STOPPED_BEFORE_OUTPUT_MARKER = "[Stopped before any output was generated.]"
# Appended to the partial content persisted when the user clicks Stop.
_STOPPED_FOOTER = "\n\n_— Stopped by user._"
# Saved as the response when research completes with empty report content
# (chat_messages.content is NOT NULL).
_NO_OUTPUT_MARKER = "_(Research completed without producing output.)_"


# Match a trailing incomplete citation opener at the chunk boundary.
# Covers both ASCII "[" and the lenticular "【" (U+3010) — some LLMs
# emit Chinese-style brackets and the citation formatter accepts them,
# so we have to hold those back the same way to avoid breaking a token
# across the next chunk.
# Anchored with \Z, not $: in Python, $ also matches just before a
# trailing newline, so a chunk ending in "[12\n" was wrongly held in the
# carry buffer even though a newline can never be inside a citation token.
_PARTIAL_BRACKET_RE = re.compile(r"[\[【]\d*\Z")

# Upper bound on the inline-citation carry buffer. A real citation token
# is a handful of characters ("[123"); anything longer means the "[" wasn't
# a citation opener or the stream is pathological. Flush raw past this so a
# never-closing "[" + endless digits can't grow the buffer without limit.
# Note: compared against the fragment's character length (len(str)).
_MAX_CARRY_BYTES = 64
def _make_chat_stream_callback(
    research_id,
    streaming_state,
    socket_service,
    source_resolver=None,
    formatter=None,
):
    """Build the chat-mode streaming callback.

    The callback:
      * Counts chunks (``chunks_sent``).
      * Buffers RAW chunks in ``streaming_state['chunks']`` so partial
        content survives termination — the citation handler's local list
        is discarded on raise. Capped at ``_MAX_PARTIAL_BUFFER_BYTES``;
        once capped, ``streaming_state['_truncated']`` flips to True and
        further chunks aren't accumulated server-side.
      * Raises ``ResearchTerminatedException`` if the user clicked Stop
        mid-stream — fails fast instead of letting the LLM finish.
        ``ResearchTerminatedException`` inherits from ``BaseException``,
        so the citation handler's ``except Exception`` blocks naturally
        propagate it.
      * Emits ``response_chunk`` over Socket.IO for live display. When
        both ``source_resolver`` and ``formatter`` are provided, inline
        citation hyperlinks are applied per chunk, so the client sees
        ``[[arxiv.org-1]](url)`` appear live rather than plain ``[1]``
        brackets that only get hyperlinked after the full response
        saves. Bracket tokens split across chunk boundaries are held in
        a small carry buffer until the closing ``]`` arrives.

    ``streaming_state`` must already contain ``chunks_sent``, ``chunks``,
    ``_bytes`` and ``_truncated``; they are updated in place. This factory
    also stores ``_flush_carry`` in it for the completion path.

    ``source_resolver`` — optional ``() -> list[dict]`` returning the
    current ``all_links_of_system``. It is read lazily because the list
    grows as the agent collects sources.
    ``formatter`` — optional :class:`CitationFormatter` instance. Its
    ``mode`` controls the inline format (``[[arxiv.org-1]](url)`` etc.),
    matching what the final-save formatter produces so the live display
    doesn't mode-shift when ``handleResearchComplete`` swaps in the
    DB-saved version.

    Extracted to module level so it can be unit-tested without spinning
    up the full ``run_research_process``.
    """

    # Per-call closure state for the streaming-substitution carry
    # buffer (holds a trailing incomplete bracket like "[12" so the
    # closing "]" on the next chunk completes the citation token).
    carry = [""]

    def _flush_carry() -> str:
        """Release and clear any held partial-bracket fragment.

        The completion finalizer (``_save_chat_message_and_context``)
        lives in a different scope and can't reach ``carry`` directly,
        so it calls this through ``streaming_state['_flush_carry']`` to
        avoid silently dropping the tail of a stream that ends mid-token
        like ``"[12"``.
        """
        released, carry[0] = carry[0], ""
        return released

    # Expose to the completion path via the shared state dict.
    streaming_state["_flush_carry"] = _flush_carry

    def _hyperlink_chunk(chunk: str) -> str:
        """Apply inline citation hyperlinks to a single chunk.

        Maintains the closure-level ``carry`` buffer for incomplete
        ``[N`` tokens straddling chunk boundaries. The carry holds the
        trailing partial-bracket fragment from the previous chunk that
        could not be substituted yet; it is prepended to the next chunk
        so the regex sees the full token. The returned DELTA is what the
        client should append next. It includes the just-completed carry
        (now hyperlinked) plus the new chunk's safe portion, MINUS the
        new chunk's own trailing partial bracket, which becomes the new
        carry.
        """
        if source_resolver is None or formatter is None:
            return chunk
        # Snapshot the carry BEFORE the try body touches it. If
        # apply_inline_hyperlinks raises after carry[0] was already
        # replaced by this chunk's trailing fragment, the fallback used
        # to release that NEW fragment, losing the previous one and
        # duplicating the new one ("[1" + "] x [2" came out as
        # "[2] x [2"). Falling back to held + chunk keeps the text intact.
        held = carry[0]
        try:
            sources = source_resolver() or []
            if not sources:
                # Reset carry so the leading "[" we held onto doesn't
                # disappear from the client's accumulated text.
                carry[0] = ""
                return held + chunk
            text = held + chunk
            pending = _PARTIAL_BRACKET_RE.search(text)
            if pending:
                safe = text[: pending.start()]
                new_carry = text[pending.start() :]
                # Bound the carry. A well-formed citation token is a few
                # characters (`[12`); if the held fragment grows past
                # this, the "[" was not actually opening a citation (or
                # the LLM is streaming `[` + endless digits with no
                # closing `]`). Flush it raw rather than buffering
                # without limit — preserves the text, just doesn't
                # hyperlink it.
                if len(new_carry) > _MAX_CARRY_BYTES:
                    safe = text
                    carry[0] = ""
                else:
                    carry[0] = new_carry
            else:
                safe = text
                carry[0] = ""
            if not safe:
                return ""
            return formatter.apply_inline_hyperlinks(safe, sources)
        except Exception:
            # Hyperlinking is quality-of-life. On any failure fall back
            # to emitting the raw text so the user still sees it.
            # loguru ignores stdlib's exc_info=; opt(exception=True) is
            # what actually attaches the traceback.
            logger.opt(exception=True).debug(
                "Inline citation hyperlinking failed; emitting raw chunk"
            )
            carry[0] = ""
            return held + chunk

    def stream_callback(chunk: str):
        # Resolve through the module namespace each call so tests can
        # ``patch("local_deep_research.web.routes.globals.is_termination_requested")``.
        # Cached in sys.modules — negligible cost.
        from ..routes.globals import is_termination_requested

        if not chunk:
            return
        streaming_state["chunks_sent"] += 1
        if not streaming_state["_truncated"]:
            chunk_bytes = len(chunk.encode("utf-8"))
            if (
                streaming_state["_bytes"] + chunk_bytes
                <= _MAX_PARTIAL_BUFFER_BYTES
            ):
                # IMPORTANT: store the RAW chunk in the partial-content
                # buffer (terminate handler joins these and saves them as
                # the partial assistant message). If we stored the
                # hyperlinked version, the saved-on-terminate text would
                # be double-formatted when the user resumes.
                streaming_state["chunks"].append(chunk)
                streaming_state["_bytes"] += chunk_bytes
            else:
                streaming_state["_truncated"] = True
                logger.warning(
                    f"Partial-content buffer hit {_MAX_PARTIAL_BUFFER_BYTES} bytes "
                    f"for research {research_id}; further chunks won't be persisted on terminate"
                )
        # Mid-stream interrupt — fail fast if the user clicked Stop while
        # the LLM is still streaming. Checked after buffering so this
        # chunk is still persisted as partial content.
        if is_termination_requested(research_id):
            raise ResearchTerminatedException(  # noqa: TRY301 — propagated through citation handler
                "Research was terminated by user during streaming"
            )
        # Apply citation hyperlinks for the client emit only — the
        # client accumulates substituted text into the streaming bubble
        # so the user sees [[arxiv.org-1]](url) appearing live as the
        # model writes "According to [1]…".
        display_chunk = _hyperlink_chunk(chunk)
        if not display_chunk:
            return  # nothing safe to emit yet (all held in carry buffer)
        try:
            socket_service.emit_to_subscribers(
                "response_chunk",
                research_id,
                {
                    "chunk": display_chunk,
                    "is_streaming": True,
                    "is_final": False,
                },
            )
        except Exception:
            logger.debug("Stream chunk emit failed (non-critical)")

    return stream_callback
def _save_partial_chat_message_on_terminate(
    chat_session_id,
    research_id,
    username,
    partial_content,
    truncated=False,
    streaming_state=None,
):
    """Persist whatever was streamed before a Stop as the chat response.

    Writes one assistant ``response`` row. The row holds the streamed text
    plus ``_STOPPED_FOOTER``, with a truncation note appended when
    ``truncated`` is set. With no streamed text, it holds
    ``_STOPPED_BEFORE_OUTPUT_MARKER`` instead. Afterwards a final empty
    ``response_chunk`` is emitted so the client drops the streaming style
    from the bubble.

    Ordering: call this BEFORE ``handle_termination()``. That path runs
    ``cleanup_research_resources()``, which removes the Socket.IO room
    subscriptions, so later emits reach no one.

    Idempotency: with a ``streaming_state`` dict, the ``_persisted`` flag
    is set only after a successful write. Repeat calls (progress callback
    plus the outer except handler when a stream_callback raised) therefore
    write a single row, yet a failed write can still be retried.

    No-op when ``chat_session_id`` is falsy (single-turn research). Every
    failure is logged and swallowed — termination cleanup must never crash
    the worker.
    """
    if not chat_session_id:
        return
    already_saved = streaming_state is not None and streaming_state.get(
        "_persisted"
    )
    if already_saved:
        return

    try:
        from ...chat.service import ChatService

        if not partial_content:
            content = _STOPPED_BEFORE_OUTPUT_MARKER
        else:
            truncation_note = (
                " _(output was very long; truncated.)_" if truncated else ""
            )
            content = partial_content + _STOPPED_FOOTER + truncation_note

        # allow_archived=True: like the completion path, this row is
        # system-generated and must survive a concurrent archive.
        ChatService(username).add_message(
            session_id=chat_session_id,
            role="assistant",
            content=content,
            message_type="response",
            research_id=research_id,
            allow_archived=True,
        )
        # Only flag success once the write has gone through, so a retry
        # after a failed add_message is not short-circuited.
        if streaming_state is not None:
            streaming_state["_persisted"] = True
        logger.info(
            f"Persisted partial chat response for terminated research "
            f"{research_id} ({len(content)} chars)"
        )
    except Exception:
        logger.opt(exception=True).warning(
            "Failed to persist partial chat message on terminate"
        )

    final_chunk = {"chunk": "", "is_streaming": True, "is_final": True}
    try:
        SocketIOService().emit_to_subscribers(
            "response_chunk", research_id, final_chunk
        )
    except Exception:
        logger.debug("Final-chunk emit on terminate failed (non-critical)")
776@log_for_research
777@thread_cleanup
778def run_research_process(research_id, query, mode, **kwargs):
779 """
780 Run the research process in the background for a given research ID.
782 Args:
783 research_id: The ID of the research
784 query: The research query
785 mode: The research mode (quick/detailed)
786 **kwargs: Additional parameters for the research (model_provider, model, search_engine, etc.)
787 MUST include 'username' for database access
788 """
789 from ..routes.globals import (
790 is_research_active,
791 is_termination_requested,
792 update_progress_and_check_active,
793 )
795 # Extract username - required for database access
796 username = kwargs.get("username")
797 if not username:
798 logger.error("No username provided to research thread")
799 raise ValueError("Username is required for research process")
800 # Extract user_password early so it's available for all cleanup paths
801 user_password = kwargs.get("user_password")
803 # Establish thread context FIRST so every subsequent log line in this
804 # thread can be attributed to the correct user/research and persisted
805 # to the user's encrypted ResearchLog. Otherwise the early INFO logs
806 # below ("Research thread started", "Research strategy", "Research
807 # parameters") fire before start_research_process gets to its own
808 # set_search_context call (~line 417) and the daemon can't open the
809 # encrypted DB to write them — silently dropped via the bare-except.
810 set_search_context(
811 {
812 "research_id": research_id,
813 "username": username,
814 "user_password": user_password,
815 }
816 )
818 logger.info(f"Research thread started with username: {username}")
820 try:
821 # Check if this research has been terminated before we even start
822 if is_termination_requested(research_id):
823 logger.info(
824 f"Research {research_id} was terminated before starting"
825 )
826 cleanup_research_resources(
827 research_id,
828 username,
829 user_password=user_password,
830 final_status=ResearchStatus.SUSPENDED,
831 )
832 return
834 logger.info(
835 f"Starting research process for ID {research_id}, query: {query}"
836 )
838 # Extract key parameters
839 model_provider = kwargs.get("model_provider")
840 model = kwargs.get("model")
841 custom_endpoint = kwargs.get("custom_endpoint")
842 search_engine = kwargs.get("search_engine")
843 max_results = kwargs.get("max_results")
844 time_period = kwargs.get("time_period")
845 iterations = kwargs.get("iterations")
846 questions_per_iteration = kwargs.get("questions_per_iteration")
847 strategy = kwargs.get(
848 "strategy", "source-based"
849 ) # Default to source-based
850 settings_snapshot = kwargs.get(
851 "settings_snapshot", {}
852 ) # Complete settings snapshot
854 # Log settings snapshot to debug
855 from ...settings.logger import log_settings
857 log_settings(settings_snapshot, "Settings snapshot received in thread")
859 # Strategy should already be saved in the database before thread starts
860 logger.info(f"Research strategy: {strategy}")
862 # Log all parameters for debugging
863 logger.info(
864 f"Research parameters: provider={model_provider}, model={model}, "
865 f"search_engine={search_engine}, max_results={max_results}, "
866 f"time_period={time_period}, iterations={iterations}, "
867 f"questions_per_iteration={questions_per_iteration}, "
868 f"custom_endpoint={custom_endpoint}, strategy={strategy}"
869 )
871 # Set up the AI Context Manager
872 output_dir = OUTPUT_DIR / f"research_{research_id}"
873 output_dir.mkdir(parents=True, exist_ok=True)
875 # Create a settings context that uses snapshot - no database access in threads
876 settings_context = SnapshotSettingsContext(
877 settings_snapshot, username=username
878 )
880 # Only log settings if explicitly enabled via LDR_LOG_SETTINGS env var
881 from ...settings.logger import log_settings
883 log_settings(
884 settings_context.values, "SettingsContext values extracted"
885 )
887 # Set the settings context for this thread
888 from ...config.thread_settings import (
889 set_settings_context,
890 )
892 set_settings_context(settings_context)
894 # user_password already extracted above (before termination check)
896 # Create shared research context that can be updated during research
897 shared_research_context = {
898 "research_id": research_id,
899 "research_query": query,
900 "research_mode": mode,
901 "research_phase": "init",
902 "search_iteration": 0,
903 "search_engines_planned": None,
904 "search_engine_selected": search_engine,
905 "username": username, # Add username for queue operations
906 "user_password": user_password, # Add password for metrics access
907 "chat_session_id": kwargs.get("chat_session_id"),
908 }
910 # If this is a follow-up research, include the parent context
911 if "research_context" in kwargs and kwargs["research_context"]:
912 logger.info(
913 f"Adding parent research context with {len(kwargs['research_context'].get('past_findings', ''))} chars of findings"
914 )
915 shared_research_context.update(kwargs["research_context"])
917 # Do not log context keys as they may contain sensitive information
918 logger.info(f"Created shared_research_context for user: {username}")
920 # Set search context for search tracking
921 set_search_context(shared_research_context)
923 # Per-research dedup state for step message persistence
924 last_step_phase = None
926 # Pre-bind streaming_state so progress_callback's closure cell has a
927 # value even if an exception fires between this point and the full
928 # initialization later in this function. Without this, the except handler's
929 # call to progress_callback during a concurrent termination raises
930 # UnboundLocalError, silently skipping the DB FAILED update and the
931 # error socket emit. The full streaming_state is reassigned below
932 # before any real streaming starts; keys must match the canonical shape.
933 streaming_state: dict = {
934 "chunks_sent": 0,
935 "chunks": [],
936 "_bytes": 0,
937 "_truncated": False,
938 }
939 streaming_enabled = False
941 # Set up progress callback
942 def progress_callback(message, progress_percent, metadata):
943 nonlocal last_step_phase
944 # Frequent termination check
945 if is_termination_requested(research_id):
946 # Persist the partial chat row + emit final chunk BEFORE
947 # handle_termination — afterwards the Socket.IO room is gone.
948 _save_partial_chat_message_on_terminate(
949 shared_research_context.get("chat_session_id"),
950 research_id,
951 username,
952 "".join(streaming_state.get("chunks", [])),
953 truncated=streaming_state.get("_truncated", False),
954 streaming_state=streaming_state,
955 )
956 handle_termination(research_id, username)
957 streaming_state["_termination_handled"] = True
958 raise ResearchTerminatedException( # noqa: TRY301 — inside nested callback, not caught by enclosing try
959 "Research was terminated by user"
960 )
962 # Silent phase — no UI logging or socket emission needed
963 if metadata.get("phase") == "termination_check":
964 return
966 # Bind research_id AND username so the database_sink + queue
967 # daemon can resolve the per-user encrypted DB. Without username
968 # the daemon's _write_log_to_database hits "No authenticated
969 # user", silently swallows the error, and ResearchLog ends up
970 # with zero milestone rows — leaving /api/research/<id>/status
971 # without a log_entry to render and the frontend stuck on the
972 # "Performing research..." fallback.
973 bound_logger = logger.bind(
974 research_id=research_id, username=username
975 )
976 bound_logger.log("MILESTONE", message)
978 if "SEARCH_PLAN:" in message: 978 ↛ 979line 978 didn't jump to line 979 because the condition on line 978 was never true
979 engines = message.split("SEARCH_PLAN:")[1].strip()
980 metadata["planned_engines"] = engines
981 metadata["phase"] = "search_planning" # Use existing phase
982 # Update shared context for token tracking
983 shared_research_context["search_engines_planned"] = engines
984 shared_research_context["research_phase"] = "search_planning"
986 if "ENGINE_SELECTED:" in message: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true
987 engine = message.split("ENGINE_SELECTED:")[1].strip()
988 metadata["selected_engine"] = engine
989 metadata["phase"] = "search" # Use existing 'search' phase
990 # Update shared context for token tracking
991 shared_research_context["search_engine_selected"] = engine
992 shared_research_context["research_phase"] = "search"
994 # Capture other research phases for better context tracking
995 if metadata.get("phase"): 995 ↛ 999line 995 didn't jump to line 999 because the condition on line 995 was always true
996 shared_research_context["research_phase"] = metadata["phase"]
998 # Update search iteration if available
999 if "iteration" in metadata:
1000 shared_research_context["search_iteration"] = metadata[
1001 "iteration"
1002 ]
1004 # Adjust progress based on research mode
1005 adjusted_progress = progress_percent
1006 if (
1007 mode == "detailed"
1008 and metadata.get("phase") == "output_generation"
1009 ):
1010 # For detailed mode, adjust the progress range for output generation
1011 adjusted_progress = min(80, progress_percent)
1012 elif (
1013 mode == "detailed"
1014 and metadata.get("phase") == "report_generation"
1015 ):
1016 # Scale the progress from 80% to 95% for the report generation phase
1017 if progress_percent is not None: 1017 ↛ 1031line 1017 didn't jump to line 1031 because the condition on line 1017 was always true
1018 normalized = progress_percent / 100
1019 adjusted_progress = 80 + (normalized * 15)
1020 elif (
1021 mode == "quick" and metadata.get("phase") == "output_generation"
1022 ):
1023 # For quick mode, ensure we're at least at 85% during output generation
1024 adjusted_progress = max(85, progress_percent)
1025 # Map any further progress within output_generation to 85-95% range
1026 if progress_percent is not None and progress_percent > 0: 1026 ↛ 1031line 1026 didn't jump to line 1031 because the condition on line 1026 was always true
1027 normalized = progress_percent / 100
1028 adjusted_progress = 85 + (normalized * 10)
1030 # Atomically update progress and check if research is still active
1031 if adjusted_progress is not None:
1032 adjusted_progress, still_active = (
1033 update_progress_and_check_active(
1034 research_id, adjusted_progress
1035 )
1036 )
1037 else:
1038 still_active = is_research_active(research_id)
1040 if still_active: 1040 ↛ exitline 1040 didn't return from function 'progress_callback' because the condition on line 1040 was always true
1041 # Queue the progress update to be processed in main thread
1042 if adjusted_progress is not None:
1043 from ..queue.processor_v2 import queue_processor
1045 if username: 1045 ↛ 1050line 1045 didn't jump to line 1050 because the condition on line 1045 was always true
1046 queue_processor.queue_progress_update(
1047 username, research_id, adjusted_progress
1048 )
1049 else:
1050 logger.warning(
1051 f"Cannot queue progress update for research {research_id} - no username available"
1052 )
1054 # Determine socket emit throttling
1055 phase = metadata.get("phase", "")
1056 is_final = (
1057 phase
1058 in (
1059 "complete",
1060 "error",
1061 "report_complete",
1062 )
1063 or adjusted_progress == 100
1064 )
1066 should_emit = is_final
1067 if not is_final:
1068 now = time.monotonic()
1069 with _last_emit_lock:
1070 last = _last_emit_times.get(research_id, 0)
1071 if now - last >= _EMIT_THROTTLE_SECONDS:
1072 _last_emit_times[research_id] = now
1073 should_emit = True
1074 # Periodic TTL cleanup for orphaned entries
1075 global _emit_cleanup_counter # noqa: PLW0603
1076 _emit_cleanup_counter += 1
1077 if _emit_cleanup_counter % 100 == 0: 1077 ↛ 1078line 1077 didn't jump to line 1078 because the condition on line 1077 was never true
1078 stale = [
1079 rid
1080 for rid, t in _last_emit_times.items()
1081 if now - t > _EMIT_TTL_SECONDS
1082 ]
1083 for rid in stale:
1084 del _last_emit_times[rid]
1086 # Build event data (before emit, shared scope)
1087 event_data = None
1088 if should_emit:
1089 event_data = {
1090 "progress": adjusted_progress,
1091 "message": message,
1092 "phase": phase,
1093 }
1094 # Include additional metadata for MCP/ReAct strategy display
1095 if metadata.get("thought"): 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true
1096 event_data["thought"] = metadata["thought"]
1097 if metadata.get("tool"): 1097 ↛ 1098line 1097 didn't jump to line 1098 because the condition on line 1097 was never true
1098 event_data["tool"] = metadata["tool"]
1099 if metadata.get("arguments"): 1099 ↛ 1100line 1099 didn't jump to line 1100 because the condition on line 1099 was never true
1100 event_data["arguments"] = metadata["arguments"]
1101 if metadata.get("iteration"): 1101 ↛ 1102line 1101 didn't jump to line 1102 because the condition on line 1101 was never true
1102 event_data["iteration"] = metadata["iteration"]
1103 if metadata.get("error"):
1104 event_data["error"] = metadata["error"]
1105 if metadata.get("content"): 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true
1106 event_data["content"] = metadata["content"]
1108 # Persist step message to chat session BEFORE emitting socket
1109 # (ensures DB has the step when clients receive the event).
1110 #
1111 # Symmetry invariant: for chat sessions, what the user sees
1112 # live must equal what `loadSession` reconstructs on reload.
1113 # If dedup blocks persistence of a repeat step phase, drop
1114 # the socket emit too (unless this is a final-phase event
1115 # the completion handler depends on) so live UI doesn't
1116 # surface events that vanish on reload.
1117 _chat_session_id = shared_research_context.get(
1118 "chat_session_id"
1119 )
1120 if _chat_session_id:
1121 _persist, _suppress_emit = _chat_step_decision(
1122 phase, last_step_phase, is_final
1123 )
1124 if _persist: 1124 ↛ 1144line 1124 didn't jump to line 1144 because the condition on line 1124 was always true
1125 try:
1126 from ...chat.service import ChatService
1128 ChatService(username).add_progress_step(
1129 session_id=_chat_session_id,
1130 research_id=research_id,
1131 content=message,
1132 phase=phase,
1133 )
1134 last_step_phase = phase
1135 except Exception:
1136 logger.opt(exception=True).warning(
1137 "Failed to persist progress step"
1138 )
1139 # Symmetry invariant: if persistence failed
1140 # (e.g. OperationalError under DB contention),
1141 # suppress the live emit too — otherwise the
1142 # client sees a step that vanishes on reload.
1143 event_data = None
1144 if _suppress_emit: 1144 ↛ 1145line 1144 didn't jump to line 1145 because the condition on line 1144 was never true
1145 event_data = None
1147 # Emit socket event AFTER DB persistence
1148 if event_data is not None:
1149 try:
1150 SocketIOService().emit_to_subscribers(
1151 "progress", research_id, event_data
1152 )
1153 except Exception:
1154 logger.exception("Socket emit error (non-critical)")
1156 # Function to check termination during long-running operations
def check_termination():
    """Abort the research if the user has asked to stop it.

    Meant to be polled during long-running operations.

    Returns:
        False when no termination has been requested.

    Raises:
        ResearchTerminatedException: After saving any partially streamed
            chat answer and running termination handling.
    """
    if not is_termination_requested(research_id):
        return False  # Not terminated

    # Save what has been streamed so far before handle_termination runs.
    partial_answer = "".join(streaming_state.get("chunks", []))
    was_truncated = streaming_state.get("_truncated", False)
    _save_partial_chat_message_on_terminate(
        shared_research_context.get("chat_session_id"),
        research_id,
        username,
        partial_answer,
        truncated=was_truncated,
        streaming_state=streaming_state,
    )
    handle_termination(research_id, username)
    # Record that termination handling already happened.
    streaming_state["_termination_handled"] = True
    raise ResearchTerminatedException(  # noqa: TRY301 — inside nested callback, not caught by enclosing try
        "Research was terminated by user during long-running operation"
    )
1174 # Configure the system with the specified parameters
1175 use_llm = None
1176 if model or search_engine or model_provider:
1177 # Log that we're overriding system settings
1178 logger.info(
1179 f"Overriding system settings with: provider={model_provider}, model={model}, search_engine={search_engine}"
1180 )
1182 # Override LLM if model or model_provider specified
1183 if model or model_provider:
1184 try:
1185 # Get LLM with the overridden settings
1186 # Use the shared_research_context which includes username
1187 use_llm = get_llm(
1188 model_name=model,
1189 provider=model_provider,
1190 openai_endpoint_url=custom_endpoint,
1191 research_id=research_id,
1192 research_context=shared_research_context,
1193 )
1195 logger.info(
1196 f"Successfully set LLM to: provider={model_provider}, model={model}"
1197 )
1198 except Exception as e:
1199 logger.exception(
1200 f"Error setting LLM provider={model_provider}, model={model}"
1201 )
1202 error_msg = str(e)
1203 # Surface configuration errors to user instead of silently continuing
1204 config_error_keywords = [
1205 "model path",
1206 "llamacpp",
1207 "cannot connect",
1208 "server",
1209 "not configured",
1210 "not responding",
1211 "directory",
1212 ".gguf",
1213 ]
1214 if any(
1215 keyword in error_msg.lower()
1216 for keyword in config_error_keywords
1217 ):
1218 # This is a configuration error the user can fix
1219 raise ValueError(
1220 f"LLM Configuration Error: {error_msg}"
1221 ) from e
1222 # For other errors, re-raise to avoid silent failures
1223 raise
1225 # Create search engine first if specified, to avoid default creation without username
1226 use_search = None
1227 if search_engine:
1228 try:
1229 # Create a new search object with these settings
1230 use_search = get_search(
1231 search_tool=search_engine,
1232 llm_instance=use_llm,
1233 username=username,
1234 settings_snapshot=settings_snapshot,
1235 )
1236 logger.info(
1237 f"Successfully created search engine: {search_engine}"
1238 )
1239 except Exception as e:
1240 logger.exception(
1241 f"Error creating search engine {search_engine}"
1242 )
1243 error_msg = str(e)
1244 # Surface configuration errors to user instead of silently continuing
1245 config_error_keywords = [
1246 "searxng",
1247 "instance_url",
1248 "api_key",
1249 "cannot connect",
1250 "connection",
1251 "timeout",
1252 "not configured",
1253 ]
1254 if any(
1255 keyword in error_msg.lower()
1256 for keyword in config_error_keywords
1257 ):
1258 # This is a configuration error the user can fix
1259 raise ValueError(
1260 f"Search Engine Configuration Error ({search_engine}): {error_msg}"
1261 ) from e
1262 # For other errors, re-raise to avoid silent failures
1263 raise
1265 # Set the progress callback in the system
1266 system = AdvancedSearchSystem(
1267 llm=use_llm, # type: ignore[arg-type]
1268 search=use_search, # type: ignore[arg-type]
1269 strategy_name=strategy,
1270 max_iterations=iterations,
1271 questions_per_iteration=questions_per_iteration,
1272 username=username,
1273 settings_snapshot=settings_snapshot,
1274 research_id=research_id,
1275 research_context=shared_research_context,
1276 )
1277 system.set_progress_callback(progress_callback)
1279 # Chat mode: set up LLM streaming callback for real-time response chunks
1280 streaming_enabled = False
1281 # chunks: server-side buffer so partial content survives termination
1282 # (the citation handler's local list is discarded on raise).
1283 # _bytes / _truncated cap the buffer to bound memory on pathologically
1284 # long answers; once capped we still forward to the frontend but stop
1285 # accumulating server-side.
1286 streaming_state = {
1287 "chunks_sent": 0,
1288 "chunks": [],
1289 "_bytes": 0,
1290 "_truncated": False,
1291 }
1292 chat_session_id = shared_research_context.get("chat_session_id")
1294 if chat_session_id: 1294 ↛ 1295line 1294 didn't jump to line 1295 because the condition on line 1294 was never true
1295 try:
1296 socket_service = SocketIOService()
1298 # Source resolver returns the strategy's currently-collected
1299 # source list. Late-bound so the streaming callback can
1300 # apply inline hyperlinks as the agent finishes adding to
1301 # all_links_of_system (sources may still be growing when
1302 # synthesis starts; reading via the closure picks up the
1303 # final list at chunk-emit time, not callback-build time).
def _resolve_sources():
    """Return a fresh list of the sources ``system`` has collected so far.

    Resolved at call time rather than when the streaming callback is
    built, so sources added later are included. Yields an empty list
    when ``all_links_of_system`` is missing or falsy.
    """
    collected = getattr(system, "all_links_of_system", None)
    return list(collected or [])
1309 # Build a formatter matching the user's report.citation_format
1310 # so live-display brackets ([[arxiv.org-1]] / [[arxiv-1]] /
1311 # [[1]] etc.) match what the final-save formatter will emit
1312 # — avoids a visible format-flip when handleResearchComplete
1313 # swaps in the DB-saved version.
1314 live_formatter = get_citation_formatter()
1316 stream_callback = _make_chat_stream_callback(
1317 research_id,
1318 streaming_state,
1319 socket_service,
1320 source_resolver=_resolve_sources,
1321 formatter=live_formatter,
1322 )
1324 # Hook into the citation handler's streaming
1325 if hasattr(system, "strategy") and hasattr(
1326 system.strategy, "citation_handler"
1327 ):
1328 handler = system.strategy.citation_handler
1329 if hasattr(handler, "set_stream_callback"):
1330 handler.set_stream_callback(stream_callback)
1331 streaming_enabled = True
1332 logger.info(
1333 f"Streaming enabled for chat {chat_session_id[:8]}..."
1334 )
1335 except Exception:
1336 # exception=True so the traceback is visible: streaming is
1337 # non-critical (research still completes without it), but a
1338 # silent warning would hide real bugs in the setup above.
1339 logger.opt(exception=True).warning(
1340 "Could not set up streaming (non-critical)"
1341 )
1343 # Helper to save chat message (closes over outer scope vars)
def _maybe_save_chat_message(content):
    """Save the assistant answer to the chat session, when there is one.

    Does nothing for research that is not tied to a chat session. Save
    failures are logged rather than raised; in that case an empty final
    ``response_chunk`` is emitted so the streaming UI stops waiting.

    Args:
        content: Assistant response text to persist.
    """

    def _emit_empty_final_chunk():
        # The failed save never sent its is_final response_chunk; send an
        # empty one so the streaming UI clears its 'thinking' state.
        final_payload = {"chunk": "", "is_streaming": True, "is_final": True}
        try:
            SocketIOService().emit_to_subscribers(
                "response_chunk", research_id, final_payload
            )
        except Exception:
            logger.opt(exception=True).debug(
                "Failed to emit final chunk after chat-persist error"
            )

    session_id = shared_research_context.get("chat_session_id")
    if session_id:
        try:
            _save_chat_message_and_context(
                session_id,
                research_id,
                username,
                content,
                streaming_enabled,
                streaming_state,
                SocketIOService(),
                settings_snapshot=settings_snapshot,
            )
        except Exception:
            # Logged at warning, not debug: otherwise operators get no
            # signal when the user is told no report is available.
            logger.opt(exception=True).warning(
                "Could not add message to chat session — assistant "
                "response NOT persisted; user will see 'no report available'"
            )
            _emit_empty_final_chunk()
1382 # Run the search
1383 progress_callback("Starting research process", 5, {"phase": "init"})
1385 try:
1386 results = system.analyze_topic(query)
1387 if mode == "quick":
1388 progress_callback(
1389 "Search complete, preparing to generate summary...",
1390 85,
1391 {"phase": "output_generation"},
1392 )
1393 else:
1394 progress_callback(
1395 "Search complete, generating output",
1396 80,
1397 {"phase": "output_generation"},
1398 )
1399 except Exception as search_error:
1400 # Better handling of specific search errors
1401 error_message = str(search_error)
1402 error_type = "unknown"
1404 # OpenAI-compatible runtime failures (LM Studio / vLLM / llama.cpp
1405 # server / OpenRouter / custom endpoint) -- rewrite to a message
1406 # that names the provider, base URL, and model (#3878).
1407 if model_provider in { 1407 ↛ 1417line 1407 didn't jump to line 1417 because the condition on line 1407 was never true
1408 "openai_endpoint",
1409 "lmstudio",
1410 "llamacpp",
1411 "openai",
1412 "openrouter",
1413 "google",
1414 "ionos",
1415 "xai",
1416 } and is_openai_compat_runtime_error(search_error):
1417 rewritten = friendly_openai_compatible_error(
1418 search_error,
1419 provider=model_provider,
1420 base_url=custom_endpoint,
1421 model=model,
1422 )
1423 raise RuntimeError(rewritten) from search_error
1425 # Extract error details for common issues
1426 if "status code: 503" in error_message:
1427 error_message = "Ollama AI service is unavailable (HTTP 503). Please check that Ollama is running properly on your system."
1428 error_type = "ollama_unavailable"
1429 elif "status code: 404" in error_message:
1430 error_message = "Ollama model not found (HTTP 404). Please check that you have pulled the required model."
1431 error_type = "model_not_found"
1432 elif "status code:" in error_message:
1433 # Extract the status code for other HTTP errors
1434 status_code = error_message.split("status code:")[1].strip()
1435 error_message = f"API request failed with status code {status_code}. Please check your configuration."
1436 error_type = "api_error"
1437 elif "connection" in error_message.lower():
1438 error_message = "Connection error. Please check that your LLM service (Ollama/API) is running and accessible."
1439 error_type = "connection_error"
1441 # Raise with improved error message
1442 raise RuntimeError(
1443 f"{error_message} (Error type: {error_type})"
1444 ) from search_error
1446 # Generate output based on mode
1447 if mode == "quick":
1448 # Quick Summary
1449 if results.get("findings") or results.get("formatted_findings"):
1450 raw_formatted_findings = results["formatted_findings"]
1452 # Check if formatted_findings contains an error message
1453 if isinstance(
1454 raw_formatted_findings, str
1455 ) and raw_formatted_findings.startswith("Error:"):
1456 logger.error(
1457 f"Detected error in formatted findings: {raw_formatted_findings[:100]}..."
1458 )
1460 # Determine error type for better user feedback
1461 error_type = "unknown"
1462 error_message = raw_formatted_findings.lower()
1464 if (
1465 "token limit" in error_message
1466 or "context length" in error_message
1467 ):
1468 error_type = "token_limit"
1469 # Log specific error type
1470 logger.warning(
1471 "Detected token limit error in synthesis"
1472 )
1474 # Update progress with specific error type
1475 progress_callback(
1476 "Synthesis hit token limits. Attempting fallback...",
1477 87,
1478 {
1479 "phase": "synthesis_error",
1480 "error_type": error_type,
1481 },
1482 )
1483 elif (
1484 "timeout" in error_message
1485 or "timed out" in error_message
1486 ):
1487 error_type = "timeout"
1488 logger.warning("Detected timeout error in synthesis")
1489 progress_callback(
1490 "Synthesis timed out. Attempting fallback...",
1491 87,
1492 {
1493 "phase": "synthesis_error",
1494 "error_type": error_type,
1495 },
1496 )
1497 elif "rate limit" in error_message:
1498 error_type = "rate_limit"
1499 logger.warning("Detected rate limit error in synthesis")
1500 progress_callback(
1501 "LLM rate limit reached. Attempting fallback...",
1502 87,
1503 {
1504 "phase": "synthesis_error",
1505 "error_type": error_type,
1506 },
1507 )
1508 elif (
1509 "connection" in error_message
1510 or "network" in error_message
1511 ):
1512 error_type = "connection"
1513 logger.warning("Detected connection error in synthesis")
1514 progress_callback(
1515 "Connection issue with LLM. Attempting fallback...",
1516 87,
1517 {
1518 "phase": "synthesis_error",
1519 "error_type": error_type,
1520 },
1521 )
1522 elif (
1523 "llm error" in error_message
1524 or "final answer synthesis fail" in error_message
1525 ):
1526 error_type = "llm_error"
1527 logger.warning(
1528 "Detected general LLM error in synthesis"
1529 )
1530 progress_callback(
1531 "LLM error during synthesis. Attempting fallback...",
1532 87,
1533 {
1534 "phase": "synthesis_error",
1535 "error_type": error_type,
1536 },
1537 )
1538 else:
1539 # Generic error
1540 logger.warning("Detected unknown error in synthesis")
1541 progress_callback(
1542 "Error during synthesis. Attempting fallback...",
1543 87,
1544 {
1545 "phase": "synthesis_error",
1546 "error_type": "unknown",
1547 },
1548 )
1550 # Extract synthesized content from findings if available
1551 synthesized_content = ""
1552 for finding in results.get("findings", []):
1553 if finding.get("phase") == "Final synthesis":
1554 synthesized_content = finding.get("content", "")
1555 break
1557 # Use synthesized content as fallback
1558 if (
1559 synthesized_content
1560 and not synthesized_content.startswith("Error:")
1561 ):
1562 logger.info(
1563 "Using existing synthesized content as fallback"
1564 )
1565 raw_formatted_findings = synthesized_content
1567 # Or use current_knowledge as another fallback
1568 elif results.get("current_knowledge"):
1569 logger.info("Using current_knowledge as fallback")
1570 raw_formatted_findings = results["current_knowledge"]
1572 # Or combine all finding contents as last resort
1573 elif results.get("findings"):
1574 logger.info("Combining all findings as fallback")
1575 # First try to use any findings that are not errors
1576 valid_findings = [
1577 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
1578 for finding in results.get("findings", [])
1579 if finding.get("content")
1580 and not finding.get("content", "").startswith(
1581 "Error:"
1582 )
1583 ]
1585 synthesis_error = raw_formatted_findings
1586 if valid_findings:
1587 raw_formatted_findings = (
1588 "# Research Results (Fallback Mode)\n\n"
1589 )
1590 raw_formatted_findings += "\n\n".join(
1591 valid_findings
1592 )
1593 raw_formatted_findings += (
1594 f"\n\n## Error Information\n{synthesis_error}"
1595 )
1596 else:
1597 # Last resort: use everything including errors
1598 raw_formatted_findings = (
1599 "# Research Results (Emergency Fallback)\n\n"
1600 )
1601 raw_formatted_findings += "The system encountered errors during final synthesis.\n\n"
1602 raw_formatted_findings += "\n\n".join(
1603 f"## {finding.get('phase', 'Finding')}\n\n{finding.get('content', '')}"
1604 for finding in results.get("findings", [])
1605 if finding.get("content")
1606 )
1608 progress_callback(
1609 f"Using fallback synthesis due to {error_type} error",
1610 88,
1611 {
1612 "phase": "synthesis_fallback",
1613 "error_type": error_type,
1614 },
1615 )
1617 logger.info(
1618 "Found formatted_findings of length: {}",
1619 len(str(raw_formatted_findings)),
1620 )
1622 try:
1623 # Check if we have an error in the findings and use enhanced error handling
1624 if isinstance(
1625 raw_formatted_findings, str
1626 ) and raw_formatted_findings.startswith("Error:"):
1627 logger.info(
1628 "Generating enhanced error report using ErrorReportGenerator"
1629 )
1631 # Generate comprehensive error report
1632 # ErrorReportGenerator does not use LLM (kept for compat)
1633 error_generator = ErrorReportGenerator()
1634 clean_markdown = error_generator.generate_error_report(
1635 error_message=raw_formatted_findings,
1636 query=query,
1637 partial_results=results,
1638 search_iterations=results.get("iterations", 0),
1639 research_id=research_id,
1640 )
1642 logger.info(
1643 "Generated enhanced error report with {} characters",
1644 len(clean_markdown),
1645 )
1646 else:
1647 # report_content stores the synthesized answer
1648 # only (see _extract_synthesized_answer for the
1649 # full rationale). Fall back to the formatted
1650 # blob only when neither Final synthesis nor
1651 # current_knowledge is populated — leaks
1652 # sources, but at least we save *something*.
1653 clean_markdown = (
1654 _extract_synthesized_answer(results)
1655 or raw_formatted_findings
1656 )
1658 # Pull sources from the search-system's accumulated link
1659 # buffer first — same source the detailed-report path
1660 # uses at the equivalent point below. Wrapper
1661 # strategies (e.g. EnhancedContextualFollowUpStrategy,
1662 # IterativeRefinementStrategy) delegate the actual
1663 # search to an inner strategy that populates
1664 # `self.all_links_of_system`, but they don't bubble
1665 # that list back into the result dict's `findings`. So
1666 # the legacy `findings[*].search_results` extraction
1667 # below stays empty for chat follow-ups, leaving the
1668 # citation formatter with no urls to hyperlink. Prefer
1669 # the system-level accumulator; fall back to the
1670 # legacy extraction so direct strategies that bypass
1671 # the system buffer still work.
1672 all_links = list(
1673 getattr(system, "all_links_of_system", None) or []
1674 )
1675 if not all_links: 1675 ↛ 1689line 1675 didn't jump to line 1689 because the condition on line 1675 was always true
1676 for finding in results.get("findings", []):
1677 search_results = finding.get("search_results", [])
1678 if search_results:
1679 try:
1680 links = extract_links_from_search_results(
1681 search_results
1682 )
1683 all_links.extend(links)
1684 except Exception:
1685 logger.exception(
1686 "Error processing search results/links"
1687 )
1689 logger.info(
1690 "Successfully converted to clean markdown of length: {}",
1691 len(clean_markdown),
1692 )
1694 # First send a progress update for generating the summary
1695 progress_callback(
1696 "Generating clean summary from research data...",
1697 90,
1698 {"phase": "output_generation"},
1699 )
1701 # Send progress update for saving report
1702 progress_callback(
1703 "Saving research report to database...",
1704 95,
1705 {"phase": "report_complete"},
1706 )
1708 # Format citations in the markdown content. The
1709 # split returns the answer-with-hyperlinks half
1710 # separately from the trailing sources section the
1711 # LLM may have emitted (which is discarded — sources
1712 # live in research_resources, the canonical store).
1713 # When no Sources section is found, fall back to
1714 # structured-source hyperlinking — never re-parse
1715 # concatenated formatter output downstream.
1716 formatter = get_citation_formatter()
1717 try:
1718 answer_with_links, llm_sources = (
1719 formatter.format_document_split(clean_markdown)
1720 )
1721 if not llm_sources: 1721 ↛ 1733line 1721 didn't jump to line 1733 because the condition on line 1721 was always true
1722 answer_with_links = (
1723 formatter.apply_inline_hyperlinks(
1724 clean_markdown, all_links
1725 )
1726 )
1727 # Safety check: a >50% strip on a long input
1728 # likely means the regex over-stripped on a
1729 # "Sources:" header inside the answer body.
1730 # Fall back to structured-source hyperlinking
1731 # on the full text. Min-length floor prevents
1732 # false-fires on legitimately short answers.
1733 SAFETY_MIN_LEN = 800
1734 if ( 1734 ↛ 1740line 1734 didn't jump to line 1740 because the condition on line 1734 was never true
1735 llm_sources
1736 and len(clean_markdown) > SAFETY_MIN_LEN
1737 and len(answer_with_links)
1738 < len(clean_markdown) * 0.5
1739 ):
1740 logger.warning(
1741 "format_document_split appears to have "
1742 "over-stripped (answer={} chars, "
1743 "original={} chars) for research {}. "
1744 "Falling back to structured-source "
1745 "hyperlinking on full input.",
1746 len(answer_with_links),
1747 len(clean_markdown),
1748 research_id,
1749 )
1750 answer_with_links = (
1751 formatter.apply_inline_hyperlinks(
1752 clean_markdown, all_links
1753 )
1754 )
1755 except Exception:
1756 # Hyperlinking is quality-of-life, not a hard
1757 # requirement. If anything blows up, save the
1758 # raw LLM text rather than fail the research.
1759 logger.exception(
1760 "Citation formatter failed; saving raw answer"
1761 )
1762 answer_with_links = clean_markdown
1764 # report_content stores ONLY the synthesized answer.
1765 # The legacy "answer + ## Sources + ## Research
1766 # Metrics" view is reconstructed at render time by
1767 # report_assembly_service.assemble_full_report.
1768 full_report_content = answer_with_links
1770 # Save report FIRST, then sources:
1771 # a chat read between commits sees a report with no
1772 # sources (assembler renders just the answer) — better
1773 # failure mode than partial assembly with sources but
1774 # no answer body.
1775 from ...storage import get_report_storage
1777 with get_user_db_session(username) as db_session:
1778 storage = get_report_storage(session=db_session)
1780 # Prepare metadata
1781 metadata = {
1782 "iterations": results["iterations"],
1783 "generated_at": datetime.now(UTC).isoformat(),
1784 }
1786 # Save report using storage abstraction
1787 success = storage.save_report(
1788 research_id=research_id,
1789 content=full_report_content,
1790 metadata=metadata,
1791 username=username,
1792 )
1794 if not success:
1795 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except
1797 # Save sources to database (non-fatal - report
1798 # already saved; sources missing is recoverable
1799 # because the assembler omits empty Sources blocks)
1800 try:
1801 from .research_sources_service import (
1802 ResearchSourcesService,
1803 )
1805 sources_service = ResearchSourcesService()
1806 if all_links:
1807 logger.info(
1808 f"Quick summary: Saving {len(all_links)} sources to database"
1809 )
1810 sources_saved = (
1811 sources_service.save_research_sources(
1812 research_id=research_id,
1813 sources=all_links,
1814 username=username,
1815 )
1816 )
1817 logger.info(
1818 f"Quick summary: Saved {sources_saved} sources for research {research_id}"
1819 )
1820 except Exception:
1821 logger.exception(
1822 f"Failed to save sources for research {research_id} (continuing with report save)"
1823 )
1825 logger.info(f"Report saved for research_id: {research_id}")
1827 # Skip export to additional formats - we're storing in database only
1829 # Update research status in database
1830 completed_at = datetime.now(UTC).isoformat()
1832 with get_user_db_session(username) as db_session:
1833 research = (
1834 db_session.query(ResearchHistory)
1835 .filter_by(id=research_id)
1836 .first()
1837 )
1839 # Preserve existing metadata and update with new values
1840 metadata = _parse_research_metadata(
1841 research.research_meta
1842 )
1844 metadata.update(
1845 {
1846 "iterations": results["iterations"],
1847 "generated_at": datetime.now(UTC).isoformat(),
1848 }
1849 )
1851 # Use the helper function for consistent duration calculation
1852 duration_seconds = calculate_duration(
1853 research.created_at, completed_at
1854 )
1856 research.status = ResearchStatus.COMPLETED
1857 research.completed_at = completed_at
1858 research.duration_seconds = duration_seconds
1859 # Note: report_content is saved by CachedResearchService
1860 # report_path is not used in encrypted database version
1862 # Generate headline and topics only for news searches
1863 if (
1864 metadata.get("is_news_search")
1865 or metadata.get("search_type") == "news_analysis"
1866 ):
1867 try:
1868 from ...news.utils.headline_generator import (
1869 generate_headline,
1870 )
1871 from ...news.utils.topic_generator import (
1872 generate_topics,
1873 )
1875 # Get the report content from database for better headline/topic generation
1876 report_content = ""
1877 try:
1878 research = (
1879 db_session.query(ResearchHistory)
1880 .filter_by(id=research_id)
1881 .first()
1882 )
1883 if research and research.report_content: 1883 ↛ 1889line 1883 didn't jump to line 1889 because the condition on line 1883 was always true
1884 report_content = research.report_content
1885 logger.info(
1886 f"Retrieved {len(report_content)} chars from database for headline generation"
1887 )
1888 else:
1889 logger.warning(
1890 f"No report content found in database for research_id: {research_id}"
1891 )
1892 except Exception:
1893 logger.warning(
1894 "Could not retrieve report content from database"
1895 )
1897 # Generate headline
1898 logger.info(
1899 f"Generating headline for query: {query[:100]}"
1900 )
1901 headline = generate_headline(
1902 query, report_content
1903 )
1904 metadata["generated_headline"] = headline
1906 # Generate topics
1907 logger.info(
1908 f"Generating topics with category: {metadata.get('category', 'News')}"
1909 )
1910 topics = generate_topics(
1911 query=query,
1912 findings=report_content,
1913 category=metadata.get("category", "News"),
1914 max_topics=6,
1915 )
1916 metadata["generated_topics"] = topics
1918 logger.info(f"Generated headline: {headline}")
1919 logger.info(f"Generated topics: {topics}")
1921 except Exception:
1922 logger.warning(
1923 "Could not generate headline/topics"
1924 )
1926 research.research_meta = metadata
1928 db_session.commit()
1929 logger.info(
1930 f"Database commit completed for research_id: {research_id}"
1931 )
1933 # Update subscription if this was triggered by a subscription
1934 if metadata.get("subscription_id"):
1935 try:
1936 from ...news.subscription_manager.storage import (
1937 SQLSubscriptionStorage,
1938 )
1939 from datetime import (
1940 datetime as dt,
1941 timezone,
1942 timedelta,
1943 )
1945 sub_storage = SQLSubscriptionStorage(db_session)
1946 subscription_id = metadata["subscription_id"]
1948 # Get subscription to find refresh interval
1949 subscription = sub_storage.get(subscription_id)
1950 if subscription: 1950 ↛ 1979line 1950 didn't jump to line 1979
1951 refresh_minutes = subscription.get(
1952 "refresh_minutes", 240
1953 )
1954 now = dt.now(timezone.utc)
1955 next_refresh = now + timedelta(
1956 minutes=refresh_minutes
1957 )
1959 # Update refresh times
1960 sub_storage.update_refresh_time(
1961 subscription_id=subscription_id,
1962 last_refresh=now,
1963 next_refresh=next_refresh,
1964 )
1966 # Increment stats
1967 sub_storage.increment_stats(
1968 subscription_id, 1
1969 )
1971 logger.info(
1972 f"Updated subscription {subscription_id} refresh times"
1973 )
1974 except Exception:
1975 logger.warning(
1976 "Could not update subscription refresh time"
1977 )
1979 logger.info(
1980 f"Database updated successfully for research_id: {research_id}"
1981 )
1983 _maybe_save_chat_message(full_report_content)
1985 # Send the final completion message
1986 progress_callback(
1987 "Research completed successfully",
1988 100,
1989 {"phase": "complete"},
1990 )
1992 # Clean up resources
1993 logger.info(
1994 "Cleaning up resources for research_id: {}", research_id
1995 )
1996 cleanup_research_resources(
1997 research_id, username, user_password=user_password
1998 )
1999 logger.info(
2000 "Resources cleaned up for research_id: {}", research_id
2001 )
2003 except Exception as inner_e:
2004 logger.exception("Error during quick summary generation")
2005 raise RuntimeError(
2006 f"Error generating quick summary: {inner_e!s}"
2007 )
2008 else:
2009 raise RuntimeError( # noqa: TRY301 — triggers research failure handling in outer except
2010 "No research findings were generated. Please try again."
2011 )
2012 else:
2013 # Full Report
2014 progress_callback(
2015 "Generating detailed report...",
2016 85,
2017 {"phase": "report_generation"},
2018 )
2020 # Extract the search system from the results if available
2021 search_system = results.get("search_system", None)
2023 # Wrapper that maps report generator's 0-100% to 85-95% range
2024 # and relays cancellation checks through the outer progress_callback
2025 def report_progress_callback(message, progress_percent, metadata):
2026 if progress_percent is not None:
2027 adjusted = 85 + (progress_percent / 100) * 10
2028 else:
2029 adjusted = progress_percent
2030 progress_callback(message, adjusted, metadata)
2032 # Pass the existing search system to maintain citation indices
2033 report_generator = IntegratedReportGenerator(
2034 search_system=search_system,
2035 settings_snapshot=settings_snapshot,
2036 )
2037 final_report = report_generator.generate_report(
2038 results, query, progress_callback=report_progress_callback
2039 )
2041 progress_callback(
2042 "Report generation complete", 95, {"phase": "report_complete"}
2043 )
2045 # Format citations and split off the trailing Sources
2046 # section. Save only the answer half — sources are
2047 # persisted structurally to research_resources.
2048 all_links = (
2049 getattr(search_system, "all_links_of_system", None) or []
2050 )
2051 formatter = get_citation_formatter()
2052 try:
2053 answer_with_links, llm_sources = (
2054 formatter.format_document_split(final_report["content"])
2055 )
2056 if not llm_sources:
2057 answer_with_links = formatter.apply_inline_hyperlinks(
2058 final_report["content"], all_links
2059 )
2060 SAFETY_MIN_LEN = 800
2061 if (
2062 llm_sources
2063 and len(final_report["content"]) > SAFETY_MIN_LEN
2064 and len(answer_with_links)
2065 < len(final_report["content"]) * 0.5
2066 ):
2067 logger.warning(
2068 "format_document_split appears to have over-stripped "
2069 "(answer={} chars, original={} chars) for research {}.",
2070 len(answer_with_links),
2071 len(final_report["content"]),
2072 research_id,
2073 )
2074 answer_with_links = formatter.apply_inline_hyperlinks(
2075 final_report["content"], all_links
2076 )
2077 except Exception:
2078 logger.exception("Citation formatter failed; saving raw answer")
2079 answer_with_links = final_report["content"]
2080 formatted_content = answer_with_links
2082 # Save report FIRST, sources after.
2083 # See quick-summary path for rationale.
2084 with get_user_db_session(username) as db_session:
2085 # Update metadata
2086 metadata = final_report["metadata"]
2087 metadata["iterations"] = results["iterations"]
2089 # Save report to database
2090 try:
2091 research = (
2092 db_session.query(ResearchHistory)
2093 .filter_by(id=research_id)
2094 .first()
2095 )
2097 if not research: 2097 ↛ 2098line 2097 didn't jump to line 2098 because the condition on line 2097 was never true
2098 logger.error(f"Research {research_id} not found")
2099 success = False
2100 else:
2101 research.report_content = formatted_content
2102 if research.research_meta: 2102 ↛ 2103line 2102 didn't jump to line 2103 because the condition on line 2102 was never true
2103 research.research_meta.update(metadata)
2104 else:
2105 research.research_meta = metadata
2106 db_session.commit()
2107 success = True
2108 logger.info(
2109 f"Saved report for research {research_id} to database"
2110 )
2111 except Exception:
2112 logger.exception("Error saving report to database")
2113 db_session.rollback()
2114 success = False
2116 if not success: 2116 ↛ 2117line 2116 didn't jump to line 2117 because the condition on line 2116 was never true
2117 raise RuntimeError("Failed to save research report") # noqa: TRY301 — triggers research failure handling in outer except
2119 logger.info(
2120 f"Report saved to database for research_id: {research_id}"
2121 )
2123 # Save sources AFTER report (non-fatal; assembler omits
2124 # empty Sources blocks if this fails).
2125 try:
2126 from .research_sources_service import ResearchSourcesService
2128 sources_service = ResearchSourcesService()
2129 if all_links:
2130 logger.info(f"Saving {len(all_links)} sources to database")
2131 sources_saved = sources_service.save_research_sources(
2132 research_id=research_id,
2133 sources=all_links,
2134 username=username,
2135 )
2136 logger.info(
2137 f"Saved {sources_saved} sources for research {research_id}"
2138 )
2139 except Exception:
2140 logger.exception(
2141 f"Failed to save sources for research {research_id} (continuing)"
2142 )
2144 # Update research status in database
2145 completed_at = datetime.now(UTC).isoformat()
2147 with get_user_db_session(username) as db_session:
2148 research = (
2149 db_session.query(ResearchHistory)
2150 .filter_by(id=research_id)
2151 .first()
2152 )
2154 # Preserve existing metadata and merge with report metadata
2155 metadata = _parse_research_metadata(research.research_meta)
2157 metadata.update(final_report["metadata"])
2158 metadata["iterations"] = results["iterations"]
2160 # Use the helper function for consistent duration calculation
2161 duration_seconds = calculate_duration(
2162 research.created_at, completed_at
2163 )
2165 research.status = ResearchStatus.COMPLETED
2166 research.completed_at = completed_at
2167 research.duration_seconds = duration_seconds
2168 # Note: report_content is saved by CachedResearchService
2169 # report_path is not used in encrypted database version
2171 # Generate headline and topics only for news searches
2172 if ( 2172 ↛ 2176line 2172 didn't jump to line 2176 because the condition on line 2172 was never true
2173 metadata.get("is_news_search")
2174 or metadata.get("search_type") == "news_analysis"
2175 ):
2176 try:
2177 from ...news.utils.headline_generator import (
2178 generate_headline, # type: ignore[no-redef]
2179 )
2180 from ...news.utils.topic_generator import (
2181 generate_topics, # type: ignore[no-redef]
2182 )
2184 # Get the report content from database for better headline/topic generation
2185 report_content = ""
2186 try:
2187 research = (
2188 db_session.query(ResearchHistory)
2189 .filter_by(id=research_id)
2190 .first()
2191 )
2192 if research and research.report_content:
2193 report_content = research.report_content
2194 else:
2195 logger.warning(
2196 f"No report content found in database for research_id: {research_id}"
2197 )
2198 except Exception:
2199 logger.warning(
2200 "Could not retrieve report content from database"
2201 )
2203 # Generate headline
2204 headline = generate_headline(query, report_content)
2205 metadata["generated_headline"] = headline
2207 # Generate topics
2208 topics = generate_topics(
2209 query=query,
2210 findings=report_content,
2211 category=metadata.get("category", "News"),
2212 max_topics=6,
2213 )
2214 metadata["generated_topics"] = topics
2216 logger.info(f"Generated headline: {headline}")
2217 logger.info(f"Generated topics: {topics}")
2219 except Exception:
2220 logger.warning("Could not generate headline/topics")
2222 research.research_meta = metadata
2224 db_session.commit()
2226 # Update subscription if this was triggered by a subscription
2227 if metadata.get("subscription_id"): 2227 ↛ 2228line 2227 didn't jump to line 2228 because the condition on line 2227 was never true
2228 try:
2229 from ...news.subscription_manager.storage import (
2230 SQLSubscriptionStorage,
2231 )
2232 from datetime import datetime as dt, timezone, timedelta
2234 sub_storage = SQLSubscriptionStorage(db_session)
2235 subscription_id = metadata["subscription_id"]
2237 # Get subscription to find refresh interval
2238 subscription = sub_storage.get(subscription_id)
2239 if subscription:
2240 refresh_minutes = subscription.get(
2241 "refresh_minutes", 240
2242 )
2243 now = dt.now(timezone.utc)
2244 next_refresh = now + timedelta(
2245 minutes=refresh_minutes
2246 )
2248 # Update refresh times
2249 sub_storage.update_refresh_time(
2250 subscription_id=subscription_id,
2251 last_refresh=now,
2252 next_refresh=next_refresh,
2253 )
2255 # Increment stats
2256 sub_storage.increment_stats(subscription_id, 1)
2258 logger.info(
2259 f"Updated subscription {subscription_id} refresh times"
2260 )
2261 except Exception:
2262 logger.warning(
2263 "Could not update subscription refresh time"
2264 )
2266 _maybe_save_chat_message(formatted_content)
2268 progress_callback(
2269 "Research completed successfully",
2270 100,
2271 {"phase": "complete"},
2272 )
2274 # Clean up resources
2275 cleanup_research_resources(
2276 research_id, username, user_password=user_password
2277 )
2279 except ResearchTerminatedException:
2280 logger.info(f"Research {research_id} terminated by user")
2281 # Fallback path: when termination was raised from the streaming
2282 # callback (mid-stream interrupt), progress_callback hasn't run
2283 # since the flag was set, so handle_termination() was NOT called
2284 # and the partial row hasn't been persisted yet. The helper is
2285 # idempotent via streaming_state["_persisted"]; if the in-callback
2286 # path already ran, both calls are no-ops.
2287 _save_partial_chat_message_on_terminate(
2288 shared_research_context.get("chat_session_id"),
2289 research_id,
2290 username,
2291 "".join(streaming_state.get("chunks", [])),
2292 truncated=streaming_state.get("_truncated", False),
2293 streaming_state=streaming_state,
2294 )
2295 # Ensure the SUSPENDED status update + cleanup runs even when the
2296 # exception was raised mid-stream. The in-callback termination paths
2297 # set "_termination_handled"; only run here when they did NOT, so a
2298 # single termination doesn't queue two SUSPENDED updates, emit two
2299 # final socket messages, and (in test mode) sleep twice.
2300 if not streaming_state.get("_termination_handled"):
2301 try:
2302 handle_termination(research_id, username)
2303 except Exception:
2304 logger.opt(exception=True).debug(
2305 "handle_termination in except block failed"
2306 )
2308 except Exception as e:
2309 # Handle error
2310 error_message = f"Research failed: {e!s}"
2311 logger.exception(error_message)
2313 try:
2314 # Check for common Ollama error patterns in the exception and provide more user-friendly errors
2315 user_friendly_error = str(e)
2316 error_context = {}
2318 if "Error type: ollama_unavailable" in user_friendly_error:
2319 user_friendly_error = "Ollama AI service is unavailable. Please check that Ollama is running properly on your system."
2320 error_context = {
2321 "solution": "Start Ollama with 'ollama serve' or check if it's installed correctly."
2322 }
2323 elif "Error type: model_not_found" in user_friendly_error:
2324 user_friendly_error = "Required Ollama model not found. Please pull the model first."
2325 error_context = {
2326 "solution": "Run 'ollama pull mistral' to download the required model."
2327 }
2328 elif "Error type: connection_error" in user_friendly_error:
2329 user_friendly_error = "Connection error with LLM service. Please check that your AI service is running."
2330 error_context = {
2331 "solution": "Ensure Ollama or your API service is running and accessible."
2332 }
2333 elif "Error type: api_error" in user_friendly_error:
2334 # Keep the original error message as it's already improved
2335 error_context = {
2336 "solution": "Check API configuration and credentials."
2337 }
2338 # OpenAI-compatible runtime tokens (#3878). The friendly message
2339 # built by friendly_openai_compatible_error() already names the
2340 # provider, base URL, and model -- keep it as-is.
2341 elif "Error type: openai_connection_refused" in user_friendly_error: 2341 ↛ 2342line 2341 didn't jump to line 2342 because the condition on line 2341 was never true
2342 error_context = {
2343 "solution": "Start your LLM server (LM Studio / vLLM / llama.cpp server) and verify the base URL in Settings -> LLM Providers."
2344 }
2345 elif "Error type: openai_timeout" in user_friendly_error: 2345 ↛ 2346line 2345 didn't jump to line 2346 because the condition on line 2345 was never true
2346 error_context = {
2347 "solution": "The server is reachable but slow -- it may be loading a model. Retry, or increase the request timeout."
2348 }
2349 elif "Error type: openai_auth" in user_friendly_error: 2349 ↛ 2350line 2349 didn't jump to line 2350 because the condition on line 2349 was never true
2350 error_context = {
2351 "solution": "Set or correct the API key for this provider in Settings -> LLM Providers. Local servers usually accept any non-empty key."
2352 }
2353 elif "Error type: openai_permission_denied" in user_friendly_error: 2353 ↛ 2354line 2353 didn't jump to line 2354 because the condition on line 2353 was never true
2354 error_context = {
2355 "solution": "Your API key is valid but lacks access to this model. Pick a model your account/server is permitted to use."
2356 }
2357 elif "Error type: openai_model_not_found" in user_friendly_error: 2357 ↛ 2358line 2357 didn't jump to line 2358 because the condition on line 2357 was never true
2358 error_context = {
2359 "solution": "The model id is not loaded on this server. Pick a currently-loaded model in the provider's UI/config."
2360 }
2361 elif "Error type: openai_bad_request" in user_friendly_error: 2361 ↛ 2362line 2361 didn't jump to line 2362 because the condition on line 2361 was never true
2362 error_context = {
2363 "solution": "The server rejected the request. Check the model id and any provider-specific parameters."
2364 }
2365 elif "Error type: openai_unknown" in user_friendly_error: 2365 ↛ 2366line 2365 didn't jump to line 2366 because the condition on line 2365 was never true
2366 error_context = {
2367 "solution": "Check the provider's logs for the full error and verify the base URL / model id."
2368 }
2369 elif "Error type: openai_rate_limit" in user_friendly_error: 2369 ↛ 2370line 2369 didn't jump to line 2370 because the condition on line 2369 was never true
2370 error_context = {
2371 "solution": "The provider rate-limited the request. Wait a moment and retry, or enable LLM Rate Limiting in Settings."
2372 }
2374 # Generate enhanced error report for failed research
2375 enhanced_report_content = None
2376 try:
2377 # Get partial results if they exist
2378 partial_results = results if "results" in locals() else None
2379 search_iterations = (
2380 results.get("iterations", 0) if partial_results else 0
2381 )
2383 # Generate comprehensive error report
2384 # ErrorReportGenerator does not use LLM (kept for compat)
2385 error_generator = ErrorReportGenerator()
2386 enhanced_report_content = error_generator.generate_error_report(
2387 error_message=f"Research failed: {e!s}",
2388 query=query,
2389 partial_results=partial_results,
2390 search_iterations=search_iterations,
2391 research_id=research_id,
2392 )
2394 logger.info(
2395 "Generated enhanced error report for failed research (length: {})",
2396 len(enhanced_report_content),
2397 )
2399 # Save enhanced error report to encrypted database
2400 try:
2401 # username already available from function scope (line 281)
2402 if username: 2402 ↛ 2424line 2402 didn't jump to line 2424 because the condition on line 2402 was always true
2403 from ...storage import get_report_storage
2405 with get_user_db_session(username) as db_session:
2406 storage = get_report_storage(session=db_session)
2407 success = storage.save_report(
2408 research_id=research_id,
2409 content=enhanced_report_content,
2410 metadata={"error_report": True},
2411 username=username,
2412 )
2413 if success:
2414 logger.info(
2415 "Saved enhanced error report to encrypted database for research {}",
2416 research_id,
2417 )
2418 else:
2419 logger.warning(
2420 "Failed to save enhanced error report to database for research {}",
2421 research_id,
2422 )
2423 else:
2424 logger.warning(
2425 "Cannot save error report: username not available"
2426 )
2428 except Exception as report_error:
2429 logger.exception(
2430 "Failed to save enhanced error report: {}", report_error
2431 )
2433 except Exception as error_gen_error:
2434 logger.exception(
2435 "Failed to generate enhanced error report: {}",
2436 error_gen_error,
2437 )
2438 enhanced_report_content = None
2440 # Get existing metadata from database first
2441 existing_metadata = {}
2442 try:
2443 # username already available from function scope (line 281)
2444 if username: 2444 ↛ 2457line 2444 didn't jump to line 2457 because the condition on line 2444 was always true
2445 with get_user_db_session(username) as db_session:
2446 research = (
2447 db_session.query(ResearchHistory)
2448 .filter_by(id=research_id)
2449 .first()
2450 )
2451 if research and research.research_meta:
2452 existing_metadata = dict(research.research_meta)
2453 except Exception:
2454 logger.exception("Failed to get existing metadata")
2456 # Update metadata with more context about the error while preserving existing values
2457 metadata = existing_metadata
2458 metadata.update({"phase": "error", "error": user_friendly_error})
2459 if error_context:
2460 metadata.update(error_context)
2461 if enhanced_report_content: 2461 ↛ 2465line 2461 didn't jump to line 2465 because the condition on line 2461 was always true
2462 metadata["has_enhanced_report"] = True
2464 # If we still have an active research record, update its log
2465 if is_research_active(research_id):
2466 progress_callback(user_friendly_error, None, metadata)
2468 # If termination was requested, mark as suspended instead of failed
2469 status = (
2470 ResearchStatus.SUSPENDED
2471 if is_termination_requested(research_id)
2472 else ResearchStatus.FAILED
2473 )
2474 message = (
2475 "Research was terminated by user"
2476 if status == ResearchStatus.SUSPENDED
2477 else user_friendly_error
2478 )
2480 # Calculate duration up to termination point - using UTC consistently
2481 now = datetime.now(UTC)
2482 completed_at = now.isoformat()
2484 # NOTE: Database updates from threads are handled by queue processor
2485 # The queue_processor.queue_error_update() method is already being used below
2486 # to safely update the database from the main thread
2488 # Queue the error update to be processed in main thread
2489 # Using the queue processor v2 system
2490 from ..queue.processor_v2 import queue_processor
2492 if username: 2492 ↛ 2506line 2492 didn't jump to line 2506 because the condition on line 2492 was always true
2493 queue_processor.queue_error_update(
2494 username=username,
2495 research_id=research_id,
2496 status=status,
2497 error_message=message,
2498 metadata=metadata,
2499 completed_at=completed_at,
2500 report_path=None,
2501 )
2502 logger.info(
2503 f"Queued error update for research {research_id} with status '{status}'"
2504 )
2505 else:
2506 logger.error(
2507 f"Cannot queue error update for research {research_id} - no username provided. "
2508 f"Status: '{status}', Message: {message}"
2509 )
2511 try:
2512 SocketIOService().emit_to_subscribers(
2513 "progress",
2514 research_id,
2515 {"status": status, "error": message},
2516 )
2517 except Exception:
2518 logger.exception("Failed to emit error via socket")
2520 # Add error message to chat session if applicable.
2521 # Read chat_session_id from shared_research_context (the canonical
2522 # source — kept consistent with the six other reads in this file).
2523 chat_session_id = shared_research_context.get("chat_session_id")
2524 if chat_session_id and username:
2525 try:
2526 from ...chat.service import ChatService
2528 chat_service = ChatService(username)
2529 # allow_archived=True: same multi-tab race rationale as
2530 # the completion / stop-and-partial paths — if the user
2531 # archived (or deleted) the session between research
2532 # start and failure, the error message would otherwise
2533 # be silently dropped by the active-only insert guard
2534 # and the user sees nothing in the chat.
2535 chat_service.add_message(
2536 session_id=chat_session_id,
2537 role="assistant",
2538 content=f"Sorry, the research failed: {message}",
2539 message_type="response",
2540 allow_archived=True,
2541 )
2542 except Exception:
2543 # Promoted from debug → warning to match the success-path
2544 # rationale: if this write fails the user never sees the
2545 # error and operators get no signal at debug-off level.
2546 logger.opt(exception=True).warning(
2547 "Could not add error message to chat session"
2548 )
2550 except Exception:
2551 logger.exception("Error in error handler")
2553 # Clean up resources. This is the error path, so report FAILED on
2554 # the final socket message rather than a spurious "completed".
2555 cleanup_research_resources(
2556 research_id,
2557 username,
2558 user_password=user_password,
2559 final_status=ResearchStatus.FAILED,
2560 )
2562 finally:
2563 # RESOURCE CLEANUP: Close search engine HTTP sessions.
2564 #
2565 # Search engines (created via get_search()) may hold HTTP connection
2566 # pools. Currently only SemanticScholarSearchEngine creates a
2567 # persistent SafeSession; other engines use stateless safe_get()/
2568 # safe_post() utility functions. However, BaseSearchEngine.close()
2569 # is safe to call on any engine — it checks for a 'session'
2570 # attribute and is fully idempotent (SemanticScholar sets
2571 # self.session = None after close).
2572 #
2573 # Neither @thread_cleanup nor cleanup_research_resources() close
2574 # the search engine — @thread_cleanup only handles database sessions
2575 # and context cleanup, and cleanup_research_resources() only handles
2576 # status updates, notifications, and tracking dict removal.
2577 #
2578 # Without this explicit close, search engine sessions rely on
2579 # Python's non-deterministic garbage collection (__del__) for
2580 # cleanup, which can cause file descriptor exhaustion under
2581 # sustained load.
2582 from ...utilities.resource_utils import safe_close
2584 if "use_search" in locals():
2585 safe_close(use_search, "research search engine")
2586 # Close search system (cascades to strategy thread pools).
2587 # See AdvancedSearchSystem.close() for details.
2588 if "system" in locals():
2589 safe_close(system, "research system")
2590 # Close the LLM instance created for model/provider overrides.
2591 # system.close() does NOT close the LLM passed to it via system.model,
2592 # so we must close it explicitly here.
2593 if "use_llm" in locals():
2594 safe_close(use_llm, "research LLM")
def cleanup_research_resources(
    research_id,
    username=None,
    user_password=None,
    final_status=ResearchStatus.COMPLETED,
):
    """
    Release tracking state and notify listeners once a research run ends.

    Used both after successful completion and on the error path (with
    ``final_status=ResearchStatus.FAILED``). The steps, in order:

    1. In test mode only, sleep 5 seconds so concurrency limits can be
       exercised by tests.
    2. Notify the queue processor (``notify_research_completed``) when a
       username is available; otherwise log a warning and skip it.
    3. Remove the research from active tracking / termination flags
       (``cleanup_research``) and drop its progress-throttle entry.
    4. Emit a final ``progress`` socket event, then remove the socket
       subscriptions for this research. Both socket steps are best-effort:
       failures are logged, not raised, and a failed emit does not prevent
       the subscription removal.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread
            context). When missing, the queue-processor notification is
            skipped.
        user_password: Forwarded unchanged to
            ``queue_processor.notify_research_completed``.
        final_status: The terminal status to report on the final socket
            message. Callers that end a research for a reason other than
            normal completion MUST pass the real status (e.g. SUSPENDED on
            user termination, FAILED on error) so the final ``progress``
            event matches reality. Defaulting this to COMPLETED — and
            previously hard-coding it — caused the stop/error paths to emit
            a spurious "completed" signal to subscribers.
    """
    from ..routes.globals import cleanup_research

    logger.info("Cleaning up resources for research {}", research_id)

    # For testing: add a small delay to simulate research taking time.
    # This helps test concurrent research limits. Uses the module-level
    # ``time`` import; a function-local re-import is unnecessary.
    from ...settings.env_registry import is_test_mode

    if is_test_mode():
        logger.info(
            "Test mode: Adding 5 second delay before cleanup for {}",
            research_id,
        )
        time.sleep(5)

    # NOTE: this function does not write the database status itself. The
    # queue processor (processor_v2) applies database updates from the main
    # thread, avoiding the Flask request-context issues that occur in
    # background threads.
    from ..queue.processor_v2 import queue_processor

    if username:
        queue_processor.notify_research_completed(
            username, research_id, user_password=user_password
        )
        logger.info(
            "Notified queue processor of completion for research {} (user: {})",
            research_id,
            username,
        )
    else:
        logger.warning(
            "Cannot notify completion for research {} - no username provided",
            research_id,
        )

    # Remove from active research and termination flags atomically
    cleanup_research(research_id)

    # Clean up throttle state for this research
    with _last_emit_lock:
        _last_emit_times.pop(research_id, None)

    # Send a final message to any remaining subscribers. The status comes
    # from the caller (final_status), not from a database read.
    try:
        if final_status in (
            ResearchStatus.SUSPENDED,
            ResearchStatus.FAILED,
        ):
            final_message = {
                "status": final_status,
                "message": f"Research was {final_status}",
                # Suspended/failed runs report 0% rather than a misleading 100%.
                "progress": 0,
            }
        else:
            # Any other status is reported to subscribers as COMPLETED.
            final_message = {
                "status": ResearchStatus.COMPLETED,
                "message": "Research process has ended and resources have been cleaned up",
                "progress": 100,
            }

        logger.info(
            "Sending final {} socket message for research {}",
            final_status,
            research_id,
        )

        SocketIOService().emit_to_subscribers(
            "progress", research_id, final_message
        )
    except Exception:
        logger.exception("Error sending final cleanup message")

    # Unsubscribe in a separate best-effort step: previously a failure in
    # the final emit above skipped this, leaving stale subscriptions
    # registered for a research that no longer exists.
    try:
        SocketIOService().remove_subscriptions_for_research(research_id)
    except Exception:
        logger.exception(
            "Error removing socket subscriptions for research {}",
            research_id,
        )
def handle_termination(research_id, username=None):
    """
    Mark a running research as terminated by the user and release it.

    The SUSPENDED status write is not performed here directly. It is handed
    to the queue processor through ``queue_error_update`` so that it is
    processed in the main thread, which avoids Flask request-context errors
    in background threads. If queueing fails, the error is logged and
    swallowed, so resource cleanup still runs afterwards.

    Args:
        research_id: The ID of the research
        username: The username for database access (required for thread context)
    """
    logger.info(f"Handling termination for research {research_id}")

    try:
        from ..queue.processor_v2 import queue_processor

        # A single timestamp is used both as the termination marker in the
        # metadata and as the completion time.
        terminated_at = datetime.now(UTC).isoformat()
        suspension_update = {
            "username": username,
            "research_id": research_id,
            "status": ResearchStatus.SUSPENDED,
            "error_message": "Research was terminated by user",
            "metadata": {"terminated_at": terminated_at},
            "completed_at": terminated_at,
            "report_path": None,
        }
        queue_processor.queue_error_update(**suspension_update)

        logger.info(f"Queued suspension update for research {research_id}")
    except Exception:
        logger.exception(
            f"Error queueing termination update for research {research_id}"
        )

    # Report SUSPENDED on the final socket message so subscribers do not
    # receive a spurious "completed" signal for a user-stopped research.
    cleanup_research_resources(
        research_id, username, final_status=ResearchStatus.SUSPENDED
    )
def cancel_research(research_id, username):
    """
    Stop a research, whether or not it is currently running.

    The termination flag is always set first. A research that is still
    active is handed to ``handle_termination``. Otherwise, its
    ``ResearchHistory`` row is loaded from the user's database. If the row
    is not already in a terminal state, its status is set to SUSPENDED and
    committed.

    Args:
        research_id: The ID of the research to cancel
        username: The username of the user cancelling the research

    Returns:
        bool: True when the research was stopped or was already in a terminal
        state. False when it was not found or any error occurred.
    """
    try:
        from ..routes.globals import is_research_active, set_termination_flag

        set_termination_flag(research_id)

        if is_research_active(research_id):
            # Running research: the termination path queues the DB update
            # and cleans up resources.
            handle_termination(research_id, username)
            return True

        try:
            with get_user_db_session(username) as db_session:
                research = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if not research:
                    logger.info(f"Research {research_id} not found in database")
                    return False

                already_stopped = research.status in (
                    ResearchStatus.COMPLETED,
                    ResearchStatus.SUSPENDED,
                    ResearchStatus.FAILED,
                    ResearchStatus.ERROR,
                )
                if already_stopped:
                    logger.info(
                        f"Research {research_id} already in terminal state: {research.status}"
                    )
                    # Nothing left to stop, so this counts as success.
                    return True

                # Not tracked as active and not terminal: persist the
                # suspension directly on the row.
                research.status = ResearchStatus.SUSPENDED
                db_session.commit()
                logger.info(f"Successfully suspended research {research_id}")
        except Exception:
            logger.exception(
                f"Error accessing database for research {research_id}"
            )
            return False
        else:
            return True
    except Exception:
        logger.exception(
            f"Unexpected error in cancel_research for {research_id}"
        )
        return False