Coverage for src/local_deep_research/metrics/search_tracker.py: 89%
123 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Search call tracking system for metrics collection.
3Similar to token_counter.py but tracks search engine usage.
4"""
6from typing import Any, Dict, List, Optional
8from loguru import logger
9from sqlalchemy import case, func
11from ..utilities.thread_context import get_search_context
12from ..database.models import SearchCall
13from .database import MetricsDatabase
14from .query_utils import get_research_mode_condition, get_time_filter_condition
class SearchTracker:
    """Track search engine calls and performance metrics.

    ``record_search`` stores one ``SearchCall`` row per search, using the
    research context of the calling thread. The ``get_*`` methods read those
    rows back through ``self.db`` and return plain dictionaries/lists.
    """

    def __init__(self, db: Optional["MetricsDatabase"] = None):
        """Initialize the search tracker.

        Args:
            db: Metrics database used by the read methods. When omitted (or
                falsy) a default ``MetricsDatabase()`` is created.
        """
        self.db = db or MetricsDatabase()

    @staticmethod
    def _truncate_query(text: Optional[str], limit: int) -> Optional[str]:
        """Return ``text`` cut to ``limit`` characters plus ``...`` if longer.

        ``None`` and strings that already fit are returned unchanged.
        """
        if text and len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _success_rate(
        success_count: Optional[int], call_count: Optional[int]
    ) -> float:
        """Return successes as a percentage of calls (0 when there are no calls).

        A missing (``None``) success count is treated as zero so that an
        aggregate without a value cannot raise ``TypeError``.
        """
        if not call_count:
            return 0
        return (success_count or 0) / call_count * 100

    @staticmethod
    def record_search(
        engine_name: str,
        query: str,
        results_count: int = 0,
        response_time_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a completed search operation directly to database.

        Research metadata and the credentials needed for the write are read
        from the current search context. When the context, the username or
        the password is missing, a warning is logged and nothing is written.
        Errors raised while writing are logged and not re-raised.

        Args:
            engine_name: Name of the search engine that was called.
            query: Query string sent to the engine.
            results_count: Number of results returned.
            response_time_ms: Time the search took, in milliseconds.
            success: Whether the search succeeded; stored as ``"success"``
                or ``"error"``.
            error_message: Error text, or an exception instance. An exception
                is stored under its class name, any other truthy value as
                ``"unknown_error"``.
        """
        # Extract research context (thread-safe)
        context = get_search_context()

        # Skip metrics recording in programmatic mode or when no context is set
        if context is None:
            logger.warning(
                "Skipping search metrics recording - no research context available "
                "(likely in programmatic mode)"
            )
            return

        research_id = context.get("research_id")
        # Convert research_id to string if it's an integer (for backward compatibility)
        if isinstance(research_id, int):
            research_id = str(research_id)
        research_query = context.get("research_query")
        research_mode = context.get("research_mode", "unknown")
        research_phase = context.get("research_phase", "search")
        search_iteration = context.get("search_iteration", 0)

        # Determine success status
        success_status = "success" if success else "error"
        error_type = None
        if error_message:
            error_type = (
                type(error_message).__name__
                if isinstance(error_message, Exception)
                else "unknown_error"
            )

        # Stage 1: collect the credentials and the writer. Anything that goes
        # wrong here is reported as a failure to record the call.
        try:
            # Get username from context for thread-safe database
            username = context.get("username")
            if not username:
                logger.warning(
                    f"Cannot save search metrics - no username in research context. "
                    f"Search: {engine_name} for '{query}'"
                )
                return

            # Get password from context
            password = context.get("user_password")
            if not password:
                logger.warning(
                    f"Cannot save search metrics - no password in research context. "
                    f"Search: {engine_name} for '{query}', username: {username}"
                )
                return

            # Use thread-safe metrics writer
            from ..database.thread_metrics import metrics_writer
        except Exception:
            logger.exception("Failed to record search call")
            return

        # Stage 2: write the row. Failures are logged, never propagated, so
        # metrics problems cannot break the search that triggered them.
        try:
            # Set password for this thread
            metrics_writer.set_user_password(username, password)

            with metrics_writer.get_session(username) as session:
                session.add(
                    SearchCall(
                        research_id=research_id,
                        research_query=research_query,
                        research_mode=research_mode,
                        research_phase=research_phase,
                        search_iteration=search_iteration,
                        search_engine=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success_status=success_status,
                        error_type=error_type,
                        error_message=str(error_message)
                        if error_message
                        else None,
                    )
                )

            logger.debug(
                f"Search call recorded to encrypted DB: {engine_name} - "
                f"{results_count} results in {response_time_ms}ms"
            )
        except Exception:
            logger.exception("Failed to write search metrics")

    def get_search_metrics(
        self,
        period: str = "30d",
        research_mode: str = "all",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get search engine usage metrics.

        Args:
            period: Time period passed to ``get_time_filter_condition``.
            research_mode: Mode passed to ``get_research_mode_condition``.
            username: Forwarded to ``self.db.get_session``.
            password: Forwarded to ``self.db.get_session``.

        Returns:
            A dict with ``"search_engine_stats"`` (one aggregate entry per
            engine, most-called first) and ``"recent_calls"`` (the 20 newest
            calls, queries truncated to 100 characters). Both lists are empty
            when an error occurs; the error is logged.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # Build the optional filters once; both queries share them.
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                # Compare with ``is not None``: SQL expressions must not be
                # evaluated for truthiness.
                conditions = [
                    condition
                    for condition in (time_condition, mode_condition)
                    if condition is not None
                ]

                # Get search engine statistics using ORM aggregation
                search_stats = (
                    session.query(
                        SearchCall.search_engine,
                        func.count().label("call_count"),
                        func.avg(SearchCall.response_time_ms).label(
                            "avg_response_time"
                        ),
                        func.sum(SearchCall.results_count).label(
                            "total_results"
                        ),
                        func.avg(SearchCall.results_count).label(
                            "avg_results_per_call"
                        ),
                        func.sum(
                            case(
                                (SearchCall.success_status == "success", 1),
                                else_=0,
                            )
                        ).label("success_count"),
                        func.sum(
                            case(
                                (SearchCall.success_status == "error", 1),
                                else_=0,
                            )
                        ).label("error_count"),
                    )
                    .filter(SearchCall.search_engine.isnot(None), *conditions)
                    .group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Get recent search calls
                recent_calls_query = session.query(SearchCall)
                if conditions:
                    recent_calls_query = recent_calls_query.filter(
                        *conditions
                    )
                recent_calls = (
                    recent_calls_query.order_by(SearchCall.timestamp.desc())
                    .limit(20)
                    .all()
                )

                return {
                    "search_engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "avg_results_per_call": stat.avg_results_per_call
                            or 0,
                            "success_rate": self._success_rate(
                                stat.success_count, stat.call_count
                            ),
                            "error_count": stat.error_count or 0,
                        }
                        for stat in search_stats
                    ],
                    "recent_calls": [
                        {
                            "engine": call.search_engine,
                            "query": self._truncate_query(call.query, 100),
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in recent_calls
                    ],
                }

            except Exception:
                logger.exception("Error getting search metrics")
                return {"search_engine_stats": [], "recent_calls": []}

    def get_research_search_metrics(self, research_id: str) -> Dict[str, Any]:
        """Get search metrics for a specific research session.

        Args:
            research_id: Value matched against ``SearchCall.research_id``.

        Returns:
            A dict with overall totals (``total_searches``, ``total_results``,
            rounded ``avg_response_time``, ``success_rate`` rounded to one
            decimal), every call in chronological order (``search_calls``)
            and per-engine aggregates (``engine_stats``). All values are
            zero/empty when an error occurs; the error is logged.
        """
        with self.db.get_session() as session:
            try:
                # Get all search calls for this research
                search_calls = (
                    session.query(SearchCall)
                    .filter(SearchCall.research_id == research_id)
                    .order_by(SearchCall.timestamp.asc())
                    .all()
                )

                # Get search engine stats for this research
                engine_stats = (
                    session.query(
                        SearchCall.search_engine,
                        func.count().label("call_count"),
                        func.avg(SearchCall.response_time_ms).label(
                            "avg_response_time"
                        ),
                        func.sum(SearchCall.results_count).label(
                            "total_results"
                        ),
                        func.sum(
                            case(
                                (SearchCall.success_status == "success", 1),
                                else_=0,
                            )
                        ).label("success_count"),
                    )
                    .filter(SearchCall.research_id == research_id)
                    .group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Calculate totals
                total_searches = len(search_calls)
                total_results = sum(
                    call.results_count or 0 for call in search_calls
                )
                avg_response_time = (
                    sum(call.response_time_ms or 0 for call in search_calls)
                    / total_searches
                    if total_searches > 0
                    else 0
                )
                successful_searches = sum(
                    1
                    for call in search_calls
                    if call.success_status == "success"
                )
                success_rate = self._success_rate(
                    successful_searches, total_searches
                )

                return {
                    "total_searches": total_searches,
                    "total_results": total_results,
                    "avg_response_time": round(avg_response_time),
                    "success_rate": round(success_rate, 1),
                    "search_calls": [
                        {
                            "engine": call.search_engine,
                            "query": call.query,
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in search_calls
                    ],
                    "engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "success_rate": self._success_rate(
                                stat.success_count, stat.call_count
                            ),
                        }
                        for stat in engine_stats
                    ],
                }

            except Exception:
                logger.exception("Error getting research search metrics")
                return {
                    "total_searches": 0,
                    "total_results": 0,
                    "avg_response_time": 0,
                    "success_rate": 0,
                    "search_calls": [],
                    "engine_stats": [],
                }

    def get_search_time_series(
        self, period: str = "30d", research_mode: str = "all"
    ) -> List[Dict[str, Any]]:
        """Get search activity time series data for charting.

        Args:
            period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
            research_mode: Research mode to filter by ('quick', 'detailed', 'all')

        Returns:
            List of time series data points with search engine activity,
            oldest first, with queries truncated to 50 characters. An empty
            list is returned when an error occurs; the error is logged.
        """
        with self.db.get_session() as session:
            try:
                # Only rows that have both an engine and a timestamp are charted.
                query = session.query(SearchCall).filter(
                    SearchCall.search_engine.isnot(None),
                    SearchCall.timestamp.isnot(None),
                )

                # Apply time filter
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                if time_condition is not None:
                    query = query.filter(time_condition)

                # Apply research mode filter
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                if mode_condition is not None:
                    query = query.filter(mode_condition)

                # Get all search calls ordered by time
                search_calls = query.order_by(SearchCall.timestamp.asc()).all()

                # Create time series data
                return [
                    {
                        "timestamp": (
                            str(call.timestamp) if call.timestamp else None
                        ),
                        "search_engine": call.search_engine,
                        "results_count": call.results_count or 0,
                        "response_time_ms": call.response_time_ms or 0,
                        "success_status": call.success_status,
                        "query": self._truncate_query(call.query, 50),
                    }
                    for call in search_calls
                ]

            except Exception:
                logger.exception("Error getting search time series")
                return []
# Global search tracker instance (created lazily by get_search_tracker)
_search_tracker: Optional[SearchTracker] = None


def get_search_tracker() -> SearchTracker:
    """Get the global search tracker instance with proper authentication.

    The tracker is created on the first call and the same instance is
    returned afterwards. On creation the username and session id are read
    from the Flask session and, when both are present, the password is looked
    up in the session password store. If any of that raises, the error is
    logged and a tracker with a default database is used instead.

    Returns:
        The shared ``SearchTracker`` instance.
    """
    global _search_tracker

    # Fast path: the singleton already exists.
    if _search_tracker is not None:
        return _search_tracker

    # NOTE(review): the credentials of whichever session triggers creation
    # are kept for every later caller - confirm this is intended when
    # several users share one process.
    try:
        # Try to get credentials from Flask session if available
        from flask import session as flask_session
        from ..database.session_passwords import session_password_store

        username = flask_session.get("username")
        session_id = flask_session.get("session_id")
        user_password = (
            session_password_store.get_session_password(username, session_id)
            if session_id and username
            else None
        )

        # Create metrics DB with credentials
        from .database import MetricsDatabase

        _search_tracker = SearchTracker(
            db=MetricsDatabase(username=username, password=user_password)
        )
    except Exception:
        logger.exception(
            "Error initializing SearchTracker with Flask session credentials"
        )
        _search_tracker = SearchTracker()

    return _search_tracker