Coverage for src/local_deep_research/web/queue/processor_v2.py: 89%

446 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Queue processor v2 - uses encrypted user databases instead of service.db 

3Supports both direct execution and queue modes. 

4""" 

5 

6import threading 

7import time 

8import uuid 

9from typing import Any, Dict, Optional 

10 

11from loguru import logger 

12 

13from ...constants import ResearchStatus 

14from ...database.encrypted_db import db_manager 

15from ...database.models import ( 

16 QueuedResearch, 

17 ResearchHistory, 

18 UserActiveResearch, 

19) 

20from ...database.queue_service import UserQueueService 

21from ...database.session_context import get_user_db_session 

22from ...database.session_passwords import session_password_store 

23from ...exceptions import DuplicateResearchError, SystemAtCapacityError 

24from ...notifications.queue_helpers import ( 

25 send_research_completed_notification_from_session, 

26 send_research_failed_notification_from_session, 

27) 

28from ..services.research_service import ( 

29 run_research_process, 

30 start_research_process, 

31) 

32 

# Retry configuration constants for notification database queries.
# NOTE(review): none of these three names is referenced in the part of the
# module reviewed here; presumably they are consumed by notification
# lookup code elsewhere - confirm before changing or removing them.
MAX_RESEARCH_LOOKUP_RETRIES = 3  # presumably the max lookup attempts - confirm
INITIAL_RESEARCH_LOOKUP_DELAY = 0.5  # seconds
RETRY_BACKOFF_MULTIPLIER = 2  # presumably scales the delay per retry - confirm

# Give up on a queued research after this many consecutive spawn failures.
# Each failure leaves is_processing=False so the next loop tick retries.
SPAWN_RETRY_LIMIT = 3

41 

42 

43class QueueProcessorV2: 

44 """ 

45 Processes queued researches using encrypted user databases. 

46 This replaces the service.db approach. 

47 """ 

48 

def __init__(self, check_interval=10):
    """
    Set up the processor's in-memory state. No thread is started here;
    call ``start()`` for that.

    Args:
        check_interval: How often to check for work (seconds)
    """
    # Loop control: polling period, run flag, worker thread handle and
    # the iteration counter the loop uses to pace its periodic sweep.
    self.check_interval = check_interval
    self.running = False
    self.thread = None
    self._loop_iteration = 0

    # (username, session_id) pairs whose queues the loop should inspect,
    # plus the lock that guards the set.
    self._users_lock = threading.Lock()
    self._users_to_check: set[tuple[str, str]] = set()

    # Pending operations coming from background threads, plus their lock.
    self._pending_operations_lock = threading.Lock()
    self.pending_operations = {}

    # One lock per username, serialising the "count active -> start
    # direct" critical section. Two concurrent submissions for the same
    # user (e.g. from two browser tabs) would otherwise race on the
    # count-then-insert; IMMEDIATE isolation used to hide that at the
    # DB layer. The second lock guards the registry dict itself.
    self._user_critical_locks_lock = threading.Lock()
    self._user_critical_locks: Dict[str, threading.Lock] = {}

    # Consecutive spawn failures keyed by research_id. An entry is
    # dropped on success or once SPAWN_RETRY_LIMIT is reached (the
    # research is then marked FAILED). Keeping this in memory is
    # deliberate: after a restart the research gets a fresh set of
    # retries, which is what we want if the underlying problem (thread
    # pool, memory) has cleared. The increment is a read-modify-write
    # that the loop and the direct-request path can interleave, hence
    # the dedicated lock.
    self._spawn_retry_counts_lock = threading.Lock()
    self._spawn_retry_counts: dict[str, int] = {}

    # Settings are per-user: they are read from each user's database
    # through SettingsManager at the time that user's queue is processed.
    logger.info(
        "Queue processor v2 initialized - will use per-user settings from SettingsManager"
    )

94 

95 def _get_user_critical_lock(self, username: str) -> threading.Lock: 

96 """Get (or lazily create) the per-user lock used to serialise the 

97 count-active-and-start-direct critical section for a given user. 

98 """ 

99 with self._user_critical_locks_lock: 

100 lock = self._user_critical_locks.get(username) 

101 if lock is None: 101 ↛ 104line 101 didn't jump to line 104 because the condition on line 101 was always true

102 lock = threading.Lock() 

103 self._user_critical_locks[username] = lock 

104 return lock 

105 

def pop_user_critical_lock(self, username: str) -> None:
    """Forget the critical-section lock registered for ``username``.

    Meant for the user-close path, so the registry does not keep one
    entry per username for the lifetime of the process. Nothing needs
    to survive a logout/login cycle: the next direct-research
    submission for that user simply gets a newly created lock.
    Unknown usernames are ignored.
    """
    with self._user_critical_locks_lock:
        if username in self._user_critical_locks:
            del self._user_critical_locks[username]

117 

118 def _bump_spawn_retry_count(self, research_id: str) -> int: 

119 """Atomically increment and return the spawn-retry counter for 

120 ``research_id``. Extracted so tests can exercise the real 

121 locked increment path instead of duplicating the lock in the 

122 test worker (which would be a tautology). 

123 """ 

124 with self._spawn_retry_counts_lock: 

125 attempts = self._spawn_retry_counts.get(research_id, 0) + 1 

126 self._spawn_retry_counts[research_id] = attempts 

127 return attempts 

128 

129 @staticmethod 

130 def _commit_with_safe_rollback(db_session, context: str) -> bool: 

131 """Commit ``db_session`` with best-effort rollback on failure. 

132 

133 Returns ``True`` on success, ``False`` if the commit raised. 

134 The failure path logs via ``logger.exception`` and attempts a 

135 rollback, itself guarded so a subsequent rollback failure is 

136 logged at debug level rather than propagated. 

137 

138 Extracted because the ``try: commit / except: log + try: 

139 rollback`` idiom repeats at ≥5 sites in this module; inlining 

140 hides the defensive structure behind nested ``try`` blocks and 

141 makes each callsite longer than the work it describes. 

142 """ 

143 try: 

144 db_session.commit() 

145 return True 

146 except Exception: 

147 logger.exception(f"Commit failed: {context}") 

148 try: 

149 db_session.rollback() 

150 except Exception: 

151 logger.debug( 

152 f"Rollback after commit failure ({context})", 

153 exc_info=True, 

154 ) 

155 return False 

156 

def _delete_queue_row_safely(
    self, db_session, username: str, research_id: str
) -> None:
    """Best-effort removal of the ``QueuedResearch`` row identified by
    ``(username, research_id)``. Never raises.

    Steps: roll back whatever the session has pending (it may be stuck
    in ``PendingRollbackError`` after a failed commit inside
    ``_start_research``), look the row up again, delete it when it
    exists, and commit through ``_commit_with_safe_rollback``.

    Intended for ``DuplicateResearchError`` cleanup, where the aim is
    simply "get rid of the queue row whatever state the session is
    in". Do NOT use it where the delete has to be atomic with other
    writes - e.g. the terminal FAILED path, which sets
    ``ResearchHistory.status = FAILED`` and deletes the queue row in
    one commit and therefore keeps that logic inline.
    """
    # Step 1: clear any broken transaction state before touching the DB.
    try:
        db_session.rollback()
    except Exception:
        logger.debug(
            f"Rollback before queue-row delete for {research_id}",
            exc_info=True,
        )

    # Step 2: fresh lookup, delete, commit. Any failure is logged and
    # followed by one more guarded rollback.
    try:
        lookup = db_session.query(QueuedResearch).filter_by(
            username=username, research_id=research_id
        )
        stale_row = lookup.first()
        if stale_row:
            db_session.delete(stale_row)
            self._commit_with_safe_rollback(
                db_session,
                f"queue-row delete for research {research_id}",
            )
    except Exception:
        logger.exception(
            f"Failed to query/delete queue row for {research_id}"
        )
        try:
            db_session.rollback()
        except Exception:
            logger.debug(
                f"Rollback after queue-row delete failure for {research_id}",
                exc_info=True,
            )

205 

def start(self):
    """Launch the background queue-processing thread.

    Does nothing except log a warning when the processor is already
    flagged as running. The worker is a daemon thread running
    ``_process_queue_loop``.
    """
    if not self.running:
        self.running = True
        worker = threading.Thread(
            target=self._process_queue_loop, daemon=True
        )
        self.thread = worker
        worker.start()
        logger.info("Queue processor v2 started")
        return

    logger.warning("Queue processor already running")

218 

def stop(self):
    """Ask the processing loop to exit and wait briefly for it.

    Clears the ``running`` flag, then joins the worker thread (if one
    was created) for at most 10 seconds before logging.
    """
    self.running = False
    worker = self.thread
    if worker:
        worker.join(timeout=10)
    logger.info("Queue processor v2 stopped")

225 

def notify_user_activity(self, username: str, session_id: str):
    """
    Register a user so the processing loop inspects their queue.

    Args:
        username: The username
        session_id: The Flask session ID (for password access)
    """
    entry = (username, session_id)
    with self._users_lock:
        self._users_to_check.add(entry)
        logger.debug(f"User {username} added to queue check list")

237 

def notify_research_queued(self, username: str, research_id: str, **kwargs):
    """
    Notify that a research was queued.

    In direct mode, this immediately starts the research if slots are
    available. In queue mode, it adds to the queue.

    The direct path is only attempted when ``kwargs`` contains a
    ``session_id`` for which ``session_password_store`` returns a
    password and the user's database can be opened. Any exception
    raised while attempting it is logged and the call falls through to
    the queue path, which records task metadata for ``research_id``.

    Args:
        username: The username
        research_id: The research ID
        **kwargs: Additional parameters for direct execution (query, mode, etc.).
            ``session_id`` is read here; the whole mapping is forwarded
            to ``_start_research_directly``.

    Returns:
        None. Failures on either path are logged, not raised.
    """
    # Check user's queue_mode setting when we have database access
    if kwargs:
        session_id = kwargs.get("session_id")
        if session_id:
            # Check if we can start it directly
            password = session_password_store.get_session_password(
                username, session_id
            )
            if password:
                try:
                    # Open database and check settings + active count
                    engine = db_manager.open_user_database(
                        username, password
                    )
                    if engine:
                        with get_user_db_session(username) as db_session:
                            # Get user's settings using SettingsManager
                            # (imported locally, at the point of use)
                            from ...settings.manager import SettingsManager

                            settings_manager = SettingsManager(db_session)

                            # Get user's queue_mode setting (env > DB > default)
                            queue_mode = settings_manager.get_setting(
                                "app.queue_mode", "direct"
                            )

                            # Get user's max concurrent setting (env > DB > default)
                            # NOTE(review): compared with an int count below;
                            # presumably get_setting returns an int here - confirm.
                            max_concurrent = settings_manager.get_setting(
                                "app.max_concurrent_researches", 3
                            )

                            logger.debug(
                                f"User {username} settings: queue_mode={queue_mode}, "
                                f"max_concurrent={max_concurrent}"
                            )

                            # Only try direct execution if user has queue_mode="direct"
                            if queue_mode == "direct":
                                # Serialise the count→check→start critical
                                # section at the application layer. Two
                                # concurrent submissions for the same user
                                # must not both observe the same active
                                # count and both start — that would exceed
                                # max_concurrent. A per-user Python lock
                                # gives us that atomicity independent of
                                # the DB isolation level.
                                with self._get_user_critical_lock(username):
                                    active_count = (
                                        db_session.query(UserActiveResearch)
                                        .filter_by(
                                            username=username,
                                            status=ResearchStatus.IN_PROGRESS,
                                        )
                                        .count()
                                    )

                                    if active_count < max_concurrent:
                                        # We have slots - start directly!
                                        logger.info(
                                            f"Direct mode: Starting research {research_id} immediately "
                                            f"(active: {active_count}/{max_concurrent})"
                                        )

                                        # Start the research directly
                                        self._start_research_directly(
                                            username,
                                            research_id,
                                            password,
                                            **kwargs,
                                        )
                                        # _start_research_directly logs and
                                        # swallows its own failures, so this
                                        # return is reached whether or not the
                                        # start succeeded; the queue fallback
                                        # below is skipped either way.
                                        return
                                    logger.info(
                                        f"Direct mode: Max concurrent reached ({active_count}/"
                                        f"{max_concurrent}), queueing {research_id}"
                                    )
                            else:
                                logger.info(
                                    f"User {username} has queue_mode={queue_mode}, "
                                    f"queueing research {research_id}"
                                )
                except Exception:
                    # Deliberately broad: any problem on the direct path
                    # degrades to the queue path instead of failing the call.
                    logger.exception(
                        f"Error in direct execution for {username}"
                    )

    # Fall back to queue mode (or if direct mode failed)
    try:
        with get_user_db_session(username) as session:
            queue_service = UserQueueService(session)
            queue_service.add_task_metadata(
                task_id=research_id,
                task_type="research",
                priority=0,
            )
            logger.info(
                f"Research {research_id} queued for user {username}"
            )
    except Exception:
        logger.exception(f"Failed to update queue status for {username}")

348 

def _start_research_directly(
    self, username: str, research_id: str, password: str, **kwargs
):
    """
    Start a research directly without queueing.

    Sequence: (1) insert and commit a ``UserActiveResearch`` row with
    status IN_PROGRESS and ``thread_id="pending"``, then mark the task
    "processing"; (2) call ``start_research_process``; (3) store the
    returned thread's ident on the active row. Each failure mode has
    its own cleanup branch below. No exception escapes this method.

    Args:
        username: The username
        research_id: The research ID
        password: The user's password
        **kwargs: Research parameters (query, mode, settings, etc.)

    Returns:
        None in every case, including every failure branch.
    """
    query = kwargs.get("query")
    mode = kwargs.get("mode")
    settings_snapshot = kwargs.get("settings_snapshot", {})

    # Create active research record
    try:
        with get_user_db_session(username) as db_session:
            active_record = UserActiveResearch(
                username=username,
                research_id=research_id,
                status=ResearchStatus.IN_PROGRESS,
                thread_id="pending",  # placeholder until the thread exists
                settings_snapshot=settings_snapshot,
            )
            db_session.add(active_record)
            db_session.commit()

            # Update task status if it exists
            # NOTE(review): this runs after the commit above. If it raises,
            # the except below returns without starting the research and
            # without removing the already-committed IN_PROGRESS row -
            # confirm that this is intended / recovered elsewhere.
            queue_service = UserQueueService(db_session)
            queue_service.update_task_status(research_id, "processing")
    except Exception:
        logger.exception(
            f"Failed to create active research record for {research_id}"
        )
        return

    # Extract parameters from kwargs
    model_provider = kwargs.get("model_provider")
    model = kwargs.get("model")
    custom_endpoint = kwargs.get("custom_endpoint")
    search_engine = kwargs.get("search_engine")

    # Start the research process
    try:
        research_thread = start_research_process(
            research_id,
            query,
            mode,
            run_research_process,
            username=username,
            user_password=password,
            model_provider=model_provider,
            model=model,
            custom_endpoint=custom_endpoint,
            search_engine=search_engine,
            max_results=kwargs.get("max_results"),
            time_period=kwargs.get("time_period"),
            iterations=kwargs.get("iterations"),
            questions_per_iteration=kwargs.get("questions_per_iteration"),
            strategy=kwargs.get("strategy", "source-based"),
            settings_snapshot=settings_snapshot,
        )

        # Update thread ID
        # Has its own try/except so a failure here is only logged and
        # does not reach the outer handlers (the thread is already running).
        try:
            with get_user_db_session(username) as db_session:
                active_record = (
                    db_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    active_record.thread_id = str(research_thread.ident)
                    db_session.commit()
        except Exception:
            logger.exception(
                f"Failed to update thread ID for {research_id}"
            )

        logger.info(
            f"Direct execution: Started research {research_id} for user {username} "
            f"in thread {research_thread.ident}"
        )

    except DuplicateResearchError:
        # A live thread already owns this research_id. Do NOT delete
        # the UserActiveResearch row or mark ResearchHistory FAILED —
        # that state belongs to the live thread, and mutating it
        # would terminate a running research from the user's
        # perspective while it keeps executing. Same contract as the
        # queue processor's dedicated dup branch (#3506).
        logger.warning(
            f"Duplicate live thread detected for {research_id} "
            "in direct mode; leaving state intact"
        )
        return
    except SystemAtCapacityError:
        # System at concurrent-research capacity in the direct-execution
        # path. Roll back the IN_PROGRESS active row and mark history
        # back to QUEUED so the queue processor can pick it up later.
        logger.info(
            f"Direct execution hit capacity for {research_id}; re-queueing"
        )
        try:
            with get_user_db_session(username) as db_session:
                active_record = (
                    db_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    db_session.delete(active_record)
                research_row = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if research_row:
                    research_row.status = ResearchStatus.QUEUED
                # Bump queued_tasks so _process_user_queue's
                # `queued_tasks == 0` gate doesn't treat the queue as
                # empty and strand the QueuedResearch row the submit
                # path already created. The direct path returns before
                # the normal add_task_metadata call, so this is the
                # single, non-double-counting increment;
                # _start_queued_researches later dispatches the row and
                # update_task_status() transitions this TaskMetadata
                # queued->processing (balancing the counter).
                UserQueueService(db_session).add_task_metadata(
                    task_id=research_id,
                    task_type="research",
                    priority=0,
                )
                # Delete, status change and metadata go out in one commit.
                db_session.commit()
        except Exception:
            logger.exception(
                f"Cleanup after capacity reject failed for "
                f"{research_id}; the stale UserActiveResearch row is "
                f"recovered by reclaim_stale_user_active_research"
            )
        return
    except Exception:
        logger.exception(f"Failed to start research {research_id} directly")
        # Clean up the active record AND mark the research terminal
        # FAILED so the user-visible state matches reality (no running
        # thread, not IN_PROGRESS). Same contract as the queue
        # processor's terminal-failure branch (#3481).
        try:
            with get_user_db_session(username) as db_session:
                active_record = (
                    db_session.query(UserActiveResearch)
                    .filter_by(username=username, research_id=research_id)
                    .first()
                )
                if active_record:
                    db_session.delete(active_record)
                research_row = (
                    db_session.query(ResearchHistory)
                    .filter_by(id=research_id)
                    .first()
                )
                if research_row:
                    research_row.status = ResearchStatus.FAILED
                # Committed even when neither row was found.
                db_session.commit()
        except Exception:
            logger.exception(
                f"Failed to clean up active research record for {research_id}"
            )

519 

def notify_research_completed(
    self, username: str, research_id: str, user_password: str | None = None
):
    """
    Record that a research finished successfully.

    Marks the task COMPLETED in the user's database, sends the
    completion notification, and then converts the research into a
    document in the History collection.

    Args:
        username: The username
        research_id: The research ID
        user_password: User password for database access. Needed for
            the queue update and for the database lookups done while
            sending the notification. It is optional only because
            some callers may not have it, in which case only basic
            updates occur.
    """
    try:
        # get_user_db_session (a module-level import) takes an optional
        # password and is used as a context manager.
        with get_user_db_session(username, user_password) as db_session:
            UserQueueService(db_session).update_task_status(
                research_id, ResearchStatus.COMPLETED
            )
            logger.info(
                f"Research {research_id} completed for user {username}"
            )

            # The notification helper lives in the notification module
            # and reuses the session opened above.
            send_research_completed_notification_from_session(
                username=username,
                research_id=research_id,
                db_session=db_session,
            )
    except Exception:
        logger.exception(
            f"Failed to update completion status for {username}"
        )

    # Auto-convert the research into a History-collection document.
    # Only the document is created here; FAISS indexing is started
    # separately by the user through "Index All" on the History page.
    # This step sits outside the try above, so it runs even when the
    # status update failed, and its own exceptions are not caught here.
    from ...research_library.search.services.research_history_indexer import (
        auto_convert_research,
    )

    auto_convert_research(username, research_id, db_password=user_password)

567 

def notify_research_failed(
    self,
    username: str,
    research_id: str,
    error_message: str | None = None,
    user_password: str | None = None,
):
    """
    Record that a research failed.

    Marks the task FAILED (with the error message) in the user's
    database and sends the failure notification. Any exception raised
    while doing so is logged, not propagated.

    Args:
        username: The username
        research_id: The research ID
        error_message: Optional error message; the notification uses
            "Unknown error" when it is missing or empty
        user_password: User password for database access. Needed for
            the queue update and for the database lookups done while
            sending the notification. It is optional only because
            some callers may not have it, in which case only basic
            updates occur.
    """
    try:
        # get_user_db_session (a module-level import) takes an optional
        # password and is used as a context manager.
        with get_user_db_session(username, user_password) as db_session:
            UserQueueService(db_session).update_task_status(
                research_id,
                ResearchStatus.FAILED,
                error_message=error_message,
            )
            logger.info(
                f"Research {research_id} failed for user {username}: "
                f"{error_message}"
            )

            # The notification helper lives in the notification module
            # and reuses the session opened above.
            send_research_failed_notification_from_session(
                username=username,
                research_id=research_id,
                error_message=error_message or "Unknown error",
                db_session=db_session,
            )
    except Exception:
        logger.exception(f"Failed to update failure status for {username}")

613 

def _process_queue_loop(self):
    """Main loop that processes the queue.

    Runs while ``self.running`` is true. Each pass snapshots the
    registered ``(username, session_id)`` pairs, processes every
    user's queue, unregisters the users whose queue came back empty,
    performs thread-local cleanup, and then sleeps for
    ``self.check_interval`` seconds. Exceptions are logged and never
    end the loop.
    """
    while self.running:
        try:
            # Get list of users to check (don't clear immediately)
            # Copy under the lock so notify_user_activity can keep
            # adding entries while this pass iterates.
            with self._users_lock:
                users_to_check = list(self._users_to_check)

            # Process each user's queue
            users_to_remove = []
            for user_session in users_to_check:
                try:
                    username, session_id = user_session
                    # _process_user_queue returns True if queue is empty
                    queue_empty = self._process_user_queue(
                        username, session_id
                    )
                    if queue_empty:
                        users_to_remove.append(user_session)
                except Exception:
                    logger.exception(
                        f"Error processing queue for {user_session}"
                    )
                    # Don't remove on error - the _process_user_queue method
                    # determines whether to keep checking based on error type

            # Only remove users whose queues are now empty
            with self._users_lock:
                for user_session in users_to_remove:
                    self._users_to_check.discard(user_session)

        except Exception:
            logger.exception("Error in queue processor loop")
        finally:
            # Clean up thread-local database session after each iteration.
            # The loop opens a new session each iteration via get_user_db_session();
            # closing it returns the connection to the shared QueuePool promptly.
            # NOTE(review): the debug message below says "on shutdown" although
            # this runs on every iteration - confirm the wording is intended.
            try:
                from ...database.thread_local_session import (
                    cleanup_current_thread,
                    cleanup_dead_threads,
                )

                cleanup_current_thread()
            except Exception:
                logger.debug(
                    "thread-local cleanup on shutdown", exc_info=True
                )

            # Periodic dead-thread credential sweep (every ~60s).
            # One of three sweep trigger points (app_factory
            # teardown, connection_cleanup scheduler, and here).
            # NOTE(review): the "~60s" figure assumes check_interval == 10;
            # the sweep actually runs every 6th iteration whatever the interval.
            # If the import above failed, cleanup_dead_threads is unbound and
            # the resulting error is absorbed by the except below.
            self._loop_iteration += 1
            if self._loop_iteration % 6 == 0:  # Every ~60s (10s × 6)
                try:
                    cleanup_dead_threads()
                except Exception:
                    logger.debug(
                        "periodic dead-thread sweep", exc_info=True
                    )

        # Sleep between passes; stop() is only noticed after the sleep.
        time.sleep(self.check_interval)

676 

def _process_user_queue(self, username: str, session_id: str) -> bool:
    """
    Run one queue-processing pass for a single user.

    Args:
        username: The username
        session_id: The Flask session ID

    Returns:
        True when the caller may stop checking this user: no password
        is available for the session, or no queued tasks remain.
        False when the user should be checked again: no free slots,
        the database could not be opened, queued tasks remain, or an
        exception occurred.
    """
    # The password comes from the session store; without it the
    # encrypted database cannot be opened.
    password = session_password_store.get_session_password(
        username, session_id
    )
    if not password:
        logger.debug(
            f"No password available for user {username}, skipping queue check"
        )
        # Session expired - stop checking this user.
        return True

    try:
        # Make sure the encrypted database is open before asking for a session.
        if not db_manager.open_user_database(username, password):
            logger.error(f"Failed to open database for user {username}")
            # Could be a temporary DB issue - keep checking.
            return False

        with get_user_db_session(username, password) as db_session:
            queue_service = UserQueueService(db_session)

            # Per-user settings are read through SettingsManager.
            from ...settings.manager import SettingsManager

            # Max concurrent researches (env > DB > default).
            max_concurrent = SettingsManager(db_session).get_setting(
                "app.max_concurrent_researches", 3
            )

            # Current counters, with zeros when the service reports nothing.
            empty_status = {"active_tasks": 0, "queued_tasks": 0}
            status_before = queue_service.get_queue_status() or empty_status

            free_slots = max_concurrent - status_before["active_tasks"]
            if free_slots <= 0:
                # Nothing can start now, but the queue may hold work.
                return False

            if status_before["queued_tasks"] == 0:
                # Nothing is waiting.
                return True

            logger.info(
                f"Processing queue for {username}: "
                f"{status_before['active_tasks']} active, "
                f"{status_before['queued_tasks']} queued, "
                f"{free_slots} slots available"
            )

            # Dispatch as many queued researches as there are free slots.
            self._start_queued_researches(
                db_session,
                queue_service,
                username,
                password,
                free_slots,
            )

            # Re-read the counters to see whether anything is still waiting.
            status_after = queue_service.get_queue_status() or {
                "queued_tasks": 0
            }
            return bool(status_after["queued_tasks"] == 0)

    except Exception:
        logger.exception(f"Error processing queue for user {username}")
        # Errors might be temporary - keep checking.
        return False

762 

def _reclaim_stranded_queue_rows(self, db_session, username: str) -> int:
    """Release queue rows left claimed by a crash or restart.

    A row counts as stranded when it has ``is_processing=True`` while
    ``is_research_active`` reports no live thread for its
    ``research_id``. That can happen when the process dies between
    the pre-spawn IN_PROGRESS commit and the queue-row deletion in
    ``_start_queued_researches``; such a row is invisible to the
    regular ``is_processing=False`` query and would never be retried.

    For every stranded row, ``is_processing`` is set back to False.
    If the matching ``ResearchHistory`` row is still IN_PROGRESS it is
    put back to QUEUED so the next tick can spawn it afresh.

    Returns:
        The number of rows reclaimed, or 0 when committing the
        changes failed.
    """
    from ..routes.globals import is_research_active

    claimed_rows = (
        db_session.query(QueuedResearch)
        .filter_by(username=username, is_processing=True)
        .all()
    )

    reclaimed = 0
    for row in claimed_rows:
        if is_research_active(row.research_id):
            # Claimed by a thread that is really running - leave it alone.
            continue

        row.is_processing = False
        history = (
            db_session.query(ResearchHistory)
            .filter_by(id=row.research_id)
            .first()
        )
        revert_status = (
            history is not None
            and history.status == ResearchStatus.IN_PROGRESS
        )
        if revert_status:
            history.status = ResearchStatus.QUEUED
        reclaimed += 1

        status_note = " and status=QUEUED" if revert_status else ""
        logger.warning(
            f"Reclaimed stranded queue row for research "
            f"{row.research_id} (user {username}): no live thread, "
            "resetting is_processing=False" + status_note
        )

    # Commit only when something changed; a failed commit means that
    # nothing was actually reclaimed.
    if reclaimed and not self._commit_with_safe_rollback(
        db_session,
        f"reclaim of stranded rows for user {username}",
    ):
        return 0
    return reclaimed

816 

817 def _start_queued_researches( 

818 self, 

819 db_session, 

820 queue_service: UserQueueService, 

821 username: str, 

822 password: str, 

823 available_slots: int, 

824 ): 

825 """Start queued researches up to available slots.""" 

826 # Before picking work, reclaim any rows stranded by a prior 

827 # crash — otherwise they are invisible to the is_processing=False 

828 # filter below and would never retry. 

829 self._reclaim_stranded_queue_rows(db_session, username) 

830 

831 # Get queued researches 

832 queued = ( 

833 db_session.query(QueuedResearch) 

834 .filter_by(username=username, is_processing=False) 

835 .order_by(QueuedResearch.position) 

836 .limit(available_slots) 

837 .all() 

838 ) 

839 

840 for queued_research in queued: 

841 research_id = queued_research.research_id 

842 try: 

843 # Atomically claim this item by flipping is_processing from 

844 # False to True in a single UPDATE. If another worker has 

845 # already claimed it since our SELECT above, the UPDATE will 

846 # match zero rows and we skip. Under non-IMMEDIATE isolation 

847 # the previous SELECT+assign pattern would race and two 

848 # workers could both process the same queued item. 

849 claimed = ( 

850 db_session.query(QueuedResearch) 

851 .filter_by( 

852 id=queued_research.id, 

853 is_processing=False, 

854 ) 

855 .update( 

856 {QueuedResearch.is_processing: True}, 

857 synchronize_session=False, 

858 ) 

859 ) 

860 db_session.commit() 

861 if not claimed: 

862 logger.debug( 

863 f"Queued research {research_id} " 

864 f"already claimed by another worker; skipping" 

865 ) 

866 continue 

867 # Refresh local object state now that we hold the claim 

868 db_session.refresh(queued_research) 

869 

870 # Update task status 

871 queue_service.update_task_status(research_id, "processing") 

872 

873 # Start the research 

874 self._start_research( 

875 db_session, 

876 username, 

877 password, 

878 queued_research, 

879 ) 

880 

881 # Success — clear any prior spawn-failure count and 

882 # remove the queue row. 

883 with self._spawn_retry_counts_lock: 

884 self._spawn_retry_counts.pop(research_id, None) 

885 db_session.delete(queued_research) 

886 db_session.commit() 

887 

888 logger.info( 

889 f"Started queued research {research_id} for user {username}" 

890 ) 

891 

892 except DuplicateResearchError: 

893 # Raised by _start_research when a prior attempt's thread 

894 # is still live, OR when the ResearchHistory row is in a 

895 # non-QUEUED state (IN_PROGRESS from a prior attempt's 

896 # successful pre-spawn commit; terminal COMPLETED / 

897 # FAILED / SUSPENDED from a thread that already finished 

898 # and cleaned up). In every case the correct behavior is 

899 # the same: clear the stale queue row and the retry 

900 # counter, and do NOT fall through to the FAILED/notify 

901 # path — that would terminate-status a live thread or 

902 # emit a false failure for a completed one. 

903 logger.warning( 

904 f"Research {research_id} is already started " 

905 "(live thread or non-QUEUED status); clearing stale " 

906 "queue row" 

907 ) 

908 with self._spawn_retry_counts_lock: 

909 self._spawn_retry_counts.pop(research_id, None) 

910 self._delete_queue_row_safely(db_session, username, research_id) 

911 continue 

912 

913 except SystemAtCapacityError: 

914 # System hit the global concurrent-research capacity while 

915 # dispatching this queued item. _start_research already 

916 # reset the ResearchHistory row back to QUEUED before 

917 # re-raising. This is a transient condition, NOT a spawn 

918 # failure, so it must NOT count toward SPAWN_RETRY_LIMIT — 

919 # otherwise a busy system would wrongly mark a perfectly 

920 # valid queued research FAILED after a few ticks. Just 

921 # release the processing claim so the next tick retries. 

922 # Mirrors the dedicated handler in _start_research_directly. 

923 logger.info( 

924 f"System at capacity dispatching queued research " 

925 f"{research_id}; leaving queued for next tick" 

926 ) 

927 # Revert the queued->processing claim from 

928 # update_task_status("processing") above. The research stays 

929 # queued for the next tick, so its slot must return to 

930 # queued_tasks rather than leaking into active_tasks on 

931 # every capacity-rejected retry. 

932 queue_service.update_task_status(research_id, "queued") 

933 fresh_queued = ( 

934 db_session.query(QueuedResearch) 

935 .filter_by(username=username, research_id=research_id) 

936 .first() 

937 ) 

938 if fresh_queued: 938 ↛ 945line 938 didn't jump to line 945 because the condition on line 938 was always true

939 fresh_queued.is_processing = False 

940 self._commit_with_safe_rollback( 

941 db_session, 

942 "is_processing reset after capacity reject for " 

943 f"research {research_id}", 

944 ) 

945 continue 

946 

947 except Exception: 

948 logger.exception( 

949 f"Error starting queued research {research_id}" 

950 ) 

951 # Session may be in PendingRollbackError state after a 

952 # failed commit inside _start_research. 

953 try: 

954 db_session.rollback() 

955 except Exception: 

956 logger.debug( 

957 "Rollback after start failure", 

958 exc_info=True, 

959 ) 

960 

961 attempts = self._bump_spawn_retry_count(research_id) 

962 

963 # Re-query in case rollback expired the ORM object. 

964 fresh_queued = ( 

965 db_session.query(QueuedResearch) 

966 .filter_by(username=username, research_id=research_id) 

967 .first() 

968 ) 

969 

970 if attempts < SPAWN_RETRY_LIMIT: 

971 # Transient failure — allow the next loop tick to 

972 # retry. _start_research rolls back its own 

973 # IN_PROGRESS write on spawn failure, so the only 

974 # fix-up needed here is resetting is_processing. 

975 logger.warning( 

976 f"Spawn failed for research {research_id} " 

977 f"(attempt {attempts}/{SPAWN_RETRY_LIMIT}), " 

978 "leaving queued for retry" 

979 ) 

980 if fresh_queued: 980 ↛ 986line 980 didn't jump to line 986 because the condition on line 980 was always true

981 fresh_queued.is_processing = False 

982 self._commit_with_safe_rollback( 

983 db_session, 

984 f"is_processing reset for research {research_id}", 

985 ) 

986 continue 

987 

988 # Exhausted retries — mark terminal FAILED, delete the 

989 # queue row to stop re-dispatch, and notify the user. 

990 # Use logger.warning (not logger.exception) because the 

991 # spawn traceback was already logged at the top of this 

992 # except block; re-logging with exc_info emits a second 

993 # full traceback. 

994 logger.warning( 

995 f"Spawn failed for research {research_id} " 

996 f"after {attempts} attempts; marking FAILED" 

997 ) 

998 with self._spawn_retry_counts_lock: 

999 self._spawn_retry_counts.pop(research_id, None) 

1000 try: 

1001 research = ( 

1002 db_session.query(ResearchHistory) 

1003 .filter_by(id=research_id) 

1004 .first() 

1005 ) 

1006 if research: 1006 ↛ 1008line 1006 didn't jump to line 1008 because the condition on line 1006 was always true

1007 research.status = ResearchStatus.FAILED 

1008 if fresh_queued: 1008 ↛ 1010line 1008 didn't jump to line 1010 because the condition on line 1008 was always true

1009 db_session.delete(fresh_queued) 

1010 db_session.commit() 

1011 except Exception: 

1012 logger.exception( 

1013 "Failed to persist terminal FAILED state for " 

1014 f"research {research_id}" 

1015 ) 

1016 try: 

1017 db_session.rollback() 

1018 except Exception: 

1019 logger.debug( 

1020 "Rollback after terminal update failure", 

1021 exc_info=True, 

1022 ) 

1023 

1024 # notify_research_failed opens its own session and 

1025 # sends the user notification. Called exactly once 

1026 # per research_id because the counter is popped above. 

1027 self.notify_research_failed( 

1028 username=username, 

1029 research_id=research_id, 

1030 error_message=( 

1031 f"Failed to start research after {attempts} attempts" 

1032 ), 

1033 user_password=password, 

1034 ) 

1035 

def _start_research(
    self,
    db_session,
    username: str,
    password: str,
    queued_research,
):
    """Dispatch one queued research item onto a research thread.

    Steps, in the order they must happen:

    1. Load the ``ResearchHistory`` row for the queued item.
    2. Refuse to proceed unless that row is still ``QUEUED``.
    3. Commit ``IN_PROGRESS`` *before* spawning. Committing afterwards
       would let a fast-finishing thread (which writes through its own
       session) record ``COMPLETED`` and then have that overwritten by a
       late ``IN_PROGRESS`` commit from here.
    4. Call ``start_research_process``. On failure the status is put
       back to ``QUEUED`` (except for duplicates) and the error is
       re-raised for the caller to classify.
    5. Persist a ``UserActiveResearch`` row describing the new thread.

    Args:
        db_session: Open session on the user's database.
        username: Owner of the queued research.
        password: User password, forwarded to the research process.
        queued_research: The ``QueuedResearch`` row being dispatched.

    Raises:
        ValueError: No ``ResearchHistory`` row exists for the item.
        DuplicateResearchError: The research is not in ``QUEUED`` state,
            ``start_research_process`` reported a duplicate, or the
            post-spawn ``UserActiveResearch`` commit failed while the
            thread is already live.
        SystemAtCapacityError: Re-raised after resetting to ``QUEUED``.
        Exception: Any other spawn failure, re-raised after resetting
            the status to ``QUEUED``.
    """
    research_id = queued_research.research_id
    history_row = (
        db_session.query(ResearchHistory).filter_by(id=research_id).first()
    )
    if not history_row:
        raise ValueError(f"Research {research_id} not found")

    # Anything other than QUEUED means an earlier attempt already got
    # past the pre-spawn commit: either its thread is (or was) running
    # and left IN_PROGRESS behind, or it finished and wrote a terminal
    # status. Re-dispatching would clobber that status and could run the
    # research a second time, so signal "duplicate" and let the caller
    # drop the queue row without touching status or notifying.
    if history_row.status != ResearchStatus.QUEUED:
        raise DuplicateResearchError(
            f"Research {research_id} is already started "
            f"(status={history_row.status})"
        )

    # Pre-spawn claim; see step 3 in the docstring.
    history_row.status = ResearchStatus.IN_PROGRESS
    db_session.commit()

    # The snapshot is either the newer wrapped layout
    # ({"submission": ..., "settings_snapshot": ...}) or the legacy flat
    # dict that holds the submission parameters directly.
    snapshot = queued_research.settings_snapshot or {}
    if isinstance(snapshot, dict) and "submission" in snapshot:
        submission_params = snapshot.get("submission", {})
        complete_settings = snapshot.get("settings_snapshot", {})
    else:
        submission_params, complete_settings = snapshot, {}

    # Submission keys forwarded verbatim (missing keys become None).
    forwarded_keys = (
        "model_provider",
        "model",
        "custom_endpoint",
        "search_engine",
        "max_results",
        "time_period",
        "iterations",
        "questions_per_iteration",
    )

    def requeue(cause):
        # No thread exists in these paths, so hand the row back as
        # QUEUED for a later dispatch attempt.
        history_row.status = ResearchStatus.QUEUED
        self._commit_with_safe_rollback(
            db_session,
            f"status reset to QUEUED after {cause} for research {research_id}",
        )

    try:
        # Everything needed for the call is gathered inside the try so
        # that a malformed queue row is handled like any spawn failure.
        query_text = queued_research.query
        research_mode = queued_research.mode
        spawn_kwargs = {
            "username": username,
            "user_password": password,  # Pass password for metrics
        }
        spawn_kwargs.update(
            (key, submission_params.get(key)) for key in forwarded_keys
        )
        spawn_kwargs["strategy"] = submission_params.get(
            "strategy", "source-based"
        )
        spawn_kwargs["settings_snapshot"] = complete_settings

        research_thread = start_research_process(
            research_id,
            query_text,
            research_mode,
            run_research_process,
            **spawn_kwargs,
        )
    except DuplicateResearchError:
        # A live thread already owns this research_id; leave the status
        # alone because resetting it would contradict that thread.
        raise
    except SystemAtCapacityError:
        # Transient: nothing was spawned. Re-queue and let the caller
        # retry later without counting it against SPAWN_RETRY_LIMIT.
        logger.info(
            f"System at capacity when dispatching {research_id}; "
            "re-queueing for next tick"
        )
        requeue("capacity reject")
        raise
    except Exception:
        # Genuine spawn failure: undo the IN_PROGRESS claim so the
        # caller's retry sees a clean QUEUED row.
        requeue("spawn failure")
        raise

    # The thread is running from here on. Record it; if that commit
    # fails the thread is still live, so report a duplicate rather than
    # a generic error. A generic error would bump the caller's retry
    # counter and could eventually mark a running research as FAILED.
    db_session.add(
        UserActiveResearch(
            username=username,
            research_id=research_id,
            status=ResearchStatus.IN_PROGRESS,
            thread_id=str(research_thread.ident),
            settings_snapshot=queued_research.settings_snapshot,
        )
    )
    persisted = self._commit_with_safe_rollback(
        db_session,
        f"UserActiveResearch persist after spawn for research {research_id}",
    )
    if persisted:
        return

    raise DuplicateResearchError(
        f"Research {research_id} thread is live; "
        "UserActiveResearch commit failed"
    )

1185 

def process_user_request(self, username: str, session_id: str) -> int:
    """
    Check a user's queue from inside one of their requests.

    Registers the user through ``notify_user_activity`` and then, if a
    session password is stored and the user's database opens, reads the
    queue status. This method only reports; it does not start any
    research itself.

    Args:
        username: The user making the request.
        session_id: Session used to look up the stored password.

    Returns:
        The ``queued_tasks`` count from the queue status when it is
        positive. ``0`` when no password is stored, the database could
        not be opened, nothing is queued, or any exception was raised
        (the exception is logged, not propagated).
    """
    try:
        # Make sure the user is on the processor's check list.
        self.notify_user_activity(username, session_id)

        password = session_password_store.get_session_password(
            username, session_id
        )
        if not password:
            return 0

        if not db_manager.open_user_database(username, password):
            return 0

        with get_user_db_session(username) as db_session:
            status = UserQueueService(db_session).get_queue_status()
            if status and status["queued_tasks"] > 0:
                logger.info(
                    f"User {username} has {status['queued_tasks']} "
                    f"queued tasks, triggering immediate processing"
                )
                # Only the count is reported here; no dispatch call is
                # made from this method.
                return int(status["queued_tasks"])

        return 0

    except Exception:
        logger.exception(f"Error in process_user_request for {username}")
        return 0

1223 

def queue_progress_update(
    self, username: str, research_id: str, progress: float
):
    """
    Record a progress update to be written to the database later.

    The update is stored in ``self.pending_operations`` under a fresh
    UUID key; nothing is written to the database here. Kept for
    compatibility with the old processor during migration.

    Args:
        username: The username
        research_id: The research ID
        progress: The progress value (0-100)
    """
    key = str(uuid.uuid4())
    with self._pending_operations_lock:
        self.pending_operations[key] = dict(
            username=username,
            operation_type="progress_update",
            research_id=research_id,
            progress=progress,
            timestamp=time.time(),
        )
    logger.debug(
        f"Queued progress update for research {research_id}: {progress}%"
    )

1250 

def queue_error_update(
    self,
    username: str,
    research_id: str,
    status: str,
    error_message: str,
    metadata: Dict[str, Any],
    completed_at: str,
    report_path: Optional[str] = None,
):
    """
    Record an error/terminal status update to be written later.

    The update is stored in ``self.pending_operations`` under a fresh
    UUID key; nothing is written to the database here. Kept for
    compatibility with the old processor during migration.

    Args:
        username: The username
        research_id: The research ID
        status: The status to set (failed, suspended, etc.)
        error_message: The error message
        metadata: Research metadata
        completed_at: Completion timestamp
        report_path: Optional path to error report
    """
    key = str(uuid.uuid4())
    with self._pending_operations_lock:
        self.pending_operations[key] = dict(
            username=username,
            operation_type="error_update",
            research_id=research_id,
            status=status,
            error_message=error_message,
            metadata=metadata,
            completed_at=completed_at,
            report_path=report_path,
            timestamp=time.time(),
        )
    logger.info(
        f"Queued error update for research {research_id} with status {status}"
    )

1290 

def process_pending_operations_for_user(
    self, username: str, db_session
) -> int:
    """
    Apply a user's queued progress/error updates to their database.

    Entries belonging to ``username`` are removed from
    ``self.pending_operations`` up front, whether or not applying them
    later succeeds. Each one is then applied and committed individually.
    Kept for compatibility with the old processor during migration.

    Args:
        username: Username to process operations for
        db_session: Active database session for the user

    Returns:
        Number of operations that were applied and committed. Entries
        with an unrecognised type, with no matching ``ResearchHistory``
        row, or that raised while being applied are not counted.
    """
    # Claim this user's entries while holding the lock; removing them
    # right away prevents a second caller from applying them again.
    claimed = []
    with self._pending_operations_lock:
        for op_id in list(self.pending_operations):
            if self.pending_operations[op_id]["username"] == username:
                claimed.append((op_id, self.pending_operations.pop(op_id)))

    if not claimed:
        return 0

    applied = 0

    # Database work happens outside the lock so it is not held during
    # queries and commits.
    for op_id, payload in claimed:
        try:
            kind = payload.get("operation_type")
            if kind != "progress_update" and kind != "error_update":
                continue

            from ...database.models import ResearchHistory

            research = (
                db_session.query(ResearchHistory)
                .filter_by(id=payload["research_id"])
                .first()
            )
            if not research:
                continue

            if kind == "progress_update":
                # Update the progress column directly
                research.progress = payload["progress"]
            else:
                research.status = payload["status"]
                research.error_message = payload["error_message"]
                research.research_meta = payload["metadata"]
                research.completed_at = payload["completed_at"]
                if payload.get("report_path"):
                    research.report_path = payload["report_path"]

            db_session.commit()
            applied += 1

        except Exception:
            logger.exception(f"Error processing operation {op_id}")
            # Clear the failed transaction so later entries can still
            # be applied on the same session.
            try:
                db_session.rollback()
            except Exception:
                logger.warning(
                    f"Failed to rollback after error in operation {op_id}"
                )

    return applied

1370 

1371 

# Global queue processor instance.
# Constructed once when this module is imported, using the default
# ``check_interval``; other modules import this object to share it.
queue_processor = QueueProcessorV2()