Coverage for src / local_deep_research / web / auth / database_middleware.py: 84%
40 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Middleware to ensure database connections are available for authenticated users.
3"""
5from flask import g, session
7from ...database.encrypted_db import db_manager
8from ...database.thread_local_session import get_metrics_session
9from .middleware_optimizer import should_skip_database_middleware
def ensure_user_database():
    """
    Ensure the user's database is open for the current request.

    This is called as a before_request handler. It always returns ``None``.

    The password used to open the database is looked up in this order:

    1. A one-time ``temp_auth_token`` stored in the Flask session. It is
       accepted only if the username stored with the token matches the
       session's username.
    2. The session password store, keyed by username and ``session_id``.
    3. The literal ``"dummy"`` when ``db_manager.has_encryption`` is falsy.

    On success the following request-scoped attributes are set on ``g``:
    ``db_session``, ``user_password`` and ``username``. If no password can
    be found, or obtaining the database session raises, ``g`` is left
    untouched.
    """
    # Skip for requests that don't need database access
    if should_skip_database_middleware():
        return

    # Check if we already have a session for this request
    if getattr(g, "db_session", None):
        return  # Already set up

    username = session.get("username")
    if not username:
        # No username in the session: nothing to open.
        return

    # Try to get password from various sources
    password = None

    # Check for temporary auth token (post-registration/login)
    temp_auth_token = session.get("temp_auth_token")
    if temp_auth_token:
        from ...database.temp_auth import temp_auth_store

        auth_data = temp_auth_store.retrieve_auth(temp_auth_token)
        if auth_data:
            stored_username, stored_password = auth_data
            # Only adopt the stored password when the token belongs to the
            # user named in the session. Unpacking straight into `password`
            # would keep another user's credential on a mismatch and also
            # bypass the session-password-store fallback below.
            if stored_username == username:
                password = stored_password

                # Remove token from session after use
                session.pop("temp_auth_token", None)

                # Store in session password store for future requests
                session_id = session.get("session_id")
                if session_id:
                    from ...database.session_passwords import (
                        session_password_store,
                    )

                    session_password_store.store_session_password(
                        username, session_id, password
                    )

    # If no password from temp auth, try session password store
    if not password:
        session_id = session.get("session_id")
        if session_id:
            from ...database.session_passwords import session_password_store

            password = session_password_store.get_session_password(
                username, session_id
            )

    # For unencrypted databases, use dummy password
    if not password and not db_manager.has_encryption:
        password = "dummy"

    if not password:
        return

    # We have a password: get or create the thread-local session
    try:
        # Use thread-local session manager for efficiency
        db_session = get_metrics_session(username, password)
        if db_session:
            g.db_session = db_session
            g.user_password = password
            g.username = username
    except Exception:
        # Deliberate best-effort: don't log exceptions here to avoid deadlock
        pass