Coverage for src / local_deep_research / news / folder_manager.py: 14%

82 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Simple folder manager for subscription organization. 

3Handles folder CRUD operations for the API routes. 

4""" 

5 

6from typing import List, Optional, Dict, Any 

7from datetime import datetime, timezone 

8from sqlalchemy.orm import Session 

9from ..database.models import ( 

10 SubscriptionFolder, 

11 NewsSubscription as BaseSubscription, 

12) 

13 

14 

15class FolderManager: 

16 """Manages subscription folders and organization.""" 

17 

18 def __init__(self, session: Session): 

19 self.session = session 

20 

21 def get_user_folders(self, user_id: str) -> List[SubscriptionFolder]: 

22 """Get all folders for a user (user_id parameter kept for API compatibility).""" 

23 # In per-user databases, all folders belong to the current user 

24 return ( 

25 self.session.query(SubscriptionFolder) 

26 .order_by(SubscriptionFolder.name) 

27 .all() 

28 ) 

29 

30 def create_folder( 

31 self, name: str, description: Optional[str] = None 

32 ) -> SubscriptionFolder: 

33 """Create a new folder.""" 

34 import uuid 

35 

36 folder = SubscriptionFolder( 

37 id=str(uuid.uuid4()), 

38 name=name, 

39 description=description, 

40 ) 

41 self.session.add(folder) 

42 self.session.commit() 

43 return folder 

44 

45 def update_folder( 

46 self, folder_id: int, **kwargs 

47 ) -> Optional[SubscriptionFolder]: 

48 """Update a folder.""" 

49 folder = self.session.query(SubscriptionFolder).get(folder_id) 

50 if not folder: 

51 return None 

52 

53 for key, value in kwargs.items(): 

54 if hasattr(folder, key) and key not in [ 

55 "id", 

56 "created_at", 

57 ]: 

58 setattr(folder, key, value) 

59 

60 folder.updated_at = datetime.now(timezone.utc) 

61 self.session.commit() 

62 return folder 

63 

64 def delete_folder( 

65 self, folder_id: int, move_to: Optional[str] = None 

66 ) -> bool: 

67 """Delete a folder, optionally moving subscriptions to another folder.""" 

68 folder = self.session.query(SubscriptionFolder).get(folder_id) 

69 if not folder: 

70 return False 

71 

72 # Move subscriptions if specified 

73 if move_to: 

74 self.session.query(BaseSubscription).filter_by( 

75 folder=folder.name 

76 ).update({"folder": move_to}) 

77 else: 

78 # Set to None if no target folder 

79 self.session.query(BaseSubscription).filter_by( 

80 folder=folder.name 

81 ).update({"folder": None}) 

82 

83 self.session.delete(folder) 

84 self.session.commit() 

85 return True 

86 

87 def get_subscriptions_by_folder(self, user_id: str) -> Dict[str, Any]: 

88 """Get subscriptions organized by folder.""" 

89 folders = self.get_user_folders(user_id) 

90 

91 result = {"folders": [], "uncategorized": []} 

92 

93 # Get subscriptions for each folder 

94 for folder in folders: 

95 subs = ( 

96 self.session.query(BaseSubscription) 

97 .filter_by(folder=folder.name, status="active") 

98 .all() 

99 ) 

100 

101 result["folders"].append( 

102 { 

103 "folder": folder.to_dict(), 

104 "subscriptions": [self._sub_to_dict(s) for s in subs], 

105 } 

106 ) 

107 

108 # Get uncategorized subscriptions 

109 uncategorized = ( 

110 self.session.query(BaseSubscription) 

111 .filter_by(folder=None, status="active") 

112 .all() 

113 ) 

114 

115 result["uncategorized"] = [self._sub_to_dict(s) for s in uncategorized] 

116 

117 return result 

118 

119 def update_subscription( 

120 self, subscription_id: str, **kwargs 

121 ) -> Optional[BaseSubscription]: 

122 """Update a subscription.""" 

123 sub = ( 

124 self.session.query(BaseSubscription) 

125 .filter_by(id=subscription_id) 

126 .first() 

127 ) 

128 if not sub: 

129 return None 

130 

131 for key, value in kwargs.items(): 

132 if hasattr(sub, key) and key not in ["id", "created_at"]: 

133 setattr(sub, key, value) 

134 

135 # Recalculate next_refresh if refresh_interval_minutes changed 

136 if "refresh_interval_minutes" in kwargs: 

137 from datetime import timedelta 

138 from loguru import logger 

139 

140 old_next = sub.next_refresh 

141 new_minutes = kwargs["refresh_interval_minutes"] 

142 

143 if sub.last_refresh: 

144 sub.next_refresh = sub.last_refresh + timedelta( 

145 minutes=new_minutes 

146 ) 

147 logger.info( 

148 f"Updated subscription {sub.id} next_refresh based on last_refresh: {sub.last_refresh} + {new_minutes}min = {sub.next_refresh}" 

149 ) 

150 else: 

151 # If no last_refresh, calculate from now 

152 sub.next_refresh = datetime.now(timezone.utc) + timedelta( 

153 minutes=new_minutes 

154 ) 

155 logger.info( 

156 f"Updated subscription {sub.id} next_refresh from now: {sub.next_refresh}" 

157 ) 

158 

159 logger.info( 

160 f"Subscription {sub.id} next_refresh changed from {old_next} to {sub.next_refresh}" 

161 ) 

162 

163 sub.updated_at = datetime.now(timezone.utc) 

164 self.session.commit() 

165 return sub 

166 

167 def delete_subscription(self, subscription_id: str) -> bool: 

168 """Delete a subscription.""" 

169 sub = ( 

170 self.session.query(BaseSubscription) 

171 .filter_by(id=subscription_id) 

172 .first() 

173 ) 

174 if not sub: 

175 return False 

176 

177 self.session.delete(sub) 

178 self.session.commit() 

179 return True 

180 

181 def get_subscription_stats(self, user_id: str) -> Dict[str, Any]: 

182 """Get subscription statistics for a user (user_id kept for API compatibility).""" 

183 # In per-user databases, all data belongs to the current user 

184 total = self.session.query(BaseSubscription).count() 

185 

186 active = ( 

187 self.session.query(BaseSubscription) 

188 .filter_by(status="active") 

189 .count() 

190 ) 

191 

192 by_type = {} 

193 for sub_type in ["search", "topic"]: 

194 count = ( 

195 self.session.query(BaseSubscription) 

196 .filter_by(subscription_type=sub_type, status="active") 

197 .count() 

198 ) 

199 by_type[sub_type] = count 

200 

201 return { 

202 "total": total, 

203 "active": active, 

204 "by_type": by_type, 

205 "folders": len(self.get_user_folders(user_id)), 

206 } 

207 

208 def _sub_to_dict(self, sub: BaseSubscription) -> Dict[str, Any]: 

209 """Convert subscription to dictionary.""" 

210 return { 

211 "id": sub.id, 

212 "type": sub.subscription_type, 

213 "query_or_topic": sub.query_or_topic, 

214 "created_at": sub.created_at.isoformat() 

215 if sub.created_at 

216 else None, 

217 "last_refresh": sub.last_refresh.isoformat() 

218 if sub.last_refresh 

219 else None, 

220 "next_refresh": sub.next_refresh.isoformat() 

221 if sub.next_refresh 

222 else None, 

223 "refresh_interval_minutes": sub.refresh_interval_minutes, 

224 "status": sub.status, 

225 }