Coverage for src / local_deep_research / database / sqlcipher_utils.py: 97%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2SQLCipher utility functions for consistent database operations. 

3 

4This module centralizes all SQLCipher-specific operations to ensure 

5consistent password handling and PRAGMA settings across the codebase. 

6""" 

7 

8import os 

9import secrets 

10import threading 

11from hashlib import pbkdf2_hmac 

12from pathlib import Path 

13from typing import Any, Dict, Optional, Union 

14 

15from loguru import logger 

16 

17from ..settings.env_registry import get_env_setting 

18 

# Lock to protect cipher_default_* global state during creation.
# Held by create_sqlcipher_connection() around
# apply_cipher_defaults_before_key() when creation_mode is True.
_cipher_default_lock = threading.Lock()

# Salt file constants
# Suffix appended to the database file's existing suffix to name its salt
# file (see get_salt_file_path()).
SALT_FILE_SUFFIX = ".salt"
# Length in bytes of a generated salt; get_salt_for_database() rejects salt
# files of any other size.
SALT_SIZE = 32  # 256 bits

25 

26 

def get_salt_file_path(db_path: Union[str, Path]) -> Path:
    """
    Build the location of the salt file that accompanies a database.

    The salt file sits next to the database and keeps the database's own
    suffix, with SALT_FILE_SUFFIX appended after it.

    Args:
        db_path: Path to the database file

    Returns:
        Path to the corresponding .salt file
    """
    db_file = Path(db_path)
    combined_suffix = db_file.suffix + SALT_FILE_SUFFIX
    return db_file.with_suffix(combined_suffix)

38 

39 

def get_salt_for_database(db_path: Union[str, Path]) -> bytes:
    """
    Look up the key-derivation salt that belongs to a database file.

    v2 databases keep a random salt in a .salt file next to the database;
    that file is read and its length checked. When no salt file exists the
    database is treated as v1 and LEGACY_PBKDF2_SALT is returned (with a
    deprecation warning) so older databases keep opening.

    Args:
        db_path: Path to the database file

    Returns:
        The salt bytes to use for key derivation

    Raises:
        ValueError: If the salt file exists but is not SALT_SIZE bytes long
    """
    salt_file = get_salt_file_path(db_path)

    try:
        salt = salt_file.read_bytes()
    except FileNotFoundError:
        # v1 database: no salt file, fall back to the shared legacy salt
        logger.warning(
            f"Database '{Path(db_path).name}' uses the legacy shared salt "
            f"(deprecated). Consider creating a new database to benefit from "
            f"per-database salt security. See issue #1439 for migration details."
        )
        return LEGACY_PBKDF2_SALT
    else:
        # v2 database: the per-database salt must have exactly the expected size
        if len(salt) == SALT_SIZE:
            return salt
        raise ValueError(
            f"Salt file {salt_file} has unexpected size ({len(salt)} bytes), "
            f"expected {SALT_SIZE}. The salt file may be corrupted."
        )

73 

74 

def create_database_salt(db_path: Union[str, Path]) -> bytes:
    """
    Create and store a new random salt for a database.

    This should be called when creating a new database.
    The salt is stored in a .salt file alongside the database.

    WARNING: If this salt file is deleted, the associated database becomes
    permanently unreadable. Always back up .salt files alongside their .db files.

    If the salt cannot be written completely, the partially written file is
    removed again so that a later attempt does not trip over a truncated
    salt file.

    Args:
        db_path: Path to the database file

    Returns:
        The newly generated salt bytes

    Raises:
        FileExistsError: If a salt file already exists for this database
        OSError: If the salt could not be written in full
    """
    salt_file = get_salt_file_path(db_path)

    if salt_file.exists():
        raise FileExistsError(
            f"Salt file already exists: {salt_file}. "
            f"Refusing to overwrite to prevent data loss."
        )

    salt = secrets.token_bytes(SALT_SIZE)

    # Ensure parent directory exists
    salt_file.parent.mkdir(parents=True, exist_ok=True)

    # Create the salt file with owner-only permissions (0o600). O_EXCL makes
    # the creation fail with FileExistsError if another writer created the
    # file after the exists() check above.
    fd = os.open(str(salt_file), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    try:
        try:
            written = os.write(fd, salt)
            # os.write() reports how many bytes were written; anything less
            # than the full salt would leave an unusable file behind.
            if written != len(salt):
                raise OSError(
                    f"Short write to salt file {salt_file}: "
                    f"{written} of {len(salt)} bytes written"
                )
        finally:
            os.close(fd)
    except BaseException:
        # We created this file a moment ago, so removing it is safe. Cleanup
        # is best-effort: the original error is the one worth reporting.
        try:
            os.unlink(salt_file)
        except OSError:
            pass
        raise

    logger.info(f"Created new database salt file: {salt_file}")
    return salt

116 

117 

def has_per_database_salt(db_path: Union[str, Path]) -> bool:
    """
    Report whether a per-database salt file (v2) exists for a database.

    Args:
        db_path: Path to the database file

    Returns:
        True if the database has a .salt file, False otherwise
    """
    salt_file = get_salt_file_path(db_path)
    return salt_file.exists()

129 

130 

def _get_key_from_password(
    password: str, salt: bytes, kdf_iterations: int
) -> bytes:
    """
    Derive the database encryption key with PBKDF2-HMAC-SHA512.

    Args:
        password: The password.
        salt: The salt bytes to use for key derivation.
        kdf_iterations: Number of PBKDF2 iterations.

    Returns:
        The generated key.
    """
    logger.debug(
        f"Generating DB encryption key with {kdf_iterations} iterations..."
    )

    password_bytes = password.encode()
    key = pbkdf2_hmac("sha512", password_bytes, salt, kdf_iterations)

    logger.debug("Generated DB encryption key.")
    return key

158 

159 

def get_key_from_password(
    password: str, db_path: Optional[Union[str, Path]] = None
) -> bytes:
    """
    Resolve the salt and KDF settings, then derive the encryption key.

    Args:
        password: The password.
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.

    Returns:
        The derived encryption key bytes.
    """
    # The salt is resolved before the settings are read, as before.
    salt = (
        LEGACY_PBKDF2_SALT
        if db_path is None
        else get_salt_for_database(db_path)
    )
    kdf_iterations = get_sqlcipher_settings()["kdf_iterations"]
    return _get_key_from_password(password, salt, kdf_iterations)

181 

182 

def set_sqlcipher_key(
    cursor_or_conn: Any,
    password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Derive the key for a password and hand it to SQLCipher in hex form.

    Hex encoding keeps special characters in the password out of the SQL
    text, which avoids injection and escaping problems.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    derived_key = get_key_from_password(password, db_path=db_path)  # gitleaks:allow
    # The hex helper issues the same `PRAGMA key = "x'<hex>'"` statement.
    set_sqlcipher_key_from_hex(cursor_or_conn, derived_key.hex())

201 

202 

def set_sqlcipher_key_from_hex(cursor_or_conn: Any, hex_key: str) -> None:
    """
    Apply an already-derived hex key to a SQLCipher cursor or connection.

    Used by connection closures to avoid capturing plaintext passwords.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        hex_key: Pre-derived hex key string (from get_key_from_password().hex())
    """
    key_statement = f"PRAGMA key = \"x'{hex_key}'\""  # gitleaks:allow
    cursor_or_conn.execute(key_statement)

214 

215 

def set_sqlcipher_rekey(
    cursor_or_conn: Any,
    new_password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Re-encrypt the database under a new password (hex-encoded key).

    The key is derived through get_key_from_password(), the same path
    set_sqlcipher_key() uses, so the database can be re-opened with the
    new password afterwards.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        new_password: The new password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    key = get_key_from_password(new_password, db_path=db_path)  # gitleaks:allow

    # Hex output only contains [0-9a-f], so it is safe to embed in the SQL
    safe_sql = f"PRAGMA rekey = \"x'{key.hex()}'\""

    from sqlalchemy import text

    try:
        # SQLAlchemy connections need the statement wrapped in text()
        wrapped = text(safe_sql)
        cursor_or_conn.execute(wrapped)
    except TypeError:
        # Raw SQLCipher connection - it wants the plain string instead
        cursor_or_conn.execute(safe_sql)

247 

248 

# Default SQLCipher configuration (can be overridden by settings).
# These are the fallbacks passed to get_env_setting() in
# get_sqlcipher_settings().
DEFAULT_KDF_ITERATIONS = 256000
DEFAULT_PAGE_SIZE = 16384  # 16KB pages for maximum performance with caching
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

# Valid page sizes (powers of 2 within the SQLite range).
# IntegerSetting validates min/max but not that the value is a power of 2,
# so we check against this set as an additional safeguard.
VALID_PAGE_SIZES = frozenset({512, 1024, 2048, 4096, 8192, 16384, 32768, 65536})
# Upper bound for kdf_iterations; get_sqlcipher_settings() falls back to
# DEFAULT_KDF_ITERATIONS for values above it.
MAX_KDF_ITERATIONS = 1_000_000

# Production minimum KDF iterations. The lower testing minimum applies only
# when PYTEST_CURRENT_TEST or LDR_TEST_MODE is set (see
# _get_min_kdf_iterations()); generic CI variables do not relax it.
MIN_KDF_ITERATIONS_PRODUCTION = 100_000
MIN_KDF_ITERATIONS_TESTING = 1

264 

265 

def _get_min_kdf_iterations() -> int:
    """Return the lowest KDF iteration count that is currently acceptable.

    The testing minimum is used only when PYTEST_CURRENT_TEST (set
    automatically by pytest) or LDR_TEST_MODE (project-specific) has a
    non-empty value. Generic env vars like CI or TESTING are NOT checked to
    avoid accidentally weakening production encryption in Docker/CD
    pipelines that set CI=true.
    """
    for marker in ("PYTEST_CURRENT_TEST", "LDR_TEST_MODE"):
        if os.environ.get(marker):
            return MIN_KDF_ITERATIONS_TESTING
    return MIN_KDF_ITERATIONS_PRODUCTION

282 

283 

# Legacy salt for backwards compatibility with databases created before v2.
# New databases use per-database random salts stored in .salt files.
# Returned by get_salt_for_database() when no .salt file exists, and used by
# get_key_from_password() when no db_path is given.
# WARNING: Do NOT change this value - it would break all existing legacy databases!
LEGACY_PBKDF2_SALT = b"no salt"

# Alias for backwards compatibility with code that references the old name
PBKDF2_PLACEHOLDER_SALT = LEGACY_PBKDF2_SALT

291 

292 

def get_sqlcipher_settings() -> dict:
    """
    Collect the SQLCipher configuration from the environment, with defaults.

    These settings cannot be changed after database creation, so they
    must be configured via environment variables only.

    Values are read through the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings. Page size and
    KDF iterations get an extra check here and fall back to their defaults
    (with a warning) when they are not acceptable.

    Returns:
        Dictionary with the keys "kdf_iterations", "page_size",
        "hmac_algorithm" and "kdf_algorithm"
    """
    # HMAC and KDF algorithm names - registry validates against allowed values
    hmac_algorithm = get_env_setting(
        "db_config.hmac_algorithm", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = get_env_setting(
        "db_config.kdf_algorithm", DEFAULT_KDF_ALGORITHM
    )

    # Page size - registry validates range, the power-of-2 check happens here
    page_size = get_env_setting("db_config.page_size", DEFAULT_PAGE_SIZE)
    page_size_ok = page_size in VALID_PAGE_SIZES
    if not page_size_ok:
        logger.warning(
            f"Invalid page_size value '{page_size}', using default "
            f"'{DEFAULT_PAGE_SIZE}'. Valid values: {sorted(VALID_PAGE_SIZES)}"
        )
        page_size = DEFAULT_PAGE_SIZE

    # KDF iterations - registry validates the basic range; the lower bound
    # used here depends on whether a test environment is detected
    kdf_iterations = get_env_setting(
        "db_config.kdf_iterations", DEFAULT_KDF_ITERATIONS
    )
    min_kdf = _get_min_kdf_iterations()
    within_safe_range = min_kdf <= kdf_iterations <= MAX_KDF_ITERATIONS
    if not within_safe_range:
        logger.warning(
            f"KDF iterations value '{kdf_iterations}' outside safe range "
            f"[{min_kdf}, {MAX_KDF_ITERATIONS}], using default "
            f"'{DEFAULT_KDF_ITERATIONS}'."
        )
        kdf_iterations = DEFAULT_KDF_ITERATIONS

    return dict(
        kdf_iterations=kdf_iterations,
        page_size=page_size,
        hmac_algorithm=hmac_algorithm,
        kdf_algorithm=kdf_algorithm,
    )

345 

346 

def apply_cipher_defaults_before_key(
    cursor_or_conn: Any,
) -> None:
    """
    Issue the cipher_default_* pragmas used when creating a NEW database.

    Per SQLCipher 4.x docs, cipher_default_* pragmas set the defaults that
    apply when a key is set on a new database, so they MUST run before
    PRAGMA key.

    For EXISTING databases, cipher_page_size/cipher_hmac_algorithm/
    cipher_kdf_algorithm are set AFTER the key via apply_sqlcipher_pragmas().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    settings = get_sqlcipher_settings()

    logger.debug(
        f"Applying cipher_default_* pragmas for new DB: settings={settings}"
    )

    # Executed in this order: page size, HMAC algorithm, KDF algorithm
    default_statements = (
        f"PRAGMA cipher_default_page_size = {settings['page_size']}",
        f"PRAGMA cipher_default_hmac_algorithm = {settings['hmac_algorithm']}",
        f"PRAGMA cipher_default_kdf_algorithm = {settings['kdf_algorithm']}",
    )
    for statement in default_statements:
        cursor_or_conn.execute(statement)

378 

379 

def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
) -> None:
    """
    Issue the SQLCipher pragmas that belong AFTER PRAGMA key.

    For SQLCipher 4.x:
    - New databases: cipher_default_* are set before key via
      apply_cipher_defaults_before_key(), so only kdf_iter is set here.
    - Existing databases: cipher_page_size, cipher_hmac_algorithm and
      cipher_kdf_algorithm MUST be set AFTER the key (not before), which
      this function does.

    cipher_memory_security is applied in both cases.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, only sets kdf_iter (defaults already applied).
            If False, sets cipher_* settings + kdf_iter for existing DB.
    """
    settings = get_sqlcipher_settings()

    post_key_statements = []
    if not creation_mode:
        # Existing database: the cipher_* pragmas come first
        post_key_statements.extend(
            (
                f"PRAGMA cipher_page_size = {settings['page_size']}",
                f"PRAGMA cipher_hmac_algorithm = {settings['hmac_algorithm']}",
                f"PRAGMA cipher_kdf_algorithm = {settings['kdf_algorithm']}",
            )
        )
    # kdf_iter can be set after the key (applies to future derivation)
    post_key_statements.append(
        f"PRAGMA kdf_iter = {settings['kdf_iterations']}"
    )
    for statement in post_key_statements:
        cursor_or_conn.execute(statement)

    # cipher_memory_security is a runtime PRAGMA. ON zeroes SQLCipher buffers
    # and calls mlock() to prevent swap; OFF skips this. The default is OFF
    # because the password already sits unprotected in Flask session,
    # db_manager, and thread-local storage, so mlock on SQLCipher's buffers
    # alone doesn't help. Users can opt in with
    # LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=ON + IPC_LOCK.
    # It is applied on every connection (not just creation) so env var
    # overrides work.
    mem_security = get_env_setting("db_config.cipher_memory_security", "OFF")
    cursor_or_conn.execute(f"PRAGMA cipher_memory_security = {mem_security}")

424 

425 

def apply_performance_pragmas(cursor_or_conn: Any) -> None:
    """
    Issue the performance-related pragmas for a connection.

    temp_store and busy_timeout are fixed; cache size, journal mode and
    synchronous mode come from the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    run = cursor_or_conn.execute

    # Fixed values that are always applied
    run("PRAGMA temp_store = MEMORY")
    run("PRAGMA busy_timeout = 10000")  # 10 second timeout

    # Cache size - registry validates min/max range
    cache_mb = get_env_setting("db_config.cache_size_mb", 64)
    cache_pages = -(cache_mb * 1024)  # Negative for KB cache size
    run(f"PRAGMA cache_size = {cache_pages}")

    # Journal mode - registry validates against allowed values
    journal_mode = get_env_setting("db_config.journal_mode", "WAL")
    run(f"PRAGMA journal_mode = {journal_mode}")

    # Synchronous mode - registry validates against allowed values
    sync_mode = get_env_setting("db_config.synchronous", "NORMAL")
    run(f"PRAGMA synchronous = {sync_mode}")

453 

454 

def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Check that a SQLCipher cursor or connection can run a trivial query.

    Runs "SELECT 1" and expects the row (1,). Any exception is logged and
    reported as a failed verification rather than propagated.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise
    """
    try:
        cursor_or_conn.execute("SELECT 1")
        if hasattr(cursor_or_conn, "fetchone"):
            # Cursor-like object: read the row from the object itself
            result = cursor_or_conn.fetchone()
        else:
            # No fetchone() here: read from what a fresh execute() returns
            result = cursor_or_conn.execute("SELECT 1").fetchone()

        is_valid = result == (1,)
        if is_valid:
            return is_valid

        logger.error(
            f"SQLCipher verification failed: result {result} != (1,)"
        )
        return is_valid
    except Exception:
        logger.exception("SQLCipher verification failed")
        return False

481 

482 

def get_sqlcipher_version(cursor_or_conn: Any) -> Optional[str]:
    """
    Get the SQLCipher version string.

    Works with both cursors and connections, in the same way as
    verify_sqlcipher_connection(): if the object has its own fetchone() the
    row is read from it, otherwise it is read from whatever execute()
    returned.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        Version string (e.g. "4.6.1 community") or None if unavailable
        (empty result or any error while querying)
    """
    try:
        executed = cursor_or_conn.execute("PRAGMA cipher_version")
        # Connection objects have no fetchone(); use the object returned by
        # execute() for them instead of failing and reporting None.
        reader = (
            cursor_or_conn
            if hasattr(cursor_or_conn, "fetchone")
            else executed
        )
        result = reader.fetchone()
        return result[0] if result else None
    except Exception:
        # loguru attaches the traceback via opt(exception=True); an
        # exc_info keyword is stdlib-logging syntax and has no effect here.
        logger.opt(exception=True).debug("Could not query SQLCipher version")
        return None

500 

501 

def create_sqlcipher_connection(
    db_path: Union[str, Path],
    password: Optional[str] = None,
    creation_mode: bool = False,
    connect_kwargs: Optional[Dict[str, Any]] = None,
    hex_key: Optional[str] = None,
) -> Any:
    """
    Create a properly configured SQLCipher connection.

    Implements the full PRAGMA sequence with proper error cleanup:
    - Creation: cipher_default_* -> key -> kdf_iter -> performance -> verify
    - Existing: key -> cipher_* + kdf_iter -> performance -> verify

    Uses per-database salt if a .salt file exists alongside the database,
    otherwise falls back to legacy salt for backwards compatibility.

    The credentials are checked before the database is opened, so a call
    without password and hex_key fails without touching the database file.

    Args:
        db_path: Path to the database file
        password: The password for encryption (mutually exclusive with hex_key)
        creation_mode: If True, set cipher_default_* before key (new DB)
        connect_kwargs: Extra kwargs passed to sqlcipher3.connect()
        hex_key: Pre-derived hex key (skips PBKDF2 derivation). Takes
            precedence over password when both are given.

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If neither password nor hex_key is provided, or the
            connection cannot be verified
    """
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError as exc:
        # Chain explicitly so the underlying import failure stays visible
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        ) from exc

    # Reject unusable arguments before opening the database, so nothing is
    # opened (or pragmas applied) only to be closed again.
    if not hex_key and not password:
        raise ValueError("Either password or hex_key must be provided")

    conn = sqlcipher3.connect(str(db_path), **(connect_kwargs or {}))
    try:
        cursor = conn.cursor()

        if creation_mode:
            # cipher_default_* is shared state; serialize access to it
            with _cipher_default_lock:
                apply_cipher_defaults_before_key(cursor)

        # Set encryption key (uses per-database salt when password + db_path)
        if hex_key:
            set_sqlcipher_key_from_hex(cursor, hex_key)
        else:
            set_sqlcipher_key(cursor, password, db_path=db_path)

        # Apply post-key pragmas (cipher_* for existing, kdf_iter for both)
        apply_sqlcipher_pragmas(cursor, creation_mode=creation_mode)

        # Apply performance settings
        apply_performance_pragmas(cursor)

        # Verify connection works
        if not verify_sqlcipher_connection(cursor):
            raise ValueError(  # noqa: TRY301 — except does connection cleanup before re-raise
                "Failed to establish encrypted database connection"
            )

        cursor.close()
        return conn
    except Exception:
        # Close the half-configured connection before re-raising
        from ..utilities.resource_utils import safe_close

        safe_close(conn, "SQLCipher connection")
        raise

578 

579 

# Backwards compatibility alias — old name still importable.
# Refers to the same function object as apply_cipher_defaults_before_key.
apply_cipher_settings_before_key = apply_cipher_defaults_before_key