Coverage for src/local_deep_research/database/backup/backup_service.py: 89%
219 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Core backup service for encrypted database backups.
3Uses sqlcipher_export() for safe atomic backups that preserve encryption
4and work correctly with WAL mode.
5"""
7import os
8import shutil
9import threading
10from dataclasses import dataclass
11from datetime import UTC, datetime, timedelta
12from pathlib import Path
13from typing import Optional
15from loguru import logger
17from ...utilities.resource_utils import safe_close
19from ...config.paths import (
20 get_encrypted_database_path,
21 get_user_backup_directory,
22 get_user_database_filename,
23)
24from ..sqlcipher_utils import (
25 apply_sqlcipher_pragmas,
26 create_sqlcipher_connection,
27 get_key_from_password,
28 get_sqlcipher_settings,
29 set_sqlcipher_key,
30 verify_sqlcipher_connection,
31)
# Module-level per-user locks to prevent concurrent backup operations
# for the same user across different BackupService instances
_user_locks: dict[str, threading.Lock] = {}
# Guards lazy creation of entries in _user_locks (see _get_user_lock)
_user_locks_lock = threading.Lock()
def _get_user_lock(username: str) -> threading.Lock:
    """Return the per-user lock, creating it on first request.

    Creation is guarded by the module-level lock so two threads asking
    for the same user's lock always receive the same object.

    Args:
        username: The username whose lock is requested

    Returns:
        The threading.Lock dedicated to this username
    """
    with _user_locks_lock:
        return _user_locks.setdefault(username, threading.Lock())
@dataclass
class BackupResult:
    """Outcome of a single backup operation.

    Attributes:
        success: True when the backup completed (or was skipped because
            one already exists for today).
        backup_path: Location of the resulting backup file, if any.
        error: Human-readable failure description when success is False.
        size_bytes: Size of the backup file in bytes (0 when unknown).
    """

    success: bool
    backup_path: Optional[Path] = None
    error: Optional[str] = None
    size_bytes: int = 0
class BackupService:
    """Service for creating and managing encrypted database backups.

    Uses sqlcipher_export() for safe backups that:
    - Work correctly with WAL mode
    - Preserve encryption with the same key
    - Create atomic copies via ATTACH + export + DETACH
    - Never corrupt the source database
    """

    def __init__(
        self,
        username: str,
        password: str,
        max_backups: int = 1,
        max_age_days: int = 7,
    ) -> None:
        """Initialize backup service.

        Args:
            username: User's username
            password: User's password (for encryption)
            max_backups: Maximum number of backup files to keep
            max_age_days: Delete backups older than this many days
        """
        self.username = username
        # Kept in memory for key derivation on each backup/verify call
        self.password = password
        self.max_backups = max_backups
        self.max_age_days = max_age_days

        # Get paths (resolved once at construction time)
        self.db_filename = get_user_database_filename(username)
        self.db_path = get_encrypted_database_path() / self.db_filename
        self.backup_dir = get_user_backup_directory(username)

    def create_backup(self, force: bool = False) -> BackupResult:
        """Create an encrypted backup of the user's database.

        Uses sqlcipher_export() to create a safe, atomic backup that inherits
        the encryption key from the source database. The backup is created
        with a .tmp suffix and atomically renamed to prevent race conditions
        with cleanup operations.

        By default, only one backup per calendar day is created to prevent
        a corrupted database from rapidly overwriting all good backups.
        Use force=True to bypass this check (used by pre-migration backups).

        This method is protected by a per-user lock to prevent concurrent
        backup operations for the same user.

        Args:
            force: If True, skip the daily limit check.

        Returns:
            BackupResult with success status and backup path
        """
        # Acquire per-user lock to prevent concurrent backup operations
        with _get_user_lock(self.username):
            # Skip if a backup already exists for today (unless forced)
            if not force:
                today = datetime.now(UTC).strftime("%Y%m%d")
                existing_today = list(
                    self.backup_dir.glob(f"ldr_backup_{today}_*.db")
                )
                if existing_today:
                    # Filenames embed a sortable timestamp, so the
                    # lexicographic max is today's most recent backup.
                    latest = max(existing_today, key=lambda p: p.name)
                    logger.debug(
                        f"Backup already exists for today ({latest.name}), "
                        "skipping"
                    )
                    return BackupResult(
                        success=True,
                        backup_path=latest,
                        size_bytes=latest.stat().st_size
                        if latest.exists()
                        else 0,
                    )

            return self._create_backup_impl()

    def _create_backup_impl(self) -> BackupResult:
        """Internal implementation of backup creation (must be called with lock held)."""
        if not self.db_path.exists():
            return BackupResult(
                success=False,
                error=f"Database not found: {self.db_path}",
            )

        # Check available disk space
        try:
            db_size = self.db_path.stat().st_size
            free_space = shutil.disk_usage(self.backup_dir).free
            # Require at least 2x the database size as free space
            # (the export plus filesystem overhead can approach source size)
            if free_space < db_size * 2:
                return BackupResult(
                    success=False,
                    error=f"Insufficient disk space. Need {db_size * 2} bytes, have {free_space}",
                )
        except OSError as e:
            # Fail closed - don't proceed with backup if we can't verify disk space
            logger.warning("Could not check disk space, skipping backup")
            return BackupResult(
                success=False,
                error=f"Could not verify disk space: {e}",
            )

        # Generate backup filename with timestamp
        # Use .tmp suffix during creation to prevent cleanup race conditions
        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
        backup_filename = f"ldr_backup_{timestamp}.db"
        backup_path = self.backup_dir / backup_filename
        temp_path = self.backup_dir / f"ldr_backup_{timestamp}.db.tmp"

        try:
            # Create connection to source database
            conn = create_sqlcipher_connection(str(self.db_path), self.password)
            cursor = conn.cursor()

            # Set busy timeout so concurrent writers don't cause instant failure
            cursor.execute("PRAGMA busy_timeout = 10000")

            try:
                # Use sqlcipher_export() to create an encrypted backup
                # VACUUM INTO doesn't preserve encryption in SQLCipher
                # Security: validate temp_path doesn't contain SQL injection chars
                temp_path_str = str(temp_path)
                if "'" in temp_path_str or '"' in temp_path_str:
                    raise ValueError(
                        f"Invalid characters in backup path: {temp_path_str}"
                    )

                # Get the hex key for ATTACH (same key derivation as source)
                hex_key = get_key_from_password(
                    self.password, db_path=self.db_path
                ).hex()

                # Defensive: ensure hex_key is strictly hexadecimal
                # (it is interpolated into the ATTACH statement below)
                if not hex_key or not all(
                    c in "0123456789abcdef" for c in hex_key
                ):
                    raise ValueError("Derived key is not valid hex")

                # Attach backup database with encryption (using temp path)
                # Note: ATTACH DATABASE does not support parameter binding
                # in SQLite/SQLCipher — f-string is required here.
                cursor.execute(
                    f"ATTACH DATABASE '{temp_path_str}' AS backup KEY \"x'{hex_key}'\""
                )

                try:
                    # Apply cipher settings to the backup database (must match source)
                    # Note: PRAGMA statements do not support parameter binding
                    # in SQLite — f-string is required. Values are validated
                    # upstream by get_sqlcipher_settings() against allow-lists.
                    settings = get_sqlcipher_settings()
                    page_size = int(settings["page_size"])
                    kdf_iter = int(settings["kdf_iterations"])
                    hmac_alg = str(settings["hmac_algorithm"])
                    cursor.execute(
                        f"PRAGMA backup.cipher_page_size = {page_size}"
                    )
                    cursor.execute(
                        f"PRAGMA backup.cipher_hmac_algorithm = {hmac_alg}"
                    )
                    cursor.execute(f"PRAGMA backup.kdf_iter = {kdf_iter}")

                    # Export all data to the backup database
                    cursor.execute("SELECT sqlcipher_export('backup')")
                finally:
                    # Always detach to release the backup file handle
                    try:
                        cursor.execute("DETACH DATABASE backup")
                    except Exception:
                        logger.warning(
                            "DETACH failed (connection will release on close)"
                        )
            finally:
                safe_close(cursor, "backup cursor")
                safe_close(conn, "backup connection")

            # Verify the backup is valid (still using temp path)
            if not self._verify_backup(temp_path):
                # Delete corrupted backup
                if temp_path.exists():
                    temp_path.unlink()
                return BackupResult(
                    success=False,
                    error="Backup verification failed - backup was corrupted",
                )

            # Set restrictive permissions (owner read/write only)
            # SECURITY: Backup files contain sensitive user data
            # NOTE(review): 0o600 has no effect on Windows ACLs — confirm
            # the deployment targets are POSIX.
            os.chmod(temp_path, 0o600)

            # Get backup size before rename
            backup_size = temp_path.stat().st_size

            # Atomic rename from .tmp to final .db
            # This ensures cleanup won't see/delete partially created backups
            temp_path.rename(backup_path)

            logger.info(
                f"Created backup for user: {backup_path.name} ({backup_size} bytes)"
            )

            # Cleanup old backups (safe now - new backup is finalized)
            self._cleanup_old_backups()

            return BackupResult(
                success=True,
                backup_path=backup_path,
                size_bytes=backup_size,
            )

        except Exception as e:
            logger.exception("Backup creation failed")
            # Clean up any partial backup (temp file)
            if temp_path.exists():
                try:
                    temp_path.unlink()
                except OSError:
                    pass
            # Also clean up final path in case rename partially succeeded
            if backup_path.exists():
                try:
                    backup_path.unlink()
                except OSError:
                    pass
            return BackupResult(
                success=False,
                error=str(e),
            )

    def _verify_backup(self, backup_path: Path) -> bool:
        """Verify that a backup file is valid and readable.

        Opens the backup with the same password, runs PRAGMA quick_check,
        and confirms a table read succeeds. Any exception is treated as
        a verification failure (returns False, never raises).

        Args:
            backup_path: Path to the backup file

        Returns:
            True if backup is valid, False otherwise
        """
        if not backup_path.exists():
            return False

        if backup_path.stat().st_size == 0:
            logger.warning("Backup file is empty (0 bytes)")
            return False

        try:
            # Import SQLCipher module (local import keeps module load light)
            from ..sqlcipher_compat import get_sqlcipher_module

            sqlcipher3 = get_sqlcipher_module()

            # Open the backup with the same password
            conn = sqlcipher3.connect(str(backup_path))
            cursor = conn.cursor()

            try:
                # Set encryption key using the SOURCE database's salt
                # (backup was encrypted with the source DB's per-database salt)
                set_sqlcipher_key(cursor, self.password, db_path=self.db_path)
                apply_sqlcipher_pragmas(cursor, creation_mode=False)

                # Run quick integrity check
                cursor.execute("PRAGMA quick_check")
                result = cursor.fetchone()

                if result and result[0] == "ok":
                    # Additional verification: try to read a table
                    if verify_sqlcipher_connection(cursor):
                        return True

                logger.warning(f"Backup integrity check failed: {result}")
                return False
            finally:
                safe_close(cursor, "backup cursor")
                safe_close(conn, "backup connection")

        except Exception:
            logger.warning("Backup verification failed")
            return False

    def _cleanup_old_backups(self) -> int:
        """Remove old backups based on age and count limits.

        Also cleans up stale .tmp files from interrupted backups.

        Returns:
            Number of backups deleted
        """
        deleted_count = 0
        cutoff_time = datetime.now(UTC) - timedelta(days=self.max_age_days)
        # .tmp files older than an hour are assumed abandoned by a crash;
        # an in-progress backup would have renamed them by then.
        stale_tmp_cutoff = datetime.now(UTC) - timedelta(hours=1)

        try:
            # Clean up stale .tmp files from interrupted/crashed backups
            for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
                try:
                    mtime = datetime.fromtimestamp(
                        tmp_file.stat().st_mtime, tz=UTC
                    )
                    if mtime < stale_tmp_cutoff:
                        tmp_file.unlink()
                        logger.info(
                            f"Cleaned up stale temp file: {tmp_file.name}"
                        )
                except (OSError, FileNotFoundError):
                    pass

            # Get all backup files sorted by modification time (newest first)
            def _safe_mtime(p: Path) -> float:
                # Files may disappear between glob and stat (concurrent cleanup)
                try:
                    return p.stat().st_mtime
                except FileNotFoundError:
                    return 0.0

            backups = [
                p
                for p in sorted(
                    self.backup_dir.glob("ldr_backup_*.db"),
                    key=_safe_mtime,
                    reverse=True,
                )
                if p.exists()
            ]

            for i, backup in enumerate(backups):
                should_delete = False

                # Delete if beyond max count
                if i >= self.max_backups:
                    should_delete = True
                    reason = f"exceeds max count ({self.max_backups})"

                # Delete if too old
                else:
                    try:
                        mtime = datetime.fromtimestamp(
                            backup.stat().st_mtime, tz=UTC
                        )
                        if mtime < cutoff_time:
                            should_delete = True
                            reason = f"older than {self.max_age_days} days"
                    except FileNotFoundError:
                        continue

                if should_delete:
                    try:
                        backup.unlink()
                        deleted_count += 1
                        logger.debug(
                            f"Deleted old backup {backup.name}: {reason}"
                        )
                    except OSError:
                        logger.warning(f"Could not delete backup {backup.name}")

        except Exception:
            logger.exception("Error during backup cleanup")

        if deleted_count > 0:
            logger.info(f"Cleaned up {deleted_count} old backups")

        return deleted_count

    def list_backups(self) -> list[dict]:
        """List all backups for this user.

        Returns:
            List of backup info dictionaries with path, size, and timestamp,
            sorted newest-first by modification time
        """
        backups = []
        try:

            def _safe_mtime_list(p: Path) -> float:
                # Files may disappear between glob and stat
                try:
                    return p.stat().st_mtime
                except FileNotFoundError:
                    return 0.0

            for backup_file in sorted(
                self.backup_dir.glob("ldr_backup_*.db"),
                key=_safe_mtime_list,
                reverse=True,
            ):
                try:
                    stat = backup_file.stat()
                except FileNotFoundError:
                    continue
                backups.append(
                    {
                        "filename": backup_file.name,
                        "path": str(backup_file),
                        "size_bytes": stat.st_size,
                        "created_at": datetime.fromtimestamp(
                            stat.st_mtime, tz=UTC
                        ).isoformat(),
                    }
                )
        except Exception:
            logger.exception("Error listing backups")

        return backups

    def purge_and_refresh(self) -> "BackupResult":
        """Delete all existing backups and create a fresh one.

        Used after a password change to replace old-key backups with a
        new backup encrypted under the current password. Old backups
        encrypted with a previous password are a security risk (NIST
        SP 800-57, OWASP A02) because they remain decryptable with the
        old (potentially compromised) password.

        Returns:
            BackupResult from the fresh backup creation
        """
        # Hold per-user lock for the entire purge+create operation to
        # prevent a concurrent backup from writing an old-key backup
        # between the purge and the fresh backup creation.
        with _get_user_lock(self.username):
            # Delete all existing backup files
            for info in self.list_backups():
                try:
                    Path(info["path"]).unlink()
                    logger.debug(f"Purged old-key backup: {info['filename']}")
                except OSError:
                    logger.warning(
                        f"Could not delete backup {info['filename']}"
                    )

            # Also clean up any stale .tmp files
            for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
                try:
                    tmp_file.unlink()
                except OSError:
                    logger.warning(
                        f"Could not delete stale tmp file {tmp_file.name}"
                    )

            # Create fresh backup with current password (lock already held)
            return self._create_backup_impl()

    def get_latest_backup(self) -> Optional[Path]:
        """Get the path to the most recent backup.

        Returns:
            Path to latest backup, or None if no backups exist
        """
        try:

            def _safe_mtime_latest(p: Path) -> float:
                # Files may disappear between glob and stat
                try:
                    return p.stat().st_mtime
                except FileNotFoundError:
                    return 0.0

            backups = [
                p
                for p in sorted(
                    self.backup_dir.glob("ldr_backup_*.db"),
                    key=_safe_mtime_latest,
                    reverse=True,
                )
                if p.exists()
            ]
            return backups[0] if backups else None
        except Exception:
            logger.exception("Error finding latest backup")
            return None