Coverage for src / local_deep_research / security / path_validator.py: 63%

131 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Centralized path validation utilities for security. 

3 

4This module provides secure path validation to prevent path traversal attacks 

5and other filesystem-based security vulnerabilities. 

6""" 

7 

8import os 

9import re 

10from pathlib import Path 

11from typing import Optional, Union 

12 

13from loguru import logger 

14from werkzeug.security import safe_join 

15 

16from ..config.paths import get_models_directory 

17 

18 

class PathValidator:
    """Centralized path validation for security.

    Every method is a static helper that takes a user-supplied path string
    (or an already validated ``Path``) and either returns a ``Path`` or
    raises ``ValueError``.
    """

    # Regex for safe filename/path characters
    SAFE_PATH_PATTERN = re.compile(r"^[a-zA-Z0-9._/\-]+$")

    # Allowed config file extensions
    CONFIG_EXTENSIONS = (".json", ".yaml", ".yml", ".toml", ".ini", ".conf")

    @staticmethod
    def validate_safe_path(
        user_input: str,
        base_dir: Union[str, Path],
        allow_absolute: bool = False,
        required_extensions: Optional[tuple] = None,
    ) -> Optional[Path]:
        """
        Validate and sanitize a user-provided path.

        Args:
            user_input: The user-provided path string
            base_dir: The safe base directory to contain paths within
            allow_absolute: Accepted for interface compatibility; this method
                does not consult it (the join onto base_dir is always used)
            required_extensions: Tuple of required file extensions
                (e.g., ('.json', '.yaml')); compared case-sensitively
                against the path's suffix

        Returns:
            Path object for the joined path. Invalid input never yields
            None; it raises instead.

        Raises:
            ValueError: If the path is invalid or unsafe
        """
        if not isinstance(user_input, str):
            raise ValueError("Invalid path input")

        # Strip BEFORE the emptiness check: whitespace-only input would
        # otherwise be joined as "" and resolve to base_dir itself.
        user_input = user_input.strip()
        if not user_input:
            raise ValueError("Invalid path input")

        # Same basic sanitation as validate_local_filesystem_path.
        if "\x00" in user_input:
            raise ValueError("Null bytes are not allowed in path")

        # Use werkzeug's safe_join for secure path joining
        # This handles path traversal attempts automatically
        base_dir = Path(base_dir).resolve()

        try:
            # safe_join returns None if the path tries to escape base_dir
            safe_path = safe_join(str(base_dir), user_input)

            if safe_path is None:
                logger.warning(f"Path traversal attempt blocked: {user_input}")
                raise ValueError("Invalid path - potential traversal attempt")

            result_path = Path(safe_path)

            # Check extensions if required
            if (
                required_extensions
                and result_path.suffix not in required_extensions
            ):
                raise ValueError(
                    f"Invalid file type. Allowed: {required_extensions}"
                )

            return result_path

        except Exception as e:
            # Any failure above (including the ValueErrors raised in this
            # try block) is reported to callers as a single ValueError with
            # an "Invalid path: " prefix.
            logger.warning(
                f"Path validation failed for input '{user_input}': {e}"
            )
            raise ValueError(f"Invalid path: {e}") from e

    @staticmethod
    def validate_local_filesystem_path(
        user_path: str,
        restricted_dirs: Optional[list[Path]] = None,
    ) -> Path:
        """
        Validate a user-provided absolute filesystem path for local indexing.

        This is for features like local folder indexing where users need to
        access files anywhere on their own machine, but system directories
        should be blocked.

        Args:
            user_path: User-provided path string (absolute, relative to the
                current working directory, or starting with ``~`` / ``~/``)
            restricted_dirs: List of restricted directories to block; when
                None a built-in list of system directories is used

        Returns:
            Validated and resolved Path object

        Raises:
            ValueError: If path is invalid or points to restricted location
        """
        import sys

        if not user_path or not isinstance(user_path, str):
            raise ValueError("Invalid path input")

        user_path = user_path.strip()
        # Whitespace-only input would otherwise fall through to the
        # relative-path branch and resolve to the current directory.
        if not user_path:
            raise ValueError("Invalid path input")

        # Basic sanitation: forbid null bytes and control characters
        if "\x00" in user_path:
            raise ValueError("Null bytes are not allowed in path")
        if any(ord(ch) < 32 for ch in user_path):
            raise ValueError("Control characters are not allowed in path")

        # Expand ~ to home directory
        if user_path.startswith("~"):
            # Only "~" itself or "~<separator>..." refers to the current
            # user's home. Anything else ("~name/...") must not be expanded
            # by blindly dropping the first two characters.
            if user_path != "~" and not user_path.startswith(("~/", "~\\")):
                raise ValueError(
                    "Unsupported home directory shorthand; use '~' or '~/...'"
                )
            home_dir = Path.home()
            relative_part = user_path[1:].lstrip("/\\")
            if relative_part:
                user_path = str(home_dir / relative_part)
            else:
                user_path = str(home_dir)

        # Disallow malformed Windows paths (e.g. "/C:/Windows")
        if (
            sys.platform == "win32"
            and user_path.startswith(("/", "\\"))
            and ":" in user_path
        ):
            raise ValueError("Malformed Windows path input")

        # Block path traversal patterns before resolving
        # This explicit check helps static analyzers understand the security intent
        if ".." in user_path:
            raise ValueError("Path traversal patterns not allowed")

        # Use safe_join to sanitize the path - this is recognized by static analyzers
        # For absolute paths, we validate against the root directory
        if user_path.startswith("/"):
            # Unix absolute path - use safe_join with root
            safe_path = safe_join("/", user_path.lstrip("/"))
            if safe_path is None:
                raise ValueError("Invalid path - failed security validation")
            validated_path = Path(safe_path).resolve()
        elif len(user_path) > 2 and user_path[1] == ":":
            # Windows absolute path (e.g., C:\Users\...)
            # NOTE(review): confirm that safe_join accepts backslash-separated
            # remainders on win32; this branch is not exercised by tests.
            drive = user_path[:2]
            rest = user_path[2:].lstrip("\\").lstrip("/")
            safe_path = safe_join(drive + "\\", rest)
            if safe_path is None:
                raise ValueError("Invalid path - failed security validation")
            validated_path = Path(safe_path).resolve()
        else:
            # Relative path - resolve relative to current directory
            # Use safe_join to validate
            cwd = os.getcwd()
            safe_path = safe_join(cwd, user_path)
            if safe_path is None:
                raise ValueError("Invalid path - failed security validation")
            validated_path = Path(safe_path).resolve()

        # Default restricted directories
        if restricted_dirs is None:
            restricted_dirs = [
                Path("/etc"),
                Path("/sys"),
                Path("/proc"),
                Path("/dev"),
                Path("/root"),
                Path("/boot"),
                Path("/var/log"),
            ]
            # Add Windows system directories if on Windows
            if sys.platform == "win32":
                for drive_letter in ["C:", "D:", "E:"]:
                    restricted_dirs.extend(
                        [
                            Path(f"{drive_letter}\\Windows"),
                            Path(f"{drive_letter}\\System32"),
                            Path(f"{drive_letter}\\Program Files"),
                            Path(f"{drive_letter}\\Program Files (x86)"),
                        ]
                    )

        # Check against restricted directories
        for restricted in restricted_dirs:
            restricted = Path(restricted)
            # validated_path has had symlinks resolved, so compare against
            # the resolved form of the restricted directory as well;
            # otherwise a restricted directory that is itself a symlink
            # would never match.
            if validated_path.is_relative_to(
                restricted
            ) or validated_path.is_relative_to(restricted.resolve()):
                logger.error(
                    f"Security: Blocked access to restricted directory: {validated_path}"
                )
                raise ValueError("Cannot access system directories")

        return validated_path

    @staticmethod
    def sanitize_for_filesystem_ops(validated_path: Path) -> Path:
        """
        Re-sanitize a validated path for static analyzer recognition.

        This method takes an already-validated Path and passes it through
        werkzeug's safe_join to create a path that static analyzers like
        CodeQL recognize as sanitized.

        Note: This exists because CodeQL doesn't trace through custom validation
        functions. The path is expected to be secure already (from
        validate_local_filesystem_path()); safe_join makes that explicit to
        static analyzers.

        Args:
            validated_path: A Path that has already been validated by
                validate_local_filesystem_path()

        Returns:
            A Path object safe for filesystem operations

        Raises:
            ValueError: If the path is not absolute or fails sanitization
        """
        if not validated_path.is_absolute():
            raise ValueError("Path must be absolute")

        # Use safe_join to create a sanitized path that static analyzers recognize
        # safe_join handles path traversal detection properly (not substring matching)
        # NOTE(review): the join root is hard-coded to "/"; confirm the
        # behaviour for Windows drive-letter paths.
        path_str = str(validated_path)
        safe_path_str = safe_join("/", path_str.lstrip("/"))
        if safe_path_str is None:
            raise ValueError("Path failed security sanitization")

        return Path(safe_path_str)

    @staticmethod
    def validate_model_path(
        model_path: str, model_root: Optional[str] = None
    ) -> Path:
        """
        Validate a model file path specifically.

        The model root directory is created if it does not exist yet.

        Args:
            model_path: Path to the model file, relative to the model root
            model_root: Root directory for models (defaults to the directory
                returned by get_models_directory())

        Returns:
            Validated Path object pointing at an existing regular file

        Raises:
            ValueError: If the path is invalid, does not exist, or is not
                a file
        """
        if model_root is None:
            # Default model root - uses centralized path config (respects LDR_DATA_DIR)
            model_root = str(get_models_directory())

        # Create model root if it doesn't exist
        model_root_path = Path(model_root).resolve()
        model_root_path.mkdir(parents=True, exist_ok=True)

        # Validate the path
        validated_path = PathValidator.validate_safe_path(
            model_path,
            model_root_path,
            allow_absolute=False,  # Models should always be relative to model root
            required_extensions=None,  # Models can have various extensions
        )

        # Defensive: validate_safe_path is annotated Optional[Path].
        if not validated_path:
            raise ValueError("Invalid model path")

        # Check if the file exists
        if not validated_path.exists():
            raise ValueError(f"Model file not found: {validated_path}")

        if not validated_path.is_file():
            raise ValueError(f"Model path is not a file: {validated_path}")

        return validated_path

    @staticmethod
    def validate_data_path(file_path: str, data_root: str) -> Path:
        """
        Validate a path within the data directory.

        Args:
            file_path: Path relative to data root
            data_root: The data root directory

        Returns:
            Validated Path object (existence is not checked)

        Raises:
            ValueError: If the path is invalid
        """
        validated_path = PathValidator.validate_safe_path(
            file_path,
            data_root,
            allow_absolute=False,  # Data paths should be relative
            required_extensions=None,
        )

        # Defensive: validate_safe_path is annotated Optional[Path].
        if not validated_path:
            raise ValueError("Invalid data path")

        return validated_path

    @staticmethod
    def validate_config_path(
        config_path: str, config_root: Optional[str] = None
    ) -> Path:
        """
        Validate a configuration file path.

        Absolute paths are validated against the filesystem root and must
        exist; relative paths are joined onto config_root. In both cases
        the file extension must be one of CONFIG_EXTENSIONS.

        Args:
            config_path: Path to config file
            config_root: Root directory for configs (optional for absolute
                paths; defaults to get_data_directory() for relative ones)

        Returns:
            Validated Path object

        Raises:
            ValueError: If the path is invalid
        """
        # Sanitize input first - remove any null bytes and normalize
        if not config_path or not isinstance(config_path, str):
            raise ValueError("Invalid config path input")

        # Remove null bytes and normalize
        config_path = config_path.replace("\x00", "").strip()
        if not config_path:
            # Nothing left after sanitation (e.g. only null bytes/whitespace).
            raise ValueError("Invalid config path input")

        # Check for path traversal attempts in the string itself
        # Define restricted system directories that should never be accessed
        RESTRICTED_PREFIXES = ("etc", "proc", "sys", "dev")

        if ".." in config_path:
            raise ValueError("Invalid path - potential traversal attempt")

        # Check if path starts with any restricted system directory
        normalized_path = config_path.lstrip("/").lower()
        for restricted in RESTRICTED_PREFIXES:
            if (
                normalized_path.startswith(restricted + "/")
                or normalized_path == restricted
            ):
                raise ValueError(
                    f"Invalid path - restricted system directory: {restricted}"
                )

        # For config files, we might allow absolute paths with restrictions
        # Check if path starts with / or drive letter (Windows) to detect absolute paths
        # This avoids using Path() or os.path on user input
        is_absolute = (
            config_path.startswith("/")  # Unix absolute
            or (
                len(config_path) > 2 and config_path[1] == ":"
            )  # Windows absolute
        )

        if is_absolute:
            # For absolute paths, use safe_join with root directory
            # This validates the path without using Path() directly on user input.
            # safe_join rejects a filename that is itself absolute, so the
            # leading "/" is stripped first (same approach as
            # validate_local_filesystem_path); without this every Unix
            # absolute path was rejected.
            safe_path = safe_join("/", config_path.lstrip("/"))
            if safe_path is None:
                raise ValueError("Invalid absolute path")

            # Now it's safe to create Path object from validated string
            path_obj = Path(safe_path)

            # Re-check the restricted directories on the normalized path so
            # that inputs such as "/./etc/x.conf" cannot slip past the
            # raw-string prefix check above.
            parts = path_obj.parts
            if len(parts) > 1 and parts[1].lower() in RESTRICTED_PREFIXES:
                raise ValueError(
                    f"Invalid path - restricted system directory: {parts[1].lower()}"
                )

            # Additional validation for config files
            if path_obj.suffix not in PathValidator.CONFIG_EXTENSIONS:
                raise ValueError(f"Invalid config file type: {path_obj.suffix}")

            # Check existence using validated path
            if not path_obj.exists():
                raise ValueError(f"Config file not found: {path_obj}")

            return path_obj
        else:
            # For relative paths, use the config root
            if config_root is None:
                from ..config.paths import get_data_directory

                config_root = get_data_directory()

            return PathValidator.validate_safe_path(
                config_path,
                config_root,
                allow_absolute=False,
                required_extensions=PathValidator.CONFIG_EXTENSIONS,
            )