Coverage for src / local_deep_research / web / auth / session_manager.py: 100%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Session management for encrypted database connections.
3Handles session creation, validation, and cleanup.
4"""
6import datetime
7import secrets
8import threading
9from datetime import UTC
10from typing import Dict, Optional, Set
12from loguru import logger
14from ...security import get_security_default
17class SessionManager:
18 """Manages user sessions and database connection lifecycle."""
20 def __init__(self):
21 self.sessions: Dict[str, dict] = {}
22 self._lock = threading.Lock()
23 # Load session timeouts from security settings
24 session_hours = get_security_default(
25 "security.session_timeout_hours", 2
26 )
27 remember_days = get_security_default(
28 "security.session_remember_me_days", 30
29 )
30 self.session_timeout = datetime.timedelta(hours=session_hours)
31 self.remember_me_timeout = datetime.timedelta(days=remember_days)
33 def create_session(self, username: str, remember_me: bool = False) -> str:
34 """Create a new session for a user.
36 Args:
37 username: The username to create a session for.
38 remember_me: If True, use extended session timeout.
40 Returns:
41 The session ID as a URL-safe string.
42 """
43 session_id = secrets.token_urlsafe(32)
45 with self._lock:
46 self.sessions[session_id] = {
47 "username": username,
48 "created_at": datetime.datetime.now(UTC),
49 "last_access": datetime.datetime.now(UTC),
50 "remember_me": remember_me,
51 }
53 logger.debug(f"Created session {session_id[:8]}... for user {username}")
54 return session_id
56 def validate_session(self, session_id: str) -> Optional[str]:
57 """
58 Validate a session and return username if valid.
59 Updates last access time.
60 """
61 with self._lock:
62 if session_id not in self.sessions:
63 return None
65 session_data = self.sessions[session_id]
66 now = datetime.datetime.now(UTC)
68 # Check timeout
69 timeout = (
70 self.remember_me_timeout
71 if session_data["remember_me"]
72 else self.session_timeout
73 )
74 if now - session_data["last_access"] > timeout:
75 # Session expired — remove inline (already under lock)
76 username = session_data["username"]
77 del self.sessions[session_id]
78 logger.debug(
79 f"Session {session_id[:8]}... expired for {username}"
80 )
81 return None
83 # Update last access
84 session_data["last_access"] = now
85 return str(session_data["username"])
87 def destroy_session(self, session_id: str):
88 """Destroy a session and clean up."""
89 with self._lock:
90 if session_id in self.sessions:
91 username = self.sessions[session_id]["username"]
92 del self.sessions[session_id]
93 logger.debug(
94 f"Destroyed session {session_id[:8]}... for user {username}"
95 )
97 def destroy_all_user_sessions(self, username: str) -> int:
98 """Destroy all sessions for a given user. Returns count destroyed."""
99 with self._lock:
100 to_delete = [
101 sid
102 for sid, data in self.sessions.items()
103 if data["username"] == username
104 ]
105 for sid in to_delete:
106 del self.sessions[sid]
107 if to_delete:
108 logger.debug(
109 f"Destroyed {len(to_delete)} session(s) for user {username}"
110 )
111 return len(to_delete)
113 def cleanup_expired_sessions(self):
114 """Remove all expired sessions."""
115 now = datetime.datetime.now(UTC)
116 expired = []
118 with self._lock:
119 for session_id, data in self.sessions.items():
120 timeout = (
121 self.remember_me_timeout
122 if data["remember_me"]
123 else self.session_timeout
124 )
125 if now - data["last_access"] > timeout:
126 expired.append(session_id)
128 for session_id in expired:
129 del self.sessions[session_id]
131 if expired:
132 logger.info(f"Cleaned up {len(expired)} expired sessions")
134 def get_active_sessions_count(self) -> int:
135 """Get count of active sessions."""
136 self.cleanup_expired_sessions()
137 with self._lock:
138 return len(self.sessions)
140 def get_user_sessions(self, username: str) -> list:
141 """Get all active sessions for a user."""
142 user_sessions = []
143 with self._lock:
144 for session_id, data in self.sessions.items():
145 if data["username"] == username:
146 user_sessions.append(
147 {
148 "session_id": session_id[:8] + "...",
149 "created_at": data["created_at"],
150 "last_access": data["last_access"],
151 "remember_me": data["remember_me"],
152 }
153 )
154 return user_sessions
156 def get_active_usernames(self) -> Set[str]:
157 """Return set of usernames with at least one non-expired session."""
158 now = datetime.datetime.now(UTC)
159 with self._lock:
160 return {
161 data["username"]
162 for data in self.sessions.values()
163 if now - data["last_access"]
164 <= (
165 self.remember_me_timeout
166 if data["remember_me"]
167 else self.session_timeout
168 )
169 }
171 def has_active_sessions_for(self, username: str) -> bool:
172 """Check if a user has any non-expired sessions."""
173 now = datetime.datetime.now(UTC)
174 with self._lock:
175 for data in self.sessions.values():
176 if data["username"] != username:
177 continue
178 timeout = (
179 self.remember_me_timeout
180 if data["remember_me"]
181 else self.session_timeout
182 )
183 if now - data["last_access"] <= timeout:
184 return True
185 return False
188# Module-level singleton
189session_manager = SessionManager()