Coverage for src / local_deep_research / news / core / card_storage.py: 16%

108 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2SQLAlchemy storage implementation for news cards. 

3""" 

4 

5from typing import List, Optional, Dict, Any 

6from sqlalchemy.orm import Session 

7from sqlalchemy import desc 

8from loguru import logger 

9 

10from .storage import CardStorage 

11from ...database.models.news import NewsCard 

12from .base_card import CardVersion 

13 

14 

15class SQLCardStorage(CardStorage): 

16 """SQLAlchemy implementation of card storage""" 

17 

18 def __init__(self, session: Session): 

19 """Initialize with a database session from the user's encrypted database""" 

20 if not session: 

21 raise ValueError("Session is required for SQLCardStorage") 

22 self._session = session 

23 

24 @property 

25 def session(self): 

26 """Get database session""" 

27 return self._session 

28 

29 def create(self, data: Dict[str, Any]) -> str: 

30 """Create a new card""" 

31 card_id = data.get("id") or self.generate_id() 

32 

33 # Extract source info if it's nested 

34 source_info = data.get("source", {}) 

35 if isinstance(source_info, dict): 

36 source_type = source_info.get("type") 

37 source_id = source_info.get("source_id") 

38 created_from = source_info.get("created_from") 

39 else: 

40 source_type = data.get("source_type") 

41 source_id = data.get("source_id") 

42 created_from = data.get("created_from") 

43 

44 with self.session as session: 

45 card = NewsCard( 

46 id=card_id, 

47 user_id=data["user_id"], 

48 topic=data["topic"], 

49 card_type=data.get("card_type", data.get("type", "news")), 

50 source_type=source_type, 

51 source_id=source_id, 

52 created_from=created_from, 

53 parent_card_id=data.get("parent_card_id"), 

54 ) 

55 

56 session.add(card) 

57 session.commit() 

58 

59 logger.info(f"Created card {card_id} for user {data['user_id']}") 

60 return card_id 

61 

62 def get(self, id: str) -> Optional[Dict[str, Any]]: 

63 """Get a card by ID""" 

64 with self.session as session: 

65 card = session.query(NewsCard).filter_by(id=id).first() 

66 return card.to_dict() if card else None 

67 

68 def update(self, id: str, data: Dict[str, Any]) -> bool: 

69 """Update a card""" 

70 with self.session as session: 

71 card = session.query(NewsCard).filter_by(id=id).first() 

72 if not card: 

73 return False 

74 

75 # Update allowed fields 

76 updateable_fields = ["is_archived", "is_pinned", "last_viewed"] 

77 for field in updateable_fields: 

78 if field in data: 

79 setattr(card, field, data[field]) 

80 

81 session.commit() 

82 return True 

83 

84 def delete(self, id: str) -> bool: 

85 """Delete a card (and all its versions due to cascade)""" 

86 with self.session as session: 

87 card = session.query(NewsCard).filter_by(id=id).first() 

88 if not card: 

89 return False 

90 

91 session.delete(card) 

92 session.commit() 

93 return True 

94 

95 def list( 

96 self, 

97 filters: Optional[Dict[str, Any]] = None, 

98 limit: int = 100, 

99 offset: int = 0, 

100 ) -> List[Dict[str, Any]]: 

101 """List cards with optional filtering""" 

102 with self.session as session: 

103 query = session.query(NewsCard) 

104 

105 if filters: 

106 if "user_id" in filters: 

107 query = query.filter_by(user_id=filters["user_id"]) 

108 if "card_type" in filters: 

109 query = query.filter_by(card_type=filters["card_type"]) 

110 if "is_archived" in filters: 

111 query = query.filter_by(is_archived=filters["is_archived"]) 

112 if "is_pinned" in filters: 

113 query = query.filter_by(is_pinned=filters["is_pinned"]) 

114 

115 # Order by creation date (newest first) 

116 query = query.order_by(desc(NewsCard.created_at)) 

117 

118 cards = query.limit(limit).offset(offset).all() 

119 return [card.to_dict() for card in cards] 

120 

121 def get_by_user( 

122 self, user_id: str, limit: int = 50, offset: int = 0 

123 ) -> List[Dict[str, Any]]: 

124 """Get cards for a specific user""" 

125 filters = {"user_id": user_id, "is_archived": False} 

126 return self.list(filters, limit, offset) 

127 

128 def get_latest_version(self, card_id: str) -> Optional[Dict[str, Any]]: 

129 """Get the latest version of a card""" 

130 with self.session as session: 

131 version = ( 

132 session.query(CardVersion) 

133 .filter_by(card_id=card_id) 

134 .order_by(desc(CardVersion.version_number)) 

135 .first() 

136 ) 

137 

138 return version.to_dict() if version else None 

139 

140 def add_version(self, card_id: str, version_data: Dict[str, Any]) -> str: 

141 """Add a new version to a card""" 

142 version_id = version_data.get("id") or self.generate_id() 

143 

144 with self.session as session: 

145 # Get the card and current version count 

146 card = session.query(NewsCard).filter_by(id=card_id).first() 

147 if not card: 

148 raise ValueError(f"Card {card_id} not found") 

149 

150 # Get next version number 

151 current_max = ( 

152 session.query(CardVersion).filter_by(card_id=card_id).count() 

153 ) 

154 version_number = current_max + 1 

155 

156 # Create new version 

157 version = CardVersion( 

158 id=version_id, 

159 card_id=card_id, 

160 version_number=version_number, 

161 search_query=version_data.get("search_query"), 

162 research_result=version_data.get("research_result"), 

163 headline=version_data.get("headline"), 

164 summary=version_data.get("summary"), 

165 findings=version_data.get("findings"), 

166 sources=version_data.get("sources"), 

167 impact_score=version_data.get("impact_score"), 

168 topics=version_data.get("topics"), 

169 entities=version_data.get("entities"), 

170 embedding=version_data.get("embedding"), 

171 ) 

172 

173 session.add(version) 

174 

175 # Update card's latest info 

176 card.latest_version_id = version_id 

177 card.latest_headline = version_data.get("headline") 

178 card.latest_summary = version_data.get("summary") 

179 card.latest_impact_score = version_data.get("impact_score") 

180 

181 session.commit() 

182 

183 logger.info(f"Added version {version_number} to card {card_id}") 

184 return version_id 

185 

186 def update_latest_info( 

187 self, card_id: str, version_data: Dict[str, Any] 

188 ) -> bool: 

189 """Update the denormalized latest version info on the card""" 

190 with self.session as session: 

191 card = session.query(NewsCard).filter_by(id=card_id).first() 

192 if not card: 

193 return False 

194 

195 card.latest_version_id = version_data.get("id") 

196 card.latest_headline = version_data.get("headline") 

197 card.latest_summary = version_data.get("summary") 

198 card.latest_impact_score = version_data.get("impact_score") 

199 

200 session.commit() 

201 return True 

202 

203 def archive_card(self, card_id: str) -> bool: 

204 """Archive a card""" 

205 return self.update(card_id, {"is_archived": True}) 

206 

207 def pin_card(self, card_id: str, pinned: bool = True) -> bool: 

208 """Pin or unpin a card""" 

209 return self.update(card_id, {"is_pinned": pinned})