Coverage for src / local_deep_research / notifications / queue_helpers.py: 97%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Queue notification helpers for the notification system.
4Provides helper functions for sending queue-related notifications
5to keep the queue manager focused on queue logic.
6"""
8from typing import Dict, Any, Optional
9from loguru import logger
11from .exceptions import RateLimitError
12from .manager import NotificationManager
13from .templates import EventType
16def send_queue_notification(
17 username: str,
18 research_id: str,
19 query: str,
20 settings_snapshot: Dict[str, Any],
21 position: Optional[int] = None,
22) -> bool:
23 """
24 Send a research queued notification.
26 Args:
27 username: User who owns the research
28 research_id: UUID of the research
29 query: Research query string
30 settings_snapshot: Settings snapshot for thread-safe access
31 position: Queue position (optional)
33 Returns:
34 True if notification was sent successfully, False otherwise
35 """
36 try:
37 notification_manager = NotificationManager(
38 settings_snapshot=settings_snapshot, user_id=username
39 )
41 # Build notification context
42 context: Dict[str, Any] = {
43 "query": query,
44 "research_id": research_id,
45 }
47 if position is not None:
48 context["position"] = position
49 context["wait_time"] = (
50 "Unknown" # Could estimate based on active researches
51 )
53 return notification_manager.send_notification(
54 event_type=EventType.RESEARCH_QUEUED,
55 context=context,
56 )
58 except RateLimitError:
59 logger.warning(
60 f"Rate limited sending queued notification for {research_id}"
61 )
62 return False
63 except Exception:
64 logger.warning(f"Failed to send queued notification for {research_id}")
65 return False
68def send_queue_failed_notification(
69 username: str,
70 research_id: str,
71 query: str,
72 error_message: Optional[str] = None,
73 settings_snapshot: Optional[Dict[str, Any]] = None,
74) -> bool:
75 """
76 Send a research failed notification from queue operations.
78 Args:
79 username: User who owns the research
80 research_id: UUID of the research
81 query: Research query string
82 error_message: Optional error message
83 settings_snapshot: Settings snapshot for thread-safe access
85 Returns:
86 True if notification was sent successfully, False otherwise
87 """
88 if not settings_snapshot:
89 logger.debug("No settings snapshot provided for failed notification")
90 return False
92 try:
93 notification_manager = NotificationManager(
94 settings_snapshot=settings_snapshot, user_id=username
95 )
97 # Build notification context
98 context = {
99 "query": query,
100 "research_id": research_id,
101 }
103 if error_message:
104 context["error"] = error_message
106 return notification_manager.send_notification(
107 event_type=EventType.RESEARCH_FAILED,
108 context=context,
109 )
111 except RateLimitError:
112 logger.warning(
113 f"Rate limited sending failed notification for {research_id}"
114 )
115 return False
116 except Exception:
117 logger.warning(f"Failed to send failed notification for {research_id}")
118 return False
121def send_queue_failed_notification_from_session(
122 username: str,
123 research_id: str,
124 query: str,
125 error_message: str,
126 db_session,
127) -> None:
128 """
129 Send a research failed notification, fetching settings from db_session.
131 This is a convenience wrapper for the queue processor that handles
132 settings snapshot retrieval, logging, and error handling internally.
133 All notification logic is contained within this function.
135 Args:
136 username: User who owns the research
137 research_id: UUID of the research
138 query: Research query string
139 error_message: Error message to include in notification
140 db_session: Database session to fetch settings from
141 """
142 try:
143 from ..settings import SettingsManager
145 # Get settings snapshot from database session
146 settings_manager = SettingsManager(db_session)
147 settings_snapshot = settings_manager.get_settings_snapshot()
149 # Send notification (handles RateLimitError internally, returns False)
150 success = send_queue_failed_notification(
151 username=username,
152 research_id=research_id,
153 query=query,
154 error_message=error_message,
155 settings_snapshot=settings_snapshot,
156 )
158 if success:
159 logger.info(f"Sent failure notification for research {research_id}")
160 else:
161 logger.warning(
162 f"Failed to send failure notification for {research_id} (not sent)"
163 )
164 except Exception:
165 logger.exception(
166 f"Failed to send failure notification for {research_id}"
167 )
170def send_research_completed_notification_from_session(
171 username: str,
172 research_id: str,
173 db_session,
174) -> None:
175 """
176 Send research completed notification with summary and URL.
178 This is a convenience wrapper for the queue processor that handles
179 all notification logic for completed research, including:
180 - Research database lookup
181 - Report content retrieval
182 - URL building
183 - Context building with summary
184 - All logging and error handling
186 Args:
187 username: User who owns the research
188 research_id: UUID of the research
189 db_session: Database session to fetch research and settings from
190 """
191 try:
192 logger.info(
193 f"Starting completed notification process for research {research_id}, "
194 f"user {username}"
195 )
197 # Import here to avoid circular dependencies
198 from ..database.models import ResearchHistory
199 from ..settings import SettingsManager
200 from .manager import NotificationManager
201 from .url_builder import build_notification_url
203 # Get research details for notification
204 research = (
205 db_session.query(ResearchHistory).filter_by(id=research_id).first()
206 )
208 # Get settings snapshot for thread-safe notification sending
209 settings_manager = SettingsManager(db_session)
210 settings_snapshot = settings_manager.get_settings_snapshot()
212 if research:
213 logger.info(
214 f"Found research record, creating NotificationManager "
215 f"for user {username}"
216 )
218 # Create notification manager with settings snapshot
219 notification_manager = NotificationManager(
220 settings_snapshot=settings_snapshot, user_id=username
221 )
223 # Build full URL for notification
224 full_url = build_notification_url(
225 f"/research/{research_id}",
226 settings_manager=settings_manager,
227 )
229 # Build notification context with required fields
230 context = {
231 "query": research.query or "Unknown query",
232 "research_id": research_id,
233 "summary": "No summary available",
234 "url": full_url,
235 }
237 # Get report content for notification
238 from ..storage import get_report_storage
240 storage = get_report_storage(session=db_session)
241 report_content = storage.get_report(research_id)
243 if report_content:
244 # Truncate summary if too long
245 context["summary"] = (
246 report_content[:200] + "..."
247 if len(report_content) > 200
248 else report_content
249 )
251 logger.info(
252 f"Sending RESEARCH_COMPLETED notification for research "
253 f"{research_id} to user {username}"
254 )
255 logger.debug(f"Notification context: {context}")
257 # Send notification using the manager
258 result = notification_manager.send_notification(
259 event_type=EventType.RESEARCH_COMPLETED,
260 context=context,
261 )
263 if result:
264 logger.info(
265 f"Successfully sent completion notification for research {research_id}"
266 )
267 else:
268 logger.warning(
269 f"Completion notification not sent for {research_id} (disabled)"
270 )
272 else:
273 logger.warning(
274 f"Could not find research {research_id} in database, "
275 f"sending notification with minimal details"
276 )
278 # Create notification manager with settings snapshot
279 notification_manager = NotificationManager(
280 settings_snapshot=settings_snapshot, user_id=username
281 )
283 # Build minimal context
284 context = {
285 "query": f"Research {research_id}",
286 "research_id": research_id,
287 "summary": "Research completed but details unavailable",
288 "url": f"/research/{research_id}",
289 }
291 notification_manager.send_notification(
292 event_type=EventType.RESEARCH_COMPLETED,
293 context=context,
294 )
295 logger.info(
296 f"Sent completion notification for research {research_id} (minimal details)"
297 )
299 except RateLimitError:
300 logger.warning(
301 f"Rate limited sending completion notification for {research_id}"
302 )
303 except Exception:
304 logger.exception(
305 f"Failed to send completion notification for {research_id}"
306 )
309def send_research_failed_notification_from_session(
310 username: str,
311 research_id: str,
312 error_message: str,
313 db_session,
314) -> None:
315 """
316 Send research failed notification (research-specific version).
318 This is a convenience wrapper for the queue processor that handles
319 all notification logic for failed research, including:
320 - Research database lookup to get query
321 - Context building with sanitized error message
322 - All logging and error handling
324 Args:
325 username: User who owns the research
326 research_id: UUID of the research
327 error_message: Error message (will be sanitized for security)
328 db_session: Database session to fetch research and settings from
329 """
330 try:
331 logger.info(
332 f"Starting failed notification process for research {research_id}, "
333 f"user {username}"
334 )
336 # Import here to avoid circular dependencies
337 from ..database.models import ResearchHistory
338 from ..settings import SettingsManager
339 from .manager import NotificationManager
341 # Get research details for notification
342 research = (
343 db_session.query(ResearchHistory).filter_by(id=research_id).first()
344 )
346 # Get settings snapshot for thread-safe notification sending
347 settings_manager = SettingsManager(db_session)
348 settings_snapshot = settings_manager.get_settings_snapshot()
350 # Sanitize error message for notification to avoid exposing
351 # sensitive information (as noted by github-advanced-security)
352 safe_error = "Research failed. Check logs for details."
354 if research:
355 logger.info(
356 f"Found research record, creating NotificationManager "
357 f"for user {username}"
358 )
360 # Create notification manager with settings snapshot
361 notification_manager = NotificationManager(
362 settings_snapshot=settings_snapshot, user_id=username
363 )
365 # Build notification context
366 context = {
367 "query": research.query or "Unknown query",
368 "research_id": research_id,
369 "error": safe_error,
370 }
372 logger.info(
373 f"Sending RESEARCH_FAILED notification for research "
374 f"{research_id} to user {username}"
375 )
376 logger.debug(f"Notification context: {context}")
378 # Send notification using the manager
379 result = notification_manager.send_notification(
380 event_type=EventType.RESEARCH_FAILED,
381 context=context,
382 )
384 if result: 384 ↛ 389line 384 didn't jump to line 389 because the condition on line 384 was always true
385 logger.info(
386 f"Successfully sent failure notification for research {research_id}"
387 )
388 else:
389 logger.warning(
390 f"Failure notification not sent for {research_id} (disabled)"
391 )
393 else:
394 logger.warning(
395 f"Could not find research {research_id} in database, "
396 f"sending notification with minimal details"
397 )
399 # Create notification manager with settings snapshot
400 notification_manager = NotificationManager(
401 settings_snapshot=settings_snapshot, user_id=username
402 )
404 # Build minimal context
405 context = {
406 "query": f"Research {research_id}",
407 "research_id": research_id,
408 "error": safe_error,
409 }
411 notification_manager.send_notification(
412 event_type=EventType.RESEARCH_FAILED,
413 context=context,
414 )
415 logger.info(
416 f"Sent failure notification for research {research_id} (minimal details)"
417 )
419 except RateLimitError:
420 logger.warning(
421 f"Rate limited sending failure notification for {research_id}"
422 )
423 except Exception:
424 logger.exception(
425 f"Failed to send failure notification for {research_id}"
426 )