Coverage for src/local_deep_research/web/auth/connection_cleanup.py: 94%
118 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Automatic cleanup of idle database connections.
4Periodically closes database connections for users who have no active sessions
5and no active research, preventing resource leaks when users close their browser
6without logging out.
8Also periodically disposes all QueuePool engines to release accumulated WAL/SHM
9file handles. See ADR-0004 for why this is necessary with SQLCipher + WAL mode.
10"""
12import os
13import time
14from pathlib import Path
16from apscheduler.schedulers.background import BackgroundScheduler
17from loguru import logger
19from ...database.session_passwords import session_password_store
20from ...database.thread_local_session import cleanup_dead_threads
21from ...web.routes.globals import get_usernames_with_active_research
23# ---------------------------------------------------------------------------
24# File Descriptor Monitoring
25# ---------------------------------------------------------------------------
26# WHY: After days of idle operation in Docker, the app crashed with
27# OSError: [Errno 24] Too many open files
28# This monitoring logs the FD count every 5 minutes so we can correlate
29# FD growth with specific events and find leaks.
30#
31# WHAT IT LOGS:
32# - open_fds: total open file descriptors for the process
33# - pool_engines: number of per-user QueuePool engines
34# - pool_checked_out: connections currently checked out from QueuePool
35# - protected_users: users with active sessions
36#
37# HOW TO USE: grep "Resource monitor" in container logs. If open_fds
38# grows steadily over hours, something is leaking.
39# ---------------------------------------------------------------------------
41# Dispose all pool engines every 30 minutes to release WAL/SHM handles.
42# SQLCipher + WAL mode leaks handles when connections close out of order
43# (which QueuePool's pool_recycle causes). Periodic dispose() closes ALL
44# pooled connections at once, resetting the handle state cleanly.
45# The next DB operation transparently reopens a fresh connection.
46_DISPOSE_INTERVAL_SECONDS = 1800
47_last_dispose_time = 0.0
def _pop_per_user_locks(username: str) -> None:
    """Pop ``username`` from the four module-level per-user lock dicts.

    The library-init, backup, queue-processor, and library-RAG modules each
    maintain a ``dict[..., threading.Lock]`` keyed by username (or by
    ``(username, ...)``) for serialising per-user critical sections. None of
    them had a removal hook, so without this the dicts accumulated one entry
    per username across the process lifetime. That growth is bounded by the
    total number of users, at ~296 bytes per entry with one entry in each of
    the four dicts, i.e. ~1.2 KB per user in total. It is still real on
    long-lived self-hosted instances with user churn.

    Called from the user-close paths so the next login starts with fresh
    locks. The locks hold no state worth preserving across logout/login.

    Each pop is attempted independently. A failure in one is logged at
    WARNING (exception type name only) and does not stop the others.

    The imports are lazy to keep this module's import graph shallow:
    ``connection_cleanup`` runs at startup and shouldn't pull in the queue /
    backup / library-init / library-RAG modules eagerly.

    Args:
        username: User whose lock-dict entries should be removed.
    """
    # All handlers below log at WARNING to match the scheduler-unregister
    # handler in ``cleanup_idle_connections``. A failure here means this
    # user's stale lock entry lingers, so it should be visible. Only the
    # exception's type name is logged: exception values can carry sensitive
    # details (same convention as the engine-dispose handler in this module).
    try:
        from ...database.library_init import pop_user_init_lock

        pop_user_init_lock(username)
    except Exception as exc:
        logger.warning(
            f"Failed to pop _user_init_locks for {username}: "
            f"{type(exc).__name__}"
        )

    try:
        from ...database.backup.backup_service import pop_user_lock

        pop_user_lock(username)
    except Exception as exc:
        logger.warning(
            f"Failed to pop _user_locks for {username}: {type(exc).__name__}"
        )

    try:
        from ...web.queue.processor_v2 import queue_processor

        queue_processor.pop_user_critical_lock(username)
    except Exception as exc:
        logger.warning(
            f"Failed to pop _user_critical_locks for {username}: "
            f"{type(exc).__name__}"
        )

    try:
        from ...research_library.services.library_rag_service import (
            pop_faiss_locks_for_user,
        )

        pop_faiss_locks_for_user(username)
    except Exception as exc:
        logger.warning(
            f"Failed to pop _faiss_write_locks for {username}: "
            f"{type(exc).__name__}"
        )
103def _count_open_fds() -> int:
104 """Count open file descriptors for the current process."""
105 proc_fd = Path("/proc/self/fd")
106 if proc_fd.is_dir():
107 try:
108 return len(list(proc_fd.iterdir()))
109 except OSError:
110 pass
111 import resource
113 soft_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
114 count = 0
115 for fd in range(soft_limit):
116 try:
117 os.fstat(fd)
118 count += 1
119 except OSError:
120 pass
121 return count
def cleanup_idle_connections(session_manager, db_manager):
    """Close db connections for users with no active sessions and no active research.

    Entry point of the periodic job registered by
    ``start_connection_cleanup_scheduler``. One run performs, in order:

    1. Idle-user teardown (``_close_idle_user_connections``).
    2. A sweep of dead-thread sessions/credentials (``cleanup_dead_threads``).
    3. At most once per ``_DISPOSE_INTERVAL_SECONDS``, a WAL checkpoint and
       dispose of every pool engine (``_maybe_dispose_pool_engines``).
    4. A resource-usage log line (``_log_resource_usage``).

    Args:
        session_manager: Session registry. Used for ``cleanup_expired_sessions``,
            ``get_active_usernames`` and ``has_active_sessions_for``.
        db_manager: Owner of the per-user engines. Used for
            ``get_connected_usernames``, ``close_user_database``,
            ``_checkpoint_wal``, ``connections`` and ``_connections_lock``.
    """
    protected = _close_idle_user_connections(session_manager, db_manager)

    # Sweep dead-thread sessions and credentials — safety net when neither
    # HTTP requests nor the queue processor are triggering sweeps.
    cleanup_dead_threads()

    _maybe_dispose_pool_engines(db_manager)
    _log_resource_usage(db_manager, protected)


def _close_idle_user_connections(session_manager, db_manager):
    """Tear down connections of idle users and return the protected usernames.

    A user is protected when they have an active session or active research
    at snapshot time. Every connected, unprotected user is re-checked right
    before teardown to narrow the race with a login or research start that
    happens after the snapshot.

    For each user that passes the re-check:
      * their news-scheduler jobs are unregistered;
      * their database is closed and their ``session_password_store``
        entries are cleared (both skipped after the close fails);
      * their per-user lock-dict entries are popped, always.

    Returns:
        The union of active-session and active-research usernames taken at
        the snapshot.
    """
    # 1. Purge expired sessions first
    session_manager.cleanup_expired_sessions()

    # 2. Get protected usernames (active sessions OR active research)
    active_usernames = session_manager.get_active_usernames()
    researching_usernames = get_usernames_with_active_research()
    protected = active_usernames | researching_usernames

    # 3. Get usernames with open connections
    connected_usernames = db_manager.get_connected_usernames()

    # 4. Find idle candidates
    candidates = connected_usernames - protected

    # 5. Double-check before closing (narrows race window)
    closed = 0
    for username in candidates:
        if session_manager.has_active_sessions_for(username):
            logger.debug(
                f"Skipped {username} (active session appeared since snapshot)"
            )
            continue  # User logged in since snapshot
        if username in get_usernames_with_active_research():
            logger.debug(
                f"Skipped {username} (active research appeared since snapshot)"
            )
            continue  # Research started since snapshot

        # Unregister news scheduler jobs (matches logout pattern in routes.py)
        try:
            from ...scheduler.background import (
                get_background_job_scheduler,
            )

            sched = get_background_job_scheduler()
            if sched.is_running:
                sched.unregister_user(username)
        except Exception as exc:
            # Type name only: exception values may carry sensitive details.
            logger.warning(
                f"Failed to unregister scheduler for {username}: "
                f"{type(exc).__name__}",
            )

        try:
            db_manager.close_user_database(username)
            session_password_store.clear_all_for_user(username)
            closed += 1
            logger.debug(f"Closed idle connection for {username}")
        except Exception as exc:
            logger.warning(
                f"Connection cleanup failed for {username}: "
                f"{type(exc).__name__}"
            )

        # Pop lock-dict entries regardless of whether close succeeded.
        # The lock-dict cleanup is independent of DB-engine teardown;
        # putting it inside the try above would skip the pop on the very
        # failure path where it matters most (close raises -> next login
        # rebuilds the engine but the stale lock entries linger).
        _pop_per_user_locks(username)

    if closed:
        logger.info(f"Connection cleanup: closed {closed} idle connection(s)")
    logger.debug(
        f"Connection cleanup: evaluated {len(candidates)} candidate(s), "
        f"closed {closed}, protected {len(protected)} active user(s)"
    )
    return protected


def _maybe_dispose_pool_engines(db_manager) -> None:
    """Checkpoint and dispose every pool engine if the dispose interval elapsed.

    SQLCipher + WAL mode accumulates file handles when QueuePool recycles
    connections out of open-order (ADR-0004). Periodically calling dispose()
    on all engines closes the pooled connections, releasing any leaked
    handles. The pool is transparently recreated on the next DB operation.

    Safe to run against engines with checked-out connections. In SA 2.0,
    `QueuePool.dispose` only drains idle queue entries and `Engine.dispose`
    calls `pool.recreate()`. A thread holding a checked-out connection keeps
    using it until it returns it. The SA docs are explicit: "Connections
    that are still checked out will not be closed".

    The post-login bulk write (_perform_post_login_tasks in
    web/auth/routes.py) is additionally protected by being a single atomic
    transaction. Any interruption (dispose, crash, OOM) therefore rolls back
    cleanly without leaving partial state.

    Do not add a `checkedout() > 0` skip guard here without first
    reproducing a real torn write against the actual SA source path. See the
    PR #3487 discussion: the speculative skip introduces an unbounded "skip
    forever" risk on busy engines, in exchange for preventing a failure mode
    that SA 2.0 does not produce.
    """
    global _last_dispose_time
    now = time.monotonic()
    if now - _last_dispose_time < _DISPOSE_INTERVAL_SECONDS:
        return
    # Record the attempt up front so a failing pass is not retried on every
    # run; the next attempt waits a full interval.
    _last_dispose_time = now

    disposed = 0
    with db_manager._connections_lock:
        for username, engine in list(db_manager.connections.items()):
            try:
                db_manager._checkpoint_wal(engine, f"for {username}")
                engine.dispose()
                disposed += 1
            except Exception as exc:
                # Logged at WARNING on purpose. If the WAL checkpoint or pool
                # dispose keeps failing (disk pressure, lock starvation, ...),
                # the WAL file silently grows and pooled connections leak.
                # The ADR-0004 workaround depends on this loop succeeding.
                #
                # Only the exception's TYPE NAME is logged, matching the
                # codebase's `_report_silent_exception` pattern
                # (utilities/log_utils.py). The exception value can carry
                # sensitive locals (DB paths, query fragments, etc.), and the
                # sensitive-logging hook flags any `f"...{exc}"`
                # interpolation.
                exc_type = type(exc).__name__
                logger.warning(
                    f"Error disposing engine for {username}: {exc_type}"
                )
    if disposed:
        logger.info(
            f"Pool dispose: reset {disposed} engine(s) to release "
            f"WAL/SHM handles"
        )


def _log_resource_usage(db_manager, protected) -> None:
    """Log open-FD and pool statistics; warn when the FD count exceeds 800.

    Best-effort: any failure is logged at DEBUG and swallowed, so monitoring
    can never break the cleanup job.
    """
    try:
        fd_count = _count_open_fds()
        pool_checked_out = 0
        with db_manager._connections_lock:
            # Read the engine count and the checked-out sum under the same
            # lock so both numbers describe one consistent snapshot.
            pool_engine_count = len(db_manager.connections)
            for engine in db_manager.connections.values():
                try:
                    pool_checked_out += engine.pool.checkedout()
                except Exception:  # noqa: silent-exception
                    pass
        logger.debug(
            f"Resource monitor: open_fds={fd_count}, "
            f"pool_engines={pool_engine_count}, "
            f"pool_checked_out={pool_checked_out}, "
            f"protected_users={len(protected)}"
        )
        if fd_count > 800:
            logger.warning(
                f"High FD count ({fd_count}) — approaching system limit. "
                f"Check for resource leaks."
            )
    except Exception:
        logger.debug("FD monitoring failed")  # noqa: silent-exception
def start_connection_cleanup_scheduler(
    session_manager, db_manager, interval_seconds=300, *, jitter_seconds=30
):
    """Start APScheduler job for periodic connection cleanup.

    Registers ``cleanup_idle_connections`` as an interval job with id
    ``"cleanup_idle_connections"`` and starts the scheduler.

    Args:
        session_manager: The SessionManager singleton.
        db_manager: The DatabaseManager singleton.
        interval_seconds: How often to run cleanup (default: 5 minutes).
        jitter_seconds: Random delay, up to this many seconds, that
            APScheduler applies to each run (default: 30). The same value
            appears in the startup log line, so the two cannot drift apart.

    Returns:
        The BackgroundScheduler instance (for shutdown registration).
    """
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        cleanup_idle_connections,
        "interval",
        seconds=interval_seconds,
        args=[session_manager, db_manager],
        id="cleanup_idle_connections",
        jitter=jitter_seconds,
    )
    scheduler.start()
    logger.info(
        f"Connection cleanup scheduler started "
        f"(interval={interval_seconds}s, jitter={jitter_seconds}s)"
    )
    return scheduler