Coverage for src / local_deep_research / database / sqlcipher_utils.py: 97%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2SQLCipher utility functions for consistent database operations. 

3 

4This module centralizes all SQLCipher-specific operations to ensure 

5consistent password handling and PRAGMA settings across the codebase. 

6""" 

7 

8import os 

9import secrets 

10import threading 

11from functools import lru_cache 

12from hashlib import pbkdf2_hmac 

13from pathlib import Path 

14from typing import Any, Dict, Optional, Union 

15 

16from loguru import logger 

17 

18from ..settings.env_registry import get_env_setting 

19 

# Serializes writes to the process-wide cipher_default_* pragmas while a new
# database is being created.
_cipher_default_lock = threading.Lock()

# Per-database salt files: name suffix and salt length in bytes.
SALT_FILE_SUFFIX = ".salt"
SALT_SIZE = 256 // 8  # 256 bits

26 

27 

def get_salt_file_path(db_path: Union[str, Path]) -> Path:
    """
    Return the location of the salt file that accompanies a database.

    The salt file sits next to the database and carries the database's own
    file extension followed by ``SALT_FILE_SUFFIX``.

    Args:
        db_path: Path to the database file

    Returns:
        Path to the corresponding .salt file
    """
    database = Path(db_path)
    return database.with_suffix(database.suffix + SALT_FILE_SUFFIX)

39 

40 

def get_salt_for_database(db_path: Union[str, Path]) -> bytes:
    """
    Look up the key-derivation salt that belongs to a database file.

    v2+ databases keep a random salt in a .salt file next to the database.
    When no such file exists the database is treated as legacy (v1) and the
    shared LEGACY_PBKDF2_SALT is returned, with a deprecation warning logged.

    Args:
        db_path: Path to the database file

    Returns:
        The salt bytes to use for key derivation

    Raises:
        ValueError: If the salt file exists but is not SALT_SIZE bytes long
    """
    salt_file = get_salt_file_path(db_path)

    try:
        salt = salt_file.read_bytes()
    except FileNotFoundError:
        # No salt file: fall back to the shared legacy salt (v1 behaviour).
        logger.warning(
            f"Database '{Path(db_path).name}' uses the legacy shared salt "
            f"(deprecated). Consider creating a new database to benefit from "
            f"per-database salt security. See issue #1439 for migration details."
        )
        return LEGACY_PBKDF2_SALT

    # v2: per-database random salt; accept it only if the length is right.
    if len(salt) == SALT_SIZE:
        return salt

    raise ValueError(
        f"Salt file {salt_file} has unexpected size ({len(salt)} bytes), "
        f"expected {SALT_SIZE}. The salt file may be corrupted."
    )

74 

75 

def create_database_salt(db_path: Union[str, Path]) -> bytes:
    """
    Create and store a new random salt for a database.

    This should be called when creating a new database.
    The salt is stored in a .salt file alongside the database.

    WARNING: If this salt file is deleted, the associated database becomes
    permanently unreadable. Always back up .salt files alongside their .db files.

    If writing the salt fails part-way, the incomplete file is removed again
    so that a later call can retry instead of hitting FileExistsError.

    Args:
        db_path: Path to the database file

    Returns:
        The newly generated salt bytes

    Raises:
        FileExistsError: If a salt file already exists for this database
        OSError: If the salt file cannot be created or fully written
    """
    salt_file = get_salt_file_path(db_path)

    # Friendly early check; O_EXCL below is what actually guarantees that an
    # existing salt file is never overwritten.
    if salt_file.exists():
        raise FileExistsError(
            f"Salt file already exists: {salt_file}. "
            f"Refusing to overwrite to prevent data loss."
        )

    salt = secrets.token_bytes(SALT_SIZE)

    # Ensure parent directory exists
    salt_file.parent.mkdir(parents=True, exist_ok=True)

    # Create the file exclusively with owner-only permissions (0o600).
    fd = os.open(str(salt_file), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    try:
        try:
            written = os.write(fd, salt)
        finally:
            os.close(fd)
        if written != len(salt):
            raise OSError(
                f"Short write to salt file {salt_file}: "
                f"wrote {written} of {len(salt)} bytes"
            )
    except BaseException:
        # The file was created by this call and no database has been keyed
        # with it yet, so removing it is safe. Leaving a truncated file would
        # make get_salt_for_database() reject it and block re-creation.
        try:
            salt_file.unlink()
        except OSError:
            logger.warning(
                f"Could not remove incomplete salt file: {salt_file}"
            )
        raise

    logger.info(f"Created new database salt file: {salt_file}")
    return salt

117 

118 

def has_per_database_salt(db_path: Union[str, Path]) -> bool:
    """
    Tell whether a database uses a per-database salt file (v2).

    Args:
        db_path: Path to the database file

    Returns:
        True if the database has a .salt file, False otherwise
    """
    salt_file = get_salt_file_path(db_path)
    return salt_file.exists()

130 

131 

@lru_cache(maxsize=16)
def _get_key_from_password(
    password: str, salt: bytes, kdf_iterations: int
) -> bytes:
    """
    Derive the database encryption key from a password and salt.

    The derivation is PBKDF2-HMAC-SHA512. Results are memoized; the cache key
    is the full (password, salt, kdf_iterations) triple, so changing any of
    the three yields a separately derived key.

    Args:
        password: The password.
        salt: The salt bytes to use for key derivation.
        kdf_iterations: Number of PBKDF2 iterations.

    Returns:
        The generated key.
    """
    logger.debug(
        f"Generating DB encryption key with {kdf_iterations} iterations..."
    )
    derived = pbkdf2_hmac("sha512", password.encode(), salt, kdf_iterations)
    logger.debug("Generated DB encryption key.")
    return derived

163 

164 

def get_key_from_password(
    password: str, db_path: Optional[Union[str, Path]] = None
) -> bytes:
    """
    Resolve the salt and iteration count, then derive (or reuse) the key.

    Args:
        password: The password.
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.

    Returns:
        The derived encryption key bytes.
    """
    salt = (
        LEGACY_PBKDF2_SALT
        if db_path is None
        else get_salt_for_database(db_path)
    )
    kdf_iterations = get_sqlcipher_settings()["kdf_iterations"]
    return _get_key_from_password(password, salt, kdf_iterations)

186 

187 

def set_sqlcipher_key(
    cursor_or_conn: Any,
    password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Issue ``PRAGMA key`` with the password-derived key in hex form.

    Only the hexadecimal encoding of the derived key is placed in the SQL
    text, which avoids SQL injection and escaping issues with special
    characters in the password.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    key = get_key_from_password(password, db_path=db_path)  # gitleaks:allow
    pragma = f"PRAGMA key = \"x'{key.hex()}'\""
    cursor_or_conn.execute(pragma)

206 

207 

def set_sqlcipher_key_from_hex(cursor_or_conn: Any, hex_key: str) -> None:
    """
    Set the SQLCipher encryption key from a pre-derived hex key string.

    Used by connection closures to avoid capturing plaintext passwords.

    The value is interpolated into the PRAGMA statement, so it is checked to
    contain hexadecimal digits only before anything is executed.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        hex_key: Pre-derived hex key string (from get_key_from_password().hex())

    Raises:
        ValueError: If hex_key is not a string of hexadecimal digits. The
            offending value is never included in the message.
    """
    hex_digits = "0123456789abcdefABCDEF"
    if not isinstance(hex_key, str) or any(
        ch not in hex_digits for ch in hex_key
    ):
        raise ValueError("hex_key must contain only hexadecimal digits")

    cursor_or_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"")  # gitleaks:allow

219 

220 

def set_sqlcipher_rekey(
    cursor_or_conn: Any,
    new_password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Re-encrypt the database under a key derived from a new password.

    The key is derived exactly as in set_sqlcipher_key() (same PBKDF2 path),
    so the database can be re-opened with the new password afterwards.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        new_password: The new password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    new_key = get_key_from_password(new_password, db_path=db_path)  # gitleaks:allow

    # Only hex digits ([0-9a-f]) are interpolated, so no injection is possible.
    rekey_sql = "PRAGMA rekey = \"x'" + new_key.hex() + "'\""

    from sqlalchemy import text

    try:
        # SQLAlchemy connections need the statement wrapped in text().
        cursor_or_conn.execute(text(rekey_sql))
    except TypeError:
        # Raw SQLCipher connection/cursor: pass the plain string instead.
        cursor_or_conn.execute(rekey_sql)

252 

253 

# Built-in SQLCipher configuration, used when no override is configured.
DEFAULT_KDF_ITERATIONS = 256_000
DEFAULT_PAGE_SIZE = 16 * 1024  # 16KB pages for maximum performance with caching
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

# Accepted page sizes: every power of two from 512 to 65536.
# IntegerSetting validates min/max but not that the value is a power of 2,
# so membership in this set is checked as an additional safeguard.
VALID_PAGE_SIZES = frozenset(1 << exponent for exponent in range(9, 17))
MAX_KDF_ITERATIONS = 1_000_000

# Lower bound on KDF iterations in production; test runs use the relaxed bound.
MIN_KDF_ITERATIONS_PRODUCTION = 100_000
MIN_KDF_ITERATIONS_TESTING = 1

269 

270 

def _get_min_kdf_iterations() -> int:
    """Return the lowest KDF iteration count currently allowed.

    The relaxed testing minimum applies only when PYTEST_CURRENT_TEST (set
    automatically by pytest) or LDR_TEST_MODE (project-specific) is set to a
    non-empty value. Generic env vars like CI or TESTING are deliberately NOT
    consulted, to avoid accidentally weakening production encryption in
    Docker/CD pipelines that set CI=true.
    """
    for flag in ("PYTEST_CURRENT_TEST", "LDR_TEST_MODE"):
        if os.environ.get(flag):
            return MIN_KDF_ITERATIONS_TESTING
    return MIN_KDF_ITERATIONS_PRODUCTION

287 

288 

# Shared salt used by databases created before v2; kept only so those
# databases can still be opened. New databases get a per-database random salt
# stored in a .salt file.
# WARNING: Do NOT change this value - it would break all existing legacy databases!
LEGACY_PBKDF2_SALT: bytes = b"no salt"

# Old name of the constant, kept for code that still references it.
PBKDF2_PLACEHOLDER_SALT: bytes = LEGACY_PBKDF2_SALT

296 

297 

def get_sqlcipher_settings() -> dict:
    """
    Collect the effective SQLCipher configuration.

    These settings cannot be changed after database creation, so they
    must be configured via environment variables only.

    Values come from the env settings registry, which handles canonical env
    var names (LDR_DB_CONFIG_*) with automatic fallback to deprecated names
    (LDR_DB_*) and deprecation warnings. A page size that is not in
    VALID_PAGE_SIZES, or a KDF iteration count outside the allowed range, is
    replaced by its default and a warning is logged.

    Returns:
        Dictionary with the keys ``kdf_iterations``, ``page_size``,
        ``hmac_algorithm`` and ``kdf_algorithm``
    """
    # Algorithms: the registry validates these against its allowed values.
    hmac_algorithm = get_env_setting(
        "db_config.hmac_algorithm", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = get_env_setting(
        "db_config.kdf_algorithm", DEFAULT_KDF_ALGORITHM
    )

    # Page size: the registry checks the range, the power-of-2 check is ours.
    page_size = get_env_setting("db_config.page_size", DEFAULT_PAGE_SIZE)
    if page_size not in VALID_PAGE_SIZES:
        logger.warning(
            f"Invalid page_size value '{page_size}', using default "
            f"'{DEFAULT_PAGE_SIZE}'. Valid values: {sorted(VALID_PAGE_SIZES)}"
        )
        page_size = DEFAULT_PAGE_SIZE

    # KDF iterations: enforce the environment-aware minimum and the maximum.
    kdf_iterations = get_env_setting(
        "db_config.kdf_iterations", DEFAULT_KDF_ITERATIONS
    )
    min_kdf = _get_min_kdf_iterations()
    if kdf_iterations < min_kdf or kdf_iterations > MAX_KDF_ITERATIONS:
        logger.warning(
            f"KDF iterations value '{kdf_iterations}' outside safe range "
            f"[{min_kdf}, {MAX_KDF_ITERATIONS}], using default "
            f"'{DEFAULT_KDF_ITERATIONS}'."
        )
        kdf_iterations = DEFAULT_KDF_ITERATIONS

    return {
        "kdf_iterations": kdf_iterations,
        "page_size": page_size,
        "hmac_algorithm": hmac_algorithm,
        "kdf_algorithm": kdf_algorithm,
    }

352 

353 

def apply_cipher_defaults_before_key(
    cursor_or_conn: Any,
) -> None:
    """
    Set the cipher_default_* pragmas used when a NEW database is keyed.

    Per SQLCipher 4.x docs, cipher_default_* pragmas set the defaults that
    apply when a key is set on a new database, so this MUST run before
    PRAGMA key.

    EXISTING databases are handled differently: their cipher_page_size /
    cipher_hmac_algorithm / cipher_kdf_algorithm are set AFTER the key via
    apply_sqlcipher_pragmas().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    settings = get_sqlcipher_settings()

    logger.debug(
        f"Applying cipher_default_* pragmas for new DB: settings={settings}"
    )

    statements = (
        f"PRAGMA cipher_default_page_size = {settings['page_size']}",
        f"PRAGMA cipher_default_hmac_algorithm = {settings['hmac_algorithm']}",
        f"PRAGMA cipher_default_kdf_algorithm = {settings['kdf_algorithm']}",
    )
    for statement in statements:
        cursor_or_conn.execute(statement)

385 

386 

def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
) -> None:
    """
    Apply the SQLCipher pragmas that belong AFTER ``PRAGMA key``.

    For SQLCipher 4.x:
    - New databases: cipher_default_* were already set before the key via
      apply_cipher_defaults_before_key(), so only kdf_iter is set here.
    - Existing databases: cipher_page_size, cipher_hmac_algorithm and
      cipher_kdf_algorithm MUST be set after the key (not before); this
      function does that, followed by kdf_iter.

    In both cases cipher_memory_security is set last.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, only sets kdf_iter (defaults already applied).
            If False, sets cipher_* settings + kdf_iter for existing DB.
    """
    settings = get_sqlcipher_settings()

    statements = []
    if not creation_mode:
        # Existing database: the cipher_* pragmas go after the key.
        statements.extend(
            (
                f"PRAGMA cipher_page_size = {settings['page_size']}",
                f"PRAGMA cipher_hmac_algorithm = {settings['hmac_algorithm']}",
                f"PRAGMA cipher_kdf_algorithm = {settings['kdf_algorithm']}",
            )
        )
    # kdf_iter can be set after the key (applies to future derivation).
    statements.append(f"PRAGMA kdf_iter = {settings['kdf_iterations']}")

    for statement in statements:
        cursor_or_conn.execute(statement)

    # cipher_memory_security is a runtime PRAGMA. ON zeroes SQLCipher buffers
    # and calls mlock() to prevent swap; OFF skips this. It defaults to OFF
    # because the password already sits unprotected in Flask session,
    # db_manager, and thread-local storage, so mlock on SQLCipher's buffers
    # alone doesn't help. Users can opt in with
    # LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=ON + IPC_LOCK. It is applied on
    # every connection (not just creation) so env var overrides take effect.
    mem_security = get_env_setting("db_config.cipher_memory_security", "OFF")
    cursor_or_conn.execute(f"PRAGMA cipher_memory_security = {mem_security}")

431 

432 

def apply_performance_pragmas(cursor_or_conn: Any) -> None:
    """
    Apply the performance-related pragmas to a connection.

    Two pragmas are fixed (in-memory temp store, 10 second busy timeout).
    Cache size, journal mode and synchronous mode are read via the env
    settings registry, which handles canonical env var names
    (LDR_DB_CONFIG_*) with automatic fallback to deprecated names (LDR_DB_*)
    and deprecation warnings.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    # Always applied, independent of configuration.
    for fixed_pragma in (
        "PRAGMA temp_store = MEMORY",
        "PRAGMA busy_timeout = 10000",  # 10 second timeout
    ):
        cursor_or_conn.execute(fixed_pragma)

    # Cache size (registry validates min/max). A negative cache_size value
    # is a size in KB rather than a page count.
    cache_mb = get_env_setting("db_config.cache_size_mb", 64)
    cache_pages = -(cache_mb * 1024)
    cursor_or_conn.execute(f"PRAGMA cache_size = {cache_pages}")

    # Journal mode (registry validates against allowed values).
    journal_mode = get_env_setting("db_config.journal_mode", "WAL")
    cursor_or_conn.execute(f"PRAGMA journal_mode = {journal_mode}")

    # Synchronous mode (registry validates against allowed values).
    sync_mode = get_env_setting("db_config.synchronous", "NORMAL")
    cursor_or_conn.execute(f"PRAGMA synchronous = {sync_mode}")

460 

461 

def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Verify that the SQLCipher connection is working correctly.

    Runs ``SELECT 1`` exactly once and checks that the single row returned
    holds the value 1. Any exception raised along the way is logged and
    reported as a failed verification rather than propagated.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise
    """
    try:
        executed = cursor_or_conn.execute("SELECT 1")
        # A cursor exposes fetchone() itself. A connection does not, so the
        # row is read from whatever execute() returned; this avoids running
        # the query a second time.
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        # tuple() lets row types that do not compare equal to a plain tuple
        # (e.g. sqlite3.Row) still pass the check.
        is_valid = result is not None and tuple(result) == (1,)
        if not is_valid:
            logger.error(
                f"SQLCipher verification failed: result {result} != (1,)"
            )
        return is_valid
    except Exception:
        logger.exception("SQLCipher verification failed")
        return False

488 

489 

def get_sqlcipher_version(cursor_or_conn: Any) -> Optional[str]:
    """
    Get the SQLCipher version string.

    Works with both cursors and connections: a cursor is read via its own
    fetchone(), while for a connection the row is read from the object that
    execute() returns.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        Version string (e.g. "4.6.1 community") or None if unavailable
    """
    try:
        executed = cursor_or_conn.execute("PRAGMA cipher_version")
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        return result[0] if result else None
    except Exception:
        # Deliberately best-effort: any failure is reported as "unavailable".
        return None

506 

507 

def create_sqlcipher_connection(
    db_path: Union[str, Path],
    password: Optional[str] = None,
    creation_mode: bool = False,
    connect_kwargs: Optional[Dict[str, Any]] = None,
    hex_key: Optional[str] = None,
) -> Any:
    """
    Create a properly configured SQLCipher connection.

    Implements the full PRAGMA sequence with proper error cleanup:
    - Creation: cipher_default_* -> key -> kdf_iter -> performance -> verify
    - Existing: key -> cipher_* + kdf_iter -> performance -> verify

    When keyed by password, uses the per-database salt if a .salt file exists
    alongside the database, otherwise falls back to the legacy salt for
    backwards compatibility.

    Args:
        db_path: Path to the database file
        password: The password for encryption. Ignored when hex_key is also
            given, because hex_key takes precedence.
        creation_mode: If True, set cipher_default_* before key (new DB)
        connect_kwargs: Extra kwargs passed to sqlcipher3.connect()
        hex_key: Pre-derived hex key (skips PBKDF2 derivation)

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If neither password nor hex_key is provided, or the
            connection cannot be established
    """
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError as err:
        # Chain the original error so the underlying import failure stays
        # visible in the traceback.
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        ) from err

    # Fail fast: without key material there is no point opening a connection.
    if not hex_key and not password:
        raise ValueError("Either password or hex_key must be provided")

    conn = sqlcipher3.connect(str(db_path), **(connect_kwargs or {}))
    try:
        cursor = conn.cursor()

        if creation_mode:
            # The lock guards the cipher_default_* global state while the
            # defaults for the new database are being set.
            with _cipher_default_lock:
                apply_cipher_defaults_before_key(cursor)

        # Set encryption key (uses per-database salt when password + db_path)
        if hex_key:
            set_sqlcipher_key_from_hex(cursor, hex_key)
        else:
            set_sqlcipher_key(cursor, password, db_path=db_path)

        # Apply post-key pragmas (cipher_* for existing, kdf_iter for both)
        apply_sqlcipher_pragmas(cursor, creation_mode=creation_mode)

        # Apply performance settings
        apply_performance_pragmas(cursor)

        # Verify connection works
        if not verify_sqlcipher_connection(cursor):
            raise ValueError(
                "Failed to establish encrypted database connection"
            )

        cursor.close()
        return conn
    except Exception:
        # Do not leak the connection when any step of the setup fails.
        conn.close()
        raise

582 

583 

# Backwards compatibility alias: apply_cipher_settings_before_key is the old
# name of apply_cipher_defaults_before_key and remains importable.
apply_cipher_settings_before_key = apply_cipher_defaults_before_key