Coverage for src / local_deep_research / web / models / database.py: 98%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import os 

2from datetime import datetime, UTC 

3 

4from loguru import logger 

5 

6from ...config.paths import get_data_directory 

7from ...database.models import ResearchLog 

8from ...database.session_context import get_user_db_session 

9 

# Resolve the data directory from the centralized path configuration.
# DATA_DIR stays None when no directory is configured; otherwise it holds the
# directory as a string and the directory is created on import if missing.
_raw_data_dir = get_data_directory()
DATA_DIR: str | None = str(_raw_data_dir) if _raw_data_dir else None
if DATA_DIR is not None:
    os.makedirs(DATA_DIR, exist_ok=True)

16 

17# DB_PATH removed - use per-user encrypted databases instead 

18 

19 

def get_db_connection():
    """
    Deprecated entry point for the old shared SQLite database.

    Shared database access is no longer supported, so this function never
    returns a connection. Use get_db_session() for per-user databases instead.

    Raises:
        RuntimeError: On every call.
    """
    deprecation_message = (
        "Shared database access is deprecated. "
        "Use get_db_session() for per-user databases."
    )
    raise RuntimeError(deprecation_message)

29 

30 

31def calculate_duration(created_at_str, completed_at_str=None): 

32 """ 

33 Calculate duration in seconds between created_at timestamp and completed_at or now. 

34 Handles various timestamp formats and returns None if calculation fails. 

35 

36 Args: 

37 created_at_str: The start timestamp 

38 completed_at_str: Optional end timestamp, defaults to current time if None 

39 

40 Returns: 

41 Duration in seconds or None if calculation fails 

42 """ 

43 if not created_at_str: 

44 return None 

45 

46 end_time = None 

47 if completed_at_str: 

48 # Use completed_at time if provided 

49 try: 

50 if "T" in completed_at_str: # ISO format with T separator 

51 end_time = datetime.fromisoformat(completed_at_str) 

52 else: # Older format without T 

53 # Try different formats 

54 try: 

55 end_time = datetime.strptime( 

56 completed_at_str, "%Y-%m-%d %H:%M:%S.%f" 

57 ) 

58 except ValueError: 

59 try: 

60 end_time = datetime.strptime( 

61 completed_at_str, "%Y-%m-%d %H:%M:%S" 

62 ) 

63 except ValueError: 

64 # Last resort fallback 

65 end_time = datetime.fromisoformat( 

66 completed_at_str.replace(" ", "T") 

67 ) 

68 except Exception: 

69 logger.exception("Error parsing completed_at timestamp") 

70 try: 

71 from dateutil import parser # type: ignore[import-untyped] 

72 

73 end_time = parser.parse(completed_at_str) 

74 except Exception: 

75 logger.exception( 

76 f"Fallback parsing also failed for completed_at: {completed_at_str}" 

77 ) 

78 # Fall back to current time 

79 end_time = datetime.now(UTC) 

80 else: 

81 # Use current time if no completed_at provided 

82 end_time = datetime.now(UTC) 

83 # Ensure end_time is UTC. 

84 end_time = end_time.astimezone(UTC) 

85 

86 start_time = None 

87 try: 

88 # Proper parsing of ISO format 

89 if "T" in created_at_str: # ISO format with T separator 

90 start_time = datetime.fromisoformat(created_at_str) 

91 else: # Older format without T 

92 # Try different formats 

93 try: 

94 start_time = datetime.strptime( 

95 created_at_str, "%Y-%m-%d %H:%M:%S.%f" 

96 ) 

97 except ValueError: 

98 try: 

99 start_time = datetime.strptime( 

100 created_at_str, "%Y-%m-%d %H:%M:%S" 

101 ) 

102 except ValueError: 

103 # Last resort fallback 

104 start_time = datetime.fromisoformat( 

105 created_at_str.replace(" ", "T") 

106 ) 

107 except Exception: 

108 logger.exception("Error parsing created_at timestamp") 

109 # Fallback method if parsing fails 

110 try: 

111 from dateutil import parser 

112 

113 start_time = parser.parse(created_at_str) 

114 except Exception: 

115 logger.exception( 

116 f"Fallback parsing also failed for created_at: {created_at_str}" 

117 ) 

118 return None 

119 

120 # Calculate duration if both timestamps are valid 

121 if start_time and end_time: 121 ↛ 127line 121 didn't jump to line 127 because the condition on line 121 was always true

122 try: 

123 return int((end_time - start_time).total_seconds()) 

124 except Exception: 

125 logger.exception("Error calculating duration") 

126 

127 return None 

128 

129 

def get_logs_for_research(research_id):
    """
    Fetch the log entries recorded for one research run, oldest first.

    Args:
        research_id: ID of the research whose logs are wanted

    Returns:
        List of dictionaries with the keys "time", "message", "type",
        "module" and "line_no"; an empty list if the lookup raises
    """
    try:
        with get_user_db_session() as db:
            ordered_query = (
                db.query(ResearchLog)
                .filter(ResearchLog.research_id == research_id)
                .order_by(ResearchLog.timestamp.asc())
            )
            # Convert each row into the shape the frontend consumes while the
            # session is still open.
            return [
                {
                    "time": row.timestamp,
                    "message": row.message,
                    "type": row.level,
                    "module": row.module,
                    "line_no": row.line_no,
                }
                for row in ordered_query.all()
            ]
    except Exception:
        logger.exception("Error retrieving logs from database")
        return []

165 

166 

@logger.catch
def get_total_logs_for_research(research_id):
    """
    Count the log entries stored for a given `research_id`.

    Args:
        research_id (int): The ID of the research.

    Returns:
        int: Number of log rows whose research_id matches.

    Note:
        The function is wrapped in `logger.catch`, so an exception raised
        while querying is logged by loguru instead of propagating; with the
        decorator's default settings the call then yields None.
    """
    with get_user_db_session() as db:
        matching_logs = db.query(ResearchLog).filter(
            ResearchLog.research_id == research_id
        )
        return matching_logs.count()