Coverage for src / local_deep_research / utilities / threading_utils.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1import threading 

2import uuid 

3from functools import wraps 

4from typing import Any, Callable, Hashable, Tuple 

5 

6from cachetools import cached, keys 

7from flask import current_app, g 

8from flask.ctx import AppContext 

9from loguru import logger 

10 

# Per-thread storage. `thread_specific_cache` keeps each thread's unique cache
# ID in the `thread_id` attribute of this object.
g_thread_local_store = threading.local()

12 

13 

def thread_specific_cache(*args: Any, **kwargs: Any) -> Callable:
    """
    Build a `cached()` decorator whose entries are private to each thread.

    Every cache key is prefixed with an ID that is unique to the calling
    thread, so a value stored by one thread is never served to another.

    Args:
        *args: Positional arguments passed straight through to `cached()`.
        **kwargs: Keyword arguments passed straight through to `cached()`.

    Returns:
        Whatever `cached()` returns when given the thread-aware key function.

    """

    def _key_func(*pos: Any, **named: Any) -> Tuple[Hashable, ...]:
        call_key = keys.hashkey(*pos, **named)

        try:
            # Reuse the ID this thread was assigned on an earlier call.
            owner_id = g_thread_local_store.thread_id
        except AttributeError:
            # First call from this thread: mint an ID and remember it.
            owner_id = uuid.uuid4().hex
            g_thread_local_store.thread_id = owner_id

        return (owner_id,) + call_key

    return cached(*args, **kwargs, key=_key_func)

42 

43 

44def thread_with_app_context(to_wrap: Callable) -> Callable: 

45 """ 

46 Decorator that wraps the entry point to a thread and injects the current 

47 app context from Flask. This is useful when we want to use multiple 

48 threads to handle a single request. 

49 

50 When using this wrapped function, `current_app.app_context()` should be 

51 passed as the first argument when initializing the thread. 

52 

53 Args: 

54 to_wrap: The function to wrap. 

55 

56 Returns: 

57 The wrapped function. 

58 

59 """ 

60 

61 @wraps(to_wrap) 

62 def _run_with_context( 

63 app_context: AppContext | None, *args: Any, **kwargs: Any 

64 ) -> Any: 

65 if app_context is None: 

66 # Do nothing. 

67 return to_wrap(*args, **kwargs) 

68 

69 with app_context: 

70 return to_wrap(*args, **kwargs) 

71 

72 return _run_with_context 

73 

74 

def thread_context() -> AppContext | None:
    """
    Create an app context for a thread spawned to handle the current request.

    The entries currently stored on `g` are copied onto the new context's
    `g`. The new context is only entered temporarily while the values are
    copied; it has already been exited again when it is returned.

    Returns:
        The new context, or None if no context is active.

    """
    # Snapshot the current globals before the new context is entered.
    try:
        snapshot = {name: g.get(name) for name in g}
    except TypeError:
        # Context is not initialized, so there is nothing to copy.
        snapshot = {}

    try:
        new_context = current_app.app_context()
    except RuntimeError:
        # Context is not initialized.
        logger.debug("No current app context, not passing to thread.")
        return None

    # While the new context is active, `g` refers to its globals.
    with new_context:
        for name, value in snapshot.items():
            setattr(g, name, value)

    return new_context