Coverage for src / local_deep_research / database / backup / backup_scheduler.py: 99%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Background scheduler for database backups.
3Runs backups in a thread pool to avoid blocking the login flow.
4"""
6import atexit
7import threading
8from concurrent.futures import ThreadPoolExecutor
9from typing import Optional
11from loguru import logger
13from .backup_service import BackupResult, BackupService
class BackupScheduler:
    """Singleton scheduler for running database backups in background threads.

    Uses a ThreadPoolExecutor to run backups asynchronously without blocking
    the login flow. Every ``BackupScheduler()`` call returns the same
    instance; both instance creation and the one-time initialization are
    serialized on a class-level lock.
    """

    _instance: Optional["BackupScheduler"] = None
    # Guards creation of the singleton and its one-time initialization.
    _lock = threading.Lock()

    def __new__(cls) -> "BackupScheduler":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Initialize the scheduler state exactly once.

        ``__init__`` runs on every ``BackupScheduler()`` call because
        ``__new__`` always hands back the shared instance, so every call
        after the first must be a no-op.
        """
        # Fast path: ``_initialized`` is only set after the executor and the
        # pending-set state exist, so seeing it means the instance is usable.
        if hasattr(self, "_initialized"):
            return

        with self._lock:
            # Re-check under the lock so two threads racing on the very first
            # construction cannot both build an executor, register the
            # atexit hook twice, or replace the pending set under each other.
            if hasattr(self, "_initialized"):
                return

            # Thread pool for running backups.
            # Max 2 workers to limit concurrent backup operations.
            self._executor = ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix="backup_worker",
            )

            # Track pending backups to avoid duplicates.
            self._pending_backups: set[str] = set()
            self._pending_lock = threading.Lock()

            self._initialized = True

            # Register atexit handler to ensure clean shutdown.
            atexit.register(self.shutdown)
            logger.info("Backup scheduler initialized")

    def schedule_backup(
        self,
        username: str,
        password: str,
        max_backups: int = 1,
        max_age_days: int = 7,
    ) -> bool:
        """Schedule a background backup for a user.

        This method returns immediately without waiting for the backup
        to complete.

        Args:
            username: User's username
            password: User's password (for encryption)
            max_backups: Maximum number of backups to keep
            max_age_days: Delete backups older than this many days

        Returns:
            True if backup was scheduled, False if already pending

        Raises:
            RuntimeError: If the executor refuses the task, e.g. because
                ``shutdown()`` has already been called. The user is not
                left marked as pending in that case.
        """
        with self._pending_lock:
            if username in self._pending_backups:
                logger.debug("Backup already pending for user, skipping")
                return False
            self._pending_backups.add(username)

        # Submit backup to thread pool.
        try:
            future = self._executor.submit(
                self._run_backup,
                username,
                password,
                max_backups,
                max_age_days,
            )
        except Exception:
            # No future exists, so the done-callback that normally clears the
            # pending entry will never fire. Release the reservation here so
            # the user is not stuck as "already pending" forever.
            with self._pending_lock:
                self._pending_backups.discard(username)
            raise

        # Add callback to remove from pending set when done.
        future.add_done_callback(lambda f: self._backup_completed(username, f))

        logger.debug("Background backup scheduled for user")
        return True

    def _run_backup(
        self,
        username: str,
        password: str,
        max_backups: int,
        max_age_days: int,
    ) -> "BackupResult":
        """Run the actual backup operation.

        Any exception raised while creating the backup is logged and turned
        into a failed BackupResult instead of propagating.

        Args:
            username: User's username
            password: User's password
            max_backups: Maximum number of backups to keep
            max_age_days: Delete backups older than this many days

        Returns:
            BackupResult from the backup operation
        """
        try:
            service = BackupService(
                username=username,
                password=password,
                max_backups=max_backups,
                max_age_days=max_age_days,
            )
            result = service.create_backup()

            if result.success:
                logger.info(
                    f"Background backup completed: {result.backup_path.name if result.backup_path else 'unknown'}"
                )
            else:
                logger.warning(f"Background backup failed: {result.error}")

            return result

        except Exception as e:
            logger.exception("Background backup error")
            return BackupResult(success=False, error=str(e))

    def _backup_completed(self, username: str, future) -> None:
        """Callback when a backup completes.

        Args:
            username: User whose backup completed
            future: The completed future
        """
        with self._pending_lock:
            self._pending_backups.discard(username)

        # Log any exceptions that weren't caught.
        try:
            future.result()
        except Exception:
            logger.exception("Unhandled backup exception")

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the scheduler.

        Args:
            wait: If True, wait for pending backups to complete
        """
        logger.info("Shutting down backup scheduler")
        self._executor.shutdown(wait=wait)

    def get_pending_count(self) -> int:
        """Get number of pending backups.

        Returns:
            Number of backups currently in progress
        """
        with self._pending_lock:
            return len(self._pending_backups)
def get_backup_scheduler() -> BackupScheduler:
    """Return the shared backup scheduler.

    Returns:
        The BackupScheduler singleton. Instantiating the class always
        yields the same object, so this is safe to call repeatedly.
    """
    scheduler = BackupScheduler()
    return scheduler