Coverage for src / local_deep_research / database / queue_service.py: 28%

70 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Queue service for managing tasks using encrypted user databases. 

3Replaces the service_db approach with direct access to user databases. 

4""" 

5 

6from datetime import datetime, timedelta, UTC 

7from typing import Any, Dict, List, Optional 

8 

9from sqlalchemy.orm import Session 

10 

11from .models import QueueStatus, TaskMetadata 

12 

13 

14class UserQueueService: 

15 """Manages queue operations within a user's encrypted database.""" 

16 

17 def __init__(self, session: Session): 

18 """ 

19 Initialize with a database session. 

20 

21 Args: 

22 session: SQLAlchemy session for the user's encrypted database 

23 """ 

24 self.session = session 

25 

26 def update_queue_status( 

27 self, 

28 active_tasks: int, 

29 queued_tasks: int, 

30 last_task_id: Optional[str] = None, 

31 ) -> None: 

32 """Update queue status for the user.""" 

33 status = self.session.query(QueueStatus).first() 

34 

35 if status: 

36 status.active_tasks = active_tasks 

37 status.queued_tasks = queued_tasks 

38 status.last_checked = datetime.now(UTC) 

39 if last_task_id: 

40 status.last_task_id = last_task_id 

41 else: 

42 status = QueueStatus( 

43 active_tasks=active_tasks, 

44 queued_tasks=queued_tasks, 

45 last_task_id=last_task_id, 

46 ) 

47 self.session.add(status) 

48 

49 self.session.commit() 

50 

51 def get_queue_status(self) -> Optional[Dict[str, Any]]: 

52 """Get queue status for the user.""" 

53 status = self.session.query(QueueStatus).first() 

54 

55 if status: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true

56 return { 

57 "active_tasks": status.active_tasks, 

58 "queued_tasks": status.queued_tasks, 

59 "last_checked": status.last_checked, 

60 "last_task_id": status.last_task_id, 

61 } 

62 return None 

63 

64 def add_task_metadata( 

65 self, 

66 task_id: str, 

67 task_type: str, 

68 priority: int = 0, 

69 ) -> None: 

70 """Add metadata for a new task.""" 

71 task = TaskMetadata( 

72 task_id=task_id, 

73 status="queued", 

74 task_type=task_type, 

75 priority=priority, 

76 ) 

77 self.session.add(task) 

78 

79 # Update queue counts 

80 self._increment_queue_count() 

81 

82 self.session.commit() 

83 

84 def update_task_status( 

85 self, task_id: str, status: str, error_message: Optional[str] = None 

86 ) -> None: 

87 """Update task status.""" 

88 task = ( 

89 self.session.query(TaskMetadata).filter_by(task_id=task_id).first() 

90 ) 

91 

92 if task: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 old_status = task.status 

94 task.status = status 

95 task.error_message = error_message 

96 

97 if status == "processing" and old_status == "queued": 

98 task.started_at = datetime.now(UTC) 

99 self._update_queue_counts(-1, 1) # -1 queued, +1 active 

100 

101 elif status in ["completed", "failed"]: 

102 task.completed_at = datetime.now(UTC) 

103 self._update_queue_counts(0, -1) # 0 queued, -1 active 

104 

105 self.session.commit() 

106 

107 def get_pending_tasks(self, limit: int = 5) -> List[Dict[str, Any]]: 

108 """Get pending tasks for the user.""" 

109 tasks = ( 

110 self.session.query(TaskMetadata) 

111 .filter_by(status="queued") 

112 .order_by(TaskMetadata.priority.desc(), TaskMetadata.created_at) 

113 .limit(limit) 

114 .all() 

115 ) 

116 

117 return [ 

118 { 

119 "task_id": t.task_id, 

120 "task_type": t.task_type, 

121 "created_at": t.created_at, 

122 "priority": t.priority, 

123 } 

124 for t in tasks 

125 ] 

126 

127 def cleanup_old_tasks(self, days: int = 7) -> int: 

128 """Clean up old completed/failed tasks.""" 

129 cutoff_date = datetime.now(UTC) - timedelta(days=days) 

130 

131 deleted = ( 

132 self.session.query(TaskMetadata) 

133 .filter( 

134 TaskMetadata.status.in_(["completed", "failed"]), 

135 TaskMetadata.completed_at < cutoff_date, 

136 ) 

137 .delete() 

138 ) 

139 

140 self.session.commit() 

141 return deleted 

142 

143 def get_active_task_count(self) -> int: 

144 """Get count of active tasks.""" 

145 status = self.session.query(QueueStatus).first() 

146 return status.active_tasks if status else 0 

147 

148 def get_queued_task_count(self) -> int: 

149 """Get count of queued tasks.""" 

150 status = self.session.query(QueueStatus).first() 

151 return status.queued_tasks if status else 0 

152 

153 def _increment_queue_count(self): 

154 """Increment the queued task count.""" 

155 status = self.session.query(QueueStatus).first() 

156 if status: 

157 status.queued_tasks += 1 

158 status.last_checked = datetime.now(UTC) 

159 else: 

160 status = QueueStatus(queued_tasks=1, active_tasks=0) 

161 self.session.add(status) 

162 

163 def _update_queue_counts(self, queued_delta: int, active_delta: int): 

164 """Update queue counts by deltas.""" 

165 status = self.session.query(QueueStatus).first() 

166 if status: 

167 status.queued_tasks = max(0, status.queued_tasks + queued_delta) 

168 status.active_tasks = max(0, status.active_tasks + active_delta) 

169 status.last_checked = datetime.now(UTC) 

170 else: 

171 # Create new status if doesn't exist 

172 status = QueueStatus( 

173 queued_tasks=max(0, queued_delta), 

174 active_tasks=max(0, active_delta), 

175 ) 

176 self.session.add(status)