Coverage for src / local_deep_research / web / routes / settings_routes.py: 51%
996 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Settings Routes Module
4This module handles all settings-related HTTP endpoints for the application.
6CHECKBOX HANDLING PATTERN:
7--------------------------
8This module supports TWO submission modes to handle checkboxes correctly:
10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)**
11- JavaScript intercepts form submission with e.preventDefault()
12- Checkbox values read directly from DOM via checkbox.checked
13- Data sent as JSON: {"setting.key": true/false}
14- Hidden fallback inputs are managed but NOT used in this mode
15- Provides better UX with instant feedback and validation
17**MODE 2: Traditional POST Submission (Fallback - /save_settings)**
18- Used when JavaScript is disabled (accessibility/no-JS environments)
19- Browser submits form data naturally via request.form
20- Hidden fallback pattern CRITICAL here:
21 * Checked checkbox: Submits checkbox value, hidden input disabled
22 * Unchecked checkbox: Submits hidden input value "false"
23- Ensures unchecked checkboxes are captured (HTML limitation workaround)
25**Implementation Details:**
261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID
272. checkbox_handler.js manages hidden input disabled state
283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240)
294. POST mode: Flask reads request.form including enabled hidden inputs
305. Both modes use convert_setting_value() for consistent boolean conversion
32**Why Both Patterns?**
33- AJAX: Better UX, immediate validation, no page reload
34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation
35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode
37This dual-mode approach ensures the app works for all users while providing
38optimal experience when JavaScript is available.
39"""
41import platform
42from typing import Any, Optional, Tuple
43from datetime import datetime, UTC, timedelta
45import requests
46from flask import (
47 Blueprint,
48 flash,
49 jsonify,
50 redirect,
51 request,
52 session,
53 url_for,
54)
55from flask_wtf.csrf import generate_csrf
56from loguru import logger
58from ...config.paths import get_data_directory, get_encrypted_database_path
59from ...database.models import Setting, SettingType
60from ...database.session_context import get_user_db_session
61from ...database.encrypted_db import db_manager
62from ...utilities.db_utils import get_settings_manager
63from ...utilities.url_utils import normalize_url
64from ..auth.decorators import login_required
65from ..utils.rate_limiter import settings_limit
66from ...settings import SettingsManager
67from ...settings.manager import get_typed_setting_value, parse_boolean
68from ..services.settings_service import (
69 create_or_update_setting,
70 set_setting,
71)
72from ..utils.templates import render_template_with_defaults
73from ..server_config import sync_from_settings
74from ...security import safe_get
# Blueprint that groups every settings route under the /settings URL prefix
settings_bp = Blueprint("settings", __name__, url_prefix="/settings")

# Keys whose dropdown options are populated dynamically; validate_setting()
# skips the allowed-options check for these keys
DYNAMIC_SETTINGS = [
    "llm.provider",
    "llm.model",
    "search.tool",
]
# Security: refuse to modify settings that could enable code execution.
# A key is rejected when it CONTAINS any of these substrings, because such
# keys could be used for dynamic imports or other security-sensitive
# operations.
BLOCKED_SETTING_PATTERNS = [
    "module_path",  # e.g. search.custom_engine.module_path
    "class_name",  # e.g. search.custom_engine.class_name
    "module",  # e.g. custom_module, module_loader
    "class",  # e.g. handler_class, class_loader
]


def is_blocked_setting(key: str) -> bool:
    """
    Tell whether a setting key is protected from modification.

    The comparison is case-insensitive and substring-based: the key is
    blocked as soon as it contains any entry of BLOCKED_SETTING_PATTERNS.

    Args:
        key: The setting key to check (e.g., "search.custom_engine.module_path")

    Returns:
        True if the setting is blocked, False otherwise
    """
    lowered = key.lower()
    return any(fragment in lowered for fragment in BLOCKED_SETTING_PATTERNS)
def get_blocked_settings_error(blocked_keys: list) -> dict:
    """
    Build the error payload returned when protected settings are targeted.

    Args:
        blocked_keys: List of setting keys that were blocked

    Returns:
        Dictionary with "status", "message" and one "errors" entry per
        blocked key (the key is reused as the entry's display name)
    """
    per_key_errors = []
    for blocked_key in blocked_keys:
        per_key_errors.append(
            {
                "key": blocked_key,
                "name": blocked_key,
                "error": "This setting cannot be modified for security reasons",
            }
        )

    return {
        "status": "error",
        "message": "Security violation: Attempted to modify protected settings",
        "errors": per_key_errors,
    }
def _get_setting_from_session(key: str, default=None):
    """Look up one setting for the user stored in the Flask session.

    Args:
        key: The setting key to read
        default: Value returned when no database session is available;
            it is also passed through to the settings manager lookup

    Returns:
        The value returned by the settings manager, or ``default``
    """
    current_user = session.get("username")
    with get_user_db_session(current_user) as user_db:
        if user_db:
            manager = get_settings_manager(user_db, current_user)
            return manager.get_setting(key, default)
    return default
def calculate_warnings():
    """Calculate the currently active warnings from the user's settings.

    Reads the LLM provider, local context window size, model name and the
    two dismissal flags for the user stored in the Flask session, then
    builds warning dictionaries with the keys ``type``, ``icon``, ``title``,
    ``message`` and ``dismissKey``.

    Returns:
        A list of warning dictionaries. The list is empty when no warning
        applies, when no database session is available, or when reading the
        settings fails (the failure is logged, not raised).
    """
    warnings = []

    try:
        username = session.get("username")

        # Read everything through ONE session. Previously a second session
        # was opened for the model-mismatch check, and a missing session led
        # to a NameError that was swallowed by the handler below.
        with get_user_db_session(username) as db_session:
            if not db_session:
                return warnings

            settings_manager = get_settings_manager(db_session, username)

            # "or ''" guards against a stored None, which has no .lower()
            provider = (
                settings_manager.get_setting("llm.provider", "ollama") or ""
            ).lower()
            local_context = settings_manager.get_setting(
                "llm.local_context_window_size", 4096
            )

            # Dismissal flags let the user hide a warning permanently
            dismiss_high_context = settings_manager.get_setting(
                "app.warnings.dismiss_high_context", False
            )
            dismiss_model_mismatch = settings_manager.get_setting(
                "app.warnings.dismiss_model_mismatch", False
            )

            # Simplified check: uses the saved model, not unsaved form state
            current_model = settings_manager.get_setting("llm.model", "")

        logger.debug(f"Starting warning calculation - provider={provider}")

        is_local_provider = provider in [
            "ollama",
            "llamacpp",
            "lmstudio",
            "vllm",
        ]
        high_local_context = is_local_provider and local_context > 8192

        # High context warning for local providers
        if high_local_context and not dismiss_high_context:
            warnings.append(
                {
                    "type": "high_context",
                    "icon": "⚠️",
                    "title": "High Context Warning",
                    "message": f"Context size ({local_context:,} tokens) may cause memory issues with {provider}. Increase VRAM or reduce context size if you experience slowdowns.",
                    "dismissKey": "app.warnings.dismiss_high_context",
                }
            )

        # Model mismatch warning: a "70b" model combined with a high context
        if (
            current_model
            and "70b" in current_model.lower()
            and high_local_context
            and not dismiss_model_mismatch
        ):
            warnings.append(
                {
                    "type": "model_mismatch",
                    "icon": "🧠",
                    "title": "Model & Context Warning",
                    "message": f"Large model ({current_model}) with high context ({local_context:,}) may exceed VRAM. Consider reducing context size or upgrading GPU memory.",
                    "dismissKey": "app.warnings.dismiss_model_mismatch",
                }
            )

    except Exception as e:
        # Warnings are best-effort: never let them break the calling route
        logger.warning(f"Error calculating warnings: {e}")

    return warnings
def validate_setting(
    setting: Setting, value: Any
) -> Tuple[bool, Optional[str]]:
    """
    Check a candidate value against a setting's UI element and constraints.

    The value is first converted with get_typed_setting_value() (environment
    overrides disabled), then checked according to ``setting.ui_element``:
    checkboxes must be booleans, number/slider/range values must be numeric
    and within min/max, and select values must be one of the listed options
    unless the key is in DYNAMIC_SETTINGS.

    Args:
        setting: The Setting object to validate against
        value: The value to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is None on success
    """
    typed_value = get_typed_setting_value(
        key=setting.key,
        value=value,
        ui_element=setting.ui_element,
        default=None,
        check_env=False,
    )
    element = setting.ui_element

    if element == "checkbox":
        # After conversion a checkbox value has to be a real boolean
        if isinstance(typed_value, bool):
            return True, None
        return False, "Value must be a boolean"

    if element in ("number", "slider", "range"):
        if not isinstance(typed_value, (int, float)):
            return False, "Value must be a number"

        # Bounds are optional; only enforce the ones that are defined
        if setting.min_value is not None and typed_value < setting.min_value:
            return False, f"Value must be at least {setting.min_value}"
        if setting.max_value is not None and typed_value > setting.max_value:
            return False, f"Value must be at most {setting.max_value}"
        return True, None

    if (
        element == "select"
        and setting.options
        and setting.key not in DYNAMIC_SETTINGS
    ):
        # Options may be plain values or dicts carrying a "value" entry
        allowed = [
            option.get("value") if isinstance(option, dict) else option
            for option in setting.options
        ]
        if typed_value not in allowed:
            return (
                False,
                f"Value must be one of: {', '.join(str(v) for v in allowed)}",
            )

    # Nothing objected
    return True, None
def coerce_setting_for_write(key: str, value: Any, ui_element: str) -> Any:
    """Convert an incoming value to its proper type before it is stored.

    Every web route that persists a setting is expected to go through this
    function so that type conversion stays consistent.

    The value is deliberately NOT run through ``json.loads`` first:
    - ``get_typed_setting_value`` already parses JSON strings internally
      via ``_parse_json_value`` (for ``ui_element="json"``) and
      ``_parse_multiselect`` (for ``ui_element="multiselect"``).
    - JSON API endpoints receive dicts/lists as native Python objects from
      ``request.get_json()``.
    - For ``ui_element="text"`` pre-parsing would corrupt data: a JSON
      string like ``'{"k": "v"}'`` would become a dict, then ``str()``
      would produce ``"{'k': 'v'}"`` (Python repr, not valid JSON).
    """
    typed_value = get_typed_setting_value(
        key=key,
        value=value,
        ui_element=ui_element,
        default=None,
        # Write path: keep the user-supplied value. With check_env=True
        # (the default) an environment variable override would silently
        # replace what the user submitted.
        check_env=False,
    )
    return typed_value
@settings_bp.route("/", methods=["GET"])
@login_required
def settings_page():
    """Render the settings dashboard that links to the specialized config pages"""
    dashboard_template = "settings_dashboard.html"
    return render_template_with_defaults(dashboard_template)
# Replacement values for well-known keys whose submitted value was corrupted
# (see _is_corrupted_value). Keys not listed here fall back to None.
_SAVE_ALL_CORRUPTED_FALLBACKS = {
    "llm.model": "gpt-3.5-turbo",
    "llm.provider": "openai",
    "search.tool": "auto",
    "app.theme": "dark",
    "app.default_theme": "dark",
}

# Keys for which a bulk save attaches fresh warnings to the response
_SAVE_ALL_WARNING_AFFECTING_KEYS = (
    "llm.provider",
    "search.tool",
    "search.iterations",
    "search.questions_per_iteration",
    "llm.local_context_window_size",
    "llm.context_window_unrestricted",
    "llm.context_window_size",
)


def _classify_new_setting_key(key):
    """Derive (SettingType, category) for a new setting from its key prefix.

    Returns (None, None) when the prefix is not one of the known namespaces.
    """
    if key.startswith("llm."):
        hints = ("temperature", "max_tokens", "batch", "layers")
        if any(hint in key for hint in hints):
            return SettingType.LLM, "llm_parameters"
        return SettingType.LLM, "llm_general"
    if key.startswith("search."):
        hints = ("iterations", "results", "region", "questions", "section")
        if any(hint in key for hint in hints):
            return SettingType.SEARCH, "search_parameters"
        return SettingType.SEARCH, "search_general"
    if key.startswith("report."):
        return SettingType.REPORT, "report_parameters"
    if key.startswith("database."):
        return SettingType.DATABASE, "database_parameters"
    if key.startswith("app."):
        return SettingType.APP, "app_interface"
    return None, None


def _is_corrupted_value(value):
    """Detect placeholder values such as "[object Object]" or a bare "{"."""
    if value == "[object Object]":
        return True
    return isinstance(value, str) and value.strip() in ("{}", "[]", "{", "[")


def _fallback_for_corrupted_value(key):
    """Pick the replacement for a corrupted value of the given key."""
    if key.startswith("report."):
        return {}
    return _SAVE_ALL_CORRUPTED_FALLBACKS.get(key)


def _setting_to_client_dict(setting):
    """Serialize a Setting row into the dict sent back to the client."""
    setting_type = setting.type
    # Enum members are sent as their plain value
    if hasattr(setting_type, "value"):
        setting_type = setting_type.value

    return {
        "key": setting.key,
        "value": setting.value,
        "name": setting.name,
        "description": setting.description,
        "type": setting_type,
        "category": setting.category,
        "ui_element": setting.ui_element,
        "editable": setting.editable,
        "options": setting.options,
    }


@settings_bp.route("/save_all_settings", methods=["POST"])
@login_required
@settings_limit
def save_all_settings():
    """Handle saving all settings at once from the unified settings page.

    Expects a JSON object mapping setting keys to values. Existing settings
    are coerced, validated and updated; unknown keys are created as new
    settings when their prefix identifies a known namespace.

    Returns:
        JSON response:
        - 200 with "updated", "created", "settings" (and "warnings" when a
          warning-affecting key changed) on success
        - 400 when the body is missing, is not a JSON object, or at least
          one key failed validation (settings that validated are still saved)
        - 403 when a protected key is targeted
        - 500 on unexpected errors
    """
    username = session.get("username")

    with get_user_db_session(username) as db_session:
        try:
            form_data = request.get_json()
            if not form_data:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "No settings data provided",
                        }
                    ),
                    400,
                )
            # A JSON list/string/number has no keys to iterate; reject it
            # explicitly instead of failing later with a 500.
            if not isinstance(form_data, dict):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Settings data must be a JSON object",
                        }
                    ),
                    400,
                )

            # Security check: Block any attempts to modify dangerous settings
            blocked_keys = [key for key in form_data if is_blocked_setting(key)]
            if blocked_keys:
                logger.warning(
                    f"Blocked attempt to modify protected settings: {blocked_keys}"
                )
                return jsonify(get_blocked_settings_error(blocked_keys)), 403

            validation_errors = []
            updated_settings = []
            created_settings = []

            # Fetch all settings once to avoid N+1 query problem
            all_db_settings = {
                setting.key: setting
                for setting in db_session.query(Setting).all()
            }

            # Filter out non-editable settings
            non_editable_keys = [
                key
                for key in form_data
                if key in all_db_settings and not all_db_settings[key].editable
            ]
            if non_editable_keys:
                logger.warning(
                    f"Skipping non-editable settings: {non_editable_keys}"
                )
                for key in non_editable_keys:
                    del form_data[key]

            for key, value in form_data.items():
                # Skip corrupted keys or empty strings as keys
                if not key or not isinstance(key, str) or key.strip() == "":
                    continue

                current_setting = all_db_settings.get(key)

                # Convert checkbox values BEFORE the corruption check so a
                # checkbox payload is never mistaken for a corrupted value
                if (
                    current_setting
                    and current_setting.ui_element == "checkbox"
                    and not isinstance(value, bool)
                ):
                    logger.debug(
                        f"Converting checkbox {key} from {type(value).__name__} to bool: {value}"
                    )
                    value = parse_boolean(value)

                if _is_corrupted_value(value):
                    value = _fallback_for_corrupted_value(key)
                    logger.warning(
                        f"Corrected corrupted value for {key}: {value}"
                    )

                if current_setting:
                    # Coerce to correct Python type (e.g. str "5" → int 5
                    # for number settings, str "true" → bool for checkboxes).
                    converted_value = coerce_setting_for_write(
                        key=current_setting.key,
                        value=value,
                        ui_element=current_setting.ui_element,
                    )

                    is_valid, error_message = validate_setting(
                        current_setting, converted_value
                    )
                    if not is_valid:
                        validation_errors.append(
                            {
                                "key": key,
                                "name": current_setting.name,
                                "error": error_message,
                            }
                        )
                        continue

                    # Save the converted setting using the same session
                    if set_setting(key, converted_value, db_session=db_session):
                        updated_settings.append(key)
                    else:
                        # The response is unchanged, but leave a trace
                        logger.warning(f"Failed to save setting {key}")
                    continue

                # Unknown key: create a new setting
                setting_type, category = _classify_new_setting_key(key)
                if setting_type is None:
                    # No namespace means no SettingType to store; report it
                    # for this key instead of aborting the whole request.
                    validation_errors.append(
                        {
                            "key": key,
                            "name": key,
                            "error": "Unknown setting key: new settings must start with llm., search., report., database. or app.",
                        }
                    )
                    continue

                new_setting = {
                    "key": key,
                    "value": value,
                    "type": setting_type.value.lower(),
                    "name": key.split(".")[-1].replace("_", " ").title(),
                    "description": f"Setting for {key}",
                    "category": category,
                    "ui_element": "text",  # Default UI element
                }

                # Pick a better UI element based on the value type; bool is
                # tested first because bool is a subclass of int
                if isinstance(value, bool):
                    new_setting["ui_element"] = "checkbox"
                elif isinstance(value, (int, float)):
                    new_setting["ui_element"] = "number"
                elif isinstance(value, (dict, list)):
                    new_setting["ui_element"] = "textarea"

                db_setting = create_or_update_setting(
                    new_setting, db_session=db_session
                )
                if db_setting:
                    created_settings.append(key)
                else:
                    validation_errors.append(
                        {
                            "key": key,
                            "name": new_setting["name"],
                            "error": "Failed to create setting",
                        }
                    )

            # Report validation errors if any
            if validation_errors:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Validation errors",
                            "errors": validation_errors,
                        }
                    ),
                    400,
                )

            # Return every setting so the client can refresh its state
            all_settings = [
                _setting_to_client_dict(setting)
                for setting in db_session.query(Setting).all()
            ]

            if len(updated_settings) == 1:
                # For a single update, say specifically what changed
                key = updated_settings[0]
                updated_setting = all_db_settings.get(key)
                name = (
                    updated_setting.name
                    if updated_setting
                    else key.split(".")[-1].replace("_", " ").title()
                )
                new_value = updated_setting.value if updated_setting else None

                # Booleans read better as "enabled/disabled"
                if isinstance(new_value, bool):
                    state = "enabled" if new_value else "disabled"
                    success_message = f"{name} {state}"
                else:
                    success_message = f"{name} updated"
            else:
                # Multiple settings or generic message
                success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)"

            response_data = {
                "status": "success",
                "message": success_message,
                "updated": updated_settings,
                "created": created_settings,
                "settings": all_settings,
            }

            # Include warnings when a warning-affecting setting changed
            if any(
                key in _SAVE_ALL_WARNING_AFFECTING_KEYS
                for key in updated_settings + created_settings
            ):
                warnings = calculate_warnings()
                response_data["warnings"] = warnings
                logger.info(
                    f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings"
                )

            return jsonify(response_data)

        except Exception:
            logger.exception("Error saving settings")
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "An internal error occurred while saving settings.",
                    }
                ),
                500,
            )
@settings_bp.route("/reset_to_defaults", methods=["POST"])
@login_required
@settings_limit
def reset_to_defaults():
    """Reset all settings to their default values.

    Loads the defaults through SettingsManager.load_from_defaults_file()
    using the database session of the user stored in the Flask session.

    Returns:
        JSON response with "status": "success" when the defaults were
        loaded, or "status": "error" with HTTP 500 when loading failed.
    """
    username = session.get("username")

    try:
        # One session is enough; a nested second session is not needed
        with get_user_db_session(username) as db_session:
            settings_mgr = SettingsManager(db_session)
            # Import settings from default files
            settings_mgr.load_from_defaults_file()
    except Exception:
        logger.exception("Error importing default settings")
        # Do not claim success when the reset did not happen
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "An internal error occurred while resetting settings.",
                }
            ),
            500,
        )

    logger.info("Successfully imported settings from default files")

    return jsonify(
        {
            "status": "success",
            "message": "All settings have been reset to default values",
        }
    )
@settings_bp.route("/save_settings", methods=["POST"])
@login_required
@settings_limit
def save_settings():
    """Save all settings from the form using POST method - fallback when JavaScript is disabled

    Reads ``request.form``, drops the CSRF token, rejects protected keys,
    skips non-editable settings, stores every remaining value and commits
    once at the end. The outcome is reported through flash messages and the
    user is always redirected back to the settings page.
    """
    try:
        current_user = session.get("username")

        # Submitted form fields, without the CSRF token
        posted = request.form.to_dict()
        posted.pop("csrf_token", None)

        # Security check: Block any attempts to modify dangerous settings
        forbidden = [name for name in posted.keys() if is_blocked_setting(name)]
        if forbidden:
            logger.warning(
                f"Blocked attempt to modify protected settings: {forbidden}"
            )
            flash(
                "Security error: Attempted to modify protected settings. "
                "These settings cannot be modified for security reasons.",
                "error",
            )
            return redirect(url_for("settings.settings_page"))

        with get_user_db_session(current_user) as db_session:
            manager = SettingsManager(db_session)

            saved = 0
            failed = 0

            # One query for all rows instead of one query per posted key
            known = {row.key: row for row in db_session.query(Setting).all()}

            # Drop settings that are marked as not editable
            read_only = [
                name
                for name in posted.keys()
                if name in known and not known[name].editable
            ]
            if read_only:
                logger.warning(f"Skipping non-editable settings: {read_only}")
                for name in read_only:
                    del posted[name]

            for name, raw_value in posted.items():
                try:
                    row = known.get(name)

                    # Form values arrive as strings; convert them for
                    # settings that exist, pass unknown keys through as-is
                    to_store = raw_value
                    if row:
                        to_store = coerce_setting_for_write(
                            key=row.key,
                            value=raw_value,
                            ui_element=row.ui_element,
                        )

                    # commit=False: everything is committed together below
                    if not manager.set_setting(name, to_store, commit=False):
                        failed += 1
                        logger.warning(f"Failed to save setting {name}")
                        continue
                    saved += 1

                except Exception:
                    logger.exception(f"Error saving setting {name}")
                    failed += 1

            # Single commit for the whole form
            try:
                db_session.commit()

                flash(
                    f"Settings saved successfully! Updated {saved} settings.",
                    "success",
                )
                if failed > 0:
                    flash(
                        f"Warning: {failed} settings failed to save.",
                        "warning",
                    )

                # Push the saved settings to the server config
                snapshot = manager.get_settings_snapshot()
                sync_from_settings(snapshot)

            except Exception:
                db_session.rollback()
                logger.exception("Failed to commit settings")
                flash("Error saving settings. Please try again.", "error")

        return redirect(url_for("settings.settings_page"))

    except Exception:
        logger.exception("Error in save_settings")
        flash("An internal error occurred while saving settings.", "error")
        return redirect(url_for("settings.settings_page"))
821# API Routes
@settings_bp.route("/api", methods=["GET"])
@login_required
def api_get_all_settings():
    """Get all settings, optionally restricted to one category.

    Query parameters:
        category: When present, only settings whose ``category`` column
            equals this value are returned.

    Returns:
        JSON ``{"status": "success", "settings": {...}}`` on success, or
        ``{"error": ...}`` with HTTP 500 on failure.
    """
    try:
        category = request.args.get("category")
        username = session.get("username")

        with get_user_db_session(username) as db_session:
            # Manager is bound to the session from this context
            settings_manager = SettingsManager(db_session)
            settings = settings_manager.get_all_settings()

            if category:
                # A set keeps the membership test below O(1) per key
                category_keys = {
                    s.key
                    for s in db_session.query(Setting).all()
                    if s.category == category
                }
                settings = {
                    key: value
                    for key, value in settings.items()
                    if key in category_keys
                }

            return jsonify({"status": "success", "settings": settings})
    except Exception:
        logger.exception("Error getting settings")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["GET"])
@login_required
def api_get_db_setting(key):
    """Get a specific setting by key

    Returns the full setting record as JSON, a 404 JSON error when no row
    has this key, or a 500 JSON error when the lookup fails.
    """
    try:
        current_user = session.get("username")

        with get_user_db_session(current_user) as db_session:
            record = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )

            if not record:
                # Setting not found
                return jsonify({"error": f"Setting not found: {key}"}), 404

            # The type column may hold a plain string or an enum member
            if isinstance(record.type, str):
                type_label = record.type
            else:
                type_label = record.type.value

            payload = {
                "key": record.key,
                "value": record.value,
                "type": type_label,
                "name": record.name,
                "description": record.description,
                "category": record.category,
                "ui_element": record.ui_element,
                "options": record.options,
                "min_value": record.min_value,
                "max_value": record.max_value,
                "step": record.step,
                "visible": record.visible,
                "editable": record.editable,
            }
            return jsonify(payload)
    except Exception:
        logger.exception(f"Error getting setting {key}")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/<path:key>", methods=["PUT"])
@login_required
def api_update_setting(key):
    """Update a setting, or create it when the key does not exist yet.

    Expects a JSON object with a "value" entry. When the key is new, the
    optional metadata entries ("type", "name", "description", "category",
    "ui_element", "options", "min_value", "max_value", "step", "visible",
    "editable") are copied into the created setting.

    Returns:
        JSON response:
        - 200 on update (with "warnings" when the key affects warnings)
        - 201 on creation
        - 400 for a missing/non-object body, a missing value, or a value
          that fails validation
        - 403 for protected or non-editable settings
        - 500 when saving fails or an unexpected error occurs
    """
    try:
        # Security check: Block any attempts to modify dangerous settings
        if is_blocked_setting(key):
            logger.warning(
                f"Blocked attempt to modify protected setting via API: {key}"
            )
            return jsonify(
                {
                    "error": "This setting cannot be modified for security reasons"
                }
            ), 403

        data = request.get_json()
        if not data:
            return jsonify({"error": "No data provided"}), 400
        # A JSON list/string/number has no "value" entry to read; reject it
        # explicitly instead of failing later with a 500.
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        value = data.get("value")
        if value is None:
            return jsonify({"error": "No value provided"}), 400

        username = session.get("username")

        with get_user_db_session(username) as db_session:
            db_setting = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )

            if db_setting:
                if not db_setting.editable:
                    return jsonify(
                        {"error": f"Setting {key} is not editable"}
                    ), 403

                # Coerce to correct Python type before saving.
                # Without this, values from JSON API requests are stored
                # as-is (e.g. string "5" instead of int 5 for number
                # settings, string "true" instead of bool for checkboxes).
                value = coerce_setting_for_write(
                    key=db_setting.key,
                    value=value,
                    ui_element=db_setting.ui_element,
                )

                # Validate the setting (matches save_all_settings pattern)
                is_valid, error_message = validate_setting(db_setting, value)
                if not is_valid:
                    logger.warning(
                        f"Validation failed for setting {key}: {error_message}"
                    )
                    return jsonify(
                        {"error": f"Invalid value for setting {key}"}
                    ), 400

                # Pass the db_session to avoid session lookup issues
                if not set_setting(key, value, db_session=db_session):
                    return jsonify(
                        {"error": f"Failed to update setting {key}"}
                    ), 500

                # Sync server config
                settings_manager = SettingsManager(db_session)
                settings_snapshot = settings_manager.get_settings_snapshot()
                sync_from_settings(settings_snapshot)

                response_data = {
                    "message": f"Setting {key} updated successfully"
                }

                # Keys for which the response carries fresh warnings
                warning_affecting_keys = [
                    "llm.provider",
                    "search.tool",
                    "search.iterations",
                    "search.questions_per_iteration",
                    "llm.local_context_window_size",
                    "llm.context_window_unrestricted",
                    "llm.context_window_size",
                ]

                if key in warning_affecting_keys:
                    warnings = calculate_warnings()
                    response_data["warnings"] = warnings
                    logger.debug(
                        f"Setting {key} changed to {value}, calculated {len(warnings)} warnings"
                    )

                return jsonify(response_data)

            # Create new setting with default metadata
            setting_dict = {
                "key": key,
                "value": value,
                "name": key.split(".")[-1].replace("_", " ").title(),
                "description": f"Setting for {key}",
            }

            # Metadata supplied by the client overrides the defaults above
            for field in [
                "type",
                "name",
                "description",
                "category",
                "ui_element",
                "options",
                "min_value",
                "max_value",
                "step",
                "visible",
                "editable",
            ]:
                if field in data:
                    setting_dict[field] = data[field]

            db_setting = create_or_update_setting(
                setting_dict, db_session=db_session
            )
            if not db_setting:
                return jsonify(
                    {"error": f"Failed to create setting {key}"}
                ), 500

            # Sync server config
            settings_manager = SettingsManager(db_session)
            settings_snapshot = settings_manager.get_settings_snapshot()
            sync_from_settings(settings_snapshot)

            # The type may be a plain string or an enum member; serialize
            # both the same way the GET endpoint does
            created_type = (
                db_setting.type
                if isinstance(db_setting.type, str)
                else db_setting.type.value
            )

            return (
                jsonify(
                    {
                        "message": f"Setting {key} created successfully",
                        "setting": {
                            "key": db_setting.key,
                            "value": db_setting.value,
                            "type": created_type,
                            "name": db_setting.name,
                        },
                    }
                ),
                201,
            )
    except Exception:
        logger.exception(f"Error updating setting {key}")
        return jsonify({"error": "Failed to update setting"}), 500
@settings_bp.route("/api/<path:key>", methods=["DELETE"])
@login_required
def api_delete_setting(key):
    """Remove the setting identified by ``key`` for the logged-in user.

    Responds with 403 when the key is blocked or the stored setting is not
    editable, 404 when no such setting exists, 500 when the deletion fails
    or an unexpected error occurs, and a success message otherwise.
    """
    try:
        # Protected keys are refused before the database is touched.
        if is_blocked_setting(key):
            logger.warning(
                f"Blocked attempt to delete protected setting via API: {key}"
            )
            blocked_payload = {
                "error": "This setting cannot be modified for security reasons"
            }
            return jsonify(blocked_payload), 403

        current_user = session.get("username")
        with get_user_db_session(current_user) as db_session:
            # The manager shares the session opened by the context manager.
            manager = SettingsManager(db_session)

            existing = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )
            if not existing:
                return jsonify({"error": f"Setting not found: {key}"}), 404
            if not existing.editable:
                return jsonify({"error": f"Setting {key} is not editable"}), 403

            if not manager.delete_setting(key):
                return jsonify(
                    {"error": f"Failed to delete setting {key}"}
                ), 500
            return jsonify({"message": f"Setting {key} deleted successfully"})
    except Exception:
        logger.exception(f"Error deleting setting {key}")
        return jsonify({"error": "Failed to delete setting"}), 500
@settings_bp.route("/api/import", methods=["POST"])
@login_required
@settings_limit
def api_import_settings():
    """Load settings from the defaults file into the current user's database.

    Returns a success message, or a 500 JSON error if anything raises.
    """
    try:
        with get_user_db_session(session.get("username")) as db_session:
            manager = SettingsManager(db_session)
            manager.load_from_defaults_file()
        return jsonify({"message": "Settings imported successfully"})
    except Exception:
        logger.exception("Error importing settings")
        return jsonify({"error": "Failed to import settings"}), 500
@settings_bp.route("/api/categories", methods=["GET"])
@login_required
def api_get_categories():
    """Return the distinct, non-null setting categories as JSON.

    The response has the form ``{"categories": [...]}``; any failure is
    logged and answered with a 500 JSON error.
    """
    try:
        with get_user_db_session(session.get("username")) as db_session:
            rows = db_session.query(Setting.category).distinct().all()

            # Each row is a one-column result; skip settings with no category.
            names = []
            for row in rows:
                if row[0] is not None:
                    names.append(row[0])

        return jsonify({"categories": names})
    except Exception:
        logger.exception("Error getting categories")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/types", methods=["GET"])
@login_required
def api_get_types():
    """Return the value of every ``SettingType`` member as ``{"types": [...]}``."""
    try:
        return jsonify({"types": [member.value for member in SettingType]})
    except Exception:
        logger.exception("Error getting types")
        return jsonify({"error": "Failed to retrieve settings"}), 500
@settings_bp.route("/api/ui_elements", methods=["GET"])
@login_required
def api_get_ui_elements():
    """Return the supported UI element type names as ``{"ui_elements": [...]}``."""
    try:
        # Fixed list of element types; the order is part of the response.
        supported = ["text", "select", "checkbox", "slider", "number"]
        supported += ["textarea", "color", "date", "file", "password"]

        return jsonify({"ui_elements": supported})
    except Exception:
        logger.exception("Error getting UI elements")
        return jsonify({"error": "Failed to retrieve settings"}), 500
def _humanize_model_name(raw_name, separators):
    """Turn a raw model identifier into a display name.

    Every character in ``separators`` is replaced by a space and each
    resulting word is capitalized, e.g. ``("llama3:8b", ":/")`` yields
    ``"Llama3 8b"``.
    """
    for separator in separators:
        raw_name = raw_name.replace(separator, " ")
    return " ".join(word.capitalize() for word in raw_name.split())


@settings_bp.route("/api/available-models", methods=["GET"])
@login_required
def api_get_available_models():
    """Return the selectable LLM providers and the models each one offers.

    Query parameters:
        force_refresh: ``"true"`` (case-insensitive) skips the database
            cache, deletes every cached ``ProviderModel`` row and fetches
            fresh data.

    Unless a refresh is forced, ``ProviderModel`` rows updated within the
    last 24 hours are served directly. Otherwise models are fetched from
    Ollama, OpenAI, Anthropic and every auto-discovered provider; a provider
    that fails is logged and contributes an empty list. The fetched models
    are then written back to the cache.

    Returns:
        JSON ``{"provider_options": [...], "providers": {...}}`` where each
        ``providers`` key is ``"<provider>_models"`` and each entry has
        ``value``, ``label`` and ``provider``. On an unexpected failure, a
        500 response ``{"status": "error", "message": ...}``.
    """
    try:
        from flask import request

        from ...database.models import ProviderModel

        # Check if force_refresh is requested
        force_refresh = (
            request.args.get("force_refresh", "false").lower() == "true"
        )

        # Get all auto-discovered providers
        from ...llm.providers import get_discovered_provider_options

        provider_options = get_discovered_provider_options()

        # Add remaining hardcoded providers (complex local providers not yet migrated)
        provider_options.extend(
            [
                {"value": "VLLM", "label": "vLLM (Local)"},
                {"value": "LLAMACPP", "label": "Llama.cpp (Local)"},
            ]
        )

        # Available models by provider
        providers = {}

        # Check database cache first (unless force_refresh is True)
        if not force_refresh:
            try:
                # Define cache expiration (24 hours)
                cache_expiry = datetime.now(UTC) - timedelta(hours=24)

                # Get cached models from database
                username = session.get("username")
                with get_user_db_session(username) as db_session:
                    cached_models = (
                        db_session.query(ProviderModel)
                        .filter(ProviderModel.last_updated > cache_expiry)
                        .all()
                    )

                    if cached_models:
                        logger.info(
                            f"Found {len(cached_models)} cached models in database"
                        )

                        # Group models by provider
                        for model in cached_models:
                            provider_key = f"{model.provider.lower()}_models"
                            providers.setdefault(provider_key, []).append(
                                {
                                    "value": model.model_key,
                                    "label": model.model_label,
                                    "provider": model.provider.upper(),
                                }
                            )

                        # Any fresh cached provider short-circuits the fetch;
                        # providers without cached rows are simply absent
                        # from this response.
                        if providers:
                            logger.info("Returning cached models from database")
                            return jsonify(
                                {
                                    "provider_options": provider_options,
                                    "providers": providers,
                                }
                            )

            except Exception as e:
                logger.warning(
                    f"Error reading cached models from database: {e}"
                )
                # Continue to fetch fresh data

        # Try to get Ollama models
        ollama_models = []
        try:
            import json

            import requests

            # Try to query the Ollama API directly
            try:
                logger.info("Attempting to connect to Ollama API")

                raw_base_url = _get_setting_from_session(
                    "llm.ollama.url", "http://localhost:11434"
                )
                base_url = (
                    normalize_url(raw_base_url)
                    if raw_base_url
                    else "http://localhost:11434"
                )

                ollama_response = safe_get(
                    f"{base_url}/api/tags",
                    timeout=5,
                    allow_localhost=True,
                    allow_private_ips=True,
                )

                logger.debug(
                    f"Ollama API response: Status {ollama_response.status_code}"
                )

                # Log the raw body even for non-200 responses to help with debugging
                response_text = ollama_response.text
                logger.debug(
                    f"Ollama API raw response: {response_text[:500]}..."
                )

                if ollama_response.status_code == 200:
                    try:
                        ollama_data = ollama_response.json()
                        logger.debug(
                            f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..."
                        )

                        if isinstance(ollama_data, dict):
                            # Newer API: {"models": [...]}. A dict without
                            # "models" is treated as "no models" rather
                            # than iterated key by key.
                            raw_models = ollama_data.get("models") or []
                            logger.info(
                                f"Found {len(raw_models)} models in newer Ollama API format"
                            )
                        else:
                            # Older API: the payload itself is the model list
                            raw_models = ollama_data
                            logger.info(
                                f"Found {len(raw_models)} models in older Ollama API format"
                            )

                        for model in raw_models:
                            name = model.get("name", "")
                            if name:
                                display_name = _humanize_model_name(name, ":/")
                                ollama_models.append(
                                    {
                                        # Original model name (used for API calls)
                                        "value": name,
                                        "label": f"{display_name} (Ollama)",
                                        "provider": "OLLAMA",
                                    }
                                )
                                logger.debug(
                                    f"Added Ollama model: {name} -> {display_name}"
                                )

                    except json.JSONDecodeError as json_err:
                        logger.exception(
                            f"Failed to parse Ollama API response as JSON: {json_err}"
                        )
                        raise Exception(
                            f"Ollama API returned invalid JSON: {json_err}"
                        )
                else:
                    logger.warning(
                        f"Ollama API returned non-200 status code: {ollama_response.status_code}"
                    )
                    raise Exception(
                        f"Ollama API returned status code {ollama_response.status_code}"
                    )

            except requests.exceptions.RequestException as e:
                logger.warning(f"Could not connect to Ollama API: {e!s}")
                # No fallback models - just return empty list
                logger.info("Ollama not available - no models to display")
                ollama_models = []

            # Always set ollama_models in providers, even when empty
            providers["ollama_models"] = ollama_models
            logger.info(f"Final Ollama models count: {len(ollama_models)}")

            # Log some model names for debugging
            if ollama_models:
                model_names = [m["value"] for m in ollama_models[:5]]
                logger.info(f"Sample Ollama models: {', '.join(model_names)}")

        except Exception:
            logger.exception("Error getting Ollama models")
            # No fallback models - just return empty list
            logger.info("Error getting Ollama models - no models to display")
            providers["ollama_models"] = []

        # Note: Custom OpenAI Endpoint models are fetched via auto-discovery
        # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider)

        # Get OpenAI models using the OpenAI package
        openai_models = []
        try:
            logger.info(
                "Attempting to connect to OpenAI API using OpenAI package"
            )

            # Get the API key from settings
            api_key = _get_setting_from_session("llm.openai.api_key", "")

            if api_key:
                import openai
                from openai import OpenAI

                # Create OpenAI client
                client = OpenAI(api_key=api_key)

                try:
                    # Fetch models using the client
                    logger.debug("Fetching models from OpenAI API")
                    models_response = client.models.list()

                    # The order returned by the API is kept as-is
                    for model in models_response.data:
                        model_id = model.id
                        if model_id:
                            display_name = _humanize_model_name(model_id, "-")
                            openai_models.append(
                                {
                                    "value": model_id,
                                    "label": f"{display_name} (OpenAI)",
                                    "provider": "OPENAI",
                                }
                            )
                            logger.debug(
                                f"Added OpenAI model: {model_id} -> {display_name}"
                            )

                except openai.APIError as api_err:
                    logger.exception(f"OpenAI API error: {api_err!s}")
                    logger.info("No OpenAI models found due to API error")

            else:
                logger.info(
                    "OpenAI API key not configured, no models available"
                )

        except Exception:
            logger.exception("Error getting OpenAI models")
            logger.info("No OpenAI models available due to error")

        # Always set the openai_models in providers (will be empty array if no models found)
        providers["openai_models"] = openai_models
        logger.info(f"Final OpenAI models count: {len(openai_models)}")

        # Try to get Anthropic models using the Anthropic package
        anthropic_models = []
        try:
            logger.info(
                "Attempting to connect to Anthropic API using Anthropic package"
            )

            # Get the API key from settings
            api_key = _get_setting_from_session("llm.anthropic.api_key", "")

            if api_key:
                # Import Anthropic package here to avoid dependency issues if not installed
                from anthropic import Anthropic

                # Create Anthropic client
                client = Anthropic(api_key=api_key)

                try:
                    # Fetch models using the client
                    logger.debug("Fetching models from Anthropic API")
                    models_response = client.models.list()

                    # Process models from the response
                    for model in models_response.data:
                        model_id = model.id
                        if model_id:
                            display_name = _humanize_model_name(model_id, "-")
                            anthropic_models.append(
                                {
                                    "value": model_id,
                                    "label": f"{display_name} (Anthropic)",
                                    "provider": "ANTHROPIC",
                                }
                            )
                            logger.debug(
                                f"Added Anthropic model: {model_id} -> {display_name}"
                            )

                except Exception as api_err:
                    logger.exception(f"Anthropic API error: {api_err!s}")
            else:
                logger.info("Anthropic API key not configured")

        except ImportError:
            logger.warning(
                "Anthropic package not installed. No models will be available."
            )
        except Exception:
            logger.exception("Error getting Anthropic models")

        # Set anthropic_models in providers (could be empty if API call failed)
        providers["anthropic_models"] = anthropic_models
        logger.info(f"Final Anthropic models count: {len(anthropic_models)}")

        # Fetch models from auto-discovered providers
        from ...llm.providers import discover_providers

        discovered_providers = discover_providers()

        for provider_key, provider_info in discovered_providers.items():
            provider_models = []
            try:
                logger.info(
                    f"Fetching models from {provider_info.provider_name}"
                )

                # Get the provider class
                provider_class = provider_info.provider_class

                # Get API key if configured
                api_key = _get_setting_from_session(
                    provider_class.api_key_setting, ""
                )

                # Get base URL if provider has configurable URL
                base_url = None
                url_setting = getattr(provider_class, "url_setting", None)
                if url_setting:
                    base_url = _get_setting_from_session(url_setting, "")

                # Use the provider's list_models_for_api method
                models = provider_class.list_models_for_api(api_key, base_url)

                # Format models for the API response; the provider's label
                # is used as-is
                for model in models:
                    provider_models.append(
                        {
                            "value": model["value"],
                            "label": model["label"],
                            "provider": provider_key,
                        }
                    )

                logger.info(
                    f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}"
                )

            except Exception:
                logger.exception(
                    f"Error getting {provider_info.provider_name} models"
                )

            # Set models in providers dict using lowercase key
            providers[f"{provider_key.lower()}_models"] = provider_models
            logger.info(
                f"Final {provider_key} models count: {len(provider_models)}"
            )

        # Save fetched models to database cache
        if force_refresh or providers:
            # We fetched fresh data, save it to database
            username = session.get("username")
            with get_user_db_session(username) as db_session:
                try:
                    if force_refresh:
                        # When force refresh, clear ALL cached models to remove any stale data
                        # from old code versions or deleted providers
                        deleted_count = db_session.query(ProviderModel).delete()
                        logger.info(
                            f"Force refresh: cleared all {deleted_count} cached models"
                        )
                    else:
                        # Clear old cache entries only for providers we're updating
                        for provider_key in providers:
                            provider_name = provider_key.replace(
                                "_models", ""
                            ).upper()
                            db_session.query(ProviderModel).filter(
                                ProviderModel.provider == provider_name
                            ).delete()

                    # Insert new models; one timestamp is shared by the batch
                    cached_at = datetime.now(UTC)
                    for provider_key, models in providers.items():
                        provider_name = provider_key.replace(
                            "_models", ""
                        ).upper()
                        for model in models:
                            if (
                                isinstance(model, dict)
                                and "value" in model
                                and "label" in model
                            ):
                                new_model = ProviderModel(
                                    provider=provider_name,
                                    model_key=model["value"],
                                    model_label=model["label"],
                                    last_updated=cached_at,
                                )
                                db_session.add(new_model)

                    db_session.commit()
                    logger.info("Successfully cached models to database")

                except Exception:
                    logger.exception("Error saving models to database cache")
                    db_session.rollback()

        # Return all options
        return jsonify(
            {"provider_options": provider_options, "providers": providers}
        )

    except Exception:
        logger.exception("Error getting available models")
        # This endpoint only reads model lists, so the message says so.
        return jsonify(
            {
                "status": "error",
                "message": "Failed to retrieve available models",
            }
        ), 500
1646def _get_engine_icon_and_category(
1647 engine_data: dict, engine_class=None
1648) -> tuple:
1649 """
1650 Get icon emoji and category label for a search engine based on its attributes.
1652 Args:
1653 engine_data: Engine configuration dictionary
1654 engine_class: Optional loaded engine class to check attributes
1656 Returns:
1657 Tuple of (icon, category) strings
1658 """
1659 # Check attributes from either the class or the engine data
1660 if engine_class:
1661 is_scientific = getattr(engine_class, "is_scientific", False)
1662 is_generic = getattr(engine_class, "is_generic", False)
1663 is_local = getattr(engine_class, "is_local", False)
1664 is_news = getattr(engine_class, "is_news", False)
1665 is_code = getattr(engine_class, "is_code", False)
1666 else:
1667 is_scientific = engine_data.get("is_scientific", False)
1668 is_generic = engine_data.get("is_generic", False)
1669 is_local = engine_data.get("is_local", False)
1670 is_news = engine_data.get("is_news", False)
1671 is_code = engine_data.get("is_code", False)
1673 # Return icon and category based on engine type
1674 # Priority: local > scientific > news > code > generic > default
1675 if is_local:
1676 return "📁", "Local RAG"
1677 elif is_scientific:
1678 return "🔬", "Scientific"
1679 elif is_news:
1680 return "📰", "News"
1681 elif is_code:
1682 return "💻", "Code"
1683 elif is_generic:
1684 return "🌐", "Web Search"
1685 else:
1686 return "🔍", "Search"
@settings_bp.route("/api/available-search-engines", methods=["GET"])
@login_required
def api_get_available_search_engines():
    """List the configured search engines with display metadata.

    The JSON response carries ``engines`` (details keyed by engine id),
    ``engine_options`` (dropdown entries, favorites first and then ordered
    by lower-cased label) and ``favorites`` (the user's favorite ids).
    Any failure is logged and answered with a 500 JSON error.
    """
    try:
        # Same lookup path as search_engines_config.py
        from ...database.session_context import get_user_db_session
        from ...web_search_engines.search_engines_config import search_config

        current_user = session.get("username")
        with get_user_db_session(current_user) as db_session:
            configured = search_config(
                username=current_user, db_session=db_session
            )

            # A stored value that is not a list is treated as "no favorites".
            favorites = SettingsManager(db_session).get_setting(
                "search.favorites", []
            )
            if not isinstance(favorites, list):
                favorites = []

        engines_dict = {}
        engine_options = []

        if configured:
            from ...security.module_whitelist import (
                SecurityError,
                get_safe_module_class,
            )

            for engine_id, engine_data in configured.items():
                # Loading the class is best-effort; metadata falls back to
                # the config dictionary when it cannot be loaded.
                engine_class = None
                try:
                    module_path = engine_data.get("module_path")
                    class_name = engine_data.get("class_name")
                    if module_path and class_name:
                        # Whitelist-validated import
                        engine_class = get_safe_module_class(
                            module_path, class_name
                        )
                except SecurityError as e:
                    logger.warning(
                        f"Security: Blocked unsafe module for {engine_id}: {e}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Could not load engine class for {engine_id}: {e}"
                    )

                icon, category = _get_engine_icon_and_category(
                    engine_data, engine_class
                )
                base_name = engine_data.get("display_name", engine_id)
                label = f"{icon} {base_name} ({category})"

                # Fields shared by the detail entry and the dropdown entry
                shared = {
                    "icon": icon,
                    "category": category,
                    "is_favorite": engine_id in favorites,
                }

                engines_dict[engine_id] = {
                    "display_name": base_name,
                    "description": engine_data.get("description", ""),
                    "strengths": engine_data.get("strengths", []),
                    **shared,
                }
                engine_options.append(
                    {"value": engine_id, "label": label, **shared}
                )

        def _favorites_then_label(option):
            # False sorts before True, so favorites come first.
            return (
                not option.get("is_favorite", False),
                option.get("label", "").lower(),
            )

        engine_options.sort(key=_favorites_then_label)

        # An empty result is logged but still returned as empty collections
        if not engine_options:
            logger.warning("No search engines found in configuration")

        return jsonify(
            {
                "engines": engines_dict,
                "engine_options": engine_options,
                "favorites": favorites,
            }
        )

    except Exception:
        logger.exception("Error getting available search engines")
        return jsonify({"error": "Failed to retrieve search engines"}), 500
@settings_bp.route("/api/search-favorites", methods=["GET"])
@login_required
def api_get_search_favorites():
    """Return the current user's favorite search engines as ``{"favorites": [...]}``.

    A stored value that is not a list is reported as an empty list.
    """
    try:
        with get_user_db_session(session.get("username")) as db_session:
            stored = SettingsManager(db_session).get_setting(
                "search.favorites", []
            )
            favorites = stored if isinstance(stored, list) else []
            return jsonify({"favorites": favorites})

    except Exception:
        logger.exception("Error getting search favorites")
        return jsonify({"error": "Failed to retrieve favorites"}), 500
@settings_bp.route("/api/search-favorites", methods=["PUT"])
@login_required
def api_update_search_favorites():
    """Replace the current user's list of favorite search engines.

    Expects a JSON object body of the form ``{"favorites": [...]}``.

    Returns:
        200 with the stored list on success; 400 when the body is missing,
        is not valid JSON, is not a JSON object, lacks ``favorites`` or
        ``favorites`` is not a list; 500 when the setting cannot be saved
        or an unexpected error occurs.
    """
    try:
        # silent=True returns None for a malformed or non-JSON body instead
        # of raising inside this try block, where the generic handler below
        # would report the client's mistake as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "No data provided"}), 400

        favorites = data.get("favorites")
        if favorites is None:
            return jsonify({"error": "No favorites provided"}), 400

        if not isinstance(favorites, list):
            return jsonify({"error": "Favorites must be a list"}), 400

        username = session.get("username")
        with get_user_db_session(username) as db_session:
            settings_manager = SettingsManager(db_session)
            if settings_manager.set_setting("search.favorites", favorites):
                return jsonify(
                    {
                        "message": "Favorites updated successfully",
                        "favorites": favorites,
                    }
                )
            return jsonify({"error": "Failed to update favorites"}), 500

    except Exception:
        logger.exception("Error updating search favorites")
        return jsonify({"error": "Failed to update favorites"}), 500
@settings_bp.route("/api/search-favorites/toggle", methods=["POST"])
@login_required
def api_toggle_search_favorite():
    """Add a search engine to the user's favorites, or remove it if present.

    Expects a JSON object body of the form ``{"engine_id": "..."}``.

    Returns:
        200 with ``engine_id``, the new ``is_favorite`` state and the
        updated ``favorites`` list; 400 when the body is missing, is not
        valid JSON, is not a JSON object or has no ``engine_id``; 500 when
        the setting cannot be saved or an unexpected error occurs.
    """
    try:
        # silent=True returns None for a malformed or non-JSON body instead
        # of raising inside this try block, where the generic handler below
        # would report the client's mistake as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "No data provided"}), 400

        engine_id = data.get("engine_id")
        if not engine_id:
            return jsonify({"error": "No engine_id provided"}), 400

        username = session.get("username")
        with get_user_db_session(username) as db_session:
            settings_manager = SettingsManager(db_session)

            # Work on a copy so the value returned by the manager is not
            # modified in place; a non-list value counts as no favorites.
            stored = settings_manager.get_setting("search.favorites", [])
            favorites = list(stored) if isinstance(stored, list) else []

            if engine_id in favorites:
                # Drop every occurrence: list.remove() would leave later
                # duplicates behind while the response claims the engine
                # is no longer a favorite.
                favorites = [fav for fav in favorites if fav != engine_id]
                is_favorite = False
            else:
                favorites.append(engine_id)
                is_favorite = True

            # Update the setting
            if settings_manager.set_setting("search.favorites", favorites):
                return jsonify(
                    {
                        "message": "Favorite toggled successfully",
                        "engine_id": engine_id,
                        "is_favorite": is_favorite,
                        "favorites": favorites,
                    }
                )
            return jsonify({"error": "Failed to toggle favorite"}), 500

    except Exception:
        logger.exception("Error toggling search favorite")
        return jsonify({"error": "Failed to toggle favorite"}), 500
# Legacy routes kept for backward compatibility; each one redirects to the
# current settings page.
@settings_bp.route("/main", methods=["GET"])
@login_required
def main_config_page():
    """Legacy ``/main`` URL: redirect to the settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/collections", methods=["GET"])
@login_required
def collections_config_page():
    """Legacy ``/collections`` URL: redirect to the settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/api_keys", methods=["GET"])
@login_required
def api_keys_config_page():
    """Legacy ``/api_keys`` URL: redirect to the settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/search_engines", methods=["GET"])
@login_required
def search_engines_config_page():
    """Legacy ``/search_engines`` URL: redirect to the settings page."""
    target = url_for("settings.settings_page")
    return redirect(target)
@settings_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Refuse to open a configuration file's location.

    Security: the endpoint is disabled for server deployments; it only
    makes sense for desktop usage where server and client share a machine.
    Every request is answered with a 403 JSON error.
    """
    disabled_payload = {
        "status": "error",
        "message": "This feature is disabled. It is only available in desktop mode.",
    }
    return jsonify(disabled_payload), 403
@settings_bp.context_processor
def inject_csrf_token():
    """Expose ``csrf_token`` to templates rendered by the settings routes.

    The value is the ``generate_csrf`` callable itself, not a token string.
    """
    return {"csrf_token": generate_csrf}
1952@settings_bp.route("/fix_corrupted_settings", methods=["POST"])
1953@login_required
1954@settings_limit
def fix_corrupted_settings():
    """Repair the current user's settings table and report what changed.

    Three passes run inside a single database session:

    1. Rows sharing the same key are de-duplicated, keeping the most
       recently updated row.
    2. A fixed list of legacy ``app.*`` keys is moved to the ``report.*``,
       ``search.*`` or ``llm.*`` scope.
    3. Settings whose stored value looks corrupted are reset to a
       hard-coded default.

    Changes are committed only when at least one pass changed something.

    Returns:
        A JSON response with ``status``, ``message`` and the lists
        ``fixed_settings``, ``removed_duplicates`` and ``fixed_scoping``.
        If any step raises, the session is rolled back and a
        ``(response, 500)`` tuple with a generic error message is returned.
    """
    username = session.get("username")

    with get_user_db_session(username) as db_session:
        try:
            # Pass 1: duplicates (the original code attributes these to
            # errors in settings import/export).
            removed_duplicate_settings = _fix_remove_duplicate_settings(
                db_session
            )

            # Pass 2: legacy app.* keys, processed in the order
            # report -> search -> llm.
            fixed_scoping_issues = []
            for new_scope, setting_type, legacy_keys in (
                (
                    "report",
                    SettingType.REPORT,
                    (
                        "app.enable_fact_checking",
                        "app.knowledge_accumulation",
                        "app.knowledge_accumulation_context_limit",
                        "app.output_dir",
                    ),
                ),
                (
                    "search",
                    SettingType.SEARCH,
                    (
                        "app.questions_per_iteration",
                        "app.search_engine",
                        "app.iterations",
                        "app.max_results",
                        "app.region",
                        "app.safe_search",
                        "app.search_language",
                        "app.snippets_only",
                    ),
                ),
                (
                    "llm",
                    SettingType.LLM,
                    (
                        "app.model",
                        "app.provider",
                        "app.temperature",
                        "app.max_tokens",
                        "app.openai_endpoint_url",
                        "app.lmstudio_url",
                        "app.llamacpp_model_path",
                    ),
                ),
            ):
                fixed_scoping_issues.extend(
                    _fix_rescope_settings(
                        db_session, legacy_keys, new_scope, setting_type
                    )
                )

            # Pass 3: corrupted values.
            fixed_settings = _fix_reset_corrupted_values(db_session)

            # Commit changes
            if (
                fixed_settings
                or removed_duplicate_settings
                or fixed_scoping_issues
            ):
                db_session.commit()
                logger.info(
                    f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}"
                )
                if removed_duplicate_settings:
                    logger.info(
                        f"Removed {len(removed_duplicate_settings)} duplicate settings"
                    )
                if fixed_scoping_issues:
                    logger.info(
                        f"Fixed {len(fixed_scoping_issues)} scoping issues"
                    )

            # Return success
            return jsonify(
                {
                    "status": "success",
                    "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates, and fixed {len(fixed_scoping_issues)} scoping issues",
                    "fixed_settings": fixed_settings,
                    "removed_duplicates": removed_duplicate_settings,
                    "fixed_scoping": fixed_scoping_issues,
                }
            )

        except Exception:
            logger.exception("Error fixing corrupted settings")
            db_session.rollback()
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "An internal error occurred while fixing corrupted settings. Please try again later.",
                    }
                ),
                500,
            )


# String values that are treated as corrupted setting values.
_FIX_CORRUPTED_STRINGS = (
    "{",
    "[",
    "{}",
    "[]",
    "[object Object]",
    "null",
    "undefined",
)

# Hard-coded replacement values for corrupted settings, keyed by setting key.
_FIX_SETTING_DEFAULTS = {
    "llm.model": "gpt-3.5-turbo",
    "llm.provider": "openai",
    "llm.temperature": 0.7,
    "llm.max_tokens": 1024,
    "search.tool": "auto",
    "search.max_results": 10,
    "search.region": "us",
    "search.questions_per_iteration": 3,
    "search.searches_per_section": 2,
    "search.skip_relevance_filter": False,
    "search.safe_search": True,
    "search.search_language": "English",
    "report.searches_per_section": 2,
    "report.enable_fact_checking": True,
    "report.detailed_citations": True,
    "app.theme": "dark",
    "app.default_theme": "dark",
    "app.enable_notifications": True,
    "app.enable_web": True,
    "app.web_interface": True,
    "app.host": "0.0.0.0",
    "app.port": 5000,
    "app.debug": True,
}


def _fix_remove_duplicate_settings(db_session):
    """Delete all but the newest row per duplicated key; return removed keys."""
    from sqlalchemy import func as sql_func

    # Find keys that occur more than once.
    duplicate_rows = (
        db_session.query(Setting.key)
        .group_by(Setting.key)
        .having(sql_func.count(Setting.key) > 1)
        .all()
    )
    duplicate_keys = [row[0] for row in duplicate_rows]

    removed = []
    for key in duplicate_keys:
        newest_first = (
            db_session.query(Setting)
            .filter(Setting.key == key)
            .order_by(Setting.updated_at.desc())
            .all()
        )
        # The first row is the most recently updated one and is kept; the
        # key is recorded once per deleted row.
        for stale in newest_first[1:]:
            db_session.delete(stale)
            removed.append(key)
    return removed


def _fix_rescope_settings(db_session, legacy_keys, new_scope, setting_type):
    """Move legacy ``app.*`` settings into *new_scope*; return the moved keys."""
    moved = []
    for key in legacy_keys:
        setting = db_session.query(Setting).filter(Setting.key == key).first()
        if not setting:
            continue

        proper_key = key.replace("app.", f"{new_scope}.")
        existing_proper = (
            db_session.query(Setting)
            .filter(Setting.key == proper_key)
            .first()
        )

        # Only copy the legacy row when the properly scoped key does not
        # exist yet; an existing row wins and the legacy value is dropped.
        if not existing_proper:
            db_session.add(
                Setting(
                    key=proper_key,
                    value=setting.value,
                    type=setting_type,
                    name=setting.name,
                    description=setting.description,
                    category=(
                        setting.category.replace("app", new_scope)
                        if setting.category
                        else f"{new_scope}_parameters"
                    ),
                    ui_element=setting.ui_element,
                    options=setting.options,
                    min_value=setting.min_value,
                    max_value=setting.max_value,
                    step=setting.step,
                    visible=setting.visible,
                    editable=setting.editable,
                )
            )

        # The legacy app.* row is removed in either case.
        db_session.delete(setting)
        moved.append(key)
    return moved


def _fix_is_corrupted_value(value):
    """Return True for None, a known junk string, or an empty dict."""
    if value is None:
        return True
    if isinstance(value, str):
        return value in _FIX_CORRUPTED_STRINGS
    return isinstance(value, dict) and len(value) == 0


def _fix_reset_corrupted_values(db_session):
    """Reset corrupted setting values to defaults; return the affected keys."""
    fixed = []
    for setting in db_session.query(Setting).all():
        if not _fix_is_corrupted_value(setting.value):
            continue

        default_value = _FIX_SETTING_DEFAULTS.get(setting.key)
        if default_value is not None:
            setting.value = default_value
        elif setting.key.startswith("report."):
            # No known default: report.* settings fall back to an empty
            # object. Other scopes are left untouched.
            setting.value = {}
        else:
            continue
        fixed.append(setting.key)
    return fixed
@settings_bp.route("/api/warnings", methods=["GET"])
@login_required
def api_get_warnings():
    """Return the current settings-based warnings as JSON.

    Responds with ``{"warnings": ...}`` built from ``calculate_warnings()``.
    If anything raises, the exception is logged and a 500 response with an
    ``error`` message is returned instead.
    """
    try:
        return jsonify({"warnings": calculate_warnings()})
    except Exception:
        logger.exception("Error getting warnings")
        failure = jsonify({"error": "Failed to retrieve warnings"})
        return failure, 500
@settings_bp.route("/api/ollama-status", methods=["GET"])
@login_required
def check_ollama_status():
    """Check if Ollama is running and available.

    Reads the ``llm.ollama.url`` setting (falling back to
    ``http://localhost:11434``), requests ``<base_url>/api/version`` with a
    2 second timeout and reports the outcome.

    Returns:
        A JSON response. ``{"running": True, "version": ...}`` when the
        server answers with HTTP 200, otherwise ``{"running": False,
        "error": ...}``. Request-level failures are logged and reported the
        same way rather than raising.
    """
    default_url = "http://localhost:11434"
    try:
        # Get Ollama URL from settings; an empty value falls back to the
        # default instead of being normalized.
        raw_base_url = _get_setting_from_session("llm.ollama.url", default_url)
        base_url = normalize_url(raw_base_url) if raw_base_url else default_url

        # Localhost and private addresses are explicitly allowed for this
        # request.
        response = safe_get(
            f"{base_url}/api/version",
            timeout=2.0,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if response.status_code != 200:
            return jsonify(
                {
                    "running": False,
                    "error": f"Ollama returned status code {response.status_code}",
                }
            )

        return jsonify(
            {
                "running": True,
                "version": response.json().get("version", "unknown"),
            }
        )
    except requests.exceptions.RequestException:
        logger.exception("Ollama check failed")
        # The message previously referred to a "search engine", which was
        # misleading for an Ollama-specific endpoint.
        return jsonify(
            {"running": False, "error": "Failed to check Ollama status"}
        )
@settings_bp.route("/api/rate-limiting/status", methods=["GET"])
@login_required
def api_get_rate_limiting_status():
    """Return the rate-limiting tracker configuration and per-engine stats.

    The response has two parts: ``status`` (the tracker's ``enabled``,
    ``exploration_rate``, ``learning_rate`` and ``memory_window``
    attributes) and ``engines`` (one entry per row of
    ``tracker.get_stats()``). Any failure is logged and answered with a
    generic 500 error.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        tracker = get_tracker()

        # Tracker-wide configuration.
        overview = {
            "enabled": tracker.enabled,
            "exploration_rate": tracker.exploration_rate,
            "learning_rate": tracker.learning_rate,
            "memory_window": tracker.memory_window,
        }

        # Each stats row is a 7-item sequence. Wait times are rounded to two
        # decimals and the success rate is reported as a percentage
        # (0.0 when the rate is missing or zero).
        per_engine = [
            {
                "engine_type": engine_type,
                "base_wait_seconds": round(base_wait, 2),
                "min_wait_seconds": round(min_wait, 2),
                "max_wait_seconds": round(max_wait, 2),
                "last_updated": last_updated,
                "total_attempts": total_attempts,
                "success_rate": (
                    0.0 if not success_rate else round(success_rate * 100, 1)
                ),
            }
            for (
                engine_type,
                base_wait,
                min_wait,
                max_wait,
                last_updated,
                total_attempts,
                success_rate,
            ) in tracker.get_stats()
        ]

        return jsonify({"status": overview, "engines": per_engine})

    except Exception:
        logger.exception("Error getting rate limiting status")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route(
    "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"]
)
@login_required
def api_reset_engine_rate_limiting(engine_type):
    """Reset the rate-limiting data recorded for one engine.

    Args:
        engine_type: Engine identifier taken from the URL path and passed
            unchanged to ``tracker.reset_engine``.

    Returns:
        A JSON confirmation message, or a generic 500 error response if the
        reset raises.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        get_tracker().reset_engine(engine_type)

        confirmation = f"Rate limiting data reset for {engine_type}"
        return jsonify({"message": confirmation})

    except Exception:
        logger.exception(f"Error resetting rate limiting for {engine_type}")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"])
@login_required
def api_cleanup_rate_limiting():
    """Clean up old rate limiting data.

    The optional JSON body may contain ``days`` (default 30), which is
    passed to ``tracker.cleanup_old_data``. Requests without a JSON content
    type use the default.

    Returns:
        A JSON confirmation message on success. A 400 response if the JSON
        body is malformed, is not an object, or ``days`` is not a
        non-negative number. A generic 500 response if the cleanup itself
        raises.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        days = 30
        if request.is_json:
            # silent=True turns a malformed body into None so it can be
            # reported as a client error instead of an internal one.
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                return jsonify(
                    {"error": "Request body must be a JSON object"}
                ), 400
            days = payload.get("days", 30)

        # bool is a subclass of int and must be rejected explicitly;
        # "not days >= 0" also rejects NaN.
        if (
            isinstance(days, bool)
            or not isinstance(days, (int, float))
            or not days >= 0
        ):
            return jsonify(
                {"error": "days must be a non-negative number"}
            ), 400

        tracker = get_tracker()
        tracker.cleanup_old_data(days)

        return jsonify(
            {"message": f"Cleaned up rate limiting data older than {days} days"}
        )

    except Exception:
        logger.exception("Error cleaning up rate limiting data")
        return jsonify({"error": "An internal error occurred"}), 500
@settings_bp.route("/api/bulk", methods=["GET"])
@login_required
def get_bulk_settings():
    """Return several settings in a single response.

    Keys are taken from the repeated ``keys[]`` query parameter. When none
    are supplied, a built-in list of commonly used keys is used instead.
    Each key maps to ``{"value": ..., "exists": ...}``. A key whose lookup
    raises is reported with ``exists: False`` and an ``error`` field rather
    than failing the whole request.
    """
    try:
        # Fall back to the common settings if the caller named none.
        keys = request.args.getlist("keys[]") or [
            "llm.provider",
            "llm.model",
            "search.tool",
            "search.iterations",
            "search.questions_per_iteration",
            "search.search_strategy",
            "benchmark.evaluation.provider",
            "benchmark.evaluation.model",
            "benchmark.evaluation.temperature",
            "benchmark.evaluation.endpoint_url",
        ]

        settings = {}
        for key in keys:
            try:
                value = _get_setting_from_session(key)
            except Exception as e:
                logger.warning(f"Error getting setting {key}: {e}")
                settings[key] = {
                    "value": None,
                    "exists": False,
                    "error": "Failed to retrieve setting",
                }
            else:
                # A value of None is reported as "does not exist".
                settings[key] = {"value": value, "exists": value is not None}

        return jsonify({"success": True, "settings": settings})

    except Exception:
        logger.exception("Error getting bulk settings")
        return jsonify(
            {"success": False, "error": "An internal error occurred"}
        ), 500
@settings_bp.route("/api/data-location", methods=["GET"])
@login_required
def api_get_data_location():
    """Get information about data storage location and security.

    Returns:
        A JSON response describing the data directory, the database path,
        whether a custom data directory is configured, the current platform
        with its conventional default location, a security notice that
        depends on ``db_manager.has_encryption``, and the SQLCipher settings
        (empty when encryption is unavailable). Any failure is logged and
        answered with a 500 error.
    """
    try:
        # Get the data directory path
        data_dir = get_data_directory()
        # Get the encrypted databases path
        encrypted_db_path = get_encrypted_database_path()

        # A custom data directory is read from the "bootstrap.data_dir"
        # setting. The response still labels it with the LDR_DATA_DIR
        # environment variable name.
        from local_deep_research.settings.manager import SettingsManager

        settings_manager = SettingsManager()
        custom_data_dir = settings_manager.get_setting("bootstrap.data_dir")

        # Platform-specific default locations, shown to the user as-is.
        platform_info = {
            "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research",
            "macOS": "~/Library/Application Support/local-deep-research",
            "Linux": "~/.local/share/local-deep-research",
        }

        # platform.system() reports macOS as "Darwin"; map it to the key
        # used in platform_info.
        current_platform = platform.system()
        if current_platform == "Darwin":
            current_platform = "macOS"

        # Get SQLCipher settings from environment
        from ...database.sqlcipher_utils import get_sqlcipher_settings

        # Diagnostics only, so they are logged at DEBUG rather than INFO to
        # avoid noise on every request.
        logger.debug(f"db_manager type: {type(db_manager)}")
        logger.debug(
            f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}"
        )

        # Read the flag once so every part of the response agrees.
        has_encryption = db_manager.has_encryption

        cipher_settings = get_sqlcipher_settings() if has_encryption else {}

        if has_encryption:
            warning = "All data including API keys stored in the database are securely encrypted."
            recommendation = "Your data is protected with database encryption."
        else:
            warning = "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set."
            recommendation = "Consider using environment variables for sensitive API keys instead of storing them in the database."

        return jsonify(
            {
                "data_directory": str(data_dir),
                # Both path fields intentionally carry the same value, as in
                # the original response shape.
                "database_path": str(encrypted_db_path),
                "encrypted_database_path": str(encrypted_db_path),
                "is_custom": custom_data_dir is not None,
                "custom_env_var": "LDR_DATA_DIR",
                "custom_env_value": custom_data_dir,
                "platform": current_platform,
                "platform_default": platform_info.get(
                    current_platform, str(data_dir)
                ),
                "platform_info": platform_info,
                "security_notice": {
                    "encrypted": has_encryption,
                    "warning": warning,
                    "recommendation": recommendation,
                },
                "encryption_settings": cipher_settings,
            }
        )

    except Exception:
        logger.exception("Error getting data location information")
        return jsonify({"error": "Failed to retrieve data location"}), 500
@settings_bp.route("/api/notifications/test-url", methods=["POST"])
@login_required
def api_test_notification_url():
    """
    Send a test notification to a service URL supplied by the caller.

    The JSON request body must contain ``service_url``; a 400 response is
    returned otherwise. The URL is handed to a freshly constructed
    ``NotificationService`` via ``test_service``. This handler itself opens
    no database session and reads no stored settings, since everything it
    needs arrives in the request body.

    Only the ``success``, ``message`` and ``error`` fields of the service
    result are returned to the client. Any exception is logged and reported
    as a generic 500 error.

    Security note: no rate limiting is applied here because users need to
    test URLs while configuring notifications. Abuse is limited by
    ``@login_required`` and by the fact that users can only spam their own
    notification services.
    """
    try:
        from ...notifications.service import NotificationService

        payload = request.get_json()
        if not payload or "service_url" not in payload:
            missing_url = {"success": False, "error": "service_url is required"}
            return jsonify(missing_url), 400

        service_url = payload["service_url"]

        # No password or session is needed: the URL comes straight from
        # the request.
        outcome = NotificationService().test_service(service_url)

        # Whitelist the response fields to prevent information leakage.
        exposed_fields = (("success", False), ("message", ""), ("error", ""))
        return jsonify(
            {
                field: outcome.get(field, fallback)
                for field, fallback in exposed_fields
            }
        )

    except Exception:
        logger.exception("Error testing notification URL")
        return jsonify(
            {
                "success": False,
                "error": "Failed to test notification service. Check logs for details.",
            }
        ), 500