Coverage for src / local_deep_research / web / auth / routes.py: 80%
243 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Authentication routes for login, register, and logout.
3Uses SQLCipher encrypted databases with browser password manager support.
4"""
6from datetime import datetime, timezone, UTC
7from pathlib import PurePosixPath
9from flask import (
10 Blueprint,
11 flash,
12 jsonify,
13 redirect,
14 render_template,
15 request,
16 session,
17 url_for,
18)
19from loguru import logger
21from ...database.auth_db import get_auth_db_session
22from ...database.encrypted_db import db_manager
23from ...database.models.auth import User
24from .session_manager import SessionManager
25from ..server_config import load_server_config
26from ..utils.rate_limiter import login_limit, registration_limit
28auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
29session_manager = SessionManager()
32@auth_bp.route("/csrf-token", methods=["GET"])
33def get_csrf_token():
34 """
35 Get CSRF token for API requests.
36 Returns the current CSRF token for the session.
37 This endpoint makes it easy for API clients to get the CSRF token
38 programmatically without parsing HTML.
39 """
40 from flask_wtf.csrf import generate_csrf
42 # Generate or get existing CSRF token for this session
43 token = generate_csrf()
45 return jsonify({"csrf_token": token}), 200
48@auth_bp.route("/login", methods=["GET"])
49def login_page():
50 """
51 Login page (GET only).
52 Not rate limited - viewing the page should always work.
53 """
54 config = load_server_config()
55 # Check if already logged in
56 if session.get("username"):
57 return redirect(url_for("index"))
59 # Preserve the next parameter for post-login redirect
60 next_page = request.args.get("next", "")
62 return render_template(
63 "auth/login.html",
64 has_encryption=db_manager.has_encryption,
65 allow_registrations=config.get("allow_registrations", True),
66 next_page=next_page,
67 )
70@auth_bp.route("/login", methods=["POST"])
71@login_limit
72def login():
73 """
74 Login handler (POST only).
75 Rate limited to 5 attempts per 15 minutes per IP to prevent brute force attacks.
76 """
77 config = load_server_config()
78 # POST - Handle login
79 username = request.form.get("username", "").strip()
80 password = request.form.get("password", "")
81 remember = request.form.get("remember", "false") == "true"
83 if not username or not password:
84 flash("Username and password are required", "error")
85 return render_template(
86 "auth/login.html",
87 has_encryption=db_manager.has_encryption,
88 allow_registrations=config.get("allow_registrations", True),
89 ), 400
91 # Try to open user's encrypted database
92 engine = db_manager.open_user_database(username, password)
94 if engine is None:
95 # Invalid credentials or database doesn't exist
96 logger.warning(f"Failed login attempt for username: {username}")
97 flash("Invalid username or password", "error")
98 return render_template(
99 "auth/login.html",
100 has_encryption=db_manager.has_encryption,
101 allow_registrations=config.get("allow_registrations", True),
102 ), 401
104 # Check if user has settings loaded (first login after migration)
105 from ..services.settings_manager import SettingsManager
107 db_session = db_manager.get_session(username)
108 if db_session: 108 ↛ 126line 108 didn't jump to line 126 because the condition on line 108 was always true
109 settings_manager = SettingsManager(db_session)
111 # Check if DB version matches package version
112 if not settings_manager.db_version_matches_package(): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 logger.info(
114 f"Database version mismatch for {username} - loading missing default settings"
115 )
116 # Load defaults but preserve existing user settings
117 settings_manager.load_from_defaults_file(
118 commit=True, overwrite=False
119 )
120 settings_manager.update_db_version()
121 logger.info(
122 f"Missing default settings loaded and version updated for user {username}"
123 )
125 # Initialize library system (source types and default collection)
126 from ...database.library_init import initialize_library_for_user
128 try:
129 init_results = initialize_library_for_user(username, password)
130 if init_results.get("success"): 130 ↛ 133line 130 didn't jump to line 133 because the condition on line 130 was always true
131 logger.info(f"Library system initialized for user {username}")
132 else:
133 logger.warning(
134 f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
135 )
136 except Exception:
137 logger.exception(f"Error initializing library for {username}")
138 # Don't block login on library init failure
140 # Success! Create session
141 session_id = session_manager.create_session(username, remember)
142 session["session_id"] = session_id
143 session["username"] = username
144 session.permanent = remember
146 # Store password temporarily for post-login database access
147 from ...database.temp_auth import temp_auth_store
149 auth_token = temp_auth_store.store_auth(username, password)
150 session["temp_auth_token"] = auth_token
152 # Also store in session password store for metrics access
153 from ...database.session_passwords import session_password_store
155 session_password_store.store_session_password(
156 username, session_id, password
157 )
159 # Update last login in auth database
160 auth_db = get_auth_db_session()
161 user = auth_db.query(User).filter_by(username=username).first()
162 if user:
163 user.last_login = datetime.now(UTC)
165 # Notify the news scheduler about the user login
166 try:
167 from ...news.subscription_manager.scheduler import get_news_scheduler
169 scheduler = get_news_scheduler()
170 if scheduler.is_running: 170 ↛ 176line 170 didn't jump to line 176 because the condition on line 170 was always true
171 scheduler.update_user_info(username, password)
172 logger.info(f"Updated scheduler with user info for {username}")
173 except Exception:
174 logger.exception("Could not update scheduler on login")
176 auth_db.commit()
177 auth_db.close()
179 logger.info(f"User {username} logged in successfully")
181 # Clear the model cache on login to ensure fresh provider data
182 try:
183 from ...database.models import ProviderModel
185 user_db_session = db_manager.get_session(username)
186 if user_db_session: 186 ↛ 198line 186 didn't jump to line 198 because the condition on line 186 was always true
187 # Delete all cached models for this user
188 deleted_count = user_db_session.query(ProviderModel).delete()
189 user_db_session.commit()
190 logger.info(
191 f"Cleared {deleted_count} cached models for user {username} on login"
192 )
193 except Exception:
194 logger.exception("Failed to clear model cache on login")
196 # Redirect to original requested page or dashboard
197 # Validate redirect URL to prevent open redirect vulnerability
198 from urllib.parse import urlparse
200 next_page = request.args.get("next", url_for("index"))
202 # Parse the URL and validate its structure
203 parsed_url = urlparse(next_page)
205 # Whitelist of allowed paths - add more as needed
206 allowed_paths = {
207 "/",
208 "/dashboard",
209 "/profile",
210 "/settings",
211 "/research",
212 "/history",
213 "/metrics",
214 "/benchmark",
215 }
217 # Also allow any path generated by url_for (they start with /)
218 safe_redirect = url_for("index") # Default safe URL
220 # Validate the URL: must be relative (no netloc) and safe
221 if next_page and not parsed_url.netloc and not parsed_url.scheme: 221 ↛ 249line 221 didn't jump to line 249 because the condition on line 221 was always true
222 # Normalize the path to prevent tricks like //example.com
223 # Using PurePosixPath for URL path normalization (not filesystem paths)
224 normalized_path = (
225 str(PurePosixPath(parsed_url.path)) if parsed_url.path else ""
226 )
228 # Check if it's in the whitelist or is a safe relative path
229 if parsed_url.path in allowed_paths: 229 ↛ 231line 229 didn't jump to line 231 because the condition on line 229 was always true
230 safe_redirect = parsed_url.path # Use parsed path, not raw input
231 elif (
232 parsed_url.path
233 and parsed_url.path.startswith("/")
234 and not parsed_url.path.startswith("//")
235 ):
236 # Additional checks for safety
237 if ".." not in parsed_url.path and normalized_path.startswith("/"):
238 safe_redirect = (
239 parsed_url.path
240 ) # Use parsed path, not raw input
241 else:
242 logger.warning(
243 f"Blocked potentially unsafe redirect with path traversal: {next_page}"
244 )
245 else:
246 logger.warning(
247 f"Blocked potentially unsafe redirect to: {next_page}"
248 )
249 elif next_page:
250 logger.warning(f"Blocked redirect with protocol/domain: {next_page}")
252 return redirect(safe_redirect)
255@auth_bp.route("/register", methods=["GET"])
256def register_page():
257 """
258 Registration page (GET only).
259 Not rate limited - viewing the page should always work.
260 """
261 config = load_server_config()
262 if not config.get("allow_registrations", True):
263 flash("New user registrations are currently disabled.", "error")
264 return redirect(url_for("auth.login_page"))
266 return render_template(
267 "auth/register.html", has_encryption=db_manager.has_encryption
268 )
271@auth_bp.route("/register", methods=["POST"])
272@registration_limit
273def register():
274 """
275 Registration handler (POST only).
276 Creates new encrypted database for user with clear warnings about password recovery.
277 Rate limited to 3 attempts per hour per IP to prevent registration spam.
278 """
279 config = load_server_config()
280 if not config.get("allow_registrations", True):
281 flash("New user registrations are currently disabled.", "error")
282 return redirect(url_for("auth.login_page"))
284 # POST - Handle registration
285 username = request.form.get("username", "").strip()
286 password = request.form.get("password", "")
287 confirm_password = request.form.get("confirm_password", "")
288 acknowledge = request.form.get("acknowledge", "false") == "true"
290 # Validation
291 errors = []
293 if not username:
294 errors.append("Username is required")
295 elif len(username) < 3:
296 errors.append("Username must be at least 3 characters")
297 elif not username.replace("_", "").replace("-", "").isalnum(): 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true
298 errors.append(
299 "Username can only contain letters, numbers, underscores, and hyphens"
300 )
302 if not password:
303 errors.append("Password is required")
304 elif len(password) < 8:
305 errors.append("Password must be at least 8 characters")
307 if password != confirm_password:
308 errors.append("Passwords do not match")
310 if not acknowledge:
311 errors.append(
312 "You must acknowledge that password recovery is not possible"
313 )
315 # Check if user already exists
316 # Use generic error message to prevent account enumeration
317 if username and db_manager.user_exists(username):
318 errors.append("Registration failed. Please try a different username.")
320 if errors:
321 for error in errors:
322 flash(error, "error")
323 return render_template(
324 "auth/register.html", has_encryption=db_manager.has_encryption
325 ), 400
327 try:
328 # Create user in auth database
329 auth_db = get_auth_db_session()
330 new_user = User(username=username)
331 auth_db.add(new_user)
332 auth_db.commit()
333 auth_db.close()
335 # Create encrypted database for user
336 db_manager.create_user_database(username, password)
338 # Auto-login after registration
339 session_id = session_manager.create_session(username, False)
340 session["session_id"] = session_id
341 session["username"] = username
343 # Store password temporarily for post-registration database access
344 from ...database.temp_auth import temp_auth_store
346 auth_token = temp_auth_store.store_auth(username, password)
347 session["temp_auth_token"] = auth_token
349 # Also store in session password store for metrics access
350 from ...database.session_passwords import session_password_store
352 session_password_store.store_session_password(
353 username, session_id, password
354 )
356 # Notify the news scheduler about the new user
357 try:
358 from ...news.subscription_manager.scheduler import (
359 get_news_scheduler,
360 )
362 scheduler = get_news_scheduler()
363 if scheduler.is_running: 363 ↛ 371line 363 didn't jump to line 371 because the condition on line 363 was always true
364 scheduler.update_user_info(username, password)
365 logger.info(
366 f"Updated scheduler with new user info for {username}"
367 )
368 except Exception:
369 logger.exception("Could not update scheduler on registration")
371 logger.info(f"New user registered: {username}")
373 # Initialize library system (source types and default collection)
374 from ...database.library_init import initialize_library_for_user
376 try:
377 init_results = initialize_library_for_user(username, password)
378 if init_results.get("success"): 378 ↛ 383line 378 didn't jump to line 383 because the condition on line 378 was always true
379 logger.info(
380 f"Library system initialized for new user {username}"
381 )
382 else:
383 logger.warning(
384 f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
385 )
386 except Exception:
387 logger.exception(
388 f"Error initializing library for new user {username}"
389 )
390 # Don't block registration on library init failure
392 return redirect(url_for("index"))
394 except Exception:
395 logger.exception(f"Registration failed for {username}")
397 # Rollback user creation if database creation failed
398 auth_db.rollback()
399 auth_db.close()
401 flash("Registration failed. Please try again.", "error")
402 return render_template(
403 "auth/register.html", has_encryption=db_manager.has_encryption
404 ), 500
407@auth_bp.route("/logout", methods=["GET", "POST"])
408def logout():
409 """
410 Logout handler.
411 Clears session and closes database connections.
412 Supports both GET (for direct navigation) and POST (for form submission).
413 """
414 username = session.get("username")
415 session_id = session.get("session_id")
417 if username:
418 # Close database connection
419 db_manager.close_user_database(username)
421 # Clear session
422 if session_id: 422 ↛ 430line 422 didn't jump to line 430 because the condition on line 422 was always true
423 session_manager.destroy_session(session_id)
425 # Clear session password
426 from ...database.session_passwords import session_password_store
428 session_password_store.clear_session(username, session_id)
430 session.clear()
432 logger.info(f"User {username} logged out")
433 flash("You have been logged out successfully", "info")
435 return redirect(url_for("auth.login"))
438@auth_bp.route("/check", methods=["GET"])
439def check_auth():
440 """
441 Check if user is authenticated (for AJAX requests).
442 """
443 if session.get("username"):
444 return jsonify({"authenticated": True, "username": session["username"]})
445 else:
446 return jsonify({"authenticated": False}), 401
449@auth_bp.route("/change-password", methods=["GET", "POST"])
450def change_password():
451 """
452 Change password for current user.
453 Requires current password and re-encrypts database.
454 """
455 username = session.get("username")
456 if not username: 456 ↛ 457line 456 didn't jump to line 457 because the condition on line 456 was never true
457 return redirect(url_for("auth.login"))
459 if request.method == "GET": 459 ↛ 460line 459 didn't jump to line 460 because the condition on line 459 was never true
460 return render_template("auth/change_password.html")
462 # POST - Handle password change
463 current_password = request.form.get("current_password", "")
464 new_password = request.form.get("new_password", "")
465 confirm_password = request.form.get("confirm_password", "")
467 # Validation
468 errors = []
470 if not current_password: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true
471 errors.append("Current password is required")
473 if not new_password: 473 ↛ 474line 473 didn't jump to line 474 because the condition on line 473 was never true
474 errors.append("New password is required")
475 elif len(new_password) < 8: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 errors.append("New password must be at least 8 characters")
478 if new_password != confirm_password: 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true
479 errors.append("New passwords do not match")
481 if current_password == new_password: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 errors.append("New password must be different from current password")
484 if errors: 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true
485 for error in errors:
486 flash(error, "error")
487 return render_template("auth/change_password.html"), 400
489 # Attempt password change
490 success = db_manager.change_password(
491 username, current_password, new_password
492 )
494 if success: 494 ↛ 505line 494 didn't jump to line 505 because the condition on line 494 was always true
495 # Clear session to force re-login with new password
496 session.clear()
498 logger.info(f"Password changed for user {username}")
499 flash(
500 "Password changed successfully. Please login with your new password.",
501 "success",
502 )
503 return redirect(url_for("auth.login"))
504 else:
505 flash("Current password is incorrect", "error")
506 return render_template("auth/change_password.html"), 401
509@auth_bp.route("/integrity-check", methods=["GET"])
510def integrity_check():
511 """
512 Check database integrity for current user.
513 """
514 username = session.get("username")
515 if not username: 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true
516 return jsonify({"error": "Not authenticated"}), 401
518 is_valid = db_manager.check_database_integrity(username)
520 return jsonify(
521 {
522 "username": username,
523 "integrity": "valid" if is_valid else "corrupted",
524 "timestamp": datetime.now(timezone.utc).isoformat(),
525 }
526 )