Coverage for src / local_deep_research / web / auth / connection_cleanup.py: 65%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Automatic cleanup of idle database connections.
4Periodically closes database connections for users who have no active sessions
5and no active research, preventing resource leaks when users close their browser
6without logging out.
8Also periodically disposes all QueuePool engines to release accumulated WAL/SHM
9file handles. See ADR-0004 for why this is necessary with SQLCipher + WAL mode.
10"""
12import os
13import time
14from pathlib import Path
16from apscheduler.schedulers.background import BackgroundScheduler
17from loguru import logger
19from ...database.session_passwords import session_password_store
20from ...database.thread_local_session import cleanup_dead_threads
21from ...web.routes.globals import get_usernames_with_active_research
23# ---------------------------------------------------------------------------
24# File Descriptor Monitoring
25# ---------------------------------------------------------------------------
26# WHY: After days of idle operation in Docker, the app crashed with
27# OSError: [Errno 24] Too many open files
28# This monitoring logs the FD count every 5 minutes so we can correlate
29# FD growth with specific events and find leaks.
30#
31# WHAT IT LOGS:
32# - open_fds: total open file descriptors for the process
33# - pool_engines: number of per-user QueuePool engines
34# - pool_checked_out: connections currently checked out from QueuePool
35# - protected_users: users with active sessions
36#
37# HOW TO USE: grep "Resource monitor" in container logs. If open_fds
38# grows steadily over hours, something is leaking.
39# ---------------------------------------------------------------------------
41# Dispose all pool engines every 30 minutes to release WAL/SHM handles.
42# SQLCipher + WAL mode leaks handles when connections close out of order
43# (which QueuePool's pool_recycle causes). Periodic dispose() closes ALL
44# pooled connections at once, resetting the handle state cleanly.
45# The next DB operation transparently reopens a fresh connection.
46_DISPOSE_INTERVAL_SECONDS = 1800
47_last_dispose_time = 0.0
50def _count_open_fds() -> int:
51 """Count open file descriptors for the current process."""
52 proc_fd = Path("/proc/self/fd")
53 if proc_fd.is_dir(): 53 ↛ 58line 53 didn't jump to line 58 because the condition on line 53 was always true
54 try:
55 return len(list(proc_fd.iterdir()))
56 except OSError:
57 pass
58 import resource
60 soft_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
61 count = 0
62 for fd in range(soft_limit):
63 try:
64 os.fstat(fd)
65 count += 1
66 except OSError:
67 pass
68 return count
def _unregister_news_jobs(username):
    """Remove the user's news-scheduler jobs (matches the logout path in routes.py).

    Best-effort: failures are logged and never abort the cleanup pass.
    """
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        sched = get_news_scheduler()
        if sched.is_running:
            sched.unregister_user(username)
    except Exception:
        logger.warning(
            f"Failed to unregister scheduler for {username}",
        )


def _maybe_dispose_pools(db_manager):
    """Dispose every pooled engine at most once per _DISPOSE_INTERVAL_SECONDS.

    SQLCipher + WAL mode accumulates file handles when QueuePool recycles
    connections out of open-order (see ADR-0004). dispose() closes ALL
    pooled connections at once, releasing any leaked handles; the pool is
    transparently recreated on the next DB operation.
    """
    global _last_dispose_time
    now = time.monotonic()
    if now - _last_dispose_time < _DISPOSE_INTERVAL_SECONDS:
        return
    _last_dispose_time = now
    disposed = 0
    with db_manager._connections_lock:
        for username, engine in list(db_manager.connections.items()):
            try:
                engine.dispose()
                disposed += 1
            except Exception:
                logger.debug(f"Error disposing engine for {username}")
    if disposed:
        logger.info(
            f"Pool dispose: reset {disposed} engine(s) to release "
            f"WAL/SHM handles"
        )


def _log_resource_usage(db_manager, protected):
    """Log FD and pool statistics so leaks can be correlated with events.

    Never raises — monitoring must not break the cleanup job.
    """
    try:
        fd_count = _count_open_fds()
        pool_checked_out = 0
        # Take the engine count and the per-engine checkout counts under the
        # same lock so the snapshot is internally consistent. (Previously the
        # count was read outside the lock.)
        with db_manager._connections_lock:
            pool_engine_count = len(db_manager.connections)
            for engine in db_manager.connections.values():
                try:
                    pool_checked_out += engine.pool.checkedout()
                except Exception:  # noqa: silent-exception
                    pass
        logger.debug(
            f"Resource monitor: open_fds={fd_count}, "
            f"pool_engines={pool_engine_count}, "
            f"pool_checked_out={pool_checked_out}, "
            f"protected_users={len(protected)}"
        )
        if fd_count > 800:
            logger.warning(
                f"High FD count ({fd_count}) — approaching system limit. "
                f"Check for resource leaks."
            )
    except Exception:
        logger.debug("FD monitoring failed")  # noqa: silent-exception


def cleanup_idle_connections(session_manager, db_manager):
    """Close db connections for users with no active sessions and no active research.

    Args:
        session_manager: The SessionManager singleton.
        db_manager: The DatabaseManager singleton.
    """
    # 1. Purge expired sessions first so stale sessions don't protect users.
    session_manager.cleanup_expired_sessions()

    # 2. Protected usernames: active sessions OR active research.
    active_usernames = session_manager.get_active_usernames()
    researching_usernames = get_usernames_with_active_research()
    protected = active_usernames | researching_usernames

    # 3. Usernames with open connections.
    connected_usernames = db_manager.get_connected_usernames()

    # 4. Idle candidates: connected but not protected.
    candidates = connected_usernames - protected

    # 5. Re-check each candidate immediately before closing — narrows the
    #    race window between the snapshot above and the close below.
    closed = 0
    for username in candidates:
        if session_manager.has_active_sessions_for(username):
            logger.debug(
                f"Skipped {username} (active session appeared since snapshot)"
            )
            continue  # User logged in since snapshot
        if username in get_usernames_with_active_research():
            logger.debug(
                f"Skipped {username} (active research appeared since snapshot)"
            )
            continue  # Research started since snapshot
        _unregister_news_jobs(username)
        try:
            db_manager.close_user_database(username)
            session_password_store.clear_all_for_user(username)
            closed += 1
            logger.debug(f"Closed idle connection for {username}")
        except Exception:
            logger.warning(f"Connection cleanup failed for {username}")

    if closed:
        logger.info(f"Connection cleanup: closed {closed} idle connection(s)")
    logger.debug(
        f"Connection cleanup: evaluated {len(candidates)} candidate(s), "
        f"closed {closed}, protected {len(protected)} active user(s)"
    )

    # Sweep dead-thread sessions and credentials — safety net when neither
    # HTTP requests nor the queue processor are triggering sweeps.
    cleanup_dead_threads()

    # Periodic pool dispose (WAL/SHM handle release) and resource monitoring.
    _maybe_dispose_pools(db_manager)
    _log_resource_usage(db_manager, protected)
def start_connection_cleanup_scheduler(
    session_manager, db_manager, interval_seconds=300
):
    """Start APScheduler job for periodic connection cleanup.

    Args:
        session_manager: The SessionManager singleton.
        db_manager: The DatabaseManager singleton.
        interval_seconds: How often to run cleanup (default: 5 minutes).

    Returns:
        The BackgroundScheduler instance (for shutdown registration).
    """
    # Jitter spreads runs out so multiple workers don't all fire at once.
    job_settings = dict(
        trigger="interval",
        seconds=interval_seconds,
        args=[session_manager, db_manager],
        id="cleanup_idle_connections",
        jitter=30,
    )
    cleanup_sched = BackgroundScheduler()
    cleanup_sched.add_job(cleanup_idle_connections, **job_settings)
    cleanup_sched.start()
    logger.info(
        f"Connection cleanup scheduler started (interval={interval_seconds}s, jitter=30s)"
    )
    return cleanup_sched