Coverage for src / local_deep_research / notifications / manager.py: 99%
147 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2High-level notification manager with database integration.
3"""
5from datetime import datetime, timezone, timedelta
6from typing import Dict, Optional, Any
7from collections import deque
8import threading
10from loguru import logger
12from ..settings.env_registry import get_env_setting
13from .service import NotificationService
14from .templates import EventType
15from .exceptions import RateLimitError
class NotificationManager:
    """
    Notification front end driven by a settings snapshot.

    An instance is built from a plain dictionary of settings plus the id of
    the user it acts for. Every setting it needs afterwards is looked up in
    that dictionary, which is what makes it usable from background threads
    (e.g. queue processors) that received the snapshot from the main thread.

    **Rate limiting:**
    All instances share a single RateLimiter stored on the class. The first
    instance ever constructed creates it, using the hourly/daily values from
    its own snapshot as the limiter's defaults. Every instance, including the
    first, then registers its user's hourly/daily limits on the shared
    limiter, so each user is checked against their own configured limits.

    Example:
        >>> # Two users with different limits share one limiter
        >>> manager_a = NotificationManager(
        ...     {"notifications.rate_limit_per_hour": 5}, user_id="user_a"
        ... )
        >>> manager_b = NotificationManager(
        ...     {"notifications.rate_limit_per_hour": 20}, user_id="user_b"
        ... )
        >>> # user_a is limited to 5/hour, user_b to 20/hour
    """

    # Process-wide limiter shared by all managers, created lazily in
    # __init__; the lock guards its creation and per-user configuration.
    _shared_rate_limiter: Optional["RateLimiter"] = None
    _rate_limiter_lock = threading.Lock()

    def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
        """
        Build a manager bound to one user and one settings snapshot.

        Args:
            settings_snapshot: Mapping of setting keys to values. All later
                setting lookups made by this manager read from it.
            user_id: Identifier used for rate limiting and in log messages.
                The rate limits found in ``settings_snapshot`` are
                registered on the shared limiter under this id.

        Example:
            >>> snapshot = settings_manager.get_settings_snapshot()
            >>> manager = NotificationManager(
            ...     settings_snapshot=snapshot,
            ...     user_id="user123",
            ... )
            >>> manager.send_notification(...)
        """
        self._settings_snapshot = settings_snapshot
        self._user_id = user_id

        # Security: the private-IP switch is taken from the server-side
        # environment setting only, never from the user-supplied snapshot,
        # so a user cannot turn off SSRF protection through their settings.
        self.service = NotificationService(
            allow_private_ips=get_env_setting(
                "notifications.allow_private_ips", False
            )
        )

        owner = NotificationManager
        with owner._rate_limiter_lock:
            if owner._shared_rate_limiter is None:
                # First manager in the process: its snapshot values become
                # the limiter's defaults for users without their own limits.
                default_hourly, default_daily = self._read_rate_limits()
                logger.info(
                    f"Initializing shared rate limiter with defaults: "
                    f"{default_hourly}/hour, {default_daily}/day"
                )
                owner._shared_rate_limiter = RateLimiter(
                    max_per_hour=default_hourly,
                    max_per_day=default_daily,
                )

            self._rate_limiter = owner._shared_rate_limiter

            # Register this user's own limits on the shared limiter.
            hourly_cap, daily_cap = self._read_rate_limits()
            self._rate_limiter.set_user_limits(user_id, hourly_cap, daily_cap)
            logger.debug(
                f"Configured rate limits for user {user_id}: "
                f"{hourly_cap}/hour, {daily_cap}/day"
            )

    def _read_rate_limits(self) -> tuple:
        """
        Read the rate-limit settings from the snapshot.

        Returns:
            ``(per_hour, per_day)``; 10 and 50 respectively when the
            snapshot does not contain the corresponding key.
        """
        return (
            self._get_setting("notifications.rate_limit_per_hour", default=10),
            self._get_setting("notifications.rate_limit_per_day", default=50),
        )

    def _get_setting(self, key: str, default: Any = None) -> Any:
        """
        Look up a setting in the snapshot.

        Args:
            key: Setting key
            default: Value returned when the key is absent

        Returns:
            The stored value, or ``default``
        """
        snapshot = self._settings_snapshot
        return snapshot.get(key, default)

    def send_notification(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        force: bool = False,
    ) -> bool:
        """
        Send a notification for an event on behalf of this manager's user.

        The steps run in this order: per-event enable check, rate-limit
        check, service URL lookup, delivery. The rate-limit check is always
        performed, even when ``force`` is set; ``force`` only stops a
        negative answer from aborting the send.

        Args:
            event_type: Type of event
            context: Context data for the notification
            force: If True, send even when the event type is disabled or
                the rate limit is exceeded

        Returns:
            The result of the service's ``send_event`` call when delivery
            was attempted; False when the event type is disabled, no
            service URL is configured, or delivery raised an exception.

        Raises:
            RateLimitError: If rate limit is exceeded and force=False
        """
        event_name = event_type.value
        user = self._user_id

        logger.debug(
            f"Sending notification: event_type={event_name}, "
            f"user_id={user}, force={force}"
        )
        logger.debug(f"Context keys: {list(context.keys())}")

        should_notify = self._should_notify(event_type)
        logger.debug(
            f"Notification enabled check for {event_name}: {should_notify}"
        )
        if not (force or should_notify):
            logger.debug(
                f"Notifications disabled for event type: "
                f"{event_name} (user: {user})"
            )
            return False

        rate_limit_ok = self._rate_limiter.is_allowed(user)
        logger.debug(f"Rate limit check for {user}: {rate_limit_ok}")
        if not (force or rate_limit_ok):
            logger.warning(f"Rate limit exceeded for user {user}")
            raise RateLimitError(
                "Notification rate limit exceeded. "
                "Please wait before sending more notifications."
            )

        # Everything from here on is best-effort: any failure is logged
        # and reported as False instead of propagating to the caller.
        try:
            service_urls = self._get_setting(
                "notifications.service_url", default=""
            )
            if not (service_urls and service_urls.strip()):
                logger.debug(
                    f"No notification service URLs configured for user {user}"
                )
                return False

            logger.debug(f"Calling service.send_event for {event_name}")
            result = self.service.send_event(
                event_type, context, service_urls=service_urls
            )

            if not result:
                logger.warning(
                    f"Notification failed: {event_name} to user {user}"
                )
                return result

            self._log_notification(event_type, context)
            logger.info(f"Notification sent: {event_name} to user {user}")
            return result

        except Exception:
            logger.exception(
                f"Error sending notification for {event_name} to user {user}"
            )
            return False

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service by delegating to the underlying service.

        Args:
            url: Service URL to test

        Returns:
            Whatever the service's ``test_service`` returns for ``url``
        """
        outcome = self.service.test_service(url)
        return outcome

    def _should_notify(self, event_type: EventType) -> bool:
        """
        Tell whether this event type is enabled in the settings snapshot.

        The setting consulted is ``notifications.on_<event value>``; a
        missing key counts as disabled.

        Args:
            event_type: Event type to check

        Returns:
            True if notifications should be sent
        """
        try:
            return bool(
                self._get_setting(
                    f"notifications.on_{event_type.value}", default=False
                )
            )
        except Exception:
            logger.warning("Error checking notification preferences")
            # Fail closed: treat the event as disabled when the lookup breaks.
            return False

    def _log_notification(
        self, event_type: EventType, context: Dict[str, Any]
    ) -> None:
        """
        Write an application-log entry for a notification that was sent.

        The entry is labelled with the context's ``query``, else its
        ``subscription_name``, else ``"Unknown"``. Failures while building
        or writing the entry are logged at debug level and otherwise ignored.

        Args:
            event_type: Event type
            context: Notification context
        """
        try:
            title = "Unknown"
            for key in ("query", "subscription_name"):
                candidate = context.get(key)
                if candidate:
                    title = candidate
                    break
            logger.info(
                f"Notification sent: {event_type.value} - {title} "
                f"(user: {self._user_id})"
            )
        except Exception as e:
            logger.debug(f"Failed to log notification: {e}")
class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limits.

    For every user the limiter keeps two queues of timestamps: one covering
    the last hour and one covering the last day. A notification is allowed
    while both queues are below the user's limits, and each allowed check
    is recorded immediately as one notification.

    **Per-User Limits:**
    Limits can be set per user with `set_user_limits()`. Users without their
    own entry are checked against the defaults passed to `__init__`. Limits
    are stored separately from the counters, so resetting or cleaning up
    counters never changes a user's configured limits.

    **Memory Storage:**
    All state lives in dictionaries on this object, so counters and limits
    are lost when the process exits, and separate worker processes would
    each keep their own independent counts.

    **Thread Safety:**
    Every public method takes the same `threading.Lock` while it reads or
    changes the shared dictionaries. The lock is not re-entrant, so the
    underscore-prefixed helpers expect the caller to already hold it.

    Example:
        >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        >>> limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        >>> limiter.is_allowed("user_a")  # checked against 5/hour, 25/day
        >>> limiter.is_allowed("user_c")  # checked against defaults
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize rate limiter with default limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user
            max_per_day: Default maximum notifications per day per user
            cleanup_interval_hours: Minimum number of hours between two
                sweeps that drop counters of inactive users
        """
        # Defaults for users that have no entry in _user_limits.
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # user_id -> (max_per_hour, max_per_day)
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # user_id -> timestamps of recorded notifications, oldest first.
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # Guards all of the state above

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Set rate limits for a specific user.

        Replaces any limits previously stored for that user. Existing
        counters are left untouched.

        Args:
            user_id: User identifier
            max_per_hour: Maximum notifications per hour for this user
            max_per_day: Maximum notifications per day for this user

        Example:
            >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
            >>> limiter.set_user_limits("power_user", 20, 100)
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
        logger.debug(
            f"Set rate limits for user {user_id}: "
            f"{max_per_hour}/hour, {max_per_day}/day"
        )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Get the effective rate limits for a user.

        Args:
            user_id: User identifier

        Returns:
            Tuple of (max_per_hour, max_per_day): the user's own limits if
            set, otherwise the defaults.
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check whether a notification is allowed for a user, and record it.

        When the answer is True the current time is appended to the user's
        hourly and daily queues, so the check itself consumes one unit of
        quota. A False answer records nothing.

        Args:
            user_id: User identifier

        Returns:
            True if notification is allowed, False if rate limit exceeded
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Runs at most once per cleanup interval; cheap otherwise.
            self._cleanup_inactive_users_if_needed(now)

            hourly = self._hourly_counts.setdefault(user_id, deque())
            daily = self._daily_counts.setdefault(user_id, deque())

            # Drop timestamps that fell out of the hour/day windows.
            self._clean_old_entries(user_id, now)

            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            hourly_count = len(hourly)
            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            daily_count = len(daily)
            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            hourly.append(now)
            daily.append(now)
            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Remove expired timestamps from a user's hourly and daily queues.

        The caller must hold the lock, and both queues must already exist
        for ``user_id``.

        Args:
            user_id: User identifier
            now: Current time
        """
        windows = (
            (self._hourly_counts[user_id], now - timedelta(hours=1)),
            (self._daily_counts[user_id], now - timedelta(days=1)),
        )
        for stamps, oldest_kept in windows:
            # Queues are oldest-first, so expired entries sit at the left.
            while stamps and stamps[0] < oldest_kept:
                stamps.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Reset rate limit counters for a user or all users.

        Only the counters are cleared; limits configured through
        `set_user_limits()` stay in place.

        Args:
            user_id: User to reset, or None for all users. Any other value,
                including an empty string, resets only that one user.
        """
        # Hold the lock so a concurrent is_allowed() never sees a user's
        # queues vanish halfway through its check.
        with self._lock:
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Periodically drop counters of inactive users to bound memory use.

        Does nothing unless at least ``cleanup_interval_hours`` have passed
        since the previous sweep. A user is inactive when neither queue
        holds a timestamp from the last 7 days (empty queues included).
        The caller must hold the lock.

        Args:
            now: Current timestamp
        """
        if now - self._last_cleanup < timedelta(
            hours=self.cleanup_interval_hours
        ):
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        inactive_threshold = now - timedelta(days=7)

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        inactive_users = [
            user_id
            for user_id, hourly in self._hourly_counts.items()
            if not any(stamp > inactive_threshold for stamp in hourly)
            and not any(
                stamp > inactive_threshold
                for stamp in self._daily_counts.get(user_id, ())
            )
        ]

        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now