Coverage for src / local_deep_research / news / core / card_storage.py: 95%
148 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2SQLAlchemy storage implementation for news cards.
4Note: This module aligns with the NewsCard SQLAlchemy model in database/models/news.py.
5The NewsCard model has these relevant fields:
6- id, title, summary, content, url
7- source_name, source_type, source_id
8- category, tags, card_type
9- published_at, discovered_at
10- is_read, read_at, is_saved, saved_at
11- extra_data, subscription_id
12"""
14from datetime import datetime, timedelta, timezone
15from typing import List, Optional, Dict, Any
16from sqlalchemy.orm import Session
17from sqlalchemy import desc
18from loguru import logger
20from .storage import CardStorage
21from ...database.models.news import NewsCard
class SQLCardStorage(CardStorage):
    """SQLAlchemy implementation of card storage.

    Maps between the card system's data model and the NewsCard database model.
    Card-system fields without a dedicated NewsCard column (user_id,
    parent_card_id, created_from, metadata, interaction, is_archived,
    versions, latest_version) are stored in the extra_data JSON field.

    Every operation enters ``self.session`` with a ``with`` statement, so the
    supplied session must support the context-manager protocol.

    Whenever extra_data is changed, a fresh dict is assigned instead of
    mutating the loaded one in place. NOTE(review): unless the column is
    declared with a mutation-tracking type (not visible from this module),
    the ORM does not notice in-place changes to a JSON value and would skip
    the UPDATE.
    """

    def __init__(self, session: Session):
        """Initialize with a database session from the user's encrypted database.

        Args:
            session: The session every storage operation will run against.

        Raises:
            ValueError: If ``session`` is missing (any falsy value).
        """
        if not session:
            raise ValueError("Session is required for SQLCardStorage")
        self._session = session

    @property
    def session(self):
        """Return the database session supplied at construction time."""
        return self._session

    def create(self, data: Dict[str, Any]) -> str:
        """Create a new card and return its ID.

        Maps card system fields to the NewsCard model:
        - topic → title (falls back to ``title``, then "Untitled")
        - url → url (falls back to ``source_url``)
        - card_type → card_type (falls back to ``type``, then "news")
        - user_id, parent_card_id, created_from, metadata, interaction →
          stored in extra_data

        Source details are read from the nested ``source`` dict first; any
        value missing there falls back to the flat ``source_type``,
        ``source_id`` and ``created_from`` keys of ``data``.

        Args:
            data: Card fields. ``id`` is optional; when it is absent or falsy
                an ID is obtained from ``self.generate_id()``.

        Returns:
            The ID of the stored card.
        """
        card_id = data.get("id") or self.generate_id()

        # ``source`` may be absent, None, or a non-dict; treat all of those
        # as "no nested source info" so the flat keys below are consulted.
        nested_source = data.get("source")
        if not isinstance(nested_source, dict):
            nested_source = {}

        def source_value(nested_key: str, flat_key: str) -> Any:
            """Prefer the nested source value, else the flat key on data."""
            value = nested_source.get(nested_key)
            return value if value is not None else data.get(flat_key)

        source_type = source_value("type", "source_type")
        source_id = source_value("source_id", "source_id")
        created_from = source_value("created_from", "created_from")

        card_type_str = data.get("card_type", data.get("type", "news"))

        # Copy before updating so the caller's extra_data dict is not mutated.
        extra_data = dict(data.get("extra_data") or {})
        extra_data.update(
            {
                "user_id": data.get("user_id"),
                "parent_card_id": data.get("parent_card_id"),
                "created_from": created_from,
                "metadata": data.get("metadata", {}),
                "interaction": data.get("interaction", {}),
            }
        )

        with self.session as session:
            card = NewsCard(
                id=card_id,
                title=data.get("topic", data.get("title", "Untitled")),
                summary=data.get("summary"),
                content=data.get("content"),
                url=data.get("url", data.get("source_url")),
                source_name=data.get("source_name"),
                source_type=source_type,
                source_id=source_id,
                category=data.get("category"),
                tags=data.get("tags"),
                card_type=card_type_str,
                extra_data=extra_data,
            )

            session.add(card)
            session.commit()

            user_id = data.get("user_id", "unknown")
            logger.info(f"Created card {card_id} for user {user_id}")
            return card_id

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a card by ID.

        Returns:
            The card in card-system dictionary form, or None when no card
            has that ID.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return None
            return self._card_to_dict(card)

    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update a card.

        Maps card system fields to the NewsCard model:
        - is_pinned → is_saved (saved_at is stamped with the current UTC
          time when pinning; it is left untouched when unpinning)
        - last_viewed → read_at (and sets is_read=True)
        - is_archived, interaction → stored in extra_data

        Any other key in ``data`` is ignored.

        Returns:
            True when the card was found and committed, False when no card
            has that ID.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return False

            if "is_pinned" in data:
                card.is_saved = data["is_pinned"]
                if data["is_pinned"]:
                    card.saved_at = datetime.now(timezone.utc)

            if "last_viewed" in data:
                card.is_read = True
                card.read_at = data["last_viewed"]

            # Work on a copy and reassign it so the change to the JSON value
            # is visible to the ORM as a new object.
            extra_data = dict(card.extra_data or {})
            for key in ("is_archived", "interaction"):
                if key in data:
                    extra_data[key] = data[key]
            card.extra_data = extra_data

            session.commit()
            return True

    def delete(self, id: str) -> bool:
        """Delete a card.

        Returns:
            True when the card existed and was deleted, False otherwise.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return False

            session.delete(card)
            session.commit()
            return True

    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List cards, newest discovery first, with optional filtering.

        Filters that are applied:
        - card_type: a single card type, or a list of card types
        - is_pinned: matched against the is_saved column
        - category: exact match on category

        Filters that are NOT applied: ``user_id`` and ``is_archived`` live in
        the extra_data JSON field and are ignored by this query, because JSON
        querying varies by database backend. Use ``get_by_user`` for
        per-user, non-archived results.

        Args:
            filters: Optional mapping of the filter keys above.
            limit: Maximum number of cards to return.
            offset: Number of cards to skip from the start of the ordering.

        Returns:
            Cards in card-system dictionary form.
        """
        with self.session as session:
            query = session.query(NewsCard)

            if filters:
                if "card_type" in filters:
                    card_type_val = filters["card_type"]
                    # Handle both string and list of types
                    if isinstance(card_type_val, list):
                        query = query.filter(
                            NewsCard.card_type.in_(card_type_val)
                        )
                    else:
                        query = query.filter_by(card_type=card_type_val)
                if "is_pinned" in filters:
                    query = query.filter_by(is_saved=filters["is_pinned"])
                if "category" in filters:
                    query = query.filter_by(category=filters["category"])
                # Note: user_id and is_archived filtering would require
                # JSON querying which varies by database backend

            # Order by discovery date (newest first)
            query = query.order_by(desc(NewsCard.discovered_at))

            cards = query.limit(limit).offset(offset).all()
            return [self._card_to_dict(card) for card in cards]

    def get_recent(
        self,
        hours: int = 24,
        card_types: Optional[List[str]] = None,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Get recent cards within the specified time window.

        Args:
            hours: How many hours back to look (default 24)
            card_types: Optional list of card types to filter
            limit: Maximum number of cards to return

        Returns:
            List of card dictionaries, newest discovery first
        """
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)

        with self.session as session:
            query = session.query(NewsCard).filter(
                NewsCard.discovered_at >= cutoff
            )

            if card_types:
                query = query.filter(NewsCard.card_type.in_(card_types))

            query = query.order_by(desc(NewsCard.discovered_at))
            cards = query.limit(limit).all()

            return [self._card_to_dict(card) for card in cards]

    def _card_to_dict(self, card: NewsCard) -> Dict[str, Any]:
        """Convert a NewsCard model to the dictionary format expected by the card system.

        Maps NewsCard model fields back to card system format:
        - title → topic (``title`` is kept as well)
        - is_saved → is_pinned (``is_saved`` is kept as well)
        - discovered_at → also exposed as created_at and updated_at
        - extra_data fields → top-level fields

        Datetime columns are rendered with ``isoformat()``; unset ones
        become None.
        """
        extra_data = card.extra_data or {}

        def iso(value: Any) -> Optional[str]:
            """Render a datetime as ISO-8601 text, or None when unset."""
            return value.isoformat() if value else None

        discovered_at = iso(card.discovered_at)

        return {
            "id": card.id,
            "topic": card.title,  # Map title back to topic
            "title": card.title,
            "summary": card.summary,
            "content": card.content,
            "url": card.url,
            "source_name": card.source_name,
            "source_type": card.source_type,
            "source_id": card.source_id,
            "category": card.category,
            "tags": card.tags,
            "card_type": card.card_type,
            "published_at": iso(card.published_at),
            "discovered_at": discovered_at,
            "created_at": discovered_at,  # Alias for compatibility
            "updated_at": discovered_at,  # Best approximation
            "is_read": card.is_read,
            "read_at": iso(card.read_at),
            "is_saved": card.is_saved,
            "is_pinned": card.is_saved,  # Alias for compatibility
            "saved_at": iso(card.saved_at),
            # Fields from extra_data
            "user_id": extra_data.get("user_id"),
            "parent_card_id": extra_data.get("parent_card_id"),
            "created_from": extra_data.get("created_from"),
            "is_archived": extra_data.get("is_archived", False),
            "metadata": extra_data.get("metadata", {}),
            "interaction": extra_data.get("interaction", {}),
            "source": {
                "type": card.source_type,
                "source_id": card.source_id,
                "created_from": extra_data.get("created_from", ""),
                "metadata": extra_data.get("metadata", {}),
            },
        }

    def get_by_user(
        self, user_id: str, limit: int = 50, offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Get the non-archived cards of a specific user, newest first.

        Note: Since user_id is stored in extra_data JSON, this does a
        post-filter in Python. Cards are read through ``list`` in batches
        until ``offset + limit`` matching cards have been collected or the
        table is exhausted, so deep pages and users whose cards are sparse
        among other cards are still returned correctly. For better
        performance with large datasets, consider adding a proper user_id
        column.

        Args:
            user_id: Value compared against the card's stored user_id.
            limit: Maximum number of cards to return; non-positive values
                yield an empty list.
            offset: Number of matching cards to skip; negative values are
                treated as 0.

        Returns:
            The requested page of matching cards.
        """
        if limit <= 0:
            return []

        first_wanted = max(offset, 0)
        last_wanted = first_wanted + limit
        # Over-fetch per batch because many rows may be filtered out.
        batch_size = limit * 3

        matching: List[Dict[str, Any]] = []
        rows_scanned = 0
        while len(matching) < last_wanted:
            batch = self.list(
                filters=None, limit=batch_size, offset=rows_scanned
            )
            matching.extend(
                card
                for card in batch
                if card.get("user_id") == user_id
                and not card.get("is_archived", False)
            )
            # A short batch means there are no further rows to scan.
            if len(batch) < batch_size:
                break
            rows_scanned += len(batch)

        return matching[first_wanted:last_wanted]

    def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]:
        """Get the latest version of a card.

        Note: The versioning system is not yet implemented at the database level.
        CardVersion is a Python dataclass for in-memory use, not a SQLAlchemy model.
        Version info is therefore read from the card's extra_data, where
        ``add_version`` and ``update_latest_info`` store it under
        ``latest_version``. A ``latest_version`` entry nested under
        ``metadata`` is honoured as a fallback.

        Returns:
            None when the card does not exist; otherwise the stored latest
            version record, or a synthetic "version 1" built from the card's
            current title and summary when no version has been recorded.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                return None

            extra_data = card.extra_data or {}
            latest = extra_data.get("latest_version")
            if not latest:
                metadata = extra_data.get("metadata")
                if isinstance(metadata, dict):
                    latest = metadata.get("latest_version")
            if latest:
                return latest

            # Return card's current state as version 1
            return {
                "version_id": f"{card_id}_v1",
                "version_number": 1,
                "headline": card.title,
                "summary": card.summary,
                "card_id": card_id,
            }

    def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str:
        """Add a new version to a card.

        Note: The versioning system stores version data in the card's extra_data
        field since CardVersion is not a database model. For full versioning
        support, a CardVersion SQLAlchemy model would need to be created.

        The new record is appended to ``extra_data["versions"]`` and also
        stored as ``extra_data["latest_version"]``. A non-empty ``headline``
        or ``summary`` in ``version_data`` overwrites the card's title or
        summary respectively.

        Args:
            card_id: ID of the card receiving the version.
            version_data: Version fields. ``id`` is optional; when it is
                absent or falsy an ID is obtained from ``self.generate_id()``.

        Returns:
            The ID of the stored version.

        Raises:
            ValueError: If no card has ``card_id``.
        """
        version_id = version_data.get("id") or self.generate_id()

        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                raise ValueError(f"Card {card_id} not found")

            # Copy both containers so the new JSON value is a distinct object
            # from the one that was loaded.
            extra_data = dict(card.extra_data or {})
            versions = [*extra_data.get("versions", [])]
            version_number = len(versions) + 1

            version_record = {
                "id": version_id,
                "version_number": version_number,
                "search_query": version_data.get("search_query"),
                "headline": version_data.get("headline"),
                "summary": version_data.get("summary"),
                "findings": version_data.get("findings"),
                "sources": version_data.get("sources"),
                "impact_score": version_data.get("impact_score"),
                "topics": version_data.get("topics"),
                "entities": version_data.get("entities"),
                "created_at": datetime.now(timezone.utc).isoformat(),
            }

            versions.append(version_record)
            extra_data["versions"] = versions
            extra_data["latest_version"] = version_record

            # Update card fields with latest version info
            if version_data.get("headline"):
                card.title = version_data["headline"]
            if version_data.get("summary"):
                card.summary = version_data["summary"]

            card.extra_data = extra_data
            session.commit()

            logger.info(f"Added version {version_number} to card {card_id}")
            return version_id

    def update_latest_info(
        self, card_id: str, version_data: Dict[str, Any]
    ) -> bool:
        """Update the denormalized latest version info on the card.

        A non-empty ``headline`` or ``summary`` overwrites the card's title
        or summary, and a reduced record (id, headline, summary,
        impact_score) replaces ``extra_data["latest_version"]``.

        Returns:
            True when the card was found and committed, False when no card
            has that ID.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                return False

            # Update card's display fields
            if version_data.get("headline"):
                card.title = version_data["headline"]
            if version_data.get("summary"):
                card.summary = version_data["summary"]

            # Store version metadata in a fresh extra_data dict
            extra_data = dict(card.extra_data or {})
            extra_data["latest_version"] = {
                "id": version_data.get("id"),
                "headline": version_data.get("headline"),
                "summary": version_data.get("summary"),
                "impact_score": version_data.get("impact_score"),
            }
            card.extra_data = extra_data

            session.commit()
            return True

    def archive_card(self, card_id: str) -> bool:
        """Archive a card; returns False when the card does not exist."""
        return self.update(card_id, {"is_archived": True})

    def pin_card(self, card_id: str, pinned: bool = True) -> bool:
        """Pin or unpin a card; returns False when the card does not exist."""
        return self.update(card_id, {"is_pinned": pinned})