Coverage for src / local_deep_research / benchmarks / metrics / calculation.py: 99%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Unified metrics calculation module. 

3 

4This module provides functions for calculating metrics for both 

5standard benchmarks and optimization tasks. 

6""" 

7 

8import json 

9from loguru import logger 

10from pathlib import Path 

11import tempfile 

12import time 

13from datetime import datetime, UTC 

14from typing import Any, Dict, Optional 

15 

16 

17def calculate_metrics(results_file: str) -> Dict[str, Any]: 

18 """ 

19 Calculate evaluation metrics from results. 

20 

21 Args: 

22 results_file: Path to results file 

23 

24 Returns: 

25 Dictionary of metrics 

26 """ 

27 # Load results 

28 results = [] 

29 try: 

30 with open(results_file, "r") as f: 

31 for line in f: 

32 if line.strip(): 

33 results.append(json.loads(line)) 

34 except Exception as e: 

35 logger.exception("Error loading results file") 

36 return {"error": str(e)} 

37 

38 if not results: 

39 return {"error": "No results found"} 

40 

41 # Calculate accuracy 

42 graded_results = [r for r in results if "is_correct" in r] 

43 correct_count = sum(1 for r in graded_results if r.get("is_correct", False)) 

44 total_graded = len(graded_results) 

45 accuracy = correct_count / total_graded if total_graded else 0 

46 

47 # Calculate average processing time if available 

48 processing_times = [ 

49 r.get("processing_time", 0) for r in results if "processing_time" in r 

50 ] 

51 avg_time = ( 

52 sum(processing_times) / len(processing_times) if processing_times else 0 

53 ) 

54 

55 # Average confidence if available 

56 confidence_values = [] 

57 for r in results: 

58 if r.get("confidence"): 

59 try: 

60 confidence_values.append(int(r["confidence"])) 

61 except (ValueError, TypeError): 

62 pass 

63 

64 avg_confidence = ( 

65 sum(confidence_values) / len(confidence_values) 

66 if confidence_values 

67 else 0 

68 ) 

69 

70 # Calculate error rate 

71 error_count = sum(1 for r in results if "error" in r) 

72 error_rate = error_count / len(results) if results else 0 

73 

74 # Basic metrics 

75 metrics = { 

76 "total_examples": len(results), 

77 "graded_examples": total_graded, 

78 "correct": correct_count, 

79 "accuracy": accuracy, 

80 "average_processing_time": avg_time, 

81 "average_confidence": avg_confidence, 

82 "error_count": error_count, 

83 "error_rate": error_rate, 

84 "timestamp": datetime.now(UTC).isoformat(), 

85 } 

86 

87 # If we have category information, calculate per-category metrics 

88 categories = {} 

89 for r in graded_results: 

90 if "category" in r: 

91 category = r["category"] 

92 if category not in categories: 

93 categories[category] = {"total": 0, "correct": 0} 

94 categories[category]["total"] += 1 

95 if r.get("is_correct", False): 

96 categories[category]["correct"] += 1 

97 

98 if categories: 

99 category_metrics = {} 

100 for category, counts in categories.items(): 

101 category_metrics[category] = { 

102 "total": counts["total"], 

103 "correct": counts["correct"], 

104 "accuracy": ( 

105 counts["correct"] / counts["total"] 

106 if counts["total"] 

107 else 0 

108 ), 

109 } 

110 metrics["categories"] = category_metrics 

111 

112 return metrics 

113 

114 

def evaluate_benchmark_quality(
    system_config: Dict[str, Any],
    num_examples: int = 10,
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Evaluate quality using the SimpleQA benchmark.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (a temporary one is created
            and removed afterwards when None)

    Returns:
        Dictionary with "accuracy" and "quality_score" (plus "error" on
        failure, with both scores zeroed).
    """
    from ..runners import run_simpleqa_benchmark

    # When no output directory is supplied, work in a throwaway temp dir
    # that this function is responsible for removing afterwards.
    owned_tmp_dir = None
    if output_dir is None:
        owned_tmp_dir = tempfile.mkdtemp(prefix="ldr_benchmark_")
        output_dir = owned_tmp_dir

    try:
        # Translate the system config into the runner's search configuration.
        search_config = {
            key: system_config.get(key, default)
            for key, default in (
                ("iterations", 2),
                ("questions_per_iteration", 2),
                ("search_strategy", "iterdrag"),
                ("search_tool", "searxng"),
                ("model_name", None),
                ("provider", None),
            )
        }

        logger.info(f"Running SimpleQA benchmark with {num_examples} examples")
        benchmark_results = run_simpleqa_benchmark(
            num_examples=num_examples,
            output_dir=output_dir,
            search_config=search_config,
            run_evaluation=True,
        )

        accuracy = benchmark_results.get("metrics", {}).get("accuracy", 0.0)

        # Quality score is defined as the benchmark accuracy itself.
        return {
            "accuracy": accuracy,
            "quality_score": accuracy,  # Map accuracy directly to quality score
        }

    except Exception as e:
        logger.exception("Error in benchmark evaluation")
        return {"accuracy": 0.0, "quality_score": 0.0, "error": str(e)}

    finally:
        # Remove the temp dir only if this function created it.
        if owned_tmp_dir and Path(owned_tmp_dir).exists():
            import shutil

            try:
                shutil.rmtree(owned_tmp_dir)
            except Exception:
                logger.warning("Failed to clean up temporary directory")

184 

185 

def measure_execution_time(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Measure execution time for a given configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool (written back into system_config)
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with average/min/max times and a normalized
        "speed_score", or zeroed metrics plus "error" on failure.
    """
    from local_deep_research.search_system import AdvancedSearchSystem
    from local_deep_research.config.llm_config import get_llm
    from local_deep_research.config.search_config import get_search

    if search_tool:
        system_config["search_tool"] = search_tool

    # Configure system — pre-initialize so finally can clean up partial init
    llm = None
    search_engine = None
    system = None

    try:
        llm = get_llm()
        search_engine = get_search(
            system_config.get("search_tool", "searxng"),
            llm_instance=llm,
        )
        system = AdvancedSearchSystem(
            llm=llm,
            search=search_engine,
            max_iterations=system_config.get("iterations", 2),
            questions_per_iteration=system_config.get(
                "questions_per_iteration", 2
            ),
            strategy_name=system_config.get("search_strategy", "iterdrag"),
        )

        # Time each run with a monotonic clock; time.time() is wall-clock
        # time and can jump (e.g. NTP adjustments), which corrupts
        # interval measurements.
        times: list[float] = []
        for i in range(num_runs):
            logger.info(f"Executing speed test run {i + 1}/{num_runs}")
            start_time = time.monotonic()
            system.search(query, full_response=False)
            times.append(time.monotonic() - start_time)

        # Average derived directly from the collected samples (no separate
        # running accumulator that could drift out of sync with `times`).
        average_time = sum(times) / num_runs

        # Speed score on a 0-1 scale where lower times are better:
        # ~0.5 around 30s, higher under ~10s, <0.2 over ~2min.
        speed_score = 1.0 / (1.0 + (average_time / 30.0))

        return {
            "average_time": average_time,
            "min_time": min(times),
            "max_time": max(times),
            "speed_score": speed_score,
        }

    except Exception as e:
        logger.exception("Error in speed measurement")
        return {"average_time": 0.0, "speed_score": 0.0, "error": str(e)}
    finally:
        from local_deep_research.utilities.resource_utils import safe_close

        safe_close(system, "benchmark search system", allow_none=True)
        safe_close(search_engine, "benchmark search engine", allow_none=True)
        safe_close(llm, "benchmark LLM", allow_none=True)

271 

272 

def calculate_quality_metrics(
    system_config: Dict[str, Any],
    num_examples: int = 2,  # Reduced for quicker demo
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Calculate quality-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with "quality_score" and "accuracy" (0.0 defaults).
    """
    # Delegate the actual benchmark run to the quality evaluator.
    evaluation = evaluate_benchmark_quality(
        system_config=system_config,
        num_examples=num_examples,
        output_dir=output_dir,
    )

    # Project only the normalized scores out of the full evaluation result.
    return {
        key: evaluation.get(key, 0.0)
        for key in ("quality_score", "accuracy")
    }

301 

302 

def calculate_speed_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Calculate speed-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with "speed_score" and "average_time" (0.0 defaults).
    """
    # Delegate the actual timing work to the measurement helper.
    timing = measure_execution_time(
        system_config=system_config,
        query=query,
        search_tool=search_tool,
        num_runs=num_runs,
    )

    # Project only the normalized scores out of the full timing result.
    return {
        key: timing.get(key, 0.0)
        for key in ("speed_score", "average_time")
    }

334 

335 

def calculate_resource_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Calculate resource usage metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for resource tests
        search_tool: Override search tool

    Returns:
        Dictionary with "resource_score" (0-1, higher is cheaper) and
        "estimated_complexity".
    """
    # NOTE: simplified heuristic — a real implementation would measure
    # memory usage, API call counts, etc. `query` and `search_tool` are
    # accepted for interface parity but unused by the heuristic.
    iterations = system_config.get("iterations", 2)
    questions = system_config.get("questions_per_iteration", 2)
    max_results = system_config.get("max_results", 50)

    # More iterations, questions, and results => more estimated work.
    estimated = iterations * questions * (max_results / 50)

    # Map complexity onto a 0-1 scale where lower complexity scores higher.
    return {
        "resource_score": 1.0 / (1.0 + (estimated / 4.0)),
        "estimated_complexity": estimated,
    }

370 

371 

def calculate_combined_score(
    metrics: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    Calculate a combined optimization score from multiple metrics.

    Args:
        metrics: Dictionary of metric categories and their values
        weights: Dictionary of weights for each metric category
            (defaults to quality 0.6 / speed 0.3 / resource 0.1)

    Returns:
        Combined score between 0 and 1
    """
    if weights is None:
        weights = {"quality": 0.6, "speed": 0.3, "resource": 0.1}

    # Weights are normalized to sum to 1; an all-zero weighting yields 0.
    total_weight = sum(weights.values())
    if total_weight == 0:
        return 0.0

    # Which score key each metric category contributes.
    score_keys = {
        "quality": "quality_score",
        "speed": "speed_score",
        "resource": "resource_score",
    }

    # Weighted sum over every category present in both metrics and weights.
    return sum(
        (
            metrics[category].get(key, 0.0)
            * (weights[category] / total_weight)
            for category, key in score_keys.items()
            if category in metrics and category in weights
        ),
        0.0,
    )