Coverage for src / local_deep_research / news / flask_api.py: 88%
674 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Flask API endpoints for news system.
3Converted from FastAPI to match LDR's Flask architecture.
4"""
6from functools import wraps
7from typing import Any
8from flask import Blueprint, request, jsonify
9from loguru import logger
11from . import api
12from .folder_manager import FolderManager
13from ..database.models import SubscriptionFolder
14from ..web.auth.decorators import login_required
15from ..database.session_context import get_user_db_session
16from ..settings.env_registry import get_env_setting
17from ..utilities.db_utils import get_settings_manager
18from ..security import safe_post
19from ..llm.providers.base import normalize_provider
20from ..security.decorators import require_json_body
def scheduler_control_required(f):
    """Reject the wrapped route unless global scheduler control is enabled.

    The news scheduler is a global singleton, so starting, stopping, or
    triggering it affects every user. The wrapped view only runs when the
    ``news.scheduler.allow_api_control`` setting (env var
    ``LDR_NEWS_SCHEDULER_ALLOW_API_CONTROL``, default ``false``) is truthy;
    otherwise the attempt is logged and a 403 JSON response is returned.

    Must be placed *after* ``@login_required`` in the decorator stack.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        # Happy path first: control is enabled, run the real view.
        if get_env_setting("news.scheduler.allow_api_control", False):
            return f(*args, **kwargs)

        from flask import session as flask_session

        # Record who tried to reach the gated endpoint and from where.
        logger.warning(
            "Scheduler API control blocked for endpoint {} (user={}, ip={})",
            f.__name__,
            flask_session.get("username", "unknown"),
            request.remote_addr,
        )
        denial = {
            "error": "Scheduler API control is disabled. "
            "Contact your administrator to enable it."
        }
        return jsonify(denial), 403

    return wrapper
def safe_error_message(e: Exception, context: str = "") -> str:
    """
    Log an exception and produce a client-safe description of it.

    Args:
        e: The exception that was raised
        context: Optional description of the operation that failed

    Returns:
        A generic message that does not reveal internal details
    """
    # The full details go to the log only, never to the caller.
    logger.exception(f"Error in {context}")

    # Checked in order; the first matching exception type wins.
    generic_messages = (
        (ValueError, "Invalid input provided"),
        (KeyError, "Required data missing"),
        (TypeError, "Invalid data format"),
    )
    for exc_type, message in generic_messages:
        if isinstance(e, exc_type):
            return message

    # Anything else gets a generic message, optionally naming the operation.
    if context:
        return f"An error occurred while {context}"
    return "An error occurred"
87def _is_job_owned_by_user(job, username, scheduler):
88 """Check if an APScheduler job belongs to a specific user."""
89 # Primary: all news scheduler jobs pass username as first arg
90 if hasattr(job, "args") and job.args and job.args[0] == username:
91 return True
92 # Fallback: check the tracked scheduled_jobs set
93 if hasattr(scheduler, "user_sessions"):
94 session_info = scheduler.user_sessions.get(username, {})
95 if job.id in session_info.get("scheduled_jobs", set()):
96 return True
97 return False
# Blueprint for the news API routes. It declares url_prefix="/api" itself;
# the /news prefix is expected to come from the parent blueprint it is
# registered on.
news_api_bp = Blueprint("news_api", __name__, url_prefix="/api")

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.

# Components are initialized in api.py
def get_user_id():
    """Return the current user's name, or None when nobody is authenticated."""
    from ..web.auth.decorators import current_user

    # Any falsy result (missing or empty username) is reported as None,
    # because the news features need an authenticated user.
    return current_user() or None
@news_api_bp.route("/feed", methods=["GET"])
@login_required
def get_news_feed() -> Any:
    """
    Get personalized news feed for the authenticated user.

    Query params:
        limit: Maximum number of cards to return
            (default: the ``news.feed.default_limit`` setting)
        use_cache: Whether to use cached news (default: true)
        strategy: Override default recommendation strategy
        focus: Optional focus area for news
        subscription_id: Optional subscription ID forwarded to the feed API

    Returns:
        JSON feed payload on success; 400 for a non-integer ``limit`` or a
        range error reported by the feed API; 500 for any other failure.
    """
    try:
        # Get current user (login_required ensures we have one)
        user_id = get_user_id()
        logger.info(f"News feed requested by user: {user_id}")

        # Get query parameters
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        # A malformed client-supplied limit is a client error (400), not a
        # server error; only the client value is validated this way.
        raw_limit = request.args.get("limit")
        if raw_limit is None:
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                return jsonify(
                    {"error": "limit must be an integer", "news_items": []}
                ), 400

        use_cache = request.args.get("use_cache", "true").lower() == "true"
        strategy = request.args.get("strategy")
        focus = request.args.get("focus")
        subscription_id = request.args.get("subscription_id")

        logger.info(
            f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}"
        )

        # Call the direct API function (now synchronous)
        result = api.get_news_feed(
            user_id=user_id,
            limit=limit,
            use_cache=use_cache,
            focus=focus,
            search_strategy=strategy,
            subscription_id=subscription_id,
        )

        # The API reports failures in-band: an "error" key with no items.
        if "error" in result and result.get("news_items") == []:
            # Sanitize error message before returning to client
            safe_msg = safe_error_message(
                Exception(result["error"]), context="get_news_feed"
            )
            # Range validation errors are the caller's fault; the rest are ours.
            status_code = 400 if "must be between" in result["error"] else 500
            return jsonify({"error": safe_msg, "news_items": []}), status_code

        # Debug: Log the result before returning
        logger.info(
            f"API returning {len(result.get('news_items', []))} news items"
        )
        if result.get("news_items"):
            logger.info(
                f"First item ID: {result['news_items'][0].get('id', 'NO_ID')}"
            )

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {
                "error": safe_error_message(e, "getting news feed"),
                "news_items": [],
            }
        ), 500
@news_api_bp.route("/subscribe", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def create_subscription() -> Any:
    """
    Create a new subscription for the current user.

    JSON body:
        query: Search query or topic (required)
        subscription_type: "search" or "topic" (default: "search")
        refresh_minutes: Refresh interval in minutes (default: from settings)
        model_provider, model, search_strategy, custom_endpoint:
            Optional model configuration
        name, folder_id, is_active, search_engine, search_iterations,
        questions_per_iteration: Optional additional fields

    Returns:
        JSON result of the API call; 400 for invalid JSON, a missing query,
        or a ValueError from the API; 500 for any other failure.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        # The body could not be parsed as JSON at all.
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        # Collect every argument for the API call in one place. Only
        # search_strategy, subscription_type and is_active get a default here.
        subscription_kwargs = {
            "user_id": get_user_id(),
            "query": payload.get("query"),
            "subscription_type": payload.get("subscription_type", "search"),
            # None lets api.py apply its own default interval
            "refresh_minutes": payload.get("refresh_minutes"),
            "model_provider": normalize_provider(payload.get("model_provider")),
            "model": payload.get("model"),
            "search_strategy": payload.get(
                "search_strategy", "news_aggregation"
            ),
            "custom_endpoint": payload.get("custom_endpoint"),
            "name": payload.get("name"),
            "folder_id": payload.get("folder_id"),
            "is_active": payload.get("is_active", True),
            "search_engine": payload.get("search_engine"),
            "search_iterations": payload.get("search_iterations"),
            "questions_per_iteration": payload.get("questions_per_iteration"),
        }

        # The query is the only mandatory field.
        if not subscription_kwargs["query"]:
            return jsonify({"error": "query is required"}), 400

        return jsonify(api.create_subscription(**subscription_kwargs))

    except ValueError as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 500
@news_api_bp.route("/vote", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def vote_on_news() -> Any:
    """
    Record the current user's vote on a news item.

    JSON body:
        card_id: ID of the news card
        vote: "up" or "down"

    Returns:
        JSON result of the API call; 400 when a field is missing or invalid,
        404 when the API reports the resource as not found, 500 otherwise.
    """
    try:
        body = request.get_json()
        user_id = get_user_id()

        card_id = body.get("card_id")
        vote = body.get("vote")

        # Both fields must be present and non-empty.
        if not (card_id and vote):
            return jsonify({"error": "card_id and vote are required"}), 400

        outcome = api.submit_feedback(
            card_id=card_id, user_id=user_id, vote=vote
        )
        return jsonify(outcome)

    except ValueError as e:
        # A ValueError whose text mentions "not found" means a missing resource.
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 400
    except Exception as e:
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 500
@news_api_bp.route("/feedback/batch", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def get_batch_feedback() -> Any:
    """
    Get feedback (votes) for multiple news cards.

    JSON body:
        card_ids: List of card IDs

    Returns:
        JSON result of the API call, or ``{"votes": {}}`` when no card IDs
        are supplied; 404 when the API reports the resource as not found,
        400 for other ValueErrors, 500 for any other failure.
    """
    try:
        data = request.get_json()
        card_ids = data.get("card_ids", [])
        # Nothing to look up: answer immediately without calling the API.
        if not card_ids:
            return jsonify({"votes": {}})

        # Get current user
        user_id = get_user_id()

        # Call the direct API function
        result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id)

        return jsonify(result)

    except ValueError as e:
        error_msg = str(e)
        if "not found" in error_msg.lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "getting votes")}), 400
    except Exception as e:
        # safe_error_message already logs the exception; logging it here as
        # well produced a duplicate entry for every failure.
        return jsonify({"error": safe_error_message(e, "getting votes")}), 500
@news_api_bp.route("/feedback/<card_id>", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def submit_feedback(card_id: str) -> Any:
    """
    Record the current user's vote for the news card named in the URL.

    JSON body:
        vote: "up" or "down"

    Returns:
        JSON result of the API call; 400 when the vote is missing or invalid,
        404 when the API reports the resource as not found, 500 otherwise.
    """
    try:
        body = request.get_json()
        user_id = get_user_id()
        vote = body.get("vote")

        if not vote:
            return jsonify({"error": "vote is required"}), 400

        outcome = api.submit_feedback(
            card_id=card_id, user_id=user_id, vote=vote
        )
        return jsonify(outcome)

    except ValueError as e:
        # Classify the ValueError by its message text.
        lowered = str(e).lower()
        if "not found" in lowered:
            return jsonify({"error": "Resource not found"}), 404
        if "must be" in lowered:
            return jsonify({"error": "Invalid input value"}), 400
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 500
@news_api_bp.route("/research/<card_id>", methods=["POST"])
@login_required
def research_news_item(card_id: str) -> Any:
    """
    Perform deeper research on a news item.

    JSON body (optional):
        depth: "quick", "detailed", or "report" (default: "quick")

    Returns:
        JSON result of the API call, or a 500 error payload on failure.
    """
    try:
        # The body is optional. silent=True makes a missing, non-JSON, or
        # malformed body yield None instead of raising, so the request falls
        # back to the default depth rather than ending up in the 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # None, or valid JSON that is not an object (list, string, ...).
            data = {}
        depth = data.get("depth", "quick")

        # Call the API function which handles the research
        result = api.research_news_item(card_id, depth)

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "researching news item")}
        ), 500
@news_api_bp.route("/subscriptions/current", methods=["GET"])
@login_required
def get_current_user_subscriptions() -> Any:
    """Get all subscriptions for the current user.

    Returns:
        JSON result of ``api.get_subscriptions``; a 500 payload when the
        result carries an "error" key or an exception is raised.
    """
    try:
        # Get current user
        user_id = get_user_id()

        # The previous message said "news feed", copied from the feed route.
        logger.debug(f"Getting subscriptions for user {user_id}")

        # Use the API function
        result = api.get_subscriptions(user_id)
        if "error" in result:
            # Log the real error, but send the client only a generic message.
            logger.error(
                f"Error getting subscriptions for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Failed to retrieve subscriptions"}), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscriptions")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"])
@login_required
def get_subscription(subscription_id: str) -> Any:
    """Return one subscription, looked up by the ID in the URL.

    Returns:
        The subscription as JSON; 400 for an empty or placeholder ID,
        404 when nothing is found, 500 on unexpected failure.
    """
    try:
        # Reject empty IDs and the literal strings "null" / "undefined".
        if not subscription_id or subscription_id in ("null", "undefined"):
            return jsonify({"error": "Invalid subscription ID"}), 400

        found = api.get_subscription(subscription_id)
        if found:
            return jsonify(found)
        return jsonify({"error": "Subscription not found"}), 404

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"])
@login_required
@require_json_body(error_message="No JSON data provided")
def update_subscription(subscription_id: str) -> Any:
    """Update a subscription.

    Only fields present in the JSON body are changed. Recognized fields:
    query, name, refresh_minutes, is_active, folder_id, model_provider,
    model, search_strategy, custom_endpoint, search_engine,
    search_iterations, questions_per_iteration.

    Returns:
        JSON result of the API call; 400 for invalid JSON or an API-reported
        error, 404 when the API error mentions "not found", 500 on
        unexpected failure.
    """
    try:
        data = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        # Map fields from request to storage format
        field_mapping = {
            "query": "query_or_topic",
            "name": "name",
            "refresh_minutes": "refresh_interval_minutes",
            "is_active": "is_active",
            "folder_id": "folder_id",
            "model_provider": "model_provider",
            "model": "model",
            "search_strategy": "search_strategy",
            "custom_endpoint": "custom_endpoint",
            "search_engine": "search_engine",
            "search_iterations": "search_iterations",
            "questions_per_iteration": "questions_per_iteration",
        }

        # Copy across only the fields the client actually sent.
        update_data = {
            storage_field: data[request_field]
            for request_field, storage_field in field_mapping.items()
            if request_field in data
        }

        # Keep the stored provider in the same normalized form that
        # create_subscription uses.
        if "model_provider" in update_data:
            update_data["model_provider"] = normalize_provider(
                update_data["model_provider"]
            )

        # Update subscription
        result = api.update_subscription(subscription_id, update_data)

        if "error" in result:
            # Sanitize error message before returning to client, but pick
            # the status code from the original text.
            original_error = result["error"]
            result["error"] = safe_error_message(
                Exception(original_error), "updating subscription"
            )
            if "not found" in original_error.lower():
                return jsonify(result), 404
            return jsonify(result), 400

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"])
@login_required
def delete_subscription(subscription_id: str) -> Any:
    """Remove the subscription identified by the URL.

    Returns:
        A success payload when the API reports deletion, 404 when it does
        not, 500 on unexpected failure.
    """
    try:
        # A falsy result from the API is reported as "not found".
        if not api.delete_subscription(subscription_id):
            return jsonify({"error": "Subscription not found"}), 404

        confirmation = {
            "status": "success",
            "message": f"Subscription {subscription_id} deleted",
        }
        return jsonify(confirmation)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "deleting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"])
@login_required
def run_subscription_now(subscription_id: str) -> Any:
    """Start a research run for one of the current user's subscriptions.

    The subscription is looked up among the user's own subscriptions, any
    ``YYYY-MM-DD`` placeholder in its query is replaced with the current
    local date, and the request is posted to this server's
    ``/research/api/start`` endpoint.

    Returns:
        A success payload with the research ID and progress URL; 404 when
        the subscription is not among the user's; the upstream status code
        when the start request is rejected; 500 otherwise.
    """
    try:
        # Only the caller's own subscriptions are searched.
        listing = api.get_subscriptions(get_user_id())
        subscription = next(
            (
                candidate
                for candidate in listing.get("subscriptions", [])
                if candidate["id"] == subscription_id
            ),
            None,
        )
        if not subscription:
            return jsonify({"error": "Subscription not found"}), 404

        from flask import session
        from .core.utils import get_local_date_string
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager

        # The date string is derived from the user's settings.
        with get_user_db_session(session["username"]) as db:
            current_date = get_local_date_string(SettingsManager(db))

        # Only the literal YYYY-MM-DD placeholder is substituted; real dates
        # already present in the query are left alone.
        original_query = subscription["query"]
        processed_query = original_query.replace("YYYY-MM-DD", current_date)

        metadata = {
            "is_news_search": True,
            "search_type": "news_analysis",
            "display_in": "news_feed",
            "subscription_id": subscription_id,
            "triggered_by": "manual",
            "original_query": original_query,  # still contains the placeholder
            "processed_query": processed_query,  # placeholder replaced
            "news_date": current_date,  # the date that was substituted
            "title": subscription.get("name") or None,
        }
        # NOTE(review): these fallbacks apply only when the key is absent;
        # a key present with value None is passed through as None — confirm
        # that this is intended.
        request_data = {
            "query": processed_query,
            "mode": "quick",
            "model_provider": subscription.get("model_provider", "ollama"),
            "model": subscription.get("model", "llama3"),
            "strategy": subscription.get("search_strategy", "news_aggregation"),
            "metadata": metadata,
        }

        if subscription.get("custom_endpoint"):
            request_data["custom_endpoint"] = subscription["custom_endpoint"]

        # Post to the research API on the host this request arrived on.
        start_url = f"{request.host_url.rstrip('/')}/research/api/start"
        response = safe_post(
            start_url,
            json=request_data,
            headers={"Content-Type": "application/json"},
            allow_localhost=True,
            allow_private_ips=True,
        )

        if not response.ok:
            # Pass the upstream status code straight through.
            return jsonify(
                {"error": f"Failed to start research: {response.status_code}"}
            ), response.status_code

        data = response.json()
        if data.get("status") not in ("success", "queued"):
            return jsonify(
                {"error": data.get("message", "Failed to start research")}
            ), 500

        research_id = data.get("research_id")
        return jsonify(
            {
                "status": "success",
                "message": "Research started",
                "research_id": research_id,
                "url": f"/progress/{research_id}",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "running subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"])
@login_required
def get_subscription_history(subscription_id: str) -> Any:
    """Get research history for a subscription.

    Query params:
        limit: Maximum number of history entries
            (default: the ``news.feed.default_limit`` setting)

    Returns:
        JSON result of the API call; 400 for a non-integer ``limit``;
        500 when the API reports an error or an exception is raised.
    """
    try:
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        # A malformed client-supplied limit is a client error (400) rather
        # than something for the generic 500 handler.
        raw_limit = request.args.get("limit")
        if raw_limit is None:
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                return jsonify(
                    {"error": "limit must be an integer", "history": []}
                ), 400

        result = api.get_subscription_history(subscription_id, limit)
        if "error" in result:
            # Log the real error, but send the client only a generic message.
            logger.error(
                f"Error getting subscription history: {result['error']}"
            )
            return jsonify(
                {
                    "error": "Failed to retrieve subscription history",
                    "history": [],
                }
            ), 500
        return jsonify(result)
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription history")}
        ), 500
@news_api_bp.route("/preferences", methods=["POST"])
@login_required
@require_json_body(error_message="No JSON data provided")
def save_preferences() -> Any:
    """Store the current user's news preferences.

    JSON body:
        preferences: Preferences object (default: empty)

    Returns:
        JSON result of the API call, or a 500 error payload on failure.
    """
    try:
        body = request.get_json()
        user_id = get_user_id()

        # A missing "preferences" key is treated as an empty object.
        saved = api.save_news_preferences(user_id, body.get("preferences", {}))
        return jsonify(saved)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "saving preferences")}
        ), 500
# NOTE(review): unlike the other routes in this file, this one has no
# @login_required — confirm that it is meant to be public.
@news_api_bp.route("/categories", methods=["GET"])
def get_categories() -> Any:
    """Return the news category distribution reported by the API.

    Returns:
        JSON result of ``api.get_news_categories``, or a 500 error payload.
    """
    try:
        categories = api.get_news_categories()
        return jsonify(categories)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting categories")}
        ), 500
@news_api_bp.route("/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status() -> Any:
    """Report the state of the news scheduler.

    When ``news.scheduler.allow_api_control`` is enabled the counts cover
    all users; otherwise they cover only the current user's session and jobs.

    Returns:
        JSON status payload, or a 500 error payload on failure.
    """
    try:
        logger.info("Scheduler status endpoint called")
        from flask import session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        username = session["username"]
        show_all = get_env_setting("news.scheduler.allow_api_control", False)
        logger.info(
            f"Scheduler instance obtained: is_running={scheduler.is_running}"
        )

        # The status is assembled by hand (the original notes this avoids a
        # potential deadlock).
        has_sessions = hasattr(scheduler, "user_sessions")

        if not has_sessions:
            active_users = 0
        elif show_all:
            active_users = len(scheduler.user_sessions)
        else:
            # Without global visibility only the caller's session counts.
            active_users = 1 if username in scheduler.user_sessions else 0

        status = {
            "scheduler_available": True,  # APScheduler is installed and working
            "is_running": scheduler.is_running,
            "config": scheduler.config.copy()
            if hasattr(scheduler, "config")
            else {},
            "active_users": active_users,
            "total_scheduled_jobs": 0,
        }

        # Count the jobs tracked in the per-user session bookkeeping.
        if has_sessions:
            if show_all:
                tracked_job_sets = [
                    sess.get("scheduled_jobs", set())
                    for sess in scheduler.user_sessions.values()
                ]
            else:
                own_session = scheduler.user_sessions.get(username, {})
                tracked_job_sets = [own_session.get("scheduled_jobs", set())]
            status["total_scheduled_jobs"] = sum(
                len(job_ids) for job_ids in tracked_job_sets
            )

        # Also report what APScheduler itself has registered.
        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            try:
                visible_jobs = scheduler.scheduler.get_jobs()
                if not show_all:
                    visible_jobs = [
                        job
                        for job in visible_jobs
                        if _is_job_owned_by_user(job, username, scheduler)
                    ]
                status["apscheduler_job_count"] = len(visible_jobs)

                # Only the first 10 jobs are listed in detail.
                job_summaries = []
                for job in visible_jobs[:10]:
                    next_run = (
                        job.next_run_time.isoformat()
                        if job.next_run_time
                        else None
                    )
                    job_summaries.append(
                        {"id": job.id, "name": job.name, "next_run": next_run}
                    )
                status["apscheduler_jobs"] = job_summaries
            except Exception:
                logger.exception("Error getting APScheduler jobs")
                status["apscheduler_job_count"] = 0

        logger.info(f"Status built: {list(status.keys())}")

        # The JS client expects the same count under "scheduled_jobs" too.
        status["scheduled_jobs"] = status.get("total_scheduled_jobs", 0)

        logger.info(
            f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}"
        )
        return jsonify(status)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler status")}
        ), 500
@news_api_bp.route("/scheduler/start", methods=["POST"])
@login_required
@scheduler_control_required
def start_scheduler() -> Any:
    """Start the news subscription scheduler if it is not already running.

    Returns:
        A success payload with the active user count, a 200 message when the
        scheduler was already running, or a 500 error payload.
    """
    try:
        from flask import current_app
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()

        # Starting twice is not an error; just report the current state.
        if scheduler.is_running:
            return jsonify({"message": "Scheduler is already running"}), 200

        scheduler.start()

        # Keep the app-level reference pointing at the started instance.
        current_app.news_scheduler = scheduler  # type: ignore[attr-defined,unused-ignore]

        logger.info("News scheduler started via API")
        started = {
            "status": "success",
            "message": "Scheduler started",
            "active_users": len(scheduler.user_sessions),
        }
        return jsonify(started)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "starting scheduler")}
        ), 500
@news_api_bp.route("/scheduler/stop", methods=["POST"])
@login_required
@scheduler_control_required
def stop_scheduler() -> Any:
    """Stop the scheduler instance attached to the Flask app, if any.

    Returns:
        A success payload when stopped, a 200 message when it was not
        running, 404 when the app holds no scheduler, 500 on failure.
    """
    try:
        from flask import current_app

        # A missing attribute and a falsy value are treated alike.
        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"message": "No scheduler instance found"}), 404

        if not scheduler.is_running:
            return jsonify({"message": "Scheduler is not running"}), 200

        scheduler.stop()
        logger.info("News scheduler stopped via API")
        return jsonify({"status": "success", "message": "Scheduler stopped"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "stopping scheduler")}
        ), 500
@news_api_bp.route("/scheduler/check-now", methods=["POST"])
@login_required
@scheduler_control_required
def check_subscriptions_now() -> Any:
    """Manually trigger subscription checking for the current user.

    Counts the user's due subscriptions, then schedules a one-off job about
    one second in the future that runs the scheduler's overdue check for
    that user.

    Returns:
        A success payload with the due-subscription count; 503 when the
        scheduler is missing or not running; 401 when no user can be
        resolved; 500 on unexpected failure.
    """
    try:
        from flask import current_app

        if (
            not hasattr(current_app, "news_scheduler")
            or not current_app.news_scheduler
        ):
            return jsonify({"error": "Scheduler not initialized"}), 503

        scheduler = current_app.news_scheduler
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 503

        # Resolve the user before touching the database so an
        # unauthenticated request fails fast.
        username = get_user_id()
        if not username:
            return jsonify({"error": "No authenticated user"}), 401

        from ..database.models import NewsSubscription as BaseSubscription
        from datetime import datetime, timedelta, timezone

        # Count subscriptions that are due. The session is opened for the
        # resolved user explicitly, as the other routes in this file do.
        # NOTE(review): this filters on is_active while
        # check_overdue_subscriptions filters on status == "active" —
        # confirm which column the model really has.
        with get_user_db_session(username) as db:
            now = datetime.now(timezone.utc)
            count = (
                db.query(BaseSubscription)
                .filter(
                    BaseSubscription.is_active.is_(True),
                    BaseSubscription.next_refresh.is_not(None),
                    BaseSubscription.next_refresh <= now,
                )
                .count()
            )

        # Schedule the check to run shortly; a fixed per-user job id plus
        # replace_existing keeps repeated clicks from stacking up jobs.
        scheduler.scheduler.add_job(
            func=scheduler._wrap_job(
                scheduler._check_user_overdue_subscriptions
            ),
            args=[username],
            trigger="date",
            run_date=datetime.now(timezone.utc) + timedelta(seconds=1),
            id=f"manual_check_{username}",
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "success",
                "message": f"Checking {count} due subscriptions",
                "count": count,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking subscriptions")}
        ), 500
@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"])
@login_required
@scheduler_control_required
def trigger_cleanup() -> Any:
    """Queue the scheduler's cleanup job to run about one second from now.

    Returns:
        A "triggered" payload, 400 when the scheduler is not running,
        or a 500 error payload.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from datetime import datetime, UTC, timedelta

        scheduler = get_news_scheduler()
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 400

        cleanup_job = scheduler._wrap_job(scheduler._run_cleanup_with_tracking)
        run_at = datetime.now(UTC) + timedelta(seconds=1)

        # A fixed job id with replace_existing means repeated requests
        # replace the pending job instead of adding more.
        scheduler.scheduler.add_job(
            cleanup_job,
            "date",
            run_date=run_at,
            id="manual_cleanup_trigger",
            replace_existing=True,
        )

        acknowledgement = {
            "status": "triggered",
            "message": "Cleanup job will run within seconds",
        }
        return jsonify(acknowledgement)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "triggering cleanup")}
        ), 500
@news_api_bp.route("/scheduler/users", methods=["GET"])
@login_required
def get_active_users() -> Any:
    """Summarize the scheduler's active user sessions.

    Unless ``news.scheduler.allow_api_control`` is enabled, the summary is
    restricted to the entry whose ``user_id`` equals the current user.

    Returns:
        JSON with the number of visible sessions and their summaries,
        or a 500 error payload.
    """
    try:
        from flask import session
        from .subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        username = session["username"]
        visible_sessions = scheduler.get_user_sessions_summary()

        if not get_env_setting("news.scheduler.allow_api_control", False):
            # Without global visibility a user only sees their own session.
            visible_sessions = [
                entry
                for entry in visible_sessions
                if entry.get("user_id") == username
            ]

        return jsonify(
            {"active_users": len(visible_sessions), "users": visible_sessions}
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting active users")}
        ), 500
@news_api_bp.route("/scheduler/stats", methods=["GET"])
@login_required
def scheduler_stats() -> Any:
    """Return debugging details about scheduler sessions and jobs.

    Unless ``news.scheduler.allow_api_control`` is enabled, only the current
    user's session and jobs are included.

    Returns:
        JSON debug payload, or a 500 error payload on failure.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from flask import session

        scheduler = get_news_scheduler()
        username = session["username"]

        debug_info = {
            "current_user": username,
            "scheduler_running": scheduler.is_running,
            "user_sessions": {},
            "apscheduler_jobs": [],
        }

        show_all = get_env_setting("news.scheduler.allow_api_control", False)

        # Per-user session details.
        if hasattr(scheduler, "user_sessions"):
            for user, session_info in scheduler.user_sessions.items():
                if not (show_all or user == username):
                    continue

                # Reported as a boolean only; the stored value is not exposed.
                has_password = bool(scheduler._credential_store.retrieve(user))
                last_activity = session_info.get("last_activity")
                debug_info["user_sessions"][user] = {
                    "has_password": has_password,
                    "last_activity": last_activity.isoformat()
                    if last_activity
                    else None,
                    "scheduled_jobs_count": len(
                        session_info.get("scheduled_jobs", set())
                    ),
                }

        # Jobs registered with APScheduler itself.
        if hasattr(scheduler, "scheduler") and scheduler.scheduler:
            jobs = scheduler.scheduler.get_jobs()
            if not show_all:
                jobs = [
                    job
                    for job in jobs
                    if _is_job_owned_by_user(job, username, scheduler)
                ]

            job_details = []
            for job in jobs:
                next_run = (
                    job.next_run_time.isoformat() if job.next_run_time else None
                )
                job_details.append(
                    {
                        "id": job.id,
                        "name": job.name,
                        "next_run": next_run,
                        "trigger": str(job.trigger),
                    }
                )
            debug_info["apscheduler_jobs"] = job_details

        return jsonify(debug_info)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler stats")}
        ), 500
@news_api_bp.route("/check-overdue", methods=["POST"])
@login_required
def check_overdue_subscriptions():
    """Start research for every overdue subscription of the current user.

    A subscription is treated as overdue when its status is ``"active"``
    and its ``next_refresh`` is not later than the current UTC time. For
    each one, a POST is sent to this server's own ``/research/api/start``
    endpoint, forwarding the caller's ``session`` cookie when present.
    When the reply's ``status`` is ``"success"`` or ``"queued"`` the
    subscription's ``last_refresh`` / ``next_refresh`` are advanced and
    committed.

    Returns:
        JSON with ``status``, ``overdue_found``, ``started`` and
        ``results`` (one entry per subscription carrying either a
        ``research_id`` or an ``error``). A failure of a single
        subscription is recorded in ``results`` and does not abort the
        others; a failure outside the loop yields ``{"error": ...}`` with
        HTTP 500.
    """
    try:
        from datetime import datetime, UTC, timedelta

        from flask import session

        from ..database.models.news import NewsSubscription
        from ..database.session_context import get_user_db_session
        from ..settings.manager import SettingsManager
        from .core.utils import get_local_date_string

        username = session["username"]

        # Everything about the outgoing call that does not depend on the
        # subscription is computed once, before the loop.
        # NOTE(review): the target URL is derived from the incoming
        # request's Host header and the caller's session cookie is
        # forwarded to it - confirm the deployment restricts which Host
        # values are accepted.
        base_url = request.host_url.rstrip("/")
        start_url = f"{base_url}/research/api/start"
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            # Only send a Cookie header when there is a cookie to forward.
            headers["Cookie"] = f"session={session_cookie}"

        overdue_count = 0
        results = []
        with get_user_db_session(username) as db:
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.status == "active",
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            logger.info(
                f"Found {len(overdue_subs)} overdue subscriptions for {username}"
            )

            # Date string used to fill "YYYY-MM-DD" placeholders in queries.
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

            for sub in overdue_subs:
                try:
                    logger.info(
                        f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}"
                    )

                    query = sub.query_or_topic.replace(
                        "YYYY-MM-DD", current_date
                    )

                    request_data = {
                        "query": query,
                        "mode": "quick",
                        "model_provider": sub.model_provider or "ollama",
                        "model": sub.model or "llama3",
                        "strategy": sub.search_strategy or "news_aggregation",
                        "metadata": {
                            "is_news_search": True,
                            "search_type": "news_analysis",
                            "display_in": "news_feed",
                            "subscription_id": str(sub.id),
                            "triggered_by": "overdue_check",
                            "original_query": sub.query_or_topic,
                            "processed_query": query,
                            "news_date": current_date,
                            "title": sub.name or None,
                        },
                    }

                    # Optional search parameters are only sent when set.
                    if sub.search_engine:
                        request_data["search_engine"] = sub.search_engine
                    if sub.custom_endpoint:
                        request_data["custom_endpoint"] = sub.custom_endpoint

                    response = safe_post(
                        start_url,
                        json=request_data,
                        headers=headers,
                        timeout=30,
                        allow_localhost=True,
                        allow_private_ips=True,
                    )

                    if response.ok:
                        result = response.json()
                    else:
                        result = {
                            "status": "error",
                            "error": f"HTTP {response.status_code}: {response.text}",
                        }

                    if result.get("status") in ("success", "queued"):
                        overdue_count += 1

                        # Advance the schedule so the subscription is no
                        # longer picked up as overdue.
                        refreshed_at = datetime.now(UTC)
                        sub.last_refresh = refreshed_at
                        sub.next_refresh = refreshed_at + timedelta(
                            minutes=sub.refresh_interval_minutes
                        )
                        db.commit()

                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "research_id": result.get("research_id"),
                            }
                        )
                    else:
                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "error": result.get(
                                    "error", "Failed to start research"
                                ),
                            }
                        )
                except Exception as e:
                    # One failing subscription must not stop the others.
                    logger.exception(f"Error running subscription {sub.id}")
                    results.append(
                        {
                            "id": str(sub.id),
                            "name": sub.name or sub.query_or_topic[:50],
                            "error": safe_error_message(
                                e, "running subscription"
                            ),
                        }
                    )

            return jsonify(
                {
                    "status": "success",
                    "overdue_found": len(overdue_subs),
                    "started": overdue_count,
                    "results": results,
                }
            )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking overdue subscriptions")}
        ), 500
1234# Folder and subscription management routes
@news_api_bp.route("/subscription/folders", methods=["GET"])
@login_required
def get_folders():
    """Return the current user's subscription folders as a JSON list.

    Any exception is converted into ``{"error": ...}`` with HTTP 500.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            user_folders = folder_manager.get_user_folders(owner_id)
            serialized = [entry.to_dict() for entry in user_folders]
            return jsonify(serialized)

    except Exception as exc:
        message = safe_error_message(exc, "getting folders")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/folders", methods=["POST"])
@login_required
@require_json_body()
def create_folder():
    """Create a subscription folder from the JSON request body.

    Responds 400 when ``name`` is missing or empty, 409 when a folder with
    that name already exists, 201 with the new folder on success, and 500
    with an ``error`` payload if anything raises.
    """
    try:
        payload = request.json
        folder_name = payload.get("name")
        if not folder_name:
            return jsonify({"error": "Folder name is required"}), 400

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)

            # Reject duplicates by name before creating anything.
            same_name_query = db_session.query(SubscriptionFolder).filter_by(
                name=folder_name
            )
            if same_name_query.first():
                return jsonify({"error": "Folder already exists"}), 409

            created = folder_manager.create_folder(
                name=folder_name,
                description=payload.get("description"),
            )
            return jsonify(created.to_dict()), 201

    except Exception as exc:
        message = safe_error_message(exc, "creating folder")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"])
@login_required
@require_json_body()
def update_folder(folder_id):
    """Apply the JSON body's fields to the folder identified by *folder_id*.

    The body is passed to ``FolderManager.update_folder`` as keyword
    arguments. Responds 404 when the manager returns nothing, otherwise
    the updated folder; any exception becomes a 500 ``error`` payload.
    """
    try:
        changes = request.json
        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            updated = folder_manager.update_folder(folder_id, **changes)

            if updated:
                return jsonify(updated.to_dict())
            return jsonify({"error": "Folder not found"}), 404

    except Exception as exc:
        message = safe_error_message(exc, "updating folder")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"])
@login_required
def delete_folder(folder_id):
    """Delete the folder identified by *folder_id*.

    The optional ``move_to`` query parameter is handed to
    ``FolderManager.delete_folder`` unchanged. Responds 404 when the
    manager reports failure, 200 ``{"status": "deleted"}`` otherwise, and
    500 with an ``error`` payload if anything raises.
    """
    try:
        destination = request.args.get("move_to")

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            removed = folder_manager.delete_folder(folder_id, destination)

            if removed:
                return jsonify({"status": "deleted"}), 200
            return jsonify({"error": "Folder not found"}), 404

    except Exception as exc:
        message = safe_error_message(exc, "deleting folder")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"])
@login_required
def get_subscriptions_organized():
    """Return the current user's subscriptions grouped by folder.

    The response is a JSON object mapping each key returned by
    ``FolderManager.get_subscriptions_by_folder`` to a list of serialized
    subscriptions. Any exception becomes a 500 ``error`` payload.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            grouped = folder_manager.get_subscriptions_by_folder(owner_id)

            # Serialize each group's members into plain dicts.
            payload = {
                folder_key: [member.to_dict() for member in members]
                for folder_key, members in grouped.items()
            }
            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting organized subscriptions")
        return jsonify({"error": message}), 500
@news_api_bp.route(
    "/subscription/subscriptions/<subscription_id>", methods=["PUT"]
)
@login_required
@require_json_body()
def update_subscription_folder(subscription_id):
    """Update fields of a subscription (mainly its folder assignment).

    Every key of the JSON body that names an existing, public,
    non-callable attribute of the subscription is assigned, except ``id``,
    ``user_id`` and ``created_at``. When ``refresh_interval_minutes`` is
    part of the body, ``next_refresh`` is recalculated from
    ``last_refresh`` (or from the current UTC time when the subscription
    has never been refreshed).

    Returns:
        The updated subscription as JSON; 400 when
        ``refresh_interval_minutes`` is not a number; 404 when no
        subscription has the given id; 500 with an ``error`` payload if
        anything raises.
    """
    try:
        from datetime import datetime, timedelta, timezone

        # Two-dot relative import, matching the other handlers in this
        # module; a three-dot import would climb above the package.
        from ..database.models.news import (
            NewsSubscription as BaseSubscription,
        )

        data = request.json
        logger.info(
            f"Updating subscription {subscription_id} with data: {data}"
        )

        # Validate before touching the row so a bad value cannot leave a
        # half-applied update behind.
        interval_changed = "refresh_interval_minutes" in data
        new_minutes = data["refresh_interval_minutes"] if interval_changed else None
        if interval_changed and (
            isinstance(new_minutes, bool)
            or not isinstance(new_minutes, (int, float))
        ):
            return jsonify(
                {"error": "refresh_interval_minutes must be a number"}
            ), 400

        with get_user_db_session() as session:
            sub = (
                session.query(BaseSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                return jsonify({"error": "Subscription not found"}), 404

            # The body is untrusted: never overwrite identity fields,
            # private attributes or methods (e.g. ``to_dict``).
            protected = {"id", "user_id", "created_at"}
            for key, value in data.items():
                if key in protected or key.startswith("_"):
                    continue
                if not hasattr(sub, key) or callable(getattr(sub, key)):
                    continue
                setattr(sub, key, value)

            # Keep the schedule consistent with a changed interval.
            if interval_changed:
                interval = timedelta(minutes=new_minutes)
                if sub.last_refresh:
                    sub.next_refresh = sub.last_refresh + interval
                else:
                    sub.next_refresh = datetime.now(timezone.utc) + interval
                logger.info(f"Recalculated next_refresh: {sub.next_refresh}")

            sub.updated_at = datetime.now(timezone.utc)
            session.commit()

            result = sub.to_dict()
            logger.info(
                f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}"
            )
            return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscription/stats", methods=["GET"])
@login_required
def get_subscription_stats():
    """Return subscription statistics for the current user as JSON.

    The payload is whatever ``FolderManager.get_subscription_stats``
    produces. Any exception becomes a 500 ``error`` payload.
    """
    try:
        owner_id = get_user_id()

        with get_user_db_session() as db_session:
            folder_manager = FolderManager(db_session)
            summary = folder_manager.get_subscription_stats(owner_id)
            return jsonify(summary)

    except Exception as exc:
        message = safe_error_message(exc, "getting stats")
        return jsonify({"error": message}), 500
1430# Error handlers
@news_api_bp.errorhandler(400)
def bad_request(e):
    """Render HTTP 400 errors raised within this blueprint as JSON."""
    body = jsonify({"error": "Bad request"})
    return body, 400
@news_api_bp.errorhandler(404)
def not_found(e):
    """Render HTTP 404 errors raised within this blueprint as JSON."""
    body = jsonify({"error": "Resource not found"})
    return body, 404
@news_api_bp.errorhandler(500)
def internal_error(e):
    """Render HTTP 500 errors raised within this blueprint as JSON."""
    body = jsonify({"error": "Internal server error"})
    return body, 500
@news_api_bp.route("/search-history", methods=["GET"])
@login_required
def get_search_history():
    """Return the 20 most recent news searches of the current user.

    Responds with ``{"search_history": [...]}``, newest first. When
    ``current_user()`` yields no username the list is empty. Any
    exception becomes a 500 ``error`` payload.
    """
    try:
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            # No user resolved: answer with an empty history.
            return jsonify({"search_history": []})

        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # The password, if any, is read from Flask's ``g`` object.
        user_password = getattr(g, "user_password", None)

        with get_user_db_session(username, user_password) as db_session:
            newest_first = UserNewsSearchHistory.created_at.desc()
            recent_query = (
                db_session.query(UserNewsSearchHistory)
                .order_by(newest_first)
                .limit(20)
            )
            entries = [row.to_dict() for row in recent_query.all()]
            return jsonify({"search_history": entries})

    except Exception as exc:
        message = safe_error_message(exc, "getting search history")
        return jsonify({"error": message}), 500
@news_api_bp.route("/search-history", methods=["POST"])
@login_required
def add_search_history():
    """Store one search in the current user's news search history.

    Expects a JSON object with a non-empty ``query`` and optional ``type``
    (default ``"filter"``) and ``resultCount`` (default ``0``).

    Returns:
        ``{"status": "success", "id": <new row id>}`` on success; 401 when
        ``current_user()`` yields no username; 400 when the body is
        missing, is not a JSON object, or has no ``query``; 500 with an
        ``error`` payload if anything raises.
    """
    try:
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            return jsonify({"error": "Authentication required"}), 401

        # silent=True makes a missing or malformed JSON body come back as
        # None, so it is answered with the 400 below instead of raising
        # into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, string, ...) cannot
            # carry the expected fields.
            data = None
        logger.info(
            f"add_search_history received data keys: {list(data.keys()) if data else 'None'}"
        )
        if not data or not data.get("query"):
            logger.warning("Invalid search history data: missing query")
            return jsonify({"error": "query is required"}), 400

        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # The password, if any, is read from Flask's ``g`` object.
        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            search_history = UserNewsSearchHistory(
                query=data["query"],
                search_type=data.get("type", "filter"),
                result_count=data.get("resultCount", 0),
            )
            db_session.add(search_history)
            db_session.commit()
            # Read the generated id while the session is still open; the
            # instance may be expired by the commit.
            new_id = search_history.id

        return jsonify({"status": "success", "id": new_id})

    except Exception as e:
        logger.exception("Error adding search history")
        return jsonify(
            {"error": safe_error_message(e, "adding search history")}
        ), 500
@news_api_bp.route("/search-history", methods=["DELETE"])
@login_required
def clear_search_history():
    """Delete every news search history row of the current user.

    Responds ``{"status": "success"}`` both after deleting and when
    ``current_user()`` yields no username. Any exception becomes a 500
    ``error`` payload.
    """
    try:
        from ..web.auth.decorators import current_user

        success_payload = {"status": "success"}

        username = current_user()
        if not username:
            return jsonify(success_payload)

        from flask import g

        from ..database.models import UserNewsSearchHistory
        from ..database.session_context import get_user_db_session

        # The password, if any, is read from Flask's ``g`` object.
        user_password = getattr(g, "user_password", None)

        with get_user_db_session(username, user_password) as db_session:
            all_entries = db_session.query(UserNewsSearchHistory)
            all_entries.delete()
            db_session.commit()

        return jsonify(success_payload)

    except Exception as exc:
        message = safe_error_message(exc, "clearing search history")
        return jsonify({"error": message}), 500