Coverage for src / local_deep_research / security / path_validator.py: 98%
140 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Centralized path validation utilities for security.
4This module provides secure path validation to prevent path traversal attacks
5and other filesystem-based security vulnerabilities.
6"""
8import os
9import re
10from pathlib import Path
11from typing import Optional, Union
13from loguru import logger
14from werkzeug.security import safe_join
16from ..config.paths import get_models_directory
class PathValidator:
    """Centralized path validation for security.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    All validators signal rejection by raising ``ValueError``.
    """

    # Regex for safe filename/path characters
    SAFE_PATH_PATTERN = re.compile(r"^[a-zA-Z0-9._/\-]+$")

    # Allowed config file extensions
    CONFIG_EXTENSIONS = (".json", ".yaml", ".yml", ".toml", ".ini", ".conf")

    # First path component of system directories that config paths may
    # never point into (compared case-insensitively, leading "/" ignored).
    RESTRICTED_CONFIG_PREFIXES = ("etc", "proc", "sys", "dev")

    @staticmethod
    def validate_safe_path(
        user_input: str,
        base_dir: Union[str, Path],
        allow_absolute: bool = False,
        required_extensions: Optional[tuple] = None,
    ) -> Optional[Path]:
        """
        Validate and sanitize a user-provided path.

        Args:
            user_input: The user-provided path string
            base_dir: The safe base directory to contain paths within
            allow_absolute: Accepted for interface compatibility; this
                implementation does not consult it
            required_extensions: Tuple of required file extensions
                (e.g., ('.json', '.yaml')); compared case-sensitively
                against ``Path.suffix``

        Returns:
            Path object located under ``base_dir``. Invalid input is
            reported by raising, not by returning None.

        Raises:
            ValueError: If the path is invalid or unsafe
        """
        if not user_input or not isinstance(user_input, str):
            raise ValueError("Invalid path input")

        if "\x00" in user_input:
            raise ValueError("Null bytes are not allowed in path")

        # Strip whitespace, then re-check: a whitespace-only string would
        # otherwise be joined as "" and silently resolve to base_dir itself.
        user_input = user_input.strip()
        if not user_input:
            raise ValueError("Invalid path input")

        # Use werkzeug's safe_join for secure path joining
        # This handles path traversal attempts automatically
        base_dir = Path(base_dir).resolve()

        try:
            # safe_join returns None if the path tries to escape base_dir
            safe_path = safe_join(str(base_dir), user_input)
        except ValueError:
            # Already the exception type callers expect; do not re-wrap
            raise
        except Exception as e:
            logger.warning(f"Path validation failed for input '{user_input}'")
            raise ValueError(f"Invalid path: {e}") from e

        if safe_path is None:
            logger.warning(f"Path traversal attempt blocked: {user_input}")
            raise ValueError("Invalid path - potential traversal attempt")

        result_path = Path(safe_path)

        # Check extensions if required
        if (
            required_extensions
            and result_path.suffix not in required_extensions
        ):
            raise ValueError(
                f"Invalid file type. Allowed: {required_extensions}"
            )

        return result_path

    @staticmethod
    def _expand_home(user_path: str) -> str:
        """Expand a leading ``~`` / ``~/`` / ``~\\`` to the current user's home.

        Only the current-user forms are expanded. Anything else that merely
        starts with ``~`` (e.g. ``~bob/x``) is returned unchanged, so no
        characters of the name are dropped.
        """
        if user_path == "~" or user_path.startswith(("~/", "~\\")):
            home_dir = Path.home()
            relative_part = user_path[1:].lstrip("/\\")
            if relative_part:
                return str(home_dir / relative_part)
            return str(home_dir)
        return user_path

    @staticmethod
    def validate_local_filesystem_path(
        user_path: str,
        restricted_dirs: Optional[list[Path]] = None,
    ) -> Path:
        """
        Validate a user-provided absolute filesystem path for local indexing.

        This is for features like local folder indexing where users need to
        access files anywhere on their own machine, but system directories
        should be blocked.

        Args:
            user_path: User-provided path string (absolute, relative to the
                current working directory, or starting with ~)
            restricted_dirs: List of restricted directories to block; when
                None a default list of system directories is used

        Returns:
            Validated and resolved Path object

        Raises:
            ValueError: If path is invalid or points to restricted location
        """
        import sys

        if not user_path or not isinstance(user_path, str):
            raise ValueError("Invalid path input")

        user_path = user_path.strip()
        # Whitespace-only input would otherwise resolve to the current
        # working directory via the relative-path branch below.
        if not user_path:
            raise ValueError("Invalid path input")

        # Basic sanitation: forbid null bytes and control characters
        if "\x00" in user_path:
            raise ValueError("Null bytes are not allowed in path")
        if any(ord(ch) < 32 for ch in user_path):
            raise ValueError("Control characters are not allowed in path")

        # Expand ~ to home directory
        user_path = PathValidator._expand_home(user_path)

        # Disallow malformed Windows paths (e.g. "/C:/Windows")
        if (
            sys.platform == "win32"
            and user_path.startswith(("/", "\\"))
            and ":" in user_path
        ):
            raise ValueError("Malformed Windows path input")

        # Block path traversal patterns before resolving
        # This explicit check helps static analyzers understand the security intent
        if ".." in user_path:
            raise ValueError("Path traversal patterns not allowed")

        # Use safe_join to sanitize the path - this is recognized by static analyzers
        # For absolute paths, we validate against the root directory
        if user_path.startswith("/"):
            # Unix absolute path - use safe_join with root
            safe_path = safe_join("/", user_path.lstrip("/"))
        elif len(user_path) > 2 and user_path[1] == ":":
            # Windows absolute path (e.g., C:\Users\...)
            drive = user_path[:2]
            rest = user_path[2:].lstrip("\\").lstrip("/")
            safe_path = safe_join(drive + "\\", rest)
        else:
            # Relative path - resolve relative to current directory
            safe_path = safe_join(os.getcwd(), user_path)

        if safe_path is None:
            raise ValueError("Invalid path - failed security validation")
        validated_path = Path(safe_path).resolve()

        # Default restricted directories
        if restricted_dirs is None:
            restricted_dirs = [
                Path("/etc"),
                Path("/sys"),
                Path("/proc"),
                Path("/dev"),
                Path("/root"),
                Path("/boot"),
                Path("/var/log"),
            ]
            # Add Windows system directories if on Windows
            if sys.platform == "win32":
                for drive in ["C:", "D:", "E:"]:
                    restricted_dirs.extend(
                        [
                            Path(f"{drive}\\Windows"),
                            Path(f"{drive}\\System32"),
                            Path(f"{drive}\\Program Files"),
                            Path(f"{drive}\\Program Files (x86)"),
                        ]
                    )

        # Check against restricted directories. validated_path has been
        # resolved, so each restricted entry is also compared in resolved
        # form; otherwise a restricted directory that is itself a symlink
        # would never match the resolved candidate.
        for restricted in restricted_dirs:
            restricted = Path(restricted)
            if validated_path.is_relative_to(
                restricted
            ) or validated_path.is_relative_to(restricted.resolve()):
                logger.error(
                    f"Security: Blocked access to restricted directory: {validated_path}"
                )
                raise ValueError("Cannot access system directories")

        return validated_path

    @staticmethod
    def sanitize_for_filesystem_ops(validated_path: Path) -> Path:
        """
        Re-sanitize a validated path for static analyzer recognition.

        This method takes an already-validated Path and passes it through
        werkzeug's safe_join to create a path that static analyzers like
        CodeQL recognize as sanitized.

        Note: This exists because CodeQL doesn't trace through custom validation
        functions. The path is already secure from validate_local_filesystem_path(),
        but safe_join makes that explicit to static analyzers.

        Args:
            validated_path: A Path that has already been validated by
                validate_local_filesystem_path()

        Returns:
            A Path object safe for filesystem operations

        Raises:
            ValueError: If the path is not absolute or fails sanitization
        """
        if not validated_path.is_absolute():
            raise ValueError("Path must be absolute")

        # Use safe_join to create a sanitized path that static analyzers recognize
        # safe_join handles path traversal detection properly (not substring matching)
        # The leading "/" is stripped so the remainder is joined onto the root.
        path_str = str(validated_path)
        safe_path_str = safe_join("/", path_str.lstrip("/"))
        if safe_path_str is None:
            raise ValueError("Path failed security sanitization")

        return Path(safe_path_str)

    @staticmethod
    def validate_model_path(
        model_path: str, model_root: Optional[str] = None
    ) -> Path:
        """
        Validate a model file path specifically.

        Side effect: the model root directory is created if it is missing.

        Args:
            model_path: Path to the model file, relative to the model root
            model_root: Root directory for models (defaults to the value of
                get_models_directory())

        Returns:
            Validated Path object pointing at an existing regular file

        Raises:
            ValueError: If the path is invalid, missing, or not a file
        """
        if model_root is None:
            # Default model root - uses centralized path config (respects LDR_DATA_DIR)
            model_root = str(get_models_directory())

        # Create model root if it doesn't exist
        model_root_path = Path(model_root).resolve()
        model_root_path.mkdir(parents=True, exist_ok=True)

        # Validate the path
        validated_path = PathValidator.validate_safe_path(
            model_path,
            model_root_path,
            allow_absolute=False,  # Models should always be relative to model root
            required_extensions=None,  # Models can have various extensions
        )

        if not validated_path:
            raise ValueError("Invalid model path")

        # Check if the file exists
        if not validated_path.exists():
            raise ValueError(f"Model file not found: {validated_path}")

        if not validated_path.is_file():
            raise ValueError(f"Model path is not a file: {validated_path}")

        return validated_path

    @staticmethod
    def validate_data_path(file_path: str, data_root: str) -> Path:
        """
        Validate a path within the data directory.

        Args:
            file_path: Path relative to data root
            data_root: The data root directory

        Returns:
            Validated Path object (existence is not checked here)

        Raises:
            ValueError: If the path is invalid
        """
        validated_path = PathValidator.validate_safe_path(
            file_path,
            data_root,
            allow_absolute=False,  # Data paths should be relative
            required_extensions=None,
        )

        if not validated_path:
            raise ValueError("Invalid data path")

        return validated_path

    @staticmethod
    def _reject_restricted_config_prefix(path_str: str) -> None:
        """Raise ValueError if *path_str* points into a restricted system dir.

        Leading slashes are ignored and the comparison is case-insensitive,
        so "etc", "/etc", "/ETC/foo" are all rejected.
        """
        normalized_path = path_str.lstrip("/").lower()
        for restricted in PathValidator.RESTRICTED_CONFIG_PREFIXES:
            if (
                normalized_path.startswith(restricted + "/")
                or normalized_path == restricted
            ):
                raise ValueError(
                    f"Invalid path - restricted system directory: {restricted}"
                )

    @staticmethod
    def validate_config_path(
        config_path: str, config_root: Optional[Union[str, Path]] = None
    ) -> Path:
        """
        Validate a configuration file path.

        Absolute paths are validated against the filesystem root and must
        exist; relative paths are validated against ``config_root``.

        Args:
            config_path: Path to config file
            config_root: Root directory for relative configs (defaults to
                get_data_directory(); not used for absolute paths)

        Returns:
            Validated Path object

        Raises:
            ValueError: If the path is invalid
        """
        # Validate input: reject null bytes, then normalize whitespace
        if not config_path or not isinstance(config_path, str):
            raise ValueError("Invalid config path input")

        if "\x00" in config_path:
            raise ValueError("Null bytes are not allowed in config path")

        config_path = config_path.strip()
        if not config_path:
            raise ValueError("Invalid config path input")

        # Check for path traversal attempts in the string itself
        if ".." in config_path:
            raise ValueError("Invalid path - potential traversal attempt")

        # Check if path starts with any restricted system directory
        PathValidator._reject_restricted_config_prefix(config_path)

        # For config files, we might allow absolute paths with restrictions
        # Check if path starts with / or drive letter (Windows) to detect absolute paths
        # This avoids using Path() or os.path on user input
        is_absolute = (
            config_path.startswith("/")  # Unix absolute
            or (
                len(config_path) > 2 and config_path[1] == ":"
            )  # Windows absolute
        )

        if is_absolute:
            # safe_join refuses a component that is itself absolute, so the
            # leading "/" is stripped before joining onto the root - the same
            # approach validate_local_filesystem_path() uses. Without this,
            # every Unix absolute path was rejected as "Invalid absolute path".
            safe_path = safe_join("/", config_path.lstrip("/"))
            if safe_path is None:
                raise ValueError("Invalid absolute path")

            # safe_join normalizes the path (e.g. "/./etc/x" -> "/etc/x"),
            # so re-apply the restricted-directory check to the result.
            PathValidator._reject_restricted_config_prefix(safe_path)

            # Now it's safe to create Path object from validated string
            path_obj = Path(safe_path)

            # Additional validation for config files
            if path_obj.suffix not in PathValidator.CONFIG_EXTENSIONS:
                raise ValueError(f"Invalid config file type: {path_obj.suffix}")

            # Check existence using validated path
            if not path_obj.exists():
                raise ValueError(f"Config file not found: {path_obj}")

            return path_obj

        # For relative paths, use the config root
        if config_root is None:
            from ..config.paths import get_data_directory

            config_root = get_data_directory()

        validated = PathValidator.validate_safe_path(
            config_path,
            config_root,
            allow_absolute=False,
            required_extensions=PathValidator.CONFIG_EXTENSIONS,
        )
        if validated is None:
            raise ValueError(f"Invalid config path: {config_path}")
        return validated