Coverage for src / local_deep_research / security / account_lockout.py: 99%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Per-user account lockout after repeated failed login attempts.
3Complements the per-IP rate limiting in ``security/rate_limiter.py`` by
4tracking failures at the *username* level. A user who exceeds the
5configured threshold is temporarily locked out regardless of the source IP.
7Note: lockout state is in-memory and per-process. In multi-worker
8deployments (e.g. gunicorn), each worker maintains separate state.
9"""
11import threading
12from datetime import datetime, timedelta, timezone
14from loguru import logger
16from .security_settings import get_security_default
19class AccountLockoutManager:
20 """Track failed login attempts and lock accounts after a threshold."""
22 _MAX_STATE_ENTRIES = 10_000
24 def __init__(
25 self,
26 threshold: int | None = None,
27 lockout_minutes: int | None = None,
28 ) -> None:
29 if threshold is None:
30 threshold = get_security_default(
31 "security.account_lockout_threshold", 10
32 )
33 if lockout_minutes is None:
34 lockout_minutes = get_security_default(
35 "security.account_lockout_duration_minutes", 15
36 )
38 self.threshold: int = threshold
39 self.lockout_minutes: int = lockout_minutes
41 # {username: {"count": int, "locked_until": datetime | None}}
42 self._state: dict[str, dict] = {}
43 self._lock = threading.Lock()
45 # ------------------------------------------------------------------
46 # Public API
47 # ------------------------------------------------------------------
49 def is_locked(self, username: str) -> bool:
50 """Return ``True`` if *username* is currently locked out."""
51 with self._lock:
52 entry = self._state.get(username)
53 if entry is None:
54 return False
55 locked_until = entry.get("locked_until")
56 if locked_until is None:
57 return False
58 if datetime.now(timezone.utc) >= locked_until:
59 # Lockout expired — reset automatically
60 self._state.pop(username, None)
61 logger.info("Account lockout expired")
62 return False
63 return True
65 def _evict(self) -> None:
66 """Remove expired/unlocked entries to reclaim memory.
68 Must be called while ``self._lock`` is held.
69 """
70 now = datetime.now(timezone.utc)
71 expired_keys = [
72 key
73 for key, entry in self._state.items()
74 if entry.get("locked_until") is None or entry["locked_until"] <= now
75 ]
76 for key in expired_keys:
77 del self._state[key]
79 logger.info(
80 "Evicted {} expired/unlocked lockout entries", len(expired_keys)
81 )
83 # Last resort: if still over limit, blanket clear
84 if len(self._state) > self._MAX_STATE_ENTRIES:
85 self._state.clear()
86 logger.info("Blanket-cleared lockout state (still over limit)")
88 def record_failure(self, username: str) -> None:
89 """Record a failed login attempt for *username*."""
90 with self._lock:
91 if len(self._state) > self._MAX_STATE_ENTRIES:
92 self._evict()
93 entry = self._state.setdefault(
94 username, {"count": 0, "locked_until": None}
95 )
96 entry["count"] += 1
97 if entry["count"] >= self.threshold:
98 entry["locked_until"] = datetime.now(timezone.utc) + timedelta(
99 minutes=self.lockout_minutes
100 )
101 logger.warning(
102 "Account locked after {} failed attempts",
103 self.threshold,
104 )
106 def record_success(self, username: str) -> None:
107 """Clear the failure counter for *username* after a successful login."""
108 with self._lock:
109 removed = self._state.pop(username, None)
110 if removed is not None:
111 logger.info("Account lockout cleared after successful login")
114# ------------------------------------------------------------------
115# Module-level singleton
116# ------------------------------------------------------------------
118_manager: AccountLockoutManager | None = None
119_singleton_lock = threading.Lock()
122def get_account_lockout_manager() -> AccountLockoutManager:
123 """Return the module-level singleton ``AccountLockoutManager``."""
124 global _manager
125 if _manager is None:
126 with _singleton_lock:
127 if _manager is None: 127 ↛ 129line 127 didn't jump to line 129
128 _manager = AccountLockoutManager()
129 return _manager