Coverage for src / local_deep_research / utilities / db_utils.py: 86%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import functools 

2from typing import Any, Callable, Dict 

3 

4from cachetools import LRUCache 

5from flask import ( 

6 g, 

7 has_app_context, 

8 has_request_context, 

9 session as flask_session, 

10) 

11from loguru import logger 

12from sqlalchemy.orm import Session 

13 

14from ..config.paths import get_data_directory 

15from ..database.encrypted_db import db_manager 

16from .threading_utils import thread_specific_cache 

17 

# Base data directory, resolved once at import time through the centralized
# path configuration (get_data_directory from ..config.paths).
DATA_DIR = get_data_directory()
# NOTE: the former module-level DB_PATH was removed - use per-user encrypted
# databases instead.

21 

22 

@thread_specific_cache(cache=LRUCache(maxsize=10))
def get_db_session(
    _namespace: str = "", username: str | None = None
) -> Session | None:
    """
    Get database session - uses encrypted per-user database if authenticated.

    Resolution order:
        1. If ``username`` is given, the session that ``db_manager`` holds
           for that user.
        2. Otherwise the session exposed by ``get_g_db_session()``.
        3. Otherwise the session of the user named in the Flask session.
        4. Otherwise ``None``.

    Args:
        _namespace: This can be specified to an arbitrary string in order to
            force the caching mechanism to create separate settings even in
            the same thread. Usually it does not need to be specified.
        username: Optional username for thread context (e.g., background
            research threads). If not provided, will try to get from Flask
            context.

    Returns:
        The database session for the current user/context, or ``None`` when
        no session could be resolved.

    Raises:
        RuntimeError: If called without a Flask app context from any thread
            other than ``MainThread``, or if ``username`` is given but
            ``db_manager`` has no session for that user.
    """
    # CRITICAL: Detect if we're in a background thread and raise an error.
    # This helps identify code that's trying to access the database from
    # threads.
    import threading

    thread_name = threading.current_thread().name

    # The guard keys off the Flask *app* context: MainThread is allowed
    # without one (e.g. during startup), any other thread is not.
    if not has_app_context() and thread_name != "MainThread":
        thread_id = threading.get_ident()
        raise RuntimeError(
            f"Database access attempted from background thread '{thread_name}' (ID: {thread_id}). "
            f"Database access from threads is not allowed due to SQLite thread safety constraints. "
            f"Use settings_snapshot or pass all required data to the thread at creation time."
        )

    # If username is explicitly provided (e.g., from background thread)
    if username:
        user_session = db_manager.get_session(username)
        if user_session:
            return user_session
        raise RuntimeError(f"No database found for user {username}")

    # Otherwise, check Flask request context
    try:
        # Try lazy session creation via Flask g
        from ..database.session_context import get_g_db_session

        db_session = get_g_db_session()
        if db_session:
            return db_session

        # Check if we have a username in the Flask session
        username = flask_session.get("username")
        if username:
            user_session = db_manager.get_session(username)
            if user_session:
                return user_session
    except Exception:
        # Best-effort lookup: outside a Flask context these calls raise, and
        # we deliberately fall through to the ``None`` result below.
        # loguru gives ``exc_info=True`` no special meaning; ``opt(exception=True)``
        # is what actually attaches the traceback to the record.
        logger.opt(exception=True).debug(
            "Flask context unavailable (CLI/background threads)"
        )

    # No shared database - return None to allow SettingsManager to work without DB
    logger.warning(
        "get_db_session() is deprecated. Use get_user_db_session() from database.session_context"
    )
    return None

90 

91 

def get_settings_manager(
    db_session: Session | None = None, username: str | None = None
):
    """
    Get the settings manager for the current context.

    The session handed to ``SettingsManager`` is chosen in this order:
    the caller-provided ``db_session``, the Flask request session returned by
    ``get_g_db_session()`` (only if it belongs to ``username`` or no username
    was requested), then whatever ``get_db_session()`` returns. If none of
    these yields a session the manager is created with ``None``.

    Args:
        db_session: Optional database session. When given it is treated as
            borrowed, i.e. the manager is created with ``owns_session=False``.
        username: Optional username used to select the session. When omitted
            inside a request context it is read from the Flask session. It is
            not passed on to ``SettingsManager``.

    Returns:
        A ``SettingsManager`` wrapping the resolved session (possibly
        ``None``), with ``owns_session`` set to ``False`` for borrowed
        sessions and ``True`` otherwise.
    """
    # Track whether we are borrowing a session we don't own (caller-provided
    # or Flask g.db_session). Borrowed sessions must NOT be closed by
    # SettingsManager — their owner is responsible for cleanup.
    borrowed_session = db_session is not None

    # Reuse the Flask request session if it belongs to the requested user.
    # This MUST happen before get_db_session() because that function is
    # wrapped in @thread_specific_cache — the cache bypasses the function
    # body on subsequent calls, so any g.db_session check inside it only
    # fires once per thread.
    if db_session is None:
        try:
            from ..database.session_context import get_g_db_session

            if has_request_context():
                lazy_session = get_g_db_session()
                if lazy_session:
                    # g.current_user may be a plain username string or an
                    # object exposing ``.username``; normalise to the name.
                    g_user = getattr(g, "current_user", None)
                    if not isinstance(g_user, str):
                        g_user = getattr(g_user, "username", None)
                    if username is None or g_user == username:
                        db_session = lazy_session
                        borrowed_session = True
        except Exception:
            # loguru gives ``exc_info=True`` no special meaning;
            # ``opt(exception=True)`` actually records the traceback.
            logger.opt(exception=True).debug(
                "Could not reuse Flask request session"
            )

    if db_session is None and username is None and has_request_context():
        username = flask_session.get("username")

    if db_session is None:
        try:
            db_session = get_db_session(username=username)
        except RuntimeError:
            # No authenticated user - settings manager will use defaults
            db_session = None
        else:
            # get_db_session() may itself return g.db_session through its
            # own get_g_db_session() lookup. If it did, we must mark it as
            # borrowed to prevent both SettingsManager.close() and Flask
            # teardown from closing the same session.
            # (borrowed_session is necessarily False here: it is only ever
            # set together with a non-None db_session.)
            if db_session is not None:
                try:
                    if (
                        has_request_context()
                        and getattr(g, "db_session", None) is not None
                        and db_session is g.db_session
                    ):
                        borrowed_session = True
                        logger.warning(
                            "get_db_session() returned g.db_session after "
                            "direct g.db_session check failed — marking as "
                            "borrowed to prevent double-close"
                        )
                except Exception:
                    logger.opt(exception=True).debug(
                        "Could not verify session identity after "
                        "get_db_session()"
                    )

    # Import here to avoid circular imports
    from ..settings import SettingsManager

    logger.debug(
        "get_settings_manager: session_source={}, owned={}",
        "borrowed" if borrowed_session else ("new" if db_session else "None"),
        not borrowed_session,
    )

    # Always use regular SettingsManager (now with built-in simple caching)
    return SettingsManager(db_session, owns_session=not borrowed_session)

177 

178 

def no_db_settings(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator that runs the wrapped function with the settings database
    completely disabled. This will prevent the function from accidentally
    reading settings from the DB. Settings can only be read from environment
    variables or the defaults file.

    Args:
        func: The function to wrap.

    Returns:
        The wrapped function. It forwards all positional and keyword
        arguments to ``func`` and returns (or propagates) whatever ``func``
        returns (or raises).

    """

    @functools.wraps(func)
    def db_disabled_call(*args, **kwargs):
        # Detach the DB session from the settings manager for the duration
        # of the call.
        # NOTE(review): this mutates the object returned by
        # get_settings_manager(); presumably that must be the same manager
        # ``func`` ends up using for the detach to take effect — confirm.
        settings_manager = get_settings_manager()
        saved_session = settings_manager.db_session
        settings_manager.db_session = None

        try:
            result = func(*args, **kwargs)
        finally:
            # Re-attach the original session even if ``func`` raised.
            settings_manager.db_session = saved_session
        return result

    return db_disabled_call

208 

209 

def get_setting_from_db_main_thread(
    key: str, default_value: Any | None = None, username: str | None = None
) -> str | Dict[str, Any] | None:
    """
    Get a setting from the database with fallback to default value

    Args:
        key: The setting key.
        default_value: If the setting is not found, it will return this instead.
        username: Optional username for thread context (e.g., background
            research threads). Passed to ``get_user_db_session``.

    Returns:
        The setting value, or ``default_value`` when no database session is
        available or reading the setting fails.

    Raises:
        RuntimeError: If called without a Flask app context from any thread
            other than ``MainThread``.

    """
    # CRITICAL: Detect if we're in a background thread and raise an error
    import threading

    thread_name = threading.current_thread().name

    # Allow MainThread during startup, but not other threads
    if not has_app_context() and thread_name != "MainThread":
        thread_id = threading.get_ident()
        raise RuntimeError(
            f"get_db_setting('{key}') called from background thread '{thread_name}' (ID: {thread_id}). "
            f"Database access from threads is not allowed. Use settings_snapshot or thread-local settings context."
        )

    try:
        # Use the new session context to ensure proper database access
        from ..database.session_context import get_user_db_session
    except Exception:
        logger.exception(f"Error getting setting {key} from database")
    else:
        try:
            with get_user_db_session(username) as db_session:
                if db_session:
                    # Use the unified settings manager; the session is passed
                    # in explicitly, so the manager borrows it rather than
                    # owning it.
                    settings_manager = get_settings_manager(
                        db_session, username
                    )
                    return settings_manager.get_setting(
                        key, default=default_value
                    )
        except Exception:
            # Deliberate best-effort: any failure falls back to the default.
            # loguru gives ``exc_info=True`` no special meaning;
            # ``opt(exception=True)`` actually records the traceback.
            logger.opt(exception=True).debug(
                "fall back to default if session unavailable"
            )

    logger.warning(f"Could not read setting '{key}' from the database.")
    return default_value