Coverage for src / local_deep_research / web / auth / decorators.py: 99%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Authentication decorators for protecting routes. 

3""" 

4 

5from functools import wraps 

6 

7from flask import g, jsonify, redirect, request, session, url_for 

8from loguru import logger 

9 

10from ...database.encrypted_db import db_manager 

11from ...security.url_validator import URLValidator 

12 

13 

def _safe_redirect_to_login():
    """
    Build a redirect to the login page, carrying a ``next`` URL when safe.

    The current ``request.url`` is checked with
    ``URLValidator.is_safe_redirect_url`` against ``request.host_url``.
    Only a URL that passes this check is forwarded as the ``next``
    query parameter; this guards against open redirect vulnerabilities.

    Returns:
        Flask redirect response targeting the ``auth.login`` endpoint.
    """
    requested_url = request.url
    url_is_safe = URLValidator.is_safe_redirect_url(
        requested_url, request.host_url
    )

    if not url_is_safe:
        # Validation failed: send the user to login without a return target.
        return redirect(url_for("auth.login"))

    return redirect(url_for("auth.login", next=requested_url))

30 

31 

def login_required(f):
    """
    Decorator that only runs the wrapped route for an authenticated user.

    The wrapped view is called when ``session`` contains ``"username"``
    and ``db_manager.is_user_connected`` reports a connection for that
    user. Otherwise:

    * requests whose path starts with ``/api/`` or ``/settings/api/``
      receive a JSON error body with HTTP status 401;
    * all other requests are redirected to the login page (the session
      is cleared first when the user is known but has no database
      connection).
    """
    # Paths under these prefixes get a JSON 401 instead of a redirect.
    api_prefixes = ("/api/", "/settings/api/")

    @wraps(f)
    def wrapper(*args, **kwargs):
        if "username" not in session:
            logger.debug(
                f"Unauthenticated access attempt to {request.endpoint}"
            )
            if request.path.startswith(api_prefixes):
                return jsonify({"error": "Authentication required"}), 401
            return _safe_redirect_to_login()

        username = session["username"]
        if db_manager.is_user_connected(username):
            # Authenticated with a live database connection: run the view.
            return f(*args, **kwargs)

        # Debug level keeps log noise down for persistent sessions.
        logger.debug(
            f"No database connection for authenticated user {username}"
        )
        if request.path.startswith(api_prefixes):
            return jsonify({"error": "Database connection required"}), 401

        session.clear()
        return _safe_redirect_to_login()

    return wrapper

69 

70 

def current_user():
    """
    Return the username stored in the session.

    Looks up the ``"username"`` key in ``session``; the result is
    ``None`` when the key is absent (i.e. nobody is authenticated).
    """
    username = session.get("username")
    return username

77 

78 

def get_current_db_session():
    """
    Return the current user's database session.

    Intended to be called from within a ``login_required`` route.
    Yields ``db_manager.get_session(username)`` for the current user,
    or ``None`` when ``current_user()`` returns a falsy value.
    """
    username = current_user()
    return db_manager.get_session(username) if username else None

88 

89 

def inject_current_user():
    """
    Flask before_request handler that populates ``g`` with user state.

    Sets ``g.current_user`` from ``current_user()`` and ``g.db_session``
    from ``db_manager.get_session``. When no session object is available
    and the user has no active database connection:

    * on paths starting with ``/api/``, ``/auth/`` or ``/settings/api/``
      the request proceeds unchanged, leaving the missing database to
      the individual route handlers;
    * on any other path the session is treated as stale: it is cleared
      and both ``g.current_user`` and ``g.db_session`` are set to ``None``.

    Any exception raised along the way is logged and leaves
    ``g.db_session`` as ``None``.
    """
    username = current_user()
    g.current_user = username

    if not username:
        # Nobody is logged in, so there is no database session to attach.
        g.db_session = None
        return

    try:
        g.db_session = db_manager.get_session(username)
        if g.db_session is not None:
            return

        # No session object; nothing more to do if a connection exists.
        if db_manager.is_user_connected(username):
            return

        # API and auth routes continue; their handlers deal with the
        # missing database themselves.
        passthrough_prefixes = ("/api/", "/auth/", "/settings/api/")
        if request.path.startswith(passthrough_prefixes):
            logger.debug(
                f"No database for user {username} on API/auth route"
            )
            return

        # Regular routes: this is a stale session that needs clearing.
        logger.debug(f"Clearing stale session for user {username}")
        session.clear()
        g.current_user = None
        g.db_session = None
    except Exception:
        logger.exception(f"Error getting session for user {username}")
        g.db_session = None