Coverage for src/local_deep_research/web/queue/processor_v2.py: 89%
446 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Queue processor v2 - uses encrypted user databases instead of service.db
3Supports both direct execution and queue modes.
4"""
6import threading
7import time
8import uuid
9from typing import Any, Dict, Optional
11from loguru import logger
13from ...constants import ResearchStatus
14from ...database.encrypted_db import db_manager
15from ...database.models import (
16 QueuedResearch,
17 ResearchHistory,
18 UserActiveResearch,
19)
20from ...database.queue_service import UserQueueService
21from ...database.session_context import get_user_db_session
22from ...database.session_passwords import session_password_store
23from ...exceptions import DuplicateResearchError, SystemAtCapacityError
24from ...notifications.queue_helpers import (
25 send_research_completed_notification_from_session,
26 send_research_failed_notification_from_session,
27)
28from ..services.research_service import (
29 run_research_process,
30 start_research_process,
31)
# Retry configuration constants for notification database queries.
# NOTE(review): none of these three are referenced in the visible part of
# this module — confirm where the retry loop that consumes them lives.
MAX_RESEARCH_LOOKUP_RETRIES = 3
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5  # seconds
# presumably the delay is multiplied by this after each attempt — verify
RETRY_BACKOFF_MULTIPLIER = 2

# Give up on a queued research after this many consecutive spawn failures.
# Each failure leaves is_processing=False so the next loop tick retries.
SPAWN_RETRY_LIMIT = 3
43class QueueProcessorV2:
44 """
45 Processes queued researches using encrypted user databases.
46 This replaces the service.db approach.
47 """
49 def __init__(self, check_interval=10):
50 """
51 Initialize the queue processor.
53 Args:
54 check_interval: How often to check for work (seconds)
55 """
56 self.check_interval = check_interval
57 self.running = False
58 self.thread = None
59 self._loop_iteration = 0
61 # Per-user settings will be retrieved from each user's database
62 # when processing their queue using SettingsManager
63 logger.info(
64 "Queue processor v2 initialized - will use per-user settings from SettingsManager"
65 )
67 # Track which users we should check
68 self._users_to_check: set[tuple[str, str]] = set()
69 self._users_lock = threading.Lock()
71 # Track pending operations from background threads
72 self.pending_operations = {}
73 self._pending_operations_lock = threading.Lock()
75 # Per-user serialisation for the "count active → start direct"
76 # critical section. Without this the count-then-insert races with
77 # itself for the same user (e.g. two concurrent research submissions
78 # from two browser tabs), and IMMEDIATE isolation was what used to
79 # paper over it at the DB layer.
80 self._user_critical_locks: Dict[str, threading.Lock] = {}
81 self._user_critical_locks_lock = threading.Lock()
83 # Count consecutive spawn failures per research_id. Entries are
84 # popped on success or after hitting SPAWN_RETRY_LIMIT (then the
85 # research is marked FAILED). In-memory is sufficient: a restart
86 # resets the counter and the research gets a fresh N retries,
87 # which is the desired behavior if the underlying system issue
88 # (thread pool, memory) cleared.
89 # Access is guarded by _spawn_retry_counts_lock because the
90 # increment path is a read-modify-write and the loop and direct
91 # request paths can interleave.
92 self._spawn_retry_counts: dict[str, int] = {}
93 self._spawn_retry_counts_lock = threading.Lock()
95 def _get_user_critical_lock(self, username: str) -> threading.Lock:
96 """Get (or lazily create) the per-user lock used to serialise the
97 count-active-and-start-direct critical section for a given user.
98 """
99 with self._user_critical_locks_lock:
100 lock = self._user_critical_locks.get(username)
101 if lock is None: 101 ↛ 104line 101 didn't jump to line 104 because the condition on line 101 was always true
102 lock = threading.Lock()
103 self._user_critical_locks[username] = lock
104 return lock
106 def pop_user_critical_lock(self, username: str) -> None:
107 """Remove the per-user critical-section lock for ``username``.
109 Called from the user-close path so this instance dict doesn't
110 accumulate one entry per username across the process lifetime.
111 The next direct-research submission for that user lazily
112 re-creates the lock if needed — the lock has no state that
113 needs to persist across login/logout.
114 """
115 with self._user_critical_locks_lock:
116 self._user_critical_locks.pop(username, None)
118 def _bump_spawn_retry_count(self, research_id: str) -> int:
119 """Atomically increment and return the spawn-retry counter for
120 ``research_id``. Extracted so tests can exercise the real
121 locked increment path instead of duplicating the lock in the
122 test worker (which would be a tautology).
123 """
124 with self._spawn_retry_counts_lock:
125 attempts = self._spawn_retry_counts.get(research_id, 0) + 1
126 self._spawn_retry_counts[research_id] = attempts
127 return attempts
129 @staticmethod
130 def _commit_with_safe_rollback(db_session, context: str) -> bool:
131 """Commit ``db_session`` with best-effort rollback on failure.
133 Returns ``True`` on success, ``False`` if the commit raised.
134 The failure path logs via ``logger.exception`` and attempts a
135 rollback, itself guarded so a subsequent rollback failure is
136 logged at debug level rather than propagated.
138 Extracted because the ``try: commit / except: log + try:
139 rollback`` idiom repeats at ≥5 sites in this module; inlining
140 hides the defensive structure behind nested ``try`` blocks and
141 makes each callsite longer than the work it describes.
142 """
143 try:
144 db_session.commit()
145 return True
146 except Exception:
147 logger.exception(f"Commit failed: {context}")
148 try:
149 db_session.rollback()
150 except Exception:
151 logger.debug(
152 f"Rollback after commit failure ({context})",
153 exc_info=True,
154 )
155 return False
157 def _delete_queue_row_safely(
158 self, db_session, username: str, research_id: str
159 ) -> None:
160 """Best-effort delete of the ``QueuedResearch`` row for
161 ``(username, research_id)``.
163 Rolls back any pending state first (the session may be in
164 ``PendingRollbackError`` from a failed commit inside
165 ``_start_research``), re-queries the row fresh, deletes it if
166 present, and commits via ``_commit_with_safe_rollback``.
168 Use this for ``DuplicateResearchError`` cleanup where the goal
169 is "drop the queue row regardless of session state." Do NOT
170 use it for paths that need the delete to be atomic with other
171 writes (e.g. the terminal FAILED path bundles
172 ``ResearchHistory.status = FAILED`` with the queue-row delete
173 in a single commit — that stays inline).
174 """
175 try:
176 db_session.rollback()
177 except Exception:
178 logger.debug(
179 f"Rollback before queue-row delete for {research_id}",
180 exc_info=True,
181 )
182 try:
183 fresh_queued = (
184 db_session.query(QueuedResearch)
185 .filter_by(username=username, research_id=research_id)
186 .first()
187 )
188 if fresh_queued:
189 db_session.delete(fresh_queued)
190 self._commit_with_safe_rollback(
191 db_session,
192 f"queue-row delete for research {research_id}",
193 )
194 except Exception:
195 logger.exception(
196 f"Failed to query/delete queue row for {research_id}"
197 )
198 try:
199 db_session.rollback()
200 except Exception:
201 logger.debug(
202 f"Rollback after queue-row delete failure for {research_id}",
203 exc_info=True,
204 )
206 def start(self):
207 """Start the queue processor thread."""
208 if self.running:
209 logger.warning("Queue processor already running")
210 return
212 self.running = True
213 self.thread = threading.Thread(
214 target=self._process_queue_loop, daemon=True
215 )
216 self.thread.start()
217 logger.info("Queue processor v2 started")
219 def stop(self):
220 """Stop the queue processor thread."""
221 self.running = False
222 if self.thread:
223 self.thread.join(timeout=10)
224 logger.info("Queue processor v2 stopped")
226 def notify_user_activity(self, username: str, session_id: str):
227 """
228 Notify that a user has activity and their queue should be checked.
230 Args:
231 username: The username
232 session_id: The Flask session ID (for password access)
233 """
234 with self._users_lock:
235 self._users_to_check.add((username, session_id))
236 logger.debug(f"User {username} added to queue check list")
238 def notify_research_queued(self, username: str, research_id: str, **kwargs):
239 """
240 Notify that a research was queued.
241 In direct mode, this immediately starts the research if slots are available.
242 In queue mode, it adds to the queue.
244 Args:
245 username: The username
246 research_id: The research ID
247 **kwargs: Additional parameters for direct execution (query, mode, etc.)
248 """
249 # Check user's queue_mode setting when we have database access
250 if kwargs:
251 session_id = kwargs.get("session_id")
252 if session_id:
253 # Check if we can start it directly
254 password = session_password_store.get_session_password(
255 username, session_id
256 )
257 if password:
258 try:
259 # Open database and check settings + active count
260 engine = db_manager.open_user_database(
261 username, password
262 )
263 if engine: 263 ↛ 335line 263 didn't jump to line 335 because the condition on line 263 was always true
264 with get_user_db_session(username) as db_session:
265 # Get user's settings using SettingsManager
266 from ...settings.manager import SettingsManager
268 settings_manager = SettingsManager(db_session)
270 # Get user's queue_mode setting (env > DB > default)
271 queue_mode = settings_manager.get_setting(
272 "app.queue_mode", "direct"
273 )
275 # Get user's max concurrent setting (env > DB > default)
276 max_concurrent = settings_manager.get_setting(
277 "app.max_concurrent_researches", 3
278 )
280 logger.debug(
281 f"User {username} settings: queue_mode={queue_mode}, "
282 f"max_concurrent={max_concurrent}"
283 )
285 # Only try direct execution if user has queue_mode="direct"
286 if queue_mode == "direct":
287 # Serialise the count→check→start critical
288 # section at the application layer. Two
289 # concurrent submissions for the same user
290 # must not both observe the same active
291 # count and both start — that would exceed
292 # max_concurrent. A per-user Python lock
293 # gives us that atomicity independent of
294 # the DB isolation level.
295 with self._get_user_critical_lock(username):
296 active_count = (
297 db_session.query(UserActiveResearch)
298 .filter_by(
299 username=username,
300 status=ResearchStatus.IN_PROGRESS,
301 )
302 .count()
303 )
305 if active_count < max_concurrent:
306 # We have slots - start directly!
307 logger.info(
308 f"Direct mode: Starting research {research_id} immediately "
309 f"(active: {active_count}/{max_concurrent})"
310 )
312 # Start the research directly
313 self._start_research_directly(
314 username,
315 research_id,
316 password,
317 **kwargs,
318 )
319 return
320 logger.info(
321 f"Direct mode: Max concurrent reached ({active_count}/"
322 f"{max_concurrent}), queueing {research_id}"
323 )
324 else:
325 logger.info(
326 f"User {username} has queue_mode={queue_mode}, "
327 f"queueing research {research_id}"
328 )
329 except Exception:
330 logger.exception(
331 f"Error in direct execution for {username}"
332 )
334 # Fall back to queue mode (or if direct mode failed)
335 try:
336 with get_user_db_session(username) as session:
337 queue_service = UserQueueService(session)
338 queue_service.add_task_metadata(
339 task_id=research_id,
340 task_type="research",
341 priority=0,
342 )
343 logger.info(
344 f"Research {research_id} queued for user {username}"
345 )
346 except Exception:
347 logger.exception(f"Failed to update queue status for {username}")
    def _start_research_directly(
        self, username: str, research_id: str, password: str, **kwargs
    ):
        """
        Start a research directly without queueing.

        Sequence: (1) insert and commit a ``UserActiveResearch`` row with
        status IN_PROGRESS and ``thread_id="pending"``; (2) call
        ``start_research_process``; (3) store the spawned thread's ident on
        the active row. Failures in step 2 are handled per exception type
        (duplicate / capacity / anything else) as described inline.

        Nothing is raised to the caller; every failure path logs and
        returns ``None``.

        Args:
            username: The username
            research_id: The research ID
            password: The user's password
            **kwargs: Research parameters (query, mode, settings, etc.)
        """
        query = kwargs.get("query")
        mode = kwargs.get("mode")
        settings_snapshot = kwargs.get("settings_snapshot", {})

        # Create active research record
        try:
            with get_user_db_session(username) as db_session:
                active_record = UserActiveResearch(
                    username=username,
                    research_id=research_id,
                    status=ResearchStatus.IN_PROGRESS,
                    thread_id="pending",
                    settings_snapshot=settings_snapshot,
                )
                db_session.add(active_record)
                db_session.commit()

                # Update task status if it exists
                # NOTE(review): this runs after the commit above but inside
                # the same try. If it raises, the handler below logs and
                # returns while the committed IN_PROGRESS row stays behind
                # with no thread — confirm that stale-row reclamation
                # covers this case.
                queue_service = UserQueueService(db_session)
                queue_service.update_task_status(research_id, "processing")
        except Exception:
            logger.exception(
                f"Failed to create active research record for {research_id}"
            )
            return

        # Extract parameters from kwargs
        model_provider = kwargs.get("model_provider")
        model = kwargs.get("model")
        custom_endpoint = kwargs.get("custom_endpoint")
        search_engine = kwargs.get("search_engine")

        # Start the research process
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                run_research_process,
                username=username,
                user_password=password,
                model_provider=model_provider,
                model=model,
                custom_endpoint=custom_endpoint,
                search_engine=search_engine,
                max_results=kwargs.get("max_results"),
                time_period=kwargs.get("time_period"),
                iterations=kwargs.get("iterations"),
                questions_per_iteration=kwargs.get("questions_per_iteration"),
                strategy=kwargs.get("strategy", "source-based"),
                settings_snapshot=settings_snapshot,
            )

            # Update thread ID
            # Best-effort: a failure here is only logged, the research
            # thread has already been started at this point.
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        active_record.thread_id = str(research_thread.ident)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to update thread ID for {research_id}"
                )

            logger.info(
                f"Direct execution: Started research {research_id} for user {username} "
                f"in thread {research_thread.ident}"
            )

        except DuplicateResearchError:
            # A live thread already owns this research_id. Do NOT delete
            # the UserActiveResearch row or mark ResearchHistory FAILED —
            # that state belongs to the live thread, and mutating it
            # would terminate a running research from the user's
            # perspective while it keeps executing. Same contract as the
            # queue processor's dedicated dup branch (#3506).
            logger.warning(
                f"Duplicate live thread detected for {research_id} "
                "in direct mode; leaving state intact"
            )
            return
        except SystemAtCapacityError:
            # System at concurrent-research capacity in the direct-execution
            # path. Roll back the IN_PROGRESS active row and mark history
            # back to QUEUED so the queue processor can pick it up later.
            logger.info(
                f"Direct execution hit capacity for {research_id}; re-queueing"
            )
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        db_session.delete(active_record)
                    research_row = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=research_id)
                        .first()
                    )
                    if research_row:
                        research_row.status = ResearchStatus.QUEUED
                    # Bump queued_tasks so _process_user_queue's
                    # `queued_tasks == 0` gate doesn't treat the queue as
                    # empty and strand the QueuedResearch row the submit
                    # path already created. The direct path returns before
                    # the normal add_task_metadata call, so this is the
                    # single, non-double-counting increment;
                    # _start_queued_researches later dispatches the row and
                    # update_task_status() transitions this TaskMetadata
                    # queued->processing (balancing the counter).
                    UserQueueService(db_session).add_task_metadata(
                        task_id=research_id,
                        task_type="research",
                        priority=0,
                    )
                    # Single commit: row delete, status reset and queue
                    # metadata are persisted together.
                    db_session.commit()
            except Exception:
                logger.exception(
                    f"Cleanup after capacity reject failed for "
                    f"{research_id}; the stale UserActiveResearch row is "
                    f"recovered by reclaim_stale_user_active_research"
                )
            return
        except Exception:
            logger.exception(f"Failed to start research {research_id} directly")
            # Clean up the active record AND mark the research terminal
            # FAILED so the user-visible state matches reality (no running
            # thread, not IN_PROGRESS). Same contract as the queue
            # processor's terminal-failure branch (#3481).
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        db_session.delete(active_record)
                    research_row = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=research_id)
                        .first()
                    )
                    if research_row:
                        research_row.status = ResearchStatus.FAILED
                    db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to clean up active research record for {research_id}"
                )
520 def notify_research_completed(
521 self, username: str, research_id: str, user_password: str | None = None
522 ):
523 """
524 Notify that a research completed.
525 Updates the user's queue status in their database.
527 Args:
528 username: The username
529 research_id: The research ID
530 user_password: User password for database access. Required for queue
531 updates and database lookups during notification sending.
532 Optional only because some callers may not have it
533 available, in which case only basic updates occur.
534 """
535 try:
536 # get_user_db_session is already imported at module level (line 19)
537 # It accepts optional password parameter and returns a context manager
538 with get_user_db_session(username, user_password) as session:
539 queue_service = UserQueueService(session)
540 queue_service.update_task_status(
541 research_id, ResearchStatus.COMPLETED
542 )
543 logger.info(
544 f"Research {research_id} completed for user {username}"
545 )
547 # Send notification using helper from notification module
548 send_research_completed_notification_from_session(
549 username=username,
550 research_id=research_id,
551 db_session=session,
552 )
554 except Exception:
555 logger.exception(
556 f"Failed to update completion status for {username}"
557 )
559 # Auto-convert research to document in History collection.
560 # Documents only — FAISS indexing is triggered separately by the user
561 # via "Index All" on the History page.
562 from ...research_library.search.services.research_history_indexer import (
563 auto_convert_research,
564 )
566 auto_convert_research(username, research_id, db_password=user_password)
568 def notify_research_failed(
569 self,
570 username: str,
571 research_id: str,
572 error_message: str | None = None,
573 user_password: str | None = None,
574 ):
575 """
576 Notify that a research failed.
577 Updates the user's queue status in their database and sends notification.
579 Args:
580 username: The username
581 research_id: The research ID
582 error_message: Optional error message
583 user_password: User password for database access. Required for queue
584 updates and database lookups during notification sending.
585 Optional only because some callers may not have it
586 available, in which case only basic updates occur.
587 """
588 try:
589 # get_user_db_session is already imported at module level (line 19)
590 # It accepts optional password parameter and returns a context manager
591 with get_user_db_session(username, user_password) as session:
592 queue_service = UserQueueService(session)
593 queue_service.update_task_status(
594 research_id,
595 ResearchStatus.FAILED,
596 error_message=error_message,
597 )
598 logger.info(
599 f"Research {research_id} failed for user {username}: "
600 f"{error_message}"
601 )
603 # Send notification using helper from notification module
604 send_research_failed_notification_from_session(
605 username=username,
606 research_id=research_id,
607 error_message=error_message or "Unknown error",
608 db_session=session,
609 )
611 except Exception:
612 logger.exception(f"Failed to update failure status for {username}")
614 def _process_queue_loop(self):
615 """Main loop that processes the queue."""
616 while self.running:
617 try:
618 # Get list of users to check (don't clear immediately)
619 with self._users_lock:
620 users_to_check = list(self._users_to_check)
622 # Process each user's queue
623 users_to_remove = []
624 for user_session in users_to_check:
625 try:
626 username, session_id = user_session
627 # _process_user_queue returns True if queue is empty
628 queue_empty = self._process_user_queue(
629 username, session_id
630 )
631 if queue_empty:
632 users_to_remove.append(user_session)
633 except Exception:
634 logger.exception(
635 f"Error processing queue for {user_session}"
636 )
637 # Don't remove on error - the _process_user_queue method
638 # determines whether to keep checking based on error type
640 # Only remove users whose queues are now empty
641 with self._users_lock:
642 for user_session in users_to_remove:
643 self._users_to_check.discard(user_session)
645 except Exception:
646 logger.exception("Error in queue processor loop")
647 finally:
648 # Clean up thread-local database session after each iteration.
649 # The loop opens a new session each iteration via get_user_db_session();
650 # closing it returns the connection to the shared QueuePool promptly.
651 try:
652 from ...database.thread_local_session import (
653 cleanup_current_thread,
654 cleanup_dead_threads,
655 )
657 cleanup_current_thread()
658 except Exception:
659 logger.debug(
660 "thread-local cleanup on shutdown", exc_info=True
661 )
663 # Periodic dead-thread credential sweep (every ~60s).
664 # One of three sweep trigger points (app_factory
665 # teardown, connection_cleanup scheduler, and here).
666 self._loop_iteration += 1
667 if self._loop_iteration % 6 == 0: # Every ~60s (10s × 6)
668 try:
669 cleanup_dead_threads()
670 except Exception:
671 logger.debug(
672 "periodic dead-thread sweep", exc_info=True
673 )
675 time.sleep(self.check_interval)
677 def _process_user_queue(self, username: str, session_id: str) -> bool:
678 """
679 Process the queue for a specific user.
681 Args:
682 username: The username
683 session_id: The Flask session ID
685 Returns:
686 True if the queue is empty, False if there are still items
687 """
688 # Get the user's password from session store
689 password = session_password_store.get_session_password(
690 username, session_id
691 )
692 if not password:
693 logger.debug(
694 f"No password available for user {username}, skipping queue check"
695 )
696 return True # Remove from checking - session expired
698 # Open the user's encrypted database
699 try:
700 # First ensure the database is open
701 engine = db_manager.open_user_database(username, password)
702 if not engine:
703 logger.error(f"Failed to open database for user {username}")
704 return False # Keep checking - could be temporary DB issue
706 # Get a session and process the queue
707 with get_user_db_session(username, password) as db_session:
708 queue_service = UserQueueService(db_session)
710 # Get user's settings using SettingsManager
711 from ...settings.manager import SettingsManager
713 settings_manager = SettingsManager(db_session)
715 # Get user's max concurrent setting (env > DB > default)
716 max_concurrent = settings_manager.get_setting(
717 "app.max_concurrent_researches", 3
718 )
720 # Get queue status
721 queue_status = queue_service.get_queue_status() or {
722 "active_tasks": 0,
723 "queued_tasks": 0,
724 }
726 # Calculate available slots
727 available_slots = max_concurrent - queue_status["active_tasks"]
729 if available_slots <= 0:
730 # No slots available, but queue might not be empty
731 return False # Keep checking
733 if queue_status["queued_tasks"] == 0: 733 ↛ 737line 733 didn't jump to line 737 because the condition on line 733 was always true
734 # Queue is empty
735 return True # Remove from checking
737 logger.info(
738 f"Processing queue for {username}: "
739 f"{queue_status['active_tasks']} active, "
740 f"{queue_status['queued_tasks']} queued, "
741 f"{available_slots} slots available"
742 )
744 # Process queued researches
745 self._start_queued_researches(
746 db_session,
747 queue_service,
748 username,
749 password,
750 available_slots,
751 )
753 # Check if there are still items in queue
754 updated_status = queue_service.get_queue_status() or {
755 "queued_tasks": 0
756 }
757 return bool(updated_status["queued_tasks"] == 0)
759 except Exception:
760 logger.exception(f"Error processing queue for user {username}")
761 return False # Keep checking - errors might be temporary
763 def _reclaim_stranded_queue_rows(self, db_session, username: str) -> int:
764 """Reclaim queue rows stranded by a crash or restart.
766 A row is stranded when ``is_processing=True`` but no live thread
767 exists in ``_active_research`` for its ``research_id``. This can
768 happen after a crash/restart between the pre-spawn IN_PROGRESS
769 commit and the queue-row deletion in ``_start_queued_researches``
770 — the row is invisible to the normal ``is_processing=False``
771 query and would never be retried.
773 Reverts ``QueuedResearch.is_processing`` to False and — if
774 ``ResearchHistory.status`` is still IN_PROGRESS with no live
775 thread — reverts that to QUEUED so the next tick can freshly
776 spawn. Returns the number of rows reclaimed.
777 """
778 from ..routes.globals import is_research_active
780 stranded = (
781 db_session.query(QueuedResearch)
782 .filter_by(username=username, is_processing=True)
783 .all()
784 )
785 reclaimed = 0
786 for row in stranded:
787 if is_research_active(row.research_id):
788 # A legitimate in-flight claim; don't touch.
789 continue
790 row.is_processing = False
791 research = (
792 db_session.query(ResearchHistory)
793 .filter_by(id=row.research_id)
794 .first()
795 )
796 status_changed = (
797 research is not None
798 and research.status == ResearchStatus.IN_PROGRESS
799 )
800 if status_changed:
801 research.status = ResearchStatus.QUEUED
802 reclaimed += 1
803 logger.warning(
804 f"Reclaimed stranded queue row for research "
805 f"{row.research_id} (user {username}): no live thread, "
806 "resetting is_processing=False"
807 + (" and status=QUEUED" if status_changed else "")
808 )
809 if reclaimed:
810 if not self._commit_with_safe_rollback( 810 ↛ 814line 810 didn't jump to line 814 because the condition on line 810 was never true
811 db_session,
812 f"reclaim of stranded rows for user {username}",
813 ):
814 return 0
815 return reclaimed
817 def _start_queued_researches(
818 self,
819 db_session,
820 queue_service: UserQueueService,
821 username: str,
822 password: str,
823 available_slots: int,
824 ):
825 """Start queued researches up to available slots."""
826 # Before picking work, reclaim any rows stranded by a prior
827 # crash — otherwise they are invisible to the is_processing=False
828 # filter below and would never retry.
829 self._reclaim_stranded_queue_rows(db_session, username)
831 # Get queued researches
832 queued = (
833 db_session.query(QueuedResearch)
834 .filter_by(username=username, is_processing=False)
835 .order_by(QueuedResearch.position)
836 .limit(available_slots)
837 .all()
838 )
840 for queued_research in queued:
841 research_id = queued_research.research_id
842 try:
843 # Atomically claim this item by flipping is_processing from
844 # False to True in a single UPDATE. If another worker has
845 # already claimed it since our SELECT above, the UPDATE will
846 # match zero rows and we skip. Under non-IMMEDIATE isolation
847 # the previous SELECT+assign pattern would race and two
848 # workers could both process the same queued item.
849 claimed = (
850 db_session.query(QueuedResearch)
851 .filter_by(
852 id=queued_research.id,
853 is_processing=False,
854 )
855 .update(
856 {QueuedResearch.is_processing: True},
857 synchronize_session=False,
858 )
859 )
860 db_session.commit()
861 if not claimed:
862 logger.debug(
863 f"Queued research {research_id} "
864 f"already claimed by another worker; skipping"
865 )
866 continue
867 # Refresh local object state now that we hold the claim
868 db_session.refresh(queued_research)
870 # Update task status
871 queue_service.update_task_status(research_id, "processing")
873 # Start the research
874 self._start_research(
875 db_session,
876 username,
877 password,
878 queued_research,
879 )
881 # Success — clear any prior spawn-failure count and
882 # remove the queue row.
883 with self._spawn_retry_counts_lock:
884 self._spawn_retry_counts.pop(research_id, None)
885 db_session.delete(queued_research)
886 db_session.commit()
888 logger.info(
889 f"Started queued research {research_id} for user {username}"
890 )
892 except DuplicateResearchError:
893 # Raised by _start_research when a prior attempt's thread
894 # is still live, OR when the ResearchHistory row is in a
895 # non-QUEUED state (IN_PROGRESS from a prior attempt's
896 # successful pre-spawn commit; terminal COMPLETED /
897 # FAILED / SUSPENDED from a thread that already finished
898 # and cleaned up). In every case the correct behavior is
899 # the same: clear the stale queue row and the retry
900 # counter, and do NOT fall through to the FAILED/notify
901 # path — that would terminate-status a live thread or
902 # emit a false failure for a completed one.
903 logger.warning(
904 f"Research {research_id} is already started "
905 "(live thread or non-QUEUED status); clearing stale "
906 "queue row"
907 )
908 with self._spawn_retry_counts_lock:
909 self._spawn_retry_counts.pop(research_id, None)
910 self._delete_queue_row_safely(db_session, username, research_id)
911 continue
913 except SystemAtCapacityError:
914 # System hit the global concurrent-research capacity while
915 # dispatching this queued item. _start_research already
916 # reset the ResearchHistory row back to QUEUED before
917 # re-raising. This is a transient condition, NOT a spawn
918 # failure, so it must NOT count toward SPAWN_RETRY_LIMIT —
919 # otherwise a busy system would wrongly mark a perfectly
920 # valid queued research FAILED after a few ticks. Just
921 # release the processing claim so the next tick retries.
922 # Mirrors the dedicated handler in _start_research_directly.
923 logger.info(
924 f"System at capacity dispatching queued research "
925 f"{research_id}; leaving queued for next tick"
926 )
927 # Revert the queued->processing claim from
928 # update_task_status("processing") above. The research stays
929 # queued for the next tick, so its slot must return to
930 # queued_tasks rather than leaking into active_tasks on
931 # every capacity-rejected retry.
932 queue_service.update_task_status(research_id, "queued")
933 fresh_queued = (
934 db_session.query(QueuedResearch)
935 .filter_by(username=username, research_id=research_id)
936 .first()
937 )
938 if fresh_queued: 938 ↛ 945line 938 didn't jump to line 945 because the condition on line 938 was always true
939 fresh_queued.is_processing = False
940 self._commit_with_safe_rollback(
941 db_session,
942 "is_processing reset after capacity reject for "
943 f"research {research_id}",
944 )
945 continue
947 except Exception:
948 logger.exception(
949 f"Error starting queued research {research_id}"
950 )
951 # Session may be in PendingRollbackError state after a
952 # failed commit inside _start_research.
953 try:
954 db_session.rollback()
955 except Exception:
956 logger.debug(
957 "Rollback after start failure",
958 exc_info=True,
959 )
961 attempts = self._bump_spawn_retry_count(research_id)
963 # Re-query in case rollback expired the ORM object.
964 fresh_queued = (
965 db_session.query(QueuedResearch)
966 .filter_by(username=username, research_id=research_id)
967 .first()
968 )
970 if attempts < SPAWN_RETRY_LIMIT:
971 # Transient failure — allow the next loop tick to
972 # retry. _start_research rolls back its own
973 # IN_PROGRESS write on spawn failure, so the only
974 # fix-up needed here is resetting is_processing.
975 logger.warning(
976 f"Spawn failed for research {research_id} "
977 f"(attempt {attempts}/{SPAWN_RETRY_LIMIT}), "
978 "leaving queued for retry"
979 )
980 if fresh_queued: 980 ↛ 986line 980 didn't jump to line 986 because the condition on line 980 was always true
981 fresh_queued.is_processing = False
982 self._commit_with_safe_rollback(
983 db_session,
984 f"is_processing reset for research {research_id}",
985 )
986 continue
988 # Exhausted retries — mark terminal FAILED, delete the
989 # queue row to stop re-dispatch, and notify the user.
990 # Use logger.warning (not logger.exception) because the
991 # spawn traceback was already logged at the top of this
992 # except block; re-logging with exc_info emits a second
993 # full traceback.
994 logger.warning(
995 f"Spawn failed for research {research_id} "
996 f"after {attempts} attempts; marking FAILED"
997 )
998 with self._spawn_retry_counts_lock:
999 self._spawn_retry_counts.pop(research_id, None)
1000 try:
1001 research = (
1002 db_session.query(ResearchHistory)
1003 .filter_by(id=research_id)
1004 .first()
1005 )
1006 if research: 1006 ↛ 1008line 1006 didn't jump to line 1008 because the condition on line 1006 was always true
1007 research.status = ResearchStatus.FAILED
1008 if fresh_queued: 1008 ↛ 1010line 1008 didn't jump to line 1010 because the condition on line 1008 was always true
1009 db_session.delete(fresh_queued)
1010 db_session.commit()
1011 except Exception:
1012 logger.exception(
1013 "Failed to persist terminal FAILED state for "
1014 f"research {research_id}"
1015 )
1016 try:
1017 db_session.rollback()
1018 except Exception:
1019 logger.debug(
1020 "Rollback after terminal update failure",
1021 exc_info=True,
1022 )
1024 # notify_research_failed opens its own session and
1025 # sends the user notification. Called exactly once
1026 # per research_id because the counter is popped above.
1027 self.notify_research_failed(
1028 username=username,
1029 research_id=research_id,
1030 error_message=(
1031 f"Failed to start research after {attempts} attempts"
1032 ),
1033 user_password=password,
1034 )
def _start_research(
    self,
    db_session,
    username: str,
    password: str,
    queued_research,
):
    """Claim a queued research and spawn its worker thread.

    ``ResearchHistory.status`` is committed as ``IN_PROGRESS`` *before*
    the thread is spawned. Doing it afterwards would open a race: a
    fast worker (which uses its own DB session) could already have
    written ``COMPLETED``, and the late commit here would overwrite it
    with ``IN_PROGRESS``, leaving a finished research looking stuck.

    Args:
        db_session: Open session on the user's database.
        username: Owner of the research.
        password: User password, forwarded to the research process.
        queued_research: Queue row being dispatched; supplies the
            research id, query, mode and settings snapshot.

    Raises:
        ValueError: No ``ResearchHistory`` row exists for the research.
        DuplicateResearchError: The research is no longer ``QUEUED``,
            a thread for it already exists, or the thread was spawned
            but the ``UserActiveResearch`` commit failed. In all of
            these the caller should drop the queue row without
            counting a spawn failure.
        SystemAtCapacityError: Re-raised after the status has been
            reset to ``QUEUED``; transient, not a spawn failure.
        Exception: Any other spawn error, re-raised after the status
            has been reset to ``QUEUED`` so the caller's retry logic
            sees a clean row.
    """
    research_id = queued_research.research_id
    history_lookup = db_session.query(ResearchHistory).filter_by(
        id=research_id
    )
    research = history_lookup.first()
    if not research:
        raise ValueError(f"Research {research_id} not found")

    # A non-QUEUED status on entry means an earlier attempt got past
    # the spawn (its post-spawn UserActiveResearch commit failed):
    #   - IN_PROGRESS: that thread is, or was, running.
    #   - COMPLETED/FAILED: that thread finished and removed itself
    #     from the live registry, so retrying would overwrite the
    #     terminal status and re-run the whole research in a second
    #     thread.
    # Either way, surface it as a duplicate so the caller deletes the
    # queue row without touching status or notifying the user.
    already_started = research.status != ResearchStatus.QUEUED
    if already_started:
        raise DuplicateResearchError(
            f"Research {research_id} is already started "
            f"(status={research.status})"
        )

    # Claim IN_PROGRESS up front (see docstring for the race this closes).
    research.status = ResearchStatus.IN_PROGRESS
    db_session.commit()

    # The snapshot is either the new layout
    # {"submission": ..., "settings_snapshot": ...} or the legacy flat
    # dict of submission parameters.
    snapshot = queued_research.settings_snapshot or {}
    has_new_layout = isinstance(snapshot, dict) and "submission" in snapshot
    submission = snapshot.get("submission", {}) if has_new_layout else snapshot
    full_settings = (
        snapshot.get("settings_snapshot", {}) if has_new_layout else {}
    )

    # Submission values handed to the research process unchanged.
    passthrough_keys = (
        "model_provider",
        "model",
        "custom_endpoint",
        "search_engine",
        "max_results",
        "time_period",
        "iterations",
        "questions_per_iteration",
    )

    def _revert_to_queued(cause: str) -> None:
        # No thread exists: undo the IN_PROGRESS claim so the next
        # attempt finds a clean QUEUED row.
        research.status = ResearchStatus.QUEUED
        self._commit_with_safe_rollback(
            db_session,
            f"status reset to QUEUED after {cause} for research {research_id}",
        )

    try:
        spawned_thread = start_research_process(
            research_id,
            queued_research.query,
            queued_research.mode,
            run_research_process,
            username=username,
            user_password=password,  # Pass password for metrics
            **{key: submission.get(key) for key in passthrough_keys},
            strategy=submission.get("strategy", "source-based"),
            settings_snapshot=full_settings,
        )
    except DuplicateResearchError:
        # A live thread already exists for this research_id (e.g. a
        # previous attempt's post-spawn commit failed). Leave the
        # status alone; resetting it would contradict that thread.
        raise
    except SystemAtCapacityError:
        # Concurrent-research capacity reached; nothing was spawned.
        # Transient, so it must not count toward SPAWN_RETRY_LIMIT.
        logger.info(
            f"System at capacity when dispatching {research_id}; "
            "re-queueing for next tick"
        )
        _revert_to_queued("capacity reject")
        raise
    except Exception:
        # Genuine spawn failure.
        _revert_to_queued("spawn failure")
        raise

    # The thread is running; record it. If this commit fails the
    # thread keeps running unrecorded, so report a duplicate rather
    # than a generic error: a generic error would bump the spawn retry
    # counter, and enough of those (or one at SPAWN_RETRY_LIMIT - 1)
    # would mark a LIVE thread as terminal FAILED.
    db_session.add(
        UserActiveResearch(
            username=username,
            research_id=research_id,
            status=ResearchStatus.IN_PROGRESS,
            thread_id=str(spawned_thread.ident),
            settings_snapshot=queued_research.settings_snapshot,
        )
    )
    recorded = self._commit_with_safe_rollback(
        db_session,
        f"UserActiveResearch persist after spawn for research {research_id}",
    )
    if not recorded:
        raise DuplicateResearchError(
            f"Research {research_id} thread is live; "
            "UserActiveResearch commit failed"
        )
def process_user_request(self, username: str, session_id: str) -> int:
    """Check a user's queue from within one of their requests.

    Registers the user's activity with the processor, then, if a
    session password is available and the user's database opens,
    reads the queue status. Nothing is started here; dispatch is left
    to the background thread.

    Args:
        username: User whose queue should be inspected.
        session_id: Session used to look up the stored password.

    Returns:
        The number of queued tasks reported for the user, or 0 when
        there are none, the password or database is unavailable, or
        any error occurs (errors are logged, never raised).
    """
    try:
        # Make sure the background loop knows about this user.
        self.notify_user_activity(username, session_id)

        secret = session_password_store.get_session_password(
            username, session_id
        )
        if not secret:
            return 0

        if not db_manager.open_user_database(username, secret):
            return 0

        with get_user_db_session(username) as db_session:
            status = UserQueueService(db_session).get_queue_status()
            has_backlog = bool(status) and status["queued_tasks"] > 0
            if not has_backlog:
                return 0

            logger.info(
                f"User {username} has {status['queued_tasks']} "
                f"queued tasks, triggering immediate processing"
            )
            # Actual processing happens in the background thread.
            return int(status["queued_tasks"])

    except Exception:
        logger.exception(f"Error in process_user_request for {username}")
        return 0
def queue_progress_update(
    self, username: str, research_id: str, progress: float
):
    """Record a progress update to be written once DB access exists.

    The update is stored in ``self.pending_operations`` under a fresh
    UUID key. Kept for compatibility with the old processor during
    migration.

    Args:
        username: The username
        research_id: The research ID
        progress: The progress value (0-100)
    """
    key = str(uuid.uuid4())
    payload = {
        "username": username,
        "operation_type": "progress_update",
        "research_id": research_id,
        "progress": progress,
    }
    with self._pending_operations_lock:
        # Timestamp is taken at insertion time, under the lock.
        self.pending_operations[key] = {**payload, "timestamp": time.time()}
    logger.debug(
        f"Queued progress update for research {research_id}: {progress}%"
    )
def queue_error_update(
    self,
    username: str,
    research_id: str,
    status: str,
    error_message: str,
    metadata: Dict[str, Any],
    completed_at: str,
    report_path: Optional[str] = None,
):
    """Record an error-status update to be written once DB access exists.

    The update is stored in ``self.pending_operations`` under a fresh
    UUID key. Kept for compatibility with the old processor during
    migration.

    Args:
        username: The username
        research_id: The research ID
        status: The status to set (failed, suspended, etc.)
        error_message: The error message
        metadata: Research metadata
        completed_at: Completion timestamp
        report_path: Optional path to error report
    """
    key = str(uuid.uuid4())
    payload = {
        "username": username,
        "operation_type": "error_update",
        "research_id": research_id,
        "status": status,
        "error_message": error_message,
        "metadata": metadata,
        "completed_at": completed_at,
        "report_path": report_path,
    }
    with self._pending_operations_lock:
        # Timestamp is taken at insertion time, under the lock.
        self.pending_operations[key] = {**payload, "timestamp": time.time()}
    logger.info(
        f"Queued error update for research {research_id} with status {status}"
    )
def process_pending_operations_for_user(
    self, username: str, db_session
) -> int:
    """Apply a user's pending operations using an open DB session.

    Called from request context where the encrypted database is
    accessible. Kept for compatibility with the old processor during
    migration.

    Operations are removed from ``self.pending_operations`` before
    they are applied, so an operation that fails or cannot be applied
    is not retried. Because of that, every dropped operation is
    logged rather than discarded silently.

    Args:
        username: Username to process operations for
        db_session: Active database session for the user

    Returns:
        Number of operations successfully applied and committed.
    """
    # Claim this user's operations under the lock, removing them
    # immediately to prevent duplicate processing.
    with self._pending_operations_lock:
        claimed = [
            (op_id, op_data)
            for op_id, op_data in self.pending_operations.items()
            if op_data["username"] == username
        ]
        for op_id, _ in claimed:
            del self.pending_operations[op_id]

    if not claimed:
        return 0

    processed_count = 0

    # Apply outside the lock so it is not held during DB operations.
    for op_id, op_data in claimed:
        try:
            operation_type = op_data.get("operation_type")
            if operation_type not in ("progress_update", "error_update"):
                logger.warning(
                    f"Dropping pending operation {op_id} with unknown "
                    f"type {operation_type!r}"
                )
                continue

            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=op_data["research_id"])
                .first()
            )
            if not research:
                logger.warning(
                    f"Dropping pending {operation_type} {op_id}: research "
                    f"{op_data['research_id']} not found"
                )
                continue

            if operation_type == "progress_update":
                # Update the progress column directly
                research.progress = op_data["progress"]
            else:
                research.status = op_data["status"]
                research.error_message = op_data["error_message"]
                research.research_meta = op_data["metadata"]
                research.completed_at = op_data["completed_at"]
                # Only overwrite the report path when one was supplied.
                if op_data.get("report_path"):
                    research.report_path = op_data["report_path"]

            db_session.commit()
            processed_count += 1

        except Exception:
            logger.exception(f"Error processing operation {op_id}")
            # Rollback to clear the failed transaction state
            try:
                db_session.rollback()
            except Exception:
                logger.warning(
                    f"Failed to rollback after error in operation {op_id}"
                )

    return processed_count
# Global queue processor instance, created at import time with the default
# check_interval (10 seconds).
queue_processor = QueueProcessorV2()