Coverage for src / local_deep_research / utilities / threading_utils.py: 100%
42 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import threading
2import uuid
3from functools import wraps
4from typing import Any, Callable, Hashable, Tuple
6from cachetools import cached, keys
7from flask import current_app, g
8from flask.ctx import AppContext
9from loguru import logger
# Per-thread storage. `thread_specific_cache` lazily stores a unique
# `thread_id` attribute here the first time a thread computes a cache key.
g_thread_local_store = threading.local()
def thread_specific_cache(*args: Any, **kwargs: Any) -> Callable:
    """
    Thread-scoped variant of `cached()`: every cache key is prefixed with an
    ID unique to the calling thread, so an entry can only be hit from the
    thread that created it.

    Args:
        *args: Will be forwarded to `cached()`.
        **kwargs: Will be forwarded to `cached()`.

    Returns:
        The wrapped function.

    """

    def _key_func(*call_args: Any, **call_kwargs: Any) -> Tuple[Hashable, ...]:
        # Key derived purely from the call arguments.
        argument_key = keys.hashkey(*call_args, **call_kwargs)

        try:
            # Reuse the ID this thread was given on an earlier call.
            owner_id = g_thread_local_store.thread_id
        except AttributeError:
            # First call on this thread: mint an ID and remember it.
            owner_id = uuid.uuid4().hex
            g_thread_local_store.thread_id = owner_id

        # Prefix the argument key with the thread's ID.
        return (owner_id,) + argument_key

    return cached(*args, key=_key_func, **kwargs)
44def thread_with_app_context(to_wrap: Callable) -> Callable:
45 """
46 Decorator that wraps the entry point to a thread and injects the current
47 app context from Flask. This is useful when we want to use multiple
48 threads to handle a single request.
50 When using this wrapped function, `current_app.app_context()` should be
51 passed as the first argument when initializing the thread.
53 Args:
54 to_wrap: The function to wrap.
56 Returns:
57 The wrapped function.
59 """
61 @wraps(to_wrap)
62 def _run_with_context(
63 app_context: AppContext | None, *args: Any, **kwargs: Any
64 ) -> Any:
65 if app_context is None:
66 # Do nothing.
67 return to_wrap(*args, **kwargs)
69 with app_context:
70 return to_wrap(*args, **kwargs)
72 return _run_with_context
def thread_context() -> AppContext | None:
    """
    Creates a new app context for a thread that is being spawned to handle
    the current request. Will copy all the global data (`g`) from the current
    context into the new one.

    The new context is entered only long enough to populate its `g`; it is
    returned un-entered, so the spawned thread is expected to enter it itself
    (for example via `thread_with_app_context`).

    Returns:
        The new context, or None if no context is active.

    """
    # Snapshot the current globals (values are copied by reference).
    try:
        global_data = {key: g.get(key) for key in g}
    except (TypeError, RuntimeError):
        # Context is not initialized. Without an app context the `g` proxy
        # cannot be iterated: CPython reports that as a TypeError, while the
        # proxy's own "outside of application context" error is a
        # RuntimeError. Treat either as "nothing to copy".
        global_data = {}

    try:
        context = current_app.app_context()
    except RuntimeError:
        # Context is not initialized.
        logger.debug("No current app context, not passing to thread.")
        return None

    # Enter the new context just long enough to seed its `g` with the copy.
    with context:
        for key, value in global_data.items():
            setattr(g, key, value)

    return context