Coverage for src / local_deep_research / database / auth_db.py: 98%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Authentication database initialization and management. 

3This manages the central ldr_auth.db which only stores usernames. 

4""" 

5 

6import threading 

7from contextlib import contextmanager 

8from pathlib import Path 

9from typing import Optional 

10 

11from loguru import logger 

12from sqlalchemy import create_engine, event 

13from sqlalchemy.engine import Engine 

14from sqlalchemy.orm import sessionmaker, Session 

15from sqlalchemy.pool import QueuePool 

16from sqlalchemy.schema import CreateIndex, CreateTable 

17 

18from ..config.paths import get_data_directory 

19from .models.auth import User 

20from .pool_config import POOL_PRE_PING, POOL_RECYCLE_SECONDS 

21 

22 

# Module-level cache: a single shared engine for the auth database, kept
# so sessions reuse it instead of leaking file descriptors.
_auth_engine: Optional[Engine] = None
# Path the cached engine was created for; None while nothing is cached.
_auth_engine_path: Optional[Path] = None
# Guards creation and disposal of the cached engine.
_auth_engine_lock = threading.Lock()

29 

30 

def get_auth_db_path() -> Path:
    """Return the location of ``ldr_auth.db`` inside the data directory."""
    data_dir = get_data_directory()
    return data_dir / "ldr_auth.db"

34 

35 

def _get_auth_engine() -> Engine:
    """
    Get or create the cached engine for the auth database.

    A single engine is reused for every session, which prevents the file
    descriptor leaks that creating a new engine per session would cause.

    The cached engine is discarded and rebuilt when the auth database
    path changes (e.g., during testing when LDR_DATA_DIR is set to a temp
    directory).

    Returns:
        The engine bound to the current auth database path. The value is
        always taken from a local snapshot, so a concurrent
        dispose_auth_engine() can never make this function return None.
    """
    global _auth_engine, _auth_engine_path

    auth_db_path = get_auth_db_path()

    # Lock-free fast path. Snapshot the globals instead of reading
    # _auth_engine twice (check, then return): a concurrent dispose could
    # otherwise reset it to None in between. The path is read BEFORE the
    # engine because writers publish the engine first and the path last,
    # so a matching path implies the engine for it was already published.
    cached_path = _auth_engine_path
    cached_engine = _auth_engine
    if cached_engine is not None and cached_path == auth_db_path:
        return cached_engine

    with _auth_engine_lock:
        # Re-check under the lock: another thread may have built the
        # engine while this one was waiting.
        if _auth_engine is not None and _auth_engine_path == auth_db_path:
            return _auth_engine

        # If path changed, dispose old engine first
        if _auth_engine is not None and _auth_engine_path != auth_db_path:
            try:
                _auth_engine.dispose()
                logger.debug(
                    "Disposed auth engine due to data directory change"
                )
            except Exception:
                # Best effort: a failed dispose must not block creating
                # the replacement engine below.
                logger.warning("Error disposing old auth engine")
            _auth_engine = None
            _auth_engine_path = None

        # Ensure database exists
        if not auth_db_path.exists():
            init_auth_database()

        # Moderate pool — auth DB is unencrypted SQLite used by every
        # authenticated request via before_request middleware.
        # See ADR-0004 for pool sizing rationale.
        engine = create_engine(
            f"sqlite:///{auth_db_path}",
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=POOL_PRE_PING,
            pool_recycle=POOL_RECYCLE_SECONDS,
            echo=False,
        )

        def _apply_auth_pragmas(dbapi_connection, connection_record):
            # Runs for every new DBAPI connection opened by the pool.
            dbapi_connection.execute("PRAGMA busy_timeout = 10000")
            dbapi_connection.execute("PRAGMA temp_store = MEMORY")

        event.listen(engine, "connect", _apply_auth_pragmas)

        # Publish only once the engine is fully configured; engine first,
        # path last (see the fast path above for why the order matters).
        _auth_engine = engine
        _auth_engine_path = auth_db_path
        logger.debug("Created cached auth database engine")

        return engine

98 

99 

def init_auth_database() -> None:
    """Initialize the authentication database if it doesn't exist.

    Uses SQL-level IF NOT EXISTS (via CreateTable/CreateIndex) so that
    concurrent calls are handled atomically by the database engine,
    eliminating the TOCTOU race in SQLAlchemy's checkfirst inspect.

    The temporary engine used for the DDL is always disposed, including
    when table or index creation raises, so a failed initialization does
    not leak its connection.

    Raises:
        Whatever directory creation or the DDL statements raise; the
        exception propagates after the temporary engine is disposed.
    """
    auth_db_path = get_auth_db_path()

    # Ensure the data directory exists
    auth_db_path.parent.mkdir(parents=True, exist_ok=True)

    logger.debug(f"Ensuring auth database at {auth_db_path}")

    # Create the database with a temporary engine
    engine = create_engine(f"sqlite:///{auth_db_path}")

    try:
        # Use SQL-level IF NOT EXISTS — idempotent and atomic, safe for
        # concurrent calls without Python-level locking.
        with engine.begin() as conn:
            conn.execute(CreateTable(User.__table__, if_not_exists=True))
            for index in User.__table__.indexes:
                conn.execute(CreateIndex(index, if_not_exists=True))
    finally:
        # Dispose the temporary engine on both success and failure.
        engine.dispose()

    logger.debug("Auth database initialized successfully")

128 

129 

def get_auth_db_session() -> Session:
    """
    Open a new session bound to the cached auth database engine.

    IMPORTANT: closing the session is the caller's responsibility; until
    it is closed, its connection is not returned to the pool. Prefer the
    auth_db_session() context manager, which closes it automatically.
    """
    make_session = sessionmaker(bind=_get_auth_engine())
    return make_session()

141 

142 

@contextmanager
def auth_db_session():
    """
    Yield an auth database session and close it on exit.

    Usage:
        with auth_db_session() as session:
            user = session.query(User).filter_by(username=username).first()

    The session is closed when the context exits, whether the body
    finished normally or raised.
    """
    db_session = get_auth_db_session()
    try:
        yield db_session
    finally:
        # Imported at cleanup time rather than at module level
        # (presumably to avoid an import cycle — confirm).
        from ..utilities.resource_utils import safe_close

        safe_close(db_session, "auth DB session")

161 

162 

def dispose_auth_engine():
    """
    Dispose the cached auth engine and clear the cache (for shutdown/cleanup).

    Does nothing when no engine is currently cached.
    """
    global _auth_engine, _auth_engine_path

    with _auth_engine_lock:
        if _auth_engine is None:
            return

        _auth_engine.dispose()
        # Forget both the engine and the path it was created for.
        _auth_engine = None
        _auth_engine_path = None
        logger.debug("Disposed auth database engine")