Coverage for src / local_deep_research / news / subscription_manager / base_subscription.py: 93%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Base class for all subscription types. 

3Following LDR's pattern from BaseSearchStrategy. 

4""" 

5 

6from abc import ABC, abstractmethod 

7from datetime import datetime, timedelta 

8from typing import Dict, Any, Optional 

9from loguru import logger 

10 

11from ..core.utils import generate_subscription_id, utc_now 

12from ..core.base_card import CardSource 

13from .storage import SQLSubscriptionStorage 

14 

15 

class BaseSubscription(ABC):
    """Abstract base class for all subscription types.

    Holds the scheduling state shared by every subscription (refresh
    interval, timestamps, error counting with exponential backoff) and the
    persistence helpers built on ``SQLSubscriptionStorage``. Subclasses must
    implement ``generate_search_query`` and ``get_subscription_type``.
    """

    # Number of consecutive refresh failures after which on_refresh_error()
    # deactivates the subscription. Subclasses may override.
    MAX_CONSECUTIVE_ERRORS = 10

    # Upper bound for the exponential error backoff delay: one week, in
    # minutes. Subclasses may override.
    MAX_ERROR_BACKOFF_MINUTES = 24 * 60 * 7

    def __init__(
        self,
        user_id: str,
        source: CardSource,
        query_or_topic: str,
        refresh_interval_minutes: int = 240,  # Default 4 hours
        subscription_id: Optional[str] = None,
        storage: Optional[SQLSubscriptionStorage] = None,
    ):
        """
        Initialize base subscription.

        Args:
            user_id: ID of the user who owns this subscription
            source: Source information for tracking origin
            query_or_topic: The query or topic to subscribe to
            refresh_interval_minutes: How often to check for updates in minutes
                (not range-checked here; update_interval() enforces bounds)
            subscription_id: Optional ID, will generate if not provided
            storage: Optional storage backend for persistence; when omitted a
                ``SQLSubscriptionStorage(session=None)`` is created
        """
        # Only construct the default backend when none was injected, so a
        # caller-supplied storage never causes an unused one to be built.
        self.storage: SQLSubscriptionStorage
        if storage is not None:
            self.storage = storage
        else:
            self.storage = SQLSubscriptionStorage(session=None)
        self.id = subscription_id or generate_subscription_id()
        self.user_id = user_id
        self.source = source
        self.query_or_topic = query_or_topic
        self.refresh_interval_minutes = refresh_interval_minutes

        # Timestamps. created_at must be set before _calculate_next_refresh(),
        # which anchors on it while last_refreshed is still None.
        self.created_at = utc_now()
        self.last_refreshed: Optional[datetime] = None
        self.next_refresh = self._calculate_next_refresh()

        # Status. error_count counts *consecutive* failures: it is reset on
        # every successful refresh and on resume().
        self.is_active = True
        self.refresh_count = 0
        self.error_count = 0
        self.last_error: Optional[str] = None

        # Free-form metadata, persisted by save() and exported by to_dict()
        self.metadata: Dict[str, Any] = {}

        # Subscription type (to be set by subclasses)
        self.subscription_type: Optional[str] = None

    def _calculate_next_refresh(self) -> datetime:
        """Return the time of the next regular (non-error) refresh.

        The schedule is anchored on ``last_refreshed``, or on ``created_at``
        for a subscription that has never been refreshed, plus
        ``refresh_interval_minutes``.
        """
        anchor = self.created_at if self.last_refreshed is None else self.last_refreshed
        return anchor + timedelta(minutes=self.refresh_interval_minutes)

    def should_refresh(self) -> bool:
        """
        Check if this subscription needs to be refreshed.

        Returns:
            bool: False for an inactive (paused or auto-disabled)
            subscription; otherwise True once the current time has reached
            ``next_refresh``.
        """
        if not self.is_active:
            return False

        return utc_now() >= self.next_refresh

    def is_due_for_refresh(self) -> bool:
        """Alias for should_refresh for backward compatibility."""
        return self.should_refresh()

    @abstractmethod
    def generate_search_query(self) -> str:
        """
        Generate the search query for this subscription.
        Must be implemented by subclasses.

        Returns:
            str: The search query to execute
        """
        pass

    @abstractmethod
    def get_subscription_type(self) -> str:
        """
        Get the type of this subscription.

        Returns:
            str: Subscription type identifier (used as ``subscription_type``
            by save() and as ``type`` by to_dict())
        """
        pass

    def on_refresh_start(self) -> None:
        """Record the start of a refresh by stamping ``last_refreshed``."""
        logger.debug(f"Starting refresh for subscription {self.id}")
        self.last_refreshed = utc_now()

    def on_refresh_success(self, results: Any) -> None:
        """
        Called when a refresh completes successfully.

        Schedules the next refresh relative to ``last_refreshed`` (as set by
        on_refresh_start()) and clears the consecutive-error counter.

        Args:
            results: The results from the refresh (unused by the base class)
        """
        self.refresh_count += 1
        self.next_refresh = self._calculate_next_refresh()
        self.error_count = 0  # Reset error count on success
        logger.debug(f"Subscription {self.id} refreshed successfully")

    def on_refresh_error(self, error: Exception) -> None:
        """
        Called when a refresh fails.

        Records the error and pushes ``next_refresh`` out with exponential
        backoff: ``refresh_interval_minutes * 2**error_count``, capped at
        ``MAX_ERROR_BACKOFF_MINUTES``. After ``MAX_CONSECUTIVE_ERRORS``
        consecutive failures the subscription is deactivated.

        Args:
            error: The exception that occurred
        """
        self.error_count += 1
        self.last_error = str(error)

        # Exponential backoff for errors. The first failure already doubles
        # the interval, because error_count was incremented above.
        backoff_minutes = min(
            self.refresh_interval_minutes * (2**self.error_count),
            self.MAX_ERROR_BACKOFF_MINUTES,
        )
        self.next_refresh = utc_now() + timedelta(minutes=backoff_minutes)

        logger.error(f"Subscription {self.id} refresh failed: {error}")

        # Disable after too many consecutive errors
        if self.error_count >= self.MAX_CONSECUTIVE_ERRORS:
            self.is_active = False
            logger.warning(
                f"Subscription {self.id} disabled after "
                f"{self.MAX_CONSECUTIVE_ERRORS} errors"
            )

    def pause(self) -> None:
        """Pause this subscription (should_refresh() returns False)."""
        self.is_active = False
        logger.info(f"Subscription {self.id} paused")

    def resume(self) -> None:
        """Resume this subscription, clearing its consecutive-error count.

        ``next_refresh`` is recomputed from the regular schedule. If the
        last refresh is older than one interval, that time is already in the
        past, so the subscription becomes due immediately.
        """
        self.is_active = True
        self.error_count = 0  # Reset errors on resume
        self.next_refresh = self._calculate_next_refresh()
        logger.info(f"Subscription {self.id} resumed")

    def update_interval(self, new_interval_minutes: int) -> None:
        """
        Update the refresh interval and reschedule the next refresh.

        Args:
            new_interval_minutes: New interval in minutes (60 to 43200)

        Raises:
            ValueError: If the interval is shorter than 1 hour or longer than
                30 days.
        """
        if new_interval_minutes < 60:
            raise ValueError(
                "Refresh interval must be at least 60 minutes (1 hour)"
            )
        if new_interval_minutes > 60 * 24 * 30:
            raise ValueError("Refresh interval cannot exceed 30 days")

        self.refresh_interval_minutes = new_interval_minutes
        self.next_refresh = self._calculate_next_refresh()

        logger.info(
            f"Subscription {self.id} interval updated to {new_interval_minutes} minutes"
        )

    def save(self) -> str:
        """
        Save subscription to database via ``storage.create()``.

        Only creation-time fields are written. Runtime counters
        (``last_refreshed``, ``refresh_count``, ``error_count``,
        ``last_error``) are not part of the payload.

        Returns:
            str: The subscription ID, as returned by ``storage.create()``

        Raises:
            RuntimeError: If ``storage`` has been cleared to None.
        """
        if self.storage is None:
            raise RuntimeError("Storage must be set before calling save()")
        data = {
            "id": self.id,
            "user_id": self.user_id,
            "subscription_type": self.get_subscription_type(),
            "query_or_topic": self.query_or_topic,
            "refresh_interval_minutes": self.refresh_interval_minutes,
            "source_type": self.source.type,
            "source_id": self.source.source_id,
            "created_from": self.source.created_from,
            "is_active": self.is_active,
            "metadata": self.metadata,
            "next_refresh": self.next_refresh,
            "created_at": self.created_at,
        }
        return self.storage.create(data)

    def mark_refreshed(self, results_count: int) -> None:
        """
        Update subscription after successful refresh, in memory and in storage.

        Like on_refresh_success(), this clears the consecutive-error counter
        so that a later failure starts its backoff from scratch instead of
        inheriting stale failures.

        Args:
            results_count: Number of results found in this refresh

        Raises:
            RuntimeError: If ``storage`` has been cleared to None.
        """
        if self.storage is None:
            raise RuntimeError(
                "Storage must be set before calling mark_refreshed()"
            )
        now = utc_now()
        self.last_refreshed = now
        self.next_refresh = self._calculate_next_refresh()
        self.refresh_count += 1
        # A successful refresh ends any run of consecutive failures
        # (consistent with on_refresh_success() and resume()).
        self.error_count = 0

        # Update in database
        self.storage.update_refresh_time(self.id, now, self.next_refresh)
        self.storage.increment_stats(self.id, results_count)

        logger.debug(
            f"Subscription {self.id} marked as refreshed with {results_count} results"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert subscription to a dictionary representation.

        Datetimes are rendered with ``isoformat()``. ``last_refreshed`` is
        None when the subscription has never been refreshed.
        """
        return {
            "id": self.id,
            "type": self.get_subscription_type(),
            "user_id": self.user_id,
            "query_or_topic": self.query_or_topic,
            "source": {
                "type": self.source.type,
                "source_id": self.source.source_id,
                "created_from": self.source.created_from,
                "metadata": self.source.metadata,
            },
            "created_at": self.created_at.isoformat(),
            "last_refreshed": self.last_refreshed.isoformat()
            if self.last_refreshed
            else None,
            "next_refresh": self.next_refresh.isoformat(),
            "refresh_interval_minutes": self.refresh_interval_minutes,
            "is_active": self.is_active,
            "refresh_count": self.refresh_count,
            "error_count": self.error_count,
            "last_error": self.last_error,
            "metadata": self.metadata,
        }