Coverage for src / local_deep_research / database / backup / backup_scheduler.py: 99%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Background scheduler for database backups. 

2 

3Runs backups in a thread pool to avoid blocking the login flow. 

4""" 

5 

6import atexit 

7import threading 

8from concurrent.futures import ThreadPoolExecutor 

9from typing import Optional 

10 

11from loguru import logger 

12 

13from .backup_service import BackupResult, BackupService 

14 

15 

16class BackupScheduler: 

17 """Singleton scheduler for running database backups in background threads. 

18 

19 Uses ThreadPoolExecutor to run backups asynchronously without blocking 

20 the login flow. 

21 """ 

22 

23 _instance: Optional["BackupScheduler"] = None 

24 _lock = threading.Lock() 

25 

26 def __new__(cls) -> "BackupScheduler": 

27 """Ensure singleton instance.""" 

28 if cls._instance is None: 

29 with cls._lock: 

30 if cls._instance is None: 30 ↛ 32line 30 didn't jump to line 32

31 cls._instance = super().__new__(cls) 

32 return cls._instance 

33 

34 def __init__(self) -> None: 

35 """Initialize the scheduler (only runs once due to singleton).""" 

36 if hasattr(self, "_initialized"): 

37 return 

38 

39 # Thread pool for running backups 

40 # Max 2 workers to limit concurrent backup operations 

41 self._executor = ThreadPoolExecutor( 

42 max_workers=2, 

43 thread_name_prefix="backup_worker", 

44 ) 

45 

46 # Track pending backups to avoid duplicates 

47 self._pending_backups: set[str] = set() 

48 self._pending_lock = threading.Lock() 

49 

50 self._initialized = True 

51 

52 # Register atexit handler to ensure clean shutdown 

53 atexit.register(self.shutdown) 

54 logger.info("Backup scheduler initialized") 

55 

56 def schedule_backup( 

57 self, 

58 username: str, 

59 password: str, 

60 max_backups: int = 1, 

61 max_age_days: int = 7, 

62 ) -> bool: 

63 """Schedule a background backup for a user. 

64 

65 This method returns immediately without waiting for the backup 

66 to complete. 

67 

68 Args: 

69 username: User's username 

70 password: User's password (for encryption) 

71 max_backups: Maximum number of backups to keep 

72 max_age_days: Delete backups older than this many days 

73 

74 Returns: 

75 True if backup was scheduled, False if already pending 

76 """ 

77 with self._pending_lock: 

78 if username in self._pending_backups: 

79 logger.debug("Backup already pending for user, skipping") 

80 return False 

81 self._pending_backups.add(username) 

82 

83 # Submit backup to thread pool 

84 future = self._executor.submit( 

85 self._run_backup, 

86 username, 

87 password, 

88 max_backups, 

89 max_age_days, 

90 ) 

91 

92 # Add callback to remove from pending set when done 

93 future.add_done_callback(lambda f: self._backup_completed(username, f)) 

94 

95 logger.debug("Background backup scheduled for user") 

96 return True 

97 

98 def _run_backup( 

99 self, 

100 username: str, 

101 password: str, 

102 max_backups: int, 

103 max_age_days: int, 

104 ) -> BackupResult: 

105 """Run the actual backup operation. 

106 

107 Args: 

108 username: User's username 

109 password: User's password 

110 max_backups: Maximum number of backups to keep 

111 max_age_days: Delete backups older than this many days 

112 

113 Returns: 

114 BackupResult from the backup operation 

115 """ 

116 try: 

117 service = BackupService( 

118 username=username, 

119 password=password, 

120 max_backups=max_backups, 

121 max_age_days=max_age_days, 

122 ) 

123 result = service.create_backup() 

124 

125 if result.success: 

126 logger.info( 

127 f"Background backup completed: {result.backup_path.name if result.backup_path else 'unknown'}" 

128 ) 

129 else: 

130 logger.warning(f"Background backup failed: {result.error}") 

131 

132 return result 

133 

134 except Exception as e: 

135 logger.exception("Background backup error") 

136 return BackupResult(success=False, error=str(e)) 

137 

138 def _backup_completed(self, username: str, future) -> None: 

139 """Callback when a backup completes. 

140 

141 Args: 

142 username: User whose backup completed 

143 future: The completed future 

144 """ 

145 with self._pending_lock: 

146 self._pending_backups.discard(username) 

147 

148 # Log any exceptions that weren't caught 

149 try: 

150 future.result() 

151 except Exception: 

152 logger.exception("Unhandled backup exception") 

153 

154 def shutdown(self, wait: bool = True) -> None: 

155 """Shutdown the scheduler. 

156 

157 Args: 

158 wait: If True, wait for pending backups to complete 

159 """ 

160 logger.info("Shutting down backup scheduler") 

161 self._executor.shutdown(wait=wait) 

162 

163 def get_pending_count(self) -> int: 

164 """Get number of pending backups. 

165 

166 Returns: 

167 Number of backups currently in progress 

168 """ 

169 with self._pending_lock: 

170 return len(self._pending_backups) 

171 

172 

173def get_backup_scheduler() -> BackupScheduler: 

174 """Get the singleton backup scheduler instance. 

175 

176 Returns: 

177 The BackupScheduler singleton (thread-safe via __new__) 

178 """ 

179 return BackupScheduler()