Coverage for src / local_deep_research / metrics / search_tracker.py: 89%

123 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Search call tracking system for metrics collection. 

3Similar to token_counter.py but tracks search engine usage. 

4""" 

5 

6from typing import Any, Dict, List, Optional 

7 

8from loguru import logger 

9from sqlalchemy import case, func 

10 

11from ..utilities.thread_context import get_search_context 

12from ..database.models import SearchCall 

13from .database import MetricsDatabase 

14from .query_utils import get_research_mode_condition, get_time_filter_condition 

15 

16 

class SearchTracker:
    """Track search engine calls and performance metrics."""

    def __init__(self, db: Optional[MetricsDatabase] = None):
        """Initialize the search tracker.

        Args:
            db: Metrics database to record into. When omitted (or falsy),
                a fresh ``MetricsDatabase`` is constructed.
        """
        if db:
            self.db = db
        else:
            self.db = MetricsDatabase()

23 

    @staticmethod
    def record_search(
        engine_name: str,
        query: str,
        results_count: int = 0,
        response_time_ms: int = 0,
        success: bool = True,
        error_message: Optional[str] = None,
    ) -> None:
        """Record a completed search operation directly to database.

        Reads the research context from thread-local storage and writes a
        ``SearchCall`` row through the thread-safe metrics writer. Recording
        is silently skipped (with a warning logged) when no research context,
        username, or password is available.

        Args:
            engine_name: Name of the search engine that handled the query.
            query: The search query string that was executed.
            results_count: Number of results the engine returned.
            response_time_ms: Engine response time in milliseconds.
            success: Whether the search completed successfully.
            error_message: Optional error detail. NOTE(review): despite the
                ``str`` annotation, the ``isinstance`` check below suggests
                callers may also pass an ``Exception`` instance — confirm
                against call sites.
        """

        # Extract research context (thread-safe)
        context = get_search_context()

        # Skip metrics recording in programmatic mode or when no context is set
        if context is None:
            logger.warning(
                "Skipping search metrics recording - no research context available "
                "(likely in programmatic mode)"
            )
            return

        research_id = context.get("research_id")

        # Convert research_id to string if it's an integer (for backward compatibility)
        if isinstance(research_id, int):
            research_id = str(research_id)
        research_query = context.get("research_query")
        research_mode = context.get("research_mode", "unknown")
        research_phase = context.get("research_phase", "search")
        search_iteration = context.get("search_iteration", 0)

        # Determine success status
        success_status = "success" if success else "error"
        error_type = None
        if error_message:
            # A plain string (the annotated type) always classifies as
            # "unknown_error"; the Exception class name is only captured
            # when an actual Exception instance was passed in.
            error_type = (
                type(error_message).__name__
                if isinstance(error_message, Exception)
                else "unknown_error"
            )

        # Record search call in database - only from background threads
        try:
            # Get username from context for thread-safe database
            username = context.get("username")
            if not username:
                logger.warning(
                    f"Cannot save search metrics - no username in research context. "
                    f"Search: {engine_name} for '{query}'"
                )
                return

            # Get password from context
            password = context.get("user_password")
            if not password:
                logger.warning(
                    f"Cannot save search metrics - no password in research context. "
                    f"Search: {engine_name} for '{query}', username: {username}"
                )
                return

            # Use thread-safe metrics writer (imported lazily to avoid
            # pulling database machinery in at module import time)
            from ..database.thread_metrics import metrics_writer

            try:
                # Set password for this thread
                metrics_writer.set_user_password(username, password)

                with metrics_writer.get_session(username) as session:
                    search_call = SearchCall(
                        research_id=research_id,
                        research_query=research_query,
                        research_mode=research_mode,
                        research_phase=research_phase,
                        search_iteration=search_iteration,
                        search_engine=engine_name,
                        query=query,
                        results_count=results_count,
                        response_time_ms=response_time_ms,
                        success_status=success_status,
                        error_type=error_type,
                        error_message=str(error_message)
                        if error_message
                        else None,
                    )
                    session.add(search_call)

                    logger.debug(
                        f"Search call recorded to encrypted DB: {engine_name} - "
                        f"{results_count} results in {response_time_ms}ms"
                    )
            except Exception:
                # Best-effort: metrics failures must never break the search path
                logger.exception("Failed to write search metrics")

        except Exception:
            logger.exception("Failed to record search call")

121 

122 def get_search_metrics( 

123 self, 

124 period: str = "30d", 

125 research_mode: str = "all", 

126 username: Optional[str] = None, 

127 password: Optional[str] = None, 

128 ) -> Dict[str, Any]: 

129 """Get search engine usage metrics.""" 

130 with self.db.get_session( 

131 username=username, password=password 

132 ) as session: 

133 try: 

134 # Build base query with filters 

135 query = session.query(SearchCall).filter( 

136 SearchCall.search_engine.isnot(None) 

137 ) 

138 

139 # Apply time filter 

140 time_condition = get_time_filter_condition( 

141 period, SearchCall.timestamp 

142 ) 

143 if time_condition is not None: 143 ↛ 147line 143 didn't jump to line 147 because the condition on line 143 was always true

144 query = query.filter(time_condition) 

145 

146 # Apply research mode filter 

147 mode_condition = get_research_mode_condition( 

148 research_mode, SearchCall.research_mode 

149 ) 

150 if mode_condition is not None: 

151 query = query.filter(mode_condition) 

152 

153 # Get search engine statistics using ORM aggregation 

154 search_stats = session.query( 

155 SearchCall.search_engine, 

156 func.count().label("call_count"), 

157 func.avg(SearchCall.response_time_ms).label( 

158 "avg_response_time" 

159 ), 

160 func.sum(SearchCall.results_count).label("total_results"), 

161 func.avg(SearchCall.results_count).label( 

162 "avg_results_per_call" 

163 ), 

164 func.sum( 

165 case( 

166 (SearchCall.success_status == "success", 1), else_=0 

167 ) 

168 ).label("success_count"), 

169 func.sum( 

170 case((SearchCall.success_status == "error", 1), else_=0) 

171 ).label("error_count"), 

172 ).filter(SearchCall.search_engine.isnot(None)) 

173 

174 # Apply same filters to stats query 

175 if time_condition is not None: 175 ↛ 177line 175 didn't jump to line 177 because the condition on line 175 was always true

176 search_stats = search_stats.filter(time_condition) 

177 if mode_condition is not None: 

178 search_stats = search_stats.filter(mode_condition) 

179 

180 search_stats = ( 

181 search_stats.group_by(SearchCall.search_engine) 

182 .order_by(func.count().desc()) 

183 .all() 

184 ) 

185 

186 # Get recent search calls 

187 recent_calls_query = session.query(SearchCall) 

188 if time_condition is not None: 188 ↛ 192line 188 didn't jump to line 192 because the condition on line 188 was always true

189 recent_calls_query = recent_calls_query.filter( 

190 time_condition 

191 ) 

192 if mode_condition is not None: 

193 recent_calls_query = recent_calls_query.filter( 

194 mode_condition 

195 ) 

196 

197 recent_calls = ( 

198 recent_calls_query.order_by(SearchCall.timestamp.desc()) 

199 .limit(20) 

200 .all() 

201 ) 

202 

203 return { 

204 "search_engine_stats": [ 

205 { 

206 "engine": stat.search_engine, 

207 "call_count": stat.call_count, 

208 "avg_response_time": stat.avg_response_time or 0, 

209 "total_results": stat.total_results or 0, 

210 "avg_results_per_call": stat.avg_results_per_call 

211 or 0, 

212 "success_rate": ( 

213 (stat.success_count / stat.call_count * 100) 

214 if stat.call_count > 0 

215 else 0 

216 ), 

217 "error_count": stat.error_count or 0, 

218 } 

219 for stat in search_stats 

220 ], 

221 "recent_calls": [ 

222 { 

223 "engine": call.search_engine, 

224 "query": ( 

225 call.query[:100] + "..." 

226 if len(call.query or "") > 100 

227 else call.query 

228 ), 

229 "results_count": call.results_count, 

230 "response_time_ms": call.response_time_ms, 

231 "success_status": call.success_status, 

232 "timestamp": str(call.timestamp), 

233 } 

234 for call in recent_calls 

235 ], 

236 } 

237 

238 except Exception: 

239 logger.exception("Error getting search metrics") 

240 return {"search_engine_stats": [], "recent_calls": []} 

241 

242 def get_research_search_metrics(self, research_id: str) -> Dict[str, Any]: 

243 """Get search metrics for a specific research session.""" 

244 with self.db.get_session() as session: 

245 try: 

246 # Get all search calls for this research 

247 search_calls = ( 

248 session.query(SearchCall) 

249 .filter(SearchCall.research_id == research_id) 

250 .order_by(SearchCall.timestamp.asc()) 

251 .all() 

252 ) 

253 

254 # Get search engine stats for this research 

255 engine_stats = ( 

256 session.query( 

257 SearchCall.search_engine, 

258 func.count().label("call_count"), 

259 func.avg(SearchCall.response_time_ms).label( 

260 "avg_response_time" 

261 ), 

262 func.sum(SearchCall.results_count).label( 

263 "total_results" 

264 ), 

265 func.sum( 

266 case( 

267 (SearchCall.success_status == "success", 1), 

268 else_=0, 

269 ) 

270 ).label("success_count"), 

271 ) 

272 .filter(SearchCall.research_id == research_id) 

273 .group_by(SearchCall.search_engine) 

274 .order_by(func.count().desc()) 

275 .all() 

276 ) 

277 

278 # Calculate totals 

279 total_searches = len(search_calls) 

280 total_results = sum( 

281 call.results_count or 0 for call in search_calls 

282 ) 

283 avg_response_time = ( 

284 sum(call.response_time_ms or 0 for call in search_calls) 

285 / total_searches 

286 if total_searches > 0 

287 else 0 

288 ) 

289 successful_searches = sum( 

290 1 

291 for call in search_calls 

292 if call.success_status == "success" 

293 ) 

294 success_rate = ( 

295 (successful_searches / total_searches * 100) 

296 if total_searches > 0 

297 else 0 

298 ) 

299 

300 return { 

301 "total_searches": total_searches, 

302 "total_results": total_results, 

303 "avg_response_time": round(avg_response_time), 

304 "success_rate": round(success_rate, 1), 

305 "search_calls": [ 

306 { 

307 "engine": call.search_engine, 

308 "query": call.query, 

309 "results_count": call.results_count, 

310 "response_time_ms": call.response_time_ms, 

311 "success_status": call.success_status, 

312 "timestamp": str(call.timestamp), 

313 } 

314 for call in search_calls 

315 ], 

316 "engine_stats": [ 

317 { 

318 "engine": stat.search_engine, 

319 "call_count": stat.call_count, 

320 "avg_response_time": stat.avg_response_time or 0, 

321 "total_results": stat.total_results or 0, 

322 "success_rate": ( 

323 (stat.success_count / stat.call_count * 100) 

324 if stat.call_count > 0 

325 else 0 

326 ), 

327 } 

328 for stat in engine_stats 

329 ], 

330 } 

331 

332 except Exception: 

333 logger.exception("Error getting research search metrics") 

334 return { 

335 "total_searches": 0, 

336 "total_results": 0, 

337 "avg_response_time": 0, 

338 "success_rate": 0, 

339 "search_calls": [], 

340 "engine_stats": [], 

341 } 

342 

343 def get_search_time_series( 

344 self, period: str = "30d", research_mode: str = "all" 

345 ) -> List[Dict[str, Any]]: 

346 """Get search activity time series data for charting. 

347 

348 Args: 

349 period: Time period to filter by ('7d', '30d', '3m', '1y', 'all') 

350 research_mode: Research mode to filter by ('quick', 'detailed', 'all') 

351 

352 Returns: 

353 List of time series data points with search engine activity 

354 """ 

355 with self.db.get_session() as session: 

356 try: 

357 # Build base query 

358 query = session.query(SearchCall).filter( 

359 SearchCall.search_engine.isnot(None), 

360 SearchCall.timestamp.isnot(None), 

361 ) 

362 

363 # Apply time filter 

364 time_condition = get_time_filter_condition( 

365 period, SearchCall.timestamp 

366 ) 

367 if time_condition is not None: 367 ↛ 371line 367 didn't jump to line 371 because the condition on line 367 was always true

368 query = query.filter(time_condition) 

369 

370 # Apply research mode filter 

371 mode_condition = get_research_mode_condition( 

372 research_mode, SearchCall.research_mode 

373 ) 

374 if mode_condition is not None: 

375 query = query.filter(mode_condition) 

376 

377 # Get all search calls ordered by time 

378 search_calls = query.order_by(SearchCall.timestamp.asc()).all() 

379 

380 # Create time series data 

381 time_series = [] 

382 for call in search_calls: 382 ↛ 383line 382 didn't jump to line 383 because the loop on line 382 never started

383 time_series.append( 

384 { 

385 "timestamp": ( 

386 str(call.timestamp) if call.timestamp else None 

387 ), 

388 "search_engine": call.search_engine, 

389 "results_count": call.results_count or 0, 

390 "response_time_ms": call.response_time_ms or 0, 

391 "success_status": call.success_status, 

392 "query": ( 

393 call.query[:50] + "..." 

394 if call.query and len(call.query) > 50 

395 else call.query 

396 ), 

397 } 

398 ) 

399 

400 return time_series 

401 

402 except Exception: 

403 logger.exception("Error getting search time series") 

404 return [] 

405 

406 

# Global search tracker instance
_search_tracker = None


def get_search_tracker() -> SearchTracker:
    """Get the global search tracker instance with proper authentication.

    On first call, attempts to pull the username and session password from
    the active Flask session so the tracker's ``MetricsDatabase`` can open
    the user's encrypted database; falls back to an unauthenticated tracker
    if that fails for any reason.

    NOTE(review): this is a process-wide singleton, so the credentials of
    whichever session triggers the first call are cached for all subsequent
    callers — confirm that is intended in multi-user deployments.

    Returns:
        The shared ``SearchTracker`` instance.
    """
    global _search_tracker
    if _search_tracker is None:
        # Try to get credentials from Flask session if available
        try:
            from flask import session as flask_session
            from ..database.session_passwords import session_password_store

            username = flask_session.get("username")
            session_id = flask_session.get("session_id")
            user_password = None

            if session_id and username:
                user_password = session_password_store.get_session_password(
                    username, session_id
                )

            # Create metrics DB with credentials. MetricsDatabase is already
            # imported at module level; the previous redundant local
            # re-import has been removed.
            metrics_db = MetricsDatabase(
                username=username, password=user_password
            )
            _search_tracker = SearchTracker(db=metrics_db)
        except Exception:
            logger.exception(
                "Error initializing SearchTracker with Flask session credentials"
            )
            _search_tracker = SearchTracker()

    return _search_tracker