Coverage for src / local_deep_research / news / core / card_storage.py: 95%
149 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2SQLAlchemy storage implementation for news cards.
4Note: This module aligns with the NewsCard SQLAlchemy model in database/models/news.py.
5The NewsCard model has these relevant fields:
6- id, title, summary, content, url
7- source_name, source_type, source_id
8- category, tags, card_type
9- published_at, discovered_at
10- is_read, read_at, is_saved, saved_at
11- extra_data, subscription_id
12"""
14from datetime import datetime, timedelta, timezone
15from typing import List, Optional, Dict, Any
16from sqlalchemy.orm import Session
17from sqlalchemy import desc
18from loguru import logger
20from .storage import CardStorage
21from ...database.models.news import NewsCard
24class SQLCardStorage(CardStorage):
25 """SQLAlchemy implementation of card storage.
27 Maps between the card system's data model and the NewsCard database model.
28 Some fields from the card system are stored in extra_data JSON field.
29 """
31 def __init__(self, session: Session):
32 """Initialize with a database session from the user's encrypted database"""
33 if not session:
34 raise ValueError("Session is required for SQLCardStorage")
35 self._session = session
@property
def session(self):
    """Return the database session this storage was constructed with."""
    bound_session = self._session
    return bound_session
def create(self, data: Dict[str, Any]) -> str:
    """Create a new card and return its ID.

    Maps card system fields to the NewsCard model:
    - topic → title (falls back to ``title``, then ``"Untitled"``)
    - url → url (falls back to ``source_url``)
    - source.type / source.source_id → source_type / source_id
    - user_id, parent_card_id, created_from, metadata, interaction →
      stored in extra_data

    Source details are read from the nested ``source`` dict when it is
    present; any value the nested dict does not supply falls back to the
    flat ``source_type``, ``source_id`` and ``created_from`` keys of
    ``data``.

    Args:
        data: Card fields in the card system's dictionary format. The
            dictionary (including its ``extra_data`` entry) is not modified.

    Returns:
        The ID of the stored card: ``data["id"]`` when provided, otherwise
        a newly generated ID.
    """
    card_id = data.get("id") or self.generate_id()

    # Source info may be nested under "source" or given as flat keys.
    nested_source = data.get("source")
    if not isinstance(nested_source, dict):
        nested_source = {}

    def _source_value(nested_key: str, flat_key: str) -> Any:
        # Prefer the nested value; use the flat key only when it is missing.
        value = nested_source.get(nested_key)
        return data.get(flat_key) if value is None else value

    source_type = _source_value("type", "source_type")
    source_id = _source_value("source_id", "source_id")
    created_from = _source_value("created_from", "created_from")

    # "card_type" wins over the legacy "type" key; default is "news".
    card_type_str = data.get("card_type", data.get("type", "news"))

    # Build extra_data on a copy so the caller's dict is never mutated.
    extra_data = dict(data.get("extra_data") or {})
    extra_data.update(
        {
            "user_id": data.get("user_id"),
            "parent_card_id": data.get("parent_card_id"),
            "created_from": created_from,
            "metadata": data.get("metadata", {}),
            "interaction": data.get("interaction", {}),
        }
    )

    with self.session as session:
        card = NewsCard(
            id=card_id,
            title=data.get("topic", data.get("title", "Untitled")),
            summary=data.get("summary"),
            content=data.get("content"),
            url=data.get("url", data.get("source_url")),
            source_name=data.get("source_name"),
            source_type=source_type,
            source_id=source_id,
            category=data.get("category"),
            tags=data.get("tags"),
            card_type=card_type_str,
            extra_data=extra_data,
        )

        session.add(card)
        session.commit()

    user_id = data.get("user_id", "unknown")
    logger.info(f"Created card {card_id} for user {user_id}")
    return card_id
def get(self, id: str) -> Optional[Dict[str, Any]]:
    """Look up a single card by its ID.

    Returns:
        The card converted to the card system's dictionary format, or
        ``None`` when no card with that ID exists.
    """
    with self.session as db:
        row = db.query(NewsCard).filter_by(id=id).first()
        return self._card_to_dict(row) if row else None
def update(self, id: str, data: Dict[str, Any]) -> bool:
    """Update a card.

    Only the keys below are applied; any other key in ``data`` is ignored.

    Maps card system fields to the NewsCard model:
    - is_pinned → is_saved (and stamps saved_at when pinning)
    - last_viewed → read_at (and sets is_read=True)
    - is_archived, interaction → stored in extra_data

    Args:
        id: ID of the card to update.
        data: Card-system fields to apply.

    Returns:
        ``True`` if the card was found and committed, ``False`` if no card
        with that ID exists.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=id).first()
        if not card:
            return False

        # Map is_pinned to is_saved
        if "is_pinned" in data:
            card.is_saved = data["is_pinned"]
            if data["is_pinned"]:
                card.saved_at = datetime.now(timezone.utc)

        # Map last_viewed to read_at
        if "last_viewed" in data:
            card.is_read = True
            card.read_at = data["last_viewed"]

        # Work on a copy of extra_data: mutating the loaded dict in place
        # and assigning the same object back can leave the ORM unable to
        # see a difference on a plain (non mutation-tracked) JSON column,
        # in which case the change would not be written.
        extra_data = dict(card.extra_data or {})
        if "is_archived" in data:
            extra_data["is_archived"] = data["is_archived"]
        if "interaction" in data:
            extra_data["interaction"] = data["interaction"]
        card.extra_data = extra_data

        session.commit()
        return True
def delete(self, id: str) -> bool:
    """Remove the card with the given ID.

    Returns:
        ``True`` when a card was found and deleted, ``False`` when no card
        with that ID exists.
    """
    with self.session as db:
        row = db.query(NewsCard).filter_by(id=id).first()
        if row:
            db.delete(row)
            db.commit()
            return True
        return False
def list(
    self,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """Return cards, newest discovery first, with optional filtering.

    Filters applied in the database query:
    - card_type: a single card type, or a list of card types
    - is_pinned: matched against the is_saved column
    - category: exact category match

    ``user_id`` and ``is_archived`` live in the extra_data JSON and are
    NOT applied here (JSON querying varies by database backend); callers
    that need them must post-filter the returned dictionaries.

    Args:
        filters: Optional mapping of filter name to value.
        limit: Maximum number of cards to return.
        offset: Number of cards to skip before the first returned card.

    Returns:
        Cards in the card system's dictionary format.
    """
    criteria = filters or {}

    with self.session as db:
        query = db.query(NewsCard)

        if "card_type" in criteria:
            wanted_type = criteria["card_type"]
            if not isinstance(wanted_type, list):
                query = query.filter_by(card_type=wanted_type)
            else:
                query = query.filter(NewsCard.card_type.in_(wanted_type))

        # Simple equality filters: (filter name, model column name).
        for filter_name, column_name in (
            ("is_pinned", "is_saved"),
            ("category", "category"),
        ):
            if filter_name in criteria:
                query = query.filter_by(
                    **{column_name: criteria[filter_name]}
                )

        newest_first = query.order_by(desc(NewsCard.discovered_at))
        page = newest_first.limit(limit).offset(offset)
        return [self._card_to_dict(row) for row in page.all()]
def get_recent(
    self,
    hours: int = 24,
    card_types: Optional[List[str]] = None,
    limit: int = 50,
) -> List[Dict[str, Any]]:
    """Return cards discovered within the last ``hours`` hours.

    Args:
        hours: Size of the look-back window in hours (default 24).
        card_types: When given and non-empty, only cards whose card_type
            is in this list are returned.
        limit: Maximum number of cards to return.

    Returns:
        Card dictionaries ordered by discovery time, newest first.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours)

    with self.session as db:
        query = db.query(NewsCard).filter(
            NewsCard.discovered_at >= window_start
        )
        if card_types:
            query = query.filter(NewsCard.card_type.in_(card_types))

        rows = (
            query.order_by(desc(NewsCard.discovered_at)).limit(limit).all()
        )
        return [self._card_to_dict(row) for row in rows]
226 def _card_to_dict(self, card: NewsCard) -> Dict[str, Any]:
227 """Convert a NewsCard model to the dictionary format expected by the card system.
229 Maps NewsCard model fields back to card system format:
230 - title → topic
231 - is_saved → is_pinned
232 - extra_data fields → top-level fields
233 """
234 extra_data: Dict[str, Any] = (
235 dict(card.extra_data) if card.extra_data else {}
236 )
238 return {
239 "id": card.id,
240 "topic": card.title, # Map title back to topic
241 "title": card.title,
242 "summary": card.summary,
243 "content": card.content,
244 "url": card.url,
245 "source_name": card.source_name,
246 "source_type": card.source_type,
247 "source_id": card.source_id,
248 "category": card.category,
249 "tags": card.tags,
250 "card_type": card.card_type,
251 "published_at": card.published_at.isoformat()
252 if card.published_at
253 else None,
254 "discovered_at": card.discovered_at.isoformat()
255 if card.discovered_at
256 else None,
257 "created_at": card.discovered_at.isoformat()
258 if card.discovered_at
259 else None, # Alias for compatibility
260 "updated_at": card.discovered_at.isoformat()
261 if card.discovered_at
262 else None, # Best approximation
263 "is_read": card.is_read,
264 "read_at": card.read_at.isoformat() if card.read_at else None,
265 "is_saved": card.is_saved,
266 "is_pinned": card.is_saved, # Alias for compatibility
267 "saved_at": card.saved_at.isoformat() if card.saved_at else None,
268 # Fields from extra_data
269 "user_id": extra_data.get("user_id"),
270 "parent_card_id": extra_data.get("parent_card_id"),
271 "created_from": extra_data.get("created_from"),
272 "is_archived": extra_data.get("is_archived", False),
273 "metadata": extra_data.get("metadata", {}),
274 "interaction": extra_data.get("interaction", {}),
275 "source": {
276 "type": card.source_type,
277 "source_id": card.source_id,
278 "created_from": extra_data.get("created_from", ""),
279 "metadata": extra_data.get("metadata", {}),
280 },
281 }
def get_by_user(
    self, user_id: str, limit: int = 50, offset: int = 0
) -> List[Dict[str, Any]]:
    """Get the non-archived cards belonging to a specific user.

    Since user_id and is_archived are stored in the extra_data JSON, the
    filtering is done in Python on the results of ``self.list``. Cards are
    read in batches until ``offset + limit`` matching cards have been
    collected or the listing is exhausted, so pagination stays correct no
    matter how many non-matching cards are interleaved. For better
    performance with large datasets, consider adding a proper user_id
    column.

    Args:
        user_id: Only cards whose ``user_id`` equals this value are kept.
        limit: Maximum number of cards to return; values <= 0 return ``[]``.
        offset: Number of matching cards to skip; negative values are
            treated as 0.

    Returns:
        Matching card dictionaries in the order ``self.list`` yields them.
    """
    if limit <= 0:
        return []

    offset = max(offset, 0)
    needed = offset + limit
    # Over-fetch per batch because many rows may be filtered out.
    batch_size = limit * 3

    matches: List[Dict[str, Any]] = []
    scanned = 0
    while len(matches) < needed:
        batch = self.list(filters=None, limit=batch_size, offset=scanned)
        matches.extend(
            card
            for card in batch
            if card.get("user_id") == user_id
            and not card.get("is_archived", False)
        )
        # A short batch means the listing has no further rows.
        if len(batch) < batch_size:
            break
        scanned += batch_size

    return matches[offset:needed]
def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]:
    """Get the latest version of a card.

    Note: The versioning system is not yet implemented at the database
    level. CardVersion is a Python dataclass for in-memory use, not a
    SQLAlchemy model. Version info is therefore read from the card's
    extra_data: the ``latest_version`` entry at the top level of
    extra_data is used when present, with ``metadata["latest_version"]``
    as a fallback location.

    Args:
        card_id: ID of the card to inspect.

    Returns:
        ``None`` if the card does not exist; the stored latest-version
        record if one is found; otherwise a synthetic "version 1" record
        built from the card's current title and summary.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            return None
        # Copy what is needed while the session is still open.
        extra_data: Dict[str, Any] = (
            dict(card.extra_data) if card.extra_data else {}
        )
        headline = card.title
        summary = card.summary

    # Top level of extra_data first, then the nested metadata dict.
    if "latest_version" in extra_data:
        stored = extra_data["latest_version"]
        return dict(stored) if isinstance(stored, dict) else stored

    metadata = extra_data.get("metadata")
    if isinstance(metadata, dict) and "latest_version" in metadata:
        stored = metadata["latest_version"]
        return dict(stored) if isinstance(stored, dict) else stored

    # No stored version info: report the card's current state as version 1.
    return {
        "version_id": f"{card_id}_v1",
        "version_number": 1,
        "headline": headline,
        "summary": summary,
        "card_id": card_id,
    }
def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str:
    """Add a new version to a card.

    Note: The versioning system stores version data in the card's
    extra_data field since CardVersion is not a database model. For full
    versioning support, a CardVersion SQLAlchemy model would need to be
    created.

    The new record is appended to ``extra_data["versions"]`` and also
    stored as ``extra_data["latest_version"]``. When ``version_data``
    carries a non-empty headline or summary, the card's title or summary
    is updated to match.

    Args:
        card_id: ID of the card receiving the version.
        version_data: Version fields; ``id`` is used as the version ID
            when provided, otherwise one is generated.

    Returns:
        The ID of the version that was added.

    Raises:
        ValueError: If no card with ``card_id`` exists.
    """
    version_id = version_data.get("id") or self.generate_id()

    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            raise ValueError(f"Card {card_id} not found")

        # Copy the dict and the version list before changing them:
        # mutating the loaded objects in place and assigning the same dict
        # back can leave the ORM unable to see a difference on a plain
        # (non mutation-tracked) JSON column.
        extra_data = dict(card.extra_data or {})
        versions = list(extra_data.get("versions") or [])
        version_number = len(versions) + 1

        # Create version record
        version_record = {
            "id": version_id,
            "version_number": version_number,
            "search_query": version_data.get("search_query"),
            "headline": version_data.get("headline"),
            "summary": version_data.get("summary"),
            "findings": version_data.get("findings"),
            "sources": version_data.get("sources"),
            "impact_score": version_data.get("impact_score"),
            "topics": version_data.get("topics"),
            "entities": version_data.get("entities"),
            "created_at": datetime.now(timezone.utc).isoformat(),
        }

        versions.append(version_record)
        extra_data["versions"] = versions
        extra_data["latest_version"] = version_record

        # Update card fields with latest version info
        if version_data.get("headline"):
            card.title = version_data["headline"]
        if version_data.get("summary"):
            card.summary = version_data["summary"]

        card.extra_data = extra_data
        session.commit()

    logger.info(f"Added version {version_number} to card {card_id}")
    return version_id
def update_latest_info(
    self, card_id: str, version_data: Dict[str, Any]
) -> bool:
    """Update the denormalized latest version info on the card.

    Copies a non-empty headline/summary from ``version_data`` onto the
    card's title/summary and replaces ``extra_data["latest_version"]``
    with a record holding the version's id, headline, summary and
    impact_score.

    Args:
        card_id: ID of the card to update.
        version_data: Fields of the version that is now the latest.

    Returns:
        ``True`` if the card was found and committed, ``False`` if no card
        with that ID exists.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            return False

        # Update card's display fields
        if version_data.get("headline"):
            card.title = version_data["headline"]
        if version_data.get("summary"):
            card.summary = version_data["summary"]

        # Store version metadata on a copy of extra_data: mutating the
        # loaded dict in place and assigning the same object back can
        # leave the ORM unable to see a difference on a plain
        # (non mutation-tracked) JSON column.
        extra_data = dict(card.extra_data or {})
        extra_data["latest_version"] = {
            "id": version_data.get("id"),
            "headline": version_data.get("headline"),
            "summary": version_data.get("summary"),
            "impact_score": version_data.get("impact_score"),
        }
        card.extra_data = extra_data

        session.commit()
        return True
def archive_card(self, card_id: str) -> bool:
    """Mark a card as archived.

    Returns:
        Whatever ``update`` returns for the ``is_archived`` change.
    """
    archive_flag = {"is_archived": True}
    return self.update(card_id, archive_flag)
def pin_card(self, card_id: str, pinned: bool = True) -> bool:
    """Set a card's pinned state.

    Args:
        card_id: ID of the card to change.
        pinned: ``True`` to pin (the default), ``False`` to unpin.

    Returns:
        Whatever ``update`` returns for the ``is_pinned`` change.
    """
    pin_flag = {"is_pinned": pinned}
    return self.update(card_id, pin_flag)