Coverage for src/local_deep_research/error_handling/error_reporter.py: 90%
81 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2ErrorReporter - Main error categorization and handling logic
3"""
5import re
6from enum import Enum
7from typing import Any, Dict, Optional
9from loguru import logger
class ErrorCategory(Enum):
    """Closed set of buckets that a research-time error can be sorted into.

    Every member's value is its own name in lowercase snake_case, so the
    value can be used directly as a stable string identifier.
    """

    # Failures reaching a service (refused connections, timeouts, ...).
    CONNECTION_ERROR = "connection_error"
    # Problems with the language model itself or its credentials.
    MODEL_ERROR = "model_error"
    # Failures raised while searching.
    SEARCH_ERROR = "search_error"
    # Failures while turning collected material into a report.
    SYNTHESIS_ERROR = "synthesis_error"
    # Filesystem / storage problems.
    FILE_ERROR = "file_error"
    # The remote API refused the call because of rate limits or quota.
    RATE_LIMIT_ERROR = "rate_limit_error"
    # Fallback used when nothing more specific applies.
    UNKNOWN_ERROR = "unknown_error"
class ErrorReporter:
    """
    Analyzes and categorizes errors to provide better user feedback.

    An error message is matched against a table of regular expressions
    (``self.error_patterns``) to pick an ``ErrorCategory``; the category then
    drives the user-facing title, suggested actions, severity and
    recoverability, and - for authentication and rate-limit problems - a
    best-effort user notification.
    """

    def __init__(self):
        """Build the category -> regex-pattern table used for categorization.

        Categories are tried in insertion order and the first matching
        pattern wins, so the order of the keys below is significant.
        Patterns are matched case-insensitively.
        """
        self.error_patterns = {
            # Checked first: HTTP 429 responses are frequently phrased as
            # "HTTP error 429 ...", which the generic `HTTP error \d+`
            # connection pattern would otherwise claim before any
            # rate-limit pattern is tried.
            ErrorCategory.RATE_LIMIT_ERROR: [
                r"429.*resource.*exhausted",
                r"429.*too many requests",
                r"rate limit",
                r"rate_limit",
                r"ratelimit",
                r"quota.*exceeded",
                r"resource.*exhausted.*quota",
                r"threshold.*requests",
                r"LLM rate limit",
                r"API rate limit",
                r"maximum.*requests.*minute",
                r"maximum.*requests.*hour",
            ],
            ErrorCategory.CONNECTION_ERROR: [
                r"POST predict.*EOF",
                r"Connection refused",
                r"timeout",
                r"Connection.*failed",
                r"HTTP error \d+",
                r"network.*error",
                r"\[Errno 111\]",
                r"host\.docker\.internal",
                r"host.*localhost.*Docker",
                r"127\.0\.0\.1.*Docker",
                r"localhost.*1234.*Docker",
                r"LM.*Studio.*Docker.*Mac",
            ],
            ErrorCategory.MODEL_ERROR: [
                r"Model.*not found",
                r"Invalid.*model",
                r"Ollama.*not available",
                r"API key.*invalid",
                r"Authentication.*error",
                r"max_workers must be greater than 0",
                r"TypeError.*Context.*Size",
                r"'<' not supported between",
                r"No auth credentials found",
                r"401.*API key",
            ],
            ErrorCategory.SEARCH_ERROR: [
                r"Search.*failed",
                r"No search results",
                r"Search engine.*error",
                r"The search is longer than 256 characters",
                r"Failed to create search engine",
                r"could not be found",
                r"GitHub API error",
                r"database.*locked",
            ],
            ErrorCategory.SYNTHESIS_ERROR: [
                r"Error.*synthesis",
                r"Failed.*generate",
                # NOTE(review): unreachable - any message containing
                # "timeout" is already claimed by CONNECTION_ERROR above.
                r"Synthesis.*timeout",
                r"detailed.*report.*stuck",
                r"report.*taking.*long",
                r"progress.*100.*stuck",
            ],
            ErrorCategory.FILE_ERROR: [
                r"Permission denied",
                r"File.*not found",
                r"Cannot write.*file",
                r"Disk.*full",
                r"No module named.*local_deep_research",
                # NOTE(review): unreachable - `HTTP error \d+` under
                # CONNECTION_ERROR matches these messages first.
                r"HTTP error 404.*research results",
                r"Attempt to write readonly database",
            ],
        }

    def categorize_error(self, error_message: str) -> ErrorCategory:
        """
        Categorize an error based on its message

        Args:
            error_message: The error message to categorize. Non-string
                values (e.g. exception objects) are converted with ``str()``.

        Returns:
            ErrorCategory: The category of the first matching pattern, or
            ``ErrorCategory.UNKNOWN_ERROR`` when nothing matches.
        """
        message = str(error_message)

        for category, patterns in self.error_patterns.items():
            for pattern in patterns:
                # Match case-insensitively via the regex flag instead of
                # lowercasing the pattern text: lowercasing a regex would
                # silently turn escapes such as \S or \D into \s / \d.
                if re.search(pattern, message, re.IGNORECASE):
                    logger.debug(
                        f"Categorized error as {category.value}: {pattern}"
                    )
                    return category

        return ErrorCategory.UNKNOWN_ERROR

    def get_user_friendly_title(self, category: ErrorCategory) -> str:
        """
        Get a user-friendly title for an error category

        Args:
            category: The error category

        Returns:
            str: User-friendly title, or ``"Error"`` for an unmapped value
        """
        titles = {
            ErrorCategory.CONNECTION_ERROR: "Connection Issue",
            ErrorCategory.MODEL_ERROR: "LLM Service Error",
            ErrorCategory.SEARCH_ERROR: "Search Service Error",
            ErrorCategory.SYNTHESIS_ERROR: "Report Generation Error",
            ErrorCategory.FILE_ERROR: "File System Error",
            ErrorCategory.RATE_LIMIT_ERROR: "API Rate Limit Exceeded",
            ErrorCategory.UNKNOWN_ERROR: "Unexpected Error",
        }
        return titles.get(category, "Error")

    def get_suggested_actions(self, category: ErrorCategory) -> list:
        """
        Get suggested actions for resolving an error

        Args:
            category: The error category

        Returns:
            list: Suggested actions as strings. A new list is built on every
            call, so callers may modify the result freely.
        """
        suggestions = {
            ErrorCategory.CONNECTION_ERROR: [
                "Check if the LLM service (Ollama/LM Studio) is running",
                "Verify network connectivity",
                "Try switching to a different model provider",
                "Check the service logs for more details",
            ],
            ErrorCategory.MODEL_ERROR: [
                "Verify the model name is correct",
                "Check if the model is downloaded and available",
                "Validate API keys if using external services",
                "Try switching to a different model",
            ],
            ErrorCategory.SEARCH_ERROR: [
                "Check internet connectivity",
                "Try reducing the number of search results",
                "Wait a moment and try again",
                "Check if search service is configured correctly",
                "For local documents: ensure the path is absolute and folder exists",
                "Try a different search engine if one is failing",
            ],
            ErrorCategory.SYNTHESIS_ERROR: [
                "The research data was collected successfully",
                "Try switching to a different model for report generation",
                "Check the partial results below",
                "Review the detailed logs for more information",
            ],
            ErrorCategory.FILE_ERROR: [
                "Check disk space availability",
                "Verify write permissions",
                "Try changing the output directory",
                "Restart the application",
            ],
            ErrorCategory.RATE_LIMIT_ERROR: [
                "The API has reached its rate limit",
                "Enable LLM Rate Limiting in Settings → Rate Limiting → Enable LLM Rate Limiting",
                "Once enabled, the system will automatically learn and adapt to API limits",
                "Consider upgrading to a paid API plan for higher limits",
                "Try using a different model temporarily",
            ],
            ErrorCategory.UNKNOWN_ERROR: [
                "Check the detailed logs below for more information",
                "Try running the research again",
                "Report this issue if it persists",
                "Contact support with the error details",
            ],
        }
        return suggestions.get(category, ["Check the logs for more details"])

    def analyze_error(
        self, error_message: str, context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Perform comprehensive error analysis

        Args:
            error_message: The error message to analyze
            context: Optional context information. The keys ``findings``,
                ``current_knowledge`` and ``search_results`` are inspected to
                decide whether partial results exist; ``username`` is used
                when sending a notification.

        Returns:
            dict: Analysis with the keys ``category``, ``title``,
            ``original_error``, ``suggestions``, ``severity`` and
            ``recoverable``; plus ``context`` and ``has_partial_results``
            when a non-empty context was supplied.
        """
        category = self.categorize_error(error_message)

        analysis = {
            "category": category,
            "title": self.get_user_friendly_title(category),
            "original_error": error_message,
            "suggestions": self.get_suggested_actions(category),
            "severity": self._determine_severity(category),
            "recoverable": self._is_recoverable(category),
        }

        # Add context-specific information
        if context:
            analysis["context"] = context
            analysis["has_partial_results"] = bool(
                context.get("findings")
                or context.get("current_knowledge")
                or context.get("search_results")
            )

        # Send notifications for specific error types
        self._send_error_notifications(category, error_message, context)

        return analysis

    def _is_auth_error(self, error_message: str) -> bool:
        """Return True when the message looks like an authentication failure.

        The check is a case-insensitive substring test for "api key",
        "authentication", "401" or "unauthorized".
        """
        lowered = str(error_message).lower()
        return any(
            marker in lowered
            for marker in ("api key", "authentication", "401", "unauthorized")
        )

    def _send_error_notifications(
        self,
        category: ErrorCategory,
        error_message: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Send notifications for specific error categories.

        Only authentication-type MODEL_ERRORs and RATE_LIMIT_ERRORs trigger a
        notification, and only when ``context`` carries a ``username``. This
        is strictly best-effort: any failure is logged at debug level and
        never propagated, so error analysis itself cannot fail here.

        Args:
            category: Error category
            error_message: Error message (converted with ``str()``)
            context: Optional context information
        """
        try:
            # Only send notifications for AUTH and QUOTA errors
            if category not in (
                ErrorCategory.MODEL_ERROR,
                ErrorCategory.RATE_LIMIT_ERROR,
            ):
                return

            # Don't send notifications if we can't determine user
            username = context.get("username") if context else None
            if not username:
                logger.debug(
                    "No username in context, skipping error notification"
                )
                return

            # Coerce once so exception objects are handled the same way
            # categorize_error handles them.
            message = str(error_message)

            # Decide whether anything will be sent *before* touching the
            # database: a model error that is not an auth problem produces
            # no notification, so there is no reason to open a session.
            if category == ErrorCategory.MODEL_ERROR and not self._is_auth_error(
                message
            ):
                return

            # Imported lazily, inside the try, so an import problem only
            # disables notifications instead of breaking error analysis.
            from ..notifications.manager import NotificationManager
            from ..notifications import EventType
            from ..database.session_context import get_user_db_session
            from ..settings import SettingsManager

            # The session is only needed to capture the settings snapshot;
            # everything afterwards works from that snapshot.
            with get_user_db_session(username) as session:
                settings_snapshot = SettingsManager(
                    session
                ).get_settings_snapshot()

            notification_manager = NotificationManager(
                settings_snapshot=settings_snapshot, user_id=username
            )

            # Determine event type and build context
            service = self._extract_service_name(message)
            if category == ErrorCategory.MODEL_ERROR:
                event_type = EventType.AUTH_ISSUE
                notification_context = {"service": service}
            else:
                # RATE_LIMIT_ERROR: the actual quota figures are not known
                # at this point, so placeholders are sent.
                event_type = EventType.API_QUOTA_WARNING
                notification_context = {
                    "service": service,
                    "current": "Unknown",
                    "limit": "Unknown",
                    "reset_time": "Unknown",
                }

            # Send notification
            notification_manager.send_notification(
                event_type=event_type,
                context=notification_context,
            )

        except Exception as e:
            logger.debug(f"Failed to send error notification: {e}")

    def _extract_service_name(self, error_message: str) -> str:
        """
        Extract service name from error message.

        Args:
            error_message: Error message; non-string values such as
                exception objects are converted with ``str()``.

        Returns:
            The first known service name found in the message (searched
            case-insensitively, in the order listed below) formatted with
            ``str.title()`` - e.g. "openai" becomes "Openai" - or
            "API Service" when none is mentioned.
        """
        error_lower = str(error_message).lower()

        # Check for common service names
        services = (
            "openai",
            "anthropic",
            "google",
            "ollama",
            "searxng",
            "tavily",
            "brave",
        )

        for service in services:
            if service in error_lower:
                return service.title()

        return "API Service"

    def _determine_severity(self, category: ErrorCategory) -> str:
        """Determine error severity level ("high", "medium" or "low").

        Unmapped categories default to "medium".
        """
        severity_map = {
            ErrorCategory.CONNECTION_ERROR: "high",
            ErrorCategory.MODEL_ERROR: "high",
            ErrorCategory.SEARCH_ERROR: "medium",
            ErrorCategory.SYNTHESIS_ERROR: "low",  # Can often show partial results
            ErrorCategory.FILE_ERROR: "medium",
            ErrorCategory.RATE_LIMIT_ERROR: "medium",  # Can be resolved with settings
            ErrorCategory.UNKNOWN_ERROR: "high",
        }
        return severity_map.get(category, "medium")

    def _is_recoverable(self, category: ErrorCategory) -> bool:
        """Determine if error is recoverable with user action.

        Every known category except UNKNOWN_ERROR is treated as recoverable;
        unmapped values default to ``False``.
        """
        recoverable = {
            ErrorCategory.CONNECTION_ERROR: True,
            ErrorCategory.MODEL_ERROR: True,
            ErrorCategory.SEARCH_ERROR: True,
            ErrorCategory.SYNTHESIS_ERROR: True,
            ErrorCategory.FILE_ERROR: True,
            ErrorCategory.RATE_LIMIT_ERROR: True,  # Can enable rate limiting
            ErrorCategory.UNKNOWN_ERROR: False,
        }
        return recoverable.get(category, False)