Coverage for src / local_deep_research / news / rating_system / storage.py: 97%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2SQLAlchemy storage implementation for ratings.
3"""
5from typing import List, Optional, Dict, Any
6from sqlalchemy.orm import Session
7from sqlalchemy import desc
9from ..core.storage import RatingStorage
10from ...database.models.news import RatingType, UserRating
class SQLRatingStorage(RatingStorage):
    """SQLAlchemy-backed implementation of ``RatingStorage``.

    Every public method opens ``with self.session as session:`` around its
    database work and commits inside that block when it writes.

    NOTE(review): the methods of this class address ``UserRating`` through
    two different attribute sets. ``create``/``get``/``list`` use ``item_id``,
    ``item_type``, ``relevance_vote`` and ``quality_rating``; ``update``,
    ``get_user_rating`` and ``get_ratings_summary`` use ``card_id``,
    ``news_item_id``, ``rating_type``, ``rating_value``, ``comment`` and
    ``to_dict()``. The model is not visible from this file, so the attribute
    names are left exactly as they were -- confirm against the model which
    set is current and align the other methods.

    NOTE(review): if ``session`` is a plain SQLAlchemy ``Session``, leaving
    the ``with`` block closes it; confirm that is intended for a session
    owned by the caller.
    """

    def __init__(self, session: Session):
        """Store the database session used by all operations.

        Args:
            session: Database session (from the user's encrypted database).

        Raises:
            ValueError: If ``session`` is falsy.
        """
        if not session:
            raise ValueError("Session is required for SQLRatingStorage")
        self._session = session

    @property
    def session(self):
        """Return the database session passed to the constructor."""
        return self._session

    @staticmethod
    def _rating_to_dict(rating: UserRating) -> Dict[str, Any]:
        """Serialize one rating row into the dict shape used by get/list."""
        return {
            "id": rating.id,
            "user_id": rating.user_id,
            "item_id": rating.item_id,
            "item_type": rating.item_type,
            "relevance_vote": rating.relevance_vote,
            "quality_rating": rating.quality_rating,
            "created_at": rating.created_at,
            "updated_at": rating.updated_at,
        }

    def create(self, data: Dict[str, Any]) -> str:
        """Create a new rating and return its id as a string.

        Args:
            data: Must contain ``user_id`` and ``item_id``. Optional keys:
                ``item_type`` (defaults to ``"card"``), ``rating_value``
                (stored as ``relevance_vote``) and ``quality_rating``.
                Any other key (e.g. ``rating_type``) is ignored.

        Raises:
            KeyError: If ``user_id`` or ``item_id`` is missing.
        """
        with self.session as session:
            rating = UserRating(
                user_id=data["user_id"],
                item_id=data["item_id"],
                item_type=data.get("item_type", "card"),
                # Map rating_value to relevance_vote
                relevance_vote=data.get("rating_value"),
                quality_rating=data.get("quality_rating"),
            )

            session.add(rating)
            session.commit()

            # Read the id while the session block is still open.
            return str(rating.id)

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a rating by ID.

        Args:
            id: Rating id; converted with ``int()`` before the lookup.

        Returns:
            The rating as a dict, or ``None`` when no row matches.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return None
            return self._rating_to_dict(rating)

    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update a rating.

        Only the ``rating_value`` and ``comment`` keys of ``data`` are
        applied; other keys are ignored.

        Returns:
            ``True`` after committing, ``False`` when no row matches ``id``.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return False

            # Update allowed fields.
            # NOTE(review): create() stores "rating_value" as relevance_vote,
            # while this writes rating.rating_value -- confirm which
            # attribute the model actually maps.
            if "rating_value" in data:
                rating.rating_value = data["rating_value"]
            if "comment" in data:
                rating.comment = data["comment"]

            session.commit()
            return True

    def delete(self, id: str) -> bool:
        """Delete a rating.

        Returns:
            ``True`` after committing the delete, ``False`` when no row
            matches ``id``.

        Raises:
            ValueError: If ``id`` cannot be converted to an integer.
        """
        with self.session as session:
            rating = session.query(UserRating).filter_by(id=int(id)).first()
            if not rating:
                return False

            session.delete(rating)
            session.commit()
            return True

    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List ratings, newest first, with optional filtering.

        Args:
            filters: Optional equality filters. Recognised keys are
                ``user_id``, ``item_id``, ``item_type`` and, for backward
                compatibility, ``card_id`` (matched against ``item_id``).
                Any other key is ignored.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            A list of rating dicts ordered by ``created_at`` descending.
        """
        # (filter key, UserRating column) pairs, applied in this order.
        filter_columns = (
            ("user_id", "user_id"),
            ("item_id", "item_id"),
            ("item_type", "item_type"),
            # For backward compatibility, also check card_id
            ("card_id", "item_id"),
        )

        with self.session as session:
            query = session.query(UserRating)

            if filters:
                for key, column in filter_columns:
                    if key in filters:
                        query = query.filter_by(**{column: filters[key]})

            ratings = (
                query.order_by(desc(UserRating.created_at))
                .limit(limit)
                .offset(offset)
                .all()
            )
            return [self._rating_to_dict(rating) for rating in ratings]

    def get_user_rating(
        self, user_id: str, item_id: str, rating_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get a user's rating of the given type for a specific item.

        Args:
            user_id: Owner of the rating.
            item_id: Compared against both ``card_id`` and ``news_item_id``.
            rating_type: Value passed to ``RatingType(...)``.

        Returns:
            ``rating.to_dict()`` for the first match, or ``None``.
        """
        with self.session as session:
            query = session.query(UserRating).filter_by(
                user_id=user_id, rating_type=RatingType(rating_type)
            )

            # Check both card and news item IDs
            rating = query.filter(
                (UserRating.card_id == item_id)
                | (UserRating.news_item_id == item_id)
            ).first()

            return rating.to_dict() if rating else None

    def upsert_rating(
        self,
        user_id: str,
        item_id: str,
        rating_type: str,
        rating_value: str,
        item_type: str = "card",
    ) -> str:
        """Create or update a rating and return its id as a string.

        Looks the rating up with ``get_user_rating``; when found, its
        ``rating_value`` is updated, otherwise a new rating is created.
        """
        # Check if rating exists
        existing = self.get_user_rating(user_id, item_id, rating_type)

        if existing:
            # Update existing rating
            existing_id = str(existing["id"])
            self.update(existing_id, {"rating_value": rating_value})
            return existing_id

        # Create new rating.
        # NOTE(review): create() does not read the "rating_type" key.
        return self.create(
            {
                "user_id": user_id,
                "item_id": item_id,
                "item_type": item_type,
                "rating_type": rating_type,
                "rating_value": rating_value,
            }
        )

    def get_ratings_summary(
        self, item_id: str, item_type: str = "card"
    ) -> Dict[str, Any]:
        """Get aggregated ratings for an item.

        Args:
            item_id: Item whose ratings are aggregated.
            item_type: ``"card"`` filters on ``card_id``; any other value
                filters on ``news_item_id``.

        Returns:
            A dict with ``item_id``, ``item_type``, a ``quality`` section
            (``count``, ``average``, ``distribution``) and a ``relevance``
            section (``up_votes``, ``down_votes``, ``net_score``).
            ``average`` is ``0`` when there are no numeric quality ratings.
        """
        with self.session as session:
            # Build query based on item type
            if item_type == "card":
                base_query = session.query(UserRating).filter_by(
                    card_id=item_id
                )
            else:
                base_query = session.query(UserRating).filter_by(
                    news_item_id=item_id
                )

            # Get quality ratings
            quality_ratings = base_query.filter_by(
                rating_type=RatingType.QUALITY
            ).all()
            quality_values = []
            for r in quality_ratings:
                raw = r.rating_value
                if raw is None:
                    # A missing value is skipped instead of raising.
                    continue
                text = str(raw)  # tolerate values stored as int
                # isdecimal() only accepts characters int() can parse;
                # isdigit() also accepts e.g. superscript digits.
                if text.isdecimal():
                    quality_values.append(int(text))

            # Get relevance ratings
            relevance_ratings = base_query.filter_by(
                rating_type=RatingType.RELEVANCE
            ).all()
            up_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "up"
            )
            down_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "down"
            )

            if quality_values:
                average = sum(quality_values) / len(quality_values)
            else:
                average = 0

            return {
                "item_id": item_id,
                "item_type": item_type,
                "quality": {
                    "count": len(quality_values),
                    "average": average,
                    "distribution": self._get_rating_distribution(
                        quality_values
                    ),
                },
                "relevance": {
                    "up_votes": up_votes,
                    "down_votes": down_votes,
                    "net_score": up_votes - down_votes,
                },
            }

    def get_user_ratings(
        self, user_id: str, rating_type: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get all ratings by a user, newest first.

        NOTE(review): ``rating_type`` is forwarded as a ``"rating_type"``
        filter, but ``list`` does not recognise that key, so it currently
        has no effect on the result.
        """
        filters = {"user_id": user_id}
        if rating_type:
            filters["rating_type"] = rating_type

        return self.list(filters, limit)

    def _get_rating_distribution(self, ratings: List[int]) -> Dict[int, int]:
        """Count how many ratings fall on each value from 1 to 5.

        Values that are not one of the five buckets are ignored.
        """
        distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
        for rating in ratings:
            # Membership test also skips non-integer values inside 1..5
            # instead of raising KeyError.
            if rating in distribution:
                distribution[rating] += 1
        return distribution