Coverage for src / local_deep_research / web / auth / session_manager.py: 100%
54 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Session management for encrypted database connections.
3Handles session creation, validation, and cleanup.
4"""
6import gc
7import secrets
8import datetime
9from datetime import UTC
10from typing import Dict, Optional
12from loguru import logger
15class SessionManager:
16 """Manages user sessions and database connection lifecycle."""
18 def __init__(self):
19 self.sessions: Dict[str, dict] = {}
20 self.session_timeout = datetime.timedelta(hours=2) # 2 hour timeout
21 self.remember_me_timeout = datetime.timedelta(
22 days=30
23 ) # 30 days for "remember me"
25 def create_session(self, username: str, remember_me: bool = False) -> str:
26 """Create a new session for a user."""
27 session_id = secrets.token_urlsafe(32)
29 self.sessions[session_id] = {
30 "username": username,
31 "created_at": datetime.datetime.now(UTC),
32 "last_access": datetime.datetime.now(UTC),
33 "remember_me": remember_me,
34 }
36 logger.debug(f"Created session {session_id[:8]}... for user {username}")
37 return session_id
39 def validate_session(self, session_id: str) -> Optional[str]:
40 """
41 Validate a session and return username if valid.
42 Updates last access time.
43 """
44 if session_id not in self.sessions:
45 return None
47 session_data = self.sessions[session_id]
48 now = datetime.datetime.now(UTC)
50 # Check timeout
51 timeout = (
52 self.remember_me_timeout
53 if session_data["remember_me"]
54 else self.session_timeout
55 )
56 if now - session_data["last_access"] > timeout:
57 # Session expired
58 self.destroy_session(session_id)
59 logger.debug(f"Session {session_id[:8]}... expired")
60 return None
62 # Update last access
63 session_data["last_access"] = now
64 return session_data["username"]
66 def destroy_session(self, session_id: str):
67 """Destroy a session and clean up."""
68 if session_id in self.sessions:
69 username = self.sessions[session_id]["username"]
70 del self.sessions[session_id]
72 # Force garbage collection to clear any sensitive data
73 gc.collect()
75 logger.debug(
76 f"Destroyed session {session_id[:8]}... for user {username}"
77 )
79 def cleanup_expired_sessions(self):
80 """Remove all expired sessions."""
81 now = datetime.datetime.now(UTC)
82 expired = []
84 for session_id, data in self.sessions.items():
85 timeout = (
86 self.remember_me_timeout
87 if data["remember_me"]
88 else self.session_timeout
89 )
90 if now - data["last_access"] > timeout:
91 expired.append(session_id)
93 for session_id in expired:
94 self.destroy_session(session_id)
96 if expired:
97 logger.info(f"Cleaned up {len(expired)} expired sessions")
99 def get_active_sessions_count(self) -> int:
100 """Get count of active sessions."""
101 self.cleanup_expired_sessions()
102 return len(self.sessions)
104 def get_user_sessions(self, username: str) -> list:
105 """Get all active sessions for a user."""
106 user_sessions = []
107 for session_id, data in self.sessions.items():
108 if data["username"] == username:
109 user_sessions.append(
110 {
111 "session_id": session_id[:8] + "...",
112 "created_at": data["created_at"],
113 "last_access": data["last_access"],
114 "remember_me": data["remember_me"],
115 }
116 )
117 return user_sessions