Coverage for src / local_deep_research / news / subscription_manager / base_subscription.py: 93%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Base class for all subscription types.
3Following LDR's pattern from BaseSearchStrategy.
4"""
6from abc import ABC, abstractmethod
7from datetime import datetime, timedelta
8from typing import Dict, Any, Optional
9from loguru import logger
11from ..core.utils import generate_subscription_id, utc_now
12from ..core.base_card import CardSource
13from .storage import SQLSubscriptionStorage
16class BaseSubscription(ABC):
17 """Abstract base class for all subscription types."""
19 def __init__(
20 self,
21 user_id: str,
22 source: CardSource,
23 query_or_topic: str,
24 refresh_interval_minutes: int = 240, # Default 4 hours
25 subscription_id: Optional[str] = None,
26 storage: Optional[SQLSubscriptionStorage] = None,
27 ):
28 """
29 Initialize base subscription.
31 Args:
32 user_id: ID of the user who owns this subscription
33 source: Source information for tracking origin
34 query_or_topic: The query or topic to subscribe to
35 refresh_interval_minutes: How often to check for updates in minutes
36 subscription_id: Optional ID, will generate if not provided
37 storage: Optional storage backend for persistence
38 """
39 if storage is not None: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true
40 self.storage = storage
41 else:
42 self.storage = SQLSubscriptionStorage(session=None)
43 self.id = subscription_id or generate_subscription_id()
44 self.user_id = user_id
45 self.source = source
46 self.query_or_topic = query_or_topic
47 self.refresh_interval_minutes = refresh_interval_minutes
49 # Timestamps
50 self.created_at = utc_now()
51 self.last_refreshed: Optional[datetime] = None
52 self.next_refresh = self._calculate_next_refresh()
54 # Status
55 self.is_active = True
56 self.refresh_count = 0
57 self.error_count = 0
58 self.last_error: Optional[str] = None
60 # Metadata
61 self.metadata: Dict[str, Any] = {}
63 # Subscription type (to be set by subclasses)
64 self.subscription_type: Optional[str] = None
66 def _calculate_next_refresh(self) -> datetime:
67 """Calculate when this subscription should next be refreshed."""
68 if self.last_refreshed is None:
69 # For new subscriptions, next refresh is created_at + interval
70 return self.created_at + timedelta(
71 minutes=self.refresh_interval_minutes
72 )
73 return self.last_refreshed + timedelta(
74 minutes=self.refresh_interval_minutes
75 )
77 def should_refresh(self) -> bool:
78 """
79 Check if this subscription needs to be refreshed.
81 Returns:
82 bool: True if refresh is needed
83 """
84 if not self.is_active:
85 return False
87 return utc_now() >= self.next_refresh
89 def is_due_for_refresh(self) -> bool:
90 """Alias for should_refresh for backward compatibility."""
91 return self.should_refresh()
93 @abstractmethod
94 def generate_search_query(self) -> str:
95 """
96 Generate the search query for this subscription.
97 Must be implemented by subclasses.
99 Returns:
100 str: The search query to execute
101 """
102 pass
104 @abstractmethod
105 def get_subscription_type(self) -> str:
106 """
107 Get the type of this subscription.
109 Returns:
110 str: Subscription type identifier
111 """
112 pass
114 def on_refresh_start(self) -> None:
115 """Called when a refresh begins."""
116 logger.debug(f"Starting refresh for subscription {self.id}")
117 self.last_refreshed = utc_now()
119 def on_refresh_success(self, results: Any) -> None:
120 """
121 Called when a refresh completes successfully.
123 Args:
124 results: The results from the refresh
125 """
126 self.refresh_count += 1
127 self.next_refresh = self._calculate_next_refresh()
128 self.error_count = 0 # Reset error count on success
129 logger.debug(f"Subscription {self.id} refreshed successfully")
131 def on_refresh_error(self, error: Exception) -> None:
132 """
133 Called when a refresh fails.
135 Args:
136 error: The exception that occurred
137 """
138 self.error_count += 1
139 self.last_error = str(error)
141 # Exponential backoff for errors
142 backoff_minutes = min(
143 self.refresh_interval_minutes * (2**self.error_count),
144 24 * 60 * 7, # Max 1 week in minutes
145 )
146 self.next_refresh = utc_now() + timedelta(minutes=backoff_minutes)
148 logger.error(f"Subscription {self.id} refresh failed: {error}")
150 # Disable after too many errors
151 if self.error_count >= 10:
152 self.is_active = False
153 logger.warning(f"Subscription {self.id} disabled after 10 errors")
155 def pause(self) -> None:
156 """Pause this subscription."""
157 self.is_active = False
158 logger.info(f"Subscription {self.id} paused")
160 def resume(self) -> None:
161 """Resume this subscription."""
162 self.is_active = True
163 self.error_count = 0 # Reset errors on resume
164 self.next_refresh = self._calculate_next_refresh()
165 logger.info(f"Subscription {self.id} resumed")
167 def update_interval(self, new_interval_minutes: int) -> None:
168 """
169 Update the refresh interval.
171 Args:
172 new_interval_minutes: New interval in minutes
173 """
174 if new_interval_minutes < 60:
175 raise ValueError(
176 "Refresh interval must be at least 60 minutes (1 hour)"
177 )
178 if new_interval_minutes > 60 * 24 * 30:
179 raise ValueError("Refresh interval cannot exceed 30 days")
181 self.refresh_interval_minutes = new_interval_minutes
182 self.next_refresh = self._calculate_next_refresh()
184 logger.info(
185 f"Subscription {self.id} interval updated to {new_interval_minutes} minutes"
186 )
188 def save(self) -> str:
189 """
190 Save subscription to database.
192 Returns:
193 str: The subscription ID
194 """
195 if self.storage is None: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 raise RuntimeError("Storage must be set before calling save()")
197 data = {
198 "id": self.id,
199 "user_id": self.user_id,
200 "subscription_type": self.get_subscription_type(),
201 "query_or_topic": self.query_or_topic,
202 "refresh_interval_minutes": self.refresh_interval_minutes,
203 "source_type": self.source.type,
204 "source_id": self.source.source_id,
205 "created_from": self.source.created_from,
206 "is_active": self.is_active,
207 "metadata": self.metadata,
208 "next_refresh": self.next_refresh,
209 "created_at": self.created_at,
210 }
211 return self.storage.create(data)
213 def mark_refreshed(self, results_count: int) -> None:
214 """
215 Update subscription after successful refresh.
217 Args:
218 results_count: Number of results found in this refresh
219 """
220 if self.storage is None: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 raise RuntimeError(
222 "Storage must be set before calling mark_refreshed()"
223 )
224 now = utc_now()
225 self.last_refreshed = now
226 self.next_refresh = self._calculate_next_refresh()
227 self.refresh_count += 1
229 # Update in database
230 self.storage.update_refresh_time(self.id, now, self.next_refresh)
231 self.storage.increment_stats(self.id, results_count)
233 logger.debug(
234 f"Subscription {self.id} marked as refreshed with {results_count} results"
235 )
237 def to_dict(self) -> Dict[str, Any]:
238 """Convert subscription to dictionary representation."""
239 return {
240 "id": self.id,
241 "type": self.get_subscription_type(),
242 "user_id": self.user_id,
243 "query_or_topic": self.query_or_topic,
244 "source": {
245 "type": self.source.type,
246 "source_id": self.source.source_id,
247 "created_from": self.source.created_from,
248 "metadata": self.source.metadata,
249 },
250 "created_at": self.created_at.isoformat(),
251 "last_refreshed": self.last_refreshed.isoformat()
252 if self.last_refreshed
253 else None,
254 "next_refresh": self.next_refresh.isoformat(),
255 "refresh_interval_minutes": self.refresh_interval_minutes,
256 "is_active": self.is_active,
257 "refresh_count": self.refresh_count,
258 "error_count": self.error_count,
259 "last_error": self.last_error,
260 "metadata": self.metadata,
261 }