Coverage for src / local_deep_research / security / path_validator.py: 63%
131 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Centralized path validation utilities for security.
4This module provides secure path validation to prevent path traversal attacks
5and other filesystem-based security vulnerabilities.
6"""
8import os
9import re
10from pathlib import Path
11from typing import Optional, Union
13from loguru import logger
14from werkzeug.security import safe_join
16from ..config.paths import get_models_directory
class PathValidator:
    """Centralized path validation for security.

    Every public method is a ``@staticmethod`` that takes an untrusted path
    string and either returns a ``Path`` or raises ``ValueError``. Joining
    user input onto a base directory is delegated to werkzeug's
    ``safe_join``, which returns ``None`` when the input would escape the
    base directory.
    """

    # Regex for safe filename/path characters
    SAFE_PATH_PATTERN = re.compile(r"^[a-zA-Z0-9._/\-]+$")

    # Allowed config file extensions
    CONFIG_EXTENSIONS = (".json", ".yaml", ".yml", ".toml", ".ini", ".conf")

    @staticmethod
    def validate_safe_path(
        user_input: str,
        base_dir: Union[str, Path],
        allow_absolute: bool = False,
        required_extensions: Optional[tuple] = None,
    ) -> Optional[Path]:
        """
        Validate a user-provided path and join it onto ``base_dir``.

        Args:
            user_input: The user-provided path string
            base_dir: The safe base directory to contain paths within
            allow_absolute: Accepted for interface compatibility; this
                implementation does not consult it.
            required_extensions: Tuple of required file extensions
                (e.g., ('.json', '.yaml')). The comparison against
                ``Path.suffix`` is case-sensitive.

        Returns:
            Path object located under ``base_dir``. The method never returns
            None; every failure is reported by raising.

        Raises:
            ValueError: If the path is empty, not a string, escapes
                ``base_dir``, or has a disallowed extension
        """
        if not user_input or not isinstance(user_input, str):
            raise ValueError("Invalid path input")

        # Strip whitespace
        user_input = user_input.strip()

        # Use werkzeug's safe_join for secure path joining
        # This handles path traversal attempts automatically
        base_dir = Path(base_dir).resolve()

        try:
            # safe_join returns None if the path tries to escape base_dir
            safe_path = safe_join(str(base_dir), user_input)

            if safe_path is None:
                logger.warning(f"Path traversal attempt blocked: {user_input}")
                raise ValueError("Invalid path - potential traversal attempt")

            result_path = Path(safe_path)

            # Check extensions if required
            if (
                required_extensions
                and result_path.suffix not in required_extensions
            ):
                raise ValueError(
                    f"Invalid file type. Allowed: {required_extensions}"
                )

            return result_path

        except Exception as e:
            # NOTE: this also catches the ValueErrors raised just above, so
            # callers see them re-wrapped with an "Invalid path: " prefix.
            logger.warning(
                f"Path validation failed for input '{user_input}': {e}"
            )
            raise ValueError(f"Invalid path: {e}") from e

    @staticmethod
    def _expand_user_home(user_path: str) -> str:
        """
        Expand a leading ``~`` to the current user's home directory.

        Only a lone ``~`` or a ``~`` immediately followed by a path separator
        is expanded. Anything else that merely starts with ``~`` (for example
        ``~backup`` or ``~user/docs``) is returned unchanged, so it is treated
        as a literal name rather than being silently truncated.

        Args:
            user_path: Path string, already stripped of surrounding whitespace

        Returns:
            The path string with the home directory substituted, or the
            input unchanged when no expansion applies
        """
        if user_path == "~":
            return str(Path.home())

        # os.altsep is None on POSIX and "/" on Windows
        separators = os.sep + (os.altsep or "")
        if user_path.startswith("~") and user_path[1] in separators:
            relative_part = user_path[2:].lstrip(separators)
            if relative_part:
                return str(Path.home() / relative_part)
            return str(Path.home())

        return user_path

    @staticmethod
    def _is_restricted(path: Path, restricted_dirs) -> bool:
        """
        Report whether ``path`` lies inside any of ``restricted_dirs``.

        Each restricted directory is checked both as given and in resolved
        form. The caller passes an already-resolved ``path``, so a restricted
        entry that is itself a symlink would otherwise never match.

        Args:
            path: Resolved path to test
            restricted_dirs: Iterable of directories that must not contain
                ``path``

        Returns:
            True if ``path`` equals or is nested under a restricted directory
        """
        for restricted in restricted_dirs:
            restricted = Path(restricted)
            candidates = [restricted]
            try:
                resolved = restricted.resolve()
            except (OSError, RuntimeError):
                # Unresolvable entry (e.g. symlink loop): fall back to the
                # literal comparison only.
                resolved = None
            if resolved is not None and resolved != restricted:
                candidates.append(resolved)

            if any(path.is_relative_to(candidate) for candidate in candidates):
                return True
        return False

    @staticmethod
    def validate_local_filesystem_path(
        user_path: str,
        restricted_dirs: Optional[list[Path]] = None,
    ) -> Path:
        """
        Validate a user-provided absolute filesystem path for local indexing.

        This is for features like local folder indexing where users need to
        access files anywhere on their own machine, but system directories
        should be blocked.

        Args:
            user_path: User-provided path string (absolute or with ~)
            restricted_dirs: List of restricted directories to block. When
                omitted, a default list of system directories is used.

        Returns:
            Validated and resolved Path object

        Raises:
            ValueError: If path is invalid or points to restricted location
        """
        import sys

        if not user_path or not isinstance(user_path, str):
            raise ValueError("Invalid path input")

        user_path = user_path.strip()

        # Basic sanitation: forbid null bytes and control characters
        if "\x00" in user_path:
            raise ValueError("Null bytes are not allowed in path")
        if any(ord(ch) < 32 for ch in user_path):
            raise ValueError("Control characters are not allowed in path")

        # Expand ~ to home directory
        user_path = PathValidator._expand_user_home(user_path)

        # Disallow malformed Windows paths (e.g. "/C:/Windows")
        if (
            sys.platform == "win32"
            and user_path.startswith(("/", "\\"))
            and ":" in user_path
        ):
            raise ValueError("Malformed Windows path input")

        # Block path traversal patterns before resolving
        # This explicit check helps static analyzers understand the security intent
        if ".." in user_path:
            raise ValueError("Path traversal patterns not allowed")

        # Use safe_join to sanitize the path - this is recognized by static analyzers
        # For absolute paths, we validate against the root directory
        if user_path.startswith("/"):
            # Unix absolute path - use safe_join with root
            safe_path = safe_join("/", user_path.lstrip("/"))
        elif len(user_path) > 2 and user_path[1] == ":":
            # Windows absolute path (e.g., C:\Users\...)
            drive = user_path[:2]
            rest = user_path[2:].lstrip("\\").lstrip("/")
            safe_path = safe_join(drive + "\\", rest)
        else:
            # Relative path - resolve relative to current directory
            safe_path = safe_join(os.getcwd(), user_path)

        if safe_path is None:
            raise ValueError("Invalid path - failed security validation")
        validated_path = Path(safe_path).resolve()

        # Default restricted directories
        if restricted_dirs is None:
            restricted_dirs = [
                Path("/etc"),
                Path("/sys"),
                Path("/proc"),
                Path("/dev"),
                Path("/root"),
                Path("/boot"),
                Path("/var/log"),
            ]
            # Add Windows system directories if on Windows
            if sys.platform == "win32":
                for drive in ["C:", "D:", "E:"]:
                    restricted_dirs.extend(
                        [
                            Path(f"{drive}\\Windows"),
                            Path(f"{drive}\\System32"),
                            Path(f"{drive}\\Program Files"),
                            Path(f"{drive}\\Program Files (x86)"),
                        ]
                    )

        # Check against restricted directories (symlink-aware)
        if PathValidator._is_restricted(validated_path, restricted_dirs):
            logger.error(
                f"Security: Blocked access to restricted directory: {validated_path}"
            )
            raise ValueError("Cannot access system directories")

        return validated_path

    @staticmethod
    def sanitize_for_filesystem_ops(validated_path: Path) -> Path:
        """
        Re-sanitize a validated path for static analyzer recognition.

        This method takes an already-validated Path and passes it through
        werkzeug's safe_join to create a path that static analyzers like
        CodeQL recognize as sanitized.

        Note: This exists because CodeQL doesn't trace through custom validation
        functions. The path is already secure from validate_local_filesystem_path(),
        but safe_join makes that explicit to static analyzers.

        Args:
            validated_path: A Path that has already been validated by
                validate_local_filesystem_path()

        Returns:
            A Path object safe for filesystem operations

        Raises:
            ValueError: If the path is not absolute or fails sanitization
        """
        if not validated_path.is_absolute():
            raise ValueError("Path must be absolute")

        # Use safe_join to create a sanitized path that static analyzers recognize
        # safe_join handles path traversal detection properly (not substring matching)
        path_str = str(validated_path)
        safe_path_str = safe_join("/", path_str.lstrip("/"))
        if safe_path_str is None:
            raise ValueError("Path failed security sanitization")

        return Path(safe_path_str)

    @staticmethod
    def validate_model_path(
        model_path: str, model_root: Optional[str] = None
    ) -> Path:
        """
        Validate a model file path specifically.

        Side effect: the model root directory is created if it is missing.

        Args:
            model_path: Path to the model file, relative to the model root
            model_root: Root directory for models (defaults to the directory
                returned by get_models_directory())

        Returns:
            Validated Path object pointing at an existing regular file

        Raises:
            ValueError: If the path is invalid, missing, or not a file
        """
        if model_root is None:
            # Default model root - uses centralized path config (respects LDR_DATA_DIR)
            model_root = str(get_models_directory())

        # Create model root if it doesn't exist
        model_root_path = Path(model_root).resolve()
        model_root_path.mkdir(parents=True, exist_ok=True)

        # Validate the path
        validated_path = PathValidator.validate_safe_path(
            model_path,
            model_root_path,
            allow_absolute=False,  # Models should always be relative to model root
            required_extensions=None,  # Models can have various extensions
        )

        # Defensive guard: validate_safe_path raises instead of returning None
        if not validated_path:
            raise ValueError("Invalid model path")

        # Check if the file exists
        if not validated_path.exists():
            raise ValueError(f"Model file not found: {validated_path}")

        if not validated_path.is_file():
            raise ValueError(f"Model path is not a file: {validated_path}")

        return validated_path

    @staticmethod
    def validate_data_path(file_path: str, data_root: str) -> Path:
        """
        Validate a path within the data directory.

        Args:
            file_path: Path relative to data root
            data_root: The data root directory

        Returns:
            Validated Path object

        Raises:
            ValueError: If the path is invalid
        """
        validated_path = PathValidator.validate_safe_path(
            file_path,
            data_root,
            allow_absolute=False,  # Data paths should be relative
            required_extensions=None,
        )

        # Defensive guard: validate_safe_path raises instead of returning None
        if not validated_path:
            raise ValueError("Invalid data path")

        return validated_path

    @staticmethod
    def validate_config_path(
        config_path: str, config_root: Optional[str] = None
    ) -> Path:
        """
        Validate a configuration file path.

        Absolute paths are validated against the filesystem root and must
        exist; relative paths are joined onto ``config_root``. In both cases
        the file extension must be one of ``CONFIG_EXTENSIONS``.

        Args:
            config_path: Path to config file
            config_root: Root directory for configs (optional for absolute
                paths; defaults to get_data_directory() for relative paths)

        Returns:
            Validated Path object

        Raises:
            ValueError: If the path is invalid
        """
        # Sanitize input first - remove any null bytes and normalize
        if not config_path or not isinstance(config_path, str):
            raise ValueError("Invalid config path input")

        # Remove null bytes and normalize
        config_path = config_path.replace("\x00", "").strip()

        # Check for path traversal attempts in the string itself
        # Define restricted system directories that should never be accessed
        RESTRICTED_PREFIXES = ("etc", "proc", "sys", "dev")

        if ".." in config_path:
            raise ValueError("Invalid path - potential traversal attempt")

        # Check if path starts with any restricted system directory
        normalized_path = config_path.lstrip("/").lower()
        for restricted in RESTRICTED_PREFIXES:
            if (
                normalized_path.startswith(restricted + "/")
                or normalized_path == restricted
            ):
                raise ValueError(
                    f"Invalid path - restricted system directory: {restricted}"
                )

        # For config files, we might allow absolute paths with restrictions
        # Check if path starts with / or drive letter (Windows) to detect absolute paths
        # This avoids using Path() or os.path on user input
        is_absolute = (
            config_path.startswith("/")  # Unix absolute
            or (
                len(config_path) > 2 and config_path[1] == ":"
            )  # Windows absolute
        )

        if is_absolute:
            # For absolute paths, use safe_join with root directory.
            # safe_join rejects a component that is itself absolute, so the
            # leading "/" must be stripped before joining onto the root.
            safe_path = safe_join("/", config_path.lstrip("/"))
            if safe_path is None:
                raise ValueError("Invalid absolute path")

            # Now it's safe to create Path object from validated string
            path_obj = Path(safe_path)

            # Additional validation for config files
            if path_obj.suffix not in PathValidator.CONFIG_EXTENSIONS:
                raise ValueError(f"Invalid config file type: {path_obj.suffix}")

            # Check existence using validated path
            if not path_obj.exists():
                raise ValueError(f"Config file not found: {path_obj}")

            return path_obj

        # For relative paths, use the config root
        if config_root is None:
            from ..config.paths import get_data_directory

            config_root = get_data_directory()

        return PathValidator.validate_safe_path(
            config_path,
            config_root,
            allow_absolute=False,
            required_extensions=PathValidator.CONFIG_EXTENSIONS,
        )