Coverage for src / local_deep_research / benchmarks / web_api / benchmark_service.py: 45%
485 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""Benchmark service for handling web-based benchmark execution."""
3import hashlib
4import json
5import threading
6import time
7from datetime import UTC, datetime
8from enum import Enum
9from typing import Any, Dict, List, Optional
11from loguru import logger
13from ...api.research_functions import quick_summary
14from ...database.models.benchmark import (
15 BenchmarkResult,
16 BenchmarkRun,
17 BenchmarkStatus,
18 DatasetType,
19)
20from ...web.services.socket_service import SocketIOService
21from ..datasets import load_dataset
22from ..graders import extract_answer_from_response, grade_single_result
23from ..runners import format_query
class BenchmarkTaskStatus(Enum):
    """Status values for benchmark tasks in the queue tracker.

    Written by BenchmarkQueueTracker / BenchmarkService: tasks are added as
    QUEUED, moved to PROCESSING when the worker thread picks them up, and end
    in COMPLETED, FAILED, or CANCELLED.
    """

    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class BenchmarkQueueTracker:
    """Simple in-memory tracker for benchmark queue status.

    This replaces the removed memory_queue functionality for benchmarks.
    Since benchmarks are temporary and don't need persistence,
    this simple in-memory solution is sufficient.

    Thread-safe for concurrent access from multiple benchmark threads.
    """

    def __init__(self):
        # task_id -> {"username", "task_type", "status", "created_at", ...}
        self.tasks = {}
        self._lock = threading.Lock()

    def add_task(
        self, task_id: str, username: str, task_type: str = "benchmark"
    ):
        """Add a new task to tracking.

        Also performs opportunistic cleanup of old completed tasks.
        """
        # Prune stale finished tasks first; cleanup acquires the lock itself,
        # so we stay outside it here to avoid holding it twice in a row.
        self.cleanup_completed_tasks()

        entry = {
            "username": username,
            "task_type": task_type,
            "status": BenchmarkTaskStatus.QUEUED.value,
            "created_at": datetime.now(UTC),
        }
        with self._lock:
            self.tasks[task_id] = entry

    def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
        """Update the status of a task."""
        with self._lock:
            record = self.tasks.get(task_id)
            if record is None:
                logger.warning(
                    f"Attempted to update status for non-existent task: {task_id}"
                )
                return
            record["status"] = status.value
            record["updated_at"] = datetime.now(UTC)

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get the current status of a task."""
        with self._lock:
            return self.tasks.get(task_id)

    def remove_task(self, task_id: str):
        """Remove a task from tracking."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
        """Remove completed tasks older than max_age_seconds.

        Args:
            max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour)
        """
        finished_states = {
            BenchmarkTaskStatus.COMPLETED.value,
            BenchmarkTaskStatus.FAILED.value,
            BenchmarkTaskStatus.CANCELLED.value,
        }
        with self._lock:
            now = datetime.now(UTC)
            stale = []
            for tid, data in self.tasks.items():
                # Only cleanup completed, failed, or cancelled tasks.
                if data["status"] not in finished_states:
                    continue
                # Fall back to created_at for tasks that never got an update.
                stamp = data.get("updated_at", data.get("created_at"))
                if stamp and (now - stamp).total_seconds() > max_age_seconds:
                    stale.append(tid)

            for tid in stale:
                self.tasks.pop(tid, None)
                logger.debug(f"Cleaned up old task: {tid}")

            if stale:
                logger.info(f"Cleaned up {len(stale)} old benchmark tasks")
122class BenchmarkService:
123 """Service for managing benchmark runs through the web interface."""
    def __init__(self, socket_service=None):
        """Initialize the service.

        Args:
            socket_service: Optional pre-built socket service; when omitted,
                one is obtained via _get_socket_service() (which falls back
                to a no-op mock if Flask is unavailable).
        """
        # In-memory state for currently executing runs, keyed by benchmark run id.
        self.active_runs: Dict[int, Dict] = {}
        self.socket_service = socket_service or self._get_socket_service()
        self.rate_limit_detected: Dict[
            int, bool
        ] = {}  # Track rate limiting per benchmark run
        self.queue_tracker = BenchmarkQueueTracker()  # Initialize queue tracker
133 def _get_socket_service(self):
134 """Get socket service instance, handling cases where Flask app is not available."""
135 try:
136 return SocketIOService()
137 except Exception:
138 # Return a mock socket service for testing/standalone use
139 class MockSocketService:
140 def emit_to_room(self, room, event, data):
141 pass
143 return MockSocketService()
145 def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
146 """Generate a hash for search configuration compatibility checking."""
147 relevant_params = {
148 "iterations": search_config.get("iterations"),
149 "questions_per_iteration": search_config.get(
150 "questions_per_iteration"
151 ),
152 "search_tool": search_config.get("search_tool"),
153 "search_strategy": search_config.get("search_strategy"),
154 "model_name": search_config.get("model_name"),
155 "provider": search_config.get("provider"),
156 }
157 # Remove None values
158 relevant_params = {
159 k: v for k, v in relevant_params.items() if v is not None
160 }
161 config_str = json.dumps(relevant_params, sort_keys=True)
162 return hashlib.md5( # DevSkim: ignore DS126858
163 config_str.encode(), usedforsecurity=False
164 ).hexdigest()[:8]
166 def generate_query_hash(self, question: str, dataset_type: str) -> str:
167 """Generate a hash for a query to enable deduplication."""
168 query_content = f"{question.strip()}|{dataset_type.lower()}"
169 return hashlib.md5( # DevSkim: ignore DS126858
170 query_content.encode(), usedforsecurity=False
171 ).hexdigest()
173 def create_benchmark_run(
174 self,
175 run_name: Optional[str],
176 search_config: Dict[str, Any],
177 evaluation_config: Dict[str, Any],
178 datasets_config: Dict[str, Dict],
179 username: str = None,
180 user_password: str = None,
181 ) -> int:
182 """Create a new benchmark run in the database."""
183 from ...database.session_context import get_user_db_session
185 with get_user_db_session(username, user_password) as session:
186 try:
187 config_hash = self.generate_config_hash(search_config)
189 # Calculate total examples
190 total_examples = sum(
191 config.get("count", 0)
192 for config in datasets_config.values()
193 )
195 benchmark_run = BenchmarkRun(
196 run_name=run_name,
197 config_hash=config_hash,
198 query_hash_list=[], # Will be populated as we process
199 search_config=search_config,
200 evaluation_config=evaluation_config,
201 datasets_config=datasets_config,
202 total_examples=total_examples,
203 status=BenchmarkStatus.PENDING,
204 )
206 session.add(benchmark_run)
207 session.commit()
209 logger.info(
210 f"Created benchmark run {benchmark_run.id} with config hash {config_hash}"
211 )
212 return benchmark_run.id
214 except Exception:
215 session.rollback()
216 logger.exception("Error creating benchmark run")
217 raise
219 def get_existing_results(
220 self, config_hash: str, username: str = None, user_password: str = None
221 ) -> Dict[str, Dict]:
222 """Get existing results with compatible configuration."""
223 from ...database.session_context import get_user_db_session
225 with get_user_db_session(username, user_password) as session:
226 try:
227 # Find compatible runs
228 compatible_runs = (
229 session.query(BenchmarkRun)
230 .filter(BenchmarkRun.config_hash == config_hash)
231 .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
232 .all()
233 )
235 existing_results = {}
236 for run in compatible_runs:
237 results = (
238 session.query(BenchmarkResult)
239 .filter(BenchmarkResult.benchmark_run_id == run.id)
240 .filter(
241 BenchmarkResult.is_correct.isnot(None)
242 ) # Only completed evaluations
243 .all()
244 )
246 for result in results:
247 existing_results[result.query_hash] = {
248 "id": result.example_id,
249 "dataset_type": result.dataset_type.value,
250 "problem": result.question,
251 "correct_answer": result.correct_answer,
252 "response": result.response,
253 "extracted_answer": result.extracted_answer,
254 "confidence": result.confidence,
255 "processing_time": result.processing_time,
256 "sources": result.sources,
257 "is_correct": result.is_correct,
258 "graded_confidence": result.graded_confidence,
259 "grader_response": result.grader_response,
260 "query_hash": result.query_hash,
261 }
263 logger.info(
264 f"Found {len(existing_results)} existing results for config hash {config_hash}"
265 )
266 return existing_results
268 except Exception:
269 logger.exception("Error loading existing results")
270 return {}
    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: str = None,
        user_password: str = None,
    ) -> bool:
        """Start a benchmark run in a background thread.

        Loads everything the worker thread will need (run config, settings
        snapshot, session password, prior compatible results) while still on
        the main thread, marks the run IN_PROGRESS, then spawns a daemon
        thread running _run_benchmark_thread.

        Returns:
            True if the thread was started; False if setup failed (the run
            is then marked FAILED in the database when it can be found).
        """
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                # NOTE(review): the user_password parameter is deliberately
                # discarded here and re-fetched from the session password
                # store — presumably the store is authoritative; confirm.
                user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    user_password = session_password_store.get_session_password(
                        username, session_id
                    )
                    if not user_password:
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread (daemon so it won't block shutdown)
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database (fresh session: the one above
            # may never have been opened or may already be closed)
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False
373 def _run_benchmark_thread(self, benchmark_run_id: int):
374 """Main benchmark execution thread."""
375 # IMPORTANT: This runs in a background thread, so we cannot access the user database
376 # Using in-memory queue tracker for benchmark status tracking
378 task_id = None
379 try:
380 # Get the benchmark data that was passed to us
381 # We need to retrieve this from the service database or from memory
382 benchmark_data = self.active_runs.get(benchmark_run_id, {}).get(
383 "data"
384 )
386 if not benchmark_data:
387 raise ValueError(
388 f"Benchmark data for run {benchmark_run_id} not found"
389 )
391 # Set up settings context for thread-local access
392 settings_snapshot = benchmark_data.get("settings_snapshot", {})
393 username = benchmark_data.get("username", "benchmark_user")
395 # Create a settings context that threads can use
396 class SettingsContext:
397 def __init__(self, snapshot, username):
398 self.snapshot = snapshot or {}
399 self.username = username
400 # Extract values from setting objects if needed
401 self.values = {}
402 for key, setting in self.snapshot.items():
403 if isinstance(setting, dict) and "value" in setting:
404 # It's a full setting object, extract the value
405 self.values[key] = setting["value"]
406 else:
407 # It's already just a value
408 self.values[key] = setting
410 def get_setting(self, key, default=None):
411 """Get setting from snapshot only - no database access in threads"""
412 if key in self.values:
413 return self.values[key]
414 # No fallback to database - threads must use snapshot only
415 logger.warning(
416 f"Setting '{key}' not found in snapshot, using default"
417 )
418 return default
420 settings_context = SettingsContext(settings_snapshot, username)
422 # Set the context in thread-local storage
423 from ...config.thread_settings import (
424 clear_settings_context,
425 set_settings_context,
426 )
428 set_settings_context(settings_context)
430 # Extract all the data we need
431 datasets_config = benchmark_data["datasets_config"]
432 search_config = benchmark_data["search_config"]
433 evaluation_config = benchmark_data["evaluation_config"]
434 existing_results = benchmark_data.get("existing_results", {})
436 # Create task queue
437 task_queue = self._create_task_queue(
438 datasets_config,
439 existing_results,
440 benchmark_run_id,
441 )
443 # Calculate totals
444 total_examples = len(task_queue) + len(existing_results)
445 completed_examples = len(existing_results)
447 # Initialize task tracking
448 task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
449 username = benchmark_data.get("username", "benchmark_user")
450 self.queue_tracker.add_task(task_id, username, "benchmark")
451 self.queue_tracker.update_task_status(
452 task_id, BenchmarkTaskStatus.PROCESSING
453 )
455 # Track progress in memory
456 progress_info = {
457 "total_examples": total_examples,
458 "completed_examples": completed_examples,
459 "failed_examples": 0,
460 "start_time": datetime.now(UTC),
461 }
463 # Process tasks
464 logger.info(
465 f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
466 )
467 for i, task in enumerate(task_queue):
468 # Check if benchmark has been cancelled
469 if (
470 benchmark_run_id in self.active_runs
471 and self.active_runs[benchmark_run_id].get("status")
472 == "cancelled"
473 ):
474 logger.info(
475 f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
476 )
477 break
479 logger.info(
480 f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
481 )
482 try:
483 # Add username and password to task for metrics tracking
484 task["username"] = benchmark_data.get("username")
485 task["user_password"] = benchmark_data.get("user_password")
487 # Process single task
488 result = self._process_benchmark_task(
489 task,
490 search_config,
491 evaluation_config,
492 )
494 # Store result in memory for now (will be saved later)
495 if "results" not in self.active_runs[benchmark_run_id]:
496 self.active_runs[benchmark_run_id]["results"] = []
497 self.active_runs[benchmark_run_id]["results"].append(result)
499 # Update progress
500 progress_info["completed_examples"] += 1
502 logger.info(
503 f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
504 f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
505 )
507 # Send real-time update
508 self._send_progress_update(
509 benchmark_run_id,
510 progress_info["completed_examples"],
511 progress_info["total_examples"],
512 )
514 except Exception as e:
515 logger.exception(f"Error processing task {i}")
516 progress_info["failed_examples"] += 1
517 logger.info(
518 f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
519 f"Total failed: {progress_info['failed_examples']}"
520 )
522 # Check if this is a rate limiting error
523 error_str = str(e).lower()
524 if (
525 "403" in error_str
526 or "rate limit" in error_str
527 or "forbidden" in error_str
528 ):
529 self.rate_limit_detected[benchmark_run_id] = True
530 # Send rate limit warning via WebSocket
531 self.socket_service.emit_to_subscribers(
532 "research_progress",
533 benchmark_run_id,
534 {
535 "rate_limit_detected": True,
536 "message": "SearXNG rate limiting detected",
537 },
538 )
540 # Mark as completed in memory tracker
541 progress_info["end_time"] = datetime.now(UTC)
543 # Check if benchmark was cancelled
544 was_cancelled = (
545 benchmark_run_id in self.active_runs
546 and self.active_runs[benchmark_run_id].get("status")
547 == "cancelled"
548 )
550 if was_cancelled:
551 status = BenchmarkStatus.CANCELLED
552 message = "Benchmark cancelled by user"
553 if task_id:
554 self.queue_tracker.update_task_status(
555 task_id, BenchmarkTaskStatus.CANCELLED
556 )
557 else:
558 status = BenchmarkStatus.COMPLETED
559 message = "Benchmark completed successfully"
560 if task_id:
561 self.queue_tracker.update_task_status(
562 task_id, BenchmarkTaskStatus.COMPLETED
563 )
565 # Store completion info for later database update
566 self.active_runs[benchmark_run_id]["completion_info"] = {
567 "status": status,
568 "end_time": progress_info["end_time"],
569 "completed_examples": progress_info["completed_examples"],
570 "failed_examples": progress_info["failed_examples"],
571 }
573 # Send completion notification
574 self.socket_service.emit_to_subscribers(
575 "research_progress",
576 benchmark_run_id,
577 {
578 "status": "cancelled" if was_cancelled else "completed",
579 "message": message,
580 "progress": (
581 progress_info["completed_examples"]
582 / progress_info["total_examples"]
583 * 100
584 )
585 if progress_info["total_examples"] > 0
586 else 0,
587 "benchmark_run_id": benchmark_run_id,
588 },
589 )
591 except Exception as e:
592 logger.exception(f"Benchmark run {benchmark_run_id} failed")
593 # Update task status if we have a task_id
594 if task_id:
595 self.queue_tracker.update_task_status(
596 task_id, BenchmarkTaskStatus.FAILED
597 )
598 # Store failure info for later database update
599 if benchmark_run_id in self.active_runs:
600 self.active_runs[benchmark_run_id]["completion_info"] = {
601 "status": BenchmarkStatus.FAILED,
602 "error_message": str(e),
603 }
604 finally:
605 # Clear thread-local settings context to prevent leaks
606 clear_settings_context()
608 # Clean up active run tracking
609 if benchmark_run_id in self.active_runs:
610 # Mark that thread is done but keep data for database update
611 self.active_runs[benchmark_run_id]["thread_complete"] = True
613 # Try to save results to database immediately if possible
614 self._sync_results_to_database(benchmark_run_id)
616 def _create_task_queue(
617 self,
618 datasets_config: Dict,
619 existing_results: Dict,
620 benchmark_run_id: int,
621 ) -> List[Dict]:
622 """Create list of tasks to process, excluding existing results."""
623 tasks = []
625 for dataset_name, config in datasets_config.items():
626 if config.get("count", 0) > 0: 626 ↛ 625line 626 didn't jump to line 625 because the condition on line 626 was always true
627 dataset = load_dataset(
628 dataset_type=dataset_name,
629 num_examples=config["count"],
630 seed=None,
631 )
633 for i, example in enumerate(dataset):
634 # Extract question based on dataset type
635 if dataset_name.lower() == "simpleqa": 635 ↛ 639line 635 didn't jump to line 639 because the condition on line 635 was always true
636 question = example.get("problem", "")
637 correct_answer = example.get("answer", "")
638 else: # browsecomp
639 question = example.get("problem", "")
640 correct_answer = example.get("answer", "")
642 # Generate query hash
643 query_hash = self.generate_query_hash(
644 question, dataset_name
645 )
647 # Skip if already processed
648 if query_hash in existing_results:
649 continue
651 tasks.append(
652 {
653 "benchmark_run_id": benchmark_run_id,
654 "example_id": example.get("id", f"example_{i}"),
655 "dataset_type": dataset_name,
656 "question": question,
657 "correct_answer": correct_answer,
658 "query_hash": query_hash,
659 "task_index": len(tasks),
660 }
661 )
663 return tasks
    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task.

        Runs the research query for the task's question, extracts an answer
        from the response, then attempts to grade it. Always returns a dict:
        on research failure, the task dict plus "research_error"; on grading
        failure, the result dict with is_correct=None and "evaluation_error".
        """
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage (set by
            # _run_benchmark_thread before tasks are processed)
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query for the dataset type
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    # Progress reporting is best-effort; never fail the task
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer from the research summary
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            # extract_answer_from_response may return a dict or a plain value
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    # Grading ran but reported an error (or returned nothing)
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                # Grading crashed; keep the research result but mark ungraded
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            # Research itself failed; return the task annotated with the error
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }
    def sync_pending_results(self, benchmark_run_id: int, username: str = None):
        """Sync any pending results to database. Can be called from main thread.

        Saves every in-memory result for the run that has not yet been
        persisted, tracking saved positions in run_data["saved_indices"].

        Returns:
            Number of newly saved results (0 if the run is not active or on error).
        """
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        saved_indices = run_data.get("saved_indices", set())

        # Fall back to the username captured when the run was started
        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        # (e.g. persisted by another sync path)
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                if saved_count > 0:
                    session.commit()
                    # Only persist the updated index set after a successful commit
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )
            # Roll back the session on error to prevent PendingRollbackError
            # NOTE(review): the `with` block has already exited here, so this
            # rollback runs on a closed/exited session — presumably a no-op
            # guarded by the inner try; confirm against get_user_db_session.
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after sync error")

        return saved_count
989 def _sync_results_to_database(self, benchmark_run_id: int):
990 """Sync benchmark results from memory to database after thread completes."""
991 if benchmark_run_id not in self.active_runs:
992 return
994 run_data = self.active_runs[benchmark_run_id]
995 if not run_data.get("thread_complete"):
996 return
998 username = run_data.get("data", {}).get("username")
999 user_password = run_data.get("data", {}).get("user_password")
1000 from ...database.session_context import get_user_db_session
1002 try:
1003 with get_user_db_session(username, user_password) as session:
1004 # Update benchmark run status
1005 benchmark_run = (
1006 session.query(BenchmarkRun)
1007 .filter(BenchmarkRun.id == benchmark_run_id)
1008 .first()
1009 )
1011 if benchmark_run and "completion_info" in run_data:
1012 info = run_data["completion_info"]
1013 benchmark_run.status = info["status"]
1014 benchmark_run.end_time = info.get(
1015 "end_time", datetime.now(UTC)
1016 )
1017 benchmark_run.completed_examples = info.get(
1018 "completed_examples", 0
1019 )
1020 benchmark_run.failed_examples = info.get(
1021 "failed_examples", 0
1022 )
1023 benchmark_run.error_message = info.get("error_message")
1025 # Save all results (skip already saved ones)
1026 saved_indices = run_data.get("saved_indices", set())
1027 for idx, result in enumerate(run_data.get("results", [])):
1028 if idx in saved_indices:
1029 continue
1030 benchmark_result = BenchmarkResult(
1031 benchmark_run_id=benchmark_run_id,
1032 example_id=result["example_id"],
1033 query_hash=result["query_hash"],
1034 dataset_type=DatasetType(result["dataset_type"]),
1035 research_id=result.get("research_id"),
1036 question=result["question"],
1037 correct_answer=result["correct_answer"],
1038 response=result.get("response"),
1039 extracted_answer=result.get("extracted_answer"),
1040 confidence=result.get("confidence"),
1041 processing_time=result.get("processing_time"),
1042 sources=result.get("sources"),
1043 is_correct=result.get("is_correct"),
1044 graded_confidence=result.get("graded_confidence"),
1045 grader_response=result.get("grader_response"),
1046 completed_at=result.get("completed_at"),
1047 research_error=result.get("research_error"),
1048 evaluation_error=result.get("evaluation_error"),
1049 task_index=result.get("task_index"),
1050 )
1051 session.add(benchmark_result)
1053 # Calculate final accuracy
1054 if benchmark_run.status == BenchmarkStatus.COMPLETED:
1055 correct_results = [
1056 r
1057 for r in run_data.get("results", [])
1058 if r.get("is_correct")
1059 ]
1060 evaluated_results = [
1061 r
1062 for r in run_data.get("results", [])
1063 if r.get("is_correct") is not None
1064 ]
1066 if evaluated_results:
1067 benchmark_run.overall_accuracy = (
1068 len(correct_results) / len(evaluated_results)
1069 ) * 100
1071 # Calculate processing rate
1072 total_time = sum(
1073 r.get("processing_time", 0)
1074 for r in evaluated_results
1075 )
1076 if total_time > 0:
1077 benchmark_run.processing_rate = len(
1078 evaluated_results
1079 ) / (total_time / 60)
1081 session.commit()
1082 logger.info(
1083 f"Successfully synced results for benchmark {benchmark_run_id}"
1084 )
1086 # Clean up memory
1087 del self.active_runs[benchmark_run_id]
1089 except Exception:
1090 logger.exception("Error syncing benchmark results to database")
1092 def _send_progress_update(
1093 self, benchmark_run_id: int, completed: int, total: int
1094 ):
1095 """Send real-time progress update via websocket."""
1096 try:
1097 percentage = (completed / total * 100) if total > 0 else 0
1099 # Create log entry for milestone progress
1100 log_entry = {
1101 "time": datetime.now(UTC).isoformat(),
1102 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)",
1103 "progress": percentage,
1104 "metadata": {
1105 "phase": "benchmark_progress",
1106 "type": "milestone",
1107 "completed": completed,
1108 "total": total,
1109 "benchmark_run_id": benchmark_run_id,
1110 },
1111 }
1113 progress_data = {
1114 "status": "in_progress",
1115 "message": f"Processing examples: {completed}/{total}",
1116 "progress": percentage,
1117 "completed": completed,
1118 "total": total,
1119 "benchmark_run_id": benchmark_run_id,
1120 "log_entry": log_entry,
1121 "progress_log": json.dumps([log_entry]),
1122 }
1124 self.socket_service.emit_to_subscribers(
1125 "research_progress", benchmark_run_id, progress_data
1126 )
1128 except Exception:
1129 logger.exception("Error sending progress update")
1131 def _calculate_final_accuracy(
1132 self, benchmark_run_id: int, username: str = None
1133 ):
1134 """Calculate and save final accuracy metrics."""
1135 from ...database.session_context import get_user_db_session
1137 with get_user_db_session(username) as session:
1138 try:
1139 # Get all results for this run
1140 results = (
1141 session.query(BenchmarkResult)
1142 .filter(
1143 BenchmarkResult.benchmark_run_id == benchmark_run_id
1144 )
1145 .filter(BenchmarkResult.is_correct.isnot(None))
1146 .all()
1147 )
1149 if results:
1150 correct_count = sum(1 for r in results if r.is_correct)
1151 overall_accuracy = (correct_count / len(results)) * 100
1153 # Calculate processing rate
1154 total_time = sum(r.processing_time or 0 for r in results)
1155 processing_rate = (
1156 (len(results) / (total_time / 60))
1157 if total_time > 0
1158 else 0
1159 )
1161 # Update benchmark run
1162 benchmark_run = (
1163 session.query(BenchmarkRun)
1164 .filter(BenchmarkRun.id == benchmark_run_id)
1165 .first()
1166 )
1167 if benchmark_run:
1168 benchmark_run.overall_accuracy = overall_accuracy
1169 benchmark_run.processing_rate = processing_rate
1170 session.commit()
1172 except Exception:
1173 logger.exception("Error calculating final accuracy")
1175 def update_benchmark_status(
1176 self,
1177 benchmark_run_id: int,
1178 status: BenchmarkStatus,
1179 error_message: str = None,
1180 username: str = None,
1181 ):
1182 """Update benchmark run status."""
1183 from ...database.session_context import get_user_db_session
1185 with get_user_db_session(username) as session:
1186 try:
1187 benchmark_run = (
1188 session.query(BenchmarkRun)
1189 .filter(BenchmarkRun.id == benchmark_run_id)
1190 .first()
1191 )
1192 if benchmark_run: 1192 ↛ exitline 1192 didn't jump to the function exit
1193 benchmark_run.status = status
1194 benchmark_run.updated_at = datetime.now(UTC)
1196 if error_message:
1197 benchmark_run.error_message = error_message
1199 if (
1200 status == BenchmarkStatus.IN_PROGRESS
1201 and not benchmark_run.start_time
1202 ):
1203 benchmark_run.start_time = datetime.now(UTC)
1204 elif ( 1204 ↛ 1211line 1204 didn't jump to line 1211 because the condition on line 1204 was always true
1205 status
1206 in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED]
1207 and not benchmark_run.end_time
1208 ):
1209 benchmark_run.end_time = datetime.now(UTC)
1211 session.commit()
1213 except Exception:
1214 session.rollback()
1215 logger.exception("Error updating benchmark status")
    def get_benchmark_status(
        self, benchmark_run_id: int, username: Optional[str] = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run.

        Builds a live status snapshot: run metadata, running accuracy
        (including results reused from compatible runs with the same config
        hash), per-dataset accuracies, elapsed/remaining time estimates,
        and a 95% confidence interval once at least 3 graded results exist.

        Args:
            benchmark_run_id: Primary key of the BenchmarkRun to inspect.
            username: Owner of the per-user database to open.

        Returns:
            A dict of status fields, or None if the run does not exist or
            an error occurs (errors are logged, never raised).
        """
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused results from compatible runs
                # First get results specifically for this benchmark run
                # (only graded ones, i.e. is_correct has been set).
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash)
                # Only count results up to the number we say we've "completed"
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    # NOTE(review): assumes reused rows are disjoint from the
                    # current run's own results — verify against the reuse
                    # logic that sets completed_examples.
                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    # NOTE(review): assumes start_time is timezone-aware
                    # (UTC) — subtraction with datetime.now(UTC) raises
                    # TypeError for naive datetimes; confirm at the writer.
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    accuracy_confidence = {
                        "lower_bound": max(
                            0, running_accuracy - margin_of_error
                        ),
                        "upper_bound": min(
                            100, running_accuracy + margin_of_error
                        ),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    # NOTE(review): `or` also falls back when
                    # overall_accuracy is exactly 0.0, not just None.
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None
1430 def cancel_benchmark(
1431 self, benchmark_run_id: int, username: str = None
1432 ) -> bool:
1433 """Cancel a running benchmark."""
1434 try:
1435 if benchmark_run_id in self.active_runs: 1435 ↛ 1438line 1435 didn't jump to line 1438 because the condition on line 1435 was always true
1436 self.active_runs[benchmark_run_id]["status"] = "cancelled"
1438 self.update_benchmark_status(
1439 benchmark_run_id, BenchmarkStatus.CANCELLED, username=username
1440 )
1441 logger.info(f"Cancelled benchmark run {benchmark_run_id}")
1442 return True
1444 except Exception:
1445 logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
1446 return False
# Global service instance
# Module-level singleton created at import time and shared by all importers
# of this module.
benchmark_service = BenchmarkService()