Coverage for src / local_deep_research / web / routes / globals.py: 100%
94 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Thread-safe global state management.

Wraps two module-level dicts (`_active_research`, `_termination_flags`)
with accessor functions protected by a single ``threading.RLock``.
All external code should use the accessor functions instead of touching the
dicts directly.
"""

import threading

# ---------------------------------------------------------------------------
# Internal state — never import these directly from other modules
# ---------------------------------------------------------------------------
# Maps research_id -> mutable entry dict. The accessors in this module read
# the entry keys "progress", "status", "log", "settings" and "thread".
_active_research: dict[int, dict] = {}
# Maps research_id -> True once termination has been requested for it.
_termination_flags: dict[int, bool] = {}

# A single lock protects both dicts. They have strongly correlated lifecycles
# (entries are created/destroyed together) and operations are fast dict lookups,
# so a single lock is simpler and eliminates any deadlock risk from lock ordering.
# It is an RLock, so the thread that already holds it may acquire it again.
_lock = threading.RLock()
# ===================================================================
# active_research accessors
# ===================================================================
def is_research_active(research_id):
    """Report whether *research_id* currently has an active-research entry.

    The membership test is performed while holding the module lock.
    """
    with _lock:
        active = research_id in _active_research
    return active
def get_active_research_ids():
    """Return the IDs of all active research as a newly built list.

    The list is created while holding the module lock, so later changes to
    the underlying dict do not affect the returned value.
    """
    with _lock:
        # Unpacking a dict iterates its keys.
        return [*_active_research]
def get_active_research_snapshot(research_id):
    """Build a read-only view of one active-research entry.

    Returns ``None`` when *research_id* has no entry. Otherwise returns a new
    dict with exactly four keys:

    * ``progress`` — the stored value, or ``0`` if absent
    * ``status`` — the stored value, or ``None`` if absent
    * ``log`` — a new list with the same items as the stored log (the items
      themselves are shared, not copied)
    * ``settings`` — a shallow copy of the stored settings dict, or ``None``
      when settings is absent or ``None``

    Other entry fields (for example ``thread``) are deliberately left out.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return None
        settings = record.get("settings")
        return {
            "progress": record.get("progress", 0),
            "status": record.get("status"),
            "log": list(record.get("log", [])),
            "settings": None if settings is None else dict(settings),
        }
def get_research_field(research_id, field, default=None):
    """Look up one field of an active-research entry.

    Returns *default* when *research_id* has no entry. When the entry exists,
    returns ``entry.get(field, default)``; if that value is a ``list`` or a
    ``dict`` a shallow copy is returned instead, so callers cannot mutate
    the shared container by accident. Any other value is returned as-is.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return default
        found = record.get(field, default)
        # Only list and dict are copied. A generic copy.copy() is avoided on
        # purpose: entries may hold threading.Thread objects, and a shallow
        # copy of a Thread is a broken object sharing the same OS thread
        # ident. Immutable scalars (int, str, bool) need no copy at all.
        for container in (list, dict):
            if isinstance(found, container):
                return container(found)
        return found
def set_active_research(research_id, data):
    """Store *data* as the active-research entry for *research_id*.

    Any entry already stored under that ID is overwritten. *data* is stored
    by reference; no copy is made.
    """
    with _lock:
        _active_research[research_id] = data
def update_active_research(research_id, **fields):
    """Merge the keyword arguments into an existing active-research entry.

    Each keyword becomes (or overwrites) a key of the entry dict. If
    *research_id* has no entry the call is a silent no-op.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return
        record.update(fields)
def append_research_log(research_id, log_entry):
    """Add *log_entry* to the end of the ``log`` list of *research_id*.

    The ``log`` list is created on the entry if it does not exist yet. If
    *research_id* has no entry the call is a silent no-op.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return
        log = record.setdefault("log", [])
        log.append(log_entry)
def update_progress_if_higher(research_id, new_progress):
    """Raise the stored progress to *new_progress*, never lowering it.

    The read-compare-write sequence runs under the module lock. A
    *new_progress* of ``None`` never changes the stored value.

    Returns the progress value in effect after the call, or ``None`` when
    *research_id* has no active entry.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return None
        current = record.get("progress", 0)
        # Keep the stored value unless the candidate is strictly greater.
        if new_progress is None or not (new_progress > current):
            return current
        record["progress"] = new_progress
        return new_progress
def remove_active_research(research_id):
    """Drop the active-research entry for *research_id*.

    Does nothing when no such entry exists. The termination flag, if any, is
    not touched by this function.
    """
    with _lock:
        # The default argument makes pop() tolerate a missing key.
        _active_research.pop(research_id, None)
def iter_active_research():
    """Yield ``(research_id, snapshot)`` for every active research entry.

    This is a generator, so nothing happens until iteration starts. At that
    point all snapshots are built in one pass while holding the module lock;
    the pairs are then yielded from that pre-built list.

    Each *snapshot* is a new dict with the keys ``progress`` (default ``0``),
    ``status``, ``log`` (a new list with the same items) and ``settings`` (a
    shallow copy, or ``None``). Other entry fields such as ``thread`` are
    not included.
    """
    pairs = []
    with _lock:
        for rid, record in _active_research.items():
            settings = record.get("settings")
            snapshot = {
                "progress": record.get("progress", 0),
                "status": record.get("status"),
                "log": list(record.get("log", [])),
                "settings": None if settings is None else dict(settings),
            }
            pairs.append((rid, snapshot))
    for pair in pairs:
        yield pair
def get_active_research_count():
    """Return how many entries the active-research dict currently holds."""
    with _lock:
        count = len(_active_research)
    return count
def get_usernames_with_active_research() -> set:
    """Return the set of usernames that have research currently running.

    The username of an entry is read from ``entry["settings"]["username"]``.
    Entries whose ``settings`` is missing or ``None``, and entries whose
    username is missing or falsy, contribute nothing to the result.
    """
    with _lock:
        usernames = set()
        for entry in _active_research.values():
            # ``settings`` may be stored as None (the snapshot accessors in
            # this module guard against that as well), so fall back to an
            # empty dict instead of calling .get() on None.
            settings = entry.get("settings") or {}
            username = settings.get("username")
            if username:
                usernames.add(username)
        return usernames
# ===================================================================
# termination_flags accessors
# ===================================================================
def is_termination_requested(research_id):
    """Return the termination flag stored for *research_id*.

    Yields ``False`` when no flag has been stored for that ID.
    """
    with _lock:
        requested = _termination_flags.get(research_id, False)
    return requested
def set_termination_flag(research_id):
    """Record a termination request for *research_id*.

    Sets the flag to ``True`` whether or not a flag already exists.
    """
    with _lock:
        _termination_flags[research_id] = True
def clear_termination_flag(research_id):
    """Discard the termination flag of *research_id*.

    Does nothing when no flag is stored for that ID.
    """
    with _lock:
        # The default argument makes pop() tolerate a missing key.
        _termination_flags.pop(research_id, None)
# ===================================================================
# Compound / cleanup helpers
# ===================================================================
def is_research_thread_alive(research_id):
    """Tell whether the thread stored for *research_id* is still running.

    Returns ``False`` when *research_id* has no active entry or when the
    entry has no ``thread`` value; otherwise returns the result of the
    stored thread's ``is_alive()``.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return False
        worker = record.get("thread")
        if worker is None:
            return False
        return worker.is_alive()
def update_progress_and_check_active(research_id, new_progress):
    """Raise the stored progress if possible and report whether the ID is active.

    Both steps happen under one acquisition of the module lock. The stored
    progress is replaced only when *new_progress* is not ``None`` and is
    strictly greater than the current value.

    Returns a ``(progress_value, is_active)`` tuple: ``(None, False)`` when
    *research_id* has no active entry, otherwise the progress in effect
    after the call paired with ``True``.
    """
    with _lock:
        record = _active_research.get(research_id)
        if record is None:
            return (None, False)
        progress = record.get("progress", 0)
        if new_progress is not None and new_progress > progress:
            progress = record["progress"] = new_progress
        return (progress, True)
def cleanup_research(research_id):
    """Forget everything stored for *research_id*.

    Removes the ID from the active-research dict and from the
    termination-flag dict within a single acquisition of the shared lock.
    Missing keys are ignored in both dicts.
    """
    with _lock:
        for registry in (_active_research, _termination_flags):
            registry.pop(research_id, None)