Coverage for src/local_deep_research/benchmarks/web_api/benchmark_service.py: 9%

484 statements  


1"""Benchmark service for handling web-based benchmark execution.""" 

2 

3import hashlib 

4import json 

5import threading 

6import time 

7from datetime import UTC, datetime 

8from enum import Enum 

9from typing import Any, Dict, List, Optional 

10 

11from loguru import logger 

12 

13from ...api.research_functions import quick_summary 

14from ...database.models.benchmark import ( 

15 BenchmarkResult, 

16 BenchmarkRun, 

17 BenchmarkStatus, 

18 DatasetType, 

19) 

20from ...web.services.socket_service import SocketIOService 

21from ..datasets import load_dataset 

22from ..graders import extract_answer_from_response, grade_single_result 

23from ..runners import format_query 

24 

25 

26class BenchmarkTaskStatus(Enum): 

27 """Status values for benchmark tasks in the queue tracker.""" 

28 

29 QUEUED = "queued" 

30 PROCESSING = "processing" 

31 COMPLETED = "completed" 

32 FAILED = "failed" 

33 CANCELLED = "cancelled" 

34 

35 

36class BenchmarkQueueTracker: 

37 """Simple in-memory tracker for benchmark queue status. 

38 

39 This replaces the removed memory_queue functionality for benchmarks. 

40 Since benchmarks are temporary and don't need persistence, 

41 this simple in-memory solution is sufficient. 

42 

43 Thread-safe for concurrent access from multiple benchmark threads. 

44 """ 

45 

46 def __init__(self): 

47 self.tasks = {} 

48 self._lock = threading.Lock() 

49 

50 def add_task( 

51 self, task_id: str, username: str, task_type: str = "benchmark" 

52 ): 

53 """Add a new task to tracking. 

54 

55 Also performs opportunistic cleanup of old completed tasks. 

56 """ 

        # Clean up old completed tasks before adding the new one. This call is
        # made outside the lock because cleanup_completed_tasks() acquires the
        # same (non-reentrant) lock itself.
        self.cleanup_completed_tasks()

        with self._lock:
            self.tasks[task_id] = {
                "username": username,
                "task_type": task_type,
                "status": BenchmarkTaskStatus.QUEUED.value,
                "created_at": datetime.now(UTC),
            }

    def update_task_status(self, task_id: str, status: BenchmarkTaskStatus):
        """Update the status of a task."""
        with self._lock:
            if task_id in self.tasks:
                self.tasks[task_id]["status"] = status.value
                self.tasks[task_id]["updated_at"] = datetime.now(UTC)
            else:
                logger.warning(
                    f"Attempted to update status for non-existent task: {task_id}"
                )

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Get the current status of a task."""
        with self._lock:
            return self.tasks.get(task_id)

    def remove_task(self, task_id: str):
        """Remove a task from tracking."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def cleanup_completed_tasks(self, max_age_seconds: int = 3600):
        """Remove completed tasks older than max_age_seconds.

        Args:
            max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour)
        """
        with self._lock:
            now = datetime.now(UTC)
            to_remove = []
            for task_id, task_data in self.tasks.items():
                # Only cleanup completed, failed, or cancelled tasks
                if task_data["status"] in [
                    BenchmarkTaskStatus.COMPLETED.value,
                    BenchmarkTaskStatus.FAILED.value,
                    BenchmarkTaskStatus.CANCELLED.value,
                ]:
                    # Check if task has updated_at timestamp
                    updated_at = task_data.get(
                        "updated_at", task_data.get("created_at")
                    )
                    if updated_at:
                        age = (now - updated_at).total_seconds()
                        if age > max_age_seconds:
                            to_remove.append(task_id)

            for task_id in to_remove:
                self.tasks.pop(task_id, None)
                logger.debug(f"Cleaned up old task: {task_id}")

            if to_remove:
                logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks")
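
# Illustrative usage sketch (not part of the original module): how the
# in-memory BenchmarkQueueTracker above is expected to be driven. The task id
# and username below are hypothetical examples, not fixtures from this repo.
#
#     tracker = BenchmarkQueueTracker()
#     tracker.add_task("benchmark_42_1700000000", "alice", "benchmark")
#     tracker.update_task_status(
#         "benchmark_42_1700000000", BenchmarkTaskStatus.PROCESSING
#     )
#     info = tracker.get_task_status("benchmark_42_1700000000")
#     # -> {"username": "alice", "task_type": "benchmark",
#     #     "status": "processing", "created_at": ..., "updated_at": ...}
#     tracker.update_task_status(
#         "benchmark_42_1700000000", BenchmarkTaskStatus.COMPLETED
#     )
#     tracker.cleanup_completed_tasks(max_age_seconds=0)  # drops finished tasks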

class BenchmarkService:
    """Service for managing benchmark runs through the web interface."""

    def __init__(self, socket_service=None):
        self.active_runs: Dict[int, Dict] = {}
        self.socket_service = socket_service or self._get_socket_service()
        # Track rate limiting per benchmark run
        self.rate_limit_detected: Dict[int, bool] = {}
        # Initialize queue tracker
        self.queue_tracker = BenchmarkQueueTracker()

    def _get_socket_service(self):
        """Get socket service instance, handling cases where Flask app is not available."""
        try:
            return SocketIOService()
        except Exception:
            # Return a mock socket service for testing/standalone use.
            # It must expose the emit methods this service calls.
            class MockSocketService:
                def emit_to_room(self, room, event, data):
                    pass

                def emit_to_subscribers(self, event, target_id, data):
                    pass

            return MockSocketService()

    def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
        """Generate a hash for search configuration compatibility checking."""
        relevant_params = {
            "iterations": search_config.get("iterations"),
            "questions_per_iteration": search_config.get(
                "questions_per_iteration"
            ),
            "search_tool": search_config.get("search_tool"),
            "search_strategy": search_config.get("search_strategy"),
            "model_name": search_config.get("model_name"),
            "provider": search_config.get("provider"),
        }
        # Remove None values
        relevant_params = {
            k: v for k, v in relevant_params.items() if v is not None
        }
        config_str = json.dumps(relevant_params, sort_keys=True)
        return hashlib.md5(  # DevSkim: ignore DS126858
            config_str.encode(), usedforsecurity=False
        ).hexdigest()[:8]

    def generate_query_hash(self, question: str, dataset_type: str) -> str:
        """Generate a hash for a query to enable deduplication."""
        query_content = f"{question.strip()}|{dataset_type.lower()}"
        return hashlib.md5(  # DevSkim: ignore DS126858
            query_content.encode(), usedforsecurity=False
        ).hexdigest()
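
    # Illustrative sketch (not part of the original module): the two hashes
    # above drive result reuse. Runs sharing a config hash are treated as
    # compatible, and a query hash identifies an individual question so that
    # already-graded answers can be skipped. The config values and username
    # below are hypothetical.
    #
    #     svc = BenchmarkService()
    #     cfg = {"iterations": 8, "search_tool": "searxng", "model_name": "gpt-4o-mini"}
    #     config_hash = svc.generate_config_hash(cfg)  # 8-char hex digest
    #     query_hash = svc.generate_query_hash("Who wrote Dune?", "simpleqa")
    #     existing = svc.get_existing_results(config_hash, username="alice")
    #     if query_hash in existing:
    #         pass  # reuse the stored, already-graded result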

    def create_benchmark_run(
        self,
        run_name: Optional[str],
        search_config: Dict[str, Any],
        evaluation_config: Dict[str, Any],
        datasets_config: Dict[str, Dict],
        username: str = None,
        user_password: str = None,
    ) -> int:
        """Create a new benchmark run in the database."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, user_password) as session:
            try:
                config_hash = self.generate_config_hash(search_config)

                # Calculate total examples
                total_examples = sum(
                    config.get("count", 0)
                    for config in datasets_config.values()
                )

                benchmark_run = BenchmarkRun(
                    run_name=run_name,
                    config_hash=config_hash,
                    query_hash_list=[],  # Will be populated as we process
                    search_config=search_config,
                    evaluation_config=evaluation_config,
                    datasets_config=datasets_config,
                    total_examples=total_examples,
                    status=BenchmarkStatus.PENDING,
                )

                session.add(benchmark_run)
                session.commit()

                logger.info(
                    f"Created benchmark run {benchmark_run.id} with config hash {config_hash}"
                )
                return benchmark_run.id

            except Exception:
                session.rollback()
                logger.exception("Error creating benchmark run")
                raise

    def get_existing_results(
        self, config_hash: str, username: str = None, user_password: str = None
    ) -> Dict[str, Dict]:
        """Get existing results with compatible configuration."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, user_password) as session:
            try:
                # Find compatible runs
                compatible_runs = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.config_hash == config_hash)
                    .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                    .all()
                )

                existing_results = {}
                for run in compatible_runs:
                    results = (
                        session.query(BenchmarkResult)
                        .filter(BenchmarkResult.benchmark_run_id == run.id)
                        .filter(
                            BenchmarkResult.is_correct.isnot(None)
                        )  # Only completed evaluations
                        .all()
                    )

                    for result in results:
                        existing_results[result.query_hash] = {
                            "id": result.example_id,
                            "dataset_type": result.dataset_type.value,
                            "problem": result.question,
                            "correct_answer": result.correct_answer,
                            "response": result.response,
                            "extracted_answer": result.extracted_answer,
                            "confidence": result.confidence,
                            "processing_time": result.processing_time,
                            "sources": result.sources,
                            "is_correct": result.is_correct,
                            "graded_confidence": result.graded_confidence,
                            "grader_response": result.grader_response,
                            "query_hash": result.query_hash,
                        }

                logger.info(
                    f"Found {len(existing_results)} existing results for config hash {config_hash}"
                )
                return existing_results

            except Exception:
                logger.exception("Error loading existing results")
                return {}

    def start_benchmark(
        self,
        benchmark_run_id: int,
        username: str = None,
        user_password: str = None,
    ) -> bool:
        """Start a benchmark run in a background thread."""
        from ...database.session_context import get_user_db_session

        try:
            # Get all data from the database in the main thread
            # This avoids database access from the background thread
            with get_user_db_session(username, user_password) as session:
                # Get benchmark run details
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    raise ValueError(
                        f"Benchmark run {benchmark_run_id} not found"
                    )

                # Create settings snapshot for thread safety
                from local_deep_research.settings import SettingsManager

                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_all_settings()

                # Get user password for metrics tracking in background thread
                from flask import session as flask_session
                from ...database.session_passwords import session_password_store

                user_password = None
                session_id = flask_session.get("session_id")
                if session_id and username:
                    user_password = session_password_store.get_session_password(
                        username, session_id
                    )
                    if not user_password:
                        logger.warning(
                            f"No password found for user {username} in current session"
                        )

                # Extract all data we need
                benchmark_data = {
                    "benchmark_run_id": benchmark_run_id,
                    "username": username or "benchmark_user",
                    "user_password": user_password,  # Add password for metrics tracking
                    "config_hash": benchmark_run.config_hash,
                    "datasets_config": benchmark_run.datasets_config,
                    "search_config": benchmark_run.search_config,
                    "evaluation_config": benchmark_run.evaluation_config,
                    "existing_results": self.get_existing_results(
                        benchmark_run.config_hash, username, user_password
                    ),
                    "settings_snapshot": settings_snapshot,  # Add settings snapshot
                }

                # Update status in database
                benchmark_run.status = BenchmarkStatus.IN_PROGRESS
                benchmark_run.start_time = datetime.now(UTC)
                session.commit()

            # Store data in memory for the thread
            self.active_runs[benchmark_run_id] = {
                "data": benchmark_data,
                "start_time": datetime.now(UTC),
                "status": "running",
                "results": [],
            }

            # Start background thread
            thread = threading.Thread(
                target=self._run_benchmark_thread,
                args=(benchmark_run_id,),
                daemon=True,
            )
            thread.start()

            self.active_runs[benchmark_run_id]["thread"] = thread

            logger.info(f"Started benchmark run {benchmark_run_id}")
            return True

        except Exception as e:
            logger.exception(f"Error starting benchmark {benchmark_run_id}")
            # Update status using user database
            with get_user_db_session(username, user_password) as session:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = BenchmarkStatus.FAILED
                    benchmark_run.error_message = str(e)
                    session.commit()
            return False

    def _run_benchmark_thread(self, benchmark_run_id: int):
        """Main benchmark execution thread."""
        # IMPORTANT: This runs in a background thread, so we cannot access the user database
        # Using in-memory queue tracker for benchmark status tracking

        task_id = None
        try:
            # Get the benchmark data that was passed to us
            # We need to retrieve this from the service database or from memory
            benchmark_data = self.active_runs.get(benchmark_run_id, {}).get(
                "data"
            )

            if not benchmark_data:
                raise ValueError(
                    f"Benchmark data for run {benchmark_run_id} not found"
                )

            # Set up settings context for thread-local access
            settings_snapshot = benchmark_data.get("settings_snapshot", {})
            username = benchmark_data.get("username", "benchmark_user")

            # Create a settings context that threads can use
            class SettingsContext:
                def __init__(self, snapshot, username):
                    self.snapshot = snapshot or {}
                    self.username = username
                    # Extract values from setting objects if needed
                    self.values = {}
                    for key, setting in self.snapshot.items():
                        if isinstance(setting, dict) and "value" in setting:
                            # It's a full setting object, extract the value
                            self.values[key] = setting["value"]
                        else:
                            # It's already just a value
                            self.values[key] = setting

                def get_setting(self, key, default=None):
                    """Get setting from snapshot only - no database access in threads"""
                    if key in self.values:
                        return self.values[key]
                    # No fallback to database - threads must use snapshot only
                    logger.warning(
                        f"Setting '{key}' not found in snapshot, using default"
                    )
                    return default

            settings_context = SettingsContext(settings_snapshot, username)

            # Set the context in thread-local storage
            from ...config.thread_settings import set_settings_context

            set_settings_context(settings_context)

            # Extract all the data we need
            datasets_config = benchmark_data["datasets_config"]
            search_config = benchmark_data["search_config"]
            evaluation_config = benchmark_data["evaluation_config"]
            existing_results = benchmark_data.get("existing_results", {})

            # Create task queue
            task_queue = self._create_task_queue(
                datasets_config,
                existing_results,
                benchmark_run_id,
            )

            # Calculate totals
            total_examples = len(task_queue) + len(existing_results)
            completed_examples = len(existing_results)

            # Initialize task tracking
            task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
            username = benchmark_data.get("username", "benchmark_user")
            self.queue_tracker.add_task(task_id, username, "benchmark")
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.PROCESSING
            )

            # Track progress in memory
            progress_info = {
                "total_examples": total_examples,
                "completed_examples": completed_examples,
                "failed_examples": 0,
                "start_time": datetime.now(UTC),
            }

            # Process tasks
            logger.info(
                f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
            )
            for i, task in enumerate(task_queue):
                # Check if benchmark has been cancelled
                if (
                    benchmark_run_id in self.active_runs
                    and self.active_runs[benchmark_run_id].get("status")
                    == "cancelled"
                ):
                    logger.info(
                        f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                    )
                    break

                logger.info(
                    f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
                )
                try:
                    # Add username and password to task for metrics tracking
                    task["username"] = benchmark_data.get("username")
                    task["user_password"] = benchmark_data.get("user_password")

                    # Process single task
                    result = self._process_benchmark_task(
                        task,
                        search_config,
                        evaluation_config,
                    )

                    # Store result in memory for now (will be saved later)
                    if "results" not in self.active_runs[benchmark_run_id]:
                        self.active_runs[benchmark_run_id]["results"] = []
                    self.active_runs[benchmark_run_id]["results"].append(result)

                    # Update progress
                    progress_info["completed_examples"] += 1

                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                        f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                    )

                    # Send real-time update
                    self._send_progress_update(
                        benchmark_run_id,
                        progress_info["completed_examples"],
                        progress_info["total_examples"],
                    )

                except Exception as e:
                    logger.exception(f"Error processing task {i}: {str(e)}")
                    progress_info["failed_examples"] += 1
                    logger.info(
                        f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                        f"Total failed: {progress_info['failed_examples']}"
                    )

                    # Check if this is a rate limiting error
                    error_str = str(e).lower()
                    if (
                        "403" in error_str
                        or "rate limit" in error_str
                        or "forbidden" in error_str
                    ):
                        self.rate_limit_detected[benchmark_run_id] = True
                        # Send rate limit warning via WebSocket
                        self.socket_service.emit_to_subscribers(
                            "research_progress",
                            benchmark_run_id,
                            {
                                "rate_limit_detected": True,
                                "message": "SearXNG rate limiting detected",
                            },
                        )

            # Mark as completed in memory tracker
            progress_info["end_time"] = datetime.now(UTC)

            # Check if benchmark was cancelled
            was_cancelled = (
                benchmark_run_id in self.active_runs
                and self.active_runs[benchmark_run_id].get("status")
                == "cancelled"
            )

            if was_cancelled:
                status = BenchmarkStatus.CANCELLED
                message = "Benchmark cancelled by user"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.CANCELLED
                    )
            else:
                status = BenchmarkStatus.COMPLETED
                message = "Benchmark completed successfully"
                if task_id:
                    self.queue_tracker.update_task_status(
                        task_id, BenchmarkTaskStatus.COMPLETED
                    )

            # Store completion info for later database update
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": status,
                "end_time": progress_info["end_time"],
                "completed_examples": progress_info["completed_examples"],
                "failed_examples": progress_info["failed_examples"],
            }

            # Send completion notification
            self.socket_service.emit_to_subscribers(
                "research_progress",
                benchmark_run_id,
                {
                    "status": "cancelled" if was_cancelled else "completed",
                    "message": message,
                    "progress": (
                        progress_info["completed_examples"]
                        / progress_info["total_examples"]
                        * 100
                    )
                    if progress_info["total_examples"] > 0
                    else 0,
                    "benchmark_run_id": benchmark_run_id,
                },
            )

        except Exception as e:
            logger.exception(f"Benchmark run {benchmark_run_id} failed")
            # Update task status if we have a task_id
            if task_id:
                self.queue_tracker.update_task_status(
                    task_id, BenchmarkTaskStatus.FAILED
                )
            # Store failure info for later database update
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["completion_info"] = {
                    "status": BenchmarkStatus.FAILED,
                    "error_message": str(e),
                }
        finally:
            # Clean up active run tracking
            if benchmark_run_id in self.active_runs:
                # Mark that thread is done but keep data for database update
                self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to database immediately if possible
            self._sync_results_to_database(benchmark_run_id)

    def _create_task_queue(
        self,
        datasets_config: Dict,
        existing_results: Dict,
        benchmark_run_id: int,
    ) -> List[Dict]:
        """Create list of tasks to process, excluding existing results."""
        tasks = []

        for dataset_name, config in datasets_config.items():
            if config.get("count", 0) > 0:
                dataset = load_dataset(
                    dataset_type=dataset_name,
                    num_examples=config["count"],
                    seed=None,
                )

                for i, example in enumerate(dataset):
                    # Extract question based on dataset type
                    if dataset_name.lower() == "simpleqa":
                        question = example.get("problem", "")
                        correct_answer = example.get("answer", "")
                    else:  # browsecomp
                        question = example.get("problem", "")
                        correct_answer = example.get("answer", "")

                    # Generate query hash
                    query_hash = self.generate_query_hash(
                        question, dataset_name
                    )

                    # Skip if already processed
                    if query_hash in existing_results:
                        continue

                    tasks.append(
                        {
                            "benchmark_run_id": benchmark_run_id,
                            "example_id": example.get("id", f"example_{i}"),
                            "dataset_type": dataset_name,
                            "question": question,
                            "correct_answer": correct_answer,
                            "query_hash": query_hash,
                            "task_index": len(tasks),
                        }
                    )

        return tasks
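
    # Illustrative sketch (not part of the original module): each entry
    # returned by _create_task_queue() is a plain dict consumed by
    # _process_benchmark_task(). A hypothetical task looks like:
    #
    #     {
    #         "benchmark_run_id": 7,
    #         "example_id": "example_0",
    #         "dataset_type": "simpleqa",
    #         "question": "Who wrote Dune?",
    #         "correct_answer": "Frank Herbert",
    #         "query_hash": "<32-char md5 hex digest>",
    #         "task_index": 0,
    #     }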

    def _process_benchmark_task(
        self, task: Dict, search_config: Dict, evaluation_config: Dict
    ) -> Dict:
        """Process a single benchmark task."""
        try:
            logger.info(
                f"Starting benchmark task {task['task_index'] + 1}: "
                f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
                f"question_preview='{task['question'][:100]}...'"
            )

            # Get settings context from thread-local storage
            from ...config.thread_settings import get_settings_context

            settings_context = get_settings_context()

            # Generate a unique tracking ID for this benchmark task
            import uuid

            tracking_id = str(uuid.uuid4())
            logger.info(
                f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
            )

            # Format query
            formatted_query = format_query(
                task["question"], task["dataset_type"]
            )
            logger.info(
                f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
            )

            # Run research with progress callback for WebSocket updates
            start_time = time.time()
            logger.info(f"Task {task['example_id']} starting research phase...")

            def benchmark_progress_callback(
                status: str, progress: int, data: dict
            ):
                """Progress callback to emit detailed research progress via WebSocket"""
                try:
                    timestamp = datetime.now(UTC).isoformat()

                    # Create research-compatible log entry
                    log_entry = {
                        "time": timestamp,
                        "message": f"Example {task['example_id']}: {status}",
                        "progress": progress,
                        "metadata": {
                            "phase": data.get("phase", "benchmark_processing"),
                            "type": data.get("type", "info"),
                            "example_id": task["example_id"],
                            "benchmark_run_id": task["benchmark_run_id"],
                            **data,  # Include all other data
                        },
                    }

                    # Determine log type based on status/message content
                    if (
                        "complete" in status.lower()
                        or "finished" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"
                    elif (
                        "error" in status.lower() or "failed" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "error"
                    elif (
                        "starting" in status.lower()
                        or "begin" in status.lower()
                    ):
                        log_entry["metadata"]["type"] = "milestone"

                    # Create progress data in research format
                    progress_data = {
                        "progress": progress,
                        "message": status,
                        "status": "in_progress",
                        "log_entry": log_entry,
                        "progress_log": json.dumps(
                            [log_entry]
                        ),  # Array format expected by socket.js
                    }

                    # Emit using research_progress format that the UI expects
                    self.socket_service.emit_to_subscribers(
                        "research_progress",
                        task["benchmark_run_id"],
                        progress_data,
                    )

                except Exception:
                    logger.exception("Error sending benchmark progress update")

            # Get user password from task data
            user_password = task.get("user_password")

            search_result = quick_summary(
                query=formatted_query,
                research_id=tracking_id,  # Pass the tracking ID
                iterations=search_config.get("iterations", 8),
                questions_per_iteration=search_config.get(
                    "questions_per_iteration", 5
                ),
                search_tool=search_config.get("search_tool", "searxng"),
                search_strategy=search_config.get(
                    "search_strategy", "focused_iteration"
                ),
                progress_callback=benchmark_progress_callback,
                model_name=search_config.get("model_name"),
                provider=search_config.get("provider"),
                temperature=search_config.get("temperature", 0.7),
                openai_endpoint_url=search_config.get("openai_endpoint_url"),
                settings_snapshot=settings_context.snapshot,  # Pass settings snapshot for thread safety
                username=task.get("username"),  # Pass username
                user_password=user_password,  # Pass password for metrics tracking
            )
            processing_time = time.time() - start_time
            logger.info(
                f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
                f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
            )

            # Extract answer
            response = search_result.get("summary", "")
            logger.info(
                f"Task {task['example_id']} response length: {len(response)} chars"
            )

            extracted_data = extract_answer_from_response(
                response, task["dataset_type"]
            )
            extracted_answer = (
                extracted_data.get("extracted_answer", "")
                if isinstance(extracted_data, dict)
                else str(extracted_data)
            )
            logger.info(
                f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
            )

            # Extract sources - handle both direct sources and all_links_of_system
            sources = search_result.get("sources", [])
            if not sources and "all_links_of_system" in search_result:
                sources = search_result.get("all_links_of_system", [])

            # Log for debugging
            logger.debug(f"Search result keys: {list(search_result.keys())}")
            logger.debug(f"Sources found: {len(sources)} items")

            # Prepare result
            result = {
                **task,
                "response": response,
                "extracted_answer": extracted_answer,
                "confidence": str(
                    extracted_data.get("confidence", "100")
                    if isinstance(extracted_data, dict)
                    else "100"
                ),
                "processing_time": processing_time,
                "sources": json.dumps(sources),  # Convert to JSON string
                "completed_at": datetime.now(UTC),
                "research_id": tracking_id,  # Store the UUID in the research_id field
            }

            # Evaluate result - requires proper grading model
            try:
                logger.info(f"Task {task['example_id']} starting evaluation...")
                eval_start_time = time.time()

                # Always attempt evaluation, regardless of provider
                # Modern local models like Ollama are capable of grading
                # Try to evaluate with proper model
                result_data = {
                    "id": task["example_id"],
                    "problem": task["question"],
                    "correct_answer": task["correct_answer"],
                    "response": response,
                    "extracted_answer": extracted_answer,
                }

                eval_result = grade_single_result(
                    result_data,
                    task["dataset_type"],
                    evaluation_config,
                    settings_context.snapshot,
                )
                eval_time = time.time() - eval_start_time
                logger.info(
                    f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
                )
                if eval_result and not eval_result.get("grading_error"):
                    result.update(
                        {
                            "is_correct": eval_result.get("is_correct", False),
                            "graded_confidence": eval_result.get(
                                "graded_confidence", "0"
                            ),
                            "grader_response": eval_result.get(
                                "grader_response", ""
                            ),
                        }
                    )
                else:
                    error_msg = (
                        eval_result.get(
                            "grading_error", "Unknown evaluation error"
                        )
                        if eval_result
                        else "No evaluation results returned"
                    )
                    result.update(
                        {
                            "is_correct": None,
                            "graded_confidence": "0",
                            "grader_response": f"Evaluation failed: {error_msg}",
                            "evaluation_error": error_msg,
                        }
                    )

            except Exception as e:
                logger.exception("Evaluation error")
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {e!s}",
                        "evaluation_error": str(e),
                    }
                )

            return result

        except Exception as e:
            logger.exception("Research error")
            return {
                **task,
                "research_error": str(e),
                "completed_at": datetime.now(UTC),
            }

    def sync_pending_results(self, benchmark_run_id: int, username: str = None):
        """Sync any pending results to database. Can be called from main thread."""
        if benchmark_run_id not in self.active_runs:
            return 0

        run_data = self.active_runs[benchmark_run_id]
        results_to_save = run_data.get("results", [])
        saved_indices = run_data.get("saved_indices", set())

        if not username:
            username = run_data.get("data", {}).get("username")

        user_password = run_data.get("data", {}).get("user_password")

        saved_count = 0
        from ...database.session_context import get_user_db_session
        from ...database.models.benchmark import BenchmarkResult

        try:
            with get_user_db_session(username, user_password) as session:
                # Save any results that haven't been saved yet
                for idx, result in enumerate(results_to_save):
                    if idx not in saved_indices:
                        # Check if this result already exists in the database
                        existing = (
                            session.query(BenchmarkResult)
                            .filter_by(
                                benchmark_run_id=benchmark_run_id,
                                query_hash=result["query_hash"],
                            )
                            .first()
                        )

                        if existing:
                            # Skip if already exists
                            saved_indices.add(idx)
                            continue

                        benchmark_result = BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=result["query_hash"],
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                        session.add(benchmark_result)
                        saved_indices.add(idx)
                        saved_count += 1

                if saved_count > 0:
                    session.commit()
                    run_data["saved_indices"] = saved_indices
                    logger.info(
                        f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
                    )

        except Exception:
            logger.exception(
                f"Error syncing pending results for benchmark {benchmark_run_id}"
            )

            # Roll back the session on error to prevent PendingRollbackError
            try:
                session.rollback()
            except Exception:
                pass

        return saved_count

    def _sync_results_to_database(self, benchmark_run_id: int):
        """Sync benchmark results from memory to database after thread completes."""
        if benchmark_run_id not in self.active_runs:
            return

        run_data = self.active_runs[benchmark_run_id]
        if not run_data.get("thread_complete"):
            return

        username = run_data.get("data", {}).get("username")
        user_password = run_data.get("data", {}).get("user_password")
        from ...database.session_context import get_user_db_session

        try:
            with get_user_db_session(username, user_password) as session:
                # Update benchmark run status
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )

                if benchmark_run and "completion_info" in run_data:
                    info = run_data["completion_info"]
                    benchmark_run.status = info["status"]
                    benchmark_run.end_time = info.get(
                        "end_time", datetime.now(UTC)
                    )
                    benchmark_run.completed_examples = info.get(
                        "completed_examples", 0
                    )
                    benchmark_run.failed_examples = info.get(
                        "failed_examples", 0
                    )
                    benchmark_run.error_message = info.get("error_message")

                # Save all results (skip already saved ones)
                saved_indices = run_data.get("saved_indices", set())
                for idx, result in enumerate(run_data.get("results", [])):
                    if idx in saved_indices:
                        continue
                    benchmark_result = BenchmarkResult(
                        benchmark_run_id=benchmark_run_id,
                        example_id=result["example_id"],
                        query_hash=result["query_hash"],
                        dataset_type=DatasetType(result["dataset_type"]),
                        research_id=result.get("research_id"),
                        question=result["question"],
                        correct_answer=result["correct_answer"],
                        response=result.get("response"),
                        extracted_answer=result.get("extracted_answer"),
                        confidence=result.get("confidence"),
                        processing_time=result.get("processing_time"),
                        sources=result.get("sources"),
                        is_correct=result.get("is_correct"),
                        graded_confidence=result.get("graded_confidence"),
                        grader_response=result.get("grader_response"),
                        completed_at=result.get("completed_at"),
                        research_error=result.get("research_error"),
                        evaluation_error=result.get("evaluation_error"),
                        task_index=result.get("task_index"),
                    )
                    session.add(benchmark_result)

                # Calculate final accuracy
                if benchmark_run.status == BenchmarkStatus.COMPLETED:
                    correct_results = [
                        r
                        for r in run_data.get("results", [])
                        if r.get("is_correct")
                    ]
                    evaluated_results = [
                        r
                        for r in run_data.get("results", [])
                        if r.get("is_correct") is not None
                    ]

                    if evaluated_results:
                        benchmark_run.overall_accuracy = (
                            len(correct_results) / len(evaluated_results)
                        ) * 100

                        # Calculate processing rate
                        total_time = sum(
                            r.get("processing_time", 0)
                            for r in evaluated_results
                        )
                        if total_time > 0:
                            benchmark_run.processing_rate = len(
                                evaluated_results
                            ) / (total_time / 60)

                session.commit()
                logger.info(
                    f"Successfully synced results for benchmark {benchmark_run_id}"
                )

                # Clean up memory
                del self.active_runs[benchmark_run_id]

        except Exception:
            logger.exception("Error syncing benchmark results to database")

    def _send_progress_update(
        self, benchmark_run_id: int, completed: int, total: int
    ):
        """Send real-time progress update via websocket."""
        try:
            percentage = (completed / total * 100) if total > 0 else 0

            # Create log entry for milestone progress
            log_entry = {
                "time": datetime.now(UTC).isoformat(),
                "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)",
                "progress": percentage,
                "metadata": {
                    "phase": "benchmark_progress",
                    "type": "milestone",
                    "completed": completed,
                    "total": total,
                    "benchmark_run_id": benchmark_run_id,
                },
            }

            progress_data = {
                "status": "in_progress",
                "message": f"Processing examples: {completed}/{total}",
                "progress": percentage,
                "completed": completed,
                "total": total,
                "benchmark_run_id": benchmark_run_id,
                "log_entry": log_entry,
                "progress_log": json.dumps([log_entry]),
            }

            self.socket_service.emit_to_subscribers(
                "research_progress", benchmark_run_id, progress_data
            )

        except Exception:
            logger.exception("Error sending progress update")

    def _calculate_final_accuracy(
        self, benchmark_run_id: int, username: str = None
    ):
        """Calculate and save final accuracy metrics."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                # Get all results for this run
                results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                if results:
                    correct_count = sum(1 for r in results if r.is_correct)
                    overall_accuracy = (correct_count / len(results)) * 100

                    # Calculate processing rate
                    total_time = sum(r.processing_time or 0 for r in results)
                    processing_rate = (
                        (len(results) / (total_time / 60))
                        if total_time > 0
                        else 0
                    )

                    # Update benchmark run
                    benchmark_run = (
                        session.query(BenchmarkRun)
                        .filter(BenchmarkRun.id == benchmark_run_id)
                        .first()
                    )
                    if benchmark_run:
                        benchmark_run.overall_accuracy = overall_accuracy
                        benchmark_run.processing_rate = processing_rate
                        session.commit()

            except Exception:
                logger.exception("Error calculating final accuracy")

    def update_benchmark_status(
        self,
        benchmark_run_id: int,
        status: BenchmarkStatus,
        error_message: str = None,
        username: str = None,
    ):
        """Update benchmark run status."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if benchmark_run:
                    benchmark_run.status = status
                    benchmark_run.updated_at = datetime.now(UTC)

                    if error_message:
                        benchmark_run.error_message = error_message

                    if (
                        status == BenchmarkStatus.IN_PROGRESS
                        and not benchmark_run.start_time
                    ):
                        benchmark_run.start_time = datetime.now(UTC)
                    elif (
                        status
                        in [BenchmarkStatus.COMPLETED, BenchmarkStatus.FAILED]
                        and not benchmark_run.end_time
                    ):
                        benchmark_run.end_time = datetime.now(UTC)

                    session.commit()

            except Exception:
                session.rollback()
                logger.exception("Error updating benchmark status")

    def get_benchmark_status(
        self, benchmark_run_id: int, username: str = None
    ) -> Optional[Dict]:
        """Get current status of a benchmark run."""
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username) as session:
            try:
                benchmark_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if not benchmark_run:
                    return None

                # Calculate running accuracy from current results AND reused
                # results from compatible runs.
                # First get results specifically for this benchmark run
                current_results = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )

                # Then get reused results from compatible benchmark runs (same config hash).
                # Only count results up to the number we say we've "completed".
                if benchmark_run.completed_examples > len(current_results):
                    # We have reused results, get them from compatible runs
                    reused_count_needed = (
                        benchmark_run.completed_examples - len(current_results)
                    )

                    compatible_results = (
                        session.query(BenchmarkResult)
                        .join(
                            BenchmarkRun,
                            BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                        )
                        .filter(
                            BenchmarkRun.config_hash
                            == benchmark_run.config_hash
                        )
                        .filter(
                            BenchmarkRun.id != benchmark_run_id
                        )  # Exclude current run
                        .filter(
                            BenchmarkRun.status == BenchmarkStatus.COMPLETED
                        )
                        .filter(BenchmarkResult.is_correct.isnot(None))
                        .order_by(BenchmarkResult.id)  # Consistent ordering
                        .limit(reused_count_needed)
                        .all()
                    )

                    # Combine current and reused results
                    results = (
                        current_results
                        + compatible_results[:reused_count_needed]
                    )
                else:
                    # No reused results, just use current results
                    results = current_results

                running_accuracy = None
                # Dynamic per-dataset accuracy tracking
                dataset_accuracies = {}

                if results:
                    # Overall running accuracy
                    correct_count = sum(1 for r in results if r.is_correct)
                    running_accuracy = (correct_count / len(results)) * 100

                    # Calculate accuracy for each dataset type dynamically
                    from collections import defaultdict

                    dataset_results = defaultdict(list)

                    # Group results by dataset type
                    for r in results:
                        dataset_results[r.dataset_type.value].append(r)

                    # Calculate accuracy for each dataset
                    for (
                        dataset_type,
                        dataset_result_list,
                    ) in dataset_results.items():
                        if dataset_result_list:
                            correct = sum(
                                1 for r in dataset_result_list if r.is_correct
                            )
                            accuracy = (
                                correct / len(dataset_result_list)
                            ) * 100
                            # Store with _accuracy suffix for consistency
                            dataset_accuracies[f"{dataset_type}_accuracy"] = (
                                accuracy
                            )

                # Calculate time estimates and reliability metrics
                estimated_time_remaining = None
                total_elapsed_time = None
                avg_time_per_example = None
                accuracy_confidence = None

                # Get ALL results for timing calculation (including those pending evaluation)
                all_results_for_timing = (
                    session.query(BenchmarkResult)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                )

                if benchmark_run.start_time and all_results_for_timing:
                    # Calculate elapsed time
                    current_time = datetime.now(UTC)
                    total_elapsed_time = (
                        current_time - benchmark_run.start_time
                    ).total_seconds()

                    # Calculate average processing time per example using actual count
                    avg_time_per_example = total_elapsed_time / len(
                        all_results_for_timing
                    )

                    logger.info(
                        f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                        f"results_count: {len(all_results_for_timing)}, "
                        f"avg_per_example: {avg_time_per_example:.2f}s"
                    )

                    # Estimate remaining time
                    remaining_examples = benchmark_run.total_examples - len(
                        all_results_for_timing
                    )
                    if remaining_examples > 0:
                        estimated_time_remaining = (
                            avg_time_per_example * remaining_examples
                        )
                        logger.info(
                            f"Time estimation - total: {benchmark_run.total_examples}, "
                            f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                            f"avg_time: {avg_time_per_example:.2f}s, "
                            f"estimated_remaining: {estimated_time_remaining:.2f}s"
                        )

                # Calculate accuracy confidence interval (95% confidence)
                if results and len(results) >= 3:
                    import math

                    n = len(results)
                    p = running_accuracy / 100 if running_accuracy else 0
                    # Standard error for proportion
                    se = math.sqrt(p * (1 - p) / n)
                    # 95% confidence interval (±1.96 * SE)
                    margin_of_error = 1.96 * se * 100
                    accuracy_confidence = {
                        "lower_bound": max(
                            0, running_accuracy - margin_of_error
                        ),
                        "upper_bound": min(
                            100, running_accuracy + margin_of_error
                        ),
                        "margin_of_error": margin_of_error,
                        "sample_size": n,
                    }

                status_data = {
                    "id": benchmark_run.id,
                    "run_name": benchmark_run.run_name,
                    "status": benchmark_run.status.value,
                    "completed_examples": len(
                        all_results_for_timing
                    ),  # Use actual count from DB
                    "total_examples": benchmark_run.total_examples,
                    "failed_examples": benchmark_run.failed_examples,
                    "overall_accuracy": benchmark_run.overall_accuracy
                    or running_accuracy,  # Use running accuracy if final not calculated
                    "running_accuracy": running_accuracy,  # Current running accuracy
                    "processing_rate": benchmark_run.processing_rate,
                    "estimated_time_remaining": estimated_time_remaining,  # seconds
                    "total_elapsed_time": total_elapsed_time,  # seconds
                    "avg_time_per_example": avg_time_per_example,  # seconds
                    "accuracy_confidence": accuracy_confidence,  # confidence interval
                    "created_at": benchmark_run.created_at.isoformat()
                    if benchmark_run.created_at
                    else None,
                    "start_time": benchmark_run.start_time.isoformat()
                    if benchmark_run.start_time
                    else None,
                    "end_time": benchmark_run.end_time.isoformat()
                    if benchmark_run.end_time
                    else None,
                    "error_message": benchmark_run.error_message,
                    # Add all per-dataset accuracies dynamically
                    **dataset_accuracies,
                }

                logger.info(
                    f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                    f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                    f"avg_time: {avg_time_per_example}"
                )

                return status_data

            except Exception:
                logger.exception("Error getting benchmark status")
                return None
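
    # Note on the interval above (illustrative, not part of the original
    # module): it is the normal-approximation (Wald) 95% interval for a
    # proportion, p ± 1.96 * sqrt(p * (1 - p) / n), reported in percentage
    # points. For example, 30 correct out of n = 50 gives p = 0.6,
    # SE = sqrt(0.6 * 0.4 / 50) ≈ 0.069, so the margin is about
    # 1.96 * 6.9 ≈ 13.6 points and a 60% running accuracy would be reported
    # with a 95% CI of roughly [46.4, 73.6].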

    def cancel_benchmark(
        self, benchmark_run_id: int, username: str = None
    ) -> bool:
        """Cancel a running benchmark."""
        try:
            if benchmark_run_id in self.active_runs:
                self.active_runs[benchmark_run_id]["status"] = "cancelled"

            self.update_benchmark_status(
                benchmark_run_id, BenchmarkStatus.CANCELLED, username=username
            )
            logger.info(f"Cancelled benchmark run {benchmark_run_id}")
            return True

        except Exception:
            logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
            return False


# Global service instance
benchmark_service = BenchmarkService()
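
# Illustrative end-to-end sketch (not part of the original module): how the
# web layer is expected to drive the global service instance above. The config
# dictionaries and username are hypothetical; only the method names and
# signatures come from this file.
#
#     run_id = benchmark_service.create_benchmark_run(
#         run_name="nightly-simpleqa",
#         search_config={"iterations": 8, "search_tool": "searxng"},
#         evaluation_config={},
#         datasets_config={"simpleqa": {"count": 25}},
#         username="alice",
#     )
#     benchmark_service.start_benchmark(run_id, username="alice")
#     # Poll from a request handler while the background thread works:
#     status = benchmark_service.get_benchmark_status(run_id, username="alice")
#     # Persist any in-memory results and, if needed, stop early:
#     benchmark_service.sync_pending_results(run_id, username="alice")
#     benchmark_service.cancel_benchmark(run_id, username="alice")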