Coverage for src / local_deep_research / library / download_management / status_tracker.py: 9%

119 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Resource Status Tracker 

3 

4Tracks download attempts, failures, and cooldowns in the database. 

5Provides persistent storage for failure classifications and retry eligibility. 

6""" 

7 

8from datetime import datetime, timedelta, UTC 

9from typing import Optional, Dict, Any 

10 

11from loguru import logger 

12from sqlalchemy.orm import sessionmaker, Session 

13 

14from .models import ResourceDownloadStatus, Base 

15from .failure_classifier import BaseFailure 

16 

17 

18class ResourceStatusTracker: 

19 """Track download attempts, failures, and cooldowns in database""" 

20 

21 def __init__(self, username: str, password: Optional[str] = None): 

22 """ 

23 Initialize the status tracker for a user. 

24 

25 Args: 

26 username: Username for database access 

27 password: Optional password for encrypted database 

28 """ 

29 self.username = username 

30 self.password = password 

31 

32 # Use the global db_manager singleton to share cached connections 

33 from ...database.encrypted_db import db_manager 

34 

35 self.db_manager = db_manager 

36 self.engine = db_manager.open_user_database(username, password) 

37 self.Session = sessionmaker(bind=self.engine) 

38 

39 # Create tables if they don't exist 

40 Base.metadata.create_all(self.engine) 

41 logger.info( 

42 f"[STATUS_TRACKER] Initialized for user: {username} with encrypted database" 

43 ) 

44 

45 def _get_session(self) -> Session: 

46 """Get a database session""" 

47 return self.Session() 

48 

49 def mark_failure( 

50 self, 

51 resource_id: int, 

52 failure: BaseFailure, 

53 session: Optional[Session] = None, 

54 ) -> None: 

55 """ 

56 Mark a resource as failed with classification. 

57 

58 Args: 

59 resource_id: Resource identifier 

60 failure: Classified failure object 

61 session: Optional existing database session to reuse 

62 """ 

63 # Use provided session or create new one 

64 should_close = session is None 

65 if session is None: 

66 session = self._get_session() 

67 

68 try: 

69 # Get or create status record 

70 status = ( 

71 session.query(ResourceDownloadStatus) 

72 .filter_by(resource_id=resource_id) 

73 .first() 

74 ) 

75 

76 if not status: 

77 status = ResourceDownloadStatus(resource_id=resource_id) 

78 session.add(status) 

79 

80 # Update status information 

81 if failure.is_permanent(): 

82 status.status = "permanently_failed" 

83 status.retry_after_timestamp = None 

84 status.failure_type = failure.error_type 

85 status.failure_message = failure.message 

86 status.permanent_failure_at = datetime.now(UTC) 

87 logger.info( 

88 f"[STATUS_TRACKER] Marked resource {resource_id} as permanently failed: {failure.error_type}" 

89 ) 

90 else: 

91 status.status = "temporarily_failed" 

92 status.retry_after_timestamp = ( 

93 failure.created_at + failure.retry_after 

94 ) 

95 status.failure_type = failure.error_type 

96 status.failure_message = failure.message 

97 logger.info( 

98 f"[STATUS_TRACKER] Marked resource {resource_id} as temporarily failed: {failure.error_type}, retry after: {failure.retry_after}" 

99 ) 

100 

101 # Update retry statistics 

102 # Ensure total_retry_count is initialized (handle None from legacy data) 

103 if status.total_retry_count is None: 

104 status.total_retry_count = 0 

105 status.total_retry_count += 1 

106 status.last_attempt_at = datetime.now(UTC) 

107 

108 # Check if retry is today 

109 today = datetime.now(UTC).date() 

110 last_attempt = ( 

111 status.last_attempt_at.date() 

112 if status.last_attempt_at 

113 else None 

114 ) 

115 if last_attempt == today: 

116 # Ensure today_retry_count is initialized (handle None from legacy data) 

117 if status.today_retry_count is None: 

118 status.today_retry_count = 0 

119 status.today_retry_count += 1 

120 else: 

121 status.today_retry_count = 1 

122 

123 # Only commit if we created the session 

124 if should_close: 

125 session.commit() 

126 logger.debug( 

127 f"[STATUS_TRACKER] Updated failure status for resource {resource_id}" 

128 ) 

129 finally: 

130 if should_close: 

131 session.close() 

132 

133 def mark_success( 

134 self, resource_id: int, session: Optional[Session] = None 

135 ) -> None: 

136 """ 

137 Mark a resource as successfully downloaded. 

138 

139 Args: 

140 resource_id: Resource identifier 

141 session: Optional existing database session to reuse 

142 """ 

143 # Use provided session or create new one 

144 should_close = session is None 

145 if session is None: 

146 session = self._get_session() 

147 

148 try: 

149 status = ( 

150 session.query(ResourceDownloadStatus) 

151 .filter_by(resource_id=resource_id) 

152 .first() 

153 ) 

154 

155 if status: 

156 status.status = "completed" 

157 status.failure_type = None 

158 status.failure_message = None 

159 status.retry_after_timestamp = None 

160 status.updated_at = datetime.now(UTC) 

161 logger.info( 

162 f"[STATUS_TRACKER] Marked resource {resource_id} as successfully completed" 

163 ) 

164 

165 # Only commit if we created the session 

166 if should_close: 

167 session.commit() 

168 finally: 

169 if should_close: 

170 session.close() 

171 

172 def can_retry(self, resource_id: int) -> tuple[bool, Optional[str]]: 

173 """ 

174 Check if a resource can be retried right now. 

175 

176 Args: 

177 resource_id: Resource identifier 

178 

179 Returns: 

180 Tuple of (can_retry, reason_if_not) 

181 """ 

182 with self._get_session() as session: 

183 status = ( 

184 session.query(ResourceDownloadStatus) 

185 .filter_by(resource_id=resource_id) 

186 .first() 

187 ) 

188 

189 if not status: 

190 # No status record, can retry 

191 return True, None 

192 

193 # Check if permanently failed 

194 if status.status == "permanently_failed": 

195 return ( 

196 False, 

197 f"Permanently failed: {status.failure_message or status.failure_type}", 

198 ) 

199 

200 # Check if temporarily failed and cooldown not expired 

201 if ( 

202 status.status == "temporarily_failed" 

203 and status.retry_after_timestamp 

204 ): 

205 # Ensure retry_after_timestamp is timezone-aware (handle legacy data) 

206 retry_timestamp = status.retry_after_timestamp 

207 if retry_timestamp.tzinfo is None: 

208 # Assume UTC for naive timestamps 

209 retry_timestamp = retry_timestamp.replace(tzinfo=UTC) 

210 

211 if datetime.now(UTC) < retry_timestamp: 

212 return ( 

213 False, 

214 f"Cooldown active, retry available at {retry_timestamp.strftime('%Y-%m-%d %H:%M:%S')}", 

215 ) 

216 

217 # Check daily retry limit (max 3 retries per day) 

218 if status.today_retry_count >= 3: 

219 return ( 

220 False, 

221 f"Daily retry limit exceeded ({status.today_retry_count}/3). Retry available tomorrow.", 

222 ) 

223 

224 # Can retry 

225 return True, None 

226 

227 def get_resource_status(self, resource_id: int) -> Optional[Dict[str, Any]]: 

228 """ 

229 Get the current status of a resource. 

230 

231 Args: 

232 resource_id: Resource identifier 

233 

234 Returns: 

235 Status information dictionary or None if not found 

236 """ 

237 with self._get_session() as session: 

238 status = ( 

239 session.query(ResourceDownloadStatus) 

240 .filter_by(resource_id=resource_id) 

241 .first() 

242 ) 

243 

244 if not status: 

245 return None 

246 

247 return { 

248 "resource_id": status.resource_id, 

249 "status": status.status, 

250 "failure_type": status.failure_type, 

251 "failure_message": status.failure_message, 

252 "retry_after_timestamp": status.retry_after_timestamp.isoformat() 

253 if status.retry_after_timestamp 

254 else None, 

255 "last_attempt_at": status.last_attempt_at.isoformat() 

256 if status.last_attempt_at 

257 else None, 

258 "total_retry_count": status.total_retry_count, 

259 "today_retry_count": status.today_retry_count, 

260 "created_at": status.created_at.isoformat(), 

261 "updated_at": status.updated_at.isoformat(), 

262 } 

263 

264 def get_failed_resources_count(self) -> Dict[str, int]: 

265 """ 

266 Get counts of resources by failure type. 

267 

268 Returns: 

269 Dictionary mapping failure types to counts 

270 """ 

271 with self._get_session() as session: 

272 failed_resources = ( 

273 session.query(ResourceDownloadStatus) 

274 .filter( 

275 ResourceDownloadStatus.status.in_( 

276 ["temporarily_failed", "permanently_failed"] 

277 ) 

278 ) 

279 .all() 

280 ) 

281 

282 counts = {} 

283 for resource in failed_resources: 

284 failure_type = resource.failure_type or "unknown" 

285 counts[failure_type] = counts.get(failure_type, 0) + 1 

286 

287 return counts 

288 

289 def clear_permanent_failures(self, older_than_days: int = 30) -> int: 

290 """ 

291 Clear permanent failure statuses for old records. 

292 

293 Args: 

294 older_than_days: Clear failures older than this many days 

295 

296 Returns: 

297 Number of records cleared 

298 """ 

299 cutoff_date = datetime.now(UTC) - timedelta(days=older_than_days) 

300 

301 with self._get_session() as session: 

302 old_failures = ( 

303 session.query(ResourceDownloadStatus) 

304 .filter( 

305 ResourceDownloadStatus.status == "permanently_failed", 

306 ResourceDownloadStatus.created_at < cutoff_date, 

307 ) 

308 .all() 

309 ) 

310 

311 count = len(old_failures) 

312 for failure in old_failures: 

313 failure.status = "available" 

314 failure.failure_type = None 

315 failure.failure_message = None 

316 failure.retry_after_timestamp = None 

317 failure.permanent_failure_at = None 

318 failure.total_retry_count = 0 

319 failure.today_retry_count = 0 

320 failure.updated_at = datetime.now(UTC) 

321 

322 session.commit() 

323 logger.info( 

324 f"[STATUS_TRACKER] Cleared {count} old permanent failure records" 

325 ) 

326 return count