Coverage for src/local_deep_research/benchmarks/web_api/benchmark_service.py: 94%

477 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""Benchmark service for handling web-based benchmark execution.""" 

2 

3import hashlib 

4import json 

5import threading 

6import time 

7from datetime import UTC, datetime 

8from enum import Enum 

9from typing import Any, Dict, List, Optional 

10 

11from loguru import logger 

12 

13from ...api.research_functions import quick_summary 

14from ...settings.manager import SnapshotSettingsContext 

15from ...web.services.research_service import _global_research_semaphore 

16from ...database.models.benchmark import ( 

17 BenchmarkResult, 

18 BenchmarkRun, 

19 BenchmarkStatus, 

20 DatasetType, 

21) 

22from ...web.services.socket_service import SocketIOService 

23from ..datasets import load_dataset 

24from ..graders import extract_answer_from_response, grade_single_result 

25from ..runners import format_query 

26from ...database.thread_local_session import thread_cleanup 

27 

28 

29class BenchmarkTaskStatus(Enum): 

30 """Status values for benchmark tasks in the queue tracker.""" 

31 

32 QUEUED = "queued" 

33 PROCESSING = "processing" 

34 COMPLETED = "completed" 

35 FAILED = "failed" 

36 CANCELLED = "cancelled" 

37 

38 

39class BenchmarkQueueTracker: 

40 """Simple in-memory tracker for benchmark queue status. 

41 

42 This replaces the removed memory_queue functionality for benchmarks. 

43 Since benchmarks are temporary and don't need persistence, 

44 this simple in-memory solution is sufficient. 

45 

46 Thread-safe for concurrent access from multiple benchmark threads. 

47 """ 

48 

49 def __init__(self): 

50 self.tasks = {} 

51 self._lock = threading.Lock() 

52 

53 def add_task( 

54 self, task_id: str, username: str, task_type: str = "benchmark" 

55 ): 

56 """Add a new task to tracking. 

57 

58 Also performs opportunistic cleanup of old completed tasks. 

59 """ 

60 # Cleanup old tasks before adding new one (outside lock for better performance) 

61 self.cleanup_completed_tasks() 

62 

63 with self._lock: 

64 self.tasks[task_id] = { 

65 "username": username, 

66 "task_type": task_type, 

67 "status": BenchmarkTaskStatus.QUEUED.value, 

68 "created_at": datetime.now(UTC), 

69 } 

70 

71 def update_task_status(self, task_id: str, status: BenchmarkTaskStatus): 

72 """Update the status of a task.""" 

73 with self._lock: 

74 if task_id in self.tasks: 

75 self.tasks[task_id]["status"] = status.value 

76 self.tasks[task_id]["updated_at"] = datetime.now(UTC) 

77 else: 

78 logger.warning( 

79 f"Attempted to update status for non-existent task: {task_id}" 

80 ) 

81 

82 def get_task_status(self, task_id: str) -> Optional[Dict]: 

83 """Get the current status of a task.""" 

84 with self._lock: 

85 return self.tasks.get(task_id) 

86 

87 def remove_task(self, task_id: str): 

88 """Remove a task from tracking.""" 

89 with self._lock: 

90 self.tasks.pop(task_id, None) 

91 

92 def cleanup_completed_tasks(self, max_age_seconds: int = 3600): 

93 """Remove completed tasks older than max_age_seconds. 

94 

95 Args: 

96 max_age_seconds: Maximum age in seconds for completed tasks (default 1 hour) 

97 """ 

98 with self._lock: 

99 now = datetime.now(UTC) 

100 to_remove = [] 

101 for task_id, task_data in self.tasks.items(): 

102 # Only cleanup completed, failed, or cancelled tasks 

103 if task_data["status"] in [ 

104 BenchmarkTaskStatus.COMPLETED.value, 

105 BenchmarkTaskStatus.FAILED.value, 

106 BenchmarkTaskStatus.CANCELLED.value, 

107 ]: 

108 # Check if task has updated_at timestamp 

109 updated_at = task_data.get( 

110 "updated_at", task_data.get("created_at") 

111 ) 

112 if updated_at: 112 ↛ 101line 112 didn't jump to line 101 because the condition on line 112 was always true

113 age = (now - updated_at).total_seconds() 

114 if age > max_age_seconds: 

115 to_remove.append(task_id) 

116 

117 for task_id in to_remove: 

118 self.tasks.pop(task_id, None) 

119 logger.debug(f"Cleaned up old task: {task_id}") 

120 

121 if to_remove: 

122 logger.info(f"Cleaned up {len(to_remove)} old benchmark tasks") 

123 

124 

125class BenchmarkService: 

126 """Service for managing benchmark runs through the web interface.""" 

127 

def __init__(self, socket_service=None):
    """Set up per-service bookkeeping for benchmark runs.

    Args:
        socket_service: Object used to push WebSocket events. When omitted
            (or falsy), one is obtained from ``_get_socket_service``.
    """
    # benchmark_run_id -> in-memory state of runs started by this service
    self.active_runs: Dict[int, Dict] = {}

    if not socket_service:
        socket_service = self._get_socket_service()
    self.socket_service = socket_service

    # benchmark_run_id -> True once a rate-limit error has been seen
    self.rate_limit_detected: Dict[int, bool] = {}

    # In-memory status tracking for benchmark tasks
    self.queue_tracker = BenchmarkQueueTracker()

135 

def _get_socket_service(self):
    """Return a socket service, or a no-op stand-in if none can be built.

    ``SocketIOService()`` may fail when the Flask app is not available
    (testing / standalone use). In that case a stand-in is returned whose
    emit methods do nothing. It implements every emit method this service
    calls (``emit_to_room`` and ``emit_to_subscribers``). Without it, running
    headless would raise AttributeError on the first notification.
    """
    try:
        return SocketIOService()
    except Exception:

        class MockSocketService:
            """No-op socket service used when SocketIOService is unavailable."""

            def emit_to_room(self, room, event, data):
                pass

            def emit_to_subscribers(self, *args, **kwargs):
                pass

        return MockSocketService()

147 

def generate_config_hash(self, search_config: Dict[str, Any]) -> str:
    """Fingerprint the settings that make two benchmark runs comparable.

    Only the keys listed in ``hashed_keys`` take part. Missing keys and keys
    set to ``None`` are dropped, so the two give the same hash. The remaining
    pairs are serialised as key-sorted JSON and hashed with MD5 (as a
    fingerprint, not for security).

    Args:
        search_config: Search configuration of a benchmark run.

    Returns:
        The first 8 hex characters of the digest.
    """
    hashed_keys = (
        "iterations",
        "questions_per_iteration",
        "search_tool",
        "search_strategy",
        "model_name",
        "provider",
    )
    candidates = ((key, search_config.get(key)) for key in hashed_keys)
    fingerprint_input = {
        key: value for key, value in candidates if value is not None
    }
    payload = json.dumps(fingerprint_input, sort_keys=True).encode()
    digest = hashlib.md5(  # DevSkim: ignore DS126858
        payload, usedforsecurity=False
    ).hexdigest()
    return digest[:8]

168 

def generate_query_hash(self, question: str, dataset_type: str) -> str:
    """Return an MD5 hex digest identifying ``question`` within a dataset.

    Leading/trailing whitespace of the question and the letter case of the
    dataset type are normalised away first, so trivially different spellings
    deduplicate to the same hash. MD5 is used only as a fingerprint.
    """
    normalised = "|".join((question.strip(), dataset_type.lower()))
    return hashlib.md5(  # DevSkim: ignore DS126858
        normalised.encode(), usedforsecurity=False
    ).hexdigest()

175 

def create_benchmark_run(
    self,
    run_name: Optional[str],
    search_config: Dict[str, Any],
    evaluation_config: Dict[str, Any],
    datasets_config: Dict[str, Dict],
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> int:
    """Insert a new PENDING ``BenchmarkRun`` and return its id.

    Args:
        run_name: Optional display name for the run.
        search_config: Search settings; fingerprinted with
            ``generate_config_hash`` and stored with the run.
        evaluation_config: Grading settings stored with the run.
        datasets_config: Mapping of dataset name to its config. Each entry's
            ``"count"`` (default 0) is summed into ``total_examples``.
        username: Owner of the database the run is written to.
        user_password: Password used to open that database.

    Returns:
        The id of the new run, as an ``int``.

    Raises:
        Exception: Any error is logged, the session rolled back, and the
            error re-raised.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username, user_password) as session:
        try:
            fingerprint = self.generate_config_hash(search_config)
            planned_examples = sum(
                dataset_settings.get("count", 0)
                for dataset_settings in datasets_config.values()
            )

            run_row = BenchmarkRun(
                run_name=run_name,
                config_hash=fingerprint,
                query_hash_list=[],  # Will be populated as we process
                search_config=search_config,
                evaluation_config=evaluation_config,
                datasets_config=datasets_config,
                total_examples=planned_examples,
                status=BenchmarkStatus.PENDING,
            )
            session.add(run_row)
            session.commit()

            logger.info(
                f"Created benchmark run {run_row.id} with config hash {fingerprint}"
            )
            return int(run_row.id)

        except Exception:
            session.rollback()
            logger.exception("Error creating benchmark run")
            raise

221 

def get_existing_results(
    self,
    config_hash: str,
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> Dict[str, Dict]:
    """Collect graded results from COMPLETED runs that share ``config_hash``.

    Results whose ``is_correct`` is NULL (not graded) are skipped. The
    mapping is keyed by ``query_hash``; when several compatible runs contain
    the same query, the run visited last wins.

    Returns:
        Mapping of query hash to a plain dict of the stored result fields.
        Returns ``{}`` if loading fails; the error is logged, not raised.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username, user_password) as session:
        try:
            matching_runs = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.config_hash == config_hash)
                .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                .all()
            )

            reusable: Dict[str, Dict] = {}
            for prior_run in matching_runs:
                graded_rows = (
                    session.query(BenchmarkResult)
                    .filter(BenchmarkResult.benchmark_run_id == prior_run.id)
                    # Only completed evaluations (is_correct populated)
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .all()
                )
                reusable.update(
                    {
                        row.query_hash: {
                            "id": row.example_id,
                            "dataset_type": row.dataset_type.value,
                            "problem": row.question,
                            "correct_answer": row.correct_answer,
                            "response": row.response,
                            "extracted_answer": row.extracted_answer,
                            "confidence": row.confidence,
                            "processing_time": row.processing_time,
                            "sources": row.sources,
                            "is_correct": row.is_correct,
                            "graded_confidence": row.graded_confidence,
                            "grader_response": row.grader_response,
                            "query_hash": row.query_hash,
                        }
                        for row in graded_rows
                    }
                )

            logger.info(
                f"Found {len(reusable)} existing results for config hash {config_hash}"
            )
            return reusable

        except Exception:
            logger.exception("Error loading existing results")
            return {}

277 

def start_benchmark(
    self,
    benchmark_run_id: int,
    username: Optional[str] = None,
    user_password: Optional[str] = None,
) -> bool:
    """Start a benchmark run in a background thread.

    The calling thread reads everything the worker needs and stores it in
    ``self.active_runs[benchmark_run_id]["data"]``. That covers the run
    configuration, previously graded compatible results, a settings snapshot
    and credentials. ``_run_benchmark_thread`` therefore does not have to
    query the database itself.

    Args:
        benchmark_run_id: Id of an existing ``BenchmarkRun``.
        username: Owner of the run's database.
        user_password: Password used to open that database. It is also the
            fallback credential handed to the worker when the Flask
            session's password store has none for this user.

    Returns:
        ``True`` once the worker thread has been started. ``False`` if
        anything failed. On failure the run is marked FAILED on a
        best-effort basis; an error while recording that is logged, never
        raised, so callers always receive a bool.
    """
    from ...database.session_context import get_user_db_session

    try:
        # Get all data from the database in the main thread; this avoids
        # database access from the background thread.
        with get_user_db_session(username, user_password) as session:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if not benchmark_run:
                raise ValueError(  # noqa: TRY301 — caught by except, sets FAILED status in DB
                    f"Benchmark run {benchmark_run_id} not found"
                )

            # Create settings snapshot for thread safety
            from local_deep_research.settings import SettingsManager

            settings_manager = SettingsManager(session)
            settings_snapshot = settings_manager.get_all_settings()

            # Password for metrics tracking in the background thread: prefer
            # the one cached for this Flask session, otherwise fall back to
            # the password the caller passed in (which may also be None).
            from flask import session as flask_session
            from ...database.session_passwords import session_password_store

            _user_password = None
            session_id = flask_session.get("session_id")
            if session_id and username:
                _user_password = session_password_store.get_session_password(
                    username, session_id
                )
                if not _user_password:
                    logger.warning(
                        f"No password found for user {username} in current session"
                    )
            if not _user_password:
                _user_password = user_password

            benchmark_data = {
                "benchmark_run_id": benchmark_run_id,
                "username": username or "benchmark_user",
                "user_password": _user_password,  # For metrics tracking
                "config_hash": benchmark_run.config_hash,
                "datasets_config": benchmark_run.datasets_config,
                "search_config": benchmark_run.search_config,
                "evaluation_config": benchmark_run.evaluation_config,
                "existing_results": self.get_existing_results(
                    benchmark_run.config_hash, username, user_password
                ),
                "settings_snapshot": settings_snapshot,
            }

            benchmark_run.status = BenchmarkStatus.IN_PROGRESS
            benchmark_run.start_time = datetime.now(UTC)
            session.commit()

        # Store data in memory for the worker thread
        self.active_runs[benchmark_run_id] = {
            "data": benchmark_data,
            "start_time": datetime.now(UTC),
            "status": "running",
            "results": [],
        }

        thread = threading.Thread(
            target=self._run_benchmark_thread,
            args=(benchmark_run_id,),
            daemon=True,
        )
        thread.start()

        self.active_runs[benchmark_run_id]["thread"] = thread

        logger.info(f"Started benchmark run {benchmark_run_id}")
        return True

    except Exception as e:
        logger.exception(f"Error starting benchmark {benchmark_run_id}")
        # If we populated active_runs before the spawn failed, drop the
        # stale entry — it has no thread and would mislead subsequent
        # cancel_benchmark / get_run_status calls.
        self.active_runs.pop(benchmark_run_id, None)
        # Record the failure in the user database. This is best-effort: an
        # error here must not escape, or callers would get an exception
        # instead of False.
        try:
            with get_user_db_session(username, user_password) as session:
                failed_run = (
                    session.query(BenchmarkRun)
                    .filter(BenchmarkRun.id == benchmark_run_id)
                    .first()
                )
                if failed_run:
                    failed_run.status = BenchmarkStatus.FAILED
                    failed_run.error_message = str(e)
                    session.commit()
        except Exception:
            logger.exception(
                f"Could not mark benchmark {benchmark_run_id} as failed"
            )
        return False

384 

@thread_cleanup
def _run_benchmark_thread(self, benchmark_run_id: int):
    """Main benchmark execution thread.

    The thread works from the data ``start_benchmark`` stored in
    ``self.active_runs[benchmark_run_id]["data"]``. It installs a
    thread-local settings context and processes every pending task,
    checking for cancellation before each one. At the end it records a
    ``completion_info`` summary in ``active_runs``. Per-task results are
    buffered in ``active_runs[...]["results"]`` and handed to
    ``_sync_results_to_database`` in the ``finally`` clause.

    Socket notifications are best-effort. A failure to emit one is logged;
    it never aborts the task loop or turns a finished run into FAILED.
    """
    # IMPORTANT: This runs in a background thread, so the user database is
    # not queried while tasks run; the in-memory queue tracker is used for
    # status tracking instead.
    task_id = None
    benchmark_data = self.active_runs.get(benchmark_run_id, {}).get("data")

    def is_cancelled() -> bool:
        """Return True if this run's in-memory status is "cancelled"."""
        return (
            benchmark_run_id in self.active_runs
            and self.active_runs[benchmark_run_id].get("status") == "cancelled"
        )

    def emit(payload: Dict[str, Any]) -> None:
        """Send a research_progress event for this run; errors are logged."""
        try:
            self.socket_service.emit_to_subscribers(
                "research_progress", benchmark_run_id, payload
            )
        except Exception:
            logger.exception(
                f"Failed to send update for benchmark {benchmark_run_id}"
            )

    def note_rate_limit(error_text: str) -> None:
        """Flag the run and warn subscribers if the error looks like rate limiting."""
        lowered = error_text.lower()
        if (
            "403" in lowered
            or "rate limit" in lowered
            or "forbidden" in lowered
        ):
            self.rate_limit_detected[benchmark_run_id] = True
            emit(
                {
                    "rate_limit_detected": True,
                    "message": "SearXNG rate limiting detected",
                }
            )

    try:
        if not benchmark_data:
            raise ValueError(  # noqa: TRY301
                f"Benchmark data for run {benchmark_run_id} not found"
            )

        # Set up settings context for thread-local access
        settings_snapshot = benchmark_data.get("settings_snapshot", {})
        username = benchmark_data.get("username", "benchmark_user")

        settings_context = SnapshotSettingsContext(
            settings_snapshot,
            username=username,
            missing_key_log_level="WARNING",
        )

        from ...config.thread_settings import set_settings_context

        set_settings_context(settings_context)

        datasets_config = benchmark_data["datasets_config"]
        search_config = benchmark_data["search_config"]
        evaluation_config = benchmark_data["evaluation_config"]
        existing_results = benchmark_data.get("existing_results", {})

        task_queue = self._create_task_queue(
            datasets_config,
            existing_results,
            benchmark_run_id,
        )

        # Previously graded compatible results count as already completed.
        total_examples = len(task_queue) + len(existing_results)
        completed_examples = len(existing_results)

        task_id = f"benchmark_{benchmark_run_id}_{int(datetime.now(UTC).timestamp())}"
        self.queue_tracker.add_task(task_id, username, "benchmark")
        self.queue_tracker.update_task_status(
            task_id, BenchmarkTaskStatus.PROCESSING
        )

        progress_info = {
            "total_examples": total_examples,
            "completed_examples": completed_examples,
            "failed_examples": 0,
            "start_time": datetime.now(UTC),
        }

        logger.info(
            f"Benchmark {benchmark_run_id} starting to process {len(task_queue)} tasks"
        )
        for i, task in enumerate(task_queue):
            if is_cancelled():
                logger.info(
                    f"Benchmark {benchmark_run_id} was cancelled, stopping processing"
                )
                break

            logger.info(
                f"Benchmark {benchmark_run_id} processing task {i + 1}/{len(task_queue)}"
            )
            try:
                # Add username and password to task for metrics tracking
                task["username"] = benchmark_data.get("username")
                task["user_password"] = benchmark_data.get("user_password")

                # Acquire the global research semaphore so benchmark tasks
                # count against the server-wide concurrency limit.
                _global_research_semaphore.acquire()
                try:
                    result = self._process_benchmark_task(
                        task,
                        search_config,
                        evaluation_config,
                    )
                finally:
                    _global_research_semaphore.release()

                # Buffer the result in memory; it is saved to the DB later.
                self.active_runs[benchmark_run_id].setdefault(
                    "results", []
                ).append(result)

                # _process_benchmark_task reports research failures in the
                # result instead of raising, so inspect it for rate limiting
                # as well.
                research_error = result.get("research_error")
                if research_error:
                    note_rate_limit(str(research_error))

                progress_info["completed_examples"] += 1

                logger.info(
                    f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} completed successfully. "
                    f"Progress: {progress_info['completed_examples']}/{progress_info['total_examples']} total examples"
                )

                self._send_progress_update(
                    benchmark_run_id,
                    progress_info["completed_examples"],
                    progress_info["total_examples"],
                )

            except Exception as e:
                logger.exception(f"Error processing task {i}")
                progress_info["failed_examples"] += 1
                logger.info(
                    f"Benchmark {benchmark_run_id} task {i + 1}/{len(task_queue)} failed. "
                    f"Total failed: {progress_info['failed_examples']}"
                )
                note_rate_limit(str(e))

        progress_info["end_time"] = datetime.now(UTC)

        was_cancelled = is_cancelled()
        if was_cancelled:
            status = BenchmarkStatus.CANCELLED
            message = "Benchmark cancelled by user"
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.CANCELLED
            )
        else:
            status = BenchmarkStatus.COMPLETED
            message = "Benchmark completed successfully"
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.COMPLETED
            )

        # Store completion info for the later database update
        self.active_runs[benchmark_run_id]["completion_info"] = {
            "status": status,
            "end_time": progress_info["end_time"],
            "completed_examples": progress_info["completed_examples"],
            "failed_examples": progress_info["failed_examples"],
        }

        total = progress_info["total_examples"]
        percent = (
            progress_info["completed_examples"] / total * 100
            if total > 0
            else 0
        )
        emit(
            {
                "status": "cancelled" if was_cancelled else "completed",
                "message": message,
                "progress": percent,
                "benchmark_run_id": benchmark_run_id,
            }
        )

    except Exception as e:
        logger.exception(f"Benchmark run {benchmark_run_id} failed")
        if task_id:
            self.queue_tracker.update_task_status(
                task_id, BenchmarkTaskStatus.FAILED
            )
        # Store failure info for the later database update
        if benchmark_run_id in self.active_runs:
            self.active_runs[benchmark_run_id]["completion_info"] = {
                "status": BenchmarkStatus.FAILED,
                "error_message": str(e),
            }
    finally:
        if benchmark_run_id in self.active_runs:
            # Mark that the thread is done but keep data for the DB update
            self.active_runs[benchmark_run_id]["thread_complete"] = True

            # Try to save results to database immediately if possible
            self._sync_results_to_database(benchmark_run_id)

606 

def _create_task_queue(
    self,
    datasets_config: Dict,
    existing_results: Dict,
    benchmark_run_id: int,
) -> List[Dict]:
    """Create the list of tasks to process, excluding existing results.

    For each dataset with a positive ``count``, that many examples are loaded
    with ``load_dataset``. Examples whose query hash is already a key of
    ``existing_results`` are skipped.

    Args:
        datasets_config: Mapping of dataset name to its config dict. Only
            ``"count"`` is read; a missing or ``None`` count is treated as 0.
        existing_results: Previously graded results keyed by query hash.
        benchmark_run_id: Id stamped onto every task.

    Returns:
        Task dicts in processing order. ``task_index`` is each task's
        position in the returned list.
    """
    tasks: List[Dict[str, Any]] = []

    for dataset_name, config in datasets_config.items():
        count = config.get("count") or 0
        if count <= 0:
            continue

        dataset = load_dataset(
            dataset_type=dataset_name,
            num_examples=count,
            seed=None,
        )

        for i, example in enumerate(dataset):
            # SimpleQA and the other datasets (e.g. BrowseComp) share a
            # layout: question under "problem", reference under "answer".
            question = example.get("problem", "")
            correct_answer = example.get("answer", "")

            query_hash = self.generate_query_hash(question, dataset_name)
            if query_hash in existing_results:
                continue  # Already graded in a compatible run

            tasks.append(
                {
                    "benchmark_run_id": benchmark_run_id,
                    "example_id": example.get("id", f"example_{i}"),
                    "dataset_type": dataset_name,
                    "question": question,
                    "correct_answer": correct_answer,
                    "query_hash": query_hash,
                    "task_index": len(tasks),
                }
            )

    return tasks

655 

def _process_benchmark_task(
    self, task: Dict, search_config: Dict, evaluation_config: Dict
) -> Dict:
    """Research, answer-extract and grade a single benchmark task.

    The formatted question is run through ``quick_summary``, with progress
    forwarded to the run's WebSocket subscribers. An answer is then
    extracted from the summary and graded with ``grade_single_result``.

    Args:
        task: Task dict from ``_create_task_queue``, plus the ``username`` /
            ``user_password`` added by ``_run_benchmark_thread``.
        search_config: Research parameters for ``quick_summary``; defaults
            are applied here for missing keys.
        evaluation_config: Grader configuration for ``grade_single_result``.

    Returns:
        A copy of ``task``, without ``user_password``, extended with the
        research and grading fields. If grading fails, ``is_correct`` is
        ``None`` and ``evaluation_error`` is set. Any other ``Exception``
        yields the task copy with ``research_error`` instead of raising.
    """

    def without_credentials(record: Dict) -> Dict:
        """Return a shallow copy of ``record`` minus ``user_password``.

        Results are buffered in memory before being persisted, so the
        credential added for metrics tracking must not travel with them.
        """
        cleaned = dict(record)
        cleaned.pop("user_password", None)
        return cleaned

    try:
        logger.info(
            f"Starting benchmark task {task['task_index'] + 1}: "
            f"example_id={task['example_id']}, dataset={task['dataset_type']}, "
            f"question_preview='{task['question'][:100]}...'"
        )

        # Settings context placed in thread-local storage by the caller
        from ...config.thread_settings import get_settings_context

        settings_context = get_settings_context()

        # Unique id passed to quick_summary as research_id and stored
        # with the result.
        import uuid

        tracking_id = str(uuid.uuid4())
        logger.info(
            f"Task {task['example_id']} assigned tracking_id: {tracking_id}"
        )

        formatted_query = format_query(task["question"], task["dataset_type"])
        logger.info(
            f"Task {task['example_id']} formatted query: '{formatted_query[:150]}...'"
        )

        start_time = time.time()
        logger.info(f"Task {task['example_id']} starting research phase...")

        def benchmark_progress_callback(status: str, progress: int, data: dict):
            """Forward a research progress event to the run's subscribers.

            Errors are logged and swallowed so progress reporting cannot
            interrupt the research.
            """
            try:
                timestamp = datetime.now(UTC).isoformat()

                # Create research-compatible log entry
                log_entry = {
                    "time": timestamp,
                    "message": f"Example {task['example_id']}: {status}",
                    "progress": progress,
                    "metadata": {
                        "phase": data.get("phase", "benchmark_processing"),
                        "type": data.get("type", "info"),
                        "example_id": task["example_id"],
                        "benchmark_run_id": task["benchmark_run_id"],
                        **data,  # Include all other data
                    },
                }

                # Classify the entry from the status text; checks are
                # ordered, so "complete"/"finished" win over "error".
                lowered_status = status.lower()
                if "complete" in lowered_status or "finished" in lowered_status:
                    log_entry["metadata"]["type"] = "milestone"
                elif "error" in lowered_status or "failed" in lowered_status:
                    log_entry["metadata"]["type"] = "error"
                elif "starting" in lowered_status or "begin" in lowered_status:
                    log_entry["metadata"]["type"] = "milestone"

                progress_data = {
                    "progress": progress,
                    "message": status,
                    "status": "in_progress",
                    "log_entry": log_entry,
                    # Array format expected by socket.js
                    "progress_log": json.dumps([log_entry]),
                }

                # Emit using research_progress format that the UI expects
                self.socket_service.emit_to_subscribers(
                    "research_progress",
                    task["benchmark_run_id"],
                    progress_data,
                )

            except Exception:
                logger.exception("Error sending benchmark progress update")

        user_password = task.get("user_password")

        search_result = quick_summary(
            query=formatted_query,
            research_id=tracking_id,
            iterations=search_config.get("iterations", 8),
            questions_per_iteration=search_config.get(
                "questions_per_iteration", 5
            ),
            search_tool=search_config.get("search_tool", "searxng"),
            search_strategy=search_config.get(
                "search_strategy", "focused_iteration"
            ),
            progress_callback=benchmark_progress_callback,
            model_name=search_config.get("model_name"),
            provider=search_config.get("provider"),
            temperature=search_config.get("temperature", 0.7),
            openai_endpoint_url=search_config.get("openai_endpoint_url"),
            settings_snapshot=settings_context.snapshot,  # Thread-safe settings
            username=task.get("username"),
            user_password=user_password,  # For metrics tracking
            # The web benchmark runs against the user's encrypted DB
            # (it has username/password and wants search metrics
            # persisted). quick_summary's default is programmatic_mode=True
            # for true library callers; override here so the engine's
            # metrics path stays active.
            programmatic_mode=False,
        )
        processing_time = time.time() - start_time
        logger.info(
            f"Task {task['example_id']} research completed in {processing_time:.2f}s, "
            f"model={search_config.get('model_name')}, provider={search_config.get('provider')}"
        )

        response = search_result.get("summary", "")
        logger.info(
            f"Task {task['example_id']} response length: {len(response)} chars"
        )

        extracted_data = extract_answer_from_response(
            response, task["dataset_type"]
        )
        extracted_is_dict = isinstance(extracted_data, dict)
        extracted_answer = (
            extracted_data.get("extracted_answer", "")
            if extracted_is_dict
            else str(extracted_data)
        )
        logger.info(
            f"Task {task['example_id']} extracted answer: '{extracted_answer[:100]}...'"
        )

        # Sources may come back as "sources" or as "all_links_of_system"
        sources = search_result.get("sources", [])
        if not sources and "all_links_of_system" in search_result:
            sources = search_result.get("all_links_of_system", [])

        logger.debug(f"Search result keys: {list(search_result.keys())}")
        logger.debug(f"Sources found: {len(sources)} items")

        result = {
            **without_credentials(task),
            "response": response,
            "extracted_answer": extracted_answer,
            "confidence": str(
                extracted_data.get("confidence", "100")
                if extracted_is_dict
                else "100"
            ),
            "processing_time": processing_time,
            "sources": json.dumps(sources),  # Stored as a JSON string
            "completed_at": datetime.now(UTC),
            "research_id": tracking_id,  # The UUID used as research_id above
        }

        # Evaluate result - grading is always attempted, whatever the
        # provider.
        try:
            logger.info(f"Task {task['example_id']} starting evaluation...")
            eval_start_time = time.time()

            result_data = {
                "id": task["example_id"],
                "problem": task["question"],
                "correct_answer": task["correct_answer"],
                "response": response,
                "extracted_answer": extracted_answer,
            }

            eval_result = grade_single_result(
                result_data,
                task["dataset_type"],
                evaluation_config,
                settings_context.snapshot,
            )
            eval_time = time.time() - eval_start_time
            logger.info(
                f"Task {task['example_id']} evaluation completed in {eval_time:.2f}s"
            )
            if eval_result and not eval_result.get("grading_error"):
                result.update(
                    {
                        "is_correct": eval_result.get("is_correct", False),
                        "graded_confidence": eval_result.get(
                            "graded_confidence", "0"
                        ),
                        "grader_response": eval_result.get(
                            "grader_response", ""
                        ),
                    }
                )
            else:
                error_msg = (
                    eval_result.get(
                        "grading_error", "Unknown evaluation error"
                    )
                    if eval_result
                    else "No evaluation results returned"
                )
                result.update(
                    {
                        "is_correct": None,
                        "graded_confidence": "0",
                        "grader_response": f"Evaluation failed: {error_msg}",
                        "evaluation_error": error_msg,
                    }
                )

        except Exception as e:
            logger.exception("Evaluation error")
            result.update(
                {
                    "is_correct": None,
                    "graded_confidence": "0",
                    "grader_response": f"Evaluation failed: {e!s}",
                    "evaluation_error": str(e),
                }
            )

        return result

    except Exception as e:
        logger.exception("Research error")
        return {
            **without_credentials(task),
            "research_error": str(e),
            "completed_at": datetime.now(UTC),
        }

903 

def sync_pending_results(
    self, benchmark_run_id: int, username: Optional[str] = None
):
    """Sync any pending in-memory results to the database.

    Can be called from the main thread. Only results whose list index is
    not yet recorded in ``run_data["saved_indices"]`` are considered;
    results whose ``query_hash`` already exists in the database for this
    run are marked as saved without inserting a duplicate row.

    Args:
        benchmark_run_id: ID of the benchmark run whose results to sync.
        username: Owner of the user database. When falsy, the username
            stored in the active run's ``data`` dict is used.

    Returns:
        Number of newly inserted ``BenchmarkResult`` rows. Returns 0 when
        the run is not tracked in ``self.active_runs``, when nothing was
        pending, or when the sync failed (the error is logged).
    """
    run_data = self.active_runs.get(benchmark_run_id)
    if run_data is None:
        return 0

    results_to_save = run_data.get("results", [])
    saved_indices = run_data.get("saved_indices", set())
    run_info = run_data.get("data", {})

    if not username:
        username = run_info.get("username")
    user_password = run_info.get("user_password")

    from ...database.session_context import get_user_db_session

    # Indices processed by this call. They are merged into
    # run_data["saved_indices"] only after the transaction succeeds, so a
    # failed commit never marks unsaved results as saved (which would make
    # _sync_results_to_database skip them later and lose them).
    handled_indices = set()
    saved_count = 0

    try:
        with get_user_db_session(username, user_password) as session:
            try:
                # One query for every hash already persisted for this run,
                # instead of one existence query per pending result.
                persisted_hashes = {
                    row[0]
                    for row in session.query(BenchmarkResult.query_hash)
                    .filter(
                        BenchmarkResult.benchmark_run_id == benchmark_run_id
                    )
                    .all()
                }

                for idx, result in enumerate(results_to_save):
                    if idx in saved_indices:
                        continue
                    handled_indices.add(idx)

                    query_hash = result["query_hash"]
                    if query_hash in persisted_hashes:
                        # Already in the database; only mark it as saved.
                        continue

                    session.add(
                        BenchmarkResult(
                            benchmark_run_id=benchmark_run_id,
                            example_id=result["example_id"],
                            query_hash=query_hash,
                            dataset_type=DatasetType(result["dataset_type"]),
                            research_id=result.get("research_id"),
                            question=result["question"],
                            correct_answer=result["correct_answer"],
                            response=result.get("response"),
                            extracted_answer=result.get("extracted_answer"),
                            confidence=result.get("confidence"),
                            processing_time=result.get("processing_time"),
                            sources=result.get("sources"),
                            is_correct=result.get("is_correct"),
                            graded_confidence=result.get("graded_confidence"),
                            grader_response=result.get("grader_response"),
                            completed_at=result.get("completed_at"),
                            research_error=result.get("research_error"),
                            evaluation_error=result.get("evaluation_error"),
                            task_index=result.get("task_index"),
                        )
                    )
                    # Guard against duplicate hashes within this batch too.
                    persisted_hashes.add(query_hash)
                    saved_count += 1

                if saved_count > 0:
                    session.commit()
            except Exception:
                # Roll back while the session is still open to prevent
                # PendingRollbackError on later use; the outer handler logs.
                session.rollback()
                raise
    except Exception:
        logger.exception(
            f"Error syncing pending results for benchmark {benchmark_run_id}"
        )
        return 0

    # Update in place when the set already exists so other holders of the
    # same set object observe the change.
    run_data.setdefault("saved_indices", set()).update(handled_indices)

    if saved_count > 0:
        logger.info(
            f"Saved {saved_count} new results for benchmark {benchmark_run_id}"
        )

    return saved_count

987 

988 def _sync_results_to_database(self, benchmark_run_id: int): 

989 """Sync benchmark results from memory to database after thread completes.""" 

990 if benchmark_run_id not in self.active_runs: 

991 return 

992 

993 run_data = self.active_runs[benchmark_run_id] 

994 if not run_data.get("thread_complete"): 

995 return 

996 

997 username = run_data.get("data", {}).get("username") 

998 user_password = run_data.get("data", {}).get("user_password") 

999 from ...database.session_context import get_user_db_session 

1000 

1001 try: 

1002 with get_user_db_session(username, user_password) as session: 

1003 # Update benchmark run status 

1004 benchmark_run = ( 

1005 session.query(BenchmarkRun) 

1006 .filter(BenchmarkRun.id == benchmark_run_id) 

1007 .first() 

1008 ) 

1009 

1010 if benchmark_run and "completion_info" in run_data: 1010 ↛ 1086line 1010 didn't jump to line 1086

1011 info = run_data["completion_info"] 

1012 benchmark_run.status = info["status"] 

1013 benchmark_run.end_time = info.get( 

1014 "end_time", datetime.now(UTC) 

1015 ) 

1016 benchmark_run.completed_examples = info.get( 

1017 "completed_examples", 0 

1018 ) 

1019 benchmark_run.failed_examples = info.get( 

1020 "failed_examples", 0 

1021 ) 

1022 benchmark_run.error_message = info.get("error_message") 

1023 

1024 # Save all results (skip already saved ones) 

1025 saved_indices = run_data.get("saved_indices", set()) 

1026 for idx, result in enumerate(run_data.get("results", [])): 

1027 if idx in saved_indices: 

1028 continue 

1029 benchmark_result = BenchmarkResult( 

1030 benchmark_run_id=benchmark_run_id, 

1031 example_id=result["example_id"], 

1032 query_hash=result["query_hash"], 

1033 dataset_type=DatasetType(result["dataset_type"]), 

1034 research_id=result.get("research_id"), 

1035 question=result["question"], 

1036 correct_answer=result["correct_answer"], 

1037 response=result.get("response"), 

1038 extracted_answer=result.get("extracted_answer"), 

1039 confidence=result.get("confidence"), 

1040 processing_time=result.get("processing_time"), 

1041 sources=result.get("sources"), 

1042 is_correct=result.get("is_correct"), 

1043 graded_confidence=result.get("graded_confidence"), 

1044 grader_response=result.get("grader_response"), 

1045 completed_at=result.get("completed_at"), 

1046 research_error=result.get("research_error"), 

1047 evaluation_error=result.get("evaluation_error"), 

1048 task_index=result.get("task_index"), 

1049 ) 

1050 session.add(benchmark_result) 

1051 

1052 # Calculate final accuracy 

1053 if benchmark_run.status == BenchmarkStatus.COMPLETED: 

1054 correct_results = [ 

1055 r 

1056 for r in run_data.get("results", []) 

1057 if r.get("is_correct") 

1058 ] 

1059 evaluated_results = [ 

1060 r 

1061 for r in run_data.get("results", []) 

1062 if r.get("is_correct") is not None 

1063 ] 

1064 

1065 if evaluated_results: 1065 ↛ 1080line 1065 didn't jump to line 1080 because the condition on line 1065 was always true

1066 benchmark_run.overall_accuracy = ( 

1067 len(correct_results) / len(evaluated_results) 

1068 ) * 100 

1069 

1070 # Calculate processing rate 

1071 total_time = sum( 

1072 r.get("processing_time", 0) 

1073 for r in evaluated_results 

1074 ) 

1075 if total_time > 0: 1075 ↛ 1080line 1075 didn't jump to line 1080 because the condition on line 1075 was always true

1076 benchmark_run.processing_rate = len( 

1077 evaluated_results 

1078 ) / (total_time / 60) 

1079 

1080 session.commit() 

1081 logger.info( 

1082 f"Successfully synced results for benchmark {benchmark_run_id}" 

1083 ) 

1084 

1085 # Clean up memory 

1086 del self.active_runs[benchmark_run_id] 

1087 

1088 except Exception: 

1089 logger.exception("Error syncing benchmark results to database") 

1090 

1091 def _send_progress_update( 

1092 self, benchmark_run_id: int, completed: int, total: int 

1093 ): 

1094 """Send real-time progress update via websocket.""" 

1095 try: 

1096 percentage = (completed / total * 100) if total > 0 else 0 

1097 

1098 # Create log entry for milestone progress 

1099 log_entry = { 

1100 "time": datetime.now(UTC).isoformat(), 

1101 "message": f"Completed {completed}/{total} examples ({percentage:.1f}%)", 

1102 "progress": percentage, 

1103 "metadata": { 

1104 "phase": "benchmark_progress", 

1105 "type": "milestone", 

1106 "completed": completed, 

1107 "total": total, 

1108 "benchmark_run_id": benchmark_run_id, 

1109 }, 

1110 } 

1111 

1112 progress_data = { 

1113 "status": "in_progress", 

1114 "message": f"Processing examples: {completed}/{total}", 

1115 "progress": percentage, 

1116 "completed": completed, 

1117 "total": total, 

1118 "benchmark_run_id": benchmark_run_id, 

1119 "log_entry": log_entry, 

1120 "progress_log": json.dumps([log_entry]), 

1121 } 

1122 

1123 self.socket_service.emit_to_subscribers( 

1124 "research_progress", benchmark_run_id, progress_data 

1125 ) 

1126 

1127 except Exception: 

1128 logger.exception("Error sending progress update") 

1129 

def _calculate_final_accuracy(
    self, benchmark_run_id: int, username: Optional[str] = None
):
    """Compute and store final accuracy metrics for a benchmark run.

    Uses only results that have been graded (``is_correct`` is not None).
    ``overall_accuracy`` is the percentage of those marked correct;
    ``processing_rate`` is graded results per minute of summed
    ``processing_time`` (0 when no processing time was recorded). Does
    nothing when no graded results exist. Database errors are logged and
    the session is rolled back; they are not raised.

    Args:
        benchmark_run_id: ID of the benchmark run to update.
        username: Owner of the user database to open.
    """
    from ...database.session_context import get_user_db_session

    with get_user_db_session(username) as session:
        try:
            results = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .filter(BenchmarkResult.is_correct.isnot(None))
                .all()
            )
            if not results:
                return

            correct_count = sum(1 for r in results if r.is_correct)
            overall_accuracy = (correct_count / len(results)) * 100

            # Missing processing times count as 0.
            total_time = sum(r.processing_time or 0 for r in results)
            processing_rate = (
                (len(results) / (total_time / 60)) if total_time > 0 else 0
            )

            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if benchmark_run:
                benchmark_run.overall_accuracy = overall_accuracy
                benchmark_run.processing_rate = processing_rate
                session.commit()

        except Exception:
            logger.exception("Error calculating final accuracy")
            # Roll back so the session is not left in a failed transaction
            # (PendingRollbackError on the next use).
            try:
                session.rollback()
            except Exception:
                logger.debug(
                    "Failed to rollback session after accuracy calculation error"
                )

1173 

def update_benchmark_status(
    self,
    benchmark_run_id: int,
    status: BenchmarkStatus,
    error_message: Optional[str] = None,
    username: Optional[str] = None,
):
    """Update a benchmark run's status and lifecycle timestamps.

    Sets ``status`` and ``updated_at``; stores ``error_message`` only when
    one is given. ``start_time`` is stamped the first time the run enters
    IN_PROGRESS, and ``end_time`` the first time it reaches a terminal
    status (COMPLETED, FAILED or CANCELLED). Does nothing if the run does
    not exist. Database errors inside the session are logged and rolled
    back rather than raised.

    Args:
        benchmark_run_id: ID of the benchmark run to update.
        status: New status for the run.
        error_message: Optional error text to record on the run.
        username: Owner of the user database to open.
    """
    from ...database.session_context import get_user_db_session

    # CANCELLED ends a run just like COMPLETED/FAILED, so it gets an end_time.
    terminal_statuses = (
        BenchmarkStatus.COMPLETED,
        BenchmarkStatus.FAILED,
        BenchmarkStatus.CANCELLED,
    )

    with get_user_db_session(username) as session:
        try:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if benchmark_run:
                now = datetime.now(UTC)
                benchmark_run.status = status
                benchmark_run.updated_at = now

                if error_message:
                    benchmark_run.error_message = error_message

                # Only stamp each timestamp once, on the first transition.
                if status == BenchmarkStatus.IN_PROGRESS:
                    if not benchmark_run.start_time:
                        benchmark_run.start_time = now
                elif (
                    status in terminal_statuses and not benchmark_run.end_time
                ):
                    benchmark_run.end_time = now

            session.commit()

        except Exception:
            logger.exception("Error updating benchmark status")
            # Guard the rollback so a failing rollback cannot mask the
            # original error logged above.
            try:
                session.rollback()
            except Exception:
                logger.debug("Failed to rollback session after status error")

1215 

def get_benchmark_status(
    self, benchmark_run_id: int, username: Optional[str] = None
) -> Optional[Dict]:
    """Return a JSON-friendly snapshot of a benchmark run's progress.

    Running accuracy is computed from this run's graded results. When
    ``completed_examples`` exceeds that count, the difference is topped up
    with graded results from other COMPLETED runs sharing the same
    ``config_hash`` (reused results). Per-dataset accuracies are added as
    ``"<dataset>_accuracy"`` keys. Timing estimates use every result row of
    this run, graded or not. A 95% Wilson confidence interval on accuracy is
    included once at least 3 graded results exist.

    Args:
        benchmark_run_id: ID of the benchmark run.
        username: Owner of the user database to open.

    Returns:
        The status dict, or None when the run does not exist or an error
        occurred (the error is logged).
    """
    from collections import defaultdict

    from ...database.session_context import get_user_db_session

    def _iso(value):
        return value.isoformat() if value else None

    with get_user_db_session(username) as session:
        try:
            benchmark_run = (
                session.query(BenchmarkRun)
                .filter(BenchmarkRun.id == benchmark_run_id)
                .first()
            )
            if not benchmark_run:
                return None

            # Graded results belonging to this run.
            current_results = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .filter(BenchmarkResult.is_correct.isnot(None))
                .all()
            )

            # Only count reused results up to the number we report as
            # completed. completed_examples may be NULL for a fresh run.
            completed_examples = benchmark_run.completed_examples or 0
            reused_count_needed = completed_examples - len(current_results)
            if reused_count_needed > 0:
                compatible_results = (
                    session.query(BenchmarkResult)
                    .join(
                        BenchmarkRun,
                        BenchmarkResult.benchmark_run_id == BenchmarkRun.id,
                    )
                    .filter(BenchmarkRun.config_hash == benchmark_run.config_hash)
                    .filter(BenchmarkRun.id != benchmark_run_id)
                    .filter(BenchmarkRun.status == BenchmarkStatus.COMPLETED)
                    .filter(BenchmarkResult.is_correct.isnot(None))
                    .order_by(BenchmarkResult.id)  # Consistent ordering
                    .limit(reused_count_needed)
                    .all()
                )
                results = current_results + compatible_results
            else:
                results = current_results

            running_accuracy = None
            correct_count = 0
            dataset_accuracies = {}

            if results:
                correct_count = sum(1 for r in results if r.is_correct)
                running_accuracy = (correct_count / len(results)) * 100

                # Group by dataset type and compute each dataset's accuracy.
                results_by_dataset = defaultdict(list)
                for r in results:
                    results_by_dataset[r.dataset_type.value].append(r)

                for dataset_type, dataset_results in results_by_dataset.items():
                    dataset_correct = sum(
                        1 for r in dataset_results if r.is_correct
                    )
                    dataset_accuracies[f"{dataset_type}_accuracy"] = (
                        dataset_correct / len(dataset_results)
                    ) * 100

            estimated_time_remaining = None
            total_elapsed_time = None
            avg_time_per_example = None
            accuracy_confidence = None

            # ALL results of this run (including those pending evaluation)
            # drive the timing estimates.
            all_results_for_timing = (
                session.query(BenchmarkResult)
                .filter(BenchmarkResult.benchmark_run_id == benchmark_run_id)
                .all()
            )
            processed_count = len(all_results_for_timing)

            start_time = benchmark_run.start_time
            if start_time and processed_count:
                # Values read back from the database can be timezone-naive
                # (e.g. SQLite drops tzinfo). This module writes start_time
                # with datetime.now(UTC), so interpret naive values as UTC;
                # subtracting naive from aware would raise TypeError.
                if start_time.tzinfo is None:
                    start_time = start_time.replace(tzinfo=UTC)

                total_elapsed_time = (
                    datetime.now(UTC) - start_time
                ).total_seconds()
                avg_time_per_example = total_elapsed_time / processed_count

                logger.info(
                    f"Time calculation - elapsed: {total_elapsed_time:.2f}s, "
                    f"results_count: {len(all_results_for_timing)}, "
                    f"avg_per_example: {avg_time_per_example:.2f}s"
                )

                remaining_examples = (
                    benchmark_run.total_examples - processed_count
                )
                if remaining_examples > 0:
                    estimated_time_remaining = (
                        avg_time_per_example * remaining_examples
                    )
                    logger.info(
                        f"Time estimation - total: {benchmark_run.total_examples}, "
                        f"completed: {len(all_results_for_timing)}, remaining: {remaining_examples}, "
                        f"avg_time: {avg_time_per_example:.2f}s, "
                        f"estimated_remaining: {estimated_time_remaining:.2f}s"
                    )

            # Accuracy confidence interval (Wilson score, 95%).
            if len(results) >= 3:
                from ..metrics.statistics import wilson_score_interval

                sample_size = len(results)
                ci = wilson_score_interval(correct_count, sample_size)
                accuracy_confidence = {
                    "lower_bound": ci["lower"] * 100,
                    "upper_bound": ci["upper"] * 100,
                    "margin_of_error": ci["margin_of_error"] * 100,
                    "sample_size": sample_size,
                }

            # A stored final accuracy of 0.0 is a real value; only fall back
            # to the running accuracy when no final value has been stored.
            overall_accuracy = (
                benchmark_run.overall_accuracy
                if benchmark_run.overall_accuracy is not None
                else running_accuracy
            )

            status_data = {
                "id": benchmark_run.id,
                "run_name": benchmark_run.run_name,
                "status": benchmark_run.status.value,
                "completed_examples": processed_count,  # actual count from DB
                "total_examples": benchmark_run.total_examples,
                "failed_examples": benchmark_run.failed_examples,
                "overall_accuracy": overall_accuracy,
                "running_accuracy": running_accuracy,
                "processing_rate": benchmark_run.processing_rate,
                "estimated_time_remaining": estimated_time_remaining,  # seconds
                "total_elapsed_time": total_elapsed_time,  # seconds
                "avg_time_per_example": avg_time_per_example,  # seconds
                "accuracy_confidence": accuracy_confidence,
                "created_at": _iso(benchmark_run.created_at),
                "start_time": _iso(benchmark_run.start_time),
                "end_time": _iso(benchmark_run.end_time),
                "error_message": benchmark_run.error_message,
                # Per-dataset accuracies, keyed "<dataset>_accuracy".
                **dataset_accuracies,
            }

            logger.info(
                f"Benchmark {benchmark_run_id} status - completed: {benchmark_run.completed_examples}, "
                f"running_acc: {running_accuracy}, dataset_accuracies: {dataset_accuracies}, "
                f"avg_time: {avg_time_per_example}"
            )

            return status_data

        except Exception:
            logger.exception("Error getting benchmark status")
            return None

1423 

def cancel_benchmark(
    self, benchmark_run_id: int, username: Optional[str] = None
) -> bool:
    """Cancel a running benchmark.

    If the run is still tracked in ``self.active_runs``, its in-memory
    entry is flagged with ``status = "cancelled"``. The run is then marked
    ``BenchmarkStatus.CANCELLED`` in the database through
    ``update_benchmark_status``.

    Returns:
        True once cancellation has been issued; False if an exception
        escaped (it is logged).
    """
    try:
        tracked_runs = self.active_runs
        if benchmark_run_id in tracked_runs:
            tracked_runs[benchmark_run_id]["status"] = "cancelled"

        self.update_benchmark_status(
            benchmark_run_id,
            BenchmarkStatus.CANCELLED,
            username=username,
        )
        logger.info(f"Cancelled benchmark run {benchmark_run_id}")
    except Exception:
        logger.exception(f"Error cancelling benchmark {benchmark_run_id}")
        return False
    return True

1441 

1442 

# Global service instance: a single module-level BenchmarkService created when
# this module is imported. Any code importing ``benchmark_service`` shares this
# one object and therefore its in-memory state.

benchmark_service = BenchmarkService()