Coverage for src / local_deep_research / security / account_lockout.py: 99%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Per-user account lockout after repeated failed login attempts. 

2 

3Complements the per-IP rate limiting in ``security/rate_limiter.py`` by 

4tracking failures at the *username* level. A user who exceeds the 

5configured threshold is temporarily locked out regardless of the source IP. 

6 

7Note: lockout state is in-memory and per-process. In multi-worker 

8deployments (e.g. gunicorn), each worker maintains separate state. 

9""" 

10 

11import threading 

12from datetime import datetime, timedelta, timezone 

13 

14from loguru import logger 

15 

16from .security_settings import get_security_default 

17 

18 

19class AccountLockoutManager: 

20 """Track failed login attempts and lock accounts after a threshold.""" 

21 

22 _MAX_STATE_ENTRIES = 10_000 

23 

24 def __init__( 

25 self, 

26 threshold: int | None = None, 

27 lockout_minutes: int | None = None, 

28 ) -> None: 

29 if threshold is None: 

30 threshold = get_security_default( 

31 "security.account_lockout_threshold", 10 

32 ) 

33 if lockout_minutes is None: 

34 lockout_minutes = get_security_default( 

35 "security.account_lockout_duration_minutes", 15 

36 ) 

37 

38 self.threshold: int = threshold 

39 self.lockout_minutes: int = lockout_minutes 

40 

41 # {username: {"count": int, "locked_until": datetime | None}} 

42 self._state: dict[str, dict] = {} 

43 self._lock = threading.Lock() 

44 

45 # ------------------------------------------------------------------ 

46 # Public API 

47 # ------------------------------------------------------------------ 

48 

49 def is_locked(self, username: str) -> bool: 

50 """Return ``True`` if *username* is currently locked out.""" 

51 with self._lock: 

52 entry = self._state.get(username) 

53 if entry is None: 

54 return False 

55 locked_until = entry.get("locked_until") 

56 if locked_until is None: 

57 return False 

58 if datetime.now(timezone.utc) >= locked_until: 

59 # Lockout expired — reset automatically 

60 self._state.pop(username, None) 

61 logger.info("Account lockout expired") 

62 return False 

63 return True 

64 

65 def _evict(self) -> None: 

66 """Remove expired/unlocked entries to reclaim memory. 

67 

68 Must be called while ``self._lock`` is held. 

69 """ 

70 now = datetime.now(timezone.utc) 

71 expired_keys = [ 

72 key 

73 for key, entry in self._state.items() 

74 if entry.get("locked_until") is None or entry["locked_until"] <= now 

75 ] 

76 for key in expired_keys: 

77 del self._state[key] 

78 

79 logger.info( 

80 "Evicted {} expired/unlocked lockout entries", len(expired_keys) 

81 ) 

82 

83 # Last resort: if still over limit, blanket clear 

84 if len(self._state) > self._MAX_STATE_ENTRIES: 

85 self._state.clear() 

86 logger.info("Blanket-cleared lockout state (still over limit)") 

87 

88 def record_failure(self, username: str) -> None: 

89 """Record a failed login attempt for *username*.""" 

90 with self._lock: 

91 if len(self._state) > self._MAX_STATE_ENTRIES: 

92 self._evict() 

93 entry = self._state.setdefault( 

94 username, {"count": 0, "locked_until": None} 

95 ) 

96 entry["count"] += 1 

97 if entry["count"] >= self.threshold: 

98 entry["locked_until"] = datetime.now(timezone.utc) + timedelta( 

99 minutes=self.lockout_minutes 

100 ) 

101 logger.warning( 

102 "Account locked after {} failed attempts", 

103 self.threshold, 

104 ) 

105 

106 def record_success(self, username: str) -> None: 

107 """Clear the failure counter for *username* after a successful login.""" 

108 with self._lock: 

109 removed = self._state.pop(username, None) 

110 if removed is not None: 

111 logger.info("Account lockout cleared after successful login") 

112 

113 

# ------------------------------------------------------------------
# Module-level singleton
# ------------------------------------------------------------------

# Shared manager instance; stays ``None`` until
# ``get_account_lockout_manager()`` creates it on first use.
_manager: AccountLockoutManager | None = None
# Held while ``_manager`` is being created so that concurrent first calls
# do not each build their own instance.
_singleton_lock = threading.Lock()

120 

121 

def get_account_lockout_manager() -> AccountLockoutManager:
    """Return the shared ``AccountLockoutManager``, creating it on first use.

    The instance is built with default arguments the first time this is
    called and the same object is handed back on every later call.
    """
    global _manager

    # Fast path: the instance already exists, no locking needed.
    if _manager is not None:
        return _manager

    with _singleton_lock:
        # Check again under the lock: another thread may have created the
        # instance while this one was waiting to acquire it.
        if _manager is None:
            _manager = AccountLockoutManager()
        return _manager