Coverage for src/local_deep_research/metrics/search_tracker.py: 98%
107 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Search call tracking system for metrics collection.
Similar to token_counter.py but tracks search engine usage.
"""

from typing import Any, Dict, List, Optional

from loguru import logger
from sqlalchemy import case, func

from ..utilities.thread_context import get_search_context
from ..database.models import SearchCall
from .database import MetricsDatabase
from .query_utils import get_research_mode_condition, get_time_filter_condition
class SearchTracker:
    """Track search engine calls and performance metrics."""

    def __init__(self, db: Optional[MetricsDatabase] = None):
        """Initialize the search tracker.

        Args:
            db: Metrics database used for queries; a fresh
                ``MetricsDatabase`` is created when omitted.
        """
        self.db = db or MetricsDatabase()

    @staticmethod
    def record_search(
        engine_name: str,
        query: str,
        results_count: int = 0,
        response_time_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a completed search operation directly to database.

        Pulls the research context from thread-local storage and writes a
        ``SearchCall`` row via the thread-safe metrics writer. Recording is
        skipped (with a warning) when no context, username, or password is
        available — e.g. in programmatic mode.

        Args:
            engine_name: Name of the search engine used.
            query: The search query string.
            results_count: Number of results returned.
            response_time_ms: Search round-trip time in milliseconds.
            success: Whether the search succeeded.
            error_message: Error detail. NOTE(review): despite the ``str``
                annotation, an ``Exception`` instance may be passed — only
                then is a meaningful ``error_type`` derived.
        """
        # Extract research context (thread-safe)
        context = get_search_context()

        # Skip metrics recording in programmatic mode or when no context is set
        if context is None:
            logger.warning(
                "Skipping search metrics recording - no research context available "
                "(likely in programmatic mode)"
            )
            return

        research_id = context.get("research_id")

        # Convert research_id to string if it's an integer (for backward compatibility)
        if isinstance(research_id, int):
            research_id = str(research_id)
        research_query = context.get("research_query")
        research_mode = context.get("research_mode", "unknown")
        research_phase = context.get("research_phase", "search")
        search_iteration = context.get("search_iteration", 0)

        # Determine success status
        success_status = "success" if success else "error"
        error_type = None
        if error_message:
            error_type = (
                type(error_message).__name__
                if isinstance(error_message, Exception)
                else "unknown_error"
            )

        # Record search call in database - only from background threads
        try:
            # Get username from context for thread-safe database
            username = context.get("username")
            if not username:
                logger.warning(
                    f"Cannot save search metrics - no username in research context. "
                    f"Search: {engine_name} for '{query}'"
                )
                return

            # Get password from context
            password = context.get("user_password")
            if not password:
                logger.warning(
                    f"Cannot save search metrics - no password in research context. "
                    f"Search: {engine_name} for '{query}', username: {username}"
                )
                return

            # Use thread-safe metrics writer (local import avoids import cycles)
            from ..database.thread_metrics import metrics_writer

            try:
                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                with metrics_writer.get_session(username) as session:
                    search_call = SearchCall(
                        research_id=research_id,
                        research_query=research_query,
                        research_mode=research_mode,
                        research_phase=research_phase,
                        search_iteration=search_iteration,
                        search_engine=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success_status=success_status,
                        error_type=error_type,
                        error_message=str(error_message)
                        if error_message
                        else None,
                    )
                    session.add(search_call)

                logger.debug(
                    f"Search call recorded to encrypted DB: {engine_name} - "
                    f"{results_count} results in {response_time_ms}ms"
                )
            except Exception:
                logger.exception("Failed to write search metrics")

        except Exception:
            logger.exception("Failed to record search call")

    def get_search_metrics(
        self,
        period: str = "30d",
        research_mode: str = "all",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get search engine usage metrics.

        Args:
            period: Time period to filter by (e.g. '7d', '30d', '3m', '1y', 'all').
            research_mode: Research mode to filter by ('quick', 'detailed', 'all').
            username: Username for database access.
            password: Password for database access.

        Returns:
            Dict with ``search_engine_stats`` (per-engine aggregates) and
            ``recent_calls`` (up to 20 most recent calls). Both lists are
            empty on query failure.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # Build base query with filters
                query = session.query(SearchCall).filter(
                    SearchCall.search_engine.isnot(None)
                )

                # Apply time filter
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                if time_condition is not None:
                    query = query.filter(time_condition)

                # Apply research mode filter
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                if mode_condition is not None:
                    query = query.filter(mode_condition)

                # Get search engine statistics using ORM aggregation
                search_stats = session.query(
                    SearchCall.search_engine,
                    func.count().label("call_count"),
                    func.avg(SearchCall.response_time_ms).label(
                        "avg_response_time"
                    ),
                    func.sum(SearchCall.results_count).label("total_results"),
                    func.avg(SearchCall.results_count).label(
                        "avg_results_per_call"
                    ),
                    func.sum(
                        case(
                            (SearchCall.success_status == "success", 1), else_=0
                        )
                    ).label("success_count"),
                    func.sum(
                        case((SearchCall.success_status == "error", 1), else_=0)
                    ).label("error_count"),
                ).filter(SearchCall.search_engine.isnot(None))

                # Apply same filters to stats query
                if time_condition is not None:
                    search_stats = search_stats.filter(time_condition)
                if mode_condition is not None:
                    search_stats = search_stats.filter(mode_condition)

                search_stats = (
                    search_stats.group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Get recent search calls
                recent_calls_query = session.query(SearchCall)
                if time_condition is not None:
                    recent_calls_query = recent_calls_query.filter(
                        time_condition
                    )
                if mode_condition is not None:
                    recent_calls_query = recent_calls_query.filter(
                        mode_condition
                    )

                recent_calls = (
                    recent_calls_query.order_by(SearchCall.timestamp.desc())
                    .limit(20)
                    .all()
                )

                return {
                    "search_engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "avg_results_per_call": stat.avg_results_per_call
                            or 0,
                            "success_rate": (
                                (stat.success_count / stat.call_count * 100)
                                if stat.call_count > 0
                                else 0
                            ),
                            "error_count": stat.error_count or 0,
                        }
                        for stat in search_stats
                    ],
                    "recent_calls": [
                        {
                            "engine": call.search_engine,
                            # Truncate long queries for display
                            "query": (
                                call.query[:100] + "..."
                                if len(call.query or "") > 100
                                else call.query
                            ),
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in recent_calls
                    ],
                }

            except Exception:
                logger.exception("Error getting search metrics")
                return {"search_engine_stats": [], "recent_calls": []}

    def get_research_search_metrics(
        self,
        research_id: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get search metrics for a specific research session.

        Args:
            research_id: Identifier of the research session.
            username: Username for database access.
            password: Password for database access.

        Returns:
            Dict with totals (``total_searches``, ``total_results``,
            ``avg_response_time``, ``success_rate``), the chronological
            ``search_calls`` list, and per-engine ``engine_stats``.
            Zeroed/empty values on query failure.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # Get all search calls for this research
                search_calls = (
                    session.query(SearchCall)
                    .filter(SearchCall.research_id == research_id)
                    .order_by(SearchCall.timestamp.asc())
                    .all()
                )

                # Get search engine stats for this research
                engine_stats = (
                    session.query(
                        SearchCall.search_engine,
                        func.count().label("call_count"),
                        func.avg(SearchCall.response_time_ms).label(
                            "avg_response_time"
                        ),
                        func.sum(SearchCall.results_count).label(
                            "total_results"
                        ),
                        func.sum(
                            case(
                                (SearchCall.success_status == "success", 1),
                                else_=0,
                            )
                        ).label("success_count"),
                    )
                    .filter(SearchCall.research_id == research_id)
                    .group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Calculate totals
                total_searches = len(search_calls)
                total_results = sum(
                    call.results_count or 0 for call in search_calls
                )
                avg_response_time = (
                    sum(call.response_time_ms or 0 for call in search_calls)
                    / total_searches
                    if total_searches > 0
                    else 0
                )
                successful_searches = sum(
                    1
                    for call in search_calls
                    if call.success_status == "success"
                )
                success_rate = (
                    (successful_searches / total_searches * 100)
                    if total_searches > 0
                    else 0
                )

                return {
                    "total_searches": total_searches,
                    "total_results": total_results,
                    "avg_response_time": round(avg_response_time),
                    "success_rate": round(success_rate, 1),
                    "search_calls": [
                        {
                            "engine": call.search_engine,
                            "query": call.query,
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in search_calls
                    ],
                    "engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "success_rate": (
                                (stat.success_count / stat.call_count * 100)
                                if stat.call_count > 0
                                else 0
                            ),
                        }
                        for stat in engine_stats
                    ],
                }

            except Exception:
                logger.exception("Error getting research search metrics")
                return {
                    "total_searches": 0,
                    "total_results": 0,
                    "avg_response_time": 0,
                    "success_rate": 0,
                    "search_calls": [],
                    "engine_stats": [],
                }

    def get_search_time_series(
        self,
        period: str = "30d",
        research_mode: str = "all",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Get search activity time series data for charting.

        Args:
            period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
            research_mode: Research mode to filter by ('quick', 'detailed', 'all')
            username: Username for database access
            password: Password for database access

        Returns:
            List of time series data points with search engine activity,
            ordered oldest-first. Empty list on query failure.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # Build base query
                query = session.query(SearchCall).filter(
                    SearchCall.search_engine.isnot(None),
                    SearchCall.timestamp.isnot(None),
                )

                # Apply time filter
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                if time_condition is not None:
                    query = query.filter(time_condition)

                # Apply research mode filter
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                if mode_condition is not None:
                    query = query.filter(mode_condition)

                # Get all search calls ordered by time
                search_calls = query.order_by(SearchCall.timestamp.asc()).all()

                # Create time series data
                time_series = []
                for call in search_calls:
                    time_series.append(
                        {
                            "timestamp": (
                                str(call.timestamp) if call.timestamp else None
                            ),
                            "search_engine": call.search_engine,
                            "results_count": call.results_count or 0,
                            "response_time_ms": call.response_time_ms or 0,
                            "success_status": call.success_status,
                            # Truncate long queries for chart tooltips
                            "query": (
                                call.query[:50] + "..."
                                if call.query and len(call.query) > 50
                                else call.query
                            ),
                        }
                    )

                return time_series

            except Exception:
                logger.exception("Error getting search time series")
                return []
def get_search_tracker() -> SearchTracker:
    """Create a SearchTracker instance.

    Returns a fresh instance each time. Callers should pass username/password
    to the query methods so the correct per-user database is accessed.
    """
    return SearchTracker()