Coverage for src / local_deep_research / web / queue / manager.py: 16%
79 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""Queue manager for handling research queue operations"""
3from loguru import logger
4from sqlalchemy import func
5from sqlalchemy.orm import sessionmaker
7from ...database.encrypted_db import db_manager
8from ...database.models import QueuedResearch, ResearchHistory
9from .processor_v2 import queue_processor
12class QueueManager:
13 """Manages the research queue operations"""
15 @staticmethod
16 def add_to_queue(username, research_id, query, mode, settings):
17 """
18 Add a research to the queue
20 Args:
21 username: User who owns the research
22 research_id: UUID of the research
23 query: Research query
24 mode: Research mode
25 settings: Research settings dictionary
27 Returns:
28 int: Queue position
29 """
30 engine = db_manager.connections.get(username)
31 if not engine:
32 raise ValueError(f"No database connection for user {username}")
34 SessionLocal = sessionmaker(bind=engine)
35 session = SessionLocal()
37 try:
38 # Get the next position in queue for this user
39 max_position = (
40 session.query(func.max(QueuedResearch.position))
41 .filter_by(username=username)
42 .scalar()
43 or 0
44 )
46 queued_record = QueuedResearch(
47 username=username,
48 research_id=research_id,
49 query=query,
50 mode=mode,
51 settings_snapshot=settings,
52 position=max_position + 1,
53 )
54 session.add(queued_record)
55 session.commit()
57 logger.info(
58 f"Added research {research_id} to queue at position {max_position + 1}"
59 )
61 # Send RESEARCH_QUEUED notification if enabled
62 try:
63 from ...settings import SettingsManager
64 from ...notifications import send_queue_notification
66 settings_manager = SettingsManager(session)
67 settings_snapshot = settings_manager.get_settings_snapshot()
69 send_queue_notification(
70 username=username,
71 research_id=research_id,
72 query=query,
73 settings_snapshot=settings_snapshot,
74 position=max_position + 1,
75 )
76 except Exception as e:
77 logger.debug(f"Failed to send queued notification: {e}")
79 # Notify queue processor about the new queued research
80 # Note: When using QueueManager, we don't have all parameters for direct execution
81 # So it will fall back to queue mode
82 queue_processor.notify_research_queued(username, research_id)
84 return max_position + 1
86 finally:
87 session.close()
89 @staticmethod
90 def get_queue_position(username, research_id):
91 """
92 Get the current queue position for a research
94 Args:
95 username: User who owns the research
96 research_id: UUID of the research
98 Returns:
99 int: Current queue position or None if not in queue
100 """
101 engine = db_manager.connections.get(username)
102 if not engine:
103 return None
105 SessionLocal = sessionmaker(bind=engine)
106 session = SessionLocal()
108 try:
109 queued = (
110 session.query(QueuedResearch)
111 .filter_by(username=username, research_id=research_id)
112 .first()
113 )
115 if not queued:
116 return None
118 # Count how many are ahead in queue
119 ahead_count = (
120 session.query(QueuedResearch)
121 .filter(
122 QueuedResearch.username == username,
123 QueuedResearch.position < queued.position,
124 )
125 .count()
126 )
128 return ahead_count + 1
130 finally:
131 session.close()
133 @staticmethod
134 def remove_from_queue(username, research_id):
135 """
136 Remove a research from the queue
138 Args:
139 username: User who owns the research
140 research_id: UUID of the research
142 Returns:
143 bool: True if removed, False if not found
144 """
145 engine = db_manager.connections.get(username)
146 if not engine:
147 return False
149 SessionLocal = sessionmaker(bind=engine)
150 session = SessionLocal()
152 try:
153 queued = (
154 session.query(QueuedResearch)
155 .filter_by(username=username, research_id=research_id)
156 .first()
157 )
159 if not queued:
160 return False
162 position = queued.position
163 session.delete(queued)
165 # Update positions of items behind in queue
166 session.query(QueuedResearch).filter(
167 QueuedResearch.username == username,
168 QueuedResearch.position > position,
169 ).update({QueuedResearch.position: QueuedResearch.position - 1})
171 session.commit()
172 logger.info(f"Removed research {research_id} from queue")
173 return True
175 finally:
176 session.close()
178 @staticmethod
179 def get_user_queue(username):
180 """
181 Get all queued researches for a user
183 Args:
184 username: User to get queue for
186 Returns:
187 list: List of queued research info
188 """
189 engine = db_manager.connections.get(username)
190 if not engine:
191 return []
193 SessionLocal = sessionmaker(bind=engine)
194 session = SessionLocal()
196 try:
197 queued_items = (
198 session.query(QueuedResearch)
199 .filter_by(username=username)
200 .order_by(QueuedResearch.position)
201 .all()
202 )
204 result = []
205 for item in queued_items:
206 # Get research info
207 research = (
208 session.query(ResearchHistory)
209 .filter_by(id=item.research_id)
210 .first()
211 )
213 if research:
214 result.append(
215 {
216 "research_id": item.research_id,
217 "query": item.query,
218 "mode": item.mode,
219 "position": item.position,
220 "created_at": item.created_at.isoformat()
221 if item.created_at
222 else None,
223 "is_processing": item.is_processing,
224 }
225 )
227 return result
229 finally:
230 session.close()