Coverage for src/local_deep_research/web/routes/settings_routes.py: 85%
932 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Settings Routes Module

This module handles all settings-related HTTP endpoints for the application.

CHECKBOX HANDLING PATTERN:
--------------------------
This module supports TWO submission modes to handle checkboxes correctly:

**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)**
- JavaScript intercepts form submission with e.preventDefault()
- Checkbox values read directly from DOM via checkbox.checked
- Data sent as JSON: {"setting.key": true/false}
- Hidden fallback inputs are managed but NOT used in this mode
- Provides better UX with instant feedback and validation

**MODE 2: Traditional POST Submission (Fallback - /save_settings)**
- Used when JavaScript is disabled (accessibility/no-JS environments)
- Browser submits form data naturally via request.form
- Hidden fallback pattern CRITICAL here:
  * Checked checkbox: Submits checkbox value, hidden input disabled
  * Unchecked checkbox: Submits hidden input value "false"
- Ensures unchecked checkboxes are captured (HTML limitation workaround)

**Implementation Details:**
1. Each checkbox has `data-hidden-fallback` attribute → hidden input ID
2. checkbox_handler.js manages hidden input disabled state
3. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240)
4. POST mode: Flask reads request.form including enabled hidden inputs
5. Both modes use convert_setting_value() for consistent boolean conversion

**Why Both Patterns?**
- AJAX: Better UX, immediate validation, no page reload
- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation
- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode

This dual-mode approach ensures the app works for all users while providing
optimal experience when JavaScript is available.
"""
41import platform
42from typing import Any, Optional, Tuple
43from datetime import UTC, datetime, timedelta, timezone
45import requests
46from flask import (
47 Blueprint,
48 flash,
49 jsonify,
50 redirect,
51 request,
52 session,
53 url_for,
54)
55from flask_wtf.csrf import generate_csrf
56from loguru import logger
58from ...config.constants import DEFAULT_OLLAMA_URL
59from ...llm.providers.base import normalize_provider
60from ...config.paths import get_data_directory, get_encrypted_database_path
61from ...database.models import Setting, SettingType
62from ...database.session_context import get_user_db_session
63from ...database.encrypted_db import db_manager
64from ...utilities.db_utils import get_settings_manager
65from ...utilities.url_utils import normalize_url
66from ...security.decorators import require_json_body
67from ..auth.decorators import login_required
68from ...security.rate_limiter import settings_limit
69from ...settings.manager import get_typed_setting_value, parse_boolean
70from ..services.settings_service import (
71 create_or_update_setting,
72 invalidate_settings_caches,
73 set_setting,
74)
75from ..utils.route_decorators import with_user_session
76from ..utils.templates import render_template_with_defaults
79from ...security import safe_get
80from ..warning_checks import calculate_warnings
# Create a Blueprint for settings; every route below is mounted under /settings
settings_bp = Blueprint("settings", __name__, url_prefix="/settings")

# NOTE: Routes use session["username"] (not .get()) intentionally.
# @login_required guarantees the key exists; direct access fails fast
# if the decorator is ever removed.

# Settings whose option lists are populated dynamically at runtime
# (excluded from the fixed-options validation in validate_setting)
DYNAMIC_SETTINGS = ["llm.provider", "llm.model", "search.tool"]
def _get_setting_from_session(key: str, default=None):
    """Look up one setting value through the logged-in user's DB session.

    Returns ``default`` when no database session can be opened for the
    current user.
    """
    user = session.get("username")
    with get_user_db_session(user) as db_session:
        if not db_session:
            return default
        manager = get_settings_manager(db_session, user)
        return manager.get_setting(key, default)
def validate_setting(
    setting: Setting, value: Any
) -> Tuple[bool, Optional[str]]:
    """Check a candidate value against a setting's type and constraints.

    Args:
        setting: Setting row supplying ui_element / min / max / options.
        value: The value to check (coerced to the native type first).

    Returns:
        Tuple of (is_valid, error_message); error_message is None when valid.
    """
    # Normalize to the setting's native Python type first so the checks
    # below see bool/int/float rather than raw request strings.
    typed = get_typed_setting_value(
        key=str(setting.key),
        value=value,
        ui_element=str(setting.ui_element),
        default=None,
        check_env=False,
    )

    ui = setting.ui_element
    if ui == "checkbox":
        # Conversion above should have produced a boolean.
        if not isinstance(typed, bool):
            return False, "Value must be a boolean"
    elif ui in ("number", "slider", "range"):
        if not isinstance(typed, (int, float)):
            return False, "Value must be a number"
        # Enforce optional min/max bounds from the setting row.
        if setting.min_value is not None and typed < setting.min_value:
            return False, f"Value must be at least {setting.min_value}"
        if setting.max_value is not None and typed > setting.max_value:
            return False, f"Value must be at most {setting.max_value}"
    elif ui == "select":
        # Dynamically populated dropdowns have no fixed option list to
        # validate against, so they are skipped here.
        if setting.options and setting.key not in DYNAMIC_SETTINGS:
            allowed = [
                opt.get("value") if isinstance(opt, dict) else opt
                for opt in list(setting.options)  # type: ignore[arg-type]
            ]
            if typed not in allowed:
                return (
                    False,
                    f"Value must be one of: {', '.join(str(v) for v in allowed)}",
                )

    # All checks passed
    return True, None
def coerce_setting_for_write(key: str, value: Any, ui_element: str) -> Any:
    """Coerce an incoming value to the correct type before writing to the DB.

    All web routes that persist settings funnel through this helper so
    that type conversion stays consistent.

    No JSON pre-parsing (``json.loads``) is needed here:
    - ``get_typed_setting_value`` already parses JSON strings internally
      via ``_parse_json_value`` (for ``ui_element="json"``) and
      ``_parse_multiselect`` (for ``ui_element="multiselect"``).
    - JSON API endpoints deliver dicts/lists as native Python objects via
      ``request.get_json()``.
    - For ``ui_element="text"``, pre-parsing would corrupt data: a JSON
      string like ``'{"k": "v"}'`` would become a dict, then ``str()``
      would produce ``"{'k': 'v'}"`` (Python repr, not valid JSON).
    """
    # check_env=False: we are persisting a user-supplied value, not reading
    # an environment-variable override. The default check_env=True would
    # silently replace the user's value with an env var, which is wrong on
    # the write path.
    coerced = get_typed_setting_value(
        key=key,
        value=value,
        ui_element=ui_element,
        default=None,
        check_env=False,
    )
    return coerced
@settings_bp.route("/", methods=["GET"])
@login_required
def settings_page():
    """Render the settings dashboard that links to the specialized config pages."""
    return render_template_with_defaults("settings_dashboard.html")
@settings_bp.route("/save_all_settings", methods=["POST"])
@login_required
@settings_limit
@require_json_body(
    error_format="status", error_message="No settings data provided"
)
@with_user_session()
def save_all_settings(db_session=None, settings_manager=None):
    """Handle saving all settings at once from the unified settings page.

    Accepts a JSON body mapping setting keys to values. Existing editable
    settings are type-coerced, validated, and updated; unknown keys create
    new settings with inferred metadata. On success, returns the complete
    settings map so the client can refresh its state, plus recalculated
    warnings when any warning-affecting key changed.
    """
    try:
        # Process JSON data
        form_data = request.get_json()
        if not form_data:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "No settings data provided",
                    }
                ),
                400,
            )

        # Track validation errors
        validation_errors = []
        settings_by_type: dict[str, Any] = {}

        # Track changes for logging
        updated_settings = []
        created_settings = []

        # Store original values for better messaging
        original_values = {}

        # Fetch all settings once to avoid N+1 query problem
        all_db_settings = {
            setting.key: setting for setting in db_session.query(Setting).all()
        }

        # Filter out non-editable settings
        non_editable_keys = [
            key
            for key in form_data.keys()
            if key in all_db_settings and not all_db_settings[key].editable
        ]
        if non_editable_keys:
            logger.warning(
                f"Skipping non-editable settings: {non_editable_keys}"
            )
            for key in non_editable_keys:
                del form_data[key]

        # Update each setting
        for key, value in form_data.items():
            # Skip corrupted keys or empty strings as keys
            if not key or not isinstance(key, str) or key.strip() == "":
                continue

            # Get the setting metadata from pre-fetched dict
            current_setting = all_db_settings.get(key)

            # EARLY VALIDATION: Convert checkbox values BEFORE any other
            # processing. This prevents incorrect triggering of corrupted
            # value detection below.
            if current_setting and current_setting.ui_element == "checkbox":
                if not isinstance(value, bool):
                    logger.debug(
                        f"Converting checkbox {key} from {type(value).__name__} to bool: {value}"
                    )
                    value = parse_boolean(value)
                    form_data[key] = (
                        value  # Update the form_data with converted value
                    )

            # Store original value for messaging
            if current_setting:
                original_values[key] = current_setting.value

            # Determine setting type and category (used only when creating
            # a brand-new setting further down)
            if key.startswith("llm."):
                setting_type = SettingType.LLM
                category = "llm_general"
                if (
                    "temperature" in key
                    or "max_tokens" in key
                    or "batch" in key
                    or "layers" in key
                ):
                    category = "llm_parameters"
            elif key.startswith("search."):
                setting_type = SettingType.SEARCH
                category = "search_general"
                if (
                    "iterations" in key
                    or "results" in key
                    or "region" in key
                    or "questions" in key
                    or "section" in key
                ):
                    category = "search_parameters"
            elif key.startswith("report."):
                setting_type = SettingType.REPORT
                category = "report_parameters"
            elif key.startswith("database."):
                setting_type = SettingType.DATABASE
                category = "database_parameters"
            elif key.startswith("app."):
                setting_type = SettingType.APP
                category = "app_interface"
            else:
                setting_type = None
                category = None

            # Special handling for corrupted or empty values
            if value == "[object Object]" or (
                isinstance(value, str)
                and value.strip() in ["{}", "[]", "{", "["]
            ):
                if key.startswith("report."):
                    value = {}
                else:
                    # Use default or null for other types
                    if key == "llm.model":
                        value = "gemma3:12b"
                    elif key == "llm.provider":
                        value = "ollama"
                    elif key == "search.tool":
                        value = "auto"
                    elif key in ["app.theme", "app.default_theme"]:
                        value = "dark"
                    else:
                        value = None

                logger.warning(f"Corrected corrupted value for {key}: {value}")

            # NOTE: No JSON pre-parsing is done here. After the corruption
            # replacement above, values are Python dicts (e.g. {}), hardcoded
            # strings, or None — none are JSON strings that need parsing.
            # Type conversion below via coerce_setting_for_write() handles
            # everything; it delegates to get_typed_setting_value() which
            # already parses JSON internally for "json" and "multiselect"
            # ui_elements.

            if current_setting:
                # Coerce to correct Python type (e.g. str "5" → int 5
                # for number settings, str "true" → bool for checkboxes).
                converted_value = coerce_setting_for_write(
                    key=current_setting.key,
                    value=value,
                    ui_element=current_setting.ui_element,
                )

                # Validate the setting
                is_valid, error_message = validate_setting(
                    current_setting, converted_value
                )

                if is_valid:
                    # Save the converted setting using the same session
                    success = set_setting(
                        key, converted_value, db_session=db_session
                    )
                    if success:
                        updated_settings.append(key)

                    # Track settings by type for exporting
                    if current_setting.type not in settings_by_type:
                        settings_by_type[current_setting.type] = []
                    settings_by_type[current_setting.type].append(
                        current_setting
                    )
                else:
                    # Add to validation errors
                    validation_errors.append(
                        {
                            "key": key,
                            "name": current_setting.name,
                            "error": error_message,
                        }
                    )
            else:
                # Create a new setting
                new_setting = {
                    "key": key,
                    "value": value,
                    "type": setting_type.value.lower()
                    if setting_type is not None
                    else "app",
                    "name": key.split(".")[-1].replace("_", " ").title(),
                    "description": f"Setting for {key}",
                    "category": category,
                    "ui_element": "text",  # Default UI element
                }

                # Determine better UI element based on value type
                if isinstance(value, bool):
                    new_setting["ui_element"] = "checkbox"
                elif isinstance(value, (int, float)) and not isinstance(
                    value, bool
                ):
                    new_setting["ui_element"] = "number"
                elif isinstance(value, (dict, list)):
                    new_setting["ui_element"] = "textarea"

                # Create the setting
                db_setting = create_or_update_setting(
                    new_setting, db_session=db_session
                )

                if db_setting:
                    created_settings.append(key)
                    # Track settings by type for exporting
                    if db_setting.type not in settings_by_type:
                        settings_by_type[db_setting.type] = []
                    settings_by_type[db_setting.type].append(db_setting)
                else:
                    validation_errors.append(
                        {
                            "key": key,
                            "name": new_setting["name"],
                            "error": "Failed to create setting",
                        }
                    )

        # Report validation errors if any
        if validation_errors:
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "Validation errors",
                        "errors": validation_errors,
                    }
                ),
                400,
            )

        # Get all settings to return to the client for proper state update
        all_settings = {}
        for setting in db_session.query(Setting).all():
            # Convert enum to string if present
            setting_type = setting.type
            if hasattr(setting_type, "value"):
                setting_type = setting_type.value

            all_settings[setting.key] = {
                "value": setting.value,
                "name": setting.name,
                "description": setting.description,
                "type": setting_type,
                "category": setting.category,
                "ui_element": setting.ui_element,
                "editable": setting.editable,
                "options": setting.options,
                "visible": setting.visible,
                "min_value": setting.min_value,
                "max_value": setting.max_value,
                "step": setting.step,
            }

        # Customize the success message based on what changed
        success_message = ""
        if len(updated_settings) == 1:
            # For a single update, provide more specific info about what changed
            key = updated_settings[0]
            # Reuse the already-fetched setting from our pre-fetched dict
            updated_setting = all_db_settings.get(key)
            name = (
                updated_setting.name
                if updated_setting
                else key.split(".")[-1].replace("_", " ").title()
            )

            # Format the message
            if key in original_values:
                new_value = updated_setting.value if updated_setting else None

                # If it's a boolean, use "enabled/disabled" language
                if isinstance(new_value, bool):
                    state = "enabled" if new_value else "disabled"
                    success_message = f"{name} {state}"
                else:
                    # FIX: the original branched on isinstance(new_value,
                    # (dict, list)) here, but both branches produced the
                    # identical message — collapsed the dead branch.
                    success_message = f"{name} updated"
            else:
                success_message = f"{name} updated"
        else:
            # Multiple settings or generic message
            success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)"

        # Check if any warning-affecting settings were changed and include warnings
        response_data = {
            "status": "success",
            "message": success_message,
            "updated": updated_settings,
            "created": created_settings,
            "settings": all_settings,
        }

        warning_affecting_keys = [
            "llm.provider",
            "search.tool",
            "search.iterations",
            "search.questions_per_iteration",
            "llm.local_context_window_size",
            "llm.context_window_unrestricted",
            "llm.context_window_size",
        ]

        # Check if any warning-affecting settings were changed
        if any(
            key in warning_affecting_keys
            for key in updated_settings + created_settings
        ):
            warnings = calculate_warnings()
            response_data["warnings"] = warnings
            logger.info(
                f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings"
            )

        invalidate_settings_caches(session["username"])
        return jsonify(response_data)

    except Exception:
        logger.exception("Error saving settings")
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while saving settings.",
                }
            ),
            500,
        )
@settings_bp.route("/reset_to_defaults", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def reset_to_defaults(db_session=None, settings_manager=None):
    """Reset all settings to their default values"""
    try:
        settings_manager.load_from_defaults_file()
        logger.info("Successfully imported settings from default files")
    except Exception:
        logger.exception("Error importing default settings")
        error_payload = {
            "status": "error",
            "message": "Failed to reset settings to defaults",
        }
        return jsonify(error_payload), 500

    # Reset succeeded: drop any cached settings for this user before replying.
    invalidate_settings_caches(session["username"])
    success_payload = {
        "status": "success",
        "message": "All settings have been reset to default values",
    }
    return jsonify(success_payload)
@settings_bp.route("/save_settings", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def save_settings(db_session=None, settings_manager=None):
    """Save all settings from the form using POST method - fallback when JavaScript is disabled"""
    try:
        # Browser-native form submission; drop the CSRF token field.
        form_data = request.form.to_dict()
        form_data.pop("csrf_token", None)

        updated_count = 0
        failed_count = 0

        # One query for every Setting row avoids a lookup per form field.
        all_db_settings = {
            row.key: row for row in db_session.query(Setting).all()
        }

        # Drop any fields that target non-editable settings.
        non_editable_keys = []
        for key in form_data:
            known = all_db_settings.get(key)
            if known is not None and not known.editable:
                non_editable_keys.append(key)
        if non_editable_keys:
            logger.warning(
                f"Skipping non-editable settings: {non_editable_keys}"
            )
            for key in non_editable_keys:
                del form_data[key]

        # Persist each remaining field without committing per-setting.
        for key, value in form_data.items():
            try:
                known = all_db_settings.get(key)
                if known:
                    # Form POSTs deliver strings; coerce to the real type.
                    value = coerce_setting_for_write(
                        key=known.key,
                        value=value,
                        ui_element=known.ui_element,
                    )

                if settings_manager.set_setting(key, value, commit=False):
                    updated_count += 1
                else:
                    failed_count += 1
                    logger.warning(f"Failed to save setting {key}")
            except Exception:
                logger.exception(f"Error saving setting {key}")
                failed_count += 1

        # Single commit for the whole batch.
        try:
            db_session.commit()
            flash(
                f"Settings saved successfully! Updated {updated_count} settings.",
                "success",
            )
            if failed_count > 0:
                flash(
                    f"Warning: {failed_count} settings failed to save.",
                    "warning",
                )
            invalidate_settings_caches(session["username"])
        except Exception:
            db_session.rollback()
            logger.exception("Failed to commit settings")
            flash("Error saving settings. Please try again.", "error")

        return redirect(url_for("settings.settings_page"))

    except Exception:
        logger.exception("Error in save_settings")
        flash("An internal error occurred while saving settings.", "error")
        return redirect(url_for("settings.settings_page"))
# API Routes
@settings_bp.route("/api", methods=["GET"])
@login_required
@with_user_session()
def api_get_all_settings(db_session=None, settings_manager=None):
    """Return every setting, optionally restricted to one category."""
    try:
        category = request.args.get("category")
        settings = settings_manager.get_all_settings()

        if category:
            # Category lives on the Setting rows, so fetch them to learn
            # which keys belong to the requested category.
            wanted = {
                row.key
                for row in db_session.query(Setting).all()
                if row.category == category
            }
            settings = {
                key: value
                for key, value in settings.items()
                if key in wanted
            }

        return jsonify({"status": "success", "settings": settings})
    except Exception:
        logger.exception("Error getting settings")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["GET"])
@login_required
@with_user_session()
def api_get_db_setting(key, db_session=None, settings_manager=None):
    """Get a specific setting by key from DB, falling back to defaults."""
    try:
        row = db_session.query(Setting).filter(Setting.key == key).first()

        if row:
            # Database hit: return the stored metadata verbatim.
            return jsonify(
                {
                    "key": row.key,
                    "value": row.value,
                    "type": row.type
                    if isinstance(row.type, str)
                    else row.type.value,
                    "name": row.name,
                    "description": row.description,
                    "category": row.category,
                    "ui_element": row.ui_element,
                    "options": row.options,
                    "min_value": row.min_value,
                    "max_value": row.max_value,
                    "step": row.step,
                    "visible": row.visible,
                    "editable": row.editable,
                }
            )

        # Not in DB — consult the defaults file so this endpoint stays
        # consistent with GET /settings/api (which includes defaults).
        meta = settings_manager.default_settings.get(key)
        if meta:
            return jsonify(
                {
                    "key": key,
                    "value": meta.get("value"),
                    "type": meta.get("type", "APP"),
                    "name": meta.get("name", key),
                    "description": meta.get("description"),
                    "category": meta.get("category"),
                    "ui_element": meta.get("ui_element", "text"),
                    "options": meta.get("options"),
                    "min_value": meta.get("min_value"),
                    "max_value": meta.get("max_value"),
                    "step": meta.get("step"),
                    "visible": meta.get("visible", True),
                    "editable": meta.get("editable", True),
                }
            )

        return jsonify({"error": f"Setting not found: {key}"}), 404
    except Exception:
        logger.exception(f"Error getting setting {key}")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["PUT"])
@login_required
@require_json_body(error_message="No data provided")
@with_user_session(include_settings_manager=False)
def api_update_setting(key, db_session=None):
    """Update a setting, or create it when the key does not exist yet.

    Expects a JSON body with at least ``{"value": ...}``. For an existing
    setting, the value is type-coerced and validated before persisting
    (non-editable settings are rejected with 403). For a new setting,
    optional metadata fields from the body are applied and 201 is returned.

    NOTE(review): ``value is None`` is rejected with 400, so a setting
    cannot be explicitly set to null via this endpoint — presumably
    intentional; confirm against API consumers.
    """
    try:
        # Get request data
        data = request.get_json()
        value = data.get("value")
        if value is None:
            return jsonify({"error": "No value provided"}), 400

        # Check if setting exists
        db_setting = (
            db_session.query(Setting).filter(Setting.key == key).first()
        )

        if db_setting:
            # Check if setting is editable
            if not db_setting.editable:
                return jsonify({"error": f"Setting {key} is not editable"}), 403

            # Coerce to correct Python type before saving.
            # Without this, values from JSON API requests are stored
            # as-is (e.g. string "5" instead of int 5 for number
            # settings, string "true" instead of bool for checkboxes).
            value = coerce_setting_for_write(
                key=db_setting.key,
                value=value,
                ui_element=db_setting.ui_element,
            )

            # Validate the setting (matches save_all_settings pattern)
            is_valid, error_message = validate_setting(db_setting, value)
            if not is_valid:
                logger.warning(
                    f"Validation failed for setting {key}: {error_message}"
                )
                # The detailed message is logged but not echoed to the client.
                return jsonify(
                    {"error": f"Invalid value for setting {key}"}
                ), 400

            # Update setting
            # Pass the db_session to avoid session lookup issues
            success = set_setting(key, value, db_session=db_session)
            if success:
                response_data: dict[str, Any] = {
                    "message": f"Setting {key} updated successfully"
                }

                # If this is a key that affects warnings, include warning calculations
                warning_affecting_keys = [
                    "llm.provider",
                    "search.tool",
                    "search.iterations",
                    "search.questions_per_iteration",
                    "llm.local_context_window_size",
                    "llm.context_window_unrestricted",
                    "llm.context_window_size",
                ]

                if key in warning_affecting_keys:
                    warnings = calculate_warnings()
                    response_data["warnings"] = warnings
                    logger.debug(
                        f"Setting {key} changed to {value}, calculated {len(warnings)} warnings"
                    )

                invalidate_settings_caches(session["username"])
                return jsonify(response_data)
            return jsonify({"error": f"Failed to update setting {key}"}), 500

        # Create new setting with default metadata
        setting_dict = {
            "key": key,
            "value": value,
            "name": key.split(".")[-1].replace("_", " ").title(),
            "description": f"Setting for {key}",
        }

        # Add additional metadata if provided
        for field in [
            "type",
            "name",
            "description",
            "category",
            "ui_element",
            "options",
            "min_value",
            "max_value",
            "step",
            "visible",
            "editable",
        ]:
            if field in data:
                setting_dict[field] = data[field]

        # Create setting
        db_setting = create_or_update_setting(
            setting_dict, db_session=db_session
        )

        if db_setting:
            invalidate_settings_caches(session["username"])
            return (
                jsonify(
                    {
                        "message": f"Setting {key} created successfully",
                        "setting": {
                            "key": db_setting.key,
                            "value": db_setting.value,
                            "type": db_setting.type.value,
                            "name": db_setting.name,
                        },
                    }
                ),
                201,
            )
        return jsonify({"error": f"Failed to create setting {key}"}), 500
    except Exception:
        logger.exception(f"Error updating setting {key}")
        return jsonify({"error": "Failed to update setting"}), 500
@settings_bp.route("/api/<path:key>", methods=["DELETE"])
@login_required
@with_user_session()
def api_delete_setting(key, db_session=None, settings_manager=None):
    """Delete a setting; 404 if absent, 403 if marked non-editable."""
    try:
        row = db_session.query(Setting).filter(Setting.key == key).first()
        if not row:
            return jsonify({"error": f"Setting not found: {key}"}), 404
        if not row.editable:
            return jsonify({"error": f"Setting {key} is not editable"}), 403

        # Deletion goes through the settings manager so caches stay coherent.
        if settings_manager.delete_setting(key):
            invalidate_settings_caches(session["username"])
            return jsonify({"message": f"Setting {key} deleted successfully"})
        return jsonify({"error": f"Failed to delete setting {key}"}), 500
    except Exception:
        logger.exception(f"Error deleting setting {key}")
        return jsonify({"error": "Failed to delete setting"}), 500
@settings_bp.route("/api/import", methods=["POST"])
@login_required
@settings_limit
@with_user_session()
def api_import_settings(db_session=None, settings_manager=None):
    """Re-import settings from the defaults file into the user's store."""
    try:
        settings_manager.load_from_defaults_file()
        invalidate_settings_caches(session["username"])
        payload = {"message": "Settings imported successfully"}
        return jsonify(payload)
    except Exception:
        logger.exception("Error importing settings")
        return jsonify({"error": "Failed to import settings"}), 500
@settings_bp.route("/api/categories", methods=["GET"])
@login_required
@with_user_session(include_settings_manager=False)
def api_get_categories(db_session=None):
    """List the distinct, non-null setting categories."""
    try:
        rows = db_session.query(Setting.category).distinct().all()
        # Each row is a one-element tuple; drop NULL categories.
        names = [category for (category,) in rows if category is not None]
        return jsonify({"categories": names})
    except Exception:
        logger.exception("Error getting categories")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/types", methods=["GET"])
@login_required
def api_get_types():
    """List every value of the SettingType enum."""
    try:
        type_values = [member.value for member in SettingType]
        return jsonify({"types": type_values})
    except Exception:
        logger.exception("Error getting types")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/ui_elements", methods=["GET"])
@login_required
def api_get_ui_elements():
    """Enumerate the UI element types the settings front end supports."""
    try:
        # Supported UI element types, in display order.
        supported = [
            "text",
            "select",
            "checkbox",
            "slider",
            "number",
            "textarea",
            "color",
            "date",
            "file",
            "password",
        ]
        return jsonify({"ui_elements": supported})
    except Exception:
        logger.exception("Error getting UI elements")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/available-models", methods=["GET"])
@login_required
def api_get_available_models():
    """Get available LLM models from various providers.

    Query params:
        force_refresh: "true" to bypass the 24-hour database cache and
            re-query every provider directly.

    Returns:
        JSON with:
        - provider_options: list of {value, label} provider choices
          (auto-discovered providers plus hardcoded ones like LLAMACPP)
        - providers: mapping of "<provider>_models" -> list of
          {value, label, provider} model entries
        On unexpected failure, a 500 response with {status, message}.

    Flow: cached models are returned if any exist and are < 24h old;
    otherwise Ollama, OpenAI, Anthropic, and all auto-discovered providers
    are queried, and the fresh results are written back to the cache.
    """
    try:
        from flask import request

        from ...database.models import ProviderModel

        # Check if force_refresh is requested
        force_refresh = (
            request.args.get("force_refresh", "false").lower() == "true"
        )

        # Get all auto-discovered providers (show all so users can discover
        # and configure providers they haven't set up yet)
        from ...llm.providers import get_discovered_provider_options

        provider_options = get_discovered_provider_options()

        # Add remaining hardcoded providers (complex local providers not yet migrated)
        provider_options.extend(
            [
                {
                    "value": "LLAMACPP",
                    "label": "Llama.cpp (Local GGUF files only)",
                },
            ]
        )

        # Available models by provider
        providers: dict[str, Any] = {}

        # Check database cache first (unless force_refresh is True)
        if not force_refresh:
            try:
                # Define cache expiration (24 hours)
                cache_expiry = datetime.now(UTC) - timedelta(hours=24)

                # Get cached models from database
                username = session["username"]
                with get_user_db_session(username) as db_session:
                    cached_models = (
                        db_session.query(ProviderModel)
                        .filter(ProviderModel.last_updated > cache_expiry)
                        .all()
                    )

                    if cached_models:
                        logger.info(
                            f"Found {len(cached_models)} cached models in database"
                        )

                        # Group models by provider
                        for model in cached_models:
                            provider_key = (
                                f"{normalize_provider(model.provider)}_models"
                            )
                            if provider_key not in providers:
                                providers[provider_key] = []

                            providers[provider_key].append(
                                {
                                    "value": model.model_key,
                                    "label": model.model_label,
                                    "provider": model.provider.upper(),
                                }
                            )

                        # If we have cached data for all providers, return it
                        # NOTE(review): this returns as soon as ANY provider has
                        # cached rows — presumably acceptable since the cache is
                        # written for all providers at once below; confirm.
                        if providers:
                            logger.info("Returning cached models from database")
                            return jsonify(
                                {
                                    "provider_options": provider_options,
                                    "providers": providers,
                                }
                            )

            except Exception:
                logger.warning("Error reading cached models from database")
                # Continue to fetch fresh data

        # Try to get Ollama models
        ollama_models = []
        try:
            import json
            import re

            import requests

            # Try to query the Ollama API directly
            try:
                logger.info("Attempting to connect to Ollama API")

                raw_base_url = _get_setting_from_session(
                    "llm.ollama.url", DEFAULT_OLLAMA_URL
                )
                base_url = (
                    normalize_url(raw_base_url)
                    if raw_base_url
                    else DEFAULT_OLLAMA_URL
                )

                ollama_response = safe_get(
                    f"{base_url}/api/tags",
                    timeout=5,
                    allow_localhost=True,
                    allow_private_ips=True,
                )

                logger.debug(
                    f"Ollama API response: Status {ollama_response.status_code}"
                )

                # Try to parse the response even if status code is not 200 to help with debugging
                response_text = ollama_response.text
                logger.debug(
                    f"Ollama API raw response: {response_text[:500]}..."
                )

                if ollama_response.status_code == 200:
                    try:
                        ollama_data = ollama_response.json()
                        logger.debug(
                            f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..."
                        )

                        if "models" in ollama_data:
                            # Format for newer Ollama API
                            logger.info(
                                f"Found {len(ollama_data.get('models', []))} models in newer Ollama API format"
                            )
                            for model in ollama_data.get("models", []):
                                # Extract name correctly from the model object
                                name = model.get("name", "")
                                if name:
                                    # Improved display name formatting:
                                    # "llama3:8b" -> "Llama3 8b"
                                    display_name = re.sub(
                                        r"[:/]", " ", name
                                    ).strip()
                                    display_name = " ".join(
                                        word.capitalize()
                                        for word in display_name.split()
                                    )
                                    # Create the model entry with value and label
                                    ollama_models.append(
                                        {
                                            "value": name,  # Original model name as value (for API calls)
                                            "label": f"{display_name} (Ollama)",  # Pretty name as label
                                            "provider": "ollama",  # Add provider field for consistency
                                        }
                                    )
                                    logger.debug(
                                        f"Added Ollama model: {name} -> {display_name}"
                                    )
                        else:
                            # Format for older Ollama API (top-level list)
                            logger.info(
                                f"Found {len(ollama_data)} models in older Ollama API format"
                            )
                            for model in ollama_data:
                                name = model.get("name", "")
                                if name:
                                    # Improved display name formatting
                                    display_name = re.sub(
                                        r"[:/]", " ", name
                                    ).strip()
                                    display_name = " ".join(
                                        word.capitalize()
                                        for word in display_name.split()
                                    )
                                    ollama_models.append(
                                        {
                                            "value": name,
                                            "label": f"{display_name} (Ollama)",
                                            "provider": "ollama",  # Add provider field for consistency
                                        }
                                    )
                                    logger.debug(
                                        f"Added Ollama model: {name} -> {display_name}"
                                    )

                    except json.JSONDecodeError as json_err:
                        logger.exception(
                            f"Failed to parse Ollama API response as JSON: {json_err}"
                        )
                        # Re-raised so the outer handler clears ollama_models
                        raise ValueError(
                            f"Ollama API returned invalid JSON: {json_err}"
                        )
                else:
                    logger.warning(
                        f"Ollama API returned non-200 status code: {ollama_response.status_code}"
                    )
                    raise ValueError(
                        f"Ollama API returned status code {ollama_response.status_code}"
                    )

            except requests.exceptions.RequestException:
                logger.warning("Could not connect to Ollama API")
                # No fallback models - just return empty list
                logger.info("Ollama not available - no models to display")
                ollama_models = []

            # Always set the ollama_models in providers, whether we got real or fallback models
            providers["ollama_models"] = ollama_models
            logger.info(f"Final Ollama models count: {len(ollama_models)}")

            # Log some model names for debugging
            if ollama_models:
                model_names = [m["value"] for m in ollama_models[:5]]
                logger.info(f"Sample Ollama models: {', '.join(model_names)}")

        except Exception:
            # Catches the ValueErrors raised above as well as anything else
            logger.exception("Error getting Ollama models")
            # No fallback models - just return empty list
            logger.info("Error getting Ollama models - no models to display")
            providers["ollama_models"] = []

        # Note: OpenAI-Compatible Endpoint models are fetched via auto-discovery
        # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider)

        # Get OpenAI models using the OpenAI package
        openai_models = []
        try:
            logger.info(
                "Attempting to connect to OpenAI API using OpenAI package"
            )

            # Get the API key from settings
            api_key = _get_setting_from_session("llm.openai.api_key", "")

            if api_key:
                import openai
                from openai import OpenAI

                # Create OpenAI client
                client = OpenAI(api_key=api_key)

                try:
                    # Fetch models using the client
                    logger.debug("Fetching models from OpenAI API")
                    models_response = client.models.list()

                    # Process models from the response
                    for model in models_response.data:
                        model_id = model.id
                        if model_id:
                            # Create a clean display name
                            display_name = model_id.replace("-", " ").strip()
                            display_name = " ".join(
                                word.capitalize()
                                for word in display_name.split()
                            )

                            openai_models.append(
                                {
                                    "value": model_id,
                                    "label": f"{display_name} (OpenAI)",
                                    "provider": "openai",
                                }
                            )
                            logger.debug(
                                f"Added OpenAI model: {model_id} -> {display_name}"
                            )

                    # Keep original order from OpenAI - their models are returned in a
                    # meaningful order (newer/more capable models first)

                except openai.APIError as api_err:
                    logger.exception(f"OpenAI API error: {api_err!s}")
                    logger.info("No OpenAI models found due to API error")

            else:
                logger.info(
                    "OpenAI API key not configured, no models available"
                )

        except Exception:
            logger.exception("Error getting OpenAI models")
            logger.info("No OpenAI models available due to error")

        # Always set the openai_models in providers (will be empty array if no models found)
        providers["openai_models"] = openai_models
        logger.info(f"Final OpenAI models count: {len(openai_models)}")

        # Try to get Anthropic models using the Anthropic package
        anthropic_models = []
        try:
            logger.info(
                "Attempting to connect to Anthropic API using Anthropic package"
            )

            # Get the API key from settings
            api_key = _get_setting_from_session("llm.anthropic.api_key", "")

            if api_key:
                # Import Anthropic package here to avoid dependency issues if not installed
                from anthropic import Anthropic

                # Create Anthropic client
                anthropic_client = Anthropic(api_key=api_key)

                try:
                    # Fetch models using the client
                    logger.debug("Fetching models from Anthropic API")
                    models_response = anthropic_client.models.list()

                    # Process models from the response
                    for model in models_response.data:
                        model_id = model.id
                        if model_id:
                            # Create a clean display name
                            display_name = model_id.replace("-", " ").strip()
                            display_name = " ".join(
                                word.capitalize()
                                for word in display_name.split()
                            )

                            anthropic_models.append(
                                {
                                    "value": model_id,
                                    "label": f"{display_name} (Anthropic)",
                                    "provider": "anthropic",
                                }
                            )
                            logger.debug(
                                f"Added Anthropic model: {model_id} -> {display_name}"
                            )

                except Exception as api_err:
                    logger.exception(f"Anthropic API error: {api_err!s}")
            else:
                logger.info("Anthropic API key not configured")

        except ImportError:
            logger.warning(
                "Anthropic package not installed. No models will be available."
            )
        except Exception:
            logger.exception("Error getting Anthropic models")

        # Set anthropic_models in providers (could be empty if API call failed)
        providers["anthropic_models"] = anthropic_models
        logger.info(f"Final Anthropic models count: {len(anthropic_models)}")

        # Fetch models from auto-discovered providers
        from ...llm.providers import discover_providers

        discovered_providers = discover_providers()

        for provider_key, provider_info in discovered_providers.items():
            provider_models = []
            try:
                logger.info(
                    f"Fetching models from {provider_info.provider_name}"
                )

                # Get the provider class
                provider_class = provider_info.provider_class

                # Get API key if configured
                api_key = _get_setting_from_session(
                    provider_class.api_key_setting, ""
                )

                # Get base URL if provider has configurable URL
                provider_base_url: str | None = None
                if (
                    hasattr(provider_class, "url_setting")
                    and provider_class.url_setting
                ):
                    provider_base_url = _get_setting_from_session(
                        provider_class.url_setting, ""
                    )

                # Use the provider's list_models_for_api method
                models = provider_class.list_models_for_api(
                    api_key, provider_base_url
                )

                # Format models for the API response
                for model in models:
                    provider_models.append(
                        {
                            "value": model["value"],
                            "label": model[
                                "label"
                            ],  # Use provider's label as-is
                            "provider": provider_key,
                        }
                    )

                logger.info(
                    f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}"
                )

            except Exception:
                logger.exception(
                    f"Error getting {provider_info.provider_name} models"
                )

            # Set models in providers dict using lowercase key
            providers[f"{normalize_provider(provider_key)}_models"] = (
                provider_models
            )
            logger.info(
                f"Final {provider_key} models count: {len(provider_models)}"
            )

        # Save fetched models to database cache
        if force_refresh or providers:
            # We fetched fresh data, save it to database
            username = session["username"]
            with get_user_db_session(username) as db_session:
                try:
                    if force_refresh:
                        # When force refresh, clear ALL cached models to remove any stale data
                        # from old code versions or deleted providers
                        deleted_count = db_session.query(ProviderModel).delete()
                        logger.info(
                            f"Force refresh: cleared all {deleted_count} cached models"
                        )
                    else:
                        # Clear old cache entries only for providers we're updating
                        for provider_key in providers:
                            provider_name = provider_key.replace(
                                "_models", ""
                            ).upper()
                            db_session.query(ProviderModel).filter(
                                ProviderModel.provider == provider_name
                            ).delete()

                    # Insert new models
                    for provider_key, models in providers.items():
                        provider_name = provider_key.replace(
                            "_models", ""
                        ).upper()
                        for model in models:
                            # Skip malformed entries defensively
                            if (
                                isinstance(model, dict)
                                and "value" in model
                                and "label" in model
                            ):
                                new_model = ProviderModel(
                                    provider=provider_name,
                                    model_key=model["value"],
                                    model_label=model["label"],
                                    last_updated=datetime.now(UTC),
                                )
                                db_session.add(new_model)

                    db_session.commit()
                    logger.info("Successfully cached models to database")

                except Exception:
                    # Cache failures are non-fatal; fresh data is still returned
                    logger.exception("Error saving models to database cache")
                    db_session.rollback()

        # Return all options
        return jsonify(
            {"provider_options": provider_options, "providers": providers}
        )

    except Exception:
        logger.exception("Error getting available models")
        return jsonify(
            {
                "status": "error",
                "message": "Failed to retrieve available models",
            }
        ), 500
1440def _get_engine_icon_and_category(
1441 engine_data: dict, engine_class=None
1442) -> tuple:
1443 """
1444 Get icon emoji and category label for a search engine based on its attributes.
1446 Args:
1447 engine_data: Engine configuration dictionary
1448 engine_class: Optional loaded engine class to check attributes
1450 Returns:
1451 Tuple of (icon, category) strings
1452 """
1453 # Check attributes from either the class or the engine data
1454 if engine_class:
1455 is_scientific = getattr(engine_class, "is_scientific", False)
1456 is_generic = getattr(engine_class, "is_generic", False)
1457 is_local = getattr(engine_class, "is_local", False)
1458 is_news = getattr(engine_class, "is_news", False)
1459 is_code = getattr(engine_class, "is_code", False)
1460 else:
1461 is_scientific = engine_data.get("is_scientific", False)
1462 is_generic = engine_data.get("is_generic", False)
1463 is_local = engine_data.get("is_local", False)
1464 is_news = engine_data.get("is_news", False)
1465 is_code = engine_data.get("is_code", False)
1467 # Check books attribute
1468 if engine_class:
1469 is_books = getattr(engine_class, "is_books", False)
1470 else:
1471 is_books = engine_data.get("is_books", False)
1473 # Return icon and category based on engine type
1474 # Priority: local > scientific > news > code > books > generic > default
1475 if is_local:
1476 return "📁", "Local RAG"
1477 if is_scientific:
1478 return "🔬", "Scientific"
1479 if is_news:
1480 return "📰", "News"
1481 if is_code:
1482 return "💻", "Code"
1483 if is_books:
1484 return "📚", "Books"
1485 if is_generic:
1486 return "🌐", "Web Search"
1487 return "🔍", "Search"
@settings_bp.route("/api/available-search-engines", methods=["GET"])
@login_required
@with_user_session()
def api_get_available_search_engines(db_session=None, settings_manager=None):
    """Return configured search engines, formatted dropdown options, and
    the user's favorite engines (favorites sort first in the options)."""
    try:
        # Same configuration source as search_engines_config.py
        from ...web_search_engines.search_engines_config import search_config

        engines_cfg = search_config(
            username=session["username"], db_session=db_session
        )

        # User favorites, defensively coerced to a list
        favorites = settings_manager.get_setting("search.favorites", [])
        if not isinstance(favorites, list):
            favorites = []

        engines_dict = {}
        engine_options = []

        if engines_cfg:
            from ...security.module_whitelist import (
                get_safe_module_class,
                SecurityError,
            )

            for engine_id, engine_data in engines_cfg.items():
                # Best-effort load of the engine class for richer metadata,
                # going through the security whitelist.
                engine_class = None
                try:
                    module_path = engine_data.get("module_path")
                    class_name = engine_data.get("class_name")
                    if module_path and class_name:
                        engine_class = get_safe_module_class(
                            module_path, class_name
                        )
                except SecurityError:
                    logger.warning(
                        f"Security: Blocked unsafe module for {engine_id}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Could not load engine class for {engine_id}: {e}"
                    )

                icon, category = _get_engine_icon_and_category(
                    engine_data, engine_class
                )

                requires_api_key = engine_data.get("requires_api_key", False)
                base_name = engine_data.get("display_name", engine_id)

                # Label shows icon, category, and whether an API key is needed
                access = "API key" if requires_api_key else "Free"
                label = f"{icon} {base_name} ({category}, {access})"

                is_favorite = engine_id in favorites

                engines_dict[engine_id] = {
                    "display_name": base_name,
                    "description": engine_data.get("description", ""),
                    "strengths": engine_data.get("strengths", []),
                    "icon": icon,
                    "category": category,
                    "requires_api_key": requires_api_key,
                    "is_favorite": is_favorite,
                }
                engine_options.append(
                    {
                        "value": engine_id,
                        "label": label,
                        "icon": icon,
                        "category": category,
                        "requires_api_key": requires_api_key,
                        "is_favorite": is_favorite,
                    }
                )

        # Favorites first, then case-insensitive alphabetical by label
        engine_options.sort(
            key=lambda opt: (
                not opt.get("is_favorite", False),
                opt.get("label", "").lower(),
            )
        )

        if not engine_options:
            logger.warning("No search engines found in configuration")

        return jsonify(
            {
                "engines": engines_dict,
                "engine_options": engine_options,
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error getting available search engines")
        return jsonify({"error": "Failed to retrieve search engines"}), 500
@settings_bp.route("/api/search-favorites", methods=["GET"])
@login_required
@with_user_session()
def api_get_search_favorites(db_session=None, settings_manager=None):
    """Get the list of favorite search engines for the current user"""
    try:
        stored = settings_manager.get_setting("search.favorites", [])
        # Guard against corrupted/non-list values in the settings store
        favorites = stored if isinstance(stored, list) else []
        return jsonify({"favorites": favorites})
    except Exception:
        logger.exception("Error getting search favorites")
        return jsonify({"error": "Failed to retrieve favorites"}), 500
@settings_bp.route("/api/search-favorites", methods=["PUT"])
@login_required
@require_json_body(error_message="No data provided")
@with_user_session()
def api_update_search_favorites(db_session=None, settings_manager=None):
    """Update the list of favorite search engines for the current user"""
    try:
        payload = request.get_json()
        new_favorites = payload.get("favorites")

        # Validate: key must be present and hold a list
        if new_favorites is None:
            return jsonify({"error": "No favorites provided"}), 400
        if not isinstance(new_favorites, list):
            return jsonify({"error": "Favorites must be a list"}), 400

        if not settings_manager.set_setting("search.favorites", new_favorites):
            return jsonify({"error": "Failed to update favorites"}), 500

        # Persisted successfully: drop cached settings for this user
        invalidate_settings_caches(session["username"])
        return jsonify(
            {
                "message": "Favorites updated successfully",
                "favorites": new_favorites,
            }
        )

    except Exception:
        logger.exception("Error updating search favorites")
        return jsonify({"error": "Failed to update favorites"}), 500
@settings_bp.route("/api/search-favorites/toggle", methods=["POST"])
@login_required
@require_json_body(error_message="No data provided")
@with_user_session()
def api_toggle_search_favorite(db_session=None, settings_manager=None):
    """Toggle a search engine as favorite"""
    try:
        payload = request.get_json()
        engine_id = payload.get("engine_id")
        if not engine_id:
            return jsonify({"error": "No engine_id provided"}), 400

        # Work on a copy so the stored value is never mutated in place;
        # non-list values are treated as an empty favorites list.
        current = settings_manager.get_setting("search.favorites", [])
        favorites = list(current) if isinstance(current, list) else []

        # Flip membership of the requested engine
        if engine_id in favorites:
            favorites.remove(engine_id)
            is_favorite = False
        else:
            favorites.append(engine_id)
            is_favorite = True

        if not settings_manager.set_setting("search.favorites", favorites):
            return jsonify({"error": "Failed to toggle favorite"}), 500

        invalidate_settings_caches(session["username"])
        return jsonify(
            {
                "message": "Favorite toggled successfully",
                "engine_id": engine_id,
                "is_favorite": is_favorite,
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error toggling search favorite")
        return jsonify({"error": "Failed to toggle favorite"}), 500
1695# Legacy routes for backward compatibility - these will redirect to the new routes
@settings_bp.route("/main", methods=["GET"])
@login_required
def main_config_page():
    """Redirect to app settings page"""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/collections", methods=["GET"])
@login_required
def collections_config_page():
    """Redirect to app settings page"""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/api_keys", methods=["GET"])
@login_required
def api_keys_config_page():
    """Redirect to LLM settings page"""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/search_engines", methods=["GET"])
@login_required
def search_engines_config_page():
    """Redirect to search settings page"""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/llm", methods=["GET"])
@login_required
def llm_config_page():
    """Redirect to LLM settings page"""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Open the location of a configuration file.

    Security: This endpoint is disabled for server deployments.
    It only makes sense for desktop usage where the server and client are on the same machine.
    """
    body = {
        "status": "error",
        "message": "This feature is disabled. It is only available in desktop mode.",
    }
    return jsonify(body), 403
@settings_bp.context_processor
def inject_csrf_token():
    """Inject CSRF token into the template context for all settings routes."""
    # Templates receive the generator itself, not a pre-computed token
    return dict(csrf_token=generate_csrf)
@settings_bp.route("/fix_corrupted_settings", methods=["POST"])
@login_required
@settings_limit
@with_user_session(include_settings_manager=False)
def fix_corrupted_settings(db_session=None):
    """Fix corrupted settings in the database.

    Two repair passes:

    1. Duplicate keys (caused by errors in settings import/export) are
       collapsed to the most recently updated row.
    2. Settings whose stored value is recognizably corrupted (None, stray
       JSON fragments, "[object Object]", "null", "undefined", or an empty
       dict) are reset to a known-good default when one exists; corrupted
       ``report.*`` settings with no specific default are reset to ``{}``.

    Args:
        db_session: Per-user database session injected by
            ``with_user_session``.

    Returns:
        JSON summary of fixed/removed settings, or a 500 error response.
    """
    # Known-good defaults keyed by exact setting key. A flat lookup table
    # replaces the previous prefix-grouped if/elif ladder; behavior is
    # identical because every branch was an exact-key match anyway.
    repair_defaults: dict[str, Any] = {
        "llm.model": "gemma3:12b",
        "llm.provider": "ollama",
        "llm.temperature": 0.7,
        "llm.max_tokens": 1024,
        "search.tool": "auto",
        "search.max_results": 10,
        "search.region": "us",
        "search.questions_per_iteration": 3,
        "search.searches_per_section": 2,
        "search.skip_relevance_filter": False,
        "search.safe_search": True,
        "search.search_language": "English",
        "report.searches_per_section": 2,
        "app.theme": "dark",
        "app.default_theme": "dark",
        "app.enable_notifications": True,
        "app.enable_web": True,
        "app.web_interface": True,
        "app.host": "0.0.0.0",
        "app.port": 5000,
        "app.debug": True,
    }

    # String values that indicate a serialization-mangled setting
    corrupted_markers = [
        "{",
        "[",
        "{}",
        "[]",
        "[object Object]",
        "null",
        "undefined",
    ]

    try:
        # Track fixed and removed settings
        fixed_settings = []
        removed_duplicate_settings = []

        # First, find and remove duplicate settings with the same key.
        # This happens because of errors in settings import/export.
        from sqlalchemy import func as sql_func

        # Find keys with duplicates
        duplicate_keys = (
            db_session.query(Setting.key)
            .group_by(Setting.key)
            .having(sql_func.count(Setting.key) > 1)
            .all()
        )
        duplicate_keys = [key[0] for key in duplicate_keys]

        # For each duplicate key, keep the latest updated one and remove others
        for key in duplicate_keys:
            dupe_settings = (
                db_session.query(Setting)
                .filter(Setting.key == key)
                .order_by(Setting.updated_at.desc())
                .all()
            )

            # Keep the first one (most recently updated) and delete the rest
            for i, setting in enumerate(dupe_settings):
                if i > 0:
                    db_session.delete(setting)
                    removed_duplicate_settings.append(key)

        # Check for settings with corrupted values
        all_settings = db_session.query(Setting).all()
        for setting in all_settings:
            is_corrupted = (
                setting.value is None
                or (
                    isinstance(setting.value, str)
                    and setting.value in corrupted_markers
                )
                or (isinstance(setting.value, dict) and len(setting.value) == 0)
            )
            if not is_corrupted:
                continue

            # Repair with a known default if one exists for this key
            default_value: Any = repair_defaults.get(setting.key)
            if default_value is not None:
                setting.value = default_value
                fixed_settings.append(setting.key)
            elif setting.key.startswith("report."):
                # No specific default, but corrupted report settings can be
                # safely reset to an empty object
                setting.value = {}
                fixed_settings.append(setting.key)

        # Commit only if something was actually repaired
        if fixed_settings or removed_duplicate_settings:
            db_session.commit()
            logger.info(
                f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}"
            )
            if removed_duplicate_settings:
                logger.info(
                    f"Removed {len(removed_duplicate_settings)} duplicate settings"
                )
            invalidate_settings_caches(session["username"])

        # Return success
        return jsonify(
            {
                "status": "success",
                "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates",
                "fixed_settings": fixed_settings,
                "removed_duplicates": removed_duplicate_settings,
            }
        )

    except Exception:
        logger.exception("Error fixing corrupted settings")
        db_session.rollback()
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while fixing corrupted settings. Please try again later.",
                }
            ),
            500,
        )
@settings_bp.route("/api/warnings", methods=["GET"])
@login_required
def api_get_warnings():
    """Get current warnings based on settings"""
    try:
        return jsonify({"warnings": calculate_warnings()})
    except Exception:
        logger.exception("Error getting warnings")
        return jsonify({"error": "Failed to retrieve warnings"}), 500
@settings_bp.route("/api/backup-status", methods=["GET"])
@login_required
def api_get_backup_status():
    """Get backup status for the current user."""
    try:
        from ...config.paths import get_user_backup_directory

        username = session.get("username")
        if not username:
            return jsonify({"error": "Not authenticated"}), 401

        from ...utilities.formatting import human_size

        backup_dir = get_user_backup_directory(username)

        # Collect backup files; sorted later by modification time
        # (not filename) for robustness.
        entries = []
        total_size = 0
        for path in backup_dir.glob("ldr_backup_*.db"):
            try:
                info = path.stat()
            except FileNotFoundError:
                # File vanished between glob and stat; skip it
                continue
            total_size += info.st_size
            entries.append(
                {
                    "filename": path.name,
                    "size_bytes": info.st_size,
                    "size_human": human_size(info.st_size),
                    "created_at": datetime.fromtimestamp(
                        info.st_mtime, tz=timezone.utc
                    ).isoformat(),
                    "_mtime": info.st_mtime,
                }
            )

        # Newest first, then drop the internal sort key
        entries.sort(key=lambda e: e["_mtime"], reverse=True)
        for entry in entries:
            entry.pop("_mtime")

        backup_enabled = _get_setting_from_session("backup.enabled", True)

        return jsonify(
            {
                "enabled": bool(backup_enabled),
                "count": len(entries),
                "backups": entries,
                "total_size_bytes": total_size,
                "total_size_human": human_size(total_size),
            }
        )

    except Exception:
        logger.exception("Error getting backup status")
        return jsonify({"error": "Failed to retrieve backup status"}), 500
@settings_bp.route("/api/ollama-status", methods=["GET"])
@login_required
def check_ollama_status():
    """Check if Ollama is running and available"""
    try:
        # Resolve the Ollama base URL from user settings
        configured_url = _get_setting_from_session(
            "llm.ollama.url", DEFAULT_OLLAMA_URL
        )
        base_url = (
            normalize_url(configured_url)
            if configured_url
            else DEFAULT_OLLAMA_URL
        )

        response = safe_get(
            f"{base_url}/api/version",
            timeout=2,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if response.status_code != 200:
            return jsonify(
                {
                    "running": False,
                    "error": f"Ollama returned status code {response.status_code}",
                }
            )
        return jsonify(
            {
                "running": True,
                "version": response.json().get("version", "unknown"),
            }
        )
    except requests.exceptions.RequestException:
        logger.exception("Ollama check failed")
        return jsonify(
            {"running": False, "error": "Failed to check search engine status"}
        )
@settings_bp.route("/api/rate-limiting/status", methods=["GET"])
@login_required
def api_get_rate_limiting_status():
    """Get current rate limiting status and statistics"""
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        tracker = get_tracker()

        # Overall tracker configuration
        status = {
            "enabled": tracker.enabled,
            "exploration_rate": tracker.exploration_rate,
            "learning_rate": tracker.learning_rate,
            "memory_window": tracker.memory_window,
        }

        # Per-engine statistics; each row from the tracker is a 7-tuple
        engines = []
        for (
            engine_type,
            base_wait,
            min_wait,
            max_wait,
            last_updated,
            total_attempts,
            success_rate,
        ) in tracker.get_stats():
            engines.append(
                {
                    "engine_type": engine_type,
                    "base_wait_seconds": round(base_wait, 2),
                    "min_wait_seconds": round(min_wait, 2),
                    "max_wait_seconds": round(max_wait, 2),
                    "last_updated": last_updated,
                    "total_attempts": total_attempts,
                    # Fraction -> percent; falsy/missing rate reported as 0.0
                    "success_rate": (
                        round(success_rate * 100, 1) if success_rate else 0.0
                    ),
                }
            )

        return jsonify({"status": status, "engines": engines})

    except Exception:
        logger.exception("Error getting rate limiting status")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route(
    "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"]
)
@login_required
def api_reset_engine_rate_limiting(engine_type):
    """Reset the adaptive rate limiting state for a single search engine."""
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        # Drop the learned wait times so this engine starts fresh.
        get_tracker().reset_engine(engine_type)

        return jsonify(
            {"message": f"Rate limiting data reset for {engine_type}"}
        )

    except Exception:
        logger.exception(f"Error resetting rate limiting for {engine_type}")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"])
@login_required
def api_cleanup_rate_limiting():
    """Clean up old rate limiting data.

    Note: not using @require_json_body because the JSON body is optional
    here — the endpoint works with or without a payload (defaults to 30 days).
    An optional JSON body of the form ``{"days": <positive int>}`` overrides
    the retention window.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        data = request.get_json() if request.is_json else None
        days = data.get("days", 30) if data is not None else 30

        # Robustness: reject non-integer or non-positive retention windows
        # instead of passing unvalidated client input through to the tracker.
        # (bool is a subclass of int, so exclude it explicitly.)
        if isinstance(days, bool) or not isinstance(days, int) or days <= 0:
            return jsonify(
                {"error": "'days' must be a positive integer"}
            ), 400

        tracker = get_tracker()
        tracker.cleanup_old_data(days)

        return jsonify(
            {"message": f"Cleaned up rate limiting data older than {days} days"}
        )

    except Exception:
        logger.exception("Error cleaning up rate limiting data")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route("/api/bulk", methods=["GET"])
@login_required
def get_bulk_settings():
    """Fetch many settings in a single request to reduce round-trips."""
    try:
        # Keys arrive as repeated keys[] query parameters.
        keys = request.args.getlist("keys[]")
        if not keys:
            # No explicit selection: return the common settings the
            # frontend usually needs.
            keys = [
                "llm.provider",
                "llm.model",
                "search.tool",
                "search.iterations",
                "search.questions_per_iteration",
                "search.search_strategy",
                "benchmark.evaluation.provider",
                "benchmark.evaluation.model",
                "benchmark.evaluation.temperature",
                "benchmark.evaluation.endpoint_url",
            ]

        # Resolve each key independently so one failure does not abort
        # the whole batch.
        settings_payload = {}
        for key in keys:
            try:
                value = _get_setting_from_session(key)
            except Exception:
                logger.warning(f"Error getting setting {key}")
                settings_payload[key] = {
                    "value": None,
                    "exists": False,
                    "error": "Failed to retrieve setting",
                }
            else:
                settings_payload[key] = {
                    "value": value,
                    "exists": value is not None,
                }

        return jsonify({"success": True, "settings": settings_payload})

    except Exception:
        logger.exception("Error getting bulk settings")
        return jsonify(
            {"success": False, "error": "An internal error occurred"}
        ), 500
@settings_bp.route("/api/data-location", methods=["GET"])
@login_required
def api_get_data_location():
    """Get information about data storage location and security.

    Returns a JSON payload describing the data directory, database paths,
    whether a custom location is configured, platform defaults, encryption
    status, and (when encryption is active) the SQLCipher settings.
    """
    try:
        # Get the data directory path
        data_dir = get_data_directory()
        # Get the encrypted databases path
        encrypted_db_path = get_encrypted_database_path()

        # A non-None "bootstrap.data_dir" means the user overrode the
        # default location (exposed to clients via LDR_DATA_DIR).
        from local_deep_research.settings.manager import SettingsManager

        settings_manager = SettingsManager()
        custom_data_dir = settings_manager.get_setting("bootstrap.data_dir")

        # Platform-specific default locations, shown in the UI for reference.
        platform_info = {
            "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research",
            "macOS": "~/Library/Application Support/local-deep-research",
            "Linux": "~/.local/share/local-deep-research",
        }

        # platform.system() reports "Darwin" on macOS; map to the
        # user-friendly name used in platform_info.
        current_platform = platform.system()
        if current_platform == "Darwin":
            current_platform = "macOS"

        # Get SQLCipher settings from environment
        from ...database.sqlcipher_utils import get_sqlcipher_settings

        # Fix: these diagnostics were logged at INFO on every request
        # (explicitly marked "Debug logging"); emit them at DEBUG instead.
        logger.debug(f"db_manager type: {type(db_manager)}")
        logger.debug(
            f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}"
        )

        # Only expose cipher settings when encryption is actually enabled.
        cipher_settings = (
            get_sqlcipher_settings() if db_manager.has_encryption else {}
        )

        return jsonify(
            {
                "data_directory": str(data_dir),
                "database_path": str(encrypted_db_path),
                "encrypted_database_path": str(encrypted_db_path),
                "is_custom": custom_data_dir is not None,
                "custom_env_var": "LDR_DATA_DIR",
                "custom_env_value": custom_data_dir,
                "platform": current_platform,
                "platform_default": platform_info.get(
                    current_platform, str(data_dir)
                ),
                "platform_info": platform_info,
                "security_notice": {
                    "encrypted": db_manager.has_encryption,
                    "warning": "All data including API keys stored in the database are securely encrypted."
                    if db_manager.has_encryption
                    else "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set.",
                    "recommendation": "Your data is protected with database encryption."
                    if db_manager.has_encryption
                    else "Consider using environment variables for sensitive API keys instead of storing them in the database.",
                },
                "encryption_settings": cipher_settings,
            }
        )

    except Exception:
        logger.exception("Error getting data location information")
        return jsonify({"error": "Failed to retrieve data location"}), 500
@settings_bp.route("/api/notifications/test-url", methods=["POST"])
@login_required
def api_test_notification_url():
    """
    Test a notification service URL.

    A throwaway NotificationService instance is used to probe the URL.
    No database session or password is required because:
    - The service URL is provided directly in the request body
    - Test notifications use a temporary Apprise instance
    - No user settings or database queries are performed

    Security note: Rate limiting is not applied here because users need to
    test URLs when configuring notifications. Abuse is mitigated by the
    @login_required decorator and the fact that users can only spam their
    own notification services.
    """
    try:
        from ...notifications.service import NotificationService

        payload = request.get_json()
        if not payload or "service_url" not in payload:
            return jsonify(
                {"success": False, "error": "service_url is required"}
            ), 400

        # Probe the URL with a temporary service instance; no password or
        # session is needed since the URL comes straight from the request.
        result = NotificationService().test_service(payload["service_url"])

        # Whitelist the echoed fields to prevent information leakage.
        return jsonify(
            {
                "success": result.get("success", False),
                "message": result.get("message", ""),
                "error": result.get("error", ""),
            }
        )

    except Exception:
        logger.exception("Error testing notification URL")
        return jsonify(
            {
                "success": False,
                "error": "Failed to test notification service. Check logs for details.",
            }
        ), 500