Coverage for src / local_deep_research / news / subscription_manager / scheduler.py: 97%
666 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Activity-based news subscription scheduler for per-user encrypted databases.
3Tracks user activity and temporarily stores credentials for automatic updates.
4"""
6import random
7import threading
8from dataclasses import dataclass
9from datetime import datetime, timedelta, UTC
10from functools import wraps
11from typing import Any, Callable, Dict, List
13from cachetools import TTLCache
14from loguru import logger
15from ...settings.logger import log_settings
16from ...settings.manager import SnapshotSettingsContext
18from apscheduler.schedulers.background import BackgroundScheduler
19from apscheduler.jobstores.base import JobLookupError
20from ...constants import ResearchStatus
21from ...database.credential_store_base import CredentialStoreBase
22from ...database.thread_local_session import thread_cleanup
24# RAG indexing imports
25from ...research_library.services.library_rag_service import LibraryRAGService
26from ...database.library_init import get_default_library_id
27from ...database.models.library import Document, DocumentCollection
# Flag retained for callers that check scheduler availability — presumably a
# legacy guard from when APScheduler was optional; confirm before removing.
SCHEDULER_AVAILABLE = True  # Always available since it's a required dependency
class SchedulerCredentialStore(CredentialStoreBase):
    """In-memory, TTL-expiring password store for the news scheduler.

    Background scheduler jobs need user passwords to open encrypted
    per-user databases; entries expire automatically after the TTL.
    """

    def __init__(self, ttl_hours: int = 48):
        # Base class expects the TTL in seconds.
        super().__init__(ttl_hours * 3600)

    def store(self, username: str, password: str) -> None:
        """Store password for a user."""
        payload = {"username": username, "password": password}
        self._store_credentials(username, payload)

    def retrieve(self, username: str) -> str | None:
        """Retrieve password for a user. Returns None if expired/missing."""
        entry = self._retrieve_credentials(username, remove=False)
        if not entry:
            return None
        return entry[1]

    def clear(self, username: str) -> None:
        """Clear stored password for a user."""
        self.clear_entry(username)
@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """Immutable snapshot of a user's document-scheduler settings.

    Frozen, so instances can be handed to background threads and cached
    without any synchronization concerns.
    """

    enabled: bool = True
    interval_seconds: int = 1800
    download_pdfs: bool = False
    extract_text: bool = True
    generate_rag: bool = False
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Return a settings object carrying every default value."""
        return cls()
81class NewsScheduler:
82 """
83 Singleton scheduler that manages news subscriptions for active users.
85 This scheduler:
86 - Monitors user activity through database access
87 - Temporarily stores user credentials in memory
88 - Automatically schedules subscription checks
89 - Cleans up inactive users after configurable period
90 """
92 _instance = None
93 _lock = threading.Lock()
95 def __new__(cls):
96 """Ensure singleton instance."""
97 if cls._instance is None:
98 with cls._lock:
99 if cls._instance is None: 99 ↛ 101line 99 didn't jump to line 101
100 cls._instance = super().__new__(cls)
101 return cls._instance
    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton)."""
        # __new__ always returns the same object, so __init__ may run again
        # on an already-initialized instance; bail out in that case.
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Credential store with TTL-based expiration (48h matches the
        # default retention_hours in _load_default_config)
        self._credential_store = SchedulerCredentialStore(ttl_hours=48)

        # Scheduler instance (not started until start() is called)
        self.scheduler = BackgroundScheduler()

        # Configuration (will be overridden by initialize_with_settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False
        self._app = None  # Flask app reference for background job contexts

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        self._initialized = True
        logger.info("News scheduler initialized")
134 def _load_default_config(self) -> Dict[str, Any]:
135 """Load default configuration (will be overridden by settings manager)."""
136 return {
137 "enabled": True,
138 "retention_hours": 48,
139 "cleanup_interval_hours": 1,
140 "max_jitter_seconds": 300,
141 "max_concurrent_jobs": 10,
142 "subscription_batch_size": 5,
143 "activity_check_interval_minutes": 5,
144 }
146 def initialize_with_settings(self, settings_manager):
147 """Initialize configuration from settings manager."""
148 try:
149 # Load all scheduler settings
150 self.settings_manager = settings_manager
151 self.config = {
152 "enabled": self._get_setting("news.scheduler.enabled", True),
153 "retention_hours": self._get_setting(
154 "news.scheduler.retention_hours", 48
155 ),
156 "cleanup_interval_hours": self._get_setting(
157 "news.scheduler.cleanup_interval_hours", 1
158 ),
159 "max_jitter_seconds": self._get_setting(
160 "news.scheduler.max_jitter_seconds", 300
161 ),
162 "max_concurrent_jobs": self._get_setting(
163 "news.scheduler.max_concurrent_jobs", 10
164 ),
165 "subscription_batch_size": self._get_setting(
166 "news.scheduler.batch_size", 5
167 ),
168 "activity_check_interval_minutes": self._get_setting(
169 "news.scheduler.activity_check_interval", 5
170 ),
171 }
172 log_settings(self.config, "Scheduler configuration loaded")
173 except Exception:
174 logger.exception("Error loading scheduler settings")
175 # Keep default config
177 def _get_setting(self, key: str, default: Any) -> Any:
178 """Get setting with fallback to default."""
179 if hasattr(self, "settings_manager") and self.settings_manager:
180 return self.settings_manager.get_setting(key, default=default)
181 return default
183 def set_app(self, app) -> None:
184 """Store a reference to the Flask app for creating app contexts in background jobs."""
185 self._app = app
187 def _wrap_job(self, func: Callable) -> Callable:
188 """Wrap a scheduler job function so it runs inside a Flask app context.
190 APScheduler runs jobs in a thread pool without Flask context.
191 This wrapper pushes an app context before the job runs and pops it after.
192 """
194 @wraps(func)
195 def wrapper(*args, **kwargs):
196 if self._app is not None: 196 ↛ 200line 196 didn't jump to line 200 because the condition on line 196 was always true
197 with self._app.app_context():
198 return func(*args, **kwargs)
199 else:
200 logger.warning(
201 f"No Flask app set on scheduler; running {func.__name__} without app context"
202 )
203 return func(*args, **kwargs)
205 return wrapper
207 def _get_document_scheduler_settings(
208 self, username: str, force_refresh: bool = False
209 ) -> DocumentSchedulerSettings:
210 """
211 Get document scheduler settings for a user with TTL caching.
213 This is the single source of truth for document scheduler settings.
214 Settings are cached for 5 minutes by default to reduce database queries.
216 Args:
217 username: User to get settings for
218 force_refresh: If True, bypass cache and fetch fresh settings
220 Returns:
221 DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
222 """
223 # Fast path: check cache without modifying it
224 if not force_refresh:
225 with self._settings_cache_lock:
226 cached = self._settings_cache.get(username)
227 if cached is not None:
228 logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
229 cached_settings: DocumentSchedulerSettings = cached
230 return cached_settings
232 # Cache miss - need to fetch from database
233 logger.debug(
234 f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
235 )
237 # Get password from session
238 session_info = self.user_sessions.get(username)
239 if not session_info:
240 logger.warning(
241 f"[SETTINGS_CACHE] No session info for {username}, using defaults"
242 )
243 return DocumentSchedulerSettings.defaults()
245 password = self._credential_store.retrieve(username)
246 if not password: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true
247 logger.warning(
248 f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
249 )
250 return DocumentSchedulerSettings.defaults()
252 # Fetch settings from database (outside lock to avoid blocking)
253 try:
254 from ...database.session_context import get_user_db_session
255 from ...settings.manager import SettingsManager
257 with get_user_db_session(username, password) as db:
258 sm = SettingsManager(db)
260 settings = DocumentSchedulerSettings(
261 enabled=sm.get_setting("document_scheduler.enabled", True),
262 interval_seconds=sm.get_setting(
263 "document_scheduler.interval_seconds", 1800
264 ),
265 download_pdfs=sm.get_setting(
266 "document_scheduler.download_pdfs", False
267 ),
268 extract_text=sm.get_setting(
269 "document_scheduler.extract_text", True
270 ),
271 generate_rag=sm.get_setting(
272 "document_scheduler.generate_rag", False
273 ),
274 last_run=sm.get_setting("document_scheduler.last_run", ""),
275 )
277 # Store in cache
278 with self._settings_cache_lock:
279 self._settings_cache[username] = settings
280 logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")
282 return settings
284 except Exception:
285 logger.exception(
286 f"[SETTINGS_CACHE] Error fetching settings for {username}"
287 )
288 return DocumentSchedulerSettings.defaults()
290 def invalidate_user_settings_cache(self, username: str) -> bool:
291 """
292 Invalidate cached settings for a specific user.
294 Call this when user settings change or user logs out.
296 Args:
297 username: User whose cache to invalidate
299 Returns:
300 True if cache entry was removed, False if not found
301 """
302 with self._settings_cache_lock:
303 if username in self._settings_cache:
304 del self._settings_cache[username]
305 logger.debug(
306 f"[SETTINGS_CACHE] Invalidated cache for {username}"
307 )
308 return True
309 return False
311 def invalidate_all_settings_cache(self) -> int:
312 """
313 Invalidate all cached settings.
315 Call this when doing bulk settings updates or during config reload.
317 Returns:
318 Number of cache entries cleared
319 """
320 with self._settings_cache_lock:
321 count = len(self._settings_cache)
322 self._settings_cache.clear()
323 logger.info(
324 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)"
325 )
326 return count
328 def start(self):
329 """Start the scheduler."""
330 if not self.config.get("enabled", True):
331 logger.info("News scheduler is disabled in settings")
332 return
334 if self.is_running:
335 logger.warning("Scheduler is already running")
336 return
338 if self._app is None: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true
339 raise RuntimeError(
340 "NewsScheduler.set_app() must be called before start()"
341 )
343 # Schedule cleanup job
344 self.scheduler.add_job(
345 self._wrap_job(self._run_cleanup_with_tracking),
346 "interval",
347 hours=self.config["cleanup_interval_hours"],
348 id="cleanup_inactive_users",
349 name="Cleanup Inactive User Sessions",
350 jitter=60, # Add some jitter to cleanup
351 )
353 # Schedule configuration reload
354 self.scheduler.add_job(
355 self._wrap_job(self._reload_config),
356 "interval",
357 minutes=30,
358 id="reload_config",
359 name="Reload Configuration",
360 )
362 # Start the scheduler
363 self.scheduler.start()
364 self.is_running = True
366 # Schedule initial cleanup after a delay
367 self.scheduler.add_job(
368 self._wrap_job(self._run_cleanup_with_tracking),
369 "date",
370 run_date=datetime.now(UTC) + timedelta(seconds=30),
371 id="initial_cleanup",
372 )
374 logger.info("News scheduler started")
376 def stop(self):
377 """Stop the scheduler."""
378 if self.is_running:
379 self.scheduler.shutdown(wait=True)
380 self.is_running = False
382 # Clear all user sessions and credentials
383 with self.lock:
384 for username in self.user_sessions:
385 self._credential_store.clear(username)
386 self.user_sessions.clear()
388 logger.info("News scheduler stopped")
390 def update_user_info(self, username: str, password: str):
391 """
392 Update user info in scheduler. Called on every database interaction.
394 Args:
395 username: User's username
396 password: User's password
397 """
398 logger.info(
399 f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
400 )
401 logger.debug(
402 f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
403 )
405 if not self.is_running:
406 logger.warning(
407 f"[SCHEDULER] Scheduler not running, cannot update user {username}"
408 )
409 return
411 with self.lock:
412 # Store password in credential store (inside lock to prevent
413 # race where concurrent calls leave mismatched credentials)
414 self._credential_store.store(username, password)
416 now = datetime.now(UTC)
418 if username not in self.user_sessions:
419 # New user - create session info
420 logger.info(f"[SCHEDULER] New user in scheduler: {username}")
421 self.user_sessions[username] = {
422 "last_activity": now,
423 "scheduled_jobs": set(),
424 }
425 logger.debug(
426 f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
427 )
428 # Schedule their subscriptions
429 self._schedule_user_subscriptions(username)
430 else:
431 # Existing user - update info
432 logger.info(
433 f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
434 )
435 old_activity = self.user_sessions[username]["last_activity"]
436 activity_delta = now - old_activity
437 logger.debug(
438 f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
439 )
441 self.user_sessions[username]["last_activity"] = now
442 logger.debug(
443 f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
444 )
445 # Reschedule their subscriptions in case they changed
446 self._schedule_user_subscriptions(username)
    def unregister_user(self, username: str):
        """
        Unregister a user and clean up their scheduled jobs.
        Called when user logs out.
        """
        with self.lock:
            if username in self.user_sessions:
                logger.info(f"Unregistering user {username}")

                # Remove all scheduled jobs for this user
                session_info = self.user_sessions[username]
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        # Job already gone from APScheduler — nothing to do.
                        pass

                # Remove user session and clear credentials atomically
                del self.user_sessions[username]
                self._credential_store.clear(username)

                # Invalidate settings cache for this user (outside lock)
                # NOTE(review): despite the comment above, this call appears
                # to run while self.lock is still held; it only acquires
                # _settings_cache_lock so there is no deadlock, but the
                # intended placement should be confirmed.
                self.invalidate_user_settings_cache(username)
                logger.info(f"User {username} unregistered successfully")
473 def _schedule_user_subscriptions(self, username: str):
474 """Schedule all active subscriptions for a user."""
475 logger.info(f"_schedule_user_subscriptions called for {username}")
476 try:
477 session_info = self.user_sessions.get(username)
478 if not session_info:
479 logger.warning(f"No session info found for {username}")
480 return
482 password = self._credential_store.retrieve(username)
483 if not password: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 logger.warning(
485 f"Credentials expired for {username}, skipping subscription scheduling"
486 )
487 return
488 logger.debug(f"Got password for {username}: present")
490 # Get user's subscriptions from their encrypted database
491 from ...database.session_context import get_user_db_session
492 from ...database.models.news import NewsSubscription
494 with get_user_db_session(username, password) as db:
495 subscriptions = (
496 db.query(NewsSubscription).filter_by(is_active=True).all()
497 )
498 logger.debug(
499 f"Query executed, found {len(subscriptions)} results"
500 )
502 # Log details of each subscription
503 for sub in subscriptions:
504 logger.debug(
505 f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
506 )
508 logger.info(
509 f"Found {len(subscriptions)} active subscriptions for {username}"
510 )
512 # Clear old jobs for this user
513 for job_id in session_info["scheduled_jobs"].copy():
514 try:
515 self.scheduler.remove_job(job_id)
516 session_info["scheduled_jobs"].remove(job_id)
517 except JobLookupError:
518 pass
520 # Schedule each subscription with jitter
521 for sub in subscriptions:
522 job_id = f"{username}_{sub.id}"
524 # Calculate jitter
525 # Security: random jitter to distribute subscription timing, not security-sensitive
526 max_jitter = int(self.config.get("max_jitter_seconds", 300))
527 jitter = random.randint(0, max_jitter)
529 # Determine trigger based on frequency
530 refresh_minutes = sub.refresh_interval_minutes
532 if refresh_minutes <= 60: # 60 minutes or less
533 # For hourly or more frequent, use interval trigger
534 trigger = "interval"
535 trigger_args = {
536 "minutes": refresh_minutes,
537 "jitter": jitter,
538 "start_date": datetime.now(UTC), # Start immediately
539 }
540 else:
541 # For less frequent, calculate next run time
542 now = datetime.now(UTC)
543 if sub.next_refresh:
544 # Ensure timezone-aware for comparison with now (UTC)
545 next_refresh_aware = sub.next_refresh
546 if next_refresh_aware.tzinfo is None:
547 logger.warning(
548 f"Subscription {sub.id} has naive (non-tz-aware) "
549 f"next_refresh datetime, assuming UTC"
550 )
551 next_refresh_aware = next_refresh_aware.replace(
552 tzinfo=UTC
553 )
554 if next_refresh_aware <= now:
555 # Subscription is overdue - run it immediately with small jitter
556 logger.info(
557 f"Subscription {sub.id} is overdue, scheduling immediate run"
558 )
559 next_run = now + timedelta(seconds=jitter)
560 else:
561 next_run = next_refresh_aware
562 else:
563 next_run = now + timedelta(
564 minutes=refresh_minutes, seconds=jitter
565 )
567 trigger = "date"
568 trigger_args = {"run_date": next_run}
570 # Add the job
571 self.scheduler.add_job(
572 func=self._wrap_job(self._check_subscription),
573 args=[username, sub.id],
574 trigger=trigger,
575 id=job_id,
576 name=f"Check {sub.name or sub.query_or_topic[:30]}",
577 replace_existing=True,
578 **trigger_args,
579 )
581 session_info["scheduled_jobs"].add(job_id)
582 logger.info(f"Scheduled job {job_id} with {trigger} trigger")
584 except Exception:
585 logger.exception(f"Error scheduling subscriptions for {username}")
587 # Add document processing for this user
588 self._schedule_document_processing(username)
590 def _schedule_document_processing(self, username: str):
591 """Schedule document processing for a user."""
592 logger.info(
593 f"[DOC_SCHEDULER] Scheduling document processing for {username}"
594 )
595 logger.debug(
596 f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
597 )
599 try:
600 session_info = self.user_sessions.get(username)
601 if not session_info:
602 logger.warning(
603 f"[DOC_SCHEDULER] No session info found for {username}"
604 )
605 logger.debug(
606 f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
607 )
608 return
610 logger.debug(
611 f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
612 )
614 # Get user's document scheduler settings (cached)
615 settings = self._get_document_scheduler_settings(username)
617 if not settings.enabled:
618 logger.info(
619 f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
620 )
621 return
623 logger.info(
624 f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
625 f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
626 f"text={settings.extract_text}, rag={settings.generate_rag}"
627 )
629 # Schedule document processing job
630 job_id = f"{username}_document_processing"
631 logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")
633 # Remove existing document job if any
634 try:
635 self.scheduler.remove_job(job_id)
636 session_info["scheduled_jobs"].discard(job_id)
637 logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
638 except JobLookupError:
639 logger.debug(
640 f"[DOC_SCHEDULER] No existing job {job_id} to remove"
641 )
642 pass # Job doesn't exist, that's fine
644 # Add new document processing job
645 logger.debug(
646 f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
647 )
648 self.scheduler.add_job(
649 func=self._wrap_job(self._process_user_documents),
650 args=[username],
651 trigger="interval",
652 seconds=settings.interval_seconds,
653 id=job_id,
654 name=f"Process Documents for {username}",
655 jitter=30, # Add small jitter to prevent multiple users from processing simultaneously
656 max_instances=1, # Prevent overlapping document processing for same user
657 replace_existing=True,
658 )
660 session_info["scheduled_jobs"].add(job_id)
661 logger.info(
662 f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
663 )
664 logger.debug(
665 f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
666 )
668 # Verify job was added
669 job = self.scheduler.get_job(job_id)
670 if job:
671 logger.info(
672 f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
673 )
674 else:
675 logger.error(
676 f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
677 )
679 except Exception:
680 logger.exception(
681 f"Error scheduling document processing for {username}"
682 )
684 @thread_cleanup
685 def _process_user_documents(self, username: str):
686 """Process documents for a user."""
687 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}")
688 start_time = datetime.now(UTC)
690 try:
691 session_info = self.user_sessions.get(username)
692 if not session_info:
693 logger.warning(
694 f"[DOC_SCHEDULER] No session info found for user {username}"
695 )
696 return
698 password = self._credential_store.retrieve(username)
699 if not password:
700 logger.warning(
701 f"[DOC_SCHEDULER] Credentials expired for user {username}"
702 )
703 return
704 logger.debug(
705 f"[DOC_SCHEDULER] Starting document processing for {username}"
706 )
708 # Get user's document scheduler settings (cached)
709 settings = self._get_document_scheduler_settings(username)
711 logger.info(
712 f"[DOC_SCHEDULER] Processing settings for {username}: "
713 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}"
714 )
716 if not any(
717 [
718 settings.download_pdfs,
719 settings.extract_text,
720 settings.generate_rag,
721 ]
722 ):
723 logger.info(
724 f"[DOC_SCHEDULER] No processing options enabled for user {username}"
725 )
726 return
728 # Parse last_run from cached settings
729 last_run = (
730 datetime.fromisoformat(settings.last_run)
731 if settings.last_run
732 else None
733 )
735 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}")
737 # Need database session for queries and updates
738 from ...database.session_context import get_user_db_session
739 from ...database.models.research import ResearchHistory
740 from ...settings.manager import SettingsManager
742 with get_user_db_session(username, password) as db:
743 settings_manager = SettingsManager(db)
745 # Query for completed research since last run
746 logger.debug(
747 f"[DOC_SCHEDULER] Querying for completed research since {last_run}"
748 )
749 query = db.query(ResearchHistory).filter(
750 ResearchHistory.status == ResearchStatus.COMPLETED,
751 ResearchHistory.completed_at.is_not(
752 None
753 ), # Ensure completed_at is not null
754 )
756 if last_run:
757 query = query.filter(
758 ResearchHistory.completed_at > last_run
759 )
761 # Limit to recent research to prevent overwhelming
762 query = query.order_by(
763 ResearchHistory.completed_at.desc()
764 ).limit(20)
766 research_sessions = query.all()
767 logger.debug(
768 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions"
769 )
771 if not research_sessions:
772 logger.info(
773 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}"
774 )
775 return
777 logger.info(
778 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}"
779 )
781 # Log details of each research session
782 for i, research in enumerate(
783 research_sessions[:5]
784 ): # Log first 5 details
785 title_safe = (
786 (research.title[:50] + "...")
787 if research.title
788 else "No title"
789 )
790 completed_safe = (
791 research.completed_at
792 if research.completed_at
793 else "No completion time"
794 )
795 logger.debug(
796 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}"
797 )
799 # Handle completed_at which might be a string or datetime
800 completed_at_obj = None
801 if research.completed_at:
802 if isinstance(research.completed_at, str):
803 try:
804 completed_at_obj = datetime.fromisoformat(
805 research.completed_at.replace("Z", "+00:00")
806 )
807 except (ValueError, TypeError, AttributeError):
808 completed_at_obj = None
809 else:
810 completed_at_obj = research.completed_at
812 logger.debug(
813 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}"
814 )
815 logger.debug(
816 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}"
817 )
818 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}")
819 logger.debug(
820 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}"
821 )
823 processed_count = 0
824 for research in research_sessions:
825 try:
826 logger.info(
827 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}"
828 )
830 # Call actual processing APIs
831 if settings.download_pdfs:
832 logger.info(
833 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}"
834 )
835 try:
836 # Use the DownloadService to queue PDF downloads
837 from ...research_library.services.download_service import (
838 DownloadService,
839 )
841 with DownloadService(
842 username=username, password=password
843 ) as download_service:
844 queued_count = download_service.queue_research_downloads(
845 research.id
846 )
847 logger.info(
848 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}"
849 )
850 except Exception:
851 logger.exception(
852 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}"
853 )
855 if settings.extract_text:
856 logger.info(
857 f"[DOC_SCHEDULER] Extracting text for research {research.id}"
858 )
859 try:
860 # Use the DownloadService to extract text for all resources
861 from ...research_library.services.download_service import (
862 DownloadService,
863 )
864 from ...database.models.research import (
865 ResearchResource,
866 )
868 from ...research_library.utils import (
869 is_downloadable_url,
870 )
872 with DownloadService(
873 username=username, password=password
874 ) as download_service:
875 # Get all resources for this research (reuse existing db session)
876 all_resources = (
877 db.query(ResearchResource)
878 .filter_by(research_id=research.id)
879 .all()
880 )
881 # Filter: only process downloadable resources (academic/PDF)
882 resources = [
883 r
884 for r in all_resources
885 if is_downloadable_url(r.url)
886 ]
887 processed_count = 0
888 for resource in resources:
889 # We need to pass the password to the download service
890 # The DownloadService creates its own database sessions, so we need to ensure password is available
891 try:
892 success, error = (
893 download_service.download_as_text(
894 resource.id
895 )
896 )
897 if success:
898 processed_count += 1
899 logger.info(
900 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}"
901 )
902 else:
903 logger.warning(
904 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}"
905 )
906 except Exception as resource_error:
907 logger.exception(
908 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}"
909 )
910 logger.info(
911 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed"
912 )
913 except Exception:
914 logger.exception(
915 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}"
916 )
918 if settings.generate_rag:
919 logger.info(
920 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}"
921 )
922 try:
923 # Get embedding settings from user configuration
924 embedding_model = settings_manager.get_setting(
925 "local_search_embedding_model",
926 "all-MiniLM-L6-v2",
927 )
928 embedding_provider = (
929 settings_manager.get_setting(
930 "local_search_embedding_provider",
931 "sentence_transformers",
932 )
933 )
934 chunk_size = int(
935 settings_manager.get_setting(
936 "local_search_chunk_size", 1000
937 )
938 )
939 chunk_overlap = int(
940 settings_manager.get_setting(
941 "local_search_chunk_overlap", 200
942 )
943 )
945 # Initialize RAG service with user's embedding configuration
946 with LibraryRAGService(
947 username=username,
948 embedding_model=embedding_model,
949 embedding_provider=embedding_provider,
950 chunk_size=chunk_size,
951 chunk_overlap=chunk_overlap,
952 db_password=password,
953 ) as rag_service:
954 # Get default Library collection ID
955 library_collection_id = (
956 get_default_library_id(
957 username, password
958 )
959 )
961 # Query for unindexed documents from this research session
962 documents_to_index = (
963 db.query(Document.id, Document.title)
964 .outerjoin(
965 DocumentCollection,
966 (
967 DocumentCollection.document_id
968 == Document.id
969 )
970 & (
971 DocumentCollection.collection_id
972 == library_collection_id
973 ),
974 )
975 .filter(
976 Document.research_id == research.id,
977 Document.text_content.isnot(None),
978 (
979 DocumentCollection.indexed.is_(
980 False
981 )
982 | DocumentCollection.id.is_(
983 None
984 )
985 ),
986 )
987 .all()
988 )
990 if not documents_to_index:
991 logger.info(
992 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}"
993 )
994 else:
995 # Index each document
996 indexed_count = 0
997 for (
998 doc_id,
999 doc_title,
1000 ) in documents_to_index:
1001 try:
1002 result = rag_service.index_document(
1003 document_id=doc_id,
1004 collection_id=library_collection_id,
1005 force_reindex=False,
1006 )
1007 if ( 1007 ↛ 997line 1007 didn't jump to line 997 because the condition on line 1007 was always true
1008 result["status"]
1009 == "success"
1010 ):
1011 indexed_count += 1
1012 logger.info(
1013 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) "
1014 f"with {result.get('chunk_count', 0)} chunks"
1015 )
1016 except Exception as doc_error:
1017 logger.exception(
1018 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}"
1019 )
1021 logger.info(
1022 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: "
1023 f"{indexed_count}/{len(documents_to_index)} documents indexed"
1024 )
1025 except Exception:
1026 logger.exception(
1027 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}"
1028 )
1030 processed_count += 1
1031 logger.debug(
1032 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}"
1033 )
1035 except Exception:
1036 logger.exception(
1037 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}"
1038 )
1040 # Update last run time in user's settings
1041 current_time = datetime.now(UTC).isoformat()
1042 settings_manager.set_setting(
1043 "document_scheduler.last_run", current_time, commit=True
1044 )
1045 logger.debug(
1046 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}"
1047 )
1049 end_time = datetime.now(UTC)
1050 duration = (end_time - start_time).total_seconds()
1051 logger.info(
1052 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s"
1053 )
1055 except Exception:
1056 logger.exception(
1057 f"[DOC_SCHEDULER] Error processing documents for user {username}"
1058 )
1060 def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
1061 """Get document scheduler status for a specific user."""
1062 try:
1063 session_info = self.user_sessions.get(username)
1064 if not session_info:
1065 return {
1066 "enabled": False,
1067 "message": "User not found in scheduler",
1068 }
1070 # Get user's document scheduler settings (cached)
1071 settings = self._get_document_scheduler_settings(username)
1073 # Check if user has document processing job
1074 job_id = f"{username}_document_processing"
1075 has_job = job_id in session_info.get("scheduled_jobs", set())
1077 return {
1078 "enabled": settings.enabled,
1079 "interval_seconds": settings.interval_seconds,
1080 "processing_options": {
1081 "download_pdfs": settings.download_pdfs,
1082 "extract_text": settings.extract_text,
1083 "generate_rag": settings.generate_rag,
1084 },
1085 "last_run": settings.last_run,
1086 "has_scheduled_job": has_job,
1087 "user_active": username in self.user_sessions,
1088 }
1090 except Exception as e:
1091 logger.exception(
1092 f"Error getting document scheduler status for user {username}"
1093 )
1094 return {
1095 "enabled": False,
1096 "message": f"Failed to retrieve scheduler status: {type(e).__name__}",
1097 }
1099 def trigger_document_processing(self, username: str) -> bool:
1100 """Trigger immediate document processing for a user."""
1101 logger.info(
1102 f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
1103 )
1104 try:
1105 session_info = self.user_sessions.get(username)
1106 if not session_info:
1107 logger.warning(
1108 f"[DOC_SCHEDULER] User {username} not found in scheduler"
1109 )
1110 logger.debug(
1111 f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
1112 )
1113 return False
1115 if not self.is_running:
1116 logger.warning(
1117 f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
1118 )
1119 return False
1121 # Trigger immediate processing
1122 job_id = f"{username}_document_processing_manual"
1123 logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}")
1125 self.scheduler.add_job(
1126 func=self._wrap_job(self._process_user_documents),
1127 args=[username],
1128 trigger="date",
1129 run_date=datetime.now(UTC) + timedelta(seconds=1),
1130 id=job_id,
1131 name=f"Manual Document Processing for {username}",
1132 replace_existing=True,
1133 )
1135 # Verify job was added
1136 job = self.scheduler.get_job(job_id)
1137 if job:
1138 logger.info(
1139 f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}"
1140 )
1141 else:
1142 logger.error(
1143 f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!"
1144 )
1145 return False
1147 return True
1149 except Exception:
1150 logger.exception(
1151 f"[DOC_SCHEDULER] Error triggering document processing for user {username}"
1152 )
1153 return False
    @thread_cleanup
    def _check_user_overdue_subscriptions(self, username: str) -> None:
        """Check and immediately run any overdue subscriptions for a user.

        Queries the user's encrypted database for active subscriptions whose
        ``next_refresh`` is already in the past, and schedules a one-shot
        job for each with a small random delay so a backlog doesn't fire all
        at once.  Silently no-ops when the user session or the TTL-stored
        credentials are gone; all other errors are logged, never raised.
        """
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer tracked by the scheduler.
                return

            password = self._credential_store.retrieve(username)
            if not password:
                # Credentials expired from the TTL store; cannot open the DB.
                return

            # Get user's overdue subscriptions
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription
            from datetime import timezone

            with get_user_db_session(username, password) as db:
                now = datetime.now(timezone.utc)
                # Active subscriptions with a set refresh time that has passed.
                overdue_subs = (
                    db.query(NewsSubscription)
                    .filter(
                        NewsSubscription.is_active.is_(True),
                        NewsSubscription.next_refresh.is_not(None),
                        NewsSubscription.next_refresh <= now,
                    )
                    .all()
                )

                if overdue_subs:
                    logger.info(
                        f"Found {len(overdue_subs)} overdue subscriptions for {username}"
                    )

                for sub in overdue_subs:
                    # Run immediately with small random delay
                    # Security: random delay to stagger overdue jobs, not security-sensitive
                    delay_seconds = random.randint(1, 30)
                    # Timestamp in the id keeps repeated overdue runs distinct.
                    job_id = (
                        f"overdue_{username}_{sub.id}_{int(now.timestamp())}"
                    )

                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, sub.id],
                        trigger="date",
                        run_date=now + timedelta(seconds=delay_seconds),
                        id=job_id,
                        name=f"Overdue: {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                    )

                    logger.info(
                        f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds"
                    )

        except Exception:
            logger.exception(
                f"Error checking overdue subscriptions for {username}"
            )
    @thread_cleanup
    def _check_subscription(self, username: str, subscription_id: int) -> None:
        """Check and refresh a single subscription.

        Scheduled-job entry point: loads the subscription from the user's
        encrypted DB, substitutes today's date into the query when it uses
        the ``YYYY-MM-DD`` placeholder, bumps last/next refresh timestamps,
        runs the research synchronously, and — for date-trigger jobs —
        reschedules itself for the next interval with random jitter.
        Cancels its own job if the user is no longer active.
        """
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    # Job already gone; nothing to cancel.
                    pass
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.warning(
                    f"Credentials expired for {username}, skipping subscription check"
                )
                return

            # Get subscription details
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                # NOTE(review): legacy Query.get(); SQLAlchemy 2.0 prefers
                # db.get(NewsSubscription, subscription_id) — confirm the
                # pinned SQLAlchemy version before changing.
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from ..core.utils import get_local_date_string
                    from ...settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                # Detach the fields we need so the research call doesn't
                # depend on the live ORM object.
                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously using requests with proper auth
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception:
            logger.exception(f"Error checking subscription {subscription_id}")
    @thread_cleanup
    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ) -> None:
        """Trigger research for a subscription using programmatic API.

        Builds a settings snapshot from the user's encrypted DB (honouring
        the subscription's own search engine when set), installs it as this
        thread's settings context, runs a single-iteration ``quick_summary``
        tagged as a news search, then persists the result via
        ``_store_research_result``.  Errors are logged, never raised.
        """
        from ...config.thread_settings import set_settings_context

        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.error(f"Credentials expired for user {username}")
                return

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Use the search engine from the subscription if specified
                search_engine = subscription.get("search_engine")

                if search_engine:
                    # Override the snapshot entry in the shape the settings
                    # system expects ({"value": ..., "ui_element": ...}).
                    settings_snapshot["search.tool"] = {
                        "value": search_engine,
                        "ui_element": "select",
                    }
                    logger.info(
                        f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                    )
                else:
                    # Use the user's default search tool from their settings
                    default_search_tool = settings_snapshot.get(
                        "search.tool", "auto"
                    )
                    logger.info(
                        f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                    )

                logger.debug(
                    f"Settings snapshot has {len(settings_snapshot)} settings"
                )
                # Log a few key settings to verify they're present
                logger.debug(
                    f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
                )

            # Set up research parameters
            query = subscription["query"]

            # Build metadata for news search
            metadata = {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription["id"],
                "triggered_by": "scheduler",
                "subscription_name": subscription["name"],
                "title": subscription["name"] if subscription["name"] else None,
                "scheduled_at": datetime.now(UTC).isoformat(),
                "original_query": subscription["original_query"],
                "user_id": username,
            }

            # Use programmatic API with settings context
            from ...api.research_functions import quick_summary

            # Create and set settings context for this thread
            settings_context = SnapshotSettingsContext(settings_snapshot)
            set_settings_context(settings_context)

            # Get search strategy from subscription data (for the API call)
            search_strategy = subscription.get(
                "search_strategy", "news_aggregation"
            )

            # Call quick_summary with appropriate parameters
            result = quick_summary(
                query=query,
                research_id=research_id,
                username=username,
                user_password=password,
                settings_snapshot=settings_snapshot,
                search_strategy=search_strategy,
                model_name=subscription.get("model"),
                provider=subscription.get("model_provider"),
                iterations=1,  # Single iteration for news
                metadata=metadata,
                search_original_query=False,  # Don't send long subscription prompts to search engines
            )

            logger.info(
                f"Completed research {research_id} for subscription {subscription['id']}"
            )

            # Store the research result in the database
            self._store_research_result(
                username,
                password,
                research_id,
                subscription["id"],
                result,
                subscription,
            )

        except Exception:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}"
            )
    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ) -> None:
        """Store research result in database for news display.

        Post-processes the raw research result (report text, sources list,
        citation formatting), generates a headline and topics for the news
        feed, writes a ``ResearchHistory`` row, and saves the report body
        through the report-storage abstraction.  Errors are logged, never
        raised.

        Args:
            username: Owner of the encrypted per-user database.
            password: Plaintext password used to open that database.
            research_id: UUID assigned to this research run.
            subscription_id: ID of the subscription that triggered the run.
            result: Raw result dict returned by ``quick_summary``.
            subscription: Subscription data dict (name, query, models, ...).
        """
        try:
            from ...database.session_context import get_user_db_session
            from ...database.models import ResearchHistory
            from ...settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries."""
                if hasattr(obj, "dict"):
                    # e.g. pydantic-style objects exposing .dict()
                    return obj.dict()
                if hasattr(obj, "__dict__"):
                    # Generic objects: keep only public attributes.
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                if isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                if isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ...utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                if report_content:
                    # Import citation formatter
                    from ...text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ...config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    # Unknown format names fall back to the default mode.
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ...news.utils.headline_generator import generate_headline
                from ...news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fallback
                if generated_headline:
                    title = generated_headline
                else:
                    if subscription_name:
                        title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                    else:
                        title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                # NOTE(review): status is a plain string; ResearchStatus is
                # imported at module top — consider using the constant.
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using storage abstraction
                from ...storage import get_report_storage

                # Use storage to save the report (report_content already retrieved above)
                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

                logger.info(
                    f"Stored research result {research_id} for subscription {subscription_id}"
                )

        except Exception:
            logger.exception("Error storing research result")
1625 def _run_cleanup_with_tracking(self):
1626 """Wrapper that tracks cleanup execution."""
1628 try:
1629 cleaned_count = self._cleanup_inactive_users()
1631 logger.info(
1632 f"Cleanup successful: removed {cleaned_count} inactive users"
1633 )
1635 except Exception:
1636 logger.exception("Cleanup job failed")
1638 def _cleanup_inactive_users(self) -> int:
1639 """Remove users inactive for longer than retention period."""
1640 retention_hours = self.config.get("retention_hours", 48)
1641 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours)
1643 cleaned_count = 0
1645 with self.lock:
1646 inactive_users = [
1647 user_id
1648 for user_id, session in self.user_sessions.items()
1649 if session["last_activity"] < cutoff
1650 ]
1652 for user_id in inactive_users:
1653 # Remove all scheduled jobs
1654 for job_id in self.user_sessions[user_id][
1655 "scheduled_jobs"
1656 ].copy():
1657 try:
1658 self.scheduler.remove_job(job_id)
1659 except JobLookupError:
1660 pass
1662 # Clear credentials and session data
1663 self._credential_store.clear(user_id)
1664 del self.user_sessions[user_id]
1665 cleaned_count += 1
1666 logger.info(f"Cleaned up inactive user {user_id}")
1668 return cleaned_count
1670 def _reload_config(self):
1671 """Reload configuration from settings manager."""
1672 if not hasattr(self, "settings_manager") or not self.settings_manager:
1673 return
1675 try:
1676 old_retention = self.config.get("retention_hours", 48)
1678 # Reload all settings
1679 for key in self.config:
1680 if key == "enabled":
1681 continue # Don't change enabled state while running
1683 full_key = f"news.scheduler.{key}"
1684 self.config[key] = self._get_setting(full_key, self.config[key])
1686 # Handle changes that need immediate action
1687 if old_retention != self.config["retention_hours"]:
1688 logger.info(
1689 f"Retention period changed from {old_retention} "
1690 f"to {self.config['retention_hours']} hours"
1691 )
1692 # Trigger immediate cleanup with new retention
1693 self.scheduler.add_job(
1694 self._wrap_job(self._run_cleanup_with_tracking),
1695 "date",
1696 run_date=datetime.now(UTC) + timedelta(seconds=5),
1697 id="immediate_cleanup_config_change",
1698 )
1700 # Clear settings cache to pick up any user setting changes
1701 self.invalidate_all_settings_cache()
1703 except Exception:
1704 logger.exception("Error reloading configuration")
1706 def get_status(self) -> Dict[str, Any]:
1707 """Get scheduler status information."""
1708 with self.lock:
1709 active_users = len(self.user_sessions)
1710 total_jobs = sum(
1711 len(session["scheduled_jobs"])
1712 for session in self.user_sessions.values()
1713 )
1715 # Get next run time for cleanup job
1716 next_cleanup = None
1717 if self.is_running:
1718 job = self.scheduler.get_job("cleanup_inactive_users")
1719 if job: 1719 ↛ 1722line 1719 didn't jump to line 1722 because the condition on line 1719 was always true
1720 next_cleanup = job.next_run_time
1722 return {
1723 "is_running": self.is_running,
1724 "config": self.config,
1725 "active_users": active_users,
1726 "total_scheduled_jobs": total_jobs,
1727 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None,
1728 "memory_usage": self._estimate_memory_usage(),
1729 }
1731 def _estimate_memory_usage(self) -> int:
1732 """Estimate memory usage of user sessions."""
1734 # Rough estimate: username (50) + password (100) + metadata (200) per user
1735 per_user_estimate = 350
1736 return len(self.user_sessions) * per_user_estimate
1738 def get_user_sessions_summary(self) -> List[Dict[str, Any]]:
1739 """Get summary of active user sessions (without passwords)."""
1740 with self.lock:
1741 summary = []
1742 for user_id, session in self.user_sessions.items():
1743 summary.append(
1744 {
1745 "user_id": user_id,
1746 "last_activity": session["last_activity"].isoformat(),
1747 "scheduled_jobs": len(session["scheduled_jobs"]),
1748 "time_since_activity": str(
1749 datetime.now(UTC) - session["last_activity"]
1750 ),
1751 }
1752 )
1753 return summary
# Singleton instance getter
_scheduler_instance = None
# Guards first construction: this module runs work on background threads,
# so the unsynchronized check-then-create could build two schedulers.
_scheduler_instance_lock = threading.Lock()


def get_news_scheduler() -> NewsScheduler:
    """Get the singleton news scheduler instance.

    Thread-safe lazy initialization: the lock ensures exactly one
    ``NewsScheduler`` is ever constructed even when the first call races
    across threads.  The unlocked fast path keeps steady-state calls cheap
    (reference reads are atomic under the GIL).
    """
    global _scheduler_instance
    if _scheduler_instance is None:
        with _scheduler_instance_lock:
            # Re-check inside the lock: another thread may have won the race.
            if _scheduler_instance is None:
                _scheduler_instance = NewsScheduler()
    return _scheduler_instance