Coverage for src / local_deep_research / notifications / queue_helpers.py: 12%

99 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Queue notification helpers for the notification system. 

3 

4Provides helper functions for sending queue-related notifications 

5to keep the queue manager focused on queue logic. 

6""" 

7 

8from typing import Dict, Any, Optional 

9from loguru import logger 

10 

11from .manager import NotificationManager 

12from .templates import EventType 

13 

14 

15def send_queue_notification( 

16 username: str, 

17 research_id: str, 

18 query: str, 

19 settings_snapshot: Dict[str, Any], 

20 position: Optional[int] = None, 

21) -> bool: 

22 """ 

23 Send a research queued notification. 

24 

25 Args: 

26 username: User who owns the research 

27 research_id: UUID of the research 

28 query: Research query string 

29 settings_snapshot: Settings snapshot for thread-safe access 

30 position: Queue position (optional) 

31 

32 Returns: 

33 True if notification was sent successfully, False otherwise 

34 """ 

35 try: 

36 notification_manager = NotificationManager( 

37 settings_snapshot=settings_snapshot, user_id=username 

38 ) 

39 

40 # Build notification context 

41 context = { 

42 "query": query, 

43 "research_id": research_id, 

44 } 

45 

46 if position is not None: 

47 context["position"] = position 

48 context["wait_time"] = ( 

49 "Unknown" # Could estimate based on active researches 

50 ) 

51 

52 return notification_manager.send_notification( 

53 event_type=EventType.RESEARCH_QUEUED, 

54 context=context, 

55 ) 

56 

57 except Exception as e: 

58 logger.warning( 

59 f"Failed to send queued notification for {research_id}: {e}" 

60 ) 

61 return False 

62 

63 

64def send_queue_failed_notification( 

65 username: str, 

66 research_id: str, 

67 query: str, 

68 error_message: Optional[str] = None, 

69 settings_snapshot: Optional[Dict[str, Any]] = None, 

70) -> bool: 

71 """ 

72 Send a research failed notification from queue operations. 

73 

74 Args: 

75 username: User who owns the research 

76 research_id: UUID of the research 

77 query: Research query string 

78 error_message: Optional error message 

79 settings_snapshot: Settings snapshot for thread-safe access 

80 

81 Returns: 

82 True if notification was sent successfully, False otherwise 

83 """ 

84 if not settings_snapshot: 

85 logger.debug("No settings snapshot provided for failed notification") 

86 return False 

87 

88 try: 

89 notification_manager = NotificationManager( 

90 settings_snapshot=settings_snapshot, user_id=username 

91 ) 

92 

93 # Build notification context 

94 context = { 

95 "query": query, 

96 "research_id": research_id, 

97 } 

98 

99 if error_message: 

100 context["error"] = error_message 

101 

102 return notification_manager.send_notification( 

103 event_type=EventType.RESEARCH_FAILED, 

104 context=context, 

105 ) 

106 

107 except Exception as e: 

108 logger.warning( 

109 f"Failed to send failed notification for {research_id}: {e}" 

110 ) 

111 return False 

112 

113 

114def send_queue_failed_notification_from_session( 

115 username: str, 

116 research_id: str, 

117 query: str, 

118 error_message: str, 

119 db_session, 

120) -> None: 

121 """ 

122 Send a research failed notification, fetching settings from db_session. 

123 

124 This is a convenience wrapper for the queue processor that handles 

125 settings snapshot retrieval, logging, and error handling internally. 

126 All notification logic is contained within this function. 

127 

128 Args: 

129 username: User who owns the research 

130 research_id: UUID of the research 

131 query: Research query string 

132 error_message: Error message to include in notification 

133 db_session: Database session to fetch settings from 

134 """ 

135 try: 

136 from ...settings import SettingsManager 

137 

138 # Get settings snapshot from database session 

139 settings_manager = SettingsManager(db_session) 

140 settings_snapshot = settings_manager.get_settings_snapshot() 

141 

142 # Send notification using the helper function 

143 success = send_queue_failed_notification( 

144 username=username, 

145 research_id=research_id, 

146 query=query, 

147 error_message=error_message, 

148 settings_snapshot=settings_snapshot, 

149 ) 

150 

151 if success: 

152 logger.info(f"Sent failure notification for research {research_id}") 

153 else: 

154 logger.warning( 

155 f"Failed to send failure notification for {research_id} (disabled or rate limited)" 

156 ) 

157 

158 except Exception: 

159 logger.exception( 

160 f"Failed to send failure notification for {research_id}" 

161 ) 

162 

163 

164def send_research_completed_notification_from_session( 

165 username: str, 

166 research_id: str, 

167 db_session, 

168) -> None: 

169 """ 

170 Send research completed notification with summary and URL. 

171 

172 This is a convenience wrapper for the queue processor that handles 

173 all notification logic for completed research, including: 

174 - Research database lookup 

175 - Report content retrieval 

176 - URL building 

177 - Context building with summary 

178 - All logging and error handling 

179 

180 Args: 

181 username: User who owns the research 

182 research_id: UUID of the research 

183 db_session: Database session to fetch research and settings from 

184 """ 

185 try: 

186 logger.info( 

187 f"Starting completed notification process for research {research_id}, " 

188 f"user {username}" 

189 ) 

190 

191 # Import here to avoid circular dependencies 

192 from ...database.models import ResearchHistory 

193 from ...settings import SettingsManager 

194 from .manager import NotificationManager 

195 from .url_builder import build_notification_url 

196 

197 # Get research details for notification 

198 research = ( 

199 db_session.query(ResearchHistory).filter_by(id=research_id).first() 

200 ) 

201 

202 # Get settings snapshot for thread-safe notification sending 

203 settings_manager = SettingsManager(db_session) 

204 settings_snapshot = settings_manager.get_settings_snapshot() 

205 

206 if research: 

207 logger.info( 

208 f"Found research record, creating NotificationManager " 

209 f"for user {username}" 

210 ) 

211 

212 # Create notification manager with settings snapshot 

213 notification_manager = NotificationManager( 

214 settings_snapshot=settings_snapshot, user_id=username 

215 ) 

216 

217 # Build full URL for notification 

218 full_url = build_notification_url( 

219 f"/research/{research_id}", 

220 settings_manager=settings_manager, 

221 ) 

222 

223 # Build notification context with required fields 

224 context = { 

225 "query": research.query or "Unknown query", 

226 "research_id": research_id, 

227 "summary": "No summary available", 

228 "url": full_url, 

229 } 

230 

231 # Get report content for notification 

232 from ...storage import get_report_storage 

233 

234 storage = get_report_storage(session=db_session) 

235 report_content = storage.get_report(research_id) 

236 

237 if report_content: 

238 # Truncate summary if too long 

239 context["summary"] = ( 

240 report_content[:200] + "..." 

241 if len(report_content) > 200 

242 else report_content 

243 ) 

244 

245 logger.info( 

246 f"Sending RESEARCH_COMPLETED notification for research " 

247 f"{research_id} to user {username}" 

248 ) 

249 logger.debug(f"Notification context: {context}") 

250 

251 # Send notification using the manager 

252 result = notification_manager.send_notification( 

253 event_type=EventType.RESEARCH_COMPLETED, 

254 context=context, 

255 ) 

256 

257 if result: 

258 logger.info( 

259 f"Successfully sent completion notification for research {research_id}" 

260 ) 

261 else: 

262 logger.warning( 

263 f"Completion notification not sent for {research_id} (disabled or rate limited)" 

264 ) 

265 

266 else: 

267 logger.warning( 

268 f"Could not find research {research_id} in database, " 

269 f"sending notification with minimal details" 

270 ) 

271 

272 # Create notification manager with settings snapshot 

273 notification_manager = NotificationManager( 

274 settings_snapshot=settings_snapshot, user_id=username 

275 ) 

276 

277 # Build minimal context 

278 context = { 

279 "query": f"Research {research_id}", 

280 "research_id": research_id, 

281 "summary": "Research completed but details unavailable", 

282 "url": f"/research/{research_id}", 

283 } 

284 

285 notification_manager.send_notification( 

286 event_type=EventType.RESEARCH_COMPLETED, 

287 context=context, 

288 ) 

289 logger.info( 

290 f"Sent completion notification for research {research_id} (minimal details)" 

291 ) 

292 

293 except Exception: 

294 logger.exception( 

295 f"Failed to send completion notification for {research_id}" 

296 ) 

297 

298 

299def send_research_failed_notification_from_session( 

300 username: str, 

301 research_id: str, 

302 error_message: str, 

303 db_session, 

304) -> None: 

305 """ 

306 Send research failed notification (research-specific version). 

307 

308 This is a convenience wrapper for the queue processor that handles 

309 all notification logic for failed research, including: 

310 - Research database lookup to get query 

311 - Context building with sanitized error message 

312 - All logging and error handling 

313 

314 Args: 

315 username: User who owns the research 

316 research_id: UUID of the research 

317 error_message: Error message (will be sanitized for security) 

318 db_session: Database session to fetch research and settings from 

319 """ 

320 try: 

321 logger.info( 

322 f"Starting failed notification process for research {research_id}, " 

323 f"user {username}" 

324 ) 

325 

326 # Import here to avoid circular dependencies 

327 from ...database.models import ResearchHistory 

328 from ...settings import SettingsManager 

329 from .manager import NotificationManager 

330 

331 # Get research details for notification 

332 research = ( 

333 db_session.query(ResearchHistory).filter_by(id=research_id).first() 

334 ) 

335 

336 # Get settings snapshot for thread-safe notification sending 

337 settings_manager = SettingsManager(db_session) 

338 settings_snapshot = settings_manager.get_settings_snapshot() 

339 

340 # Sanitize error message for notification to avoid exposing 

341 # sensitive information (as noted by github-advanced-security) 

342 safe_error = "Research failed. Check logs for details." 

343 

344 if research: 

345 logger.info( 

346 f"Found research record, creating NotificationManager " 

347 f"for user {username}" 

348 ) 

349 

350 # Create notification manager with settings snapshot 

351 notification_manager = NotificationManager( 

352 settings_snapshot=settings_snapshot, user_id=username 

353 ) 

354 

355 # Build notification context 

356 context = { 

357 "query": research.query or "Unknown query", 

358 "research_id": research_id, 

359 "error": safe_error, 

360 } 

361 

362 logger.info( 

363 f"Sending RESEARCH_FAILED notification for research " 

364 f"{research_id} to user {username}" 

365 ) 

366 logger.debug(f"Notification context: {context}") 

367 

368 # Send notification using the manager 

369 result = notification_manager.send_notification( 

370 event_type=EventType.RESEARCH_FAILED, 

371 context=context, 

372 ) 

373 

374 if result: 

375 logger.info( 

376 f"Successfully sent failure notification for research {research_id}" 

377 ) 

378 else: 

379 logger.warning( 

380 f"Failure notification not sent for {research_id} (disabled or rate limited)" 

381 ) 

382 

383 else: 

384 logger.warning( 

385 f"Could not find research {research_id} in database, " 

386 f"sending notification with minimal details" 

387 ) 

388 

389 # Create notification manager with settings snapshot 

390 notification_manager = NotificationManager( 

391 settings_snapshot=settings_snapshot, user_id=username 

392 ) 

393 

394 # Build minimal context 

395 context = { 

396 "query": f"Research {research_id}", 

397 "research_id": research_id, 

398 "error": safe_error, 

399 } 

400 

401 notification_manager.send_notification( 

402 event_type=EventType.RESEARCH_FAILED, 

403 context=context, 

404 ) 

405 logger.info( 

406 f"Sent failure notification for research {research_id} (minimal details)" 

407 ) 

408 

409 except Exception: 

410 logger.exception( 

411 f"Failed to send failure notification for {research_id}" 

412 )