Coverage for src / local_deep_research / database / encrypted_db.py: 88%
340 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Encrypted database management using SQLCipher.
3Handles per-user encrypted databases with browser-friendly authentication.
4"""
6import os
7import threading
8from pathlib import Path
9from typing import Any, Dict, Optional, Tuple
11from loguru import logger
12from sqlalchemy import create_engine, event, text
13from sqlalchemy.engine import Engine
14from sqlalchemy.orm import Session, sessionmaker
15from sqlalchemy.pool import QueuePool, NullPool, StaticPool
17from ..config.paths import get_data_directory, get_user_database_filename
18from ..settings.env_registry import get_env_setting
19from .sqlcipher_compat import get_sqlcipher_module
20from .sqlcipher_utils import (
21 set_sqlcipher_key,
22 set_sqlcipher_rekey,
23 apply_cipher_defaults_before_key,
24 apply_sqlcipher_pragmas,
25 apply_performance_pragmas,
26 verify_sqlcipher_connection,
27 create_database_salt,
28 has_per_database_salt,
29 get_key_from_password,
30 get_sqlcipher_version,
31 create_sqlcipher_connection,
32)
35class DatabaseManager:
36 """Manages encrypted SQLCipher databases for each user."""
def __init__(self):
    """Set up engine registries, the storage directory and the pool policy.

    Side effects: creates the ``encrypted_databases`` directory under the
    configured data directory and runs the SQLCipher availability probe
    (which may raise ``RuntimeError``, see ``_check_encryption_available``).
    """
    # Shared per-user engines, guarded by a re-entrant lock.
    self.connections: Dict[str, Engine] = {}
    self._connections_lock = threading.RLock()

    # Per-thread engines keyed by (username, thread_id) so they can be
    # found and disposed later; guarded by their own lock.
    self._thread_engines: Dict[Tuple[str, int], Engine] = {}
    self._thread_engine_lock = threading.Lock()

    # Directory that holds every user's database file.
    self.data_dir = get_data_directory() / "encrypted_databases"
    self.data_dir.mkdir(parents=True, exist_ok=True)

    # Probe once whether SQLCipher actually works in this environment.
    self.has_encryption = self._check_encryption_available()

    # Any non-empty TESTING environment value selects StaticPool (used in
    # tests to avoid locking issues); otherwise QueuePool is used.
    testing_flag = os.environ.get("TESTING")
    self._use_static_pool = bool(testing_flag)
    if self._use_static_pool:
        self._pool_class = StaticPool
    else:
        self._pool_class = QueuePool
55 def _get_pool_kwargs(self) -> Dict[str, Any]:
56 """Get pool configuration kwargs based on pool type.
58 StaticPool doesn't support pool_size or max_overflow,
59 so we only include them for QueuePool.
60 """
61 if self._use_static_pool:
62 return {}
63 return {
64 "pool_size": 10,
65 "max_overflow": 30, # Increase overflow so we don't run out of connections
66 "pool_pre_ping": True, # Validate connections before use (prevents stale connections)
67 }
69 def _is_valid_encryption_key(self, password: str) -> bool:
70 """
71 Check if the provided password is valid (not None, empty, or whitespace-only).
73 Args:
74 password: The password to check
76 Returns:
77 True if the password is valid, False otherwise
78 """
79 return password is not None and password.strip() != ""
def is_user_connected(self, username: str) -> bool:
    """Report whether an engine is currently registered for ``username``.

    The lookup is performed while holding ``_connections_lock`` so external
    callers do not need to touch ``connections`` directly.

    Args:
        username: The username to look up.

    Returns:
        True if ``username`` is a key of ``connections``.
    """
    with self._connections_lock:
        connected = username in self.connections
    return connected
def _check_encryption_available(self) -> bool:
    """Probe whether SQLCipher really works, not merely whether it imports.

    A throw-away database is created in a temporary file, keyed, written to
    and read back. Any failure of that round trip is converted into an
    ``ImportError`` and handled by the fallback policy below.

    Returns:
        True when the encrypted round trip succeeds; False when it fails
        but the ``bootstrap.allow_unencrypted`` setting permits running
        without encryption.

    Raises:
        RuntimeError: If SQLCipher is unusable and unencrypted operation
            has not been explicitly allowed.
    """
    try:
        import os as os_module
        import tempfile

        # Reserve a file name; the handle is closed before SQLCipher opens it.
        with tempfile.NamedTemporaryFile(delete=False) as scratch:
            tmp_path = scratch.name

        try:
            dbapi = get_sqlcipher_module().dbapi2
            conn = dbapi.connect(tmp_path)
            try:
                cursor = conn.cursor()
                # New database: cipher defaults first, then the key, then
                # the post-key pragmas in creation mode.
                apply_cipher_defaults_before_key(cursor)
                set_sqlcipher_key(cursor, "testpass")
                apply_sqlcipher_pragmas(cursor, creation_mode=True)
                apply_performance_pragmas(cursor)

                # Warn about SQLCipher releases older than 4.x.
                version = get_sqlcipher_version(cursor)
                if version:
                    if "." in version:
                        major = version.split(".")[0]
                    else:
                        major = version[0]
                    if major.isdigit() and int(major) < 4:
                        logger.warning(
                            f"SQLCipher version {version} detected. "
                            "Version 4.x+ is recommended for proper PRAGMA ordering."
                        )

                cursor.close()

                # Write one row and read it back through the connection.
                conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
                conn.execute("INSERT INTO test VALUES (1)")
                row = conn.execute("SELECT * FROM test").fetchone()
                if row != (1,):
                    raise Exception("SQLCipher encryption test failed")

                logger.info(
                    "SQLCipher available and working - databases will be encrypted"
                )
                return True
            finally:
                conn.close()
        except Exception as e:
            logger.warning(f"SQLCipher module found but not working: {e}")
            raise ImportError("SQLCipher not functional")
        finally:
            # Best-effort removal of the scratch database file.
            try:
                os_module.unlink(tmp_path)
            except OSError as e:
                logger.debug(f"Failed to clean up temp file {tmp_path}: {e}")

    except ImportError:
        # The registry handles the deprecated LDR_ALLOW_UNENCRYPTED
        # fallback automatically.
        allow_unencrypted = get_env_setting(
            "bootstrap.allow_unencrypted", False
        )

        if allow_unencrypted:
            logger.warning(
                "WARNING: Running with UNENCRYPTED databases!\n"
                "This means:\n"
                "- Passwords don't protect data access\n"
                "- API keys are stored in plain text\n"
                "- Anyone with file access can read all data\n"
                "Install SQLCipher for secure operation!"
            )
            return False

        logger.exception(
            "SECURITY ERROR: SQLCipher is not installed!\n"
            "Your databases will NOT be encrypted.\n"
            "To fix this:\n"
            "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
            "2. Reinstall project: pdm install\n"
            "Or use Docker with SQLCipher pre-installed.\n\n"
            "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
            "export LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true"
        )
        raise RuntimeError(
            "SQLCipher not available. Set LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
        )
def _get_user_db_path(self, username: str) -> Path:
    """Return the database file path for ``username`` inside ``data_dir``.

    The file name itself comes from ``get_user_database_filename``.
    """
    db_filename = get_user_database_filename(username)
    return self.data_dir / db_filename
def _apply_pragmas(self, connection, connection_record):
    """Apply performance pragmas to a freshly opened DB-API connection.

    Registered elsewhere in this class as a SQLAlchemy ``"connect"`` event
    listener, hence the ``(connection, connection_record)`` signature;
    ``connection_record`` is not used.

    Every pragma beyond the shared performance set is best-effort: a
    failure is logged at debug level and otherwise ignored.
    """
    # Shared performance pragmas apply to both kinds of database.
    apply_performance_pragmas(connection)

    if not self.has_encryption:
        # Plain SQLite: enable 256MB of memory mapping.
        try:
            connection.execute("PRAGMA mmap_size = 268435456")
        except Exception as e:
            logger.debug(f"Could not apply mmap_size pragma: {e}")
        return

    # SQLCipher: re-assert the configured KDF iterations and page size.
    from .sqlcipher_utils import get_sqlcipher_settings

    settings = get_sqlcipher_settings()
    cipher_pragmas = (
        f"PRAGMA kdf_iter = {settings['kdf_iterations']}",
        f"PRAGMA cipher_page_size = {settings['page_size']}",
    )
    for pragma in cipher_pragmas:
        try:
            connection.execute(pragma)
        except Exception as e:
            logger.debug(f"Could not apply pragma '{pragma}': {e}")
@staticmethod
def _make_sqlcipher_connection(
    db_path: Path,
    password: str,
    isolation_level: Optional[str] = "IMMEDIATE",
    check_same_thread: bool = False,
) -> Any:
    """Create a properly initialized SQLCipher connection.

    Follows the canonical SQLCipher initialization order: set key,
    apply cipher pragmas, verify, then apply performance pragmas.
    Cipher pragmas (page size, HMAC algorithm, KDF iterations) must
    be configured before the first query (verification) because that
    query triggers page decryption with the active cipher settings.

    Args:
        db_path: Path to the database file
        password: The database encryption passphrase
        isolation_level: SQLite isolation level (``""`` for deferred
            transactions, ``None`` for autocommit)
        check_same_thread: SQLite check_same_thread flag

    Returns:
        A raw ``sqlcipher3`` connection ready for use.

    Raises:
        ValueError: If the database key cannot be verified.
    """
    sqlcipher3 = get_sqlcipher_module()
    conn = sqlcipher3.connect(
        str(db_path),
        isolation_level=isolation_level,
        check_same_thread=check_same_thread,
    )
    cursor = conn.cursor()

    try:
        set_sqlcipher_key(cursor, password, db_path=db_path)
        apply_sqlcipher_pragmas(cursor, creation_mode=False)

        if not verify_sqlcipher_connection(cursor):
            raise ValueError("Failed to verify database key")

        apply_performance_pragmas(cursor)
    except Exception:
        # Release both handles before propagating the original error.
        try:
            cursor.close()
        except Exception:  # noqa: BLE001
            # loguru has no ``exc_info`` keyword (it would only be treated
            # as a message-format argument); ``opt(exception=True)`` is
            # what actually attaches the active traceback to the record.
            logger.opt(exception=True).warning(
                "Failed to close cursor during cleanup"
            )
        conn.close()
        raise

    cursor.close()
    return conn
def create_user_database(self, username: str, password: str) -> Engine:
    """Create a brand-new database for ``username`` and register its engine.

    With SQLCipher available the schema is first written through a raw
    SQLCipher connection and a pooled SQLAlchemy engine is then layered on
    top; without it a plain (unencrypted) SQLite engine is created.

    Args:
        username: Owner of the new database.
        password: Password used to derive the encryption key.

    Returns:
        The SQLAlchemy engine, also stored in ``connections[username]``.

    Raises:
        ValueError: If ``password`` is None/blank or the database file
            already exists.
        Exception: Any error raised while writing the initial schema is
            re-raised after the partial database file is removed.
    """
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key for user {username}: password is None or empty"
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    db_path = self._get_user_db_path(username)
    if db_path.exists():
        raise ValueError(f"Database already exists for user {username}")

    if not self.has_encryption:
        logger.warning(
            f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
        )
        # Plain SQLite fallback, still with the query cache enabled.
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
        # Unencrypted connections only need the pragma listener.
        event.listen(engine, "connect", self._apply_pragmas)
    else:
        db_path.parent.mkdir(parents=True, exist_ok=True)

        # New databases get their own salt (v2 security improvement).
        create_database_salt(db_path)
        logger.info(f"Created per-database salt for {username}")

        # Derive the key up front so the connection factory below closes
        # over the derived key rather than the plaintext password.
        hex_key = get_key_from_password(password, db_path=db_path).hex()

        # Write the schema with raw SQLCipher, outside SQLAlchemy.
        try:
            bootstrap_conn = create_sqlcipher_connection(
                db_path,
                password=password,
                creation_mode=True,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )
            try:
                from sqlalchemy.dialects import sqlite
                from sqlalchemy.schema import CreateTable

                from .models import Base

                dialect = sqlite.dialect()
                for table in Base.metadata.sorted_tables:
                    # The "users" table is deliberately left out here.
                    if table.name == "users":
                        continue
                    ddl = str(CreateTable(table).compile(dialect=dialect))
                    logger.debug(f"Creating table {table.name}")
                    bootstrap_conn.execute(ddl)

                bootstrap_conn.commit()
            finally:
                bootstrap_conn.close()

            logger.info(
                f"Database structure created successfully for {username}"
            )
        except Exception:
            logger.exception("Error creating database structure")
            # Do not leave a half-written database file behind.
            if db_path.exists():
                db_path.unlink(missing_ok=True)
            raise

        # Small delay to ensure file is fully written
        import time

        time.sleep(0.1)

        def create_engine_connection():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_path,
                hex_key=hex_key,
                creation_mode=False,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )

        # Engine backed by the custom creator, with a larger query cache.
        engine = create_engine(
            "sqlite://",
            creator=create_engine_connection,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

    # Register the engine before running the centralized initialization.
    with self._connections_lock:
        self.connections[username] = engine

    from .initialize import initialize_database

    try:
        session_factory = sessionmaker(bind=engine)
        with session_factory() as session:
            initialize_database(engine, session)
    except Exception as e:
        # Initialization problems are reported but do not abort creation.
        logger.warning(f"Could not initialize database fully: {e}")

    logger.info(f"Created encrypted database for user {username}")
    return engine
def open_user_database(
    self, username: str, password: str
) -> Optional[Engine]:
    """Open an existing database for a user and register its engine.

    Args:
        username: Owner of the database.
        password: Password used to derive the encryption key.

    Returns:
        The engine registered for ``username`` (an already-open one is
        reused), or ``None`` when no database file exists or the
        connection test fails.

    Raises:
        ValueError: If ``password`` is None, empty or whitespace-only.
    """
    # Validate the encryption key
    if not self._is_valid_encryption_key(password):
        logger.error(
            f"Invalid encryption key when opening database for user {username}: password is None or empty"
        )
        # TODO: Fix the root cause - research threads are not getting the correct password
        logger.error(
            "TODO: This usually means the research thread is not receiving the user's "
            "password for database encryption. Need to ensure password is passed from "
            "the main thread to research threads."
        )
        raise ValueError(
            "Invalid encryption key: password cannot be None or empty"
        )

    # Fast path: already open
    with self._connections_lock:
        if username in self.connections:
            return self.connections[username]

    db_path = self._get_user_db_path(username)

    # Prevent timing attacks: always derive key before checking file existence
    # This ensures both existing and non-existent users take the same amount of time,
    # preventing username enumeration via timing analysis.
    # Pre-derive key before closures to avoid capturing plaintext password
    hex_key = get_key_from_password(password, db_path=db_path).hex()

    if not db_path.exists():
        logger.error(f"No database found for user {username}")
        return None

    # Warn if this is a legacy database without per-database salt
    if self.has_encryption and not has_per_database_salt(db_path):
        logger.warning(
            f"Database for user '{username}' uses the legacy shared salt "
            f"(deprecated). For improved security, consider creating a new "
            f"account to get a per-database salt. Legacy databases remain "
            f"fully functional but are less resistant to multi-target attacks."
        )

    if self.has_encryption:

        def create_open_connection():
            """Create a properly initialized SQLCipher connection."""
            return create_sqlcipher_connection(
                db_path,
                hex_key=hex_key,
                creation_mode=False,
                connect_kwargs={
                    "isolation_level": "IMMEDIATE",
                    "check_same_thread": False,
                },
            )

        # Create engine with custom creator function and optimized cache
        engine = create_engine(
            "sqlite://",
            creator=create_open_connection,
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )
    else:
        logger.warning(
            f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
        )
        # Fall back to regular SQLite (no password protection!)
        engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False, "timeout": 30},
            poolclass=self._pool_class,
            echo=False,
            query_cache_size=1000,
            **self._get_pool_kwargs(),
        )

        # For unencrypted databases, just apply pragmas
        event.listen(engine, "connect", self._apply_pragmas)

    try:
        # Test connection by running a simple query
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))

        # The "already open" check above and this store are separate
        # critical sections, so another thread may have registered an
        # engine for the same user in between. Re-check under the lock and
        # keep the first one instead of overwriting (and leaking) it.
        with self._connections_lock:
            already_registered = self.connections.get(username)
            if already_registered is None:
                self.connections[username] = engine

        if already_registered is not None:
            engine.dispose()
            return already_registered

        # Run database initialization (creates missing tables and runs migrations)
        from .initialize import initialize_database

        try:
            initialize_database(engine)
        except Exception as e:
            logger.warning(f"Could not run migrations for {username}: {e}")

        logger.info(f"Opened encrypted database for user {username}")
        return engine

    except Exception:
        logger.exception(f"Failed to open database for user {username}")
        engine.dispose()
        return None
def get_session(self, username: str) -> Optional[Session]:
    """Return a new session bound to the user's registered engine.

    Returns ``None`` (logging at debug level only, as this is a common
    situation) when no engine is registered for ``username``.
    """
    with self._connections_lock:
        try:
            user_engine = self.connections[username]
        except KeyError:
            logger.debug(f"No open database for user {username}")
            return None
        # The session is built while the lock is still held so this cannot
        # race with close_user_database().
        session_factory = sessionmaker(bind=user_engine)
        return session_factory()
def close_user_database(self, username: str):
    """Dispose and unregister the user's engine, then their thread engines.

    Calling this for a user with no registered engine is fine; the
    per-thread engine cleanup still runs.
    """
    with self._connections_lock:
        if username in self.connections:
            user_engine = self.connections[username]
            user_engine.dispose()
            del self.connections[username]
            logger.info(f"Closed database for user {username}")

    # Thread-specific engines for this user are released as well.
    self.cleanup_thread_engines(username=username)
def check_database_integrity(self, username: str) -> bool:
    """Run SQLite and SQLCipher integrity checks on the user's database.

    Returns:
        True only when ``PRAGMA quick_check`` reports ``"ok"`` and
        ``PRAGMA cipher_integrity_check`` yields no rows. False when the
        user has no registered engine, a check fails, or any exception is
        raised while checking (the exception is logged, not propagated).
    """
    with self._connections_lock:
        try:
            user_engine = self.connections[username]
        except KeyError:
            return False

    try:
        with user_engine.connect() as conn:
            # Fast structural check first.
            quick_row = conn.execute(text("PRAGMA quick_check")).fetchone()
            if quick_row[0] != "ok":
                return False

            # Every row returned by the cipher check is an HMAC failure.
            hmac_failures = list(
                conn.execute(text("PRAGMA cipher_integrity_check"))
            )
            if hmac_failures:
                logger.error(
                    f"Integrity check failed for {username}: {len(hmac_failures)} HMAC failures"
                )
                return False

        return True

    except Exception:
        logger.exception(f"Integrity check error for user: {username}")
        return False
def change_password(
    self, username: str, old_password: str, new_password: str
) -> bool:
    """Change the encryption password for a user's database.

    This rekeys the SQLCipher database — no separate auth-DB
    password-hash update is needed because passwords are never
    stored. Login verification is done by attempting decryption.

    Args:
        username: Owner of the database.
        old_password: Current password, used to open the database.
        new_password: Password to rekey the database with.

    Returns:
        True if the rekey succeeded. False if encryption is unavailable,
        ``new_password`` is None/blank, the database file does not exist,
        the database could not be opened with ``old_password``, or any
        error occurred (errors are logged, not raised).
    """
    if not self.has_encryption:
        logger.warning(
            "Cannot change password - SQLCipher not available (databases are unencrypted)"
        )
        return False

    # Reject an unusable new key before touching the database:
    # open_user_database() refuses None/blank passwords, so a database
    # rekeyed to one could not be opened through this manager again.
    if not self._is_valid_encryption_key(new_password):
        logger.error(
            f"Invalid new encryption key for user {username}: password is None or empty"
        )
        return False

    db_path = self._get_user_db_path(username)

    if not db_path.exists():
        return False

    try:
        # Close existing connection if any
        self.close_user_database(username)

        # Open with old password
        engine = self.open_user_database(username, old_password)
        if not engine:
            return False

        # Rekey the database (only works with SQLCipher)
        with engine.connect() as conn:
            # Use centralized rekey function
            set_sqlcipher_rekey(conn, new_password, db_path=db_path)

        logger.info(f"Password changed for user {username}")
        return True

    except Exception:
        logger.exception(f"Failed to change password for user: {username}")
        return False
    finally:
        # Always drop the engine: it was created with the old key.
        self.close_user_database(username)
def user_exists(self, username: str) -> bool:
    """Return True if the auth database has a ``User`` row for ``username``."""
    from .auth_db import auth_db_session
    from .models.auth import User

    with auth_db_session() as session:
        matching_user = (
            session.query(User).filter_by(username=username).first()
        )
        return matching_user is not None
def get_memory_usage(self) -> Dict[str, Any]:
    """Get memory usage statistics.

    Each registry is counted once while holding its own lock, so the
    reported ``thread_engines`` value and the memory estimate are always
    derived from the same snapshot.

    Returns:
        A dict with ``active_connections``, ``thread_engines``,
        ``active_sessions`` (always 0; sessions are created on demand and
        not tracked) and ``estimated_memory_mb`` (a rough figure of
        ~3.5MB per engine).
    """
    with self._connections_lock:
        num_connections = len(self.connections)
    # The locks are taken one after the other, never nested.
    with self._thread_engine_lock:
        num_thread_engines = len(self._thread_engines)

    return {
        "active_connections": num_connections,
        "thread_engines": num_thread_engines,
        "active_sessions": 0,  # Sessions are created on-demand, not tracked
        "estimated_memory_mb": (num_connections + num_thread_engines)
        * 3.5,  # ~3.5MB per connection
    }
def cleanup_thread_engines(
    self, username: str = None, thread_id: int = None
):
    """Dispose and forget thread-specific engines matching the filters.

    Args:
        username: If provided, only engines of this user are cleaned up.
        thread_id: If provided, only engines of this thread are cleaned up.
            When neither argument is given, the calling thread's engines
            are cleaned up.

    A failure to dispose an engine is logged as a warning; the engine is
    removed from the registry regardless.
    """
    if username is None and thread_id is None:
        thread_id = threading.get_ident()

    # Filters are applied by truthiness, exactly as the arguments arrive.
    by_user = bool(username)
    by_thread = bool(thread_id)

    with self._thread_engine_lock:
        # Keys are (username, thread_id) tuples.
        doomed = [
            (key_user, key_thread)
            for key_user, key_thread in self._thread_engines
            if (by_user or by_thread)
            and (not by_user or key_user == username)
            and (not by_thread or key_thread == thread_id)
        ]

        for key in doomed:
            try:
                self._thread_engines[key].dispose()
                logger.debug(f"Disposed thread engine: {key}")
            except Exception as e:
                logger.warning(f"Error disposing thread engine {key}: {e}")
            del self._thread_engines[key]
def cleanup_all_thread_engines(self):
    """Dispose every thread-specific engine and empty the registry.

    Intended for shutdown. Disposal errors are logged as warnings and do
    not stop the remaining engines from being disposed.
    """
    with self._thread_engine_lock:
        registered = list(self._thread_engines.items())
        for engine_key, thread_engine in registered:
            try:
                thread_engine.dispose()
            except Exception as e:
                logger.warning(
                    f"Error disposing thread engine {engine_key}: {e}"
                )
        self._thread_engines.clear()
        logger.info(
            f"All thread engines disposed ({len(registered)} total)"
        )
def create_thread_safe_session_for_metrics(
    self, username: str, password: str
):
    """
    Create a new database session safe for use in background threads.
    This is specifically for metrics/logging - NOT for settings or user data.

    One engine is kept per (username, thread id) pair and reused on later
    calls from the same thread after a ``SELECT 1`` health check; a stale
    engine is disposed and replaced.

    Args:
        username: The username
        password: The user's password (encryption key)

    Returns:
        A SQLAlchemy session that can be used in the current thread

    Raises:
        ValueError: If no database file exists for ``username``.

    IMPORTANT: This should ONLY be used for:
    - Writing token metrics
    - Writing search metrics
    - Writing logs

    DO NOT use this for:
    - Reading/writing settings
    - Modifying user data
    - Any operation that should be synchronized with user requests
    """
    db_path = self._get_user_db_path(username)

    # Prevent timing attacks: always derive key before checking file existence
    # This ensures both existing and non-existent users take the same amount of time,
    # preventing username enumeration via timing analysis.
    # Pre-derive key before closures
    hex_key = get_key_from_password(password, db_path=db_path).hex()

    if not db_path.exists():
        raise ValueError(f"No database found for user {username}")

    # Warn if this is a legacy database without per-database salt. This is
    # done only once the database is known to exist (same order as
    # open_user_database), so no salt probing or legacy warning happens
    # for a user that has no database at all.
    if self.has_encryption and not has_per_database_salt(db_path):
        logger.warning(
            f"Database for user '{username}' uses a legacy shared salt "
            f"(deprecated). For improved security, consider creating a new "
            f"account to get a per-database salt. Legacy databases remain "
            f"fully functional but are less resistant to multi-target attacks."
        )

    # Create thread-specific key for engine reuse (tuple is cleaner than string)
    thread_id = threading.get_ident()
    engine_key = (username, thread_id)

    # Check for existing engine for this thread/user combo
    with self._thread_engine_lock:
        if engine_key in self._thread_engines:
            engine = self._thread_engines[engine_key]
            # Verify engine is still valid
            try:
                with engine.connect() as conn:
                    conn.execute(text("SELECT 1"))
                # Engine is valid, create session from it
                session_factory = sessionmaker(bind=engine)
                return session_factory()
            except Exception as e:
                # Engine is stale, remove it and create new one
                logger.debug(
                    f"Stale thread engine for {engine_key}, recreating: {e}"
                )
                try:
                    engine.dispose()
                except Exception as dispose_error:
                    logger.debug(
                        f"Error disposing stale engine: {dispose_error}"
                    )
                del self._thread_engines[engine_key]

    # Create a thread-local engine
    if self.has_encryption:

        def create_thread_connection():
            """Create a SQLCipher connection for this thread."""
            try:
                return create_sqlcipher_connection(
                    db_path,
                    hex_key=hex_key,
                    creation_mode=False,
                    connect_kwargs={"check_same_thread": False},
                )
            except Exception:
                logger.exception(
                    f"Failed to create thread connection for {username}"
                )
                raise

        engine = create_engine(
            "sqlite://",
            creator=create_thread_connection,
            poolclass=NullPool,  # Important: no connection pooling for threads
            echo=False,
        )
    else:
        # Unencrypted fallback
        logger.warning("Creating unencrypted thread session for metrics")
        engine = create_engine(
            f"sqlite:///{db_path}",
            poolclass=NullPool,
            echo=False,
        )

    # Store engine for reuse within this thread
    with self._thread_engine_lock:
        self._thread_engines[engine_key] = engine

    # Create session
    session_factory = sessionmaker(bind=engine)
    return session_factory()
# Global instance: a single module-level DatabaseManager, constructed when
# this module is imported (so DatabaseManager.__init__ runs at import time).
db_manager = DatabaseManager()