Coverage for src / local_deep_research / notifications / queue_helpers.py: 12%
99 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Queue notification helpers for the notification system.
4Provides helper functions for sending queue-related notifications
5to keep the queue manager focused on queue logic.
6"""
8from typing import Dict, Any, Optional
9from loguru import logger
11from .manager import NotificationManager
12from .templates import EventType
15def send_queue_notification(
16 username: str,
17 research_id: str,
18 query: str,
19 settings_snapshot: Dict[str, Any],
20 position: Optional[int] = None,
21) -> bool:
22 """
23 Send a research queued notification.
25 Args:
26 username: User who owns the research
27 research_id: UUID of the research
28 query: Research query string
29 settings_snapshot: Settings snapshot for thread-safe access
30 position: Queue position (optional)
32 Returns:
33 True if notification was sent successfully, False otherwise
34 """
35 try:
36 notification_manager = NotificationManager(
37 settings_snapshot=settings_snapshot, user_id=username
38 )
40 # Build notification context
41 context = {
42 "query": query,
43 "research_id": research_id,
44 }
46 if position is not None:
47 context["position"] = position
48 context["wait_time"] = (
49 "Unknown" # Could estimate based on active researches
50 )
52 return notification_manager.send_notification(
53 event_type=EventType.RESEARCH_QUEUED,
54 context=context,
55 )
57 except Exception as e:
58 logger.warning(
59 f"Failed to send queued notification for {research_id}: {e}"
60 )
61 return False
64def send_queue_failed_notification(
65 username: str,
66 research_id: str,
67 query: str,
68 error_message: Optional[str] = None,
69 settings_snapshot: Optional[Dict[str, Any]] = None,
70) -> bool:
71 """
72 Send a research failed notification from queue operations.
74 Args:
75 username: User who owns the research
76 research_id: UUID of the research
77 query: Research query string
78 error_message: Optional error message
79 settings_snapshot: Settings snapshot for thread-safe access
81 Returns:
82 True if notification was sent successfully, False otherwise
83 """
84 if not settings_snapshot:
85 logger.debug("No settings snapshot provided for failed notification")
86 return False
88 try:
89 notification_manager = NotificationManager(
90 settings_snapshot=settings_snapshot, user_id=username
91 )
93 # Build notification context
94 context = {
95 "query": query,
96 "research_id": research_id,
97 }
99 if error_message:
100 context["error"] = error_message
102 return notification_manager.send_notification(
103 event_type=EventType.RESEARCH_FAILED,
104 context=context,
105 )
107 except Exception as e:
108 logger.warning(
109 f"Failed to send failed notification for {research_id}: {e}"
110 )
111 return False
114def send_queue_failed_notification_from_session(
115 username: str,
116 research_id: str,
117 query: str,
118 error_message: str,
119 db_session,
120) -> None:
121 """
122 Send a research failed notification, fetching settings from db_session.
124 This is a convenience wrapper for the queue processor that handles
125 settings snapshot retrieval, logging, and error handling internally.
126 All notification logic is contained within this function.
128 Args:
129 username: User who owns the research
130 research_id: UUID of the research
131 query: Research query string
132 error_message: Error message to include in notification
133 db_session: Database session to fetch settings from
134 """
135 try:
136 from ...settings import SettingsManager
138 # Get settings snapshot from database session
139 settings_manager = SettingsManager(db_session)
140 settings_snapshot = settings_manager.get_settings_snapshot()
142 # Send notification using the helper function
143 success = send_queue_failed_notification(
144 username=username,
145 research_id=research_id,
146 query=query,
147 error_message=error_message,
148 settings_snapshot=settings_snapshot,
149 )
151 if success:
152 logger.info(f"Sent failure notification for research {research_id}")
153 else:
154 logger.warning(
155 f"Failed to send failure notification for {research_id} (disabled or rate limited)"
156 )
158 except Exception:
159 logger.exception(
160 f"Failed to send failure notification for {research_id}"
161 )
164def send_research_completed_notification_from_session(
165 username: str,
166 research_id: str,
167 db_session,
168) -> None:
169 """
170 Send research completed notification with summary and URL.
172 This is a convenience wrapper for the queue processor that handles
173 all notification logic for completed research, including:
174 - Research database lookup
175 - Report content retrieval
176 - URL building
177 - Context building with summary
178 - All logging and error handling
180 Args:
181 username: User who owns the research
182 research_id: UUID of the research
183 db_session: Database session to fetch research and settings from
184 """
185 try:
186 logger.info(
187 f"Starting completed notification process for research {research_id}, "
188 f"user {username}"
189 )
191 # Import here to avoid circular dependencies
192 from ...database.models import ResearchHistory
193 from ...settings import SettingsManager
194 from .manager import NotificationManager
195 from .url_builder import build_notification_url
197 # Get research details for notification
198 research = (
199 db_session.query(ResearchHistory).filter_by(id=research_id).first()
200 )
202 # Get settings snapshot for thread-safe notification sending
203 settings_manager = SettingsManager(db_session)
204 settings_snapshot = settings_manager.get_settings_snapshot()
206 if research:
207 logger.info(
208 f"Found research record, creating NotificationManager "
209 f"for user {username}"
210 )
212 # Create notification manager with settings snapshot
213 notification_manager = NotificationManager(
214 settings_snapshot=settings_snapshot, user_id=username
215 )
217 # Build full URL for notification
218 full_url = build_notification_url(
219 f"/research/{research_id}",
220 settings_manager=settings_manager,
221 )
223 # Build notification context with required fields
224 context = {
225 "query": research.query or "Unknown query",
226 "research_id": research_id,
227 "summary": "No summary available",
228 "url": full_url,
229 }
231 # Get report content for notification
232 from ...storage import get_report_storage
234 storage = get_report_storage(session=db_session)
235 report_content = storage.get_report(research_id)
237 if report_content:
238 # Truncate summary if too long
239 context["summary"] = (
240 report_content[:200] + "..."
241 if len(report_content) > 200
242 else report_content
243 )
245 logger.info(
246 f"Sending RESEARCH_COMPLETED notification for research "
247 f"{research_id} to user {username}"
248 )
249 logger.debug(f"Notification context: {context}")
251 # Send notification using the manager
252 result = notification_manager.send_notification(
253 event_type=EventType.RESEARCH_COMPLETED,
254 context=context,
255 )
257 if result:
258 logger.info(
259 f"Successfully sent completion notification for research {research_id}"
260 )
261 else:
262 logger.warning(
263 f"Completion notification not sent for {research_id} (disabled or rate limited)"
264 )
266 else:
267 logger.warning(
268 f"Could not find research {research_id} in database, "
269 f"sending notification with minimal details"
270 )
272 # Create notification manager with settings snapshot
273 notification_manager = NotificationManager(
274 settings_snapshot=settings_snapshot, user_id=username
275 )
277 # Build minimal context
278 context = {
279 "query": f"Research {research_id}",
280 "research_id": research_id,
281 "summary": "Research completed but details unavailable",
282 "url": f"/research/{research_id}",
283 }
285 notification_manager.send_notification(
286 event_type=EventType.RESEARCH_COMPLETED,
287 context=context,
288 )
289 logger.info(
290 f"Sent completion notification for research {research_id} (minimal details)"
291 )
293 except Exception:
294 logger.exception(
295 f"Failed to send completion notification for {research_id}"
296 )
299def send_research_failed_notification_from_session(
300 username: str,
301 research_id: str,
302 error_message: str,
303 db_session,
304) -> None:
305 """
306 Send research failed notification (research-specific version).
308 This is a convenience wrapper for the queue processor that handles
309 all notification logic for failed research, including:
310 - Research database lookup to get query
311 - Context building with sanitized error message
312 - All logging and error handling
314 Args:
315 username: User who owns the research
316 research_id: UUID of the research
317 error_message: Error message (will be sanitized for security)
318 db_session: Database session to fetch research and settings from
319 """
320 try:
321 logger.info(
322 f"Starting failed notification process for research {research_id}, "
323 f"user {username}"
324 )
326 # Import here to avoid circular dependencies
327 from ...database.models import ResearchHistory
328 from ...settings import SettingsManager
329 from .manager import NotificationManager
331 # Get research details for notification
332 research = (
333 db_session.query(ResearchHistory).filter_by(id=research_id).first()
334 )
336 # Get settings snapshot for thread-safe notification sending
337 settings_manager = SettingsManager(db_session)
338 settings_snapshot = settings_manager.get_settings_snapshot()
340 # Sanitize error message for notification to avoid exposing
341 # sensitive information (as noted by github-advanced-security)
342 safe_error = "Research failed. Check logs for details."
344 if research:
345 logger.info(
346 f"Found research record, creating NotificationManager "
347 f"for user {username}"
348 )
350 # Create notification manager with settings snapshot
351 notification_manager = NotificationManager(
352 settings_snapshot=settings_snapshot, user_id=username
353 )
355 # Build notification context
356 context = {
357 "query": research.query or "Unknown query",
358 "research_id": research_id,
359 "error": safe_error,
360 }
362 logger.info(
363 f"Sending RESEARCH_FAILED notification for research "
364 f"{research_id} to user {username}"
365 )
366 logger.debug(f"Notification context: {context}")
368 # Send notification using the manager
369 result = notification_manager.send_notification(
370 event_type=EventType.RESEARCH_FAILED,
371 context=context,
372 )
374 if result:
375 logger.info(
376 f"Successfully sent failure notification for research {research_id}"
377 )
378 else:
379 logger.warning(
380 f"Failure notification not sent for {research_id} (disabled or rate limited)"
381 )
383 else:
384 logger.warning(
385 f"Could not find research {research_id} in database, "
386 f"sending notification with minimal details"
387 )
389 # Create notification manager with settings snapshot
390 notification_manager = NotificationManager(
391 settings_snapshot=settings_snapshot, user_id=username
392 )
394 # Build minimal context
395 context = {
396 "query": f"Research {research_id}",
397 "research_id": research_id,
398 "error": safe_error,
399 }
401 notification_manager.send_notification(
402 event_type=EventType.RESEARCH_FAILED,
403 context=context,
404 )
405 logger.info(
406 f"Sent failure notification for research {research_id} (minimal details)"
407 )
409 except Exception:
410 logger.exception(
411 f"Failed to send failure notification for {research_id}"
412 )