Coverage for src / local_deep_research / notifications / service.py: 98%
86 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Core notification service using Apprise.
3"""
5import re
6from typing import Dict, List, Optional, Any
7from urllib.parse import urlparse
9import apprise
10from loguru import logger
11from tenacity import (
12 retry,
13 stop_after_attempt,
14 wait_exponential,
15 retry_if_exception_type,
16)
18from .exceptions import ServiceError, SendError
19from .templates import EventType, NotificationTemplate
20from ..security.url_builder import mask_sensitive_url
21from ..security.notification_validator import (
22 NotificationURLValidator,
23)
# Backward compatibility constants - now handled by Tenacity internally.
# They are not read by any code visible in this file; the retry policy in
# effect is the one passed to the Tenacity ``@retry`` decorator on
# ``NotificationService._send_with_retry``.
MAX_RETRY_ATTEMPTS = 3
INITIAL_RETRY_DELAY = 0.5
RETRY_BACKOFF_MULTIPLIER = 2
class NotificationService:
    """
    Low-level notification service that wraps Apprise.

    Holds one long-lived ``apprise.Apprise`` instance (``self.apprise``) for
    pre-configured services and builds short-lived instances whenever a
    caller supplies explicit service URLs. Explicit URLs are checked with
    ``NotificationURLValidator`` before they are handed to Apprise.
    """

    # Regex patterns for common service types (for validation)
    SERVICE_PATTERNS = {
        "email": r"^mailto://",
        "discord": r"^discord://",
        "slack": r"^slack://",
        "telegram": r"^tgram://",
        "smtp": r"^(smtp|smtps)://",
    }

    def __init__(self, allow_private_ips: bool = False):
        """
        Initialize the notification service.

        Args:
            allow_private_ips: Whether to allow notifications to private/local IPs
                (default: False for security). Set to True for
                development/testing environments only.
        """
        self.apprise = apprise.Apprise()
        self.allow_private_ips = allow_private_ips

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.5, max=10),
        retry=retry_if_exception_type((Exception,)),
        reraise=True,
    )
    def _send_with_retry(
        self,
        title: str,
        body: str,
        apprise_instance: apprise.Apprise,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification using the provided Apprise instance with retry logic.

        This method is decorated with Tenacity to handle retries automatically:
        up to three attempts, retrying on any ``Exception``, with exponential
        backoff between attempts. Because ``reraise=True`` is set, the
        exception from the final attempt is propagated as-is rather than
        being wrapped by Tenacity.

        Args:
            title: Notification title
            body: Notification body text
            apprise_instance: Apprise instance to use for sending
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully

        Raises:
            SendError: If Apprise reports failure on the final attempt
            Exception: Any other error raised by the final attempt
        """
        logger.debug(
            f"Sending notification: title='{title[:50]}...', tag={tag}"
        )
        logger.debug(f"Body preview: {body[:200]}...")

        # Send notification
        notify_result = apprise_instance.notify(
            title=title,
            body=body,
            tag=tag,
            attach=attach,
        )

        if notify_result:
            logger.debug(f"Notification sent successfully: '{title[:50]}...'")
            return True

        # A falsy result is turned into an exception so that Tenacity
        # treats it as a failed attempt and retries.
        error_msg = "Failed to send notification to any service"
        logger.warning(error_msg)
        raise SendError(error_msg)

    def send(
        self,
        title: str,
        body: str,
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification to service URLs with automatic retry.

        Args:
            title: Notification title
            body: Notification body text
            service_urls: Comma-separated list of service URLs to override configured ones
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully to at least one service.
            False if the provided URLs could not be added to Apprise, or if no
            URLs were provided and no services are configured.

        Raises:
            ServiceError: If the provided service URLs fail security validation
            SendError: If sending fails after all retry attempts

        Note:
            Temporary Apprise instances are created for each send operation
            and are automatically garbage collected by Python when they go
            out of scope. This simple approach is ideal for small deployments
            (~5 users) and avoids memory management complexity.
        """
        # If service_urls are provided, validate before trying to send.
        # This happens outside the try block on purpose: a validation
        # failure must surface as ServiceError, not be rewrapped as SendError.
        if service_urls:
            logger.debug("Creating Apprise instance for provided service URLs")

            # Validate service URLs for security (SSRF prevention)
            is_valid, error_msg = (
                NotificationURLValidator.validate_multiple_urls(
                    service_urls, allow_private_ips=self.allow_private_ips
                )
            )

            if not is_valid:
                logger.error(
                    f"Service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(service_urls)}"
                )
                raise ServiceError(f"Invalid service URL: {error_msg}")

        try:
            # If service_urls are provided, create a new Apprise instance
            if service_urls:
                temp_apprise = apprise.Apprise()
                result = temp_apprise.add(service_urls, tag=tag)

                if not result:
                    logger.error(
                        f"Failed to add service URLs to Apprise: "
                        f"{mask_sensitive_url(service_urls)}"
                    )
                    return False

                # Send notification with the temp instance (with retry)
                return self._send_with_retry(
                    title, body, temp_apprise, tag, attach
                )

            # Use the configured apprise instance
            if len(self.apprise) == 0:
                logger.debug("No notification services configured in Apprise")
                return False

            # Send notification (with retry)
            return self._send_with_retry(title, body, self.apprise, tag, attach)

        except Exception as e:
            # Tenacity will retry, but if all retries fail, raise SendError.
            # Chain explicitly so the underlying failure is kept as __cause__.
            logger.exception(
                f"Failed to send notification after retries: '{title[:50]}...'"
            )
            raise SendError(f"Failed to send notification: {e!s}") from e

    def send_event(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        custom_template: Optional[Dict[str, str]] = None,
    ) -> bool:
        """
        Send a notification for a specific event type.

        The title and body are produced by ``NotificationTemplate.format`` and
        then passed to :meth:`send`, so the return value and exceptions are
        those of :meth:`send`.

        Args:
            event_type: Type of event
            context: Context data for template formatting
            service_urls: Comma-separated list of service URLs
            tag: Optional tag to target specific services
            custom_template: Optional custom template override

        Returns:
            True if notification was sent successfully
        """
        logger.debug(f"send_event: event_type={event_type.value}, tag={tag}")
        logger.debug(f"Context: {context}")

        # Format notification using template
        message = NotificationTemplate.format(
            event_type, context, custom_template
        )
        logger.debug(
            f"Template formatted - title: '{message['title'][:50]}...'"
        )

        # Send notification
        return self.send(
            title=message["title"],
            body=message["body"],
            service_urls=service_urls,
            tag=tag,
        )

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service.

        Never raises: every failure, including unexpected exceptions, is
        reported through the returned dict. Unlike :meth:`send`, the test
        notification is sent once, without retry.

        Args:
            url: Apprise-compatible service URL

        Returns:
            Dict with 'success' boolean plus a 'message' on success or an
            'error' message on failure
        """
        try:
            # Validate service URL for security (SSRF prevention)
            is_valid, error_msg = NotificationURLValidator.validate_service_url(
                url, allow_private_ips=self.allow_private_ips
            )

            if not is_valid:
                logger.warning(
                    f"Test service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(url)}"
                )
                # The detailed reason is logged only; the caller gets a
                # generic message.
                return {
                    "success": False,
                    "error": "Invalid notification service URL.",
                }

            # Create temporary Apprise instance
            temp_apprise = apprise.Apprise()
            add_result = temp_apprise.add(url)

            if not add_result:
                return {
                    "success": False,
                    "error": "Failed to add service URL",
                }

            # Send test notification
            result = temp_apprise.notify(
                title="Test Notification",
                body=(
                    "This is a test notification from Local Deep Research. "
                    "If you see this, your service is configured correctly!"
                ),
            )

            if result:
                return {
                    "success": True,
                    "message": "Test notification sent successfully",
                }
            return {
                "success": False,
                "error": "Failed to send test notification",
            }

        except Exception:
            logger.exception("Error testing notification service")
            return {
                "success": False,
                "error": "Failed to test notification service.",
            }

    @staticmethod
    def _validate_url(url: str) -> None:
        """
        Validate a notification service URL.

        Only checks that the value is a non-empty string with a URL scheme.

        Args:
            url: URL to validate

        Raises:
            ServiceError: If URL is invalid

        Note:
            URL scheme validation is handled by Apprise itself, which maintains
            a comprehensive whitelist of supported notification services.
            Apprise will reject unsupported schemes like 'file://' or 'javascript://'.
            See: https://github.com/caronc/apprise/wiki
        """
        if not url or not isinstance(url, str):
            raise ServiceError("URL must be a non-empty string")

        # Check if it looks like a URL
        parsed = urlparse(url)
        if not parsed.scheme:
            raise ServiceError(
                "Invalid URL format. Must be an Apprise-compatible "
                "service URL (e.g., discord://webhook_id/token)"
            )

    def get_service_type(self, url: str) -> Optional[str]:
        """
        Detect service type from URL.

        The URL is matched case-insensitively against ``SERVICE_PATTERNS``
        and the name of the first matching pattern is returned.

        Args:
            url: Service URL

        Returns:
            Service type name, or the string "unknown" if no pattern matches.
            This implementation never returns None.
        """
        for service_name, pattern in self.SERVICE_PATTERNS.items():
            if re.match(pattern, url, re.IGNORECASE):
                return service_name
        return "unknown"