Coverage for src/local_deep_research/web/auth/decorators.py: 100%
48 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Authentication decorators for protecting routes.
3"""
5from functools import wraps
7from flask import g, jsonify, redirect, request, session, url_for
8from loguru import logger
10from ...database.encrypted_db import db_manager
11from ...security.url_validator import URLValidator
14def _safe_redirect_to_login():
15 """
16 Redirect to login with validated next parameter.
18 Uses request.url as next parameter only if it passes
19 security validation to prevent open redirect vulnerabilities.
21 Returns:
22 Flask redirect response
23 """
24 next_url = request.url
25 # Validate that next URL is safe before using it
26 if URLValidator.is_safe_redirect_url(next_url, request.host_url):
27 return redirect(url_for("auth.login", next=next_url))
28 # Fall back to login without next parameter if validation fails
29 return redirect(url_for("auth.login"))
32def _is_api_path(path: str) -> bool:
33 """Detect API request paths that should receive JSON, not HTML redirects.
35 Matches `/api/` anywhere in the path (so nested API blueprints like
36 `/news/api/...` and `/library/api/...` work, not just top-level
37 `/api/...`), and also paths that end in `/api` with no further
38 segments (e.g. `/settings/api`, `/history/api` are JSON endpoints).
40 The `api` segment must be slash-bounded — non-API paths like
41 `/apidocs` or `/openapi.json` are not matched.
42 """
43 return "/api/" in path or path.endswith("/api")
46def login_required(f):
47 """
48 Decorator to require authentication for a route.
49 Redirects to login page if not authenticated.
50 """
52 @wraps(f)
53 def decorated_function(*args, **kwargs):
54 if "username" not in session:
55 logger.debug(
56 f"Unauthenticated access attempt to {request.endpoint}"
57 )
58 if _is_api_path(request.path):
59 return jsonify({"error": "Authentication required"}), 401
60 return _safe_redirect_to_login()
62 # Check if we have an active database connection
63 username = session["username"]
64 if not db_manager.is_user_connected(username):
65 # Use debug level to reduce log noise for persistent sessions
66 logger.debug(
67 f"No database connection for authenticated user {username}"
68 )
69 if _is_api_path(request.path):
70 return jsonify({"error": "Database connection required"}), 401
71 session.clear()
72 return _safe_redirect_to_login()
74 return f(*args, **kwargs)
76 return decorated_function
79def current_user():
80 """
81 Get the current authenticated user's username.
82 Returns None if not authenticated.
83 """
84 return session.get("username")
87def get_current_db_session():
88 """
89 Get the database session for the current user.
90 Must be called within a login_required route.
91 """
92 username = current_user()
93 if username:
94 return db_manager.get_session(username)
95 return None
98def inject_current_user():
99 """
100 Flask before_request handler to inject current user into g.
101 """
102 g.current_user = current_user()
103 if g.current_user:
104 # Check connectivity
105 if not db_manager.is_user_connected(g.current_user):
106 # For API/auth routes, allow the request to continue
107 if _is_api_path(request.path) or request.path.startswith("/auth/"):
108 logger.debug(
109 f"No database for user {g.current_user} on API/auth route"
110 )
111 else:
112 logger.debug(
113 f"Clearing stale session for user {g.current_user}"
114 )
115 session.clear()
116 g.current_user = None
117 g.db_session = None
118 else:
119 # Session will be created lazily by get_g_db_session() on first
120 # access. This avoids checking out a pool connection for requests
121 # that never touch the database (status polls, health checks, etc.).
122 g.db_session = None
123 else:
124 g.db_session = None