Coverage for src / local_deep_research / library / download_management / retry_manager.py: 41%

99 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Retry Manager 

3 

4Core retry logic and cooldown management for download attempts. 

5Prevents endless retry loops and implements intelligent retry strategies. 

6""" 

7 

8from dataclasses import dataclass 

9from datetime import datetime, timedelta, UTC 

10from typing import List, Optional, Tuple 

11 

12from loguru import logger 

13from .failure_classifier import FailureClassifier 

14from .status_tracker import ResourceStatusTracker 

15 

16 

@dataclass()
class RetryDecision:
    """Verdict on whether a single resource may be attempted again.

    Attributes:
        can_retry: True when another attempt is currently permitted.
        reason: Optional explanation accompanying the verdict.
        estimated_wait_time: Optional wait estimate; left as None by
            ``RetryManager.should_retry_resource``.
    """

    # The verdict itself.
    can_retry: bool
    # Explanation text, when the decision source provided one.
    reason: Optional[str] = None
    # Estimated delay before a retry is allowed (may be unset).
    estimated_wait_time: Optional[timedelta] = None

24 

25 

class ResourceFilterResult:
    """Retry-eligibility result for one resource.

    Instances are built by ``RetryManager.filter_resources`` and aggregated
    by ``FilterSummary.add_result``.
    """

    def __init__(
        self,
        resource_id: int,
        can_retry: bool,
        status: str,
        reason: str = "",
        estimated_wait: Optional[timedelta] = None,
    ):
        """
        Args:
            resource_id: Identifier of the resource that was checked
            can_retry: Whether a download attempt is allowed now
            status: Status label, e.g. "available", "permanently_failed",
                "temporarily_failed" or "unavailable"
            reason: Explanation for the decision ("" when none was given)
            estimated_wait: Remaining cooldown, when it could be computed
        """
        self.resource_id = resource_id
        self.can_retry = can_retry
        self.status = status
        self.reason = reason
        self.estimated_wait = estimated_wait

    def __repr__(self) -> str:
        # An informative repr makes these objects readable in logs and
        # debuggers instead of "<ResourceFilterResult object at 0x...>".
        return (
            f"{type(self).__name__}("
            f"resource_id={self.resource_id!r}, "
            f"can_retry={self.can_retry!r}, "
            f"status={self.status!r}, "
            f"reason={self.reason!r}, "
            f"estimated_wait={self.estimated_wait!r})"
        )

42 

43 

class FilterSummary:
    """Running counts over a batch of ResourceFilterResult objects."""

    def __init__(self):
        self.total_count = 0
        # Results whose can_retry flag is True.
        self.downloadable_count = 0
        self.permanently_failed_count = 0
        self.temporarily_failed_count = 0
        # Non-retryable results explicitly labelled "available".
        self.available_count = 0
        # Non-retryable results with any other status (e.g. "unavailable").
        # These used to be counted as "available", which inverted their
        # meaning.
        self.unavailable_count = 0
        # NOTE(review): nothing in this class populates this dict; it is
        # always reported empty unless a caller fills it in.
        self.failure_type_counts = {}

    def add_result(self, result: ResourceFilterResult):
        """Add one filtering result to the summary.

        Retryable results count as downloadable regardless of status; the
        rest are bucketed by their ``status`` label.
        """
        self.total_count += 1

        if result.can_retry:
            self.downloadable_count += 1
        elif result.status == "permanently_failed":
            self.permanently_failed_count += 1
        elif result.status == "temporarily_failed":
            self.temporarily_failed_count += 1
        elif result.status == "available":
            self.available_count += 1
        else:
            self.unavailable_count += 1

    def to_dict(self) -> dict:
        """Return the counters as a plain dictionary."""
        return {
            "total_count": self.total_count,
            "downloadable_count": self.downloadable_count,
            "permanently_failed_count": self.permanently_failed_count,
            "temporarily_failed_count": self.temporarily_failed_count,
            "available_count": self.available_count,
            "unavailable_count": self.unavailable_count,
            "failure_type_counts": self.failure_type_counts,
        }

78 

79 

80class RetryManager: 

81 """Manage retry logic and prevent endless loops""" 

82 

83 def __init__(self, username: str, password: Optional[str] = None): 

84 """ 

85 Initialize the retry manager. 

86 

87 Args: 

88 username: Username for database access 

89 password: Optional password for encrypted database 

90 """ 

91 self.username = username 

92 self.failure_classifier = FailureClassifier() 

93 self.status_tracker = ResourceStatusTracker(username, password) 

94 

95 logger.info(f"Initialized for user: {username}") 

96 

97 def should_retry_resource(self, resource_id: int) -> RetryDecision: 

98 """ 

99 Determine if a resource should be retried based on history. 

100 

101 Args: 

102 resource_id: Resource identifier 

103 

104 Returns: 

105 RetryDecision with can_retry flag and reasoning 

106 """ 

107 can_retry, reason = self.status_tracker.can_retry(resource_id) 

108 return RetryDecision(can_retry=can_retry, reason=reason) 

109 

110 def record_attempt( 

111 self, 

112 resource_id: int, 

113 result: Tuple[bool, Optional[str]], 

114 status_code: Optional[int] = None, 

115 url: str = "", 

116 details: str = "", 

117 session=None, 

118 ) -> None: 

119 """ 

120 Record a download attempt result. 

121 

122 Args: 

123 resource_id: Resource identifier 

124 result: Tuple of (success, error_message) 

125 status_code: HTTP status code if available 

126 url: URL that was attempted 

127 details: Additional error details 

128 session: Optional database session to reuse 

129 """ 

130 success, error_message = result 

131 

132 if success: 

133 # Successful download 

134 self.status_tracker.mark_success(resource_id, session=session) 

135 logger.info(f"Resource {resource_id} marked as successful") 

136 else: 

137 # Failed download - classify the failure 

138 failure = self.failure_classifier.classify_failure( 

139 error_type=type(error_message).__name__ 

140 if error_message 

141 else "unknown", 

142 status_code=status_code, 

143 url=url, 

144 details=details or (error_message or "Unknown error"), 

145 ) 

146 

147 self.status_tracker.mark_failure( 

148 resource_id, failure, session=session 

149 ) 

150 logger.info( 

151 f"Resource {resource_id} marked as failed: {failure.error_type}" 

152 ) 

153 

154 def filter_resources(self, resources: List) -> List[ResourceFilterResult]: 

155 """ 

156 Filter resources based on their retry eligibility. 

157 

158 Args: 

159 resources: List of resources to filter 

160 

161 Returns: 

162 List of ResourceFilterResult objects 

163 """ 

164 results = [] 

165 

166 for resource in resources: 

167 if not hasattr(resource, "id"): 

168 # Skip resources without ID 

169 continue 

170 

171 can_retry, reason = self.status_tracker.can_retry(resource.id) 

172 status = self._get_resource_status(can_retry, reason) 

173 

174 # Get estimated wait time if temporarily failed 

175 estimated_wait = None 

176 if not can_retry and reason and "cooldown" in reason.lower(): 

177 try: 

178 status_info = self.status_tracker.get_resource_status( 

179 resource.id 

180 ) 

181 if status_info and status_info["retry_after_timestamp"]: 

182 retry_time = datetime.fromisoformat( 

183 status_info["retry_after_timestamp"] 

184 ) 

185 estimated_wait = retry_time - datetime.now(UTC) 

186 except Exception as e: 

187 logger.debug(f"Error calculating wait time: {e}") 

188 

189 result = ResourceFilterResult( 

190 resource_id=resource.id, 

191 can_retry=can_retry, 

192 status=status, 

193 reason=reason or "", 

194 estimated_wait=estimated_wait, 

195 ) 

196 results.append(result) 

197 

198 logger.info( 

199 f"Filtered {len(results)} resources: " 

200 f"{sum(1 for r in results if r.can_retry)} downloadable, " 

201 f"{sum(1 for r in results if r.status == 'permanently_failed')} permanently failed" 

202 ) 

203 

204 return results 

205 

206 def get_filter_summary( 

207 self, results: List[ResourceFilterResult] 

208 ) -> FilterSummary: 

209 """ 

210 Generate a summary of filtering results. 

211 

212 Args: 

213 results: List of ResourceFilterResult objects 

214 

215 Returns: 

216 FilterSummary object with counts 

217 """ 

218 summary = FilterSummary() 

219 for result in results: 

220 summary.add_result(result) 

221 return summary 

222 

223 def _get_resource_status( 

224 self, can_retry: bool, reason: Optional[str] 

225 ) -> str: 

226 """Get status string based on retry decision""" 

227 if not can_retry: 

228 if reason and "permanently failed" in reason: 

229 return "permanently_failed" 

230 elif reason and "cooldown" in reason: 

231 return "temporarily_failed" 

232 else: 

233 return "unavailable" 

234 return "available" 

235 

236 def get_retry_statistics(self) -> dict: 

237 """ 

238 Get retry statistics for monitoring. 

239 

240 Returns: 

241 Dictionary with retry statistics 

242 """ 

243 failure_counts = self.status_tracker.get_failed_resources_count() 

244 

245 return { 

246 "total_permanent_failures": sum( 

247 count for count in failure_counts.values() 

248 ), 

249 "total_temporary_failures": sum( 

250 count for count in failure_counts.values() 

251 ), 

252 "failure_type_breakdown": failure_counts, 

253 } 

254 

255 def reset_daily_retry_counters(self) -> int: 

256 """ 

257 Reset daily retry counters (call this at midnight). 

258 

259 Returns: 

260 Number of resources that had their daily counter reset 

261 """ 

262 with self.status_tracker._get_session() as session: 

263 # Reset all today_retry_count to 0 

264 from .models import ResourceDownloadStatus 

265 

266 result = session.query(ResourceDownloadStatus).update( 

267 {"today_retry_count": 0, "updated_at": datetime.now(UTC)} 

268 ) 

269 session.commit() 

270 logger.info(f"Reset daily retry counters for {result} resources") 

271 return result 

272 

273 def clear_old_permanent_failures(self, days: int = 30) -> int: 

274 """ 

275 Clear old permanent failure records. 

276 

277 Args: 

278 days: Clear failures older than this many days 

279 

280 Returns: 

281 Number of records cleared 

282 """ 

283 return self.status_tracker.clear_permanent_failures(days)