Coverage for src/local_deep_research/news/core/storage_manager.py: 95%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Centralized storage manager for all news system data. 

3Provides unified access to cards, subscriptions, ratings, and preferences. 

4""" 

5 

6from enum import Enum 

7from typing import List, Dict, Any, Optional 

8from datetime import datetime, timedelta, timezone 

9from loguru import logger 

10from flask import has_app_context 

11 

12from .card_factory import CardFactory 

13from .base_card import BaseCard 

14from .card_storage import SQLCardStorage 

15from .relevance_service import get_relevance_service 

16from ..subscription_manager.storage import SQLSubscriptionStorage 

17from ..rating_system.storage import SQLRatingStorage 

18from ..preference_manager.storage import SQLPreferenceStorage 

19 

20 

class InteractionType(Enum):
    """Kinds of user interaction that can be recorded against a news card.

    The members are passed to ``StorageManager.record_interaction``; each
    member's value is its lowercase string form.
    """

    # Marks the card as viewed, stamps the view time and bumps "views".
    VIEW = "view"

    # Positive vote: bumps "votes_up" and saves a rating with value 1.
    VOTE_UP = "vote_up"

    # Negative vote: bumps "votes_down" and saves a rating with value -1.
    VOTE_DOWN = "vote_down"

    # Marks the card as researched and bumps "research_count".
    RESEARCH = "research"

    # No dedicated branch in record_interaction; only optional metadata
    # is stored for this type.
    SHARE = "share"

29 

30 

class StorageManager:
    """
    Unified storage interface for the news system.

    Exposes the card, subscription, rating and preference storages as
    properties bound to the database session of the current Flask app
    context, and builds feed, trending, interaction, statistics and cleanup
    operations on top of them.

    The public query/update methods are best-effort: they log any exception
    and return an empty/False result instead of raising.
    """

    def __init__(self):
        """Initialize the manager; no storage object is created here."""
        # Fallback storage slots. Nothing in this class assigns them, so
        # unless external code injects an instance the storage properties
        # raise RuntimeError whenever no session is available.
        self._cards = None
        self._subscriptions = None
        self._ratings = None
        self._preferences = None

        # Card factory class used for card reconstruction
        self.card_factory = CardFactory

        # Relevance service providing personalization / trending logic
        self.relevance_service = get_relevance_service()

        logger.info("StorageManager initialized")

    def _get_current_session(self):
        """
        Look up the database session of the current Flask app context.

        Returns:
            The session returned by ``get_g_db_session()``, or None when
            there is no app context or that call yields a falsy value.
            This method never creates a session itself.
        """
        if not has_app_context():
            return None

        # Imported at call time rather than module import time
        # (presumably to avoid an import cycle - TODO confirm).
        from ...database.session_context import get_g_db_session

        return get_g_db_session() or None

    def _resolve_storage(self, storage_cls, fallback):
        """
        Build a storage object for the current session, or use a fallback.

        Args:
            storage_cls: Storage class to instantiate with the session
            fallback: Pre-set storage instance used when no session exists

        Returns:
            ``storage_cls(session)`` when a session is available, otherwise
            ``fallback``.

        Raises:
            RuntimeError: If there is neither a session nor a fallback.
        """
        session = self._get_current_session()
        if session:
            # A fresh wrapper per access keeps the storage tied to whatever
            # session the current context provides.
            return storage_cls(session)
        if fallback is None:
            raise RuntimeError("No database session available for news storage")
        return fallback

    @property
    def cards(self):
        """Get cards storage interface (raises RuntimeError if unavailable)."""
        return self._resolve_storage(SQLCardStorage, self._cards)

    @property
    def subscriptions(self):
        """Get subscriptions storage interface (raises RuntimeError if unavailable)."""
        return self._resolve_storage(SQLSubscriptionStorage, self._subscriptions)

    @property
    def ratings(self):
        """Get ratings storage interface (raises RuntimeError if unavailable)."""
        return self._resolve_storage(SQLRatingStorage, self._ratings)

    @property
    def preferences(self):
        """Get preferences storage interface (raises RuntimeError if unavailable)."""
        return self._resolve_storage(SQLPreferenceStorage, self._preferences)

    def get_user_feed(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        include_seen: bool = True,
        card_types: Optional[List[str]] = None,
    ) -> List[BaseCard]:
        """
        Get personalized news feed for a user.

        Args:
            user_id: The user ID
            limit: Maximum cards to return
            offset: Pagination offset
            include_seen: Whether to include already viewed cards
            card_types: Filter by card types

        Returns:
            At most ``limit`` cards, in the order produced by the relevance
            service's ``personalize_feed``. An empty list on any error.
        """
        try:
            user_prefs = self.preferences.get(user_id)

            # Always scope to the user; narrow to liked categories and the
            # requested card types only when they are actually set.
            filters = {"user_id": user_id}
            if user_prefs and user_prefs.get("liked_categories"):
                filters["categories"] = user_prefs["liked_categories"]
            if card_types:
                filters["card_type"] = card_types

            # Over-fetch so that personalization can drop cards and still
            # leave up to `limit` results.
            cards_data = self.cards.list(
                filters=filters,
                limit=limit * 2,
                offset=offset,
            )

            # Rebuild card objects, skipping ids the factory cannot load
            loaded = (CardFactory.load_card(data["id"]) for data in cards_data)
            cards = [card for card in loaded if card]

            cards = self.relevance_service.personalize_feed(
                cards, user_prefs, include_seen=include_seen
            )

            return cards[:limit]

        except Exception:
            logger.exception("Error getting user feed")
            return []

    def get_trending_news(
        self, hours: int = 24, limit: int = 10, min_impact: int = 7
    ) -> List[BaseCard]:
        """
        Get trending news across all users.

        Args:
            hours: Look back period
            limit: Maximum cards
            min_impact: Minimum impact score

        Returns:
            Result of the relevance service's ``filter_trending`` over the
            recent "news" cards. An empty list on any error.
        """
        try:
            # Fetch twice the limit so filtering still has enough candidates
            cards = CardFactory.get_recent_cards(
                hours=hours, card_types=["news"], limit=limit * 2
            )

            return self.relevance_service.filter_trending(
                cards, min_impact=min_impact, limit=limit
            )

        except Exception:
            logger.exception("Error getting trending news")
            return []

    def _record_vote(self, card, user_id, card_id, direction, counter_key, value):
        """Apply one vote to the card's interaction data and save a rating."""
        card.interaction["voted"] = direction
        card.interaction[counter_key] = card.interaction.get(counter_key, 0) + 1
        # Votes are also persisted as "relevance" ratings
        self.ratings.save(
            {
                "user_id": user_id,
                "card_id": card_id,
                "rating_type": "relevance",
                "value": value,
            }
        )

    def _apply_interaction(self, card, user_id, card_id, interaction_type):
        """Update the card's interaction data for a single interaction type."""
        if interaction_type == InteractionType.VIEW:
            card.interaction["viewed"] = True
            card.interaction["last_viewed"] = datetime.now(timezone.utc)
            card.interaction["views"] = card.interaction.get("views", 0) + 1

        elif interaction_type == InteractionType.VOTE_UP:
            self._record_vote(card, user_id, card_id, "up", "votes_up", 1)

        elif interaction_type == InteractionType.VOTE_DOWN:
            self._record_vote(card, user_id, card_id, "down", "votes_down", -1)

        elif interaction_type == InteractionType.RESEARCH:
            card.interaction["researched"] = True
            card.interaction["research_count"] = (
                card.interaction.get("research_count", 0) + 1
            )

        # Any other type (e.g. SHARE) changes no counters; only metadata,
        # if supplied, is stored by the caller.

    def record_interaction(
        self,
        user_id: str,
        card_id: str,
        interaction_type: InteractionType,
        metadata: Optional[Dict] = None,
    ) -> bool:
        """
        Record user interaction with a card.

        Args:
            user_id: The user
            card_id: The card
            interaction_type: Type of interaction (view, vote, share, etc.)
            metadata: Additional data

        Returns:
            The result of ``CardFactory.update_card`` on success; False when
            the card cannot be loaded or any error occurs.
        """
        try:
            card = CardFactory.load_card(card_id)
            if not card:
                return False

            self._apply_interaction(card, user_id, card_id, interaction_type)

            if metadata:
                # NOTE(review): for a plain Enum the f-string renders the
                # member as "InteractionType.VIEW", so the key is e.g.
                # "InteractionType.VIEW_metadata", not "view_metadata".
                # Kept as-is because stored cards may rely on this key -
                # confirm before switching to interaction_type.value.
                card.interaction[f"{interaction_type}_metadata"] = metadata

            return CardFactory.update_card(card)

        except Exception:
            logger.exception("Error recording interaction")
            return False

    def get_user_subscriptions(self, user_id: str) -> List[Any]:
        """
        Get all subscriptions for a user.

        Args:
            user_id: The user

        Returns:
            List of subscription objects; an empty list on any error.
        """
        try:
            result: List[Any] = self.subscriptions.list({"user_id": user_id})
            return result
        except Exception:
            logger.exception("Error getting user subscriptions")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """
        Get statistics for a user.

        Args:
            user_id: The user

        Returns:
            Dictionary with the keys "subscriptions", "votes_up",
            "votes_down", "total_views", "cards_created" and
            "member_since"; an empty dict on any error.
        """
        try:
            subscription_count = len(
                self.subscriptions.list({"user_id": user_id})
            )

            # A stored value of None is treated like a missing value (0) so
            # one incomplete record cannot void the whole statistics result.
            ratings = self.ratings.list({"user_id": user_id})
            values = [r.get("value") or 0 for r in ratings]
            votes_up = sum(1 for v in values if v > 0)
            votes_down = sum(1 for v in values if v < 0)

            # Same None-tolerance for the per-card interaction data
            user_cards = self.cards.list({"user_id": user_id})
            total_views = sum(
                (c.get("interaction") or {}).get("views") or 0
                for c in user_cards
            )

            return {
                "subscriptions": subscription_count,
                "votes_up": votes_up,
                "votes_down": votes_down,
                "total_views": total_views,
                "cards_created": len(user_cards),
                # NOTE(review): uses the first listed card, so this is the
                # earliest date only if the storage lists oldest first.
                "member_since": user_cards[0]["created_at"]
                if user_cards
                else None,
            }

        except Exception:
            logger.exception("Error getting user stats")
            return {}

    def get_card(self, card_id: str) -> Optional[BaseCard]:
        """
        Get a single card by ID.

        Args:
            card_id: The card ID

        Returns:
            Card object, or None if loading fails or raises.
        """
        try:
            return CardFactory.load_card(card_id)
        except Exception:
            logger.exception("Error getting card")
            return None

    def get_card_interactions(self, card_id: str) -> List[Dict[str, Any]]:
        """
        Get all interactions for a card.

        Only votes are reported: each stored rating for the card becomes one
        "vote" interaction record.

        Args:
            card_id: The card ID

        Returns:
            List of interaction records; an empty list on any error.
        """
        try:
            ratings = self.ratings.list({"card_id": card_id})

            # Positive values are "up"; everything else, including a missing
            # or None value, is reported as "down".
            return [
                {
                    "user_id": rating.get("user_id"),
                    "interaction_type": "vote",
                    "interaction_data": {
                        "vote": "up"
                        if (rating.get("value") or 0) > 0
                        else "down"
                    },
                    "timestamp": rating.get("created_at"),
                }
                for rating in ratings
            ]

        except Exception:
            logger.exception("Error getting card interactions")
            return []

    def update_card(self, card: BaseCard) -> bool:
        """
        Update a card in storage.

        Args:
            card: The card to update

        Returns:
            Result of ``CardFactory.update_card``; False if it raises.
        """
        try:
            return CardFactory.update_card(card)
        except Exception:
            logger.exception("Error updating card")
            return False

    def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
        """
        Clean up old data from all storages.

        Args:
            days: Age threshold in days

        Returns:
            Counts under the keys "cards" (successful deletions) and
            "ratings" (old ratings found); an empty dict on any error, even
            if some items were already deleted.
        """
        try:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            counts = {}

            # Resolve each storage once instead of rebuilding it per delete
            card_storage = self.cards
            old_cards = card_storage.list(
                {"created_before": cutoff, "not_subscribed": True}
            )
            counts["cards"] = sum(
                1
                for card_data in old_cards
                if card_storage.delete(card_data["id"])
            )

            rating_storage = self.ratings
            old_ratings = rating_storage.list({"created_before": cutoff})
            # Counts the ratings selected for deletion; the delete result is
            # not inspected, unlike for cards.
            counts["ratings"] = len(old_ratings)
            for rating in old_ratings:
                rating_storage.delete(rating["id"])

            logger.info(f"Cleanup complete: {counts}")
            return counts

        except Exception:
            logger.exception("Error during cleanup")
            return {}

437 

438 

# Module-level StorageManager instance, created on first request
_storage_manager = None


def get_storage_manager() -> StorageManager:
    """Return the shared StorageManager, constructing it on first use."""
    global _storage_manager
    if _storage_manager is not None:
        return _storage_manager
    _storage_manager = StorageManager()
    return _storage_manager

448 return _storage_manager