Coverage for src/local_deep_research/library/download_management/status_tracker.py: 92%
142 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Resource Status Tracker
4Tracks download attempts, failures, and cooldowns in the database.
5Provides persistent storage for failure classifications and retry eligibility.
6"""
8from datetime import datetime, timedelta, UTC
9from typing import Optional, Dict, Any
11from loguru import logger
12from sqlalchemy.orm import sessionmaker, Session
14from .models import ResourceDownloadStatus, Base
15from .failure_classifier import BaseFailure
# Attempt number at which compute_retry_cooldown stops returning a cooldown
# (i.e. the resource becomes a permanent failure). ResourceStatusTracker.can_retry
# also uses it as a safety-net ceiling on total attempts.
MAX_TOTAL_RETRIES = 5


def compute_retry_cooldown(
    attempt: int, default_cooldown: timedelta
) -> Optional[timedelta]:
    """Return the cooldown before the next retry, or None once retries run out.

    Args:
        attempt: 1-based number of the attempt that just failed.
        default_cooldown: Cooldown suggested by the failure classification;
            only used for the first attempt.

    Returns:
        The cooldown to wait for this attempt, or ``None`` when ``attempt``
        has reached ``MAX_TOTAL_RETRIES`` (treat as a permanent failure).

    Schedule:
        attempt <= 1: default_cooldown (from failure type)
        attempt 2:    1 day
        attempt 3-4:  30 days
        attempt >= 5: None (permanent failure)
    """
    # Exhaustion is checked first so it wins over every other tier.
    if attempt >= MAX_TOTAL_RETRIES:
        return None
    if attempt <= 1:
        return default_cooldown
    return timedelta(days=1) if attempt == 2 else timedelta(days=30)
class ResourceStatusTracker:
    """Track download attempts, failures, and cooldowns in the database.

    Every resource has at most one ``ResourceDownloadStatus`` row, looked up
    by ``resource_id``. Failures record their classification plus a retry
    cooldown from ``compute_retry_cooldown``; once that schedule is exhausted
    the row is promoted to ``"permanently_failed"``.
    """

    # Maximum attempts per UTC calendar day before can_retry refuses.
    MAX_DAILY_RETRIES = 3

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize the status tracker for a user.

        Opens the user's database through the shared ``db_manager`` and
        creates any missing tables.

        Args:
            username: Username for database access
            password: Optional password for encrypted database

        Raises:
            RuntimeError: If the user's database fails to initialise.
        """
        self.username = username
        self.password = password

        # Use the global db_manager singleton to share cached connections
        from ...database.encrypted_db import (
            DatabaseInitializationError,
            db_manager,
        )

        self.db_manager = db_manager
        try:
            self.engine = db_manager.open_user_database(username, password)
        except DatabaseInitializationError:
            # Surface init failures from the schedulers/library-init
            # callers as a plain RuntimeError — they all wrap construction
            # in try/except already, and propagating the typed exception
            # would couple every caller to encrypted_db's internals.
            logger.exception(
                f"[STATUS_TRACKER] Database init failed for user: {username}"
            )
            raise RuntimeError(
                f"Database initialisation failed for user {username}"
            ) from None
        self.Session = sessionmaker(bind=self.engine)

        # Create tables if they don't exist
        Base.metadata.create_all(self.engine)
        logger.info(
            f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database"
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_session(self) -> Session:
        """Get a new database session bound to this user's engine."""
        return self.Session()

    @staticmethod
    def _find_status(
        session: Session, resource_id: int
    ) -> Optional[ResourceDownloadStatus]:
        """Return the status row for ``resource_id``, or None if untracked."""
        return (
            session.query(ResourceDownloadStatus)
            .filter_by(resource_id=resource_id)
            .first()
        )

    @staticmethod
    def _as_utc(value: datetime) -> datetime:
        """Return ``value`` as an aware UTC datetime.

        Naive values (e.g. legacy rows) are assumed to already be UTC.
        """
        if value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value.astimezone(UTC)

    @classmethod
    def _todays_retry_count(cls, status, now: datetime) -> int:
        """Return the stored per-day retry count if it belongs to ``now``'s UTC day.

        ``today_retry_count`` is only reset when a new failure is recorded,
        so a value left over from an earlier day must be treated as 0.
        NULL counts (legacy data) are also treated as 0.
        """
        last_attempt = status.last_attempt_at
        if last_attempt is None or cls._as_utc(last_attempt).date() != now.date():
            return 0
        return status.today_retry_count or 0

    @staticmethod
    def _isoformat(value: Optional[datetime]) -> Optional[str]:
        """ISO-8601 string for ``value``, or None when the column is NULL."""
        return value.isoformat() if value is not None else None

    # ------------------------------------------------------------------
    # Failure / success recording
    # ------------------------------------------------------------------

    def mark_failure(
        self,
        resource_id: int,
        failure: BaseFailure,
        session: Optional[Session] = None,
    ) -> None:
        """
        Mark a resource as failed with classification.

        When ``session`` is given, the changes are only applied to it and the
        caller is responsible for committing; otherwise a private session is
        opened and committed here.

        Args:
            resource_id: Resource identifier
            failure: Classified failure object
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_failure(session, resource_id, failure)
            return

        with self._get_session() as own_session:
            self._apply_failure(own_session, resource_id, failure)
            own_session.commit()

    def _apply_failure(
        self, session: Session, resource_id: int, failure: BaseFailure
    ) -> None:
        """Apply failure status updates to a session (does not commit).

        Creates the status row if needed, stores the failure classification,
        chooses between a retry cooldown and permanent failure, and bumps the
        total and per-day attempt counters.
        """
        status = self._find_status(session, resource_id)
        if status is None:
            status = ResourceDownloadStatus(resource_id=resource_id)
            session.add(status)

        # One timestamp for the whole update so last_attempt_at, the
        # "is it today?" check and any cooldown all agree.
        now = datetime.now(UTC)
        # Legacy rows may hold a NULL counter; treat it as zero.
        attempt = (status.total_retry_count or 0) + 1

        status.failure_type = failure.error_type
        status.failure_message = failure.message

        if failure.is_permanent():
            status.status = "permanently_failed"
            status.retry_after_timestamp = None
            status.permanent_failure_at = now
            logger.info(
                f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}"
            )
        else:
            cooldown = compute_retry_cooldown(attempt, failure.retry_after)
            if cooldown is None:
                status.status = "permanently_failed"
                status.permanent_failure_at = now
                status.retry_after_timestamp = None
                logger.info(
                    f"[STATUS_TRACKER] Auto-promoted resource {resource_id} to "
                    f"permanently failed after {attempt} attempts"
                )
            else:
                status.status = "temporarily_failed"
                status.retry_after_timestamp = now + cooldown
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: "
                    f"{failure.error_type}, attempt {attempt}, retry after: {cooldown}"
                )

        # Update retry statistics
        status.total_retry_count = attempt
        # Must be evaluated before last_attempt_at is overwritten below.
        status.today_retry_count = self._todays_retry_count(status, now) + 1
        status.last_attempt_at = now

        logger.debug(
            f"[STATUS_TRACKER] Updated failure status for resource {resource_id}"
        )

    def mark_success(
        self, resource_id: int, session: Optional[Session] = None
    ) -> None:
        """
        Mark a resource as successfully downloaded.

        When ``session`` is given, the changes are only applied to it and the
        caller is responsible for committing; otherwise a private session is
        opened and committed here.

        Args:
            resource_id: Resource identifier
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_success(session, resource_id)
            return

        with self._get_session() as own_session:
            self._apply_success(own_session, resource_id)
            own_session.commit()

    def _apply_success(self, session: Session, resource_id: int) -> None:
        """Apply success status updates to a session (does not commit).

        Untracked resources are left alone (no row is created).
        """
        status = self._find_status(session, resource_id)
        if status is None:
            return

        status.status = "completed"
        status.failure_type = None
        status.failure_message = None
        status.retry_after_timestamp = None
        # A completed resource is no longer permanently failed; don't leave
        # a stale timestamp behind (clear_permanent_failures resets it too).
        status.permanent_failure_at = None
        status.updated_at = datetime.now(UTC)
        logger.info(
            f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed"
        )

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]:
        """
        Check if a resource can be retried right now.

        Checks run in order: permanent failure, active cooldown, the per-day
        limit (``MAX_DAILY_RETRIES``), then the total ``MAX_TOTAL_RETRIES``
        ceiling.

        Args:
            resource_id: Resource identifier

        Returns:
            Tuple of (can_retry, reason_if_not)
        """
        with self._get_session() as session:
            status = self._find_status(session, resource_id)
            if status is None:
                # No status record, can retry
                return True, None

            if status.status == "permanently_failed":
                return (
                    False,
                    f"Permanently failed: {status.failure_message or status.failure_type}",
                )

            now = datetime.now(UTC)

            if (
                status.status == "temporarily_failed"
                and status.retry_after_timestamp
            ):
                # Legacy rows may hold naive timestamps; treat them as UTC.
                retry_timestamp = self._as_utc(status.retry_after_timestamp)
                if now < retry_timestamp:
                    return (
                        False,
                        f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
                    )

            # Only counts recorded today apply; NULL legacy counts are 0.
            today_count = self._todays_retry_count(status, now)
            if today_count >= self.MAX_DAILY_RETRIES:
                return (
                    False,
                    f"Daily retry limit exceeded ({today_count}/{self.MAX_DAILY_RETRIES}). Retry available tomorrow.",
                )

            # Safety net for records not yet auto-promoted
            if (status.total_retry_count or 0) >= MAX_TOTAL_RETRIES:
                return (
                    False,
                    f"Permanently failed after {status.total_retry_count} attempts. Will not retry.",
                )

            return True, None

    def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]:
        """
        Get the current status of a resource.

        Timestamps are returned as ISO-8601 strings, or None when the
        column is NULL.

        Args:
            resource_id: Resource identifier

        Returns:
            Status information dictionary or None if not found
        """
        with self._get_session() as session:
            status = self._find_status(session, resource_id)
            if status is None:
                return None

            return {
                "resource_id": status.resource_id,
                "status": status.status,
                "failure_type": status.failure_type,
                "failure_message": status.failure_message,
                "retry_after_timestamp": self._isoformat(
                    status.retry_after_timestamp
                ),
                "last_attempt_at": self._isoformat(status.last_attempt_at),
                "total_retry_count": status.total_retry_count,
                "today_retry_count": status.today_retry_count,
                "created_at": self._isoformat(status.created_at),
                "updated_at": self._isoformat(status.updated_at),
            }

    def get_failed_resources_count(self) -> Dict[str, int]:
        """
        Get counts of temporarily/permanently failed resources by failure type.

        Rows with an empty or NULL failure type are counted under
        ``"unknown"``.

        Returns:
            Dictionary mapping failure types to counts
        """
        from sqlalchemy import func

        with self._get_session() as session:
            # Aggregate in SQL instead of materialising every failed row.
            rows = (
                session.query(ResourceDownloadStatus.failure_type, func.count())
                .filter(
                    ResourceDownloadStatus.status.in_(
                        ["temporarily_failed", "permanently_failed"]
                    )
                )
                .group_by(ResourceDownloadStatus.failure_type)
                .all()
            )

        counts: Dict[str, int] = {}
        for failure_type, row_count in rows:
            # NULL and "" both collapse into "unknown", so sum rather than set.
            key = failure_type or "unknown"
            counts[key] = counts.get(key, 0) + row_count
        return counts

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def clear_permanent_failures(self, older_than_days: int = 30) -> int:
        """
        Clear permanent failure statuses for old records.

        A failure's age is measured from ``permanent_failure_at`` (when it
        became permanent), falling back to ``created_at`` when that column is
        NULL. Cleared rows return to ``"available"`` with reset counters.

        Args:
            older_than_days: Clear failures older than this many days

        Returns:
            Number of records cleared
        """
        from sqlalchemy import func

        cutoff_date = datetime.now(UTC) - timedelta(days=older_than_days)
        # Ageing by created_at alone would clear a resource that has just
        # been promoted after weeks of retries; use the promotion time.
        failed_at = func.coalesce(
            ResourceDownloadStatus.permanent_failure_at,
            ResourceDownloadStatus.created_at,
        )

        with self._get_session() as session:
            old_failures = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status == "permanently_failed",
                    failed_at < cutoff_date,
                )
                .all()
            )

            now = datetime.now(UTC)
            for record in old_failures:
                record.status = "available"
                record.failure_type = None
                record.failure_message = None
                record.retry_after_timestamp = None
                record.permanent_failure_at = None
                record.total_retry_count = 0
                record.today_retry_count = 0
                record.updated_at = now

            session.commit()

        count = len(old_failures)
        logger.info(
            f"[STATUS_TRACKER] Cleared {count} old permanent failure records"
        )
        return count