Coverage for src/local_deep_research/web/routes/settings_routes.py: 85%
987 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Settings Routes Module
4This module handles all settings-related HTTP endpoints for the application.
6CHECKBOX HANDLING PATTERN:
7--------------------------
8This module supports TWO submission modes to handle checkboxes correctly:
10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)**
11- JavaScript intercepts form submission with e.preventDefault()
12- Checkbox values read directly from DOM via checkbox.checked
13- Data sent as JSON: {"setting.key": true/false}
14- Hidden fallback inputs are managed but NOT used in this mode
15- Provides better UX with instant feedback and validation
17**MODE 2: Traditional POST Submission (Fallback - /save_settings)**
18- Used when JavaScript is disabled (accessibility/no-JS environments)
19- Browser submits form data naturally via request.form
20- Hidden fallback pattern CRITICAL here:
21 * Checked checkbox: Submits checkbox value, hidden input disabled
22 * Unchecked checkbox: Submits hidden input value "false"
23- Ensures unchecked checkboxes are captured (HTML limitation workaround)
25**Implementation Details:**
261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID
272. checkbox_handler.js manages hidden input disabled state
283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240)
294. POST mode: Flask reads request.form including enabled hidden inputs
5. Both modes use coerce_setting_for_write() for consistent type (including boolean) conversion
32**Why Both Patterns?**
33- AJAX: Better UX, immediate validation, no page reload
34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation
35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode
37This dual-mode approach ensures the app works for all users while providing
38optimal experience when JavaScript is available.
39"""
41import platform
42import time
43from typing import Any, Optional, Tuple
44from datetime import UTC, datetime, timedelta, timezone
46import requests
47from flask import (
48 Blueprint,
49 flash,
50 jsonify,
51 redirect,
52 request,
53 session,
54 url_for,
55)
56from flask_wtf.csrf import generate_csrf
57from loguru import logger
59from ...config.constants import DEFAULT_OLLAMA_URL
60from ...llm.providers.base import normalize_provider
61from ...config.paths import get_data_directory, get_encrypted_database_path
62from ...database.models import Setting, SettingType
63from ...database.session_context import get_user_db_session
64from ...database.encrypted_db import db_manager
65from ...utilities.db_utils import get_settings_manager
66from ...utilities.url_utils import normalize_url
67from ...security.decorators import require_json_body
68from ..auth.decorators import login_required
69from ..utils.request_helpers import parse_bool_arg
70from ...security.rate_limiter import settings_limit
71from ...settings.manager import get_typed_setting_value, parse_boolean
72from ..services.settings_service import (
73 create_or_update_setting,
74 invalidate_settings_caches,
75 set_setting,
76)
77from ..utils.route_decorators import with_user_session
78from ..utils.templates import render_template_with_defaults
81from ...security import safe_get
82from ..warning_checks import calculate_warnings
# Blueprint grouping every settings endpoint under the /settings URL prefix.
settings_bp = Blueprint("settings", __name__, url_prefix="/settings")

# NOTE: Routes index session["username"] directly (never .get()) on purpose.
# @login_required guarantees the key is present, so direct access fails fast
# should the decorator ever be dropped from a route.

# Select settings whose options are filled in dynamically; validate_setting()
# skips the allowed-options check for these keys.
DYNAMIC_SETTINGS = ["llm.provider", "llm.model", "search.tool"]

# Namespace rules applied when a NEW setting is created through the web API.
# A key may be created when it starts with one of the ALLOWED prefixes, but a
# BLOCKED prefix always wins and causes rejection even if an allowed prefix
# also matches. Updates to keys that already exist are not subject to this
# check; it only guards creation of new DB rows via the three write routes.
ALLOWED_SETTING_PREFIXES = frozenset(
    (
        "app.",
        "backup.",
        "benchmark.",
        "chat.",
        "database.",
        "document_scheduler.",
        "embeddings.",
        "focused_iteration.",
        "general.",
        "langgraph_agent.",
        "llm.",
        "local_search_",
        "mcp.",
        "news.",
        "notifications.",
        "rag.",
        "rate_limiting.",
        "report.",
        "research_library.",
        "search.",
        "ui.",
        "web.",
    )
)
BLOCKED_SETTING_PREFIXES = frozenset(
    (
        "auth.",
        "bootstrap.",
        "db_config.",
        "security.",
        "server.",
        "testing.",
    )
)
def _is_allowed_new_setting_key(key: str) -> bool:
    """Tell whether *key* may be created as a new setting via the web API.

    The key must be a non-empty string without ``..``. Matching is done on
    the lower-cased key: any prefix in ``BLOCKED_SETTING_PREFIXES`` rejects
    it, otherwise it must start with a prefix in
    ``ALLOWED_SETTING_PREFIXES``.
    """
    if not (isinstance(key, str) and key) or ".." in key:
        return False

    lowered = key.lower()
    # Blocked namespaces take precedence over allowed ones.
    if lowered.startswith(tuple(BLOCKED_SETTING_PREFIXES)):
        return False
    return lowered.startswith(tuple(ALLOWED_SETTING_PREFIXES))
def _get_setting_from_session(key: str | None, default=None):
    """Fetch one named setting for the user of the current Flask session.

    ``key=None`` short-circuits to ``default``. ``SettingsManager.get_setting``
    interprets a ``None`` key as "return every setting" (a feature that
    ``get_all_settings`` relies on for enumeration), and this single-setting
    helper must not expose that bulk read. Otherwise a caller such as the
    auto-discovered model-listing loop would be handed a dict of all settings
    (other providers' API keys included) whenever a provider declares
    ``api_key_setting = None`` (LM Studio, Llama.cpp).

    ``default`` is also returned when no database session is available.
    """
    if key is None:
        return default

    username = session.get("username")
    result = default
    with get_user_db_session(username) as db_session:
        if db_session:
            manager = get_settings_manager(db_session, username)
            result = manager.get_setting(key, default)
    return result
def validate_setting(
    setting: Setting, value: Any
) -> Tuple[bool, Optional[str]]:
    """
    Check *value* against the type and constraints declared on *setting*.

    The value is first converted with ``get_typed_setting_value`` (without
    consulting environment overrides), then checked according to the
    setting's ``ui_element``.

    Args:
        setting: The Setting object to validate against
        value: The candidate value

    Returns:
        Tuple of (is_valid, error_message); error_message is None on success
    """
    typed_value = get_typed_setting_value(
        key=str(setting.key),
        value=value,
        ui_element=str(setting.ui_element),
        default=None,
        check_env=False,
    )

    # Checkboxes must have converted to a real boolean.
    if setting.ui_element == "checkbox":
        if isinstance(typed_value, bool):
            return True, None
        return False, "Value must be a boolean"

    # Numeric widgets must be numbers within the optional min/max bounds.
    if setting.ui_element in ("number", "slider", "range"):
        if not isinstance(typed_value, (int, float)):
            return False, "Value must be a number"
        if setting.min_value is not None and typed_value < setting.min_value:
            return False, f"Value must be at least {setting.min_value}"
        if setting.max_value is not None and typed_value > setting.max_value:
            return False, f"Value must be at most {setting.max_value}"
        return True, None

    # Selects are checked against their declared options, except for
    # dropdowns whose options are populated dynamically.
    if (
        setting.ui_element == "select"
        and setting.options
        and setting.key not in DYNAMIC_SETTINGS
    ):
        allowed_values = []
        for option in list(setting.options):  # type: ignore[arg-type]
            if isinstance(option, dict):
                allowed_values.append(option.get("value"))
            else:
                allowed_values.append(option)
        if typed_value not in allowed_values:
            return (
                False,
                f"Value must be one of: {', '.join(str(v) for v in allowed_values)}",
            )

    # Nothing objected
    return True, None
def coerce_setting_for_write(key: str, value: Any, ui_element: str) -> Any:
    """Convert an incoming value to its stored type prior to a DB write.

    Every web route that persists settings is expected to go through this
    function so that type conversion stays consistent.

    The value is deliberately NOT run through ``json.loads`` first:
    - ``get_typed_setting_value`` parses JSON strings itself, through
      ``_parse_json_value`` (``ui_element="json"``) and
      ``_parse_multiselect`` (``ui_element="multiselect"``).
    - JSON API endpoints already receive dicts/lists as native Python
      objects from ``request.get_json()``.
    - With ``ui_element="text"`` pre-parsing would corrupt data: the JSON
      string ``'{"k": "v"}'`` would turn into a dict and ``str()`` would
      then yield ``"{'k': 'v'}"`` (a Python repr, not valid JSON).
    """
    # check_env=False because a user-supplied value is being persisted here.
    # The default (check_env=True) would silently swap the user's value for
    # an environment variable override, which is wrong on the write path.
    coerced = get_typed_setting_value(
        key=key,
        value=value,
        ui_element=ui_element,
        default=None,
        check_env=False,
    )
    return coerced
@settings_bp.route("/", methods=["GET"])
@login_required
def settings_page():
    """Render the settings dashboard that links to the specialized config pages."""
    template_name = "settings_dashboard.html"
    return render_template_with_defaults(template_name)
def _infer_setting_type_and_category(
    key: str,
) -> Tuple[Optional[SettingType], Optional[str]]:
    """Derive the SettingType and UI category of *key* from its prefix.

    Returns ``(None, None)`` when the prefix is not one of the recognised
    namespaces.
    """
    if key.startswith("llm."):
        parameter_hints = ("temperature", "max_tokens", "batch", "layers")
        if any(hint in key for hint in parameter_hints):
            return SettingType.LLM, "llm_parameters"
        return SettingType.LLM, "llm_general"
    if key.startswith("search."):
        parameter_hints = (
            "iterations",
            "results",
            "region",
            "questions",
            "section",
        )
        if any(hint in key for hint in parameter_hints):
            return SettingType.SEARCH, "search_parameters"
        return SettingType.SEARCH, "search_general"
    if key.startswith("report."):
        return SettingType.REPORT, "report_parameters"
    if key.startswith("database."):
        return SettingType.DATABASE, "database_parameters"
    if key.startswith("app."):
        return SettingType.APP, "app_interface"
    if key.startswith("chat."):
        return SettingType.CHAT, "chat"
    return None, None


def _replace_corrupted_value(key: str, value: Any) -> Tuple[Any, bool]:
    """Swap a corrupted or empty client value for a safe fallback.

    A value counts as corrupted when it is the literal ``"[object Object]"``
    or a string that is only ``{}``, ``[]``, ``{`` or ``[`` after stripping.

    Returns:
        ``(value, was_corrupted)``; *value* is unchanged when not corrupted.
    """
    is_corrupted = value == "[object Object]" or (
        isinstance(value, str) and value.strip() in ["{}", "[]", "{", "["]
    )
    if not is_corrupted:
        return value, False

    if key.startswith("report."):
        return {}, True

    # Known keys get a hardcoded fallback; everything else becomes None.
    fallbacks = {
        "llm.model": "",
        "llm.provider": "ollama",
        "search.tool": "auto",
        "app.theme": "dark",
        "app.default_theme": "dark",
    }
    return fallbacks.get(key), True


def _serialize_setting_for_client(setting: Setting) -> dict[str, Any]:
    """Build the per-setting dict returned to the client after a bulk save."""
    setting_type = setting.type
    # Convert enum to string if present
    if hasattr(setting_type, "value"):
        setting_type = setting_type.value

    return {
        "value": setting.value,
        "name": setting.name,
        "description": setting.description,
        "type": setting_type,
        "category": setting.category,
        "ui_element": setting.ui_element,
        "editable": setting.editable,
        "options": setting.options,
        "visible": setting.visible,
        "min_value": setting.min_value,
        "max_value": setting.max_value,
        "step": setting.step,
    }


@settings_bp.route("/save_all_settings", methods=["POST"])
@login_required
@settings_limit
@require_json_body(
    error_format="status", error_message="No settings data provided"
)
@with_user_session()
def save_all_settings(db_session=None, settings_manager=None):
    """Handle saving all settings at once from the unified settings page.

    Expects a JSON object mapping setting keys to values. For each key:
    existing, editable settings are coerced, validated and saved; unknown
    keys are created if their namespace is allowed; non-editable settings
    are skipped.

    Args:
        db_session: Database session used for all reads and writes
            (presumably injected by ``with_user_session`` — confirm).
        settings_manager: Accepted for the decorator's calling convention;
            not used in this function.

    Returns:
        JSON ``{"status": "success", ...}`` with the updated/created keys and
        the full settings map, or ``{"status": "error", ...}`` with HTTP 400
        (missing data / per-key errors) or 500 (unexpected exception).
    """
    try:
        form_data = request.get_json()
        # A JSON array or scalar cannot be iterated as key/value pairs, so
        # reject it here instead of failing later with a 500.
        if not form_data or not isinstance(form_data, dict):
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "No settings data provided",
                    }
                ),
                400,
            )

        validation_errors = []
        updated_settings = []
        created_settings = []

        # Fetch all settings once to avoid N+1 query problem
        all_db_settings = {
            setting.key: setting for setting in db_session.query(Setting).all()
        }

        # Filter out non-editable settings
        non_editable_keys = [
            key
            for key in form_data
            if key in all_db_settings and not all_db_settings[key].editable
        ]
        if non_editable_keys:
            logger.warning(
                f"Skipping non-editable settings: {non_editable_keys}"
            )
            for key in non_editable_keys:
                del form_data[key]

        for key, value in form_data.items():
            # Skip corrupted keys or empty strings as keys
            if not key or not isinstance(key, str) or key.strip() == "":
                continue

            current_setting = all_db_settings.get(key)

            # Convert checkbox values BEFORE the corrupted-value detection
            # so that it is not triggered incorrectly.
            if (
                current_setting
                and current_setting.ui_element == "checkbox"
                and not isinstance(value, bool)
            ):
                logger.debug(
                    f"Converting checkbox {key} from {type(value).__name__} to bool: {value}"
                )
                value = parse_boolean(value)

            value, was_corrupted = _replace_corrupted_value(key, value)
            if was_corrupted:
                logger.warning(f"Corrected corrupted value for {key}: {value}")

            # NOTE: No JSON pre-parsing is done here. After the corruption
            # replacement, values are Python objects, hardcoded strings or
            # None; coerce_setting_for_write() handles type conversion.

            if current_setting:
                # Coerce to correct Python type (e.g. str "5" → int 5
                # for number settings, str "true" → bool for checkboxes).
                converted_value = coerce_setting_for_write(
                    key=current_setting.key,
                    value=value,
                    ui_element=current_setting.ui_element,
                )

                is_valid, error_message = validate_setting(
                    current_setting, converted_value
                )
                if not is_valid:
                    validation_errors.append(
                        {
                            "key": key,
                            "name": current_setting.name,
                            "error": error_message,
                        }
                    )
                    continue

                # Save the converted setting using the same session
                if set_setting(key, converted_value, db_session=db_session):
                    updated_settings.append(key)
                else:
                    # Report the failure instead of silently claiming
                    # success, mirroring the create branch below.
                    validation_errors.append(
                        {
                            "key": key,
                            "name": current_setting.name,
                            "error": "Failed to save setting",
                        }
                    )
                continue

            # Namespace validation: reject new keys outside allowed prefixes.
            if not _is_allowed_new_setting_key(key):
                logger.warning(
                    "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})",
                    key,
                    session["username"],
                )
                validation_errors.append(
                    {
                        "key": key,
                        "name": key,
                        "error": "Creating settings under this namespace is not allowed.",
                    }
                )
                continue

            # Create a new setting
            setting_type, category = _infer_setting_type_and_category(key)
            new_setting = {
                "key": key,
                "value": value,
                "type": setting_type.value.lower()
                if setting_type is not None
                else "app",
                "name": key.split(".")[-1].replace("_", " ").title(),
                "description": f"Setting for {key}",
                "category": category,
                "ui_element": "text",  # Default UI element
            }

            # Determine better UI element based on value type
            # (bool is tested first because it is a subclass of int).
            if isinstance(value, bool):
                new_setting["ui_element"] = "checkbox"
            elif isinstance(value, (int, float)):
                new_setting["ui_element"] = "number"
            elif isinstance(value, (dict, list)):
                new_setting["ui_element"] = "textarea"

            db_setting = create_or_update_setting(
                new_setting, db_session=db_session
            )
            if db_setting:
                created_settings.append(key)
            else:
                validation_errors.append(
                    {
                        "key": key,
                        "name": new_setting["name"],
                        "error": "Failed to create setting",
                    }
                )

        # Report validation errors if any
        if validation_errors:
            # Keys processed before the failing ones may already have been
            # written, so cached values must not outlive this request.
            if updated_settings or created_settings:
                invalidate_settings_caches(session["username"])
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Validation errors",
                        "errors": validation_errors,
                    }
                ),
                400,
            )

        # Get all settings to return to the client for proper state update
        all_settings = {
            setting.key: _serialize_setting_for_client(setting)
            for setting in db_session.query(Setting).all()
        }

        # Customize the success message based on what changed
        if len(updated_settings) == 1:
            # For a single update, name the setting that changed
            key = updated_settings[0]
            # Reuse the already-fetched setting from our pre-fetched dict
            updated_setting = all_db_settings.get(key)
            name = (
                updated_setting.name
                if updated_setting
                else key.split(".")[-1].replace("_", " ").title()
            )
            new_value = updated_setting.value if updated_setting else None

            # If it's a boolean, use "enabled/disabled" language
            if isinstance(new_value, bool):
                state = "enabled" if new_value else "disabled"
                success_message = f"{name} {state}"
            else:
                success_message = f"{name} updated"
        else:
            # Multiple settings or generic message
            success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)"

        response_data = {
            "status": "success",
            "message": success_message,
            "updated": updated_settings,
            "created": created_settings,
            "settings": all_settings,
        }

        warning_affecting_keys = frozenset(
            {
                "llm.provider",
                "search.tool",
                "search.iterations",
                "search.questions_per_iteration",
                "llm.local_context_window_size",
                "llm.context_window_unrestricted",
                "llm.context_window_size",
            }
        )

        # Check if any warning-affecting settings were changed
        if any(
            key in warning_affecting_keys
            for key in updated_settings + created_settings
        ):
            warnings = calculate_warnings()
            response_data["warnings"] = warnings
            logger.info(
                f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings"
            )

        invalidate_settings_caches(session["username"])
        return jsonify(response_data)

    except Exception:
        logger.exception("Error saving settings")
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while saving settings.",
                }
            ),
            500,
        )
@settings_bp.route("/reset_to_defaults", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def reset_to_defaults(db_session=None, settings_manager=None):
    """Reload every setting from the defaults file and report the outcome.

    Responds with HTTP 500 and an error payload when loading the defaults
    raises; otherwise invalidates the user's settings caches and responds
    with a success payload.
    """
    try:
        settings_manager.load_from_defaults_file()
        logger.info("Successfully imported settings from default files")
    except Exception:
        logger.exception("Error importing default settings")
        failure_payload = {
            "status": "error",
            "message": "Failed to reset settings to defaults",
        }
        return jsonify(failure_payload), 500

    invalidate_settings_caches(session["username"])
    success_payload = {
        "status": "success",
        "message": "All settings have been reset to default values",
    }
    return jsonify(success_payload)
@settings_bp.route("/save_settings", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def save_settings(db_session=None, settings_manager=None):
    """Save all settings from the form using POST method - fallback when JavaScript is disabled.

    Reads ``request.form``, drops the CSRF token and non-editable settings,
    then stages each remaining key with ``commit=False`` and commits once at
    the end. Existing settings are coerced AND validated (same checks as the
    JSON routes) before being staged; keys that do not exist yet must pass
    the namespace check. Results are reported via ``flash`` messages.

    Returns:
        A redirect to the settings dashboard in every case.
    """
    try:
        # Get form data
        form_data = request.form.to_dict()

        # Remove CSRF token from the data
        form_data.pop("csrf_token", None)

        updated_count = 0
        failed_count = 0
        rejected_count = 0

        # Fetch all settings once to avoid N+1 query problem
        all_db_settings = {
            setting.key: setting for setting in db_session.query(Setting).all()
        }

        # Filter out non-editable settings
        non_editable_keys = [
            key
            for key in form_data
            if key in all_db_settings and not all_db_settings[key].editable
        ]
        if non_editable_keys:
            logger.warning(
                f"Skipping non-editable settings: {non_editable_keys}"
            )
            for key in non_editable_keys:
                del form_data[key]

        # Process each setting
        for key, value in form_data.items():
            try:
                # Get the setting from pre-fetched dict
                db_setting = all_db_settings.get(key)

                # Namespace validation: reject new keys outside allowed prefixes.
                # Existing keys (updates) bypass this check — it only applies
                # to creation of brand-new rows through this form-POST route.
                if db_setting is None and not _is_allowed_new_setting_key(key):
                    logger.warning(
                        "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})",
                        key,
                        session["username"],
                    )
                    rejected_count += 1
                    continue

                if db_setting:
                    # Coerce form POST string to correct Python type.
                    value = coerce_setting_for_write(
                        key=db_setting.key,
                        value=value,
                        ui_element=db_setting.ui_element,
                    )

                    # Enforce the same type/min/max/options constraints as
                    # the JSON routes so this fallback cannot bypass them.
                    is_valid, error_message = validate_setting(
                        db_setting, value
                    )
                    if not is_valid:
                        logger.warning(
                            f"Validation failed for setting {key}: {error_message}"
                        )
                        failed_count += 1
                        continue

                # Save the setting
                if settings_manager.set_setting(key, value, commit=False):
                    updated_count += 1
                else:
                    failed_count += 1
                    logger.warning(f"Failed to save setting {key}")

            except Exception:
                logger.exception(f"Error saving setting {key}")
                failed_count += 1

        # Commit all changes at once
        try:
            db_session.commit()

            flash(
                f"Settings saved successfully! Updated {updated_count} settings.",
                "success",
            )
            if failed_count > 0:
                flash(
                    f"Warning: {failed_count} settings failed to save.",
                    "warning",
                )
            if rejected_count > 0:
                flash(
                    f"Rejected {rejected_count} settings (unknown namespace). "
                    "This may indicate a bug or an attempted injection.",
                    "error",
                )
            invalidate_settings_caches(session["username"])

        except Exception:
            db_session.rollback()
            logger.exception("Failed to commit settings")
            flash("Error saving settings. Please try again.", "error")

        return redirect(url_for("settings.settings_page"))

    except Exception:
        logger.exception("Error in save_settings")
        flash("An internal error occurred while saving settings.", "error")
        return redirect(url_for("settings.settings_page"))
759# API Routes
@settings_bp.route("/api", methods=["GET"])
@login_required
@with_user_session()
def api_get_all_settings(db_session=None, settings_manager=None):
    """Get all settings, optionally restricted to one category.

    Query parameters:
        category: when present and non-empty, only settings whose DB row has
            this category are returned.

    Returns:
        JSON ``{"status": "success", "settings": {...}}``, or an error
        payload with HTTP 500 if anything raises.
    """
    try:
        category = request.args.get("category")

        settings = settings_manager.get_all_settings()

        if category:
            # The category lives on the DB rows, so collect the matching
            # keys first. A set keeps the membership test below O(1) per
            # setting instead of scanning a list each time.
            category_keys = {
                row.key
                for row in db_session.query(Setting).all()
                if row.category == category
            }
            settings = {
                key: value
                for key, value in settings.items()
                if key in category_keys
            }

        return jsonify({"status": "success", "settings": settings})
    except Exception:
        logger.exception("Error getting settings")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["GET"])
@login_required
@with_user_session()
def api_get_db_setting(key, db_session=None, settings_manager=None):
    """Return one setting by key, preferring the DB row over the defaults.

    Responds with the full setting details when a DB row exists, with the
    entry from ``settings_manager.default_settings`` when only a default
    exists, and with HTTP 404 when neither does.
    """
    try:
        row = db_session.query(Setting).filter(Setting.key == key).first()

        if row:
            # The stored type may be a plain string or an enum member.
            type_label = row.type if isinstance(row.type, str) else row.type.value
            return jsonify(
                {
                    "key": row.key,
                    "value": row.value,
                    "type": type_label,
                    "name": row.name,
                    "description": row.description,
                    "category": row.category,
                    "ui_element": row.ui_element,
                    "options": row.options,
                    "min_value": row.min_value,
                    "max_value": row.max_value,
                    "step": row.step,
                    "visible": row.visible,
                    "editable": row.editable,
                }
            )

        # Not in DB — fall back to the defaults so this endpoint agrees
        # with GET /settings/api, which includes default settings.
        default_meta = settings_manager.default_settings.get(key)
        if not default_meta:
            return jsonify({"error": f"Setting not found: {key}"}), 404

        return jsonify(
            {
                "key": key,
                "value": default_meta.get("value"),
                "type": default_meta.get("type", "APP"),
                "name": default_meta.get("name", key),
                "description": default_meta.get("description"),
                "category": default_meta.get("category"),
                "ui_element": default_meta.get("ui_element", "text"),
                "options": default_meta.get("options"),
                "min_value": default_meta.get("min_value"),
                "max_value": default_meta.get("max_value"),
                "step": default_meta.get("step"),
                "visible": default_meta.get("visible", True),
                "editable": default_meta.get("editable", True),
            }
        )
    except Exception:
        logger.exception(f"Error getting setting {key}")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["PUT"])
@login_required
@settings_limit
@require_json_body(error_message="No data provided")
@with_user_session(include_settings_manager=False)
def api_update_setting(key, db_session=None):
    """Update an existing setting, or create it when the key is new.

    The JSON body must be an object with a non-null ``value``. For an
    existing setting the value is coerced and validated before saving; a
    non-editable setting yields HTTP 403. For a new key the namespace check
    applies and optional metadata fields from the body are copied onto the
    new setting.

    Returns:
        200 with a message (plus ``warnings`` for warning-affecting keys) on
        update, 201 with the created setting on creation, 400/403 for
        rejected requests, 500 on failure.
    """
    try:
        data = request.get_json()
        # A JSON array or scalar has no "value" field to read; answer with
        # a client error rather than letting .get() raise into a 500.
        if not isinstance(data, dict):
            return jsonify({"error": "No data provided"}), 400

        value = data.get("value")
        if value is None:
            return jsonify({"error": "No value provided"}), 400

        # Check if setting exists
        db_setting = (
            db_session.query(Setting).filter(Setting.key == key).first()
        )

        if db_setting:
            # Check if setting is editable
            if not db_setting.editable:
                return jsonify({"error": f"Setting {key} is not editable"}), 403

            # Coerce to correct Python type before saving.
            # Without this, values from JSON API requests are stored
            # as-is (e.g. string "5" instead of int 5 for number
            # settings, string "true" instead of bool for checkboxes).
            value = coerce_setting_for_write(
                key=db_setting.key,
                value=value,
                ui_element=db_setting.ui_element,
            )

            # Validate the setting (matches save_all_settings pattern)
            is_valid, error_message = validate_setting(db_setting, value)
            if not is_valid:
                logger.warning(
                    f"Validation failed for setting {key}: {error_message}"
                )
                return jsonify(
                    {"error": f"Invalid value for setting {key}"}
                ), 400

            # Pass the db_session to avoid session lookup issues
            if not set_setting(key, value, db_session=db_session):
                return jsonify(
                    {"error": f"Failed to update setting {key}"}
                ), 500

            response_data: dict[str, Any] = {
                "message": f"Setting {key} updated successfully"
            }

            # If this is a key that affects warnings, include warning calculations
            warning_affecting_keys = [
                "llm.provider",
                "search.tool",
                "search.iterations",
                "search.questions_per_iteration",
                "llm.local_context_window_size",
                "llm.context_window_unrestricted",
                "llm.context_window_size",
            ]
            if key in warning_affecting_keys:
                warnings = calculate_warnings()
                response_data["warnings"] = warnings
                logger.debug(
                    f"Setting {key} changed to {value}, calculated {len(warnings)} warnings"
                )

            invalidate_settings_caches(session["username"])
            return jsonify(response_data)

        # Namespace validation: reject new keys outside allowed prefixes.
        if not _is_allowed_new_setting_key(key):
            logger.warning(
                "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})",
                key,
                session["username"],
            )
            return jsonify(
                {
                    "error": f"Creating settings under this namespace is not allowed: {key}"
                }
            ), 400

        # Create new setting with default metadata
        setting_dict = {
            "key": key,
            "value": value,
            "name": key.split(".")[-1].replace("_", " ").title(),
            "description": f"Setting for {key}",
        }

        # Add additional metadata if provided.
        # 'visible' and 'editable' are system-controlled — not accepted from callers.
        for field in (
            "type",
            "name",
            "description",
            "category",
            "ui_element",
            "options",
            "min_value",
            "max_value",
            "step",
        ):
            if field in data:
                setting_dict[field] = data[field]

        # Create setting
        db_setting = create_or_update_setting(
            setting_dict, db_session=db_session
        )
        if not db_setting:
            return jsonify({"error": f"Failed to create setting {key}"}), 500

        invalidate_settings_caches(session["username"])

        # The stored type may be an enum member or already a plain string
        # (the GET route handles both); reading .value unconditionally would
        # raise after the row was created and turn a success into a 500.
        created_type = getattr(db_setting.type, "value", db_setting.type)
        return (
            jsonify(
                {
                    "message": f"Setting {key} created successfully",
                    "setting": {
                        "key": db_setting.key,
                        "value": db_setting.value,
                        "type": created_type,
                        "name": db_setting.name,
                    },
                }
            ),
            201,
        )
    except Exception:
        logger.exception(f"Error updating setting {key}")
        return jsonify({"error": "Failed to update setting"}), 500
@settings_bp.route("/api/<path:key>", methods=["DELETE"])
@login_required
@with_user_session()
def api_delete_setting(key, db_session=None, settings_manager=None):
    """Delete the setting stored under *key*.

    Responds 404 when no such setting exists, 403 when it is not editable,
    500 when deletion fails or raises, and a success message otherwise.
    """
    try:
        existing = db_session.query(Setting).filter(Setting.key == key).first()
        if not existing:
            return jsonify({"error": f"Setting not found: {key}"}), 404

        # Non-editable settings may not be removed
        if not existing.editable:
            return jsonify({"error": f"Setting {key} is not editable"}), 403

        deleted = settings_manager.delete_setting(key)
        if not deleted:
            return jsonify({"error": f"Failed to delete setting {key}"}), 500

        invalidate_settings_caches(session["username"])
        return jsonify({"message": f"Setting {key} deleted successfully"})
    except Exception:
        logger.exception(f"Error deleting setting {key}")
        return jsonify({"error": "Failed to delete setting"}), 500
@settings_bp.route("/api/import", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def api_import_settings(db_session=None, settings_manager=None):
    """Load settings from the defaults file and invalidate the user's caches.

    Responds with a success message, or HTTP 500 when anything raises.
    """
    try:
        settings_manager.load_from_defaults_file()
        invalidate_settings_caches(session["username"])
        success_payload = {"message": "Settings imported successfully"}
        return jsonify(success_payload)
    except Exception:
        logger.exception("Error importing settings")
        failure_payload = {"error": "Failed to import settings"}
        return jsonify(failure_payload), 500
@settings_bp.route("/api/categories", methods=["GET"])
@login_required
@with_user_session(include_settings_manager=False)
def api_get_categories(db_session=None):
    """Return the distinct, non-null setting categories as JSON.

    Response shape: ``{"categories": [...]}``; a 500 JSON error on failure.
    """
    try:
        # Each result row is a one-element tuple holding the category value.
        rows = db_session.query(Setting.category).distinct().all()
        category_list = []
        for row in rows:
            category = row[0]
            if category is not None:
                category_list.append(category)

        return jsonify({"categories": category_list})
    except Exception:
        logger.exception("Error getting categories")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/types", methods=["GET"])
@login_required
def api_get_types():
    """Return the values of every ``SettingType`` member as JSON.

    Response shape: ``{"types": [...]}``; a 500 JSON error on failure.
    """
    try:
        type_values = []
        for member in SettingType:
            type_values.append(member.value)
        return jsonify({"types": type_values})
    except Exception:
        logger.exception("Error getting types")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/ui_elements", methods=["GET"])
@login_required
def api_get_ui_elements():
    """Return the list of supported UI element type names as JSON.

    Response shape: ``{"ui_elements": [...]}``; a 500 JSON error on failure.
    """
    try:
        # Supported widget kinds, in the order they are reported to clients.
        supported = (
            "text",
            "select",
            "checkbox",
            "slider",
            "number",
            "textarea",
            "color",
            "date",
            "file",
            "password",
        )
        return jsonify({"ui_elements": list(supported)})
    except Exception:
        logger.exception("Error getting UI elements")
        return jsonify({"error": "Failed to retrieve settings"}), 500
def _title_case_words(text: str) -> str:
    """Capitalize every whitespace-separated word of ``text``, single-spaced."""
    return " ".join(word.capitalize() for word in text.split())


def _load_cached_provider_models() -> dict[str, Any]:
    """Load non-expired cached models grouped by ``<provider>_models`` key.

    Rows of ``ProviderModel`` updated within the last 24 hours are grouped
    into the same structure the endpoint returns under ``"providers"``.

    Returns:
        The grouped mapping, or an empty dict when nothing is cached or the
        cache could not be read. A failure never yields a partial mapping,
        so half-read cache contents cannot leak into a fresh result.
    """
    from ...database.models import ProviderModel

    grouped: dict[str, Any] = {}
    try:
        # Define cache expiration (24 hours)
        cache_expiry = datetime.now(UTC) - timedelta(hours=24)

        username = session["username"]
        with get_user_db_session(username) as db_session:
            cached_models = (
                db_session.query(ProviderModel)
                .filter(ProviderModel.last_updated > cache_expiry)
                .all()
            )

            if cached_models:
                logger.info(
                    f"Found {len(cached_models)} cached models in database"
                )

            # Group while the session is still open so attribute access
            # on the ORM rows is always valid.
            for model in cached_models:
                provider_key = f"{normalize_provider(model.provider)}_models"
                grouped.setdefault(provider_key, []).append(
                    {
                        "value": model.model_key,
                        "label": model.model_label,
                        "provider": model.provider.upper(),
                    }
                )
    except Exception:
        logger.warning("Error reading cached models from database")
        # Caller falls through to fetching fresh data.
        return {}
    return grouped


def _parse_ollama_models(ollama_data: Any) -> list[dict[str, str]]:
    """Convert an Ollama ``/api/tags`` JSON payload into model option dicts.

    Handles both payload shapes the endpoint supports: a dict carrying a
    ``"models"`` list (newer API) and a bare list of model objects (older
    API). Entries that are not dicts or have no ``name`` are skipped.
    """
    import re

    if isinstance(ollama_data, dict):
        # Format for newer Ollama API
        entries = ollama_data.get("models") or []
        logger.info(
            f"Found {len(entries)} models in newer Ollama API format"
        )
    else:
        # Format for older Ollama API
        entries = ollama_data or []
        logger.info(
            f"Found {len(entries)} models in older Ollama API format"
        )

    parsed: list[dict[str, str]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name", "")
        if not name:
            continue
        # "repo/model:tag" -> "Repo Model Tag" for display purposes.
        display_name = _title_case_words(re.sub(r"[:/]", " ", name))
        parsed.append(
            {
                "value": name,  # Original model name as value (for API calls)
                "label": f"{display_name} (Ollama)",  # Pretty name as label
                "provider": "ollama",  # Provider field for consistency
            }
        )
        logger.debug(f"Added Ollama model: {name} -> {display_name}")
    return parsed


def _fetch_ollama_models() -> list[dict[str, str]]:
    """Query the configured Ollama server for its models.

    Returns an empty list when the server is unreachable, answers with a
    non-200 status, returns invalid JSON, or any other error occurs; no
    fallback models are invented.
    """
    try:
        import json

        import requests

        try:
            logger.info("Attempting to connect to Ollama API")

            raw_base_url = _get_setting_from_session(
                "llm.ollama.url", DEFAULT_OLLAMA_URL
            )
            base_url = (
                normalize_url(raw_base_url)
                if raw_base_url
                else DEFAULT_OLLAMA_URL
            )

            ollama_fetch_start = time.perf_counter()
            ollama_response = safe_get(
                f"{base_url}/api/tags",
                timeout=5,
                allow_localhost=True,
                allow_private_ips=True,
            )
            ollama_fetch_ms = (
                time.perf_counter() - ollama_fetch_start
            ) * 1000
            # Slow fetches are surfaced at INFO, fast ones stay at DEBUG.
            if ollama_fetch_ms > 1000:
                logger.info(
                    f"Ollama /api/tags fetch took {ollama_fetch_ms:.0f}ms"
                )
            else:
                logger.debug(
                    f"Ollama /api/tags fetch took {ollama_fetch_ms:.0f}ms"
                )

            logger.debug(
                f"Ollama API response: Status {ollama_response.status_code}"
            )

            # Log the raw body even for non-200 answers to help debugging.
            response_text = ollama_response.text
            logger.debug(
                f"Ollama API raw response: {response_text[:500]}..."
            )

            if ollama_response.status_code != 200:
                logger.warning(
                    f"Ollama API returned non-200 status code: {ollama_response.status_code}"
                )
                raise ValueError(
                    f"Ollama API returned status code {ollama_response.status_code}"
                )

            try:
                ollama_data = ollama_response.json()
                logger.debug(
                    f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..."
                )
            except json.JSONDecodeError as json_err:
                logger.exception(
                    f"Failed to parse Ollama API response as JSON: {json_err}"
                )
                raise ValueError(
                    f"Ollama API returned invalid JSON: {json_err}"
                ) from json_err

            ollama_models = _parse_ollama_models(ollama_data)

        except requests.exceptions.RequestException:
            logger.warning("Could not connect to Ollama API")
            # No fallback models - just return empty list
            logger.info("Ollama not available - no models to display")
            ollama_models = []

        logger.info(f"Final Ollama models count: {len(ollama_models)}")

        # Log some model names for debugging
        if ollama_models:
            model_names = [m["value"] for m in ollama_models[:5]]
            logger.info(f"Sample Ollama models: {', '.join(model_names)}")

        return ollama_models
    except Exception:
        logger.exception("Error getting Ollama models")
        # No fallback models - just return empty list
        logger.info("Error getting Ollama models - no models to display")
        return []


def _fetch_openai_models() -> list[dict[str, str]]:
    """List models through the OpenAI package using the configured API key.

    Returns whatever models were collected before any error; an empty list
    when no API key is configured or the request fails. The order returned
    by the API is preserved.
    """
    openai_models: list[dict[str, str]] = []
    try:
        logger.info(
            "Attempting to connect to OpenAI API using OpenAI package"
        )

        api_key = _get_setting_from_session("llm.openai.api_key", "")

        if api_key:
            import openai
            from openai import OpenAI

            client = OpenAI(api_key=api_key)

            try:
                logger.debug("Fetching models from OpenAI API")
                openai_fetch_start = time.perf_counter()
                models_response = client.models.list()
                openai_fetch_ms = (
                    time.perf_counter() - openai_fetch_start
                ) * 1000
                if openai_fetch_ms > 1000:
                    logger.info(
                        f"OpenAI models.list() took {openai_fetch_ms:.0f}ms"
                    )
                else:
                    logger.debug(
                        f"OpenAI models.list() took {openai_fetch_ms:.0f}ms"
                    )

                for model in models_response.data:
                    model_id = model.id
                    if model_id:
                        display_name = _title_case_words(
                            model_id.replace("-", " ")
                        )
                        openai_models.append(
                            {
                                "value": model_id,
                                "label": f"{display_name} (OpenAI)",
                                "provider": "openai",
                            }
                        )
                        logger.debug(
                            f"Added OpenAI model: {model_id} -> {display_name}"
                        )

            except openai.APIError as api_err:
                logger.exception(f"OpenAI API error: {api_err!s}")
                logger.info("No OpenAI models found due to API error")

        else:
            logger.info(
                "OpenAI API key not configured, no models available"
            )

    except Exception:
        logger.exception("Error getting OpenAI models")
        logger.info("No OpenAI models available due to error")

    logger.info(f"Final OpenAI models count: {len(openai_models)}")
    return openai_models


def _fetch_anthropic_models() -> list[dict[str, str]]:
    """List models through the Anthropic package using the configured API key.

    Returns whatever models were collected before any error; an empty list
    when the package is not installed, no API key is configured, or the
    request fails.
    """
    anthropic_models: list[dict[str, str]] = []
    try:
        logger.info(
            "Attempting to connect to Anthropic API using Anthropic package"
        )

        api_key = _get_setting_from_session("llm.anthropic.api_key", "")

        if api_key:
            # Imported lazily so a missing package only disables this provider.
            from anthropic import Anthropic

            anthropic_client = Anthropic(api_key=api_key)

            try:
                logger.debug("Fetching models from Anthropic API")
                models_response = anthropic_client.models.list()

                for model in models_response.data:
                    model_id = model.id
                    if model_id:
                        display_name = _title_case_words(
                            model_id.replace("-", " ")
                        )
                        anthropic_models.append(
                            {
                                "value": model_id,
                                "label": f"{display_name} (Anthropic)",
                                "provider": "anthropic",
                            }
                        )
                        logger.debug(
                            f"Added Anthropic model: {model_id} -> {display_name}"
                        )

            except Exception as api_err:
                logger.exception(f"Anthropic API error: {api_err!s}")
        else:
            logger.info("Anthropic API key not configured")

    except ImportError:
        logger.warning(
            "Anthropic package not installed. No models will be available."
        )
    except Exception:
        logger.exception("Error getting Anthropic models")

    logger.info(f"Final Anthropic models count: {len(anthropic_models)}")
    return anthropic_models


def _fetch_discovered_provider_models(
    provider_key: str, provider_info: Any
) -> list[dict[str, str]]:
    """Fetch models from one auto-discovered provider.

    Reads the provider's API key (and URL, when the provider class declares
    a ``url_setting``) from the session settings and delegates to the
    provider class's ``list_models_for_api``. Errors are logged and the
    models gathered so far are returned.
    """
    provider_models: list[dict[str, str]] = []
    try:
        logger.info(f"Fetching models from {provider_info.provider_name}")

        provider_class = provider_info.provider_class

        api_key = _get_setting_from_session(
            provider_class.api_key_setting, ""
        )

        # Only providers with a configurable URL expose ``url_setting``.
        provider_base_url: str | None = None
        if (
            hasattr(provider_class, "url_setting")
            and provider_class.url_setting
        ):
            provider_base_url = _get_setting_from_session(
                provider_class.url_setting, ""
            )

        models = provider_class.list_models_for_api(
            api_key, provider_base_url
        )

        for model in models:
            provider_models.append(
                {
                    "value": model["value"],
                    "label": model["label"],  # Use provider's label as-is
                    "provider": provider_key,
                }
            )

        logger.info(
            f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}"
        )

    except Exception:
        logger.exception(
            f"Error getting {provider_info.provider_name} models"
        )
    return provider_models


def _save_provider_models_cache(
    providers: dict[str, Any], *, force_refresh: bool
) -> None:
    """Persist freshly fetched models to the per-user database cache.

    With ``force_refresh`` every cached row is cleared first (removing stale
    data from deleted providers); otherwise only rows of the providers being
    updated are replaced. Failures while writing are logged and rolled back.
    """
    from ...database.models import ProviderModel

    username = session["username"]
    with get_user_db_session(username) as db_session:
        try:
            if force_refresh:
                deleted_count = db_session.query(ProviderModel).delete()
                logger.info(
                    f"Force refresh: cleared all {deleted_count} cached models"
                )
            else:
                # Clear old cache entries only for providers we're updating
                for provider_key in providers:
                    provider_name = provider_key.replace(
                        "_models", ""
                    ).upper()
                    db_session.query(ProviderModel).filter(
                        ProviderModel.provider == provider_name
                    ).delete()

            # Insert new models
            for provider_key, models in providers.items():
                provider_name = provider_key.replace("_models", "").upper()
                for model in models:
                    if (
                        isinstance(model, dict)
                        and "value" in model
                        and "label" in model
                    ):
                        db_session.add(
                            ProviderModel(
                                provider=provider_name,
                                model_key=model["value"],
                                model_label=model["label"],
                                last_updated=datetime.now(UTC),
                            )
                        )

            db_session.commit()
            logger.info("Successfully cached models to database")

        except Exception:
            logger.exception("Error saving models to database cache")
            db_session.rollback()


@settings_bp.route("/api/available-models", methods=["GET"])
@login_required
def api_get_available_models():
    """Get available LLM models from various providers.

    Unless the ``force_refresh`` query argument is set, models cached in the
    database within the last 24 hours are returned directly. Otherwise the
    Ollama, OpenAI, Anthropic and auto-discovered providers are queried and
    the result is written back to the cache.

    Returns:
        JSON ``{"provider_options": [...], "providers": {...}}`` where
        ``providers`` maps ``<provider>_models`` keys to lists of
        ``{"value", "label", "provider"}`` dicts, or a 500 JSON error when
        an unexpected exception escapes.
    """
    endpoint_start = time.perf_counter()
    try:
        # Check if force_refresh is requested
        force_refresh = parse_bool_arg("force_refresh")

        # Get all auto-discovered providers (show all so users can discover
        # and configure providers they haven't set up yet)
        from ...llm.providers import get_discovered_provider_options

        # Build a new list instead of extending the returned one in place,
        # so a list shared by the provider registry is never mutated.
        provider_options = [
            *get_discovered_provider_options(),
            # Remaining hardcoded provider (complex local provider not yet
            # migrated to auto-discovery)
            {
                "value": "LLAMACPP",
                "label": "Llama.cpp (Local GGUF files only)",
            },
        ]

        # Check database cache first (unless force_refresh is True)
        if not force_refresh:
            cached_providers = _load_cached_provider_models()
            if cached_providers:
                _log_available_models_duration(
                    endpoint_start, cache_hit=True
                )
                logger.info("Returning cached models from database")
                return jsonify(
                    {
                        "provider_options": provider_options,
                        "providers": cached_providers,
                    }
                )

        # Available models by provider; each key is always set, possibly to
        # an empty list when that provider yielded nothing.
        providers: dict[str, Any] = {}
        providers["ollama_models"] = _fetch_ollama_models()

        # Note: OpenAI-Compatible Endpoint models are fetched via the
        # auto-discovery loop below (OPENAI_ENDPOINT provider).
        providers["openai_models"] = _fetch_openai_models()
        providers["anthropic_models"] = _fetch_anthropic_models()

        # Fetch models from auto-discovered providers
        from ...llm.providers import discover_providers

        for provider_key, provider_info in discover_providers().items():
            provider_models = _fetch_discovered_provider_models(
                provider_key, provider_info
            )
            # Set models in providers dict using the normalized key
            providers[f"{normalize_provider(provider_key)}_models"] = (
                provider_models
            )
            logger.info(
                f"Final {provider_key} models count: {len(provider_models)}"
            )

        # Save fetched models to database cache
        if force_refresh or providers:
            _save_provider_models_cache(
                providers, force_refresh=force_refresh
            )

        # Return all options
        _log_available_models_duration(endpoint_start, cache_hit=False)
        return jsonify(
            {"provider_options": provider_options, "providers": providers}
        )

    except Exception:
        logger.exception("Error getting available models")
        _log_available_models_duration(
            endpoint_start, cache_hit=False, error=True
        )
        return jsonify(
            {
                "status": "error",
                "message": "Failed to retrieve available models",
            }
        ), 500
def _log_available_models_duration(
    start: float, cache_hit: bool, error: bool = False
) -> None:
    """Log how long the /api/available-models endpoint took.

    The message is emitted at INFO when the elapsed time exceeds one second
    (a provider fetch slow enough to be worth flagging) and at DEBUG
    otherwise. The original note identifies this as the likely culprit for
    "Path C" (an LLM provider timeout masquerading as a backend hang).

    Args:
        start: ``time.perf_counter()`` reading taken when the request began.
        cache_hit: Whether the response was served from the database cache.
        error: Whether the request ended in the error path; takes precedence
            over ``cache_hit`` in the logged label.
    """
    elapsed_ms = (time.perf_counter() - start) * 1000

    if error:
        path = "error"
    elif cache_hit:
        path = "cache hit"
    else:
        path = "full provider fetch"

    emit = logger.info if elapsed_ms > 1000 else logger.debug
    emit(f"/api/available-models ({path}) took {elapsed_ms:.0f}ms")
1611def _get_engine_icon_and_category(
1612 engine_data: dict, engine_class=None
1613) -> tuple:
1614 """
1615 Get icon emoji and category label for a search engine based on its attributes.
1617 Args:
1618 engine_data: Engine configuration dictionary
1619 engine_class: Optional loaded engine class to check attributes
1621 Returns:
1622 Tuple of (icon, category) strings
1623 """
1624 # Check attributes from either the class or the engine data
1625 if engine_class:
1626 is_scientific = getattr(engine_class, "is_scientific", False)
1627 is_generic = getattr(engine_class, "is_generic", False)
1628 is_local = getattr(engine_class, "is_local", False)
1629 is_news = getattr(engine_class, "is_news", False)
1630 is_code = getattr(engine_class, "is_code", False)
1631 else:
1632 is_scientific = engine_data.get("is_scientific", False)
1633 is_generic = engine_data.get("is_generic", False)
1634 is_local = engine_data.get("is_local", False)
1635 is_news = engine_data.get("is_news", False)
1636 is_code = engine_data.get("is_code", False)
1638 # Check books attribute
1639 if engine_class:
1640 is_books = getattr(engine_class, "is_books", False)
1641 else:
1642 is_books = engine_data.get("is_books", False)
1644 # Return icon and category based on engine type
1645 # Priority: local > scientific > news > code > books > generic > default
1646 if is_local:
1647 return "📁", "Local RAG"
1648 if is_scientific:
1649 return "🔬", "Scientific"
1650 if is_news:
1651 return "📰", "News"
1652 if is_code:
1653 return "💻", "Code"
1654 if is_books:
1655 return "📚", "Books"
1656 if is_generic:
1657 return "🌐", "Web Search"
1658 return "🔍", "Search"
@settings_bp.route("/api/available-search-engines", methods=["GET"])
@login_required
@with_user_session()
def api_get_available_search_engines(db_session=None, settings_manager=None):
    """Return the configured search engines with display metadata.

    Response shape: ``{"engines": {...}, "engine_options": [...],
    "favorites": [...]}``. ``engine_options`` is sorted with the user's
    favorites first, then alphabetically by label. A 500 JSON error is
    returned on failure.
    """
    try:
        # Same source of truth as search_engines_config.py
        from ...web_search_engines.search_engines_config import search_config

        configured_engines = search_config(
            username=session["username"], db_session=db_session
        )

        # A malformed (non-list) favorites setting is treated as empty.
        favorites = settings_manager.get_setting("search.favorites", [])
        if not isinstance(favorites, list):
            favorites = []

        engines_dict = {}
        engine_options = []

        if configured_engines:
            from ...security.module_whitelist import (
                get_safe_module_class,
                SecurityError,
            )

            for engine_id, engine_data in configured_engines.items():
                # Loading the class is best-effort: it only enriches the
                # metadata, so failures fall back to the config flags.
                engine_class = None
                try:
                    module_path = engine_data.get("module_path")
                    class_name = engine_data.get("class_name")
                    if module_path and class_name:
                        # Whitelist-validated import
                        engine_class = get_safe_module_class(
                            module_path, class_name
                        )
                except SecurityError:
                    logger.warning(
                        f"Security: Blocked unsafe module for {engine_id}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Could not load engine class for {engine_id}: {e}"
                    )

                icon, category = _get_engine_icon_and_category(
                    engine_data, engine_class
                )
                requires_api_key = engine_data.get("requires_api_key", False)
                base_name = engine_data.get("display_name", engine_id)
                is_favorite = engine_id in favorites

                # Label carries icon, category and whether a key is needed.
                label = (
                    f"{icon} {base_name} ({category}, API key)"
                    if requires_api_key
                    else f"{icon} {base_name} ({category}, Free)"
                )

                engines_dict[engine_id] = {
                    "display_name": base_name,
                    "description": engine_data.get("description", ""),
                    "strengths": engine_data.get("strengths", []),
                    "icon": icon,
                    "category": category,
                    "requires_api_key": requires_api_key,
                    "is_favorite": is_favorite,
                }
                engine_options.append(
                    {
                        "value": engine_id,
                        "label": label,
                        "icon": icon,
                        "category": category,
                        "requires_api_key": requires_api_key,
                        "is_favorite": is_favorite,
                    }
                )

        def _favorites_then_label(option):
            # ``not is_favorite`` sorts favorites (False) ahead of the rest.
            return (
                not option.get("is_favorite", False),
                option.get("label", "").lower(),
            )

        engine_options.sort(key=_favorites_then_label)

        # An empty result is logged but still returned as an empty list.
        if not engine_options:
            logger.warning("No search engines found in configuration")

        return jsonify(
            {
                "engines": engines_dict,
                "engine_options": engine_options,
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error getting available search engines")
        return jsonify({"error": "Failed to retrieve search engines"}), 500
@settings_bp.route("/api/search-favorites", methods=["GET"])
@login_required
@with_user_session()
def api_get_search_favorites(db_session=None, settings_manager=None):
    """Return the current user's favorite search engines.

    Response shape: ``{"favorites": [...]}``. A stored value that is not a
    list is reported as an empty list. A 500 JSON error on failure.
    """
    try:
        stored = settings_manager.get_setting("search.favorites", [])
        favorites = stored if isinstance(stored, list) else []
        return jsonify({"favorites": favorites})

    except Exception:
        logger.exception("Error getting search favorites")
        return jsonify({"error": "Failed to retrieve favorites"}), 500
@settings_bp.route("/api/search-favorites", methods=["PUT"])
@login_required
@require_json_body(error_message="No data provided")
@with_user_session()
def api_update_search_favorites(db_session=None, settings_manager=None):
    """Replace the current user's list of favorite search engines.

    Expects a JSON body with a ``favorites`` list.

    Responses:
        200 with the stored favorites on success.
        400 when ``favorites`` is missing or is not a list.
        500 when the setting could not be saved or an exception occurs.
    """
    try:
        payload = request.get_json()
        favorites = payload.get("favorites")

        # Validate the payload before touching the settings store.
        if favorites is None:
            return jsonify({"error": "No favorites provided"}), 400
        if not isinstance(favorites, list):
            return jsonify({"error": "Favorites must be a list"}), 400

        saved = settings_manager.set_setting("search.favorites", favorites)
        if not saved:
            return jsonify({"error": "Failed to update favorites"}), 500

        invalidate_settings_caches(session["username"])
        return jsonify(
            {
                "message": "Favorites updated successfully",
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error updating search favorites")
        return jsonify({"error": "Failed to update favorites"}), 500
@settings_bp.route("/api/search-favorites/toggle", methods=["POST"])
@login_required
@require_json_body(error_message="No data provided")
@with_user_session()
def api_toggle_search_favorite(db_session=None, settings_manager=None):
    """Add or remove one search engine from the user's favorites.

    Expects a JSON body with ``engine_id``. The engine is removed when it is
    already a favorite and appended otherwise.

    Responses:
        200 with the engine id, its new favorite state and the full list.
        400 when ``engine_id`` is missing or empty.
        500 when the setting could not be saved or an exception occurs.
    """
    try:
        payload = request.get_json()
        engine_id = payload.get("engine_id")
        if not engine_id:
            return jsonify({"error": "No engine_id provided"}), 400

        # Work on a copy so the list held by the settings manager is not
        # modified in place; a non-list stored value starts from empty.
        stored = settings_manager.get_setting("search.favorites", [])
        favorites = list(stored) if isinstance(stored, list) else []

        # New state is the opposite of the current membership.
        is_favorite = engine_id not in favorites
        if is_favorite:
            favorites.append(engine_id)
        else:
            favorites.remove(engine_id)

        if not settings_manager.set_setting("search.favorites", favorites):
            return jsonify({"error": "Failed to toggle favorite"}), 500

        invalidate_settings_caches(session["username"])
        return jsonify(
            {
                "message": "Favorite toggled successfully",
                "engine_id": engine_id,
                "is_favorite": is_favorite,
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error toggling search favorite")
        return jsonify({"error": "Failed to toggle favorite"}), 500
1866# Legacy routes for backward compatibility - these will redirect to the new routes
@settings_bp.route("/main", methods=["GET"])
@login_required
def main_config_page():
    """Legacy URL: redirect to the unified settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/collections", methods=["GET"])
@login_required
def collections_config_page():
    """Legacy URL: redirect to the unified settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/api_keys", methods=["GET"])
@login_required
def api_keys_config_page():
    """Legacy URL: redirect to the unified settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/search_engines", methods=["GET"])
@login_required
def search_engines_config_page():
    """Legacy URL: redirect to the unified settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/llm", methods=["GET"])
@login_required
def llm_config_page():
    """Legacy URL: redirect to the unified settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Refuse to open a configuration file's location.

    Security: the endpoint is disabled for server deployments; opening a
    file location only makes sense when server and client share a machine.
    It always answers with a 403 JSON error.
    """
    disabled_response = {
        "status": "error",
        "message": "This feature is disabled. It is only available in desktop mode.",
    }
    return jsonify(disabled_response), 403
@settings_bp.context_processor
def inject_csrf_token():
    """Expose ``csrf_token`` to templates rendered by the settings routes.

    The ``generate_csrf`` callable itself is injected (not its result), so
    templates invoke it when they need a token.
    """
    return dict(csrf_token=generate_csrf)
@settings_bp.route("/fix_corrupted_settings", methods=["POST"])
@login_required
@settings_limit
@with_user_session(include_settings_manager=False)
def fix_corrupted_settings(db_session=None):
    """Repair duplicate and corrupted settings rows.

    Two passes are made over the ``Setting`` table:

    1. For every key stored more than once, the most recently updated row is
       kept and the others are deleted.
    2. Every setting whose value is ``None``, an empty dict, or one of a few
       known junk strings is reset to a hard-coded default when one exists
       for its key; ``report.*`` keys without a default are reset to ``{}``.
       Other corrupted settings are left untouched.

    Changes are committed (and the user's settings caches invalidated) only
    when something was fixed or removed. Returns a JSON summary, or a 500
    JSON error after rolling back on failure.
    """
    try:
        # String values that indicate a broken serialization round-trip.
        junk_strings = (
            "{",
            "[",
            "{}",
            "[]",
            "[object Object]",
            "null",
            "undefined",
        )

        # Hard-coded fallbacks, keyed by the exact setting key.
        known_defaults: dict[str, Any] = {
            "llm.model": "",
            "llm.provider": "ollama",
            "llm.temperature": 0.7,
            "llm.max_tokens": 1024,
            "search.tool": "auto",
            "search.max_results": 10,
            "search.region": "us",
            "search.questions_per_iteration": 3,
            "search.searches_per_section": 2,
            "search.skip_relevance_filter": False,
            "search.safe_search": True,
            "search.search_language": "English",
            "report.searches_per_section": 2,
            "app.theme": "dark",
            "app.default_theme": "dark",
            "app.enable_notifications": True,
            "app.enable_web": True,
            "app.web_interface": True,
            "app.host": "0.0.0.0",
            "app.port": 5000,
            "app.debug": True,
        }

        fixed_settings = []
        removed_duplicate_settings = []

        # Duplicates arise from errors in settings import/export.
        from sqlalchemy import func as sql_func

        duplicated_rows = (
            db_session.query(Setting.key)
            .group_by(Setting.key)
            .having(sql_func.count(Setting.key) > 1)
            .all()
        )

        for row in duplicated_rows:
            key = row[0]
            newest_first = (
                db_session.query(Setting)
                .filter(Setting.key == key)
                .order_by(Setting.updated_at.desc())
                .all()
            )
            # Index 0 is the most recently updated row; drop all the others.
            # The key is recorded once per deleted row.
            for stale in newest_first[1:]:
                db_session.delete(stale)
                removed_duplicate_settings.append(key)

        for setting in db_session.query(Setting).all():
            value = setting.value
            if value is None:
                corrupted = True
            elif isinstance(value, str):
                corrupted = value in junk_strings
            elif isinstance(value, dict):
                corrupted = len(value) == 0
            else:
                corrupted = False

            if not corrupted:
                continue

            default_value: Any = known_defaults.get(setting.key)

            # ``is not None`` (rather than truthiness) keeps falsy defaults
            # such as "" and False usable.
            if default_value is not None:
                setting.value = default_value
                fixed_settings.append(setting.key)
            elif setting.key.startswith("report."):
                # No known default: corrupted report JSON becomes an empty
                # object.
                setting.value = {}
                fixed_settings.append(setting.key)

        if fixed_settings or removed_duplicate_settings:
            db_session.commit()
            logger.info(
                f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}"
            )
            if removed_duplicate_settings:
                logger.info(
                    f"Removed {len(removed_duplicate_settings)} duplicate settings"
                )
            invalidate_settings_caches(session["username"])

        summary = {
            "status": "success",
            "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates",
            "fixed_settings": fixed_settings,
            "removed_duplicates": removed_duplicate_settings,
        }
        return jsonify(summary)

    except Exception:
        logger.exception("Error fixing corrupted settings")
        db_session.rollback()
        failure = {
            "status": "error",
            "message": "An internal error occurred while fixing corrupted settings. Please try again later.",
        }
        return jsonify(failure), 500
@settings_bp.route("/api/warnings", methods=["GET"])
@login_required
def api_get_warnings():
    """Return the current settings-based warnings as JSON.

    Responds with ``{"warnings": ...}`` holding the result of
    ``calculate_warnings()``. Any exception is logged and answered with a
    generic error payload and HTTP 500.
    """
    try:
        payload = {"warnings": calculate_warnings()}
        return jsonify(payload)
    except Exception:
        logger.exception("Error getting warnings")
        failure = jsonify({"error": "Failed to retrieve warnings"})
        return failure, 500
@settings_bp.route("/api/backup-status", methods=["GET"])
@login_required
def api_get_backup_status():
    """Report the current user's database backups as JSON.

    Lists every ``ldr_backup_*.db`` file in the user's backup directory,
    newest first by modification time, together with the file count, the
    combined size, and the ``backup.enabled`` setting (default ``True``).

    Returns 401 when the session carries no username and 500 (with a
    generic message) on any unexpected failure.
    """
    try:
        from ...config.paths import get_user_backup_directory

        username = session.get("username")
        if not username:
            return jsonify({"error": "Not authenticated"}), 401

        from ...utilities.formatting import human_size

        backup_dir = get_user_backup_directory(username)

        # Keep each entry paired with its mtime so ordering is based on the
        # modification time (not the filename) without the mtime ever
        # appearing in the response payload.
        dated_entries = []
        total_size = 0
        for backup_file in backup_dir.glob("ldr_backup_*.db"):
            try:
                info = backup_file.stat()
                size = info.st_size
                total_size += size
                created = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
                dated_entries.append(
                    (
                        info.st_mtime,
                        {
                            "filename": backup_file.name,
                            "size_bytes": size,
                            "size_human": human_size(size),
                            "created_at": created.isoformat(),
                        },
                    )
                )
            except FileNotFoundError:
                # The file disappeared between globbing and stat(); skip it.
                continue

        # Newest first.
        dated_entries.sort(key=lambda pair: pair[0], reverse=True)
        backup_list = [entry for _, entry in dated_entries]

        backup_enabled = _get_setting_from_session("backup.enabled", True)

        summary = {
            "enabled": bool(backup_enabled),
            "count": len(backup_list),
            "backups": backup_list,
            "total_size_bytes": total_size,
            "total_size_human": human_size(total_size),
        }
        return jsonify(summary)

    except Exception:
        logger.exception("Error getting backup status")
        return jsonify({"error": "Failed to retrieve backup status"}), 500
@settings_bp.route("/api/ollama-status", methods=["GET"])
@login_required
def check_ollama_status():
    """Check whether the configured Ollama server is reachable.

    Reads the ``llm.ollama.url`` setting (falling back to
    ``DEFAULT_OLLAMA_URL`` when the setting is unset or empty), normalizes
    it, and requests ``<base_url>/api/version`` with a 2 second timeout.

    Returns:
        A JSON response with ``running: True`` and the reported ``version``
        (``"unknown"`` if the payload has no such key) when the server
        answers with HTTP 200. Otherwise ``running: False`` plus an
        ``error`` description. Failures are reported in the body; no HTTP
        error status is set by this handler.

    Only ``requests.exceptions.RequestException`` is handled here; any other
    exception propagates to the framework.
    """
    try:
        # Get Ollama URL from settings
        raw_base_url = _get_setting_from_session(
            "llm.ollama.url", DEFAULT_OLLAMA_URL
        )
        base_url = (
            normalize_url(raw_base_url) if raw_base_url else DEFAULT_OLLAMA_URL
        )

        # Loopback and private-network targets are explicitly permitted for
        # this probe via the allow_* flags.
        response = safe_get(
            f"{base_url}/api/version",
            timeout=2,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if response.status_code == 200:
            return jsonify(
                {
                    "running": True,
                    "version": response.json().get("version", "unknown"),
                }
            )
        return jsonify(
            {
                "running": False,
                "error": f"Ollama returned status code {response.status_code}",
            }
        )
    except requests.exceptions.RequestException:
        logger.exception("Ollama check failed")
        # The message previously referred to a "search engine", which was
        # misleading for an Ollama availability check.
        return jsonify(
            {"running": False, "error": "Failed to check Ollama status"}
        )
@settings_bp.route("/api/rate-limiting/status", methods=["GET"])
@login_required
def api_get_rate_limiting_status():
    """Return the rate limiter's configuration and per-engine statistics.

    The response has two keys: ``status`` (the tracker's ``enabled``,
    ``exploration_rate``, ``learning_rate`` and ``memory_window``
    attributes) and ``engines`` (one dict per row from
    ``tracker.get_stats()``). Wait times are rounded to 2 decimals and the
    success rate is reported as a percentage rounded to 1 decimal (``0.0``
    when the stored rate is falsy). Any failure yields HTTP 500.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        tracker = get_tracker()

        # Tracker-wide configuration
        status = {
            "enabled": tracker.enabled,
            "exploration_rate": tracker.exploration_rate,
            "learning_rate": tracker.learning_rate,
            "memory_window": tracker.memory_window,
        }

        # One entry per engine; each stats row is a 7-tuple unpacked in place
        engines = [
            {
                "engine_type": engine_type,
                "base_wait_seconds": round(base_wait, 2),
                "min_wait_seconds": round(min_wait, 2),
                "max_wait_seconds": round(max_wait, 2),
                "last_updated": last_updated,
                "total_attempts": total_attempts,
                "success_rate": (
                    round(success_rate * 100, 1) if success_rate else 0.0
                ),
            }
            for (
                engine_type,
                base_wait,
                min_wait,
                max_wait,
                last_updated,
                total_attempts,
                success_rate,
            ) in tracker.get_stats()
        ]

        return jsonify({"status": status, "engines": engines})

    except Exception:
        logger.exception("Error getting rate limiting status")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route(
    "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"]
)
@login_required
def api_reset_engine_rate_limiting(engine_type):
    """Reset the rate limiting data of one search engine.

    ``engine_type`` comes from the URL path and is passed to the tracker's
    ``reset_engine``. Responds with a confirmation message, or with a
    generic error and HTTP 500 if anything raises.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        get_tracker().reset_engine(engine_type)

        confirmation = f"Rate limiting data reset for {engine_type}"
        return jsonify({"message": confirmation})

    except Exception:
        logger.exception(f"Error resetting rate limiting for {engine_type}")
        failure = jsonify({"error": "An internal error occurred"})
        return failure, 500
@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"])
@login_required
def api_cleanup_rate_limiting():
    """Clean up old rate limiting data.

    The JSON body is optional. Without one (or with a JSON ``null``), data
    older than 30 days is removed. When a body is sent it must be a JSON
    object; its ``days`` key (default 30) is passed to the tracker's
    ``cleanup_old_data``.

    Note: not using @require_json_body because the JSON body is optional
    here — the endpoint works with or without a payload (defaults to 30 days).

    Returns:
        A confirmation message on success, HTTP 400 when the body is valid
        JSON but not an object, and HTTP 500 on any other failure.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        data = request.get_json() if request.is_json else None
        if data is None:
            days = 30
        elif isinstance(data, dict):
            days = data.get("days", 30)
        else:
            # Valid JSON that is not an object (list, string, number, ...)
            # has no "days" key to read. Reject it as a client error rather
            # than failing on .get() and reporting an internal error.
            return jsonify(
                {"error": "Request body must be a JSON object"}
            ), 400

        # NOTE(review): ``days`` is forwarded as received; confirm that
        # cleanup_old_data copes with non-numeric or negative values.
        tracker = get_tracker()
        tracker.cleanup_old_data(days)

        return jsonify(
            {"message": f"Cleaned up rate limiting data older than {days} days"}
        )

    except Exception:
        logger.exception("Error cleaning up rate limiting data")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route("/api/bulk", methods=["GET"])
@login_required
def get_bulk_settings():
    """Return several settings in a single response.

    Setting keys are taken from the repeated ``keys[]`` query parameter;
    when none are given a fixed list of common keys is used. Each key maps
    to ``{"value": ..., "exists": ...}`` where ``exists`` is true when the
    value is not ``None``. A key whose lookup raises is reported with an
    ``error`` field instead of failing the whole request.
    """
    try:
        # Fall back to the common settings when no keys were requested
        keys = request.args.getlist("keys[]") or [
            "llm.provider",
            "llm.model",
            "search.tool",
            "search.iterations",
            "search.questions_per_iteration",
            "search.search_strategy",
            "benchmark.evaluation.provider",
            "benchmark.evaluation.model",
            "benchmark.evaluation.temperature",
            "benchmark.evaluation.endpoint_url",
        ]

        settings = {}
        for name in keys:
            try:
                current = _get_setting_from_session(name)
            except Exception:
                logger.warning(f"Error getting setting {name}")
                settings[name] = {
                    "value": None,
                    "exists": False,
                    "error": "Failed to retrieve setting",
                }
            else:
                settings[name] = {
                    "value": current,
                    "exists": current is not None,
                }

        return jsonify({"success": True, "settings": settings})

    except Exception:
        logger.exception("Error getting bulk settings")
        failure = {"success": False, "error": "An internal error occurred"}
        return jsonify(failure), 500
@settings_bp.route("/api/data-location", methods=["GET"])
@login_required
def api_get_data_location():
    """Describe where application data is stored and whether it is encrypted.

    The response includes the data directory, the encrypted database path
    (reported under both ``database_path`` and ``encrypted_database_path``),
    the ``bootstrap.data_dir`` setting (exposed as the ``LDR_DATA_DIR``
    custom value), per-platform default locations, a security notice whose
    wording depends on ``db_manager.has_encryption``, and the SQLCipher
    settings when encryption is available. Any failure yields HTTP 500.
    """
    try:
        data_dir = get_data_directory()
        encrypted_db_path = get_encrypted_database_path()

        # A custom data directory is read from the bootstrap.data_dir setting
        from local_deep_research.settings.manager import SettingsManager

        custom_data_dir = SettingsManager().get_setting("bootstrap.data_dir")

        # Default locations shown to the user for each platform
        platform_info = {
            "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research",
            "macOS": "~/Library/Application Support/local-deep-research",
            "Linux": "~/.local/share/local-deep-research",
        }

        # platform.system() reports macOS as "Darwin"; use the friendly name
        detected = platform.system()
        current_platform = "macOS" if detected == "Darwin" else detected

        from ...database.sqlcipher_utils import get_sqlcipher_settings

        # Debug logging
        logger.info(f"db_manager type: {type(db_manager)}")
        logger.info(
            f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}"
        )

        encrypted = db_manager.has_encryption
        cipher_settings = get_sqlcipher_settings() if encrypted else {}

        if encrypted:
            warning = "All data including API keys stored in the database are securely encrypted."
            recommendation = "Your data is protected with database encryption."
        else:
            warning = "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set."
            recommendation = "Consider using environment variables for sensitive API keys instead of storing them in the database."

        security_notice = {
            "encrypted": encrypted,
            "warning": warning,
            "recommendation": recommendation,
        }

        location = {
            "data_directory": str(data_dir),
            "database_path": str(encrypted_db_path),
            "encrypted_database_path": str(encrypted_db_path),
            "is_custom": custom_data_dir is not None,
            "custom_env_var": "LDR_DATA_DIR",
            "custom_env_value": custom_data_dir,
            "platform": current_platform,
            "platform_default": platform_info.get(
                current_platform, str(data_dir)
            ),
            "platform_info": platform_info,
            "security_notice": security_notice,
            "encryption_settings": cipher_settings,
        }
        return jsonify(location)

    except Exception:
        logger.exception("Error getting data location information")
        return jsonify({"error": "Failed to retrieve data location"}), 500
@settings_bp.route("/api/notifications/test-url", methods=["POST"])
@login_required
def api_test_notification_url():
    """
    Test a notification service URL.

    This endpoint creates a temporary NotificationService instance to test
    the provided URL. No database session or password is required because:
    - The service URL is provided directly in the request body
    - Test notifications use a temporary Apprise instance
    - No user settings or database queries are performed

    Security note: Rate limiting is not applied here because users need to
    test URLs when configuring notifications. Abuse is mitigated by the
    @login_required decorator and the fact that users can only spam their
    own notification services.

    Request body: a JSON object with a ``service_url`` key.

    Returns:
        A JSON object limited to ``success``, ``message`` and ``error``.
        HTTP 400 when the body is missing, is not parseable JSON, is not a
        JSON object, or lacks ``service_url``; HTTP 500 on unexpected errors.
    """
    try:
        from ...notifications.service import NotificationService
        from ...settings.env_registry import get_env_setting

        # silent=True makes a wrong content type or malformed body yield
        # None instead of raising an HTTP error that the broad handler below
        # would otherwise report as an internal failure.
        data = request.get_json(silent=True)

        # Require a JSON object: a list or string body could pass the
        # membership test yet cannot be indexed by "service_url".
        if not isinstance(data, dict) or "service_url" not in data:
            return jsonify(
                {"success": False, "error": "service_url is required"}
            ), 400

        service_url = data["service_url"]

        # Create notification service instance and test the URL.
        # Gate by the env-only master switch so the test endpoint cannot
        # bypass the operator's risk-acceptance decision (see SECURITY.md
        # "Notification Webhook SSRF").
        notification_service = NotificationService(
            allow_private_ips=bool(
                get_env_setting("notifications.allow_private_ips", False)
            ),
            outbound_allowed=bool(
                get_env_setting("notifications.allow_outbound", False)
            ),
        )
        result = notification_service.test_service(service_url)

        # Only return expected fields to prevent information leakage
        safe_response = {
            "success": result.get("success", False),
            "message": result.get("message", ""),
            "error": result.get("error", ""),
        }
        return jsonify(safe_response)

    except Exception:
        logger.exception("Error testing notification URL")
        return jsonify(
            {
                "success": False,
                "error": "Failed to test notification service. Check logs for details.",
            }
        ), 500