Coverage for src / local_deep_research / notifications / manager.py: 92%
149 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2High-level notification manager with database integration.
3"""
5from datetime import datetime, timezone, timedelta
6from typing import Dict, Optional, Any
7from collections import deque
8import threading
10from loguru import logger
12from .service import NotificationService
13from .templates import EventType
14from .exceptions import RateLimitError
class NotificationManager:
    """
    High-level notification manager that uses settings snapshots for
    thread-safe access to user settings.

    This manager is designed to be used from background threads (e.g., queue
    processors) by passing a settings_snapshot dictionary captured from the
    main thread.

    **Per-User Rate Limiting:**
    The rate limiter is shared across ALL NotificationManager instances as a
    singleton, but supports per-user rate limit configuration. Each user has
    their own rate limits based on their settings, which are configured when
    the NotificationManager is initialized with the required user_id parameter.

    **How It Works:**
    - The first NotificationManager instance creates the shared RateLimiter
      with default limits
    - Each instance configures user-specific limits by passing user_id to __init__
    - The rate limiter maintains separate counters and limits for each user
    - Users are completely isolated - one user's limit doesn't affect others

    Example:
        >>> # User A with conservative limits
        >>> snapshot_a = {"notifications.rate_limit_per_hour": 5}
        >>> manager_a = NotificationManager(snapshot_a, user_id="user_a")
        >>> # ✅ user_a gets 5/hour
        >>>
        >>> # User B with generous limits (doesn't affect User A!)
        >>> snapshot_b = {"notifications.rate_limit_per_hour": 20}
        >>> manager_b = NotificationManager(snapshot_b, user_id="user_b")
        >>> # ✅ user_b gets 20/hour, user_a still has 5/hour
    """

    # Shared rate limiter instance across all NotificationManager instances.
    # This ensures rate limits are enforced correctly even when multiple
    # NotificationManager instances are created.
    _shared_rate_limiter: Optional["RateLimiter"] = None
    # Guards the lazy creation of _shared_rate_limiter.
    _rate_limiter_lock = threading.Lock()

    def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
        """
        Initialize the notification manager.

        Args:
            settings_snapshot: Dictionary of settings key-value pairs captured
                from SettingsManager.get_settings_snapshot().
                This allows thread-safe access to user settings
                from background threads.
            user_id: User identifier for per-user rate limiting. The rate limits
                from settings_snapshot will be configured for this user.

        Example:
            >>> # In main thread with database session
            >>> settings_manager = SettingsManager(session)
            >>> snapshot = settings_manager.get_settings_snapshot()
            >>>
            >>> # In background thread (thread-safe)
            >>> notification_manager = NotificationManager(
            ...     settings_snapshot=snapshot,
            ...     user_id="user123"
            ... )
            >>> notification_manager.send_notification(...)
        """
        # Store settings snapshot for thread-safe access
        self._settings_snapshot = settings_snapshot
        self._user_id = user_id

        # Get security settings for notification service.
        # Default to False for security - only enable for development/testing.
        allow_private_ips = self._get_setting(
            "notifications.allow_private_ips", default=False
        )
        self.service = NotificationService(allow_private_ips=allow_private_ips)

        # The same snapshot values seed both the shared limiter's defaults
        # (first instance only) and this user's own limits, so read them once.
        max_per_hour = self._get_setting(
            "notifications.rate_limit_per_hour", default=10
        )
        max_per_day = self._get_setting(
            "notifications.rate_limit_per_day", default=50
        )

        # Initialize the shared rate limiter on first use. Because limits are
        # tracked per user, each user's settings are respected regardless of
        # which manager happened to be created first.
        with NotificationManager._rate_limiter_lock:
            if NotificationManager._shared_rate_limiter is None:
                logger.info(
                    f"Initializing shared rate limiter with defaults: "
                    f"{max_per_hour}/hour, {max_per_day}/day"
                )
                NotificationManager._shared_rate_limiter = RateLimiter(
                    max_per_hour=max_per_hour,
                    max_per_day=max_per_day,
                )

            # Use the shared instance
            self._rate_limiter = NotificationManager._shared_rate_limiter

        # Configure per-user rate limits
        self._rate_limiter.set_user_limits(user_id, max_per_hour, max_per_day)
        logger.debug(
            f"Configured rate limits for user {user_id}: "
            f"{max_per_hour}/hour, {max_per_day}/day"
        )

    def _get_setting(self, key: str, default: Any = None) -> Any:
        """
        Get a setting value from the snapshot.

        Args:
            key: Setting key
            default: Default value if not found

        Returns:
            Setting value or default
        """
        return self._settings_snapshot.get(key, default)

    def send_notification(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        force: bool = False,
    ) -> bool:
        """
        Send a notification for an event.

        Uses the user_id that was provided during initialization for
        rate limiting and user preferences.

        Args:
            event_type: Type of event
            context: Context data for the notification
            force: If True, send even when the event type is disabled or the
                rate limit is exceeded

        Returns:
            True if notification was sent successfully. False if the event
            type is disabled, no service URL is configured, the service
            reported failure, or an unexpected error occurred.

        Raises:
            RateLimitError: If rate limit is exceeded and force=False
        """
        try:
            logger.debug(
                f"Sending notification: event_type={event_type.value}, "
                f"user_id={self._user_id}, force={force}"
            )
            logger.debug(f"Context keys: {list(context.keys())}")

            # Check if notifications are enabled for this event type
            should_notify = self._should_notify(event_type)
            logger.debug(
                f"Notification enabled check for {event_type.value}: "
                f"{should_notify}"
            )

            if not force and not should_notify:
                logger.debug(
                    f"Notifications disabled for event type: "
                    f"{event_type.value} (user: {self._user_id})"
                )
                return False

            # Check rate limit using the manager's user_id.
            # is_allowed() records a timestamp whenever it returns True, so a
            # slot is consumed here even for forced sends and even if the
            # notification is skipped below for lack of a service URL.
            rate_limit_ok = self._rate_limiter.is_allowed(self._user_id)
            logger.debug(
                f"Rate limit check for {self._user_id}: {rate_limit_ok}"
            )

            if not force and not rate_limit_ok:
                logger.warning(f"Rate limit exceeded for user {self._user_id}")
                raise RateLimitError(
                    "Notification rate limit exceeded. "
                    "Please wait before sending more notifications."
                )

            # Get service URLs from the settings snapshot
            service_urls = self._get_setting(
                "notifications.service_url", default=""
            )

            if not service_urls or not service_urls.strip():
                logger.debug(
                    f"No notification service URLs configured for user "
                    f"{self._user_id}"
                )
                return False

            # Send notification with service URLs
            logger.debug(f"Calling service.send_event for {event_type.value}")
            result = self.service.send_event(
                event_type, context, service_urls=service_urls
            )

            # Record the outcome in the application log
            if result:
                self._log_notification(event_type, context)
                logger.info(
                    f"Notification sent: {event_type.value} to user "
                    f"{self._user_id}"
                )
            else:
                logger.warning(
                    f"Notification failed: {event_type.value} to user "
                    f"{self._user_id}"
                )

            return result

        except RateLimitError:
            # Rate limiting is part of the public contract: let it propagate.
            logger.warning(f"Rate limit error for user {self._user_id}")
            raise
        except Exception as e:
            # Best effort: any other failure is logged and reported as
            # "not sent" rather than raised to the caller.
            logger.exception(
                f"Error sending notification for {event_type.value} to user "
                f"{self._user_id}: {e}"
            )
            return False

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service.

        Args:
            url: Service URL to test

        Returns:
            Dict with test results
        """
        return self.service.test_service(url)

    def _should_notify(self, event_type: EventType) -> bool:
        """
        Check if notifications should be sent for this event type.

        Looks up ``notifications.on_<event_type.value>`` in the manager's
        settings snapshot; a missing key means disabled.

        Args:
            event_type: Event type to check

        Returns:
            True if notifications should be sent
        """
        try:
            # Check the event-specific setting in the snapshot
            setting_key = f"notifications.on_{event_type.value}"
            enabled = self._get_setting(setting_key, default=False)

            # Coerce so the declared bool return type holds even when the
            # snapshot stores a non-bool value (truthiness is unchanged).
            return bool(enabled)

        except Exception as e:
            logger.debug(f"Error checking notification preferences: {e}")
            # Default to disabled on error to avoid infinite loops during login
            return False

    def _log_notification(
        self, event_type: EventType, context: Dict[str, Any]
    ) -> None:
        """
        Log a sent notification (simplified logging to application logs only).

        Uses the manager's user_id for logging.

        Args:
            event_type: Event type
            context: Notification context; "query" or "subscription_name"
                is used as the title when present
        """
        try:
            title = (
                context.get("query")
                or context.get("subscription_name")
                or "Unknown"
            )
            logger.info(
                f"Notification sent: {event_type.value} - {title} "
                f"(user: {self._user_id})"
            )
        except Exception as e:
            # Logging must never turn a successful send into a failure.
            logger.debug(f"Failed to log notification: {e}")
class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limit support.

    This rate limiter tracks notification counts per user and enforces
    configurable rate limits. Each user can have their own rate limits,
    which are stored separately from the notification counts.

    **Per-User Limits:**
    Rate limits can be configured per-user using `set_user_limits()`.
    If no user-specific limits are set, the default limits (passed to
    __init__) are used.

    **Memory Storage:**
    This implementation stores rate limits in memory only, which means
    limits are reset when the server restarts. This is acceptable for normal
    users since they cannot restart the server. If an admin restarts the server,
    rate limits reset which is reasonable behavior.

    **Thread Safety:**
    Every public method takes the internal threading.Lock() before touching
    shared state, so concurrent calls (including `reset()`) are serialized.

    **Multi-Worker Limitation:**
    In multi-worker deployments, each worker process maintains its own rate
    limit counters. Users could potentially bypass rate limits by distributing
    requests across different workers, getting up to N × max_per_hour
    notifications (where N = number of workers). For single-worker deployments
    (the default for LDR), this is not a concern. For production multi-worker
    deployments, consider implementing Redis-based rate limiting.

    Example:
        >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        >>> # Set custom limits for specific user
        >>> limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        >>> limiter.set_user_limits("user_b", max_per_hour=20, max_per_day=100)
        >>> # Users get their configured limits
        >>> limiter.is_allowed("user_a")  # Limited to 5/hour
        >>> limiter.is_allowed("user_b")  # Limited to 20/hour
        >>> limiter.is_allowed("user_c")  # Uses defaults: 10/hour
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize rate limiter with default limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user
            max_per_day: Default maximum notifications per day per user
            cleanup_interval_hours: How often to run cleanup of inactive users (hours)
        """
        # Default limits used when no user-specific limits are set
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # Per-user rate limit configuration (user_id -> (max_per_hour, max_per_day))
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # Per-user notification timestamps, oldest first
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # Thread safety for all operations

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Set rate limits for a specific user.

        This allows each user to have their own rate limit configuration.
        If not set, the user will use the default limits passed to __init__.

        Args:
            user_id: User identifier
            max_per_hour: Maximum notifications per hour for this user
            max_per_day: Maximum notifications per day for this user

        Example:
            >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
            >>> limiter.set_user_limits("power_user", max_per_hour=20, max_per_day=100)
            >>> limiter.set_user_limits("limited_user", max_per_hour=5, max_per_day=25)
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
            logger.debug(
                f"Set rate limits for user {user_id}: "
                f"{max_per_hour}/hour, {max_per_day}/day"
            )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Get the effective rate limits for a user.

        Returns the user-specific limits if set, otherwise returns defaults.

        Args:
            user_id: User identifier

        Returns:
            Tuple of (max_per_hour, max_per_day)
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check if a notification is allowed for a user, and record it if so.

        Uses per-user rate limits if configured via set_user_limits(),
        otherwise uses the default limits from __init__. A True result
        consumes one slot in both the hourly and the daily window.

        Args:
            user_id: User identifier

        Returns:
            True if notification is allowed, False if rate limit exceeded
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Periodic cleanup of inactive users
            self._cleanup_inactive_users_if_needed(now)

            # Initialize queues for user if needed
            if user_id not in self._hourly_counts:
                self._hourly_counts[user_id] = deque()
                self._daily_counts[user_id] = deque()

            # Drop timestamps that have left the hourly/daily windows
            self._clean_old_entries(user_id, now)

            # Get user-specific limits or defaults
            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            # Check limits
            hourly_count = len(self._hourly_counts[user_id])
            daily_count = len(self._daily_counts[user_id])

            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            # Record this notification
            self._hourly_counts[user_id].append(now)
            self._daily_counts[user_id].append(now)

            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Remove entries that have aged out of the rate limit windows.

        The caller must already hold the lock and must have created the
        user's queues.

        Args:
            user_id: User identifier
            now: Current time
        """
        hour_ago = now - timedelta(hours=1)
        day_ago = now - timedelta(days=1)

        # Timestamps are appended in order, so expired ones sit at the left.
        hourly = self._hourly_counts[user_id]
        while hourly and hourly[0] < hour_ago:
            hourly.popleft()

        daily = self._daily_counts[user_id]
        while daily and daily[0] < day_ago:
            daily.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Reset recorded notification counts for a user or all users.

        Configured per-user limits are left untouched; only the recorded
        timestamps are discarded.

        Args:
            user_id: User to reset, or None for all users
        """
        # Hold the lock so a concurrent is_allowed() never sees its queues
        # disappear halfway through a check.
        with self._lock:
            # Compare against None explicitly: an empty-string user id must
            # not be mistaken for "reset everyone".
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Periodically clean up data for inactive users to prevent memory leaks.

        Runs at most once per cleanup_interval_hours. The caller must already
        hold the lock.

        Args:
            now: Current timestamp
        """
        # Check if cleanup is needed
        if now - self._last_cleanup < timedelta(
            hours=self.cleanup_interval_hours
        ):
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        # Define inactive threshold (users with no activity for 7 days)
        inactive_threshold = now - timedelta(days=7)

        inactive_users = []

        # Find users with no recent activity. Iterate over a snapshot of the
        # keys so the removals below cannot disturb the iteration.
        for user_id in list(self._hourly_counts):
            hourly_entries = self._hourly_counts.get(user_id, ())
            daily_entries = self._daily_counts.get(user_id, ())

            # Scan both queues in place; no entries at all counts as inactive.
            has_recent_activity = any(
                entry > inactive_threshold for entry in hourly_entries
            ) or any(entry > inactive_threshold for entry in daily_entries)

            if not has_recent_activity:
                inactive_users.append(user_id)

        # Remove inactive users
        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now