Coverage for src/local_deep_research/scheduler/background.py: 89% (668 statements)
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-19 11:23 +0000
1"""
2Activity-based news subscription scheduler for per-user encrypted databases.
3Tracks user activity and temporarily stores credentials for automatic updates.
4"""
6import random
7import threading
8from dataclasses import dataclass
9from datetime import datetime, timedelta, UTC
10from functools import wraps
11from typing import Any, Callable, Dict, List
13from cachetools import TTLCache
14from loguru import logger
15from ..settings.logger import log_settings
16from ..settings.manager import SnapshotSettingsContext
18from apscheduler.schedulers.background import BackgroundScheduler
19from apscheduler.jobstores.base import JobLookupError
20from ..constants import ResearchStatus
21from ..database.credential_store_base import CredentialStoreBase
22from ..database.thread_local_session import thread_cleanup
24# RAG indexing imports
25from ..research_library.services.library_rag_service import LibraryRAGService
26from ..database.library_init import get_default_library_id
27from ..database.models.library import Document, DocumentCollection
# Compatibility flag for callers that probe whether a scheduler backend is
# installed; APScheduler is now a hard dependency, so this is constant.
SCHEDULER_AVAILABLE = True  # Always available since it's a required dependency
class SchedulerCredentialStore(CredentialStoreBase):
    """In-memory, TTL-expiring password store for the news scheduler.

    Background jobs need each user's password to open their encrypted
    per-user database; entries expire automatically after the TTL so
    credentials are not retained indefinitely.
    """

    def __init__(self, ttl_hours: int = 48):
        # The base class expects its TTL in seconds.
        super().__init__(ttl_hours * 3600)

    def store(self, username: str, password: str) -> None:
        """Remember *password* for *username* until the TTL elapses."""
        payload = {"username": username, "password": password}
        self._store_credentials(username, payload)

    def retrieve(self, username: str) -> str | None:
        """Return the stored password, or None when expired or missing."""
        entry = self._retrieve_credentials(username, remove=False)
        if not entry:
            return None
        return entry[1]

    def clear(self, username: str) -> None:
        """Forget any stored password for *username*."""
        self.clear_entry(username)
@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """
    Immutable snapshot of a user's document-scheduler configuration.

    Frozen, so an instance can be handed to background threads without
    any locking concerns.
    """

    # Whether the per-user document scheduler runs at all.
    enabled: bool = True
    # Interval between scheduler ticks (default 30 minutes).
    interval_seconds: int = 1800
    # Processing options toggled per user.
    download_pdfs: bool = False
    extract_text: bool = True
    generate_rag: bool = False
    # ISO-8601 timestamp of the previous run, empty string if never run.
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Return a snapshot carrying every default value."""
        return cls()
81class BackgroundJobScheduler:
82 """
83 Singleton scheduler that manages news subscriptions for active users.
85 This scheduler:
86 - Monitors user activity through database access
87 - Temporarily stores user credentials in memory
88 - Automatically schedules subscription checks
89 - Cleans up inactive users after configurable period
90 """
92 _instance = None
93 _lock = threading.Lock()
95 def __new__(cls):
96 """Ensure singleton instance."""
97 if cls._instance is None:
98 with cls._lock:
99 if cls._instance is None: 99 ↛ 101line 99 didn't jump to line 101
100 cls._instance = super().__new__(cls)
101 return cls._instance
    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton).

        __new__ always returns the same object, so a guard attribute
        short-circuits any repeated initialization.
        """
        # Skip if already initialized
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Credential store with TTL-based expiration
        self._credential_store = SchedulerCredentialStore(ttl_hours=48)

        # Scheduler instance
        self.scheduler = BackgroundScheduler()

        # Configuration (will be loaded from settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False
        self._app = None  # Flask app reference for background job contexts

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        self._initialized = True
        logger.info("News scheduler initialized")
134 def _load_default_config(self) -> Dict[str, Any]:
135 """Load default configuration (will be overridden by settings manager)."""
136 return {
137 "enabled": True,
138 "retention_hours": 48,
139 "cleanup_interval_hours": 1,
140 "max_jitter_seconds": 300,
141 "max_concurrent_jobs": 10,
142 "subscription_batch_size": 5,
143 "activity_check_interval_minutes": 5,
144 }
146 def initialize_with_settings(self, settings_manager):
147 """Initialize configuration from settings manager."""
148 try:
149 # Load all scheduler settings
150 self.settings_manager = settings_manager
151 self.config = {
152 "enabled": self._get_setting("news.scheduler.enabled", True),
153 "retention_hours": self._get_setting(
154 "news.scheduler.retention_hours", 48
155 ),
156 "cleanup_interval_hours": self._get_setting(
157 "news.scheduler.cleanup_interval_hours", 1
158 ),
159 "max_jitter_seconds": self._get_setting(
160 "news.scheduler.max_jitter_seconds", 300
161 ),
162 "max_concurrent_jobs": self._get_setting(
163 "news.scheduler.max_concurrent_jobs", 10
164 ),
165 "subscription_batch_size": self._get_setting(
166 "news.scheduler.batch_size", 5
167 ),
168 "activity_check_interval_minutes": self._get_setting(
169 "news.scheduler.activity_check_interval", 5
170 ),
171 }
172 log_settings(self.config, "Scheduler configuration loaded")
173 except Exception:
174 logger.exception("Error loading scheduler settings")
175 # Keep default config
177 def _get_setting(self, key: str, default: Any) -> Any:
178 """Get setting with fallback to default."""
179 if hasattr(self, "settings_manager") and self.settings_manager:
180 return self.settings_manager.get_setting(key, default=default)
181 return default
    def set_app(self, app) -> None:
        """Store a reference to the Flask app for creating app contexts in background jobs.

        Must be called before start(); _wrap_job uses this reference to
        push an application context around each scheduled job.
        """
        self._app = app
187 def _wrap_job(self, func: Callable) -> Callable:
188 """Wrap a scheduler job function so it runs inside a Flask app context.
190 APScheduler runs jobs in a thread pool without Flask context.
191 This wrapper pushes an app context before the job runs and pops it after.
192 """
194 @wraps(func)
195 def wrapper(*args, **kwargs):
196 if self._app is not None: 196 ↛ 200line 196 didn't jump to line 200 because the condition on line 196 was always true
197 with self._app.app_context():
198 return func(*args, **kwargs)
199 else:
200 logger.warning(
201 f"No Flask app set on scheduler; running {func.__name__} without app context"
202 )
203 return func(*args, **kwargs)
205 return wrapper
    def _get_document_scheduler_settings(
        self, username: str, force_refresh: bool = False
    ) -> DocumentSchedulerSettings:
        """
        Get document scheduler settings for a user with TTL caching.

        This is the single source of truth for document scheduler settings.
        Settings are cached for 5 minutes by default to reduce database queries.
        Falls back to DocumentSchedulerSettings.defaults() whenever the
        user has no session, credentials have expired, or the database
        read fails.

        Args:
            username: User to get settings for
            force_refresh: If True, bypass cache and fetch fresh settings

        Returns:
            DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
        """
        # Fast path: check cache without modifying it
        if not force_refresh:
            with self._settings_cache_lock:
                cached = self._settings_cache.get(username)
                if cached is not None:
                    logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                    cached_settings: DocumentSchedulerSettings = cached
                    return cached_settings

        # Cache miss - need to fetch from database
        logger.debug(
            f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
        )

        # Get password from session
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[SETTINGS_CACHE] No session info for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        # Fetch settings from database (outside lock to avoid blocking)
        try:
            from ..database.session_context import get_user_db_session
            from ..settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                sm = SettingsManager(db)

                settings = DocumentSchedulerSettings(
                    enabled=sm.get_setting("document_scheduler.enabled", True),
                    interval_seconds=sm.get_setting(
                        "document_scheduler.interval_seconds", 1800
                    ),
                    download_pdfs=sm.get_setting(
                        "document_scheduler.download_pdfs", False
                    ),
                    extract_text=sm.get_setting(
                        "document_scheduler.extract_text", True
                    ),
                    generate_rag=sm.get_setting(
                        "document_scheduler.generate_rag", False
                    ),
                    last_run=sm.get_setting("document_scheduler.last_run", ""),
                )

            # Store in cache
            with self._settings_cache_lock:
                self._settings_cache[username] = settings
                logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")

            return settings

        except Exception:
            logger.exception(
                f"[SETTINGS_CACHE] Error fetching settings for {username}"
            )
            return DocumentSchedulerSettings.defaults()
290 def invalidate_user_settings_cache(self, username: str) -> bool:
291 """
292 Invalidate cached settings for a specific user.
294 Call this when user settings change or user logs out.
296 Args:
297 username: User whose cache to invalidate
299 Returns:
300 True if cache entry was removed, False if not found
301 """
302 with self._settings_cache_lock:
303 if username in self._settings_cache:
304 del self._settings_cache[username]
305 logger.debug(
306 f"[SETTINGS_CACHE] Invalidated cache for {username}"
307 )
308 return True
309 return False
311 def invalidate_all_settings_cache(self) -> int:
312 """
313 Invalidate all cached settings.
315 Call this when doing bulk settings updates or during config reload.
317 Returns:
318 Number of cache entries cleared
319 """
320 with self._settings_cache_lock:
321 count = len(self._settings_cache)
322 self._settings_cache.clear()
323 logger.info(
324 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)"
325 )
326 return count
    def start(self):
        """Start the scheduler.

        No-op when disabled in settings or already running. Raises
        RuntimeError when set_app() has not been called, since scheduled
        jobs require a Flask app context. Registers the periodic cleanup
        and config-reload jobs, starts APScheduler, then queues an
        initial cleanup 30 seconds out.
        """
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        if self._app is None:
            raise RuntimeError(
                "BackgroundJobScheduler.set_app() must be called before start()"
            )

        # Schedule cleanup job
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        self.scheduler.add_job(
            self._wrap_job(self._reload_config),
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule initial cleanup after a delay
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")
376 def stop(self):
377 """Stop the scheduler."""
378 if self.is_running:
379 self.scheduler.shutdown(wait=True)
380 self.is_running = False
382 # Clear all user sessions and credentials
383 with self.lock:
384 for username in self.user_sessions:
385 self._credential_store.clear(username)
386 self.user_sessions.clear()
388 logger.info("News scheduler stopped")
    def update_user_info(self, username: str, password: str):
        """
        Update user info in scheduler. Called on every database interaction.

        Refreshes the stored credentials and last-activity timestamp; for a
        user not yet tracked it creates a session first. In both cases the
        user's subscriptions are (re)scheduled afterwards.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        with self.lock:
            # Store password in credential store (inside lock to prevent
            # race where concurrent calls leave mismatched credentials)
            self._credential_store.store(username, password)

            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)
    def unregister_user(self, username: str):
        """
        Unregister a user and clean up their scheduled jobs.
        Called when user logs out.

        Removes the user's APScheduler jobs, session entry and stored
        credentials under the lock, then invalidates their settings cache.
        """
        with self.lock:
            if username in self.user_sessions:
                logger.info(f"Unregistering user {username}")

                # Remove all scheduled jobs for this user
                session_info = self.user_sessions[username]
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        # Job already gone from the scheduler; nothing to do.
                        pass

                # Remove user session and clear credentials atomically
                del self.user_sessions[username]
                self._credential_store.clear(username)

        # Invalidate settings cache for this user (outside lock)
        self.invalidate_user_settings_cache(username)
        logger.info(f"User {username} unregistered successfully")
    def _schedule_user_subscriptions(self, username: str):
        """Schedule all active subscriptions for a user.

        Reads the user's active NewsSubscription rows from their encrypted
        database, clears any previously scheduled jobs, then registers one
        APScheduler job per subscription: an interval trigger for refresh
        rates of 60 minutes or less, otherwise a one-shot date trigger at
        the next computed refresh time. Always finishes by (re)scheduling
        the user's document-processing job, even if subscription
        scheduling fails.
        """
        logger.info(f"_schedule_user_subscriptions called for {username}")
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(f"No session info found for {username}")
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.warning(
                    f"Credentials expired for {username}, skipping subscription scheduling"
                )
                return
            logger.debug(f"Got password for {username}: present")

            # Get user's subscriptions from their encrypted database
            from ..database.session_context import get_user_db_session
            from ..database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                subscriptions = (
                    db.query(NewsSubscription).filter_by(is_active=True).all()
                )
                logger.debug(
                    f"Query executed, found {len(subscriptions)} results"
                )

                # Log details of each subscription
                for sub in subscriptions:
                    logger.debug(
                        f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                    )

                logger.info(
                    f"Found {len(subscriptions)} active subscriptions for {username}"
                )

                # Clear old jobs for this user
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                        session_info["scheduled_jobs"].remove(job_id)
                    except JobLookupError:
                        # Job already absent; leave the tracked id untouched.
                        pass

                # Schedule each subscription with jitter
                for sub in subscriptions:
                    job_id = f"{username}_{sub.id}"

                    # Calculate jitter
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    max_jitter = int(self.config.get("max_jitter_seconds", 300))
                    jitter = random.randint(0, max_jitter)

                    # Determine trigger based on frequency
                    refresh_minutes = sub.refresh_interval_minutes

                    if refresh_minutes <= 60:  # 60 minutes or less
                        # For hourly or more frequent, use interval trigger
                        trigger = "interval"
                        trigger_args = {
                            "minutes": refresh_minutes,
                            "jitter": jitter,
                            "start_date": datetime.now(UTC),  # Start immediately
                        }
                    else:
                        # For less frequent, calculate next run time
                        now = datetime.now(UTC)
                        if sub.next_refresh:
                            # Ensure timezone-aware for comparison with now (UTC)
                            next_refresh_aware = sub.next_refresh
                            if next_refresh_aware.tzinfo is None:
                                logger.warning(
                                    f"Subscription {sub.id} has naive (non-tz-aware) "
                                    f"next_refresh datetime, assuming UTC"
                                )
                                next_refresh_aware = next_refresh_aware.replace(
                                    tzinfo=UTC
                                )
                            if next_refresh_aware <= now:
                                # Subscription is overdue - run it immediately with small jitter
                                logger.info(
                                    f"Subscription {sub.id} is overdue, scheduling immediate run"
                                )
                                next_run = now + timedelta(seconds=jitter)
                            else:
                                next_run = next_refresh_aware
                        else:
                            # No stored next_refresh: run one full interval out.
                            next_run = now + timedelta(
                                minutes=refresh_minutes, seconds=jitter
                            )

                        trigger = "date"
                        trigger_args = {"run_date": next_run}

                    # Add the job
                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, sub.id],
                        trigger=trigger,
                        id=job_id,
                        name=f"Check {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                        **trigger_args,
                    )

                    session_info["scheduled_jobs"].add(job_id)
                    logger.info(f"Scheduled job {job_id} with {trigger} trigger")

        except Exception:
            logger.exception(f"Error scheduling subscriptions for {username}")

        # Add document processing for this user
        self._schedule_document_processing(username)
    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user.

        Registers (or replaces) the interval job that runs
        _process_user_documents for this user, using their cached
        document-scheduler settings. No-ops when the user has no session
        or the document scheduler is disabled for them.
        """
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            logger.debug(
                f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            if not settings.enabled:
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                )
                return

            logger.info(
                f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
                f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
                f"text={settings.extract_text}, rag={settings.generate_rag}"
            )

            # Schedule document processing job
            job_id = f"{username}_document_processing"
            logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

            # Remove existing document job if any
            try:
                self.scheduler.remove_job(job_id)
                session_info["scheduled_jobs"].discard(job_id)
                logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
            except JobLookupError:
                logger.debug(
                    f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                )
                pass  # Job doesn't exist, that's fine

            # Add new document processing job
            logger.debug(
                f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
            )
            self.scheduler.add_job(
                func=self._wrap_job(self._process_user_documents),
                args=[username],
                trigger="interval",
                seconds=settings.interval_seconds,
                id=job_id,
                name=f"Process Documents for {username}",
                jitter=30,  # Add small jitter to prevent multiple users from processing simultaneously
                max_instances=1,  # Prevent overlapping document processing for same user
                replace_existing=True,
            )

            session_info["scheduled_jobs"].add(job_id)
            logger.info(
                f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
            )
            logger.debug(
                f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                )

        except Exception:
            logger.exception(
                f"Error scheduling document processing for {username}"
            )
684 @thread_cleanup
685 def _process_user_documents(self, username: str):
686 """Process documents for a user."""
687 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}")
688 start_time = datetime.now(UTC)
690 try:
691 session_info = self.user_sessions.get(username)
692 if not session_info:
693 logger.warning(
694 f"[DOC_SCHEDULER] No session info found for user {username}"
695 )
696 return
698 password = self._credential_store.retrieve(username)
699 if not password:
700 logger.warning(
701 f"[DOC_SCHEDULER] Credentials expired for user {username}"
702 )
703 return
704 logger.debug(
705 f"[DOC_SCHEDULER] Starting document processing for {username}"
706 )
708 # Get user's document scheduler settings (cached)
709 settings = self._get_document_scheduler_settings(username)
711 logger.info(
712 f"[DOC_SCHEDULER] Processing settings for {username}: "
713 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}"
714 )
716 if not any(
717 [
718 settings.download_pdfs,
719 settings.extract_text,
720 settings.generate_rag,
721 ]
722 ):
723 logger.info(
724 f"[DOC_SCHEDULER] No processing options enabled for user {username}"
725 )
726 return
728 # Parse last_run from cached settings
729 last_run = (
730 datetime.fromisoformat(settings.last_run)
731 if settings.last_run
732 else None
733 )
735 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}")
737 # Need database session for queries and updates
738 from ..database.session_context import get_user_db_session
739 from ..database.models.research import ResearchHistory
740 from ..settings.manager import SettingsManager
742 with get_user_db_session(username, password) as db:
743 settings_manager = SettingsManager(db)
745 # Query for completed research since last run
746 logger.debug(
747 f"[DOC_SCHEDULER] Querying for completed research since {last_run}"
748 )
749 query = db.query(ResearchHistory).filter(
750 ResearchHistory.status == ResearchStatus.COMPLETED,
751 ResearchHistory.completed_at.is_not(
752 None
753 ), # Ensure completed_at is not null
754 )
756 if last_run:
757 query = query.filter(
758 ResearchHistory.completed_at > last_run
759 )
761 # Limit to recent research to prevent overwhelming
762 query = query.order_by(
763 ResearchHistory.completed_at.desc()
764 ).limit(20)
766 research_sessions = query.all()
767 logger.debug(
768 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions"
769 )
771 if not research_sessions:
772 logger.info(
773 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}"
774 )
775 return
777 logger.info(
778 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}"
779 )
781 # Log details of each research session
782 for i, research in enumerate(
783 research_sessions[:5]
784 ): # Log first 5 details
785 title_safe = (
786 (research.title[:50] + "...")
787 if research.title
788 else "No title"
789 )
790 completed_safe = (
791 research.completed_at
792 if research.completed_at
793 else "No completion time"
794 )
795 logger.debug(
796 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}"
797 )
799 # Handle completed_at which might be a string or datetime
800 completed_at_obj = None
801 if research.completed_at:
802 if isinstance(research.completed_at, str):
803 try:
804 completed_at_obj = datetime.fromisoformat(
805 research.completed_at.replace("Z", "+00:00")
806 )
807 except (ValueError, TypeError, AttributeError):
808 completed_at_obj = None
809 else:
810 completed_at_obj = research.completed_at
812 logger.debug(
813 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}"
814 )
815 logger.debug(
816 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}"
817 )
818 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}")
819 logger.debug(
820 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}"
821 )
823 processed_count = 0
824 for research in research_sessions:
825 try:
826 logger.info(
827 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}"
828 )
830 # Set search context so rate limiting works in both
831 # download_pdfs and extract_text paths
832 from ...utilities.thread_context import (
833 set_search_context,
834 )
836 set_search_context(
837 {
838 "research_id": str(research.id),
839 "username": username,
840 "user_password": password,
841 "research_phase": "document_scheduler",
842 }
843 )
845 # Call actual processing APIs
846 if settings.download_pdfs:
847 logger.info(
848 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}"
849 )
850 try:
851 # Use the DownloadService to queue PDF downloads
852 from ..research_library.services.download_service import (
853 DownloadService,
854 )
856 with DownloadService(
857 username=username, password=password
858 ) as download_service:
859 queued_count = download_service.queue_research_downloads(
860 research.id
861 )
862 logger.info(
863 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}"
864 )
865 except Exception:
866 logger.exception(
867 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}"
868 )
870 if settings.extract_text:
871 logger.info(
872 f"[DOC_SCHEDULER] Extracting text for research {research.id}"
873 )
874 try:
875 # Use the DownloadService to extract text for all resources
876 from ..research_library.services.download_service import (
877 DownloadService,
878 )
879 from ..database.models.research import (
880 ResearchResource,
881 )
883 from ..research_library.utils import (
884 is_downloadable_url,
885 )
887 with DownloadService(
888 username=username, password=password
889 ) as download_service:
890 # Get all resources for this research (reuse existing db session)
891 all_resources = (
892 db.query(ResearchResource)
893 .filter_by(research_id=research.id)
894 .all()
895 )
896 # Filter: only process downloadable resources (academic/PDF)
897 resources = [
898 r
899 for r in all_resources
900 if is_downloadable_url(r.url)
901 ]
902 processed_count = 0
903 for resource in resources:
904 # We need to pass the password to the download service
905 # The DownloadService creates its own database sessions, so we need to ensure password is available
906 try:
907 success, error = (
908 download_service.download_as_text(
909 resource.id
910 )
911 )
912 if success:
913 processed_count += 1
914 logger.info(
915 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}"
916 )
917 else:
918 logger.warning(
919 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}"
920 )
921 except Exception as resource_error:
922 logger.exception(
923 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}"
924 )
925 logger.info(
926 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed"
927 )
928 except Exception:
929 logger.exception(
930 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}"
931 )
933 if settings.generate_rag:
934 logger.info(
935 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}"
936 )
937 try:
938 # Get embedding settings from user configuration
939 embedding_model = settings_manager.get_setting(
940 "local_search_embedding_model",
941 "all-MiniLM-L6-v2",
942 )
943 embedding_provider = (
944 settings_manager.get_setting(
945 "local_search_embedding_provider",
946 "sentence_transformers",
947 )
948 )
949 chunk_size = int(
950 settings_manager.get_setting(
951 "local_search_chunk_size", 1000
952 )
953 )
954 chunk_overlap = int(
955 settings_manager.get_setting(
956 "local_search_chunk_overlap", 200
957 )
958 )
960 # Initialize RAG service with user's embedding configuration
961 with LibraryRAGService(
962 username=username,
963 embedding_model=embedding_model,
964 embedding_provider=embedding_provider,
965 chunk_size=chunk_size,
966 chunk_overlap=chunk_overlap,
967 db_password=password,
968 ) as rag_service:
969 # Get default Library collection ID
970 library_collection_id = (
971 get_default_library_id(
972 username, password
973 )
974 )
976 # Query for unindexed documents from this research session
977 documents_to_index = (
978 db.query(Document.id, Document.title)
979 .outerjoin(
980 DocumentCollection,
981 (
982 DocumentCollection.document_id
983 == Document.id
984 )
985 & (
986 DocumentCollection.collection_id
987 == library_collection_id
988 ),
989 )
990 .filter(
991 Document.research_id == research.id,
992 Document.text_content.isnot(None),
993 (
994 DocumentCollection.indexed.is_(
995 False
996 )
997 | DocumentCollection.id.is_(
998 None
999 )
1000 ),
1001 )
1002 .all()
1003 )
1005 if not documents_to_index:
1006 logger.info(
1007 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}"
1008 )
1009 else:
1010 # Index each document
1011 indexed_count = 0
1012 for (
1013 doc_id,
1014 doc_title,
1015 ) in documents_to_index:
1016 try:
1017 result = rag_service.index_document(
1018 document_id=doc_id,
1019 collection_id=library_collection_id,
1020 force_reindex=False,
1021 )
1022 if (
1023 result["status"]
1024 == "success"
1025 ):
1026 indexed_count += 1
1027 logger.info(
1028 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) "
1029 f"with {result.get('chunk_count', 0)} chunks"
1030 )
1031 except Exception as doc_error:
1032 logger.exception(
1033 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}"
1034 )
1036 logger.info(
1037 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: "
1038 f"{indexed_count}/{len(documents_to_index)} documents indexed"
1039 )
1040 except Exception:
1041 logger.exception(
1042 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}"
1043 )
1045 processed_count += 1
1046 logger.debug(
1047 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}"
1048 )
1050 except Exception:
1051 logger.exception(
1052 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}"
1053 )
1055 # Update last run time in user's settings.
1056 # Intentionally NOT wrapped in try/finally: if upstream setup
1057 # fails (DB open, SettingsManager init, initial query),
1058 # last_run should stay put so the next tick retries.
1059 # Advancing here would mask a persistent failure (corrupted
1060 # DB, wrong password). See closed PR #3288.
1061 current_time = datetime.now(UTC).isoformat()
1062 settings_manager.set_setting(
1063 "document_scheduler.last_run", current_time, commit=True
1064 )
1065 logger.debug(
1066 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}"
1067 )
1069 end_time = datetime.now(UTC)
1070 duration = (end_time - start_time).total_seconds()
1071 logger.info(
1072 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s"
1073 )
1075 except Exception:
1076 logger.exception(
1077 f"[DOC_SCHEDULER] Error processing documents for user {username}"
1078 )
def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
    """Describe the document-scheduler state for a single user.

    Returns a dict with the user's scheduler settings and job state, or
    a minimal ``{"enabled": False, "message": ...}`` payload when the
    user is unknown or the lookup fails.
    """
    try:
        session = self.user_sessions.get(username)
        if not session:
            return {
                "enabled": False,
                "message": "User not found in scheduler",
            }

        # Per-user document scheduler settings (cached lookup).
        cfg = self._get_document_scheduler_settings(username)

        # The periodic processing job is present when its id is tracked
        # in the user's session.
        has_job = (
            f"{username}_document_processing"
            in session.get("scheduled_jobs", set())
        )

        status = {
            "enabled": cfg.enabled,
            "interval_seconds": cfg.interval_seconds,
            "processing_options": {
                "download_pdfs": cfg.download_pdfs,
                "extract_text": cfg.extract_text,
                "generate_rag": cfg.generate_rag,
            },
            "last_run": cfg.last_run,
            "has_scheduled_job": has_job,
            "user_active": username in self.user_sessions,
        }
        return status

    except Exception as exc:
        logger.exception(
            f"Error getting document scheduler status for user {username}"
        )
        return {
            "enabled": False,
            "message": f"Failed to retrieve scheduler status: {type(exc).__name__}",
        }
def trigger_document_processing(self, username: str) -> bool:
    """Kick off document processing for *username* right away.

    Rather than running inline, this schedules a one-shot job one
    second in the future on the background scheduler.

    Returns:
        True when the manual job was scheduled and verified; False when
        the user is unknown, the scheduler is stopped, or scheduling
        failed.
    """
    logger.info(
        f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
    )
    try:
        session = self.user_sessions.get(username)
        if not session:
            logger.warning(
                f"[DOC_SCHEDULER] User {username} not found in scheduler"
            )
            logger.debug(
                f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
            )
            return False

        if not self.is_running:
            logger.warning(
                f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
            )
            return False

        manual_job_id = f"{username}_document_processing_manual"
        logger.debug(
            f"[DOC_SCHEDULER] Scheduling manual job {manual_job_id}"
        )

        # One-shot date-triggered job, scheduled one second out.
        self.scheduler.add_job(
            func=self._wrap_job(self._process_user_documents),
            args=[username],
            trigger="date",
            run_date=datetime.now(UTC) + timedelta(seconds=1),
            id=manual_job_id,
            name=f"Manual Document Processing for {username}",
            replace_existing=True,
        )

        # Confirm the job actually landed in the scheduler.
        scheduled = self.scheduler.get_job(manual_job_id)
        if not scheduled:
            logger.error(
                f"[DOC_SCHEDULER] Failed to verify manual job {manual_job_id} was added!"
            )
            return False

        logger.info(
            f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {manual_job_id}, next run: {scheduled.next_run_time}"
        )
        return True

    except Exception:
        logger.exception(
            f"[DOC_SCHEDULER] Error triggering document processing for user {username}"
        )
        return False
@thread_cleanup
def _check_user_overdue_subscriptions(self, username: str):
    """Schedule immediate runs for any of this user's overdue subscriptions.

    Looks up active subscriptions whose ``next_refresh`` has passed and
    queues each one as a near-term one-shot job with a small random
    stagger. No-ops when the user session or credentials are gone.
    """
    try:
        if not self.user_sessions.get(username):
            return

        password = self._credential_store.retrieve(username)
        if not password:
            return

        # Query the user's encrypted DB for overdue subscriptions.
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import timezone

        with get_user_db_session(username, password) as db:
            now = datetime.now(timezone.utc)
            pending = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.is_active.is_(True),
                    NewsSubscription.next_refresh.is_not(None),
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            if pending:
                logger.info(
                    f"Found {len(pending)} overdue subscriptions for {username}"
                )

            for subscription in pending:
                # Security: random delay to stagger overdue jobs, not security-sensitive
                stagger = random.randint(1, 30)
                overdue_job_id = (
                    f"overdue_{username}_{subscription.id}_{int(now.timestamp())}"
                )

                self.scheduler.add_job(
                    func=self._wrap_job(self._check_subscription),
                    args=[username, subscription.id],
                    trigger="date",
                    run_date=now + timedelta(seconds=stagger),
                    id=overdue_job_id,
                    name=f"Overdue: {subscription.name or subscription.query_or_topic[:30]}",
                    replace_existing=True,
                )

                logger.info(
                    f"Scheduled overdue subscription {subscription.id} to run in {stagger} seconds"
                )

    except Exception:
        logger.exception(
            f"Error checking overdue subscriptions for {username}"
        )
@thread_cleanup
def _check_subscription(self, username: str, subscription_id: int):
    """Check and refresh a single subscription.

    Runs as a scheduler job. Skips silently when the user's session is
    gone (and cancels the job) or when stored credentials have expired;
    otherwise runs the subscription's research synchronously and, for
    date-triggered jobs, reschedules itself for the next interval.

    Args:
        username: Owner of the subscription.
        subscription_id: Primary key of the NewsSubscription row.
    """
    logger.info(
        f"_check_subscription called for user {username}, subscription {subscription_id}"
    )
    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            # User no longer active, cancel job
            job_id = f"{username}_{subscription_id}"
            try:
                self.scheduler.remove_job(job_id)
            except JobLookupError:
                # Job may already be gone; nothing to clean up.
                pass
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"Credentials expired for {username}, skipping subscription check"
            )
            return

        # Get subscription details
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(username, password) as db:
            # NOTE(review): Query.get() is the legacy SQLAlchemy API;
            # newer code would use Session.get() — confirm the pinned
            # SQLAlchemy version before changing.
            sub = db.query(NewsSubscription).get(subscription_id)
            if not sub or not sub.is_active:
                logger.info(
                    f"Subscription {subscription_id} not active, skipping"
                )
                return

            # Prepare query with date replacement using user's timezone
            query = sub.query_or_topic
            if "YYYY-MM-DD" in query:
                from local_deep_research.news.core.utils import (
                    get_local_date_string,
                )
                from ..settings.manager import SettingsManager

                settings_manager = SettingsManager(db)
                local_date = get_local_date_string(settings_manager)
                query = query.replace("YYYY-MM-DD", local_date)

            # Update last/next refresh times. NOTE(review): timestamps
            # are advanced (and committed) BEFORE the research runs —
            # presumably to avoid a tight retry loop if the research
            # fails; confirm this is intentional.
            sub.last_refresh = datetime.now(UTC)
            sub.next_refresh = datetime.now(UTC) + timedelta(
                minutes=sub.refresh_interval_minutes
            )
            db.commit()

            # Plain-dict copy of the ORM row so downstream code does not
            # need the live session.
            subscription_data = {
                "id": sub.id,
                "name": sub.name,
                "query": query,
                "original_query": sub.query_or_topic,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "search_engine": sub.search_engine,
            }

            logger.info(
                f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
            )

            # Trigger research synchronously using requests with proper auth
            self._trigger_subscription_research_sync(
                username, subscription_data
            )

            # Reschedule for next interval if using interval trigger
            job_id = f"{username}_{subscription_id}"
            job = self.scheduler.get_job(job_id)
            if job and job.trigger.__class__.__name__ == "DateTrigger":
                # For date triggers, reschedule
                # Security: random jitter to distribute subscription timing, not security-sensitive
                next_run = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes,
                    seconds=random.randint(
                        0, int(self.config.get("max_jitter_seconds", 300))
                    ),
                )
                self.scheduler.add_job(
                    func=self._wrap_job(self._check_subscription),
                    args=[username, subscription_id],
                    trigger="date",
                    run_date=next_run,
                    id=job_id,
                    replace_existing=True,
                )

    except Exception:
        logger.exception(f"Error checking subscription {subscription_id}")
@thread_cleanup
def _trigger_subscription_research_sync(
    self, username: str, subscription: Dict[str, Any]
):
    """Trigger research for a subscription using programmatic API.

    Blocks until quick_summary() completes, then persists the result
    via _store_research_result(). All failures are logged and swallowed
    so one bad subscription cannot break the scheduler job.

    Args:
        username: Owner of the subscription.
        subscription: Dict built by _check_subscription with keys
            id, name, query, original_query, model_provider, model,
            search_strategy and search_engine.
    """
    from ..config.thread_settings import set_settings_context

    try:
        # Get user's password from session info
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.error(f"No session info for user {username}")
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.error(f"Credentials expired for user {username}")
            return

        # Generate research ID
        import uuid

        research_id = str(uuid.uuid4())

        logger.info(
            f"Starting research {research_id} for subscription {subscription['id']}"
        )

        # Get user settings for research
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager

        with get_user_db_session(username, password) as db:
            settings_manager = SettingsManager(db)
            settings_snapshot = settings_manager.get_settings_snapshot()

            # Use the search engine from the subscription if specified
            search_engine = subscription.get("search_engine")

            if search_engine:
                # Override the snapshot entry so downstream code picks
                # up the subscription-specific engine.
                settings_snapshot["search.tool"] = {
                    "value": search_engine,
                    "ui_element": "select",
                }
                logger.info(
                    f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                )
            else:
                # Use the user's default search tool from their settings
                default_search_tool = settings_snapshot.get(
                    "search.tool", "auto"
                )
                logger.info(
                    f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                )

            logger.debug(
                f"Settings snapshot has {len(settings_snapshot)} settings"
            )
            # Log a few key settings to verify they're present
            logger.debug(
                f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
            )

        # Set up research parameters
        query = subscription["query"]

        # Build metadata for news search
        metadata = {
            "is_news_search": True,
            "search_type": "news_analysis",
            "display_in": "news_feed",
            "subscription_id": subscription["id"],
            "triggered_by": "scheduler",
            "subscription_name": subscription["name"],
            "title": subscription["name"] if subscription["name"] else None,
            "scheduled_at": datetime.now(UTC).isoformat(),
            "original_query": subscription["original_query"],
            "user_id": username,
        }

        # Use programmatic API with settings context
        from ..api.research_functions import quick_summary

        # Create and set settings context for this thread
        settings_context = SnapshotSettingsContext(settings_snapshot)
        set_settings_context(settings_context)

        # Get search strategy from subscription data (for the API call)
        search_strategy = subscription.get(
            "search_strategy", "news_aggregation"
        )

        # Call quick_summary with appropriate parameters
        result = quick_summary(
            query=query,
            research_id=research_id,
            username=username,
            user_password=password,
            settings_snapshot=settings_snapshot,
            search_strategy=search_strategy,
            model_name=subscription.get("model"),
            provider=subscription.get("model_provider"),
            iterations=1,  # Single iteration for news
            metadata=metadata,
            search_original_query=False,  # Don't send long subscription prompts to search engines
        )

        logger.info(
            f"Completed research {research_id} for subscription {subscription['id']}"
        )

        # Store the research result in the database
        self._store_research_result(
            username,
            password,
            research_id,
            subscription["id"],
            result,
            subscription,
        )

    except Exception:
        logger.exception(
            f"Error triggering research for subscription {subscription['id']}"
        )
def _store_research_result(
    self,
    username: str,
    password: str,
    research_id: str,
    subscription_id: int,
    result: Dict[str, Any],
    subscription: Dict[str, Any],
):
    """Store research result in database for news display.

    Pipeline: make the raw result JSON-serializable, assemble the
    report body (sources section, citation formatting, JSON fallback),
    generate a headline and topics, write a ResearchHistory row, and
    save the report through the storage abstraction. Errors are logged
    and swallowed.

    Args:
        username: Owner of the encrypted per-user database.
        password: User's database password (needed to open the DB).
        research_id: UUID assigned to this research run.
        subscription_id: Primary key of the originating subscription.
        result: Raw result dict returned by quick_summary().
        subscription: Subscription data dict (name, query, ...).
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..settings.manager import SettingsManager
        import json

        # Convert result to JSON-serializable format
        def make_serializable(obj):
            """Convert non-serializable objects to dictionaries."""
            # Objects exposing .dict() (pydantic-style) are preferred.
            if hasattr(obj, "dict"):
                return obj.dict()
            # Fall back to public instance attributes.
            if hasattr(obj, "__dict__"):
                return {
                    k: make_serializable(v)
                    for k, v in obj.__dict__.items()
                    if not k.startswith("_")
                }
            if isinstance(obj, (list, tuple)):
                return [make_serializable(item) for item in obj]
            if isinstance(obj, dict):
                return {k: make_serializable(v) for k, v in obj.items()}
            return obj

        serializable_result = make_serializable(result)

        with get_user_db_session(username, password) as db:
            # Get user settings to store in metadata
            settings_manager = SettingsManager(db)
            settings_snapshot = settings_manager.get_settings_snapshot()

            # Get the report content - check both 'report' and 'summary' fields
            report_content = serializable_result.get(
                "report"
            ) or serializable_result.get("summary")
            logger.debug(
                f"Report content length: {len(report_content) if report_content else 0} chars"
            )

            # Extract sources/links from the result
            sources = serializable_result.get("sources", [])

            # First add the sources/references section if we have sources
            if report_content and sources:
                # Import utilities for formatting links
                from ..utilities.search_utilities import (
                    format_links_to_markdown,
                )

                # Format the links/citations
                formatted_links = format_links_to_markdown(sources)

                # Add references section to the report
                if formatted_links:
                    report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

            # Then format citations in the report content
            if report_content:
                # Import citation formatter
                from ..text_optimization.citation_formatter import (
                    CitationFormatter,
                    CitationMode,
                )
                from ..config.search_config import (
                    get_setting_from_snapshot,
                )

                # Get citation format from settings.
                # NOTE(review): no snapshot argument is passed here, so
                # this presumably reads the thread's settings context —
                # confirm.
                citation_format = get_setting_from_snapshot(
                    "report.citation_format", "domain_id_hyperlinks"
                )
                mode_map = {
                    "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                    "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                    "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                    "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                    "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                }
                # Unknown format strings fall back to the default mode.
                mode = mode_map.get(
                    citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                )
                formatter = CitationFormatter(mode=mode)

                # Format citations within the content
                report_content = formatter.format_document(report_content)

            if not report_content:
                # If neither field exists, use the full result as JSON
                report_content = json.dumps(serializable_result)

            # Generate headline and topics for news searches
            from ..news.utils.headline_generator import generate_headline
            from ..news.utils.topic_generator import generate_topics

            query_text = result.get(
                "query", subscription.get("query", "News Update")
            )

            # Generate headline from the actual research findings
            logger.info(
                f"Generating headline for subscription {subscription_id}"
            )
            generated_headline = generate_headline(
                query=query_text,
                findings=report_content,
                max_length=200,  # Allow longer headlines for news
            )

            # Generate topics from the findings
            logger.info(
                f"Generating topics for subscription {subscription_id}"
            )
            generated_topics = generate_topics(
                query=query_text,
                findings=report_content,
                category=subscription.get("name", "News"),
                max_topics=6,
            )

            logger.info(
                f"Generated headline: {generated_headline}, topics: {generated_topics}"
            )

            # Get subscription name for metadata
            subscription_name = subscription.get("name", "")

            # Use generated headline as title, or fallback
            if generated_headline:
                title = generated_headline
            else:
                if subscription_name:
                    title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                else:
                    title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

            # Create research history entry.
            # NOTE(review): status is a literal string here even though
            # ResearchStatus is imported at module top — consider the
            # enum if its values match.
            history_entry = ResearchHistory(
                id=research_id,
                query=result.get("query", ""),
                mode="news_subscription",
                status="completed",
                created_at=datetime.now(UTC).isoformat(),
                completed_at=datetime.now(UTC).isoformat(),
                title=title,
                research_meta={
                    "subscription_id": subscription_id,
                    "triggered_by": "scheduler",
                    "is_news_search": True,
                    "username": username,
                    "subscription_name": subscription_name,  # Store subscription name for display
                    "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                    "generated_headline": generated_headline,  # Store generated headline for news display
                    "generated_topics": generated_topics,  # Store topics for categorization
                },
            )
            db.add(history_entry)
            db.commit()

            # Store the report content using storage abstraction
            from ..storage import get_report_storage

            # Use storage to save the report (report_content already retrieved above)
            storage = get_report_storage(session=db)
            storage.save_report(
                research_id=research_id,
                content=report_content,
                username=username,
            )

            logger.info(
                f"Stored research result {research_id} for subscription {subscription_id}"
            )

    except Exception:
        logger.exception("Error storing research result")
def _run_cleanup_with_tracking(self):
    """Run the inactive-user cleanup and log its outcome.

    Any failure is logged rather than propagated, so the scheduler's
    periodic cleanup job keeps running.
    """
    try:
        cleaned = self._cleanup_inactive_users()
        logger.info(
            f"Cleanup successful: removed {cleaned} inactive users"
        )
    except Exception:
        logger.exception("Cleanup job failed")
1660 def _cleanup_inactive_users(self) -> int:
1661 """Remove users inactive for longer than retention period."""
1662 retention_hours = self.config.get("retention_hours", 48)
1663 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours)
1665 cleaned_count = 0
1667 with self.lock:
1668 inactive_users = [
1669 user_id
1670 for user_id, session in self.user_sessions.items()
1671 if session["last_activity"] < cutoff
1672 ]
1674 for user_id in inactive_users:
1675 # Remove all scheduled jobs
1676 for job_id in self.user_sessions[user_id][
1677 "scheduled_jobs"
1678 ].copy():
1679 try:
1680 self.scheduler.remove_job(job_id)
1681 except JobLookupError:
1682 pass
1684 # Clear credentials and session data
1685 self._credential_store.clear(user_id)
1686 del self.user_sessions[user_id]
1687 cleaned_count += 1
1688 logger.info(f"Cleaned up inactive user {user_id}")
1690 return cleaned_count
1692 def _reload_config(self):
1693 """Reload configuration from settings manager."""
1694 if not hasattr(self, "settings_manager") or not self.settings_manager:
1695 return
1697 try:
1698 old_retention = self.config.get("retention_hours", 48)
1700 # Reload all settings
1701 for key in self.config:
1702 if key == "enabled":
1703 continue # Don't change enabled state while running
1705 full_key = f"news.scheduler.{key}"
1706 self.config[key] = self._get_setting(full_key, self.config[key])
1708 # Handle changes that need immediate action
1709 if old_retention != self.config["retention_hours"]:
1710 logger.info(
1711 f"Retention period changed from {old_retention} "
1712 f"to {self.config['retention_hours']} hours"
1713 )
1714 # Trigger immediate cleanup with new retention
1715 self.scheduler.add_job(
1716 self._wrap_job(self._run_cleanup_with_tracking),
1717 "date",
1718 run_date=datetime.now(UTC) + timedelta(seconds=5),
1719 id="immediate_cleanup_config_change",
1720 )
1722 # Clear settings cache to pick up any user setting changes
1723 self.invalidate_all_settings_cache()
1725 except Exception:
1726 logger.exception("Error reloading configuration")
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of scheduler state for monitoring.

    Includes run flag, config, user/job counts, the next cleanup run
    (ISO string, or None when not scheduled) and a rough memory
    estimate.
    """
    with self.lock:
        user_count = len(self.user_sessions)
        job_count = 0
        for info in self.user_sessions.values():
            job_count += len(info["scheduled_jobs"])

        # The periodic cleanup job only exists while running.
        upcoming = None
        if self.is_running:
            cleanup_job = self.scheduler.get_job("cleanup_inactive_users")
            if cleanup_job:
                upcoming = cleanup_job.next_run_time

        status = {
            "is_running": self.is_running,
            "config": self.config,
            "active_users": user_count,
            "total_scheduled_jobs": job_count,
            "next_cleanup": None,
            "memory_usage": self._estimate_memory_usage(),
        }
        if upcoming:
            status["next_cleanup"] = upcoming.isoformat()
        return status
1753 def _estimate_memory_usage(self) -> int:
1754 """Estimate memory usage of user sessions."""
1756 # Rough estimate: username (50) + password (100) + metadata (200) per user
1757 per_user_estimate = 350
1758 return len(self.user_sessions) * per_user_estimate
1760 def get_user_sessions_summary(self) -> List[Dict[str, Any]]:
1761 """Get summary of active user sessions (without passwords)."""
1762 with self.lock:
1763 summary = []
1764 for user_id, session in self.user_sessions.items():
1765 summary.append(
1766 {
1767 "user_id": user_id,
1768 "last_activity": session["last_activity"].isoformat(),
1769 "scheduled_jobs": len(session["scheduled_jobs"]),
1770 "time_since_activity": str(
1771 datetime.now(UTC) - session["last_activity"]
1772 ),
1773 }
1774 )
1775 return summary
# Singleton instance getter
_scheduler_instance = None
# Guards first-time construction of the singleton (threading is
# imported at module top).
_scheduler_instance_lock = threading.Lock()


def get_background_job_scheduler() -> BackgroundJobScheduler:
    """Get the singleton news scheduler instance.

    Uses double-checked locking: the original unlocked check-then-create
    could let two threads racing on first access each construct their
    own BackgroundJobScheduler. The outer check keeps the common
    already-initialized path lock-free.
    """
    global _scheduler_instance
    if _scheduler_instance is None:
        with _scheduler_instance_lock:
            # Re-check inside the lock: another thread may have won
            # the race while we waited.
            if _scheduler_instance is None:
                _scheduler_instance = BackgroundJobScheduler()
    return _scheduler_instance