Coverage for src / local_deep_research / web / queue / processor_v2.py: 92%
282 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Queue processor v2 - uses encrypted user databases instead of service.db
3Supports both direct execution and queue modes.
4"""
6import threading
7import time
8import uuid
9from typing import Any, Dict, Optional, Set
11from loguru import logger
13from ...constants import ResearchStatus
14from ...database.encrypted_db import db_manager
15from ...database.models import (
16 QueuedResearch,
17 ResearchHistory,
18 UserActiveResearch,
19)
20from ...database.queue_service import UserQueueService
21from ...database.session_context import get_user_db_session
22from ...database.session_passwords import session_password_store
23from ...notifications.queue_helpers import (
24 send_research_completed_notification_from_session,
25 send_research_failed_notification_from_session,
26)
27from ..routes.globals import active_research, termination_flags
28from ..services.research_service import (
29 run_research_process,
30 start_research_process,
31)
33# Retry configuration constants for notification database queries
34MAX_RESEARCH_LOOKUP_RETRIES = 3
35INITIAL_RESEARCH_LOOKUP_DELAY = 0.5 # seconds
36RETRY_BACKOFF_MULTIPLIER = 2
39class QueueProcessorV2:
40 """
41 Processes queued researches using encrypted user databases.
42 This replaces the service.db approach.
43 """
45 def __init__(self, check_interval=10):
46 """
47 Initialize the queue processor.
49 Args:
50 check_interval: How often to check for work (seconds)
51 """
52 self.check_interval = check_interval
53 self.running = False
54 self.thread = None
56 # Per-user settings will be retrieved from each user's database
57 # when processing their queue using SettingsManager
58 logger.info(
59 "Queue processor v2 initialized - will use per-user settings from SettingsManager"
60 )
62 # Track which users we should check
63 self._users_to_check: Set[str] = set()
64 self._users_lock = threading.Lock()
66 # Track pending operations from background threads
67 self.pending_operations = {}
68 self._pending_operations_lock = threading.Lock()
70 def start(self):
71 """Start the queue processor thread."""
72 if self.running:
73 logger.warning("Queue processor already running")
74 return
76 self.running = True
77 self.thread = threading.Thread(
78 target=self._process_queue_loop, daemon=True
79 )
80 self.thread.start()
81 logger.info("Queue processor v2 started")
83 def stop(self):
84 """Stop the queue processor thread."""
85 self.running = False
86 if self.thread:
87 self.thread.join(timeout=10)
88 logger.info("Queue processor v2 stopped")
90 def notify_user_activity(self, username: str, session_id: str):
91 """
92 Notify that a user has activity and their queue should be checked.
94 Args:
95 username: The username
96 session_id: The Flask session ID (for password access)
97 """
98 with self._users_lock:
99 self._users_to_check.add(f"{username}:{session_id}")
100 logger.debug(f"User {username} added to queue check list")
    def notify_research_queued(self, username: str, research_id: str, **kwargs):
        """
        Notify that a research was queued.
        In direct mode, this immediately starts the research if slots are available.
        In queue mode, it adds to the queue.

        Args:
            username: The username
            research_id: The research ID
            **kwargs: Additional parameters for direct execution (query, mode, etc.)
                      `session_id` must be present for direct execution to be
                      attempted; without it, the research is simply queued.
        """
        # Check user's queue_mode setting when we have database access
        if kwargs:
            session_id = kwargs.get("session_id")
            if session_id:
                # Check if we can start it directly
                password = session_password_store.get_session_password(
                    username, session_id
                )
                if password:
                    try:
                        # Open database and check settings + active count
                        engine = db_manager.open_user_database(
                            username, password
                        )
                        if engine:
                            with get_user_db_session(username) as db_session:
                                # Get user's settings using SettingsManager
                                # (local import, presumably to avoid a circular
                                # import at module load — TODO confirm)
                                from ...settings.manager import SettingsManager

                                settings_manager = SettingsManager(db_session)

                                # Get user's queue_mode setting (env > DB > default)
                                queue_mode = settings_manager.get_setting(
                                    "app.queue_mode", "direct"
                                )

                                # Get user's max concurrent setting (env > DB > default)
                                max_concurrent = settings_manager.get_setting(
                                    "app.max_concurrent_researches", 3
                                )

                                logger.debug(
                                    f"User {username} settings: queue_mode={queue_mode}, "
                                    f"max_concurrent={max_concurrent}"
                                )

                                # Only try direct execution if user has queue_mode="direct"
                                if queue_mode == "direct":
                                    # Count active researches
                                    active_count = (
                                        db_session.query(UserActiveResearch)
                                        .filter_by(
                                            username=username,
                                            status=ResearchStatus.IN_PROGRESS,
                                        )
                                        .count()
                                    )

                                    if active_count < max_concurrent:
                                        # We have slots - start directly!
                                        logger.info(
                                            f"Direct mode: Starting research {research_id} immediately "
                                            f"(active: {active_count}/{max_concurrent})"
                                        )

                                        # Start the research directly; on
                                        # success we return and skip queueing.
                                        self._start_research_directly(
                                            username,
                                            research_id,
                                            password,
                                            **kwargs,
                                        )
                                        return
                                    else:
                                        logger.info(
                                            f"Direct mode: Max concurrent reached ({active_count}/"
                                            f"{max_concurrent}), queueing {research_id}"
                                        )
                                else:
                                    logger.info(
                                        f"User {username} has queue_mode={queue_mode}, "
                                        f"queueing research {research_id}"
                                    )
                    except Exception:
                        # Any failure in the direct path falls through to
                        # queue mode below rather than losing the research.
                        logger.exception(
                            f"Error in direct execution for {username}"
                        )

        # Fall back to queue mode (or if direct mode failed)
        try:
            # NOTE(review): no password passed here — assumes the user's
            # database session is already open/cached; verify against
            # get_user_db_session semantics.
            with get_user_db_session(username) as session:
                queue_service = UserQueueService(session)
                queue_service.add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                logger.info(
                    f"Research {research_id} queued for user {username}"
                )
        except Exception:
            logger.exception(f"Failed to update queue status for {username}")
206 def _start_research_directly(
207 self, username: str, research_id: str, password: str, **kwargs
208 ):
209 """
210 Start a research directly without queueing.
212 Args:
213 username: The username
214 research_id: The research ID
215 password: The user's password
216 **kwargs: Research parameters (query, mode, settings, etc.)
217 """
218 query = kwargs.get("query")
219 mode = kwargs.get("mode")
220 settings_snapshot = kwargs.get("settings_snapshot", {})
222 # Create active research record
223 try:
224 with get_user_db_session(username) as db_session:
225 active_record = UserActiveResearch(
226 username=username,
227 research_id=research_id,
228 status=ResearchStatus.IN_PROGRESS,
229 thread_id="pending",
230 settings_snapshot=settings_snapshot,
231 )
232 db_session.add(active_record)
233 db_session.commit()
235 # Update task status if it exists
236 queue_service = UserQueueService(db_session)
237 queue_service.update_task_status(research_id, "processing")
238 except Exception:
239 logger.exception(
240 f"Failed to create active research record for {research_id}"
241 )
242 return
244 # Extract parameters from kwargs
245 model_provider = kwargs.get("model_provider")
246 model = kwargs.get("model")
247 custom_endpoint = kwargs.get("custom_endpoint")
248 search_engine = kwargs.get("search_engine")
250 # Start the research process
251 try:
252 research_thread = start_research_process(
253 research_id,
254 query,
255 mode,
256 active_research,
257 termination_flags,
258 run_research_process,
259 username=username,
260 user_password=password,
261 model_provider=model_provider,
262 model=model,
263 custom_endpoint=custom_endpoint,
264 search_engine=search_engine,
265 max_results=kwargs.get("max_results"),
266 time_period=kwargs.get("time_period"),
267 iterations=kwargs.get("iterations"),
268 questions_per_iteration=kwargs.get("questions_per_iteration"),
269 strategy=kwargs.get("strategy", "source-based"),
270 settings_snapshot=settings_snapshot,
271 )
273 # Update thread ID
274 try:
275 with get_user_db_session(username) as db_session:
276 active_record = (
277 db_session.query(UserActiveResearch)
278 .filter_by(username=username, research_id=research_id)
279 .first()
280 )
281 if active_record: 281 ↛ 289line 281 didn't jump to line 289
282 active_record.thread_id = str(research_thread.ident)
283 db_session.commit()
284 except Exception:
285 logger.exception(
286 f"Failed to update thread ID for {research_id}"
287 )
289 logger.info(
290 f"Direct execution: Started research {research_id} for user {username} "
291 f"in thread {research_thread.ident}"
292 )
294 except Exception:
295 logger.exception(f"Failed to start research {research_id} directly")
296 # Clean up the active record
297 try:
298 with get_user_db_session(username) as db_session:
299 active_record = (
300 db_session.query(UserActiveResearch)
301 .filter_by(username=username, research_id=research_id)
302 .first()
303 )
304 if active_record: 304 ↛ exitline 304 didn't jump to the function exit
305 db_session.delete(active_record)
306 db_session.commit()
307 except Exception:
308 pass
310 def notify_research_completed(
311 self, username: str, research_id: str, user_password: str = None
312 ):
313 """
314 Notify that a research completed.
315 Updates the user's queue status in their database.
317 Args:
318 username: The username
319 research_id: The research ID
320 user_password: User password for database access. Required for queue
321 updates and database lookups during notification sending.
322 Optional only because some callers may not have it
323 available, in which case only basic updates occur.
324 """
325 try:
326 # get_user_db_session is already imported at module level (line 19)
327 # It accepts optional password parameter and returns a context manager
328 with get_user_db_session(username, user_password) as session:
329 queue_service = UserQueueService(session)
330 queue_service.update_task_status(
331 research_id, ResearchStatus.COMPLETED
332 )
333 logger.info(
334 f"Research {research_id} completed for user {username}"
335 )
337 # Send notification using helper from notification module
338 send_research_completed_notification_from_session(
339 username=username,
340 research_id=research_id,
341 db_session=session,
342 )
344 except Exception:
345 logger.exception(
346 f"Failed to update completion status for {username}"
347 )
349 def notify_research_failed(
350 self,
351 username: str,
352 research_id: str,
353 error_message: str = None,
354 user_password: str = None,
355 ):
356 """
357 Notify that a research failed.
358 Updates the user's queue status in their database and sends notification.
360 Args:
361 username: The username
362 research_id: The research ID
363 error_message: Optional error message
364 user_password: User password for database access. Required for queue
365 updates and database lookups during notification sending.
366 Optional only because some callers may not have it
367 available, in which case only basic updates occur.
368 """
369 try:
370 # get_user_db_session is already imported at module level (line 19)
371 # It accepts optional password parameter and returns a context manager
372 with get_user_db_session(username, user_password) as session:
373 queue_service = UserQueueService(session)
374 queue_service.update_task_status(
375 research_id,
376 ResearchStatus.FAILED,
377 error_message=error_message,
378 )
379 logger.info(
380 f"Research {research_id} failed for user {username}: "
381 f"{error_message}"
382 )
384 # Send notification using helper from notification module
385 send_research_failed_notification_from_session(
386 username=username,
387 research_id=research_id,
388 error_message=error_message or "Unknown error",
389 db_session=session,
390 )
392 except Exception:
393 logger.exception(f"Failed to update failure status for {username}")
395 def _process_queue_loop(self):
396 """Main loop that processes the queue."""
397 while self.running:
398 try:
399 # Get list of users to check (don't clear immediately)
400 with self._users_lock:
401 users_to_check = list(self._users_to_check)
403 # Process each user's queue
404 users_to_remove = []
405 for user_session in users_to_check:
406 try:
407 username, session_id = user_session.split(":", 1)
408 # _process_user_queue returns True if queue is empty
409 queue_empty = self._process_user_queue(
410 username, session_id
411 )
412 if queue_empty:
413 users_to_remove.append(user_session)
414 except Exception:
415 logger.exception(
416 f"Error processing queue for {user_session}"
417 )
418 # Don't remove on error - the _process_user_queue method
419 # determines whether to keep checking based on error type
421 # Only remove users whose queues are now empty
422 with self._users_lock:
423 for user_session in users_to_remove:
424 self._users_to_check.discard(user_session)
426 except Exception:
427 logger.exception("Error in queue processor loop")
429 time.sleep(self.check_interval)
    def _process_user_queue(self, username: str, session_id: str) -> bool:
        """
        Process the queue for a specific user.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            True if the queue is empty (caller stops checking this user),
            False if there are still items or a possibly-transient error
            occurred (caller keeps checking).
        """
        # Get the user's password from session store
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            logger.debug(
                f"No password available for user {username}, skipping queue check"
            )
            return True  # Remove from checking - session expired

        # Open the user's encrypted database
        try:
            # First ensure the database is open
            engine = db_manager.open_user_database(username, password)
            if not engine:
                logger.error(f"Failed to open database for user {username}")
                return False  # Keep checking - could be temporary DB issue

            # Get a session and process the queue
            with get_user_db_session(username, password) as db_session:
                queue_service = UserQueueService(db_session)

                # Get user's settings using SettingsManager
                # (local import, presumably to avoid a circular import at
                # module load — TODO confirm)
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                # Get queue status; fall back to zeros if unavailable
                queue_status = queue_service.get_queue_status() or {
                    "active_tasks": 0,
                    "queued_tasks": 0,
                }

                # Calculate available slots
                available_slots = max_concurrent - queue_status["active_tasks"]

                if available_slots <= 0:
                    # No slots available, but queue might not be empty
                    return False  # Keep checking

                if queue_status["queued_tasks"] == 0:
                    # Queue is empty
                    return True  # Remove from checking

                logger.info(
                    f"Processing queue for {username}: "
                    f"{queue_status['active_tasks']} active, "
                    f"{queue_status['queued_tasks']} queued, "
                    f"{available_slots} slots available"
                )

                # Process queued researches
                self._start_queued_researches(
                    db_session,
                    queue_service,
                    username,
                    password,
                    available_slots,
                )

                # Check if there are still items in queue after starting work
                updated_status = queue_service.get_queue_status() or {
                    "queued_tasks": 0
                }
                return updated_status["queued_tasks"] == 0

        except Exception:
            logger.exception(f"Error processing queue for user {username}")
            return False  # Keep checking - errors might be temporary
    def _start_queued_researches(
        self,
        db_session,
        queue_service: UserQueueService,
        username: str,
        password: str,
        available_slots: int,
    ):
        """Start queued researches up to available slots.

        Args:
            db_session: Open session on the user's encrypted database.
            queue_service: Queue service bound to the same session.
            username: The username whose queue is being drained.
            password: The user's password, forwarded to the research thread.
            available_slots: Maximum number of researches to start now.
        """
        # Get queued researches, oldest position first, capped at the free
        # slot count; rows already marked is_processing are skipped.
        queued = (
            db_session.query(QueuedResearch)
            .filter_by(username=username, is_processing=False)
            .order_by(QueuedResearch.position)
            .limit(available_slots)
            .all()
        )

        for queued_research in queued:
            try:
                # Mark as processing (committed immediately so concurrent
                # checks don't pick up the same row)
                queued_research.is_processing = True
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id, "processing"
                )

                # Start the research
                self._start_research(
                    db_session,
                    username,
                    password,
                    queued_research,
                )

                # Remove from queue only after a successful start
                db_session.delete(queued_research)
                db_session.commit()

                logger.info(
                    f"Started queued research {queued_research.research_id} "
                    f"for user {username}"
                )

            except Exception:
                logger.exception(
                    f"Error starting queued research {queued_research.research_id}"
                )
                # Reset processing flag so the row is visible again
                queued_research.is_processing = False
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    queued_research.research_id,
                    ResearchStatus.FAILED,
                    error_message="Failed to start research",
                )
    def _start_research(
        self,
        db_session,
        username: str,
        password: str,
        queued_research,
    ):
        """Start a queued research.

        Transitions the ResearchHistory row to IN_PROGRESS, creates the
        UserActiveResearch record, then spawns the research thread and
        records its thread id.

        Args:
            db_session: Open session on the user's encrypted database.
            username: The username owning the research.
            password: The user's password, forwarded to the research thread.
            queued_research: QueuedResearch row describing the work.

        Raises:
            ValueError: If the referenced ResearchHistory row does not exist.
        """
        # Update research status
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=queued_research.research_id)
            .first()
        )

        if not research:
            raise ValueError(
                f"Research {queued_research.research_id} not found"
            )

        research.status = ResearchStatus.IN_PROGRESS
        db_session.commit()

        # Create active research record
        active_record = UserActiveResearch(
            username=username,
            research_id=queued_research.research_id,
            status=ResearchStatus.IN_PROGRESS,
            thread_id="pending",  # replaced with the real thread id below
            settings_snapshot=queued_research.settings_snapshot,
        )
        db_session.add(active_record)
        db_session.commit()

        # Extract settings
        settings_snapshot = queued_research.settings_snapshot or {}

        # Handle new vs legacy structure: newer snapshots nest submission
        # parameters under "submission"; legacy snapshots are flat.
        if (
            isinstance(settings_snapshot, dict)
            and "submission" in settings_snapshot
        ):
            submission_params = settings_snapshot.get("submission", {})
            complete_settings = settings_snapshot.get("settings_snapshot", {})
        else:
            submission_params = settings_snapshot
            complete_settings = {}

        # Start the research process with password
        research_thread = start_research_process(
            queued_research.research_id,
            queued_research.query,
            queued_research.mode,
            active_research,
            termination_flags,
            run_research_process,
            username=username,
            user_password=password,  # Pass password for metrics
            model_provider=submission_params.get("model_provider"),
            model=submission_params.get("model"),
            custom_endpoint=submission_params.get("custom_endpoint"),
            search_engine=submission_params.get("search_engine"),
            max_results=submission_params.get("max_results"),
            time_period=submission_params.get("time_period"),
            iterations=submission_params.get("iterations"),
            questions_per_iteration=submission_params.get(
                "questions_per_iteration"
            ),
            strategy=submission_params.get("strategy", "source-based"),
            settings_snapshot=complete_settings,
        )

        # Update thread ID
        active_record.thread_id = str(research_thread.ident)
        db_session.commit()
654 def process_user_request(self, username: str, session_id: str) -> int:
655 """
656 Process queue for a user during their request.
657 This is called from request context to check and start queued items.
659 Returns:
660 Number of researches started
661 """
662 try:
663 # Add user to check list
664 self.notify_user_activity(username, session_id)
666 # Force immediate check (don't wait for loop)
667 password = session_password_store.get_session_password(
668 username, session_id
669 )
670 if password:
671 # Open database and check queue
672 engine = db_manager.open_user_database(username, password)
673 if engine: 673 ↛ 686line 673 didn't jump to line 686 because the condition on line 673 was always true
674 with get_user_db_session(username) as db_session:
675 queue_service = UserQueueService(db_session)
676 status = queue_service.get_queue_status()
678 if status and status["queued_tasks"] > 0:
679 logger.info(
680 f"User {username} has {status['queued_tasks']} "
681 f"queued tasks, triggering immediate processing"
682 )
683 # Process will happen in background thread
684 return status["queued_tasks"]
686 return 0
688 except Exception:
689 logger.exception(f"Error in process_user_request for {username}")
690 return 0
692 def queue_progress_update(
693 self, username: str, research_id: str, progress: float
694 ):
695 """
696 Queue a progress update that needs database access.
697 For compatibility with old processor during migration.
699 Args:
700 username: The username
701 research_id: The research ID
702 progress: The progress value (0-100)
703 """
704 # In processor_v2, we can update directly if we have database access
705 # or queue it for later processing
706 operation_id = str(uuid.uuid4())
707 with self._pending_operations_lock:
708 self.pending_operations[operation_id] = {
709 "username": username,
710 "operation_type": "progress_update",
711 "research_id": research_id,
712 "progress": progress,
713 "timestamp": time.time(),
714 }
715 logger.debug(
716 f"Queued progress update for research {research_id}: {progress}%"
717 )
719 def queue_error_update(
720 self,
721 username: str,
722 research_id: str,
723 status: str,
724 error_message: str,
725 metadata: Dict[str, Any],
726 completed_at: str,
727 report_path: Optional[str] = None,
728 ):
729 """
730 Queue an error status update that needs database access.
731 For compatibility with old processor during migration.
733 Args:
734 username: The username
735 research_id: The research ID
736 status: The status to set (failed, suspended, etc.)
737 error_message: The error message
738 metadata: Research metadata
739 completed_at: Completion timestamp
740 report_path: Optional path to error report
741 """
742 operation_id = str(uuid.uuid4())
743 with self._pending_operations_lock:
744 self.pending_operations[operation_id] = {
745 "username": username,
746 "operation_type": "error_update",
747 "research_id": research_id,
748 "status": status,
749 "error_message": error_message,
750 "metadata": metadata,
751 "completed_at": completed_at,
752 "report_path": report_path,
753 "timestamp": time.time(),
754 }
755 logger.info(
756 f"Queued error update for research {research_id} with status {status}"
757 )
759 def process_pending_operations_for_user(
760 self, username: str, db_session
761 ) -> int:
762 """
763 Process pending operations for a user when we have database access.
764 Called from request context where encrypted database is accessible.
765 For compatibility with old processor during migration.
767 Args:
768 username: Username to process operations for
769 db_session: Active database session for the user
771 Returns:
772 Number of operations processed
773 """
774 # Find pending operations for this user (with lock)
775 operations_to_process = []
776 with self._pending_operations_lock:
777 for op_id, op_data in list(self.pending_operations.items()):
778 if op_data["username"] == username:
779 operations_to_process.append((op_id, op_data))
780 # Remove immediately to prevent duplicate processing
781 del self.pending_operations[op_id]
783 if not operations_to_process:
784 return 0
786 processed_count = 0
788 # Process operations outside the lock (to avoid holding lock during DB operations)
789 for op_id, op_data in operations_to_process:
790 try:
791 operation_type = op_data.get("operation_type")
793 if operation_type == "progress_update":
794 # Update progress in database
795 from ...database.models import ResearchHistory
797 research = (
798 db_session.query(ResearchHistory)
799 .filter_by(id=op_data["research_id"])
800 .first()
801 )
802 if research: 802 ↛ 789line 802 didn't jump to line 789 because the condition on line 802 was always true
803 # Update the progress column directly
804 research.progress = op_data["progress"]
805 db_session.commit()
806 processed_count += 1
808 elif operation_type == "error_update": 808 ↛ 789line 808 didn't jump to line 789 because the condition on line 808 was always true
809 # Update error status in database
810 from ...database.models import ResearchHistory
812 research = (
813 db_session.query(ResearchHistory)
814 .filter_by(id=op_data["research_id"])
815 .first()
816 )
817 if research: 817 ↛ 789line 817 didn't jump to line 789 because the condition on line 817 was always true
818 research.status = op_data["status"]
819 research.error_message = op_data["error_message"]
820 research.research_meta = op_data["metadata"]
821 research.completed_at = op_data["completed_at"]
822 if op_data.get("report_path"):
823 research.report_path = op_data["report_path"]
824 db_session.commit()
825 processed_count += 1
827 except Exception:
828 logger.exception(f"Error processing operation {op_id}")
829 # Rollback to clear the failed transaction state
830 try:
831 db_session.rollback()
832 except Exception:
833 logger.warning(
834 f"Failed to rollback after error in operation {op_id}"
835 )
837 return processed_count
# Global queue processor instance
# Module-level singleton shared by callers that import this module.
queue_processor = QueueProcessorV2()