Coverage for src/local_deep_research/notifications/service.py: 98%
93 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Core notification service using Apprise.
3"""
5import re
6from typing import Dict, List, Optional, Any
7from urllib.parse import urlparse
9import apprise
10from loguru import logger
11from tenacity import (
12 retry,
13 stop_after_attempt,
14 wait_exponential,
15 retry_if_exception_type,
16)
18from .exceptions import ServiceError, SendError
19from .templates import EventType, NotificationTemplate
20from ..security.url_builder import mask_sensitive_url
21from ..security.notification_validator import (
22 NotificationURLValidator,
23)
25# Backward compatibility constants - now handled by Tenacity internally
26MAX_RETRY_ATTEMPTS = 3
27INITIAL_RETRY_DELAY = 0.5
28RETRY_BACKOFF_MULTIPLIER = 2
31class NotificationService:
32 """
33 Low-level notification service that wraps Apprise.
34 """
36 # Regex patterns for common service types (for validation)
37 SERVICE_PATTERNS = {
38 "email": r"^mailto://",
39 "discord": r"^discord://",
40 "slack": r"^slack://",
41 "telegram": r"^tgram://",
42 "smtp": r"^(smtp|smtps)://",
43 }
45 def __init__(
46 self,
47 allow_private_ips: bool = False,
48 outbound_allowed: bool = False,
49 ):
50 """
51 Initialize the notification service.
53 Args:
54 allow_private_ips: Whether to allow notifications to private/local IPs
55 (default: False for security). Set to True for
56 development/testing environments only.
57 outbound_allowed: Server-level master switch
58 (env-only via LDR_NOTIFICATIONS_ALLOW_OUTBOUND).
59 Default False — outbound notifications are off until
60 the operator opts in. See SECURITY.md "Notification
61 Webhook SSRF" for the rationale.
62 """
63 self.apprise = apprise.Apprise()
64 self.allow_private_ips = allow_private_ips
65 self.outbound_allowed = outbound_allowed
67 @retry(
68 stop=stop_after_attempt(3),
69 wait=wait_exponential(multiplier=1, min=0.5, max=10),
70 retry=retry_if_exception_type((Exception,)),
71 reraise=True,
72 )
73 def _send_with_retry(
74 self,
75 title: str,
76 body: str,
77 apprise_instance: apprise.Apprise,
78 tag: Optional[str] = None,
79 attach: Optional[List[str]] = None,
80 ) -> bool:
81 """
82 Send a notification using the provided Apprise instance with retry logic.
84 This method is decorated with Tenacity to handle retries automatically.
86 Args:
87 title: Notification title
88 body: Notification body text
89 apprise_instance: Apprise instance to use for sending
90 tag: Optional tag to target specific services
91 attach: Optional list of file paths to attach
93 Returns:
94 True if notification was sent successfully
96 Raises:
97 SendError: If sending fails after all retry attempts
98 """
99 logger.debug(
100 f"Sending notification: title='{title[:50]}...', tag={tag}"
101 )
102 logger.debug(f"Body preview: {body[:200]}...")
104 # Send notification
105 notify_result = apprise_instance.notify(
106 title=title,
107 body=body,
108 tag=tag,
109 attach=attach,
110 )
112 if notify_result:
113 logger.debug(f"Notification sent successfully: '{title[:50]}...'")
114 return True
115 error_msg = "Failed to send notification to any service"
116 logger.warning(error_msg)
117 raise SendError(error_msg)
119 def send(
120 self,
121 title: str,
122 body: str,
123 service_urls: Optional[str] = None,
124 tag: Optional[str] = None,
125 attach: Optional[List[str]] = None,
126 ) -> bool:
127 """
128 Send a notification to service URLs with automatic retry.
130 Args:
131 title: Notification title
132 body: Notification body text
133 service_urls: Comma-separated list of service URLs to override configured ones
134 tag: Optional tag to target specific services
135 attach: Optional list of file paths to attach
137 Returns:
138 True if notification was sent successfully to at least one service
140 Raises:
141 SendError: If sending fails after all retry attempts
143 Note:
144 Temporary Apprise instances are created for each send operation
145 and are automatically garbage collected by Python when they go
146 out of scope. This simple approach is ideal for small deployments
147 (~5 users) and avoids memory management complexity.
148 """
149 # Defense-in-depth: enforce the operator-level master switch at
150 # the service layer too, not just at NotificationManager.
151 # Today the manager always wraps this method, but keeping the
152 # gate here means a future direct caller cannot accidentally
153 # bypass it. See SECURITY.md "Notification Webhook SSRF".
154 if not self.outbound_allowed:
155 logger.warning(
156 "Notification not sent: outbound notifications are disabled "
157 "at the server level. Set "
158 "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
159 "SECURITY.md 'Notification Webhook SSRF'."
160 )
161 return False
163 # If service_urls are provided, validate before trying to send
164 if service_urls:
165 logger.debug("Creating Apprise instance for provided service URLs")
167 # Validate service URLs for security (SSRF prevention)
168 is_valid, error_msg = (
169 NotificationURLValidator.validate_multiple_urls(
170 service_urls, allow_private_ips=self.allow_private_ips
171 )
172 )
174 if not is_valid:
175 logger.error(
176 f"Service URL validation failed: {error_msg}. "
177 f"URL: {mask_sensitive_url(service_urls)}"
178 )
179 raise ServiceError(f"Invalid service URL: {error_msg}")
181 try:
182 # If service_urls are provided, create a new Apprise instance
183 if service_urls:
184 temp_apprise = apprise.Apprise()
185 result = temp_apprise.add(service_urls, tag=tag)
187 if not result:
188 logger.error(
189 f"Failed to add service URLs to Apprise: "
190 f"{mask_sensitive_url(service_urls)}"
191 )
192 return False
194 # Send notification with the temp instance (with retry)
195 return self._send_with_retry(
196 title, body, temp_apprise, tag, attach
197 )
198 # Use the configured apprise instance
199 if len(self.apprise) == 0: 199 ↛ 204line 199 didn't jump to line 204 because the condition on line 199 was always true
200 logger.debug("No notification services configured in Apprise")
201 return False
203 # Send notification (with retry)
204 return self._send_with_retry(title, body, self.apprise, tag, attach)
206 except Exception as e:
207 # Tenacity will retry, but if all retries fail, raise SendError
208 logger.exception(
209 f"Failed to send notification after retries: '{title[:50]}...'"
210 )
211 raise SendError(f"Failed to send notification: {str(e)}")
213 def send_event(
214 self,
215 event_type: EventType,
216 context: Dict[str, Any],
217 service_urls: Optional[str] = None,
218 tag: Optional[str] = None,
219 custom_template: Optional[Dict[str, str]] = None,
220 ) -> bool:
221 """
222 Send a notification for a specific event type.
224 Args:
225 event_type: Type of event
226 context: Context data for template formatting
227 service_urls: Comma-separated list of service URLs
228 tag: Optional tag to target specific services
229 custom_template: Optional custom template override
231 Returns:
232 True if notification was sent successfully
233 """
234 logger.debug(f"send_event: event_type={event_type.value}, tag={tag}")
235 logger.debug(f"Context: {context}")
237 # Format notification using template
238 message = NotificationTemplate.format(
239 event_type, context, custom_template
240 )
241 logger.debug(
242 f"Template formatted - title: '{message['title'][:50]}...'"
243 )
245 # Send notification
246 return self.send(
247 title=message["title"],
248 body=message["body"],
249 service_urls=service_urls,
250 tag=tag,
251 )
253 def test_service(self, url: str) -> Dict[str, Any]:
254 """
255 Test a notification service.
257 Args:
258 url: Apprise-compatible service URL
260 Returns:
261 Dict with 'success' boolean and optional 'error' message
262 """
263 # Server-level master switch (env-only). Mirrors
264 # NotificationManager's gate so the "Send Test Notification"
265 # button cannot bypass it. WARNING level so the operator sees
266 # the actionable signal naming the env var.
267 if not self.outbound_allowed:
268 logger.warning(
269 "Notification test refused: outbound notifications are "
270 "disabled at the server level. Set "
271 "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
272 "SECURITY.md 'Notification Webhook SSRF'."
273 )
274 return {
275 "success": False,
276 "error": (
277 "Outbound notifications are disabled. The server "
278 "administrator must set "
279 "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable "
280 "notification webhooks. See SECURITY.md "
281 "'Notification Webhook SSRF' for details."
282 ),
283 }
285 try:
286 # Validate service URL for security (SSRF prevention)
287 is_valid, error_msg = NotificationURLValidator.validate_service_url(
288 url, allow_private_ips=self.allow_private_ips
289 )
291 if not is_valid:
292 logger.warning(
293 f"Test service URL validation failed: {error_msg}. "
294 f"URL: {mask_sensitive_url(url)}"
295 )
296 return {
297 "success": False,
298 "error": "Invalid notification service URL.",
299 }
301 # Create temporary Apprise instance
302 temp_apprise = apprise.Apprise()
303 add_result = temp_apprise.add(url)
305 if not add_result:
306 return {
307 "success": False,
308 "error": "Failed to add service URL",
309 }
311 # Send test notification
312 result = temp_apprise.notify(
313 title="Test Notification",
314 body=(
315 "This is a test notification from Local Deep Research. "
316 "If you see this, your service is configured correctly!"
317 ),
318 )
320 if result:
321 return {
322 "success": True,
323 "message": "Test notification sent successfully",
324 }
325 return {
326 "success": False,
327 "error": "Failed to send test notification",
328 }
330 except Exception:
331 logger.exception("Error testing notification service")
332 return {
333 "success": False,
334 "error": "Failed to test notification service.",
335 }
337 @staticmethod
338 def _validate_url(url: str) -> None:
339 """
340 Validate a notification service URL.
342 Args:
343 url: URL to validate
345 Raises:
346 ServiceError: If URL is invalid
348 Note:
349 URL scheme validation is handled by Apprise itself, which maintains
350 a comprehensive whitelist of supported notification services.
351 Apprise will reject unsupported schemes like 'file://' or 'javascript://'.
352 See: https://github.com/caronc/apprise/wiki
353 """
354 if not url or not isinstance(url, str):
355 raise ServiceError("URL must be a non-empty string")
357 # Check if it looks like a URL
358 parsed = urlparse(url)
359 if not parsed.scheme:
360 raise ServiceError(
361 "Invalid URL format. Must be an Apprise-compatible "
362 "service URL (e.g., discord://webhook_id/token)"
363 )
365 def get_service_type(self, url: str) -> Optional[str]:
366 """
367 Detect service type from URL.
369 Args:
370 url: Service URL
372 Returns:
373 Service type name or None if unknown
374 """
375 for service_name, pattern in self.SERVICE_PATTERNS.items():
376 if re.match(pattern, url, re.IGNORECASE):
377 return service_name
378 return "unknown"