Coverage for src / local_deep_research / web / routes / settings_routes.py: 85%

932 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Settings Routes Module 

3 

4This module handles all settings-related HTTP endpoints for the application. 

5 

6CHECKBOX HANDLING PATTERN: 

7-------------------------- 

8This module supports TWO submission modes to handle checkboxes correctly: 

9 

10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)** 

11- JavaScript intercepts form submission with e.preventDefault() 

12- Checkbox values read directly from DOM via checkbox.checked 

13- Data sent as JSON: {"setting.key": true/false} 

14- Hidden fallback inputs are managed but NOT used in this mode 

15- Provides better UX with instant feedback and validation 

16 

17**MODE 2: Traditional POST Submission (Fallback - /save_settings)** 

18- Used when JavaScript is disabled (accessibility/no-JS environments) 

19- Browser submits form data naturally via request.form 

20- Hidden fallback pattern CRITICAL here: 

21 * Checked checkbox: Submits checkbox value, hidden input disabled 

22 * Unchecked checkbox: Submits hidden input value "false" 

23- Ensures unchecked checkboxes are captured (HTML limitation workaround) 

24 

25**Implementation Details:** 

261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID 

272. checkbox_handler.js manages hidden input disabled state 

283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240) 

294. POST mode: Flask reads request.form including enabled hidden inputs 

305. Both modes use convert_setting_value() for consistent boolean conversion 

31 

32**Why Both Patterns?** 

33- AJAX: Better UX, immediate validation, no page reload 

34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation 

35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode 

36 

37This dual-mode approach ensures the app works for all users while providing 

38optimal experience when JavaScript is available. 

39""" 

40 

41import platform 

42from typing import Any, Optional, Tuple 

43from datetime import UTC, datetime, timedelta, timezone 

44 

45import requests 

46from flask import ( 

47 Blueprint, 

48 flash, 

49 jsonify, 

50 redirect, 

51 request, 

52 session, 

53 url_for, 

54) 

55from flask_wtf.csrf import generate_csrf 

56from loguru import logger 

57 

58from ...config.constants import DEFAULT_OLLAMA_URL 

59from ...llm.providers.base import normalize_provider 

60from ...config.paths import get_data_directory, get_encrypted_database_path 

61from ...database.models import Setting, SettingType 

62from ...database.session_context import get_user_db_session 

63from ...database.encrypted_db import db_manager 

64from ...utilities.db_utils import get_settings_manager 

65from ...utilities.url_utils import normalize_url 

66from ...security.decorators import require_json_body 

67from ..auth.decorators import login_required 

68from ...security.rate_limiter import settings_limit 

69from ...settings.manager import get_typed_setting_value, parse_boolean 

70from ..services.settings_service import ( 

71 create_or_update_setting, 

72 invalidate_settings_caches, 

73 set_setting, 

74) 

75from ..utils.route_decorators import with_user_session 

76from ..utils.templates import render_template_with_defaults 

77 

78 

79from ...security import safe_get 

80from ..warning_checks import calculate_warnings 

81 

82# Create a Blueprint for settings 

83settings_bp = Blueprint("settings", __name__, url_prefix="/settings") 

84 

85# NOTE: Routes use session["username"] (not .get()) intentionally. 

86# @login_required guarantees the key exists; direct access fails fast 

87# if the decorator is ever removed. 

88 

89# Settings with dynamically populated options (excluded from validation) 

90DYNAMIC_SETTINGS = ["llm.provider", "llm.model", "search.tool"] 

91 

92 

93def _get_setting_from_session(key: str, default=None): 

94 """Helper to get a setting using the current session context.""" 

95 username = session.get("username") 

96 with get_user_db_session(username) as db_session: 

97 if db_session: 

98 settings_manager = get_settings_manager(db_session, username) 

99 return settings_manager.get_setting(key, default) 

100 return default 

101 

102 

103def validate_setting( 

104 setting: Setting, value: Any 

105) -> Tuple[bool, Optional[str]]: 

106 """ 

107 Validate a setting value based on its type and constraints. 

108 

109 Args: 

110 setting: The Setting object to validate against 

111 value: The value to validate 

112 

113 Returns: 

114 Tuple of (is_valid, error_message) 

115 """ 

116 # Convert value to appropriate type first using SettingsManager's logic 

117 value = get_typed_setting_value( 

118 key=str(setting.key), 

119 value=value, 

120 ui_element=str(setting.ui_element), 

121 default=None, 

122 check_env=False, 

123 ) 

124 

125 # Validate based on UI element type 

126 if setting.ui_element == "checkbox": 

127 # After conversion, should be boolean 

128 if not isinstance(value, bool): 

129 return False, "Value must be a boolean" 

130 

131 elif setting.ui_element in ("number", "slider", "range"): 

132 # After conversion, should be numeric 

133 if not isinstance(value, (int, float)): 

134 return False, "Value must be a number" 

135 

136 # Check min/max constraints if defined 

137 if setting.min_value is not None and value < setting.min_value: 

138 return False, f"Value must be at least {setting.min_value}" 

139 if setting.max_value is not None and value > setting.max_value: 

140 return False, f"Value must be at most {setting.max_value}" 

141 

142 elif setting.ui_element == "select": 

143 # Check if value is in the allowed options 

144 if setting.options: 

145 # Skip options validation for dynamically populated dropdowns 

146 if setting.key not in DYNAMIC_SETTINGS: 

147 allowed_values = [ 

148 opt.get("value") if isinstance(opt, dict) else opt 

149 for opt in list(setting.options) # type: ignore[arg-type] 

150 ] 

151 if value not in allowed_values: 

152 return ( 

153 False, 

154 f"Value must be one of: {', '.join(str(v) for v in allowed_values)}", 

155 ) 

156 

157 # All checks passed 

158 return True, None 

159 

160 

161def coerce_setting_for_write(key: str, value: Any, ui_element: str) -> Any: 

162 """Coerce an incoming value to the correct type before writing to the DB. 

163 

164 All web routes that save settings should use this function to ensure 

165 consistent type conversion. 

166 

167 No JSON pre-parsing (``json.loads``) is needed here because: 

168 - ``get_typed_setting_value`` already parses JSON strings internally 

169 via ``_parse_json_value`` (for ``ui_element="json"``) and 

170 ``_parse_multiselect`` (for ``ui_element="multiselect"``). 

171 - For JSON API endpoints, ``request.get_json()`` already delivers 

172 dicts/lists as native Python objects. 

173 - For ``ui_element="text"``, pre-parsing would corrupt data: a JSON 

174 string like ``'{"k": "v"}'`` would become a dict, then ``str()`` 

175 would produce ``"{'k': 'v'}"`` (Python repr, not valid JSON). 

176 """ 

177 # check_env=False: we are persisting a user-supplied value, not reading 

178 # from an environment variable override. check_env=True (the default) 

179 # would silently replace the user's value with an env var, which is 

180 # incorrect on the write path. 

181 return get_typed_setting_value( 

182 key=key, 

183 value=value, 

184 ui_element=ui_element, 

185 default=None, 

186 check_env=False, 

187 ) 

188 

189 

190@settings_bp.route("/", methods=["GET"]) 

191@login_required 

192def settings_page(): 

193 """Main settings dashboard with links to specialized config pages""" 

194 return render_template_with_defaults("settings_dashboard.html") 

195 

196 

197@settings_bp.route("/save_all_settings", methods=["POST"]) 

198@login_required 

199@settings_limit 

200@require_json_body( 

201 error_format="status", error_message="No settings data provided" 

202) 

203@with_user_session() 

204def save_all_settings(db_session=None, settings_manager=None): 

205 """Handle saving all settings at once from the unified settings page""" 

206 try: 

207 # Process JSON data 

208 form_data = request.get_json() 

209 if not form_data: 

210 return ( 

211 jsonify( 

212 { 

213 "status": "error", 

214 "message": "No settings data provided", 

215 } 

216 ), 

217 400, 

218 ) 

219 

220 # Track validation errors 

221 validation_errors = [] 

222 settings_by_type: dict[str, Any] = {} 

223 

224 # Track changes for logging 

225 updated_settings = [] 

226 created_settings = [] 

227 

228 # Store original values for better messaging 

229 original_values = {} 

230 

231 # Fetch all settings once to avoid N+1 query problem 

232 all_db_settings = { 

233 setting.key: setting for setting in db_session.query(Setting).all() 

234 } 

235 

236 # Filter out non-editable settings 

237 non_editable_keys = [ 

238 key 

239 for key in form_data.keys() 

240 if key in all_db_settings and not all_db_settings[key].editable 

241 ] 

242 if non_editable_keys: 

243 logger.warning( 

244 f"Skipping non-editable settings: {non_editable_keys}" 

245 ) 

246 for key in non_editable_keys: 

247 del form_data[key] 

248 

249 # Update each setting 

250 for key, value in form_data.items(): 

251 # Skip corrupted keys or empty strings as keys 

252 if not key or not isinstance(key, str) or key.strip() == "": 

253 continue 

254 

255 # Get the setting metadata from pre-fetched dict 

256 current_setting = all_db_settings.get(key) 

257 

258 # EARLY VALIDATION: Convert checkbox values BEFORE any other processing 

259 # This prevents incorrect triggering of corrupted value detection 

260 if current_setting and current_setting.ui_element == "checkbox": 

261 if not isinstance(value, bool): 

262 logger.debug( 

263 f"Converting checkbox {key} from {type(value).__name__} to bool: {value}" 

264 ) 

265 value = parse_boolean(value) 

266 form_data[key] = ( 

267 value # Update the form_data with converted value 

268 ) 

269 

270 # Store original value for messaging 

271 if current_setting: 

272 original_values[key] = current_setting.value 

273 

274 # Determine setting type and category 

275 if key.startswith("llm."): 

276 setting_type = SettingType.LLM 

277 category = "llm_general" 

278 if ( 

279 "temperature" in key 

280 or "max_tokens" in key 

281 or "batch" in key 

282 or "layers" in key 

283 ): 

284 category = "llm_parameters" 

285 elif key.startswith("search."): 

286 setting_type = SettingType.SEARCH 

287 category = "search_general" 

288 if ( 

289 "iterations" in key 

290 or "results" in key 

291 or "region" in key 

292 or "questions" in key 

293 or "section" in key 

294 ): 

295 category = "search_parameters" 

296 elif key.startswith("report."): 

297 setting_type = SettingType.REPORT 

298 category = "report_parameters" 

299 elif key.startswith("database."): 

300 setting_type = SettingType.DATABASE 

301 category = "database_parameters" 

302 elif key.startswith("app."): 

303 setting_type = SettingType.APP 

304 category = "app_interface" 

305 else: 

306 setting_type = None 

307 category = None 

308 

309 # Special handling for corrupted or empty values 

310 if value == "[object Object]" or ( 

311 isinstance(value, str) 

312 and value.strip() in ["{}", "[]", "{", "["] 

313 ): 

314 if key.startswith("report."): 

315 value = {} 

316 else: 

317 # Use default or null for other types 

318 if key == "llm.model": 

319 value = "gemma3:12b" 

320 elif key == "llm.provider": 

321 value = "ollama" 

322 elif key == "search.tool": 

323 value = "auto" 

324 elif key in ["app.theme", "app.default_theme"]: 

325 value = "dark" 

326 else: 

327 value = None 

328 

329 logger.warning(f"Corrected corrupted value for {key}: {value}") 

330 # NOTE: No JSON pre-parsing is done here. After the 

331 # corruption replacement above, values are Python dicts 

332 # (e.g. {}), hardcoded strings, or None — none are JSON 

333 # strings that need parsing. Type conversion below via 

334 # coerce_setting_for_write() handles everything; that 

335 # function delegates to get_typed_setting_value() which 

336 # already parses JSON internally for "json" and 

337 # "multiselect" ui_elements. 

338 

339 if current_setting: 

340 # Coerce to correct Python type (e.g. str "5" → int 5 

341 # for number settings, str "true" → bool for checkboxes). 

342 converted_value = coerce_setting_for_write( 

343 key=current_setting.key, 

344 value=value, 

345 ui_element=current_setting.ui_element, 

346 ) 

347 

348 # Validate the setting 

349 is_valid, error_message = validate_setting( 

350 current_setting, converted_value 

351 ) 

352 

353 if is_valid: 

354 # Save the converted setting using the same session 

355 success = set_setting( 

356 key, converted_value, db_session=db_session 

357 ) 

358 if success: 358 ↛ 362line 358 didn't jump to line 362 because the condition on line 358 was always true

359 updated_settings.append(key) 

360 

361 # Track settings by type for exporting 

362 if current_setting.type not in settings_by_type: 

363 settings_by_type[current_setting.type] = [] 

364 settings_by_type[current_setting.type].append( 

365 current_setting 

366 ) 

367 else: 

368 # Add to validation errors 

369 validation_errors.append( 

370 { 

371 "key": key, 

372 "name": current_setting.name, 

373 "error": error_message, 

374 } 

375 ) 

376 else: 

377 # Create a new setting 

378 new_setting = { 

379 "key": key, 

380 "value": value, 

381 "type": setting_type.value.lower() 

382 if setting_type is not None 

383 else "app", 

384 "name": key.split(".")[-1].replace("_", " ").title(), 

385 "description": f"Setting for {key}", 

386 "category": category, 

387 "ui_element": "text", # Default UI element 

388 } 

389 

390 # Determine better UI element based on value type 

391 if isinstance(value, bool): 

392 new_setting["ui_element"] = "checkbox" 

393 elif isinstance(value, (int, float)) and not isinstance( 

394 value, bool 

395 ): 

396 new_setting["ui_element"] = "number" 

397 elif isinstance(value, (dict, list)): 

398 new_setting["ui_element"] = "textarea" 

399 

400 # Create the setting 

401 db_setting = create_or_update_setting( 

402 new_setting, db_session=db_session 

403 ) 

404 

405 if db_setting: 

406 created_settings.append(key) 

407 # Track settings by type for exporting 

408 if db_setting.type not in settings_by_type: 408 ↛ 410line 408 didn't jump to line 410 because the condition on line 408 was always true

409 settings_by_type[db_setting.type] = [] 

410 settings_by_type[db_setting.type].append(db_setting) 

411 else: 

412 validation_errors.append( 

413 { 

414 "key": key, 

415 "name": new_setting["name"], 

416 "error": "Failed to create setting", 

417 } 

418 ) 

419 

420 # Report validation errors if any 

421 if validation_errors: 

422 return ( 

423 jsonify( 

424 { 

425 "status": "error", 

426 "message": "Validation errors", 

427 "errors": validation_errors, 

428 } 

429 ), 

430 400, 

431 ) 

432 

433 # Get all settings to return to the client for proper state update 

434 all_settings = {} 

435 for setting in db_session.query(Setting).all(): 

436 # Convert enum to string if present 

437 setting_type = setting.type 

438 if hasattr(setting_type, "value"): 

439 setting_type = setting_type.value 

440 

441 all_settings[setting.key] = { 

442 "value": setting.value, 

443 "name": setting.name, 

444 "description": setting.description, 

445 "type": setting_type, 

446 "category": setting.category, 

447 "ui_element": setting.ui_element, 

448 "editable": setting.editable, 

449 "options": setting.options, 

450 "visible": setting.visible, 

451 "min_value": setting.min_value, 

452 "max_value": setting.max_value, 

453 "step": setting.step, 

454 } 

455 

456 # Customize the success message based on what changed 

457 success_message = "" 

458 if len(updated_settings) == 1: 

459 # For a single update, provide more specific info about what changed 

460 key = updated_settings[0] 

461 # Reuse the already-fetched setting from our pre-fetched dict 

462 updated_setting = all_db_settings.get(key) 

463 name = ( 

464 updated_setting.name 

465 if updated_setting 

466 else key.split(".")[-1].replace("_", " ").title() 

467 ) 

468 

469 # Format the message 

470 if key in original_values: 470 ↛ 484line 470 didn't jump to line 484 because the condition on line 470 was always true

471 new_value = updated_setting.value if updated_setting else None 

472 

473 # If it's a boolean, use "enabled/disabled" language 

474 if isinstance(new_value, bool): 

475 state = "enabled" if new_value else "disabled" 

476 success_message = f"{name} {state}" 

477 else: 

478 # For non-boolean values 

479 if isinstance(new_value, (dict, list)): 

480 success_message = f"{name} updated" 

481 else: 

482 success_message = f"{name} updated" 

483 else: 

484 success_message = f"{name} updated" 

485 else: 

486 # Multiple settings or generic message 

487 success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)" 

488 

489 # Check if any warning-affecting settings were changed and include warnings 

490 response_data = { 

491 "status": "success", 

492 "message": success_message, 

493 "updated": updated_settings, 

494 "created": created_settings, 

495 "settings": all_settings, 

496 } 

497 

498 warning_affecting_keys = [ 

499 "llm.provider", 

500 "search.tool", 

501 "search.iterations", 

502 "search.questions_per_iteration", 

503 "llm.local_context_window_size", 

504 "llm.context_window_unrestricted", 

505 "llm.context_window_size", 

506 ] 

507 

508 # Check if any warning-affecting settings were changed 

509 if any( 

510 key in warning_affecting_keys 

511 for key in updated_settings + created_settings 

512 ): 

513 warnings = calculate_warnings() 

514 response_data["warnings"] = warnings 

515 logger.info( 

516 f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings" 

517 ) 

518 

519 invalidate_settings_caches(session["username"]) 

520 return jsonify(response_data) 

521 

522 except Exception: 

523 logger.exception("Error saving settings") 

524 return ( 

525 jsonify( 

526 { 

527 "status": "error", 

528 "message": "An internal error occurred while saving settings.", 

529 } 

530 ), 

531 500, 

532 ) 

533 

534 

535@settings_bp.route("/reset_to_defaults", methods=["POST"]) 

536@login_required 

537@settings_limit 

538@with_user_session() 

539def reset_to_defaults(db_session=None, settings_manager=None): 

540 """Reset all settings to their default values""" 

541 try: 

542 settings_manager.load_from_defaults_file() 

543 

544 logger.info("Successfully imported settings from default files") 

545 

546 except Exception: 

547 logger.exception("Error importing default settings") 

548 return jsonify( 

549 { 

550 "status": "error", 

551 "message": "Failed to reset settings to defaults", 

552 } 

553 ), 500 

554 

555 invalidate_settings_caches(session["username"]) 

556 return jsonify( 

557 { 

558 "status": "success", 

559 "message": "All settings have been reset to default values", 

560 } 

561 ) 

562 

563 

564@settings_bp.route("/save_settings", methods=["POST"]) 

565@login_required 

566@settings_limit 

567@with_user_session() 

568def save_settings(db_session=None, settings_manager=None): 

569 """Save all settings from the form using POST method - fallback when JavaScript is disabled""" 

570 try: 

571 # Get form data 

572 form_data = request.form.to_dict() 

573 

574 # Remove CSRF token from the data 

575 form_data.pop("csrf_token", None) 

576 

577 updated_count = 0 

578 failed_count = 0 

579 

580 # Fetch all settings once to avoid N+1 query problem 

581 all_db_settings = { 

582 setting.key: setting for setting in db_session.query(Setting).all() 

583 } 

584 

585 # Filter out non-editable settings 

586 non_editable_keys = [ 

587 key 

588 for key in form_data.keys() 

589 if key in all_db_settings and not all_db_settings[key].editable 

590 ] 

591 if non_editable_keys: 

592 logger.warning( 

593 f"Skipping non-editable settings: {non_editable_keys}" 

594 ) 

595 for key in non_editable_keys: 

596 del form_data[key] 

597 

598 # Process each setting 

599 for key, value in form_data.items(): 

600 try: 

601 # Get the setting from pre-fetched dict 

602 db_setting = all_db_settings.get(key) 

603 

604 # Coerce form POST string to correct Python type. 

605 if db_setting: 

606 value = coerce_setting_for_write( 

607 key=db_setting.key, 

608 value=value, 

609 ui_element=db_setting.ui_element, 

610 ) 

611 

612 # Save the setting 

613 if settings_manager.set_setting(key, value, commit=False): 

614 updated_count += 1 

615 else: 

616 failed_count += 1 

617 logger.warning(f"Failed to save setting {key}") 

618 

619 except Exception: 

620 logger.exception(f"Error saving setting {key}") 

621 failed_count += 1 

622 

623 # Commit all changes at once 

624 try: 

625 db_session.commit() 

626 

627 flash( 

628 f"Settings saved successfully! Updated {updated_count} settings.", 

629 "success", 

630 ) 

631 if failed_count > 0: 

632 flash( 

633 f"Warning: {failed_count} settings failed to save.", 

634 "warning", 

635 ) 

636 invalidate_settings_caches(session["username"]) 

637 

638 except Exception: 

639 db_session.rollback() 

640 logger.exception("Failed to commit settings") 

641 flash("Error saving settings. Please try again.", "error") 

642 

643 return redirect(url_for("settings.settings_page")) 

644 

645 except Exception: 

646 logger.exception("Error in save_settings") 

647 flash("An internal error occurred while saving settings.", "error") 

648 return redirect(url_for("settings.settings_page")) 

649 

650 

651# API Routes 

652@settings_bp.route("/api", methods=["GET"]) 

653@login_required 

654@with_user_session() 

655def api_get_all_settings(db_session=None, settings_manager=None): 

656 """Get all settings""" 

657 try: 

658 # Get query parameters 

659 category = request.args.get("category") 

660 

661 # Get settings 

662 settings = settings_manager.get_all_settings() 

663 

664 # Filter by category if requested 

665 if category: 

666 # Need to get all setting details to check category 

667 db_settings = db_session.query(Setting).all() 

668 category_keys = [ 

669 s.key for s in db_settings if s.category == category 

670 ] 

671 

672 # Filter settings by keys 

673 settings = { 

674 key: value 

675 for key, value in settings.items() 

676 if key in category_keys 

677 } 

678 

679 return jsonify({"status": "success", "settings": settings}) 

680 except Exception: 

681 logger.exception("Error getting settings") 

682 return jsonify({"error": "Failed to retrieve settings"}), 500 

683 

684 

685@settings_bp.route("/api/<path:key>", methods=["GET"]) 

686@login_required 

687@with_user_session() 

688def api_get_db_setting(key, db_session=None, settings_manager=None): 

689 """Get a specific setting by key from DB, falling back to defaults.""" 

690 try: 

691 # Get setting from database using the same session 

692 db_setting = ( 

693 db_session.query(Setting).filter(Setting.key == key).first() 

694 ) 

695 

696 if db_setting: 

697 # Return full setting details from DB 

698 setting_data = { 

699 "key": db_setting.key, 

700 "value": db_setting.value, 

701 "type": db_setting.type 

702 if isinstance(db_setting.type, str) 

703 else db_setting.type.value, 

704 "name": db_setting.name, 

705 "description": db_setting.description, 

706 "category": db_setting.category, 

707 "ui_element": db_setting.ui_element, 

708 "options": db_setting.options, 

709 "min_value": db_setting.min_value, 

710 "max_value": db_setting.max_value, 

711 "step": db_setting.step, 

712 "visible": db_setting.visible, 

713 "editable": db_setting.editable, 

714 } 

715 return jsonify(setting_data) 

716 

717 # Not in DB — check defaults so this endpoint is consistent 

718 # with GET /settings/api which includes default settings 

719 default_meta = settings_manager.default_settings.get(key) 

720 if default_meta: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true

721 setting_data = { 

722 "key": key, 

723 "value": default_meta.get("value"), 

724 "type": default_meta.get("type", "APP"), 

725 "name": default_meta.get("name", key), 

726 "description": default_meta.get("description"), 

727 "category": default_meta.get("category"), 

728 "ui_element": default_meta.get("ui_element", "text"), 

729 "options": default_meta.get("options"), 

730 "min_value": default_meta.get("min_value"), 

731 "max_value": default_meta.get("max_value"), 

732 "step": default_meta.get("step"), 

733 "visible": default_meta.get("visible", True), 

734 "editable": default_meta.get("editable", True), 

735 } 

736 return jsonify(setting_data) 

737 

738 return jsonify({"error": f"Setting not found: {key}"}), 404 

739 except Exception: 

740 logger.exception(f"Error getting setting {key}") 

741 return jsonify({"error": "Failed to retrieve settings"}), 500 

742 

743 

744@settings_bp.route("/api/<path:key>", methods=["PUT"]) 

745@login_required 

746@require_json_body(error_message="No data provided") 

747@with_user_session(include_settings_manager=False) 

748def api_update_setting(key, db_session=None): 

749 """Update a setting""" 

750 try: 

751 # Get request data 

752 data = request.get_json() 

753 value = data.get("value") 

754 if value is None: 

755 return jsonify({"error": "No value provided"}), 400 

756 

757 # Check if setting exists 

758 db_setting = ( 

759 db_session.query(Setting).filter(Setting.key == key).first() 

760 ) 

761 

762 if db_setting: 

763 # Check if setting is editable 

764 if not db_setting.editable: 

765 return jsonify({"error": f"Setting {key} is not editable"}), 403 

766 

767 # Coerce to correct Python type before saving. 

768 # Without this, values from JSON API requests are stored 

769 # as-is (e.g. string "5" instead of int 5 for number 

770 # settings, string "true" instead of bool for checkboxes). 

771 value = coerce_setting_for_write( 

772 key=db_setting.key, 

773 value=value, 

774 ui_element=db_setting.ui_element, 

775 ) 

776 

777 # Validate the setting (matches save_all_settings pattern) 

778 is_valid, error_message = validate_setting(db_setting, value) 

779 if not is_valid: 

780 logger.warning( 

781 f"Validation failed for setting {key}: {error_message}" 

782 ) 

783 return jsonify( 

784 {"error": f"Invalid value for setting {key}"} 

785 ), 400 

786 

787 # Update setting 

788 # Pass the db_session to avoid session lookup issues 

789 success = set_setting(key, value, db_session=db_session) 

790 if success: 

791 response_data: dict[str, Any] = { 

792 "message": f"Setting {key} updated successfully" 

793 } 

794 

795 # If this is a key that affects warnings, include warning calculations 

796 warning_affecting_keys = [ 

797 "llm.provider", 

798 "search.tool", 

799 "search.iterations", 

800 "search.questions_per_iteration", 

801 "llm.local_context_window_size", 

802 "llm.context_window_unrestricted", 

803 "llm.context_window_size", 

804 ] 

805 

806 if key in warning_affecting_keys: 

807 warnings = calculate_warnings() 

808 response_data["warnings"] = warnings 

809 logger.debug( 

810 f"Setting {key} changed to {value}, calculated {len(warnings)} warnings" 

811 ) 

812 

813 invalidate_settings_caches(session["username"]) 

814 return jsonify(response_data) 

815 return jsonify({"error": f"Failed to update setting {key}"}), 500 

816 # Create new setting with default metadata 

817 setting_dict = { 

818 "key": key, 

819 "value": value, 

820 "name": key.split(".")[-1].replace("_", " ").title(), 

821 "description": f"Setting for {key}", 

822 } 

823 

824 # Add additional metadata if provided 

825 for field in [ 

826 "type", 

827 "name", 

828 "description", 

829 "category", 

830 "ui_element", 

831 "options", 

832 "min_value", 

833 "max_value", 

834 "step", 

835 "visible", 

836 "editable", 

837 ]: 

838 if field in data: 

839 setting_dict[field] = data[field] 

840 

841 # Create setting 

842 db_setting = create_or_update_setting( 

843 setting_dict, db_session=db_session 

844 ) 

845 

846 if db_setting: 

847 invalidate_settings_caches(session["username"]) 

848 return ( 

849 jsonify( 

850 { 

851 "message": f"Setting {key} created successfully", 

852 "setting": { 

853 "key": db_setting.key, 

854 "value": db_setting.value, 

855 "type": db_setting.type.value, 

856 "name": db_setting.name, 

857 }, 

858 } 

859 ), 

860 201, 

861 ) 

862 return jsonify({"error": f"Failed to create setting {key}"}), 500 

863 except Exception: 

864 logger.exception(f"Error updating setting {key}") 

865 return jsonify({"error": "Failed to update setting"}), 500 

866 

867 

868@settings_bp.route("/api/<path:key>", methods=["DELETE"]) 

869@login_required 

870@with_user_session() 

871def api_delete_setting(key, db_session=None, settings_manager=None): 

872 """Delete a setting""" 

873 try: 

874 # Check if setting exists 

875 db_setting = ( 

876 db_session.query(Setting).filter(Setting.key == key).first() 

877 ) 

878 if not db_setting: 

879 return jsonify({"error": f"Setting not found: {key}"}), 404 

880 

881 # Check if setting is editable 

882 if not db_setting.editable: 

883 return jsonify({"error": f"Setting {key} is not editable"}), 403 

884 

885 # Delete setting 

886 success = settings_manager.delete_setting(key) 

887 if success: 

888 invalidate_settings_caches(session["username"]) 

889 return jsonify({"message": f"Setting {key} deleted successfully"}) 

890 return jsonify({"error": f"Failed to delete setting {key}"}), 500 

891 except Exception: 

892 logger.exception(f"Error deleting setting {key}") 

893 return jsonify({"error": "Failed to delete setting"}), 500 

894 

895 

896@settings_bp.route("/api/import", methods=["POST"]) 

897@login_required 

898@settings_limit 

899@with_user_session() 

900def api_import_settings(db_session=None, settings_manager=None): 

901 """Import settings from defaults file""" 

902 try: 

903 settings_manager.load_from_defaults_file() 

904 

905 invalidate_settings_caches(session["username"]) 

906 return jsonify({"message": "Settings imported successfully"}) 

907 except Exception: 

908 logger.exception("Error importing settings") 

909 return jsonify({"error": "Failed to import settings"}), 500 

910 

911 

912@settings_bp.route("/api/categories", methods=["GET"]) 

913@login_required 

914@with_user_session(include_settings_manager=False) 

915def api_get_categories(db_session=None): 

916 """Get all setting categories""" 

917 try: 

918 # Get all distinct categories 

919 categories = db_session.query(Setting.category).distinct().all() 

920 category_list = [c[0] for c in categories if c[0] is not None] 

921 

922 return jsonify({"categories": category_list}) 

923 except Exception: 

924 logger.exception("Error getting categories") 

925 return jsonify({"error": "Failed to retrieve settings"}), 500 

926 

927 

928@settings_bp.route("/api/types", methods=["GET"]) 

929@login_required 

930def api_get_types(): 

931 """Get all setting types""" 

932 try: 

933 # Get all setting types 

934 types = [t.value for t in SettingType] 

935 return jsonify({"types": types}) 

936 except Exception: 

937 logger.exception("Error getting types") 

938 return jsonify({"error": "Failed to retrieve settings"}), 500 

939 

940 

941@settings_bp.route("/api/ui_elements", methods=["GET"]) 

942@login_required 

943def api_get_ui_elements(): 

944 """Get all UI element types""" 

945 try: 

946 # Define supported UI element types 

947 ui_elements = [ 

948 "text", 

949 "select", 

950 "checkbox", 

951 "slider", 

952 "number", 

953 "textarea", 

954 "color", 

955 "date", 

956 "file", 

957 "password", 

958 ] 

959 

960 return jsonify({"ui_elements": ui_elements}) 

961 except Exception: 

962 logger.exception("Error getting UI elements") 

963 return jsonify({"error": "Failed to retrieve settings"}), 500 

964 

965 

966@settings_bp.route("/api/available-models", methods=["GET"]) 

967@login_required 

968def api_get_available_models(): 

969 """Get available LLM models from various providers""" 

970 try: 

971 from flask import request 

972 

973 from ...database.models import ProviderModel 

974 

975 # Check if force_refresh is requested 

976 force_refresh = ( 

977 request.args.get("force_refresh", "false").lower() == "true" 

978 ) 

979 

980 # Get all auto-discovered providers (show all so users can discover 

981 # and configure providers they haven't set up yet) 

982 from ...llm.providers import get_discovered_provider_options 

983 

984 provider_options = get_discovered_provider_options() 

985 

986 # Add remaining hardcoded providers (complex local providers not yet migrated) 

987 provider_options.extend( 

988 [ 

989 { 

990 "value": "LLAMACPP", 

991 "label": "Llama.cpp (Local GGUF files only)", 

992 }, 

993 ] 

994 ) 

995 

996 # Available models by provider 

997 providers: dict[str, Any] = {} 

998 

999 # Check database cache first (unless force_refresh is True) 

1000 if not force_refresh: 

1001 try: 

1002 # Define cache expiration (24 hours) 

1003 cache_expiry = datetime.now(UTC) - timedelta(hours=24) 

1004 

1005 # Get cached models from database 

1006 username = session["username"] 

1007 with get_user_db_session(username) as db_session: 

1008 cached_models = ( 

1009 db_session.query(ProviderModel) 

1010 .filter(ProviderModel.last_updated > cache_expiry) 

1011 .all() 

1012 ) 

1013 

1014 if cached_models: 1014 ↛ 1015line 1014 didn't jump to line 1015 because the condition on line 1014 was never true

1015 logger.info( 

1016 f"Found {len(cached_models)} cached models in database" 

1017 ) 

1018 

1019 # Group models by provider 

1020 for model in cached_models: 

1021 provider_key = ( 

1022 f"{normalize_provider(model.provider)}_models" 

1023 ) 

1024 if provider_key not in providers: 

1025 providers[provider_key] = [] 

1026 

1027 providers[provider_key].append( 

1028 { 

1029 "value": model.model_key, 

1030 "label": model.model_label, 

1031 "provider": model.provider.upper(), 

1032 } 

1033 ) 

1034 

1035 # If we have cached data for all providers, return it 

1036 if providers: 

1037 logger.info("Returning cached models from database") 

1038 return jsonify( 

1039 { 

1040 "provider_options": provider_options, 

1041 "providers": providers, 

1042 } 

1043 ) 

1044 

1045 except Exception: 

1046 logger.warning("Error reading cached models from database") 

1047 # Continue to fetch fresh data 

1048 

1049 # Try to get Ollama models 

1050 ollama_models = [] 

1051 try: 

1052 import json 

1053 import re 

1054 

1055 import requests 

1056 

1057 # Try to query the Ollama API directly 

1058 try: 

1059 logger.info("Attempting to connect to Ollama API") 

1060 

1061 raw_base_url = _get_setting_from_session( 

1062 "llm.ollama.url", DEFAULT_OLLAMA_URL 

1063 ) 

1064 base_url = ( 

1065 normalize_url(raw_base_url) 

1066 if raw_base_url 

1067 else DEFAULT_OLLAMA_URL 

1068 ) 

1069 

1070 ollama_response = safe_get( 

1071 f"{base_url}/api/tags", 

1072 timeout=5, 

1073 allow_localhost=True, 

1074 allow_private_ips=True, 

1075 ) 

1076 

1077 logger.debug( 

1078 f"Ollama API response: Status {ollama_response.status_code}" 

1079 ) 

1080 

1081 # Try to parse the response even if status code is not 200 to help with debugging 

1082 response_text = ollama_response.text 

1083 logger.debug( 

1084 f"Ollama API raw response: {response_text[:500]}..." 

1085 ) 

1086 

1087 if ollama_response.status_code == 200: 1087 ↛ 1157line 1087 didn't jump to line 1157 because the condition on line 1087 was always true

1088 try: 

1089 ollama_data = ollama_response.json() 

1090 logger.debug( 

1091 f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..." 

1092 ) 

1093 

1094 if "models" in ollama_data: 1094 ↛ 1124line 1094 didn't jump to line 1124 because the condition on line 1094 was always true

1095 # Format for newer Ollama API 

1096 logger.info( 

1097 f"Found {len(ollama_data.get('models', []))} models in newer Ollama API format" 

1098 ) 

1099 for model in ollama_data.get("models", []): 1099 ↛ 1101line 1099 didn't jump to line 1101 because the loop on line 1099 never started

1100 # Extract name correctly from the model object 

1101 name = model.get("name", "") 

1102 if name: 

1103 # Improved display name formatting 

1104 display_name = re.sub( 

1105 r"[:/]", " ", name 

1106 ).strip() 

1107 display_name = " ".join( 

1108 word.capitalize() 

1109 for word in display_name.split() 

1110 ) 

1111 # Create the model entry with value and label 

1112 ollama_models.append( 

1113 { 

1114 "value": name, # Original model name as value (for API calls) 

1115 "label": f"{display_name} (Ollama)", # Pretty name as label 

1116 "provider": "ollama", # Add provider field for consistency 

1117 } 

1118 ) 

1119 logger.debug( 

1120 f"Added Ollama model: {name} -> {display_name}" 

1121 ) 

1122 else: 

1123 # Format for older Ollama API 

1124 logger.info( 

1125 f"Found {len(ollama_data)} models in older Ollama API format" 

1126 ) 

1127 for model in ollama_data: 

1128 name = model.get("name", "") 

1129 if name: 

1130 # Improved display name formatting 

1131 display_name = re.sub( 

1132 r"[:/]", " ", name 

1133 ).strip() 

1134 display_name = " ".join( 

1135 word.capitalize() 

1136 for word in display_name.split() 

1137 ) 

1138 ollama_models.append( 

1139 { 

1140 "value": name, 

1141 "label": f"{display_name} (Ollama)", 

1142 "provider": "ollama", # Add provider field for consistency 

1143 } 

1144 ) 

1145 logger.debug( 

1146 f"Added Ollama model: {name} -> {display_name}" 

1147 ) 

1148 

1149 except json.JSONDecodeError as json_err: 

1150 logger.exception( 

1151 f"Failed to parse Ollama API response as JSON: {json_err}" 

1152 ) 

1153 raise ValueError( 

1154 f"Ollama API returned invalid JSON: {json_err}" 

1155 ) 

1156 else: 

1157 logger.warning( 

1158 f"Ollama API returned non-200 status code: {ollama_response.status_code}" 

1159 ) 

1160 raise ValueError( 

1161 f"Ollama API returned status code {ollama_response.status_code}" 

1162 ) 

1163 

1164 except requests.exceptions.RequestException: 

1165 logger.warning("Could not connect to Ollama API") 

1166 # No fallback models - just return empty list 

1167 logger.info("Ollama not available - no models to display") 

1168 ollama_models = [] 

1169 

1170 # Always set the ollama_models in providers, whether we got real or fallback models 

1171 providers["ollama_models"] = ollama_models 

1172 logger.info(f"Final Ollama models count: {len(ollama_models)}") 

1173 

1174 # Log some model names for debugging 

1175 if ollama_models: 1175 ↛ 1176line 1175 didn't jump to line 1176 because the condition on line 1175 was never true

1176 model_names = [m["value"] for m in ollama_models[:5]] 

1177 logger.info(f"Sample Ollama models: {', '.join(model_names)}") 

1178 

1179 except Exception: 

1180 logger.exception("Error getting Ollama models") 

1181 # No fallback models - just return empty list 

1182 logger.info("Error getting Ollama models - no models to display") 

1183 providers["ollama_models"] = [] 

1184 

1185 # Note: OpenAI-Compatible Endpoint models are fetched via auto-discovery 

1186 # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider) 

1187 

1188 # Get OpenAI models using the OpenAI package 

1189 openai_models = [] 

1190 try: 

1191 logger.info( 

1192 "Attempting to connect to OpenAI API using OpenAI package" 

1193 ) 

1194 

1195 # Get the API key from settings 

1196 api_key = _get_setting_from_session("llm.openai.api_key", "") 

1197 

1198 if api_key: 1198 ↛ 1199line 1198 didn't jump to line 1199 because the condition on line 1198 was never true

1199 import openai 

1200 from openai import OpenAI 

1201 

1202 # Create OpenAI client 

1203 client = OpenAI(api_key=api_key) 

1204 

1205 try: 

1206 # Fetch models using the client 

1207 logger.debug("Fetching models from OpenAI API") 

1208 models_response = client.models.list() 

1209 

1210 # Process models from the response 

1211 for model in models_response.data: 

1212 model_id = model.id 

1213 if model_id: 

1214 # Create a clean display name 

1215 display_name = model_id.replace("-", " ").strip() 

1216 display_name = " ".join( 

1217 word.capitalize() 

1218 for word in display_name.split() 

1219 ) 

1220 

1221 openai_models.append( 

1222 { 

1223 "value": model_id, 

1224 "label": f"{display_name} (OpenAI)", 

1225 "provider": "openai", 

1226 } 

1227 ) 

1228 logger.debug( 

1229 f"Added OpenAI model: {model_id} -> {display_name}" 

1230 ) 

1231 

1232 # Keep original order from OpenAI - their models are returned in a 

1233 # meaningful order (newer/more capable models first) 

1234 

1235 except openai.APIError as api_err: 

1236 logger.exception(f"OpenAI API error: {api_err!s}") 

1237 logger.info("No OpenAI models found due to API error") 

1238 

1239 else: 

1240 logger.info( 

1241 "OpenAI API key not configured, no models available" 

1242 ) 

1243 

1244 except Exception: 

1245 logger.exception("Error getting OpenAI models") 

1246 logger.info("No OpenAI models available due to error") 

1247 

1248 # Always set the openai_models in providers (will be empty array if no models found) 

1249 providers["openai_models"] = openai_models 

1250 logger.info(f"Final OpenAI models count: {len(openai_models)}") 

1251 

1252 # Try to get Anthropic models using the Anthropic package 

1253 anthropic_models = [] 

1254 try: 

1255 logger.info( 

1256 "Attempting to connect to Anthropic API using Anthropic package" 

1257 ) 

1258 

1259 # Get the API key from settings 

1260 api_key = _get_setting_from_session("llm.anthropic.api_key", "") 

1261 

1262 if api_key: 

1263 # Import Anthropic package here to avoid dependency issues if not installed 

1264 from anthropic import Anthropic 

1265 

1266 # Create Anthropic client 

1267 anthropic_client = Anthropic(api_key=api_key) 

1268 

1269 try: 

1270 # Fetch models using the client 

1271 logger.debug("Fetching models from Anthropic API") 

1272 models_response = anthropic_client.models.list() 

1273 

1274 # Process models from the response 

1275 for model in models_response.data: 

1276 model_id = model.id 

1277 if model_id: 

1278 # Create a clean display name 

1279 display_name = model_id.replace("-", " ").strip() 

1280 display_name = " ".join( 

1281 word.capitalize() 

1282 for word in display_name.split() 

1283 ) 

1284 

1285 anthropic_models.append( 

1286 { 

1287 "value": model_id, 

1288 "label": f"{display_name} (Anthropic)", 

1289 "provider": "anthropic", 

1290 } 

1291 ) 

1292 logger.debug( 

1293 f"Added Anthropic model: {model_id} -> {display_name}" 

1294 ) 

1295 

1296 except Exception as api_err: 

1297 logger.exception(f"Anthropic API error: {api_err!s}") 

1298 else: 

1299 logger.info("Anthropic API key not configured") 

1300 

1301 except ImportError: 

1302 logger.warning( 

1303 "Anthropic package not installed. No models will be available." 

1304 ) 

1305 except Exception: 

1306 logger.exception("Error getting Anthropic models") 

1307 

1308 # Set anthropic_models in providers (could be empty if API call failed) 

1309 providers["anthropic_models"] = anthropic_models 

1310 logger.info(f"Final Anthropic models count: {len(anthropic_models)}") 

1311 

1312 # Fetch models from auto-discovered providers 

1313 from ...llm.providers import discover_providers 

1314 

1315 discovered_providers = discover_providers() 

1316 

1317 for provider_key, provider_info in discovered_providers.items(): 

1318 provider_models = [] 

1319 try: 

1320 logger.info( 

1321 f"Fetching models from {provider_info.provider_name}" 

1322 ) 

1323 

1324 # Get the provider class 

1325 provider_class = provider_info.provider_class 

1326 

1327 # Get API key if configured 

1328 api_key = _get_setting_from_session( 

1329 provider_class.api_key_setting, "" 

1330 ) 

1331 

1332 # Get base URL if provider has configurable URL 

1333 provider_base_url: str | None = None 

1334 if ( 

1335 hasattr(provider_class, "url_setting") 

1336 and provider_class.url_setting 

1337 ): 

1338 provider_base_url = _get_setting_from_session( 

1339 provider_class.url_setting, "" 

1340 ) 

1341 

1342 # Use the provider's list_models_for_api method 

1343 models = provider_class.list_models_for_api( 

1344 api_key, provider_base_url 

1345 ) 

1346 

1347 # Format models for the API response 

1348 for model in models: 

1349 provider_models.append( 

1350 { 

1351 "value": model["value"], 

1352 "label": model[ 

1353 "label" 

1354 ], # Use provider's label as-is 

1355 "provider": provider_key, 

1356 } 

1357 ) 

1358 

1359 logger.info( 

1360 f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}" 

1361 ) 

1362 

1363 except Exception: 

1364 logger.exception( 

1365 f"Error getting {provider_info.provider_name} models" 

1366 ) 

1367 

1368 # Set models in providers dict using lowercase key 

1369 providers[f"{normalize_provider(provider_key)}_models"] = ( 

1370 provider_models 

1371 ) 

1372 logger.info( 

1373 f"Final {provider_key} models count: {len(provider_models)}" 

1374 ) 

1375 

1376 # Save fetched models to database cache 

1377 if force_refresh or providers: 1377 ↛ 1426line 1377 didn't jump to line 1426 because the condition on line 1377 was always true

1378 # We fetched fresh data, save it to database 

1379 username = session["username"] 

1380 with get_user_db_session(username) as db_session: 

1381 try: 

1382 if force_refresh: 

1383 # When force refresh, clear ALL cached models to remove any stale data 

1384 # from old code versions or deleted providers 

1385 deleted_count = db_session.query(ProviderModel).delete() 

1386 logger.info( 

1387 f"Force refresh: cleared all {deleted_count} cached models" 

1388 ) 

1389 else: 

1390 # Clear old cache entries only for providers we're updating 

1391 for provider_key in providers: 

1392 provider_name = provider_key.replace( 

1393 "_models", "" 

1394 ).upper() 

1395 db_session.query(ProviderModel).filter( 

1396 ProviderModel.provider == provider_name 

1397 ).delete() 

1398 

1399 # Insert new models 

1400 for provider_key, models in providers.items(): 

1401 provider_name = provider_key.replace( 

1402 "_models", "" 

1403 ).upper() 

1404 for model in models: 

1405 if ( 1405 ↛ 1404line 1405 didn't jump to line 1404 because the condition on line 1405 was always true

1406 isinstance(model, dict) 

1407 and "value" in model 

1408 and "label" in model 

1409 ): 

1410 new_model = ProviderModel( 

1411 provider=provider_name, 

1412 model_key=model["value"], 

1413 model_label=model["label"], 

1414 last_updated=datetime.now(UTC), 

1415 ) 

1416 db_session.add(new_model) 

1417 

1418 db_session.commit() 

1419 logger.info("Successfully cached models to database") 

1420 

1421 except Exception: 

1422 logger.exception("Error saving models to database cache") 

1423 db_session.rollback() 

1424 

1425 # Return all options 

1426 return jsonify( 

1427 {"provider_options": provider_options, "providers": providers} 

1428 ) 

1429 

1430 except Exception: 

1431 logger.exception("Error getting available models") 

1432 return jsonify( 

1433 { 

1434 "status": "error", 

1435 "message": "Failed to retrieve available models", 

1436 } 

1437 ), 500 

1438 

1439 

1440def _get_engine_icon_and_category( 

1441 engine_data: dict, engine_class=None 

1442) -> tuple: 

1443 """ 

1444 Get icon emoji and category label for a search engine based on its attributes. 

1445 

1446 Args: 

1447 engine_data: Engine configuration dictionary 

1448 engine_class: Optional loaded engine class to check attributes 

1449 

1450 Returns: 

1451 Tuple of (icon, category) strings 

1452 """ 

1453 # Check attributes from either the class or the engine data 

1454 if engine_class: 

1455 is_scientific = getattr(engine_class, "is_scientific", False) 

1456 is_generic = getattr(engine_class, "is_generic", False) 

1457 is_local = getattr(engine_class, "is_local", False) 

1458 is_news = getattr(engine_class, "is_news", False) 

1459 is_code = getattr(engine_class, "is_code", False) 

1460 else: 

1461 is_scientific = engine_data.get("is_scientific", False) 

1462 is_generic = engine_data.get("is_generic", False) 

1463 is_local = engine_data.get("is_local", False) 

1464 is_news = engine_data.get("is_news", False) 

1465 is_code = engine_data.get("is_code", False) 

1466 

1467 # Check books attribute 

1468 if engine_class: 

1469 is_books = getattr(engine_class, "is_books", False) 

1470 else: 

1471 is_books = engine_data.get("is_books", False) 

1472 

1473 # Return icon and category based on engine type 

1474 # Priority: local > scientific > news > code > books > generic > default 

1475 if is_local: 

1476 return "📁", "Local RAG" 

1477 if is_scientific: 

1478 return "🔬", "Scientific" 

1479 if is_news: 

1480 return "📰", "News" 

1481 if is_code: 

1482 return "💻", "Code" 

1483 if is_books: 

1484 return "📚", "Books" 

1485 if is_generic: 

1486 return "🌐", "Web Search" 

1487 return "🔍", "Search" 

1488 

1489 

1490@settings_bp.route("/api/available-search-engines", methods=["GET"]) 

1491@login_required 

1492@with_user_session() 

1493def api_get_available_search_engines(db_session=None, settings_manager=None): 

1494 """Get available search engines""" 

1495 try: 

1496 # Get search engines using the same approach as search_engines_config.py 

1497 from ...web_search_engines.search_engines_config import search_config 

1498 

1499 username = session["username"] 

1500 search_engines = search_config(username=username, db_session=db_session) 

1501 

1502 # Get user's favorites using SettingsManager 

1503 favorites = settings_manager.get_setting("search.favorites", []) 

1504 if not isinstance(favorites, list): 1504 ↛ 1505line 1504 didn't jump to line 1505 because the condition on line 1504 was never true

1505 favorites = [] 

1506 

1507 # Extract search engines from config 

1508 engines_dict = {} 

1509 engine_options = [] 

1510 

1511 if search_engines: 1511 ↛ 1578line 1511 didn't jump to line 1578 because the condition on line 1511 was always true

1512 # Format engines for API response with metadata 

1513 from ...security.module_whitelist import ( 

1514 get_safe_module_class, 

1515 SecurityError, 

1516 ) 

1517 

1518 for engine_id, engine_data in search_engines.items(): 

1519 # Try to load the engine class to get metadata 

1520 engine_class = None 

1521 try: 

1522 module_path = engine_data.get("module_path") 

1523 class_name = engine_data.get("class_name") 

1524 if module_path and class_name: 1524 ↛ 1539line 1524 didn't jump to line 1539 because the condition on line 1524 was always true

1525 # Use secure whitelist-validated import 

1526 engine_class = get_safe_module_class( 

1527 module_path, class_name 

1528 ) 

1529 except SecurityError: 

1530 logger.warning( 

1531 f"Security: Blocked unsafe module for {engine_id}" 

1532 ) 

1533 except Exception as e: 

1534 logger.debug( 

1535 f"Could not load engine class for {engine_id}: {e}" 

1536 ) 

1537 

1538 # Get icon and category from engine attributes 

1539 icon, category = _get_engine_icon_and_category( 

1540 engine_data, engine_class 

1541 ) 

1542 

1543 # Check if engine requires an API key 

1544 requires_api_key = engine_data.get("requires_api_key", False) 

1545 

1546 # Build display name with icon, category, and API key status 

1547 base_name = engine_data.get("display_name", engine_id) 

1548 if requires_api_key: 

1549 label = f"{icon} {base_name} ({category}, API key)" 

1550 else: 

1551 label = f"{icon} {base_name} ({category}, Free)" 

1552 

1553 # Check if engine is a favorite 

1554 is_favorite = engine_id in favorites 

1555 

1556 engines_dict[engine_id] = { 

1557 "display_name": base_name, 

1558 "description": engine_data.get("description", ""), 

1559 "strengths": engine_data.get("strengths", []), 

1560 "icon": icon, 

1561 "category": category, 

1562 "requires_api_key": requires_api_key, 

1563 "is_favorite": is_favorite, 

1564 } 

1565 

1566 engine_options.append( 

1567 { 

1568 "value": engine_id, 

1569 "label": label, 

1570 "icon": icon, 

1571 "category": category, 

1572 "requires_api_key": requires_api_key, 

1573 "is_favorite": is_favorite, 

1574 } 

1575 ) 

1576 

1577 # Sort engine_options: favorites first, then alphabetically by label 

1578 engine_options.sort( 

1579 key=lambda x: ( 

1580 not x.get("is_favorite", False), 

1581 x.get("label", "").lower(), 

1582 ) 

1583 ) 

1584 

1585 # If no engines found, log the issue but return empty list 

1586 if not engine_options: 1586 ↛ 1587line 1586 didn't jump to line 1587 because the condition on line 1586 was never true

1587 logger.warning("No search engines found in configuration") 

1588 

1589 return jsonify( 

1590 { 

1591 "engines": engines_dict, 

1592 "engine_options": engine_options, 

1593 "favorites": favorites, 

1594 } 

1595 ) 

1596 

1597 except Exception: 

1598 logger.exception("Error getting available search engines") 

1599 return jsonify({"error": "Failed to retrieve search engines"}), 500 

1600 

1601 

1602@settings_bp.route("/api/search-favorites", methods=["GET"]) 

1603@login_required 

1604@with_user_session() 

1605def api_get_search_favorites(db_session=None, settings_manager=None): 

1606 """Get the list of favorite search engines for the current user""" 

1607 try: 

1608 favorites = settings_manager.get_setting("search.favorites", []) 

1609 if not isinstance(favorites, list): 

1610 favorites = [] 

1611 return jsonify({"favorites": favorites}) 

1612 

1613 except Exception: 

1614 logger.exception("Error getting search favorites") 

1615 return jsonify({"error": "Failed to retrieve favorites"}), 500 

1616 

1617 

1618@settings_bp.route("/api/search-favorites", methods=["PUT"]) 

1619@login_required 

1620@require_json_body(error_message="No data provided") 

1621@with_user_session() 

1622def api_update_search_favorites(db_session=None, settings_manager=None): 

1623 """Update the list of favorite search engines for the current user""" 

1624 try: 

1625 data = request.get_json() 

1626 favorites = data.get("favorites") 

1627 if favorites is None: 

1628 return jsonify({"error": "No favorites provided"}), 400 

1629 

1630 if not isinstance(favorites, list): 

1631 return jsonify({"error": "Favorites must be a list"}), 400 

1632 

1633 if settings_manager.set_setting("search.favorites", favorites): 

1634 invalidate_settings_caches(session["username"]) 

1635 return jsonify( 

1636 { 

1637 "message": "Favorites updated successfully", 

1638 "favorites": favorites, 

1639 } 

1640 ) 

1641 return jsonify({"error": "Failed to update favorites"}), 500 

1642 

1643 except Exception: 

1644 logger.exception("Error updating search favorites") 

1645 return jsonify({"error": "Failed to update favorites"}), 500 

1646 

1647 

1648@settings_bp.route("/api/search-favorites/toggle", methods=["POST"]) 

1649@login_required 

1650@require_json_body(error_message="No data provided") 

1651@with_user_session() 

1652def api_toggle_search_favorite(db_session=None, settings_manager=None): 

1653 """Toggle a search engine as favorite""" 

1654 try: 

1655 data = request.get_json() 

1656 engine_id = data.get("engine_id") 

1657 if not engine_id: 

1658 return jsonify({"error": "No engine_id provided"}), 400 

1659 

1660 # Get current favorites 

1661 favorites = settings_manager.get_setting("search.favorites", []) 

1662 if not isinstance(favorites, list): 

1663 favorites = [] 

1664 else: 

1665 # Make a copy to avoid modifying the original 

1666 favorites = list(favorites) 

1667 

1668 # Toggle the engine 

1669 is_favorite = engine_id in favorites 

1670 if is_favorite: 

1671 favorites.remove(engine_id) 

1672 is_favorite = False 

1673 else: 

1674 favorites.append(engine_id) 

1675 is_favorite = True 

1676 

1677 # Update the setting 

1678 if settings_manager.set_setting("search.favorites", favorites): 

1679 invalidate_settings_caches(session["username"]) 

1680 return jsonify( 

1681 { 

1682 "message": "Favorite toggled successfully", 

1683 "engine_id": engine_id, 

1684 "is_favorite": is_favorite, 

1685 "favorites": favorites, 

1686 } 

1687 ) 

1688 return jsonify({"error": "Failed to toggle favorite"}), 500 

1689 

1690 except Exception: 

1691 logger.exception("Error toggling search favorite") 

1692 return jsonify({"error": "Failed to toggle favorite"}), 500 

1693 

1694 

1695# Legacy routes for backward compatibility - these will redirect to the new routes 

1696@settings_bp.route("/main", methods=["GET"]) 

1697@login_required 

1698def main_config_page(): 

1699 """Redirect to app settings page""" 

1700 return redirect(url_for("settings.settings_page")) 

1701 

1702 

1703@settings_bp.route("/collections", methods=["GET"]) 

1704@login_required 

1705def collections_config_page(): 

1706 """Redirect to app settings page""" 

1707 return redirect(url_for("settings.settings_page")) 

1708 

1709 

1710@settings_bp.route("/api_keys", methods=["GET"]) 

1711@login_required 

1712def api_keys_config_page(): 

1713 """Redirect to LLM settings page""" 

1714 return redirect(url_for("settings.settings_page")) 

1715 

1716 

1717@settings_bp.route("/search_engines", methods=["GET"]) 

1718@login_required 

1719def search_engines_config_page(): 

1720 """Redirect to search settings page""" 

1721 return redirect(url_for("settings.settings_page")) 

1722 

1723 

1724@settings_bp.route("/llm", methods=["GET"]) 

1725@login_required 

1726def llm_config_page(): 

1727 """Redirect to LLM settings page""" 

1728 return redirect(url_for("settings.settings_page")) 

1729 

1730 

1731@settings_bp.route("/open_file_location", methods=["POST"]) 

1732@login_required 

1733def open_file_location(): 

1734 """Open the location of a configuration file. 

1735 

1736 Security: This endpoint is disabled for server deployments. 

1737 It only makes sense for desktop usage where the server and client are on the same machine. 

1738 """ 

1739 return jsonify( 

1740 { 

1741 "status": "error", 

1742 "message": "This feature is disabled. It is only available in desktop mode.", 

1743 } 

1744 ), 403 

1745 

1746 

1747@settings_bp.context_processor 

1748def inject_csrf_token(): 

1749 """Inject CSRF token into the template context for all settings routes.""" 

1750 return {"csrf_token": generate_csrf} 

1751 

1752 

1753@settings_bp.route("/fix_corrupted_settings", methods=["POST"]) 

1754@login_required 

1755@settings_limit 

1756@with_user_session(include_settings_manager=False) 

1757def fix_corrupted_settings(db_session=None): 

1758 """Fix corrupted settings in the database""" 

1759 try: 

1760 # Track fixed and removed settings 

1761 fixed_settings = [] 

1762 removed_duplicate_settings = [] 

1763 # First, find and remove duplicate settings with the same key 

1764 # This happens because of errors in settings import/export 

1765 from sqlalchemy import func as sql_func 

1766 

1767 # Find keys with duplicates 

1768 duplicate_keys = ( 

1769 db_session.query(Setting.key) 

1770 .group_by(Setting.key) 

1771 .having(sql_func.count(Setting.key) > 1) 

1772 .all() 

1773 ) 

1774 duplicate_keys = [key[0] for key in duplicate_keys] 

1775 

1776 # For each duplicate key, keep the latest updated one and remove others 

1777 for key in duplicate_keys: 

1778 dupe_settings = ( 

1779 db_session.query(Setting) 

1780 .filter(Setting.key == key) 

1781 .order_by(Setting.updated_at.desc()) 

1782 .all() 

1783 ) 

1784 

1785 # Keep the first one (most recently updated) and delete the rest 

1786 for i, setting in enumerate(dupe_settings): 

1787 if i > 0: # Skip the first one (keep it) 

1788 db_session.delete(setting) 

1789 removed_duplicate_settings.append(key) 

1790 

1791 # Check for settings with corrupted values 

1792 all_settings = db_session.query(Setting).all() 

1793 for setting in all_settings: 

1794 # Check different types of corruption 

1795 is_corrupted = False 

1796 

1797 if ( 

1798 setting.value is None 

1799 or ( 

1800 isinstance(setting.value, str) 

1801 and setting.value 

1802 in [ 

1803 "{", 

1804 "[", 

1805 "{}", 

1806 "[]", 

1807 "[object Object]", 

1808 "null", 

1809 "undefined", 

1810 ] 

1811 ) 

1812 or (isinstance(setting.value, dict) and len(setting.value) == 0) 

1813 ): 

1814 is_corrupted = True 

1815 

1816 # Skip if not corrupted 

1817 if not is_corrupted: 

1818 continue 

1819 

1820 default_value: Any = None 

1821 

1822 # Try to find a matching default setting based on key 

1823 if setting.key.startswith("llm."): 

1824 if setting.key == "llm.model": 

1825 default_value = "gemma3:12b" 

1826 elif setting.key == "llm.provider": 

1827 default_value = "ollama" 

1828 elif setting.key == "llm.temperature": 

1829 default_value = 0.7 

1830 elif setting.key == "llm.max_tokens": 1830 ↛ 1871line 1830 didn't jump to line 1871 because the condition on line 1830 was always true

1831 default_value = 1024 

1832 elif setting.key.startswith("search."): 

1833 if setting.key == "search.tool": 

1834 default_value = "auto" 

1835 elif setting.key == "search.max_results": 

1836 default_value = 10 

1837 elif setting.key == "search.region": 

1838 default_value = "us" 

1839 elif setting.key == "search.questions_per_iteration": 

1840 default_value = 3 

1841 elif setting.key == "search.searches_per_section": 

1842 default_value = 2 

1843 elif setting.key == "search.skip_relevance_filter": 

1844 default_value = False 

1845 elif setting.key == "search.safe_search": 

1846 default_value = True 

1847 elif setting.key == "search.search_language": 1847 ↛ 1871line 1847 didn't jump to line 1871 because the condition on line 1847 was always true

1848 default_value = "English" 

1849 elif setting.key.startswith("report."): 

1850 if setting.key == "report.searches_per_section": 

1851 default_value = 2 

1852 elif setting.key.startswith("app."): 1852 ↛ 1871line 1852 didn't jump to line 1871 because the condition on line 1852 was always true

1853 if ( 

1854 setting.key == "app.theme" 

1855 or setting.key == "app.default_theme" 

1856 ): 

1857 default_value = "dark" 

1858 elif setting.key == "app.enable_notifications" or ( 

1859 setting.key == "app.enable_web" 

1860 or setting.key == "app.web_interface" 

1861 ): 

1862 default_value = True 

1863 elif setting.key == "app.host": 

1864 default_value = "0.0.0.0" 

1865 elif setting.key == "app.port": 

1866 default_value = 5000 

1867 elif setting.key == "app.debug": 1867 ↛ 1871line 1867 didn't jump to line 1871 because the condition on line 1867 was always true

1868 default_value = True 

1869 

1870 # Update the setting with the default value if found 

1871 if default_value is not None: 

1872 setting.value = default_value 

1873 fixed_settings.append(setting.key) 

1874 else: 

1875 # If no default found but it's a corrupted JSON, set to empty object 

1876 if setting.key.startswith("report."): 1876 ↛ 1793line 1876 didn't jump to line 1793 because the condition on line 1876 was always true

1877 setting.value = {} 

1878 fixed_settings.append(setting.key) 

1879 

1880 # Commit changes 

1881 if fixed_settings or removed_duplicate_settings: 

1882 db_session.commit() 

1883 logger.info( 

1884 f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}" 

1885 ) 

1886 if removed_duplicate_settings: 

1887 logger.info( 

1888 f"Removed {len(removed_duplicate_settings)} duplicate settings" 

1889 ) 

1890 invalidate_settings_caches(session["username"]) 

1891 

1892 # Return success 

1893 return jsonify( 

1894 { 

1895 "status": "success", 

1896 "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates", 

1897 "fixed_settings": fixed_settings, 

1898 "removed_duplicates": removed_duplicate_settings, 

1899 } 

1900 ) 

1901 

1902 except Exception: 

1903 logger.exception("Error fixing corrupted settings") 

1904 db_session.rollback() 

1905 return ( 

1906 jsonify( 

1907 { 

1908 "status": "error", 

1909 "message": "An internal error occurred while fixing corrupted settings. Please try again later.", 

1910 } 

1911 ), 

1912 500, 

1913 ) 

1914 

1915 

1916@settings_bp.route("/api/warnings", methods=["GET"]) 

1917@login_required 

1918def api_get_warnings(): 

1919 """Get current warnings based on settings""" 

1920 try: 

1921 warnings = calculate_warnings() 

1922 return jsonify({"warnings": warnings}) 

1923 except Exception: 

1924 logger.exception("Error getting warnings") 

1925 return jsonify({"error": "Failed to retrieve warnings"}), 500 

1926 

1927 

1928@settings_bp.route("/api/backup-status", methods=["GET"]) 

1929@login_required 

1930def api_get_backup_status(): 

1931 """Get backup status for the current user.""" 

1932 try: 

1933 from ...config.paths import get_user_backup_directory 

1934 

1935 username = session.get("username") 

1936 if not username: 

1937 return jsonify({"error": "Not authenticated"}), 401 

1938 

1939 from ...utilities.formatting import human_size 

1940 

1941 backup_dir = get_user_backup_directory(username) 

1942 

1943 # Sort by modification time (not filename) for robustness 

1944 backup_list = [] 

1945 total_size = 0 

1946 for b in backup_dir.glob("ldr_backup_*.db"): 

1947 try: 

1948 stat = b.stat() 

1949 total_size += stat.st_size 

1950 backup_list.append( 

1951 { 

1952 "filename": b.name, 

1953 "size_bytes": stat.st_size, 

1954 "size_human": human_size(stat.st_size), 

1955 "created_at": datetime.fromtimestamp( 

1956 stat.st_mtime, tz=timezone.utc 

1957 ).isoformat(), 

1958 "_mtime": stat.st_mtime, 

1959 } 

1960 ) 

1961 except FileNotFoundError: 

1962 continue 

1963 

1964 # Sort newest first by mtime, then remove internal field 

1965 backup_list.sort(key=lambda x: x["_mtime"], reverse=True) 

1966 for entry in backup_list: 

1967 del entry["_mtime"] 

1968 

1969 backup_enabled = _get_setting_from_session("backup.enabled", True) 

1970 

1971 return jsonify( 

1972 { 

1973 "enabled": bool(backup_enabled), 

1974 "count": len(backup_list), 

1975 "backups": backup_list, 

1976 "total_size_bytes": total_size, 

1977 "total_size_human": human_size(total_size), 

1978 } 

1979 ) 

1980 

1981 except Exception: 

1982 logger.exception("Error getting backup status") 

1983 return jsonify({"error": "Failed to retrieve backup status"}), 500 

1984 

1985 

1986@settings_bp.route("/api/ollama-status", methods=["GET"]) 

1987@login_required 

1988def check_ollama_status(): 

1989 """Check if Ollama is running and available""" 

1990 try: 

1991 # Get Ollama URL from settings 

1992 raw_base_url = _get_setting_from_session( 

1993 "llm.ollama.url", DEFAULT_OLLAMA_URL 

1994 ) 

1995 base_url = ( 

1996 normalize_url(raw_base_url) if raw_base_url else DEFAULT_OLLAMA_URL 

1997 ) 

1998 

1999 response = safe_get( 

2000 f"{base_url}/api/version", 

2001 timeout=2, 

2002 allow_localhost=True, 

2003 allow_private_ips=True, 

2004 ) 

2005 

2006 if response.status_code == 200: 

2007 return jsonify( 

2008 { 

2009 "running": True, 

2010 "version": response.json().get("version", "unknown"), 

2011 } 

2012 ) 

2013 return jsonify( 

2014 { 

2015 "running": False, 

2016 "error": f"Ollama returned status code {response.status_code}", 

2017 } 

2018 ) 

2019 except requests.exceptions.RequestException: 

2020 logger.exception("Ollama check failed") 

2021 return jsonify( 

2022 {"running": False, "error": "Failed to check search engine status"} 

2023 ) 

2024 

2025 

2026@settings_bp.route("/api/rate-limiting/status", methods=["GET"]) 

2027@login_required 

2028def api_get_rate_limiting_status(): 

2029 """Get current rate limiting status and statistics""" 

2030 try: 

2031 from ...web_search_engines.rate_limiting import get_tracker 

2032 

2033 tracker = get_tracker() 

2034 

2035 # Get basic status 

2036 status = { 

2037 "enabled": tracker.enabled, 

2038 "exploration_rate": tracker.exploration_rate, 

2039 "learning_rate": tracker.learning_rate, 

2040 "memory_window": tracker.memory_window, 

2041 } 

2042 

2043 # Get engine statistics 

2044 engine_stats = tracker.get_stats() 

2045 engines = [] 

2046 

2047 for stat in engine_stats: 

2048 ( 

2049 engine_type, 

2050 base_wait, 

2051 min_wait, 

2052 max_wait, 

2053 last_updated, 

2054 total_attempts, 

2055 success_rate, 

2056 ) = stat 

2057 engines.append( 

2058 { 

2059 "engine_type": engine_type, 

2060 "base_wait_seconds": round(base_wait, 2), 

2061 "min_wait_seconds": round(min_wait, 2), 

2062 "max_wait_seconds": round(max_wait, 2), 

2063 "last_updated": last_updated, 

2064 "total_attempts": total_attempts, 

2065 "success_rate": ( 

2066 round(success_rate * 100, 1) if success_rate else 0.0 

2067 ), 

2068 } 

2069 ) 

2070 

2071 return jsonify({"status": status, "engines": engines}) 

2072 

2073 except Exception: 

2074 logger.exception("Error getting rate limiting status") 

2075 return jsonify({"error": "An internal error occurred"}), 500 

2076 

2077 

2078@settings_bp.route( 

2079 "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"] 

2080) 

2081@login_required 

2082def api_reset_engine_rate_limiting(engine_type): 

2083 """Reset rate limiting data for a specific engine""" 

2084 try: 

2085 from ...web_search_engines.rate_limiting import get_tracker 

2086 

2087 tracker = get_tracker() 

2088 tracker.reset_engine(engine_type) 

2089 

2090 return jsonify( 

2091 {"message": f"Rate limiting data reset for {engine_type}"} 

2092 ) 

2093 

2094 except Exception: 

2095 logger.exception(f"Error resetting rate limiting for {engine_type}") 

2096 return jsonify({"error": "An internal error occurred"}), 500 

2097 

2098 

2099@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"]) 

2100@login_required 

2101def api_cleanup_rate_limiting(): 

2102 """Clean up old rate limiting data. 

2103 

2104 Note: not using @require_json_body because the JSON body is optional 

2105 here — the endpoint works with or without a payload (defaults to 30 days). 

2106 """ 

2107 try: 

2108 from ...web_search_engines.rate_limiting import get_tracker 

2109 

2110 data = request.get_json() if request.is_json else None 

2111 days = data.get("days", 30) if data is not None else 30 

2112 

2113 tracker = get_tracker() 

2114 tracker.cleanup_old_data(days) 

2115 

2116 return jsonify( 

2117 {"message": f"Cleaned up rate limiting data older than {days} days"} 

2118 ) 

2119 

2120 except Exception: 

2121 logger.exception("Error cleaning up rate limiting data") 

2122 return jsonify({"error": "An internal error occurred"}), 500 

2123 

2124 

2125@settings_bp.route("/api/bulk", methods=["GET"]) 

2126@login_required 

2127def get_bulk_settings(): 

2128 """Get multiple settings at once for performance.""" 

2129 try: 

2130 # Get requested settings from query parameters 

2131 requested = request.args.getlist("keys[]") 

2132 if not requested: 

2133 # Default to common settings if none specified 

2134 requested = [ 

2135 "llm.provider", 

2136 "llm.model", 

2137 "search.tool", 

2138 "search.iterations", 

2139 "search.questions_per_iteration", 

2140 "search.search_strategy", 

2141 "benchmark.evaluation.provider", 

2142 "benchmark.evaluation.model", 

2143 "benchmark.evaluation.temperature", 

2144 "benchmark.evaluation.endpoint_url", 

2145 ] 

2146 

2147 # Fetch all settings at once 

2148 result = {} 

2149 for key in requested: 

2150 try: 

2151 value = _get_setting_from_session(key) 

2152 result[key] = {"value": value, "exists": value is not None} 

2153 except Exception: 

2154 logger.warning(f"Error getting setting {key}") 

2155 result[key] = { 

2156 "value": None, 

2157 "exists": False, 

2158 "error": "Failed to retrieve setting", 

2159 } 

2160 

2161 return jsonify({"success": True, "settings": result}) 

2162 

2163 except Exception: 

2164 logger.exception("Error getting bulk settings") 

2165 return jsonify( 

2166 {"success": False, "error": "An internal error occurred"} 

2167 ), 500 

2168 

2169 

2170@settings_bp.route("/api/data-location", methods=["GET"]) 

2171@login_required 

2172def api_get_data_location(): 

2173 """Get information about data storage location and security""" 

2174 try: 

2175 # Get the data directory path 

2176 data_dir = get_data_directory() 

2177 # Get the encrypted databases path 

2178 encrypted_db_path = get_encrypted_database_path() 

2179 

2180 # Check if LDR_DATA_DIR environment variable is set 

2181 from local_deep_research.settings.manager import SettingsManager 

2182 

2183 settings_manager = SettingsManager() 

2184 custom_data_dir = settings_manager.get_setting("bootstrap.data_dir") 

2185 

2186 # Get platform-specific default location info 

2187 platform_info = { 

2188 "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research", 

2189 "macOS": "~/Library/Application Support/local-deep-research", 

2190 "Linux": "~/.local/share/local-deep-research", 

2191 } 

2192 

2193 # Current platform 

2194 current_platform = platform.system() 

2195 if current_platform == "Darwin": 

2196 current_platform = "macOS" 

2197 

2198 # Get SQLCipher settings from environment 

2199 from ...database.sqlcipher_utils import get_sqlcipher_settings 

2200 

2201 # Debug logging 

2202 logger.info(f"db_manager type: {type(db_manager)}") 

2203 logger.info( 

2204 f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}" 

2205 ) 

2206 

2207 cipher_settings = ( 

2208 get_sqlcipher_settings() if db_manager.has_encryption else {} 

2209 ) 

2210 

2211 return jsonify( 

2212 { 

2213 "data_directory": str(data_dir), 

2214 "database_path": str(encrypted_db_path), 

2215 "encrypted_database_path": str(encrypted_db_path), 

2216 "is_custom": custom_data_dir is not None, 

2217 "custom_env_var": "LDR_DATA_DIR", 

2218 "custom_env_value": custom_data_dir, 

2219 "platform": current_platform, 

2220 "platform_default": platform_info.get( 

2221 current_platform, str(data_dir) 

2222 ), 

2223 "platform_info": platform_info, 

2224 "security_notice": { 

2225 "encrypted": db_manager.has_encryption, 

2226 "warning": "All data including API keys stored in the database are securely encrypted." 

2227 if db_manager.has_encryption 

2228 else "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set.", 

2229 "recommendation": "Your data is protected with database encryption." 

2230 if db_manager.has_encryption 

2231 else "Consider using environment variables for sensitive API keys instead of storing them in the database.", 

2232 }, 

2233 "encryption_settings": cipher_settings, 

2234 } 

2235 ) 

2236 

2237 except Exception: 

2238 logger.exception("Error getting data location information") 

2239 return jsonify({"error": "Failed to retrieve data location"}), 500 

2240 

2241 

2242@settings_bp.route("/api/notifications/test-url", methods=["POST"]) 

2243@login_required 

2244def api_test_notification_url(): 

2245 """ 

2246 Test a notification service URL. 

2247 

2248 This endpoint creates a temporary NotificationService instance to test 

2249 the provided URL. No database session or password is required because: 

2250 - The service URL is provided directly in the request body 

2251 - Test notifications use a temporary Apprise instance 

2252 - No user settings or database queries are performed 

2253 

2254 Security note: Rate limiting is not applied here because users need to 

2255 test URLs when configuring notifications. Abuse is mitigated by the 

2256 @login_required decorator and the fact that users can only spam their 

2257 own notification services. 

2258 """ 

2259 try: 

2260 from ...notifications.service import NotificationService 

2261 

2262 data = request.get_json() 

2263 if not data or "service_url" not in data: 

2264 return jsonify( 

2265 {"success": False, "error": "service_url is required"} 

2266 ), 400 

2267 

2268 service_url = data["service_url"] 

2269 

2270 # Create notification service instance and test the URL 

2271 # No password/session needed - URL provided directly, no DB access 

2272 notification_service = NotificationService() 

2273 result = notification_service.test_service(service_url) 

2274 

2275 # Only return expected fields to prevent information leakage 

2276 safe_response = { 

2277 "success": result.get("success", False), 

2278 "message": result.get("message", ""), 

2279 "error": result.get("error", ""), 

2280 } 

2281 return jsonify(safe_response) 

2282 

2283 except Exception: 

2284 logger.exception("Error testing notification URL") 

2285 return jsonify( 

2286 { 

2287 "success": False, 

2288 "error": "Failed to test notification service. Check logs for details.", 

2289 } 

2290 ), 500