Coverage for src/local_deep_research/utilities/thread_context.py: 100%

39 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Utility functions for handling research context propagation. 

3 

4This module provides helpers for propagating research context across thread 

5and asyncio boundaries. Built on ``contextvars.ContextVar`` so the context 

6is correctly inherited by frameworks that copy the context to worker 

7threads (e.g. langchain's ``ContextThreadPoolExecutor`` used by LangGraph 

8for parallel tool execution). For stdlib ``ThreadPoolExecutor`` — which 

9does not copy context — use ``preserve_research_context`` below. 

10""" 

11 

12import functools 

13from contextlib import contextmanager 

14from contextvars import ContextVar 

15from typing import Any, Callable, Dict, Generator, Optional 

16 

17from loguru import logger 

18 

# Backing store for the research context. A ContextVar (rather than a plain
# module global) gives each execution context its own value; when nothing has
# been set, readers see the default of None.
_search_context_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    "ldr_search_context",
    default=None,
)

22 

23 

def set_search_context(context: Dict[str, Any]) -> None:
    """
    Store a research context for the current execution context.

    The value is written to the module-level ``_search_context_var``. A
    shallow copy of ``context`` is stored, so adding or removing keys on the
    caller's dictionary afterwards does not change the stored context. If a
    context is already present it is replaced, and a debug message is logged
    first.

    Args:
        context: The context dictionary to store.

    """
    existing = _search_context_var.get()
    if existing is not None:
        logger.debug(
            "Context already set for this thread. It will be overwritten."
        )

    # Copy only after the overwrite notice, matching the original ordering.
    snapshot = context.copy()
    _search_context_var.set(snapshot)

37 

38 

def clear_search_context() -> None:
    """
    Remove the research context from the current execution context.

    The context variable is set back to ``None`` unconditionally; any value
    that was stored before is discarded, not restored.

    Call this in a ``finally`` block after ``set_search_context()`` so a
    context does not leak into later tasks when pool threads are reused.
    """
    cleared = None
    _search_context_var.set(cleared)

47 

48 

def get_search_context() -> Dict[str, Any] | None:
    """
    Return the research context of the current execution context.

    Returns:
        A shallow copy of the stored context dictionary, or ``None`` if no
        context is set. Because a copy is returned, adding or removing keys
        on the result does not affect the stored context.

    """
    stored = _search_context_var.get()
    if stored is None:
        return None
    return stored.copy()

61 

62 

@contextmanager
def search_context(context: Dict[str, Any]) -> Generator[None, None, None]:
    """Context manager that installs a search context for the ``with`` body.

    On entry the context is stored via ``set_search_context()``. On exit,
    whether the body finished normally or raised, ``clear_search_context()``
    runs, so the context cannot leak when threads are reused in a pool.
    Exit resets the context to ``None``; a context that was active before
    entry is not restored.

    Args:
        context: The context dictionary to make active inside the block.

    Example:
        with search_context({"research_id": "123"}):
            results = engine.run(query)
    """
    set_search_context(context)
    try:
        # Nothing is bound by ``with ... as``; the manager exists purely for
        # its enter/exit side effects.
        yield None
    finally:
        clear_search_context()

79 

80 

def preserve_research_context(func: Callable) -> Callable:
    """
    Decorator that carries the research context across a thread boundary.

    Use it on functions that will run in a ``ThreadPoolExecutor`` so the
    research context (including research_id) is available inside the worker.

    The context is captured once, when the decorator is applied, not when the
    wrapped function is called. Apply it while the desired context is active
    (for example, wrap the callable just before submitting it to the pool).

    If no context is set at decoration time (a legitimate case, e.g.
    programmatic access), the wrapper neither sets nor clears a context.
    The thread-local database cleanup step still runs after every call.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same call signature that forwards all arguments
        to ``func`` and returns its result. Exceptions raised by ``func``
        propagate unchanged.

    Example:
        @preserve_research_context
        def search_task(query):
            return search_engine.run(query)
    """
    # Snapshot the caller's context now; inside the worker thread it would
    # no longer be visible. May be None, which is handled below.
    context = get_search_context()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if context is not None:
            set_search_context(context)

        try:
            return func(*args, **kwargs)
        finally:
            if context is not None:
                # Avoid leaking the context into the next task that reuses
                # this pool thread.
                clear_search_context()
            # Clean up thread-local DB engines created by metrics recording.
            # Best-effort: a failure here (including a failed import) must
            # never mask the result or exception of ``func``.
            try:
                from ..database.thread_local_session import (
                    cleanup_current_thread,
                )

                cleanup_current_thread()
            except Exception:
                # loguru has no ``exc_info`` keyword; ``opt(exception=True)``
                # is what attaches the active traceback to the record.
                logger.opt(exception=True).debug(
                    "preserve_research_context: error during cleanup_current_thread"
                )

    return wrapper