Coverage for src/local_deep_research/news/core/storage_manager.py: 95%
184 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Centralized storage manager for all news system data.
3Provides unified access to cards, subscriptions, ratings, and preferences.
4"""
6from enum import Enum
7from typing import List, Dict, Any, Optional
8from datetime import datetime, timedelta, timezone
9from loguru import logger
10from flask import has_app_context
12from .card_factory import CardFactory
13from .base_card import BaseCard
14from .card_storage import SQLCardStorage
15from .relevance_service import get_relevance_service
16from ..subscription_manager.storage import SQLSubscriptionStorage
17from ..rating_system.storage import SQLRatingStorage
18from ..preference_manager.storage import SQLPreferenceStorage
class InteractionType(Enum):
    """Kinds of user interaction that can be recorded against a news card.

    Each member's value is a lowercase string identifier.
    """

    VIEW = "view"
    VOTE_UP = "vote_up"
    VOTE_DOWN = "vote_down"
    RESEARCH = "research"
    SHARE = "share"
class StorageManager:
    """
    Unified storage interface for the news system.

    Exposes the card, subscription, rating and preference storages as
    properties bound to the current Flask database session, plus
    higher-level helpers (feeds, interactions, stats, cleanup) built on
    top of them. The helper methods log failures and return an empty or
    False result instead of raising; the storage properties raise
    RuntimeError when no session is available.
    """

    def __init__(self):
        """Initialize storage interfaces."""
        # Fallback storages, consulted only when no session is available.
        # Nothing in this class assigns them, so they stay None unless set
        # from outside.
        self._cards = None
        self._subscriptions = None
        self._ratings = None
        self._preferences = None

        # Card factory for reconstruction
        self.card_factory = CardFactory

        # Relevance service for business logic
        self.relevance_service = get_relevance_service()

        logger.info("StorageManager initialized")

    def _get_current_session(self):
        """Return the database session from the Flask app context, if any.

        Returns:
            The object returned by ``get_g_db_session()`` when running
            inside a Flask app context and that object is truthy;
            otherwise None.
        """
        if has_app_context():
            # Imported lazily, only on the app-context path.
            from ...database.session_context import get_g_db_session

            db_session = get_g_db_session()
            # NOTE(coverage): the falsy-session case was never exercised in
            # the recorded test run.
            if db_session:
                return db_session
        # No session is created here; callers must handle None.
        return None

    def _storage_for(self, storage_cls, fallback_attr: str):
        """Build a storage bound to the current session, or use the fallback.

        Args:
            storage_cls: Storage class, called as ``storage_cls(session)``
            fallback_attr: Name of the instance attribute holding the
                fallback storage used when there is no session

        Returns:
            A new ``storage_cls`` instance when a session exists, otherwise
            the fallback storage.

        Raises:
            RuntimeError: If there is no session and the fallback is None.
        """
        session = self._get_current_session()
        if session:
            return storage_cls(session)

        # Read the fallback only when it is actually needed.
        # NOTE(coverage): in the recorded run the fallback was always None,
        # so the final return was never reached.
        fallback = getattr(self, fallback_attr)
        if fallback is None:
            raise RuntimeError("No database session available for news storage")
        return fallback

    @property
    def cards(self):
        """Get cards storage interface."""
        return self._storage_for(SQLCardStorage, "_cards")

    @property
    def subscriptions(self):
        """Get subscriptions storage interface."""
        return self._storage_for(SQLSubscriptionStorage, "_subscriptions")

    @property
    def ratings(self):
        """Get ratings storage interface."""
        return self._storage_for(SQLRatingStorage, "_ratings")

    @property
    def preferences(self):
        """Get preferences storage interface."""
        return self._storage_for(SQLPreferenceStorage, "_preferences")

    def get_user_feed(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        include_seen: bool = True,
        card_types: Optional[List[str]] = None,
    ) -> List[BaseCard]:
        """
        Get personalized news feed for a user.

        Args:
            user_id: The user ID
            limit: Maximum cards to return
            offset: Pagination offset passed to the card storage
            include_seen: Forwarded to the relevance service's
                ``personalize_feed``
            card_types: Filter by card types

        Returns:
            At most ``limit`` cards, in the order produced by the relevance
            service. An empty list if anything fails (the error is logged).
        """
        try:
            user_prefs = self.preferences.get(user_id)

            # Always scope to the user; narrow by liked categories and card
            # types only when they are present.
            filters: Dict[str, Any] = {"user_id": user_id}
            if user_prefs and user_prefs.get("liked_categories"):
                filters["categories"] = user_prefs["liked_categories"]
            if card_types:
                filters["card_type"] = card_types

            # Fetch twice the requested amount so there is still enough
            # left after personalization.
            cards_data = self.cards.list(
                filters=filters,
                limit=limit * 2,
                offset=offset,
            )

            # Rebuild card objects, skipping any that fail to load.
            # NOTE(coverage): every card loaded successfully in the recorded
            # run, so the skip path was never exercised.
            loaded = (CardFactory.load_card(data["id"]) for data in cards_data)
            cards = [card for card in loaded if card]

            cards = self.relevance_service.personalize_feed(
                cards, user_prefs, include_seen=include_seen
            )

            return cards[:limit]

        except Exception:
            logger.exception("Error getting user feed")
            return []

    def get_trending_news(
        self, hours: int = 24, limit: int = 10, min_impact: int = 7
    ) -> List[BaseCard]:
        """
        Get trending news across all users.

        Args:
            hours: Look back period
            limit: Maximum cards
            min_impact: Minimum impact score, forwarded to the relevance
                service

        Returns:
            The relevance service's ``filter_trending`` result, or an empty
            list if anything fails (the error is logged).
        """
        try:
            # Over-fetch recent "news" cards, then let the relevance service
            # filter and trim them.
            recent_cards = CardFactory.get_recent_cards(
                hours=hours, card_types=["news"], limit=limit * 2
            )
            return self.relevance_service.filter_trending(
                recent_cards, min_impact=min_impact, limit=limit
            )

        except Exception:
            logger.exception("Error getting trending news")
            return []

    def _record_vote(
        self, card, user_id: str, card_id: str, direction: str, value: int
    ) -> None:
        """Mark a vote on the card and save a matching relevance rating.

        Args:
            card: The loaded card whose ``interaction`` mapping is updated
            user_id: The voting user
            card_id: The card ID stored in the rating record
            direction: "up" or "down"; also selects the ``votes_<direction>``
                counter
            value: Rating value to store (1 for up, -1 for down)
        """
        counter_key = f"votes_{direction}"
        card.interaction["voted"] = direction
        card.interaction[counter_key] = card.interaction.get(counter_key, 0) + 1
        self.ratings.save(
            {
                "user_id": user_id,
                "card_id": card_id,
                "rating_type": "relevance",
                "value": value,
            }
        )

    def record_interaction(
        self,
        user_id: str,
        card_id: str,
        interaction_type: InteractionType,
        metadata: Optional[Dict] = None,
    ) -> bool:
        """
        Record user interaction with a card.

        Args:
            user_id: The user
            card_id: The card
            interaction_type: Type of interaction (view, vote, share, etc.)
            metadata: Additional data, stored under
                ``"<interaction value>_metadata"`` when truthy

        Returns:
            The result of ``CardFactory.update_card``; False when the card
            cannot be loaded or anything fails (the error is logged).
        """
        try:
            card = CardFactory.load_card(card_id)
            if not card:
                return False

            if interaction_type == InteractionType.VIEW:
                card.interaction["viewed"] = True
                card.interaction["last_viewed"] = datetime.now(timezone.utc)
                card.interaction["views"] = card.interaction.get("views", 0) + 1

            elif interaction_type == InteractionType.VOTE_UP:
                self._record_vote(card, user_id, card_id, "up", 1)

            elif interaction_type == InteractionType.VOTE_DOWN:
                self._record_vote(card, user_id, card_id, "down", -1)

            elif interaction_type == InteractionType.RESEARCH:
                card.interaction["researched"] = True
                card.interaction["research_count"] = (
                    card.interaction.get("research_count", 0) + 1
                )
            # Any other type (e.g. SHARE) updates no counters; only its
            # metadata, if given, is stored below.
            # NOTE(coverage): that fall-through was never exercised in the
            # recorded run.

            if metadata:
                # Use the enum's value ("view"), not its str() form
                # ("InteractionType.VIEW"), so the key reads
                # "view_metadata". Plain strings are used as-is.
                type_name = getattr(interaction_type, "value", interaction_type)
                card.interaction[f"{type_name}_metadata"] = metadata

            return CardFactory.update_card(card)

        except Exception:
            logger.exception("Error recording interaction")
            return False

    def get_user_subscriptions(self, user_id: str) -> List[Any]:
        """
        Get all subscriptions for a user.

        Args:
            user_id: The user

        Returns:
            List of subscription objects, or an empty list if anything
            fails (the error is logged).
        """
        try:
            result: List[Any] = self.subscriptions.list({"user_id": user_id})
            return result
        except Exception:
            logger.exception("Error getting user subscriptions")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """
        Get statistics for a user.

        Args:
            user_id: The user

        Returns:
            Dictionary with the keys "subscriptions", "votes_up",
            "votes_down", "total_views", "cards_created" and
            "member_since"; an empty dict if anything fails (the error is
            logged).
        """
        try:
            user_filter = {"user_id": user_id}

            subscription_count = len(self.subscriptions.list(user_filter))

            # Positive rating values count as up-votes, negative as
            # down-votes; zero counts as neither.
            ratings = self.ratings.list({"user_id": user_id})
            votes_up = sum(1 for r in ratings if r.get("value", 0) > 0)
            votes_down = sum(1 for r in ratings if r.get("value", 0) < 0)

            user_cards = self.cards.list({"user_id": user_id})
            total_views = sum(
                c.get("interaction", {}).get("views", 0) for c in user_cards
            )

            # NOTE(review): "member_since" is the first listed card's
            # creation time, so it depends on the storage's ordering.
            # TODO confirm that ordering is oldest-first.
            member_since = user_cards[0]["created_at"] if user_cards else None

            return {
                "subscriptions": subscription_count,
                "votes_up": votes_up,
                "votes_down": votes_down,
                "total_views": total_views,
                "cards_created": len(user_cards),
                "member_since": member_since,
            }

        except Exception:
            logger.exception("Error getting user stats")
            return {}

    def get_card(self, card_id: str) -> Optional[BaseCard]:
        """
        Get a single card by ID.

        Args:
            card_id: The card ID

        Returns:
            Whatever ``CardFactory.load_card`` returns, or None if it
            raises (the error is logged).
        """
        try:
            return CardFactory.load_card(card_id)
        except Exception:
            logger.exception("Error getting card")
            return None

    def get_card_interactions(self, card_id: str) -> List[Dict[str, Any]]:
        """
        Get all interactions for a card.

        Only ratings are consulted, so every returned record is a vote.

        Args:
            card_id: The card ID

        Returns:
            One record per rating with the keys "user_id",
            "interaction_type", "interaction_data" and "timestamp"; an
            empty list if anything fails (the error is logged).
        """
        try:
            ratings = self.ratings.list({"card_id": card_id})

            # A value that is not strictly positive (zero or missing
            # included) is reported as a "down" vote.
            return [
                {
                    "user_id": rating.get("user_id"),
                    "interaction_type": "vote",
                    "interaction_data": {
                        "vote": "up" if rating.get("value", 0) > 0 else "down"
                    },
                    "timestamp": rating.get("created_at"),
                }
                for rating in ratings
            ]

        except Exception:
            logger.exception("Error getting card interactions")
            return []

    def update_card(self, card: BaseCard) -> bool:
        """
        Update a card in storage.

        Args:
            card: The card to update

        Returns:
            The result of ``CardFactory.update_card``, or False if it
            raises (the error is logged).
        """
        try:
            return CardFactory.update_card(card)
        except Exception:
            logger.exception("Error updating card")
            return False

    def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
        """
        Clean up old cards and ratings.

        Args:
            days: Age threshold in days

        Returns:
            Dict with "cards" (number of successful card deletions) and
            "ratings" (number of old ratings found); an empty dict if
            anything fails (the error is logged).
        """
        try:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            counts = {}

            # Resolve the storage once rather than on every loop iteration.
            card_storage = self.cards
            old_cards = card_storage.list(
                {"created_before": cutoff, "not_subscribed": True}
            )
            # Count only deletions the storage reports as successful.
            # NOTE(coverage): every delete succeeded in the recorded run.
            counts["cards"] = sum(
                1 for card_data in old_cards if card_storage.delete(card_data["id"])
            )

            rating_storage = self.ratings
            old_ratings = rating_storage.list({"created_before": cutoff})
            # NOTE(review): unlike cards, this counts ratings found, not
            # ratings confirmed deleted. Confirm the return contract of
            # the rating storage's delete() before changing it.
            counts["ratings"] = len(old_ratings)
            for rating in old_ratings:
                rating_storage.delete(rating["id"])

            logger.info(f"Cleanup complete: {counts}")
            return counts

        except Exception:
            logger.exception("Error during cleanup")
            return {}
# Singleton instance, created lazily by get_storage_manager().
_storage_manager = None


def get_storage_manager() -> StorageManager:
    """Get or create the global StorageManager instance.

    The instance is built on the first call and reused afterwards. No
    locking is performed around the creation.

    Returns:
        The module-level StorageManager.
    """
    global _storage_manager
    if _storage_manager is None:
        _storage_manager = StorageManager()
    return _storage_manager