Coverage for src / local_deep_research / database / sqlcipher_utils.py: 97%
159 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2SQLCipher utility functions for consistent database operations.
4This module centralizes all SQLCipher-specific operations to ensure
5consistent password handling and PRAGMA settings across the codebase.
6"""
8import os
9import secrets
10import threading
11from hashlib import pbkdf2_hmac
12from pathlib import Path
13from typing import Any, Dict, Optional, Union
15from loguru import logger
17from ..settings.env_registry import get_env_setting
# Serializes access to the cipher_default_* global state while a new
# database is being created.
_cipher_default_lock = threading.Lock()

# Salt files sit next to the database file and carry this extra suffix.
SALT_FILE_SUFFIX = ".salt"
SALT_SIZE = 256 // 8  # 256 bits expressed in bytes
def get_salt_file_path(db_path: Union[str, Path]) -> Path:
    """
    Return the location of the salt file that belongs to a database.

    The salt file name is the database file name with SALT_FILE_SUFFIX
    appended to whatever suffix the database file already has.

    Args:
        db_path: Path to the database file

    Returns:
        Path to the corresponding .salt file
    """
    database = Path(db_path)
    salt_suffix = database.suffix + SALT_FILE_SUFFIX
    return database.with_suffix(salt_suffix)
def get_salt_for_database(db_path: Union[str, Path]) -> bytes:
    """
    Look up the key-derivation salt that belongs to a database file.

    New databases (v2+) keep a random salt in a .salt file next to the
    database; its content is returned after a size check. When no salt
    file exists, the database is treated as legacy (v1): a deprecation
    warning is logged and LEGACY_PBKDF2_SALT is returned.

    Args:
        db_path: Path to the database file

    Returns:
        The salt bytes to use for key derivation

    Raises:
        ValueError: If the salt file exists but is not SALT_SIZE bytes long
    """
    salt_file = get_salt_file_path(db_path)

    try:
        salt = salt_file.read_bytes()
    except FileNotFoundError:
        # v1: no salt file, fall back to the shared legacy salt
        logger.warning(
            f"Database '{Path(db_path).name}' uses the legacy shared salt "
            f"(deprecated). Consider creating a new database to benefit from "
            f"per-database salt security. See issue #1439 for migration details."
        )
        return LEGACY_PBKDF2_SALT
    else:
        # v2: per-database random salt, accepted only at the expected size
        if len(salt) == SALT_SIZE:
            return salt
        raise ValueError(
            f"Salt file {salt_file} has unexpected size ({len(salt)} bytes), "
            f"expected {SALT_SIZE}. The salt file may be corrupted."
        )
def create_database_salt(db_path: Union[str, Path]) -> bytes:
    """
    Create and store a new random salt for a database.

    This should be called when creating a new database.
    The salt is stored in a .salt file alongside the database.

    WARNING: If this salt file is deleted, the associated database becomes
    permanently unreadable. Always back up .salt files alongside their .db files.

    If writing the salt fails part-way, the incomplete file is removed
    again so that a truncated salt never stays on disk and a later retry
    is not blocked by FileExistsError.

    Args:
        db_path: Path to the database file

    Returns:
        The newly generated salt bytes

    Raises:
        FileExistsError: If a salt file already exists for this database
        OSError: If the salt could not be written completely
    """
    salt_file = get_salt_file_path(db_path)

    if salt_file.exists():
        raise FileExistsError(
            f"Salt file already exists: {salt_file}. "
            f"Refusing to overwrite to prevent data loss."
        )

    salt = secrets.token_bytes(SALT_SIZE)

    # Ensure parent directory exists
    salt_file.parent.mkdir(parents=True, exist_ok=True)

    # Write salt file with owner-only permissions (0o600). O_EXCL makes the
    # create fail with FileExistsError if the file appeared after the check
    # above, so an existing salt is never overwritten.
    fd = os.open(str(salt_file), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    try:
        try:
            written = os.write(fd, salt)
            # os.write() reports how many bytes it wrote; anything short of
            # the full salt would leave an unusable file.
            if written != len(salt):
                raise OSError(
                    f"Short write to salt file {salt_file}: "
                    f"{written} of {len(salt)} bytes written"
                )
        finally:
            os.close(fd)
    except BaseException:
        # The file was created by this call, so removing it cannot destroy
        # a pre-existing salt.
        try:
            salt_file.unlink()
        except OSError:
            logger.warning(
                f"Could not remove incomplete salt file: {salt_file}"
            )
        raise

    logger.info(f"Created new database salt file: {salt_file}")
    return salt
def has_per_database_salt(db_path: Union[str, Path]) -> bool:
    """
    Tell whether a per-database salt file (v2) exists for a database.

    Args:
        db_path: Path to the database file

    Returns:
        True if the database has a .salt file, False otherwise
    """
    salt_file = get_salt_file_path(db_path)
    return salt_file.exists()
def _get_key_from_password(
    password: str, salt: bytes, kdf_iterations: int
) -> bytes:
    """
    Derive an encryption key from a password with PBKDF2-HMAC-SHA512.

    Args:
        password: The password.
        salt: The salt bytes to use for key derivation.
        kdf_iterations: Number of PBKDF2 iterations.

    Returns:
        The generated key.
    """
    logger.debug(
        f"Generating DB encryption key with {kdf_iterations} iterations..."
    )

    password_bytes = password.encode()
    derived_key = pbkdf2_hmac("sha512", password_bytes, salt, kdf_iterations)

    logger.debug("Generated DB encryption key.")
    return derived_key
def get_key_from_password(
    password: str, db_path: Optional[Union[str, Path]] = None
) -> bytes:
    """
    Resolve the salt and KDF settings, then derive the encryption key.

    Args:
        password: The password.
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.

    Returns:
        The derived encryption key bytes.
    """
    salt = (
        LEGACY_PBKDF2_SALT
        if db_path is None
        else get_salt_for_database(db_path)
    )

    kdf_iterations = get_sqlcipher_settings()["kdf_iterations"]
    return _get_key_from_password(password, salt, kdf_iterations)
def set_sqlcipher_key(
    cursor_or_conn: Any,
    password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Derive the key from a password and issue PRAGMA key with it, hex-encoded.

    Hex encoding avoids SQL injection and escaping issues with special
    characters in the password.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    derived = get_key_from_password(password, db_path=db_path)  # gitleaks:allow
    # The hex helper issues the same PRAGMA key statement for the hex string.
    set_sqlcipher_key_from_hex(cursor_or_conn, derived.hex())
def set_sqlcipher_key_from_hex(cursor_or_conn: Any, hex_key: str) -> None:
    """
    Set the SQLCipher encryption key from a pre-derived hex key string.

    Used by connection closures to avoid capturing plaintext passwords.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        hex_key: Pre-derived hex key string (from get_key_from_password().hex())

    Raises:
        ValueError: If hex_key is empty or contains non-hexadecimal characters
    """
    # The key is interpolated directly into the SQL text, so only accept
    # characters that cannot terminate the quoted literal.
    if not hex_key or not set(hex_key) <= set("0123456789abcdefABCDEF"):
        raise ValueError("hex_key must be a non-empty hexadecimal string")

    cursor_or_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"")  # gitleaks:allow
def set_sqlcipher_rekey(
    cursor_or_conn: Any,
    new_password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Re-encrypt the database under a new password via PRAGMA rekey.

    The new key is derived with the same PBKDF2 routine that
    set_sqlcipher_key() uses, so the database can be re-opened with the
    new password afterwards.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        new_password: The new password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    new_key_hex = get_key_from_password(  # gitleaks:allow
        new_password, db_path=db_path
    ).hex()

    # Hex output only contains [0-9a-f], so it is safe to embed in the SQL.
    rekey_sql = f"PRAGMA rekey = \"x'{new_key_hex}'\""

    try:
        # SQLAlchemy connections need the statement wrapped in text()
        from sqlalchemy import text

        cursor_or_conn.execute(text(rekey_sql))
    except TypeError:
        # Raw SQLCipher connection - use string directly
        cursor_or_conn.execute(rekey_sql)
# Default SQLCipher configuration (can be overridden by settings)
DEFAULT_KDF_ITERATIONS = 256_000
DEFAULT_PAGE_SIZE = 16 * 1024  # 16KB pages for maximum performance with caching
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

# Valid page sizes: every power of two from 512 (2**9) up to 65536 (2**16).
# IntegerSetting validates min/max but not that the value is a power of 2,
# so this set is checked as an additional safeguard.
VALID_PAGE_SIZES = frozenset(2**exponent for exponent in range(9, 17))
MAX_KDF_ITERATIONS = 1_000_000

# Production minimum KDF iterations. Relaxed automatically in test/CI environments.
MIN_KDF_ITERATIONS_PRODUCTION = 100_000
MIN_KDF_ITERATIONS_TESTING = 1
def _get_min_kdf_iterations() -> int:
    """Return the lowest accepted KDF iteration count for this environment.

    The relaxed testing minimum applies only when PYTEST_CURRENT_TEST (set
    automatically by pytest) or LDR_TEST_MODE (project-specific) has a
    non-empty value. Generic env vars like CI or TESTING are NOT checked to
    avoid accidentally weakening production encryption in Docker/CD
    pipelines that set CI=true.
    """
    for env_var in ("PYTEST_CURRENT_TEST", "LDR_TEST_MODE"):
        if os.environ.get(env_var):
            return MIN_KDF_ITERATIONS_TESTING
    return MIN_KDF_ITERATIONS_PRODUCTION
# Shared salt used by databases created before v2; kept only so those
# databases can still be opened. New databases use per-database random
# salts stored in .salt files.
# WARNING: Do NOT change this value - it would break all existing legacy databases!
LEGACY_PBKDF2_SALT = b"no salt"

# Old name, kept so code that still references it continues to work.
PBKDF2_PLACEHOLDER_SALT = LEGACY_PBKDF2_SALT
def get_sqlcipher_settings() -> dict:
    """
    Collect the SQLCipher configuration from the environment, with defaults.

    These settings cannot be changed after database creation, so they
    must be configured via environment variables only.

    Values are read through the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings.

    A page size outside VALID_PAGE_SIZES, or a KDF iteration count outside
    [minimum for this environment, MAX_KDF_ITERATIONS], is replaced by its
    default and a warning is logged.

    Returns:
        Dictionary with the keys "kdf_iterations", "page_size",
        "hmac_algorithm" and "kdf_algorithm"
    """
    # Algorithms - the registry validates both against allowed values
    hmac_algorithm = get_env_setting(
        "db_config.hmac_algorithm", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = get_env_setting(
        "db_config.kdf_algorithm", DEFAULT_KDF_ALGORITHM
    )

    # Page size - the registry checks the range, the power-of-2 check is here
    page_size = get_env_setting("db_config.page_size", DEFAULT_PAGE_SIZE)
    page_size_ok = page_size in VALID_PAGE_SIZES
    if not page_size_ok:
        logger.warning(
            f"Invalid page_size value '{page_size}', using default "
            f"'{DEFAULT_PAGE_SIZE}'. Valid values: {sorted(VALID_PAGE_SIZES)}"
        )
        page_size = DEFAULT_PAGE_SIZE

    # KDF iterations - registry checks the basic range, then the
    # environment-aware minimum is applied
    kdf_iterations = get_env_setting(
        "db_config.kdf_iterations", DEFAULT_KDF_ITERATIONS
    )
    min_kdf = _get_min_kdf_iterations()
    within_safe_range = min_kdf <= kdf_iterations <= MAX_KDF_ITERATIONS
    if not within_safe_range:
        logger.warning(
            f"KDF iterations value '{kdf_iterations}' outside safe range "
            f"[{min_kdf}, {MAX_KDF_ITERATIONS}], using default "
            f"'{DEFAULT_KDF_ITERATIONS}'."
        )
        kdf_iterations = DEFAULT_KDF_ITERATIONS

    return dict(
        kdf_iterations=kdf_iterations,
        page_size=page_size,
        hmac_algorithm=hmac_algorithm,
        kdf_algorithm=kdf_algorithm,
    )
def apply_cipher_defaults_before_key(
    cursor_or_conn: Any,
) -> None:
    """
    Issue the cipher_default_* pragmas that must precede PRAGMA key on a new DB.

    Per SQLCipher 4.x docs, cipher_default_* pragmas set the defaults that
    apply when a key is set on a NEW database. These MUST be called before
    PRAGMA key.

    For EXISTING databases, cipher_page_size/cipher_hmac_algorithm/
    cipher_kdf_algorithm are set AFTER the key via apply_sqlcipher_pragmas().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    settings = get_sqlcipher_settings()

    logger.debug(
        f"Applying cipher_default_* pragmas for new DB: settings={settings}"
    )

    # (pragma name, settings key), executed in this order
    default_pragmas = (
        ("cipher_default_page_size", "page_size"),
        ("cipher_default_hmac_algorithm", "hmac_algorithm"),
        ("cipher_default_kdf_algorithm", "kdf_algorithm"),
    )
    for pragma_name, settings_key in default_pragmas:
        cursor_or_conn.execute(
            f"PRAGMA {pragma_name} = {settings[settings_key]}"
        )
def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
) -> None:
    """
    Issue the SQLCipher pragmas that belong AFTER PRAGMA key.

    For SQLCipher 4.x:
    - New databases: cipher_default_* are set before key via
      apply_cipher_defaults_before_key(). Here only kdf_iter and
      cipher_memory_security are set.
    - Existing databases: cipher_page_size, cipher_hmac_algorithm,
      cipher_kdf_algorithm MUST be set AFTER the key (not before), which
      this function does before kdf_iter and cipher_memory_security.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, the cipher_* settings are skipped (defaults
            already applied). If False, they are set for the existing DB.
    """
    settings = get_sqlcipher_settings()

    # (pragma name, settings key); the cipher_* entries only apply to
    # existing databases, kdf_iter is always issued last.
    post_key_pragmas = []
    if not creation_mode:
        post_key_pragmas += [
            ("cipher_page_size", "page_size"),
            ("cipher_hmac_algorithm", "hmac_algorithm"),
            ("cipher_kdf_algorithm", "kdf_algorithm"),
        ]
    post_key_pragmas.append(("kdf_iter", "kdf_iterations"))

    for pragma_name, settings_key in post_key_pragmas:
        cursor_or_conn.execute(
            f"PRAGMA {pragma_name} = {settings[settings_key]}"
        )

    # cipher_memory_security is a runtime PRAGMA. ON zeroes SQLCipher buffers
    # and calls mlock() to prevent swap; OFF skips this. Defaulting to OFF
    # because the password already sits unprotected in Flask session, db_manager,
    # and thread-local storage — mlock on SQLCipher's buffers alone doesn't help.
    # Users can opt in with LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=ON + IPC_LOCK.
    # Applied on every connection (not just creation) so env var overrides work.
    mem_security = get_env_setting("db_config.cipher_memory_security", "OFF")
    cursor_or_conn.execute(f"PRAGMA cipher_memory_security = {mem_security}")
def apply_performance_pragmas(cursor_or_conn: Any) -> None:
    """
    Issue the performance-related pragmas, configured via environment variables.

    temp_store and busy_timeout are fixed; cache size, journal mode and
    synchronous mode are read via the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    run = cursor_or_conn.execute

    # Fixed values that are always applied
    run("PRAGMA temp_store = MEMORY")
    run("PRAGMA busy_timeout = 10000")  # 10 second timeout

    # Cache size - registry validates min/max range. The value is sent
    # negated, which expresses the cache size in KB.
    cache_mb = get_env_setting("db_config.cache_size_mb", 64)
    cache_kb = cache_mb * 1024
    run(f"PRAGMA cache_size = {-cache_kb}")

    # Journal mode - registry validates against allowed values
    journal_mode = get_env_setting("db_config.journal_mode", "WAL")
    run(f"PRAGMA journal_mode = {journal_mode}")

    # Synchronous mode - registry validates against allowed values
    sync_mode = get_env_setting("db_config.synchronous", "NORMAL")
    run(f"PRAGMA synchronous = {sync_mode}")
def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Verify that the SQLCipher connection is working correctly.

    Runs ``SELECT 1`` exactly once and checks that the row read back
    equals ``(1,)``. Any exception raised along the way is logged and
    reported as a failed verification rather than propagated.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise
    """
    try:
        executed = cursor_or_conn.execute("SELECT 1")
        # A cursor exposes fetchone() itself. An object without it (e.g. a
        # connection) is read through whatever execute() returned, so the
        # query does not have to be issued a second time.
        reader = (
            cursor_or_conn
            if hasattr(cursor_or_conn, "fetchone")
            else executed
        )
        result = reader.fetchone()
        is_valid = result == (1,)
        if not is_valid:
            logger.error(
                f"SQLCipher verification failed: result {result} != (1,)"
            )
        return is_valid
    except Exception:
        logger.exception("SQLCipher verification failed")
        return False
def get_sqlcipher_version(cursor_or_conn: Any) -> Optional[str]:
    """
    Get the SQLCipher version string.

    Works with a cursor as well as with a connection: when the object has
    no fetchone() of its own, the row is read from what execute() returned.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        Version string (e.g. "4.6.1 community") or None if unavailable
    """
    try:
        executed = cursor_or_conn.execute("PRAGMA cipher_version")
        reader = (
            cursor_or_conn
            if hasattr(cursor_or_conn, "fetchone")
            else executed
        )
        result = reader.fetchone()
        return result[0] if result else None
    except Exception:
        # loguru attaches the active exception through opt(exception=True);
        # the stdlib-style exc_info keyword is not interpreted by it.
        logger.opt(exception=True).debug("Could not query SQLCipher version")
        return None
def create_sqlcipher_connection(
    db_path: Union[str, Path],
    password: Optional[str] = None,
    creation_mode: bool = False,
    connect_kwargs: Optional[Dict[str, Any]] = None,
    hex_key: Optional[str] = None,
) -> Any:
    """
    Open a SQLCipher connection and run the full PRAGMA setup on it.

    PRAGMA sequence:
    - Creation: cipher_default_* -> key -> kdf_iter -> performance -> verify
    - Existing: key -> cipher_* + kdf_iter -> performance -> verify

    If any step fails, the connection is closed before the exception is
    re-raised.

    Uses per-database salt if a .salt file exists alongside the database,
    otherwise falls back to legacy salt for backwards compatibility.

    Args:
        db_path: Path to the database file
        password: The password for encryption (mutually exclusive with hex_key)
        creation_mode: If True, set cipher_default_* before key (new DB)
        connect_kwargs: Extra kwargs passed to sqlcipher3.connect()
        hex_key: Pre-derived hex key (skips PBKDF2 derivation); takes
            precedence over password when both are given

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If neither password nor hex_key is provided, or the
            connection cannot be verified
    """
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError:
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        )

    extra_kwargs = connect_kwargs or {}
    connection = sqlcipher3.connect(str(db_path), **extra_kwargs)
    try:
        cur = connection.cursor()

        # New database: the defaults have to be in place before the key
        if creation_mode:
            with _cipher_default_lock:
                apply_cipher_defaults_before_key(cur)

        # Key: a pre-derived hex key wins; otherwise derive from the
        # password (per-database salt is looked up via db_path)
        if hex_key:
            set_sqlcipher_key_from_hex(cur, hex_key)
        elif password:
            set_sqlcipher_key(cur, password, db_path=db_path)
        else:
            raise ValueError("Either password or hex_key must be provided")  # noqa: TRY301 — except does connection cleanup before re-raise

        # Post-key pragmas (cipher_* for existing, kdf_iter for both)
        apply_sqlcipher_pragmas(cur, creation_mode=creation_mode)

        # Performance settings
        apply_performance_pragmas(cur)

        # Final check that the connection is usable
        verified = verify_sqlcipher_connection(cur)
        if not verified:
            raise ValueError(  # noqa: TRY301 — except does connection cleanup before re-raise
                "Failed to establish encrypted database connection"
            )

        cur.close()
        return connection
    except Exception:
        from ..utilities.resource_utils import safe_close

        safe_close(connection, "SQLCipher connection")
        raise
# Former name of apply_cipher_defaults_before_key, kept importable for
# backwards compatibility.
apply_cipher_settings_before_key = apply_cipher_defaults_before_key