Coverage for src / local_deep_research / database / encrypted_db.py: 89%
306 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Encrypted database management using SQLCipher.
3Handles per-user encrypted databases with browser-friendly authentication.
4"""
6import os
7import threading
8from pathlib import Path
9from typing import Any, Dict, Optional
11from loguru import logger
12from sqlalchemy import create_engine, event, text
13from sqlalchemy.engine import Engine
14from sqlalchemy.orm import Session, sessionmaker
15from sqlalchemy.pool import QueuePool, StaticPool
17from ..config.paths import get_data_directory, get_user_database_filename
18from ..settings.env_registry import get_env_setting
19from .sqlcipher_compat import get_sqlcipher_module
20from .pool_config import POOL_PRE_PING, POOL_RECYCLE_SECONDS
21from .sqlcipher_utils import (
22 set_sqlcipher_key,
23 set_sqlcipher_rekey,
24 apply_cipher_defaults_before_key,
25 apply_sqlcipher_pragmas,
26 apply_performance_pragmas,
27 verify_sqlcipher_connection,
28 create_database_salt,
29 has_per_database_salt,
30 get_key_from_password,
31 get_sqlcipher_version,
32 create_sqlcipher_connection,
33)
36class DatabaseManager:
37 """Manages encrypted SQLCipher databases for each user."""
def __init__(self):
    """Set up the engine registry, the storage directory and the pool policy.

    Construction has side effects: it creates the
    ``encrypted_databases`` directory under the data directory and
    runs the SQLCipher availability probe, which may raise.
    """
    # Open engines keyed by username; guarded by ``_connections_lock``.
    self.connections: Dict[str, Engine] = {}
    self._connections_lock = threading.RLock()

    storage_root = get_data_directory() / "encrypted_databases"
    self.data_dir = storage_root
    storage_root.mkdir(parents=True, exist_ok=True)

    # Check SQLCipher availability
    self.has_encryption = self._check_encryption_available()

    # ----------------------------------------------------------------
    # Pool class selection — see ADR-0004
    #
    # Production uses QueuePool (pool_size=20, max_overflow=40,
    # pool_timeout=10); tests use StaticPool.
    #
    # Constraints that argue for keeping the pool bounded:
    #
    #   1. SQLCipher + WAL mode can leak file handles when connections
    #      close out of open-order. Fewer pooled connections = fewer
    #      opportunities for out-of-order closes during pool_recycle.
    #      See: https://github.com/sqlcipher/android-database-sqlcipher/issues/6
    #      See: https://github.com/dotnet/efcore/issues/35010
    #
    #   2. SQLite serializes all writes through a single file lock.
    #      Multiple pooled connections don't improve throughput — they
    #      just hold FDs (up to 3 per connection in WAL mode).
    #
    #   3. The cleanup scheduler periodically calls engine.dispose()
    #      to release all pooled connections, preventing long-lived
    #      handles from accumulating over days of idle operation.
    #
    # Why pool_size=20 and not 1: inject_current_user() creates a
    # QueuePool session on every request via g.db_session. With the
    # UI polling /api/research/<id>/status every 1-2s plus other
    # API calls and before_request middleware, pool_size=1
    # (max_overflow=2, so 3 total) is easily exhausted — causing
    # 30-second timeouts and PendingRollbackError cascades.
    # pool_size=20 + max_overflow=40 (60 total) provides ample
    # headroom for concurrent requests and multiple browser tabs.
    #
    # Why not NullPool: SQLCipher's PRAGMA key adds ~0.2ms per
    # connection open. With 20-30 queries per page load, NullPool
    # adds a noticeable 4-6ms overhead vs QueuePool's ~1.5ms.
    # ----------------------------------------------------------------
    # Any non-empty TESTING value (including "0" or "false") selects
    # StaticPool, because only the string's truthiness is checked.
    testing_mode = bool(os.environ.get("TESTING"))
    self._use_static_pool = testing_mode
    self._pool_class = StaticPool if testing_mode else QueuePool
86 def _get_pool_kwargs(self) -> Dict[str, Any]:
87 """Get pool configuration kwargs based on pool type.
89 StaticPool doesn't support pool_size or max_overflow.
90 QueuePool uses moderate sizing to handle concurrent web requests
91 while limiting FD usage. See ADR-0004 for rationale.
92 """
93 if self._use_static_pool:
94 return {}
95 return {
96 "pool_size": 20,
97 "max_overflow": 40,
98 "pool_timeout": 10,
99 "pool_pre_ping": POOL_PRE_PING,
100 "pool_recycle": POOL_RECYCLE_SECONDS,
101 }
103 def _is_valid_encryption_key(self, password: str) -> bool:
104 """
105 Check if the provided password is valid (not None, empty, or whitespace-only).
107 Args:
108 password: The password to check
110 Returns:
111 True if the password is valid, False otherwise
112 """
113 return password is not None and password.strip() != ""
def is_user_connected(self, username: str) -> bool:
    """Report whether an engine is currently registered for ``username``.

    The lookup is performed while holding the connections lock, so it
    is safe to call from any thread.

    Args:
        username: The username to look up

    Returns:
        True if the user has an entry in the connection registry
    """
    with self._connections_lock:
        connected = username in self.connections
    return connected
def _check_encryption_available(self) -> bool:
    """Probe whether SQLCipher can actually create and use an encrypted DB.

    A throwaway database is created in a temporary file, keyed, and
    exercised with a create/insert/select round-trip. Importing the
    module is not considered sufficient proof that encryption works.

    Returns:
        True when the round-trip succeeds. False when it fails but the
        ``bootstrap.allow_unencrypted`` setting permits running without
        encryption.

    Raises:
        RuntimeError: If SQLCipher is unusable and unencrypted databases
            have not been explicitly allowed.
    """
    try:
        import tempfile

        # Only the file name is needed; SQLCipher opens the path itself.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = tmp.name

        try:
            # Try to create a test encrypted database
            sqlcipher = get_sqlcipher_module().dbapi2

            conn = sqlcipher.connect(tmp_path)
            try:
                cursor = conn.cursor()
                # Cipher defaults first, then the key, then the post-key
                # pragmas (creation_mode=True because this is a new DB).
                apply_cipher_defaults_before_key(cursor)
                set_sqlcipher_key(cursor, "testpass")
                apply_sqlcipher_pragmas(cursor, creation_mode=True)
                apply_performance_pragmas(cursor)

                # Warn (but do not fail) on pre-4.x SQLCipher builds.
                version = get_sqlcipher_version(cursor)
                if version:
                    major = (
                        version.split(".")[0]
                        if "." in version
                        else version[0]
                    )
                    if major.isdigit() and int(major) < 4:
                        logger.warning(
                            f"SQLCipher version {version} detected. "
                            "Version 4.x+ is recommended for proper PRAGMA ordering."
                        )

                cursor.close()
                # Now use the connection for table operations
                conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
                conn.execute("INSERT INTO test VALUES (1)")
                result = conn.execute("SELECT * FROM test").fetchone()

                if result != (1,):
                    raise RuntimeError("SQLCipher encryption test failed")
                logger.info(
                    "SQLCipher available and working - databases will be encrypted"
                )
                return True
            finally:
                from ..utilities.resource_utils import safe_close

                safe_close(conn, "SQLCipher test connection")
        except Exception as exc:
            # Keep the underlying reason: in allow-unencrypted mode this
            # warning is the only record of why encryption was disabled.
            logger.warning(f"SQLCipher module found but not working: {exc}")
            # Normalise every failure to ImportError so the handler below
            # applies the allow-unencrypted policy in one place.
            raise ImportError("SQLCipher not functional") from exc
        finally:
            # Clean up test file
            try:
                os.unlink(tmp_path)
            except OSError as e:
                logger.debug(
                    f"Failed to clean up temp file {tmp_path}: {e}"
                )

    except ImportError:
        # Check if user has explicitly allowed unencrypted databases.
        # Registry handles deprecated LDR_ALLOW_UNENCRYPTED fallback automatically.
        allow_unencrypted = get_env_setting(
            "bootstrap.allow_unencrypted", False
        )

        if not allow_unencrypted:
            logger.exception(
                "SECURITY ERROR: SQLCipher is not installed!\n"
                "Your databases will NOT be encrypted.\n"
                "To fix this:\n"
                "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
                "2. Reinstall project: pdm install\n"
                "Or use Docker with SQLCipher pre-installed.\n\n"
                "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
                "export LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true"
            )
            raise RuntimeError(
                "SQLCipher not available. Set LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
            )
        logger.warning(
            "WARNING: Running with UNENCRYPTED databases!\n"
            "This means:\n"
            "- Passwords don't protect data access\n"
            "- API keys are stored in plain text\n"
            "- Anyone with file access can read all data\n"
            "Install SQLCipher for secure operation!"
        )
        return False
def _get_user_db_path(self, username: str) -> Path:
    """Return the database file location for ``username`` inside ``data_dir``."""
    filename = get_user_database_filename(username)
    return self.data_dir / filename
def _apply_pragmas(self, connection, connection_record):
    """SQLAlchemy ``connect`` listener that tunes a new DBAPI connection.

    The shared performance pragmas are always applied. After that,
    either the SQLCipher KDF/page-size pragmas or the plain-SQLite
    ``mmap_size`` pragma is attempted. Failures of these extra pragmas
    are logged at debug level and otherwise ignored.

    Args:
        connection: The raw DBAPI connection handed over by the pool
        connection_record: Pool bookkeeping record (unused)
    """
    # Use centralized performance pragma application
    apply_performance_pragmas(connection)

    if not self.has_encryption:
        # Regular SQLite pragma
        try:
            connection.execute(
                "PRAGMA mmap_size = 268435456"
            )  # 256MB memory mapping
        except Exception as e:
            logger.debug(f"Could not apply mmap_size pragma: {e}")
        return

    # SQLCipher-specific pragmas
    from .sqlcipher_utils import get_sqlcipher_settings

    settings = get_sqlcipher_settings()
    cipher_pragmas = (
        f"PRAGMA kdf_iter = {settings['kdf_iterations']}",
        f"PRAGMA cipher_page_size = {settings['page_size']}",
    )
    for pragma in cipher_pragmas:
        try:
            connection.execute(pragma)
        except Exception as e:
            logger.debug(f"Could not apply pragma '{pragma}': {e}")
@staticmethod
def _make_sqlcipher_connection(
    db_path: Path,
    password: str,
    isolation_level: Optional[str] = "IMMEDIATE",
    check_same_thread: bool = False,
) -> Any:
    """Open and initialise a raw SQLCipher connection.

    Steps run in the canonical SQLCipher order: set the key, apply
    the cipher pragmas, verify the key, then apply the performance
    pragmas. The cipher pragmas (page size, HMAC algorithm, KDF
    iterations) have to be in place before verification, because the
    verification query is the first one to decrypt a page with the
    active cipher settings.

    Args:
        db_path: Path to the database file
        password: The database encryption passphrase
        isolation_level: SQLite isolation level (``""`` for deferred
            transactions, ``None`` for autocommit)
        check_same_thread: SQLite check_same_thread flag

    Returns:
        A raw ``sqlcipher3`` connection ready for use.

    Raises:
        ValueError: If the database key cannot be verified.
    """
    driver = get_sqlcipher_module()
    raw_conn = driver.connect(
        str(db_path),
        isolation_level=isolation_level,
        check_same_thread=check_same_thread,
    )
    cur = raw_conn.cursor()

    try:
        set_sqlcipher_key(cur, password, db_path=db_path)
        apply_sqlcipher_pragmas(cur, creation_mode=False)

        key_ok = verify_sqlcipher_connection(cur)
        if not key_ok:
            raise ValueError("Failed to verify database key")  # noqa: TRY301 — cleanup in except before re-raise

        apply_performance_pragmas(cur)
    except Exception:
        # Release the cursor and the connection before propagating, so
        # a failed open does not leave a handle behind.
        try:
            cur.close()
        except Exception:  # noqa: BLE001
            logger.warning("Failed to close cursor during cleanup")
        from ..utilities.resource_utils import safe_close

        safe_close(raw_conn, "encrypted DB connection")
        raise
    else:
        cur.close()
        return raw_conn
def create_user_database(self, username: str, password: str) -> Engine:
    """Create, initialise and register a brand-new database for a user.

    With SQLCipher available the schema is first created over a raw
    SQLCipher connection (outside SQLAlchemy) and an engine is then
    built on a custom connection creator. Without SQLCipher a plain,
    unencrypted SQLite engine is created instead. In both cases the
    centralized initialization runs afterwards; a failure there is
    logged and does not prevent the engine from being registered.

    Args:
        username: Owner of the new database
        password: Encryption passphrase

    Returns:
        The engine registered for ``username``.

    Raises:
        ValueError: If the password is None/blank or a database file
            already exists for the user.
        Exception: Any error raised while creating the table structure
            is re-raised after the partial database file is removed.
    """
    # Validate the encryption key
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key for user {username}: password is None or empty"
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    db_path = self._get_user_db_path(username)
    if db_path.exists():
        raise ValueError(f"Database already exists for user {username}")

    if not self.has_encryption:
        logger.warning(
            f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
        )
        # Fall back to regular SQLite with query cache
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)
    else:
        # Create directory if it doesn't exist
        db_path.parent.mkdir(parents=True, exist_ok=True)

        # Create per-database salt for new databases (v2 security improvement)
        create_database_salt(db_path)
        logger.info(f"Created per-database salt for {username}")

        # Pre-derive key before closures to avoid capturing plaintext password
        hex_key = get_key_from_password(password, db_path=db_path).hex()

        # Create database structure using raw SQLCipher outside SQLAlchemy
        try:
            setup_conn = create_sqlcipher_connection(
                db_path,
                password=password,
                creation_mode=True,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )
            try:
                # Get the CREATE TABLE statements from SQLAlchemy models
                from sqlalchemy.dialects import sqlite
                from sqlalchemy.schema import CreateTable

                from .models import Base

                # Emit one CREATE TABLE per model table, skipping "users".
                dialect = sqlite.dialect()
                for table in Base.metadata.sorted_tables:
                    if table.name == "users":
                        continue
                    ddl = str(CreateTable(table).compile(dialect=dialect))
                    logger.debug(f"Creating table {table.name}")
                    setup_conn.execute(ddl)

                setup_conn.commit()
            finally:
                from ..utilities.resource_utils import safe_close

                safe_close(setup_conn, "user DB setup connection")

            logger.info(
                f"Database structure created successfully for {username}"
            )

        except Exception:
            logger.exception("Error creating database structure")
            # Cleanup partial DB file on failure
            if db_path.exists():
                db_path.unlink(missing_ok=True)
            raise

        # Small delay to ensure file is fully written
        import time

        time.sleep(0.1)

        # The creator closes over the derived key, never the password.
        def create_engine_connection():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_path,
                hex_key=hex_key,
                creation_mode=False,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )

        # Create engine with custom creator function and optimized cache
        engine = create_engine(
            "sqlite://",
            creator=create_engine_connection,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

    # Initialize database tables using centralized initialization
    from .initialize import initialize_database

    try:
        # Create a session for settings initialization
        session_factory = sessionmaker(bind=engine)
        with session_factory() as session:
            initialize_database(engine, session)
    except Exception:
        logger.exception(
            "Database migration failed during creation — "
            "tables exist but schema version not stamped. "
            "Migrations will be retried on next process restart."
        )

    # Store connection AFTER migrations complete
    with self._connections_lock:
        self.connections[username] = engine

    logger.info(f"Created encrypted database for user {username}")
    return engine
def open_user_database(
    self, username: str, password: str
) -> Optional[Engine]:
    """Open an existing database for a user and register its engine.

    If an engine is already registered for ``username`` it is returned
    immediately. Otherwise a new engine is built, verified with a
    trivial query, optionally backed up (when a migration is pending),
    initialised/migrated, and finally stored in the registry.

    Args:
        username: Owner of the database
        password: Encryption passphrase

    Returns:
        The registered engine, or ``None`` if the database file does
        not exist or the open/verification step failed.

    Raises:
        ValueError: If the password is None, empty or whitespace-only.
    """
    # Validate the encryption key
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key when opening database for user {username}: password is None or empty"
        )
        # TODO: Fix the root cause - research threads are not getting the correct password
        logger.error(
            "TODO: This usually means the research thread is not receiving the user's "
            "password for database encryption. Need to ensure password is passed from "
            "the main thread to research threads."
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    # Check if already open
    # NOTE(review): this fast path returns the cached engine without
    # checking ``password`` against the database — callers that use this
    # method as a credential check should confirm that is acceptable.
    with self._connections_lock:
        if username in self.connections:
            return self.connections[username]

    db_path = self._get_user_db_path(username)

    # Prevent timing attacks: always derive key before checking file existence
    # This ensures both existing and non-existent users take the same amount of time,
    # preventing username enumeration via timing analysis.
    # Pre-derive key before closures to avoid capturing plaintext password
    hex_key = get_key_from_password(password, db_path=db_path).hex()

    if not db_path.exists():
        logger.error(f"No database found for user {username}")
        return None

    # Warn if this is a legacy database without per-database salt
    if self.has_encryption and not has_per_database_salt(db_path):
        logger.warning(
            f"Database for user '{username}' uses the legacy shared salt "
            f"(deprecated). For improved security, consider creating a new "
            f"account to get a per-database salt. Legacy databases remain "
            f"fully functional but are less resistant to multi-target attacks."
        )

    # Create connection string - use regular SQLite when SQLCipher not available
    if self.has_encryption:

        def create_open_connection():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_path,
                hex_key=hex_key,
                creation_mode=False,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )

        # Create engine with custom creator function and optimized cache
        engine = create_engine(
            "sqlite://",
            creator=create_open_connection,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
    else:
        logger.warning(
            f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
        )
        # Fall back to regular SQLite (no password protection!)
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)

    try:
        # Test connection by running a simple query
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))

        # Run database initialization (creates missing tables and runs migrations)
        from .initialize import initialize_database

        # Create backup before migration to protect against schema change failures
        from .alembic_runner import needs_migration

        if needs_migration(engine):
            try:
                from .backup.backup_service import BackupService

                result = BackupService(
                    username=username, password=password
                ).create_backup(force=True)
                if result.success:
                    logger.info(
                        f"Pre-migration backup created: {result.backup_path}"
                    )
                else:
                    logger.error(
                        f"Pre-migration backup failed: {result.error}"
                    )
            except Exception:
                logger.exception(
                    "Pre-migration backup failed — proceeding with migration"
                )

        try:
            initialize_database(engine)
        except Exception:
            logger.exception(
                f"Database migration failed for {username} — "
                "database remains at previous working revision (safe). "
                "Migrations will be retried on next process restart."
            )

        # Store connection AFTER migrations complete. The "already open"
        # check above ran without holding the lock during the slow
        # open/migrate steps, so another thread may have registered an
        # engine for this user in the meantime. Overwriting it would
        # orphan that engine and its pooled connections, so keep the
        # registered one and discard ours.
        with self._connections_lock:
            registered = self.connections.get(username)
            if registered is None:
                self.connections[username] = engine
                registered = engine

        if registered is not engine:
            logger.debug(
                f"Database for user {username} was opened concurrently; "
                "reusing the registered engine"
            )
            try:
                engine.dispose()
            except Exception:
                logger.warning(
                    f"Failed to dispose duplicate engine for {username}"
                )
            return registered

        logger.info(f"Opened encrypted database for user {username}")
        return engine

    except Exception:
        logger.exception(f"Failed to open database for user {username}")
        engine.dispose()
        return None
def get_session(self, username: str) -> Optional[Session]:
    """Return a fresh session bound to the user's engine, if one is open.

    Returns:
        A new session, or ``None`` when no engine is registered for
        ``username``.
    """
    with self._connections_lock:
        try:
            engine = self.connections[username]
        except KeyError:
            # Use debug level for this common scenario to reduce log noise
            logger.debug(f"No open database for user {username}")
            return None
        # Create session inside lock to prevent race with close_user_database()
        session_factory = sessionmaker(bind=engine)
        return session_factory()
def get_connected_usernames(self) -> set:
    """Return a new set holding the usernames that currently have an engine.

    The set is a copy taken under the connections lock, so later
    registry changes do not affect it.
    """
    with self._connections_lock:
        snapshot = set(self.connections)
    return snapshot
def close_user_database(self, username: str):
    """Dispose the user's engine and remove it from the registry.

    Does nothing when no engine is registered for ``username``. A
    failure while disposing is logged as a warning and the entry is
    removed regardless.
    """
    with self._connections_lock:
        if username not in self.connections:
            return
        engine = self.connections[username]
        try:
            engine.dispose()
        except Exception:
            logger.warning(
                f"Failed to dispose engine for {username}",
            )
        del self.connections[username]
        logger.info(f"Closed database for user {username}")
def close_all_databases(self):
    """Dispose every registered engine, then empty the registry.

    Individual dispose failures are logged at debug level and do not
    stop the remaining engines from being disposed.
    """
    with self._connections_lock:
        # Iterate over a copy so the registry can be cleared afterwards.
        registered = list(self.connections.items())
        for username, engine in registered:
            try:
                engine.dispose()
            except Exception:
                logger.debug(f"Error disposing engine for {username}")
        self.connections.clear()
def check_database_integrity(self, username: str) -> bool:
    """Run the SQLite and SQLCipher integrity checks on an open database.

    Returns:
        True only when ``PRAGMA quick_check`` reports ``"ok"`` and
        ``PRAGMA cipher_integrity_check`` yields no rows. False when
        the user has no open engine, either check fails, or any error
        occurs while checking.
    """
    with self._connections_lock:
        try:
            engine = self.connections[username]
        except KeyError:
            return False

    try:
        with engine.connect() as conn:
            # Quick integrity check
            quick = conn.execute(text("PRAGMA quick_check"))
            if quick.fetchone()[0] != "ok":
                return False

            # SQLCipher integrity check
            # If this returns any rows, there are HMAC failures
            failures = list(
                conn.execute(text("PRAGMA cipher_integrity_check"))
            )
            if not failures:
                return True

            logger.error(
                f"Integrity check failed for {username}: {len(failures)} HMAC failures"
            )
            return False

    except Exception:
        logger.exception(f"Integrity check error for user: {username}")
        return False
def change_password(
    self, username: str, old_password: str, new_password: str
) -> bool:
    """Change the encryption password for a user's database.

    This rekeys the SQLCipher database — no separate auth-DB
    password-hash update is needed because passwords are never
    stored. Login verification is done by attempting decryption.

    Any existing engine for the user is closed before the rekey, and
    the engine opened for the rekey is closed again afterwards, so the
    user has no registered engine when this method returns (once the
    encryption, password and file-existence checks have passed).

    Args:
        username: Owner of the database
        old_password: Current passphrase, used to open the database
        new_password: Passphrase to rekey the database with

    Returns:
        True if the rekey succeeded, False otherwise (encryption
        unavailable, unusable new password, missing database, wrong
        old password, or any error during the rekey).
    """
    if not self.has_encryption:
        logger.warning(
            "Cannot change password - SQLCipher not available (databases are unencrypted)"
        )
        return False

    # Reject a new password that open_user_database() would later refuse:
    # rekeying to a None/blank key would lock the user out of their data.
    if not self._is_valid_encryption_key(new_password):
        logger.error(
            f"Cannot change password for user {username}: new password is None or empty"
        )
        return False

    db_path = self._get_user_db_path(username)

    if not db_path.exists():
        return False

    try:
        # Close existing connection if any
        self.close_user_database(username)

        # Open with old password
        engine = self.open_user_database(username, old_password)
        if not engine:
            return False

        # Rekey the database (only works with SQLCipher)
        with engine.connect() as conn:
            # Use centralized rekey function
            set_sqlcipher_rekey(conn, new_password, db_path=db_path)

        logger.info(f"Password changed for user {username}")
        return True

    except Exception:
        logger.exception(f"Failed to change password for user: {username}")
        return False
    finally:
        # Close the connection
        self.close_user_database(username)
def user_exists(self, username: str) -> bool:
    """Return True when the auth database has a ``User`` row for ``username``."""
    from .auth_db import auth_db_session
    from .models.auth import User

    with auth_db_session() as session:
        match = session.query(User).filter_by(username=username).first()
        found = match is not None
    return found
def get_memory_usage(self) -> Dict[str, Any]:
    """Return rough usage figures derived from the number of open engines.

    The memory figure is an estimate (open engines x 3.5 MB), not a
    measurement.
    """
    with self._connections_lock:
        open_count = len(self.connections)

    stats: Dict[str, Any] = {}
    stats["active_connections"] = open_count
    stats["active_sessions"] = 0  # Sessions are created on-demand, not tracked
    stats["estimated_memory_mb"] = open_count * 3.5  # ~3.5MB per connection
    return stats
def create_thread_safe_session_for_metrics(
    self, username: str, password: str
):
    """
    Return a new session for background-thread use, opening the DB if needed.

    Earlier versions built a dedicated NullPool engine per
    (username, thread_id) pair. That leaked file descriptors under
    load: SQLCipher + WAL holds 3 FDs per active connection, and
    orphaned engines piled up whenever @thread_cleanup did not fire.

    The session now comes from the shared per-user QueuePool engine in
    ``self.connections[username]``. That engine is created with
    ``check_same_thread=False`` (so background threads are safe), is
    bounded by ``pool_size + max_overflow``, and is covered by the
    periodic ``dispose()`` workaround in ``connection_cleanup.py``
    that mitigates the SQLCipher+WAL out-of-order-close FD leak.

    Args:
        username: The username
        password: The user's password (encryption key), used only
            to open the user database on cache miss.

    Returns:
        A SQLAlchemy Session bound to the per-user QueuePool engine.

    Raises:
        ValueError: If the user's database file does not exist or the
            database could not be opened.
    """
    if not self._get_user_db_path(username).exists():
        raise ValueError(f"No database found for user {username}")

    with self._connections_lock:
        engine = self.connections.get(username)

    if engine is None:
        # Cache miss — open the user database. This is idempotent:
        # after the first call it just returns the cached engine.
        engine = self.open_user_database(username, password)
        if engine is None:
            raise ValueError(f"Failed to open database for user {username}")

    # Use SQLAlchemy's default expire_on_commit=True.
    session_factory = sessionmaker(bind=engine)
    return session_factory()
# Global instance shared by the whole process. Constructing it at import
# time runs DatabaseManager.__init__, which creates the storage directory
# and probes SQLCipher — so importing this module can raise RuntimeError
# when encryption is unavailable and not explicitly waived.
db_manager = DatabaseManager()