Coverage for src / local_deep_research / database / thread_metrics.py: 82%
49 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Thread-safe metrics database access.
4This module provides a way for background threads to write metrics
5to the user's encrypted database by creating thread-local connections
6with the provided password.
7"""
9import threading
10from contextlib import contextmanager
11from typing import Optional
13from loguru import logger
14from sqlalchemy.orm import Session
16from .encrypted_db import db_manager
19class ThreadSafeMetricsWriter:
20 """
21 Thread-safe writer for metrics to encrypted user databases.
22 Creates encrypted connections per thread using provided passwords.
23 """
25 def __init__(self):
26 self._thread_local = threading.local()
28 def set_user_password(self, username: str, password: str):
29 """
30 Store user password for the current thread.
31 This allows the thread to create its own encrypted connection.
33 IMPORTANT: This is safe because:
34 1. Password is already in memory (user is logged in)
35 2. It's only stored thread-locally
36 3. It's cleared when the thread ends
37 """
39 if not hasattr(self._thread_local, "passwords"):
40 self._thread_local.passwords = {}
41 self._thread_local.passwords[username] = password
43 @contextmanager
44 def get_session(self, username: str = None) -> Session:
45 """
46 Get a database session for metrics in the current thread.
47 Creates a new encrypted connection if needed.
49 Args:
50 username: The username for database access. If not provided,
51 will attempt to get it from Flask session.
52 """
53 # If username not provided, try to get it from Flask session
54 if username is None:
55 try:
56 from flask import session as flask_session
57 from werkzeug.exceptions import Unauthorized
59 username = flask_session.get("username")
60 if not username: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true
61 raise Unauthorized("No username in Flask session")
62 except (ImportError, RuntimeError) as e:
63 # Flask context not available or no session
64 raise ValueError(f"Cannot determine username: {e}")
66 # Get password for this user in this thread
67 if not hasattr(self._thread_local, "passwords"):
68 raise ValueError("No password set for thread metrics access")
70 password = self._thread_local.passwords.get(username)
72 if not password:
73 raise ValueError(
74 f"No password available for user {username} in this thread"
75 )
77 # Create a thread-safe session for this user
78 session = None
79 try:
80 session = db_manager.create_thread_safe_session_for_metrics(
81 username, password
82 )
83 if not session: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 raise ValueError(
85 f"Failed to create session for user {username}"
86 )
87 yield session
88 session.commit()
89 except Exception:
90 logger.exception(f"Session error for {username}")
91 if session: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true
92 session.rollback()
93 raise
94 finally:
95 if session:
96 session.close()
98 def write_token_metrics(
99 self, username: str, research_id: Optional[int], token_data: dict
100 ):
101 """
102 Write token metrics from any thread.
104 Args:
105 username: The username (for database access)
106 research_id: The research ID
107 token_data: Dictionary with token metrics data
108 """
109 with self.get_session(username) as session:
110 # Import here to avoid circular imports
111 from .models import TokenUsage
113 # Create TokenUsage record
114 token_usage = TokenUsage(
115 research_id=research_id,
116 model_name=token_data.get("model_name"),
117 model_provider=token_data.get("provider"),
118 prompt_tokens=token_data.get("prompt_tokens", 0),
119 completion_tokens=token_data.get("completion_tokens", 0),
120 total_tokens=token_data.get("prompt_tokens", 0)
121 + token_data.get("completion_tokens", 0),
122 # Research context
123 research_query=token_data.get("research_query"),
124 research_mode=token_data.get("research_mode"),
125 research_phase=token_data.get("research_phase"),
126 search_iteration=token_data.get("search_iteration"),
127 # Performance metrics
128 response_time_ms=token_data.get("response_time_ms"),
129 success_status=token_data.get("success_status", "success"),
130 error_type=token_data.get("error_type"),
131 # Search engine context
132 search_engines_planned=token_data.get("search_engines_planned"),
133 search_engine_selected=token_data.get("search_engine_selected"),
134 # Call stack tracking
135 calling_file=token_data.get("calling_file"),
136 calling_function=token_data.get("calling_function"),
137 call_stack=token_data.get("call_stack"),
138 # Context overflow detection
139 context_limit=token_data.get("context_limit"),
140 context_truncated=token_data.get("context_truncated", False),
141 tokens_truncated=token_data.get("tokens_truncated"),
142 truncation_ratio=token_data.get("truncation_ratio"),
143 # Raw Ollama metrics
144 ollama_prompt_eval_count=token_data.get(
145 "ollama_prompt_eval_count"
146 ),
147 ollama_eval_count=token_data.get("ollama_eval_count"),
148 ollama_total_duration=token_data.get("ollama_total_duration"),
149 ollama_load_duration=token_data.get("ollama_load_duration"),
150 ollama_prompt_eval_duration=token_data.get(
151 "ollama_prompt_eval_duration"
152 ),
153 ollama_eval_duration=token_data.get("ollama_eval_duration"),
154 )
155 session.add(token_usage)
158# Global instance for thread-safe metrics
159metrics_writer = ThreadSafeMetricsWriter()