Coverage for src / local_deep_research / news / subscription_manager / scheduler.py: 97%

666 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Activity-based news subscription scheduler for per-user encrypted databases. 

3Tracks user activity and temporarily stores credentials for automatic updates. 

4""" 

5 

6import random 

7import threading 

8from dataclasses import dataclass 

9from datetime import datetime, timedelta, UTC 

10from functools import wraps 

11from typing import Any, Callable, Dict, List 

12 

13from cachetools import TTLCache 

14from loguru import logger 

15from ...settings.logger import log_settings 

16from ...settings.manager import SnapshotSettingsContext 

17 

18from apscheduler.schedulers.background import BackgroundScheduler 

19from apscheduler.jobstores.base import JobLookupError 

20from ...constants import ResearchStatus 

21from ...database.credential_store_base import CredentialStoreBase 

22from ...database.thread_local_session import thread_cleanup 

23 

24# RAG indexing imports 

25from ...research_library.services.library_rag_service import LibraryRAGService 

26from ...database.library_init import get_default_library_id 

27from ...database.models.library import Document, DocumentCollection 

28 

29 

# APScheduler is a hard dependency of this package, so scheduler support is
# unconditionally available; the flag is kept for callers that check it.
SCHEDULER_AVAILABLE = True  # Always available since it's a required dependency

31 

32 

class SchedulerCredentialStore(CredentialStoreBase):
    """TTL-backed password store for the news scheduler.

    Background jobs need each user's password to open their encrypted
    per-user database; entries expire automatically once the TTL elapses.
    """

    def __init__(self, ttl_hours: int = 48):
        # The base class expects its TTL in seconds.
        super().__init__(ttl_hours * 3600)

    def store(self, username: str, password: str) -> None:
        """Remember *password* for *username* until the TTL expires."""
        payload = {"username": username, "password": password}
        self._store_credentials(username, payload)

    def retrieve(self, username: str) -> str | None:
        """Return the stored password, or None when expired or missing."""
        creds = self._retrieve_credentials(username, remove=False)
        if not creds:
            return None
        return creds[1]

    def clear(self, username: str) -> None:
        """Forget any password stored for *username*."""
        self.clear_entry(username)

57 

58 

@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """
    Immutable snapshot of a user's document-scheduler settings.

    Frozen so an instance can be handed to background threads and read
    without any locking.
    """

    enabled: bool = True
    interval_seconds: int = 1800
    download_pdfs: bool = False
    extract_text: bool = True
    generate_rag: bool = False
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Build a snapshot populated entirely with the default values."""
        return cls()

79 

80 

class NewsScheduler:
    """
    Singleton scheduler that manages news subscriptions for active users.

    This scheduler:
    - Monitors user activity through database access
    - Temporarily stores user credentials in memory
    - Automatically schedules subscription checks
    - Cleans up inactive users after configurable period
    """

    # Class-level singleton state: the shared instance, and the lock that
    # guards its creation in __new__.
    _instance = None
    _lock = threading.Lock()

94 

    def __new__(cls):
        """Ensure singleton instance."""
        # Double-checked locking: the unlocked check skips lock contention
        # once the instance exists; the locked re-check guards against two
        # threads that both saw _instance as None.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

102 

    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton)."""
        # Skip if already initialized — __init__ runs on every
        # NewsScheduler() call even though __new__ returns the same object.
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Credential store with TTL-based expiration
        self._credential_store = SchedulerCredentialStore(ttl_hours=48)

        # Scheduler instance
        self.scheduler = BackgroundScheduler()

        # Configuration (will be loaded from settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False
        self._app = None  # Flask app reference for background job contexts

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        self._initialized = True
        logger.info("News scheduler initialized")

133 

134 def _load_default_config(self) -> Dict[str, Any]: 

135 """Load default configuration (will be overridden by settings manager).""" 

136 return { 

137 "enabled": True, 

138 "retention_hours": 48, 

139 "cleanup_interval_hours": 1, 

140 "max_jitter_seconds": 300, 

141 "max_concurrent_jobs": 10, 

142 "subscription_batch_size": 5, 

143 "activity_check_interval_minutes": 5, 

144 } 

145 

146 def initialize_with_settings(self, settings_manager): 

147 """Initialize configuration from settings manager.""" 

148 try: 

149 # Load all scheduler settings 

150 self.settings_manager = settings_manager 

151 self.config = { 

152 "enabled": self._get_setting("news.scheduler.enabled", True), 

153 "retention_hours": self._get_setting( 

154 "news.scheduler.retention_hours", 48 

155 ), 

156 "cleanup_interval_hours": self._get_setting( 

157 "news.scheduler.cleanup_interval_hours", 1 

158 ), 

159 "max_jitter_seconds": self._get_setting( 

160 "news.scheduler.max_jitter_seconds", 300 

161 ), 

162 "max_concurrent_jobs": self._get_setting( 

163 "news.scheduler.max_concurrent_jobs", 10 

164 ), 

165 "subscription_batch_size": self._get_setting( 

166 "news.scheduler.batch_size", 5 

167 ), 

168 "activity_check_interval_minutes": self._get_setting( 

169 "news.scheduler.activity_check_interval", 5 

170 ), 

171 } 

172 log_settings(self.config, "Scheduler configuration loaded") 

173 except Exception: 

174 logger.exception("Error loading scheduler settings") 

175 # Keep default config 

176 

177 def _get_setting(self, key: str, default: Any) -> Any: 

178 """Get setting with fallback to default.""" 

179 if hasattr(self, "settings_manager") and self.settings_manager: 

180 return self.settings_manager.get_setting(key, default=default) 

181 return default 

182 

183 def set_app(self, app) -> None: 

184 """Store a reference to the Flask app for creating app contexts in background jobs.""" 

185 self._app = app 

186 

187 def _wrap_job(self, func: Callable) -> Callable: 

188 """Wrap a scheduler job function so it runs inside a Flask app context. 

189 

190 APScheduler runs jobs in a thread pool without Flask context. 

191 This wrapper pushes an app context before the job runs and pops it after. 

192 """ 

193 

194 @wraps(func) 

195 def wrapper(*args, **kwargs): 

196 if self._app is not None: 196 ↛ 200line 196 didn't jump to line 200 because the condition on line 196 was always true

197 with self._app.app_context(): 

198 return func(*args, **kwargs) 

199 else: 

200 logger.warning( 

201 f"No Flask app set on scheduler; running {func.__name__} without app context" 

202 ) 

203 return func(*args, **kwargs) 

204 

205 return wrapper 

206 

    def _get_document_scheduler_settings(
        self, username: str, force_refresh: bool = False
    ) -> DocumentSchedulerSettings:
        """
        Get document scheduler settings for a user with TTL caching.

        This is the single source of truth for document scheduler settings.
        Settings are cached for 5 minutes by default to reduce database queries.

        Args:
            username: User to get settings for
            force_refresh: If True, bypass cache and fetch fresh settings

        Returns:
            DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
        """
        # Fast path: check cache without modifying it
        if not force_refresh:
            with self._settings_cache_lock:
                cached = self._settings_cache.get(username)
                if cached is not None:
                    logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                    cached_settings: DocumentSchedulerSettings = cached
                    return cached_settings

        # Cache miss - need to fetch from database
        logger.debug(
            f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
        )

        # Get password from session
        # NOTE(review): user_sessions is read here without holding self.lock —
        # presumably relying on atomic dict reads; confirm this is intended.
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[SETTINGS_CACHE] No session info for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        # Fetch settings from database (outside lock to avoid blocking)
        try:
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                sm = SettingsManager(db)

                # Build the immutable snapshot from the user's stored values,
                # falling back to the documented defaults per key.
                settings = DocumentSchedulerSettings(
                    enabled=sm.get_setting("document_scheduler.enabled", True),
                    interval_seconds=sm.get_setting(
                        "document_scheduler.interval_seconds", 1800
                    ),
                    download_pdfs=sm.get_setting(
                        "document_scheduler.download_pdfs", False
                    ),
                    extract_text=sm.get_setting(
                        "document_scheduler.extract_text", True
                    ),
                    generate_rag=sm.get_setting(
                        "document_scheduler.generate_rag", False
                    ),
                    last_run=sm.get_setting("document_scheduler.last_run", ""),
                )

            # Store in cache
            with self._settings_cache_lock:
                self._settings_cache[username] = settings
                logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")

            return settings

        except Exception:
            logger.exception(
                f"[SETTINGS_CACHE] Error fetching settings for {username}"
            )
            return DocumentSchedulerSettings.defaults()

289 

290 def invalidate_user_settings_cache(self, username: str) -> bool: 

291 """ 

292 Invalidate cached settings for a specific user. 

293 

294 Call this when user settings change or user logs out. 

295 

296 Args: 

297 username: User whose cache to invalidate 

298 

299 Returns: 

300 True if cache entry was removed, False if not found 

301 """ 

302 with self._settings_cache_lock: 

303 if username in self._settings_cache: 

304 del self._settings_cache[username] 

305 logger.debug( 

306 f"[SETTINGS_CACHE] Invalidated cache for {username}" 

307 ) 

308 return True 

309 return False 

310 

311 def invalidate_all_settings_cache(self) -> int: 

312 """ 

313 Invalidate all cached settings. 

314 

315 Call this when doing bulk settings updates or during config reload. 

316 

317 Returns: 

318 Number of cache entries cleared 

319 """ 

320 with self._settings_cache_lock: 

321 count = len(self._settings_cache) 

322 self._settings_cache.clear() 

323 logger.info( 

324 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)" 

325 ) 

326 return count 

327 

    def start(self):
        """Start the scheduler and register its maintenance jobs.

        Raises:
            RuntimeError: If set_app() was not called first — background
                jobs need the Flask app to create request-free app contexts.
        """
        # Honour the global enable/disable switch loaded from settings.
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        if self._app is None:
            raise RuntimeError(
                "NewsScheduler.set_app() must be called before start()"
            )

        # Schedule cleanup job
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        self.scheduler.add_job(
            self._wrap_job(self._reload_config),
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule initial cleanup after a delay (one-shot "date" trigger)
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")

375 

    def stop(self):
        """Stop the scheduler."""
        if self.is_running:
            # Wait for in-flight jobs to finish before tearing down state.
            self.scheduler.shutdown(wait=True)
            self.is_running = False

            # Clear all user sessions and credentials
            with self.lock:
                for username in self.user_sessions:
                    self._credential_store.clear(username)
                self.user_sessions.clear()

            logger.info("News scheduler stopped")

389 

    def update_user_info(self, username: str, password: str):
        """
        Update user info in scheduler. Called on every database interaction.

        Refreshes the TTL credential store and the user's activity
        timestamp; new users get their subscriptions scheduled, existing
        users get theirs rescheduled.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        with self.lock:
            # Store password in credential store (inside lock to prevent
            # race where concurrent calls leave mismatched credentials)
            self._credential_store.store(username, password)

            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)

447 

    def unregister_user(self, username: str):
        """
        Unregister a user and clean up their scheduled jobs.
        Called when user logs out.
        """
        with self.lock:
            if username in self.user_sessions:
                logger.info(f"Unregistering user {username}")

                # Remove all scheduled jobs for this user
                session_info = self.user_sessions[username]
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        # Job already fired or was never added; nothing to do.
                        pass

                # Remove user session and clear credentials atomically
                del self.user_sessions[username]
                self._credential_store.clear(username)

        # Invalidate settings cache for this user (outside lock)
        self.invalidate_user_settings_cache(username)
        logger.info(f"User {username} unregistered successfully")

472 

    def _schedule_user_subscriptions(self, username: str):
        """Schedule all active subscriptions for a user.

        Loads the user's active subscriptions from their encrypted
        database, drops any previously scheduled jobs, and registers one
        APScheduler job per subscription: an interval trigger for
        hourly-or-faster refreshes, a one-shot date trigger otherwise.
        Always finishes by (re)scheduling the user's document-processing
        job, even if subscription scheduling failed.
        """
        logger.info(f"_schedule_user_subscriptions called for {username}")
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(f"No session info found for {username}")
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.warning(
                    f"Credentials expired for {username}, skipping subscription scheduling"
                )
                return
            logger.debug(f"Got password for {username}: present")

            # Get user's subscriptions from their encrypted database
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                subscriptions = (
                    db.query(NewsSubscription).filter_by(is_active=True).all()
                )
                logger.debug(
                    f"Query executed, found {len(subscriptions)} results"
                )

                # Log details of each subscription
                for sub in subscriptions:
                    logger.debug(
                        f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                    )

                logger.info(
                    f"Found {len(subscriptions)} active subscriptions for {username}"
                )

                # Clear old jobs for this user
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                        session_info["scheduled_jobs"].remove(job_id)
                    except JobLookupError:
                        # NOTE(review): when remove_job raises, the id stays
                        # in scheduled_jobs; consider discarding it here too.
                        pass

                # Schedule each subscription with jitter
                for sub in subscriptions:
                    job_id = f"{username}_{sub.id}"

                    # Calculate jitter
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    max_jitter = int(self.config.get("max_jitter_seconds", 300))
                    jitter = random.randint(0, max_jitter)

                    # Determine trigger based on frequency
                    refresh_minutes = sub.refresh_interval_minutes

                    if refresh_minutes <= 60:  # 60 minutes or less
                        # For hourly or more frequent, use interval trigger
                        trigger = "interval"
                        trigger_args = {
                            "minutes": refresh_minutes,
                            "jitter": jitter,
                            "start_date": datetime.now(UTC),  # Start immediately
                        }
                    else:
                        # For less frequent, calculate next run time
                        now = datetime.now(UTC)
                        if sub.next_refresh:
                            # Ensure timezone-aware for comparison with now (UTC)
                            next_refresh_aware = sub.next_refresh
                            if next_refresh_aware.tzinfo is None:
                                logger.warning(
                                    f"Subscription {sub.id} has naive (non-tz-aware) "
                                    f"next_refresh datetime, assuming UTC"
                                )
                                next_refresh_aware = next_refresh_aware.replace(
                                    tzinfo=UTC
                                )
                            if next_refresh_aware <= now:
                                # Subscription is overdue - run it immediately with small jitter
                                logger.info(
                                    f"Subscription {sub.id} is overdue, scheduling immediate run"
                                )
                                next_run = now + timedelta(seconds=jitter)
                            else:
                                next_run = next_refresh_aware
                        else:
                            # No stored next_refresh: first run after one
                            # full interval plus jitter.
                            next_run = now + timedelta(
                                minutes=refresh_minutes, seconds=jitter
                            )

                        trigger = "date"
                        trigger_args = {"run_date": next_run}

                    # Add the job
                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, sub.id],
                        trigger=trigger,
                        id=job_id,
                        name=f"Check {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                        **trigger_args,
                    )

                    session_info["scheduled_jobs"].add(job_id)
                    logger.info(f"Scheduled job {job_id} with {trigger} trigger")

        except Exception:
            logger.exception(f"Error scheduling subscriptions for {username}")

        # Add document processing for this user
        self._schedule_document_processing(username)

589 

    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user.

        Registers (replacing any previous one) a recurring APScheduler job
        that runs _process_user_documents at the interval from the user's
        cached document-scheduler settings. No-op when the user has no
        session or has the document scheduler disabled.
        """
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            logger.debug(
                f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            if not settings.enabled:
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                )
                return

            logger.info(
                f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
                f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
                f"text={settings.extract_text}, rag={settings.generate_rag}"
            )

            # Schedule document processing job
            job_id = f"{username}_document_processing"
            logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

            # Remove existing document job if any
            try:
                self.scheduler.remove_job(job_id)
                session_info["scheduled_jobs"].discard(job_id)
                logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
            except JobLookupError:
                logger.debug(
                    f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                )
                pass  # Job doesn't exist, that's fine

            # Add new document processing job
            logger.debug(
                f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
            )
            self.scheduler.add_job(
                func=self._wrap_job(self._process_user_documents),
                args=[username],
                trigger="interval",
                seconds=settings.interval_seconds,
                id=job_id,
                name=f"Process Documents for {username}",
                jitter=30,  # Add small jitter to prevent multiple users from processing simultaneously
                max_instances=1,  # Prevent overlapping document processing for same user
                replace_existing=True,
            )

            session_info["scheduled_jobs"].add(job_id)
            logger.info(
                f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
            )
            logger.debug(
                f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                )

        except Exception:
            logger.exception(
                f"Error scheduling document processing for {username}"
            )

683 

684 @thread_cleanup 

685 def _process_user_documents(self, username: str): 

686 """Process documents for a user.""" 

687 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}") 

688 start_time = datetime.now(UTC) 

689 

690 try: 

691 session_info = self.user_sessions.get(username) 

692 if not session_info: 

693 logger.warning( 

694 f"[DOC_SCHEDULER] No session info found for user {username}" 

695 ) 

696 return 

697 

698 password = self._credential_store.retrieve(username) 

699 if not password: 

700 logger.warning( 

701 f"[DOC_SCHEDULER] Credentials expired for user {username}" 

702 ) 

703 return 

704 logger.debug( 

705 f"[DOC_SCHEDULER] Starting document processing for {username}" 

706 ) 

707 

708 # Get user's document scheduler settings (cached) 

709 settings = self._get_document_scheduler_settings(username) 

710 

711 logger.info( 

712 f"[DOC_SCHEDULER] Processing settings for {username}: " 

713 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}" 

714 ) 

715 

716 if not any( 

717 [ 

718 settings.download_pdfs, 

719 settings.extract_text, 

720 settings.generate_rag, 

721 ] 

722 ): 

723 logger.info( 

724 f"[DOC_SCHEDULER] No processing options enabled for user {username}" 

725 ) 

726 return 

727 

728 # Parse last_run from cached settings 

729 last_run = ( 

730 datetime.fromisoformat(settings.last_run) 

731 if settings.last_run 

732 else None 

733 ) 

734 

735 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}") 

736 

737 # Need database session for queries and updates 

738 from ...database.session_context import get_user_db_session 

739 from ...database.models.research import ResearchHistory 

740 from ...settings.manager import SettingsManager 

741 

742 with get_user_db_session(username, password) as db: 

743 settings_manager = SettingsManager(db) 

744 

745 # Query for completed research since last run 

746 logger.debug( 

747 f"[DOC_SCHEDULER] Querying for completed research since {last_run}" 

748 ) 

749 query = db.query(ResearchHistory).filter( 

750 ResearchHistory.status == ResearchStatus.COMPLETED, 

751 ResearchHistory.completed_at.is_not( 

752 None 

753 ), # Ensure completed_at is not null 

754 ) 

755 

756 if last_run: 

757 query = query.filter( 

758 ResearchHistory.completed_at > last_run 

759 ) 

760 

761 # Limit to recent research to prevent overwhelming 

762 query = query.order_by( 

763 ResearchHistory.completed_at.desc() 

764 ).limit(20) 

765 

766 research_sessions = query.all() 

767 logger.debug( 

768 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions" 

769 ) 

770 

771 if not research_sessions: 

772 logger.info( 

773 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}" 

774 ) 

775 return 

776 

777 logger.info( 

778 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}" 

779 ) 

780 

781 # Log details of each research session 

782 for i, research in enumerate( 

783 research_sessions[:5] 

784 ): # Log first 5 details 

785 title_safe = ( 

786 (research.title[:50] + "...") 

787 if research.title 

788 else "No title" 

789 ) 

790 completed_safe = ( 

791 research.completed_at 

792 if research.completed_at 

793 else "No completion time" 

794 ) 

795 logger.debug( 

796 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}" 

797 ) 

798 

799 # Handle completed_at which might be a string or datetime 

800 completed_at_obj = None 

801 if research.completed_at: 

802 if isinstance(research.completed_at, str): 

803 try: 

804 completed_at_obj = datetime.fromisoformat( 

805 research.completed_at.replace("Z", "+00:00") 

806 ) 

807 except (ValueError, TypeError, AttributeError): 

808 completed_at_obj = None 

809 else: 

810 completed_at_obj = research.completed_at 

811 

812 logger.debug( 

813 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}" 

814 ) 

815 logger.debug( 

816 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}" 

817 ) 

818 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}") 

819 logger.debug( 

820 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}" 

821 ) 

822 

823 processed_count = 0 

824 for research in research_sessions: 

825 try: 

826 logger.info( 

827 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}" 

828 ) 

829 

830 # Call actual processing APIs 

831 if settings.download_pdfs: 

832 logger.info( 

833 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}" 

834 ) 

835 try: 

836 # Use the DownloadService to queue PDF downloads 

837 from ...research_library.services.download_service import ( 

838 DownloadService, 

839 ) 

840 

841 with DownloadService( 

842 username=username, password=password 

843 ) as download_service: 

844 queued_count = download_service.queue_research_downloads( 

845 research.id 

846 ) 

847 logger.info( 

848 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}" 

849 ) 

850 except Exception: 

851 logger.exception( 

852 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}" 

853 ) 

854 

855 if settings.extract_text: 

856 logger.info( 

857 f"[DOC_SCHEDULER] Extracting text for research {research.id}" 

858 ) 

859 try: 

860 # Use the DownloadService to extract text for all resources 

861 from ...research_library.services.download_service import ( 

862 DownloadService, 

863 ) 

864 from ...database.models.research import ( 

865 ResearchResource, 

866 ) 

867 

868 from ...research_library.utils import ( 

869 is_downloadable_url, 

870 ) 

871 

872 with DownloadService( 

873 username=username, password=password 

874 ) as download_service: 

875 # Get all resources for this research (reuse existing db session) 

876 all_resources = ( 

877 db.query(ResearchResource) 

878 .filter_by(research_id=research.id) 

879 .all() 

880 ) 

881 # Filter: only process downloadable resources (academic/PDF) 

882 resources = [ 

883 r 

884 for r in all_resources 

885 if is_downloadable_url(r.url) 

886 ] 

887 processed_count = 0 

888 for resource in resources: 

889 # We need to pass the password to the download service 

890 # The DownloadService creates its own database sessions, so we need to ensure password is available 

891 try: 

892 success, error = ( 

893 download_service.download_as_text( 

894 resource.id 

895 ) 

896 ) 

897 if success: 

898 processed_count += 1 

899 logger.info( 

900 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}" 

901 ) 

902 else: 

903 logger.warning( 

904 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}" 

905 ) 

906 except Exception as resource_error: 

907 logger.exception( 

908 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}" 

909 ) 

910 logger.info( 

911 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed" 

912 ) 

913 except Exception: 

914 logger.exception( 

915 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}" 

916 ) 

917 

918 if settings.generate_rag: 

919 logger.info( 

920 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}" 

921 ) 

922 try: 

923 # Get embedding settings from user configuration 

924 embedding_model = settings_manager.get_setting( 

925 "local_search_embedding_model", 

926 "all-MiniLM-L6-v2", 

927 ) 

928 embedding_provider = ( 

929 settings_manager.get_setting( 

930 "local_search_embedding_provider", 

931 "sentence_transformers", 

932 ) 

933 ) 

934 chunk_size = int( 

935 settings_manager.get_setting( 

936 "local_search_chunk_size", 1000 

937 ) 

938 ) 

939 chunk_overlap = int( 

940 settings_manager.get_setting( 

941 "local_search_chunk_overlap", 200 

942 ) 

943 ) 

944 

945 # Initialize RAG service with user's embedding configuration 

946 with LibraryRAGService( 

947 username=username, 

948 embedding_model=embedding_model, 

949 embedding_provider=embedding_provider, 

950 chunk_size=chunk_size, 

951 chunk_overlap=chunk_overlap, 

952 db_password=password, 

953 ) as rag_service: 

954 # Get default Library collection ID 

955 library_collection_id = ( 

956 get_default_library_id( 

957 username, password 

958 ) 

959 ) 

960 

961 # Query for unindexed documents from this research session 

962 documents_to_index = ( 

963 db.query(Document.id, Document.title) 

964 .outerjoin( 

965 DocumentCollection, 

966 ( 

967 DocumentCollection.document_id 

968 == Document.id 

969 ) 

970 & ( 

971 DocumentCollection.collection_id 

972 == library_collection_id 

973 ), 

974 ) 

975 .filter( 

976 Document.research_id == research.id, 

977 Document.text_content.isnot(None), 

978 ( 

979 DocumentCollection.indexed.is_( 

980 False 

981 ) 

982 | DocumentCollection.id.is_( 

983 None 

984 ) 

985 ), 

986 ) 

987 .all() 

988 ) 

989 

990 if not documents_to_index: 

991 logger.info( 

992 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}" 

993 ) 

994 else: 

995 # Index each document 

996 indexed_count = 0 

997 for ( 

998 doc_id, 

999 doc_title, 

1000 ) in documents_to_index: 

1001 try: 

1002 result = rag_service.index_document( 

1003 document_id=doc_id, 

1004 collection_id=library_collection_id, 

1005 force_reindex=False, 

1006 ) 

1007 if ( 1007 ↛ 997line 1007 didn't jump to line 997 because the condition on line 1007 was always true

1008 result["status"] 

1009 == "success" 

1010 ): 

1011 indexed_count += 1 

1012 logger.info( 

1013 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) " 

1014 f"with {result.get('chunk_count', 0)} chunks" 

1015 ) 

1016 except Exception as doc_error: 

1017 logger.exception( 

1018 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}" 

1019 ) 

1020 

1021 logger.info( 

1022 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: " 

1023 f"{indexed_count}/{len(documents_to_index)} documents indexed" 

1024 ) 

1025 except Exception: 

1026 logger.exception( 

1027 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}" 

1028 ) 

1029 

1030 processed_count += 1 

1031 logger.debug( 

1032 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}" 

1033 ) 

1034 

1035 except Exception: 

1036 logger.exception( 

1037 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}" 

1038 ) 

1039 

1040 # Update last run time in user's settings 

1041 current_time = datetime.now(UTC).isoformat() 

1042 settings_manager.set_setting( 

1043 "document_scheduler.last_run", current_time, commit=True 

1044 ) 

1045 logger.debug( 

1046 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}" 

1047 ) 

1048 

1049 end_time = datetime.now(UTC) 

1050 duration = (end_time - start_time).total_seconds() 

1051 logger.info( 

1052 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s" 

1053 ) 

1054 

1055 except Exception: 

1056 logger.exception( 

1057 f"[DOC_SCHEDULER] Error processing documents for user {username}" 

1058 ) 

1059 

1060 def get_document_scheduler_status(self, username: str) -> Dict[str, Any]: 

1061 """Get document scheduler status for a specific user.""" 

1062 try: 

1063 session_info = self.user_sessions.get(username) 

1064 if not session_info: 

1065 return { 

1066 "enabled": False, 

1067 "message": "User not found in scheduler", 

1068 } 

1069 

1070 # Get user's document scheduler settings (cached) 

1071 settings = self._get_document_scheduler_settings(username) 

1072 

1073 # Check if user has document processing job 

1074 job_id = f"{username}_document_processing" 

1075 has_job = job_id in session_info.get("scheduled_jobs", set()) 

1076 

1077 return { 

1078 "enabled": settings.enabled, 

1079 "interval_seconds": settings.interval_seconds, 

1080 "processing_options": { 

1081 "download_pdfs": settings.download_pdfs, 

1082 "extract_text": settings.extract_text, 

1083 "generate_rag": settings.generate_rag, 

1084 }, 

1085 "last_run": settings.last_run, 

1086 "has_scheduled_job": has_job, 

1087 "user_active": username in self.user_sessions, 

1088 } 

1089 

1090 except Exception as e: 

1091 logger.exception( 

1092 f"Error getting document scheduler status for user {username}" 

1093 ) 

1094 return { 

1095 "enabled": False, 

1096 "message": f"Failed to retrieve scheduler status: {type(e).__name__}", 

1097 } 

1098 

1099 def trigger_document_processing(self, username: str) -> bool: 

1100 """Trigger immediate document processing for a user.""" 

1101 logger.info( 

1102 f"[DOC_SCHEDULER] Manual trigger requested for user {username}" 

1103 ) 

1104 try: 

1105 session_info = self.user_sessions.get(username) 

1106 if not session_info: 

1107 logger.warning( 

1108 f"[DOC_SCHEDULER] User {username} not found in scheduler" 

1109 ) 

1110 logger.debug( 

1111 f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}" 

1112 ) 

1113 return False 

1114 

1115 if not self.is_running: 

1116 logger.warning( 

1117 f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}" 

1118 ) 

1119 return False 

1120 

1121 # Trigger immediate processing 

1122 job_id = f"{username}_document_processing_manual" 

1123 logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}") 

1124 

1125 self.scheduler.add_job( 

1126 func=self._wrap_job(self._process_user_documents), 

1127 args=[username], 

1128 trigger="date", 

1129 run_date=datetime.now(UTC) + timedelta(seconds=1), 

1130 id=job_id, 

1131 name=f"Manual Document Processing for {username}", 

1132 replace_existing=True, 

1133 ) 

1134 

1135 # Verify job was added 

1136 job = self.scheduler.get_job(job_id) 

1137 if job: 

1138 logger.info( 

1139 f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}" 

1140 ) 

1141 else: 

1142 logger.error( 

1143 f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!" 

1144 ) 

1145 return False 

1146 

1147 return True 

1148 

1149 except Exception: 

1150 logger.exception( 

1151 f"[DOC_SCHEDULER] Error triggering document processing for user {username}" 

1152 ) 

1153 return False 

1154 

1155 @thread_cleanup 

1156 def _check_user_overdue_subscriptions(self, username: str): 

1157 """Check and immediately run any overdue subscriptions for a user.""" 

1158 try: 

1159 session_info = self.user_sessions.get(username) 

1160 if not session_info: 

1161 return 

1162 

1163 password = self._credential_store.retrieve(username) 

1164 if not password: 

1165 return 

1166 

1167 # Get user's overdue subscriptions 

1168 from ...database.session_context import get_user_db_session 

1169 from ...database.models.news import NewsSubscription 

1170 from datetime import timezone 

1171 

1172 with get_user_db_session(username, password) as db: 

1173 now = datetime.now(timezone.utc) 

1174 overdue_subs = ( 

1175 db.query(NewsSubscription) 

1176 .filter( 

1177 NewsSubscription.is_active.is_(True), 

1178 NewsSubscription.next_refresh.is_not(None), 

1179 NewsSubscription.next_refresh <= now, 

1180 ) 

1181 .all() 

1182 ) 

1183 

1184 if overdue_subs: 

1185 logger.info( 

1186 f"Found {len(overdue_subs)} overdue subscriptions for {username}" 

1187 ) 

1188 

1189 for sub in overdue_subs: 

1190 # Run immediately with small random delay 

1191 # Security: random delay to stagger overdue jobs, not security-sensitive 

1192 delay_seconds = random.randint(1, 30) 

1193 job_id = ( 

1194 f"overdue_{username}_{sub.id}_{int(now.timestamp())}" 

1195 ) 

1196 

1197 self.scheduler.add_job( 

1198 func=self._wrap_job(self._check_subscription), 

1199 args=[username, sub.id], 

1200 trigger="date", 

1201 run_date=now + timedelta(seconds=delay_seconds), 

1202 id=job_id, 

1203 name=f"Overdue: {sub.name or sub.query_or_topic[:30]}", 

1204 replace_existing=True, 

1205 ) 

1206 

1207 logger.info( 

1208 f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds" 

1209 ) 

1210 

1211 except Exception: 

1212 logger.exception( 

1213 f"Error checking overdue subscriptions for {username}" 

1214 ) 

1215 

    @thread_cleanup
    def _check_subscription(self, username: str, subscription_id: int):
        """Check and refresh a single subscription.

        Loads the subscription from the user's encrypted database, runs its
        research query, and — for one-shot ("date") jobs — reschedules the
        next run with a random jitter.

        Args:
            username: Owner of the subscription.
            subscription_id: Primary key of the ``NewsSubscription`` row.
        """
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    # Job already gone; nothing to cancel.
                    pass
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.warning(
                    f"Credentials expired for {username}, skipping subscription check"
                )
                return

            # Get subscription details
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                # (the literal placeholder "YYYY-MM-DD" in the stored query is
                # substituted with today's date in the user's locale).
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from ..core.utils import get_local_date_string
                    from ...settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times BEFORE running the research,
                # so a crash mid-run does not cause an immediate re-trigger.
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                # Plain-dict snapshot of the ORM row, safe to pass across calls.
                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously using requests with proper auth
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception:
            logger.exception(f"Error checking subscription {subscription_id}")

1312 

    @thread_cleanup
    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ):
        """Trigger research for a subscription using programmatic API.

        Runs ``quick_summary`` synchronously in this thread with a settings
        snapshot taken from the user's database, then persists the result via
        :meth:`_store_research_result`.

        Args:
            username: Owner of the subscription.
            subscription: Plain-dict snapshot of the subscription row
                (keys: id, name, query, original_query, model_provider,
                model, search_strategy, search_engine).
        """
        from ...config.thread_settings import set_settings_context

        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.error(f"Credentials expired for user {username}")
                return

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Use the search engine from the subscription if specified
                search_engine = subscription.get("search_engine")

                if search_engine:
                    # Override the snapshot's search tool with the
                    # subscription-specific engine.
                    settings_snapshot["search.tool"] = {
                        "value": search_engine,
                        "ui_element": "select",
                    }
                    logger.info(
                        f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                    )
                else:
                    # Use the user's default search tool from their settings
                    default_search_tool = settings_snapshot.get(
                        "search.tool", "auto"
                    )
                    logger.info(
                        f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                    )

                logger.debug(
                    f"Settings snapshot has {len(settings_snapshot)} settings"
                )
                # Log a few key settings to verify they're present
                logger.debug(
                    f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
                )

                # Set up research parameters
                query = subscription["query"]

                # Build metadata for news search
                metadata = {
                    "is_news_search": True,
                    "search_type": "news_analysis",
                    "display_in": "news_feed",
                    "subscription_id": subscription["id"],
                    "triggered_by": "scheduler",
                    "subscription_name": subscription["name"],
                    "title": subscription["name"] if subscription["name"] else None,
                    "scheduled_at": datetime.now(UTC).isoformat(),
                    "original_query": subscription["original_query"],
                    "user_id": username,
                }

                # Use programmatic API with settings context
                from ...api.research_functions import quick_summary

                # Create and set settings context for this thread
                # (quick_summary and its helpers read settings from the
                # thread-local context set here).
                settings_context = SnapshotSettingsContext(settings_snapshot)
                set_settings_context(settings_context)

                # Get search strategy from subscription data (for the API call)
                search_strategy = subscription.get(
                    "search_strategy", "news_aggregation"
                )

                # Call quick_summary with appropriate parameters
                result = quick_summary(
                    query=query,
                    research_id=research_id,
                    username=username,
                    user_password=password,
                    settings_snapshot=settings_snapshot,
                    search_strategy=search_strategy,
                    model_name=subscription.get("model"),
                    provider=subscription.get("model_provider"),
                    iterations=1,  # Single iteration for news
                    metadata=metadata,
                    search_original_query=False,  # Don't send long subscription prompts to search engines
                )

                logger.info(
                    f"Completed research {research_id} for subscription {subscription['id']}"
                )

                # Store the research result in the database
                self._store_research_result(
                    username,
                    password,
                    research_id,
                    subscription["id"],
                    result,
                    subscription,
                )

        except Exception:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}"
            )

1439 

    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ):
        """Store research result in database for news display.

        Pipeline: serialize the raw result, assemble the report text
        (append a Sources section, format citations), generate a headline
        and topics, write a ``ResearchHistory`` row, and finally persist
        the report body through the storage abstraction.

        Args:
            username: Owner of the encrypted database to write into.
            password: The user's database password.
            research_id: UUID assigned to this research run.
            subscription_id: Primary key of the originating subscription.
            result: Raw result from ``quick_summary`` — assumed to expose
                'report' or 'summary', and optionally 'sources'/'query'
                keys (TODO confirm against quick_summary's contract).
            subscription: Plain-dict snapshot of the subscription row.
        """
        try:
            from ...database.session_context import get_user_db_session
            from ...database.models import ResearchHistory
            from ...settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries."""
                # Prefer an explicit dict() method (e.g. pydantic-style objects).
                if hasattr(obj, "dict"):
                    return obj.dict()
                # Fall back to the instance __dict__, skipping private attrs.
                if hasattr(obj, "__dict__"):
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                if isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                if isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ...utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                # (order matters: the Sources section must exist before the
                # citation formatter rewrites in-text references).
                if report_content:
                    # Import citation formatter
                    from ...text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ...config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    # Map the string setting to the CitationMode enum;
                    # unknown values fall back to DOMAIN_ID_HYPERLINKS.
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ...news.utils.headline_generator import generate_headline
                from ...news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fallback
                # (fallbacks embed a minute-resolution UTC timestamp so
                # repeated runs remain distinguishable).
                if generated_headline:
                    title = generated_headline
                else:
                    if subscription_name:
                        title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                    else:
                        title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using storage abstraction
                from ...storage import get_report_storage

                # Use storage to save the report (report_content already retrieved above)
                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

                logger.info(
                    f"Stored research result {research_id} for subscription {subscription_id}"
                )

        except Exception:
            logger.exception("Error storing research result")

1624 

1625 def _run_cleanup_with_tracking(self): 

1626 """Wrapper that tracks cleanup execution.""" 

1627 

1628 try: 

1629 cleaned_count = self._cleanup_inactive_users() 

1630 

1631 logger.info( 

1632 f"Cleanup successful: removed {cleaned_count} inactive users" 

1633 ) 

1634 

1635 except Exception: 

1636 logger.exception("Cleanup job failed") 

1637 

1638 def _cleanup_inactive_users(self) -> int: 

1639 """Remove users inactive for longer than retention period.""" 

1640 retention_hours = self.config.get("retention_hours", 48) 

1641 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours) 

1642 

1643 cleaned_count = 0 

1644 

1645 with self.lock: 

1646 inactive_users = [ 

1647 user_id 

1648 for user_id, session in self.user_sessions.items() 

1649 if session["last_activity"] < cutoff 

1650 ] 

1651 

1652 for user_id in inactive_users: 

1653 # Remove all scheduled jobs 

1654 for job_id in self.user_sessions[user_id][ 

1655 "scheduled_jobs" 

1656 ].copy(): 

1657 try: 

1658 self.scheduler.remove_job(job_id) 

1659 except JobLookupError: 

1660 pass 

1661 

1662 # Clear credentials and session data 

1663 self._credential_store.clear(user_id) 

1664 del self.user_sessions[user_id] 

1665 cleaned_count += 1 

1666 logger.info(f"Cleaned up inactive user {user_id}") 

1667 

1668 return cleaned_count 

1669 

1670 def _reload_config(self): 

1671 """Reload configuration from settings manager.""" 

1672 if not hasattr(self, "settings_manager") or not self.settings_manager: 

1673 return 

1674 

1675 try: 

1676 old_retention = self.config.get("retention_hours", 48) 

1677 

1678 # Reload all settings 

1679 for key in self.config: 

1680 if key == "enabled": 

1681 continue # Don't change enabled state while running 

1682 

1683 full_key = f"news.scheduler.{key}" 

1684 self.config[key] = self._get_setting(full_key, self.config[key]) 

1685 

1686 # Handle changes that need immediate action 

1687 if old_retention != self.config["retention_hours"]: 

1688 logger.info( 

1689 f"Retention period changed from {old_retention} " 

1690 f"to {self.config['retention_hours']} hours" 

1691 ) 

1692 # Trigger immediate cleanup with new retention 

1693 self.scheduler.add_job( 

1694 self._wrap_job(self._run_cleanup_with_tracking), 

1695 "date", 

1696 run_date=datetime.now(UTC) + timedelta(seconds=5), 

1697 id="immediate_cleanup_config_change", 

1698 ) 

1699 

1700 # Clear settings cache to pick up any user setting changes 

1701 self.invalidate_all_settings_cache() 

1702 

1703 except Exception: 

1704 logger.exception("Error reloading configuration") 

1705 

1706 def get_status(self) -> Dict[str, Any]: 

1707 """Get scheduler status information.""" 

1708 with self.lock: 

1709 active_users = len(self.user_sessions) 

1710 total_jobs = sum( 

1711 len(session["scheduled_jobs"]) 

1712 for session in self.user_sessions.values() 

1713 ) 

1714 

1715 # Get next run time for cleanup job 

1716 next_cleanup = None 

1717 if self.is_running: 

1718 job = self.scheduler.get_job("cleanup_inactive_users") 

1719 if job: 1719 ↛ 1722line 1719 didn't jump to line 1722 because the condition on line 1719 was always true

1720 next_cleanup = job.next_run_time 

1721 

1722 return { 

1723 "is_running": self.is_running, 

1724 "config": self.config, 

1725 "active_users": active_users, 

1726 "total_scheduled_jobs": total_jobs, 

1727 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None, 

1728 "memory_usage": self._estimate_memory_usage(), 

1729 } 

1730 

1731 def _estimate_memory_usage(self) -> int: 

1732 """Estimate memory usage of user sessions.""" 

1733 

1734 # Rough estimate: username (50) + password (100) + metadata (200) per user 

1735 per_user_estimate = 350 

1736 return len(self.user_sessions) * per_user_estimate 

1737 

1738 def get_user_sessions_summary(self) -> List[Dict[str, Any]]: 

1739 """Get summary of active user sessions (without passwords).""" 

1740 with self.lock: 

1741 summary = [] 

1742 for user_id, session in self.user_sessions.items(): 

1743 summary.append( 

1744 { 

1745 "user_id": user_id, 

1746 "last_activity": session["last_activity"].isoformat(), 

1747 "scheduled_jobs": len(session["scheduled_jobs"]), 

1748 "time_since_activity": str( 

1749 datetime.now(UTC) - session["last_activity"] 

1750 ), 

1751 } 

1752 ) 

1753 return summary 

1754 

1755 

# Module-level cache holding the lazily created NewsScheduler singleton;
# populated on first call to get_news_scheduler().
_scheduler_instance = None

1758 

1759 

def get_news_scheduler() -> NewsScheduler:
    """Return the process-wide NewsScheduler, creating it on first use."""
    global _scheduler_instance
    if _scheduler_instance is not None:
        return _scheduler_instance
    _scheduler_instance = NewsScheduler()
    return _scheduler_instance