Coverage for src / local_deep_research / web / queue / manager.py: 16%

79 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1"""Queue manager for handling research queue operations""" 

2 

3from loguru import logger 

4from sqlalchemy import func 

5from sqlalchemy.orm import sessionmaker 

6 

7from ...database.encrypted_db import db_manager 

8from ...database.models import QueuedResearch, ResearchHistory 

9from .processor_v2 import queue_processor 

10 

11 

12class QueueManager: 

13 """Manages the research queue operations""" 

14 

15 @staticmethod 

16 def add_to_queue(username, research_id, query, mode, settings): 

17 """ 

18 Add a research to the queue 

19 

20 Args: 

21 username: User who owns the research 

22 research_id: UUID of the research 

23 query: Research query 

24 mode: Research mode 

25 settings: Research settings dictionary 

26 

27 Returns: 

28 int: Queue position 

29 """ 

30 engine = db_manager.connections.get(username) 

31 if not engine: 

32 raise ValueError(f"No database connection for user {username}") 

33 

34 SessionLocal = sessionmaker(bind=engine) 

35 session = SessionLocal() 

36 

37 try: 

38 # Get the next position in queue for this user 

39 max_position = ( 

40 session.query(func.max(QueuedResearch.position)) 

41 .filter_by(username=username) 

42 .scalar() 

43 or 0 

44 ) 

45 

46 queued_record = QueuedResearch( 

47 username=username, 

48 research_id=research_id, 

49 query=query, 

50 mode=mode, 

51 settings_snapshot=settings, 

52 position=max_position + 1, 

53 ) 

54 session.add(queued_record) 

55 session.commit() 

56 

57 logger.info( 

58 f"Added research {research_id} to queue at position {max_position + 1}" 

59 ) 

60 

61 # Send RESEARCH_QUEUED notification if enabled 

62 try: 

63 from ...settings import SettingsManager 

64 from ...notifications import send_queue_notification 

65 

66 settings_manager = SettingsManager(session) 

67 settings_snapshot = settings_manager.get_settings_snapshot() 

68 

69 send_queue_notification( 

70 username=username, 

71 research_id=research_id, 

72 query=query, 

73 settings_snapshot=settings_snapshot, 

74 position=max_position + 1, 

75 ) 

76 except Exception as e: 

77 logger.debug(f"Failed to send queued notification: {e}") 

78 

79 # Notify queue processor about the new queued research 

80 # Note: When using QueueManager, we don't have all parameters for direct execution 

81 # So it will fall back to queue mode 

82 queue_processor.notify_research_queued(username, research_id) 

83 

84 return max_position + 1 

85 

86 finally: 

87 session.close() 

88 

89 @staticmethod 

90 def get_queue_position(username, research_id): 

91 """ 

92 Get the current queue position for a research 

93 

94 Args: 

95 username: User who owns the research 

96 research_id: UUID of the research 

97 

98 Returns: 

99 int: Current queue position or None if not in queue 

100 """ 

101 engine = db_manager.connections.get(username) 

102 if not engine: 

103 return None 

104 

105 SessionLocal = sessionmaker(bind=engine) 

106 session = SessionLocal() 

107 

108 try: 

109 queued = ( 

110 session.query(QueuedResearch) 

111 .filter_by(username=username, research_id=research_id) 

112 .first() 

113 ) 

114 

115 if not queued: 

116 return None 

117 

118 # Count how many are ahead in queue 

119 ahead_count = ( 

120 session.query(QueuedResearch) 

121 .filter( 

122 QueuedResearch.username == username, 

123 QueuedResearch.position < queued.position, 

124 ) 

125 .count() 

126 ) 

127 

128 return ahead_count + 1 

129 

130 finally: 

131 session.close() 

132 

133 @staticmethod 

134 def remove_from_queue(username, research_id): 

135 """ 

136 Remove a research from the queue 

137 

138 Args: 

139 username: User who owns the research 

140 research_id: UUID of the research 

141 

142 Returns: 

143 bool: True if removed, False if not found 

144 """ 

145 engine = db_manager.connections.get(username) 

146 if not engine: 

147 return False 

148 

149 SessionLocal = sessionmaker(bind=engine) 

150 session = SessionLocal() 

151 

152 try: 

153 queued = ( 

154 session.query(QueuedResearch) 

155 .filter_by(username=username, research_id=research_id) 

156 .first() 

157 ) 

158 

159 if not queued: 

160 return False 

161 

162 position = queued.position 

163 session.delete(queued) 

164 

165 # Update positions of items behind in queue 

166 session.query(QueuedResearch).filter( 

167 QueuedResearch.username == username, 

168 QueuedResearch.position > position, 

169 ).update({QueuedResearch.position: QueuedResearch.position - 1}) 

170 

171 session.commit() 

172 logger.info(f"Removed research {research_id} from queue") 

173 return True 

174 

175 finally: 

176 session.close() 

177 

178 @staticmethod 

179 def get_user_queue(username): 

180 """ 

181 Get all queued researches for a user 

182 

183 Args: 

184 username: User to get queue for 

185 

186 Returns: 

187 list: List of queued research info 

188 """ 

189 engine = db_manager.connections.get(username) 

190 if not engine: 

191 return [] 

192 

193 SessionLocal = sessionmaker(bind=engine) 

194 session = SessionLocal() 

195 

196 try: 

197 queued_items = ( 

198 session.query(QueuedResearch) 

199 .filter_by(username=username) 

200 .order_by(QueuedResearch.position) 

201 .all() 

202 ) 

203 

204 result = [] 

205 for item in queued_items: 

206 # Get research info 

207 research = ( 

208 session.query(ResearchHistory) 

209 .filter_by(id=item.research_id) 

210 .first() 

211 ) 

212 

213 if research: 

214 result.append( 

215 { 

216 "research_id": item.research_id, 

217 "query": item.query, 

218 "mode": item.mode, 

219 "position": item.position, 

220 "created_at": item.created_at.isoformat() 

221 if item.created_at 

222 else None, 

223 "is_processing": item.is_processing, 

224 } 

225 ) 

226 

227 return result 

228 

229 finally: 

230 session.close()