Coverage for src / local_deep_research / web / auth / connection_cleanup.py: 65%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Automatic cleanup of idle database connections. 

3 

4Periodically closes database connections for users who have no active sessions 

5and no active research, preventing resource leaks when users close their browser 

6without logging out. 

7 

8Also periodically disposes all QueuePool engines to release accumulated WAL/SHM 

9file handles. See ADR-0004 for why this is necessary with SQLCipher + WAL mode. 

10""" 

11 

12import os 

13import time 

14from pathlib import Path 

15 

16from apscheduler.schedulers.background import BackgroundScheduler 

17from loguru import logger 

18 

19from ...database.session_passwords import session_password_store 

20from ...database.thread_local_session import cleanup_dead_threads 

21from ...web.routes.globals import get_usernames_with_active_research 

22 

23# --------------------------------------------------------------------------- 

24# File Descriptor Monitoring 

25# --------------------------------------------------------------------------- 

26# WHY: After days of idle operation in Docker, the app crashed with 

27# OSError: [Errno 24] Too many open files 

28# This monitoring logs the FD count every 5 minutes so we can correlate 

29# FD growth with specific events and find leaks. 

30# 

31# WHAT IT LOGS: 

32# - open_fds: total open file descriptors for the process 

33# - pool_engines: number of per-user QueuePool engines 

34# - pool_checked_out: connections currently checked out from QueuePool 

35# - protected_users: users with active sessions 

36# 

37# HOW TO USE: grep "Resource monitor" in container logs. If open_fds 

38# grows steadily over hours, something is leaking. 

39# --------------------------------------------------------------------------- 

40 

41# Dispose all pool engines every 30 minutes to release WAL/SHM handles. 

42# SQLCipher + WAL mode leaks handles when connections close out of order 

43# (which QueuePool's pool_recycle causes). Periodic dispose() closes ALL 

44# pooled connections at once, resetting the handle state cleanly. 

45# The next DB operation transparently reopens a fresh connection. 

# Minimum number of seconds between full-pool dispose sweeps (30 minutes).
_DISPOSE_INTERVAL_SECONDS = 1800
# time.monotonic() timestamp of the last dispose sweep; 0.0 means the sweep
# has never run, so the first cleanup pass after startup triggers it.
_last_dispose_time = 0.0

48 

49 

50def _count_open_fds() -> int: 

51 """Count open file descriptors for the current process.""" 

52 proc_fd = Path("/proc/self/fd") 

53 if proc_fd.is_dir(): 53 ↛ 58line 53 didn't jump to line 58 because the condition on line 53 was always true

54 try: 

55 return len(list(proc_fd.iterdir())) 

56 except OSError: 

57 pass 

58 import resource 

59 

60 soft_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] 

61 count = 0 

62 for fd in range(soft_limit): 

63 try: 

64 os.fstat(fd) 

65 count += 1 

66 except OSError: 

67 pass 

68 return count 

69 

70 

def _unregister_news_jobs(username) -> None:
    """Best-effort removal of the user's news scheduler jobs.

    Matches the logout pattern in routes.py; failures are logged and
    swallowed so one bad user cannot abort the whole cleanup pass.
    """
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        sched = get_news_scheduler()
        if sched.is_running:
            sched.unregister_user(username)
    except Exception:
        # opt(exception=True) keeps the WARNING level but attaches the
        # traceback, which the plain warning() call dropped.
        logger.opt(exception=True).warning(
            f"Failed to unregister scheduler for {username}",
        )


def _dispose_pools_if_due(db_manager) -> None:
    """Dispose all pooled engines if the dispose interval has elapsed.

    SQLCipher + WAL mode accumulates file handles when QueuePool recycles
    connections out of open-order (see ADR-0004).  Calling dispose() on
    every engine closes ALL pooled connections at once, releasing any
    leaked WAL/SHM handles; the pool is transparently recreated on the
    next DB operation.
    """
    global _last_dispose_time
    now = time.monotonic()
    if now - _last_dispose_time < _DISPOSE_INTERVAL_SECONDS:
        return
    _last_dispose_time = now
    disposed = 0
    with db_manager._connections_lock:
        # list() snapshots the items so dispose-triggered mutation of the
        # mapping cannot break iteration.
        for username, engine in list(db_manager.connections.items()):
            try:
                engine.dispose()
                disposed += 1
            except Exception:
                logger.debug(f"Error disposing engine for {username}")
    if disposed:
        logger.info(
            f"Pool dispose: reset {disposed} engine(s) to release "
            f"WAL/SHM handles"
        )


def _log_resource_stats(db_manager, protected) -> None:
    """Log FD count and pool statistics (see module header for rationale).

    Best-effort: any failure is logged at debug level and ignored, so
    monitoring can never break the cleanup job itself.
    """
    try:
        fd_count = _count_open_fds()
        pool_checked_out = 0
        with db_manager._connections_lock:
            # Read the engine count under the lock so it is consistent
            # with the checked-out total computed in the same pass.
            pool_engine_count = len(db_manager.connections)
            for engine in db_manager.connections.values():
                try:
                    pool_checked_out += engine.pool.checkedout()
                except Exception:  # noqa: silent-exception
                    pass
        logger.debug(
            f"Resource monitor: open_fds={fd_count}, "
            f"pool_engines={pool_engine_count}, "
            f"pool_checked_out={pool_checked_out}, "
            f"protected_users={len(protected)}"
        )
        if fd_count > 800:
            logger.warning(
                f"High FD count ({fd_count}) — approaching system limit. "
                f"Check for resource leaks."
            )
    except Exception:
        logger.debug("FD monitoring failed")  # noqa: silent-exception


def cleanup_idle_connections(session_manager, db_manager):
    """Close db connections for users with no active sessions and no active research.

    Also sweeps dead-thread sessions, runs the periodic pool-dispose pass,
    and emits resource-monitoring stats.

    Args:
        session_manager: The SessionManager singleton (session queries/cleanup).
        db_manager: The DatabaseManager singleton (owns per-user engines).
    """
    # 1. Purge expired sessions first
    session_manager.cleanup_expired_sessions()

    # 2. Get protected usernames (active sessions OR active research)
    active_usernames = session_manager.get_active_usernames()
    researching_usernames = get_usernames_with_active_research()
    protected = active_usernames | researching_usernames

    # 3. Get usernames with open connections
    connected_usernames = db_manager.get_connected_usernames()

    # 4. Find idle candidates
    candidates = connected_usernames - protected

    # 5. Double-check before closing (narrows race window)
    closed = 0
    for username in candidates:
        if session_manager.has_active_sessions_for(username):
            logger.debug(
                f"Skipped {username} (active session appeared since snapshot)"
            )
            continue  # User logged in since snapshot
        if username in get_usernames_with_active_research():
            logger.debug(
                f"Skipped {username} (active research appeared since snapshot)"
            )
            continue  # Research started since snapshot
        # Unregister news scheduler jobs (matches logout pattern in routes.py)
        _unregister_news_jobs(username)
        try:
            db_manager.close_user_database(username)
            session_password_store.clear_all_for_user(username)
            closed += 1
            logger.debug(f"Closed idle connection for {username}")
        except Exception:
            # Keep WARNING level but include the traceback for diagnosis.
            logger.opt(exception=True).warning(
                f"Connection cleanup failed for {username}"
            )

    if closed:
        logger.info(f"Connection cleanup: closed {closed} idle connection(s)")
    logger.debug(
        f"Connection cleanup: evaluated {len(candidates)} candidate(s), "
        f"closed {closed}, protected {len(protected)} active user(s)"
    )

    # Sweep dead-thread sessions and credentials — safety net when neither
    # HTTP requests nor the queue processor are triggering sweeps.
    cleanup_dead_threads()

    # Periodic pool dispose to release WAL/SHM handles, then FD monitoring.
    _dispose_pools_if_due(db_manager)
    _log_resource_stats(db_manager, protected)

179 

180 

def start_connection_cleanup_scheduler(
    session_manager, db_manager, interval_seconds=300
):
    """Start APScheduler job for periodic connection cleanup.

    Args:
        session_manager: The SessionManager singleton.
        db_manager: The DatabaseManager singleton.
        interval_seconds: How often to run cleanup (default: 5 minutes).

    Returns:
        The BackgroundScheduler instance (for shutdown registration).
    """
    # Jitter spreads runs out so multiple workers don't clean up in lockstep.
    job_options = {
        "seconds": interval_seconds,
        "args": [session_manager, db_manager],
        "id": "cleanup_idle_connections",
        "jitter": 30,
    }
    bg_scheduler = BackgroundScheduler()
    bg_scheduler.add_job(cleanup_idle_connections, "interval", **job_options)
    bg_scheduler.start()
    logger.info(
        f"Connection cleanup scheduler started "
        f"(interval={interval_seconds}s, jitter=30s)"
    )
    return bg_scheduler