Coverage for src / local_deep_research / news / flask_api.py: 19%
633 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Flask API endpoints for news system.
3Converted from FastAPI to match LDR's Flask architecture.
4"""
6from typing import Dict, Any
7from flask import Blueprint, request, jsonify
8from loguru import logger
10from . import api
11from .folder_manager import FolderManager
12from ..database.models import SubscriptionFolder
13from ..web.auth.decorators import login_required
14from ..database.session_context import get_user_db_session
15from ..utilities.db_utils import get_settings_manager
16from ..security import safe_post
def safe_error_message(e: Exception, context: str = "") -> str:
    """
    Return a safe error message that doesn't expose internal details.

    The real exception is written to the log; only a generic message
    chosen by exception type is returned for use in the HTTP response.

    Args:
        e: The exception
        context: Optional context about what was being attempted

    Returns:
        A generic error message safe for external users
    """
    # Attach ``e`` explicitly instead of relying on the "current" exception:
    # some callers pass a freshly built Exception(...) outside of any
    # ``except`` block, where logger.exception() would have nothing to
    # record and the underlying error text would be lost.
    logger.opt(exception=e).error(f"Error in {context}")

    # Return generic messages based on exception type
    if isinstance(e, ValueError):
        return "Invalid input provided"
    if isinstance(e, KeyError):
        return "Required data missing"
    if isinstance(e, TypeError):
        return "Invalid data format"

    # Generic message for production
    return f"An error occurred{f' while {context}' if context else ''}"
# Create Blueprint - only the /api prefix is set here since the parent blueprint already has /news
46news_api_bp = Blueprint("news_api", __name__, url_prefix="/api")
48# Components are initialized in api.py
def get_user_id():
    """Return the current username from the session, or None if there is none."""
    from ..web.auth.decorators import current_user

    # News endpoints need an authenticated user, so any falsy result
    # from current_user() is reported as None.
    return current_user() or None
@news_api_bp.route("/feed", methods=["GET"])
@login_required
def get_news_feed() -> Dict[str, Any]:
    """
    Get personalized news feed for the current user.

    The user is taken from the session via get_user_id(), not from the
    query string.

    Query params:
        limit: Maximum number of cards to return
            (default: the "news.feed.default_limit" setting)
        use_cache: Whether to use cached news (default: true)
        strategy: Override default recommendation strategy
        focus: Optional focus area for news
        subscription_id: Optional subscription ID passed through to the API

    Returns:
        The feed as JSON. Responds with 400 when ``limit`` is not an
        integer or the API reports a "must be between" error, and with
        500 for any other failure.
    """
    try:
        # Get current user (login_required ensures we have one)
        user_id = get_user_id()
        logger.info(f"News feed requested by user: {user_id}")

        # Get query parameters
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A bad default is a server-side problem; let it fall through
            # to the generic 500 handler below.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # Client sent a non-numeric limit: that is a bad request,
                # not a server error.
                return jsonify(
                    {"error": "limit must be an integer", "news_items": []}
                ), 400

        use_cache = request.args.get("use_cache", "true").lower() == "true"
        strategy = request.args.get("strategy")
        focus = request.args.get("focus")
        subscription_id = request.args.get("subscription_id")

        logger.info(
            f"News feed params: limit={limit}, subscription_id={subscription_id}, focus={focus}"
        )

        # Call the direct API function (now synchronous)
        result = api.get_news_feed(
            user_id=user_id,
            limit=limit,
            use_cache=use_cache,
            focus=focus,
            search_strategy=strategy,
            subscription_id=subscription_id,
        )

        # Check for errors in result
        if "error" in result and result.get("news_items") == []:
            # Sanitize error message before returning to client
            safe_msg = safe_error_message(
                Exception(result["error"]), context="get_news_feed"
            )
            status_code = 400 if "must be between" in result["error"] else 500
            return jsonify({"error": safe_msg, "news_items": []}), status_code

        # Debug: Log the result before returning
        logger.info(
            f"API returning {len(result.get('news_items', []))} news items"
        )
        if result.get("news_items"):
            logger.info(
                f"First item ID: {result['news_items'][0].get('id', 'NO_ID')}"
            )

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {
                "error": safe_error_message(e, "getting news feed"),
                "news_items": [],
            }
        ), 500
@news_api_bp.route("/subscribe", methods=["POST"])
@login_required
def create_subscription() -> Dict[str, Any]:
    """
    Create a new subscription for the current user.

    JSON body:
        query: Search query or topic (required)
        subscription_type: "search" or "topic" (default: "search")
        refresh_minutes: Refresh interval in minutes (default: from settings)
        search_strategy: Strategy name (default: "news_aggregation")
        is_active: Whether the subscription is active (default: true)
        model_provider, model, custom_endpoint, name, folder_id,
        search_engine, search_iterations, questions_per_iteration:
            optional, forwarded as None when absent
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        # Body could not be parsed as JSON
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not payload:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()

        # Optional fields that are passed through as None when missing
        # (refresh_minutes falls back to the default in api.py).
        passthrough_fields = (
            "refresh_minutes",
            "model_provider",
            "model",
            "custom_endpoint",
            "name",
            "folder_id",
            "search_engine",
            "search_iterations",
            "questions_per_iteration",
        )
        options = {field: payload.get(field) for field in passthrough_fields}

        # Optional fields that carry an explicit default
        options["subscription_type"] = payload.get("subscription_type", "search")
        options["search_strategy"] = payload.get(
            "search_strategy", "news_aggregation"
        )
        options["is_active"] = payload.get("is_active", True)

        # Validate required fields
        query = payload.get("query")
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Call the direct API function
        return jsonify(
            api.create_subscription(user_id=user_id, query=query, **options)
        )

    except ValueError as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "creating subscription")}
        ), 500
@news_api_bp.route("/vote", methods=["POST"])
@login_required
def vote_on_news() -> Dict[str, Any]:
    """
    Record the current user's vote on a news item.

    JSON body:
        card_id: ID of the news card
        vote: "up" or "down"
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()

        card_id = body.get("card_id")
        vote = body.get("vote")

        # Both fields must be present and non-empty
        if not (card_id and vote):
            return jsonify({"error": "card_id and vote are required"}), 400

        # Call the direct API function
        return jsonify(
            api.submit_feedback(card_id=card_id, user_id=user_id, vote=vote)
        )

    except ValueError as e:
        # A "not found" ValueError is reported as 404, anything else as 400
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 400
    except Exception as e:
        return jsonify({"error": safe_error_message(e, "submitting vote")}), 500
@news_api_bp.route("/feedback/batch", methods=["POST"])
@login_required
def get_batch_feedback() -> Dict[str, Any]:
    """
    Get feedback (votes) for multiple news cards.

    JSON body:
        card_ids: List of card IDs

    Returns:
        The API result as JSON, or {"votes": {}} when no card IDs are
        given. Responds with 400 for a missing body or a ValueError,
        404 when the ValueError message contains "not found", and 500
        for any other failure.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400

        card_ids = data.get("card_ids", [])
        if not card_ids:
            # Nothing to look up; skip the API call entirely
            return jsonify({"votes": {}})

        # Get current user
        user_id = get_user_id()

        # Call the direct API function
        result = api.get_votes_for_cards(card_ids=card_ids, user_id=user_id)

        return jsonify(result)

    except ValueError as e:
        if "not found" in str(e).lower():
            return jsonify({"error": "Resource not found"}), 404
        return jsonify({"error": safe_error_message(e, "getting votes")}), 400
    except Exception as e:
        # safe_error_message already logs the exception, so no separate
        # logger call is made here (it would log the same error twice).
        return jsonify({"error": safe_error_message(e, "getting votes")}), 500
@news_api_bp.route("/feedback/<card_id>", methods=["POST"])
@login_required
def submit_feedback(card_id: str) -> Dict[str, Any]:
    """
    Record the current user's vote for the news card named in the URL.

    JSON body:
        vote: "up" or "down"
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()

        # Validate
        vote = body.get("vote")
        if not vote:
            return jsonify({"error": "vote is required"}), 400

        # Call the direct API function
        return jsonify(
            api.submit_feedback(card_id=card_id, user_id=user_id, vote=vote)
        )

    except ValueError as e:
        # Map the ValueError text onto a status code and a fixed message
        lowered = str(e).lower()
        if "not found" in lowered:
            return jsonify({"error": "Resource not found"}), 404
        if "must be" in lowered:
            return jsonify({"error": "Invalid input value"}), 400
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 400
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "submitting feedback")}
        ), 500
@news_api_bp.route("/research/<card_id>", methods=["POST"])
@login_required
def research_news_item(card_id: str) -> Dict[str, Any]:
    """
    Perform deeper research on a news item.

    JSON body (optional):
        depth: "quick", "detailed", or "report" (default: "quick")

    Returns:
        The API result as JSON, or a 500 response with a sanitized
        error message if the research call fails.
    """
    try:
        # The body is optional. silent=True makes get_json() return None
        # instead of raising when the body is missing or not JSON, so the
        # "quick" default below actually applies in that case.
        data = request.get_json(silent=True) or {}
        depth = data.get("depth", "quick")

        # Call the API function which handles the research
        result = api.research_news_item(card_id, depth)

        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "researching news item")}
        ), 500
@news_api_bp.route("/subscriptions/current", methods=["GET"])
@login_required
def get_current_user_subscriptions() -> Dict[str, Any]:
    """
    Get all subscriptions for current user.

    Returns:
        The API result as JSON. When the result contains an "error" key
        the detail is logged and a generic 500 response is returned.
    """
    try:
        # Get current user
        user_id = get_user_id()

        logger.debug(f"Getting subscriptions for user {user_id}")

        # Use the API function
        result = api.get_subscriptions(user_id)
        if "error" in result:
            # Keep the detailed error in the log only
            logger.error(
                f"Error getting subscriptions for user {user_id}: {result['error']}"
            )
            return jsonify({"error": "Failed to retrieve subscriptions"}), 500
        return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscriptions")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["GET"])
@login_required
def get_subscription(subscription_id: str) -> Dict[str, Any]:
    """Return one subscription by ID (400 for a placeholder ID, 404 if absent)."""
    try:
        # Reject empty IDs and the literal strings "null" / "undefined"
        if not subscription_id or subscription_id in ("null", "undefined"):
            return jsonify({"error": "Invalid subscription ID"}), 400

        # Get the subscription
        found = api.get_subscription(subscription_id)
        if found:
            return jsonify(found)

        return jsonify({"error": "Subscription not found"}), 404

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["PUT"])
@login_required
def update_subscription(subscription_id: str) -> Dict[str, Any]:
    """
    Update a subscription from the JSON body.

    Only the request fields listed in the mapping below are forwarded,
    renamed to their storage names.
    """
    try:
        payload = request.get_json(force=True)
    except Exception:
        return jsonify({"error": "Invalid JSON data"}), 400

    try:
        if not payload:
            return jsonify({"error": "No JSON data provided"}), 400

        # Request field name -> storage field name
        field_mapping = {
            "query": "query_or_topic",
            "name": "name",
            "refresh_minutes": "refresh_interval_minutes",
            "is_active": "is_active",
            "folder_id": "folder_id",
            "model_provider": "model_provider",
            "model": "model",
            "search_strategy": "search_strategy",
            "custom_endpoint": "custom_endpoint",
            "search_engine": "search_engine",
            "search_iterations": "search_iterations",
            "questions_per_iteration": "questions_per_iteration",
        }

        # Copy across only the fields the client actually sent
        update_data = {
            storage_field: payload[request_field]
            for request_field, storage_field in field_mapping.items()
            if request_field in payload
        }

        # Update subscription
        result = api.update_subscription(subscription_id, update_data)
        if "error" not in result:
            return jsonify(result)

        # Sanitize error message before returning to client; the status
        # code is still decided from the original text.
        original_error = result["error"]
        result["error"] = safe_error_message(
            Exception(original_error), "updating subscription"
        )
        status_code = 404 if "not found" in original_error.lower() else 400
        return jsonify(result), status_code

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>", methods=["DELETE"])
@login_required
def delete_subscription(subscription_id: str) -> Dict[str, Any]:
    """Delete the subscription with the given ID (404 if the API reports failure)."""
    try:
        # Call the direct API function; a falsy result is treated as not found
        if not api.delete_subscription(subscription_id):
            return jsonify({"error": "Subscription not found"}), 404

        return jsonify(
            {
                "status": "success",
                "message": f"Subscription {subscription_id} deleted",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "deleting subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/run", methods=["POST"])
@login_required
def run_subscription_now(subscription_id: str) -> Dict[str, Any]:
    """
    Manually trigger a subscription to run now.

    Looks the subscription up among the current user's subscriptions,
    replaces the "YYYY-MM-DD" placeholder in its query with the current
    local date, and POSTs a research request to /research/api/start on
    this same server.

    Returns:
        On success, JSON with the research_id and a progress URL.
        404 when the subscription is not found, the upstream status code
        when the research request is rejected, 500 otherwise.
    """
    try:
        # Look the subscription up for the logged-in user, the same way
        # the /subscriptions/current endpoint does.
        user_id = get_user_id() or "anonymous"
        subscription_data = api.get_subscriptions(user_id)

        # Find the specific subscription
        subscription = next(
            (
                sub
                for sub in subscription_data.get("subscriptions", [])
                if sub["id"] == subscription_id
            ),
            None,
        )
        if not subscription:
            return jsonify({"error": "Subscription not found"}), 404

        # Get timezone-aware current date using settings
        from flask import session
        from .core.utils import get_local_date_string
        from ..settings.manager import SettingsManager

        username = session.get("username", "anonymous")
        with get_user_db_session(username) as db:
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

        # Replace YYYY-MM-DD placeholder ONLY (not all dates)
        original_query = subscription["query"]
        query = original_query.replace("YYYY-MM-DD", current_date)

        # Build request data similar to news page
        request_data = {
            "query": query,
            "mode": "quick",
            # Use subscription's model configuration if available
            "model_provider": subscription.get(
                "model_provider", "OLLAMA"
            ),  # Default: llm.provider
            "model": subscription.get("model", "llama3"),  # Default: llm.model
            "strategy": subscription.get("search_strategy", "news_aggregation"),
            "metadata": {
                "is_news_search": True,
                "search_type": "news_analysis",
                "display_in": "news_feed",
                "subscription_id": subscription_id,
                "triggered_by": "manual",
                # Store original query with placeholder
                "original_query": original_query,
                # Store processed query with replaced date
                "processed_query": query,
                # Store the actual date used
                "news_date": current_date,
                "title": subscription.get("name") or None,
            },
        }

        # Add custom endpoint if specified
        if subscription.get("custom_endpoint"):
            request_data["custom_endpoint"] = subscription["custom_endpoint"]

        # Use request.host_url to get the actual URL the server is responding on
        base_url = request.host_url.rstrip("/")

        # Forward the caller's session cookie so the internal request is
        # made as the same user, matching check_overdue_subscriptions.
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            headers["Cookie"] = f"session={session_cookie}"

        response = safe_post(
            f"{base_url}/research/api/start",
            json=request_data,
            headers=headers,
            # Bound the wait so a stuck research API cannot hang this request
            timeout=30,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if not response.ok:
            return jsonify(
                {"error": f"Failed to start research: {response.status_code}"}
            ), response.status_code

        data = response.json()
        if data.get("status") not in ("success", "queued"):
            return jsonify(
                {"error": data.get("message", "Failed to start research")}
            ), 500

        return jsonify(
            {
                "status": "success",
                "message": "Research started",
                "research_id": data.get("research_id"),
                "url": f"/progress/{data.get('research_id')}",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "running subscription")}
        ), 500
@news_api_bp.route("/subscriptions/<subscription_id>/history", methods=["GET"])
@login_required
def get_subscription_history(subscription_id: str) -> Dict[str, Any]:
    """
    Get research history for a subscription.

    Query params:
        limit: Maximum number of entries
            (default: the "news.feed.default_limit" setting)

    Returns:
        The API result as JSON. Responds with 400 when ``limit`` is not
        an integer and with 500 when the API reports an error or raises.
    """
    try:
        settings_manager = get_settings_manager()
        default_limit = settings_manager.get_setting("news.feed.default_limit")

        raw_limit = request.args.get("limit")
        if raw_limit is None:
            # A bad default is a server-side problem; let it reach the
            # generic 500 handler below.
            limit = int(default_limit)
        else:
            try:
                limit = int(raw_limit)
            except ValueError:
                # Non-numeric limit from the client is a bad request
                return jsonify(
                    {"error": "limit must be an integer", "history": []}
                ), 400

        result = api.get_subscription_history(subscription_id, limit)
        if "error" in result:
            # Keep the detailed error in the log only
            logger.error(
                f"Error getting subscription history: {result['error']}"
            )
            return jsonify(
                {
                    "error": "Failed to retrieve subscription history",
                    "history": [],
                }
            ), 500
        return jsonify(result)
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting subscription history")}
        ), 500
@news_api_bp.route("/preferences", methods=["POST"])
@login_required
def save_preferences() -> Dict[str, Any]:
    """
    Store the current user's news preferences.

    JSON body:
        preferences: Preferences object (an empty dict is used if absent)
    """
    try:
        body = request.get_json()
        if not body:
            return jsonify({"error": "No JSON data provided"}), 400

        # Get current user
        user_id = get_user_id()

        # Call the direct API function
        saved = api.save_news_preferences(user_id, body.get("preferences", {}))
        return jsonify(saved)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "saving preferences")}
        ), 500
@news_api_bp.route("/categories", methods=["GET"])
def get_categories() -> Dict[str, Any]:
    """Return the news category distribution reported by the API as JSON."""
    try:
        # Call the direct API function
        categories = api.get_news_categories()
    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting categories")}
        ), 500
    else:
        return jsonify(categories)
@news_api_bp.route("/scheduler/status", methods=["GET"])
@login_required
def get_scheduler_status() -> Dict[str, Any]:
    """
    Report the state of the news scheduler.

    The response carries the running flag, a copy of the scheduler
    config, the number of user sessions, the number of scheduled jobs
    across those sessions and, when an APScheduler instance is present,
    its job count plus details of up to ten jobs.
    """
    try:
        logger.info("Scheduler status endpoint called")
        from .subscription_manager.scheduler import get_news_scheduler

        # Get scheduler instance
        scheduler = get_news_scheduler()
        logger.info(
            f"Scheduler instance obtained: is_running={scheduler.is_running}"
        )

        # Build status manually to avoid potential deadlock
        status = {
            "scheduler_available": True,  # APScheduler is installed and working
            "is_running": scheduler.is_running,
        }
        if hasattr(scheduler, "config"):
            status["config"] = scheduler.config.copy()
        else:
            status["config"] = {}

        # Session count and the jobs scheduled across all sessions
        if hasattr(scheduler, "user_sessions"):
            status["active_users"] = len(scheduler.user_sessions)
            status["total_scheduled_jobs"] = sum(
                len(info.get("scheduled_jobs", set()))
                for info in scheduler.user_sessions.values()
            )
        else:
            status["active_users"] = 0
            status["total_scheduled_jobs"] = 0

        # Also count actual APScheduler jobs
        if getattr(scheduler, "scheduler", None):
            try:
                jobs = scheduler.scheduler.get_jobs()
                status["apscheduler_job_count"] = len(jobs)

                # Limit to first 10 for display
                job_details = []
                for job in jobs[:10]:
                    job_details.append(
                        {
                            "id": job.id,
                            "name": job.name,
                            "next_run": job.next_run_time.isoformat()
                            if job.next_run_time
                            else None,
                        }
                    )
                status["apscheduler_jobs"] = job_details
            except Exception:
                logger.exception("Error getting APScheduler jobs")
                status["apscheduler_job_count"] = 0

        logger.info(f"Status built: {list(status.keys())}")

        # Add scheduled_jobs field that JS expects
        status["scheduled_jobs"] = status["total_scheduled_jobs"]

        logger.info(
            f"Returning status: is_running={status.get('is_running')}, active_users={status.get('active_users')}"
        )
        return jsonify(status)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler status")}
        ), 500
@news_api_bp.route("/scheduler/start", methods=["POST"])
@login_required
def start_scheduler() -> Dict[str, Any]:
    """Start the news scheduler unless it is already running."""
    try:
        from flask import current_app
        from .subscription_manager.scheduler import get_news_scheduler

        # Get scheduler instance
        news_scheduler = get_news_scheduler()

        # Nothing to do when it is already running
        if news_scheduler.is_running:
            return jsonify({"message": "Scheduler is already running"}), 200

        news_scheduler.start()

        # Keep a reference on the app; the stop and check-now endpoints
        # read current_app.news_scheduler.
        current_app.news_scheduler = news_scheduler

        logger.info("News scheduler started via API")
        payload = {
            "status": "success",
            "message": "Scheduler started",
            "active_users": len(news_scheduler.user_sessions),
        }
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "starting scheduler")}
        ), 500
@news_api_bp.route("/scheduler/stop", methods=["POST"])
@login_required
def stop_scheduler() -> Dict[str, Any]:
    """Stop the scheduler stored on the app, if there is one and it is running."""
    try:
        from flask import current_app

        # Missing attribute and a falsy value are both "no scheduler"
        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"message": "No scheduler instance found"}), 404

        if not scheduler.is_running:
            return jsonify({"message": "Scheduler is not running"}), 200

        scheduler.stop()
        logger.info("News scheduler stopped via API")
        return jsonify({"status": "success", "message": "Scheduler stopped"})

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "stopping scheduler")}
        ), 500
@news_api_bp.route("/scheduler/check-now", methods=["POST"])
@login_required
def check_subscriptions_now() -> Dict[str, Any]:
    """
    Kick off a subscription check on the running scheduler.

    Counts the active subscriptions whose next_refresh is unset or not
    in the future, starts the scheduler's check in a daemon thread and
    returns that count. Responds with 503 when the app has no scheduler
    or it is not running.
    """
    try:
        from flask import current_app

        scheduler = getattr(current_app, "news_scheduler", None)
        if not scheduler:
            return jsonify({"error": "Scheduler not initialized"}), 503
        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 503

        # Get count of due subscriptions
        from ..database.models import NewsSubscription as BaseSubscription
        from datetime import datetime, timezone

        with get_user_db_session() as session:
            now = datetime.now(timezone.utc)
            is_due = (BaseSubscription.next_refresh.is_(None)) | (
                BaseSubscription.next_refresh <= now
            )
            due_query = session.query(BaseSubscription).filter(
                BaseSubscription.status == "active",
                is_due,
            )
            count = due_query.count()

        # Trigger the check asynchronously
        import threading

        worker = threading.Thread(
            target=scheduler._check_subscriptions, daemon=True
        )
        worker.start()

        return jsonify(
            {
                "status": "success",
                "message": f"Checking {count} due subscriptions",
                "count": count,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking subscriptions")}
        ), 500
@news_api_bp.route("/scheduler/cleanup-now", methods=["POST"])
@login_required
def trigger_cleanup() -> Dict[str, Any]:
    """
    Manually trigger cleanup job.

    Schedules the scheduler's cleanup routine as a one-off job one
    second from now.

    Returns:
        A "triggered" status on success, 400 when the scheduler is not
        running, 500 on any other failure.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from datetime import datetime, UTC, timedelta

        scheduler = get_news_scheduler()

        if not scheduler.is_running:
            return jsonify({"error": "Scheduler is not running"}), 400

        # Schedule cleanup to run in 1 second
        scheduler.scheduler.add_job(
            scheduler._run_cleanup_with_tracking,
            "date",
            run_date=datetime.now(UTC) + timedelta(seconds=1),
            id="manual_cleanup_trigger",
            # The job id is fixed, so a second request arriving before the
            # first job has run must replace it rather than fail on the
            # duplicate id.
            replace_existing=True,
        )

        return jsonify(
            {
                "status": "triggered",
                "message": "Cleanup job will run within seconds",
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "triggering cleanup")}
        ), 500
@news_api_bp.route("/scheduler/users", methods=["GET"])
@login_required
def get_active_users() -> Dict[str, Any]:
    """Return the scheduler's user-session summary together with its length."""
    try:
        from .subscription_manager.scheduler import get_news_scheduler

        summary = get_news_scheduler().get_user_sessions_summary()
        payload = {"active_users": len(summary), "users": summary}
        return jsonify(payload)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting active users")}
        ), 500
@news_api_bp.route("/scheduler/stats", methods=["GET"])
@login_required
def scheduler_stats() -> Dict[str, Any]:
    """
    Return debug information about the scheduler.

    Includes a per-user session summary and the APScheduler job list.
    If the current session user has a scheduler session, this call also
    re-schedules that user's subscriptions.
    """
    try:
        from .subscription_manager.scheduler import get_news_scheduler
        from flask import session

        scheduler = get_news_scheduler()
        username = session.get("username")

        # Debug info
        debug_info = {
            "current_user": username,
            "scheduler_running": scheduler.is_running,
            "user_sessions": {},
            "apscheduler_jobs": [],
        }

        # Get user session info (only whether a password is held, never its value)
        if hasattr(scheduler, "user_sessions"):
            for user, session_info in scheduler.user_sessions.items():
                last_activity = session_info.get("last_activity")
                scheduled = session_info.get("scheduled_jobs", set())
                debug_info["user_sessions"][user] = {
                    "has_password": bool(session_info.get("password")),
                    "last_activity": last_activity.isoformat()
                    if last_activity
                    else None,
                    "scheduled_jobs_count": len(scheduled),
                }

        # Get APScheduler jobs
        if getattr(scheduler, "scheduler", None):
            job_entries = []
            for job in scheduler.scheduler.get_jobs():
                job_entries.append(
                    {
                        "id": job.id,
                        "name": job.name,
                        "next_run": job.next_run_time.isoformat()
                        if job.next_run_time
                        else None,
                        "trigger": str(job.trigger),
                    }
                )
            debug_info["apscheduler_jobs"] = job_entries

        # Force schedule for current user
        if username and username in scheduler.user_sessions:
            logger.info(f"Forcing schedule update for {username}")
            scheduler._schedule_user_subscriptions(username)
            debug_info["forced_schedule"] = True

        return jsonify(debug_info)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "getting scheduler stats")}
        ), 500
@news_api_bp.route("/check-overdue", methods=["POST"])
@login_required
def check_overdue_subscriptions():
    """
    Check and run all overdue subscriptions for the current user.

    For every active subscription whose next_refresh is not in the
    future, POSTs a research request to /research/api/start on this same
    server (forwarding the caller's session cookie). When the research
    is accepted, the subscription's last_refresh / next_refresh are
    updated and committed.

    Returns:
        JSON with the number of overdue subscriptions found, the number
        started, and a per-subscription result list. A failure for one
        subscription is recorded in its result entry and does not stop
        the others; any other failure yields a 500 response.
    """
    try:
        from flask import session
        from ..database.models.news import NewsSubscription
        from datetime import datetime, UTC, timedelta
        from .core.utils import get_local_date_string
        from ..settings.manager import SettingsManager

        username = session.get("username", "anonymous")

        # Request-level values are the same for every subscription, so
        # work them out once instead of inside the loop.
        # Use request.host_url to get the actual URL the server is responding on
        start_url = f"{request.host_url.rstrip('/')}/research/api/start"

        # Use the session from the current request to maintain
        # authentication; send no Cookie header at all when there is none.
        headers = {"Content-Type": "application/json"}
        session_cookie = request.cookies.get("session")
        if session_cookie:
            headers["Cookie"] = f"session={session_cookie}"

        # Get overdue subscriptions
        overdue_count = 0
        results = []
        with get_user_db_session(username) as db:
            now = datetime.now(UTC)
            overdue_subs = (
                db.query(NewsSubscription)
                .filter(
                    NewsSubscription.status == "active",
                    NewsSubscription.next_refresh <= now,
                )
                .all()
            )

            logger.info(
                f"Found {len(overdue_subs)} overdue subscriptions for {username}"
            )

            # Get timezone-aware current date using settings
            settings_manager = SettingsManager(db)
            current_date = get_local_date_string(settings_manager)

            for sub in overdue_subs:
                try:
                    # Run the subscription using the same pattern as run_subscription_now
                    logger.info(
                        f"Running overdue subscription: {sub.name or sub.query_or_topic[:30]}"
                    )

                    # Update any date placeholders with current date in user's timezone
                    query = sub.query_or_topic.replace(
                        "YYYY-MM-DD", current_date
                    )

                    # Build request data
                    request_data = {
                        "query": query,
                        "mode": "quick",
                        "model_provider": sub.model_provider or "OLLAMA",
                        "model": sub.model or "llama3",
                        "strategy": sub.search_strategy or "news_aggregation",
                        "metadata": {
                            "is_news_search": True,
                            "search_type": "news_analysis",
                            "display_in": "news_feed",
                            "subscription_id": str(sub.id),
                            "triggered_by": "overdue_check",
                            "original_query": sub.query_or_topic,
                            "processed_query": query,
                            "news_date": current_date,
                            "title": sub.name if sub.name else None,
                        },
                    }

                    # Add optional search parameters
                    if sub.search_engine:
                        request_data["search_engine"] = sub.search_engine
                    if sub.custom_endpoint:
                        request_data["custom_endpoint"] = sub.custom_endpoint

                    # Make HTTP request to research API
                    response = safe_post(
                        start_url,
                        json=request_data,
                        headers=headers,
                        timeout=30,
                        allow_localhost=True,
                        allow_private_ips=True,
                    )

                    if response.ok:
                        result = response.json()
                    else:
                        # Keep the raw upstream body in the log; only the
                        # status code goes back to the client.
                        logger.error(
                            f"Research API returned HTTP {response.status_code} "
                            f"for subscription {sub.id}: {response.text}"
                        )
                        result = {
                            "status": "error",
                            "error": f"HTTP {response.status_code}",
                        }

                    if result.get("status") in ("success", "queued"):
                        overdue_count += 1

                        # Update subscription's last/next refresh times
                        sub.last_refresh = datetime.now(UTC)
                        sub.next_refresh = datetime.now(UTC) + timedelta(
                            minutes=sub.refresh_interval_minutes
                        )
                        db.commit()

                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "research_id": result.get("research_id"),
                            }
                        )
                    else:
                        results.append(
                            {
                                "id": str(sub.id),
                                "name": sub.name or sub.query_or_topic[:50],
                                "error": result.get(
                                    "error", "Failed to start research"
                                ),
                            }
                        )
                except Exception as e:
                    # One failing subscription must not abort the rest
                    logger.exception(f"Error running subscription {sub.id}")
                    results.append(
                        {
                            "id": str(sub.id),
                            "name": sub.name or sub.query_or_topic[:50],
                            "error": safe_error_message(
                                e, "running subscription"
                            ),
                        }
                    )

        return jsonify(
            {
                "status": "success",
                "overdue_found": len(overdue_subs),
                "started": overdue_count,
                "results": results,
            }
        )

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "checking overdue subscriptions")}
        ), 500
1144# Folder and subscription management routes
@news_api_bp.route("/subscription/folders", methods=["GET"])
@login_required
def get_folders():
    """Return the current user's folders as a JSON list.

    Each folder is serialized through its ``to_dict()`` method. Any
    unexpected failure is reported as a generic 500 error payload.
    """
    try:
        owner = get_user_id()

        with get_user_db_session() as db:
            user_folders = FolderManager(db).get_user_folders(owner)
            # Build the payload before leaving the session scope.
            payload = [entry.to_dict() for entry in user_folders]
            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting folders")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/folders", methods=["POST"])
@login_required
def create_folder():
    """Create a new subscription folder.

    Expects a JSON object with a required ``name`` and an optional
    ``description``.

    Returns:
        201 with the new folder's ``to_dict()`` payload on success,
        400 when the body is not a JSON object or has no ``name``,
        409 when a folder with the same name already exists,
        500 with a generic message on any unexpected error.
    """
    try:
        # silent=True yields None for a missing, non-JSON or malformed body
        # instead of raising, so bad client input is answered with 400
        # rather than being swallowed by the generic 500 handler below.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or not data.get("name"):
            return jsonify({"error": "Folder name is required"}), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)

            # Check if folder already exists
            existing = (
                session.query(SubscriptionFolder)
                .filter_by(name=data["name"])
                .first()
            )
            if existing:
                return jsonify({"error": "Folder already exists"}), 409

            folder = manager.create_folder(
                name=data["name"],
                description=data.get("description"),
            )

            return jsonify(folder.to_dict()), 201

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "creating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["PUT"])
@login_required
def update_folder(folder_id):
    """Update a folder with the fields supplied in the JSON body.

    The body must be a JSON object; its members are forwarded as keyword
    arguments to ``FolderManager.update_folder``.

    Returns:
        200 with the updated folder's ``to_dict()`` payload,
        400 when the body is not a JSON object,
        404 when the manager returns no folder,
        500 with a generic message on any unexpected error.
    """
    try:
        # A missing/malformed body or a non-object JSON value cannot be
        # expanded with ** below; reject it as a client error up front.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify(
                {"error": "Request body must be a JSON object"}
            ), 400

        with get_user_db_session() as session:
            manager = FolderManager(session)
            folder = manager.update_folder(folder_id, **data)

            if not folder:
                return jsonify({"error": "Folder not found"}), 404

            return jsonify(folder.to_dict())

    except Exception as e:
        return jsonify({"error": safe_error_message(e, "updating folder")}), 500
@news_api_bp.route("/subscription/folders/<folder_id>", methods=["DELETE"])
@login_required
def delete_folder(folder_id):
    """Delete the folder identified by ``folder_id``.

    The optional ``move_to`` query parameter is passed straight through to
    ``FolderManager.delete_folder``. Responds with 404 when the manager
    reports failure and with a generic 500 payload on unexpected errors.
    """
    try:
        destination = request.args.get("move_to")

        with get_user_db_session() as db:
            removed = FolderManager(db).delete_folder(folder_id, destination)

            if removed:
                return jsonify({"status": "deleted"}), 200

            return jsonify({"error": "Folder not found"}), 404

    except Exception as exc:
        message = safe_error_message(exc, "deleting folder")
        return jsonify({"error": message}), 500
@news_api_bp.route("/subscription/subscriptions/organized", methods=["GET"])
@login_required
def get_subscriptions_organized():
    """Return the current user's subscriptions grouped by folder.

    The response is a JSON object mapping each folder key returned by
    ``FolderManager.get_subscriptions_by_folder`` to a list of serialized
    subscriptions.
    """
    try:
        owner = get_user_id()

        with get_user_db_session() as db:
            grouped = FolderManager(db).get_subscriptions_by_folder(owner)

            # Turn every subscription into its dict form, keeping the grouping.
            payload = {
                folder_key: [entry.to_dict() for entry in members]
                for folder_key, members in grouped.items()
            }

            return jsonify(payload)

    except Exception as exc:
        message = safe_error_message(exc, "getting organized subscriptions")
        return jsonify({"error": message}), 500
@news_api_bp.route(
    "/subscription/subscriptions/<subscription_id>", methods=["PUT"]
)
@login_required
def update_subscription_folder(subscription_id):
    """Update a subscription (mainly used for folder assignment).

    The JSON body is an object of field names to new values. Fields that do
    not exist on the subscription, the protected fields ``id``, ``user_id``
    and ``created_at``, private (underscore-prefixed) names and callable
    attributes are ignored. When ``refresh_interval_minutes`` is supplied,
    ``next_refresh`` is recomputed from ``last_refresh`` (or from the current
    UTC time when the subscription has never been refreshed).

    Returns:
        200 with the subscription's ``to_dict()`` payload,
        400 when the body is not a JSON object or the interval is not numeric,
        404 when no subscription has the given id,
        500 with a generic message on any unexpected error.
    """
    try:
        from datetime import datetime, timedelta, timezone

        # Same package-relative path as this module's other model imports
        # (``..database.models``); a three-dot import climbs one level too far.
        from ..database.models import NewsSubscription as BaseSubscription

        # silent=True returns None for a missing or malformed body so that it
        # can be answered with 400 instead of crashing on data.items().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify(
                {"error": "Request body must be a JSON object"}
            ), 400

        logger.info(
            f"Updating subscription {subscription_id} with data: {data}"
        )

        # Validate the interval before touching the row: timedelta() would
        # otherwise raise a TypeError halfway through the update.
        interval_changed = "refresh_interval_minutes" in data
        new_minutes = data.get("refresh_interval_minutes")
        if interval_changed and (
            isinstance(new_minutes, bool)
            or not isinstance(new_minutes, (int, float))
        ):
            return jsonify(
                {"error": "refresh_interval_minutes must be a number"}
            ), 400

        protected_fields = {"id", "user_id", "created_at"}

        with get_user_db_session() as session:
            sub = (
                session.query(BaseSubscription)
                .filter_by(id=subscription_id)
                .first()
            )
            if not sub:
                return jsonify({"error": "Subscription not found"}), 404

            # Update fields. The body is untrusted input, so never let it
            # replace private attributes or methods such as to_dict().
            for key, value in data.items():
                if key in protected_fields or key.startswith("_"):
                    continue
                if not hasattr(sub, key) or callable(getattr(sub, key)):
                    continue
                setattr(sub, key, value)

            # Recalculate next_refresh if refresh_interval_minutes changed
            if interval_changed:
                anchor = sub.last_refresh or datetime.now(timezone.utc)
                sub.next_refresh = anchor + timedelta(minutes=new_minutes)
                logger.info(f"Recalculated next_refresh: {sub.next_refresh}")

            sub.updated_at = datetime.now(timezone.utc)
            session.commit()

            # Serialize inside the session so refreshed attributes can load.
            result = sub.to_dict()
            logger.info(
                f"Updated subscription result: refresh_interval_minutes={result.get('refresh_interval_minutes')}, next_refresh={result.get('next_refresh')}"
            )
            return jsonify(result)

    except Exception as e:
        return jsonify(
            {"error": safe_error_message(e, "updating subscription")}
        ), 500
@news_api_bp.route("/subscription/stats", methods=["GET"])
@login_required
def get_subscription_stats():
    """Return subscription statistics for the current user as JSON.

    The statistics come from ``FolderManager.get_subscription_stats``;
    unexpected failures produce a generic 500 error payload.
    """
    try:
        owner = get_user_id()

        with get_user_db_session() as db:
            summary = FolderManager(db).get_subscription_stats(owner)
            return jsonify(summary)

    except Exception as exc:
        message = safe_error_message(exc, "getting stats")
        return jsonify({"error": message}), 500
1339# Error handlers
@news_api_bp.errorhandler(400)
def bad_request(e):
    """Blueprint handler for HTTP 400: reply with a generic JSON error."""
    payload = {"error": "Bad request"}
    return jsonify(payload), 400
@news_api_bp.errorhandler(404)
def not_found(e):
    """Blueprint handler for HTTP 404: reply with a generic JSON error."""
    payload = {"error": "Resource not found"}
    return jsonify(payload), 404
@news_api_bp.errorhandler(500)
def internal_error(e):
    """Blueprint handler for HTTP 500: reply with a generic JSON error."""
    payload = {"error": "Internal server error"}
    return jsonify(payload), 500
@news_api_bp.route("/search-history", methods=["GET"])
@login_required
def get_search_history():
    """Return up to 20 search-history entries, newest first.

    Responds with ``{"search_history": [...]}``. The list is empty when no
    current user can be resolved; unexpected failures yield a generic 500
    error payload.
    """
    try:
        # Resolve the user behind this request.
        from ..web.auth.decorators import current_user

        user = current_user()
        if not user:
            # No user: answer with an empty history rather than an error.
            return jsonify({"search_history": []})

        # History is stored in the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # The password is read from Flask's g object (set by middleware).
        from flask import g

        secret = getattr(g, "user_password", None)

        with get_user_db_session(user, secret) as db:
            newest_first = UserNewsSearchHistory.created_at.desc()
            recent = db.query(UserNewsSearchHistory).order_by(newest_first)
            rows = recent.limit(20).all()

            entries = [row.to_dict() for row in rows]
            return jsonify({"search_history": entries})

    except Exception as exc:
        message = safe_error_message(exc, "getting search history")
        return jsonify({"error": message}), 500
@news_api_bp.route("/search-history", methods=["POST"])
@login_required
def add_search_history():
    """Add a search to the current user's history.

    Expects a JSON object with a required ``query`` plus optional ``type``
    (defaults to ``"filter"``) and ``resultCount`` (defaults to ``0``).

    Returns:
        200 with ``{"status": "success", "id": <new row id>}``,
        401 when no current user can be resolved,
        400 when the body is not a JSON object or has no ``query``,
        500 with a generic message on any unexpected error.
    """
    try:
        # Get username from session
        from ..web.auth.decorators import current_user

        username = current_user()
        if not username:
            # Not authenticated
            return jsonify({"error": "Authentication required"}), 401

        # silent=True returns None for a missing or malformed body instead of
        # raising into the generic 500 handler; non-object JSON (e.g. a list)
        # is rejected below because it has neither .keys() nor .get().
        data = request.get_json(silent=True)
        has_fields = isinstance(data, dict) and bool(data)
        logger.info(
            f"add_search_history received data keys: {list(data.keys()) if has_fields else 'None'}"
        )
        if not has_fields or not data.get("query"):
            logger.warning("Invalid search history data: missing query")
            return jsonify({"error": "query is required"}), 400

        # Add to user's encrypted database
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # Get password from Flask g object (set by middleware)
        from flask import g

        password = getattr(g, "user_password", None)

        with get_user_db_session(username, password) as db_session:
            search_history = UserNewsSearchHistory(
                query=data["query"],
                search_type=data.get("type", "filter"),
                result_count=data.get("resultCount", 0),
            )
            db_session.add(search_history)
            db_session.commit()

            # Read the generated id while the session is still open.
            return jsonify({"status": "success", "id": search_history.id})

    except Exception as e:
        # safe_error_message() already logs the exception with its traceback,
        # so no separate logger.exception() call is made here.
        return jsonify(
            {"error": safe_error_message(e, "adding search history")}
        ), 500
@news_api_bp.route("/search-history", methods=["DELETE"])
@login_required
def clear_search_history():
    """Delete every search-history row in the current user's database.

    Responds with ``{"status": "success"}``, also when no current user can
    be resolved (nothing is deleted in that case). Unexpected failures
    yield a generic 500 error payload.
    """
    try:
        # Resolve the user behind this request.
        from ..web.auth.decorators import current_user

        user = current_user()
        if not user:
            return jsonify({"status": "success"})

        # History is stored in the user's encrypted database.
        from ..database.session_context import get_user_db_session
        from ..database.models import UserNewsSearchHistory

        # The password is read from Flask's g object (set by middleware).
        from flask import g

        secret = getattr(g, "user_password", None)

        with get_user_db_session(user, secret) as db:
            all_entries = db.query(UserNewsSearchHistory)
            all_entries.delete()
            db.commit()

            return jsonify({"status": "success"})

    except Exception as exc:
        message = safe_error_message(exc, "clearing search history")
        return jsonify({"error": message}), 500