Coverage for src/local_deep_research/notifications/manager.py: 99%
152 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2High-level notification manager with database integration.
3"""
5from datetime import datetime, timezone, timedelta
6from typing import Dict, Optional, Any
7from collections import deque
8import threading
10from loguru import logger
12from ..settings.env_registry import get_env_setting
13from .service import NotificationService
14from .templates import EventType
15from .exceptions import RateLimitError
18class NotificationManager:
19 """
20 High-level notification manager that uses settings snapshots for
21 thread-safe access to user settings.
23 This manager is designed to be used from background threads (e.g., queue
24 processors) by passing a settings_snapshot dictionary captured from the
25 main thread.
27 **Per-User Rate Limiting:**
28 The rate limiter is shared across ALL NotificationManager instances as a
29 singleton, but supports per-user rate limit configuration. Each user has
30 their own rate limits based on their settings, which are configured when
31 the NotificationManager is initialized with the required user_id parameter.
33 **How It Works:**
34 - The first NotificationManager instance creates the shared RateLimiter
35 with default limits
36 - Each instance configures user-specific limits by passing user_id to __init__
37 - The rate limiter maintains separate counters and limits for each user
38 - Users are completely isolated - one user's limit doesn't affect others
40 Example:
41 >>> # User A with conservative limits
42 >>> snapshot_a = {"notifications.rate_limit_per_hour": 5}
43 >>> manager_a = NotificationManager(snapshot_a, user_id="user_a")
44 >>> # ✅ user_a gets 5/hour
45 >>>
46 >>> # User B with generous limits (doesn't affect User A!)
47 >>> snapshot_b = {"notifications.rate_limit_per_hour": 20}
48 >>> manager_b = NotificationManager(snapshot_b, user_id="user_b")
49 >>> # ✅ user_b gets 20/hour, user_a still has 5/hour
50 """
52 # Shared rate limiter instance across all NotificationManager instances
53 # This ensures rate limits are enforced correctly even when multiple
54 # NotificationManager instances are created
55 _shared_rate_limiter: Optional["RateLimiter"] = None
56 _rate_limiter_lock = threading.Lock()
58 def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
59 """
60 Initialize the notification manager.
62 Args:
63 settings_snapshot: Dictionary of settings key-value pairs captured
64 from SettingsManager.get_settings_snapshot().
65 This allows thread-safe access to user settings
66 from background threads.
67 user_id: User identifier for per-user rate limiting. The rate limits
68 from settings_snapshot will be configured for this user.
70 Example:
71 >>> # In main thread with database session
72 >>> settings_manager = SettingsManager(session)
73 >>> snapshot = settings_manager.get_settings_snapshot()
74 >>>
75 >>> # In background thread (thread-safe)
76 >>> notification_manager = NotificationManager(
77 ... settings_snapshot=snapshot,
78 ... user_id="user123"
79 ... )
80 >>> notification_manager.send_notification(...)
81 """
82 # Store settings snapshot for thread-safe access
83 self._settings_snapshot = settings_snapshot
84 self._user_id = user_id
86 # Security: read from server-side environment variable only — never from
87 # user-writable DB settings. Previously this was read via
88 # _get_setting("notifications.allow_private_ips"), which allowed any
89 # user to bypass SSRF protection through the settings API.
90 # Registered as env-only in settings/env_definitions/security.py
91 # so SettingsManager will always read LDR_NOTIFICATIONS_ALLOW_PRIVATE_IPS
92 # from the environment, never from the database.
93 allow_private_ips = get_env_setting(
94 "notifications.allow_private_ips", False
95 )
97 # Server-level master switch (env-only). Distinct from the per-user
98 # notifications.enabled toggle in the UI: this gate is set by the
99 # deployment operator and cannot be flipped via the user-writable
100 # settings API. Disabled by default; enabling it accepts the
101 # documented DNS-rebinding TOCTOU residual risk
102 # (see SECURITY.md "Notification Webhook SSRF").
103 self._outbound_allowed = bool(
104 get_env_setting("notifications.allow_outbound", False)
105 )
107 self.service = NotificationService(
108 allow_private_ips=allow_private_ips,
109 outbound_allowed=self._outbound_allowed,
110 )
112 # Initialize shared rate limiter on first use
113 # The shared rate limiter now supports per-user limits, so each user's
114 # settings are respected regardless of initialization order.
115 with NotificationManager._rate_limiter_lock:
116 if NotificationManager._shared_rate_limiter is None:
117 # Create shared rate limiter with default limits
118 # (individual users can have different limits)
119 default_max_per_hour = self._get_setting(
120 "notifications.rate_limit_per_hour", default=10
121 )
122 default_max_per_day = self._get_setting(
123 "notifications.rate_limit_per_day", default=50
124 )
126 logger.info(
127 f"Initializing shared rate limiter with defaults: "
128 f"{default_max_per_hour}/hour, {default_max_per_day}/day"
129 )
131 NotificationManager._shared_rate_limiter = RateLimiter(
132 max_per_hour=default_max_per_hour,
133 max_per_day=default_max_per_day,
134 )
136 # Use the shared instance
137 self._rate_limiter = NotificationManager._shared_rate_limiter
139 # Configure per-user rate limits
140 max_per_hour = self._get_setting(
141 "notifications.rate_limit_per_hour", default=10
142 )
143 max_per_day = self._get_setting(
144 "notifications.rate_limit_per_day", default=50
145 )
147 self._rate_limiter.set_user_limits(
148 user_id, max_per_hour, max_per_day
149 )
150 logger.debug(
151 f"Configured rate limits for user {user_id}: "
152 f"{max_per_hour}/hour, {max_per_day}/day"
153 )
155 def _get_setting(self, key: str, default: Any = None) -> Any:
156 """
157 Get a setting value from snapshot.
159 Args:
160 key: Setting key
161 default: Default value if not found
163 Returns:
164 Setting value or default
165 """
166 return self._settings_snapshot.get(key, default)
168 def send_notification(
169 self,
170 event_type: EventType,
171 context: Dict[str, Any],
172 force: bool = False,
173 ) -> bool:
174 """
175 Send a notification for an event.
177 Uses the user_id that was provided during initialization for
178 rate limiting and user preferences.
180 Args:
181 event_type: Type of event
182 context: Context data for the notification
183 force: If True, bypass rate limiting
185 Returns:
186 True if notification was sent successfully
188 Raises:
189 RateLimitError: If rate limit is exceeded and force=False
190 """
191 logger.debug(
192 f"Sending notification: event_type={event_type.value}, "
193 f"user_id={self._user_id}, force={force}"
194 )
195 logger.debug(f"Context keys: {list(context.keys())}")
197 # Server-level master switch (env-only). Disabled by default;
198 # flipping it on is the operator's acknowledgement of the
199 # documented SSRF rebinding residual risk. force=True does NOT
200 # bypass this — it bypasses the per-user event-type and
201 # rate-limit toggles only. WARNING level so an operator wondering
202 # "why aren't notifications firing?" sees the actionable signal
203 # at default log level.
204 if not self._outbound_allowed:
205 logger.warning(
206 "Notification refused: outbound notifications are disabled "
207 "at the server level. Set "
208 "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
209 "SECURITY.md 'Notification Webhook SSRF' for the rationale "
210 "and residual risk. (event={}, user={})",
211 event_type.value,
212 self._user_id,
213 )
214 return False
216 # Check if notifications are enabled for this event type
217 should_notify = self._should_notify(event_type)
218 logger.debug(
219 f"Notification enabled check for {event_type.value}: "
220 f"{should_notify}"
221 )
223 if not force and not should_notify:
224 logger.debug(
225 f"Notifications disabled for event type: "
226 f"{event_type.value} (user: {self._user_id})"
227 )
228 return False
230 # Check rate limit using the manager's user_id
231 rate_limit_ok = self._rate_limiter.is_allowed(self._user_id)
232 logger.debug(f"Rate limit check for {self._user_id}: {rate_limit_ok}")
234 if not force and not rate_limit_ok:
235 logger.warning(f"Rate limit exceeded for user {self._user_id}")
236 raise RateLimitError(
237 "Notification rate limit exceeded. "
238 "Please wait before sending more notifications."
239 )
241 try:
242 # Get service URLs from settings (snapshot or database)
243 service_urls = self._get_setting(
244 "notifications.service_url", default=""
245 )
247 if not service_urls or not service_urls.strip():
248 logger.debug(
249 f"No notification service URLs configured for user "
250 f"{self._user_id}"
251 )
252 return False
254 # Send notification with service URLs
255 logger.debug(f"Calling service.send_event for {event_type.value}")
256 result = self.service.send_event(
257 event_type, context, service_urls=service_urls
258 )
260 # Log to database if enabled
261 if result:
262 self._log_notification(event_type, context)
263 logger.info(
264 f"Notification sent: {event_type.value} to user "
265 f"{self._user_id}"
266 )
267 else:
268 logger.warning(
269 f"Notification failed: {event_type.value} to user "
270 f"{self._user_id}"
271 )
273 return result
275 except Exception:
276 logger.exception(
277 f"Error sending notification for {event_type.value} to user "
278 f"{self._user_id}"
279 )
280 return False
282 def test_service(self, url: str) -> Dict[str, Any]:
283 """
284 Test a notification service.
286 Args:
287 url: Service URL to test
289 Returns:
290 Dict with test results
291 """
292 return self.service.test_service(url)
294 def _should_notify(self, event_type: EventType) -> bool:
295 """
296 Check if notifications should be sent for this event type.
298 Uses the manager's settings snapshot to determine if the event type
299 is enabled for the user.
301 Args:
302 event_type: Event type to check
304 Returns:
305 True if notifications should be sent
306 """
307 try:
308 # Check event-specific setting (from snapshot or database)
309 setting_key = f"notifications.on_{event_type.value}"
310 enabled = self._get_setting(setting_key, default=False)
312 return bool(enabled)
314 except Exception:
315 logger.warning("Error checking notification preferences")
316 # Default to disabled on error to avoid infinite loops during login
317 return False
319 def _log_notification(
320 self, event_type: EventType, context: Dict[str, Any]
321 ) -> None:
322 """
323 Log a sent notification (simplified logging to application logs only).
325 Uses the manager's user_id for logging.
327 Args:
328 event_type: Event type
329 context: Notification context
330 """
331 try:
332 title = (
333 context.get("query")
334 or context.get("subscription_name")
335 or "Unknown"
336 )
337 logger.info(
338 f"Notification sent: {event_type.value} - {title} "
339 f"(user: {self._user_id})"
340 )
341 except Exception as e:
342 logger.debug(f"Failed to log notification: {e}")
class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limits.

    For every user the limiter keeps two deques of timestamps: one for the
    last hour and one for the last day. A notification is allowed only if
    both deques are below the user's limits.

    **Per-User Limits:**
    Limits can be configured per user with `set_user_limits()`. Users
    without an entry fall back to the defaults passed to `__init__`.

    **Memory Storage:**
    All state lives in memory, so counters and limits are lost when the
    process restarts.

    **Thread Safety:**
    Every public method takes `self._lock` before touching shared state.
    The underscore-prefixed helpers assume the caller already holds it.

    **Multi-Worker Limitation:**
    Each worker process has its own counters, so with N workers a user can
    receive up to N × max_per_hour notifications. A shared store (for
    example Redis) would be needed to enforce limits across workers.

    Example:
        >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        >>> limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        >>> limiter.is_allowed("user_a")  # Limited to 5/hour
        >>> limiter.is_allowed("user_c")  # Uses defaults: 10/hour
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize rate limiter with default limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user
            max_per_day: Default maximum notifications per day per user
            cleanup_interval_hours: Minimum time between cleanup passes that
                drop counters of inactive users (hours)
        """
        # Defaults used when a user has no entry in _user_limits
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # user_id -> (max_per_hour, max_per_day)
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # user_id -> timestamps of allowed notifications, oldest first
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # Thread safety for all operations

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Set rate limits for a specific user, replacing any earlier entry.

        Args:
            user_id: User identifier
            max_per_hour: Maximum notifications per hour for this user
            max_per_day: Maximum notifications per day for this user

        Example:
            >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
            >>> limiter.set_user_limits("power_user", max_per_hour=20, max_per_day=100)
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
            logger.debug(
                f"Set rate limits for user {user_id}: "
                f"{max_per_hour}/hour, {max_per_day}/day"
            )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Get the effective rate limits for a user.

        Args:
            user_id: User identifier

        Returns:
            Tuple of (max_per_hour, max_per_day): the user's own limits if
            set, otherwise the defaults.
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check whether a notification is allowed for a user and, if so,
        record it.

        This is a check-and-consume operation: a True result appends the
        current time to the user's hourly and daily counters. A False
        result records nothing.

        Args:
            user_id: User identifier

        Returns:
            True if notification is allowed, False if rate limit exceeded
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Runs at most once per cleanup interval
            self._cleanup_inactive_users_if_needed(now)

            # Create the counters on first sight of this user
            if user_id not in self._hourly_counts:
                self._hourly_counts[user_id] = deque()
                self._daily_counts[user_id] = deque()

            # Drop timestamps that fell out of the hour/day windows
            self._clean_old_entries(user_id, now)

            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            hourly_count = len(self._hourly_counts[user_id])
            daily_count = len(self._daily_counts[user_id])

            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            # Record this notification
            self._hourly_counts[user_id].append(now)
            self._daily_counts[user_id].append(now)

            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Remove timestamps older than one hour / one day from a user's
        counters. The user must already have both counters.

        Args:
            user_id: User identifier
            now: Current time
        """
        windows = (
            (self._hourly_counts[user_id], now - timedelta(hours=1)),
            (self._daily_counts[user_id], now - timedelta(days=1)),
        )
        for entries, cutoff in windows:
            # Timestamps are appended in order, so expired ones sit at the
            # left end of the deque.
            while entries and entries[0] < cutoff:
                entries.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Reset notification counters for one user or for all users.

        Only the counters are cleared; limits configured through
        `set_user_limits()` are kept.

        Args:
            user_id: User to reset, or None for all users
        """
        with self._lock:
            # Compare against None explicitly: a falsy id such as "" names
            # a single user and must not wipe everyone's counters.
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Drop the counters of users with no recorded notification in the
        last 7 days, to keep the counter dictionaries from growing forever.

        Does nothing unless `cleanup_interval_hours` have passed since the
        previous cleanup. Entries in `_user_limits` are not removed.

        Args:
            now: Current timestamp
        """
        if now - self._last_cleanup < timedelta(
            hours=self.cleanup_interval_hours
        ):
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        # Users with no activity for 7 days are considered inactive
        inactive_threshold = now - timedelta(days=7)

        # Collect first, remove afterwards, so the dict is not mutated
        # while it is being iterated. Empty counters count as inactive.
        inactive_users = [
            user_id
            for user_id in self._hourly_counts
            if not (
                any(
                    entry > inactive_threshold
                    for entry in self._hourly_counts[user_id]
                )
                or any(
                    entry > inactive_threshold
                    for entry in self._daily_counts.get(user_id, ())
                )
            )
        ]

        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now