Coverage for src / local_deep_research / notifications / manager.py: 99%

147 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2High-level notification manager with database integration. 

3""" 

4 

5from datetime import datetime, timezone, timedelta 

6from typing import Dict, Optional, Any 

7from collections import deque 

8import threading 

9 

10from loguru import logger 

11 

12from ..settings.env_registry import get_env_setting 

13from .service import NotificationService 

14from .templates import EventType 

15from .exceptions import RateLimitError 

16 

17 

class NotificationManager:
    """
    Send user notifications from any thread using a captured settings snapshot.

    Instead of querying the database, each manager reads the user's
    preferences from a plain ``settings_snapshot`` dict captured on a thread
    that has database access (for example via
    ``SettingsManager.get_settings_snapshot()``). This makes the manager safe
    to use from background workers such as queue processors.

    **Per-User Rate Limiting:**
    Every manager shares one process-wide ``RateLimiter`` stored on the class.
    The first manager constructed creates it and seeds the limiter's fallback
    limits from *its own* snapshot (``notifications.rate_limit_per_hour`` and
    ``notifications.rate_limit_per_day``, defaulting to 10 and 50). Each
    manager then registers per-user limits for its ``user_id`` from its own
    snapshot, so every user is counted and capped independently.

    Example::

        manager_a = NotificationManager(
            {"notifications.rate_limit_per_hour": 5}, user_id="user_a"
        )  # user_a is capped at 5 per hour
        manager_b = NotificationManager(
            {"notifications.rate_limit_per_hour": 20}, user_id="user_b"
        )  # user_b is capped at 20 per hour; user_a still has 5 per hour
    """

    # Process-wide limiter shared by all instances. It is created lazily under
    # the lock below so concurrent constructors build it only once.
    _shared_rate_limiter: Optional["RateLimiter"] = None
    _rate_limiter_lock = threading.Lock()

    def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
        """
        Bind the manager to one user's settings snapshot.

        Args:
            settings_snapshot: Mapping of setting keys to values, captured on
                a thread with database access (e.g. from
                ``SettingsManager.get_settings_snapshot()``), so this manager
                can be used from background threads.
            user_id: Identifier whose per-user rate limits are registered from
                ``settings_snapshot`` and used for every send.

        Example::

            snapshot = SettingsManager(session).get_settings_snapshot()
            # ... later, on a background thread:
            manager = NotificationManager(
                settings_snapshot=snapshot, user_id="user123"
            )
            manager.send_notification(...)
        """
        self._settings_snapshot = settings_snapshot
        self._user_id = user_id

        # Security (SSRF): private-IP access is decided only by the
        # server-side environment (LDR_NOTIFICATIONS_ALLOW_PRIVATE_IPS, which
        # is registered as env-only), never by user-writable settings. Users
        # therefore cannot disable the protection through the settings API.
        self.service = NotificationService(
            allow_private_ips=get_env_setting(
                "notifications.allow_private_ips", False
            )
        )

        # Read the limits once. They seed the shared limiter (first instance
        # only) and are registered as this user's own limits.
        hourly_limit = self._get_setting(
            "notifications.rate_limit_per_hour", default=10
        )
        daily_limit = self._get_setting(
            "notifications.rate_limit_per_day", default=50
        )

        with NotificationManager._rate_limiter_lock:
            if NotificationManager._shared_rate_limiter is None:
                logger.info(
                    "Initializing shared rate limiter with defaults: "
                    f"{hourly_limit}/hour, {daily_limit}/day"
                )
                NotificationManager._shared_rate_limiter = RateLimiter(
                    max_per_hour=hourly_limit,
                    max_per_day=daily_limit,
                )
            self._rate_limiter = NotificationManager._shared_rate_limiter

        self._rate_limiter.set_user_limits(user_id, hourly_limit, daily_limit)
        logger.debug(
            f"Configured rate limits for user {user_id}: "
            f"{hourly_limit}/hour, {daily_limit}/day"
        )

    def _get_setting(self, key: str, default: Any = None) -> Any:
        """
        Look up ``key`` in the settings snapshot.

        Args:
            key: Setting key to read.
            default: Value returned when the key is absent.

        Returns:
            The snapshot value for ``key``, or ``default``.
        """
        return self._settings_snapshot.get(key, default)

    def send_notification(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        force: bool = False,
    ) -> bool:
        """
        Deliver a notification for ``event_type`` to this manager's user.

        The send is skipped (returning ``False``) in either of these cases:
        - the ``notifications.on_<event>`` setting is off;
        - no ``notifications.service_url`` is configured.

        ``force=True`` bypasses the per-event toggle and the rate-limit
        rejection. The limiter is still consulted, so a forced send uses up a
        slot whenever one is available.

        Args:
            event_type: Event being reported.
            context: Template data forwarded to the notification service.
            force: Bypass the per-event toggle and the rate-limit rejection.

        Returns:
            The service's send result, or ``False`` if the send was skipped or
            raised an exception.

        Raises:
            RateLimitError: The user's rate limit is exhausted and ``force``
                is ``False``.
        """
        event_name = event_type.value
        uid = self._user_id

        logger.debug(
            f"Sending notification: event_type={event_name}, "
            f"user_id={uid}, force={force}"
        )
        logger.debug(f"Context keys: {list(context.keys())}")

        enabled = self._should_notify(event_type)
        logger.debug(f"Notification enabled check for {event_name}: {enabled}")
        if not (force or enabled):
            logger.debug(
                "Notifications disabled for event type: "
                f"{event_name} (user: {uid})"
            )
            return False

        # Always consult the limiter, even for forced sends, so the attempt is
        # recorded whenever a slot is free.
        within_limit = self._rate_limiter.is_allowed(uid)
        logger.debug(f"Rate limit check for {uid}: {within_limit}")
        if not (force or within_limit):
            logger.warning(f"Rate limit exceeded for user {uid}")
            raise RateLimitError(
                "Notification rate limit exceeded. "
                "Please wait before sending more notifications."
            )

        try:
            urls = self._get_setting("notifications.service_url", default="")
            if not (urls and urls.strip()):
                logger.debug(
                    f"No notification service URLs configured for user {uid}"
                )
                return False

            logger.debug(f"Calling service.send_event for {event_name}")
            sent = self.service.send_event(
                event_type, context, service_urls=urls
            )

            if not sent:
                logger.warning(f"Notification failed: {event_name} to user {uid}")
            else:
                self._log_notification(event_type, context)
                logger.info(f"Notification sent: {event_name} to user {uid}")

            return sent

        except Exception:
            logger.exception(
                f"Error sending notification for {event_name} to user {uid}"
            )
            return False

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Ask the underlying notification service to test ``url``.

        Args:
            url: Service URL to test.

        Returns:
            Whatever dict ``NotificationService.test_service`` returns.
        """
        return self.service.test_service(url)

    def _should_notify(self, event_type: EventType) -> bool:
        """
        Report whether the user enabled notifications for ``event_type``.

        Reads ``notifications.on_<event_type.value>`` from the snapshot and
        treats a missing key as disabled.

        Args:
            event_type: Event type to check.

        Returns:
            The truthiness of the setting, or ``False`` on any error.
        """
        try:
            return bool(
                self._get_setting(
                    f"notifications.on_{event_type.value}", default=False
                )
            )
        except Exception:
            # Fail closed: on error, default to disabled to avoid infinite
            # loops during login.
            logger.warning("Error checking notification preferences")
            return False

    def _log_notification(
        self, event_type: EventType, context: Dict[str, Any]
    ) -> None:
        """
        Write a best-effort application-log line for a sent notification.

        The title is the first truthy value among ``context["query"]`` and
        ``context["subscription_name"]``, falling back to ``"Unknown"``. Any
        failure here is logged at debug level and otherwise ignored.

        Args:
            event_type: Event that was sent.
            context: Notification context the title is taken from.
        """
        try:
            title = "Unknown"
            for key in ("query", "subscription_name"):
                candidate = context.get(key)
                if candidate:
                    title = candidate
                    break
            logger.info(
                f"Notification sent: {event_type.value} - {title} "
                f"(user: {self._user_id})"
            )
        except Exception as err:
            logger.debug(f"Failed to log notification: {err}")

311 

312 

class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limit support.

    Each user's sends are tracked in two sliding windows: the last hour and
    the last day. A user may have their own limits registered with
    ``set_user_limits()``. Users without one fall back to the defaults passed
    to ``__init__``. Limits are stored separately from the send counters.

    **Memory Storage:**
    Counters and limits live in process memory only, so they reset when the
    server restarts. Ordinary users cannot restart the server, and a reset
    after an admin restart is reasonable behavior.

    **Thread Safety:**
    Every public operation (``set_user_limits``, ``get_user_limits``,
    ``is_allowed``, ``reset``) runs under one internal ``threading.Lock``.

    **Multi-Worker Limitation:**
    In multi-worker deployments, each worker process keeps its own counters.
    A user could spread requests across N workers and get up to
    N x max_per_hour notifications. For single-worker deployments (the
    default for LDR) this is not a concern. For production multi-worker
    deployments, consider Redis-based rate limiting.

    Example::

        limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        limiter.set_user_limits("user_b", max_per_hour=20, max_per_day=100)
        limiter.is_allowed("user_a")  # capped at 5/hour
        limiter.is_allowed("user_b")  # capped at 20/hour
        limiter.is_allowed("user_c")  # no custom limits: default 10/hour
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize the limiter with default (fallback) limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user.
            max_per_day: Default maximum notifications per day per user.
            cleanup_interval_hours: Minimum hours between sweeps that drop
                the counters of inactive users.
        """
        # Fallback limits for users without set_user_limits() entries.
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # user_id -> (max_per_hour, max_per_day)
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # user_id -> timestamps (UTC) of recorded sends, oldest first.
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # guards all state above

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Register rate limits for a specific user.

        Users without registered limits use the defaults passed to
        ``__init__``. Calling this again replaces the previous limits.

        Args:
            user_id: User identifier.
            max_per_hour: Maximum notifications per hour for this user.
            max_per_day: Maximum notifications per day for this user.
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
            logger.debug(
                f"Set rate limits for user {user_id}: "
                f"{max_per_hour}/hour, {max_per_day}/day"
            )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Return the effective limits for a user.

        Args:
            user_id: User identifier.

        Returns:
            ``(max_per_hour, max_per_day)``: the user-specific limits if set,
            otherwise the defaults.
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check whether a user may send a notification, and record it if so.

        Uses per-user limits if configured via ``set_user_limits()``,
        otherwise the defaults from ``__init__``. A ``True`` result consumes
        one slot in both the hourly and the daily window.

        Args:
            user_id: User identifier.

        Returns:
            True if the notification is allowed (and was recorded), False if
            the hourly or daily limit has been reached.
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Periodically drop counters of users who have gone inactive.
            self._cleanup_inactive_users_if_needed(now)

            if user_id not in self._hourly_counts:
                self._hourly_counts[user_id] = deque()
                self._daily_counts[user_id] = deque()

            self._clean_old_entries(user_id, now)

            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            hourly_count = len(self._hourly_counts[user_id])
            daily_count = len(self._daily_counts[user_id])

            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            # Check and record happen under the same lock, so concurrent
            # callers cannot both take the last free slot.
            self._hourly_counts[user_id].append(now)
            self._daily_counts[user_id].append(now)

            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Drop timestamps that fell out of the hourly/daily windows.

        Caller must hold ``self._lock`` and have created both deques for
        ``user_id``.

        Args:
            user_id: User identifier.
            now: Current time (UTC).
        """
        hour_ago = now - timedelta(hours=1)
        day_ago = now - timedelta(days=1)

        # Deques are appended in time order, so stale entries sit at the left.
        hourly = self._hourly_counts[user_id]
        while hourly and hourly[0] < hour_ago:
            hourly.popleft()

        daily = self._daily_counts[user_id]
        while daily and daily[0] < day_ago:
            daily.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Clear recorded send counts for one user or for all users.

        Configured per-user limits (``set_user_limits``) are kept; only the
        counters are cleared.

        Args:
            user_id: User whose counters to clear, or None to clear all
                users. Any non-None id, including ``""``, targets only that
                user.
        """
        with self._lock:
            # Compare against None explicitly: a falsy id such as "" must not
            # wipe every user's counters.
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Periodically drop counters of inactive users to bound memory use.

        Runs at most once per ``cleanup_interval_hours``. A user is
        considered inactive when none of their recorded timestamps is newer
        than 7 days. Caller must hold ``self._lock``.

        Args:
            now: Current time (UTC).
        """
        if now - self._last_cleanup < timedelta(
            hours=self.cleanup_interval_hours
        ):
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        inactive_threshold = now - timedelta(days=7)

        # Build the list first, then delete, so the dicts are not resized
        # while being iterated.
        inactive_users = [
            uid
            for uid, hourly in self._hourly_counts.items()
            if not any(ts > inactive_threshold for ts in hourly)
            and not any(
                ts > inactive_threshold
                for ts in self._daily_counts.get(uid, ())
            )
        ]

        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now