Coverage for src / local_deep_research / web / auth / session_manager.py: 100%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Session management for encrypted database connections. 

3Handles session creation, validation, and cleanup. 

4""" 

5 

6import datetime 

7import secrets 

8import threading 

9from datetime import UTC 

10from typing import Dict, Optional, Set 

11 

12from loguru import logger 

13 

14from ...security import get_security_default 

15 

16 

17class SessionManager: 

18 """Manages user sessions and database connection lifecycle.""" 

19 

20 def __init__(self): 

21 self.sessions: Dict[str, dict] = {} 

22 self._lock = threading.Lock() 

23 # Load session timeouts from security settings 

24 session_hours = get_security_default( 

25 "security.session_timeout_hours", 2 

26 ) 

27 remember_days = get_security_default( 

28 "security.session_remember_me_days", 30 

29 ) 

30 self.session_timeout = datetime.timedelta(hours=session_hours) 

31 self.remember_me_timeout = datetime.timedelta(days=remember_days) 

32 

33 def create_session(self, username: str, remember_me: bool = False) -> str: 

34 """Create a new session for a user. 

35 

36 Args: 

37 username: The username to create a session for. 

38 remember_me: If True, use extended session timeout. 

39 

40 Returns: 

41 The session ID as a URL-safe string. 

42 """ 

43 session_id = secrets.token_urlsafe(32) 

44 

45 with self._lock: 

46 self.sessions[session_id] = { 

47 "username": username, 

48 "created_at": datetime.datetime.now(UTC), 

49 "last_access": datetime.datetime.now(UTC), 

50 "remember_me": remember_me, 

51 } 

52 

53 logger.debug(f"Created session {session_id[:8]}... for user {username}") 

54 return session_id 

55 

56 def validate_session(self, session_id: str) -> Optional[str]: 

57 """ 

58 Validate a session and return username if valid. 

59 Updates last access time. 

60 """ 

61 with self._lock: 

62 if session_id not in self.sessions: 

63 return None 

64 

65 session_data = self.sessions[session_id] 

66 now = datetime.datetime.now(UTC) 

67 

68 # Check timeout 

69 timeout = ( 

70 self.remember_me_timeout 

71 if session_data["remember_me"] 

72 else self.session_timeout 

73 ) 

74 if now - session_data["last_access"] > timeout: 

75 # Session expired — remove inline (already under lock) 

76 username = session_data["username"] 

77 del self.sessions[session_id] 

78 logger.debug( 

79 f"Session {session_id[:8]}... expired for {username}" 

80 ) 

81 return None 

82 

83 # Update last access 

84 session_data["last_access"] = now 

85 return str(session_data["username"]) 

86 

87 def destroy_session(self, session_id: str): 

88 """Destroy a session and clean up.""" 

89 with self._lock: 

90 if session_id in self.sessions: 

91 username = self.sessions[session_id]["username"] 

92 del self.sessions[session_id] 

93 logger.debug( 

94 f"Destroyed session {session_id[:8]}... for user {username}" 

95 ) 

96 

97 def destroy_all_user_sessions(self, username: str) -> int: 

98 """Destroy all sessions for a given user. Returns count destroyed.""" 

99 with self._lock: 

100 to_delete = [ 

101 sid 

102 for sid, data in self.sessions.items() 

103 if data["username"] == username 

104 ] 

105 for sid in to_delete: 

106 del self.sessions[sid] 

107 if to_delete: 

108 logger.debug( 

109 f"Destroyed {len(to_delete)} session(s) for user {username}" 

110 ) 

111 return len(to_delete) 

112 

113 def cleanup_expired_sessions(self): 

114 """Remove all expired sessions.""" 

115 now = datetime.datetime.now(UTC) 

116 expired = [] 

117 

118 with self._lock: 

119 for session_id, data in self.sessions.items(): 

120 timeout = ( 

121 self.remember_me_timeout 

122 if data["remember_me"] 

123 else self.session_timeout 

124 ) 

125 if now - data["last_access"] > timeout: 

126 expired.append(session_id) 

127 

128 for session_id in expired: 

129 del self.sessions[session_id] 

130 

131 if expired: 

132 logger.info(f"Cleaned up {len(expired)} expired sessions") 

133 

134 def get_active_sessions_count(self) -> int: 

135 """Get count of active sessions.""" 

136 self.cleanup_expired_sessions() 

137 with self._lock: 

138 return len(self.sessions) 

139 

140 def get_user_sessions(self, username: str) -> list: 

141 """Get all active sessions for a user.""" 

142 user_sessions = [] 

143 with self._lock: 

144 for session_id, data in self.sessions.items(): 

145 if data["username"] == username: 

146 user_sessions.append( 

147 { 

148 "session_id": session_id[:8] + "...", 

149 "created_at": data["created_at"], 

150 "last_access": data["last_access"], 

151 "remember_me": data["remember_me"], 

152 } 

153 ) 

154 return user_sessions 

155 

156 def get_active_usernames(self) -> Set[str]: 

157 """Return set of usernames with at least one non-expired session.""" 

158 now = datetime.datetime.now(UTC) 

159 with self._lock: 

160 return { 

161 data["username"] 

162 for data in self.sessions.values() 

163 if now - data["last_access"] 

164 <= ( 

165 self.remember_me_timeout 

166 if data["remember_me"] 

167 else self.session_timeout 

168 ) 

169 } 

170 

171 def has_active_sessions_for(self, username: str) -> bool: 

172 """Check if a user has any non-expired sessions.""" 

173 now = datetime.datetime.now(UTC) 

174 with self._lock: 

175 for data in self.sessions.values(): 

176 if data["username"] != username: 

177 continue 

178 timeout = ( 

179 self.remember_me_timeout 

180 if data["remember_me"] 

181 else self.session_timeout 

182 ) 

183 if now - data["last_access"] <= timeout: 

184 return True 

185 return False 

186 

187 

# Module-level singleton: one SessionManager instance created at import
# time and shared by everything that imports this module.
session_manager = SessionManager()