Coverage for src / local_deep_research / database / encrypted_db.py: 88%

340 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Encrypted database management using SQLCipher. 

3Handles per-user encrypted databases with browser-friendly authentication. 

4""" 

5 

6import os 

7import threading 

8from pathlib import Path 

9from typing import Any, Dict, Optional, Tuple 

10 

11from loguru import logger 

12from sqlalchemy import create_engine, event, text 

13from sqlalchemy.engine import Engine 

14from sqlalchemy.orm import Session, sessionmaker 

15from sqlalchemy.pool import QueuePool, NullPool, StaticPool 

16 

17from ..config.paths import get_data_directory, get_user_database_filename 

18from ..settings.env_registry import get_env_setting 

19from .sqlcipher_compat import get_sqlcipher_module 

20from .sqlcipher_utils import ( 

21 set_sqlcipher_key, 

22 set_sqlcipher_rekey, 

23 apply_cipher_defaults_before_key, 

24 apply_sqlcipher_pragmas, 

25 apply_performance_pragmas, 

26 verify_sqlcipher_connection, 

27 create_database_salt, 

28 has_per_database_salt, 

29 get_key_from_password, 

30 get_sqlcipher_version, 

31 create_sqlcipher_connection, 

32) 

33 

34 

35class DatabaseManager: 

36 """Manages encrypted SQLCipher databases for each user.""" 

37 

38 def __init__(self): 

39 self.connections: Dict[str, Engine] = {} 

40 self._connections_lock = threading.RLock() 

41 # Track thread-specific engines for cleanup (key: (username, thread_id) tuple) 

42 self._thread_engines: Dict[Tuple[str, int], Engine] = {} 

43 self._thread_engine_lock = threading.Lock() 

44 self.data_dir = get_data_directory() / "encrypted_databases" 

45 self.data_dir.mkdir(parents=True, exist_ok=True) 

46 

47 # Check SQLCipher availability 

48 self.has_encryption = self._check_encryption_available() 

49 

50 # Determine pool class based on environment 

51 # Use StaticPool for testing to avoid locking issues 

52 self._use_static_pool = bool(os.environ.get("TESTING")) 

53 self._pool_class = StaticPool if self._use_static_pool else QueuePool 

54 

55 def _get_pool_kwargs(self) -> Dict[str, Any]: 

56 """Get pool configuration kwargs based on pool type. 

57 

58 StaticPool doesn't support pool_size or max_overflow, 

59 so we only include them for QueuePool. 

60 """ 

61 if self._use_static_pool: 

62 return {} 

63 return { 

64 "pool_size": 10, 

65 "max_overflow": 30, # Increase overflow so we don't run out of connections 

66 "pool_pre_ping": True, # Validate connections before use (prevents stale connections) 

67 } 

68 

69 def _is_valid_encryption_key(self, password: str) -> bool: 

70 """ 

71 Check if the provided password is valid (not None, empty, or whitespace-only). 

72 

73 Args: 

74 password: The password to check 

75 

76 Returns: 

77 True if the password is valid, False otherwise 

78 """ 

79 return password is not None and password.strip() != "" 

80 

81 def is_user_connected(self, username: str) -> bool: 

82 """Check if a user has an active database connection. 

83 

84 Thread-safe accessor for external callers. 

85 

86 Args: 

87 username: The username to check 

88 

89 Returns: 

90 True if the user has an active connection 

91 """ 

92 with self._connections_lock: 

93 return username in self.connections 

94 

95 def _check_encryption_available(self) -> bool: 

96 """Check if SQLCipher is available for encryption.""" 

97 try: 

98 import os as os_module 

99 import tempfile 

100 

101 # Test if SQLCipher actually works, not just if it imports 

102 with tempfile.NamedTemporaryFile(delete=False) as tmp: 

103 tmp_path = tmp.name 

104 

105 try: 

106 # Try to create a test encrypted database 

107 sqlcipher_module = get_sqlcipher_module() 

108 sqlcipher = sqlcipher_module.dbapi2 

109 

110 conn = sqlcipher.connect(tmp_path) 

111 try: 

112 cursor = conn.cursor() 

113 # Use creation_mode=True since we're creating a new test database 

114 apply_cipher_defaults_before_key(cursor) 

115 # Use centralized key setting 

116 set_sqlcipher_key(cursor, "testpass") 

117 # Apply post-key pragmas (kdf_iter for new DB) 

118 apply_sqlcipher_pragmas(cursor, creation_mode=True) 

119 apply_performance_pragmas(cursor) 

120 

121 # Check SQLCipher version 

122 version = get_sqlcipher_version(cursor) 

123 if version: 123 ↛ 135line 123 didn't jump to line 135 because the condition on line 123 was always true

124 major = ( 

125 version.split(".")[0] 

126 if "." in version 

127 else version[0] 

128 ) 

129 if major.isdigit() and int(major) < 4: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true

130 logger.warning( 

131 f"SQLCipher version {version} detected. " 

132 "Version 4.x+ is recommended for proper PRAGMA ordering." 

133 ) 

134 

135 cursor.close() 

136 # Now use the connection for table operations 

137 conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)") 

138 conn.execute("INSERT INTO test VALUES (1)") 

139 result = conn.execute("SELECT * FROM test").fetchone() 

140 

141 if result != (1,): 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 raise Exception("SQLCipher encryption test failed") 

143 logger.info( 

144 "SQLCipher available and working - databases will be encrypted" 

145 ) 

146 return True 

147 finally: 

148 conn.close() 

149 except Exception as e: 

150 logger.warning(f"SQLCipher module found but not working: {e}") 

151 raise ImportError("SQLCipher not functional") 

152 finally: 

153 # Clean up test file 

154 try: 

155 os_module.unlink(tmp_path) 

156 except OSError as e: 

157 logger.debug( 

158 f"Failed to clean up temp file {tmp_path}: {e}" 

159 ) 

160 

161 except ImportError: 

162 # Check if user has explicitly allowed unencrypted databases. 

163 # Registry handles deprecated LDR_ALLOW_UNENCRYPTED fallback automatically. 

164 allow_unencrypted = get_env_setting( 

165 "bootstrap.allow_unencrypted", False 

166 ) 

167 

168 if not allow_unencrypted: 

169 logger.exception( 

170 "SECURITY ERROR: SQLCipher is not installed!\n" 

171 "Your databases will NOT be encrypted.\n" 

172 "To fix this:\n" 

173 "1. Install SQLCipher: sudo apt install sqlcipher libsqlcipher-dev\n" 

174 "2. Reinstall project: pdm install\n" 

175 "Or use Docker with SQLCipher pre-installed.\n\n" 

176 "To explicitly allow unencrypted databases (NOT RECOMMENDED):\n" 

177 "export LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true" 

178 ) 

179 raise RuntimeError( 

180 "SQLCipher not available. Set LDR_BOOTSTRAP_ALLOW_UNENCRYPTED=true to proceed without encryption (NOT RECOMMENDED)" 

181 ) 

182 else: 

183 logger.warning( 

184 "WARNING: Running with UNENCRYPTED databases!\n" 

185 "This means:\n" 

186 "- Passwords don't protect data access\n" 

187 "- API keys are stored in plain text\n" 

188 "- Anyone with file access can read all data\n" 

189 "Install SQLCipher for secure operation!" 

190 ) 

191 return False 

192 

193 def _get_user_db_path(self, username: str) -> Path: 

194 """Get the path for a user's encrypted database.""" 

195 return self.data_dir / get_user_database_filename(username) 

196 

197 def _apply_pragmas(self, connection, connection_record): 

198 """Apply pragmas for optimal performance.""" 

199 # Check if this is SQLCipher or regular SQLite 

200 is_encrypted = self.has_encryption 

201 

202 # Use centralized performance pragma application 

203 

204 apply_performance_pragmas(connection) 

205 

206 # SQLCipher-specific pragmas 

207 if is_encrypted: 

208 from .sqlcipher_utils import get_sqlcipher_settings 

209 

210 settings = get_sqlcipher_settings() 

211 pragmas = [ 

212 f"PRAGMA kdf_iter = {settings['kdf_iterations']}", 

213 f"PRAGMA cipher_page_size = {settings['page_size']}", 

214 ] 

215 for pragma in pragmas: 

216 try: 

217 connection.execute(pragma) 

218 except Exception as e: 

219 logger.debug(f"Could not apply pragma '{pragma}': {e}") 

220 else: 

221 # Regular SQLite pragma 

222 try: 

223 connection.execute( 

224 "PRAGMA mmap_size = 268435456" 

225 ) # 256MB memory mapping 

226 except Exception as e: 

227 logger.debug(f"Could not apply mmap_size pragma: {e}") 

228 

229 @staticmethod 

230 def _make_sqlcipher_connection( 

231 db_path: Path, 

232 password: str, 

233 isolation_level: Optional[str] = "IMMEDIATE", 

234 check_same_thread: bool = False, 

235 ) -> Any: 

236 """Create a properly initialized SQLCipher connection. 

237 

238 Follows the canonical SQLCipher initialization order: set key, 

239 apply cipher pragmas, verify, then apply performance pragmas. 

240 Cipher pragmas (page size, HMAC algorithm, KDF iterations) must 

241 be configured before the first query (verification) because that 

242 query triggers page decryption with the active cipher settings. 

243 

244 Args: 

245 db_path: Path to the database file 

246 password: The database encryption passphrase 

247 isolation_level: SQLite isolation level (``""`` for deferred 

248 transactions, ``None`` for autocommit) 

249 check_same_thread: SQLite check_same_thread flag 

250 

251 Returns: 

252 A raw ``sqlcipher3`` connection ready for use. 

253 

254 Raises: 

255 ValueError: If the database key cannot be verified. 

256 """ 

257 sqlcipher3 = get_sqlcipher_module() 

258 conn = sqlcipher3.connect( 

259 str(db_path), 

260 isolation_level=isolation_level, 

261 check_same_thread=check_same_thread, 

262 ) 

263 cursor = conn.cursor() 

264 

265 try: 

266 set_sqlcipher_key(cursor, password, db_path=db_path) 

267 apply_sqlcipher_pragmas(cursor, creation_mode=False) 

268 

269 if not verify_sqlcipher_connection(cursor): 

270 raise ValueError("Failed to verify database key") 

271 

272 apply_performance_pragmas(cursor) 

273 except Exception: 

274 try: 

275 cursor.close() 

276 except Exception: # noqa: BLE001 

277 logger.warning( 

278 "Failed to close cursor during cleanup", exc_info=True 

279 ) 

280 conn.close() 

281 raise 

282 

283 cursor.close() 

284 return conn 

285 

286 def create_user_database(self, username: str, password: str) -> Engine: 

287 """Create a new encrypted database for a user.""" 

288 

289 # Validate the encryption key 

290 if not self._is_valid_encryption_key(password): 

291 logger.error( 

292 f"Invalid encryption key for user {username}: password is None or empty" 

293 ) 

294 raise ValueError( 

295 "Invalid encryption key: password cannot be None or empty" 

296 ) 

297 

298 db_path = self._get_user_db_path(username) 

299 

300 if db_path.exists(): 

301 raise ValueError(f"Database already exists for user {username}") 

302 

303 # Create connection string - use regular SQLite when SQLCipher not available 

304 if self.has_encryption: 

305 # Create directory if it doesn't exist 

306 db_path.parent.mkdir(parents=True, exist_ok=True) 

307 

308 # Create per-database salt for new databases (v2 security improvement) 

309 create_database_salt(db_path) 

310 logger.info(f"Created per-database salt for {username}") 

311 

312 # Pre-derive key before closures to avoid capturing plaintext password 

313 hex_key = get_key_from_password(password, db_path=db_path).hex() 

314 

315 # Create database structure using raw SQLCipher outside SQLAlchemy 

316 try: 

317 conn = create_sqlcipher_connection( 

318 db_path, 

319 password=password, 

320 creation_mode=True, 

321 connect_kwargs={ 

322 "isolation_level": "IMMEDIATE", 

323 "check_same_thread": False, 

324 }, 

325 ) 

326 try: 

327 # Get the CREATE TABLE statements from SQLAlchemy models 

328 from sqlalchemy.dialects import sqlite 

329 from sqlalchemy.schema import CreateTable 

330 

331 from .models import Base 

332 

333 # Create tables one by one 

334 sqlite_dialect = sqlite.dialect() 

335 for table in Base.metadata.sorted_tables: 

336 if table.name != "users": 

337 create_sql = str( 

338 CreateTable(table).compile( 

339 dialect=sqlite_dialect 

340 ) 

341 ) 

342 logger.debug(f"Creating table {table.name}") 

343 conn.execute(create_sql) 

344 

345 conn.commit() 

346 finally: 

347 conn.close() 

348 

349 logger.info( 

350 f"Database structure created successfully for {username}" 

351 ) 

352 

353 except Exception: 

354 logger.exception("Error creating database structure") 

355 # Cleanup partial DB file on failure 

356 if db_path.exists(): 

357 db_path.unlink(missing_ok=True) 

358 raise 

359 

360 # Small delay to ensure file is fully written 

361 import time 

362 

363 time.sleep(0.1) 

364 

365 # Now create SQLAlchemy engine using custom connection creator 

366 def create_engine_connection(): 

367 """Create a properly initialized SQLCipher connection.""" 

368 return create_sqlcipher_connection( 

369 db_path, 

370 hex_key=hex_key, 

371 creation_mode=False, 

372 connect_kwargs={ 

373 "isolation_level": "IMMEDIATE", 

374 "check_same_thread": False, 

375 }, 

376 ) 

377 

378 # Create engine with custom creator function and optimized cache 

379 engine = create_engine( 

380 "sqlite://", 

381 creator=create_engine_connection, 

382 poolclass=self._pool_class, 

383 echo=False, 

384 query_cache_size=1000, 

385 **self._get_pool_kwargs(), 

386 ) 

387 else: 

388 logger.warning( 

389 f"SQLCipher not available - creating UNENCRYPTED database for user {username}" 

390 ) 

391 # Fall back to regular SQLite with query cache 

392 engine = create_engine( 

393 f"sqlite:///{db_path}", 

394 connect_args={"check_same_thread": False, "timeout": 30}, 

395 poolclass=self._pool_class, 

396 echo=False, 

397 query_cache_size=1000, 

398 **self._get_pool_kwargs(), 

399 ) 

400 

401 # For unencrypted databases, just apply pragmas 

402 event.listen(engine, "connect", self._apply_pragmas) 

403 

404 # Tables have already been created using raw SQLCipher above 

405 # No need to create them again with SQLAlchemy 

406 

407 # Store connection 

408 with self._connections_lock: 

409 self.connections[username] = engine 

410 

411 # Initialize database tables using centralized initialization 

412 from .initialize import initialize_database 

413 

414 try: 

415 # Create a session for settings initialization 

416 Session = sessionmaker(bind=engine) 

417 with Session() as session: 

418 initialize_database(engine, session) 

419 except Exception as e: 

420 logger.warning(f"Could not initialize database fully: {e}") 

421 # Still continue - basic tables were created above 

422 

423 logger.info(f"Created encrypted database for user {username}") 

424 return engine 

425 

426 def open_user_database( 

427 self, username: str, password: str 

428 ) -> Optional[Engine]: 

429 """Open an existing encrypted database for a user.""" 

430 

431 # Validate the encryption key 

432 if not self._is_valid_encryption_key(password): 

433 logger.error( 

434 f"Invalid encryption key when opening database for user {username}: password is None or empty" 

435 ) 

436 # TODO: Fix the root cause - research threads are not getting the correct password 

437 logger.error( 

438 "TODO: This usually means the research thread is not receiving the user's " 

439 "password for database encryption. Need to ensure password is passed from " 

440 "the main thread to research threads." 

441 ) 

442 raise ValueError( 

443 "Invalid encryption key: password cannot be None or empty" 

444 ) 

445 

446 # Check if already open 

447 with self._connections_lock: 

448 if username in self.connections: 

449 return self.connections[username] 

450 

451 db_path = self._get_user_db_path(username) 

452 

453 # Prevent timing attacks: always derive key before checking file existence 

454 # This ensures both existing and non-existent users take the same amount of time, 

455 # preventing username enumeration via timing analysis. 

456 # Pre-derive key before closures to avoid capturing plaintext password 

457 hex_key = get_key_from_password(password, db_path=db_path).hex() 

458 

459 if not db_path.exists(): 

460 logger.error(f"No database found for user {username}") 

461 return None 

462 

463 # Warn if this is a legacy database without per-database salt 

464 if self.has_encryption and not has_per_database_salt(db_path): 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true

465 logger.warning( 

466 f"Database for user '{username}' uses the legacy shared salt " 

467 f"(deprecated). For improved security, consider creating a new " 

468 f"account to get a per-database salt. Legacy databases remain " 

469 f"fully functional but are less resistant to multi-target attacks." 

470 ) 

471 

472 # Create connection string - use regular SQLite when SQLCipher not available 

473 if self.has_encryption: 

474 

475 def create_open_connection(): 

476 """Create a properly initialized SQLCipher connection.""" 

477 return create_sqlcipher_connection( 

478 db_path, 

479 hex_key=hex_key, 

480 creation_mode=False, 

481 connect_kwargs={ 

482 "isolation_level": "IMMEDIATE", 

483 "check_same_thread": False, 

484 }, 

485 ) 

486 

487 # Create engine with custom creator function and optimized cache 

488 engine = create_engine( 

489 "sqlite://", 

490 creator=create_open_connection, 

491 poolclass=self._pool_class, 

492 echo=False, 

493 query_cache_size=1000, 

494 **self._get_pool_kwargs(), 

495 ) 

496 else: 

497 logger.warning( 

498 f"SQLCipher not available - opening UNENCRYPTED database for user {username}" 

499 ) 

500 # Fall back to regular SQLite (no password protection!) 

501 engine = create_engine( 

502 f"sqlite:///{db_path}", 

503 connect_args={"check_same_thread": False, "timeout": 30}, 

504 poolclass=self._pool_class, 

505 echo=False, 

506 query_cache_size=1000, 

507 **self._get_pool_kwargs(), 

508 ) 

509 

510 # For unencrypted databases, just apply pragmas 

511 event.listen(engine, "connect", self._apply_pragmas) 

512 

513 try: 

514 # Test connection by running a simple query 

515 with engine.connect() as conn: 

516 conn.execute(text("SELECT 1")) 

517 

518 # Store connection 

519 with self._connections_lock: 

520 self.connections[username] = engine 

521 

522 # Run database initialization (creates missing tables and runs migrations) 

523 from .initialize import initialize_database 

524 

525 try: 

526 initialize_database(engine) 

527 except Exception as e: 

528 logger.warning(f"Could not run migrations for {username}: {e}") 

529 

530 logger.info(f"Opened encrypted database for user {username}") 

531 return engine 

532 

533 except Exception: 

534 logger.exception(f"Failed to open database for user {username}") 

535 engine.dispose() 

536 return None 

537 

538 def get_session(self, username: str) -> Optional[Session]: 

539 """Create a new session for a user's database.""" 

540 with self._connections_lock: 

541 if username not in self.connections: 

542 # Use debug level for this common scenario to reduce log noise 

543 logger.debug(f"No open database for user {username}") 

544 return None 

545 engine = self.connections[username] 

546 # Create session inside lock to prevent race with close_user_database() 

547 SessionLocal = sessionmaker(bind=engine) 

548 return SessionLocal() 

549 

550 def close_user_database(self, username: str): 

551 """Close a user's database connection.""" 

552 with self._connections_lock: 

553 if username in self.connections: 

554 self.connections[username].dispose() 

555 del self.connections[username] 

556 logger.info(f"Closed database for user {username}") 

557 

558 # Also cleanup any thread engines for this user 

559 self.cleanup_thread_engines(username=username) 

560 

561 def check_database_integrity(self, username: str) -> bool: 

562 """Check integrity of a user's encrypted database.""" 

563 with self._connections_lock: 

564 if username not in self.connections: 

565 return False 

566 engine = self.connections[username] 

567 

568 try: 

569 with engine.connect() as conn: 

570 # Quick integrity check 

571 result = conn.execute(text("PRAGMA quick_check")) 

572 if result.fetchone()[0] != "ok": 

573 return False 

574 

575 # SQLCipher integrity check 

576 result = conn.execute(text("PRAGMA cipher_integrity_check")) 

577 # If this returns any rows, there are HMAC failures 

578 failures = list(result) 

579 if failures: 

580 logger.error( 

581 f"Integrity check failed for {username}: {len(failures)} HMAC failures" 

582 ) 

583 return False 

584 

585 return True 

586 

587 except Exception: 

588 logger.exception(f"Integrity check error for user: {username}") 

589 return False 

590 

591 def change_password( 

592 self, username: str, old_password: str, new_password: str 

593 ) -> bool: 

594 """Change the encryption password for a user's database. 

595 

596 This rekeys the SQLCipher database — no separate auth-DB 

597 password-hash update is needed because passwords are never 

598 stored. Login verification is done by attempting decryption. 

599 """ 

600 if not self.has_encryption: 

601 logger.warning( 

602 "Cannot change password - SQLCipher not available (databases are unencrypted)" 

603 ) 

604 return False 

605 

606 db_path = self._get_user_db_path(username) 

607 

608 if not db_path.exists(): 

609 return False 

610 

611 try: 

612 # Close existing connection if any 

613 self.close_user_database(username) 

614 

615 # Open with old password 

616 engine = self.open_user_database(username, old_password) 

617 if not engine: 

618 return False 

619 

620 # Rekey the database (only works with SQLCipher) 

621 with engine.connect() as conn: 

622 # Use centralized rekey function 

623 set_sqlcipher_rekey(conn, new_password, db_path=db_path) 

624 

625 logger.info(f"Password changed for user {username}") 

626 return True 

627 

628 except Exception: 

629 logger.exception(f"Failed to change password for user: {username}") 

630 return False 

631 finally: 

632 # Close the connection 

633 self.close_user_database(username) 

634 

635 def user_exists(self, username: str) -> bool: 

636 """Check if a user exists in the auth database.""" 

637 from .auth_db import auth_db_session 

638 from .models.auth import User 

639 

640 with auth_db_session() as session: 

641 user = session.query(User).filter_by(username=username).first() 

642 return user is not None 

643 

644 def get_memory_usage(self) -> Dict[str, Any]: 

645 """Get memory usage statistics.""" 

646 with self._connections_lock: 

647 num_connections = len(self.connections) 

648 return { 

649 "active_connections": num_connections, 

650 "thread_engines": len(self._thread_engines), 

651 "active_sessions": 0, # Sessions are created on-demand, not tracked 

652 "estimated_memory_mb": (num_connections + len(self._thread_engines)) 

653 * 3.5, # ~3.5MB per connection 

654 } 

655 

656 def cleanup_thread_engines( 

657 self, username: str = None, thread_id: int = None 

658 ): 

659 """ 

660 Clean up thread-specific engines. 

661 

662 Args: 

663 username: If provided, cleanup engines for this user only 

664 thread_id: If provided, cleanup engines for this thread only 

665 If neither provided, cleanup current thread's engines 

666 """ 

667 if thread_id is None and username is None: 

668 thread_id = threading.get_ident() 

669 

670 with self._thread_engine_lock: 

671 keys_to_remove = [] 

672 for key in self._thread_engines: 

673 # Key is a tuple: (username, thread_id) 

674 key_user, key_thread = key 

675 

676 should_remove = False 

677 if username and thread_id: 

678 should_remove = ( 

679 key_user == username and key_thread == thread_id 

680 ) 

681 elif username: 

682 should_remove = key_user == username 

683 elif thread_id: 683 ↛ 686line 683 didn't jump to line 686 because the condition on line 683 was always true

684 should_remove = key_thread == thread_id 

685 

686 if should_remove: 

687 keys_to_remove.append(key) 

688 

689 for key in keys_to_remove: 

690 try: 

691 self._thread_engines[key].dispose() 

692 logger.debug(f"Disposed thread engine: {key}") 

693 except Exception as e: 

694 logger.warning(f"Error disposing thread engine {key}: {e}") 

695 del self._thread_engines[key] 

696 

697 def cleanup_all_thread_engines(self): 

698 """Clean up all thread-specific engines (for shutdown).""" 

699 with self._thread_engine_lock: 

700 count = len(self._thread_engines) 

701 for key, engine in list(self._thread_engines.items()): 

702 try: 

703 engine.dispose() 

704 except Exception as e: 

705 logger.warning(f"Error disposing thread engine {key}: {e}") 

706 self._thread_engines.clear() 

707 logger.info(f"All thread engines disposed ({count} total)") 

708 

709 def create_thread_safe_session_for_metrics( 

710 self, username: str, password: str 

711 ): 

712 """ 

713 Create a new database session safe for use in background threads. 

714 This is specifically for metrics/logging - NOT for settings or user data. 

715 

716 Args: 

717 username: The username 

718 password: The user's password (encryption key) 

719 

720 Returns: 

721 A SQLAlchemy session that can be used in the current thread 

722 

723 IMPORTANT: This should ONLY be used for: 

724 - Writing token metrics 

725 - Writing search metrics 

726 - Writing logs 

727 

728 DO NOT use this for: 

729 - Reading/writing settings 

730 - Modifying user data 

731 - Any operation that should be synchronized with user requests 

732 """ 

733 db_path = self._get_user_db_path(username) 

734 

735 # Warn if this is a legacy database without per-database salt 

736 if self.has_encryption and not has_per_database_salt(db_path): 

737 logger.warning( 

738 f"Database for user '{username}' uses a legacy shared salt " 

739 f"(deprecated). For improved security, consider creating a new " 

740 f"account to get a per-database salt. Legacy databases remain " 

741 f"fully functional but are less resistant to multi-target attacks." 

742 ) 

743 

744 # Prevent timing attacks: always derive key before checking file existence 

745 # This ensures both existing and non-existent users take the same amount of time, 

746 # preventing username enumeration via timing analysis. 

747 # Pre-derive key before closures 

748 hex_key = get_key_from_password(password, db_path=db_path).hex() 

749 

750 if not db_path.exists(): 

751 raise ValueError(f"No database found for user {username}") 

752 

753 # Create thread-specific key for engine reuse (tuple is cleaner than string) 

754 thread_id = threading.get_ident() 

755 engine_key = (username, thread_id) 

756 

757 # Check for existing engine for this thread/user combo 

758 with self._thread_engine_lock: 

759 if engine_key in self._thread_engines: 

760 engine = self._thread_engines[engine_key] 

761 # Verify engine is still valid 

762 try: 

763 with engine.connect() as conn: 

764 conn.execute(text("SELECT 1")) 

765 # Engine is valid, create session from it 

766 Session = sessionmaker(bind=engine) 

767 return Session() 

768 except Exception as e: 

769 # Engine is stale, remove it and create new one 

770 logger.debug( 

771 f"Stale thread engine for {engine_key}, recreating: {e}" 

772 ) 

773 try: 

774 engine.dispose() 

775 except Exception as dispose_error: 

776 logger.debug( 

777 f"Error disposing stale engine: {dispose_error}" 

778 ) 

779 del self._thread_engines[engine_key] 

780 

781 # Create a thread-local engine 

782 if self.has_encryption: 782 ↛ 807line 782 didn't jump to line 807 because the condition on line 782 was always true

783 

784 def create_thread_connection(): 

785 """Create a SQLCipher connection for this thread.""" 

786 try: 

787 return create_sqlcipher_connection( 

788 db_path, 

789 hex_key=hex_key, 

790 creation_mode=False, 

791 connect_kwargs={"check_same_thread": False}, 

792 ) 

793 except Exception: 

794 logger.exception( 

795 f"Failed to create thread connection for {username}" 

796 ) 

797 raise 

798 

799 engine = create_engine( 

800 "sqlite://", 

801 creator=create_thread_connection, 

802 poolclass=NullPool, # Important: no connection pooling for threads 

803 echo=False, 

804 ) 

805 else: 

806 # Unencrypted fallback 

807 logger.warning("Creating unencrypted thread session for metrics") 

808 engine = create_engine( 

809 f"sqlite:///{db_path}", 

810 poolclass=NullPool, 

811 echo=False, 

812 ) 

813 

814 # Store engine for reuse within this thread 

815 with self._thread_engine_lock: 

816 self._thread_engines[engine_key] = engine 

817 

818 # Create session 

819 Session = sessionmaker(bind=engine) 

820 return Session() 

821 

822 

# Global instance, constructed when this module is imported. Construction
# runs DatabaseManager.__init__, which creates the storage directory and
# probes SQLCipher availability.
db_manager = DatabaseManager()