Coverage for src / local_deep_research / news / rating_system / storage.py: 97%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2SQLAlchemy storage implementation for ratings. 

3""" 

4 

5from typing import List, Optional, Dict, Any 

6from sqlalchemy.orm import Session 

7from sqlalchemy import desc 

8 

9from ..core.storage import RatingStorage 

10from ...database.models.news import RatingType, UserRating 

11 

12 

class SQLRatingStorage(RatingStorage):
    """SQLAlchemy implementation of rating storage.

    Methods that touch the database enter ``self.session`` as a context
    manager and call ``commit()`` themselves after a write.
    """

    def __init__(self, session: Session):
        """Initialize with a database session from the user's encrypted database.

        Args:
            session: SQLAlchemy session used for every query.

        Raises:
            ValueError: If ``session`` is falsy (for example ``None``).
        """
        if not session:
            raise ValueError("Session is required for SQLRatingStorage")
        self._session = session

    @property
    def session(self):
        """Get database session"""
        return self._session

    @staticmethod
    def _rating_to_dict(rating) -> Dict[str, Any]:
        """Convert a ``UserRating`` row into the dict returned by get()/list()."""
        return {
            "id": rating.id,
            "user_id": rating.user_id,
            "item_id": rating.item_id,
            "item_type": rating.item_type,
            "relevance_vote": rating.relevance_vote,
            "quality_rating": rating.quality_rating,
            "created_at": rating.created_at,
            "updated_at": rating.updated_at,
        }

    def create(self, data: Dict[str, Any]) -> str:
        """Create a new rating.

        Args:
            data: Rating payload. ``user_id`` and ``item_id`` are required;
                ``item_type`` defaults to ``"card"``; ``rating_value`` and
                ``quality_rating`` are optional. Any other key (for example
                the ``rating_type`` sent by ``upsert_rating``) is ignored.

        Returns:
            The new row's id as a string.

        Raises:
            KeyError: If ``user_id`` or ``item_id`` is missing from ``data``.
        """
        with self.session as session:
            rating = UserRating(
                user_id=data["user_id"],
                item_id=data["item_id"],
                item_type=data.get("item_type", "card"),
                # Map rating_value to relevance_vote
                relevance_vote=data.get("rating_value"),
                quality_rating=data.get("quality_rating"),
            )

            session.add(rating)
            session.commit()

            return str(rating.id)

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a rating by ID.

        Args:
            id: Row id; converted with ``int()`` before querying.

        Returns:
            The rating as a dict, or ``None`` when no row matches.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return None
            return self._rating_to_dict(rating)

    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update a rating.

        Args:
            id: Row id; converted with ``int()`` before querying.
            data: Fields to change. Recognised keys are ``rating_value``,
                ``quality_rating`` and ``comment``; other keys are ignored.

        Returns:
            ``True`` when the row was found and committed, ``False`` when no
            row matches ``id``.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return False

            # create() stores "rating_value" in relevance_vote and get()/list()
            # read it back from there, so an update has to write the same
            # attribute for the new value to be visible afterwards.
            if "rating_value" in data:
                rating.relevance_vote = data["rating_value"]
            if "quality_rating" in data:
                rating.quality_rating = data["quality_rating"]
            # NOTE(review): no other method here reads or writes ``comment``;
            # confirm that UserRating actually defines that column.
            if "comment" in data:
                rating.comment = data["comment"]

            session.commit()
            return True

    def delete(self, id: str) -> bool:
        """Delete a rating.

        Args:
            id: Row id; converted with ``int()`` before querying.

        Returns:
            ``True`` when the row was deleted, ``False`` when no row matches.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return False

            session.delete(rating)
            session.commit()
            return True

    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List ratings with optional filtering.

        Args:
            filters: Optional equality filters. Recognised keys are
                ``user_id``, ``item_id``, ``item_type`` and ``card_id``;
                every other key is ignored.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            Rating dicts ordered by ``created_at`` descending.
        """
        # (filter key, UserRating attribute). "card_id" is kept for backward
        # compatibility and is matched against item_id.
        # NOTE(review): get_user_ratings() passes a "rating_type" key that is
        # not handled here, so that filter currently has no effect.
        filter_columns = (
            ("user_id", "user_id"),
            ("item_id", "item_id"),
            ("item_type", "item_type"),
            ("card_id", "item_id"),
        )

        with self.session as session:
            query = session.query(UserRating)

            if filters:
                for key, column in filter_columns:
                    if key in filters:
                        query = query.filter_by(**{column: filters[key]})

            ratings = (
                query.order_by(desc(UserRating.created_at))
                .limit(limit)
                .offset(offset)
                .all()
            )
            return [self._rating_to_dict(rating) for rating in ratings]

    def get_user_rating(
        self, user_id: str, item_id: str, rating_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get a user's rating for a specific item.

        Args:
            user_id: Owner of the rating.
            item_id: Compared against both ``card_id`` and ``news_item_id``.
            rating_type: Passed to ``RatingType(...)`` before filtering.

        Returns:
            ``rating.to_dict()`` for the first match, or ``None``.
        """
        # NOTE(review): this query relies on rating_type, card_id,
        # news_item_id and to_dict(), none of which are used by
        # create()/get()/list(); confirm they exist on UserRating.
        with self.session as session:
            query = session.query(UserRating).filter_by(
                user_id=user_id, rating_type=RatingType(rating_type)
            )

            # Check both card and news item IDs
            rating = query.filter(
                (UserRating.card_id == item_id)
                | (UserRating.news_item_id == item_id)
            ).first()

            return rating.to_dict() if rating else None

    def upsert_rating(
        self,
        user_id: str,
        item_id: str,
        rating_type: str,
        rating_value: str,
        item_type: str = "card",
    ) -> str:
        """Create or update a rating.

        Looks up an existing rating with ``get_user_rating``; updates it when
        found, otherwise creates a new one.

        Returns:
            The id of the updated or newly created rating, as a string.
        """
        existing = self.get_user_rating(user_id, item_id, rating_type)

        if existing:
            # Update existing rating
            self.update(str(existing["id"]), {"rating_value": rating_value})
            return str(existing["id"])

        # Create new rating
        data = {
            "user_id": user_id,
            "item_id": item_id,
            "item_type": item_type,
            "rating_type": rating_type,
            "rating_value": rating_value,
        }
        return self.create(data)

    def get_ratings_summary(
        self, item_id: str, item_type: str = "card"
    ) -> Dict[str, Any]:
        """Get aggregated ratings for an item.

        Args:
            item_id: Item whose ratings are aggregated.
            item_type: ``"card"`` filters on ``card_id``; any other value
                filters on ``news_item_id``.

        Returns:
            A dict with ``item_id``, ``item_type``, a ``quality`` section
            (``count``, ``average``, ``distribution``) and a ``relevance``
            section (``up_votes``, ``down_votes``, ``net_score``). ``average``
            is ``0`` when there are no numeric quality ratings.
        """
        # NOTE(review): card_id, news_item_id, rating_type and rating_value
        # are not used by create()/get()/list(); confirm against UserRating.
        id_column = "card_id" if item_type == "card" else "news_item_id"

        with self.session as session:
            base_query = session.query(UserRating).filter_by(
                **{id_column: item_id}
            )

            # Get quality ratings; skip empty values so a row without a
            # rating_value cannot break the whole summary.
            quality_ratings = base_query.filter_by(
                rating_type=RatingType.QUALITY
            ).all()
            quality_values = [
                int(r.rating_value)
                for r in quality_ratings
                if r.rating_value and r.rating_value.isdigit()
            ]

            # Get relevance ratings
            relevance_ratings = base_query.filter_by(
                rating_type=RatingType.RELEVANCE
            ).all()
            up_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "up"
            )
            down_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "down"
            )

            return {
                "item_id": item_id,
                "item_type": item_type,
                "quality": {
                    "count": len(quality_values),
                    "average": sum(quality_values) / len(quality_values)
                    if quality_values
                    else 0,
                    "distribution": self._get_rating_distribution(
                        quality_values
                    ),
                },
                "relevance": {
                    "up_votes": up_votes,
                    "down_votes": down_votes,
                    "net_score": up_votes - down_votes,
                },
            }

    def get_user_ratings(
        self, user_id: str, rating_type: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get all ratings by a user.

        Builds a filter dict and delegates to ``list``.

        Args:
            user_id: Owner of the ratings.
            rating_type: Added to the filters as ``rating_type`` when truthy.
            limit: Maximum number of rows to return.
        """
        filters: Dict[str, Any] = {"user_id": user_id}
        if rating_type:
            filters["rating_type"] = rating_type

        return self.list(filters, limit)

    def _get_rating_distribution(self, ratings: List[int]) -> Dict[int, int]:
        """Get distribution of ratings (1-5).

        Values outside 1-5 are not counted.
        """
        distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
        for rating in ratings:
            if 1 <= rating <= 5:
                distribution[rating] += 1
        return distribution

242 if 1 <= rating <= 5: 

243 distribution[rating] += 1 

244 return distribution