Coverage for src / local_deep_research / web / routes / settings_routes.py: 35%

913 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Settings Routes Module 

3 

4This module handles all settings-related HTTP endpoints for the application. 

5 

6CHECKBOX HANDLING PATTERN: 

7-------------------------- 

8This module supports TWO submission modes to handle checkboxes correctly: 

9 

10**MODE 1: AJAX/JSON Submission (Primary - /save_all_settings)** 

11- JavaScript intercepts form submission with e.preventDefault() 

12- Checkbox values read directly from DOM via checkbox.checked 

13- Data sent as JSON: {"setting.key": true/false} 

14- Hidden fallback inputs are managed but NOT used in this mode 

15- Provides better UX with instant feedback and validation 

16 

17**MODE 2: Traditional POST Submission (Fallback - /save_settings)** 

18- Used when JavaScript is disabled (accessibility/no-JS environments) 

19- Browser submits form data naturally via request.form 

20- Hidden fallback pattern CRITICAL here: 

21 * Checked checkbox: Submits checkbox value, hidden input disabled 

22 * Unchecked checkbox: Submits hidden input value "false" 

23- Ensures unchecked checkboxes are captured (HTML limitation workaround) 

24 

25**Implementation Details:** 

261. Each checkbox has `data-hidden-fallback` attribute → hidden input ID 

272. checkbox_handler.js manages hidden input disabled state 

283. AJAX mode: settings.js reads checkbox.checked directly (lines 2233-2240) 

294. POST mode: Flask reads request.form including enabled hidden inputs 

305. Both modes use convert_setting_value() for consistent boolean conversion 

31 

32**Why Both Patterns?** 

33- AJAX: Better UX, immediate validation, no page reload 

34- Traditional POST: Accessibility, progressive enhancement, JavaScript-free operation 

35- Hidden inputs: Only meaningful for traditional POST, ignored in AJAX mode 

36 

37This dual-mode approach ensures the app works for all users while providing 

38optimal experience when JavaScript is available. 

39""" 

40 

41import json 

42import platform 

43import subprocess 

44from typing import Any, Optional, Tuple 

45from datetime import datetime, UTC, timedelta 

46 

47import requests 

48from flask import ( 

49 Blueprint, 

50 flash, 

51 jsonify, 

52 redirect, 

53 request, 

54 session, 

55 url_for, 

56) 

57from flask_wtf.csrf import generate_csrf 

58from loguru import logger 

59 

60from ...config.paths import get_data_directory, get_encrypted_database_path 

61from ...database.models import Setting, SettingType 

62from ...database.session_context import get_user_db_session 

63from ...database.encrypted_db import db_manager 

64from ...utilities.db_utils import get_settings_manager 

65from ...utilities.url_utils import normalize_url 

66from ..auth.decorators import login_required 

67from ...settings import SettingsManager 

68from ...settings.manager import get_typed_setting_value, parse_boolean 

69from ..services.settings_service import ( 

70 create_or_update_setting, 

71 set_setting, 

72) 

73from ..utils.templates import render_template_with_defaults 

74from ..server_config import sync_from_settings 

75from ...security import safe_get 

76 

# Blueprint that groups every endpoint in this module under the /settings prefix
settings_bp = Blueprint("settings", __name__, url_prefix="/settings")

# Keys of select-type settings whose option lists are filled in dynamically;
# validate_setting() does not check their values against the stored options.
DYNAMIC_SETTINGS = [
    "llm.provider",
    "llm.model",
    "search.tool",
]

82 

83 

def _get_setting_from_session(key: str, default=None):
    """Look up a single setting for the user named in the Flask session.

    Args:
        key: Setting key to read.
        default: Returned when no database session is available; it is also
            forwarded to the settings manager as its own fallback value.

    Returns:
        The value reported by the settings manager for ``key``, or
        ``default`` when ``get_user_db_session`` yields a falsy session.
    """
    current_user = session.get("username")
    result = default
    with get_user_db_session(current_user) as user_db:
        if user_db:
            manager = get_settings_manager(user_db, current_user)
            result = manager.get_setting(key, default)
    return result

92 

93 

def calculate_warnings():
    """Calculate current warning conditions based on settings.

    Reads the current user's LLM settings in a single database session and
    builds the warnings that apply to that configuration:

    * ``high_context``: a local provider is combined with a context window
      larger than 8192 tokens.
    * ``model_mismatch``: additionally, the configured model name contains
      ``"70b"``.

    Each warning is skipped when its ``app.warnings.dismiss_*`` setting is
    truthy.

    Returns:
        A list of warning dicts with the keys ``type``, ``icon``, ``title``,
        ``message`` and ``dismissKey``. The list is empty when no database
        session is available or when reading the settings fails (the failure
        is logged, never raised).
    """
    warnings = []

    try:
        username = session.get("username")
        with get_user_db_session(username) as db_session:
            if not db_session:
                # Nothing can be read without a session; previously this path
                # fell through to an unbound-variable error.
                logger.debug(
                    "No database session available, skipping warning calculation"
                )
                return warnings

            settings_manager = get_settings_manager(db_session, username)

            # ``or ""`` keeps a stored null provider from breaking .lower()
            provider = (
                settings_manager.get_setting("llm.provider", "ollama") or ""
            ).lower()
            local_context = settings_manager.get_setting(
                "llm.local_context_window_size", 4096
            )
            current_model = settings_manager.get_setting("llm.model", "")

            # Dismissal flags
            dismiss_high_context = settings_manager.get_setting(
                "app.warnings.dismiss_high_context", False
            )
            dismiss_model_mismatch = settings_manager.get_setting(
                "app.warnings.dismiss_model_mismatch", False
            )

        logger.debug(f"Starting warning calculation - provider={provider}")

        is_local_provider = provider in (
            "ollama",
            "llamacpp",
            "lmstudio",
            "vllm",
        )
        has_high_local_context = is_local_provider and local_context > 8192

        # High context warning for local providers
        if has_high_local_context and not dismiss_high_context:
            warnings.append(
                {
                    "type": "high_context",
                    "icon": "⚠️",
                    "title": "High Context Warning",
                    "message": f"Context size ({local_context:,} tokens) may cause memory issues with {provider}. Increase VRAM or reduce context size if you experience slowdowns.",
                    "dismissKey": "app.warnings.dismiss_high_context",
                }
            )

        # Model mismatch warning (simplified: based on the stored model
        # setting rather than the value currently shown in the form)
        if (
            current_model
            and "70b" in current_model.lower()
            and has_high_local_context
            and not dismiss_model_mismatch
        ):
            warnings.append(
                {
                    "type": "model_mismatch",
                    "icon": "🧠",
                    "title": "Model & Context Warning",
                    "message": f"Large model ({current_model}) with high context ({local_context:,}) may exceed VRAM. Consider reducing context size or upgrading GPU memory.",
                    "dismissKey": "app.warnings.dismiss_model_mismatch",
                }
            )

    except Exception as e:
        # Warnings are best-effort: never let them break the calling request
        logger.warning(f"Error calculating warnings: {e}")

    return warnings

178 

179 

def validate_setting(
    setting: Setting, value: Any
) -> Tuple[bool, Optional[str]]:
    """
    Check a candidate value against a setting's UI element and constraints.

    The value is first passed through ``get_typed_setting_value`` and the
    converted result is what gets checked:

    * ``checkbox``: must be a ``bool``.
    * ``number`` / ``slider`` / ``range``: must be ``int`` or ``float`` and
      lie within ``min_value`` / ``max_value`` when those are set.
    * ``select``: must be one of the option values, unless the setting has
      no options or its key is listed in ``DYNAMIC_SETTINGS``.

    Any other UI element is accepted without further checks.

    Args:
        setting: The Setting object to validate against
        value: The value to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is None when valid
    """
    typed = get_typed_setting_value(
        key=setting.key,
        value=value,
        ui_element=setting.ui_element,
        default=None,
        check_env=False,
    )
    element = setting.ui_element

    if element == "checkbox":
        if isinstance(typed, bool):
            return True, None
        return False, "Value must be a boolean"

    if element in ("number", "slider", "range"):
        if not isinstance(typed, (int, float)):
            return False, "Value must be a number"

        # Range limits are optional; None means "unbounded" on that side
        if setting.min_value is not None and typed < setting.min_value:
            return False, f"Value must be at least {setting.min_value}"
        if setting.max_value is not None and typed > setting.max_value:
            return False, f"Value must be at most {setting.max_value}"
        return True, None

    if (
        element == "select"
        and setting.options
        and setting.key not in DYNAMIC_SETTINGS
    ):
        choices = [option.get("value") for option in setting.options]
        if typed not in choices:
            listing = ", ".join(str(choice) for choice in choices)
            return False, "Value must be one of: " + listing

    # Nothing objected to the value
    return True, None

233 

234 

@settings_bp.route("/", methods=["GET"])
@login_required
def settings_page():
    """Render the settings dashboard, the entry page linking to the config pages."""
    dashboard_template = "settings_dashboard.html"
    return render_template_with_defaults(dashboard_template)

240 

241 

def _bulk_save_classify_key(key):
    """Return the ``(SettingType, category)`` pair implied by a key's prefix.

    Keys whose prefix is not one of ``llm.``, ``search.``, ``report.``,
    ``database.`` or ``app.`` yield ``(None, None)``.
    """
    if key.startswith("llm."):
        markers = ("temperature", "max_tokens", "batch", "layers")
        if any(marker in key for marker in markers):
            return SettingType.LLM, "llm_parameters"
        return SettingType.LLM, "llm_general"
    if key.startswith("search."):
        markers = ("iterations", "results", "region", "questions", "section")
        if any(marker in key for marker in markers):
            return SettingType.SEARCH, "search_parameters"
        return SettingType.SEARCH, "search_general"
    if key.startswith("report."):
        return SettingType.REPORT, "report_parameters"
    if key.startswith("database."):
        return SettingType.DATABASE, "database_parameters"
    if key.startswith("app."):
        return SettingType.APP, "app_interface"
    return None, None


def _bulk_save_clean_value(key, value):
    """Replace corrupted placeholder values and parse JSON-looking strings.

    A value of ``"[object Object]"`` or a string that is only ``{}``, ``[]``,
    ``{`` or ``[`` (ignoring surrounding whitespace) is treated as corrupted
    and swapped for a per-key fallback. Afterwards, a string starting with
    ``{`` or ``[`` is parsed as JSON; if parsing fails it is kept as a string.
    """
    is_corrupted = value == "[object Object]" or (
        isinstance(value, str) and value.strip() in ["{}", "[]", "{", "["]
    )
    if is_corrupted:
        if key.startswith("report."):
            value = {}
        else:
            # Known keys get a usable fallback, everything else becomes null
            value = {
                "llm.model": "gpt-3.5-turbo",
                "llm.provider": "openai",
                "search.tool": "auto",
                "app.theme": "dark",
                "app.default_theme": "dark",
            }.get(key)

        logger.warning(f"Corrected corrupted value for {key}: {value}")

    # dict/list values were already parsed by the client and pass through
    if isinstance(value, str) and value.startswith(("{", "[")):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            # Not valid JSON after all: keep the raw string
            pass

    return value


@settings_bp.route("/save_all_settings", methods=["POST"])
@login_required
def save_all_settings():
    """Handle saving all settings at once from the unified settings page.

    Expects a JSON object mapping setting keys to values. Existing settings
    are type-converted, validated and updated; unknown keys are created with
    metadata derived from the key and the value's Python type.

    Returns:
        * 200 with ``status``, ``message``, ``updated``, ``created`` and the
          full ``settings`` list (plus ``warnings`` when a warning-relevant
          key changed).
        * 400 when the body is missing, is not a JSON object, or at least
          one setting failed validation/creation (``errors`` lists them;
          settings processed before the failure are not undone here).
        * 500 on any unexpected error.
    """
    username = session.get("username")

    with get_user_db_session(username) as db_session:
        try:
            # silent=True: a malformed or non-JSON body yields None and is
            # answered with 400 below instead of surfacing as a 500.
            form_data = request.get_json(silent=True)
            if not form_data:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "No settings data provided",
                        }
                    ),
                    400,
                )
            if not isinstance(form_data, dict):
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Settings data must be a JSON object",
                        }
                    ),
                    400,
                )

            validation_errors = []
            updated_settings = []
            created_settings = []

            # Fetch all settings once to avoid N+1 query problem
            all_db_settings = {
                setting.key: setting
                for setting in db_session.query(Setting).all()
            }

            for key, value in form_data.items():
                # Skip corrupted keys or empty strings as keys
                if not key or not isinstance(key, str) or key.strip() == "":
                    continue

                current_setting = all_db_settings.get(key)

                # Convert checkbox values BEFORE the corrupted-value check so
                # that check is not triggered by checkbox payloads.
                if (
                    current_setting
                    and current_setting.ui_element == "checkbox"
                    and not isinstance(value, bool)
                ):
                    logger.debug(
                        f"Converting checkbox {key} from {type(value).__name__} to bool: {value}"
                    )
                    value = parse_boolean(value)
                    # Replacing the value of an existing key is safe while
                    # iterating; the dict's size does not change.
                    form_data[key] = value

                value = _bulk_save_clean_value(key, value)

                if current_setting:
                    converted_value = get_typed_setting_value(
                        key=current_setting.key,
                        value=value,
                        ui_element=current_setting.ui_element,
                        default=None,
                        check_env=False,
                    )

                    is_valid, error_message = validate_setting(
                        current_setting, converted_value
                    )
                    if not is_valid:
                        validation_errors.append(
                            {
                                "key": key,
                                "name": current_setting.name,
                                "error": error_message,
                            }
                        )
                        continue

                    # Save the converted setting using the same session
                    if set_setting(
                        key, converted_value, db_session=db_session
                    ):
                        updated_settings.append(key)
                    else:
                        # Keep the response shape unchanged, but leave a trace
                        logger.warning(f"Failed to save setting {key}")
                    continue

                # Unknown key: create a new setting from inferred metadata
                display_name = key.split(".")[-1].replace("_", " ").title()
                setting_type, category = _bulk_save_classify_key(key)
                if setting_type is None:
                    # No type can be derived from the prefix. Report it as a
                    # validation error instead of failing the whole request.
                    validation_errors.append(
                        {
                            "key": key,
                            "name": display_name,
                            "error": "Unknown setting prefix, cannot determine setting type",
                        }
                    )
                    continue

                new_setting = {
                    "key": key,
                    "value": value,
                    "type": setting_type.value.lower(),
                    "name": display_name,
                    "description": f"Setting for {key}",
                    "category": category,
                    "ui_element": "text",  # Default UI element
                }

                # Pick a better UI element from the value's Python type;
                # bool is tested first because it is a subclass of int.
                if isinstance(value, bool):
                    new_setting["ui_element"] = "checkbox"
                elif isinstance(value, (int, float)):
                    new_setting["ui_element"] = "number"
                elif isinstance(value, (dict, list)):
                    new_setting["ui_element"] = "textarea"

                db_setting = create_or_update_setting(
                    new_setting, db_session=db_session
                )
                if db_setting:
                    created_settings.append(key)
                else:
                    validation_errors.append(
                        {
                            "key": key,
                            "name": new_setting["name"],
                            "error": "Failed to create setting",
                        }
                    )

            # Report validation errors if any
            if validation_errors:
                return (
                    jsonify(
                        {
                            "status": "error",
                            "message": "Validation errors",
                            "errors": validation_errors,
                        }
                    ),
                    400,
                )

            # Return every setting so the client can refresh its state;
            # enum types are reduced to their plain value for JSON.
            all_settings = [
                {
                    "key": setting.key,
                    "value": setting.value,
                    "name": setting.name,
                    "description": setting.description,
                    "type": getattr(setting.type, "value", setting.type),
                    "category": setting.category,
                    "ui_element": setting.ui_element,
                    "editable": setting.editable,
                    "options": setting.options,
                }
                for setting in db_session.query(Setting).all()
            ]

            # Customize the success message based on what changed
            if len(updated_settings) == 1:
                key = updated_settings[0]
                # Reuse the already-fetched setting from the pre-fetched dict
                updated_setting = all_db_settings.get(key)
                name = (
                    updated_setting.name
                    if updated_setting
                    else key.split(".")[-1].replace("_", " ").title()
                )
                new_value = updated_setting.value if updated_setting else None

                # Booleans read better as "enabled/disabled"
                if isinstance(new_value, bool):
                    state = "enabled" if new_value else "disabled"
                    success_message = f"{name} {state}"
                else:
                    success_message = f"{name} updated"
            else:
                success_message = f"Settings saved successfully ({len(updated_settings)} updated, {len(created_settings)} created)"

            response_data = {
                "status": "success",
                "message": success_message,
                "updated": updated_settings,
                "created": created_settings,
                "settings": all_settings,
            }

            warning_affecting_keys = [
                "llm.provider",
                "search.tool",
                "search.iterations",
                "search.questions_per_iteration",
                "llm.local_context_window_size",
                "llm.context_window_unrestricted",
                "llm.context_window_size",
            ]

            # Recalculate warnings only when a relevant setting changed
            if any(
                key in warning_affecting_keys
                for key in updated_settings + created_settings
            ):
                warnings = calculate_warnings()
                response_data["warnings"] = warnings
                logger.info(
                    f"Bulk settings update affected warning keys, calculated {len(warnings)} warnings"
                )

            return jsonify(response_data)

        except Exception:
            logger.exception("Error saving settings")
            return (
                jsonify(
                    {
                        "status": "error",
                        "message": "An internal error occurred while saving settings.",
                    }
                ),
                500,
            )

577 

578 

@settings_bp.route("/reset_to_defaults", methods=["POST"])
@login_required
def reset_to_defaults():
    """Reset all settings to their default values.

    Loads the default settings files through a ``SettingsManager`` bound to
    the current user's database session.

    Returns:
        200 with ``status: success`` when the defaults were loaded, or 500
        with ``status: error`` when loading raised (the error is logged).
    """
    username = session.get("username")

    try:
        # One session is enough; the manager works on the session it is given
        with get_user_db_session(username) as db_session:
            settings_mgr = SettingsManager(db_session)
            # Import settings from default files
            settings_mgr.load_from_defaults_file()

        logger.info("Successfully imported settings from default files")

    except Exception:
        logger.exception("Error importing default settings")
        # Do not claim success when the reset did not happen
        return (
            jsonify(
                {
                    "status": "error",
                    "message": "Failed to reset settings to default values",
                }
            ),
            500,
        )

    return jsonify(
        {
            "status": "success",
            "message": "All settings have been reset to default values",
        }
    )

607 

608 

@settings_bp.route("/save_settings", methods=["POST"])
@login_required
def save_settings():
    """Persist settings submitted as a regular HTML form (no-JavaScript fallback).

    Every form field except ``csrf_token`` is treated as a setting key. Values
    of settings that already exist are type-converted with
    ``get_typed_setting_value``; all writes are staged with ``commit=False``
    and committed together. The outcome is reported through flash messages
    and the user is always redirected back to the settings page.
    """
    try:
        current_user = session.get("username")

        # Everything in the form except the CSRF token is a setting
        submitted = request.form.to_dict()
        submitted.pop("csrf_token", None)

        with get_user_db_session(current_user) as db_session:
            manager = SettingsManager(db_session)
            saved = 0
            failed = 0

            # One query up front instead of one lookup per submitted field
            stored = {
                row.key: row for row in db_session.query(Setting).all()
            }

            for field_name, raw_value in submitted.items():
                try:
                    known = stored.get(field_name)
                    typed_value = raw_value
                    if known:
                        typed_value = get_typed_setting_value(
                            key=known.key,
                            value=raw_value,
                            ui_element=known.ui_element,
                            default=None,
                            check_env=False,
                        )
                    accepted = manager.set_setting(
                        field_name, typed_value, commit=False
                    )
                except Exception:
                    logger.exception(f"Error saving setting {field_name}")
                    failed += 1
                else:
                    if accepted:
                        saved += 1
                    else:
                        failed += 1
                        logger.warning(f"Failed to save setting {field_name}")

            # Single commit for the whole batch
            try:
                db_session.commit()

                flash(
                    f"Settings saved successfully! Updated {saved} settings.",
                    "success",
                )
                if failed > 0:
                    flash(
                        f"Warning: {failed} settings failed to save.",
                        "warning",
                    )

                # Push the fresh values into the server configuration
                sync_from_settings(manager.get_settings_snapshot())

            except Exception:
                db_session.rollback()
                logger.exception("Failed to commit settings")
                flash("Error saving settings. Please try again.", "error")

        return redirect(url_for("settings.settings_page"))

    except Exception:
        logger.exception("Error in save_settings")
        flash("An internal error occurred while saving settings.", "error")
        return redirect(url_for("settings.settings_page"))

690 

691 

692# API Routes 

@settings_bp.route("/api", methods=["GET"])
@login_required
def api_get_all_settings():
    """Return all settings as JSON, optionally limited to one category.

    Query parameters:
        category: When given, only settings whose stored ``category`` equals
            this value are returned.

    Returns:
        200 with ``{"status": "success", "settings": {...}}``, or 500 with an
        ``error`` message when anything fails.
    """
    try:
        wanted_category = request.args.get("category")
        current_user = session.get("username")

        with get_user_db_session(current_user) as db_session:
            # The manager is bound to the session provided by the context
            manager = SettingsManager(db_session)
            settings = manager.get_all_settings()

            if wanted_category:
                # The category lives on the Setting rows, so collect the keys
                # that belong to it and keep only those entries.
                keys_in_category = {
                    row.key
                    for row in db_session.query(Setting).all()
                    if row.category == wanted_category
                }
                settings = {
                    name: entry
                    for name, entry in settings.items()
                    if name in keys_in_category
                }

            return jsonify({"status": "success", "settings": settings})
    except Exception:
        logger.exception("Error getting settings")
        return jsonify({"error": "Failed to retrieve settings"}), 500

730 

731 

@settings_bp.route("/api/<path:key>", methods=["GET"])
@login_required
def api_get_db_setting(key):
    """Return the full stored record of one setting.

    Args:
        key: Setting key taken from the URL path.

    Returns:
        200 with the setting's fields, 404 when no setting has that key, or
        500 when the lookup fails.
    """
    try:
        current_user = session.get("username")

        with get_user_db_session(current_user) as db_session:
            record = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )

            if not record:
                return jsonify({"error": f"Setting not found: {key}"}), 404

            # The type may be stored as a plain string or as an enum member
            if isinstance(record.type, str):
                type_name = record.type
            else:
                type_name = record.type.value

            payload = {
                "key": record.key,
                "value": record.value,
                "type": type_name,
            }
            # Remaining fields are copied over verbatim
            for attribute in (
                "name",
                "description",
                "category",
                "ui_element",
                "options",
                "min_value",
                "max_value",
                "step",
                "visible",
                "editable",
            ):
                payload[attribute] = getattr(record, attribute)

            return jsonify(payload)
    except Exception:
        logger.exception(f"Error getting setting {key}")
        return jsonify({"error": "Failed to retrieve settings"}), 500

771 

772 

@settings_bp.route("/api/<path:key>", methods=["PUT"])
@login_required
def api_update_setting(key):
    """Update a setting, or create it when the key does not exist yet.

    Expects a JSON object with a non-null ``value``. When the setting is
    created, the optional fields ``type``, ``name``, ``description``,
    ``category``, ``ui_element``, ``options``, ``min_value``, ``max_value``,
    ``step``, ``visible`` and ``editable`` are taken from the body as well.

    Args:
        key: Setting key taken from the URL path.

    Returns:
        * 200 with a message (plus ``warnings`` for warning-relevant keys)
          when an existing setting was updated.
        * 201 with the new setting when it was created.
        * 400 when the body is missing, is not a JSON object, or has no value.
        * 403 when the setting is not editable.
        * 500 when saving fails or an unexpected error occurs.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None and is
        # answered with 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"error": "No data provided"}), 400

        value = data.get("value")
        if value is None:
            return jsonify({"error": "No value provided"}), 400

        username = session.get("username")

        with get_user_db_session(username) as db_session:
            # Check if setting exists
            db_setting = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )

            if db_setting:
                if not db_setting.editable:
                    return jsonify(
                        {"error": f"Setting {key} is not editable"}
                    ), 403

                # Pass the db_session to avoid session lookup issues
                success = set_setting(key, value, db_session=db_session)
                if not success:
                    return jsonify(
                        {"error": f"Failed to update setting {key}"}
                    ), 500

                # Sync server config
                settings_manager = SettingsManager(db_session)
                sync_from_settings(settings_manager.get_settings_snapshot())

                response_data = {
                    "message": f"Setting {key} updated successfully"
                }

                # Keys that affect warnings get fresh warning calculations
                warning_affecting_keys = [
                    "llm.provider",
                    "search.tool",
                    "search.iterations",
                    "search.questions_per_iteration",
                    "llm.local_context_window_size",
                    "llm.context_window_unrestricted",
                    "llm.context_window_size",
                ]

                if key in warning_affecting_keys:
                    warnings = calculate_warnings()
                    response_data["warnings"] = warnings
                    logger.debug(
                        f"Setting {key} changed to {value}, calculated {len(warnings)} warnings"
                    )

                return jsonify(response_data)

            # Create new setting with default metadata
            setting_dict = {
                "key": key,
                "value": value,
                "name": key.split(".")[-1].replace("_", " ").title(),
                "description": f"Setting for {key}",
            }

            # Caller-provided metadata overrides the defaults above
            for field in [
                "type",
                "name",
                "description",
                "category",
                "ui_element",
                "options",
                "min_value",
                "max_value",
                "step",
                "visible",
                "editable",
            ]:
                if field in data:
                    setting_dict[field] = data[field]

            db_setting = create_or_update_setting(
                setting_dict, db_session=db_session
            )
            if not db_setting:
                return jsonify(
                    {"error": f"Failed to create setting {key}"}
                ), 500

            # Sync server config
            settings_manager = SettingsManager(db_session)
            sync_from_settings(settings_manager.get_settings_snapshot())

            # The type may be a plain string or an enum member, mirroring
            # the handling in the GET endpoint for a single setting.
            created_type = (
                db_setting.type
                if isinstance(db_setting.type, str)
                else db_setting.type.value
            )

            return (
                jsonify(
                    {
                        "message": f"Setting {key} created successfully",
                        "setting": {
                            "key": db_setting.key,
                            "value": db_setting.value,
                            "type": created_type,
                            "name": db_setting.name,
                        },
                    }
                ),
                201,
            )
    except Exception:
        logger.exception(f"Error updating setting {key}")
        return jsonify({"error": "Failed to update setting"}), 500

899 

900 

@settings_bp.route("/api/<path:key>", methods=["DELETE"])
@login_required
def api_delete_setting(key):
    """Delete a setting.

    Args:
        key: Setting key taken from the URL path.

    Returns:
        200 with a confirmation message, 404 when no setting has that key,
        or 500 when the deletion fails or raises.
    """
    try:
        username = session.get("username")

        with get_user_db_session(username) as db_session:
            # Create settings manager with the session from context
            settings_manager = SettingsManager(db_session)

            # Answer 404 for unknown keys before attempting the delete
            db_setting = (
                db_session.query(Setting).filter(Setting.key == key).first()
            )
            if not db_setting:
                return jsonify({"error": f"Setting not found: {key}"}), 404

            if settings_manager.delete_setting(key):
                return jsonify(
                    {"message": f"Setting {key} deleted successfully"}
                )
            return jsonify(
                {"error": f"Failed to delete setting {key}"}
            ), 500
    except Exception:
        logger.exception(f"Error deleting setting {key}")
        # Same wording as the non-exception failure path of this endpoint
        return jsonify({"error": f"Failed to delete setting {key}"}), 500

932 

933 

@settings_bp.route("/api/import", methods=["POST"])
@login_required
def api_import_settings():
    """Import settings from the defaults file.

    Returns:
        200 with a confirmation message when ``load_from_defaults_file``
        returns a truthy result, otherwise 500 with an ``error`` message
        (also when the import raises).
    """
    try:
        username = session.get("username")
        with get_user_db_session(username) as db_session:
            settings_manager = SettingsManager(db_session)
            # NOTE(review): success is judged by the return value's
            # truthiness; confirm load_from_defaults_file returns one.
            success = settings_manager.load_from_defaults_file()

            if success:
                return jsonify({"message": "Settings imported successfully"})
            return jsonify({"error": "Failed to import settings"}), 500
    except Exception:
        logger.exception("Error importing settings")
        # Same wording as the non-exception failure path of this endpoint
        return jsonify({"error": "Failed to import settings"}), 500

951 

952 

@settings_bp.route("/api/categories", methods=["GET"])
@login_required
def api_get_categories():
    """Return the distinct, non-null setting categories.

    Returns:
        200 with ``{"categories": [...]}``, or 500 with an ``error`` message
        when the query fails.
    """
    try:
        current_user = session.get("username")

        with get_user_db_session(current_user) as db_session:
            rows = db_session.query(Setting.category).distinct().all()
            # Each row holds the category in its first column
            first_columns = (row[0] for row in rows)
            category_list = [
                category
                for category in first_columns
                if category is not None
            ]

            return jsonify({"categories": category_list})
    except Exception:
        logger.exception("Error getting categories")
        return jsonify({"error": "Failed to retrieve settings"}), 500

969 

970 

@settings_bp.route("/api/types", methods=["GET"])
@login_required
def api_get_types():
    """Return the value of every ``SettingType`` member.

    Returns:
        200 with ``{"types": [...]}``, or 500 with an ``error`` message on
        failure.
    """
    try:
        type_values = []
        for member in SettingType:
            type_values.append(member.value)
        return jsonify({"types": type_values})
    except Exception:
        logger.exception("Error getting types")
        return jsonify({"error": "Failed to retrieve settings"}), 500

982 

983 

@settings_bp.route("/api/ui_elements", methods=["GET"])
@login_required
def api_get_ui_elements():
    """Return the fixed list of supported UI element type names as JSON.

    Responds with ``{"ui_elements": [...]}``, or with an ``{"error": ...}``
    payload and HTTP status 500 if anything raises.
    """
    try:
        # The supported widget kinds are a static list defined right here.
        payload = {
            "ui_elements": [
                "text",
                "select",
                "checkbox",
                "slider",
                "number",
                "textarea",
                "color",
                "date",
                "file",
                "password",
            ]
        }
        return jsonify(payload)
    except Exception:
        logger.exception("Error getting UI elements")
        return jsonify({"error": "Failed to retrieve settings"}), 500

1007 

1008 

def _prettify_model_name(raw_name, separators):
    """Turn a raw model identifier into a capitalized, space-separated name.

    Args:
        raw_name: Model identifier as reported by the provider.
        separators: Characters replaced by a space before the words are
            capitalized (":/" for Ollama names, "-" for SDK model ids).

    Returns:
        The words of ``raw_name`` joined by single spaces, each passed
        through ``str.capitalize``.
    """
    spaced = raw_name.translate(str.maketrans(dict.fromkeys(separators, " ")))
    return " ".join(word.capitalize() for word in spaced.split())


def _load_cached_provider_models(max_age_hours=24):
    """Read cached models newer than ``max_age_hours`` from the user database.

    Returns:
        Dict mapping ``"<provider>_models"`` (lower-cased provider) to a list
        of ``{"value", "label", "provider"}`` entries; empty when nothing
        fresh is cached.

    Raises:
        Whatever the database layer raises; the caller decides how to react.
    """
    from ...database.models import ProviderModel

    cache_expiry = datetime.now(UTC) - timedelta(hours=max_age_hours)
    grouped = {}

    username = session.get("username")
    with get_user_db_session(username) as db_session:
        cached_models = (
            db_session.query(ProviderModel)
            .filter(ProviderModel.last_updated > cache_expiry)
            .all()
        )

        if cached_models:
            logger.info(
                f"Found {len(cached_models)} cached models in database"
            )

        # Group models by provider while the session is still open
        for model in cached_models:
            provider_key = f"{model.provider.lower()}_models"
            grouped.setdefault(provider_key, []).append(
                {
                    "value": model.model_key,
                    "label": model.model_label,
                    "provider": model.provider.upper(),
                }
            )

    return grouped


def _parse_ollama_tags(ollama_data):
    """Convert a decoded Ollama ``/api/tags`` payload into model entries.

    Handles both payload shapes the endpoint is known to produce here: a
    dict with a ``"models"`` list (newer API) and a bare list (older API).
    """
    if "models" in ollama_data:
        # ``or []`` tolerates an explicit ``"models": null`` in the payload.
        entries = ollama_data.get("models") or []
        logger.info(
            f"Found {len(entries)} models in newer Ollama API format"
        )
    else:
        entries = ollama_data
        logger.info(
            f"Found {len(entries)} models in older Ollama API format"
        )

    models = []
    for entry in entries:
        name = entry.get("name", "")
        if not name:
            continue
        display_name = _prettify_model_name(name, ":/")
        models.append(
            {
                "value": name,  # Original model name as value (for API calls)
                "label": f"{display_name} (Ollama)",  # Pretty name as label
                "provider": "OLLAMA",
            }
        )
        logger.debug(f"Added Ollama model: {name} -> {display_name}")
    return models


def _fetch_ollama_models():
    """Query the configured Ollama server for its models.

    Returns:
        List of model entries; empty when Ollama is unreachable, answers
        with a non-200 status, returns invalid JSON, or any other error
        occurs (errors are logged, never raised).
    """
    try:
        import json

        import requests

        logger.info("Attempting to connect to Ollama API")

        raw_base_url = _get_setting_from_session(
            "llm.ollama.url", "http://localhost:11434"
        )
        base_url = (
            normalize_url(raw_base_url)
            if raw_base_url
            else "http://localhost:11434"
        )

        try:
            ollama_response = safe_get(
                f"{base_url}/api/tags",
                timeout=5,
                allow_localhost=True,
                allow_private_ips=True,
            )
        except requests.exceptions.RequestException as e:
            logger.warning(f"Could not connect to Ollama API: {e!s}")
            # No fallback models - just return empty list
            logger.info("Ollama not available - no models to display")
            return []

        logger.debug(
            f"Ollama API response: Status {ollama_response.status_code}"
        )
        # Log the body even for non-200 responses to help with debugging
        logger.debug(
            f"Ollama API raw response: {ollama_response.text[:500]}..."
        )

        if ollama_response.status_code != 200:
            logger.warning(
                f"Ollama API returned non-200 status code: {ollama_response.status_code}"
            )
            return []

        try:
            ollama_data = ollama_response.json()
        except ValueError as json_err:
            # ValueError is the common base of the JSON decode errors that
            # ``Response.json()`` can raise, whichever JSON backend is used.
            logger.exception(
                f"Failed to parse Ollama API response as JSON: {json_err}"
            )
            return []

        logger.debug(
            f"Ollama API JSON data: {json.dumps(ollama_data)[:500]}..."
        )

        ollama_models = _parse_ollama_tags(ollama_data)
        logger.info(f"Final Ollama models count: {len(ollama_models)}")

        # Log some model names for debugging
        if ollama_models:
            sample = ", ".join(m["value"] for m in ollama_models[:5])
            logger.info(f"Sample Ollama models: {sample}")

        return ollama_models

    except Exception:
        logger.exception("Error getting Ollama models")
        # No fallback models - just return empty list
        logger.info("Error getting Ollama models - no models to display")
        return []


def _append_sdk_models(listing, target, vendor_label, provider_code):
    """Append one entry to ``target`` per model in an SDK ``models.list()`` result.

    Entries are appended one at a time so that models collected before a
    mid-iteration failure are kept by the caller.
    """
    for model in listing.data:
        model_id = model.id
        if not model_id:
            continue
        display_name = _prettify_model_name(model_id, "-")
        target.append(
            {
                "value": model_id,
                "label": f"{display_name} ({vendor_label})",
                "provider": provider_code,
            }
        )
        logger.debug(
            f"Added {vendor_label} model: {model_id} -> {display_name}"
        )


def _fetch_openai_models():
    """List models through the OpenAI package using the configured API key.

    Returns:
        List of model entries in the order the API returned them; empty when
        no key is configured or an error occurs (errors are logged).
    """
    openai_models = []
    try:
        logger.info("Attempting to connect to OpenAI API using OpenAI package")

        api_key = _get_setting_from_session("llm.openai.api_key", "")
        if not api_key:
            logger.info("OpenAI API key not configured, no models available")
            return openai_models

        import openai
        from openai import OpenAI

        client = OpenAI(api_key=api_key)

        try:
            logger.debug("Fetching models from OpenAI API")
            # Original order from OpenAI is kept on purpose.
            _append_sdk_models(
                client.models.list(), openai_models, "OpenAI", "OPENAI"
            )
        except openai.APIError as api_err:
            logger.exception(f"OpenAI API error: {api_err!s}")
            logger.info("No OpenAI models found due to API error")

    except Exception as e:
        logger.exception(f"Error getting OpenAI models: {e!s}")
        logger.info("No OpenAI models available due to error")

    return openai_models


def _fetch_anthropic_models():
    """List models through the Anthropic package using the configured API key.

    Returns:
        List of model entries; empty when no key is configured, the package
        is not installed, or an error occurs (errors are logged).
    """
    anthropic_models = []
    try:
        logger.info(
            "Attempting to connect to Anthropic API using Anthropic package"
        )

        api_key = _get_setting_from_session("llm.anthropic.api_key", "")
        if not api_key:
            logger.info("Anthropic API key not configured")
            return anthropic_models

        # Imported here to avoid dependency issues if not installed
        from anthropic import Anthropic

        client = Anthropic(api_key=api_key)

        try:
            logger.debug("Fetching models from Anthropic API")
            _append_sdk_models(
                client.models.list(),
                anthropic_models,
                "Anthropic",
                "ANTHROPIC",
            )
        except Exception as api_err:
            logger.exception(f"Anthropic API error: {api_err!s}")

    except ImportError:
        logger.warning(
            "Anthropic package not installed. No models will be available."
        )
    except Exception as e:
        logger.exception(f"Error getting Anthropic models: {e!s}")

    return anthropic_models


def _fetch_discovered_models(provider_key, provider_info):
    """Fetch models from one auto-discovered provider.

    Args:
        provider_key: Key of the provider in the discovery mapping; stored
            unchanged in each entry's ``"provider"`` field.
        provider_info: Discovery record exposing ``provider_name`` and
            ``provider_class``.

    Returns:
        List of model entries; whatever was collected before an error is
        kept (errors are logged, never raised).
    """
    provider_models = []
    try:
        logger.info(f"Fetching models from {provider_info.provider_name}")

        provider_class = provider_info.provider_class

        # Get API key if configured
        api_key = _get_setting_from_session(
            provider_class.api_key_setting, ""
        )

        # Get base URL if provider has configurable URL
        base_url = None
        url_setting = getattr(provider_class, "url_setting", None)
        if url_setting:
            base_url = _get_setting_from_session(url_setting, "")

        models = provider_class.list_models_for_api(api_key, base_url)

        for model in models:
            provider_models.append(
                {
                    "value": model["value"],
                    "label": model["label"],  # Use provider's label as-is
                    "provider": provider_key,
                }
            )

        logger.info(
            f"Successfully fetched {len(provider_models)} models from {provider_info.provider_name}"
        )

    except Exception as e:
        logger.exception(
            f"Error getting {provider_info.provider_name} models: {e!s}"
        )

    return provider_models


def _store_provider_models_cache(providers, force_refresh):
    """Replace cached models in the user database with freshly fetched ones.

    Args:
        providers: Dict mapping ``"<provider>_models"`` to model entries.
        force_refresh: When true, every cached row is deleted first;
            otherwise only rows of the providers present in ``providers``.

    Errors while writing are logged and rolled back, never raised.
    """
    from ...database.models import ProviderModel

    username = session.get("username")
    with get_user_db_session(username) as db_session:
        try:
            if force_refresh:
                # Clear ALL cached models to remove any stale data from old
                # code versions or deleted providers
                deleted_count = db_session.query(ProviderModel).delete()
                logger.info(
                    f"Force refresh: cleared all {deleted_count} cached models"
                )
            else:
                # Clear old cache entries only for providers we're updating
                for provider_key in providers:
                    provider_name = provider_key.removesuffix(
                        "_models"
                    ).upper()
                    db_session.query(ProviderModel).filter(
                        ProviderModel.provider == provider_name
                    ).delete()

            # One timestamp for the whole batch
            fetched_at = datetime.now(UTC)
            for provider_key, models in providers.items():
                provider_name = provider_key.removesuffix("_models").upper()
                for model in models:
                    if (
                        isinstance(model, dict)
                        and "value" in model
                        and "label" in model
                    ):
                        db_session.add(
                            ProviderModel(
                                provider=provider_name,
                                model_key=model["value"],
                                model_label=model["label"],
                                last_updated=fetched_at,
                            )
                        )

            db_session.commit()
            logger.info("Successfully cached models to database")

        except Exception:
            logger.exception("Error saving models to database cache")
            db_session.rollback()


@settings_bp.route("/api/available-models", methods=["GET"])
@login_required
def api_get_available_models():
    """Get available LLM models from various providers.

    Query parameters:
        force_refresh: ``"true"`` (case-insensitive) bypasses the database
            cache and clears it before storing the fresh results.

    Returns:
        JSON ``{"provider_options": [...], "providers": {...}}`` where
        ``providers`` maps ``"<provider>_models"`` to lists of
        ``{"value", "label", "provider"}`` entries, or a JSON error payload
        with HTTP status 500 on an unexpected failure.
    """
    try:
        from flask import request

        from ...llm.providers import (
            discover_providers,
            get_discovered_provider_options,
        )

        # Check if force_refresh is requested
        force_refresh = (
            request.args.get("force_refresh", "false").lower() == "true"
        )

        # Get all auto-discovered providers
        provider_options = get_discovered_provider_options()

        # Add remaining hardcoded providers (complex local providers not yet migrated)
        provider_options.extend(
            [
                {"value": "VLLM", "label": "vLLM (Local)"},
                {"value": "LLAMACPP", "label": "Llama.cpp (Local)"},
            ]
        )

        # Check database cache first (unless force_refresh is True)
        if not force_refresh:
            try:
                cached = _load_cached_provider_models()
            except Exception as e:
                logger.warning(
                    f"Error reading cached models from database: {e}"
                )
                # Continue to fetch fresh data
            else:
                if cached:
                    logger.info("Returning cached models from database")
                    return jsonify(
                        {
                            "provider_options": provider_options,
                            "providers": cached,
                        }
                    )

        # Available models by provider, built only from fresh data
        providers = {}

        providers["ollama_models"] = _fetch_ollama_models()

        # Note: Custom OpenAI Endpoint models are fetched via auto-discovery
        # (see the auto-discovery loop below which handles OPENAI_ENDPOINT provider)

        openai_models = _fetch_openai_models()
        providers["openai_models"] = openai_models
        logger.info(f"Final OpenAI models count: {len(openai_models)}")

        anthropic_models = _fetch_anthropic_models()
        providers["anthropic_models"] = anthropic_models
        logger.info(f"Final Anthropic models count: {len(anthropic_models)}")

        # Fetch models from auto-discovered providers; a discovered provider
        # whose lower-cased key matches an entry above replaces that entry.
        for provider_key, provider_info in discover_providers().items():
            provider_models = _fetch_discovered_models(
                provider_key, provider_info
            )
            providers[f"{provider_key.lower()}_models"] = provider_models
            logger.info(
                f"Final {provider_key} models count: {len(provider_models)}"
            )

        # Save fetched models to database cache
        if force_refresh or providers:
            _store_provider_models_cache(providers, force_refresh)

        # Return all options
        return jsonify(
            {"provider_options": provider_options, "providers": providers}
        )

    except Exception:
        logger.exception("Error getting available models")
        return jsonify(
            {
                "status": "error",
                "message": "Failed to retrieve available models",
            }
        ), 500

1471 

1472 

def _get_engine_icon_and_category(
    engine_data: dict, engine_class=None
) -> tuple:
    """
    Pick the icon emoji and category label that describe a search engine.

    The type flags are read from ``engine_class`` when one is given (truthy),
    otherwise from ``engine_data``; a missing flag counts as False.

    Args:
        engine_data: Engine configuration dictionary
        engine_class: Optional loaded engine class to check attributes

    Returns:
        Tuple of (icon, category) strings
    """
    flag_names = (
        "is_scientific",
        "is_generic",
        "is_local",
        "is_news",
        "is_code",
    )

    # Read every flag up front from exactly one source.
    if engine_class:
        flags = {
            flag: getattr(engine_class, flag, False) for flag in flag_names
        }
    else:
        flags = {flag: engine_data.get(flag, False) for flag in flag_names}

    # Priority: local > scientific > news > code > generic > default
    by_priority = (
        ("is_local", "📁", "Local RAG"),
        ("is_scientific", "🔬", "Scientific"),
        ("is_news", "📰", "News"),
        ("is_code", "💻", "Code"),
        ("is_generic", "🌐", "Web Search"),
    )
    for flag, icon, category in by_priority:
        if flags[flag]:
            return icon, category

    return "🔍", "Search"

1514 

1515 

@settings_bp.route("/api/available-search-engines", methods=["GET"])
@login_required
def api_get_available_search_engines():
    """Return the configured search engines with display metadata as JSON.

    Responds with ``{"engines": {...}, "engine_options": [...]}``: ``engines``
    maps each engine id to its display name, description, strengths, icon and
    category, and ``engine_options`` holds one value/label/icon/category
    entry per engine. On any error an ``{"error": ...}`` payload is returned
    with HTTP status 500.
    """
    try:
        from importlib import import_module

        # Same source of engine definitions as search_engines_config.py
        from ...web_search_engines.search_engines_config import search_config
        from ...database.session_context import get_user_db_session

        username = session.get("username")
        with get_user_db_session(username) as db_session:
            configured = search_config(
                username=username, db_session=db_session
            )

            engines_dict = {}
            engine_options = []

            for engine_id, cfg in (configured or {}).items():
                # Best effort: load the engine class so its attributes can
                # drive the icon/category choice.
                engine_cls = None
                try:
                    module_path = cfg.get("module_path")
                    class_name = cfg.get("class_name")
                    if module_path and class_name:
                        # Relative module paths need an anchor package
                        anchor = (
                            "local_deep_research.web_search_engines"
                            if module_path.startswith(".")
                            else None
                        )
                        loaded = import_module(module_path, package=anchor)
                        engine_cls = getattr(loaded, class_name, None)
                except Exception as e:
                    logger.debug(
                        f"Could not load engine class for {engine_id}: {e}"
                    )

                icon, category = _get_engine_icon_and_category(
                    cfg, engine_cls
                )
                display_name = cfg.get("display_name", engine_id)

                engines_dict[engine_id] = {
                    "display_name": display_name,
                    "description": cfg.get("description", ""),
                    "strengths": cfg.get("strengths", []),
                    "icon": icon,
                    "category": category,
                }
                engine_options.append(
                    {
                        "value": engine_id,
                        "label": f"{icon} {display_name} ({category})",
                        "icon": icon,
                        "category": category,
                    }
                )

            # An empty result is logged but still returned as-is
            if not engine_options:
                logger.warning("No search engines found in configuration")

            payload = {
                "engines": engines_dict,
                "engine_options": engine_options,
            }
            return jsonify(payload)

    except Exception:
        logger.exception("Error getting available search engines")
        return jsonify({"error": "Failed to retrieve settings"}), 500

1596 

1597 

1598# Legacy routes for backward compatibility - these will redirect to the new routes 

@settings_bp.route("/main", methods=["GET"])
@login_required
def main_config_page():
    """Legacy ``/main`` URL: redirect to the settings page."""
    settings_url = url_for("settings.settings_page")
    return redirect(settings_url)

1604 

1605 

@settings_bp.route("/collections", methods=["GET"])
@login_required
def collections_config_page():
    """Legacy ``/collections`` URL: redirect to the settings page."""
    settings_url = url_for("settings.settings_page")
    return redirect(settings_url)

1611 

1612 

@settings_bp.route("/api_keys", methods=["GET"])
@login_required
def api_keys_config_page():
    """Legacy ``/api_keys`` URL: redirect to the settings page."""
    settings_url = url_for("settings.settings_page")
    return redirect(settings_url)

1618 

1619 

@settings_bp.route("/search_engines", methods=["GET"])
@login_required
def search_engines_config_page():
    """Legacy ``/search_engines`` URL: redirect to the settings page."""
    settings_url = url_for("settings.settings_page")
    return redirect(settings_url)

1625 

1626 

@settings_bp.route("/open_file_location", methods=["POST"])
@login_required
def open_file_location():
    """Open the folder containing a configuration file in the OS file browser.

    Reads ``file_path`` from the submitted form, validates it with
    ``PathValidator.validate_config_path`` against the data directory, and
    launches the platform's file explorer on the parent directory. The
    outcome is reported through a flash message.

    Returns:
        A redirect to the settings page in every case.
    """
    file_path = request.form.get("file_path")

    if not file_path:
        flash("No file path provided", "error")
        return redirect(url_for("settings.settings_page"))

    try:
        # Use centralized path validator for security
        from ...security.path_validator import PathValidator
        from ...config.paths import get_data_directory

        try:
            resolved_path = PathValidator.validate_config_path(
                file_path, get_data_directory()
            )
        except ValueError as e:
            # A ValueError from the validator means the path was rejected
            flash(f"Invalid file path: {str(e)}", "error")
            return redirect(url_for("settings.settings_page"))

        # Get the directory containing the file
        dir_path = resolved_path.parent

        # Pick the file-explorer command for this OS; anything that is not
        # Windows or macOS is treated as Linux.
        explorer_commands = {"Windows": "explorer", "Darwin": "open"}
        opener = explorer_commands.get(platform.system(), "xdg-open")

        # Argument list, no shell: the path is never interpreted by a shell
        subprocess.Popen([opener, str(dir_path)])

        flash(f"Opening folder: {dir_path}", "success")
    except Exception as e:
        logger.exception("Error opening folder")
        flash(f"Error opening folder: {e!s}", "error")

    # Redirect back to the settings page
    return redirect(url_for("settings.settings_page"))

1671 

1672 

@settings_bp.context_processor
def inject_csrf_token():
    """Expose ``csrf_token`` to every template rendered by this blueprint.

    The value is the ``generate_csrf`` callable itself, not a token string.
    """
    return {"csrf_token": generate_csrf}

1677 

1678 

1679@settings_bp.route("/fix_corrupted_settings", methods=["POST"]) 

1680@login_required 

1681def fix_corrupted_settings(): 

1682 """Fix corrupted settings in the database""" 

1683 username = session.get("username") 

1684 

1685 with get_user_db_session(username) as db_session: 

1686 try: 

1687 # Track fixed and removed settings 

1688 fixed_settings = [] 

1689 removed_duplicate_settings = [] 

1690 fixed_scoping_issues = [] 

1691 

1692 # First, find and remove duplicate settings with the same key 

1693 # This happens because of errors in settings import/export 

1694 from sqlalchemy import func as sql_func 

1695 

1696 # Find keys with duplicates 

1697 duplicate_keys = ( 

1698 db_session.query(Setting.key) 

1699 .group_by(Setting.key) 

1700 .having(sql_func.count(Setting.key) > 1) 

1701 .all() 

1702 ) 

1703 duplicate_keys = [key[0] for key in duplicate_keys] 

1704 

1705 # For each duplicate key, keep the latest updated one and remove others 

1706 for key in duplicate_keys: 

1707 dupe_settings = ( 

1708 db_session.query(Setting) 

1709 .filter(Setting.key == key) 

1710 .order_by(Setting.updated_at.desc()) 

1711 .all() 

1712 ) 

1713 

1714 # Keep the first one (most recently updated) and delete the rest 

1715 for i, setting in enumerate(dupe_settings): 

1716 if i > 0: # Skip the first one (keep it) 

1717 db_session.delete(setting) 

1718 removed_duplicate_settings.append(key) 

1719 

1720 # Fix scoping issues - remove app.* settings that should be in other categories 

1721 # Report settings 

1722 for key in [ 

1723 "app.enable_fact_checking", 

1724 "app.knowledge_accumulation", 

1725 "app.knowledge_accumulation_context_limit", 

1726 "app.output_dir", 

1727 ]: 

1728 setting = ( 

1729 db_session.query(Setting).filter(Setting.key == key).first() 

1730 ) 

1731 if setting: 

1732 # Move to proper category if not already there 

1733 proper_key = key.replace("app.", "report.") 

1734 existing_proper = ( 

1735 db_session.query(Setting) 

1736 .filter(Setting.key == proper_key) 

1737 .first() 

1738 ) 

1739 

1740 if not existing_proper: 

1741 # Create proper setting 

1742 new_setting = Setting( 

1743 key=proper_key, 

1744 value=setting.value, 

1745 type=SettingType.REPORT, 

1746 name=setting.name, 

1747 description=setting.description, 

1748 category=( 

1749 setting.category.replace("app", "report") 

1750 if setting.category 

1751 else "report_parameters" 

1752 ), 

1753 ui_element=setting.ui_element, 

1754 options=setting.options, 

1755 min_value=setting.min_value, 

1756 max_value=setting.max_value, 

1757 step=setting.step, 

1758 visible=setting.visible, 

1759 editable=setting.editable, 

1760 ) 

1761 db_session.add(new_setting) 

1762 

1763 # Delete the app one 

1764 db_session.delete(setting) 

1765 fixed_scoping_issues.append(key) 

1766 

1767 # Search settings 

1768 for key in [ 

1769 "app.questions_per_iteration", 

1770 "app.search_engine", 

1771 "app.iterations", 

1772 "app.max_results", 

1773 "app.region", 

1774 "app.safe_search", 

1775 "app.search_language", 

1776 "app.snippets_only", 

1777 ]: 

1778 setting = ( 

1779 db_session.query(Setting).filter(Setting.key == key).first() 

1780 ) 

1781 if setting: 

1782 # Move to proper category if not already there 

1783 proper_key = key.replace("app.", "search.") 

1784 existing_proper = ( 

1785 db_session.query(Setting) 

1786 .filter(Setting.key == proper_key) 

1787 .first() 

1788 ) 

1789 

1790 if not existing_proper: 

1791 # Create proper setting 

1792 new_setting = Setting( 

1793 key=proper_key, 

1794 value=setting.value, 

1795 type=SettingType.SEARCH, 

1796 name=setting.name, 

1797 description=setting.description, 

1798 category=( 

1799 setting.category.replace("app", "search") 

1800 if setting.category 

1801 else "search_parameters" 

1802 ), 

1803 ui_element=setting.ui_element, 

1804 options=setting.options, 

1805 min_value=setting.min_value, 

1806 max_value=setting.max_value, 

1807 step=setting.step, 

1808 visible=setting.visible, 

1809 editable=setting.editable, 

1810 ) 

1811 db_session.add(new_setting) 

1812 

1813 # Delete the app one 

1814 db_session.delete(setting) 

1815 fixed_scoping_issues.append(key) 

1816 

1817 # LLM settings 

1818 for key in [ 

1819 "app.model", 

1820 "app.provider", 

1821 "app.temperature", 

1822 "app.max_tokens", 

1823 "app.openai_endpoint_url", 

1824 "app.lmstudio_url", 

1825 "app.llamacpp_model_path", 

1826 ]: 

1827 setting = ( 

1828 db_session.query(Setting).filter(Setting.key == key).first() 

1829 ) 

1830 if setting: 

1831 # Move to proper category if not already there 

1832 proper_key = key.replace("app.", "llm.") 

1833 existing_proper = ( 

1834 db_session.query(Setting) 

1835 .filter(Setting.key == proper_key) 

1836 .first() 

1837 ) 

1838 

1839 if not existing_proper: 

1840 # Create proper setting 

1841 new_setting = Setting( 

1842 key=proper_key, 

1843 value=setting.value, 

1844 type=SettingType.LLM, 

1845 name=setting.name, 

1846 description=setting.description, 

1847 category=( 

1848 setting.category.replace("app", "llm") 

1849 if setting.category 

1850 else "llm_parameters" 

1851 ), 

1852 ui_element=setting.ui_element, 

1853 options=setting.options, 

1854 min_value=setting.min_value, 

1855 max_value=setting.max_value, 

1856 step=setting.step, 

1857 visible=setting.visible, 

1858 editable=setting.editable, 

1859 ) 

1860 db_session.add(new_setting) 

1861 

1862 # Delete the app one 

1863 db_session.delete(setting) 

1864 fixed_scoping_issues.append(key) 

1865 

1866 # Check for settings with corrupted values 

1867 all_settings = db_session.query(Setting).all() 

1868 for setting in all_settings: 

1869 # Check different types of corruption 

1870 is_corrupted = False 

1871 

1872 if ( 

1873 setting.value is None 

1874 or ( 

1875 isinstance(setting.value, str) 

1876 and setting.value 

1877 in [ 

1878 "{", 

1879 "[", 

1880 "{}", 

1881 "[]", 

1882 "[object Object]", 

1883 "null", 

1884 "undefined", 

1885 ] 

1886 ) 

1887 or ( 

1888 isinstance(setting.value, dict) 

1889 and len(setting.value) == 0 

1890 ) 

1891 ): 

1892 is_corrupted = True 

1893 

1894 # Skip if not corrupted 

1895 if not is_corrupted: 

1896 continue 

1897 

1898 # Get default value from migrations 

1899 # Import commented out as it's not directly used 

1900 # from ..database.migrations import setup_predefined_settings 

1901 

1902 default_value = None 

1903 

1904 # Try to find a matching default setting based on key 

1905 if setting.key.startswith("llm."): 

1906 if setting.key == "llm.model": 

1907 default_value = "gpt-3.5-turbo" 

1908 elif setting.key == "llm.provider": 

1909 default_value = "openai" 

1910 elif setting.key == "llm.temperature": 

1911 default_value = 0.7 

1912 elif setting.key == "llm.max_tokens": 

1913 default_value = 1024 

1914 elif setting.key.startswith("search."): 

1915 if setting.key == "search.tool": 

1916 default_value = "auto" 

1917 elif setting.key == "search.max_results": 

1918 default_value = 10 

1919 elif setting.key == "search.region": 

1920 default_value = "us" 

1921 elif setting.key == "search.questions_per_iteration": 

1922 default_value = 3 

1923 elif setting.key == "search.searches_per_section": 

1924 default_value = 2 

1925 elif setting.key == "search.skip_relevance_filter": 

1926 default_value = False 

1927 elif setting.key == "search.safe_search": 

1928 default_value = True 

1929 elif setting.key == "search.search_language": 

1930 default_value = "English" 

1931 elif setting.key.startswith("report."): 

1932 if setting.key == "report.searches_per_section": 

1933 default_value = 2 

1934 elif ( 

1935 setting.key == "report.enable_fact_checking" 

1936 or setting.key == "report.detailed_citations" 

1937 ): 

1938 default_value = True 

1939 elif setting.key.startswith("app."): 

1940 if ( 

1941 setting.key == "app.theme" 

1942 or setting.key == "app.default_theme" 

1943 ): 

1944 default_value = "dark" 

1945 elif setting.key == "app.enable_notifications" or ( 

1946 setting.key == "app.enable_web" 

1947 or setting.key == "app.web_interface" 

1948 ): 

1949 default_value = True 

1950 elif setting.key == "app.host": 

1951 default_value = "0.0.0.0" 

1952 elif setting.key == "app.port": 

1953 default_value = 5000 

1954 elif setting.key == "app.debug": 

1955 default_value = True 

1956 

1957 # Update the setting with the default value if found 

1958 if default_value is not None: 

1959 setting.value = default_value 

1960 fixed_settings.append(setting.key) 

1961 else: 

1962 # If no default found but it's a corrupted JSON, set to empty object 

1963 if setting.key.startswith("report."): 

1964 setting.value = {} 

1965 fixed_settings.append(setting.key) 

1966 

1967 # Commit changes 

1968 if ( 

1969 fixed_settings 

1970 or removed_duplicate_settings 

1971 or fixed_scoping_issues 

1972 ): 

1973 db_session.commit() 

1974 logger.info( 

1975 f"Fixed {len(fixed_settings)} corrupted settings: {', '.join(fixed_settings)}" 

1976 ) 

1977 if removed_duplicate_settings: 

1978 logger.info( 

1979 f"Removed {len(removed_duplicate_settings)} duplicate settings" 

1980 ) 

1981 if fixed_scoping_issues: 

1982 logger.info( 

1983 f"Fixed {len(fixed_scoping_issues)} scoping issues" 

1984 ) 

1985 

1986 # Return success 

1987 return jsonify( 

1988 { 

1989 "status": "success", 

1990 "message": f"Fixed {len(fixed_settings)} corrupted settings, removed {len(removed_duplicate_settings)} duplicates, and fixed {len(fixed_scoping_issues)} scoping issues", 

1991 "fixed_settings": fixed_settings, 

1992 "removed_duplicates": removed_duplicate_settings, 

1993 "fixed_scoping": fixed_scoping_issues, 

1994 } 

1995 ) 

1996 

1997 except Exception: 

1998 logger.exception("Error fixing corrupted settings") 

1999 db_session.rollback() 

2000 return ( 

2001 jsonify( 

2002 { 

2003 "status": "error", 

2004 "message": "An internal error occurred while fixing corrupted settings. Please try again later.", 

2005 } 

2006 ), 

2007 500, 

2008 ) 

2009 

2010 

@settings_bp.route("/api/warnings", methods=["GET"])
@login_required
def api_get_warnings():
    """Return the currently applicable warnings as JSON.

    Returns:
        ``{"warnings": ...}`` holding the result of ``calculate_warnings()``,
        or a generic error payload with HTTP 500 if anything raises.
    """
    try:
        # Serialisation stays inside the try so that a failure in jsonify
        # is reported through the same 500 path as a calculation failure.
        active_warnings = calculate_warnings()
        response = jsonify({"warnings": active_warnings})
    except Exception:
        logger.exception("Error getting warnings")
        return jsonify({"error": "Failed to retrieve settings"}), 500
    return response

2021 

2022 

@settings_bp.route("/api/ollama-status", methods=["GET"])
@login_required
def check_ollama_status():
    """Check if Ollama is running and available.

    Requests ``<base_url>/api/version`` with a 2 second timeout, where the
    base URL comes from the ``llm.ollama.url`` setting and falls back to
    ``http://localhost:11434`` when that setting is empty.

    Returns:
        A JSON object with ``running`` (bool) and either ``version`` (when
        the endpoint answered with HTTP 200) or ``error`` (non-200 status,
        request failure, or an unparsable response body).
    """
    default_url = "http://localhost:11434"
    try:
        # Get Ollama URL from settings
        raw_base_url = _get_setting_from_session("llm.ollama.url", default_url)
        base_url = normalize_url(raw_base_url) if raw_base_url else default_url

        response = safe_get(
            f"{base_url}/api/version",
            timeout=2.0,
            allow_localhost=True,
            allow_private_ips=True,
        )

        if response.status_code != 200:
            return jsonify(
                {
                    "running": False,
                    "error": f"Ollama returned status code {response.status_code}",
                }
            )

        return jsonify(
            {
                "running": True,
                "version": response.json().get("version", "unknown"),
            }
        )
    except (requests.exceptions.RequestException, ValueError):
        # ValueError is caught as well so that a 200 response whose body is
        # not valid JSON is reported as "not running" instead of escaping as
        # an unhandled error (older requests releases raise a plain
        # ValueError from response.json()).
        logger.exception("Ollama check failed")
        return jsonify(
            {"running": False, "error": "Failed to check Ollama status"}
        )

2064 

2065 

@settings_bp.route("/api/rate-limiting/status", methods=["GET"])
@login_required
def api_get_rate_limiting_status():
    """Report the rate-limit tracker's configuration and per-engine stats.

    Returns:
        ``{"status": {...}, "engines": [...]}`` where ``status`` mirrors the
        tracker's ``enabled``, ``exploration_rate``, ``learning_rate`` and
        ``memory_window`` attributes, and each ``engines`` entry is built
        from one 7-field row of ``tracker.get_stats()``. Any exception is
        logged and answered with a generic HTTP 500 payload.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        tracker = get_tracker()

        # Tracker-wide configuration
        status = {
            "enabled": tracker.enabled,
            "exploration_rate": tracker.exploration_rate,
            "learning_rate": tracker.learning_rate,
            "memory_window": tracker.memory_window,
        }

        # One dict per engine row; wait times are rounded to 2 decimals and
        # the success rate is expressed as a percentage (0.0 when falsy).
        engines = [
            {
                "engine_type": engine_type,
                "base_wait_seconds": round(base_wait, 2),
                "min_wait_seconds": round(min_wait, 2),
                "max_wait_seconds": round(max_wait, 2),
                "last_updated": last_updated,
                "total_attempts": total_attempts,
                "success_rate": (
                    round(success_rate * 100, 1) if success_rate else 0.0
                ),
            }
            for (
                engine_type,
                base_wait,
                min_wait,
                max_wait,
                last_updated,
                total_attempts,
                success_rate,
            ) in tracker.get_stats()
        ]

        return jsonify({"status": status, "engines": engines})

    except Exception:
        logger.exception("Error getting rate limiting status")
        return jsonify({"error": "An internal error occurred"}), 500

2116 

2117 

@settings_bp.route(
    "/api/rate-limiting/engines/<engine_type>/reset", methods=["POST"]
)
@login_required
def api_reset_engine_rate_limiting(engine_type):
    """Reset the rate-limit tracker's data for one engine.

    Args:
        engine_type: Engine identifier taken from the URL path; it is passed
            unchanged to ``tracker.reset_engine``.

    Returns:
        A JSON confirmation message, or a generic error payload with
        HTTP 500 if anything raises.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        get_tracker().reset_engine(engine_type)

        confirmation = f"Rate limiting data reset for {engine_type}"
        return jsonify({"message": confirmation})

    except Exception:
        logger.exception(f"Error resetting rate limiting for {engine_type}")
        return jsonify({"error": "An internal error occurred"}), 500

2137 

2138 

@settings_bp.route("/api/rate-limiting/cleanup", methods=["POST"])
@login_required
def api_cleanup_rate_limiting():
    """Clean up old rate limiting data.

    Request body (optional): a JSON object ``{"days": <number>}``. When the
    request is not JSON, or the key is absent, ``days`` defaults to 30.

    Returns:
        * 200 with a confirmation message after ``tracker.cleanup_old_data``
          has been called with ``days``.
        * 400 when a JSON request does not contain a JSON object, or when
          ``days`` is not a non-negative number.
        * 500 with a generic error payload on any unexpected exception.
    """
    try:
        from ...web_search_engines.rate_limiting import get_tracker

        days = 30
        if request.is_json:
            # silent=True turns a malformed body into None so it can be
            # answered with 400 rather than falling into the 500 handler.
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                return jsonify(
                    {"error": "Request body must be a JSON object"}
                ), 400
            days = payload.get("days", 30)

        # bool is a subclass of int, so it has to be excluded explicitly.
        if (
            isinstance(days, bool)
            or not isinstance(days, (int, float))
            or days < 0
        ):
            return jsonify(
                {"error": "'days' must be a non-negative number"}
            ), 400

        tracker = get_tracker()
        tracker.cleanup_old_data(days)

        return jsonify(
            {"message": f"Cleaned up rate limiting data older than {days} days"}
        )

    except Exception:
        logger.exception("Error cleaning up rate limiting data")
        return jsonify({"error": "An internal error occurred"}), 500

2158 

2159 

@settings_bp.route("/api/bulk", methods=["GET"])
@login_required
def get_bulk_settings():
    """Fetch several settings in a single request.

    The keys to fetch come from repeated ``keys[]`` query parameters; when
    none are supplied a fixed list of common settings is used instead.

    Returns:
        ``{"success": True, "settings": {key: {...}}}`` where each entry has
        ``value`` and ``exists`` (``value is not None``). A key whose lookup
        raises gets ``value`` None, ``exists`` False and an ``error`` text.
        An unexpected failure yields a generic payload with HTTP 500.
    """
    try:
        # Fall back to the common settings when no keys were requested
        wanted_keys = request.args.getlist("keys[]") or [
            "llm.provider",
            "llm.model",
            "search.tool",
            "search.iterations",
            "search.questions_per_iteration",
            "search.search_strategy",
            "benchmark.evaluation.provider",
            "benchmark.evaluation.model",
            "benchmark.evaluation.temperature",
            "benchmark.evaluation.endpoint_url",
        ]

        collected = {}
        for name in wanted_keys:
            # A failure on one key must not abort the whole batch
            try:
                current = _get_setting_from_session(name)
                entry = {"value": current, "exists": current is not None}
            except Exception as exc:
                logger.warning(f"Error getting setting {name}: {exc}")
                entry = {
                    "value": None,
                    "exists": False,
                    "error": "Failed to retrieve setting",
                }
            collected[name] = entry

        return jsonify({"success": True, "settings": collected})

    except Exception:
        logger.exception("Error getting bulk settings")
        return jsonify(
            {"success": False, "error": "An internal error occurred"}
        ), 500

2203 

2204 

@settings_bp.route("/api/data-location", methods=["GET"])
@login_required
def api_get_data_location():
    """Describe where application data is stored and whether it is encrypted.

    Returns:
        A JSON object with the data directory, the database path (reported
        under both ``database_path`` and ``encrypted_database_path``), the
        ``bootstrap.data_dir`` setting value, the current platform with its
        default location, a security notice that depends on
        ``db_manager.has_encryption``, and the SQLCipher settings (empty
        when encryption is off). Any exception is logged and answered with
        a generic HTTP 500 payload.
    """
    try:
        data_dir = get_data_directory()
        encrypted_db_path = get_encrypted_database_path()

        # Custom data directory, read from the bootstrap.data_dir setting
        # (the response labels it with the LDR_DATA_DIR variable name).
        from local_deep_research.settings.manager import SettingsManager

        custom_data_dir = SettingsManager().get_setting("bootstrap.data_dir")

        # Default locations shown to the user, keyed by display name
        platform_info = {
            "Windows": "C:\\Users\\Username\\AppData\\Local\\local-deep-research",
            "macOS": "~/Library/Application Support/local-deep-research",
            "Linux": "~/.local/share/local-deep-research",
        }

        # platform.system() reports "Darwin" on macOS; map it to the key
        # used in platform_info.
        system_name = platform.system()
        current_platform = "macOS" if system_name == "Darwin" else system_name

        from ...database.sqlcipher_utils import get_sqlcipher_settings

        # Diagnostic logging of the database manager state
        logger.info(f"db_manager type: {type(db_manager)}")
        logger.info(
            f"db_manager.has_encryption: {getattr(db_manager, 'has_encryption', 'ATTRIBUTE NOT FOUND')}"
        )

        encryption_state = db_manager.has_encryption
        if encryption_state:
            cipher_settings = get_sqlcipher_settings()
            warning_text = "All data including API keys stored in the database are securely encrypted."
            recommendation_text = (
                "Your data is protected with database encryption."
            )
        else:
            cipher_settings = {}
            warning_text = "All data including API keys stored in the database are currently unencrypted. Please ensure appropriate file system permissions are set."
            recommendation_text = "Consider using environment variables for sensitive API keys instead of storing them in the database."

        security_notice = {
            "encrypted": encryption_state,
            "warning": warning_text,
            "recommendation": recommendation_text,
        }

        payload = {
            "data_directory": str(data_dir),
            "database_path": str(encrypted_db_path),
            "encrypted_database_path": str(encrypted_db_path),
            "is_custom": custom_data_dir is not None,
            "custom_env_var": "LDR_DATA_DIR",
            "custom_env_value": custom_data_dir,
            "platform": current_platform,
            "platform_default": platform_info.get(
                current_platform, str(data_dir)
            ),
            "platform_info": platform_info,
            "security_notice": security_notice,
            "encryption_settings": cipher_settings,
        }
        return jsonify(payload)

    except Exception:
        logger.exception("Error getting data location information")
        return jsonify({"error": "Failed to retrieve settings"}), 500

2275 

2276 

@settings_bp.route("/api/notifications/test-url", methods=["POST"])
@login_required
def api_test_notification_url():
    """
    Test a notification service URL.

    This endpoint creates a temporary NotificationService instance to test
    the provided URL. No database session or password is required because:
    - The service URL is provided directly in the request body
    - Test notifications use a temporary Apprise instance
    - No user settings or database queries are performed

    Security note: Rate limiting is not applied here because users need to
    test URLs when configuring notifications. Abuse is mitigated by the
    @login_required decorator and the fact that users can only spam their
    own notification services.

    Returns:
        * 200 with ``success``, ``message`` and ``error`` taken from the
          result of ``NotificationService.test_service``.
        * 400 when the body is not a JSON object containing ``service_url``
          (including a missing JSON Content-Type or malformed JSON).
        * 500 with a generic error payload on any unexpected exception.
    """
    try:
        from ...notifications.service import NotificationService

        # silent=True makes a wrong Content-Type or malformed JSON come back
        # as None, so these client errors are answered with 400 below
        # instead of being swallowed by the generic 500 handler.
        data = request.get_json(silent=True)
        # The isinstance check also rejects JSON arrays/strings, which would
        # otherwise fail on data["service_url"].
        if not isinstance(data, dict) or "service_url" not in data:
            return jsonify(
                {"success": False, "error": "service_url is required"}
            ), 400

        service_url = data["service_url"]

        # Create notification service instance and test the URL
        # No password/session needed - URL provided directly, no DB access
        notification_service = NotificationService()
        result = notification_service.test_service(service_url)

        # Only return expected fields to prevent information leakage
        safe_response = {
            "success": result.get("success", False),
            "message": result.get("message", ""),
            "error": result.get("error", ""),
        }
        return jsonify(safe_response)

    except Exception:
        logger.exception("Error testing notification URL")
        return jsonify(
            {
                "success": False,
                "error": "Failed to test notification service. Check logs for details.",
            }
        ), 500