# Coverage report for src/local_deep_research/notifications/service.py: 78% (86 statements)
# Generated by coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
"""
Core notification service using Apprise.
"""
5import re
6from typing import Dict, List, Optional, Any
7from urllib.parse import urlparse
9import apprise
10from loguru import logger
11from tenacity import (
12 retry,
13 stop_after_attempt,
14 wait_exponential,
15 retry_if_exception_type,
16)
18from .exceptions import ServiceError, SendError
19from .templates import EventType, NotificationTemplate
20from ..security.url_builder import mask_sensitive_url
21from ..security.notification_validator import (
22 NotificationURLValidator,
23)
# Backward compatibility constants - now handled by Tenacity internally.
# NOTE: the retry decorator on NotificationService._send_with_retry uses its
# own literal values; these names are not read by the code in this module and
# are kept only so that existing imports of them keep working.
MAX_RETRY_ATTEMPTS = 3  # total number of send attempts
INITIAL_RETRY_DELAY = 0.5  # seconds
RETRY_BACKOFF_MULTIPLIER = 2  # exponential growth factor between attempts
class NotificationService:
    """
    Low-level notification service that wraps Apprise.

    Notifications are sent either through a temporary Apprise instance built
    from caller-supplied service URLs (validated first), or through the
    instance-level ``self.apprise`` object when no URLs are supplied.
    """

    # Regex patterns for common service types (for validation).
    # Matched case-insensitively against the start of a service URL by
    # ``get_service_type``.
    SERVICE_PATTERNS = {
        "email": r"^mailto://",
        "discord": r"^discord://",
        "slack": r"^slack://",
        "telegram": r"^tgram://",
        "smtp": r"^(smtp|smtps)://",
    }

    def __init__(self, allow_private_ips: bool = False):
        """
        Initialize the notification service.

        Args:
            allow_private_ips: Whether to allow notifications to private/local IPs
                (default: False for security). Set to True for
                development/testing environments only.
        """
        self.apprise = apprise.Apprise()
        self.allow_private_ips = allow_private_ips

    # Up to 3 attempts, exponential wait clamped to the 0.5s-10s range.
    # Any exception triggers a retry; ``reraise=True`` makes the last
    # exception propagate unchanged instead of a tenacity RetryError.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.5, max=10),
        retry=retry_if_exception_type((Exception,)),
        reraise=True,
    )
    def _send_with_retry(
        self,
        title: str,
        body: str,
        apprise_instance: apprise.Apprise,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification using the provided Apprise instance with retry logic.

        This method is decorated with Tenacity to handle retries automatically.

        Args:
            title: Notification title
            body: Notification body text
            apprise_instance: Apprise instance to use for sending
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully

        Raises:
            SendError: If Apprise reports failure on the final attempt.
            Exception: Any other exception raised by the final attempt is
                re-raised as-is.
        """
        logger.debug(
            f"Sending notification: title='{title[:50]}...', tag={tag}"
        )
        logger.debug(f"Body preview: {body[:200]}...")

        # Send notification
        notify_result = apprise_instance.notify(
            title=title,
            body=body,
            tag=tag,
            attach=attach,
        )

        if notify_result:
            logger.debug(f"Notification sent successfully: '{title[:50]}...'")
            return True
        else:
            # Raising (rather than returning False) is what triggers a retry.
            error_msg = "Failed to send notification to any service"
            logger.warning(error_msg)
            raise SendError(error_msg)

    def send(
        self,
        title: str,
        body: str,
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification to service URLs with automatic retry.

        Args:
            title: Notification title
            body: Notification body text
            service_urls: Comma-separated list of service URLs to override configured ones
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully to at least one service.
            False if the URLs could not be added to Apprise, or if no URLs
            were given and no services are configured.

        Raises:
            SendError: If sending fails after all retry attempts, or if URL
                validation fails (the underlying ServiceError is wrapped and
                available as ``__cause__``).

        Note:
            Temporary Apprise instances are created for each send operation
            and are automatically garbage collected by Python when they go
            out of scope. This simple approach is ideal for small deployments
            (~5 users) and avoids memory management complexity.
        """
        try:
            # If service_urls are provided, create a new Apprise instance
            if service_urls:
                logger.debug(
                    "Creating Apprise instance for provided service URLs"
                )

                # Validate service URLs for security (SSRF prevention)
                is_valid, error_msg = (
                    NotificationURLValidator.validate_multiple_urls(
                        service_urls, allow_private_ips=self.allow_private_ips
                    )
                )

                if not is_valid:
                    logger.error(
                        f"Service URL validation failed: {error_msg}. "
                        f"URL: {mask_sensitive_url(service_urls)}"
                    )
                    raise ServiceError(f"Invalid service URL: {error_msg}")

                temp_apprise = apprise.Apprise()
                result = temp_apprise.add(service_urls, tag=tag)

                if not result:
                    logger.error(
                        f"Failed to add service URLs to Apprise: "
                        f"{mask_sensitive_url(service_urls)}"
                    )
                    return False

                # Send notification with the temp instance (with retry)
                return self._send_with_retry(
                    title, body, temp_apprise, tag, attach
                )
            else:
                # Use the configured apprise instance
                if len(self.apprise) == 0:
                    logger.debug(
                        "No notification services configured in Apprise"
                    )
                    return False

                # Send notification (with retry)
                return self._send_with_retry(
                    title, body, self.apprise, tag, attach
                )

        except Exception as e:
            # Tenacity will retry, but if all retries fail, raise SendError.
            # This handler also catches the ServiceError raised above for
            # invalid URLs, so callers only ever see SendError.
            logger.exception(
                f"Failed to send notification after retries: '{title[:50]}...'"
            )
            # Chain explicitly so the root cause is preserved as __cause__.
            raise SendError(f"Failed to send notification: {str(e)}") from e

    def send_event(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        custom_template: Optional[Dict[str, str]] = None,
    ) -> bool:
        """
        Send a notification for a specific event type.

        Args:
            event_type: Type of event
            context: Context data for template formatting
            service_urls: Comma-separated list of service URLs
            tag: Optional tag to target specific services
            custom_template: Optional custom template override

        Returns:
            True if notification was sent successfully

        Raises:
            SendError: Propagated from ``send``.
        """
        logger.debug(f"send_event: event_type={event_type.value}, tag={tag}")
        logger.debug(f"Context: {context}")

        # Format notification using template
        message = NotificationTemplate.format(
            event_type, context, custom_template
        )
        logger.debug(
            f"Template formatted - title: '{message['title'][:50]}...'"
        )

        # Send notification
        result = self.send(
            title=message["title"],
            body=message["body"],
            service_urls=service_urls,
            tag=tag,
        )
        return result

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service.

        Unlike ``send``, this method never raises: every failure is reported
        through the returned dict, and no retry is performed.

        Args:
            url: Apprise-compatible service URL

        Returns:
            Dict with a 'success' boolean, plus 'message' on success or
            'error' on failure
        """
        try:
            # Validate service URL for security (SSRF prevention)
            is_valid, error_msg = NotificationURLValidator.validate_service_url(
                url, allow_private_ips=self.allow_private_ips
            )

            if not is_valid:
                logger.warning(
                    f"Test service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(url)}"
                )
                # The detailed reason is logged only; the caller gets a
                # generic message.
                return {
                    "success": False,
                    "error": "Invalid notification service URL.",
                }

            # Create temporary Apprise instance
            temp_apprise = apprise.Apprise()
            add_result = temp_apprise.add(url)

            if not add_result:
                return {
                    "success": False,
                    "error": "Failed to add service URL",
                }

            # Send test notification
            result = temp_apprise.notify(
                title="Test Notification",
                body=(
                    "This is a test notification from Local Deep Research. "
                    "If you see this, your service is configured correctly!"
                ),
            )

            if result:
                return {
                    "success": True,
                    "message": "Test notification sent successfully",
                }
            else:
                return {
                    "success": False,
                    "error": "Failed to send test notification",
                }

        except Exception:
            logger.exception("Error testing notification service")
            return {
                "success": False,
                "error": "Failed to test notification service.",
            }

    @staticmethod
    def _validate_url(url: str) -> None:
        """
        Validate a notification service URL.

        Only checks that ``url`` is a non-empty string with a URL scheme.

        Args:
            url: URL to validate

        Raises:
            ServiceError: If URL is invalid

        Note:
            URL scheme validation is handled by Apprise itself, which maintains
            a comprehensive whitelist of supported notification services.
            Apprise will reject unsupported schemes like 'file://' or 'javascript://'.
            See: https://github.com/caronc/apprise/wiki
        """
        if not url or not isinstance(url, str):
            raise ServiceError("URL must be a non-empty string")

        # Check if it looks like a URL
        parsed = urlparse(url)
        if not parsed.scheme:
            raise ServiceError(
                "Invalid URL format. Must be an Apprise-compatible "
                "service URL (e.g., discord://webhook_id/token)"
            )

    def get_service_type(self, url: str) -> str:
        """
        Detect service type from URL.

        Args:
            url: Service URL

        Returns:
            The first key of ``SERVICE_PATTERNS`` whose pattern matches the
            start of ``url`` (case-insensitive), or the string ``"unknown"``
            when none match. This method never returns None.
        """
        for service_name, pattern in self.SERVICE_PATTERNS.items():
            if re.match(pattern, url, re.IGNORECASE):
                return service_name
        return "unknown"