Coverage for src/local_deep_research/web/auth/connection_cleanup.py: 94%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Automatic cleanup of idle database connections. 

3 

4Periodically closes database connections for users who have no active sessions 

5and no active research, preventing resource leaks when users close their browser 

6without logging out. 

7 

8Also periodically disposes all QueuePool engines to release accumulated WAL/SHM 

9file handles. See ADR-0004 for why this is necessary with SQLCipher + WAL mode. 

10""" 

11 

12import os 

13import time 

14from pathlib import Path 

15 

16from apscheduler.schedulers.background import BackgroundScheduler 

17from loguru import logger 

18 

19from ...database.session_passwords import session_password_store 

20from ...database.thread_local_session import cleanup_dead_threads 

21from ...web.routes.globals import get_usernames_with_active_research 

22 

# ---------------------------------------------------------------------------
# File Descriptor Monitoring
# ---------------------------------------------------------------------------
# WHY: After days of idle operation in Docker, the app crashed with
#   OSError: [Errno 24] Too many open files
# Each cleanup run (every 5 minutes with the default scheduler interval)
# logs the FD count, so FD growth can be correlated with specific events
# and leaks can be found.
#
# WHAT IT LOGS (at DEBUG level):
#   - open_fds: total open file descriptors for the process
#   - pool_engines: number of per-user QueuePool engines
#   - pool_checked_out: connections currently checked out from QueuePool
#   - protected_users: users with an active session OR active research
#
# HOW TO USE: with DEBUG logging enabled, grep "Resource monitor" in the
# container logs. If open_fds grows steadily over hours, something is
# leaking. Independently of the log level, a WARNING is emitted whenever
# open_fds exceeds 800.
# ---------------------------------------------------------------------------

40 

41# Dispose all pool engines every 30 minutes to release WAL/SHM handles. 

42# SQLCipher + WAL mode leaks handles when connections close out of order 

43# (which QueuePool's pool_recycle causes). Periodic dispose() closes ALL 

44# pooled connections at once, resetting the handle state cleanly. 

45# The next DB operation transparently reopens a fresh connection. 

46_DISPOSE_INTERVAL_SECONDS = 1800 

47_last_dispose_time = 0.0 

48 

49 

def _pop_per_user_locks(username: str) -> None:
    """Drop ``username``'s entries from the four per-user lock registries.

    The library-init, backup, queue-processor and library-RAG code each
    keep a ``dict[..., threading.Lock]`` keyed by username (or by
    ``(username, ...)``) to serialise per-user critical sections. None of
    them had a removal hook. Without this function those dicts gain one
    entry per username for the whole process lifetime. The growth is
    bounded by the total number of users (roughly ~296 bytes per entry,
    i.e. ~1.2 KB per user across the four dicts), but it is real on
    long-lived self-hosted instances with user churn.

    This is called from the user-close paths, so the next login starts with
    fresh locks. The locks hold no state worth preserving across
    logout/login.

    Each removal is attempted independently and failures are logged at
    WARNING rather than raised. One broken registry must not stop the
    others from being cleaned.

    The owning modules are imported lazily to keep this module's import
    graph shallow. ``connection_cleanup`` runs at startup and shouldn't
    pull in the queue / backup / library-init / library-RAG modules eagerly.

    Args:
        username: The user whose lock entries should be dropped.
    """
    try:
        from ...database.library_init import pop_user_init_lock

        pop_user_init_lock(username)
    except Exception:
        # WARNING (not DEBUG) to match the scheduler-unregister error
        # handler in cleanup_idle_connections. A failure here means this
        # user's lock-dict entry keeps accumulating on every subsequent
        # close cycle, so it should be visible.
        logger.warning(f"Failed to pop _user_init_locks for {username}")

    try:
        from ...database.backup.backup_service import pop_user_lock

        pop_user_lock(username)
    except Exception:
        logger.warning(f"Failed to pop _user_locks for {username}")

    try:
        from ...web.queue.processor_v2 import queue_processor

        queue_processor.pop_user_critical_lock(username)
    except Exception:
        logger.warning(f"Failed to pop _user_critical_locks for {username}")

    try:
        from ...research_library.services.library_rag_service import (
            pop_faiss_locks_for_user,
        )

        pop_faiss_locks_for_user(username)
    except Exception:
        logger.warning(f"Failed to pop _faiss_write_locks for {username}")

101 

102 

103def _count_open_fds() -> int: 

104 """Count open file descriptors for the current process.""" 

105 proc_fd = Path("/proc/self/fd") 

106 if proc_fd.is_dir(): 

107 try: 

108 return len(list(proc_fd.iterdir())) 

109 except OSError: 

110 pass 

111 import resource 

112 

113 soft_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] 

114 count = 0 

115 for fd in range(soft_limit): 

116 try: 

117 os.fstat(fd) 

118 count += 1 

119 except OSError: 

120 pass 

121 return count 

122 

123 

def cleanup_idle_connections(session_manager, db_manager):
    """Close db connections for users with no active sessions and no active research.

    This is the periodic job scheduled by
    ``start_connection_cleanup_scheduler``. Each run:

    1. Purges expired sessions and closes the connections of connected users
       who have neither an active session nor active research.
    2. Sweeps dead-thread sessions and credentials.
    3. At most every ``_DISPOSE_INTERVAL_SECONDS``, checkpoints the WAL and
       disposes every pooled engine (ADR-0004).
    4. Logs resource-usage figures for leak hunting.

    Args:
        session_manager: Object providing ``cleanup_expired_sessions()``,
            ``get_active_usernames()`` and ``has_active_sessions_for()``.
        db_manager: Object providing ``get_connected_usernames()``,
            ``close_user_database()``, ``connections``,
            ``_connections_lock`` and ``_checkpoint_wal()``.
    """
    # 1. Purge expired sessions first so they don't count as active.
    session_manager.cleanup_expired_sessions()

    # 2. Protected usernames: active sessions OR active research.
    active_usernames = session_manager.get_active_usernames()
    researching_usernames = get_usernames_with_active_research()
    protected = active_usernames | researching_usernames

    # 3. Close connections for everyone connected but unprotected.
    candidate_count, closed = _close_idle_users(
        session_manager, db_manager, protected
    )
    if closed:
        logger.info(f"Connection cleanup: closed {closed} idle connection(s)")
    logger.debug(
        f"Connection cleanup: evaluated {candidate_count} candidate(s), "
        f"closed {closed}, protected {len(protected)} active user(s)"
    )

    # 4. Sweep dead-thread sessions and credentials. This is a safety net
    # for when neither HTTP requests nor the queue processor trigger sweeps.
    cleanup_dead_threads()

    # 5. Periodic pool dispose to release WAL/SHM handles.
    _dispose_pools_if_due(db_manager)

    # 6. FD / pool monitoring.
    _log_resource_usage(db_manager, len(protected))


def _close_idle_users(session_manager, db_manager, protected):
    """Close connections of connected users not in ``protected``.

    Returns:
        ``(candidate_count, closed_count)``.
    """
    candidates = db_manager.get_connected_usernames() - protected

    closed = 0
    for username in candidates:
        # Re-check right before closing; this narrows the race window
        # between the snapshot above and now.
        if session_manager.has_active_sessions_for(username):
            logger.debug(
                f"Skipped {username} (active session appeared since snapshot)"
            )
            continue
        if username in get_usernames_with_active_research():
            logger.debug(
                f"Skipped {username} (active research appeared since snapshot)"
            )
            continue

        # Unregister news scheduler jobs (matches the logout pattern in
        # routes.py).
        try:
            from ...scheduler.background import (
                get_background_job_scheduler,
            )

            sched = get_background_job_scheduler()
            if sched.is_running:
                sched.unregister_user(username)
        except Exception:
            logger.warning(
                f"Failed to unregister scheduler for {username}",
            )

        try:
            db_manager.close_user_database(username)
            session_password_store.clear_all_for_user(username)
            closed += 1
            logger.debug(f"Closed idle connection for {username}")
        except Exception:
            logger.warning(f"Connection cleanup failed for {username}")

        # Pop lock-dict entries whether or not the close succeeded. This
        # cleanup is independent of DB-engine teardown. Putting it inside
        # the try above would skip it on exactly the path where it matters
        # most: close raises, the next login rebuilds the engine, and the
        # stale lock entries linger.
        _pop_per_user_locks(username)

    return len(candidates), closed


def _dispose_pools_if_due(db_manager):
    """Checkpoint WAL and dispose every engine once per dispose interval.

    SQLCipher + WAL mode accumulates file handles when QueuePool recycles
    connections out of open-order (ADR-0004). Calling dispose() on all
    engines closes all pooled connections and releases any leaked handles.
    The pool is transparently recreated on the next DB operation.

    This is safe with checked-out connections. In SQLAlchemy 2.0,
    ``QueuePool.dispose`` only drains idle queue entries and
    ``Engine.dispose`` calls ``pool.recreate()``. A thread holding a
    checked-out connection keeps using it until it returns it. The SA docs
    say: "Connections that are still checked out will not be closed". The
    post-login bulk write (_perform_post_login_tasks in web/auth/routes.py)
    is also a single atomic transaction, so any interruption rolls back
    cleanly.

    Do not add a ``checkedout() > 0`` skip guard here without first
    reproducing a real torn write against the actual SA source path (see
    the PR #3487 discussion). Such a skip risks never disposing a busy
    engine, in exchange for preventing a failure mode SA 2.0 does not
    produce.
    """
    global _last_dispose_time

    now = time.monotonic()
    if now - _last_dispose_time < _DISPOSE_INTERVAL_SECONDS:
        return
    _last_dispose_time = now

    disposed = 0
    with db_manager._connections_lock:
        for username, engine in list(db_manager.connections.items()):
            try:
                db_manager._checkpoint_wal(engine, f"for {username}")
                engine.dispose()
                disposed += 1
            except Exception as exc:
                # Logged at WARNING: if checkpoint/dispose keeps failing
                # (disk pressure, lock starvation, ...), the WAL grows on
                # disk and pooled connections leak, defeating the ADR-0004
                # workaround.
                #
                # Only the exception TYPE NAME is logged, matching the
                # `_report_silent_exception` pattern. The exception value
                # can carry sensitive details (DB paths, query fragments),
                # and the sensitive-logging hook flags `{exc}` interpolation.
                exc_type = type(exc).__name__
                logger.warning(
                    f"Error disposing engine for {username}: {exc_type}"
                )
    if disposed:
        logger.info(
            f"Pool dispose: reset {disposed} engine(s) to release "
            f"WAL/SHM handles"
        )


def _log_resource_usage(db_manager, protected_count):
    """Best-effort DEBUG log of FD and pool usage; WARNING on a high FD count."""
    try:
        fd_count = _count_open_fds()
        pool_checked_out = 0
        with db_manager._connections_lock:
            # Take the engine count under the same lock as the checked-out
            # total, so both figures describe one consistent snapshot.
            pool_engine_count = len(db_manager.connections)
            for engine in db_manager.connections.values():
                try:
                    pool_checked_out += engine.pool.checkedout()
                except Exception:  # noqa: silent-exception
                    pass
        logger.debug(
            f"Resource monitor: open_fds={fd_count}, "
            f"pool_engines={pool_engine_count}, "
            f"pool_checked_out={pool_checked_out}, "
            f"protected_users={protected_count}"
        )
        # NOTE(review): 800 presumably targets the common 1024 soft limit;
        # confirm against the deployment's RLIMIT_NOFILE.
        if fd_count > 800:
            logger.warning(
                f"High FD count ({fd_count}) — approaching system limit. "
                f"Check for resource leaks."
            )
    except Exception:
        logger.debug("FD monitoring failed")  # noqa: silent-exception

273 

274 

def start_connection_cleanup_scheduler(
    session_manager, db_manager, interval_seconds=300, jitter_seconds=30
):
    """Start an APScheduler job that periodically runs connection cleanup.

    Args:
        session_manager: The SessionManager singleton.
        db_manager: The DatabaseManager singleton.
        interval_seconds: How often to run cleanup (default: 5 minutes).
        jitter_seconds: Random jitter APScheduler applies to each run time
            (default: 30 seconds). It spreads runs so they don't fire in
            lockstep with other periodic work.

    Returns:
        The started BackgroundScheduler instance (for shutdown registration).
    """
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        cleanup_idle_connections,
        "interval",
        seconds=interval_seconds,
        args=[session_manager, db_manager],
        id="cleanup_idle_connections",
        jitter=jitter_seconds,
    )
    scheduler.start()
    logger.info(
        f"Connection cleanup scheduler started "
        f"(interval={interval_seconds}s, jitter={jitter_seconds}s)"
    )
    return scheduler