Coverage for src / local_deep_research / library / download_management / retry_manager.py: 41%
99 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
"""
Retry Manager

Core retry logic and cooldown management for download attempts.
Prevents endless retry loops and implements intelligent retry strategies.
"""
8from dataclasses import dataclass
9from datetime import datetime, timedelta, UTC
10from typing import List, Optional, Tuple
12from loguru import logger
13from .failure_classifier import FailureClassifier
14from .status_tracker import ResourceStatusTracker
@dataclass
class RetryDecision:
    """Decision about whether to retry a resource.

    Returned by RetryManager.should_retry_resource().
    """

    # True when another download attempt is permitted for the resource.
    can_retry: bool
    # Explanation of the decision (e.g. why retry was denied); may be None.
    reason: Optional[str] = None
    # Optional wait hint before the next attempt; note that
    # should_retry_resource() in this module leaves it unset.
    estimated_wait_time: Optional[timedelta] = None
@dataclass(eq=False)
class ResourceFilterResult:
    """Result of filtering a resource for retry eligibility.

    Converted from a hand-written ``__init__`` to a dataclass: the
    generated ``__init__`` keeps the original parameter names, order, and
    defaults, so all existing call sites work unchanged.  ``eq=False``
    preserves the original identity-based equality (the plain class had no
    ``__eq__``), so the only new behavior is a useful ``__repr__``.

    Attributes:
        resource_id: Resource identifier.
        can_retry: Whether another download attempt is allowed.
        status: Status string — "available", "permanently_failed",
            "temporarily_failed", or "unavailable".
        reason: Explanation for the decision (empty string when none).
        estimated_wait: Remaining cooldown time, when known.
    """

    resource_id: int
    can_retry: bool
    status: str
    reason: str = ""
    estimated_wait: Optional[timedelta] = None
class FilterSummary:
    """Aggregate counters over a batch of resource-filtering results."""

    def __init__(self):
        # All running totals start at zero.
        self.total_count = 0
        self.downloadable_count = 0
        self.permanently_failed_count = 0
        self.temporarily_failed_count = 0
        self.available_count = 0
        # Per-failure-type tallies; never written by add_result in this
        # class (presumably populated by callers — confirm).
        self.failure_type_counts = {}

    def add_result(self, result: ResourceFilterResult):
        """Fold a single filtering result into the running totals."""
        self.total_count += 1

        if result.can_retry:
            self.downloadable_count += 1
            return

        # Not retryable: bucket by the reported status string.
        if result.status == "permanently_failed":
            self.permanently_failed_count += 1
        elif result.status == "temporarily_failed":
            self.temporarily_failed_count += 1
        else:
            self.available_count += 1

    def to_dict(self) -> dict:
        """Render every counter as a plain dictionary."""
        counter_names = (
            "total_count",
            "downloadable_count",
            "permanently_failed_count",
            "temporarily_failed_count",
            "available_count",
            "failure_type_counts",
        )
        return {name: getattr(self, name) for name in counter_names}
80class RetryManager:
81 """Manage retry logic and prevent endless loops"""
83 def __init__(self, username: str, password: Optional[str] = None):
84 """
85 Initialize the retry manager.
87 Args:
88 username: Username for database access
89 password: Optional password for encrypted database
90 """
91 self.username = username
92 self.failure_classifier = FailureClassifier()
93 self.status_tracker = ResourceStatusTracker(username, password)
95 logger.info(f"Initialized for user: {username}")
97 def should_retry_resource(self, resource_id: int) -> RetryDecision:
98 """
99 Determine if a resource should be retried based on history.
101 Args:
102 resource_id: Resource identifier
104 Returns:
105 RetryDecision with can_retry flag and reasoning
106 """
107 can_retry, reason = self.status_tracker.can_retry(resource_id)
108 return RetryDecision(can_retry=can_retry, reason=reason)
110 def record_attempt(
111 self,
112 resource_id: int,
113 result: Tuple[bool, Optional[str]],
114 status_code: Optional[int] = None,
115 url: str = "",
116 details: str = "",
117 session=None,
118 ) -> None:
119 """
120 Record a download attempt result.
122 Args:
123 resource_id: Resource identifier
124 result: Tuple of (success, error_message)
125 status_code: HTTP status code if available
126 url: URL that was attempted
127 details: Additional error details
128 session: Optional database session to reuse
129 """
130 success, error_message = result
132 if success:
133 # Successful download
134 self.status_tracker.mark_success(resource_id, session=session)
135 logger.info(f"Resource {resource_id} marked as successful")
136 else:
137 # Failed download - classify the failure
138 failure = self.failure_classifier.classify_failure(
139 error_type=type(error_message).__name__
140 if error_message
141 else "unknown",
142 status_code=status_code,
143 url=url,
144 details=details or (error_message or "Unknown error"),
145 )
147 self.status_tracker.mark_failure(
148 resource_id, failure, session=session
149 )
150 logger.info(
151 f"Resource {resource_id} marked as failed: {failure.error_type}"
152 )
154 def filter_resources(self, resources: List) -> List[ResourceFilterResult]:
155 """
156 Filter resources based on their retry eligibility.
158 Args:
159 resources: List of resources to filter
161 Returns:
162 List of ResourceFilterResult objects
163 """
164 results = []
166 for resource in resources:
167 if not hasattr(resource, "id"):
168 # Skip resources without ID
169 continue
171 can_retry, reason = self.status_tracker.can_retry(resource.id)
172 status = self._get_resource_status(can_retry, reason)
174 # Get estimated wait time if temporarily failed
175 estimated_wait = None
176 if not can_retry and reason and "cooldown" in reason.lower():
177 try:
178 status_info = self.status_tracker.get_resource_status(
179 resource.id
180 )
181 if status_info and status_info["retry_after_timestamp"]:
182 retry_time = datetime.fromisoformat(
183 status_info["retry_after_timestamp"]
184 )
185 estimated_wait = retry_time - datetime.now(UTC)
186 except Exception as e:
187 logger.debug(f"Error calculating wait time: {e}")
189 result = ResourceFilterResult(
190 resource_id=resource.id,
191 can_retry=can_retry,
192 status=status,
193 reason=reason or "",
194 estimated_wait=estimated_wait,
195 )
196 results.append(result)
198 logger.info(
199 f"Filtered {len(results)} resources: "
200 f"{sum(1 for r in results if r.can_retry)} downloadable, "
201 f"{sum(1 for r in results if r.status == 'permanently_failed')} permanently failed"
202 )
204 return results
206 def get_filter_summary(
207 self, results: List[ResourceFilterResult]
208 ) -> FilterSummary:
209 """
210 Generate a summary of filtering results.
212 Args:
213 results: List of ResourceFilterResult objects
215 Returns:
216 FilterSummary object with counts
217 """
218 summary = FilterSummary()
219 for result in results:
220 summary.add_result(result)
221 return summary
223 def _get_resource_status(
224 self, can_retry: bool, reason: Optional[str]
225 ) -> str:
226 """Get status string based on retry decision"""
227 if not can_retry:
228 if reason and "permanently failed" in reason:
229 return "permanently_failed"
230 elif reason and "cooldown" in reason:
231 return "temporarily_failed"
232 else:
233 return "unavailable"
234 return "available"
236 def get_retry_statistics(self) -> dict:
237 """
238 Get retry statistics for monitoring.
240 Returns:
241 Dictionary with retry statistics
242 """
243 failure_counts = self.status_tracker.get_failed_resources_count()
245 return {
246 "total_permanent_failures": sum(
247 count for count in failure_counts.values()
248 ),
249 "total_temporary_failures": sum(
250 count for count in failure_counts.values()
251 ),
252 "failure_type_breakdown": failure_counts,
253 }
255 def reset_daily_retry_counters(self) -> int:
256 """
257 Reset daily retry counters (call this at midnight).
259 Returns:
260 Number of resources that had their daily counter reset
261 """
262 with self.status_tracker._get_session() as session:
263 # Reset all today_retry_count to 0
264 from .models import ResourceDownloadStatus
266 result = session.query(ResourceDownloadStatus).update(
267 {"today_retry_count": 0, "updated_at": datetime.now(UTC)}
268 )
269 session.commit()
270 logger.info(f"Reset daily retry counters for {result} resources")
271 return result
273 def clear_old_permanent_failures(self, days: int = 30) -> int:
274 """
275 Clear old permanent failure records.
277 Args:
278 days: Clear failures older than this many days
280 Returns:
281 Number of records cleared
282 """
283 return self.status_tracker.clear_permanent_failures(days)