Coverage for src/local_deep_research/security/file_upload_validator.py: 100%
76 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Centralized file upload validation for security.
4Provides validation for file uploads to prevent:
5- Memory exhaustion attacks (file size limits)
6- Malicious file uploads (structure validation)
7- Resource abuse (file count limits)
8- Type confusion attacks (MIME validation)
9"""
11import io
12from typing import Optional, Tuple
14import pdfplumber
15from loguru import logger
17from ..settings.manager import check_env_setting
19# Built-in default for the per-file upload cap, in megabytes.
20# Override via the LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB environment variable
21# (or its server-config alias `security.upload_max_file_size_mb`).
22#
23# A separate library-side cap (`research_library.max_pdf_size_mb`,
24# default 3072) gates whether a PDF can be *stored* after upload. Both
25# values default to 3 GB so the two boundaries stay aligned; raising the
26# library cap above this value has no effect because uploads above this
27# cap are rejected before they reach storage.
28_DEFAULT_MAX_FILE_SIZE_MB = 3072 # 3 GB
31def _format_size(size_bytes: int) -> str:
32 """Render a byte count as ``X.YGB`` when >=1 GB, else ``X.YMB``."""
33 one_gb = 1024 * 1024 * 1024
34 if size_bytes >= one_gb:
35 return f"{size_bytes / one_gb:.1f}GB"
36 return f"{size_bytes / (1024 * 1024):.1f}MB"
def _resolve_max_file_size() -> int:
    """Return the per-file upload limit in bytes.

    The limit is looked up through
    ``check_env_setting("security.upload_max_file_size_mb")``, i.e. the
    ``LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB`` override, and is interpreted
    as a whole number of megabytes.

    ``_DEFAULT_MAX_FILE_SIZE_MB`` is used instead when the override is

    - not set (``None``),
    - not parseable as an integer (a warning is logged), or
    - zero or negative (a warning is logged).

    Returns:
        The limit in bytes (megabytes * 1024 * 1024).
    """
    bytes_per_mb = 1024 * 1024
    fallback_bytes = _DEFAULT_MAX_FILE_SIZE_MB * bytes_per_mb

    raw_value = check_env_setting("security.upload_max_file_size_mb")
    if raw_value is None:
        return fallback_bytes

    try:
        megabytes = int(raw_value)
    except ValueError:
        logger.warning(
            "LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB={!r} is not an integer; "
            "falling back to {} MB default.",
            raw_value,
            _DEFAULT_MAX_FILE_SIZE_MB,
        )
        return fallback_bytes

    if megabytes <= 0:
        # A zero or negative limit would silently reject every upload, so
        # treat it as a misconfiguration: warn and use the default.
        logger.warning(
            "LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB={!r} must be > 0; "
            "falling back to {} MB default.",
            raw_value,
            _DEFAULT_MAX_FILE_SIZE_MB,
        )
        return fallback_bytes

    return megabytes * bytes_per_mb
class FileUploadValidator:
    """Security checks applied to uploaded PDF files.

    Every ``validate_*`` method returns a ``(is_valid, error_message)``
    tuple; ``error_message`` is ``None`` when the check passes.
    """

    # Per-file size limit in bytes. It is resolved once, when this class
    # body executes, by ``_resolve_max_file_size()`` (environment override
    # with a built-in fallback), so it can be lowered per deployment
    # without code changes.
    MAX_FILE_SIZE = _resolve_max_file_size()
    MAX_FILES_PER_REQUEST = 200  # Most files accepted in a single request
    PDF_MAGIC_BYTES = b"%PDF"  # Signature every PDF file starts with
    ALLOWED_MIME_TYPES = {"application/pdf"}

    @staticmethod
    def validate_file_size(
        content_length: Optional[int], file_content: Optional[bytes] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Reject files larger than ``MAX_FILE_SIZE``.

        The declared size is checked before the actual one so an oversized
        upload can be refused from its header alone.

        Args:
            content_length: Content-Length header value, or None if unknown
            file_content: File bytes, or None if not read yet

        Returns:
            Tuple of (is_valid, error_message)
        """
        limit = FileUploadValidator.MAX_FILE_SIZE

        def _too_large(size: int) -> Tuple[bool, Optional[str]]:
            # Single place that builds the rejection message.
            return (
                False,
                f"File too large: {_format_size(size)} "
                f"(max: {_format_size(limit)})",
            )

        # Declared size (header) first.
        if content_length is not None and content_length > limit:
            return _too_large(content_length)

        # Then the real size, when the bytes are available.
        if file_content is not None:
            received = len(file_content)
            if received > limit:
                return _too_large(received)

        return True, None

    @staticmethod
    def validate_file_count(file_count: int) -> Tuple[bool, Optional[str]]:
        """
        Check that a request carries between 1 and
        ``MAX_FILES_PER_REQUEST`` files.

        Args:
            file_count: Number of files in the request

        Returns:
            Tuple of (is_valid, error_message)
        """
        max_files = FileUploadValidator.MAX_FILES_PER_REQUEST

        if file_count > max_files:
            return (
                False,
                f"Too many files: {file_count} (max: {max_files})",
            )

        if file_count <= 0:
            return False, "No files provided"

        return True, None

    @staticmethod
    def validate_mime_type(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Check that the upload looks like a PDF.

        Two tests are applied in order: the filename must end in ``.pdf``
        (case-insensitive) and the content must begin with
        ``PDF_MAGIC_BYTES``.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        has_pdf_extension = filename.lower().endswith(".pdf")
        if not has_pdf_extension:
            return (
                False,
                f"Invalid file type: {filename}. Only PDF files allowed",
            )

        signature = FileUploadValidator.PDF_MAGIC_BYTES
        if file_content.startswith(signature):
            return True, None

        return (
            False,
            f"Invalid PDF file: {filename}. File signature mismatch",
        )

    @staticmethod
    def validate_pdf_structure(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Check that the content can actually be parsed as a PDF.

        The bytes are opened with pdfplumber; the document must contain at
        least one page and the first page's dimensions must be readable.
        Any exception raised along the way marks the file as invalid.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        try:
            with pdfplumber.open(io.BytesIO(file_content)) as document:
                pages = document.pages
                has_pages = bool(pages) and len(pages) != 0
                if not has_pages:
                    return False, f"Invalid PDF: {filename}. No pages found"

                # Reading the first page's dimensions forces it to be
                # parsed; the values themselves are not needed.
                opening_page = pages[0]
                _ = (opening_page.width, opening_page.height)

            return True, None

        except Exception:
            # Any failure here means the file could not be parsed.
            logger.warning(f"PDF structure validation failed for {filename}")
            return False, f"Invalid or corrupted PDF file: {filename}"

    @classmethod
    def validate_upload(
        cls,
        filename: str,
        file_content: bytes,
        content_length: Optional[int] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Run every per-file check on a single upload.

        Checks run in order (size, then extension/signature, then PDF
        structure) and the first failure is returned immediately.

        Args:
            filename: Original filename
            file_content: File content bytes
            content_length: Content-Length header (if available)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Deferred calls, so a later check only runs if the earlier ones
        # passed.
        ordered_checks = (
            lambda: cls.validate_file_size(content_length, file_content),
            lambda: cls.validate_mime_type(filename, file_content),
            lambda: cls.validate_pdf_structure(filename, file_content),
        )

        for run_check in ordered_checks:
            is_valid, error = run_check()
            if not is_valid:
                return is_valid, error

        return True, None