Coverage for src / local_deep_research / database / sqlcipher_utils.py: 71%
73 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2SQLCipher utility functions for consistent database operations.
4This module centralizes all SQLCipher-specific operations to ensure
5consistent password handling and PRAGMA settings across the codebase.
6"""
8from typing import Any, Optional
9from hashlib import pbkdf2_hmac
10from functools import cache
12from loguru import logger
15@cache
16def _get_key_from_password(password: str) -> bytes:
17 """
18 Generates an encryption key from the user's password.
20 Args:
21 password: The password.
23 Returns:
24 The generated key.
26 """
27 # Generate a secure key based on the password.
28 settings = get_sqlcipher_settings()
29 logger.info("Generating DB encryption key...")
31 # Use PBKDF2 to stretch the password into a hex key.
32 # Note: SQLCipher handles per-database salting internally, so this
33 # placeholder salt is acceptable. See PBKDF2_PLACEHOLDER_SALT docstring.
34 key = pbkdf2_hmac(
35 "sha512",
36 password.encode(),
37 PBKDF2_PLACEHOLDER_SALT,
38 settings["kdf_iterations"],
39 )
41 logger.info("Generated DB encryption key.")
43 return key
46def set_sqlcipher_key(cursor_or_conn: Any, password: str) -> None:
47 """
48 Set the SQLCipher encryption key using hexadecimal encoding.
50 This avoids SQL injection and escaping issues with special characters.
52 Args:
53 cursor_or_conn: SQLCipher cursor or connection object
54 password: The password to use for encryption
56 """
57 key = _get_key_from_password(password)
58 cursor_or_conn.execute(f"PRAGMA key = \"x'{key.hex()}'\"")
61def set_sqlcipher_rekey(cursor_or_conn: Any, new_password: str) -> None:
62 """
63 Change the SQLCipher encryption key using hexadecimal encoding.
65 Args:
66 cursor_or_conn: SQLCipher cursor or connection object
67 new_password: The new password to use for encryption
68 """
69 hex_password = new_password.encode("utf-8").hex()
71 # Handle SQLAlchemy connections that need text()
72 if hasattr(cursor_or_conn, "execute") and hasattr( 72 ↛ 85line 72 didn't jump to line 85 because the condition on line 72 was always true
73 cursor_or_conn.execute, "__self__"
74 ):
75 # This is likely a SQLAlchemy connection
76 from sqlalchemy import text
78 # SQLAlchemy doesn't support parameterized PRAGMA, so we use the safe hex encoding
79 # The hex encoding already prevents injection since it only contains [0-9a-f]
80 safe_sql = f"PRAGMA rekey = \"x'{hex_password}'\""
81 cursor_or_conn.execute(text(safe_sql))
82 else:
83 # Raw SQLCipher connection - also doesn't support parameterized PRAGMA
84 # The hex encoding already prevents injection since it only contains [0-9a-f]
85 safe_sql = f"PRAGMA rekey = \"x'{hex_password}'\""
86 cursor_or_conn.execute(safe_sql)
89# Default SQLCipher configuration (can be overridden by settings)
90# Reduced for testing - in production use higher values
91DEFAULT_KDF_ITERATIONS = 256000 # Reduced for testing (was 256000)
92DEFAULT_PAGE_SIZE = 16384 # 16KB pages for maximum performance with caching
93DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
94DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"
96# SQLCipher handles per-database salting internally when using PRAGMA key.
97# This placeholder is used only for PBKDF2 key stretching to convert the
98# user's password into a hex key format. The actual encryption salt is
99# managed by SQLCipher on a per-database basis, providing proper salt
100# uniqueness. See: https://www.zetetic.net/sqlcipher/sqlcipher-api/#key
101# WARNING: Do NOT change this value - it would break all existing databases!
102PBKDF2_PLACEHOLDER_SALT = b"no salt"
105def get_sqlcipher_settings(username: Optional[str] = None) -> dict:
106 """
107 Get SQLCipher settings from environment variables or use defaults.
109 These settings cannot be changed after database creation, so they
110 must be configured via environment variables only.
112 Args:
113 username: Username to get settings for (not used anymore)
115 Returns:
116 Dictionary with SQLCipher configuration
117 """
118 import os
120 settings = {
121 "kdf_iterations": int(
122 os.environ.get("LDR_DB_KDF_ITERATIONS", DEFAULT_KDF_ITERATIONS)
123 ),
124 "page_size": int(os.environ.get("LDR_DB_PAGE_SIZE", DEFAULT_PAGE_SIZE)),
125 "hmac_algorithm": os.environ.get(
126 "LDR_DB_HMAC_ALGORITHM", DEFAULT_HMAC_ALGORITHM
127 ),
128 "kdf_algorithm": os.environ.get(
129 "LDR_DB_KDF_ALGORITHM", DEFAULT_KDF_ALGORITHM
130 ),
131 }
133 return settings
136def apply_sqlcipher_pragmas(
137 cursor_or_conn: Any,
138 creation_mode: bool = False,
139 username: Optional[str] = None,
140) -> None:
141 """
142 Apply standard SQLCipher PRAGMA settings.
144 Args:
145 cursor_or_conn: SQLCipher cursor or connection object
146 creation_mode: If True, applies settings for database creation.
147 If False, applies settings for existing database access.
148 username: Username to get settings for (if available)
149 """
150 # Get settings (from database if available, otherwise defaults)
151 settings = get_sqlcipher_settings(username)
153 # Core settings needed for both creation and access
154 cursor_or_conn.execute(f"PRAGMA cipher_page_size = {settings['page_size']}")
155 cursor_or_conn.execute(
156 f"PRAGMA cipher_hmac_algorithm = {settings['hmac_algorithm']}"
157 )
158 cursor_or_conn.execute(f"PRAGMA kdf_iter = {settings['kdf_iterations']}")
160 if creation_mode:
161 # Additional settings only needed during creation
162 cursor_or_conn.execute(
163 "PRAGMA cipher_memory_security = OFF"
164 ) # Better performance
167def apply_performance_pragmas(
168 cursor_or_conn: Any, username: Optional[str] = None
169) -> None:
170 """
171 Apply performance-related PRAGMA settings from environment variables.
173 These settings cannot be changed after database creation, so they
174 must be configured via environment variables only.
176 Args:
177 cursor_or_conn: SQLCipher cursor or connection object
178 username: Username to get settings for (not used anymore)
179 """
180 import os
182 # Default values that are always applied
183 cursor_or_conn.execute("PRAGMA temp_store = MEMORY")
184 cursor_or_conn.execute("PRAGMA busy_timeout = 10000") # 10 second timeout
186 # Get settings from environment variables
187 cache_mb = int(os.environ.get("LDR_DB_CACHE_SIZE_MB", "64"))
188 cache_pages = -(cache_mb * 1024) # Negative for KB cache size
189 cursor_or_conn.execute(f"PRAGMA cache_size = {cache_pages}")
191 journal_mode = os.environ.get("LDR_DB_JOURNAL_MODE", "WAL")
192 cursor_or_conn.execute(f"PRAGMA journal_mode = {journal_mode}")
194 sync_mode = os.environ.get("LDR_DB_SYNCHRONOUS", "NORMAL")
195 cursor_or_conn.execute(f"PRAGMA synchronous = {sync_mode}")
198def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
199 """
200 Verify that the SQLCipher connection is working correctly.
202 Args:
203 cursor_or_conn: SQLCipher cursor or connection object
205 Returns:
206 True if the connection is valid, False otherwise
207 """
208 try:
209 # Force key derivation with test query
210 cursor_or_conn.execute("SELECT 1")
211 result = (
212 cursor_or_conn.fetchone()
213 if hasattr(cursor_or_conn, "fetchone")
214 else cursor_or_conn.execute("SELECT 1").fetchone()
215 )
216 return result == (1,)
217 except Exception:
218 logger.exception("SQLCipher verification failed")
219 return False
222def create_sqlcipher_connection(db_path: str, password: str) -> Any:
223 """
224 Create a properly configured SQLCipher connection.
226 Args:
227 db_path: Path to the database file
228 password: The password for encryption
230 Returns:
231 SQLCipher connection object
233 Raises:
234 ImportError: If sqlcipher3 is not available
235 ValueError: If the connection cannot be established
236 """
237 # Import the appropriate SQLCipher module
238 from .sqlcipher_compat import get_sqlcipher_module
240 try:
241 sqlcipher3 = get_sqlcipher_module()
242 except ImportError:
243 raise ImportError(
244 "sqlcipher3 is not available for encrypted databases. "
245 "Ensure SQLCipher system library is installed, then run: pdm install"
246 )
248 conn = sqlcipher3.connect(str(db_path))
249 cursor = conn.cursor()
251 # Set encryption key
252 set_sqlcipher_key(cursor, password)
254 # Apply SQLCipher settings
255 apply_sqlcipher_pragmas(cursor, creation_mode=False)
257 # Verify connection
258 if not verify_sqlcipher_connection(cursor):
259 conn.close()
260 raise ValueError("Failed to establish encrypted database connection")
262 # Apply performance settings
263 apply_performance_pragmas(cursor)
265 cursor.close()
266 return conn