Coverage for src / local_deep_research / database / credential_store_base.py: 94%
41 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Base class for credential storage with TTL.
3Provides common functionality for storing credentials with TTL expiration.
5NOTE: Credentials are stored in plain text in memory. The encryption
6functionality was removed as it provided no real security benefit -
7if an attacker can read process memory, they have sufficient privileges
8to bypass any protections anyway.
9"""
11import time
12from abc import ABC, abstractmethod
13from threading import Lock
14from typing import Dict, Optional, Tuple
17class CredentialStoreBase(ABC):
18 """
19 Base class for storing credentials with expiration.
20 Credentials are stored in plain text in memory with TTL-based expiration.
21 """
23 def __init__(self, ttl_seconds: int):
24 """
25 Initialize the store.
27 Args:
28 ttl_seconds: How long to keep data before expiration
29 """
30 self.ttl = ttl_seconds
31 self._store: Dict[str, Dict] = {}
32 self._lock = Lock()
34 def _store_credentials(self, key: str, data: Dict[str, str]) -> None:
35 """
36 Store credentials with expiration.
38 Args:
39 key: Storage key
40 data: Dictionary with 'username' and 'password'
41 """
42 with self._lock:
43 # Store with expiration (plain text)
44 self._store[key] = {
45 "username": data["username"],
46 "password": data["password"],
47 "expires_at": time.time() + self.ttl,
48 }
50 # Clean up expired entries
51 self._cleanup_expired()
53 def _retrieve_credentials(
54 self, key: str, remove: bool = False
55 ) -> Optional[Tuple[str, str]]:
56 """
57 Retrieve stored credentials.
59 Args:
60 key: Storage key
61 remove: Whether to remove after retrieval
63 Returns:
64 Tuple of (username, password) or None if not found/expired
65 """
66 with self._lock:
67 if key not in self._store:
68 return None
70 entry = self._store[key]
72 # Check expiration
73 if time.time() > entry["expires_at"]:
74 del self._store[key]
75 return None
77 # Get data
78 username = entry["username"]
79 password = entry["password"]
81 # Remove if requested
82 if remove:
83 del self._store[key]
85 return username, password
87 def _cleanup_expired(self):
88 """Remove expired entries."""
89 current_time = time.time()
90 expired_keys = [
91 key
92 for key, entry in self._store.items()
93 if current_time > entry["expires_at"]
94 ]
95 for key in expired_keys:
96 del self._store[key]
98 def clear_entry(self, key: str) -> None:
99 """
100 Clear a specific entry.
102 Args:
103 key: Storage key
104 """
105 with self._lock:
106 if key in self._store: 106 ↛ exitline 106 didn't jump to the function exit
107 del self._store[key]
109 @abstractmethod
110 def store(self, *args, **kwargs):
111 """Store credentials - to be implemented by subclasses."""
112 pass
114 @abstractmethod
115 def retrieve(self, *args, **kwargs):
116 """Retrieve credentials - to be implemented by subclasses."""
117 pass