Coverage for src / local_deep_research / news / web.py: 98%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Flask blueprint for news system web routes.
3"""
5from flask import Blueprint, jsonify, render_template
6from loguru import logger
8from ..constants import get_available_strategies
9from . import api
11# get_db_setting not available in merged codebase - will use defaults
def create_news_blueprint():
    """
    Create Flask blueprint for news routes.

    Registers the news API blueprint as a sub-blueprint and attaches the
    page routes (``/``, ``/subscriptions``, ``/subscriptions/new``,
    ``/subscriptions/<subscription_id>/edit``) and the ``/health`` route.

    Returns:
        Flask Blueprint instance with both page routes and API routes
    """
    bp = Blueprint("news", __name__)

    # Import the Flask API blueprint (kept function-local, as before)
    from .flask_api import news_api_bp

    # Register the API blueprint as sub-blueprint
    bp.register_blueprint(news_api_bp)  # type: ignore[attr-defined,unused-ignore]

    form_template = "pages/news-subscription-form.html"

    def _current_username() -> str:
        """Return the session's username, or "anonymous" when absent."""
        from flask import session

        return session.get("username", "anonymous")

    def _fallback_settings() -> dict:
        """Return a fresh dict of the built-in subscription form defaults."""
        # A new dict per call: load_user_settings() mutates it in place.
        return {
            "iterations": 3,
            "questions_per_iteration": 5,
            "search_engine": "auto",
            "model_provider": "ollama",
            "model": "",
            "search_strategy": "source-based",
        }

    def _show_all_strategies() -> bool:
        """Check if user has enabled show_all_strategies setting."""
        username = _current_username()
        if username == "anonymous":
            return False
        try:
            from local_deep_research.database.session_context import (
                get_user_db_session,
            )
            from local_deep_research.utilities.db_utils import (
                get_settings_manager,
            )

            with get_user_db_session(username) as db_session:
                settings_manager = get_settings_manager(db_session, username)
                return bool(
                    settings_manager.get_setting(
                        "search.show_all_strategies", False
                    )
                )
        except Exception:
            # Best effort: any failure means "do not show all strategies".
            return False

    def _render_subscription_form(subscription, default_settings, error=None):
        """Render the subscription form template shared by new/edit pages."""
        context = {
            "subscription": subscription,
            "default_settings": default_settings,
            "strategies": get_available_strategies(
                show_all=_show_all_strategies()
            ),
        }
        # Only pass ``error`` when there is one, so the template sees the
        # same set of variables as it did before this helper existed.
        if error is not None:
            context["error"] = error
        return render_template(form_template, **context)

    # Page routes
    @bp.route("/")
    def news_page():
        """Render the main news page."""
        return render_template(
            "pages/news.html",
            strategies=get_available_strategies(
                show_all=_show_all_strategies()
            ),
        )

    @bp.route("/subscriptions")
    def subscriptions_page():
        """Render the subscriptions management page."""
        return render_template("pages/subscriptions.html")

    @bp.route("/subscriptions/new")
    def new_subscription_page():
        """Render the create subscription page."""
        username = _current_username()

        # Start from the built-in defaults; user settings overlay them.
        default_settings = _fallback_settings()

        # Only try to get settings if user is logged in
        if username != "anonymous":
            from local_deep_research.database.session_context import (
                get_user_db_session,
            )

            try:
                with get_user_db_session(username) as db_session:
                    load_user_settings(default_settings, db_session, username)
            except Exception:
                # Opening the user's database can fail; the form is still
                # usable with the built-in defaults, so do not return a 500.
                logger.exception(
                    "Could not open user database for subscription form, "
                    "using defaults"
                )

        return _render_subscription_form(None, default_settings)

    @bp.route("/subscriptions/<subscription_id>/edit")
    def edit_subscription_page(subscription_id):
        """Render the edit subscription page."""
        username = _current_username()

        subscription = None
        default_settings = _fallback_settings()

        try:
            # Load the subscription using the API
            subscription = api.get_subscription(subscription_id)
            logger.info(
                f"Loaded subscription {subscription_id}: {subscription}"
            )

            if not subscription:
                logger.warning(f"Subscription {subscription_id} not found")
                # Could redirect to 404 or subscriptions page
                return _render_subscription_form(
                    None, default_settings, error="Subscription not found"
                )

            # Load user's default settings if logged in
            if username != "anonymous":
                from local_deep_research.database.session_context import (
                    get_user_db_session,
                )

                with get_user_db_session(username) as db_session:
                    load_user_settings(default_settings, db_session, username)

        except Exception:
            logger.exception(f"Error loading subscription {subscription_id}")
            return _render_subscription_form(
                None, default_settings, error="Error loading subscription"
            )

        return _render_subscription_form(subscription, default_settings)

    # Health check
    @bp.route("/health")
    def health_check():
        """Check if news system is healthy."""
        try:
            # Check if database is accessible
            from .core.storage_manager import StorageManager

            storage = StorageManager()

            # Try a simple query; only success/failure matters here.
            storage.get_user_feed("health_check", limit=1)

            return jsonify(
                {
                    "status": "healthy",
                    "enabled": True,  # Default: get_db_setting("news.enabled", True)
                    "database": "connected",
                }
            )
        except Exception:
            logger.exception("Health check failed")
            # Generic message: exception details go to the log, not the client.
            return jsonify(
                {
                    "status": "unhealthy",
                    "error": "An internal error has occurred.",
                }
            ), 500

    return bp
def load_user_settings(default_settings: dict, db_session=None, username=None) -> None:
    """
    Load user settings and update default_settings dictionary.
    Extracted to avoid code duplication as suggested by djpetti.

    The dictionary is mutated in place and nothing is returned. All values
    are read first and applied in a single ``update``; if any lookup raises,
    ``default_settings`` is left unchanged and the failure is logged.

    Args:
        default_settings: Dictionary to update with user settings
        db_session: Database session for accessing settings
        username: Username for settings context
    """
    if not db_session:
        logger.warning("No database session provided, using defaults")
        return

    # (form key, settings key, fallback value) for each value to load.
    setting_sources = (
        ("iterations", "search.iterations", 3),
        ("questions_per_iteration", "search.questions_per_iteration", 5),
        ("search_engine", "search.tool", "auto"),
        ("model_provider", "llm.provider", "ollama"),
        ("model", "llm.model", ""),
        ("search_strategy", "search.search_strategy", "source-based"),
        ("custom_endpoint", "llm.openai_endpoint.url", ""),
    )

    try:
        from ..utilities.db_utils import get_settings_manager

        settings_manager = get_settings_manager(db_session, username)

        loaded = {
            form_key: settings_manager.get_setting(setting_key, fallback)
            for form_key, setting_key, fallback in setting_sources
        }
        default_settings.update(loaded)
    except Exception:
        # Best effort: keep the caller's defaults, but attach the exception
        # to the warning so the failure can be diagnosed from the logs.
        logger.opt(exception=True).warning("Could not load user settings")