Coverage for src/local_deep_research/database/sqlcipher_utils.py: 97%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2SQLCipher utility functions for consistent database operations. 

3 

4This module centralizes all SQLCipher-specific operations to ensure 

5consistent password handling and PRAGMA settings across the codebase. 

6""" 

7 

8import os 

9import secrets 

10import threading 

11import time 

12from hashlib import pbkdf2_hmac 

13from pathlib import Path 

14from typing import Any, Dict, Optional, Union 

15 

16from loguru import logger 

17 

18from ..settings.env_registry import get_env_setting 

19 

# Guards the cipher_default_* global state while a new database is created.
_cipher_default_lock = threading.Lock()

# Naming and size of the per-database salt file
SALT_FILE_SUFFIX = ".salt"
SALT_SIZE = 32  # bytes (256 bits)

26 

27 

def get_salt_file_path(db_path: Union[str, Path]) -> Path:
    """
    Return the location of the salt file that accompanies a database.

    The salt file sits next to the database and carries the database's
    own suffix followed by SALT_FILE_SUFFIX.

    Args:
        db_path: Path to the database file

    Returns:
        Path to the corresponding salt file
    """
    database = Path(db_path)
    combined_suffix = database.suffix + SALT_FILE_SUFFIX
    return database.with_suffix(combined_suffix)

39 

40 

def get_salt_for_database(db_path: Union[str, Path]) -> bytes:
    """
    Look up the key-derivation salt that belongs to a database file.

    v2+ databases keep a random salt in a salt file beside the database.
    When that file is missing the database is treated as legacy (v1) and
    LEGACY_PBKDF2_SALT is returned, with a deprecation warning logged.

    Args:
        db_path: Path to the database file

    Returns:
        The salt bytes to use for key derivation

    Raises:
        ValueError: If the salt file exists but is not SALT_SIZE bytes long
    """
    salt_path = get_salt_file_path(db_path)

    try:
        stored_salt = salt_path.read_bytes()
    except FileNotFoundError:
        # v1 database: no salt file was ever written for it
        db_name = Path(db_path).name
        logger.warning(
            f"Database '{db_name}' uses the legacy shared salt "
            f"(deprecated). Consider creating a new database to benefit from "
            f"per-database salt security. See issue #1439 for migration details."
        )
        return LEGACY_PBKDF2_SALT

    # v2 database: accept the stored salt only if it has the expected length
    if len(stored_salt) == SALT_SIZE:
        return stored_salt

    raise ValueError(
        f"Salt file {salt_path} has unexpected size ({len(stored_salt)} bytes), "
        f"expected {SALT_SIZE}. The salt file may be corrupted."
    )

74 

75 

def create_database_salt(db_path: Union[str, Path]) -> bytes:
    """
    Create and store a new random salt for a database.

    This should be called when creating a new database.
    The salt is stored in a .salt file alongside the database.

    WARNING: If this salt file is deleted, the associated database becomes
    permanently unreadable. Always back up .salt files alongside their .db files.

    Args:
        db_path: Path to the database file

    Returns:
        The newly generated salt bytes

    Raises:
        FileExistsError: If a salt file already exists for this database
        OSError: If the salt could not be written completely
    """
    salt_file = get_salt_file_path(db_path)

    if salt_file.exists():
        raise FileExistsError(
            f"Salt file already exists: {salt_file}. "
            f"Refusing to overwrite to prevent data loss."
        )

    salt = secrets.token_bytes(SALT_SIZE)

    # Ensure parent directory exists
    salt_file.parent.mkdir(parents=True, exist_ok=True)

    # O_EXCL makes creation fail if another writer won the race after the
    # exists() check above. O_BINARY only exists on Windows, where file
    # descriptors default to text mode and os.write() would expand every
    # 0x0A byte of the random salt to CRLF, producing a salt file of the
    # wrong size that can never be read back.
    open_flags = (
        os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
    )

    # Owner-only permissions (0o600)
    fd = os.open(str(salt_file), open_flags, 0o600)
    fully_written = False
    try:
        fully_written = os.write(fd, salt) == len(salt)
    finally:
        os.close(fd)
        if not fully_written:
            # This call created the file (O_EXCL), so removing the partial
            # file cannot destroy anyone else's salt. Leaving it behind
            # would block every later creation attempt.
            try:
                os.unlink(str(salt_file))
            except OSError:
                # Best effort only: the original failure is what matters.
                pass

    if not fully_written:
        raise OSError(f"Short write while creating salt file: {salt_file}")

    logger.info(f"Created new database salt file: {salt_file}")
    return salt

117 

118 

def has_per_database_salt(db_path: Union[str, Path]) -> bool:
    """
    Tell whether a per-database salt file (v2) exists for a database.

    Args:
        db_path: Path to the database file

    Returns:
        True if the salt file for this database exists, False otherwise
    """
    salt_path = get_salt_file_path(db_path)
    return salt_path.exists()

130 

131 

def _get_key_from_password(
    password: str, salt: bytes, kdf_iterations: int
) -> bytes:
    """
    Derive the database encryption key with PBKDF2-HMAC-SHA512.

    The derivation time is logged at INFO level when it exceeds 500 ms
    and at DEBUG level otherwise.

    Args:
        password: The password.
        salt: The salt bytes to use for key derivation.
        kdf_iterations: Number of PBKDF2 iterations.

    Returns:
        The generated key.
    """
    logger.debug(
        f"Generating DB encryption key with {kdf_iterations} iterations..."
    )

    started_at = time.perf_counter()
    derived_key = pbkdf2_hmac("sha512", password.encode(), salt, kdf_iterations)
    elapsed_ms = (time.perf_counter() - started_at) * 1000

    timing_message = (
        f"PBKDF2 key derivation took {elapsed_ms:.0f}ms "
        f"({kdf_iterations} iterations)"
    )
    # Slow derivations are surfaced at INFO so they show up in normal logs
    if elapsed_ms > 500:
        logger.info(timing_message)
    else:
        logger.debug(timing_message)

    return derived_key

170 

171 

def get_key_from_password(
    password: str, db_path: Optional[Union[str, Path]] = None
) -> bytes:
    """
    Resolve the salt and KDF settings, then derive the encryption key.

    Args:
        password: The password.
        db_path: Optional path to the database file. If provided, the salt
            is looked up for that database. If not provided, the legacy
            salt is used.

    Returns:
        The derived encryption key bytes.
    """
    salt = (
        LEGACY_PBKDF2_SALT
        if db_path is None
        else get_salt_for_database(db_path)
    )
    kdf_iterations = get_sqlcipher_settings()["kdf_iterations"]
    return _get_key_from_password(password, salt, kdf_iterations)

193 

194 

def set_sqlcipher_key(
    cursor_or_conn: Any,
    password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Derive the key from a password and issue PRAGMA key in raw-hex form.

    Passing the key as a hex literal avoids SQL injection and escaping
    issues with special characters in the password.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    derived_key = get_key_from_password(password, db_path=db_path)  # gitleaks:allow
    key_hex = derived_key.hex()
    cursor_or_conn.execute(f"PRAGMA key = \"x'{key_hex}'\"")

213 

214 

def set_sqlcipher_key_from_hex(cursor_or_conn: Any, hex_key: str) -> None:
    """
    Set the SQLCipher encryption key from a pre-derived hex key string.

    Used by connection closures to avoid capturing plaintext passwords.

    The key is interpolated into the PRAGMA statement, so it is validated
    first: anything other than hexadecimal digits could break out of the
    quoted literal.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        hex_key: Pre-derived hex key string (from get_key_from_password().hex())

    Raises:
        ValueError: If hex_key is empty or contains non-hexadecimal characters
    """
    hex_digits = "0123456789abcdefABCDEF"
    if not hex_key or any(ch not in hex_digits for ch in hex_key):
        # Deliberately omit the offending value: it is key material.
        raise ValueError("hex_key must be a non-empty hexadecimal string")

    cursor_or_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"")  # gitleaks:allow

226 

227 

def set_sqlcipher_rekey(
    cursor_or_conn: Any,
    new_password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Re-encrypt the database under a key derived from a new password.

    The key goes through the same derivation as set_sqlcipher_key(), so a
    database re-keyed here can be reopened with the new password.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        new_password: The new password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    derived_key = get_key_from_password(new_password, db_path=db_path)  # gitleaks:allow

    # A hex string only contains [0-9a-f], so interpolating it is safe
    rekey_statement = f"PRAGMA rekey = \"x'{derived_key.hex()}'\""

    try:
        # First attempt: wrap in text() as a SQLAlchemy connection expects
        from sqlalchemy import text

        wrapped_statement = text(rekey_statement)
        cursor_or_conn.execute(wrapped_statement)
    except TypeError:
        # Raw SQLCipher objects reject the wrapper and take the plain string
        cursor_or_conn.execute(rekey_statement)

259 

260 

# SQLCipher defaults, used whenever no override is configured
DEFAULT_KDF_ITERATIONS = 256000
DEFAULT_PAGE_SIZE = 16384  # 16KB pages for maximum performance with caching
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

# Accepted page sizes: the powers of two inside SQLite's supported range.
# IntegerSetting only enforces min/max, not power-of-two, so values are
# additionally checked against this set.
VALID_PAGE_SIZES = frozenset({512, 1024, 2048, 4096, 8192, 16384, 32768, 65536})
MAX_KDF_ITERATIONS = 1_000_000

# Lower bounds for KDF iterations. The testing bound replaces the production
# bound only when _get_min_kdf_iterations() detects a test environment.
MIN_KDF_ITERATIONS_PRODUCTION = 100_000
MIN_KDF_ITERATIONS_TESTING = 1

276 

277 

def _get_min_kdf_iterations() -> int:
    """Return the lowest KDF iteration count that is currently acceptable.

    The testing minimum applies only when PYTEST_CURRENT_TEST (set
    automatically by pytest) or LDR_TEST_MODE (project-specific) holds a
    non-empty value. Generic variables such as CI or TESTING are
    deliberately ignored so that Docker/CD pipelines exporting CI=true do
    not weaken production encryption by accident.
    """
    test_markers = ("PYTEST_CURRENT_TEST", "LDR_TEST_MODE")
    if any(os.environ.get(marker) for marker in test_markers):
        return MIN_KDF_ITERATIONS_TESTING
    return MIN_KDF_ITERATIONS_PRODUCTION

294 

295 

# Shared salt used by databases created before v2; newer databases get a
# random per-database salt stored in a .salt file instead.
# WARNING: Do NOT change this value - it would break all existing legacy databases!
LEGACY_PBKDF2_SALT = b"no salt"

# Old name, kept so existing references keep working
PBKDF2_PLACEHOLDER_SALT = LEGACY_PBKDF2_SALT

303 

304 

def get_sqlcipher_settings() -> dict:
    """
    Collect the effective SQLCipher configuration.

    These settings cannot be changed after database creation, so they
    are configured through environment variables only. Values are read
    via the env settings registry, which handles canonical env var names
    (LDR_DB_CONFIG_*) with automatic fallback to deprecated names
    (LDR_DB_*) and deprecation warnings.

    An out-of-range page size or KDF iteration count is replaced by its
    default after logging a warning.

    Returns:
        Dictionary with the keys "kdf_iterations", "page_size",
        "hmac_algorithm" and "kdf_algorithm"
    """
    # Algorithms: the registry validates these against allowed values
    hmac_algorithm = get_env_setting(
        "db_config.hmac_algorithm", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = get_env_setting(
        "db_config.kdf_algorithm", DEFAULT_KDF_ALGORITHM
    )

    # Page size: the registry validates the range; power-of-2 is checked here
    page_size = get_env_setting("db_config.page_size", DEFAULT_PAGE_SIZE)
    if page_size not in VALID_PAGE_SIZES:
        logger.warning(
            f"Invalid page_size value '{page_size}', using default "
            f"'{DEFAULT_PAGE_SIZE}'. Valid values: {sorted(VALID_PAGE_SIZES)}"
        )
        page_size = DEFAULT_PAGE_SIZE

    # KDF iterations: registry validates the basic range, then the
    # environment-dependent minimum is enforced
    kdf_iterations = get_env_setting(
        "db_config.kdf_iterations", DEFAULT_KDF_ITERATIONS
    )
    min_kdf = _get_min_kdf_iterations()
    within_safe_range = min_kdf <= kdf_iterations <= MAX_KDF_ITERATIONS
    if not within_safe_range:
        logger.warning(
            f"KDF iterations value '{kdf_iterations}' outside safe range "
            f"[{min_kdf}, {MAX_KDF_ITERATIONS}], using default "
            f"'{DEFAULT_KDF_ITERATIONS}'."
        )
        kdf_iterations = DEFAULT_KDF_ITERATIONS

    return dict(
        kdf_iterations=kdf_iterations,
        page_size=page_size,
        hmac_algorithm=hmac_algorithm,
        kdf_algorithm=kdf_algorithm,
    )

357 

358 

def apply_cipher_defaults_before_key(
    cursor_or_conn: Any,
) -> None:
    """
    Issue the cipher_default_* pragmas needed when creating a NEW database.

    Per SQLCipher 4.x docs, cipher_default_* pragmas set the defaults that
    apply when a key is set on a new database, so they MUST run before
    PRAGMA key.

    For EXISTING databases, cipher_page_size/cipher_hmac_algorithm/
    cipher_kdf_algorithm are set AFTER the key via apply_sqlcipher_pragmas().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    settings = get_sqlcipher_settings()

    logger.debug(
        f"Applying cipher_default_* pragmas for new DB: settings={settings}"
    )

    # (pragma name, settings key) pairs, executed in this order
    default_pragmas = (
        ("cipher_default_page_size", "page_size"),
        ("cipher_default_hmac_algorithm", "hmac_algorithm"),
        ("cipher_default_kdf_algorithm", "kdf_algorithm"),
    )
    for pragma_name, setting_key in default_pragmas:
        cursor_or_conn.execute(
            f"PRAGMA {pragma_name} = {settings[setting_key]}"
        )

390 

391 

def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
) -> None:
    """
    Issue the SQLCipher pragmas that belong AFTER PRAGMA key.

    For SQLCipher 4.x:
    - New databases: cipher_default_* were already set before the key by
      apply_cipher_defaults_before_key(), so only kdf_iter is set here.
    - Existing databases: cipher_page_size, cipher_hmac_algorithm and
      cipher_kdf_algorithm MUST be set after the key (not before), which
      this function does.

    cipher_memory_security is set in both cases.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, skips the cipher_* pragmas (defaults already
            applied). If False, sets cipher_* settings for an existing DB.
    """
    settings = get_sqlcipher_settings()

    if not creation_mode:
        # Existing database: the cipher_* pragmas follow the key
        existing_db_pragmas = (
            ("cipher_page_size", "page_size"),
            ("cipher_hmac_algorithm", "hmac_algorithm"),
            ("cipher_kdf_algorithm", "kdf_algorithm"),
        )
        for pragma_name, setting_key in existing_db_pragmas:
            cursor_or_conn.execute(
                f"PRAGMA {pragma_name} = {settings[setting_key]}"
            )

    # kdf_iter can be set after the key (applies to future derivation)
    kdf_iterations = settings["kdf_iterations"]
    cursor_or_conn.execute(f"PRAGMA kdf_iter = {kdf_iterations}")

    # cipher_memory_security is a runtime PRAGMA. ON zeroes SQLCipher buffers
    # and calls mlock() to prevent swap; OFF skips this. The default is OFF
    # because the password already sits unprotected in Flask session,
    # db_manager, and thread-local storage, so mlock on SQLCipher's buffers
    # alone doesn't help. Users can opt in with
    # LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=ON + IPC_LOCK.
    # It runs on every connection (not just creation) so env var overrides work.
    mem_security = get_env_setting("db_config.cipher_memory_security", "OFF")
    cursor_or_conn.execute(f"PRAGMA cipher_memory_security = {mem_security}")

436 

437 

def apply_performance_pragmas(cursor_or_conn: Any) -> None:
    """
    Issue the performance-related pragmas for a connection.

    Three pragmas are fixed (temp_store, busy_timeout, foreign_keys); the
    rest come from the env settings registry, which handles canonical env
    var names (LDR_DB_CONFIG_*) with automatic fallback to deprecated
    names (LDR_DB_*) and deprecation warnings.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    run = cursor_or_conn.execute

    # Fixed values, applied unconditionally
    run("PRAGMA temp_store = MEMORY")
    run("PRAGMA busy_timeout = 10000")  # 10 second timeout

    # SQLite defaults foreign_keys to OFF. Without this, every
    # ondelete="CASCADE"/"SET NULL" declared on an FK is inert,
    # including for raw-SQL DELETEs issued by Query.delete().
    run("PRAGMA foreign_keys = ON")

    # Cache size: the registry validates the min/max range. A negative
    # cache_size tells SQLite the number is in KB rather than pages.
    cache_mb = get_env_setting("db_config.cache_size_mb", 64)
    cache_kb = cache_mb * 1024
    run(f"PRAGMA cache_size = {-cache_kb}")

    # Journal mode: the registry validates against allowed values
    journal_mode = get_env_setting("db_config.journal_mode", "WAL")
    run(f"PRAGMA journal_mode = {journal_mode}")

    # Synchronous mode: the registry validates against allowed values
    sync_mode = get_env_setting("db_config.synchronous", "NORMAL")
    run(f"PRAGMA synchronous = {sync_mode}")

    # WAL autocheckpoint frame threshold. SQLite's default of 1000 frames
    # combined with the 16 KB page size lets the WAL grow to ~16 MB before
    # a PASSIVE checkpoint is triggered at commit. A lower threshold bounds
    # the WAL high-water-mark on disk for users who never log out (the
    # explicit TRUNCATE checkpoint runs on dispose, not here).
    wal_autocheckpoint = get_env_setting("db_config.wal_autocheckpoint", 250)
    run(f"PRAGMA wal_autocheckpoint = {wal_autocheckpoint}")

478 

479 

def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Verify that the SQLCipher connection is working correctly.

    Runs "SELECT 1" exactly once and checks that the row (1,) comes back.
    Any exception raised along the way is logged and reported as failure.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise
    """
    try:
        executed = cursor_or_conn.execute("SELECT 1")
        # A cursor exposes fetchone() itself. A connection does not, so the
        # row is read from whatever execute() returned instead of running
        # the query a second time.
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        is_valid = result == (1,)
        if not is_valid:
            logger.error(
                f"SQLCipher verification failed: result {result} != (1,)"
            )
        return is_valid
    except Exception:
        # Verification is a yes/no probe: report failure instead of raising
        logger.exception("SQLCipher verification failed")
        return False

506 

507 

def get_sqlcipher_version(cursor_or_conn: Any) -> Optional[str]:
    """
    Get the SQLCipher version string.

    Works with both cursors and connections: a cursor is read directly,
    while for a connection the row is read from the object returned by
    execute().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        Version string (e.g. "4.6.1 community") or None if unavailable
    """
    try:
        executed = cursor_or_conn.execute("PRAGMA cipher_version")
        # Connections have no fetchone(); fall back to the returned cursor
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        return result[0] if result else None
    except Exception:
        # loguru does not honour the stdlib exc_info= keyword; the traceback
        # is attached with opt(exception=True) instead.
        logger.opt(exception=True).debug("Could not query SQLCipher version")
        return None

525 

526 

def create_sqlcipher_connection(
    db_path: Union[str, Path],
    password: Optional[str] = None,
    creation_mode: bool = False,
    connect_kwargs: Optional[Dict[str, Any]] = None,
    hex_key: Optional[str] = None,
) -> Any:
    """
    Create a properly configured SQLCipher connection.

    Implements the full PRAGMA sequence with proper error cleanup:
    - Creation: cipher_default_* -> key -> kdf_iter -> performance -> verify
    - Existing: key -> cipher_* + kdf_iter -> performance -> verify

    When a password is given, the key is derived with the per-database salt
    if a .salt file exists alongside the database, otherwise with the legacy
    salt for backwards compatibility. When both hex_key and password are
    given, hex_key takes precedence.

    Args:
        db_path: Path to the database file
        password: The password for encryption (mutually exclusive with hex_key)
        creation_mode: If True, set cipher_default_* before key (new DB)
        connect_kwargs: Extra kwargs passed to sqlcipher3.connect()
        hex_key: Pre-derived hex key (skips PBKDF2 derivation)

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If neither password nor hex_key is provided, or the
            connection fails verification
    """
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError as import_error:
        # Chain explicitly so the underlying import failure stays attached
        # as the direct cause of the friendlier message.
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        ) from import_error

    conn = sqlcipher3.connect(str(db_path), **(connect_kwargs or {}))
    try:
        cursor = conn.cursor()

        if creation_mode:
            # Serialized through the module-level cipher_default_* lock
            with _cipher_default_lock:
                apply_cipher_defaults_before_key(cursor)

        # Set encryption key (uses per-database salt when password + db_path)
        if hex_key:
            set_sqlcipher_key_from_hex(cursor, hex_key)
        elif password:
            set_sqlcipher_key(cursor, password, db_path=db_path)
        else:
            raise ValueError("Either password or hex_key must be provided")  # noqa: TRY301 — except does connection cleanup before re-raise

        # Apply post-key pragmas (cipher_* for existing, kdf_iter for both)
        apply_sqlcipher_pragmas(cursor, creation_mode=creation_mode)

        # Apply performance settings
        apply_performance_pragmas(cursor)

        # Verify connection works
        if not verify_sqlcipher_connection(cursor):
            raise ValueError(  # noqa: TRY301 — except does connection cleanup before re-raise
                "Failed to establish encrypted database connection"
            )

        cursor.close()
        return conn
    except Exception:
        # Never hand back or leak a half-configured connection
        from ..utilities.resource_utils import safe_close

        safe_close(conn, "SQLCipher connection")
        raise

603 

604 

# Old name kept importable for backwards compatibility
apply_cipher_settings_before_key = apply_cipher_defaults_before_key