Coverage for src / local_deep_research / news / subscription_manager / topic_subscription.py: 23%

75 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Topic subscription - allows users to subscribe to specific news topics. 

3Topics are extracted from news analysis and can evolve over time. 

4""" 

5 

6from typing import Optional, Dict, Any, List 

7from loguru import logger 

8 

9from .base_subscription import BaseSubscription 

10from ..core.base_card import CardSource 

11from ..core.utils import utc_now 

12 

13 

class TopicSubscription(BaseSubscription):
    """
    Subscription to a specific news topic.
    Topics can be extracted from news or manually created.

    Instance state set by this class:
        topic: The topic the subscription was created with (never reassigned
            by this class).
        current_topic: The topic currently used for searching; changed by
            ``evolve_topic``.
        topic_history: Every form the topic has taken, oldest first.
        related_topics: Extra topics OR-ed into the search query.
        last_significant_activity: Timestamp (from ``utc_now``) of the last
            refresh that counted as active.
        activity_threshold: Minimum news items for a refresh to count as
            active.

    ``metadata``, ``id``, ``error_count`` and ``refresh_count`` are read here
    but not assigned; presumably they are provided by ``BaseSubscription``.
    """

    # Hours without significant activity after which "is_trending" is cleared.
    STALE_AFTER_HOURS = 72  # 3 days
    # Days without significant activity after which the subscription expires.
    EXPIRE_AFTER_DAYS = 30
    # How many distinct related topics are added to the search query.
    MAX_RELATED_IN_QUERY = 2

    def __init__(
        self,
        topic: str,
        user_id: str,
        refresh_interval_minutes: int = 240,  # Default 4 hours
        source: Optional[CardSource] = None,
        related_topics: Optional[List[str]] = None,
        subscription_id: Optional[str] = None,
    ):
        """
        Initialize a topic subscription.

        Args:
            topic: The topic to follow
            user_id: ID of the user
            refresh_interval_minutes: How often to check for updates in minutes
            source: Source information (auto-created if not provided)
            related_topics: Other topics related to this one. The sequence is
                copied, so later changes made through this subscription do
                not affect the caller's object.
            subscription_id: Optional ID
        """
        # Create source if not provided
        if source is None:
            source = CardSource(
                type="news_topic", created_from=f"Topic subscription: {topic}"
            )

        super().__init__(
            user_id, source, topic, refresh_interval_minutes, subscription_id
        )

        self.topic = topic
        # Copy instead of aliasing: add_related_topic()/merge_with() append to
        # this list and must not mutate the argument the caller passed in.
        self.related_topics: List[str] = (
            list(related_topics) if related_topics else []
        )

        # Set subscription type
        self.subscription_type = "topic"

        # Track topic evolution
        self.topic_history: List[str] = [topic]
        self.current_topic = topic

        # Track when topic was last significantly active
        self.last_significant_activity = utc_now()
        self.activity_threshold = 3  # Min news items to be "active"

        # Metadata specific to topic subscriptions
        self.metadata.update(
            {
                "subscription_type": "topic",
                "original_topic": topic,
                "is_trending": False,
                "topic_category": None,  # Will be set by analyzer
            }
        )

        logger.info(f"Created topic subscription for: {topic}")

    def get_subscription_type(self) -> str:
        """Return the subscription type identifier."""
        return "topic_subscription"

    def generate_search_query(self) -> str:
        """
        Generate a search query for this topic.

        The current topic and up to ``MAX_RELATED_IN_QUERY`` distinct related
        topics are each double-quoted and joined with ``OR``, followed by a
        fixed set of news keywords.

        Returns:
            str: The search query for finding news about this topic
        """
        # De-duplicate while keeping order (current topic first) so the same
        # phrase is never OR-ed twice, e.g. after the topic evolved into one
        # of its own related topics.
        distinct_topics = list(
            dict.fromkeys([self.current_topic, *self.related_topics])
        )
        query_parts = distinct_topics[: 1 + self.MAX_RELATED_IN_QUERY]

        # Combine with news-specific terms
        base_query = " OR ".join(f'"{part}"' for part in query_parts)
        news_query = f"{base_query} latest news today developments breaking"

        # Update any date placeholders with current date in user's timezone
        from ..core.utils import get_local_date_string

        current_date = get_local_date_string()

        # Replace YYYY-MM-DD placeholder ONLY (not all dates)
        news_query = news_query.replace("YYYY-MM-DD", current_date)

        logger.debug(f"Generated topic query: {news_query}")
        return news_query

    def update_activity(
        self, news_count: int, significant_news: bool = False
    ) -> None:
        """
        Update activity tracking for this topic.

        A refresh counts as active when it found at least
        ``activity_threshold`` items or was flagged as significant; that
        resets the activity timestamp and marks the topic as trending.
        Otherwise the trending flag is cleared once more than
        ``STALE_AFTER_HOURS`` hours have passed since the last active refresh.

        Args:
            news_count: Number of news items found
            significant_news: Whether any news was particularly significant
        """
        if news_count >= self.activity_threshold or significant_news:
            self.last_significant_activity = utc_now()
            self.metadata["is_trending"] = True
            return

        # Check if topic is becoming stale
        hours_since_activity = (
            utc_now() - self.last_significant_activity
        ).total_seconds() / 3600

        if hours_since_activity > self.STALE_AFTER_HOURS:
            self.metadata["is_trending"] = False

    def evolve_topic(
        self, new_form: str, reason: str = "natural evolution"
    ) -> None:
        """
        Evolve the topic to a new form.

        Does nothing when ``new_form`` equals the current topic. Otherwise
        the new form is appended to ``topic_history``, becomes
        ``current_topic``, and the transition is stored under
        ``metadata["last_evolution"]`` (replacing any earlier record).

        Args:
            new_form: The new form of the topic
            reason: Why the topic evolved
        """
        if new_form == self.current_topic:
            return

        # Capture the old value before mutating, rather than reading it back
        # from topic_history[-2], which fails if the history list was reset.
        previous_topic = self.current_topic
        self.topic_history.append(new_form)
        self.current_topic = new_form

        self.metadata["last_evolution"] = {
            "from": previous_topic,
            "to": new_form,
            "reason": reason,
            "timestamp": utc_now().isoformat(),
        }

        logger.info(
            f"Topic evolved from '{previous_topic}' to '{new_form}' - {reason}"
        )

    def add_related_topic(self, topic: str) -> None:
        """
        Add a related topic.

        Topics already present, or equal to the current topic, are ignored.

        Args:
            topic: Related topic to add
        """
        if topic not in self.related_topics and topic != self.current_topic:
            self.related_topics.append(topic)
            logger.debug(
                f"Added related topic '{topic}' to '{self.current_topic}'"
            )

    def merge_with(self, other_subscription: "TopicSubscription") -> None:
        """
        Merge another topic subscription into this one.
        Useful when topics converge.

        The other subscription's current topic and its related topics are
        added as related topics here. Only the most recent merge is kept
        under ``metadata["merged_from"]``; the other subscription itself is
        not modified.

        Args:
            other_subscription: The subscription to merge
        """
        # Add the other topic as related
        self.add_related_topic(other_subscription.current_topic)

        # Merge related topics
        for topic in other_subscription.related_topics:
            self.add_related_topic(topic)

        # Update metadata
        self.metadata["merged_from"] = {
            "topic": other_subscription.current_topic,
            "subscription_id": other_subscription.id,
            "timestamp": utc_now().isoformat(),
        }

        logger.info(
            f"Merged topic '{other_subscription.current_topic}' into '{self.current_topic}'"
        )

    def should_auto_expire(self) -> bool:
        """
        Check if this subscription should auto-expire due to inactivity.

        Expiry is only considered for subscriptions that have refreshed at
        least once with a zero error count; in every other case this returns
        False.

        Returns:
            bool: True if subscription should expire
        """
        # Only a subscription that is refreshing without errors can be judged
        # inactive; otherwise the lack of activity is not attributed to the
        # topic and the subscription is kept.
        if not (self.error_count == 0 and self.refresh_count > 0):
            return False

        days_inactive = (
            utc_now() - self.last_significant_activity
        ).total_seconds() / (24 * 3600)

        # Expire after EXPIRE_AFTER_DAYS days of no significant activity
        return days_inactive > self.EXPIRE_AFTER_DAYS

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get statistics about this topic subscription.

        Returns:
            Dict with the original and current topic, number of evolutions,
            number of related topics, the trending flag, fractional days
            since the last significant activity, and the refresh count.
        """
        seconds_since_activity = (
            utc_now() - self.last_significant_activity
        ).total_seconds()

        return {
            "original_topic": self.topic,
            "current_topic": self.current_topic,
            "evolution_count": len(self.topic_history) - 1,
            "related_topics_count": len(self.related_topics),
            "is_trending": self.metadata.get("is_trending", False),
            "days_since_activity": seconds_since_activity / (24 * 3600),
            "total_refreshes": self.refresh_count,
        }

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary representation.

        Extends the base class dictionary with the topic-specific fields.

        Returns:
            Dict containing the base fields plus topic, current topic,
            related topics, topic history, last activity timestamp (ISO
            format) and the statistics from ``get_statistics``.
        """
        data = super().to_dict()
        data.update(
            {
                "topic": self.topic,
                "current_topic": self.current_topic,
                # Copies, so editing the returned dict cannot alter the
                # subscription's own lists.
                "related_topics": list(self.related_topics),
                "topic_history": list(self.topic_history),
                "last_significant_activity": self.last_significant_activity.isoformat(),
                "statistics": self.get_statistics(),
            }
        )
        return data

244 

245 

class TopicSubscriptionFactory:
    """
    Factory for creating topic subscriptions from various sources.

    Each factory method builds the ``CardSource`` describing where the
    subscription came from and forwards the remaining keyword arguments to
    ``TopicSubscription``.
    """

    @staticmethod
    def from_news_extraction(
        user_id: str,
        topic: str,
        source_news_id: str,
        related_topics: Optional[List[str]] = None,
        **kwargs,
    ) -> TopicSubscription:
        """
        Create a subscription from an extracted news topic.

        Args:
            user_id: The user creating the subscription
            topic: The extracted topic
            source_news_id: ID of the news item it came from
            related_topics: Other related topics
            **kwargs: Additional arguments. ``extraction_method`` (default
                ``"llm"``) is recorded in the source metadata; everything
                else is passed to ``TopicSubscription``.

        Returns:
            TopicSubscription instance
        """
        # Pop rather than get: this key is factory-only, and leaving it in
        # kwargs would make TopicSubscription(**kwargs) raise TypeError for
        # an unexpected keyword argument.
        extraction_method = kwargs.pop("extraction_method", "llm")

        source = CardSource(
            type="news_topic",
            source_id=source_news_id,
            created_from=f"Topic from news analysis: {topic}",
            metadata={
                "extraction_timestamp": utc_now().isoformat(),
                "extraction_method": extraction_method,
            },
        )

        return TopicSubscription(
            user_id=user_id,
            topic=topic,
            source=source,
            related_topics=related_topics,
            **kwargs,
        )

    @staticmethod
    def from_user_interest(
        user_id: str, topic: str, **kwargs
    ) -> TopicSubscription:
        """
        Create a subscription from direct user interest.

        Args:
            user_id: The user
            topic: Topic they're interested in
            **kwargs: Additional arguments. ``created_via`` (default
                ``"manual"``) is recorded in the source metadata; everything
                else is passed to ``TopicSubscription``.

        Returns:
            TopicSubscription instance
        """
        # Pop rather than get: this key is factory-only, and leaving it in
        # kwargs would make TopicSubscription(**kwargs) raise TypeError for
        # an unexpected keyword argument.
        created_via = kwargs.pop("created_via", "manual")

        source = CardSource(
            type="user_interest",
            created_from=f"Your interest: {topic}",
            metadata={"created_via": created_via},
        )

        return TopicSubscription(
            user_id=user_id, topic=topic, source=source, **kwargs
        )