Coverage for src / local_deep_research / web / routes / globals.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Thread-safe global state management. 

3 

4Wraps two module-level dicts (`_active_research`, `_termination_flags`) 

5with accessor functions protected by a single ``threading.RLock``. 

6All external code should use the accessor functions instead of touching the 

7dicts directly. 

8""" 

9 

10import threading 

11 

12# --------------------------------------------------------------------------- 

13# Internal state — never import these directly from other modules 

14# --------------------------------------------------------------------------- 

15_active_research: dict[int, dict] = {} 

16_termination_flags: dict[int, bool] = {} 

17 

18# A single lock protects both dicts. They have strongly correlated lifecycles 

19# (entries are created/destroyed together) and operations are fast dict lookups, 

20# so a single lock is simpler and eliminates any deadlock risk from lock ordering. 

21_lock = threading.RLock() 

22 

23 

24# =================================================================== 

25# active_research accessors 

26# =================================================================== 

27 

28 

29def is_research_active(research_id): 

30 """Return True if *research_id* is in the active-research dict.""" 

31 with _lock: 

32 return research_id in _active_research 

33 

34 

35def get_active_research_ids(): 

36 """Return a list of all active research IDs (snapshot).""" 

37 with _lock: 

38 return list(_active_research.keys()) 

39 

40 

41def get_active_research_snapshot(research_id): 

42 """Return a safe snapshot of an active-research entry, or ``None``. 

43 

44 The returned dict contains only serialisable fields (``thread`` is 

45 excluded). The ``log`` list is shallow-copied — individual entries 

46 are never mutated after creation, so this is safe. 

47 """ 

48 with _lock: 

49 entry = _active_research.get(research_id) 

50 if entry is None: 

51 return None 

52 return { 

53 "progress": entry.get("progress", 0), 

54 "status": entry.get("status"), 

55 "log": list(entry.get("log", [])), 

56 "settings": dict(s) 

57 if (s := entry.get("settings")) is not None 

58 else None, 

59 } 

60 

61 

62def get_research_field(research_id, field, default=None): 

63 """Return a single field from an active-research entry. 

64 

65 For mutable fields (``list``, ``dict``) a shallow copy is returned so 

66 callers cannot accidentally mutate shared state. 

67 """ 

68 with _lock: 

69 entry = _active_research.get(research_id) 

70 if entry is None: 

71 return default 

72 value = entry.get(field, default) 

73 # We explicitly check for list/dict rather than using copy.copy() 

74 # because entries may contain threading.Thread objects — 

75 # copy.copy(Thread) creates a broken shallow copy sharing the same 

76 # OS thread ident. Scalars (int, str, bool) are immutable and safe. 

77 # For Thread objects, access them via iter_active_research() snapshots. 

78 if isinstance(value, list): 

79 return list(value) 

80 if isinstance(value, dict): 

81 return dict(value) 

82 return value 

83 

84 

85def set_active_research(research_id, data): 

86 """Insert or replace the active-research entry for *research_id*.""" 

87 with _lock: 

88 _active_research[research_id] = data 

89 

90 

91def update_active_research(research_id, **fields): 

92 """Update one or more fields on an existing entry. 

93 

94 Silently does nothing if *research_id* is not active. 

95 """ 

96 with _lock: 

97 entry = _active_research.get(research_id) 

98 if entry is not None: 

99 entry.update(fields) 

100 

101 

102def append_research_log(research_id, log_entry): 

103 """Append *log_entry* to the ``log`` list for *research_id*. 

104 

105 Silently does nothing if *research_id* is not active. 

106 """ 

107 with _lock: 

108 entry = _active_research.get(research_id) 

109 if entry is not None: 

110 entry.setdefault("log", []).append(log_entry) 

111 

112 

113def update_progress_if_higher(research_id, new_progress): 

114 """Atomically update progress only if *new_progress* exceeds current. 

115 

116 Returns the resulting progress value, or ``None`` if the research is 

117 not active. 

118 """ 

119 with _lock: 

120 entry = _active_research.get(research_id) 

121 if entry is None: 

122 return None 

123 current = entry.get("progress", 0) 

124 if new_progress is not None and new_progress > current: 

125 entry["progress"] = new_progress 

126 return new_progress 

127 return current 

128 

129 

130def remove_active_research(research_id): 

131 """Remove *research_id* from the active-research dict (if present).""" 

132 with _lock: 

133 _active_research.pop(research_id, None) 

134 

135 

136def iter_active_research(): 

137 """Yield ``(research_id, snapshot)`` pairs for all active research. 

138 

139 Each *snapshot* is a shallow copy of the entry dict, safe to read 

140 outside the lock. 

141 """ 

142 with _lock: 

143 items = [ 

144 ( 

145 rid, 

146 { 

147 "progress": entry.get("progress", 0), 

148 "status": entry.get("status"), 

149 "log": list(entry.get("log", [])), 

150 "settings": dict(s) 

151 if (s := entry.get("settings")) is not None 

152 else None, 

153 }, 

154 ) 

155 for rid, entry in _active_research.items() 

156 ] 

157 for rid, data in items: 

158 yield rid, data 

159 

160 

161def get_active_research_count(): 

162 """Return the number of active research entries.""" 

163 with _lock: 

164 return len(_active_research) 

165 

166 

167def get_usernames_with_active_research() -> set: 

168 """Return set of usernames that have research currently running.""" 

169 with _lock: 

170 return { 

171 entry.get("settings", {}).get("username") 

172 for entry in _active_research.values() 

173 if entry.get("settings", {}).get("username") 

174 } 

175 

176 

177# =================================================================== 

178# termination_flags accessors 

179# =================================================================== 

180 

181 

182def is_termination_requested(research_id): 

183 """Return ``True`` if termination was requested for *research_id*.""" 

184 with _lock: 

185 return _termination_flags.get(research_id, False) 

186 

187 

188def set_termination_flag(research_id): 

189 """Signal that *research_id* should be terminated.""" 

190 with _lock: 

191 _termination_flags[research_id] = True 

192 

193 

194def clear_termination_flag(research_id): 

195 """Remove the termination flag for *research_id* (if present).""" 

196 with _lock: 

197 _termination_flags.pop(research_id, None) 

198 

199 

200# =================================================================== 

201# Compound / cleanup helpers 

202# =================================================================== 

203 

204 

205def is_research_thread_alive(research_id): 

206 """Return ``True`` if the research thread for *research_id* is alive. 

207 

208 Returns ``False`` if the research is not active or has no thread. 

209 """ 

210 with _lock: 

211 entry = _active_research.get(research_id) 

212 if entry is None: 

213 return False 

214 thread = entry.get("thread") 

215 return thread is not None and thread.is_alive() 

216 

217 

218def update_progress_and_check_active(research_id, new_progress): 

219 """Atomically update progress (if higher) and check if research is active. 

220 

221 Returns ``(progress_value, is_active)`` where *progress_value* is the 

222 resulting progress (or ``None`` if not active) and *is_active* indicates 

223 whether *research_id* is still in the active-research dict. 

224 """ 

225 with _lock: 

226 entry = _active_research.get(research_id) 

227 if entry is None: 

228 return (None, False) 

229 current = entry.get("progress", 0) 

230 if new_progress is not None and new_progress > current: 

231 entry["progress"] = new_progress 

232 return (new_progress, True) 

233 return (current, True) 

234 

235 

236def cleanup_research(research_id): 

237 """Remove *research_id* from both active_research and termination_flags 

238 atomically under the single shared lock. 

239 """ 

240 with _lock: 

241 _active_research.pop(research_id, None) 

242 _termination_flags.pop(research_id, None)