Coverage for src / local_deep_research / web / queue / manager.py: 96%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1"""Queue manager for handling research queue operations""" 

2 

3from loguru import logger 

4from sqlalchemy import func 

5 

6from ...database.encrypted_db import db_manager 

7from ...database.models import QueuedResearch, ResearchHistory 

8from .processor_v2 import queue_processor 

9 

10 

class QueueManager:
    """Manages the research queue operations.

    All operations are static and work on the per-user database session
    obtained from ``db_manager.get_session(username)``.
    """

    @staticmethod
    def add_to_queue(username, research_id, query, mode, settings):
        """
        Add a research to the end of the user's queue.

        Args:
            username: User who owns the research
            research_id: UUID of the research
            query: Research query
            mode: Research mode
            settings: Research settings dictionary (stored as the queued
                record's ``settings_snapshot``)

        Returns:
            int: Queue position assigned to the new entry

        Raises:
            ValueError: If no database session is available for the user
        """
        session = db_manager.get_session(username)
        if not session:
            raise ValueError(f"No database connection for user {username}")

        with session:
            # Next position is one past the user's current maximum; an empty
            # queue yields None from MAX(), which is treated as 0.
            # NOTE(review): the read and the insert are separate statements;
            # confirm callers serialize concurrent adds for the same user.
            max_position = (
                session.query(func.max(QueuedResearch.position))
                .filter_by(username=username)
                .scalar()
                or 0
            )
            position = max_position + 1

            queued_record = QueuedResearch(
                username=username,
                research_id=research_id,
                query=query,
                mode=mode,
                settings_snapshot=settings,
                position=position,
            )
            session.add(queued_record)
            session.commit()

            logger.info(
                f"Added research {research_id} to queue at position {position}"
            )

            # Send RESEARCH_QUEUED notification if enabled. This is
            # best-effort: any failure (including a failed import) is logged
            # at debug level and must not prevent queueing.
            try:
                # Imported lazily inside the try so import errors are also
                # swallowed (presumably also avoids an import cycle - confirm).
                from ...settings import SettingsManager
                from ...notifications import send_queue_notification

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_settings_snapshot()

                send_queue_notification(
                    username=username,
                    research_id=research_id,
                    query=query,
                    settings_snapshot=settings_snapshot,
                    position=position,
                )
            except Exception as e:
                logger.debug(f"Failed to send queued notification: {e}")

            # Notify queue processor about the new queued research
            # Note: When using QueueManager, we don't have all parameters for direct execution
            # So it will fall back to queue mode
            queue_processor.notify_research_queued(username, research_id)

            return position

    @staticmethod
    def get_queue_position(username, research_id):
        """
        Get the current queue position for a research.

        The position is derived by counting the user's entries with a smaller
        stored ``position`` value, so it is 1-based.

        Args:
            username: User who owns the research
            research_id: UUID of the research

        Returns:
            int: Current queue position, or None if there is no database
            session for the user or the research is not in the queue
        """
        session = db_manager.get_session(username)
        if not session:
            return None

        with session:
            queued = (
                session.query(QueuedResearch)
                .filter_by(username=username, research_id=research_id)
                .first()
            )

            if not queued:
                return None

            # Count how many are ahead in queue
            ahead_count = (
                session.query(QueuedResearch)
                .filter(
                    QueuedResearch.username == username,
                    QueuedResearch.position < queued.position,
                )
                .count()
            )

            return ahead_count + 1

    @staticmethod
    def remove_from_queue(username, research_id):
        """
        Remove a research from the queue and close the gap it leaves.

        Args:
            username: User who owns the research
            research_id: UUID of the research

        Returns:
            bool: True if removed, False if there is no database session for
            the user or the research was not found in the queue
        """
        session = db_manager.get_session(username)
        if not session:
            return False

        with session:
            queued = (
                session.query(QueuedResearch)
                .filter_by(username=username, research_id=research_id)
                .first()
            )

            if not queued:
                return False

            # Remember the slot before deleting so later entries can shift up.
            position = queued.position
            session.delete(queued)

            # Update positions of items behind in queue
            session.query(QueuedResearch).filter(
                QueuedResearch.username == username,
                QueuedResearch.position > position,
            ).update({QueuedResearch.position: QueuedResearch.position - 1})

            session.commit()
            logger.info(f"Removed research {research_id} from queue")
            return True

    @staticmethod
    def get_user_queue(username):
        """
        Get all queued researches for a user, ordered by queue position.

        Queue entries whose ``research_id`` has no matching
        ``ResearchHistory`` row are omitted from the result.

        Args:
            username: User to get queue for

        Returns:
            list: One dict per queued research with the keys
            ``research_id``, ``query``, ``mode``, ``position``,
            ``created_at`` (ISO-8601 string or None) and ``is_processing``.
            Empty if there is no database session for the user.
        """
        session = db_manager.get_session(username)
        if not session:
            return []

        with session:
            # Correlated EXISTS keeps only entries that have a research
            # record, in the same query that loads the queue. This replaces
            # a separate ResearchHistory lookup per queued item.
            has_research = (
                session.query(ResearchHistory.id)
                .filter(ResearchHistory.id == QueuedResearch.research_id)
                .exists()
            )

            queued_items = (
                session.query(QueuedResearch)
                .filter(QueuedResearch.username == username, has_research)
                .order_by(QueuedResearch.position)
                .all()
            )

            return [
                {
                    "research_id": item.research_id,
                    "query": item.query,
                    "mode": item.mode,
                    "position": item.position,
                    "created_at": item.created_at.isoformat()
                    if item.created_at
                    else None,
                    "is_processing": item.is_processing,
                }
                for item in queued_items
            ]