Coverage for src / local_deep_research / utilities / thread_context.py: 74%
34 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Utility functions for handling thread-local context propagation.
4This module provides helpers for propagating research context across thread boundaries,
5which is necessary when strategies use ThreadPoolExecutor for parallel searches.
6"""
8import functools
9from threading import local
10from typing import Any, Callable, Dict
12from loguru import logger
14# Lazy import to avoid database initialization for programmatic access
15_search_tracker = None
17_g_thread_data = local()
18"""
19Thread-local storage for research context data.
20"""
23def set_search_context(context: Dict[str, Any]) -> None:
24 """
25 Sets the research context for this entire thread.
27 Args:
28 context: The context to set.
30 """
31 global _g_thread_data
32 if hasattr(_g_thread_data, "context"):
33 logger.warning(
34 "Context already set for this thread. It will be overwritten."
35 )
36 _g_thread_data.context = context.copy()
39def get_search_context() -> Dict[str, Any] | None:
40 """
41 Gets the current research context for this thread.
43 Returns:
44 The context dictionary, or None if no context is set.
46 """
47 context = getattr(_g_thread_data, "context", None)
48 if context is not None:
49 context = context.copy()
50 return context
53def _get_search_tracker_if_needed():
54 """Get search tracker only if metrics are enabled."""
55 global _search_tracker
56 if _search_tracker is None:
57 try:
58 from ..metrics.search_tracker import get_search_tracker
60 _search_tracker = get_search_tracker()
61 except (ImportError, RuntimeError) as e:
62 # If import fails due to database issues, metrics are disabled
63 from loguru import logger
65 logger.debug(
66 f"Metrics tracking disabled - search tracker not available: {e}"
67 )
68 return None
69 return _search_tracker
72def preserve_research_context(func: Callable) -> Callable:
73 """
74 Decorator that preserves research context across thread boundaries.
76 Use this decorator on functions that will be executed in ThreadPoolExecutor
77 to ensure the research context (including research_id) is properly propagated.
79 When metrics are disabled (e.g., in programmatic mode), this decorator
80 safely does nothing to avoid database dependencies.
82 Example:
83 @preserve_research_context
84 def search_task(query):
85 return search_engine.run(query)
86 """
87 # Try to capture current context, but don't fail if it's not set. There
88 # are legitimate cases where it might not be set, such as for
89 # programmatic access.
90 context = get_search_context()
92 @functools.wraps(func)
93 def wrapper(*args, **kwargs):
94 if context is not None:
95 set_search_context(context)
97 return func(*args, **kwargs)
99 return wrapper