Coverage for src / local_deep_research / utilities / thread_context.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Utility functions for handling thread-local context propagation. 

3 

4This module provides helpers for propagating research context across thread boundaries, 

5which is necessary when strategies use ThreadPoolExecutor for parallel searches. 

6""" 

7 

8import functools 

9from contextlib import contextmanager 

10from threading import local 

11from typing import Any, Callable, Dict, Generator 

12 

13from loguru import logger 

14 

_g_thread_data = local()
"""
Per-thread holder for the research context.

The functions in this module read and write its ``context`` attribute.
"""

19 

20 

def set_search_context(context: Dict[str, Any]) -> None:
    """
    Store the research context for the calling thread.

    A copy of ``context`` (made with ``context.copy()``) is kept in
    thread-local storage. If this thread already holds a context, a debug
    message is logged and the old value is replaced.

    Args:
        context: The context to set.

    """
    already_present = hasattr(_g_thread_data, "context")
    if already_present:
        logger.debug(
            "Context already set for this thread. It will be overwritten."
        )

    # Store a copy rather than the caller's own dictionary object.
    snapshot = context.copy()
    _g_thread_data.context = snapshot

35 

36 

def clear_search_context() -> None:
    """
    Remove the research context stored for the calling thread, if any.

    Intended to run in a ``finally`` block paired with
    set_search_context(), so that a pooled thread does not carry a stale
    context into the next task it executes. Calling it when nothing is set
    is a no-op.
    """
    if not hasattr(_g_thread_data, "context"):
        # Nothing stored for this thread; nothing to remove.
        return
    delattr(_g_thread_data, "context")

47 

48 

def get_search_context() -> Dict[str, Any] | None:
    """
    Return the research context stored for the calling thread.

    Returns:
        A copy of the stored context dictionary (via ``.copy()``), or None
        when no context has been set on this thread.

    """
    stored = getattr(_g_thread_data, "context", None)
    # Hand back a copy so callers do not receive the stored object itself.
    return None if stored is None else stored.copy()

61 

62 

@contextmanager
def search_context(context: Dict[str, Any]) -> Generator[None, None, None]:
    """Set the search context on entry and clear it on exit.

    The context is installed with set_search_context() before the body
    runs and removed with clear_search_context() afterwards, including
    when the body raises. This keeps a reused pool thread from carrying
    the context into a later task.

    Example:
        with search_context({"research_id": "123"}):
            results = engine.run(query)
    """
    set_search_context(context)
    try:
        # Nothing is bound by ``as``; the body simply runs with the
        # context in place.
        yield None
    finally:
        clear_search_context()

79 

80 

def preserve_research_context(func: Callable) -> Callable:
    """
    Decorator that preserves research context across thread boundaries.

    Use this decorator on functions that will be executed in
    ThreadPoolExecutor so that the research context of the decorating
    thread is available inside the worker thread.

    The context is captured once, when the decorator is applied (not when
    the wrapped function is called). On each call the wrapper installs that
    captured context, runs ``func``, then clears the context again and
    asks the thread-local database session module to clean up the current
    thread. If no context was set at decoration time (for example
    programmatic access), the wrapper leaves the thread's context alone;
    the database cleanup is still attempted.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same signature and metadata as ``func`` that
        returns whatever ``func`` returns. Exceptions raised by ``func``
        propagate; errors raised during the database cleanup are logged at
        debug level and suppressed.

    Example:
        @preserve_research_context
        def search_task(query):
            return search_engine.run(query)
    """
    # Try to capture current context, but don't fail if it's not set. There
    # are legitimate cases where it might not be set, such as for
    # programmatic access.
    context = get_search_context()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if context is not None:
            set_search_context(context)

        try:
            return func(*args, **kwargs)
        finally:
            if context is not None:
                clear_search_context()
            # Clean up thread-local DB engines created by metrics recording
            try:
                # Imported lazily so the module is only required when the
                # wrapper actually runs.
                from ..database.thread_local_session import (
                    cleanup_current_thread,
                )

                cleanup_current_thread()
            except Exception:
                # Best-effort cleanup: never let it mask func's result or
                # exception. loguru does not honour the stdlib-style
                # ``exc_info=True`` keyword, so the traceback is attached
                # through ``opt(exception=True)`` instead.
                logger.opt(exception=True).debug(
                    "preserve_research_context: error during cleanup_current_thread"
                )

    return wrapper