Coverage for src / local_deep_research / news / subscription_manager / scheduler.py: 78%

626 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Activity-based news subscription scheduler for per-user encrypted databases. 

3Tracks user activity and temporarily stores credentials for automatic updates. 

4""" 

5 

6import random 

7import threading 

8from dataclasses import dataclass 

9from datetime import datetime, timedelta, UTC 

10from typing import Any, Dict, List 

11 

12from cachetools import TTLCache 

13from loguru import logger 

14from local_deep_research.settings.logger import log_settings 

15 

16from apscheduler.schedulers.background import BackgroundScheduler 

17from apscheduler.jobstores.base import JobLookupError 

18from ...constants import ResearchStatus 

19 

20# RAG indexing imports 

21from ...research_library.services.library_rag_service import LibraryRAGService 

22from ...database.library_init import get_default_library_id 

23from ...database.models.library import Document, DocumentCollection 

24 

25 

26SCHEDULER_AVAILABLE = True # Always available since it's a required dependency 

27 

28 

@dataclass(frozen=True)
class DocumentSchedulerSettings:
    """
    Immutable snapshot of a user's document-scheduler configuration.

    Thread-safe by construction: the dataclass is frozen, so an instance
    can be handed to background threads without any risk of mutation.
    """

    enabled: bool = True
    interval_seconds: int = 1800
    download_pdfs: bool = False
    extract_text: bool = True
    generate_rag: bool = False
    last_run: str = ""

    @classmethod
    def defaults(cls) -> "DocumentSchedulerSettings":
        """Build a snapshot carrying every field's default value."""
        return cls()

49 

50 

class NewsScheduler:
    """
    Singleton scheduler that manages news subscriptions for active users.

    This scheduler:
    - Monitors user activity through database access
    - Temporarily stores user credentials in memory
    - Automatically schedules subscription checks
    - Cleans up inactive users after configurable period
    """

    # Singleton state shared by all constructions: the one live instance
    # and the lock guarding its creation (see __new__).
    _instance = None
    _lock = threading.Lock()

64 

    def __new__(cls):
        """Ensure singleton instance."""
        # Double-checked locking: the unlocked check keeps the common path
        # cheap; the re-check inside the lock prevents two racing threads
        # from each creating an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

72 

    def __init__(self):
        """Initialize the scheduler (only runs once due to singleton)."""
        # Skip if already initialized — __init__ is invoked on every
        # NewsScheduler() call, but the singleton must set up state once.
        if hasattr(self, "_initialized"):
            return

        # User session tracking
        # user_id -> {password, last_activity, scheduled_jobs}
        # NOTE: passwords are held in plain memory so background jobs can
        # open the user's encrypted database on their behalf.
        self.user_sessions = {}
        self.lock = threading.Lock()

        # Scheduler instance
        self.scheduler = BackgroundScheduler()

        # Configuration (will be overridden via initialize_with_settings)
        self.config = self._load_default_config()

        # State
        self.is_running = False

        # Settings cache: username -> DocumentSchedulerSettings
        # TTL of 300 seconds (5 minutes) reduces database queries
        self._settings_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
        self._settings_cache_lock = threading.Lock()

        # Mark initialized so subsequent __init__ calls return early above.
        self._initialized = True
        logger.info("News scheduler initialized")

99 

100 def _load_default_config(self) -> Dict[str, Any]: 

101 """Load default configuration (will be overridden by settings manager).""" 

102 return { 

103 "enabled": True, 

104 "retention_hours": 48, 

105 "cleanup_interval_hours": 1, 

106 "max_jitter_seconds": 300, 

107 "max_concurrent_jobs": 10, 

108 "subscription_batch_size": 5, 

109 "activity_check_interval_minutes": 5, 

110 } 

111 

112 def initialize_with_settings(self, settings_manager): 

113 """Initialize configuration from settings manager.""" 

114 try: 

115 # Load all scheduler settings 

116 self.settings_manager = settings_manager 

117 self.config = { 

118 "enabled": self._get_setting("news.scheduler.enabled", True), 

119 "retention_hours": self._get_setting( 

120 "news.scheduler.retention_hours", 48 

121 ), 

122 "cleanup_interval_hours": self._get_setting( 

123 "news.scheduler.cleanup_interval_hours", 1 

124 ), 

125 "max_jitter_seconds": self._get_setting( 

126 "news.scheduler.max_jitter_seconds", 300 

127 ), 

128 "max_concurrent_jobs": self._get_setting( 

129 "news.scheduler.max_concurrent_jobs", 10 

130 ), 

131 "subscription_batch_size": self._get_setting( 

132 "news.scheduler.batch_size", 5 

133 ), 

134 "activity_check_interval_minutes": self._get_setting( 

135 "news.scheduler.activity_check_interval", 5 

136 ), 

137 } 

138 log_settings(self.config, "Scheduler configuration loaded") 

139 except Exception: 

140 logger.exception("Error loading scheduler settings") 

141 # Keep default config 

142 

143 def _get_setting(self, key: str, default: Any) -> Any: 

144 """Get setting with fallback to default.""" 

145 if hasattr(self, "settings_manager") and self.settings_manager: 

146 return self.settings_manager.get_setting(key, default=default) 

147 return default 

148 

    def _get_document_scheduler_settings(
        self, username: str, force_refresh: bool = False
    ) -> DocumentSchedulerSettings:
        """
        Get document scheduler settings for a user with TTL caching.

        This is the single source of truth for document scheduler settings.
        Settings are cached for 5 minutes by default to reduce database queries.

        Args:
            username: User to get settings for
            force_refresh: If True, bypass cache and fetch fresh settings

        Returns:
            DocumentSchedulerSettings dataclass (frozen/immutable for thread-safety)
        """
        # Fast path: check cache without modifying it
        if not force_refresh:
            with self._settings_cache_lock:
                cached = self._settings_cache.get(username)
                if cached is not None:
                    logger.debug(f"[SETTINGS_CACHE] Cache hit for {username}")
                    return cached

        # Cache miss - need to fetch from database
        logger.debug(
            f"[SETTINGS_CACHE] Cache miss for {username}, fetching from DB"
        )

        # Get password from session
        # NOTE(review): user_sessions is read here without self.lock —
        # presumably relying on atomic dict reads; confirm this is intended.
        session_info = self.user_sessions.get(username)
        if not session_info:
            logger.warning(
                f"[SETTINGS_CACHE] No session info for {username}, using defaults"
            )
            return DocumentSchedulerSettings.defaults()

        password = session_info["password"]

        # Fetch settings from database (outside lock to avoid blocking)
        try:
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                sm = SettingsManager(db)

                # Build one immutable snapshot from the user's settings rows.
                settings = DocumentSchedulerSettings(
                    enabled=sm.get_setting("document_scheduler.enabled", True),
                    interval_seconds=sm.get_setting(
                        "document_scheduler.interval_seconds", 1800
                    ),
                    download_pdfs=sm.get_setting(
                        "document_scheduler.download_pdfs", False
                    ),
                    extract_text=sm.get_setting(
                        "document_scheduler.extract_text", True
                    ),
                    generate_rag=sm.get_setting(
                        "document_scheduler.generate_rag", False
                    ),
                    last_run=sm.get_setting("document_scheduler.last_run", ""),
                )

            # Store in cache
            with self._settings_cache_lock:
                self._settings_cache[username] = settings
                logger.debug(f"[SETTINGS_CACHE] Cached settings for {username}")

            return settings

        except Exception:
            # Any DB/decryption failure degrades to defaults rather than
            # propagating into the scheduler job.
            logger.exception(
                f"[SETTINGS_CACHE] Error fetching settings for {username}"
            )
            return DocumentSchedulerSettings.defaults()

225 

226 def invalidate_user_settings_cache(self, username: str) -> bool: 

227 """ 

228 Invalidate cached settings for a specific user. 

229 

230 Call this when user settings change or user logs out. 

231 

232 Args: 

233 username: User whose cache to invalidate 

234 

235 Returns: 

236 True if cache entry was removed, False if not found 

237 """ 

238 with self._settings_cache_lock: 

239 if username in self._settings_cache: 

240 del self._settings_cache[username] 

241 logger.debug( 

242 f"[SETTINGS_CACHE] Invalidated cache for {username}" 

243 ) 

244 return True 

245 return False 

246 

247 def invalidate_all_settings_cache(self) -> int: 

248 """ 

249 Invalidate all cached settings. 

250 

251 Call this when doing bulk settings updates or during config reload. 

252 

253 Returns: 

254 Number of cache entries cleared 

255 """ 

256 with self._settings_cache_lock: 

257 count = len(self._settings_cache) 

258 self._settings_cache.clear() 

259 logger.info( 

260 f"[SETTINGS_CACHE] Cleared all settings cache ({count} entries)" 

261 ) 

262 return count 

263 

    def start(self):
        """Start the scheduler and register its recurring maintenance jobs."""
        # Respect the master enable switch from config.
        if not self.config.get("enabled", True):
            logger.info("News scheduler is disabled in settings")
            return

        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        # Schedule cleanup job
        # (_run_cleanup_with_tracking is defined elsewhere in this class.)
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "interval",
            hours=self.config["cleanup_interval_hours"],
            id="cleanup_inactive_users",
            name="Cleanup Inactive User Sessions",
            jitter=60,  # Add some jitter to cleanup
        )

        # Schedule configuration reload
        # (_reload_config is defined elsewhere in this class.)
        self.scheduler.add_job(
            self._reload_config,
            "interval",
            minutes=30,
            id="reload_config",
            name="Reload Configuration",
        )

        # Start the scheduler
        self.scheduler.start()
        self.is_running = True

        # Schedule one immediate-ish cleanup shortly after startup, in
        # addition to the recurring interval job above.
        self.scheduler.add_job(
            self._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=30),
            id="initial_cleanup",
        )

        logger.info("News scheduler started")

306 

    def stop(self):
        """Stop the scheduler."""
        if self.is_running:
            # wait=True blocks until currently executing jobs finish.
            self.scheduler.shutdown(wait=True)
            self.is_running = False

            # Clear all user sessions (this also drops the in-memory
            # credentials held for background jobs).
            with self.lock:
                self.user_sessions.clear()

            logger.info("News scheduler stopped")

318 

    def update_user_info(self, username: str, password: str):
        """
        Update user info in scheduler. Called on every database interaction.

        Creates a session entry (and schedules the user's subscriptions) the
        first time a user is seen; afterwards refreshes the stored password
        and last-activity timestamp and reschedules.

        Args:
            username: User's username
            password: User's password
        """
        logger.info(
            f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}"
        )
        logger.debug(
            f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}"
        )

        if not self.is_running:
            logger.warning(
                f"[SCHEDULER] Scheduler not running, cannot update user {username}"
            )
            return

        # NOTE(review): _schedule_user_subscriptions below opens the user's
        # database while self.lock is held — confirm that is acceptable.
        with self.lock:
            now = datetime.now(UTC)

            if username not in self.user_sessions:
                # New user - create session info (password is kept in memory
                # so scheduled jobs can open the encrypted database).
                logger.info(f"[SCHEDULER] New user in scheduler: {username}")
                self.user_sessions[username] = {
                    "password": password,
                    "last_activity": now,
                    "scheduled_jobs": set(),
                }
                logger.debug(
                    f"[SCHEDULER] Created session for {username}, scheduling subscriptions"
                )
                # Schedule their subscriptions
                self._schedule_user_subscriptions(username)
            else:
                # Existing user - update info
                logger.info(
                    f"[SCHEDULER] Updating existing user {username} activity, will reschedule"
                )
                old_activity = self.user_sessions[username]["last_activity"]
                activity_delta = now - old_activity
                logger.debug(
                    f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}"
                )

                self.user_sessions[username]["password"] = password
                self.user_sessions[username]["last_activity"] = now
                logger.debug(
                    f"[SCHEDULER] Updated {username} session info, scheduling subscriptions"
                )
                # Reschedule their subscriptions in case they changed
                self._schedule_user_subscriptions(username)

374 

375 def unregister_user(self, username: str): 

376 """ 

377 Unregister a user and clean up their scheduled jobs. 

378 Called when user logs out. 

379 """ 

380 with self.lock: 

381 if username in self.user_sessions: 

382 logger.info(f"Unregistering user {username}") 

383 

384 # Remove all scheduled jobs for this user 

385 session_info = self.user_sessions[username] 

386 for job_id in session_info["scheduled_jobs"].copy(): 

387 try: 

388 self.scheduler.remove_job(job_id) 

389 except JobLookupError: 

390 pass 

391 

392 # Remove user session 

393 del self.user_sessions[username] 

394 

395 # Invalidate settings cache for this user (outside lock) 

396 self.invalidate_user_settings_cache(username) 

397 logger.info(f"User {username} unregistered successfully") 

398 

    def _schedule_user_subscriptions(self, username: str):
        """Schedule all active subscriptions for a user.

        Reads the user's active NewsSubscription rows from their encrypted
        database, clears previously scheduled jobs, then registers one
        APScheduler job per subscription: an interval trigger for refresh
        rates of 60 minutes or less, a one-shot date trigger otherwise.
        Always finishes by (re)scheduling the user's document-processing job.
        """
        logger.info(f"_schedule_user_subscriptions called for {username}")
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(f"No session info found for {username}")
                return

            password = session_info["password"]
            logger.debug(
                f"Got password for {username}: {'present' if password else 'missing'}"
            )

            # Get user's subscriptions from their encrypted database
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                subscriptions = (
                    db.query(NewsSubscription).filter_by(is_active=True).all()
                )
                logger.debug(
                    f"Query executed, found {len(subscriptions)} results"
                )

                # Log details of each subscription
                for sub in subscriptions:
                    logger.debug(
                        f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes"
                    )

                logger.info(
                    f"Found {len(subscriptions)} active subscriptions for {username}"
                )

                # Clear old jobs for this user
                for job_id in session_info["scheduled_jobs"].copy():
                    try:
                        self.scheduler.remove_job(job_id)
                        session_info["scheduled_jobs"].remove(job_id)
                    except JobLookupError:
                        # NOTE(review): if remove_job raises, the stale id is
                        # never dropped from scheduled_jobs — confirm whether
                        # retaining already-fired job ids is intended.
                        pass

                # Schedule each subscription with jitter
                for sub in subscriptions:
                    job_id = f"{username}_{sub.id}"

                    # Calculate jitter
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    max_jitter = int(self.config.get("max_jitter_seconds", 300))
                    jitter = random.randint(0, max_jitter)

                    # Determine trigger based on frequency
                    refresh_minutes = sub.refresh_interval_minutes

                    if refresh_minutes <= 60:  # 60 minutes or less
                        # For hourly or more frequent, use interval trigger
                        trigger = "interval"
                        trigger_args = {
                            "minutes": refresh_minutes,
                            "jitter": jitter,
                            "start_date": datetime.now(UTC),  # Start immediately
                        }
                    else:
                        # For less frequent, calculate next run time
                        now = datetime.now(UTC)
                        if sub.next_refresh:
                            # Ensure timezone-aware for comparison with now (UTC)
                            next_refresh_aware = sub.next_refresh
                            if next_refresh_aware.tzinfo is None:
                                logger.warning(
                                    f"Subscription {sub.id} has naive (non-tz-aware) "
                                    f"next_refresh datetime, assuming UTC"
                                )
                                next_refresh_aware = next_refresh_aware.replace(
                                    tzinfo=UTC
                                )
                            if next_refresh_aware <= now:
                                # Subscription is overdue - run it immediately with small jitter
                                logger.info(
                                    f"Subscription {sub.id} is overdue, scheduling immediate run"
                                )
                                next_run = now + timedelta(seconds=jitter)
                            else:
                                next_run = next_refresh_aware
                        else:
                            next_run = now + timedelta(
                                minutes=refresh_minutes, seconds=jitter
                            )

                        trigger = "date"
                        trigger_args = {"run_date": next_run}

                    # Add the job (_check_subscription is defined elsewhere
                    # in this class).
                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, sub.id],
                        trigger=trigger,
                        id=job_id,
                        name=f"Check {sub.name or sub.query_or_topic[:30]}",
                        replace_existing=True,
                        **trigger_args,
                    )

                    session_info["scheduled_jobs"].add(job_id)
                    logger.info(f"Scheduled job {job_id} with {trigger} trigger")

        except Exception:
            logger.exception(f"Error scheduling subscriptions for {username}")

        # Add document processing for this user (runs even if subscription
        # scheduling above failed).
        self._schedule_document_processing(username)

512 

    def _schedule_document_processing(self, username: str):
        """Schedule document processing for a user.

        Registers (or replaces) a recurring APScheduler interval job that
        calls _process_user_documents for this user, honoring the user's
        cached DocumentSchedulerSettings. No-op when the user has no session
        or has the document scheduler disabled.
        """
        logger.info(
            f"[DOC_SCHEDULER] Scheduling document processing for {username}"
        )
        logger.debug(
            f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}"
        )

        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.warning(
                    f"[DOC_SCHEDULER] No session info found for {username}"
                )
                logger.debug(
                    f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}"
                )
                return

            logger.debug(
                f"[DOC_SCHEDULER] Retrieved session for {username}, scheduler running: {self.is_running}"
            )

            # Get user's document scheduler settings (cached)
            settings = self._get_document_scheduler_settings(username)

            if not settings.enabled:
                logger.info(
                    f"[DOC_SCHEDULER] Document scheduler disabled for user {username}"
                )
                return

            logger.info(
                f"[DOC_SCHEDULER] User {username} document settings: enabled={settings.enabled}, "
                f"interval={settings.interval_seconds}s, pdfs={settings.download_pdfs}, "
                f"text={settings.extract_text}, rag={settings.generate_rag}"
            )

            # Schedule document processing job
            job_id = f"{username}_document_processing"
            logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}")

            # Remove existing document job if any
            try:
                self.scheduler.remove_job(job_id)
                session_info["scheduled_jobs"].discard(job_id)
                logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}")
            except JobLookupError:
                logger.debug(
                    f"[DOC_SCHEDULER] No existing job {job_id} to remove"
                )
                pass  # Job doesn't exist, that's fine

            # Add new document processing job
            logger.debug(
                f"[DOC_SCHEDULER] Adding new document processing job with interval {settings.interval_seconds}s"
            )
            self.scheduler.add_job(
                func=self._process_user_documents,
                args=[username],
                trigger="interval",
                seconds=settings.interval_seconds,
                id=job_id,
                name=f"Process Documents for {username}",
                jitter=30,  # Add small jitter to prevent multiple users from processing simultaneously
                max_instances=1,  # Prevent overlapping document processing for same user
                replace_existing=True,
            )

            session_info["scheduled_jobs"].add(job_id)
            logger.info(
                f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {settings.interval_seconds}s interval"
            )
            logger.debug(
                f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}"
            )

            # Verify job was added (defensive check against silent add_job
            # failures).
            job = self.scheduler.get_job(job_id)
            if job:
                logger.info(
                    f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}"
                )
            else:
                logger.error(
                    f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!"
                )

        except Exception:
            logger.exception(
                f"Error scheduling document processing for {username}"
            )

606 

607 def _process_user_documents(self, username: str): 

608 """Process documents for a user.""" 

609 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}") 

610 start_time = datetime.now(UTC) 

611 

612 try: 

613 session_info = self.user_sessions.get(username) 

614 if not session_info: 

615 logger.warning( 

616 f"[DOC_SCHEDULER] No session info found for user {username}" 

617 ) 

618 return 

619 

620 password = session_info["password"] 

621 logger.debug( 

622 f"[DOC_SCHEDULER] Starting document processing for {username}" 

623 ) 

624 

625 # Get user's document scheduler settings (cached) 

626 settings = self._get_document_scheduler_settings(username) 

627 

628 logger.info( 

629 f"[DOC_SCHEDULER] Processing settings for {username}: " 

630 f"pdfs={settings.download_pdfs}, text={settings.extract_text}, rag={settings.generate_rag}" 

631 ) 

632 

633 if not any( 633 ↛ 646line 633 didn't jump to line 646 because the condition on line 633 was always true

634 [ 

635 settings.download_pdfs, 

636 settings.extract_text, 

637 settings.generate_rag, 

638 ] 

639 ): 

640 logger.info( 

641 f"[DOC_SCHEDULER] No processing options enabled for user {username}" 

642 ) 

643 return 

644 

645 # Parse last_run from cached settings 

646 last_run = ( 

647 datetime.fromisoformat(settings.last_run) 

648 if settings.last_run 

649 else None 

650 ) 

651 

652 logger.info(f"[DOC_SCHEDULER] Last run for {username}: {last_run}") 

653 

654 # Need database session for queries and updates 

655 from ...database.session_context import get_user_db_session 

656 from ...database.models.research import ResearchHistory 

657 from ...settings.manager import SettingsManager 

658 

659 with get_user_db_session(username, password) as db: 

660 settings_manager = SettingsManager(db) 

661 

662 # Query for completed research since last run 

663 logger.debug( 

664 f"[DOC_SCHEDULER] Querying for completed research since {last_run}" 

665 ) 

666 query = db.query(ResearchHistory).filter( 

667 ResearchHistory.status == ResearchStatus.COMPLETED, 

668 ResearchHistory.completed_at.is_not( 

669 None 

670 ), # Ensure completed_at is not null 

671 ) 

672 

673 if last_run: 

674 query = query.filter( 

675 ResearchHistory.completed_at > last_run 

676 ) 

677 

678 # Limit to recent research to prevent overwhelming 

679 query = query.order_by( 

680 ResearchHistory.completed_at.desc() 

681 ).limit(20) 

682 

683 research_sessions = query.all() 

684 logger.debug( 

685 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions" 

686 ) 

687 

688 if not research_sessions: 

689 logger.info( 

690 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}" 

691 ) 

692 return 

693 

694 logger.info( 

695 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}" 

696 ) 

697 

698 # Log details of each research session 

699 for i, research in enumerate( 

700 research_sessions[:5] 

701 ): # Log first 5 details 

702 title_safe = ( 

703 (research.title[:50] + "...") 

704 if research.title 

705 else "No title" 

706 ) 

707 completed_safe = ( 

708 research.completed_at 

709 if research.completed_at 

710 else "No completion time" 

711 ) 

712 logger.debug( 

713 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}" 

714 ) 

715 

716 # Handle completed_at which might be a string or datetime 

717 completed_at_obj = None 

718 if research.completed_at: 

719 if isinstance(research.completed_at, str): 

720 try: 

721 completed_at_obj = datetime.fromisoformat( 

722 research.completed_at.replace("Z", "+00:00") 

723 ) 

724 except (ValueError, TypeError, AttributeError): 

725 completed_at_obj = None 

726 else: 

727 completed_at_obj = research.completed_at 

728 

729 logger.debug( 

730 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}" 

731 ) 

732 logger.debug( 

733 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}" 

734 ) 

735 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}") 

736 logger.debug( 

737 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}" 

738 ) 

739 

740 processed_count = 0 

741 for research in research_sessions: 

742 try: 

743 logger.info( 

744 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}" 

745 ) 

746 

747 # Call actual processing APIs 

748 if settings.download_pdfs: 

749 logger.info( 

750 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}" 

751 ) 

752 try: 

753 # Use the DownloadService to queue PDF downloads 

754 from ...research_library.services.download_service import ( 

755 DownloadService, 

756 ) 

757 

758 with DownloadService( 

759 username=username, password=password 

760 ) as download_service: 

761 queued_count = download_service.queue_research_downloads( 

762 research.id 

763 ) 

764 logger.info( 

765 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}" 

766 ) 

767 except Exception: 

768 logger.exception( 

769 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}" 

770 ) 

771 

772 if settings.extract_text: 

773 logger.info( 

774 f"[DOC_SCHEDULER] Extracting text for research {research.id}" 

775 ) 

776 try: 

777 # Use the DownloadService to extract text for all resources 

778 from ...research_library.services.download_service import ( 

779 DownloadService, 

780 ) 

781 from ...database.models.research import ( 

782 ResearchResource, 

783 ) 

784 

785 with DownloadService( 

786 username=username, password=password 

787 ) as download_service: 

788 # Get all resources for this research (reuse existing db session) 

789 resources = ( 

790 db.query(ResearchResource) 

791 .filter_by(research_id=research.id) 

792 .all() 

793 ) 

794 processed_count = 0 

795 for resource in resources: 

796 # We need to pass the password to the download service 

797 # The DownloadService creates its own database sessions, so we need to ensure password is available 

798 try: 

799 success, error = ( 

800 download_service.download_as_text( 

801 resource.id 

802 ) 

803 ) 

804 if success: 

805 processed_count += 1 

806 logger.info( 

807 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}" 

808 ) 

809 else: 

810 logger.warning( 

811 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}" 

812 ) 

813 except Exception as resource_error: 

814 logger.exception( 

815 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}" 

816 ) 

817 logger.info( 

818 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed" 

819 ) 

820 except Exception: 

821 logger.exception( 

822 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}" 

823 ) 

824 

825 if settings.generate_rag: 

826 logger.info( 

827 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}" 

828 ) 

829 try: 

830 # Get embedding settings from user configuration 

831 embedding_model = settings_manager.get_setting( 

832 "local_search_embedding_model", 

833 "all-MiniLM-L6-v2", 

834 ) 

835 embedding_provider = ( 

836 settings_manager.get_setting( 

837 "local_search_embedding_provider", 

838 "sentence_transformers", 

839 ) 

840 ) 

841 chunk_size = int( 

842 settings_manager.get_setting( 

843 "local_search_chunk_size", 1000 

844 ) 

845 ) 

846 chunk_overlap = int( 

847 settings_manager.get_setting( 

848 "local_search_chunk_overlap", 200 

849 ) 

850 ) 

851 

852 # Initialize RAG service with user's embedding configuration 

853 with LibraryRAGService( 

854 username=username, 

855 embedding_model=embedding_model, 

856 embedding_provider=embedding_provider, 

857 chunk_size=chunk_size, 

858 chunk_overlap=chunk_overlap, 

859 db_password=password, 

860 ) as rag_service: 

861 # Get default Library collection ID 

862 library_collection_id = ( 

863 get_default_library_id( 

864 username, password 

865 ) 

866 ) 

867 

868 # Query for unindexed documents from this research session 

869 documents_to_index = ( 

870 db.query(Document.id, Document.title) 

871 .outerjoin( 

872 DocumentCollection, 

873 ( 

874 DocumentCollection.document_id 

875 == Document.id 

876 ) 

877 & ( 

878 DocumentCollection.collection_id 

879 == library_collection_id 

880 ), 

881 ) 

882 .filter( 

883 Document.research_id == research.id, 

884 Document.text_content.isnot(None), 

885 ( 

886 DocumentCollection.indexed.is_( 

887 False 

888 ) 

889 | DocumentCollection.id.is_( 

890 None 

891 ) 

892 ), 

893 ) 

894 .all() 

895 ) 

896 

897 if not documents_to_index: 

898 logger.info( 

899 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}" 

900 ) 

901 else: 

902 # Index each document 

903 indexed_count = 0 

904 for ( 

905 doc_id, 

906 doc_title, 

907 ) in documents_to_index: 

908 try: 

909 result = rag_service.index_document( 

910 document_id=doc_id, 

911 collection_id=library_collection_id, 

912 force_reindex=False, 

913 ) 

914 if ( 

915 result["status"] 

916 == "success" 

917 ): 

918 indexed_count += 1 

919 logger.info( 

920 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) " 

921 f"with {result.get('chunk_count', 0)} chunks" 

922 ) 

923 except Exception as doc_error: 

924 logger.exception( 

925 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}" 

926 ) 

927 

928 logger.info( 

929 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: " 

930 f"{indexed_count}/{len(documents_to_index)} documents indexed" 

931 ) 

932 except Exception: 

933 logger.exception( 

934 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}" 

935 ) 

936 

937 processed_count += 1 

938 logger.debug( 

939 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}" 

940 ) 

941 

942 except Exception: 

943 logger.exception( 

944 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}" 

945 ) 

946 

947 # Update last run time in user's settings 

948 current_time = datetime.now(UTC).isoformat() 

949 settings_manager.set_setting( 

950 "document_scheduler.last_run", current_time, commit=True 

951 ) 

952 logger.debug( 

953 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}" 

954 ) 

955 

956 end_time = datetime.now(UTC) 

957 duration = (end_time - start_time).total_seconds() 

958 logger.info( 

959 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s" 

960 ) 

961 

962 except Exception: 

963 logger.exception( 

964 f"[DOC_SCHEDULER] Error processing documents for user {username}" 

965 ) 

966 finally: 

967 # Clean up thread-local session after job completes 

968 from ...database.thread_local_session import cleanup_current_thread 

969 

970 cleanup_current_thread() 

971 

972 def get_document_scheduler_status(self, username: str) -> Dict[str, Any]: 

973 """Get document scheduler status for a specific user.""" 

974 try: 

975 session_info = self.user_sessions.get(username) 

976 if not session_info: 

977 return { 

978 "enabled": False, 

979 "message": "User not found in scheduler", 

980 } 

981 

982 # Get user's document scheduler settings (cached) 

983 settings = self._get_document_scheduler_settings(username) 

984 

985 # Check if user has document processing job 

986 job_id = f"{username}_document_processing" 

987 has_job = job_id in session_info.get("scheduled_jobs", set()) 

988 

989 return { 

990 "enabled": settings.enabled, 

991 "interval_seconds": settings.interval_seconds, 

992 "processing_options": { 

993 "download_pdfs": settings.download_pdfs, 

994 "extract_text": settings.extract_text, 

995 "generate_rag": settings.generate_rag, 

996 }, 

997 "last_run": settings.last_run, 

998 "has_scheduled_job": has_job, 

999 "user_active": username in self.user_sessions, 

1000 } 

1001 

1002 except Exception as e: 

1003 logger.exception( 

1004 f"Error getting document scheduler status for user {username}" 

1005 ) 

1006 return { 

1007 "enabled": False, 

1008 "message": f"Failed to retrieve scheduler status: {type(e).__name__}", 

1009 } 

1010 

1011 def trigger_document_processing(self, username: str) -> bool: 

1012 """Trigger immediate document processing for a user.""" 

1013 logger.info( 

1014 f"[DOC_SCHEDULER] Manual trigger requested for user {username}" 

1015 ) 

1016 try: 

1017 session_info = self.user_sessions.get(username) 

1018 if not session_info: 

1019 logger.warning( 

1020 f"[DOC_SCHEDULER] User {username} not found in scheduler" 

1021 ) 

1022 logger.debug( 

1023 f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}" 

1024 ) 

1025 return False 

1026 

1027 if not self.is_running: 

1028 logger.warning( 

1029 f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}" 

1030 ) 

1031 return False 

1032 

1033 # Trigger immediate processing 

1034 job_id = f"{username}_document_processing_manual" 

1035 logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}") 

1036 

1037 self.scheduler.add_job( 

1038 func=self._process_user_documents, 

1039 args=[username], 

1040 trigger="date", 

1041 run_date=datetime.now(UTC) + timedelta(seconds=1), 

1042 id=job_id, 

1043 name=f"Manual Document Processing for {username}", 

1044 replace_existing=True, 

1045 ) 

1046 

1047 # Verify job was added 

1048 job = self.scheduler.get_job(job_id) 

1049 if job: 

1050 logger.info( 

1051 f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}" 

1052 ) 

1053 else: 

1054 logger.error( 

1055 f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!" 

1056 ) 

1057 return False 

1058 

1059 return True 

1060 

1061 except Exception: 

1062 logger.exception( 

1063 f"[DOC_SCHEDULER] Error triggering document processing for user {username}" 

1064 ) 

1065 return False 

1066 

1067 def _check_user_overdue_subscriptions(self, username: str): 

1068 """Check and immediately run any overdue subscriptions for a user.""" 

1069 try: 

1070 session_info = self.user_sessions.get(username) 

1071 if not session_info: 

1072 return 

1073 

1074 password = session_info["password"] 

1075 

1076 # Get user's overdue subscriptions 

1077 from ...database.session_context import get_user_db_session 

1078 from ...database.models.news import NewsSubscription 

1079 from datetime import timezone 

1080 

1081 with get_user_db_session(username, password) as db: 

1082 now = datetime.now(timezone.utc) 

1083 overdue_subs = ( 

1084 db.query(NewsSubscription) 

1085 .filter( 

1086 NewsSubscription.is_active.is_(True), 

1087 NewsSubscription.next_refresh.is_not(None), 

1088 NewsSubscription.next_refresh <= now, 

1089 ) 

1090 .all() 

1091 ) 

1092 

1093 if overdue_subs: 

1094 logger.info( 

1095 f"Found {len(overdue_subs)} overdue subscriptions for {username}" 

1096 ) 

1097 

1098 for sub in overdue_subs: 

1099 # Run immediately with small random delay 

1100 # Security: random delay to stagger overdue jobs, not security-sensitive 

1101 delay_seconds = random.randint(1, 30) 

1102 job_id = ( 

1103 f"overdue_{username}_{sub.id}_{int(now.timestamp())}" 

1104 ) 

1105 

1106 self.scheduler.add_job( 

1107 func=self._check_subscription, 

1108 args=[username, sub.id], 

1109 trigger="date", 

1110 run_date=now + timedelta(seconds=delay_seconds), 

1111 id=job_id, 

1112 name=f"Overdue: {sub.name or sub.query_or_topic[:30]}", 

1113 replace_existing=True, 

1114 ) 

1115 

1116 logger.info( 

1117 f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds" 

1118 ) 

1119 

1120 except Exception: 

1121 logger.exception( 

1122 f"Error checking overdue subscriptions for {username}" 

1123 ) 

1124 finally: 

1125 # Clean up thread-local session after job completes 

1126 from ...database.thread_local_session import cleanup_current_thread 

1127 

1128 cleanup_current_thread() 

1129 

    def _check_subscription(self, username: str, subscription_id: int) -> None:
        """Check and refresh a single subscription.

        Runs as an APScheduler job. If the user is no longer tracked in
        memory, the job removes itself. Otherwise the subscription is
        loaded from the user's encrypted DB, its refresh timestamps are
        advanced and committed *before* the research runs (so a crash
        mid-research does not retrigger immediately), the research is
        executed synchronously, and one-shot date-trigger jobs are
        rescheduled for the next interval with random jitter.
        """
        logger.info(
            f"_check_subscription called for user {username}, subscription {subscription_id}"
        )
        try:
            session_info = self.user_sessions.get(username)
            if not session_info:
                # User no longer active, cancel job
                job_id = f"{username}_{subscription_id}"
                try:
                    self.scheduler.remove_job(job_id)
                except JobLookupError:
                    # Job already gone (e.g. one-shot trigger) - fine.
                    pass
                return

            password = session_info["password"]

            # Get subscription details
            # Lazy imports avoid circular imports at module load time.
            from ...database.session_context import get_user_db_session
            from ...database.models.news import NewsSubscription

            with get_user_db_session(username, password) as db:
                sub = db.query(NewsSubscription).get(subscription_id)
                if not sub or not sub.is_active:
                    logger.info(
                        f"Subscription {subscription_id} not active, skipping"
                    )
                    return

                # Prepare query with date replacement using user's timezone
                query = sub.query_or_topic
                if "YYYY-MM-DD" in query:
                    from ..core.utils import get_local_date_string
                    from ...settings.manager import SettingsManager

                    settings_manager = SettingsManager(db)
                    local_date = get_local_date_string(settings_manager)
                    query = query.replace("YYYY-MM-DD", local_date)

                # Update last/next refresh times
                # Committed up-front so a failed run still advances the
                # schedule instead of retrying in a tight loop.
                sub.last_refresh = datetime.now(UTC)
                sub.next_refresh = datetime.now(UTC) + timedelta(
                    minutes=sub.refresh_interval_minutes
                )
                db.commit()

                # Plain-dict snapshot of the ORM row; safe to use after
                # the session closes.
                subscription_data = {
                    "id": sub.id,
                    "name": sub.name,
                    "query": query,
                    "original_query": sub.query_or_topic,
                    "model_provider": sub.model_provider,
                    "model": sub.model,
                    "search_strategy": sub.search_strategy,
                    "search_engine": sub.search_engine,
                }

                logger.info(
                    f"Refreshing subscription {subscription_id}: {subscription_data['name']}"
                )

                # Trigger research synchronously using requests with proper auth
                self._trigger_subscription_research_sync(
                    username, subscription_data
                )

                # Reschedule for next interval if using interval trigger
                job_id = f"{username}_{subscription_id}"
                job = self.scheduler.get_job(job_id)
                if job and job.trigger.__class__.__name__ == "DateTrigger":
                    # For date triggers, reschedule
                    # Security: random jitter to distribute subscription timing, not security-sensitive
                    next_run = datetime.now(UTC) + timedelta(
                        minutes=sub.refresh_interval_minutes,
                        seconds=random.randint(
                            0, int(self.config.get("max_jitter_seconds", 300))
                        ),
                    )
                    self.scheduler.add_job(
                        func=self._check_subscription,
                        args=[username, subscription_id],
                        trigger="date",
                        run_date=next_run,
                        id=job_id,
                        replace_existing=True,
                    )

        except Exception:
            logger.exception(f"Error checking subscription {subscription_id}")
        finally:
            # Clean up thread-local session after job completes
            from ...database.thread_local_session import cleanup_current_thread

            cleanup_current_thread()

1225 

    def _trigger_subscription_research_sync(
        self, username: str, subscription: Dict[str, Any]
    ) -> None:
        """Trigger research for a subscription using programmatic API.

        Runs synchronously in the scheduler's worker thread: it takes a
        snapshot of the user's settings, installs a thread-local
        settings context, runs ``quick_summary`` with the subscription's
        model/strategy overrides, and stores the result. The settings
        context is always cleared in ``finally`` so the worker thread is
        left clean for the next job.
        """
        # Import clear_settings_context before try so it is always
        # available in the finally block, even if an early return or
        # exception occurs before the lazy imports inside try.
        from ...config.thread_settings import (
            clear_settings_context,
            set_settings_context,
        )

        try:
            # Get user's password from session info
            session_info = self.user_sessions.get(username)
            if not session_info:
                logger.error(f"No session info for user {username}")
                return

            password = session_info["password"]

            # Generate research ID
            import uuid

            research_id = str(uuid.uuid4())

            logger.info(
                f"Starting research {research_id} for subscription {subscription['id']}"
            )

            # Get user settings for research
            from ...database.session_context import get_user_db_session
            from ...settings.manager import SettingsManager

            with get_user_db_session(username, password) as db:
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

            # Use the search engine from the subscription if specified
            search_engine = subscription.get("search_engine")

            if search_engine:
                # Override the snapshot entry; shape mirrors the settings
                # manager's {"value": ..., "ui_element": ...} records.
                settings_snapshot["search.tool"] = {
                    "value": search_engine,
                    "ui_element": "select",
                }
                logger.info(
                    f"Using subscription's search engine: '{search_engine}' for {subscription['id']}"
                )
            else:
                # Use the user's default search tool from their settings
                # NOTE(review): this value is only logged here; the raw
                # snapshot entry may itself be a dict - confirm if it is
                # ever used beyond logging.
                default_search_tool = settings_snapshot.get(
                    "search.tool", "auto"
                )
                logger.info(
                    f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}"
                )

            logger.debug(
                f"Settings snapshot has {len(settings_snapshot)} settings"
            )
            # Log a few key settings to verify they're present
            logger.debug(
                f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}"
            )

            # Set up research parameters
            query = subscription["query"]

            # Build metadata for news search
            metadata = {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription["id"],
                "triggered_by": "scheduler",
                "subscription_name": subscription["name"],
                "title": subscription["name"] if subscription["name"] else None,
                "scheduled_at": datetime.now(UTC).isoformat(),
                "original_query": subscription["original_query"],
                "user_id": username,
            }

            # Use programmatic API with settings context
            from ...api.research_functions import quick_summary

            # Create and set settings context for this thread
            class SettingsContext:
                # Thin read-only adapter: flattens {"value": ...} records
                # so get_setting() returns plain values.
                def __init__(self, snapshot):
                    self.snapshot = snapshot or {}
                    self.values = {}
                    for key, setting in self.snapshot.items():
                        if isinstance(setting, dict) and "value" in setting:
                            self.values[key] = setting["value"]
                        else:
                            self.values[key] = setting

                def get_setting(self, key, default=None):
                    """Get setting from snapshot only"""
                    return self.values.get(key, default)

            # Set the context for this thread
            settings_context = SettingsContext(settings_snapshot)
            set_settings_context(settings_context)

            # Get search strategy from subscription data (for the API call)
            search_strategy = subscription.get(
                "search_strategy", "news_aggregation"
            )

            # Call quick_summary with appropriate parameters
            result = quick_summary(
                query=query,
                research_id=research_id,
                username=username,
                user_password=password,
                settings_snapshot=settings_snapshot,
                search_strategy=search_strategy,
                model_name=subscription.get("model"),
                provider=subscription.get("model_provider"),
                iterations=1,  # Single iteration for news
                metadata=metadata,
                search_original_query=False,  # Don't send long subscription prompts to search engines
            )

            logger.info(
                f"Completed research {research_id} for subscription {subscription['id']}"
            )

            # Store the research result in the database
            self._store_research_result(
                username,
                password,
                research_id,
                subscription["id"],
                result,
                subscription,
            )

        except Exception:
            logger.exception(
                f"Error triggering research for subscription {subscription['id']}"
            )
        finally:
            # Always clear the thread-local settings context, even on
            # early return, so this worker thread starts the next job
            # without stale settings.
            clear_settings_context()

1371 

    def _store_research_result(
        self,
        username: str,
        password: str,
        research_id: str,
        subscription_id: int,
        result: Dict[str, Any],
        subscription: Dict[str, Any],
    ) -> None:
        """Store research result in database for news display.

        Pipeline: make the result JSON-serializable, append a Sources
        section, format citations, generate a headline and topics via
        LLM helpers, persist a ResearchHistory row, then save the
        report body through the storage abstraction. Errors are logged
        and swallowed - a failed store must not crash the scheduler job.
        """
        try:
            # Lazy imports avoid circular imports at module load time.
            from ...database.session_context import get_user_db_session
            from ...database.models import ResearchHistory
            from ...settings.manager import SettingsManager
            import json

            # Convert result to JSON-serializable format
            def make_serializable(obj):
                """Convert non-serializable objects to dictionaries."""
                if hasattr(obj, "dict"):
                    return obj.dict()
                elif hasattr(obj, "__dict__"):
                    # Drop private attributes; recurse into the rest.
                    return {
                        k: make_serializable(v)
                        for k, v in obj.__dict__.items()
                        if not k.startswith("_")
                    }
                elif isinstance(obj, (list, tuple)):
                    return [make_serializable(item) for item in obj]
                elif isinstance(obj, dict):
                    return {k: make_serializable(v) for k, v in obj.items()}
                else:
                    return obj

            serializable_result = make_serializable(result)

            with get_user_db_session(username, password) as db:
                # Get user settings to store in metadata
                settings_manager = SettingsManager(db)
                settings_snapshot = settings_manager.get_settings_snapshot()

                # Get the report content - check both 'report' and 'summary' fields
                report_content = serializable_result.get(
                    "report"
                ) or serializable_result.get("summary")
                logger.debug(
                    f"Report content length: {len(report_content) if report_content else 0} chars"
                )

                # Extract sources/links from the result
                sources = serializable_result.get("sources", [])

                # First add the sources/references section if we have sources
                if report_content and sources:
                    # Import utilities for formatting links
                    from ...utilities.search_utilities import (
                        format_links_to_markdown,
                    )

                    # Format the links/citations
                    formatted_links = format_links_to_markdown(sources)

                    # Add references section to the report
                    if formatted_links:
                        report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}"

                # Then format citations in the report content
                if report_content:
                    # Import citation formatter
                    from ...text_optimization.citation_formatter import (
                        CitationFormatter,
                        CitationMode,
                    )
                    from ...config.search_config import (
                        get_setting_from_snapshot,
                    )

                    # Get citation format from settings
                    citation_format = get_setting_from_snapshot(
                        "report.citation_format", "domain_id_hyperlinks"
                    )
                    # Map setting string -> CitationMode enum; unknown
                    # values fall back to DOMAIN_ID_HYPERLINKS below.
                    mode_map = {
                        "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS,
                        "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS,
                        "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS,
                        "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS,
                        "no_hyperlinks": CitationMode.NO_HYPERLINKS,
                    }
                    mode = mode_map.get(
                        citation_format, CitationMode.DOMAIN_ID_HYPERLINKS
                    )
                    formatter = CitationFormatter(mode=mode)

                    # Format citations within the content
                    report_content = formatter.format_document(report_content)

                if not report_content:
                    # If neither field exists, use the full result as JSON
                    report_content = json.dumps(serializable_result)

                # Generate headline and topics for news searches
                from ...news.utils.headline_generator import generate_headline
                from ...news.utils.topic_generator import generate_topics

                query_text = result.get(
                    "query", subscription.get("query", "News Update")
                )

                # Generate headline from the actual research findings
                logger.info(
                    f"Generating headline for subscription {subscription_id}"
                )
                generated_headline = generate_headline(
                    query=query_text,
                    findings=report_content,
                    max_length=200,  # Allow longer headlines for news
                )

                # Generate topics from the findings
                logger.info(
                    f"Generating topics for subscription {subscription_id}"
                )
                generated_topics = generate_topics(
                    query=query_text,
                    findings=report_content,
                    category=subscription.get("name", "News"),
                    max_topics=6,
                )

                logger.info(
                    f"Generated headline: {generated_headline}, topics: {generated_topics}"
                )

                # Get subscription name for metadata
                subscription_name = subscription.get("name", "")

                # Use generated headline as title, or fallback
                if generated_headline:
                    title = generated_headline
                else:
                    if subscription_name:
                        title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}"
                    else:
                        title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}"

                # Create research history entry
                # NOTE(review): status is the literal "completed" while the
                # module imports ResearchStatus - confirm whether the enum
                # should be used here for consistency.
                history_entry = ResearchHistory(
                    id=research_id,
                    query=result.get("query", ""),
                    mode="news_subscription",
                    status="completed",
                    created_at=datetime.now(UTC).isoformat(),
                    completed_at=datetime.now(UTC).isoformat(),
                    title=title,
                    research_meta={
                        "subscription_id": subscription_id,
                        "triggered_by": "scheduler",
                        "is_news_search": True,
                        "username": username,
                        "subscription_name": subscription_name,  # Store subscription name for display
                        "settings_snapshot": settings_snapshot,  # Store settings snapshot for later retrieval
                        "generated_headline": generated_headline,  # Store generated headline for news display
                        "generated_topics": generated_topics,  # Store topics for categorization
                    },
                )
                db.add(history_entry)
                db.commit()

                # Store the report content using storage abstraction
                from ...storage import get_report_storage

                # Use storage to save the report (report_content already retrieved above)
                storage = get_report_storage(session=db)
                storage.save_report(
                    research_id=research_id,
                    content=report_content,
                    username=username,
                )

                logger.info(
                    f"Stored research result {research_id} for subscription {subscription_id}"
                )

        except Exception:
            logger.exception("Error storing research result")

1557 

1558 def _run_cleanup_with_tracking(self): 

1559 """Wrapper that tracks cleanup execution.""" 

1560 

1561 try: 

1562 cleaned_count = self._cleanup_inactive_users() 

1563 

1564 logger.info( 

1565 f"Cleanup successful: removed {cleaned_count} inactive users" 

1566 ) 

1567 

1568 except Exception: 

1569 logger.exception("Cleanup job failed") 

1570 

1571 def _cleanup_inactive_users(self) -> int: 

1572 """Remove users inactive for longer than retention period.""" 

1573 retention_hours = self.config.get("retention_hours", 48) 

1574 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours) 

1575 

1576 cleaned_count = 0 

1577 

1578 with self.lock: 

1579 inactive_users = [ 

1580 user_id 

1581 for user_id, session in self.user_sessions.items() 

1582 if session["last_activity"] < cutoff 

1583 ] 

1584 

1585 for user_id in inactive_users: 

1586 # Remove all scheduled jobs 

1587 for job_id in self.user_sessions[user_id]["scheduled_jobs"]: 

1588 try: 

1589 self.scheduler.remove_job(job_id) 

1590 except JobLookupError: 

1591 pass 

1592 

1593 # Clear password from memory 

1594 del self.user_sessions[user_id] 

1595 cleaned_count += 1 

1596 logger.info(f"Cleaned up inactive user {user_id}") 

1597 

1598 return cleaned_count 

1599 

1600 def _reload_config(self): 

1601 """Reload configuration from settings manager.""" 

1602 if not hasattr(self, "settings_manager") or not self.settings_manager: 

1603 return 

1604 

1605 try: 

1606 old_retention = self.config.get("retention_hours", 48) 

1607 

1608 # Reload all settings 

1609 for key in self.config: 

1610 if key == "enabled": 

1611 continue # Don't change enabled state while running 

1612 

1613 full_key = f"news.scheduler.{key}" 

1614 self.config[key] = self._get_setting(full_key, self.config[key]) 

1615 

1616 # Handle changes that need immediate action 

1617 if old_retention != self.config["retention_hours"]: 

1618 logger.info( 

1619 f"Retention period changed from {old_retention} " 

1620 f"to {self.config['retention_hours']} hours" 

1621 ) 

1622 # Trigger immediate cleanup with new retention 

1623 self.scheduler.add_job( 

1624 self._run_cleanup_with_tracking, 

1625 "date", 

1626 run_date=datetime.now(UTC) + timedelta(seconds=5), 

1627 id="immediate_cleanup_config_change", 

1628 ) 

1629 

1630 # Clear settings cache to pick up any user setting changes 

1631 self.invalidate_all_settings_cache() 

1632 

1633 except Exception: 

1634 logger.exception("Error reloading configuration") 

1635 

1636 def get_status(self) -> Dict[str, Any]: 

1637 """Get scheduler status information.""" 

1638 with self.lock: 

1639 active_users = len(self.user_sessions) 

1640 total_jobs = sum( 

1641 len(session["scheduled_jobs"]) 

1642 for session in self.user_sessions.values() 

1643 ) 

1644 

1645 # Get next run time for cleanup job 

1646 next_cleanup = None 

1647 if self.is_running: 

1648 job = self.scheduler.get_job("cleanup_inactive_users") 

1649 if job: 1649 ↛ 1652line 1649 didn't jump to line 1652 because the condition on line 1649 was always true

1650 next_cleanup = job.next_run_time 

1651 

1652 return { 

1653 "is_running": self.is_running, 

1654 "config": self.config, 

1655 "active_users": active_users, 

1656 "total_scheduled_jobs": total_jobs, 

1657 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None, 

1658 "memory_usage": self._estimate_memory_usage(), 

1659 } 

1660 

1661 def _estimate_memory_usage(self) -> int: 

1662 """Estimate memory usage of user sessions.""" 

1663 

1664 # Rough estimate: username (50) + password (100) + metadata (200) per user 

1665 per_user_estimate = 350 

1666 return len(self.user_sessions) * per_user_estimate 

1667 

1668 def get_user_sessions_summary(self) -> List[Dict[str, Any]]: 

1669 """Get summary of active user sessions (without passwords).""" 

1670 with self.lock: 

1671 summary = [] 

1672 for user_id, session in self.user_sessions.items(): 

1673 summary.append( 

1674 { 

1675 "user_id": user_id, 

1676 "last_activity": session["last_activity"].isoformat(), 

1677 "scheduled_jobs": len(session["scheduled_jobs"]), 

1678 "time_since_activity": str( 

1679 datetime.now(UTC) - session["last_activity"] 

1680 ), 

1681 } 

1682 ) 

1683 return summary 

1684 

1685 

# Singleton instance getter
_scheduler_instance = None
# Guards first-time construction: without it, two threads calling
# get_news_scheduler() concurrently on startup could each see None
# and build two NewsScheduler instances. `threading` is imported at
# the top of this module.
_scheduler_lock = threading.Lock()


def get_news_scheduler() -> NewsScheduler:
    """Get the singleton news scheduler instance.

    Thread-safe: uses double-checked locking so exactly one
    NewsScheduler is ever constructed, even when multiple request
    threads race on the first call. Subsequent calls return the
    cached instance without taking the lock.
    """
    global _scheduler_instance
    if _scheduler_instance is None:
        with _scheduler_lock:
            # Re-check under the lock: another thread may have won the race.
            if _scheduler_instance is None:
                _scheduler_instance = NewsScheduler()
    return _scheduler_instance