Coverage for src / local_deep_research / database / queue_service.py: 28%
70 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Queue service for managing tasks using encrypted user databases.
3Replaces the service_db approach with direct access to user databases.
4"""
6from datetime import datetime, timedelta, UTC
7from typing import Any, Dict, List, Optional
9from sqlalchemy.orm import Session
11from .models import QueueStatus, TaskMetadata
class UserQueueService:
    """Manages queue operations within a user's encrypted database.

    Two kinds of rows are maintained through the supplied session:
    ``TaskMetadata`` (one row per task) and a ``QueueStatus`` row holding the
    running ``queued_tasks`` / ``active_tasks`` counters. Only the first
    ``QueueStatus`` row returned by the query is ever read or updated, and a
    row is created on demand when none exists. Every public method that
    modifies data commits the session before returning.
    """

    # Status strings written to / compared against ``TaskMetadata.status``.
    _STATUS_QUEUED = "queued"
    _STATUS_PROCESSING = "processing"
    _TERMINAL_STATUSES = ("completed", "failed")

    def __init__(self, session: Session):
        """
        Initialize with a database session.

        Args:
            session: SQLAlchemy session for the user's encrypted database
        """
        self.session = session

    def update_queue_status(
        self,
        active_tasks: int,
        queued_tasks: int,
        last_task_id: Optional[str] = None,
    ) -> None:
        """Overwrite the user's queue counters and commit.

        Args:
            active_tasks: New value for the active-task counter.
            queued_tasks: New value for the queued-task counter.
            last_task_id: Stored as the last task id. When updating an
                existing row a falsy value leaves the stored id untouched.
        """
        status = self._get_status()

        if status is None:
            status = QueueStatus(
                active_tasks=active_tasks,
                queued_tasks=queued_tasks,
                last_task_id=last_task_id,
            )
            self.session.add(status)
        else:
            status.active_tasks = active_tasks
            status.queued_tasks = queued_tasks
            if last_task_id:
                status.last_task_id = last_task_id

        # Stamp both new and existing rows so the two paths agree.
        status.last_checked = datetime.now(UTC)
        self.session.commit()

    def get_queue_status(self) -> Optional[Dict[str, Any]]:
        """Return the queue counters as a dict, or None if no row exists.

        Returns:
            A dict with keys ``active_tasks``, ``queued_tasks``,
            ``last_checked`` and ``last_task_id``; ``None`` when the user has
            no ``QueueStatus`` row yet.
        """
        status = self._get_status()
        if status is None:
            return None

        return {
            "active_tasks": status.active_tasks,
            "queued_tasks": status.queued_tasks,
            "last_checked": status.last_checked,
            "last_task_id": status.last_task_id,
        }

    def add_task_metadata(
        self,
        task_id: str,
        task_type: str,
        priority: int = 0,
    ) -> None:
        """Record a new task in the ``queued`` state and commit.

        The queued-task counter is incremented in the same transaction.

        Args:
            task_id: Identifier stored on the new ``TaskMetadata`` row.
            task_type: Task type stored on the row.
            priority: Priority stored on the row; ``get_pending_tasks``
                returns higher values first.
        """
        task = TaskMetadata(
            task_id=task_id,
            status=self._STATUS_QUEUED,
            task_type=task_type,
            priority=priority,
        )
        self.session.add(task)

        # Keep the counters in step with the new row.
        self._increment_queue_count()

        self.session.commit()

    def update_task_status(
        self, task_id: str, status: str, error_message: Optional[str] = None
    ) -> None:
        """Change a task's status, adjust the queue counters, and commit.

        Counter adjustments depend on the task's previous status:

        * ``queued`` -> ``processing``: one task moves from queued to active
          and ``started_at`` is stamped.
        * ``processing`` -> ``completed``/``failed``: one active task fewer.
        * ``queued`` -> ``completed``/``failed``: one queued task fewer (the
          task never became active).
        * any other previous status -> ``completed``/``failed``: counters
          are left alone, so re-marking a finished task does not decrement
          them a second time.

        ``completed_at`` is stamped on every transition to a terminal
        status. If no task with ``task_id`` exists nothing is modified, but
        the session is still committed.

        Args:
            task_id: Identifier of the task to update.
            status: New status string.
            error_message: Stored on the task as-is; ``None`` clears any
                previously stored message.
        """
        task = (
            self.session.query(TaskMetadata).filter_by(task_id=task_id).first()
        )

        if task is not None:
            old_status = task.status
            task.status = status
            task.error_message = error_message

            if (
                status == self._STATUS_PROCESSING
                and old_status == self._STATUS_QUEUED
            ):
                task.started_at = datetime.now(UTC)
                self._update_queue_counts(-1, 1)  # -1 queued, +1 active

            elif status in self._TERMINAL_STATUSES:
                task.completed_at = datetime.now(UTC)
                if old_status == self._STATUS_PROCESSING:
                    self._update_queue_counts(0, -1)  # -1 active
                elif old_status == self._STATUS_QUEUED:
                    # Finished without ever running: release the queued
                    # slot rather than an active one.
                    self._update_queue_counts(-1, 0)  # -1 queued

        self.session.commit()

    def get_pending_tasks(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``limit`` queued tasks, highest priority first.

        Tasks with equal priority are ordered by ``created_at`` ascending.

        Args:
            limit: Maximum number of tasks to return.

        Returns:
            A list of dicts with keys ``task_id``, ``task_type``,
            ``created_at`` and ``priority``.
        """
        tasks = (
            self.session.query(TaskMetadata)
            .filter_by(status=self._STATUS_QUEUED)
            .order_by(TaskMetadata.priority.desc(), TaskMetadata.created_at)
            .limit(limit)
            .all()
        )

        return [
            {
                "task_id": t.task_id,
                "task_type": t.task_type,
                "created_at": t.created_at,
                "priority": t.priority,
            }
            for t in tasks
        ]

    def cleanup_old_tasks(self, days: int = 7) -> int:
        """Delete completed/failed tasks older than ``days`` days and commit.

        Args:
            days: Tasks whose ``completed_at`` is earlier than now minus this
                many days are removed.

        Returns:
            The row count reported by the bulk delete.
        """
        cutoff_date = datetime.now(UTC) - timedelta(days=days)

        # The commit directly below ends the transaction, so the bulk DELETE
        # is issued without synchronising objects already loaded in the
        # session.
        deleted = (
            self.session.query(TaskMetadata)
            .filter(
                TaskMetadata.status.in_(self._TERMINAL_STATUSES),
                TaskMetadata.completed_at < cutoff_date,
            )
            .delete(synchronize_session=False)
        )

        self.session.commit()
        return deleted

    def get_active_task_count(self) -> int:
        """Return the active-task counter, or 0 if no status row exists."""
        status = self._get_status()
        return status.active_tasks if status is not None else 0

    def get_queued_task_count(self) -> int:
        """Return the queued-task counter, or 0 if no status row exists."""
        status = self._get_status()
        return status.queued_tasks if status is not None else 0

    def _get_status(self) -> Optional[QueueStatus]:
        """Return the first ``QueueStatus`` row, or None if there is none."""
        return self.session.query(QueueStatus).first()

    def _increment_queue_count(self) -> None:
        """Increment the queued task count (does not commit)."""
        self._update_queue_counts(1, 0)

    def _update_queue_counts(self, queued_delta: int, active_delta: int) -> None:
        """Apply deltas to the counters, clamping at zero (does not commit).

        Args:
            queued_delta: Amount added to ``queued_tasks`` (may be negative).
            active_delta: Amount added to ``active_tasks`` (may be negative).
        """
        status = self._get_status()

        if status is None:
            # No row yet: start from zero, so negative deltas clamp to 0.
            status = QueueStatus(
                queued_tasks=max(0, queued_delta),
                active_tasks=max(0, active_delta),
            )
            self.session.add(status)
        else:
            status.queued_tasks = max(0, status.queued_tasks + queued_delta)
            status.active_tasks = max(0, status.active_tasks + active_delta)

        status.last_checked = datetime.now(UTC)