Coverage for src / local_deep_research / utilities / thread_context.py: 81%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Utility functions for handling thread-local context propagation. 

3 

4This module provides helpers for propagating research context across thread boundaries, 

5which is necessary when strategies use ThreadPoolExecutor for parallel searches. 

6""" 

7 

8import functools 

9from contextlib import contextmanager 

10from threading import local 

11from typing import Any, Callable, Dict, Generator 

12 

13from loguru import logger 

14 

15# Lazy import to avoid database initialization for programmatic access 

16_search_tracker = None 

17 

18_g_thread_data = local() 

19""" 

20Thread-local storage for research context data. 

21""" 

22 

23 

24def set_search_context(context: Dict[str, Any]) -> None: 

25 """ 

26 Sets the research context for this entire thread. 

27 

28 Args: 

29 context: The context to set. 

30 

31 """ 

32 global _g_thread_data 

33 if hasattr(_g_thread_data, "context"): 

34 logger.debug( 

35 "Context already set for this thread. It will be overwritten." 

36 ) 

37 _g_thread_data.context = context.copy() 

38 

39 

40def clear_search_context() -> None: 

41 """ 

42 Clears the research context for this thread. 

43 

44 Should be called in a finally block after set_search_context() to prevent 

45 context from leaking to subsequent tasks when threads are reused in a pool. 

46 """ 

47 global _g_thread_data 

48 if hasattr(_g_thread_data, "context"): 

49 del _g_thread_data.context 

50 

51 

52def get_search_context() -> Dict[str, Any] | None: 

53 """ 

54 Gets the current research context for this thread. 

55 

56 Returns: 

57 The context dictionary, or None if no context is set. 

58 

59 """ 

60 context = getattr(_g_thread_data, "context", None) 

61 if context is not None: 

62 context = context.copy() 

63 return context 

64 

65 

66@contextmanager 

67def search_context(context: Dict[str, Any]) -> Generator[None, None, None]: 

68 """Context manager that sets and clears search context automatically. 

69 

70 Ensures cleanup even if an exception occurs, preventing context leaks 

71 when threads are reused in a pool. 

72 

73 Example: 

74 with search_context({"research_id": "123"}): 

75 results = engine.run(query) 

76 """ 

77 set_search_context(context) 

78 try: 

79 yield 

80 finally: 

81 clear_search_context() 

82 

83 

84def _get_search_tracker_if_needed(): 

85 """Get search tracker only if metrics are enabled.""" 

86 global _search_tracker 

87 if _search_tracker is None: 

88 try: 

89 from ..metrics.search_tracker import get_search_tracker 

90 

91 _search_tracker = get_search_tracker() 

92 except (ImportError, RuntimeError) as e: 

93 # If import fails due to database issues, metrics are disabled 

94 from loguru import logger 

95 

96 logger.debug( 

97 f"Metrics tracking disabled - search tracker not available: {e}" 

98 ) 

99 return None 

100 return _search_tracker 

101 

102 

103def preserve_research_context(func: Callable) -> Callable: 

104 """ 

105 Decorator that preserves research context across thread boundaries. 

106 

107 Use this decorator on functions that will be executed in ThreadPoolExecutor 

108 to ensure the research context (including research_id) is properly propagated. 

109 

110 When metrics are disabled (e.g., in programmatic mode), this decorator 

111 safely does nothing to avoid database dependencies. 

112 

113 Example: 

114 @preserve_research_context 

115 def search_task(query): 

116 return search_engine.run(query) 

117 """ 

118 # Try to capture current context, but don't fail if it's not set. There 

119 # are legitimate cases where it might not be set, such as for 

120 # programmatic access. 

121 context = get_search_context() 

122 

123 @functools.wraps(func) 

124 def wrapper(*args, **kwargs): 

125 if context is not None: 

126 set_search_context(context) 

127 

128 try: 

129 return func(*args, **kwargs) 

130 finally: 

131 if context is not None: 

132 clear_search_context() 

133 

134 return wrapper