Coverage for src/local_deep_research/news/core/storage.py: 69%

121 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Base storage interfaces for the news system. 

3""" 

4 

5from abc import ABC, abstractmethod 

6from typing import List, Optional, Dict, Any 

7from datetime import datetime 

8import uuid 

9 

10 

class BaseStorage(ABC):
    """Abstract base class for all storage interfaces.

    Concrete storages must implement ``create``, ``get``, ``update``,
    ``delete`` and ``list``; ``generate_id`` is supplied here. Records are
    passed around as ``Dict[str, Any]`` mappings and identified by string IDs.
    """

    @abstractmethod
    def create(self, data: Dict[str, Any]) -> str:
        """Create a new record from ``data`` and return its ID."""

    @abstractmethod
    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a record by ID.

        The return annotation is ``Optional``: ``None`` is an allowed result.
        """

    @abstractmethod
    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update the record ``id`` with ``data``; return True if successful."""

    @abstractmethod
    def delete(self, id: str) -> bool:
        """Delete the record ``id``; return True if successful."""

    @abstractmethod
    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List records with optional filtering.

        ``filters`` defaults to ``None``, ``limit`` to 100 and ``offset`` to 0.
        How filters are matched is left to the implementation.
        """

    def generate_id(self) -> str:
        """Generate a unique ID.

        Returns:
            A freshly generated ``uuid.uuid4()`` value rendered as a string.
        """
        return str(uuid.uuid4())

47 

48 

class CardStorage(BaseStorage):
    """Interface for news card storage.

    Extends the base record operations with per-user lookup, card
    versioning, archiving and pinning. All methods are abstract.
    """

    @abstractmethod
    def get_by_user(
        self, user_id: str, limit: int = 50, offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Get cards for a specific user (``limit`` defaults to 50, ``offset`` to 0)."""

    @abstractmethod
    def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]:
        """Get the latest version of a card.

        The return annotation is ``Optional``: ``None`` is an allowed result.
        """

    @abstractmethod
    def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str:
        """Add a new version to a card; returns a string."""
        # NOTE(review): the returned string is presumably the new version's
        # ID -- confirm against the concrete implementation.

    @abstractmethod
    def update_latest_info(
        self, card_id: str, version_data: Dict[str, Any]
    ) -> bool:
        """Update the denormalized latest version info on the card."""

    @abstractmethod
    def archive_card(self, card_id: str) -> bool:
        """Archive a card."""

    @abstractmethod
    def pin_card(self, card_id: str, pinned: bool = True) -> bool:
        """Pin or unpin a card; ``pinned`` defaults to True."""

85 

86 

class SubscriptionStorage(BaseStorage):
    """Interface for subscription storage.

    Adds subscription-specific queries (active, due) and state changes
    (refresh bookkeeping, pause, resume, expire). All methods are abstract.
    """

    @abstractmethod
    def get_active_subscriptions(
        self, user_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get all active subscriptions, optionally filtered by user."""

    @abstractmethod
    def get_due_subscriptions(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Get subscriptions that are due for refresh (``limit`` defaults to 100)."""

    @abstractmethod
    def update_refresh_time(
        self,
        subscription_id: str,
        last_refresh: datetime,
        next_refresh: datetime,
    ) -> bool:
        """Update refresh timestamps after processing."""

    @abstractmethod
    def increment_stats(self, subscription_id: str, results_count: int) -> bool:
        """Increment refresh count and update results count."""

    @abstractmethod
    def pause_subscription(self, subscription_id: str) -> bool:
        """Pause a subscription."""

    @abstractmethod
    def resume_subscription(self, subscription_id: str) -> bool:
        """Resume a paused subscription."""

    @abstractmethod
    def expire_subscription(self, subscription_id: str) -> bool:
        """Mark a subscription as expired."""

131 

132 

class RatingStorage(BaseStorage):
    """Interface for rating storage.

    Adds per-user rating lookup, upsert and aggregation on top of the base
    record operations. All methods are abstract.
    """

    @abstractmethod
    def get_user_rating(
        self, user_id: str, item_id: str, rating_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get a user's rating for a specific item.

        The return annotation is ``Optional``: ``None`` is an allowed result.
        """

    @abstractmethod
    def upsert_rating(
        self,
        user_id: str,
        item_id: str,
        rating_type: str,
        rating_value: str,
        item_type: str = "card",
    ) -> str:
        """Create or update a rating; ``item_type`` defaults to ``"card"``."""

    @abstractmethod
    def get_ratings_summary(
        self, item_id: str, item_type: str = "card"
    ) -> Dict[str, Any]:
        """Get aggregated ratings for an item; ``item_type`` defaults to ``"card"``."""

    @abstractmethod
    def get_user_ratings(
        self, user_id: str, rating_type: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get all ratings by a user.

        ``rating_type`` defaults to ``None`` and ``limit`` to 100.
        """

168 

169 

class PreferenceStorage(BaseStorage):
    """Interface for user preference storage.

    Adds preference lookup/upsert, liked/disliked item tracking and the
    preference embedding update. All methods are abstract.
    """

    @abstractmethod
    def get_user_preferences(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get preferences for a user.

        The return annotation is ``Optional``: ``None`` is an allowed result.
        """

    @abstractmethod
    def upsert_preferences(
        self, user_id: str, preferences: Dict[str, Any]
    ) -> str:
        """Create or update user preferences; returns a string."""

    @abstractmethod
    def add_liked_item(
        self, user_id: str, item_id: str, item_type: str = "news"
    ) -> bool:
        """Add an item to liked list; ``item_type`` defaults to ``"news"``."""

    @abstractmethod
    def add_disliked_item(
        self, user_id: str, item_id: str, item_type: str = "news"
    ) -> bool:
        """Add an item to disliked list; ``item_type`` defaults to ``"news"``."""

    @abstractmethod
    def update_preference_embedding(
        self, user_id: str, embedding: List[float]
    ) -> bool:
        """Update the user's preference embedding (a list of floats)."""

205 

206 

class SearchHistoryStorage(BaseStorage):
    """Interface for search history storage (if tracking enabled).

    Adds recording of searches, recent/popular queries and linking a search
    to a subscription. All methods are abstract.
    """

    @abstractmethod
    def record_search(
        self, user_id: str, query: str, search_data: Dict[str, Any]
    ) -> str:
        """Record a search query; returns a string."""
        # NOTE(review): the returned string is presumably the ID of the
        # recorded search (cf. ``search_id`` in ``link_to_subscription``) --
        # confirm against the concrete implementation.

    @abstractmethod
    def get_recent_searches(
        self, user_id: str, hours: int = 48, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get recent searches for a user (``hours`` defaults to 48, ``limit`` to 50)."""

    @abstractmethod
    def link_to_subscription(
        self, search_id: str, subscription_id: str
    ) -> bool:
        """Link a search to a subscription created from it."""

    @abstractmethod
    def get_popular_searches(
        self, hours: int = 24, limit: int = 20
    ) -> List[Dict[str, Any]]:
        """Get popular searches across all users (``hours`` defaults to 24, ``limit`` to 20)."""

237 

238 

class NewsItemStorage(BaseStorage):
    """Interface for news item storage (the raw news data).

    Adds recent/by-category queries, batch storage, vote updates and cleanup
    of old items. All methods are abstract.
    """

    @abstractmethod
    def get_recent(
        self, hours: int = 24, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get recent news items (``hours`` defaults to 24, ``limit`` to 50)."""

    @abstractmethod
    def store_batch(self, news_items: List[Dict[str, Any]]) -> List[str]:
        """Store multiple news items at once; returns a list of strings."""
        # NOTE(review): the returned strings are presumably the IDs of the
        # stored items -- confirm against the concrete implementation.

    @abstractmethod
    def update_votes(self, news_id: str, vote_type: str) -> bool:
        """Update vote counts."""

    @abstractmethod
    def get_by_category(
        self, category: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get news items by category (``limit`` defaults to 50)."""

    @abstractmethod
    def cleanup_old_items(self, days: int = 7) -> int:
        """Remove old news items, return count deleted (``days`` defaults to 7)."""

269 pass