Coverage for src / local_deep_research / notifications / queue_helpers.py: 97%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Queue notification helpers for the notification system. 

3 

4Provides helper functions for sending queue-related notifications 

5to keep the queue manager focused on queue logic. 

6""" 

7 

8from typing import Dict, Any, Optional 

9from loguru import logger 

10 

11from .exceptions import RateLimitError 

12from .manager import NotificationManager 

13from .templates import EventType 

14 

15 

def send_queue_notification(
    username: str,
    research_id: str,
    query: str,
    settings_snapshot: Dict[str, Any],
    position: Optional[int] = None,
) -> bool:
    """
    Notify a user that their research has been queued.

    Args:
        username: Owner of the research; passed to the manager as ``user_id``
        research_id: Identifier of the queued research
        query: Research query text
        settings_snapshot: Settings snapshot handed to NotificationManager
        position: Queue position, included in the context when not None

    Returns:
        The result of ``NotificationManager.send_notification``; False when
        the send is rate limited or any other exception is raised.
    """
    # The payload always carries the query and research id; a known queue
    # position additionally contributes a placeholder wait time.
    payload: Dict[str, Any] = {"query": query, "research_id": research_id}
    if position is not None:
        payload["position"] = position
        payload["wait_time"] = "Unknown"  # no estimate is computed here

    try:
        manager = NotificationManager(
            settings_snapshot=settings_snapshot, user_id=username
        )
        outcome = manager.send_notification(
            event_type=EventType.RESEARCH_QUEUED,
            context=payload,
        )
    except RateLimitError:
        logger.warning(
            f"Rate limited sending queued notification for {research_id}"
        )
        return False
    except Exception:
        # Best effort: a notification failure is reported as "not sent"
        # instead of propagating to the caller.
        logger.warning(f"Failed to send queued notification for {research_id}")
        return False

    return outcome

66 

67 

def send_queue_failed_notification(
    username: str,
    research_id: str,
    query: str,
    error_message: Optional[str] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Notify a user that a queued research has failed.

    Args:
        username: Owner of the research; passed to the manager as ``user_id``
        research_id: Identifier of the failed research
        query: Research query text
        error_message: Error text added to the context when non-empty
        settings_snapshot: Settings snapshot handed to NotificationManager;
            when missing or empty nothing is sent

    Returns:
        The result of ``NotificationManager.send_notification``; False when
        no snapshot was given, the send is rate limited, or any other
        exception is raised.
    """
    # Guard clause: without a (non-empty) snapshot there is nothing to send with.
    if not settings_snapshot:
        logger.debug("No settings snapshot provided for failed notification")
        return False

    payload = {"query": query, "research_id": research_id}
    if error_message:
        payload["error"] = error_message

    try:
        manager = NotificationManager(
            settings_snapshot=settings_snapshot, user_id=username
        )
        outcome = manager.send_notification(
            event_type=EventType.RESEARCH_FAILED,
            context=payload,
        )
    except RateLimitError:
        logger.warning(
            f"Rate limited sending failed notification for {research_id}"
        )
        return False
    except Exception:
        # Best effort: a notification failure is reported as "not sent"
        # instead of propagating to the caller.
        logger.warning(f"Failed to send failed notification for {research_id}")
        return False

    return outcome

119 

120 

def send_queue_failed_notification_from_session(
    username: str,
    research_id: str,
    query: str,
    error_message: str,
    db_session,
) -> None:
    """
    Send a research failed notification using settings read from db_session.

    Wrapper around ``send_queue_failed_notification`` that obtains the
    settings snapshot itself and logs the outcome. Nothing is returned and
    every exception is caught and logged here.

    Args:
        username: Owner of the research
        research_id: Identifier of the failed research
        query: Research query text
        error_message: Error text forwarded to the notification
        db_session: Database session used to construct the SettingsManager
    """
    try:
        from ..settings import SettingsManager

        # Snapshot the settings through the supplied session
        snapshot = SettingsManager(db_session).get_settings_snapshot()

        # The delegate reports rate limiting and other send errors as False
        delivered = send_queue_failed_notification(
            username=username,
            research_id=research_id,
            query=query,
            error_message=error_message,
            settings_snapshot=snapshot,
        )

        if not delivered:
            logger.warning(
                f"Failed to send failure notification for {research_id} (not sent)"
            )
            return

        logger.info(f"Sent failure notification for research {research_id}")
    except Exception:
        logger.exception(
            f"Failed to send failure notification for {research_id}"
        )

168 

169 

def _build_completed_context(
    research, research_id: str, url: str, db_session
) -> Dict[str, Any]:
    """Build the RESEARCH_COMPLETED context; placeholder values are used when no record exists."""
    if not research:
        return {
            "query": f"Research {research_id}",
            "research_id": research_id,
            "summary": "Research completed but details unavailable",
            "url": url,
        }

    context: Dict[str, Any] = {
        "query": research.query or "Unknown query",
        "research_id": research_id,
        "summary": "No summary available",
        "url": url,
    }

    # Deferred import, same as the other project imports in this module's helpers
    from ..storage import get_report_storage

    report_content = get_report_storage(session=db_session).get_report(
        research_id
    )
    if report_content:
        # Use the first 200 characters of the report as the summary,
        # marking truncation with an ellipsis.
        max_chars = 200
        context["summary"] = (
            report_content[:max_chars] + "..."
            if len(report_content) > max_chars
            else report_content
        )

    return context


def send_research_completed_notification_from_session(
    username: str,
    research_id: str,
    db_session,
) -> None:
    """
    Send research completed notification with summary and URL.

    This is a convenience wrapper for the queue processor that handles
    all notification logic for completed research, including:
    - Research database lookup
    - Report content retrieval
    - URL building
    - Context building with summary
    - All logging and error handling

    When the research record cannot be found, a notification with
    placeholder query/summary is sent instead. In both cases the URL is
    produced by ``build_notification_url`` and the outcome reported by
    ``send_notification`` decides which log message is emitted.

    Nothing is returned; RateLimitError and all other exceptions are
    caught and logged here.

    Args:
        username: User who owns the research
        research_id: UUID of the research
        db_session: Database session to fetch research and settings from
    """
    try:
        logger.info(
            f"Starting completed notification process for research {research_id}, "
            f"user {username}"
        )

        # Import here to avoid circular dependencies
        from ..database.models import ResearchHistory
        from ..settings import SettingsManager
        from .manager import NotificationManager
        from .url_builder import build_notification_url

        # Get research details for notification
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        # Get settings snapshot for notification sending
        settings_manager = SettingsManager(db_session)
        settings_snapshot = settings_manager.get_settings_snapshot()

        if research:
            logger.info(
                f"Found research record, creating NotificationManager "
                f"for user {username}"
            )
        else:
            logger.warning(
                f"Could not find research {research_id} in database, "
                f"sending notification with minimal details"
            )

        notification_manager = NotificationManager(
            settings_snapshot=settings_snapshot, user_id=username
        )

        # Build the full URL for both the detailed and the minimal
        # notification, so the link is built the same way in either case.
        full_url = build_notification_url(
            f"/research/{research_id}",
            settings_manager=settings_manager,
        )

        context = _build_completed_context(
            research, research_id, full_url, db_session
        )

        logger.info(
            f"Sending RESEARCH_COMPLETED notification for research "
            f"{research_id} to user {username}"
        )
        logger.debug(f"Notification context: {context}")

        result = notification_manager.send_notification(
            event_type=EventType.RESEARCH_COMPLETED,
            context=context,
        )

        # Only claim success when the manager reports the notification as sent
        if not result:
            logger.warning(
                f"Completion notification not sent for {research_id} (disabled)"
            )
        elif research:
            logger.info(
                f"Successfully sent completion notification for research {research_id}"
            )
        else:
            logger.info(
                f"Sent completion notification for research {research_id} (minimal details)"
            )

    except RateLimitError:
        logger.warning(
            f"Rate limited sending completion notification for {research_id}"
        )
    except Exception:
        logger.exception(
            f"Failed to send completion notification for {research_id}"
        )

307 

308 

def send_research_failed_notification_from_session(
    username: str,
    research_id: str,
    error_message: str,
    db_session,
) -> None:
    """
    Send research failed notification (research-specific version).

    This is a convenience wrapper for the queue processor that handles
    all notification logic for failed research, including:
    - Research database lookup to get query
    - Context building with a generic error message
    - All logging and error handling

    When the research record cannot be found, a notification with a
    placeholder query is sent instead. The outcome reported by
    ``send_notification`` decides which log message is emitted.

    Nothing is returned; RateLimitError and all other exceptions are
    caught and logged here.

    Args:
        username: User who owns the research
        research_id: UUID of the research
        error_message: Original error message. It is accepted for interface
            compatibility but is NOT forwarded: the notification always
            carries a fixed generic message instead.
        db_session: Database session to fetch research and settings from
    """
    try:
        logger.info(
            f"Starting failed notification process for research {research_id}, "
            f"user {username}"
        )

        # Import here to avoid circular dependencies
        from ..database.models import ResearchHistory
        from ..settings import SettingsManager
        from .manager import NotificationManager

        # Get research details for notification
        research = (
            db_session.query(ResearchHistory).filter_by(id=research_id).first()
        )

        # Get settings snapshot for notification sending
        settings_snapshot = SettingsManager(db_session).get_settings_snapshot()

        # A fixed message replaces the raw error so that potentially
        # sensitive details are never exposed through the notification
        # (as noted by github-advanced-security).
        safe_error = "Research failed. Check logs for details."

        if research:
            logger.info(
                f"Found research record, creating NotificationManager "
                f"for user {username}"
            )
            query = research.query or "Unknown query"
        else:
            logger.warning(
                f"Could not find research {research_id} in database, "
                f"sending notification with minimal details"
            )
            query = f"Research {research_id}"

        notification_manager = NotificationManager(
            settings_snapshot=settings_snapshot, user_id=username
        )

        context = {
            "query": query,
            "research_id": research_id,
            "error": safe_error,
        }

        logger.info(
            f"Sending RESEARCH_FAILED notification for research "
            f"{research_id} to user {username}"
        )
        logger.debug(f"Notification context: {context}")

        result = notification_manager.send_notification(
            event_type=EventType.RESEARCH_FAILED,
            context=context,
        )

        # Only claim success when the manager reports the notification as sent
        if not result:
            logger.warning(
                f"Failure notification not sent for {research_id} (disabled)"
            )
        elif research:
            logger.info(
                f"Successfully sent failure notification for research {research_id}"
            )
        else:
            logger.info(
                f"Sent failure notification for research {research_id} (minimal details)"
            )

    except RateLimitError:
        logger.warning(
            f"Rate limited sending failure notification for {research_id}"
        )
    except Exception:
        logger.exception(
            f"Failed to send failure notification for {research_id}"
        )