Coverage for src/local_deep_research/web/routes/settings_routes.py: 85%

987 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Settings Routes Module 

3 

4This module handles all settings-related HTTP endpoints for the application. 

5 

6CHECKBOX HANDLING PATTERN: 

7-------------------------- 

8This module supports TWO submission modes to handle checkboxes correctly: 

9 

10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)** 

11- JavaScript intercepts form submission with e.preventDefault() 

12- Checkbox values read directly from DOM via checkbox.checked 

13- Data sent as JSON: {"setting.key": true/false} 

14- Hidden fallback inputs are managed but NOT used in this mode 

15- Provides better UX with instant feedback and validation 

16 

17**MODE 2: Traditional POST Submission (Fallback - /save_settings)** 

18- Used when JavaScript is disabled (accessibility/no-JS environments) 

19- Browser submits form data naturally via request.form 

20- Hidden fallback pattern CRITICAL here: 

21 * Checked checkbox: Submits checkbox value, hidden input disabled 

22 * Unchecked checkbox: Submits hidden input value "false" 

23- Ensures unchecked checkboxes are captured (HTML limitation workaround) 

24 

25**Implementation Details:** 

261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID 

272. checkbox_handler.js manages hidden input disabled state 

283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240) 

294. POST mode: Flask reads request.form including enabled hidden inputs 

305. Both modes use convert_setting_value() for consistent boolean conversion 

31 

32**Why Both Patterns?** 

33- AJAX: Better UX, immediate validation, no page reload 

34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation 

35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode 

36 

37This dual-mode approach ensures the app works for all users while providing 

38optimal experience when JavaScript is available. 

39""" 

40 

41import platform 

42import time 

43from typing import Any, Optional, Tuple 

44from datetime import UTC, datetime, timedelta, timezone 

45 

46import requests 

47from flask import ( 

48 Blueprint, 

49 flash, 

50 jsonify, 

51 redirect, 

52 request, 

53 session, 

54 url_for, 

55) 

56from flask_wtf.csrf import generate_csrf 

57from loguru import logger 

58 

59from ...config.constants import DEFAULT_OLLAMA_URL 

60from ...llm.providers.base import normalize_provider 

61from ...config.paths import get_data_directory, get_encrypted_database_path 

62from ...database.models import Setting, SettingType 

63from ...database.session_context import get_user_db_session 

64from ...database.encrypted_db import db_manager 

65from ...utilities.db_utils import get_settings_manager 

66from ...utilities.url_utils import normalize_url 

67from ...security.decorators import require_json_body 

68from ..auth.decorators import login_required 

69from ..utils.request_helpers import parse_bool_arg 

70from ...security.rate_limiter import settings_limit 

71from ...settings.manager import get_typed_setting_value, parse_boolean 

72from ..services.settings_service import ( 

73 create_or_update_setting, 

74 invalidate_settings_caches, 

75 set_setting, 

76) 

77from ..utils.route_decorators import with_user_session 

78from ..utils.templates import render_template_with_defaults 

79 

80 

81from ...security import safe_get 

82from ..warning_checks import calculate_warnings 

83 

84# Create a Blueprint for settings 

85settings_bp = Blueprint("settings", __name__, url_prefix="/settings") 

86 

87# NOTE: Routes use session["username"] (not .get()) intentionally. 

88# @login_required guarantees the key exists; direct access fails fast 

89# if the decorator is ever removed. 

90 

91# Settings with dynamically populated options (excluded from validation) 

92DYNAMIC_SETTINGS = ["llm.provider", "llm.model", "search.tool"] 

93 

94# Namespace validation for new setting creation via the web API. 

95# Keys starting with any ALLOWED prefix may be created; any prefix in 

96# BLOCKED takes precedence and is rejected even if it also matches an 

97# allowed prefix. Existing keys (updates) bypass this check — it only 

98# applies to creation of new DB rows through the three write routes. 

99ALLOWED_SETTING_PREFIXES = frozenset( 

100 { 

101 "app.", 

102 "backup.", 

103 "benchmark.", 

104 "chat.", 

105 "database.", 

106 "document_scheduler.", 

107 "embeddings.", 

108 "focused_iteration.", 

109 "general.", 

110 "langgraph_agent.", 

111 "llm.", 

112 "local_search_", 

113 "mcp.", 

114 "news.", 

115 "notifications.", 

116 "rag.", 

117 "rate_limiting.", 

118 "report.", 

119 "research_library.", 

120 "search.", 

121 "ui.", 

122 "web.", 

123 } 

124) 

125BLOCKED_SETTING_PREFIXES = frozenset( 

126 { 

127 "auth.", 

128 "bootstrap.", 

129 "db_config.", 

130 "security.", 

131 "server.", 

132 "testing.", 

133 } 

134) 

135 

136 

137def _is_allowed_new_setting_key(key: str) -> bool: 

138 """Return True if *key* is permitted to be created via the web API.""" 

139 if not isinstance(key, str) or not key or ".." in key: 

140 return False 

141 key = key.lower() 

142 for prefix in BLOCKED_SETTING_PREFIXES: 

143 if key.startswith(prefix): 

144 return False 

145 for prefix in ALLOWED_SETTING_PREFIXES: 

146 if key.startswith(prefix): 

147 return True 

148 return False 

149 

150 

151def _get_setting_from_session(key: str | None, default=None): 

152 """Helper to get a setting using the current session context. 

153 

154 A ``None`` key returns ``default``. ``SettingsManager.get_setting`` 

155 treats ``key=None`` as "return all settings" (a real feature used by 

156 ``get_all_settings`` for enumeration); this route helper is for 

157 fetching one named setting and must not inherit that bulk-read 

158 semantic. Without the guard, callers like the auto-discovered 

159 model-listing loop would receive a dict of every setting (including 

160 other providers' API keys) when a provider declares 

161 ``api_key_setting = None`` (LM Studio, Llama.cpp). 

162 """ 

163 if key is None: 

164 return default 

165 username = session.get("username") 

166 with get_user_db_session(username) as db_session: 

167 if db_session: 

168 settings_manager = get_settings_manager(db_session, username) 

169 return settings_manager.get_setting(key, default) 

170 return default 

171 

172 

173def validate_setting( 

174 setting: Setting, value: Any 

175) -> Tuple[bool, Optional[str]]: 

176 """ 

177 Validate a setting value based on its type and constraints. 

178 

179 Args: 

180 setting: The Setting object to validate against 

181 value: The value to validate 

182 

183 Returns: 

184 Tuple of (is_valid, error_message) 

185 """ 

186 # Convert value to appropriate type first using SettingsManager's logic 

187 value = get_typed_setting_value( 

188 key=str(setting.key), 

189 value=value, 

190 ui_element=str(setting.ui_element), 

191 default=None, 

192 check_env=False, 

193 ) 

194 

195 # Validate based on UI element type 

196 if setting.ui_element == "checkbox": 

197 # After conversion, should be boolean 

198 if not isinstance(value, bool): 

199 return False, "Value must be a boolean" 

200 

201 elif setting.ui_element in ("number", "slider", "range"): 

202 # After conversion, should be numeric 

203 if not isinstance(value, (int, float)): 

204 return False, "Value must be a number" 

205 

206 # Check min/max constraints if defined 

207 if setting.min_value is not None and value < setting.min_value: 

208 return False, f"Value must be at least {setting.min_value}" 

209 if setting.max_value is not None and value > setting.max_value: 

210 return False, f"Value must be at most {setting.max_value}" 

211 

212 elif setting.ui_element == "select": 

213 # Check if value is in the allowed options 

214 if setting.options: 

215 # Skip options validation for dynamically populated dropdowns 

216 if setting.key not in DYNAMIC_SETTINGS: 

217 allowed_values = [ 

218 opt.get("value") if isinstance(opt, dict) else opt 

219 for opt in list(setting.options) # type: ignore[arg-type] 

220 ] 

221 if value not in allowed_values: 

222 return ( 

223 False, 

224 f"Value must be one of: {', '.join(str(v) for v in allowed_values)}", 

225 ) 

226 

227 # All checks passed 

228 return True, None 

229 

230 

231def coerce_setting_for_write(key: str, value: Any, ui_element: str) -> Any: 

232 """Coerce an incoming value to the correct type before writing to the DB. 

233 

234 All web routes that save settings should use this function to ensure 

235 consistent type conversion. 

236 

237 No JSON pre-parsing (``json.loads``) is needed here because: 

238 - ``get_typed_setting_value`` already parses JSON strings internally 

239 via ``_parse_json_value`` (for ``ui_element="json"``) and 

240 ``_parse_multiselect`` (for ``ui_element="multiselect"``). 

241 - For JSON API endpoints, ``request.get_json()`` already delivers 

242 dicts/lists as native Python objects. 

243 - For ``ui_element="text"``, pre-parsing would corrupt data: a JSON 

244 string like ``'{"k": "v"}'`` would become a dict, then ``str()`` 

245 would produce ``"{'k': 'v'}"`` (Python repr, not valid JSON). 

246 """ 

247 # check_env=False: we are persisting a user-supplied value, not reading 

248 # from an environment variable override. check_env=True (the default) 

249 # would silently replace the user's value with an env var, which is 

250 # incorrect on the write path. 

251 return get_typed_setting_value( 

252 key=key, 

253 value=value, 

254 ui_element=ui_element, 

255 default=None, 

256 check_env=False, 

257 ) 

258 

259 

260@settings_bp.route("/", methods=["GET"]) 

261@login_required 

262def settings_page(): 

263 """Main settings dashboard with links to specialized config pages""" 

264 return render_template_with_defaults("settings_dashboard.html") 

265 

266 

267@settings_bp.route("/save_all_settings", methods=["POST"]) 

268@login_required 

269@settings_limit 

270@require_json_body( 

271 error_format="status", error_message="No settings data provided" 

272) 

273@with_user_session() 

274def save_all_settings(db_session=None, settings_manager=None): 

275 """Handle saving all settings at once from the unified settings page""" 

276 try: 

277 # Process JSON data 

278 form_data = request.get_json() 

279 if not form_data: 

280 return ( 

281 jsonify( 

282 { 

283 "status": "error", 

284 "message": "No settings data provided", 

285 } 

286 ), 

287 400, 

288 ) 

289 

290 # Track validation errors 

291 validation_errors = [] 

292 settings_by_type: dict[str, Any] = {} 

293 

294 # Track changes for logging 

295 updated_settings = [] 

296 created_settings = [] 

297 

298 # Store original values for better messaging 

299 original_values = {} 

300 

301 # Fetch all settings once to avoid N+1 query problem 

302 all_db_settings = { 

303 setting.key: setting for setting in db_session.query(Setting).all() 

304 } 

305 

306 # Filter out non-editable settings 

307 non_editable_keys = [ 

308 key 

309 for key in form_data.keys() 

310 if key in all_db_settings and not all_db_settings[key].editable 

311 ] 

312 if non_editable_keys: 

313 logger.warning( 

314 f"Skipping non-editable settings: {non_editable_keys}" 

315 ) 

316 for key in non_editable_keys: 

317 del form_data[key] 

318 

319 # Update each setting 

320 for key, value in form_data.items(): 

321 # Skip corrupted keys or empty strings as keys 

322 if not key or not isinstance(key, str) or key.strip() == "": 

323 continue 

324 

325 # Get the setting metadata from pre-fetched dict 

326 current_setting = all_db_settings.get(key) 

327 

328 # EARLY VALIDATION: Convert checkbox values BEFORE any other processing 

329 # This prevents incorrect triggering of corrupted value detection 

330 if current_setting and current_setting.ui_element == "checkbox": 

331 if not isinstance(value, bool): 

332 logger.debug( 

333 f"Converting checkbox {key} from {type(value).__name__} to bool: {value}" 

334 ) 

335 value = parse_boolean(value) 

336 form_data[key] = ( 

337 value # Update the form_data with converted value 

338 ) 

339 

340 # Store original value for messaging 

341 if current_setting: 

342 original_values[key] = current_setting.value 

343 

344 # Determine setting type and category 

345 if key.startswith("llm."): 

346 setting_type = SettingType.LLM 

347 category = "llm_general" 

348 if ( 

349 "temperature" in key 

350 or "max_tokens" in key 

351 or "batch" in key 

352 or "layers" in key 

353 ): 

354 category = "llm_parameters" 

355 elif key.startswith("search."): 

356 setting_type = SettingType.SEARCH 

357 category = "search_general" 

358 if ( 

359 "iterations" in key 

360 or "results" in key 

361 or "region" in key 

362 or "questions" in key 

363 or "section" in key 

364 ): 

365 category = "search_parameters" 

366 elif key.startswith("report."): 

367 setting_type = SettingType.REPORT 

368 category = "report_parameters" 

369 elif key.startswith("database."): 

370 setting_type = SettingType.DATABASE 

371 category = "database_parameters" 

372 elif key.startswith("app."): 

373 setting_type = SettingType.APP 

374 category = "app_interface" 

375 elif key.startswith("chat."): 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true

376 setting_type = SettingType.CHAT 

377 category = "chat" 

378 else: 

379 setting_type = None 

380 category = None 

381 

382 # Special handling for corrupted or empty values 

383 if value == "[object Object]" or ( 

384 isinstance(value, str) 

385 and value.strip() in ["{}", "[]", "{", "["] 

386 ): 

387 if key.startswith("report."): 

388 value = {} 

389 else: 

390 # Use default or null for other types 

391 if key == "llm.model": 

392 value = "" 

393 elif key == "llm.provider": 

394 value = "ollama" 

395 elif key == "search.tool": 

396 value = "auto" 

397 elif key in ["app.theme", "app.default_theme"]: 

398 value = "dark" 

399 else: 

400 value = None 

401 

402 logger.warning(f"Corrected corrupted value for {key}: {value}") 

403 # NOTE: No JSON pre-parsing is done here. After the 

404 # corruption replacement above, values are Python dicts 

405 # (e.g. {}), hardcoded strings, or None — none are JSON 

406 # strings that need parsing. Type conversion below via 

407 # coerce_setting_for_write() handles everything; that 

408 # function delegates to get_typed_setting_value() which 

409 # already parses JSON internally for "json" and 

410 # "multiselect" ui_elements. 

411 

412 if current_setting: 

413 # Coerce to correct Python type (e.g. str "5" → int 5 

414 # for number settings, str "true" → bool for checkboxes). 

415 converted_value = coerce_setting_for_write( 

416 key=current_setting.key, 

417 value=value, 

418 ui_element=current_setting.ui_element, 

419 ) 

420 

421 # Validate the setting 

422 is_valid, error_message = validate_setting( 

423 current_setting, converted_value 

424 ) 

425 

426 if is_valid: 

427 # Save the converted setting using the same session 

428 success = set_setting( 

429 key, converted_value, db_session=db_session 

430 ) 

431 if success: 431 ↛ 435line 431 didn't jump to line 435 because the condition on line 431 was always true

432 updated_settings.append(key) 

433 

434 # Track settings by type for exporting 

435 if current_setting.type not in settings_by_type: 

436 settings_by_type[current_setting.type] = [] 

437 settings_by_type[current_setting.type].append( 

438 current_setting 

439 ) 

440 else: 

441 # Add to validation errors 

442 validation_errors.append( 

443 { 

444 "key": key, 

445 "name": current_setting.name, 

446 "error": error_message, 

447 } 

448 ) 

449 else: 

450 # Namespace validation: reject new keys outside allowed prefixes. 

451 if not _is_allowed_new_setting_key(key): 

452 logger.warning( 

453 "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})", 

454 key, 

455 session["username"], 

456 ) 

457 validation_errors.append( 

458 { 

459 "key": key, 

460 "name": key, 

461 "error": "Creating settings under this namespace is not allowed.", 

462 } 

463 ) 

464 continue 

465 

466 # Create a new setting 

467 new_setting = { 

468 "key": key, 

469 "value": value, 

470 "type": setting_type.value.lower() 

471 if setting_type is not None 

472 else "app", 

473 "name": key.split(".")[-1].replace("_", " ").title(), 

474 "description": f"Setting for {key}", 

475 "category": category, 

476 "ui_element": "text", # Default UI element 

477 } 

478 

479 # Determine better UI element based on value type 

480 if isinstance(value, bool): 

481 new_setting["ui_element"] = "checkbox" 

482 elif isinstance(value, (int, float)) and not isinstance( 

483 value, bool 

484 ): 

485 new_setting["ui_element"] = "number" 

486 elif isinstance(value, (dict, list)): 

487 new_setting["ui_element"] = "textarea" 

488 

489 # Create the setting 

490 db_setting = create_or_update_setting( 

491 new_setting, db_session=db_session 

492 ) 

493 

494 if db_setting: 

495 created_settings.append(key) 

496 # Track settings by type for exporting 

497 if db_setting.type not in settings_by_type: 497 ↛ 499line 497 didn't jump to line 499 because the condition on line 497 was always true

498 settings_by_type[db_setting.type] = [] 

499 settings_by_type[db_setting.type].append(db_setting) 

500 else: 

501 validation_errors.append( 

502 { 

503 "key": key, 

504 "name": new_setting["name"], 

505 "error": "Failed to create setting", 

506 } 

507 ) 

508 

509 # Report validation errors if any 

510 if validation_errors: 

511 return ( 

512 jsonify( 

513 { 

514 "status": "error", 

515 "message": "Validation errors", 

516 "errors": validation_errors, 

517 } 

518 ), 

519 400, 

520 ) 

521 

522 # Get all settings to return to the client for proper state update 

523 all_settings = {} 

524 for setting in db_session.query(Setting).all(): 

525 # Convert enum to string if present 

526 setting_type = setting.type 

527 if hasattr(setting_type, "value"): 

528 setting_type = setting_type.value 

529 

530 all_settings[setting.key] = { 

531 "value": setting.value, 

532 "name": setting.name, 

533 "description": setting.description, 

534 "type": setting_type, 

535 "category": setting.category, 

536 "ui_element": setting.ui_element, 

537 "editable": setting.editable, 

538 "options": setting.options, 

539 "visible": setting.visible, 

540 "min_value": setting.min_value, 

541 "max_value": setting.max_value, 

542 "step": setting.step, 

543 } 

544 

545 # Customize the success message based on what changed 

546 success_message = "" 

547 if len(updated_settings) == 1: 

548 # For a single update, provide more specific info about what changed 

549 key = updated_settings[0] 

550 # Reuse the already-fetched setting from our pre-fetched dict 

551 updated_setting = all_db_settings.get(key) 

552 name = ( 

553 updated_setting.name 

554 if updated_setting 

555 else key.split(".")[-1].replace("_", " ").title() 

556 ) 

557 

558 # Format the message 

559 if key in original_values: 559 ↛ 573line 559 didn't jump to line 573 because the condition on line 559 was always true

560 new_value = updated_setting.value if updated_setting else None 

561 

562 # If it's a boolean, use "enabled/disabled" language 

563 if isinstance(new_value, bool): 

564 state = "enabled" if new_value else "disabled" 

565 success_message = f"{name} {state}" 

566 else: 

567 # For non-boolean values 

568 if isinstance(new_value, (dict, list)): 

569 success_message = f"{name} updated" 

570 else: 

571 success_message = f"{name} updated" 

572 else: 

573 success_message = f"{name} updated" 

574 else: 

575 # Multiple settings or generic message 

576 success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)" 

577 

578 # Check if any warning-affecting settings were changed and include warnings 

579 response_data = { 

580 "status": "success", 

581 "message": success_message, 

582 "updated": updated_settings, 

583 "created": created_settings, 

584 "settings": all_settings, 

585 } 

586 

587 warning_affecting_keys = [ 

588 "llm.provider", 

589 "search.tool", 

590 "search.iterations", 

591 "search.questions_per_iteration", 

592 "llm.local_context_window_size", 

593 "llm.context_window_unrestricted", 

594 "llm.context_window_size", 

595 ] 

596 

597 # Check if any warning-affecting settings were changed 

598 if any( 

599 key in warning_affecting_keys 

600 for key in updated_settings + created_settings 

601 ): 

602 warnings = calculate_warnings() 

603 response_data["warnings"] = warnings 

604 logger.info( 

605 f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings" 

606 ) 

607 

608 invalidate_settings_caches(session["username"]) 

609 return jsonify(response_data) 

610 

611 except Exception: 

612 logger.exception("Error saving settings") 

613 return ( 

614 jsonify( 

615 { 

616 "status": "error", 

617 "message": "An internal error occurred while saving settings.", 

618 } 

619 ), 

620 500, 

621 ) 

622 

623 

624@settings_bp.route("/reset_to_defaults", methods=["POST"]) 

625@login_required 

626@settings_limit 

627@with_user_session() 

628def reset_to_defaults(db_session=None, settings_manager=None): 

629 """Reset all settings to their default values""" 

630 try: 

631 settings_manager.load_from_defaults_file() 

632 

633 logger.info("Successfully imported settings from default files") 

634 

635 except Exception: 

636 logger.exception("Error importing default settings") 

637 return jsonify( 

638 { 

639 "status": "error", 

640 "message": "Failed to reset settings to defaults", 

641 } 

642 ), 500 

643 

644 invalidate_settings_caches(session["username"]) 

645 return jsonify( 

646 { 

647 "status": "success", 

648 "message": "All settings have been reset to default values", 

649 } 

650 ) 

651 

652 

653@settings_bp.route("/save_settings", methods=["POST"]) 

654@login_required 

655@settings_limit 

656@with_user_session() 

657def save_settings(db_session=None, settings_manager=None): 

658 """Save all settings from the form using POST method - fallback when JavaScript is disabled""" 

659 try: 

660 # Get form data 

661 form_data = request.form.to_dict() 

662 

663 # Remove CSRF token from the data 

664 form_data.pop("csrf_token", None) 

665 

666 updated_count = 0 

667 failed_count = 0 

668 rejected_count = 0 

669 

670 # Fetch all settings once to avoid N+1 query problem 

671 all_db_settings = { 

672 setting.key: setting for setting in db_session.query(Setting).all() 

673 } 

674 

675 # Filter out non-editable settings 

676 non_editable_keys = [ 

677 key 

678 for key in form_data.keys() 

679 if key in all_db_settings and not all_db_settings[key].editable 

680 ] 

681 if non_editable_keys: 

682 logger.warning( 

683 f"Skipping non-editable settings: {non_editable_keys}" 

684 ) 

685 for key in non_editable_keys: 

686 del form_data[key] 

687 

688 # Process each setting 

689 for key, value in form_data.items(): 

690 try: 

691 # Get the setting from pre-fetched dict 

692 db_setting = all_db_settings.get(key) 

693 

694 # Namespace validation: reject new keys outside allowed prefixes. 

695 # Existing keys (updates) bypass this check — it only applies 

696 # to creation of brand-new rows through this form-POST route. 

697 if db_setting is None and not _is_allowed_new_setting_key(key): 

698 logger.warning( 

699 "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})", 

700 key, 

701 session["username"], 

702 ) 

703 rejected_count += 1 

704 continue 

705 

706 # Coerce form POST string to correct Python type. 

707 if db_setting: 

708 value = coerce_setting_for_write( 

709 key=db_setting.key, 

710 value=value, 

711 ui_element=db_setting.ui_element, 

712 ) 

713 

714 # Save the setting 

715 if settings_manager.set_setting(key, value, commit=False): 

716 updated_count += 1 

717 else: 

718 failed_count += 1 

719 logger.warning(f"Failed to save setting {key}") 

720 

721 except Exception: 

722 logger.exception(f"Error saving setting {key}") 

723 failed_count += 1 

724 

725 # Commit all changes at once 

726 try: 

727 db_session.commit() 

728 

729 flash( 

730 f"Settings saved successfully! Updated {updated_count} settings.", 

731 "success", 

732 ) 

733 if failed_count > 0: 

734 flash( 

735 f"Warning: {failed_count} settings failed to save.", 

736 "warning", 

737 ) 

738 if rejected_count > 0: 

739 flash( 

740 f"Rejected {rejected_count} settings (unknown namespace). " 

741 "This may indicate a bug or an attempted injection.", 

742 "error", 

743 ) 

744 invalidate_settings_caches(session["username"]) 

745 

746 except Exception: 

747 db_session.rollback() 

748 logger.exception("Failed to commit settings") 

749 flash("Error saving settings. Please try again.", "error") 

750 

751 return redirect(url_for("settings.settings_page")) 

752 

753 except Exception: 

754 logger.exception("Error in save_settings") 

755 flash("An internal error occurred while saving settings.", "error") 

756 return redirect(url_for("settings.settings_page")) 

757 

758 

759# API Routes 

760@settings_bp.route("/api", methods=["GET"]) 

761@login_required 

762@with_user_session() 

763def api_get_all_settings(db_session=None, settings_manager=None): 

764 """Get all settings""" 

765 try: 

766 # Get query parameters 

767 category = request.args.get("category") 

768 

769 # Get settings 

770 settings = settings_manager.get_all_settings() 

771 

772 # Filter by category if requested 

773 if category: 

774 # Need to get all setting details to check category 

775 db_settings = db_session.query(Setting).all() 

776 category_keys = [ 

777 s.key for s in db_settings if s.category == category 

778 ] 

779 

780 # Filter settings by keys 

781 settings = { 

782 key: value 

783 for key, value in settings.items() 

784 if key in category_keys 

785 } 

786 

787 return jsonify({"status": "success", "settings": settings}) 

788 except Exception: 

789 logger.exception("Error getting settings") 

790 return jsonify({"error": "Failed to retrieve settings"}), 500 

791 

792 

793@settings_bp.route("/api/<path:key>", methods=["GET"]) 

794@login_required 

795@with_user_session() 

796def api_get_db_setting(key, db_session=None, settings_manager=None): 

797 """Get a specific setting by key from DB, falling back to defaults.""" 

798 try: 

799 # Get setting from database using the same session 

800 db_setting = ( 

801 db_session.query(Setting).filter(Setting.key == key).first() 

802 ) 

803 

804 if db_setting: 

805 # Return full setting details from DB 

806 setting_data = { 

807 "key": db_setting.key, 

808 "value": db_setting.value, 

809 "type": db_setting.type 

810 if isinstance(db_setting.type, str) 

811 else db_setting.type.value, 

812 "name": db_setting.name, 

813 "description": db_setting.description, 

814 "category": db_setting.category, 

815 "ui_element": db_setting.ui_element, 

816 "options": db_setting.options, 

817 "min_value": db_setting.min_value, 

818 "max_value": db_setting.max_value, 

819 "step": db_setting.step, 

820 "visible": db_setting.visible, 

821 "editable": db_setting.editable, 

822 } 

823 return jsonify(setting_data) 

824 

825 # Not in DB — check defaults so this endpoint is consistent 

826 # with GET /settings/api which includes default settings 

827 default_meta = settings_manager.default_settings.get(key) 

828 if default_meta: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true

829 setting_data = { 

830 "key": key, 

831 "value": default_meta.get("value"), 

832 "type": default_meta.get("type", "APP"), 

833 "name": default_meta.get("name", key), 

834 "description": default_meta.get("description"), 

835 "category": default_meta.get("category"), 

836 "ui_element": default_meta.get("ui_element", "text"), 

837 "options": default_meta.get("options"), 

838 "min_value": default_meta.get("min_value"), 

839 "max_value": default_meta.get("max_value"), 

840 "step": default_meta.get("step"), 

841 "visible": default_meta.get("visible", True), 

842 "editable": default_meta.get("editable", True), 

843 } 

844 return jsonify(setting_data) 

845 

846 return jsonify({"error": f"Setting not found: {key}"}), 404 

847 except Exception: 

848 logger.exception(f"Error getting setting {key}") 

849 return jsonify({"error": "Failed to retrieve settings"}), 500 

850 

851 

852@settings_bp.route("/api/<path:key>", methods=["PUT"]) 

853@login_required 

854@settings_limit 

855@require_json_body(error_message="No data provided") 

856@with_user_session(include_settings_manager=False) 

857def api_update_setting(key, db_session=None): 

858 """Update a setting""" 

859 try: 

860 # Get request data 

861 data = request.get_json() 

862 value = data.get("value") 

863 if value is None: 

864 return jsonify({"error": "No value provided"}), 400 

865 

866 # Check if setting exists 

867 db_setting = ( 

868 db_session.query(Setting).filter(Setting.key == key).first() 

869 ) 

870 

871 if db_setting: 

872 # Check if setting is editable 

873 if not db_setting.editable: 

874 return jsonify({"error": f"Setting {key} is not editable"}), 403 

875 

876 # Coerce to correct Python type before saving. 

877 # Without this, values from JSON API requests are stored 

878 # as-is (e.g. string "5" instead of int 5 for number 

879 # settings, string "true" instead of bool for checkboxes). 

880 value = coerce_setting_for_write( 

881 key=db_setting.key, 

882 value=value, 

883 ui_element=db_setting.ui_element, 

884 ) 

885 

886 # Validate the setting (matches save_all_settings pattern) 

887 is_valid, error_message = validate_setting(db_setting, value) 

888 if not is_valid: 

889 logger.warning( 

890 f"Validation failed for setting {key}: {error_message}" 

891 ) 

892 return jsonify( 

893 {"error": f"Invalid value for setting {key}"} 

894 ), 400 

895 

896 # Update setting 

897 # Pass the db_session to avoid session lookup issues 

898 success = set_setting(key, value, db_session=db_session) 

899 if success: 

900 response_data: dict[str, Any] = { 

901 "message": f"Setting {key} updated successfully" 

902 } 

903 

904 # If this is a key that affects warnings, include warning calculations 

905 warning_affecting_keys = [ 

906 "llm.provider", 

907 "search.tool", 

908 "search.iterations", 

909 "search.questions_per_iteration", 

910 "llm.local_context_window_size", 

911 "llm.context_window_unrestricted", 

912 "llm.context_window_size", 

913 ] 

914 

915 if key in warning_affecting_keys: 

916 warnings = calculate_warnings() 

917 response_data["warnings"] = warnings 

918 logger.debug( 

919 f"Setting {key} changed to {value}, calculated {len(warnings)} warnings" 

920 ) 

921 

922 invalidate_settings_caches(session["username"]) 

923 return jsonify(response_data) 

924 return jsonify({"error": f"Failed to update setting {key}"}), 500 

925 

926 # Namespace validation: reject new keys outside allowed prefixes. 

927 if not _is_allowed_new_setting_key(key): 

928 logger.warning( 

929 "Security: Rejected setting outside allowed namespaces: {!r} (user={!r})", 

930 key, 

931 session["username"], 

932 ) 

933 return jsonify( 

934 { 

935 "error": f"Creating settings under this namespace is not allowed: {key}" 

936 } 

937 ), 400 

938 

939 # Create new setting with default metadata 

940 setting_dict = { 

941 "key": key, 

942 "value": value, 

943 "name": key.split(".")[-1].replace("_", " ").title(), 

944 "description": f"Setting for {key}", 

945 } 

946 

947 # Add additional metadata if provided. 

948 # 'visible' and 'editable' are system-controlled — not accepted from callers. 

949 for field in [ 

950 "type", 

951 "name", 

952 "description", 

953 "category", 

954 "ui_element", 

955 "options", 

956 "min_value", 

957 "max_value", 

958 "step", 

959 ]: 

960 if field in data: 

961 setting_dict[field] = data[field] 

962 

963 # Create setting 

964 db_setting = create_or_update_setting( 

965 setting_dict, db_session=db_session 

966 ) 

967 

968 if db_setting: 

969 invalidate_settings_caches(session["username"]) 

970 return ( 

971 jsonify( 

972 { 

973 "message": f"Setting {key} created successfully", 

974 "setting": { 

975 "key": db_setting.key, 

976 "value": db_setting.value, 

977 "type": db_setting.type.value, 

978 "name": db_setting.name, 

979 }, 

980 } 

981 ), 

982 201, 

983 ) 

984 return jsonify({"error": f"Failed to create setting {key}"}), 500 

985 except Exception: 

986 logger.exception(f"Error updating setting {key}") 

987 return jsonify({"error": "Failed to update setting"}), 500 

988 

989 

990@settings_bp.route("/api/<path:key>", methods=["DELETE"]) 

991@login_required 

992@with_user_session() 

993def api_delete_setting(key, db_session=None, settings_manager=None): 

994 """Delete a setting""" 

995 try: 

996 # Check if setting exists 

997 db_setting = ( 

998 db_session.query(Setting).filter(Setting.key == key).first() 

999 ) 

1000 if not db_setting: 

1001 return jsonify({"error": f"Setting not found: {key}"}), 404 

1002 

1003 # Check if setting is editable 

1004 if not db_setting.editable: 

1005 return jsonify({"error": f"Setting {key} is not editable"}), 403 

1006 

1007 # Delete setting 

1008 success = settings_manager.delete_setting(key) 

1009 if success: 

1010 invalidate_settings_caches(session["username"]) 

1011 return jsonify({"message": f"Setting {key} deleted successfully"}) 

1012 return jsonify({"error": f"Failed to delete setting {key}"}), 500 

1013 except Exception: 

1014 logger.exception(f"Error deleting setting {key}") 

1015 return jsonify({"error": "Failed to delete setting"}), 500 

1016 

1017 

1018@settings_bp.route("/api/import", methods=["POST"]) 

1019@login_required 

1020@settings_limit 

1021@with_user_session() 

1022def api_import_settings(db_session=None, settings_manager=None): 

1023 """Import settings from defaults file""" 

1024 try: 

1025 settings_manager.load_from_defaults_file() 

1026 

1027 invalidate_settings_caches(session["username"]) 

1028 return jsonify({"message": "Settings imported successfully"}) 

1029 except Exception: 

1030 logger.exception("Error importing settings") 

1031 return jsonify({"error": "Failed to import settings"}), 500 

1032 

1033 

1034@settings_bp.route("/api/categories", methods=["GET"]) 

1035@login_required 

1036@with_user_session(include_settings_manager=False) 

1037def api_get_categories(db_session=None): 

1038 """Get all setting categories""" 

1039 try: 

1040 # Get all distinct categories 

1041 categories = db_session.query(Setting.category).distinct().all() 

1042 category_list = [c[0] for c in categories if c[0] is not None] 

1043 

1044 return jsonify({"categories": category_list}) 

1045 except Exception: 

1046 logger.exception("Error getting categories") 

1047 return jsonify({"error": "Failed to retrieve settings"}), 500 

1048 

1049 

1050@settings_bp.route("/api/types", methods=["GET"]) 

1051@login_required 

1052def api_get_types(): 

1053 """Get all setting types""" 

1054 try: 

1055 # Get all setting types 

1056 types = [t.value for t in SettingType] 

1057 return jsonify({"types": types}) 

1058 except Exception: 

1059 logger.exception("Error getting types") 

1060 return jsonify({"error": "Failed to retrieve settings"}), 500 

1061 

1062 

1063@settings_bp.route("/api/ui_elements", methods=["GET"]) 

1064@login_required 

1065def api_get_ui_elements(): 

1066 """Get all UI element types""" 

1067 try: 

1068 # Define supported UI element types 

1069 ui_elements = [ 

1070 "text", 

1071 "select", 

1072 "checkbox", 

1073 "slider", 

1074 "number", 

1075 "textarea", 

1076 "color", 

1077 "date", 

1078 "file", 

1079 "password", 

1080 ] 

1081 

1082 return jsonify({"ui_elements": ui_elements}) 

1083 except Exception: 

1084 logger.exception("Error getting UI elements") 

1085 return jsonify({"error": "Failed to retrieve settings"}), 500 

1086 

1087 

1088@settings_bp.route("/api/available-models", methods=["GET"]) 

1089@login_required 

1090def api_get_available_models(): 

1091 """Get available LLM models from various providers""" 

1092 endpoint_start = time.perf_counter() 

1093 try: 

1094 from ...database.models import ProviderModel 

1095 

1096 # Check if force_refresh is requested 

1097 force_refresh = parse_bool_arg("force_refresh") 

1098 

1099 # Get all auto-discovered providers (show all so users can discover 

1100 # and configure providers they haven't set up yet) 

1101 from ...llm.providers import get_discovered_provider_options 

1102 

1103 provider_options = get_discovered_provider_options() 

1104 

1105 # Add remaining hardcoded providers (complex local providers not yet migrated) 

1106 provider_options.extend( 

1107 [ 

1108 { 

1109 "value": "LLAMACPP", 

1110 "label": "Llama.cpp (Local GGUF files only)", 

1111 }, 

1112 ] 

1113 ) 

1114 

1115 # Available models by provider 

1116 providers: dict[str, Any] = {} 

1117 

1118 # Check database cache first (unless force_refresh is True) 

1119 if not force_refresh: 

1120 try: 

1121 # Define cache expiration (24 hours) 

1122 cache_expiry = datetime.now(UTC) - timedelta(hours=24) 

1123 

1124 # Get cached models from database 

1125 username = session["username"] 

1126 with get_user_db_session(username) as db_session: 

1127 cached_models = ( 

1128 db_session.query(ProviderModel) 

1129 .filter(ProviderModel.last_updated > cache_expiry) 

1130 .all() 

1131 ) 

1132 

1133 if cached_models: 1133 ↛ 1134line 1133 didn't jump to line 1134 because the condition on line 1133 was never true

1134 logger.info( 

1135 f"Found {len(cached_models)} cached models in database" 

1136 ) 

1137 

1138 # Group models by provider 

1139 for model in cached_models: 

1140 provider_key = ( 

1141 f"{normalize_provider(model.provider)}_models" 

1142 ) 

1143 if provider_key not in providers: 

1144 providers[provider_key] = [] 

1145 

1146 providers[provider_key].append( 

1147 { 

1148 "value": model.model_key, 

1149 "label": model.model_label, 

1150 "provider": model.provider.upper(), 

1151 } 

1152 ) 

1153 

1154 # If we have cached data for all providers, return it 

1155 if providers: 

1156 _log_available_models_duration( 

1157 endpoint_start, cache_hit=True 

1158 ) 

1159 logger.info("Returning cached models from database") 

1160 return jsonify( 

1161 { 

1162 "provider_options": provider_options, 

1163 "providers": providers, 

1164 } 

1165 ) 

1166 

1167 except Exception: 

1168 logger.warning("Error reading cached models from database") 

1169 # Continue to fetch fresh data 

1170 

1171 # Try to get Ollama models 

1172 ollama_models = [] 

1173 try: 

1174 import json 

1175 import re 

1176 

1177 import requests 

1178 

1179 # Try to query the Ollama API directly 

1180 try: 

1181 logger.info("Attempting to connect to Ollama API") 

1182 

1183 raw_base_url = _get_setting_from_session( 

1184 "llm.ollama.url", DEFAULT_OLLAMA_URL 

1185 ) 

1186 base_url = ( 

1187 normalize_url(raw_base_url) 

1188 if raw_base_url 

1189 else DEFAULT_OLLAMA_URL 

1190 ) 

1191 

1192 ollama_fetch_start = time.perf_counter() 

1193 ollama_response = safe_get( 

1194 f"{base_url}/api/tags", 

1195 timeout=5, 

1196 allow_localhost=True, 

1197 allow_private_ips=True, 

1198 ) 

1199 ollama_fetch_ms = ( 

1200 time.perf_counter() - ollama_fetch_start 

1201 ) * 1000 

1202 if ollama_fetch_ms > 1000: 1202 ↛ 1203line 1202 didn't jump to line 1203 because the condition on line 1202 was never true

1203 logger.info( 

1204 f"Ollama /api/tags fetch took {ollama_fetch_ms:.0f}ms" 

1205 ) 

1206 else: 

1207 logger.debug( 

1208 f"Ollama /api/tags fetch took {ollama_fetch_ms:.0f}ms" 

1209 ) 

1210 

1211 logger.debug( 

1212 f"Ollama API response: Status {ollama_response.status_code}" 

1213 ) 

1214 

1215 # Try to parse the response even if status code is not 200 to help with debugging 

1216 response_text = ollama_response.text 

1217 logger.debug( 

1218 f"Ollama API raw response: {response_text[:500]}..." 

1219 ) 

1220 

1221 if ollama_response.status_code == 200: 1221 ↛ 1291line 1221 didn't jump to line 1291 because the condition on line 1221 was always true

1222 try: 

1223 ollama_data = ollama_response.json() 

1224 logger.debug( 

1225 f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..." 

1226 ) 

1227 

1228 if "models" in ollama_data: 1228 ↛ 1258line 1228 didn't jump to line 1258 because the condition on line 1228 was always true

1229 # Format for newer Ollama API 

1230 logger.info( 

1231 f"Found {len(ollama_data.get('models', []))} models in newer Ollama API format" 

1232 ) 

1233 for model in ollama_data.get("models", []): 1233 ↛ 1235line 1233 didn't jump to line 1235 because the loop on line 1233 never started

1234 # Extract name correctly from the model object 

1235 name = model.get("name", "") 

1236 if name: 

1237 # Improved display name formatting 

1238 display_name = re.sub( 

1239 r"[:/]", " ", name 

1240 ).strip() 

1241 display_name = " ".join( 

1242 word.capitalize() 

1243 for word in display_name.split() 

1244 ) 

1245 # Create the model entry with value and label 

1246 ollama_models.append( 

1247 { 

1248 "value": name, # Original model name as value (for API calls) 

1249 "label": f"{display_name} (Ollama)", # Pretty name as label 

1250 "provider": "ollama", # Add provider field for consistency 

1251 } 

1252 ) 

1253 logger.debug( 

1254 f"Added Ollama model: {name} -> {display_name}" 

1255 ) 

1256 else: 

1257 # Format for older Ollama API 

1258 logger.info( 

1259 f"Found {len(ollama_data)} models in older Ollama API format" 

1260 ) 

1261 for model in ollama_data: 

1262 name = model.get("name", "") 

1263 if name: 

1264 # Improved display name formatting 

1265 display_name = re.sub( 

1266 r"[:/]", " ", name 

1267 ).strip() 

1268 display_name = " ".join( 

1269 word.capitalize() 

1270 for word in display_name.split() 

1271 ) 

1272 ollama_models.append( 

1273 { 

1274 "value": name, 

1275 "label": f"{display_name} (Ollama)", 

1276 "provider": "ollama", # Add provider field for consistency 

1277 } 

1278 ) 

1279 logger.debug( 

1280 f"Added Ollama model: {name} -> {display_name}" 

1281 ) 

1282 

1283 except json.JSONDecodeError as json_err: 

1284 logger.exception( 

1285 f"Failed to parse Ollama API response as JSON: {json_err}" 

1286 ) 

1287 raise ValueError( 

1288 f"Ollama API returned invalid JSON: {json_err}" 

1289 ) 

1290 else: 

1291 logger.warning( 

1292 f"Ollama API returned non-200 status code: {ollama_response.status_code}" 

1293 ) 

1294 raise ValueError( 

1295 f"Ollama API returned status code {ollama_response.status_code}" 

1296 ) 

1297 

1298 except requests.exceptions.RequestException: 

1299 logger.warning("Could not connect to Ollama API") 

1300 # No fallback models - just return empty list 

1301 logger.info("Ollama not available - no models to display") 

1302 ollama_models = [] 

1303 

1304 # Always set the ollama_models in providers, whether we got real or fallback models 

1305 providers["ollama_models"] = ollama_models 

1306 logger.info(f"Final Ollama models count: {len(ollama_models)}") 

1307 

1308 # Log some model names for debugging 

1309 if ollama_models: 1309 ↛ 1310line 1309 didn't jump to line 1310 because the condition on line 1309 was never true

1310 model_names = [m["value"] for m in ollama_models[:5]] 

1311 logger.info(f"Sample Ollama models: {', '.join(model_names)}") 

1312 

1313 except Exception: 

1314 logger.exception("Error getting Ollama models") 

1315 # No fallback models - just return empty list 

1316 logger.info("Error getting Ollama models - no models to display") 

1317 providers["ollama_models"] = [] 

1318 

1319 # Note: OpenAI-Compatible Endpoint models are fetched via auto-discovery 

1320 # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider) 

1321 

1322 # Get OpenAI models using the OpenAI package 

1323 openai_models = [] 

1324 try: 

1325 logger.info( 

1326 "Attempting to connect to OpenAI API using OpenAI package" 

1327 ) 

1328 

1329 # Get the API key from settings 

1330 api_key = _get_setting_from_session("llm.openai.api_key", "") 

1331 

1332 if api_key: 1332 ↛ 1333line 1332 didn't jump to line 1333 because the condition on line 1332 was never true

1333 import openai 

1334 from openai import OpenAI 

1335 

1336 # Create OpenAI client 

1337 client = OpenAI(api_key=api_key) 

1338 

1339 try: 

1340 # Fetch models using the client 

1341 logger.debug("Fetching models from OpenAI API") 

1342 openai_fetch_start = time.perf_counter() 

1343 models_response = client.models.list() 

1344 openai_fetch_ms = ( 

1345 time.perf_counter() - openai_fetch_start 

1346 ) * 1000 

1347 if openai_fetch_ms > 1000: 

1348 logger.info( 

1349 f"OpenAI models.list() took {openai_fetch_ms:.0f}ms" 

1350 ) 

1351 else: 

1352 logger.debug( 

1353 f"OpenAI models.list() took {openai_fetch_ms:.0f}ms" 

1354 ) 

1355 

1356 # Process models from the response 

1357 for model in models_response.data: 

1358 model_id = model.id 

1359 if model_id: 

1360 # Create a clean display name 

1361 display_name = model_id.replace("-", " ").strip() 

1362 display_name = " ".join( 

1363 word.capitalize() 

1364 for word in display_name.split() 

1365 ) 

1366 

1367 openai_models.append( 

1368 { 

1369 "value": model_id, 

1370 "label": f"{display_name} (OpenAI)", 

1371 "provider": "openai", 

1372 } 

1373 ) 

1374 logger.debug( 

1375 f"Added OpenAI model: {model_id} -> {display_name}" 

1376 ) 

1377 

1378 # Keep original order from OpenAI - their models are returned in a 

1379 # meaningful order (newer/more capable models first) 

1380 

1381 except openai.APIError as api_err: 

1382 logger.exception(f"OpenAI API error: {api_err!s}") 

1383 logger.info("No OpenAI models found due to API error") 

1384 

1385 else: 

1386 logger.info( 

1387 "OpenAI API key not configured, no models available" 

1388 ) 

1389 

1390 except Exception: 

1391 logger.exception("Error getting OpenAI models") 

1392 logger.info("No OpenAI models available due to error") 

1393 

1394 # Always set the openai_models in providers (will be empty array if no models found) 

1395 providers["openai_models"] = openai_models 

1396 logger.info(f"Final OpenAI models count: {len(openai_models)}") 

1397 

1398 # Try to get Anthropic models using the Anthropic package 

1399 anthropic_models = [] 

1400 try: 

1401 logger.info( 

1402 "Attempting to connect to Anthropic API using Anthropic package" 

1403 ) 

1404 

1405 # Get the API key from settings 

1406 api_key = _get_setting_from_session("llm.anthropic.api_key", "") 

1407 

1408 if api_key: 

1409 # Import Anthropic package here to avoid dependency issues if not installed 

1410 from anthropic import Anthropic 

1411 

1412 # Create Anthropic client 

1413 anthropic_client = Anthropic(api_key=api_key) 

1414 

1415 try: 

1416 # Fetch models using the client 

1417 logger.debug("Fetching models from Anthropic API") 

1418 models_response = anthropic_client.models.list() 

1419 

1420 # Process models from the response 

1421 for model in models_response.data: 

1422 model_id = model.id 

1423 if model_id: 

1424 # Create a clean display name 

1425 display_name = model_id.replace("-", " ").strip() 

1426 display_name = " ".join( 

1427 word.capitalize() 

1428 for word in display_name.split() 

1429 ) 

1430 

1431 anthropic_models.append( 

1432 { 

1433 "value": model_id, 

1434 "label": f"{display_name} (Anthropic)", 

1435 "provider": "anthropic", 

1436 } 

1437 ) 

1438 logger.debug( 

1439 f"Added Anthropic model: {model_id} -> {display_name}" 

1440 ) 

1441 

1442 except Exception as api_err: 

1443 logger.exception(f"Anthropic API error: {api_err!s}") 

1444 else: 

1445 logger.info("Anthropic API key not configured") 

1446 

1447 except ImportError: 

1448 logger.warning( 

1449 "Anthropic package not installed. No models will be available." 

1450 ) 

1451 except Exception: 

1452 logger.exception("Error getting Anthropic models") 

1453 

1454 # Set anthropic_models in providers (could be empty if API call failed) 

1455 providers["anthropic_models"] = anthropic_models 

1456 logger.info(f"Final Anthropic models count: {len(anthropic_models)}") 

1457 

1458 # Fetch models from auto-discovered providers 

1459 from ...llm.providers import discover_providers 

1460 

1461 discovered_providers = discover_providers() 

1462 

1463 for provider_key, provider_info in discovered_providers.items(): 

1464 provider_models = [] 

1465 try: 

1466 logger.info( 

1467 f"Fetching models from {provider_info.provider_name}" 

1468 ) 

1469 

1470 # Get the provider class 

1471 provider_class = provider_info.provider_class 

1472 

1473 # Get API key if configured 

1474 api_key = _get_setting_from_session( 

1475 provider_class.api_key_setting, "" 

1476 ) 

1477 

1478 # Get base URL if provider has configurable URL 

1479 provider_base_url: str | None = None 

1480 if ( 

1481 hasattr(provider_class, "url_setting") 

1482 and provider_class.url_setting 

1483 ): 

1484 provider_base_url = _get_setting_from_session( 

1485 provider_class.url_setting, "" 

1486 ) 

1487 

1488 # Use the provider's list_models_for_api method 

1489 models = provider_class.list_models_for_api( 

1490 api_key, provider_base_url 

1491 ) 

1492 

1493 # Format models for the API response 

1494 for model in models: 

1495 provider_models.append( 

1496 { 

1497 "value": model["value"], 

1498 "label": model[ 

1499 "label" 

1500 ], # Use provider's label as-is 

1501 "provider": provider_key, 

1502 } 

1503 ) 

1504 

1505 logger.info( 

1506 f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}" 

1507 ) 

1508 

1509 except Exception: 

1510 logger.exception( 

1511 f"Error getting {provider_info.provider_name} models" 

1512 ) 

1513 

1514 # Set models in providers dict using lowercase key 

1515 providers[f"{normalize_provider(provider_key)}_models"] = ( 

1516 provider_models 

1517 ) 

1518 logger.info( 

1519 f"Final {provider_key} models count: {len(provider_models)}" 

1520 ) 

1521 

1522 # Save fetched models to database cache 

1523 if force_refresh or providers: 1523 ↛ 1572line 1523 didn't jump to line 1572 because the condition on line 1523 was always true

1524 # We fetched fresh data, save it to database 

1525 username = session["username"] 

1526 with get_user_db_session(username) as db_session: 

1527 try: 

1528 if force_refresh: 

1529 # When force refresh, clear ALL cached models to remove any stale data 

1530 # from old code versions or deleted providers 

1531 deleted_count = db_session.query(ProviderModel).delete() 

1532 logger.info( 

1533 f"Force refresh: cleared all {deleted_count} cached models" 

1534 ) 

1535 else: 

1536 # Clear old cache entries only for providers we're updating 

1537 for provider_key in providers: 

1538 provider_name = provider_key.replace( 

1539 "_models", "" 

1540 ).upper() 

1541 db_session.query(ProviderModel).filter( 

1542 ProviderModel.provider == provider_name 

1543 ).delete() 

1544 

1545 # Insert new models 

1546 for provider_key, models in providers.items(): 

1547 provider_name = provider_key.replace( 

1548 "_models", "" 

1549 ).upper() 

1550 for model in models: 

1551 if ( 1551 ↛ 1550line 1551 didn't jump to line 1550 because the condition on line 1551 was always true

1552 isinstance(model, dict) 

1553 and "value" in model 

1554 and "label" in model 

1555 ): 

1556 new_model = ProviderModel( 

1557 provider=provider_name, 

1558 model_key=model["value"], 

1559 model_label=model["label"], 

1560 last_updated=datetime.now(UTC), 

1561 ) 

1562 db_session.add(new_model) 

1563 

1564 db_session.commit() 

1565 logger.info("Successfully cached models to database") 

1566 

1567 except Exception: 

1568 logger.exception("Error saving models to database cache") 

1569 db_session.rollback() 

1570 

1571 # Return all options 

1572 _log_available_models_duration(endpoint_start, cache_hit=False) 

1573 return jsonify( 

1574 {"provider_options": provider_options, "providers": providers} 

1575 ) 

1576 

1577 except Exception: 

1578 logger.exception("Error getting available models") 

1579 _log_available_models_duration( 

1580 endpoint_start, cache_hit=False, error=True 

1581 ) 

1582 return jsonify( 

1583 { 

1584 "status": "error", 

1585 "message": "Failed to retrieve available models", 

1586 } 

1587 ), 500 

1588 

1589 

1590def _log_available_models_duration( 

1591 start: float, cache_hit: bool, error: bool = False 

1592) -> None: 

1593 """Log /api/available-models endpoint duration. 

1594 

1595 Uses INFO when the endpoint took > 1s (indicating a real provider fetch 

1596 latency worth flagging), DEBUG otherwise. This is the likely culprit for 

1597 Path C (LLM provider timeout masquerading as backend hang). 

1598 """ 

1599 elapsed_ms = (time.perf_counter() - start) * 1000 

1600 path = ( 

1601 "error" 

1602 if error 

1603 else ("cache hit" if cache_hit else "full provider fetch") 

1604 ) 

1605 if elapsed_ms > 1000: 

1606 logger.info(f"/api/available-models ({path}) took {elapsed_ms:.0f}ms") 

1607 else: 

1608 logger.debug(f"/api/available-models ({path}) took {elapsed_ms:.0f}ms") 

1609 

1610 

1611def _get_engine_icon_and_category( 

1612 engine_data: dict, engine_class=None 

1613) -> tuple: 

1614 """ 

1615 Get icon emoji and category label for a search engine based on its attributes. 

1616 

1617 Args: 

1618 engine_data: Engine configuration dictionary 

1619 engine_class: Optional loaded engine class to check attributes 

1620 

1621 Returns: 

1622 Tuple of (icon, category) strings 

1623 """ 

1624 # Check attributes from either the class or the engine data 

1625 if engine_class: 

1626 is_scientific = getattr(engine_class, "is_scientific", False) 

1627 is_generic = getattr(engine_class, "is_generic", False) 

1628 is_local = getattr(engine_class, "is_local", False) 

1629 is_news = getattr(engine_class, "is_news", False) 

1630 is_code = getattr(engine_class, "is_code", False) 

1631 else: 

1632 is_scientific = engine_data.get("is_scientific", False) 

1633 is_generic = engine_data.get("is_generic", False) 

1634 is_local = engine_data.get("is_local", False) 

1635 is_news = engine_data.get("is_news", False) 

1636 is_code = engine_data.get("is_code", False) 

1637 

1638 # Check books attribute 

1639 if engine_class: 

1640 is_books = getattr(engine_class, "is_books", False) 

1641 else: 

1642 is_books = engine_data.get("is_books", False) 

1643 

1644 # Return icon and category based on engine type 

1645 # Priority: local > scientific > news > code > books > generic > default 

1646 if is_local: 

1647 return "📁", "Local RAG" 

1648 if is_scientific: 

1649 return "🔬", "Scientific" 

1650 if is_news: 

1651 return "📰", "News" 

1652 if is_code: 

1653 return "💻", "Code" 

1654 if is_books: 

1655 return "📚", "Books" 

1656 if is_generic: 

1657 return "🌐", "Web Search" 

1658 return "🔍", "Search" 

1659 

1660 

1661@settings_bp.route("/api/available-search-engines", methods=["GET"]) 

1662@login_required 

1663@with_user_session() 

1664def api_get_available_search_engines(db_session=None, settings_manager=None): 

1665 """Get available search engines""" 

1666 try: 

1667 # Get search engines using the same approach as search_engines_config.py 

1668 from ...web_search_engines.search_engines_config import search_config 

1669 

1670 username = session["username"] 

1671 search_engines = search_config(username=username, db_session=db_session) 

1672 

1673 # Get user's favorites using SettingsManager 

1674 favorites = settings_manager.get_setting("search.favorites", []) 

1675 if not isinstance(favorites, list): 1675 ↛ 1676line 1675 didn't jump to line 1676 because the condition on line 1675 was never true

1676 favorites = [] 

1677 

1678 # Extract search engines from config 

1679 engines_dict = {} 

1680 engine_options = [] 

1681 

1682 if search_engines: 1682 ↛ 1749line 1682 didn't jump to line 1749 because the condition on line 1682 was always true

1683 # Format engines for API response with metadata 

1684 from ...security.module_whitelist import ( 

1685 get_safe_module_class, 

1686 SecurityError, 

1687 ) 

1688 

1689 for engine_id, engine_data in search_engines.items(): 

1690 # Try to load the engine class to get metadata 

1691 engine_class = None 

1692 try: 

1693 module_path = engine_data.get("module_path") 

1694 class_name = engine_data.get("class_name") 

1695 if module_path and class_name: 1695 ↛ 1710line 1695 didn't jump to line 1710 because the condition on line 1695 was always true

1696 # Use secure whitelist-validated import 

1697 engine_class = get_safe_module_class( 

1698 module_path, class_name 

1699 ) 

1700 except SecurityError: 

1701 logger.warning( 

1702 f"Security: Blocked unsafe module for {engine_id}" 

1703 ) 

1704 except Exception as e: 

1705 logger.debug( 

1706 f"Could not load engine class for {engine_id}: {e}" 

1707 ) 

1708 

1709 # Get icon and category from engine attributes 

1710 icon, category = _get_engine_icon_and_category( 

1711 engine_data, engine_class 

1712 ) 

1713 

1714 # Check if engine requires an API key 

1715 requires_api_key = engine_data.get("requires_api_key", False) 

1716 

1717 # Build display name with icon, category, and API key status 

1718 base_name = engine_data.get("display_name", engine_id) 

1719 if requires_api_key: 

1720 label = f"{icon} {base_name} ({category}, API key)" 

1721 else: 

1722 label = f"{icon} {base_name} ({category}, Free)" 

1723 

1724 # Check if engine is a favorite 

1725 is_favorite = engine_id in favorites 

1726 

1727 engines_dict[engine_id] = { 

1728 "display_name": base_name, 

1729 "description": engine_data.get("description", ""), 

1730 "strengths": engine_data.get("strengths", []), 

1731 "icon": icon, 

1732 "category": category, 

1733 "requires_api_key": requires_api_key, 

1734 "is_favorite": is_favorite, 

1735 } 

1736 

1737 engine_options.append( 

1738 { 

1739 "value": engine_id, 

1740 "label": label, 

1741 "icon": icon, 

1742 "category": category, 

1743 "requires_api_key": requires_api_key, 

1744 "is_favorite": is_favorite, 

1745 } 

1746 ) 

1747 

1748 # Sort engine_options: favorites first, then alphabetically by label 

1749 engine_options.sort( 

1750 key=lambda x: ( 

1751 not x.get("is_favorite", False), 

1752 x.get("label", "").lower(), 

1753 ) 

1754 ) 

1755 

1756 # If no engines found, log the issue but return empty list 

1757 if not engine_options: 1757 ↛ 1758line 1757 didn't jump to line 1758 because the condition on line 1757 was never true

1758 logger.warning("No search engines found in configuration") 

1759 

1760 return jsonify( 

1761 { 

1762 "engines": engines_dict, 

1763 "engine_options": engine_options, 

1764 "favorites": favorites, 

1765 } 

1766 ) 

1767 

1768 except Exception: 

1769 logger.exception("Error getting available search engines") 

1770 return jsonify({"error": "Failed to retrieve search engines"}), 500 

1771 

1772 

1773@settings_bp.route("/api/search-favorites", methods=["GET"]) 

1774@login_required 

1775@with_user_session() 

1776def api_get_search_favorites(db_session=None, settings_manager=None): 

1777 """Get the list of favorite search engines for the current user""" 

1778 try: 

1779 favorites = settings_manager.get_setting("search.favorites", []) 

1780 if not isinstance(favorites, list): 

1781 favorites = [] 

1782 return jsonify({"favorites": favorites}) 

1783 

1784 except Exception: 

1785 logger.exception("Error getting search favorites") 

1786 return jsonify({"error": "Failed to retrieve favorites"}), 500 

1787 

1788 

1789@settings_bp.route("/api/search-favorites", methods=["PUT"]) 

1790@login_required 

1791@require_json_body(error_message="No data provided") 

1792@with_user_session() 

1793def api_update_search_favorites(db_session=None, settings_manager=None): 

1794 """Update the list of favorite search engines for the current user""" 

1795 try: 

1796 data = request.get_json() 

1797 favorites = data.get("favorites") 

1798 if favorites is None: 

1799 return jsonify({"error": "No favorites provided"}), 400 

1800 

1801 if not isinstance(favorites, list): 

1802 return jsonify({"error": "Favorites must be a list"}), 400 

1803 

1804 if settings_manager.set_setting("search.favorites", favorites): 

1805 invalidate_settings_caches(session["username"]) 

1806 return jsonify( 

1807 { 

1808 "message": "Favorites updated successfully", 

1809 "favorites": favorites, 

1810 } 

1811 ) 

1812 return jsonify({"error": "Failed to update favorites"}), 500 

1813 

1814 except Exception: 

1815 logger.exception("Error updating search favorites") 

1816 return jsonify({"error": "Failed to update favorites"}), 500 

1817 

1818 

1819@settings_bp.route("/api/search-favorites/toggle", methods=["POST"]) 

1820@login_required 

1821@require_json_body(error_message="No data provided") 

1822@with_user_session() 

1823def api_toggle_search_favorite(db_session=None, settings_manager=None): 

1824 """Toggle a search engine as favorite""" 

1825 try: 

1826 data = request.get_json() 

1827 engine_id = data.get("engine_id") 

1828 if not engine_id: 

1829 return jsonify({"error": "No engine_id provided"}), 400 

1830 

1831 # Get current favorites 

1832 favorites = settings_manager.get_setting("search.favorites", []) 

1833 if not isinstance(favorites, list): 

1834 favorites = [] 

1835 else: 

1836 # Make a copy to avoid modifying the original 

1837 favorites = list(favorites) 

1838 

1839 # Toggle the engine 

1840 is_favorite = engine_id in favorites 

1841 if is_favorite: 

1842 favorites.remove(engine_id) 

1843 is_favorite = False 

1844 else: 

1845 favorites.append(engine_id) 

1846 is_favorite = True 

1847 

1848 # Update the setting 

1849 if settings_manager.set_setting("search.favorites", favorites): 

1850 invalidate_settings_caches(session["username"]) 

1851 return jsonify( 

1852 { 

1853 "message": "Favorite toggled successfully", 

1854 "engine_id": engine_id, 

1855 "is_favorite": is_favorite, 

1856 "favorites": favorites, 

1857 } 

1858 ) 

1859 return jsonify({"error": "Failed to toggle favorite"}), 500 

1860 

1861 except Exception: 

1862 logger.exception("Error toggling search favorite") 

1863 return jsonify({"error": "Failed to toggle favorite"}), 500 

1864 

1865 

1866# Legacy routes for backward compatibility - these will redirect to the new routes 

1867@settings_bp.route("/main", methods=["GET"]) 

1868@login_required 

1869def main_config_page(): 

1870 """Redirect to app settings page""" 

1871 return redirect(url_for("settings.settings_page")) 

1872 

1873 

1874@settings_bp.route("/collections", methods=["GET"]) 

1875@login_required 

1876def collections_config_page(): 

1877 """Redirect to app settings page""" 

1878 return redirect(url_for("settings.settings_page")) 

1879 

1880 

1881@settings_bp.route("/api_keys", methods=["GET"]) 

1882@login_required 

1883def api_keys_config_page(): 

1884 """Redirect to LLM settings page""" 

1885 return redirect(url_for("settings.settings_page")) 

1886 

1887 

1888@settings_bp.route("/search_engines", methods=["GET"]) 

1889@login_required 

1890def search_engines_config_page(): 

1891 """Redirect to search settings page""" 

1892 return redirect(url_for("settings.settings_page")) 

1893 

1894 

1895@settings_bp.route("/llm", methods=["GET"]) 

1896@login_required 

1897def llm_config_page(): 

1898 """Redirect to LLM settings page""" 

1899 return redirect(url_for("settings.settings_page")) 

1900 

1901 

1902@settings_bp.route("/open_file_location", methods=["POST"]) 

1903@login_required 

1904def open_file_location(): 

1905 """Open the location of a configuration file. 

1906 

1907 Security: This endpoint is disabled for server deployments. 

1908 It only makes sense for desktop usage where the server and client are on the same machine. 

1909 """ 

1910 return jsonify( 

1911 { 

1912 "status": "error", 

1913 "message": "This feature is disabled. It is only available in desktop mode.", 

1914 } 

1915 ), 403 

1916 

1917 

1918@settings_bp.context_processor 

1919def inject_csrf_token(): 

1920 """Inject CSRF token into the template context for all settings routes.""" 

1921 return {"csrf_token": generate_csrf} 

1922 

1923 

1924@settings_bp.route("/fix_corrupted_settings", methods=["POST"]) 

1925@login_required 

1926@settings_limit 

1927@with_user_session(include_settings_manager=False) 

1928def fix_corrupted_settings(db_session=None): 

1929 """Fix corrupted settings in the database""" 

1930 try: 

1931 # Track fixed and removed settings 

1932 fixed_settings = [] 

1933 removed_duplicate_settings = [] 

1934 # First, find and remove duplicate settings with the same key 

1935 # This happens because of errors in settings import/export 

1936 from sqlalchemy import func as sql_func 

1937 

1938 # Find keys with duplicates 

1939 duplicate_keys = ( 

1940 db_session.query(Setting.key) 

1941 .group_by(Setting.key) 

1942 .having(sql_func.count(Setting.key) > 1) 

1943 .all() 

1944 ) 

1945 duplicate_keys = [key[0] for key in duplicate_keys] 

1946 

1947 # For each duplicate key, keep the latest updated one and remove others 

1948 for key in duplicate_keys: 

1949 dupe_settings = ( 

1950 db_session.query(Setting) 

1951 .filter(Setting.key == key) 

1952 .order_by(Setting.updated_at.desc()) 

1953 .all() 

1954 ) 

1955 

1956 # Keep the first one (most recently updated) and delete the rest 

1957 for i, setting in enumerate(dupe_settings): 

1958 if i > 0: # Skip the first one (keep it) 

1959 db_session.delete(setting) 

1960 removed_duplicate_settings.append(key) 

1961 

1962 # Check for settings with corrupted values 

1963 all_settings = db_session.query(Setting).all() 

1964 for setting in all_settings: 

1965 # Check different types of corruption 

1966 is_corrupted = False 

1967 

1968 if ( 

1969 setting.value is None 

1970 or ( 

1971 isinstance(setting.value, str) 

1972 and setting.value 

1973 in [ 

1974 "{", 

1975 "[", 

1976 "{}", 

1977 "[]", 

1978 "[object Object]", 

1979 "null", 

1980 "undefined", 

1981 ] 

1982 ) 

1983 or (isinstance(setting.value, dict) and len(setting.value) == 0) 

1984 ): 

1985 is_corrupted = True 

1986 

1987 # Skip if not corrupted 

1988 if not is_corrupted: 

1989 continue 

1990 

1991 default_value: Any = None 

1992 

1993 # Try to find a matching default setting based on key 

1994 if setting.key.startswith("llm."): 

1995 if setting.key == "llm.model": 

1996 default_value = "" 

1997 elif setting.key == "llm.provider": 

1998 default_value = "ollama" 

1999 elif setting.key == "llm.temperature": 

2000 default_value = 0.7 

2001 elif setting.key == "llm.max_tokens": 2001 ↛ 2042line 2001 didn't jump to line 2042 because the condition on line 2001 was always true

2002 default_value = 1024 

2003 elif setting.key.startswith("search."): 

2004 if setting.key == "search.tool": 

2005 default_value = "auto" 

2006 elif setting.key == "search.max_results": 

2007 default_value = 10 

2008 elif setting.key == "search.region": 

2009 default_value = "us" 

2010 elif setting.key == "search.questions_per_iteration": 

2011 default_value = 3 

2012 elif setting.key == "search.searches_per_section": 

2013 default_value = 2 

2014 elif setting.key == "search.skip_relevance_filter": 

2015 default_value = False 

2016 elif setting.key == "search.safe_search": 

2017 default_value = True 

2018 elif setting.key == "search.search_language": 2018 ↛ 2042line 2018 didn't jump to line 2042 because the condition on line 2018 was always true

2019 default_value = "English" 

2020 elif setting.key.startswith("report."): 

2021 if setting.key == "report.searches_per_section": 

2022 default_value = 2 

2023 elif setting.key.startswith("app."): 2023 ↛ 2042line 2023 didn't jump to line 2042 because the condition on line 2023 was always true

2024 if ( 

2025 setting.key == "app.theme" 

2026 or setting.key == "app.default_theme" 

2027 ): 

2028 default_value = "dark" 

2029 elif setting.key == "app.enable_notifications" or ( 

2030 setting.key == "app.enable_web" 

2031 or setting.key == "app.web_interface" 

2032 ): 

2033 default_value = True 

2034 elif setting.key == "app.host": 

2035 default_value = "0.0.0.0" 

2036 elif setting.key == "app.port": 

2037 default_value = 5000 

2038 elif setting.key == "app.debug": 2038 ↛ 2042line 2038 didn't jump to line 2042 because the condition on line 2038 was always true

2039 default_value = True 

2040 

2041 # Update the setting with the default value if found 

2042 if default_value is not None: 

2043 setting.value = default_value 

2044 fixed_settings.append(setting.key) 

2045 else: 

2046 # If no default found but it's a corrupted JSON, set to empty object 

2047 if setting.key.startswith("report."): 2047 ↛ 1964line 2047 didn't jump to line 1964 because the condition on line 2047 was always true

2048 setting.value = {} 

2049 fixed_settings.append(setting.key) 

2050 

2051 # Commit changes 

2052 if fixed_settings or removed_duplicate_settings: 

2053 db_session.commit() 

2054 logger.info( 

2055 f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}" 

2056 ) 

2057 if removed_duplicate_settings: 

2058 logger.info( 

2059 f"Removed {len(removed_duplicate_settings)} duplicate settings" 

2060 ) 

2061 invalidate_settings_caches(session["username"]) 

2062 

2063 # Return success 

2064 return jsonify( 

2065 { 

2066 "status": "success", 

2067 "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates", 

2068 "fixed_settings": fixed_settings, 

2069 "removed_duplicates": removed_duplicate_settings, 

2070 } 

2071 ) 

2072 

2073 except Exception: 

2074 logger.exception("Error fixing corrupted settings") 

2075 db_session.rollback() 

2076 return ( 

2077 jsonify( 

2078 { 

2079 "status": "error", 

2080 "message": "An internal error occurred while fixing corrupted settings. Please try again later.", 

2081 } 

2082 ), 

2083 500, 

2084 ) 

2085 

2086 

2087@settings_bp.route("/api/warnings", methods=["GET"]) 

2088@login_required 

2089def api_get_warnings(): 

2090 """Get current warnings based on settings""" 

2091 try: 

2092 warnings = calculate_warnings() 

2093 return jsonify({"warnings": warnings}) 

2094 except Exception: 

2095 logger.exception("Error getting warnings") 

2096 return jsonify({"error": "Failed to retrieve warnings"}), 500 

2097 

2098 

2099@settings_bp.route("/api/backup-status", methods=["GET"]) 

2100@login_required 

2101def api_get_backup_status(): 

2102 """Get backup status for the current user.""" 

2103 try: 

2104 from ...config.paths import get_user_backup_directory 

2105 

2106 username = session.get("username") 

2107 if not username: 

2108 return jsonify({"error": "Not authenticated"}), 401 

2109 

2110 from ...utilities.formatting import human_size 

2111 

2112 backup_dir = get_user_backup_directory(username) 

2113 

2114 # Sort by modification time (not filename) for robustness 

2115 backup_list = [] 

2116 total_size = 0 

2117 for b in backup_dir.glob("ldr_backup_*.db"): 

2118 try: 

2119 stat = b.stat() 

2120 total_size += stat.st_size 

2121 backup_list.append( 

2122 { 

2123 "filename": b.name, 

2124 "size_bytes": stat.st_size, 

2125 "size_human": human_size(stat.st_size), 

2126 "created_at": datetime.fromtimestamp( 

2127 stat.st_mtime, tz=timezone.utc 

2128 ).isoformat(), 

2129 "_mtime": stat.st_mtime, 

2130 } 

2131 ) 

2132 except FileNotFoundError: 

2133 continue 

2134 

2135 # Sort newest first by mtime, then remove internal field 

2136 backup_list.sort(key=lambda x: x["_mtime"], reverse=True) 

2137 for entry in backup_list: 

2138 del entry["_mtime"] 

2139 

2140 backup_enabled = _get_setting_from_session("backup.enabled", True) 

2141 

2142 return jsonify( 

2143 { 

2144 "enabled": bool(backup_enabled), 

2145 "count": len(backup_list), 

2146 "backups": backup_list, 

2147 "total_size_bytes": total_size, 

2148 "total_size_human": human_size(total_size), 

2149 } 

2150 ) 

2151 

2152 except Exception: 

2153 logger.exception("Error getting backup status") 

2154 return jsonify({"error": "Failed to retrieve backup status"}), 500 

2155 

2156 

2157@settings_bp.route("/api/ollama-status", methods=["GET"]) 

2158@login_required 

2159def check_ollama_status(): 

2160 """Check if Ollama is running and available""" 

2161 try: 

2162 # Get Ollama URL from settings 

2163 raw_base_url = _get_setting_from_session( 

2164 "llm.ollama.url", DEFAULT_OLLAMA_URL 

2165 ) 

2166 base_url = ( 

2167 normalize_url(raw_base_url) if raw_base_url else DEFAULT_OLLAMA_URL 

2168 ) 

2169 

2170 response = safe_get( 

2171 f"{base_url}/api/version", 

2172 timeout=2, 

2173 allow_localhost=True, 

2174 allow_private_ips=True, 

2175 ) 

2176 

2177 if response.status_code == 200: 

2178 return jsonify( 

2179 { 

2180 "running": True, 

2181 "version": response.json().get("version", "unknown"), 

2182 } 

2183 ) 

2184 return jsonify( 

2185 { 

2186 "running": False, 

2187 "error": f"Ollama returned status code {response.status_code}", 

2188 } 

2189 ) 

2190 except requests.exceptions.RequestException: 

2191 logger.exception("Ollama check failed") 

2192 return jsonify( 

2193 {"running": False, "error": "Failed to check search engine status"} 

2194 ) 

2195 

2196 

2197@settings_bp.route("/api/rate-limiting/status", methods=["GET"]) 

2198@login_required 

2199def api_get_rate_limiting_status(): 

2200 """Get current rate limiting status and statistics""" 

2201 try: 

2202 from ...web_search_engines.rate_limiting import get_tracker 

2203 

2204 tracker = get_tracker() 

2205 

2206 # Get basic status 

2207 status = { 

2208 "enabled": tracker.enabled, 

2209 "exploration_rate": tracker.exploration_rate, 

2210 "learning_rate": tracker.learning_rate, 

2211 "memory_window": tracker.memory_window, 

2212 } 

2213 

2214 # Get engine statistics 

2215 engine_stats = tracker.get_stats() 

2216 engines = [] 

2217 

2218 for stat in engine_stats: 

2219 ( 

2220 engine_type, 

2221 base_wait, 

2222 min_wait, 

2223 max_wait, 

2224 last_updated, 

2225 total_attempts, 

2226 success_rate, 

2227 ) = stat 

2228 engines.append( 

2229 { 

2230 "engine_type": engine_type, 

2231 "base_wait_seconds": round(base_wait, 2), 

2232 "min_wait_seconds": round(min_wait, 2), 

2233 "max_wait_seconds": round(max_wait, 2), 

2234 "last_updated": last_updated, 

2235 "total_attempts": total_attempts, 

2236 "success_rate": ( 

2237 round(success_rate * 100, 1) if success_rate else 0.0 

2238 ), 

2239 } 

2240 ) 

2241 

2242 return jsonify({"status": status, "engines": engines}) 

2243 

2244 except Exception: 

2245 logger.exception("Error getting rate limiting status") 

2246 return jsonify({"error": "An internal error occurred"}), 500 

2247 

2248 

2249@settings_bp.route( 

2250 "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"] 

2251) 

2252@login_required 

2253def api_reset_engine_rate_limiting(engine_type): 

2254 """Reset rate limiting data for a specific engine""" 

2255 try: 

2256 from ...web_search_engines.rate_limiting import get_tracker 

2257 

2258 tracker = get_tracker() 

2259 tracker.reset_engine(engine_type) 

2260 

2261 return jsonify( 

2262 {"message": f"Rate limiting data reset for {engine_type}"} 

2263 ) 

2264 

2265 except Exception: 

2266 logger.exception(f"Error resetting rate limiting for {engine_type}") 

2267 return jsonify({"error": "An internal error occurred"}), 500 

2268 

2269 

2270@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"]) 

2271@login_required 

2272def api_cleanup_rate_limiting(): 

2273 """Clean up old rate limiting data. 

2274 

2275 Note: not using @require_json_body because the JSON body is optional 

2276 here — the endpoint works with or without a payload (defaults to 30 days). 

2277 """ 

2278 try: 

2279 from ...web_search_engines.rate_limiting import get_tracker 

2280 

2281 data = request.get_json() if request.is_json else None 

2282 days = data.get("days", 30) if data is not None else 30 

2283 

2284 tracker = get_tracker() 

2285 tracker.cleanup_old_data(days) 

2286 

2287 return jsonify( 

2288 {"message": f"Cleaned up rate limiting data older than {days} days"} 

2289 ) 

2290 

2291 except Exception: 

2292 logger.exception("Error cleaning up rate limiting data") 

2293 return jsonify({"error": "An internal error occurred"}), 500 

2294 

2295 

2296@settings_bp.route("/api/bulk", methods=["GET"]) 

2297@login_required 

2298def get_bulk_settings(): 

2299 """Get multiple settings at once for performance.""" 

2300 try: 

2301 # Get requested settings from query parameters 

2302 requested = request.args.getlist("keys[]") 

2303 if not requested: 

2304 # Default to common settings if none specified 

2305 requested = [ 

2306 "llm.provider", 

2307 "llm.model", 

2308 "search.tool", 

2309 "search.iterations", 

2310 "search.questions_per_iteration", 

2311 "search.search_strategy", 

2312 "benchmark.evaluation.provider", 

2313 "benchmark.evaluation.model", 

2314 "benchmark.evaluation.temperature", 

2315 "benchmark.evaluation.endpoint_url", 

2316 ] 

2317 

2318 # Fetch all settings at once 

2319 result = {} 

2320 for key in requested: 

2321 try: 

2322 value = _get_setting_from_session(key) 

2323 result[key] = {"value": value, "exists": value is not None} 

2324 except Exception: 

2325 logger.warning(f"Error getting setting {key}") 

2326 result[key] = { 

2327 "value": None, 

2328 "exists": False, 

2329 "error": "Failed to retrieve setting", 

2330 } 

2331 

2332 return jsonify({"success": True, "settings": result}) 

2333 

2334 except Exception: 

2335 logger.exception("Error getting bulk settings") 

2336 return jsonify( 

2337 {"success": False, "error": "An internal error occurred"} 

2338 ), 500 

2339 

2340 

2341@settings_bp.route("/api/data-location", methods=["GET"]) 

2342@login_required 

2343def api_get_data_location(): 

2344 """Get information about data storage location and security""" 

2345 try: 

2346 # Get the data directory path 

2347 data_dir = get_data_directory() 

2348 # Get the encrypted databases path 

2349 encrypted_db_path = get_encrypted_database_path() 

2350 

2351 # Check if LDR_DATA_DIR environment variable is set 

2352 from local_deep_research.settings.manager import SettingsManager 

2353 

2354 settings_manager = SettingsManager() 

2355 custom_data_dir = settings_manager.get_setting("bootstrap.data_dir") 

2356 

2357 # Get platform-specific default location info 

2358 platform_info = { 

2359 "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research", 

2360 "macOS": "~/Library/Application Support/local-deep-research", 

2361 "Linux": "~/.local/share/local-deep-research", 

2362 } 

2363 

2364 # Current platform 

2365 current_platform = platform.system() 

2366 if current_platform == "Darwin": 

2367 current_platform = "macOS" 

2368 

2369 # Get SQLCipher settings from environment 

2370 from ...database.sqlcipher_utils import get_sqlcipher_settings 

2371 

2372 # Debug logging 

2373 logger.info(f"db_manager type: {type(db_manager)}") 

2374 logger.info( 

2375 f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}" 

2376 ) 

2377 

2378 cipher_settings = ( 

2379 get_sqlcipher_settings() if db_manager.has_encryption else {} 

2380 ) 

2381 

2382 return jsonify( 

2383 { 

2384 "data_directory": str(data_dir), 

2385 "database_path": str(encrypted_db_path), 

2386 "encrypted_database_path": str(encrypted_db_path), 

2387 "is_custom": custom_data_dir is not None, 

2388 "custom_env_var": "LDR_DATA_DIR", 

2389 "custom_env_value": custom_data_dir, 

2390 "platform": current_platform, 

2391 "platform_default": platform_info.get( 

2392 current_platform, str(data_dir) 

2393 ), 

2394 "platform_info": platform_info, 

2395 "security_notice": { 

2396 "encrypted": db_manager.has_encryption, 

2397 "warning": "All data including API keys stored in the database are securely encrypted." 

2398 if db_manager.has_encryption 

2399 else "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set.", 

2400 "recommendation": "Your data is protected with database encryption." 

2401 if db_manager.has_encryption 

2402 else "Consider using environment variables for sensitive API keys instead of storing them in the database.", 

2403 }, 

2404 "encryption_settings": cipher_settings, 

2405 } 

2406 ) 

2407 

2408 except Exception: 

2409 logger.exception("Error getting data location information") 

2410 return jsonify({"error": "Failed to retrieve data location"}), 500 

2411 

2412 

2413@settings_bp.route("/api/notifications/test-url", methods=["POST"]) 

2414@login_required 

2415def api_test_notification_url(): 

2416 """ 

2417 Test a notification service URL. 

2418 

2419 This endpoint creates a temporary NotificationService instance to test 

2420 the provided URL. No database session or password is required because: 

2421 - The service URL is provided directly in the request body 

2422 - Test notifications use a temporary Apprise instance 

2423 - No user settings or database queries are performed 

2424 

2425 Security note: Rate limiting is not applied here because users need to 

2426 test URLs when configuring notifications. Abuse is mitigated by the 

2427 @login_required decorator and the fact that users can only spam their 

2428 own notification services. 

2429 """ 

2430 try: 

2431 from ...notifications.service import NotificationService 

2432 from ...settings.env_registry import get_env_setting 

2433 

2434 data = request.get_json() 

2435 if not data or "service_url" not in data: 

2436 return jsonify( 

2437 {"success": False, "error": "service_url is required"} 

2438 ), 400 

2439 

2440 service_url = data["service_url"] 

2441 

2442 # Create notification service instance and test the URL. 

2443 # Gate by the env-only master switch so the test endpoint cannot 

2444 # bypass the operator's risk-acceptance decision (see SECURITY.md 

2445 # "Notification Webhook SSRF"). 

2446 notification_service = NotificationService( 

2447 allow_private_ips=bool( 

2448 get_env_setting("notifications.allow_private_ips", False) 

2449 ), 

2450 outbound_allowed=bool( 

2451 get_env_setting("notifications.allow_outbound", False) 

2452 ), 

2453 ) 

2454 result = notification_service.test_service(service_url) 

2455 

2456 # Only return expected fields to prevent information leakage 

2457 safe_response = { 

2458 "success": result.get("success", False), 

2459 "message": result.get("message", ""), 

2460 "error": result.get("error", ""), 

2461 } 

2462 return jsonify(safe_response) 

2463 

2464 except Exception: 

2465 logger.exception("Error testing notification URL") 

2466 return jsonify( 

2467 { 

2468 "success": False, 

2469 "error": "Failed to test notification service. Check logs for details.", 

2470 } 

2471 ), 500