Coverage for src / local_deep_research / benchmarks / web_api / benchmark_service.py: 93%
478 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""Benchmark service for handling web-based benchmark execution."""
3import hashlib
4import json
5import threading
6import time
7from datetime import UTC, datetime
8from enum import Enum
9from typing import Any, Dict, List, Optional
11from loguru import logger
13from ...api.research_functions import quick_summary
14from ...settings.manager import SnapshotSettingsContext
15from ...web.services.research_service import _global_research_semaphore
16from ...database.models.benchmark import (
17 BenchmarkResult,
18 BenchmarkRun,
19 BenchmarkStatus,
20 DatasetType,
21)
22from ...web.services.socket_service import SocketIOService
23from ..datasets import load_dataset
24from ..graders import extract_answer_from_response, grade_single_result
25from ..runners import format_query
26from ...database.thread_local_session import thread_cleanup
class BenchmarkTaskStatus(Enum):
    """Status values for benchmark tasks in the queue tracker.

    The string values are what gets stored in the tracker's task dicts
    (``status`` key) and compared against during cleanup.
    """

    QUEUED = "queued"  # registered, not yet picked up for processing
    PROCESSING = "processing"  # actively being executed by a worker thread
    COMPLETED = "completed"  # finished successfully (terminal state)
    FAILED = "failed"  # aborted with an error (terminal state)
    CANCELLED = "cancelled"  # stopped by user request (terminal state)
39class BenchmarkQueueTracker:
40 """Simple in-memory tracker for benchmark queue status.
42 This replaces the removed memory_queue functionality for benchmarks.
43 Since benchmarks are temporary and don't need persistence,
44 this simple in-memory solution is sufficient.
46 Thread-safe for concurrent access from multiple benchmark threads.
47 """
49 def __init__(self):
50 self.tasks = {}
51 self._lock = threading.Lock()
53 def add_task(
54 self, task_id: str, username: str, task_type: str = "benchmark"
55 ):
56 """Add a new task to tracking.
58 Also performs opportunistic cleanup of old completed tasks.
59 """
60 # Cleanup old tasks before adding new one (outside lock for better performance)
61 self.cleanup_completed_tasks()
63 with self._lock:
64 self.tasks[task_id] = {
65 "username": username,
66 "task_type": task_type,
67 "status": BenchmarkTaskStatus.QUEUED.value,
68 "created_at": datetime.now(UTC),
69 }
71 def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
72 """Update the status of a task."""
73 with self._lock:
74 if task_id in self.tasks:
75 self.tasks[task_id]["status"] = status.value
76 self.tasks[task_id]["updated_at"] = datetime.now(UTC)
77 else:
78 logger.warning(
79 f"Attempted to update status for non-existent task: {task_id}"
80 )
82 def get_task_status(self, task_id: str) -> Optional[Dict]:
83 """Get the current status of a task."""
84 with self._lock:
85 return self.tasks.get(task_id)
87 def remove_task(self, task_id: str):
88 """Remove a task from tracking."""
89 with self._lock:
90 self.tasks.pop(task_id, None)
92 def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
93 """Remove completed tasks older than max_age_seconds.
95 Args:
96 max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour)
97 """
98 with self._lock:
99 now = datetime.now(UTC)
100 to_remove = []
101 for task_id, task_data in self.tasks.items():
102 # Only cleanup completed, failed, or cancelled tasks
103 if task_data["status"] in [
104 BenchmarkTaskStatus.COMPLETED.value,
105 BenchmarkTaskStatus.FAILED.value,
106 BenchmarkTaskStatus.CANCELLED.value,
107 ]:
108 # Check if task has updated_at timestamp
109 updated_at = task_data.get(
110 "updated_at", task_data.get("created_at")
111 )
112 if updated_at: 112 ↛ 101line 112 didn't jump to line 101 because the condition on line 112 was always true
113 age = (now - updated_at).total_seconds()
114 if age > max_age_seconds:
115 to_remove.append(task_id)
117 for task_id in to_remove:
118 self.tasks.pop(task_id, None)
119 logger.debug(f"Cleaned up old task: {task_id}")
121 if to_remove:
122 logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks")
125class BenchmarkService:
126 """Service for managing benchmark runs through the web interface."""
128 def __init__(self, socket_service=None):
129 self.active_runs: Dict[int, Dict] = {}
130 self.socket_service = socket_service or self._get_socket_service()
131 self.rate_limit_detected: Dict[
132 int, bool
133 ] = {} # Track rate limiting per benchmark run
134 self.queue_tracker = BenchmarkQueueTracker() # Initialize queue tracker
136 def _get_socket_service(self):
137 """Get socket service instance, handling cases where Flask app is not available."""
138 try:
139 return SocketIOService()
140 except Exception:
141 # Return a mock socket service for testing/standalone use
142 class MockSocketService:
143 def emit_to_room(self, room, event, data):
144 pass
146 return MockSocketService()
148 def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
149 """Generate a hash for search configuration compatibility checking."""
150 relevant_params = {
151 "iterations": search_config.get("iterations"),
152 "questions_per_iteration": search_config.get(
153 "questions_per_iteration"
154 ),
155 "search_tool": search_config.get("search_tool"),
156 "search_strategy": search_config.get("search_strategy"),
157 "model_name": search_config.get("model_name"),
158 "provider": search_config.get("provider"),
159 }
160 # Remove None values
161 relevant_params = {
162 k: v for k, v in relevant_params.items() if v is not None
163 }
164 config_str = json.dumps(relevant_params, sort_keys=True)
165 return hashlib.md5( # DevSkim: ignore DS126858
166 config_str.encode(), usedforsecurity=False
167 ).hexdigest()[:8]
169 def generate_query_hash(self, question: str, dataset_type: str) -> str:
170 """Generate a hash for a query to enable deduplication."""
171 query_content = f"{question.strip()}|{dataset_type.lower()}"
172 return hashlib.md5( # DevSkim: ignore DS126858
173 query_content.encode(), usedforsecurity=False
174 ).hexdigest()
176 def create_benchmark_run(
177 self,
178 run_name: Optional[str],
179 search_config: Dict[str, Any],
180 evaluation_config: Dict[str, Any],
181 datasets_config: Dict[str, Dict],
182 username: Optional[str] = None,
183 user_password: Optional[str] = None,
184 ) -> int:
185 """Create a new benchmark run in the database."""
186 from ...database.session_context import get_user_db_session
188 with get_user_db_session(username, user_password) as session:
189 try:
190 config_hash = self.generate_config_hash(search_config)
192 # Calculate total examples
193 total_examples = sum(
194 config.get("count", 0)
195 for config in datasets_config.values()
196 )
198 benchmark_run = BenchmarkRun(
199 run_name=run_name,
200 config_hash=config_hash,
201 query_hash_list=[], # Will be populated as we process
202 search_config=search_config,
203 evaluation_config=evaluation_config,
204 datasets_config=datasets_config,
205 total_examples=total_examples,
206 status=BenchmarkStatus.PENDING,
207 )
209 session.add(benchmark_run)
210 session.commit()
212 logger.info(
213 f"Created benchmark run {benchmark_run.id} with config hash {config_hash}"
214 )
215 return int(benchmark_run.id)
217 except Exception:
218 session.rollback()
219 logger.exception("Error creating benchmark run")
220 raise
222 def get_existing_results(
223 self,
224 config_hash: str,
225 username: Optional[str] = None,
226 user_password: Optional[str] = None,
227 ) -> Dict[str, Dict]:
228 """Get existing results with compatible configuration."""
229 from ...database.session_context import get_user_db_session
231 with get_user_db_session(username, user_password) as session:
232 try:
233 # Find compatible runs
234 compatible_runs = (
235 session.query(BenchmarkRun)
236 .filter(BenchmarkRun.config_hash == config_hash)
237 .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
238 .all()
239 )
241 existing_results = {}
242 for run in compatible_runs:
243 results = (
244 session.query(BenchmarkResult)
245 .filter(BenchmarkResult.benchmark_run_id == run.id)
246 .filter(
247 BenchmarkResult.is_correct.isnot(None)
248 ) # Only completed evaluations
249 .all()
250 )
252 for result in results:
253 existing_results[result.query_hash] = {
254 "id": result.example_id,
255 "dataset_type": result.dataset_type.value,
256 "problem": result.question,
257 "correct_answer": result.correct_answer,
258 "response": result.response,
259 "extracted_answer": result.extracted_answer,
260 "confidence": result.confidence,
261 "processing_time": result.processing_time,
262 "sources": result.sources,
263 "is_correct": result.is_correct,
264 "graded_confidence": result.graded_confidence,
265 "grader_response": result.grader_response,
266 "query_hash": result.query_hash,
267 }
269 logger.info(
270 f"Found {len(existing_results)} existing results for config hash {config_hash}"
271 )
272 return existing_results
274 except Exception:
275 logger.exception("Error loading existing results")
276 return {}
    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: Optional[str] = None,
        user_password: Optional[str] = None,
    ) -> bool:
        """Start a benchmark run in a background thread.

        Loads everything the worker thread needs (run config, existing
        reusable results, a settings snapshot, and the session password)
        from the main thread, stashes it in ``self.active_runs``, marks
        the run IN_PROGRESS, then launches a daemon thread.

        Args:
            benchmark_run_id: Id of a run previously created via
                create_benchmark_run.
            username: Owner of the user database.
            user_password: Credential for opening that database.

        Returns:
            True if the thread was started; False on any error (in which
            case the run is marked FAILED in the database when found).
        """
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(  # noqa: TRY301 — caught by except, sets FAILED status in DB
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                # (requires an active Flask request context for the session)
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                _user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    _user_password = (
                        session_password_store.get_session_password(
                            username, session_id
                        )
                    )
                    if not _user_password:
                        # Non-fatal: the run proceeds, but background
                        # metrics tracking will lack credentials.
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need so the worker thread never
                # touches the user database directly.
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": _user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False
    @thread_cleanup
    def _run_benchmark_thread(self, benchmark_run_id: int):
        """Main benchmark execution thread.

        Consumes the data stashed in ``self.active_runs`` by
        start_benchmark, processes every pending task sequentially
        (honoring the global research concurrency semaphore and
        user cancellation), and records completion/failure info in
        memory for a later database sync.
        """
        # IMPORTANT: This runs in a background thread, so we cannot access the user database
        # Using in-memory queue tracker for benchmark status tracking

        task_id = None

        # Get the benchmark data that was passed to us
        # We need to retrieve this from the service database or from memory
        benchmark_data = self.active_runs.get(benchmark_run_id, {}).get("data")

        try:
            if not benchmark_data:
                raise ValueError(  # noqa: TRY301
                    f"Benchmark data for run {benchmark_run_id} not found"
                )
            # Set up settings context for thread-local access
            settings_snapshot = benchmark_data.get("settings_snapshot", {})
            username = benchmark_data.get("username", "benchmark_user")

            # Create a settings context that threads can use
            settings_context = SnapshotSettingsContext(
                settings_snapshot,
                username=username,
                missing_key_log_level="WARNING",
            )

            # Set the context in thread-local storage
            from ...config.thread_settings import set_settings_context

            set_settings_context(settings_context)

            # Extract all the data we need
            datasets_config = benchmark_data["datasets_config"]
            search_config = benchmark_data["search_config"]
            evaluation_config = benchmark_data["evaluation_config"]
            existing_results = benchmark_data.get("existing_results", {})

            # Create task queue (excludes examples already covered by
            # reusable results from compatible runs)
            task_queue = self._create_task_queue(
                datasets_config,
                existing_results,
                benchmark_run_id,
            )

            # Calculate totals: reused results count as already completed
            total_examples = len(task_queue) + len(existing_results)
            completed_examples = len(existing_results)

            # Initialize task tracking
            task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
            username = benchmark_data.get("username", "benchmark_user")
            self.queue_tracker.add_task(task_id, username, "benchmark")
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.PROCESSING
            )

            # Track progress in memory
            progress_info = {
                "total_examples": total_examples,
                "completed_examples": completed_examples,
                "failed_examples": 0,
                "start_time": datetime.now(UTC),
            }

            # Process tasks
            logger.info(
                f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
            )
            for i, task in enumerate(task_queue):
                # Check if benchmark has been cancelled (status flipped
                # by the cancel endpoint on the main thread)
                if (
                    benchmark_run_id in self.active_runs
                    and self.active_runs[benchmark_run_id].get("status")
                    == "cancelled"
                ):
                    logger.info(
                        f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                    )
                    break

                logger.info(
                    f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
                )
                try:
                    # Add username and password to task for metrics tracking
                    task["username"] = benchmark_data.get("username")
                    task["user_password"] = benchmark_data.get("user_password")

                    # Acquire the global research semaphore so benchmark
                    # tasks count against the server-wide concurrency limit
                    _global_research_semaphore.acquire()
                    try:
                        # Process single task
                        result = self._process_benchmark_task(
                            task,
                            search_config,
                            evaluation_config,
                        )
                    finally:
                        _global_research_semaphore.release()

                    # Store result in memory for now (will be saved later)
                    if "results" not in self.active_runs[benchmark_run_id]:
                        self.active_runs[benchmark_run_id]["results"] = []
                    self.active_runs[benchmark_run_id]["results"].append(result)

                    # Update progress
                    progress_info["completed_examples"] += 1

                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                        f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                    )

                    # Send real-time update
                    self._send_progress_update(
                        benchmark_run_id,
                        progress_info["completed_examples"],
                        progress_info["total_examples"],
                    )

                except Exception as e:
                    logger.exception(f"Error processing task {i}")
                    progress_info["failed_examples"] += 1
                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                        f"Total failed: {progress_info['failed_examples']}"
                    )

                    # Check if this is a rate limiting error
                    error_str = str(e).lower()
                    if (
                        "403" in error_str
                        or "rate limit" in error_str
                        or "forbidden" in error_str
                    ):
                        self.rate_limit_detected[benchmark_run_id] = True
                        # Send rate limit warning via WebSocket
                        self.socket_service.emit_to_subscribers(
                            "research_progress",
                            benchmark_run_id,
                            {
                                "rate_limit_detected": True,
                                "message": "SearXNG rate limiting detected",
                            },
                        )

            # Mark as completed in memory tracker
            progress_info["end_time"] = datetime.now(UTC)

            # Check if benchmark was cancelled
            was_cancelled = (
                benchmark_run_id in self.active_runs
                and self.active_runs[benchmark_run_id].get("status")
                == "cancelled"
            )

            if was_cancelled:
                status = BenchmarkStatus.CANCELLED
                message = "Benchmark cancelled by user"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.CANCELLED
                    )
            else:
                status = BenchmarkStatus.COMPLETED
                message = "Benchmark completed successfully"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.COMPLETED
                    )

            # Store completion info for later database update
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": status,
                "end_time": progress_info["end_time"],
                "completed_examples": progress_info["completed_examples"],
                "failed_examples": progress_info["failed_examples"],
            }

            # Send completion notification
            self.socket_service.emit_to_subscribers(
                "research_progress",
                benchmark_run_id,
                {
                    "status": "cancelled" if was_cancelled else "completed",
                    "message": message,
                    "progress": (
                        progress_info["completed_examples"]
                        / progress_info["total_examples"]
                        * 100
                    )
                    if progress_info["total_examples"] > 0
                    else 0,
                    "benchmark_run_id": benchmark_run_id,
                },
            )

        except Exception as e:
            logger.exception(f"Benchmark run {benchmark_run_id} failed")
            # Update task status if we have a task_id
            if task_id:
                self.queue_tracker.update_task_status(
                    task_id, BenchmarkTaskStatus.FAILED
                )
            # Store failure info for later database update
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["completion_info"] = {
                    "status": BenchmarkStatus.FAILED,
                    "error_message": str(e),
                }
        finally:
            # Clean up active run tracking
            if benchmark_run_id in self.active_runs:
                # Mark that thread is done but keep data for database update
                self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to database immediately if possible
            self._sync_results_to_database(benchmark_run_id)
603 def _create_task_queue(
604 self,
605 datasets_config: Dict,
606 existing_results: Dict,
607 benchmark_run_id: int,
608 ) -> List[Dict]:
609 """Create list of tasks to process, excluding existing results."""
610 tasks: List[Dict[str, Any]] = []
612 for dataset_name, config in datasets_config.items():
613 if config.get("count", 0) > 0:
614 dataset = load_dataset(
615 dataset_type=dataset_name,
616 num_examples=config["count"],
617 seed=None,
618 )
620 for i, example in enumerate(dataset):
621 # Extract question based on dataset type
622 if dataset_name.lower() == "simpleqa":
623 question = example.get("problem", "")
624 correct_answer = example.get("answer", "")
625 else: # browsecomp
626 question = example.get("problem", "")
627 correct_answer = example.get("answer", "")
629 # Generate query hash
630 query_hash = self.generate_query_hash(
631 question, dataset_name
632 )
634 # Skip if already processed
635 if query_hash in existing_results:
636 continue
638 tasks.append(
639 {
640 "benchmark_run_id": benchmark_run_id,
641 "example_id": example.get("id", f"example_{i}"),
642 "dataset_type": dataset_name,
643 "question": question,
644 "correct_answer": correct_answer,
645 "query_hash": query_hash,
646 "task_index": len(tasks),
647 }
648 )
650 return tasks
    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task.

        Runs the research pipeline on the task's question, extracts an
        answer from the summary, then attempts grading. Returns the task
        dict augmented with response/answer/grading fields; on research
        failure, returns the task with a ``research_error`` instead.
        """
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    # Progress reporting must never break the benchmark.
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            # extract_answer_from_response may return a dict or a plain
            # value, so both shapes are handled here and below.
            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    # Grading failed: record the error but keep the
                    # research result (is_correct stays None).
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                # Evaluation errors are non-fatal for the task.
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            # Research itself failed; return a marker record so the run
            # can continue with the remaining tasks.
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }
    def sync_pending_results(
        self, benchmark_run_id: int, username: Optional[str] = None
    ):
        """Sync any pending results to database. Can be called from main thread.

        Persists in-memory results for an active run that have not been
        saved yet, skipping rows whose (run_id, query_hash) already
        exists in the database.

        Args:
            benchmark_run_id: Run whose in-memory results to flush.
            username: Database owner; defaults to the one recorded in
                the run's stashed data.

        Returns:
            Number of newly saved results (0 if the run is not active).
        """
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        # Indices of results already persisted in earlier sync calls.
        saved_indices = run_data.get("saved_indices", set())

        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                if saved_count > 0:
                    session.commit()
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )
            # Roll back the session on error to prevent PendingRollbackError
            # NOTE(review): if get_user_db_session() itself raised,
            # `session` is unbound here and rollback() raises NameError —
            # the inner except absorbs that too, so this stays safe.
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after sync error")

        return saved_count
978 def _sync_results_to_database(self, benchmark_run_id: int):
979 """Sync benchmark results from memory to database after thread completes."""
980 if benchmark_run_id not in self.active_runs:
981 return
983 run_data = self.active_runs[benchmark_run_id]
984 if not run_data.get("thread_complete"):
985 return
987 username = run_data.get("data", {}).get("username")
988 user_password = run_data.get("data", {}).get("user_password")
989 from ...database.session_context import get_user_db_session
991 try:
992 with get_user_db_session(username, user_password) as session:
993 # Update benchmark run status
994 benchmark_run = (
995 session.query(BenchmarkRun)
996 .filter(BenchmarkRun.id == benchmark_run_id)
997 .first()
998 )
1000 if benchmark_run and "completion_info" in run_data: 1000 ↛ 1076line 1000 didn't jump to line 1076
1001 info = run_data["completion_info"]
1002 benchmark_run.status = info["status"]
1003 benchmark_run.end_time = info.get(
1004 "end_time", datetime.now(UTC)
1005 )
1006 benchmark_run.completed_examples = info.get(
1007 "completed_examples", 0
1008 )
1009 benchmark_run.failed_examples = info.get(
1010 "failed_examples", 0
1011 )
1012 benchmark_run.error_message = info.get("error_message")
1014 # Save all results (skip already saved ones)
1015 saved_indices = run_data.get("saved_indices", set())
1016 for idx, result in enumerate(run_data.get("results", [])):
1017 if idx in saved_indices:
1018 continue
1019 benchmark_result = BenchmarkResult(
1020 benchmark_run_id=benchmark_run_id,
1021 example_id=result["example_id"],
1022 query_hash=result["query_hash"],
1023 dataset_type=DatasetType(result["dataset_type"]),
1024 research_id=result.get("research_id"),
1025 question=result["question"],
1026 correct_answer=result["correct_answer"],
1027 response=result.get("response"),
1028 extracted_answer=result.get("extracted_answer"),
1029 confidence=result.get("confidence"),
1030 processing_time=result.get("processing_time"),
1031 sources=result.get("sources"),
1032 is_correct=result.get("is_correct"),
1033 graded_confidence=result.get("graded_confidence"),
1034 grader_response=result.get("grader_response"),
1035 completed_at=result.get("completed_at"),
1036 research_error=result.get("research_error"),
1037 evaluation_error=result.get("evaluation_error"),
1038 task_index=result.get("task_index"),
1039 )
1040 session.add(benchmark_result)
1042 # Calculate final accuracy
1043 if benchmark_run.status == BenchmarkStatus.COMPLETED:
1044 correct_results = [
1045 r
1046 for r in run_data.get("results", [])
1047 if r.get("is_correct")
1048 ]
1049 evaluated_results = [
1050 r
1051 for r in run_data.get("results", [])
1052 if r.get("is_correct") is not None
1053 ]
1055 if evaluated_results: 1055 ↛ 1070line 1055 didn't jump to line 1070 because the condition on line 1055 was always true
1056 benchmark_run.overall_accuracy = (
1057 len(correct_results) / len(evaluated_results)
1058 ) * 100
1060 # Calculate processing rate
1061 total_time = sum(
1062 r.get("processing_time", 0)
1063 for r in evaluated_results
1064 )
1065 if total_time > 0: 1065 ↛ 1070line 1065 didn't jump to line 1070 because the condition on line 1065 was always true
1066 benchmark_run.processing_rate = len(
1067 evaluated_results
1068 ) / (total_time / 60)
1070 session.commit()
1071 logger.info(
1072 f"Successfully synced results for benchmark {benchmark_run_id}"
1073 )
1075 # Clean up memory
1076 del self.active_runs[benchmark_run_id]
1078 except Exception:
1079 logger.exception("Error syncing benchmark results to database")
1081 def _send_progress_update(
1082 self, benchmark_run_id: int, completed: int, total: int
1083 ):
1084 """Send real-time progress update via websocket."""
1085 try:
1086 percentage = (completed / total * 100) if total > 0 else 0
1088 # Create log entry for milestone progress
1089 log_entry = {
1090 "time": datetime.now(UTC).isoformat(),
1091 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)",
1092 "progress": percentage,
1093 "metadata": {
1094 "phase": "benchmark_progress",
1095 "type": "milestone",
1096 "completed": completed,
1097 "total": total,
1098 "benchmark_run_id": benchmark_run_id,
1099 },
1100 }
1102 progress_data = {
1103 "status": "in_progress",
1104 "message": f"Processing examples: {completed}/{total}",
1105 "progress": percentage,
1106 "completed": completed,
1107 "total": total,
1108 "benchmark_run_id": benchmark_run_id,
1109 "log_entry": log_entry,
1110 "progress_log": json.dumps([log_entry]),
1111 }
1113 self.socket_service.emit_to_subscribers(
1114 "research_progress", benchmark_run_id, progress_data
1115 )
1117 except Exception:
1118 logger.exception("Error sending progress update")
1120 def _calculate_final_accuracy(
1121 self, benchmark_run_id: int, username: Optional[str] = None
1122 ):
1123 """Calculate and save final accuracy metrics."""
1124 from ...database.session_context import get_user_db_session
1126 with get_user_db_session(username) as session:
1127 try:
1128 # Get all results for this run
1129 results = (
1130 session.query(BenchmarkResult)
1131 .filter(
1132 BenchmarkResult.benchmark_run_id == benchmark_run_id
1133 )
1134 .filter(BenchmarkResult.is_correct.isnot(None))
1135 .all()
1136 )
1138 if results:
1139 correct_count = sum(1 for r in results if r.is_correct)
1140 overall_accuracy = (correct_count / len(results)) * 100
1142 # Calculate processing rate
1143 total_time = sum(r.processing_time or 0 for r in results)
1144 processing_rate = (
1145 (len(results) / (total_time / 60))
1146 if total_time > 0
1147 else 0
1148 )
1150 # Update benchmark run
1151 benchmark_run = (
1152 session.query(BenchmarkRun)
1153 .filter(BenchmarkRun.id == benchmark_run_id)
1154 .first()
1155 )
1156 if benchmark_run: 1156 ↛ exitline 1156 didn't jump to the function exit
1157 benchmark_run.overall_accuracy = overall_accuracy
1158 benchmark_run.processing_rate = processing_rate
1159 session.commit()
1161 except Exception:
1162 logger.exception("Error calculating final accuracy")
1164 def update_benchmark_status(
1165 self,
1166 benchmark_run_id: int,
1167 status: BenchmarkStatus,
1168 error_message: Optional[str] = None,
1169 username: Optional[str] = None,
1170 ):
1171 """Update benchmark run status."""
1172 from ...database.session_context import get_user_db_session
1174 with get_user_db_session(username) as session:
1175 try:
1176 benchmark_run = (
1177 session.query(BenchmarkRun)
1178 .filter(BenchmarkRun.id == benchmark_run_id)
1179 .first()
1180 )
1181 if benchmark_run:
1182 benchmark_run.status = status
1183 benchmark_run.updated_at = datetime.now(UTC)
1185 if error_message:
1186 benchmark_run.error_message = error_message
1188 if (
1189 status == BenchmarkStatus.IN_PROGRESS
1190 and not benchmark_run.start_time
1191 ):
1192 benchmark_run.start_time = datetime.now(UTC)
1193 elif ( 1193 ↛ 1200line 1193 didn't jump to line 1200 because the condition on line 1193 was always true
1194 status
1195 in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED]
1196 and not benchmark_run.end_time
1197 ):
1198 benchmark_run.end_time = datetime.now(UTC)
1200 session.commit()
1202 except Exception:
1203 session.rollback()
1204 logger.exception("Error updating benchmark status")
    def get_benchmark_status(
        self, benchmark_run_id: int, username: Optional[str] = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run.

        Builds a status snapshot: running accuracy (including results reused
        from earlier COMPLETED runs with the same config_hash), per-dataset
        accuracies, timing/ETA estimates, and a 95% confidence interval once
        at least 3 graded results exist.

        Args:
            benchmark_run_id: Primary key of the BenchmarkRun to inspect.
            username: Optional user whose database should be queried.

        Returns:
            Dict of status fields, or None when the run does not exist or
            an error occurs (the error is logged, not raised).
        """
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused results from compatible runs
                # First get results specifically for this benchmark run
                # (only graded ones, i.e. is_correct already set).
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash)
                # Only count results up to the number we say we've "completed"
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    # (the extra slice is a belt-and-braces cap; .limit()
                    # above should already bound the list).
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    # NOTE(review): assumes start_time is timezone-aware (UTC);
                    # subtracting a naive start_time from datetime.now(UTC)
                    # would raise — confirm against how start_time is written.
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
                # using the normal approximation for a proportion.
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    _running_acc = running_accuracy or 0.0
                    accuracy_confidence = {
                        "lower_bound": max(0, _running_acc - margin_of_error),
                        "upper_bound": min(100, _running_acc + margin_of_error),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None
1416 def cancel_benchmark(
1417 self, benchmark_run_id: int, username: Optional[str] = None
1418 ) -> bool:
1419 """Cancel a running benchmark."""
1420 try:
1421 if benchmark_run_id in self.active_runs:
1422 self.active_runs[benchmark_run_id]["status"] = "cancelled"
1424 self.update_benchmark_status(
1425 benchmark_run_id, BenchmarkStatus.CANCELLED, username=username
1426 )
1427 logger.info(f"Cancelled benchmark run {benchmark_run_id}")
1428 return True
1430 except Exception:
1431 logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
1432 return False
# Global service instance: a module-level singleton shared by every importer
# of this module (created once at import time).
benchmark_service = BenchmarkService()