Coverage for src / local_deep_research / database / sqlcipher_utils.py: 71%

73 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2SQLCipher utility functions for consistent database operations. 

3 

4This module centralizes all SQLCipher-specific operations to ensure 

5consistent password handling and PRAGMA settings across the codebase. 

6""" 

7 

8from typing import Any, Optional 

9from hashlib import pbkdf2_hmac 

10from functools import cache 

11 

12from loguru import logger 

13 

14 

@cache
def _get_key_from_password(password: str) -> bytes:
    """
    Derive the database encryption key for a password.

    The password is stretched with PBKDF2-HMAC-SHA512, using the module-wide
    ``PBKDF2_PLACEHOLDER_SALT`` and the ``kdf_iterations`` value reported by
    ``get_sqlcipher_settings()``.

    Results are memoized per password by ``functools.cache``: the derivation
    (and its two log messages) only runs the first time a given password is
    seen, and the iteration count is only read on such a cache miss.

    Args:
        password: The user's password.

    Returns:
        The raw derived key bytes.

    """
    # Resolve the iteration count before anything is logged.
    iterations = get_sqlcipher_settings()["kdf_iterations"]
    logger.info("Generating DB encryption key...")

    # The salt is a fixed placeholder; the comment on PBKDF2_PLACEHOLDER_SALT
    # explains why the module considers that acceptable.
    derived_key = pbkdf2_hmac(
        "sha512",
        password.encode(),
        PBKDF2_PLACEHOLDER_SALT,
        iterations,
    )

    logger.info("Generated DB encryption key.")
    return derived_key

44 

45 

def set_sqlcipher_key(cursor_or_conn: Any, password: str) -> None:
    """
    Unlock a SQLCipher database by issuing ``PRAGMA key``.

    The key sent to SQLCipher is the hex form of the bytes returned by
    ``_get_key_from_password(password)``. Because the value consists only of
    hex digits, it can be embedded in the statement without any quoting or
    escaping concerns, whatever characters the password contains.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        password: The password to use for encryption

    """
    hex_key = _get_key_from_password(password).hex()
    key_pragma = f"PRAGMA key = \"x'{hex_key}'\""
    cursor_or_conn.execute(key_pragma)

59 

60 

def set_sqlcipher_rekey(cursor_or_conn: Any, new_password: str) -> None:
    """
    Change the SQLCipher encryption key of an already-unlocked database.

    The new key is derived with ``_get_key_from_password``, exactly as
    ``set_sqlcipher_key`` does. Using the same derivation matters: if the
    database were re-keyed with a differently encoded value (for example the
    raw hex of the password), ``set_sqlcipher_key(new_password)`` would later
    present a different key and the database could no longer be opened.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object, or a
            SQLAlchemy connection
        new_password: The new password to use for encryption
    """
    hex_key = _get_key_from_password(new_password).hex()

    # PRAGMA statements cannot be parameterized. Inlining the value is safe
    # because a hex string only contains [0-9a-f].
    safe_sql = f"PRAGMA rekey = \"x'{hex_key}'\""

    from sqlalchemy import text

    try:
        # SQLAlchemy connections need the statement wrapped in text().
        cursor_or_conn.execute(text(safe_sql))
    except (TypeError, ValueError):
        # A raw DB-API cursor/connection only accepts a plain str and rejects
        # the text() object (TypeError, or ValueError in some pysqlite-derived
        # drivers - TODO confirm for the sqlcipher3 build in use).
        cursor_or_conn.execute(safe_sql)

87 

88 

# Default SQLCipher configuration. Each value can be overridden through the
# matching LDR_DB_* environment variable read by get_sqlcipher_settings().

# Iteration count used both for the PBKDF2 password stretching in this module
# and for SQLCipher's "PRAGMA kdf_iter".
# NOTE(review): an earlier comment described this as "reduced for testing"
# while also recording the previous value as 256000, so nothing is actually
# reduced here.
DEFAULT_KDF_ITERATIONS = 256000
# Value sent as "PRAGMA cipher_page_size" (16384 bytes = 16 KiB).
DEFAULT_PAGE_SIZE = 16384
# Value sent as "PRAGMA cipher_hmac_algorithm".
DEFAULT_HMAC_ALGORITHM = "HMAC_SHA512"
# Reported by get_sqlcipher_settings() as "kdf_algorithm".
DEFAULT_KDF_ALGORITHM = "PBKDF2_HMAC_SHA512"

95 

# Fixed salt for the PBKDF2 stretching step in _get_key_from_password(),
# whose only job is to turn the user's password into the hex key passed to
# "PRAGMA key". SQLCipher handles per-database salting internally when
# PRAGMA key is used, so the actual encryption salt is managed by SQLCipher
# on a per-database basis, which is what provides salt uniqueness.
# See: https://www.zetetic.net/sqlcipher/sqlcipher-api/#key
# WARNING: Do NOT change this value - it would break all existing databases!
PBKDF2_PLACEHOLDER_SALT = b"no salt"

103 

104 

def get_sqlcipher_settings(username: Optional[str] = None) -> dict:
    """
    Build the SQLCipher configuration from the environment.

    Each entry is taken from its ``LDR_DB_*`` environment variable when that
    variable is set, and from the corresponding module-level ``DEFAULT_*``
    constant otherwise. The two numeric entries are passed through ``int()``,
    so a non-numeric override raises ``ValueError``.

    These settings cannot be changed after database creation, so they
    must be configured via environment variables only.

    Args:
        username: Username to get settings for (not used anymore)

    Returns:
        Dictionary with the keys ``kdf_iterations`` (int), ``page_size``
        (int), ``hmac_algorithm`` and ``kdf_algorithm``
    """
    from os import environ

    # Numeric values first, in the same order as the returned keys.
    kdf_iterations = int(
        environ.get("LDR_DB_KDF_ITERATIONS", DEFAULT_KDF_ITERATIONS)
    )
    page_size = int(environ.get("LDR_DB_PAGE_SIZE", DEFAULT_PAGE_SIZE))

    # Algorithm names are passed through unchanged.
    hmac_algorithm = environ.get(
        "LDR_DB_HMAC_ALGORITHM", DEFAULT_HMAC_ALGORITHM
    )
    kdf_algorithm = environ.get("LDR_DB_KDF_ALGORITHM", DEFAULT_KDF_ALGORITHM)

    return {
        "kdf_iterations": kdf_iterations,
        "page_size": page_size,
        "hmac_algorithm": hmac_algorithm,
        "kdf_algorithm": kdf_algorithm,
    }

134 

135 

def apply_sqlcipher_pragmas(
    cursor_or_conn: Any,
    creation_mode: bool = False,
    username: Optional[str] = None,
) -> None:
    """
    Issue the standard SQLCipher cipher PRAGMAs on a cursor or connection.

    The values come from ``get_sqlcipher_settings(username)``. Three PRAGMAs
    are always sent, in this order: ``cipher_page_size``,
    ``cipher_hmac_algorithm`` and ``kdf_iter``. The ``kdf_algorithm`` setting
    is not applied by this function.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        creation_mode: If True, additionally sends
                      ``PRAGMA cipher_memory_security = OFF``.
                      If False, only the three cipher PRAGMAs are sent.
        username: Forwarded to ``get_sqlcipher_settings``
    """
    config = get_sqlcipher_settings(username)

    # (PRAGMA name, settings key) pairs, sent for creation and access alike.
    core_pragmas = (
        ("cipher_page_size", "page_size"),
        ("cipher_hmac_algorithm", "hmac_algorithm"),
        ("kdf_iter", "kdf_iterations"),
    )
    for pragma_name, setting_key in core_pragmas:
        cursor_or_conn.execute(f"PRAGMA {pragma_name} = {config[setting_key]}")

    if not creation_mode:
        return

    # Creation only: the original author disabled this for better performance.
    cursor_or_conn.execute("PRAGMA cipher_memory_security = OFF")

165 

166 

def apply_performance_pragmas(
    cursor_or_conn: Any, username: Optional[str] = None
) -> None:
    """
    Issue the performance-related PRAGMAs on a cursor or connection.

    Two PRAGMAs are always sent with fixed values (``temp_store = MEMORY``
    and ``busy_timeout = 10000``). Three more take their value from the
    environment:

    * ``LDR_DB_CACHE_SIZE_MB`` (default ``64``) -> ``cache_size``
    * ``LDR_DB_JOURNAL_MODE`` (default ``WAL``) -> ``journal_mode``
    * ``LDR_DB_SYNCHRONOUS`` (default ``NORMAL``) -> ``synchronous``

    A non-numeric ``LDR_DB_CACHE_SIZE_MB`` raises ``ValueError`` after the
    two fixed PRAGMAs have already been sent.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object
        username: Username to get settings for (not used anymore)
    """
    from os import getenv

    run = cursor_or_conn.execute

    # Fixed settings, independent of the environment.
    run("PRAGMA temp_store = MEMORY")
    run("PRAGMA busy_timeout = 10000")  # 10 second timeout

    # The size is configured in MB; a negative cache_size is given in KB
    # rather than in pages, hence the conversion and the sign flip.
    cache_kb = int(getenv("LDR_DB_CACHE_SIZE_MB", "64")) * 1024
    run(f"PRAGMA cache_size = {-cache_kb}")

    journal_setting = getenv("LDR_DB_JOURNAL_MODE", "WAL")
    run(f"PRAGMA journal_mode = {journal_setting}")

    synchronous_setting = getenv("LDR_DB_SYNCHRONOUS", "NORMAL")
    run(f"PRAGMA synchronous = {synchronous_setting}")

196 

197 

def verify_sqlcipher_connection(cursor_or_conn: Any) -> bool:
    """
    Verify that the SQLCipher connection is working correctly.

    Runs ``SELECT 1`` exactly once and checks that the single row ``(1,)``
    comes back. Any exception raised while doing so is logged and reported
    as a failed verification instead of propagating.

    Args:
        cursor_or_conn: SQLCipher cursor or connection object

    Returns:
        True if the connection is valid, False otherwise
    """
    try:
        # The original author notes that a test query forces key derivation,
        # so a wrong key surfaces here rather than on the first real query.
        executed = cursor_or_conn.execute("SELECT 1")

        # A cursor exposes fetchone() itself. A connection does not, so the
        # row is read from whatever execute() returned rather than by
        # running the query a second time.
        row_source = (
            cursor_or_conn if hasattr(cursor_or_conn, "fetchone") else executed
        )
        return row_source.fetchone() == (1,)
    except Exception:
        logger.exception("SQLCipher verification failed")
        return False

220 

221 

def create_sqlcipher_connection(db_path: str, password: str) -> Any:
    """
    Create a properly configured SQLCipher connection.

    The connection is keyed with ``set_sqlcipher_key``, configured with
    ``apply_sqlcipher_pragmas`` (access mode), checked with
    ``verify_sqlcipher_connection`` and finally tuned with
    ``apply_performance_pragmas``. If any of these steps fails, the
    connection is closed before the error is raised, so no open handle is
    left behind.

    Args:
        db_path: Path to the database file
        password: The password for encryption

    Returns:
        SQLCipher connection object

    Raises:
        ImportError: If sqlcipher3 is not available
        ValueError: If the connection cannot be established
    """
    # Import the appropriate SQLCipher module
    from .sqlcipher_compat import get_sqlcipher_module

    try:
        sqlcipher3 = get_sqlcipher_module()
    except ImportError as err:
        # Keep the original failure attached as the cause for debugging.
        raise ImportError(
            "sqlcipher3 is not available for encrypted databases. "
            "Ensure SQLCipher system library is installed, then run: pdm install"
        ) from err

    conn = sqlcipher3.connect(str(db_path))
    try:
        cursor = conn.cursor()

        # The key must be set before the other statements run.
        set_sqlcipher_key(cursor, password)
        apply_sqlcipher_pragmas(cursor, creation_mode=False)

        if not verify_sqlcipher_connection(cursor):
            raise ValueError(
                "Failed to establish encrypted database connection"
            )

        # Performance settings are applied only once the key is known good.
        apply_performance_pragmas(cursor)
        cursor.close()
    except Exception:
        # Do not leak the connection when any setup step fails.
        conn.close()
        raise

    return conn