Coverage for src/local_deep_research/library/download_management/status_tracker.py: 92%

142 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Resource Status Tracker 

3 

4Tracks download attempts, failures, and cooldowns in the database. 

5Provides persistent storage for failure classifications and retry eligibility. 

6""" 

7 

8from datetime import datetime, timedelta, UTC 

9from typing import Optional, Dict, Any 

10 

11from loguru import logger 

12from sqlalchemy.orm import sessionmaker, Session 

13 

14from .models import ResourceDownloadStatus, Base 

15from .failure_classifier import BaseFailure 

16 

# Attempt number at which a resource stops being retried altogether.
MAX_TOTAL_RETRIES = 5


def compute_retry_cooldown(
    attempt: int, default_cooldown: timedelta
) -> Optional[timedelta]:
    """Pick the waiting period that follows a given failed attempt.

    Schedule:
        attempt 1:    default_cooldown (from failure type)
        attempt 2:    1 day
        attempt 3-4:  30 days
        attempt >= 5: None (permanent failure)

    Args:
        attempt: 1-based number of the attempt that just failed.
        default_cooldown: Cooldown suggested by the failure type; returned
            unchanged for any attempt that matches no later tier.

    Returns:
        The cooldown to wait before the next retry, or None once the
        attempt number has reached MAX_TOTAL_RETRIES.
    """
    # Tiers are checked from the highest attempt number downwards, so the
    # first matching tier wins.
    if attempt >= MAX_TOTAL_RETRIES:
        cooldown = None
    elif attempt >= 3:
        cooldown = timedelta(days=30)
    elif attempt == 2:
        cooldown = timedelta(days=1)
    else:
        cooldown = default_cooldown
    return cooldown

38 

39 

class ResourceStatusTracker:
    """Track download attempts, failures, and cooldowns in database.

    A tracker is bound to one user's database engine. Failure state is kept
    in ``ResourceDownloadStatus`` rows keyed by ``resource_id``.
    """

    def __init__(self, username: str, password: Optional[str] = None):
        """
        Initialize the status tracker for a user.

        Args:
            username: Username for database access
            password: Optional password for encrypted database

        Raises:
            RuntimeError: If opening the user's database fails with
                DatabaseInitializationError.
        """
        self.username = username
        self.password = password

        # Use the global db_manager singleton to share cached connections
        from ...database.encrypted_db import (
            DatabaseInitializationError,
            db_manager,
        )

        self.db_manager = db_manager
        try:
            self.engine = db_manager.open_user_database(username, password)
        except DatabaseInitializationError:
            # Surface init failures from the schedulers/library-init
            # callers as a plain RuntimeError — they all wrap construction
            # in try/except already, and propagating the typed exception
            # would couple every caller to encrypted_db's internals.
            logger.exception(
                f"[STATUS_TRACKER] Database init failed for user: {username}"
            )
            raise RuntimeError(
                f"Database initialisation failed for user {username}"
            ) from None
        self.Session = sessionmaker(bind=self.engine)

        # Create tables if they don't exist
        Base.metadata.create_all(self.engine)
        logger.info(
            f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database"
        )

    def _get_session(self) -> Session:
        """Get a database session"""
        return self.Session()

    @staticmethod
    def _isoformat_or_none(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or None when the timestamp is unset."""
        return value.isoformat() if value is not None else None

    def mark_failure(
        self,
        resource_id: int,
        failure: BaseFailure,
        session: Optional[Session] = None,
    ) -> None:
        """
        Mark a resource as failed with classification.

        When ``session`` is supplied the changes are applied to it and the
        caller is responsible for committing. Otherwise a session is opened
        and committed here.

        Args:
            resource_id: Resource identifier
            failure: Classified failure object
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_failure(session, resource_id, failure)
            return

        with self._get_session() as session:
            self._apply_failure(session, resource_id, failure)
            session.commit()

    def _apply_failure(
        self, session: Session, resource_id: int, failure: BaseFailure
    ) -> None:
        """Apply failure status updates to a session (does not commit).

        Creates the status row if none exists, records the failure type and
        message, decides between a temporary cooldown and permanent failure,
        then bumps the total and per-day attempt counters.

        Args:
            session: Session the changes are written to.
            resource_id: Resource identifier
            failure: Classified failure object
        """
        # Get or create status record
        status = (
            session.query(ResourceDownloadStatus)
            .filter_by(resource_id=resource_id)
            .first()
        )

        if not status:
            status = ResourceDownloadStatus(resource_id=resource_id)
            session.add(status)

        # One timestamp for the whole update, so the cooldown, the attempt
        # time and the "is it still today" check cannot straddle midnight.
        now = datetime.now(UTC)

        # Update status information
        permanent = failure.is_permanent()
        status.failure_type = failure.error_type
        status.failure_message = failure.message

        if permanent:
            status.status = "permanently_failed"
            status.retry_after_timestamp = None
            status.permanent_failure_at = now
            logger.info(
                f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}"
            )
        else:
            # The attempt number counts this failure; the stored counter is
            # only incremented further down.
            attempt = (status.total_retry_count or 0) + 1
            cooldown = compute_retry_cooldown(attempt, failure.retry_after)

            if cooldown is None:
                status.status = "permanently_failed"
                status.permanent_failure_at = now
                status.retry_after_timestamp = None
                logger.info(
                    f"[STATUS_TRACKER] Auto-promoted resource {resource_id} to "
                    f"permanently failed after {attempt} attempts"
                )
            else:
                status.status = "temporarily_failed"
                status.retry_after_timestamp = now + cooldown
                logger.info(
                    f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: "
                    f"{failure.error_type}, attempt {attempt}, retry after: {cooldown}"
                )

        # Update retry statistics
        # A None counter (legacy data) is treated as zero.
        status.total_retry_count = (status.total_retry_count or 0) + 1

        # Check if last attempt was today (before overwriting last_attempt_at)
        last_attempt = (
            status.last_attempt_at.date() if status.last_attempt_at else None
        )
        status.last_attempt_at = now
        if last_attempt == now.date():
            status.today_retry_count = (status.today_retry_count or 0) + 1
        else:
            # First attempt of a new day restarts the daily counter.
            status.today_retry_count = 1

        logger.debug(
            f"[STATUS_TRACKER] Updated failure status for resource {resource_id}"
        )

    def mark_success(
        self, resource_id: int, session: Optional[Session] = None
    ) -> None:
        """
        Mark a resource as successfully downloaded.

        When ``session`` is supplied the changes are applied to it and the
        caller is responsible for committing. Otherwise a session is opened
        and committed here.

        Args:
            resource_id: Resource identifier
            session: Optional existing database session to reuse
        """
        if session is not None:
            self._apply_success(session, resource_id)
            return

        with self._get_session() as session:
            self._apply_success(session, resource_id)
            session.commit()

    def _apply_success(self, session: Session, resource_id: int) -> None:
        """Apply success status updates to a session (does not commit).

        Does nothing when the resource has no status row.
        """
        status = (
            session.query(ResourceDownloadStatus)
            .filter_by(resource_id=resource_id)
            .first()
        )

        if status:
            status.status = "completed"
            status.failure_type = None
            status.failure_message = None
            status.retry_after_timestamp = None
            status.updated_at = datetime.now(UTC)
            logger.info(
                f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed"
            )

    def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]:
        """
        Check if a resource can be retried right now.

        Checks run in this order: permanent failure, active cooldown,
        daily limit (3 per day), total limit (MAX_TOTAL_RETRIES).

        Args:
            resource_id: Resource identifier

        Returns:
            Tuple of (can_retry, reason_if_not)
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                # No status record, can retry
                return True, None

            # Check if permanently failed
            if status.status == "permanently_failed":
                return (
                    False,
                    f"Permanently failed: {status.failure_message or status.failure_type}",
                )

            now = datetime.now(UTC)

            # Check if temporarily failed and cooldown not expired
            if (
                status.status == "temporarily_failed"
                and status.retry_after_timestamp
            ):
                # Ensure retry_after_timestamp is timezone-aware (handle legacy data)
                retry_timestamp = status.retry_after_timestamp
                if retry_timestamp.tzinfo is None:
                    # Assume UTC for naive timestamps
                    retry_timestamp = retry_timestamp.replace(tzinfo=UTC)

                if now < retry_timestamp:
                    return (
                        False,
                        f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
                    )

            # Check daily retry limit (max 3 retries per day)
            # today_retry_count is only reset inside _apply_failure, so check
            # whether the stored count is actually from today before using it.
            last_attempt_date = (
                status.last_attempt_at.date()
                if status.last_attempt_at
                else None
            )
            # A None counter (legacy data) is treated as zero, matching
            # _apply_failure; comparing None with an int would raise TypeError.
            today_count = (
                (status.today_retry_count or 0)
                if last_attempt_date == now.date()
                else 0
            )
            if today_count >= 3:
                return (
                    False,
                    f"Daily retry limit exceeded ({today_count}/3). Retry available tomorrow.",
                )

            # Check total retry limit (safety net for records not yet auto-promoted)
            if (status.total_retry_count or 0) >= MAX_TOTAL_RETRIES:
                return (
                    False,
                    f"Permanently failed after {status.total_retry_count} attempts. Will not retry.",
                )

            # Can retry
            return True, None

    def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]:
        """
        Get the current status of a resource.

        Args:
            resource_id: Resource identifier

        Returns:
            Status information dictionary or None if not found. Timestamps
            are ISO-8601 strings, or None when the column is unset.
        """
        with self._get_session() as session:
            status = (
                session.query(ResourceDownloadStatus)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if not status:
                return None

            iso = self._isoformat_or_none
            return {
                "resource_id": status.resource_id,
                "status": status.status,
                "failure_type": status.failure_type,
                "failure_message": status.failure_message,
                "retry_after_timestamp": iso(status.retry_after_timestamp),
                "last_attempt_at": iso(status.last_attempt_at),
                "total_retry_count": status.total_retry_count,
                "today_retry_count": status.today_retry_count,
                "created_at": iso(status.created_at),
                "updated_at": iso(status.updated_at),
            }

    def get_failed_resources_count(self) -> Dict[str, int]:
        """
        Get counts of resources by failure type.

        Only rows whose status is "temporarily_failed" or
        "permanently_failed" are counted; a missing failure type is
        reported under "unknown".

        Returns:
            Dictionary mapping failure types to counts
        """
        with self._get_session() as session:
            failed_resources = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status.in_(
                        ["temporarily_failed", "permanently_failed"]
                    )
                )
                .all()
            )

            counts: Dict[str, int] = {}
            for resource in failed_resources:
                failure_type = resource.failure_type or "unknown"
                counts[failure_type] = counts.get(failure_type, 0) + 1

            return counts

    def clear_permanent_failures(self, older_than_days: int = 30) -> int:
        """
        Clear permanent failure statuses for old records.

        Matching rows are reset to "available" with failure details,
        cooldown and retry counters cleared, then committed.

        Args:
            older_than_days: Clear failures older than this many days

        Returns:
            Number of records cleared
        """
        now = datetime.now(UTC)
        cutoff_date = now - timedelta(days=older_than_days)

        with self._get_session() as session:
            # NOTE(review): age is measured from created_at, not from
            # permanent_failure_at, so a long-lived record that only failed
            # permanently recently is also cleared — confirm this is intended.
            old_failures = (
                session.query(ResourceDownloadStatus)
                .filter(
                    ResourceDownloadStatus.status == "permanently_failed",
                    ResourceDownloadStatus.created_at < cutoff_date,
                )
                .all()
            )

            count = len(old_failures)
            for failure in old_failures:
                failure.status = "available"
                failure.failure_type = None
                failure.failure_message = None
                failure.retry_after_timestamp = None
                failure.permanent_failure_at = None
                failure.total_retry_count = 0
                failure.today_retry_count = 0
                failure.updated_at = now

            session.commit()
            logger.info(
                f"[STATUS_TRACKER] Cleared {count} old permanent failure records"
            )
            return count