Coverage for src/local_deep_research/database/backup/backup_service.py: 89%

230 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""Core backup service for encrypted database backups. 

2 

3Uses sqlcipher_export() for safe atomic backups that preserve encryption 

4and work correctly with WAL mode. 

5""" 

6 

7import os 

8import shutil 

9import threading 

10import time 

11from dataclasses import dataclass 

12from datetime import UTC, datetime, timedelta 

13from pathlib import Path 

14from typing import Optional 

15 

16from loguru import logger 

17 

18from ...utilities.resource_utils import safe_close 

19 

20from ...config.paths import ( 

21 get_encrypted_database_path, 

22 get_user_backup_directory, 

23 get_user_database_filename, 

24) 

25from ..sqlcipher_utils import ( 

26 apply_sqlcipher_pragmas, 

27 create_sqlcipher_connection, 

28 get_key_from_password, 

29 get_sqlcipher_settings, 

30 set_sqlcipher_key, 

31 verify_sqlcipher_connection, 

32) 

33 

# Module-level registry of per-user locks. It is shared by every
# BackupService instance in the process so that two instances created for
# the same username serialize their backup operations.
_user_locks: dict[str, threading.Lock] = {}

# Guards every read and write of ``_user_locks`` itself.
_user_locks_lock = threading.Lock()

38 

39 

def _get_user_lock(username: str) -> threading.Lock:
    """Get or create the backup lock for a specific user.

    The lookup and the lazy creation both happen while holding
    ``_user_locks_lock``, so concurrent callers asking for the same
    username receive the same lock object.

    Args:
        username: The username to get the lock for.

    Returns:
        The ``threading.Lock`` registered for ``username``.
    """
    with _user_locks_lock:
        lock = _user_locks.get(username)
        if lock is None:
            # First request for this user: register a fresh lock.
            lock = threading.Lock()
            _user_locks[username] = lock
        return lock

55 

56 

def pop_user_lock(username: str) -> None:
    """Remove the per-user backup lock for ``username`` from the registry.

    Called from the user-close path so the module-level dict doesn't
    accumulate one entry per username across the process lifetime. The
    next backup operation lazily re-creates the lock if needed — the
    lock has no state that needs to persist across login/logout.

    Removing a username that has no registered lock is a no-op.

    Args:
        username: The username whose lock entry should be dropped.
    """
    with _user_locks_lock:
        _user_locks.pop(username, None)

67 

68 

@dataclass
class BackupResult:
    """Result of a backup operation."""

    # True when the operation succeeded (or was skipped because a backup
    # for today already exists).
    success: bool
    # Location of the backup file; None when no backup is available.
    backup_path: Optional[Path] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
    # Size of the backup file in bytes; 0 when unknown.
    size_bytes: int = 0

77 

78 

79class BackupService: 

80 """Service for creating and managing encrypted database backups. 

81 

82 Uses sqlcipher_export() for safe backups that: 

83 - Work correctly with WAL mode 

84 - Preserve encryption with the same key 

85 - Create atomic copies via ATTACH + export + DETACH 

86 - Never corrupt the source database 

87 """ 

88 

89 def __init__( 

90 self, 

91 username: str, 

92 password: str, 

93 max_backups: int = 1, 

94 max_age_days: int = 7, 

95 ): 

96 """Initialize backup service. 

97 

98 Args: 

99 username: User's username 

100 password: User's password (for encryption) 

101 max_backups: Maximum number of backup files to keep 

102 max_age_days: Delete backups older than this many days 

103 """ 

104 self.username = username 

105 self.password = password 

106 self.max_backups = max_backups 

107 self.max_age_days = max_age_days 

108 

109 # Get paths 

110 self.db_filename = get_user_database_filename(username) 

111 self.db_path = get_encrypted_database_path() / self.db_filename 

112 self.backup_dir = get_user_backup_directory(username) 

113 

114 def create_backup(self, force: bool = False) -> BackupResult: 

115 """Create an encrypted backup of the user's database. 

116 

117 Uses sqlcipher_export() to create a safe, atomic backup that inherits 

118 the encryption key from the source database. The backup is created 

119 with a .tmp suffix and atomically renamed to prevent race conditions 

120 with cleanup operations. 

121 

122 By default, only one backup per calendar day is created to prevent 

123 a corrupted database from rapidly overwriting all good backups. 

124 Use force=True to bypass this check (used by pre-migration backups). 

125 

126 This method is protected by a per-user lock to prevent concurrent 

127 backup operations for the same user. 

128 

129 Args: 

130 force: If True, skip the daily limit check. 

131 

132 Returns: 

133 BackupResult with success status and backup path 

134 """ 

135 # Acquire per-user lock to prevent concurrent backup operations 

136 with _get_user_lock(self.username): 

137 # Skip if a backup already exists for today (unless forced) 

138 if not force: 

139 today = datetime.now(UTC).strftime("%Y%m%d") 

140 existing_today = list( 

141 self.backup_dir.glob(f"ldr_backup_{today}_*.db") 

142 ) 

143 if existing_today: 

144 latest = max(existing_today, key=lambda p: p.name) 

145 logger.debug( 

146 f"Backup already exists for today ({latest.name}), " 

147 "skipping" 

148 ) 

149 return BackupResult( 

150 success=True, 

151 backup_path=latest, 

152 size_bytes=latest.stat().st_size 

153 if latest.exists() 

154 else 0, 

155 ) 

156 

157 start = time.perf_counter() 

158 result = self._create_backup_impl() 

159 elapsed_ms = (time.perf_counter() - start) * 1000 

160 size_info = ( 

161 f"{result.size_bytes / (1024 * 1024):.1f}MB" 

162 if result.size_bytes 

163 else "unknown size" 

164 ) 

165 if elapsed_ms > 1000: 

166 logger.info( 

167 f"Backup for user {self.username} " 

168 f"({size_info}) took {elapsed_ms:.0f}ms" 

169 ) 

170 else: 

171 logger.debug( 

172 f"Backup for user {self.username} " 

173 f"({size_info}) took {elapsed_ms:.0f}ms" 

174 ) 

175 return result 

176 

177 def _create_backup_impl(self) -> BackupResult: 

178 """Internal implementation of backup creation (must be called with lock held).""" 

179 if not self.db_path.exists(): 

180 return BackupResult( 

181 success=False, 

182 error=f"Database not found: {self.db_path}", 

183 ) 

184 

185 # Check available disk space 

186 try: 

187 db_size = self.db_path.stat().st_size 

188 free_space = shutil.disk_usage(self.backup_dir).free 

189 # Require at least 2x the database size as free space 

190 if free_space < db_size * 2: 

191 return BackupResult( 

192 success=False, 

193 error=f"Insufficient disk space. Need {db_size * 2} bytes, have {free_space}", 

194 ) 

195 except OSError as e: 

196 # Fail closed - don't proceed with backup if we can't verify disk space 

197 logger.warning("Could not check disk space, skipping backup") 

198 return BackupResult( 

199 success=False, 

200 error=f"Could not verify disk space: {e}", 

201 ) 

202 

203 # Generate backup filename with timestamp 

204 # Use .tmp suffix during creation to prevent cleanup race conditions 

205 timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") 

206 backup_filename = f"ldr_backup_{timestamp}.db" 

207 backup_path = self.backup_dir / backup_filename 

208 temp_path = self.backup_dir / f"ldr_backup_{timestamp}.db.tmp" 

209 

210 try: 

211 # Create connection to source database 

212 conn = create_sqlcipher_connection(str(self.db_path), self.password) 

213 cursor = conn.cursor() 

214 

215 # Set busy timeout so concurrent writers don't cause instant failure 

216 cursor.execute("PRAGMA busy_timeout = 10000") 

217 

218 try: 

219 # Use sqlcipher_export() to create an encrypted backup 

220 # VACUUM INTO doesn't preserve encryption in SQLCipher 

221 # Security: validate temp_path doesn't contain SQL injection chars 

222 temp_path_str = str(temp_path) 

223 if "'" in temp_path_str or '"' in temp_path_str: 

224 raise ValueError( 

225 f"Invalid characters in backup path: {temp_path_str}" 

226 ) 

227 

228 # Get the hex key for ATTACH (same key derivation as source) 

229 hex_key = get_key_from_password( 

230 self.password, db_path=self.db_path 

231 ).hex() 

232 

233 # Defensive: ensure hex_key is strictly hexadecimal 

234 if not hex_key or not all( 234 ↛ 237line 234 didn't jump to line 237 because the condition on line 234 was never true

235 c in "0123456789abcdef" for c in hex_key 

236 ): 

237 raise ValueError("Derived key is not valid hex") 

238 

239 # Attach backup database with encryption (using temp path) 

240 # Note: ATTACH DATABASE does not support parameter binding 

241 # in SQLite/SQLCipher — f-string is required here. 

242 cursor.execute( 

243 f"ATTACH DATABASE '{temp_path_str}' AS backup KEY \"x'{hex_key}'\"" 

244 ) 

245 

246 try: 

247 # Apply cipher settings to the backup database (must match source) 

248 # Note: PRAGMA statements do not support parameter binding 

249 # in SQLite — f-string is required. Values are validated 

250 # upstream by get_sqlcipher_settings() against allow-lists. 

251 settings = get_sqlcipher_settings() 

252 page_size = int(settings["page_size"]) 

253 kdf_iter = int(settings["kdf_iterations"]) 

254 hmac_alg = str(settings["hmac_algorithm"]) 

255 cursor.execute( 

256 f"PRAGMA backup.cipher_page_size = {page_size}" 

257 ) 

258 cursor.execute( 

259 f"PRAGMA backup.cipher_hmac_algorithm = {hmac_alg}" 

260 ) 

261 cursor.execute(f"PRAGMA backup.kdf_iter = {kdf_iter}") 

262 

263 # Export all data to the backup database 

264 cursor.execute("SELECT sqlcipher_export('backup')") 

265 finally: 

266 # Always detach to release the backup file handle 

267 try: 

268 cursor.execute("DETACH DATABASE backup") 

269 except Exception: 

270 logger.warning( 

271 "DETACH failed (connection will release on close)" 

272 ) 

273 finally: 

274 safe_close(cursor, "backup cursor") 

275 safe_close(conn, "backup connection") 

276 

277 # Verify the backup is valid (still using temp path) 

278 if not self._verify_backup(temp_path): 

279 # Delete corrupted backup 

280 if temp_path.exists(): 

281 temp_path.unlink() 

282 return BackupResult( 

283 success=False, 

284 error="Backup verification failed - backup was corrupted", 

285 ) 

286 

287 # Set restrictive permissions (owner read/write only) 

288 # SECURITY: Backup files contain sensitive user data 

289 os.chmod(temp_path, 0o600) 

290 

291 # Get backup size before rename 

292 backup_size = temp_path.stat().st_size 

293 

294 # Atomic rename from .tmp to final .db 

295 # This ensures cleanup won't see/delete partially created backups 

296 temp_path.rename(backup_path) 

297 

298 logger.info( 

299 f"Created backup for user: {backup_path.name} ({backup_size} bytes)" 

300 ) 

301 

302 # Cleanup old backups (safe now - new backup is finalized) 

303 self._cleanup_old_backups() 

304 

305 return BackupResult( 

306 success=True, 

307 backup_path=backup_path, 

308 size_bytes=backup_size, 

309 ) 

310 

311 except Exception as e: 

312 logger.exception("Backup creation failed") 

313 # Clean up any partial backup (temp file) 

314 if temp_path.exists(): 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true

315 try: 

316 temp_path.unlink() 

317 except OSError: 

318 pass 

319 # Also clean up final path in case rename partially succeeded 

320 if backup_path.exists(): 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true

321 try: 

322 backup_path.unlink() 

323 except OSError: 

324 pass 

325 return BackupResult( 

326 success=False, 

327 error=str(e), 

328 ) 

329 

330 def _verify_backup(self, backup_path: Path) -> bool: 

331 """Verify that a backup file is valid and readable. 

332 

333 Args: 

334 backup_path: Path to the backup file 

335 

336 Returns: 

337 True if backup is valid, False otherwise 

338 """ 

339 if not backup_path.exists(): 

340 return False 

341 

342 if backup_path.stat().st_size == 0: 

343 logger.warning("Backup file is empty (0 bytes)") 

344 return False 

345 

346 try: 

347 # Import SQLCipher module 

348 from ..sqlcipher_compat import get_sqlcipher_module 

349 

350 sqlcipher3 = get_sqlcipher_module() 

351 

352 # Open the backup with the same password 

353 conn = sqlcipher3.connect(str(backup_path)) 

354 cursor = conn.cursor() 

355 

356 try: 

357 # Set encryption key using the SOURCE database's salt 

358 # (backup was encrypted with the source DB's per-database salt) 

359 set_sqlcipher_key(cursor, self.password, db_path=self.db_path) 

360 apply_sqlcipher_pragmas(cursor, creation_mode=False) 

361 

362 # Run quick integrity check 

363 cursor.execute("PRAGMA quick_check") 

364 result = cursor.fetchone() 

365 

366 if result and result[0] == "ok": 

367 # Additional verification: try to read a table 

368 if verify_sqlcipher_connection(cursor): 368 ↛ 371line 368 didn't jump to line 371 because the condition on line 368 was always true

369 return True 

370 

371 logger.warning(f"Backup integrity check failed: {result}") 

372 return False 

373 

374 finally: 

375 safe_close(cursor, "backup cursor") 

376 safe_close(conn, "backup connection") 

377 

378 except Exception: 

379 logger.warning("Backup verification failed") 

380 return False 

381 

382 def _cleanup_old_backups(self) -> int: 

383 """Remove old backups based on age and count limits. 

384 

385 Also cleans up stale .tmp files from interrupted backups. 

386 

387 Returns: 

388 Number of backups deleted 

389 """ 

390 deleted_count = 0 

391 cutoff_time = datetime.now(UTC) - timedelta(days=self.max_age_days) 

392 stale_tmp_cutoff = datetime.now(UTC) - timedelta(hours=1) 

393 

394 try: 

395 # Clean up stale .tmp files from interrupted/crashed backups 

396 for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"): 

397 try: 

398 mtime = datetime.fromtimestamp( 

399 tmp_file.stat().st_mtime, tz=UTC 

400 ) 

401 if mtime < stale_tmp_cutoff: 

402 tmp_file.unlink() 

403 logger.info( 

404 f"Cleaned up stale temp file: {tmp_file.name}" 

405 ) 

406 except (OSError, FileNotFoundError): 

407 pass 

408 

409 # Get all backup files sorted by modification time (newest first) 

410 def _safe_mtime(p: Path) -> float: 

411 try: 

412 return p.stat().st_mtime 

413 except FileNotFoundError: 

414 return 0.0 

415 

416 backups = [ 

417 p 

418 for p in sorted( 

419 self.backup_dir.glob("ldr_backup_*.db"), 

420 key=_safe_mtime, 

421 reverse=True, 

422 ) 

423 if p.exists() 

424 ] 

425 

426 for i, backup in enumerate(backups): 

427 should_delete = False 

428 

429 # Delete if beyond max count 

430 if i >= self.max_backups: 

431 should_delete = True 

432 reason = f"exceeds max count ({self.max_backups})" 

433 

434 # Delete if too old 

435 else: 

436 try: 

437 mtime = datetime.fromtimestamp( 

438 backup.stat().st_mtime, tz=UTC 

439 ) 

440 if mtime < cutoff_time: 

441 should_delete = True 

442 reason = f"older than {self.max_age_days} days" 

443 except FileNotFoundError: 

444 continue 

445 

446 if should_delete: 

447 try: 

448 backup.unlink() 

449 deleted_count += 1 

450 logger.debug( 

451 f"Deleted old backup {backup.name}: {reason}" 

452 ) 

453 except OSError: 

454 logger.warning(f"Could not delete backup {backup.name}") 

455 

456 except Exception: 

457 logger.exception("Error during backup cleanup") 

458 

459 if deleted_count > 0: 

460 logger.info(f"Cleaned up {deleted_count} old backups") 

461 

462 return deleted_count 

463 

464 def list_backups(self) -> list[dict]: 

465 """List all backups for this user. 

466 

467 Returns: 

468 List of backup info dictionaries with path, size, and timestamp 

469 """ 

470 backups = [] 

471 

472 try: 

473 

474 def _safe_mtime_list(p: Path) -> float: 

475 try: 

476 return p.stat().st_mtime 

477 except FileNotFoundError: 

478 return 0.0 

479 

480 for backup_file in sorted( 

481 self.backup_dir.glob("ldr_backup_*.db"), 

482 key=_safe_mtime_list, 

483 reverse=True, 

484 ): 

485 try: 

486 stat = backup_file.stat() 

487 except FileNotFoundError: 

488 continue 

489 backups.append( 

490 { 

491 "filename": backup_file.name, 

492 "path": str(backup_file), 

493 "size_bytes": stat.st_size, 

494 "created_at": datetime.fromtimestamp( 

495 stat.st_mtime, tz=UTC 

496 ).isoformat(), 

497 } 

498 ) 

499 except Exception: 

500 logger.exception("Error listing backups") 

501 

502 return backups 

503 

504 def purge_and_refresh(self) -> "BackupResult": 

505 """Delete all existing backups and create a fresh one. 

506 

507 Used after a password change to replace old-key backups with a 

508 new backup encrypted under the current password. Old backups 

509 encrypted with a previous password are a security risk (NIST 

510 SP 800-57, OWASP A02) because they remain decryptable with the 

511 old (potentially compromised) password. 

512 

513 Returns: 

514 BackupResult from the fresh backup creation 

515 """ 

516 # Hold per-user lock for the entire purge+create operation to 

517 # prevent a concurrent backup from writing an old-key backup 

518 # between the purge and the fresh backup creation. 

519 with _get_user_lock(self.username): 

520 # Delete all existing backup files 

521 for info in self.list_backups(): 

522 try: 

523 Path(info["path"]).unlink() 

524 logger.debug(f"Purged old-key backup: {info['filename']}") 

525 except OSError: 

526 logger.warning( 

527 f"Could not delete backup {info['filename']}" 

528 ) 

529 

530 # Also clean up any stale .tmp files 

531 for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"): 

532 try: 

533 tmp_file.unlink() 

534 except OSError: 

535 logger.warning( 

536 f"Could not delete stale tmp file {tmp_file.name}" 

537 ) 

538 

539 # Create fresh backup with current password (lock already held) 

540 return self._create_backup_impl() 

541 

542 def get_latest_backup(self) -> Optional[Path]: 

543 """Get the path to the most recent backup. 

544 

545 Returns: 

546 Path to latest backup, or None if no backups exist 

547 """ 

548 try: 

549 

550 def _safe_mtime_latest(p: Path) -> float: 

551 try: 

552 return p.stat().st_mtime 

553 except FileNotFoundError: 

554 return 0.0 

555 

556 backups = [ 

557 p 

558 for p in sorted( 

559 self.backup_dir.glob("ldr_backup_*.db"), 

560 key=_safe_mtime_latest, 

561 reverse=True, 

562 ) 

563 if p.exists() 

564 ] 

565 return backups[0] if backups else None 

566 except Exception: 

567 logger.exception("Error finding latest backup") 

568 return None