Coverage for src / local_deep_research / news / subscription_manager / base_subscription.py: 26%

86 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Base class for all subscription types. 

3Following LDR's pattern from BaseSearchStrategy. 

4""" 

5 

6from abc import ABC, abstractmethod 

7from datetime import datetime, timedelta 

8from typing import Dict, Any, Optional 

9from loguru import logger 

10 

11from ..core.utils import generate_subscription_id, utc_now 

12from ..core.base_card import CardSource 

13from .storage import SQLSubscriptionStorage 

14 

15 

class BaseSubscription(ABC):
    """Abstract base class for all subscription types.

    Holds the scheduling state of a subscription (creation time, last and
    next refresh, refresh interval), its health state (active flag, error
    counters) and the hooks invoked around a refresh. Concrete subclasses
    must implement ``generate_search_query`` and ``get_subscription_type``.

    Class attributes (override in a subclass to tune the error policy):
        MAX_CONSECUTIVE_ERRORS: Value of ``error_count`` at which
            ``on_refresh_error`` deactivates the subscription.
        MAX_BACKOFF_MINUTES: Upper bound, in minutes, for the retry delay
            applied after a failed refresh.
    """

    # Deactivate the subscription once error_count reaches this value.
    MAX_CONSECUTIVE_ERRORS = 10
    # Cap for the exponential error backoff: one week, in minutes.
    MAX_BACKOFF_MINUTES = 24 * 60 * 7

    def __init__(
        self,
        user_id: str,
        source: CardSource,
        query_or_topic: str,
        refresh_interval_minutes: int = 240,  # Default 4 hours
        subscription_id: Optional[str] = None,
    ):
        """
        Initialize base subscription.

        Note that the interval passed here is stored as-is; the bounds
        enforced by ``update_interval`` are not applied at construction.

        Args:
            user_id: ID of the user who owns this subscription
            source: Source information for tracking origin
            query_or_topic: The query or topic to subscribe to
            refresh_interval_minutes: How often to check for updates in minutes
            subscription_id: Optional ID, will generate if not provided
        """
        # Storage backend used by save() and mark_refreshed().
        self.storage = SQLSubscriptionStorage()
        self.id = subscription_id or generate_subscription_id()
        self.user_id = user_id
        self.source = source
        self.query_or_topic = query_or_topic
        self.refresh_interval_minutes = refresh_interval_minutes

        # Timestamps. last_refreshed must be assigned before
        # _calculate_next_refresh() runs, because that method reads it.
        self.created_at = utc_now()
        self.last_refreshed = None
        self.next_refresh = self._calculate_next_refresh()

        # Status
        self.is_active = True
        self.refresh_count = 0
        self.error_count = 0
        self.last_error = None

        # Metadata
        self.metadata: Dict[str, Any] = {}

        # Subscription type (to be set by subclasses)
        self.subscription_type = None

    def _calculate_next_refresh(self) -> datetime:
        """Calculate when this subscription should next be refreshed.

        Returns:
            datetime: ``last_refreshed`` plus the refresh interval, or
            ``created_at`` plus the interval when the subscription has
            never been refreshed.
        """
        interval = timedelta(minutes=self.refresh_interval_minutes)
        if self.last_refreshed is None:
            # For new subscriptions, next refresh is created_at + interval
            return self.created_at + interval
        return self.last_refreshed + interval

    def _error_backoff_minutes(self) -> int:
        """Return the retry delay, in minutes, for the current error count.

        The delay doubles with every recorded error
        (``refresh_interval_minutes * 2**error_count``) and is capped at
        ``MAX_BACKOFF_MINUTES``.
        """
        return min(
            self.refresh_interval_minutes * (2**self.error_count),
            self.MAX_BACKOFF_MINUTES,
        )

    def should_refresh(self) -> bool:
        """
        Check if this subscription needs to be refreshed.

        Returns:
            bool: True if the subscription is active and the current time
            has reached ``next_refresh``; always False when inactive.
        """
        if not self.is_active:
            return False

        return utc_now() >= self.next_refresh

    def is_due_for_refresh(self) -> bool:
        """Alias for should_refresh for backward compatibility."""
        return self.should_refresh()

    @abstractmethod
    def generate_search_query(self) -> str:
        """
        Generate the search query for this subscription.
        Must be implemented by subclasses.

        Returns:
            str: The search query to execute
        """
        pass

    @abstractmethod
    def get_subscription_type(self) -> str:
        """
        Get the type of this subscription.
        Must be implemented by subclasses.

        Returns:
            str: Subscription type identifier
        """
        pass

    def on_refresh_start(self) -> None:
        """Called when a refresh begins.

        Stamps ``last_refreshed`` with the current time, so the timestamp
        reflects the start of the attempt rather than its completion.
        """
        logger.debug(f"Starting refresh for subscription {self.id}")
        self.last_refreshed = utc_now()

    def on_refresh_success(self, results: Any) -> None:
        """
        Called when a refresh completes successfully.

        Increments ``refresh_count``, reschedules ``next_refresh`` and
        clears ``error_count``. This hook updates in-memory state only.

        Args:
            results: The results from the refresh (not inspected here)
        """
        self.refresh_count += 1
        self.next_refresh = self._calculate_next_refresh()
        self.error_count = 0  # Reset error count on success
        logger.debug(f"Subscription {self.id} refreshed successfully")

    def on_refresh_error(self, error: Exception) -> None:
        """
        Called when a refresh fails.

        Records the error, pushes ``next_refresh`` out using exponential
        backoff, and deactivates the subscription once ``error_count``
        reaches ``MAX_CONSECUTIVE_ERRORS``.

        Args:
            error: The exception that occurred
        """
        self.error_count += 1
        self.last_error = str(error)

        # Exponential backoff for errors
        backoff_minutes = self._error_backoff_minutes()
        self.next_refresh = utc_now() + timedelta(minutes=backoff_minutes)

        logger.error(f"Subscription {self.id} refresh failed: {error}")

        # Disable after too many errors
        if self.error_count >= self.MAX_CONSECUTIVE_ERRORS:
            self.is_active = False
            logger.warning(
                f"Subscription {self.id} disabled after "
                f"{self.MAX_CONSECUTIVE_ERRORS} errors"
            )

    def pause(self) -> None:
        """Pause this subscription by marking it inactive."""
        self.is_active = False
        logger.info(f"Subscription {self.id} paused")

    def resume(self) -> None:
        """Resume this subscription.

        Reactivates it, clears ``error_count`` and recomputes
        ``next_refresh`` from the last refresh (or creation) time.
        """
        self.is_active = True
        self.error_count = 0  # Reset errors on resume
        self.next_refresh = self._calculate_next_refresh()
        logger.info(f"Subscription {self.id} resumed")

    def update_interval(self, new_interval_minutes: int) -> None:
        """
        Update the refresh interval and reschedule the next refresh.

        Args:
            new_interval_minutes: New interval in minutes

        Raises:
            ValueError: If the interval is below 60 minutes or above
                30 days. No state is modified in that case.
        """
        if new_interval_minutes < 60:
            raise ValueError(
                "Refresh interval must be at least 60 minutes (1 hour)"
            )
        if new_interval_minutes > 60 * 24 * 30:
            raise ValueError("Refresh interval cannot exceed 30 days")

        self.refresh_interval_minutes = new_interval_minutes
        self.next_refresh = self._calculate_next_refresh()

        logger.info(
            f"Subscription {self.id} interval updated to {new_interval_minutes} minutes"
        )

    def save(self) -> str:
        """
        Save subscription to database.

        Builds a flat record from this subscription's fields and its
        source, and passes it to ``self.storage.create``.

        Returns:
            str: The subscription ID
        """
        data = {
            "id": self.id,
            "user_id": self.user_id,
            "subscription_type": self.get_subscription_type(),
            "query_or_topic": self.query_or_topic,
            "refresh_interval_minutes": self.refresh_interval_minutes,
            "source_type": self.source.type,
            "source_id": self.source.source_id,
            "created_from": self.source.created_from,
            "is_active": self.is_active,
            "metadata": self.metadata,
            "next_refresh": self.next_refresh,
            "created_at": self.created_at,
        }
        return self.storage.create(data)

    def mark_refreshed(self, results_count: int) -> None:
        """
        Update subscription after successful refresh.

        Sets ``last_refreshed`` to now, reschedules ``next_refresh``,
        increments ``refresh_count`` and writes the new times and result
        count through ``self.storage``.

        NOTE(review): ``on_refresh_success`` also increments
        ``refresh_count``; confirm callers do not invoke both for the
        same refresh.

        Args:
            results_count: Number of results found in this refresh
        """
        now = utc_now()
        self.last_refreshed = now
        self.next_refresh = self._calculate_next_refresh()
        self.refresh_count += 1

        # Update in database
        self.storage.update_refresh_time(self.id, now, self.next_refresh)
        self.storage.increment_stats(self.id, results_count)

        logger.debug(
            f"Subscription {self.id} marked as refreshed with {results_count} results"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert subscription to dictionary representation.

        Timestamps are rendered with ``isoformat()``; ``last_refreshed``
        is None when the subscription has never been refreshed. The
        ``metadata`` values are the live objects, not copies.
        """
        return {
            "id": self.id,
            "type": self.get_subscription_type(),
            "user_id": self.user_id,
            "query_or_topic": self.query_or_topic,
            "source": {
                "type": self.source.type,
                "source_id": self.source.source_id,
                "created_from": self.source.created_from,
                "metadata": self.source.metadata,
            },
            "created_at": self.created_at.isoformat(),
            "last_refreshed": self.last_refreshed.isoformat()
            if self.last_refreshed
            else None,
            "next_refresh": self.next_refresh.isoformat(),
            "refresh_interval_minutes": self.refresh_interval_minutes,
            "is_active": self.is_active,
            "refresh_count": self.refresh_count,
            "error_count": self.error_count,
            "last_error": self.last_error,
            "metadata": self.metadata,
        }