Coverage for src/local_deep_research/database/encrypted_db.py: 90%
336 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Encrypted database management using SQLCipher.
3Handles per-user encrypted databases with browser-friendly authentication.
4"""
6import os
7import threading
8import time
9from pathlib import Path
10from typing import Any, Dict, Optional
12from loguru import logger
13from sqlalchemy import create_engine, event, text
14from sqlalchemy.engine import Engine
15from sqlalchemy.orm import Session, sessionmaker
16from sqlalchemy.pool import QueuePool, StaticPool
18from ..config.paths import get_data_directory, get_user_database_filename
19from ..settings.env_registry import get_env_setting
20from .sqlcipher_compat import get_sqlcipher_module
21from .pool_config import POOL_PRE_PING, POOL_RECYCLE_SECONDS
22from .sqlcipher_utils import (
23 set_sqlcipher_key,
24 set_sqlcipher_rekey,
25 apply_cipher_defaults_before_key,
26 apply_sqlcipher_pragmas,
27 apply_performance_pragmas,
28 verify_sqlcipher_connection,
29 create_database_salt,
30 has_per_database_salt,
31 get_key_from_password,
32 get_sqlcipher_version,
33 create_sqlcipher_connection,
34)
class DatabaseInitializationError(Exception):
    """A per-user database decrypted successfully, but its schema setup failed.

    Credential / decryption failures are reported differently: there,
    :py:meth:`DatabaseManager.open_user_database` returns ``None``. Raising
    this dedicated type lets callers (chiefly the login route) tell the two
    apart. They can then skip the user's lockout counter and show a
    server-side error message, because the credentials were correct and
    only the database state is broken.
    """
class DatabaseManager:
    """Manages encrypted SQLCipher databases for each user.

    One SQLAlchemy engine is cached per username in ``self.connections``.
    Every read and write of that cache goes through
    ``self._connections_lock``.
    """

    # Raw-connection kwargs shared by every SQLCipher connection opened here.
    # DEFERRED (empty string) so pure-SELECT transactions acquire only
    # SQLite's SHARED lock, letting WAL-mode concurrent readers proceed while
    # a writer is active. IMMEDIATE was previously set "defensively" and made
    # every transaction (even reads) take a RESERVED lock, which was the
    # single biggest contention source on the login-hang path. Race-prone
    # check-then-insert call sites were made race-free at the application
    # layer in the preceding prerequisite PR.
    # Always copied with dict(...) before being handed out, so no callee can
    # mutate this shared template.
    _SQLCIPHER_CONNECT_KWARGS: Dict[str, Any] = {
        "isolation_level": "",
        "check_same_thread": False,
    }

    # Files SQLite may create next to a database (-wal/-shm in WAL mode,
    # -journal in rollback mode). They are deleted together with the main
    # file when a freshly created database is discarded, so a later
    # registration under the same name never starts next to stale state.
    _SQLITE_SIDECAR_SUFFIXES = ("-wal", "-shm", "-journal")

    def __init__(self):
        self.connections: Dict[str, Engine] = {}
        # username -> SHA-256 digest of the derived SQLCipher key that the
        # cached engine was opened with. Lets open_user_database() verify the
        # password on a cache hit without keeping the password or key itself.
        self._key_digests: Dict[str, bytes] = {}
        self._connections_lock = threading.RLock()
        self.data_dir = get_data_directory() / "encrypted_databases"
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Check SQLCipher availability
        self.has_encryption = self._check_encryption_available()

        # ----------------------------------------------------------------
        # Pool class selection — see ADR-0004
        #
        # We use QueuePool (pool_size=20, max_overflow=40,
        # pool_timeout=10) for production and StaticPool for tests.
        #
        # Why pool_size=20:
        #
        # 1. SQLCipher + WAL mode can leak file handles when connections
        #    close out of open-order. Fewer pooled connections = fewer
        #    opportunities for out-of-order closes during pool_recycle.
        #    See: https://github.com/sqlcipher/android-database-sqlcipher/issues/6
        #    See: https://github.com/dotnet/efcore/issues/35010
        #
        # 2. SQLite serializes all writes through a single file lock.
        #    Multiple pooled connections don't improve throughput — they
        #    just hold FDs (up to 3 per connection in WAL mode).
        #
        # 3. The cleanup scheduler periodically calls engine.dispose()
        #    to release all pooled connections, preventing long-lived
        #    handles from accumulating over days of idle operation.
        #
        # Why pool_size=20 and not 1: inject_current_user() creates a
        # QueuePool session on every request via g.db_session. With the
        # UI polling /api/research/<id>/status every 1-2s plus other
        # API calls and before_request middleware, pool_size=1
        # (max_overflow=2, so 3 total) is easily exhausted — causing
        # 30-second timeouts and PendingRollbackError cascades.
        # pool_size=20 + max_overflow=40 (60 total) provides ample
        # headroom for concurrent requests and multiple browser tabs.
        #
        # Why not NullPool: SQLCipher's PRAGMA key adds ~0.2ms per
        # connection open. With 20-30 queries per page load, NullPool
        # adds a noticeable 4-6ms overhead vs QueuePool's ~1.5ms.
        # ----------------------------------------------------------------
        self._use_static_pool = bool(os.environ.get("TESTING"))
        self._pool_class = StaticPool if self._use_static_pool else QueuePool

    def _get_key_digests(self) -> Dict[str, bytes]:
        """Return the username -> key-digest map, creating it if missing.

        The map is also created lazily here, so instances built with
        ``DatabaseManager.__new__`` (which bypasses ``__init__``) still work.
        """
        digests = self.__dict__.get("_key_digests")
        if digests is None:
            digests = {}
            self._key_digests = digests
        return digests

    @staticmethod
    def _digest_key(hex_key: str) -> bytes:
        """Return a SHA-256 digest of a hex-encoded derived key."""
        import hashlib

        return hashlib.sha256(hex_key.encode("ascii")).digest()

    def _get_pool_kwargs(self) -> Dict[str, Any]:
        """Get pool configuration kwargs based on pool type.

        StaticPool doesn't support pool_size or max_overflow.
        QueuePool uses moderate sizing to handle concurrent web requests
        while limiting FD usage. See ADR-0004 for rationale.
        """
        if self._use_static_pool:
            return {}
        return {
            "pool_size": 20,
            "max_overflow": 40,
            "pool_timeout": 10,
            "pool_pre_ping": POOL_PRE_PING,
            "pool_recycle": POOL_RECYCLE_SECONDS,
        }

    def _is_valid_encryption_key(self, password: str) -> bool:
        """
        Check if the provided password is valid (not None, empty, or whitespace-only).

        Args:
            password: The password to check

        Returns:
            True if the password is valid, False otherwise
        """
        return password is not None and password.strip() != ""

    def is_user_connected(self, username: str) -> bool:
        """Check if a user has an active database connection.

        Thread-safe accessor for external callers.

        Args:
            username: The username to check

        Returns:
            True if the user has an active connection
        """
        with self._connections_lock:
            return username in self.connections

    @staticmethod
    def _run_sqlcipher_self_test(db_file: str) -> bool:
        """Key a throwaway database at ``db_file`` and round-trip one row.

        Returns:
            True once a row written through SQLCipher reads back intact.

        Raises:
            RuntimeError: If the row read back does not match.
            Any error raised by ``get_sqlcipher_module`` or the driver is
            propagated unchanged.
        """
        sqlcipher = get_sqlcipher_module().dbapi2
        conn = sqlcipher.connect(db_file)
        try:
            cursor = conn.cursor()
            # New database, so use the creation-time cipher settings.
            apply_cipher_defaults_before_key(cursor)
            set_sqlcipher_key(cursor, "testpass")
            apply_sqlcipher_pragmas(cursor, creation_mode=True)
            apply_performance_pragmas(cursor)

            version = get_sqlcipher_version(cursor)
            if version:
                major = version.split(".")[0] if "." in version else version[0]
                if major.isdigit() and int(major) < 4:
                    logger.warning(
                        f"SQLCipher version {version} detected. "
                        "Version 4.x+ is recommended for proper PRAGMA ordering."
                    )

            cursor.close()
            # Now use the connection for table operations
            conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
            conn.execute("INSERT INTO test VALUES (1)")
            result = conn.execute("SELECT * FROM test").fetchone()
            if result != (1,):
                raise RuntimeError("SQLCipher encryption test failed")
            logger.info(
                "SQLCipher available and working - databases will be encrypted"
            )
            return True
        finally:
            from ..utilities.resource_utils import safe_close

            safe_close(conn, "SQLCipher test connection")

    def _check_encryption_available(self) -> bool:
        """Check if SQLCipher is available for encryption.

        Returns:
            True if SQLCipher works end-to-end. False if it does not but
            ``bootstrap.allow_unencrypted`` is enabled.

        Raises:
            RuntimeError: If SQLCipher is unusable and unencrypted
                databases have not been explicitly allowed.
        """
        try:
            import tempfile

            # Test if SQLCipher actually works, not just if it imports.
            with tempfile.NamedTemporaryFile(delete=False) as tmp:
                tmp_path = tmp.name

            try:
                return self._run_sqlcipher_self_test(tmp_path)
            except ImportError:
                # The module is missing entirely; don't log the misleading
                # "found but not working" message for this case.
                raise
            except Exception as exc:
                logger.warning("SQLCipher module found but not working")
                raise ImportError("SQLCipher not functional") from exc
            finally:
                # Clean up test file
                try:
                    os.unlink(tmp_path)
                except OSError as e:
                    logger.debug(
                        f"Failed to clean up temp file {tmp_path}: {e}"
                    )

        except ImportError:
            # Check if user has explicitly allowed unencrypted databases.
            # Registry handles deprecated LDR_ALLOW_UNENCRYPTED fallback automatically.
            allow_unencrypted = get_env_setting(
                "bootstrap.allow_unencrypted", False
            )

            if not allow_unencrypted:
                logger.exception(
                    "SECURITY ERROR: SQLCipher is not installed!\n"
                    "Your databases will NOT be encrypted.\n"
                    "To fix this:\n"
                    "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
                    "2. Reinstall project: pdm install\n"
                    "Or use Docker with SQLCipher pre-installed.\n\n"
                    "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
                    "export LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true"
                )
                raise RuntimeError(
                    "SQLCipher not available. Set LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
                )
            logger.warning(
                "WARNING: Running with UNENCRYPTED databases!\n"
                "This means:\n"
                "- Passwords don't protect data access\n"
                "- API keys are stored in plain text\n"
                "- Anyone with file access can read all data\n"
                "Install SQLCipher for secure operation!"
            )
            return False

    def _get_user_db_path(self, username: str) -> Path:
        """Get the path for a user's encrypted database."""
        return self.data_dir / get_user_database_filename(username)

    def _apply_pragmas(self, connection, connection_record):
        """SQLAlchemy ``connect`` listener that applies performance pragmas.

        In this class the listener is registered only on unencrypted
        engines (see ``_build_engine``), so in practice only the
        plain-SQLite branch below runs.
        """
        # Check if this is SQLCipher or regular SQLite
        is_encrypted = self.has_encryption

        # Use centralized performance pragma application
        apply_performance_pragmas(connection)

        # SQLCipher-specific pragmas
        if is_encrypted:
            from .sqlcipher_utils import get_sqlcipher_settings

            settings = get_sqlcipher_settings()
            pragmas = [
                f"PRAGMA kdf_iter = {settings['kdf_iterations']}",
                f"PRAGMA cipher_page_size = {settings['page_size']}",
            ]
            for pragma in pragmas:
                try:
                    connection.execute(pragma)
                except Exception as e:
                    logger.debug(f"Could not apply pragma '{pragma}': {e}")
        else:
            # Regular SQLite pragma
            try:
                connection.execute(
                    "PRAGMA mmap_size = 268435456"
                )  # 256MB memory mapping
            except Exception as e:
                logger.debug(f"Could not apply mmap_size pragma: {e}")

    @staticmethod
    def _make_sqlcipher_connection(
        db_path: Path,
        password: str,
        isolation_level: Optional[str] = "IMMEDIATE",
        check_same_thread: bool = False,
    ) -> Any:
        """Create a properly initialized SQLCipher connection.

        Follows the canonical SQLCipher initialization order: set key,
        apply cipher pragmas, verify, then apply performance pragmas.
        Cipher pragmas (page size, HMAC algorithm, KDF iterations) must
        be configured before the first query (verification) because that
        query triggers page decryption with the active cipher settings.

        Args:
            db_path: Path to the database file
            password: The database encryption passphrase
            isolation_level: SQLite isolation level (``""`` for deferred
                transactions, ``None`` for autocommit)
            check_same_thread: SQLite check_same_thread flag

        Returns:
            A raw ``sqlcipher3`` connection ready for use.

        Raises:
            ValueError: If the database key cannot be verified.
        """
        sqlcipher3 = get_sqlcipher_module()
        conn = sqlcipher3.connect(
            str(db_path),
            isolation_level=isolation_level,
            check_same_thread=check_same_thread,
        )
        cursor = conn.cursor()

        try:
            set_sqlcipher_key(cursor, password, db_path=db_path)
            apply_sqlcipher_pragmas(cursor, creation_mode=False)

            if not verify_sqlcipher_connection(cursor):
                raise ValueError("Failed to verify database key")  # noqa: TRY301 — cleanup in except before re-raise

            apply_performance_pragmas(cursor)
        except Exception:
            try:
                cursor.close()
            except Exception:  # noqa: BLE001
                logger.warning("Failed to close cursor during cleanup")
            from ..utilities.resource_utils import safe_close

            safe_close(conn, "encrypted DB connection")
            raise

        cursor.close()
        return conn

    @classmethod
    def _remove_database_files(cls, db_path: Path) -> None:
        """Delete ``db_path`` plus any SQLite sidecar files next to it."""
        db_path.unlink(missing_ok=True)
        for suffix in cls._SQLITE_SIDECAR_SUFFIXES:
            db_path.with_name(db_path.name + suffix).unlink(missing_ok=True)

    def _build_engine(self, db_path: Path, hex_key: Optional[str]) -> Engine:
        """Create the pooled SQLAlchemy engine for a user database.

        Args:
            db_path: Path to the user's database file.
            hex_key: Hex-encoded derived SQLCipher key. Used only when
                SQLCipher is available; ignored for unencrypted databases.

        Returns:
            An engine using ``self._pool_class`` and ``_get_pool_kwargs()``.
        """
        if self.has_encryption:
            connect_kwargs = self._SQLCIPHER_CONNECT_KWARGS

            def create_connection():
                """Create a properly initialized SQLCipher connection."""
                return create_sqlcipher_connection(
                    db_path,
                    hex_key=hex_key,
                    creation_mode=False,
                    connect_kwargs=dict(connect_kwargs),
                )

            # Create engine with custom creator function and optimized cache
            return create_engine(
                "sqlite://",
                creator=create_connection,
                poolclass=self._pool_class,
                echo=False,
                query_cache_size=1000,
                **self._get_pool_kwargs(),
            )

        # Fall back to regular SQLite (no password protection!)
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)
        return engine

    def _create_schema_with_sqlcipher(
        self, username: str, db_path: Path, password: str
    ) -> None:
        """Create every table except ``users``, plus its indexes, in a new DB.

        Uses a raw SQLCipher connection outside SQLAlchemy. On any failure
        the partially written database files are removed and the error is
        re-raised.
        """
        try:
            conn = create_sqlcipher_connection(
                db_path,
                password=password,
                creation_mode=True,
                connect_kwargs=dict(self._SQLCIPHER_CONNECT_KWARGS),
            )
            try:
                # Get the CREATE TABLE statements from SQLAlchemy models
                from sqlalchemy.dialects import sqlite
                from sqlalchemy.schema import CreateIndex, CreateTable

                from .models import Base

                # Indexes must be emitted explicitly — SQLAlchemy compiles
                # `index=True`/`unique=True` and `Index(...)` to separate
                # CREATE [UNIQUE] INDEX statements, not inline.
                sqlite_dialect = sqlite.dialect()
                for table in Base.metadata.sorted_tables:
                    if table.name == "users":
                        continue
                    create_sql = str(
                        CreateTable(table, if_not_exists=True).compile(
                            dialect=sqlite_dialect
                        )
                    )
                    logger.debug(f"Creating table {table.name}")
                    conn.execute(create_sql)
                    for index in table.indexes:
                        index_sql = str(
                            CreateIndex(index, if_not_exists=True).compile(
                                dialect=sqlite_dialect
                            )
                        )
                        conn.execute(index_sql)

                conn.commit()
            finally:
                from ..utilities.resource_utils import safe_close

                safe_close(conn, "user DB setup connection")

            logger.info(
                f"Database structure created successfully for {username}"
            )

        except Exception:
            logger.exception("Error creating database structure")
            # Cleanup partial DB file (and any SQLite sidecars) on failure
            self._remove_database_files(db_path)
            raise

    def create_user_database(self, username: str, password: str) -> Engine:
        """Create a new encrypted database for a user.

        Args:
            username: Owner of the new database.
            password: Passphrase the database key is derived from.

        Returns:
            The new engine, which is also cached in ``self.connections``.

        Raises:
            ValueError: If ``password`` is None/blank, or a database
                already exists for ``username``.
            Exception: Any error from creating the schema or from
                ``initialize_database``. The partial database and its
                SQLite sidecar files are removed before re-raising.
        """
        # Validate the encryption key
        if not self._is_valid_encryption_key(password):
            logger.error(
                f"Invalid encryption key for user {username}: password is None or empty"
            )
            raise ValueError(
                "Invalid encryption key: password cannot be None or empty"
            )

        db_path = self._get_user_db_path(username)

        if db_path.exists():
            raise ValueError(f"Database already exists for user {username}")

        key_digest: Optional[bytes] = None
        if self.has_encryption:
            # Create directory if it doesn't exist
            db_path.parent.mkdir(parents=True, exist_ok=True)

            # Create per-database salt for new databases (v2 security improvement)
            create_database_salt(db_path)
            logger.info(f"Created per-database salt for {username}")

            # Pre-derive key before closures to avoid capturing plaintext password
            hex_key = get_key_from_password(password, db_path=db_path).hex()
            key_digest = self._digest_key(hex_key)

            # Create database structure using raw SQLCipher outside SQLAlchemy
            self._create_schema_with_sqlcipher(username, db_path, password)

            # Small delay to ensure file is fully written
            time.sleep(0.1)

            engine = self._build_engine(db_path, hex_key)
        else:
            logger.warning(
                f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
            )
            engine = self._build_engine(db_path, None)

        # Initialize database tables using centralized initialization
        from .initialize import initialize_database

        # Mirror of the fail-loud change #3635 made to open_user_database.
        # Swallowing the error here used to leave a half-broken DB on disk
        # (tables present, no alembic_version row): the user could register
        # but every later login re-ran alembic, hit the same error and
        # 503'd. Fail registration loudly with the partial DB removed, so
        # the real cause (e.g. world-writable migrations dir) gets fixed
        # instead of producing a permanently-locked-out account.
        try:
            session_factory = sessionmaker(bind=engine)
            with session_factory() as session:
                initialize_database(engine, session)
        except Exception:
            logger.exception(
                f"Database migration failed for {username} during creation"
                " — removing partial DB"
            )
            engine.dispose()
            self._remove_database_files(db_path)
            raise

        # Store connection AFTER migrations complete
        with self._connections_lock:
            self.connections[username] = engine
            if key_digest is not None:
                self._get_key_digests()[username] = key_digest

        logger.info(f"Created encrypted database for user {username}")
        return engine

    @staticmethod
    def _backup_before_migration(username: str, password: str) -> None:
        """Best-effort backup before a schema migration; never raises."""
        try:
            from .backup.backup_service import BackupService

            result = BackupService(
                username=username, password=password
            ).create_backup(force=True)
            if result.success:
                logger.info(
                    f"Pre-migration backup created: {result.backup_path}"
                )
            else:
                logger.error(f"Pre-migration backup failed: {result.error}")
        except Exception:
            logger.exception(
                "Pre-migration backup failed — proceeding with migration"
            )

    def open_user_database(
        self, username: str, password: str
    ) -> Optional[Engine]:
        """Open an existing encrypted database for a user.

        This call also serves as the credential check: with SQLCipher
        enabled, a wrong password fails to decrypt the database. On a
        cache hit the password is checked against a digest of the key the
        cached engine was opened with. If no digest was recorded (the
        engine was cached by other means), or encryption is disabled, the
        cached engine is returned unchecked, as before.

        Args:
            username: Owner of the database.
            password: Passphrase the database key is derived from.

        Returns:
            The engine (possibly cached). Returns ``None`` when no database
            exists, the database cannot be opened or decrypted with
            ``password``, or a cached engine was opened with a different
            password.

        Raises:
            ValueError: If ``password`` is None/blank.
            DatabaseInitializationError: If the database opened but schema
                initialisation / migration failed.
        """
        import hmac

        open_start = time.perf_counter()

        # Validate the encryption key
        if not self._is_valid_encryption_key(password):
            logger.error(
                f"Invalid encryption key when opening database for user {username}: password is None or empty"
            )
            # TODO: Fix the root cause - research threads are not getting the correct password
            logger.error(
                "TODO: This usually means the research thread is not receiving the user's "
                "password for database encryption. Need to ensure password is passed from "
                "the main thread to research threads."
            )
            raise ValueError(
                "Invalid encryption key: password cannot be None or empty"
            )

        db_path = self._get_user_db_path(username)

        # Prevent timing attacks: always derive the key before checking the
        # cache or file existence. Existing, non-existent and already-open
        # users then take the same time, which prevents username / session
        # enumeration via timing analysis. The key is pre-derived so the
        # connection closure never captures the plaintext password.
        hex_key = get_key_from_password(password, db_path=db_path).hex()
        key_digest = self._digest_key(hex_key)

        # Check if already open
        with self._connections_lock:
            is_cached = username in self.connections
            cached_engine = self.connections.get(username)
            expected_digest = self._get_key_digests().get(username)
        if is_cached:
            if (
                self.has_encryption
                and expected_digest is not None
                and not hmac.compare_digest(expected_digest, key_digest)
            ):
                # Returning the cached engine here would let any password
                # "log in" while the user already has an open database.
                logger.warning(
                    f"Password mismatch for already-open database of user {username}"
                )
                return None
            return cached_engine

        if not db_path.exists():
            logger.error(f"No database found for user {username}")
            return None

        # Warn if this is a legacy database without per-database salt
        if self.has_encryption and not has_per_database_salt(db_path):
            logger.warning(
                f"Database for user '{username}' uses the legacy shared salt "
                "(deprecated). For improved security, consider creating a new "
                "account to get a per-database salt. Legacy databases remain "
                "fully functional but are less resistant to multi-target attacks."
            )

        if not self.has_encryption:
            logger.warning(
                f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
            )
        engine = self._build_engine(db_path, hex_key)

        try:
            # Test connection by running a simple query
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # Run database initialization (creates missing tables and runs migrations)
            from .alembic_runner import needs_migration
            from .initialize import initialize_database

            # Create backup before migration to protect against schema change failures
            if needs_migration(engine):
                self._backup_before_migration(username, password)

            # Init failures need to be distinguishable from credential
            # failures at the call site: the credentials worked (we
            # decrypted and ran SELECT 1), but the schema couldn't be
            # brought up. Re-raise as a typed error so the login route
            # can skip the lockout counter and surface a server-error
            # message instead of "Invalid username or password".
            try:
                initialize_database(engine)
            except Exception as init_err:
                elapsed_ms = (time.perf_counter() - open_start) * 1000
                logger.exception(
                    f"Database migration failed for {username} "
                    f"after {elapsed_ms:.0f}ms — refusing login"
                )
                engine.dispose()
                raise DatabaseInitializationError(
                    f"Database initialisation failed for {username}: {init_err}"
                ) from init_err

            # Store connection AFTER migrations complete. Re-check under the
            # lock: a concurrent call may have cached an engine meanwhile.
            with self._connections_lock:
                lost_race = username in self.connections
                if lost_race:
                    winner = self.connections[username]
                else:
                    self.connections[username] = engine
                    if self.has_encryption:
                        self._get_key_digests()[username] = key_digest
            if lost_race:
                # Keep the engine cached first and release ours; otherwise its
                # pooled connections (and their file descriptors) are orphaned.
                try:
                    engine.dispose()
                except Exception:
                    logger.opt(exception=True).debug(
                        f"Error disposing duplicate engine for {username}"
                    )
                return winner

            elapsed_ms = (time.perf_counter() - open_start) * 1000
            if elapsed_ms > 100:
                logger.info(
                    f"Opened encrypted database for user {username} "
                    f"(cold-open wall clock: {elapsed_ms:.0f}ms)"
                )
            else:
                logger.info(
                    f"Opened encrypted database for user {username} "
                    f"({elapsed_ms:.0f}ms)"
                )
            return engine

        except DatabaseInitializationError:
            # Already logged + engine disposed at the raise site. Re-raise
            # past the catch-all below so callers see the typed error.
            raise
        except Exception:
            elapsed_ms = (time.perf_counter() - open_start) * 1000
            logger.exception(
                f"Failed to open database for user {username} "
                f"after {elapsed_ms:.0f}ms"
            )
            engine.dispose()
            return None

    def get_session(self, username: str) -> Optional[Session]:
        """Create a new session for a user's database."""
        with self._connections_lock:
            if username not in self.connections:
                # Use debug level for this common scenario to reduce log noise
                logger.debug(f"No open database for user {username}")
                return None
            engine = self.connections[username]
            # Create session inside lock to prevent race with close_user_database()
            session_factory = sessionmaker(bind=engine)
            return session_factory()

    def get_connected_usernames(self) -> set:
        """Return a snapshot of usernames with open connections."""
        with self._connections_lock:
            return set(self.connections.keys())

    def _checkpoint_wal(self, engine, context: str = ""):
        """Checkpoint WAL before disposing engine to flush pending writes.

        Best-effort: failures are logged at debug level, never raised.
        """
        try:
            with engine.connect() as conn:
                # TRUNCATE returns (busy, log_pages, checkpointed_pages).
                # busy=1 means a reader/writer held a WAL lock and the WAL
                # was NOT truncated — surface that so we can tell from logs
                # whether the helper actually shrank the file.
                result = conn.execute(
                    text("PRAGMA wal_checkpoint(TRUNCATE)")
                ).fetchone()
                if result and result[0] == 1:
                    logger.debug(
                        f"WAL checkpoint busy {context} — WAL not truncated"
                    )
        except Exception:
            # loguru ignores stdlib's exc_info=...; opt() attaches the
            # traceback without str.format()-ing the message.
            logger.opt(exception=True).debug(f"WAL checkpoint failed {context}")

    def close_user_database(self, username: str):
        """Close a user's database connection."""
        with self._connections_lock:
            if username in self.connections:
                engine = self.connections[username]
                try:
                    self._checkpoint_wal(engine, f"for {username}")
                    engine.dispose()
                except Exception:
                    logger.opt(exception=True).warning(
                        f"Failed to dispose engine for {username}",
                    )
                del self.connections[username]
                self._get_key_digests().pop(username, None)
                logger.info(f"Closed database for user {username}")

    def close_all_databases(self):
        """Close all open user database connections and release file locks."""
        with self._connections_lock:
            for username, engine in list(self.connections.items()):
                try:
                    self._checkpoint_wal(engine, f"for {username}")
                    engine.dispose()
                except Exception:
                    logger.opt(exception=True).debug(
                        f"Error disposing engine for {username}"
                    )
            self.connections.clear()
            self._get_key_digests().clear()

    def check_database_integrity(self, username: str) -> bool:
        """Check integrity of a user's encrypted database.

        Returns False when the user has no open engine, ``quick_check``
        is not ``ok``, ``cipher_integrity_check`` reports any rows, or an
        error occurs.
        """
        with self._connections_lock:
            if username not in self.connections:
                return False
            engine = self.connections[username]

        try:
            with engine.connect() as conn:
                # Quick integrity check
                result = conn.execute(text("PRAGMA quick_check"))
                if result.fetchone()[0] != "ok":
                    return False

                # SQLCipher integrity check
                result = conn.execute(text("PRAGMA cipher_integrity_check"))
                # If this returns any rows, there are HMAC failures
                failures = list(result)
                if failures:
                    logger.error(
                        f"Integrity check failed for {username}: {len(failures)} HMAC failures"
                    )
                    return False

                return True

        except Exception:
            logger.exception(f"Integrity check error for user: {username}")
            return False

    def change_password(
        self, username: str, old_password: str, new_password: str
    ) -> bool:
        """Change the encryption password for a user's database.

        This rekeys the SQLCipher database — no separate auth-DB
        password-hash update is needed because passwords are never
        stored. Login verification is done by attempting decryption.
        Any cached engine is closed both before and after the rekey.
        """
        if not self.has_encryption:
            logger.warning(
                "Cannot change password - SQLCipher not available (databases are unencrypted)"
            )
            return False

        db_path = self._get_user_db_path(username)

        if not db_path.exists():
            return False

        try:
            # Close existing connection if any, so the open below really
            # decrypts with old_password instead of hitting the cache.
            self.close_user_database(username)

            # Open with old password
            engine = self.open_user_database(username, old_password)
            if not engine:
                return False

            # Rekey the database (only works with SQLCipher)
            with engine.connect() as conn:
                # Use centralized rekey function
                set_sqlcipher_rekey(conn, new_password, db_path=db_path)

            logger.info(f"Password changed for user {username}")
            return True

        except Exception:
            logger.exception(f"Failed to change password for user: {username}")
            return False
        finally:
            # Close the connection
            self.close_user_database(username)

    def user_exists(self, username: str) -> bool:
        """Check if a user exists in the auth database."""
        from .auth_db import auth_db_session
        from .models.auth import User

        with auth_db_session() as session:
            user = session.query(User).filter_by(username=username).first()
            return user is not None

    def get_memory_usage(self) -> Dict[str, Any]:
        """Get memory usage statistics."""
        with self._connections_lock:
            num_connections = len(self.connections)
        return {
            "active_connections": num_connections,
            "active_sessions": 0,  # Sessions are created on-demand, not tracked
            "estimated_memory_mb": num_connections
            * 3.5,  # ~3.5MB per connection
        }

    def create_thread_safe_session_for_metrics(
        self, username: str, password: str
    ):
        """
        Create a new database session safe for use in background threads.

        This method used to create a dedicated NullPool engine per
        (username, thread_id) pair. That leaked file descriptors under
        load: SQLCipher + WAL holds 3 FDs per active connection, and
        orphaned engines piled up whenever @thread_cleanup did not fire.

        It now routes through the shared per-user QueuePool engine at
        ``self.connections[username]``. That engine:

        - is already created with ``check_same_thread=False``, so
          background threads are safe;
        - is bounded by ``pool_size + max_overflow``;
        - is subject to the periodic ``dispose()`` workaround in
          ``connection_cleanup.py``, which mitigates the SQLCipher+WAL
          out-of-order-close FD leak.

        Args:
            username: The username
            password: The user's password (encryption key), used only
                to open the user database on cache miss.

        Returns:
            A SQLAlchemy Session bound to the per-user QueuePool engine.

        Raises:
            ValueError: If no database file exists for the user or it
                cannot be opened.
        """
        db_path = self._get_user_db_path(username)

        if not db_path.exists():
            raise ValueError(f"No database found for user {username}")

        with self._connections_lock:
            engine = self.connections.get(username)

        if engine is None:
            # Cache miss — open the user database. This is idempotent:
            # after the first call it just returns the cached engine.
            engine = self.open_user_database(username, password)
            if engine is None:
                raise ValueError(f"Failed to open database for user {username}")

        # Use SQLAlchemy's default expire_on_commit=True.
        session_factory = sessionmaker(bind=engine)
        return session_factory()
# Module-level DatabaseManager, constructed once when this module is imported.
db_manager: DatabaseManager = DatabaseManager()