Coverage for src/local_deep_research/benchmarks/metrics/calculation.py: 99%
140 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Unified metrics calculation module.
4This module provides functions for calculating metrics for both
5standard benchmarks and optimization tasks.
6"""
8import json
9from loguru import logger
10from pathlib import Path
11import tempfile
12import time
13from datetime import datetime, UTC
14from typing import Any, Dict, Optional
17def calculate_metrics(results_file: str) -> Dict[str, Any]:
18 """
19 Calculate evaluation metrics from results.
21 Args:
22 results_file: Path to results file
24 Returns:
25 Dictionary of metrics
26 """
27 # Load results
28 results = []
29 try:
30 with open(results_file, "r", encoding="utf-8") as f:
31 for line in f:
32 if line.strip():
33 results.append(json.loads(line))
34 except Exception as e:
35 logger.exception("Error loading results file")
36 return {"error": str(e)}
38 if not results:
39 return {"error": "No results found"}
41 from .statistics import wilson_score_interval
43 # Calculate accuracy
44 graded_results = [r for r in results if "is_correct" in r]
45 correct_count = sum(1 for r in graded_results if r.get("is_correct", False))
46 total_graded = len(graded_results)
47 accuracy = correct_count / total_graded if total_graded else 0
49 # Wilson score confidence interval for accuracy
50 accuracy_ci = wilson_score_interval(correct_count, total_graded)
52 # Calculate average processing time if available
53 processing_times = [
54 r.get("processing_time", 0) for r in results if "processing_time" in r
55 ]
56 avg_time = (
57 sum(processing_times) / len(processing_times) if processing_times else 0
58 )
60 # Average confidence if available
61 confidence_values = []
62 for r in results:
63 if r.get("confidence"):
64 try:
65 confidence_values.append(int(r["confidence"]))
66 except (ValueError, TypeError):
67 pass
69 avg_confidence = (
70 sum(confidence_values) / len(confidence_values)
71 if confidence_values
72 else 0
73 )
75 # Calculate error rate
76 error_count = sum(1 for r in results if "error" in r)
77 error_rate = error_count / len(results) if results else 0
79 # Basic metrics
80 metrics = {
81 "total_examples": len(results),
82 "graded_examples": total_graded,
83 "correct": correct_count,
84 "accuracy": accuracy,
85 "accuracy_ci": accuracy_ci,
86 "average_processing_time": avg_time,
87 "average_confidence": avg_confidence,
88 "error_count": error_count,
89 "error_rate": error_rate,
90 "timestamp": datetime.now(UTC).isoformat(),
91 }
93 # If we have category information, calculate per-category metrics
94 categories = {}
95 for r in graded_results:
96 if "category" in r:
97 category = r["category"]
98 if category not in categories:
99 categories[category] = {"total": 0, "correct": 0}
100 categories[category]["total"] += 1
101 if r.get("is_correct", False):
102 categories[category]["correct"] += 1
104 if categories:
105 category_metrics = {}
106 for category, counts in categories.items():
107 cat_accuracy = (
108 counts["correct"] / counts["total"] if counts["total"] else 0
109 )
110 category_metrics[category] = {
111 "total": counts["total"],
112 "correct": counts["correct"],
113 "accuracy": cat_accuracy,
114 "accuracy_ci": wilson_score_interval(
115 counts["correct"], counts["total"]
116 ),
117 }
118 metrics["categories"] = category_metrics
120 return metrics
def evaluate_benchmark_quality(
    system_config: Dict[str, Any],
    num_examples: int = 10,
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Run the SimpleQA benchmark and report its accuracy as a quality score.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with "accuracy" and "quality_score" (both the benchmark
        accuracy); on failure both are 0.0 and an "error" message is added
    """
    from ..runners import run_simpleqa_benchmark

    # Fall back to a throwaway directory that is removed again on exit
    created_dir = (
        tempfile.mkdtemp(prefix="ldr_benchmark_") if output_dir is None else None
    )
    if created_dir is not None:
        output_dir = created_dir

    try:
        # Translate the system configuration into a search configuration,
        # applying a default for every key the caller left out
        fallbacks = (
            ("iterations", 2),
            ("questions_per_iteration", 2),
            ("search_strategy", "iterdrag"),
            ("search_tool", "searxng"),
            ("model_name", None),
            ("provider", None),
        )
        search_config = {
            key: system_config.get(key, fallback) for key, fallback in fallbacks
        }

        logger.info(f"Running SimpleQA benchmark with {num_examples} examples")
        outcome = run_simpleqa_benchmark(
            num_examples=num_examples,
            output_dir=output_dir,
            search_config=search_config,
            run_evaluation=True,
        )

        # Only the accuracy is surfaced; it doubles as the quality score
        score = outcome.get("metrics", {}).get("accuracy", 0.0)
        return {"accuracy": score, "quality_score": score}

    except Exception as exc:
        logger.exception("Error in benchmark evaluation")
        return {"accuracy": 0.0, "quality_score": 0.0, "error": str(exc)}

    finally:
        # Remove the temporary directory only if this call created it
        if created_dir is not None and Path(created_dir).exists():
            import shutil

            try:
                shutil.rmtree(created_dir)
            except Exception:
                logger.warning("Failed to clean up temporary directory")
def measure_execution_time(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Measure execution time for a given configuration.

    Builds an LLM, a search engine and an AdvancedSearchSystem from the
    configuration, runs ``query`` ``num_runs`` times and derives a speed
    score from the mean run time. ``system_config`` is only read, never
    modified.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool (takes precedence over the
            "search_tool" entry of system_config)
        num_runs: Number of runs to average time over (must be >= 1)

    Returns:
        Dictionary with "average_time", "min_time", "max_time" and
        "speed_score" on success. On failure (including num_runs < 1) a
        dictionary with "average_time" and "speed_score" set to 0.0 and an
        "error" message.
    """
    # Reject an unusable run count before any resources are created;
    # otherwise the averaging below would fail with a division by zero
    # only after the LLM and search engine had already been built.
    if num_runs < 1:
        logger.error(f"Invalid num_runs for speed measurement: {num_runs}")
        return {
            "average_time": 0.0,
            "speed_score": 0.0,
            "error": "num_runs must be at least 1",
        }

    from local_deep_research.search_system import AdvancedSearchSystem
    from local_deep_research.config.llm_config import get_llm
    from local_deep_research.config.search_config import get_search
    from local_deep_research.utilities.resource_utils import safe_close

    # Pre-initialize so finally can clean up after a partial init
    llm = None
    search_engine = None
    system = None

    try:
        # Resolve the search tool without writing the override back into
        # the caller's dictionary.
        tool_name = (
            search_tool
            if search_tool
            else system_config.get("search_tool", "searxng")
        )

        llm = get_llm()
        search_engine = get_search(tool_name, llm_instance=llm)
        system = AdvancedSearchSystem(
            llm=llm,
            search=search_engine,
            max_iterations=system_config.get("iterations", 2),
            questions_per_iteration=system_config.get(
                "questions_per_iteration", 2
            ),
            strategy_name=system_config.get("search_strategy", "iterdrag"),
        )

        # Time each run with perf_counter: unlike time.time() it is not
        # affected by system clock adjustments during the measurement.
        times: list[float] = []
        for i in range(num_runs):
            logger.info(f"Executing speed test run {i + 1}/{num_runs}")
            start_time = time.perf_counter()
            system.search(query, full_response=False)
            times.append(time.perf_counter() - start_time)

        average_time = sum(times) / len(times)

        # Speed score on a 0-1 scale (lower times are better), using the
        # reciprocal decay 1 / (1 + t / 30):
        # - 30s gives exactly 0.5
        # - 10s gives 0.75 (scores above 0.8 need less than 7.5s)
        # - 2min gives 0.2, anything slower scores below that
        speed_score = 1.0 / (1.0 + (average_time / 30.0))

        return {
            "average_time": average_time,
            "min_time": min(times),
            "max_time": max(times),
            "speed_score": speed_score,
        }

    except Exception as e:
        logger.exception("Error in speed measurement")
        return {"average_time": 0.0, "speed_score": 0.0, "error": str(e)}
    finally:
        # Release in reverse order of creation; entries may still be None
        # when initialization failed part-way through.
        safe_close(system, "benchmark search system", allow_none=True)
        safe_close(search_engine, "benchmark search engine", allow_none=True)
        safe_close(llm, "benchmark LLM", allow_none=True)
def calculate_quality_metrics(
    system_config: Dict[str, Any],
    num_examples: int = 2,  # Reduced for quicker demo
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Produce the quality metrics of a configuration.

    Delegates to evaluate_benchmark_quality and keeps only the two score
    fields of its result; any other key (such as "error") is dropped.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with "quality_score" and "accuracy", each defaulting
        to 0.0 when missing from the evaluation result
    """
    evaluation = evaluate_benchmark_quality(
        system_config=system_config,
        num_examples=num_examples,
        output_dir=output_dir,
    )

    # Project the evaluation result onto the reported fields
    return {
        field: evaluation.get(field, 0.0)
        for field in ("quality_score", "accuracy")
    }
def calculate_speed_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Produce the speed metrics of a configuration.

    Delegates to measure_execution_time and keeps only the score and the
    mean run time of its result; any other key (such as "min_time",
    "max_time" or "error") is dropped.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with "speed_score" and "average_time", each defaulting
        to 0.0 when missing from the measurement result
    """
    measurement = measure_execution_time(
        system_config=system_config,
        query=query,
        search_tool=search_tool,
        num_runs=num_runs,
    )

    # Project the measurement result onto the reported fields
    return {
        field: measurement.get(field, 0.0)
        for field in ("speed_score", "average_time")
    }
def calculate_resource_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Estimate resource usage for a configuration.

    Nothing is measured here: the estimate is a heuristic computed purely
    from configuration values. ``query`` and ``search_tool`` are accepted
    but not used by the calculation.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for resource tests
        search_tool: Override search tool

    Returns:
        Dictionary with "resource_score" (0-1, higher means cheaper) and
        "estimated_complexity" (the raw heuristic value)
    """
    # More iterations, questions and results all mean more resources;
    # max_results is expressed relative to its default of 50.
    workload = (
        system_config.get("iterations", 2)
        * system_config.get("questions_per_iteration", 2)
        * (system_config.get("max_results", 50) / 50)
    )

    return {
        # The default configuration (workload 4.0) maps to a score of 0.5
        "resource_score": 1.0 / (1.0 + (workload / 4.0)),
        "estimated_complexity": workload,
    }
def calculate_combined_score(
    metrics: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    Fold the per-category scores into one weighted optimization score.

    Weights are normalized over ALL supplied weight entries, so a category
    that has a weight but no metrics still takes up its share and simply
    contributes nothing.

    Args:
        metrics: Dictionary of metric categories and their values
        weights: Dictionary of weights for each metric category
            (defaults to quality 0.6, speed 0.3, resource 0.1)

    Returns:
        Combined weighted score; 0.0 when the weights sum to zero
    """
    if weights is None:
        weights = {"quality": 0.6, "speed": 0.3, "resource": 0.1}

    weight_sum = sum(weights.values())
    if weight_sum == 0:
        # Nothing to normalize against
        return 0.0

    shares = {name: value / weight_sum for name, value in weights.items()}

    # Category name paired with the score field read from its metrics
    components = (
        ("quality", "quality_score"),
        ("speed", "speed_score"),
        ("resource", "resource_score"),
    )

    combined = 0.0
    for category, score_field in components:
        if category in metrics and category in shares:
            combined += metrics[category].get(score_field, 0.0) * shares[category]

    return combined