Coverage for src/local_deep_research/database/backup/backup_service.py: 89%
230 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""Core backup service for encrypted database backups.
3Uses sqlcipher_export() for safe atomic backups that preserve encryption
4and work correctly with WAL mode.
5"""
7import os
8import shutil
9import threading
10import time
11from dataclasses import dataclass
12from datetime import UTC, datetime, timedelta
13from pathlib import Path
14from typing import Optional
16from loguru import logger
18from ...utilities.resource_utils import safe_close
20from ...config.paths import (
21 get_encrypted_database_path,
22 get_user_backup_directory,
23 get_user_database_filename,
24)
25from ..sqlcipher_utils import (
26 apply_sqlcipher_pragmas,
27 create_sqlcipher_connection,
28 get_key_from_password,
29 get_sqlcipher_settings,
30 set_sqlcipher_key,
31 verify_sqlcipher_connection,
32)
# Registry of per-user backup locks. It lives at module level so that
# separate BackupService instances for the same user share one lock.
_user_locks: dict[str, threading.Lock] = {}
# Guards every read and write of the registry above.
_user_locks_lock = threading.Lock()


def _get_user_lock(username: str) -> threading.Lock:
    """Return the backup lock for ``username``, creating it on first use.

    The lookup and the lazy insert both happen while holding
    ``_user_locks_lock``, so concurrent callers always receive the same
    lock object for a given username.

    Args:
        username: The username whose lock is requested

    Returns:
        The threading.Lock registered for that user
    """
    with _user_locks_lock:
        lock = _user_locks.get(username)
        if lock is None:
            lock = _user_locks[username] = threading.Lock()
        return lock
def pop_user_lock(username: str) -> None:
    """Remove the per-user backup lock for ``username`` from the registry.

    Called from the user-close path so the module-level dict doesn't
    accumulate one entry per username across the process lifetime. The
    next backup operation lazily re-creates the lock if needed.

    A lock that is currently held is left in place: dropping it would let
    the next caller register a second lock for the same user and start a
    backup concurrently with the one still running. Such an entry is
    removed by a later call, once the lock is free. This is best effort -
    a thread that already fetched the lock but has not yet acquired it is
    not detected.

    Args:
        username: The username whose lock entry should be dropped. Unknown
            usernames are ignored.
    """
    with _user_locks_lock:
        lock = _user_locks.get(username)
        if lock is not None and lock.locked():
            # A backup for this user is in flight; keep its lock registered.
            return
        _user_locks.pop(username, None)
@dataclass
class BackupResult:
    """Outcome of a single backup operation."""

    # Whether the operation ended with a usable backup
    success: bool
    # Location of the backup file; None when no file was produced
    backup_path: Optional[Path] = None
    # Human-readable failure description; None when nothing went wrong
    error: Optional[str] = None
    # Size of the backup file in bytes; 0 when not known
    size_bytes: int = 0
79class BackupService:
80 """Service for creating and managing encrypted database backups.
82 Uses sqlcipher_export() for safe backups that:
83 - Work correctly with WAL mode
84 - Preserve encryption with the same key
85 - Create atomic copies via ATTACH + export + DETACH
86 - Never corrupt the source database
87 """
89 def __init__(
90 self,
91 username: str,
92 password: str,
93 max_backups: int = 1,
94 max_age_days: int = 7,
95 ):
96 """Initialize backup service.
98 Args:
99 username: User's username
100 password: User's password (for encryption)
101 max_backups: Maximum number of backup files to keep
102 max_age_days: Delete backups older than this many days
103 """
104 self.username = username
105 self.password = password
106 self.max_backups = max_backups
107 self.max_age_days = max_age_days
109 # Get paths
110 self.db_filename = get_user_database_filename(username)
111 self.db_path = get_encrypted_database_path() / self.db_filename
112 self.backup_dir = get_user_backup_directory(username)
114 def create_backup(self, force: bool = False) -> BackupResult:
115 """Create an encrypted backup of the user's database.
117 Uses sqlcipher_export() to create a safe, atomic backup that inherits
118 the encryption key from the source database. The backup is created
119 with a .tmp suffix and atomically renamed to prevent race conditions
120 with cleanup operations.
122 By default, only one backup per calendar day is created to prevent
123 a corrupted database from rapidly overwriting all good backups.
124 Use force=True to bypass this check (used by pre-migration backups).
126 This method is protected by a per-user lock to prevent concurrent
127 backup operations for the same user.
129 Args:
130 force: If True, skip the daily limit check.
132 Returns:
133 BackupResult with success status and backup path
134 """
135 # Acquire per-user lock to prevent concurrent backup operations
136 with _get_user_lock(self.username):
137 # Skip if a backup already exists for today (unless forced)
138 if not force:
139 today = datetime.now(UTC).strftime("%Y%m%d")
140 existing_today = list(
141 self.backup_dir.glob(f"ldr_backup_{today}_*.db")
142 )
143 if existing_today:
144 latest = max(existing_today, key=lambda p: p.name)
145 logger.debug(
146 f"Backup already exists for today ({latest.name}), "
147 "skipping"
148 )
149 return BackupResult(
150 success=True,
151 backup_path=latest,
152 size_bytes=latest.stat().st_size
153 if latest.exists()
154 else 0,
155 )
157 start = time.perf_counter()
158 result = self._create_backup_impl()
159 elapsed_ms = (time.perf_counter() - start) * 1000
160 size_info = (
161 f"{result.size_bytes / (1024 * 1024):.1f}MB"
162 if result.size_bytes
163 else "unknown size"
164 )
165 if elapsed_ms > 1000:
166 logger.info(
167 f"Backup for user {self.username} "
168 f"({size_info}) took {elapsed_ms:.0f}ms"
169 )
170 else:
171 logger.debug(
172 f"Backup for user {self.username} "
173 f"({size_info}) took {elapsed_ms:.0f}ms"
174 )
175 return result
177 def _create_backup_impl(self) -> BackupResult:
178 """Internal implementation of backup creation (must be called with lock held)."""
179 if not self.db_path.exists():
180 return BackupResult(
181 success=False,
182 error=f"Database not found: {self.db_path}",
183 )
185 # Check available disk space
186 try:
187 db_size = self.db_path.stat().st_size
188 free_space = shutil.disk_usage(self.backup_dir).free
189 # Require at least 2x the database size as free space
190 if free_space < db_size * 2:
191 return BackupResult(
192 success=False,
193 error=f"Insufficient disk space. Need {db_size * 2} bytes, have {free_space}",
194 )
195 except OSError as e:
196 # Fail closed - don't proceed with backup if we can't verify disk space
197 logger.warning("Could not check disk space, skipping backup")
198 return BackupResult(
199 success=False,
200 error=f"Could not verify disk space: {e}",
201 )
203 # Generate backup filename with timestamp
204 # Use .tmp suffix during creation to prevent cleanup race conditions
205 timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
206 backup_filename = f"ldr_backup_{timestamp}.db"
207 backup_path = self.backup_dir / backup_filename
208 temp_path = self.backup_dir / f"ldr_backup_{timestamp}.db.tmp"
210 try:
211 # Create connection to source database
212 conn = create_sqlcipher_connection(str(self.db_path), self.password)
213 cursor = conn.cursor()
215 # Set busy timeout so concurrent writers don't cause instant failure
216 cursor.execute("PRAGMA busy_timeout = 10000")
218 try:
219 # Use sqlcipher_export() to create an encrypted backup
220 # VACUUM INTO doesn't preserve encryption in SQLCipher
221 # Security: validate temp_path doesn't contain SQL injection chars
222 temp_path_str = str(temp_path)
223 if "'" in temp_path_str or '"' in temp_path_str:
224 raise ValueError(
225 f"Invalid characters in backup path: {temp_path_str}"
226 )
228 # Get the hex key for ATTACH (same key derivation as source)
229 hex_key = get_key_from_password(
230 self.password, db_path=self.db_path
231 ).hex()
233 # Defensive: ensure hex_key is strictly hexadecimal
234 if not hex_key or not all( 234 ↛ 237line 234 didn't jump to line 237 because the condition on line 234 was never true
235 c in "0123456789abcdef" for c in hex_key
236 ):
237 raise ValueError("Derived key is not valid hex")
239 # Attach backup database with encryption (using temp path)
240 # Note: ATTACH DATABASE does not support parameter binding
241 # in SQLite/SQLCipher — f-string is required here.
242 cursor.execute(
243 f"ATTACH DATABASE '{temp_path_str}' AS backup KEY \"x'{hex_key}'\""
244 )
246 try:
247 # Apply cipher settings to the backup database (must match source)
248 # Note: PRAGMA statements do not support parameter binding
249 # in SQLite — f-string is required. Values are validated
250 # upstream by get_sqlcipher_settings() against allow-lists.
251 settings = get_sqlcipher_settings()
252 page_size = int(settings["page_size"])
253 kdf_iter = int(settings["kdf_iterations"])
254 hmac_alg = str(settings["hmac_algorithm"])
255 cursor.execute(
256 f"PRAGMA backup.cipher_page_size = {page_size}"
257 )
258 cursor.execute(
259 f"PRAGMA backup.cipher_hmac_algorithm = {hmac_alg}"
260 )
261 cursor.execute(f"PRAGMA backup.kdf_iter = {kdf_iter}")
263 # Export all data to the backup database
264 cursor.execute("SELECT sqlcipher_export('backup')")
265 finally:
266 # Always detach to release the backup file handle
267 try:
268 cursor.execute("DETACH DATABASE backup")
269 except Exception:
270 logger.warning(
271 "DETACH failed (connection will release on close)"
272 )
273 finally:
274 safe_close(cursor, "backup cursor")
275 safe_close(conn, "backup connection")
277 # Verify the backup is valid (still using temp path)
278 if not self._verify_backup(temp_path):
279 # Delete corrupted backup
280 if temp_path.exists():
281 temp_path.unlink()
282 return BackupResult(
283 success=False,
284 error="Backup verification failed - backup was corrupted",
285 )
287 # Set restrictive permissions (owner read/write only)
288 # SECURITY: Backup files contain sensitive user data
289 os.chmod(temp_path, 0o600)
291 # Get backup size before rename
292 backup_size = temp_path.stat().st_size
294 # Atomic rename from .tmp to final .db
295 # This ensures cleanup won't see/delete partially created backups
296 temp_path.rename(backup_path)
298 logger.info(
299 f"Created backup for user: {backup_path.name} ({backup_size} bytes)"
300 )
302 # Cleanup old backups (safe now - new backup is finalized)
303 self._cleanup_old_backups()
305 return BackupResult(
306 success=True,
307 backup_path=backup_path,
308 size_bytes=backup_size,
309 )
311 except Exception as e:
312 logger.exception("Backup creation failed")
313 # Clean up any partial backup (temp file)
314 if temp_path.exists(): 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 try:
316 temp_path.unlink()
317 except OSError:
318 pass
319 # Also clean up final path in case rename partially succeeded
320 if backup_path.exists(): 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true
321 try:
322 backup_path.unlink()
323 except OSError:
324 pass
325 return BackupResult(
326 success=False,
327 error=str(e),
328 )
330 def _verify_backup(self, backup_path: Path) -> bool:
331 """Verify that a backup file is valid and readable.
333 Args:
334 backup_path: Path to the backup file
336 Returns:
337 True if backup is valid, False otherwise
338 """
339 if not backup_path.exists():
340 return False
342 if backup_path.stat().st_size == 0:
343 logger.warning("Backup file is empty (0 bytes)")
344 return False
346 try:
347 # Import SQLCipher module
348 from ..sqlcipher_compat import get_sqlcipher_module
350 sqlcipher3 = get_sqlcipher_module()
352 # Open the backup with the same password
353 conn = sqlcipher3.connect(str(backup_path))
354 cursor = conn.cursor()
356 try:
357 # Set encryption key using the SOURCE database's salt
358 # (backup was encrypted with the source DB's per-database salt)
359 set_sqlcipher_key(cursor, self.password, db_path=self.db_path)
360 apply_sqlcipher_pragmas(cursor, creation_mode=False)
362 # Run quick integrity check
363 cursor.execute("PRAGMA quick_check")
364 result = cursor.fetchone()
366 if result and result[0] == "ok":
367 # Additional verification: try to read a table
368 if verify_sqlcipher_connection(cursor): 368 ↛ 371line 368 didn't jump to line 371 because the condition on line 368 was always true
369 return True
371 logger.warning(f"Backup integrity check failed: {result}")
372 return False
374 finally:
375 safe_close(cursor, "backup cursor")
376 safe_close(conn, "backup connection")
378 except Exception:
379 logger.warning("Backup verification failed")
380 return False
382 def _cleanup_old_backups(self) -> int:
383 """Remove old backups based on age and count limits.
385 Also cleans up stale .tmp files from interrupted backups.
387 Returns:
388 Number of backups deleted
389 """
390 deleted_count = 0
391 cutoff_time = datetime.now(UTC) - timedelta(days=self.max_age_days)
392 stale_tmp_cutoff = datetime.now(UTC) - timedelta(hours=1)
394 try:
395 # Clean up stale .tmp files from interrupted/crashed backups
396 for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
397 try:
398 mtime = datetime.fromtimestamp(
399 tmp_file.stat().st_mtime, tz=UTC
400 )
401 if mtime < stale_tmp_cutoff:
402 tmp_file.unlink()
403 logger.info(
404 f"Cleaned up stale temp file: {tmp_file.name}"
405 )
406 except (OSError, FileNotFoundError):
407 pass
409 # Get all backup files sorted by modification time (newest first)
410 def _safe_mtime(p: Path) -> float:
411 try:
412 return p.stat().st_mtime
413 except FileNotFoundError:
414 return 0.0
416 backups = [
417 p
418 for p in sorted(
419 self.backup_dir.glob("ldr_backup_*.db"),
420 key=_safe_mtime,
421 reverse=True,
422 )
423 if p.exists()
424 ]
426 for i, backup in enumerate(backups):
427 should_delete = False
429 # Delete if beyond max count
430 if i >= self.max_backups:
431 should_delete = True
432 reason = f"exceeds max count ({self.max_backups})"
434 # Delete if too old
435 else:
436 try:
437 mtime = datetime.fromtimestamp(
438 backup.stat().st_mtime, tz=UTC
439 )
440 if mtime < cutoff_time:
441 should_delete = True
442 reason = f"older than {self.max_age_days} days"
443 except FileNotFoundError:
444 continue
446 if should_delete:
447 try:
448 backup.unlink()
449 deleted_count += 1
450 logger.debug(
451 f"Deleted old backup {backup.name}: {reason}"
452 )
453 except OSError:
454 logger.warning(f"Could not delete backup {backup.name}")
456 except Exception:
457 logger.exception("Error during backup cleanup")
459 if deleted_count > 0:
460 logger.info(f"Cleaned up {deleted_count} old backups")
462 return deleted_count
464 def list_backups(self) -> list[dict]:
465 """List all backups for this user.
467 Returns:
468 List of backup info dictionaries with path, size, and timestamp
469 """
470 backups = []
472 try:
474 def _safe_mtime_list(p: Path) -> float:
475 try:
476 return p.stat().st_mtime
477 except FileNotFoundError:
478 return 0.0
480 for backup_file in sorted(
481 self.backup_dir.glob("ldr_backup_*.db"),
482 key=_safe_mtime_list,
483 reverse=True,
484 ):
485 try:
486 stat = backup_file.stat()
487 except FileNotFoundError:
488 continue
489 backups.append(
490 {
491 "filename": backup_file.name,
492 "path": str(backup_file),
493 "size_bytes": stat.st_size,
494 "created_at": datetime.fromtimestamp(
495 stat.st_mtime, tz=UTC
496 ).isoformat(),
497 }
498 )
499 except Exception:
500 logger.exception("Error listing backups")
502 return backups
504 def purge_and_refresh(self) -> "BackupResult":
505 """Delete all existing backups and create a fresh one.
507 Used after a password change to replace old-key backups with a
508 new backup encrypted under the current password. Old backups
509 encrypted with a previous password are a security risk (NIST
510 SP 800-57, OWASP A02) because they remain decryptable with the
511 old (potentially compromised) password.
513 Returns:
514 BackupResult from the fresh backup creation
515 """
516 # Hold per-user lock for the entire purge+create operation to
517 # prevent a concurrent backup from writing an old-key backup
518 # between the purge and the fresh backup creation.
519 with _get_user_lock(self.username):
520 # Delete all existing backup files
521 for info in self.list_backups():
522 try:
523 Path(info["path"]).unlink()
524 logger.debug(f"Purged old-key backup: {info['filename']}")
525 except OSError:
526 logger.warning(
527 f"Could not delete backup {info['filename']}"
528 )
530 # Also clean up any stale .tmp files
531 for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
532 try:
533 tmp_file.unlink()
534 except OSError:
535 logger.warning(
536 f"Could not delete stale tmp file {tmp_file.name}"
537 )
539 # Create fresh backup with current password (lock already held)
540 return self._create_backup_impl()
542 def get_latest_backup(self) -> Optional[Path]:
543 """Get the path to the most recent backup.
545 Returns:
546 Path to latest backup, or None if no backups exist
547 """
548 try:
550 def _safe_mtime_latest(p: Path) -> float:
551 try:
552 return p.stat().st_mtime
553 except FileNotFoundError:
554 return 0.0
556 backups = [
557 p
558 for p in sorted(
559 self.backup_dir.glob("ldr_backup_*.db"),
560 key=_safe_mtime_latest,
561 reverse=True,
562 )
563 if p.exists()
564 ]
565 return backups[0] if backups else None
566 except Exception:
567 logger.exception("Error finding latest backup")
568 return None