Coverage for src/local_deep_research/database/sqlcipher_utils.py: 97%
160 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2SQLCipher utility functions for consistent database operations.
4This module centralizes all SQLCipher-specific operations to ensure
5consistent password handling and PRAGMA settings across the codebase.
6"""
8import os
9import secrets
10import threading
11from functools import lru_cache
12from hashlib import pbkdf2_hmac
13from pathlib import Path
14from typing import Any, Dict, Optional, Union
16from loguru import logger
18from ..settings.env_registry import get_env_setting
# Serializes changes to the global cipher_default_* state while a database
# is being created
_cipher_default_lock = threading.Lock()

# Salt files live next to the database and hold a 256-bit random salt
SALT_FILE_SUFFIX = ".salt"
SALT_SIZE = 256 // 8  # bytes
def get_salt_file_path(db_path: Union[str, Path]) -> Path:
    """
    Locate the salt file that belongs to a database.

    The salt file keeps the database's existing suffix and appends
    SALT_FILE_SUFFIX to it (e.g. ``user.db`` -> ``user.db.salt``).

    Args:
        db_path: Path to the database file

    Returns:
        Path to the corresponding .salt file
    """
    database = Path(db_path)
    salted_suffix = database.suffix + SALT_FILE_SUFFIX
    return database.with_suffix(salted_suffix)
def get_salt_for_database(db_path: Union[str, Path]) -> bytes:
    """
    Return the key-derivation salt that applies to a database file.

    v2+ databases keep a random salt in a .salt file next to the database;
    that file's content is returned after a size check. When no salt file
    exists the database is treated as legacy (v1), a deprecation warning is
    logged and LEGACY_PBKDF2_SALT is returned.

    Args:
        db_path: Path to the database file

    Returns:
        The salt bytes to use for key derivation

    Raises:
        ValueError: If the salt file exists but is not SALT_SIZE bytes long
    """
    salt_file = get_salt_file_path(db_path)

    try:
        stored_salt = salt_file.read_bytes()
    except FileNotFoundError:
        # v1: no salt file, so fall back to the shared legacy salt
        logger.warning(
            f"Database '{Path(db_path).name}' uses the legacy shared salt "
            f"(deprecated). Consider creating a new database to benefit from "
            f"per-database salt security. See issue #1439 for migration details."
        )
        return LEGACY_PBKDF2_SALT
    else:
        # v2: per-database random salt, accepted only at the expected size
        if len(stored_salt) == SALT_SIZE:
            return stored_salt
        raise ValueError(
            f"Salt file {salt_file} has unexpected size ({len(stored_salt)} bytes), "
            f"expected {SALT_SIZE}. The salt file may be corrupted."
        )
def create_database_salt(db_path: Union[str, Path]) -> bytes:
    """
    Create and store a new random salt for a database.

    This should be called when creating a new database.
    The salt is stored in a .salt file alongside the database.

    WARNING: If this salt file is deleted, the associated database becomes
    permanently unreadable. Always back up .salt files alongside their .db files.

    Args:
        db_path: Path to the database file

    Returns:
        The newly generated salt bytes

    Raises:
        FileExistsError: If a salt file already exists for this database
        OSError: If the salt file cannot be created or fully written
    """
    salt_file = get_salt_file_path(db_path)

    if salt_file.exists():
        raise FileExistsError(
            f"Salt file already exists: {salt_file}. "
            f"Refusing to overwrite to prevent data loss."
        )

    salt = secrets.token_bytes(SALT_SIZE)

    # Ensure parent directory exists
    salt_file.parent.mkdir(parents=True, exist_ok=True)

    # O_EXCL makes creation atomic (a concurrent creator gets FileExistsError).
    # O_BINARY exists only on Windows, where a descriptor opened without it is
    # in text mode and would expand every 0x0A salt byte to CR LF on write.
    open_flags = (
        os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
    )
    # Write salt file with owner-only permissions (0o600)
    fd = os.open(str(salt_file), open_flags, 0o600)
    try:
        try:
            # os.write may write fewer bytes than requested; loop until done
            pending = memoryview(salt)
            while pending:
                pending = pending[os.write(fd, pending):]
        finally:
            os.close(fd)
    except OSError:
        # This call created the file exclusively, so removing it is safe and
        # avoids leaving a truncated salt that would block a later retry.
        try:
            os.unlink(str(salt_file))
        except OSError:
            pass  # best-effort cleanup; the original error is what matters
        raise

    logger.info(f"Created new database salt file: {salt_file}")
    return salt
def has_per_database_salt(db_path: Union[str, Path]) -> bool:
    """
    Tell whether a database uses a per-database salt (v2).

    Args:
        db_path: Path to the database file

    Returns:
        True when a .salt file exists for the database, False otherwise
    """
    salt_file = get_salt_file_path(db_path)
    return salt_file.exists()
@lru_cache(maxsize=16)
def _get_key_from_password(
    password: str, salt: bytes, kdf_iterations: int
) -> bytes:
    """
    Derive the database encryption key with PBKDF2-HMAC-SHA512.

    Results are memoized (up to 16 entries) on the full
    (password, salt, kdf_iterations) triple, so a different salt or
    iteration count never reuses another combination's key. Note that the
    cache keeps its arguments, including the password, and the derived key
    alive in process memory.

    Args:
        password: The password.
        salt: The salt bytes to use for key derivation.
        kdf_iterations: Number of PBKDF2 iterations.

    Returns:
        The generated key.
    """
    logger.debug(
        f"Generating DB encryption key with {kdf_iterations} iterations..."
    )

    password_bytes = password.encode()
    derived_key = pbkdf2_hmac("sha512", password_bytes, salt, kdf_iterations)

    logger.debug("Generated DB encryption key.")
    return derived_key
def get_key_from_password(
    password: str, db_path: Optional[Union[str, Path]] = None
) -> bytes:
    """
    Resolve the salt and KDF settings, then derive the key via the cached helper.

    Args:
        password: The password.
        db_path: Optional path to the database file. If provided, the salt
            is looked up for that database. If not provided, the legacy
            salt is used.

    Returns:
        The derived encryption key bytes.
    """
    salt = (
        LEGACY_PBKDF2_SALT
        if db_path is None
        else get_salt_for_database(db_path)
    )
    kdf_iterations = get_sqlcipher_settings()["kdf_iterations"]
    return _get_key_from_password(password, salt, kdf_iterations)
def set_sqlcipher_key(
    cursor_or_conn: Any,
    password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Issue PRAGMA key with the password-derived key in hexadecimal form.

    Sending the derived key as hex keeps the raw password out of the SQL
    text, which avoids injection and escaping problems with special
    characters.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    derived_key = get_key_from_password(password, db_path=db_path)  # gitleaks:allow
    key_pragma = f"PRAGMA key = \"x'{derived_key.hex()}'\""
    cursor_or_conn.execute(key_pragma)
def set_sqlcipher_key_from_hex(cursor_or_conn: Any, hex_key: str) -> None:
    """
    Set the SQLCipher encryption key from a pre-derived hex key string.

    Used by connection closures to avoid capturing plaintext passwords.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        hex_key: Pre-derived hex key string (from get_key_from_password().hex())

    Raises:
        ValueError: If hex_key is empty or contains non-hexadecimal characters
    """
    # hex_key is spliced into the PRAGMA text, so it must be restricted to
    # hex digits; a quote character would otherwise terminate the literal.
    # The offending value is deliberately not echoed in the error message.
    if not hex_key or any(
        char not in "0123456789abcdefABCDEF" for char in hex_key
    ):
        raise ValueError(
            "hex_key must be a non-empty string of hexadecimal digits"
        )
    cursor_or_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"")  # gitleaks:allow
def set_sqlcipher_rekey(
    cursor_or_conn: Any,
    new_password: str,
    db_path: Optional[Union[str, Path]] = None,
) -> None:
    """
    Issue PRAGMA rekey with the key derived from a new password.

    The key is derived exactly as in set_sqlcipher_key(), so a database
    re-keyed here can be reopened with the new password afterwards.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        new_password: The new password to use for encryption
        db_path: Optional path to the database file. If provided, uses
            per-database salt. If not provided, uses legacy salt.
    """
    new_key = get_key_from_password(new_password, db_path=db_path)  # gitleaks:allow

    # bytes.hex() yields only [0-9a-f], so the statement cannot be injected into
    rekey_sql = f"PRAGMA rekey = \"x'{new_key.hex()}'\""

    try:
        # First attempt: SQLAlchemy-style connection, which needs text()
        from sqlalchemy import text

        cursor_or_conn.execute(text(rekey_sql))
    except TypeError:
        # Fallback: raw SQLCipher cursor/connection takes the plain string
        cursor_or_conn.execute(rekey_sql)
# Defaults used when no setting overrides them
DEFAULT_KDF_ITERATIONS = 256_000
DEFAULT_PAGE_SIZE = 16 * 1024  # 16KB pages for maximum performance with caching
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

# Accepted page sizes: the powers of two from 512 up to 65536.
# The settings layer checks min/max only, not that the value is a power
# of two, so membership in this set is checked as an extra safeguard.
VALID_PAGE_SIZES = frozenset(1 << exponent for exponent in range(9, 17))
MAX_KDF_ITERATIONS = 1_000_000

# Lower bound for KDF iterations: strict in production, relaxed
# automatically in test environments.
MIN_KDF_ITERATIONS_PRODUCTION = 100_000
MIN_KDF_ITERATIONS_TESTING = 1
def _get_min_kdf_iterations() -> int:
    """Get minimum KDF iterations, relaxed for test environments.

    Only relaxes when PYTEST_CURRENT_TEST (set automatically by pytest) is
    non-empty or LDR_TEST_MODE (project-specific) is switched on. Generic
    env vars like CI or TESTING are NOT checked to avoid accidentally
    weakening production encryption in Docker/CD pipelines that set CI=true.

    LDR_TEST_MODE values that spell "off" ("", "0", "false", "no", "off",
    case-insensitive, surrounding whitespace ignored) do NOT relax the
    minimum, so LDR_TEST_MODE=false cannot weaken production encryption.

    Returns:
        MIN_KDF_ITERATIONS_TESTING in a test environment, otherwise
        MIN_KDF_ITERATIONS_PRODUCTION.
    """
    if os.environ.get("PYTEST_CURRENT_TEST"):
        return MIN_KDF_ITERATIONS_TESTING

    test_mode = os.environ.get("LDR_TEST_MODE", "").strip().lower()
    if test_mode not in ("", "0", "false", "no", "off"):
        return MIN_KDF_ITERATIONS_TESTING

    return MIN_KDF_ITERATIONS_PRODUCTION
# Fixed salt used by databases created before v2; databases created since
# then get a random per-database salt stored in a .salt file instead.
# WARNING: this value must never change - every existing legacy database
# derives its key from it and would become unreadable.
LEGACY_PBKDF2_SALT = b"no salt"

# Old name of the constant, kept so code that still references it works
PBKDF2_PLACEHOLDER_SALT = LEGACY_PBKDF2_SALT
def get_sqlcipher_settings() -> dict:
    """
    Collect the SQLCipher configuration from the environment, with defaults.

    These settings cannot be changed after database creation, so they
    must be configured via environment variables only.

    Values are read via the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings.

    A page size outside VALID_PAGE_SIZES, or a KDF iteration count outside
    [environment-dependent minimum, MAX_KDF_ITERATIONS], is replaced by its
    default after logging a warning.

    Returns:
        Dictionary with the keys "kdf_iterations", "page_size",
        "hmac_algorithm" and "kdf_algorithm"
    """
    # Algorithms: the registry validates these against allowed values
    hmac_algorithm = get_env_setting(
        "db_config.hmac_algorithm", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = get_env_setting(
        "db_config.kdf_algorithm", DEFAULT_KDF_ALGORITHM
    )

    # Page size: the registry validates the range; power-of-2 is checked here
    page_size = get_env_setting("db_config.page_size", DEFAULT_PAGE_SIZE)
    if page_size not in VALID_PAGE_SIZES:
        logger.warning(
            f"Invalid page_size value '{page_size}', using default "
            f"'{DEFAULT_PAGE_SIZE}'. Valid values: {sorted(VALID_PAGE_SIZES)}"
        )
        page_size = DEFAULT_PAGE_SIZE

    # KDF iterations: registry validates a basic range; the lower bound
    # applied here depends on whether a test environment is detected
    kdf_iterations = get_env_setting(
        "db_config.kdf_iterations", DEFAULT_KDF_ITERATIONS
    )
    lowest_allowed = _get_min_kdf_iterations()
    within_safe_range = lowest_allowed <= kdf_iterations <= MAX_KDF_ITERATIONS
    if not within_safe_range:
        logger.warning(
            f"KDF iterations value '{kdf_iterations}' outside safe range "
            f"[{lowest_allowed}, {MAX_KDF_ITERATIONS}], using default "
            f"'{DEFAULT_KDF_ITERATIONS}'."
        )
        kdf_iterations = DEFAULT_KDF_ITERATIONS

    return {
        "kdf_iterations": kdf_iterations,
        "page_size": page_size,
        "hmac_algorithm": hmac_algorithm,
        "kdf_algorithm": kdf_algorithm,
    }
def apply_cipher_defaults_before_key(
    cursor_or_conn: Any,
) -> None:
    """
    Issue the cipher_default_* pragmas that must precede PRAGMA key on a new DB.

    Per SQLCipher 4.x docs, cipher_default_* pragmas set the defaults that
    apply when a key is set on a NEW database, so they MUST be executed
    before PRAGMA key.

    For EXISTING databases, cipher_page_size/cipher_hmac_algorithm/
    cipher_kdf_algorithm are set AFTER the key via apply_sqlcipher_pragmas().

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    settings = get_sqlcipher_settings()

    logger.debug(
        f"Applying cipher_default_* pragmas for new DB: settings={settings}"
    )

    default_pragmas = (
        f"PRAGMA cipher_default_page_size = {settings['page_size']}",
        f"PRAGMA cipher_default_hmac_algorithm = {settings['hmac_algorithm']}",
        f"PRAGMA cipher_default_kdf_algorithm = {settings['kdf_algorithm']}",
    )
    for pragma in default_pragmas:
        cursor_or_conn.execute(pragma)
def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
) -> None:
    """
    Issue the SQLCipher pragmas that belong AFTER PRAGMA key.

    For SQLCipher 4.x:
    - New databases: cipher_default_* are set before key via
      apply_cipher_defaults_before_key(), so only kdf_iter and
      cipher_memory_security are issued here.
    - Existing databases: cipher_page_size, cipher_hmac_algorithm and
      cipher_kdf_algorithm MUST be set AFTER the key (not before), which
      this function does before kdf_iter.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, skip the cipher_* pragmas (defaults already
            applied). If False, issue them for an existing DB.
    """
    settings = get_sqlcipher_settings()

    post_key_pragmas = []
    if not creation_mode:
        # Existing database: cipher_* pragmas go AFTER the key
        post_key_pragmas.extend(
            (
                f"PRAGMA cipher_page_size = {settings['page_size']}",
                f"PRAGMA cipher_hmac_algorithm = {settings['hmac_algorithm']}",
                f"PRAGMA cipher_kdf_algorithm = {settings['kdf_algorithm']}",
            )
        )
    # kdf_iter can be set after the key (applies to future derivation)
    post_key_pragmas.append(f"PRAGMA kdf_iter = {settings['kdf_iterations']}")

    for pragma in post_key_pragmas:
        cursor_or_conn.execute(pragma)

    # cipher_memory_security is a runtime PRAGMA. ON zeroes SQLCipher buffers
    # and calls mlock() to prevent swap; OFF skips this. The default is OFF
    # because the password already sits unprotected in Flask session,
    # db_manager, and thread-local storage, so mlock on SQLCipher's buffers
    # alone doesn't help. Users can opt in with
    # LDR_DB_CONFIG_CIPHER_MEMORY_SECURITY=ON + IPC_LOCK.
    # Issued on every connection (not just creation) so env overrides work.
    mem_security = get_env_setting("db_config.cipher_memory_security", "OFF")
    cursor_or_conn.execute(f"PRAGMA cipher_memory_security = {mem_security}")
def apply_performance_pragmas(cursor_or_conn: Any) -> None:
    """
    Issue the performance-related pragmas, configurable via the environment.

    Settings are read via the env settings registry, which handles
    canonical env var names (LDR_DB_CONFIG_*) with automatic fallback
    to deprecated names (LDR_DB_*) and deprecation warnings.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
    """
    # Fixed pragmas, always applied (busy_timeout is 10 seconds)
    for fixed_pragma in (
        "PRAGMA temp_store = MEMORY",
        "PRAGMA busy_timeout = 10000",
    ):
        cursor_or_conn.execute(fixed_pragma)

    # Cache size: registry validates min/max; a negative value means KB
    cache_kb = get_env_setting("db_config.cache_size_mb", 64) * 1024
    cursor_or_conn.execute(f"PRAGMA cache_size = {-cache_kb}")

    # Journal mode: registry validates against allowed values
    journal_mode = get_env_setting("db_config.journal_mode", "WAL")
    cursor_or_conn.execute(f"PRAGMA journal_mode = {journal_mode}")

    # Synchronous mode: registry validates against allowed values
    sync_mode = get_env_setting("db_config.synchronous", "NORMAL")
    cursor_or_conn.execute(f"PRAGMA synchronous = {sync_mode}")
def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Verify that the SQLCipher connection is working correctly.

    Runs ``SELECT 1`` once and checks that a single-column row holding 1
    comes back. The row is read from the object itself when it has
    ``fetchone`` (a cursor), otherwise from whatever ``execute`` returned
    (a connection). The row is compared by content, so row types that are
    not plain tuples (e.g. ``sqlite3.Row``) are accepted as well.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise (failures are
        logged, never raised)
    """
    try:
        executed = cursor_or_conn.execute("SELECT 1")
        # Cursors expose fetchone themselves; connections hand back a cursor
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        is_valid = bool(
            result is not None and len(result) == 1 and result[0] == 1
        )
        if not is_valid:
            logger.error(
                f"SQLCipher verification failed: result {result} != (1,)"
            )
        return is_valid
    except Exception:
        logger.exception("SQLCipher verification failed")
        return False
def get_sqlcipher_version(cursor_or_conn: Any) -> Optional[str]:
    """
    Get the SQLCipher version string.

    Works with both cursors and connections: the row is read from the
    object itself when it has ``fetchone``, otherwise from whatever
    ``execute`` returned.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        Version string (e.g. "4.6.1 community") or None if unavailable
        (no row returned, or any error while querying)
    """
    try:
        executed = cursor_or_conn.execute("PRAGMA cipher_version")
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        result = row_source.fetchone()
        return result[0] if result else None
    except Exception:
        # Deliberately best-effort: version lookup must never break callers
        return None
def create_sqlcipher_connection(
    db_path: Union[str, Path],
    password: Optional[str] = None,
    creation_mode: bool = False,
    connect_kwargs: Optional[Dict[str, Any]] = None,
    hex_key: Optional[str] = None,
) -> Any:
    """
    Create a properly configured SQLCipher connection.

    Implements the full PRAGMA sequence with proper error cleanup:
    - Creation: cipher_default_* -> key -> kdf_iter -> performance -> verify
    - Existing: key -> cipher_* + kdf_iter -> performance -> verify

    Uses per-database salt if a .salt file exists alongside the database,
    otherwise falls back to legacy salt for backwards compatibility.

    Args:
        db_path: Path to the database file
        password: The password for encryption (ignored when hex_key is given)
        creation_mode: If True, set cipher_default_* before key (new DB)
        connect_kwargs: Extra kwargs passed to sqlcipher3.connect()
        hex_key: Pre-derived hex key (skips PBKDF2 derivation); takes
            precedence over password

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If neither password nor hex_key is provided, or the
            connection cannot be established
    """
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError as import_error:
        # Chain the cause so the underlying loader failure stays visible
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        ) from import_error

    # Reject missing credentials BEFORE connecting: opening a connection for
    # a call that can only fail may leave an empty database file behind.
    if not hex_key and not password:
        raise ValueError("Either password or hex_key must be provided")

    conn = sqlcipher3.connect(str(db_path), **(connect_kwargs or {}))
    try:
        cursor = conn.cursor()

        if creation_mode:
            # cipher_default_* is global state, so it is set under the lock
            with _cipher_default_lock:
                apply_cipher_defaults_before_key(cursor)

        # Set encryption key (uses per-database salt when password + db_path)
        if hex_key:
            set_sqlcipher_key_from_hex(cursor, hex_key)
        else:
            set_sqlcipher_key(cursor, password, db_path=db_path)

        # Apply post-key pragmas (cipher_* for existing, kdf_iter for both)
        apply_sqlcipher_pragmas(cursor, creation_mode=creation_mode)

        # Apply performance settings
        apply_performance_pragmas(cursor)

        # Verify connection works
        if not verify_sqlcipher_connection(cursor):
            raise ValueError(
                "Failed to establish encrypted database connection"
            )

        cursor.close()
        return conn
    except Exception:
        # Never hand back or leak a half-configured connection
        conn.close()
        raise
# Old function name, kept importable for backwards compatibility
apply_cipher_settings_before_key = apply_cipher_defaults_before_key