Coverage for src/local_deep_research/notifications/service.py: 78%

86 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Core notification service using Apprise. 

3""" 

4 

5import re 

6from typing import Dict, List, Optional, Any 

7from urllib.parse import urlparse 

8 

9import apprise 

10from loguru import logger 

11from tenacity import ( 

12 retry, 

13 stop_after_attempt, 

14 wait_exponential, 

15 retry_if_exception_type, 

16) 

17 

18from .exceptions import ServiceError, SendError 

19from .templates import EventType, NotificationTemplate 

20from ..security.url_builder import mask_sensitive_url 

21from ..security.notification_validator import ( 

22 NotificationURLValidator, 

23) 

24 

# Backward compatibility constants - now handled by Tenacity internally.
# NOTE: the retry decorator on NotificationService._send_with_retry
# hard-codes its own attempt count and wait bounds; it does not read these
# names, so editing them here does not change retry behavior.
MAX_RETRY_ATTEMPTS = 3
INITIAL_RETRY_DELAY = 0.5
RETRY_BACKOFF_MULTIPLIER = 2

29 

30 

class NotificationService:
    """
    Low-level notification service that wraps Apprise.

    Holds one long-lived ``apprise.Apprise`` instance (``self.apprise``) and
    creates throw-away instances whenever explicit service URLs are passed
    to :meth:`send` or :meth:`test_service`.
    """

    # Regex patterns used by get_service_type() to classify a service URL
    # by its scheme. Matching is case-insensitive.
    SERVICE_PATTERNS = {
        "email": r"^mailto://",
        "discord": r"^discord://",
        "slack": r"^slack://",
        "telegram": r"^tgram://",
        "smtp": r"^(smtp|smtps)://",
    }

    def __init__(self, allow_private_ips: bool = False):
        """
        Initialize the notification service.

        Args:
            allow_private_ips: Whether to allow notifications to private/local IPs
                (default: False for security). Set to True for
                development/testing environments only.
        """
        self.apprise = apprise.Apprise()
        self.allow_private_ips = allow_private_ips

    # Up to 3 attempts, exponential wait bounded to [0.5, 10] seconds, retry
    # on any Exception. reraise=True makes Tenacity re-raise the last
    # underlying exception instead of wrapping it in RetryError.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.5, max=10),
        retry=retry_if_exception_type((Exception,)),
        reraise=True,
    )
    def _send_with_retry(
        self,
        title: str,
        body: str,
        apprise_instance: apprise.Apprise,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification using the provided Apprise instance with retry logic.

        This method is decorated with Tenacity to handle retries automatically.

        Args:
            title: Notification title
            body: Notification body text
            apprise_instance: Apprise instance to use for sending
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully

        Raises:
            SendError: If Apprise reports failure on the final attempt
            Exception: Any exception raised by ``apprise_instance.notify`` on
                the final attempt is re-raised unchanged (``reraise=True``)
        """
        # Only truncated previews are logged to keep log lines bounded.
        logger.debug(
            f"Sending notification: title='{title[:50]}...', tag={tag}"
        )
        logger.debug(f"Body preview: {body[:200]}...")

        # Send notification
        notify_result = apprise_instance.notify(
            title=title,
            body=body,
            tag=tag,
            attach=attach,
        )

        if notify_result:
            logger.debug(f"Notification sent successfully: '{title[:50]}...'")
            return True

        # A falsy result is turned into an exception so that Tenacity
        # treats it as a failed attempt and retries.
        error_msg = "Failed to send notification to any service"
        logger.warning(error_msg)
        raise SendError(error_msg)

    def send(
        self,
        title: str,
        body: str,
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification to service URLs with automatic retry.

        Args:
            title: Notification title
            body: Notification body text
            service_urls: Comma-separated list of service URLs to override configured ones
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if the notification was sent successfully. False if the
            provided ``service_urls`` could not be added to Apprise, or if no
            ``service_urls`` were given and no services are configured on
            ``self.apprise``.

        Raises:
            SendError: If sending fails after all retry attempts, or if URL
                validation fails (the internal ServiceError is caught here
                and re-raised as SendError). The original exception is
                attached as ``__cause__``.

        Note:
            Temporary Apprise instances are created for each send operation
            and are automatically garbage collected by Python when they go
            out of scope. This simple approach is ideal for small deployments
            (~5 users) and avoids memory management complexity.
        """
        try:
            # If service_urls are provided, create a new Apprise instance
            if service_urls:
                logger.debug(
                    "Creating Apprise instance for provided service URLs"
                )

                # Validate service URLs for security (SSRF prevention)
                is_valid, error_msg = (
                    NotificationURLValidator.validate_multiple_urls(
                        service_urls, allow_private_ips=self.allow_private_ips
                    )
                )

                if not is_valid:
                    # URLs may embed credentials/tokens, so mask before logging.
                    logger.error(
                        f"Service URL validation failed: {error_msg}. "
                        f"URL: {mask_sensitive_url(service_urls)}"
                    )
                    raise ServiceError(f"Invalid service URL: {error_msg}")

                temp_apprise = apprise.Apprise()
                result = temp_apprise.add(service_urls, tag=tag)

                if not result:
                    logger.error(
                        "Failed to add service URLs to Apprise: "
                        f"{mask_sensitive_url(service_urls)}"
                    )
                    return False

                # Send notification with the temp instance (with retry)
                return self._send_with_retry(
                    title, body, temp_apprise, tag, attach
                )

            # Use the configured apprise instance
            if len(self.apprise) == 0:
                logger.debug(
                    "No notification services configured in Apprise"
                )
                return False

            # Send notification (with retry)
            return self._send_with_retry(
                title, body, self.apprise, tag, attach
            )

        except Exception as e:
            # Retries (if any) are already exhausted by the time we get here.
            # Everything is normalized to SendError; chaining with "from e"
            # keeps the original exception available as __cause__.
            logger.exception(
                f"Failed to send notification after retries: '{title[:50]}...'"
            )
            raise SendError(f"Failed to send notification: {str(e)}") from e

    def send_event(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        custom_template: Optional[Dict[str, str]] = None,
    ) -> bool:
        """
        Send a notification for a specific event type.

        The title and body are produced by ``NotificationTemplate.format``
        and then handed to :meth:`send`.

        Args:
            event_type: Type of event
            context: Context data for template formatting
            service_urls: Comma-separated list of service URLs
            tag: Optional tag to target specific services
            custom_template: Optional custom template override

        Returns:
            True if notification was sent successfully (the return value of
            :meth:`send`)

        Raises:
            SendError: Propagated from :meth:`send`
        """
        logger.debug(f"send_event: event_type={event_type.value}, tag={tag}")
        logger.debug(f"Context: {context}")

        # Format notification using template
        message = NotificationTemplate.format(
            event_type, context, custom_template
        )
        logger.debug(
            f"Template formatted - title: '{message['title'][:50]}...'"
        )

        # Send notification
        result = self.send(
            title=message["title"],
            body=message["body"],
            service_urls=service_urls,
            tag=tag,
        )
        return result

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service by sending a fixed test message.

        Unlike :meth:`send`, this method never raises and does not retry:
        every failure is reported through the returned dict.

        Args:
            url: Apprise-compatible service URL

        Returns:
            Dict with a 'success' boolean, plus 'message' on success or a
            generic 'error' string on failure. Detailed failure reasons are
            only written to the log, not returned.
        """
        try:
            # Validate service URL for security (SSRF prevention)
            is_valid, error_msg = NotificationURLValidator.validate_service_url(
                url, allow_private_ips=self.allow_private_ips
            )

            if not is_valid:
                logger.warning(
                    f"Test service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(url)}"
                )
                return {
                    "success": False,
                    "error": "Invalid notification service URL.",
                }

            # Create temporary Apprise instance
            temp_apprise = apprise.Apprise()
            add_result = temp_apprise.add(url)

            if not add_result:
                return {
                    "success": False,
                    "error": "Failed to add service URL",
                }

            # Send test notification
            result = temp_apprise.notify(
                title="Test Notification",
                body=(
                    "This is a test notification from Local Deep Research. "
                    "If you see this, your service is configured correctly!"
                ),
            )

            if result:
                return {
                    "success": True,
                    "message": "Test notification sent successfully",
                }

            return {
                "success": False,
                "error": "Failed to send test notification",
            }

        except Exception:
            # Best-effort probe: log the traceback, return a generic error.
            logger.exception("Error testing notification service")
            return {
                "success": False,
                "error": "Failed to test notification service.",
            }

    @staticmethod
    def _validate_url(url: str) -> None:
        """
        Validate a notification service URL.

        Only checks that ``url`` is a non-empty string and that
        ``urlparse`` finds a scheme in it.

        Args:
            url: URL to validate

        Raises:
            ServiceError: If URL is invalid

        Note:
            URL scheme validation is handled by Apprise itself, which maintains
            a comprehensive whitelist of supported notification services.
            Apprise will reject unsupported schemes like 'file://' or 'javascript://'.
            See: https://github.com/caronc/apprise/wiki
        """
        if not url or not isinstance(url, str):
            raise ServiceError("URL must be a non-empty string")

        # Check if it looks like a URL
        parsed = urlparse(url)
        if not parsed.scheme:
            raise ServiceError(
                "Invalid URL format. Must be an Apprise-compatible "
                "service URL (e.g., discord://webhook_id/token)"
            )

    def get_service_type(self, url: str) -> str:
        """
        Detect service type from URL.

        Args:
            url: Service URL

        Returns:
            The first key of ``SERVICE_PATTERNS`` whose pattern matches the
            start of ``url`` (case-insensitive), or the literal string
            ``"unknown"`` when nothing matches. This method never returns
            None.
        """
        for service_name, pattern in self.SERVICE_PATTERNS.items():
            if re.match(pattern, url, re.IGNORECASE):
                return service_name
        return "unknown"

340 return "unknown"