Coverage for src / local_deep_research / web / auth / decorators.py: 100%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Authentication decorators for protecting routes.
3"""
5from functools import wraps
7from flask import g, jsonify, redirect, request, session, url_for
8from loguru import logger
10from ...database.encrypted_db import db_manager
11from ...security.url_validator import URLValidator
def _safe_redirect_to_login():
    """
    Send the client to the login page, remembering where it came from.

    The URL of the current request is forwarded as the ``next`` query
    parameter only when ``URLValidator.is_safe_redirect_url`` accepts it
    against ``request.host_url``. This guards against open redirect
    vulnerabilities; when validation fails, ``next`` is simply omitted.

    Returns:
        Flask redirect response
    """
    requested_url = request.url
    login_params = {}
    # Only carry the original URL along once it has been validated.
    if URLValidator.is_safe_redirect_url(requested_url, request.host_url):
        login_params["next"] = requested_url
    return redirect(url_for("auth.login", **login_params))
def login_required(f):
    """
    Route decorator that only lets authenticated requests through.

    A request is passed on to ``f`` when the session holds a ``username``
    and ``db_manager`` reports that user as connected. Otherwise:

    * paths under ``/api/`` or ``/settings/api/`` get a JSON error body
      with HTTP status 401;
    * every other path is redirected to the login page. When the user is
      known but has no database connection, the session is cleared first.
    """

    def _is_api_request():
        # API callers expect a JSON error rather than an HTML redirect.
        return request.path.startswith(("/api/", "/settings/api/"))

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "username" not in session:
            logger.debug(
                f"Unauthenticated access attempt to {request.endpoint}"
            )
            if _is_api_request():
                return jsonify({"error": "Authentication required"}), 401
            return _safe_redirect_to_login()

        username = session["username"]
        # Happy path: logged in and the user's database is available.
        if db_manager.is_user_connected(username):
            return f(*args, **kwargs)

        # Debug level keeps log noise down for persistent sessions.
        logger.debug(
            f"No database connection for authenticated user {username}"
        )
        if _is_api_request():
            return jsonify({"error": "Database connection required"}), 401
        session.clear()
        return _safe_redirect_to_login()

    return decorated_function
def current_user():
    """
    Look up who is logged in for the current request.

    Returns:
        The value stored under ``"username"`` in the Flask session, or
        ``None`` when that key is absent (i.e. not authenticated).
    """
    username = session.get("username")
    return username
def get_current_db_session():
    """
    Fetch the database session belonging to the current user.

    Intended to be called from within a ``login_required`` route.

    Returns:
        Whatever ``db_manager.get_session`` returns for the current
        username, or ``None`` when no user is logged in.
    """
    username = current_user()
    return db_manager.get_session(username) if username else None
def inject_current_user():
    """
    Flask before_request handler to inject the current user into ``g``.

    Effects on ``g``:

    * ``g.current_user`` is set to the session's username, or ``None``
      when nobody is logged in or a stale session was just cleared.
    * ``g.db_session`` is always set to ``None``. It is assigned once,
      up front, so the attribute exists on every code path.

    When the user has a session but ``db_manager`` reports no connection:

    * requests under ``/api/``, ``/auth/`` or ``/settings/api/`` are
      allowed to continue with ``g.current_user`` still set;
    * any other request has its session cleared and ``g.current_user``
      reset to ``None``.

    Returns:
        None, so Flask carries on with normal request handling.
    """
    username = current_user()
    g.current_user = username
    # The session is created lazily by get_g_db_session() on first access.
    # This avoids checking out a pool connection for requests that never
    # touch the database (status polls, health checks, etc.).
    g.db_session = None

    # Nothing more to do for anonymous requests or healthy connections.
    if not username or db_manager.is_user_connected(username):
        return

    # For API/auth routes, allow the request to continue.
    if request.path.startswith(("/api/", "/auth/", "/settings/api/")):
        logger.debug(f"No database for user {username} on API/auth route")
        return

    logger.debug(f"Clearing stale session for user {username}")
    session.clear()
    g.current_user = None