Coverage for src / local_deep_research / metrics / search_tracker.py: 98%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Search call tracking system for metrics collection. 

3Similar to token_counter.py but tracks search engine usage. 

4""" 

5 

6from typing import Any, Dict, List, Optional 

7 

8from loguru import logger 

9from sqlalchemy import case, func 

10 

11from ..utilities.thread_context import get_search_context 

12from ..database.models import SearchCall 

13from .database import MetricsDatabase 

14from .query_utils import get_research_mode_condition, get_time_filter_condition 

15 

16 

class SearchTracker:
    """Track search engine calls and performance metrics.

    ``record_search`` stores one ``SearchCall`` row per search operation;
    the ``get_*`` methods read those rows back through ``self.db`` and
    return plain dictionaries/lists.
    """

    def __init__(self, db: Optional["MetricsDatabase"] = None):
        """Initialize the search tracker.

        Args:
            db: Metrics database used by the query methods. A new
                ``MetricsDatabase`` is created when no (or a falsy) value
                is supplied.
        """
        self.db = db or MetricsDatabase()

    @staticmethod
    def _truncate_query(text: Optional[str], limit: int) -> Optional[str]:
        """Return ``text`` cut to ``limit`` characters plus ``"..."``.

        ``None``, empty strings and strings no longer than ``limit`` are
        returned unchanged.
        """
        if text and len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _success_rate(success_count: Any, call_count: Any) -> Any:
        """Return the percentage of successful calls, or 0 without calls.

        A missing (``None``) ``success_count`` is treated as zero so the
        division cannot fail on an empty aggregate.
        """
        if not call_count or call_count <= 0:
            return 0
        return (success_count or 0) / call_count * 100

    @staticmethod
    def record_search(
        engine_name: str,
        query: str,
        results_count: int = 0,
        response_time_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a completed search operation directly to database.

        The research context (research id/query/mode/phase, iteration,
        username and password) is read from ``get_search_context()``.
        Nothing is written, and a warning is logged, when the context,
        the username or the password is missing. Failures while writing
        are logged and never propagated to the caller.

        Args:
            engine_name: Name of the search engine that was called.
            query: The search query that was sent.
            results_count: Number of results returned.
            response_time_ms: Duration of the call in milliseconds.
            success: Whether the call succeeded; stored as ``"success"``
                or ``"error"``.
            error_message: Error description. If an ``Exception`` instance
                is passed, its class name is stored as the error type;
                any other truthy value is stored as ``"unknown_error"``.
        """
        # Extract research context (thread-safe)
        context = get_search_context()

        # Skip metrics recording in programmatic mode or when no context is set
        if context is None:
            logger.warning(
                "Skipping search metrics recording - no research context available "
                "(likely in programmatic mode)"
            )
            return

        research_id = context.get("research_id")
        # Convert research_id to string if it's an integer (for backward compatibility)
        if isinstance(research_id, int):
            research_id = str(research_id)

        success_status = "success" if success else "error"
        error_type = None
        if error_message:
            if isinstance(error_message, Exception):
                error_type = type(error_message).__name__
            else:
                error_type = "unknown_error"

        # Credentials for the per-user database come from the context.
        username = context.get("username")
        if not username:
            logger.warning(
                "Cannot save search metrics - no username in research context. "
                f"Search: {engine_name} for '{query}'"
            )
            return

        password = context.get("user_password")
        if not password:
            logger.warning(
                "Cannot save search metrics - no password in research context. "
                f"Search: {engine_name} for '{query}', username: {username}"
            )
            return

        try:
            # Use thread-safe metrics writer (imported at call time)
            from ..database.thread_metrics import metrics_writer
        except Exception:
            logger.exception("Failed to record search call")
            return

        try:
            # Set password for this thread
            metrics_writer.set_user_password(username, password)

            with metrics_writer.get_session(username) as session:
                session.add(
                    SearchCall(
                        research_id=research_id,
                        research_query=context.get("research_query"),
                        research_mode=context.get("research_mode", "unknown"),
                        research_phase=context.get("research_phase", "search"),
                        search_iteration=context.get("search_iteration", 0),
                        search_engine=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success_status=success_status,
                        error_type=error_type,
                        error_message=(
                            str(error_message) if error_message else None
                        ),
                    )
                )

            # Logged only once the session context has exited without raising.
            logger.debug(
                f"Search call recorded to encrypted DB: {engine_name} - "
                f"{results_count} results in {response_time_ms}ms"
            )
        except Exception:
            logger.exception("Failed to write search metrics")

    def get_search_metrics(
        self,
        period: str = "30d",
        research_mode: str = "all",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get search engine usage metrics.

        Args:
            period: Time period passed to ``get_time_filter_condition``.
            research_mode: Mode passed to ``get_research_mode_condition``.
            username: Username for database access.
            password: Password for database access.

        Returns:
            A dict with ``"search_engine_stats"`` (one aggregate entry per
            engine, most-called first) and ``"recent_calls"`` (the 20 most
            recent calls, queries truncated to 100 characters). Both lists
            are empty when the query fails.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                # Optional filters shared by both queries below.
                shared_filters = [
                    condition
                    for condition in (time_condition, mode_condition)
                    if condition is not None
                ]

                # Per-engine statistics using ORM aggregation
                stats_query = session.query(
                    SearchCall.search_engine,
                    func.count().label("call_count"),
                    func.avg(SearchCall.response_time_ms).label(
                        "avg_response_time"
                    ),
                    func.sum(SearchCall.results_count).label("total_results"),
                    func.avg(SearchCall.results_count).label(
                        "avg_results_per_call"
                    ),
                    func.sum(
                        case(
                            (SearchCall.success_status == "success", 1), else_=0
                        )
                    ).label("success_count"),
                    func.sum(
                        case((SearchCall.success_status == "error", 1), else_=0)
                    ).label("error_count"),
                ).filter(SearchCall.search_engine.isnot(None))
                for condition in shared_filters:
                    stats_query = stats_query.filter(condition)
                search_stats = (
                    stats_query.group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Most recent individual calls
                recent_calls_query = session.query(SearchCall)
                for condition in shared_filters:
                    recent_calls_query = recent_calls_query.filter(condition)
                recent_calls = (
                    recent_calls_query.order_by(SearchCall.timestamp.desc())
                    .limit(20)
                    .all()
                )

                return {
                    "search_engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "avg_results_per_call": stat.avg_results_per_call
                            or 0,
                            "success_rate": self._success_rate(
                                stat.success_count, stat.call_count
                            ),
                            "error_count": stat.error_count or 0,
                        }
                        for stat in search_stats
                    ],
                    "recent_calls": [
                        {
                            "engine": call.search_engine,
                            "query": self._truncate_query(call.query, 100),
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in recent_calls
                    ],
                }

            except Exception:
                logger.exception("Error getting search metrics")
                return {"search_engine_stats": [], "recent_calls": []}

    def get_research_search_metrics(
        self,
        research_id: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get search metrics for a specific research session.

        Args:
            research_id: Identifier matched against ``SearchCall.research_id``.
            username: Username for database access.
            password: Password for database access.

        Returns:
            A dict with overall totals (``total_searches``,
            ``total_results``, rounded ``avg_response_time`` and
            ``success_rate``), every call in chronological order under
            ``search_calls`` and per-engine aggregates under
            ``engine_stats``. Zeroed totals and empty lists are returned
            when the query fails.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # All search calls for this research, oldest first
                search_calls = (
                    session.query(SearchCall)
                    .filter(SearchCall.research_id == research_id)
                    .order_by(SearchCall.timestamp.asc())
                    .all()
                )

                # Per-engine aggregates for this research
                engine_stats = (
                    session.query(
                        SearchCall.search_engine,
                        func.count().label("call_count"),
                        func.avg(SearchCall.response_time_ms).label(
                            "avg_response_time"
                        ),
                        func.sum(SearchCall.results_count).label(
                            "total_results"
                        ),
                        func.sum(
                            case(
                                (SearchCall.success_status == "success", 1),
                                else_=0,
                            )
                        ).label("success_count"),
                    )
                    .filter(SearchCall.research_id == research_id)
                    .group_by(SearchCall.search_engine)
                    .order_by(func.count().desc())
                    .all()
                )

                # Overall totals are computed from the loaded rows
                total_searches = len(search_calls)
                total_results = sum(
                    call.results_count or 0 for call in search_calls
                )
                avg_response_time = (
                    sum(call.response_time_ms or 0 for call in search_calls)
                    / total_searches
                    if total_searches > 0
                    else 0
                )
                successful_searches = sum(
                    1
                    for call in search_calls
                    if call.success_status == "success"
                )
                success_rate = self._success_rate(
                    successful_searches, total_searches
                )

                return {
                    "total_searches": total_searches,
                    "total_results": total_results,
                    "avg_response_time": round(avg_response_time),
                    "success_rate": round(success_rate, 1),
                    "search_calls": [
                        {
                            "engine": call.search_engine,
                            "query": call.query,
                            "results_count": call.results_count,
                            "response_time_ms": call.response_time_ms,
                            "success_status": call.success_status,
                            "timestamp": str(call.timestamp),
                        }
                        for call in search_calls
                    ],
                    "engine_stats": [
                        {
                            "engine": stat.search_engine,
                            "call_count": stat.call_count,
                            "avg_response_time": stat.avg_response_time or 0,
                            "total_results": stat.total_results or 0,
                            "success_rate": self._success_rate(
                                stat.success_count, stat.call_count
                            ),
                        }
                        for stat in engine_stats
                    ],
                }

            except Exception:
                logger.exception("Error getting research search metrics")
                return {
                    "total_searches": 0,
                    "total_results": 0,
                    "avg_response_time": 0,
                    "success_rate": 0,
                    "search_calls": [],
                    "engine_stats": [],
                }

    def get_search_time_series(
        self,
        period: str = "30d",
        research_mode: str = "all",
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Get search activity time series data for charting.

        Args:
            period: Time period to filter by ('7d', '30d', '3m', '1y', 'all')
            research_mode: Research mode to filter by ('quick', 'detailed', 'all')
            username: Username for database access
            password: Password for database access

        Returns:
            List of time series data points (one per search call, oldest
            first, queries truncated to 50 characters); an empty list when
            the query fails.
        """
        with self.db.get_session(
            username=username, password=password
        ) as session:
            try:
                # Only rows that have both an engine and a timestamp
                query = session.query(SearchCall).filter(
                    SearchCall.search_engine.isnot(None),
                    SearchCall.timestamp.isnot(None),
                )

                # Apply time filter
                time_condition = get_time_filter_condition(
                    period, SearchCall.timestamp
                )
                if time_condition is not None:
                    query = query.filter(time_condition)

                # Apply research mode filter
                mode_condition = get_research_mode_condition(
                    research_mode, SearchCall.research_mode
                )
                if mode_condition is not None:
                    query = query.filter(mode_condition)

                search_calls = query.order_by(SearchCall.timestamp.asc()).all()

                return [
                    {
                        "timestamp": (
                            str(call.timestamp) if call.timestamp else None
                        ),
                        "search_engine": call.search_engine,
                        "results_count": call.results_count or 0,
                        "response_time_ms": call.response_time_ms or 0,
                        "success_status": call.success_status,
                        "query": self._truncate_query(call.query, 50),
                    }
                    for call in search_calls
                ]

            except Exception:
                logger.exception("Error getting search time series")
                return []

420 

421 

def get_search_tracker() -> SearchTracker:
    """Build and return a new ``SearchTracker``.

    No instance is cached: each call constructs a fresh tracker with no
    explicit database argument. Callers should pass username/password to
    the tracker's query methods so the correct per-user database is
    accessed.
    """
    tracker = SearchTracker()
    return tracker