Coverage for src/local_deep_research/news/flask_api.py: 88%
675 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Flask API endpoints for news system.
3Converted from FastAPI to match LDR's Flask architecture.
4"""
6from functools import wraps
7from typing import Any
8from flask import Blueprint, request, jsonify
9from loguru import logger
11from . import api
12from .folder_manager import FolderManager
13from ..database.models import SubscriptionFolder
14from ..web.auth.decorators import login_required
15from ..database.session_context import get_user_db_session
16from ..settings.env_registry import get_env_setting
17from ..utilities.db_utils import get_settings_manager
18from ..security import safe_post
19from ..llm.providers.base import normalize_provider
20from ..security.decorators import require_json_body
def scheduler_control_required(f):
    """Gate an endpoint that controls the global news scheduler.

    The news scheduler is a global singleton, so starting, stopping, or
    triggering it affects every user. The wrapped view only runs when the
    ``news.scheduler.allow_api_control`` setting (env var
    ``LDR_NEWS_SCHEDULER_ALLOW_API_CONTROL``, default ``false``) is enabled.
    Otherwise the blocked attempt is logged and a 403 JSON error is returned.

    Must be placed *after* ``@login_required`` in the decorator stack.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        # Fast path: control is allowed, so run the real view.
        if get_env_setting("news.scheduler.allow_api_control", False):
            return f(*args, **kwargs)

        from flask import session as flask_session

        # Record who tried to control the scheduler before refusing.
        logger.warning(
            "Scheduler API control blocked for endpoint {} (user={}, ip={})",
            f.__name__,
            flask_session.get("username", "unknown"),
            request.remote_addr,
        )
        denial = {
            "error": "Scheduler API control is disabled. "
            "Contact your administrator to enable it."
        }
        return jsonify(denial), 403

    return wrapper
def safe_error_message(e: Exception, context: str = "") -> str:
    """
    Return a client-safe error message that doesn't expose internal details.

    The full exception is logged server-side. The caller gets only a generic
    message chosen by exception type.

    Args:
        e: The exception being reported. Some callers wrap an error string
            from the API layer in a freshly constructed exception that was
            never raised, so ``e`` need not be the exception currently being
            handled.
        context: Optional phrase describing what was being attempted, e.g.
            "getting news feed". It is used in the log line and in the
            fallback message.

    Returns:
        A generic error message safe for external users.
    """
    # Log the given exception explicitly. logger.exception() only reads
    # sys.exc_info(), which is empty when the caller is not inside an
    # ``except`` block, so the real error text would be lost.
    logger.opt(exception=e).error("Error in {}", context)

    # Return generic messages based on exception type
    if isinstance(e, ValueError):
        return "Invalid input provided"
    if isinstance(e, KeyError):
        return "Required data missing"
    if isinstance(e, TypeError):
        return "Invalid data format"
    # Generic message for production
    return f"An error occurred{f' while {context}' if context else ''}"
87def _is_job_owned_by_user(job, username, scheduler):
88 """Check if an APScheduler job belongs to a specific user."""
89 # Primary: all news scheduler jobs pass username as first arg
90 if hasattr(job, "args") and job.args and job.args[0] == username:
91 return True
92 # Fallback: check the tracked scheduled_jobs set
93 if hasattr(scheduler, "user_sessions"):
94 session_info = scheduler.user_sessions.get(username, {})
95 if job.id in session_info.get("scheduled_jobs", set()):
96 return True
97 return False
# Create the Blueprint with an "/api" prefix. The parent blueprint already
# contributes "/news", so these routes are served under /news/api/...
news_api_bp = Blueprint("news_api", __name__, url_prefix="/api")
# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.

# Components are initialized in api.py
def get_user_id():
    """Return the logged-in username, or None when nobody is authenticated."""
    from ..web.auth.decorators import current_user

    # News data is per-user, so any falsy result is normalized to None.
    return current_user() or None
@news_api_bp.route("/feed", methods=["GET"])
@login_required
def get_news_feed() -> Any:
    """
    Get the personalized news feed for the logged-in user.

    Query params:
        limit: Maximum number of cards to return (default: the
            ``news.feed.default_limit`` setting)
        use_cache: Whether to use cached news (default: true)
        strategy: Override default recommendation strategy
        focus: Optional focus area for news
        subscription_id: Optional subscription to restrict the feed to

    Returns:
        The API result as JSON. Responds 400 when ``limit`` is not an
        integer, or when the API layer reports it out of range
        ("must be between"). Responds 500 for any other failure.
    """
    try:
        # login_required ensures an authenticated user here.
        user_id = get_user_id()
        logger.info(f"News feed requested by user: {user_id}")

        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")
        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A broken configured default is a server problem, so it falls
            # through to the generic 500 handler.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # Malformed client input is a 400, not an internal error.
                return jsonify(
                    {"error": "Invalid limit parameter", "news_items": []}
                ), 400
        use_cache = request.args.get("use_cache", "true").lower() == "true"
        strategy = request.args.get("strategy")
        focus = request.args.get("focus")
        subscription_id = request.args.get("subscription_id")

        logger.info(
            f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}"
        )

        result = api.get_news_feed(
            user_id=user_id,
            limit=limit,
            use_cache=use_cache,
            focus=focus,
            search_strategy=strategy,
            subscription_id=subscription_id,
        )

        # The API layer reports failures in-band as {"error": ..., "news_items": []}.
        if "error" in result and result.get("news_items") == []:
            raw_error = str(result["error"])
            # Never echo the raw API error to the client.
            safe_msg = safe_error_message(
                Exception(raw_error), context="getting news feed"
            )
            status_code = 400 if "must be between" in raw_error else 500
            return jsonify({"error": safe_msg, "news_items": []}), status_code

        items = result.get("news_items", [])
        logger.info(f"API returning {len(items)} news items")
        if items:
            logger.info(f"First item ID: {items[0].get('id', 'NO_ID')}")

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {
                "error": safe_error_message(e, "getting news feed"),
                "news_items": [],
            }
        ), 500
@news_api_bp.route("/subscribe", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def create_subscription() -> Any:
    """
    Create a subscription owned by the logged-in user.

    JSON body:
        query: Search query or topic (required)
        subscription_type: "search" or "topic" (default: "search")
        refresh_minutes: Refresh interval in minutes (None lets api.py pick
            its default)
        model_provider, model, custom_endpoint: optional LLM configuration
        search_strategy: default "news_aggregation"
        name, folder_id, search_engine, search_iterations,
        questions_per_iteration: optional
        is_active: default True

    Returns 400 for unparsable JSON, a missing query, or a ValueError from
    the API layer, and 500 for anything else.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        # The body could not be parsed as JSON.
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        owner = get_user_id()
        provider = normalize_provider(payload.get("model_provider"))

        topic = payload.get("query")
        if not topic:
            return jsonify({"error": "query is required"}), 400

        created = api.create_subscription(
            user_id=owner,
            query=topic,
            subscription_type=payload.get("subscription_type", "search"),
            refresh_minutes=payload.get("refresh_minutes"),
            model_provider=provider,
            model=payload.get("model"),
            search_strategy=payload.get("search_strategy", "news_aggregation"),
            custom_endpoint=payload.get("custom_endpoint"),
            name=payload.get("name"),
            folder_id=payload.get("folder_id"),
            is_active=payload.get("is_active", True),
            search_engine=payload.get("search_engine"),
            search_iterations=payload.get("search_iterations"),
            questions_per_iteration=payload.get("questions_per_iteration"),
        )
        return jsonify(created)

    except ValueError as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 500
@news_api_bp.route("/vote", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def vote_on_news() -> Any:
    """
    Record the current user's vote on a news card.

    JSON body:
        card_id: ID of the news card
        vote: "up" or "down"

    Responds 400 when either field is missing or the API layer raises a
    ValueError, 404 when that error mentions "not found", and 500 on
    unexpected errors.
    """
    try:
        body = request.get_json()
        voter = get_user_id()
        card_id, vote = body.get("card_id"), body.get("vote")

        if not (card_id and vote):
            return jsonify({"error": "card_id and vote are required"}), 400

        feedback = api.submit_feedback(
            card_id=card_id, user_id=voter, vote=vote
        )
        return jsonify(feedback)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 400
    except Exception as e:
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 500
@news_api_bp.route("/feedback/batch", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def get_batch_feedback() -> Any:
    """
    Get the current user's votes for several news cards at once.

    JSON body:
        card_ids: List of card IDs

    Returns:
        ``{"votes": {}}`` when the list is empty or missing, otherwise the API
        result. Responds 400 if ``card_ids`` is not a list, 404 when the API
        layer reports a missing resource, and 500 on unexpected errors.
    """
    try:
        data = request.get_json()
        card_ids = data.get("card_ids", [])
        if not card_ids:
            return jsonify({"votes": {}})
        # api.get_votes_for_cards presumably expects a list of IDs, so other
        # JSON types (strings, objects, numbers) are rejected up front.
        if not isinstance(card_ids, list):
            return jsonify({"error": "card_ids must be a list"}), 400

        user_id = get_user_id()
        result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id)
        return jsonify(result)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "getting votes")}), 400
    except Exception as e:
        # safe_error_message already logs the exception; logging here too
        # would record it twice.
        return jsonify({"error": safe_error_message(e, "getting votes")}), 500
@news_api_bp.route("/feedback/<card_id>", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def submit_feedback(card_id: str) -> Any:
    """
    Record the current user's vote on the news card ``card_id``.

    JSON body:
        vote: "up" or "down"

    A ValueError from the API layer maps as follows:
    - 404 when the message mentions "not found".
    - 400 with a fixed message when it mentions "must be".
    - A sanitized 400 otherwise.

    Anything else is a 500.
    """
    try:
        body = request.get_json()
        voter = get_user_id()
        vote = body.get("vote")

        if not vote:
            return jsonify({"error": "vote is required"}), 400

        feedback = api.submit_feedback(
            card_id=card_id, user_id=voter, vote=vote
        )
        return jsonify(feedback)

    except ValueError as e:
        lowered = str(e).lower()
        if "not found" in lowered:
            return jsonify({"error": "Resource not found"}), 404
        if "must be" in lowered:
            return jsonify({"error": "Invalid input value"}), 400
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 500
@news_api_bp.route("/research/<card_id>", methods=["POST"])
@login_required
def research_news_item(card_id: str) -> Any:
    """
    Perform deeper research on a news item.

    JSON body (optional):
        depth: "quick", "detailed", or "report" (default: "quick")

    An absent, empty, or unparsable body falls back to the defaults. A JSON
    body that is not an object is rejected with 400. Other failures are 500.
    """
    try:
        # silent=True: newer Werkzeug versions raise 415 from get_json() when
        # the request is not application/json. That would surface as a 500
        # even though the body is optional.
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            return jsonify({"error": "JSON body must be an object"}), 400
        depth = data.get("depth", "quick")

        # The API function performs the research itself.
        result = api.research_news_item(card_id, depth)
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "researching news item")}
        ), 500
@news_api_bp.route("/subscriptions/current", methods=["GET"])
@login_required
def get_current_user_subscriptions() -> Any:
    """
    Get all subscriptions for the logged-in user.

    Returns the API result as JSON. When the API layer reports an error, the
    detail is logged and a generic 500 is returned instead.
    """
    try:
        user_id = get_user_id()
        logger.debug(f"Getting subscriptions for user {user_id}")

        result = api.get_subscriptions(user_id)
        if "error" in result:
            logger.error(
                f"Error getting subscriptions for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Failed to retrieve subscriptions"}), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscriptions")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"])
@login_required
def get_subscription(subscription_id: str) -> Any:
    """
    Fetch a single subscription by ID.

    Empty IDs and the JavaScript placeholder strings "null"/"undefined" are
    rejected with 400. Unknown IDs return 404.
    """
    try:
        is_placeholder = subscription_id in ("null", "undefined")
        if is_placeholder or not subscription_id:
            return jsonify({"error": "Invalid subscription ID"}), 400

        found = api.get_subscription(subscription_id)
        if found:
            return jsonify(found)
        return jsonify({"error": "Subscription not found"}), 404

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"])
@login_required
@require_json_body(error_message="No JSON data provided")
def update_subscription(subscription_id: str) -> Any:
    """
    Apply a partial update to a subscription.

    Only request fields that are present are forwarded, renamed to their
    storage names (e.g. ``query`` becomes ``query_or_topic`` and
    ``refresh_minutes`` becomes ``refresh_interval_minutes``).

    When the API layer reports an error, its message is replaced with a
    sanitized one. The response is 404 if the error mentions "not found"
    and 400 otherwise. Unexpected failures return 500.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        # Request field name -> storage field name.
        renames = {
            "query": "query_or_topic",
            "name": "name",
            "refresh_minutes": "refresh_interval_minutes",
            "is_active": "is_active",
            "folder_id": "folder_id",
            "model_provider": "model_provider",
            "model": "model",
            "search_strategy": "search_strategy",
            "custom_endpoint": "custom_endpoint",
            "search_engine": "search_engine",
            "search_iterations": "search_iterations",
            "questions_per_iteration": "questions_per_iteration",
        }
        changes = {
            column: payload[field]
            for field, column in renames.items()
            if field in payload
        }

        result = api.update_subscription(subscription_id, changes)
        if "error" not in result:
            return jsonify(result)

        raw_error = result["error"]
        # Never return the raw API error to the client.
        result["error"] = safe_error_message(
            Exception(raw_error), "updating subscription"
        )
        status_code = 404 if "not found" in raw_error.lower() else 400
        return jsonify(result), status_code

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"])
@login_required
def delete_subscription(subscription_id: str) -> Any:
    """
    Delete a subscription.

    Returns a success message, 404 when the API reports nothing was deleted,
    or 500 on unexpected errors.
    """
    try:
        deleted = api.delete_subscription(subscription_id)
        if not deleted:
            return jsonify({"error": "Subscription not found"}), 404
        return jsonify(
            {
                "status": "success",
                "message": f"Subscription {subscription_id} deleted",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "deleting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"])
@login_required
def run_subscription_now(subscription_id: str) -> Any:
    """
    Manually trigger one of the current user's subscriptions right now.

    The subscription is looked up among the user's own subscriptions. The
    ``YYYY-MM-DD`` placeholder in its query is replaced with today's date in
    the user's configured timezone. A quick research run is then started by
    POSTing to this server's ``/research/api/start`` endpoint.

    Returns:
        JSON with ``research_id`` and a progress ``url`` on success. Responds
        404 if the user has no such subscription, the upstream status code
        when the research endpoint rejects the request, and 500 on other
        failures.
    """
    try:
        from flask import session

        from ..settings.manager import SettingsManager
        from .core.utils import get_local_date_string

        subscription_data = api.get_subscriptions(get_user_id())

        # Route params are always strings; normalize stored IDs so a
        # non-string ID still matches.
        subscription = next(
            (
                sub
                for sub in subscription_data.get("subscriptions", [])
                if str(sub["id"]) == subscription_id
            ),
            None,
        )
        if not subscription:
            return jsonify({"error": "Subscription not found"}), 404

        # Today's date in the user's timezone, read from their settings.
        username = session["username"]
        with get_user_db_session(username) as db:
            current_date = get_local_date_string(SettingsManager(db))

        original_query = subscription["query"]
        # Replace the YYYY-MM-DD placeholder ONLY; literal dates stay as-is.
        query = original_query.replace("YYYY-MM-DD", current_date)

        request_data = {
            "query": query,
            "mode": "quick",
            # Use `or` rather than a .get() default so stored None values also
            # fall back, matching the /check-overdue handler.
            "model_provider": subscription.get("model_provider") or "ollama",
            "model": subscription.get("model") or "llama3",
            "strategy": subscription.get("search_strategy")
            or "news_aggregation",
            "metadata": {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription_id,
                "triggered_by": "manual",
                "original_query": original_query,  # still has the placeholder
                "processed_query": query,  # placeholder replaced
                "news_date": current_date,  # the date actually used
                "title": subscription.get("name") or None,
            },
        }

        # Optional per-subscription overrides (search_engine mirrors the
        # /check-overdue handler).
        if subscription.get("custom_endpoint"):
            request_data["custom_endpoint"] = subscription["custom_endpoint"]
        if subscription.get("search_engine"):
            request_data["search_engine"] = subscription["search_engine"]

        # Call the research API endpoint (api_bp at /research/api) on the URL
        # this server is responding on.
        # NOTE(review): host_url is derived from the client-supplied Host
        # header, and safe_post is allowed to reach localhost/private IPs.
        # Confirm the Host header is validated upstream (trusted hosts or a
        # reverse proxy). Also confirm /research/api/start accepts a request
        # that carries no session cookie.
        base_url = request.host_url.rstrip("/")
        response = safe_post(
            f"{base_url}/research/api/start",
            json=request_data,
            headers={"Content-Type": "application/json"},
            allow_localhost=True,
            allow_private_ips=True,
        )

        if not response.ok:
            return jsonify(
                {"error": f"Failed to start research: {response.status_code}"}
            ), response.status_code

        data = response.json()
        if data.get("status") not in ("success", "queued"):
            return jsonify(
                {"error": data.get("message", "Failed to start research")}
            ), 500

        research_id = data.get("research_id")
        return jsonify(
            {
                "status": "success",
                "message": "Research started",
                "research_id": research_id,
                "url": f"/progress/{research_id}",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "running subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"])
@login_required
def get_subscription_history(subscription_id: str) -> Any:
    """
    Get the research history of one subscription.

    Query params:
        limit: Maximum number of history entries (default: the
            ``news.feed.default_limit`` setting)

    Returns 400 for a non-integer ``limit``. Returns a generic 500 when the
    API layer reports an error (the detail is logged) or anything else fails.
    """
    try:
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")
        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A broken configured default is a server problem, so it falls
            # through to the 500 handler.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # Malformed client input is a 400, not an internal error.
                return jsonify(
                    {"error": "Invalid limit parameter", "history": []}
                ), 400

        result = api.get_subscription_history(subscription_id, limit)
        if "error" in result:
            logger.error(
                f"Error getting subscription history: {result['error']}"
            )
            return jsonify(
                {
                    "error": "Failed to retrieve subscription history",
                    "history": [],
                }
            ), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription history")}
        ), 500
@news_api_bp.route("/preferences", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def save_preferences() -> Any:
    """
    Save the current user's news preferences.

    JSON body:
        preferences: Preference mapping (default: empty)
    """
    try:
        body = request.get_json()
        owner = get_user_id()
        saved = api.save_news_preferences(owner, body.get("preferences", {}))
        return jsonify(saved)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "saving preferences")}
        ), 500
@news_api_bp.route("/categories", methods=["GET"])
@login_required
def get_categories() -> Any:
    """Return the news category distribution reported by the API layer."""
    try:
        return jsonify(api.get_news_categories())
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting categories")}
        ), 500
@news_api_bp.route("/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status() -> Any:
    """
    Report the activity-based background scheduler's state.

    Unless ``news.scheduler.allow_api_control`` is enabled, user counts and
    job listings are restricted to the caller's own session and jobs. At most
    10 APScheduler jobs are listed, but the count covers all visible jobs.
    """
    try:
        logger.info("Scheduler status endpoint called")
        from flask import session
        from ..scheduler.background import get_background_job_scheduler

        scheduler = get_background_job_scheduler()
        username = session["username"]
        show_all = get_env_setting("news.scheduler.allow_api_control", False)
        logger.info(
            f"Scheduler instance obtained: is_running={scheduler.is_running}"
        )

        # Status is assembled by hand (the original note: to avoid a
        # potential deadlock).
        has_sessions = hasattr(scheduler, "user_sessions")
        if not has_sessions:
            active_users = 0
        elif show_all:
            active_users = len(scheduler.user_sessions)
        else:
            active_users = 1 if username in scheduler.user_sessions else 0

        status = {
            "scheduler_available": True,  # APScheduler is installed and working
            "is_running": scheduler.is_running,
            "config": scheduler.config.copy()
            if hasattr(scheduler, "config")
            else {},
            "active_users": active_users,
            "total_scheduled_jobs": 0,
        }

        # Jobs tracked in the scheduler's own session bookkeeping.
        if has_sessions:
            if show_all:
                status["total_scheduled_jobs"] = sum(
                    len(info.get("scheduled_jobs", set()))
                    for info in scheduler.user_sessions.values()
                )
            else:
                own_session = scheduler.user_sessions.get(username, {})
                status["total_scheduled_jobs"] = len(
                    own_session.get("scheduled_jobs", set())
                )

        # Jobs actually registered with APScheduler.
        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            try:
                visible_jobs = scheduler.scheduler.get_jobs()
                if not show_all:
                    visible_jobs = [
                        candidate
                        for candidate in visible_jobs
                        if _is_job_owned_by_user(candidate, username, scheduler)
                    ]
                status["apscheduler_job_count"] = len(visible_jobs)
                status["apscheduler_jobs"] = [
                    {
                        "id": entry.id,
                        "name": entry.name,
                        "next_run": entry.next_run_time.isoformat()
                        if entry.next_run_time
                        else None,
                    }
                    for entry in visible_jobs[:10]  # display cap
                ]
            except Exception:
                logger.exception("Error getting APScheduler jobs")
                status["apscheduler_job_count"] = 0

        logger.info(f"Status built: {list(status.keys())}")

        # The frontend JS reads `scheduled_jobs`.
        status["scheduled_jobs"] = status.get("total_scheduled_jobs", 0)

        logger.info(
            f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}"
        )
        return jsonify(status)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler status")}
        ), 500
@news_api_bp.route("/scheduler/start", methods=["POST"])
@login_required
@scheduler_control_required
def start_scheduler() -> Any:
    """
    Start the global subscription scheduler.

    This is a no-op (200) when it is already running. On start, the instance
    is also stored on ``current_app.background_job_scheduler``, which the
    stop and check-now handlers read.
    """
    try:
        from flask import current_app
        from ..scheduler.background import get_background_job_scheduler

        scheduler = get_background_job_scheduler()
        if scheduler.is_running:
            return jsonify({"message": "Scheduler is already running"}), 200

        scheduler.start()
        current_app.background_job_scheduler = scheduler  # type: ignore[attr-defined,unused-ignore]
        logger.info("News scheduler started via API")

        started = {
            "status": "success",
            "message": "Scheduler started",
            "active_users": len(scheduler.user_sessions),
        }
        return jsonify(started)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "starting scheduler")}
        ), 500
@news_api_bp.route("/scheduler/stop", methods=["POST"])
@login_required
@scheduler_control_required
def stop_scheduler() -> Any:
    """
    Stop the scheduler stored on ``current_app.background_job_scheduler``.

    Returns 404 when no instance is attached to the app and 200 with a
    message when it is already stopped.
    """
    try:
        from flask import current_app

        scheduler = getattr(current_app, "background_job_scheduler", None)
        if not scheduler:
            return jsonify({"message": "No scheduler instance found"}), 404
        if not scheduler.is_running:
            return jsonify({"message": "Scheduler is not running"}), 200

        scheduler.stop()
        logger.info("News scheduler stopped via API")
        return jsonify({"status": "success", "message": "Scheduler stopped"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "stopping scheduler")}
        ), 500
@news_api_bp.route("/scheduler/check-now", methods=["POST"])
@login_required
@scheduler_control_required
def check_subscriptions_now() -> Any:
    """
    Queue an immediate overdue-subscription check for the current user.

    Requirements:
    - A running scheduler attached to ``current_app`` (otherwise 503).
    - An authenticated user (otherwise 401).

    The handler counts the user's due subscriptions, then schedules the
    scheduler's ``_check_user_overdue_subscriptions`` job to run about one
    second later. A pending manual check for the same user is replaced.

    Returns:
        JSON with the ``count`` of due subscriptions, or 500 on unexpected
        errors.
    """
    try:
        from datetime import datetime, timedelta, timezone

        from flask import current_app

        from ..database.models import NewsSubscription as BaseSubscription

        scheduler = getattr(current_app, "background_job_scheduler", None)
        if not scheduler:
            return jsonify({"error": "Scheduler not initialized"}), 503
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 503

        # Resolve the user before any database work. This makes the 401 come
        # first, and the DB session is explicitly scoped to this user, as in
        # the other handlers that pass the username.
        username = get_user_id()
        if not username:
            return jsonify({"error": "No authenticated user"}), 401

        with get_user_db_session(username) as db:
            now = datetime.now(timezone.utc)
            # NOTE(review): this filters on `is_active`, while the
            # /check-overdue handler filters on `status == "active"`. Confirm
            # both select the same subscriptions.
            count = (
                db.query(BaseSubscription)
                .filter(
                    BaseSubscription.is_active.is_(True),
                    BaseSubscription.next_refresh.is_not(None),
                    BaseSubscription.next_refresh <= now,
                )
                .count()
            )

        # Run the check asynchronously via APScheduler. _wrap_job presumably
        # supplies the app context the job needs.
        scheduler.scheduler.add_job(
            func=scheduler._wrap_job(
                scheduler._check_user_overdue_subscriptions
            ),
            args=[username],
            trigger="date",
            run_date=datetime.now(timezone.utc) + timedelta(seconds=1),
            id=f"manual_check_{username}",
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "success",
                "message": f"Checking {count} due subscriptions",
                "count": count,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking subscriptions")}
        ), 500
@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"])
@login_required
@scheduler_control_required
def trigger_cleanup() -> Any:
    """
    Schedule the scheduler's cleanup job to run about one second from now.

    Returns 400 when the scheduler is not running. The job id is fixed, so
    repeated triggers replace a pending one.
    """
    try:
        from ..scheduler.background import get_background_job_scheduler
        from datetime import datetime, UTC, timedelta

        scheduler = get_background_job_scheduler()
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 400

        cleanup_job = scheduler._wrap_job(scheduler._run_cleanup_with_tracking)
        run_at = datetime.now(UTC) + timedelta(seconds=1)
        scheduler.scheduler.add_job(
            cleanup_job,
            "date",
            run_date=run_at,
            id="manual_cleanup_trigger",
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "triggered",
                "message": "Cleanup job will run within seconds",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "triggering cleanup")}
        ), 500
@news_api_bp.route("/scheduler/users", methods=["GET"])
@login_required
def get_active_users() -> Any:
    """
    Summarize active scheduler user sessions.

    Unless ``news.scheduler.allow_api_control`` is enabled, only the caller's
    own entry (matched on ``user_id``) is returned.
    """
    try:
        from flask import session
        from ..scheduler.background import get_background_job_scheduler

        scheduler = get_background_job_scheduler()
        username = session["username"]
        summaries = scheduler.get_user_sessions_summary()

        if not get_env_setting("news.scheduler.allow_api_control", False):
            summaries = [
                entry for entry in summaries if entry.get("user_id") == username
            ]

        return jsonify({"active_users": len(summaries), "users": summaries})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting active users")}
        ), 500
@news_api_bp.route("/scheduler/stats", methods=["GET"])
@login_required
def scheduler_stats() -> Any:
    """
    Return debug statistics about the scheduler's sessions and jobs.

    Per user, this reports whether a stored credential exists, the last
    activity time, and the tracked job count. All APScheduler jobs are
    listed with their triggers. Unless ``news.scheduler.allow_api_control``
    is enabled, both are limited to the caller.
    """
    try:
        from ..scheduler.background import get_background_job_scheduler
        from flask import session

        scheduler = get_background_job_scheduler()
        username = session["username"]

        debug_info = {
            "current_user": username,
            "scheduler_running": scheduler.is_running,
            "user_sessions": {},
            "apscheduler_jobs": [],
        }

        show_all = get_env_setting("news.scheduler.allow_api_control", False)

        if hasattr(scheduler, "user_sessions"):
            for user, info in scheduler.user_sessions.items():
                if show_all or user == username:
                    last_seen = info.get("last_activity")
                    debug_info["user_sessions"][user] = {
                        "has_password": bool(
                            scheduler._credential_store.retrieve(user)
                        ),
                        "last_activity": last_seen.isoformat()
                        if last_seen
                        else None,
                        "scheduled_jobs_count": len(
                            info.get("scheduled_jobs", set())
                        ),
                    }

        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            visible_jobs = scheduler.scheduler.get_jobs()
            if not show_all:
                visible_jobs = [
                    candidate
                    for candidate in visible_jobs
                    if _is_job_owned_by_user(candidate, username, scheduler)
                ]
            debug_info["apscheduler_jobs"] = [
                {
                    "id": entry.id,
                    "name": entry.name,
                    "next_run": entry.next_run_time.isoformat()
                    if entry.next_run_time
                    else None,
                    "trigger": str(entry.trigger),
                }
                for entry in visible_jobs
            ]

        return jsonify(debug_info)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler stats")}
        ), 500
@news_api_bp.route("/check-overdue", methods=["POST"])
@login_required
def check_overdue_subscriptions():
    """Check and run all overdue subscriptions for the current user.

    Every subscription with ``status == "active"`` whose ``next_refresh`` is
    at or before now is submitted to ``/research/api/start`` on this same
    server via ``safe_post``, forwarding the caller's ``session`` cookie.
    When the research API answers with a ``status`` of ``"success"`` or
    ``"queued"``, the subscription's ``last_refresh``/``next_refresh`` are
    advanced and committed. Each subscription is committed individually, so
    one failure does not prevent the others from running.

    Returns:
        JSON with ``status``, ``overdue_found`` (number of overdue
        subscriptions), ``started`` (number accepted by the research API)
        and ``results`` (one entry per subscription with either a
        ``research_id`` or an ``error``). On an unexpected error outside the
        per-subscription loop, a JSON ``error`` with HTTP 500.
    """
    try:
        from flask import session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, UTC, timedelta

        username = session["username"]

        # The target URL and the auth header depend only on the incoming
        # request, not on the subscription, so build them once.
        # request.host_url is the URL the server is actually responding on.
        base_url = request.host_url.rstrip("/")
        # Forward the caller's session cookie so the internal request is
        # authenticated as the same user.
        session_cookie = request.cookies.get("session")
        headers = {
            "Content-Type": "application/json",
            "Cookie": f"session={session_cookie}" if session_cookie else "",
        }

        overdue_count = 0
        results = []
        with get_user_db_session(username) as db:
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.status == "active",
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            logger.info(
                f"Found {len(overdue_subs)} overdue subscriptions for {username}"
            )

            # Timezone-aware current date from the user's settings; it
            # replaces the YYYY-MM-DD placeholder in subscription queries.
            from .core.utils import get_local_date_string
            from ..settings.manager import SettingsManager

            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

            for sub in overdue_subs:
                try:
                    logger.info(
                        f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}"
                    )

                    # Fill date placeholders with the user's local date.
                    query = sub.query_or_topic.replace(
                        "YYYY-MM-DD", current_date
                    )

                    # Same payload shape as run_subscription_now.
                    request_data = {
                        "query": query,
                        "mode": "quick",
                        "model_provider": sub.model_provider or "ollama",
                        "model": sub.model or "llama3",
                        "strategy": sub.search_strategy or "news_aggregation",
                        "metadata": {
                            "is_news_search": True,
                            "search_type": "news_analysis",
                            "display_in": "news_feed",
                            "subscription_id": str(sub.id),
                            "triggered_by": "overdue_check",
                            "original_query": sub.query_or_topic,
                            "processed_query": query,
                            "news_date": current_date,
                            "title": sub.name if sub.name else None,
                        },
                    }

                    # Optional search parameters.
                    if sub.search_engine:
                        request_data["search_engine"] = sub.search_engine
                    if sub.custom_endpoint:
                        request_data["custom_endpoint"] = sub.custom_endpoint

                    response = safe_post(
                        f"{base_url}/research/api/start",
                        json=request_data,
                        headers=headers,
                        timeout=30,
                        # The target is this same server, typically on
                        # localhost or a private address.
                        allow_localhost=True,
                        allow_private_ips=True,
                    )

                    if response.ok:
                        result = response.json()
                    else:
                        result = {
                            "status": "error",
                            "error": f"HTTP {response.status_code}: {response.text}",
                        }

                    if result.get("status") in ("success", "queued"):
                        overdue_count += 1

                        # One timestamp, so next_refresh is exactly one
                        # interval after last_refresh.
                        refreshed_at = datetime.now(UTC)
                        sub.last_refresh = refreshed_at
                        sub.next_refresh = refreshed_at + timedelta(
                            minutes=sub.refresh_interval_minutes
                        )
                        db.commit()

                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "research_id": result.get("research_id"),
                            }
                        )
                    else:
                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "error": result.get(
                                    "error", "Failed to start research"
                                ),
                            }
                        )
                except Exception as e:
                    logger.exception(f"Error running subscription {sub.id}")
                    # Capture the entry before rolling back: rollback
                    # expires loaded instances.
                    failure = {
                        "id": str(sub.id),
                        "name": sub.name or sub.query_or_topic[:50],
                        "error": safe_error_message(e, "running subscription"),
                    }
                    # A failed flush/commit leaves the session unusable until
                    # it is rolled back. Discard this subscription's pending
                    # changes so the remaining subscriptions can be processed.
                    db.rollback()
                    results.append(failure)

        return jsonify(
            {
                "status": "success",
                "overdue_found": len(overdue_subs),
                "started": overdue_count,
                "results": results,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking overdue subscriptions")}
        ), 500
# Folder and subscription management routes
@news_api_bp.route("/subscription/folders", methods=["GET"])
@login_required
def get_folders():
    """List the current user's subscription folders.

    Returns:
        A JSON array of ``folder.to_dict()`` entries, or a JSON ``error``
        with HTTP 500 if anything fails.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as session:
            user_folders = FolderManager(session).get_user_folders(owner_id)
            return jsonify([entry.to_dict() for entry in user_folders])

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "getting folders")}), 500
@news_api_bp.route("/subscription/folders", methods=["POST"])
@login_required
@require_json_body()
def create_folder():
    """Create a subscription folder from the JSON request body.

    Body fields: ``name`` (required, non-empty) and ``description``
    (optional).

    Returns:
        The new folder's ``to_dict()`` with HTTP 201; HTTP 400 when ``name``
        is missing or empty; HTTP 409 when a folder with that name already
        exists; HTTP 500 on any other error.
    """
    try:
        payload = request.json
        folder_name = payload.get("name")
        if not folder_name:
            return jsonify({"error": "Folder name is required"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)

            # Reject duplicate names before creating anything.
            duplicate = (
                session.query(SubscriptionFolder)
                .filter_by(name=folder_name)
                .first()
            )
            if duplicate:
                return jsonify({"error": "Folder already exists"}), 409

            created = manager.create_folder(
                name=folder_name,
                description=payload.get("description"),
            )
            return jsonify(created.to_dict()), 201

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "creating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"])
@login_required
@require_json_body()
def update_folder(folder_id):
    """Update a folder with the fields from the JSON request body.

    NOTE(review): every key of the body is forwarded to
    ``FolderManager.update_folder`` as a keyword argument. Confirm that
    method restricts which attributes may be changed.

    Returns:
        The updated folder's ``to_dict()``; HTTP 404 when the manager
        returns a falsy result; HTTP 500 on any other error.
    """
    try:
        changes = request.json
        with get_user_db_session() as session:
            updated = FolderManager(session).update_folder(folder_id, **changes)
            if updated:
                return jsonify(updated.to_dict())
            return jsonify({"error": "Folder not found"}), 404

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "updating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"])
@login_required
def delete_folder(folder_id):
    """Delete a folder.

    The optional ``move_to`` query parameter is passed through to
    ``FolderManager.delete_folder``.

    Returns:
        ``{"status": "deleted"}`` with HTTP 200; HTTP 404 when the manager
        reports failure; HTTP 500 on any other error.
    """
    try:
        destination = request.args.get("move_to")

        with get_user_db_session() as session:
            deleted = FolderManager(session).delete_folder(
                folder_id, destination
            )
            if deleted:
                return jsonify({"status": "deleted"}), 200
            return jsonify({"error": "Folder not found"}), 404

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "deleting folder")}), 500
@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"])
@login_required
def get_subscriptions_organized():
    """Return the user's subscriptions grouped by folder.

    Returns:
        A JSON object that maps each key produced by
        ``FolderManager.get_subscriptions_by_folder`` to a list of
        ``subscription.to_dict()`` entries, or HTTP 500 on error.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as session:
            grouped = FolderManager(session).get_subscriptions_by_folder(
                owner_id
            )
            # Serialize each group's ORM objects into plain dicts.
            serialized = {
                folder_key: [item.to_dict() for item in members]
                for folder_key, members in grouped.items()
            }
            return jsonify(serialized)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting organized subscriptions")}
        ), 500
@news_api_bp.route(
    "/subscription/subscriptions/<subscription_id>", methods=["PUT"]
)
@login_required
@require_json_body()
def update_subscription_folder(subscription_id):
    """Update a subscription (mainly used for folder assignment).

    Copies the JSON body's fields onto the subscription. The following are
    skipped:

    - the protected fields ``id``, ``user_id`` and ``created_at``;
    - private names (leading ``_``, e.g. ORM state);
    - methods;
    - attributes the model does not have.

    When ``refresh_interval_minutes`` is supplied, ``next_refresh`` is
    recalculated from ``last_refresh``, or from now if the subscription has
    never been refreshed. ``updated_at`` is always set to now.

    Args:
        subscription_id: Primary key of the subscription to update.

    Returns:
        The updated subscription's ``to_dict()``. HTTP 400 when the body is
        not a JSON object or ``refresh_interval_minutes`` is not a positive
        integer. HTTP 404 when the subscription does not exist. HTTP 500 on
        any other error.
    """
    try:
        data = request.json
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        logger.info(
            f"Updating subscription {subscription_id} with data: {data}"
        )

        # Validate the interval up front. A zero or negative interval would
        # leave the subscription permanently overdue, and a non-numeric one
        # would crash the timedelta arithmetic below.
        new_minutes = None
        if "refresh_interval_minutes" in data:
            raw_minutes = data["refresh_interval_minutes"]
            try:
                new_minutes = int(raw_minutes)
            except (TypeError, ValueError):
                new_minutes = None
            if (
                isinstance(raw_minutes, bool)
                or new_minutes is None
                or new_minutes <= 0
            ):
                return jsonify(
                    {
                        "error": "refresh_interval_minutes must be a positive integer"
                    }
                ), 400
            # Store the normalized integer rather than the raw JSON value.
            data = {**data, "refresh_interval_minutes": new_minutes}

        with get_user_db_session() as session:
            # Update manually so next_refresh can be recalculated.
            # Note: two dots (the top-level package), matching the other
            # model imports in this module.
            from ..database.models.news import NewsSubscription
            from datetime import datetime, timedelta, timezone

            sub = (
                session.query(NewsSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                return jsonify({"error": "Subscription not found"}), 404

            protected_fields = {"id", "user_id", "created_at"}
            for key, value in data.items():
                # Only overwrite existing public data attributes. Never
                # touch ORM internals (e.g. ``_sa_instance_state``) or
                # methods such as ``to_dict``.
                if (
                    key in protected_fields
                    or key.startswith("_")
                    or not hasattr(sub, key)
                    or callable(getattr(sub, key))
                ):
                    continue
                setattr(sub, key, value)

            # Recalculate next_refresh when the interval was changed.
            if new_minutes is not None:
                base_time = sub.last_refresh or datetime.now(timezone.utc)
                sub.next_refresh = base_time + timedelta(minutes=new_minutes)
                logger.info(f"Recalculated next_refresh: {sub.next_refresh}")

            sub.updated_at = datetime.now(timezone.utc)
            session.commit()

            result = sub.to_dict()
            logger.info(
                f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}"
            )
            return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscription/stats", methods=["GET"])
@login_required
def get_subscription_stats():
    """Return the statistics produced by ``FolderManager.get_subscription_stats``.

    Returns:
        The statistics as JSON, or a JSON ``error`` with HTTP 500.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as session:
            summary = FolderManager(session).get_subscription_stats(owner_id)
            return jsonify(summary)

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "getting stats")}), 500
# Error handlers
@news_api_bp.errorhandler(400)
def bad_request(e):
    """Render this blueprint's 400 errors as a generic JSON body."""
    body = jsonify({"error": "Bad request"})
    return body, 400
@news_api_bp.errorhandler(404)
def not_found(e):
    """Render this blueprint's 404 errors as a generic JSON body."""
    body = jsonify({"error": "Resource not found"})
    return body, 404
@news_api_bp.errorhandler(500)
def internal_error(e):
    """Render this blueprint's 500 errors as a generic JSON body."""
    body = jsonify({"error": "Internal server error"})
    return body, 500
@news_api_bp.route("/search-history", methods=["GET"])
@login_required
def get_search_history():
    """Return the current user's 20 most recent news searches.

    Returns:
        ``{"search_history": [...]}`` ordered newest first. The list is
        empty when ``current_user()`` yields no username. On error, a JSON
        ``error`` with HTTP 500.
    """
    try:
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            # Not authenticated: nothing to show.
            return jsonify({"search_history": []})

        from ..database.models import UserNewsSearchHistory
        from flask import g

        # The middleware stores the password on flask.g to unlock the
        # user's encrypted database.
        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            newest_first = UserNewsSearchHistory.created_at.desc()
            recent = (
                db_session.query(UserNewsSearchHistory)
                .order_by(newest_first)
                .limit(20)
                .all()
            )
            entries = [row.to_dict() for row in recent]
            return jsonify({"search_history": entries})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting search history")}
        ), 500
@news_api_bp.route("/search-history", methods=["POST"])
@login_required
def add_search_history():
    """Add a search to the current user's news search history.

    Expects a JSON object with these fields:

    - ``query`` (required, non-empty);
    - ``type`` (optional, stored as ``search_type``, default ``"filter"``);
    - ``resultCount`` (optional, stored as ``result_count``, default ``0``).

    Returns:
        ``{"status": "success", "id": <new row id>}`` on success. HTTP 401
        when ``current_user()`` yields no username. HTTP 400 when the body
        is missing, is not valid JSON, is not a JSON object, or lacks
        ``query``. HTTP 500 on any other error.
    """
    try:
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            return jsonify({"error": "Authentication required"}), 401

        # silent=True: a missing or malformed JSON body yields None (-> 400
        # below). Without it, get_json raises an HTTP error that the broad
        # handler at the bottom would turn into a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Also covers JSON arrays/scalars, which have no .keys()/.get().
            data = None
        logger.info(
            f"add_search_history received data keys: {list(data.keys()) if data else 'None'}"
        )
        if not data or not data.get("query"):
            logger.warning("Invalid search history data: missing query")
            return jsonify({"error": "query is required"}), 400

        from ..database.models import UserNewsSearchHistory
        from flask import g

        # The middleware stores the password on flask.g to unlock the
        # user's encrypted database.
        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            search_history = UserNewsSearchHistory(
                query=data["query"],
                search_type=data.get("type", "filter"),
                result_count=data.get("resultCount", 0),
            )
            db_session.add(search_history)
            db_session.commit()
            # Read the generated id while the session is still open.
            return jsonify({"status": "success", "id": search_history.id})

    except Exception as e:
        logger.exception("Error adding search history")
        return jsonify(
            {"error": safe_error_message(e, "adding search history")}
        ), 500
@news_api_bp.route("/search-history", methods=["DELETE"])
@login_required
def clear_search_history():
    """Delete every news search history row in the current user's database.

    Returns:
        ``{"status": "success"}``, also when ``current_user()`` yields no
        username (nothing is deleted then). On error, a JSON ``error`` with
        HTTP 500.
    """
    try:
        from ..web.auth.decorators import current_user

        username = current_user()
        if username:
            from ..database.models import UserNewsSearchHistory
            from flask import g

            # The middleware stores the password on flask.g to unlock the
            # user's encrypted database.
            password = getattr(g, "user_password", None)

            with get_user_db_session(username, password) as db_session:
                db_session.query(UserNewsSearchHistory).delete()
                db_session.commit()

        return jsonify({"status": "success"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "clearing search history")}
        ), 500