Coverage for src/local_deep_research/web/routes/globals.py: 99%
123 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
Thread-safe global state management.

Wraps two module-level dicts (`_active_research`, `_termination_flags`)
with accessor functions protected by a single ``threading.RLock``.
All external code should use the accessor functions instead of touching the
dicts directly.
8"""
10import threading
# ---------------------------------------------------------------------------
# Private module state. Other modules must go through the accessor
# functions below and never import these names directly.
# ---------------------------------------------------------------------------
_active_research: dict[int, dict] = {}
_termination_flags: dict[int, bool] = {}

# Both dicts share one re-entrant lock. Their entries are created and
# destroyed together and every guarded operation is a quick dict access, so
# one lock keeps things simple and rules out lock-ordering deadlocks.
_lock = threading.RLock()
# ===================================================================
# active_research accessors
# ===================================================================
def is_research_active(research_id):
    """Report whether *research_id* currently has an active-research entry.

    The membership test runs while holding the shared module lock.
    """
    with _lock:
        active = research_id in _active_research
    return active
def get_active_research_ids():
    """Return the IDs of all active research entries as a new list.

    The list is built under the shared lock, so it is a point-in-time
    snapshot that later registrations or removals do not affect.
    """
    with _lock:
        ids = list(_active_research)
    return ids
def get_active_research_snapshot(research_id):
    """Build a detached snapshot of one active-research entry.

    Returns ``None`` when *research_id* has no entry. Otherwise returns a
    new dict holding only the ``progress``, ``status``, ``log`` and
    ``settings`` fields; other keys of the entry (such as ``thread``) are
    left out. ``log`` is copied into a new list and ``settings`` into a new
    dict (or ``None`` when the entry has no settings). Both copies are
    shallow.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return None
        settings = entry.get("settings")
        return {
            "progress": entry.get("progress", 0),
            "status": entry.get("status"),
            "log": list(entry.get("log", [])),
            "settings": None if settings is None else dict(settings),
        }
def get_research_field(research_id, field, default=None):
    """Look up one field of an active-research entry.

    Returns *default* when *research_id* has no entry. When the entry
    exists, the value stored under *field* (or *default* if the key is
    missing) is returned. ``list`` and ``dict`` values are returned as
    shallow copies so callers cannot mutate shared state by accident; any
    other value is returned as the stored object itself.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return default
        value = entry.get(field, default)
        # Only the two built-in container types are copied. A generic
        # copy.copy() is deliberately avoided because an entry may hold a
        # thread object under "thread", which must not be duplicated.
        for container_type in (list, dict):
            if isinstance(value, container_type):
                return container_type(value)
        return value
def set_active_research(research_id, data):
    """Store *data* as the active-research entry for *research_id*.

    Any existing entry under the same ID is overwritten. *data* is stored
    as the same object, without copying.
    """
    with _lock:
        _active_research.update({research_id: data})
def check_and_start_research(research_id, data) -> bool:
    """Start and register a research thread unless one is already running.

    *data* must be a dict with a non-``None`` ``"thread"`` value; otherwise
    ``ValueError`` is raised before the lock is taken.

    Under the shared lock, the existing entry for *research_id* (if any) is
    inspected. If its ``thread`` reports ``is_alive()``, nothing is changed,
    nothing is started, and ``False`` is returned. Otherwise
    ``data["thread"].start()`` is called, *data* becomes the entry for
    *research_id*, and ``True`` is returned.

    Because the liveness check, the start, and the registration all happen
    while the lock is held, two concurrent callers using the same ID cannot
    both get past the check.
    """
    new_thread = None
    if isinstance(data, dict):
        new_thread = data.get("thread")
    if new_thread is None:
        raise ValueError("data must contain a 'thread' entry")

    with _lock:
        current = _active_research.get(research_id)
        running = None if current is None else current.get("thread")
        if running is not None and running.is_alive():
            return False
        # Start first, register second: if start() raises, no entry is
        # written for this ID.
        new_thread.start()
        _active_research[research_id] = data
        return True
def update_active_research(research_id, **fields):
    """Set the given keyword *fields* on the entry for *research_id*.

    Does nothing, and raises nothing, when *research_id* has no entry.
    Existing keys are overwritten and new keys are added.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return
        for key, value in fields.items():
            entry[key] = value
def append_research_log(research_id, log_entry):
    """Add *log_entry* to the end of the ``log`` list of *research_id*.

    A new ``log`` list is created on the entry if it has none yet. Does
    nothing when *research_id* has no entry.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return
        if "log" not in entry:
            entry["log"] = []
        entry["log"].append(log_entry)
def update_progress_if_higher(research_id, new_progress):
    """Raise the stored progress to *new_progress*, never lowering it.

    The comparison and the write happen under the shared lock. A missing
    ``progress`` key counts as ``0``.

    Returns ``None`` when *research_id* has no entry. Otherwise returns the
    progress value in effect after the call: *new_progress* if it was
    stored, else the unchanged current value (also the case when
    *new_progress* is ``None``).
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return None
        current = entry.get("progress", 0)
        if new_progress is None or not (new_progress > current):
            return current
        entry["progress"] = new_progress
        return new_progress
def remove_active_research(research_id):
    """Drop the active-research entry for *research_id*.

    A no-op when no such entry exists. Termination flags are not touched.
    """
    with _lock:
        if research_id in _active_research:
            del _active_research[research_id]
def iter_active_research():
    """Yield ``(research_id, snapshot)`` for every active research entry.

    This is a generator. All snapshots are built in one pass under the
    shared lock when iteration starts, and are then yielded with the lock
    released. Entries added or removed afterwards are not reflected.

    Each *snapshot* is a new dict holding only ``progress`` (default ``0``),
    ``status``, ``log`` (copied into a new list) and ``settings`` (copied
    into a new dict, or ``None``). Other keys of the entry, such as
    ``thread``, are not included.
    """
    snapshots = []
    with _lock:
        for rid, entry in _active_research.items():
            settings = entry.get("settings")
            snapshots.append(
                (
                    rid,
                    {
                        "progress": entry.get("progress", 0),
                        "status": entry.get("status"),
                        "log": list(entry.get("log", [])),
                        "settings": None if settings is None else dict(settings),
                    },
                )
            )
    yield from snapshots
def get_active_research_count():
    """Return how many entries the active-research dict currently holds."""
    with _lock:
        count = len(_active_research)
    return count
def get_usernames_with_active_research() -> set:
    """Return the set of usernames found in active-research settings.

    Each entry's ``settings["username"]`` is collected while holding the
    shared lock. Entries are skipped when they have no ``settings`` key,
    when ``settings`` is ``None`` or empty, or when the username is missing
    or falsy.

    Returns:
        A new ``set`` of the truthy usernames; empty if there are none.
    """
    usernames = set()
    with _lock:
        for entry in _active_research.values():
            # ``settings`` may be stored explicitly as None (the snapshot
            # helpers in this module handle that case), so a plain
            # ``.get("settings", {})`` default is not enough here.
            settings = entry.get("settings") or {}
            username = settings.get("username")
            if username:
                usernames.add(username)
    return usernames
# ===================================================================
# termination_flags accessors
# ===================================================================
def is_termination_requested(research_id):
    """Return the termination flag stored for *research_id*.

    Returns ``False`` when no flag has been recorded for that ID.
    """
    with _lock:
        if research_id in _termination_flags:
            return _termination_flags[research_id]
        return False
def set_termination_flag(research_id):
    """Record that *research_id* has been asked to terminate.

    Stores ``True`` in the termination-flags dict under the shared lock,
    overwriting any previous value.
    """
    with _lock:
        _termination_flags.update({research_id: True})
def clear_termination_flag(research_id):
    """Delete the termination flag of *research_id*.

    A no-op when no flag is stored for that ID.
    """
    with _lock:
        if research_id in _termination_flags:
            del _termination_flags[research_id]
# ===================================================================
# Compound / cleanup helpers
# ===================================================================
def is_research_thread_alive(research_id):
    """Tell whether the thread registered for *research_id* is still alive.

    Returns ``False`` when there is no entry for *research_id* or the entry
    has no ``thread``. Otherwise returns the result of the thread's
    ``is_alive()``, queried while holding the shared lock.
    """
    with _lock:
        entry = _active_research.get(research_id)
        worker = None if entry is None else entry.get("thread")
        if worker is None:
            return False
        return worker.is_alive()
def update_progress_and_check_active(research_id, new_progress):
    """Raise the stored progress if possible and report whether it is active.

    Both steps run in a single critical section under the shared lock. A
    missing ``progress`` key counts as ``0`` and progress is never lowered.

    Returns a ``(progress_value, is_active)`` tuple:

    * ``(None, False)`` when *research_id* has no entry;
    * ``(new_progress, True)`` when *new_progress* is not ``None`` and is
      greater than the current value (it is stored on the entry);
    * ``(current, True)`` otherwise, leaving the entry unchanged.
    """
    with _lock:
        entry = _active_research.get(research_id)
        if entry is None:
            return (None, False)
        current = entry.get("progress", 0)
        should_raise = new_progress is not None and new_progress > current
        if not should_raise:
            return (current, True)
        entry["progress"] = new_progress
        return (new_progress, True)
def cleanup_research(research_id):
    """Forget everything this module tracks about *research_id*.

    Removes the ID from both the active-research dict and the
    termination-flags dict within one critical section of the shared lock.
    IDs missing from either dict are ignored.
    """
    with _lock:
        for registry in (_active_research, _termination_flags):
            registry.pop(research_id, None)
def reclaim_stale_user_active_research(
    db_session, username, *, grace_cutoff_dt=None, logger=None
):
    """Flip ``UserActiveResearch`` rows whose worker thread is dead.

    Shared between ``research_routes.start_research`` (no grace window)
    and ``chat.routes.send_message`` (30-second grace window). Both sites
    used to carry their own inline copy of the same query / status flip /
    cleanup pattern; keeping it here makes the one real difference (grace
    vs. no grace) an explicit parameter.

    For every ``IN_PROGRESS`` row of *username* whose research thread is
    not alive according to ``is_research_thread_alive``, the row's status
    is set to ``ResearchStatus.FAILED`` and ``cleanup_research`` is called
    for its research ID.

    The helper does NOT commit — callers compose this with surrounding
    DB writes and commit once at the end.

    Args:
        db_session: open SQLAlchemy session against the user's DB.
        username: the per-user DB owner (used to scope the query).
        grace_cutoff_dt: optional ``datetime`` boundary; rows whose
            ``started_at`` is at-or-after this are skipped (avoids
            killing a sibling request's just-spawned thread). ``None``
            disables the grace filter.
        logger: optional ``loguru``-style logger. When given, one WARNING
            audit line is emitted per reclaimed row so operators can
            trace why an active-research cap was released.

    Returns:
        ``True`` if any row was flipped (caller should commit), else
        ``False``.
    """
    # Lazy import to avoid pulling SQLAlchemy at module import time;
    # this helper is only called from the route paths that already have
    # the ORM models in scope.
    from ...constants import ResearchStatus
    from ...database.models import UserActiveResearch

    query = db_session.query(UserActiveResearch).filter(
        UserActiveResearch.username == username,
        UserActiveResearch.status == ResearchStatus.IN_PROGRESS,
    )
    if grace_cutoff_dt is not None:
        query = query.filter(UserActiveResearch.started_at < grace_cutoff_dt)

    reclaimed = False
    for row in query.all():
        if is_research_thread_alive(row.research_id):
            continue
        if logger is not None:
            logger.warning(
                "Reclaiming stale UserActiveResearch {short_id}... "
                "(thread dead) for user {user}",
                # str() keeps the audit line from raising on a non-string
                # ID (this module annotates research IDs as int keys);
                # a failure here would abort the loop after earlier rows
                # had already been flipped.
                short_id=str(row.research_id)[:8],
                user=username,
            )
        row.status = ResearchStatus.FAILED
        cleanup_research(row.research_id)
        reclaimed = True
    return reclaimed