Coverage for src/local_deep_research/news/subscription_manager/scheduler.py: 78%
626 statements
1"""
2Activity-based news subscription scheduler for per-user encrypted databases.
3Tracks user activity and temporarily stores credentials for automatic updates.
4"""
6import random
7import threading
8from dataclasses import dataclass
9from datetime import datetime, timedelta, UTC
10from typing import Any, Dict, List
12from cachetools import TTLCache
13from loguru import logger
14from local_deep_research.settings.logger import log_settings
16from apscheduler.schedulers.background import BackgroundScheduler
17from apscheduler.jobstores.base import JobLookupError
18from ...constants import ResearchStatus
20# RAG indexing imports
21from ...research_library.services.library_rag_service import LibraryRAGService
22from ...database.library_init import get_default_library_id
23from ...database.models.library import Document, DocumentCollection
26SCHEDULER_AVAILABLE = True # Always available since it's a required dependency
29@dataclass(frozen=True)
30class DocumentSchedulerSettings:
31 """
32 Immutable settings snapshot for document scheduler.
34 Thread-safe: This is a frozen dataclass that can be safely passed
35 to and used from background threads.
36 """
38 enabled: bool = True
39 interval_seconds: int = 1800
40 download_pdfs: bool = False
41 extract_text: bool = True
42 generate_rag: bool = False
43 last_run: str = ""
45 @classmethod
46 def defaults(cls) -> "DocumentSchedulerSettings":
47 """Return default settings."""
48 return cls()
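
# Illustrative sketch (not part of the original module): because the dataclass
# is frozen, a snapshot read on the request thread can be handed to a
# background job without extra locking; "changing" a value means building a
# new instance, e.g. via dataclasses.replace.
#
#     snapshot = DocumentSchedulerSettings(interval_seconds=600)
#     snapshot.interval_seconds = 300  # raises dataclasses.FrozenInstanceError
#     faster = dataclasses.replace(snapshot, interval_seconds=300)  # new object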


class NewsScheduler:
    """
    Singleton scheduler that manages news subscriptions for active users.

    This scheduler:
    - Monitors user activity through database access
    - Temporarily stores user credentials in memory
    - Automatically schedules subscription checks
    - Cleans up inactive users after a configurable period
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Ensure singleton instance."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
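
    # Note on __new__ above: this is the double-checked locking idiom. The
    # unlocked check keeps the hot path cheap, and the second check under
    # cls._lock guards against two threads racing past the first check and
    # each constructing an instance.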

    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton)."""
        # Skip if already initialized
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {password, last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Scheduler instance
        self.scheduler = BackgroundScheduler()

        # Configuration (will be loaded from settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        self._initialized = True
        logger.info("News scheduler initialized")

    def _load_default_config(self) -> Dict[str, Any]:
        """Load default configuration (will be overridden by settings manager)."""
        return {
            "enabled": True,
            "retention_hours": 48,
            "cleanup_interval_hours": 1,
            "max_jitter_seconds": 300,
            "max_concurrent_jobs": 10,
            "subscription_batch_size": 5,
            "activity_check_interval_minutes": 5,
        }

    def initialize_with_settings(self, settings_manager):
        """Initialize configuration from settings manager."""
        try:
            # Load all scheduler settings
            self.settings_manager = settings_manager
            self.config = {
                "enabled": self._get_setting("news.scheduler.enabled", True),
                "retention_hours": self._get_setting(
                    "news.scheduler.retention_hours", 48
                ),
                "cleanup_interval_hours": self._get_setting(
                    "news.scheduler.cleanup_interval_hours", 1
                ),
                "max_jitter_seconds": self._get_setting(
                    "news.scheduler.max_jitter_seconds", 300
                ),
                "max_concurrent_jobs": self._get_setting(
                    "news.scheduler.max_concurrent_jobs", 10
                ),
                "subscription_batch_size": self._get_setting(
                    "news.scheduler.batch_size", 5
                ),
                "activity_check_interval_minutes": self._get_setting(
                    "news.scheduler.activity_check_interval", 5
                ),
            }
            log_settings(self.config, "Scheduler configuration loaded")
        except Exception:
            logger.exception("Error loading scheduler settings")
            # Keep default config

    def _get_setting(self, key: str, default: Any) -> Any:
        """Get setting with fallback to default."""
        if hasattr(self, "settings_manager") and self.settings_manager:
            return self.settings_manager.get_setting(key, default=default)
        return default

    def _get_document_scheduler_settings(
        self, username: str, force_refresh: bool = False
    ) -> DocumentSchedulerSettings:
        """
        Get document scheduler settings for a user with TTL caching.

        This is the single source of truth for document scheduler settings.
        Settings are cached for 5 minutes by default to reduce database queries.

        Args:
            username: User to get settings for
            force_refresh: If True, bypass cache and fetch fresh settings

        Returns:
            DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
        """
        # Fast path: check cache without modifying it
        if not force_refresh:
            with self._settings_cache_lock:
                cached = self._settings_cache.get(username)
                if cached is not None:
                    logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                    return cached

        # Cache miss - need to fetch from database
        logger.debug(
            f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
        )

        # Get password from session
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[SETTINGS_CACHE] No session info for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        password = session_info["password"]

        # Fetch settings from database (outside lock to avoid blocking)
        try:
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                sm = SettingsManager(db)

                settings = DocumentSchedulerSettings(
                    enabled=sm.get_setting("document_scheduler.enabled", True),
                    interval_seconds=sm.get_setting(
                        "document_scheduler.interval_seconds", 1800
                    ),
                    download_pdfs=sm.get_setting(
                        "document_scheduler.download_pdfs", False
                    ),
                    extract_text=sm.get_setting(
                        "document_scheduler.extract_text", True
                    ),
                    generate_rag=sm.get_setting(
                        "document_scheduler.generate_rag", False
                    ),
                    last_run=sm.get_setting("document_scheduler.last_run", ""),
                )

            # Store in cache
            with self._settings_cache_lock:
                self._settings_cache[username] = settings
                logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")

            return settings

        except Exception:
            logger.exception(
                f"[SETTINGS_CACHE] Error fetching settings for {username}"
            )
            return DocumentSchedulerSettings.defaults()
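
    # Illustrative sketch (username assumed): after a settings write elsewhere
    # in the app, dropping the cache entry (or forcing a refresh) makes the
    # next scheduled run see the change without waiting out the 5-minute TTL.
    #
    #     scheduler = get_news_scheduler()
    #     scheduler.invalidate_user_settings_cache("alice")
    #     fresh = scheduler._get_document_scheduler_settings(
    #         "alice", force_refresh=True
    #     )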

    def invalidate_user_settings_cache(self, username: str) -> bool:
        """
        Invalidate cached settings for a specific user.

        Call this when user settings change or the user logs out.

        Args:
            username: User whose cache to invalidate

        Returns:
            True if the cache entry was removed, False if not found
        """
        with self._settings_cache_lock:
            if username in self._settings_cache:
                del self._settings_cache[username]
                logger.debug(
                    f"[SETTINGS_CACHE] Invalidated cache for {username}"
                )
                return True
            return False

    def invalidate_all_settings_cache(self) -> int:
        """
        Invalidate all cached settings.

        Call this when doing bulk settings updates or during config reload.

        Returns:
            Number of cache entries cleared
        """
        with self._settings_cache_lock:
            count = len(self._settings_cache)
            self._settings_cache.clear()
            logger.info(
                f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)"
            )
            return count
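
    # Illustrative sketch: _reload_config() below calls this after re-reading
    # scheduler settings, so every user's document settings are re-fetched
    # lazily on next use rather than eagerly for all sessions.
    #
    #     cleared = get_news_scheduler().invalidate_all_settings_cache()
    #     logger.debug(f"Dropped {cleared} cached settings snapshots")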

    def start(self):
        """Start the scheduler."""
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        # Schedule cleanup job
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        self.scheduler.add_job(
            self._reload_config,
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule initial cleanup after a delay
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")

    def stop(self):
        """Stop the scheduler."""
        if self.is_running:
            self.scheduler.shutdown(wait=True)
            self.is_running = False

        # Clear all user sessions
        with self.lock:
            self.user_sessions.clear()

        logger.info("News scheduler stopped")

    def update_user_info(self, username: str, password: str):
        """
        Update user info in the scheduler. Called on every database interaction.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        with self.lock:
            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "password": password,
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["password"] = password
                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)

    def unregister_user(self, username: str):
        """
        Unregister a user and clean up their scheduled jobs.
        Called when the user logs out.
        """
        with self.lock:
            if username in self.user_sessions:
                logger.info(f"Unregistering user {username}")

                # Remove all scheduled jobs for this user
                session_info = self.user_sessions[username]
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        pass

                # Remove user session
                del self.user_sessions[username]

        # Invalidate settings cache for this user (outside lock)
        self.invalidate_user_settings_cache(username)
        logger.info(f"User {username} unregistered successfully")

    def _schedule_user_subscriptions(self, username: str):
        """Schedule all active subscriptions for a user."""
        logger.info(f"_schedule_user_subscriptions called for {username}")
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(f"No session info found for {username}")
                return

            password = session_info["password"]
            logger.debug(
                f"Got password for {username}: {'present' if password else 'missing'}"
            )

            # Get user's subscriptions from their encrypted database
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                subscriptions = (
                    db.query(NewsSubscription).filter_by(is_active=True).all()
                )
                logger.debug(
                    f"Query executed, found {len(subscriptions)} results"
                )

                # Log details of each subscription
                for sub in subscriptions:
                    logger.debug(
                        f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                    )

                logger.info(
                    f"Found {len(subscriptions)} active subscriptions for {username}"
                )

                # Clear old jobs for this user
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                        session_info["scheduled_jobs"].remove(job_id)
                    except JobLookupError:
                        pass

                # Schedule each subscription with jitter
                for sub in subscriptions:
                    job_id = f"{username}_{sub.id}"

                    # Calculate jitter
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    max_jitter = int(self.config.get("max_jitter_seconds", 300))
                    jitter = random.randint(0, max_jitter)

                    # Determine trigger based on frequency
                    refresh_minutes = sub.refresh_interval_minutes

                    if refresh_minutes <= 60:  # 60 minutes or less
                        # For hourly or more frequent, use interval trigger
                        trigger = "interval"
                        trigger_args = {
                            "minutes": refresh_minutes,
                            "jitter": jitter,
                            "start_date": datetime.now(UTC),  # Start immediately
                        }
                    else:
                        # For less frequent, calculate next run time
                        now = datetime.now(UTC)
                        if sub.next_refresh:
                            # Ensure timezone-aware for comparison with now (UTC)
                            next_refresh_aware = sub.next_refresh
                            if next_refresh_aware.tzinfo is None:
                                logger.warning(
                                    f"Subscription {sub.id} has naive (non-tz-aware) "
                                    f"next_refresh datetime, assuming UTC"
                                )
                                next_refresh_aware = next_refresh_aware.replace(
                                    tzinfo=UTC
                                )
                            if next_refresh_aware <= now:
                                # Subscription is overdue - run it immediately with small jitter
                                logger.info(
                                    f"Subscription {sub.id} is overdue, scheduling immediate run"
                                )
                                next_run = now + timedelta(seconds=jitter)
                            else:
                                next_run = next_refresh_aware
                        else:
                            next_run = now + timedelta(
                                minutes=refresh_minutes, seconds=jitter
                            )

                        trigger = "date"
                        trigger_args = {"run_date": next_run}

                    # Add the job
                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, sub.id],
                        trigger=trigger,
                        id=job_id,
                        name=f"Check {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                        **trigger_args,
                    )

                    session_info["scheduled_jobs"].add(job_id)
                    logger.info(f"Scheduled job {job_id} with {trigger} trigger")

        except Exception:
            logger.exception(f"Error scheduling subscriptions for {username}")

        # Add document processing for this user
        self._schedule_document_processing(username)
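
    # Illustrative sketch of the trigger selection above (values assumed): a
    # 30-minute subscription gets a repeating "interval" trigger, while a
    # 6-hour one gets a one-shot "date" trigger that _check_subscription
    # reschedules after each run.
    #
    #     refresh_minutes = 360
    #     if refresh_minutes <= 60:
    #         trigger, trigger_args = "interval", {"minutes": refresh_minutes}
    #     else:
    #         trigger, trigger_args = "date", {
    #             "run_date": datetime.now(UTC)
    #             + timedelta(minutes=refresh_minutes)
    #         }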

    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user."""
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            logger.debug(
                f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            if not settings.enabled:
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                )
                return

            logger.info(
                f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
                f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
                f"text={settings.extract_text}, rag={settings.generate_rag}"
            )

            # Schedule document processing job
            job_id = f"{username}_document_processing"
            logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

            # Remove existing document job if any
            try:
                self.scheduler.remove_job(job_id)
                session_info["scheduled_jobs"].discard(job_id)
                logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
            except JobLookupError:
                logger.debug(
                    f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                )

            # Add new document processing job
            logger.debug(
                f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
            )
            self.scheduler.add_job(
                func=self._process_user_documents,
                args=[username],
                trigger="interval",
                seconds=settings.interval_seconds,
                id=job_id,
                name=f"Process Documents for {username}",
                jitter=30,  # Add small jitter to prevent multiple users from processing simultaneously
                max_instances=1,  # Prevent overlapping document processing for same user
                replace_existing=True,
            )

            session_info["scheduled_jobs"].add(job_id)
            logger.info(
                f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
            )
            logger.debug(
                f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                )

        except Exception:
            logger.exception(
                f"Error scheduling document processing for {username}"
            )

    def _process_user_documents(self, username: str):
        """Process documents for a user."""
        logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}")
        start_time = datetime.now(UTC)

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for user {username}"
                )
                return

            password = session_info["password"]
            logger.debug(
                f"[DOC_SCHEDULER] Starting document processing for {username}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            logger.info(
                f"[DOC_SCHEDULER] Processing settings for {username}: "
                f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}"
            )

            if not any(
                [
                    settings.download_pdfs,
                    settings.extract_text,
                    settings.generate_rag,
                ]
            ):
                logger.info(
                    f"[DOC_SCHEDULER] No processing options enabled for user {username}"
                )
                return

            # Parse last_run from cached settings
            last_run = (
                datetime.fromisoformat(settings.last_run)
                if settings.last_run
                else None
            )

            logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}")

            # Need database session for queries and updates
            from ...database.session_context import get_user_db_session
            from ...database.models.research import ResearchHistory
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)

                # Query for completed research since last run
                logger.debug(
                    f"[DOC_SCHEDULER] Querying for completed research since {last_run}"
                )
                query = db.query(ResearchHistory).filter(
                    ResearchHistory.status == ResearchStatus.COMPLETED,
                    ResearchHistory.completed_at.is_not(
                        None
                    ),  # Ensure completed_at is not null
                )

                if last_run:
                    query = query.filter(
                        ResearchHistory.completed_at > last_run
                    )

                # Limit to recent research to prevent overwhelming
                query = query.order_by(
                    ResearchHistory.completed_at.desc()
                ).limit(20)

                research_sessions = query.all()
                logger.debug(
                    f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions"
                )

                if not research_sessions:
                    logger.info(
                        f"[DOC_SCHEDULER] No new completed research sessions found for user {username}"
                    )
                    return

                logger.info(
                    f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}"
                )

                # Log details of each research session
                for i, research in enumerate(
                    research_sessions[:5]
                ):  # Log first 5 details
                    title_safe = (
                        (research.title[:50] + "...")
                        if research.title
                        else "No title"
                    )
                    completed_safe = (
                        research.completed_at
                        if research.completed_at
                        else "No completion time"
                    )
                    logger.debug(
                        f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}"
                    )

                    # Handle completed_at which might be a string or datetime
                    completed_at_obj = None
                    if research.completed_at:
                        if isinstance(research.completed_at, str):
                            try:
                                completed_at_obj = datetime.fromisoformat(
                                    research.completed_at.replace("Z", "+00:00")
                                )
                            except (ValueError, TypeError, AttributeError):
                                completed_at_obj = None
                        else:
                            completed_at_obj = research.completed_at

                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}"
                    )
                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}"
                    )
                    logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}")
                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}"
                    )

                processed_count = 0
                for research in research_sessions:
                    try:
                        logger.info(
                            f"[DOC_SCHEDULER] Processing research {research.id} for user {username}"
                        )

                        # Call actual processing APIs
                        if settings.download_pdfs:
                            logger.info(
                                f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}"
                            )
                            try:
                                # Use the DownloadService to queue PDF downloads
                                from ...research_library.services.download_service import (
                                    DownloadService,
                                )

                                with DownloadService(
                                    username=username, password=password
                                ) as download_service:
                                    queued_count = download_service.queue_research_downloads(
                                        research.id
                                    )
                                    logger.info(
                                        f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}"
                                    )
                            except Exception:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}"
                                )

                        if settings.extract_text:
                            logger.info(
                                f"[DOC_SCHEDULER] Extracting text for research {research.id}"
                            )
                            try:
                                # Use the DownloadService to extract text for all resources
                                from ...research_library.services.download_service import (
                                    DownloadService,
                                )
                                from ...database.models.research import (
                                    ResearchResource,
                                )

                                with DownloadService(
                                    username=username, password=password
                                ) as download_service:
                                    # Get all resources for this research (reuse existing db session)
                                    resources = (
                                        db.query(ResearchResource)
                                        .filter_by(research_id=research.id)
                                        .all()
                                    )
                                    # Use a dedicated counter here so the
                                    # per-session processed_count above is not
                                    # clobbered by the per-resource tally
                                    extracted_count = 0
                                    for resource in resources:
                                        # We need to pass the password to the download service
                                        # The DownloadService creates its own database sessions, so we need to ensure password is available
                                        try:
                                            success, error = (
                                                download_service.download_as_text(
                                                    resource.id
                                                )
                                            )
                                            if success:
                                                extracted_count += 1
                                                logger.info(
                                                    f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}"
                                                )
                                            else:
                                                logger.warning(
                                                    f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}"
                                                )
                                        except Exception as resource_error:
                                            logger.exception(
                                                f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}"
                                            )
                                    logger.info(
                                        f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {extracted_count}/{len(resources)} resources processed"
                                    )
                            except Exception:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to extract text for research {research.id}"
                                )

                        if settings.generate_rag:
                            logger.info(
                                f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}"
                            )
                            try:
                                # Get embedding settings from user configuration
                                embedding_model = settings_manager.get_setting(
                                    "local_search_embedding_model",
                                    "all-MiniLM-L6-v2",
                                )
                                embedding_provider = (
                                    settings_manager.get_setting(
                                        "local_search_embedding_provider",
                                        "sentence_transformers",
                                    )
                                )
                                chunk_size = int(
                                    settings_manager.get_setting(
                                        "local_search_chunk_size", 1000
                                    )
                                )
                                chunk_overlap = int(
                                    settings_manager.get_setting(
                                        "local_search_chunk_overlap", 200
                                    )
                                )

                                # Initialize RAG service with user's embedding configuration
                                with LibraryRAGService(
                                    username=username,
                                    embedding_model=embedding_model,
                                    embedding_provider=embedding_provider,
                                    chunk_size=chunk_size,
                                    chunk_overlap=chunk_overlap,
                                    db_password=password,
                                ) as rag_service:
                                    # Get default Library collection ID
                                    library_collection_id = (
                                        get_default_library_id(
                                            username, password
                                        )
                                    )

                                    # Query for unindexed documents from this research session
                                    documents_to_index = (
                                        db.query(Document.id, Document.title)
                                        .outerjoin(
                                            DocumentCollection,
                                            (
                                                DocumentCollection.document_id
                                                == Document.id
                                            )
                                            & (
                                                DocumentCollection.collection_id
                                                == library_collection_id
                                            ),
                                        )
                                        .filter(
                                            Document.research_id == research.id,
                                            Document.text_content.isnot(None),
                                            (
                                                DocumentCollection.indexed.is_(
                                                    False
                                                )
                                                | DocumentCollection.id.is_(
                                                    None
                                                )
                                            ),
                                        )
                                        .all()
                                    )

                                    if not documents_to_index:
                                        logger.info(
                                            f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}"
                                        )
                                    else:
                                        # Index each document
                                        indexed_count = 0
                                        for (
                                            doc_id,
                                            doc_title,
                                        ) in documents_to_index:
                                            try:
                                                result = rag_service.index_document(
                                                    document_id=doc_id,
                                                    collection_id=library_collection_id,
                                                    force_reindex=False,
                                                )
                                                if (
                                                    result["status"]
                                                    == "success"
                                                ):
                                                    indexed_count += 1
                                                    logger.info(
                                                        f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) "
                                                        f"with {result.get('chunk_count', 0)} chunks"
                                                    )
                                            except Exception as doc_error:
                                                logger.exception(
                                                    f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}"
                                                )

                                        logger.info(
                                            f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: "
                                            f"{indexed_count}/{len(documents_to_index)} documents indexed"
                                        )
                            except Exception:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}"
                                )

                        processed_count += 1
                        logger.debug(
                            f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}"
                        )

                    except Exception:
                        logger.exception(
                            f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}"
                        )

                # Update last run time in user's settings
                current_time = datetime.now(UTC).isoformat()
                settings_manager.set_setting(
                    "document_scheduler.last_run", current_time, commit=True
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}"
                )

            end_time = datetime.now(UTC)
            duration = (end_time - start_time).total_seconds()
            logger.info(
                f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s"
            )

        except Exception:
            logger.exception(
                f"[DOC_SCHEDULER] Error processing documents for user {username}"
            )
        finally:
            # Clean up thread-local session after job completes
            from ...database.thread_local_session import cleanup_current_thread

            cleanup_current_thread()

    def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
        """Get document scheduler status for a specific user."""
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                return {
                    "enabled": False,
                    "message": "User not found in scheduler",
                }

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            # Check if user has document processing job
            job_id = f"{username}_document_processing"
            has_job = job_id in session_info.get("scheduled_jobs", set())

            return {
                "enabled": settings.enabled,
                "interval_seconds": settings.interval_seconds,
                "processing_options": {
                    "download_pdfs": settings.download_pdfs,
                    "extract_text": settings.extract_text,
                    "generate_rag": settings.generate_rag,
                },
                "last_run": settings.last_run,
                "has_scheduled_job": has_job,
                "user_active": username in self.user_sessions,
            }

        except Exception as e:
            logger.exception(
                f"Error getting document scheduler status for user {username}"
            )
            return {
                "enabled": False,
                "message": f"Failed to retrieve scheduler status: {type(e).__name__}",
            }

    def trigger_document_processing(self, username: str) -> bool:
        """Trigger immediate document processing for a user."""
        logger.info(
            f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] User {username} not found in scheduler"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
                )
                return False

            if not self.is_running:
                logger.warning(
                    f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
                )
                return False

            # Trigger immediate processing
            job_id = f"{username}_document_processing_manual"
            logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}")

            self.scheduler.add_job(
                func=self._process_user_documents,
                args=[username],
                trigger="date",
                run_date=datetime.now(UTC) + timedelta(seconds=1),
                id=job_id,
                name=f"Manual Document Processing for {username}",
                replace_existing=True,
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!"
                )
                return False

            return True

        except Exception:
            logger.exception(
                f"[DOC_SCHEDULER] Error triggering document processing for user {username}"
            )
            return False
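
    # Illustrative sketch (hypothetical route, not part of this module): a web
    # endpoint could expose the manual trigger like this, assuming Flask and a
    # current_username() helper exist in the surrounding app.
    #
    #     @app.post("/api/documents/process-now")
    #     def process_now():
    #         ok = get_news_scheduler().trigger_document_processing(
    #             current_username()
    #         )
    #         return {"triggered": ok}, 200 if ok else 409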

    def _check_user_overdue_subscriptions(self, username: str):
        """Check and immediately run any overdue subscriptions for a user."""
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                return

            password = session_info["password"]

            # Get user's overdue subscriptions
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription
            from datetime import timezone

            with get_user_db_session(username, password) as db:
                now = datetime.now(timezone.utc)
                overdue_subs = (
                    db.query(NewsSubscription)
                    .filter(
                        NewsSubscription.is_active.is_(True),
                        NewsSubscription.next_refresh.is_not(None),
                        NewsSubscription.next_refresh <= now,
                    )
                    .all()
                )

                if overdue_subs:
                    logger.info(
                        f"Found {len(overdue_subs)} overdue subscriptions for {username}"
                    )

                for sub in overdue_subs:
                    # Run immediately with small random delay
                    # Security: random delay to stagger overdue jobs, not security-sensitive
                    delay_seconds = random.randint(1, 30)
                    job_id = (
                        f"overdue_{username}_{sub.id}_{int(now.timestamp())}"
                    )

                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, sub.id],
                        trigger="date",
                        run_date=now + timedelta(seconds=delay_seconds),
                        id=job_id,
                        name=f"Overdue: {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                    )

                    logger.info(
                        f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds"
                    )

        except Exception:
            logger.exception(
                f"Error checking overdue subscriptions for {username}"
            )
        finally:
            # Clean up thread-local session after job completes
            from ...database.thread_local_session import cleanup_current_thread

            cleanup_current_thread()

    def _check_subscription(self, username: str, subscription_id: int):
        """Check and refresh a single subscription."""
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    pass
                return

            password = session_info["password"]

            # Get subscription details
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from ..core.utils import get_local_date_string
                    from ...settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously using requests with proper auth
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception:
            logger.exception(f"Error checking subscription {subscription_id}")
        finally:
            # Clean up thread-local session after job completes
            from ...database.thread_local_session import cleanup_current_thread

            cleanup_current_thread()

    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ):
        """Trigger research for a subscription using the programmatic API."""
        # Import clear_settings_context before try so it is always
        # available in the finally block, even if an early return or
        # exception occurs before the lazy imports inside try.
        from ...config.thread_settings import (
            clear_settings_context,
            set_settings_context,
        )

        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = session_info["password"]

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Use the search engine from the subscription if specified
                search_engine = subscription.get("search_engine")

                if search_engine:
                    settings_snapshot["search.tool"] = {
                        "value": search_engine,
                        "ui_element": "select",
                    }
                    logger.info(
                        f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                    )
                else:
                    # Use the user's default search tool from their settings
                    default_search_tool = settings_snapshot.get(
                        "search.tool", "auto"
                    )
                    logger.info(
                        f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                    )

                logger.debug(
                    f"Settings snapshot has {len(settings_snapshot)} settings"
                )
                # Log a few key settings to verify they're present
                logger.debug(
                    f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
                )

            # Set up research parameters
            query = subscription["query"]

            # Build metadata for news search
            metadata = {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription["id"],
                "triggered_by": "scheduler",
                "subscription_name": subscription["name"],
                "title": subscription["name"] if subscription["name"] else None,
                "scheduled_at": datetime.now(UTC).isoformat(),
                "original_query": subscription["original_query"],
                "user_id": username,
            }

            # Use programmatic API with settings context
            from ...api.research_functions import quick_summary

            # Create and set settings context for this thread
            class SettingsContext:
                def __init__(self, snapshot):
                    self.snapshot = snapshot or {}
                    self.values = {}
                    for key, setting in self.snapshot.items():
                        if isinstance(setting, dict) and "value" in setting:
                            self.values[key] = setting["value"]
                        else:
                            self.values[key] = setting

                def get_setting(self, key, default=None):
                    """Get setting from snapshot only."""
                    return self.values.get(key, default)

            # Set the context for this thread
            settings_context = SettingsContext(settings_snapshot)
            set_settings_context(settings_context)

            # Get search strategy from subscription data (for the API call)
            search_strategy = subscription.get(
                "search_strategy", "news_aggregation"
            )

            # Call quick_summary with appropriate parameters
            result = quick_summary(
                query=query,
                research_id=research_id,
                username=username,
                user_password=password,
                settings_snapshot=settings_snapshot,
                search_strategy=search_strategy,
                model_name=subscription.get("model"),
                provider=subscription.get("model_provider"),
                iterations=1,  # Single iteration for news
                metadata=metadata,
                search_original_query=False,  # Don't send long subscription prompts to search engines
            )

            logger.info(
                f"Completed research {research_id} for subscription {subscription['id']}"
            )

            # Store the research result in the database
            self._store_research_result(
                username,
                password,
                research_id,
                subscription["id"],
                result,
                subscription,
            )

        except Exception:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}"
            )
        finally:
            clear_settings_context()

    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ):
        """Store research result in database for news display."""
        try:
            from ...database.session_context import get_user_db_session
            from ...database.models import ResearchHistory
            from ...settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries."""
                if hasattr(obj, "dict"):
                    return obj.dict()
                elif hasattr(obj, "__dict__"):
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                elif isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                elif isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                else:
                    return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ...utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                if report_content:
                    # Import citation formatter
                    from ...text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ...config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ...news.utils.headline_generator import generate_headline
                from ...news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fall back
                if generated_headline:
                    title = generated_headline
                else:
                    if subscription_name:
                        title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                    else:
                        title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using storage abstraction
                from ...storage import get_report_storage

                # Use storage to save the report (report_content already retrieved above)
                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

            logger.info(
                f"Stored research result {research_id} for subscription {subscription_id}"
            )

        except Exception:
            logger.exception("Error storing research result")

    def _run_cleanup_with_tracking(self):
        """Wrapper that tracks cleanup execution."""
        try:
            cleaned_count = self._cleanup_inactive_users()

            logger.info(
                f"Cleanup successful: removed {cleaned_count} inactive users"
            )

        except Exception:
            logger.exception("Cleanup job failed")

    def _cleanup_inactive_users(self) -> int:
        """Remove users inactive for longer than the retention period."""
        retention_hours = self.config.get("retention_hours", 48)
        cutoff = datetime.now(UTC) - timedelta(hours=retention_hours)

        cleaned_count = 0

        with self.lock:
            inactive_users = [
                user_id
                for user_id, session in self.user_sessions.items()
                if session["last_activity"] < cutoff
            ]

            for user_id in inactive_users:
                # Remove all scheduled jobs
                for job_id in self.user_sessions[user_id]["scheduled_jobs"]:
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        pass

                # Clear password from memory
                del self.user_sessions[user_id]
                cleaned_count += 1
                logger.info(f"Cleaned up inactive user {user_id}")

        return cleaned_count

    def _reload_config(self):
        """Reload configuration from settings manager."""
        if not hasattr(self, "settings_manager") or not self.settings_manager:
            return

        try:
            old_retention = self.config.get("retention_hours", 48)

            # Reload all settings
            for key in self.config:
                if key == "enabled":
                    continue  # Don't change enabled state while running

                full_key = f"news.scheduler.{key}"
                self.config[key] = self._get_setting(full_key, self.config[key])

            # Handle changes that need immediate action
            if old_retention != self.config["retention_hours"]:
                logger.info(
                    f"Retention period changed from {old_retention} "
                    f"to {self.config['retention_hours']} hours"
                )
                # Trigger immediate cleanup with new retention
                self.scheduler.add_job(
                    self._run_cleanup_with_tracking,
                    "date",
                    run_date=datetime.now(UTC) + timedelta(seconds=5),
                    id="immediate_cleanup_config_change",
                )

            # Clear settings cache to pick up any user setting changes
            self.invalidate_all_settings_cache()

        except Exception:
            logger.exception("Error reloading configuration")

    def get_status(self) -> Dict[str, Any]:
        """Get scheduler status information."""
        with self.lock:
            active_users = len(self.user_sessions)
            total_jobs = sum(
                len(session["scheduled_jobs"])
                for session in self.user_sessions.values()
            )

            # Get next run time for cleanup job
            next_cleanup = None
            if self.is_running:
                job = self.scheduler.get_job("cleanup_inactive_users")
                if job:
                    next_cleanup = job.next_run_time

            return {
                "is_running": self.is_running,
                "config": self.config,
                "active_users": active_users,
                "total_scheduled_jobs": total_jobs,
                "next_cleanup": next_cleanup.isoformat() if next_cleanup else None,
                "memory_usage": self._estimate_memory_usage(),
            }

    def _estimate_memory_usage(self) -> int:
        """Estimate memory usage of user sessions."""
        # Rough estimate: username (50) + password (100) + metadata (200) per user
        per_user_estimate = 350
        return len(self.user_sessions) * per_user_estimate

    def get_user_sessions_summary(self) -> List[Dict[str, Any]]:
        """Get summary of active user sessions (without passwords)."""
        with self.lock:
            summary = []
            for user_id, session in self.user_sessions.items():
                summary.append(
                    {
                        "user_id": user_id,
                        "last_activity": session["last_activity"].isoformat(),
                        "scheduled_jobs": len(session["scheduled_jobs"]),
                        "time_since_activity": str(
                            datetime.now(UTC) - session["last_activity"]
                        ),
                    }
                )
            return summary


# Singleton instance getter
_scheduler_instance = None


def get_news_scheduler() -> NewsScheduler:
    """Get the singleton news scheduler instance."""
    global _scheduler_instance
    if _scheduler_instance is None:
        _scheduler_instance = NewsScheduler()
    return _scheduler_instance
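

# Illustrative usage sketch (call sites assumed; in practice this is wired up
# at app startup, on login, and on logout rather than run as a script):
#
#     scheduler = get_news_scheduler()
#     scheduler.initialize_with_settings(settings_manager)  # optional
#     scheduler.start()
#     scheduler.update_user_info("alice", "password")  # on each DB interaction
#     ...
#     scheduler.unregister_user("alice")  # on logout
#     scheduler.stop()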