Coverage for src / local_deep_research / database / auth_db.py: 97%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Authentication database initialization and management. 

3This manages the central ldr_auth.db which only stores usernames. 

4""" 

5 

6import threading 

7from contextlib import contextmanager 

8from pathlib import Path 

9from typing import Optional 

10 

11from loguru import logger 

12from sqlalchemy import create_engine 

13from sqlalchemy.engine import Engine 

14from sqlalchemy.orm import sessionmaker, Session 

15from sqlalchemy.pool import QueuePool 

16from sqlalchemy.schema import CreateIndex, CreateTable 

17 

18from ..config.paths import get_data_directory 

19from .models.auth import User 

20 

21 

22# Global cached engine for auth database to prevent file descriptor leaks 

23_auth_engine: Optional[Engine] = None 

24_auth_engine_path: Optional[Path] = ( 

25 None # Track the path the engine was created for 

26) 

27_auth_engine_lock = threading.Lock() 

28 

29 

30def get_auth_db_path() -> Path: 

31 """Get the path to the authentication database.""" 

32 return get_data_directory() / "ldr_auth.db" 

33 

34 

35def _get_auth_engine() -> Engine: 

36 """ 

37 Get or create a cached engine for the auth database. 

38 

39 This prevents file descriptor leaks by reusing a single engine 

40 instead of creating a new one for every session. 

41 

42 The engine is invalidated if the data directory path changes 

43 (e.g., during testing when LDR_DATA_DIR is set to a temp directory). 

44 """ 

45 global _auth_engine, _auth_engine_path 

46 

47 auth_db_path = get_auth_db_path() 

48 

49 # Check if we have a cached engine for the current path 

50 if _auth_engine is not None and _auth_engine_path == auth_db_path: 

51 return _auth_engine 

52 

53 with _auth_engine_lock: 

54 # Double-check after acquiring lock 

55 if _auth_engine is not None and _auth_engine_path == auth_db_path: 

56 return _auth_engine 

57 

58 # If path changed, dispose old engine first 

59 if _auth_engine is not None and _auth_engine_path != auth_db_path: 

60 try: 

61 _auth_engine.dispose() 

62 logger.debug( 

63 "Disposed auth engine due to data directory change" 

64 ) 

65 except Exception as e: 

66 logger.warning(f"Error disposing old auth engine: {e}") 

67 _auth_engine = None 

68 _auth_engine_path = None 

69 

70 # Ensure database exists 

71 if not auth_db_path.exists(): 

72 init_auth_database() 

73 

74 # Create engine with connection pooling 

75 _auth_engine = create_engine( 

76 f"sqlite:///{auth_db_path}", 

77 poolclass=QueuePool, 

78 pool_size=5, 

79 max_overflow=10, 

80 pool_pre_ping=True, # Verify connections before use 

81 echo=False, 

82 ) 

83 _auth_engine_path = auth_db_path 

84 logger.debug("Created cached auth database engine") 

85 

86 return _auth_engine 

87 

88 

89def init_auth_database(): 

90 """Initialize the authentication database if it doesn't exist. 

91 

92 Uses SQL-level IF NOT EXISTS (via CreateTable/CreateIndex) so that 

93 concurrent calls are handled atomically by the database engine, 

94 eliminating the TOCTOU race in SQLAlchemy's checkfirst inspect. 

95 """ 

96 auth_db_path = get_auth_db_path() 

97 

98 # Ensure the data directory exists 

99 auth_db_path.parent.mkdir(parents=True, exist_ok=True) 

100 

101 logger.debug(f"Ensuring auth database at {auth_db_path}") 

102 

103 # Create the database with a temporary engine 

104 engine = create_engine(f"sqlite:///{auth_db_path}") 

105 

106 # Use SQL-level IF NOT EXISTS — idempotent and atomic, safe for 

107 # concurrent calls without Python-level locking. 

108 with engine.begin() as conn: 

109 conn.execute(CreateTable(User.__table__, if_not_exists=True)) 

110 for index in User.__table__.indexes: 

111 conn.execute(CreateIndex(index, if_not_exists=True)) 

112 

113 # Dispose the temporary engine 

114 engine.dispose() 

115 

116 logger.debug("Auth database initialized successfully") 

117 

118 

119def get_auth_db_session() -> Session: 

120 """ 

121 Get a session for the auth database. 

122 

123 IMPORTANT: The caller MUST close the session when done to return 

124 the connection to the pool. Use auth_db_session() context manager 

125 for automatic cleanup. 

126 """ 

127 engine = _get_auth_engine() 

128 SessionFactory = sessionmaker(bind=engine) 

129 return SessionFactory() 

130 

131 

132@contextmanager 

133def auth_db_session(): 

134 """ 

135 Context manager for auth database sessions. 

136 

137 Usage: 

138 with auth_db_session() as session: 

139 user = session.query(User).filter_by(username=username).first() 

140 

141 The session is automatically closed when the context exits. 

142 """ 

143 session = get_auth_db_session() 

144 try: 

145 yield session 

146 finally: 

147 session.close() 

148 

149 

150def dispose_auth_engine(): 

151 """ 

152 Dispose the auth engine (for shutdown/cleanup). 

153 """ 

154 global _auth_engine, _auth_engine_path 

155 

156 with _auth_engine_lock: 

157 if _auth_engine is not None: 

158 _auth_engine.dispose() 

159 _auth_engine = None 

160 _auth_engine_path = None 

161 logger.debug("Disposed auth database engine")