Coverage for src / local_deep_research / database / encrypted_db.py: 76%
259 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Encrypted database management using SQLCipher.
3Handles per-user encrypted databases with browser-friendly authentication.
4"""
6import os
7from pathlib import Path
8from typing import Any, Dict, Optional
10from loguru import logger
11from sqlalchemy import create_engine, event, text
12from sqlalchemy.engine import Engine
13from sqlalchemy.orm import Session, sessionmaker
14from sqlalchemy.pool import QueuePool, NullPool, StaticPool
16from ..config.paths import get_data_directory, get_user_database_filename
17from .sqlcipher_compat import get_sqlcipher_module
18from .sqlcipher_utils import (
19 set_sqlcipher_key,
20 set_sqlcipher_rekey,
21 apply_sqlcipher_pragmas,
22 apply_performance_pragmas,
23 verify_sqlcipher_connection,
24)
27class DatabaseManager:
28 """Manages encrypted SQLCipher databases for each user."""
30 def __init__(self):
31 self.connections: Dict[str, Engine] = {}
32 self.data_dir = get_data_directory() / "encrypted_databases"
33 self.data_dir.mkdir(parents=True, exist_ok=True)
35 # Check SQLCipher availability
36 self.has_encryption = self._check_encryption_available()
38 # Determine pool class based on environment
39 # Use StaticPool for testing to avoid locking issues
40 self._use_static_pool = bool(os.environ.get("TESTING"))
41 self._pool_class = StaticPool if self._use_static_pool else QueuePool
43 def _get_pool_kwargs(self) -> Dict[str, Any]:
44 """Get pool configuration kwargs based on pool type.
46 StaticPool doesn't support pool_size or max_overflow,
47 so we only include them for QueuePool.
48 """
49 if self._use_static_pool: 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true
50 return {}
51 return {
52 "pool_size": 10,
53 "max_overflow": 30, # Increase overflow so we don't run out of connections
54 }
56 def _is_valid_encryption_key(self, password: str) -> bool:
57 """
58 Check if the provided password is valid (not None or empty).
60 Args:
61 password: The password to check
63 Returns:
64 True if the password is valid, False otherwise
65 """
66 return password is not None and password != ""
68 def _check_encryption_available(self) -> bool:
69 """Check if SQLCipher is available for encryption."""
70 try:
71 import os as os_module
72 import tempfile
74 # Test if SQLCipher actually works, not just if it imports
75 with tempfile.NamedTemporaryFile(delete=False) as tmp:
76 tmp_path = tmp.name
78 try:
79 # Try to create a test encrypted database
81 # Use raw sqlcipher3 connection to test
82 sqlcipher_module = get_sqlcipher_module()
83 sqlcipher = sqlcipher_module.dbapi2
85 conn = sqlcipher.connect(tmp_path)
86 # Use centralized key setting
87 set_sqlcipher_key(conn, "testpass")
88 conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
89 conn.execute("INSERT INTO test VALUES (1)")
90 result = conn.execute("SELECT * FROM test").fetchone()
91 conn.close()
93 if result != (1,): 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true
94 raise Exception("SQLCipher encryption test failed")
95 logger.info(
96 "SQLCipher available and working - databases will be encrypted"
97 )
98 return True
99 except Exception as e:
100 logger.warning(f"SQLCipher module found but not working: {e}")
101 raise ImportError("SQLCipher not functional")
102 finally:
103 # Clean up test file
104 try:
105 os_module.unlink(tmp_path)
106 except:
107 pass
109 except ImportError:
110 import os
112 # Check if user has explicitly allowed unencrypted databases
113 allow_unencrypted = (
114 os.environ.get("LDR_ALLOW_UNENCRYPTED", "").lower() == "true"
115 )
117 if not allow_unencrypted:
118 logger.exception(
119 "SECURITY ERROR: SQLCipher is not installed!\n"
120 "Your databases will NOT be encrypted.\n"
121 "To fix this:\n"
122 "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n"
123 "2. Reinstall project: pdm install\n"
124 "Or use Docker with SQLCipher pre-installed.\n\n"
125 "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n"
126 "export LDR_ALLOW_UNENCRYPTED=true"
127 )
128 raise RuntimeError(
129 "SQLCipher not available. Set LDR_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)"
130 )
131 else:
132 logger.warning(
133 "⚠️ WARNING: Running with UNENCRYPTED databases!\n"
134 "This means:\n"
135 "- Passwords don't protect data access\n"
136 "- API keys are stored in plain text\n"
137 "- Anyone with file access can read all data\n"
138 "Install SQLCipher for secure operation!"
139 )
140 return False
142 def _get_user_db_path(self, username: str) -> Path:
143 """Get the path for a user's encrypted database."""
144 return self.data_dir / get_user_database_filename(username)
146 def _apply_pragmas(self, connection, connection_record):
147 """Apply pragmas for optimal performance."""
148 # Check if this is SQLCipher or regular SQLite
149 is_encrypted = self.has_encryption
151 # Use centralized performance pragma application
152 from .sqlcipher_utils import apply_performance_pragmas
154 apply_performance_pragmas(connection)
156 # SQLCipher-specific pragmas
157 if is_encrypted:
158 from .sqlcipher_utils import get_sqlcipher_settings
160 settings = get_sqlcipher_settings()
161 pragmas = [
162 f"PRAGMA kdf_iter = {settings['kdf_iterations']}",
163 f"PRAGMA cipher_page_size = {settings['page_size']}",
164 ]
165 for pragma in pragmas:
166 try:
167 connection.execute(pragma)
168 except Exception:
169 pass
170 else:
171 # Regular SQLite pragma
172 try:
173 connection.execute(
174 "PRAGMA mmap_size = 268435456"
175 ) # 256MB memory mapping
176 except Exception:
177 pass
179 def create_user_database(self, username: str, password: str) -> Engine:
180 """Create a new encrypted database for a user."""
182 # Validate the encryption key
183 if not self._is_valid_encryption_key(password): 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 logger.error(
185 f"Invalid encryption key for user {username}: password is None or empty"
186 )
187 raise ValueError(
188 "Invalid encryption key: password cannot be None or empty"
189 )
191 db_path = self._get_user_db_path(username)
193 if db_path.exists():
194 raise ValueError(f"Database already exists for user {username}")
196 # Create connection string - use regular SQLite when SQLCipher not available
197 if self.has_encryption: 197 ↛ 287line 197 didn't jump to line 287 because the condition on line 197 was always true
198 # Create directory if it doesn't exist
199 db_path.parent.mkdir(parents=True, exist_ok=True)
201 # SOLUTION: Create database structure using raw SQLCipher outside SQLAlchemy
202 # This bypasses the SQLAlchemy DDL execution that causes MemoryError in Flask
203 try:
204 sqlcipher3 = get_sqlcipher_module()
206 # Create tables directly with SQLCipher, bypassing SQLAlchemy DDL
207 # Use IMMEDIATE isolation level for proper write transaction handling
208 conn = sqlcipher3.connect(
209 str(db_path),
210 isolation_level="IMMEDIATE",
211 check_same_thread=False,
212 )
213 # Use centralized SQLCipher setup
214 set_sqlcipher_key(conn, password)
215 apply_sqlcipher_pragmas(conn, creation_mode=True)
217 # Get the CREATE TABLE statements from SQLAlchemy models
218 from sqlalchemy.dialects import sqlite
219 from sqlalchemy.schema import CreateTable
221 from .models import Base
223 # Create tables one by one
224 sqlite_dialect = sqlite.dialect()
225 for table in Base.metadata.sorted_tables:
226 if table.name != "users":
227 # Get the SQL for this table with SQLite dialect
228 create_sql = str(
229 CreateTable(table).compile(dialect=sqlite_dialect)
230 )
231 logger.debug(f"Creating table {table.name}")
232 conn.execute(create_sql)
234 conn.commit()
235 conn.close()
237 logger.info(
238 f"Database structure created successfully for {username}"
239 )
241 except Exception:
242 logger.exception("Error creating database structure")
243 raise
245 # Small delay to ensure file is fully written
246 import time
248 time.sleep(0.1)
250 # Now create SQLAlchemy engine using custom connection creator
251 # This ensures encryption is properly initialized for every connection
252 sqlcipher3 = get_sqlcipher_module()
254 def create_sqlcipher_connection():
255 """Create a properly initialized SQLCipher connection."""
256 conn = sqlcipher3.connect(
257 str(db_path),
258 isolation_level="IMMEDIATE",
259 check_same_thread=False,
260 )
261 cursor = conn.cursor()
263 # Use centralized SQLCipher setup
264 set_sqlcipher_key(cursor, password)
266 # Verify connection works
267 if not verify_sqlcipher_connection(cursor): 267 ↛ 268line 267 didn't jump to line 268 because the condition on line 267 was never true
268 raise ValueError("Failed to verify database key")
270 # Apply SQLCipher and performance settings
271 apply_sqlcipher_pragmas(cursor, creation_mode=False)
272 apply_performance_pragmas(cursor, username)
274 cursor.close()
275 return conn
277 # Create engine with custom creator function and optimized cache
278 engine = create_engine(
279 "sqlite://",
280 creator=create_sqlcipher_connection,
281 poolclass=self._pool_class,
282 echo=False,
283 query_cache_size=1000, # Increased for complex queries with SQLCipher
284 **self._get_pool_kwargs(),
285 )
286 else:
287 logger.warning(
288 f"SQLCipher not available - creating UNENCRYPTED database for user {username}"
289 )
290 # Fall back to regular SQLite with query cache
291 engine = create_engine(
292 f"sqlite:///{db_path}",
293 connect_args={"check_same_thread": False, "timeout": 30},
294 poolclass=self._pool_class,
295 echo=False,
296 query_cache_size=1000, # Same optimization for unencrypted
297 **self._get_pool_kwargs(),
298 )
300 # For unencrypted databases, just apply pragmas
301 event.listen(engine, "connect", self._apply_pragmas)
303 # Tables have already been created using raw SQLCipher above
304 # No need to create them again with SQLAlchemy
306 # Store connection
307 self.connections[username] = engine
309 # Initialize database tables using centralized initialization
310 from .initialize import initialize_database
312 try:
313 # Create a session for settings initialization
314 Session = sessionmaker(bind=engine)
315 with Session() as session:
316 initialize_database(engine, session)
317 except Exception as e:
318 logger.warning(f"Could not initialize database fully: {e}")
319 # Still continue - basic tables were created above
321 logger.info(f"Created encrypted database for user {username}")
322 return engine
324 def open_user_database(
325 self, username: str, password: str
326 ) -> Optional[Engine]:
327 """Open an existing encrypted database for a user."""
329 # Validate the encryption key
330 if not self._is_valid_encryption_key(password): 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true
331 logger.error(
332 f"Invalid encryption key when opening database for user {username}: password is None or empty"
333 )
334 # TODO: Fix the root cause - research threads are not getting the correct password
335 logger.error(
336 "TODO: This usually means the research thread is not receiving the user's "
337 "password for database encryption. Need to ensure password is passed from "
338 "the main thread to research threads."
339 )
340 raise ValueError(
341 "Invalid encryption key: password cannot be None or empty"
342 )
344 # Check if already open
345 if username in self.connections:
346 return self.connections[username]
348 db_path = self._get_user_db_path(username)
350 if not db_path.exists():
351 logger.error(f"No database found for user {username}")
352 return None
354 # Create connection string - use regular SQLite when SQLCipher not available
355 if self.has_encryption: 355 ↛ 392line 355 didn't jump to line 392 because the condition on line 355 was always true
356 # Use the same custom connection creator approach as create_user_database
357 sqlcipher3 = get_sqlcipher_module()
359 def create_sqlcipher_connection():
360 """Create a properly initialized SQLCipher connection."""
361 conn = sqlcipher3.connect(
362 str(db_path),
363 isolation_level="IMMEDIATE",
364 check_same_thread=False,
365 )
366 cursor = conn.cursor()
368 # Use centralized SQLCipher setup
369 set_sqlcipher_key(cursor, password)
371 # Verify connection works
372 if not verify_sqlcipher_connection(cursor): 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true
373 raise ValueError("Failed to verify database key")
375 # Apply SQLCipher and performance settings
376 apply_sqlcipher_pragmas(cursor, creation_mode=False)
377 apply_performance_pragmas(cursor, username)
379 cursor.close()
380 return conn
382 # Create engine with custom creator function and optimized cache
383 engine = create_engine(
384 "sqlite://",
385 creator=create_sqlcipher_connection,
386 poolclass=self._pool_class,
387 echo=False,
388 query_cache_size=1000, # Increased for complex queries with SQLCipher
389 **self._get_pool_kwargs(),
390 )
391 else:
392 logger.warning(
393 f"SQLCipher not available - opening UNENCRYPTED database for user {username}"
394 )
395 # Fall back to regular SQLite (no password protection!)
396 engine = create_engine(
397 f"sqlite:///{db_path}",
398 connect_args={"check_same_thread": False, "timeout": 30},
399 poolclass=self._pool_class,
400 echo=False,
401 query_cache_size=1000, # Same optimization for unencrypted
402 **self._get_pool_kwargs(),
403 )
405 # For unencrypted databases, just apply pragmas
406 event.listen(engine, "connect", self._apply_pragmas)
408 try:
409 # Test connection by running a simple query
410 with engine.connect() as conn:
411 conn.execute(text("SELECT 1"))
413 # Store connection
414 self.connections[username] = engine
416 # Run database initialization (creates missing tables and runs migrations)
417 from .initialize import initialize_database
419 try:
420 initialize_database(engine)
421 except Exception as e:
422 logger.warning(f"Could not run migrations for {username}: {e}")
424 logger.info(f"Opened encrypted database for user {username}")
425 return engine
427 except Exception as e:
428 logger.exception(
429 f"Failed to open database for user {username}: {e}"
430 )
431 return None
433 def get_session(self, username: str) -> Optional[Session]:
434 """Create a new session for a user's database."""
435 if username not in self.connections:
436 # Use debug level for this common scenario to reduce log noise
437 logger.debug(f"No open database for user {username}")
438 return None
440 # Always create a fresh session to avoid stale session issues
441 engine = self.connections[username]
442 SessionLocal = sessionmaker(bind=engine)
443 return SessionLocal()
445 def close_user_database(self, username: str):
446 """Close a user's database connection."""
447 if username in self.connections:
448 self.connections[username].dispose()
449 del self.connections[username]
450 logger.info(f"Closed database for user {username}")
452 def check_database_integrity(self, username: str) -> bool:
453 """Check integrity of a user's encrypted database."""
454 if username not in self.connections:
455 return False
457 try:
458 with self.connections[username].connect() as conn:
459 # Quick integrity check
460 result = conn.execute(text("PRAGMA quick_check"))
461 if result.fetchone()[0] != "ok": 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true
462 return False
464 # SQLCipher integrity check
465 result = conn.execute(text("PRAGMA cipher_integrity_check"))
466 # If this returns any rows, there are HMAC failures
467 failures = list(result)
468 if failures: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true
469 logger.error(
470 f"Integrity check failed for {username}: {len(failures)} HMAC failures"
471 )
472 return False
474 return True
476 except Exception:
477 logger.exception(f"Integrity check error for user: {username}")
478 return False
480 def change_password(
481 self, username: str, old_password: str, new_password: str
482 ) -> bool:
483 """Change the encryption password for a user's database."""
484 if not self.has_encryption: 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true
485 logger.warning(
486 "Cannot change password - SQLCipher not available (databases are unencrypted)"
487 )
488 return False
490 db_path = self._get_user_db_path(username)
492 if not db_path.exists(): 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 return False
495 try:
496 # Close existing connection if any
497 self.close_user_database(username)
499 # Open with old password
500 engine = self.open_user_database(username, old_password)
501 if not engine:
502 return False
504 # Rekey the database (only works with SQLCipher)
505 with engine.connect() as conn:
506 # Use centralized rekey function
507 set_sqlcipher_rekey(conn, new_password)
509 logger.info(f"Password changed for user {username}")
510 return True
512 except Exception:
513 logger.exception(f"Failed to change password for user: {username}")
514 return False
515 finally:
516 # Close the connection
517 self.close_user_database(username)
519 def user_exists(self, username: str) -> bool:
520 """Check if a user exists in the auth database."""
521 from .auth_db import get_auth_db_session
522 from .models.auth import User
524 auth_db = get_auth_db_session()
525 user = auth_db.query(User).filter_by(username=username).first()
526 auth_db.close()
528 return user is not None
530 def get_memory_usage(self) -> Dict[str, Any]:
531 """Get memory usage statistics."""
532 return {
533 "active_connections": len(self.connections),
534 "active_sessions": 0, # Sessions are created on-demand, not tracked
535 "estimated_memory_mb": len(self.connections)
536 * 3.5, # ~3.5MB per connection
537 }
539 def create_thread_safe_session_for_metrics(
540 self, username: str, password: str
541 ):
542 """
543 Create a new database session safe for use in background threads.
544 This is specifically for metrics/logging - NOT for settings or user data.
546 Args:
547 username: The username
548 password: The user's password (encryption key)
550 Returns:
551 A SQLAlchemy session that can be used in the current thread
553 IMPORTANT: This should ONLY be used for:
554 - Writing token metrics
555 - Writing search metrics
556 - Writing logs
558 DO NOT use this for:
559 - Reading/writing settings
560 - Modifying user data
561 - Any operation that should be synchronized with user requests
562 """
563 db_path = self._get_user_db_path(username)
565 if not db_path.exists():
566 raise ValueError(f"No database found for user {username}")
568 # Create a thread-local engine
569 if self.has_encryption: 569 ↛ 607line 569 didn't jump to line 607 because the condition on line 569 was always true
570 sqlcipher3 = get_sqlcipher_module()
572 def create_thread_connection():
573 """Create a SQLCipher connection for this thread."""
574 try:
575 conn = sqlcipher3.connect(
576 str(db_path), check_same_thread=False
577 )
578 cursor = conn.cursor()
580 # Use centralized SQLCipher setup
581 set_sqlcipher_key(cursor, password)
582 apply_sqlcipher_pragmas(cursor, creation_mode=False)
584 # Verify connection works
585 if not verify_sqlcipher_connection(cursor): 585 ↛ 586line 585 didn't jump to line 586 because the condition on line 585 was never true
586 raise ValueError("Failed to verify database key")
587 except Exception as e:
588 logger.exception(
589 f"Failed to create thread connection for {username}: {e}"
590 )
591 raise
593 # Apply performance pragmas for metrics writes
594 apply_performance_pragmas(cursor, username)
596 cursor.close()
597 return conn
599 engine = create_engine(
600 "sqlite://",
601 creator=create_thread_connection,
602 poolclass=NullPool, # Important: no connection pooling for threads
603 echo=False,
604 )
605 else:
606 # Unencrypted fallback
607 logger.warning("Creating unencrypted thread session for metrics")
608 engine = create_engine(
609 f"sqlite:///{db_path}",
610 poolclass=NullPool,
611 echo=False,
612 )
614 # Create session
615 Session = sessionmaker(bind=engine)
616 return Session()
# Global instance
# Constructed at import time: DatabaseManager.__init__ creates the storage
# directory and runs the SQLCipher availability probe, which can raise.
db_manager = DatabaseManager()