Coverage for src / local_deep_research / web / auth / session_manager.py: 100%
57 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Session management for encrypted database connections.
3Handles session creation, validation, and cleanup.
4"""
6import datetime
7import gc
8import secrets
9from datetime import UTC
10from typing import Dict, Optional
12from loguru import logger
14from ...security import get_security_default
17class SessionManager:
18 """Manages user sessions and database connection lifecycle."""
20 def __init__(self):
21 self.sessions: Dict[str, dict] = {}
22 # Load session timeouts from security settings
23 session_hours = get_security_default(
24 "security.session_timeout_hours", 2
25 )
26 remember_days = get_security_default(
27 "security.session_remember_me_days", 30
28 )
29 self.session_timeout = datetime.timedelta(hours=session_hours)
30 self.remember_me_timeout = datetime.timedelta(days=remember_days)
32 def create_session(self, username: str, remember_me: bool = False) -> str:
33 """Create a new session for a user."""
34 session_id = secrets.token_urlsafe(32)
36 self.sessions[session_id] = {
37 "username": username,
38 "created_at": datetime.datetime.now(UTC),
39 "last_access": datetime.datetime.now(UTC),
40 "remember_me": remember_me,
41 }
43 logger.debug(f"Created session {session_id[:8]}... for user {username}")
44 return session_id
46 def validate_session(self, session_id: str) -> Optional[str]:
47 """
48 Validate a session and return username if valid.
49 Updates last access time.
50 """
51 if session_id not in self.sessions:
52 return None
54 session_data = self.sessions[session_id]
55 now = datetime.datetime.now(UTC)
57 # Check timeout
58 timeout = (
59 self.remember_me_timeout
60 if session_data["remember_me"]
61 else self.session_timeout
62 )
63 if now - session_data["last_access"] > timeout:
64 # Session expired
65 self.destroy_session(session_id)
66 logger.debug(f"Session {session_id[:8]}... expired")
67 return None
69 # Update last access
70 session_data["last_access"] = now
71 return session_data["username"]
73 def destroy_session(self, session_id: str):
74 """Destroy a session and clean up."""
75 if session_id in self.sessions:
76 username = self.sessions[session_id]["username"]
77 del self.sessions[session_id]
79 # Force garbage collection to clear any sensitive data
80 gc.collect()
82 logger.debug(
83 f"Destroyed session {session_id[:8]}... for user {username}"
84 )
86 def cleanup_expired_sessions(self):
87 """Remove all expired sessions."""
88 now = datetime.datetime.now(UTC)
89 expired = []
91 for session_id, data in self.sessions.items():
92 timeout = (
93 self.remember_me_timeout
94 if data["remember_me"]
95 else self.session_timeout
96 )
97 if now - data["last_access"] > timeout:
98 expired.append(session_id)
100 for session_id in expired:
101 self.destroy_session(session_id)
103 if expired:
104 logger.info(f"Cleaned up {len(expired)} expired sessions")
106 def get_active_sessions_count(self) -> int:
107 """Get count of active sessions."""
108 self.cleanup_expired_sessions()
109 return len(self.sessions)
111 def get_user_sessions(self, username: str) -> list:
112 """Get all active sessions for a user."""
113 user_sessions = []
114 for session_id, data in self.sessions.items():
115 if data["username"] == username:
116 user_sessions.append(
117 {
118 "session_id": session_id[:8] + "...",
119 "created_at": data["created_at"],
120 "last_access": data["last_access"],
121 "remember_me": data["remember_me"],
122 }
123 )
124 return user_sessions