Coverage for src/local_deep_research/web/queue/processor_v2.py: 51% (281 statements)
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

"""
Queue processor v2 - uses encrypted user databases instead of service.db
Supports both direct execution and queue modes.
"""

import threading
import time
import uuid
from typing import Any, Dict, Optional, Set

from loguru import logger

from ...database.encrypted_db import db_manager
from ...database.models import (
    QueuedResearch,
    ResearchHistory,
    UserActiveResearch,
)
from ...database.queue_service import UserQueueService
from ...database.session_context import get_user_db_session
from ...database.session_passwords import session_password_store
from ...notifications.queue_helpers import (
    send_research_completed_notification_from_session,
    send_research_failed_notification_from_session,
)
from ..routes.globals import active_research, termination_flags
from ..services.research_service import (
    run_research_process,
    start_research_process,
)

# Retry configuration constants for notification database queries
MAX_RESEARCH_LOOKUP_RETRIES = 3
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5  # seconds
RETRY_BACKOFF_MULTIPLIER = 2


class QueueProcessorV2:
    """
    Processes queued researches using encrypted user databases.
    This replaces the service.db approach.
    """

    def __init__(self, check_interval=10):
        """
        Initialize the queue processor.

        Args:
            check_interval: How often to check for work (seconds)
        """
        self.check_interval = check_interval
        self.running = False
        self.thread = None

        # Per-user settings will be retrieved from each user's database
        # when processing their queue using SettingsManager
        logger.info(
            "Queue processor v2 initialized - will use per-user settings from SettingsManager"
        )

        # Track which users we should check
        self._users_to_check: Set[str] = set()
        self._users_lock = threading.Lock()

        # Track pending operations from background threads
        self.pending_operations = {}
        self._pending_operations_lock = threading.Lock()

    def start(self):
        """Start the queue processor thread."""
        if self.running:
            logger.warning("Queue processor already running")
            return

        self.running = True
        self.thread = threading.Thread(
            target=self._process_queue_loop, daemon=True
        )
        self.thread.start()
        logger.info("Queue processor v2 started")

    def stop(self):
        """Stop the queue processor thread."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=10)
        logger.info("Queue processor v2 stopped")

    def notify_user_activity(self, username: str, session_id: str):
        """
        Notify that a user has activity and their queue should be checked.

        Args:
            username: The username
            session_id: The Flask session ID (for password access)
        """
        with self._users_lock:
            self._users_to_check.add(f"{username}:{session_id}")
            logger.debug(f"User {username} added to queue check list")

    def notify_research_queued(self, username: str, research_id: str, **kwargs):
        """
        Notify that a research was queued.
        In direct mode, this immediately starts the research if slots are available.
        In queue mode, it adds to the queue.

        Args:
            username: The username
            research_id: The research ID
            **kwargs: Additional parameters for direct execution (query, mode, etc.)
        """
        # Check user's queue_mode setting when we have database access
        if kwargs:
            session_id = kwargs.get("session_id")
            if session_id:
                # Check if we can start it directly
                password = session_password_store.get_session_password(
                    username, session_id
                )
                if password:
                    try:
                        # Open database and check settings + active count
                        engine = db_manager.open_user_database(
                            username, password
                        )
                        if engine:
                            with get_user_db_session(username) as db_session:
                                # Get user's settings using SettingsManager
                                from ...settings.manager import SettingsManager

                                settings_manager = SettingsManager(db_session)

                                # Get user's queue_mode setting (env > DB > default)
                                queue_mode = settings_manager.get_setting(
                                    "app.queue_mode", "direct"
                                )

                                # Get user's max concurrent setting (env > DB > default)
                                max_concurrent = settings_manager.get_setting(
                                    "app.max_concurrent_researches", 3
                                )

                                logger.debug(
                                    f"User {username} settings: queue_mode={queue_mode}, "
                                    f"max_concurrent={max_concurrent}"
                                )

                                # Only try direct execution if user has queue_mode="direct"
                                if queue_mode == "direct":
                                    # Count active researches
                                    active_count = (
                                        db_session.query(UserActiveResearch)
                                        .filter_by(
                                            username=username,
                                            status="in_progress",
                                        )
                                        .count()
                                    )

                                    if active_count < max_concurrent:
                                        # We have slots - start directly!
                                        logger.info(
                                            f"Direct mode: Starting research {research_id} immediately "
                                            f"(active: {active_count}/{max_concurrent})"
                                        )

                                        # Start the research directly
                                        self._start_research_directly(
                                            username,
                                            research_id,
                                            password,
                                            **kwargs,
                                        )
                                        return
                                    else:
                                        logger.info(
                                            f"Direct mode: Max concurrent reached ({active_count}/"
                                            f"{max_concurrent}), queueing {research_id}"
                                        )
                                else:
                                    logger.info(
                                        f"User {username} has queue_mode={queue_mode}, "
                                        f"queueing research {research_id}"
                                    )
                    except Exception:
                        logger.exception(
                            f"Error in direct execution for {username}"
                        )

        # Fall back to queue mode (or if direct mode failed)
        try:
            with get_user_db_session(username) as session:
                queue_service = UserQueueService(session)
                queue_service.add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                logger.info(
                    f"Research {research_id} queued for user {username}"
                )
        except Exception:
            logger.exception(f"Failed to update queue status for {username}")
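
    # Illustrative call from a submit route (sketch only; the route variables
    # shown here are hypothetical and not part of this module):
    #
    #     queue_processor.notify_research_queued(
    #         username,
    #         research_id,
    #         session_id=flask_session_id,
    #         query=query,
    #         mode="quick",
    #         settings_snapshot=settings_snapshot,
    #     )
    #
    # With queue_mode="direct" and a free slot this starts the research in a
    # thread immediately; otherwise the task is added to the user's queue.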

    def _start_research_directly(
        self, username: str, research_id: str, password: str, **kwargs
    ):
        """
        Start a research directly without queueing.

        Args:
            username: The username
            research_id: The research ID
            password: The user's password
            **kwargs: Research parameters (query, mode, settings, etc.)
        """
        query = kwargs.get("query")
        mode = kwargs.get("mode")
        settings_snapshot = kwargs.get("settings_snapshot", {})

        # Create active research record
        try:
            with get_user_db_session(username) as db_session:
                active_record = UserActiveResearch(
                    username=username,
                    research_id=research_id,
                    status="in_progress",
                    thread_id="pending",
                    settings_snapshot=settings_snapshot,
                )
                db_session.add(active_record)
                db_session.commit()

                # Update task status if it exists
                queue_service = UserQueueService(db_session)
                queue_service.update_task_status(research_id, "processing")
        except Exception:
            logger.exception(
                f"Failed to create active research record for {research_id}"
            )
            return

        # Extract parameters from kwargs
        model_provider = kwargs.get("model_provider")
        model = kwargs.get("model")
        custom_endpoint = kwargs.get("custom_endpoint")
        search_engine = kwargs.get("search_engine")

        # Start the research process
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                active_research,
                termination_flags,
                run_research_process,
                username=username,
                user_password=password,
                model_provider=model_provider,
                model=model,
                custom_endpoint=custom_endpoint,
                search_engine=search_engine,
                max_results=kwargs.get("max_results"),
                time_period=kwargs.get("time_period"),
                iterations=kwargs.get("iterations"),
                questions_per_iteration=kwargs.get("questions_per_iteration"),
                strategy=kwargs.get("strategy", "source-based"),
                settings_snapshot=settings_snapshot,
            )

            # Update thread ID
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        active_record.thread_id = str(research_thread.ident)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to update thread ID for {research_id}"
                )

            logger.info(
                f"Direct execution: Started research {research_id} for user {username} "
                f"in thread {research_thread.ident}"
            )

        except Exception:
            logger.exception(f"Failed to start research {research_id} directly")
            # Clean up the active record
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        db_session.delete(active_record)
                        db_session.commit()
            except Exception:
                pass

    def notify_research_completed(
        self, username: str, research_id: str, user_password: str = None
    ):
        """
        Notify that a research completed.
        Updates the user's queue status in their database.

        Args:
            username: The username
            research_id: The research ID
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # get_user_db_session is imported at module level; it accepts an
            # optional password parameter and returns a context manager
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(research_id, "completed")
                logger.info(
                    f"Research {research_id} completed for user {username}"
                )

                # Send notification using helper from notification module
                send_research_completed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    db_session=session,
                )

        except Exception:
            logger.exception(
                f"Failed to update completion status for {username}"
            )

    def notify_research_failed(
        self,
        username: str,
        research_id: str,
        error_message: str = None,
        user_password: str = None,
    ):
        """
        Notify that a research failed.
        Updates the user's queue status in their database and sends notification.

        Args:
            username: The username
            research_id: The research ID
            error_message: Optional error message
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # get_user_db_session is imported at module level; it accepts an
            # optional password parameter and returns a context manager
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(
                    research_id, "failed", error_message=error_message
                )
                logger.info(
                    f"Research {research_id} failed for user {username}: "
                    f"{error_message}"
                )

                # Send notification using helper from notification module
                send_research_failed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    error_message=error_message or "Unknown error",
                    db_session=session,
                )

        except Exception:
            logger.exception(f"Failed to update failure status for {username}")

    def _process_queue_loop(self):
        """Main loop that processes the queue."""
        while self.running:  # coverage: condition was always true (loop never exited)
            try:
                # Get list of users to check (don't clear immediately)
                with self._users_lock:
                    users_to_check = list(self._users_to_check)

                # Process each user's queue
                users_to_remove = []
                for user_session in users_to_check:
                    try:
                        username, session_id = user_session.split(":", 1)
                        # _process_user_queue returns True if queue is empty
                        queue_empty = self._process_user_queue(
                            username, session_id
                        )
                        if queue_empty:
                            users_to_remove.append(user_session)
                    except Exception:
                        logger.exception(
                            f"Error processing queue for {user_session}"
                        )
                        # Don't remove on error - the _process_user_queue method
                        # determines whether to keep checking based on error type

                # Only remove users whose queues are now empty
                with self._users_lock:
                    for user_session in users_to_remove:
                        self._users_to_check.discard(user_session)

            except Exception:
                logger.exception("Error in queue processor loop")

            time.sleep(self.check_interval)

    def _process_user_queue(self, username: str, session_id: str) -> bool:
        """
        Process the queue for a specific user.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            True if the queue is empty, False if there are still items
        """
        # Get the user's password from session store
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:  # coverage: condition was never true
            logger.debug(
                f"No password available for user {username}, skipping queue check"
            )
            return True  # Remove from checking - session expired

        # Open the user's encrypted database
        try:
            # First ensure the database is open
            engine = db_manager.open_user_database(username, password)
            if not engine:
                logger.error(f"Failed to open database for user {username}")
                return False  # Keep checking - could be temporary DB issue

            # Get a session and process the queue
            with get_user_db_session(username, password) as db_session:
                queue_service = UserQueueService(db_session)

                # Get user's settings using SettingsManager
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                # Get queue status
                queue_status = queue_service.get_queue_status() or {
                    "active_tasks": 0,
                    "queued_tasks": 0,
                }

                # Calculate available slots
                available_slots = max_concurrent - queue_status["active_tasks"]

                if available_slots <= 0:  # coverage: condition was never true
                    # No slots available, but queue might not be empty
                    return False  # Keep checking

                if queue_status["queued_tasks"] == 0:  # coverage: condition was always true
                    # Queue is empty
                    return True  # Remove from checking

                logger.info(
                    f"Processing queue for {username}: "
                    f"{queue_status['active_tasks']} active, "
                    f"{queue_status['queued_tasks']} queued, "
                    f"{available_slots} slots available"
                )

                # Process queued researches
                self._start_queued_researches(
                    db_session,
                    queue_service,
                    username,
                    password,
                    available_slots,
                )

                # Check if there are still items in queue
                updated_status = queue_service.get_queue_status() or {
                    "queued_tasks": 0
                }
                return updated_status["queued_tasks"] == 0

        except Exception:
            logger.exception(f"Error processing queue for user {username}")
            return False  # Keep checking - errors might be temporary

    def _start_queued_researches(
        self,
        db_session,
        queue_service: UserQueueService,
        username: str,
        password: str,
        available_slots: int,
    ):
        """Start queued researches up to available slots."""
        # Get queued researches
        queued = (
            db_session.query(QueuedResearch)
            .filter_by(username=username, is_processing=False)
            .order_by(QueuedResearch.position)
            .limit(available_slots)
            .all()
        )

        for queued_research in queued:
            try:
                # Mark as processing
                queued_research.is_processing = True
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id, "processing"
                )

                # Start the research
                self._start_research(
                    db_session,
                    username,
                    password,
                    queued_research,
                )

                # Remove from queue
                db_session.delete(queued_research)
                db_session.commit()

                logger.info(
                    f"Started queued research {queued_research.research_id} "
                    f"for user {username}"
                )

            except Exception:
                logger.exception(
                    f"Error starting queued research {queued_research.research_id}"
                )
                # Reset processing flag
                queued_research.is_processing = False
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id,
                    "failed",
                    error_message="Failed to start research",
                )

    def _start_research(
        self,
        db_session,
        username: str,
        password: str,
        queued_research,
    ):
        """Start a queued research."""
        # Update research status
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=queued_research.research_id)
            .first()
        )

        if not research:
            raise ValueError(
                f"Research {queued_research.research_id} not found"
            )

        research.status = "in_progress"
        db_session.commit()

        # Create active research record
        active_record = UserActiveResearch(
            username=username,
            research_id=queued_research.research_id,
            status="in_progress",
            thread_id="pending",
            settings_snapshot=queued_research.settings_snapshot,
        )
        db_session.add(active_record)
        db_session.commit()

        # Extract settings
        settings_snapshot = queued_research.settings_snapshot or {}

        # Handle new vs legacy structure
        if (
            isinstance(settings_snapshot, dict)
            and "submission" in settings_snapshot
        ):
            submission_params = settings_snapshot.get("submission", {})
            complete_settings = settings_snapshot.get("settings_snapshot", {})
        else:
            submission_params = settings_snapshot
            complete_settings = {}

        # Start the research process with password
        research_thread = start_research_process(
            queued_research.research_id,
            queued_research.query,
            queued_research.mode,
            active_research,
            termination_flags,
            run_research_process,
            username=username,
            user_password=password,  # Pass password for metrics
            model_provider=submission_params.get("model_provider"),
            model=submission_params.get("model"),
            custom_endpoint=submission_params.get("custom_endpoint"),
            search_engine=submission_params.get("search_engine"),
            max_results=submission_params.get("max_results"),
            time_period=submission_params.get("time_period"),
            iterations=submission_params.get("iterations"),
            questions_per_iteration=submission_params.get(
                "questions_per_iteration"
            ),
            strategy=submission_params.get("strategy", "source-based"),
            settings_snapshot=complete_settings,
        )

        # Update thread ID
        active_record.thread_id = str(research_thread.ident)
        db_session.commit()

    def process_user_request(self, username: str, session_id: str) -> int:
        """
        Process queue for a user during their request.
        This is called from request context to check and start queued items.

        Returns:
            Number of researches started
        """
        try:
            # Add user to check list
            self.notify_user_activity(username, session_id)

            # Force immediate check (don't wait for loop)
            password = session_password_store.get_session_password(
                username, session_id
            )
            if password:  # coverage: condition was always true
                # Open database and check queue
                engine = db_manager.open_user_database(username, password)
                if engine:  # coverage: condition was always true
                    with get_user_db_session(username) as db_session:
                        queue_service = UserQueueService(db_session)
                        status = queue_service.get_queue_status()

                        if status and status["queued_tasks"] > 0:  # coverage: condition was never true
                            logger.info(
                                f"User {username} has {status['queued_tasks']} "
                                f"queued tasks, triggering immediate processing"
                            )
                            # Process will happen in background thread
                            return status["queued_tasks"]

            return 0

        except Exception:
            logger.exception(f"Error in process_user_request for {username}")
            return 0

    def queue_progress_update(
        self, username: str, research_id: str, progress: float
    ):
        """
        Queue a progress update that needs database access.
        For compatibility with old processor during migration.

        Args:
            username: The username
            research_id: The research ID
            progress: The progress value (0-100)
        """
        # In processor_v2, we can update directly if we have database access
        # or queue it for later processing
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "progress_update",
                "research_id": research_id,
                "progress": progress,
                "timestamp": time.time(),
            }
        logger.debug(
            f"Queued progress update for research {research_id}: {progress}%"
        )

    def queue_error_update(
        self,
        username: str,
        research_id: str,
        status: str,
        error_message: str,
        metadata: Dict[str, Any],
        completed_at: str,
        report_path: Optional[str] = None,
    ):
        """
        Queue an error status update that needs database access.
        For compatibility with old processor during migration.

        Args:
            username: The username
            research_id: The research ID
            status: The status to set (failed, suspended, etc.)
            error_message: The error message
            metadata: Research metadata
            completed_at: Completion timestamp
            report_path: Optional path to error report
        """
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "error_update",
                "research_id": research_id,
                "status": status,
                "error_message": error_message,
                "metadata": metadata,
                "completed_at": completed_at,
                "report_path": report_path,
                "timestamp": time.time(),
            }
        logger.info(
            f"Queued error update for research {research_id} with status {status}"
        )

    def process_pending_operations_for_user(
        self, username: str, db_session
    ) -> int:
        """
        Process pending operations for a user when we have database access.
        Called from request context where encrypted database is accessible.
        For compatibility with old processor during migration.

        Args:
            username: Username to process operations for
            db_session: Active database session for the user

        Returns:
            Number of operations processed
        """
        # Find pending operations for this user (with lock)
        operations_to_process = []
        with self._pending_operations_lock:
            for op_id, op_data in list(self.pending_operations.items()):
                if op_data["username"] == username:
                    operations_to_process.append((op_id, op_data))
                    # Remove immediately to prevent duplicate processing
                    del self.pending_operations[op_id]

        if not operations_to_process:
            return 0

        processed_count = 0

        # Process operations outside the lock (to avoid holding lock during DB operations)
        for op_id, op_data in operations_to_process:
            try:
                operation_type = op_data.get("operation_type")

                if operation_type == "progress_update":
                    # Update progress in database
                    from ...database.models import ResearchHistory

                    research = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=op_data["research_id"])
                        .first()
                    )
                    if research:  # coverage: condition was always true
                        # Update the progress column directly
                        research.progress = op_data["progress"]
                        db_session.commit()
                        processed_count += 1

                elif operation_type == "error_update":  # coverage: condition was always true
                    # Update error status in database
                    from ...database.models import ResearchHistory

                    research = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=op_data["research_id"])
                        .first()
                    )
                    if research:  # coverage: condition was always true
                        research.status = op_data["status"]
                        research.error_message = op_data["error_message"]
                        research.research_meta = op_data["metadata"]
                        research.completed_at = op_data["completed_at"]
                        if op_data.get("report_path"):  # coverage: condition was never true
                            research.report_path = op_data["report_path"]
                        db_session.commit()
                        processed_count += 1

            except Exception:
                logger.exception(f"Error processing operation {op_id}")
                # Rollback to clear the failed transaction state
                try:
                    db_session.rollback()
                except Exception:
                    logger.warning(
                        f"Failed to rollback after error in operation {op_id}"
                    )

        return processed_count
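
    # Illustrative flow for the pending-operations bridge (sketch only; the
    # username, research_id and password variables are hypothetical):
    #
    #     # From a background thread without database access:
    #     queue_processor.queue_progress_update("alice", research_id, 42.0)
    #
    #     # Later, from a request context with the encrypted DB open:
    #     with get_user_db_session("alice", password) as db_session:
    #         queue_processor.process_pending_operations_for_user(
    #             "alice", db_session
    #         )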


# Global queue processor instance
queue_processor = QueueProcessorV2()
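
# Illustrative lifecycle of the module-level singleton (sketch only; the
# Flask session id shown here is a hypothetical placeholder):
#
#     queue_processor.start()
#     queue_processor.notify_user_activity("alice", flask_session_id)
#     ...
#     queue_processor.stop()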