Coverage for src / local_deep_research / web / queue / processor_v2.py: 92%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Queue processor v2 - uses encrypted user databases instead of service.db 

3Supports both direct execution and queue modes. 

4""" 

5 

6import threading 

7import time 

8import uuid 

9from typing import Any, Dict, Optional, Set 

10 

11from loguru import logger 

12 

13from ...constants import ResearchStatus 

14from ...database.encrypted_db import db_manager 

15from ...database.models import ( 

16 QueuedResearch, 

17 ResearchHistory, 

18 UserActiveResearch, 

19) 

20from ...database.queue_service import UserQueueService 

21from ...database.session_context import get_user_db_session 

22from ...database.session_passwords import session_password_store 

23from ...notifications.queue_helpers import ( 

24 send_research_completed_notification_from_session, 

25 send_research_failed_notification_from_session, 

26) 

27from ..routes.globals import active_research, termination_flags 

28from ..services.research_service import ( 

29 run_research_process, 

30 start_research_process, 

31) 

32 

# Retry configuration constants for notification database queries.
# NOTE(review): nothing in this module reads these names; presumably they are
# imported by the notification helpers - confirm before removing.
MAX_RESEARCH_LOOKUP_RETRIES = 3
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5  # seconds
RETRY_BACKOFF_MULTIPLIER = 2

37 

38 

class QueueProcessorV2:
    """
    Processes queued researches using encrypted user databases.
    This replaces the service.db approach.

    A daemon thread (see ``start``) periodically walks the set of
    ``"username:session_id"`` keys registered through
    ``notify_user_activity`` and starts queued researches while the user has
    free slots. A research can also be started immediately from
    ``notify_research_queued`` when the user's ``app.queue_mode`` setting is
    ``"direct"``.
    """

    def __init__(self, check_interval=10):
        """
        Initialize the queue processor.

        Args:
            check_interval: How often to check for work (seconds)
        """
        self.check_interval = check_interval
        self.running = False
        self.thread: Optional[threading.Thread] = None

        # Per-user settings will be retrieved from each user's database
        # when processing their queue using SettingsManager
        logger.info(
            "Queue processor v2 initialized - will use per-user settings from SettingsManager"
        )

        # Track which users we should check ("username:session_id" keys)
        self._users_to_check: Set[str] = set()
        self._users_lock = threading.Lock()

        # Track pending operations from background threads, keyed by a
        # random operation id
        self.pending_operations: Dict[str, Dict[str, Any]] = {}
        self._pending_operations_lock = threading.Lock()

    def start(self):
        """Start the queue processor thread (no-op if already running)."""
        if self.running:
            logger.warning("Queue processor already running")
            return

        self.running = True
        self.thread = threading.Thread(
            target=self._process_queue_loop, daemon=True
        )
        self.thread.start()
        logger.info("Queue processor v2 started")

    def stop(self):
        """
        Stop the queue processor thread.

        Clears the ``running`` flag and waits up to 10 seconds for the loop
        thread to exit. The loop only notices the flag after its current
        sleep, so the join can time out; that case is reported as a warning
        instead of being logged as a successful stop.
        """
        self.running = False
        if self.thread:
            self.thread.join(timeout=10)
            if self.thread.is_alive():
                logger.warning(
                    "Queue processor v2 thread did not stop within 10 seconds"
                )
                return
        logger.info("Queue processor v2 stopped")

    def notify_user_activity(self, username: str, session_id: str):
        """
        Notify that a user has activity and their queue should be checked.

        Args:
            username: The username
            session_id: The Flask session ID (for password access)
        """
        with self._users_lock:
            self._users_to_check.add(f"{username}:{session_id}")
            logger.debug(f"User {username} added to queue check list")

    def notify_research_queued(self, username: str, research_id: str, **kwargs):
        """
        Notify that a research was queued.
        In direct mode, this immediately starts the research if slots are available.
        In queue mode, it adds to the queue.

        The research is also queued when a direct start was attempted but
        failed, so a failed direct start is never silently dropped.

        Args:
            username: The username
            research_id: The research ID
            **kwargs: Additional parameters for direct execution (query, mode, etc.).
                A direct start is only attempted when ``session_id`` is present.
        """
        session_id = kwargs.get("session_id")
        if session_id and self._try_direct_start(
            username, research_id, session_id, kwargs
        ):
            return

        # Fall back to queue mode (or if direct mode failed)
        try:
            with get_user_db_session(username) as session:
                queue_service = UserQueueService(session)
                queue_service.add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                logger.info(
                    f"Research {research_id} queued for user {username}"
                )
        except Exception:
            logger.exception(f"Failed to update queue status for {username}")

    def _try_direct_start(
        self,
        username: str,
        research_id: str,
        session_id: str,
        params: Dict[str, Any],
    ) -> bool:
        """
        Start the research immediately if the user's settings allow it.

        Args:
            username: The username
            research_id: The research ID
            session_id: Session ID used to look up the user's password
            params: Keyword arguments forwarded to ``_start_research_directly``

        Returns:
            True if the research was started; False if the caller should
            queue it instead (no password, queue mode, no free slot, or an
            error).
        """
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            return False

        try:
            # Open database and check settings + active count
            engine = db_manager.open_user_database(username, password)
            if not engine:
                return False

            with get_user_db_session(username) as db_session:
                # Imported lazily, as in the original code
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's queue_mode setting (env > DB > default)
                queue_mode = settings_manager.get_setting(
                    "app.queue_mode", "direct"
                )

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                logger.debug(
                    f"User {username} settings: queue_mode={queue_mode}, "
                    f"max_concurrent={max_concurrent}"
                )

                # Only try direct execution if user has queue_mode="direct"
                if queue_mode != "direct":
                    logger.info(
                        f"User {username} has queue_mode={queue_mode}, "
                        f"queueing research {research_id}"
                    )
                    return False

                # Count active researches
                active_count = (
                    db_session.query(UserActiveResearch)
                    .filter_by(
                        username=username,
                        status=ResearchStatus.IN_PROGRESS,
                    )
                    .count()
                )

                if active_count < max_concurrent:
                    # We have slots - start directly!
                    logger.info(
                        f"Direct mode: Starting research {research_id} immediately "
                        f"(active: {active_count}/{max_concurrent})"
                    )
                    started = self._start_research_directly(
                        username,
                        research_id,
                        password,
                        **params,
                    )
                    # Only an explicit False means "not started"; None keeps
                    # working for overrides written against the old contract.
                    return started is not False

                logger.info(
                    f"Direct mode: Max concurrent reached ({active_count}/"
                    f"{max_concurrent}), queueing {research_id}"
                )
                return False
        except Exception:
            logger.exception(f"Error in direct execution for {username}")
            return False

    def _start_research_directly(
        self, username: str, research_id: str, password: str, **kwargs
    ) -> bool:
        """
        Start a research directly without queueing.

        Args:
            username: The username
            research_id: The research ID
            password: The user's password
            **kwargs: Research parameters (query, mode, settings, etc.)

        Returns:
            True if the research process was started, False otherwise.
            Errors are logged, never raised.
        """
        query = kwargs.get("query")
        mode = kwargs.get("mode")
        settings_snapshot = kwargs.get("settings_snapshot", {})

        # Create active research record
        try:
            with get_user_db_session(username) as db_session:
                active_record = UserActiveResearch(
                    username=username,
                    research_id=research_id,
                    status=ResearchStatus.IN_PROGRESS,
                    thread_id="pending",
                    settings_snapshot=settings_snapshot,
                )
                db_session.add(active_record)
                db_session.commit()

                # Update task status if it exists
                queue_service = UserQueueService(db_session)
                queue_service.update_task_status(research_id, "processing")
        except Exception:
            logger.exception(
                f"Failed to create active research record for {research_id}"
            )
            return False

        # Start the research process
        try:
            research_thread = start_research_process(
                research_id,
                query,
                mode,
                active_research,
                termination_flags,
                run_research_process,
                username=username,
                user_password=password,
                model_provider=kwargs.get("model_provider"),
                model=kwargs.get("model"),
                custom_endpoint=kwargs.get("custom_endpoint"),
                search_engine=kwargs.get("search_engine"),
                max_results=kwargs.get("max_results"),
                time_period=kwargs.get("time_period"),
                iterations=kwargs.get("iterations"),
                questions_per_iteration=kwargs.get("questions_per_iteration"),
                strategy=kwargs.get("strategy", "source-based"),
                settings_snapshot=settings_snapshot,
            )

            # Update thread ID (best effort: the research is already running)
            try:
                with get_user_db_session(username) as db_session:
                    active_record = (
                        db_session.query(UserActiveResearch)
                        .filter_by(username=username, research_id=research_id)
                        .first()
                    )
                    if active_record:
                        active_record.thread_id = str(research_thread.ident)
                        db_session.commit()
            except Exception:
                logger.exception(
                    f"Failed to update thread ID for {research_id}"
                )

            logger.info(
                f"Direct execution: Started research {research_id} for user {username} "
                f"in thread {research_thread.ident}"
            )
            return True

        except Exception:
            logger.exception(f"Failed to start research {research_id} directly")
            # Clean up the active record so it does not occupy a slot
            self._remove_active_record(username, research_id)
            return False

    def _remove_active_record(self, username: str, research_id: str):
        """Delete the user's active-research row for ``research_id``, logging any failure."""
        try:
            with get_user_db_session(username) as db_session:
                active_record = (
                    db_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    db_session.delete(active_record)
                    db_session.commit()
        except Exception:
            logger.exception(
                f"Failed to clean up active research record for {research_id}"
            )

    def notify_research_completed(
        self,
        username: str,
        research_id: str,
        user_password: Optional[str] = None,
    ):
        """
        Notify that a research completed.
        Updates the user's queue status in their database and sends the
        completion notification. Errors are logged, never raised.

        Args:
            username: The username
            research_id: The research ID
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # The password is forwarded so the session can be obtained
            # when the caller supplies one.
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(
                    research_id, ResearchStatus.COMPLETED
                )
                logger.info(
                    f"Research {research_id} completed for user {username}"
                )

                # Send notification using helper from notification module
                send_research_completed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    db_session=session,
                )

        except Exception:
            logger.exception(
                f"Failed to update completion status for {username}"
            )

    def notify_research_failed(
        self,
        username: str,
        research_id: str,
        error_message: Optional[str] = None,
        user_password: Optional[str] = None,
    ):
        """
        Notify that a research failed.
        Updates the user's queue status in their database and sends notification.
        Errors are logged, never raised.

        Args:
            username: The username
            research_id: The research ID
            error_message: Optional error message; the notification uses
                "Unknown error" when it is missing or empty
            user_password: User password for database access. Required for queue
                updates and database lookups during notification sending.
                Optional only because some callers may not have it
                available, in which case only basic updates occur.
        """
        try:
            # The password is forwarded so the session can be obtained
            # when the caller supplies one.
            with get_user_db_session(username, user_password) as session:
                queue_service = UserQueueService(session)
                queue_service.update_task_status(
                    research_id,
                    ResearchStatus.FAILED,
                    error_message=error_message,
                )
                logger.info(
                    f"Research {research_id} failed for user {username}: "
                    f"{error_message}"
                )

                # Send notification using helper from notification module
                send_research_failed_notification_from_session(
                    username=username,
                    research_id=research_id,
                    error_message=error_message or "Unknown error",
                    db_session=session,
                )

        except Exception:
            logger.exception(f"Failed to update failure status for {username}")

    def _process_queue_loop(self):
        """Main loop that processes the queue until ``running`` is cleared."""
        while self.running:
            try:
                # Get list of users to check (don't clear immediately)
                with self._users_lock:
                    users_to_check = list(self._users_to_check)

                # Process each user's queue
                users_to_remove = []
                for user_session in users_to_check:
                    try:
                        username, session_id = user_session.split(":", 1)
                        # _process_user_queue returns True if queue is empty
                        queue_empty = self._process_user_queue(
                            username, session_id
                        )
                        if queue_empty:
                            users_to_remove.append(user_session)
                    except Exception:
                        logger.exception(
                            f"Error processing queue for {user_session}"
                        )
                        # Don't remove on error - the _process_user_queue method
                        # determines whether to keep checking based on error type

                # Only remove users whose queues are now empty
                with self._users_lock:
                    for user_session in users_to_remove:
                        self._users_to_check.discard(user_session)

            except Exception:
                logger.exception("Error in queue processor loop")

            time.sleep(self.check_interval)

    def _process_user_queue(self, username: str, session_id: str) -> bool:
        """
        Process the queue for a specific user.

        Args:
            username: The username
            session_id: The Flask session ID

        Returns:
            True if the user can be dropped from the check list (queue empty
            or no session password available), False if the user should be
            checked again (items remain, no free slot, or an error occurred)
        """
        # Get the user's password from session store
        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            logger.debug(
                f"No password available for user {username}, skipping queue check"
            )
            return True  # Remove from checking - session expired

        # Open the user's encrypted database
        try:
            # First ensure the database is open
            engine = db_manager.open_user_database(username, password)
            if not engine:
                logger.error(f"Failed to open database for user {username}")
                return False  # Keep checking - could be temporary DB issue

            # Get a session and process the queue
            with get_user_db_session(username, password) as db_session:
                queue_service = UserQueueService(db_session)

                # Imported lazily, as in the original code
                from ...settings.manager import SettingsManager

                settings_manager = SettingsManager(db_session)

                # Get user's max concurrent setting (env > DB > default)
                max_concurrent = settings_manager.get_setting(
                    "app.max_concurrent_researches", 3
                )

                # Get queue status
                queue_status = queue_service.get_queue_status() or {
                    "active_tasks": 0,
                    "queued_tasks": 0,
                }

                # Calculate available slots
                available_slots = max_concurrent - queue_status["active_tasks"]

                if available_slots <= 0:
                    # No slots available, but queue might not be empty
                    return False  # Keep checking

                if queue_status["queued_tasks"] == 0:
                    # Queue is empty
                    return True  # Remove from checking

                logger.info(
                    f"Processing queue for {username}: "
                    f"{queue_status['active_tasks']} active, "
                    f"{queue_status['queued_tasks']} queued, "
                    f"{available_slots} slots available"
                )

                # Process queued researches
                self._start_queued_researches(
                    db_session,
                    queue_service,
                    username,
                    password,
                    available_slots,
                )

                # Check if there are still items in queue
                updated_status = queue_service.get_queue_status() or {
                    "queued_tasks": 0
                }
                return updated_status["queued_tasks"] == 0

        except Exception:
            logger.exception(f"Error processing queue for user {username}")
            return False  # Keep checking - errors might be temporary

    def _start_queued_researches(
        self,
        db_session,
        queue_service: "UserQueueService",
        username: str,
        password: str,
        available_slots: int,
    ):
        """
        Start queued researches up to available slots.

        Rows are taken in ``position`` order. A row that fails to start has
        its ``is_processing`` flag reset and its task marked as failed; the
        remaining rows are still attempted.
        """
        # Get queued researches
        queued = (
            db_session.query(QueuedResearch)
            .filter_by(username=username, is_processing=False)
            .order_by(QueuedResearch.position)
            .limit(available_slots)
            .all()
        )

        for queued_research in queued:
            # Read the id once so log messages never touch the ORM object
            # after it has been deleted or rolled back.
            research_id = queued_research.research_id
            try:
                # Mark as processing
                queued_research.is_processing = True
                db_session.commit()

                # Update task status
                queue_service.update_task_status(research_id, "processing")

                # Start the research
                self._start_research(
                    db_session,
                    username,
                    password,
                    queued_research,
                )

                # Remove from queue
                db_session.delete(queued_research)
                db_session.commit()

                logger.info(
                    f"Started queued research {research_id} "
                    f"for user {username}"
                )

            except Exception:
                logger.exception(
                    f"Error starting queued research {research_id}"
                )
                try:
                    # A failed flush/commit leaves the session unusable
                    # until it is rolled back.
                    db_session.rollback()

                    # Reset processing flag
                    queued_research.is_processing = False
                    db_session.commit()

                    # Update task status
                    queue_service.update_task_status(
                        research_id,
                        ResearchStatus.FAILED,
                        error_message="Failed to start research",
                    )
                except Exception:
                    logger.exception(
                        f"Failed to reset queued research {research_id} after error"
                    )
                    try:
                        db_session.rollback()
                    except Exception:
                        logger.warning(
                            f"Failed to rollback after error for research {research_id}"
                        )

    def _start_research(
        self,
        db_session,
        username: str,
        password: str,
        queued_research,
    ):
        """
        Start a queued research.

        Marks the research history row as in progress, creates the active
        research record, launches the research process and stores its thread
        id on the active record.

        Raises:
            ValueError: If no ResearchHistory row exists for the queued id.
            Exception: Whatever ``start_research_process`` raises; the active
                record is removed before the error is re-raised.
        """
        # Update research status
        research = (
            db_session.query(ResearchHistory)
            .filter_by(id=queued_research.research_id)
            .first()
        )

        if not research:
            raise ValueError(
                f"Research {queued_research.research_id} not found"
            )

        research.status = ResearchStatus.IN_PROGRESS
        db_session.commit()

        # Create active research record
        active_record = UserActiveResearch(
            username=username,
            research_id=queued_research.research_id,
            status=ResearchStatus.IN_PROGRESS,
            thread_id="pending",
            settings_snapshot=queued_research.settings_snapshot,
        )
        db_session.add(active_record)
        db_session.commit()

        # Extract settings
        settings_snapshot = queued_research.settings_snapshot or {}

        # Handle new vs legacy structure: the new structure nests the request
        # parameters under "submission" and the full settings under
        # "settings_snapshot"; the legacy structure is the parameters dict itself.
        if (
            isinstance(settings_snapshot, dict)
            and "submission" in settings_snapshot
        ):
            submission_params = settings_snapshot.get("submission", {})
            complete_settings = settings_snapshot.get("settings_snapshot", {})
        else:
            submission_params = settings_snapshot
            complete_settings = {}

        # Start the research process with password
        try:
            research_thread = start_research_process(
                queued_research.research_id,
                queued_research.query,
                queued_research.mode,
                active_research,
                termination_flags,
                run_research_process,
                username=username,
                user_password=password,  # Pass password for metrics
                model_provider=submission_params.get("model_provider"),
                model=submission_params.get("model"),
                custom_endpoint=submission_params.get("custom_endpoint"),
                search_engine=submission_params.get("search_engine"),
                max_results=submission_params.get("max_results"),
                time_period=submission_params.get("time_period"),
                iterations=submission_params.get("iterations"),
                questions_per_iteration=submission_params.get(
                    "questions_per_iteration"
                ),
                strategy=submission_params.get("strategy", "source-based"),
                settings_snapshot=complete_settings,
            )
        except Exception:
            # Mirror the direct-start path: do not leave an IN_PROGRESS
            # record behind for a research that never started.
            self._discard_active_record(
                db_session, active_record, queued_research.research_id
            )
            raise

        # Update thread ID
        active_record.thread_id = str(research_thread.ident)
        db_session.commit()

    def _discard_active_record(self, db_session, active_record, research_id):
        """Delete an already-committed active record on ``db_session``, logging any failure."""
        try:
            db_session.rollback()
            db_session.delete(active_record)
            db_session.commit()
        except Exception:
            logger.exception(
                f"Failed to clean up active research record for {research_id}"
            )
            try:
                db_session.rollback()
            except Exception:
                logger.warning(
                    f"Failed to rollback after error for research {research_id}"
                )

    def process_user_request(self, username: str, session_id: str) -> int:
        """
        Process queue for a user during their request.
        This is called from request context. It registers the user with the
        background loop and reports how much work is waiting; the queued
        items themselves are started by the background thread.

        Returns:
            Number of tasks currently queued for the user, or 0 when nothing
            is queued, no session password is available, or an error occurred
        """
        try:
            # Add user to check list
            self.notify_user_activity(username, session_id)

            password = session_password_store.get_session_password(
                username, session_id
            )
            if password:
                # Open database and check queue
                engine = db_manager.open_user_database(username, password)
                if engine:
                    with get_user_db_session(username) as db_session:
                        queue_service = UserQueueService(db_session)
                        status = queue_service.get_queue_status()

                        if status and status["queued_tasks"] > 0:
                            logger.info(
                                f"User {username} has {status['queued_tasks']} "
                                f"queued tasks, triggering immediate processing"
                            )
                            # Process will happen in background thread
                            return status["queued_tasks"]

            return 0

        except Exception:
            logger.exception(f"Error in process_user_request for {username}")
            return 0

    def queue_progress_update(
        self, username: str, research_id: str, progress: float
    ):
        """
        Queue a progress update that needs database access.
        For compatibility with old processor during migration.

        The update is only stored in memory; it is applied by
        ``process_pending_operations_for_user``.

        Args:
            username: The username
            research_id: The research ID
            progress: The progress value (0-100)
        """
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "progress_update",
                "research_id": research_id,
                "progress": progress,
                "timestamp": time.time(),
            }
        logger.debug(
            f"Queued progress update for research {research_id}: {progress}%"
        )

    def queue_error_update(
        self,
        username: str,
        research_id: str,
        status: str,
        error_message: str,
        metadata: Dict[str, Any],
        completed_at: str,
        report_path: Optional[str] = None,
    ):
        """
        Queue an error status update that needs database access.
        For compatibility with old processor during migration.

        The update is only stored in memory; it is applied by
        ``process_pending_operations_for_user``.

        Args:
            username: The username
            research_id: The research ID
            status: The status to set (failed, suspended, etc.)
            error_message: The error message
            metadata: Research metadata
            completed_at: Completion timestamp
            report_path: Optional path to error report
        """
        operation_id = str(uuid.uuid4())
        with self._pending_operations_lock:
            self.pending_operations[operation_id] = {
                "username": username,
                "operation_type": "error_update",
                "research_id": research_id,
                "status": status,
                "error_message": error_message,
                "metadata": metadata,
                "completed_at": completed_at,
                "report_path": report_path,
                "timestamp": time.time(),
            }
        logger.info(
            f"Queued error update for research {research_id} with status {status}"
        )

    def process_pending_operations_for_user(
        self, username: str, db_session
    ) -> int:
        """
        Process pending operations for a user when we have database access.
        Called from request context where encrypted database is accessible.
        For compatibility with old processor during migration.

        Operations are removed from the pending map before they are applied,
        so an operation that fails (or whose research row is missing) is
        dropped rather than retried.

        Args:
            username: Username to process operations for
            db_session: Active database session for the user

        Returns:
            Number of operations processed
        """
        # Claim this user's operations under the lock; removing them right
        # away prevents duplicate processing by a concurrent caller.
        claimed = []
        with self._pending_operations_lock:
            for op_id, op_data in list(self.pending_operations.items()):
                if op_data["username"] == username:
                    claimed.append((op_id, op_data))
                    del self.pending_operations[op_id]

        if not claimed:
            return 0

        processed_count = 0

        # Process operations outside the lock (to avoid holding lock during DB operations)
        for op_id, op_data in claimed:
            try:
                operation_type = op_data.get("operation_type")
                if operation_type not in ("progress_update", "error_update"):
                    logger.warning(
                        f"Ignoring operation {op_id} with unknown type {operation_type}"
                    )
                    continue

                research = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=op_data["research_id"])
                    .first()
                )
                if not research:
                    continue

                if operation_type == "progress_update":
                    # Update the progress column directly
                    research.progress = op_data["progress"]
                else:
                    # Update error status in database
                    research.status = op_data["status"]
                    research.error_message = op_data["error_message"]
                    research.research_meta = op_data["metadata"]
                    research.completed_at = op_data["completed_at"]
                    if op_data.get("report_path"):
                        research.report_path = op_data["report_path"]

                db_session.commit()
                processed_count += 1

            except Exception:
                logger.exception(f"Error processing operation {op_id}")
                # Rollback to clear the failed transaction state
                try:
                    db_session.rollback()
                except Exception:
                    logger.warning(
                        f"Failed to rollback after error in operation {op_id}"
                    )

        return processed_count

838 

839 

# Global queue processor instance, created at import time with the default
# check interval; the background thread is not started until start() is called.
queue_processor = QueueProcessorV2()