Coverage for src / local_deep_research / web / models / database.py: 98%
79 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import os
2from datetime import datetime, UTC
4from loguru import logger
6from ...config.paths import get_data_directory
7from ...database.models import ResearchLog
8from ...database.session_context import get_user_db_session
# Database paths using new centralized configuration
_raw_data_dir = get_data_directory()
# Resolved data directory as a string, or None when no directory is configured.
DATA_DIR: str | None = None
if _raw_data_dir:
    DATA_DIR = str(_raw_data_dir)
    # Create the directory at import time; exist_ok makes this idempotent.
    os.makedirs(DATA_DIR, exist_ok=True)

# DB_PATH removed - use per-user encrypted databases instead
def get_db_connection():
    """Refuse access to the legacy shared SQLite database.

    DEPRECATED: the shared database must not be used anymore; obtain a
    per-user database via get_db_session() instead.

    Raises:
        RuntimeError: always, so any remaining caller fails loudly.
    """
    raise RuntimeError(
        "Shared database access is deprecated. Use get_db_session() for per-user databases."
    )
31def calculate_duration(created_at_str, completed_at_str=None):
32 """
33 Calculate duration in seconds between created_at timestamp and completed_at or now.
34 Handles various timestamp formats and returns None if calculation fails.
36 Args:
37 created_at_str: The start timestamp
38 completed_at_str: Optional end timestamp, defaults to current time if None
40 Returns:
41 Duration in seconds or None if calculation fails
42 """
43 if not created_at_str:
44 return None
46 end_time = None
47 if completed_at_str:
48 # Use completed_at time if provided
49 try:
50 if "T" in completed_at_str: # ISO format with T separator
51 end_time = datetime.fromisoformat(completed_at_str)
52 else: # Older format without T
53 # Try different formats
54 try:
55 end_time = datetime.strptime(
56 completed_at_str, "%Y-%m-%d %H:%M:%S.%f"
57 )
58 except ValueError:
59 try:
60 end_time = datetime.strptime(
61 completed_at_str, "%Y-%m-%d %H:%M:%S"
62 )
63 except ValueError:
64 # Last resort fallback
65 end_time = datetime.fromisoformat(
66 completed_at_str.replace(" ", "T")
67 )
68 except Exception:
69 logger.exception("Error parsing completed_at timestamp")
70 try:
71 from dateutil import parser # type: ignore[import-untyped]
73 end_time = parser.parse(completed_at_str)
74 except Exception:
75 logger.exception(
76 f"Fallback parsing also failed for completed_at: {completed_at_str}"
77 )
78 # Fall back to current time
79 end_time = datetime.now(UTC)
80 else:
81 # Use current time if no completed_at provided
82 end_time = datetime.now(UTC)
83 # Ensure end_time is UTC.
84 end_time = end_time.astimezone(UTC)
86 start_time = None
87 try:
88 # Proper parsing of ISO format
89 if "T" in created_at_str: # ISO format with T separator
90 start_time = datetime.fromisoformat(created_at_str)
91 else: # Older format without T
92 # Try different formats
93 try:
94 start_time = datetime.strptime(
95 created_at_str, "%Y-%m-%d %H:%M:%S.%f"
96 )
97 except ValueError:
98 try:
99 start_time = datetime.strptime(
100 created_at_str, "%Y-%m-%d %H:%M:%S"
101 )
102 except ValueError:
103 # Last resort fallback
104 start_time = datetime.fromisoformat(
105 created_at_str.replace(" ", "T")
106 )
107 except Exception:
108 logger.exception("Error parsing created_at timestamp")
109 # Fallback method if parsing fails
110 try:
111 from dateutil import parser
113 start_time = parser.parse(created_at_str)
114 except Exception:
115 logger.exception(
116 f"Fallback parsing also failed for created_at: {created_at_str}"
117 )
118 return None
120 # Calculate duration if both timestamps are valid
121 if start_time and end_time: 121 ↛ 127line 121 didn't jump to line 127 because the condition on line 121 was always true
122 try:
123 return int((end_time - start_time).total_seconds())
124 except Exception:
125 logger.exception("Error calculating duration")
127 return None
def get_logs_for_research(research_id):
    """
    Retrieve all logs for a specific research ID

    Args:
        research_id: ID of the research

    Returns:
        List of log entries as dictionaries
    """
    try:
        with get_user_db_session() as session:
            entries = (
                session.query(ResearchLog)
                .filter(ResearchLog.research_id == research_id)
                .order_by(ResearchLog.timestamp.asc())
                .all()
            )

            # Reshape each ORM row into the dict the frontend expects
            return [
                {
                    "time": entry.timestamp,
                    "message": entry.message,
                    "type": entry.level,
                    "module": entry.module,
                    "line_no": entry.line_no,
                }
                for entry in entries
            ]
    except Exception:
        logger.exception("Error retrieving logs from database")
        return []
@logger.catch
def get_total_logs_for_research(research_id):
    """
    Returns the total number of logs for a given `research_id`.

    Args:
        research_id (int): The ID of the research.

    Returns:
        int: Total number of logs for the specified research ID.
    """
    with get_user_db_session() as session:
        matching = session.query(ResearchLog).filter(
            ResearchLog.research_id == research_id
        )
        return matching.count()