Coverage for src / local_deep_research / news / flask_api.py: 15%
642 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Flask API endpoints for news system.
3Converted from FastAPI to match LDR's Flask architecture.
4"""
6from typing import Dict, Any
7from flask import Blueprint, request, jsonify
8from loguru import logger
10from . import api
11from .folder_manager import FolderManager
12from ..database.models import SubscriptionFolder
13from ..web.auth.decorators import login_required
14from ..database.session_context import get_user_db_session
15from ..utilities.db_utils import get_settings_manager
16from ..security import safe_post
def safe_error_message(e: Exception, context: str = "") -> str:
    """
    Return a safe error message that doesn't expose internal details.

    The exception itself is written to the log; the caller only receives a
    generic message chosen from the exception's type.

    Args:
        e: The exception
        context: Optional context about what was being attempted

    Returns:
        A generic error message safe for external users
    """
    # Attach the exception object explicitly instead of relying on the
    # exception currently being handled: several callers pass an exception
    # that was constructed but never raised (e.g. Exception(result["error"])),
    # in which case there is no active exception for logger.exception() to
    # pick up and the original error text would not reach the log.
    logger.opt(exception=e).error(f"Error in {context}")

    # Return generic messages based on exception type
    if isinstance(e, ValueError):
        return "Invalid input provided"
    if isinstance(e, KeyError):
        return "Required data missing"
    if isinstance(e, TypeError):
        return "Invalid data format"

    # Generic message for production
    if context:
        return f"An error occurred while {context}"
    return "An error occurred"
45# Create Blueprint - only the /api prefix is set here since the parent blueprint already has /news
46news_api_bp = Blueprint("news_api", __name__, url_prefix="/api")
48# Components are initialized in api.py
def get_user_id():
    """Return the current user's name, or None when no user is available."""
    from ..web.auth.decorators import current_user

    # News features need an authenticated user, so any falsy result from
    # current_user() is normalised to None.
    return current_user() or None
@news_api_bp.route("/feed", methods=["GET"])
@login_required
def get_news_feed() -> Dict[str, Any]:
    """
    Get personalized news feed for the logged-in user.

    Query params:
        limit: Maximum number of cards to return
            (default: the ``news.feed.default_limit`` setting)
        use_cache: Whether to use cached news (default: true)
        strategy: Override default recommendation strategy
        focus: Optional focus area for news
        subscription_id: Optional subscription ID forwarded to the feed API

    Returns:
        The feed as JSON. Responds with 400 when ``limit`` is not an integer
        or when the API error text contains "must be between", and with 500
        for any other failure. Error responses carry an empty ``news_items``.
    """
    try:
        # Get current user (login_required ensures we have one)
        user_id = get_user_id()
        logger.info(f"News feed requested by user: {user_id}")

        # Get query parameters
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A bad configured default is a server-side problem and is left
            # to the generic 500 handler below.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # A malformed client value is a client error, not a 500.
                return jsonify(
                    {"error": "limit must be an integer", "news_items": []}
                ), 400

        use_cache = request.args.get("use_cache", "true").lower() == "true"
        strategy = request.args.get("strategy")
        focus = request.args.get("focus")
        subscription_id = request.args.get("subscription_id")

        logger.info(
            f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}"
        )

        # Call the direct API function (now synchronous)
        result = api.get_news_feed(
            user_id=user_id,
            limit=limit,
            use_cache=use_cache,
            focus=focus,
            search_strategy=strategy,
            subscription_id=subscription_id,
        )

        # Check for errors in result
        if "error" in result and result.get("news_items") == []:
            # Sanitize error message before returning to client
            safe_msg = safe_error_message(
                Exception(result["error"]), context="get_news_feed"
            )
            status_code = 400 if "must be between" in result["error"] else 500
            return jsonify({"error": safe_msg, "news_items": []}), status_code

        # Debug: Log the result before returning
        logger.info(
            f"API returning {len(result.get('news_items', []))} news items"
        )
        if result.get("news_items"):
            logger.info(
                f"First item ID: {result['news_items'][0].get('id', 'NO_ID')}"
            )

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {
                "error": safe_error_message(e, "getting news feed"),
                "news_items": [],
            }
        ), 500
@news_api_bp.route("/subscribe", methods=["POST"])
@login_required
def create_subscription() -> Dict[str, Any]:
    """
    Create a new subscription for user.

    JSON body:
        query: Search query or topic (required)
        subscription_type: "search" or "topic" (default: "search")
        refresh_minutes: Refresh interval in minutes (default: from settings)
        model_provider, model, custom_endpoint: Optional model configuration
        search_strategy: Search strategy (default: "news_aggregation")
        name, folder_id, search_engine, search_iterations,
        questions_per_iteration: Optional extra fields passed through
        is_active: Whether the subscription is active (default: True)

    Returns:
        The API result as JSON. 400 for an unparsable, empty or non-object
        body, a missing ``query`` or a ValueError from the API; 500 otherwise.
    """
    try:
        data = request.get_json(force=True)
    except Exception:
        # Handle invalid JSON
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        # A JSON array or scalar has no .get(); reject it as a client error
        # rather than letting the AttributeError surface as a 500.
        if not isinstance(data, dict):
            return jsonify({"error": "JSON body must be an object"}), 400

        # Validate required fields
        query = data.get("query")
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Call the direct API function; refresh_minutes is left as None when
        # absent so that api.py can apply its own default.
        result = api.create_subscription(
            user_id=get_user_id(),
            query=query,
            subscription_type=data.get("subscription_type", "search"),
            refresh_minutes=data.get("refresh_minutes"),
            model_provider=data.get("model_provider"),
            model=data.get("model"),
            search_strategy=data.get("search_strategy", "news_aggregation"),
            custom_endpoint=data.get("custom_endpoint"),
            name=data.get("name"),
            folder_id=data.get("folder_id"),
            is_active=data.get("is_active", True),
            search_engine=data.get("search_engine"),
            search_iterations=data.get("search_iterations"),
            questions_per_iteration=data.get("questions_per_iteration"),
        )

        return jsonify(result)

    except ValueError as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 500
@news_api_bp.route("/vote", methods=["POST"])
@login_required
def vote_on_news() -> Dict[str, Any]:
    """
    Submit vote on a news item.

    JSON body:
        card_id: ID of the news card
        vote: "up" or "down"

    Returns:
        The API result as JSON. 400 for a missing/invalid body or fields,
        404 when the API raises a ValueError mentioning "not found",
        500 for unexpected errors.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body, so the
        # request is answered with 400 below instead of the parse error being
        # caught by the generic handler and reported as 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()

        card_id = data.get("card_id")
        vote = data.get("vote")

        # Validate
        if not (card_id and vote):
            return jsonify({"error": "card_id and vote are required"}), 400

        # Call the direct API function
        result = api.submit_feedback(
            card_id=card_id, user_id=user_id, vote=vote
        )

        return jsonify(result)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify(
            {"error": safe_error_message(e, "submitting vote")}
        ), 400
    except Exception as e:
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 500
@news_api_bp.route("/feedback/batch", methods=["POST"])
@login_required
def get_batch_feedback() -> Dict[str, Any]:
    """
    Get feedback (votes) for multiple news cards.

    JSON body:
        card_ids: List of card IDs

    Returns:
        The API result as JSON, or ``{"votes": {}}`` when ``card_ids`` is
        empty or absent. 400 for a missing/invalid body, 404 when the API
        raises a ValueError mentioning "not found", 500 otherwise.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body so it
        # is reported as 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        card_ids = data.get("card_ids", [])
        if not card_ids:
            return jsonify({"votes": {}})

        # Get current user
        user_id = get_user_id()

        # Call the direct API function
        result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id)

        return jsonify(result)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "getting votes")}), 400
    except Exception as e:
        # safe_error_message already logs the failure; no extra log call here.
        return jsonify({"error": safe_error_message(e, "getting votes")}), 500
@news_api_bp.route("/feedback/<card_id>", methods=["POST"])
@login_required
def submit_feedback(card_id: str) -> Dict[str, Any]:
    """
    Submit feedback (vote) for a news card.

    Args:
        card_id: ID of the news card, taken from the URL

    JSON body:
        vote: "up" or "down"

    Returns:
        The API result as JSON. 400 for a missing/invalid body or vote,
        404 when the API raises a ValueError mentioning "not found",
        500 for unexpected errors.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body so it
        # is reported as 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()
        vote = data.get("vote")

        # Validate
        if not vote:
            return jsonify({"error": "vote is required"}), 400

        # Call the direct API function
        result = api.submit_feedback(
            card_id=card_id, user_id=user_id, vote=vote
        )

        return jsonify(result)

    except ValueError as e:
        error_msg = str(e).lower()
        if "not found" in error_msg:
            return jsonify({"error": "Resource not found"}), 404
        if "must be" in error_msg:
            return jsonify({"error": "Invalid input value"}), 400
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 500
@news_api_bp.route("/research/<card_id>", methods=["POST"])
@login_required
def research_news_item(card_id: str) -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    Args:
        card_id: ID of the news card, taken from the URL

    JSON body (optional):
        depth: "quick", "detailed", or "report" (default: "quick")

    Returns:
        The API result as JSON, or a 500 response with a generic error.
    """
    try:
        # The body is optional. silent=True makes a missing or non-JSON body
        # come back as None so the default depth applies, instead of the
        # parse error being reported as a 500.
        data = request.get_json(silent=True) or {}
        depth = data.get("depth", "quick")

        # Call the API function which handles the research
        result = api.research_news_item(card_id, depth)

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "researching news item")}
        ), 500
@news_api_bp.route("/subscriptions/current", methods=["GET"])
@login_required
def get_current_user_subscriptions() -> Dict[str, Any]:
    """Get all subscriptions for current user.

    Returns:
        The API result as JSON, or a 500 response with a generic message when
        the API reports an error or raises.
    """
    try:
        # Get current user
        user_id = get_user_id()
        logger.debug(f"Getting subscriptions for user {user_id}")

        # Use the API function
        result = api.get_subscriptions(user_id)
        if "error" in result:
            # Keep the detailed error in the log only; the client gets a
            # generic message.
            logger.error(
                f"Error getting subscriptions for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Failed to retrieve subscriptions"}), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscriptions")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"])
@login_required
def get_subscription(subscription_id: str) -> Dict[str, Any]:
    """Return one subscription as JSON, looked up by its ID.

    Responds with 400 for an empty ID or the literal strings "null" /
    "undefined", 404 when the API returns nothing, and 500 on errors.
    """
    try:
        # Reject placeholder IDs before hitting the API.
        is_placeholder = subscription_id in ("null", "undefined")
        if is_placeholder or not subscription_id:
            return jsonify({"error": "Invalid subscription ID"}), 400

        found = api.get_subscription(subscription_id)
        if found:
            return jsonify(found)

        return jsonify({"error": "Subscription not found"}), 404

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"])
@login_required
def update_subscription(subscription_id: str) -> Dict[str, Any]:
    """Apply the fields present in the JSON body to a subscription.

    Only recognised request fields are forwarded, renamed to their storage
    names. Responds with 400 for an unparsable or empty body, 404 / 400 when
    the API reports an error (404 if its text contains "not found"), and 500
    on unexpected errors.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not payload:
            return jsonify({"error": "No JSON data provided"}), 400

        # Request field name -> storage field name
        request_to_storage = {
            "query": "query_or_topic",
            "name": "name",
            "refresh_minutes": "refresh_interval_minutes",
            "is_active": "is_active",
            "folder_id": "folder_id",
            "model_provider": "model_provider",
            "model": "model",
            "search_strategy": "search_strategy",
            "custom_endpoint": "custom_endpoint",
            "search_engine": "search_engine",
            "search_iterations": "search_iterations",
            "questions_per_iteration": "questions_per_iteration",
        }

        # Keep only the fields the client actually sent.
        changes = {
            storage_key: payload[request_key]
            for request_key, storage_key in request_to_storage.items()
            if request_key in payload
        }

        outcome = api.update_subscription(subscription_id, changes)
        if "error" not in outcome:
            return jsonify(outcome)

        # Swap the raw error for a sanitized one before it reaches the
        # client; the raw text still decides the status code.
        raw_error = outcome["error"]
        outcome["error"] = safe_error_message(
            Exception(raw_error), "updating subscription"
        )
        status_code = 404 if "not found" in raw_error.lower() else 400
        return jsonify(outcome), status_code

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"])
@login_required
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """Delete a subscription by ID.

    Responds with a success payload when the API reports the deletion,
    404 when it does not, and 500 on unexpected errors.
    """
    try:
        deleted = api.delete_subscription(subscription_id)
        if not deleted:
            return jsonify({"error": "Subscription not found"}), 404

        confirmation = {
            "status": "success",
            "message": f"Subscription {subscription_id} deleted",
        }
        return jsonify(confirmation)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "deleting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"])
@login_required
def run_subscription_now(subscription_id: str) -> Dict[str, Any]:
    """Manually trigger a subscription to run now.

    Looks the subscription up, substitutes the ``YYYY-MM-DD`` placeholder in
    its query with the current local date, and posts a research request to
    this server's ``/research/api/start_research`` endpoint.

    Args:
        subscription_id: ID of the subscription, taken from the URL

    Returns:
        A success payload with the research ID and progress URL; 404 when the
        subscription is not found; the upstream status code when the research
        endpoint answers with an HTTP error; 500 otherwise.
    """
    try:
        # Look the subscription up for the authenticated user, as the other
        # endpoints in this module do, rather than a hard-coded user name.
        subscription_data = api.get_subscriptions(get_user_id())

        # Find the specific subscription
        subscription = next(
            (
                sub
                for sub in subscription_data.get("subscriptions", [])
                if sub["id"] == subscription_id
            ),
            None,
        )

        if not subscription:
            return jsonify({"error": "Subscription not found"}), 404

        # Get timezone-aware current date using settings
        from flask import session
        from .core.utils import get_local_date_string
        from ..settings.manager import SettingsManager

        username = session.get("username", "anonymous")
        with get_user_db_session(username) as db:
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

        # Replace YYYY-MM-DD placeholder ONLY (not all dates)
        original_query = subscription["query"]
        query = original_query.replace("YYYY-MM-DD", current_date)

        # Build request data similar to news page
        request_data = {
            "query": query,
            "mode": "quick",
            # Use subscription's model configuration if available
            "model_provider": subscription.get("model_provider", "OLLAMA"),
            "model": subscription.get("model", "llama3"),
            "strategy": subscription.get("search_strategy", "news_aggregation"),
            "metadata": {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription_id,
                "triggered_by": "manual",
                # Original query, placeholder still in place
                "original_query": original_query,
                # Query with the date substituted
                "processed_query": query,
                # The actual date used
                "news_date": current_date,
                "title": subscription.get("name") or None,
            },
        }

        # Add custom endpoint if specified
        if subscription.get("custom_endpoint"):
            request_data["custom_endpoint"] = subscription["custom_endpoint"]

        # Call the main research API endpoint (the one from the research
        # blueprint) on whatever URL this server is responding on.
        base_url = request.host_url.rstrip("/")

        # Forward the caller's session cookie so the internal request carries
        # the same authentication, matching check_overdue_subscriptions.
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            headers["Cookie"] = f"session={session_cookie}"

        response = safe_post(
            f"{base_url}/research/api/start_research",
            json=request_data,
            headers=headers,
            # Bound the wait so a stalled research endpoint cannot hold this
            # request open indefinitely.
            timeout=30,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if not response.ok:
            return jsonify(
                {"error": f"Failed to start research: {response.status_code}"}
            ), response.status_code

        data = response.json()
        if data.get("status") != "success":
            return jsonify(
                {"error": data.get("message", "Failed to start research")}
            ), 500

        return jsonify(
            {
                "status": "success",
                "message": "Research started",
                "research_id": data.get("research_id"),
                "url": f"/progress/{data.get('research_id')}",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "running subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"])
@login_required
def get_subscription_history(subscription_id: str) -> Dict[str, Any]:
    """Get research history for a subscription.

    Args:
        subscription_id: ID of the subscription, taken from the URL

    Query params:
        limit: Maximum number of entries
            (default: the ``news.feed.default_limit`` setting)

    Returns:
        The API result as JSON. 400 when ``limit`` is not an integer; 500 with
        an empty ``history`` when the API reports an error or raises.
    """
    try:
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A bad configured default is a server-side problem and is left
            # to the generic 500 handler below.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # A malformed client value is a client error, not a 500.
                return jsonify(
                    {"error": "limit must be an integer", "history": []}
                ), 400

        result = api.get_subscription_history(subscription_id, limit)
        if "error" in result:
            # Keep the detailed error in the log only.
            logger.error(
                f"Error getting subscription history: {result['error']}"
            )
            return jsonify(
                {
                    "error": "Failed to retrieve subscription history",
                    "history": [],
                }
            ), 500
        return jsonify(result)
    except Exception as e:
        # Same payload shape as the API-error branch above.
        return jsonify(
            {
                "error": safe_error_message(e, "getting subscription history"),
                "history": [],
            }
        ), 500
@news_api_bp.route("/preferences", methods=["POST"])
@login_required
def save_preferences() -> Dict[str, Any]:
    """Save user preferences for news.

    JSON body:
        preferences: Preferences object (default: empty)

    Returns:
        The API result as JSON; 400 for a missing/invalid body; 500 on
        unexpected errors.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body so it
        # is reported as 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()
        preferences = data.get("preferences", {})

        # Call the direct API function
        result = api.save_news_preferences(user_id, preferences)

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "saving preferences")}
        ), 500
@news_api_bp.route("/categories", methods=["GET"])
def get_categories() -> Dict[str, Any]:
    """Return the news category distribution from the news API as JSON.

    Any failure is answered with a generic error and status 500.
    """
    try:
        return jsonify(api.get_news_categories())
    except Exception as e:
        error_body = {"error": safe_error_message(e, "getting categories")}
        return jsonify(error_body), 500
@news_api_bp.route("/scheduler/status", methods=["GET"])
def get_scheduler_status() -> Dict[str, Any]:
    """Report the state of the activity-based news scheduler as JSON.

    The payload holds the running flag, a copy of the scheduler config, the
    number of tracked user sessions, the scheduled-job totals and, when an
    APScheduler instance is attached, up to ten of its jobs.
    """
    try:
        logger.info("Scheduler status endpoint called")
        from .subscription_manager.scheduler import get_news_scheduler

        news_scheduler = get_news_scheduler()
        logger.info(
            f"Scheduler instance obtained: is_running={news_scheduler.is_running}"
        )

        # The status is assembled by hand (the original author notes this
        # avoids a potential deadlock).
        status = {
            "scheduler_available": True,
            "is_running": news_scheduler.is_running,
        }
        if hasattr(news_scheduler, "config"):
            status["config"] = news_scheduler.config.copy()
        else:
            status["config"] = {}

        tracks_sessions = hasattr(news_scheduler, "user_sessions")
        status["active_users"] = (
            len(news_scheduler.user_sessions) if tracks_sessions else 0
        )
        status["total_scheduled_jobs"] = 0

        # Sum the per-user scheduled job sets.
        if tracks_sessions:
            status["total_scheduled_jobs"] = sum(
                len(user_info.get("scheduled_jobs", set()))
                for user_info in news_scheduler.user_sessions.values()
            )

        # Also count the jobs APScheduler itself knows about.
        backend = getattr(news_scheduler, "scheduler", None)
        if backend:
            try:
                backend_jobs = backend.get_jobs()
                status["apscheduler_job_count"] = len(backend_jobs)
                # Only the first 10 jobs are listed for display.
                status["apscheduler_jobs"] = [
                    {
                        "id": job.id,
                        "name": job.name,
                        "next_run": (
                            job.next_run_time.isoformat()
                            if job.next_run_time
                            else None
                        ),
                    }
                    for job in backend_jobs[:10]
                ]
            except Exception:
                logger.exception("Error getting APScheduler jobs")
                status["apscheduler_job_count"] = 0

        logger.info(f"Status built: {list(status.keys())}")

        # The JS client expects a scheduled_jobs field.
        status["scheduled_jobs"] = status.get("total_scheduled_jobs", 0)

        logger.info(
            f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}"
        )
        return jsonify(status)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler status")}
        ), 500
@news_api_bp.route("/scheduler/start", methods=["POST"])
@login_required
def start_scheduler() -> Dict[str, Any]:
    """Start the news subscription scheduler unless it is already running.

    On a fresh start the scheduler is also stored on the Flask app as
    ``news_scheduler``.
    """
    try:
        from flask import current_app
        from .subscription_manager.scheduler import get_news_scheduler

        news_scheduler = get_news_scheduler()

        # Nothing to do when it is already up.
        if news_scheduler.is_running:
            return jsonify({"message": "Scheduler is already running"}), 200

        news_scheduler.start()

        # Keep the app-level reference pointing at the started instance.
        current_app.news_scheduler = news_scheduler

        logger.info("News scheduler started via API")
        payload = {
            "status": "success",
            "message": "Scheduler started",
            "active_users": len(news_scheduler.user_sessions),
        }
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "starting scheduler")}
        ), 500
@news_api_bp.route("/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler() -> Dict[str, Any]:
    """Stop the subscription scheduler.

    Requires a logged-in user, like the matching start endpoint: stopping the
    scheduler changes server state and should not be open to anonymous
    callers.

    Returns:
        A success payload when the scheduler was stopped, a 200 message when
        it was not running, 404 when the app holds no scheduler instance,
        500 on unexpected errors.
    """
    try:
        from flask import current_app

        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"message": "No scheduler instance found"}), 404

        if not scheduler.is_running:
            return jsonify({"message": "Scheduler is not running"}), 200

        scheduler.stop()
        logger.info("News scheduler stopped via API")
        return jsonify({"status": "success", "message": "Scheduler stopped"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "stopping scheduler")}
        ), 500
@news_api_bp.route("/scheduler/check-now", methods=["POST"])
@login_required
def check_subscriptions_now() -> Dict[str, Any]:
    """Manually trigger subscription checking.

    Requires a logged-in user: the handler opens a user database session and
    starts background work, so it should not be open to anonymous callers.

    Returns:
        A success payload with the number of due subscriptions; 503 when the
        scheduler is missing or not running; 500 on unexpected errors.
    """
    try:
        from flask import current_app

        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"error": "Scheduler not initialized"}), 503

        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 503

        # Get count of due subscriptions
        from ..database.models import NewsSubscription as BaseSubscription
        from datetime import datetime, timezone

        with get_user_db_session() as session:
            now = datetime.now(timezone.utc)
            # Due = active and either never scheduled or scheduled for
            # now or earlier.
            count = (
                session.query(BaseSubscription)
                .filter(
                    BaseSubscription.status == "active",
                    (BaseSubscription.next_refresh.is_(None))
                    | (BaseSubscription.next_refresh <= now),
                )
                .count()
            )

        # Trigger the check asynchronously so the request returns at once.
        import threading

        check_thread = threading.Thread(
            target=scheduler._check_subscriptions, daemon=True
        )
        check_thread.start()

        return jsonify(
            {
                "status": "success",
                "message": f"Checking {count} due subscriptions",
                "count": count,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking subscriptions")}
        ), 500
@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"])
@login_required
def trigger_cleanup() -> Dict[str, Any]:
    """Manually trigger cleanup job.

    Schedules the scheduler's cleanup routine as a one-off job one second
    from now.

    Returns:
        A "triggered" payload; 400 when the scheduler is not running;
        500 on unexpected errors.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from datetime import datetime, UTC, timedelta

        scheduler = get_news_scheduler()

        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 400

        # Schedule cleanup to run in 1 second. The job id is fixed, so
        # replace_existing lets a repeated trigger reschedule the pending job
        # instead of failing on the duplicate id.
        scheduler.scheduler.add_job(
            scheduler._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=1),
            id="manual_cleanup_trigger",
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "triggered",
                "message": "Cleanup job will run within seconds",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "triggering cleanup")}
        ), 500
@news_api_bp.route("/scheduler/users", methods=["GET"])
@login_required
def get_active_users() -> Dict[str, Any]:
    """Return the scheduler's user-session summary and its size as JSON."""
    try:
        from .subscription_manager.scheduler import get_news_scheduler

        summary = get_news_scheduler().get_user_sessions_summary()
        payload = {"active_users": len(summary), "users": summary}
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting active users")}
        ), 500
@news_api_bp.route("/scheduler/stats", methods=["GET"])
@login_required
def scheduler_stats() -> Dict[str, Any]:
    """Return debugging details about the news scheduler as JSON.

    Includes the current username, the running flag, a per-user summary of
    tracked sessions and all APScheduler jobs. As a side effect, when the
    current user has a tracked session their subscriptions are rescheduled.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from flask import session

        news_scheduler = get_news_scheduler()
        username = session.get("username")

        debug_info = {
            "current_user": username,
            "scheduler_running": news_scheduler.is_running,
            "user_sessions": {},
            "apscheduler_jobs": [],
        }

        # Summarise each tracked session; only whether a password is stored
        # is reported, never the password itself.
        if hasattr(news_scheduler, "user_sessions"):
            for user, info in news_scheduler.user_sessions.items():
                last_activity = info.get("last_activity")
                debug_info["user_sessions"][user] = {
                    "has_password": bool(info.get("password")),
                    "last_activity": (
                        last_activity.isoformat() if last_activity else None
                    ),
                    "scheduled_jobs_count": len(
                        info.get("scheduled_jobs", set())
                    ),
                }

        # List every job APScheduler knows about.
        backend = getattr(news_scheduler, "scheduler", None)
        if backend:
            debug_info["apscheduler_jobs"] = [
                {
                    "id": job.id,
                    "name": job.name,
                    "next_run": (
                        job.next_run_time.isoformat()
                        if job.next_run_time
                        else None
                    ),
                    "trigger": str(job.trigger),
                }
                for job in backend.get_jobs()
            ]

        # Force a schedule refresh for the requesting user.
        if username and username in news_scheduler.user_sessions:
            logger.info(f"Forcing schedule update for {username}")
            news_scheduler._schedule_user_subscriptions(username)
            debug_info["forced_schedule"] = True

        return jsonify(debug_info)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler stats")}
        ), 500
@news_api_bp.route("/check-overdue", methods=["POST"])
@login_required
def check_overdue_subscriptions():
    """Check and run all overdue subscriptions for the current user.

    For every active subscription whose ``next_refresh`` is now or earlier, a
    research request is posted to this server's
    ``/research/api/start_research`` endpoint. After a successful start the
    subscription's ``last_refresh`` / ``next_refresh`` are updated and
    committed. A failure for one subscription is recorded in the results and
    does not stop the others.

    Returns:
        JSON with ``overdue_found``, ``started`` and a per-subscription
        ``results`` list; 500 on unexpected errors.
    """
    try:
        from flask import session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, UTC, timedelta
        from .core.utils import get_local_date_string
        from ..settings.manager import SettingsManager

        username = session.get("username", "anonymous")

        # Request-level values are the same for every subscription, so they
        # are computed once. request.host_url is the URL the server is
        # actually responding on.
        research_url = (
            f"{request.host_url.rstrip('/')}/research/api/start_research"
        )

        # Forward the caller's session cookie to keep the internal request
        # authenticated; the header is omitted when there is no cookie.
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            headers["Cookie"] = f"session={session_cookie}"

        # Get overdue subscriptions
        overdue_count = 0
        results = []
        with get_user_db_session(username) as db:
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.status == "active",
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            logger.info(
                f"Found {len(overdue_subs)} overdue subscriptions for {username}"
            )

            # Get timezone-aware current date using settings
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

            for sub in overdue_subs:
                try:
                    logger.info(
                        f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}"
                    )

                    # Update any date placeholders with the current date in
                    # the user's timezone
                    query = sub.query_or_topic.replace(
                        "YYYY-MM-DD", current_date
                    )

                    # Build request data (same shape as run_subscription_now)
                    request_data = {
                        "query": query,
                        "mode": "quick",
                        "model_provider": sub.model_provider or "OLLAMA",
                        "model": sub.model or "llama3",
                        "strategy": sub.search_strategy or "news_aggregation",
                        "metadata": {
                            "is_news_search": True,
                            "search_type": "news_analysis",
                            "display_in": "news_feed",
                            "subscription_id": str(sub.id),
                            "triggered_by": "overdue_check",
                            "original_query": sub.query_or_topic,
                            "processed_query": query,
                            "news_date": current_date,
                            "title": sub.name or None,
                        },
                    }

                    # Add optional search parameters
                    if sub.search_engine:
                        request_data["search_engine"] = sub.search_engine
                    if sub.custom_endpoint:
                        request_data["custom_endpoint"] = sub.custom_endpoint

                    response = safe_post(
                        research_url,
                        json=request_data,
                        headers=headers,
                        timeout=30,
                        allow_localhost=True,
                        allow_private_ips=True,
                    )

                    if response.ok:
                        result = response.json()
                    else:
                        # Keep the upstream response body in the log only; the
                        # client just learns the status code.
                        logger.error(
                            f"Research request for subscription {sub.id} failed: "
                            f"HTTP {response.status_code}: {response.text}"
                        )
                        result = {
                            "status": "error",
                            "error": f"HTTP {response.status_code}",
                        }

                    display_name = sub.name or sub.query_or_topic[:50]

                    if result.get("status") == "success":
                        overdue_count += 1

                        # Update subscription's last/next refresh times
                        refreshed_at = datetime.now(UTC)
                        sub.last_refresh = refreshed_at
                        sub.next_refresh = refreshed_at + timedelta(
                            minutes=sub.refresh_interval_minutes
                        )
                        db.commit()

                        results.append(
                            {
                                "id": str(sub.id),
                                "name": display_name,
                                "research_id": result.get("research_id"),
                            }
                        )
                    else:
                        results.append(
                            {
                                "id": str(sub.id),
                                "name": display_name,
                                "error": result.get(
                                    "error", "Failed to start research"
                                ),
                            }
                        )
                except Exception as e:
                    # Record the failure and carry on with the remaining
                    # subscriptions.
                    logger.exception(
                        f"Error running subscription {sub.id}: {e}"
                    )
                    results.append(
                        {
                            "id": str(sub.id),
                            "name": sub.name or sub.query_or_topic[:50],
                            "error": safe_error_message(
                                e, "running subscription"
                            ),
                        }
                    )

        return jsonify(
            {
                "status": "success",
                "overdue_found": len(overdue_subs),
                "started": overdue_count,
                "results": results,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking overdue subscriptions")}
        ), 500
1143# Folder and subscription management routes
@news_api_bp.route("/subscription/folders", methods=["GET"])
@login_required
def get_folders():
    """Return every subscription folder of the current user as a JSON list."""
    try:
        owner = get_user_id()

        with get_user_db_session() as db_session:
            user_folders = FolderManager(db_session).get_user_folders(owner)
            # Serialize while the database session is still open.
            payload = [entry.to_dict() for entry in user_folders]
            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting folders")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/folders", methods=["POST"])
@login_required
def create_folder():
    """Create a new subscription folder.

    Expects a JSON object with a required ``name`` and an optional
    ``description``.

    Returns:
        201 with the new folder's ``to_dict()`` payload on success,
        400 when the body is not a JSON object or lacks a usable name,
        409 when a folder with the same name already exists,
        500 (generic message) on any unexpected error.
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests are answered with 400 rather than falling
        # into the generic 500 handler below.
        data = request.get_json(silent=True)
        name = data.get("name") if isinstance(data, dict) else None
        if not isinstance(name, str) or not name.strip():
            return jsonify({"error": "Folder name is required"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)

            # Check if folder already exists
            existing = (
                session.query(SubscriptionFolder).filter_by(name=name).first()
            )
            if existing:
                return jsonify({"error": "Folder already exists"}), 409

            folder = manager.create_folder(
                name=name,
                description=data.get("description"),
            )

            return jsonify(folder.to_dict()), 201

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "creating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"])
@login_required
def update_folder(folder_id):
    """Update a folder.

    The JSON object in the request body is forwarded as keyword arguments to
    ``FolderManager.update_folder``.

    Args:
        folder_id: Identifier of the folder, taken from the URL.

    Returns:
        200 with the folder's ``to_dict()`` payload, 400 when the body is not
        a JSON object, 404 when the manager reports no such folder, 500
        (generic message) on any unexpected error.
    """
    try:
        # A missing, malformed or non-object body cannot be expanded with **,
        # so reject it as a client error instead of letting it become a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Invalid request body"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)
            folder = manager.update_folder(folder_id, **data)

            if not folder:
                return jsonify({"error": "Folder not found"}), 404

            return jsonify(folder.to_dict())

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "updating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"])
@login_required
def delete_folder(folder_id):
    """Delete the folder named in the URL.

    The optional ``move_to`` query parameter is handed to
    ``FolderManager.delete_folder`` together with the folder id.
    """
    try:
        destination = request.args.get("move_to")

        with get_user_db_session() as db_session:
            removed = FolderManager(db_session).delete_folder(
                folder_id, destination
            )

            if removed:
                return jsonify({"status": "deleted"}), 200

            return jsonify({"error": "Folder not found"}), 404

    except Exception as exc:
        message = safe_error_message(exc, "deleting folder")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"])
@login_required
def get_subscriptions_organized():
    """Return the current user's subscriptions grouped by folder."""
    try:
        owner = get_user_id()

        with get_user_db_session() as db_session:
            grouped = FolderManager(db_session).get_subscriptions_by_folder(
                owner
            )

            # Turn every subscription into its dict form so the mapping can
            # be serialized as JSON.
            payload = {
                folder_key: [entry.to_dict() for entry in members]
                for folder_key, members in grouped.items()
            }

            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting organized subscriptions")
        return jsonify({"error": message}), 500
@news_api_bp.route(
    "/subscription/subscriptions/<subscription_id>", methods=["PUT"]
)
@login_required
def update_subscription_folder(subscription_id):
    """Update a subscription (mainly for folder assignment).

    The JSON body maps subscription attribute names to new values. Keys that
    do not exist on the model are ignored. ``id``, ``user_id`` and
    ``created_at`` are never overwritten, and neither are private attributes
    or methods of the model. When ``refresh_interval_minutes`` is supplied,
    ``next_refresh`` is recalculated from ``last_refresh`` (or from the
    current UTC time if the subscription has never been refreshed).

    Args:
        subscription_id: Identifier of the subscription, taken from the URL.

    Returns:
        200 with the subscription's ``to_dict()`` payload, 400 when the body
        is not a JSON object or ``refresh_interval_minutes`` is not a positive
        number, 404 when the subscription does not exist, 500 (generic
        message) on any unexpected error.
    """
    try:
        # silent=True returns None for a missing/malformed body instead of
        # raising, so client mistakes are reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Invalid request body"}), 400

        logger.info(
            f"Updating subscription {subscription_id} with data: {data}"
        )

        # Validate the interval before touching the row, so a bad value can
        # never leave the subscription half-updated. bool is excluded
        # explicitly because it is a subclass of int; "not > 0" also rejects
        # NaN.
        interval_given = "refresh_interval_minutes" in data
        new_minutes = data.get("refresh_interval_minutes")
        if interval_given and (
            isinstance(new_minutes, bool)
            or not isinstance(new_minutes, (int, float))
            or not new_minutes > 0
        ):
            return jsonify(
                {"error": "refresh_interval_minutes must be a positive number"}
            ), 400

        with get_user_db_session() as session:
            # Manually handle the update to ensure next_refresh is recalculated
            # NOTE: two dots, like every other import in this module; three
            # dots would climb above the package that contains ``database``.
            from ..database.models import NewsSubscription as BaseSubscription
            from datetime import datetime, timedelta, timezone

            sub = (
                session.query(BaseSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                return jsonify({"error": "Subscription not found"}), 404

            # Update fields. The payload is untrusted, so besides the
            # immutable columns we refuse private names and anything callable
            # (e.g. ``to_dict``), which must not be replaced by request data.
            protected = {"id", "user_id", "created_at"}
            for key, value in data.items():
                if key in protected or key.startswith("_"):
                    continue
                if not hasattr(sub, key) or callable(getattr(sub, key)):
                    continue
                setattr(sub, key, value)

            # Recalculate next_refresh if refresh_interval_minutes changed
            if interval_given:
                interval = timedelta(minutes=new_minutes)
                base_time = sub.last_refresh or datetime.now(timezone.utc)
                sub.next_refresh = base_time + interval
                logger.info(f"Recalculated next_refresh: {sub.next_refresh}")

            sub.updated_at = datetime.now(timezone.utc)
            session.commit()

            result = sub.to_dict()
            logger.info(
                f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}"
            )
            return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscription/stats", methods=["GET"])
@login_required
def get_subscription_stats():
    """Return the current user's subscription statistics as JSON."""
    try:
        owner = get_user_id()

        with get_user_db_session() as db_session:
            summary = FolderManager(db_session).get_subscription_stats(owner)
            return jsonify(summary)

    except Exception as exc:
        message = safe_error_message(exc, "getting stats")
        return jsonify({"error": message}), 500
1338# Error handlers
@news_api_bp.errorhandler(400)
def bad_request(e):
    """Blueprint handler for HTTP 400: reply with a generic JSON error."""
    body = jsonify({"error": "Bad request"})
    return body, 400
@news_api_bp.errorhandler(404)
def not_found(e):
    """Blueprint handler for HTTP 404: reply with a generic JSON error."""
    body = jsonify({"error": "Resource not found"})
    return body, 404
@news_api_bp.errorhandler(500)
def internal_error(e):
    """Blueprint handler for HTTP 500: reply with a generic JSON error."""
    body = jsonify({"error": "Internal server error"})
    return body, 500
@news_api_bp.route("/search-history", methods=["GET"])
@login_required
def get_search_history():
    """Return up to 20 of the current user's searches, newest first."""
    try:
        # Resolve the user name from the session
        from ..web.auth.decorators import current_user

        active_user = current_user()
        if not active_user:
            # No authenticated user: answer with an empty history
            return jsonify({"search_history": []})

        # The history lives in the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # The password is read from Flask's g object (set by middleware)
        from flask import g

        db_password = getattr(g, "user_password", None)

        with get_user_db_session(active_user, db_password) as db_session:
            newest_first = UserNewsSearchHistory.created_at.desc()
            query = db_session.query(UserNewsSearchHistory)
            recent = query.order_by(newest_first).limit(20).all()

            entries = [row.to_dict() for row in recent]
            return jsonify({"search_history": entries})

    except Exception as exc:
        message = safe_error_message(exc, "getting search history")
        return jsonify({"error": message}), 500
@news_api_bp.route("/search-history", methods=["POST"])
@login_required
def add_search_history():
    """Add a search to the history.

    Expects a JSON object with a required ``query`` and optional ``type``
    (defaults to ``"filter"``) and ``resultCount`` (defaults to ``0``).

    Returns:
        200 with ``{"status": "success", "id": ...}`` on success, 400 when the
        body is not a JSON object or has no ``query``, 401 when no user is
        authenticated, 500 (generic message) on any unexpected error.
    """
    try:
        # Get username from session
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            # Not authenticated
            return jsonify({"error": "Authentication required"}), 401

        # silent=True returns None for a missing/malformed body instead of
        # raising, so the request is answered with the 400 below rather than
        # being caught by the generic handler and reported as 500.
        data = request.get_json(silent=True)
        logger.info(f"add_search_history received data: {data}")
        if not isinstance(data, dict) or not data.get("query"):
            logger.warning(f"Invalid search history data: {data}")
            return jsonify({"error": "query is required"}), 400

        # Add to user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # Get password from Flask g object (set by middleware)
        from flask import g

        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            search_history = UserNewsSearchHistory(
                query=data["query"],
                search_type=data.get("type", "filter"),
                result_count=data.get("resultCount", 0),
            )
            db_session.add(search_history)
            db_session.commit()

            # Read the generated id while the session is still open.
            return jsonify({"status": "success", "id": search_history.id})

    except Exception as e:
        # safe_error_message already logs the exception with its traceback,
        # so no separate logger.exception call is needed here.
        return jsonify(
            {"error": safe_error_message(e, "adding search history")}
        ), 500
@news_api_bp.route("/search-history", methods=["DELETE"])
@login_required
def clear_search_history():
    """Delete every stored search-history entry of the current user."""
    success = {"status": "success"}
    try:
        # Resolve the user name from the session
        from ..web.auth.decorators import current_user

        active_user = current_user()
        if not active_user:
            # Without a user there is nothing to clear
            return jsonify(success)

        # The history lives in the user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # The password is read from Flask's g object (set by middleware)
        from flask import g

        db_password = getattr(g, "user_password", None)

        with get_user_db_session(active_user, db_password) as db_session:
            db_session.query(UserNewsSearchHistory).delete()
            db_session.commit()

        return jsonify(success)

    except Exception as exc:
        message = safe_error_message(exc, "clearing search history")
        return jsonify({"error": message}), 500
@news_api_bp.route("/debug", methods=["GET"])
@login_required
def debug_database():
    """Debug endpoint to check database content.

    Requires an authenticated user, like every other route on this blueprint,
    because the response exposes database contents.

    Returns:
        200 with the result of ``api.debug_research_items`` for the current
        user, or 500 with a generic message when that result contains an
        ``"error"`` key or an exception is raised.
    """
    try:
        user_id = get_user_id()
        result = api.debug_research_items(user_id)
        if "error" in result:
            # Keep the detailed error in the log; the client only gets a
            # generic message.
            logger.error(
                f"Debug endpoint error for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Internal server error"}), 500
        return jsonify(result)
    except Exception:
        logger.exception("Exception in debug endpoint")
        return jsonify({"error": "Internal server error"}), 500