Coverage for src / local_deep_research / web / auth / routes.py: 80%

243 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Authentication routes for login, register, and logout. 

3Uses SQLCipher encrypted databases with browser password manager support. 

4""" 

5 

6from datetime import datetime, timezone, UTC 

7from pathlib import PurePosixPath 

8 

9from flask import ( 

10 Blueprint, 

11 flash, 

12 jsonify, 

13 redirect, 

14 render_template, 

15 request, 

16 session, 

17 url_for, 

18) 

19from loguru import logger 

20 

21from ...database.auth_db import get_auth_db_session 

22from ...database.encrypted_db import db_manager 

23from ...database.models.auth import User 

24from .session_manager import SessionManager 

25from ..server_config import load_server_config 

26from ..utils.rate_limiter import login_limit, registration_limit 

27 

# Blueprint that groups every authentication endpoint under the /auth prefix.
auth_bp = Blueprint(
    "auth",
    __name__,
    url_prefix="/auth",
)

# Single module-level SessionManager instance used by the route handlers.
session_manager = SessionManager()

30 

31 

@auth_bp.route("/csrf-token", methods=["GET"])
def get_csrf_token():
    """
    Return the CSRF token for the current session as JSON.

    Lets API clients obtain the token programmatically instead of
    scraping it out of a rendered HTML form.

    Returns:
        A ``({"csrf_token": <token>}, 200)`` JSON response tuple.
    """
    from flask_wtf.csrf import generate_csrf

    # generate_csrf() yields the session's token, creating one when absent.
    payload = {"csrf_token": generate_csrf()}
    return jsonify(payload), 200

46 

47 

@auth_bp.route("/login", methods=["GET"])
def login_page():
    """
    Render the login form (GET only).

    No rate limiter is applied here, so the page itself can always be viewed.

    Returns:
        A redirect to the index page when a username is already stored in
        the session, otherwise the rendered ``auth/login.html`` template.
    """
    server_config = load_server_config()

    # Users who are already signed in go straight to the index page.
    if session.get("username"):
        return redirect(url_for("index"))

    # Hand the requested post-login destination through to the template.
    requested_target = request.args.get("next", "")

    template_context = {
        "has_encryption": db_manager.has_encryption,
        "allow_registrations": server_config.get("allow_registrations", True),
        "next_page": requested_target,
    }
    return render_template("auth/login.html", **template_context)

68 

69 

# Paths that are always accepted as post-login redirect targets.
_ALLOWED_REDIRECT_PATHS = frozenset(
    {
        "/",
        "/dashboard",
        "/profile",
        "/settings",
        "/research",
        "/history",
        "/metrics",
        "/benchmark",
    }
)


def _render_login_form(config, status_code):
    """Render the login template with the same context as ``login_page``.

    Args:
        config: Server configuration mapping (as returned by
            ``load_server_config``).
        status_code: HTTP status to send with the rendered form.

    Returns:
        A ``(body, status_code)`` response tuple.
    """
    return (
        render_template(
            "auth/login.html",
            has_encryption=db_manager.has_encryption,
            allow_registrations=config.get("allow_registrations", True),
            # Keep the requested destination available to the template after
            # a failed attempt, matching what the GET handler passes.
            next_page=request.args.get("next", ""),
        ),
        status_code,
    )


def _safe_redirect_target(next_page, default):
    """Validate a user-supplied redirect target against open-redirect tricks.

    Args:
        next_page: Raw ``next`` value taken from the request (may be empty
            or ``None``).
        default: URL to fall back to when ``next_page`` is missing or unsafe.

    Returns:
        The path component of ``next_page`` when it is a safe, same-site
        absolute path; otherwise ``default``. Query strings and fragments
        are never carried over.
    """
    from urllib.parse import urlparse

    if not next_page:
        return default

    # Browsers treat "\" like "/", so "/\host" behaves as "//host"; control
    # characters have no place in a redirect target either.
    if "\\" in next_page or any(
        ord(char) < 0x20 or ord(char) == 0x7F for char in next_page
    ):
        logger.warning(
            f"Blocked potentially unsafe redirect to: {next_page!r}"
        )
        return default

    try:
        parsed_url = urlparse(next_page)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. a broken IPv6
        # literal); never let that turn a successful login into a 500.
        logger.warning(f"Blocked malformed redirect target: {next_page!r}")
        return default

    # Must be relative: no scheme and no network location.
    if parsed_url.netloc or parsed_url.scheme:
        logger.warning(f"Blocked redirect with protocol/domain: {next_page}")
        return default

    path = parsed_url.path
    if path in _ALLOWED_REDIRECT_PATHS:
        return path  # Use parsed path, not raw input

    if not path or not path.startswith("/") or path.startswith("//"):
        logger.warning(f"Blocked potentially unsafe redirect to: {next_page}")
        return default

    # PurePosixPath is used purely for URL path normalization here,
    # not for filesystem access.
    normalized_path = str(PurePosixPath(path))
    if ".." in path or not normalized_path.startswith("/"):
        logger.warning(
            f"Blocked potentially unsafe redirect with path traversal: {next_page}"
        )
        return default

    return path  # Use parsed path, not raw input


@auth_bp.route("/login", methods=["POST"])
@login_limit
def login():
    """
    Login handler (POST only).

    Rate limited through the ``login_limit`` decorator (documented as
    5 attempts per 15 minutes per IP) to slow down brute force attacks.

    Reads ``username``, ``password`` and ``remember`` from the submitted
    form, opens the user's encrypted database, creates the session and
    finally redirects to a validated ``next`` target or the index page.

    Returns:
        The login form with status 400 (missing fields) or 401 (database
        could not be opened with these credentials), or a redirect on
        success.
    """
    config = load_server_config()

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    remember = request.form.get("remember", "false") == "true"

    if not username or not password:
        flash("Username and password are required", "error")
        return _render_login_form(config, 400)

    # Try to open user's encrypted database
    engine = db_manager.open_user_database(username, password)

    if engine is None:
        # Invalid credentials or database doesn't exist
        logger.warning(f"Failed login attempt for username: {username}")
        flash("Invalid username or password", "error")
        return _render_login_form(config, 401)

    # Check if user has settings loaded (first login after migration)
    from ..services.settings_manager import SettingsManager

    db_session = db_manager.get_session(username)
    if db_session:
        settings_manager = SettingsManager(db_session)

        # Check if DB version matches package version
        if not settings_manager.db_version_matches_package():
            logger.info(
                f"Database version mismatch for {username} - loading missing default settings"
            )
            # Load defaults but preserve existing user settings
            settings_manager.load_from_defaults_file(
                commit=True, overwrite=False
            )
            settings_manager.update_db_version()
            logger.info(
                f"Missing default settings loaded and version updated for user {username}"
            )

    # Initialize library system (source types and default collection)
    from ...database.library_init import initialize_library_for_user

    try:
        init_results = initialize_library_for_user(username, password)
        if init_results.get("success"):
            logger.info(f"Library system initialized for user {username}")
        else:
            logger.warning(
                f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
            )
    except Exception:
        # Don't block login on library init failure
        logger.exception(f"Error initializing library for {username}")

    # Success! Create session
    session_id = session_manager.create_session(username, remember)
    session["session_id"] = session_id
    session["username"] = username
    session.permanent = remember

    # Store password temporarily for post-login database access
    from ...database.temp_auth import temp_auth_store

    auth_token = temp_auth_store.store_auth(username, password)
    session["temp_auth_token"] = auth_token

    # Also store in session password store for metrics access
    from ...database.session_passwords import session_password_store

    session_password_store.store_session_password(
        username, session_id, password
    )

    # Update last login in auth database; always release the session,
    # even when the query or commit fails.
    auth_db = get_auth_db_session()
    try:
        user = auth_db.query(User).filter_by(username=username).first()
        if user:
            user.last_login = datetime.now(UTC)
        auth_db.commit()
    finally:
        auth_db.close()

    # Notify the news scheduler about the user login (best effort)
    try:
        from ...news.subscription_manager.scheduler import get_news_scheduler

        scheduler = get_news_scheduler()
        if scheduler.is_running:
            scheduler.update_user_info(username, password)
            logger.info(f"Updated scheduler with user info for {username}")
    except Exception:
        logger.exception("Could not update scheduler on login")

    logger.info(f"User {username} logged in successfully")

    # Clear the model cache on login to ensure fresh provider data
    try:
        from ...database.models import ProviderModel

        user_db_session = db_manager.get_session(username)
        if user_db_session:
            # Delete all cached models for this user
            deleted_count = user_db_session.query(ProviderModel).delete()
            user_db_session.commit()
            logger.info(
                f"Cleared {deleted_count} cached models for user {username} on login"
            )
    except Exception:
        logger.exception("Failed to clear model cache on login")

    # Redirect to the originally requested page (validated to prevent an
    # open redirect) or to the index page.
    return redirect(
        _safe_redirect_target(request.args.get("next"), url_for("index"))
    )

253 

254 

@auth_bp.route("/register", methods=["GET"])
def register_page():
    """
    Render the registration form (GET only).

    No rate limiter is applied here, so the page itself can always be viewed.

    Returns:
        The rendered ``auth/register.html`` template, or a redirect to the
        login page (with an error flash) when the server configuration
        turns registrations off.
    """
    registrations_open = load_server_config().get("allow_registrations", True)

    if registrations_open:
        return render_template(
            "auth/register.html",
            has_encryption=db_manager.has_encryption,
        )

    flash("New user registrations are currently disabled.", "error")
    return redirect(url_for("auth.login_page"))

269 

270 

def _remove_auth_user(username):
    """Best-effort removal of an auth-database row for ``username``.

    Used to undo a committed user record when the matching encrypted
    database could not be created. Failures are logged, never raised.
    """
    try:
        cleanup_db = get_auth_db_session()
        try:
            cleanup_db.query(User).filter_by(username=username).delete()
            cleanup_db.commit()
        except Exception:
            cleanup_db.rollback()
            raise
        finally:
            cleanup_db.close()
    except Exception:
        logger.exception(
            f"Could not remove orphaned auth record for {username}"
        )


@auth_bp.route("/register", methods=["POST"])
@registration_limit
def register():
    """
    Registration handler (POST only).

    Creates new encrypted database for user with clear warnings about
    password recovery. Rate limited through the ``registration_limit``
    decorator (documented as 3 attempts per hour per IP) to prevent
    registration spam.

    Returns:
        A redirect to the login page when registrations are disabled, the
        registration form with status 400 on validation errors or 500 on
        an unexpected failure, or a redirect to the index page on success.
    """
    config = load_server_config()
    if not config.get("allow_registrations", True):
        flash("New user registrations are currently disabled.", "error")
        return redirect(url_for("auth.login_page"))

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    confirm_password = request.form.get("confirm_password", "")
    acknowledge = request.form.get("acknowledge", "false") == "true"

    # Validation: collect every problem so the user sees them all at once.
    errors = []

    if not username:
        errors.append("Username is required")
    elif len(username) < 3:
        errors.append("Username must be at least 3 characters")
    elif not username.replace("_", "").replace("-", "").isalnum():
        errors.append(
            "Username can only contain letters, numbers, underscores, and hyphens"
        )

    if not password:
        errors.append("Password is required")
    elif len(password) < 8:
        errors.append("Password must be at least 8 characters")

    if password != confirm_password:
        errors.append("Passwords do not match")

    if not acknowledge:
        errors.append(
            "You must acknowledge that password recovery is not possible"
        )

    # Check if user already exists
    # Use generic error message to prevent account enumeration
    if username and db_manager.user_exists(username):
        errors.append("Registration failed. Please try a different username.")

    if errors:
        for error in errors:
            flash(error, "error")
        return render_template(
            "auth/register.html", has_encryption=db_manager.has_encryption
        ), 400

    # Track how far account creation got so the failure handler knows
    # exactly what has to be undone.
    user_committed = False
    database_created = False

    try:
        # Create user in auth database
        auth_db = get_auth_db_session()
        try:
            auth_db.add(User(username=username))
            auth_db.commit()
            user_committed = True
        except Exception:
            auth_db.rollback()
            raise
        finally:
            auth_db.close()

        # Create encrypted database for user
        db_manager.create_user_database(username, password)
        database_created = True

        # Auto-login after registration
        session_id = session_manager.create_session(username, False)
        session["session_id"] = session_id
        session["username"] = username

        # Store password temporarily for post-registration database access
        from ...database.temp_auth import temp_auth_store

        auth_token = temp_auth_store.store_auth(username, password)
        session["temp_auth_token"] = auth_token

        # Also store in session password store for metrics access
        from ...database.session_passwords import session_password_store

        session_password_store.store_session_password(
            username, session_id, password
        )

        # Notify the news scheduler about the new user (best effort)
        try:
            from ...news.subscription_manager.scheduler import (
                get_news_scheduler,
            )

            scheduler = get_news_scheduler()
            if scheduler.is_running:
                scheduler.update_user_info(username, password)
                logger.info(
                    f"Updated scheduler with new user info for {username}"
                )
        except Exception:
            logger.exception("Could not update scheduler on registration")

        logger.info(f"New user registered: {username}")

        # Initialize library system (source types and default collection)
        from ...database.library_init import initialize_library_for_user

        try:
            init_results = initialize_library_for_user(username, password)
            if init_results.get("success"):
                logger.info(
                    f"Library system initialized for new user {username}"
                )
            else:
                logger.warning(
                    f"Library initialization issue for {username}: {init_results.get('error', 'Unknown error')}"
                )
        except Exception:
            # Don't block registration on library init failure
            logger.exception(
                f"Error initializing library for new user {username}"
            )

        return redirect(url_for("index"))

    except Exception:
        logger.exception(f"Registration failed for {username}")

        # The user row is already committed at this point, so a plain
        # rollback cannot undo it. Remove it explicitly, but only when the
        # encrypted database was never created.
        if user_committed and not database_created:
            _remove_auth_user(username)

        flash("Registration failed. Please try again.", "error")
        return render_template(
            "auth/register.html", has_encryption=db_manager.has_encryption
        ), 500

405 

406 

@auth_bp.route("/logout", methods=["GET", "POST"])
def logout():
    """
    Logout handler.

    Closes the user's database connection, destroys the server-side
    session record and its stored session password, then clears the Flask
    session. Supports both GET (for direct navigation) and POST (for form
    submission).

    Returns:
        A redirect to the login page.
    """
    username = session.get("username")
    session_id = session.get("session_id")

    if username:
        # Close database connection
        db_manager.close_user_database(username)

        if session_id:
            session_manager.destroy_session(session_id)

            # Clear session password
            from ...database.session_passwords import session_password_store

            session_password_store.clear_session(username, session_id)

    session.clear()

    # Only report a logout when somebody was actually signed in; this
    # avoids "User None logged out" entries for anonymous requests.
    if username:
        logger.info(f"User {username} logged out")
        flash("You have been logged out successfully", "info")

    # Target the GET login view by name, as the other handlers do.
    return redirect(url_for("auth.login_page"))

436 

437 

@auth_bp.route("/check", methods=["GET"])
def check_auth():
    """
    Report whether the current session is authenticated (for AJAX requests).

    Returns:
        JSON ``{"authenticated": True, "username": ...}`` when a username
        is stored in the session, otherwise ``{"authenticated": False}``
        with status 401.
    """
    current_user = session.get("username")

    # Guard clause: no username in the session means not authenticated.
    if not current_user:
        return jsonify({"authenticated": False}), 401

    return jsonify({"authenticated": True, "username": current_user})

447 

448 

@auth_bp.route("/change-password", methods=["GET", "POST"])
def change_password():
    """
    Change password for current user.

    Requires the current password; the actual change is delegated to
    ``db_manager.change_password``. On success the session is torn down
    so the user has to log in again with the new password.

    Returns:
        A redirect to the login page when not authenticated or after a
        successful change, the rendered form on GET, the form with status
        400 on validation errors, or the form with status 401 when
        ``db_manager.change_password`` reports failure.
    """
    username = session.get("username")
    if not username:
        return redirect(url_for("auth.login_page"))

    if request.method == "GET":
        return render_template("auth/change_password.html")

    # POST - Handle password change
    current_password = request.form.get("current_password", "")
    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    # Validation: collect every problem so the user sees them all at once.
    errors = []

    if not current_password:
        errors.append("Current password is required")

    if not new_password:
        errors.append("New password is required")
    elif len(new_password) < 8:
        errors.append("New password must be at least 8 characters")

    if new_password != confirm_password:
        errors.append("New passwords do not match")

    if current_password == new_password:
        errors.append("New password must be different from current password")

    if errors:
        for error in errors:
            flash(error, "error")
        return render_template("auth/change_password.html"), 400

    # Attempt password change
    success = db_manager.change_password(
        username, current_password, new_password
    )

    if not success:
        flash("Current password is incorrect", "error")
        return render_template("auth/change_password.html"), 401

    # Drop the server-side session record and the cached (now outdated)
    # session password, mirroring what logout does. The password has
    # already been changed, so a cleanup failure must not fail the request.
    session_id = session.get("session_id")
    if session_id:
        try:
            session_manager.destroy_session(session_id)

            from ...database.session_passwords import session_password_store

            session_password_store.clear_session(username, session_id)
        except Exception:
            logger.exception(
                "Failed to clean up session after password change"
            )

    # Clear session to force re-login with new password
    session.clear()

    logger.info(f"Password changed for user {username}")
    flash(
        "Password changed successfully. Please login with your new password.",
        "success",
    )
    return redirect(url_for("auth.login_page"))

507 

508 

@auth_bp.route("/integrity-check", methods=["GET"])
def integrity_check():
    """
    Report the database integrity status for the current user.

    Returns:
        JSON with ``username``, ``integrity`` ("valid" or "corrupted") and
        a UTC ISO-8601 ``timestamp``; or a JSON error with status 401 when
        no username is stored in the session.
    """
    username = session.get("username")
    if not username:
        return jsonify({"error": "Not authenticated"}), 401

    # Map the boolean result onto the label exposed in the response.
    if db_manager.check_database_integrity(username):
        status_label = "valid"
    else:
        status_label = "corrupted"

    checked_at = datetime.now(UTC).isoformat()

    report = {
        "username": username,
        "integrity": status_label,
        "timestamp": checked_at,
    }
    return jsonify(report)