Coverage for src / local_deep_research / web / auth / routes.py: 89%

317 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Authentication routes for login, register, and logout. 

3Uses SQLCipher encrypted databases with browser password manager support. 

4""" 

5 

6import threading 

7from datetime import datetime, timezone, UTC 

8 

9from flask import ( 

10 Blueprint, 

11 flash, 

12 jsonify, 

13 redirect, 

14 render_template, 

15 request, 

16 session, 

17 url_for, 

18) 

19from loguru import logger 

20 

21from ...database.auth_db import auth_db_session 

22from ...database.encrypted_db import db_manager 

23from ...database.models.auth import User 

24from ...database.thread_local_session import thread_cleanup 

25from sqlalchemy.exc import IntegrityError 

26from ...utilities.threading_utils import thread_context, thread_with_app_context 

27from .session_manager import ( 

28 session_manager, 

29) # singleton from session_manager module 

30from ..server_config import load_server_config 

31from ...security.rate_limiter import ( 

32 login_limit, 

33 password_change_limit, 

34 registration_limit, 

35) 

36from urllib.parse import urlparse 

37 

38from ...security.url_validator import URLValidator 

39from ...security.account_lockout import get_account_lockout_manager 

40from ...security.password_validator import PasswordValidator 

41from ...security.log_sanitizer import sanitize_for_log 

42 

# Blueprint that groups every authentication route under the /auth prefix.
auth_bp = Blueprint(
    name="auth",
    import_name=__name__,
    url_prefix="/auth",
)

44 

45 

@auth_bp.route("/csrf-token", methods=["GET"])
def get_csrf_token():
    """
    Return the session's CSRF token as JSON.

    Lets API clients fetch the token programmatically instead of scraping
    it out of rendered HTML.

    Returns:
        A ``({"csrf_token": <token>}, 200)`` JSON response.
    """
    from flask_wtf.csrf import generate_csrf

    # generate_csrf() creates a token or reuses the one already issued
    # for this session.
    payload = {"csrf_token": generate_csrf()}
    return jsonify(payload), 200

60 

61 

@auth_bp.route("/login", methods=["GET"])
def login_page():
    """
    Render the login form (GET only).

    No rate-limit decorator is applied, so viewing the page always works.
    Users who already have a ``username`` in the session are redirected
    to the index page instead.
    """
    server_config = load_server_config()

    if session.get("username"):
        # Already authenticated: nothing to show here.
        return redirect(url_for("index"))

    template_context = {
        # Forwarded to the template so the post-login redirect target
        # survives the form round trip.
        "next_page": request.args.get("next", ""),
        "has_encryption": db_manager.has_encryption,
        "allow_registrations": server_config.get("allow_registrations", True),
    }
    return render_template("auth/login.html", **template_context)

82 

83 

@auth_bp.route("/login", methods=["POST"])
@login_limit
def login():
    """
    Login handler (POST only).

    Credentials are verified by attempting to open the user's encrypted
    database. On success the Flask session is rebuilt, the password is
    placed in the temporary and per-session password stores, and the
    non-critical post-login work is started on a daemon thread.

    Rate limiting is applied by the ``login_limit`` decorator (described
    upstream as 5 attempts per 15 minutes per IP).

    Returns:
        A redirect to the validated ``next`` path (or the index page) on
        success; otherwise the re-rendered login form with status 400
        (missing fields), 429 (account locked) or 401 (bad credentials).
    """
    config = load_server_config()
    # Read once: used both for error re-renders and the final redirect.
    next_page = request.args.get("next", "")

    def reject(message, status):
        """Flash *message* and re-render the login form with *status*."""
        flash(message, "error")
        return (
            render_template(
                "auth/login.html",
                has_encryption=db_manager.has_encryption,
                allow_registrations=config.get("allow_registrations", True),
                # Keep the redirect target so a failed attempt does not
                # lose it (login_page passes the same variable).
                next_page=next_page,
            ),
            status,
        )

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    remember = request.form.get("remember", "false") == "true"

    if not username or not password:
        return reject("Username and password are required", 400)

    # Check account lockout before attempting credential verification
    lockout_mgr = get_account_lockout_manager()
    if lockout_mgr.is_locked(username):
        logger.warning(
            f"Login attempt for locked account: {sanitize_for_log(username)}"
        )
        return reject(
            "Account is temporarily locked. Please try again later.", 429
        )

    # Try to open user's encrypted database
    engine = db_manager.open_user_database(username, password)

    if engine is None:
        # Invalid credentials or database doesn't exist
        lockout_mgr.record_failure(username)
        logger.warning(
            f"Failed login attempt for username: {sanitize_for_log(username)}"
        )
        return reject("Invalid username or password", 401)

    # Success: clear any prior failure count
    lockout_mgr.record_success(username)

    # Prevent session fixation by clearing old session data before creating new
    session.clear()

    # Create session
    session_id = session_manager.create_session(username, remember)
    session["session_id"] = session_id
    session["username"] = username
    session.permanent = remember

    # Store password temporarily for post-login database access
    from ...database.temp_auth import temp_auth_store

    auth_token = temp_auth_store.store_auth(username, password)
    session["temp_auth_token"] = auth_token

    # Also store in session password store for metrics access
    from ...database.session_passwords import session_password_store

    session_password_store.store_session_password(
        username, session_id, password
    )

    # Sanitized like the other log calls in this handler.
    logger.info(f"User {sanitize_for_log(username)} logged in successfully")

    # Defer non-critical post-login work to a background thread so the
    # redirect returns immediately.
    app_ctx = thread_context()
    thread = threading.Thread(
        target=thread_with_app_context(_perform_post_login_tasks),
        args=(app_ctx, username, password),
        daemon=True,
    )
    thread.start()

    safe_path = URLValidator.get_safe_redirect_path(next_page, request.host_url)
    if safe_path:
        # Normalise backslashes first so the urlparse() check below sees a
        # backslash-prefixed value the same way it sees "//host".
        safe_path = safe_path.replace("\\", "/")
        parsed = urlparse(safe_path)
        if not parsed.scheme and not parsed.netloc:
            return redirect(safe_path)
    return redirect(url_for("index"))

181 

182 

@thread_cleanup
def _perform_post_login_tasks(username: str, password: str) -> None:
    """Run non-critical post-login operations in a background thread.

    Each operation is wrapped in its own try/except and performs its own
    imports, so one failing step (including a failed import) cannot
    prevent the others from running. Failures are logged and otherwise
    ignored; the steps are described upstream as idempotent and safe to
    retry on the next login.

    Args:
        username: Name of the user who just logged in.
        password: That user's password, used to open the encrypted
            per-user database.
    """
    # 1. Settings version check + migration
    try:
        from ...settings.manager import SettingsManager
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as db_session:
            settings_manager = SettingsManager(db_session)
            if not settings_manager.db_version_matches_package():
                logger.info(
                    f"Database version mismatch for {username} "
                    "- loading missing default settings"
                )
                settings_manager.load_from_defaults_file(
                    commit=True, overwrite=False
                )
                settings_manager.update_db_version()
                logger.info(
                    f"Missing default settings loaded and version "
                    f"updated for user {username}"
                )
    except Exception:
        logger.exception(f"Post-login settings migration failed for {username}")

    # 2. Initialize library system (source types and default collection)
    try:
        from ...database.library_init import initialize_library_for_user

        init_results = initialize_library_for_user(username, password)
        if init_results.get("success"):
            logger.info(f"Library system initialized for user {username}")
        else:
            logger.warning(
                f"Library initialization issue for {username}: "
                f"{init_results.get('error', 'Unknown error')}"
            )
    except Exception:
        logger.exception(f"Post-login library init failed for {username}")

    # 3a. Update last_login in auth DB
    try:
        with auth_db_session() as auth_db:
            user = auth_db.query(User).filter_by(username=username).first()
            if user:
                user.last_login = datetime.now(UTC)
                auth_db.commit()
    except Exception:
        logger.exception(f"Post-login auth DB update failed for {username}")

    # 3b. Notify news scheduler. Kept outside the auth-DB block so an
    # auth-DB failure cannot skip it.
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        scheduler = get_news_scheduler()
        if scheduler.is_running:
            scheduler.update_user_info(username, password)
            logger.info(f"Updated scheduler with user info for {username}")
    except Exception:
        logger.exception("Could not update scheduler on login")

    # 4. Clear the model cache to ensure fresh provider data
    try:
        from ...database.models import ProviderModel

        # Imported here as well: relying on step 1's import would leave the
        # name unbound whenever step 1 failed before reaching it.
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as user_db_session:
            deleted_count = user_db_session.query(ProviderModel).delete()
            user_db_session.commit()
            logger.info(
                f"Cleared {deleted_count} cached models "
                f"for user {username} on login"
            )
    except Exception:
        logger.exception(f"Post-login model cache clear failed for {username}")

    # 5. Schedule background database backup if enabled
    try:
        from ...database.backup import get_backup_scheduler
        from ...settings.manager import SettingsManager
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as db_session:
            sm = SettingsManager(db_session)
            backup_enabled = sm.get_setting("backup.enabled", True)

            if backup_enabled:
                max_backups = sm.get_setting("backup.max_count", 1)
                max_age_days = sm.get_setting("backup.max_age_days", 7)

                get_backup_scheduler().schedule_backup(
                    username, password, max_backups, max_age_days
                )
                logger.info(f"Background backup scheduled for user {username}")
    except Exception:
        logger.exception(f"Post-login backup scheduling failed for {username}")

    logger.info(f"Post-login tasks completed for user {username}")

290 

291 

@auth_bp.route("/validate-password", methods=["POST"])
def validate_password():
    """Report password-strength problems as JSON for client-side forms.

    Returns:
        JSON with ``valid`` (true when no problems were found) and
        ``errors`` (the validator's messages).
    """
    candidate = request.form.get("password", "")
    problems = PasswordValidator.validate_strength(candidate)
    verdict = {"valid": len(problems) == 0, "errors": problems}
    return jsonify(verdict)

298 

299 

@auth_bp.route("/register", methods=["GET"])
def register_page():
    """
    Render the registration form (GET only).

    No rate-limit decorator is applied, so viewing the page always works.
    When the server config disables registrations, the user is sent back
    to the login page with an error message instead.
    """
    server_config = load_server_config()
    registrations_open = server_config.get("allow_registrations", True)

    if registrations_open:
        return render_template(
            "auth/register.html",
            has_encryption=db_manager.has_encryption,
            password_requirements=PasswordValidator.get_requirements(),
        )

    flash("New user registrations are currently disabled.", "error")
    return redirect(url_for("auth.login_page"))

316 

317 

@auth_bp.route("/register", methods=["POST"])
@registration_limit
def register():
    """
    Registration handler (POST only).

    Validates the form, inserts the user into the auth database, creates
    the user's encrypted database and logs the new user in.

    If creating the encrypted database fails, the auth record inserted a
    moment earlier is removed again (best effort) so the username is not
    left half-registered.

    Rate limiting is applied by the ``registration_limit`` decorator
    (described upstream as 3 attempts per hour per IP).

    Returns:
        A redirect to the index page on success, a redirect to the login
        page when registrations are disabled, or the re-rendered form with
        status 400 (validation / duplicate username) or 500 (unexpected
        failure).
    """
    config = load_server_config()
    if not config.get("allow_registrations", True):
        flash("New user registrations are currently disabled.", "error")
        return redirect(url_for("auth.login_page"))

    def render_form(status):
        """Re-render the registration form with the given HTTP status."""
        return (
            render_template(
                "auth/register.html",
                has_encryption=db_manager.has_encryption,
                password_requirements=PasswordValidator.get_requirements(),
            ),
            status,
        )

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    confirm_password = request.form.get("confirm_password", "")
    acknowledge = request.form.get("acknowledge", "false") == "true"

    # Validation
    errors = []

    if not username:
        errors.append("Username is required")
    elif len(username) < 3:
        errors.append("Username must be at least 3 characters")
    elif not username.replace("_", "").replace("-", "").isalnum():
        errors.append(
            "Username can only contain letters, numbers, underscores, and hyphens"
        )

    if not password:
        errors.append("Password is required")
    else:
        errors.extend(PasswordValidator.validate_strength(password))

    if password != confirm_password:
        errors.append("Passwords do not match")

    if not acknowledge:
        errors.append(
            "You must acknowledge that password recovery is not possible"
        )

    # Check if user already exists. The message is deliberately generic to
    # prevent account enumeration; the small timing difference is accepted
    # upstream because the endpoint is rate limited.
    # See: https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html
    if not errors and username and db_manager.user_exists(username):
        errors.append("Registration failed. Please try a different username.")

    if errors:
        for error in errors:
            flash(error, "error")
        return render_form(400)

    # Create user in auth database
    with auth_db_session() as auth_db:
        try:
            new_user = User(username=username)
            auth_db.add(new_user)
            auth_db.commit()
        except IntegrityError:
            # Duplicate username: two requests for the same name passed the
            # user_exists() check simultaneously.
            logger.warning(f"Duplicate username attempted: {username}")
            auth_db.rollback()
            flash(
                "Registration failed. Please try a different username.", "error"
            )
            return render_form(400)
        except Exception:
            logger.exception(f"Registration failed for {username}")
            auth_db.rollback()
            flash("Registration failed. Please try again.", "error")
            return render_form(500)

    # Create encrypted database for user
    try:
        db_manager.create_user_database(username, password)
    except Exception:
        logger.exception(f"Registration failed for {username}")
        # Undo the auth record committed above; otherwise the username
        # stays in the auth DB with no database behind it.
        # NOTE(review): assumes create_user_database leaves nothing behind
        # that would block a retry; confirm against its implementation.
        try:
            with auth_db_session() as cleanup_db:
                cleanup_db.query(User).filter_by(username=username).delete()
                cleanup_db.commit()
        except Exception:
            logger.exception(
                f"Could not remove auth record for {username} "
                "after failed database creation"
            )
        flash("Registration failed. Please try again.", "error")
        return render_form(500)

    try:
        # Prevent session fixation by clearing old session data
        session.clear()

        # Auto-login after registration
        session_id = session_manager.create_session(username, False)
        session["session_id"] = session_id
        session["username"] = username

        # Store password temporarily for post-registration database access
        from ...database.temp_auth import temp_auth_store

        auth_token = temp_auth_store.store_auth(username, password)
        session["temp_auth_token"] = auth_token

        # Also store in session password store for metrics access
        from ...database.session_passwords import session_password_store

        session_password_store.store_session_password(
            username, session_id, password
        )

        # Notify the news scheduler about the new user
        try:
            from ...news.subscription_manager.scheduler import (
                get_news_scheduler,
            )

            scheduler = get_news_scheduler()
            if scheduler.is_running:
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Updated scheduler with new user info for {username}"
                )
        except Exception:
            logger.exception("Could not update scheduler on registration")

        logger.info(f"New user registered: {username}")

        # Initialize library system (source types and default collection)
        from ...database.library_init import initialize_library_for_user

        try:
            init_results = initialize_library_for_user(username, password)
            if init_results.get("success"):
                logger.info(
                    f"Library system initialized for new user {username}"
                )
            else:
                logger.warning(
                    f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
                )
        except Exception:
            # Don't block registration on library init failure
            logger.exception(
                f"Error initializing library for new user {username}"
            )

        return redirect(url_for("index"))

    except Exception:
        logger.exception(f"Registration failed for {username}")
        flash("Registration failed. Please try again.", "error")
        return render_form(500)

483 

484 

@auth_bp.route("/logout", methods=["POST"])
def logout():
    """
    Logout handler.

    Tears down everything tied to the current login and redirects to the
    login URL. POST-only so a logout cannot be triggered by a plain GET
    (e.g. <img src="/auth/logout">).
    """
    login_url_endpoint = "auth.login"
    current_user = session.get("username")
    current_session_id = session.get("session_id")

    if not current_user:
        # Nothing to clean up for an anonymous visitor.
        return redirect(url_for(login_url_endpoint))

    # Cleanup order matters:
    #   1. Unregister from the news scheduler first. Scheduler jobs fetch
    #      the password from the scheduler's user_sessions at runtime to
    #      call open_user_database(), so removing it before closing the DB
    #      stops future job runs from re-creating the engine. A job that
    #      already fetched the password can still do so once; upstream
    #      treats this as benign because the dead-thread sweep cleans it
    #      up within 60 seconds.
    #   2. Close the user's database connection.
    #   3. Destroy the server-side session.
    #   4. Clear the session password store entry.
    #   5. Clear the Flask session dict.
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        news_scheduler = get_news_scheduler()
        if news_scheduler.is_running:
            news_scheduler.unregister_user(current_user)
    except Exception:
        logger.warning("Could not unregister user from scheduler")

    db_manager.close_user_database(current_user)

    if current_session_id:
        session_manager.destroy_session(current_session_id)

        from ...database.session_passwords import session_password_store

        session_password_store.clear_session(current_user, current_session_id)

    session.clear()

    logger.info(f"User {current_user} logged out")
    flash("You have been logged out successfully", "info")

    return redirect(url_for(login_url_endpoint))

542 

543 

@auth_bp.route("/check", methods=["GET"])
def check_auth():
    """
    Tell AJAX callers whether the current session is authenticated.

    Returns:
        JSON ``{"authenticated": True, "username": ...}`` when a username
        is in the session, otherwise ``{"authenticated": False}`` with
        status 401.
    """
    current_user = session.get("username")
    if not current_user:
        return jsonify({"authenticated": False}), 401
    return jsonify({"authenticated": True, "username": current_user})

552 

553 

@auth_bp.route("/change-password", methods=["GET"])
def change_password_page():
    """
    Render the change-password form (GET only).

    No rate-limit decorator is applied, so viewing the page always works.
    Visitors without a ``username`` in the session are redirected to the
    login URL.
    """
    if session.get("username"):
        requirements = PasswordValidator.get_requirements()
        return render_template(
            "auth/change_password.html",
            password_requirements=requirements,
        )
    return redirect(url_for("auth.login"))

568 

569 

@auth_bp.route("/change-password", methods=["POST"])
@password_change_limit
def change_password():
    """
    Change password handler (POST only).

    Validates the form, asks the database manager to change the password,
    and on success removes every credential derived from the old password
    before sending the user back to the login URL. The
    ``password_change_limit`` decorator rate limits the endpoint.

    Returns:
        A redirect to the login URL (not logged in, or password changed),
        or the re-rendered form with status 400 (validation errors) or
        401 (current password rejected).
    """
    account = session.get("username")
    if not account:
        return redirect(url_for("auth.login"))

    def render_form(status):
        """Re-render the change-password form with the given status."""
        return (
            render_template(
                "auth/change_password.html",
                password_requirements=PasswordValidator.get_requirements(),
            ),
            status,
        )

    old_secret = request.form.get("current_password", "")
    new_secret = request.form.get("new_password", "")
    new_secret_repeat = request.form.get("confirm_password", "")

    # Collect every validation problem so the user sees them all at once.
    problems = []

    if not old_secret:
        problems.append("Current password is required")

    if new_secret:
        problems.extend(PasswordValidator.validate_strength(new_secret))
    else:
        problems.append("New password is required")

    if new_secret != new_secret_repeat:
        problems.append("New passwords do not match")

    if old_secret == new_secret:
        problems.append("New password must be different from current password")

    if problems:
        for problem in problems:
            flash(problem, "error")
        return render_form(400)

    changed = db_manager.change_password(account, old_secret, new_secret)

    if not changed:
        flash("Current password is incorrect", "error")
        return render_form(401)

    # Changing the database password is the only credential update needed:
    # upstream notes that the auth database stores no password hash and
    # that login works by decrypting the user's SQLCipher database, so no
    # auth-DB update belongs here.
    #
    # The cleanup below mirrors the logout handler.

    # 1. Unregister from scheduler (removes stale credential)
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        news_scheduler = get_news_scheduler()
        if news_scheduler.is_running:
            news_scheduler.unregister_user(account)
    except Exception:
        logger.warning(
            "Could not unregister user from scheduler",
        )

    # 2. Close the database connection. Upstream notes change_password()
    #    already closes it, so this is a defensive repeat.
    db_manager.close_user_database(account)

    # 2b. Purge backups encrypted with the old key and create a fresh one
    #     with the new key; old-key backups stay decryptable with the old
    #     password.
    try:
        from ...database.backup.backup_service import BackupService

        backup_service = BackupService(username=account, password=new_secret)
        refresh = backup_service.purge_and_refresh()
        if refresh.success:
            logger.info(
                f"Backups refreshed after password change for {account}"
            )
        else:
            logger.error(
                f"Post-password-change backup failed for {account}: "
                f"{refresh.error}. Old backups were purged."
            )
    except Exception:
        logger.exception(
            f"Could not refresh backups after password change "
            f"for {account}"
        )

    # 3. Destroy ALL sessions for this user + clear password store
    session_manager.destroy_all_user_sessions(account)

    from ...database.session_passwords import (
        session_password_store,
    )

    session_password_store.clear_all_for_user(account)

    # 4. Clear Flask session dict
    session.clear()

    logger.info(f"Password changed for user {account}")
    flash(
        "Password changed successfully. Please login with your new password.",
        "success",
    )
    return redirect(url_for("auth.login"))

693 

694 

@auth_bp.route("/integrity-check", methods=["GET"])
def integrity_check():
    """
    Report the integrity status of the current user's database.

    Returns:
        JSON with ``username``, ``integrity`` ("valid" or "corrupted") and
        a UTC ISO-8601 ``timestamp``; or ``{"error": "Not authenticated"}``
        with status 401 when no user is logged in.
    """
    current_user = session.get("username")
    if not current_user:
        return jsonify({"error": "Not authenticated"}), 401

    if db_manager.check_database_integrity(current_user):
        status = "valid"
    else:
        status = "corrupted"

    report = {
        "username": current_user,
        "integrity": status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return jsonify(report)