Coverage for src/local_deep_research/database/queue_service.py: 100%
75 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
"""
Queue service for managing tasks using encrypted user databases.
Replaces the service_db approach with direct access to user databases.
"""

from datetime import datetime, timedelta, UTC
from typing import Any, Dict, List, Optional

from loguru import logger
from sqlalchemy.orm import Session

from .models import QueueStatus, TaskMetadata
class UserQueueService:
    """Manages queue operations within a user's encrypted database.

    The service keeps two kinds of rows in step with each other:

    * ``TaskMetadata`` rows, one per task, carrying the task's status and
      timestamps.
    * A ``QueueStatus`` row (the first one found is used) holding the
      aggregate ``queued_tasks`` / ``active_tasks`` counters.

    Every public mutator commits through :meth:`_safe_commit`, so a failed
    commit is rolled back, logged and re-raised to the caller.
    """

    # Statuses after which a task holds neither a queued nor an active slot.
    _TERMINAL_STATUSES = ("completed", "failed")

    def __init__(self, session: Session):
        """
        Initialize with a database session.

        Args:
            session: SQLAlchemy session for the user's encrypted database
        """
        self.session = session

    def _safe_commit(self) -> None:
        """Commit the current transaction, rolling back on failure.

        Raises:
            Exception: whatever ``session.commit()`` raised, re-raised after
                the rollback and after the failure has been logged.
        """
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            logger.exception("Database commit failed, transaction rolled back")
            raise

    def update_queue_status(
        self,
        active_tasks: int,
        queued_tasks: int,
        last_task_id: Optional[str] = None,
    ) -> None:
        """Overwrite the user's queue counters with absolute values.

        Args:
            active_tasks: New value for the active-task counter.
            queued_tasks: New value for the queued-task counter.
            last_task_id: Identifier of the most recent task. On an existing
                row it is only written when truthy, so omitting it keeps the
                previously stored id.
        """
        status = self.session.query(QueueStatus).first()

        if status:
            status.active_tasks = active_tasks
            status.queued_tasks = queued_tasks
            status.last_checked = datetime.now(UTC)
            if last_task_id:
                status.last_task_id = last_task_id
        else:
            # No row yet: create one. ``last_checked`` is not set here, so
            # it takes whatever default the model defines (if any).
            status = QueueStatus(
                active_tasks=active_tasks,
                queued_tasks=queued_tasks,
                last_task_id=last_task_id,
            )
            self.session.add(status)

        self._safe_commit()

    def get_queue_status(self) -> Optional[Dict[str, Any]]:
        """Get queue status for the user.

        Returns:
            A dict with ``active_tasks``, ``queued_tasks``, ``last_checked``
            and ``last_task_id``, or ``None`` when no status row exists.
        """
        status = self.session.query(QueueStatus).first()

        if status:
            return {
                "active_tasks": status.active_tasks,
                "queued_tasks": status.queued_tasks,
                "last_checked": status.last_checked,
                "last_task_id": status.last_task_id,
            }
        return None

    def add_task_metadata(
        self,
        task_id: str,
        task_type: str,
        priority: int = 0,
    ) -> None:
        """Record a new task in the ``queued`` state and count it.

        Args:
            task_id: Identifier of the task.
            task_type: Type label stored with the task.
            priority: Task priority; higher values are returned first by
                :meth:`get_pending_tasks`.
        """
        task = TaskMetadata(
            task_id=task_id,
            status="queued",
            task_type=task_type,
            priority=priority,
        )
        self.session.add(task)

        # Update queue counts
        self._increment_queue_count()

        self._safe_commit()

    def update_task_status(
        self, task_id: str, status: str, error_message: Optional[str] = None
    ) -> None:
        """Move a task to ``status`` and keep the queue counters balanced.

        Does nothing (and does not commit) when no task with ``task_id``
        exists.

        Args:
            task_id: Identifier of the task to update.
            status: New status for the task.
            error_message: Stored on the task as given; passing ``None``
                clears any previously stored message.
        """
        task = (
            self.session.query(TaskMetadata).filter_by(task_id=task_id).first()
        )

        if not task:
            return

        old_status = task.status
        task.status = status
        task.error_message = error_message

        if status == "processing" and old_status == "queued":
            task.started_at = datetime.now(UTC)
            self._update_queue_counts(-1, 1)  # -1 queued, +1 active

        elif status == "queued" and old_status == "processing":
            # Dispatch was rolled back (e.g. global capacity reject left
            # the research queued for the next tick). Revert the claim so
            # the counters stay balanced instead of leaking a slot into
            # active_tasks on every retry.
            task.started_at = None
            self._update_queue_counts(1, -1)  # +1 queued, -1 active

        elif status in self._TERMINAL_STATUSES:
            task.completed_at = datetime.now(UTC)
            if old_status == "queued":
                # The task finished without ever being claimed, so the
                # slot it holds is a queued one, not an active one.
                self._update_queue_counts(-1, 0)  # -1 queued, 0 active
            elif old_status not in self._TERMINAL_STATUSES:
                self._update_queue_counts(0, -1)  # 0 queued, -1 active
            # else: the task was already terminal and its slot was released
            # then; decrementing again would steal a slot from another task.

        self._safe_commit()

    def get_pending_tasks(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get pending (``queued``) tasks for the user.

        Args:
            limit: Maximum number of tasks to return.

        Returns:
            One dict per task with ``task_id``, ``task_type``,
            ``created_at`` and ``priority``, ordered by priority descending
            and then by creation time.
        """
        tasks = (
            self.session.query(TaskMetadata)
            .filter_by(status="queued")
            .order_by(TaskMetadata.priority.desc(), TaskMetadata.created_at)
            .limit(limit)
            .all()
        )

        return [
            {
                "task_id": t.task_id,
                "task_type": t.task_type,
                "created_at": t.created_at,
                "priority": t.priority,
            }
            for t in tasks
        ]

    def cleanup_old_tasks(self, days: int = 7) -> int:
        """Delete completed/failed tasks that finished more than ``days`` ago.

        Args:
            days: Age threshold, measured against ``completed_at``.

        Returns:
            The value returned by the bulk delete (the number of rows
            matched for deletion).
        """
        cutoff_date = datetime.now(UTC) - timedelta(days=days)

        deleted = (
            self.session.query(TaskMetadata)
            .filter(
                TaskMetadata.status.in_(self._TERMINAL_STATUSES),
                TaskMetadata.completed_at < cutoff_date,
            )
            .delete()
        )

        self._safe_commit()
        return deleted

    def _get_or_create_status(self) -> QueueStatus:
        """Get existing queue status or create a new one with zero counts.

        A newly created row is added to the session but not committed; the
        calling public method commits it.
        """
        status = self.session.query(QueueStatus).first()
        if status is None:
            status = QueueStatus(queued_tasks=0, active_tasks=0)
            self.session.add(status)
        return status

    def _increment_queue_count(self):
        """Increment the queued task count and refresh ``last_checked``."""
        status = self._get_or_create_status()
        status.queued_tasks += 1
        status.last_checked = datetime.now(UTC)

    def _update_queue_counts(self, queued_delta: int, active_delta: int):
        """Apply deltas to both counters, clamping each at zero.

        Args:
            queued_delta: Amount added to ``queued_tasks`` (may be negative).
            active_delta: Amount added to ``active_tasks`` (may be negative).
        """
        status = self._get_or_create_status()
        status.queued_tasks = max(0, status.queued_tasks + queued_delta)
        status.active_tasks = max(0, status.active_tasks + active_delta)
        status.last_checked = datetime.now(UTC)