Coverage for src/local_deep_research/web/routes/globals.py: 99%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

"""
Thread-safe global state management.

Wraps two module-level dicts (``_active_research``, ``_termination_flags``)
with accessor functions protected by a single ``threading.RLock``.
All external code should use the accessor functions instead of touching the
dicts directly.
"""

import threading

# ---------------------------------------------------------------------------
# Internal state — never import these directly from other modules
# ---------------------------------------------------------------------------
# Both dicts are keyed by research ID. IDs are strings:
# ``reclaim_stale_user_active_research`` slices them (``research_id[:8]``).
_active_research: dict[str, dict] = {}
_termination_flags: dict[str, bool] = {}

# A single lock protects both dicts. They have strongly correlated lifecycles
# (entries are created/destroyed together) and operations are fast dict lookups,
# so a single lock is simpler and eliminates any deadlock risk from lock ordering.
# RLock is re-entrant: code already holding it may call other accessors in
# this module without deadlocking itself.
_lock = threading.RLock()

22 

23 

24# =================================================================== 

25# active_research accessors 

26# =================================================================== 

27 

28 

def is_research_active(research_id):
    """Report whether *research_id* currently has an active-research entry."""
    with _lock:
        present = research_id in _active_research
    return present

33 

34 

def get_active_research_ids():
    """Return a new list of every active research ID.

    The list is built while holding the lock, so it is a consistent
    point-in-time snapshot that callers may freely modify.
    """
    with _lock:
        ids = [*_active_research]
    return ids

39 

40 

def get_active_research_snapshot(research_id):
    """Copy the public fields of one active-research entry.

    Returns ``None`` when *research_id* has no entry. Otherwise returns a new
    dict with these keys:

    * ``progress``: defaults to ``0``.
    * ``status``: defaults to ``None``.
    * ``log``: a new list holding the same log-entry objects. The entries
      themselves are not copied.
    * ``settings``: a shallow dict copy, or ``None`` if the entry has none.

    The ``thread`` field is deliberately excluded. All reads happen under
    ``_lock``, so the snapshot is internally consistent.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return None
        progress = record.get("progress", 0)
        status = record.get("status")
        log_copy = list(record.get("log", []))
        raw_settings = record.get("settings")
        settings_copy = None if raw_settings is None else dict(raw_settings)
        return {
            "progress": progress,
            "status": status,
            "log": log_copy,
            "settings": settings_copy,
        }

60 

61 

def get_research_field(research_id, field, default=None):
    """Return a single field from an active-research entry.

    Returns *default* when *research_id* is not active or its entry has no
    *field*. ``list`` and ``dict`` values get a shallow copy, so callers
    cannot mutate shared state through the result. Every other value
    (scalars, ``threading.Thread`` objects, ...) is returned as-is.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return default
        value = entry.get(field, default)
        # Copy only list/dict rather than using copy.copy(): entries may
        # hold threading.Thread objects, and a shallow copy of a Thread is
        # not a usable handle to the running thread. Thread objects are
        # therefore returned as the shared original. To test liveness,
        # prefer is_research_thread_alive(), which checks under the lock.
        # Note: iter_active_research() snapshots do NOT include the thread.
        if isinstance(value, list):
            return list(value)
        if isinstance(value, dict):
            return dict(value)
        return value

83 

84 

def set_active_research(research_id, data):
    """Store *data* as the entry for *research_id*, overwriting any old one.

    *data* is stored by reference, not copied.
    """
    with _lock:
        _active_research.update({research_id: data})

89 

90 

def check_and_start_research(research_id, data) -> bool:
    """Start ``data['thread']`` and register *data*, unless a live run exists.

    If the current entry for *research_id* has a thread whose ``is_alive()``
    is true, returns ``False``. In that case state is left untouched and the
    new thread is not started. Otherwise starts the new thread, stores *data*
    as the entry for *research_id*, and returns ``True``.

    The liveness check, ``start()`` and the store all happen while holding
    ``_lock``. Two concurrent callers with the same *research_id* therefore
    cannot both pass the check and launch duplicate threads.

    Raises:
        ValueError: if *data* is not a dict or has no non-``None``
            ``thread`` entry.
    """
    new_thread = data.get("thread") if isinstance(data, dict) else None
    if new_thread is None:
        raise ValueError("data must contain a 'thread' entry")

    with _lock:
        current = _active_research.get(research_id)
        current_thread = None if current is None else current.get("thread")
        if current_thread is not None and current_thread.is_alive():
            return False
        # Start before storing: if start() raises, nothing is registered.
        # Any accessor the new thread calls blocks on _lock until the store
        # below has completed.
        new_thread.start()
        _active_research[research_id] = data
    return True

115 

116 

def update_active_research(research_id, **fields):
    """Merge the keyword *fields* into the entry for *research_id*.

    Does nothing when *research_id* has no active entry.
    """
    with _lock:
        if (record := _active_research.get(research_id)) is not None:
            record.update(fields)

126 

127 

def append_research_log(research_id, log_entry):
    """Add *log_entry* to the end of the ``log`` list for *research_id*.

    Creates the ``log`` list on first use. Does nothing when *research_id*
    has no active entry.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return
        entries = record.setdefault("log", [])
        entries.append(log_entry)

137 

138 

def update_progress_if_higher(research_id, new_progress):
    """Raise the stored progress to *new_progress* if it is strictly higher.

    Read-compare-write happens under ``_lock``, so progress never moves
    backwards because of interleaved updates. A ``None`` *new_progress*
    never changes the stored value.

    Returns:
        The progress value after the call (missing progress counts as ``0``),
        or ``None`` when *research_id* is not active.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return None
        stored = record.get("progress", 0)
        if new_progress is None or not (new_progress > stored):
            return stored
        record["progress"] = new_progress
        return new_progress

154 

155 

def remove_active_research(research_id):
    """Drop the entry for *research_id*. Missing IDs are ignored."""
    with _lock:
        if research_id in _active_research:
            del _active_research[research_id]

160 

161 

def iter_active_research():
    """Yield ``(research_id, snapshot)`` pairs for all active research.

    Each *snapshot* is a new dict holding only these keys:

    * ``progress``: defaults to ``0``.
    * ``status``
    * ``log``: a new list.
    * ``settings``: a shallow dict copy, or ``None``.

    Other entry fields, notably ``thread``, are NOT included.

    This is a generator. The lock is taken when iteration begins (first
    ``next()``), and all snapshots are built under it. The lock is released
    before the first pair is yielded, so callers never hold ``_lock`` while
    consuming the results.
    """
    with _lock:
        items = [
            (
                rid,
                {
                    "progress": entry.get("progress", 0),
                    "status": entry.get("status"),
                    "log": list(entry.get("log", [])),
                    "settings": dict(s)
                    if (s := entry.get("settings")) is not None
                    else None,
                },
            )
            for rid, entry in _active_research.items()
        ]
    # Yield outside the lock: consumers may call other accessors freely.
    yield from items

185 

186 

def get_active_research_count():
    """Return how many research entries are currently active."""
    with _lock:
        total = len(_active_research)
    return total

191 

192 

def get_usernames_with_active_research() -> set:
    """Return the set of usernames that currently have active research.

    Entries are skipped when their ``settings`` is missing or ``None``, or
    when the settings have no truthy ``username``.
    """
    usernames = set()
    with _lock:
        for entry in _active_research.values():
            # ``settings`` may be stored as None (get_active_research_snapshot
            # handles that case too). ``entry.get("settings", {})`` would
            # return that None and crash on ``.get("username")``.
            username = (entry.get("settings") or {}).get("username")
            if username:
                usernames.add(username)
    return usernames

201 

202 

203# =================================================================== 

204# termination_flags accessors 

205# =================================================================== 

206 

207 

def is_termination_requested(research_id):
    """Report whether termination has been requested for *research_id*.

    Returns the stored flag, or ``False`` when no flag has been set.
    """
    with _lock:
        flagged = _termination_flags.get(research_id, False)
    return flagged

212 

213 

def set_termination_flag(research_id):
    """Mark *research_id* as requested to terminate."""
    with _lock:
        _termination_flags.update({research_id: True})

218 

219 

def clear_termination_flag(research_id):
    """Forget any termination request for *research_id*.

    Missing IDs are ignored.
    """
    with _lock:
        if research_id in _termination_flags:
            del _termination_flags[research_id]

224 

225 

226# =================================================================== 

227# Compound / cleanup helpers 

228# =================================================================== 

229 

230 

def is_research_thread_alive(research_id):
    """Report whether the worker thread stored for *research_id* is running.

    Returns ``False`` when *research_id* has no entry or its entry has no
    ``thread``. Otherwise returns the thread's ``is_alive()`` result. The
    check runs while holding ``_lock``.
    """
    with _lock:
        record = _active_research.get(research_id)
        worker = None if record is None else record.get("thread")
        return worker is not None and worker.is_alive()

242 

243 

def update_progress_and_check_active(research_id, new_progress):
    """Raise progress (if higher) and report activity in one atomic step.

    Returns ``(progress_value, is_active)``:

    * If *research_id* has no entry: ``(None, False)``.
    * Otherwise: ``(value, True)``, where *value* is the progress after the
      update, with the same semantics as ``update_progress_if_higher``.

    ``_lock`` is re-entrant, so delegating to ``update_progress_if_higher``
    while already holding it is safe and keeps the whole operation atomic.
    """
    with _lock:
        if _active_research.get(research_id) is None:
            return (None, False)
        return (update_progress_if_higher(research_id, new_progress), True)

260 

261 

def cleanup_research(research_id):
    """Drop *research_id* from both the active-research and termination dicts.

    Both removals happen under one ``_lock`` acquisition, so no other thread
    can observe one dict cleaned and the other not. Missing IDs are ignored.
    """
    with _lock:
        for table in (_active_research, _termination_flags):
            table.pop(research_id, None)

269 

270 

def reclaim_stale_user_active_research(
    db_session, username, *, grace_cutoff_dt=None, logger=None
):
    """Flip ``UserActiveResearch`` rows whose worker thread is dead.

    Shared between ``research_routes.start_research`` (no grace window)
    and ``chat.routes.send_message`` (30-second grace window). Both sites
    historically iterated the same query / status flip / cleanup pattern
    inline. Consolidating here makes the difference (grace vs. no grace)
    explicit instead of leaving two near-duplicate copies to drift apart.

    The helper does NOT commit. Callers compose this with surrounding
    DB writes and commit once at the end.

    For each candidate row, the in-memory liveness check and the in-memory
    cleanup happen under a single ``_lock`` hold. A concurrent
    ``check_and_start_research`` therefore cannot register a fresh live
    thread for the same ID in between and then have its entry removed.

    Args:
        db_session: open SQLAlchemy session against the user's DB.
        username: the per-user DB owner (used to scope the query).
        grace_cutoff_dt: optional ``datetime`` boundary. Rows whose
            ``started_at`` is at-or-after this are skipped (avoids
            killing a sibling request's just-spawned thread). ``None``
            disables the grace filter, matching the
            ``research_routes.start_research`` original behaviour.
        logger: optional ``loguru``-style logger for the reclaim
            audit line, logged at WARNING so operators can trace why an
            active-research cap was released.

    Returns:
        ``True`` if any row was flipped (caller should commit), else
        ``False``.
    """
    # Lazy import to avoid pulling SQLAlchemy at module import time;
    # this helper is only called from the route paths that already have
    # the ORM models in scope.
    from ...constants import ResearchStatus
    from ...database.models import UserActiveResearch

    query = db_session.query(UserActiveResearch).filter(
        UserActiveResearch.username == username,
        UserActiveResearch.status == ResearchStatus.IN_PROGRESS,
    )
    if grace_cutoff_dt is not None:
        query = query.filter(UserActiveResearch.started_at < grace_cutoff_dt)

    reclaimed = False
    for row in query.all():
        research_id = row.research_id
        # Check-then-cleanup must be atomic (see docstring). _lock is an
        # RLock, so the nested acquisitions inside the helpers are safe.
        with _lock:
            if is_research_thread_alive(research_id):
                continue
            cleanup_research(research_id)
        if logger is not None:
            logger.warning(
                "Reclaiming stale UserActiveResearch {short_id}... "
                "(thread dead) for user {user}",
                # str() keeps the audit line working even if an ID is not
                # a string (slicing an int would raise TypeError).
                short_id=str(research_id)[:8],
                user=username,
            )
        row.status = ResearchStatus.FAILED
        reclaimed = True
    return reclaimed