Coverage for src/local_deep_research/news/subscription_manager/scheduler.py: 24%

586 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Activity-based news subscription scheduler for per-user encrypted databases. 

3Tracks user activity and temporarily stores credentials for automatic updates. 

4""" 

5 

6 import random

7 import threading

8 from datetime import datetime, timedelta, UTC

9 from typing import Any, Dict, List

10 

11 from loguru import logger

12 from local_deep_research.settings.logger import log_settings

13 

14 from apscheduler.schedulers.background import BackgroundScheduler

15 from apscheduler.jobstores.base import JobLookupError

16 

17 # RAG indexing imports

18 from ...research_library.services.library_rag_service import LibraryRAGService

19 from ...database.library_init import get_default_library_id

20 from ...database.models.library import Document, DocumentCollection

21 

22 

23 SCHEDULER_AVAILABLE = True  # Always available since it's a required dependency

24 

25 

26 class NewsScheduler:

27 """ 

28 Singleton scheduler that manages news subscriptions for active users. 

29 

30 This scheduler: 

31 - Monitors user activity through database access 

32 - Temporarily stores user credentials in memory 

33 - Automatically schedules subscription checks 

34 - Cleans up inactive users after configurable period 

35 """ 

36 

37 _instance = None 

38 _lock = threading.Lock() 

39 

40 def __new__(cls): 

41 """Ensure singleton instance.""" 

42 if cls._instance is None:  # 42 ↛ 46: line 42 didn't jump to line 46 because the condition on line 42 was always true

43 with cls._lock: 

44 if cls._instance is None:  # 44 ↛ 46: line 44 didn't jump to line 46

45 cls._instance = super().__new__(cls) 

46 return cls._instance 

47 

48 def __init__(self): 

49 """Initialize the scheduler (only runs once due to singleton).""" 

50 # Skip if already initialized 

51 if hasattr(self, "_initialized"):  # 51 ↛ 52: line 51 didn't jump to line 52 because the condition on line 51 was never true

52 return 

53 

54 # User session tracking 

55 self.user_sessions = {} # user_id -> {password, last_activity, scheduled_jobs} 

56 self.lock = threading.Lock() 

57 

58 # Scheduler instance 

59 self.scheduler = BackgroundScheduler() 

60 

61 # Configuration (will be loaded from settings) 

62 self.config = self._load_default_config() 

63 

64 # State 

65 self.is_running = False 

66 

67 self._initialized = True 

68 logger.info("News scheduler initialized") 

69 

70 def _load_default_config(self) -> Dict[str, Any]: 

71 """Load default configuration (will be overridden by settings manager).""" 

72 return { 

73 "enabled": True, 

74 "retention_hours": 48, 

75 "cleanup_interval_hours": 1, 

76 "max_jitter_seconds": 300, 

77 "max_concurrent_jobs": 10, 

78 "subscription_batch_size": 5, 

79 "activity_check_interval_minutes": 5, 

80 } 

81 

82 def initialize_with_settings(self, settings_manager): 

83 """Initialize configuration from settings manager.""" 

84 try: 

85 # Load all scheduler settings 

86 self.settings_manager = settings_manager 

87 self.config = { 

88 "enabled": self._get_setting("news.scheduler.enabled", True), 

89 "retention_hours": self._get_setting( 

90 "news.scheduler.retention_hours", 48 

91 ), 

92 "cleanup_interval_hours": self._get_setting( 

93 "news.scheduler.cleanup_interval_hours", 1 

94 ), 

95 "max_jitter_seconds": self._get_setting( 

96 "news.scheduler.max_jitter_seconds", 300 

97 ), 

98 "max_concurrent_jobs": self._get_setting( 

99 "news.scheduler.max_concurrent_jobs", 10 

100 ), 

101 "subscription_batch_size": self._get_setting( 

102 "news.scheduler.batch_size", 5 

103 ), 

104 "activity_check_interval_minutes": self._get_setting( 

105 "news.scheduler.activity_check_interval", 5 

106 ), 

107 } 

108 log_settings(self.config, "Scheduler configuration loaded") 

109 except Exception: 

110 logger.exception("Error loading scheduler settings") 

111 # Keep default config 

112 

113 def _get_setting(self, key: str, default: Any) -> Any: 

114 """Get setting with fallback to default.""" 

115 if hasattr(self, "settings_manager") and self.settings_manager:  # 115 ↛ 117: line 115 didn't jump to line 117 because the condition on line 115 was always true

116 return self.settings_manager.get_setting(key, default=default) 

117 return default 

118 

119 def start(self): 

120 """Start the scheduler.""" 

121 if not self.config.get("enabled", True):  # 121 ↛ 122: line 121 didn't jump to line 122 because the condition on line 121 was never true

122 logger.info("News scheduler is disabled in settings") 

123 return 

124 

125 if self.is_running: 

126 logger.warning("Scheduler is already running") 

127 return 

128 

129 # Schedule cleanup job 

130 self.scheduler.add_job( 

131 self._run_cleanup_with_tracking, 

132 "interval", 

133 hours=self.config["cleanup_interval_hours"], 

134 id="cleanup_inactive_users", 

135 name="Cleanup Inactive User Sessions", 

136 jitter=60, # Add some jitter to cleanup 

137 ) 

138 

139 # Schedule configuration reload 

140 self.scheduler.add_job( 

141 self._reload_config, 

142 "interval", 

143 minutes=30, 

144 id="reload_config", 

145 name="Reload Configuration", 

146 ) 

147 

148 # Start the scheduler 

149 self.scheduler.start() 

150 self.is_running = True 

151 

152 # Schedule initial cleanup after a delay 

153 self.scheduler.add_job( 

154 self._run_cleanup_with_tracking, 

155 "date", 

156 run_date=datetime.now(UTC) + timedelta(seconds=30), 

157 id="initial_cleanup", 

158 ) 

159 

160 logger.info("News scheduler started") 

161 

162 def stop(self): 

163 """Stop the scheduler.""" 

164 if self.is_running: 

165 self.scheduler.shutdown(wait=True) 

166 self.is_running = False 

167 

168 # Clear all user sessions 

169 with self.lock: 

170 self.user_sessions.clear() 

171 

172 logger.info("News scheduler stopped") 

173 

174 def update_user_info(self, username: str, password: str): 

175 """ 

176 Update user info in scheduler. Called on every database interaction. 

177 

178 Args: 

179 username: User's username 

180 password: User's password 

181 """ 

182 logger.info( 

183 f"[SCHEDULER] update_user_info called for {username}, is_running={self.is_running}, active_users={len(self.user_sessions)}" 

184 ) 

185 logger.debug( 

186 f"[SCHEDULER] Current active users: {list(self.user_sessions.keys())}" 

187 ) 

188 

189 if not self.is_running:  # 189 ↛ 190: line 189 didn't jump to line 190 because the condition on line 189 was never true

190 logger.warning( 

191 f"[SCHEDULER] Scheduler not running, cannot update user {username}" 

192 ) 

193 return 

194 

195 with self.lock: 

196 now = datetime.now(UTC) 

197 

198 if username not in self.user_sessions: 

199 # New user - create session info 

200 logger.info(f"[SCHEDULER] New user in scheduler: {username}") 

201 self.user_sessions[username] = { 

202 "password": password, 

203 "last_activity": now, 

204 "scheduled_jobs": set(), 

205 } 

206 logger.debug( 

207 f"[SCHEDULER] Created session for {username}, scheduling subscriptions" 

208 ) 

209 # Schedule their subscriptions 

210 self._schedule_user_subscriptions(username) 

211 else: 

212 # Existing user - update info 

213 logger.info( 

214 f"[SCHEDULER] Updating existing user {username} activity, will reschedule" 

215 ) 

216 old_activity = self.user_sessions[username]["last_activity"] 

217 activity_delta = now - old_activity 

218 logger.debug( 

219 f"[SCHEDULER] User {username} last activity: {old_activity}, delta: {activity_delta}" 

220 ) 

221 

222 self.user_sessions[username]["password"] = password 

223 self.user_sessions[username]["last_activity"] = now 

224 logger.debug( 

225 f"[SCHEDULER] Updated {username} session info, scheduling subscriptions" 

226 ) 

227 # Reschedule their subscriptions in case they changed 

228 self._schedule_user_subscriptions(username) 

229 

230 def unregister_user(self, username: str): 

231 """ 

232 Unregister a user and clean up their scheduled jobs. 

233 Called when user logs out. 

234 """ 

235 with self.lock: 

236 if username in self.user_sessions: 

237 logger.info(f"Unregistering user {username}") 

238 

239 # Remove all scheduled jobs for this user 

240 session_info = self.user_sessions[username] 

241 for job_id in session_info["scheduled_jobs"].copy(): 

242 try: 

243 self.scheduler.remove_job(job_id) 

244 except JobLookupError: 

245 pass 

246 

247 # Remove user session 

248 del self.user_sessions[username] 

249 logger.info(f"User {username} unregistered successfully") 

250 

251 def _schedule_user_subscriptions(self, username: str): 

252 """Schedule all active subscriptions for a user.""" 

253 logger.info(f"_schedule_user_subscriptions called for {username}") 

254 try: 

255 session_info = self.user_sessions.get(username) 

256 if not session_info:  # 256 ↛ 257: line 256 didn't jump to line 257 because the condition on line 256 was never true

257 logger.warning(f"No session info found for {username}") 

258 return 

259 

260 password = session_info["password"] 

261 logger.debug( 

262 f"Got password for {username}, length: {len(password) if password else 0}" 

263 ) 

264 

265 # Get user's subscriptions from their encrypted database 

266 from ...database.session_context import get_user_db_session 

267 from ...database.models.news import NewsSubscription 

268 

269 with get_user_db_session(username, password) as db: 

270 subscriptions = ( 

271 db.query(NewsSubscription).filter_by(is_active=True).all() 

272 ) 

273 logger.debug( 

274 f"Query executed, found {len(subscriptions)} results" 

275 ) 

276 

277 # Log details of each subscription 

278 for sub in subscriptions:  # 278 ↛ 279: line 278 didn't jump to line 279 because the loop on line 278 never started

279 logger.debug( 

280 f"Subscription {sub.id}: name='{sub.name}', is_active={sub.is_active}, status='{sub.status}', refresh_interval={sub.refresh_interval_minutes} minutes" 

281 ) 

282 

283 logger.info( 

284 f"Found {len(subscriptions)} active subscriptions for {username}" 

285 ) 

286 

287 # Clear old jobs for this user 

288 for job_id in session_info["scheduled_jobs"].copy(): 

289 try: 

290 self.scheduler.remove_job(job_id) 

291 session_info["scheduled_jobs"].remove(job_id) 

292 except JobLookupError: 

293 pass 

294 

295 # Schedule each subscription with jitter 

296 for sub in subscriptions:  # 296 ↛ 297: line 296 didn't jump to line 297 because the loop on line 296 never started

297 job_id = f"{username}_{sub.id}" 

298 

299 # Calculate jitter 

300 max_jitter = int(self.config.get("max_jitter_seconds", 300)) 

301 jitter = random.randint(0, max_jitter) 
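
# Editor's note: with the default max_jitter_seconds=300 from
# _load_default_config, each check fires up to 5 minutes after its nominal
# time, spreading out subscriptions that share the same refresh interval.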

302 

303 # Determine trigger based on frequency 

304 refresh_minutes = sub.refresh_interval_minutes 

305 

306 if refresh_minutes <= 60: # 60 minutes or less 

307 # For hourly or more frequent, use interval trigger 

308 trigger = "interval" 

309 trigger_args = { 

310 "minutes": refresh_minutes, 

311 "jitter": jitter, 

312 "start_date": datetime.now(UTC), # Start immediately 

313 } 

314 else: 

315 # For less frequent, calculate next run time 

316 now = datetime.now(UTC) 

317 if sub.next_refresh: 

318 # Ensure timezone-aware (UTC assumed) so it compares with the aware "now"

319 next_refresh_aware = sub.next_refresh.replace(

320 tzinfo=UTC

321 )

322 if next_refresh_aware <= now: 

323 # Subscription is overdue - run it immediately with small jitter 

324 logger.info( 

325 f"Subscription {sub.id} is overdue, scheduling immediate run" 

326 ) 

327 next_run = now + timedelta(seconds=jitter) 

328 else: 

329 next_run = next_refresh_aware 

330 else: 

331 next_run = now + timedelta( 

332 minutes=refresh_minutes, seconds=jitter 

333 ) 

334 

335 trigger = "date" 

336 trigger_args = {"run_date": next_run} 

337 

338 # Add the job 

339 self.scheduler.add_job( 

340 func=self._check_subscription, 

341 args=[username, sub.id], 

342 trigger=trigger, 

343 id=job_id, 

344 name=f"Check {sub.name or sub.query_or_topic[:30]}", 

345 replace_existing=True, 

346 **trigger_args, 

347 ) 

348 

349 session_info["scheduled_jobs"].add(job_id) 

350 logger.info(f"Scheduled job {job_id} with {trigger} trigger") 

351 

352 except Exception as e: 

353 logger.exception( 

354 f"Error scheduling subscriptions for {username}: {e}" 

355 ) 

356 

357 # Add document processing for this user 

358 self._schedule_document_processing(username) 

359 

360 def _schedule_document_processing(self, username: str): 

361 """Schedule document processing for a user.""" 

362 logger.info( 

363 f"[DOC_SCHEDULER] Scheduling document processing for {username}" 

364 ) 

365 logger.debug( 

366 f"[DOC_SCHEDULER] Current user sessions: {list(self.user_sessions.keys())}" 

367 ) 

368 

369 try: 

370 session_info = self.user_sessions.get(username) 

371 if not session_info:  # 371 ↛ 372: line 371 didn't jump to line 372 because the condition on line 371 was never true

372 logger.warning( 

373 f"[DOC_SCHEDULER] No session info found for {username}" 

374 ) 

375 logger.debug( 

376 f"[DOC_SCHEDULER] Available sessions: {list(self.user_sessions.keys())}" 

377 ) 

378 return 

379 

380 password = session_info["password"] 

381 logger.debug( 

382 f"[DOC_SCHEDULER] Retrieved password for {username}, scheduler running: {self.is_running}" 

383 ) 

384 

385 # Get user's document scheduler settings from their database 

386 from ...database.session_context import get_user_db_session 

387 from ...settings.manager import SettingsManager 

388 

389 logger.debug( 

390 f"[DOC_SCHEDULER] Connecting to database for {username} settings" 

391 ) 

392 with get_user_db_session(username, password) as db: 

393 settings_manager = SettingsManager(db) 

394 logger.debug( 

395 "[DOC_SCHEDULER] Connected to database, retrieving settings" 

396 ) 

397 

398 # Check if document processing is enabled for this user 

399 enabled = settings_manager.get_setting( 

400 "document_scheduler.enabled", True 

401 ) 

402 logger.info( 

403 f"[DOC_SCHEDULER] Document scheduler enabled for {username}: {enabled}" 

404 ) 

405 

406 if not enabled:  # 406 ↛ 407: line 406 didn't jump to line 407 because the condition on line 406 was never true

407 logger.info( 

408 f"[DOC_SCHEDULER] Document scheduler disabled for user {username}" 

409 ) 

410 return 

411 

412 # Get processing settings 

413 interval_seconds = settings_manager.get_setting( 

414 "document_scheduler.interval_seconds", 1800 

415 ) 

416 download_pdfs = settings_manager.get_setting( 

417 "document_scheduler.download_pdfs", False 

418 ) 

419 extract_text = settings_manager.get_setting( 

420 "document_scheduler.extract_text", True 

421 ) 

422 generate_rag = settings_manager.get_setting( 

423 "document_scheduler.generate_rag", False 

424 ) 

425 

426 logger.info( 

427 f"[DOC_SCHEDULER] User {username} document settings: enabled={enabled}, interval={interval_seconds}s, " 

428 f"pdfs={download_pdfs}, text={extract_text}, rag={generate_rag}" 

429 ) 

430 

431 # Schedule document processing job 

432 job_id = f"{username}_document_processing" 

433 logger.debug(f"[DOC_SCHEDULER] Preparing to schedule job {job_id}") 

434 

435 # Remove existing document job if any 

436 try: 

437 self.scheduler.remove_job(job_id) 

438 session_info["scheduled_jobs"].discard(job_id) 

439 logger.debug(f"[DOC_SCHEDULER] Removed existing job {job_id}") 

440 except JobLookupError: 

441 logger.debug( 

442 f"[DOC_SCHEDULER] No existing job {job_id} to remove" 

443 ) 

444 pass # Job doesn't exist, that's fine 

445 

446 # Add new document processing job 

447 logger.debug( 

448 f"[DOC_SCHEDULER] Adding new document processing job with interval {interval_seconds}s" 

449 ) 

450 self.scheduler.add_job( 

451 func=self._process_user_documents, 

452 args=[username], 

453 trigger="interval", 

454 seconds=interval_seconds, 

455 id=job_id, 

456 name=f"Process Documents for {username}", 

457 jitter=30, # Add small jitter to prevent multiple users from processing simultaneously 

458 max_instances=1, # Prevent overlapping document processing for same user 

459 replace_existing=True, 

460 ) 

461 

462 session_info["scheduled_jobs"].add(job_id) 

463 logger.info( 

464 f"[DOC_SCHEDULER] Scheduled document processing job {job_id} for {username} with {interval_seconds}s interval" 

465 ) 

466 logger.debug( 

467 f"[DOC_SCHEDULER] User {username} now has {len(session_info['scheduled_jobs'])} scheduled jobs: {list(session_info['scheduled_jobs'])}" 

468 ) 

469 

470 # Verify job was added 

471 job = self.scheduler.get_job(job_id) 

472 if job:  # 472 ↛ 477: line 472 didn't jump to line 477 because the condition on line 472 was always true

473 logger.info( 

474 f"[DOC_SCHEDULER] Successfully verified job {job_id} exists, next run: {job.next_run_time}" 

475 ) 

476 else: 

477 logger.error( 

478 f"[DOC_SCHEDULER] Failed to verify job {job_id} exists!" 

479 ) 

480 

481 except Exception as e: 

482 logger.exception( 

483 f"Error scheduling document processing for {username}: {e}" 

484 ) 

485 

486 def _process_user_documents(self, username: str): 

487 """Process documents for a user.""" 

488 logger.info(f"[DOC_SCHEDULER] Processing documents for user {username}") 

489 start_time = datetime.now(UTC) 

490 

491 try: 

492 session_info = self.user_sessions.get(username) 

493 if not session_info: 

494 logger.warning( 

495 f"[DOC_SCHEDULER] No session info found for user {username}" 

496 ) 

497 return 

498 

499 password = session_info["password"] 

500 logger.debug( 

501 f"[DOC_SCHEDULER] Starting document processing for {username}" 

502 ) 

503 

504 # Get user's settings from their database 

505 from ...database.session_context import get_user_db_session 

506 from ...database.models.research import ResearchHistory 

507 from ...settings.manager import SettingsManager 

508 

509 logger.debug( 

510 f"[DOC_SCHEDULER] Connecting to database for {username}" 

511 ) 

512 with get_user_db_session(username, password) as db: 

513 settings_manager = SettingsManager(db) 

514 logger.debug( 

515 "[DOC_SCHEDULER] Connected to database for document processing" 

516 ) 

517 

518 # Get processing settings 

519 download_pdfs = settings_manager.get_setting( 

520 "document_scheduler.download_pdfs", False 

521 ) 

522 extract_text = settings_manager.get_setting( 

523 "document_scheduler.extract_text", True 

524 ) 

525 generate_rag = settings_manager.get_setting( 

526 "document_scheduler.generate_rag", False 

527 ) 

528 

529 logger.info( 

530 f"[DOC_SCHEDULER] Processing settings for {username}: pdfs={download_pdfs}, text={extract_text}, rag={generate_rag}" 

531 ) 

532 

533 if not any([download_pdfs, extract_text, generate_rag]): 

534 logger.info( 

535 f"[DOC_SCHEDULER] No processing options enabled for user {username}" 

536 ) 

537 return 

538 

539 # Get completed research sessions since last run 

540 last_run_str = settings_manager.get_setting( 

541 "document_scheduler.last_run", "" 

542 ) 

543 last_run = ( 

544 datetime.fromisoformat(last_run_str) 

545 if last_run_str 

546 else None 

547 ) 

548 

549 logger.info( 

550 f"[DOC_SCHEDULER] Last run for {username}: {last_run}" 

551 ) 

552 

553 # Query for completed research since last run 

554 logger.debug( 

555 f"[DOC_SCHEDULER] Querying for completed research since {last_run}" 

556 ) 

557 query = db.query(ResearchHistory).filter( 

558 ResearchHistory.status == "completed", 

559 ResearchHistory.completed_at.is_not( 

560 None 

561 ), # Ensure completed_at is not null 

562 ) 

563 

564 if last_run: 

565 query = query.filter( 

566 ResearchHistory.completed_at > last_run 

567 ) 

568 

569 # Limit to recent research to prevent overwhelming 

570 query = query.order_by( 

571 ResearchHistory.completed_at.desc() 

572 ).limit(20) 

573 

574 research_sessions = query.all() 

575 logger.debug( 

576 f"[DOC_SCHEDULER] Query executed, found {len(research_sessions)} sessions" 

577 ) 

578 

579 if not research_sessions: 

580 logger.info( 

581 f"[DOC_SCHEDULER] No new completed research sessions found for user {username}" 

582 ) 

583 return 

584 

585 logger.info( 

586 f"[DOC_SCHEDULER] Found {len(research_sessions)} research sessions to process for {username}" 

587 ) 

588 

589 # Log details of each research session 

590 for i, research in enumerate( 

591 research_sessions[:5] 

592 ): # Log first 5 details 

593 title_safe = ( 

594 (research.title[:50] + "...") 

595 if research.title 

596 else "No title" 

597 ) 

598 completed_safe = ( 

599 research.completed_at 

600 if research.completed_at 

601 else "No completion time" 

602 ) 

603 logger.debug( 

604 f"[DOC_SCHEDULER] Session {i + 1}: id={research.id}, title={title_safe}, completed={completed_safe}" 

605 ) 

606 

607 # Handle completed_at which might be a string or datetime 

608 completed_at_obj = None 

609 if research.completed_at: 

610 if isinstance(research.completed_at, str): 

611 try: 

612 completed_at_obj = datetime.fromisoformat( 

613 research.completed_at.replace("Z", "+00:00") 

614 ) 

615 except ValueError:

616 completed_at_obj = None 

617 else: 

618 completed_at_obj = research.completed_at 

619 

620 logger.debug( 

621 f"[DOC_SCHEDULER] - completed_at type: {type(research.completed_at)}" 

622 ) 

623 logger.debug( 

624 f"[DOC_SCHEDULER] - completed_at timezone: {completed_at_obj.tzinfo if completed_at_obj else 'None'}" 

625 ) 

626 logger.debug(f"[DOC_SCHEDULER] - last_run: {last_run}") 

627 logger.debug( 

628 f"[DOC_SCHEDULER] - completed_at > last_run: {completed_at_obj > last_run if last_run and completed_at_obj else 'N/A'}" 

629 ) 

630 

631 processed_count = 0 

632 for research in research_sessions: 

633 try: 

634 logger.info( 

635 f"[DOC_SCHEDULER] Processing research {research.id} for user {username}" 

636 ) 

637 

638 # Call actual processing APIs 

639 if download_pdfs: 

640 logger.info( 

641 f"[DOC_SCHEDULER] Downloading PDFs for research {research.id}" 

642 ) 

643 try: 

644 # Use the DownloadService to queue PDF downloads 

645 from ...research_library.services.download_service import ( 

646 DownloadService, 

647 ) 

648 

649 download_service = DownloadService( 

650 username=username, password=password 

651 ) 

652 queued_count = ( 

653 download_service.queue_research_downloads( 

654 research.id 

655 ) 

656 ) 

657 logger.info( 

658 f"[DOC_SCHEDULER] Queued {queued_count} PDF downloads for research {research.id}" 

659 ) 

660 except Exception as e: 

661 logger.exception( 

662 f"[DOC_SCHEDULER] Failed to download PDFs for research {research.id}: {e}" 

663 ) 

664 

665 if extract_text: 

666 logger.info( 

667 f"[DOC_SCHEDULER] Extracting text for research {research.id}" 

668 ) 

669 try: 

670 # Use the DownloadService to extract text for all resources 

671 from ...research_library.services.download_service import ( 

672 DownloadService, 

673 ) 

674 from ...database.models.research import ( 

675 ResearchResource, 

676 ) 

677 

678 download_service = DownloadService( 

679 username=username, password=password 

680 ) 

681 

682 # Get all resources for this research (reuse existing db session) 

683 resources = ( 

684 db.query(ResearchResource) 

685 .filter_by(research_id=research.id) 

686 .all() 

687 ) 

688 extracted_count = 0  # distinct name: don't clobber the per-session processed_count

689 for resource in resources: 

690 # The DownloadService creates its own database sessions,

691 # so it needs the user's password (passed to its constructor above)

693 success, error = ( 

694 download_service.download_as_text( 

695 resource.id 

696 ) 

697 ) 

698 if success: 

699 extracted_count += 1

700 logger.info( 

701 f"[DOC_SCHEDULER] Successfully extracted text for resource {resource.id}" 

702 ) 

703 else: 

704 logger.warning( 

705 f"[DOC_SCHEDULER] Failed to extract text for resource {resource.id}: {error}" 

706 ) 

707 except Exception as resource_error: 

708 logger.exception( 

709 f"[DOC_SCHEDULER] Error processing resource {resource.id}: {resource_error}" 

710 ) 

711 logger.info( 

712 f"[DOC_SCHEDULER] Text extraction completed for research {research.id}: {processed_count}/{len(resources)} resources processed" 

713 ) 

714 except Exception as e: 

715 logger.exception( 

716 f"[DOC_SCHEDULER] Failed to extract text for research {research.id}: {e}" 

717 ) 

718 

719 if generate_rag: 

720 logger.info( 

721 f"[DOC_SCHEDULER] Generating RAG embeddings for research {research.id}" 

722 ) 

723 try: 

724 # Get embedding settings from user configuration 

725 embedding_model = settings_manager.get_setting( 

726 "local_search_embedding_model", 

727 "all-MiniLM-L6-v2", 

728 ) 

729 embedding_provider = ( 

730 settings_manager.get_setting( 

731 "local_search_embedding_provider", 

732 "sentence_transformers", 

733 ) 

734 ) 

735 chunk_size = int( 

736 settings_manager.get_setting( 

737 "local_search_chunk_size", 1000 

738 ) 

739 ) 

740 chunk_overlap = int( 

741 settings_manager.get_setting( 

742 "local_search_chunk_overlap", 200 

743 ) 

744 ) 

745 

746 # Initialize RAG service with user's embedding configuration 

747 rag_service = LibraryRAGService( 

748 username=username, 

749 embedding_model=embedding_model, 

750 embedding_provider=embedding_provider, 

751 chunk_size=chunk_size, 

752 chunk_overlap=chunk_overlap, 

753 ) 

754 

755 # Get default Library collection ID 

756 library_collection_id = get_default_library_id( 

757 username 

758 ) 

759 

760 # Query for unindexed documents from this research session 

761 documents_to_index = ( 

762 db.query(Document.id, Document.title) 

763 .outerjoin( 

764 DocumentCollection, 

765 ( 

766 DocumentCollection.document_id 

767 == Document.id 

768 ) 

769 & ( 

770 DocumentCollection.collection_id 

771 == library_collection_id 

772 ), 

773 ) 

774 .filter( 

775 Document.research_id == research.id, 

776 Document.text_content.isnot(None), 

777 ( 

778 DocumentCollection.indexed.is_( 

779 False 

780 ) 

781 | DocumentCollection.id.is_(None) 

782 ), 

783 ) 

784 .all() 

785 ) 

786 

787 if not documents_to_index: 

788 logger.info( 

789 f"[DOC_SCHEDULER] No unindexed documents found for research {research.id}" 

790 ) 

791 else: 

792 # Index each document 

793 indexed_count = 0 

794 for doc_id, doc_title in documents_to_index: 

795 try: 

796 result = rag_service.index_document( 

797 document_id=doc_id, 

798 collection_id=library_collection_id, 

799 force_reindex=False, 

800 ) 

801 if result["status"] == "success": 

802 indexed_count += 1 

803 logger.info( 

804 f"[DOC_SCHEDULER] Indexed document {doc_id} ({doc_title}) " 

805 f"with {result.get('chunk_count', 0)} chunks" 

806 ) 

807 except Exception as doc_error: 

808 logger.exception( 

809 f"[DOC_SCHEDULER] Failed to index document {doc_id}: {doc_error}" 

810 ) 

811 

812 logger.info( 

813 f"[DOC_SCHEDULER] RAG indexing completed for research {research.id}: " 

814 f"{indexed_count}/{len(documents_to_index)} documents indexed" 

815 ) 

816 except Exception as e: 

817 logger.exception( 

818 f"[DOC_SCHEDULER] Failed to generate RAG embeddings for research {research.id}: {e}" 

819 ) 

820 

821 processed_count += 1 

822 logger.debug( 

823 f"[DOC_SCHEDULER] Successfully queued processing for research {research.id}" 

824 ) 

825 

826 except Exception as e: 

827 logger.exception( 

828 f"[DOC_SCHEDULER] Error processing research {research.id} for user {username}: {e}" 

829 ) 

830 

831 # Update last run time in user's settings 

832 current_time = datetime.now(UTC).isoformat() 

833 settings_manager.set_setting( 

834 "document_scheduler.last_run", current_time, commit=True 

835 ) 

836 logger.debug( 

837 f"[DOC_SCHEDULER] Updated last run time for {username} to {current_time}" 

838 ) 

839 

840 end_time = datetime.now(UTC) 

841 duration = (end_time - start_time).total_seconds() 

842 logger.info( 

843 f"[DOC_SCHEDULER] Completed document processing for user {username}: {processed_count} sessions processed in {duration:.2f}s" 

844 ) 

845 

846 except Exception as e: 

847 logger.exception( 

848 f"[DOC_SCHEDULER] Error processing documents for user {username}: {e}" 

849 ) 

850 

851 def get_document_scheduler_status(self, username: str) -> Dict[str, Any]: 

852 """Get document scheduler status for a specific user.""" 

853 try: 

854 session_info = self.user_sessions.get(username) 

855 if not session_info: 

856 return { 

857 "enabled": False, 

858 "message": "User not found in scheduler", 

859 } 

860 

861 password = session_info["password"] 

862 

863 # Get user's settings 

864 from ...database.session_context import get_user_db_session 

865 from ...settings.manager import SettingsManager 

866 

867 with get_user_db_session(username, password) as db: 

868 settings_manager = SettingsManager(db) 

869 

870 # Get configuration 

871 enabled = settings_manager.get_setting( 

872 "document_scheduler.enabled", True 

873 ) 

874 interval_seconds = settings_manager.get_setting( 

875 "document_scheduler.interval_seconds", 1800 

876 ) 

877 download_pdfs = settings_manager.get_setting( 

878 "document_scheduler.download_pdfs", False 

879 ) 

880 extract_text = settings_manager.get_setting( 

881 "document_scheduler.extract_text", True 

882 ) 

883 generate_rag = settings_manager.get_setting( 

884 "document_scheduler.generate_rag", False 

885 ) 

886 last_run = settings_manager.get_setting( 

887 "document_scheduler.last_run", "" 

888 ) 

889 

890 # Check if user has document processing job 

891 job_id = f"{username}_document_processing" 

892 has_job = job_id in session_info.get("scheduled_jobs", set()) 

893 

894 return { 

895 "enabled": enabled, 

896 "interval_seconds": interval_seconds, 

897 "processing_options": { 

898 "download_pdfs": download_pdfs, 

899 "extract_text": extract_text, 

900 "generate_rag": generate_rag, 

901 }, 

902 "last_run": last_run, 

903 "has_scheduled_job": has_job, 

904 "user_active": username in self.user_sessions, 

905 } 

906 

907 except Exception as e: 

908 logger.exception( 

909 f"Error getting document scheduler status for user {username}" 

910 ) 

911 return { 

912 "enabled": False, 

913 "message": f"Failed to retrieve scheduler status: {type(e).__name__}", 

914 } 

915 

916 def trigger_document_processing(self, username: str) -> bool: 

917 """Trigger immediate document processing for a user.""" 

918 logger.info( 

919 f"[DOC_SCHEDULER] Manual trigger requested for user {username}" 

920 ) 

921 try: 

922 session_info = self.user_sessions.get(username) 

923 if not session_info: 

924 logger.warning( 

925 f"[DOC_SCHEDULER] User {username} not found in scheduler" 

926 ) 

927 logger.debug( 

928 f"[DOC_SCHEDULER] Available users: {list(self.user_sessions.keys())}" 

929 ) 

930 return False 

931 

932 if not self.is_running: 

933 logger.warning( 

934 f"[DOC_SCHEDULER] Scheduler not running, cannot trigger document processing for {username}" 

935 ) 

936 return False 

937 

938 # Trigger immediate processing 

939 job_id = f"{username}_document_processing_manual" 

940 logger.debug(f"[DOC_SCHEDULER] Scheduling manual job {job_id}") 

941 

942 self.scheduler.add_job( 

943 func=self._process_user_documents, 

944 args=[username], 

945 trigger="date", 

946 run_date=datetime.now(UTC) + timedelta(seconds=1), 

947 id=job_id, 

948 name=f"Manual Document Processing for {username}", 

949 replace_existing=True, 

950 ) 

951 

952 # Verify job was added 

953 job = self.scheduler.get_job(job_id) 

954 if job: 

955 logger.info( 

956 f"[DOC_SCHEDULER] Successfully triggered manual document processing for user {username}, job {job_id}, next run: {job.next_run_time}" 

957 ) 

958 else: 

959 logger.error( 

960 f"[DOC_SCHEDULER] Failed to verify manual job {job_id} was added!" 

961 ) 

962 return False 

963 

964 return True 

965 

966 except Exception as e: 

967 logger.exception( 

968 f"[DOC_SCHEDULER] Error triggering document processing for user {username}: {e}" 

969 ) 

970 return False 

971 

972 def _check_user_overdue_subscriptions(self, username: str): 

973 """Check and immediately run any overdue subscriptions for a user.""" 

974 try: 

975 session_info = self.user_sessions.get(username) 

976 if not session_info: 

977 return 

978 

979 password = session_info["password"] 

980 

981 # Get user's overdue subscriptions 

982 from ...database.session_context import get_user_db_session 

983 from ...database.models.news import NewsSubscription 

984 from datetime import timezone 

985 

986 with get_user_db_session(username, password) as db: 

987 now = datetime.now(timezone.utc) 

988 overdue_subs = ( 

989 db.query(NewsSubscription) 

990 .filter( 

991 NewsSubscription.is_active.is_(True), 

992 NewsSubscription.next_refresh.is_not(None), 

993 NewsSubscription.next_refresh <= now, 

994 ) 

995 .all() 

996 ) 

997 

998 if overdue_subs: 

999 logger.info( 

1000 f"Found {len(overdue_subs)} overdue subscriptions for {username}" 

1001 ) 

1002 

1003 for sub in overdue_subs: 

1004 # Run immediately with small random delay 

1005 delay_seconds = random.randint(1, 30) 

1006 job_id = ( 

1007 f"overdue_{username}_{sub.id}_{int(now.timestamp())}" 

1008 ) 

1009 

1010 self.scheduler.add_job( 

1011 func=self._check_subscription, 

1012 args=[username, sub.id], 

1013 trigger="date", 

1014 run_date=now + timedelta(seconds=delay_seconds), 

1015 id=job_id, 

1016 name=f"Overdue: {sub.name or sub.query_or_topic[:30]}", 

1017 replace_existing=True, 

1018 ) 

1019 

1020 logger.info( 

1021 f"Scheduled overdue subscription {sub.id} to run in {delay_seconds} seconds" 

1022 ) 

1023 

1024 except Exception as e: 

1025 logger.exception( 

1026 f"Error checking overdue subscriptions for {username}: {e}" 

1027 ) 

1028 

1029 def _check_subscription(self, username: str, subscription_id: int): 

1030 """Check and refresh a single subscription.""" 

1031 logger.info( 

1032 f"_check_subscription called for user {username}, subscription {subscription_id}" 

1033 ) 

1034 try: 

1035 session_info = self.user_sessions.get(username) 

1036 if not session_info: 

1037 # User no longer active, cancel job 

1038 job_id = f"{username}_{subscription_id}" 

1039 try: 

1040 self.scheduler.remove_job(job_id) 

1041 except JobLookupError: 

1042 pass 

1043 return 

1044 

1045 password = session_info["password"] 

1046 

1047 # Get subscription details 

1048 from ...database.session_context import get_user_db_session 

1049 from ...database.models.news import NewsSubscription 

1050 

1051 with get_user_db_session(username, password) as db: 

1052 sub = db.query(NewsSubscription).get(subscription_id) 

1053 if not sub or not sub.is_active: 

1054 logger.info( 

1055 f"Subscription {subscription_id} not active, skipping" 

1056 ) 

1057 return 

1058 

1059 # Prepare query with date replacement using user's timezone 

1060 query = sub.query_or_topic 

1061 if "YYYY-MM-DD" in query: 

1062 from ..core.utils import get_local_date_string 

1063 from ...settings.manager import SettingsManager 

1064 

1065 settings_manager = SettingsManager(db) 

1066 local_date = get_local_date_string(settings_manager) 

1067 query = query.replace("YYYY-MM-DD", local_date) 
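
# Editor's illustration: a stored query such as "top AI stories for
# YYYY-MM-DD" (hypothetical) would become "top AI stories for 2026-01-11"
# when the user's local date is 2026-01-11.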

1068 

1069 # Update last/next refresh times 

1070 sub.last_refresh = datetime.now(UTC) 

1071 sub.next_refresh = datetime.now(UTC) + timedelta( 

1072 minutes=sub.refresh_interval_minutes 

1073 ) 

1074 db.commit() 

1075 

1076 subscription_data = { 

1077 "id": sub.id, 

1078 "name": sub.name, 

1079 "query": query, 

1080 "original_query": sub.query_or_topic, 

1081 "model_provider": sub.model_provider, 

1082 "model": sub.model, 

1083 "search_strategy": sub.search_strategy, 

1084 "search_engine": sub.search_engine, 

1085 } 

1086 

1087 logger.info( 

1088 f"Refreshing subscription {subscription_id}: {subscription_data['name']}" 

1089 ) 

1090 

1091 # Trigger research synchronously using requests with proper auth 

1092 self._trigger_subscription_research_sync( 

1093 username, subscription_data 

1094 ) 

1095 

1096 # Reschedule for next interval if using interval trigger 

1097 job_id = f"{username}_{subscription_id}" 

1098 job = self.scheduler.get_job(job_id) 

1099 if job and job.trigger.__class__.__name__ == "DateTrigger": 

1100 # For date triggers, reschedule 

1101 next_run = datetime.now(UTC) + timedelta( 

1102 minutes=sub.refresh_interval_minutes, 

1103 seconds=random.randint( 

1104 0, int(self.config.get("max_jitter_seconds", 300)) 

1105 ), 

1106 ) 

1107 self.scheduler.add_job( 

1108 func=self._check_subscription, 

1109 args=[username, subscription_id], 

1110 trigger="date", 

1111 run_date=next_run, 

1112 id=job_id, 

1113 replace_existing=True, 

1114 ) 

1115 

1116 except Exception as e: 

1117 logger.exception( 

1118 f"Error checking subscription {subscription_id}: {e}" 

1119 ) 

1120 

1121 def _trigger_subscription_research_sync( 

1122 self, username: str, subscription: Dict[str, Any] 

1123 ): 

1124 """Trigger research for a subscription using programmatic API.""" 

1125 try: 

1126 # Get user's password from session info 

1127 session_info = self.user_sessions.get(username) 

1128 if not session_info: 

1129 logger.error(f"No session info for user {username}") 

1130 return 

1131 

1132 password = session_info["password"] 

1133 

1134 # Generate research ID 

1135 import uuid 

1136 

1137 research_id = str(uuid.uuid4()) 

1138 

1139 logger.info( 

1140 f"Starting research {research_id} for subscription {subscription['id']}" 

1141 ) 

1142 

1143 # Get user settings for research 

1144 from ...database.session_context import get_user_db_session 

1145 from ...settings.manager import SettingsManager 

1146 

1147 with get_user_db_session(username, password) as db: 

1148 settings_manager = SettingsManager(db) 

1149 settings_snapshot = settings_manager.get_settings_snapshot() 

1150 

1151 # Use the search engine from the subscription if specified 

1152 search_engine = subscription.get("search_engine") 

1153 

1154 if search_engine: 

1155 settings_snapshot["search.tool"] = { 

1156 "value": search_engine, 

1157 "ui_element": "select", 

1158 } 

1159 logger.info( 

1160 f"Using subscription's search engine: '{search_engine}' for {subscription['id']}" 

1161 ) 

1162 else: 

1163 # Use the user's default search tool from their settings 

1164 default_search_tool = settings_snapshot.get( 

1165 "search.tool", "auto" 

1166 ) 

1167 logger.info( 

1168 f"Using user's default search tool: '{default_search_tool}' for {subscription['id']}" 

1169 ) 

1170 

1171 logger.debug( 

1172 f"Settings snapshot has {len(settings_snapshot)} settings" 

1173 ) 

1174 # Log a few key settings to verify they're present 

1175 logger.debug( 

1176 f"Key settings: llm.model={settings_snapshot.get('llm.model')}, llm.provider={settings_snapshot.get('llm.provider')}, search.tool={settings_snapshot.get('search.tool')}" 

1177 ) 

1178 

1179 # Set up research parameters 

1180 query = subscription["query"] 

1181 

1182 # Build metadata for news search 

1183 metadata = { 

1184 "is_news_search": True, 

1185 "search_type": "news_analysis", 

1186 "display_in": "news_feed", 

1187 "subscription_id": subscription["id"], 

1188 "triggered_by": "scheduler", 

1189 "subscription_name": subscription["name"], 

1190 "title": subscription["name"] if subscription["name"] else None, 

1191 "scheduled_at": datetime.now(UTC).isoformat(), 

1192 "original_query": subscription["original_query"], 

1193 "user_id": username, 

1194 } 

1195 

1196 # Use programmatic API with settings context 

1197 from ...api.research_functions import quick_summary 

1198 from ...config.thread_settings import set_settings_context 

1199 

1200 # Create and set settings context for this thread 

1201 class SettingsContext: 

1202 def __init__(self, snapshot): 

1203 self.snapshot = snapshot or {} 

1204 self.values = {} 

1205 for key, setting in self.snapshot.items(): 

1206 if isinstance(setting, dict) and "value" in setting: 

1207 self.values[key] = setting["value"] 

1208 else: 

1209 self.values[key] = setting 

1210 

1211 def get_setting(self, key, default=None): 

1212 """Get setting from snapshot only""" 

1213 return self.values.get(key, default) 

1214 

1215 # Set the context for this thread 

1216 settings_context = SettingsContext(settings_snapshot) 

1217 set_settings_context(settings_context) 

1218 

1219 # Get search strategy from subscription data (for the API call) 

1220 search_strategy = subscription.get( 

1221 "search_strategy", "news_aggregation" 

1222 ) 

1223 

1224 # Call quick_summary with appropriate parameters 

1225 result = quick_summary( 

1226 query=query, 

1227 research_id=research_id, 

1228 username=username, 

1229 user_password=password, 

1230 settings_snapshot=settings_snapshot, 

1231 search_strategy=search_strategy, 

1232 model_name=subscription.get("model"), 

1233 provider=subscription.get("model_provider"), 

1234 iterations=1, # Single iteration for news 

1235 metadata=metadata, 

1236 search_original_query=False, # Don't send long subscription prompts to search engines 

1237 ) 

1238 

1239 logger.info( 

1240 f"Completed research {research_id} for subscription {subscription['id']}" 

1241 ) 

1242 

1243 # Store the research result in the database 

1244 self._store_research_result( 

1245 username, 

1246 password, 

1247 research_id, 

1248 subscription["id"], 

1249 result, 

1250 subscription, 

1251 ) 

1252 

1253 except Exception as e: 

1254 logger.exception( 

1255 f"Error triggering research for subscription {subscription['id']}: {e}" 

1256 ) 

1257 

1258 def _store_research_result( 

1259 self, 

1260 username: str, 

1261 password: str, 

1262 research_id: str, 

1263 subscription_id: int, 

1264 result: Dict[str, Any], 

1265 subscription: Dict[str, Any], 

1266 ): 

1267 """Store research result in database for news display.""" 

1268 try: 

1269 from ...database.session_context import get_user_db_session 

1270 from ...database.models import ResearchHistory 

1271 from ...settings.manager import SettingsManager 

1272 import json 

1273 

1274 # Convert result to JSON-serializable format 

1275 def make_serializable(obj): 

1276 """Convert non-serializable objects to dictionaries.""" 

1277 if hasattr(obj, "dict"): 

1278 return obj.dict() 

1279 elif hasattr(obj, "__dict__"): 

1280 return { 

1281 k: make_serializable(v) 

1282 for k, v in obj.__dict__.items() 

1283 if not k.startswith("_") 

1284 } 

1285 elif isinstance(obj, (list, tuple)): 

1286 return [make_serializable(item) for item in obj] 

1287 elif isinstance(obj, dict): 

1288 return {k: make_serializable(v) for k, v in obj.items()} 

1289 else: 

1290 return obj 

1291 

1292 serializable_result = make_serializable(result) 

1293 

1294 with get_user_db_session(username, password) as db: 

1295 # Get user settings to store in metadata 

1296 settings_manager = SettingsManager(db) 

1297 settings_snapshot = settings_manager.get_settings_snapshot() 

1298 

1299 # Get the report content - check both 'report' and 'summary' fields 

1300 report_content = serializable_result.get( 

1301 "report" 

1302 ) or serializable_result.get("summary") 

1303 logger.debug( 

1304 f"Report content length: {len(report_content) if report_content else 0} chars" 

1305 ) 

1306 

1307 # Extract sources/links from the result 

1308 sources = serializable_result.get("sources", []) 

1309 

1310 # First add the sources/references section if we have sources 

1311 if report_content and sources: 

1312 # Import utilities for formatting links 

1313 from ...utilities.search_utilities import ( 

1314 format_links_to_markdown, 

1315 ) 

1316 

1317 # Format the links/citations 

1318 formatted_links = format_links_to_markdown(sources) 

1319 

1320 # Add references section to the report 

1321 if formatted_links: 

1322 report_content = f"{report_content}\n\n## Sources\n\n{formatted_links}" 

1323 

1324 # Then format citations in the report content 

1325 if report_content: 

1326 # Import citation formatter 

1327 from ...text_optimization.citation_formatter import ( 

1328 CitationFormatter, 

1329 CitationMode, 

1330 ) 

1331 from ...config.search_config import ( 

1332 get_setting_from_snapshot, 

1333 ) 

1334 

1335 # Get citation format from settings 

1336 citation_format = get_setting_from_snapshot( 

1337 "report.citation_format", "domain_id_hyperlinks" 

1338 ) 

1339 mode_map = { 

1340 "number_hyperlinks": CitationMode.NUMBER_HYPERLINKS, 

1341 "domain_hyperlinks": CitationMode.DOMAIN_HYPERLINKS, 

1342 "domain_id_hyperlinks": CitationMode.DOMAIN_ID_HYPERLINKS, 

1343 "domain_id_always_hyperlinks": CitationMode.DOMAIN_ID_ALWAYS_HYPERLINKS, 

1344 "no_hyperlinks": CitationMode.NO_HYPERLINKS, 

1345 } 

1346 mode = mode_map.get( 

1347 citation_format, CitationMode.DOMAIN_ID_HYPERLINKS 

1348 ) 

1349 formatter = CitationFormatter(mode=mode) 

1350 

1351 # Format citations within the content 

1352 report_content = formatter.format_document(report_content) 

1353 

1354 if not report_content: 

1355 # If neither field exists, use the full result as JSON 

1356 report_content = json.dumps(serializable_result) 

1357 

1358 # Generate headline and topics for news searches 

1359 from ...news.utils.headline_generator import generate_headline 

1360 from ...news.utils.topic_generator import generate_topics 

1361 

1362 query_text = result.get( 

1363 "query", subscription.get("query", "News Update") 

1364 ) 

1365 

1366 # Generate headline from the actual research findings 

1367 logger.info( 

1368 f"Generating headline for subscription {subscription_id}" 

1369 ) 

1370 generated_headline = generate_headline( 

1371 query=query_text, 

1372 findings=report_content, 

1373 max_length=200, # Allow longer headlines for news 

1374 ) 

1375 

1376 # Generate topics from the findings 

1377 logger.info( 

1378 f"Generating topics for subscription {subscription_id}" 

1379 ) 

1380 generated_topics = generate_topics( 

1381 query=query_text, 

1382 findings=report_content, 

1383 category=subscription.get("name", "News"), 

1384 max_topics=6, 

1385 ) 

1386 

1387 logger.info( 

1388 f"Generated headline: {generated_headline}, topics: {generated_topics}" 

1389 ) 

1390 

1391 # Get subscription name for metadata 

1392 subscription_name = subscription.get("name", "") 

1393 

1394 # Use generated headline as title, or fallback 

1395 if generated_headline: 

1396 title = generated_headline 

1397 else: 

1398 if subscription_name: 

1399 title = f"{subscription_name} - {datetime.now(UTC).isoformat(timespec='minutes')}" 

1400 else: 

1401 title = f"{query_text[:60]}... - {datetime.now(UTC).isoformat(timespec='minutes')}" 

1402 

1403 # Create research history entry 

1404 history_entry = ResearchHistory( 

1405 id=research_id, 

1406 query=result.get("query", ""), 

1407 mode="news_subscription", 

1408 status="completed", 

1409 created_at=datetime.now(UTC).isoformat(), 

1410 completed_at=datetime.now(UTC).isoformat(), 

1411 title=title, 

1412 research_meta={ 

1413 "subscription_id": subscription_id, 

1414 "triggered_by": "scheduler", 

1415 "is_news_search": True, 

1416 "username": username, 

1417 "subscription_name": subscription_name, # Store subscription name for display 

1418 "settings_snapshot": settings_snapshot, # Store settings snapshot for later retrieval 

1419 "generated_headline": generated_headline, # Store generated headline for news display 

1420 "generated_topics": generated_topics, # Store topics for categorization 

1421 }, 

1422 ) 

1423 db.add(history_entry) 

1424 db.commit() 

1425 

1426 # Store the report content using storage abstraction 

1427 from ...storage import get_report_storage 

1428 

1429 # Use storage to save the report (report_content already retrieved above) 

1430 storage = get_report_storage(session=db) 

1431 storage.save_report( 

1432 research_id=research_id, 

1433 content=report_content, 

1434 username=username, 

1435 ) 

1436 

1437 logger.info( 

1438 f"Stored research result {research_id} for subscription {subscription_id}" 

1439 ) 

1440 

1441 except Exception: 

1442 logger.exception("Error storing research result") 

1443 

1444 def _run_cleanup_with_tracking(self): 

1445 """Wrapper that tracks cleanup execution.""" 

1446 

1447 try: 

1448 cleaned_count = self._cleanup_inactive_users() 

1449 

1450 logger.info( 

1451 f"Cleanup successful: removed {cleaned_count} inactive users" 

1452 ) 

1453 

1454 except Exception: 

1455 logger.exception("Cleanup job failed") 

1456 

1457 def _cleanup_inactive_users(self) -> int: 

1458 """Remove users inactive for longer than retention period.""" 

1459 retention_hours = self.config.get("retention_hours", 48) 

1460 cutoff = datetime.now(UTC) - timedelta(hours=retention_hours) 

1461 

1462 cleaned_count = 0 

1463 

1464 with self.lock: 

1465 inactive_users = [ 

1466 user_id 

1467 for user_id, session in self.user_sessions.items() 

1468 if session["last_activity"] < cutoff 

1469 ] 

1470 

1471 for user_id in inactive_users:  # 1471 ↛ 1473: line 1471 didn't jump to line 1473 because the loop on line 1471 never started

1472 # Remove all scheduled jobs 

1473 for job_id in self.user_sessions[user_id]["scheduled_jobs"]: 

1474 try: 

1475 self.scheduler.remove_job(job_id) 

1476 except JobLookupError: 

1477 pass 

1478 

1479 # Clear password from memory 

1480 del self.user_sessions[user_id] 

1481 cleaned_count += 1 

1482 logger.info(f"Cleaned up inactive user {user_id}") 

1483 

1484 return cleaned_count 

1485 

1486 def _reload_config(self): 

1487 """Reload configuration from settings manager.""" 

1488 if not hasattr(self, "settings_manager") or not self.settings_manager: 

1489 return 

1490 

1491 try: 

1492 old_retention = self.config.get("retention_hours", 48) 

1493 

1494 # Reload all settings 

1495 for key in self.config: 

1496 if key == "enabled": 

1497 continue # Don't change enabled state while running 

1498 

1499 full_key = f"news.scheduler.{key}" 

1500 self.config[key] = self._get_setting(full_key, self.config[key]) 
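
# Editor's note: initialize_with_settings reads two of these values from
# shorter setting names ("news.scheduler.batch_size" and
# "news.scheduler.activity_check_interval"), so the keys reconstructed here
# for subscription_batch_size and activity_check_interval_minutes will not
# match those settings and those two values fall back to their current values.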

1501 

1502 # Handle changes that need immediate action 

1503 if old_retention != self.config["retention_hours"]: 

1504 logger.info( 

1505 f"Retention period changed from {old_retention} " 

1506 f"to {self.config['retention_hours']} hours" 

1507 ) 

1508 # Trigger immediate cleanup with new retention 

1509 self.scheduler.add_job( 

1510 self._run_cleanup_with_tracking, 

1511 "date", 

1512 run_date=datetime.now(UTC) + timedelta(seconds=5), 

1513 id="immediate_cleanup_config_change", 

1514 ) 

1515 

1516 except Exception: 

1517 logger.exception("Error reloading configuration") 

1518 

1519 def get_status(self) -> Dict[str, Any]: 

1520 """Get scheduler status information.""" 

1521 with self.lock: 

1522 active_users = len(self.user_sessions) 

1523 total_jobs = sum( 

1524 len(session["scheduled_jobs"]) 

1525 for session in self.user_sessions.values() 

1526 ) 

1527 

1528 # Get next run time for cleanup job 

1529 next_cleanup = None 

1530 if self.is_running: 

1531 job = self.scheduler.get_job("cleanup_inactive_users") 

1532 if job: 

1533 next_cleanup = job.next_run_time 

1534 

1535 return { 

1536 "is_running": self.is_running, 

1537 "config": self.config, 

1538 "active_users": active_users, 

1539 "total_scheduled_jobs": total_jobs, 

1540 "next_cleanup": next_cleanup.isoformat() if next_cleanup else None, 

1541 "memory_usage": self._estimate_memory_usage(), 

1542 } 

1543 

1544 def _estimate_memory_usage(self) -> int: 

1545 """Estimate memory usage of user sessions.""" 

1546 

1547 # Rough estimate: username (50) + password (100) + metadata (200) per user 

1548 per_user_estimate = 350 
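
# Editor's worked example: 100 tracked users ≈ 100 * 350 = 35,000 bytes (≈ 35 KB).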

1549 return len(self.user_sessions) * per_user_estimate 

1550 

1551 def get_user_sessions_summary(self) -> List[Dict[str, Any]]: 

1552 """Get summary of active user sessions (without passwords).""" 

1553 with self.lock: 

1554 summary = [] 

1555 for user_id, session in self.user_sessions.items(): 

1556 summary.append( 

1557 { 

1558 "user_id": user_id, 

1559 "last_activity": session["last_activity"].isoformat(), 

1560 "scheduled_jobs": len(session["scheduled_jobs"]), 

1561 "time_since_activity": str( 

1562 datetime.now(UTC) - session["last_activity"] 

1563 ), 

1564 } 

1565 ) 

1566 return summary 

1567 

1568 

1569 # Singleton instance getter

1570 _scheduler_instance = None

1571 

1572 

1573 def get_news_scheduler() -> NewsScheduler:

1574 """Get the singleton news scheduler instance.""" 

1575 global _scheduler_instance 

1576 if _scheduler_instance is None: 

1577 _scheduler_instance = NewsScheduler() 

1578 return _scheduler_instance
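
# Monitoring sketch (editor's illustration, not part of the measured source):
#
#     scheduler = get_news_scheduler()
#     status = scheduler.get_status()
#     print(status["active_users"], status["total_scheduled_jobs"])
#     for session in scheduler.get_user_sessions_summary():
#         print(session["user_id"], session["time_since_activity"])
#     scheduler.trigger_document_processing("alice")  # force an immediate pass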