Coverage for src / local_deep_research / utilities / db_utils.py: 86%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import functools
2from typing import Any, Callable, Dict
4from cachetools import LRUCache
5from flask import (
6 g,
7 has_app_context,
8 has_request_context,
9 session as flask_session,
10)
11from loguru import logger
12from sqlalchemy.orm import Session
14from ..config.paths import get_data_directory
15from ..database.encrypted_db import db_manager
16from .threading_utils import thread_specific_cache
18# Database paths using new centralized configuration
19DATA_DIR = get_data_directory()
20# DB_PATH removed - use per-user encrypted databases instead
@thread_specific_cache(cache=LRUCache(maxsize=10))
def get_db_session(
    _namespace: str = "", username: str | None = None
) -> Session | None:
    """
    Get database session - uses encrypted per-user database if authenticated.

    Deprecated: prefer get_user_db_session() from database.session_context.

    Args:
        _namespace: This can be specified to an arbitrary string in order to
            force the caching mechanism to create separate entries even in
            the same thread. Usually it does not need to be specified.
        username: Optional username whose per-user database should be used.
            If not provided, the session is resolved from the Flask context.

    Returns:
        The database session for the current user/context, or None when no
        session could be resolved (a deprecation warning is logged then).

    Raises:
        RuntimeError: If called without a Flask app context from any thread
            other than "MainThread", or if ``username`` is given but
            ``db_manager`` has no session for that user.
    """
    import threading

    thread_name = threading.current_thread().name

    # Guard against database access from background threads. The check is
    # on the *app* context: MainThread is allowed without one (startup),
    # every other thread must be running inside an app context.
    if not has_app_context() and thread_name != "MainThread":
        thread_id = threading.get_ident()
        raise RuntimeError(
            f"Database access attempted from background thread '{thread_name}' (ID: {thread_id}). "
            "Database access from threads is not allowed due to SQLite thread safety constraints. "
            "Use settings_snapshot or pass all required data to the thread at creation time."
        )

    # An explicit username wins over anything stored in the Flask context.
    if username:
        user_session = db_manager.get_session(username)
        if user_session:
            return user_session
        raise RuntimeError(f"No database found for user {username}")

    # Otherwise, fall back to the Flask request context.
    try:
        # Try lazy session creation via Flask g
        from ..database.session_context import get_g_db_session

        db_session = get_g_db_session()
        if db_session:
            return db_session

        # Check if we have a username in the Flask session
        username = flask_session.get("username")
        if username:
            user_session = db_manager.get_session(username)
            if user_session:
                return user_session
    except Exception:
        # Best effort: any failure here is treated as "no Flask context".
        # NOTE(review): loguru may not attach a traceback for exc_info=True
        # the way stdlib logging does - confirm this logs what is intended.
        logger.debug(
            "Flask context unavailable (CLI/background threads)", exc_info=True
        )

    # No shared database - return None to allow SettingsManager to work without DB
    logger.warning(
        "get_db_session() is deprecated. Use get_user_db_session() from database.session_context"
    )
    return None
def get_settings_manager(
    db_session: Session | None = None, username: str | None = None
):
    """
    Get the settings manager for the current context.

    Session resolution order:
      1. the caller-provided ``db_session``;
      2. the Flask request session (via ``get_g_db_session``), if it belongs
         to ``username`` or no ``username`` was requested;
      3. ``get_db_session(username=...)``, where ``username`` defaults to
         the one stored in the Flask session when a request is active.

    Args:
        db_session: Optional database session. When given it is treated as
            borrowed, i.e. the returned manager does not own it.
        username: Optional username used to decide whether the request
            session may be reused and to look up the per-user session.

    Returns:
        A SettingsManager bound to the resolved session (possibly None),
        constructed with ``owns_session`` False for borrowed sessions.
    """
    # Track whether we are borrowing a session we don't own (caller-provided
    # or Flask g.db_session). Borrowed sessions must NOT be closed by
    # SettingsManager — their owner is responsible for cleanup.
    borrowed_session = db_session is not None

    # Reuse the Flask request session if it belongs to the requested user.
    # This MUST happen before get_db_session() because that function is
    # wrapped in @thread_specific_cache — the cache bypasses the function
    # body on subsequent calls, so any g.db_session check inside it only
    # fires once per thread.
    if db_session is None:
        try:
            from ..database.session_context import get_g_db_session

            if has_request_context():
                lazy_session = get_g_db_session()
                if lazy_session:
                    # g.current_user may be a plain string or an object
                    # exposing a ``username`` attribute.
                    g_user = getattr(g, "current_user", None)
                    if not isinstance(g_user, str):
                        g_user = getattr(g_user, "username", None)
                    if username is None or g_user == username:
                        db_session = lazy_session
                        borrowed_session = True
        except Exception:
            # NOTE(review): loguru may not attach a traceback for
            # exc_info=True the way stdlib logging does - confirm.
            logger.debug("Could not reuse Flask request session", exc_info=True)

    if db_session is None and username is None and has_request_context():
        username = flask_session.get("username")

    if db_session is None:
        try:
            db_session = get_db_session(username=username)
        except RuntimeError:
            # get_db_session() refused or found no database for the user -
            # the settings manager will run without a session (defaults).
            db_session = None
        else:
            # get_db_session() may itself return g.db_session through its
            # own Flask-context lookup. If it did, we must mark it as
            # borrowed to prevent both SettingsManager.close() and Flask
            # teardown from closing the same session.
            if not borrowed_session and db_session is not None:
                try:
                    if (
                        has_request_context()
                        and getattr(g, "db_session", None) is not None
                        and db_session is g.db_session
                    ):
                        borrowed_session = True
                        logger.warning(
                            "get_db_session() returned g.db_session after "
                            "direct g.db_session check failed — marking as "
                            "borrowed to prevent double-close"
                        )
                except Exception:
                    logger.debug(
                        "Could not verify session identity after "
                        "get_db_session()",
                        exc_info=True,
                    )

    # Import here to avoid circular imports
    from ..settings import SettingsManager

    logger.debug(
        "get_settings_manager: session_source={}, owned={}",
        "borrowed" if borrowed_session else ("new" if db_session else "None"),
        not borrowed_session,
    )

    # Always use regular SettingsManager (now with built-in simple caching)
    return SettingsManager(db_session, owns_session=not borrowed_session)
def no_db_settings(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator that runs the wrapped function with the settings database
    disabled on the settings manager, so the function cannot accidentally
    read settings from the DB. Settings can then only come from environment
    variables or the defaults file.

    Args:
        func: The function to wrap.

    Returns:
        The wrapped function.

    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Detach the DB session from the settings manager for the duration
        # of the call, remembering it so it can be put back afterwards.
        # NOTE(review): the manager comes from a fresh get_settings_manager()
        # call - confirm that func actually observes this same instance.
        settings_manager = get_settings_manager()
        saved_session, settings_manager.db_session = (
            settings_manager.db_session,
            None,
        )

        try:
            result = func(*args, **kwargs)
        finally:
            # Re-attach the original session whether or not func raised.
            settings_manager.db_session = saved_session
        return result

    return wrapper
def get_setting_from_db_main_thread(
    key: str, default_value: Any | None = None, username: str | None = None
) -> str | Dict[str, Any] | None:
    """
    Get a setting from the database with fallback to default value

    Args:
        key: The setting key.
        default_value: Returned when the setting cannot be read (no session
            available or any error while reading).
        username: Optional username passed to ``get_user_db_session`` to
            select the per-user database.

    Returns:
        The setting value, or ``default_value`` if it could not be read.

    Raises:
        RuntimeError: If called without a Flask app context from any thread
            other than "MainThread".

    """
    import threading

    thread_name = threading.current_thread().name

    # Allow MainThread during startup, but not other threads: background
    # threads must not touch the database.
    if not has_app_context() and thread_name != "MainThread":
        thread_id = threading.get_ident()
        raise RuntimeError(
            f"get_setting_from_db_main_thread('{key}') called from background thread '{thread_name}' (ID: {thread_id}). "
            "Database access from threads is not allowed. Use settings_snapshot or thread-local settings context."
        )

    try:
        # Use the new session context to ensure proper database access
        from ..database.session_context import get_user_db_session
    except Exception:
        logger.exception(f"Error getting setting {key} from database")
    else:
        try:
            with get_user_db_session(username) as db_session:
                if db_session:
                    # Use the unified settings manager; the session is
                    # passed in, so the manager treats it as borrowed.
                    settings_manager = get_settings_manager(
                        db_session, username
                    )
                    return settings_manager.get_setting(
                        key, default=default_value
                    )
        except Exception:
            # Best effort: any failure while opening the session or reading
            # the setting falls through to the default below.
            logger.debug(
                "fall back to default if session unavailable", exc_info=True
            )

    logger.warning(f"Could not read setting '{key}' from the database.")
    return default_value