Coverage for src / local_deep_research / web / queue / processor_v2.py: 51%

281 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Queue processor v2 - uses encrypted user databases instead of service.db 

3Supports both direct execution and queue modes. 

4""" 

5 

6import threading 

7import time 

8import uuid 

9from typing import Any, Dict, Optional, Set 

10 

11from loguru import logger 

12 

13from ...database.encrypted_db import db_manager 

14from ...database.models import ( 

15 QueuedResearch, 

16 ResearchHistory, 

17 UserActiveResearch, 

18) 

19from ...database.queue_service import UserQueueService 

20from ...database.session_context import get_user_db_session 

21from ...database.session_passwords import session_password_store 

22from ...notifications.queue_helpers import ( 

23 send_research_completed_notification_from_session, 

24 send_research_failed_notification_from_session, 

25) 

26from ..routes.globals import active_research, termination_flags 

27from ..services.research_service import ( 

28 run_research_process, 

29 start_research_process, 

30) 

31 

# Retry configuration constants for notification database queries.
# Maximum number of lookup attempts.
MAX_RESEARCH_LOOKUP_RETRIES = 3
# Delay before the first retry, in seconds.
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5
# Factor used to grow the delay between retries.
# NOTE(review): presumably multiplied into the delay after each attempt -
# confirm against the code that consumes these constants.
RETRY_BACKOFF_MULTIPLIER = 2

36 

37 

class QueueProcessorV2:
    """
    Processes queued researches using encrypted user databases.
    This replaces the service.db approach.

    A background daemon thread periodically walks the set of
    ``username:session_id`` entries registered through
    ``notify_user_activity`` and starts queued researches while the user
    has free slots. Researches can also be started immediately ("direct"
    mode) from ``notify_research_queued``.

    Background threads that cannot reach the encrypted database can park
    progress/error updates in ``pending_operations``; they are applied
    later by ``process_pending_operations_for_user``.
    """

    def __init__(self, check_interval: float = 10):
        """
        Initialize the queue processor.

        Args:
            check_interval: How often to check for work (seconds)
        """
        self.check_interval = check_interval
        self.running = False
        self.thread: Optional[threading.Thread] = None

        # Per-user settings will be retrieved from each user's database
        # when processing their queue using SettingsManager
        logger.info(
            "Queue processor v2 initialized - will use per-user settings from SettingsManager"
        )

        # Track which users we should check ("username:session_id" entries)
        self._users_to_check: Set[str] = set()
        self._users_lock = threading.Lock()

        # Track pending operations from background threads, keyed by a
        # random operation id
        self.pending_operations: Dict[str, Dict[str, Any]] = {}
        self._pending_operations_lock = threading.Lock()

    def start(self):
        """Start the queue processor thread (no-op if already running)."""
        if self.running:
            logger.warning("Queue processor already running")
            return

        self.running = True
        self.thread = threading.Thread(
            target=self._process_queue_loop, daemon=True
        )
        self.thread.start()
        logger.info("Queue processor v2 started")

    def stop(self):
        """
        Stop the queue processor thread.

        Clears the running flag and waits up to 10 seconds for the worker
        thread to finish its current iteration.
        """
        self.running = False
        if self.thread:
            self.thread.join(timeout=10)
        logger.info("Queue processor v2 stopped")

    def notify_user_activity(self, username: str, session_id: str):
        """
        Notify that a user has activity and their queue should be checked.

        Args:
            username: The username
            session_id: The Flask session ID (for password access)
        """
        with self._users_lock:
            # Stored as "username:session_id"; the loop splits on the
            # first ":" to recover both parts.
            self._users_to_check.add(f"{username}:{session_id}")
            logger.debug(f"User {username} added to queue check list")

    def notify_research_queued(self, username: str, research_id: str, **kwargs):
        """
        Notify that a research was queued.
        In direct mode, this immediately starts the research if slots are available.
        In queue mode, it adds to the queue.

        Direct execution is only attempted when ``kwargs`` contains a
        ``session_id`` for which a password is available in the session
        password store. If direct execution is not possible (or raises),
        the task metadata is added to the user's queue instead.

        Args:
            username: The username
            research_id: The research ID
            **kwargs: Additional parameters for direct execution (query, mode, etc.)
        """
        # Check user's queue_mode setting when we have database access
        if kwargs:
            session_id = kwargs.get("session_id")
            if session_id:
                # Check if we can start it directly
                password = session_password_store.get_session_password(
                    username, session_id
                )
                if password:
                    try:
                        # Open database and check settings + active count
                        engine = db_manager.open_user_database(
                            username, password
                        )
                        if engine:
                            with get_user_db_session(username) as db_session:
                                # Get user's settings using SettingsManager
                                from ...settings.manager import SettingsManager

                                settings_manager = SettingsManager(db_session)

                                # Get user's queue_mode setting (env > DB > default)
                                queue_mode = settings_manager.get_setting(
                                    "app.queue_mode", "direct"
                                )

                                # Get user's max concurrent setting (env > DB > default)
                                max_concurrent = settings_manager.get_setting(
                                    "app.max_concurrent_researches", 3
                                )

                                logger.debug(
                                    f"User {username} settings: queue_mode={queue_mode}, "
                                    f"max_concurrent={max_concurrent}"
                                )

                                # Only try direct execution if user has queue_mode="direct"
                                if queue_mode == "direct":
                                    # Count active researches
                                    active_count = (
                                        db_session.query(UserActiveResearch)
                                        .filter_by(
                                            username=username,
                                            status="in_progress",
                                        )
                                        .count()
                                    )

                                    if active_count < max_concurrent:
                                        # We have slots - start directly!
                                        logger.info(
                                            f"Direct mode: Starting research {research_id} immediately "
                                            f"(active: {active_count}/{max_concurrent})"
                                        )

                                        # Start the research directly
                                        self._start_research_directly(
                                            username,
                                            research_id,
                                            password,
                                            **kwargs,
                                        )
                                        return
                                    else:
                                        logger.info(
                                            f"Direct mode: Max concurrent reached ({active_count}/"
                                            f"{max_concurrent}), queueing {research_id}"
                                        )
                                else:
                                    logger.info(
                                        f"User {username} has queue_mode={queue_mode}, "
                                        f"queueing research {research_id}"
                                    )
                    except Exception:
                        logger.exception(
                            f"Error in direct execution for {username}"
                        )

        # Fall back to queue mode (or if direct mode failed)
        try:
            with get_user_db_session(username) as session:
                queue_service = UserQueueService(session)
                queue_service.add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                logger.info(
                    f"Research {research_id} queued for user {username}"
                )
        except Exception:
            logger.exception(f"Failed to update queue status for {username}")

    def _start_research_directly(
        self, username: str, research_id: str, password: str, **kwargs
    ):
        """
        Start a research directly without queueing.

        Creates the ``UserActiveResearch`` record first, then launches the
        research thread and stores its thread id on the record. If the
        record cannot be created the research is not started. If launching
        fails, the active record is removed again on a best-effort basis.
        All errors are logged; none are raised to the caller.

        Args:
            username: The username
            research_id: The research ID
            password: The user's password
            **kwargs: Research parameters (query, mode, settings, etc.)
        """
        query = kwargs.get("query")
        mode = kwargs.get("mode")
        settings_snapshot = kwargs.get("settings_snapshot", {})

        # Create active research record
        try:
            with get_user_db_session(username) as db_session:
                active_record = UserActiveResearch(
                    username=username,
                    research_id=research_id,
                    status="in_progress",
                    thread_id="pending",
                    settings_snapshot=settings_snapshot,
                )
                db_session.add(active_record)
                db_session.commit()

                # Update task status if it exists
                queue_service = UserQueueService(db_session)
                queue_service.update_task_status(research_id, "processing")
        except Exception:
            logger.exception(
                f"Failed to create active research record for {research_id}"
            )
            return

        # Extract parameters from kwargs
        model_provider = kwargs.get("model_provider")
        model = kwargs.get("model")
        custom_endpoint = kwargs.get("custom_endpoint")
        search_engine = kwargs.get("search_engine")

        # Start the research process
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                active_research,
                termination_flags,
                run_research_process,
                username=username,
                user_password=password,
                model_provider=model_provider,
                model=model,
                custom_endpoint=custom_endpoint,
                search_engine=search_engine,
                max_results=kwargs.get("max_results"),
                time_period=kwargs.get("time_period"),
                iterations=kwargs.get("iterations"),
                questions_per_iteration=kwargs.get("questions_per_iteration"),
                strategy=kwargs.get("strategy", "source-based"),
                settings_snapshot=settings_snapshot,
            )

            # Update thread ID (failure here is logged but does not stop
            # the already-running research)
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        active_record.thread_id = str(research_thread.ident)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to update thread ID for {research_id}"
                )

            logger.info(
                f"Direct execution: Started research {research_id} for user {username} "
                f"in thread {research_thread.ident}"
            )

        except Exception:
            logger.exception(f"Failed to start research {research_id} directly")
            # Clean up the active record so it does not keep occupying a
            # concurrency slot
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        db_session.delete(active_record)
                        db_session.commit()
            except Exception:
                # Cleanup stays best-effort, but leave a trace instead of
                # silently swallowing the failure.
                logger.exception(
                    f"Failed to clean up active research record for {research_id}"
                )

    def notify_research_completed(
        self,
        username: str,
        research_id: str,
        user_password: Optional[str] = None,
    ):
        """
        Notify that a research completed.
        Updates the user's queue status in their database and sends the
        completion notification. Errors are logged, not raised.

        Args:
            username: The username
            research_id: The research ID
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # get_user_db_session is imported at module level; it accepts an
            # optional password parameter and returns a context manager
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(research_id, "completed")
                logger.info(
                    f"Research {research_id} completed for user {username}"
                )

                # Send notification using helper from notification module
                send_research_completed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    db_session=session,
                )

        except Exception:
            logger.exception(
                f"Failed to update completion status for {username}"
            )

    def notify_research_failed(
        self,
        username: str,
        research_id: str,
        error_message: Optional[str] = None,
        user_password: Optional[str] = None,
    ):
        """
        Notify that a research failed.
        Updates the user's queue status in their database and sends notification.
        Errors are logged, not raised.

        Args:
            username: The username
            research_id: The research ID
            error_message: Optional error message; the notification uses
                "Unknown error" when it is missing or empty
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # get_user_db_session is imported at module level; it accepts an
            # optional password parameter and returns a context manager
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(
                    research_id, "failed", error_message=error_message
                )
                logger.info(
                    f"Research {research_id} failed for user {username}: "
                    f"{error_message}"
                )

                # Send notification using helper from notification module
                send_research_failed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    error_message=error_message or "Unknown error",
                    db_session=session,
                )

        except Exception:
            logger.exception(f"Failed to update failure status for {username}")

    def _process_queue_loop(self):
        """
        Main loop that processes the queue.

        Runs until ``self.running`` is cleared, sleeping
        ``self.check_interval`` seconds between passes.
        """
        while self.running:
            try:
                # Get list of users to check (don't clear immediately)
                with self._users_lock:
                    users_to_check = list(self._users_to_check)

                # Process each user's queue
                users_to_remove = []
                for user_session in users_to_check:
                    try:
                        username, session_id = user_session.split(":", 1)
                        # _process_user_queue returns True if queue is empty
                        queue_empty = self._process_user_queue(
                            username, session_id
                        )
                        if queue_empty:
                            users_to_remove.append(user_session)
                    except Exception:
                        logger.exception(
                            f"Error processing queue for {user_session}"
                        )
                        # Don't remove on error - the _process_user_queue method
                        # determines whether to keep checking based on error type

                # Only remove users whose queues are now empty
                with self._users_lock:
                    for user_session in users_to_remove:
                        self._users_to_check.discard(user_session)

            except Exception:
                logger.exception("Error in queue processor loop")

            time.sleep(self.check_interval)

    def _process_user_queue(self, username: str, session_id: str) -> bool:
        """
        Process the queue for a specific user.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            True if the user should be dropped from the check list (queue
            empty, or no password available for the session); False if the
            user should be checked again (items remain, no free slots, the
            database could not be opened, or an error occurred).
        """
        # Get the user's password from session store
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            logger.debug(
                f"No password available for user {username}, skipping queue check"
            )
            return True  # Remove from checking - session expired

        # Open the user's encrypted database
        try:
            # First ensure the database is open
            engine = db_manager.open_user_database(username, password)
            if not engine:
                logger.error(f"Failed to open database for user {username}")
                return False  # Keep checking - could be temporary DB issue

            # Get a session and process the queue
            with get_user_db_session(username, password) as db_session:
                queue_service = UserQueueService(db_session)

                # Get user's settings using SettingsManager
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                # Get queue status
                queue_status = queue_service.get_queue_status() or {
                    "active_tasks": 0,
                    "queued_tasks": 0,
                }

                # Calculate available slots
                available_slots = max_concurrent - queue_status["active_tasks"]

                if available_slots <= 0:
                    # No slots available, but queue might not be empty
                    return False  # Keep checking

                if queue_status["queued_tasks"] == 0:
                    # Queue is empty
                    return True  # Remove from checking

                logger.info(
                    f"Processing queue for {username}: "
                    f"{queue_status['active_tasks']} active, "
                    f"{queue_status['queued_tasks']} queued, "
                    f"{available_slots} slots available"
                )

                # Process queued researches
                self._start_queued_researches(
                    db_session,
                    queue_service,
                    username,
                    password,
                    available_slots,
                )

                # Check if there are still items in queue
                updated_status = queue_service.get_queue_status() or {
                    "queued_tasks": 0
                }
                return updated_status["queued_tasks"] == 0

        except Exception:
            logger.exception(f"Error processing queue for user {username}")
            return False  # Keep checking - errors might be temporary

    def _start_queued_researches(
        self,
        db_session,
        queue_service: UserQueueService,
        username: str,
        password: str,
        available_slots: int,
    ):
        """
        Start queued researches up to available slots.

        Each queued row is flagged as processing, started, and then removed
        from the queue. When starting fails, the session is rolled back, the
        processing flag is reset and the task is marked as failed.

        Args:
            db_session: Open database session for the user
            queue_service: Queue service bound to ``db_session``
            username: The username
            password: The user's password (passed on to the research thread)
            available_slots: Maximum number of researches to start
        """
        # Get queued researches
        queued = (
            db_session.query(QueuedResearch)
            .filter_by(username=username, is_processing=False)
            .order_by(QueuedResearch.position)
            .limit(available_slots)
            .all()
        )

        for queued_research in queued:
            # Capture the id up front: the row is deleted further down and
            # the session may be rolled back, so later log/recovery code
            # must not depend on reading attributes off the instance.
            research_id = queued_research.research_id
            try:
                # Mark as processing
                queued_research.is_processing = True
                db_session.commit()

                # Update task status
                queue_service.update_task_status(research_id, "processing")

                # Start the research
                self._start_research(
                    db_session,
                    username,
                    password,
                    queued_research,
                )

                # Remove from queue
                db_session.delete(queued_research)
                db_session.commit()

                logger.info(
                    f"Started queued research {research_id} "
                    f"for user {username}"
                )

            except Exception:
                logger.exception(
                    f"Error starting queued research {research_id}"
                )
                # Clear any failed transaction state first; committing on a
                # session whose flush failed would raise again and leave the
                # row stuck with is_processing=True.
                db_session.rollback()

                # Reset processing flag
                queued_research.is_processing = False
                db_session.commit()

                # Update task status
                queue_service.update_task_status(
                    research_id,
                    "failed",
                    error_message="Failed to start research",
                )

    def _start_research(
        self,
        db_session,
        username: str,
        password: str,
        queued_research,
    ):
        """
        Start a queued research.

        Marks the ``ResearchHistory`` row as in progress, creates the
        ``UserActiveResearch`` record, launches the research thread and
        stores the thread id on the active record.

        Args:
            db_session: Open database session for the user
            username: The username
            password: The user's password (passed on to the research thread)
            queued_research: The ``QueuedResearch`` row to start

        Raises:
            ValueError: If no ``ResearchHistory`` row exists for the
                queued research id.
        """
        # Update research status
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=queued_research.research_id)
            .first()
        )

        if not research:
            raise ValueError(
                f"Research {queued_research.research_id} not found"
            )

        research.status = "in_progress"
        db_session.commit()

        # Create active research record
        active_record = UserActiveResearch(
            username=username,
            research_id=queued_research.research_id,
            status="in_progress",
            thread_id="pending",
            settings_snapshot=queued_research.settings_snapshot,
        )
        db_session.add(active_record)
        db_session.commit()

        # Extract settings
        settings_snapshot = queued_research.settings_snapshot or {}

        # Handle new vs legacy structure: the new structure nests the
        # submission parameters under "submission" and the full settings
        # under "settings_snapshot"; the legacy structure is a flat dict of
        # submission parameters.
        if (
            isinstance(settings_snapshot, dict)
            and "submission" in settings_snapshot
        ):
            submission_params = settings_snapshot.get("submission", {})
            complete_settings = settings_snapshot.get("settings_snapshot", {})
        else:
            submission_params = settings_snapshot
            complete_settings = {}

        # Start the research process with password
        research_thread = start_research_process(
            queued_research.research_id,
            queued_research.query,
            queued_research.mode,
            active_research,
            termination_flags,
            run_research_process,
            username=username,
            user_password=password,  # Pass password for metrics
            model_provider=submission_params.get("model_provider"),
            model=submission_params.get("model"),
            custom_endpoint=submission_params.get("custom_endpoint"),
            search_engine=submission_params.get("search_engine"),
            max_results=submission_params.get("max_results"),
            time_period=submission_params.get("time_period"),
            iterations=submission_params.get("iterations"),
            questions_per_iteration=submission_params.get(
                "questions_per_iteration"
            ),
            strategy=submission_params.get("strategy", "source-based"),
            settings_snapshot=complete_settings,
        )

        # Update thread ID
        active_record.thread_id = str(research_thread.ident)
        db_session.commit()

    def process_user_request(self, username: str, session_id: str) -> int:
        """
        Process queue for a user during their request.
        This is called from request context to register the user for queue
        checks and report how much work is waiting. The queued items
        themselves are started by the background thread.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            Number of queued tasks found for the user, or 0 when there are
            none, no password is available, the database could not be
            opened, or an error occurred.
        """
        try:
            # Add user to check list
            self.notify_user_activity(username, session_id)

            # Force immediate check (don't wait for loop)
            password = session_password_store.get_session_password(
                username, session_id
            )
            if password:
                # Open database and check queue
                engine = db_manager.open_user_database(username, password)
                if engine:
                    with get_user_db_session(username) as db_session:
                        queue_service = UserQueueService(db_session)
                        status = queue_service.get_queue_status()

                        if status and status["queued_tasks"] > 0:
                            logger.info(
                                f"User {username} has {status['queued_tasks']} "
                                f"queued tasks, triggering immediate processing"
                            )
                            # Process will happen in background thread
                            return status["queued_tasks"]

            return 0

        except Exception:
            logger.exception(f"Error in process_user_request for {username}")
            return 0

    def queue_progress_update(
        self, username: str, research_id: str, progress: float
    ):
        """
        Queue a progress update that needs database access.
        For compatibility with old processor during migration.

        The update is stored in ``pending_operations`` and applied by
        ``process_pending_operations_for_user``.

        Args:
            username: The username
            research_id: The research ID
            progress: The progress value (0-100)
        """
        # In processor_v2, we can update directly if we have database access
        # or queue it for later processing
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "progress_update",
                "research_id": research_id,
                "progress": progress,
                "timestamp": time.time(),
            }
        logger.debug(
            f"Queued progress update for research {research_id}: {progress}%"
        )

    def queue_error_update(
        self,
        username: str,
        research_id: str,
        status: str,
        error_message: str,
        metadata: Dict[str, Any],
        completed_at: str,
        report_path: Optional[str] = None,
    ):
        """
        Queue an error status update that needs database access.
        For compatibility with old processor during migration.

        The update is stored in ``pending_operations`` and applied by
        ``process_pending_operations_for_user``.

        Args:
            username: The username
            research_id: The research ID
            status: The status to set (failed, suspended, etc.)
            error_message: The error message
            metadata: Research metadata
            completed_at: Completion timestamp
            report_path: Optional path to error report
        """
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "error_update",
                "research_id": research_id,
                "status": status,
                "error_message": error_message,
                "metadata": metadata,
                "completed_at": completed_at,
                "report_path": report_path,
                "timestamp": time.time(),
            }
        logger.info(
            f"Queued error update for research {research_id} with status {status}"
        )

    def process_pending_operations_for_user(
        self, username: str, db_session
    ) -> int:
        """
        Process pending operations for a user when we have database access.
        Called from request context where encrypted database is accessible.
        For compatibility with old processor during migration.

        Operations are removed from ``pending_operations`` before they are
        applied, so an operation that fails is logged and not retried.

        Args:
            username: Username to process operations for
            db_session: Active database session for the user

        Returns:
            Number of operations processed (operations whose research row
            was found and updated)
        """
        # Find pending operations for this user (with lock)
        operations_to_process = []
        with self._pending_operations_lock:
            # Iterate over a copy so entries can be deleted while looping
            for op_id, op_data in list(self.pending_operations.items()):
                if op_data["username"] == username:
                    operations_to_process.append((op_id, op_data))
                    # Remove immediately to prevent duplicate processing
                    del self.pending_operations[op_id]

        if not operations_to_process:
            return 0

        processed_count = 0

        # Process operations outside the lock (to avoid holding lock during DB operations)
        for op_id, op_data in operations_to_process:
            try:
                operation_type = op_data.get("operation_type")

                if operation_type == "progress_update":
                    # Update progress in database
                    from ...database.models import ResearchHistory

                    research = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=op_data["research_id"])
                        .first()
                    )
                    if research:
                        # Update the progress column directly
                        research.progress = op_data["progress"]
                        db_session.commit()
                        processed_count += 1

                elif operation_type == "error_update":
                    # Update error status in database
                    from ...database.models import ResearchHistory

                    research = (
                        db_session.query(ResearchHistory)
                        .filter_by(id=op_data["research_id"])
                        .first()
                    )
                    if research:
                        research.status = op_data["status"]
                        research.error_message = op_data["error_message"]
                        research.research_meta = op_data["metadata"]
                        research.completed_at = op_data["completed_at"]
                        if op_data.get("report_path"):
                            research.report_path = op_data["report_path"]
                        db_session.commit()
                        processed_count += 1

            except Exception:
                logger.exception(f"Error processing operation {op_id}")
                # Rollback to clear the failed transaction state
                try:
                    db_session.rollback()
                except Exception:
                    logger.warning(
                        f"Failed to rollback after error in operation {op_id}"
                    )

        return processed_count

833 

834 

# Global queue processor instance (created at import time; the worker
# thread is not started until start() is called)
queue_processor = QueueProcessorV2()