Coverage for src / local_deep_research / web / auth / routes.py: 89%
317 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Authentication routes for login, register, and logout.
3Uses SQLCipher encrypted databases with browser password manager support.
4"""
6import threading
7from datetime import datetime, timezone, UTC
9from flask import (
10 Blueprint,
11 flash,
12 jsonify,
13 redirect,
14 render_template,
15 request,
16 session,
17 url_for,
18)
19from loguru import logger
21from ...database.auth_db import auth_db_session
22from ...database.encrypted_db import db_manager
23from ...database.models.auth import User
24from ...database.thread_local_session import thread_cleanup
25from sqlalchemy.exc import IntegrityError
26from ...utilities.threading_utils import thread_context, thread_with_app_context
27from .session_manager import (
28 session_manager,
29) # singleton from session_manager module
30from ..server_config import load_server_config
31from ...security.rate_limiter import (
32 login_limit,
33 password_change_limit,
34 registration_limit,
35)
36from urllib.parse import urlparse
38from ...security.url_validator import URLValidator
39from ...security.account_lockout import get_account_lockout_manager
40from ...security.password_validator import PasswordValidator
41from ...security.log_sanitizer import sanitize_for_log
# Blueprint that groups every authentication route under the /auth prefix.
auth_bp = Blueprint(
    "auth",
    __name__,
    url_prefix="/auth",
)
@auth_bp.route("/csrf-token", methods=["GET"])
def get_csrf_token():
    """
    Hand the CSRF token to API clients as JSON.

    Lets programmatic clients obtain the token without having to scrape it
    out of a rendered HTML form.

    Returns:
        A ``({"csrf_token": <token>}, 200)`` JSON response.
    """
    # Imported lazily, at call time, exactly as the route has always done.
    from flask_wtf.csrf import generate_csrf

    return jsonify({"csrf_token": generate_csrf()}), 200
@auth_bp.route("/login", methods=["GET"])
def login_page():
    """
    Serve the login form (GET only).

    Deliberately carries no rate-limit decorator so the page can always be
    viewed. A visitor whose session already holds a ``username`` is sent
    straight to the index page instead.
    """
    config = load_server_config()

    # Nothing to show to somebody who is already signed in.
    if session.get("username"):
        return redirect(url_for("index"))

    # Forward the ``next`` query parameter to the template so the
    # post-login redirect target is not lost.
    next_page = request.args.get("next", "")
    template_context = {
        "has_encryption": db_manager.has_encryption,
        "allow_registrations": config.get("allow_registrations", True),
        "next_page": next_page,
    }
    return render_template("auth/login.html", **template_context)
def _render_login_form(config, status_code):
    """Render the login template for a rejected login POST.

    Args:
        config: Object returned by ``load_server_config()``; only its
            ``allow_registrations`` entry is read (default ``True``).
        status_code: HTTP status code to pair with the rendered page.

    Returns:
        A ``(body, status_code)`` tuple that a view can return directly.
    """
    return (
        render_template(
            "auth/login.html",
            has_encryption=db_manager.has_encryption,
            allow_registrations=config.get("allow_registrations", True),
        ),
        status_code,
    )


@auth_bp.route("/login", methods=["POST"])
@login_limit
def login():
    """
    Login handler (POST only).

    Reads ``username``, ``password`` and ``remember`` from the submitted
    form. Authentication succeeds when ``db_manager.open_user_database``
    returns an engine for the supplied credentials. The route is wrapped in
    ``login_limit`` (documented as 5 attempts per 15 minutes per IP).

    Returns:
        * 400 + login page when username or password is missing.
        * 429 + login page when the account is currently locked out.
        * 401 + login page when the user database cannot be opened.
        * A redirect to the validated ``next`` path, or to ``index``, on
          success.
    """
    config = load_server_config()

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    remember = request.form.get("remember", "false") == "true"

    if not username or not password:
        flash("Username and password are required", "error")
        return _render_login_form(config, 400)

    # Check account lockout before attempting credential verification.
    lockout_mgr = get_account_lockout_manager()
    if lockout_mgr.is_locked(username):
        logger.warning(
            f"Login attempt for locked account: {sanitize_for_log(username)}"
        )
        flash("Account is temporarily locked. Please try again later.", "error")
        return _render_login_form(config, 429)

    # Try to open the user's encrypted database; None means the credentials
    # are wrong or the database does not exist.
    engine = db_manager.open_user_database(username, password)
    if engine is None:
        lockout_mgr.record_failure(username)
        logger.warning(
            f"Failed login attempt for username: {sanitize_for_log(username)}"
        )
        flash("Invalid username or password", "error")
        return _render_login_form(config, 401)

    # Success: clear any prior failure count.
    lockout_mgr.record_success(username)

    # Prevent session fixation by clearing old session data before
    # populating the new session.
    session.clear()

    session_id = session_manager.create_session(username, remember)
    session["session_id"] = session_id
    session["username"] = username
    session.permanent = remember

    # Store the password temporarily for post-login database access.
    from ...database.temp_auth import temp_auth_store

    auth_token = temp_auth_store.store_auth(username, password)
    session["temp_auth_token"] = auth_token

    # Also store it in the session password store for metrics access.
    from ...database.session_passwords import session_password_store

    session_password_store.store_session_password(
        username, session_id, password
    )

    # The username is request-supplied, so it is sanitized here just like
    # in the failure-path log lines above.
    logger.info(f"User {sanitize_for_log(username)} logged in successfully")

    # Defer non-critical post-login work to a background thread so the
    # redirect returns immediately (settings migration, library init,
    # news scheduler, and model cache clearing are all idempotent and
    # can safely run after the response).
    app_ctx = thread_context()
    thread = threading.Thread(
        target=thread_with_app_context(_perform_post_login_tasks),
        args=(app_ctx, username, password),
        daemon=True,
    )
    thread.start()

    # Only follow ``next`` when it validates as a safe path and still has
    # neither scheme nor host after backslashes are normalised.
    next_page = request.args.get("next", "")
    safe_path = URLValidator.get_safe_redirect_path(next_page, request.host_url)
    if safe_path:
        safe_path = safe_path.replace("\\", "/")
        parsed = urlparse(safe_path)
        if not parsed.scheme and not parsed.netloc:
            return redirect(safe_path)
    return redirect(url_for("index"))
@thread_cleanup
def _perform_post_login_tasks(username: str, password: str) -> None:
    """Run non-critical post-login operations in a background thread.

    Each operation is wrapped in its own try/except so that one failure
    does not prevent the others from running. To keep that guarantee, every
    step performs the imports it needs itself instead of relying on names
    bound by an earlier step. All operations here are described as
    idempotent and safe to retry on the next login.

    Args:
        username: Name of the user who just logged in.
        password: That user's password, used to open the user database.
    """
    # 1. Settings version check + migration
    try:
        from ...settings.manager import SettingsManager
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as db_session:
            settings_manager = SettingsManager(db_session)
            if not settings_manager.db_version_matches_package():
                logger.info(
                    f"Database version mismatch for {username} "
                    "- loading missing default settings"
                )
                settings_manager.load_from_defaults_file(
                    commit=True, overwrite=False
                )
                settings_manager.update_db_version()
                logger.info(
                    f"Missing default settings loaded and version "
                    f"updated for user {username}"
                )
    except Exception:
        logger.exception(f"Post-login settings migration failed for {username}")

    # 2. Initialize library system (source types and default collection)
    try:
        from ...database.library_init import initialize_library_for_user

        init_results = initialize_library_for_user(username, password)
        if init_results.get("success"):
            logger.info(f"Library system initialized for user {username}")
        else:
            logger.warning(
                f"Library initialization issue for {username}: "
                f"{init_results.get('error', 'Unknown error')}"
            )
    except Exception:
        logger.exception(f"Post-login library init failed for {username}")

    # 3. Update last_login in auth DB + notify news scheduler
    try:
        with auth_db_session() as auth_db:
            user = auth_db.query(User).filter_by(username=username).first()
            if user:
                user.last_login = datetime.now(UTC)

            # A scheduler failure is logged but must not stop the commit.
            try:
                from ...news.subscription_manager.scheduler import (
                    get_news_scheduler,
                )

                scheduler = get_news_scheduler()
                if scheduler.is_running:
                    scheduler.update_user_info(username, password)
                    logger.info(
                        f"Updated scheduler with user info for {username}"
                    )
            except Exception:
                logger.exception("Could not update scheduler on login")

            auth_db.commit()
    except Exception:
        logger.exception(f"Post-login auth DB update failed for {username}")

    # 4. Clear the model cache to ensure fresh provider data
    try:
        from ...database.models import ProviderModel

        # Imported here as well: if step 1 failed before binding this
        # name, using it would raise UnboundLocalError in this step.
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as user_db_session:
            deleted_count = user_db_session.query(ProviderModel).delete()
            user_db_session.commit()
            logger.info(
                f"Cleared {deleted_count} cached models "
                f"for user {username} on login"
            )
    except Exception:
        logger.exception(f"Post-login model cache clear failed for {username}")

    # 5. Schedule background database backup if enabled
    try:
        from ...database.backup import get_backup_scheduler
        from ...settings.manager import SettingsManager
        from ...database.session_context import get_user_db_session

        with get_user_db_session(username, password) as db_session:
            sm = SettingsManager(db_session)
            backup_enabled = sm.get_setting("backup.enabled", True)

            if backup_enabled:
                max_backups = sm.get_setting("backup.max_count", 1)
                max_age_days = sm.get_setting("backup.max_age_days", 7)

                get_backup_scheduler().schedule_backup(
                    username, password, max_backups, max_age_days
                )
                logger.info(f"Background backup scheduled for user {username}")
    except Exception:
        logger.exception(f"Post-login backup scheduling failed for {username}")

    logger.info(f"Post-login tasks completed for user {username}")
@auth_bp.route("/validate-password", methods=["POST"])
def validate_password():
    """
    Password-strength check exposed as an API for client-side forms.

    Reads ``password`` from the submitted form (empty string when absent)
    and returns JSON with a ``valid`` flag plus the list of ``errors``
    reported by ``PasswordValidator.validate_strength``.
    """
    candidate = request.form.get("password", "")
    problems = PasswordValidator.validate_strength(candidate)
    verdict = {"valid": len(problems) == 0, "errors": problems}
    return jsonify(verdict)
@auth_bp.route("/register", methods=["GET"])
def register_page():
    """
    Serve the registration form (GET only).

    Carries no rate-limit decorator so the page can always be viewed. When
    the server configuration disables registrations, the visitor is sent
    to the login page with an error flash instead.
    """
    config = load_server_config()

    if config.get("allow_registrations", True):
        return render_template(
            "auth/register.html",
            has_encryption=db_manager.has_encryption,
            password_requirements=PasswordValidator.get_requirements(),
        )

    flash("New user registrations are currently disabled.", "error")
    return redirect(url_for("auth.login_page"))
def _render_register_form(status_code):
    """Render the registration template for a rejected registration POST.

    Args:
        status_code: HTTP status code to pair with the rendered page.

    Returns:
        A ``(body, status_code)`` tuple that a view can return directly.
    """
    return (
        render_template(
            "auth/register.html",
            has_encryption=db_manager.has_encryption,
            password_requirements=PasswordValidator.get_requirements(),
        ),
        status_code,
    )


def _discard_auth_user(username):
    """Best-effort removal of an auth-DB row whose user database was never created.

    Any failure is logged and swallowed so it cannot mask the original
    registration error that triggered the cleanup.
    """
    try:
        with auth_db_session() as auth_db:
            auth_db.query(User).filter_by(username=username).delete()
            auth_db.commit()
    except Exception:
        logger.exception(
            f"Could not remove auth record for {username} "
            "after failed registration"
        )


@auth_bp.route("/register", methods=["POST"])
@registration_limit
def register():
    """
    Registration handler (POST only).

    Validates the form, records the user in the auth database, creates the
    user's encrypted database, logs the user in and initializes the library
    system. The route is wrapped in ``registration_limit`` (documented as 3
    attempts per hour per IP).

    Returns:
        * A redirect to the login page when registrations are disabled.
        * 400 + registration page on validation errors or a duplicate
          username.
        * 500 + registration page on unexpected failures.
        * A redirect to ``index`` on success.
    """
    config = load_server_config()
    if not config.get("allow_registrations", True):
        flash("New user registrations are currently disabled.", "error")
        return redirect(url_for("auth.login_page"))

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    confirm_password = request.form.get("confirm_password", "")
    acknowledge = request.form.get("acknowledge", "false") == "true"

    # Validation
    errors = []

    if not username:
        errors.append("Username is required")
    elif len(username) < 3:
        errors.append("Username must be at least 3 characters")
    elif not username.replace("_", "").replace("-", "").isalnum():
        errors.append(
            "Username can only contain letters, numbers, underscores, and hyphens"
        )

    if not password:
        errors.append("Password is required")
    else:
        errors.extend(PasswordValidator.validate_strength(password))

    if password != confirm_password:
        errors.append("Passwords do not match")

    if not acknowledge:
        errors.append(
            "You must acknowledge that password recovery is not possible"
        )

    # Check if user already exists.
    # Use generic error message to prevent account enumeration.
    # Note: While this creates a minor timing difference, it's acceptable because:
    # 1. Rate limiting prevents automated timing analysis
    # 2. Generic error message prevents content-based enumeration
    # 3. Local database query timing is minimal (no network calls)
    # 4. Better UX with immediate feedback outweighs minor timing risk
    # See: https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html
    if not errors and username and db_manager.user_exists(username):
        errors.append("Registration failed. Please try a different username.")

    if errors:
        for error in errors:
            flash(error, "error")
        return _render_register_form(400)

    # Create user in auth database
    with auth_db_session() as auth_db:
        try:
            new_user = User(username=username)
            auth_db.add(new_user)
            auth_db.commit()
        except IntegrityError:
            # Catch duplicate username specifically (race condition case):
            # two requests for the same username can both pass the
            # user_exists() check simultaneously.
            logger.warning(f"Duplicate username attempted: {username}")
            auth_db.rollback()
            flash(
                "Registration failed. Please try a different username.", "error"
            )
            return _render_register_form(400)
        except Exception:
            logger.exception(f"Registration failed for {username}")
            auth_db.rollback()
            flash("Registration failed. Please try again.", "error")
            return _render_register_form(500)

    # Tracks whether the encrypted database exists yet, so the error path
    # knows if the auth row committed above has to be undone.
    database_created = False
    try:
        # Create encrypted database for user
        db_manager.create_user_database(username, password)
        database_created = True

        # Prevent session fixation by clearing old session data
        session.clear()

        # Auto-login after registration
        session_id = session_manager.create_session(username, False)
        session["session_id"] = session_id
        session["username"] = username

        # Store password temporarily for post-registration database access
        from ...database.temp_auth import temp_auth_store

        auth_token = temp_auth_store.store_auth(username, password)
        session["temp_auth_token"] = auth_token

        # Also store in session password store for metrics access
        from ...database.session_passwords import session_password_store

        session_password_store.store_session_password(
            username, session_id, password
        )

        # Notify the news scheduler about the new user
        try:
            from ...news.subscription_manager.scheduler import (
                get_news_scheduler,
            )

            scheduler = get_news_scheduler()
            if scheduler.is_running:
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Updated scheduler with new user info for {username}"
                )
        except Exception:
            logger.exception("Could not update scheduler on registration")

        logger.info(f"New user registered: {username}")

        # Initialize library system (source types and default collection).
        # The import lives inside the try so that an import failure, like
        # any other library-init failure, does not block registration.
        try:
            from ...database.library_init import initialize_library_for_user

            init_results = initialize_library_for_user(username, password)
            if init_results.get("success"):
                logger.info(
                    f"Library system initialized for new user {username}"
                )
            else:
                logger.warning(
                    f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
                )
        except Exception:
            logger.exception(
                f"Error initializing library for new user {username}"
            )

        return redirect(url_for("index"))

    except Exception:
        logger.exception(f"Registration failed for {username}")
        if not database_created:
            # The auth row exists but the encrypted database does not;
            # remove the row so it is not left behind without a database.
            # NOTE(review): presumably user_exists() consults the auth DB,
            # which would otherwise block re-registration; confirm.
            _discard_auth_user(username)
        flash("Registration failed. Please try again.", "error")
        return _render_register_form(500)
@auth_bp.route("/logout", methods=["POST"])
def logout():
    """
    Logout handler.

    Tears down everything tied to the current session and redirects to the
    login URL. POST-only to prevent CSRF-triggered logout via GET
    (e.g. <img src="/auth/logout">).
    """
    username = session.get("username")
    session_id = session.get("session_id")

    # No logged-in user: there is nothing to clean up.
    if not username:
        return redirect(url_for("auth.login"))

    # LOGOUT CLEANUP ORDER (order matters):
    #   1. Unregister from the news scheduler. This removes the password
    #      from the scheduler's user_sessions dict and cancels scheduled
    #      jobs. It must come BEFORE close_user_database(): scheduler jobs
    #      fetch the password from user_sessions at runtime to call
    #      open_user_database(), so with the DB closed first a running job
    #      that already holds the password could re-create the engine.
    #      Removing the password first means future job invocations cannot
    #      authenticate. A narrow race remains: a job that has already
    #      fetched the password but not yet called open_user_database can
    #      still recreate an engine. This is benign, as the dead-thread
    #      sweep will clean it up within 60 seconds.
    #   2. Close the database connection (disposes the QueuePool engine and
    #      cleans up thread engines for this user).
    #   3. Destroy the Flask session (invalidates the session token).
    #   4. Clear the session password store (secondary password store).
    #   5. Clear the Flask session dict (removes all session data).
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        news_scheduler = get_news_scheduler()
        if news_scheduler.is_running:
            news_scheduler.unregister_user(username)
    except Exception:
        logger.warning("Could not unregister user from scheduler")

    db_manager.close_user_database(username)

    if session_id:
        session_manager.destroy_session(session_id)

        from ...database.session_passwords import session_password_store

        session_password_store.clear_session(username, session_id)

    session.clear()

    logger.info(f"User {username} logged out")
    flash("You have been logged out successfully", "info")

    return redirect(url_for("auth.login"))
@auth_bp.route("/check", methods=["GET"])
def check_auth():
    """
    Report whether the current session is authenticated (for AJAX requests).

    Returns JSON with ``authenticated`` and ``username`` when the session
    holds a username; otherwise ``{"authenticated": False}`` with HTTP 401.
    """
    current_user = session.get("username")
    if not current_user:
        return jsonify({"authenticated": False}), 401
    return jsonify({"authenticated": True, "username": current_user})
@auth_bp.route("/change-password", methods=["GET"])
def change_password_page():
    """
    Serve the change-password form (GET only).

    Carries no rate-limit decorator so the page can always be viewed.
    Visitors without a ``username`` in their session are redirected to the
    login URL.
    """
    if not session.get("username"):
        return redirect(url_for("auth.login"))

    requirements = PasswordValidator.get_requirements()
    return render_template(
        "auth/change_password.html",
        password_requirements=requirements,
    )
@auth_bp.route("/change-password", methods=["POST"])
@password_change_limit
def change_password():
    """
    Change password handler (POST only).

    Requires the current password and asks ``db_manager.change_password``
    to re-key the user's database. Wrapped in ``password_change_limit`` to
    hinder brute-forcing of the current password. On success every session
    of the user is destroyed and the user must log in again.

    Returns:
        * A redirect to the login URL when not logged in, or after a
          successful change.
        * 400 + form when validation fails.
        * 401 + form when the password change is rejected.
    """
    username = session.get("username")
    if not username:
        return redirect(url_for("auth.login"))

    form = request.form
    current_password = form.get("current_password", "")
    new_password = form.get("new_password", "")
    confirm_password = form.get("confirm_password", "")

    # Collect every validation problem so they can all be shown at once.
    problems = []

    if not current_password:
        problems.append("Current password is required")

    if new_password:
        problems.extend(PasswordValidator.validate_strength(new_password))
    else:
        problems.append("New password is required")

    if new_password != confirm_password:
        problems.append("New passwords do not match")

    if current_password == new_password:
        problems.append("New password must be different from current password")

    if problems:
        for message in problems:
            flash(message, "error")
        return render_template(
            "auth/change_password.html",
            password_requirements=PasswordValidator.get_requirements(),
        ), 400

    # Attempt the password change; a falsy result is reported to the user
    # as a wrong current password.
    changed = db_manager.change_password(
        username, current_password, new_password
    )
    if not changed:
        flash("Current password is incorrect", "error")
        return render_template(
            "auth/change_password.html",
            password_requirements=PasswordValidator.get_requirements(),
        ), 401

    # The rekey is the ONLY step needed. The auth database stores no
    # password hash: login works by attempting to decrypt the user's
    # SQLCipher database. Do NOT add an auth-DB password-hash update here;
    # it would fail (User model has no set_password method) and is
    # architecturally unnecessary.

    # Clean up stale credentials before clearing the session
    # (mirrors the logout handler's cleanup steps).

    # 1. Unregister from scheduler (removes stale credential).
    try:
        from ...news.subscription_manager.scheduler import (
            get_news_scheduler,
        )

        news_scheduler = get_news_scheduler()
        if news_scheduler.is_running:
            news_scheduler.unregister_user(username)
    except Exception:
        logger.warning(
            "Could not unregister user from scheduler",
        )

    # 2. Close database connection (disposes old-password engine).
    #    change_password() already closes in its finally block, but an
    #    explicit close here is defensive and harmless if redundant.
    db_manager.close_user_database(username)

    # 2b. Purge old backups (encrypted with the old key) and create a fresh
    #     backup with the new key. Old-key backups are a security risk per
    #     NIST SP 800-57 / OWASP A02, as they remain decryptable with the
    #     compromised password.
    try:
        from ...database.backup.backup_service import BackupService

        backup_service = BackupService(username=username, password=new_password)
        refresh = backup_service.purge_and_refresh()
        if refresh.success:
            logger.info(
                f"Backups refreshed after password change for {username}"
            )
        else:
            logger.error(
                f"Post-password-change backup failed for {username}: "
                f"{refresh.error}. Old backups were purged."
            )
    except Exception:
        logger.exception(
            f"Could not refresh backups after password change "
            f"for {username}"
        )

    # 3. Destroy ALL sessions for this user + clear password store.
    session_manager.destroy_all_user_sessions(username)

    from ...database.session_passwords import (
        session_password_store,
    )

    session_password_store.clear_all_for_user(username)

    # 4. Clear Flask session dict.
    session.clear()

    logger.info(f"Password changed for user {username}")
    flash(
        "Password changed successfully. Please login with your new password.",
        "success",
    )
    return redirect(url_for("auth.login"))
@auth_bp.route("/integrity-check", methods=["GET"])
def integrity_check():
    """
    Report the database integrity status for the logged-in user.

    Returns JSON with ``username``, ``integrity`` (``"valid"`` or
    ``"corrupted"``) and a UTC ISO-8601 ``timestamp``; responds with
    HTTP 401 and an error payload when no user is logged in.
    """
    username = session.get("username")
    if not username:
        return jsonify({"error": "Not authenticated"}), 401

    if db_manager.check_database_integrity(username):
        status = "valid"
    else:
        status = "corrupted"

    report = {
        "username": username,
        "integrity": status,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return jsonify(report)