Coverage for src / local_deep_research / library / download_management / status_tracker.py: 9%
119 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
"""
Resource Status Tracker

Tracks download attempts, failures, and cooldowns in the database.
Provides persistent storage for failure classifications and retry eligibility.
"""
from datetime import datetime, timedelta, UTC
from typing import Optional, Dict, Any

from loguru import logger
from sqlalchemy import func
from sqlalchemy.orm import sessionmaker, Session

from .models import ResourceDownloadStatus, Base
from .failure_classifier import BaseFailure
class ResourceStatusTracker:
    """Track download attempts, failures, and cooldowns in database.

    The tracker reads and writes ``ResourceDownloadStatus`` rows, looked up by
    ``resource_id``. The status values written by this class are
    ``"temporarily_failed"``, ``"permanently_failed"``, ``"completed"`` and
    ``"available"``.
    """

    # Maximum number of failed attempts per UTC day before ``can_retry``
    # refuses further attempts for that day.
    MAX_DAILY_RETRIES = 3

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize the status tracker for a user.

        Opens the user's database through the shared ``db_manager``, builds a
        session factory bound to the resulting engine, and creates any tables
        registered on ``Base`` that do not exist yet.

        Args:
            username: Username for database access
            password: Optional password for encrypted database
        """
        self.username = username
        self.password = password

        # Use the global db_manager singleton to share cached connections
        from ...database.encrypted_db import db_manager

        self.db_manager = db_manager
        self.engine = db_manager.open_user_database(username, password)
        self.Session = sessionmaker(bind=self.engine)

        # Create tables if they don't exist
        Base.metadata.create_all(self.engine)
        logger.info(
            f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database"
        )

    def _get_session(self) -> Session:
        """Get a new database session from the tracker's session factory."""
        return self.Session()

    @staticmethod
    def _ensure_aware(value: Optional[datetime]) -> Optional[datetime]:
        """Return ``value`` with UTC attached if it is naive; pass ``None`` through.

        Timestamps that are already timezone-aware are returned unchanged.
        Naive timestamps are assumed to be UTC (legacy data).
        """
        if value is not None and value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value

    @staticmethod
    def _isoformat_or_none(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when ``value`` is unset."""
        return value.isoformat() if value else None

    def mark_failure(
        self,
        resource_id: int,
        failure: BaseFailure,
        session: Optional[Session] = None,
    ) -> None:
        """
        Mark a resource as failed with classification.

        Creates the status record if none exists, stores the failure type and
        message, and updates the retry counters. A permanent failure clears
        the retry timestamp and records ``permanent_failure_at``; a temporary
        failure sets the retry timestamp to
        ``failure.created_at + failure.retry_after``.

        Args:
            resource_id: Resource identifier
            failure: Classified failure object
            session: Optional existing database session to reuse. When given,
                the caller is responsible for committing and closing it.
        """
        # Only a session created here is committed and closed here.
        owns_session = session is None
        if owns_session:
            session = self._get_session()

        try:
            # Get or create status record
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )
            if not status:
                status = ResourceDownloadStatus(resource_id=resource_id)
                session.add(status)

            now = datetime.now(UTC)

            # Update status information
            if failure.is_permanent():
                status.status = "permanently_failed"
                status.retry_after_timestamp = None
                status.failure_type = failure.error_type
                status.failure_message = failure.message
                status.permanent_failure_at = now
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}"
                )
            else:
                status.status = "temporarily_failed"
                status.retry_after_timestamp = (
                    failure.created_at + failure.retry_after
                )
                status.failure_type = failure.error_type
                status.failure_message = failure.message
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: {failure.error_type}, retry after: {failure.retry_after}"
                )

            # Read the PREVIOUS attempt time before overwriting it; comparing
            # the freshly written value with today would always match and the
            # daily counter would never reset.
            previous_attempt = self._ensure_aware(status.last_attempt_at)
            same_utc_day = (
                previous_attempt is not None
                and previous_attempt.astimezone(UTC).date() == now.date()
            )

            # Counters may be None on new or legacy records; treat as 0.
            status.total_retry_count = (status.total_retry_count or 0) + 1
            if same_utc_day:
                status.today_retry_count = (status.today_retry_count or 0) + 1
            else:
                # First recorded failure of a new UTC day restarts the count.
                status.today_retry_count = 1
            status.last_attempt_at = now

            # Only commit if we created the session
            if owns_session:
                session.commit()
                logger.debug(
                    f"[STATUS_TRACKER] Updated failure status for resource {resource_id}"
                )
        finally:
            if owns_session:
                session.close()

    def mark_success(
        self, resource_id: int, session: Optional[Session] = None
    ) -> None:
        """
        Mark a resource as successfully downloaded.

        If a status record exists it is set to ``"completed"`` and its failure
        details and retry timestamp are cleared. If no record exists, nothing
        is written.

        Args:
            resource_id: Resource identifier
            session: Optional existing database session to reuse. When given,
                the caller is responsible for committing and closing it.
        """
        # Only a session created here is committed and closed here.
        owns_session = session is None
        if owns_session:
            session = self._get_session()

        try:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if status:
                status.status = "completed"
                status.failure_type = None
                status.failure_message = None
                status.retry_after_timestamp = None
                status.updated_at = datetime.now(UTC)
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed"
                )

            # Only commit if we created the session
            if owns_session:
                session.commit()
        finally:
            if owns_session:
                session.close()

    def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]:
        """
        Check if a resource can be retried right now.

        A retry is refused when the resource is permanently failed, when a
        temporary failure's cooldown has not expired yet, or when the daily
        retry limit (``MAX_DAILY_RETRIES``) has been reached by attempts made
        today (UTC).

        Args:
            resource_id: Resource identifier

        Returns:
            Tuple of (can_retry, reason_if_not)
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                # No status record, can retry
                return True, None

            # Check if permanently failed
            if status.status == "permanently_failed":
                return (
                    False,
                    f"Permanently failed: {status.failure_message or status.failure_type}",
                )

            now = datetime.now(UTC)

            # Check if temporarily failed and cooldown not expired
            if (
                status.status == "temporarily_failed"
                and status.retry_after_timestamp
            ):
                # Naive timestamps (legacy data) are assumed to be UTC.
                retry_timestamp = self._ensure_aware(
                    status.retry_after_timestamp
                )
                if now < retry_timestamp:
                    return (
                        False,
                        f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
                    )

            # Check daily retry limit. The stored counter only describes the
            # day of the last attempt, so it is ignored once that day has
            # passed (or when no attempt time is recorded). A None counter
            # from legacy data counts as 0.
            retries_today = status.today_retry_count or 0
            last_attempt = self._ensure_aware(status.last_attempt_at)
            attempted_today = (
                last_attempt is not None
                and last_attempt.astimezone(UTC).date() == now.date()
            )
            if attempted_today and retries_today >= self.MAX_DAILY_RETRIES:
                return (
                    False,
                    f"Daily retry limit exceeded ({retries_today}/{self.MAX_DAILY_RETRIES}). Retry available tomorrow.",
                )

            # Can retry
            return True, None

    def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]:
        """
        Get the current status of a resource.

        Args:
            resource_id: Resource identifier

        Returns:
            Status information dictionary or None if not found. Timestamps are
            ISO-8601 strings, or None when the stored value is unset.
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                return None

            iso = self._isoformat_or_none
            return {
                "resource_id": status.resource_id,
                "status": status.status,
                "failure_type": status.failure_type,
                "failure_message": status.failure_message,
                "retry_after_timestamp": iso(status.retry_after_timestamp),
                "last_attempt_at": iso(status.last_attempt_at),
                "total_retry_count": status.total_retry_count,
                "today_retry_count": status.today_retry_count,
                "created_at": iso(status.created_at),
                "updated_at": iso(status.updated_at),
            }

    def get_failed_resources_count(self) -> Dict[str, int]:
        """
        Get counts of resources by failure type.

        Only records whose status is ``"temporarily_failed"`` or
        ``"permanently_failed"`` are counted. Records without a failure type
        are counted under ``"unknown"``.

        Returns:
            Dictionary mapping failure types to counts
        """
        with self._get_session() as session:
            # Let the database aggregate instead of loading every failed row.
            grouped = (
                session.query(
                    ResourceDownloadStatus.failure_type, func.count()
                )
                .filter(
                    ResourceDownloadStatus.status.in_(
                        ["temporarily_failed", "permanently_failed"]
                    )
                )
                .group_by(ResourceDownloadStatus.failure_type)
                .all()
            )

            counts: Dict[str, int] = {}
            for failure_type, row_count in grouped:
                # NULL and empty failure types both fall into "unknown", so
                # their groups must be summed rather than assigned.
                key = failure_type or "unknown"
                counts[key] = counts.get(key, 0) + row_count

            return counts

    def clear_permanent_failures(self, older_than_days: int = 30) -> int:
        """
        Clear permanent failure statuses for old records.

        A record qualifies when its status is ``"permanently_failed"`` and the
        failure is older than the cutoff. The failure's age is taken from
        ``permanent_failure_at``; records where that is unset fall back to
        ``created_at``. Qualifying records are reset to ``"available"`` with
        failure details and retry counters cleared.

        Args:
            older_than_days: Clear failures older than this many days

        Returns:
            Number of records cleared
        """
        cutoff_date = datetime.now(UTC) - timedelta(days=older_than_days)

        # Age is measured from when the failure became permanent, not from
        # when the status row was first created.
        failed_at = func.coalesce(
            ResourceDownloadStatus.permanent_failure_at,
            ResourceDownloadStatus.created_at,
        )

        with self._get_session() as session:
            old_failures = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status == "permanently_failed",
                    failed_at < cutoff_date,
                )
                .all()
            )

            count = len(old_failures)
            now = datetime.now(UTC)
            for failure in old_failures:
                failure.status = "available"
                failure.failure_type = None
                failure.failure_message = None
                failure.retry_after_timestamp = None
                failure.permanent_failure_at = None
                failure.total_retry_count = 0
                failure.today_retry_count = 0
                failure.updated_at = now

            session.commit()
            logger.info(
                f"[STATUS_TRACKER] Cleared {count} old permanent failure records"
            )
            return count