Coverage for src / local_deep_research / scheduler / background.py: 89%

668 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-19 11:23 +0000

1""" 

2Activity-based news subscription scheduler for per-user encrypted databases. 

3Tracks user activity and temporarily stores credentials for automatic updates. 

4""" 

5 

6import random 

7import threading 

8from dataclasses import dataclass 

9from datetime import datetime, timedelta, UTC 

10from functools import wraps 

11from typing import Any, Callable, Dict, List 

12 

13from cachetools import TTLCache 

14from loguru import logger 

15from ..settings.logger import log_settings 

16from ..settings.manager import SnapshotSettingsContext 

17 

18from apscheduler.schedulers.background import BackgroundScheduler 

19from apscheduler.jobstores.base import JobLookupError 

20from ..constants import ResearchStatus 

21from ..database.credential_store_base import CredentialStoreBase 

22from ..database.thread_local_session import thread_cleanup 

23 

24# RAG indexing imports 

25from ..research_library.services.library_rag_service import LibraryRAGService 

26from ..database.library_init import get_default_library_id 

27from ..database.models.library import Document, DocumentCollection 

28 

29 

30SCHEDULER_AVAILABLE = True # Always available since it's a required dependency 

31 

32 

class SchedulerCredentialStore(CredentialStoreBase):
    """In-memory, TTL-expiring password store for scheduler background jobs.

    Background jobs need each active user's password to open their
    encrypted per-user database; entries expire automatically through the
    TTL mechanism provided by the base class.
    """

    def __init__(self, ttl_hours: int = 48):
        """Create the store with a time-to-live expressed in hours."""
        ttl_seconds = ttl_hours * 3600
        super().__init__(ttl_seconds)

    def store(self, username: str, password: str) -> None:
        """Remember the password for a user until the TTL elapses."""
        payload = {"username": username, "password": password}
        self._store_credentials(username, payload)

    def retrieve(self, username: str) -> str | None:
        """Return the stored password, or None when missing or expired."""
        entry = self._retrieve_credentials(username, remove=False)
        if not entry:
            return None
        return entry[1]

    def clear(self, username: str) -> None:
        """Forget any stored password for the given user."""
        self.clear_entry(username)

57 

58 

@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """
    Immutable settings snapshot for document scheduler.

    Thread-safe: This is a frozen dataclass that can be safely passed
    to and used from background threads.
    """

    # Whether the per-user document scheduler runs at all.
    enabled: bool = True
    # Seconds between document-processing runs (1800 s = 30 minutes).
    interval_seconds: int = 1800
    # Queue PDF downloads for newly completed research sessions.
    download_pdfs: bool = False
    # Extract text from downloadable research resources.
    extract_text: bool = True
    # Build RAG embeddings/index entries for extracted documents.
    generate_rag: bool = False
    # Timestamp of the previous run, parsed with datetime.fromisoformat
    # by the consumer; empty string means "never ran".
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Return default settings."""
        return cls()

79 

80 

81class BackgroundJobScheduler: 

82 """ 

83 Singleton scheduler that manages news subscriptions for active users. 

84 

85 This scheduler: 

86 - Monitors user activity through database access 

87 - Temporarily stores user credentials in memory 

88 - Automatically schedules subscription checks 

89 - Cleans up inactive users after configurable period 

90 """ 

91 

92 _instance = None 

93 _lock = threading.Lock() 

94 

    def __new__(cls):
        """Ensure singleton instance."""
        # Double-checked locking: the first (unlocked) check keeps the
        # common path lock-free; the second check under _lock prevents two
        # racing threads from each constructing an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

102 

    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton).

        __init__ executes on every BackgroundJobScheduler() call even
        though __new__ always returns the same object, so the
        _initialized flag guards against re-initialization.
        """
        # Skip if already initialized
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        self.user_sessions = {}  # user_id -> {last_activity, scheduled_jobs}
        self.lock = threading.Lock()

        # Credential store with TTL-based expiration
        self._credential_store = SchedulerCredentialStore(ttl_hours=48)

        # Scheduler instance (not started yet; see start())
        self.scheduler = BackgroundScheduler()

        # Configuration (will be overridden by initialize_with_settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False
        self._app = None  # Flask app reference for background job contexts

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        self._initialized = True
        logger.info("News scheduler initialized")

133 

134 def _load_default_config(self) -> Dict[str, Any]: 

135 """Load default configuration (will be overridden by settings manager).""" 

136 return { 

137 "enabled": True, 

138 "retention_hours": 48, 

139 "cleanup_interval_hours": 1, 

140 "max_jitter_seconds": 300, 

141 "max_concurrent_jobs": 10, 

142 "subscription_batch_size": 5, 

143 "activity_check_interval_minutes": 5, 

144 } 

145 

146 def initialize_with_settings(self, settings_manager): 

147 """Initialize configuration from settings manager.""" 

148 try: 

149 # Load all scheduler settings 

150 self.settings_manager = settings_manager 

151 self.config = { 

152 "enabled": self._get_setting("news.scheduler.enabled", True), 

153 "retention_hours": self._get_setting( 

154 "news.scheduler.retention_hours", 48 

155 ), 

156 "cleanup_interval_hours": self._get_setting( 

157 "news.scheduler.cleanup_interval_hours", 1 

158 ), 

159 "max_jitter_seconds": self._get_setting( 

160 "news.scheduler.max_jitter_seconds", 300 

161 ), 

162 "max_concurrent_jobs": self._get_setting( 

163 "news.scheduler.max_concurrent_jobs", 10 

164 ), 

165 "subscription_batch_size": self._get_setting( 

166 "news.scheduler.batch_size", 5 

167 ), 

168 "activity_check_interval_minutes": self._get_setting( 

169 "news.scheduler.activity_check_interval", 5 

170 ), 

171 } 

172 log_settings(self.config, "Scheduler configuration loaded") 

173 except Exception: 

174 logger.exception("Error loading scheduler settings") 

175 # Keep default config 

176 

177 def _get_setting(self, key: str, default: Any) -> Any: 

178 """Get setting with fallback to default.""" 

179 if hasattr(self, "settings_manager") and self.settings_manager: 

180 return self.settings_manager.get_setting(key, default=default) 

181 return default 

182 

183 def set_app(self, app) -> None: 

184 """Store a reference to the Flask app for creating app contexts in background jobs.""" 

185 self._app = app 

186 

187 def _wrap_job(self, func: Callable) -> Callable: 

188 """Wrap a scheduler job function so it runs inside a Flask app context. 

189 

190 APScheduler runs jobs in a thread pool without Flask context. 

191 This wrapper pushes an app context before the job runs and pops it after. 

192 """ 

193 

194 @wraps(func) 

195 def wrapper(*args, **kwargs): 

196 if self._app is not None: 196 ↛ 200line 196 didn't jump to line 200 because the condition on line 196 was always true

197 with self._app.app_context(): 

198 return func(*args, **kwargs) 

199 else: 

200 logger.warning( 

201 f"No Flask app set on scheduler; running {func.__name__} without app context" 

202 ) 

203 return func(*args, **kwargs) 

204 

205 return wrapper 

206 

    def _get_document_scheduler_settings(
        self, username: str, force_refresh: bool = False
    ) -> DocumentSchedulerSettings:
        """
        Get document scheduler settings for a user with TTL caching.

        This is the single source of truth for document scheduler settings.
        Settings are cached for 5 minutes by default to reduce database queries.

        Args:
            username: User to get settings for
            force_refresh: If True, bypass cache and fetch fresh settings

        Returns:
            DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
        """
        # Fast path: check cache without modifying it
        if not force_refresh:
            with self._settings_cache_lock:
                cached = self._settings_cache.get(username)
                if cached is not None:
                    logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                    cached_settings: DocumentSchedulerSettings = cached
                    return cached_settings

        # Cache miss - need to fetch from database
        logger.debug(
            f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
        )

        # Get password from session
        # NOTE(review): user_sessions is read without self.lock. Callers on
        # the update_user_info path already hold self.lock (which is a
        # non-reentrant threading.Lock), so acquiring it here would
        # deadlock — do not add locking without restructuring.
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[SETTINGS_CACHE] No session info for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        password = self._credential_store.retrieve(username)
        if not password:
            logger.warning(
                f"[SETTINGS_CACHE] Credentials expired for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        # Fetch settings from database (outside lock to avoid blocking)
        try:
            from ..database.session_context import get_user_db_session
            from ..settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                sm = SettingsManager(db)

                # Build an immutable snapshot so background threads can use
                # it after the DB session closes.
                settings = DocumentSchedulerSettings(
                    enabled=sm.get_setting("document_scheduler.enabled", True),
                    interval_seconds=sm.get_setting(
                        "document_scheduler.interval_seconds", 1800
                    ),
                    download_pdfs=sm.get_setting(
                        "document_scheduler.download_pdfs", False
                    ),
                    extract_text=sm.get_setting(
                        "document_scheduler.extract_text", True
                    ),
                    generate_rag=sm.get_setting(
                        "document_scheduler.generate_rag", False
                    ),
                    last_run=sm.get_setting("document_scheduler.last_run", ""),
                )

            # Store in cache
            with self._settings_cache_lock:
                self._settings_cache[username] = settings
                logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")

            return settings

        except Exception:
            logger.exception(
                f"[SETTINGS_CACHE] Error fetching settings for {username}"
            )
            return DocumentSchedulerSettings.defaults()

289 

290 def invalidate_user_settings_cache(self, username: str) -> bool: 

291 """ 

292 Invalidate cached settings for a specific user. 

293 

294 Call this when user settings change or user logs out. 

295 

296 Args: 

297 username: User whose cache to invalidate 

298 

299 Returns: 

300 True if cache entry was removed, False if not found 

301 """ 

302 with self._settings_cache_lock: 

303 if username in self._settings_cache: 

304 del self._settings_cache[username] 

305 logger.debug( 

306 f"[SETTINGS_CACHE] Invalidated cache for {username}" 

307 ) 

308 return True 

309 return False 

310 

311 def invalidate_all_settings_cache(self) -> int: 

312 """ 

313 Invalidate all cached settings. 

314 

315 Call this when doing bulk settings updates or during config reload. 

316 

317 Returns: 

318 Number of cache entries cleared 

319 """ 

320 with self._settings_cache_lock: 

321 count = len(self._settings_cache) 

322 self._settings_cache.clear() 

323 logger.info( 

324 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)" 

325 ) 

326 return count 

327 

    def start(self):
        """Start the scheduler.

        Registers the recurring maintenance jobs (inactive-user cleanup and
        configuration reload), starts the APScheduler instance, and queues a
        one-shot initial cleanup.

        Raises:
            RuntimeError: If set_app() has not been called beforehand.
        """
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        # Background jobs need a Flask app context (see _wrap_job), so an
        # app reference is mandatory before starting.
        if self._app is None:
            raise RuntimeError(
                "BackgroundJobScheduler.set_app() must be called before start()"
            )

        # Schedule cleanup job
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        self.scheduler.add_job(
            self._wrap_job(self._reload_config),
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule initial cleanup after a delay (one-shot "date" trigger,
        # added after start so next_run_time is computed by the live scheduler)
        self.scheduler.add_job(
            self._wrap_job(self._run_cleanup_with_tracking),
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")

375 

    def stop(self):
        """Stop the scheduler.

        Shuts down APScheduler (waiting for running jobs to finish), then
        wipes every tracked user session and the associated stored
        credentials. No-op when the scheduler is not running.
        """
        if self.is_running:
            # wait=True blocks until in-flight jobs complete.
            self.scheduler.shutdown(wait=True)
            self.is_running = False

            # Clear all user sessions and credentials
            with self.lock:
                for username in self.user_sessions:
                    self._credential_store.clear(username)
                self.user_sessions.clear()

            logger.info("News scheduler stopped")

389 

    def update_user_info(self, username: str, password: str):
        """
        Update user info in scheduler. Called on every database interaction.

        Refreshes the stored credentials and the user's last-activity
        timestamp, creating a session entry on first sight, and
        (re)schedules the user's subscription jobs.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        # self.lock is held for the whole update, including the
        # _schedule_user_subscriptions call below (callees must therefore
        # not re-acquire this non-reentrant lock).
        with self.lock:
            # Store password in credential store (inside lock to prevent
            # race where concurrent calls leave mismatched credentials)
            self._credential_store.store(username, password)

            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)

448 def unregister_user(self, username: str): 

449 """ 

450 Unregister a user and clean up their scheduled jobs. 

451 Called when user logs out. 

452 """ 

453 with self.lock: 

454 if username in self.user_sessions: 

455 logger.info(f"Unregistering user {username}") 

456 

457 # Remove all scheduled jobs for this user 

458 session_info = self.user_sessions[username] 

459 for job_id in session_info["scheduled_jobs"].copy(): 

460 try: 

461 self.scheduler.remove_job(job_id) 

462 except JobLookupError: 

463 pass 

464 

465 # Remove user session and clear credentials atomically 

466 del self.user_sessions[username] 

467 self._credential_store.clear(username) 

468 

469 # Invalidate settings cache for this user (outside lock) 

470 self.invalidate_user_settings_cache(username) 

471 logger.info(f"User {username} unregistered successfully") 

472 

473 def _schedule_user_subscriptions(self, username: str): 

474 """Schedule all active subscriptions for a user.""" 

475 logger.info(f"_schedule_user_subscriptions called for {username}") 

476 try: 

477 session_info = self.user_sessions.get(username) 

478 if not session_info: 

479 logger.warning(f"No session info found for {username}") 

480 return 

481 

482 password = self._credential_store.retrieve(username) 

483 if not password: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 logger.warning( 

485 f"Credentials expired for {username}, skipping subscription scheduling" 

486 ) 

487 return 

488 logger.debug(f"Got password for {username}: present") 

489 

490 # Get user's subscriptions from their encrypted database 

491 from ..database.session_context import get_user_db_session 

492 from ..database.models.news import NewsSubscription 

493 

494 with get_user_db_session(username, password) as db: 

495 subscriptions = ( 

496 db.query(NewsSubscription).filter_by(is_active=True).all() 

497 ) 

498 logger.debug( 

499 f"Query executed, found {len(subscriptions)} results" 

500 ) 

501 

502 # Log details of each subscription 

503 for sub in subscriptions: 

504 logger.debug( 

505 f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes" 

506 ) 

507 

508 logger.info( 

509 f"Found {len(subscriptions)} active subscriptions for {username}" 

510 ) 

511 

512 # Clear old jobs for this user 

513 for job_id in session_info["scheduled_jobs"].copy(): 

514 try: 

515 self.scheduler.remove_job(job_id) 

516 session_info["scheduled_jobs"].remove(job_id) 

517 except JobLookupError: 

518 pass 

519 

520 # Schedule each subscription with jitter 

521 for sub in subscriptions: 

522 job_id = f"{username}_{sub.id}" 

523 

524 # Calculate jitter 

525 # Security: random jitter to distribute subscription timing, not security-sensitive 

526 max_jitter = int(self.config.get("max_jitter_seconds", 300)) 

527 jitter = random.randint(0, max_jitter) 

528 

529 # Determine trigger based on frequency 

530 refresh_minutes = sub.refresh_interval_minutes 

531 

532 if refresh_minutes <= 60: # 60 minutes or less 

533 # For hourly or more frequent, use interval trigger 

534 trigger = "interval" 

535 trigger_args = { 

536 "minutes": refresh_minutes, 

537 "jitter": jitter, 

538 "start_date": datetime.now(UTC), # Start immediately 

539 } 

540 else: 

541 # For less frequent, calculate next run time 

542 now = datetime.now(UTC) 

543 if sub.next_refresh: 

544 # Ensure timezone-aware for comparison with now (UTC) 

545 next_refresh_aware = sub.next_refresh 

546 if next_refresh_aware.tzinfo is None: 

547 logger.warning( 

548 f"Subscription {sub.id} has naive (non-tz-aware) " 

549 f"next_refresh datetime, assuming UTC" 

550 ) 

551 next_refresh_aware = next_refresh_aware.replace( 

552 tzinfo=UTC 

553 ) 

554 if next_refresh_aware <= now: 

555 # Subscription is overdue - run it immediately with small jitter 

556 logger.info( 

557 f"Subscription {sub.id} is overdue, scheduling immediate run" 

558 ) 

559 next_run = now + timedelta(seconds=jitter) 

560 else: 

561 next_run = next_refresh_aware 

562 else: 

563 next_run = now + timedelta( 

564 minutes=refresh_minutes, seconds=jitter 

565 ) 

566 

567 trigger = "date" 

568 trigger_args = {"run_date": next_run} 

569 

570 # Add the job 

571 self.scheduler.add_job( 

572 func=self._wrap_job(self._check_subscription), 

573 args=[username, sub.id], 

574 trigger=trigger, 

575 id=job_id, 

576 name=f"Check {sub.name or sub.query_or_topic[:30]}", 

577 replace_existing=True, 

578 **trigger_args, 

579 ) 

580 

581 session_info["scheduled_jobs"].add(job_id) 

582 logger.info(f"Scheduled job {job_id} with {trigger} trigger") 

583 

584 except Exception: 

585 logger.exception(f"Error scheduling subscriptions for {username}") 

586 

587 # Add document processing for this user 

588 self._schedule_document_processing(username) 

589 

    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user.

        Registers (or re-registers) a single recurring APScheduler job per
        user that runs _process_user_documents at the interval configured
        in the user's DocumentSchedulerSettings. No-op when the user has no
        session or has the document scheduler disabled.
        """
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            logger.debug(
                f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            if not settings.enabled:
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                )
                return

            logger.info(
                f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
                f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
                f"text={settings.extract_text}, rag={settings.generate_rag}"
            )

            # Schedule document processing job (one well-known id per user)
            job_id = f"{username}_document_processing"
            logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

            # Remove existing document job if any (belt-and-braces: add_job
            # below also passes replace_existing=True)
            try:
                self.scheduler.remove_job(job_id)
                session_info["scheduled_jobs"].discard(job_id)
                logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
            except JobLookupError:
                logger.debug(
                    f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                )
                pass  # Job doesn't exist, that's fine

            # Add new document processing job
            logger.debug(
                f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
            )
            self.scheduler.add_job(
                func=self._wrap_job(self._process_user_documents),
                args=[username],
                trigger="interval",
                seconds=settings.interval_seconds,
                id=job_id,
                name=f"Process Documents for {username}",
                jitter=30,  # Add small jitter to prevent multiple users from processing simultaneously
                max_instances=1,  # Prevent overlapping document processing for same user
                replace_existing=True,
            )

            session_info["scheduled_jobs"].add(job_id)
            logger.info(
                f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
            )
            logger.debug(
                f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
            )

            # Verify job was added
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                )

        except Exception:
            logger.exception(
                f"Error scheduling document processing for {username}"
            )

683 

684 @thread_cleanup 

685 def _process_user_documents(self, username: str): 

686 """Process documents for a user.""" 

687 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}") 

688 start_time = datetime.now(UTC) 

689 

690 try: 

691 session_info = self.user_sessions.get(username) 

692 if not session_info: 

693 logger.warning( 

694 f"[DOC_SCHEDULER] No session info found for user {username}" 

695 ) 

696 return 

697 

698 password = self._credential_store.retrieve(username) 

699 if not password: 

700 logger.warning( 

701 f"[DOC_SCHEDULER] Credentials expired for user {username}" 

702 ) 

703 return 

704 logger.debug( 

705 f"[DOC_SCHEDULER] Starting document processing for {username}" 

706 ) 

707 

708 # Get user's document scheduler settings (cached) 

709 settings = self._get_document_scheduler_settings(username) 

710 

711 logger.info( 

712 f"[DOC_SCHEDULER] Processing settings for {username}: " 

713 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}" 

714 ) 

715 

716 if not any( 

717 [ 

718 settings.download_pdfs, 

719 settings.extract_text, 

720 settings.generate_rag, 

721 ] 

722 ): 

723 logger.info( 

724 f"[DOC_SCHEDULER] No processing options enabled for user {username}" 

725 ) 

726 return 

727 

728 # Parse last_run from cached settings 

729 last_run = ( 

730 datetime.fromisoformat(settings.last_run) 

731 if settings.last_run 

732 else None 

733 ) 

734 

735 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}") 

736 

737 # Need database session for queries and updates 

738 from ..database.session_context import get_user_db_session 

739 from ..database.models.research import ResearchHistory 

740 from ..settings.manager import SettingsManager 

741 

742 with get_user_db_session(username, password) as db: 

743 settings_manager = SettingsManager(db) 

744 

745 # Query for completed research since last run 

746 logger.debug( 

747 f"[DOC_SCHEDULER] Querying for completed research since {last_run}" 

748 ) 

749 query = db.query(ResearchHistory).filter( 

750 ResearchHistory.status == ResearchStatus.COMPLETED, 

751 ResearchHistory.completed_at.is_not( 

752 None 

753 ), # Ensure completed_at is not null 

754 ) 

755 

756 if last_run: 

757 query = query.filter( 

758 ResearchHistory.completed_at > last_run 

759 ) 

760 

761 # Limit to recent research to prevent overwhelming 

762 query = query.order_by( 

763 ResearchHistory.completed_at.desc() 

764 ).limit(20) 

765 

766 research_sessions = query.all() 

767 logger.debug( 

768 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions" 

769 ) 

770 

771 if not research_sessions: 

772 logger.info( 

773 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}" 

774 ) 

775 return 

776 

777 logger.info( 

778 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}" 

779 ) 

780 

781 # Log details of each research session 

782 for i, research in enumerate( 

783 research_sessions[:5] 

784 ): # Log first 5 details 

785 title_safe = ( 

786 (research.title[:50] + "...") 

787 if research.title 

788 else "No title" 

789 ) 

790 completed_safe = ( 

791 research.completed_at 

792 if research.completed_at 

793 else "No completion time" 

794 ) 

795 logger.debug( 

796 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}" 

797 ) 

798 

799 # Handle completed_at which might be a string or datetime 

800 completed_at_obj = None 

801 if research.completed_at: 

802 if isinstance(research.completed_at, str): 

803 try: 

804 completed_at_obj = datetime.fromisoformat( 

805 research.completed_at.replace("Z", "+00:00") 

806 ) 

807 except (ValueError, TypeError, AttributeError): 

808 completed_at_obj = None 

809 else: 

810 completed_at_obj = research.completed_at 

811 

812 logger.debug( 

813 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}" 

814 ) 

815 logger.debug( 

816 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}" 

817 ) 

818 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}") 

819 logger.debug( 

820 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}" 

821 ) 

822 

823 processed_count = 0 

824 for research in research_sessions: 

825 try: 

826 logger.info( 

827 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}" 

828 ) 

829 

830 # Set search context so rate limiting works in both 

831 # download_pdfs and extract_text paths 

832 from ...utilities.thread_context import ( 

833 set_search_context, 

834 ) 

835 

836 set_search_context( 

837 { 

838 "research_id": str(research.id), 

839 "username": username, 

840 "user_password": password, 

841 "research_phase": "document_scheduler", 

842 } 

843 ) 

844 

845 # Call actual processing APIs 

846 if settings.download_pdfs: 

847 logger.info( 

848 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}" 

849 ) 

850 try: 

851 # Use the DownloadService to queue PDF downloads 

852 from ..research_library.services.download_service import ( 

853 DownloadService, 

854 ) 

855 

856 with DownloadService( 

857 username=username, password=password 

858 ) as download_service: 

859 queued_count = download_service.queue_research_downloads( 

860 research.id 

861 ) 

862 logger.info( 

863 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}" 

864 ) 

865 except Exception: 

866 logger.exception( 

867 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}" 

868 ) 

869 

870 if settings.extract_text: 

871 logger.info( 

872 f"[DOC_SCHEDULER] Extracting text for research {research.id}" 

873 ) 

874 try: 

875 # Use the DownloadService to extract text for all resources 

876 from ..research_library.services.download_service import ( 

877 DownloadService, 

878 ) 

879 from ..database.models.research import ( 

880 ResearchResource, 

881 ) 

882 

883 from ..research_library.utils import ( 

884 is_downloadable_url, 

885 ) 

886 

887 with DownloadService( 

888 username=username, password=password 

889 ) as download_service: 

890 # Get all resources for this research (reuse existing db session) 

891 all_resources = ( 

892 db.query(ResearchResource) 

893 .filter_by(research_id=research.id) 

894 .all() 

895 ) 

896 # Filter: only process downloadable resources (academic/PDF) 

897 resources = [ 

898 r 

899 for r in all_resources 

900 if is_downloadable_url(r.url) 

901 ] 

902 processed_count = 0 

903 for resource in resources: 

904 # We need to pass the password to the download service 

905 # The DownloadService creates its own database sessions, so we need to ensure password is available 

906 try: 

907 success, error = ( 

908 download_service.download_as_text( 

909 resource.id 

910 ) 

911 ) 

912 if success: 

913 processed_count += 1 

914 logger.info( 

915 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}" 

916 ) 

917 else: 

918 logger.warning( 

919 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}" 

920 ) 

921 except Exception as resource_error: 

922 logger.exception( 

923 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}" 

924 ) 

925 logger.info( 

926 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed" 

927 ) 

928 except Exception: 

929 logger.exception( 

930 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}" 

931 ) 

932 

933 if settings.generate_rag: 

934 logger.info( 

935 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}" 

936 ) 

937 try: 

938 # Get embedding settings from user configuration 

939 embedding_model = settings_manager.get_setting( 

940 "local_search_embedding_model", 

941 "all-MiniLM-L6-v2", 

942 ) 

943 embedding_provider = ( 

944 settings_manager.get_setting( 

945 "local_search_embedding_provider", 

946 "sentence_transformers", 

947 ) 

948 ) 

949 chunk_size = int( 

950 settings_manager.get_setting( 

951 "local_search_chunk_size", 1000 

952 ) 

953 ) 

954 chunk_overlap = int( 

955 settings_manager.get_setting( 

956 "local_search_chunk_overlap", 200 

957 ) 

958 ) 

959 

960 # Initialize RAG service with user's embedding configuration 

961 with LibraryRAGService( 

962 username=username, 

963 embedding_model=embedding_model, 

964 embedding_provider=embedding_provider, 

965 chunk_size=chunk_size, 

966 chunk_overlap=chunk_overlap, 

967 db_password=password, 

968 ) as rag_service: 

969 # Get default Library collection ID 

970 library_collection_id = ( 

971 get_default_library_id( 

972 username, password 

973 ) 

974 ) 

975 

976 # Query for unindexed documents from this research session 

977 documents_to_index = ( 

978 db.query(Document.id, Document.title) 

979 .outerjoin( 

980 DocumentCollection, 

981 ( 

982 DocumentCollection.document_id 

983 == Document.id 

984 ) 

985 & ( 

986 DocumentCollection.collection_id 

987 == library_collection_id 

988 ), 

989 ) 

990 .filter( 

991 Document.research_id == research.id, 

992 Document.text_content.isnot(None), 

993 ( 

994 DocumentCollection.indexed.is_( 

995 False 

996 ) 

997 | DocumentCollection.id.is_( 

998 None 

999 ) 

1000 ), 

1001 ) 

1002 .all() 

1003 ) 

1004 

1005 if not documents_to_index: 

1006 logger.info( 

1007 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}" 

1008 ) 

1009 else: 

1010 # Index each document 

1011 indexed_count = 0 

1012 for ( 

1013 doc_id, 

1014 doc_title, 

1015 ) in documents_to_index: 

1016 try: 

1017 result = rag_service.index_document( 

1018 document_id=doc_id, 

1019 collection_id=library_collection_id, 

1020 force_reindex=False, 

1021 ) 

1022 if ( 

1023 result["status"] 

1024 == "success" 

1025 ): 

1026 indexed_count += 1 

1027 logger.info( 

1028 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) " 

1029 f"with {result.get('chunk_count', 0)} chunks" 

1030 ) 

1031 except Exception as doc_error: 

1032 logger.exception( 

1033 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}" 

1034 ) 

1035 

1036 logger.info( 

1037 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: " 

1038 f"{indexed_count}/{len(documents_to_index)} documents indexed" 

1039 ) 

1040 except Exception: 

1041 logger.exception( 

1042 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}" 

1043 ) 

1044 

1045 processed_count += 1 

1046 logger.debug( 

1047 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}" 

1048 ) 

1049 

1050 except Exception: 

1051 logger.exception( 

1052 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}" 

1053 ) 

1054 

1055 # Update last run time in user's settings. 

1056 # Intentionally NOT wrapped in try/finally: if upstream setup 

1057 # fails (DB open, SettingsManager init, initial query), 

1058 # last_run should stay put so the next tick retries. 

1059 # Advancing here would mask a persistent failure (corrupted 

1060 # DB, wrong password). See closed PR #3288. 

1061 current_time = datetime.now(UTC).isoformat() 

1062 settings_manager.set_setting( 

1063 "document_scheduler.last_run", current_time, commit=True 

1064 ) 

1065 logger.debug( 

1066 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}" 

1067 ) 

1068 

1069 end_time = datetime.now(UTC) 

1070 duration = (end_time - start_time).total_seconds() 

1071 logger.info( 

1072 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s" 

1073 ) 

1074 

1075 except Exception: 

1076 logger.exception( 

1077 f"[DOC_SCHEDULER] Error processing documents for user {username}" 

1078 ) 

1079 

1080 def get_document_scheduler_status(self, username: str) -> Dict[str, Any]: 

1081 """Get document scheduler status for a specific user.""" 

1082 try: 

1083 session_info = self.user_sessions.get(username) 

1084 if not session_info: 

1085 return { 

1086 "enabled": False, 

1087 "message": "User not found in scheduler", 

1088 } 

1089 

1090 # Get user's document scheduler settings (cached) 

1091 settings = self._get_document_scheduler_settings(username) 

1092 

1093 # Check if user has document processing job 

1094 job_id = f"{username}_document_processing" 

1095 has_job = job_id in session_info.get("scheduled_jobs", set()) 

1096 

1097 return { 

1098 "enabled": settings.enabled, 

1099 "interval_seconds": settings.interval_seconds, 

1100 "processing_options": { 

1101 "download_pdfs": settings.download_pdfs, 

1102 "extract_text": settings.extract_text, 

1103 "generate_rag": settings.generate_rag, 

1104 }, 

1105 "last_run": settings.last_run, 

1106 "has_scheduled_job": has_job, 

1107 "user_active": username in self.user_sessions, 

1108 } 

1109 

1110 except Exception as e: 

1111 logger.exception( 

1112 f"Error getting document scheduler status for user {username}" 

1113 ) 

1114 return { 

1115 "enabled": False, 

1116 "message": f"Failed to retrieve scheduler status: {type(e).__name__}", 

1117 } 

1118 

1119 def trigger_document_processing(self, username: str) -> bool: 

1120 """Trigger immediate document processing for a user.""" 

1121 logger.info( 

1122 f"[DOC_SCHEDULER] Manual trigger requested for user {username}" 

1123 ) 

1124 try: 

1125 session_info = self.user_sessions.get(username) 

1126 if not session_info: 

1127 logger.warning( 

1128 f"[DOC_SCHEDULER] User {username} not found in scheduler" 

1129 ) 

1130 logger.debug( 

1131 f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}" 

1132 ) 

1133 return False 

1134 

1135 if not self.is_running: 

1136 logger.warning( 

1137 f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}" 

1138 ) 

1139 return False 

1140 

1141 # Trigger immediate processing 

1142 job_id = f"{username}_document_processing_manual" 

1143 logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}") 

1144 

1145 self.scheduler.add_job( 

1146 func=self._wrap_job(self._process_user_documents), 

1147 args=[username], 

1148 trigger="date", 

1149 run_date=datetime.now(UTC) + timedelta(seconds=1), 

1150 id=job_id, 

1151 name=f"Manual Document Processing for {username}", 

1152 replace_existing=True, 

1153 ) 

1154 

1155 # Verify job was added 

1156 job = self.scheduler.get_job(job_id) 

1157 if job: 

1158 logger.info( 

1159 f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}" 

1160 ) 

1161 else: 

1162 logger.error( 

1163 f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!" 

1164 ) 

1165 return False 

1166 

1167 return True 

1168 

1169 except Exception: 

1170 logger.exception( 

1171 f"[DOC_SCHEDULER] Error triggering document processing for user {username}" 

1172 ) 

1173 return False 

1174 

1175 @thread_cleanup 

1176 def _check_user_overdue_subscriptions(self, username: str): 

1177 """Check and immediately run any overdue subscriptions for a user.""" 

1178 try: 

1179 session_info = self.user_sessions.get(username) 

1180 if not session_info: 

1181 return 

1182 

1183 password = self._credential_store.retrieve(username) 

1184 if not password: 

1185 return 

1186 

1187 # Get user's overdue subscriptions 

1188 from ..database.session_context import get_user_db_session 

1189 from ..database.models.news import NewsSubscription 

1190 from datetime import timezone 

1191 

1192 with get_user_db_session(username, password) as db: 

1193 now = datetime.now(timezone.utc) 

1194 overdue_subs = ( 

1195 db.query(NewsSubscription) 

1196 .filter( 

1197 NewsSubscription.is_active.is_(True), 

1198 NewsSubscription.next_refresh.is_not(None), 

1199 NewsSubscription.next_refresh <= now, 

1200 ) 

1201 .all() 

1202 ) 

1203 

1204 if overdue_subs: 

1205 logger.info( 

1206 f"Found {len(overdue_subs)} overdue subscriptions for {username}" 

1207 ) 

1208 

1209 for sub in overdue_subs: 

1210 # Run immediately with small random delay 

1211 # Security: random delay to stagger overdue jobs, not security-sensitive 

1212 delay_seconds = random.randint(1, 30) 

1213 job_id = ( 

1214 f"overdue_{username}_{sub.id}_{int(now.timestamp())}" 

1215 ) 

1216 

1217 self.scheduler.add_job( 

1218 func=self._wrap_job(self._check_subscription), 

1219 args=[username, sub.id], 

1220 trigger="date", 

1221 run_date=now + timedelta(seconds=delay_seconds), 

1222 id=job_id, 

1223 name=f"Overdue: {sub.name or sub.query_or_topic[:30]}", 

1224 replace_existing=True, 

1225 ) 

1226 

1227 logger.info( 

1228 f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds" 

1229 ) 

1230 

1231 except Exception: 

1232 logger.exception( 

1233 f"Error checking overdue subscriptions for {username}" 

1234 ) 

1235 

    @thread_cleanup
    def _check_subscription(self, username: str, subscription_id: int):
        """Check and refresh a single subscription.

        Loads the subscription from the user's encrypted database, advances
        its last/next refresh timestamps, runs the research synchronously,
        and — when the APScheduler job uses a one-shot "date" trigger —
        reschedules itself for the next interval with random jitter.

        Args:
            username: Owner of the subscription; must have an active
                session and non-expired cached credentials.
            subscription_id: Primary key of the NewsSubscription row.
        """
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    # Job already gone; nothing to clean up.
                    pass
                return

            password = self._credential_store.retrieve(username)
            if not password:
                # Credentials have a TTL; without them the encrypted DB
                # cannot be opened, so skip this run entirely.
                logger.warning(
                    f"Credentials expired for {username}, skipping subscription check"
                )
                return

            # Get subscription details
            from ..database.session_context import get_user_db_session
            from ..database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                # NOTE(review): Query.get() is the legacy SQLAlchemy API;
                # Session.get(NewsSubscription, id) is the 2.0 form —
                # confirm the project's SQLAlchemy version before changing.
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from local_deep_research.news.core.utils import (
                        get_local_date_string,
                    )
                    from ..settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times. Committed *before* the
                # research runs — presumably so a failed run is not
                # re-triggered immediately by the overdue scan.
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                # Snapshot the row into a plain dict for the research call.
                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously using requests with proper auth
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._wrap_job(self._check_subscription),
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception:
            logger.exception(f"Error checking subscription {subscription_id}")

1334 

    @thread_cleanup
    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ):
        """Trigger research for a subscription using programmatic API.

        Runs a single-iteration `quick_summary` research for the given
        subscription snapshot, then persists the result via
        `_store_research_result`. Executes synchronously in the scheduler
        worker thread; all failures are logged, never raised.

        Args:
            username: Owner of the subscription.
            subscription: Plain-dict snapshot of the subscription row
                (keys: id, name, query, original_query, model_provider,
                model, search_strategy, search_engine).
        """
        from ..config.thread_settings import set_settings_context

        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = self._credential_store.retrieve(username)
            if not password:
                logger.error(f"Credentials expired for user {username}")
                return

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ..database.session_context import get_user_db_session
            from ..settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

            # Use the search engine from the subscription if specified.
            # Snapshot values are dicts of {"value": ..., "ui_element": ...}.
            search_engine = subscription.get("search_engine")

            if search_engine:
                settings_snapshot["search.tool"] = {
                    "value": search_engine,
                    "ui_element": "select",
                }
                logger.info(
                    f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                )
            else:
                # Use the user's default search tool from their settings
                # (only logged here; quick_summary reads the snapshot itself).
                default_search_tool = settings_snapshot.get(
                    "search.tool", "auto"
                )
                logger.info(
                    f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                )

            logger.debug(
                f"Settings snapshot has {len(settings_snapshot)} settings"
            )
            # Log a few key settings to verify they're present
            logger.debug(
                f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
            )

            # Set up research parameters
            query = subscription["query"]

            # Build metadata for news search
            metadata = {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription["id"],
                "triggered_by": "scheduler",
                "subscription_name": subscription["name"],
                "title": subscription["name"] if subscription["name"] else None,
                "scheduled_at": datetime.now(UTC).isoformat(),
                "original_query": subscription["original_query"],
                "user_id": username,
            }

            # Use programmatic API with settings context
            from ..api.research_functions import quick_summary

            # Create and set settings context for this thread
            settings_context = SnapshotSettingsContext(settings_snapshot)
            set_settings_context(settings_context)

            # Get search strategy from subscription data (for the API call)
            search_strategy = subscription.get(
                "search_strategy", "news_aggregation"
            )

            # Call quick_summary with appropriate parameters
            result = quick_summary(
                query=query,
                research_id=research_id,
                username=username,
                user_password=password,
                settings_snapshot=settings_snapshot,
                search_strategy=search_strategy,
                model_name=subscription.get("model"),
                provider=subscription.get("model_provider"),
                iterations=1,  # Single iteration for news
                metadata=metadata,
                search_original_query=False,  # Don't send long subscription prompts to search engines
            )

            logger.info(
                f"Completed research {research_id} for subscription {subscription['id']}"
            )

            # Store the research result in the database
            self._store_research_result(
                username,
                password,
                research_id,
                subscription["id"],
                result,
                subscription,
            )

        except Exception:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}"
            )

1461 

    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ):
        """Store research result in database for news display.

        Pipeline: serialize the result, append a Sources section, format
        citations, generate headline/topics via LLM helpers, write a
        ResearchHistory row, then persist the report through the storage
        abstraction. All failures are logged, never raised.

        Args:
            username: Owner of the encrypted database.
            password: Credential used to open the user's database.
            research_id: UUID string used as the history-row primary key.
            subscription_id: Subscription that triggered this research.
            result: quick_summary output; 'report' or 'summary' holds the
                report text, 'sources' the citations (assumed shape —
                confirm against quick_summary's return contract).
            subscription: Plain-dict snapshot of the subscription row.
        """
        try:
            from ..database.session_context import get_user_db_session
            from ..database.models import ResearchHistory
            from ..settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries.

                Recurses through lists/tuples/dicts; objects exposing a
                dict() method or a __dict__ are flattened (private
                attributes skipped). Everything else passes through as-is.
                """
                if hasattr(obj, "dict"):
                    return obj.dict()
                if hasattr(obj, "__dict__"):
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                if isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                if isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ..utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                if report_content:
                    # Import citation formatter
                    from ..text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ..config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    # Map the setting string onto the formatter enum,
                    # defaulting to domain+id hyperlinks for unknown values.
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ..news.utils.headline_generator import generate_headline
                from ..news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fallback to a
                # name/query prefix plus a minute-resolution timestamp.
                if generated_headline:
                    title = generated_headline
                else:
                    if subscription_name:
                        title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                    else:
                        title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using storage abstraction
                from ..storage import get_report_storage

                # Use storage to save the report (report_content already retrieved above)
                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

                logger.info(
                    f"Stored research result {research_id} for subscription {subscription_id}"
                )

        except Exception:
            logger.exception("Error storing research result")

1646 

1647 def _run_cleanup_with_tracking(self): 

1648 """Wrapper that tracks cleanup execution.""" 

1649 

1650 try: 

1651 cleaned_count = self._cleanup_inactive_users() 

1652 

1653 logger.info( 

1654 f"Cleanup successful: removed {cleaned_count} inactive users" 

1655 ) 

1656 

1657 except Exception: 

1658 logger.exception("Cleanup job failed") 

1659 

1660 def _cleanup_inactive_users(self) -> int: 

1661 """Remove users inactive for longer than retention period.""" 

1662 retention_hours = self.config.get("retention_hours", 48) 

1663 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours) 

1664 

1665 cleaned_count = 0 

1666 

1667 with self.lock: 

1668 inactive_users = [ 

1669 user_id 

1670 for user_id, session in self.user_sessions.items() 

1671 if session["last_activity"] < cutoff 

1672 ] 

1673 

1674 for user_id in inactive_users: 

1675 # Remove all scheduled jobs 

1676 for job_id in self.user_sessions[user_id][ 

1677 "scheduled_jobs" 

1678 ].copy(): 

1679 try: 

1680 self.scheduler.remove_job(job_id) 

1681 except JobLookupError: 

1682 pass 

1683 

1684 # Clear credentials and session data 

1685 self._credential_store.clear(user_id) 

1686 del self.user_sessions[user_id] 

1687 cleaned_count += 1 

1688 logger.info(f"Cleaned up inactive user {user_id}") 

1689 

1690 return cleaned_count 

1691 

1692 def _reload_config(self): 

1693 """Reload configuration from settings manager.""" 

1694 if not hasattr(self, "settings_manager") or not self.settings_manager: 

1695 return 

1696 

1697 try: 

1698 old_retention = self.config.get("retention_hours", 48) 

1699 

1700 # Reload all settings 

1701 for key in self.config: 

1702 if key == "enabled": 

1703 continue # Don't change enabled state while running 

1704 

1705 full_key = f"news.scheduler.{key}" 

1706 self.config[key] = self._get_setting(full_key, self.config[key]) 

1707 

1708 # Handle changes that need immediate action 

1709 if old_retention != self.config["retention_hours"]: 

1710 logger.info( 

1711 f"Retention period changed from {old_retention} " 

1712 f"to {self.config['retention_hours']} hours" 

1713 ) 

1714 # Trigger immediate cleanup with new retention 

1715 self.scheduler.add_job( 

1716 self._wrap_job(self._run_cleanup_with_tracking), 

1717 "date", 

1718 run_date=datetime.now(UTC) + timedelta(seconds=5), 

1719 id="immediate_cleanup_config_change", 

1720 ) 

1721 

1722 # Clear settings cache to pick up any user setting changes 

1723 self.invalidate_all_settings_cache() 

1724 

1725 except Exception: 

1726 logger.exception("Error reloading configuration") 

1727 

1728 def get_status(self) -> Dict[str, Any]: 

1729 """Get scheduler status information.""" 

1730 with self.lock: 

1731 active_users = len(self.user_sessions) 

1732 total_jobs = sum( 

1733 len(session["scheduled_jobs"]) 

1734 for session in self.user_sessions.values() 

1735 ) 

1736 

1737 # Get next run time for cleanup job 

1738 next_cleanup = None 

1739 if self.is_running: 

1740 job = self.scheduler.get_job("cleanup_inactive_users") 

1741 if job: 1741 ↛ 1744line 1741 didn't jump to line 1744 because the condition on line 1741 was always true

1742 next_cleanup = job.next_run_time 

1743 

1744 return { 

1745 "is_running": self.is_running, 

1746 "config": self.config, 

1747 "active_users": active_users, 

1748 "total_scheduled_jobs": total_jobs, 

1749 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None, 

1750 "memory_usage": self._estimate_memory_usage(), 

1751 } 

1752 

1753 def _estimate_memory_usage(self) -> int: 

1754 """Estimate memory usage of user sessions.""" 

1755 

1756 # Rough estimate: username (50) + password (100) + metadata (200) per user 

1757 per_user_estimate = 350 

1758 return len(self.user_sessions) * per_user_estimate 

1759 

1760 def get_user_sessions_summary(self) -> List[Dict[str, Any]]: 

1761 """Get summary of active user sessions (without passwords).""" 

1762 with self.lock: 

1763 summary = [] 

1764 for user_id, session in self.user_sessions.items(): 

1765 summary.append( 

1766 { 

1767 "user_id": user_id, 

1768 "last_activity": session["last_activity"].isoformat(), 

1769 "scheduled_jobs": len(session["scheduled_jobs"]), 

1770 "time_since_activity": str( 

1771 datetime.now(UTC) - session["last_activity"] 

1772 ), 

1773 } 

1774 ) 

1775 return summary 

1776 

1777 

# Singleton instance getter
_scheduler_instance = None


def get_background_job_scheduler() -> BackgroundJobScheduler:
    """Get the singleton news scheduler instance.

    Lazily constructs the BackgroundJobScheduler on first call and hands
    back the same instance thereafter.
    """
    global _scheduler_instance
    if _scheduler_instance is not None:
        return _scheduler_instance
    _scheduler_instance = BackgroundJobScheduler()
    return _scheduler_instance