Coverage for src/local_deep_research/news/subscription_manager/scheduler.py: 24%
586 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Activity-based news subscription scheduler for per-user encrypted databases.
3Tracks user activity and temporarily stores credentials for automatic updates.
4"""
6import random
7import threading
8from datetime import datetime, timedelta, UTC
9from typing import Any, Dict, List
11from loguru import logger
12from local_deep_research.settings.logger import log_settings
14from apscheduler.schedulers.background import BackgroundScheduler
15from apscheduler.jobstores.base import JobLookupError
17# RAG indexing imports
18from ...research_library.services.library_rag_service import LibraryRAGService
19from ...database.library_init import get_default_library_id
20from ...database.models.library import Document, DocumentCollection
23SCHEDULER_AVAILABLE = True # Always available since it's a required dependency


class NewsScheduler:
    """
    Singleton scheduler that manages news subscriptions for active users.

    This scheduler:
    - Monitors user activity through database access
    - Temporarily stores user credentials in memory
    - Automatically schedules subscription checks
    - Cleans up inactive users after a configurable period
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Ensure singleton instance."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
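
    # Construction sketch (illustrative, not from this module's tests): the
    # double-checked locking above means every constructor call yields the
    # same object, even from concurrent threads.
    #
    #     a = NewsScheduler()
    #     b = NewsScheduler()
    #     assert a is b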

    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton)."""
        # Skip if already initialized
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {password, last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Scheduler instance
        self.scheduler = BackgroundScheduler()

        # Configuration (will be loaded from settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False

        self._initialized = True
        logger.info("News scheduler initialized")

    def _load_default_config(self) -> Dict[str, Any]:
        """Load default configuration (will be overridden by settings manager)."""
        return {
            "enabled": True,
            "retention_hours": 48,
            "cleanup_interval_hours": 1,
            "max_jitter_seconds": 300,
            "max_concurrent_jobs": 10,
            "subscription_batch_size": 5,
            "activity_check_interval_minutes": 5,
        }

    def initialize_with_settings(self, settings_manager):
        """Initialize configuration from settings manager."""
        try:
            # Load all scheduler settings
            self.settings_manager = settings_manager
            self.config = {
                "enabled": self._get_setting("news.scheduler.enabled", True),
                "retention_hours": self._get_setting(
                    "news.scheduler.retention_hours", 48
                ),
                "cleanup_interval_hours": self._get_setting(
                    "news.scheduler.cleanup_interval_hours", 1
                ),
                "max_jitter_seconds": self._get_setting(
                    "news.scheduler.max_jitter_seconds", 300
                ),
                "max_concurrent_jobs": self._get_setting(
                    "news.scheduler.max_concurrent_jobs", 10
                ),
                "subscription_batch_size": self._get_setting(
                    "news.scheduler.batch_size", 5
                ),
                "activity_check_interval_minutes": self._get_setting(
                    "news.scheduler.activity_check_interval", 5
                ),
            }
            log_settings(self.config, "Scheduler configuration loaded")
        except Exception:
            logger.exception("Error loading scheduler settings")
            # Keep default config

    def _get_setting(self, key: str, default: Any) -> Any:
        """Get setting with fallback to default."""
        if hasattr(self, "settings_manager") and self.settings_manager:
            return self.settings_manager.get_setting(key, default=default)
        return default

    def start(self):
        """Start the scheduler."""
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        # Schedule cleanup job
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        self.scheduler.add_job(
            self._reload_config,
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule initial cleanup after a delay
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")
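
    # Boot-time wiring sketch (hedged; `settings_manager` is assumed to be
    # any object exposing get_setting(key, default=...)):
    #
    #     scheduler = get_news_scheduler()
    #     scheduler.initialize_with_settings(settings_manager)
    #     scheduler.start()  # a second call only logs a warning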

    def stop(self):
        """Stop the scheduler."""
        if self.is_running:
            self.scheduler.shutdown(wait=True)
            self.is_running = False

            # Clear all user sessions
            with self.lock:
                self.user_sessions.clear()

            logger.info("News scheduler stopped")

    def update_user_info(self, username: str, password: str):
        """
        Update user info in scheduler. Called on every database interaction.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        with self.lock:
            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "password": password,
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["password"] = password
                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)
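
    # Call-site sketch (hypothetical route, not an API defined here): the web
    # layer is expected to call update_user_info() on each authenticated
    # database interaction so the session stays warm and jobs stay scheduled.
    #
    #     @app.post("/login")
    #     def login():
    #         ...  # verify credentials first
    #         get_news_scheduler().update_user_info(username, password)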

    def unregister_user(self, username: str):
        """
        Unregister a user and clean up their scheduled jobs.
        Called when user logs out.
        """
        with self.lock:
            if username in self.user_sessions:
                logger.info(f"Unregistering user {username}")

                # Remove all scheduled jobs for this user
                session_info = self.user_sessions[username]
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        pass

                # Remove user session
                del self.user_sessions[username]
                logger.info(f"User {username} unregistered successfully")

    def _schedule_user_subscriptions(self, username: str):
        """Schedule all active subscriptions for a user."""
        logger.info(f"_schedule_user_subscriptions called for {username}")
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(f"No session info found for {username}")
                return

            password = session_info["password"]
            logger.debug(
                f"Got password for {username}, length: {len(password) if password else 0}"
            )

            # Get user's subscriptions from their encrypted database
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                subscriptions = (
                    db.query(NewsSubscription).filter_by(is_active=True).all()
                )
                logger.debug(
                    f"Query executed, found {len(subscriptions)} results"
                )

                # Log details of each subscription
                for sub in subscriptions:
                    logger.debug(
                        f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                    )

                logger.info(
                    f"Found {len(subscriptions)} active subscriptions for {username}"
                )

                # Clear old jobs for this user
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        pass
                    # Discard the id even when the job was already gone, so
                    # stale entries cannot accumulate in the session's job set
                    session_info["scheduled_jobs"].discard(job_id)

                # Schedule each subscription with jitter
                for sub in subscriptions:
                    job_id = f"{username}_{sub.id}"

                    # Calculate jitter
                    max_jitter = int(self.config.get("max_jitter_seconds", 300))
                    jitter = random.randint(0, max_jitter)

                    # Determine trigger based on frequency
                    refresh_minutes = sub.refresh_interval_minutes

                    if refresh_minutes <= 60:  # 60 minutes or less
                        # For hourly or more frequent, use interval trigger
                        trigger = "interval"
                        trigger_args = {
                            "minutes": refresh_minutes,
                            "jitter": jitter,
                            "start_date": datetime.now(UTC),  # Start immediately
                        }
                    else:
                        # For less frequent, calculate next run time
                        now = datetime.now(UTC)
                        if sub.next_refresh:
                            # Stored datetimes are naive UTC; attach UTC so the
                            # comparison with the timezone-aware `now` is valid
                            next_refresh_aware = sub.next_refresh.replace(
                                tzinfo=UTC
                            )
                            if next_refresh_aware <= now:
                                # Subscription is overdue - run it immediately with small jitter
                                logger.info(
                                    f"Subscription {sub.id} is overdue, scheduling immediate run"
                                )
                                next_run = now + timedelta(seconds=jitter)
                            else:
                                next_run = next_refresh_aware
                        else:
                            next_run = now + timedelta(
                                minutes=refresh_minutes, seconds=jitter
                            )
335 trigger = "date"
336 trigger_args = {"run_date": next_run}
338 # Add the job
339 self.scheduler.add_job(
340 func=self._check_subscription,
341 args=[username, sub.id],
342 trigger=trigger,
343 id=job_id,
344 name=f"Check {sub.name or sub.query_or_topic[:30]}",
345 replace_existing=True,
346 **trigger_args,
347 )
349 session_info["scheduled_jobs"].add(job_id)
350 logger.info(f"Scheduled job {job_id} with {trigger} trigger")
352 except Exception as e:
353 logger.exception(
354 f"Error scheduling subscriptions for {username}: {e}"
355 )
357 # Add document processing for this user
358 self._schedule_document_processing(username)
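
    # Worked example (values assumed for illustration): a 30-minute
    # subscription gets a repeating "interval" trigger, while a daily one
    # (1440 minutes) gets a one-shot "date" trigger at its stored
    # next_refresh and is re-queued after each run by _check_subscription().
    #
    #     refresh_minutes = 30   -> trigger="interval", minutes=30, jitter<=300
    #     refresh_minutes = 1440 -> trigger="date", run_date=next_refresh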

    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user."""
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            password = session_info["password"]
            logger.debug(
                f"[DOC_SCHEDULER] Retrieved password for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings from their database
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            logger.debug(
                f"[DOC_SCHEDULER] Connecting to database for {username} settings"
            )
            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                logger.debug(
                    "[DOC_SCHEDULER] Connected to database, retrieving settings"
                )

                # Check if document processing is enabled for this user
                enabled = settings_manager.get_setting(
                    "document_scheduler.enabled", True
                )
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler enabled for {username}: {enabled}"
                )

                if not enabled:
                    logger.info(
                        f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                    )
                    return

                # Get processing settings
                interval_seconds = settings_manager.get_setting(
                    "document_scheduler.interval_seconds", 1800
                )
                download_pdfs = settings_manager.get_setting(
                    "document_scheduler.download_pdfs", False
                )
                extract_text = settings_manager.get_setting(
                    "document_scheduler.extract_text", True
                )
                generate_rag = settings_manager.get_setting(
                    "document_scheduler.generate_rag", False
                )

                logger.info(
                    f"[DOC_SCHEDULER] User {username} document settings: enabled={enabled}, interval={interval_seconds}s, "
                    f"pdfs={download_pdfs}, text={extract_text}, rag={generate_rag}"
                )

                # Schedule document processing job
                job_id = f"{username}_document_processing"
                logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

                # Remove existing document job if any
                try:
                    self.scheduler.remove_job(job_id)
                    session_info["scheduled_jobs"].discard(job_id)
                    logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
                except JobLookupError:
                    # Job doesn't exist, that's fine
                    logger.debug(
                        f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                    )

                # Add new document processing job
                logger.debug(
                    f"[DOC_SCHEDULER] Adding new document processing job with interval {interval_seconds}s"
                )
                self.scheduler.add_job(
                    func=self._process_user_documents,
                    args=[username],
                    trigger="interval",
                    seconds=interval_seconds,
                    id=job_id,
                    name=f"Process Documents for {username}",
                    jitter=30,  # Small jitter so users don't all process simultaneously
                    max_instances=1,  # Prevent overlapping runs for the same user
                    replace_existing=True,
                )

                session_info["scheduled_jobs"].add(job_id)
                logger.info(
                    f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {interval_seconds}s interval"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
                )

                # Verify job was added
                job = self.scheduler.get_job(job_id)
                if job:
                    logger.info(
                        f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                    )
                else:
                    logger.error(
                        f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                    )

        except Exception as e:
            logger.exception(
                f"Error scheduling document processing for {username}: {e}"
            )
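
    # Per-user knobs read above (defaults shown; stored in the user's own
    # settings database):
    #
    #     document_scheduler.enabled          = True
    #     document_scheduler.interval_seconds = 1800   # run every 30 minutes
    #     document_scheduler.download_pdfs    = False
    #     document_scheduler.extract_text     = True
    #     document_scheduler.generate_rag     = False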

    def _process_user_documents(self, username: str):
        """Process documents for a user."""
        logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}")
        start_time = datetime.now(UTC)

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for user {username}"
                )
                return

            password = session_info["password"]
            logger.debug(
                f"[DOC_SCHEDULER] Starting document processing for {username}"
            )

            # Get user's settings from their database
            from ...database.session_context import get_user_db_session
            from ...database.models.research import ResearchHistory
            from ...settings.manager import SettingsManager

            logger.debug(
                f"[DOC_SCHEDULER] Connecting to database for {username}"
            )
            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                logger.debug(
                    "[DOC_SCHEDULER] Connected to database for document processing"
                )

                # Get processing settings
                download_pdfs = settings_manager.get_setting(
                    "document_scheduler.download_pdfs", False
                )
                extract_text = settings_manager.get_setting(
                    "document_scheduler.extract_text", True
                )
                generate_rag = settings_manager.get_setting(
                    "document_scheduler.generate_rag", False
                )

                logger.info(
                    f"[DOC_SCHEDULER] Processing settings for {username}: pdfs={download_pdfs}, text={extract_text}, rag={generate_rag}"
                )

                if not any([download_pdfs, extract_text, generate_rag]):
                    logger.info(
                        f"[DOC_SCHEDULER] No processing options enabled for user {username}"
                    )
                    return

                # Get completed research sessions since last run
                last_run_str = settings_manager.get_setting(
                    "document_scheduler.last_run", ""
                )
                last_run = (
                    datetime.fromisoformat(last_run_str)
                    if last_run_str
                    else None
                )

                logger.info(
                    f"[DOC_SCHEDULER] Last run for {username}: {last_run}"
                )

                # Query for completed research since last run
                logger.debug(
                    f"[DOC_SCHEDULER] Querying for completed research since {last_run}"
                )
                query = db.query(ResearchHistory).filter(
                    ResearchHistory.status == "completed",
                    ResearchHistory.completed_at.is_not(None),  # Ensure completed_at is not null
                )

                if last_run:
                    query = query.filter(
                        ResearchHistory.completed_at > last_run
                    )

                # Limit to recent research to prevent overwhelming
                query = query.order_by(
                    ResearchHistory.completed_at.desc()
                ).limit(20)

                research_sessions = query.all()
                logger.debug(
                    f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions"
                )

                if not research_sessions:
                    logger.info(
                        f"[DOC_SCHEDULER] No new completed research sessions found for user {username}"
                    )
                    return

                logger.info(
                    f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}"
                )

                # Log details of the first five research sessions
                for i, research in enumerate(research_sessions[:5]):
                    title_safe = (
                        (research.title[:50] + "...")
                        if research.title
                        else "No title"
                    )
                    completed_safe = (
                        research.completed_at
                        if research.completed_at
                        else "No completion time"
                    )
                    logger.debug(
                        f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}"
                    )

                    # Handle completed_at, which might be a string or datetime
                    completed_at_obj = None
                    if research.completed_at:
                        if isinstance(research.completed_at, str):
                            try:
                                completed_at_obj = datetime.fromisoformat(
                                    research.completed_at.replace("Z", "+00:00")
                                )
                            except ValueError:
                                # Unparseable timestamp; treat as unknown
                                completed_at_obj = None
                        else:
                            completed_at_obj = research.completed_at

                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}"
                    )
                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}"
                    )
                    logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}")
                    logger.debug(
                        f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}"
                    )

                processed_count = 0
                for research in research_sessions:
                    try:
                        logger.info(
                            f"[DOC_SCHEDULER] Processing research {research.id} for user {username}"
                        )

                        # Call actual processing APIs
                        if download_pdfs:
                            logger.info(
                                f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}"
                            )
                            try:
                                # Use the DownloadService to queue PDF downloads
                                from ...research_library.services.download_service import (
                                    DownloadService,
                                )

                                download_service = DownloadService(
                                    username=username, password=password
                                )
                                queued_count = (
                                    download_service.queue_research_downloads(
                                        research.id
                                    )
                                )
                                logger.info(
                                    f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}"
                                )
                            except Exception as e:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}: {e}"
                                )

                        if extract_text:
                            logger.info(
                                f"[DOC_SCHEDULER] Extracting text for research {research.id}"
                            )
                            try:
                                # Use the DownloadService to extract text for all resources
                                from ...research_library.services.download_service import (
                                    DownloadService,
                                )
                                from ...database.models.research import (
                                    ResearchResource,
                                )

                                download_service = DownloadService(
                                    username=username, password=password
                                )

                                # Get all resources for this research (reuse existing db session)
                                resources = (
                                    db.query(ResearchResource)
                                    .filter_by(research_id=research.id)
                                    .all()
                                )
                                # Use a dedicated counter here; reusing
                                # processed_count would clobber the per-session
                                # counter reported in the summary log below
                                extracted_count = 0
                                for resource in resources:
                                    # The DownloadService creates its own database
                                    # sessions, so the password must be available to it
                                    try:
                                        success, error = (
                                            download_service.download_as_text(
                                                resource.id
                                            )
                                        )
                                        if success:
                                            extracted_count += 1
                                            logger.info(
                                                f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}"
                                            )
                                        else:
                                            logger.warning(
                                                f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}"
                                            )
                                    except Exception as resource_error:
                                        logger.exception(
                                            f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}"
                                        )
                                logger.info(
                                    f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {extracted_count}/{len(resources)} resources processed"
                                )
                            except Exception as e:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to extract text for research {research.id}: {e}"
                                )

                        if generate_rag:
                            logger.info(
                                f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}"
                            )
                            try:
                                # Get embedding settings from user configuration
                                embedding_model = settings_manager.get_setting(
                                    "local_search_embedding_model",
                                    "all-MiniLM-L6-v2",
                                )
                                embedding_provider = (
                                    settings_manager.get_setting(
                                        "local_search_embedding_provider",
                                        "sentence_transformers",
                                    )
                                )
                                chunk_size = int(
                                    settings_manager.get_setting(
                                        "local_search_chunk_size", 1000
                                    )
                                )
                                chunk_overlap = int(
                                    settings_manager.get_setting(
                                        "local_search_chunk_overlap", 200
                                    )
                                )

                                # Initialize RAG service with user's embedding configuration
                                rag_service = LibraryRAGService(
                                    username=username,
                                    embedding_model=embedding_model,
                                    embedding_provider=embedding_provider,
                                    chunk_size=chunk_size,
                                    chunk_overlap=chunk_overlap,
                                )

                                # Get default Library collection ID
                                library_collection_id = get_default_library_id(
                                    username
                                )

                                # Query for unindexed documents from this research session
                                documents_to_index = (
                                    db.query(Document.id, Document.title)
                                    .outerjoin(
                                        DocumentCollection,
                                        (
                                            DocumentCollection.document_id
                                            == Document.id
                                        )
                                        & (
                                            DocumentCollection.collection_id
                                            == library_collection_id
                                        ),
                                    )
                                    .filter(
                                        Document.research_id == research.id,
                                        Document.text_content.isnot(None),
                                        (
                                            DocumentCollection.indexed.is_(
                                                False
                                            )
                                            | DocumentCollection.id.is_(None)
                                        ),
                                    )
                                    .all()
                                )

                                if not documents_to_index:
                                    logger.info(
                                        f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}"
                                    )
                                else:
                                    # Index each document
                                    indexed_count = 0
                                    for doc_id, doc_title in documents_to_index:
                                        try:
                                            result = rag_service.index_document(
                                                document_id=doc_id,
                                                collection_id=library_collection_id,
                                                force_reindex=False,
                                            )
                                            if result["status"] == "success":
                                                indexed_count += 1
                                                logger.info(
                                                    f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) "
                                                    f"with {result.get('chunk_count', 0)} chunks"
                                                )
                                        except Exception as doc_error:
                                            logger.exception(
                                                f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}"
                                            )

                                    logger.info(
                                        f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: "
                                        f"{indexed_count}/{len(documents_to_index)} documents indexed"
                                    )
                            except Exception as e:
                                logger.exception(
                                    f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}: {e}"
                                )

                        processed_count += 1
                        logger.debug(
                            f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}"
                        )

                    except Exception as e:
                        logger.exception(
                            f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}: {e}"
                        )

                # Update last run time in user's settings
                current_time = datetime.now(UTC).isoformat()
                settings_manager.set_setting(
                    "document_scheduler.last_run", current_time, commit=True
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}"
                )

            end_time = datetime.now(UTC)
            duration = (end_time - start_time).total_seconds()
            logger.info(
                f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s"
            )

        except Exception as e:
            logger.exception(
                f"[DOC_SCHEDULER] Error processing documents for user {username}: {e}"
            )

    def get_document_scheduler_status(self, username: str) -> Dict[str, Any]:
        """Get document scheduler status for a specific user."""
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                return {
                    "enabled": False,
                    "message": "User not found in scheduler",
                }

            password = session_info["password"]

            # Get user's settings
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)

                # Get configuration
                enabled = settings_manager.get_setting(
                    "document_scheduler.enabled", True
                )
                interval_seconds = settings_manager.get_setting(
                    "document_scheduler.interval_seconds", 1800
                )
                download_pdfs = settings_manager.get_setting(
                    "document_scheduler.download_pdfs", False
                )
                extract_text = settings_manager.get_setting(
                    "document_scheduler.extract_text", True
                )
                generate_rag = settings_manager.get_setting(
                    "document_scheduler.generate_rag", False
                )
                last_run = settings_manager.get_setting(
                    "document_scheduler.last_run", ""
                )

                # Check if user has document processing job
                job_id = f"{username}_document_processing"
                has_job = job_id in session_info.get("scheduled_jobs", set())

                return {
                    "enabled": enabled,
                    "interval_seconds": interval_seconds,
                    "processing_options": {
                        "download_pdfs": download_pdfs,
                        "extract_text": extract_text,
                        "generate_rag": generate_rag,
                    },
                    "last_run": last_run,
                    "has_scheduled_job": has_job,
                    "user_active": username in self.user_sessions,
                }

        except Exception as e:
            logger.exception(
                f"Error getting document scheduler status for user {username}"
            )
            return {
                "enabled": False,
                "message": f"Failed to retrieve scheduler status: {type(e).__name__}",
            }

    def trigger_document_processing(self, username: str) -> bool:
        """Trigger immediate document processing for a user."""
        logger.info(
            f"[DOC_SCHEDULER] Manual trigger requested for user {username}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] User {username} not found in scheduler"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}"
                )
                return False

            if not self.is_running:
                logger.warning(
                    f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}"
                )
                return False

            # Trigger immediate processing
            job_id = f"{username}_document_processing_manual"
            logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}")

            self.scheduler.add_job(
                func=self._process_user_documents,
                args=[username],
                trigger="date",
                run_date=datetime.now(UTC) + timedelta(seconds=1),
                id=job_id,
                name=f"Manual Document Processing for {username}",
                replace_existing=True,
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!"
                )
                return False

            return True

        except Exception as e:
            logger.exception(
                f"[DOC_SCHEDULER] Error triggering document processing for user {username}: {e}"
            )
            return False
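
    # Trigger sketch (hedged; "alice" is a placeholder username): an API layer
    # can force an immediate run and surface failure to the client, since
    # False means the user is unknown or the scheduler is stopped.
    #
    #     if not get_news_scheduler().trigger_document_processing("alice"):
    #         ...  # report "scheduler unavailable" to the caller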

    def _check_user_overdue_subscriptions(self, username: str):
        """Check and immediately run any overdue subscriptions for a user."""
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                return

            password = session_info["password"]

            # Get user's overdue subscriptions
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                now = datetime.now(UTC)
                overdue_subs = (
                    db.query(NewsSubscription)
                    .filter(
                        NewsSubscription.is_active.is_(True),
                        NewsSubscription.next_refresh.is_not(None),
                        NewsSubscription.next_refresh <= now,
                    )
                    .all()
                )

                if overdue_subs:
                    logger.info(
                        f"Found {len(overdue_subs)} overdue subscriptions for {username}"
                    )

                for sub in overdue_subs:
                    # Run immediately with small random delay
                    delay_seconds = random.randint(1, 30)
                    job_id = (
                        f"overdue_{username}_{sub.id}_{int(now.timestamp())}"
                    )

                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, sub.id],
                        trigger="date",
                        run_date=now + timedelta(seconds=delay_seconds),
                        id=job_id,
                        name=f"Overdue: {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                    )

                    logger.info(
                        f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds"
                    )

        except Exception as e:
            logger.exception(
                f"Error checking overdue subscriptions for {username}: {e}"
            )

    def _check_subscription(self, username: str, subscription_id: int):
        """Check and refresh a single subscription."""
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    pass
                return

            password = session_info["password"]

            # Get subscription details
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from ..core.utils import get_local_date_string
                    from ...settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously via the programmatic API
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception as e:
            logger.exception(
                f"Error checking subscription {subscription_id}: {e}"
            )

    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ):
        """Trigger research for a subscription using programmatic API."""
        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = session_info["password"]

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

            # Use the search engine from the subscription if specified
            search_engine = subscription.get("search_engine")

            if search_engine:
                settings_snapshot["search.tool"] = {
                    "value": search_engine,
                    "ui_element": "select",
                }
                logger.info(
                    f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                )
            else:
                # Use the user's default search tool from their settings
                default_search_tool = settings_snapshot.get(
                    "search.tool", "auto"
                )
                logger.info(
                    f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                )

            logger.debug(
                f"Settings snapshot has {len(settings_snapshot)} settings"
            )
            # Log a few key settings to verify they're present
            logger.debug(
                f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
            )

            # Set up research parameters
            query = subscription["query"]

            # Build metadata for news search
            metadata = {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription["id"],
                "triggered_by": "scheduler",
                "subscription_name": subscription["name"],
                "title": subscription["name"] if subscription["name"] else None,
                "scheduled_at": datetime.now(UTC).isoformat(),
                "original_query": subscription["original_query"],
                "user_id": username,
            }

            # Use programmatic API with settings context
            from ...api.research_functions import quick_summary
            from ...config.thread_settings import set_settings_context

            # Create and set settings context for this thread
            class SettingsContext:
                def __init__(self, snapshot):
                    self.snapshot = snapshot or {}
                    self.values = {}
                    for key, setting in self.snapshot.items():
                        if isinstance(setting, dict) and "value" in setting:
                            self.values[key] = setting["value"]
                        else:
                            self.values[key] = setting

                def get_setting(self, key, default=None):
                    """Get setting from snapshot only."""
                    return self.values.get(key, default)

            # Set the context for this thread
            settings_context = SettingsContext(settings_snapshot)
            set_settings_context(settings_context)

            # Get search strategy from subscription data (for the API call)
            search_strategy = subscription.get(
                "search_strategy", "news_aggregation"
            )

            # Call quick_summary with appropriate parameters
            result = quick_summary(
                query=query,
                research_id=research_id,
                username=username,
                user_password=password,
                settings_snapshot=settings_snapshot,
                search_strategy=search_strategy,
                model_name=subscription.get("model"),
                provider=subscription.get("model_provider"),
                iterations=1,  # Single iteration for news
                metadata=metadata,
                search_original_query=False,  # Don't send long subscription prompts to search engines
            )

            logger.info(
                f"Completed research {research_id} for subscription {subscription['id']}"
            )

            # Store the research result in the database
            self._store_research_result(
                username,
                password,
                research_id,
                subscription["id"],
                result,
                subscription,
            )

        except Exception as e:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}: {e}"
            )

    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ):
        """Store research result in database for news display."""
        try:
            from ...database.session_context import get_user_db_session
            from ...database.models import ResearchHistory
            from ...settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries."""
                if hasattr(obj, "dict"):
                    return obj.dict()
                elif hasattr(obj, "__dict__"):
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                elif isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                elif isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                else:
                    return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ...utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                if report_content:
                    # Import citation formatter
                    from ...text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ...config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ...news.utils.headline_generator import generate_headline
                from ...news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fall back to a timestamped name
                if generated_headline:
                    title = generated_headline
                elif subscription_name:
                    title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                else:
                    title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using the storage abstraction
                from ...storage import get_report_storage

                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

            logger.info(
                f"Stored research result {research_id} for subscription {subscription_id}"
            )

        except Exception:
            logger.exception("Error storing research result")

    def _run_cleanup_with_tracking(self):
        """Wrapper that tracks cleanup execution."""
        try:
            cleaned_count = self._cleanup_inactive_users()
            logger.info(
                f"Cleanup successful: removed {cleaned_count} inactive users"
            )
        except Exception:
            logger.exception("Cleanup job failed")

    def _cleanup_inactive_users(self) -> int:
        """Remove users inactive for longer than the retention period."""
        retention_hours = self.config.get("retention_hours", 48)
        cutoff = datetime.now(UTC) - timedelta(hours=retention_hours)

        cleaned_count = 0

        with self.lock:
            inactive_users = [
                user_id
                for user_id, session in self.user_sessions.items()
                if session["last_activity"] < cutoff
            ]

            for user_id in inactive_users:
                # Remove all scheduled jobs
                for job_id in self.user_sessions[user_id]["scheduled_jobs"]:
                    try:
                        self.scheduler.remove_job(job_id)
                    except JobLookupError:
                        pass

                # Clear password from memory
                del self.user_sessions[user_id]
                cleaned_count += 1
                logger.info(f"Cleaned up inactive user {user_id}")

        return cleaned_count
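
    # Worked example (defaults assumed): with retention_hours = 48, a session
    # whose last_activity is older than now - 48h loses all of its jobs and
    # its in-memory password in the same pass.
    #
    #     cutoff = datetime.now(UTC) - timedelta(hours=48)
    #     is_stale = session["last_activity"] < cutoff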

    def _reload_config(self):
        """Reload configuration from settings manager."""
        if not hasattr(self, "settings_manager") or not self.settings_manager:
            return

        try:
            old_retention = self.config.get("retention_hours", 48)

            # Two config keys use different settings keys (see
            # initialize_with_settings), so map them explicitly
            key_map = {
                "subscription_batch_size": "news.scheduler.batch_size",
                "activity_check_interval_minutes": "news.scheduler.activity_check_interval",
            }

            # Reload all settings
            for key in self.config:
                if key == "enabled":
                    continue  # Don't change enabled state while running

                full_key = key_map.get(key, f"news.scheduler.{key}")
                self.config[key] = self._get_setting(full_key, self.config[key])

            # Handle changes that need immediate action
            if old_retention != self.config["retention_hours"]:
                logger.info(
                    f"Retention period changed from {old_retention} "
                    f"to {self.config['retention_hours']} hours"
                )
                # Trigger immediate cleanup with new retention
                self.scheduler.add_job(
                    self._run_cleanup_with_tracking,
                    "date",
                    run_date=datetime.now(UTC) + timedelta(seconds=5),
                    id="immediate_cleanup_config_change",
                )

        except Exception:
            logger.exception("Error reloading configuration")

    def get_status(self) -> Dict[str, Any]:
        """Get scheduler status information."""
        with self.lock:
            active_users = len(self.user_sessions)
            total_jobs = sum(
                len(session["scheduled_jobs"])
                for session in self.user_sessions.values()
            )

            # Get next run time for cleanup job
            next_cleanup = None
            if self.is_running:
                job = self.scheduler.get_job("cleanup_inactive_users")
                if job:
                    next_cleanup = job.next_run_time

            return {
                "is_running": self.is_running,
                "config": self.config,
                "active_users": active_users,
                "total_scheduled_jobs": total_jobs,
                "next_cleanup": next_cleanup.isoformat() if next_cleanup else None,
                "memory_usage": self._estimate_memory_usage(),
            }

    def _estimate_memory_usage(self) -> int:
        """Estimate memory usage of user sessions."""
        # Rough estimate: username (50) + password (100) + metadata (200) per user
        per_user_estimate = 350
        return len(self.user_sessions) * per_user_estimate
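
    # Arithmetic example (estimate only, using the figures above): 100 active
    # sessions -> 100 * 350 = 35,000 bytes, roughly 34 KiB of session state.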

    def get_user_sessions_summary(self) -> List[Dict[str, Any]]:
        """Get summary of active user sessions (without passwords)."""
        with self.lock:
            summary = []
            for user_id, session in self.user_sessions.items():
                summary.append(
                    {
                        "user_id": user_id,
                        "last_activity": session["last_activity"].isoformat(),
                        "scheduled_jobs": len(session["scheduled_jobs"]),
                        "time_since_activity": str(
                            datetime.now(UTC) - session["last_activity"]
                        ),
                    }
                )
            return summary


# Singleton instance getter
_scheduler_instance = None


def get_news_scheduler() -> NewsScheduler:
    """Get the singleton news scheduler instance."""
    global _scheduler_instance
    if _scheduler_instance is None:
        _scheduler_instance = NewsScheduler()
    return _scheduler_instance
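

# Minimal end-to-end sketch (hedged): starts the singleton with its default
# config, prints its status, then shuts down. In a real deployment the web
# layer would also call initialize_with_settings() and update_user_info().
if __name__ == "__main__":
    import time

    demo = get_news_scheduler()
    demo.start()
    print(demo.get_status())  # e.g. {"is_running": True, "active_users": 0, ...}
    time.sleep(2)  # give the background scheduler a moment to run
    demo.stop()  # also clears all in-memory credentials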