Coverage for src/local_deep_research/database/backup/backup_service.py: 89%

219 statements

coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

"""Core backup service for encrypted database backups.

Uses sqlcipher_export() for safe atomic backups that preserve encryption
and work correctly with WAL mode.
"""

6 

7import os 

8import shutil 

9import threading 

10from dataclasses import dataclass 

11from datetime import UTC, datetime, timedelta 

12from pathlib import Path 

13from typing import Optional 

14 

15from loguru import logger 

16 

17from ...utilities.resource_utils import safe_close 

18 

19from ...config.paths import ( 

20 get_encrypted_database_path, 

21 get_user_backup_directory, 

22 get_user_database_filename, 

23) 

24from ..sqlcipher_utils import ( 

25 apply_sqlcipher_pragmas, 

26 create_sqlcipher_connection, 

27 get_key_from_password, 

28 get_sqlcipher_settings, 

29 set_sqlcipher_key, 

30 verify_sqlcipher_connection, 

31) 

32 

33# Module-level per-user locks to prevent concurrent backup operations 

34# for the same user across different BackupService instances 

35_user_locks: dict[str, threading.Lock] = {} 

36_user_locks_lock = threading.Lock() 

37 

38 

39def _get_user_lock(username: str) -> threading.Lock: 

40 """Get or create a lock for a specific user. 

41 

42 Thread-safe lazy initialization of per-user locks. 

43 

44 Args: 

45 username: The username to get lock for 

46 

47 Returns: 

48 A threading.Lock for the specified user 

49 """ 

50 with _user_locks_lock: 

51 if username not in _user_locks: 

52 _user_locks[username] = threading.Lock() 

53 return _user_locks[username] 

54 

55 

@dataclass
class BackupResult:
    """Outcome of a single backup operation.

    Carries the success flag plus optional details: where the backup
    landed, how large it is, and why it failed (if it did).
    """

    # Whether the backup operation completed successfully.
    success: bool
    # Filesystem location of the finished backup; None on failure.
    backup_path: Optional[Path] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
    # Size of the backup file in bytes (0 when no file was produced).
    size_bytes: int = 0

64 

65 

class BackupService:
    """Service for creating and managing encrypted database backups.

    Uses sqlcipher_export() for safe backups that:
    - Work correctly with WAL mode
    - Preserve encryption with the same key
    - Create atomic copies via ATTACH + export + DETACH
    - Never corrupt the source database
    """

    def __init__(
        self,
        username: str,
        password: str,
        max_backups: int = 1,
        max_age_days: int = 7,
    ):
        """Initialize backup service.

        Args:
            username: User's username
            password: User's password (for encryption)
            max_backups: Maximum number of backup files to keep
            max_age_days: Delete backups older than this many days
        """
        self.username = username
        self.password = password
        self.max_backups = max_backups
        self.max_age_days = max_age_days

        # Resolve filesystem locations for the source database and backups
        self.db_filename = get_user_database_filename(username)
        self.db_path = get_encrypted_database_path() / self.db_filename
        self.backup_dir = get_user_backup_directory(username)

    @staticmethod
    def _safe_mtime(path: Path) -> float:
        """Return the file's mtime, or 0.0 if the file vanished.

        Used as a sort key so that a backup deleted by a concurrent
        process between glob and stat does not raise mid-sort.
        (Consolidates three identical local helpers that were previously
        duplicated in cleanup/list/latest methods.)
        """
        try:
            return path.stat().st_mtime
        except FileNotFoundError:
            return 0.0

    def _backups_newest_first(self) -> list[Path]:
        """Return all finalized backup files, newest first.

        Files that disappear between globbing and the final existence
        check are silently skipped.
        """
        return [
            p
            for p in sorted(
                self.backup_dir.glob("ldr_backup_*.db"),
                key=self._safe_mtime,
                reverse=True,
            )
            if p.exists()
        ]

    def create_backup(self, force: bool = False) -> BackupResult:
        """Create an encrypted backup of the user's database.

        Uses sqlcipher_export() to create a safe, atomic backup that inherits
        the encryption key from the source database. The backup is created
        with a .tmp suffix and atomically renamed to prevent race conditions
        with cleanup operations.

        By default, only one backup per calendar day is created to prevent
        a corrupted database from rapidly overwriting all good backups.
        Use force=True to bypass this check (used by pre-migration backups).

        This method is protected by a per-user lock to prevent concurrent
        backup operations for the same user.

        Args:
            force: If True, skip the daily limit check.

        Returns:
            BackupResult with success status and backup path
        """
        # Acquire per-user lock to prevent concurrent backup operations
        with _get_user_lock(self.username):
            # Skip if a backup already exists for today (unless forced)
            if not force:
                today = datetime.now(UTC).strftime("%Y%m%d")
                existing_today = list(
                    self.backup_dir.glob(f"ldr_backup_{today}_*.db")
                )
                if existing_today:
                    # Filenames embed the timestamp, so the max name is
                    # the most recent backup of the day.
                    latest = max(existing_today, key=lambda p: p.name)
                    logger.debug(
                        f"Backup already exists for today ({latest.name}), "
                        "skipping"
                    )
                    return BackupResult(
                        success=True,
                        backup_path=latest,
                        size_bytes=latest.stat().st_size
                        if latest.exists()
                        else 0,
                    )

            return self._create_backup_impl()

    def _create_backup_impl(self) -> BackupResult:
        """Internal implementation of backup creation (must be called with lock held)."""
        if not self.db_path.exists():
            return BackupResult(
                success=False,
                error=f"Database not found: {self.db_path}",
            )

        # Check available disk space
        try:
            db_size = self.db_path.stat().st_size
            free_space = shutil.disk_usage(self.backup_dir).free
            # Require at least 2x the database size as free space
            if free_space < db_size * 2:
                return BackupResult(
                    success=False,
                    error=f"Insufficient disk space. Need {db_size * 2} bytes, have {free_space}",
                )
        except OSError as e:
            # Fail closed - don't proceed with backup if we can't verify disk space
            logger.warning("Could not check disk space, skipping backup")
            return BackupResult(
                success=False,
                error=f"Could not verify disk space: {e}",
            )

        # Generate backup filename with timestamp
        # Use .tmp suffix during creation to prevent cleanup race conditions
        timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
        backup_filename = f"ldr_backup_{timestamp}.db"
        backup_path = self.backup_dir / backup_filename
        temp_path = self.backup_dir / f"ldr_backup_{timestamp}.db.tmp"

        try:
            # FIX: conn/cursor start as None and are closed in the finally
            # below. Previously the connection was opened *before* the
            # try whose finally called safe_close(), so a failure in
            # conn.cursor() or the busy_timeout PRAGMA leaked the
            # connection handle.
            conn = None
            cursor = None
            try:
                # Create connection to source database
                conn = create_sqlcipher_connection(
                    str(self.db_path), self.password
                )
                cursor = conn.cursor()

                # Set busy timeout so concurrent writers don't cause instant failure
                cursor.execute("PRAGMA busy_timeout = 10000")

                # Use sqlcipher_export() to create an encrypted backup
                # VACUUM INTO doesn't preserve encryption in SQLCipher
                # Security: validate temp_path doesn't contain SQL injection chars
                temp_path_str = str(temp_path)
                if "'" in temp_path_str or '"' in temp_path_str:
                    raise ValueError(
                        f"Invalid characters in backup path: {temp_path_str}"
                    )

                # Get the hex key for ATTACH (same key derivation as source)
                hex_key = get_key_from_password(
                    self.password, db_path=self.db_path
                ).hex()

                # Defensive: ensure hex_key is strictly hexadecimal
                if not hex_key or not all(
                    c in "0123456789abcdef" for c in hex_key
                ):
                    raise ValueError("Derived key is not valid hex")

                # Attach backup database with encryption (using temp path)
                # Note: ATTACH DATABASE does not support parameter binding
                # in SQLite/SQLCipher — f-string is required here.
                cursor.execute(
                    f"ATTACH DATABASE '{temp_path_str}' AS backup KEY \"x'{hex_key}'\""
                )

                try:
                    # Apply cipher settings to the backup database (must match source)
                    # Note: PRAGMA statements do not support parameter binding
                    # in SQLite — f-string is required. Values are validated
                    # upstream by get_sqlcipher_settings() against allow-lists.
                    settings = get_sqlcipher_settings()
                    page_size = int(settings["page_size"])
                    kdf_iter = int(settings["kdf_iterations"])
                    hmac_alg = str(settings["hmac_algorithm"])
                    cursor.execute(
                        f"PRAGMA backup.cipher_page_size = {page_size}"
                    )
                    cursor.execute(
                        f"PRAGMA backup.cipher_hmac_algorithm = {hmac_alg}"
                    )
                    cursor.execute(f"PRAGMA backup.kdf_iter = {kdf_iter}")

                    # Export all data to the backup database
                    cursor.execute("SELECT sqlcipher_export('backup')")
                finally:
                    # Always detach to release the backup file handle
                    try:
                        cursor.execute("DETACH DATABASE backup")
                    except Exception:
                        logger.warning(
                            "DETACH failed (connection will release on close)"
                        )
            finally:
                # Close whatever was actually opened (see FIX note above).
                if cursor is not None:
                    safe_close(cursor, "backup cursor")
                if conn is not None:
                    safe_close(conn, "backup connection")

            # Verify the backup is valid (still using temp path)
            if not self._verify_backup(temp_path):
                # Delete corrupted backup
                if temp_path.exists():
                    temp_path.unlink()
                return BackupResult(
                    success=False,
                    error="Backup verification failed - backup was corrupted",
                )

            # Set restrictive permissions (owner read/write only)
            # SECURITY: Backup files contain sensitive user data
            os.chmod(temp_path, 0o600)

            # Get backup size before rename
            backup_size = temp_path.stat().st_size

            # Atomic rename from .tmp to final .db
            # This ensures cleanup won't see/delete partially created backups
            temp_path.rename(backup_path)

            logger.info(
                f"Created backup for user: {backup_path.name} ({backup_size} bytes)"
            )

            # Cleanup old backups (safe now - new backup is finalized)
            self._cleanup_old_backups()

            return BackupResult(
                success=True,
                backup_path=backup_path,
                size_bytes=backup_size,
            )

        except Exception as e:
            logger.exception("Backup creation failed")
            # Clean up any partial backup (temp file)
            if temp_path.exists():
                try:
                    temp_path.unlink()
                except OSError:
                    pass
            # Also clean up final path in case rename partially succeeded
            if backup_path.exists():
                try:
                    backup_path.unlink()
                except OSError:
                    pass
            return BackupResult(
                success=False,
                error=str(e),
            )

    def _verify_backup(self, backup_path: Path) -> bool:
        """Verify that a backup file is valid and readable.

        Args:
            backup_path: Path to the backup file

        Returns:
            True if backup is valid, False otherwise
        """
        if not backup_path.exists():
            return False

        if backup_path.stat().st_size == 0:
            logger.warning("Backup file is empty (0 bytes)")
            return False

        # FIX: pre-initialize so the finally can close whatever was
        # opened; previously a failure in conn.cursor() leaked the
        # connection because the close path was only reached after both
        # handles existed.
        conn = None
        cursor = None
        try:
            # Import SQLCipher module
            from ..sqlcipher_compat import get_sqlcipher_module

            sqlcipher3 = get_sqlcipher_module()

            # Open the backup with the same password
            conn = sqlcipher3.connect(str(backup_path))
            cursor = conn.cursor()

            # Set encryption key using the SOURCE database's salt
            # (backup was encrypted with the source DB's per-database salt)
            set_sqlcipher_key(cursor, self.password, db_path=self.db_path)
            apply_sqlcipher_pragmas(cursor, creation_mode=False)

            # Run quick integrity check
            cursor.execute("PRAGMA quick_check")
            result = cursor.fetchone()

            if result and result[0] == "ok":
                # Additional verification: try to read a table
                if verify_sqlcipher_connection(cursor):
                    return True

            logger.warning(f"Backup integrity check failed: {result}")
            return False

        except Exception:
            logger.warning("Backup verification failed")
            return False
        finally:
            if cursor is not None:
                safe_close(cursor, "backup cursor")
            if conn is not None:
                safe_close(conn, "backup connection")

    def _cleanup_old_backups(self) -> int:
        """Remove old backups based on age and count limits.

        Also cleans up stale .tmp files from interrupted backups.

        Returns:
            Number of backups deleted
        """
        deleted_count = 0
        cutoff_time = datetime.now(UTC) - timedelta(days=self.max_age_days)
        stale_tmp_cutoff = datetime.now(UTC) - timedelta(hours=1)

        try:
            # Clean up stale .tmp files from interrupted/crashed backups
            for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
                try:
                    mtime = datetime.fromtimestamp(
                        tmp_file.stat().st_mtime, tz=UTC
                    )
                    if mtime < stale_tmp_cutoff:
                        tmp_file.unlink()
                        logger.info(
                            f"Cleaned up stale temp file: {tmp_file.name}"
                        )
                except (OSError, FileNotFoundError):
                    pass

            # All backup files sorted by modification time (newest first)
            backups = self._backups_newest_first()

            for i, backup in enumerate(backups):
                should_delete = False

                # Delete if beyond max count
                if i >= self.max_backups:
                    should_delete = True
                    reason = f"exceeds max count ({self.max_backups})"

                # Delete if too old
                else:
                    try:
                        mtime = datetime.fromtimestamp(
                            backup.stat().st_mtime, tz=UTC
                        )
                        if mtime < cutoff_time:
                            should_delete = True
                            reason = f"older than {self.max_age_days} days"
                    except FileNotFoundError:
                        # Vanished since listing - nothing to delete
                        continue

                if should_delete:
                    try:
                        backup.unlink()
                        deleted_count += 1
                        logger.debug(
                            f"Deleted old backup {backup.name}: {reason}"
                        )
                    except OSError:
                        logger.warning(f"Could not delete backup {backup.name}")

        except Exception:
            logger.exception("Error during backup cleanup")

        if deleted_count > 0:
            logger.info(f"Cleaned up {deleted_count} old backups")

        return deleted_count

    def list_backups(self) -> list[dict]:
        """List all backups for this user.

        Returns:
            List of backup info dictionaries with path, size, and timestamp
        """
        backups = []

        try:
            for backup_file in sorted(
                self.backup_dir.glob("ldr_backup_*.db"),
                key=self._safe_mtime,
                reverse=True,
            ):
                try:
                    stat = backup_file.stat()
                except FileNotFoundError:
                    # Deleted between glob and stat - skip it
                    continue
                backups.append(
                    {
                        "filename": backup_file.name,
                        "path": str(backup_file),
                        "size_bytes": stat.st_size,
                        "created_at": datetime.fromtimestamp(
                            stat.st_mtime, tz=UTC
                        ).isoformat(),
                    }
                )
        except Exception:
            logger.exception("Error listing backups")

        return backups

    def purge_and_refresh(self) -> "BackupResult":
        """Delete all existing backups and create a fresh one.

        Used after a password change to replace old-key backups with a
        new backup encrypted under the current password. Old backups
        encrypted with a previous password are a security risk (NIST
        SP 800-57, OWASP A02) because they remain decryptable with the
        old (potentially compromised) password.

        Returns:
            BackupResult from the fresh backup creation
        """
        # Hold per-user lock for the entire purge+create operation to
        # prevent a concurrent backup from writing an old-key backup
        # between the purge and the fresh backup creation.
        with _get_user_lock(self.username):
            # Delete all existing backup files
            for info in self.list_backups():
                try:
                    Path(info["path"]).unlink()
                    logger.debug(f"Purged old-key backup: {info['filename']}")
                except OSError:
                    logger.warning(
                        f"Could not delete backup {info['filename']}"
                    )

            # Also clean up any stale .tmp files
            for tmp_file in self.backup_dir.glob("ldr_backup_*.db.tmp"):
                try:
                    tmp_file.unlink()
                except OSError:
                    logger.warning(
                        f"Could not delete stale tmp file {tmp_file.name}"
                    )

            # Create fresh backup with current password (lock already held)
            return self._create_backup_impl()

    def get_latest_backup(self) -> Optional[Path]:
        """Get the path to the most recent backup.

        Returns:
            Path to latest backup, or None if no backups exist
        """
        try:
            backups = self._backups_newest_first()
            return backups[0] if backups else None
        except Exception:
            logger.exception("Error finding latest backup")
            return None