Coverage for src / local_deep_research / web / auth / session_manager.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Session management for encrypted database connections. 

3Handles session creation, validation, and cleanup. 

4""" 

5 

6import datetime 

7import gc 

8import secrets 

9from datetime import UTC 

10from typing import Dict, Optional 

11 

12from loguru import logger 

13 

14from ...security import get_security_default 

15 

16 

17class SessionManager: 

18 """Manages user sessions and database connection lifecycle.""" 

19 

20 def __init__(self): 

21 self.sessions: Dict[str, dict] = {} 

22 # Load session timeouts from security settings 

23 session_hours = get_security_default( 

24 "security.session_timeout_hours", 2 

25 ) 

26 remember_days = get_security_default( 

27 "security.session_remember_me_days", 30 

28 ) 

29 self.session_timeout = datetime.timedelta(hours=session_hours) 

30 self.remember_me_timeout = datetime.timedelta(days=remember_days) 

31 

32 def create_session(self, username: str, remember_me: bool = False) -> str: 

33 """Create a new session for a user.""" 

34 session_id = secrets.token_urlsafe(32) 

35 

36 self.sessions[session_id] = { 

37 "username": username, 

38 "created_at": datetime.datetime.now(UTC), 

39 "last_access": datetime.datetime.now(UTC), 

40 "remember_me": remember_me, 

41 } 

42 

43 logger.debug(f"Created session {session_id[:8]}... for user {username}") 

44 return session_id 

45 

46 def validate_session(self, session_id: str) -> Optional[str]: 

47 """ 

48 Validate a session and return username if valid. 

49 Updates last access time. 

50 """ 

51 if session_id not in self.sessions: 

52 return None 

53 

54 session_data = self.sessions[session_id] 

55 now = datetime.datetime.now(UTC) 

56 

57 # Check timeout 

58 timeout = ( 

59 self.remember_me_timeout 

60 if session_data["remember_me"] 

61 else self.session_timeout 

62 ) 

63 if now - session_data["last_access"] > timeout: 

64 # Session expired 

65 self.destroy_session(session_id) 

66 logger.debug(f"Session {session_id[:8]}... expired") 

67 return None 

68 

69 # Update last access 

70 session_data["last_access"] = now 

71 return session_data["username"] 

72 

73 def destroy_session(self, session_id: str): 

74 """Destroy a session and clean up.""" 

75 if session_id in self.sessions: 

76 username = self.sessions[session_id]["username"] 

77 del self.sessions[session_id] 

78 

79 # Force garbage collection to clear any sensitive data 

80 gc.collect() 

81 

82 logger.debug( 

83 f"Destroyed session {session_id[:8]}... for user {username}" 

84 ) 

85 

86 def cleanup_expired_sessions(self): 

87 """Remove all expired sessions.""" 

88 now = datetime.datetime.now(UTC) 

89 expired = [] 

90 

91 for session_id, data in self.sessions.items(): 

92 timeout = ( 

93 self.remember_me_timeout 

94 if data["remember_me"] 

95 else self.session_timeout 

96 ) 

97 if now - data["last_access"] > timeout: 

98 expired.append(session_id) 

99 

100 for session_id in expired: 

101 self.destroy_session(session_id) 

102 

103 if expired: 

104 logger.info(f"Cleaned up {len(expired)} expired sessions") 

105 

106 def get_active_sessions_count(self) -> int: 

107 """Get count of active sessions.""" 

108 self.cleanup_expired_sessions() 

109 return len(self.sessions) 

110 

111 def get_user_sessions(self, username: str) -> list: 

112 """Get all active sessions for a user.""" 

113 user_sessions = [] 

114 for session_id, data in self.sessions.items(): 

115 if data["username"] == username: 

116 user_sessions.append( 

117 { 

118 "session_id": session_id[:8] + "...", 

119 "created_at": data["created_at"], 

120 "last_access": data["last_access"], 

121 "remember_me": data["remember_me"], 

122 } 

123 ) 

124 return user_sessions