Coverage for src / local_deep_research / database / credential_store_base.py: 94%

41 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Base class for credential storage with TTL. 

3Provides common functionality for storing credentials with TTL expiration. 

4 

5NOTE: Credentials are stored in plain text in memory. The encryption 

6functionality was removed as it provided no real security benefit - 

7if an attacker can read process memory, they have sufficient privileges 

8to bypass any protections anyway. 

9""" 

10 

11import time 

12from abc import ABC, abstractmethod 

13from threading import Lock 

14from typing import Dict, Optional, Tuple 

15 

16 

class CredentialStoreBase(ABC):
    """
    Abstract in-memory credential store whose entries expire after a TTL.

    Each entry is a plain dict holding ``username``, ``password`` and an
    ``expires_at`` wall-clock timestamp. Nothing is encrypted. The helper
    methods below access the backing dict while holding ``self._lock``.
    Subclasses supply the public ``store`` / ``retrieve`` API.
    """

    def __init__(self, ttl_seconds: int):
        """
        Set up an empty store.

        Args:
            ttl_seconds: Lifetime, in seconds, given to every stored entry
        """
        self._lock = Lock()
        self._store: Dict[str, Dict] = {}
        self.ttl = ttl_seconds

    def _store_credentials(self, key: str, data: Dict[str, str]) -> None:
        """
        Save a username/password pair under ``key`` and purge stale entries.

        An existing entry under the same key is overwritten and receives a
        fresh expiration time.

        Args:
            key: Storage key
            data: Mapping that must contain 'username' and 'password'

        Raises:
            KeyError: If ``data`` lacks 'username' or 'password'
        """
        with self._lock:
            # Kept as plain text; only the expiry timestamp is added.
            record = {
                "username": data["username"],
                "password": data["password"],
                "expires_at": time.time() + self.ttl,
            }
            self._store[key] = record

            # Purge on every write so stale entries do not accumulate.
            self._cleanup_expired()

    def _retrieve_credentials(
        self, key: str, remove: bool = False
    ) -> Optional[Tuple[str, str]]:
        """
        Look up the credentials saved under ``key``.

        An entry found to be expired is deleted and reported as missing.

        Args:
            key: Storage key
            remove: When True, delete the entry after reading it

        Returns:
            ``(username, password)``, or None when the key is unknown or
            the entry has expired
        """
        with self._lock:
            try:
                record = self._store[key]
            except KeyError:
                return None

            # Expired entries are dropped and treated as absent.
            if time.time() > record["expires_at"]:
                self._store.pop(key)
                return None

            # Read both fields before any removal.
            credentials = (record["username"], record["password"])

            if remove:
                self._store.pop(key)

            return credentials

    def _cleanup_expired(self):
        """
        Delete every entry whose expiration time has passed.

        This method does not acquire ``self._lock`` itself; the call in
        ``_store_credentials`` is made while the lock is already held.
        """
        now = time.time()
        # Collect keys first so the dict is not resized while iterating.
        stale_keys = [
            name
            for name, record in self._store.items()
            if record["expires_at"] < now
        ]
        for name in stale_keys:
            del self._store[name]

    def clear_entry(self, key: str) -> None:
        """
        Drop the entry stored under ``key``; unknown keys are ignored.

        Args:
            key: Storage key
        """
        with self._lock:
            self._store.pop(key, None)

    @abstractmethod
    def store(self, *args, **kwargs):
        """Store credentials - to be implemented by subclasses."""

    @abstractmethod
    def retrieve(self, *args, **kwargs):
        """Retrieve credentials - to be implemented by subclasses."""