Coverage for src/local_deep_research/news/core/storage_manager.py: 0%
180 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Centralized storage manager for all news system data.
3Provides unified access to cards, subscriptions, ratings, and preferences.
4"""
6from enum import Enum
7from typing import List, Dict, Any, Optional
8from datetime import datetime, timedelta, timezone
9from loguru import logger
10from flask import g, has_app_context
12from .card_factory import CardFactory
13from .base_card import BaseCard
14from .card_storage import SQLCardStorage
15from .relevance_service import get_relevance_service
16from ..subscription_manager.storage import SQLSubscriptionStorage
17from ..rating_system.storage import SQLRatingStorage
18from ..preference_manager.storage import SQLPreferenceStorage
class InteractionType(Enum):
    """
    Kinds of user interaction that can be recorded against a news card.

    Each member's value is the plain lowercase string form of the
    interaction.
    """

    # The user opened / looked at the card.
    VIEW = "view"
    # The user rated the card positively.
    VOTE_UP = "vote_up"
    # The user rated the card negatively.
    VOTE_DOWN = "vote_down"
    # The user started research from the card.
    RESEARCH = "research"
    # The user shared the card.
    SHARE = "share"
class StorageManager:
    """
    Unified storage interface for the news system.

    Exposes the card, subscription, rating and preference storages through
    properties bound to the database session found on the current Flask
    application context, plus convenience methods that combine them.
    The public query/update methods log unexpected errors and return an
    empty or False result instead of raising.
    """

    def __init__(self):
        """Initialize storage interfaces."""
        # Fallback storage objects, consulted only when no session is found
        # on the Flask context. Nothing in this class assigns them, so
        # without a session the storage properties raise RuntimeError.
        self._cards = None
        self._subscriptions = None
        self._ratings = None
        self._preferences = None

        # Card factory for reconstruction
        self.card_factory = CardFactory

        # Relevance service for business logic
        self.relevance_service = get_relevance_service()

        logger.info("StorageManager initialized")

    def _get_current_session(self):
        """
        Get the current database session from the Flask context.

        Returns:
            ``g.db_session`` when an application context is active and a
            truthy session is attached to it, otherwise None. No session is
            created here.
        """
        if not has_app_context():
            return None
        # A missing attribute and a falsy value both mean "no session".
        return getattr(g, "db_session", None) or None

    def _resolve_storage(self, storage_cls, fallback_attr: str):
        """
        Build a storage object for the current session.

        Args:
            storage_cls: Storage class, called with the session
            fallback_attr: Name of the instance attribute holding the
                fallback storage used when there is no session

        Returns:
            A new ``storage_cls(session)`` when a session is available,
            otherwise the fallback storage object

        Raises:
            RuntimeError: If there is neither a session nor a fallback
        """
        session = self._get_current_session()
        if session:
            return storage_cls(session)

        fallback = getattr(self, fallback_attr)
        if fallback is None:
            raise RuntimeError(
                "No database session available for news storage"
            )
        return fallback

    @staticmethod
    def _interaction_key(interaction_type) -> str:
        """
        Return the plain string form of an interaction type.

        Enum members are reduced to their ``value`` ("view", "vote_up",
        ...); anything without a ``value`` attribute (e.g. a plain string)
        is passed through ``str``. Formatting a plain Enum member directly
        would produce "InteractionType.VIEW" instead.
        """
        return str(getattr(interaction_type, "value", interaction_type))

    @property
    def cards(self):
        """Get cards storage interface (raises RuntimeError without a session)."""
        return self._resolve_storage(SQLCardStorage, "_cards")

    @property
    def subscriptions(self):
        """Get subscriptions storage interface (raises RuntimeError without a session)."""
        return self._resolve_storage(SQLSubscriptionStorage, "_subscriptions")

    @property
    def ratings(self):
        """Get ratings storage interface (raises RuntimeError without a session)."""
        return self._resolve_storage(SQLRatingStorage, "_ratings")

    @property
    def preferences(self):
        """Get preferences storage interface (raises RuntimeError without a session)."""
        return self._resolve_storage(SQLPreferenceStorage, "_preferences")

    def get_user_feed(
        self,
        user_id: str,
        limit: int = 20,
        offset: int = 0,
        include_seen: bool = True,
        card_types: Optional[List[str]] = None,
    ) -> "List[BaseCard]":
        """
        Get personalized news feed for a user.

        Args:
            user_id: The user ID
            limit: Maximum cards to return
            offset: Pagination offset
            include_seen: Whether to include already viewed cards
            card_types: Filter by card types

        Returns:
            At most ``limit`` cards as ordered by the relevance service,
            or an empty list if anything fails
        """
        try:
            user_prefs = self.preferences.get(user_id)

            # Always scope to the user; narrow to liked categories when the
            # preferences define any.
            filters: Dict[str, Any] = {"user_id": user_id}
            liked_categories = (
                user_prefs.get("liked_categories") if user_prefs else None
            )
            if liked_categories:
                filters["categories"] = liked_categories
            if card_types:
                filters["card_type"] = card_types

            # Fetch twice the requested amount so personalization still has
            # enough candidates after it filters.
            # NOTE(review): the over-fetch is combined with an unscaled
            # ``offset``, so consecutive pages may overlap - confirm intent.
            cards_data = self.cards.list(
                filters=filters,
                limit=limit * 2,
                offset=offset,
            )

            # Reconstruct card objects, dropping rows that fail to load
            loaded = (CardFactory.load_card(row["id"]) for row in cards_data)
            cards = [card for card in loaded if card]

            cards = self.relevance_service.personalize_feed(
                cards, user_prefs, include_seen=include_seen
            )

            return cards[:limit]

        except Exception:
            logger.exception("Error getting user feed")
            return []

    def get_trending_news(
        self, hours: int = 24, limit: int = 10, min_impact: int = 7
    ) -> "List[BaseCard]":
        """
        Get trending news across all users.

        Args:
            hours: Look back period
            limit: Maximum cards
            min_impact: Minimum impact score

        Returns:
            List of trending news cards, or an empty list if anything fails
        """
        try:
            # Over-fetch recent news cards; the relevance service applies
            # the impact threshold and the final limit.
            cards = CardFactory.get_recent_cards(
                hours=hours, card_types=["news"], limit=limit * 2
            )

            return self.relevance_service.filter_trending(
                cards, min_impact=min_impact, limit=limit
            )

        except Exception:
            logger.exception("Error getting trending news")
            return []

    def record_interaction(
        self,
        user_id: str,
        card_id: str,
        interaction_type: "InteractionType",
        metadata: Optional[Dict] = None,
    ) -> bool:
        """
        Record user interaction with a card.

        VIEW and RESEARCH update flags and counters on the card. VOTE_UP
        and VOTE_DOWN do the same and also save a "relevance" rating of
        1 / -1. Other types (e.g. SHARE) have no dedicated counters; only
        their metadata, if given, is stored.

        Args:
            user_id: The user
            card_id: The card
            interaction_type: Type of interaction (view, vote, share, etc.)
            metadata: Additional data, stored on the card under
                "<type value>_metadata" (e.g. "view_metadata")

        Returns:
            Success status; False when the card cannot be loaded or an
            error occurs
        """
        try:
            card = CardFactory.load_card(card_id)
            if not card:
                return False

            interaction = card.interaction

            if interaction_type == InteractionType.VIEW:
                interaction["viewed"] = True
                interaction["last_viewed"] = datetime.now(timezone.utc)
                interaction["views"] = interaction.get("views", 0) + 1

            elif interaction_type in (
                InteractionType.VOTE_UP,
                InteractionType.VOTE_DOWN,
            ):
                is_up = interaction_type == InteractionType.VOTE_UP
                direction = "up" if is_up else "down"
                counter = f"votes_{direction}"

                interaction["voted"] = direction
                interaction[counter] = interaction.get(counter, 0) + 1

                # Record in ratings
                self.ratings.save(
                    {
                        "user_id": user_id,
                        "card_id": card_id,
                        "rating_type": "relevance",
                        "value": 1 if is_up else -1,
                    }
                )

            elif interaction_type == InteractionType.RESEARCH:
                interaction["researched"] = True
                interaction["research_count"] = (
                    interaction.get("research_count", 0) + 1
                )

            # Add metadata if provided. The key is derived from the enum
            # *value* so it reads "view_metadata", not
            # "InteractionType.VIEW_metadata".
            if metadata:
                key = f"{self._interaction_key(interaction_type)}_metadata"
                interaction[key] = metadata

            # Save updated card
            return CardFactory.update_card(card)

        except Exception:
            logger.exception("Error recording interaction")
            return False

    def get_user_subscriptions(self, user_id: str) -> List[Any]:
        """
        Get all subscriptions for a user.

        Args:
            user_id: The user

        Returns:
            List of subscription objects, or an empty list if anything fails
        """
        try:
            return self.subscriptions.list({"user_id": user_id})
        except Exception:
            logger.exception("Error getting user subscriptions")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """
        Get statistics for a user.

        Args:
            user_id: The user

        Returns:
            Dictionary with the keys "subscriptions", "votes_up",
            "votes_down", "total_views", "cards_created" and
            "member_since", or an empty dictionary if anything fails
        """
        try:
            subscription_count = len(
                self.subscriptions.list({"user_id": user_id})
            )

            # Positive rating values count as up-votes, negative as
            # down-votes; zero counts as neither.
            ratings = self.ratings.list({"user_id": user_id})
            votes_up = sum(1 for r in ratings if r.get("value", 0) > 0)
            votes_down = sum(1 for r in ratings if r.get("value", 0) < 0)

            user_cards = self.cards.list({"user_id": user_id})
            total_views = sum(
                c.get("interaction", {}).get("views", 0) for c in user_cards
            )

            # NOTE(review): takes the first card as returned by storage;
            # presumably that is the oldest one - confirm the list order.
            member_since = user_cards[0]["created_at"] if user_cards else None

            return {
                "subscriptions": subscription_count,
                "votes_up": votes_up,
                "votes_down": votes_down,
                "total_views": total_views,
                "cards_created": len(user_cards),
                "member_since": member_since,
            }

        except Exception:
            logger.exception("Error getting user stats")
            return {}

    def get_card(self, card_id: str) -> "Optional[BaseCard]":
        """
        Get a single card by ID.

        Args:
            card_id: The card ID

        Returns:
            Card object, or None if loading fails
        """
        try:
            return CardFactory.load_card(card_id)
        except Exception:
            logger.exception("Error getting card")
            return None

    def get_card_interactions(self, card_id: str) -> List[Dict[str, Any]]:
        """
        Get all interactions for a card.

        Only votes are reported: each stored rating for the card becomes
        one "vote" interaction record.

        Args:
            card_id: The card ID

        Returns:
            List of interaction records, or an empty list if anything fails
        """
        try:
            ratings = self.ratings.list({"card_id": card_id})

            # A rating value above zero is an up-vote; everything else
            # (including zero or a missing value) is reported as "down".
            return [
                {
                    "user_id": rating.get("user_id"),
                    "interaction_type": "vote",
                    "interaction_data": {
                        "vote": "up" if rating.get("value", 0) > 0 else "down"
                    },
                    "timestamp": rating.get("created_at"),
                }
                for rating in ratings
            ]

        except Exception:
            logger.exception("Error getting card interactions")
            return []

    def update_card(self, card: "BaseCard") -> bool:
        """
        Update a card in storage.

        Args:
            card: The card to update

        Returns:
            Success status; False if the update raises
        """
        try:
            return CardFactory.update_card(card)
        except Exception:
            logger.exception("Error updating card")
            return False

    def cleanup_old_data(self, days: int = 30) -> Dict[str, int]:
        """
        Clean up old data from all storages.

        Args:
            days: Age threshold in days

        Returns:
            Counts keyed by "cards" and "ratings", or an empty dictionary
            if anything fails
        """
        try:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            counts = {}

            # Resolve each storage once rather than once per deleted row.
            card_storage = self.cards
            rating_storage = self.ratings

            # Clean old cards (except subscribed ones); only deletions the
            # storage reports as successful are counted.
            old_cards = card_storage.list(
                {"created_before": cutoff, "not_subscribed": True}
            )
            counts["cards"] = sum(
                1 for row in old_cards if card_storage.delete(row["id"])
            )

            # Clean old ratings.
            # NOTE(review): this count is the number of ratings found, not
            # the number confirmed deleted - the delete result is unchecked.
            old_ratings = rating_storage.list({"created_before": cutoff})
            counts["ratings"] = len(old_ratings)
            for rating in old_ratings:
                rating_storage.delete(rating["id"])

            logger.info(f"Cleanup complete: {counts}")
            return counts

        except Exception:
            logger.exception("Error during cleanup")
            return {}
# Process-wide StorageManager, built lazily on first request
_storage_manager = None


def get_storage_manager() -> StorageManager:
    """
    Return the shared StorageManager, constructing it on the first call.

    Later calls return the same instance.
    """
    global _storage_manager
    if _storage_manager is not None:
        return _storage_manager

    _storage_manager = StorageManager()
    return _storage_manager