Coverage for src / local_deep_research / web / queue / manager.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Queue manager for handling research queue operations""" 

2 

3from loguru import logger 

4from sqlalchemy import func 

5 

6from ...database.session_context import get_user_db_session 

7from ...database.models import QueuedResearch, ResearchHistory 

8from .processor_v2 import queue_processor 

9 

10 

11class QueueManager: 

12 """Manages the research queue operations""" 

13 

14 @staticmethod 

15 def add_to_queue(username, research_id, query, mode, settings): 

16 """ 

17 Add a research to the queue 

18 

19 Args: 

20 username: User who owns the research 

21 research_id: UUID of the research 

22 query: Research query 

23 mode: Research mode 

24 settings: Research settings dictionary 

25 

26 Returns: 

27 int: Queue position 

28 """ 

29 with get_user_db_session(username) as session: 

30 # Get the next position in queue for this user 

31 max_position = ( 

32 session.query(func.max(QueuedResearch.position)) 

33 .filter_by(username=username) 

34 .scalar() 

35 or 0 

36 ) 

37 

38 queued_record = QueuedResearch( 

39 username=username, 

40 research_id=research_id, 

41 query=query, 

42 mode=mode, 

43 settings_snapshot=settings, 

44 position=max_position + 1, 

45 ) 

46 session.add(queued_record) 

47 session.commit() 

48 

49 logger.info( 

50 f"Added research {research_id} to queue at position {max_position + 1}" 

51 ) 

52 

53 # Send RESEARCH_QUEUED notification if enabled 

54 try: 

55 from ...settings import SettingsManager 

56 from ...notifications import send_queue_notification 

57 

58 settings_manager = SettingsManager(session) 

59 settings_snapshot = settings_manager.get_settings_snapshot() 

60 

61 send_queue_notification( 

62 username=username, 

63 research_id=research_id, 

64 query=query, 

65 settings_snapshot=settings_snapshot, 

66 position=max_position + 1, 

67 ) 

68 except Exception as e: 

69 logger.debug(f"Failed to send queued notification: {e}") 

70 

71 # Notify queue processor about the new queued research 

72 # Note: When using QueueManager, we don't have all parameters for direct execution 

73 # So it will fall back to queue mode 

74 try: 

75 queue_processor.notify_research_queued(username, research_id) 

76 except Exception: 

77 logger.warning("Failed to notify queue processor") 

78 

79 return max_position + 1 

80 

81 @staticmethod 

82 def get_queue_position(username, research_id): 

83 """ 

84 Get the current queue position for a research 

85 

86 Args: 

87 username: User who owns the research 

88 research_id: UUID of the research 

89 

90 Returns: 

91 int: Current queue position or None if not in queue 

92 """ 

93 with get_user_db_session(username) as session: 

94 queued = ( 

95 session.query(QueuedResearch) 

96 .filter_by(username=username, research_id=research_id) 

97 .first() 

98 ) 

99 

100 if not queued: 

101 return None 

102 

103 # Count how many are ahead in queue 

104 ahead_count = ( 

105 session.query(QueuedResearch) 

106 .filter( 

107 QueuedResearch.username == username, 

108 QueuedResearch.position < queued.position, 

109 ) 

110 .count() 

111 ) 

112 

113 return ahead_count + 1 

114 

115 @staticmethod 

116 def remove_from_queue(username, research_id): 

117 """ 

118 Remove a research from the queue 

119 

120 Args: 

121 username: User who owns the research 

122 research_id: UUID of the research 

123 

124 Returns: 

125 bool: True if removed, False if not found 

126 """ 

127 with get_user_db_session(username) as session: 

128 queued = ( 

129 session.query(QueuedResearch) 

130 .filter_by(username=username, research_id=research_id) 

131 .first() 

132 ) 

133 

134 if not queued: 

135 return False 

136 

137 position = queued.position 

138 session.delete(queued) 

139 

140 # Update positions of items behind in queue 

141 session.query(QueuedResearch).filter( 

142 QueuedResearch.username == username, 

143 QueuedResearch.position > position, 

144 ).update({QueuedResearch.position: QueuedResearch.position - 1}) 

145 

146 session.commit() 

147 logger.info(f"Removed research {research_id} from queue") 

148 return True 

149 

150 @staticmethod 

151 def get_user_queue(username): 

152 """ 

153 Get all queued researches for a user 

154 

155 Args: 

156 username: User to get queue for 

157 

158 Returns: 

159 list: List of queued research info 

160 """ 

161 with get_user_db_session(username) as session: 

162 queued_items = ( 

163 session.query(QueuedResearch) 

164 .filter_by(username=username) 

165 .order_by(QueuedResearch.position) 

166 .all() 

167 ) 

168 

169 result = [] 

170 for item in queued_items: 

171 # Get research info 

172 research = ( 

173 session.query(ResearchHistory) 

174 .filter_by(id=item.research_id) 

175 .first() 

176 ) 

177 

178 if research: 

179 result.append( 

180 { 

181 "research_id": item.research_id, 

182 "query": item.query, 

183 "mode": item.mode, 

184 "position": item.position, 

185 "created_at": item.created_at.isoformat() 

186 if item.created_at 

187 else None, 

188 "is_processing": item.is_processing, 

189 } 

190 ) 

191 

192 return result