Coverage for src / local_deep_research / news / rating_system / storage.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2SQLAlchemy storage implementation for ratings. 

3""" 

4 

5from typing import List, Optional, Dict, Any 

6from sqlalchemy.orm import Session 

7from sqlalchemy import desc 

8 

9from ..core.storage import RatingStorage 

10from ...database.models.news import RatingType, UserRating 

11 

12 

13class SQLRatingStorage(RatingStorage): 

14 """SQLAlchemy implementation of rating storage""" 

15 

16 def __init__(self, session: Session): 

17 """Initialize with a database session from the user's encrypted database""" 

18 if not session: 

19 raise ValueError("Session is required for SQLRatingStorage") 

20 self._session = session 

21 

22 @property 

23 def session(self): 

24 """Get database session""" 

25 return self._session 

26 

27 def create(self, data: Dict[str, Any]) -> str: 

28 """Create a new rating""" 

29 with self.session as session: 

30 rating = UserRating( 

31 user_id=data["user_id"], 

32 item_id=data["item_id"], 

33 item_type=data.get("item_type", "card"), 

34 relevance_vote=data.get( 

35 "rating_value" 

36 ), # Map rating_value to relevance_vote 

37 quality_rating=data.get("quality_rating"), 

38 ) 

39 

40 session.add(rating) 

41 session.commit() 

42 

43 return str(rating.id) 

44 

45 def get(self, id: str) -> Optional[Dict[str, Any]]: 

46 """Get a rating by ID""" 

47 with self.session as session: 

48 rating = session.query(UserRating).filter_by(id=int(id)).first() 

49 if not rating: 

50 return None 

51 return { 

52 "id": rating.id, 

53 "user_id": rating.user_id, 

54 "item_id": rating.item_id, 

55 "item_type": rating.item_type, 

56 "relevance_vote": rating.relevance_vote, 

57 "quality_rating": rating.quality_rating, 

58 "created_at": rating.created_at, 

59 "updated_at": rating.updated_at, 

60 } 

61 

62 def update(self, id: str, data: Dict[str, Any]) -> bool: 

63 """Update a rating""" 

64 with self.session as session: 

65 rating = session.query(UserRating).filter_by(id=int(id)).first() 

66 if not rating: 

67 return False 

68 

69 # Update allowed fields 

70 if "rating_value" in data: 

71 rating.rating_value = data["rating_value"] 

72 if "comment" in data: 

73 rating.comment = data["comment"] 

74 

75 session.commit() 

76 return True 

77 

78 def delete(self, id: str) -> bool: 

79 """Delete a rating""" 

80 with self.session as session: 

81 rating = session.query(UserRating).filter_by(id=int(id)).first() 

82 if not rating: 

83 return False 

84 

85 session.delete(rating) 

86 session.commit() 

87 return True 

88 

89 def list( 

90 self, 

91 filters: Optional[Dict[str, Any]] = None, 

92 limit: int = 100, 

93 offset: int = 0, 

94 ) -> List[Dict[str, Any]]: 

95 """List ratings with optional filtering""" 

96 with self.session as session: 

97 query = session.query(UserRating) 

98 

99 if filters: 

100 if "user_id" in filters: 

101 query = query.filter_by(user_id=filters["user_id"]) 

102 if "item_id" in filters: 

103 query = query.filter_by(item_id=filters["item_id"]) 

104 if "item_type" in filters: 

105 query = query.filter_by(item_type=filters["item_type"]) 

106 # For backward compatibility, also check card_id 

107 if "card_id" in filters: 

108 query = query.filter_by(item_id=filters["card_id"]) 

109 

110 ratings = ( 

111 query.order_by(desc(UserRating.created_at)) 

112 .limit(limit) 

113 .offset(offset) 

114 .all() 

115 ) 

116 return [ 

117 { 

118 "id": rating.id, 

119 "user_id": rating.user_id, 

120 "item_id": rating.item_id, 

121 "item_type": rating.item_type, 

122 "relevance_vote": rating.relevance_vote, 

123 "quality_rating": rating.quality_rating, 

124 "created_at": rating.created_at, 

125 "updated_at": rating.updated_at, 

126 } 

127 for rating in ratings 

128 ] 

129 

130 def get_user_rating( 

131 self, user_id: str, item_id: str, rating_type: str 

132 ) -> Optional[Dict[str, Any]]: 

133 """Get a user's rating for a specific item""" 

134 with self.session as session: 

135 query = session.query(UserRating).filter_by( 

136 user_id=user_id, rating_type=RatingType(rating_type) 

137 ) 

138 

139 # Check both card and news item IDs 

140 rating = query.filter( 

141 (UserRating.card_id == item_id) 

142 | (UserRating.news_item_id == item_id) 

143 ).first() 

144 

145 return rating.to_dict() if rating else None 

146 

147 def upsert_rating( 

148 self, 

149 user_id: str, 

150 item_id: str, 

151 rating_type: str, 

152 rating_value: str, 

153 item_type: str = "card", 

154 ) -> str: 

155 """Create or update a rating""" 

156 # Check if rating exists 

157 existing = self.get_user_rating(user_id, item_id, rating_type) 

158 

159 if existing: 

160 # Update existing rating 

161 self.update(str(existing["id"]), {"rating_value": rating_value}) 

162 return str(existing["id"]) 

163 else: 

164 # Create new rating 

165 data = { 

166 "user_id": user_id, 

167 "item_id": item_id, 

168 "item_type": item_type, 

169 "rating_type": rating_type, 

170 "rating_value": rating_value, 

171 } 

172 return self.create(data) 

173 

174 def get_ratings_summary( 

175 self, item_id: str, item_type: str = "card" 

176 ) -> Dict[str, Any]: 

177 """Get aggregated ratings for an item""" 

178 with self.session as session: 

179 # Build query based on item type 

180 if item_type == "card": 

181 base_query = session.query(UserRating).filter_by( 

182 card_id=item_id 

183 ) 

184 else: 

185 base_query = session.query(UserRating).filter_by( 

186 news_item_id=item_id 

187 ) 

188 

189 # Get quality ratings 

190 quality_ratings = base_query.filter_by( 

191 rating_type=RatingType.QUALITY 

192 ).all() 

193 quality_values = [ 

194 int(r.rating_value) 

195 for r in quality_ratings 

196 if r.rating_value.isdigit() 

197 ] 

198 

199 # Get relevance ratings 

200 relevance_ratings = base_query.filter_by( 

201 rating_type=RatingType.RELEVANCE 

202 ).all() 

203 up_votes = sum( 

204 1 for r in relevance_ratings if r.rating_value == "up" 

205 ) 

206 down_votes = sum( 

207 1 for r in relevance_ratings if r.rating_value == "down" 

208 ) 

209 

210 return { 

211 "item_id": item_id, 

212 "item_type": item_type, 

213 "quality": { 

214 "count": len(quality_values), 

215 "average": sum(quality_values) / len(quality_values) 

216 if quality_values 

217 else 0, 

218 "distribution": self._get_rating_distribution( 

219 quality_values 

220 ), 

221 }, 

222 "relevance": { 

223 "up_votes": up_votes, 

224 "down_votes": down_votes, 

225 "net_score": up_votes - down_votes, 

226 }, 

227 } 

228 

229 def get_user_ratings( 

230 self, user_id: str, rating_type: Optional[str] = None, limit: int = 100 

231 ) -> List[Dict[str, Any]]: 

232 """Get all ratings by a user""" 

233 filters = {"user_id": user_id} 

234 if rating_type: 

235 filters["rating_type"] = rating_type 

236 

237 return self.list(filters, limit) 

238 

239 def _get_rating_distribution(self, ratings: List[int]) -> Dict[int, int]: 

240 """Get distribution of ratings (1-5)""" 

241 distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} 

242 for rating in ratings: 

243 if 1 <= rating <= 5: 

244 distribution[rating] += 1 

245 return distribution