Coverage for src/local_deep_research/notifications/service.py: 98%

93 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Core notification service using Apprise. 

3""" 

4 

5import re 

6from typing import Dict, List, Optional, Any 

7from urllib.parse import urlparse 

8 

9import apprise 

10from loguru import logger 

11from tenacity import ( 

12 retry, 

13 stop_after_attempt, 

14 wait_exponential, 

15 retry_if_exception_type, 

16) 

17 

18from .exceptions import ServiceError, SendError 

19from .templates import EventType, NotificationTemplate 

20from ..security.url_builder import mask_sensitive_url 

21from ..security.notification_validator import ( 

22 NotificationURLValidator, 

23) 

24 

# Backward compatibility constants - retrying is now handled by Tenacity.
# NOTE: the @retry decorator on NotificationService._send_with_retry
# hard-codes its own stop/wait settings and does not read these names, so
# changing a value here does not change the retry behaviour of this module.
MAX_RETRY_ATTEMPTS = 3  # legacy: maximum number of send attempts
INITIAL_RETRY_DELAY = 0.5  # legacy: first back-off delay (presumably seconds)
RETRY_BACKOFF_MULTIPLIER = 2  # legacy: growth factor between delays

29 

30 

class NotificationService:
    """
    Low-level notification service that wraps Apprise.

    The service validates service URLs through ``NotificationURLValidator``
    before use, honours a server-level ``outbound_allowed`` switch, and
    retries failed sends through Tenacity.
    """

    # Scheme-prefix regexes for common service types. They are matched
    # case-insensitively by get_service_type() to label a URL.
    SERVICE_PATTERNS = {
        "email": r"^mailto://",
        "discord": r"^discord://",
        "slack": r"^slack://",
        "telegram": r"^tgram://",
        "smtp": r"^(smtp|smtps)://",
    }

    def __init__(
        self,
        allow_private_ips: bool = False,
        outbound_allowed: bool = False,
    ):
        """
        Initialize the notification service.

        Args:
            allow_private_ips: Whether to allow notifications to private/local IPs
                (default: False for security). Set to True for
                development/testing environments only.
            outbound_allowed: Server-level master switch
                (env-only via LDR_NOTIFICATIONS_ALLOW_OUTBOUND).
                Default False — outbound notifications are off until
                the operator opts in. See SECURITY.md "Notification
                Webhook SSRF" for the rationale.
        """
        self.apprise = apprise.Apprise()
        self.allow_private_ips = allow_private_ips
        self.outbound_allowed = outbound_allowed

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=0.5, max=10),
        retry=retry_if_exception_type((Exception,)),
        reraise=True,
    )
    def _send_with_retry(
        self,
        title: str,
        body: str,
        apprise_instance: apprise.Apprise,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification using the provided Apprise instance with retry logic.

        This method is decorated with Tenacity: any ``Exception`` raised here
        (including the ``SendError`` raised when Apprise reports failure)
        triggers another attempt, up to three attempts in total with
        exponential back-off. ``reraise=True`` makes the last exception
        propagate to the caller once the attempts are exhausted.

        Args:
            title: Notification title
            body: Notification body text
            apprise_instance: Apprise instance to use for sending
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully

        Raises:
            SendError: If sending fails after all retry attempts
        """
        logger.debug(
            f"Sending notification: title='{title[:50]}...', tag={tag}"
        )
        logger.debug(f"Body preview: {body[:200]}...")

        # Send notification
        notify_result = apprise_instance.notify(
            title=title,
            body=body,
            tag=tag,
            attach=attach,
        )

        if notify_result:
            logger.debug(f"Notification sent successfully: '{title[:50]}...'")
            return True

        # A falsy result is turned into an exception so that the retry
        # decorator sees a failure and schedules another attempt.
        error_msg = "Failed to send notification to any service"
        logger.warning(error_msg)
        raise SendError(error_msg)

    def send(
        self,
        title: str,
        body: str,
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        attach: Optional[List[str]] = None,
    ) -> bool:
        """
        Send a notification to service URLs with automatic retry.

        Args:
            title: Notification title
            body: Notification body text
            service_urls: Comma-separated list of service URLs to override configured ones
            tag: Optional tag to target specific services
            attach: Optional list of file paths to attach

        Returns:
            True if notification was sent successfully to at least one service.
            False if outbound notifications are disabled, if the provided
            service URLs could not be added to Apprise, or if no
            ``service_urls`` were given and no services are configured.

        Raises:
            ServiceError: If ``service_urls`` fail security validation
            SendError: If sending fails after all retry attempts (the
                original exception is attached as ``__cause__``)

        Note:
            Temporary Apprise instances are created for each send operation
            and are automatically garbage collected by Python when they go
            out of scope. This simple approach is ideal for small deployments
            (~5 users) and avoids memory management complexity.
        """
        # Defense-in-depth: enforce the operator-level master switch at
        # the service layer too, not just at NotificationManager.
        # Today the manager always wraps this method, but keeping the
        # gate here means a future direct caller cannot accidentally
        # bypass it. See SECURITY.md "Notification Webhook SSRF".
        if not self.outbound_allowed:
            logger.warning(
                "Notification not sent: outbound notifications are disabled "
                "at the server level. Set "
                "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
                "SECURITY.md 'Notification Webhook SSRF'."
            )
            return False

        # If service_urls are provided, validate before trying to send.
        # This runs outside the try block below so that ServiceError reaches
        # the caller unchanged instead of being wrapped in SendError.
        if service_urls:
            logger.debug("Creating Apprise instance for provided service URLs")

            # Validate service URLs for security (SSRF prevention)
            is_valid, error_msg = (
                NotificationURLValidator.validate_multiple_urls(
                    service_urls, allow_private_ips=self.allow_private_ips
                )
            )

            if not is_valid:
                logger.error(
                    f"Service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(service_urls)}"
                )
                raise ServiceError(f"Invalid service URL: {error_msg}")

        try:
            # If service_urls are provided, create a new Apprise instance
            if service_urls:
                temp_apprise = apprise.Apprise()
                result = temp_apprise.add(service_urls, tag=tag)

                if not result:
                    logger.error(
                        f"Failed to add service URLs to Apprise: "
                        f"{mask_sensitive_url(service_urls)}"
                    )
                    return False

                # Send notification with the temp instance (with retry)
                return self._send_with_retry(
                    title, body, temp_apprise, tag, attach
                )

            # Use the configured apprise instance
            if len(self.apprise) == 0:
                logger.debug("No notification services configured in Apprise")
                return False

            # Send notification (with retry)
            return self._send_with_retry(title, body, self.apprise, tag, attach)

        except Exception as e:
            # Tenacity will retry, but if all retries fail, raise SendError.
            # "from e" keeps the underlying failure as the explicit cause.
            logger.exception(
                f"Failed to send notification after retries: '{title[:50]}...'"
            )
            raise SendError(f"Failed to send notification: {e}") from e

    def send_event(
        self,
        event_type: EventType,
        context: Dict[str, Any],
        service_urls: Optional[str] = None,
        tag: Optional[str] = None,
        custom_template: Optional[Dict[str, str]] = None,
    ) -> bool:
        """
        Send a notification for a specific event type.

        The title and body are produced by ``NotificationTemplate.format``
        and then handed to :meth:`send`, so the return value and exceptions
        are those of :meth:`send`.

        Args:
            event_type: Type of event
            context: Context data for template formatting
            service_urls: Comma-separated list of service URLs
            tag: Optional tag to target specific services
            custom_template: Optional custom template override

        Returns:
            True if notification was sent successfully
        """
        logger.debug(f"send_event: event_type={event_type.value}, tag={tag}")
        logger.debug(f"Context: {context}")

        # Format notification using template
        message = NotificationTemplate.format(
            event_type, context, custom_template
        )
        logger.debug(
            f"Template formatted - title: '{message['title'][:50]}...'"
        )

        # Send notification
        return self.send(
            title=message["title"],
            body=message["body"],
            service_urls=service_urls,
            tag=tag,
        )

    def test_service(self, url: str) -> Dict[str, Any]:
        """
        Test a notification service.

        This method does not raise: every failure, including unexpected
        exceptions, is reported through the returned dictionary.

        Args:
            url: Apprise-compatible service URL

        Returns:
            Dict with a 'success' boolean, plus a 'message' on success or
            an 'error' message on failure
        """
        # Server-level master switch (env-only). Mirrors
        # NotificationManager's gate so the "Send Test Notification"
        # button cannot bypass it. WARNING level so the operator sees
        # the actionable signal naming the env var.
        if not self.outbound_allowed:
            logger.warning(
                "Notification test refused: outbound notifications are "
                "disabled at the server level. Set "
                "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable. See "
                "SECURITY.md 'Notification Webhook SSRF'."
            )
            return {
                "success": False,
                "error": (
                    "Outbound notifications are disabled. The server "
                    "administrator must set "
                    "LDR_NOTIFICATIONS_ALLOW_OUTBOUND=true to enable "
                    "notification webhooks. See SECURITY.md "
                    "'Notification Webhook SSRF' for details."
                ),
            }

        try:
            # Validate service URL for security (SSRF prevention)
            is_valid, error_msg = NotificationURLValidator.validate_service_url(
                url, allow_private_ips=self.allow_private_ips
            )

            if not is_valid:
                # The detailed reason is logged only; the caller receives a
                # generic message.
                logger.warning(
                    f"Test service URL validation failed: {error_msg}. "
                    f"URL: {mask_sensitive_url(url)}"
                )
                return {
                    "success": False,
                    "error": "Invalid notification service URL.",
                }

            # Create temporary Apprise instance
            temp_apprise = apprise.Apprise()
            add_result = temp_apprise.add(url)

            if not add_result:
                return {
                    "success": False,
                    "error": "Failed to add service URL",
                }

            # Send test notification (single attempt, no retry wrapper)
            result = temp_apprise.notify(
                title="Test Notification",
                body=(
                    "This is a test notification from Local Deep Research. "
                    "If you see this, your service is configured correctly!"
                ),
            )

            if result:
                return {
                    "success": True,
                    "message": "Test notification sent successfully",
                }
            return {
                "success": False,
                "error": "Failed to send test notification",
            }

        except Exception:
            logger.exception("Error testing notification service")
            return {
                "success": False,
                "error": "Failed to test notification service.",
            }

    @staticmethod
    def _validate_url(url: str) -> None:
        """
        Validate a notification service URL.

        Only the basic shape is checked: the value must be a non-empty
        string that ``urlparse`` can parse and that carries a scheme.

        Args:
            url: URL to validate

        Raises:
            ServiceError: If URL is invalid

        Note:
            URL scheme validation is handled by Apprise itself, which maintains
            a comprehensive whitelist of supported notification services.
            Apprise will reject unsupported schemes like 'file://' or 'javascript://'.
            See: https://github.com/caronc/apprise/wiki
        """
        if not url or not isinstance(url, str):
            raise ServiceError("URL must be a non-empty string")

        invalid_format_msg = (
            "Invalid URL format. Must be an Apprise-compatible "
            "service URL (e.g., discord://webhook_id/token)"
        )

        # Check if it looks like a URL. urlparse raises ValueError for some
        # malformed inputs (for example an unbalanced "[" in the host);
        # report those as ServiceError so callers see one exception type.
        try:
            parsed = urlparse(url)
        except ValueError as exc:
            raise ServiceError(invalid_format_msg) from exc

        if not parsed.scheme:
            raise ServiceError(invalid_format_msg)

    def get_service_type(self, url: str) -> Optional[str]:
        """
        Detect service type from URL.

        The URL is matched case-insensitively against ``SERVICE_PATTERNS``
        in definition order; the first matching key wins.

        Args:
            url: Service URL

        Returns:
            Service type name, or the string "unknown" when no pattern
            matches or ``url`` is not a string. This implementation never
            returns None.
        """
        # re.match would raise TypeError on a non-string; treat such input
        # as an unrecognised service instead.
        if not isinstance(url, str):
            return "unknown"

        for service_name, pattern in self.SERVICE_PATTERNS.items():
            if re.match(pattern, url, re.IGNORECASE):
                return service_name
        return "unknown"

378 return "unknown"