Coverage for src/local_deep_research/news/rating_system/storage.py: 0%
92 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2SQLAlchemy storage implementation for ratings.
3"""
5from typing import List, Optional, Dict, Any
6from sqlalchemy.orm import Session
7from sqlalchemy import desc
9from ..core.storage import RatingStorage
10from ...database.models.news import RatingType, UserRating
class SQLRatingStorage(RatingStorage):
    """SQLAlchemy implementation of rating storage.

    Every public method runs its queries inside ``with self.session``.
    Assuming a standard SQLAlchemy ``Session``, leaving that block closes
    the session (see the SQLAlchemy ``Session`` documentation).

    NOTE(review): the methods of this class address ``UserRating`` through
    two different attribute sets. ``create``/``get``/``list`` use
    ``item_id``, ``item_type``, ``relevance_vote`` and ``quality_rating``;
    ``update``/``get_user_rating``/``get_ratings_summary`` use
    ``rating_value``, ``comment``, ``rating_type``, ``card_id``,
    ``news_item_id`` and ``to_dict()``. The model is not visible from this
    file, so the attribute names are left exactly as they were -- confirm
    against ``database.models.news.UserRating``.
    """

    def __init__(self, session: Session):
        """Initialize with a database session from the user's encrypted database.

        Args:
            session: Session used for every query issued by this storage.

        Raises:
            ValueError: If ``session`` is falsy.
        """
        if not session:
            raise ValueError("Session is required for SQLRatingStorage")
        self._session = session

    @property
    def session(self):
        """Get database session"""
        return self._session

    @staticmethod
    def _parse_id(id: str) -> Optional[int]:
        """Convert an external rating ID to the integer primary key.

        Returns:
            The integer value, or ``None`` when ``id`` cannot be converted
            (such an ID cannot match any row).
        """
        try:
            return int(id)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _serialize(rating: "UserRating") -> Dict[str, Any]:
        """Build the plain-dict form of a rating row shared by ``get`` and ``list``."""
        return {
            "id": rating.id,
            "user_id": rating.user_id,
            "item_id": rating.item_id,
            "item_type": rating.item_type,
            "relevance_vote": rating.relevance_vote,
            "quality_rating": rating.quality_rating,
            "created_at": rating.created_at,
            "updated_at": rating.updated_at,
        }

    @staticmethod
    def _to_score(value: Any) -> Optional[int]:
        """Return ``value`` as an int when it is a string of decimal digits.

        Anything else (``None``, non-strings, strings that ``int()`` could
        not parse) yields ``None`` so callers can skip it.
        """
        # isdecimal() is used rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits, which int() rejects.
        if isinstance(value, str) and value.isdecimal():
            return int(value)
        return None

    def create(self, data: Dict[str, Any]) -> str:
        """Create a new rating.

        Args:
            data: Must contain ``"user_id"`` and ``"item_id"``. Optional
                keys read here: ``"item_type"`` (defaults to ``"card"``),
                ``"rating_value"`` and ``"quality_rating"``. Any other key
                (e.g. ``"rating_type"``) is ignored.

        Returns:
            The new row's ID as a string.

        Raises:
            KeyError: If ``"user_id"`` or ``"item_id"`` is missing.
        """
        with self.session as session:
            rating = UserRating(
                user_id=data["user_id"],
                item_id=data["item_id"],
                item_type=data.get("item_type", "card"),
                # Map rating_value to relevance_vote
                relevance_vote=data.get("rating_value"),
                quality_rating=data.get("quality_rating"),
            )

            session.add(rating)
            session.commit()

            return str(rating.id)

    def get(self, id: str) -> Optional[Dict[str, Any]]:
        """Get a rating by ID.

        Returns:
            The rating as a dict, or ``None`` when no row matches or ``id``
            is not a valid integer.
        """
        rating_id = self._parse_id(id)
        if rating_id is None:
            return None

        with self.session as session:
            rating = session.query(UserRating).filter_by(id=rating_id).first()
            if not rating:
                return None
            return self._serialize(rating)

    def update(self, id: str, data: Dict[str, Any]) -> bool:
        """Update a rating.

        Only the ``"rating_value"`` and ``"comment"`` keys of ``data`` are
        applied; other keys are ignored.

        Returns:
            ``True`` when the row was found and committed, ``False`` when no
            row matches or ``id`` is not a valid integer.
        """
        rating_id = self._parse_id(id)
        if rating_id is None:
            return False

        with self.session as session:
            rating = session.query(UserRating).filter_by(id=rating_id).first()
            if not rating:
                return False

            # Update allowed fields
            # NOTE(review): create() stores "rating_value" in relevance_vote,
            # while this writes rating.rating_value -- confirm which
            # attribute the model actually defines.
            if "rating_value" in data:
                rating.rating_value = data["rating_value"]
            if "comment" in data:
                rating.comment = data["comment"]

            session.commit()
            return True

    def delete(self, id: str) -> bool:
        """Delete a rating.

        Returns:
            ``True`` when the row was found and deleted, ``False`` when no
            row matches or ``id`` is not a valid integer.
        """
        rating_id = self._parse_id(id)
        if rating_id is None:
            return False

        with self.session as session:
            rating = session.query(UserRating).filter_by(id=rating_id).first()
            if not rating:
                return False

            session.delete(rating)
            session.commit()
            return True

    def list(
        self,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """List ratings with optional filtering.

        Args:
            filters: Optional equality filters. Recognised keys are
                ``"user_id"``, ``"item_id"``, ``"item_type"`` and
                ``"card_id"``; any other key is ignored.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            Rating dicts ordered by ``created_at`` descending.
        """
        # (filter key, UserRating attribute) pairs, applied in this order.
        # "card_id" is kept for backward compatibility and maps to item_id.
        # NOTE(review): get_user_ratings() passes a "rating_type" key that is
        # not handled here and is therefore silently ignored.
        filter_columns = (
            ("user_id", "user_id"),
            ("item_id", "item_id"),
            ("item_type", "item_type"),
            ("card_id", "item_id"),
        )

        with self.session as session:
            query = session.query(UserRating)

            if filters:
                for key, column in filter_columns:
                    if key in filters:
                        query = query.filter_by(**{column: filters[key]})

            ratings = (
                query.order_by(desc(UserRating.created_at))
                .limit(limit)
                .offset(offset)
                .all()
            )
            return [self._serialize(rating) for rating in ratings]

    def get_user_rating(
        self, user_id: str, item_id: str, rating_type: str
    ) -> Optional[Dict[str, Any]]:
        """Get a user's rating for a specific item.

        Args:
            user_id: Owner of the rating.
            item_id: Matched against both ``card_id`` and ``news_item_id``.
            rating_type: Converted with ``RatingType(rating_type)``.

        Returns:
            ``rating.to_dict()`` for the first match, or ``None``.
        """
        with self.session as session:
            query = session.query(UserRating).filter_by(
                user_id=user_id, rating_type=RatingType(rating_type)
            )

            # Check both card and news item IDs
            rating = query.filter(
                (UserRating.card_id == item_id)
                | (UserRating.news_item_id == item_id)
            ).first()

            return rating.to_dict() if rating else None

    def upsert_rating(
        self,
        user_id: str,
        item_id: str,
        rating_type: str,
        rating_value: str,
        item_type: str = "card",
    ) -> str:
        """Create or update a rating.

        Looks the rating up with ``get_user_rating``; if one exists its
        ``rating_value`` is updated, otherwise a new row is created.

        Returns:
            The ID of the updated or newly created rating, as a string.
        """
        existing = self.get_user_rating(user_id, item_id, rating_type)

        if existing:
            existing_id = str(existing["id"])
            self.update(existing_id, {"rating_value": rating_value})
            return existing_id

        # No rating yet: create one.
        return self.create(
            {
                "user_id": user_id,
                "item_id": item_id,
                "item_type": item_type,
                "rating_type": rating_type,
                "rating_value": rating_value,
            }
        )

    def get_ratings_summary(
        self, item_id: str, item_type: str = "card"
    ) -> Dict[str, Any]:
        """Get aggregated ratings for an item.

        Args:
            item_id: ID of the rated item.
            item_type: ``"card"`` filters on ``card_id``; any other value
                filters on ``news_item_id``.

        Returns:
            A dict with ``item_id``, ``item_type``, a ``quality`` section
            (count, average, 1-5 distribution) and a ``relevance`` section
            (up votes, down votes, net score). The average is ``0`` when
            there are no usable quality ratings.
        """
        with self.session as session:
            # Build query based on item type
            owner_column = "card_id" if item_type == "card" else "news_item_id"
            base_query = session.query(UserRating).filter_by(
                **{owner_column: item_id}
            )

            # Quality ratings: keep only values that parse as integers, so a
            # missing (None) or malformed value is skipped instead of raising.
            quality_ratings = base_query.filter_by(
                rating_type=RatingType.QUALITY
            ).all()
            scores = (self._to_score(r.rating_value) for r in quality_ratings)
            quality_values = [score for score in scores if score is not None]

            # Relevance ratings: count thumbs up / thumbs down.
            relevance_ratings = base_query.filter_by(
                rating_type=RatingType.RELEVANCE
            ).all()
            up_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "up"
            )
            down_votes = sum(
                1 for r in relevance_ratings if r.rating_value == "down"
            )

            if quality_values:
                average = sum(quality_values) / len(quality_values)
            else:
                average = 0

            return {
                "item_id": item_id,
                "item_type": item_type,
                "quality": {
                    "count": len(quality_values),
                    "average": average,
                    "distribution": self._get_rating_distribution(
                        quality_values
                    ),
                },
                "relevance": {
                    "up_votes": up_votes,
                    "down_votes": down_votes,
                    "net_score": up_votes - down_votes,
                },
            }

    def get_user_ratings(
        self, user_id: str, rating_type: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get all ratings by a user.

        Delegates to ``list`` with a ``user_id`` filter and, when
        ``rating_type`` is truthy, an additional ``rating_type`` filter.

        NOTE(review): ``list`` does not act on a ``"rating_type"`` filter
        key, so ``rating_type`` currently has no effect on the result.
        """
        filters = {"user_id": user_id}
        if rating_type:
            filters["rating_type"] = rating_type

        return self.list(filters, limit)

    def _get_rating_distribution(self, ratings: List[int]) -> Dict[int, int]:
        """Get distribution of ratings (1-5).

        Values outside 1-5 are ignored; every bucket from 1 to 5 is present
        in the result even when its count is zero.
        """
        distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
        for rating in ratings:
            if 1 <= rating <= 5:
                distribution[rating] += 1
        return distribution