Coverage for src/local_deep_research/scheduler/background.py: 96%

678 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Activity-based news subscription scheduler for per-user encrypted databases. 

3Tracks user activity and temporarily stores credentials for automatic updates. 

4""" 

5 

6import random 

7import threading 

8from dataclasses import dataclass 

9from datetime import datetime, timedelta, UTC 

10from functools import wraps 

11from typing import Any, Callable, Dict, List 

12 

13from cachetools import TTLCache 

14from loguru import logger 

15from ..settings.logger import log_settings 

16from ..settings.manager import SnapshotSettingsContext 

17 

18from apscheduler.schedulers.background import BackgroundScheduler 

19from apscheduler.jobstores.base import JobLookupError 

20from ..constants import ResearchStatus 

21from ..database.credential_store_base import CredentialStoreBase 

22from ..database.session_context import safe_rollback 

23from ..database.thread_local_session import thread_cleanup 

24 

25# RAG indexing imports 

26from ..research_library.services.library_rag_service import LibraryRAGService 

27from ..database.library_init import get_default_library_id 

28from ..database.models.library import Document, DocumentCollection 

29 

30 

31SCHEDULER_AVAILABLE = True # Always available since it's a required dependency 

32 

33 

class SchedulerCredentialStore(CredentialStoreBase):
    """In-memory password store used by the news scheduler.

    Background scheduler jobs need a user's password to open that user's
    encrypted database. Entries are handed to the base class, which is
    constructed with a time-to-live so stored passwords expire.
    """

    def __init__(self, ttl_hours: int = 48):
        # The base class receives the TTL as ``ttl_hours * 3600``
        # (presumably seconds -- confirm against CredentialStoreBase).
        ttl = ttl_hours * 3600
        super().__init__(ttl)

    def store(self, username: str, password: str) -> None:
        """Remember ``password`` for ``username``."""
        entry = {"username": username, "password": password}
        self._store_credentials(username, entry)

    def retrieve(self, username: str) -> str | None:
        """Return the stored password, or None if nothing usable is stored.

        The lookup is non-destructive (``remove=False``), so the entry stays
        available for later jobs.
        """
        found = self._retrieve_credentials(username, remove=False)
        if not found:
            return None
        # Index 1 of the base-class result is returned as the password.
        return found[1]

    def clear(self, username: str) -> None:
        """Forget whatever is stored for ``username``."""
        self.clear_entry(username)

58 

59 

@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """Frozen snapshot of one user's document-scheduler settings.

    Because the dataclass is frozen, an instance cannot be modified after
    construction, so the same snapshot can be shared with background
    threads.
    """

    # Whether document processing is scheduled for the user at all.
    enabled: bool = True
    # Seconds between two document-processing runs (default: 30 minutes).
    interval_seconds: int = 1800
    # Queue PDF downloads for completed research.
    download_pdfs: bool = False
    # Extract text for downloadable research resources.
    extract_text: bool = True
    # Generate RAG embeddings for processed documents.
    generate_rag: bool = False
    # ISO-format timestamp of the previous run; empty string means "never".
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Build a snapshot in which every field keeps its default value."""
        snapshot = cls()
        return snapshot

80 

81 

82class BackgroundJobScheduler: 

83 """ 

84 Singleton scheduler that manages news subscriptions for active users. 

85 

86 This scheduler: 

87 - Monitors user activity through database access 

88 - Temporarily stores user credentials in memory 

89 - Automatically schedules subscription checks 

90 - Cleans up inactive users after configurable period 

91 """ 

92 

93 _instance = None 

94 _lock = threading.Lock() 

95 

96 def __new__(cls): 

97 """Ensure singleton instance.""" 

98 if cls._instance is None: 

99 with cls._lock: 

100 if cls._instance is None: 100 ↛ 102line 100 didn't jump to line 102

101 cls._instance = super().__new__(cls) 

102 return cls._instance 

103 

104 def __init__(self): 

105 """Initialize the scheduler (only runs once due to singleton).""" 

106 # Skip if already initialized 

107 if hasattr(self, "_initialized"): 

108 return 

109 

110 # User session tracking 

111 self.user_sessions = {} # user_id -> {last_activity, scheduled_jobs} 

112 self.lock = threading.Lock() 

113 

114 # Credential store with TTL-based expiration 

115 self._credential_store = SchedulerCredentialStore(ttl_hours=48) 

116 

117 # Scheduler instance 

118 self.scheduler = BackgroundScheduler() 

119 

120 # Configuration (will be loaded from settings) 

121 self.config = self._load_default_config() 

122 

123 # State 

124 self.is_running = False 

125 self._app = None # Flask app reference for background job contexts 

126 

127 # Settings cache: username -> DocumentSchedulerSettings 

128 # TTL of 300 seconds (5 minutes) reduces database queries 

129 self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300) 

130 self._settings_cache_lock = threading.Lock() 

131 

132 self._initialized = True 

133 logger.info("News scheduler initialized") 

134 

135 def _load_default_config(self) -> Dict[str, Any]: 

136 """Load default configuration (will be overridden by settings manager).""" 

137 return { 

138 "enabled": True, 

139 "retention_hours": 48, 

140 "cleanup_interval_hours": 1, 

141 "max_jitter_seconds": 300, 

142 "max_concurrent_jobs": 10, 

143 "subscription_batch_size": 5, 

144 "activity_check_interval_minutes": 5, 

145 } 

146 

def initialize_with_settings(self, settings_manager):
    """Replace the active configuration with values from ``settings_manager``.

    Every value is read through ``_get_setting`` with the same fallback the
    built-in defaults use. If anything raises while loading, the error is
    logged and ``self.config`` keeps its previous contents.
    """
    # (config key, settings key, fallback), in the order they are looked up.
    lookups = (
        ("enabled", "news.scheduler.enabled", True),
        ("retention_hours", "news.scheduler.retention_hours", 48),
        (
            "cleanup_interval_hours",
            "news.scheduler.cleanup_interval_hours",
            1,
        ),
        ("max_jitter_seconds", "news.scheduler.max_jitter_seconds", 300),
        ("max_concurrent_jobs", "news.scheduler.max_concurrent_jobs", 10),
        ("subscription_batch_size", "news.scheduler.batch_size", 5),
        (
            "activity_check_interval_minutes",
            "news.scheduler.activity_check_interval",
            5,
        ),
    )

    try:
        self.settings_manager = settings_manager
        # Build the full dict first so a failure part-way through leaves
        # the old configuration untouched.
        loaded = {
            config_key: self._get_setting(setting_key, fallback)
            for config_key, setting_key, fallback in lookups
        }
        self.config = loaded
        log_settings(self.config, "Scheduler configuration loaded")
    except Exception:
        logger.exception("Error loading scheduler settings")
        # Keep default config

177 

178 def _get_setting(self, key: str, default: Any) -> Any: 

179 """Get setting with fallback to default.""" 

180 if hasattr(self, "settings_manager") and self.settings_manager: 

181 return self.settings_manager.get_setting(key, default=default) 

182 return default 

183 

184 def set_app(self, app) -> None: 

185 """Store a reference to the Flask app for creating app contexts in background jobs.""" 

186 self._app = app 

187 

188 def _wrap_job(self, func: Callable) -> Callable: 

189 """Wrap a scheduler job function so it runs inside a Flask app context. 

190 

191 APScheduler runs jobs in a thread pool without Flask context. 

192 This wrapper pushes an app context before the job runs and pops it after. 

193 """ 

194 

195 @wraps(func) 

196 def wrapper(*args, **kwargs): 

197 if self._app is not None: 

198 with self._app.app_context(): 

199 return func(*args, **kwargs) 

200 else: 

201 logger.warning( 

202 f"No Flask app set on scheduler; running {func.__name__} without app context" 

203 ) 

204 return func(*args, **kwargs) 

205 

206 return wrapper 

207 

def _get_document_scheduler_settings(
    self, username: str, force_refresh: bool = False
) -> DocumentSchedulerSettings:
    """
    Return the document scheduler settings for a user, using the TTL cache.

    A cached snapshot is returned when one exists and ``force_refresh`` is
    False. Otherwise the settings are read from the user's database and
    the result is cached. Default settings are returned (and not cached)
    when the user has no session, the stored credentials are gone, or the
    database read fails.

    Args:
        username: User to get settings for
        force_refresh: If True, skip the cache lookup and read fresh settings

    Returns:
        DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
    """
    if not force_refresh:
        with self._settings_cache_lock:
            hit = self._settings_cache.get(username)
            if hit is not None:
                logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                return hit

    logger.debug(
        f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
    )

    # Without a tracked session there is nothing to read settings for.
    if not self.user_sessions.get(username):
        logger.warning(
            f"[SETTINGS_CACHE] No session info for {username}, using defaults"
        )
        return DocumentSchedulerSettings.defaults()

    password = self._credential_store.retrieve(username)
    if not password:
        logger.warning(
            f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
        )
        return DocumentSchedulerSettings.defaults()

    # The database read happens without holding the cache lock so other
    # threads are not blocked while it runs.
    try:
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager

        with get_user_db_session(username, password) as db:
            manager = SettingsManager(db)

            def read(name, fallback):
                return manager.get_setting(
                    f"document_scheduler.{name}", fallback
                )

            snapshot = DocumentSchedulerSettings(
                enabled=read("enabled", True),
                interval_seconds=read("interval_seconds", 1800),
                download_pdfs=read("download_pdfs", False),
                extract_text=read("extract_text", True),
                generate_rag=read("generate_rag", False),
                last_run=read("last_run", ""),
            )

            with self._settings_cache_lock:
                self._settings_cache[username] = snapshot
                logger.debug(
                    f"[SETTINGS_CACHE] Cached settings for {username}"
                )

            return snapshot

    except Exception:
        logger.exception(
            f"[SETTINGS_CACHE] Error fetching settings for {username}"
        )
        return DocumentSchedulerSettings.defaults()

290 

291 def invalidate_user_settings_cache(self, username: str) -> bool: 

292 """ 

293 Invalidate cached settings for a specific user. 

294 

295 Call this when user settings change or user logs out. 

296 

297 Args: 

298 username: User whose cache to invalidate 

299 

300 Returns: 

301 True if cache entry was removed, False if not found 

302 """ 

303 with self._settings_cache_lock: 

304 if username in self._settings_cache: 

305 del self._settings_cache[username] 

306 logger.debug( 

307 f"[SETTINGS_CACHE] Invalidated cache for {username}" 

308 ) 

309 return True 

310 return False 

311 

def invalidate_all_settings_cache(self) -> int:
    """
    Empty the settings cache for every user.

    Call this when doing bulk settings updates or during config reload.

    Returns:
        Number of cache entries cleared
    """
    with self._settings_cache_lock:
        # Count before clearing so the caller learns how many were dropped.
        cleared = len(self._settings_cache)
        self._settings_cache.clear()
        logger.info(
            f"[SETTINGS_CACHE] Cleared all settings cache ({cleared} entries)"
        )
    return cleared

328 

329 def start(self): 

330 """Start the scheduler.""" 

331 if not self.config.get("enabled", True): 

332 logger.info("News scheduler is disabled in settings") 

333 return 

334 

335 if self.is_running: 

336 logger.warning("Scheduler is already running") 

337 return 

338 

339 if self._app is None: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true

340 raise RuntimeError( 

341 "BackgroundJobScheduler.set_app() must be called before start()" 

342 ) 

343 

344 # Schedule cleanup job 

345 self.scheduler.add_job( 

346 self._wrap_job(self._run_cleanup_with_tracking), 

347 "interval", 

348 hours=self.config["cleanup_interval_hours"], 

349 id="cleanup_inactive_users", 

350 name="Cleanup Inactive User Sessions", 

351 jitter=60, # Add some jitter to cleanup 

352 ) 

353 

354 # Schedule configuration reload 

355 self.scheduler.add_job( 

356 self._wrap_job(self._reload_config), 

357 "interval", 

358 minutes=30, 

359 id="reload_config", 

360 name="Reload Configuration", 

361 ) 

362 

363 # Start the scheduler 

364 self.scheduler.start() 

365 self.is_running = True 

366 

367 # Schedule initial cleanup after a delay 

368 self.scheduler.add_job( 

369 self._wrap_job(self._run_cleanup_with_tracking), 

370 "date", 

371 run_date=datetime.now(UTC) + timedelta(seconds=30), 

372 id="initial_cleanup", 

373 ) 

374 

375 logger.info("News scheduler started") 

376 

377 def stop(self): 

378 """Stop the scheduler.""" 

379 if self.is_running: 

380 self.scheduler.shutdown(wait=True) 

381 self.is_running = False 

382 

383 # Clear all user sessions and credentials 

384 with self.lock: 

385 for username in self.user_sessions: 

386 self._credential_store.clear(username) 

387 self.user_sessions.clear() 

388 

389 logger.info("News scheduler stopped") 

390 

def update_user_info(self, username: str, password: str):
    """
    Record user activity and (re)schedule that user's jobs.

    Called on every database interaction. While the scheduler is not
    running the call only logs a warning. Otherwise the password is stored,
    the user's ``last_activity`` is set to the current UTC time (creating
    the session entry for a first-time user), and the user's subscriptions
    are rescheduled.

    Args:
        username: User's username
        password: User's password
    """
    logger.info(
        f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
    )
    logger.debug(
        f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
    )

    if not self.is_running:
        logger.warning(
            f"[SCHEDULER] Scheduler not running, cannot update user {username}"
        )
        return

    with self.lock:
        # The password is stored while holding the lock so concurrent
        # calls cannot leave mismatched credentials behind.
        self._credential_store.store(username, password)

        timestamp = datetime.now(UTC)

        if username in self.user_sessions:
            # Known user: refresh the activity timestamp.
            logger.info(
                f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
            )
            entry = self.user_sessions[username]
            old_activity = entry["last_activity"]
            activity_delta = timestamp - old_activity
            logger.debug(
                f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
            )

            entry["last_activity"] = timestamp
            logger.debug(
                f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
            )
        else:
            # First sighting of this user: create the session entry.
            logger.info(f"[SCHEDULER] New user in scheduler: {username}")
            self.user_sessions[username] = {
                "last_activity": timestamp,
                "scheduled_jobs": set(),
            }
            logger.debug(
                f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
            )

        # Both cases (re)schedule, in case the subscriptions changed.
        self._schedule_user_subscriptions(username)

448 

def unregister_user(self, username: str):
    """
    Remove a user from the scheduler, e.g. when the user logs out.

    For a tracked user, every job recorded in the session is removed from
    the scheduler, then the session entry and stored password are dropped
    while the lock is still held. The user's cached settings are
    invalidated afterwards, whether or not a session existed.
    """
    with self.lock:
        if username in self.user_sessions:
            logger.info(f"Unregistering user {username}")

            # Iterate over a copy of the job ids recorded for the user.
            tracked_jobs = self.user_sessions[username]["scheduled_jobs"].copy()
            for job_id in tracked_jobs:
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    # The scheduler no longer knows this job; nothing to do.
                    pass

            # Remove user session and clear credentials atomically
            del self.user_sessions[username]
            self._credential_store.clear(username)

    # The settings cache has its own lock, so this runs outside self.lock.
    self.invalidate_user_settings_cache(username)
    logger.info(f"User {username} unregistered successfully")

473 

def _schedule_user_subscriptions(self, username: str):
    """Schedule (or reschedule) every active news subscription of a user.

    Steps:

    1. Look up the user's session entry and stored password; return early
       (with a warning) when either is missing.
    2. Load the active ``NewsSubscription`` rows from the user's database.
    3. Remove all jobs currently recorded for the user and forget their ids.
       An id is forgotten even when the scheduler no longer knows the job
       (for example a one-off job that already ran), so stale ids do not
       pile up in ``scheduled_jobs``.
    4. Add one job per subscription: an ``interval`` trigger for refresh
       intervals of 60 minutes or less, otherwise a one-off ``date`` trigger
       at the next due time.

    Any exception is logged and swallowed. Document processing is then
    scheduled for the user, unless step 1 returned early.

    Args:
        username: User whose subscriptions are scheduled.
    """
    logger.info(f"_schedule_user_subscriptions called for {username}")
    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(f"No session info found for {username}")
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"Credentials expired for {username}, skipping subscription scheduling"
            )
            return
        logger.debug(f"Got password for {username}: present")

        # Get user's subscriptions from their encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(username, password) as db:
            subscriptions = (
                db.query(NewsSubscription).filter_by(is_active=True).all()
            )
            logger.debug(
                f"Query executed, found {len(subscriptions)} results"
            )

            # Log details of each subscription
            for sub in subscriptions:
                logger.debug(
                    f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                )

            logger.info(
                f"Found {len(subscriptions)} active subscriptions for {username}"
            )

            # Clear old jobs for this user. The id is discarded whether or
            # not the scheduler still had the job.
            for job_id in session_info["scheduled_jobs"].copy():
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    pass
                session_info["scheduled_jobs"].discard(job_id)

            # A negative configured value would make random.randint raise
            # and abort scheduling for every subscription; clamp it to 0.
            max_jitter = max(
                0, int(self.config.get("max_jitter_seconds", 300))
            )

            # Schedule each subscription with jitter
            for sub in subscriptions:
                job_id = f"{username}_{sub.id}"

                # Security: random jitter to distribute subscription timing, not security-sensitive
                jitter = random.randint(0, max_jitter)

                # Determine trigger based on frequency
                refresh_minutes = sub.refresh_interval_minutes

                if refresh_minutes <= 60:  # 60 minutes or less
                    # For hourly or more frequent, use interval trigger
                    trigger = "interval"
                    trigger_args = {
                        "minutes": refresh_minutes,
                        "jitter": jitter,
                        "start_date": datetime.now(UTC),  # Start immediately
                    }
                else:
                    # For less frequent, calculate next run time
                    now = datetime.now(UTC)
                    if sub.next_refresh:
                        # Ensure timezone-aware for comparison with now (UTC)
                        next_refresh_aware = sub.next_refresh
                        if next_refresh_aware.tzinfo is None:
                            logger.warning(
                                f"Subscription {sub.id} has naive (non-tz-aware) "
                                f"next_refresh datetime, assuming UTC"
                            )
                            next_refresh_aware = next_refresh_aware.replace(
                                tzinfo=UTC
                            )
                        if next_refresh_aware <= now:
                            # Subscription is overdue - run it immediately with small jitter
                            logger.info(
                                f"Subscription {sub.id} is overdue, scheduling immediate run"
                            )
                            next_run = now + timedelta(seconds=jitter)
                        else:
                            next_run = next_refresh_aware
                    else:
                        next_run = now + timedelta(
                            minutes=refresh_minutes, seconds=jitter
                        )

                    trigger = "date"
                    trigger_args = {"run_date": next_run}

                # Add the job
                self.scheduler.add_job(
                    func=self._wrap_job(self._check_subscription),
                    args=[username, sub.id],
                    trigger=trigger,
                    id=job_id,
                    name=f"Check {sub.name or sub.query_or_topic[:30]}",
                    replace_existing=True,
                    **trigger_args,
                )

                session_info["scheduled_jobs"].add(job_id)
                logger.info(f"Scheduled job {job_id} with {trigger} trigger")

    except Exception:
        logger.exception(f"Error scheduling subscriptions for {username}")

    # Add document processing for this user
    self._schedule_document_processing(username)

590 

def _schedule_document_processing(self, username: str):
    """Register the recurring document-processing job of a user.

    Nothing is scheduled when the user has no session entry or when the
    user's settings disable the document scheduler. Otherwise any existing
    ``<username>_document_processing`` job is removed and a new interval
    job is added with the configured interval, after which the job is
    looked up again to confirm it exists. Exceptions are logged and
    swallowed.

    Args:
        username: User whose document job is scheduled.
    """
    logger.info(
        f"[DOC_SCHEDULER] Scheduling document processing for {username}"
    )
    logger.debug(
        f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
    )

    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[DOC_SCHEDULER] No session info found for {username}"
            )
            logger.debug(
                f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
            )
            return

        logger.debug(
            f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
        )

        # Settings come from the TTL cache when a fresh snapshot exists.
        doc_settings = self._get_document_scheduler_settings(username)

        if not doc_settings.enabled:
            logger.info(
                f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
            )
            return

        logger.info(
            f"[DOC_SCHEDULER] User {username} document settings: enabled={doc_settings.enabled}, "
            f"interval={doc_settings.interval_seconds}s, pdfs={doc_settings.download_pdfs}, "
            f"text={doc_settings.extract_text}, rag={doc_settings.generate_rag}"
        )

        job_id = f"{username}_document_processing"
        tracked_jobs = session_info["scheduled_jobs"]
        logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

        # Drop the previous document job, if the scheduler still has one.
        try:
            self.scheduler.remove_job(job_id)
            tracked_jobs.discard(job_id)
            logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
        except JobLookupError:
            # Job doesn't exist, that's fine
            logger.debug(
                f"[DOC_SCHEDULER] No existing job {job_id} to remove"
            )

        logger.debug(
            f"[DOC_SCHEDULER] Adding new document processing job with interval {doc_settings.interval_seconds}s"
        )
        self.scheduler.add_job(
            func=self._wrap_job(self._process_user_documents),
            args=[username],
            trigger="interval",
            seconds=doc_settings.interval_seconds,
            id=job_id,
            name=f"Process Documents for {username}",
            jitter=30,  # small jitter so users do not all process at once
            max_instances=1,  # no overlapping runs for the same user
            replace_existing=True,
        )

        tracked_jobs.add(job_id)
        logger.info(
            f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {doc_settings.interval_seconds}s interval"
        )
        logger.debug(
            f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
        )

        # Confirm that the scheduler really holds the job now.
        registered = self.scheduler.get_job(job_id)
        if not registered:
            logger.error(
                f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
            )
        else:
            logger.info(
                f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {registered.next_run_time}"
            )

    except Exception:
        logger.exception(
            f"Error scheduling document processing for {username}"
        )

684 

685 @thread_cleanup 

686 def _process_user_documents(self, username: str): 

687 """Process documents for a user.""" 

688 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}") 

689 start_time = datetime.now(UTC) 

690 

691 try: 

692 session_info = self.user_sessions.get(username) 

693 if not session_info: 

694 logger.warning( 

695 f"[DOC_SCHEDULER] No session info found for user {username}" 

696 ) 

697 return 

698 

699 password = self._credential_store.retrieve(username) 

700 if not password: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true

701 logger.warning( 

702 f"[DOC_SCHEDULER] Credentials expired for user {username}" 

703 ) 

704 return 

705 logger.debug( 

706 f"[DOC_SCHEDULER] Starting document processing for {username}" 

707 ) 

708 

709 # Get user's document scheduler settings (cached) 

710 settings = self._get_document_scheduler_settings(username) 

711 

712 logger.info( 

713 f"[DOC_SCHEDULER] Processing settings for {username}: " 

714 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}" 

715 ) 

716 

717 if not any( 

718 [ 

719 settings.download_pdfs, 

720 settings.extract_text, 

721 settings.generate_rag, 

722 ] 

723 ): 

724 logger.info( 

725 f"[DOC_SCHEDULER] No processing options enabled for user {username}" 

726 ) 

727 return 

728 

729 # Parse last_run from cached settings 

730 last_run = ( 

731 datetime.fromisoformat(settings.last_run) 

732 if settings.last_run 

733 else None 

734 ) 

735 

736 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}") 

737 

738 # Need database session for queries and updates 

739 from ..database.session_context import get_user_db_session 

740 from ..database.models.research import ResearchHistory 

741 from ..settings.manager import SettingsManager 

742 

743 with get_user_db_session(username, password) as db: 

744 settings_manager = SettingsManager(db) 

745 

746 # Query for completed research since last run 

747 logger.debug( 

748 f"[DOC_SCHEDULER] Querying for completed research since {last_run}" 

749 ) 

750 query = db.query(ResearchHistory).filter( 

751 ResearchHistory.status == ResearchStatus.COMPLETED, 

752 ResearchHistory.completed_at.is_not( 

753 None 

754 ), # Ensure completed_at is not null 

755 ) 

756 

757 if last_run: 

758 query = query.filter( 

759 ResearchHistory.completed_at > last_run 

760 ) 

761 

762 # Limit to recent research to prevent overwhelming 

763 query = query.order_by( 

764 ResearchHistory.completed_at.desc() 

765 ).limit(20) 

766 

767 research_sessions = query.all() 

768 logger.debug( 

769 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions" 

770 ) 

771 

772 if not research_sessions: 

773 logger.info( 

774 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}" 

775 ) 

776 return 

777 

778 logger.info( 

779 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}" 

780 ) 

781 

782 # Log details of each research session 

783 for i, research in enumerate( 

784 research_sessions[:5] 

785 ): # Log first 5 details 

786 title_safe = ( 

787 (research.title[:50] + "...") 

788 if research.title 

789 else "No title" 

790 ) 

791 completed_safe = ( 

792 research.completed_at 

793 if research.completed_at 

794 else "No completion time" 

795 ) 

796 logger.debug( 

797 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}" 

798 ) 

799 

800 # Handle completed_at which might be a string or datetime 

801 completed_at_obj = None 

802 if research.completed_at: 

803 if isinstance(research.completed_at, str): 

804 try: 

805 completed_at_obj = datetime.fromisoformat( 

806 research.completed_at.replace("Z", "+00:00") 

807 ) 

808 except (ValueError, TypeError, AttributeError): 

809 completed_at_obj = None 

810 else: 

811 completed_at_obj = research.completed_at 

812 

813 logger.debug( 

814 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}" 

815 ) 

816 logger.debug( 

817 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}" 

818 ) 

819 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}") 

820 logger.debug( 

821 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}" 

822 ) 

823 

824 processed_count = 0 

825 for research in research_sessions: 

826 try: 

827 logger.info( 

828 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}" 

829 ) 

830 

831 # Set search context so rate limiting works in both 

832 # download_pdfs and extract_text paths 

833 from ..utilities.thread_context import ( 

834 set_search_context, 

835 ) 

836 

837 set_search_context( 

838 { 

839 "research_id": str(research.id), 

840 "username": username, 

841 "user_password": password, 

842 "research_phase": "document_scheduler", 

843 } 

844 ) 

845 

846 # Call actual processing APIs 

847 if settings.download_pdfs: 

848 logger.info( 

849 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}" 

850 ) 

851 try: 

852 # Use the DownloadService to queue PDF downloads 

853 from ..research_library.services.download_service import ( 

854 DownloadService, 

855 ) 

856 

857 with DownloadService( 

858 username=username, password=password 

859 ) as download_service: 

860 queued_count = download_service.queue_research_downloads( 

861 research.id 

862 ) 

863 logger.info( 

864 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}" 

865 ) 

866 except Exception: 

867 # Recover the shared thread-local session 

868 # before continuing — without rollback the 

869 # next phase (text extract / RAG) and the 

870 # post-loop last_run commit run on a 

871 # poisoned session (issue #3827). 

872 safe_rollback(db, "DOC_SCHEDULER PDF download") 

873 logger.exception( 

874 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}" 

875 ) 

876 

877 if settings.extract_text: 

878 logger.info( 

879 f"[DOC_SCHEDULER] Extracting text for research {research.id}" 

880 ) 

881 try: 

882 # Use the DownloadService to extract text for all resources 

883 from ..research_library.services.download_service import ( 

884 DownloadService, 

885 ) 

886 from ..database.models.research import ( 

887 ResearchResource, 

888 ) 

889 

890 from ..research_library.utils import ( 

891 is_downloadable_url, 

892 ) 

893 

894 with DownloadService( 

895 username=username, password=password 

896 ) as download_service: 

897 # Get all resources for this research (reuse existing db session) 

898 all_resources = ( 

899 db.query(ResearchResource) 

900 .filter_by(research_id=research.id) 

901 .all() 

902 ) 

903 # Filter: only process downloadable resources (academic/PDF) 

904 resources = [ 

905 r 

906 for r in all_resources 

907 if is_downloadable_url(r.url) 

908 ] 

909 processed_count = 0 

910 for resource in resources: 

911 # We need to pass the password to the download service 

912 # The DownloadService creates its own database sessions, so we need to ensure password is available 

913 try: 

914 success, error = ( 

915 download_service.download_as_text( 

916 resource.id 

917 ) 

918 ) 

919 if success: 

920 processed_count += 1 

921 logger.info( 

922 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}" 

923 ) 

924 else: 

925 logger.warning( 

926 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}" 

927 ) 

928 except Exception as resource_error: 

929 # Roll back FIRST so the next 

930 # iteration's queries don't 

931 # cascade on a poisoned session 

932 # (issue #3827). 

933 safe_rollback( 

934 db, 

935 "DOC_SCHEDULER resource", 

936 ) 

937 logger.exception( 

938 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}" 

939 ) 

940 logger.info( 

941 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed" 

942 ) 

943 except Exception: 

944 safe_rollback( 

945 db, "DOC_SCHEDULER text extraction" 

946 ) 

947 logger.exception( 

948 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}" 

949 ) 

950 

951 if settings.generate_rag: 

952 logger.info( 

953 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}" 

954 ) 

955 try: 

956 # Get embedding settings from user configuration 

957 embedding_model = settings_manager.get_setting( 

958 "local_search_embedding_model", 

959 "all-MiniLM-L6-v2", 

960 ) 

961 embedding_provider = ( 

962 settings_manager.get_setting( 

963 "local_search_embedding_provider", 

964 "sentence_transformers", 

965 ) 

966 ) 

967 chunk_size = int( 

968 settings_manager.get_setting( 

969 "local_search_chunk_size", 1000 

970 ) 

971 ) 

972 chunk_overlap = int( 

973 settings_manager.get_setting( 

974 "local_search_chunk_overlap", 200 

975 ) 

976 ) 

977 

978 # Initialize RAG service with user's embedding configuration 

979 with LibraryRAGService( 

980 username=username, 

981 embedding_model=embedding_model, 

982 embedding_provider=embedding_provider, 

983 chunk_size=chunk_size, 

984 chunk_overlap=chunk_overlap, 

985 db_password=password, 

986 ) as rag_service: 

987 # Get default Library collection ID 

988 library_collection_id = ( 

989 get_default_library_id( 

990 username, password 

991 ) 

992 ) 

993 

994 # Query for unindexed documents from this research session 

995 documents_to_index = ( 

996 db.query(Document.id, Document.title) 

997 .outerjoin( 

998 DocumentCollection, 

999 ( 

1000 DocumentCollection.document_id 

1001 == Document.id 

1002 ) 

1003 & ( 

1004 DocumentCollection.collection_id 

1005 == library_collection_id 

1006 ), 

1007 ) 

1008 .filter( 

1009 Document.research_id == research.id, 

1010 Document.text_content.isnot(None), 

1011 ( 

1012 DocumentCollection.indexed.is_( 

1013 False 

1014 ) 

1015 | DocumentCollection.id.is_( 

1016 None 

1017 ) 

1018 ), 

1019 ) 

1020 .all() 

1021 ) 

1022 

1023 if not documents_to_index: 

1024 logger.info( 

1025 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}" 

1026 ) 

1027 else: 

1028 # Index each document 

1029 indexed_count = 0 

1030 for ( 

1031 doc_id, 

1032 doc_title, 

1033 ) in documents_to_index: 

1034 try: 

1035 result = rag_service.index_document( 

1036 document_id=doc_id, 

1037 collection_id=library_collection_id, 

1038 force_reindex=False, 

1039 ) 

1040 if ( 1040 ↛ 1030line 1040 didn't jump to line 1030 because the condition on line 1040 was always true

1041 result["status"] 

1042 == "success" 

1043 ): 

1044 indexed_count += 1 

1045 logger.info( 

1046 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) " 

1047 f"with {result.get('chunk_count', 0)} chunks" 

1048 ) 

1049 except Exception as doc_error: 

1050 logger.exception( 

1051 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}" 

1052 ) 

1053 

1054 logger.info( 

1055 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: " 

1056 f"{indexed_count}/{len(documents_to_index)} documents indexed" 

1057 ) 

1058 except Exception: 

1059 safe_rollback(db, "DOC_SCHEDULER RAG") 

1060 logger.exception( 

1061 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}" 

1062 ) 

1063 

1064 processed_count += 1 

1065 logger.debug( 

1066 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}" 

1067 ) 

1068 

1069 except Exception: 

1070 safe_rollback(db, "DOC_SCHEDULER research") 

1071 logger.exception( 

1072 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}" 

1073 ) 

1074 

1075 # Update last run time in user's settings. 

1076 # Intentionally NOT wrapped in try/finally: if upstream setup 

1077 # fails (DB open, SettingsManager init, initial query), 

1078 # last_run should stay put so the next tick retries. 

1079 # Advancing here would mask a persistent failure (corrupted 

1080 # DB, wrong password). See closed PR #3288. 

1081 current_time = datetime.now(UTC).isoformat() 

1082 settings_manager.set_setting( 

1083 "document_scheduler.last_run", current_time, commit=True 

1084 ) 

1085 logger.debug( 

1086 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}" 

1087 ) 

1088 

1089 end_time = datetime.now(UTC) 

1090 duration = (end_time - start_time).total_seconds() 

1091 logger.info( 

1092 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s" 

1093 ) 

1094 

1095 except Exception: 

1096 logger.exception( 

1097 f"[DOC_SCHEDULER] Error processing documents for user {username}" 

1098 ) 

1099 

def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
    """Describe the document scheduler's state for ``username``.

    Returns:
        A dict. When the user has no (truthy) entry in
        ``self.user_sessions``, or when any exception is raised while
        building the status, it contains only ``enabled=False`` and a
        ``message``. Otherwise it mirrors the user's document-scheduler
        settings and reports whether the
        ``<username>_document_processing`` job id is listed in the
        session's ``scheduled_jobs``.
    """
    try:
        tracked = self.user_sessions.get(username)
        if tracked:
            # Settings lookup is delegated to the (cached) helper.
            doc_settings = self._get_document_scheduler_settings(username)

            # Sessions without a "scheduled_jobs" entry count as "no job".
            registered_jobs = tracked.get("scheduled_jobs", set())
            has_job = f"{username}_document_processing" in registered_jobs

            return {
                "enabled": doc_settings.enabled,
                "interval_seconds": doc_settings.interval_seconds,
                "processing_options": {
                    "download_pdfs": doc_settings.download_pdfs,
                    "extract_text": doc_settings.extract_text,
                    "generate_rag": doc_settings.generate_rag,
                },
                "last_run": doc_settings.last_run,
                "has_scheduled_job": has_job,
                "user_active": username in self.user_sessions,
            }

        return {
            "enabled": False,
            "message": "User not found in scheduler",
        }

    except Exception as e:
        logger.exception(
            f"Error getting document scheduler status for user {username}"
        )
        # Only the exception type is exposed to the caller, not its text.
        return {
            "enabled": False,
            "message": f"Failed to retrieve scheduler status: {type(e).__name__}",
        }

1138 

def trigger_document_processing(self, username: str) -> bool:
    """Queue a one-off document-processing job for ``username``.

    The job is registered under the id
    ``<username>_document_processing_manual`` with a ``date`` trigger one
    second in the future, replacing any job already using that id.

    Returns:
        ``True`` when the job can be looked up again after being added;
        ``False`` when the user has no session entry, the scheduler is not
        running, the job cannot be found afterwards, or an exception is
        raised inside the ``try``.
    """
    logger.info(
        f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
    )
    try:
        # Guard 1: the user must be tracked by this scheduler.
        if not self.user_sessions.get(username):
            logger.warning(
                f"[DOC_SCHEDULER] User {username} not found in scheduler"
            )
            logger.debug(
                f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
            )
            return False

        # Guard 2: nothing can be queued on a stopped scheduler.
        if not self.is_running:
            logger.warning(
                f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
            )
            return False

        job_id = f"{username}_document_processing_manual"
        logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}")

        wrapped_job = self._wrap_job(self._process_user_documents)
        fire_at = datetime.now(UTC) + timedelta(seconds=1)
        self.scheduler.add_job(
            func=wrapped_job,
            args=[username],
            trigger="date",
            run_date=fire_at,
            id=job_id,
            name=f"Manual Document Processing for {username}",
            replace_existing=True,
        )

        # Read the job back to confirm the scheduler accepted it.
        job = self.scheduler.get_job(job_id)
        if not job:
            logger.error(
                f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!"
            )
            return False

        logger.info(
            f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}"
        )
        return True

    except Exception:
        logger.exception(
            f"[DOC_SCHEDULER] Error triggering document processing for user {username}"
        )
        return False

1194 

@thread_cleanup
def _check_user_overdue_subscriptions(self, username: str):
    """Schedule an immediate run for each of a user's overdue subscriptions.

    A subscription is treated as overdue when it is active and its
    ``next_refresh`` is set and not later than the current UTC time. Each
    one gets a one-off ``date`` job running ``_check_subscription`` after a
    random 1-30 second delay, under the id
    ``overdue_<username>_<subscription id>_<epoch seconds>``.

    Returns ``None``. The method does nothing when the user has no session
    entry or no stored password; exceptions raised inside the ``try`` are
    logged and suppressed.
    """
    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            return

        password = self._credential_store.retrieve(username)
        if not password:
            return

        # Get user's overdue subscriptions
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(username, password) as db:
            # Use the module-level UTC constant like the rest of this
            # file instead of a function-local ``timezone`` import.
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.is_active.is_(True),
                    NewsSubscription.next_refresh.is_not(None),
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            if overdue_subs:
                logger.info(
                    f"Found {len(overdue_subs)} overdue subscriptions for {username}"
                )

                for sub in overdue_subs:
                    # Run immediately with small random delay
                    # Security: random delay to stagger overdue jobs, not security-sensitive
                    delay_seconds = random.randint(1, 30)
                    job_id = (
                        f"overdue_{username}_{sub.id}_{int(now.timestamp())}"
                    )

                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, sub.id],
                        trigger="date",
                        run_date=now + timedelta(seconds=delay_seconds),
                        id=job_id,
                        name=f"Overdue: {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                    )

                    logger.info(
                        f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds"
                    )

    except Exception:
        logger.exception(
            f"Error checking overdue subscriptions for {username}"
        )

1255 

@thread_cleanup
def _check_subscription(self, username: str, subscription_id: int):
    """Refresh a single news subscription and, if needed, reschedule it.

    Steps:
      1. If the user has no session entry, remove the
         ``<username>_<subscription_id>`` job (ignoring a missing job) and
         stop. If no password is stored, log a warning and stop.
      2. Load the subscription; skip it when missing or inactive.
      3. Replace the ``YYYY-MM-DD`` placeholder in the query with the
         user's local date, advance ``last_refresh`` / ``next_refresh``
         and commit.
      4. Run the research via ``_trigger_subscription_research_sync``.
      5. If the job registered under ``<username>_<subscription_id>`` uses
         a ``DateTrigger``, re-add it for the next interval plus a random
         jitter of up to ``config["max_jitter_seconds"]`` (default 300).

    Returns ``None``; exceptions raised inside the ``try`` are logged and
    suppressed.
    """
    logger.info(
        f"_check_subscription called for user {username}, subscription {subscription_id}"
    )
    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            # User no longer active, cancel job
            job_id = f"{username}_{subscription_id}"
            try:
                self.scheduler.remove_job(job_id)
            except JobLookupError:
                pass
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"Credentials expired for {username}, skipping subscription check"
            )
            return

        # Get subscription details
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(username, password) as db:
            sub = db.query(NewsSubscription).get(subscription_id)
            if not sub or not sub.is_active:
                logger.info(
                    f"Subscription {subscription_id} not active, skipping"
                )
                return

            # Prepare query with date replacement using user's timezone
            query = sub.query_or_topic
            if "YYYY-MM-DD" in query:
                from local_deep_research.news.core.utils import (
                    get_local_date_string,
                )
                from ..settings.manager import SettingsManager

                settings_manager = SettingsManager(db)
                local_date = get_local_date_string(settings_manager)
                query = query.replace("YYYY-MM-DD", local_date)

            # Snapshot the interval as a plain value while the row is
            # attached to the session, so the rescheduling step below
            # never has to touch the ORM object after the commit.
            refresh_interval_minutes = sub.refresh_interval_minutes

            # Update last/next refresh times from one timestamp so that
            # next_refresh - last_refresh equals the interval exactly.
            now = datetime.now(UTC)
            sub.last_refresh = now
            sub.next_refresh = now + timedelta(
                minutes=refresh_interval_minutes
            )
            db.commit()

            # Plain-dict copy of everything the research step needs.
            subscription_data = {
                "id": sub.id,
                "name": sub.name,
                "query": query,
                "original_query": sub.query_or_topic,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "search_engine": sub.search_engine,
            }

        logger.info(
            f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
        )

        # Run the research synchronously for this subscription
        self._trigger_subscription_research_sync(
            username, subscription_data
        )

        # Date-triggered jobs fire only once, so they have to be re-added
        # for the next interval.
        job_id = f"{username}_{subscription_id}"
        job = self.scheduler.get_job(job_id)
        if job and job.trigger.__class__.__name__ == "DateTrigger":
            # Security: random jitter to distribute subscription timing, not security-sensitive
            next_run = datetime.now(UTC) + timedelta(
                minutes=refresh_interval_minutes,
                seconds=random.randint(
                    0, int(self.config.get("max_jitter_seconds", 300))
                ),
            )
            self.scheduler.add_job(
                func=self._wrap_job(self._check_subscription),
                args=[username, subscription_id],
                trigger="date",
                run_date=next_run,
                id=job_id,
                replace_existing=True,
            )

    except Exception:
        logger.exception(f"Error checking subscription {subscription_id}")

1354 

@thread_cleanup
def _trigger_subscription_research_sync(
    self, username: str, subscription: Dict[str, Any]
):
    """Run one subscription's research in-process and store the result.

    Takes a settings snapshot from the user's database, overrides
    ``search.tool`` with the subscription's search engine when one is
    given, calls ``quick_summary`` and passes the result on to
    ``_store_research_result``.

    Returns ``None``. A missing session entry or stored password is logged
    as an error and ends the run; exceptions raised inside the ``try`` are
    logged instead of propagating.

    Args:
        username: Key into ``self.user_sessions`` and the credential store.
        subscription: Must provide ``id``, ``name``, ``query`` and
            ``original_query``; ``search_engine``, ``search_strategy``,
            ``model`` and ``model_provider`` are read with ``.get``.
    """
    from ..config.thread_settings import set_settings_context

    try:
        # The user must still be tracked and have retrievable credentials.
        if not self.user_sessions.get(username):
            logger.error(f"No session info for user {username}")
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.error(f"Credentials expired for user {username}")
            return

        # Fresh identifier for this research run
        import uuid

        research_id = str(uuid.uuid4())

        logger.info(
            f"Starting research {research_id} for subscription {subscription['id']}"
        )

        # Snapshot the user's settings for the research call
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager

        with get_user_db_session(username, password) as db:
            settings_snapshot = SettingsManager(db).get_settings_snapshot()

        # A subscription-level search engine wins over the user's default
        search_engine = subscription.get("search_engine")
        if not search_engine:
            default_search_tool = settings_snapshot.get(
                "search.tool", "auto"
            )
            logger.info(
                f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
            )
        else:
            settings_snapshot["search.tool"] = {
                "value": search_engine,
                "ui_element": "select",
            }
            logger.info(
                f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
            )

        logger.debug(
            f"Settings snapshot has {len(settings_snapshot)} settings"
        )
        # Surface a few key settings so a missing one is easy to spot
        logger.debug(
            f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
        )

        query = subscription["query"]

        # Metadata marking this run as a scheduler-triggered news search
        research_metadata = {
            "is_news_search": True,
            "search_type": "news_analysis",
            "display_in": "news_feed",
            "subscription_id": subscription["id"],
            "triggered_by": "scheduler",
            "subscription_name": subscription["name"],
            "title": subscription["name"] or None,
            "scheduled_at": datetime.now(UTC).isoformat(),
            "original_query": subscription["original_query"],
            "user_id": username,
        }

        # Programmatic research API, run under a per-thread settings context
        from ..api.research_functions import quick_summary

        set_settings_context(SnapshotSettingsContext(settings_snapshot))

        search_strategy = subscription.get("search_strategy")

        summary_args = dict(
            query=query,
            research_id=research_id,
            username=username,
            user_password=password,
            settings_snapshot=settings_snapshot,
            model_name=subscription.get("model"),
            provider=subscription.get("model_provider"),
            metadata=research_metadata,
            # Don't send long subscription prompts to search engines
            search_original_query=False,
        )
        # Only forward a strategy when the subscription names one.
        if search_strategy:
            summary_args["search_strategy"] = search_strategy

        research_result = quick_summary(**summary_args)

        logger.info(
            f"Completed research {research_id} for subscription {subscription['id']}"
        )

        # Persist the outcome for the news feed
        self._store_research_result(
            username,
            password,
            research_id,
            subscription["id"],
            research_result,
            subscription,
        )

    except Exception:
        logger.exception(
            f"Error triggering research for subscription {subscription['id']}"
        )

1482 

def _store_research_result(
    self,
    username: str,
    password: str,
    research_id: str,
    subscription_id: int,
    result: Dict[str, Any],
    subscription: Dict[str, Any],
):
    """Store a finished subscription research run for news display.

    The result is first converted to plain containers. Then, inside one
    user database session, the method: formats citations in the report
    text, generates a headline and topics, inserts and commits a
    ``ResearchHistory`` row, saves the sources (only after that commit,
    because they reference the research id), and finally saves the report
    through the storage abstraction.

    Returns ``None``. Exceptions are logged and suppressed; a failure while
    saving sources is logged separately and does not stop the report from
    being saved.

    Args:
        username: Owner of the database the result is written to.
        password: Password used to open the user's database session.
        research_id: Primary key for the new ``ResearchHistory`` row.
        subscription_id: Recorded in the row's ``research_meta``.
        result: Research output; ``report``/``summary``, ``sources`` and
            ``query`` are read from its serialisable form.
        subscription: Subscription data; ``query`` and ``name`` are read.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..settings.manager import SettingsManager
        import json

        # Convert result to JSON-serializable format
        def make_serializable(obj):
            """Convert non-serializable objects to dictionaries."""
            if hasattr(obj, "dict"):
                return obj.dict()
            if hasattr(obj, "__dict__"):
                return {
                    k: make_serializable(v)
                    for k, v in obj.__dict__.items()
                    if not k.startswith("_")
                }
            if isinstance(obj, (list, tuple)):
                return [make_serializable(item) for item in obj]
            if isinstance(obj, dict):
                return {k: make_serializable(v) for k, v in obj.items()}
            return obj

        serializable_result = make_serializable(result)

        with get_user_db_session(username, password) as db:
            # Get user settings to store in metadata
            settings_manager = SettingsManager(db)
            settings_snapshot = settings_manager.get_settings_snapshot()

            # Get the report content - check both 'report' and 'summary' fields
            report_content = serializable_result.get(
                "report"
            ) or serializable_result.get("summary")
            logger.debug(
                f"Report content length: {len(report_content) if report_content else 0} chars"
            )

            # Extract sources/links from the result. They get
            # persisted to research_resources AFTER history_entry
            # commits below (FK requires research_id to exist).
            sources = serializable_result.get("sources", [])

            # Then format citations in the report content
            if report_content:
                # Import citation formatter
                from ..text_optimization.citation_formatter import (
                    CitationFormatter,
                    CitationMode,
                )
                from ..config.search_config import (
                    get_setting_from_snapshot,
                )

                # Get citation format from settings
                citation_format = get_setting_from_snapshot(
                    "report.citation_format", "domain_id_hyperlinks"
                )
                mode_map = {
                    "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                    "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                    "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                    "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                    "source_tagged_hyperlinks": CitationMode.SOURCE_TAGGED_HYPERLINKS,
                    "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                }
                # Unknown format names fall back to the default mode.
                mode = mode_map.get(
                    citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                )
                formatter = CitationFormatter(mode=mode)

                # Format citations within the content
                report_content = formatter.format_document(report_content)

            if not report_content:
                # If neither field exists, use the full result as JSON
                report_content = json.dumps(serializable_result)

            # Generate headline and topics for news searches
            from ..news.utils.headline_generator import generate_headline
            from ..news.utils.topic_generator import generate_topics

            # Read "query" from the normalised result, like every other
            # field above: the raw ``result`` is only guaranteed to offer
            # ``.get`` when it already is a dict.
            query_text = serializable_result.get(
                "query", subscription.get("query", "News Update")
            )

            # Generate headline from the actual research findings
            logger.info(
                f"Generating headline for subscription {subscription_id}"
            )
            generated_headline = generate_headline(
                query=query_text,
                findings=report_content,
                max_length=200,  # Allow longer headlines for news
            )

            # Generate topics from the findings
            logger.info(
                f"Generating topics for subscription {subscription_id}"
            )
            generated_topics = generate_topics(
                query=query_text,
                findings=report_content,
                category=subscription.get("name", "News"),
                max_topics=6,
            )

            logger.info(
                f"Generated headline: {generated_headline}, topics: {generated_topics}"
            )

            # Get subscription name for metadata
            subscription_name = subscription.get("name", "")

            # Use generated headline as title, or fallback
            if generated_headline:
                title = generated_headline
            else:
                if subscription_name:
                    title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                else:
                    title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

            # Create research history entry
            history_entry = ResearchHistory(
                id=research_id,
                query=serializable_result.get("query", ""),
                mode="news_subscription",
                status="completed",
                created_at=datetime.now(UTC).isoformat(),
                completed_at=datetime.now(UTC).isoformat(),
                title=title,
                research_meta={
                    "subscription_id": subscription_id,
                    "triggered_by": "scheduler",
                    "is_news_search": True,
                    "username": username,
                    "subscription_name": subscription_name,  # Store subscription name for display
                    "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                    "generated_headline": generated_headline,  # Store generated headline for news display
                    "generated_topics": generated_topics,  # Store topics for categorization
                },
            )
            db.add(history_entry)
            db.commit()

            # Persist sources to research_resources so the assembler
            # can rebuild the Sources block at render time. Was
            # previously written INLINE into report_content via a
            # "## Sources" tail — the report_content refactor moves
            # this to structured storage matching normal research.
            if sources:
                try:
                    from ..web.services.research_sources_service import (
                        ResearchSourcesService,
                    )

                    ResearchSourcesService.save_research_sources(
                        research_id=research_id,
                        sources=sources,
                        username=username,
                    )
                except Exception:
                    # Best effort: the report is still saved below.
                    logger.exception(
                        "Failed to persist scheduler sources for "
                        "research {} — assembler will render no Sources "
                        "block for this row.",
                        research_id,
                    )

            # Store the report content using storage abstraction
            from ..storage import get_report_storage

            # Use storage to save the report (report_content already retrieved above)
            storage = get_report_storage(session=db)
            storage.save_report(
                research_id=research_id,
                content=report_content,
                username=username,
            )

            logger.info(
                f"Stored research result {research_id} for subscription {subscription_id}"
            )

    except Exception:
        logger.exception("Error storing research result")

1680 

def _run_cleanup_with_tracking(self):
    """Run the inactive-user cleanup and log how it went.

    Logs the number of users removed on success. Any exception raised
    inside the ``try`` is logged with its traceback and suppressed, so
    this method returns ``None`` either way.
    """
    try:
        removed = self._cleanup_inactive_users()
        logger.info(f"Cleanup successful: removed {removed} inactive users")
    except Exception:
        logger.exception("Cleanup job failed")

1693 

1694 def _cleanup_inactive_users(self) -> int: 

1695 """Remove users inactive for longer than retention period.""" 

1696 retention_hours = self.config.get("retention_hours", 48) 

1697 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours) 

1698 

1699 cleaned_count = 0 

1700 

1701 with self.lock: 

1702 inactive_users = [ 

1703 user_id 

1704 for user_id, session in self.user_sessions.items() 

1705 if session["last_activity"] < cutoff 

1706 ] 

1707 

1708 for user_id in inactive_users: 

1709 # Remove all scheduled jobs 

1710 for job_id in self.user_sessions[user_id][ 

1711 "scheduled_jobs" 

1712 ].copy(): 

1713 try: 

1714 self.scheduler.remove_job(job_id) 

1715 except JobLookupError: 

1716 pass 

1717 

1718 # Clear credentials and session data 

1719 self._credential_store.clear(user_id) 

1720 del self.user_sessions[user_id] 

1721 cleaned_count += 1 

1722 logger.info(f"Cleaned up inactive user {user_id}") 

1723 

1724 return cleaned_count 

1725 

1726 def _reload_config(self): 

1727 """Reload configuration from settings manager.""" 

1728 if not hasattr(self, "settings_manager") or not self.settings_manager: 

1729 return 

1730 

1731 try: 

1732 old_retention = self.config.get("retention_hours", 48) 

1733 

1734 # Reload all settings 

1735 for key in self.config: 

1736 if key == "enabled": 

1737 continue # Don't change enabled state while running 

1738 

1739 full_key = f"news.scheduler.{key}" 

1740 self.config[key] = self._get_setting(full_key, self.config[key]) 

1741 

1742 # Handle changes that need immediate action 

1743 if old_retention != self.config["retention_hours"]: 

1744 logger.info( 

1745 f"Retention period changed from {old_retention} " 

1746 f"to {self.config['retention_hours']} hours" 

1747 ) 

1748 # Trigger immediate cleanup with new retention 

1749 self.scheduler.add_job( 

1750 self._wrap_job(self._run_cleanup_with_tracking), 

1751 "date", 

1752 run_date=datetime.now(UTC) + timedelta(seconds=5), 

1753 id="immediate_cleanup_config_change", 

1754 ) 

1755 

1756 # Clear settings cache to pick up any user setting changes 

1757 self.invalidate_all_settings_cache() 

1758 

1759 except Exception: 

1760 logger.exception("Error reloading configuration") 

1761 

1762 def get_status(self) -> Dict[str, Any]: 

1763 """Get scheduler status information.""" 

1764 with self.lock: 

1765 active_users = len(self.user_sessions) 

1766 total_jobs = sum( 

1767 len(session["scheduled_jobs"]) 

1768 for session in self.user_sessions.values() 

1769 ) 

1770 

1771 # Get next run time for cleanup job 

1772 next_cleanup = None 

1773 if self.is_running: 

1774 job = self.scheduler.get_job("cleanup_inactive_users") 

1775 if job: 1775 ↛ 1778line 1775 didn't jump to line 1778 because the condition on line 1775 was always true

1776 next_cleanup = job.next_run_time 

1777 

1778 return { 

1779 "is_running": self.is_running, 

1780 "config": self.config, 

1781 "active_users": active_users, 

1782 "total_scheduled_jobs": total_jobs, 

1783 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None, 

1784 "memory_usage": self._estimate_memory_usage(), 

1785 } 

1786 

def _estimate_memory_usage(self) -> int:
    """Return a rough size estimate for the tracked user sessions.

    This is a flat per-user allowance multiplied by the number of entries
    in ``self.user_sessions``; nothing is actually measured.
    """
    # Rough per-user allowance: username + password + metadata
    username_part, password_part, metadata_part = 50, 100, 200
    tracked_users = len(self.user_sessions)
    return tracked_users * (username_part + password_part + metadata_part)

1793 

1794 def get_user_sessions_summary(self) -> List[Dict[str, Any]]: 

1795 """Get summary of active user sessions (without passwords).""" 

1796 with self.lock: 

1797 summary = [] 

1798 for user_id, session in self.user_sessions.items(): 

1799 summary.append( 

1800 { 

1801 "user_id": user_id, 

1802 "last_activity": session["last_activity"].isoformat(), 

1803 "scheduled_jobs": len(session["scheduled_jobs"]), 

1804 "time_since_activity": str( 

1805 datetime.now(UTC) - session["last_activity"] 

1806 ), 

1807 } 

1808 ) 

1809 return summary 

1810 

1811 

# Lazily created, process-wide scheduler instance
_scheduler_instance = None


def get_background_job_scheduler() -> BackgroundJobScheduler:
    """Return the shared scheduler, creating it on the first call."""
    global _scheduler_instance
    if _scheduler_instance is not None:
        return _scheduler_instance

    # First call: build the instance and keep it for later callers.
    _scheduler_instance = BackgroundJobScheduler()
    return _scheduler_instance