Coverage for src / local_deep_research / utilities / thread_context.py: 100%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Utility functions for handling thread-local context propagation.
4This module provides helpers for propagating research context across thread boundaries,
5which is necessary when strategies use ThreadPoolExecutor for parallel searches.
6"""
8import functools
9from contextlib import contextmanager
10from threading import local
11from typing import Any, Callable, Dict, Generator
13from loguru import logger
# Module-wide thread-local container. Every thread sees its own ``context``
# attribute on this object; it is written by set_search_context(), read by
# get_search_context() and removed by clear_search_context().
_g_thread_data = local()
"""
Thread-local storage for research context data.
"""
def set_search_context(context: Dict[str, Any]) -> None:
    """
    Store the research context for the calling thread.

    A copy of ``context`` (made with ``context.copy()``) is kept in
    thread-local storage, so adding or removing keys on the caller's dict
    afterwards does not affect the stored context. If this thread already
    holds a context, a debug message is logged and the old value is replaced.

    Args:
        context: The context to store for this thread.

    """
    # Only an attribute is assigned on the module-level object; the name
    # itself is never rebound, so no ``global`` declaration is required.
    already_present = hasattr(_g_thread_data, "context")
    if already_present:
        logger.debug(
            "Context already set for this thread. It will be overwritten."
        )
    setattr(_g_thread_data, "context", context.copy())
def clear_search_context() -> None:
    """
    Remove the research context stored for the calling thread, if any.

    Intended to run in a ``finally`` block paired with set_search_context()
    so that a pooled thread does not carry one task's context into the next.
    Calling it when no context is stored is a no-op.
    """
    try:
        del _g_thread_data.context
    except AttributeError:
        # Nothing was stored for this thread; there is nothing to clear.
        pass
def get_search_context() -> Dict[str, Any] | None:
    """
    Return the research context stored for the calling thread.

    Returns:
        A copy of the stored context dictionary, or None when this thread
        has no context set. Because a copy is returned, adding or removing
        keys on the result does not alter the stored context.

    """
    stored = getattr(_g_thread_data, "context", None)
    return None if stored is None else stored.copy()
@contextmanager
def search_context(context: Dict[str, Any]) -> Generator[None, None, None]:
    """Install a search context for the duration of a ``with`` block.

    The context is set on entry and cleared on exit. Clearing happens in a
    ``finally`` clause, so it also runs when the body raises; this keeps a
    context from lingering on a thread that a pool later reuses.

    Example:
        with search_context({"research_id": "123"}):
            results = engine.run(query)
    """
    # Installed before entering the try: if this call itself fails, nothing
    # has been stored and there is nothing to clean up.
    set_search_context(context)
    try:
        yield None
    finally:
        # Always drop the context for this thread, whether or not the body
        # of the ``with`` block raised.
        clear_search_context()
def preserve_research_context(func: Callable) -> Callable:
    """
    Decorator that preserves research context across thread boundaries.

    Use this decorator on functions that will be executed in ThreadPoolExecutor
    to ensure the research context (including research_id) is properly propagated.

    The context is captured once, from the thread that applies the decorator.
    Each call of the returned wrapper then installs that context on the
    executing thread, runs ``func`` and clears the context again.

    If no context is set when the decorator is applied (e.g. programmatic
    access), the wrapper neither sets nor clears any context. In all cases a
    best-effort cleanup of thread-local database resources is attempted after
    ``func`` returns or raises; failures of that cleanup are logged at debug
    level and never propagate to the caller.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same call signature and return value as ``func``.

    Example:
        @preserve_research_context
        def search_task(query):
            return search_engine.run(query)
    """
    # Try to capture current context, but don't fail if it's not set. There
    # are legitimate cases where it might not be set, such as for
    # programmatic access.
    context = get_search_context()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if context is not None:
            set_search_context(context)

        try:
            return func(*args, **kwargs)
        finally:
            if context is not None:
                clear_search_context()
            # Clean up thread-local DB engines created by metrics recording.
            # The import is deferred and kept inside the try so that a
            # missing or broken database layer cannot break the wrapped call.
            try:
                from ..database.thread_local_session import (
                    cleanup_current_thread,
                )

                cleanup_current_thread()
            except Exception:
                # loguru does not interpret the stdlib ``exc_info=True``
                # keyword; ``opt(exception=True)`` is what attaches the
                # active exception's traceback to the record.
                logger.opt(exception=True).debug(
                    "preserve_research_context: error during cleanup_current_thread"
                )

    return wrapper