Coverage for src / local_deep_research / news / core / card_storage.py: 95%

149 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2SQLAlchemy storage implementation for news cards. 

3 

4Note: This module aligns with the NewsCard SQLAlchemy model in database/models/news.py. 

5The NewsCard model has these relevant fields: 

6- id, title, summary, content, url 

7- source_name, source_type, source_id 

8- category, tags, card_type 

9- published_at, discovered_at 

10- is_read, read_at, is_saved, saved_at 

11- extra_data, subscription_id 

12""" 

13 

14from datetime import datetime, timedelta, timezone 

15from typing import List, Optional, Dict, Any 

16from sqlalchemy.orm import Session 

17from sqlalchemy import desc 

18from loguru import logger 

19 

20from .storage import CardStorage 

21from ...database.models.news import NewsCard 

22 

23 

24class SQLCardStorage(CardStorage): 

25 """SQLAlchemy implementation of card storage. 

26 

27 Maps between the card system's data model and the NewsCard database model. 

28 Some fields from the card system are stored in extra_data JSON field. 

29 """ 

30 

def __init__(self, session: Session):
    """Bind this storage to a session from the user's encrypted database.

    Args:
        session: The database session every storage operation will use.

    Raises:
        ValueError: If ``session`` is falsy (for example ``None``).
    """
    if session:
        self._session = session
        return
    raise ValueError("Session is required for SQLCardStorage")

36 

@property
def session(self):
    """Return the database session that was supplied at construction."""
    bound_session = self._session
    return bound_session

41 

def create(self, data: Dict[str, Any]) -> str:
    """Create a new card and return its ID.

    Maps card system fields to the NewsCard model:
    - topic → title (falls back to ``title``, then ``"Untitled"``)
    - url → url (falls back to ``source_url``)
    - user_id, parent_card_id, created_from, metadata, interaction →
      stored in extra_data

    Source details are read from the nested ``source`` dict when one is
    given; any value missing there falls back to the flat ``source_type``,
    ``source_id`` and ``created_from`` keys of ``data``.

    Args:
        data: Card fields in the card system's format. The dict (including
            a nested ``extra_data`` dict) is not modified.

    Returns:
        The ID of the stored card: ``data["id"]`` when truthy, otherwise
        the value returned by ``self.generate_id()``.
    """
    card_id = data.get("id") or self.generate_id()

    # Source info may arrive nested under "source" or as flat keys. A
    # missing or non-dict "source" is treated as "nothing nested".
    nested_source = data.get("source")
    if not isinstance(nested_source, dict):
        nested_source = {}

    def source_field(nested_key: str, flat_key: str) -> Any:
        """Prefer the nested value; fall back to the flat key when absent."""
        value = nested_source.get(nested_key)
        return data.get(flat_key) if value is None else value

    source_type = source_field("type", "source_type")
    source_id = source_field("source_id", "source_id")
    created_from = source_field("created_from", "created_from")

    # The card type is stored exactly as given; no enum conversion is
    # performed here.
    card_type_str = data.get("card_type", data.get("type", "news"))

    # A key that is present but None must not block the fallback chain.
    title = data.get("topic")
    if title is None:
        title = data.get("title")
    if title is None:
        title = "Untitled"

    url = data.get("url")
    if url is None:
        url = data.get("source_url")

    # Copy before updating so the caller's "extra_data" dict is untouched.
    extra_data = dict(data.get("extra_data") or {})
    extra_data.update(
        {
            "user_id": data.get("user_id"),
            "parent_card_id": data.get("parent_card_id"),
            "created_from": created_from,
            "metadata": data.get("metadata", {}),
            "interaction": data.get("interaction", {}),
        }
    )

    with self.session as session:
        card = NewsCard(
            id=card_id,
            title=title,
            summary=data.get("summary"),
            content=data.get("content"),
            url=url,
            source_name=data.get("source_name"),
            source_type=source_type,
            source_id=source_id,
            category=data.get("category"),
            tags=data.get("tags"),
            card_type=card_type_str,
            extra_data=extra_data,
        )

        session.add(card)
        session.commit()

    user_id = data.get("user_id", "unknown")
    logger.info(f"Created card {card_id} for user {user_id}")
    return card_id

99 

def get(self, id: str) -> Optional[Dict[str, Any]]:
    """Look up a single card by its ID.

    Args:
        id: ID of the card to fetch.

    Returns:
        The card in card-system dictionary form, or ``None`` when no card
        with that ID exists.
    """
    with self.session as db:
        row = db.query(NewsCard).filter_by(id=id).first()
        return self._card_to_dict(row) if row else None

107 

def update(self, id: str, data: Dict[str, Any]) -> bool:
    """Update a card.

    Maps card system fields to the NewsCard model:
    - is_archived → stored in extra_data
    - interaction → stored in extra_data
    - is_pinned → is_saved (saved_at is stamped when pinning)
    - last_viewed → read_at (and sets is_read=True)

    Keys of ``data`` other than the four above are ignored.

    Args:
        id: ID of the card to update.
        data: Fields to change. ``last_viewed`` may be a datetime or an
            ISO 8601 string.

    Returns:
        ``True`` if the card was found and committed, ``False`` if no card
        with that ID exists.

    Raises:
        ValueError: If ``last_viewed`` is a string that is not valid
            ISO 8601.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=id).first()
        if not card:
            return False

        # Map is_pinned to is_saved
        if "is_pinned" in data:
            card.is_saved = data["is_pinned"]
            if data["is_pinned"]:
                card.saved_at = datetime.now(timezone.utc)

        # Map last_viewed to read_at
        if "last_viewed" in data:
            last_viewed = data["last_viewed"]
            # read_at is serialized with .isoformat() when cards are read
            # back, so keep it a datetime even if the caller sent a string.
            if isinstance(last_viewed, str):
                last_viewed = datetime.fromisoformat(last_viewed)
            card.is_read = True
            card.read_at = last_viewed

        # Work on a copy: unless the column uses mutation tracking (not
        # visible from this module), mutating the loaded dict in place and
        # re-assigning the same object can go undetected at flush time.
        extra_data = dict(card.extra_data) if card.extra_data else {}
        if "is_archived" in data:
            extra_data["is_archived"] = data["is_archived"]
        if "interaction" in data:
            extra_data["interaction"] = data["interaction"]
        card.extra_data = extra_data

        session.commit()
        return True

142 

def delete(self, id: str) -> bool:
    """Remove a card by ID.

    Args:
        id: ID of the card to delete.

    Returns:
        ``True`` when a card was found, deleted and committed; ``False``
        when no card with that ID exists.
    """
    with self.session as db:
        target = db.query(NewsCard).filter_by(id=id).first()
        if target:
            db.delete(target)
            db.commit()
            return True
        return False

153 

def list(
    self,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[Dict[str, Any]]:
    """List cards, newest discovery first, with optional filtering.

    Supported filters:
    - user_id: Filter by user (stored in extra_data)
    - card_type: Filter by card type (a single value or a list of values)
    - is_archived: Filter by archived status (in extra_data)
    - is_pinned: Filter by pinned/saved status
    - category: Filter by category

    ``card_type``, ``is_pinned`` and ``category`` are applied in the
    database query. ``user_id`` and ``is_archived`` live inside the
    extra_data JSON, so they are applied in Python after loading the
    rows; pagination is then done on the filtered result so that
    ``limit``/``offset`` always refer to matching cards.

    Args:
        filters: Optional mapping of filter name to wanted value.
        limit: Maximum number of cards to return.
        offset: Number of matching cards to skip.

    Returns:
        A list of card dictionaries.
    """
    filters = filters or {}
    # Filters that cannot be expressed as plain column comparisons.
    json_filters = {
        key: filters[key]
        for key in ("user_id", "is_archived")
        if key in filters
    }

    with self.session as session:
        query = session.query(NewsCard)

        if "card_type" in filters:
            card_type_val = filters["card_type"]
            # Handle both string and list of types
            if isinstance(card_type_val, (tuple, set, frozenset)) or (
                type(card_type_val).__name__ == "list"
            ):
                query = query.filter(NewsCard.card_type.in_(card_type_val))
            else:
                query = query.filter_by(card_type=card_type_val)
        if "is_pinned" in filters:
            query = query.filter_by(is_saved=filters["is_pinned"])
        if "category" in filters:
            query = query.filter_by(category=filters["category"])

        # Order by discovery date (newest first)
        query = query.order_by(desc(NewsCard.discovered_at))

        if not json_filters:
            cards = query.limit(limit).offset(offset).all()
            return [self._card_to_dict(card) for card in cards]

        # JSON querying varies by database backend, so filter the
        # converted dictionaries instead. The converted form exposes
        # "user_id" and "is_archived" (defaulting to False) at top level.
        converted = (self._card_to_dict(card) for card in query.all())
        matching = [
            card_dict
            for card_dict in converted
            if all(
                card_dict.get(key) == wanted
                for key, wanted in json_filters.items()
            )
        ]
        return matching[offset : offset + limit]

194 

def get_recent(
    self,
    hours: int = 24,
    card_types: Optional[List[str]] = None,
    limit: int = 50,
) -> List[Dict[str, Any]]:
    """Return the most recently discovered cards, newest first.

    Args:
        hours: Size of the look-back window in hours (default 24).
        card_types: When given and non-empty, only cards whose type is in
            this list are returned.
        limit: Maximum number of cards to return.

    Returns:
        List of card dictionaries.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours)

    with self.session as db:
        recent = db.query(NewsCard).filter(
            NewsCard.discovered_at >= window_start
        )
        if card_types:
            recent = recent.filter(NewsCard.card_type.in_(card_types))

        rows = (
            recent.order_by(desc(NewsCard.discovered_at)).limit(limit).all()
        )
        return [self._card_to_dict(row) for row in rows]

225 

def _card_to_dict(self, card: NewsCard) -> Dict[str, Any]:
    """Translate a NewsCard row into the card system's dictionary format.

    Field mapping back to the card system:
    - title is exposed as both ``topic`` and ``title``
    - is_saved is exposed as both ``is_saved`` and ``is_pinned``
    - discovered_at also fills ``created_at`` and ``updated_at``
    - selected extra_data entries are lifted to top-level keys

    Datetime fields are rendered with ``isoformat()``; unset ones become
    ``None``.
    """

    def iso(moment):
        """Render a datetime as ISO text, passing falsy values as None."""
        return moment.isoformat() if moment else None

    extras: Dict[str, Any] = {}
    if card.extra_data:
        extras = dict(card.extra_data)

    discovered = iso(card.discovered_at)

    result: Dict[str, Any] = {
        "id": card.id,
        "topic": card.title,  # title doubles as the card system's topic
        "title": card.title,
        "summary": card.summary,
        "content": card.content,
        "url": card.url,
        "source_name": card.source_name,
        "source_type": card.source_type,
        "source_id": card.source_id,
        "category": card.category,
        "tags": card.tags,
        "card_type": card.card_type,
        "published_at": iso(card.published_at),
        "discovered_at": discovered,
        "created_at": discovered,  # compatibility alias
        "updated_at": discovered,  # closest available approximation
        "is_read": card.is_read,
        "read_at": iso(card.read_at),
        "is_saved": card.is_saved,
        "is_pinned": card.is_saved,  # compatibility alias
        "saved_at": iso(card.saved_at),
    }

    # Values that only exist inside the extra_data JSON.
    result["user_id"] = extras.get("user_id")
    result["parent_card_id"] = extras.get("parent_card_id")
    result["created_from"] = extras.get("created_from")
    result["is_archived"] = extras.get("is_archived", False)
    result["metadata"] = extras.get("metadata", {})
    result["interaction"] = extras.get("interaction", {})

    # Nested view of the source information.
    result["source"] = {
        "type": card.source_type,
        "source_id": card.source_id,
        "created_from": extras.get("created_from", ""),
        "metadata": extras.get("metadata", {}),
    }
    return result

282 

def get_by_user(
    self, user_id: str, limit: int = 50, offset: int = 0
) -> List[Dict[str, Any]]:
    """Get the non-archived cards of a specific user.

    Since user_id is stored in the extra_data JSON, matching is done in
    Python on the dictionaries returned by ``self.list``. Cards are
    fetched in batches of ``limit * 3`` until ``offset + limit`` matches
    have been collected or no cards remain, so deeper pages and users
    with sparse cards are still served correctly. For better performance
    with large datasets, consider adding a proper user_id column.

    Args:
        user_id: The user whose cards are wanted.
        limit: Maximum number of cards to return; values <= 0 yield ``[]``.
        offset: Number of matching cards to skip; negative values are
            treated as 0.

    Returns:
        Up to ``limit`` card dictionaries, in the order ``self.list``
        yields them.
    """
    if limit <= 0:
        return []
    offset = max(offset, 0)
    wanted = offset + limit

    # Over-fetch, because only some rows belong to this user.
    batch_size = limit * 3
    user_cards: List[Dict[str, Any]] = []
    cursor = 0

    while len(user_cards) < wanted:
        batch = self.list(filters=None, limit=batch_size, offset=cursor)
        if not batch:
            break
        user_cards.extend(
            card
            for card in batch
            if card.get("user_id") == user_id
            and not card.get("is_archived", False)
        )
        if len(batch) < batch_size:
            # A short batch means storage has been exhausted.
            break
        cursor += batch_size

    # Apply pagination
    return user_cards[offset : offset + limit]

305 

def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]:
    """Get the latest version of a card.

    Note: The versioning system is not yet implemented at the database level.
    CardVersion is a Python dataclass for in-memory use, not a SQLAlchemy model.
    Version records are kept in the card's extra_data under the
    ``latest_version`` key, which is where ``add_version`` and
    ``update_latest_info`` write them. That key is not part of the
    dictionary produced by ``_card_to_dict``, so the row is read directly
    here. A ``latest_version`` entry inside ``extra_data["metadata"]`` is
    still honoured as a fallback.

    Args:
        card_id: ID of the card whose latest version is wanted.

    Returns:
        ``None`` if the card does not exist; the stored latest-version
        record if there is one; otherwise a synthetic "version 1" built
        from the card's current title and summary.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            return None
        # Copy out what is needed while the session is still open.
        extra_data: Dict[str, Any] = (
            dict(card.extra_data) if card.extra_data else {}
        )
        headline = card.title
        summary = card.summary

    latest = extra_data.get("latest_version")
    if not latest:
        # Previously consulted location; kept for backward compatibility.
        metadata = extra_data.get("metadata")
        if isinstance(metadata, dict):
            latest = metadata.get("latest_version")
    if latest:
        result: Dict[str, Any] = latest
        return result

    # Return card's current state as version 1
    return {
        "version_id": f"{card_id}_v1",
        "version_number": 1,
        "headline": headline,
        "summary": summary,
        "card_id": card_id,
    }

331 

def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str:
    """Add a new version to a card.

    Note: The versioning system stores version data in the card's extra_data
    field since CardVersion is not a database model. For full versioning
    support, a CardVersion SQLAlchemy model would need to be created.

    The new record is appended to ``extra_data["versions"]`` and also
    stored as ``extra_data["latest_version"]``. When the version carries
    a non-empty headline or summary, the card's title/summary are
    updated to match.

    Args:
        card_id: ID of the card receiving the version.
        version_data: Version fields; ``id`` is optional and generated
            with ``self.generate_id()`` when missing or falsy.

    Returns:
        The ID of the stored version.

    Raises:
        ValueError: If no card with ``card_id`` exists.
    """
    version_id = version_data.get("id") or self.generate_id()

    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            raise ValueError(f"Card {card_id} not found")

        # Work on copies: unless the column uses mutation tracking (not
        # visible from this module), mutating the loaded dict/list in
        # place and re-assigning the same object can go undetected at
        # flush time.
        extra_data = dict(card.extra_data) if card.extra_data else {}
        versions = [*(extra_data.get("versions") or [])]
        version_number = len(versions) + 1

        # Create version record
        version_record = {
            "id": version_id,
            "version_number": version_number,
            "search_query": version_data.get("search_query"),
            "headline": version_data.get("headline"),
            "summary": version_data.get("summary"),
            "findings": version_data.get("findings"),
            "sources": version_data.get("sources"),
            "impact_score": version_data.get("impact_score"),
            "topics": version_data.get("topics"),
            "entities": version_data.get("entities"),
            "created_at": datetime.now(timezone.utc).isoformat(),
        }

        versions.append(version_record)
        extra_data["versions"] = versions
        extra_data["latest_version"] = version_record

        # Update card fields with latest version info
        if version_data.get("headline"):
            card.title = version_data["headline"]
        if version_data.get("summary"):
            card.summary = version_data["summary"]

        card.extra_data = extra_data
        session.commit()

    logger.info(f"Added version {version_number} to card {card_id}")
    return version_id

381 

def update_latest_info(
    self, card_id: str, version_data: Dict[str, Any]
) -> bool:
    """Update the denormalized latest version info on the card.

    Copies a non-empty headline/summary onto the card's title/summary
    and replaces ``extra_data["latest_version"]`` with a compact record
    (id, headline, summary, impact_score) taken from ``version_data``.

    Args:
        card_id: ID of the card to update.
        version_data: Fields of the latest version.

    Returns:
        ``True`` if the card was found and committed, ``False`` if no
        card with that ID exists.
    """
    with self.session as session:
        card = session.query(NewsCard).filter_by(id=card_id).first()
        if not card:
            return False

        # Update card's display fields
        if version_data.get("headline"):
            card.title = version_data["headline"]
        if version_data.get("summary"):
            card.summary = version_data["summary"]

        # Work on a copy: unless the column uses mutation tracking (not
        # visible from this module), mutating the loaded dict in place and
        # re-assigning the same object can go undetected at flush time.
        extra_data = dict(card.extra_data) if card.extra_data else {}
        extra_data["latest_version"] = {
            "id": version_data.get("id"),
            "headline": version_data.get("headline"),
            "summary": version_data.get("summary"),
            "impact_score": version_data.get("impact_score"),
        }
        card.extra_data = extra_data

        session.commit()
        return True

412 

def archive_card(self, card_id: str) -> bool:
    """Mark a card as archived.

    Delegates to ``update`` and returns its result.
    """
    changes = {"is_archived": True}
    return self.update(card_id, changes)

416 

def pin_card(self, card_id: str, pinned: bool = True) -> bool:
    """Set a card's pinned state (pins by default).

    Delegates to ``update`` and returns its result.
    """
    changes = {"is_pinned": pinned}
    return self.update(card_id, changes)