Coverage for src / local_deep_research / library / download_management / status_tracker.py: 94%
138 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Resource Status Tracker
4Tracks download attempts, failures, and cooldowns in the database.
5Provides persistent storage for failure classifications and retry eligibility.
6"""
8from datetime import datetime, timedelta, UTC
9from typing import Optional, Dict, Any
11from loguru import logger
12from sqlalchemy.orm import sessionmaker, Session
14from .models import ResourceDownloadStatus, Base
15from .failure_classifier import BaseFailure
# Attempt number at which a resource stops being retried altogether.
MAX_TOTAL_RETRIES = 5


def compute_retry_cooldown(
    attempt: int, default_cooldown: timedelta
) -> Optional[timedelta]:
    """Map an attempt number to the wait required before the next retry.

    Escalation ladder (checked from the most severe rung downwards):
        attempt >= MAX_TOTAL_RETRIES: None, i.e. give up permanently
        attempt >= 3:                 30 days
        attempt == 2:                 1 day
        anything lower:               ``default_cooldown`` unchanged

    Args:
        attempt: Number of the attempt that has just failed.
        default_cooldown: Cooldown suggested by the failure type; only
            used below the second attempt.

    Returns:
        The cooldown to apply, or None when no further retry is allowed.
    """
    if attempt >= MAX_TOTAL_RETRIES:
        cooldown = None
    elif attempt >= 3:
        cooldown = timedelta(days=30)
    elif attempt == 2:
        cooldown = timedelta(days=1)
    else:
        cooldown = default_cooldown
    return cooldown
class ResourceStatusTracker:
    """Track download attempts, failures, and cooldowns in the database.

    Each resource has at most one ``ResourceDownloadStatus`` row. The status
    values written by this class are ``"temporarily_failed"``,
    ``"permanently_failed"``, ``"completed"`` and ``"available"``.
    """

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize the status tracker for a user.

        Opens the user's database through the shared ``db_manager`` and
        creates the tables declared on ``Base`` if they are missing.

        Args:
            username: Username for database access
            password: Optional password for encrypted database
        """
        self.username = username
        self.password = password

        # Use the global db_manager singleton to share cached connections
        from ...database.encrypted_db import db_manager

        self.db_manager = db_manager
        self.engine = db_manager.open_user_database(username, password)
        self.Session = sessionmaker(bind=self.engine)

        # Create tables if they don't exist
        Base.metadata.create_all(self.engine)
        logger.info(
            f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database"
        )

    def _get_session(self) -> Session:
        """Get a new database session bound to the user's engine."""
        return self.Session()

    @staticmethod
    def _isoformat_or_none(timestamp: Optional[datetime]) -> Optional[str]:
        """Serialize a timestamp to ISO-8601, passing None through."""
        return timestamp.isoformat() if timestamp is not None else None

    def mark_failure(
        self,
        resource_id: int,
        failure: BaseFailure,
        session: Optional[Session] = None,
    ) -> None:
        """
        Mark a resource as failed with classification.

        When ``session`` is supplied the changes are applied to it and NOT
        committed; the caller owns the transaction. Otherwise a short-lived
        session is opened and committed here.

        Args:
            resource_id: Resource identifier
            failure: Classified failure object
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_failure(session, resource_id, failure)
            return

        with self._get_session() as session:
            self._apply_failure(session, resource_id, failure)
            session.commit()

    def _apply_failure(
        self, session: Session, resource_id: int, failure: BaseFailure
    ) -> None:
        """Apply failure status updates to a session (does not commit).

        Creates the status row if it does not exist, records the failure
        classification, decides between temporary and permanent failure,
        and updates the total and per-day attempt counters.

        Args:
            session: Session the changes are staged on
            resource_id: Resource identifier
            failure: Classified failure object
        """
        # Get or create status record
        status = (
            session.query(ResourceDownloadStatus)
            .filter_by(resource_id=resource_id)
            .first()
        )
        if not status:
            status = ResourceDownloadStatus(resource_id=resource_id)
            session.add(status)

        # One timestamp for the whole update so the stored fields and the
        # "same day?" comparison below cannot straddle midnight.
        now = datetime.now(UTC)

        # total_retry_count may be None on legacy rows; treat that as zero.
        attempt = (status.total_retry_count or 0) + 1

        status.failure_type = failure.error_type
        status.failure_message = failure.message

        if failure.is_permanent():
            status.status = "permanently_failed"
            status.retry_after_timestamp = None
            status.permanent_failure_at = now
            logger.info(
                f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}"
            )
        else:
            cooldown = compute_retry_cooldown(attempt, failure.retry_after)

            if cooldown is None:
                # Retry budget exhausted: promote to permanent failure.
                status.status = "permanently_failed"
                status.permanent_failure_at = now
                status.retry_after_timestamp = None
                logger.info(
                    f"[STATUS_TRACKER] Auto-promoted resource {resource_id} to "
                    f"permanently failed after {attempt} attempts"
                )
            else:
                status.status = "temporarily_failed"
                status.retry_after_timestamp = now + cooldown
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: "
                    f"{failure.error_type}, attempt {attempt}, retry after: {cooldown}"
                )

        # Update retry statistics
        status.total_retry_count = attempt

        # Check if last attempt was today (before overwriting last_attempt_at)
        last_attempt = (
            status.last_attempt_at.date() if status.last_attempt_at else None
        )
        status.last_attempt_at = now
        if last_attempt == now.date():
            # today_retry_count may be None on legacy rows; treat as zero.
            status.today_retry_count = (status.today_retry_count or 0) + 1
        else:
            status.today_retry_count = 1

        logger.debug(
            f"[STATUS_TRACKER] Updated failure status for resource {resource_id}"
        )

    def mark_success(
        self, resource_id: int, session: Optional[Session] = None
    ) -> None:
        """
        Mark a resource as successfully downloaded.

        When ``session`` is supplied the changes are applied to it and NOT
        committed; the caller owns the transaction. Otherwise a short-lived
        session is opened and committed here.

        Args:
            resource_id: Resource identifier
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_success(session, resource_id)
            return

        with self._get_session() as session:
            self._apply_success(session, resource_id)
            session.commit()

    def _apply_success(self, session: Session, resource_id: int) -> None:
        """Apply success status updates to a session (does not commit).

        Only an existing status row is updated; if the resource has no row,
        nothing is written.

        Args:
            session: Session the changes are staged on
            resource_id: Resource identifier
        """
        status = (
            session.query(ResourceDownloadStatus)
            .filter_by(resource_id=resource_id)
            .first()
        )

        if status:
            status.status = "completed"
            status.failure_type = None
            status.failure_message = None
            status.retry_after_timestamp = None
            status.updated_at = datetime.now(UTC)
            logger.info(
                f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed"
            )

    def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]:
        """
        Check if a resource can be retried right now.

        Checks, in order: permanent failure, an unexpired cooldown, the
        daily retry limit (3), and the total retry limit
        (``MAX_TOTAL_RETRIES``).

        Args:
            resource_id: Resource identifier

        Returns:
            Tuple of (can_retry, reason_if_not)
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                # No status record, can retry
                return True, None

            # Check if permanently failed
            if status.status == "permanently_failed":
                return (
                    False,
                    f"Permanently failed: {status.failure_message or status.failure_type}",
                )

            now = datetime.now(UTC)

            # Check if temporarily failed and cooldown not expired
            if (
                status.status == "temporarily_failed"
                and status.retry_after_timestamp
            ):
                # Ensure retry_after_timestamp is timezone-aware (handle legacy data)
                retry_timestamp = status.retry_after_timestamp
                if retry_timestamp.tzinfo is None:
                    # Assume UTC for naive timestamps
                    retry_timestamp = retry_timestamp.replace(tzinfo=UTC)

                if now < retry_timestamp:
                    return (
                        False,
                        f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
                    )

            # Check daily retry limit (max 3 retries per day)
            # today_retry_count is only reset inside _apply_failure, so check
            # whether the stored count is actually from today before using it.
            last_attempt_date = (
                status.last_attempt_at.date()
                if status.last_attempt_at
                else None
            )
            # `or 0`: legacy rows may hold None, which cannot be compared
            # with an int.
            today_count = (
                (status.today_retry_count or 0)
                if last_attempt_date == now.date()
                else 0
            )
            if today_count >= 3:
                return (
                    False,
                    f"Daily retry limit exceeded ({today_count}/3). Retry available tomorrow.",
                )

            # Check total retry limit (safety net for records not yet auto-promoted)
            if (status.total_retry_count or 0) >= MAX_TOTAL_RETRIES:
                return (
                    False,
                    f"Permanently failed after {status.total_retry_count} attempts. Will not retry.",
                )

            # Can retry
            return True, None

    def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]:
        """
        Get the current status of a resource.

        Timestamps are returned as ISO-8601 strings, or None when the
        stored value is missing.

        Args:
            resource_id: Resource identifier

        Returns:
            Status information dictionary or None if not found
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                return None

            return {
                "resource_id": status.resource_id,
                "status": status.status,
                "failure_type": status.failure_type,
                "failure_message": status.failure_message,
                "retry_after_timestamp": self._isoformat_or_none(
                    status.retry_after_timestamp
                ),
                "last_attempt_at": self._isoformat_or_none(
                    status.last_attempt_at
                ),
                "total_retry_count": status.total_retry_count,
                "today_retry_count": status.today_retry_count,
                # Guarded like the other timestamps so a row without these
                # values does not raise AttributeError.
                "created_at": self._isoformat_or_none(status.created_at),
                "updated_at": self._isoformat_or_none(status.updated_at),
            }

    def get_failed_resources_count(self) -> Dict[str, int]:
        """
        Get counts of resources by failure type.

        Covers rows whose status is temporarily or permanently failed; rows
        without a failure type are counted under ``"unknown"``.

        Returns:
            Dictionary mapping failure types to counts
        """
        with self._get_session() as session:
            failed_resources = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status.in_(
                        ["temporarily_failed", "permanently_failed"]
                    )
                )
                .all()
            )

            counts: Dict[str, int] = {}
            for resource in failed_resources:
                failure_type = resource.failure_type or "unknown"
                counts[failure_type] = counts.get(failure_type, 0) + 1

            return counts

    def clear_permanent_failures(self, older_than_days: int = 30) -> int:
        """
        Clear permanent failure statuses for old records.

        A record's age is measured from ``permanent_failure_at``, falling
        back to ``created_at`` for rows that have no such timestamp.
        Cleared rows are reset to status ``"available"`` with zeroed retry
        counters.

        Args:
            older_than_days: Clear failures older than this many days

        Returns:
            Number of records cleared
        """
        # Local import: only this method needs SQL functions.
        from sqlalchemy import func

        cutoff_date = datetime.now(UTC) - timedelta(days=older_than_days)

        # Age by the moment of permanent failure, not row creation: a row
        # promoted after long cooldowns was created well before it failed,
        # so filtering on created_at would clear it straight away.
        # NOTE(review): assumes permanent_failure_at is a mapped column on
        # ResourceDownloadStatus (it is assigned in _apply_failure) - confirm.
        failed_at = func.coalesce(
            ResourceDownloadStatus.permanent_failure_at,
            ResourceDownloadStatus.created_at,
        )

        with self._get_session() as session:
            old_failures = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status == "permanently_failed",
                    failed_at < cutoff_date,
                )
                .all()
            )

            count = len(old_failures)
            now = datetime.now(UTC)
            for failure in old_failures:
                failure.status = "available"
                failure.failure_type = None
                failure.failure_message = None
                failure.retry_after_timestamp = None
                failure.permanent_failure_at = None
                failure.total_retry_count = 0
                failure.today_retry_count = 0
                failure.updated_at = now

            session.commit()
            logger.info(
                f"[STATUS_TRACKER] Cleared {count} old permanent failure records"
            )
            return count