Coverage for src / local_deep_research / web / queue / processor_v2.py: 95%
295 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Queue processor v2 - uses encrypted user databases instead of service.db
Supports both direct execution and queue modes.
"""
6import threading
7import time
8import uuid
9from typing import Any, Dict, Optional
11from loguru import logger
13from ...constants import ResearchStatus
14from ...database.encrypted_db import db_manager
15from ...database.models import (
16 QueuedResearch,
17 ResearchHistory,
18 UserActiveResearch,
19)
20from ...database.queue_service import UserQueueService
21from ...database.session_context import get_user_db_session
22from ...database.session_passwords import session_password_store
23from ...notifications.queue_helpers import (
24 send_research_completed_notification_from_session,
25 send_research_failed_notification_from_session,
26)
27from ..services.research_service import (
28 run_research_process,
29 start_research_process,
30)
# Retry configuration constants for notification database queries.
# NOTE(review): not referenced within this file's visible code — presumably
# consumed by the notification helpers; confirm before removing.
MAX_RESEARCH_LOOKUP_RETRIES = 3
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5  # seconds
RETRY_BACKOFF_MULTIPLIER = 2
class QueueProcessorV2:
    """
    Processes queued researches using encrypted user databases.
    This replaces the service.db approach.

    Supports two per-user modes: "direct" (start a research immediately when
    a concurrency slot is free) and queue mode (enqueue and let the
    background loop start work later).
    """
44 def __init__(self, check_interval=10):
45 """
46 Initialize the queue processor.
48 Args:
49 check_interval: How often to check for work (seconds)
50 """
51 self.check_interval = check_interval
52 self.running = False
53 self.thread = None
54 self._loop_iteration = 0
56 # Per-user settings will be retrieved from each user's database
57 # when processing their queue using SettingsManager
58 logger.info(
59 "Queue processor v2 initialized - will use per-user settings from SettingsManager"
60 )
62 # Track which users we should check
63 self._users_to_check: set[tuple[str, str]] = set()
64 self._users_lock = threading.Lock()
66 # Track pending operations from background threads
67 self.pending_operations = {}
68 self._pending_operations_lock = threading.Lock()
70 def start(self):
71 """Start the queue processor thread."""
72 if self.running:
73 logger.warning("Queue processor already running")
74 return
76 self.running = True
77 self.thread = threading.Thread(
78 target=self._process_queue_loop, daemon=True
79 )
80 self.thread.start()
81 logger.info("Queue processor v2 started")
83 def stop(self):
84 """Stop the queue processor thread."""
85 self.running = False
86 if self.thread:
87 self.thread.join(timeout=10)
88 logger.info("Queue processor v2 stopped")
90 def notify_user_activity(self, username: str, session_id: str):
91 """
92 Notify that a user has activity and their queue should be checked.
94 Args:
95 username: The username
96 session_id: The Flask session ID (for password access)
97 """
98 with self._users_lock:
99 self._users_to_check.add((username, session_id))
100 logger.debug(f"User {username} added to queue check list")
    def notify_research_queued(self, username: str, research_id: str, **kwargs):
        """
        Notify that a research was queued.
        In direct mode, this immediately starts the research if slots are available.
        In queue mode, it adds to the queue.

        Args:
            username: The username
            research_id: The research ID
            **kwargs: Additional parameters for direct execution (query, mode, etc.)
        """
        # Direct execution is only attempted when the caller supplied the
        # research parameters and a session_id we can map to a password.
        if kwargs:
            session_id = kwargs.get("session_id")
            if session_id:
                # Check if we can start it directly
                password = session_password_store.get_session_password(
                    username, session_id
                )
                if password:
                    try:
                        # Open database and check settings + active count
                        engine = db_manager.open_user_database(
                            username, password
                        )
                        if engine:
                            with get_user_db_session(username) as db_session:
                                # Get user's settings using SettingsManager
                                # (local import — presumably avoids a circular
                                # import at module load; confirm)
                                from ...settings.manager import SettingsManager

                                settings_manager = SettingsManager(db_session)

                                # Get user's queue_mode setting (env > DB > default)
                                queue_mode = settings_manager.get_setting(
                                    "app.queue_mode", "direct"
                                )

                                # Get user's max concurrent setting (env > DB > default)
                                max_concurrent = settings_manager.get_setting(
                                    "app.max_concurrent_researches", 3
                                )

                                logger.debug(
                                    f"User {username} settings: queue_mode={queue_mode}, "
                                    f"max_concurrent={max_concurrent}"
                                )

                                # Only try direct execution if user has queue_mode="direct"
                                if queue_mode == "direct":
                                    # Count active researches
                                    active_count = (
                                        db_session.query(UserActiveResearch)
                                        .filter_by(
                                            username=username,
                                            status=ResearchStatus.IN_PROGRESS,
                                        )
                                        .count()
                                    )

                                    if active_count < max_concurrent:
                                        # We have slots - start directly!
                                        logger.info(
                                            f"Direct mode: Starting research {research_id} immediately "
                                            f"(active: {active_count}/{max_concurrent})"
                                        )

                                        # Start the research directly
                                        self._start_research_directly(
                                            username,
                                            research_id,
                                            password,
                                            **kwargs,
                                        )
                                        return
                                    logger.info(
                                        f"Direct mode: Max concurrent reached ({active_count}/"
                                        f"{max_concurrent}), queueing {research_id}"
                                    )
                                else:
                                    logger.info(
                                        f"User {username} has queue_mode={queue_mode}, "
                                        f"queueing research {research_id}"
                                    )
                    except Exception:
                        logger.exception(
                            f"Error in direct execution for {username}"
                        )

        # Fall back to queue mode (or if direct mode failed)
        try:
            with get_user_db_session(username) as session:
                queue_service = UserQueueService(session)
                queue_service.add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                logger.info(
                    f"Research {research_id} queued for user {username}"
                )
        except Exception:
            logger.exception(f"Failed to update queue status for {username}")
    def _start_research_directly(
        self, username: str, research_id: str, password: str, **kwargs
    ):
        """
        Start a research directly without queueing.

        Args:
            username: The username
            research_id: The research ID
            password: The user's password
            **kwargs: Research parameters (query, mode, settings, etc.)
        """
        query = kwargs.get("query")
        mode = kwargs.get("mode")
        settings_snapshot = kwargs.get("settings_snapshot", {})

        # Create active research record first; thread_id is a placeholder
        # until the worker thread has actually been started below.
        try:
            with get_user_db_session(username) as db_session:
                active_record = UserActiveResearch(
                    username=username,
                    research_id=research_id,
                    status=ResearchStatus.IN_PROGRESS,
                    thread_id="pending",
                    settings_snapshot=settings_snapshot,
                )
                db_session.add(active_record)
                db_session.commit()

                # Update task status if it exists
                queue_service = UserQueueService(db_session)
                queue_service.update_task_status(research_id, "processing")
        except Exception:
            logger.exception(
                f"Failed to create active research record for {research_id}"
            )
            return

        # Extract parameters from kwargs
        model_provider = kwargs.get("model_provider")
        model = kwargs.get("model")
        custom_endpoint = kwargs.get("custom_endpoint")
        search_engine = kwargs.get("search_engine")

        # Start the research process
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                run_research_process,
                username=username,
                user_password=password,
                model_provider=model_provider,
                model=model,
                custom_endpoint=custom_endpoint,
                search_engine=search_engine,
                max_results=kwargs.get("max_results"),
                time_period=kwargs.get("time_period"),
                iterations=kwargs.get("iterations"),
                questions_per_iteration=kwargs.get("questions_per_iteration"),
                strategy=kwargs.get("strategy", "source-based"),
                settings_snapshot=settings_snapshot,
            )

            # Update thread ID on the active-research row; failure here is
            # logged but does not abort the already-running research.
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        active_record.thread_id = str(research_thread.ident)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to update thread ID for {research_id}"
                )

            logger.info(
                f"Direct execution: Started research {research_id} for user {username} "
                f"in thread {research_thread.ident}"
            )

        except Exception:
            logger.exception(f"Failed to start research {research_id} directly")
            # Clean up the active record so the concurrency slot is not leaked.
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        db_session.delete(active_record)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to clean up active research record for {research_id}"
                )
    def notify_research_completed(
        self, username: str, research_id: str, user_password: str | None = None
    ):
        """
        Notify that a research completed.
        Updates the user's queue status in their database.

        Args:
            username: The username
            research_id: The research ID
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # get_user_db_session is already imported at module level.
            # It accepts an optional password parameter and returns a
            # context manager over the user's encrypted database.
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(
                    research_id, ResearchStatus.COMPLETED
                )
                logger.info(
                    f"Research {research_id} completed for user {username}"
                )

                # Send notification using helper from notification module
                send_research_completed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    db_session=session,
                )

        except Exception:
            logger.exception(
                f"Failed to update completion status for {username}"
            )

        # Auto-convert research to document in History collection.
        # Documents only — FAISS indexing is triggered separately by the user
        # via "Index All" on the History page.
        # NOTE(review): this runs even when the status update above failed
        # (best-effort conversion) — confirm that is intentional.
        from ...research_library.search.services.research_history_indexer import (
            auto_convert_research,
        )

        auto_convert_research(username, research_id, db_password=user_password)
357 def notify_research_failed(
358 self,
359 username: str,
360 research_id: str,
361 error_message: str | None = None,
362 user_password: str | None = None,
363 ):
364 """
365 Notify that a research failed.
366 Updates the user's queue status in their database and sends notification.
368 Args:
369 username: The username
370 research_id: The research ID
371 error_message: Optional error message
372 user_password: User password for database access. Required for queue
373 updates and database lookups during notification sending.
374 Optional only because some callers may not have it
375 available, in which case only basic updates occur.
376 """
377 try:
378 # get_user_db_session is already imported at module level (line 19)
379 # It accepts optional password parameter and returns a context manager
380 with get_user_db_session(username, user_password) as session:
381 queue_service = UserQueueService(session)
382 queue_service.update_task_status(
383 research_id,
384 ResearchStatus.FAILED,
385 error_message=error_message,
386 )
387 logger.info(
388 f"Research {research_id} failed for user {username}: "
389 f"{error_message}"
390 )
392 # Send notification using helper from notification module
393 send_research_failed_notification_from_session(
394 username=username,
395 research_id=research_id,
396 error_message=error_message or "Unknown error",
397 db_session=session,
398 )
400 except Exception:
401 logger.exception(f"Failed to update failure status for {username}")
    def _process_queue_loop(self):
        """Main loop that processes the queue.

        Runs until self.running is cleared. Each iteration snapshots the
        set of users to check, processes each user's queue, and only then
        removes users whose queues turned out to be empty.
        """
        while self.running:
            try:
                # Get list of users to check (don't clear immediately —
                # users stay registered until their queue is drained)
                with self._users_lock:
                    users_to_check = list(self._users_to_check)

                # Process each user's queue
                users_to_remove = []
                for user_session in users_to_check:
                    try:
                        username, session_id = user_session
                        # _process_user_queue returns True if queue is empty
                        queue_empty = self._process_user_queue(
                            username, session_id
                        )
                        if queue_empty:
                            users_to_remove.append(user_session)
                    except Exception:
                        logger.exception(
                            f"Error processing queue for {user_session}"
                        )
                        # Don't remove on error - the _process_user_queue method
                        # determines whether to keep checking based on error type

                # Only remove users whose queues are now empty
                with self._users_lock:
                    for user_session in users_to_remove:
                        self._users_to_check.discard(user_session)

            except Exception:
                logger.exception("Error in queue processor loop")
            finally:
                # Clean up thread-local database session after each iteration.
                # The loop opens a new session each iteration via get_user_db_session();
                # closing it returns the connection to the shared QueuePool promptly.
                try:
                    from ...database.thread_local_session import (
                        cleanup_current_thread,
                        cleanup_dead_threads,
                    )

                    cleanup_current_thread()
                except Exception:
                    logger.debug(
                        "thread-local cleanup on shutdown", exc_info=True
                    )

                # Periodic dead-thread credential sweep (every ~60s).
                # One of three sweep trigger points (app_factory
                # teardown, connection_cleanup scheduler, and here).
                self._loop_iteration += 1
                if self._loop_iteration % 6 == 0:  # Every ~60s (10s × 6)
                    try:
                        cleanup_dead_threads()
                    except Exception:
                        logger.debug(
                            "periodic dead-thread sweep", exc_info=True
                        )

            time.sleep(self.check_interval)
    def _process_user_queue(self, username: str, session_id: str) -> bool:
        """
        Process the queue for a specific user.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            True if the queue is empty (caller may stop checking this user),
            False if there are still items or a possibly-transient error
            occurred (caller should keep checking).
        """
        # Get the user's password from session store
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            logger.debug(
                f"No password available for user {username}, skipping queue check"
            )
            return True  # Remove from checking - session expired

        # Open the user's encrypted database
        try:
            # First ensure the database is open
            engine = db_manager.open_user_database(username, password)
            if not engine:
                logger.error(f"Failed to open database for user {username}")
                return False  # Keep checking - could be temporary DB issue

            # Get a session and process the queue
            with get_user_db_session(username, password) as db_session:
                queue_service = UserQueueService(db_session)

                # Get user's settings using SettingsManager (local import —
                # presumably avoids a circular import; confirm)
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                # Get queue status; fall back to an empty snapshot when the
                # service returns nothing
                queue_status = queue_service.get_queue_status() or {
                    "active_tasks": 0,
                    "queued_tasks": 0,
                }

                # Calculate available slots
                available_slots = max_concurrent - queue_status["active_tasks"]

                if available_slots <= 0:
                    # No slots available, but queue might not be empty
                    return False  # Keep checking

                if queue_status["queued_tasks"] == 0:
                    # Queue is empty
                    return True  # Remove from checking

                logger.info(
                    f"Processing queue for {username}: "
                    f"{queue_status['active_tasks']} active, "
                    f"{queue_status['queued_tasks']} queued, "
                    f"{available_slots} slots available"
                )

                # Process queued researches
                self._start_queued_researches(
                    db_session,
                    queue_service,
                    username,
                    password,
                    available_slots,
                )

                # Check if there are still items in queue
                updated_status = queue_service.get_queue_status() or {
                    "queued_tasks": 0
                }
                return bool(updated_status["queued_tasks"] == 0)

        except Exception:
            logger.exception(f"Error processing queue for user {username}")
            return False  # Keep checking - errors might be temporary
    def _start_queued_researches(
        self,
        db_session,
        queue_service: UserQueueService,
        username: str,
        password: str,
        available_slots: int,
    ):
        """Start queued researches up to available slots.

        Args:
            db_session: Open session on the user's encrypted database.
            queue_service: Queue service bound to the same session.
            username: The username.
            password: The user's password (forwarded to the research thread).
            available_slots: Maximum number of researches to start this pass.
        """
        # Get queued researches, oldest position first, skipping rows
        # already claimed by another pass (is_processing=True)
        queued = (
            db_session.query(QueuedResearch)
            .filter_by(username=username, is_processing=False)
            .order_by(QueuedResearch.position)
            .limit(available_slots)
            .all()
        )

        for queued_research in queued:
            try:
                # Mark as processing so a concurrent pass won't pick it up
                queued_research.is_processing = True
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id, "processing"
                )

                # Start the research
                self._start_research(
                    db_session,
                    username,
                    password,
                    queued_research,
                )

                # Remove from queue
                db_session.delete(queued_research)
                db_session.commit()

                logger.info(
                    f"Started queued research {queued_research.research_id} "
                    f"for user {username}"
                )

            except Exception:
                logger.exception(
                    f"Error starting queued research {queued_research.research_id}"
                )
                # Reset processing flag so the item can be retried later
                queued_research.is_processing = False
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id,
                    ResearchStatus.FAILED,
                    error_message="Failed to start research",
                )
    def _start_research(
        self,
        db_session,
        username: str,
        password: str,
        queued_research,
    ):
        """Start a queued research.

        Args:
            db_session: Open session on the user's encrypted database.
            username: The username.
            password: The user's password (passed to the research thread).
            queued_research: The QueuedResearch row to start.

        Raises:
            ValueError: If the research referenced by the queue entry does
                not exist in ResearchHistory.
        """
        # Update research status
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=queued_research.research_id)
            .first()
        )

        if not research:
            raise ValueError(
                f"Research {queued_research.research_id} not found"
            )

        research.status = ResearchStatus.IN_PROGRESS
        db_session.commit()

        # Create active research record; thread_id is a placeholder until
        # the worker thread has been started below
        active_record = UserActiveResearch(
            username=username,
            research_id=queued_research.research_id,
            status=ResearchStatus.IN_PROGRESS,
            thread_id="pending",
            settings_snapshot=queued_research.settings_snapshot,
        )
        db_session.add(active_record)
        db_session.commit()

        # Extract settings
        settings_snapshot = queued_research.settings_snapshot or {}

        # Handle new vs legacy structure: new snapshots nest submission
        # parameters under "submission", legacy snapshots are flat
        if (
            isinstance(settings_snapshot, dict)
            and "submission" in settings_snapshot
        ):
            submission_params = settings_snapshot.get("submission", {})
            complete_settings = settings_snapshot.get("settings_snapshot", {})
        else:
            submission_params = settings_snapshot
            complete_settings = {}

        # Start the research process with password
        research_thread = start_research_process(
            queued_research.research_id,
            queued_research.query,
            queued_research.mode,
            run_research_process,
            username=username,
            user_password=password,  # Pass password for metrics
            model_provider=submission_params.get("model_provider"),
            model=submission_params.get("model"),
            custom_endpoint=submission_params.get("custom_endpoint"),
            search_engine=submission_params.get("search_engine"),
            max_results=submission_params.get("max_results"),
            time_period=submission_params.get("time_period"),
            iterations=submission_params.get("iterations"),
            questions_per_iteration=submission_params.get(
                "questions_per_iteration"
            ),
            strategy=submission_params.get("strategy", "source-based"),
            settings_snapshot=complete_settings,
        )

        # Update thread ID
        active_record.thread_id = str(research_thread.ident)  # type: ignore[assignment]
        db_session.commit()
    def process_user_request(self, username: str, session_id: str) -> int:
        """
        Process queue for a user during their request.
        This is called from request context to check and start queued items.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            Number of queued tasks found (the actual processing happens in
            the background thread); 0 on error or when nothing is queued.
        """
        try:
            # Add user to check list
            self.notify_user_activity(username, session_id)

            # Force immediate check (don't wait for loop)
            password = session_password_store.get_session_password(
                username, session_id
            )
            if password:
                # Open database and check queue
                engine = db_manager.open_user_database(username, password)
                if engine:
                    with get_user_db_session(username) as db_session:
                        queue_service = UserQueueService(db_session)
                        status = queue_service.get_queue_status()

                        if status and status["queued_tasks"] > 0:
                            logger.info(
                                f"User {username} has {status['queued_tasks']} "
                                f"queued tasks, triggering immediate processing"
                            )
                            # Process will happen in background thread
                            return int(status["queued_tasks"])

            return 0

        except Exception:
            logger.exception(f"Error in process_user_request for {username}")
            return 0
725 def queue_progress_update(
726 self, username: str, research_id: str, progress: float
727 ):
728 """
729 Queue a progress update that needs database access.
730 For compatibility with old processor during migration.
732 Args:
733 username: The username
734 research_id: The research ID
735 progress: The progress value (0-100)
736 """
737 # In processor_v2, we can update directly if we have database access
738 # or queue it for later processing
739 operation_id = str(uuid.uuid4())
740 with self._pending_operations_lock:
741 self.pending_operations[operation_id] = {
742 "username": username,
743 "operation_type": "progress_update",
744 "research_id": research_id,
745 "progress": progress,
746 "timestamp": time.time(),
747 }
748 logger.debug(
749 f"Queued progress update for research {research_id}: {progress}%"
750 )
752 def queue_error_update(
753 self,
754 username: str,
755 research_id: str,
756 status: str,
757 error_message: str,
758 metadata: Dict[str, Any],
759 completed_at: str,
760 report_path: Optional[str] = None,
761 ):
762 """
763 Queue an error status update that needs database access.
764 For compatibility with old processor during migration.
766 Args:
767 username: The username
768 research_id: The research ID
769 status: The status to set (failed, suspended, etc.)
770 error_message: The error message
771 metadata: Research metadata
772 completed_at: Completion timestamp
773 report_path: Optional path to error report
774 """
775 operation_id = str(uuid.uuid4())
776 with self._pending_operations_lock:
777 self.pending_operations[operation_id] = {
778 "username": username,
779 "operation_type": "error_update",
780 "research_id": research_id,
781 "status": status,
782 "error_message": error_message,
783 "metadata": metadata,
784 "completed_at": completed_at,
785 "report_path": report_path,
786 "timestamp": time.time(),
787 }
788 logger.info(
789 f"Queued error update for research {research_id} with status {status}"
790 )
792 def process_pending_operations_for_user(
793 self, username: str, db_session
794 ) -> int:
795 """
796 Process pending operations for a user when we have database access.
797 Called from request context where encrypted database is accessible.
798 For compatibility with old processor during migration.
800 Args:
801 username: Username to process operations for
802 db_session: Active database session for the user
804 Returns:
805 Number of operations processed
806 """
807 # Find pending operations for this user (with lock)
808 operations_to_process = []
809 with self._pending_operations_lock:
810 for op_id, op_data in list(self.pending_operations.items()):
811 if op_data["username"] == username:
812 operations_to_process.append((op_id, op_data))
813 # Remove immediately to prevent duplicate processing
814 del self.pending_operations[op_id]
816 if not operations_to_process:
817 return 0
819 processed_count = 0
821 # Process operations outside the lock (to avoid holding lock during DB operations)
822 for op_id, op_data in operations_to_process:
823 try:
824 operation_type = op_data.get("operation_type")
826 if operation_type == "progress_update":
827 # Update progress in database
828 from ...database.models import ResearchHistory
830 research = (
831 db_session.query(ResearchHistory)
832 .filter_by(id=op_data["research_id"])
833 .first()
834 )
835 if research: 835 ↛ 822line 835 didn't jump to line 822 because the condition on line 835 was always true
836 # Update the progress column directly
837 research.progress = op_data["progress"]
838 db_session.commit()
839 processed_count += 1
841 elif operation_type == "error_update": 841 ↛ 822line 841 didn't jump to line 822 because the condition on line 841 was always true
842 # Update error status in database
843 from ...database.models import ResearchHistory
845 research = (
846 db_session.query(ResearchHistory)
847 .filter_by(id=op_data["research_id"])
848 .first()
849 )
850 if research: 850 ↛ 822line 850 didn't jump to line 822 because the condition on line 850 was always true
851 research.status = op_data["status"]
852 research.error_message = op_data["error_message"]
853 research.research_meta = op_data["metadata"]
854 research.completed_at = op_data["completed_at"]
855 if op_data.get("report_path"):
856 research.report_path = op_data["report_path"]
857 db_session.commit()
858 processed_count += 1
860 except Exception:
861 logger.exception(f"Error processing operation {op_id}")
862 # Rollback to clear the failed transaction state
863 try:
864 db_session.rollback()
865 except Exception:
866 logger.warning(
867 f"Failed to rollback after error in operation {op_id}"
868 )
870 return processed_count
# Global queue processor instance (module-level singleton). Note: start()
# is not called here — callers are responsible for starting the loop.
queue_processor = QueueProcessorV2()