Coverage for src / local_deep_research / notifications / service.py: 98%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Core notification service using Apprise. 

3""" 

4 

5import re 

6from typing import Dict, List, Optional, Any 

7from urllib.parse import urlparse 

8 

9import apprise 

10from loguru import logger 

11from tenacity import ( 

12 retry, 

13 stop_after_attempt, 

14 wait_exponential, 

15 retry_if_exception_type, 

16) 

17 

18from .exceptions import ServiceError, SendError 

19from .templates import EventType, NotificationTemplate 

20from ..security.url_builder import mask_sensitive_url 

21from ..security.notification_validator import ( 

22 NotificationURLValidator, 

23) 

24 

25# Backward compatibility constants - now handled by Tenacity internally 

26MAX_RETRY_ATTEMPTS = 3 

27INITIAL_RETRY_DELAY = 0.5 

28RETRY_BACKOFF_MULTIPLIER = 2 

29 

30 

31class NotificationService: 

32 """ 

33 Low-level notification service that wraps Apprise. 

34 """ 

35 

36 # Regex patterns for common service types (for validation) 

37 SERVICE_PATTERNS = { 

38 "email": r"^mailto://", 

39 "discord": r"^discord://", 

40 "slack": r"^slack://", 

41 "telegram": r"^tgram://", 

42 "smtp": r"^(smtp|smtps)://", 

43 } 

44 

45 def __init__(self, allow_private_ips: bool = False): 

46 """ 

47 Initialize the notification service. 

48 

49 Args: 

50 allow_private_ips: Whether to allow notifications to private/local IPs 

51 (default: False for security). Set to True for 

52 development/testing environments only. 

53 """ 

54 self.apprise = apprise.Apprise() 

55 self.allow_private_ips = allow_private_ips 

56 

57 @retry( 

58 stop=stop_after_attempt(3), 

59 wait=wait_exponential(multiplier=1, min=0.5, max=10), 

60 retry=retry_if_exception_type((Exception,)), 

61 reraise=True, 

62 ) 

63 def _send_with_retry( 

64 self, 

65 title: str, 

66 body: str, 

67 apprise_instance: apprise.Apprise, 

68 tag: Optional[str] = None, 

69 attach: Optional[List[str]] = None, 

70 ) -> bool: 

71 """ 

72 Send a notification using the provided Apprise instance with retry logic. 

73 

74 This method is decorated with Tenacity to handle retries automatically. 

75 

76 Args: 

77 title: Notification title 

78 body: Notification body text 

79 apprise_instance: Apprise instance to use for sending 

80 tag: Optional tag to target specific services 

81 attach: Optional list of file paths to attach 

82 

83 Returns: 

84 True if notification was sent successfully 

85 

86 Raises: 

87 SendError: If sending fails after all retry attempts 

88 """ 

89 logger.debug( 

90 f"Sending notification: title='{title[:50]}...', tag={tag}" 

91 ) 

92 logger.debug(f"Body preview: {body[:200]}...") 

93 

94 # Send notification 

95 notify_result = apprise_instance.notify( 

96 title=title, 

97 body=body, 

98 tag=tag, 

99 attach=attach, 

100 ) 

101 

102 if notify_result: 

103 logger.debug(f"Notification sent successfully: '{title[:50]}...'") 

104 return True 

105 error_msg = "Failed to send notification to any service" 

106 logger.warning(error_msg) 

107 raise SendError(error_msg) 

108 

109 def send( 

110 self, 

111 title: str, 

112 body: str, 

113 service_urls: Optional[str] = None, 

114 tag: Optional[str] = None, 

115 attach: Optional[List[str]] = None, 

116 ) -> bool: 

117 """ 

118 Send a notification to service URLs with automatic retry. 

119 

120 Args: 

121 title: Notification title 

122 body: Notification body text 

123 service_urls: Comma-separated list of service URLs to override configured ones 

124 tag: Optional tag to target specific services 

125 attach: Optional list of file paths to attach 

126 

127 Returns: 

128 True if notification was sent successfully to at least one service 

129 

130 Raises: 

131 SendError: If sending fails after all retry attempts 

132 

133 Note: 

134 Temporary Apprise instances are created for each send operation 

135 and are automatically garbage collected by Python when they go 

136 out of scope. This simple approach is ideal for small deployments 

137 (~5 users) and avoids memory management complexity. 

138 """ 

139 # If service_urls are provided, validate before trying to send 

140 if service_urls: 

141 logger.debug("Creating Apprise instance for provided service URLs") 

142 

143 # Validate service URLs for security (SSRF prevention) 

144 is_valid, error_msg = ( 

145 NotificationURLValidator.validate_multiple_urls( 

146 service_urls, allow_private_ips=self.allow_private_ips 

147 ) 

148 ) 

149 

150 if not is_valid: 

151 logger.error( 

152 f"Service URL validation failed: {error_msg}. " 

153 f"URL: {mask_sensitive_url(service_urls)}" 

154 ) 

155 raise ServiceError(f"Invalid service URL: {error_msg}") 

156 

157 try: 

158 # If service_urls are provided, create a new Apprise instance 

159 if service_urls: 

160 temp_apprise = apprise.Apprise() 

161 result = temp_apprise.add(service_urls, tag=tag) 

162 

163 if not result: 

164 logger.error( 

165 f"Failed to add service URLs to Apprise: " 

166 f"{mask_sensitive_url(service_urls)}" 

167 ) 

168 return False 

169 

170 # Send notification with the temp instance (with retry) 

171 return self._send_with_retry( 

172 title, body, temp_apprise, tag, attach 

173 ) 

174 # Use the configured apprise instance 

175 if len(self.apprise) == 0: 175 ↛ 180line 175 didn't jump to line 180 because the condition on line 175 was always true

176 logger.debug("No notification services configured in Apprise") 

177 return False 

178 

179 # Send notification (with retry) 

180 return self._send_with_retry(title, body, self.apprise, tag, attach) 

181 

182 except Exception as e: 

183 # Tenacity will retry, but if all retries fail, raise SendError 

184 logger.exception( 

185 f"Failed to send notification after retries: '{title[:50]}...'" 

186 ) 

187 raise SendError(f"Failed to send notification: {str(e)}") 

188 

189 def send_event( 

190 self, 

191 event_type: EventType, 

192 context: Dict[str, Any], 

193 service_urls: Optional[str] = None, 

194 tag: Optional[str] = None, 

195 custom_template: Optional[Dict[str, str]] = None, 

196 ) -> bool: 

197 """ 

198 Send a notification for a specific event type. 

199 

200 Args: 

201 event_type: Type of event 

202 context: Context data for template formatting 

203 service_urls: Comma-separated list of service URLs 

204 tag: Optional tag to target specific services 

205 custom_template: Optional custom template override 

206 

207 Returns: 

208 True if notification was sent successfully 

209 """ 

210 logger.debug(f"send_event: event_type={event_type.value}, tag={tag}") 

211 logger.debug(f"Context: {context}") 

212 

213 # Format notification using template 

214 message = NotificationTemplate.format( 

215 event_type, context, custom_template 

216 ) 

217 logger.debug( 

218 f"Template formatted - title: '{message['title'][:50]}...'" 

219 ) 

220 

221 # Send notification 

222 return self.send( 

223 title=message["title"], 

224 body=message["body"], 

225 service_urls=service_urls, 

226 tag=tag, 

227 ) 

228 

229 def test_service(self, url: str) -> Dict[str, Any]: 

230 """ 

231 Test a notification service. 

232 

233 Args: 

234 url: Apprise-compatible service URL 

235 

236 Returns: 

237 Dict with 'success' boolean and optional 'error' message 

238 """ 

239 try: 

240 # Validate service URL for security (SSRF prevention) 

241 is_valid, error_msg = NotificationURLValidator.validate_service_url( 

242 url, allow_private_ips=self.allow_private_ips 

243 ) 

244 

245 if not is_valid: 

246 logger.warning( 

247 f"Test service URL validation failed: {error_msg}. " 

248 f"URL: {mask_sensitive_url(url)}" 

249 ) 

250 return { 

251 "success": False, 

252 "error": "Invalid notification service URL.", 

253 } 

254 

255 # Create temporary Apprise instance 

256 temp_apprise = apprise.Apprise() 

257 add_result = temp_apprise.add(url) 

258 

259 if not add_result: 

260 return { 

261 "success": False, 

262 "error": "Failed to add service URL", 

263 } 

264 

265 # Send test notification 

266 result = temp_apprise.notify( 

267 title="Test Notification", 

268 body=( 

269 "This is a test notification from Local Deep Research. " 

270 "If you see this, your service is configured correctly!" 

271 ), 

272 ) 

273 

274 if result: 

275 return { 

276 "success": True, 

277 "message": "Test notification sent successfully", 

278 } 

279 return { 

280 "success": False, 

281 "error": "Failed to send test notification", 

282 } 

283 

284 except Exception: 

285 logger.exception("Error testing notification service") 

286 return { 

287 "success": False, 

288 "error": "Failed to test notification service.", 

289 } 

290 

291 @staticmethod 

292 def _validate_url(url: str) -> None: 

293 """ 

294 Validate a notification service URL. 

295 

296 Args: 

297 url: URL to validate 

298 

299 Raises: 

300 ServiceError: If URL is invalid 

301 

302 Note: 

303 URL scheme validation is handled by Apprise itself, which maintains 

304 a comprehensive whitelist of supported notification services. 

305 Apprise will reject unsupported schemes like 'file://' or 'javascript://'. 

306 See: https://github.com/caronc/apprise/wiki 

307 """ 

308 if not url or not isinstance(url, str): 

309 raise ServiceError("URL must be a non-empty string") 

310 

311 # Check if it looks like a URL 

312 parsed = urlparse(url) 

313 if not parsed.scheme: 

314 raise ServiceError( 

315 "Invalid URL format. Must be an Apprise-compatible " 

316 "service URL (e.g., discord://webhook_id/token)" 

317 ) 

318 

319 def get_service_type(self, url: str) -> Optional[str]: 

320 """ 

321 Detect service type from URL. 

322 

323 Args: 

324 url: Service URL 

325 

326 Returns: 

327 Service type name or None if unknown 

328 """ 

329 for service_name, pattern in self.SERVICE_PATTERNS.items(): 

330 if re.match(pattern, url, re.IGNORECASE): 

331 return service_name 

332 return "unknown"