Coverage for src / local_deep_research / news / core / card_storage.py: 95%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2SQLAlchemy storage implementation for news cards. 

3 

4Note: This module aligns with the NewsCard SQLAlchemy model in database/models/news.py. 

5The NewsCard model has these relevant fields: 

6- id, title, summary, content, url 

7- source_name, source_type, source_id 

8- category, tags, card_type 

9- published_at, discovered_at 

10- is_read, read_at, is_saved, saved_at 

11- extra_data, subscription_id 

12""" 

13 

14from datetime import datetime, timedelta, timezone 

15from typing import List, Optional, Dict, Any 

16from sqlalchemy.orm import Session 

17from sqlalchemy import desc 

18from loguru import logger 

19 

20from .storage import CardStorage 

21from ...database.models.news import NewsCard 

22 

23 

class SQLCardStorage(CardStorage):
    """SQLAlchemy implementation of card storage.

    Maps between the card system's data model and the NewsCard database model.
    Card-system fields without a dedicated NewsCard column (user_id,
    parent_card_id, created_from, metadata, interaction, is_archived and the
    version records) are stored in the ``extra_data`` JSON field.

    Whenever ``extra_data`` is modified, a *new* dict is assigned to the
    attribute instead of mutating the loaded one in place, so the change is
    detected even if the column is a plain JSON type without mutation
    tracking.
    """

    def __init__(self, session: Session):
        """Initialize with a database session from the user's encrypted database.

        Raises:
            ValueError: If ``session`` is missing (falsy).
        """
        if not session:
            raise ValueError("Session is required for SQLCardStorage")
        self._session = session

    @property
    def session(self):
        """Get database session"""
        return self._session

    def create(self, data: Dict[str, Any]) -> str:
        """Create a new card.

        Maps card system fields to NewsCard model:
        - topic → title
        - user_id, parent_card_id, created_from → stored in extra_data

        Args:
            data: Card fields. ``source`` may be a nested dict with ``type``,
                ``source_id`` and ``created_from``; otherwise the flat
                ``source_type`` / ``source_id`` / ``created_from`` keys are
                used.

        Returns:
            The ID of the created card (``data["id"]`` or a generated one).
        """
        card_id = data.get("id") or self.generate_id()

        # Extract source info if it's nested
        source_info = data.get("source", {})
        if isinstance(source_info, dict):
            source_type = source_info.get("type")
            source_id = source_info.get("source_id")
            created_from = source_info.get("created_from")
        else:
            source_type = data.get("source_type")
            source_id = data.get("source_id")
            created_from = data.get("created_from")

        # Map card_type enum properly
        card_type_str = data.get("card_type", data.get("type", "news"))

        # Store extended fields in extra_data. Work on a copy so the dict
        # passed in by the caller is never mutated.
        extra_data = dict(data.get("extra_data") or {})
        extra_data.update(
            {
                "user_id": data.get("user_id"),
                "parent_card_id": data.get("parent_card_id"),
                "created_from": created_from,
                "metadata": data.get("metadata", {}),
                "interaction": data.get("interaction", {}),
            }
        )

        with self.session as session:
            card = NewsCard(
                id=card_id,
                title=data.get("topic", data.get("title", "Untitled")),
                summary=data.get("summary"),
                content=data.get("content"),
                url=data.get("url", data.get("source_url")),
                source_name=data.get("source_name"),
                source_type=source_type,
                source_id=source_id,
                category=data.get("category"),
                tags=data.get("tags"),
                card_type=card_type_str,
                extra_data=extra_data,
            )

            session.add(card)
            session.commit()

            user_id = data.get("user_id", "unknown")
            logger.info(f"Created card {card_id} for user {user_id}")
            return card_id

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a card by ID.

        Returns:
            The card as a dictionary, or None if no card has this ID.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return None
            return self._card_to_dict(card)

    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update a card.

        Maps card system fields to NewsCard model:
        - is_archived → stored in extra_data
        - is_pinned → is_saved
        - last_viewed → read_at (and sets is_read=True)

        Returns:
            True if the card was found and updated, False if it does not
            exist.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return False

            # Map is_pinned to is_saved
            if "is_pinned" in data:
                card.is_saved = data["is_pinned"]
                if data["is_pinned"]:
                    card.saved_at = datetime.now(timezone.utc)

            # Map last_viewed to read_at
            if "last_viewed" in data:
                card.is_read = True
                card.read_at = data["last_viewed"]

            # Store is_archived and other custom fields in extra_data.
            # Copy first: re-assigning the same mutated object would compare
            # equal to the loaded value and the change could go unnoticed.
            extra_data = dict(card.extra_data or {})
            if "is_archived" in data:
                extra_data["is_archived"] = data["is_archived"]
            if "interaction" in data:
                extra_data["interaction"] = data["interaction"]
            card.extra_data = extra_data

            session.commit()
            return True

    def delete(self, id: str) -> bool:
        """Delete a card.

        Returns:
            True if the card was found and deleted, False if it does not
            exist.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=id).first()
            if not card:
                return False

            session.delete(card)
            session.commit()
            return True

    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List cards with optional filtering, newest first.

        Supported filters:
        - card_type: Filter by card type (a single value or a list of values)
        - is_pinned: Filter by pinned/saved status
        - category: Filter by category

        ``user_id`` and ``is_archived`` live in extra_data and are NOT
        applied here; use :meth:`get_by_user` for those.
        """
        with self.session as session:
            query = session.query(NewsCard)

            if filters:
                if "card_type" in filters:
                    card_type_val = filters["card_type"]
                    # Handle both string and list of types
                    if isinstance(card_type_val, list):
                        query = query.filter(
                            NewsCard.card_type.in_(card_type_val)
                        )
                    else:
                        query = query.filter_by(card_type=card_type_val)
                if "is_pinned" in filters:
                    query = query.filter_by(is_saved=filters["is_pinned"])
                if "category" in filters:
                    query = query.filter_by(category=filters["category"])
                # Note: user_id and is_archived filtering would require
                # JSON querying which varies by database backend

            # Order by discovery date (newest first)
            query = query.order_by(desc(NewsCard.discovered_at))

            cards = query.limit(limit).offset(offset).all()
            return [self._card_to_dict(card) for card in cards]

    def get_recent(
        self,
        hours: int = 24,
        card_types: Optional[List[str]] = None,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Get recent cards within the specified time window.

        Args:
            hours: How many hours back to look (default 24)
            card_types: Optional list of card types to filter
            limit: Maximum number of cards to return

        Returns:
            List of card dictionaries, newest first
        """
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)

        with self.session as session:
            query = session.query(NewsCard).filter(
                NewsCard.discovered_at >= cutoff
            )

            if card_types:
                query = query.filter(NewsCard.card_type.in_(card_types))

            query = query.order_by(desc(NewsCard.discovered_at))
            cards = query.limit(limit).all()

            return [self._card_to_dict(card) for card in cards]

    def _card_to_dict(self, card: NewsCard) -> Dict[str, Any]:
        """Convert a NewsCard model to the dictionary format expected by the card system.

        Maps NewsCard model fields back to card system format:
        - title → topic
        - is_saved → is_pinned
        - extra_data fields → top-level fields

        Datetime columns are rendered as ISO 8601 strings (None when unset).
        """
        extra_data = card.extra_data or {}

        return {
            "id": card.id,
            "topic": card.title,  # Map title back to topic
            "title": card.title,
            "summary": card.summary,
            "content": card.content,
            "url": card.url,
            "source_name": card.source_name,
            "source_type": card.source_type,
            "source_id": card.source_id,
            "category": card.category,
            "tags": card.tags,
            "card_type": card.card_type,
            "published_at": card.published_at.isoformat()
            if card.published_at
            else None,
            "discovered_at": card.discovered_at.isoformat()
            if card.discovered_at
            else None,
            "created_at": card.discovered_at.isoformat()
            if card.discovered_at
            else None,  # Alias for compatibility
            "updated_at": card.discovered_at.isoformat()
            if card.discovered_at
            else None,  # Best approximation
            "is_read": card.is_read,
            "read_at": card.read_at.isoformat() if card.read_at else None,
            "is_saved": card.is_saved,
            "is_pinned": card.is_saved,  # Alias for compatibility
            "saved_at": card.saved_at.isoformat() if card.saved_at else None,
            # Fields from extra_data
            "user_id": extra_data.get("user_id"),
            "parent_card_id": extra_data.get("parent_card_id"),
            "created_from": extra_data.get("created_from"),
            "is_archived": extra_data.get("is_archived", False),
            "metadata": extra_data.get("metadata", {}),
            "interaction": extra_data.get("interaction", {}),
            "source": {
                "type": card.source_type,
                "source_id": card.source_id,
                "created_from": extra_data.get("created_from", ""),
                "metadata": extra_data.get("metadata", {}),
            },
        }

    def get_by_user(
        self, user_id: str, limit: int = 50, offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Get non-archived cards for a specific user, newest first.

        Note: Since user_id is stored in extra_data JSON, this does a
        post-filter in Python. Cards are read in batches of ``limit * 3``
        until enough matching cards have been collected to serve the
        requested page, or the table is exhausted. For better performance
        with large datasets, consider adding a proper user_id column.

        Args:
            user_id: Owner to match against the card's stored user_id
            limit: Maximum number of cards to return
            offset: Number of matching cards to skip

        Returns:
            List of card dictionaries for the requested page
        """
        wanted = offset + limit
        # Over-fetch per batch to account for rows dropped by the filter
        batch_size = limit * 3
        batch_offset = 0
        user_cards = []

        while True:
            batch = self.list(
                filters=None, limit=batch_size, offset=batch_offset
            )

            # Filter by user_id from extra_data
            user_cards.extend(
                card
                for card in batch
                if card.get("user_id") == user_id
                and not card.get("is_archived", False)
            )

            # Stop once the page can be served or no more rows exist.
            # A non-positive batch size keeps the single-query behaviour.
            if (
                len(user_cards) >= wanted
                or batch_size <= 0
                or len(batch) < batch_size
            ):
                break
            batch_offset += batch_size

        # Apply pagination
        return user_cards[offset : offset + limit]

    def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]:
        """Get the latest version of a card.

        Note: The versioning system is not yet implemented at the database level.
        CardVersion is a Python dataclass for in-memory use, not a SQLAlchemy model.
        This method returns the ``latest_version`` record that
        :meth:`add_version` / :meth:`update_latest_info` store in extra_data.
        A ``latest_version`` entry nested under ``metadata`` is still honoured
        as a fallback. If neither exists, the card's current state is
        reported as version 1.

        Returns:
            The version dictionary, or None if the card does not exist.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                return None

            extra_data = card.extra_data or {}

            # Version info is written at the top level of extra_data
            if "latest_version" in extra_data:
                return extra_data["latest_version"]

            # Backward-compatible lookup inside the metadata sub-dict
            metadata = extra_data.get("metadata") or {}
            if "latest_version" in metadata:
                return metadata["latest_version"]

            # Return card's current state as version 1
            return {
                "version_id": f"{card_id}_v1",
                "version_number": 1,
                "headline": card.title,
                "summary": card.summary,
                "card_id": card_id,
            }

    def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str:
        """Add a new version to a card.

        Note: The versioning system stores version data in the card's extra_data
        field since CardVersion is not a database model. For full versioning
        support, a CardVersion SQLAlchemy model would need to be created.

        Returns:
            The ID of the new version (``version_data["id"]`` or a generated
            one).

        Raises:
            ValueError: If the card does not exist.
        """
        version_id = version_data.get("id") or self.generate_id()

        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                raise ValueError(f"Card {card_id} not found")

            # Get current version count from extra_data. Copy both the dict
            # and the version list so the loaded value is left untouched and
            # the re-assignment below is seen as a real change.
            extra_data = dict(card.extra_data or {})
            versions = [*extra_data.get("versions", [])]
            version_number = len(versions) + 1

            # Create version record
            version_record = {
                "id": version_id,
                "version_number": version_number,
                "search_query": version_data.get("search_query"),
                "headline": version_data.get("headline"),
                "summary": version_data.get("summary"),
                "findings": version_data.get("findings"),
                "sources": version_data.get("sources"),
                "impact_score": version_data.get("impact_score"),
                "topics": version_data.get("topics"),
                "entities": version_data.get("entities"),
                "created_at": datetime.now(timezone.utc).isoformat(),
            }

            versions.append(version_record)
            extra_data["versions"] = versions
            extra_data["latest_version"] = version_record

            # Update card fields with latest version info
            if version_data.get("headline"):
                card.title = version_data["headline"]
            if version_data.get("summary"):
                card.summary = version_data["summary"]

            card.extra_data = extra_data
            session.commit()

            logger.info(f"Added version {version_number} to card {card_id}")
            return version_id

    def update_latest_info(
        self, card_id: str, version_data: Dict[str, Any]
    ) -> bool:
        """Update the denormalized latest version info on the card.

        Updates the card's main fields with the latest version data.

        Returns:
            True if the card was found and updated, False if it does not
            exist.
        """
        with self.session as session:
            card = session.query(NewsCard).filter_by(id=card_id).first()
            if not card:
                return False

            # Update card's display fields
            if version_data.get("headline"):
                card.title = version_data["headline"]
            if version_data.get("summary"):
                card.summary = version_data["summary"]

            # Store version metadata in extra_data (on a copy, so the
            # assignment differs from the loaded value)
            extra_data = dict(card.extra_data or {})
            extra_data["latest_version"] = {
                "id": version_data.get("id"),
                "headline": version_data.get("headline"),
                "summary": version_data.get("summary"),
                "impact_score": version_data.get("impact_score"),
            }
            card.extra_data = extra_data

            session.commit()
            return True

    def archive_card(self, card_id: str) -> bool:
        """Archive a card"""
        return self.update(card_id, {"is_archived": True})

    def pin_card(self, card_id: str, pinned: bool = True) -> bool:
        """Pin or unpin a card"""
        return self.update(card_id, {"is_pinned": pinned})