Coverage for src/local_deep_research/database/encrypted_db.py: 90%

336 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Encrypted database management using SQLCipher. 

3Handles per-user encrypted databases with browser-friendly authentication. 

4""" 

5 

6import os 

7import threading 

8import time 

9from pathlib import Path 

10from typing import Any, Dict, Optional 

11 

12from loguru import logger 

13from sqlalchemy import create_engine, event, text 

14from sqlalchemy.engine import Engine 

15from sqlalchemy.orm import Session, sessionmaker 

16from sqlalchemy.pool import QueuePool, StaticPool 

17 

18from ..config.paths import get_data_directory, get_user_database_filename 

19from ..settings.env_registry import get_env_setting 

20from .sqlcipher_compat import get_sqlcipher_module 

21from .pool_config import POOL_PRE_PING, POOL_RECYCLE_SECONDS 

22from .sqlcipher_utils import ( 

23 set_sqlcipher_key, 

24 set_sqlcipher_rekey, 

25 apply_cipher_defaults_before_key, 

26 apply_sqlcipher_pragmas, 

27 apply_performance_pragmas, 

28 verify_sqlcipher_connection, 

29 create_database_salt, 

30 has_per_database_salt, 

31 get_key_from_password, 

32 get_sqlcipher_version, 

33 create_sqlcipher_connection, 

34) 

35 

36 

class DatabaseInitializationError(Exception):
    """Signal that a per-user database opened but its schema setup failed.

    :py:meth:`DatabaseManager.open_user_database` reports credential and
    decryption failures by returning ``None``.  This exception covers the
    other situation: the credentials are valid, but the database state is
    not.  Keeping the two apart lets callers (chiefly the login route)
    avoid penalising the user's lockout counter and surface a different
    error message.
    """

46 

47 

48class DatabaseManager: 

49 """Manages encrypted SQLCipher databases for each user.""" 

50 

def __init__(self):
    """Prepare the engine cache, the storage directory and the pool policy."""
    # One SQLAlchemy engine per username; the re-entrant lock created next
    # to it guards every access to the mapping.
    self.connections: Dict[str, Engine] = {}
    self._connections_lock = threading.RLock()

    # All per-user database files live below <data dir>/encrypted_databases.
    self.data_dir = get_data_directory() / "encrypted_databases"
    self.data_dir.mkdir(parents=True, exist_ok=True)

    # Probe SQLCipher once up front; the result selects the encrypted or
    # the plain-SQLite code path in the rest of the class.
    self.has_encryption = self._check_encryption_available()

    # ------------------------------------------------------------------
    # Pool class selection (see ADR-0004)
    #
    # Production engines use QueuePool (pool_size=20, max_overflow=40,
    # pool_timeout=10); tests use StaticPool.
    #
    # Why the pool is kept at pool_size=20 and not larger:
    #
    #   1. SQLCipher + WAL mode can leak file handles when connections
    #      are closed in a different order than they were opened.  Fewer
    #      pooled connections mean fewer chances for out-of-order closes
    #      during pool_recycle.
    #      See: https://github.com/sqlcipher/android-database-sqlcipher/issues/6
    #      See: https://github.com/dotnet/efcore/issues/35010
    #
    #   2. SQLite serialises all writes through a single file lock, so
    #      extra pooled connections do not improve throughput; they only
    #      hold FDs (up to 3 per connection in WAL mode).
    #
    #   3. The cleanup scheduler periodically calls engine.dispose() to
    #      release all pooled connections, so long-lived handles do not
    #      accumulate over days of idle operation.
    #
    # Why pool_size=20 and not 1: inject_current_user() creates a
    # QueuePool session on every request via g.db_session.  With the UI
    # polling /api/research/<id>/status every 1-2s plus other API calls
    # and before_request middleware, pool_size=1 (max_overflow=2, so 3
    # total) is easily exhausted, causing 30-second timeouts and
    # PendingRollbackError cascades.  pool_size=20 + max_overflow=40
    # (60 total) leaves ample headroom for concurrent requests and
    # multiple browser tabs.
    #
    # Why not NullPool: SQLCipher's PRAGMA key adds ~0.2ms per connection
    # open.  With 20-30 queries per page load, NullPool adds a noticeable
    # 4-6ms overhead vs QueuePool's ~1.5ms.
    # ------------------------------------------------------------------
    running_tests = bool(os.environ.get("TESTING"))
    self._use_static_pool = running_tests
    if running_tests:
        self._pool_class = StaticPool
    else:
        self._pool_class = QueuePool

97 

def _get_pool_kwargs(self) -> Dict[str, Any]:
    """Return the pool keyword arguments matching the selected pool class.

    StaticPool accepts neither ``pool_size`` nor ``max_overflow``, so the
    static-pool configuration contributes no extra arguments.  QueuePool
    gets a moderate size that serves concurrent web requests while
    limiting FD usage (rationale: ADR-0004).

    Returns:
        An empty dict when the static pool is in use, otherwise the
        QueuePool sizing, timeout, pre-ping and recycle settings.
    """
    if not self._use_static_pool:
        return dict(
            pool_size=20,
            max_overflow=40,
            pool_timeout=10,
            pool_pre_ping=POOL_PRE_PING,
            pool_recycle=POOL_RECYCLE_SECONDS,
        )
    return {}

114 

def _is_valid_encryption_key(self, password: str) -> bool:
    """
    Tell whether ``password`` is usable as an encryption key.

    Args:
        password: The candidate password

    Returns:
        False when the password is None, empty or whitespace-only,
        True otherwise
    """
    if password is None:
        return False
    return password.strip() != ""

126 

def is_user_connected(self, username: str) -> bool:
    """Report whether an engine is currently registered for ``username``.

    The lookup happens while holding the connections lock, which makes
    this the accessor external callers should use.

    Args:
        username: The username to look up

    Returns:
        True if the user has an entry in the connection registry
    """
    with self._connections_lock:
        connected = username in self.connections
    return connected

140 

def _check_encryption_available(self) -> bool:
    """Probe SQLCipher by building a throwaway encrypted database.

    Returns:
        True when a scratch database could be keyed, written and read
        back through SQLCipher.  False when the probe failed but the
        ``bootstrap.allow_unencrypted`` setting permits running without
        encryption.

    Raises:
        RuntimeError: If the probe failed and unencrypted operation has
            not been explicitly allowed.
    """
    try:
        import tempfile

        # Only the file name is needed; SQLCipher opens the path itself.
        with tempfile.NamedTemporaryFile(delete=False) as scratch:
            scratch_path = scratch.name

        try:
            # An import alone proves nothing, so run a real round trip.
            dbapi = get_sqlcipher_module().dbapi2
            probe = dbapi.connect(scratch_path)
            try:
                cur = probe.cursor()
                # Cipher defaults go in before the key is set.
                apply_cipher_defaults_before_key(cur)
                set_sqlcipher_key(cur, "testpass")
                # Post-key pragmas; creation_mode=True because the scratch
                # database is brand new.
                apply_sqlcipher_pragmas(cur, creation_mode=True)
                apply_performance_pragmas(cur)

                # Warn about SQLCipher releases older than 4.x.
                detected = get_sqlcipher_version(cur)
                if detected:
                    if "." in detected:
                        major = detected.split(".")[0]
                    else:
                        major = detected[0]
                    if major.isdigit() and int(major) < 4:
                        logger.warning(
                            f"SQLCipher version {detected} detected. "
                            "Version 4.x+ is recommended for proper PRAGMA ordering."
                        )

                cur.close()
                # Table operations go through the connection directly.
                probe.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
                probe.execute("INSERT INTO test VALUES (1)")
                row = probe.execute("SELECT * FROM test").fetchone()

                if row != (1,):
                    raise RuntimeError("SQLCipher encryption test failed")
                logger.info(
                    "SQLCipher available and working - databases will be encrypted"
                )
                return True
            finally:
                from ..utilities.resource_utils import safe_close

                safe_close(probe, "SQLCipher test connection")
        except Exception:
            # Any probe failure is funnelled into the ImportError handler
            # below, which decides between aborting and running unencrypted.
            logger.warning("SQLCipher module found but not working")
            raise ImportError("SQLCipher not functional")
        finally:
            # The scratch file is removed whatever the outcome.
            try:
                os.unlink(scratch_path)
            except OSError as cleanup_error:
                logger.debug(
                    f"Failed to clean up temp file {scratch_path}: {cleanup_error}"
                )

    except ImportError:
        # Unencrypted operation needs an explicit opt-in.  Per the original
        # note, the registry also honours the deprecated
        # LDR_ALLOW_UNENCRYPTED variable.
        if get_env_setting("bootstrap.allow_unencrypted", False):
            logger.warning(
                "WARNING: Running with UNENCRYPTED databases!\n"
                "This means:\n"
                "- Passwords don't protect data access\n"
                "- API keys are stored in plain text\n"
                "- Anyone with file access can read all data\n"
                "Install SQLCipher for secure operation!"
            )
            return False

        logger.exception(
            "SECURITY ERROR: SQLCipher is not installed!\n"
            "Your databases will NOT be encrypted.\n"
            "To fix this:\n"
            "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
            "2. Reinstall project: pdm install\n"
            "Or use Docker with SQLCipher pre-installed.\n\n"
            "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
            "export LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true"
        )
        raise RuntimeError(
            "SQLCipher not available. Set LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
        )

239 

def _get_user_db_path(self, username: str) -> Path:
    """Return the location of ``username``'s database file under ``data_dir``."""
    db_filename = get_user_database_filename(username)
    return self.data_dir / db_filename

243 

def _apply_pragmas(self, connection, connection_record):
    """Tune a freshly opened connection with performance pragmas.

    Registered in this class as a SQLAlchemy ``connect`` listener.
    ``connection_record`` belongs to that listener signature and is not
    used here.  A pragma that cannot be applied is logged at debug level
    and otherwise ignored.
    """
    encrypted = self.has_encryption

    # Shared performance pragmas come first in both modes.
    apply_performance_pragmas(connection)

    if not encrypted:
        # Plain SQLite: enable 256MB of memory mapping.
        try:
            connection.execute("PRAGMA mmap_size = 268435456")
        except Exception as exc:
            logger.debug(f"Could not apply mmap_size pragma: {exc}")
        return

    # SQLCipher: re-apply the configured KDF iteration count and page size.
    from .sqlcipher_utils import get_sqlcipher_settings

    cipher_settings = get_sqlcipher_settings()
    statements = (
        f"PRAGMA kdf_iter = {cipher_settings['kdf_iterations']}",
        f"PRAGMA cipher_page_size = {cipher_settings['page_size']}",
    )
    for statement in statements:
        try:
            connection.execute(statement)
        except Exception as exc:
            logger.debug(f"Could not apply pragma '{statement}': {exc}")

275 

@staticmethod
def _make_sqlcipher_connection(
    db_path: Path,
    password: str,
    isolation_level: Optional[str] = "IMMEDIATE",
    check_same_thread: bool = False,
) -> Any:
    """Open a raw SQLCipher connection and bring it into a usable state.

    The steps run in the canonical SQLCipher order: set the key, apply
    the cipher pragmas, verify, then apply the performance pragmas.  The
    cipher pragmas (page size, HMAC algorithm, KDF iterations) have to be
    in place before verification, because the verification query is the
    first one to decrypt a page with the active cipher settings.

    Args:
        db_path: Path to the database file
        password: The database encryption passphrase
        isolation_level: SQLite isolation level (``""`` for deferred
            transactions, ``None`` for autocommit)
        check_same_thread: SQLite check_same_thread flag

    Returns:
        A raw ``sqlcipher3`` connection ready for use.

    Raises:
        ValueError: If the database key cannot be verified.
    """
    driver = get_sqlcipher_module()
    raw_conn = driver.connect(
        str(db_path),
        isolation_level=isolation_level,
        check_same_thread=check_same_thread,
    )
    setup_cursor = raw_conn.cursor()

    try:
        set_sqlcipher_key(setup_cursor, password, db_path=db_path)
        apply_sqlcipher_pragmas(setup_cursor, creation_mode=False)

        key_ok = verify_sqlcipher_connection(setup_cursor)
        if not key_ok:
            raise ValueError("Failed to verify database key")  # noqa: TRY301 — cleanup in except before re-raise

        apply_performance_pragmas(setup_cursor)
    except Exception:
        # Release the cursor and the connection before re-raising so a
        # failed setup does not leave an open handle behind.
        try:
            setup_cursor.close()
        except Exception:  # noqa: BLE001
            logger.warning("Failed to close cursor during cleanup")
        from ..utilities.resource_utils import safe_close

        safe_close(raw_conn, "encrypted DB connection")
        raise
    else:
        setup_cursor.close()
        return raw_conn

332 

def create_user_database(self, username: str, password: str) -> Engine:
    """Create, initialise and register a brand-new database for a user.

    With SQLCipher available the file is created encrypted: a
    per-database salt is written, the tables and indexes of the ORM
    models (except ``users``) are created over a raw SQLCipher
    connection, and the SQLAlchemy engine is built on a custom connection
    creator.  Without SQLCipher a plain SQLite engine is used.  In both
    cases the centralized database initialization then runs, and the
    engine is registered only after it succeeded.

    Args:
        username: Owner of the new database.
        password: Password the encryption key is derived from.

    Returns:
        The engine registered for ``username``.

    Raises:
        ValueError: If the password is None/blank or a database file
            already exists for the user.
        Exception: Errors from structure creation or initialization are
            re-raised after the partial database file was removed.
    """
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key for user {username}: password is None or empty"
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    db_file = self._get_user_db_path(username)
    if db_file.exists():
        raise ValueError(f"Database already exists for user {username}")

    if not self.has_encryption:
        logger.warning(
            f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
        )
        # Plain SQLite fallback, still with the query cache enabled.
        engine = create_engine(
            f"sqlite:///{db_file}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
        # Unencrypted connections only need the pragma listener.
        event.listen(engine, "connect", self._apply_pragmas)
    else:
        db_file.parent.mkdir(parents=True, exist_ok=True)

        # New databases get their own salt (v2 security improvement).
        create_database_salt(db_file)
        logger.info(f"Created per-database salt for {username}")

        # Derive the key now so the connection creator below captures the
        # hex key instead of the plaintext password.
        derived_hex_key = get_key_from_password(
            password, db_path=db_file
        ).hex()

        # DEFERRED (empty string) so pure-SELECT transactions acquire only
        # SQLite's SHARED lock, letting WAL-mode concurrent readers proceed
        # while a writer is active.  IMMEDIATE was previously set
        # "defensively" and made every transaction (even reads) take a
        # RESERVED lock, which was the single biggest contention source on
        # the login-hang path.  Race-prone check-then-insert call sites
        # were made race-free at the application layer in the preceding
        # prerequisite PR.
        deferred_kwargs = {"isolation_level": "", "check_same_thread": False}

        # Build the schema over a raw SQLCipher connection, outside
        # SQLAlchemy.
        try:
            bootstrap_conn = create_sqlcipher_connection(
                db_file,
                password=password,
                creation_mode=True,
                connect_kwargs=dict(deferred_kwargs),
            )
            try:
                from sqlalchemy.dialects import sqlite
                from sqlalchemy.schema import CreateIndex, CreateTable

                from .models import Base

                # Indexes are emitted explicitly: SQLAlchemy compiles
                # `index=True`/`unique=True` and `Index(...)` into separate
                # CREATE [UNIQUE] INDEX statements, not inline.
                dialect = sqlite.dialect()
                for table in Base.metadata.sorted_tables:
                    if table.name == "users":
                        continue
                    table_ddl = str(
                        CreateTable(table, if_not_exists=True).compile(
                            dialect=dialect
                        )
                    )
                    logger.debug(f"Creating table {table.name}")
                    bootstrap_conn.execute(table_ddl)
                    for index in table.indexes:
                        index_ddl = str(
                            CreateIndex(index, if_not_exists=True).compile(
                                dialect=dialect
                            )
                        )
                        bootstrap_conn.execute(index_ddl)

                bootstrap_conn.commit()
            finally:
                from ..utilities.resource_utils import safe_close

                safe_close(bootstrap_conn, "user DB setup connection")

            logger.info(
                f"Database structure created successfully for {username}"
            )
        except Exception:
            logger.exception("Error creating database structure")
            # Do not leave a partial database file behind.
            if db_file.exists():
                db_file.unlink(missing_ok=True)
            raise

        # Small delay to ensure the file is fully written.
        time.sleep(0.1)

        def _connect():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_file,
                hex_key=derived_hex_key,
                creation_mode=False,
                connect_kwargs=dict(deferred_kwargs),
            )

        # Engine on top of the custom creator, with the query cache enabled.
        engine = create_engine(
            "sqlite://",
            creator=_connect,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

    # Run the centralized database initialization on the new engine.
    from .initialize import initialize_database

    # Mirror of the fail-loud change #3635 made to open_user_database.
    # Swallowing the error here used to leave a half-broken DB on disk
    # (tables present, no alembic_version row): the next login re-ran
    # alembic, hit the same error and (post-#3635) 503'd, so the user
    # could register but never log in again.  Failing registration loudly
    # with the partial DB removed gets the real cause (e.g. a
    # world-writable migrations dir) fixed instead of producing a
    # permanently locked-out account.
    try:
        session_factory = sessionmaker(bind=engine)
        with session_factory() as session:
            initialize_database(engine, session)
    except Exception:
        logger.exception(
            f"Database migration failed for {username} during creation"
            " — removing partial DB"
        )
        engine.dispose()
        if db_file.exists():
            db_file.unlink(missing_ok=True)
        raise

    # Register the engine only AFTER migrations completed.
    with self._connections_lock:
        self.connections[username] = engine

    logger.info(f"Created encrypted database for user {username}")
    return engine

517 

def open_user_database(
    self, username: str, password: str
) -> Optional[Engine]:
    """Open an existing database for a user and cache its engine.

    A cached engine is returned immediately.  Otherwise the key is
    derived, an engine is built (SQLCipher when available, plain SQLite
    otherwise), the connection is verified with ``SELECT 1``, a
    pre-migration backup is attempted when a migration is pending, and
    the database initialization runs.  The engine is published in
    ``self.connections`` only after all of that succeeded.

    Args:
        username: Owner of the database.
        password: Password the encryption key is derived from.

    Returns:
        The engine for ``username``, or ``None`` when no database file
        exists or the database could not be opened.

    Raises:
        ValueError: If the password is None or blank.
        DatabaseInitializationError: If the database opened but its
            initialization/migration failed.
    """
    open_start = time.perf_counter()

    # Validate the encryption key
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key when opening database for user {username}: password is None or empty"
        )
        # TODO: Fix the root cause - research threads are not getting the correct password
        logger.error(
            "TODO: This usually means the research thread is not receiving the user's "
            "password for database encryption. Need to ensure password is passed from "
            "the main thread to research threads."
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    # Fast path: already open
    with self._connections_lock:
        if username in self.connections:
            return self.connections[username]

    db_path = self._get_user_db_path(username)

    # Prevent timing attacks: always derive the key before checking file
    # existence, so existing and non-existent users take the same amount
    # of time and usernames cannot be enumerated via timing analysis.
    # Deriving it here also keeps the plaintext password out of the
    # connection-creator closure below.
    hex_key = get_key_from_password(password, db_path=db_path).hex()

    if not db_path.exists():
        logger.error(f"No database found for user {username}")
        return None

    # Warn if this is a legacy database without per-database salt
    if self.has_encryption and not has_per_database_salt(db_path):
        logger.warning(
            f"Database for user '{username}' uses the legacy shared salt "
            f"(deprecated). For improved security, consider creating a new "
            f"account to get a per-database salt. Legacy databases remain "
            f"fully functional but are less resistant to multi-target attacks."
        )

    # Build the engine - regular SQLite when SQLCipher is not available
    if self.has_encryption:

        def create_open_connection():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_path,
                hex_key=hex_key,
                creation_mode=False,
                connect_kwargs={
                    # DEFERRED (empty string) so pure-SELECT transactions
                    # acquire only SQLite's SHARED lock, letting WAL-mode
                    # concurrent readers proceed while a writer is active.
                    # IMMEDIATE was previously set "defensively" and made
                    # every transaction (even reads) take a RESERVED lock,
                    # which was the single biggest contention source on
                    # the login-hang path. Race-prone check-then-insert
                    # call sites were made race-free at the application
                    # layer in the preceding prerequisite PR.
                    "isolation_level": "",
                    "check_same_thread": False,
                },
            )

        # Create engine with custom creator function and optimized cache
        engine = create_engine(
            "sqlite://",
            creator=create_open_connection,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
    else:
        logger.warning(
            f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
        )
        # Fall back to regular SQLite (no password protection!)
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)

    try:
        # Test connection by running a simple query
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))

        # Run database initialization (creates missing tables and runs migrations)
        from .initialize import initialize_database

        # Create backup before migration to protect against schema change failures
        from .alembic_runner import needs_migration

        if needs_migration(engine):
            try:
                from .backup.backup_service import BackupService

                result = BackupService(
                    username=username, password=password
                ).create_backup(force=True)
                if result.success:
                    logger.info(
                        f"Pre-migration backup created: {result.backup_path}"
                    )
                else:
                    logger.error(
                        f"Pre-migration backup failed: {result.error}"
                    )
            except Exception:
                logger.exception(
                    "Pre-migration backup failed — proceeding with migration"
                )

        # Init failures need to be distinguishable from credential
        # failures at the call site: the credentials worked (we
        # decrypted and ran SELECT 1), but the schema couldn't be
        # brought up. Re-raise as a typed error so the login route
        # can skip the lockout counter and surface a server-error
        # message instead of "Invalid username or password".
        try:
            initialize_database(engine)
        except Exception as init_err:
            elapsed_ms = (time.perf_counter() - open_start) * 1000
            logger.exception(
                f"Database migration failed for {username} "
                f"after {elapsed_ms:.0f}ms — refusing login"
            )
            engine.dispose()
            raise DatabaseInitializationError(
                f"Database initialisation failed for {username}: {init_err}"
            ) from init_err

        # Publish the engine AFTER migrations complete.  The cache check
        # at the top and this store are separate critical sections, so
        # another thread may have opened the same database in between.
        # Keep whichever engine was registered first and dispose ours;
        # blindly overwriting would orphan the other engine together with
        # its pooled connections and file descriptors.
        with self._connections_lock:
            registered = self.connections.setdefault(username, engine)
        if registered is not engine:
            engine.dispose()
            logger.debug(
                f"Database for user {username} was opened concurrently; "
                "reusing the already registered engine"
            )
            return registered

        elapsed_ms = (time.perf_counter() - open_start) * 1000
        if elapsed_ms > 100:
            logger.info(
                f"Opened encrypted database for user {username} "
                f"(cold-open wall clock: {elapsed_ms:.0f}ms)"
            )
        else:
            logger.info(
                f"Opened encrypted database for user {username} "
                f"({elapsed_ms:.0f}ms)"
            )
        return engine

    except DatabaseInitializationError:
        # Already logged + engine disposed at the raise site. Re-raise
        # past the catch-all below so callers see the typed error.
        raise
    except Exception:
        elapsed_ms = (time.perf_counter() - open_start) * 1000
        logger.exception(
            f"Failed to open database for user {username} "
            f"after {elapsed_ms:.0f}ms"
        )
        engine.dispose()
        return None

694 

def get_session(self, username: str) -> Optional[Session]:
    """Return a fresh session bound to the user's open database.

    Returns:
        A new session, or ``None`` when no database is open for
        ``username``.
    """
    with self._connections_lock:
        try:
            engine = self.connections[username]
        except KeyError:
            # Common scenario, so debug level keeps the log noise down.
            logger.debug(f"No open database for user {username}")
            return None
        # The session is built while the lock is still held to prevent a
        # race with close_user_database().
        session_factory = sessionmaker(bind=engine)
        return session_factory()

706 

def get_connected_usernames(self) -> set:
    """Return a point-in-time copy of the usernames that have an open engine."""
    with self._connections_lock:
        snapshot = set(self.connections)
    return snapshot

711 

def _checkpoint_wal(self, engine, context: str = ""):
    """Checkpoint the WAL before an engine is disposed, on a best-effort basis.

    Runs ``PRAGMA wal_checkpoint(TRUNCATE)`` on a connection from
    ``engine`` to flush pending writes.  Failures never propagate: they
    are logged at debug level together with the traceback.

    Args:
        engine: Engine whose database should be checkpointed.
        context: Free text appended to the log messages (callers in this
            class pass e.g. ``"for <username>"``).
    """
    try:
        with engine.connect() as conn:
            # TRUNCATE returns (busy, log_pages, checkpointed_pages).
            # busy=1 means a reader/writer held a WAL lock and the WAL
            # was NOT truncated — surface that so we can tell from logs
            # whether the helper actually shrank the file.
            result = conn.execute(
                text("PRAGMA wal_checkpoint(TRUNCATE)")
            ).fetchone()
            if result and result[0] == 1:
                logger.debug(
                    f"WAL checkpoint busy {context} — WAL not truncated"
                )
    except Exception:
        # loguru has no ``exc_info`` keyword: passing it attaches no
        # traceback and instead makes loguru run ``str.format`` on the
        # already-rendered message, which raises if ``context`` contains
        # braces.  ``opt(exception=True)`` is the loguru way to include
        # the active exception, and with no format arguments the message
        # is logged verbatim.
        logger.opt(exception=True).debug(f"WAL checkpoint failed {context}")

729 

def close_user_database(self, username: str):
    """Checkpoint, dispose and unregister the engine of ``username``.

    Does nothing when the user has no open database.  A failure while
    disposing is logged as a warning; the registry entry is removed
    either way.
    """
    with self._connections_lock:
        if username not in self.connections:
            return

        engine = self.connections[username]
        try:
            self._checkpoint_wal(engine, f"for {username}")
            engine.dispose()
        except Exception:
            logger.warning(
                f"Failed to dispose engine for {username}",
            )

        del self.connections[username]
        logger.info(f"Closed database for user {username}")

745 

def close_all_databases(self):
    """Checkpoint and dispose every open engine, then empty the registry.

    Errors raised for a single engine are logged at debug level and do
    not stop the remaining engines from being closed.
    """
    with self._connections_lock:
        # Iterate over a snapshot of the registry contents.
        open_engines = tuple(self.connections.items())
        for username, engine in open_engines:
            try:
                self._checkpoint_wal(engine, f"for {username}")
                engine.dispose()
            except Exception:
                logger.debug(f"Error disposing engine for {username}")
        self.connections.clear()

756 

def check_database_integrity(self, username: str) -> bool:
    """Run the integrity pragmas against a user's open database.

    Returns:
        True only when the user has an open database, ``PRAGMA
        quick_check`` answers ``"ok"`` and ``PRAGMA
        cipher_integrity_check`` yields no rows.  Any error during the
        checks is logged and reported as False.
    """
    with self._connections_lock:
        if username not in self.connections:
            return False
        engine = self.connections[username]

    try:
        with engine.connect() as conn:
            # Quick structural check first.
            quick = conn.execute(text("PRAGMA quick_check"))
            if quick.fetchone()[0] != "ok":
                return False

            # Every row returned by the SQLCipher check is an HMAC failure.
            hmac_failures = list(
                conn.execute(text("PRAGMA cipher_integrity_check"))
            )
            if hmac_failures:
                logger.error(
                    f"Integrity check failed for {username}: {len(hmac_failures)} HMAC failures"
                )
                return False

            return True
    except Exception:
        logger.exception(f"Integrity check error for user: {username}")
        return False

786 

def change_password(
    self, username: str, old_password: str, new_password: str
) -> bool:
    """Change the encryption password for a user's database.

    This rekeys the SQLCipher database — no separate auth-DB
    password-hash update is needed because passwords are never
    stored. Login verification is done by attempting decryption.

    Args:
        username: Owner of the database.
        old_password: Current password, used to open the database.
        new_password: Password the database is rekeyed to.

    Returns:
        True when the database was rekeyed.  False when SQLCipher is
        unavailable, the database file does not exist, the new password
        is None/blank, the database cannot be opened with
        ``old_password``, or the rekey fails.
    """
    if not self.has_encryption:
        logger.warning(
            "Cannot change password - SQLCipher not available (databases are unencrypted)"
        )
        return False

    db_path = self._get_user_db_path(username)

    if not db_path.exists():
        return False

    # Apply the same key validation as create/open.  open_user_database
    # rejects a None/blank password, so rekeying to one would leave a
    # database that can never be opened again.  Checked before any
    # connection is touched, so a rejected request has no side effects.
    if not self._is_valid_encryption_key(new_password):
        logger.error(
            f"Cannot change password for user {username}: "
            "new password is None or empty"
        )
        return False

    try:
        # Close existing connection if any
        self.close_user_database(username)

        # Open with old password
        engine = self.open_user_database(username, old_password)
        if not engine:
            return False

        # Rekey the database (only works with SQLCipher)
        with engine.connect() as conn:
            # Use centralized rekey function
            set_sqlcipher_rekey(conn, new_password, db_path=db_path)

        logger.info(f"Password changed for user {username}")
        return True

    except Exception:
        logger.exception(f"Failed to change password for user: {username}")
        return False
    finally:
        # Drop the engine: its connection creator still holds the key
        # derived from the old password.
        self.close_user_database(username)

830 

def user_exists(self, username: str) -> bool:
    """Return whether the auth database holds a user named ``username``."""
    from .auth_db import auth_db_session
    from .models.auth import User

    with auth_db_session() as session:
        first_match = session.query(User).filter_by(username=username).first()
        found = first_match is not None
    return found

839 

def get_memory_usage(self) -> Dict[str, Any]:
    """Return rough memory statistics derived from the number of open engines.

    Returns:
        A dict with ``active_connections`` (number of registered
        engines), ``active_sessions`` (always 0, because sessions are
        created on demand and not tracked) and ``estimated_memory_mb``
        (about 3.5MB per connection).
    """
    with self._connections_lock:
        open_engines = len(self.connections)

    stats: Dict[str, Any] = {}
    stats["active_connections"] = open_engines
    stats["active_sessions"] = 0
    stats["estimated_memory_mb"] = open_engines * 3.5
    return stats

850 

def create_thread_safe_session_for_metrics(
    self, username: str, password: str
):
    """
    Return a new session that background threads may use.

    Earlier versions built a dedicated NullPool engine per
    (username, thread_id) pair.  That leaked file descriptors under load:
    SQLCipher + WAL holds 3 FDs per active connection, and orphaned
    engines piled up whenever @thread_cleanup did not fire.

    The session is now bound to the shared per-user engine stored in
    ``self.connections[username]``.  That engine is created with
    ``check_same_thread=False`` (so background threads are safe), is
    bounded by ``pool_size + max_overflow``, and is covered by the
    periodic ``dispose()`` workaround in ``connection_cleanup.py`` that
    mitigates the SQLCipher+WAL out-of-order-close FD leak.

    Args:
        username: The username
        password: The user's password (encryption key), needed only to
            open the user database when no engine is cached yet.

    Returns:
        A SQLAlchemy Session bound to the per-user engine.

    Raises:
        ValueError: If the user has no database file or the database
            cannot be opened.
    """
    if not self._get_user_db_path(username).exists():
        raise ValueError(f"No database found for user {username}")

    with self._connections_lock:
        engine = self.connections.get(username)

    if engine is None:
        # Cache miss: open the user database.  Once an engine is cached,
        # later calls simply get that engine back.
        engine = self.open_user_database(username, password)
        if engine is None:
            raise ValueError(f"Failed to open database for user {username}")

    # SQLAlchemy's default expire_on_commit=True is kept.
    session_factory = sessionmaker(bind=engine)
    return session_factory()

896 

897 

# Global instance, created when this module is imported.
# NOTE: construction runs DatabaseManager.__init__, which creates the
# "encrypted_databases" directory and probes SQLCipher; the probe raises
# RuntimeError when SQLCipher is unusable and unencrypted operation has not
# been explicitly allowed, so in that case importing this module fails.
db_manager = DatabaseManager()