Coverage for src/local_deep_research/benchmarks/metrics/calculation.py: 99%

140 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Unified metrics calculation module. 

3 

4This module provides functions for calculating metrics for both 

5standard benchmarks and optimization tasks. 

6""" 

7 

8import json 

9from loguru import logger 

10from pathlib import Path 

11import tempfile 

12import time 

13from datetime import datetime, UTC 

14from typing import Any, Dict, Optional 

15 

16 

17def calculate_metrics(results_file: str) -> Dict[str, Any]: 

18 """ 

19 Calculate evaluation metrics from results. 

20 

21 Args: 

22 results_file: Path to results file 

23 

24 Returns: 

25 Dictionary of metrics 

26 """ 

27 # Load results 

28 results = [] 

29 try: 

30 with open(results_file, "r", encoding="utf-8") as f: 

31 for line in f: 

32 if line.strip(): 

33 results.append(json.loads(line)) 

34 except Exception as e: 

35 logger.exception("Error loading results file") 

36 return {"error": str(e)} 

37 

38 if not results: 

39 return {"error": "No results found"} 

40 

41 from .statistics import wilson_score_interval 

42 

43 # Calculate accuracy 

44 graded_results = [r for r in results if "is_correct" in r] 

45 correct_count = sum(1 for r in graded_results if r.get("is_correct", False)) 

46 total_graded = len(graded_results) 

47 accuracy = correct_count / total_graded if total_graded else 0 

48 

49 # Wilson score confidence interval for accuracy 

50 accuracy_ci = wilson_score_interval(correct_count, total_graded) 

51 

52 # Calculate average processing time if available 

53 processing_times = [ 

54 r.get("processing_time", 0) for r in results if "processing_time" in r 

55 ] 

56 avg_time = ( 

57 sum(processing_times) / len(processing_times) if processing_times else 0 

58 ) 

59 

60 # Average confidence if available 

61 confidence_values = [] 

62 for r in results: 

63 if r.get("confidence"): 

64 try: 

65 confidence_values.append(int(r["confidence"])) 

66 except (ValueError, TypeError): 

67 pass 

68 

69 avg_confidence = ( 

70 sum(confidence_values) / len(confidence_values) 

71 if confidence_values 

72 else 0 

73 ) 

74 

75 # Calculate error rate 

76 error_count = sum(1 for r in results if "error" in r) 

77 error_rate = error_count / len(results) if results else 0 

78 

79 # Basic metrics 

80 metrics = { 

81 "total_examples": len(results), 

82 "graded_examples": total_graded, 

83 "correct": correct_count, 

84 "accuracy": accuracy, 

85 "accuracy_ci": accuracy_ci, 

86 "average_processing_time": avg_time, 

87 "average_confidence": avg_confidence, 

88 "error_count": error_count, 

89 "error_rate": error_rate, 

90 "timestamp": datetime.now(UTC).isoformat(), 

91 } 

92 

93 # If we have category information, calculate per-category metrics 

94 categories = {} 

95 for r in graded_results: 

96 if "category" in r: 

97 category = r["category"] 

98 if category not in categories: 

99 categories[category] = {"total": 0, "correct": 0} 

100 categories[category]["total"] += 1 

101 if r.get("is_correct", False): 

102 categories[category]["correct"] += 1 

103 

104 if categories: 

105 category_metrics = {} 

106 for category, counts in categories.items(): 

107 cat_accuracy = ( 

108 counts["correct"] / counts["total"] if counts["total"] else 0 

109 ) 

110 category_metrics[category] = { 

111 "total": counts["total"], 

112 "correct": counts["correct"], 

113 "accuracy": cat_accuracy, 

114 "accuracy_ci": wilson_score_interval( 

115 counts["correct"], counts["total"] 

116 ), 

117 } 

118 metrics["categories"] = category_metrics 

119 

120 return metrics 

121 

122 

def evaluate_benchmark_quality(
    system_config: Dict[str, Any],
    num_examples: int = 10,
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Score a configuration by running the SimpleQA benchmark against it.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results; when None a temporary
            directory is created for the run and removed afterwards

    Returns:
        Dictionary with ``accuracy`` and ``quality_score`` (the same value).
        If the benchmark raises, both are 0.0 and an ``error`` message is
        included.
    """
    from ..runners import run_simpleqa_benchmark

    # Only a directory created here is cleaned up at the end
    scratch_dir = None
    if output_dir is None:
        scratch_dir = output_dir = tempfile.mkdtemp(prefix="ldr_benchmark_")

    try:
        # Pull the search settings out of the system config, with fallbacks
        setting_defaults = (
            ("iterations", 2),
            ("questions_per_iteration", 2),
            ("search_strategy", "iterdrag"),
            ("search_tool", "searxng"),
            ("model_name", None),
            ("provider", None),
        )
        search_config = {
            key: system_config.get(key, fallback)
            for key, fallback in setting_defaults
        }

        logger.info(f"Running SimpleQA benchmark with {num_examples} examples")
        outcome = run_simpleqa_benchmark(
            num_examples=num_examples,
            output_dir=output_dir,
            search_config=search_config,
            run_evaluation=True,
        )

        # Accuracy doubles as the quality score
        accuracy = outcome.get("metrics", {}).get("accuracy", 0.0)
        return {"accuracy": accuracy, "quality_score": accuracy}

    except Exception as exc:
        logger.exception("Error in benchmark evaluation")
        return {"accuracy": 0.0, "quality_score": 0.0, "error": str(exc)}

    finally:
        # Best-effort removal of the directory this call created
        if scratch_dir and Path(scratch_dir).exists():
            import shutil

            try:
                shutil.rmtree(scratch_dir)
            except Exception:
                logger.warning("Failed to clean up temporary directory")

192 

193 

def measure_execution_time(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Measure execution time for a given configuration.

    Builds a search system from ``system_config``, runs ``query`` through it
    ``num_runs`` times and reports wall-clock timings.

    Args:
        system_config: Configuration parameters to evaluate. It is only
            read; a ``search_tool`` override is applied locally and never
            written back into this dictionary.
        query: Query to use for timing tests
        search_tool: Override search tool (takes precedence over
            ``system_config["search_tool"]`` when truthy)
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with speed metrics: ``average_time``, ``min_time``,
        ``max_time`` and ``speed_score`` on success. If anything inside the
        measurement raises (including the averaging step when ``num_runs``
        is below 1), the result is ``average_time`` 0.0, ``speed_score`` 0.0
        and an ``error`` message, without ``min_time``/``max_time``.
    """
    from local_deep_research.search_system import AdvancedSearchSystem
    from local_deep_research.config.llm_config import get_llm
    from local_deep_research.config.search_config import get_search

    # Configure system — pre-initialize so finally can clean up partial init
    llm = None
    search_engine = None
    system = None

    try:
        # Resolve the tool locally instead of mutating the caller's dict, so a
        # one-off override cannot leak into later calls that reuse the config.
        tool_name = search_tool or system_config.get("search_tool", "searxng")

        llm = get_llm()
        search_engine = get_search(tool_name, llm_instance=llm)
        system = AdvancedSearchSystem(
            llm=llm,
            search=search_engine,
            max_iterations=system_config.get("iterations", 2),
            questions_per_iteration=system_config.get(
                "questions_per_iteration", 2
            ),
            strategy_name=system_config.get("search_strategy", "iterdrag"),
        )

        # Run multiple times and calculate average
        total_time = 0.0
        times: list[float] = []

        for i in range(num_runs):
            logger.info(f"Executing speed test run {i + 1}/{num_runs}")
            start_time = time.time()
            system.search(query, full_response=False)
            run_time = time.time() - start_time
            times.append(run_time)
            total_time += run_time

        average_time = total_time / num_runs

        # Speed score on a 0-1 scale (lower times are better), using the
        # hyperbolic normalization 1 / (1 + t / 30):
        # - 0s  -> 1.0
        # - 10s -> 0.75
        # - 30s -> 0.5
        # - 2min -> 0.2 (anything slower scores below 0.2)
        speed_score = 1.0 / (1.0 + (average_time / 30.0))

        return {
            "average_time": average_time,
            "min_time": min(times),
            "max_time": max(times),
            "speed_score": speed_score,
        }

    except Exception as e:
        logger.exception("Error in speed measurement")
        return {"average_time": 0.0, "speed_score": 0.0, "error": str(e)}
    finally:
        from local_deep_research.utilities.resource_utils import safe_close

        # Release in reverse order of construction; any of these may still be
        # None if setup failed part-way, hence allow_none=True.
        safe_close(system, "benchmark search system", allow_none=True)
        safe_close(search_engine, "benchmark search engine", allow_none=True)
        safe_close(llm, "benchmark LLM", allow_none=True)

279 

280 

def calculate_quality_metrics(
    system_config: Dict[str, Any],
    num_examples: int = 2,  # Reduced for quicker demo
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Produce the quality portion of a configuration's metrics.

    Delegates to ``evaluate_benchmark_quality`` and keeps only the two
    score fields, so any ``error`` entry in its result is dropped.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with ``quality_score`` and ``accuracy``, each defaulting
        to 0.0 when absent from the evaluation result
    """
    evaluation = evaluate_benchmark_quality(
        system_config=system_config,
        num_examples=num_examples,
        output_dir=output_dir,
    )

    reported_fields = ("quality_score", "accuracy")
    return {field: evaluation.get(field, 0.0) for field in reported_fields}

309 

310 

def calculate_speed_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Produce the speed portion of a configuration's metrics.

    Delegates to ``measure_execution_time`` and keeps only the score and
    the average, so min/max timings and any ``error`` entry are dropped.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with ``speed_score`` and ``average_time``, each
        defaulting to 0.0 when absent from the measurement result
    """
    timing = measure_execution_time(
        system_config=system_config,
        query=query,
        search_tool=search_tool,
        num_runs=num_runs,
    )

    reported_fields = ("speed_score", "average_time")
    return {field: timing.get(field, 0.0) for field in reported_fields}

342 

343 

def calculate_resource_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Estimate resource usage for a configuration from its settings alone.

    Nothing is executed or measured here: the estimate is a heuristic
    derived from the configured iteration count, questions per iteration
    and result limit. ``query`` and ``search_tool`` are accepted but not
    used by the current implementation.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for resource tests (currently unused)
        search_tool: Override search tool (currently unused)

    Returns:
        Dictionary with ``resource_score`` (higher means cheaper; 0.5 for
        the default settings) and ``estimated_complexity``
    """
    n_iterations = system_config.get("iterations", 2)
    n_questions = system_config.get("questions_per_iteration", 2)
    result_limit = system_config.get("max_results", 50)

    # Searches performed, scaled by the result limit relative to a baseline of 50
    search_count = n_iterations * n_questions
    estimated = search_count * (result_limit / 50)

    # Map onto a 0-1 score: a complexity of 4 (the defaults) lands on 0.5
    return {
        "resource_score": 1.0 / (1.0 + (estimated / 4.0)),
        "estimated_complexity": estimated,
    }

378 

379 

def calculate_combined_score(
    metrics: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    Blend quality, speed and resource scores into one weighted score.

    Weights are rescaled so that they sum to 1. A category contributes only
    when it appears in both ``metrics`` and ``weights``; its weight still
    counts towards the normalisation even when its metrics are missing.

    Args:
        metrics: Dictionary of metric categories and their values
        weights: Dictionary of weights for each metric category
            (defaults to quality 0.6, speed 0.3, resource 0.1)

    Returns:
        The weighted sum of the available category scores, or 0.0 when the
        weights sum to zero
    """
    if weights is None:
        weights = {"quality": 0.6, "speed": 0.3, "resource": 0.1}

    weight_sum = sum(weights.values())
    if weight_sum == 0:
        # Nothing to normalise against
        return 0.0

    normalized = {name: value / weight_sum for name, value in weights.items()}

    # (category, key holding that category's score), in accumulation order
    components = (
        ("quality", "quality_score"),
        ("speed", "speed_score"),
        ("resource", "resource_score"),
    )

    combined = 0.0
    for category, score_key in components:
        if category not in metrics or category not in normalized:
            continue
        combined += metrics[category].get(score_key, 0.0) * normalized[category]

    return combined