Coverage for src / local_deep_research / web / queue / manager.py: 100%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Queue manager for handling research queue operations"""
3from loguru import logger
4from sqlalchemy import func
6from ...database.session_context import get_user_db_session
7from ...database.models import QueuedResearch, ResearchHistory
8from .processor_v2 import queue_processor
11class QueueManager:
12 """Manages the research queue operations"""
14 @staticmethod
15 def add_to_queue(username, research_id, query, mode, settings):
16 """
17 Add a research to the queue
19 Args:
20 username: User who owns the research
21 research_id: UUID of the research
22 query: Research query
23 mode: Research mode
24 settings: Research settings dictionary
26 Returns:
27 int: Queue position
28 """
29 with get_user_db_session(username) as session:
30 # Get the next position in queue for this user
31 max_position = (
32 session.query(func.max(QueuedResearch.position))
33 .filter_by(username=username)
34 .scalar()
35 or 0
36 )
38 queued_record = QueuedResearch(
39 username=username,
40 research_id=research_id,
41 query=query,
42 mode=mode,
43 settings_snapshot=settings,
44 position=max_position + 1,
45 )
46 session.add(queued_record)
47 session.commit()
49 logger.info(
50 f"Added research {research_id} to queue at position {max_position + 1}"
51 )
53 # Send RESEARCH_QUEUED notification if enabled
54 try:
55 from ...settings import SettingsManager
56 from ...notifications import send_queue_notification
58 settings_manager = SettingsManager(session)
59 settings_snapshot = settings_manager.get_settings_snapshot()
61 send_queue_notification(
62 username=username,
63 research_id=research_id,
64 query=query,
65 settings_snapshot=settings_snapshot,
66 position=max_position + 1,
67 )
68 except Exception as e:
69 logger.debug(f"Failed to send queued notification: {e}")
71 # Notify queue processor about the new queued research
72 # Note: When using QueueManager, we don't have all parameters for direct execution
73 # So it will fall back to queue mode
74 try:
75 queue_processor.notify_research_queued(username, research_id)
76 except Exception:
77 logger.warning("Failed to notify queue processor")
79 return max_position + 1
81 @staticmethod
82 def get_queue_position(username, research_id):
83 """
84 Get the current queue position for a research
86 Args:
87 username: User who owns the research
88 research_id: UUID of the research
90 Returns:
91 int: Current queue position or None if not in queue
92 """
93 with get_user_db_session(username) as session:
94 queued = (
95 session.query(QueuedResearch)
96 .filter_by(username=username, research_id=research_id)
97 .first()
98 )
100 if not queued:
101 return None
103 # Count how many are ahead in queue
104 ahead_count = (
105 session.query(QueuedResearch)
106 .filter(
107 QueuedResearch.username == username,
108 QueuedResearch.position < queued.position,
109 )
110 .count()
111 )
113 return ahead_count + 1
115 @staticmethod
116 def remove_from_queue(username, research_id):
117 """
118 Remove a research from the queue
120 Args:
121 username: User who owns the research
122 research_id: UUID of the research
124 Returns:
125 bool: True if removed, False if not found
126 """
127 with get_user_db_session(username) as session:
128 queued = (
129 session.query(QueuedResearch)
130 .filter_by(username=username, research_id=research_id)
131 .first()
132 )
134 if not queued:
135 return False
137 position = queued.position
138 session.delete(queued)
140 # Update positions of items behind in queue
141 session.query(QueuedResearch).filter(
142 QueuedResearch.username == username,
143 QueuedResearch.position > position,
144 ).update({QueuedResearch.position: QueuedResearch.position - 1})
146 session.commit()
147 logger.info(f"Removed research {research_id} from queue")
148 return True
150 @staticmethod
151 def get_user_queue(username):
152 """
153 Get all queued researches for a user
155 Args:
156 username: User to get queue for
158 Returns:
159 list: List of queued research info
160 """
161 with get_user_db_session(username) as session:
162 queued_items = (
163 session.query(QueuedResearch)
164 .filter_by(username=username)
165 .order_by(QueuedResearch.position)
166 .all()
167 )
169 result = []
170 for item in queued_items:
171 # Get research info
172 research = (
173 session.query(ResearchHistory)
174 .filter_by(id=item.research_id)
175 .first()
176 )
178 if research:
179 result.append(
180 {
181 "research_id": item.research_id,
182 "query": item.query,
183 "mode": item.mode,
184 "position": item.position,
185 "created_at": item.created_at.isoformat()
186 if item.created_at
187 else None,
188 "is_processing": item.is_processing,
189 }
190 )
192 return result