Coverage for src/local_deep_research/benchmarks/metrics/calculation.py: 67%
126 statements (coverage.py v7.12.0, created at 2026-01-11 00:51 +0000)

1""" 

2Unified metrics calculation module. 

3 

4This module provides functions for calculating metrics for both 

5standard benchmarks and optimization tasks. 

6""" 

7 

8import json 

9from loguru import logger 

10from pathlib import Path 

11import tempfile 

12import time 

13from datetime import datetime, UTC 

14from typing import Any, Dict, Optional 

15 

16 

def calculate_metrics(results_file: str) -> Dict[str, Any]:
    """
    Calculate evaluation metrics from results.

    Args:
        results_file: Path to results file

    Returns:
        Dictionary of metrics
    """
    # Load results
    results = []
    try:
        with open(results_file, "r") as f:
            for line in f:
                if line.strip():  # coverage: condition was always true in the tests
                    results.append(json.loads(line))
    except Exception as e:
        logger.exception("Error loading results file")
        return {"error": str(e)}

    if not results:
        return {"error": "No results found"}

    # Calculate accuracy
    graded_results = [r for r in results if "is_correct" in r]
    correct_count = sum(1 for r in graded_results if r.get("is_correct", False))
    total_graded = len(graded_results)
    accuracy = correct_count / total_graded if total_graded else 0

    # Calculate average processing time if available
    processing_times = [
        r.get("processing_time", 0) for r in results if "processing_time" in r
    ]
    avg_time = (
        sum(processing_times) / len(processing_times) if processing_times else 0
    )

    # Average confidence if available
    confidence_values = []
    for r in results:
        if r.get("confidence"):
            try:
                confidence_values.append(int(r["confidence"]))
            except (ValueError, TypeError):
                pass

    avg_confidence = (
        sum(confidence_values) / len(confidence_values)
        if confidence_values
        else 0
    )

    # Calculate error rate
    error_count = sum(1 for r in results if "error" in r)
    error_rate = error_count / len(results) if results else 0

    # Basic metrics
    metrics = {
        "total_examples": len(results),
        "graded_examples": total_graded,
        "correct": correct_count,
        "accuracy": accuracy,
        "average_processing_time": avg_time,
        "average_confidence": avg_confidence,
        "error_count": error_count,
        "error_rate": error_rate,
        "timestamp": datetime.now(UTC).isoformat(),
    }

    # If we have category information, calculate per-category metrics
    categories = {}
    for r in graded_results:
        if "category" in r:
            category = r["category"]
            if category not in categories:
                categories[category] = {"total": 0, "correct": 0}
            categories[category]["total"] += 1
            if r.get("is_correct", False):
                categories[category]["correct"] += 1

    if categories:
        category_metrics = {}
        for category, counts in categories.items():
            category_metrics[category] = {
                "total": counts["total"],
                "correct": counts["correct"],
                "accuracy": (
                    counts["correct"] / counts["total"]
                    if counts["total"]
                    else 0
                ),
            }
        metrics["categories"] = category_metrics

    return metrics

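# Illustrative input record for calculate_metrics() (hypothetical values; the keys
# are the ones read above). Each line of the results file is one JSON object, e.g.:
#
#   {"is_correct": true, "processing_time": 12.4, "confidence": "80",
#    "category": "science"}
#
# Records without "is_correct" count toward total_examples but not graded_examples,
# and records carrying an "error" key drive error_count and error_rate.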

def evaluate_benchmark_quality(
    system_config: Dict[str, Any],
    num_examples: int = 10,
    output_dir: Optional[str] = None,
) -> Dict[str, float]:
    """
    Evaluate quality using SimpleQA benchmark.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with benchmark metrics
    """
    from ..runners import run_simpleqa_benchmark

    # Create temporary directory if not provided
    temp_dir = None
    if output_dir is None:
        temp_dir = tempfile.mkdtemp(prefix="ldr_benchmark_")
        output_dir = temp_dir

    try:
        # Create search configuration from system config
        search_config = {
            "iterations": system_config.get("iterations", 2),
            "questions_per_iteration": system_config.get(
                "questions_per_iteration", 2
            ),
            "search_strategy": system_config.get("search_strategy", "iterdrag"),
            "search_tool": system_config.get("search_tool", "searxng"),
            "model_name": system_config.get("model_name"),
            "provider": system_config.get("provider"),
        }

        # Run benchmark
        logger.info(f"Running SimpleQA benchmark with {num_examples} examples")
        benchmark_results = run_simpleqa_benchmark(
            num_examples=num_examples,
            output_dir=output_dir,
            search_config=search_config,
            run_evaluation=True,
        )

        # Extract key metrics
        metrics = benchmark_results.get("metrics", {})
        accuracy = metrics.get("accuracy", 0.0)

        # Return only the most relevant metrics
        return {
            "accuracy": accuracy,
            "quality_score": accuracy,  # Map accuracy directly to quality score
        }

    except Exception as e:
        logger.exception(f"Error in benchmark evaluation: {e!s}")
        return {"accuracy": 0.0, "quality_score": 0.0, "error": str(e)}

    finally:
        # Clean up temporary directory if we created it
        if temp_dir and Path(temp_dir).exists():
            import shutil

            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                logger.warning(f"Failed to clean up temporary directory: {e!s}")

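# Illustrative system_config for evaluate_benchmark_quality() (hypothetical values;
# only the keys read via .get() above are used):
#
#   {"iterations": 2, "questions_per_iteration": 2,
#    "search_strategy": "iterdrag", "search_tool": "searxng",
#    "model_name": None, "provider": None}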

def measure_execution_time(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, float]:
    """
    Measure execution time for a given configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with speed metrics
    """
    from local_deep_research.search_system import SearchSystem

    if search_tool:
        system_config["search_tool"] = search_tool

    # Configure system
    system = SearchSystem(
        iterations=system_config.get("iterations", 2),
        questions_per_iteration=system_config.get("questions_per_iteration", 2),
        search_strategy=system_config.get("search_strategy", "iterdrag"),
        search_tool=system_config.get("search_tool", "searxng"),
        model_name=system_config.get("model_name"),
        provider=system_config.get("provider"),
    )

    # Run multiple times and calculate average
    total_time = 0
    times = []

    try:
        for i in range(num_runs):
            logger.info(f"Executing speed test run {i + 1}/{num_runs}")
            start_time = time.time()
            system.search(query, full_response=False)
            end_time = time.time()
            run_time = end_time - start_time
            times.append(run_time)
            total_time += run_time

        # Calculate metrics
        average_time = total_time / num_runs

        # Calculate speed score (0-1 scale, shorter times score higher)
        # Using reciprocal normalization 1 / (1 + t / 30) where:
        # - Times around 30s get ~0.5 score
        # - Times around 10s get ~0.75 score
        # - Times over 2 minutes get <0.2 score
        speed_score = 1.0 / (1.0 + (average_time / 30.0))
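        # Worked values for this normalization (same 30 s scale as above):
        #   t = 10 s  -> 1.0 / (1.0 + 10 / 30)  = 0.75
        #   t = 30 s  -> 1.0 / (1.0 + 30 / 30)  = 0.50
        #   t = 120 s -> 1.0 / (1.0 + 120 / 30) = 0.20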

        return {
            "average_time": average_time,
            "min_time": min(times),
            "max_time": max(times),
            "speed_score": speed_score,
        }

    except Exception as e:
        logger.exception(f"Error in speed measurement: {e!s}")
        return {"average_time": 0.0, "speed_score": 0.0, "error": str(e)}


def calculate_quality_metrics(
    system_config: Dict[str, Any],
    num_examples: int = 2,  # Reduced for quicker demo
    output_dir: Optional[str] = None,
) -> Dict[str, float]:
    """
    Calculate quality-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with quality metrics
    """
    # Run quality evaluation
    quality_results = evaluate_benchmark_quality(
        system_config=system_config,
        num_examples=num_examples,
        output_dir=output_dir,
    )

    # Return normalized quality score
    return {
        "quality_score": quality_results.get("quality_score", 0.0),
        "accuracy": quality_results.get("accuracy", 0.0),
    }


def calculate_speed_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, float]:
    """
    Calculate speed-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with speed metrics
    """
    # Run speed measurement
    speed_results = measure_execution_time(
        system_config=system_config,
        query=query,
        search_tool=search_tool,
        num_runs=num_runs,
    )

    # Return normalized speed score
    return {
        "speed_score": speed_results.get("speed_score", 0.0),
        "average_time": speed_results.get("average_time", 0.0),
    }


def calculate_resource_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
) -> Dict[str, float]:
    """
    Calculate resource usage metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for resource tests
        search_tool: Override search tool

    Returns:
        Dictionary with resource metrics
    """
    # This is a simplified version - in a real implementation,
    # you would measure memory usage, API call counts, etc.

    # For now, we use a heuristic based on configuration values
    iterations = system_config.get("iterations", 2)
    questions = system_config.get("questions_per_iteration", 2)
    max_results = system_config.get("max_results", 50)

    # Simple heuristic: more iterations, questions, and results = more resources
    complexity = iterations * questions * (max_results / 50)

    # Normalize to a 0-1 score (lower complexity yields a higher score)
    resource_score = 1.0 / (1.0 + (complexity / 4.0))
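    # Worked example (hypothetical config): iterations=3, questions=3, max_results=100
    #   complexity     = 3 * 3 * (100 / 50)     = 18.0
    #   resource_score = 1.0 / (1.0 + 18 / 4.0) = 0.18 (rounded)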

    return {
        "resource_score": resource_score,
        "estimated_complexity": complexity,
    }


def calculate_combined_score(
    metrics: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    Calculate a combined optimization score from multiple metrics.

    Args:
        metrics: Dictionary of metric categories and their values
        weights: Dictionary of weights for each metric category

    Returns:
        Combined score between 0 and 1
    """
    # Default weights if not provided
    if weights is None:
        weights = {"quality": 0.6, "speed": 0.3, "resource": 0.1}

    # Normalize weights to sum to 1
    total_weight = sum(weights.values())
    if total_weight == 0:
        return 0.0

    norm_weights = {k: v / total_weight for k, v in weights.items()}

    # Calculate weighted score
    score = 0.0

    # Quality component (coverage: this condition was always true in the tests)
    if "quality" in metrics and "quality" in norm_weights:
        quality_score = metrics["quality"].get("quality_score", 0.0)
        score += quality_score * norm_weights["quality"]

    # Speed component
    if "speed" in metrics and "speed" in norm_weights:
        speed_score = metrics["speed"].get("speed_score", 0.0)
        score += speed_score * norm_weights["speed"]

    # Resource component
    if "resource" in metrics and "resource" in norm_weights:
        resource_score = metrics["resource"].get("resource_score", 0.0)
        score += resource_score * norm_weights["resource"]

    return score
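

# Minimal usage sketch: hypothetical metric values fed to calculate_combined_score()
# with the default weights (quality 0.6, speed 0.3, resource 0.1), giving
# 0.6 * 0.8 + 0.3 * 0.5 + 0.1 * 0.9 = 0.72.
if __name__ == "__main__":
    example_metrics = {
        "quality": {"quality_score": 0.8, "accuracy": 0.8},
        "speed": {"speed_score": 0.5, "average_time": 30.0},
        "resource": {"resource_score": 0.9},
    }
    print(f"Combined score: {calculate_combined_score(example_metrics):.2f}")  # 0.72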