Coverage for src/local_deep_research/benchmarks/web_api/benchmark_service.py: 94%
477 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""Benchmark service for handling web-based benchmark execution."""
3import hashlib
4import json
5import threading
6import time
7from datetime import UTC, datetime
8from enum import Enum
9from typing import Any, Dict, List, Optional
11from loguru import logger
13from ...api.research_functions import quick_summary
14from ...settings.manager import SnapshotSettingsContext
15from ...web.services.research_service import _global_research_semaphore
16from ...database.models.benchmark import (
17 BenchmarkResult,
18 BenchmarkRun,
19 BenchmarkStatus,
20 DatasetType,
21)
22from ...web.services.socket_service import SocketIOService
23from ..datasets import load_dataset
24from ..graders import extract_answer_from_response, grade_single_result
25from ..runners import format_query
26from ...database.thread_local_session import thread_cleanup
class BenchmarkTaskStatus(Enum):
    """Lifecycle states of a task in the in-memory benchmark queue tracker.

    Members carry lowercase string values; the tracker stores
    ``member.value`` (not the member itself) in its task records.
    """

    QUEUED = "queued"  # registered, not started yet
    PROCESSING = "processing"  # worker thread is running it
    COMPLETED = "completed"  # terminal: finished normally
    FAILED = "failed"  # terminal: aborted by an error
    CANCELLED = "cancelled"  # terminal: stopped by the user
class BenchmarkQueueTracker:
    """Simple in-memory tracker for benchmark queue status.

    This replaces the removed memory_queue functionality for benchmarks.
    Since benchmarks are temporary and don't need persistence,
    this simple in-memory solution is sufficient.

    Thread-safe for concurrent access from multiple benchmark threads:
    every read and write of ``tasks`` happens under ``_lock``, and
    ``get_task_status`` hands out a copy so callers never hold a reference
    to a record another thread may be mutating.
    """

    def __init__(self):
        # task_id -> {"username", "task_type", "status", "created_at"}
        # plus "updated_at" once the status has been changed.
        self.tasks = {}
        self._lock = threading.Lock()

    def add_task(
        self, task_id: str, username: str, task_type: str = "benchmark"
    ):
        """Add a new task to tracking in the QUEUED state.

        Also performs opportunistic cleanup of old completed tasks.

        Args:
            task_id: Unique identifier of the task.
            username: Owner of the task.
            task_type: Free-form task category (default ``"benchmark"``).
        """
        # cleanup_completed_tasks() takes self._lock itself and
        # threading.Lock is not re-entrant, so it must run before we
        # acquire the lock below.
        self.cleanup_completed_tasks()

        with self._lock:
            self.tasks[task_id] = {
                "username": username,
                "task_type": task_type,
                "status": BenchmarkTaskStatus.QUEUED.value,
                "created_at": datetime.now(UTC),
            }

    def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
        """Set a task's status and stamp ``updated_at``.

        Unknown task ids are logged as a warning and otherwise ignored.
        """
        with self._lock:
            if task_id in self.tasks:
                self.tasks[task_id]["status"] = status.value
                self.tasks[task_id]["updated_at"] = datetime.now(UTC)
            else:
                logger.warning(
                    f"Attempted to update status for non-existent task: {task_id}"
                )

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return a snapshot of a task's record, or None if it is unknown.

        A shallow copy is returned so the caller can read it after the lock
        is released without racing concurrent updates, and cannot mutate
        the tracker's internal state by accident.
        """
        with self._lock:
            task = self.tasks.get(task_id)
            return dict(task) if task is not None else None

    def remove_task(self, task_id: str):
        """Remove a task from tracking (no-op if it is unknown)."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
        """Remove completed tasks older than max_age_seconds.

        Only tasks in a terminal state (completed, failed, cancelled) are
        eligible; age is measured from ``updated_at``, falling back to
        ``created_at``.

        Args:
            max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour)
        """
        terminal_statuses = (
            BenchmarkTaskStatus.COMPLETED.value,
            BenchmarkTaskStatus.FAILED.value,
            BenchmarkTaskStatus.CANCELLED.value,
        )
        with self._lock:
            now = datetime.now(UTC)
            to_remove = []
            for task_id, task_data in self.tasks.items():
                if task_data["status"] not in terminal_statuses:
                    continue
                updated_at = task_data.get(
                    "updated_at", task_data.get("created_at")
                )
                if (
                    updated_at
                    and (now - updated_at).total_seconds() > max_age_seconds
                ):
                    to_remove.append(task_id)

            # Deleting is done in a second pass: the dict cannot change
            # size while it is being iterated.
            for task_id in to_remove:
                self.tasks.pop(task_id, None)
                logger.debug(f"Cleaned up old task: {task_id}")

            if to_remove:
                logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks")
125class BenchmarkService:
126 """Service for managing benchmark runs through the web interface."""
def __init__(self, socket_service=None):
    """Initialise empty per-run state.

    Args:
        socket_service: Object used to emit WebSocket events. When falsy,
            one is obtained from ``self._get_socket_service()``.
    """
    # benchmark_run_id -> in-memory run record (data, results, status, ...)
    self.active_runs: Dict[int, Dict] = {}
    if socket_service:
        self.socket_service = socket_service
    else:
        self.socket_service = self._get_socket_service()
    # benchmark_run_id -> True once a rate-limit style error was observed
    self.rate_limit_detected: Dict[int, bool] = {}
    # In-memory status tracking for benchmark tasks
    self.queue_tracker = BenchmarkQueueTracker()
def _get_socket_service(self):
    """Get socket service instance, handling cases where Flask app is not available.

    Returns:
        A ``SocketIOService`` if one can be constructed, otherwise a no-op
        stand-in that accepts the same emit calls.
    """
    try:
        return SocketIOService()
    except Exception:
        # Return a mock socket service for testing/standalone use.
        class MockSocketService:
            """No-op stand-in for SocketIOService.

            It must provide every emit method BenchmarkService calls:
            ``emit_to_subscribers`` is used for progress, rate-limit and
            completion events. Without it, the completion notification
            raised AttributeError and runs were marked FAILED.
            """

            def emit_to_room(self, room, event, data):
                pass

            def emit_to_subscribers(self, *args, **kwargs):
                pass

        return MockSocketService()
def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
    """Fingerprint the result-affecting parts of a search configuration.

    Runs whose fingerprints match are considered compatible, so their
    graded results can be reused. Keys that are absent or None are left
    out of the fingerprint.

    Args:
        search_config: Search configuration dictionary.

    Returns:
        The first 8 hex characters of an MD5 digest (not used for security).
    """
    fingerprint_keys = (
        "iterations",
        "questions_per_iteration",
        "search_tool",
        "search_strategy",
        "model_name",
        "provider",
    )
    fingerprint = {}
    for key in fingerprint_keys:
        value = search_config.get(key)
        if value is not None:
            fingerprint[key] = value

    # sort_keys makes the serialisation independent of insertion order.
    serialized = json.dumps(fingerprint, sort_keys=True)
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        serialized.encode(), usedforsecurity=False
    )
    return digest.hexdigest()[:8]
def generate_query_hash(self, question: str, dataset_type: str) -> str:
    """Return a deduplication key for a (question, dataset) pair.

    Surrounding whitespace on the question and the case of the dataset
    name are ignored.

    Returns:
        Full MD5 hex digest (not used for security).
    """
    normalized = "|".join((question.strip(), dataset_type.lower()))
    hasher = hashlib.md5(usedforsecurity=False)  # DevSkim: ignore DS126858
    hasher.update(normalized.encode())
    return hasher.hexdigest()
def create_benchmark_run(
    self,
    run_name: Optional[str],
    search_config: Dict[str, Any],
    evaluation_config: Dict[str, Any],
    datasets_config: Dict[str, Dict],
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> int:
    """Insert a new PENDING benchmark run into the user's database.

    Args:
        run_name: Optional display name for the run.
        search_config: Research parameters; also used for the config hash.
        evaluation_config: Grader configuration.
        datasets_config: Per-dataset settings; each ``"count"`` (default 0)
            contributes to ``total_examples``.
        username: Owner of the database to write to.
        user_password: Password used to open that database.

    Returns:
        The primary key of the new run.

    Raises:
        Any exception from the insert/commit, after rolling back.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username, user_password) as db_session:
        try:
            config_hash = self.generate_config_hash(search_config)
            requested_counts = [
                dataset_cfg.get("count", 0)
                for dataset_cfg in datasets_config.values()
            ]

            new_run = BenchmarkRun(
                run_name=run_name,
                config_hash=config_hash,
                query_hash_list=[],  # filled in as examples are processed
                search_config=search_config,
                evaluation_config=evaluation_config,
                datasets_config=datasets_config,
                total_examples=sum(requested_counts),
                status=BenchmarkStatus.PENDING,
            )
            db_session.add(new_run)
            db_session.commit()

            logger.info(
                f"Created benchmark run {new_run.id} with config hash {config_hash}"
            )
            return int(new_run.id)
        except Exception:
            db_session.rollback()
            logger.exception("Error creating benchmark run")
            raise
def get_existing_results(
    self,
    config_hash: str,
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> Dict[str, Dict]:
    """Get existing graded results from completed runs with a compatible configuration.

    Args:
        config_hash: Hash produced by ``generate_config_hash``.
        username: Owner of the database to read.
        user_password: Password used to open that database.

    Returns:
        Mapping of ``query_hash`` to a plain-dict copy of the result. Only
        results whose ``is_correct`` is set (i.e. evaluation finished) are
        included. When several runs answered the same query, the run with
        the highest id wins. Returns ``{}`` on any error.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username, user_password) as session:
        try:
            # A single joined query replaces one query per compatible run
            # (N+1). Ordering by run id makes "later run wins" deterministic
            # when the dict below is filled.
            results = (
                session.query(BenchmarkResult)
                .join(
                    BenchmarkRun,
                    BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                )
                .filter(BenchmarkRun.config_hash == config_hash)
                .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                .filter(
                    BenchmarkResult.is_correct.isnot(None)
                )  # Only completed evaluations
                .order_by(BenchmarkRun.id)
                .all()
            )

            existing_results = {}
            for result in results:
                existing_results[result.query_hash] = {
                    "id": result.example_id,
                    "dataset_type": result.dataset_type.value,
                    "problem": result.question,
                    "correct_answer": result.correct_answer,
                    "response": result.response,
                    "extracted_answer": result.extracted_answer,
                    "confidence": result.confidence,
                    "processing_time": result.processing_time,
                    "sources": result.sources,
                    "is_correct": result.is_correct,
                    "graded_confidence": result.graded_confidence,
                    "grader_response": result.grader_response,
                    "query_hash": result.query_hash,
                }

            logger.info(
                f"Found {len(existing_results)} existing results for config hash {config_hash}"
            )
            return existing_results

        except Exception:
            logger.exception("Error loading existing results")
            return {}
def start_benchmark(
    self,
    benchmark_run_id: int,
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> bool:
    """Start a benchmark run in a background thread.

    All database reads happen here, on the calling thread. The worker gets
    a plain-dict snapshot through ``self.active_runs[benchmark_run_id]``.

    Args:
        benchmark_run_id: Primary key of the run to start.
        username: Owner of the database holding the run.
        user_password: Password used to open that database. It is also the
            fallback for metrics tracking when the session password store
            has no entry.

    Returns:
        True if the worker thread was started. Otherwise False, after a
        best-effort attempt to mark the run FAILED in the database.
    """
    from ...database.session_context import get_user_db_session

    try:
        # Get all data from the database in the main thread.
        # This avoids database access from the background thread.
        with get_user_db_session(username, user_password) as session:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if not benchmark_run:
                raise ValueError(  # noqa: TRY301 — caught by except, sets FAILED status in DB
                    f"Benchmark run {benchmark_run_id} not found"
                )

            # Create settings snapshot for thread safety
            from local_deep_research.settings import SettingsManager

            settings_manager = SettingsManager(session)
            settings_snapshot = settings_manager.get_all_settings()

            # Get user password for metrics tracking in background thread
            from flask import session as flask_session
            from ...database.session_passwords import session_password_store

            _user_password = None
            session_id = flask_session.get("session_id")
            if session_id and username:
                _user_password = (
                    session_password_store.get_session_password(
                        username, session_id
                    )
                )
                if not _user_password:
                    logger.warning(
                        f"No password found for user {username} in current session"
                    )
            # Fall back to the password the caller opened the DB with, so
            # metrics tracking is not silently disabled.
            metrics_password = _user_password or user_password

            benchmark_data = {
                "benchmark_run_id": benchmark_run_id,
                "username": username or "benchmark_user",
                "user_password": metrics_password,  # for metrics tracking
                "config_hash": benchmark_run.config_hash,
                "datasets_config": benchmark_run.datasets_config,
                "search_config": benchmark_run.search_config,
                "evaluation_config": benchmark_run.evaluation_config,
                "existing_results": self.get_existing_results(
                    benchmark_run.config_hash, username, user_password
                ),
                "settings_snapshot": settings_snapshot,
            }

            benchmark_run.status = BenchmarkStatus.IN_PROGRESS
            benchmark_run.start_time = datetime.now(UTC)
            session.commit()

        # Store data in memory for the thread
        self.active_runs[benchmark_run_id] = {
            "data": benchmark_data,
            "start_time": datetime.now(UTC),
            "status": "running",
            "results": [],
        }

        thread = threading.Thread(
            target=self._run_benchmark_thread,
            args=(benchmark_run_id,),
            daemon=True,
        )
        # Record the handle BEFORE starting. A short run's worker may reach
        # its completion/sync path before start() returns, so writing into
        # the entry afterwards would race with it. If that write failed, the
        # except below would wrongly mark an already-running run FAILED.
        self.active_runs[benchmark_run_id]["thread"] = thread
        thread.start()

        logger.info(f"Started benchmark run {benchmark_run_id}")
        return True

    except Exception as e:
        logger.exception(f"Error starting benchmark {benchmark_run_id}")
        # If we populated active_runs before the spawn failed, drop the
        # stale entry — it has no running thread and would mislead
        # subsequent cancel_benchmark / get_run_status calls.
        self.active_runs.pop(benchmark_run_id, None)
        # Best-effort: if the DB itself is the problem, this update can fail
        # too. That must not escape and replace the documented False return.
        try:
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
        except Exception:
            logger.exception(
                f"Failed to mark benchmark {benchmark_run_id} as failed"
            )
        return False
@thread_cleanup
def _run_benchmark_thread(self, benchmark_run_id: int):
    """Main benchmark execution thread.

    Runs on the daemon thread spawned by ``start_benchmark``. Everything it
    needs (configs, settings snapshot, reusable results, the metrics
    password) is read from ``self.active_runs[benchmark_run_id]["data"]``,
    which ``start_benchmark`` populated on the calling thread.

    Per-task results and a final ``completion_info`` dict are accumulated
    in ``self.active_runs[benchmark_run_id]``. The ``finally`` block marks
    the thread complete and hands that state to
    ``_sync_results_to_database``.

    Args:
        benchmark_run_id: Primary key of the run being executed.
    """
    # Set once the queue-tracker entry exists, so the failure handler can
    # update it.
    task_id = None

    benchmark_data = self.active_runs.get(benchmark_run_id, {}).get("data")

    try:
        if not benchmark_data:
            raise ValueError(  # noqa: TRY301
                f"Benchmark data for run {benchmark_run_id} not found"
            )

        # Serve settings from the snapshot taken in start_benchmark via a
        # thread-local context.
        settings_snapshot = benchmark_data.get("settings_snapshot", {})
        username = benchmark_data.get("username", "benchmark_user")
        settings_context = SnapshotSettingsContext(
            settings_snapshot,
            username=username,
            missing_key_log_level="WARNING",
        )

        from ...config.thread_settings import set_settings_context

        set_settings_context(settings_context)

        datasets_config = benchmark_data["datasets_config"]
        search_config = benchmark_data["search_config"]
        evaluation_config = benchmark_data["evaluation_config"]
        existing_results = benchmark_data.get("existing_results", {})

        # Only examples without a reusable graded result are queued.
        task_queue = self._create_task_queue(
            datasets_config,
            existing_results,
            benchmark_run_id,
        )

        # Reused results count toward both the total and the progress.
        total_examples = len(task_queue) + len(existing_results)
        completed_examples = len(existing_results)

        task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
        self.queue_tracker.add_task(task_id, username, "benchmark")
        self.queue_tracker.update_task_status(
            task_id, BenchmarkTaskStatus.PROCESSING
        )

        progress_info = {
            "total_examples": total_examples,
            "completed_examples": completed_examples,
            "failed_examples": 0,
            "start_time": datetime.now(UTC),
        }

        logger.info(
            f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
        )
        for i, task in enumerate(task_queue):
            # Cancellation is only observed between tasks.
            if (
                benchmark_run_id in self.active_runs
                and self.active_runs[benchmark_run_id].get("status")
                == "cancelled"
            ):
                logger.info(
                    f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                )
                break

            logger.info(
                f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
            )
            try:
                # Credentials are needed downstream for metrics tracking.
                task["username"] = benchmark_data.get("username")
                task["user_password"] = benchmark_data.get("user_password")

                # Benchmark tasks count against the server-wide research
                # concurrency limit.
                _global_research_semaphore.acquire()
                try:
                    result = self._process_benchmark_task(
                        task,
                        search_config,
                        evaluation_config,
                    )
                finally:
                    _global_research_semaphore.release()

                # Kept in memory; persisted by the sync helpers.
                self.active_runs[benchmark_run_id].setdefault(
                    "results", []
                ).append(result)

                progress_info["completed_examples"] += 1

                logger.info(
                    f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                    f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                )

                self._send_progress_update(
                    benchmark_run_id,
                    progress_info["completed_examples"],
                    progress_info["total_examples"],
                )

            except Exception as e:
                logger.exception(f"Error processing task {i}")
                progress_info["failed_examples"] += 1
                logger.info(
                    f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                    f"Total failed: {progress_info['failed_examples']}"
                )

                # Heuristic rate-limit detection from the error text.
                error_str = str(e).lower()
                if (
                    "403" in error_str
                    or "rate limit" in error_str
                    or "forbidden" in error_str
                ):
                    self.rate_limit_detected[benchmark_run_id] = True
                    # A failed notification must not escape this handler
                    # and abort the whole run.
                    try:
                        self.socket_service.emit_to_subscribers(
                            "research_progress",
                            benchmark_run_id,
                            {
                                "rate_limit_detected": True,
                                "message": "SearXNG rate limiting detected",
                            },
                        )
                    except Exception:
                        logger.exception("Error sending rate limit warning")

        progress_info["end_time"] = datetime.now(UTC)

        was_cancelled = (
            benchmark_run_id in self.active_runs
            and self.active_runs[benchmark_run_id].get("status")
            == "cancelled"
        )

        if was_cancelled:
            status = BenchmarkStatus.CANCELLED
            message = "Benchmark cancelled by user"
            tracker_status = BenchmarkTaskStatus.CANCELLED
        else:
            status = BenchmarkStatus.COMPLETED
            message = "Benchmark completed successfully"
            tracker_status = BenchmarkTaskStatus.COMPLETED
        self.queue_tracker.update_task_status(task_id, tracker_status)

        # Stored for the later database update.
        self.active_runs[benchmark_run_id]["completion_info"] = {
            "status": status,
            "end_time": progress_info["end_time"],
            "completed_examples": progress_info["completed_examples"],
            "failed_examples": progress_info["failed_examples"],
        }

        # The outcome is already recorded above. A socket failure here must
        # not reach the handler below, which would overwrite it with FAILED.
        try:
            self.socket_service.emit_to_subscribers(
                "research_progress",
                benchmark_run_id,
                {
                    "status": "cancelled" if was_cancelled else "completed",
                    "message": message,
                    "progress": (
                        progress_info["completed_examples"]
                        / progress_info["total_examples"]
                        * 100
                    )
                    if progress_info["total_examples"] > 0
                    else 0,
                    "benchmark_run_id": benchmark_run_id,
                },
            )
        except Exception:
            logger.exception(
                f"Error sending completion notification for benchmark {benchmark_run_id}"
            )

    except Exception as e:
        logger.exception(f"Benchmark run {benchmark_run_id} failed")
        if task_id:
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.FAILED
            )
        # Stored for the later database update.
        if benchmark_run_id in self.active_runs:
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": BenchmarkStatus.FAILED,
                "error_message": str(e),
            }
    finally:
        if benchmark_run_id in self.active_runs:
            # Mark the thread as done but keep the data for the DB update.
            self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to the database immediately if possible.
            self._sync_results_to_database(benchmark_run_id)
def _create_task_queue(
    self,
    datasets_config: Dict,
    existing_results: Dict,
    benchmark_run_id: int,
) -> List[Dict]:
    """Create the list of tasks to process, excluding existing results.

    Args:
        datasets_config: ``{dataset_name: {"count": n, ...}}``. Datasets
            whose ``count`` is missing or <= 0 are skipped.
        existing_results: Mapping keyed by query hash. Examples whose hash
            is present are not queued again.
        benchmark_run_id: Run id stamped on every task.

    Returns:
        Task dicts in processing order. ``task_index`` is the position in
        the returned list, not in the source dataset.
    """
    tasks: List[Dict[str, Any]] = []

    for dataset_name, config in datasets_config.items():
        count = config.get("count", 0)
        if count <= 0:
            continue

        dataset = load_dataset(
            dataset_type=dataset_name,
            num_examples=count,
            seed=None,
        )

        for i, example in enumerate(dataset):
            # SimpleQA and BrowseComp (the previous separate branches) read
            # the same keys, so a single extraction covers both.
            question = example.get("problem", "")
            correct_answer = example.get("answer", "")

            query_hash = self.generate_query_hash(question, dataset_name)

            # Skip examples that already have a reusable graded result.
            if query_hash in existing_results:
                continue

            tasks.append(
                {
                    "benchmark_run_id": benchmark_run_id,
                    "example_id": example.get("id", f"example_{i}"),
                    "dataset_type": dataset_name,
                    "question": question,
                    "correct_answer": correct_answer,
                    "query_hash": query_hash,
                    "task_index": len(tasks),
                }
            )

    return tasks
def _process_benchmark_task(
    self, task: Dict, search_config: Dict, evaluation_config: Dict
) -> Dict:
    """Run research and grading for a single benchmark example.

    Args:
        task: Task dict built by ``_create_task_queue``, plus the
            ``username``/``user_password`` keys added by the worker loop.
        search_config: Research parameters forwarded to ``quick_summary``.
        evaluation_config: Grader configuration forwarded to
            ``grade_single_result``.

    Returns:
        A new dict containing ``task``'s keys plus the response, extracted
        answer, timing, sources (JSON string), ``research_id`` and grading
        fields. If research or answer extraction raises, the dict carries
        ``research_error`` instead. If grading fails, ``is_correct`` is None
        and ``evaluation_error`` is set.
    """
    try:
        logger.info(
            f"Starting benchmark task {task['task_index'] + 1}: "
            f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
            f"question_preview='{task['question'][:100]}...'"
        )

        # Settings context installed on this thread by the worker loop.
        from ...config.thread_settings import get_settings_context

        settings_context = get_settings_context()

        # Unique tracking ID for this task; stored as research_id.
        import uuid

        tracking_id = str(uuid.uuid4())
        logger.info(
            f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
        )

        formatted_query = format_query(task["question"], task["dataset_type"])
        logger.info(
            f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
        )

        start_time = time.time()
        logger.info(f"Task {task['example_id']} starting research phase...")

        def benchmark_progress_callback(
            status: str, progress: int, data: dict
        ):
            """Progress callback to emit detailed research progress via WebSocket"""
            try:
                timestamp = datetime.now(UTC).isoformat()

                # Research-compatible log entry
                log_entry = {
                    "time": timestamp,
                    "message": f"Example {task['example_id']}: {status}",
                    "progress": progress,
                    "metadata": {
                        "phase": data.get("phase", "benchmark_processing"),
                        "type": data.get("type", "info"),
                        "example_id": task["example_id"],
                        "benchmark_run_id": task["benchmark_run_id"],
                        **data,  # Include all other data
                    },
                }

                # Refine the log type from keywords in the status text.
                status_lower = status.lower()
                if "complete" in status_lower or "finished" in status_lower:
                    log_entry["metadata"]["type"] = "milestone"
                elif "error" in status_lower or "failed" in status_lower:
                    log_entry["metadata"]["type"] = "error"
                elif "starting" in status_lower or "begin" in status_lower:
                    log_entry["metadata"]["type"] = "milestone"

                progress_data = {
                    "progress": progress,
                    "message": status,
                    "status": "in_progress",
                    "log_entry": log_entry,
                    "progress_log": json.dumps(
                        [log_entry]
                    ),  # Array format expected by socket.js
                }

                # Emit using the research_progress format the UI expects
                self.socket_service.emit_to_subscribers(
                    "research_progress",
                    task["benchmark_run_id"],
                    progress_data,
                )

            except Exception:
                logger.exception("Error sending benchmark progress update")

        user_password = task.get("user_password")

        search_result = quick_summary(
            query=formatted_query,
            research_id=tracking_id,
            iterations=search_config.get("iterations", 8),
            questions_per_iteration=search_config.get(
                "questions_per_iteration", 5
            ),
            search_tool=search_config.get("search_tool", "searxng"),
            search_strategy=search_config.get(
                "search_strategy", "focused_iteration"
            ),
            progress_callback=benchmark_progress_callback,
            model_name=search_config.get("model_name"),
            provider=search_config.get("provider"),
            temperature=search_config.get("temperature", 0.7),
            openai_endpoint_url=search_config.get("openai_endpoint_url"),
            settings_snapshot=settings_context.snapshot,  # thread safety
            username=task.get("username"),
            user_password=user_password,  # for metrics tracking
            # The web benchmark runs against the user's encrypted DB
            # (it has username/password and wants search metrics
            # persisted). quick_summary's default is programmatic_mode=True
            # for true library callers; override here so the engine's
            # metrics path stays active.
            programmatic_mode=False,
        )
        processing_time = time.time() - start_time
        logger.info(
            f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
            f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
        )

        # A None summary would crash len() below and discard the run as a
        # research error; treat it as an empty response instead.
        response = search_result.get("summary") or ""
        logger.info(
            f"Task {task['example_id']} response length: {len(response)} chars"
        )

        extracted_data = extract_answer_from_response(
            response, task["dataset_type"]
        )
        if isinstance(extracted_data, dict):
            # Normalise a None answer to "" so the slicing below (and the
            # grader) always get a string.
            extracted_answer = extracted_data.get("extracted_answer") or ""
        else:
            extracted_answer = str(extracted_data)
        logger.info(
            f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
        )

        # Prefer direct sources, fall back to all_links_of_system
        sources = search_result.get("sources", [])
        if not sources and "all_links_of_system" in search_result:
            sources = search_result.get("all_links_of_system", [])

        logger.debug(f"Search result keys: {list(search_result.keys())}")
        logger.debug(f"Sources found: {len(sources)} items")

        result = {
            **task,
            "response": response,
            "extracted_answer": extracted_answer,
            "confidence": str(
                extracted_data.get("confidence", "100")
                if isinstance(extracted_data, dict)
                else "100"
            ),
            "processing_time": processing_time,
            "sources": json.dumps(sources),  # Convert to JSON string
            "completed_at": datetime.now(UTC),
            "research_id": tracking_id,  # Store the UUID in research_id
        }

        # Evaluation failures are recorded on the result, not raised.
        try:
            logger.info(f"Task {task['example_id']} starting evaluation...")
            eval_start_time = time.time()

            # Always attempt evaluation, regardless of provider.
            result_data = {
                "id": task["example_id"],
                "problem": task["question"],
                "correct_answer": task["correct_answer"],
                "response": response,
                "extracted_answer": extracted_answer,
            }

            eval_result = grade_single_result(
                result_data,
                task["dataset_type"],
                evaluation_config,
                settings_context.snapshot,
            )
            eval_time = time.time() - eval_start_time
            logger.info(
                f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
            )
            if eval_result and not eval_result.get("grading_error"):
                result.update(
                    {
                        "is_correct": eval_result.get("is_correct", False),
                        "graded_confidence": eval_result.get(
                            "graded_confidence", "0"
                        ),
                        "grader_response": eval_result.get(
                            "grader_response", ""
                        ),
                    }
                )
            else:
                error_msg = (
                    eval_result.get("grading_error", "Unknown evaluation error")
                    if eval_result
                    else "No evaluation results returned"
                )
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {error_msg}",
                        "evaluation_error": error_msg,
                    }
                )

        except Exception as e:
            logger.exception("Evaluation error")
            result.update(
                {
                    "is_correct": None,
                    "graded_confidence": "0",
                    "grader_response": f"Evaluation failed: {e!s}",
                    "evaluation_error": str(e),
                }
            )

        return result

    except Exception as e:
        logger.exception("Research error")
        return {
            **task,
            "research_error": str(e),
            "completed_at": datetime.now(UTC),
        }
def sync_pending_results(
    self, benchmark_run_id: int, username: Optional[str] = None
):
    """Sync any pending results to database. Can be called from main thread.

    Inserts every in-memory result of the run that is not yet recorded as
    saved and is not already present in the database (matched by
    ``benchmark_run_id`` + ``query_hash``).

    Args:
        benchmark_run_id: Run whose in-memory results should be persisted.
        username: Database owner; defaults to the username stored with the run.

    Returns:
        Number of rows committed by this call. Returns 0 when the run is not
        tracked in memory or when the sync fails.
    """
    if benchmark_run_id not in self.active_runs:
        return 0

    run_data = self.active_runs[benchmark_run_id]
    results_to_save = run_data.get("results", [])
    # Work on a copy and publish it only after a successful commit. Marking
    # indices in the shared set before commit would make a failed commit
    # permanently skip those results.
    saved_indices = set(run_data.get("saved_indices", set()))

    if not username:
        username = run_data.get("data", {}).get("username")

    user_password = run_data.get("data", {}).get("user_password")

    saved_count = 0
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, user_password) as session:
            try:
                for idx, result in enumerate(results_to_save):
                    if idx in saved_indices:
                        continue

                    existing = (
                        session.query(BenchmarkResult)
                        .filter_by(
                            benchmark_run_id=benchmark_run_id,
                            query_hash=result["query_hash"],
                        )
                        .first()
                    )
                    if existing:
                        # Already persisted (e.g. by another sync path).
                        saved_indices.add(idx)
                        continue

                    session.add(
                        BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                    )
                    saved_indices.add(idx)
                    saved_count += 1

                if saved_count > 0:
                    session.commit()
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )
            except Exception:
                # Roll back while the session is still open to prevent
                # PendingRollbackError on its next use.
                try:
                    session.rollback()
                except Exception:
                    logger.debug("Failed to rollback session after sync error")
                raise

        run_data["saved_indices"] = saved_indices

    except Exception:
        logger.exception(
            f"Error syncing pending results for benchmark {benchmark_run_id}"
        )
        # Nothing from this call is known to be committed.
        saved_count = 0

    return saved_count
def _sync_results_to_database(self, benchmark_run_id: int):
    """Persist a finished benchmark run's in-memory results to the database.

    Does nothing until the worker thread has set ``thread_complete`` on the
    run's entry in ``self.active_runs``. When the ``BenchmarkRun`` row exists
    and ``completion_info`` was recorded, this:

    * copies the final status, end time and counters onto the run row;
    * inserts every result whose index is not in ``saved_indices`` (those
      were already written by an earlier incremental sync);
    * for COMPLETED runs, stores overall accuracy and processing rate.

    The in-memory entry is then discarded. Any exception is logged and
    swallowed; in that case the in-memory entry is kept.

    Args:
        benchmark_run_id: Primary key of the ``BenchmarkRun`` to sync.
    """
    if benchmark_run_id not in self.active_runs:
        return

    run_data = self.active_runs[benchmark_run_id]
    if not run_data.get("thread_complete"):
        return

    username = run_data.get("data", {}).get("username")
    user_password = run_data.get("data", {}).get("user_password")
    results = run_data.get("results", [])
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(username, user_password) as session:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )

            if benchmark_run and "completion_info" in run_data:
                info = run_data["completion_info"]
                benchmark_run.status = info["status"]
                benchmark_run.end_time = info.get(
                    "end_time", datetime.now(UTC)
                )
                benchmark_run.completed_examples = info.get(
                    "completed_examples", 0
                )
                benchmark_run.failed_examples = info.get(
                    "failed_examples", 0
                )
                benchmark_run.error_message = info.get("error_message")

                # Save all results (skip ones an incremental sync already wrote)
                saved_indices = run_data.get("saved_indices", set())
                for idx, result in enumerate(results):
                    if idx in saved_indices:
                        continue
                    session.add(
                        BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                    )

                # Calculate final accuracy over graded results only
                if benchmark_run.status == BenchmarkStatus.COMPLETED:
                    evaluated_results = [
                        r for r in results if r.get("is_correct") is not None
                    ]
                    if evaluated_results:
                        correct_count = sum(
                            1 for r in evaluated_results if r.get("is_correct")
                        )
                        benchmark_run.overall_accuracy = (
                            correct_count / len(evaluated_results)
                        ) * 100

                        # "processing_time" may be present but None; treat it
                        # as 0 so sum() cannot raise and abort the whole sync
                        # before commit (matches _calculate_final_accuracy).
                        total_time = sum(
                            r.get("processing_time") or 0
                            for r in evaluated_results
                        )
                        if total_time > 0:
                            # Results per minute (processing_time in seconds).
                            benchmark_run.processing_rate = len(
                                evaluated_results
                            ) / (total_time / 60)

                session.commit()
                logger.info(
                    f"Successfully synced results for benchmark {benchmark_run_id}"
                )

            # Clean up memory; pop() tolerates the entry already being gone.
            self.active_runs.pop(benchmark_run_id, None)

    except Exception:
        logger.exception("Error syncing benchmark results to database")
1091 def _send_progress_update(
1092 self, benchmark_run_id: int, completed: int, total: int
1093 ):
1094 """Send real-time progress update via websocket."""
1095 try:
1096 percentage = (completed / total * 100) if total > 0 else 0
1098 # Create log entry for milestone progress
1099 log_entry = {
1100 "time": datetime.now(UTC).isoformat(),
1101 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)",
1102 "progress": percentage,
1103 "metadata": {
1104 "phase": "benchmark_progress",
1105 "type": "milestone",
1106 "completed": completed,
1107 "total": total,
1108 "benchmark_run_id": benchmark_run_id,
1109 },
1110 }
1112 progress_data = {
1113 "status": "in_progress",
1114 "message": f"Processing examples: {completed}/{total}",
1115 "progress": percentage,
1116 "completed": completed,
1117 "total": total,
1118 "benchmark_run_id": benchmark_run_id,
1119 "log_entry": log_entry,
1120 "progress_log": json.dumps([log_entry]),
1121 }
1123 self.socket_service.emit_to_subscribers(
1124 "research_progress", benchmark_run_id, progress_data
1125 )
1127 except Exception:
1128 logger.exception("Error sending progress update")
def _calculate_final_accuracy(
    self, benchmark_run_id: int, username: Optional[str] = None
):
    """Recompute and store final accuracy metrics for a benchmark run.

    Loads every graded ``BenchmarkResult`` of the run (``is_correct`` not
    NULL) and writes two fields to the ``BenchmarkRun`` row:

    * ``overall_accuracy``: percentage of graded results that are correct;
    * ``processing_rate``: graded results per minute of summed
      ``processing_time`` (treated as seconds), or 0 if no time was recorded.

    Nothing is written when the run has no graded results. On error the
    exception is logged and the session is rolled back. This keeps the
    session usable and avoids SQLAlchemy's PendingRollbackError.

    Args:
        benchmark_run_id: Primary key of the ``BenchmarkRun``.
        username: Owner of the per-user database to update.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username) as session:
        try:
            # Get all graded results for this run
            results = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .filter(BenchmarkResult.is_correct.isnot(None))
                .all()
            )
            if not results:
                return

            correct_count = sum(1 for r in results if r.is_correct)
            overall_accuracy = (correct_count / len(results)) * 100

            # processing_time may be NULL for some rows; count those as 0.
            total_time = sum(r.processing_time or 0 for r in results)
            processing_rate = (
                (len(results) / (total_time / 60)) if total_time > 0 else 0
            )

            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if benchmark_run:
                benchmark_run.overall_accuracy = overall_accuracy
                benchmark_run.processing_rate = processing_rate
                session.commit()

        except Exception:
            logger.exception("Error calculating final accuracy")
            # Leave the session in a usable state after a failed flush/commit.
            try:
                session.rollback()
            except Exception:
                logger.debug(
                    "Failed to rollback session after accuracy calculation error"
                )
def update_benchmark_status(
    self,
    benchmark_run_id: int,
    status: BenchmarkStatus,
    error_message: Optional[str] = None,
    username: Optional[str] = None,
):
    """Update a benchmark run's status and lifecycle timestamps.

    Sets ``status`` and ``updated_at``, and ``error_message`` when a
    non-empty one is given. The timestamps are set only once:

    * ``start_time`` when the run first enters IN_PROGRESS;
    * ``end_time`` when the run first reaches a terminal state: COMPLETED,
      FAILED or CANCELLED. CANCELLED is included because ``cancel_benchmark``
      moves runs to that state through this method.

    Errors are logged and the session is rolled back; nothing is raised.

    Args:
        benchmark_run_id: Primary key of the ``BenchmarkRun``.
        status: New status to store.
        error_message: Optional error text; empty/None leaves the old value.
        username: Owner of the per-user database to update.
    """
    from ...database.session_context import get_user_db_session

    terminal_statuses = (
        BenchmarkStatus.COMPLETED,
        BenchmarkStatus.FAILED,
        BenchmarkStatus.CANCELLED,
    )

    with get_user_db_session(username) as session:
        try:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if benchmark_run:
                # One timestamp so updated_at and start/end_time agree.
                now = datetime.now(UTC)
                benchmark_run.status = status
                benchmark_run.updated_at = now

                if error_message:
                    benchmark_run.error_message = error_message

                if (
                    status == BenchmarkStatus.IN_PROGRESS
                    and not benchmark_run.start_time
                ):
                    benchmark_run.start_time = now
                elif (
                    status in terminal_statuses and not benchmark_run.end_time
                ):
                    benchmark_run.end_time = now

                session.commit()

        except Exception:
            logger.exception("Error updating benchmark status")
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after status error")
def get_benchmark_status(
    self, benchmark_run_id: int, username: Optional[str] = None
) -> Optional[Dict]:
    """Build a status snapshot for a benchmark run.

    **Accuracy.** Computed from the run's graded results (``is_correct`` not
    NULL). If ``completed_examples`` exceeds the run's own graded results,
    the shortfall is filled with graded results from other COMPLETED runs
    that share the same ``config_hash`` (reused results). Per-dataset
    accuracies are returned as ``"<dataset_type>_accuracy"`` keys.

    **Timing.** Derived from ``start_time`` and every result stored for this
    run, graded or not. For a run that already has an ``end_time``, elapsed
    time is measured up to ``end_time`` rather than "now", so it stops
    growing once the run has finished.

    Args:
        benchmark_run_id: Primary key of the ``BenchmarkRun``.
        username: Owner of the per-user database to read from.

    Returns:
        A dict of status fields, or ``None`` if the run does not exist or
        any error occurs (the error is logged).
    """
    from collections import defaultdict

    from ...database.session_context import get_user_db_session

    def _as_utc(value):
        # Timestamps are written with datetime.now(UTC), but the database may
        # hand them back naive; mixing naive and aware datetimes in
        # subtraction raises TypeError, so treat naive values as UTC.
        if value is not None and value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value

    with get_user_db_session(username) as session:
        try:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if not benchmark_run:
                return None

            # Graded results belonging to this run
            current_results = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .filter(BenchmarkResult.is_correct.isnot(None))
                .all()
            )

            # Only count reused results up to the number we claim completed.
            reused_count_needed = (benchmark_run.completed_examples or 0) - len(
                current_results
            )
            if reused_count_needed > 0:
                compatible_results = (
                    session.query(BenchmarkResult)
                    .join(
                        BenchmarkRun,
                        BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                    )
                    .filter(BenchmarkRun.config_hash == benchmark_run.config_hash)
                    .filter(BenchmarkRun.id != benchmark_run_id)
                    .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .order_by(BenchmarkResult.id)  # Consistent ordering
                    .limit(reused_count_needed)
                    .all()
                )
                results = current_results + compatible_results
            else:
                results = current_results

            running_accuracy = None
            dataset_accuracies = {}

            if results:
                correct_count = sum(1 for r in results if r.is_correct)
                running_accuracy = (correct_count / len(results)) * 100

                # Group by dataset type; every group is non-empty by construction.
                dataset_results = defaultdict(list)
                for r in results:
                    dataset_results[r.dataset_type.value].append(r)

                for dataset_type, dataset_result_list in dataset_results.items():
                    correct = sum(1 for r in dataset_result_list if r.is_correct)
                    dataset_accuracies[f"{dataset_type}_accuracy"] = (
                        correct / len(dataset_result_list)
                    ) * 100

            estimated_time_remaining = None
            total_elapsed_time = None
            avg_time_per_example = None
            accuracy_confidence = None

            # ALL results for timing (including those pending evaluation)
            all_results_for_timing = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .all()
            )
            results_count = len(all_results_for_timing)

            start_time = _as_utc(benchmark_run.start_time)
            if start_time and all_results_for_timing:
                # Finished runs stop the clock at end_time.
                reference_time = _as_utc(benchmark_run.end_time) or datetime.now(
                    UTC
                )
                total_elapsed_time = (reference_time - start_time).total_seconds()
                avg_time_per_example = total_elapsed_time / results_count

                logger.info(
                    f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                    f"results_count: {results_count}, "
                    f"avg_per_example: {avg_time_per_example:.2f}s"
                )

                remaining_examples = (
                    benchmark_run.total_examples or 0
                ) - results_count
                if remaining_examples > 0:
                    estimated_time_remaining = (
                        avg_time_per_example * remaining_examples
                    )
                    logger.info(
                        f"Time estimation - total: {benchmark_run.total_examples}, "
                        f"completed: {results_count}, remaining: {remaining_examples}, "
                        f"avg_time: {avg_time_per_example:.2f}s, "
                        f"estimated_remaining: {estimated_time_remaining:.2f}s"
                    )

            # Accuracy confidence interval (Wilson score, 95%)
            if len(results) >= 3:
                from local_deep_research.benchmarks.metrics.statistics import (
                    wilson_score_interval,
                )

                n = len(results)
                correct = sum(1 for r in results if r.is_correct)
                ci = wilson_score_interval(correct, n)
                accuracy_confidence = {
                    "lower_bound": ci["lower"] * 100,
                    "upper_bound": ci["upper"] * 100,
                    "margin_of_error": ci["margin_of_error"] * 100,
                    "sample_size": n,
                }

            # A stored accuracy of 0.0 is a real value; only fall back to the
            # running accuracy when the final one has not been calculated.
            overall_accuracy = (
                benchmark_run.overall_accuracy
                if benchmark_run.overall_accuracy is not None
                else running_accuracy
            )

            status_data = {
                "id": benchmark_run.id,
                "run_name": benchmark_run.run_name,
                "status": benchmark_run.status.value,
                "completed_examples": results_count,  # Actual count from DB
                "total_examples": benchmark_run.total_examples,
                "failed_examples": benchmark_run.failed_examples,
                "overall_accuracy": overall_accuracy,
                "running_accuracy": running_accuracy,
                "processing_rate": benchmark_run.processing_rate,
                "estimated_time_remaining": estimated_time_remaining,  # seconds
                "total_elapsed_time": total_elapsed_time,  # seconds
                "avg_time_per_example": avg_time_per_example,  # seconds
                "accuracy_confidence": accuracy_confidence,
                "created_at": benchmark_run.created_at.isoformat()
                if benchmark_run.created_at
                else None,
                "start_time": benchmark_run.start_time.isoformat()
                if benchmark_run.start_time
                else None,
                "end_time": benchmark_run.end_time.isoformat()
                if benchmark_run.end_time
                else None,
                "error_message": benchmark_run.error_message,
                # Per-dataset accuracies, added dynamically
                **dataset_accuracies,
            }

            logger.info(
                f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                f"avg_time: {avg_time_per_example}"
            )

            return status_data

        except Exception:
            logger.exception("Error getting benchmark status")
            return None
def cancel_benchmark(
    self, benchmark_run_id: int, username: Optional[str] = None
) -> bool:
    """Request cancellation of a benchmark run.

    Marks the in-memory run entry, if present, with ``status="cancelled"``.
    Then records CANCELLED on the database row via
    ``update_benchmark_status``.

    Args:
        benchmark_run_id: Primary key of the run to cancel.
        username: Owner of the per-user database holding the run.

    Returns:
        ``True`` when both steps ran without raising, ``False`` if an
        exception escaped (it is logged).
    """
    try:
        runs = self.active_runs
        if benchmark_run_id in runs:
            runs[benchmark_run_id]["status"] = "cancelled"

        self.update_benchmark_status(
            benchmark_run_id, BenchmarkStatus.CANCELLED, username=username
        )
        logger.info(f"Cancelled benchmark run {benchmark_run_id}")
        cancelled = True
    except Exception:
        logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
        cancelled = False
    return cancelled
# Module-level BenchmarkService instance, created once when this module is
# first imported and shared by everything that imports it.
benchmark_service: BenchmarkService = BenchmarkService()