Coverage for src/local_deep_research/utilities/thread_context.py: 100%
39 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Utility functions for handling research context propagation.
4This module provides helpers for propagating research context across thread
5and asyncio boundaries. Built on ``contextvars.ContextVar`` so the context
6is correctly inherited by frameworks that copy the context to worker
7threads (e.g. langchain's ``ContextThreadPoolExecutor`` used by LangGraph
8for parallel tool execution). For stdlib ``ThreadPoolExecutor`` — which
9does not copy context — use ``preserve_research_context`` below.
10"""
12import functools
13from contextlib import contextmanager
14from contextvars import ContextVar
15from typing import Any, Callable, Dict, Generator, Optional
17from loguru import logger
19_search_context_var: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
20 "ldr_search_context", default=None
21)
24def set_search_context(context: Dict[str, Any]) -> None:
25 """
26 Sets the research context for the current execution context.
28 Args:
29 context: The context to set.
31 """
32 if _search_context_var.get() is not None:
33 logger.debug(
34 "Context already set for this thread. It will be overwritten."
35 )
36 _search_context_var.set(context.copy())
39def clear_search_context() -> None:
40 """
41 Clears the research context for the current execution context.
43 Should be called in a finally block after set_search_context() to prevent
44 context from leaking to subsequent tasks when threads are reused in a pool.
45 """
46 _search_context_var.set(None)
49def get_search_context() -> Dict[str, Any] | None:
50 """
51 Gets the current research context.
53 Returns:
54 The context dictionary, or None if no context is set.
56 """
57 context = _search_context_var.get()
58 if context is not None:
59 context = context.copy()
60 return context
63@contextmanager
64def search_context(context: Dict[str, Any]) -> Generator[None, None, None]:
65 """Context manager that sets and clears search context automatically.
67 Ensures cleanup even if an exception occurs, preventing context leaks
68 when threads are reused in a pool.
70 Example:
71 with search_context({"research_id": "123"}):
72 results = engine.run(query)
73 """
74 set_search_context(context)
75 try:
76 yield
77 finally:
78 clear_search_context()
81def preserve_research_context(func: Callable) -> Callable:
82 """
83 Decorator that preserves research context across thread boundaries.
85 Use this decorator on functions that will be executed in ThreadPoolExecutor
86 to ensure the research context (including research_id) is properly propagated.
88 When metrics are disabled (e.g., in programmatic mode), this decorator
89 safely does nothing to avoid database dependencies.
91 Example:
92 @preserve_research_context
93 def search_task(query):
94 return search_engine.run(query)
95 """
96 # Try to capture current context, but don't fail if it's not set. There
97 # are legitimate cases where it might not be set, such as for
98 # programmatic access.
99 context = get_search_context()
101 @functools.wraps(func)
102 def wrapper(*args, **kwargs):
103 if context is not None:
104 set_search_context(context)
106 try:
107 return func(*args, **kwargs)
108 finally:
109 if context is not None:
110 clear_search_context()
111 # Clean up thread-local DB engines created by metrics recording
112 try:
113 from ..database.thread_local_session import (
114 cleanup_current_thread,
115 )
117 cleanup_current_thread()
118 except Exception:
119 logger.debug(
120 "preserve_research_context: error during cleanup_current_thread",
121 exc_info=True,
122 )
124 return wrapper