Coverage for src/local_deep_research/web/routes/settings_routes.py: 35%
913 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Settings Routes Module
4This module handles all settings-related HTTP endpoints for the application.
6CHECKBOX HANDLING PATTERN:
7--------------------------
8This module supports TWO submission modes to handle checkboxes correctly:
10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)**
11- JavaScript intercepts form submission with e.preventDefault()
12- Checkbox values read directly from DOM via checkbox.checked
13- Data sent as JSON: {"setting.key": true/false}
14- Hidden fallback inputs are managed but NOT used in this mode
15- Provides better UX with instant feedback and validation
17**MODE 2: Traditional POST Submission (Fallback - /save_settings)**
18- Used when JavaScript is disabled (accessibility/no-JS environments)
19- Browser submits form data naturally via request.form
20- Hidden fallback pattern CRITICAL here:
21 * Checked checkbox: Submits checkbox value, hidden input disabled
22 * Unchecked checkbox: Submits hidden input value "false"
23- Ensures unchecked checkboxes are captured (HTML limitation workaround)
25**Implementation Details:**
261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID
272. checkbox_handler.js manages hidden input disabled state
283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240)
294. POST mode: Flask reads request.form including enabled hidden inputs
305. Both modes use convert_setting_value() for consistent boolean conversion
32**Why Both Patterns?**
33- AJAX: Better UX, immediate validation, no page reload
34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation
35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode
37This dual-mode approach ensures the app works for all users while providing
38optimal experience when JavaScript is available.
39"""
41import json
42import platform
43import subprocess
44from typing import Any, Optional, Tuple
45from datetime import datetime, UTC, timedelta
47import requests
48from flask import (
49 Blueprint,
50 flash,
51 jsonify,
52 redirect,
53 request,
54 session,
55 url_for,
56)
57from flask_wtf.csrf import generate_csrf
58from loguru import logger
60from ...config.paths import get_data_directory, get_encrypted_database_path
61from ...database.models import Setting, SettingType
62from ...database.session_context import get_user_db_session
63from ...database.encrypted_db import db_manager
64from ...utilities.db_utils import get_settings_manager
65from ...utilities.url_utils import normalize_url
66from ..auth.decorators import login_required
67from ...settings import SettingsManager
68from ...settings.manager import get_typed_setting_value, parse_boolean
69from ..services.settings_service import (
70 create_or_update_setting,
71 set_setting,
72)
73from ..utils.templates import render_template_with_defaults
74from ..server_config import sync_from_settings
75from ...security import safe_get
77# Create a Blueprint for settings
78settings_bp = Blueprint("settings", __name__, url_prefix="/settings")
80# Settings with dynamically populated options (excluded from validation)
81DYNAMIC_SETTINGS = ["llm.provider", "llm.model", "search.tool"]
84def _get_setting_from_session(key: str, default=None):
85 """Helper to get a setting using the current session context."""
86 username = session.get("username")
87 with get_user_db_session(username) as db_session:
88 if db_session: 88 ↛ 91 (line 88 didn't jump to line 91)
89 settings_manager = get_settings_manager(db_session, username)
90 return settings_manager.get_setting(key, default)
91 return default
94def calculate_warnings():
95 """Calculate current warning conditions based on settings"""
96 warnings = []
98 try:
99 # Get current settings using proper session context
100 username = session.get("username")
101 with get_user_db_session(username) as db_session:
102 if db_session: 102 ↛ 121 (line 102 didn't jump to line 121)
103 settings_manager = get_settings_manager(db_session, username)
104 provider = settings_manager.get_setting(
105 "llm.provider", "ollama"
106 ).lower()
107 local_context = settings_manager.get_setting(
108 "llm.local_context_window_size", 4096
109 )
111 # Get dismissal settings
112 dismiss_high_context = settings_manager.get_setting(
113 "app.warnings.dismiss_high_context", False
114 )
116 logger.debug(
117 f"Starting warning calculation - provider={provider}"
118 )
120 # Check warning conditions
121 is_local_provider = provider in [
122 "ollama",
123 "llamacpp",
124 "lmstudio",
125 "vllm",
126 ]
128 # High context warning for local providers
129 if ( 129 ↛ 134 (line 129 didn't jump to line 134 because the condition on line 129 was never true)
130 is_local_provider
131 and local_context > 8192
132 and not dismiss_high_context
133 ):
134 warnings.append(
135 {
136 "type": "high_context",
137 "icon": "⚠️",
138 "title": "High Context Warning",
139 "message": f"Context size ({local_context:,} tokens) may cause memory issues with {provider}. Increase VRAM or reduce context size if you experience slowdowns.",
140 "dismissKey": "app.warnings.dismiss_high_context",
141 }
142 )
144 # Get additional warning settings
145 with get_user_db_session(username) as db_session:
146 if db_session: 146 ↛ 157 (line 146 didn't jump to line 157)
147 settings_manager = get_settings_manager(db_session, username)
148 dismiss_model_mismatch = settings_manager.get_setting(
149 "app.warnings.dismiss_model_mismatch", False
150 )
152 # Get current strategy and model (these need to be passed from the frontend or retrieved differently)
153 # For now, we'll implement basic warnings that don't require form state
155 # Model mismatch warning (simplified - checking setting instead of form value)
156 current_model = settings_manager.get_setting("llm.model", "")
157 if ( 157 ↛ 164 (line 157 didn't jump to line 164 because the condition on line 157 was never true)
158 current_model
159 and "70b" in current_model.lower()
160 and is_local_provider
161 and local_context > 8192
162 and not dismiss_model_mismatch
163 ):
164 warnings.append(
165 {
166 "type": "model_mismatch",
167 "icon": "🧠",
168 "title": "Model & Context Warning",
169 "message": f"Large model ({current_model}) with high context ({local_context:,}) may exceed VRAM. Consider reducing context size or upgrading GPU memory.",
170 "dismissKey": "app.warnings.dismiss_model_mismatch",
171 }
172 )
174 except Exception as e:
175 logger.warning(f"Error calculating warnings: {e}")
177 return warnings
180def validate_setting(
181 setting: Setting, value: Any
182) -> Tuple[bool, Optional[str]]:
183 """
184 Validate a setting value based on its type and constraints.
186 Args:
187 setting: The Setting object to validate against
188 value: The value to validate
190 Returns:
191 Tuple of (is_valid, error_message)
192 """
193 # Convert value to appropriate type first using SettingsManager's logic
194 value = get_typed_setting_value(
195 key=setting.key,
196 value=value,
197 ui_element=setting.ui_element,
198 default=None,
199 check_env=False,
200 )
202 # Validate based on UI element type
203 if setting.ui_element == "checkbox":
204 # After conversion, should be boolean
205 if not isinstance(value, bool):
206 return False, "Value must be a boolean"
208 elif setting.ui_element in ("number", "slider", "range"):
209 # After conversion, should be numeric
210 if not isinstance(value, (int, float)):
211 return False, "Value must be a number"
213 # Check min/max constraints if defined
214 if setting.min_value is not None and value < setting.min_value:
215 return False, f"Value must be at least {setting.min_value}"
216 if setting.max_value is not None and value > setting.max_value:
217 return False, f"Value must be at most {setting.max_value}"
219 elif setting.ui_element == "select":
220 # Check if value is in the allowed options
221 if setting.options:
222 # Skip options validation for dynamically populated dropdowns
223 if setting.key not in DYNAMIC_SETTINGS:
224 allowed_values = [opt.get("value") for opt in setting.options]
225 if value not in allowed_values:
226 return (
227 False,
228 f"Value must be one of: {', '.join(str(v) for v in allowed_values)}",
229 )
231 # All checks passed
232 return True, None
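# --- Illustrative sketch (hypothetical, never called): the shape of a
# validate_setting() call for a numeric field. SimpleNamespace stands in for
# the Setting ORM object, and the key and limits are made up for the example.
# Assuming get_typed_setting_value() coerces the submitted string "150" to a
# number, the range check above would reject it with "Value must be at most 100".
def _validate_setting_example():
    from types import SimpleNamespace

    fake_setting = SimpleNamespace(
        key="llm.max_tokens",  # hypothetical key chosen for the demo
        ui_element="number",
        min_value=1,
        max_value=100,
        options=None,
    )
    return validate_setting(fake_setting, "150")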
235@settings_bp.route("/", methods=["GET"])
236@login_required
237def settings_page():
238 """Main settings dashboard with links to specialized config pages"""
239 return render_template_with_defaults("settings_dashboard.html")
242@settings_bp.route("/save_all_settings", methods=["POST"])
243@login_required
244def save_all_settings():
245 """Handle saving all settings at once from the unified settings page"""
246 username = session.get("username")
248 with get_user_db_session(username) as db_session:
249 # Get the settings manager but we don't need to assign it to a variable right now
250 # get_db_settings_manager(db_session)
252 try:
253 # Process JSON data
254 form_data = request.get_json()
255 if not form_data:
256 return (
257 jsonify(
258 {
259 "status": "error",
260 "message": "No settings data provided",
261 }
262 ),
263 400,
264 )
266 # Track validation errors
267 validation_errors = []
268 settings_by_type = {}
270 # Track changes for logging
271 updated_settings = []
272 created_settings = []
274 # Store original values for better messaging
275 original_values = {}
277 # Fetch all settings once to avoid N+1 query problem
278 all_db_settings = {
279 setting.key: setting
280 for setting in db_session.query(Setting).all()
281 }
283 # Update each setting
284 for key, value in form_data.items():
285 # Skip corrupted keys or empty strings as keys
286 if not key or not isinstance(key, str) or key.strip() == "":
287 continue
289 # Get the setting metadata from pre-fetched dict
290 current_setting = all_db_settings.get(key)
292 # EARLY VALIDATION: Convert checkbox values BEFORE any other processing
293 # This prevents incorrect triggering of corrupted value detection
294 if current_setting and current_setting.ui_element == "checkbox":
295 if not isinstance(value, bool):
296 logger.debug(
297 f"Converting checkbox {key} from {type(value).__name__} to bool: {value}"
298 )
299 value = parse_boolean(value)
300 form_data[key] = (
301 value # Update the form_data with converted value
302 )
304 # Store original value for messaging
305 if current_setting:
306 original_values[key] = current_setting.value
308 # Determine setting type and category
309 if key.startswith("llm."):
310 setting_type = SettingType.LLM
311 category = "llm_general"
312 if (
313 "temperature" in key
314 or "max_tokens" in key
315 or "batch" in key
316 or "layers" in key
317 ):
318 category = "llm_parameters"
319 elif key.startswith("search."):
320 setting_type = SettingType.SEARCH
321 category = "search_general"
322 if (
323 "iterations" in key
324 or "results" in key
325 or "region" in key
326 or "questions" in key
327 or "section" in key
328 ):
329 category = "search_parameters"
330 elif key.startswith("report."):
331 setting_type = SettingType.REPORT
332 category = "report_parameters"
333 elif key.startswith("database."):
334 setting_type = SettingType.DATABASE
335 category = "database_parameters"
336 elif key.startswith("app."):
337 setting_type = SettingType.APP
338 category = "app_interface"
339 else:
340 setting_type = None
341 category = None
343 # Special handling for corrupted or empty values
344 if value == "[object Object]" or (
345 isinstance(value, str)
346 and value.strip() in ["{}", "[]", "{", "["]
347 ):
348 if key.startswith("report."):
349 value = {}
350 else:
351 # Use default or null for other types
352 if key == "llm.model":
353 value = "gpt-3.5-turbo"
354 elif key == "llm.provider":
355 value = "openai"
356 elif key == "search.tool":
357 value = "auto"
358 elif key in ["app.theme", "app.default_theme"]:
359 value = "dark"
360 else:
361 value = None
363 logger.warning(
364 f"Corrected corrupted value for {key}: {value}"
365 )
367 # Handle JSON string values (already parsed by JavaScript)
368 if isinstance(value, (dict, list)):
369 # Keep as is, already parsed
370 pass
371 # Handle string values that might be JSON
372 elif isinstance(value, str) and (
373 value.startswith("{") or value.startswith("[")
374 ):
375 try:
376 # Try to parse the string as JSON
377 value = json.loads(value)
378 except json.JSONDecodeError:
379 # If it fails to parse, keep as string
380 pass
382 if current_setting:
383 # Convert value to appropriate type using SettingsManager's logic
384 converted_value = get_typed_setting_value(
385 key=current_setting.key,
386 value=value,
387 ui_element=current_setting.ui_element,
388 default=None,
389 check_env=False,
390 )
392 # Validate the setting
393 is_valid, error_message = validate_setting(
394 current_setting, converted_value
395 )
397 if is_valid:
398 # Save the converted setting using the same session
399 success = set_setting(
400 key, converted_value, db_session=db_session
401 )
402 if success:
403 updated_settings.append(key)
405 # Track settings by type for exporting
406 if current_setting.type not in settings_by_type:
407 settings_by_type[current_setting.type] = []
408 settings_by_type[current_setting.type].append(
409 current_setting
410 )
411 else:
412 # Add to validation errors
413 validation_errors.append(
414 {
415 "key": key,
416 "name": current_setting.name,
417 "error": error_message,
418 }
419 )
420 else:
421 # Create a new setting
422 new_setting = {
423 "key": key,
424 "value": value,
425 "type": setting_type.value.lower(),
426 "name": key.split(".")[-1].replace("_", " ").title(),
427 "description": f"Setting for {key}",
428 "category": category,
429 "ui_element": "text", # Default UI element
430 }
432 # Determine better UI element based on value type
433 if isinstance(value, bool):
434 new_setting["ui_element"] = "checkbox"
435 elif isinstance(value, (int, float)) and not isinstance(
436 value, bool
437 ):
438 new_setting["ui_element"] = "number"
439 elif isinstance(value, (dict, list)):
440 new_setting["ui_element"] = "textarea"
442 # Create the setting
443 db_setting = create_or_update_setting(
444 new_setting, db_session=db_session
445 )
447 if db_setting:
448 created_settings.append(key)
449 # Track settings by type for exporting
450 if db_setting.type not in settings_by_type:
451 settings_by_type[db_setting.type] = []
452 settings_by_type[db_setting.type].append(db_setting)
453 else:
454 validation_errors.append(
455 {
456 "key": key,
457 "name": new_setting["name"],
458 "error": "Failed to create setting",
459 }
460 )
462 # Report validation errors if any
463 if validation_errors:
464 return (
465 jsonify(
466 {
467 "status": "error",
468 "message": "Validation errors",
469 "errors": validation_errors,
470 }
471 ),
472 400,
473 )
475 # Get all settings to return to the client for proper state update
476 all_settings = []
477 for setting in db_session.query(Setting).all():
478 # Convert enum to string if present
479 setting_type = setting.type
480 if hasattr(setting_type, "value"):
481 setting_type = setting_type.value
483 all_settings.append(
484 {
485 "key": setting.key,
486 "value": setting.value,
487 "name": setting.name,
488 "description": setting.description,
489 "type": setting_type,
490 "category": setting.category,
491 "ui_element": setting.ui_element,
492 "editable": setting.editable,
493 "options": setting.options,
494 }
495 )
497 # Customize the success message based on what changed
498 success_message = ""
499 if len(updated_settings) == 1:
500 # For a single update, provide more specific info about what changed
501 key = updated_settings[0]
502 # Reuse the already-fetched setting from our pre-fetched dict
503 updated_setting = all_db_settings.get(key)
504 name = (
505 updated_setting.name
506 if updated_setting
507 else key.split(".")[-1].replace("_", " ").title()
508 )
510 # Format the message
511 if key in original_values:
512 # Get original value but comment out if not used
513 # old_value = original_values[key]
514 new_value = (
515 updated_setting.value if updated_setting else None
516 )
518 # If it's a boolean, use "enabled/disabled" language
519 if isinstance(new_value, bool):
520 state = "enabled" if new_value else "disabled"
521 success_message = f"{name} {state}"
522 else:
523 # For non-boolean values
524 if isinstance(new_value, (dict, list)):
525 success_message = f"{name} updated"
526 else:
527 success_message = f"{name} updated"
528 else:
529 success_message = f"{name} updated"
530 else:
531 # Multiple settings or generic message
532 success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)"
534 # Check if any warning-affecting settings were changed and include warnings
535 response_data = {
536 "status": "success",
537 "message": success_message,
538 "updated": updated_settings,
539 "created": created_settings,
540 "settings": all_settings,
541 }
543 warning_affecting_keys = [
544 "llm.provider",
545 "search.tool",
546 "search.iterations",
547 "search.questions_per_iteration",
548 "llm.local_context_window_size",
549 "llm.context_window_unrestricted",
550 "llm.context_window_size",
551 ]
553 # Check if any warning-affecting settings were changed
554 if any(
555 key in warning_affecting_keys
556 for key in updated_settings + created_settings
557 ):
558 warnings = calculate_warnings()
559 response_data["warnings"] = warnings
560 logger.info(
561 f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings"
562 )
564 return jsonify(response_data)
566 except Exception:
567 logger.exception("Error saving settings")
568 return (
569 jsonify(
570 {
571 "status": "error",
572 "message": "An internal error occurred while saving settings.",
573 }
574 ),
575 500,
576 )
579@settings_bp.route("/reset_to_defaults", methods=["POST"])
580@login_required
581def reset_to_defaults():
582 """Reset all settings to their default values"""
583 username = session.get("username")
585 with get_user_db_session(username) as db_session:
586 # Import default settings from files
587 try:
588 # Create settings manager with proper session context
589 username = session.get("username")
590 with get_user_db_session(username) as db_session:
591 settings_mgr = SettingsManager(db_session)
592 # Import settings from default files
593 settings_mgr.load_from_defaults_file()
595 logger.info("Successfully imported settings from default files")
597 except Exception:
598 logger.exception("Error importing default settings")
600 # Return success
601 return jsonify(
602 {
603 "status": "success",
604 "message": "All settings have been reset to default values",
605 }
606 )
609@settings_bp.route("/save_settings", methods=["POST"])
610@login_required
611def save_settings():
612 """Save all settings from the form using POST method - fallback when JavaScript is disabled"""
613 try:
614 username = session.get("username")
616 # Get form data
617 form_data = request.form.to_dict()
619 # Remove CSRF token from the data
620 form_data.pop("csrf_token", None)
622 with get_user_db_session(username) as db_session:
623 settings_manager = SettingsManager(db_session)
625 updated_count = 0
626 failed_count = 0
628 # Fetch all settings once to avoid N+1 query problem
629 all_db_settings = {
630 setting.key: setting
631 for setting in db_session.query(Setting).all()
632 }
634 # Process each setting
635 for key, value in form_data.items():
636 try:
637 # Get the setting from pre-fetched dict
638 db_setting = all_db_settings.get(key)
640 # Convert value to appropriate type using SettingsManager's logic
641 if db_setting:
642 value = get_typed_setting_value(
643 key=db_setting.key,
644 value=value,
645 ui_element=db_setting.ui_element,
646 default=None,
647 check_env=False,
648 )
650 # Save the setting
651 if settings_manager.set_setting(key, value, commit=False):
652 updated_count += 1
653 else:
654 failed_count += 1
655 logger.warning(f"Failed to save setting {key}")
657 except Exception:
658 logger.exception(f"Error saving setting {key}")
659 failed_count += 1
661 # Commit all changes at once
662 try:
663 db_session.commit()
665 flash(
666 f"Settings saved successfully! Updated {updated_count} settings.",
667 "success",
668 )
669 if failed_count > 0:
670 flash(
671 f"Warning: {failed_count} settings failed to save.",
672 "warning",
673 )
675 # Sync server config
676 settings_snapshot = settings_manager.get_settings_snapshot()
677 sync_from_settings(settings_snapshot)
679 except Exception:
680 db_session.rollback()
681 logger.exception("Failed to commit settings")
682 flash("Error saving settings. Please try again.", "error")
684 return redirect(url_for("settings.settings_page"))
686 except Exception:
687 logger.exception("Error in save_settings")
688 flash("An internal error occurred while saving settings.", "error")
689 return redirect(url_for("settings.settings_page"))
692# API Routes
693@settings_bp.route("/api", methods=["GET"])
694@login_required
695def api_get_all_settings():
696 """Get all settings"""
697 try:
698 # Get query parameters
699 category = request.args.get("category")
700 username = session.get("username")
702 with get_user_db_session(username) as db_session:
703 # Create settings manager with the session from context
704 # This ensures thread safety
705 settings_manager = SettingsManager(db_session)
707 # Get settings
708 settings = settings_manager.get_all_settings()
710 # Filter by category if requested
711 if category: 711 ↛ 712 (line 711 didn't jump to line 712 because the condition on line 711 was never true)
712 filtered_settings = {}
713 # Need to get all setting details to check category
714 db_settings = db_session.query(Setting).all()
715 category_keys = [
716 s.key for s in db_settings if s.category == category
717 ]
719 # Filter settings by keys
720 for key, value in settings.items():
721 if key in category_keys:
722 filtered_settings[key] = value
724 settings = filtered_settings
726 return jsonify({"status": "success", "settings": settings})
727 except Exception:
728 logger.exception("Error getting settings")
729 return jsonify({"error": "Failed to retrieve settings"}), 500
732@settings_bp.route("/api/<path:key>", methods=["GET"])
733@login_required
734def api_get_db_setting(key):
735 """Get a specific setting by key"""
736 try:
737 username = session.get("username")
739 with get_user_db_session(username) as db_session:
740 # Get setting from database using the same session
741 db_setting = (
742 db_session.query(Setting).filter(Setting.key == key).first()
743 )
745 if db_setting:
746 # Return full setting details
747 setting_data = {
748 "key": db_setting.key,
749 "value": db_setting.value,
750 "type": db_setting.type
751 if isinstance(db_setting.type, str)
752 else db_setting.type.value,
753 "name": db_setting.name,
754 "description": db_setting.description,
755 "category": db_setting.category,
756 "ui_element": db_setting.ui_element,
757 "options": db_setting.options,
758 "min_value": db_setting.min_value,
759 "max_value": db_setting.max_value,
760 "step": db_setting.step,
761 "visible": db_setting.visible,
762 "editable": db_setting.editable,
763 }
764 return jsonify(setting_data)
765 else:
766 # Setting not found
767 return jsonify({"error": f"Setting not found: {key}"}), 404
768 except Exception:
769 logger.exception(f"Error getting setting {key}")
770 return jsonify({"error": "Failed to retrieve settings"}), 500
773@settings_bp.route("/api/<path:key>", methods=["PUT"])
774@login_required
775def api_update_setting(key):
776 """Update a setting"""
777 try:
778 # Get request data
779 data = request.get_json()
780 if not data: 780 ↛ 781 (line 780 didn't jump to line 781 because the condition on line 780 was never true)
781 return jsonify({"error": "No data provided"}), 400
783 value = data.get("value")
784 if value is None: 784 ↛ 785 (line 784 didn't jump to line 785 because the condition on line 784 was never true)
785 return jsonify({"error": "No value provided"}), 400
787 username = session.get("username")
789 with get_user_db_session(username) as db_session:
790 # Only use settings_manager if needed - we don't need to assign if not used
791 # get_db_settings_manager(db_session)
793 # Check if setting exists
794 db_setting = (
795 db_session.query(Setting).filter(Setting.key == key).first()
796 )
798 if db_setting: 798 ↛ 800 (line 798 didn't jump to line 800 because the condition on line 798 was never true)
799 # Check if setting is editable
800 if not db_setting.editable:
801 return jsonify(
802 {"error": f"Setting {key} is not editable"}
803 ), 403
805 # Update setting
806 # Pass the db_session to avoid session lookup issues
807 success = set_setting(key, value, db_session=db_session)
808 if success:
809 # Sync server config
810 settings_manager = SettingsManager(db_session)
811 settings_snapshot = settings_manager.get_settings_snapshot()
812 sync_from_settings(settings_snapshot)
814 response_data = {
815 "message": f"Setting {key} updated successfully"
816 }
818 # If this is a key that affects warnings, include warning calculations
819 warning_affecting_keys = [
820 "llm.provider",
821 "search.tool",
822 "search.iterations",
823 "search.questions_per_iteration",
824 "llm.local_context_window_size",
825 "llm.context_window_unrestricted",
826 "llm.context_window_size",
827 ]
829 if key in warning_affecting_keys:
830 warnings = calculate_warnings()
831 response_data["warnings"] = warnings
832 logger.debug(
833 f"Setting {key} changed to {value}, calculated {len(warnings)} warnings"
834 )
836 return jsonify(response_data)
837 else:
838 return jsonify(
839 {"error": f"Failed to update setting {key}"}
840 ), 500
841 else:
842 # Create new setting with default metadata
843 setting_dict = {
844 "key": key,
845 "value": value,
846 "name": key.split(".")[-1].replace("_", " ").title(),
847 "description": f"Setting for {key}",
848 }
850 # Add additional metadata if provided
851 for field in [
852 "type",
853 "name",
854 "description",
855 "category",
856 "ui_element",
857 "options",
858 "min_value",
859 "max_value",
860 "step",
861 "visible",
862 "editable",
863 ]:
864 if field in data:
865 setting_dict[field] = data[field]
867 # Create setting
868 db_setting = create_or_update_setting(
869 setting_dict, db_session=db_session
870 )
872 if db_setting: 872 ↛ 893 (line 872 didn't jump to line 893 because the condition on line 872 was always true)
873 # Sync server config
874 settings_manager = SettingsManager(db_session)
875 settings_snapshot = settings_manager.get_settings_snapshot()
876 sync_from_settings(settings_snapshot)
878 return (
879 jsonify(
880 {
881 "message": f"Setting {key} created successfully",
882 "setting": {
883 "key": db_setting.key,
884 "value": db_setting.value,
885 "type": db_setting.type.value,
886 "name": db_setting.name,
887 },
888 }
889 ),
890 201,
891 )
892 else:
893 return jsonify(
894 {"error": f"Failed to create setting {key}"}
895 ), 500
896 except Exception:
897 logger.exception(f"Error updating setting {key}")
898 return jsonify({"error": "Failed to retrieve settings"}), 500
901@settings_bp.route("/api/<path:key>", methods=["DELETE"])
902@login_required
903def api_delete_setting(key):
904 """Delete a setting"""
905 try:
906 username = session.get("username")
908 with get_user_db_session(username) as db_session:
909 # Create settings manager with the session from context
910 settings_manager = SettingsManager(db_session)
912 # Check if setting exists
913 db_setting = (
914 db_session.query(Setting).filter(Setting.key == key).first()
915 )
916 if not db_setting:
917 return jsonify({"error": f"Setting not found: {key}"}), 404
919 # Delete setting
920 success = settings_manager.delete_setting(key)
921 if success: 921 ↛ 926 (line 921 didn't jump to line 926 because the condition on line 921 was always true)
922 return jsonify(
923 {"message": f"Setting {key} deleted successfully"}
924 )
925 else:
926 return jsonify(
927 {"error": f"Failed to delete setting {key}"}
928 ), 500
929 except Exception:
930 logger.exception(f"Error deleting setting {key}")
931 return jsonify({"error": "Failed to retrieve settings"}), 500
934@settings_bp.route("/api/import", methods=["POST"])
935@login_required
936def api_import_settings():
937 """Import settings from defaults file"""
938 try:
939 username = session.get("username")
940 with get_user_db_session(username) as db_session:
941 settings_manager = SettingsManager(db_session)
942 success = settings_manager.load_from_defaults_file()
944 if success:
945 return jsonify({"message": "Settings imported successfully"})
946 else:
947 return jsonify({"error": "Failed to import settings"}), 500
948 except Exception:
949 logger.exception("Error importing settings")
950 return jsonify({"error": "Failed to retrieve settings"}), 500
953@settings_bp.route("/api/categories", methods=["GET"])
954@login_required
955def api_get_categories():
956 """Get all setting categories"""
957 try:
958 username = session.get("username")
960 with get_user_db_session(username) as db_session:
961 # Get all distinct categories
962 categories = db_session.query(Setting.category).distinct().all()
963 category_list = [c[0] for c in categories if c[0] is not None]
965 return jsonify({"categories": category_list})
966 except Exception:
967 logger.exception("Error getting categories")
968 return jsonify({"error": "Failed to retrieve settings"}), 500
971@settings_bp.route("/api/types", methods=["GET"])
972@login_required
973def api_get_types():
974 """Get all setting types"""
975 try:
976 # Get all setting types
977 types = [t.value for t in SettingType]
978 return jsonify({"types": types})
979 except Exception:
980 logger.exception("Error getting types")
981 return jsonify({"error": "Failed to retrieve settings"}), 500
984@settings_bp.route("/api/ui_elements", methods=["GET"])
985@login_required
986def api_get_ui_elements():
987 """Get all UI element types"""
988 try:
989 # Define supported UI element types
990 ui_elements = [
991 "text",
992 "select",
993 "checkbox",
994 "slider",
995 "number",
996 "textarea",
997 "color",
998 "date",
999 "file",
1000 "password",
1001 ]
1003 return jsonify({"ui_elements": ui_elements})
1004 except Exception:
1005 logger.exception("Error getting UI elements")
1006 return jsonify({"error": "Failed to retrieve settings"}), 500
1009@settings_bp.route("/api/available-models", methods=["GET"])
1010@login_required
1011def api_get_available_models():
1012 """Get available LLM models from various providers"""
1013 try:
1014 from flask import request
1016 from ...database.models import ProviderModel
1018 # Check if force_refresh is requested
1019 force_refresh = (
1020 request.args.get("force_refresh", "false").lower() == "true"
1021 )
1023 # Get all auto-discovered providers
1024 from ...llm.providers import get_discovered_provider_options
1026 provider_options = get_discovered_provider_options()
1028 # Add remaining hardcoded providers (complex local providers not yet migrated)
1029 provider_options.extend(
1030 [
1031 {"value": "VLLM", "label": "vLLM (Local)"},
1032 {"value": "LLAMACPP", "label": "Llama.cpp (Local)"},
1033 ]
1034 )
1036 # Available models by provider
1037 providers = {}
1039 # Check database cache first (unless force_refresh is True)
1040 if not force_refresh: 1040 ↛ 1090 (line 1040 didn't jump to line 1090 because the condition on line 1040 was always true)
1041 try:
1042 # Define cache expiration (24 hours)
1043 cache_expiry = datetime.now(UTC) - timedelta(hours=24)
1045 # Get cached models from database
1046 username = session.get("username")
1047 with get_user_db_session(username) as db_session:
1048 cached_models = (
1049 db_session.query(ProviderModel)
1050 .filter(ProviderModel.last_updated > cache_expiry)
1051 .all()
1052 )
1054 if cached_models: 1054 ↛ 1055 (line 1054 didn't jump to line 1055 because the condition on line 1054 was never true)
1055 logger.info(
1056 f"Found {len(cached_models)} cached models in database"
1057 )
1059 # Group models by provider
1060 for model in cached_models:
1061 provider_key = f"{model.provider.lower()}_models"
1062 if provider_key not in providers:
1063 providers[provider_key] = []
1065 providers[provider_key].append(
1066 {
1067 "value": model.model_key,
1068 "label": model.model_label,
1069 "provider": model.provider.upper(),
1070 }
1071 )
1073 # If we have cached data for all providers, return it
1074 if providers:
1075 logger.info("Returning cached models from database")
1076 return jsonify(
1077 {
1078 "provider_options": provider_options,
1079 "providers": providers,
1080 }
1081 )
1083 except Exception as e:
1084 logger.warning(
1085 f"Error reading cached models from database: {e}"
1086 )
1087 # Continue to fetch fresh data
1089 # Try to get Ollama models
1090 ollama_models = []
1091 try:
1092 import json
1093 import re
1095 import requests
1097 # Try to query the Ollama API directly
1098 try:
1099 logger.info("Attempting to connect to Ollama API")
1101 raw_base_url = _get_setting_from_session(
1102 "llm.ollama.url", "http://localhost:11434"
1103 )
1104 base_url = (
1105 normalize_url(raw_base_url)
1106 if raw_base_url
1107 else "http://localhost:11434"
1108 )
1110 ollama_response = safe_get(
1111 f"{base_url}/api/tags",
1112 timeout=5,
1113 allow_localhost=True,
1114 allow_private_ips=True,
1115 )
1117 logger.debug(
1118 f"Ollama API response: Status {ollama_response.status_code}"
1119 )
1121 # Try to parse the response even if status code is not 200 to help with debugging
1122 response_text = ollama_response.text
1123 logger.debug(
1124 f"Ollama API raw response: {response_text[:500]}..."
1125 )
1127 if ollama_response.status_code == 200:
1128 try:
1129 ollama_data = ollama_response.json()
1130 logger.debug(
1131 f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..."
1132 )
1134 if "models" in ollama_data:
1135 # Format for newer Ollama API
1136 logger.info(
1137 f"Found {len(ollama_data.get('models', []))} models in newer Ollama API format"
1138 )
1139 for model in ollama_data.get("models", []):
1140 # Extract name correctly from the model object
1141 name = model.get("name", "")
1142 if name:
1143 # Improved display name formatting
1144 display_name = re.sub(
1145 r"[:/]", " ", name
1146 ).strip()
1147 display_name = " ".join(
1148 word.capitalize()
1149 for word in display_name.split()
1150 )
1151 # Create the model entry with value and label
1152 ollama_models.append(
1153 {
1154 "value": name, # Original model name as value (for API calls)
1155 "label": f"{display_name} (Ollama)", # Pretty name as label
1156 "provider": "OLLAMA", # Add provider field for consistency
1157 }
1158 )
1159 logger.debug(
1160 f"Added Ollama model: {name} -> {display_name}"
1161 )
1162 else:
1163 # Format for older Ollama API
1164 logger.info(
1165 f"Found {len(ollama_data)} models in older Ollama API format"
1166 )
1167 for model in ollama_data:
1168 name = model.get("name", "")
1169 if name:
1170 # Improved display name formatting
1171 display_name = re.sub(
1172 r"[:/]", " ", name
1173 ).strip()
1174 display_name = " ".join(
1175 word.capitalize()
1176 for word in display_name.split()
1177 )
1178 ollama_models.append(
1179 {
1180 "value": name,
1181 "label": f"{display_name} (Ollama)",
1182 "provider": "OLLAMA", # Add provider field for consistency
1183 }
1184 )
1185 logger.debug(
1186 f"Added Ollama model: {name} -> {display_name}"
1187 )
1189 except json.JSONDecodeError as json_err:
1190 logger.exception(
1191 f"Failed to parse Ollama API response as JSON: {json_err}"
1192 )
1193 raise Exception(
1194 f"Ollama API returned invalid JSON: {json_err}"
1195 )
1196 else:
1197 logger.warning(
1198 f"Ollama API returned non-200 status code: {ollama_response.status_code}"
1199 )
1200 raise Exception(
1201 f"Ollama API returned status code {ollama_response.status_code}"
1202 )
1204 except requests.exceptions.RequestException as e:
1205 logger.warning(f"Could not connect to Ollama API: {e!s}")
1206 # No fallback models - just return empty list
1207 logger.info("Ollama not available - no models to display")
1208 ollama_models = []
1210 # Always set the ollama_models in providers, whether we got real or fallback models
1211 providers["ollama_models"] = ollama_models
1212 logger.info(f"Final Ollama models count: {len(ollama_models)}")
1214 # Log some model names for debugging
1215 if ollama_models: 1215 ↛ 1216 (line 1215 didn't jump to line 1216 because the condition on line 1215 was never true)
1216 model_names = [m["value"] for m in ollama_models[:5]]
1217 logger.info(f"Sample Ollama models: {', '.join(model_names)}")
1219 except Exception:
1220 logger.exception("Error getting Ollama models")
1221 # No fallback models - just return empty list
1222 logger.info("Error getting Ollama models - no models to display")
1223 providers["ollama_models"] = []
1225 # Note: Custom OpenAI Endpoint models are fetched via auto-discovery
1226 # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider)
1228 # Get OpenAI models using the OpenAI package
1229 openai_models = []
1230 try:
1231 logger.info(
1232 "Attempting to connect to OpenAI API using OpenAI package"
1233 )
1235 # Get the API key from settings
1236 api_key = _get_setting_from_session("llm.openai.api_key", "")
1238 if api_key: 1238 ↛ 1239 (line 1238 didn't jump to line 1239 because the condition on line 1238 was never true)
1239 import openai
1240 from openai import OpenAI
1242 # Create OpenAI client
1243 client = OpenAI(api_key=api_key)
1245 try:
1246 # Fetch models using the client
1247 logger.debug("Fetching models from OpenAI API")
1248 models_response = client.models.list()
1250 # Process models from the response
1251 for model in models_response.data:
1252 model_id = model.id
1253 if model_id:
1254 # Create a clean display name
1255 display_name = model_id.replace("-", " ").strip()
1256 display_name = " ".join(
1257 word.capitalize()
1258 for word in display_name.split()
1259 )
1261 openai_models.append(
1262 {
1263 "value": model_id,
1264 "label": f"{display_name} (OpenAI)",
1265 "provider": "OPENAI",
1266 }
1267 )
1268 logger.debug(
1269 f"Added OpenAI model: {model_id} -> {display_name}"
1270 )
1272 # Keep original order from OpenAI - their models are returned in a
1273 # meaningful order (newer/more capable models first)
1275 except openai.APIError as api_err:
1276 logger.exception(f"OpenAI API error: {api_err!s}")
1277 logger.info("No OpenAI models found due to API error")
1279 else:
1280 logger.info(
1281 "OpenAI API key not configured, no models available"
1282 )
1284 except Exception as e:
1285 logger.exception(f"Error getting OpenAI models: {e!s}")
1286 logger.info("No OpenAI models available due to error")
1288 # Always set the openai_models in providers (will be empty array if no models found)
1289 providers["openai_models"] = openai_models
1290 logger.info(f"Final OpenAI models count: {len(openai_models)}")
1292 # Try to get Anthropic models using the Anthropic package
1293 anthropic_models = []
1294 try:
1295 logger.info(
1296 "Attempting to connect to Anthropic API using Anthropic package"
1297 )
1299 # Get the API key from settings
1300 api_key = _get_setting_from_session("llm.anthropic.api_key", "")
1302 if api_key: 1302 ↛ 1304 (line 1302 didn't jump to line 1304 because the condition on line 1302 was never true)
1303 # Import Anthropic package here to avoid dependency issues if not installed
1304 from anthropic import Anthropic
1306 # Create Anthropic client
1307 client = Anthropic(api_key=api_key)
1309 try:
1310 # Fetch models using the client
1311 logger.debug("Fetching models from Anthropic API")
1312 models_response = client.models.list()
1314 # Process models from the response
1315 for model in models_response.data:
1316 model_id = model.id
1317 if model_id:
1318 # Create a clean display name
1319 display_name = model_id.replace("-", " ").strip()
1320 display_name = " ".join(
1321 word.capitalize()
1322 for word in display_name.split()
1323 )
1325 anthropic_models.append(
1326 {
1327 "value": model_id,
1328 "label": f"{display_name} (Anthropic)",
1329 "provider": "ANTHROPIC",
1330 }
1331 )
1332 logger.debug(
1333 f"Added Anthropic model: {model_id} -> {display_name}"
1334 )
1336 except Exception as api_err:
1337 logger.exception(f"Anthropic API error: {api_err!s}")
1338 else:
1339 logger.info("Anthropic API key not configured")
1341 except ImportError:
1342 logger.warning(
1343 "Anthropic package not installed. No models will be available."
1344 )
1345 except Exception as e:
1346 logger.exception(f"Error getting Anthropic models: {e!s}")
1348 # Set anthropic_models in providers (could be empty if API call failed)
1349 providers["anthropic_models"] = anthropic_models
1350 logger.info(f"Final Anthropic models count: {len(anthropic_models)}")
1352 # Fetch models from auto-discovered providers
1353 from ...llm.providers import discover_providers
1355 discovered_providers = discover_providers()
1357 for provider_key, provider_info in discovered_providers.items():
1358 provider_models = []
1359 try:
1360 logger.info(
1361 f"Fetching models from {provider_info.provider_name}"
1362 )
1364 # Get the provider class
1365 provider_class = provider_info.provider_class
1367 # Get API key if configured
1368 api_key = _get_setting_from_session(
1369 provider_class.api_key_setting, ""
1370 )
1372 # Get base URL if provider has configurable URL
1373 base_url = None
1374 if (
1375 hasattr(provider_class, "url_setting")
1376 and provider_class.url_setting
1377 ):
1378 base_url = _get_setting_from_session(
1379 provider_class.url_setting, ""
1380 )
1382 # Use the provider's list_models_for_api method
1383 models = provider_class.list_models_for_api(api_key, base_url)
1385 # Format models for the API response
1386 for model in models:
1387 provider_models.append(
1388 {
1389 "value": model["value"],
1390 "label": model[
1391 "label"
1392 ], # Use provider's label as-is
1393 "provider": provider_key,
1394 }
1395 )
1397 logger.info(
1398 f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}"
1399 )
1401 except Exception as e:
1402 logger.exception(
1403 f"Error getting {provider_info.provider_name} models: {e!s}"
1404 )
1406 # Set models in providers dict using lowercase key
1407 providers[f"{provider_key.lower()}_models"] = provider_models
1408 logger.info(
1409 f"Final {provider_key} models count: {len(provider_models)}"
1410 )
1412 # Save fetched models to database cache
1413 if force_refresh or providers: 1413 ↛ 1462 (line 1413 didn't jump to line 1462 because the condition on line 1413 was always true)
1414 # We fetched fresh data, save it to database
1415 username = session.get("username")
1416 with get_user_db_session(username) as db_session:
1417 try:
1418 if force_refresh: 1418 ↛ 1421 (line 1418 didn't jump to line 1421 because the condition on line 1418 was never true)
1419 # When force refresh, clear ALL cached models to remove any stale data
1420 # from old code versions or deleted providers
1421 deleted_count = db_session.query(ProviderModel).delete()
1422 logger.info(
1423 f"Force refresh: cleared all {deleted_count} cached models"
1424 )
1425 else:
1426 # Clear old cache entries only for providers we're updating
1427 for provider_key in providers:
1428 provider_name = provider_key.replace(
1429 "_models", ""
1430 ).upper()
1431 db_session.query(ProviderModel).filter(
1432 ProviderModel.provider == provider_name
1433 ).delete()
1435 # Insert new models
1436 for provider_key, models in providers.items():
1437 provider_name = provider_key.replace(
1438 "_models", ""
1439 ).upper()
1440 for model in models:
1441 if ( 1441 ↛ 1440 (line 1441 didn't jump to line 1440 because the condition on line 1441 was always true)
1442 isinstance(model, dict)
1443 and "value" in model
1444 and "label" in model
1445 ):
1446 new_model = ProviderModel(
1447 provider=provider_name,
1448 model_key=model["value"],
1449 model_label=model["label"],
1450 last_updated=datetime.now(UTC),
1451 )
1452 db_session.add(new_model)
1454 db_session.commit()
1455 logger.info("Successfully cached models to database")
1457 except Exception:
1458 logger.exception("Error saving models to database cache")
1459 db_session.rollback()
1461 # Return all options
1462 return jsonify(
1463 {"provider_options": provider_options, "providers": providers}
1464 )
1466 except Exception:
1467 logger.exception("Error getting available models")
1468 return jsonify(
1469 {"status": "error", "message": "Failed to save settings"}
1470 ), 500
1473def _get_engine_icon_and_category(
1474 engine_data: dict, engine_class=None
1475) -> tuple:
1476 """
1477 Get icon emoji and category label for a search engine based on its attributes.
1479 Args:
1480 engine_data: Engine configuration dictionary
1481 engine_class: Optional loaded engine class to check attributes
1483 Returns:
1484 Tuple of (icon, category) strings
1485 """
1486 # Check attributes from either the class or the engine data
1487 if engine_class: 1487 ↛ 1494 (line 1487 didn't jump to line 1494 because the condition on line 1487 was always true)
1488 is_scientific = getattr(engine_class, "is_scientific", False)
1489 is_generic = getattr(engine_class, "is_generic", False)
1490 is_local = getattr(engine_class, "is_local", False)
1491 is_news = getattr(engine_class, "is_news", False)
1492 is_code = getattr(engine_class, "is_code", False)
1493 else:
1494 is_scientific = engine_data.get("is_scientific", False)
1495 is_generic = engine_data.get("is_generic", False)
1496 is_local = engine_data.get("is_local", False)
1497 is_news = engine_data.get("is_news", False)
1498 is_code = engine_data.get("is_code", False)
1500 # Return icon and category based on engine type
1501 # Priority: local > scientific > news > code > generic > default
1502 if is_local:
1503 return "📁", "Local RAG"
1504 elif is_scientific:
1505 return "🔬", "Scientific"
1506 elif is_news:
1507 return "📰", "News"
1508 elif is_code: 1508 ↛ 1509 (line 1508 didn't jump to line 1509 because the condition on line 1508 was never true)
1509 return "💻", "Code"
1510 elif is_generic:
1511 return "🌐", "Web Search"
1512 else:
1513 return "🔍", "Search"
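# --- Illustrative sketch (hypothetical, never called): category priority when
# only engine_data flags are available (engine_class=None). Local beats
# scientific, which beats news/code/generic, matching the branch order above.
def _engine_icon_example():
    assert _get_engine_icon_and_category({"is_scientific": True}) == ("🔬", "Scientific")
    assert _get_engine_icon_and_category({"is_local": True, "is_scientific": True}) == (
        "📁",
        "Local RAG",
    )
    assert _get_engine_icon_and_category({}) == ("🔍", "Search")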
1516@settings_bp.route("/api/available-search-engines", methods=["GET"])
1517@login_required
1518def api_get_available_search_engines():
1519 """Get available search engines"""
1520 try:
1521 # Get search engines using the same approach as search_engines_config.py
1522 from ...web_search_engines.search_engines_config import search_config
1523 from ...database.session_context import get_user_db_session
1525 username = session.get("username")
1526 with get_user_db_session(username) as db_session:
1527 search_engines = search_config(
1528 username=username, db_session=db_session
1529 )
1531 # Extract search engines from config
1532 engines_dict = {}
1533 engine_options = []
1535 if search_engines: 1535 ↛ 1586 (line 1535 didn't jump to line 1586 because the condition on line 1535 was always true)
1536 # Format engines for API response with metadata
1537 from importlib import import_module
1539 for engine_id, engine_data in search_engines.items():
1540 # Try to load the engine class to get metadata
1541 engine_class = None
1542 try:
1543 module_path = engine_data.get("module_path")
1544 class_name = engine_data.get("class_name")
1545 if module_path and class_name: 1545 ↛ 1560 (line 1545 didn't jump to line 1560 because the condition on line 1545 was always true)
1546 # Handle relative imports
1547 package = None
1548 if module_path.startswith("."):
1549 package = (
1550 "local_deep_research.web_search_engines"
1551 )
1552 module = import_module(module_path, package=package)
1553 engine_class = getattr(module, class_name, None)
1554 except Exception as e:
1555 logger.debug(
1556 f"Could not load engine class for {engine_id}: {e}"
1557 )
1559 # Get icon and category from engine attributes
1560 icon, category = _get_engine_icon_and_category(
1561 engine_data, engine_class
1562 )
1564 # Build display name with icon and category
1565 base_name = engine_data.get("display_name", engine_id)
1566 label = f"{icon} {base_name} ({category})"
1568 engines_dict[engine_id] = {
1569 "display_name": base_name,
1570 "description": engine_data.get("description", ""),
1571 "strengths": engine_data.get("strengths", []),
1572 "icon": icon,
1573 "category": category,
1574 }
1576 engine_options.append(
1577 {
1578 "value": engine_id,
1579 "label": label,
1580 "icon": icon,
1581 "category": category,
1582 }
1583 )
1585 # If no engines found, log the issue but return empty list
1586 if not engine_options: 1586 ↛ 1587 (line 1586 didn't jump to line 1587 because the condition on line 1586 was never true)
1587 logger.warning("No search engines found in configuration")
1589 return jsonify(
1590 {"engines": engines_dict, "engine_options": engine_options}
1591 )
1593 except Exception:
1594 logger.exception("Error getting available search engines")
1595 return jsonify({"error": "Failed to retrieve settings"}), 500
1598# Legacy routes for backward compatibility - these will redirect to the new routes
1599@settings_bp.route("/main", methods=["GET"])
1600@login_required
1601def main_config_page():
1602 """Redirect to app settings page"""
1603 return redirect(url_for("settings.settings_page"))
1606@settings_bp.route("/collections", methods=["GET"])
1607@login_required
1608def collections_config_page():
1609 """Redirect to app settings page"""
1610 return redirect(url_for("settings.settings_page"))
1613@settings_bp.route("/api_keys", methods=["GET"])
1614@login_required
1615def api_keys_config_page():
1616 """Redirect to LLM settings page"""
1617 return redirect(url_for("settings.settings_page"))
1620@settings_bp.route("/search_engines", methods=["GET"])
1621@login_required
1622def search_engines_config_page():
1623 """Redirect to search settings page"""
1624 return redirect(url_for("settings.settings_page"))
1627@settings_bp.route("/open_file_location", methods=["POST"])
1628@login_required
1629def open_file_location():
1630 """Open the location of a configuration file"""
1631 file_path = request.form.get("file_path")
1633 if not file_path:
1634 flash("No file path provided", "error")
1635 return redirect(url_for("settings.settings_page"))
1637 try:
1638 # Use centralized path validator for security
1639 from ...security.path_validator import PathValidator
1640 from ...config.paths import get_data_directory
1642 try:
1643 # PathValidator.validate_config_path already checks existence
1644 resolved_path = PathValidator.validate_config_path(
1645 file_path, get_data_directory()
1646 )
1647 except ValueError as e:
1648 # The validator will raise ValueError if file doesn't exist
1649 flash(f"Invalid file path: {str(e)}", "error")
1650 return redirect(url_for("settings.settings_page"))
1652 # Get the directory containing the file
1653 dir_path = resolved_path.parent
1654 file_path = resolved_path # Use resolved path going forward
1656 # Open the directory in the file explorer
1657 if platform.system() == "Windows":
1658 subprocess.Popen(["explorer", str(dir_path)])
1659 elif platform.system() == "Darwin": # macOS
1660 subprocess.Popen(["open", str(dir_path)])
1661 else: # Linux
1662 subprocess.Popen(["xdg-open", str(dir_path)])
1664 flash(f"Opening folder: {dir_path}", "success")
1665 except Exception as e:
1666 logger.exception("Error opening folder")
1667 flash(f"Error opening folder: {e!s}", "error")
1669 # Redirect back to the settings page
1670 return redirect(url_for("settings.settings_page"))
1673@settings_bp.context_processor
1674def inject_csrf_token():
1675 """Inject CSRF token into the template context for all settings routes."""
1676 return dict(csrf_token=generate_csrf)
1679@settings_bp.route("/fix_corrupted_settings", methods=["POST"])
1680@login_required
1681def fix_corrupted_settings():
1682 """Fix corrupted settings in the database"""
1683 username = session.get("username")
1685 with get_user_db_session(username) as db_session:
1686 try:
1687 # Track fixed and removed settings
1688 fixed_settings = []
1689 removed_duplicate_settings = []
1690 fixed_scoping_issues = []
1692 # First, find and remove duplicate settings with the same key
1693 # This happens because of errors in settings import/export
1694 from sqlalchemy import func as sql_func
1696 # Find keys with duplicates
1697 duplicate_keys = (
1698 db_session.query(Setting.key)
1699 .group_by(Setting.key)
1700 .having(sql_func.count(Setting.key) > 1)
1701 .all()
1702 )
1703 duplicate_keys = [key[0] for key in duplicate_keys]
1705 # For each duplicate key, keep the latest updated one and remove others
1706 for key in duplicate_keys:
1707 dupe_settings = (
1708 db_session.query(Setting)
1709 .filter(Setting.key == key)
1710 .order_by(Setting.updated_at.desc())
1711 .all()
1712 )
1714 # Keep the first one (most recently updated) and delete the rest
1715 for i, setting in enumerate(dupe_settings):
1716 if i > 0: # Skip the first one (keep it)
1717 db_session.delete(setting)
1718 removed_duplicate_settings.append(key)
1720 # Fix scoping issues - remove app.* settings that should be in other categories
1721 # Report settings
1722 for key in [
1723 "app.enable_fact_checking",
1724 "app.knowledge_accumulation",
1725 "app.knowledge_accumulation_context_limit",
1726 "app.output_dir",
1727 ]:
1728 setting = (
1729 db_session.query(Setting).filter(Setting.key == key).first()
1730 )
1731 if setting:
1732 # Move to proper category if not already there
1733 proper_key = key.replace("app.", "report.")
1734 existing_proper = (
1735 db_session.query(Setting)
1736 .filter(Setting.key == proper_key)
1737 .first()
1738 )
1740 if not existing_proper:
1741 # Create proper setting
1742 new_setting = Setting(
1743 key=proper_key,
1744 value=setting.value,
1745 type=SettingType.REPORT,
1746 name=setting.name,
1747 description=setting.description,
1748 category=(
1749 setting.category.replace("app", "report")
1750 if setting.category
1751 else "report_parameters"
1752 ),
1753 ui_element=setting.ui_element,
1754 options=setting.options,
1755 min_value=setting.min_value,
1756 max_value=setting.max_value,
1757 step=setting.step,
1758 visible=setting.visible,
1759 editable=setting.editable,
1760 )
1761 db_session.add(new_setting)
1763 # Delete the app one
1764 db_session.delete(setting)
1765 fixed_scoping_issues.append(key)
1767 # Search settings
1768 for key in [
1769 "app.questions_per_iteration",
1770 "app.search_engine",
1771 "app.iterations",
1772 "app.max_results",
1773 "app.region",
1774 "app.safe_search",
1775 "app.search_language",
1776 "app.snippets_only",
1777 ]:
1778 setting = (
1779 db_session.query(Setting).filter(Setting.key == key).first()
1780 )
1781 if setting:
1782 # Move to proper category if not already there
1783 proper_key = key.replace("app.", "search.")
1784 existing_proper = (
1785 db_session.query(Setting)
1786 .filter(Setting.key == proper_key)
1787 .first()
1788 )
1790 if not existing_proper:
1791 # Create proper setting
1792 new_setting = Setting(
1793 key=proper_key,
1794 value=setting.value,
1795 type=SettingType.SEARCH,
1796 name=setting.name,
1797 description=setting.description,
1798 category=(
1799 setting.category.replace("app", "search")
1800 if setting.category
1801 else "search_parameters"
1802 ),
1803 ui_element=setting.ui_element,
1804 options=setting.options,
1805 min_value=setting.min_value,
1806 max_value=setting.max_value,
1807 step=setting.step,
1808 visible=setting.visible,
1809 editable=setting.editable,
1810 )
1811 db_session.add(new_setting)
1813 # Delete the app one
1814 db_session.delete(setting)
1815 fixed_scoping_issues.append(key)
1817 # LLM settings
1818 for key in [
1819 "app.model",
1820 "app.provider",
1821 "app.temperature",
1822 "app.max_tokens",
1823 "app.openai_endpoint_url",
1824 "app.lmstudio_url",
1825 "app.llamacpp_model_path",
1826 ]:
1827 setting = (
1828 db_session.query(Setting).filter(Setting.key == key).first()
1829 )
1830 if setting:
1831 # Move to proper category if not already there
1832 proper_key = key.replace("app.", "llm.")
1833 existing_proper = (
1834 db_session.query(Setting)
1835 .filter(Setting.key == proper_key)
1836 .first()
1837 )
1839 if not existing_proper:
1840 # Create proper setting
1841 new_setting = Setting(
1842 key=proper_key,
1843 value=setting.value,
1844 type=SettingType.LLM,
1845 name=setting.name,
1846 description=setting.description,
1847 category=(
1848 setting.category.replace("app", "llm")
1849 if setting.category
1850 else "llm_parameters"
1851 ),
1852 ui_element=setting.ui_element,
1853 options=setting.options,
1854 min_value=setting.min_value,
1855 max_value=setting.max_value,
1856 step=setting.step,
1857 visible=setting.visible,
1858 editable=setting.editable,
1859 )
1860 db_session.add(new_setting)
1862 # Delete the app one
1863 db_session.delete(setting)
1864 fixed_scoping_issues.append(key)
1866 # Check for settings with corrupted values
1867 all_settings = db_session.query(Setting).all()
1868 for setting in all_settings:
1869 # Check different types of corruption
1870 is_corrupted = False
1872 if (
1873 setting.value is None
1874 or (
1875 isinstance(setting.value, str)
1876 and setting.value
1877 in [
1878 "{",
1879 "[",
1880 "{}",
1881 "[]",
1882 "[object Object]",
1883 "null",
1884 "undefined",
1885 ]
1886 )
1887 or (
1888 isinstance(setting.value, dict)
1889 and len(setting.value) == 0
1890 )
1891 ):
1892 is_corrupted = True
1894 # Skip if not corrupted
1895 if not is_corrupted:
1896 continue
1898 # Get default value from migrations
1899 # Import commented out as it's not directly used
1900 # from ..database.migrations import setup_predefined_settings
1902 default_value = None
1904 # Try to find a matching default setting based on key
1905 if setting.key.startswith("llm."):
1906 if setting.key == "llm.model":
1907 default_value = "gpt-3.5-turbo"
1908 elif setting.key == "llm.provider":
1909 default_value = "openai"
1910 elif setting.key == "llm.temperature":
1911 default_value = 0.7
1912 elif setting.key == "llm.max_tokens":
1913 default_value = 1024
1914 elif setting.key.startswith("search."):
1915 if setting.key == "search.tool":
1916 default_value = "auto"
1917 elif setting.key == "search.max_results":
1918 default_value = 10
1919 elif setting.key == "search.region":
1920 default_value = "us"
1921 elif setting.key == "search.questions_per_iteration":
1922 default_value = 3
1923 elif setting.key == "search.searches_per_section":
1924 default_value = 2
1925 elif setting.key == "search.skip_relevance_filter":
1926 default_value = False
1927 elif setting.key == "search.safe_search":
1928 default_value = True
1929 elif setting.key == "search.search_language":
1930 default_value = "English"
1931 elif setting.key.startswith("report."):
1932 if setting.key == "report.searches_per_section":
1933 default_value = 2
1934 elif (
1935 setting.key == "report.enable_fact_checking"
1936 or setting.key == "report.detailed_citations"
1937 ):
1938 default_value = True
1939 elif setting.key.startswith("app."):
1940 if (
1941 setting.key == "app.theme"
1942 or setting.key == "app.default_theme"
1943 ):
1944 default_value = "dark"
1945 elif setting.key == "app.enable_notifications" or (
1946 setting.key == "app.enable_web"
1947 or setting.key == "app.web_interface"
1948 ):
1949 default_value = True
1950 elif setting.key == "app.host":
1951 default_value = "0.0.0.0"
1952 elif setting.key == "app.port":
1953 default_value = 5000
1954 elif setting.key == "app.debug":
1955 default_value = True
1957 # Update the setting with the default value if found
1958 if default_value is not None:
1959 setting.value = default_value
1960 fixed_settings.append(setting.key)
1961 else:
1962 # No default known for this key: reset corrupted report.* settings to an empty object
1963 if setting.key.startswith("report."):
1964 setting.value = {}
1965 fixed_settings.append(setting.key)
1967 # Commit changes
1968 if (
1969 fixed_settings
1970 or removed_duplicate_settings
1971 or fixed_scoping_issues
1972 ):
1973 db_session.commit()
1974 logger.info(
1975 f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}"
1976 )
1977 if removed_duplicate_settings:
1978 logger.info(
1979 f"Removed {len(removed_duplicate_settings)} duplicate settings"
1980 )
1981 if fixed_scoping_issues:
1982 logger.info(
1983 f"Fixed {len(fixed_scoping_issues)} scoping issues"
1984 )
1986 # Return success
1987 return jsonify(
1988 {
1989 "status": "success",
1990 "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates, and fixed {len(fixed_scoping_issues)} scoping issues",
1991 "fixed_settings": fixed_settings,
1992 "removed_duplicates": removed_duplicate_settings,
1993 "fixed_scoping": fixed_scoping_issues,
1994 }
1995 )
1997 except Exception:
1998 logger.exception("Error fixing corrupted settings")
1999 db_session.rollback()
2000 return (
2001 jsonify(
2002 {
2003 "status": "error",
2004 "message": "An internal error occurred while fixing corrupted settings. Please try again later.",
2005 }
2006 ),
2007 500,
2008 )
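# The three "app.* -> report.* / search.* / llm.*" loops in fix_corrupted_settings
# above share an identical shape. A minimal sketch of a consolidating helper,
# assuming the same Setting model and an already-open db_session; the name
# _migrate_app_settings is hypothetical and does not exist in this module.
def _migrate_app_settings(db_session, keys, new_prefix, new_type, default_category):
    """Move legacy app.* settings to their properly scoped keys (sketch only)."""
    moved = []
    for key in keys:
        setting = db_session.query(Setting).filter(Setting.key == key).first()
        if not setting:
            continue
        proper_key = key.replace("app.", new_prefix)
        already_migrated = (
            db_session.query(Setting).filter(Setting.key == proper_key).first()
        )
        if not already_migrated:
            db_session.add(
                Setting(
                    key=proper_key,
                    value=setting.value,
                    type=new_type,
                    name=setting.name,
                    description=setting.description,
                    category=(
                        setting.category.replace("app", new_prefix.rstrip("."))
                        if setting.category
                        else default_category
                    ),
                    ui_element=setting.ui_element,
                    options=setting.options,
                    min_value=setting.min_value,
                    max_value=setting.max_value,
                    step=setting.step,
                    visible=setting.visible,
                    editable=setting.editable,
                )
            )
        db_session.delete(setting)  # drop the legacy app.* row, mirroring the deletes above
        moved.append(key)
    return moved
# Hypothetical usage:
#   fixed_scoping_issues += _migrate_app_settings(
#       db_session, ["app.model", "app.provider"], "llm.", SettingType.LLM, "llm_parameters"
#   )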
2011@settings_bp.route("/api/warnings", methods=["GET"])
2012@login_required
2013def api_get_warnings():
2014 """Get current warnings based on settings"""
2015 try:
2016 warnings = calculate_warnings()
2017 return jsonify({"warnings": warnings})
2018 except Exception:
2019 logger.exception("Error getting warnings")
2020 return jsonify({"error": "Failed to retrieve warnings"}), 500
2023@settings_bp.route("/api/ollama-status", methods=["GET"])
2024@login_required
2025def check_ollama_status():
2026 """Check if Ollama is running and available"""
2027 try:
2028 # Get Ollama URL from settings
2029 raw_base_url = _get_setting_from_session(
2030 "llm.ollama.url", "http://localhost:11434"
2031 )
2032 base_url = (
2033 normalize_url(raw_base_url)
2034 if raw_base_url
2035 else "http://localhost:11434"
2036 )
2038 response = safe_get(
2039 f"{base_url}/api/version",
2040 timeout=2.0,
2041 allow_localhost=True,
2042 allow_private_ips=True,
2043 )
2045 if response.status_code == 200:
2046 return jsonify(
2047 {
2048 "running": True,
2049 "version": response.json().get("version", "unknown"),
2050 }
2051 )
2052 else:
2053 return jsonify(
2054 {
2055 "running": False,
2056 "error": f"Ollama returned status code {response.status_code}",
2057 }
2058 )
2059 except requests.exceptions.RequestException:
2060 logger.exception("Ollama check failed")
2061 return jsonify(
2062 {"running": False, "error": "Failed to check search engine status"}
2063 )
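# A minimal client-side sketch of exercising the endpoint above, assuming the
# blueprint's /settings prefix, an app served at http://localhost:5000, and an
# authenticated requests.Session. Illustrative only; not part of this module.
import requests

def ollama_is_running(session: requests.Session, base: str = "http://localhost:5000") -> bool:
    resp = session.get(f"{base}/settings/api/ollama-status", timeout=5)
    resp.raise_for_status()
    payload = resp.json()
    if not payload.get("running"):
        print("Ollama unavailable:", payload.get("error"))
    return bool(payload.get("running"))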
2066@settings_bp.route("/api/rate-limiting/status", methods=["GET"])
2067@login_required
2068def api_get_rate_limiting_status():
2069 """Get current rate limiting status and statistics"""
2070 try:
2071 from ...web_search_engines.rate_limiting import get_tracker
2073 tracker = get_tracker()
2075 # Get basic status
2076 status = {
2077 "enabled": tracker.enabled,
2078 "exploration_rate": tracker.exploration_rate,
2079 "learning_rate": tracker.learning_rate,
2080 "memory_window": tracker.memory_window,
2081 }
2083 # Get engine statistics
2084 engine_stats = tracker.get_stats()
2085 engines = []
2087 for stat in engine_stats:  # coverage: 2087 ↛ 2088, line 2087 didn't jump to line 2088 because the loop on line 2087 never started
2088 (
2089 engine_type,
2090 base_wait,
2091 min_wait,
2092 max_wait,
2093 last_updated,
2094 total_attempts,
2095 success_rate,
2096 ) = stat
2097 engines.append(
2098 {
2099 "engine_type": engine_type,
2100 "base_wait_seconds": round(base_wait, 2),
2101 "min_wait_seconds": round(min_wait, 2),
2102 "max_wait_seconds": round(max_wait, 2),
2103 "last_updated": last_updated,
2104 "total_attempts": total_attempts,
2105 "success_rate": (
2106 round(success_rate * 100, 1) if success_rate else 0.0
2107 ),
2108 }
2109 )
2111 return jsonify({"status": status, "engines": engines})
2113 except Exception:
2114 logger.exception("Error getting rate limiting status")
2115 return jsonify({"error": "An internal error occurred"}), 500
2118@settings_bp.route(
2119 "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"]
2120)
2121@login_required
2122def api_reset_engine_rate_limiting(engine_type):
2123 """Reset rate limiting data for a specific engine"""
2124 try:
2125 from ...web_search_engines.rate_limiting import get_tracker
2127 tracker = get_tracker()
2128 tracker.reset_engine(engine_type)
2130 return jsonify(
2131 {"message": f"Rate limiting data reset for {engine_type}"}
2132 )
2134 except Exception:
2135 logger.exception(f"Error resetting rate limiting for {engine_type}")
2136 return jsonify({"error": "An internal error occurred"}), 500
2139@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"])
2140@login_required
2141def api_cleanup_rate_limiting():
2142 """Clean up old rate limiting data"""
2143 try:
2144 from ...web_search_engines.rate_limiting import get_tracker
2146 days = request.json.get("days", 30) if request.is_json else 30
2148 tracker = get_tracker()
2149 tracker.cleanup_old_data(days)
2151 return jsonify(
2152 {"message": f"Cleaned up rate limiting data older than {days} days"}
2153 )
2155 except Exception:
2156 logger.exception("Error cleaning up rate limiting data")
2157 return jsonify({"error": "An internal error occurred"}), 500
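# A hedged sketch of driving the three rate-limiting endpoints above from an
# authenticated requests.Session. The base URL, the 60-second drift threshold,
# and the 60-day retention are assumptions, and CSRF token handling is omitted
# for brevity.
import requests

def tune_rate_limiting(session: requests.Session, base: str = "http://localhost:5000") -> None:
    status = session.get(f"{base}/settings/api/rate-limiting/status").json()
    for engine in status.get("engines", []):
        # Reset engines whose learned base wait has drifted unreasonably high.
        if engine["base_wait_seconds"] > 60:
            session.post(
                f"{base}/settings/api/rate-limiting/engines/{engine['engine_type']}/reset"
            )
    # Drop learning data older than 60 days.
    session.post(f"{base}/settings/api/rate-limiting/cleanup", json={"days": 60})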
2160@settings_bp.route("/api/bulk", methods=["GET"])
2161@login_required
2162def get_bulk_settings():
2163 """Get multiple settings at once for performance."""
2164 try:
2165 # Get requested settings from query parameters
2166 requested = request.args.getlist("keys[]")
2167 if not requested:  # coverage: 2167 ↛ 2183, line 2167 didn't jump to line 2183 because the condition on line 2167 was always true
2168 # Default to common settings if none specified
2169 requested = [
2170 "llm.provider",
2171 "llm.model",
2172 "search.tool",
2173 "search.iterations",
2174 "search.questions_per_iteration",
2175 "search.search_strategy",
2176 "benchmark.evaluation.provider",
2177 "benchmark.evaluation.model",
2178 "benchmark.evaluation.temperature",
2179 "benchmark.evaluation.endpoint_url",
2180 ]
2182 # Fetch all settings at once
2183 result = {}
2184 for key in requested:
2185 try:
2186 value = _get_setting_from_session(key)
2187 result[key] = {"value": value, "exists": value is not None}
2188 except Exception as e:
2189 logger.warning(f"Error getting setting {key}: {e}")
2190 result[key] = {
2191 "value": None,
2192 "exists": False,
2193 "error": "Failed to retrieve setting",
2194 }
2196 return jsonify({"success": True, "settings": result})
2198 except Exception:
2199 logger.exception("Error getting bulk settings")
2200 return jsonify(
2201 {"success": False, "error": "An internal error occurred"}
2202 ), 500
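# Sketch of a bulk read from the client side. requests encodes a repeated
# "keys[]" query parameter the way request.args.getlist("keys[]") above
# expects; base URL and session handling are assumptions.
import requests

def fetch_settings(session: requests.Session, keys, base: str = "http://localhost:5000"):
    resp = session.get(f"{base}/settings/api/bulk", params={"keys[]": keys})
    data = resp.json()
    # Each entry looks like {"value": ..., "exists": bool}, plus "error" on failure.
    return {
        key: entry["value"]
        for key, entry in data.get("settings", {}).items()
        if entry.get("exists")
    }
# e.g. fetch_settings(session, ["llm.provider", "llm.model", "search.tool"])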
2205@settings_bp.route("/api/data-location", methods=["GET"])
2206@login_required
2207def api_get_data_location():
2208 """Get information about data storage location and security"""
2209 try:
2210 # Get the data directory path
2211 data_dir = get_data_directory()
2212 # Get the encrypted databases path
2213 encrypted_db_path = get_encrypted_database_path()
2215 # Check if LDR_DATA_DIR environment variable is set
2216 from local_deep_research.settings.manager import SettingsManager
2218 settings_manager = SettingsManager()
2219 custom_data_dir = settings_manager.get_setting("bootstrap.data_dir")
2221 # Get platform-specific default location info
2222 platform_info = {
2223 "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research",
2224 "macOS": "~/Library/Application Support/local-deep-research",
2225 "Linux": "~/.local/share/local-deep-research",
2226 }
2228 # Current platform
2229 current_platform = platform.system()
2230 if current_platform == "Darwin":  # coverage: 2230 ↛ 2231, line 2230 didn't jump to line 2231 because the condition on line 2230 was never true
2231 current_platform = "macOS"
2233 # Get SQLCipher settings from environment
2234 from ...database.sqlcipher_utils import get_sqlcipher_settings
2236 # Debug logging
2237 logger.debug(f"db_manager type: {type(db_manager)}")
2238 logger.debug(
2239 f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}"
2240 )
2242 cipher_settings = (
2243 get_sqlcipher_settings() if db_manager.has_encryption else {}
2244 )
2246 return jsonify(
2247 {
2248 "data_directory": str(data_dir),
2249 "database_path": str(encrypted_db_path),
2250 "encrypted_database_path": str(encrypted_db_path),
2251 "is_custom": custom_data_dir is not None,
2252 "custom_env_var": "LDR_DATA_DIR",
2253 "custom_env_value": custom_data_dir,
2254 "platform": current_platform,
2255 "platform_default": platform_info.get(
2256 current_platform, str(data_dir)
2257 ),
2258 "platform_info": platform_info,
2259 "security_notice": {
2260 "encrypted": db_manager.has_encryption,
2261 "warning": "All data including API keys stored in the database are securely encrypted."
2262 if db_manager.has_encryption
2263 else "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set.",
2264 "recommendation": "Your data is protected with database encryption."
2265 if db_manager.has_encryption
2266 else "Consider using environment variables for sensitive API keys instead of storing them in the database.",
2267 },
2268 "encryption_settings": cipher_settings,
2269 }
2270 )
2272 except Exception:
2273 logger.exception("Error getting data location information")
2274 return jsonify({"error": "Failed to retrieve data location information"}), 500
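# A minimal sketch of how a data-directory override such as LDR_DATA_DIR is
# commonly resolved. The real resolution lives in config.paths.get_data_directory
# and may differ; the platformdirs fallback used here is an assumption.
import os
from pathlib import Path
from platformdirs import user_data_dir

def resolve_data_dir() -> Path:
    override = os.environ.get("LDR_DATA_DIR")
    if override:
        # An explicit override is what the endpoint above reports as a custom location.
        return Path(override).expanduser()
    # Otherwise fall back to the per-platform defaults listed in platform_info.
    return Path(user_data_dir("local-deep-research"))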
2277@settings_bp.route("/api/notifications/test-url", methods=["POST"])
2278@login_required
2279def api_test_notification_url():
2280 """
2281 Test a notification service URL.
2283 This endpoint creates a temporary NotificationService instance to test
2284 the provided URL. No database session or password is required because:
2285 - The service URL is provided directly in the request body
2286 - Test notifications use a temporary Apprise instance
2287 - No user settings or database queries are performed
2289 Security note: Rate limiting is not applied here because users need to
2290 test URLs when configuring notifications. Abuse is mitigated by the
2291 @login_required decorator and the fact that users can only spam their
2292 own notification services.
2293 """
2294 try:
2295 from ...notifications.service import NotificationService
2297 data = request.get_json()
2298 if not data or "service_url" not in data:
2299 return jsonify(
2300 {"success": False, "error": "service_url is required"}
2301 ), 400
2303 service_url = data["service_url"]
2305 # Create notification service instance and test the URL
2306 # No password/session needed - URL provided directly, no DB access
2307 notification_service = NotificationService()
2308 result = notification_service.test_service(service_url)
2310 # Only return expected fields to prevent information leakage
2311 safe_response = {
2312 "success": result.get("success", False),
2313 "message": result.get("message", ""),
2314 "error": result.get("error", ""),
2315 }
2316 return jsonify(safe_response)
2318 except Exception:
2319 logger.exception("Error testing notification URL")
2320 return jsonify(
2321 {
2322 "success": False,
2323 "error": "Failed to test notification service. Check logs for details.",
2324 }
2325 ), 500
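# Client-side sketch of testing a notification URL through the endpoint above.
# The base URL and session handling are assumptions, and CSRF token handling is
# omitted for brevity; the service_url should be an Apprise-compatible URL.
import requests

def test_notification(session: requests.Session, service_url: str,
                      base: str = "http://localhost:5000") -> bool:
    resp = session.post(
        f"{base}/settings/api/notifications/test-url",
        json={"service_url": service_url},
    )
    payload = resp.json()
    if not payload.get("success"):
        print("Notification test failed:", payload.get("error") or payload.get("message"))
    return bool(payload.get("success"))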