Coverage for src / local_deep_research / security / path_validator.py: 98%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Centralized path validation utilities for security. 

3 

4This module provides secure path validation to prevent path traversal attacks 

5and other filesystem-based security vulnerabilities. 

6""" 

7 

8import os 

9import re 

10from pathlib import Path 

11from typing import Optional, Union 

12 

13from loguru import logger 

14from werkzeug.security import safe_join 

15 

16from ..config.paths import get_models_directory 

17 

18 

class PathValidator:
    """Centralized path validation for security.

    Every validator is a static method that either returns a
    ``pathlib.Path`` or raises ``ValueError``. Joining of untrusted input
    onto a trusted directory is delegated to werkzeug's ``safe_join``.
    """

    # Regex for safe filename/path characters.
    # Not referenced by any method of this class.
    SAFE_PATH_PATTERN = re.compile(r"^[a-zA-Z0-9._/\-]+$")

    # Allowed config file extensions (compared case-sensitively against
    # ``Path.suffix``).
    CONFIG_EXTENSIONS = (".json", ".yaml", ".yml", ".toml", ".ini", ".conf")

    @staticmethod
    def validate_safe_path(
        user_input: str,
        base_dir: Union[str, Path],
        allow_absolute: bool = False,
        required_extensions: Optional[tuple] = None,
    ) -> Optional[Path]:
        """
        Validate a user-provided path and join it onto ``base_dir``.

        Args:
            user_input: The user-provided path string
            base_dir: The safe base directory to contain paths within
            allow_absolute: Accepted for interface compatibility; this
                method currently does not consult it
            required_extensions: Tuple of allowed file suffixes
                (e.g., ('.json', '.yaml')); the check is skipped when falsy

        Returns:
            Path of ``user_input`` joined under the resolved ``base_dir``.
            The method never returns None; every failure raises.

        Raises:
            ValueError: If the input is empty, whitespace-only, not a string,
                contains a null byte, is rejected by ``safe_join``, or has a
                suffix outside ``required_extensions``
        """
        if not user_input or not isinstance(user_input, str):
            raise ValueError("Invalid path input")

        if "\x00" in user_input:
            raise ValueError("Null bytes are not allowed in path")

        # Strip whitespace
        user_input = user_input.strip()

        # A whitespace-only string would otherwise be joined as "" and
        # silently resolve to base_dir itself.
        if not user_input:
            raise ValueError("Invalid path input")

        # Use werkzeug's safe_join for secure path joining
        base_dir = Path(base_dir).resolve()

        try:
            # safe_join returns None if the path tries to escape base_dir
            safe_path = safe_join(str(base_dir), user_input)
        except ValueError:
            raise
        except Exception as e:
            logger.warning(f"Path validation failed for input '{user_input}'")
            raise ValueError(f"Invalid path: {e}") from e

        if safe_path is None:
            logger.warning(f"Path traversal attempt blocked: {user_input}")
            raise ValueError("Invalid path - potential traversal attempt")

        result_path = Path(safe_path)

        # Check extensions if required
        if (
            required_extensions
            and result_path.suffix not in required_extensions
        ):
            raise ValueError(
                f"Invalid file type. Allowed: {required_extensions}"
            )

        return result_path

    @staticmethod
    def _expand_home(user_path: str) -> str:
        """Expand a leading ``~`` that stands for the current user's home.

        Only ``~`` on its own, or ``~`` followed by ``/`` or ``\\``, is
        expanded. Any other string (including ``~name`` forms) is returned
        unchanged, so a name that merely begins with ``~`` is not truncated.
        """
        if user_path != "~" and not user_path.startswith(("~/", "~\\")):
            return user_path

        home_dir = Path.home()
        relative_part = user_path[2:].lstrip("/")
        if relative_part:
            return str(home_dir / relative_part)
        return str(home_dir)

    @staticmethod
    def validate_local_filesystem_path(
        user_path: str,
        restricted_dirs: Optional[list[Path]] = None,
    ) -> Path:
        """
        Validate a user-provided filesystem path for local indexing.

        This is for features like local folder indexing where users need to
        access files anywhere on their own machine, but system directories
        should be blocked.

        Args:
            user_path: User-provided path string (absolute, relative to the
                current working directory, or starting with ``~``/``~/``)
            restricted_dirs: Directories to block; when None a built-in list
                of system directories is used

        Returns:
            Validated and resolved Path object

        Raises:
            ValueError: If path is empty, contains control characters or
                ``..``, fails ``safe_join``, or points inside a restricted
                directory
        """
        import sys

        if not user_path or not isinstance(user_path, str):
            raise ValueError("Invalid path input")

        user_path = user_path.strip()

        # Whitespace-only input would otherwise resolve to the current
        # working directory.
        if not user_path:
            raise ValueError("Invalid path input")

        # Basic sanitation: forbid null bytes and control characters
        if "\x00" in user_path:
            raise ValueError("Null bytes are not allowed in path")
        if any(ord(ch) < 32 for ch in user_path):
            raise ValueError("Control characters are not allowed in path")

        # Expand ~ to home directory
        user_path = PathValidator._expand_home(user_path)

        # Disallow malformed Windows paths (e.g. "/C:/Windows")
        if (
            sys.platform == "win32"
            and user_path.startswith(("/", "\\"))
            and ":" in user_path
        ):
            raise ValueError("Malformed Windows path input")

        # Block path traversal patterns before resolving
        # This explicit check helps static analyzers understand the security intent
        if ".." in user_path:
            raise ValueError("Path traversal patterns not allowed")

        # Use safe_join to sanitize the path - this is recognized by static analyzers
        if user_path.startswith("/"):
            # Unix absolute path - join the remainder onto the root
            safe_path = safe_join("/", user_path.lstrip("/"))
        elif len(user_path) > 2 and user_path[1] == ":":
            # Windows absolute path (e.g., C:\Users\...)
            # NOTE(review): safe_join appears to reject the platform's
            # alternate separator, so backslash-separated remainders may
            # fail on Windows - confirm.
            drive = user_path[:2]
            rest = user_path[2:].lstrip("\\").lstrip("/")
            safe_path = safe_join(drive + "\\", rest)
        else:
            # Relative path - resolve relative to current directory
            safe_path = safe_join(os.getcwd(), user_path)

        if safe_path is None:
            raise ValueError("Invalid path - failed security validation")
        validated_path = Path(safe_path).resolve()

        # Default restricted directories
        if restricted_dirs is None:
            restricted_dirs = [
                Path("/etc"),
                Path("/sys"),
                Path("/proc"),
                Path("/dev"),
                Path("/root"),
                Path("/boot"),
                Path("/var/log"),
            ]
            # Add Windows system directories if on Windows
            if sys.platform == "win32":
                for drive in ["C:", "D:", "E:"]:
                    restricted_dirs.extend(
                        [
                            Path(f"{drive}\\Windows"),
                            Path(f"{drive}\\System32"),
                            Path(f"{drive}\\Program Files"),
                            Path(f"{drive}\\Program Files (x86)"),
                        ]
                    )

        # Check against restricted directories. validated_path has been
        # resolved, so a restricted directory that is itself a symlink must
        # be compared in its resolved form as well or it would be bypassed.
        for restricted in restricted_dirs:
            restricted = Path(restricted)
            if validated_path.is_relative_to(
                restricted
            ) or validated_path.is_relative_to(restricted.resolve()):
                logger.error(
                    f"Security: Blocked access to restricted directory: {validated_path}"
                )
                raise ValueError("Cannot access system directories")

        return validated_path

    @staticmethod
    def sanitize_for_filesystem_ops(validated_path: Path) -> Path:
        """
        Re-sanitize a validated path for static analyzer recognition.

        This method takes an already-validated Path and passes it through
        werkzeug's safe_join to create a path that static analyzers like
        CodeQL recognize as sanitized.

        Note: This exists because CodeQL doesn't trace through custom validation
        functions. The path is expected to be secure already; safe_join makes
        that explicit to static analyzers.

        Args:
            validated_path: An absolute Path that has already been validated
                by validate_local_filesystem_path()

        Returns:
            A Path built from the root joined with the path's remainder

        Raises:
            ValueError: If the path is not absolute or safe_join rejects it
        """
        if not validated_path.is_absolute():
            raise ValueError("Path must be absolute")

        # Use safe_join to create a sanitized path that static analyzers recognize
        # safe_join handles path traversal detection properly (not substring matching)
        # NOTE(review): the root is hard-coded as "/" - confirm behaviour for
        # Windows drive-letter paths.
        path_str = str(validated_path)
        safe_path_str = safe_join("/", path_str.lstrip("/"))
        if safe_path_str is None:
            raise ValueError("Path failed security sanitization")

        return Path(safe_path_str)

    @staticmethod
    def validate_model_path(
        model_path: str, model_root: Optional[str] = None
    ) -> Path:
        """
        Validate a model file path specifically.

        Args:
            model_path: Path to the model file, relative to the model root
            model_root: Root directory for models (defaults to the directory
                returned by get_models_directory())

        Returns:
            Validated Path object pointing at an existing regular file

        Raises:
            ValueError: If the path is invalid, does not exist, or is not a
                file

        Side effects:
            Creates the model root directory (and parents) if it is missing.
        """
        if model_root is None:
            # Default model root - uses centralized path config
            model_root = str(get_models_directory())

        # Create model root if it doesn't exist
        model_root_path = Path(model_root).resolve()
        model_root_path.mkdir(parents=True, exist_ok=True)

        # Validate the path
        validated_path = PathValidator.validate_safe_path(
            model_path,
            model_root_path,
            allow_absolute=False,  # Models should always be relative to model root
            required_extensions=None,  # Models can have various extensions
        )

        if not validated_path:
            raise ValueError("Invalid model path")

        # Check if the file exists
        if not validated_path.exists():
            raise ValueError(f"Model file not found: {validated_path}")

        if not validated_path.is_file():
            raise ValueError(f"Model path is not a file: {validated_path}")

        return validated_path

    @staticmethod
    def validate_data_path(file_path: str, data_root: str) -> Path:
        """
        Validate a path within the data directory.

        Args:
            file_path: Path relative to data root
            data_root: The data root directory

        Returns:
            Validated Path object (existence is not checked)

        Raises:
            ValueError: If the path is invalid
        """
        validated_path = PathValidator.validate_safe_path(
            file_path,
            data_root,
            allow_absolute=False,  # Data paths should be relative
            required_extensions=None,
        )

        if not validated_path:
            raise ValueError("Invalid data path")

        return validated_path

    @staticmethod
    def validate_config_path(
        config_path: str, config_root: Optional[Union[str, Path]] = None
    ) -> Path:
        """
        Validate a configuration file path.

        Args:
            config_path: Path to config file
            config_root: Root directory for relative paths; when None the
                directory returned by get_data_directory() is used

        Returns:
            Validated Path object

        Raises:
            ValueError: If the path is empty, contains a null byte or ``..``,
                starts with a restricted system directory, has a suffix
                outside CONFIG_EXTENSIONS, or (absolute paths only) does not
                exist
        """
        # Validate input: reject null bytes, then normalize whitespace
        if not config_path or not isinstance(config_path, str):
            raise ValueError("Invalid config path input")

        if "\x00" in config_path:
            raise ValueError("Null bytes are not allowed in config path")

        config_path = config_path.strip()

        # Restricted system directories that should never be accessed
        RESTRICTED_PREFIXES = ("etc", "proc", "sys", "dev")

        # Check for path traversal attempts in the string itself
        if ".." in config_path:
            raise ValueError("Invalid path - potential traversal attempt")

        # Check if path starts with any restricted system directory
        # (leading slashes ignored, case-insensitive)
        normalized_path = config_path.lstrip("/").lower()
        for restricted in RESTRICTED_PREFIXES:
            if (
                normalized_path.startswith(restricted + "/")
                or normalized_path == restricted
            ):
                raise ValueError(
                    f"Invalid path - restricted system directory: {restricted}"
                )

        # Detect absolute paths from the raw string (leading "/" or a drive
        # letter) to avoid using Path() or os.path on user input
        is_absolute = (
            config_path.startswith("/")  # Unix absolute
            or (
                len(config_path) > 2 and config_path[1] == ":"
            )  # Windows absolute
        )

        if is_absolute:
            # NOTE(review): config_path is passed with its leading "/" intact;
            # safe_join appears to return None for an absolute component, so
            # POSIX absolute paths look to be always rejected here - confirm
            # whether that is intended before relaxing it.
            safe_path = safe_join("/", config_path)
            if safe_path is None:
                raise ValueError("Invalid absolute path")

            # Now it's safe to create Path object from validated string
            path_obj = Path(safe_path)

            # Additional validation for config files
            if path_obj.suffix not in PathValidator.CONFIG_EXTENSIONS:
                raise ValueError(f"Invalid config file type: {path_obj.suffix}")

            # Check existence using validated path
            if not path_obj.exists():
                raise ValueError(f"Config file not found: {path_obj}")

            return path_obj

        # For relative paths, use the config root
        if config_root is None:
            from ..config.paths import get_data_directory

            config_root = get_data_directory()

        validated = PathValidator.validate_safe_path(
            config_path,
            config_root,
            allow_absolute=False,
            required_extensions=PathValidator.CONFIG_EXTENSIONS,
        )
        if validated is None:
            raise ValueError(f"Invalid config path: {config_path}")
        return validated

398 raise ValueError(f"Invalid config path: {config_path}") 

399 return validated