Coverage for src / local_deep_research / notifications / manager.py: 92%

149 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2High-level notification manager with database integration. 

3""" 

4 

5from datetime import datetime, timezone, timedelta 

6from typing import Dict, Optional, Any 

7from collections import deque 

8import threading 

9 

10from loguru import logger 

11 

12from .service import NotificationService 

13from .templates import EventType 

14from .exceptions import RateLimitError 

15 

16 

class NotificationManager:
    """
    High-level notification manager that reads user settings from a snapshot.

    The manager never touches the database itself: every setting is looked up
    in the ``settings_snapshot`` dictionary handed to ``__init__``, so it can
    be used from background threads (e.g. queue processors) with a snapshot
    captured on the main thread.

    **Per-User Rate Limiting:**
    A single ``RateLimiter`` is shared by ALL ``NotificationManager``
    instances (class attribute ``_shared_rate_limiter``). The first instance
    creates it; every instance then registers the limits found in its own
    snapshot for its own ``user_id`` via ``RateLimiter.set_user_limits``.

    Example:
        >>> snapshot_a = {"notifications.rate_limit_per_hour": 5}
        >>> manager_a = NotificationManager(snapshot_a, user_id="user_a")
        >>> snapshot_b = {"notifications.rate_limit_per_hour": 20}
        >>> manager_b = NotificationManager(snapshot_b, user_id="user_b")
        >>> # user_a is limited to 5/hour, user_b to 20/hour
    """

    # Fallback limits used when the snapshot does not define them.
    _DEFAULT_MAX_PER_HOUR = 10
    _DEFAULT_MAX_PER_DAY = 50

    # Shared rate limiter instance across all NotificationManager instances,
    # so limits are enforced consistently no matter how many managers exist.
    _shared_rate_limiter: Optional["RateLimiter"] = None
    # Guards lazy creation of the shared limiter.
    _rate_limiter_lock = threading.Lock()

    def __init__(self, settings_snapshot: Dict[str, Any], user_id: str):
        """
        Initialize the notification manager.

        Args:
            settings_snapshot: Dictionary of settings key-value pairs. All
                settings used by this manager are read from it.
            user_id: User identifier for per-user rate limiting. The rate
                limits found in ``settings_snapshot`` are registered for
                this user on the shared rate limiter.

        Example:
            >>> notification_manager = NotificationManager(
            ...     settings_snapshot=snapshot,
            ...     user_id="user123"
            ... )
            >>> notification_manager.send_notification(...)
        """
        self._settings_snapshot = settings_snapshot
        self._user_id = user_id

        # Private IPs stay disallowed unless the snapshot opts in explicitly.
        allow_private_ips = self._get_setting(
            "notifications.allow_private_ips", default=False
        )
        self.service = NotificationService(allow_private_ips=allow_private_ips)

        # Read the limits once; they serve both as this user's limits and,
        # for the very first manager, as the shared limiter's defaults.
        max_per_hour = self._get_setting(
            "notifications.rate_limit_per_hour",
            default=self._DEFAULT_MAX_PER_HOUR,
        )
        max_per_day = self._get_setting(
            "notifications.rate_limit_per_day",
            default=self._DEFAULT_MAX_PER_DAY,
        )

        with NotificationManager._rate_limiter_lock:
            if NotificationManager._shared_rate_limiter is None:
                logger.info(
                    f"Initializing shared rate limiter with defaults: "
                    f"{max_per_hour}/hour, {max_per_day}/day"
                )
                NotificationManager._shared_rate_limiter = RateLimiter(
                    max_per_hour=max_per_hour,
                    max_per_day=max_per_day,
                )
            self._rate_limiter = NotificationManager._shared_rate_limiter

        # set_user_limits does its own locking, so it runs outside ours.
        self._rate_limiter.set_user_limits(user_id, max_per_hour, max_per_day)
        logger.debug(
            f"Configured rate limits for user {user_id}: "
            f"{max_per_hour}/hour, {max_per_day}/day"
        )

    def _get_setting(self, key: str, default: Any = None) -> Any:
        """
        Get a setting value from the snapshot.

        Args:
            key: Setting key
            default: Value returned when the key is absent

        Returns:
            Setting value or default
        """
        return self._settings_snapshot.get(key, default)

    def send_notification(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        force: bool = False,
    ) -> bool:
        """
        Send a notification for an event.

        Checks run in this order: event enabled, destination configured,
        rate limit. The rate limiter is consulted only once a destination
        exists, so notifications that cannot be sent do not use up quota.
        The limiter is consulted even when ``force`` is True; ``force`` only
        suppresses the resulting ``RateLimitError``.

        Args:
            event_type: Type of event
            context: Context data for the notification
            force: If True, send even if the event type is disabled or the
                rate limit is exceeded

        Returns:
            The result of ``service.send_event``; False if the notification
            was skipped or an unexpected error occurred

        Raises:
            RateLimitError: If rate limit is exceeded and force=False
        """
        try:
            logger.debug(
                f"Sending notification: event_type={event_type.value}, "
                f"user_id={self._user_id}, force={force}"
            )
            logger.debug(f"Context keys: {list(context.keys())}")

            # 1. Is this event type enabled for the user?
            should_notify = self._should_notify(event_type)
            logger.debug(
                f"Notification enabled check for {event_type.value}: "
                f"{should_notify}"
            )
            if not force and not should_notify:
                logger.debug(
                    f"Notifications disabled for event type: "
                    f"{event_type.value} (user: {self._user_id})"
                )
                return False

            # 2. Is there anywhere to send it? Checked BEFORE the rate limiter
            #    because is_allowed() records a hit as a side effect.
            service_urls = self._get_setting(
                "notifications.service_url", default=""
            )
            if not service_urls or not service_urls.strip():
                logger.debug(
                    f"No notification service URLs configured for user "
                    f"{self._user_id}"
                )
                return False

            # 3. Rate limit for this manager's user.
            rate_limit_ok = self._rate_limiter.is_allowed(self._user_id)
            logger.debug(
                f"Rate limit check for {self._user_id}: {rate_limit_ok}"
            )
            if not force and not rate_limit_ok:
                logger.warning(f"Rate limit exceeded for user {self._user_id}")
                raise RateLimitError(
                    "Notification rate limit exceeded. "
                    "Please wait before sending more notifications."
                )

            logger.debug(f"Calling service.send_event for {event_type.value}")
            result = self.service.send_event(
                event_type, context, service_urls=service_urls
            )

            if result:
                self._log_notification(event_type, context)
                logger.info(
                    f"Notification sent: {event_type.value} to user "
                    f"{self._user_id}"
                )
            else:
                logger.warning(
                    f"Notification failed: {event_type.value} to user "
                    f"{self._user_id}"
                )

            return result

        except RateLimitError:
            # Callers are expected to see rate-limit failures.
            logger.warning(f"Rate limit error for user {self._user_id}")
            raise
        except Exception as e:
            # Any other failure is logged and reported as "not sent".
            logger.exception(
                f"Error sending notification for {event_type.value} to user "
                f"{self._user_id}: {e}"
            )
            return False

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service.

        Args:
            url: Service URL to test

        Returns:
            Whatever ``service.test_service`` returns for ``url``
        """
        return self.service.test_service(url)

    def _should_notify(self, event_type: EventType) -> bool:
        """
        Check if notifications should be sent for this event type.

        Looks up ``notifications.on_<event_type.value>`` in the snapshot;
        a missing key counts as disabled.

        Args:
            event_type: Event type to check

        Returns:
            True if the setting is truthy, False otherwise or on error
        """
        try:
            setting_key = f"notifications.on_{event_type.value}"
            return bool(self._get_setting(setting_key, default=False))
        except Exception as e:
            logger.debug(f"Error checking notification preferences: {e}")
            # Default to disabled on error to avoid infinite loops during login
            return False

    def _log_notification(
        self, event_type: EventType, context: Dict[str, Any]
    ) -> None:
        """
        Log a sent notification to the application log.

        The title is taken from ``context["query"]``, falling back to
        ``context["subscription_name"]`` and then ``"Unknown"``. Failures
        are logged at debug level and otherwise ignored.

        Args:
            event_type: Event type
            context: Notification context
        """
        try:
            title = (
                context.get("query")
                or context.get("subscription_name")
                or "Unknown"
            )
            logger.info(
                f"Notification sent: {event_type.value} - {title} "
                f"(user: {self._user_id})"
            )
        except Exception as e:
            logger.debug(f"Failed to log notification: {e}")

310 

311 

class RateLimiter:
    """
    Simple in-memory rate limiter for notifications with per-user limits.

    For each user the limiter keeps two queues of timestamps (one for the
    last hour, one for the last day) and compares their lengths with that
    user's limits.

    **Per-User Limits:**
    Limits can be configured per user with ``set_user_limits()``. Users
    without an entry use the defaults passed to ``__init__``.

    **Memory Storage:**
    All state lives in dictionaries on the instance, so it is lost when the
    process exits and is not shared between processes. In a multi-worker
    deployment each worker therefore enforces its limits independently.

    **Thread Safety:**
    Every public method holds ``self._lock`` while reading or mutating the
    internal dictionaries. The underscore-prefixed helpers assume the caller
    already holds it.

    Example:
        >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
        >>> limiter.set_user_limits("user_a", max_per_hour=5, max_per_day=25)
        >>> limiter.is_allowed("user_a")  # Limited to 5/hour
        >>> limiter.is_allowed("user_c")  # Uses defaults: 10/hour
    """

    def __init__(
        self,
        max_per_hour: int = 10,
        max_per_day: int = 50,
        cleanup_interval_hours: int = 24,
    ):
        """
        Initialize rate limiter with default limits.

        Args:
            max_per_hour: Default maximum notifications per hour per user
            max_per_day: Default maximum notifications per day per user
            cleanup_interval_hours: Minimum number of hours between two
                sweeps that drop inactive users
        """
        # Defaults for users without an entry in _user_limits.
        self.max_per_hour = max_per_hour
        self.max_per_day = max_per_day
        self.cleanup_interval_hours = cleanup_interval_hours

        # user_id -> (max_per_hour, max_per_day)
        self._user_limits: Dict[str, tuple[int, int]] = {}

        # user_id -> timestamps of recorded notifications, oldest first
        self._hourly_counts: Dict[str, deque] = {}
        self._daily_counts: Dict[str, deque] = {}

        self._last_cleanup = datetime.now(timezone.utc)
        self._lock = threading.Lock()  # Thread safety for all operations

    def set_user_limits(
        self, user_id: str, max_per_hour: int, max_per_day: int
    ) -> None:
        """
        Set rate limits for a specific user, replacing any previous ones.

        Args:
            user_id: User identifier
            max_per_hour: Maximum notifications per hour for this user
            max_per_day: Maximum notifications per day for this user

        Example:
            >>> limiter = RateLimiter(max_per_hour=10, max_per_day=50)
            >>> limiter.set_user_limits("power_user", max_per_hour=20, max_per_day=100)
        """
        with self._lock:
            self._user_limits[user_id] = (max_per_hour, max_per_day)
            logger.debug(
                f"Set rate limits for user {user_id}: "
                f"{max_per_hour}/hour, {max_per_day}/day"
            )

    def get_user_limits(self, user_id: str) -> tuple[int, int]:
        """
        Get the effective rate limits for a user.

        Args:
            user_id: User identifier

        Returns:
            Tuple of (max_per_hour, max_per_day): the user-specific limits
            if set, otherwise the defaults
        """
        with self._lock:
            return self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

    def is_allowed(self, user_id: str) -> bool:
        """
        Check if a notification is allowed for a user and, if so, record it.

        A True result appends the current time to the user's hourly and
        daily queues, so every True answer counts against the limits. A
        False result records nothing.

        Args:
            user_id: User identifier

        Returns:
            True if notification is allowed, False if rate limit exceeded
        """
        with self._lock:
            now = datetime.now(timezone.utc)

            # Opportunistic sweep; a no-op unless the interval has elapsed.
            self._cleanup_inactive_users_if_needed(now)

            # setdefault keeps both queues present even if only one exists.
            hourly = self._hourly_counts.setdefault(user_id, deque())
            daily = self._daily_counts.setdefault(user_id, deque())

            self._clean_old_entries(user_id, now)

            max_per_hour, max_per_day = self._user_limits.get(
                user_id, (self.max_per_hour, self.max_per_day)
            )

            hourly_count = len(hourly)
            if hourly_count >= max_per_hour:
                logger.warning(
                    f"Hourly rate limit exceeded for user {user_id}: "
                    f"{hourly_count}/{max_per_hour}"
                )
                return False

            daily_count = len(daily)
            if daily_count >= max_per_day:
                logger.warning(
                    f"Daily rate limit exceeded for user {user_id}: "
                    f"{daily_count}/{max_per_day}"
                )
                return False

            # Record this notification
            hourly.append(now)
            daily.append(now)
            return True

    def _clean_old_entries(self, user_id: str, now: datetime) -> None:
        """
        Drop timestamps that have left the hourly / daily window.

        Both queues must already exist for ``user_id``. Entries are appended
        in arrival order, so only the left end needs checking.

        Args:
            user_id: User identifier
            now: Current time
        """
        windows = (
            (self._hourly_counts[user_id], now - timedelta(hours=1)),
            (self._daily_counts[user_id], now - timedelta(days=1)),
        )
        for queue, cutoff in windows:
            while queue and queue[0] < cutoff:
                queue.popleft()

    def reset(self, user_id: Optional[str] = None) -> None:
        """
        Reset recorded notification counts for a user or all users.

        Configured limits (``set_user_limits``) are left untouched.

        Args:
            user_id: User to reset, or None for all users. Any other value,
                including an empty string, names a single user.
        """
        with self._lock:
            if user_id is not None:
                self._hourly_counts.pop(user_id, None)
                self._daily_counts.pop(user_id, None)
            else:
                self._hourly_counts.clear()
                self._daily_counts.clear()

    def _cleanup_inactive_users_if_needed(self, now: datetime) -> None:
        """
        Periodically drop counters of users with no activity in 7 days.

        Does nothing unless at least ``cleanup_interval_hours`` have passed
        since the previous sweep. Called from ``is_allowed`` with
        ``self._lock`` already held.

        Args:
            now: Current timestamp
        """
        interval = timedelta(hours=self.cleanup_interval_hours)
        if now - self._last_cleanup < interval:
            return

        logger.debug("Running periodic cleanup of inactive notification users")

        # Users with no recorded notification newer than this are dropped.
        inactive_threshold = now - timedelta(days=7)

        # Collect first, delete afterwards: the dicts must not change size
        # while they are being iterated.
        inactive_users = [
            user_id
            for user_id, hourly_entries in self._hourly_counts.items()
            if not any(
                entry > inactive_threshold
                for entries in (
                    hourly_entries,
                    self._daily_counts.get(user_id, ()),
                )
                for entry in entries
            )
        ]

        # NOTE(review): _user_limits is not pruned here, so configured limits
        # outlive the counters - confirm whether that is intended.
        for user_id in inactive_users:
            self._hourly_counts.pop(user_id, None)
            self._daily_counts.pop(user_id, None)
            logger.debug(
                f"Cleaned up inactive user {user_id} from rate limiter"
            )

        if inactive_users:
            logger.info(
                f"Cleaned up {len(inactive_users)} inactive users from rate limiter"
            )

        self._last_cleanup = now