Coverage for src/local_deep_research/scheduler/background.py: 96%
678 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Activity-based news subscription scheduler for per-user encrypted databases.
3Tracks user activity and temporarily stores credentials for automatic updates.
4"""
6import random
7import threading
8from dataclasses import dataclass
9from datetime import datetime, timedelta, UTC
10from functools import wraps
11from typing import Any, Callable, Dict, List
13from cachetools import TTLCache
14from loguru import logger
15from ..settings.logger import log_settings
16from ..settings.manager import SnapshotSettingsContext
18from apscheduler.schedulers.background import BackgroundScheduler
19from apscheduler.jobstores.base import JobLookupError
20from ..constants import ResearchStatus
21from ..database.credential_store_base import CredentialStoreBase
22from ..database.session_context import safe_rollback
23from ..database.thread_local_session import thread_cleanup
25# RAG indexing imports
26from ..research_library.services.library_rag_service import LibraryRAGService
27from ..database.library_init import get_default_library_id
28from ..database.models.library import Document, DocumentCollection
31SCHEDULER_AVAILABLE = True # Always available since it's a required dependency
class SchedulerCredentialStore(CredentialStoreBase):
    """In-memory password store used by the background scheduler.

    Passwords are handed to ``CredentialStoreBase`` with a time-to-live so
    that scheduled jobs can open a user's encrypted database without the
    user being present. Expiry itself is handled by the base class.
    """

    def __init__(self, ttl_hours: int = 48):
        """Create the store.

        Args:
            ttl_hours: Lifetime of a stored entry, in hours. Converted to
                seconds before being passed to the base class.
        """
        ttl_seconds = ttl_hours * 3600
        super().__init__(ttl_seconds)

    def store(self, username: str, password: str) -> None:
        """Remember ``password`` for ``username``."""
        payload = {"username": username, "password": password}
        self._store_credentials(username, payload)

    def retrieve(self, username: str) -> str | None:
        """Return the stored password, or None when nothing usable is stored.

        The entry is left in place (``remove=False``) so repeated jobs can
        keep reading it.
        """
        entry = self._retrieve_credentials(username, remove=False)
        if not entry:
            return None
        # NOTE(review): index 1 is presumably the password half of a
        # (username, password) pair returned by the base class — confirm
        # against CredentialStoreBase.
        return entry[1]

    def clear(self, username: str) -> None:
        """Forget the password stored for ``username``."""
        self.clear_entry(username)
@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """
    Read-only snapshot of one user's document scheduler settings.

    The dataclass is frozen, so attributes cannot be reassigned after
    construction; a snapshot can be handed to background threads without
    them being able to modify it.
    """

    # Whether the document scheduler runs at all for this user.
    enabled: bool = True
    # Seconds between two document processing runs.
    interval_seconds: int = 1800
    # Processing phases that a run may perform.
    download_pdfs: bool = False
    extract_text: bool = True
    generate_rag: bool = False
    # ISO-8601 timestamp of the previous run; empty when there was none.
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Build a snapshot that uses the default value of every field."""
        default_snapshot = cls()
        return default_snapshot
class BackgroundJobScheduler:
    """
    Singleton scheduler that manages news subscriptions for active users.

    This scheduler:
    - Monitors user activity through database access
    - Temporarily stores user credentials in memory
    - Automatically schedules subscription checks
    - Cleans up inactive users after configurable period

    In addition to subscription checks, a per-user document processing job
    is scheduled whenever a user's subscriptions are (re)scheduled.
    """

    # The one shared instance; created lazily by __new__.
    _instance = None
    # Serializes creation of the shared instance across threads.
    _lock = threading.Lock()
96 def __new__(cls):
97 """Ensure singleton instance."""
98 if cls._instance is None:
99 with cls._lock:
100 if cls._instance is None: 100 ↛ 102line 100 didn't jump to line 102
101 cls._instance = super().__new__(cls)
102 return cls._instance
104 def __init__(self):
105 """Initialize the scheduler (only runs once due to singleton)."""
106 # Skip if already initialized
107 if hasattr(self, "_initialized"):
108 return
110 # User session tracking
111 self.user_sessions = {} # user_id -> {last_activity, scheduled_jobs}
112 self.lock = threading.Lock()
114 # Credential store with TTL-based expiration
115 self._credential_store = SchedulerCredentialStore(ttl_hours=48)
117 # Scheduler instance
118 self.scheduler = BackgroundScheduler()
120 # Configuration (will be loaded from settings)
121 self.config = self._load_default_config()
123 # State
124 self.is_running = False
125 self._app = None # Flask app reference for background job contexts
127 # Settings cache: username -> DocumentSchedulerSettings
128 # TTL of 300 seconds (5 minutes) reduces database queries
129 self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
130 self._settings_cache_lock = threading.Lock()
132 self._initialized = True
133 logger.info("News scheduler initialized")
135 def _load_default_config(self) -> Dict[str, Any]:
136 """Load default configuration (will be overridden by settings manager)."""
137 return {
138 "enabled": True,
139 "retention_hours": 48,
140 "cleanup_interval_hours": 1,
141 "max_jitter_seconds": 300,
142 "max_concurrent_jobs": 10,
143 "subscription_batch_size": 5,
144 "activity_check_interval_minutes": 5,
145 }
147 def initialize_with_settings(self, settings_manager):
148 """Initialize configuration from settings manager."""
149 try:
150 # Load all scheduler settings
151 self.settings_manager = settings_manager
152 self.config = {
153 "enabled": self._get_setting("news.scheduler.enabled", True),
154 "retention_hours": self._get_setting(
155 "news.scheduler.retention_hours", 48
156 ),
157 "cleanup_interval_hours": self._get_setting(
158 "news.scheduler.cleanup_interval_hours", 1
159 ),
160 "max_jitter_seconds": self._get_setting(
161 "news.scheduler.max_jitter_seconds", 300
162 ),
163 "max_concurrent_jobs": self._get_setting(
164 "news.scheduler.max_concurrent_jobs", 10
165 ),
166 "subscription_batch_size": self._get_setting(
167 "news.scheduler.batch_size", 5
168 ),
169 "activity_check_interval_minutes": self._get_setting(
170 "news.scheduler.activity_check_interval", 5
171 ),
172 }
173 log_settings(self.config, "Scheduler configuration loaded")
174 except Exception:
175 logger.exception("Error loading scheduler settings")
176 # Keep default config
178 def _get_setting(self, key: str, default: Any) -> Any:
179 """Get setting with fallback to default."""
180 if hasattr(self, "settings_manager") and self.settings_manager:
181 return self.settings_manager.get_setting(key, default=default)
182 return default
184 def set_app(self, app) -> None:
185 """Store a reference to the Flask app for creating app contexts in background jobs."""
186 self._app = app
188 def _wrap_job(self, func: Callable) -> Callable:
189 """Wrap a scheduler job function so it runs inside a Flask app context.
191 APScheduler runs jobs in a thread pool without Flask context.
192 This wrapper pushes an app context before the job runs and pops it after.
193 """
195 @wraps(func)
196 def wrapper(*args, **kwargs):
197 if self._app is not None:
198 with self._app.app_context():
199 return func(*args, **kwargs)
200 else:
201 logger.warning(
202 f"No Flask app set on scheduler; running {func.__name__} without app context"
203 )
204 return func(*args, **kwargs)
206 return wrapper
208 def _get_document_scheduler_settings(
209 self, username: str, force_refresh: bool = False
210 ) -> DocumentSchedulerSettings:
211 """
212 Get document scheduler settings for a user with TTL caching.
214 This is the single source of truth for document scheduler settings.
215 Settings are cached for 5 minutes by default to reduce database queries.
217 Args:
218 username: User to get settings for
219 force_refresh: If True, bypass cache and fetch fresh settings
221 Returns:
222 DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
223 """
224 # Fast path: check cache without modifying it
225 if not force_refresh:
226 with self._settings_cache_lock:
227 cached = self._settings_cache.get(username)
228 if cached is not None:
229 logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
230 cached_settings: DocumentSchedulerSettings = cached
231 return cached_settings
233 # Cache miss - need to fetch from database
234 logger.debug(
235 f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
236 )
238 # Get password from session
239 session_info = self.user_sessions.get(username)
240 if not session_info:
241 logger.warning(
242 f"[SETTINGS_CACHE] No session info for {username}, using defaults"
243 )
244 return DocumentSchedulerSettings.defaults()
246 password = self._credential_store.retrieve(username)
247 if not password: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 logger.warning(
249 f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
250 )
251 return DocumentSchedulerSettings.defaults()
253 # Fetch settings from database (outside lock to avoid blocking)
254 try:
255 from ..database.session_context import get_user_db_session
256 from ..settings.manager import SettingsManager
258 with get_user_db_session(username, password) as db:
259 sm = SettingsManager(db)
261 settings = DocumentSchedulerSettings(
262 enabled=sm.get_setting("document_scheduler.enabled", True),
263 interval_seconds=sm.get_setting(
264 "document_scheduler.interval_seconds", 1800
265 ),
266 download_pdfs=sm.get_setting(
267 "document_scheduler.download_pdfs", False
268 ),
269 extract_text=sm.get_setting(
270 "document_scheduler.extract_text", True
271 ),
272 generate_rag=sm.get_setting(
273 "document_scheduler.generate_rag", False
274 ),
275 last_run=sm.get_setting("document_scheduler.last_run", ""),
276 )
278 # Store in cache
279 with self._settings_cache_lock:
280 self._settings_cache[username] = settings
281 logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")
283 return settings
285 except Exception:
286 logger.exception(
287 f"[SETTINGS_CACHE] Error fetching settings for {username}"
288 )
289 return DocumentSchedulerSettings.defaults()
291 def invalidate_user_settings_cache(self, username: str) -> bool:
292 """
293 Invalidate cached settings for a specific user.
295 Call this when user settings change or user logs out.
297 Args:
298 username: User whose cache to invalidate
300 Returns:
301 True if cache entry was removed, False if not found
302 """
303 with self._settings_cache_lock:
304 if username in self._settings_cache:
305 del self._settings_cache[username]
306 logger.debug(
307 f"[SETTINGS_CACHE] Invalidated cache for {username}"
308 )
309 return True
310 return False
312 def invalidate_all_settings_cache(self) -> int:
313 """
314 Invalidate all cached settings.
316 Call this when doing bulk settings updates or during config reload.
318 Returns:
319 Number of cache entries cleared
320 """
321 with self._settings_cache_lock:
322 count = len(self._settings_cache)
323 self._settings_cache.clear()
324 logger.info(
325 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)"
326 )
327 return count
329 def start(self):
330 """Start the scheduler."""
331 if not self.config.get("enabled", True):
332 logger.info("News scheduler is disabled in settings")
333 return
335 if self.is_running:
336 logger.warning("Scheduler is already running")
337 return
339 if self._app is None: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true
340 raise RuntimeError(
341 "BackgroundJobScheduler.set_app() must be called before start()"
342 )
344 # Schedule cleanup job
345 self.scheduler.add_job(
346 self._wrap_job(self._run_cleanup_with_tracking),
347 "interval",
348 hours=self.config["cleanup_interval_hours"],
349 id="cleanup_inactive_users",
350 name="Cleanup Inactive User Sessions",
351 jitter=60, # Add some jitter to cleanup
352 )
354 # Schedule configuration reload
355 self.scheduler.add_job(
356 self._wrap_job(self._reload_config),
357 "interval",
358 minutes=30,
359 id="reload_config",
360 name="Reload Configuration",
361 )
363 # Start the scheduler
364 self.scheduler.start()
365 self.is_running = True
367 # Schedule initial cleanup after a delay
368 self.scheduler.add_job(
369 self._wrap_job(self._run_cleanup_with_tracking),
370 "date",
371 run_date=datetime.now(UTC) + timedelta(seconds=30),
372 id="initial_cleanup",
373 )
375 logger.info("News scheduler started")
377 def stop(self):
378 """Stop the scheduler."""
379 if self.is_running:
380 self.scheduler.shutdown(wait=True)
381 self.is_running = False
383 # Clear all user sessions and credentials
384 with self.lock:
385 for username in self.user_sessions:
386 self._credential_store.clear(username)
387 self.user_sessions.clear()
389 logger.info("News scheduler stopped")
391 def update_user_info(self, username: str, password: str):
392 """
393 Update user info in scheduler. Called on every database interaction.
395 Args:
396 username: User's username
397 password: User's password
398 """
399 logger.info(
400 f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
401 )
402 logger.debug(
403 f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
404 )
406 if not self.is_running:
407 logger.warning(
408 f"[SCHEDULER] Scheduler not running, cannot update user {username}"
409 )
410 return
412 with self.lock:
413 # Store password in credential store (inside lock to prevent
414 # race where concurrent calls leave mismatched credentials)
415 self._credential_store.store(username, password)
417 now = datetime.now(UTC)
419 if username not in self.user_sessions:
420 # New user - create session info
421 logger.info(f"[SCHEDULER] New user in scheduler: {username}")
422 self.user_sessions[username] = {
423 "last_activity": now,
424 "scheduled_jobs": set(),
425 }
426 logger.debug(
427 f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
428 )
429 # Schedule their subscriptions
430 self._schedule_user_subscriptions(username)
431 else:
432 # Existing user - update info
433 logger.info(
434 f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
435 )
436 old_activity = self.user_sessions[username]["last_activity"]
437 activity_delta = now - old_activity
438 logger.debug(
439 f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
440 )
442 self.user_sessions[username]["last_activity"] = now
443 logger.debug(
444 f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
445 )
446 # Reschedule their subscriptions in case they changed
447 self._schedule_user_subscriptions(username)
449 def unregister_user(self, username: str):
450 """
451 Unregister a user and clean up their scheduled jobs.
452 Called when user logs out.
453 """
454 with self.lock:
455 if username in self.user_sessions:
456 logger.info(f"Unregistering user {username}")
458 # Remove all scheduled jobs for this user
459 session_info = self.user_sessions[username]
460 for job_id in session_info["scheduled_jobs"].copy():
461 try:
462 self.scheduler.remove_job(job_id)
463 except JobLookupError:
464 pass
466 # Remove user session and clear credentials atomically
467 del self.user_sessions[username]
468 self._credential_store.clear(username)
470 # Invalidate settings cache for this user (outside lock)
471 self.invalidate_user_settings_cache(username)
472 logger.info(f"User {username} unregistered successfully")
474 def _schedule_user_subscriptions(self, username: str):
475 """Schedule all active subscriptions for a user."""
476 logger.info(f"_schedule_user_subscriptions called for {username}")
477 try:
478 session_info = self.user_sessions.get(username)
479 if not session_info:
480 logger.warning(f"No session info found for {username}")
481 return
483 password = self._credential_store.retrieve(username)
484 if not password: 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true
485 logger.warning(
486 f"Credentials expired for {username}, skipping subscription scheduling"
487 )
488 return
489 logger.debug(f"Got password for {username}: present")
491 # Get user's subscriptions from their encrypted database
492 from ..database.session_context import get_user_db_session
493 from ..database.models.news import NewsSubscription
495 with get_user_db_session(username, password) as db:
496 subscriptions = (
497 db.query(NewsSubscription).filter_by(is_active=True).all()
498 )
499 logger.debug(
500 f"Query executed, found {len(subscriptions)} results"
501 )
503 # Log details of each subscription
504 for sub in subscriptions:
505 logger.debug(
506 f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
507 )
509 logger.info(
510 f"Found {len(subscriptions)} active subscriptions for {username}"
511 )
513 # Clear old jobs for this user
514 for job_id in session_info["scheduled_jobs"].copy():
515 try:
516 self.scheduler.remove_job(job_id)
517 session_info["scheduled_jobs"].remove(job_id)
518 except JobLookupError:
519 pass
521 # Schedule each subscription with jitter
522 for sub in subscriptions:
523 job_id = f"{username}_{sub.id}"
525 # Calculate jitter
526 # Security: random jitter to distribute subscription timing, not security-sensitive
527 max_jitter = int(self.config.get("max_jitter_seconds", 300))
528 jitter = random.randint(0, max_jitter)
530 # Determine trigger based on frequency
531 refresh_minutes = sub.refresh_interval_minutes
533 if refresh_minutes <= 60: # 60 minutes or less
534 # For hourly or more frequent, use interval trigger
535 trigger = "interval"
536 trigger_args = {
537 "minutes": refresh_minutes,
538 "jitter": jitter,
539 "start_date": datetime.now(UTC), # Start immediately
540 }
541 else:
542 # For less frequent, calculate next run time
543 now = datetime.now(UTC)
544 if sub.next_refresh:
545 # Ensure timezone-aware for comparison with now (UTC)
546 next_refresh_aware = sub.next_refresh
547 if next_refresh_aware.tzinfo is None:
548 logger.warning(
549 f"Subscription {sub.id} has naive (non-tz-aware) "
550 f"next_refresh datetime, assuming UTC"
551 )
552 next_refresh_aware = next_refresh_aware.replace(
553 tzinfo=UTC
554 )
555 if next_refresh_aware <= now:
556 # Subscription is overdue - run it immediately with small jitter
557 logger.info(
558 f"Subscription {sub.id} is overdue, scheduling immediate run"
559 )
560 next_run = now + timedelta(seconds=jitter)
561 else:
562 next_run = next_refresh_aware
563 else:
564 next_run = now + timedelta(
565 minutes=refresh_minutes, seconds=jitter
566 )
568 trigger = "date"
569 trigger_args = {"run_date": next_run}
571 # Add the job
572 self.scheduler.add_job(
573 func=self._wrap_job(self._check_subscription),
574 args=[username, sub.id],
575 trigger=trigger,
576 id=job_id,
577 name=f"Check {sub.name or sub.query_or_topic[:30]}",
578 replace_existing=True,
579 **trigger_args,
580 )
582 session_info["scheduled_jobs"].add(job_id)
583 logger.info(f"Scheduled job {job_id} with {trigger} trigger")
585 except Exception:
586 logger.exception(f"Error scheduling subscriptions for {username}")
588 # Add document processing for this user
589 self._schedule_document_processing(username)
591 def _schedule_document_processing(self, username: str):
592 """Schedule document processing for a user."""
593 logger.info(
594 f"[DOC_SCHEDULER] Scheduling document processing for {username}"
595 )
596 logger.debug(
597 f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
598 )
600 try:
601 session_info = self.user_sessions.get(username)
602 if not session_info:
603 logger.warning(
604 f"[DOC_SCHEDULER] No session info found for {username}"
605 )
606 logger.debug(
607 f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
608 )
609 return
611 logger.debug(
612 f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
613 )
615 # Get user's document scheduler settings (cached)
616 settings = self._get_document_scheduler_settings(username)
618 if not settings.enabled:
619 logger.info(
620 f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
621 )
622 return
624 logger.info(
625 f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
626 f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
627 f"text={settings.extract_text}, rag={settings.generate_rag}"
628 )
630 # Schedule document processing job
631 job_id = f"{username}_document_processing"
632 logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")
634 # Remove existing document job if any
635 try:
636 self.scheduler.remove_job(job_id)
637 session_info["scheduled_jobs"].discard(job_id)
638 logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
639 except JobLookupError:
640 logger.debug(
641 f"[DOC_SCHEDULER] No existing job {job_id} to remove"
642 )
643 pass # Job doesn't exist, that's fine
645 # Add new document processing job
646 logger.debug(
647 f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
648 )
649 self.scheduler.add_job(
650 func=self._wrap_job(self._process_user_documents),
651 args=[username],
652 trigger="interval",
653 seconds=settings.interval_seconds,
654 id=job_id,
655 name=f"Process Documents for {username}",
656 jitter=30, # Add small jitter to prevent multiple users from processing simultaneously
657 max_instances=1, # Prevent overlapping document processing for same user
658 replace_existing=True,
659 )
661 session_info["scheduled_jobs"].add(job_id)
662 logger.info(
663 f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
664 )
665 logger.debug(
666 f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
667 )
669 # Verify job was added
670 job = self.scheduler.get_job(job_id)
671 if job:
672 logger.info(
673 f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
674 )
675 else:
676 logger.error(
677 f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
678 )
680 except Exception:
681 logger.exception(
682 f"Error scheduling document processing for {username}"
683 )
685 @thread_cleanup
686 def _process_user_documents(self, username: str):
687 """Process documents for a user."""
688 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}")
689 start_time = datetime.now(UTC)
691 try:
692 session_info = self.user_sessions.get(username)
693 if not session_info:
694 logger.warning(
695 f"[DOC_SCHEDULER] No session info found for user {username}"
696 )
697 return
699 password = self._credential_store.retrieve(username)
700 if not password: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true
701 logger.warning(
702 f"[DOC_SCHEDULER] Credentials expired for user {username}"
703 )
704 return
705 logger.debug(
706 f"[DOC_SCHEDULER] Starting document processing for {username}"
707 )
709 # Get user's document scheduler settings (cached)
710 settings = self._get_document_scheduler_settings(username)
712 logger.info(
713 f"[DOC_SCHEDULER] Processing settings for {username}: "
714 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}"
715 )
717 if not any(
718 [
719 settings.download_pdfs,
720 settings.extract_text,
721 settings.generate_rag,
722 ]
723 ):
724 logger.info(
725 f"[DOC_SCHEDULER] No processing options enabled for user {username}"
726 )
727 return
729 # Parse last_run from cached settings
730 last_run = (
731 datetime.fromisoformat(settings.last_run)
732 if settings.last_run
733 else None
734 )
736 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}")
738 # Need database session for queries and updates
739 from ..database.session_context import get_user_db_session
740 from ..database.models.research import ResearchHistory
741 from ..settings.manager import SettingsManager
743 with get_user_db_session(username, password) as db:
744 settings_manager = SettingsManager(db)
746 # Query for completed research since last run
747 logger.debug(
748 f"[DOC_SCHEDULER] Querying for completed research since {last_run}"
749 )
750 query = db.query(ResearchHistory).filter(
751 ResearchHistory.status == ResearchStatus.COMPLETED,
752 ResearchHistory.completed_at.is_not(
753 None
754 ), # Ensure completed_at is not null
755 )
757 if last_run:
758 query = query.filter(
759 ResearchHistory.completed_at > last_run
760 )
762 # Limit to recent research to prevent overwhelming
763 query = query.order_by(
764 ResearchHistory.completed_at.desc()
765 ).limit(20)
767 research_sessions = query.all()
768 logger.debug(
769 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions"
770 )
772 if not research_sessions:
773 logger.info(
774 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}"
775 )
776 return
778 logger.info(
779 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}"
780 )
782 # Log details of each research session
783 for i, research in enumerate(
784 research_sessions[:5]
785 ): # Log first 5 details
786 title_safe = (
787 (research.title[:50] + "...")
788 if research.title
789 else "No title"
790 )
791 completed_safe = (
792 research.completed_at
793 if research.completed_at
794 else "No completion time"
795 )
796 logger.debug(
797 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}"
798 )
800 # Handle completed_at which might be a string or datetime
801 completed_at_obj = None
802 if research.completed_at:
803 if isinstance(research.completed_at, str):
804 try:
805 completed_at_obj = datetime.fromisoformat(
806 research.completed_at.replace("Z", "+00:00")
807 )
808 except (ValueError, TypeError, AttributeError):
809 completed_at_obj = None
810 else:
811 completed_at_obj = research.completed_at
813 logger.debug(
814 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}"
815 )
816 logger.debug(
817 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}"
818 )
819 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}")
820 logger.debug(
821 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}"
822 )
824 processed_count = 0
825 for research in research_sessions:
826 try:
827 logger.info(
828 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}"
829 )
831 # Set search context so rate limiting works in both
832 # download_pdfs and extract_text paths
833 from ..utilities.thread_context import (
834 set_search_context,
835 )
837 set_search_context(
838 {
839 "research_id": str(research.id),
840 "username": username,
841 "user_password": password,
842 "research_phase": "document_scheduler",
843 }
844 )
846 # Call actual processing APIs
847 if settings.download_pdfs:
848 logger.info(
849 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}"
850 )
851 try:
852 # Use the DownloadService to queue PDF downloads
853 from ..research_library.services.download_service import (
854 DownloadService,
855 )
857 with DownloadService(
858 username=username, password=password
859 ) as download_service:
860 queued_count = download_service.queue_research_downloads(
861 research.id
862 )
863 logger.info(
864 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}"
865 )
866 except Exception:
867 # Recover the shared thread-local session
868 # before continuing — without rollback the
869 # next phase (text extract / RAG) and the
870 # post-loop last_run commit run on a
871 # poisoned session (issue #3827).
872 safe_rollback(db, "DOC_SCHEDULER PDF download")
873 logger.exception(
874 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}"
875 )
877 if settings.extract_text:
878 logger.info(
879 f"[DOC_SCHEDULER] Extracting text for research {research.id}"
880 )
881 try:
882 # Use the DownloadService to extract text for all resources
883 from ..research_library.services.download_service import (
884 DownloadService,
885 )
886 from ..database.models.research import (
887 ResearchResource,
888 )
890 from ..research_library.utils import (
891 is_downloadable_url,
892 )
894 with DownloadService(
895 username=username, password=password
896 ) as download_service:
897 # Get all resources for this research (reuse existing db session)
898 all_resources = (
899 db.query(ResearchResource)
900 .filter_by(research_id=research.id)
901 .all()
902 )
903 # Filter: only process downloadable resources (academic/PDF)
904 resources = [
905 r
906 for r in all_resources
907 if is_downloadable_url(r.url)
908 ]
909 processed_count = 0
910 for resource in resources:
911 # We need to pass the password to the download service
912 # The DownloadService creates its own database sessions, so we need to ensure password is available
913 try:
914 success, error = (
915 download_service.download_as_text(
916 resource.id
917 )
918 )
919 if success:
920 processed_count += 1
921 logger.info(
922 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}"
923 )
924 else:
925 logger.warning(
926 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}"
927 )
928 except Exception as resource_error:
929 # Roll back FIRST so the next
930 # iteration's queries don't
931 # cascade on a poisoned session
932 # (issue #3827).
933 safe_rollback(
934 db,
935 "DOC_SCHEDULER resource",
936 )
937 logger.exception(
938 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}"
939 )
940 logger.info(
941 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed"
942 )
943 except Exception:
944 safe_rollback(
945 db, "DOC_SCHEDULER text extraction"
946 )
947 logger.exception(
948 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}"
949 )
951 if settings.generate_rag:
952 logger.info(
953 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}"
954 )
955 try:
956 # Get embedding settings from user configuration
957 embedding_model = settings_manager.get_setting(
958 "local_search_embedding_model",
959 "all-MiniLM-L6-v2",
960 )
961 embedding_provider = (
962 settings_manager.get_setting(
963 "local_search_embedding_provider",
964 "sentence_transformers",
965 )
966 )
967 chunk_size = int(
968 settings_manager.get_setting(
969 "local_search_chunk_size", 1000
970 )
971 )
972 chunk_overlap = int(
973 settings_manager.get_setting(
974 "local_search_chunk_overlap", 200
975 )
976 )
978 # Initialize RAG service with user's embedding configuration
979 with LibraryRAGService(
980 username=username,
981 embedding_model=embedding_model,
982 embedding_provider=embedding_provider,
983 chunk_size=chunk_size,
984 chunk_overlap=chunk_overlap,
985 db_password=password,
986 ) as rag_service:
987 # Get default Library collection ID
988 library_collection_id = (
989 get_default_library_id(
990 username, password
991 )
992 )
994 # Query for unindexed documents from this research session
995 documents_to_index = (
996 db.query(Document.id, Document.title)
997 .outerjoin(
998 DocumentCollection,
999 (
1000 DocumentCollection.document_id
1001 == Document.id
1002 )
1003 & (
1004 DocumentCollection.collection_id
1005 == library_collection_id
1006 ),
1007 )
1008 .filter(
1009 Document.research_id == research.id,
1010 Document.text_content.isnot(None),
1011 (
1012 DocumentCollection.indexed.is_(
1013 False
1014 )
1015 | DocumentCollection.id.is_(
1016 None
1017 )
1018 ),
1019 )
1020 .all()
1021 )
1023 if not documents_to_index:
1024 logger.info(
1025 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}"
1026 )
1027 else:
1028 # Index each document
1029 indexed_count = 0
1030 for (
1031 doc_id,
1032 doc_title,
1033 ) in documents_to_index:
1034 try:
1035 result = rag_service.index_document(
1036 document_id=doc_id,
1037 collection_id=library_collection_id,
1038 force_reindex=False,
1039 )
1040 if ( 1040 ↛ 1030line 1040 didn't jump to line 1030 because the condition on line 1040 was always true
1041 result["status"]
1042 == "success"
1043 ):
1044 indexed_count += 1
1045 logger.info(
1046 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) "
1047 f"with {result.get('chunk_count', 0)} chunks"
1048 )
1049 except Exception as doc_error:
1050 logger.exception(
1051 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}"
1052 )
1054 logger.info(
1055 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: "
1056 f"{indexed_count}/{len(documents_to_index)} documents indexed"
1057 )
1058 except Exception:
1059 safe_rollback(db, "DOC_SCHEDULER RAG")
1060 logger.exception(
1061 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}"
1062 )
1064 processed_count += 1
1065 logger.debug(
1066 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}"
1067 )
1069 except Exception:
1070 safe_rollback(db, "DOC_SCHEDULER research")
1071 logger.exception(
1072 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}"
1073 )
1075 # Update last run time in user's settings.
1076 # Intentionally NOT wrapped in try/finally: if upstream setup
1077 # fails (DB open, SettingsManager init, initial query),
1078 # last_run should stay put so the next tick retries.
1079 # Advancing here would mask a persistent failure (corrupted
1080 # DB, wrong password). See closed PR #3288.
1081 current_time = datetime.now(UTC).isoformat()
1082 settings_manager.set_setting(
1083 "document_scheduler.last_run", current_time, commit=True
1084 )
1085 logger.debug(
1086 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}"
1087 )
1089 end_time = datetime.now(UTC)
1090 duration = (end_time - start_time).total_seconds()
1091 logger.info(
1092 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s"
1093 )
1095 except Exception:
1096 logger.exception(
1097 f"[DOC_SCHEDULER] Error processing documents for user {username}"
1098 )
def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
    """Report the document scheduler state for one user.

    Args:
        username: Key into ``self.user_sessions``.

    Returns:
        A dict with ``enabled``, ``interval_seconds``,
        ``processing_options``, ``last_run``, ``has_scheduled_job`` and
        ``user_active`` for a tracked user. For an untracked user, or when
        anything goes wrong, a two-key dict with ``enabled: False`` and a
        ``message`` is returned instead. Errors are logged, not raised.
    """
    try:
        tracked = self.user_sessions.get(username)
        if not tracked:
            return {
                "enabled": False,
                "message": "User not found in scheduler",
            }

        # Per-user document scheduler settings (cached by the helper).
        doc_settings = self._get_document_scheduler_settings(username)

        # The recurring job is registered under a fixed per-user id.
        registered_jobs = tracked.get("scheduled_jobs", set())
        job_present = f"{username}_document_processing" in registered_jobs

        options = {
            "download_pdfs": doc_settings.download_pdfs,
            "extract_text": doc_settings.extract_text,
            "generate_rag": doc_settings.generate_rag,
        }
        return {
            "enabled": doc_settings.enabled,
            "interval_seconds": doc_settings.interval_seconds,
            "processing_options": options,
            "last_run": doc_settings.last_run,
            "has_scheduled_job": job_present,
            "user_active": username in self.user_sessions,
        }

    except Exception as exc:
        logger.exception(
            f"Error getting document scheduler status for user {username}"
        )
        # Only the exception type is exposed to the caller.
        return {
            "enabled": False,
            "message": f"Failed to retrieve scheduler status: {type(exc).__name__}",
        }
def trigger_document_processing(self, username: str) -> bool:
    """Queue a one-off document processing run for ``username``.

    The run is added as a date-triggered job one second in the future
    under the id ``<username>_document_processing_manual``; an existing job
    with that id is replaced.

    Returns:
        True when the job was added and could be read back from the
        scheduler. False when the user is not tracked, the scheduler is
        not running, the job could not be read back, or any exception
        occurred (exceptions are logged, not raised).
    """
    logger.info(
        f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
    )
    try:
        if not self.user_sessions.get(username):
            logger.warning(
                f"[DOC_SCHEDULER] User {username} not found in scheduler"
            )
            logger.debug(
                f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
            )
            return False

        if not self.is_running:
            logger.warning(
                f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
            )
            return False

        manual_job_id = f"{username}_document_processing_manual"
        logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {manual_job_id}")

        # One-shot job, fired (almost) immediately.
        self.scheduler.add_job(
            func=self._wrap_job(self._process_user_documents),
            args=[username],
            trigger="date",
            run_date=datetime.now(UTC) + timedelta(seconds=1),
            id=manual_job_id,
            name=f"Manual Document Processing for {username}",
            replace_existing=True,
        )

        # Read the job back to confirm the scheduler accepted it.
        scheduled = self.scheduler.get_job(manual_job_id)
        if not scheduled:
            logger.error(
                f"[DOC_SCHEDULER] Failed to verify manual job {manual_job_id} was added!"
            )
            return False

        logger.info(
            f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {manual_job_id}, next run: {scheduled.next_run_time}"
        )
        return True

    except Exception:
        logger.exception(
            f"[DOC_SCHEDULER] Error triggering document processing for user {username}"
        )
        return False
@thread_cleanup
def _check_user_overdue_subscriptions(self, username: str):
    """Schedule an immediate run for each of a user's overdue subscriptions.

    A subscription counts as overdue when it is active and its
    ``next_refresh`` is set and not later than now. Each one gets its own
    date-triggered job, delayed by 1-30 random seconds. Does nothing when
    the user is not tracked or has no stored credentials. Exceptions are
    logged, not raised.
    """
    try:
        if not self.user_sessions.get(username):
            return

        password = self._credential_store.retrieve(username)
        if not password:
            return

        # Imported lazily, as in the rest of this module's job functions.
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription
        from datetime import timezone

        with get_user_db_session(username, password) as db:
            now = datetime.now(timezone.utc)
            overdue = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.is_active.is_(True),
                    NewsSubscription.next_refresh.is_not(None),
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            if overdue:
                logger.info(
                    f"Found {len(overdue)} overdue subscriptions for {username}"
                )

            # The timestamp suffix keeps ids distinct across separate checks.
            stamp = int(now.timestamp())
            for sub in overdue:
                # Run immediately with small random delay
                # Security: random delay to stagger overdue jobs, not security-sensitive
                delay_seconds = random.randint(1, 30)

                self.scheduler.add_job(
                    func=self._wrap_job(self._check_subscription),
                    args=[username, sub.id],
                    trigger="date",
                    run_date=now + timedelta(seconds=delay_seconds),
                    id=f"overdue_{username}_{sub.id}_{stamp}",
                    name=f"Overdue: {sub.name or sub.query_or_topic[:30]}",
                    replace_existing=True,
                )

                logger.info(
                    f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds"
                )

    except Exception:
        logger.exception(
            f"Error checking overdue subscriptions for {username}"
        )
@thread_cleanup
def _check_subscription(self, username: str, subscription_id: int):
    """Refresh a single news subscription and re-arm one-shot jobs.

    Steps:
      1. If the user is no longer tracked, remove the subscription's job
         and stop.
      2. If the stored credentials are gone, log a warning and stop.
      3. Load the subscription, substitute ``YYYY-MM-DD`` in its query
         with the user's local date, advance ``last_refresh`` /
         ``next_refresh`` and commit.
      4. Run the research via ``_trigger_subscription_research_sync``.
      5. If the job registered under ``<username>_<subscription_id>`` uses
         a ``DateTrigger``, add it again for the next interval plus a
         random jitter of up to ``max_jitter_seconds``.

    Exceptions are logged, not raised.
    """
    logger.info(
        f"_check_subscription called for user {username}, subscription {subscription_id}"
    )
    try:
        session_info = self.user_sessions.get(username)
        if not session_info:
            # User no longer active, cancel job
            job_id = f"{username}_{subscription_id}"
            try:
                self.scheduler.remove_job(job_id)
            except JobLookupError:
                # The job is already gone; nothing left to cancel.
                pass
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"Credentials expired for {username}, skipping subscription check"
            )
            return

        # Get subscription details
        from ..database.session_context import get_user_db_session
        from ..database.models.news import NewsSubscription

        with get_user_db_session(username, password) as db:
            sub = db.query(NewsSubscription).get(subscription_id)
            if not sub or not sub.is_active:
                logger.info(
                    f"Subscription {subscription_id} not active, skipping"
                )
                return

            # Prepare query with date replacement using user's timezone
            query = sub.query_or_topic
            if "YYYY-MM-DD" in query:
                from local_deep_research.news.core.utils import (
                    get_local_date_string,
                )
                from ..settings.manager import SettingsManager

                settings_manager = SettingsManager(db)
                local_date = get_local_date_string(settings_manager)
                query = query.replace("YYYY-MM-DD", local_date)

            # Copy the interval into a plain local while the session is
            # open. The reschedule step below runs after the session
            # context has exited and must not read ORM attributes from
            # the row at that point.
            refresh_interval_minutes = sub.refresh_interval_minutes

            # Update last/next refresh times
            sub.last_refresh = datetime.now(UTC)
            sub.next_refresh = datetime.now(UTC) + timedelta(
                minutes=refresh_interval_minutes
            )
            db.commit()

            # Plain-dict copy of the fields the research step needs, so
            # nothing downstream depends on the ORM row.
            subscription_data = {
                "id": sub.id,
                "name": sub.name,
                "query": query,
                "original_query": sub.query_or_topic,
                "model_provider": sub.model_provider,
                "model": sub.model,
                "search_strategy": sub.search_strategy,
                "search_engine": sub.search_engine,
            }

        logger.info(
            f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
        )

        # Trigger research synchronously using requests with proper auth
        self._trigger_subscription_research_sync(
            username, subscription_data
        )

        # Reschedule for next interval if using interval trigger
        job_id = f"{username}_{subscription_id}"
        job = self.scheduler.get_job(job_id)
        if job and job.trigger.__class__.__name__ == "DateTrigger":
            # For date triggers, reschedule
            # Security: random jitter to distribute subscription timing, not security-sensitive
            next_run = datetime.now(UTC) + timedelta(
                minutes=refresh_interval_minutes,
                seconds=random.randint(
                    0, int(self.config.get("max_jitter_seconds", 300))
                ),
            )
            self.scheduler.add_job(
                func=self._wrap_job(self._check_subscription),
                args=[username, subscription_id],
                trigger="date",
                run_date=next_run,
                id=job_id,
                replace_existing=True,
            )

    except Exception:
        logger.exception(f"Error checking subscription {subscription_id}")
@thread_cleanup
def _trigger_subscription_research_sync(
    self, username: str, subscription: Dict[str, Any]
):
    """Run the research for one subscription and store its result.

    Args:
        username: Owner of the subscription; must be tracked in
            ``self.user_sessions`` with credentials still stored.
        subscription: Plain dict describing the subscription. Reads the
            keys ``id``, ``name``, ``query`` and ``original_query``, and
            optionally ``search_engine``, ``search_strategy``, ``model``
            and ``model_provider``.

    The user's settings snapshot is loaded, the subscription's search
    engine (if any) overrides ``search.tool`` in it, the snapshot is
    installed as this thread's settings context, and ``quick_summary`` is
    called. Its result is handed to ``_store_research_result``. Exceptions
    are logged, not raised.
    """
    from ..config.thread_settings import set_settings_context

    try:
        if not self.user_sessions.get(username):
            logger.error(f"No session info for user {username}")
            return

        password = self._credential_store.retrieve(username)
        if not password:
            logger.error(f"Credentials expired for user {username}")
            return

        import uuid

        # Fresh identifier for this research run.
        research_id = str(uuid.uuid4())
        logger.info(
            f"Starting research {research_id} for subscription {subscription['id']}"
        )

        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager

        with get_user_db_session(username, password) as db:
            snapshot = SettingsManager(db).get_settings_snapshot()

            engine_override = subscription.get("search_engine")
            if not engine_override:
                # No override: the user's own search tool setting applies.
                fallback_tool = snapshot.get("search.tool", "auto")
                logger.info(
                    f"Using user's default search tool: '{fallback_tool}' for {subscription['id']}"
                )
            else:
                # The subscription pins its own search engine.
                snapshot["search.tool"] = {
                    "value": engine_override,
                    "ui_element": "select",
                }
                logger.info(
                    f"Using subscription's search engine: '{engine_override}' for {subscription['id']}"
                )

            logger.debug(f"Settings snapshot has {len(snapshot)} settings")
            # Log a few key settings to verify they're present
            logger.debug(
                f"Key settings: llm.model={snapshot.get('llm.model')}, llm.provider={snapshot.get('llm.provider')}, search.tool={snapshot.get('search.tool')}"
            )

        query = subscription["query"]

        # Metadata marking this run as a scheduler-triggered news search.
        metadata = {
            "is_news_search": True,
            "search_type": "news_analysis",
            "display_in": "news_feed",
            "subscription_id": subscription["id"],
            "triggered_by": "scheduler",
            "subscription_name": subscription["name"],
            "title": subscription["name"] or None,
            "scheduled_at": datetime.now(UTC).isoformat(),
            "original_query": subscription["original_query"],
            "user_id": username,
        }

        from ..api.research_functions import quick_summary

        # Install the snapshot as this thread's settings context.
        set_settings_context(SnapshotSettingsContext(snapshot))

        research_kwargs = dict(
            query=query,
            research_id=research_id,
            username=username,
            user_password=password,
            settings_snapshot=snapshot,
            model_name=subscription.get("model"),
            provider=subscription.get("model_provider"),
            metadata=metadata,
            # Don't send long subscription prompts to search engines
            search_original_query=False,
        )
        # Only pass search_strategy when the subscription specifies one.
        strategy = subscription.get("search_strategy")
        if strategy:
            research_kwargs["search_strategy"] = strategy

        result = quick_summary(**research_kwargs)

        logger.info(
            f"Completed research {research_id} for subscription {subscription['id']}"
        )

        self._store_research_result(
            username,
            password,
            research_id,
            subscription["id"],
            result,
            subscription,
        )

    except Exception:
        logger.exception(
            f"Error triggering research for subscription {subscription['id']}"
        )
def _store_research_result(
    self,
    username: str,
    password: str,
    research_id: str,
    subscription_id: int,
    result: Dict[str, Any],
    subscription: Dict[str, Any],
):
    """Persist a finished subscription research run for news display.

    Args:
        username: Owner of the per-user database.
        password: Password used to open that database.
        research_id: Id to store the history row and report under.
        subscription_id: Subscription that produced the result.
        result: Research output. Reads ``report`` (falling back to
            ``summary``), ``sources`` and ``query``.
        subscription: Subscription dict; reads ``name`` and ``query``.

    In order, the function: formats citations in the report text,
    generates a headline and topics, commits a ``ResearchHistory`` row,
    saves the sources (best effort) and saves the report through the
    report storage. Exceptions are logged, not raised.
    """
    try:
        from ..database.session_context import get_user_db_session
        from ..database.models import ResearchHistory
        from ..settings.manager import SettingsManager
        import json

        # Convert result to JSON-serializable format
        def make_serializable(obj):
            """Convert non-serializable objects to dictionaries."""
            # Objects exposing .dict() are converted through it.
            if hasattr(obj, "dict"):
                return obj.dict()
            # Other objects: recurse over their public attributes.
            if hasattr(obj, "__dict__"):
                return {
                    k: make_serializable(v)
                    for k, v in obj.__dict__.items()
                    if not k.startswith("_")
                }
            if isinstance(obj, (list, tuple)):
                return [make_serializable(item) for item in obj]
            if isinstance(obj, dict):
                return {k: make_serializable(v) for k, v in obj.items()}
            return obj

        serializable_result = make_serializable(result)

        with get_user_db_session(username, password) as db:
            # Get user settings to store in metadata
            settings_manager = SettingsManager(db)
            settings_snapshot = settings_manager.get_settings_snapshot()

            # Get the report content - check both 'report' and 'summary' fields
            report_content = serializable_result.get(
                "report"
            ) or serializable_result.get("summary")
            logger.debug(
                f"Report content length: {len(report_content) if report_content else 0} chars"
            )

            # Extract sources/links from the result. They get
            # persisted to research_resources AFTER history_entry
            # commits below (FK requires research_id to exist).
            sources = serializable_result.get("sources", [])

            # Then format citations in the report content
            if report_content:
                from ..text_optimization.citation_formatter import (
                    CitationFormatter,
                    CitationMode,
                )
                from ..config.search_config import (
                    get_setting_from_snapshot,
                )

                # NOTE(review): no snapshot is passed to this call;
                # presumably it resolves from the thread's settings
                # context - confirm.
                citation_format = get_setting_from_snapshot(
                    "report.citation_format", "domain_id_hyperlinks"
                )
                mode_map = {
                    "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                    "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                    "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                    "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                    "source_tagged_hyperlinks": CitationMode.SOURCE_TAGGED_HYPERLINKS,
                    "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                }
                # Unknown format names fall back to domain-id hyperlinks.
                mode = mode_map.get(
                    citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                )
                formatter = CitationFormatter(mode=mode)

                # Format citations within the content
                report_content = formatter.format_document(report_content)

            if not report_content:
                # If neither field exists, use the full result as JSON
                report_content = json.dumps(serializable_result)

            # Generate headline and topics for news searches
            from ..news.utils.headline_generator import generate_headline
            from ..news.utils.topic_generator import generate_topics

            # Read from the normalised mapping rather than the raw
            # `result`: a result that needed make_serializable() is not
            # guaranteed to offer .get().
            query_text = serializable_result.get(
                "query", subscription.get("query", "News Update")
            )

            # Generate headline from the actual research findings
            logger.info(
                f"Generating headline for subscription {subscription_id}"
            )
            generated_headline = generate_headline(
                query=query_text,
                findings=report_content,
                max_length=200,  # Allow longer headlines for news
            )

            # Generate topics from the findings
            logger.info(
                f"Generating topics for subscription {subscription_id}"
            )
            generated_topics = generate_topics(
                query=query_text,
                findings=report_content,
                category=subscription.get("name", "News"),
                max_topics=6,
            )

            logger.info(
                f"Generated headline: {generated_headline}, topics: {generated_topics}"
            )

            # Get subscription name for metadata
            subscription_name = subscription.get("name", "")

            # Title preference: generated headline, then the subscription
            # name with a timestamp, then a truncated query with a timestamp.
            if generated_headline:
                title = generated_headline
            elif subscription_name:
                title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
            else:
                title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

            # Create research history entry
            history_entry = ResearchHistory(
                id=research_id,
                query=serializable_result.get("query", ""),
                mode="news_subscription",
                status="completed",
                created_at=datetime.now(UTC).isoformat(),
                completed_at=datetime.now(UTC).isoformat(),
                title=title,
                research_meta={
                    "subscription_id": subscription_id,
                    "triggered_by": "scheduler",
                    "is_news_search": True,
                    "username": username,
                    "subscription_name": subscription_name,  # Store subscription name for display
                    "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                    "generated_headline": generated_headline,  # Store generated headline for news display
                    "generated_topics": generated_topics,  # Store topics for categorization
                },
            )
            db.add(history_entry)
            db.commit()

            # Persist sources to research_resources so the assembler
            # can rebuild the Sources block at render time. Was
            # previously written INLINE into report_content via a
            # "## Sources" tail — the report_content refactor moves
            # this to structured storage matching normal research.
            if sources:
                try:
                    from ..web.services.research_sources_service import (
                        ResearchSourcesService,
                    )

                    ResearchSourcesService.save_research_sources(
                        research_id=research_id,
                        sources=sources,
                        username=username,
                    )
                except Exception:
                    # Best effort: a failure here must not stop the
                    # report itself from being saved below.
                    logger.exception(
                        "Failed to persist scheduler sources for "
                        "research {} — assembler will render no Sources "
                        "block for this row.",
                        research_id,
                    )

            # Store the report content using storage abstraction
            from ..storage import get_report_storage

            # Use storage to save the report (report_content already retrieved above)
            storage = get_report_storage(session=db)
            storage.save_report(
                research_id=research_id,
                content=report_content,
                username=username,
            )

            logger.info(
                f"Stored research result {research_id} for subscription {subscription_id}"
            )

    except Exception:
        logger.exception("Error storing research result")
def _run_cleanup_with_tracking(self):
    """Run the inactive-user cleanup and log how it went.

    Logs the number of users removed on success, or the exception on
    failure. Never raises.
    """
    try:
        removed = self._cleanup_inactive_users()
        logger.info(f"Cleanup successful: removed {removed} inactive users")
    except Exception:
        logger.exception("Cleanup job failed")
1694 def _cleanup_inactive_users(self) -> int:
1695 """Remove users inactive for longer than retention period."""
1696 retention_hours = self.config.get("retention_hours", 48)
1697 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours)
1699 cleaned_count = 0
1701 with self.lock:
1702 inactive_users = [
1703 user_id
1704 for user_id, session in self.user_sessions.items()
1705 if session["last_activity"] < cutoff
1706 ]
1708 for user_id in inactive_users:
1709 # Remove all scheduled jobs
1710 for job_id in self.user_sessions[user_id][
1711 "scheduled_jobs"
1712 ].copy():
1713 try:
1714 self.scheduler.remove_job(job_id)
1715 except JobLookupError:
1716 pass
1718 # Clear credentials and session data
1719 self._credential_store.clear(user_id)
1720 del self.user_sessions[user_id]
1721 cleaned_count += 1
1722 logger.info(f"Cleaned up inactive user {user_id}")
1724 return cleaned_count
1726 def _reload_config(self):
1727 """Reload configuration from settings manager."""
1728 if not hasattr(self, "settings_manager") or not self.settings_manager:
1729 return
1731 try:
1732 old_retention = self.config.get("retention_hours", 48)
1734 # Reload all settings
1735 for key in self.config:
1736 if key == "enabled":
1737 continue # Don't change enabled state while running
1739 full_key = f"news.scheduler.{key}"
1740 self.config[key] = self._get_setting(full_key, self.config[key])
1742 # Handle changes that need immediate action
1743 if old_retention != self.config["retention_hours"]:
1744 logger.info(
1745 f"Retention period changed from {old_retention} "
1746 f"to {self.config['retention_hours']} hours"
1747 )
1748 # Trigger immediate cleanup with new retention
1749 self.scheduler.add_job(
1750 self._wrap_job(self._run_cleanup_with_tracking),
1751 "date",
1752 run_date=datetime.now(UTC) + timedelta(seconds=5),
1753 id="immediate_cleanup_config_change",
1754 )
1756 # Clear settings cache to pick up any user setting changes
1757 self.invalidate_all_settings_cache()
1759 except Exception:
1760 logger.exception("Error reloading configuration")
def get_status(self) -> Dict[str, Any]:
    """Return a snapshot of the scheduler's overall state.

    Returns:
        Dict with ``is_running``, ``config``, ``active_users``,
        ``total_scheduled_jobs``, ``next_cleanup`` (ISO string, or None
        when the scheduler is stopped or the cleanup job is absent) and
        ``memory_usage``.
    """
    with self.lock:
        sessions = self.user_sessions
        user_count = len(sessions)
        job_count = sum(
            len(info["scheduled_jobs"]) for info in sessions.values()
        )

    # Next run time of the cleanup job; only looked up while running.
    cleanup_at = None
    if self.is_running:
        cleanup_job = self.scheduler.get_job("cleanup_inactive_users")
        if cleanup_job:
            cleanup_at = cleanup_job.next_run_time

    status = {
        "is_running": self.is_running,
        "config": self.config,
        "active_users": user_count,
        "total_scheduled_jobs": job_count,
    }
    status["next_cleanup"] = cleanup_at.isoformat() if cleanup_at else None
    status["memory_usage"] = self._estimate_memory_usage()
    return status
1787 def _estimate_memory_usage(self) -> int:
1788 """Estimate memory usage of user sessions."""
1790 # Rough estimate: username (50) + password (100) + metadata (200) per user
1791 per_user_estimate = 350
1792 return len(self.user_sessions) * per_user_estimate
1794 def get_user_sessions_summary(self) -> List[Dict[str, Any]]:
1795 """Get summary of active user sessions (without passwords)."""
1796 with self.lock:
1797 summary = []
1798 for user_id, session in self.user_sessions.items():
1799 summary.append(
1800 {
1801 "user_id": user_id,
1802 "last_activity": session["last_activity"].isoformat(),
1803 "scheduled_jobs": len(session["scheduled_jobs"]),
1804 "time_since_activity": str(
1805 datetime.now(UTC) - session["last_activity"]
1806 ),
1807 }
1808 )
1809 return summary
# Process-wide scheduler instance, created on first request.
_scheduler_instance = None


def get_background_job_scheduler() -> BackgroundJobScheduler:
    """Return the shared scheduler, creating it on the first call."""
    global _scheduler_instance
    instance = _scheduler_instance
    if instance is None:
        instance = BackgroundJobScheduler()
        _scheduler_instance = instance
    return instance