Coverage for src/local_deep_research/benchmarks/web_api/benchmark_service.py: 9%
484 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""Benchmark service for handling web-based benchmark execution."""
3import hashlib
4import json
5import threading
6import time
7from datetime import UTC, datetime
8from enum import Enum
9from typing import Any, Dict, List, Optional
11from loguru import logger
13from ...api.research_functions import quick_summary
14from ...database.models.benchmark import (
15 BenchmarkResult,
16 BenchmarkRun,
17 BenchmarkStatus,
18 DatasetType,
19)
20from ...web.services.socket_service import SocketIOService
21from ..datasets import load_dataset
22from ..graders import extract_answer_from_response, grade_single_result
23from ..runners import format_query
26class BenchmarkTaskStatus(Enum):
27 """Status values for benchmark tasks in the queue tracker."""
29 QUEUED = "queued"
30 PROCESSING = "processing"
31 COMPLETED = "completed"
32 FAILED = "failed"
33 CANCELLED = "cancelled"


class BenchmarkQueueTracker:
    """Simple in-memory tracker for benchmark queue status.

    This replaces the removed memory_queue functionality for benchmarks.
    Since benchmarks are temporary and don't need persistence,
    this simple in-memory solution is sufficient.

    Thread-safe for concurrent access from multiple benchmark threads.
    """

    def __init__(self):
        self.tasks = {}
        self._lock = threading.Lock()

    def add_task(
        self, task_id: str, username: str, task_type: str = "benchmark"
    ):
        """Add a new task to tracking.

        Also performs opportunistic cleanup of old completed tasks.
        """
        # Cleanup old tasks before adding new one (outside lock for better performance)
        self.cleanup_completed_tasks()

        with self._lock:
            self.tasks[task_id] = {
                "username": username,
                "task_type": task_type,
                "status": BenchmarkTaskStatus.QUEUED.value,
                "created_at": datetime.now(UTC),
            }

    def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
        """Update the status of a task."""
        with self._lock:
            if task_id in self.tasks:
                self.tasks[task_id]["status"] = status.value
                self.tasks[task_id]["updated_at"] = datetime.now(UTC)
            else:
                logger.warning(
                    f"Attempted to update status for non-existent task: {task_id}"
                )

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get the current status of a task."""
        with self._lock:
            return self.tasks.get(task_id)

    def remove_task(self, task_id: str):
        """Remove a task from tracking."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
        """Remove completed tasks older than max_age_seconds.

        Args:
            max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour)
        """
        with self._lock:
            now = datetime.now(UTC)
            to_remove = []
            for task_id, task_data in self.tasks.items():
                # Only cleanup completed, failed, or cancelled tasks
                if task_data["status"] in [
                    BenchmarkTaskStatus.COMPLETED.value,
                    BenchmarkTaskStatus.FAILED.value,
                    BenchmarkTaskStatus.CANCELLED.value,
                ]:
                    # Check if task has updated_at timestamp
                    updated_at = task_data.get(
                        "updated_at", task_data.get("created_at")
                    )
                    if updated_at:
                        age = (now - updated_at).total_seconds()
                        if age > max_age_seconds:
                            to_remove.append(task_id)

            for task_id in to_remove:
                self.tasks.pop(task_id, None)
                logger.debug(f"Cleaned up old task: {task_id}")

            if to_remove:
                logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks")


class BenchmarkService:
    """Service for managing benchmark runs through the web interface."""

    def __init__(self, socket_service=None):
        self.active_runs: Dict[int, Dict] = {}
        self.socket_service = socket_service or self._get_socket_service()
        self.rate_limit_detected: Dict[
            int, bool
        ] = {}  # Track rate limiting per benchmark run
        self.queue_tracker = BenchmarkQueueTracker()  # Initialize queue tracker

    def _get_socket_service(self):
        """Get socket service instance, handling cases where Flask app is not available."""
        try:
            return SocketIOService()
        except Exception:
            # Return a mock socket service for testing/standalone use
            class MockSocketService:
                def emit_to_room(self, room, event, data):
                    pass

                # Added no-op to match the emit_to_subscribers() calls made
                # throughout this service when no real socket service exists.
                def emit_to_subscribers(self, event_type, target_id, data):
                    pass

            return MockSocketService()

    def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
        """Generate a hash for search configuration compatibility checking."""
        relevant_params = {
            "iterations": search_config.get("iterations"),
            "questions_per_iteration": search_config.get(
                "questions_per_iteration"
            ),
            "search_tool": search_config.get("search_tool"),
            "search_strategy": search_config.get("search_strategy"),
            "model_name": search_config.get("model_name"),
            "provider": search_config.get("provider"),
        }
        # Remove None values
        relevant_params = {
            k: v for k, v in relevant_params.items() if v is not None
        }
        config_str = json.dumps(relevant_params, sort_keys=True)
        return hashlib.md5(  # DevSkim: ignore DS126858
            config_str.encode(), usedforsecurity=False
        ).hexdigest()[:8]

    def generate_query_hash(self, question: str, dataset_type: str) -> str:
        """Generate a hash for a query to enable deduplication."""
        query_content = f"{question.strip()}|{dataset_type.lower()}"
        return hashlib.md5(  # DevSkim: ignore DS126858
            query_content.encode(), usedforsecurity=False
        ).hexdigest()
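
    # Added note: the query hash is what get_existing_results() and
    # _create_task_queue() key on, so runs with an identical question and dataset
    # type can reuse each other's graded results. For example (hypothetical input),
    # "Who wrote Dune?" with dataset type "SimpleQA" and "Who wrote Dune?  " with
    # "simpleqa" both reduce to the canonical string "Who wrote Dune?|simpleqa"
    # and therefore hash identically, which is what drives the deduplication.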

    def create_benchmark_run(
        self,
        run_name: Optional[str],
        search_config: Dict[str, Any],
        evaluation_config: Dict[str, Any],
        datasets_config: Dict[str, Dict],
        username: str = None,
        user_password: str = None,
    ) -> int:
        """Create a new benchmark run in the database."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, user_password) as session:
            try:
                config_hash = self.generate_config_hash(search_config)

                # Calculate total examples
                total_examples = sum(
                    config.get("count", 0)
                    for config in datasets_config.values()
                )

                benchmark_run = BenchmarkRun(
                    run_name=run_name,
                    config_hash=config_hash,
                    query_hash_list=[],  # Will be populated as we process
                    search_config=search_config,
                    evaluation_config=evaluation_config,
                    datasets_config=datasets_config,
                    total_examples=total_examples,
                    status=BenchmarkStatus.PENDING,
                )

                session.add(benchmark_run)
                session.commit()

                logger.info(
                    f"Created benchmark run {benchmark_run.id} with config hash {config_hash}"
                )
                return benchmark_run.id

            except Exception:
                session.rollback()
                logger.exception("Error creating benchmark run")
                raise

    def get_existing_results(
        self, config_hash: str, username: str = None, user_password: str = None
    ) -> Dict[str, Dict]:
        """Get existing results with compatible configuration."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, user_password) as session:
            try:
                # Find compatible runs
                compatible_runs = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.config_hash == config_hash)
                    .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                    .all()
                )

                existing_results = {}
                for run in compatible_runs:
                    results = (
                        session.query(BenchmarkResult)
                        .filter(BenchmarkResult.benchmark_run_id == run.id)
                        .filter(
                            BenchmarkResult.is_correct.isnot(None)
                        )  # Only completed evaluations
                        .all()
                    )

                    for result in results:
                        existing_results[result.query_hash] = {
                            "id": result.example_id,
                            "dataset_type": result.dataset_type.value,
                            "problem": result.question,
                            "correct_answer": result.correct_answer,
                            "response": result.response,
                            "extracted_answer": result.extracted_answer,
                            "confidence": result.confidence,
                            "processing_time": result.processing_time,
                            "sources": result.sources,
                            "is_correct": result.is_correct,
                            "graded_confidence": result.graded_confidence,
                            "grader_response": result.grader_response,
                            "query_hash": result.query_hash,
                        }

                logger.info(
                    f"Found {len(existing_results)} existing results for config hash {config_hash}"
                )
                return existing_results

            except Exception:
                logger.exception("Error loading existing results")
                return {}

    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: str = None,
        user_password: str = None,
    ) -> bool:
        """Start a benchmark run in a background thread."""
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    user_password = session_password_store.get_session_password(
                        username, session_id
                    )
                    if not user_password:
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False
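
    # Added note: start_benchmark() does all of its database reads in the calling
    # thread, packs everything the worker needs (including a settings snapshot and
    # previously graded results) into self.active_runs[benchmark_run_id]["data"],
    # and only then spawns _run_benchmark_thread, so the worker can run from that
    # snapshot instead of re-reading per-request state.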

    def _run_benchmark_thread(self, benchmark_run_id: int):
        """Main benchmark execution thread."""
        # IMPORTANT: This runs in a background thread, so we cannot access the user database
        # Using in-memory queue tracker for benchmark status tracking

        task_id = None
        try:
            # Get the benchmark data that was passed to us
            # We need to retrieve this from the service database or from memory
            benchmark_data = self.active_runs.get(benchmark_run_id, {}).get(
                "data"
            )

            if not benchmark_data:
                raise ValueError(
                    f"Benchmark data for run {benchmark_run_id} not found"
                )

            # Set up settings context for thread-local access
            settings_snapshot = benchmark_data.get("settings_snapshot", {})
            username = benchmark_data.get("username", "benchmark_user")

            # Create a settings context that threads can use
            class SettingsContext:
                def __init__(self, snapshot, username):
                    self.snapshot = snapshot or {}
                    self.username = username
                    # Extract values from setting objects if needed
                    self.values = {}
                    for key, setting in self.snapshot.items():
                        if isinstance(setting, dict) and "value" in setting:
                            # It's a full setting object, extract the value
                            self.values[key] = setting["value"]
                        else:
                            # It's already just a value
                            self.values[key] = setting

                def get_setting(self, key, default=None):
                    """Get setting from snapshot only - no database access in threads"""
                    if key in self.values:
                        return self.values[key]
                    # No fallback to database - threads must use snapshot only
                    logger.warning(
                        f"Setting '{key}' not found in snapshot, using default"
                    )
                    return default

            settings_context = SettingsContext(settings_snapshot, username)

            # Set the context in thread-local storage
            from ...config.thread_settings import set_settings_context

            set_settings_context(settings_context)

            # Extract all the data we need
            datasets_config = benchmark_data["datasets_config"]
            search_config = benchmark_data["search_config"]
            evaluation_config = benchmark_data["evaluation_config"]
            existing_results = benchmark_data.get("existing_results", {})

            # Create task queue
            task_queue = self._create_task_queue(
                datasets_config,
                existing_results,
                benchmark_run_id,
            )

            # Calculate totals
            total_examples = len(task_queue) + len(existing_results)
            completed_examples = len(existing_results)

            # Initialize task tracking
            task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
            username = benchmark_data.get("username", "benchmark_user")
            self.queue_tracker.add_task(task_id, username, "benchmark")
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.PROCESSING
            )

            # Track progress in memory
            progress_info = {
                "total_examples": total_examples,
                "completed_examples": completed_examples,
                "failed_examples": 0,
                "start_time": datetime.now(UTC),
            }

            # Process tasks
            logger.info(
                f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
            )
            for i, task in enumerate(task_queue):
                # Check if benchmark has been cancelled
                if (
                    benchmark_run_id in self.active_runs
                    and self.active_runs[benchmark_run_id].get("status")
                    == "cancelled"
                ):
                    logger.info(
                        f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                    )
                    break

                logger.info(
                    f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
                )
                try:
                    # Add username and password to task for metrics tracking
                    task["username"] = benchmark_data.get("username")
                    task["user_password"] = benchmark_data.get("user_password")

                    # Process single task
                    result = self._process_benchmark_task(
                        task,
                        search_config,
                        evaluation_config,
                    )

                    # Store result in memory for now (will be saved later)
                    if "results" not in self.active_runs[benchmark_run_id]:
                        self.active_runs[benchmark_run_id]["results"] = []
                    self.active_runs[benchmark_run_id]["results"].append(result)

                    # Update progress
                    progress_info["completed_examples"] += 1

                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                        f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                    )

                    # Send real-time update
                    self._send_progress_update(
                        benchmark_run_id,
                        progress_info["completed_examples"],
                        progress_info["total_examples"],
                    )

                except Exception as e:
                    logger.exception(f"Error processing task {i}: {str(e)}")
                    progress_info["failed_examples"] += 1
                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                        f"Total failed: {progress_info['failed_examples']}"
                    )

                    # Check if this is a rate limiting error
                    error_str = str(e).lower()
                    if (
                        "403" in error_str
                        or "rate limit" in error_str
                        or "forbidden" in error_str
                    ):
                        self.rate_limit_detected[benchmark_run_id] = True
                        # Send rate limit warning via WebSocket
                        self.socket_service.emit_to_subscribers(
                            "research_progress",
                            benchmark_run_id,
                            {
                                "rate_limit_detected": True,
                                "message": "SearXNG rate limiting detected",
                            },
                        )

            # Mark as completed in memory tracker
            progress_info["end_time"] = datetime.now(UTC)

            # Check if benchmark was cancelled
            was_cancelled = (
                benchmark_run_id in self.active_runs
                and self.active_runs[benchmark_run_id].get("status")
                == "cancelled"
            )

            if was_cancelled:
                status = BenchmarkStatus.CANCELLED
                message = "Benchmark cancelled by user"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.CANCELLED
                    )
            else:
                status = BenchmarkStatus.COMPLETED
                message = "Benchmark completed successfully"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.COMPLETED
                    )

            # Store completion info for later database update
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": status,
                "end_time": progress_info["end_time"],
                "completed_examples": progress_info["completed_examples"],
                "failed_examples": progress_info["failed_examples"],
            }

            # Send completion notification
            self.socket_service.emit_to_subscribers(
                "research_progress",
                benchmark_run_id,
                {
                    "status": "cancelled" if was_cancelled else "completed",
                    "message": message,
                    "progress": (
                        progress_info["completed_examples"]
                        / progress_info["total_examples"]
                        * 100
                    )
                    if progress_info["total_examples"] > 0
                    else 0,
                    "benchmark_run_id": benchmark_run_id,
                },
            )

        except Exception as e:
            logger.exception(f"Benchmark run {benchmark_run_id} failed")
            # Update task status if we have a task_id
            if task_id:
                self.queue_tracker.update_task_status(
                    task_id, BenchmarkTaskStatus.FAILED
                )
            # Store failure info for later database update
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["completion_info"] = {
                    "status": BenchmarkStatus.FAILED,
                    "error_message": str(e),
                }
        finally:
            # Clean up active run tracking
            if benchmark_run_id in self.active_runs:
                # Mark that thread is done but keep data for database update
                self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to database immediately if possible
            self._sync_results_to_database(benchmark_run_id)

    def _create_task_queue(
        self,
        datasets_config: Dict,
        existing_results: Dict,
        benchmark_run_id: int,
    ) -> List[Dict]:
        """Create list of tasks to process, excluding existing results."""
        tasks = []

        for dataset_name, config in datasets_config.items():
            if config.get("count", 0) > 0:
                dataset = load_dataset(
                    dataset_type=dataset_name,
                    num_examples=config["count"],
                    seed=None,
                )

                for i, example in enumerate(dataset):
                    # Extract question based on dataset type
                    if dataset_name.lower() == "simpleqa":
                        question = example.get("problem", "")
                        correct_answer = example.get("answer", "")
                    else:  # browsecomp
                        question = example.get("problem", "")
                        correct_answer = example.get("answer", "")

                    # Generate query hash
                    query_hash = self.generate_query_hash(
                        question, dataset_name
                    )

                    # Skip if already processed
                    if query_hash in existing_results:
                        continue

                    tasks.append(
                        {
                            "benchmark_run_id": benchmark_run_id,
                            "example_id": example.get("id", f"example_{i}"),
                            "dataset_type": dataset_name,
                            "question": question,
                            "correct_answer": correct_answer,
                            "query_hash": query_hash,
                            "task_index": len(tasks),
                        }
                    )

        return tasks
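
    # Illustrative note (added; the values below are hypothetical): a
    # datasets_config of
    #
    #     {"simpleqa": {"count": 20}, "browsecomp": {"count": 0}}
    #
    # loads 20 SimpleQA examples, skips any whose query hash already appears in
    # existing_results, and yields task dicts shaped like
    #
    #     {"benchmark_run_id": 7, "example_id": "example_0",
    #      "dataset_type": "simpleqa", "question": "...", "correct_answer": "...",
    #      "query_hash": "<md5 hex>", "task_index": 0}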

    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task."""
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }
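
    # Added note: on success the returned dict is the task dict merged with the
    # research output ("response", "extracted_answer", "confidence", "sources",
    # "processing_time", "research_id") and the grading fields ("is_correct",
    # "graded_confidence", "grader_response", plus "evaluation_error" when grading
    # fails); if research itself fails, only "research_error" and "completed_at"
    # are added. The sync methods below map these keys onto BenchmarkResult rows.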

    def sync_pending_results(self, benchmark_run_id: int, username: str = None):
        """Sync any pending results to database. Can be called from main thread."""
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        saved_indices = run_data.get("saved_indices", set())

        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                if saved_count > 0:
                    session.commit()
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )
            # Roll back the session on error to prevent PendingRollbackError
            try:
                session.rollback()
            except Exception:
                pass

        return saved_count

    def _sync_results_to_database(self, benchmark_run_id: int):
        """Sync benchmark results from memory to database after thread completes."""
        if benchmark_run_id not in self.active_runs:
            return

        run_data = self.active_runs[benchmark_run_id]
        if not run_data.get("thread_complete"):
            return

        username = run_data.get("data", {}).get("username")
        user_password = run_data.get("data", {}).get("user_password")
        from ...database.session_context import get_user_db_session

        try:
            with get_user_db_session(username, user_password) as session:
                # Update benchmark run status
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )

                if benchmark_run and "completion_info" in run_data:
                    info = run_data["completion_info"]
                    benchmark_run.status = info["status"]
                    benchmark_run.end_time = info.get(
                        "end_time", datetime.now(UTC)
                    )
                    benchmark_run.completed_examples = info.get(
                        "completed_examples", 0
                    )
                    benchmark_run.failed_examples = info.get(
                        "failed_examples", 0
                    )
                    benchmark_run.error_message = info.get("error_message")

                # Save all results (skip already saved ones)
                saved_indices = run_data.get("saved_indices", set())
                for idx, result in enumerate(run_data.get("results", [])):
                    if idx in saved_indices:
                        continue
                    benchmark_result = BenchmarkResult(
                        benchmark_run_id=benchmark_run_id,
                        example_id=result["example_id"],
                        query_hash=result["query_hash"],
                        dataset_type=DatasetType(result["dataset_type"]),
                        research_id=result.get("research_id"),
                        question=result["question"],
                        correct_answer=result["correct_answer"],
                        response=result.get("response"),
                        extracted_answer=result.get("extracted_answer"),
                        confidence=result.get("confidence"),
                        processing_time=result.get("processing_time"),
                        sources=result.get("sources"),
                        is_correct=result.get("is_correct"),
                        graded_confidence=result.get("graded_confidence"),
                        grader_response=result.get("grader_response"),
                        completed_at=result.get("completed_at"),
                        research_error=result.get("research_error"),
                        evaluation_error=result.get("evaluation_error"),
                        task_index=result.get("task_index"),
                    )
                    session.add(benchmark_result)

                # Calculate final accuracy
                if benchmark_run.status == BenchmarkStatus.COMPLETED:
                    correct_results = [
                        r
                        for r in run_data.get("results", [])
                        if r.get("is_correct")
                    ]
                    evaluated_results = [
                        r
                        for r in run_data.get("results", [])
                        if r.get("is_correct") is not None
                    ]

                    if evaluated_results:
                        benchmark_run.overall_accuracy = (
                            len(correct_results) / len(evaluated_results)
                        ) * 100

                        # Calculate processing rate
                        total_time = sum(
                            r.get("processing_time", 0)
                            for r in evaluated_results
                        )
                        if total_time > 0:
                            benchmark_run.processing_rate = len(
                                evaluated_results
                            ) / (total_time / 60)

                session.commit()
                logger.info(
                    f"Successfully synced results for benchmark {benchmark_run_id}"
                )

            # Clean up memory
            del self.active_runs[benchmark_run_id]

        except Exception:
            logger.exception("Error syncing benchmark results to database")

    def _send_progress_update(
        self, benchmark_run_id: int, completed: int, total: int
    ):
        """Send real-time progress update via websocket."""
        try:
            percentage = (completed / total * 100) if total > 0 else 0

            # Create log entry for milestone progress
            log_entry = {
                "time": datetime.now(UTC).isoformat(),
                "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)",
                "progress": percentage,
                "metadata": {
                    "phase": "benchmark_progress",
                    "type": "milestone",
                    "completed": completed,
                    "total": total,
                    "benchmark_run_id": benchmark_run_id,
                },
            }

            progress_data = {
                "status": "in_progress",
                "message": f"Processing examples: {completed}/{total}",
                "progress": percentage,
                "completed": completed,
                "total": total,
                "benchmark_run_id": benchmark_run_id,
                "log_entry": log_entry,
                "progress_log": json.dumps([log_entry]),
            }

            self.socket_service.emit_to_subscribers(
                "research_progress", benchmark_run_id, progress_data
            )

        except Exception:
            logger.exception("Error sending progress update")

    def _calculate_final_accuracy(
        self, benchmark_run_id: int, username: str = None
    ):
        """Calculate and save final accuracy metrics."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                # Get all results for this run
                results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                if results:
                    correct_count = sum(1 for r in results if r.is_correct)
                    overall_accuracy = (correct_count / len(results)) * 100

                    # Calculate processing rate
                    total_time = sum(r.processing_time or 0 for r in results)
                    processing_rate = (
                        (len(results) / (total_time / 60))
                        if total_time > 0
                        else 0
                    )

                    # Update benchmark run
                    benchmark_run = (
                        session.query(BenchmarkRun)
                        .filter(BenchmarkRun.id == benchmark_run_id)
                        .first()
                    )
                    if benchmark_run:
                        benchmark_run.overall_accuracy = overall_accuracy
                        benchmark_run.processing_rate = processing_rate
                        session.commit()

            except Exception:
                logger.exception("Error calculating final accuracy")

    def update_benchmark_status(
        self,
        benchmark_run_id: int,
        status: BenchmarkStatus,
        error_message: str = None,
        username: str = None,
    ):
        """Update benchmark run status."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = status
                    benchmark_run.updated_at = datetime.now(UTC)

                    if error_message:
                        benchmark_run.error_message = error_message

                    if (
                        status == BenchmarkStatus.IN_PROGRESS
                        and not benchmark_run.start_time
                    ):
                        benchmark_run.start_time = datetime.now(UTC)
                    elif (
                        status
                        in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED]
                        and not benchmark_run.end_time
                    ):
                        benchmark_run.end_time = datetime.now(UTC)

                    session.commit()

            except Exception:
                session.rollback()
                logger.exception("Error updating benchmark status")

    def get_benchmark_status(
        self, benchmark_run_id: int, username: str = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused results from compatible runs
                # First get results specifically for this benchmark run
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash)
                # Only count results up to the number we say we've "completed"
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
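                # Added explanatory note: this is the normal-approximation (Wald)
                # interval for a proportion, margin = 1.96 * sqrt(p * (1 - p) / n),
                # scaled to percentage points. For example (hypothetical numbers),
                # with n = 50 graded results and p = 0.80 running accuracy the
                # margin is roughly 1.96 * sqrt(0.8 * 0.2 / 50) * 100 ≈ 11.1
                # points, i.e. a reported band of about 68.9% to 91.1%.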
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    accuracy_confidence = {
                        "lower_bound": max(
                            0, running_accuracy - margin_of_error
                        ),
                        "upper_bound": min(
                            100, running_accuracy + margin_of_error
                        ),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None

    def cancel_benchmark(
        self, benchmark_run_id: int, username: str = None
    ) -> bool:
        """Cancel a running benchmark."""
        try:
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["status"] = "cancelled"

            self.update_benchmark_status(
                benchmark_run_id, BenchmarkStatus.CANCELLED, username=username
            )
            logger.info(f"Cancelled benchmark run {benchmark_run_id}")
            return True

        except Exception:
            logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
            return False


# Global service instance
benchmark_service = BenchmarkService()
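

# Illustrative end-to-end sketch (added; not part of the original module). It only
# calls methods defined above; the config values, username, and password are
# hypothetical, and a real caller also needs the user database, search backends,
# and a Flask request context (start_benchmark reads the Flask session) to be
# available:
#
#     run_id = benchmark_service.create_benchmark_run(
#         run_name="nightly-simpleqa",
#         search_config={"iterations": 8, "search_tool": "searxng"},
#         evaluation_config={},
#         datasets_config={"simpleqa": {"count": 10}},
#         username="alice",
#         user_password="secret",
#     )
#     benchmark_service.start_benchmark(run_id, username="alice", user_password="secret")
#     status = benchmark_service.get_benchmark_status(run_id, username="alice")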