Coverage for src / local_deep_research / web / queue / processor_v2.py: 95%

295 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Queue processor v2 - uses encrypted user databases instead of service.db 

3Supports both direct execution and queue modes. 

4""" 

5 

6import threading 

7import time 

8import uuid 

9from typing import Any, Dict, Optional 

10 

11from loguru import logger 

12 

13from ...constants import ResearchStatus 

14from ...database.encrypted_db import db_manager 

15from ...database.models import ( 

16 QueuedResearch, 

17 ResearchHistory, 

18 UserActiveResearch, 

19) 

20from ...database.queue_service import UserQueueService 

21from ...database.session_context import get_user_db_session 

22from ...database.session_passwords import session_password_store 

23from ...notifications.queue_helpers import ( 

24 send_research_completed_notification_from_session, 

25 send_research_failed_notification_from_session, 

26) 

27from ..services.research_service import ( 

28 run_research_process, 

29 start_research_process, 

30) 

31 

32# Retry configuration constants for notification database queries 

33MAX_RESEARCH_LOOKUP_RETRIES = 3 

34INITIAL_RESEARCH_LOOKUP_DELAY = 0.5 # seconds 

35RETRY_BACKOFF_MULTIPLIER = 2 

36 

37 

38class QueueProcessorV2: 

39 """ 

40 Processes queued researches using encrypted user databases. 

41 This replaces the service.db approach. 

42 """ 

43 

44 def __init__(self, check_interval=10): 

45 """ 

46 Initialize the queue processor. 

47 

48 Args: 

49 check_interval: How often to check for work (seconds) 

50 """ 

51 self.check_interval = check_interval 

52 self.running = False 

53 self.thread = None 

54 self._loop_iteration = 0 

55 

56 # Per-user settings will be retrieved from each user's database 

57 # when processing their queue using SettingsManager 

58 logger.info( 

59 "Queue processor v2 initialized - will use per-user settings from SettingsManager" 

60 ) 

61 

62 # Track which users we should check 

63 self._users_to_check: set[tuple[str, str]] = set() 

64 self._users_lock = threading.Lock() 

65 

66 # Track pending operations from background threads 

67 self.pending_operations = {} 

68 self._pending_operations_lock = threading.Lock() 

69 

70 def start(self): 

71 """Start the queue processor thread.""" 

72 if self.running: 

73 logger.warning("Queue processor already running") 

74 return 

75 

76 self.running = True 

77 self.thread = threading.Thread( 

78 target=self._process_queue_loop, daemon=True 

79 ) 

80 self.thread.start() 

81 logger.info("Queue processor v2 started") 

82 

83 def stop(self): 

84 """Stop the queue processor thread.""" 

85 self.running = False 

86 if self.thread: 

87 self.thread.join(timeout=10) 

88 logger.info("Queue processor v2 stopped") 

89 

90 def notify_user_activity(self, username: str, session_id: str): 

91 """ 

92 Notify that a user has activity and their queue should be checked. 

93 

94 Args: 

95 username: The username 

96 session_id: The Flask session ID (for password access) 

97 """ 

98 with self._users_lock: 

99 self._users_to_check.add((username, session_id)) 

100 logger.debug(f"User {username} added to queue check list") 

101 

102 def notify_research_queued(self, username: str, research_id: str, **kwargs): 

103 """ 

104 Notify that a research was queued. 

105 In direct mode, this immediately starts the research if slots are available. 

106 In queue mode, it adds to the queue. 

107 

108 Args: 

109 username: The username 

110 research_id: The research ID 

111 **kwargs: Additional parameters for direct execution (query, mode, etc.) 

112 """ 

113 # Check user's queue_mode setting when we have database access 

114 if kwargs: 

115 session_id = kwargs.get("session_id") 

116 if session_id: 

117 # Check if we can start it directly 

118 password = session_password_store.get_session_password( 

119 username, session_id 

120 ) 

121 if password: 

122 try: 

123 # Open database and check settings + active count 

124 engine = db_manager.open_user_database( 

125 username, password 

126 ) 

127 if engine: 127 ↛ 191line 127 didn't jump to line 191 because the condition on line 127 was always true

128 with get_user_db_session(username) as db_session: 

129 # Get user's settings using SettingsManager 

130 from ...settings.manager import SettingsManager 

131 

132 settings_manager = SettingsManager(db_session) 

133 

134 # Get user's queue_mode setting (env > DB > default) 

135 queue_mode = settings_manager.get_setting( 

136 "app.queue_mode", "direct" 

137 ) 

138 

139 # Get user's max concurrent setting (env > DB > default) 

140 max_concurrent = settings_manager.get_setting( 

141 "app.max_concurrent_researches", 3 

142 ) 

143 

144 logger.debug( 

145 f"User {username} settings: queue_mode={queue_mode}, " 

146 f"max_concurrent={max_concurrent}" 

147 ) 

148 

149 # Only try direct execution if user has queue_mode="direct" 

150 if queue_mode == "direct": 

151 # Count active researches 

152 active_count = ( 

153 db_session.query(UserActiveResearch) 

154 .filter_by( 

155 username=username, 

156 status=ResearchStatus.IN_PROGRESS, 

157 ) 

158 .count() 

159 ) 

160 

161 if active_count < max_concurrent: 

162 # We have slots - start directly! 

163 logger.info( 

164 f"Direct mode: Starting research {research_id} immediately " 

165 f"(active: {active_count}/{max_concurrent})" 

166 ) 

167 

168 # Start the research directly 

169 self._start_research_directly( 

170 username, 

171 research_id, 

172 password, 

173 **kwargs, 

174 ) 

175 return 

176 logger.info( 

177 f"Direct mode: Max concurrent reached ({active_count}/" 

178 f"{max_concurrent}), queueing {research_id}" 

179 ) 

180 else: 

181 logger.info( 

182 f"User {username} has queue_mode={queue_mode}, " 

183 f"queueing research {research_id}" 

184 ) 

185 except Exception: 

186 logger.exception( 

187 f"Error in direct execution for {username}" 

188 ) 

189 

190 # Fall back to queue mode (or if direct mode failed) 

191 try: 

192 with get_user_db_session(username) as session: 

193 queue_service = UserQueueService(session) 

194 queue_service.add_task_metadata( 

195 task_id=research_id, 

196 task_type="research", 

197 priority=0, 

198 ) 

199 logger.info( 

200 f"Research {research_id} queued for user {username}" 

201 ) 

202 except Exception: 

203 logger.exception(f"Failed to update queue status for {username}") 

204 

205 def _start_research_directly( 

206 self, username: str, research_id: str, password: str, **kwargs 

207 ): 

208 """ 

209 Start a research directly without queueing. 

210 

211 Args: 

212 username: The username 

213 research_id: The research ID 

214 password: The user's password 

215 **kwargs: Research parameters (query, mode, settings, etc.) 

216 """ 

217 query = kwargs.get("query") 

218 mode = kwargs.get("mode") 

219 settings_snapshot = kwargs.get("settings_snapshot", {}) 

220 

221 # Create active research record 

222 try: 

223 with get_user_db_session(username) as db_session: 

224 active_record = UserActiveResearch( 

225 username=username, 

226 research_id=research_id, 

227 status=ResearchStatus.IN_PROGRESS, 

228 thread_id="pending", 

229 settings_snapshot=settings_snapshot, 

230 ) 

231 db_session.add(active_record) 

232 db_session.commit() 

233 

234 # Update task status if it exists 

235 queue_service = UserQueueService(db_session) 

236 queue_service.update_task_status(research_id, "processing") 

237 except Exception: 

238 logger.exception( 

239 f"Failed to create active research record for {research_id}" 

240 ) 

241 return 

242 

243 # Extract parameters from kwargs 

244 model_provider = kwargs.get("model_provider") 

245 model = kwargs.get("model") 

246 custom_endpoint = kwargs.get("custom_endpoint") 

247 search_engine = kwargs.get("search_engine") 

248 

249 # Start the research process 

250 try: 

251 research_thread = start_research_process( 

252 research_id, 

253 query, 

254 mode, 

255 run_research_process, 

256 username=username, 

257 user_password=password, 

258 model_provider=model_provider, 

259 model=model, 

260 custom_endpoint=custom_endpoint, 

261 search_engine=search_engine, 

262 max_results=kwargs.get("max_results"), 

263 time_period=kwargs.get("time_period"), 

264 iterations=kwargs.get("iterations"), 

265 questions_per_iteration=kwargs.get("questions_per_iteration"), 

266 strategy=kwargs.get("strategy", "source-based"), 

267 settings_snapshot=settings_snapshot, 

268 ) 

269 

270 # Update thread ID 

271 try: 

272 with get_user_db_session(username) as db_session: 

273 active_record = ( 

274 db_session.query(UserActiveResearch) 

275 .filter_by(username=username, research_id=research_id) 

276 .first() 

277 ) 

278 if active_record: 278 ↛ 286line 278 didn't jump to line 286

279 active_record.thread_id = str(research_thread.ident) 

280 db_session.commit() 

281 except Exception: 

282 logger.exception( 

283 f"Failed to update thread ID for {research_id}" 

284 ) 

285 

286 logger.info( 

287 f"Direct execution: Started research {research_id} for user {username} " 

288 f"in thread {research_thread.ident}" 

289 ) 

290 

291 except Exception: 

292 logger.exception(f"Failed to start research {research_id} directly") 

293 # Clean up the active record 

294 try: 

295 with get_user_db_session(username) as db_session: 

296 active_record = ( 

297 db_session.query(UserActiveResearch) 

298 .filter_by(username=username, research_id=research_id) 

299 .first() 

300 ) 

301 if active_record: 301 ↛ exitline 301 didn't jump to the function exit

302 db_session.delete(active_record) 

303 db_session.commit() 

304 except Exception: 

305 logger.exception( 

306 f"Failed to clean up active research record for {research_id}" 

307 ) 

308 

309 def notify_research_completed( 

310 self, username: str, research_id: str, user_password: str | None = None 

311 ): 

312 """ 

313 Notify that a research completed. 

314 Updates the user's queue status in their database. 

315 

316 Args: 

317 username: The username 

318 research_id: The research ID 

319 user_password: User password for database access. Required for queue 

320 updates and database lookups during notification sending. 

321 Optional only because some callers may not have it 

322 available, in which case only basic updates occur. 

323 """ 

324 try: 

325 # get_user_db_session is already imported at module level (line 19) 

326 # It accepts optional password parameter and returns a context manager 

327 with get_user_db_session(username, user_password) as session: 

328 queue_service = UserQueueService(session) 

329 queue_service.update_task_status( 

330 research_id, ResearchStatus.COMPLETED 

331 ) 

332 logger.info( 

333 f"Research {research_id} completed for user {username}" 

334 ) 

335 

336 # Send notification using helper from notification module 

337 send_research_completed_notification_from_session( 

338 username=username, 

339 research_id=research_id, 

340 db_session=session, 

341 ) 

342 

343 except Exception: 

344 logger.exception( 

345 f"Failed to update completion status for {username}" 

346 ) 

347 

348 # Auto-convert research to document in History collection. 

349 # Documents only — FAISS indexing is triggered separately by the user 

350 # via "Index All" on the History page. 

351 from ...research_library.search.services.research_history_indexer import ( 

352 auto_convert_research, 

353 ) 

354 

355 auto_convert_research(username, research_id, db_password=user_password) 

356 

357 def notify_research_failed( 

358 self, 

359 username: str, 

360 research_id: str, 

361 error_message: str | None = None, 

362 user_password: str | None = None, 

363 ): 

364 """ 

365 Notify that a research failed. 

366 Updates the user's queue status in their database and sends notification. 

367 

368 Args: 

369 username: The username 

370 research_id: The research ID 

371 error_message: Optional error message 

372 user_password: User password for database access. Required for queue 

373 updates and database lookups during notification sending. 

374 Optional only because some callers may not have it 

375 available, in which case only basic updates occur. 

376 """ 

377 try: 

378 # get_user_db_session is already imported at module level (line 19) 

379 # It accepts optional password parameter and returns a context manager 

380 with get_user_db_session(username, user_password) as session: 

381 queue_service = UserQueueService(session) 

382 queue_service.update_task_status( 

383 research_id, 

384 ResearchStatus.FAILED, 

385 error_message=error_message, 

386 ) 

387 logger.info( 

388 f"Research {research_id} failed for user {username}: " 

389 f"{error_message}" 

390 ) 

391 

392 # Send notification using helper from notification module 

393 send_research_failed_notification_from_session( 

394 username=username, 

395 research_id=research_id, 

396 error_message=error_message or "Unknown error", 

397 db_session=session, 

398 ) 

399 

400 except Exception: 

401 logger.exception(f"Failed to update failure status for {username}") 

402 

403 def _process_queue_loop(self): 

404 """Main loop that processes the queue.""" 

405 while self.running: 

406 try: 

407 # Get list of users to check (don't clear immediately) 

408 with self._users_lock: 

409 users_to_check = list(self._users_to_check) 

410 

411 # Process each user's queue 

412 users_to_remove = [] 

413 for user_session in users_to_check: 

414 try: 

415 username, session_id = user_session 

416 # _process_user_queue returns True if queue is empty 

417 queue_empty = self._process_user_queue( 

418 username, session_id 

419 ) 

420 if queue_empty: 

421 users_to_remove.append(user_session) 

422 except Exception: 

423 logger.exception( 

424 f"Error processing queue for {user_session}" 

425 ) 

426 # Don't remove on error - the _process_user_queue method 

427 # determines whether to keep checking based on error type 

428 

429 # Only remove users whose queues are now empty 

430 with self._users_lock: 

431 for user_session in users_to_remove: 

432 self._users_to_check.discard(user_session) 

433 

434 except Exception: 

435 logger.exception("Error in queue processor loop") 

436 finally: 

437 # Clean up thread-local database session after each iteration. 

438 # The loop opens a new session each iteration via get_user_db_session(); 

439 # closing it returns the connection to the shared QueuePool promptly. 

440 try: 

441 from ...database.thread_local_session import ( 

442 cleanup_current_thread, 

443 cleanup_dead_threads, 

444 ) 

445 

446 cleanup_current_thread() 

447 except Exception: 

448 logger.debug( 

449 "thread-local cleanup on shutdown", exc_info=True 

450 ) 

451 

452 # Periodic dead-thread credential sweep (every ~60s). 

453 # One of three sweep trigger points (app_factory 

454 # teardown, connection_cleanup scheduler, and here). 

455 self._loop_iteration += 1 

456 if self._loop_iteration % 6 == 0: # Every ~60s (10s × 6) 

457 try: 

458 cleanup_dead_threads() 

459 except Exception: 

460 logger.debug( 

461 "periodic dead-thread sweep", exc_info=True 

462 ) 

463 

464 time.sleep(self.check_interval) 

465 

466 def _process_user_queue(self, username: str, session_id: str) -> bool: 

467 """ 

468 Process the queue for a specific user. 

469 

470 Args: 

471 username: The username 

472 session_id: The Flask session ID 

473 

474 Returns: 

475 True if the queue is empty, False if there are still items 

476 """ 

477 # Get the user's password from session store 

478 password = session_password_store.get_session_password( 

479 username, session_id 

480 ) 

481 if not password: 

482 logger.debug( 

483 f"No password available for user {username}, skipping queue check" 

484 ) 

485 return True # Remove from checking - session expired 

486 

487 # Open the user's encrypted database 

488 try: 

489 # First ensure the database is open 

490 engine = db_manager.open_user_database(username, password) 

491 if not engine: 

492 logger.error(f"Failed to open database for user {username}") 

493 return False # Keep checking - could be temporary DB issue 

494 

495 # Get a session and process the queue 

496 with get_user_db_session(username, password) as db_session: 

497 queue_service = UserQueueService(db_session) 

498 

499 # Get user's settings using SettingsManager 

500 from ...settings.manager import SettingsManager 

501 

502 settings_manager = SettingsManager(db_session) 

503 

504 # Get user's max concurrent setting (env > DB > default) 

505 max_concurrent = settings_manager.get_setting( 

506 "app.max_concurrent_researches", 3 

507 ) 

508 

509 # Get queue status 

510 queue_status = queue_service.get_queue_status() or { 

511 "active_tasks": 0, 

512 "queued_tasks": 0, 

513 } 

514 

515 # Calculate available slots 

516 available_slots = max_concurrent - queue_status["active_tasks"] 

517 

518 if available_slots <= 0: 

519 # No slots available, but queue might not be empty 

520 return False # Keep checking 

521 

522 if queue_status["queued_tasks"] == 0: 522 ↛ 526line 522 didn't jump to line 526 because the condition on line 522 was always true

523 # Queue is empty 

524 return True # Remove from checking 

525 

526 logger.info( 

527 f"Processing queue for {username}: " 

528 f"{queue_status['active_tasks']} active, " 

529 f"{queue_status['queued_tasks']} queued, " 

530 f"{available_slots} slots available" 

531 ) 

532 

533 # Process queued researches 

534 self._start_queued_researches( 

535 db_session, 

536 queue_service, 

537 username, 

538 password, 

539 available_slots, 

540 ) 

541 

542 # Check if there are still items in queue 

543 updated_status = queue_service.get_queue_status() or { 

544 "queued_tasks": 0 

545 } 

546 return bool(updated_status["queued_tasks"] == 0) 

547 

548 except Exception: 

549 logger.exception(f"Error processing queue for user {username}") 

550 return False # Keep checking - errors might be temporary 

551 

552 def _start_queued_researches( 

553 self, 

554 db_session, 

555 queue_service: UserQueueService, 

556 username: str, 

557 password: str, 

558 available_slots: int, 

559 ): 

560 """Start queued researches up to available slots.""" 

561 # Get queued researches 

562 queued = ( 

563 db_session.query(QueuedResearch) 

564 .filter_by(username=username, is_processing=False) 

565 .order_by(QueuedResearch.position) 

566 .limit(available_slots) 

567 .all() 

568 ) 

569 

570 for queued_research in queued: 

571 try: 

572 # Mark as processing 

573 queued_research.is_processing = True 

574 db_session.commit() 

575 

576 # Update task status 

577 queue_service.update_task_status( 

578 queued_research.research_id, "processing" 

579 ) 

580 

581 # Start the research 

582 self._start_research( 

583 db_session, 

584 username, 

585 password, 

586 queued_research, 

587 ) 

588 

589 # Remove from queue 

590 db_session.delete(queued_research) 

591 db_session.commit() 

592 

593 logger.info( 

594 f"Started queued research {queued_research.research_id} " 

595 f"for user {username}" 

596 ) 

597 

598 except Exception: 

599 logger.exception( 

600 f"Error starting queued research {queued_research.research_id}" 

601 ) 

602 # Reset processing flag 

603 queued_research.is_processing = False 

604 db_session.commit() 

605 

606 # Update task status 

607 queue_service.update_task_status( 

608 queued_research.research_id, 

609 ResearchStatus.FAILED, 

610 error_message="Failed to start research", 

611 ) 

612 

613 def _start_research( 

614 self, 

615 db_session, 

616 username: str, 

617 password: str, 

618 queued_research, 

619 ): 

620 """Start a queued research.""" 

621 # Update research status 

622 research = ( 

623 db_session.query(ResearchHistory) 

624 .filter_by(id=queued_research.research_id) 

625 .first() 

626 ) 

627 

628 if not research: 

629 raise ValueError( 

630 f"Research {queued_research.research_id} not found" 

631 ) 

632 

633 research.status = ResearchStatus.IN_PROGRESS 

634 db_session.commit() 

635 

636 # Create active research record 

637 active_record = UserActiveResearch( 

638 username=username, 

639 research_id=queued_research.research_id, 

640 status=ResearchStatus.IN_PROGRESS, 

641 thread_id="pending", 

642 settings_snapshot=queued_research.settings_snapshot, 

643 ) 

644 db_session.add(active_record) 

645 db_session.commit() 

646 

647 # Extract settings 

648 settings_snapshot = queued_research.settings_snapshot or {} 

649 

650 # Handle new vs legacy structure 

651 if ( 

652 isinstance(settings_snapshot, dict) 

653 and "submission" in settings_snapshot 

654 ): 

655 submission_params = settings_snapshot.get("submission", {}) 

656 complete_settings = settings_snapshot.get("settings_snapshot", {}) 

657 else: 

658 submission_params = settings_snapshot 

659 complete_settings = {} 

660 

661 # Start the research process with password 

662 research_thread = start_research_process( 

663 queued_research.research_id, 

664 queued_research.query, 

665 queued_research.mode, 

666 run_research_process, 

667 username=username, 

668 user_password=password, # Pass password for metrics 

669 model_provider=submission_params.get("model_provider"), 

670 model=submission_params.get("model"), 

671 custom_endpoint=submission_params.get("custom_endpoint"), 

672 search_engine=submission_params.get("search_engine"), 

673 max_results=submission_params.get("max_results"), 

674 time_period=submission_params.get("time_period"), 

675 iterations=submission_params.get("iterations"), 

676 questions_per_iteration=submission_params.get( 

677 "questions_per_iteration" 

678 ), 

679 strategy=submission_params.get("strategy", "source-based"), 

680 settings_snapshot=complete_settings, 

681 ) 

682 

683 # Update thread ID 

684 active_record.thread_id = str(research_thread.ident) # type: ignore[assignment] 

685 db_session.commit() 

686 

687 def process_user_request(self, username: str, session_id: str) -> int: 

688 """ 

689 Process queue for a user during their request. 

690 This is called from request context to check and start queued items. 

691 

692 Returns: 

693 Number of researches started 

694 """ 

695 try: 

696 # Add user to check list 

697 self.notify_user_activity(username, session_id) 

698 

699 # Force immediate check (don't wait for loop) 

700 password = session_password_store.get_session_password( 

701 username, session_id 

702 ) 

703 if password: 

704 # Open database and check queue 

705 engine = db_manager.open_user_database(username, password) 

706 if engine: 706 ↛ 719line 706 didn't jump to line 719 because the condition on line 706 was always true

707 with get_user_db_session(username) as db_session: 

708 queue_service = UserQueueService(db_session) 

709 status = queue_service.get_queue_status() 

710 

711 if status and status["queued_tasks"] > 0: 

712 logger.info( 

713 f"User {username} has {status['queued_tasks']} " 

714 f"queued tasks, triggering immediate processing" 

715 ) 

716 # Process will happen in background thread 

717 return int(status["queued_tasks"]) 

718 

719 return 0 

720 

721 except Exception: 

722 logger.exception(f"Error in process_user_request for {username}") 

723 return 0 

724 

725 def queue_progress_update( 

726 self, username: str, research_id: str, progress: float 

727 ): 

728 """ 

729 Queue a progress update that needs database access. 

730 For compatibility with old processor during migration. 

731 

732 Args: 

733 username: The username 

734 research_id: The research ID 

735 progress: The progress value (0-100) 

736 """ 

737 # In processor_v2, we can update directly if we have database access 

738 # or queue it for later processing 

739 operation_id = str(uuid.uuid4()) 

740 with self._pending_operations_lock: 

741 self.pending_operations[operation_id] = { 

742 "username": username, 

743 "operation_type": "progress_update", 

744 "research_id": research_id, 

745 "progress": progress, 

746 "timestamp": time.time(), 

747 } 

748 logger.debug( 

749 f"Queued progress update for research {research_id}: {progress}%" 

750 ) 

751 

752 def queue_error_update( 

753 self, 

754 username: str, 

755 research_id: str, 

756 status: str, 

757 error_message: str, 

758 metadata: Dict[str, Any], 

759 completed_at: str, 

760 report_path: Optional[str] = None, 

761 ): 

762 """ 

763 Queue an error status update that needs database access. 

764 For compatibility with old processor during migration. 

765 

766 Args: 

767 username: The username 

768 research_id: The research ID 

769 status: The status to set (failed, suspended, etc.) 

770 error_message: The error message 

771 metadata: Research metadata 

772 completed_at: Completion timestamp 

773 report_path: Optional path to error report 

774 """ 

775 operation_id = str(uuid.uuid4()) 

776 with self._pending_operations_lock: 

777 self.pending_operations[operation_id] = { 

778 "username": username, 

779 "operation_type": "error_update", 

780 "research_id": research_id, 

781 "status": status, 

782 "error_message": error_message, 

783 "metadata": metadata, 

784 "completed_at": completed_at, 

785 "report_path": report_path, 

786 "timestamp": time.time(), 

787 } 

788 logger.info( 

789 f"Queued error update for research {research_id} with status {status}" 

790 ) 

791 

792 def process_pending_operations_for_user( 

793 self, username: str, db_session 

794 ) -> int: 

795 """ 

796 Process pending operations for a user when we have database access. 

797 Called from request context where encrypted database is accessible. 

798 For compatibility with old processor during migration. 

799 

800 Args: 

801 username: Username to process operations for 

802 db_session: Active database session for the user 

803 

804 Returns: 

805 Number of operations processed 

806 """ 

807 # Find pending operations for this user (with lock) 

808 operations_to_process = [] 

809 with self._pending_operations_lock: 

810 for op_id, op_data in list(self.pending_operations.items()): 

811 if op_data["username"] == username: 

812 operations_to_process.append((op_id, op_data)) 

813 # Remove immediately to prevent duplicate processing 

814 del self.pending_operations[op_id] 

815 

816 if not operations_to_process: 

817 return 0 

818 

819 processed_count = 0 

820 

821 # Process operations outside the lock (to avoid holding lock during DB operations) 

822 for op_id, op_data in operations_to_process: 

823 try: 

824 operation_type = op_data.get("operation_type") 

825 

826 if operation_type == "progress_update": 

827 # Update progress in database 

828 from ...database.models import ResearchHistory 

829 

830 research = ( 

831 db_session.query(ResearchHistory) 

832 .filter_by(id=op_data["research_id"]) 

833 .first() 

834 ) 

835 if research: 835 ↛ 822line 835 didn't jump to line 822 because the condition on line 835 was always true

836 # Update the progress column directly 

837 research.progress = op_data["progress"] 

838 db_session.commit() 

839 processed_count += 1 

840 

841 elif operation_type == "error_update": 841 ↛ 822line 841 didn't jump to line 822 because the condition on line 841 was always true

842 # Update error status in database 

843 from ...database.models import ResearchHistory 

844 

845 research = ( 

846 db_session.query(ResearchHistory) 

847 .filter_by(id=op_data["research_id"]) 

848 .first() 

849 ) 

850 if research: 850 ↛ 822line 850 didn't jump to line 822 because the condition on line 850 was always true

851 research.status = op_data["status"] 

852 research.error_message = op_data["error_message"] 

853 research.research_meta = op_data["metadata"] 

854 research.completed_at = op_data["completed_at"] 

855 if op_data.get("report_path"): 

856 research.report_path = op_data["report_path"] 

857 db_session.commit() 

858 processed_count += 1 

859 

860 except Exception: 

861 logger.exception(f"Error processing operation {op_id}") 

862 # Rollback to clear the failed transaction state 

863 try: 

864 db_session.rollback() 

865 except Exception: 

866 logger.warning( 

867 f"Failed to rollback after error in operation {op_id}" 

868 ) 

869 

870 return processed_count 

871 

872 

873# Global queue processor instance 

874queue_processor = QueueProcessorV2()