Coverage for src/local_deep_research/notifications/manager.py: 99%

152 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2High-level notification manager with database integration. 

3""" 

4 

5from datetime import datetime, timezone, timedelta 

6from typing import Dict, Optional, Any 

7from collections import deque 

8import threading 

9 

10from loguru import logger 

11 

12from ..settings.env_registry import get_env_setting 

13from .service import NotificationService 

14from .templates import EventType 

15from .exceptions import RateLimitError 

16 

17 

class NotificationManager:
    """
    Notification front end driven by a per-user settings snapshot.

    The manager never queries settings storage itself: every user preference
    is looked up in the ``settings_snapshot`` dictionary handed to
    ``__init__``. That is what makes it usable from background threads
    (e.g., queue processors), as long as the snapshot was captured on the
    main thread.

    **Per-User Rate Limiting:**
    A single RateLimiter is stored on the class and shared by every
    NotificationManager instance. Each instance registers the limits found
    in its own snapshot under its ``user_id``, so the limiter keeps separate
    limits and counters per user.

    **How It Works:**
    - The first NotificationManager created builds the shared RateLimiter,
      using the limits from its own snapshot as the limiter's defaults
    - Every instance (including the first) then calls ``set_user_limits``
      for its own user_id
    - Counters are keyed by user_id, so one user's usage does not count
      against another user

    Example:
        >>> snapshot_a = {"notifications.rate_limit_per_hour": 5}
        >>> manager_a = NotificationManager(snapshot_a, user_id="user_a")
        >>> # user_a is limited to 5/hour
        >>>
        >>> snapshot_b = {"notifications.rate_limit_per_hour": 20}
        >>> manager_b = NotificationManager(snapshot_b, user_id="user_b")
        >>> # user_b is limited to 20/hour, user_a keeps 5/hour
    """

    # Process-wide limiter shared by all instances, created lazily by the
    # first __init__ call. The lock serialises that one-time creation.
    _shared_rate_limiter: Optional["RateLimiter"] = None
    _rate_limiter_lock = threading.Lock()

    def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
        """
        Build a manager bound to one user's settings snapshot.

        Args:
            settings_snapshot: Dictionary of settings key-value pairs captured
                from SettingsManager.get_settings_snapshot(). All user
                preferences are read from this dictionary.
            user_id: User identifier used for per-user rate limiting and in
                log messages. The rate limits found in settings_snapshot
                are registered for this user.

        Example:
            >>> # In main thread with database session
            >>> settings_manager = SettingsManager(session)
            >>> snapshot = settings_manager.get_settings_snapshot()
            >>>
            >>> # In background thread
            >>> notification_manager = NotificationManager(
            ...     settings_snapshot=snapshot,
            ...     user_id="user123"
            ... )
            >>> notification_manager.send_notification(...)
        """
        self._settings_snapshot = settings_snapshot
        self._user_id = user_id

        # Security: this flag comes from get_env_setting, i.e. the
        # server-side environment (LDR_NOTIFICATIONS_ALLOW_PRIVATE_IPS),
        # never from the user-writable snapshot. It used to be read via
        # _get_setting("notifications.allow_private_ips"), which let any
        # user switch off SSRF protection through the settings API.
        # The key is registered as env-only in
        # settings/env_definitions/security.py.
        private_ips_ok = get_env_setting(
            "notifications.allow_private_ips", False
        )

        # Server-level master switch (env-only), separate from the per-user
        # notifications.enabled toggle in the UI. Only the deployment
        # operator can set it. It is off by default; turning it on accepts
        # the documented DNS-rebinding TOCTOU residual risk
        # (see SECURITY.md "Notification Webhook SSRF").
        outbound_flag = get_env_setting("notifications.allow_outbound", False)
        self._outbound_allowed = bool(outbound_flag)

        self.service = NotificationService(
            allow_private_ips=private_ips_ok,
            outbound_allowed=self._outbound_allowed,
        )

        # Create the shared limiter exactly once; later instances reuse it.
        with NotificationManager._rate_limiter_lock:
            if NotificationManager._shared_rate_limiter is None:
                # This first snapshot only supplies the limiter's defaults;
                # every user still gets explicit limits below.
                fallback_hourly, fallback_daily = self._read_rate_limits()

                logger.info(
                    f"Initializing shared rate limiter with defaults: "
                    f"{fallback_hourly}/hour, {fallback_daily}/day"
                )

                NotificationManager._shared_rate_limiter = RateLimiter(
                    max_per_hour=fallback_hourly,
                    max_per_day=fallback_daily,
                )

            self._rate_limiter = NotificationManager._shared_rate_limiter

        # Register this user's own limits on the shared limiter.
        hourly_cap, daily_cap = self._read_rate_limits()
        self._rate_limiter.set_user_limits(user_id, hourly_cap, daily_cap)
        logger.debug(
            f"Configured rate limits for user {user_id}: "
            f"{hourly_cap}/hour, {daily_cap}/day"
        )

    def _read_rate_limits(self) -> tuple:
        """
        Read the hourly and daily rate limits from the snapshot.

        Returns:
            Tuple of (per_hour, per_day); 10 and 50 are used for keys that
            are missing from the snapshot.
        """
        per_hour = self._get_setting(
            "notifications.rate_limit_per_hour", default=10
        )
        per_day = self._get_setting(
            "notifications.rate_limit_per_day", default=50
        )
        return per_hour, per_day

    def _get_setting(self, key: str, default: Any = None) -> Any:
        """
        Look up a single setting in the snapshot.

        Args:
            key: Setting key
            default: Value returned when the key is absent

        Returns:
            The stored value, or ``default`` when the key is not present
        """
        snapshot = self._settings_snapshot
        return snapshot.get(key, default)

    def send_notification(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        force: bool = False,
    ) -> bool:
        """
        Send a notification for an event on behalf of this manager's user.

        The checks run in this order: server-level outbound switch,
        per-event-type preference, rate limit, configured service URL.
        The rate limiter is consulted even when ``force`` is True; force
        only decides whether a negative answer stops the send.

        Args:
            event_type: Type of event
            context: Context data for the notification
            force: If True, ignore the per-event-type preference and the
                rate-limit result

        Returns:
            True if the service reported a successful send, False when the
            notification was refused, skipped, or failed

        Raises:
            RateLimitError: If rate limit is exceeded and force=False
        """
        event_name = event_type.value

        logger.debug(
            f"Sending notification: event_type={event_name}, "
            f"user_id={self._user_id}, force={force}"
        )
        logger.debug(f"Context keys: {list(context.keys())}")

        # Server-level master switch (env-only), off by default. Enabling
        # it is the operator's acknowledgement of the documented SSRF
        # rebinding residual risk. force=True does NOT override it; force
        # only overrides the per-user event-type and rate-limit checks.
        # Logged at WARNING so an operator asking "why aren't
        # notifications firing?" sees it at the default log level.
        if not self._outbound_allowed:
            logger.warning(
                "Notification refused: outbound notifications are disabled "
                "at the server level. Set "
                "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
                "SECURITY.md 'Notification Webhook SSRF' for the rationale "
                "and residual risk. (event={}, user={})",
                event_name,
                self._user_id,
            )
            return False

        # Per-user preference for this event type.
        wanted = self._should_notify(event_type)
        logger.debug(
            f"Notification enabled check for {event_name}: "
            f"{wanted}"
        )

        if not (force or wanted):
            logger.debug(
                f"Notifications disabled for event type: "
                f"{event_name} (user: {self._user_id})"
            )
            return False

        # Rate limit, keyed by the manager's user_id.
        within_limit = self._rate_limiter.is_allowed(self._user_id)
        logger.debug(f"Rate limit check for {self._user_id}: {within_limit}")

        if not (force or within_limit):
            logger.warning(f"Rate limit exceeded for user {self._user_id}")
            raise RateLimitError(
                "Notification rate limit exceeded. "
                "Please wait before sending more notifications."
            )

        try:
            service_urls = self._get_setting(
                "notifications.service_url", default=""
            )

            # Nothing to send to when the setting is empty or whitespace.
            if not (service_urls and service_urls.strip()):
                logger.debug(
                    f"No notification service URLs configured for user "
                    f"{self._user_id}"
                )
                return False

            logger.debug(f"Calling service.send_event for {event_name}")
            outcome = self.service.send_event(
                event_type, context, service_urls=service_urls
            )

            if not outcome:
                logger.warning(
                    f"Notification failed: {event_name} to user "
                    f"{self._user_id}"
                )
                return outcome

            self._log_notification(event_type, context)
            logger.info(
                f"Notification sent: {event_name} to user "
                f"{self._user_id}"
            )
            return outcome

        except Exception:
            # Any failure while resolving URLs or sending is logged with
            # its traceback and reported to the caller as "not sent".
            logger.exception(
                f"Error sending notification for {event_name} to user "
                f"{self._user_id}"
            )
            return False

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service by delegating to the underlying service.

        Args:
            url: Service URL to test

        Returns:
            Dict with test results, as returned by the service
        """
        report = self.service.test_service(url)
        return report

    def _should_notify(self, event_type: EventType) -> bool:
        """
        Tell whether the user's snapshot enables this event type.

        The snapshot key is ``notifications.on_<event value>``; a missing
        key counts as disabled.

        Args:
            event_type: Event type to check

        Returns:
            True if notifications should be sent
        """
        try:
            flag = self._get_setting(
                f"notifications.on_{event_type.value}", default=False
            )
            return bool(flag)
        except Exception:
            logger.warning("Error checking notification preferences")
            # Default to disabled on error to avoid infinite loops during login
            return False

    def _log_notification(
        self, event_type: EventType, context: Dict[str, Any]
    ) -> None:
        """
        Write an application-log entry for a notification that was sent.

        The entry is labelled with the context's "query", falling back to
        "subscription_name" and then "Unknown". Failures while logging are
        swallowed and only reported at debug level.

        Args:
            event_type: Event type
            context: Notification context
        """
        try:
            label = context.get("query")
            if not label:
                label = context.get("subscription_name")
            if not label:
                label = "Unknown"

            logger.info(
                f"Notification sent: {event_type.value} - {label} "
                f"(user: {self._user_id})"
            )
        except Exception as e:
            logger.debug(f"Failed to log notification: {e}")

343 

344 

class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limit support.

    The limiter keeps, for every user, a queue of timestamps for the last
    hour and another for the last day, and compares their lengths against
    that user's limits.

    **Per-User Limits:**
    Limits are configured per user with `set_user_limits()`. A user with no
    explicit limits falls back to the defaults passed to __init__.

    **Memory Storage:**
    Counters and limits live in memory only, so they are lost when the
    server restarts. This is acceptable for normal users since they cannot
    restart the server; a restart by an admin resetting the limits is
    reasonable behavior.

    **Thread Safety:**
    Every public method takes ``self._lock`` before touching shared state.
    The underscore-prefixed helpers do not lock and expect the caller to
    already hold it.

    **Multi-Worker Limitation:**
    In multi-worker deployments, each worker process maintains its own
    counters, so a user could receive up to N × max_per_hour notifications
    (where N = number of workers). For single-worker deployments (the
    default for LDR) this is not a concern. For production multi-worker
    deployments, consider implementing Redis-based rate limiting.

    Example:
        >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        >>> limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        >>> limiter.set_user_limits("user_b", max_per_hour=20, max_per_day=100)
        >>> limiter.is_allowed("user_a")  # Limited to 5/hour
        >>> limiter.is_allowed("user_b")  # Limited to 20/hour
        >>> limiter.is_allowed("user_c")  # Uses defaults: 10/hour
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize rate limiter with default limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user
            max_per_day: Default maximum notifications per day per user
            cleanup_interval_hours: Minimum time between two cleanup passes
                over inactive users (hours)
        """
        # Defaults applied to users without an entry in _user_limits
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # user_id -> (max_per_hour, max_per_day)
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # user_id -> timestamps of recorded notifications, oldest first
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # Guards all state above

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Set rate limits for a specific user, replacing any earlier ones.

        Args:
            user_id: User identifier
            max_per_hour: Maximum notifications per hour for this user
            max_per_day: Maximum notifications per day for this user

        Example:
            >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
            >>> limiter.set_user_limits("power_user", max_per_hour=20, max_per_day=100)
            >>> limiter.set_user_limits("limited_user", max_per_hour=5, max_per_day=25)
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
            logger.debug(
                f"Set rate limits for user {user_id}: "
                f"{max_per_hour}/hour, {max_per_day}/day"
            )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Get the effective rate limits for a user.

        Args:
            user_id: User identifier

        Returns:
            Tuple of (max_per_hour, max_per_day): the user-specific limits
            if set, otherwise the limiter's defaults
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check whether a notification is allowed for a user, and record it.

        This is a check-and-consume operation: when the answer is True the
        current time is appended to the user's hourly and daily queues, so
        the call itself counts against the limits. A refused call records
        nothing.

        Args:
            user_id: User identifier

        Returns:
            True if notification is allowed, False if rate limit exceeded
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Opportunistic housekeeping; a no-op most of the time.
            self._cleanup_inactive_users_if_needed(now)

            # Make sure the user has both queues before they are read.
            if user_id not in self._hourly_counts:
                self._hourly_counts[user_id] = deque()
                self._daily_counts[user_id] = deque()

            # Drop timestamps that have left the 1-hour / 1-day windows.
            self._clean_old_entries(user_id, now)

            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            hourly_count = len(self._hourly_counts[user_id])
            daily_count = len(self._daily_counts[user_id])

            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            # Record this notification in both windows.
            self._hourly_counts[user_id].append(now)
            self._daily_counts[user_id].append(now)

            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Remove timestamps that fell out of the hourly and daily windows.

        Both queues for ``user_id`` must already exist. The caller is
        expected to hold ``self._lock``.

        Args:
            user_id: User identifier
            now: Current time
        """
        windows = (
            (self._hourly_counts[user_id], now - timedelta(hours=1)),
            (self._daily_counts[user_id], now - timedelta(days=1)),
        )
        for queue, cutoff in windows:
            # Oldest entries sit at the left end of the queue.
            while queue and queue[0] < cutoff:
                queue.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Reset recorded notification counts for a user or all users.

        Configured per-user limits are left untouched; only the counters
        are cleared.

        Args:
            user_id: User to reset, or None for all users
        """
        with self._lock:
            # Compare against None explicitly: a falsy id such as "" names
            # one user and must not wipe everybody's counters.
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Periodically drop counters of inactive users to bound memory use.

        Does nothing until ``cleanup_interval_hours`` have passed since the
        previous pass. A user is inactive when none of their recorded
        timestamps is newer than 7 days. Only the counters are removed;
        entries in ``_user_limits`` are kept. The caller is expected to
        hold ``self._lock``.

        Args:
            now: Current timestamp
        """
        if now - self._last_cleanup < timedelta(
            hours=self.cleanup_interval_hours
        ):
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        # Users with no activity for 7 days are considered inactive.
        inactive_threshold = now - timedelta(days=7)

        def _has_recent_activity(candidate: str) -> bool:
            # Short-circuits on the first recent timestamp; no copies made.
            queues = (
                self._hourly_counts.get(candidate, ()),
                self._daily_counts.get(candidate, ()),
            )
            return any(
                stamp > inactive_threshold
                for queue in queues
                for stamp in queue
            )

        # Collect first, delete afterwards, so the dict is never mutated
        # while it is being iterated.
        inactive_users = [
            candidate
            for candidate in self._hourly_counts
            if not _has_recent_activity(candidate)
        ]

        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now