Coverage for src / local_deep_research / library / download_management / status_tracker.py: 94%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Resource Status Tracker 

3 

4Tracks download attempts, failures, and cooldowns in the database. 

5Provides persistent storage for failure classifications and retry eligibility. 

6""" 

7 

8from datetime import datetime, timedelta, UTC 

9from typing import Optional, Dict, Any 

10 

11from loguru import logger 

12from sqlalchemy.orm import sessionmaker, Session 

13 

14from .models import ResourceDownloadStatus, Base 

15from .failure_classifier import BaseFailure 

16 

17MAX_TOTAL_RETRIES = 5 

18 

19 

20def compute_retry_cooldown( 

21 attempt: int, default_cooldown: timedelta 

22) -> Optional[timedelta]: 

23 """Return cooldown for this attempt number, or None for permanent failure. 

24 

25 Schedule: 

26 attempt 1: default_cooldown (from failure type) 

27 attempt 2: 1 day 

28 attempt 3-4: 30 days 

29 attempt >= 5: None (permanent failure) 

30 """ 

31 if attempt >= MAX_TOTAL_RETRIES: 

32 return None 

33 if attempt >= 3: 

34 return timedelta(days=30) 

35 if attempt == 2: 

36 return timedelta(days=1) 

37 return default_cooldown 

38 

39 

40class ResourceStatusTracker: 

41 """Track download attempts, failures, and cooldowns in database""" 

42 

43 def __init__(self, username: str, password: Optional[str] = None): 

44 """ 

45 Initialize the status tracker for a user. 

46 

47 Args: 

48 username: Username for database access 

49 password: Optional password for encrypted database 

50 """ 

51 self.username = username 

52 self.password = password 

53 

54 # Use the global db_manager singleton to share cached connections 

55 from ...database.encrypted_db import db_manager 

56 

57 self.db_manager = db_manager 

58 self.engine = db_manager.open_user_database(username, password) 

59 self.Session = sessionmaker(bind=self.engine) 

60 

61 # Create tables if they don't exist 

62 Base.metadata.create_all(self.engine) 

63 logger.info( 

64 f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database" 

65 ) 

66 

67 def _get_session(self) -> Session: 

68 """Get a database session""" 

69 return self.Session() 

70 

71 def mark_failure( 

72 self, 

73 resource_id: int, 

74 failure: BaseFailure, 

75 session: Optional[Session] = None, 

76 ) -> None: 

77 """ 

78 Mark a resource as failed with classification. 

79 

80 Args: 

81 resource_id: Resource identifier 

82 failure: Classified failure object 

83 session: Optional existing database session to reuse 

84 """ 

85 if session is not None: 85 ↛ 89line 85 didn't jump to line 89 because the condition on line 85 was always true

86 self._apply_failure(session, resource_id, failure) 

87 return 

88 

89 with self._get_session() as session: 

90 self._apply_failure(session, resource_id, failure) 

91 session.commit() 

92 

93 def _apply_failure( 

94 self, session: Session, resource_id: int, failure: BaseFailure 

95 ) -> None: 

96 """Apply failure status updates to a session (does not commit).""" 

97 # Get or create status record 

98 status = ( 

99 session.query(ResourceDownloadStatus) 

100 .filter_by(resource_id=resource_id) 

101 .first() 

102 ) 

103 

104 if not status: 

105 status = ResourceDownloadStatus(resource_id=resource_id) 

106 session.add(status) 

107 

108 # Update status information 

109 if failure.is_permanent(): 

110 status.status = "permanently_failed" 

111 status.retry_after_timestamp = None 

112 status.failure_type = failure.error_type 

113 status.failure_message = failure.message 

114 status.permanent_failure_at = datetime.now(UTC) 

115 logger.info( 

116 f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}" 

117 ) 

118 else: 

119 status.failure_type = failure.error_type 

120 status.failure_message = failure.message 

121 

122 attempt = (status.total_retry_count or 0) + 1 

123 cooldown = compute_retry_cooldown(attempt, failure.retry_after) 

124 

125 if cooldown is None: 

126 status.status = "permanently_failed" 

127 status.permanent_failure_at = datetime.now(UTC) 

128 status.retry_after_timestamp = None 

129 logger.info( 

130 f"[STATUS_TRACKER] Auto-promoted resource {resource_id} to " 

131 f"permanently failed after {attempt} attempts" 

132 ) 

133 else: 

134 status.status = "temporarily_failed" 

135 status.retry_after_timestamp = datetime.now(UTC) + cooldown 

136 logger.info( 

137 f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: " 

138 f"{failure.error_type}, attempt {attempt}, retry after: {cooldown}" 

139 ) 

140 

141 # Update retry statistics 

142 # Ensure total_retry_count is initialized (handle None from legacy data) 

143 if status.total_retry_count is None: 

144 status.total_retry_count = 0 

145 status.total_retry_count += 1 

146 

147 # Check if last attempt was today (before overwriting last_attempt_at) 

148 today = datetime.now(UTC).date() 

149 last_attempt = ( 

150 status.last_attempt_at.date() if status.last_attempt_at else None 

151 ) 

152 status.last_attempt_at = datetime.now(UTC) 

153 if last_attempt == today: 

154 # Ensure today_retry_count is initialized (handle None from legacy data) 

155 if status.today_retry_count is None: 

156 status.today_retry_count = 0 

157 status.today_retry_count += 1 

158 else: 

159 status.today_retry_count = 1 

160 

161 logger.debug( 

162 f"[STATUS_TRACKER] Updated failure status for resource {resource_id}" 

163 ) 

164 

165 def mark_success( 

166 self, resource_id: int, session: Optional[Session] = None 

167 ) -> None: 

168 """ 

169 Mark a resource as successfully downloaded. 

170 

171 Args: 

172 resource_id: Resource identifier 

173 session: Optional existing database session to reuse 

174 """ 

175 if session is not None: 175 ↛ 179line 175 didn't jump to line 179 because the condition on line 175 was always true

176 self._apply_success(session, resource_id) 

177 return 

178 

179 with self._get_session() as session: 

180 self._apply_success(session, resource_id) 

181 session.commit() 

182 

183 def _apply_success(self, session: Session, resource_id: int) -> None: 

184 """Apply success status updates to a session (does not commit).""" 

185 status = ( 

186 session.query(ResourceDownloadStatus) 

187 .filter_by(resource_id=resource_id) 

188 .first() 

189 ) 

190 

191 if status: 

192 status.status = "completed" 

193 status.failure_type = None 

194 status.failure_message = None 

195 status.retry_after_timestamp = None 

196 status.updated_at = datetime.now(UTC) 

197 logger.info( 

198 f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed" 

199 ) 

200 

201 def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]: 

202 """ 

203 Check if a resource can be retried right now. 

204 

205 Args: 

206 resource_id: Resource identifier 

207 

208 Returns: 

209 Tuple of (can_retry, reason_if_not) 

210 """ 

211 with self._get_session() as session: 

212 status = ( 

213 session.query(ResourceDownloadStatus) 

214 .filter_by(resource_id=resource_id) 

215 .first() 

216 ) 

217 

218 if not status: 

219 # No status record, can retry 

220 return True, None 

221 

222 # Check if permanently failed 

223 if status.status == "permanently_failed": 

224 return ( 

225 False, 

226 f"Permanently failed: {status.failure_message or status.failure_type}", 

227 ) 

228 

229 # Check if temporarily failed and cooldown not expired 

230 if ( 

231 status.status == "temporarily_failed" 

232 and status.retry_after_timestamp 

233 ): 

234 # Ensure retry_after_timestamp is timezone-aware (handle legacy data) 

235 retry_timestamp = status.retry_after_timestamp 

236 if retry_timestamp.tzinfo is None: 

237 # Assume UTC for naive timestamps 

238 retry_timestamp = retry_timestamp.replace(tzinfo=UTC) 

239 

240 if datetime.now(UTC) < retry_timestamp: 

241 return ( 

242 False, 

243 f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}", 

244 ) 

245 

246 # Check daily retry limit (max 3 retries per day) 

247 # today_retry_count is only reset inside _apply_failure, so check 

248 # whether the stored count is actually from today before using it. 

249 today = datetime.now(UTC).date() 

250 last_attempt_date = ( 

251 status.last_attempt_at.date() 

252 if status.last_attempt_at 

253 else None 

254 ) 

255 today_count = ( 

256 status.today_retry_count if last_attempt_date == today else 0 

257 ) 

258 if today_count >= 3: 

259 return ( 

260 False, 

261 f"Daily retry limit exceeded ({today_count}/3). Retry available tomorrow.", 

262 ) 

263 

264 # Check total retry limit (safety net for records not yet auto-promoted) 

265 if (status.total_retry_count or 0) >= MAX_TOTAL_RETRIES: 265 ↛ 266line 265 didn't jump to line 266 because the condition on line 265 was never true

266 return ( 

267 False, 

268 f"Permanently failed after {status.total_retry_count} attempts. Will not retry.", 

269 ) 

270 

271 # Can retry 

272 return True, None 

273 

274 def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]: 

275 """ 

276 Get the current status of a resource. 

277 

278 Args: 

279 resource_id: Resource identifier 

280 

281 Returns: 

282 Status information dictionary or None if not found 

283 """ 

284 with self._get_session() as session: 

285 status = ( 

286 session.query(ResourceDownloadStatus) 

287 .filter_by(resource_id=resource_id) 

288 .first() 

289 ) 

290 

291 if not status: 

292 return None 

293 

294 return { 

295 "resource_id": status.resource_id, 

296 "status": status.status, 

297 "failure_type": status.failure_type, 

298 "failure_message": status.failure_message, 

299 "retry_after_timestamp": status.retry_after_timestamp.isoformat() 

300 if status.retry_after_timestamp 

301 else None, 

302 "last_attempt_at": status.last_attempt_at.isoformat() 

303 if status.last_attempt_at 

304 else None, 

305 "total_retry_count": status.total_retry_count, 

306 "today_retry_count": status.today_retry_count, 

307 "created_at": status.created_at.isoformat(), 

308 "updated_at": status.updated_at.isoformat(), 

309 } 

310 

311 def get_failed_resources_count(self) -> Dict[str, int]: 

312 """ 

313 Get counts of resources by failure type. 

314 

315 Returns: 

316 Dictionary mapping failure types to counts 

317 """ 

318 with self._get_session() as session: 

319 failed_resources = ( 

320 session.query(ResourceDownloadStatus) 

321 .filter( 

322 ResourceDownloadStatus.status.in_( 

323 ["temporarily_failed", "permanently_failed"] 

324 ) 

325 ) 

326 .all() 

327 ) 

328 

329 counts = {} 

330 for resource in failed_resources: 

331 failure_type = resource.failure_type or "unknown" 

332 counts[failure_type] = counts.get(failure_type, 0) + 1 

333 

334 return counts 

335 

336 def clear_permanent_failures(self, older_than_days: int = 30) -> int: 

337 """ 

338 Clear permanent failure statuses for old records. 

339 

340 Args: 

341 older_than_days: Clear failures older than this many days 

342 

343 Returns: 

344 Number of records cleared 

345 """ 

346 cutoff_date = datetime.now(UTC) - timedelta(days=older_than_days) 

347 

348 with self._get_session() as session: 

349 old_failures = ( 

350 session.query(ResourceDownloadStatus) 

351 .filter( 

352 ResourceDownloadStatus.status == "permanently_failed", 

353 ResourceDownloadStatus.created_at < cutoff_date, 

354 ) 

355 .all() 

356 ) 

357 

358 count = len(old_failures) 

359 for failure in old_failures: 

360 failure.status = "available" 

361 failure.failure_type = None 

362 failure.failure_message = None 

363 failure.retry_after_timestamp = None 

364 failure.permanent_failure_at = None 

365 failure.total_retry_count = 0 

366 failure.today_retry_count = 0 

367 failure.updated_at = datetime.now(UTC) 

368 

369 session.commit() 

370 logger.info( 

371 f"[STATUS_TRACKER] Cleared {count} old permanent failure records" 

372 ) 

373 return count