Coverage for src / local_deep_research / web / auth / routes.py: 83%
265 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Authentication routes for login, register, and logout.
3Uses SQLCipher encrypted databases with browser password manager support.
4"""
6import threading
7from datetime import datetime, timezone, UTC
9from flask import (
10 Blueprint,
11 flash,
12 jsonify,
13 redirect,
14 render_template,
15 request,
16 session,
17 url_for,
18)
19from loguru import logger
21from ...database.auth_db import get_auth_db_session
22from ...database.encrypted_db import db_manager
23from ...database.models.auth import User
24from sqlalchemy.exc import IntegrityError
25from ...utilities.threading_utils import thread_context, thread_with_app_context
26from .session_manager import SessionManager
27from ..server_config import load_server_config
28from ..utils.rate_limiter import login_limit, registration_limit
29from ...security.url_validator import URLValidator
# Blueprint grouping every authentication route under the /auth URL prefix.
auth_bp = Blueprint(name="auth", import_name=__name__, url_prefix="/auth")

# Module-wide session manager used by the login, register and logout handlers.
session_manager = SessionManager()
@auth_bp.route("/csrf-token", methods=["GET"])
def get_csrf_token():
    """
    Return the CSRF token for the current session as JSON.

    Gives API clients a programmatic way to obtain the token so they do
    not have to scrape it out of rendered HTML.
    """
    # Function-local import, like the other lazy imports in this module.
    from flask_wtf.csrf import generate_csrf

    # generate_csrf() yields the session's token, creating one if needed.
    payload = {"csrf_token": generate_csrf()}
    return jsonify(payload), 200
@auth_bp.route("/login", methods=["GET"])
def login_page():
    """
    Render the login form (GET only).

    Deliberately not rate limited: viewing the page should always work.
    """
    server_config = load_server_config()

    # A session that already carries a username skips the form entirely.
    already_logged_in = bool(session.get("username"))
    if already_logged_in:
        return redirect(url_for("index"))

    # Hand the ``next`` query parameter to the template so the
    # post-login redirect target survives the form round trip.
    template_context = {
        "has_encryption": db_manager.has_encryption,
        "allow_registrations": server_config.get("allow_registrations", True),
        "next_page": request.args.get("next", ""),
    }
    return render_template("auth/login.html", **template_context)
def _render_login_form(config, status_code, next_page=""):
    """Render the login template together with an HTTP status code.

    Args:
        config: Server configuration mapping; only ``allow_registrations``
            is read from it.
        status_code: HTTP status returned alongside the rendered page.
        next_page: Post-login redirect target, passed to the template the
            same way ``login_page`` passes it.

    Returns:
        A ``(rendered_html, status_code)`` tuple suitable as a Flask response.
    """
    page = render_template(
        "auth/login.html",
        has_encryption=db_manager.has_encryption,
        allow_registrations=config.get("allow_registrations", True),
        next_page=next_page,
    )
    return page, status_code


@auth_bp.route("/login", methods=["POST"])
@login_limit
def login():
    """
    Login handler (POST only).
    Rate limited to 5 attempts per 15 minutes per IP to prevent brute force attacks.

    Reads ``username``, ``password`` and ``remember`` from the form and the
    optional ``next`` redirect target from the query string.

    Returns:
        * 400 with the login page when username or password is missing.
        * 401 with the login page when the user's database cannot be opened.
        * A redirect to the validated ``next`` URL, or to the index page,
          on success.
    """
    config = load_server_config()

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    remember = request.form.get("remember", "false") == "true"

    # Read once up front so failed attempts can re-render the form with the
    # same redirect target instead of silently dropping it.
    next_page = request.args.get("next", "")

    if not username or not password:
        flash("Username and password are required", "error")
        return _render_login_form(config, 400, next_page)

    # Authentication is "can we open the user's encrypted database?"
    engine = db_manager.open_user_database(username, password)

    if engine is None:
        # Invalid credentials or database doesn't exist
        logger.warning(f"Failed login attempt for username: {username}")
        flash("Invalid username or password", "error")
        return _render_login_form(config, 401, next_page)

    # Success! Create session
    session_id = session_manager.create_session(username, remember)
    session["session_id"] = session_id
    session["username"] = username
    session.permanent = remember

    # Store password temporarily for post-login database access
    from ...database.temp_auth import temp_auth_store

    auth_token = temp_auth_store.store_auth(username, password)
    session["temp_auth_token"] = auth_token

    # Also store in session password store for metrics access
    from ...database.session_passwords import session_password_store

    session_password_store.store_session_password(
        username, session_id, password
    )

    logger.info(f"User {username} logged in successfully")

    # Defer non-critical post-login work to a background thread so the
    # redirect returns immediately (settings migration, library init,
    # news scheduler, and model cache clearing are all idempotent and
    # can safely run after the response).
    # The captured context is passed as the first positional argument;
    # presumably the wrapper consumes it before invoking the task.
    app_ctx = thread_context()
    thread = threading.Thread(
        target=thread_with_app_context(_perform_post_login_tasks),
        args=(app_ctx, username, password),
        daemon=True,
    )
    thread.start()

    # Security: URLValidator.is_safe_redirect_url() validates that the target URL
    # has http/https scheme and netloc matches the application host, preventing
    # open redirect attacks (CWE-601). CodeQL doesn't recognize custom sanitizers.
    if next_page and URLValidator.is_safe_redirect_url(
        next_page, request.host_url
    ):
        return redirect(next_page)  # codeql[py/url-redirection]

    return redirect(url_for("index"))
def _perform_post_login_tasks(username: str, password: str) -> None:
    """Carry out the deferred, non-critical work that follows a login.

    Meant to run on a background thread. Each step runs inside its own
    exception guard, so a failing step is logged and the remaining steps
    still execute. Every step is idempotent and is simply attempted again
    on the next login.
    """

    def migrate_settings() -> None:
        # Step 1: settings version check + migration.
        from ...settings.manager import SettingsManager

        db_session = db_manager.get_session(username)
        try:
            if not db_session:
                return
            manager = SettingsManager(db_session)
            if manager.db_version_matches_package():
                return
            logger.info(
                f"Database version mismatch for {username} "
                "- loading missing default settings"
            )
            manager.load_from_defaults_file(commit=True, overwrite=False)
            manager.update_db_version()
            logger.info(
                f"Missing default settings loaded and version "
                f"updated for user {username}"
            )
        finally:
            if db_session:
                db_session.close()

    def initialize_library() -> None:
        # Step 2: library system (source types and default collection).
        from ...database.library_init import initialize_library_for_user

        init_results = initialize_library_for_user(username, password)
        if not init_results.get("success"):
            logger.warning(
                f"Library initialization issue for {username}: "
                f"{init_results.get('error', 'Unknown error')}"
            )
            return
        logger.info(f"Library system initialized for user {username}")

    def record_login_and_notify_scheduler() -> None:
        # Step 3: stamp last_login in the auth DB and tell the news scheduler.
        auth_db = get_auth_db_session()
        try:
            account = auth_db.query(User).filter_by(username=username).first()
            if account:
                account.last_login = datetime.now(UTC)

            # A scheduler problem must not prevent the commit below.
            try:
                from ...news.subscription_manager.scheduler import (
                    get_news_scheduler,
                )

                news_scheduler = get_news_scheduler()
                if news_scheduler.is_running:
                    news_scheduler.update_user_info(username, password)
                    logger.info(
                        f"Updated scheduler with user info for {username}"
                    )
            except Exception:
                logger.exception("Could not update scheduler on login")

            auth_db.commit()
        finally:
            auth_db.close()

    def clear_model_cache() -> None:
        # Step 4: drop cached provider models so fresh data is fetched.
        from ...database.models import ProviderModel

        user_db_session = db_manager.get_session(username)
        try:
            if not user_db_session:
                return
            deleted_count = user_db_session.query(ProviderModel).delete()
            user_db_session.commit()
            logger.info(
                f"Cleared {deleted_count} cached models "
                f"for user {username} on login"
            )
        finally:
            if user_db_session:
                user_db_session.close()

    # (step, message logged when that step raises), run strictly in order.
    guarded_steps = (
        (
            migrate_settings,
            f"Post-login settings migration failed for {username}",
        ),
        (
            initialize_library,
            f"Post-login library init failed for {username}",
        ),
        (
            record_login_and_notify_scheduler,
            f"Post-login auth DB update failed for {username}",
        ),
        (
            clear_model_cache,
            f"Post-login model cache clear failed for {username}",
        ),
    )
    for run_step, failure_message in guarded_steps:
        try:
            run_step()
        except Exception:
            logger.exception(failure_message)

    logger.info(f"Post-login tasks completed for user {username}")
@auth_bp.route("/register", methods=["GET"])
def register_page():
    """
    Render the registration form (GET only).

    Not rate limited, so the page itself can always be viewed. When the
    server configuration disables registrations, the visitor is sent back
    to the login page with an error message instead.
    """
    registrations_open = load_server_config().get("allow_registrations", True)

    if registrations_open:
        return render_template(
            "auth/register.html", has_encryption=db_manager.has_encryption
        )

    flash("New user registrations are currently disabled.", "error")
    return redirect(url_for("auth.login_page"))
def _render_register_form(status_code):
    """Render the registration template together with an HTTP status code."""
    page = render_template(
        "auth/register.html", has_encryption=db_manager.has_encryption
    )
    return page, status_code


def _discard_auth_user(username):
    """Best-effort removal of the auth-DB row for ``username``.

    Called when the auth record was committed but the user's encrypted
    database could not be created, so the username is not left permanently
    occupied by an account nobody can log in to. Failures are logged and
    never raised: the caller is already on an error path.
    """
    try:
        auth_db = get_auth_db_session()
        try:
            auth_db.query(User).filter_by(username=username).delete()
            auth_db.commit()
        finally:
            auth_db.close()
    except Exception:
        logger.exception(
            f"Could not remove orphaned auth record for {username}"
        )


@auth_bp.route("/register", methods=["POST"])
@registration_limit
def register():
    """
    Registration handler (POST only).
    Creates new encrypted database for user with clear warnings about password recovery.
    Rate limited to 3 attempts per hour per IP to prevent registration spam.

    Flow: validate the form, insert the user into the auth database, create
    the encrypted per-user database, then log the user in and run the
    best-effort follow-ups (news scheduler, library initialisation).

    Returns:
        * A redirect to the login page when registrations are disabled.
        * 400 with the registration page on validation errors or a
          duplicate username.
        * 500 with the registration page on unexpected failures.
        * A redirect to the index page on success.
    """
    config = load_server_config()
    if not config.get("allow_registrations", True):
        flash("New user registrations are currently disabled.", "error")
        return redirect(url_for("auth.login_page"))

    # POST - Handle registration
    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    confirm_password = request.form.get("confirm_password", "")
    acknowledge = request.form.get("acknowledge", "false") == "true"

    # Validation
    errors = []

    if not username:
        errors.append("Username is required")
    elif len(username) < 3:
        errors.append("Username must be at least 3 characters")
    elif not username.replace("_", "").replace("-", "").isalnum():
        errors.append(
            "Username can only contain letters, numbers, underscores, and hyphens"
        )

    if not password:
        errors.append("Password is required")
    elif len(password) < 8:
        errors.append("Password must be at least 8 characters")

    if password != confirm_password:
        errors.append("Passwords do not match")

    if not acknowledge:
        errors.append(
            "You must acknowledge that password recovery is not possible"
        )

    # Check if user already exists
    # Use generic error message to prevent account enumeration
    # Note: While this creates a minor timing difference, it's acceptable because:
    # 1. Rate limiting prevents automated timing analysis
    # 2. Generic error message prevents content-based enumeration
    # 3. Local database query timing is minimal (no network calls)
    # 4. Better UX with immediate feedback outweighs minor timing risk
    # See: https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html
    if username and db_manager.user_exists(username):
        errors.append("Registration failed. Please try a different username.")

    if errors:
        for error in errors:
            flash(error, "error")
        return _render_register_form(400)

    # Create user in auth database
    auth_db = get_auth_db_session()
    try:
        auth_db.add(User(username=username))
        auth_db.commit()
    except IntegrityError:
        # Race condition: two requests for the same username both passed
        # the user_exists() check above.
        logger.warning(f"Duplicate username attempted: {username}")
        auth_db.rollback()
        flash("Registration failed. Please try a different username.", "error")
        return _render_register_form(400)
    except Exception:
        logger.exception(f"Registration failed for {username}")
        auth_db.rollback()
        flash("Registration failed. Please try again.", "error")
        return _render_register_form(500)
    finally:
        # Single close for every path (success and both error paths).
        # A failure here is logged rather than allowed to replace the
        # response that is already being returned.
        try:
            auth_db.close()
        except Exception:
            logger.exception(
                "Failed to close auth DB session during registration"
            )

    # Create encrypted database for user. The auth row is already
    # committed, so undo it if this step fails; otherwise the username
    # would stay taken by an account that has no database behind it.
    try:
        db_manager.create_user_database(username, password)
    except Exception:
        logger.exception(f"Registration failed for {username}")
        _discard_auth_user(username)
        flash("Registration failed. Please try again.", "error")
        return _render_register_form(500)

    try:
        # Auto-login after registration
        session_id = session_manager.create_session(username, False)
        session["session_id"] = session_id
        session["username"] = username

        # Store password temporarily for post-registration database access
        from ...database.temp_auth import temp_auth_store

        auth_token = temp_auth_store.store_auth(username, password)
        session["temp_auth_token"] = auth_token

        # Also store in session password store for metrics access
        from ...database.session_passwords import session_password_store

        session_password_store.store_session_password(
            username, session_id, password
        )

        # Notify the news scheduler about the new user
        try:
            from ...news.subscription_manager.scheduler import (
                get_news_scheduler,
            )

            scheduler = get_news_scheduler()
            if scheduler.is_running:
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Updated scheduler with new user info for {username}"
                )
        except Exception:
            logger.exception("Could not update scheduler on registration")

        logger.info(f"New user registered: {username}")

        # Initialize library system (source types and default collection).
        # The import lives inside the guard so that an import problem is
        # also treated as a non-blocking library-init failure.
        try:
            from ...database.library_init import initialize_library_for_user

            init_results = initialize_library_for_user(username, password)
            if init_results.get("success"):
                logger.info(
                    f"Library system initialized for new user {username}"
                )
            else:
                logger.warning(
                    f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
                )
        except Exception:
            # Don't block registration on library init failure
            logger.exception(
                f"Error initializing library for new user {username}"
            )

        return redirect(url_for("index"))

    except Exception:
        logger.exception(f"Registration failed for {username}")
        flash("Registration failed. Please try again.", "error")
        return _render_register_form(500)
@auth_bp.route("/logout", methods=["GET", "POST"])
def logout():
    """
    Log the current user out.

    Closes the user's database connection, destroys the server-side
    session, forgets the stored session password and clears the cookie
    session. Accepts GET (direct navigation) as well as POST (form
    submission). Always ends with a redirect to the login URL.
    """
    username = session.get("username")
    active_session_id = session.get("session_id")

    # Nothing to tear down for an anonymous visitor.
    if not username:
        return redirect(url_for("auth.login"))

    # Close database connection
    db_manager.close_user_database(username)

    if active_session_id:
        session_manager.destroy_session(active_session_id)

        # The stored password is keyed by (username, session id).
        from ...database.session_passwords import session_password_store

        session_password_store.clear_session(username, active_session_id)

    session.clear()

    logger.info(f"User {username} logged out")
    flash("You have been logged out successfully", "info")

    return redirect(url_for("auth.login"))
@auth_bp.route("/check", methods=["GET"])
def check_auth():
    """
    Report whether the current session is authenticated (for AJAX requests).

    Responds with the username when one is stored in the session, and with
    a 401 status otherwise.
    """
    current_user = session.get("username")
    if not current_user:
        return jsonify({"authenticated": False}), 401
    return jsonify({"authenticated": True, "username": current_user})
@auth_bp.route("/change-password", methods=["GET", "POST"])
def change_password():
    """
    Change password for current user.
    Requires current password and re-encrypts database.

    Returns:
        * A redirect to the login URL when nobody is logged in, and again
          after a successful change (the user must log in anew).
        * The change-password page on GET.
        * 400 with the page on validation errors.
        * 401 with the page when the current password is rejected.
    """
    username = session.get("username")
    if not username:
        return redirect(url_for("auth.login"))

    if request.method == "GET":
        return render_template("auth/change_password.html")

    # POST - Handle password change
    current_password = request.form.get("current_password", "")
    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    # Validation
    errors = []

    if not current_password:
        errors.append("Current password is required")

    if not new_password:
        errors.append("New password is required")
    elif len(new_password) < 8:
        errors.append("New password must be at least 8 characters")

    if new_password != confirm_password:
        errors.append("New passwords do not match")

    if current_password == new_password:
        errors.append("New password must be different from current password")

    if errors:
        for error in errors:
            flash(error, "error")
        return render_template("auth/change_password.html"), 400

    # Attempt password change
    success = db_manager.change_password(
        username, current_password, new_password
    )

    if not success:
        flash("Current password is incorrect", "error")
        return render_template("auth/change_password.html"), 401

    # The rekey is the ONLY persistence step needed. The auth database
    # stores no password hash — login works by attempting to decrypt the
    # user's SQLCipher database. Do NOT add an auth-DB password-hash update
    # here; it would fail (User model has no set_password method) and
    # is architecturally unnecessary.
    #
    # What does need invalidating is the login state created under the old
    # password: the server-side session and the password cached for it.
    # This mirrors the cleanup done on logout. It is best-effort because
    # the password has already been changed at this point.
    session_id = session.get("session_id")
    if session_id:
        try:
            session_manager.destroy_session(session_id)

            from ...database.session_passwords import session_password_store

            session_password_store.clear_session(username, session_id)
        except Exception:
            logger.exception(
                f"Could not clean up session after password change for {username}"
            )

    session.clear()

    logger.info(f"Password changed for user {username}")
    flash(
        "Password changed successfully. Please login with your new password.",
        "success",
    )
    return redirect(url_for("auth.login"))
@auth_bp.route("/integrity-check", methods=["GET"])
def integrity_check():
    """
    Report the integrity status of the current user's database as JSON.

    Responds with 401 when no user is stored in the session.
    """
    username = session.get("username")
    if not username:
        return jsonify({"error": "Not authenticated"}), 401

    if db_manager.check_database_integrity(username):
        integrity_status = "valid"
    else:
        integrity_status = "corrupted"

    # Timezone-aware UTC timestamp in ISO 8601 form.
    checked_at = datetime.now(timezone.utc).isoformat()

    report = {
        "username": username,
        "integrity": integrity_status,
        "timestamp": checked_at,
    }
    return jsonify(report)