Coverage for src / local_deep_research / benchmarks / web_api / benchmark_service.py: 93%

478 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""Benchmark service for handling web-based benchmark execution.""" 

2 

3import hashlib 

4import json 

5import threading 

6import time 

7from datetime import UTC, datetime 

8from enum import Enum 

9from typing import Any, Dict, List, Optional 

10 

11from loguru import logger 

12 

13from ...api.research_functions import quick_summary 

14from ...settings.manager import SnapshotSettingsContext 

15from ...web.services.research_service import _global_research_semaphore 

16from ...database.models.benchmark import ( 

17 BenchmarkResult, 

18 BenchmarkRun, 

19 BenchmarkStatus, 

20 DatasetType, 

21) 

22from ...web.services.socket_service import SocketIOService 

23from ..datasets import load_dataset 

24from ..graders import extract_answer_from_response, grade_single_result 

25from ..runners import format_query 

26from ...database.thread_local_session import thread_cleanup 

27 

28 

class BenchmarkTaskStatus(Enum):
    """Lifecycle states for a benchmark task in the queue tracker."""

    # Initial state when a task is registered.
    QUEUED = "queued"
    # Task is actively being executed.
    PROCESSING = "processing"
    # Terminal states — eligible for cleanup by the tracker.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

37 

38 

class BenchmarkQueueTracker:
    """Thread-safe, in-memory tracker of benchmark queue status.

    This replaces the removed memory_queue functionality for benchmarks.
    Benchmarks are transient and need no persistence, so a plain dict
    guarded by a lock is sufficient.
    """

    def __init__(self):
        # task_id -> {"username", "task_type", "status", "created_at", ["updated_at"]}
        self.tasks = {}
        self._lock = threading.Lock()

    def add_task(
        self, task_id: str, username: str, task_type: str = "benchmark"
    ):
        """Register a new task in the QUEUED state.

        Opportunistically evicts stale finished tasks first; the cleanup
        call must happen before we hold the lock because it acquires the
        (non-reentrant) lock itself.
        """
        self.cleanup_completed_tasks()

        entry = {
            "username": username,
            "task_type": task_type,
            "status": BenchmarkTaskStatus.QUEUED.value,
            "created_at": datetime.now(UTC),
        }
        with self._lock:
            self.tasks[task_id] = entry

    def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
        """Set a task's status and stamp its update time."""
        with self._lock:
            entry = self.tasks.get(task_id)
            if entry is None:
                logger.warning(
                    f"Attempted to update status for non-existent task: {task_id}"
                )
            else:
                entry["status"] = status.value
                entry["updated_at"] = datetime.now(UTC)

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return the tracked entry for *task_id*, or None if unknown."""
        with self._lock:
            return self.tasks.get(task_id)

    def remove_task(self, task_id: str):
        """Forget a task; unknown ids are silently ignored."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
        """Drop finished tasks older than *max_age_seconds*.

        Args:
            max_age_seconds: Maximum age in seconds for completed tasks
                (default 1 hour).
        """
        terminal_states = {
            BenchmarkTaskStatus.COMPLETED.value,
            BenchmarkTaskStatus.FAILED.value,
            BenchmarkTaskStatus.CANCELLED.value,
        }
        with self._lock:
            now = datetime.now(UTC)
            stale = []
            for task_id, entry in self.tasks.items():
                # Only completed, failed, or cancelled tasks are evicted.
                if entry["status"] not in terminal_states:
                    continue
                # Prefer the last update time; fall back to creation time.
                timestamp = entry.get("updated_at", entry.get("created_at"))
                if timestamp is None:
                    continue
                if (now - timestamp).total_seconds() > max_age_seconds:
                    stale.append(task_id)

            for task_id in stale:
                self.tasks.pop(task_id, None)
                logger.debug(f"Cleaned up old task: {task_id}")

            if stale:
                logger.info(f"Cleaned up {len(stale)} old benchmark tasks")

123 

124 

125class BenchmarkService: 

126 """Service for managing benchmark runs through the web interface.""" 

127 

128 def __init__(self, socket_service=None): 

129 self.active_runs: Dict[int, Dict] = {} 

130 self.socket_service = socket_service or self._get_socket_service() 

131 self.rate_limit_detected: Dict[ 

132 int, bool 

133 ] = {} # Track rate limiting per benchmark run 

134 self.queue_tracker = BenchmarkQueueTracker() # Initialize queue tracker 

135 

136 def _get_socket_service(self): 

137 """Get socket service instance, handling cases where Flask app is not available.""" 

138 try: 

139 return SocketIOService() 

140 except Exception: 

141 # Return a mock socket service for testing/standalone use 

142 class MockSocketService: 

143 def emit_to_room(self, room, event, data): 

144 pass 

145 

146 return MockSocketService() 

147 

148 def generate_config_hash(self, search_config: Dict[str, Any]) -> str: 

149 """Generate a hash for search configuration compatibility checking.""" 

150 relevant_params = { 

151 "iterations": search_config.get("iterations"), 

152 "questions_per_iteration": search_config.get( 

153 "questions_per_iteration" 

154 ), 

155 "search_tool": search_config.get("search_tool"), 

156 "search_strategy": search_config.get("search_strategy"), 

157 "model_name": search_config.get("model_name"), 

158 "provider": search_config.get("provider"), 

159 } 

160 # Remove None values 

161 relevant_params = { 

162 k: v for k, v in relevant_params.items() if v is not None 

163 } 

164 config_str = json.dumps(relevant_params, sort_keys=True) 

165 return hashlib.md5( # DevSkim: ignore DS126858 

166 config_str.encode(), usedforsecurity=False 

167 ).hexdigest()[:8] 

168 

169 def generate_query_hash(self, question: str, dataset_type: str) -> str: 

170 """Generate a hash for a query to enable deduplication.""" 

171 query_content = f"{question.strip()}|{dataset_type.lower()}" 

172 return hashlib.md5( # DevSkim: ignore DS126858 

173 query_content.encode(), usedforsecurity=False 

174 ).hexdigest() 

175 

176 def create_benchmark_run( 

177 self, 

178 run_name: Optional[str], 

179 search_config: Dict[str, Any], 

180 evaluation_config: Dict[str, Any], 

181 datasets_config: Dict[str, Dict], 

182 username: Optional[str] = None, 

183 user_password: Optional[str] = None, 

184 ) -> int: 

185 """Create a new benchmark run in the database.""" 

186 from ...database.session_context import get_user_db_session 

187 

188 with get_user_db_session(username, user_password) as session: 

189 try: 

190 config_hash = self.generate_config_hash(search_config) 

191 

192 # Calculate total examples 

193 total_examples = sum( 

194 config.get("count", 0) 

195 for config in datasets_config.values() 

196 ) 

197 

198 benchmark_run = BenchmarkRun( 

199 run_name=run_name, 

200 config_hash=config_hash, 

201 query_hash_list=[], # Will be populated as we process 

202 search_config=search_config, 

203 evaluation_config=evaluation_config, 

204 datasets_config=datasets_config, 

205 total_examples=total_examples, 

206 status=BenchmarkStatus.PENDING, 

207 ) 

208 

209 session.add(benchmark_run) 

210 session.commit() 

211 

212 logger.info( 

213 f"Created benchmark run {benchmark_run.id} with config hash {config_hash}" 

214 ) 

215 return int(benchmark_run.id) 

216 

217 except Exception: 

218 session.rollback() 

219 logger.exception("Error creating benchmark run") 

220 raise 

221 

222 def get_existing_results( 

223 self, 

224 config_hash: str, 

225 username: Optional[str] = None, 

226 user_password: Optional[str] = None, 

227 ) -> Dict[str, Dict]: 

228 """Get existing results with compatible configuration.""" 

229 from ...database.session_context import get_user_db_session 

230 

231 with get_user_db_session(username, user_password) as session: 

232 try: 

233 # Find compatible runs 

234 compatible_runs = ( 

235 session.query(BenchmarkRun) 

236 .filter(BenchmarkRun.config_hash == config_hash) 

237 .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED) 

238 .all() 

239 ) 

240 

241 existing_results = {} 

242 for run in compatible_runs: 

243 results = ( 

244 session.query(BenchmarkResult) 

245 .filter(BenchmarkResult.benchmark_run_id == run.id) 

246 .filter( 

247 BenchmarkResult.is_correct.isnot(None) 

248 ) # Only completed evaluations 

249 .all() 

250 ) 

251 

252 for result in results: 

253 existing_results[result.query_hash] = { 

254 "id": result.example_id, 

255 "dataset_type": result.dataset_type.value, 

256 "problem": result.question, 

257 "correct_answer": result.correct_answer, 

258 "response": result.response, 

259 "extracted_answer": result.extracted_answer, 

260 "confidence": result.confidence, 

261 "processing_time": result.processing_time, 

262 "sources": result.sources, 

263 "is_correct": result.is_correct, 

264 "graded_confidence": result.graded_confidence, 

265 "grader_response": result.grader_response, 

266 "query_hash": result.query_hash, 

267 } 

268 

269 logger.info( 

270 f"Found {len(existing_results)} existing results for config hash {config_hash}" 

271 ) 

272 return existing_results 

273 

274 except Exception: 

275 logger.exception("Error loading existing results") 

276 return {} 

277 

    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: Optional[str] = None,
        user_password: Optional[str] = None,
    ) -> bool:
        """Start a benchmark run in a background thread.

        Gathers everything the worker thread needs (run config, prior
        compatible results, a settings snapshot, and the session-stored
        password) while still on the calling thread with DB access, then
        spawns a daemon thread running ``_run_benchmark_thread``.

        Args:
            benchmark_run_id: Id of an existing BenchmarkRun row.
            username: Owner of the per-user database, if any.
            user_password: Credential for opening that database.

        Returns:
            True if the worker thread was started; False on failure (the
            run is then marked FAILED in the database when possible).
        """
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(  # noqa: TRY301 — caught by except, sets FAILED status in DB
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                # The password retrieved from the session store may differ
                # from the user_password argument; the stored one is what
                # the background thread uses for metrics tracking.
                _user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    _user_password = (
                        session_password_store.get_session_password(
                            username, session_id
                        )
                    )
                    if not _user_password:
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": _user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            # Keep a handle to the thread alongside the run data.
            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database — a fresh session, since
            # the original one may be mid-failure or already closed.
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False

380 

    @thread_cleanup
    def _run_benchmark_thread(self, benchmark_run_id: int):
        """Main benchmark execution thread.

        Consumes the pre-fetched data in ``self.active_runs``, processes
        every queued example under the global research semaphore, tracks
        progress in memory and via WebSocket events, and finally records
        completion info for a later database sync.
        """
        # IMPORTANT: This runs in a background thread, so we cannot access the user database
        # Using in-memory queue tracker for benchmark status tracking

        task_id = None

        # Get the benchmark data that was passed to us
        # We need to retrieve this from the service database or from memory
        benchmark_data = self.active_runs.get(benchmark_run_id, {}).get("data")

        try:
            if not benchmark_data:
                raise ValueError(  # noqa: TRY301
                    f"Benchmark data for run {benchmark_run_id} not found"
                )
            # Set up settings context for thread-local access
            settings_snapshot = benchmark_data.get("settings_snapshot", {})
            username = benchmark_data.get("username", "benchmark_user")

            # Create a settings context that threads can use
            settings_context = SnapshotSettingsContext(
                settings_snapshot,
                username=username,
                missing_key_log_level="WARNING",
            )

            # Set the context in thread-local storage
            from ...config.thread_settings import set_settings_context

            set_settings_context(settings_context)

            # Extract all the data we need
            datasets_config = benchmark_data["datasets_config"]
            search_config = benchmark_data["search_config"]
            evaluation_config = benchmark_data["evaluation_config"]
            existing_results = benchmark_data.get("existing_results", {})

            # Create task queue (already-processed examples are excluded)
            task_queue = self._create_task_queue(
                datasets_config,
                existing_results,
                benchmark_run_id,
            )

            # Calculate totals — existing results count as completed.
            total_examples = len(task_queue) + len(existing_results)
            completed_examples = len(existing_results)

            # Initialize task tracking
            task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
            username = benchmark_data.get("username", "benchmark_user")
            self.queue_tracker.add_task(task_id, username, "benchmark")
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.PROCESSING
            )

            # Track progress in memory
            progress_info = {
                "total_examples": total_examples,
                "completed_examples": completed_examples,
                "failed_examples": 0,
                "start_time": datetime.now(UTC),
            }

            # Process tasks
            logger.info(
                f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
            )
            for i, task in enumerate(task_queue):
                # Check if benchmark has been cancelled (set by another thread
                # via the shared active_runs entry)
                if (
                    benchmark_run_id in self.active_runs
                    and self.active_runs[benchmark_run_id].get("status")
                    == "cancelled"
                ):
                    logger.info(
                        f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                    )
                    break

                logger.info(
                    f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
                )
                try:
                    # Add username and password to task for metrics tracking
                    task["username"] = benchmark_data.get("username")
                    task["user_password"] = benchmark_data.get("user_password")

                    # Acquire the global research semaphore so benchmark
                    # tasks count against the server-wide concurrency limit
                    _global_research_semaphore.acquire()
                    try:
                        # Process single task
                        result = self._process_benchmark_task(
                            task,
                            search_config,
                            evaluation_config,
                        )
                    finally:
                        _global_research_semaphore.release()

                    # Store result in memory for now (will be saved later)
                    if "results" not in self.active_runs[benchmark_run_id]:
                        self.active_runs[benchmark_run_id]["results"] = []
                    self.active_runs[benchmark_run_id]["results"].append(result)

                    # Update progress
                    progress_info["completed_examples"] += 1

                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                        f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                    )

                    # Send real-time update
                    self._send_progress_update(
                        benchmark_run_id,
                        progress_info["completed_examples"],
                        progress_info["total_examples"],
                    )

                except Exception as e:
                    logger.exception(f"Error processing task {i}")
                    progress_info["failed_examples"] += 1
                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                        f"Total failed: {progress_info['failed_examples']}"
                    )

                    # Check if this is a rate limiting error
                    error_str = str(e).lower()
                    if (
                        "403" in error_str
                        or "rate limit" in error_str
                        or "forbidden" in error_str
                    ):
                        self.rate_limit_detected[benchmark_run_id] = True
                        # Send rate limit warning via WebSocket
                        self.socket_service.emit_to_subscribers(
                            "research_progress",
                            benchmark_run_id,
                            {
                                "rate_limit_detected": True,
                                "message": "SearXNG rate limiting detected",
                            },
                        )

            # Mark as completed in memory tracker
            progress_info["end_time"] = datetime.now(UTC)

            # Check if benchmark was cancelled (either before or mid-loop)
            was_cancelled = (
                benchmark_run_id in self.active_runs
                and self.active_runs[benchmark_run_id].get("status")
                == "cancelled"
            )

            if was_cancelled:
                status = BenchmarkStatus.CANCELLED
                message = "Benchmark cancelled by user"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.CANCELLED
                    )
            else:
                status = BenchmarkStatus.COMPLETED
                message = "Benchmark completed successfully"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.COMPLETED
                    )

            # Store completion info for later database update
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": status,
                "end_time": progress_info["end_time"],
                "completed_examples": progress_info["completed_examples"],
                "failed_examples": progress_info["failed_examples"],
            }

            # Send completion notification
            self.socket_service.emit_to_subscribers(
                "research_progress",
                benchmark_run_id,
                {
                    "status": "cancelled" if was_cancelled else "completed",
                    "message": message,
                    "progress": (
                        progress_info["completed_examples"]
                        / progress_info["total_examples"]
                        * 100
                    )
                    if progress_info["total_examples"] > 0
                    else 0,
                    "benchmark_run_id": benchmark_run_id,
                },
            )

        except Exception as e:
            logger.exception(f"Benchmark run {benchmark_run_id} failed")
            # Update task status if we have a task_id
            if task_id:
                self.queue_tracker.update_task_status(
                    task_id, BenchmarkTaskStatus.FAILED
                )
            # Store failure info for later database update
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["completion_info"] = {
                    "status": BenchmarkStatus.FAILED,
                    "error_message": str(e),
                }
        finally:
            # Clean up active run tracking
            if benchmark_run_id in self.active_runs:
                # Mark that thread is done but keep data for database update
                self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to database immediately if possible
            self._sync_results_to_database(benchmark_run_id)

602 

603 def _create_task_queue( 

604 self, 

605 datasets_config: Dict, 

606 existing_results: Dict, 

607 benchmark_run_id: int, 

608 ) -> List[Dict]: 

609 """Create list of tasks to process, excluding existing results.""" 

610 tasks: List[Dict[str, Any]] = [] 

611 

612 for dataset_name, config in datasets_config.items(): 

613 if config.get("count", 0) > 0: 

614 dataset = load_dataset( 

615 dataset_type=dataset_name, 

616 num_examples=config["count"], 

617 seed=None, 

618 ) 

619 

620 for i, example in enumerate(dataset): 

621 # Extract question based on dataset type 

622 if dataset_name.lower() == "simpleqa": 

623 question = example.get("problem", "") 

624 correct_answer = example.get("answer", "") 

625 else: # browsecomp 

626 question = example.get("problem", "") 

627 correct_answer = example.get("answer", "") 

628 

629 # Generate query hash 

630 query_hash = self.generate_query_hash( 

631 question, dataset_name 

632 ) 

633 

634 # Skip if already processed 

635 if query_hash in existing_results: 

636 continue 

637 

638 tasks.append( 

639 { 

640 "benchmark_run_id": benchmark_run_id, 

641 "example_id": example.get("id", f"example_{i}"), 

642 "dataset_type": dataset_name, 

643 "question": question, 

644 "correct_answer": correct_answer, 

645 "query_hash": query_hash, 

646 "task_index": len(tasks), 

647 } 

648 ) 

649 

650 return tasks 

651 

    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task.

        Runs research via ``quick_summary``, extracts an answer from the
        response, then grades it with ``grade_single_result``. Failures in
        grading are recorded in the result rather than raised; failures in
        research produce a result dict carrying ``research_error``.

        Args:
            task: Task dict produced by ``_create_task_queue`` (plus the
                ``username``/``user_password`` keys added by the caller).
            search_config: Parameters forwarded to ``quick_summary``.
            evaluation_config: Parameters forwarded to the grader.

        Returns:
            The task dict augmented with response, extracted answer,
            grading fields, timing, sources, and a tracking research_id —
            or, on research failure, with ``research_error``.
        """
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                # Closure over `task` and `self`; errors here are swallowed
                # so progress reporting can never fail the research itself.
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            # extract_answer_from_response may return a dict or a bare value;
            # normalize either shape to a string answer.
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    # Grading returned an error (or nothing) — record the
                    # failure but keep the research result itself.
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }

893 

    def sync_pending_results(
        self, benchmark_run_id: int, username: Optional[str] = None
    ):
        """Sync any pending results to database. Can be called from main thread.

        Persists every in-memory result for *benchmark_run_id* that has
        not yet been saved, skipping rows that already exist in the
        database (matched by run id + query hash).

        Args:
            benchmark_run_id: Run whose in-memory results should be saved.
            username: Optional override; defaults to the username stored
                with the run's data.

        Returns:
            Number of newly saved results (0 if the run is not active).
        """
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        # Indices into results_to_save that were already persisted.
        saved_indices = run_data.get("saved_indices", set())

        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                if saved_count > 0:
                    session.commit()
                    # Record progress only after a successful commit.
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )
            # Roll back the session on error to prevent PendingRollbackError
            # NOTE(review): `session` may be unbound/closed here if the
            # context manager itself failed — the inner try covers that.
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after sync error")

        return saved_count

977 

    def _sync_results_to_database(self, benchmark_run_id: int):
        """Sync benchmark results from memory to database after thread completes.

        No-op unless the worker thread has set ``thread_complete`` on the
        in-memory run data. Writes the final run status from
        ``completion_info``, persists any results not already saved (indices
        tracked in ``saved_indices``), computes final accuracy and
        processing-rate metrics for COMPLETED runs, commits, and finally
        drops the run from ``self.active_runs``.
        """
        if benchmark_run_id not in self.active_runs:
            return

        run_data = self.active_runs[benchmark_run_id]
        # Only sync once the worker thread has finished producing results.
        if not run_data.get("thread_complete"):
            return

        username = run_data.get("data", {}).get("username")
        user_password = run_data.get("data", {}).get("user_password")
        from ...database.session_context import get_user_db_session

        try:
            with get_user_db_session(username, user_password) as session:
                # Update benchmark run status
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )

                if benchmark_run and "completion_info" in run_data:
                    info = run_data["completion_info"]
                    benchmark_run.status = info["status"]
                    benchmark_run.end_time = info.get(
                        "end_time", datetime.now(UTC)
                    )
                    benchmark_run.completed_examples = info.get(
                        "completed_examples", 0
                    )
                    benchmark_run.failed_examples = info.get(
                        "failed_examples", 0
                    )
                    benchmark_run.error_message = info.get("error_message")

                    # Save all results (skip already saved ones)
                    saved_indices = run_data.get("saved_indices", set())
                    for idx, result in enumerate(run_data.get("results", [])):
                        if idx in saved_indices:
                            continue
                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)

                    # Calculate final accuracy
                    if benchmark_run.status == BenchmarkStatus.COMPLETED:
                        # Accuracy is computed over graded results only;
                        # ungraded (is_correct is None) results are excluded.
                        correct_results = [
                            r
                            for r in run_data.get("results", [])
                            if r.get("is_correct")
                        ]
                        evaluated_results = [
                            r
                            for r in run_data.get("results", [])
                            if r.get("is_correct") is not None
                        ]

                        if evaluated_results:
                            benchmark_run.overall_accuracy = (
                                len(correct_results) / len(evaluated_results)
                            ) * 100

                            # Calculate processing rate (examples per minute)
                            total_time = sum(
                                r.get("processing_time", 0)
                                for r in evaluated_results
                            )
                            if total_time > 0:
                                benchmark_run.processing_rate = len(
                                    evaluated_results
                                ) / (total_time / 60)

                    session.commit()
                    logger.info(
                        f"Successfully synced results for benchmark {benchmark_run_id}"
                    )

                # Clean up memory
                # NOTE: runs even when the run row or completion_info was
                # missing, so stale entries cannot accumulate.
                del self.active_runs[benchmark_run_id]

        except Exception:
            logger.exception("Error syncing benchmark results to database")

1080 

1081 def _send_progress_update( 

1082 self, benchmark_run_id: int, completed: int, total: int 

1083 ): 

1084 """Send real-time progress update via websocket.""" 

1085 try: 

1086 percentage = (completed / total * 100) if total > 0 else 0 

1087 

1088 # Create log entry for milestone progress 

1089 log_entry = { 

1090 "time": datetime.now(UTC).isoformat(), 

1091 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)", 

1092 "progress": percentage, 

1093 "metadata": { 

1094 "phase": "benchmark_progress", 

1095 "type": "milestone", 

1096 "completed": completed, 

1097 "total": total, 

1098 "benchmark_run_id": benchmark_run_id, 

1099 }, 

1100 } 

1101 

1102 progress_data = { 

1103 "status": "in_progress", 

1104 "message": f"Processing examples: {completed}/{total}", 

1105 "progress": percentage, 

1106 "completed": completed, 

1107 "total": total, 

1108 "benchmark_run_id": benchmark_run_id, 

1109 "log_entry": log_entry, 

1110 "progress_log": json.dumps([log_entry]), 

1111 } 

1112 

1113 self.socket_service.emit_to_subscribers( 

1114 "research_progress", benchmark_run_id, progress_data 

1115 ) 

1116 

1117 except Exception: 

1118 logger.exception("Error sending progress update") 

1119 

1120 def _calculate_final_accuracy( 

1121 self, benchmark_run_id: int, username: Optional[str] = None 

1122 ): 

1123 """Calculate and save final accuracy metrics.""" 

1124 from ...database.session_context import get_user_db_session 

1125 

1126 with get_user_db_session(username) as session: 

1127 try: 

1128 # Get all results for this run 

1129 results = ( 

1130 session.query(BenchmarkResult) 

1131 .filter( 

1132 BenchmarkResult.benchmark_run_id == benchmark_run_id 

1133 ) 

1134 .filter(BenchmarkResult.is_correct.isnot(None)) 

1135 .all() 

1136 ) 

1137 

1138 if results: 

1139 correct_count = sum(1 for r in results if r.is_correct) 

1140 overall_accuracy = (correct_count / len(results)) * 100 

1141 

1142 # Calculate processing rate 

1143 total_time = sum(r.processing_time or 0 for r in results) 

1144 processing_rate = ( 

1145 (len(results) / (total_time / 60)) 

1146 if total_time > 0 

1147 else 0 

1148 ) 

1149 

1150 # Update benchmark run 

1151 benchmark_run = ( 

1152 session.query(BenchmarkRun) 

1153 .filter(BenchmarkRun.id == benchmark_run_id) 

1154 .first() 

1155 ) 

1156 if benchmark_run: 1156 ↛ exitline 1156 didn't jump to the function exit

1157 benchmark_run.overall_accuracy = overall_accuracy 

1158 benchmark_run.processing_rate = processing_rate 

1159 session.commit() 

1160 

1161 except Exception: 

1162 logger.exception("Error calculating final accuracy") 

1163 

1164 def update_benchmark_status( 

1165 self, 

1166 benchmark_run_id: int, 

1167 status: BenchmarkStatus, 

1168 error_message: Optional[str] = None, 

1169 username: Optional[str] = None, 

1170 ): 

1171 """Update benchmark run status.""" 

1172 from ...database.session_context import get_user_db_session 

1173 

1174 with get_user_db_session(username) as session: 

1175 try: 

1176 benchmark_run = ( 

1177 session.query(BenchmarkRun) 

1178 .filter(BenchmarkRun.id == benchmark_run_id) 

1179 .first() 

1180 ) 

1181 if benchmark_run: 

1182 benchmark_run.status = status 

1183 benchmark_run.updated_at = datetime.now(UTC) 

1184 

1185 if error_message: 

1186 benchmark_run.error_message = error_message 

1187 

1188 if ( 

1189 status == BenchmarkStatus.IN_PROGRESS 

1190 and not benchmark_run.start_time 

1191 ): 

1192 benchmark_run.start_time = datetime.now(UTC) 

1193 elif ( 1193 ↛ 1200line 1193 didn't jump to line 1200 because the condition on line 1193 was always true

1194 status 

1195 in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED] 

1196 and not benchmark_run.end_time 

1197 ): 

1198 benchmark_run.end_time = datetime.now(UTC) 

1199 

1200 session.commit() 

1201 

1202 except Exception: 

1203 session.rollback() 

1204 logger.exception("Error updating benchmark status") 

1205 

    def get_benchmark_status(
        self, benchmark_run_id: int, username: Optional[str] = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run.

        Builds a status snapshot from the BenchmarkRun row and its graded
        BenchmarkResult rows: running accuracy (possibly including results
        reused from compatible runs with the same config hash), per-dataset
        accuracies, timing estimates, and a 95% confidence interval once at
        least three results are graded.

        Args:
            benchmark_run_id: Run to report on.
            username: Database owner for the session lookup.

        Returns:
            A dict of status fields, or ``None`` if the run does not exist
            or a database error occurs.
        """
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused results from compatible runs
                # First get results specifically for this benchmark run
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash)
                # Only count results up to the number we say we've "completed"
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
                # Requires at least 3 graded results for a meaningful interval.
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    _running_acc = running_accuracy or 0.0
                    accuracy_confidence = {
                        "lower_bound": max(0, _running_acc - margin_of_error),
                        "upper_bound": min(100, _running_acc + margin_of_error),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None

1415 

1416 def cancel_benchmark( 

1417 self, benchmark_run_id: int, username: Optional[str] = None 

1418 ) -> bool: 

1419 """Cancel a running benchmark.""" 

1420 try: 

1421 if benchmark_run_id in self.active_runs: 

1422 self.active_runs[benchmark_run_id]["status"] = "cancelled" 

1423 

1424 self.update_benchmark_status( 

1425 benchmark_run_id, BenchmarkStatus.CANCELLED, username=username 

1426 ) 

1427 logger.info(f"Cancelled benchmark run {benchmark_run_id}") 

1428 return True 

1429 

1430 except Exception: 

1431 logger.exception(f"Error cancelling benchmark {benchmark_run_id}") 

1432 return False 

1433 

1434 

# Global service instance
# Created at import time and shared by the web API routes as a process-wide
# singleton; all in-memory run state lives on this one object.
benchmark_service = BenchmarkService()