Coverage for src / local_deep_research / web / auth / routes.py: 83%

265 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Authentication routes for login, register, and logout. 

3Uses SQLCipher encrypted databases with browser password manager support. 

4""" 

5 

6import threading 

7from datetime import datetime, timezone, UTC 

8 

9from flask import ( 

10 Blueprint, 

11 flash, 

12 jsonify, 

13 redirect, 

14 render_template, 

15 request, 

16 session, 

17 url_for, 

18) 

19from loguru import logger 

20 

21from ...database.auth_db import get_auth_db_session 

22from ...database.encrypted_db import db_manager 

23from ...database.models.auth import User 

24from sqlalchemy.exc import IntegrityError 

25from ...utilities.threading_utils import thread_context, thread_with_app_context 

26from .session_manager import SessionManager 

27from ..server_config import load_server_config 

28from ..utils.rate_limiter import login_limit, registration_limit 

29from ...security.url_validator import URLValidator 

30 

31auth_bp = Blueprint("auth", __name__, url_prefix="/auth") 

32session_manager = SessionManager() 

33 

34 

35@auth_bp.route("/csrf-token", methods=["GET"]) 

36def get_csrf_token(): 

37 """ 

38 Get CSRF token for API requests. 

39 Returns the current CSRF token for the session. 

40 This endpoint makes it easy for API clients to get the CSRF token 

41 programmatically without parsing HTML. 

42 """ 

43 from flask_wtf.csrf import generate_csrf 

44 

45 # Generate or get existing CSRF token for this session 

46 token = generate_csrf() 

47 

48 return jsonify({"csrf_token": token}), 200 

49 

50 

51@auth_bp.route("/login", methods=["GET"]) 

52def login_page(): 

53 """ 

54 Login page (GET only). 

55 Not rate limited - viewing the page should always work. 

56 """ 

57 config = load_server_config() 

58 # Check if already logged in 

59 if session.get("username"): 

60 return redirect(url_for("index")) 

61 

62 # Preserve the next parameter for post-login redirect 

63 next_page = request.args.get("next", "") 

64 

65 return render_template( 

66 "auth/login.html", 

67 has_encryption=db_manager.has_encryption, 

68 allow_registrations=config.get("allow_registrations", True), 

69 next_page=next_page, 

70 ) 

71 

72 

73@auth_bp.route("/login", methods=["POST"]) 

74@login_limit 

75def login(): 

76 """ 

77 Login handler (POST only). 

78 Rate limited to 5 attempts per 15 minutes per IP to prevent brute force attacks. 

79 """ 

80 config = load_server_config() 

81 # POST - Handle login 

82 username = request.form.get("username", "").strip() 

83 password = request.form.get("password", "") 

84 remember = request.form.get("remember", "false") == "true" 

85 

86 if not username or not password: 

87 flash("Username and password are required", "error") 

88 return render_template( 

89 "auth/login.html", 

90 has_encryption=db_manager.has_encryption, 

91 allow_registrations=config.get("allow_registrations", True), 

92 ), 400 

93 

94 # Try to open user's encrypted database 

95 engine = db_manager.open_user_database(username, password) 

96 

97 if engine is None: 

98 # Invalid credentials or database doesn't exist 

99 logger.warning(f"Failed login attempt for username: {username}") 

100 flash("Invalid username or password", "error") 

101 return render_template( 

102 "auth/login.html", 

103 has_encryption=db_manager.has_encryption, 

104 allow_registrations=config.get("allow_registrations", True), 

105 ), 401 

106 

107 # Success! Create session 

108 session_id = session_manager.create_session(username, remember) 

109 session["session_id"] = session_id 

110 session["username"] = username 

111 session.permanent = remember 

112 

113 # Store password temporarily for post-login database access 

114 from ...database.temp_auth import temp_auth_store 

115 

116 auth_token = temp_auth_store.store_auth(username, password) 

117 session["temp_auth_token"] = auth_token 

118 

119 # Also store in session password store for metrics access 

120 from ...database.session_passwords import session_password_store 

121 

122 session_password_store.store_session_password( 

123 username, session_id, password 

124 ) 

125 

126 logger.info(f"User {username} logged in successfully") 

127 

128 # Defer non-critical post-login work to a background thread so the 

129 # redirect returns immediately (settings migration, library init, 

130 # news scheduler, and model cache clearing are all idempotent and 

131 # can safely run after the response). 

132 app_ctx = thread_context() 

133 thread = threading.Thread( 

134 target=thread_with_app_context(_perform_post_login_tasks), 

135 args=(app_ctx, username, password), 

136 daemon=True, 

137 ) 

138 thread.start() 

139 

140 # Redirect to original requested page or dashboard 

141 next_page = request.args.get("next", "") 

142 

143 # Security: URLValidator.is_safe_redirect_url() validates that the target URL 

144 # has http/https scheme and netloc matches the application host, preventing 

145 # open redirect attacks (CWE-601). CodeQL doesn't recognize custom sanitizers. 

146 if next_page and URLValidator.is_safe_redirect_url( 

147 next_page, request.host_url 

148 ): 

149 return redirect(next_page) # codeql[py/url-redirection] 

150 

151 return redirect(url_for("index")) 

152 

153 

154def _perform_post_login_tasks(username: str, password: str) -> None: 

155 """Run non-critical post-login operations in a background thread. 

156 

157 Each operation is wrapped in its own try/except so that one failure 

158 does not prevent the others from running. All operations here are 

159 idempotent and safe to retry on the next login. 

160 """ 

161 # 1. Settings version check + migration 

162 try: 

163 from ...settings.manager import SettingsManager 

164 

165 db_session = db_manager.get_session(username) 

166 try: 

167 if db_session: 167 ↛ 183line 167 didn't jump to line 183 because the condition on line 167 was always true

168 settings_manager = SettingsManager(db_session) 

169 if not settings_manager.db_version_matches_package(): 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 logger.info( 

171 f"Database version mismatch for {username} " 

172 "- loading missing default settings" 

173 ) 

174 settings_manager.load_from_defaults_file( 

175 commit=True, overwrite=False 

176 ) 

177 settings_manager.update_db_version() 

178 logger.info( 

179 f"Missing default settings loaded and version " 

180 f"updated for user {username}" 

181 ) 

182 finally: 

183 if db_session: 183 ↛ 189line 183 didn't jump to line 189 because the condition on line 183 was always true

184 db_session.close() 

185 except Exception: 

186 logger.exception(f"Post-login settings migration failed for {username}") 

187 

188 # 2. Initialize library system (source types and default collection) 

189 try: 

190 from ...database.library_init import initialize_library_for_user 

191 

192 init_results = initialize_library_for_user(username, password) 

193 if init_results.get("success"): 

194 logger.info(f"Library system initialized for user {username}") 

195 else: 

196 logger.warning( 

197 f"Library initialization issue for {username}: " 

198 f"{init_results.get('error', 'Unknown error')}" 

199 ) 

200 except Exception: 

201 logger.exception(f"Post-login library init failed for {username}") 

202 

203 # 3. Update last_login in auth DB + notify news scheduler 

204 try: 

205 auth_db = get_auth_db_session() 

206 try: 

207 user = auth_db.query(User).filter_by(username=username).first() 

208 if user: 

209 user.last_login = datetime.now(UTC) 

210 

211 try: 

212 from ...news.subscription_manager.scheduler import ( 

213 get_news_scheduler, 

214 ) 

215 

216 scheduler = get_news_scheduler() 

217 if scheduler.is_running: 217 ↛ 225line 217 didn't jump to line 225 because the condition on line 217 was always true

218 scheduler.update_user_info(username, password) 

219 logger.info( 

220 f"Updated scheduler with user info for {username}" 

221 ) 

222 except Exception: 

223 logger.exception("Could not update scheduler on login") 

224 

225 auth_db.commit() 

226 finally: 

227 auth_db.close() 

228 except Exception: 

229 logger.exception(f"Post-login auth DB update failed for {username}") 

230 

231 # 4. Clear the model cache to ensure fresh provider data 

232 try: 

233 from ...database.models import ProviderModel 

234 

235 user_db_session = db_manager.get_session(username) 

236 try: 

237 if user_db_session: 

238 deleted_count = user_db_session.query(ProviderModel).delete() 

239 user_db_session.commit() 

240 logger.info( 

241 f"Cleared {deleted_count} cached models " 

242 f"for user {username} on login" 

243 ) 

244 finally: 

245 if user_db_session: 

246 user_db_session.close() 

247 except Exception: 

248 logger.exception(f"Post-login model cache clear failed for {username}") 

249 

250 logger.info(f"Post-login tasks completed for user {username}") 

251 

252 

253@auth_bp.route("/register", methods=["GET"]) 

254def register_page(): 

255 """ 

256 Registration page (GET only). 

257 Not rate limited - viewing the page should always work. 

258 """ 

259 config = load_server_config() 

260 if not config.get("allow_registrations", True): 

261 flash("New user registrations are currently disabled.", "error") 

262 return redirect(url_for("auth.login_page")) 

263 

264 return render_template( 

265 "auth/register.html", has_encryption=db_manager.has_encryption 

266 ) 

267 

268 

269@auth_bp.route("/register", methods=["POST"]) 

270@registration_limit 

271def register(): 

272 """ 

273 Registration handler (POST only). 

274 Creates new encrypted database for user with clear warnings about password recovery. 

275 Rate limited to 3 attempts per hour per IP to prevent registration spam. 

276 """ 

277 config = load_server_config() 

278 if not config.get("allow_registrations", True): 

279 flash("New user registrations are currently disabled.", "error") 

280 return redirect(url_for("auth.login_page")) 

281 

282 # POST - Handle registration 

283 username = request.form.get("username", "").strip() 

284 password = request.form.get("password", "") 

285 confirm_password = request.form.get("confirm_password", "") 

286 acknowledge = request.form.get("acknowledge", "false") == "true" 

287 

288 # Validation 

289 errors = [] 

290 

291 if not username: 

292 errors.append("Username is required") 

293 elif len(username) < 3: 

294 errors.append("Username must be at least 3 characters") 

295 elif not username.replace("_", "").replace("-", "").isalnum(): 

296 errors.append( 

297 "Username can only contain letters, numbers, underscores, and hyphens" 

298 ) 

299 

300 if not password: 

301 errors.append("Password is required") 

302 elif len(password) < 8: 

303 errors.append("Password must be at least 8 characters") 

304 

305 if password != confirm_password: 

306 errors.append("Passwords do not match") 

307 

308 if not acknowledge: 

309 errors.append( 

310 "You must acknowledge that password recovery is not possible" 

311 ) 

312 

313 # Check if user already exists 

314 # Use generic error message to prevent account enumeration 

315 # Note: While this creates a minor timing difference, it's acceptable because: 

316 # 1. Rate limiting prevents automated timing analysis 

317 # 2. Generic error message prevents content-based enumeration 

318 # 3. Local database query timing is minimal (no network calls) 

319 # 4. Better UX with immediate feedback outweighs minor timing risk 

320 # See: https://cheatsheetseries.owasp.org/cheatsheets/Authentication_Cheat_Sheet.html 

321 if username and db_manager.user_exists(username): 

322 errors.append("Registration failed. Please try a different username.") 

323 

324 if errors: 

325 for error in errors: 

326 flash(error, "error") 

327 return render_template( 

328 "auth/register.html", has_encryption=db_manager.has_encryption 

329 ), 400 

330 

331 # Create user in auth database 

332 auth_db = get_auth_db_session() 

333 try: 

334 new_user = User(username=username) 

335 auth_db.add(new_user) 

336 auth_db.commit() 

337 except IntegrityError: 

338 # Catch duplicate username specifically (race condition case) 

339 # This handles the edge case where two requests for the same username 

340 # pass the user_exists() check simultaneously 

341 logger.warning(f"Duplicate username attempted: {username}") 

342 auth_db.rollback() 

343 auth_db.close() 

344 flash("Registration failed. Please try a different username.", "error") 

345 return render_template( 

346 "auth/register.html", has_encryption=db_manager.has_encryption 

347 ), 400 

348 except Exception: 

349 logger.exception(f"Registration failed for {username}") 

350 auth_db.rollback() 

351 auth_db.close() 

352 flash("Registration failed. Please try again.", "error") 

353 return render_template( 

354 "auth/register.html", has_encryption=db_manager.has_encryption 

355 ), 500 

356 finally: 

357 # Note: In success case, we've already committed; in exception case, 

358 # we rollback and close above, but finally ensures close if commit succeeded 

359 try: 

360 auth_db.close() 

361 except Exception: 

362 pass # Already closed in exception handler 

363 

364 try: 

365 # Create encrypted database for user 

366 db_manager.create_user_database(username, password) 

367 

368 # Auto-login after registration 

369 session_id = session_manager.create_session(username, False) 

370 session["session_id"] = session_id 

371 session["username"] = username 

372 

373 # Store password temporarily for post-registration database access 

374 from ...database.temp_auth import temp_auth_store 

375 

376 auth_token = temp_auth_store.store_auth(username, password) 

377 session["temp_auth_token"] = auth_token 

378 

379 # Also store in session password store for metrics access 

380 from ...database.session_passwords import session_password_store 

381 

382 session_password_store.store_session_password( 

383 username, session_id, password 

384 ) 

385 

386 # Notify the news scheduler about the new user 

387 try: 

388 from ...news.subscription_manager.scheduler import ( 

389 get_news_scheduler, 

390 ) 

391 

392 scheduler = get_news_scheduler() 

393 if scheduler.is_running: 393 ↛ 401line 393 didn't jump to line 401 because the condition on line 393 was always true

394 scheduler.update_user_info(username, password) 

395 logger.info( 

396 f"Updated scheduler with new user info for {username}" 

397 ) 

398 except Exception: 

399 logger.exception("Could not update scheduler on registration") 

400 

401 logger.info(f"New user registered: {username}") 

402 

403 # Initialize library system (source types and default collection) 

404 from ...database.library_init import initialize_library_for_user 

405 

406 try: 

407 init_results = initialize_library_for_user(username, password) 

408 if init_results.get("success"): 408 ↛ 413line 408 didn't jump to line 413 because the condition on line 408 was always true

409 logger.info( 

410 f"Library system initialized for new user {username}" 

411 ) 

412 else: 

413 logger.warning( 

414 f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}" 

415 ) 

416 except Exception: 

417 logger.exception( 

418 f"Error initializing library for new user {username}" 

419 ) 

420 # Don't block registration on library init failure 

421 

422 return redirect(url_for("index")) 

423 

424 except Exception: 

425 logger.exception(f"Registration failed for {username}") 

426 flash("Registration failed. Please try again.", "error") 

427 return render_template( 

428 "auth/register.html", has_encryption=db_manager.has_encryption 

429 ), 500 

430 

431 

432@auth_bp.route("/logout", methods=["GET", "POST"]) 

433def logout(): 

434 """ 

435 Logout handler. 

436 Clears session and closes database connections. 

437 Supports both GET (for direct navigation) and POST (for form submission). 

438 """ 

439 username = session.get("username") 

440 session_id = session.get("session_id") 

441 

442 if username: 

443 # Close database connection 

444 db_manager.close_user_database(username) 

445 

446 # Clear session 

447 if session_id: 447 ↛ 455line 447 didn't jump to line 455 because the condition on line 447 was always true

448 session_manager.destroy_session(session_id) 

449 

450 # Clear session password 

451 from ...database.session_passwords import session_password_store 

452 

453 session_password_store.clear_session(username, session_id) 

454 

455 session.clear() 

456 

457 logger.info(f"User {username} logged out") 

458 flash("You have been logged out successfully", "info") 

459 

460 return redirect(url_for("auth.login")) 

461 

462 

463@auth_bp.route("/check", methods=["GET"]) 

464def check_auth(): 

465 """ 

466 Check if user is authenticated (for AJAX requests). 

467 """ 

468 if session.get("username"): 

469 return jsonify({"authenticated": True, "username": session["username"]}) 

470 else: 

471 return jsonify({"authenticated": False}), 401 

472 

473 

474@auth_bp.route("/change-password", methods=["GET", "POST"]) 

475def change_password(): 

476 """ 

477 Change password for current user. 

478 Requires current password and re-encrypts database. 

479 """ 

480 username = session.get("username") 

481 if not username: 

482 return redirect(url_for("auth.login")) 

483 

484 if request.method == "GET": 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true

485 return render_template("auth/change_password.html") 

486 

487 # POST - Handle password change 

488 current_password = request.form.get("current_password", "") 

489 new_password = request.form.get("new_password", "") 

490 confirm_password = request.form.get("confirm_password", "") 

491 

492 # Validation 

493 errors = [] 

494 

495 if not current_password: 

496 errors.append("Current password is required") 

497 

498 if not new_password: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true

499 errors.append("New password is required") 

500 elif len(new_password) < 8: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true

501 errors.append("New password must be at least 8 characters") 

502 

503 if new_password != confirm_password: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true

504 errors.append("New passwords do not match") 

505 

506 if current_password == new_password: 

507 errors.append("New password must be different from current password") 

508 

509 if errors: 509 ↛ 515line 509 didn't jump to line 515 because the condition on line 509 was always true

510 for error in errors: 

511 flash(error, "error") 

512 return render_template("auth/change_password.html"), 400 

513 

514 # Attempt password change 

515 success = db_manager.change_password( 

516 username, current_password, new_password 

517 ) 

518 

519 if success: 

520 # The rekey is the ONLY step needed. The auth database stores no 

521 # password hash — login works by attempting to decrypt the user's 

522 # SQLCipher database. Do NOT add an auth-DB password-hash update 

523 # here; it would fail (User model has no set_password method) and 

524 # is architecturally unnecessary. 

525 session.clear() 

526 

527 logger.info(f"Password changed for user {username}") 

528 flash( 

529 "Password changed successfully. Please login with your new password.", 

530 "success", 

531 ) 

532 return redirect(url_for("auth.login")) 

533 else: 

534 flash("Current password is incorrect", "error") 

535 return render_template("auth/change_password.html"), 401 

536 

537 

538@auth_bp.route("/integrity-check", methods=["GET"]) 

539def integrity_check(): 

540 """ 

541 Check database integrity for current user. 

542 """ 

543 username = session.get("username") 

544 if not username: 

545 return jsonify({"error": "Not authenticated"}), 401 

546 

547 is_valid = db_manager.check_database_integrity(username) 

548 

549 return jsonify( 

550 { 

551 "username": username, 

552 "integrity": "valid" if is_valid else "corrupted", 

553 "timestamp": datetime.now(timezone.utc).isoformat(), 

554 } 

555 )