Coverage for src / local_deep_research / utilities / thread_context.py: 74%

34 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Utility functions for handling thread-local context propagation. 

3 

4This module provides helpers for propagating research context across thread boundaries, 

5which is necessary when strategies use ThreadPoolExecutor for parallel searches. 

6""" 

7 

8import functools 

9from threading import local 

10from typing import Any, Callable, Dict 

11 

12from loguru import logger 

13 

# Cached search tracker; stays None until it is first resolved. Lazy import
# to avoid database initialization for programmatic access.
_search_tracker = None

_g_thread_data = local()
"""
Thread-local storage for research context data.
"""

21 

22 

def set_search_context(context: Dict[str, Any]) -> None:
    """
    Store a research context for the calling thread.

    A shallow copy of ``context`` is kept in thread-local storage, so later
    changes to the caller's dictionary do not affect the stored value. If
    the thread already has a context, a warning is logged and the old value
    is replaced.

    Args:
        context: The context to set.

    """
    already_set = hasattr(_g_thread_data, "context")
    if already_set:
        logger.warning(
            "Context already set for this thread. It will be overwritten."
        )

    # Store a copy rather than the caller's own dictionary.
    _g_thread_data.context = context.copy()

37 

38 

def get_search_context() -> Dict[str, Any] | None:
    """
    Return the research context stored for the calling thread.

    Returns:
        A shallow copy of the stored context dictionary, or None if no
        context is set for this thread.

    """
    stored = getattr(_g_thread_data, "context", None)
    # Hand out a copy so callers cannot mutate the thread's stored context.
    return None if stored is None else stored.copy()

51 

52 

def _get_search_tracker_if_needed():
    """Return the cached search tracker, resolving it on first use.

    The tracker is imported inside the function so that importing this
    module does not pull in the metrics package. Once obtained, it is cached
    in the module-level ``_search_tracker`` and reused on later calls.

    Returns:
        The object returned by ``get_search_tracker()``, or None when the
        import or the call raises ImportError or RuntimeError. A failure is
        not cached, so the next call tries again.
    """
    global _search_tracker
    if _search_tracker is not None:
        return _search_tracker

    try:
        from ..metrics.search_tracker import get_search_tracker

        _search_tracker = get_search_tracker()
    except (ImportError, RuntimeError) as e:
        # If import fails due to database issues, metrics are disabled.
        # Uses the module-level loguru logger; no local re-import is needed.
        logger.debug(
            f"Metrics tracking disabled - search tracker not available: {e}"
        )
        return None
    return _search_tracker

70 

71 

def preserve_research_context(func: Callable) -> Callable:
    """
    Decorator that carries the research context across thread boundaries.

    The context of the calling thread is captured once, when the decorator
    is applied. Each call to the decorated function first sets that captured
    context on the thread running the call, then invokes ``func``. Use it on
    functions that will be executed in ThreadPoolExecutor so the research
    context (including research_id) is available in the worker thread.

    If no context is set when the decorator is applied (for example with
    programmatic access), the wrapper just calls ``func`` unchanged.

    Example:
        @preserve_research_context
        def search_task(query):
            return search_engine.run(query)
    """
    # Snapshot taken in the decorating thread. It may legitimately be None,
    # so a missing context is not treated as an error.
    captured = get_search_context()

    @functools.wraps(func)
    def _with_context(*args, **kwargs):
        if captured is None:
            return func(*args, **kwargs)

        # Install the snapshot on whichever thread runs this call.
        set_search_context(captured)
        return func(*args, **kwargs)

    return _with_context