Coverage for src / local_deep_research / utilities / thread_context.py: 81%
47 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Utility functions for handling thread-local context propagation.
4This module provides helpers for propagating research context across thread boundaries,
5which is necessary when strategies use ThreadPoolExecutor for parallel searches.
6"""
8import functools
9from contextlib import contextmanager
10from threading import local
11from typing import Any, Callable, Dict, Generator
13from loguru import logger
15# Lazy import to avoid database initialization for programmatic access
16_search_tracker = None
18_g_thread_data = local()
19"""
20Thread-local storage for research context data.
21"""
24def set_search_context(context: Dict[str, Any]) -> None:
25 """
26 Sets the research context for this entire thread.
28 Args:
29 context: The context to set.
31 """
32 global _g_thread_data
33 if hasattr(_g_thread_data, "context"):
34 logger.debug(
35 "Context already set for this thread. It will be overwritten."
36 )
37 _g_thread_data.context = context.copy()
40def clear_search_context() -> None:
41 """
42 Clears the research context for this thread.
44 Should be called in a finally block after set_search_context() to prevent
45 context from leaking to subsequent tasks when threads are reused in a pool.
46 """
47 global _g_thread_data
48 if hasattr(_g_thread_data, "context"):
49 del _g_thread_data.context
52def get_search_context() -> Dict[str, Any] | None:
53 """
54 Gets the current research context for this thread.
56 Returns:
57 The context dictionary, or None if no context is set.
59 """
60 context = getattr(_g_thread_data, "context", None)
61 if context is not None:
62 context = context.copy()
63 return context
66@contextmanager
67def search_context(context: Dict[str, Any]) -> Generator[None, None, None]:
68 """Context manager that sets and clears search context automatically.
70 Ensures cleanup even if an exception occurs, preventing context leaks
71 when threads are reused in a pool.
73 Example:
74 with search_context({"research_id": "123"}):
75 results = engine.run(query)
76 """
77 set_search_context(context)
78 try:
79 yield
80 finally:
81 clear_search_context()
84def _get_search_tracker_if_needed():
85 """Get search tracker only if metrics are enabled."""
86 global _search_tracker
87 if _search_tracker is None:
88 try:
89 from ..metrics.search_tracker import get_search_tracker
91 _search_tracker = get_search_tracker()
92 except (ImportError, RuntimeError) as e:
93 # If import fails due to database issues, metrics are disabled
94 from loguru import logger
96 logger.debug(
97 f"Metrics tracking disabled - search tracker not available: {e}"
98 )
99 return None
100 return _search_tracker
103def preserve_research_context(func: Callable) -> Callable:
104 """
105 Decorator that preserves research context across thread boundaries.
107 Use this decorator on functions that will be executed in ThreadPoolExecutor
108 to ensure the research context (including research_id) is properly propagated.
110 When metrics are disabled (e.g., in programmatic mode), this decorator
111 safely does nothing to avoid database dependencies.
113 Example:
114 @preserve_research_context
115 def search_task(query):
116 return search_engine.run(query)
117 """
118 # Try to capture current context, but don't fail if it's not set. There
119 # are legitimate cases where it might not be set, such as for
120 # programmatic access.
121 context = get_search_context()
123 @functools.wraps(func)
124 def wrapper(*args, **kwargs):
125 if context is not None:
126 set_search_context(context)
128 try:
129 return func(*args, **kwargs)
130 finally:
131 if context is not None:
132 clear_search_context()
134 return wrapper