Coverage for src / local_deep_research / web / auth / session_manager.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Session management for encrypted database connections. 

3Handles session creation, validation, and cleanup. 

4""" 

5 

6import gc 

7import secrets 

8import datetime 

9from datetime import UTC 

10from typing import Dict, Optional 

11 

12from loguru import logger 

13 

14 

15class SessionManager: 

16 """Manages user sessions and database connection lifecycle.""" 

17 

18 def __init__(self): 

19 self.sessions: Dict[str, dict] = {} 

20 self.session_timeout = datetime.timedelta(hours=2) # 2 hour timeout 

21 self.remember_me_timeout = datetime.timedelta( 

22 days=30 

23 ) # 30 days for "remember me" 

24 

25 def create_session(self, username: str, remember_me: bool = False) -> str: 

26 """Create a new session for a user.""" 

27 session_id = secrets.token_urlsafe(32) 

28 

29 self.sessions[session_id] = { 

30 "username": username, 

31 "created_at": datetime.datetime.now(UTC), 

32 "last_access": datetime.datetime.now(UTC), 

33 "remember_me": remember_me, 

34 } 

35 

36 logger.debug(f"Created session {session_id[:8]}... for user {username}") 

37 return session_id 

38 

39 def validate_session(self, session_id: str) -> Optional[str]: 

40 """ 

41 Validate a session and return username if valid. 

42 Updates last access time. 

43 """ 

44 if session_id not in self.sessions: 

45 return None 

46 

47 session_data = self.sessions[session_id] 

48 now = datetime.datetime.now(UTC) 

49 

50 # Check timeout 

51 timeout = ( 

52 self.remember_me_timeout 

53 if session_data["remember_me"] 

54 else self.session_timeout 

55 ) 

56 if now - session_data["last_access"] > timeout: 

57 # Session expired 

58 self.destroy_session(session_id) 

59 logger.debug(f"Session {session_id[:8]}... expired") 

60 return None 

61 

62 # Update last access 

63 session_data["last_access"] = now 

64 return session_data["username"] 

65 

66 def destroy_session(self, session_id: str): 

67 """Destroy a session and clean up.""" 

68 if session_id in self.sessions: 

69 username = self.sessions[session_id]["username"] 

70 del self.sessions[session_id] 

71 

72 # Force garbage collection to clear any sensitive data 

73 gc.collect() 

74 

75 logger.debug( 

76 f"Destroyed session {session_id[:8]}... for user {username}" 

77 ) 

78 

79 def cleanup_expired_sessions(self): 

80 """Remove all expired sessions.""" 

81 now = datetime.datetime.now(UTC) 

82 expired = [] 

83 

84 for session_id, data in self.sessions.items(): 

85 timeout = ( 

86 self.remember_me_timeout 

87 if data["remember_me"] 

88 else self.session_timeout 

89 ) 

90 if now - data["last_access"] > timeout: 

91 expired.append(session_id) 

92 

93 for session_id in expired: 

94 self.destroy_session(session_id) 

95 

96 if expired: 

97 logger.info(f"Cleaned up {len(expired)} expired sessions") 

98 

99 def get_active_sessions_count(self) -> int: 

100 """Get count of active sessions.""" 

101 self.cleanup_expired_sessions() 

102 return len(self.sessions) 

103 

104 def get_user_sessions(self, username: str) -> list: 

105 """Get all active sessions for a user.""" 

106 user_sessions = [] 

107 for session_id, data in self.sessions.items(): 

108 if data["username"] == username: 

109 user_sessions.append( 

110 { 

111 "session_id": session_id[:8] + "...", 

112 "created_at": data["created_at"], 

113 "last_access": data["last_access"], 

114 "remember_me": data["remember_me"], 

115 } 

116 ) 

117 return user_sessions