Coverage for src / local_deep_research / news / subscription_manager / base_subscription.py: 26%
86 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Base class for all subscription types.
3Following LDR's pattern from BaseSearchStrategy.
4"""
6from abc import ABC, abstractmethod
7from datetime import datetime, timedelta
8from typing import Dict, Any, Optional
9from loguru import logger
11from ..core.utils import generate_subscription_id, utc_now
12from ..core.base_card import CardSource
13from .storage import SQLSubscriptionStorage
16class BaseSubscription(ABC):
17 """Abstract base class for all subscription types."""
19 def __init__(
20 self,
21 user_id: str,
22 source: CardSource,
23 query_or_topic: str,
24 refresh_interval_minutes: int = 240, # Default 4 hours
25 subscription_id: Optional[str] = None,
26 ):
27 """
28 Initialize base subscription.
30 Args:
31 user_id: ID of the user who owns this subscription
32 source: Source information for tracking origin
33 query_or_topic: The query or topic to subscribe to
34 refresh_interval_minutes: How often to check for updates in minutes
35 subscription_id: Optional ID, will generate if not provided
36 """
37 self.storage = SQLSubscriptionStorage()
38 self.id = subscription_id or generate_subscription_id()
39 self.user_id = user_id
40 self.source = source
41 self.query_or_topic = query_or_topic
42 self.refresh_interval_minutes = refresh_interval_minutes
44 # Timestamps
45 self.created_at = utc_now()
46 self.last_refreshed = None
47 self.next_refresh = self._calculate_next_refresh()
49 # Status
50 self.is_active = True
51 self.refresh_count = 0
52 self.error_count = 0
53 self.last_error = None
55 # Metadata
56 self.metadata: Dict[str, Any] = {}
58 # Subscription type (to be set by subclasses)
59 self.subscription_type = None
61 def _calculate_next_refresh(self) -> datetime:
62 """Calculate when this subscription should next be refreshed."""
63 if self.last_refreshed is None:
64 # For new subscriptions, next refresh is created_at + interval
65 return self.created_at + timedelta(
66 minutes=self.refresh_interval_minutes
67 )
68 return self.last_refreshed + timedelta(
69 minutes=self.refresh_interval_minutes
70 )
72 def should_refresh(self) -> bool:
73 """
74 Check if this subscription needs to be refreshed.
76 Returns:
77 bool: True if refresh is needed
78 """
79 if not self.is_active:
80 return False
82 return utc_now() >= self.next_refresh
84 def is_due_for_refresh(self) -> bool:
85 """Alias for should_refresh for backward compatibility."""
86 return self.should_refresh()
88 @abstractmethod
89 def generate_search_query(self) -> str:
90 """
91 Generate the search query for this subscription.
92 Must be implemented by subclasses.
94 Returns:
95 str: The search query to execute
96 """
97 pass
99 @abstractmethod
100 def get_subscription_type(self) -> str:
101 """
102 Get the type of this subscription.
104 Returns:
105 str: Subscription type identifier
106 """
107 pass
109 def on_refresh_start(self) -> None:
110 """Called when a refresh begins."""
111 logger.debug(f"Starting refresh for subscription {self.id}")
112 self.last_refreshed = utc_now()
114 def on_refresh_success(self, results: Any) -> None:
115 """
116 Called when a refresh completes successfully.
118 Args:
119 results: The results from the refresh
120 """
121 self.refresh_count += 1
122 self.next_refresh = self._calculate_next_refresh()
123 self.error_count = 0 # Reset error count on success
124 logger.debug(f"Subscription {self.id} refreshed successfully")
126 def on_refresh_error(self, error: Exception) -> None:
127 """
128 Called when a refresh fails.
130 Args:
131 error: The exception that occurred
132 """
133 self.error_count += 1
134 self.last_error = str(error)
136 # Exponential backoff for errors
137 backoff_minutes = min(
138 self.refresh_interval_minutes * (2**self.error_count),
139 24 * 60 * 7, # Max 1 week in minutes
140 )
141 self.next_refresh = utc_now() + timedelta(minutes=backoff_minutes)
143 logger.error(f"Subscription {self.id} refresh failed: {error}")
145 # Disable after too many errors
146 if self.error_count >= 10:
147 self.is_active = False
148 logger.warning(f"Subscription {self.id} disabled after 10 errors")
150 def pause(self) -> None:
151 """Pause this subscription."""
152 self.is_active = False
153 logger.info(f"Subscription {self.id} paused")
155 def resume(self) -> None:
156 """Resume this subscription."""
157 self.is_active = True
158 self.error_count = 0 # Reset errors on resume
159 self.next_refresh = self._calculate_next_refresh()
160 logger.info(f"Subscription {self.id} resumed")
162 def update_interval(self, new_interval_minutes: int) -> None:
163 """
164 Update the refresh interval.
166 Args:
167 new_interval_minutes: New interval in minutes
168 """
169 if new_interval_minutes < 60:
170 raise ValueError(
171 "Refresh interval must be at least 60 minutes (1 hour)"
172 )
173 if new_interval_minutes > 60 * 24 * 30:
174 raise ValueError("Refresh interval cannot exceed 30 days")
176 self.refresh_interval_minutes = new_interval_minutes
177 self.next_refresh = self._calculate_next_refresh()
179 logger.info(
180 f"Subscription {self.id} interval updated to {new_interval_minutes} minutes"
181 )
183 def save(self) -> str:
184 """
185 Save subscription to database.
187 Returns:
188 str: The subscription ID
189 """
190 data = {
191 "id": self.id,
192 "user_id": self.user_id,
193 "subscription_type": self.get_subscription_type(),
194 "query_or_topic": self.query_or_topic,
195 "refresh_interval_minutes": self.refresh_interval_minutes,
196 "source_type": self.source.type,
197 "source_id": self.source.source_id,
198 "created_from": self.source.created_from,
199 "is_active": self.is_active,
200 "metadata": self.metadata,
201 "next_refresh": self.next_refresh,
202 "created_at": self.created_at,
203 }
204 return self.storage.create(data)
206 def mark_refreshed(self, results_count: int) -> None:
207 """
208 Update subscription after successful refresh.
210 Args:
211 results_count: Number of results found in this refresh
212 """
213 now = utc_now()
214 self.last_refreshed = now
215 self.next_refresh = self._calculate_next_refresh()
216 self.refresh_count += 1
218 # Update in database
219 self.storage.update_refresh_time(self.id, now, self.next_refresh)
220 self.storage.increment_stats(self.id, results_count)
222 logger.debug(
223 f"Subscription {self.id} marked as refreshed with {results_count} results"
224 )
226 def to_dict(self) -> Dict[str, Any]:
227 """Convert subscription to dictionary representation."""
228 return {
229 "id": self.id,
230 "type": self.get_subscription_type(),
231 "user_id": self.user_id,
232 "query_or_topic": self.query_or_topic,
233 "source": {
234 "type": self.source.type,
235 "source_id": self.source.source_id,
236 "created_from": self.source.created_from,
237 "metadata": self.source.metadata,
238 },
239 "created_at": self.created_at.isoformat(),
240 "last_refreshed": self.last_refreshed.isoformat()
241 if self.last_refreshed
242 else None,
243 "next_refresh": self.next_refresh.isoformat(),
244 "refresh_interval_minutes": self.refresh_interval_minutes,
245 "is_active": self.is_active,
246 "refresh_count": self.refresh_count,
247 "error_count": self.error_count,
248 "last_error": self.last_error,
249 "metadata": self.metadata,
250 }