Coverage for src / local_deep_research / news / subscription_manager / topic_subscription.py: 23%
75 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Topic subscription - allows users to subscribe to specific news topics.
3Topics are extracted from news analysis and can evolve over time.
4"""
6from typing import Optional, Dict, Any, List
7from loguru import logger
9from .base_subscription import BaseSubscription
10from ..core.base_card import CardSource
11from ..core.utils import utc_now
class TopicSubscription(BaseSubscription):
    """
    Subscription to a specific news topic.

    Topics can be extracted from news or manually created. The subscription
    keeps the original topic, the current (possibly evolved) topic, the full
    history of topic forms, and a list of related topics that are used to
    broaden the generated search query.
    """

    def __init__(
        self,
        topic: str,
        user_id: str,
        refresh_interval_minutes: int = 240,  # Default 4 hours
        source: Optional[CardSource] = None,
        related_topics: Optional[List[str]] = None,
        subscription_id: Optional[str] = None,
    ):
        """
        Initialize a topic subscription.

        Args:
            topic: The topic to follow
            user_id: ID of the user
            refresh_interval_minutes: How often to check for updates in minutes
            source: Source information (auto-created if not provided)
            related_topics: Other topics related to this one. The list is
                copied, so later changes made through this subscription do
                not affect the caller's list.
            subscription_id: Optional ID
        """
        # Create source if not provided
        if source is None:
            source = CardSource(
                type="news_topic", created_from=f"Topic subscription: {topic}"
            )

        # The topic is passed to the base class positionally as its third
        # argument (presumably the query/pattern slot - confirm against
        # BaseSubscription).
        super().__init__(
            user_id, source, topic, refresh_interval_minutes, subscription_id
        )

        self.topic = topic
        # Defensive copy: add_related_topic()/merge_with() append to this
        # list and must not mutate the list object owned by the caller.
        self.related_topics: List[str] = (
            list(related_topics) if related_topics else []
        )

        # Set subscription type
        # NOTE(review): get_subscription_type() returns "topic_subscription"
        # while this attribute and the metadata use "topic" - confirm which
        # identifier consumers expect.
        self.subscription_type = "topic"

        # Track topic evolution
        self.topic_history: List[str] = [topic]
        self.current_topic = topic

        # Track when topic was last significantly active
        self.last_significant_activity = utc_now()
        self.activity_threshold = 3  # Min news items to be "active"

        # Metadata specific to topic subscriptions
        # (self.metadata is not created here; presumably set up by
        # BaseSubscription.__init__)
        self.metadata.update(
            {
                "subscription_type": "topic",
                "original_topic": topic,
                "is_trending": False,
                "topic_category": None,  # Will be set by analyzer
            }
        )

        logger.info(f"Created topic subscription for: {topic}")

    def get_subscription_type(self) -> str:
        """Return the subscription type identifier."""
        return "topic_subscription"

    def generate_search_query(self) -> str:
        """
        Generate a search query for this topic.

        The current topic plus up to two related topics are each wrapped in
        double quotes and joined with ``OR``, followed by news-specific
        keywords. Empty related topics and terms already present in the
        query are skipped so the query never contains blank or repeated
        quoted terms.

        Returns:
            str: The search query for finding news about this topic
        """
        max_related = 2

        # Build query with main topic and related topics
        query_parts = [self.current_topic]
        for related in self.related_topics:
            if len(query_parts) > max_related:
                break
            # A related topic can coincide with the current topic after
            # evolve_topic(); repeating it would waste one of the two slots.
            if related and related not in query_parts:
                query_parts.append(related)

        # Combine with news-specific terms
        base_query = " OR ".join(f'"{part}"' for part in query_parts)
        news_query = f"{base_query} latest news today developments breaking"

        # Update any date placeholders with current date in user's timezone
        from ..core.utils import get_local_date_string

        current_date = get_local_date_string()

        # Replace YYYY-MM-DD placeholder ONLY (not all dates)
        news_query = news_query.replace("YYYY-MM-DD", current_date)

        logger.debug(f"Generated topic query: {news_query}")
        return news_query

    def update_activity(
        self, news_count: int, significant_news: bool = False
    ) -> None:
        """
        Update activity tracking for this topic.

        When enough news was found (``news_count >= activity_threshold``) or
        the news was flagged as significant, the activity timestamp is reset
        and the topic is marked as trending. Otherwise the trending flag is
        cleared only once more than 72 hours have passed since the last
        significant activity.

        Args:
            news_count: Number of news items found
            significant_news: Whether any news was particularly significant
        """
        if news_count >= self.activity_threshold or significant_news:
            self.last_significant_activity = utc_now()
            self.metadata["is_trending"] = True
            return

        # Check if topic is becoming stale
        hours_since_activity = (
            utc_now() - self.last_significant_activity
        ).total_seconds() / 3600

        if hours_since_activity > 72:  # 3 days
            self.metadata["is_trending"] = False

    def evolve_topic(
        self, new_form: str, reason: str = "natural evolution"
    ) -> None:
        """
        Evolve the topic to a new form.

        Does nothing when ``new_form`` equals the current topic. Otherwise
        the new form is appended to the history, becomes the current topic,
        and the transition is recorded under ``metadata["last_evolution"]``.

        Args:
            new_form: The new form of the topic
            reason: Why the topic evolved
        """
        if new_form == self.current_topic:
            return

        # Capture the previous form before mutating state instead of reading
        # it back from the history afterwards.
        previous_form = self.current_topic
        self.topic_history.append(new_form)
        self.current_topic = new_form

        self.metadata["last_evolution"] = {
            "from": previous_form,
            "to": new_form,
            "reason": reason,
            "timestamp": utc_now().isoformat(),
        }

        logger.info(
            f"Topic evolved from '{previous_form}' to '{new_form}' - {reason}"
        )

    def add_related_topic(self, topic: str) -> None:
        """
        Add a related topic.

        The topic is ignored if it is already listed or if it equals the
        current topic.

        Args:
            topic: Related topic to add
        """
        if topic not in self.related_topics and topic != self.current_topic:
            self.related_topics.append(topic)
            logger.debug(
                f"Added related topic '{topic}' to '{self.current_topic}'"
            )

    def merge_with(self, other_subscription: "TopicSubscription") -> None:
        """
        Merge another topic subscription into this one.
        Useful when topics converge.

        The other subscription's current topic and its related topics are
        added as related topics of this subscription.
        ``metadata["merged_from"]`` holds the most recent merge only; an
        earlier entry is overwritten.

        Args:
            other_subscription: The subscription to merge
        """
        # Add the other topic as related
        self.add_related_topic(other_subscription.current_topic)

        # Merge related topics
        for topic in other_subscription.related_topics:
            self.add_related_topic(topic)

        # Update metadata
        self.metadata["merged_from"] = {
            "topic": other_subscription.current_topic,
            "subscription_id": other_subscription.id,
            "timestamp": utc_now().isoformat(),
        }

        logger.info(
            f"Merged topic '{other_subscription.current_topic}' into '{self.current_topic}'"
        )

    def should_auto_expire(self) -> bool:
        """
        Check if this subscription should auto-expire due to inactivity.

        Expiry is only considered when the subscription has refreshed at
        least once (``refresh_count > 0``) with no recorded errors
        (``error_count == 0``); in every other case this returns False.

        Returns:
            bool: True if subscription should expire
        """
        # Only judge inactivity when refreshes are succeeding - presumably so
        # that silence caused by failing refreshes is not mistaken for a dead
        # topic (confirm intent).
        if self.error_count == 0 and self.refresh_count > 0:
            days_inactive = (
                utc_now() - self.last_significant_activity
            ).total_seconds() / (24 * 3600)

            # Expire after 30 days of no significant activity
            return days_inactive > 30

        return False

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get statistics about this topic subscription.

        Returns:
            Dict with the original and current topic, the number of
            evolutions, the number of related topics, the trending flag,
            fractional days since the last significant activity, and the
            refresh count.
        """
        days_since_activity = (
            utc_now() - self.last_significant_activity
        ).total_seconds() / (24 * 3600)

        return {
            "original_topic": self.topic,
            "current_topic": self.current_topic,
            # The history starts with the original topic, hence the -1.
            "evolution_count": len(self.topic_history) - 1,
            "related_topics_count": len(self.related_topics),
            "is_trending": self.metadata.get("is_trending", False),
            "days_since_activity": days_since_activity,
            "total_refreshes": self.refresh_count,
        }

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary representation.

        Extends the base class dictionary with topic-specific fields. The
        list fields are copies, so modifying the returned dictionary does
        not change this subscription.
        """
        data = super().to_dict()
        data.update(
            {
                "topic": self.topic,
                "current_topic": self.current_topic,
                "related_topics": list(self.related_topics),
                "topic_history": list(self.topic_history),
                "last_significant_activity": self.last_significant_activity.isoformat(),
                "statistics": self.get_statistics(),
            }
        )
        return data
class TopicSubscriptionFactory:
    """
    Factory for creating topic subscriptions from various sources.
    """

    @staticmethod
    def from_news_extraction(
        user_id: str,
        topic: str,
        source_news_id: str,
        related_topics: Optional[List[str]] = None,
        **kwargs,
    ) -> TopicSubscription:
        """
        Create a subscription from an extracted news topic.

        Args:
            user_id: The user creating the subscription
            topic: The extracted topic
            source_news_id: ID of the news item it came from
            related_topics: Other related topics
            **kwargs: Additional arguments. ``extraction_method`` (default
                ``"llm"``) is recorded in the source metadata; everything
                else is forwarded to the ``TopicSubscription`` constructor.

        Returns:
            TopicSubscription instance
        """
        # Pop rather than get: extraction_method describes the source and is
        # not a TopicSubscription constructor parameter, so forwarding it
        # would raise TypeError.
        extraction_method = kwargs.pop("extraction_method", "llm")

        source = CardSource(
            type="news_topic",
            source_id=source_news_id,
            created_from=f"Topic from news analysis: {topic}",
            metadata={
                "extraction_timestamp": utc_now().isoformat(),
                "extraction_method": extraction_method,
            },
        )

        return TopicSubscription(
            user_id=user_id,
            topic=topic,
            source=source,
            related_topics=related_topics,
            **kwargs,
        )

    @staticmethod
    def from_user_interest(
        user_id: str, topic: str, **kwargs
    ) -> TopicSubscription:
        """
        Create a subscription from direct user interest.

        Args:
            user_id: The user
            topic: Topic they're interested in
            **kwargs: Additional arguments. ``created_via`` (default
                ``"manual"``) is recorded in the source metadata; everything
                else is forwarded to the ``TopicSubscription`` constructor.

        Returns:
            TopicSubscription instance
        """
        # Pop rather than get: created_via is source metadata only and is
        # not a TopicSubscription constructor parameter, so forwarding it
        # would raise TypeError.
        created_via = kwargs.pop("created_via", "manual")

        source = CardSource(
            type="user_interest",
            created_from=f"Your interest: {topic}",
            metadata={"created_via": created_via},
        )

        return TopicSubscription(
            user_id=user_id, topic=topic, source=source, **kwargs
        )