Coverage for src / local_deep_research / web / auth / decorators.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Authentication decorators for protecting routes. 

3""" 

4 

5from functools import wraps 

6 

7from flask import g, jsonify, redirect, request, session, url_for 

8from loguru import logger 

9 

10from ...database.encrypted_db import db_manager 

11from ...security.url_validator import URLValidator 

12 

13 

def _safe_redirect_to_login():
    """
    Build a redirect to the login page, carrying a validated ``next`` URL.

    The current ``request.url`` is passed along as ``next`` only when
    ``URLValidator.is_safe_redirect_url`` accepts it against
    ``request.host_url``; this guards against open redirect vulnerabilities.

    Returns:
        Flask redirect response
    """
    target = request.url
    # Validation failed: send the user to login without a next parameter.
    if not URLValidator.is_safe_redirect_url(target, request.host_url):
        return redirect(url_for("auth.login"))
    # The URL passed validation, so it is safe to round-trip through login.
    return redirect(url_for("auth.login", next=target))

30 

31 

# Path prefixes whose callers get a JSON 401 body instead of a login redirect.
_API_PATH_PREFIXES = ("/api/", "/settings/api/")


def _is_api_request():
    """Return True when the current request path starts with an API prefix."""
    return request.path.startswith(_API_PATH_PREFIXES)


def login_required(f):
    """
    Decorator to require authentication for a route.

    The wrapped view runs only when the session holds a "username" and
    ``db_manager.is_user_connected`` reports a connection for that user.
    Otherwise API paths receive a JSON error with status 401 and all other
    paths are redirected to the login page.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function (metadata preserved via functools.wraps).
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "username" not in session:
            logger.debug(
                f"Unauthenticated access attempt to {request.endpoint}"
            )
            # For API routes, return JSON error instead of redirect
            if _is_api_request():
                return jsonify({"error": "Authentication required"}), 401
            return _safe_redirect_to_login()

        # Check if we have an active database connection
        username = session["username"]
        if not db_manager.is_user_connected(username):
            # Use debug level to reduce log noise for persistent sessions
            logger.debug(
                f"No database connection for authenticated user {username}"
            )
            # For API routes, return JSON error instead of redirect;
            # note the session is left intact on this path.
            if _is_api_request():
                return jsonify({"error": "Database connection required"}), 401
            # Only the redirect path clears the session before sending the
            # user back to login.
            session.clear()
            return _safe_redirect_to_login()

        return f(*args, **kwargs)

    return decorated_function

69 

70 

def current_user():
    """
    Return the username stored in the Flask session.

    The result is None when the session has no "username" entry.
    """
    username = session.get("username")
    return username

77 

78 

def get_current_db_session():
    """
    Return the database session belonging to the current user.

    Intended to be called within a login_required route. Looks the user up
    via ``current_user()`` and asks ``db_manager.get_session`` for that
    user's session; returns None when no username is available.
    """
    name = current_user()
    return db_manager.get_session(name) if name else None

88 

89 

def inject_current_user():
    """
    Flask before_request handler to inject current user into g.

    Sets ``g.current_user`` from ``current_user()`` and always sets
    ``g.db_session`` to None. When the user has no active database
    connection, the session is cleared and ``g.current_user`` is reset to
    None, except on API/auth paths, where the request is allowed to continue
    with the session untouched.
    """
    g.current_user = current_user()
    # The session is created lazily by get_g_db_session() on first access.
    # This avoids checking out a pool connection for requests that never
    # touch the database (status polls, health checks, etc.). Assigning the
    # placeholder up front guarantees the attribute exists on every path.
    g.db_session = None

    if not g.current_user:
        return

    # Check connectivity
    if db_manager.is_user_connected(g.current_user):
        return

    # For API/auth routes, allow the request to continue
    if request.path.startswith(("/api/", "/auth/", "/settings/api/")):
        logger.debug(
            f"No database for user {g.current_user} on API/auth route"
        )
        return

    # Any other route with no database connection is treated as a stale
    # session and cleared.
    logger.debug(f"Clearing stale session for user {g.current_user}")
    session.clear()
    g.current_user = None