Coverage for src/local_deep_research/web/queue/manager.py: 96%
66 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""Queue manager for handling research queue operations"""
3from loguru import logger
4from sqlalchemy import func
6from ...database.encrypted_db import db_manager
7from ...database.models import QueuedResearch, ResearchHistory
8from .processor_v2 import queue_processor
class QueueManager:
    """Manages the research queue operations"""

    @staticmethod
    def add_to_queue(username, research_id, query, mode, settings):
        """
        Add a research to the queue

        The new entry is placed after the highest position currently stored
        for the user. After the record is committed, a queue notification is
        attempted on a best-effort basis and the queue processor is told
        about the new entry.

        Args:
            username: User who owns the research
            research_id: UUID of the research
            query: Research query
            mode: Research mode
            settings: Research settings dictionary

        Returns:
            int: Queue position

        Raises:
            ValueError: If no database session is available for the user
        """
        session = db_manager.get_session(username)
        if not session:
            raise ValueError(f"No database connection for user {username}")

        with session:
            # Get the next position in queue for this user; max() yields
            # None when the user has no queued rows, hence the `or 0`.
            max_position = (
                session.query(func.max(QueuedResearch.position))
                .filter_by(username=username)
                .scalar()
                or 0
            )
            position = max_position + 1

            queued_record = QueuedResearch(
                username=username,
                research_id=research_id,
                query=query,
                mode=mode,
                settings_snapshot=settings,
                position=position,
            )
            session.add(queued_record)
            session.commit()

            logger.info(
                f"Added research {research_id} to queue at position {position}"
            )

            # Send RESEARCH_QUEUED notification if enabled.
            # Best effort: a failure here (including a failed import) must
            # not undo or block the queue insertion that already committed.
            try:
                from ...settings import SettingsManager
                from ...notifications import send_queue_notification

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_settings_snapshot()

                send_queue_notification(
                    username=username,
                    research_id=research_id,
                    query=query,
                    settings_snapshot=settings_snapshot,
                    position=position,
                )
            except Exception as e:
                logger.debug(f"Failed to send queued notification: {e}")

            # Notify queue processor about the new queued research
            # Note: When using QueueManager, we don't have all parameters for direct execution
            # So it will fall back to queue mode
            queue_processor.notify_research_queued(username, research_id)

            return position

    @staticmethod
    def get_queue_position(username, research_id):
        """
        Get the current queue position for a research

        The position is computed as one plus the number of the user's queued
        entries whose stored position is lower than this entry's.

        Args:
            username: User who owns the research
            research_id: UUID of the research

        Returns:
            int: Current queue position or None if not in queue (None is also
            returned when no database session is available)
        """
        session = db_manager.get_session(username)
        if not session:
            return None

        with session:
            queued = (
                session.query(QueuedResearch)
                .filter_by(username=username, research_id=research_id)
                .first()
            )

            if not queued:
                return None

            # Count how many are ahead in queue
            ahead_count = (
                session.query(QueuedResearch)
                .filter(
                    QueuedResearch.username == username,
                    QueuedResearch.position < queued.position,
                )
                .count()
            )

            return ahead_count + 1

    @staticmethod
    def remove_from_queue(username, research_id):
        """
        Remove a research from the queue

        Entries of the same user with a higher position are moved up by one
        before the change is committed.

        Args:
            username: User who owns the research
            research_id: UUID of the research

        Returns:
            bool: True if removed, False if not found (False is also returned
            when no database session is available)
        """
        session = db_manager.get_session(username)
        if not session:
            return False

        with session:
            queued = (
                session.query(QueuedResearch)
                .filter_by(username=username, research_id=research_id)
                .first()
            )

            if not queued:
                return False

            # Remember the position before deleting so the entries behind it
            # can be shifted.
            position = queued.position
            session.delete(queued)

            # Update positions of items behind in queue
            session.query(QueuedResearch).filter(
                QueuedResearch.username == username,
                QueuedResearch.position > position,
            ).update({QueuedResearch.position: QueuedResearch.position - 1})

            session.commit()
            logger.info(f"Removed research {research_id} from queue")
            return True

    @staticmethod
    def get_user_queue(username):
        """
        Get all queued researches for a user

        Only queue entries whose research_id matches the id of an existing
        ResearchHistory row are included.

        Args:
            username: User to get queue for

        Returns:
            list: List of queued research info dicts ordered by position, each
            with the keys "research_id", "query", "mode", "position",
            "created_at" (ISO string or None) and "is_processing". An empty
            list is returned when no database session is available.
        """
        session = db_manager.get_session(username)
        if not session:
            return []

        with session:
            # A correlated EXISTS checks for the matching research row inside
            # the same statement, instead of one extra lookup per queued item.
            research_exists = (
                session.query(ResearchHistory.id)
                .filter(ResearchHistory.id == QueuedResearch.research_id)
                .exists()
            )

            queued_items = (
                session.query(QueuedResearch)
                .filter(QueuedResearch.username == username, research_exists)
                .order_by(QueuedResearch.position)
                .all()
            )

            return [
                {
                    "research_id": item.research_id,
                    "query": item.query,
                    "mode": item.mode,
                    "position": item.position,
                    "created_at": item.created_at.isoformat()
                    if item.created_at
                    else None,
                    "is_processing": item.is_processing,
                }
                for item in queued_items
            ]