Coverage for src / local_deep_research / database / encrypted_db.py: 76%

259 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Encrypted database management using SQLCipher. 

3Handles per-user encrypted databases with browser-friendly authentication. 

4""" 

5 

6import os 

7from pathlib import Path 

8from typing import Any, Dict, Optional 

9 

10from loguru import logger 

11from sqlalchemy import create_engine, event, text 

12from sqlalchemy.engine import Engine 

13from sqlalchemy.orm import Session, sessionmaker 

14from sqlalchemy.pool import QueuePool, NullPool, StaticPool 

15 

16from ..config.paths import get_data_directory, get_user_database_filename 

17from .sqlcipher_compat import get_sqlcipher_module 

18from .sqlcipher_utils import ( 

19 set_sqlcipher_key, 

20 set_sqlcipher_rekey, 

21 apply_sqlcipher_pragmas, 

22 apply_performance_pragmas, 

23 verify_sqlcipher_connection, 

24) 

25 

26 

class DatabaseManager:
    """Manages encrypted SQLCipher databases for each user.

    Each user gets their own database file under
    ``<data directory>/encrypted_databases``; databases that are currently
    open are tracked as one SQLAlchemy engine per username in
    ``self.connections``. If SQLCipher is not usable and
    ``LDR_ALLOW_UNENCRYPTED=true`` is set, plain (unencrypted) SQLite files
    are used instead.
    """

    def __init__(self):
        # username -> engine for every user database currently open.
        self.connections: Dict[str, Engine] = {}
        self.data_dir = get_data_directory() / "encrypted_databases"
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Probe SQLCipher once up front. This raises RuntimeError when it is
        # not usable and unencrypted operation was not explicitly allowed.
        self.has_encryption = self._check_encryption_available()

        # Use StaticPool for testing to avoid locking issues; QueuePool
        # otherwise.
        self._use_static_pool = bool(os.environ.get("TESTING"))
        self._pool_class = StaticPool if self._use_static_pool else QueuePool

    def _get_pool_kwargs(self) -> Dict[str, Any]:
        """Return pool sizing kwargs matching ``self._pool_class``.

        StaticPool doesn't support pool_size or max_overflow, so they are
        only included for QueuePool.
        """
        if self._use_static_pool:
            return {}
        return {
            "pool_size": 10,
            "max_overflow": 30,  # Increase overflow so we don't run out of connections
        }

    def _is_valid_encryption_key(self, password: Optional[str]) -> bool:
        """
        Check if the provided password is valid (not None or empty).

        Args:
            password: The password to check

        Returns:
            True if the password is valid, False otherwise
        """
        return password is not None and password != ""

    def _check_encryption_available(self) -> bool:
        """Check that SQLCipher is importable AND actually works.

        A throwaway database is created in a temporary file, keyed, written
        to and read back; the file is removed afterwards.

        Returns:
            True if SQLCipher works. False if it does not but the user has
            set ``LDR_ALLOW_UNENCRYPTED=true``.

        Raises:
            RuntimeError: If SQLCipher does not work and unencrypted
                databases were not explicitly allowed.
        """
        import tempfile

        try:
            # Only reserve a unique path here; SQLCipher opens the file itself.
            with tempfile.NamedTemporaryFile(delete=False) as tmp:
                tmp_path = tmp.name

            try:
                # Use raw sqlcipher3 connection to test
                sqlcipher = get_sqlcipher_module().dbapi2
                conn = sqlcipher.connect(tmp_path)
                try:
                    set_sqlcipher_key(conn, "testpass")
                    conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
                    conn.execute("INSERT INTO test VALUES (1)")
                    result = conn.execute("SELECT * FROM test").fetchone()
                finally:
                    # Close even when a statement failed, so the handle is not
                    # leaked and the temp file can be deleted below.
                    conn.close()

                if result != (1,):
                    raise Exception("SQLCipher encryption test failed")
                logger.info(
                    "SQLCipher available and working - databases will be encrypted"
                )
                return True
            except Exception as e:
                logger.warning(f"SQLCipher module found but not working: {e}")
                raise ImportError("SQLCipher not functional") from e
            finally:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best effort: a leftover temp file is harmless.
                    pass

        except ImportError:
            # Check if user has explicitly allowed unencrypted databases
            allow_unencrypted = (
                os.environ.get("LDR_ALLOW_UNENCRYPTED", "").lower() == "true"
            )

            if not allow_unencrypted:
                logger.exception(
                    "SECURITY ERROR: SQLCipher is not installed!\n"
                    "Your databases will NOT be encrypted.\n"
                    "To fix this:\n"
                    "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
                    "2. Reinstall project: pdm install\n"
                    "Or use Docker with SQLCipher pre-installed.\n\n"
                    "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
                    "export LDR_ALLOW_UNENCRYPTED=true"
                )
                raise RuntimeError(
                    "SQLCipher not available. Set LDR_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
                )

            logger.warning(
                "⚠️ WARNING: Running with UNENCRYPTED databases!\n"
                "This means:\n"
                "- Passwords don't protect data access\n"
                "- API keys are stored in plain text\n"
                "- Anyone with file access can read all data\n"
                "Install SQLCipher for secure operation!"
            )
            return False

    def _get_user_db_path(self, username: str) -> Path:
        """Get the path for a user's encrypted database."""
        return self.data_dir / get_user_database_filename(username)

    def _apply_pragmas(self, connection, connection_record):
        """SQLAlchemy ``connect`` listener that applies per-connection pragmas.

        ``_build_engine`` only registers this on engines for unencrypted
        files. Pragma failures are logged at debug level and ignored; these
        settings are optimizations, not requirements.

        Args:
            connection: The raw DB-API connection that was just opened.
            connection_record: Pool bookkeeping object (unused; required by
                the event signature).
        """
        apply_performance_pragmas(connection)

        if self.has_encryption:
            # SQLCipher-specific pragmas
            from .sqlcipher_utils import get_sqlcipher_settings

            settings = get_sqlcipher_settings()
            pragmas = [
                f"PRAGMA kdf_iter = {settings['kdf_iterations']}",
                f"PRAGMA cipher_page_size = {settings['page_size']}",
            ]
        else:
            # Regular SQLite pragma: 256MB memory mapping
            pragmas = ["PRAGMA mmap_size = 268435456"]

        for pragma in pragmas:
            try:
                connection.execute(pragma)
            except Exception as e:
                logger.debug(f"Could not apply {pragma!r}: {e}")

    def _make_sqlcipher_creator(
        self, db_path: Path, password: str, username: str
    ):
        """Return a zero-argument factory of keyed SQLCipher connections.

        Used as SQLAlchemy's ``creator`` so every pooled connection is
        keyed, verified and configured before SQLAlchemy sees it.
        """
        sqlcipher3 = get_sqlcipher_module()

        def create_sqlcipher_connection():
            """Create a properly initialized SQLCipher connection."""
            conn = sqlcipher3.connect(
                str(db_path),
                isolation_level="IMMEDIATE",
                check_same_thread=False,
            )
            try:
                cursor = conn.cursor()
                try:
                    # Use centralized SQLCipher setup
                    set_sqlcipher_key(cursor, password)

                    # Verify connection works (fails on a wrong key)
                    if not verify_sqlcipher_connection(cursor):
                        raise ValueError("Failed to verify database key")

                    # Apply SQLCipher and performance settings
                    apply_sqlcipher_pragmas(cursor, creation_mode=False)
                    apply_performance_pragmas(cursor, username)
                finally:
                    cursor.close()
            except Exception:
                # Don't leak the file handle of a connection we won't return.
                conn.close()
                raise
            return conn

        return create_sqlcipher_connection

    def _build_engine(
        self, db_path: Path, password: str, username: str
    ) -> Engine:
        """Create the pooled engine for a user's database file.

        With SQLCipher, every connection comes from
        ``_make_sqlcipher_creator``. Without it, a plain SQLite URL is used
        and ``_apply_pragmas`` runs on each new connection.
        """
        if self.has_encryption:
            return create_engine(
                "sqlite://",
                creator=self._make_sqlcipher_creator(
                    db_path, password, username
                ),
                poolclass=self._pool_class,
                echo=False,
                query_cache_size=1000,  # Increased for complex queries with SQLCipher
                **self._get_pool_kwargs(),
            )

        # Fall back to regular SQLite (no password protection!)
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,  # Same optimization for unencrypted
            **self._get_pool_kwargs(),
        )
        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)
        return engine

    def _create_schema_with_sqlcipher(
        self, db_path: Path, password: str, username: str
    ) -> None:
        """Create every table except ``users`` in a new SQLCipher file.

        Tables are created using raw SQLCipher outside SQLAlchemy. This
        bypasses the SQLAlchemy DDL execution that causes MemoryError in
        Flask. Errors are logged and re-raised.
        """
        try:
            sqlcipher3 = get_sqlcipher_module()

            from sqlalchemy.dialects import sqlite
            from sqlalchemy.schema import CreateTable

            from .models import Base

            # Use IMMEDIATE isolation level for proper write transaction handling
            conn = sqlcipher3.connect(
                str(db_path),
                isolation_level="IMMEDIATE",
                check_same_thread=False,
            )
            try:
                # Use centralized SQLCipher setup
                set_sqlcipher_key(conn, password)
                apply_sqlcipher_pragmas(conn, creation_mode=True)

                # Compile each model's CREATE TABLE with the SQLite dialect
                sqlite_dialect = sqlite.dialect()
                for table in Base.metadata.sorted_tables:
                    if table.name == "users":
                        continue
                    create_sql = str(
                        CreateTable(table).compile(dialect=sqlite_dialect)
                    )
                    logger.debug(f"Creating table {table.name}")
                    conn.execute(create_sql)

                conn.commit()
            finally:
                # Always release the handle, even if a CREATE TABLE failed.
                conn.close()
        except Exception:
            # NOTE(review): the partially written file is left on disk, so a
            # retry will hit "Database already exists" in create_user_database.
            logger.exception("Error creating database structure")
            raise

        logger.info(f"Database structure created successfully for {username}")

    def create_user_database(self, username: str, password: str) -> Engine:
        """Create a new database for a user and open it.

        The database is encrypted with ``password`` when SQLCipher is
        available. The resulting engine is cached in ``self.connections``,
        and ``initialize_database`` is run on it best-effort: its failures
        are logged, not raised.

        Raises:
            ValueError: If ``password`` is None/empty or the user's database
                file already exists.
        """
        # Validate the encryption key
        if not self._is_valid_encryption_key(password):
            logger.error(
                f"Invalid encryption key for user {username}: password is None or empty"
            )
            raise ValueError(
                "Invalid encryption key: password cannot be None or empty"
            )

        db_path = self._get_user_db_path(username)

        if db_path.exists():
            raise ValueError(f"Database already exists for user {username}")

        if self.has_encryption:
            # Create directory if it doesn't exist
            db_path.parent.mkdir(parents=True, exist_ok=True)
            self._create_schema_with_sqlcipher(db_path, password, username)

            # Small delay to ensure file is fully written
            import time

            time.sleep(0.1)
        else:
            logger.warning(
                f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
            )

        engine = self._build_engine(db_path, password, username)

        # Store connection
        self.connections[username] = engine

        # Initialize database tables using centralized initialization
        from .initialize import initialize_database

        try:
            # Create a session for settings initialization
            session_factory = sessionmaker(bind=engine)
            with session_factory() as session:
                initialize_database(engine, session)
        except Exception as e:
            logger.warning(f"Could not initialize database fully: {e}")
            # Still continue - basic tables were created above

        label = "encrypted" if self.has_encryption else "unencrypted"
        logger.info(f"Created {label} database for user {username}")
        return engine

    def open_user_database(
        self, username: str, password: str
    ) -> Optional[Engine]:
        """Open an existing database for a user.

        Returns:
            The user's engine (also cached in ``self.connections``), or None
            if no database file exists or the test query fails (e.g. a
            wrong password).

        Raises:
            ValueError: If ``password`` is None or empty.
        """
        # Validate the encryption key
        if not self._is_valid_encryption_key(password):
            logger.error(
                f"Invalid encryption key when opening database for user {username}: password is None or empty"
            )
            # TODO: Fix the root cause - research threads are not getting the correct password
            logger.error(
                "TODO: This usually means the research thread is not receiving the user's "
                "password for database encryption. Need to ensure password is passed from "
                "the main thread to research threads."
            )
            raise ValueError(
                "Invalid encryption key: password cannot be None or empty"
            )

        # Check if already open.
        # NOTE(review): the cached engine is returned without checking
        # ``password`` against it - confirm callers never rely on this call
        # to authenticate a user whose database is already open.
        if username in self.connections:
            return self.connections[username]

        db_path = self._get_user_db_path(username)

        if not db_path.exists():
            logger.error(f"No database found for user {username}")
            return None

        if not self.has_encryption:
            logger.warning(
                f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
            )
        engine = self._build_engine(db_path, password, username)

        try:
            # Test connection by running a simple query
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # Store connection
            self.connections[username] = engine

            # Run database initialization (creates missing tables and runs migrations)
            from .initialize import initialize_database

            try:
                initialize_database(engine)
            except Exception as e:
                logger.warning(f"Could not run migrations for {username}: {e}")

            label = "encrypted" if self.has_encryption else "unencrypted"
            logger.info(f"Opened {label} database for user {username}")
            return engine

        except Exception as e:
            logger.exception(
                f"Failed to open database for user {username}: {e}"
            )
            # Don't cache or leak an engine we are reporting as unusable.
            if self.connections.get(username) is engine:
                del self.connections[username]
            engine.dispose()
            return None

    def get_session(self, username: str) -> Optional[Session]:
        """Create a new session for a user's database.

        Returns None when the user's database is not currently open. A fresh
        session is created on every call to avoid stale session issues; this
        method does not close it.
        """
        engine = self.connections.get(username)
        if engine is None:
            # Use debug level for this common scenario to reduce log noise
            logger.debug(f"No open database for user {username}")
            return None
        return sessionmaker(bind=engine)()

    def close_user_database(self, username: str):
        """Dispose of and forget a user's engine; no-op if it is not open."""
        engine = self.connections.pop(username, None)
        if engine is not None:
            engine.dispose()
            logger.info(f"Closed database for user {username}")

    def check_database_integrity(self, username: str) -> bool:
        """Check integrity of a user's open database.

        Runs ``PRAGMA quick_check`` followed by
        ``PRAGMA cipher_integrity_check``. Returns False if the database is
        not open, either check reports a problem, or an error occurs.
        """
        if username not in self.connections:
            return False

        try:
            with self.connections[username].connect() as conn:
                # Quick integrity check
                row = conn.execute(text("PRAGMA quick_check")).fetchone()
                if row is None or row[0] != "ok":
                    return False

                # SQLCipher integrity check
                result = conn.execute(text("PRAGMA cipher_integrity_check"))
                # If this returns any rows, there are HMAC failures
                failures = list(result)
                if failures:
                    logger.error(
                        f"Integrity check failed for {username}: {len(failures)} HMAC failures"
                    )
                    return False

                return True

        except Exception:
            logger.exception(f"Integrity check error for user: {username}")
            return False

    def change_password(
        self, username: str, old_password: str, new_password: str
    ) -> bool:
        """Change the encryption password for a user's database.

        The database is reopened with ``old_password`` and rekeyed to
        ``new_password``. The user's database is left CLOSED afterwards,
        whether or not it was open before the call.

        Returns:
            True on success; False if SQLCipher is unavailable,
            ``new_password`` is None/empty, the database is missing, or
            reopening/rekeying fails.
        """
        if not self.has_encryption:
            logger.warning(
                "Cannot change password - SQLCipher not available (databases are unencrypted)"
            )
            return False

        # Same rule as create/open: never rekey to an empty or missing key.
        if not self._is_valid_encryption_key(new_password):
            logger.error(
                f"Invalid new encryption key for user {username}: password is None or empty"
            )
            return False

        db_path = self._get_user_db_path(username)

        if not db_path.exists():
            return False

        try:
            # Close existing connection if any
            self.close_user_database(username)

            # Open with old password
            engine = self.open_user_database(username, old_password)
            if not engine:
                return False

            # Rekey the database (only works with SQLCipher)
            with engine.connect() as conn:
                # Use centralized rekey function
                set_sqlcipher_rekey(conn, new_password)

            logger.info(f"Password changed for user {username}")
            return True

        except Exception:
            logger.exception(f"Failed to change password for user: {username}")
            return False
        finally:
            # Close the connection so no pooled connection keeps the old key
            self.close_user_database(username)

    def user_exists(self, username: str) -> bool:
        """Check if a user exists in the auth database."""
        from .auth_db import get_auth_db_session
        from .models.auth import User

        auth_db = get_auth_db_session()
        try:
            user = auth_db.query(User).filter_by(username=username).first()
        finally:
            # Release the auth session even if the query raises
            auth_db.close()

        return user is not None

    def get_memory_usage(self) -> Dict[str, Any]:
        """Get rough memory usage statistics (an estimate, not a measurement)."""
        return {
            "active_connections": len(self.connections),
            "active_sessions": 0,  # Sessions are created on-demand, not tracked
            "estimated_memory_mb": len(self.connections)
            * 3.5,  # ~3.5MB per connection
        }

    def create_thread_safe_session_for_metrics(
        self, username: str, password: str
    ):
        """
        Create a new database session safe for use in background threads.
        This is specifically for metrics/logging - NOT for settings or user data.

        The session is bound to a private, unpooled (NullPool) engine that is
        independent of ``self.connections``.

        Args:
            username: The username
            password: The user's password (encryption key)

        Returns:
            A SQLAlchemy session that can be used in the current thread

        Raises:
            ValueError: If the user's database file does not exist.

        IMPORTANT: This should ONLY be used for:
        - Writing token metrics
        - Writing search metrics
        - Writing logs

        DO NOT use this for:
        - Reading/writing settings
        - Modifying user data
        - Any operation that should be synchronized with user requests
        """
        db_path = self._get_user_db_path(username)

        if not db_path.exists():
            raise ValueError(f"No database found for user {username}")

        # Create a thread-local engine
        if self.has_encryption:
            sqlcipher3 = get_sqlcipher_module()

            def create_thread_connection():
                """Create a SQLCipher connection for this thread."""
                conn = None
                try:
                    conn = sqlcipher3.connect(
                        str(db_path), check_same_thread=False
                    )
                    cursor = conn.cursor()

                    # Use centralized SQLCipher setup
                    set_sqlcipher_key(cursor, password)
                    apply_sqlcipher_pragmas(cursor, creation_mode=False)

                    # Verify connection works
                    if not verify_sqlcipher_connection(cursor):
                        raise ValueError("Failed to verify database key")
                except Exception as e:
                    logger.exception(
                        f"Failed to create thread connection for {username}: {e}"
                    )
                    # Don't leak the handle of a connection we won't return.
                    if conn is not None:
                        conn.close()
                    raise

                # Apply performance pragmas for metrics writes
                apply_performance_pragmas(cursor, username)

                cursor.close()
                return conn

            engine = create_engine(
                "sqlite://",
                creator=create_thread_connection,
                poolclass=NullPool,  # Important: no connection pooling for threads
                echo=False,
            )
        else:
            # Unencrypted fallback
            logger.warning("Creating unencrypted thread session for metrics")
            engine = create_engine(
                f"sqlite:///{db_path}",
                poolclass=NullPool,
                echo=False,
            )

        # Create session
        session_factory = sessionmaker(bind=engine)
        return session_factory()

617 

618 

# Module-wide singleton used by importers of this module. Building it runs
# DatabaseManager.__init__, i.e. creates the data directory and probes
# SQLCipher at import time.
db_manager: DatabaseManager = DatabaseManager()