Coverage for src / local_deep_research / benchmarks / web_api / benchmark_service.py: 45%

485 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1"""Benchmark service for handling web-based benchmark execution.""" 

2 

3import hashlib 

4import json 

5import threading 

6import time 

7from datetime import UTC, datetime 

8from enum import Enum 

9from typing import Any, Dict, List, Optional 

10 

11from loguru import logger 

12 

13from ...api.research_functions import quick_summary 

14from ...database.models.benchmark import ( 

15 BenchmarkResult, 

16 BenchmarkRun, 

17 BenchmarkStatus, 

18 DatasetType, 

19) 

20from ...web.services.socket_service import SocketIOService 

21from ..datasets import load_dataset 

22from ..graders import extract_answer_from_response, grade_single_result 

23from ..runners import format_query 

24 

25 

class BenchmarkTaskStatus(Enum):
    """Lifecycle states for a benchmark task held in the in-memory queue tracker.

    The string values are what gets stored in the tracker's task dicts
    (see BenchmarkQueueTracker), so they must remain stable.
    """

    QUEUED = "queued"  # registered, waiting to be picked up
    PROCESSING = "processing"  # currently being executed
    COMPLETED = "completed"  # finished successfully (terminal)
    FAILED = "failed"  # ended with an error (terminal)
    CANCELLED = "cancelled"  # stopped at the user's request (terminal)

34 

35 

36class BenchmarkQueueTracker: 

37 """Simple in-memory tracker for benchmark queue status. 

38 

39 This replaces the removed memory_queue functionality for benchmarks. 

40 Since benchmarks are temporary and don't need persistence, 

41 this simple in-memory solution is sufficient. 

42 

43 Thread-safe for concurrent access from multiple benchmark threads. 

44 """ 

45 

46 def __init__(self): 

47 self.tasks = {} 

48 self._lock = threading.Lock() 

49 

50 def add_task( 

51 self, task_id: str, username: str, task_type: str = "benchmark" 

52 ): 

53 """Add a new task to tracking. 

54 

55 Also performs opportunistic cleanup of old completed tasks. 

56 """ 

57 # Cleanup old tasks before adding new one (outside lock for better performance) 

58 self.cleanup_completed_tasks() 

59 

60 with self._lock: 

61 self.tasks[task_id] = { 

62 "username": username, 

63 "task_type": task_type, 

64 "status": BenchmarkTaskStatus.QUEUED.value, 

65 "created_at": datetime.now(UTC), 

66 } 

67 

68 def update_task_status(self, task_id: str, status: BenchmarkTaskStatus): 

69 """Update the status of a task.""" 

70 with self._lock: 

71 if task_id in self.tasks: 

72 self.tasks[task_id]["status"] = status.value 

73 self.tasks[task_id]["updated_at"] = datetime.now(UTC) 

74 else: 

75 logger.warning( 

76 f"Attempted to update status for non-existent task: {task_id}" 

77 ) 

78 

79 def get_task_status(self, task_id: str) -> Optional[Dict]: 

80 """Get the current status of a task.""" 

81 with self._lock: 

82 return self.tasks.get(task_id) 

83 

84 def remove_task(self, task_id: str): 

85 """Remove a task from tracking.""" 

86 with self._lock: 

87 self.tasks.pop(task_id, None) 

88 

89 def cleanup_completed_tasks(self, max_age_seconds: int = 3600): 

90 """Remove completed tasks older than max_age_seconds. 

91 

92 Args: 

93 max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour) 

94 """ 

95 with self._lock: 

96 now = datetime.now(UTC) 

97 to_remove = [] 

98 for task_id, task_data in self.tasks.items(): 

99 # Only cleanup completed, failed, or cancelled tasks 

100 if task_data["status"] in [ 

101 BenchmarkTaskStatus.COMPLETED.value, 

102 BenchmarkTaskStatus.FAILED.value, 

103 BenchmarkTaskStatus.CANCELLED.value, 

104 ]: 

105 # Check if task has updated_at timestamp 

106 updated_at = task_data.get( 

107 "updated_at", task_data.get("created_at") 

108 ) 

109 if updated_at: 109 ↛ 98line 109 didn't jump to line 98 because the condition on line 109 was always true

110 age = (now - updated_at).total_seconds() 

111 if age > max_age_seconds: 

112 to_remove.append(task_id) 

113 

114 for task_id in to_remove: 

115 self.tasks.pop(task_id, None) 

116 logger.debug(f"Cleaned up old task: {task_id}") 

117 

118 if to_remove: 

119 logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks") 

120 

121 

122class BenchmarkService: 

123 """Service for managing benchmark runs through the web interface.""" 

124 

125 def __init__(self, socket_service=None): 

126 self.active_runs: Dict[int, Dict] = {} 

127 self.socket_service = socket_service or self._get_socket_service() 

128 self.rate_limit_detected: Dict[ 

129 int, bool 

130 ] = {} # Track rate limiting per benchmark run 

131 self.queue_tracker = BenchmarkQueueTracker() # Initialize queue tracker 

132 

133 def _get_socket_service(self): 

134 """Get socket service instance, handling cases where Flask app is not available.""" 

135 try: 

136 return SocketIOService() 

137 except Exception: 

138 # Return a mock socket service for testing/standalone use 

139 class MockSocketService: 

140 def emit_to_room(self, room, event, data): 

141 pass 

142 

143 return MockSocketService() 

144 

145 def generate_config_hash(self, search_config: Dict[str, Any]) -> str: 

146 """Generate a hash for search configuration compatibility checking.""" 

147 relevant_params = { 

148 "iterations": search_config.get("iterations"), 

149 "questions_per_iteration": search_config.get( 

150 "questions_per_iteration" 

151 ), 

152 "search_tool": search_config.get("search_tool"), 

153 "search_strategy": search_config.get("search_strategy"), 

154 "model_name": search_config.get("model_name"), 

155 "provider": search_config.get("provider"), 

156 } 

157 # Remove None values 

158 relevant_params = { 

159 k: v for k, v in relevant_params.items() if v is not None 

160 } 

161 config_str = json.dumps(relevant_params, sort_keys=True) 

162 return hashlib.md5( # DevSkim: ignore DS126858 

163 config_str.encode(), usedforsecurity=False 

164 ).hexdigest()[:8] 

165 

166 def generate_query_hash(self, question: str, dataset_type: str) -> str: 

167 """Generate a hash for a query to enable deduplication.""" 

168 query_content = f"{question.strip()}|{dataset_type.lower()}" 

169 return hashlib.md5( # DevSkim: ignore DS126858 

170 query_content.encode(), usedforsecurity=False 

171 ).hexdigest() 

172 

    def create_benchmark_run(
        self,
        run_name: Optional[str],
        search_config: Dict[str, Any],
        evaluation_config: Dict[str, Any],
        datasets_config: Dict[str, Dict],
        username: Optional[str] = None,
        user_password: Optional[str] = None,
    ) -> int:
        """Create a new benchmark run in the database.

        Args:
            run_name: Optional human-readable label for the run.
            search_config: Search parameters; hashed into the run's
                config_hash for result-compatibility checks.
            evaluation_config: Grading configuration stored on the run.
            datasets_config: Mapping of dataset name to its config; each
                value's "count" contributes to total_examples.
            username: Owner whose database the run is written to.
            user_password: Password used to open the user database session.

        Returns:
            The database id of the newly created BenchmarkRun row.

        Raises:
            Exception: any database error is re-raised after rollback.
        """
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, user_password) as session:
            try:
                config_hash = self.generate_config_hash(search_config)

                # Total workload = sum of requested example counts across datasets.
                total_examples = sum(
                    config.get("count", 0)
                    for config in datasets_config.values()
                )

                benchmark_run = BenchmarkRun(
                    run_name=run_name,
                    config_hash=config_hash,
                    query_hash_list=[],  # Will be populated as we process
                    search_config=search_config,
                    evaluation_config=evaluation_config,
                    datasets_config=datasets_config,
                    total_examples=total_examples,
                    status=BenchmarkStatus.PENDING,
                )

                session.add(benchmark_run)
                session.commit()

                # NOTE(review): benchmark_run.id is read after commit; assumes
                # the session does not expire attributes on commit — confirm.
                logger.info(
                    f"Created benchmark run {benchmark_run.id} with config hash {config_hash}"
                )
                return benchmark_run.id

            except Exception:
                session.rollback()
                logger.exception("Error creating benchmark run")
                raise

218 

219 def get_existing_results( 

220 self, config_hash: str, username: str = None, user_password: str = None 

221 ) -> Dict[str, Dict]: 

222 """Get existing results with compatible configuration.""" 

223 from ...database.session_context import get_user_db_session 

224 

225 with get_user_db_session(username, user_password) as session: 

226 try: 

227 # Find compatible runs 

228 compatible_runs = ( 

229 session.query(BenchmarkRun) 

230 .filter(BenchmarkRun.config_hash == config_hash) 

231 .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED) 

232 .all() 

233 ) 

234 

235 existing_results = {} 

236 for run in compatible_runs: 

237 results = ( 

238 session.query(BenchmarkResult) 

239 .filter(BenchmarkResult.benchmark_run_id == run.id) 

240 .filter( 

241 BenchmarkResult.is_correct.isnot(None) 

242 ) # Only completed evaluations 

243 .all() 

244 ) 

245 

246 for result in results: 

247 existing_results[result.query_hash] = { 

248 "id": result.example_id, 

249 "dataset_type": result.dataset_type.value, 

250 "problem": result.question, 

251 "correct_answer": result.correct_answer, 

252 "response": result.response, 

253 "extracted_answer": result.extracted_answer, 

254 "confidence": result.confidence, 

255 "processing_time": result.processing_time, 

256 "sources": result.sources, 

257 "is_correct": result.is_correct, 

258 "graded_confidence": result.graded_confidence, 

259 "grader_response": result.grader_response, 

260 "query_hash": result.query_hash, 

261 } 

262 

263 logger.info( 

264 f"Found {len(existing_results)} existing results for config hash {config_hash}" 

265 ) 

266 return existing_results 

267 

268 except Exception: 

269 logger.exception("Error loading existing results") 

270 return {} 

271 

    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: Optional[str] = None,
        user_password: Optional[str] = None,
    ) -> bool:
        """Start a benchmark run in a background thread.

        All database access happens here, in the calling thread: the run's
        configuration, a settings snapshot, and the session password are
        gathered into ``self.active_runs`` so the worker thread never opens
        the user database itself.

        Args:
            benchmark_run_id: Id of a previously created BenchmarkRun row.
            username: Owner of the run; used to open the user database.
            user_password: Password for the initial database session.

        Returns:
            True if the background thread was started; False on error (the
            run is then marked FAILED in the database where possible).
        """
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                # NOTE(review): this deliberately overwrites the user_password
                # parameter with the password stored for the current Flask
                # session — confirm the parameter value is not needed below.
                user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    user_password = session_password_store.get_session_password(
                        username, session_id
                    )
                    if not user_password:
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database
            # NOTE(review): user_password may have been reset above, so this
            # session open may use a different credential than the caller's.
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False

372 

373 def _run_benchmark_thread(self, benchmark_run_id: int): 

374 """Main benchmark execution thread.""" 

375 # IMPORTANT: This runs in a background thread, so we cannot access the user database 

376 # Using in-memory queue tracker for benchmark status tracking 

377 

378 task_id = None 

379 try: 

380 # Get the benchmark data that was passed to us 

381 # We need to retrieve this from the service database or from memory 

382 benchmark_data = self.active_runs.get(benchmark_run_id, {}).get( 

383 "data" 

384 ) 

385 

386 if not benchmark_data: 

387 raise ValueError( 

388 f"Benchmark data for run {benchmark_run_id} not found" 

389 ) 

390 

391 # Set up settings context for thread-local access 

392 settings_snapshot = benchmark_data.get("settings_snapshot", {}) 

393 username = benchmark_data.get("username", "benchmark_user") 

394 

395 # Create a settings context that threads can use 

396 class SettingsContext: 

397 def __init__(self, snapshot, username): 

398 self.snapshot = snapshot or {} 

399 self.username = username 

400 # Extract values from setting objects if needed 

401 self.values = {} 

402 for key, setting in self.snapshot.items(): 

403 if isinstance(setting, dict) and "value" in setting: 

404 # It's a full setting object, extract the value 

405 self.values[key] = setting["value"] 

406 else: 

407 # It's already just a value 

408 self.values[key] = setting 

409 

410 def get_setting(self, key, default=None): 

411 """Get setting from snapshot only - no database access in threads""" 

412 if key in self.values: 

413 return self.values[key] 

414 # No fallback to database - threads must use snapshot only 

415 logger.warning( 

416 f"Setting '{key}' not found in snapshot, using default" 

417 ) 

418 return default 

419 

420 settings_context = SettingsContext(settings_snapshot, username) 

421 

422 # Set the context in thread-local storage 

423 from ...config.thread_settings import ( 

424 clear_settings_context, 

425 set_settings_context, 

426 ) 

427 

428 set_settings_context(settings_context) 

429 

430 # Extract all the data we need 

431 datasets_config = benchmark_data["datasets_config"] 

432 search_config = benchmark_data["search_config"] 

433 evaluation_config = benchmark_data["evaluation_config"] 

434 existing_results = benchmark_data.get("existing_results", {}) 

435 

436 # Create task queue 

437 task_queue = self._create_task_queue( 

438 datasets_config, 

439 existing_results, 

440 benchmark_run_id, 

441 ) 

442 

443 # Calculate totals 

444 total_examples = len(task_queue) + len(existing_results) 

445 completed_examples = len(existing_results) 

446 

447 # Initialize task tracking 

448 task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}" 

449 username = benchmark_data.get("username", "benchmark_user") 

450 self.queue_tracker.add_task(task_id, username, "benchmark") 

451 self.queue_tracker.update_task_status( 

452 task_id, BenchmarkTaskStatus.PROCESSING 

453 ) 

454 

455 # Track progress in memory 

456 progress_info = { 

457 "total_examples": total_examples, 

458 "completed_examples": completed_examples, 

459 "failed_examples": 0, 

460 "start_time": datetime.now(UTC), 

461 } 

462 

463 # Process tasks 

464 logger.info( 

465 f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks" 

466 ) 

467 for i, task in enumerate(task_queue): 

468 # Check if benchmark has been cancelled 

469 if ( 

470 benchmark_run_id in self.active_runs 

471 and self.active_runs[benchmark_run_id].get("status") 

472 == "cancelled" 

473 ): 

474 logger.info( 

475 f"Benchmark {benchmark_run_id} was cancelled, stopping processing" 

476 ) 

477 break 

478 

479 logger.info( 

480 f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}" 

481 ) 

482 try: 

483 # Add username and password to task for metrics tracking 

484 task["username"] = benchmark_data.get("username") 

485 task["user_password"] = benchmark_data.get("user_password") 

486 

487 # Process single task 

488 result = self._process_benchmark_task( 

489 task, 

490 search_config, 

491 evaluation_config, 

492 ) 

493 

494 # Store result in memory for now (will be saved later) 

495 if "results" not in self.active_runs[benchmark_run_id]: 

496 self.active_runs[benchmark_run_id]["results"] = [] 

497 self.active_runs[benchmark_run_id]["results"].append(result) 

498 

499 # Update progress 

500 progress_info["completed_examples"] += 1 

501 

502 logger.info( 

503 f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. " 

504 f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples" 

505 ) 

506 

507 # Send real-time update 

508 self._send_progress_update( 

509 benchmark_run_id, 

510 progress_info["completed_examples"], 

511 progress_info["total_examples"], 

512 ) 

513 

514 except Exception as e: 

515 logger.exception(f"Error processing task {i}") 

516 progress_info["failed_examples"] += 1 

517 logger.info( 

518 f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. " 

519 f"Total failed: {progress_info['failed_examples']}" 

520 ) 

521 

522 # Check if this is a rate limiting error 

523 error_str = str(e).lower() 

524 if ( 

525 "403" in error_str 

526 or "rate limit" in error_str 

527 or "forbidden" in error_str 

528 ): 

529 self.rate_limit_detected[benchmark_run_id] = True 

530 # Send rate limit warning via WebSocket 

531 self.socket_service.emit_to_subscribers( 

532 "research_progress", 

533 benchmark_run_id, 

534 { 

535 "rate_limit_detected": True, 

536 "message": "SearXNG rate limiting detected", 

537 }, 

538 ) 

539 

540 # Mark as completed in memory tracker 

541 progress_info["end_time"] = datetime.now(UTC) 

542 

543 # Check if benchmark was cancelled 

544 was_cancelled = ( 

545 benchmark_run_id in self.active_runs 

546 and self.active_runs[benchmark_run_id].get("status") 

547 == "cancelled" 

548 ) 

549 

550 if was_cancelled: 

551 status = BenchmarkStatus.CANCELLED 

552 message = "Benchmark cancelled by user" 

553 if task_id: 

554 self.queue_tracker.update_task_status( 

555 task_id, BenchmarkTaskStatus.CANCELLED 

556 ) 

557 else: 

558 status = BenchmarkStatus.COMPLETED 

559 message = "Benchmark completed successfully" 

560 if task_id: 

561 self.queue_tracker.update_task_status( 

562 task_id, BenchmarkTaskStatus.COMPLETED 

563 ) 

564 

565 # Store completion info for later database update 

566 self.active_runs[benchmark_run_id]["completion_info"] = { 

567 "status": status, 

568 "end_time": progress_info["end_time"], 

569 "completed_examples": progress_info["completed_examples"], 

570 "failed_examples": progress_info["failed_examples"], 

571 } 

572 

573 # Send completion notification 

574 self.socket_service.emit_to_subscribers( 

575 "research_progress", 

576 benchmark_run_id, 

577 { 

578 "status": "cancelled" if was_cancelled else "completed", 

579 "message": message, 

580 "progress": ( 

581 progress_info["completed_examples"] 

582 / progress_info["total_examples"] 

583 * 100 

584 ) 

585 if progress_info["total_examples"] > 0 

586 else 0, 

587 "benchmark_run_id": benchmark_run_id, 

588 }, 

589 ) 

590 

591 except Exception as e: 

592 logger.exception(f"Benchmark run {benchmark_run_id} failed") 

593 # Update task status if we have a task_id 

594 if task_id: 

595 self.queue_tracker.update_task_status( 

596 task_id, BenchmarkTaskStatus.FAILED 

597 ) 

598 # Store failure info for later database update 

599 if benchmark_run_id in self.active_runs: 

600 self.active_runs[benchmark_run_id]["completion_info"] = { 

601 "status": BenchmarkStatus.FAILED, 

602 "error_message": str(e), 

603 } 

604 finally: 

605 # Clear thread-local settings context to prevent leaks 

606 clear_settings_context() 

607 

608 # Clean up active run tracking 

609 if benchmark_run_id in self.active_runs: 

610 # Mark that thread is done but keep data for database update 

611 self.active_runs[benchmark_run_id]["thread_complete"] = True 

612 

613 # Try to save results to database immediately if possible 

614 self._sync_results_to_database(benchmark_run_id) 

615 

616 def _create_task_queue( 

617 self, 

618 datasets_config: Dict, 

619 existing_results: Dict, 

620 benchmark_run_id: int, 

621 ) -> List[Dict]: 

622 """Create list of tasks to process, excluding existing results.""" 

623 tasks = [] 

624 

625 for dataset_name, config in datasets_config.items(): 

626 if config.get("count", 0) > 0: 626 ↛ 625line 626 didn't jump to line 625 because the condition on line 626 was always true

627 dataset = load_dataset( 

628 dataset_type=dataset_name, 

629 num_examples=config["count"], 

630 seed=None, 

631 ) 

632 

633 for i, example in enumerate(dataset): 

634 # Extract question based on dataset type 

635 if dataset_name.lower() == "simpleqa": 635 ↛ 639line 635 didn't jump to line 639 because the condition on line 635 was always true

636 question = example.get("problem", "") 

637 correct_answer = example.get("answer", "") 

638 else: # browsecomp 

639 question = example.get("problem", "") 

640 correct_answer = example.get("answer", "") 

641 

642 # Generate query hash 

643 query_hash = self.generate_query_hash( 

644 question, dataset_name 

645 ) 

646 

647 # Skip if already processed 

648 if query_hash in existing_results: 

649 continue 

650 

651 tasks.append( 

652 { 

653 "benchmark_run_id": benchmark_run_id, 

654 "example_id": example.get("id", f"example_{i}"), 

655 "dataset_type": dataset_name, 

656 "question": question, 

657 "correct_answer": correct_answer, 

658 "query_hash": query_hash, 

659 "task_index": len(tasks), 

660 } 

661 ) 

662 

663 return tasks 

664 

    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task.

        Runs the research query for one example, extracts and grades the
        answer, and returns a result dict (a superset of ``task``). Research
        failures are returned with a "research_error" key; evaluation
        failures set "evaluation_error" but keep the research output.

        Args:
            task: Task dict from _create_task_queue (plus username/password).
            search_config: Search parameters (iterations, model, provider...).
            evaluation_config: Grading configuration passed to the grader.

        Returns:
            Result dict ready for persistence via sync_pending_results.
        """
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                # Closure over `task` and `self`; must never raise into the
                # research pipeline, hence the blanket try/except below.
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            # extract_answer_from_response may return a dict or a bare value;
            # both shapes are handled below.
            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    # Grading returned nothing or flagged an error; record it
                    # but keep the research result (is_correct left as None).
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            # Research itself failed; return a minimal result so the run can
            # record the failure against this example.
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }

906 

    def sync_pending_results(
        self, benchmark_run_id: int, username: Optional[str] = None
    ):
        """Sync any pending results to database. Can be called from main thread.

        Saves in-memory results from ``self.active_runs`` that have not yet
        been persisted, deduplicating both by the in-memory "saved_indices"
        set and by a per-run query_hash lookup in the database.

        Args:
            benchmark_run_id: Run whose in-memory results should be saved.
            username: Database owner; falls back to the run's stored username.

        Returns:
            Number of newly saved results (0 when the run is not active or
            an error occurred before any save).
        """
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        saved_indices = run_data.get("saved_indices", set())

        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                # Commit only when something new was added; the updated
                # saved_indices set is written back so future calls skip
                # already-persisted results.
                if saved_count > 0:
                    session.commit()
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )
            # Roll back the session on error to prevent PendingRollbackError
            # NOTE(review): `session` may be unbound/closed here depending on
            # where the failure occurred; the inner try absorbs that case.
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after sync error")

        return saved_count

988 

989 def _sync_results_to_database(self, benchmark_run_id: int): 

990 """Sync benchmark results from memory to database after thread completes.""" 

991 if benchmark_run_id not in self.active_runs: 

992 return 

993 

994 run_data = self.active_runs[benchmark_run_id] 

995 if not run_data.get("thread_complete"): 

996 return 

997 

998 username = run_data.get("data", {}).get("username") 

999 user_password = run_data.get("data", {}).get("user_password") 

1000 from ...database.session_context import get_user_db_session 

1001 

1002 try: 

1003 with get_user_db_session(username, user_password) as session: 

1004 # Update benchmark run status 

1005 benchmark_run = ( 

1006 session.query(BenchmarkRun) 

1007 .filter(BenchmarkRun.id == benchmark_run_id) 

1008 .first() 

1009 ) 

1010 

1011 if benchmark_run and "completion_info" in run_data: 

1012 info = run_data["completion_info"] 

1013 benchmark_run.status = info["status"] 

1014 benchmark_run.end_time = info.get( 

1015 "end_time", datetime.now(UTC) 

1016 ) 

1017 benchmark_run.completed_examples = info.get( 

1018 "completed_examples", 0 

1019 ) 

1020 benchmark_run.failed_examples = info.get( 

1021 "failed_examples", 0 

1022 ) 

1023 benchmark_run.error_message = info.get("error_message") 

1024 

1025 # Save all results (skip already saved ones) 

1026 saved_indices = run_data.get("saved_indices", set()) 

1027 for idx, result in enumerate(run_data.get("results", [])): 

1028 if idx in saved_indices: 

1029 continue 

1030 benchmark_result = BenchmarkResult( 

1031 benchmark_run_id=benchmark_run_id, 

1032 example_id=result["example_id"], 

1033 query_hash=result["query_hash"], 

1034 dataset_type=DatasetType(result["dataset_type"]), 

1035 research_id=result.get("research_id"), 

1036 question=result["question"], 

1037 correct_answer=result["correct_answer"], 

1038 response=result.get("response"), 

1039 extracted_answer=result.get("extracted_answer"), 

1040 confidence=result.get("confidence"), 

1041 processing_time=result.get("processing_time"), 

1042 sources=result.get("sources"), 

1043 is_correct=result.get("is_correct"), 

1044 graded_confidence=result.get("graded_confidence"), 

1045 grader_response=result.get("grader_response"), 

1046 completed_at=result.get("completed_at"), 

1047 research_error=result.get("research_error"), 

1048 evaluation_error=result.get("evaluation_error"), 

1049 task_index=result.get("task_index"), 

1050 ) 

1051 session.add(benchmark_result) 

1052 

1053 # Calculate final accuracy 

1054 if benchmark_run.status == BenchmarkStatus.COMPLETED: 

1055 correct_results = [ 

1056 r 

1057 for r in run_data.get("results", []) 

1058 if r.get("is_correct") 

1059 ] 

1060 evaluated_results = [ 

1061 r 

1062 for r in run_data.get("results", []) 

1063 if r.get("is_correct") is not None 

1064 ] 

1065 

1066 if evaluated_results: 

1067 benchmark_run.overall_accuracy = ( 

1068 len(correct_results) / len(evaluated_results) 

1069 ) * 100 

1070 

1071 # Calculate processing rate 

1072 total_time = sum( 

1073 r.get("processing_time", 0) 

1074 for r in evaluated_results 

1075 ) 

1076 if total_time > 0: 

1077 benchmark_run.processing_rate = len( 

1078 evaluated_results 

1079 ) / (total_time / 60) 

1080 

1081 session.commit() 

1082 logger.info( 

1083 f"Successfully synced results for benchmark {benchmark_run_id}" 

1084 ) 

1085 

1086 # Clean up memory 

1087 del self.active_runs[benchmark_run_id] 

1088 

1089 except Exception: 

1090 logger.exception("Error syncing benchmark results to database") 

1091 

1092 def _send_progress_update( 

1093 self, benchmark_run_id: int, completed: int, total: int 

1094 ): 

1095 """Send real-time progress update via websocket.""" 

1096 try: 

1097 percentage = (completed / total * 100) if total > 0 else 0 

1098 

1099 # Create log entry for milestone progress 

1100 log_entry = { 

1101 "time": datetime.now(UTC).isoformat(), 

1102 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)", 

1103 "progress": percentage, 

1104 "metadata": { 

1105 "phase": "benchmark_progress", 

1106 "type": "milestone", 

1107 "completed": completed, 

1108 "total": total, 

1109 "benchmark_run_id": benchmark_run_id, 

1110 }, 

1111 } 

1112 

1113 progress_data = { 

1114 "status": "in_progress", 

1115 "message": f"Processing examples: {completed}/{total}", 

1116 "progress": percentage, 

1117 "completed": completed, 

1118 "total": total, 

1119 "benchmark_run_id": benchmark_run_id, 

1120 "log_entry": log_entry, 

1121 "progress_log": json.dumps([log_entry]), 

1122 } 

1123 

1124 self.socket_service.emit_to_subscribers( 

1125 "research_progress", benchmark_run_id, progress_data 

1126 ) 

1127 

1128 except Exception: 

1129 logger.exception("Error sending progress update") 

1130 

1131 def _calculate_final_accuracy( 

1132 self, benchmark_run_id: int, username: str = None 

1133 ): 

1134 """Calculate and save final accuracy metrics.""" 

1135 from ...database.session_context import get_user_db_session 

1136 

1137 with get_user_db_session(username) as session: 

1138 try: 

1139 # Get all results for this run 

1140 results = ( 

1141 session.query(BenchmarkResult) 

1142 .filter( 

1143 BenchmarkResult.benchmark_run_id == benchmark_run_id 

1144 ) 

1145 .filter(BenchmarkResult.is_correct.isnot(None)) 

1146 .all() 

1147 ) 

1148 

1149 if results: 

1150 correct_count = sum(1 for r in results if r.is_correct) 

1151 overall_accuracy = (correct_count / len(results)) * 100 

1152 

1153 # Calculate processing rate 

1154 total_time = sum(r.processing_time or 0 for r in results) 

1155 processing_rate = ( 

1156 (len(results) / (total_time / 60)) 

1157 if total_time > 0 

1158 else 0 

1159 ) 

1160 

1161 # Update benchmark run 

1162 benchmark_run = ( 

1163 session.query(BenchmarkRun) 

1164 .filter(BenchmarkRun.id == benchmark_run_id) 

1165 .first() 

1166 ) 

1167 if benchmark_run: 

1168 benchmark_run.overall_accuracy = overall_accuracy 

1169 benchmark_run.processing_rate = processing_rate 

1170 session.commit() 

1171 

1172 except Exception: 

1173 logger.exception("Error calculating final accuracy") 

1174 

1175 def update_benchmark_status( 

1176 self, 

1177 benchmark_run_id: int, 

1178 status: BenchmarkStatus, 

1179 error_message: str = None, 

1180 username: str = None, 

1181 ): 

1182 """Update benchmark run status.""" 

1183 from ...database.session_context import get_user_db_session 

1184 

1185 with get_user_db_session(username) as session: 

1186 try: 

1187 benchmark_run = ( 

1188 session.query(BenchmarkRun) 

1189 .filter(BenchmarkRun.id == benchmark_run_id) 

1190 .first() 

1191 ) 

1192 if benchmark_run: 1192 ↛ exitline 1192 didn't jump to the function exit

1193 benchmark_run.status = status 

1194 benchmark_run.updated_at = datetime.now(UTC) 

1195 

1196 if error_message: 

1197 benchmark_run.error_message = error_message 

1198 

1199 if ( 

1200 status == BenchmarkStatus.IN_PROGRESS 

1201 and not benchmark_run.start_time 

1202 ): 

1203 benchmark_run.start_time = datetime.now(UTC) 

1204 elif ( 1204 ↛ 1211line 1204 didn't jump to line 1211 because the condition on line 1204 was always true

1205 status 

1206 in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED] 

1207 and not benchmark_run.end_time 

1208 ): 

1209 benchmark_run.end_time = datetime.now(UTC) 

1210 

1211 session.commit() 

1212 

1213 except Exception: 

1214 session.rollback() 

1215 logger.exception("Error updating benchmark status") 

1216 

    def get_benchmark_status(
        self, benchmark_run_id: int, username: str = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run.

        Builds a status snapshot combining:
          * results persisted for this run, plus "reused" graded results
            borrowed from completed runs with the same config hash;
          * overall and per-dataset running accuracy;
          * elapsed/remaining time estimates based on rows already written;
          * a 95% confidence interval on the running accuracy (n >= 3).

        Args:
            benchmark_run_id: ID of the run to report on.
            username: Owner of the user database.

        Returns:
            A dict of status fields (see ``status_data`` below), or None
            when the run does not exist or any error occurs.
        """
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused results from compatible runs
                # First get results specifically for this benchmark run
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash)
                # Only count results up to the number we say we've "completed"
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    # (the slice is redundant after .limit() but harmless)
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    # NOTE(review): assumes start_time is stored
                    # timezone-aware (UTC) — subtraction from
                    # datetime.now(UTC) raises TypeError on naive values.
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
                # Only meaningful with at least 3 graded samples.
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    # running_accuracy is guaranteed non-None here (results
                    # is non-empty), but may be 0.0 — hence the truthiness
                    # fallback to 0.
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    accuracy_confidence = {
                        "lower_bound": max(
                            0, running_accuracy - margin_of_error
                        ),
                        "upper_bound": min(
                            100, running_accuracy + margin_of_error
                        ),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None

1429 

1430 def cancel_benchmark( 

1431 self, benchmark_run_id: int, username: str = None 

1432 ) -> bool: 

1433 """Cancel a running benchmark.""" 

1434 try: 

1435 if benchmark_run_id in self.active_runs: 1435 ↛ 1438line 1435 didn't jump to line 1438 because the condition on line 1435 was always true

1436 self.active_runs[benchmark_run_id]["status"] = "cancelled" 

1437 

1438 self.update_benchmark_status( 

1439 benchmark_run_id, BenchmarkStatus.CANCELLED, username=username 

1440 ) 

1441 logger.info(f"Cancelled benchmark run {benchmark_run_id}") 

1442 return True 

1443 

1444 except Exception: 

1445 logger.exception(f"Error cancelling benchmark {benchmark_run_id}") 

1446 return False 

1447 

1448 

# Global service instance, created eagerly at import time.
# NOTE(review): presumably consumed as a shared singleton by the web API
# routes — confirm against the callers.
benchmark_service = BenchmarkService()