Coverage for src / local_deep_research / benchmarks / metrics / calculation.py: 99%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Unified metrics calculation module.
4This module provides functions for calculating metrics for both
5standard benchmarks and optimization tasks.
6"""
8import json
9from loguru import logger
10from pathlib import Path
11import tempfile
12import time
13from datetime import datetime, UTC
14from typing import Any, Dict, Optional
17def calculate_metrics(results_file: str) -> Dict[str, Any]:
18 """
19 Calculate evaluation metrics from results.
21 Args:
22 results_file: Path to results file
24 Returns:
25 Dictionary of metrics
26 """
27 # Load results
28 results = []
29 try:
30 with open(results_file, "r") as f:
31 for line in f:
32 if line.strip():
33 results.append(json.loads(line))
34 except Exception as e:
35 logger.exception("Error loading results file")
36 return {"error": str(e)}
38 if not results:
39 return {"error": "No results found"}
41 # Calculate accuracy
42 graded_results = [r for r in results if "is_correct" in r]
43 correct_count = sum(1 for r in graded_results if r.get("is_correct", False))
44 total_graded = len(graded_results)
45 accuracy = correct_count / total_graded if total_graded else 0
47 # Calculate average processing time if available
48 processing_times = [
49 r.get("processing_time", 0) for r in results if "processing_time" in r
50 ]
51 avg_time = (
52 sum(processing_times) / len(processing_times) if processing_times else 0
53 )
55 # Average confidence if available
56 confidence_values = []
57 for r in results:
58 if r.get("confidence"):
59 try:
60 confidence_values.append(int(r["confidence"]))
61 except (ValueError, TypeError):
62 pass
64 avg_confidence = (
65 sum(confidence_values) / len(confidence_values)
66 if confidence_values
67 else 0
68 )
70 # Calculate error rate
71 error_count = sum(1 for r in results if "error" in r)
72 error_rate = error_count / len(results) if results else 0
74 # Basic metrics
75 metrics = {
76 "total_examples": len(results),
77 "graded_examples": total_graded,
78 "correct": correct_count,
79 "accuracy": accuracy,
80 "average_processing_time": avg_time,
81 "average_confidence": avg_confidence,
82 "error_count": error_count,
83 "error_rate": error_rate,
84 "timestamp": datetime.now(UTC).isoformat(),
85 }
87 # If we have category information, calculate per-category metrics
88 categories = {}
89 for r in graded_results:
90 if "category" in r:
91 category = r["category"]
92 if category not in categories:
93 categories[category] = {"total": 0, "correct": 0}
94 categories[category]["total"] += 1
95 if r.get("is_correct", False):
96 categories[category]["correct"] += 1
98 if categories:
99 category_metrics = {}
100 for category, counts in categories.items():
101 category_metrics[category] = {
102 "total": counts["total"],
103 "correct": counts["correct"],
104 "accuracy": (
105 counts["correct"] / counts["total"]
106 if counts["total"]
107 else 0
108 ),
109 }
110 metrics["categories"] = category_metrics
112 return metrics
def evaluate_benchmark_quality(
    system_config: Dict[str, Any],
    num_examples: int = 10,
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Evaluate quality using SimpleQA benchmark.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with "accuracy" and "quality_score"; on failure both
        are 0.0 and an "error" key describes the problem.
    """
    from ..runners import run_simpleqa_benchmark

    # When no output directory is given, create a throwaway one that this
    # call owns and must remove afterwards.
    owned_tmp_dir = None
    if output_dir is None:
        owned_tmp_dir = tempfile.mkdtemp(prefix="ldr_benchmark_")
        output_dir = owned_tmp_dir

    try:
        # Translate the generic system config into the search configuration
        # expected by the benchmark runner.
        cfg = system_config
        search_config = {
            "iterations": cfg.get("iterations", 2),
            "questions_per_iteration": cfg.get("questions_per_iteration", 2),
            "search_strategy": cfg.get("search_strategy", "iterdrag"),
            "search_tool": cfg.get("search_tool", "searxng"),
            "model_name": cfg.get("model_name"),
            "provider": cfg.get("provider"),
        }

        logger.info(f"Running SimpleQA benchmark with {num_examples} examples")
        benchmark_results = run_simpleqa_benchmark(
            num_examples=num_examples,
            output_dir=output_dir,
            search_config=search_config,
            run_evaluation=True,
        )

        accuracy = benchmark_results.get("metrics", {}).get("accuracy", 0.0)
        # Quality score is the benchmark accuracy, mapped through unchanged.
        return {"accuracy": accuracy, "quality_score": accuracy}

    except Exception as e:
        logger.exception("Error in benchmark evaluation")
        return {"accuracy": 0.0, "quality_score": 0.0, "error": str(e)}

    finally:
        # Remove the temporary directory only if this call created it.
        if owned_tmp_dir and Path(owned_tmp_dir).exists():
            import shutil

            try:
                shutil.rmtree(owned_tmp_dir)
            except Exception:
                logger.warning("Failed to clean up temporary directory")
def measure_execution_time(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Measure execution time for a given configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with speed metrics; on failure the times are 0.0 and
        an "error" key describes the problem.
    """
    from local_deep_research.search_system import AdvancedSearchSystem
    from local_deep_research.config.llm_config import get_llm
    from local_deep_research.config.search_config import get_search

    if search_tool:
        system_config["search_tool"] = search_tool

    # Pre-declare resources so the finally block can clean up even after a
    # partially-completed initialization.
    llm = None
    search_engine = None
    system = None

    try:
        llm = get_llm()
        search_engine = get_search(
            system_config.get("search_tool", "searxng"),
            llm_instance=llm,
        )
        system = AdvancedSearchSystem(
            llm=llm,
            search=search_engine,
            max_iterations=system_config.get("iterations", 2),
            questions_per_iteration=system_config.get(
                "questions_per_iteration", 2
            ),
            strategy_name=system_config.get("search_strategy", "iterdrag"),
        )

        # Time each run individually; the average is derived from the list.
        run_times: list[float] = []
        for run_idx in range(num_runs):
            logger.info(f"Executing speed test run {run_idx + 1}/{num_runs}")
            started = time.time()
            system.search(query, full_response=False)
            run_times.append(time.time() - started)

        average_time = sum(run_times) / num_runs

        # Speed score on a 0-1 scale (lower times are better):
        # ~30s -> ~0.5, under 10s -> >0.8, over 2min -> <0.2.
        speed_score = 1.0 / (1.0 + (average_time / 30.0))

        return {
            "average_time": average_time,
            "min_time": min(run_times),
            "max_time": max(run_times),
            "speed_score": speed_score,
        }

    except Exception as e:
        logger.exception("Error in speed measurement")
        return {"average_time": 0.0, "speed_score": 0.0, "error": str(e)}
    finally:
        from local_deep_research.utilities.resource_utils import safe_close

        safe_close(system, "benchmark search system", allow_none=True)
        safe_close(search_engine, "benchmark search engine", allow_none=True)
        safe_close(llm, "benchmark LLM", allow_none=True)
def calculate_quality_metrics(
    system_config: Dict[str, Any],
    num_examples: int = 2,  # Reduced for quicker demo
    output_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Calculate quality-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        num_examples: Number of benchmark examples to use
        output_dir: Directory to save results (temporary if None)

    Returns:
        Dictionary with "quality_score" and "accuracy"
    """
    # Delegate to the benchmark evaluation and keep only the normalized
    # score fields, defaulting to 0.0 when the benchmark failed.
    benchmark = evaluate_benchmark_quality(
        system_config=system_config,
        num_examples=num_examples,
        output_dir=output_dir,
    )
    return {
        "quality_score": benchmark.get("quality_score", 0.0),
        "accuracy": benchmark.get("accuracy", 0.0),
    }
def calculate_speed_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
    num_runs: int = 1,
) -> Dict[str, Any]:
    """
    Calculate speed-related metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for timing tests
        search_tool: Override search tool
        num_runs: Number of runs to average time over

    Returns:
        Dictionary with "speed_score" and "average_time"
    """
    # Delegate to the timing measurement and keep only the normalized
    # score fields, defaulting to 0.0 when the measurement failed.
    timing = measure_execution_time(
        system_config=system_config,
        query=query,
        search_tool=search_tool,
        num_runs=num_runs,
    )
    return {
        "speed_score": timing.get("speed_score", 0.0),
        "average_time": timing.get("average_time", 0.0),
    }
def calculate_resource_metrics(
    system_config: Dict[str, Any],
    query: str = "test query",
    search_tool: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Calculate resource usage metrics for a configuration.

    Args:
        system_config: Configuration parameters to evaluate
        query: Query to use for resource tests (unused by the heuristic)
        search_tool: Override search tool (unused by the heuristic)

    Returns:
        Dictionary with "resource_score" (0-1, lower complexity scores
        higher) and "estimated_complexity"
    """
    # Simplified placeholder: rather than measuring memory usage or API
    # call counts, estimate cost directly from the configuration knobs.
    # More iterations, questions, and results all mean more resources.
    complexity = (
        system_config.get("iterations", 2)
        * system_config.get("questions_per_iteration", 2)
        * (system_config.get("max_results", 50) / 50)
    )

    # Map complexity onto a 0-1 scale where lower complexity is better.
    return {
        "resource_score": 1.0 / (1.0 + (complexity / 4.0)),
        "estimated_complexity": complexity,
    }
def calculate_combined_score(
    metrics: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
) -> float:
    """
    Calculate a combined optimization score from multiple metrics.

    Args:
        metrics: Dictionary of metric categories and their values
        weights: Dictionary of weights for each metric category

    Returns:
        Combined score between 0 and 1
    """
    if weights is None:
        # Quality dominates by default; speed and resource act as tie-breakers.
        weights = {"quality": 0.6, "speed": 0.3, "resource": 0.1}

    # Normalize the weights so they sum to 1; an all-zero weight set cannot
    # be normalized, so score straight to zero.
    total_weight = sum(weights.values())
    if total_weight == 0:
        return 0.0
    norm_weights = {k: v / total_weight for k, v in weights.items()}

    # Each recognized category contributes its "<category>_score" value,
    # weighted by its normalized weight, when both a metric entry and a
    # weight are present for it.
    score = 0.0
    for category in ("quality", "speed", "resource"):
        if category in metrics and category in norm_weights:
            component = metrics[category].get(f"{category}_score", 0.0)
            score += component * norm_weights[category]
    return score