Coverage for src / local_deep_research / news / core / storage_manager.py: 0%

180 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Centralized storage manager for all news system data. 

3Provides unified access to cards, subscriptions, ratings, and preferences. 

4""" 

5 

6from enum import Enum 

7from typing import List, Dict, Any, Optional 

8from datetime import datetime, timedelta, timezone 

9from loguru import logger 

10from flask import g, has_app_context 

11 

12from .card_factory import CardFactory 

13from .base_card import BaseCard 

14from .card_storage import SQLCardStorage 

15from .relevance_service import get_relevance_service 

16from ..subscription_manager.storage import SQLSubscriptionStorage 

17from ..rating_system.storage import SQLRatingStorage 

18from ..preference_manager.storage import SQLPreferenceStorage 

19 

20 

class InteractionType(Enum):
    """Kinds of interaction a user can have with a news card.

    Each member's value is the lowercase string form of the interaction.
    """

    # The card was displayed to the user.
    VIEW = "view"
    # The user rated the card positively.
    VOTE_UP = "vote_up"
    # The user rated the card negatively.
    VOTE_DOWN = "vote_down"
    # The user started research from the card.
    RESEARCH = "research"
    # The user shared the card.
    SHARE = "share"

29 

30 

class StorageManager:
    """
    Unified storage interface for the news system.

    Exposes the card, subscription, rating and preference stores behind a
    single object and builds feed, interaction, statistics and cleanup
    operations on top of them. Each store is bound to the database session
    found on Flask's ``g`` at the moment it is accessed.
    """

    def __init__(self):
        """Initialize storage interfaces."""
        # Session-less fallbacks returned by the storage properties when no
        # Flask session is available. Nothing in this class assigns them, so
        # unless a caller sets them, session-less access raises RuntimeError.
        self._cards = None
        self._subscriptions = None
        self._ratings = None
        self._preferences = None

        # Card factory for reconstruction
        self.card_factory = CardFactory

        # Relevance service for business logic
        self.relevance_service = get_relevance_service()

        logger.info("StorageManager initialized")

    def _get_current_session(self):
        """
        Get the current database session from Flask context.

        Returns:
            ``g.db_session`` when an application context is active and the
            attribute is set to a truthy value, otherwise None. No session
            is created here.
        """
        if not has_app_context():
            return None
        session = getattr(g, "db_session", None)
        return session if session else None

    def _storage_for(self, storage_cls, fallback_attr):
        """
        Build ``storage_cls`` on the current session or use the fallback.

        Args:
            storage_cls: Storage class, called with the session
            fallback_attr: Name of the instance attribute holding the
                session-less fallback storage

        Returns:
            A storage instance

        Raises:
            RuntimeError: If there is no session and no fallback is set
        """
        session = self._get_current_session()
        if session:
            return storage_cls(session)

        # Only read the fallback when it is actually needed.
        fallback = getattr(self, fallback_attr)
        if fallback is None:
            raise RuntimeError("No database session available for news storage")
        return fallback

    @property
    def cards(self):
        """Get cards storage interface."""
        return self._storage_for(SQLCardStorage, "_cards")

    @property
    def subscriptions(self):
        """Get subscriptions storage interface."""
        return self._storage_for(SQLSubscriptionStorage, "_subscriptions")

    @property
    def ratings(self):
        """Get ratings storage interface."""
        return self._storage_for(SQLRatingStorage, "_ratings")

    @property
    def preferences(self):
        """Get preferences storage interface."""
        return self._storage_for(SQLPreferenceStorage, "_preferences")

    def get_user_feed(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        include_seen: bool = True,
        card_types: Optional[List[str]] = None,
    ) -> List[BaseCard]:
        """
        Get personalized news feed for a user.

        Args:
            user_id: The user ID
            limit: Maximum cards to return
            offset: Pagination offset
            include_seen: Whether to include already viewed cards
            card_types: Filter by card types

        Returns:
            At most ``limit`` cards as returned by the relevance service;
            an empty list if anything fails (the error is logged)
        """
        try:
            user_prefs = self.preferences.get(user_id)

            # Always scope to the user; narrow by liked categories and card
            # types only when they are provided.
            filters = {"user_id": user_id}
            if user_prefs and user_prefs.get("liked_categories"):
                filters["categories"] = user_prefs["liked_categories"]
            if card_types:
                filters["card_type"] = card_types

            # Fetch twice the requested amount so that personalization still
            # has enough candidates after it drops some.
            cards_data = self.cards.list(
                filters=filters,
                limit=limit * 2,
                offset=offset,
            )

            # Reconstruct card objects, skipping ids that fail to load.
            cards = []
            for data in cards_data:
                card = CardFactory.load_card(data["id"])
                if card:
                    cards.append(card)

            cards = self.relevance_service.personalize_feed(
                cards, user_prefs, include_seen=include_seen
            )
            return cards[:limit]

        except Exception:
            logger.exception("Error getting user feed")
            return []

    def get_trending_news(
        self, hours: int = 24, limit: int = 10, min_impact: int = 7
    ) -> List[BaseCard]:
        """
        Get trending news across all users.

        Args:
            hours: Look back period
            limit: Maximum cards
            min_impact: Minimum impact score

        Returns:
            List of trending news cards; an empty list if anything fails
        """
        try:
            # Over-fetch so the trending filter has candidates to discard.
            cards = CardFactory.get_recent_cards(
                hours=hours, card_types=["news"], limit=limit * 2
            )
            return self.relevance_service.filter_trending(
                cards, min_impact=min_impact, limit=limit
            )

        except Exception:
            logger.exception("Error getting trending news")
            return []

    def _record_vote(self, card, user_id, card_id, direction):
        """
        Apply a vote to ``card`` in memory and save the matching rating.

        Args:
            card: The loaded card whose ``interaction`` dict is updated
            user_id: The voting user
            card_id: The card ID stored with the rating
            direction: ``"up"`` or ``"down"``
        """
        counter_key = f"votes_{direction}"
        card.interaction["voted"] = direction
        card.interaction[counter_key] = card.interaction.get(counter_key, 0) + 1
        self.ratings.save(
            {
                "user_id": user_id,
                "card_id": card_id,
                "rating_type": "relevance",
                "value": 1 if direction == "up" else -1,
            }
        )

    def record_interaction(
        self,
        user_id: str,
        card_id: str,
        interaction_type: InteractionType,
        metadata: Optional[Dict] = None,
    ) -> bool:
        """
        Record user interaction with a card.

        Args:
            user_id: The user
            card_id: The card
            interaction_type: Type of interaction; an InteractionType member
                or its string value (e.g. ``"view"``)
            metadata: Additional data, stored on the card under
                ``"<type value>_metadata"`` (e.g. ``"view_metadata"``)

        Returns:
            Result of saving the card; False if the card cannot be loaded,
            the interaction type is unknown, or an error occurs
        """
        try:
            # Normalizes string values to members. An unknown value raises
            # ValueError, which is logged below and reported as False.
            interaction_type = InteractionType(interaction_type)

            card = CardFactory.load_card(card_id)
            if not card:
                return False

            if interaction_type is InteractionType.VIEW:
                card.interaction["viewed"] = True
                card.interaction["last_viewed"] = datetime.now(timezone.utc)
                card.interaction["views"] = card.interaction.get("views", 0) + 1

            elif interaction_type is InteractionType.VOTE_UP:
                self._record_vote(card, user_id, card_id, "up")

            elif interaction_type is InteractionType.VOTE_DOWN:
                self._record_vote(card, user_id, card_id, "down")

            elif interaction_type is InteractionType.RESEARCH:
                card.interaction["researched"] = True
                card.interaction["research_count"] = (
                    card.interaction.get("research_count", 0) + 1
                )

            # SHARE has no dedicated fields; only its metadata (if any) is
            # stored before the card is saved.

            if metadata:
                # Use the member's value: formatting the member itself gives
                # "InteractionType.VIEW", not "view".
                card.interaction[f"{interaction_type.value}_metadata"] = metadata

            return CardFactory.update_card(card)

        except Exception:
            logger.exception("Error recording interaction")
            return False

    def get_user_subscriptions(self, user_id: str) -> List[Any]:
        """
        Get all subscriptions for a user.

        Args:
            user_id: The user

        Returns:
            List of subscription objects; an empty list if anything fails
        """
        try:
            return self.subscriptions.list({"user_id": user_id})
        except Exception:
            logger.exception("Error getting user subscriptions")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """
        Get statistics for a user.

        Args:
            user_id: The user

        Returns:
            Dictionary with the keys ``subscriptions``, ``votes_up``,
            ``votes_down``, ``total_views``, ``cards_created`` and
            ``member_since``; an empty dictionary if anything fails
        """
        try:
            subscription_count = len(
                self.subscriptions.list({"user_id": user_id})
            )

            # A rating whose value is missing or None counts as neither an
            # up-vote nor a down-vote.
            ratings = self.ratings.list({"user_id": user_id})
            votes_up = sum(1 for r in ratings if (r.get("value") or 0) > 0)
            votes_down = sum(1 for r in ratings if (r.get("value") or 0) < 0)

            # "interaction" and "views" may be present but None; treat both
            # as empty so one sparse card does not void all statistics.
            user_cards = self.cards.list({"user_id": user_id})
            total_views = sum(
                (c.get("interaction") or {}).get("views") or 0
                for c in user_cards
            )

            # NOTE(review): member_since is the first card the storage
            # returns; it is the oldest only if the storage sorts that way.
            member_since = user_cards[0]["created_at"] if user_cards else None

            return {
                "subscriptions": subscription_count,
                "votes_up": votes_up,
                "votes_down": votes_down,
                "total_views": total_views,
                "cards_created": len(user_cards),
                "member_since": member_since,
            }

        except Exception:
            logger.exception("Error getting user stats")
            return {}

    def get_card(self, card_id: str) -> Optional[BaseCard]:
        """
        Get a single card by ID.

        Args:
            card_id: The card ID

        Returns:
            Card object, or None if loading raises
        """
        try:
            return CardFactory.load_card(card_id)
        except Exception:
            logger.exception("Error getting card")
            return None

    def get_card_interactions(self, card_id: str) -> List[Dict[str, Any]]:
        """
        Get all interactions for a card.

        Only votes are reported; they are derived from the card's ratings.

        Args:
            card_id: The card ID

        Returns:
            One record per rating with the keys ``user_id``,
            ``interaction_type``, ``interaction_data`` and ``timestamp``;
            an empty list if anything fails
        """
        try:
            ratings = self.ratings.list({"card_id": card_id})

            # A positive value is an up-vote; zero, negative, missing or
            # None values are all reported as "down".
            return [
                {
                    "user_id": rating.get("user_id"),
                    "interaction_type": "vote",
                    "interaction_data": {
                        "vote": "up"
                        if (rating.get("value") or 0) > 0
                        else "down"
                    },
                    "timestamp": rating.get("created_at"),
                }
                for rating in ratings
            ]

        except Exception:
            logger.exception("Error getting card interactions")
            return []

    def update_card(self, card: BaseCard) -> bool:
        """
        Update a card in storage.

        Args:
            card: The card to update

        Returns:
            Result of the card factory update; False if it raises
        """
        try:
            return CardFactory.update_card(card)
        except Exception:
            logger.exception("Error updating card")
            return False

    def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
        """
        Clean up old data from all storages.

        Args:
            days: Age threshold in days

        Returns:
            Counts of deleted items under the keys ``cards`` and
            ``ratings``; an empty dictionary if anything fails
        """
        try:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            counts = {}

            # Resolve each storage once instead of on every loop iteration.
            card_storage = self.cards
            old_cards = card_storage.list(
                {"created_before": cutoff, "not_subscribed": True}
            )
            # Only deletions the storage reports as successful are counted.
            counts["cards"] = sum(
                1
                for card_data in old_cards
                if card_storage.delete(card_data["id"])
            )

            rating_storage = self.ratings
            old_ratings = rating_storage.list({"created_before": cutoff})
            # NOTE(review): this counts attempted deletions; the result of
            # each rating delete is not checked.
            counts["ratings"] = len(old_ratings)
            for rating in old_ratings:
                rating_storage.delete(rating["id"])

            logger.info(f"Cleanup complete: {counts}")
            return counts

        except Exception:
            logger.exception("Error during cleanup")
            return {}

445 

446 

# Module-wide StorageManager, built lazily by get_storage_manager().
_storage_manager = None


def get_storage_manager() -> StorageManager:
    """Return the shared StorageManager, creating it on the first call."""
    global _storage_manager
    if _storage_manager is not None:
        return _storage_manager
    _storage_manager = StorageManager()
    return _storage_manager

456 return _storage_manager