Coverage for src/local_deep_research/news/web.py: 98%
83 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Flask blueprint for news system web routes.
3"""
5from flask import Blueprint, jsonify, render_template
6from loguru import logger
8from ..constants import get_available_strategies
9from ..web.auth.decorators import login_required
10from . import api
12# get_db_setting not available in merged codebase - will use defaults
def create_news_blueprint():
    """
    Create Flask blueprint for news routes.

    The blueprint exposes the HTML pages (news feed, subscription list,
    create/edit subscription form), a ``/health`` endpoint, and registers
    the news API blueprint from ``.flask_api`` as a nested blueprint.

    Returns:
        Flask Blueprint instance with both page routes and API routes
    """
    bp = Blueprint("news", __name__)

    # Import the Flask API blueprint
    from .flask_api import news_api_bp

    # Register the API blueprint as sub-blueprint
    bp.register_blueprint(news_api_bp)  # type: ignore[attr-defined,unused-ignore]

    def _show_all_strategies() -> bool:
        """Check if user has enabled show_all_strategies setting.

        Returns False for anonymous sessions and whenever the setting
        cannot be read.
        """
        from flask import session

        username = session.get("username", "anonymous")
        if username == "anonymous":
            return False
        try:
            from local_deep_research.database.session_context import (
                get_user_db_session,
            )
            from local_deep_research.utilities.db_utils import (
                get_settings_manager,
            )

            with get_user_db_session(username) as db_session:
                settings_manager = get_settings_manager(db_session, username)
                return bool(
                    settings_manager.get_setting(
                        "search.show_all_strategies", False
                    )
                )
        except Exception:
            # Best-effort lookup: a failure must not break page rendering,
            # but leave a trace so the fallback is not completely silent.
            logger.debug(
                "Could not read search.show_all_strategies; "
                "defaulting to False"
            )
            return False

    def _default_subscription_settings() -> dict:
        """Return a fresh dict of built-in subscription form defaults."""
        return {
            "iterations": 3,
            "questions_per_iteration": 5,
            "search_engine": "auto",
            "model_provider": "ollama",
            "model": "",
            "search_strategy": "source-based",
        }

    def _apply_user_settings(username, settings) -> None:
        """Overlay the user's saved settings onto ``settings`` in place.

        Does nothing for anonymous sessions. If the user's database
        session cannot be opened, the error is logged and ``settings``
        keeps whatever values it already had.
        """
        if username == "anonymous":
            return
        try:
            # Imported at call time, as the page handlers originally did.
            from local_deep_research.database.session_context import (
                get_user_db_session,
            )

            with get_user_db_session(username) as db_session:
                load_user_settings(settings, db_session, username)
        except Exception:
            logger.exception(
                "Could not load user settings for subscription form; "
                "using defaults"
            )

    def _render_subscription_form(
        default_settings, subscription=None, error=None
    ):
        """Render the shared create/edit subscription form template."""
        context = {
            "subscription": subscription,
            "default_settings": default_settings,
            "strategies": get_available_strategies(
                show_all=_show_all_strategies()
            ),
        }
        if error is not None:
            # Pass ``error`` only when there is one, so the template
            # context is the same as before for the success paths.
            context["error"] = error
        return render_template(
            "pages/news-subscription-form.html", **context
        )

    # Page routes
    @bp.route("/")
    @login_required
    def news_page():
        """Render the main news page."""
        return render_template(
            "pages/news.html",
            strategies=get_available_strategies(
                show_all=_show_all_strategies()
            ),
        )

    @bp.route("/subscriptions")
    @login_required
    def subscriptions_page():
        """Render the subscriptions management page."""
        return render_template("pages/subscriptions.html")

    @bp.route("/subscriptions/new")
    @login_required
    def new_subscription_page():
        """Render the create subscription page."""
        from flask import session

        # Get username from session
        username = session.get("username", "anonymous")

        # Try to get settings from database, fall back to defaults
        default_settings = _default_subscription_settings()
        _apply_user_settings(username, default_settings)

        return _render_subscription_form(default_settings)

    @bp.route("/subscriptions/<subscription_id>/edit")
    @login_required
    def edit_subscription_page(subscription_id):
        """Render the edit subscription page.

        Renders the form with an ``error`` message when the subscription
        is missing or cannot be loaded.
        """
        from flask import session

        # Get username from session
        username = session.get("username", "anonymous")
        default_settings = _default_subscription_settings()

        # Load the subscription using the API. Only this call is guarded
        # here, so a later settings failure cannot be misreported as a
        # subscription loading error.
        try:
            subscription = api.get_subscription(subscription_id)
        except Exception:
            logger.exception(f"Error loading subscription {subscription_id}")
            return _render_subscription_form(
                default_settings, error="Error loading subscription"
            )

        if not subscription:
            logger.warning(f"Subscription {subscription_id} not found")
            # Could redirect to 404 or subscriptions page
            return _render_subscription_form(
                default_settings, error="Subscription not found"
            )

        logger.info(f"Loaded subscription {subscription_id}: {subscription}")

        # Load user's default settings if logged in
        _apply_user_settings(username, default_settings)

        return _render_subscription_form(
            default_settings, subscription=subscription
        )

    # Health check
    @bp.route("/health")
    def health_check():
        """Check if news system is healthy.

        Returns a JSON body with ``status`` "healthy", or "unhealthy"
        with HTTP 500 when the storage query raises.
        """
        try:
            # Check if database is accessible
            from .core.storage_manager import StorageManager

            storage = StorageManager()

            # Try a simple query
            storage.get_user_feed("health_check", limit=1)

            return jsonify(
                {
                    "status": "healthy",
                    "enabled": True,  # Default: get_db_setting("news.enabled", True)
                    "database": "connected",
                }
            )
        except Exception:
            logger.exception("Health check failed")
            # Generic message only: exception details stay in the log.
            return jsonify(
                {
                    "status": "unhealthy",
                    "error": "An internal error has occurred.",
                }
            ), 500

    return bp
def load_user_settings(default_settings: dict, db_session=None, username=None) -> None:
    """
    Load user settings and update default_settings dictionary.
    Extracted to avoid code duplication as suggested by djpetti.

    ``default_settings`` is modified in place and nothing is returned.
    All values are read before the dictionary is updated, so if reading
    any setting raises, ``default_settings`` is left unchanged. Besides
    the six built-in keys, a ``custom_endpoint`` key is added from
    ``llm.openai_endpoint.url``.

    Args:
        default_settings: Dictionary to update with user settings
        db_session: Database session for accessing settings
        username: Username for settings context
    """
    if not db_session:
        logger.warning("No database session provided, using defaults")
        return

    # (form key, settings key, fallback when the setting is unset)
    setting_sources = (
        ("iterations", "search.iterations", 3),
        ("questions_per_iteration", "search.questions_per_iteration", 5),
        ("search_engine", "search.tool", "auto"),
        ("model_provider", "llm.provider", "ollama"),
        ("model", "llm.model", ""),
        ("search_strategy", "search.search_strategy", "source-based"),
        ("custom_endpoint", "llm.openai_endpoint.url", ""),
    )

    try:
        from ..utilities.db_utils import get_settings_manager

        settings_manager = get_settings_manager(db_session, username)

        default_settings.update(
            {
                form_key: settings_manager.get_setting(setting_key, fallback)
                for form_key, setting_key, fallback in setting_sources
            }
        )
    except Exception:
        # Best-effort: keep the caller's defaults, but record the
        # traceback so the failure can be diagnosed.
        logger.exception("Could not load user settings")