Coverage for src / local_deep_research / security / file_upload_validator.py: 100%
62 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Centralized file upload validation for security.
4Provides validation for file uploads to prevent:
5- Memory exhaustion attacks (file size limits)
6- Malicious file uploads (structure validation)
7- Resource abuse (file count limits)
8- Type confusion attacks (MIME validation)
9"""
11import io
12from typing import Optional, Tuple
14import pdfplumber
15from loguru import logger
class FileUploadValidator:
    """Centralized file upload validation for security.

    Every public validator returns a ``(is_valid, error_message)`` tuple;
    ``error_message`` is ``None`` when the check passes. Validators report
    bad input through that tuple rather than by raising.
    """

    # Security constants
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB per file
    MAX_FILES_PER_REQUEST = 200  # Maximum number of files in single request
    PDF_MAGIC_BYTES = b"%PDF"  # PDF file signature
    # Not referenced by any method of this class; kept for external users.
    ALLOWED_MIME_TYPES = {"application/pdf"}

    @staticmethod
    def _check_size(size_bytes: int) -> Tuple[bool, Optional[str]]:
        """Compare a byte count against ``MAX_FILE_SIZE``.

        Args:
            size_bytes: Size to check, in bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        limit = FileUploadValidator.MAX_FILE_SIZE
        if size_bytes > limit:
            size_mb = size_bytes / (1024 * 1024)
            max_mb = limit / (1024 * 1024)
            return (
                False,
                f"File too large: {size_mb:.1f}MB (max: {max_mb}MB)",
            )
        return True, None

    @staticmethod
    def validate_file_size(
        content_length: Optional[int], file_content: Optional[bytes] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file size to prevent memory exhaustion attacks.

        Args:
            content_length: Content-Length header value (if available)
            file_content: Actual file bytes (if already read)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Check the declared size first: it is available before the body
        # has been read, so an oversized upload can be rejected early.
        if content_length is not None:
            declared = FileUploadValidator._check_size(content_length)
            if not declared[0]:
                return declared

        # The header value is supplied by the client, so also check the
        # real byte count whenever the content is available.
        if file_content is not None:
            return FileUploadValidator._check_size(len(file_content))

        return True, None

    @staticmethod
    def validate_file_count(file_count: int) -> Tuple[bool, Optional[str]]:
        """
        Validate number of files to prevent resource abuse.

        Args:
            file_count: Number of files in the request

        Returns:
            Tuple of (is_valid, error_message)
        """
        if file_count > FileUploadValidator.MAX_FILES_PER_REQUEST:
            return (
                False,
                f"Too many files: {file_count} (max: {FileUploadValidator.MAX_FILES_PER_REQUEST})",
            )

        if file_count <= 0:
            return False, "No files provided"

        return True, None

    @staticmethod
    def validate_mime_type(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file extension and PDF file signature.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        # A missing or non-string filename is rejected through the normal
        # return contract instead of raising AttributeError on `.lower()`.
        if not isinstance(filename, str):
            return (
                False,
                "Invalid file type: missing filename. Only PDF files allowed",
            )

        # Check file extension (case-insensitive)
        if not filename.lower().endswith(".pdf"):
            return (
                False,
                f"Invalid file type: {filename}. Only PDF files allowed",
            )

        # Check PDF magic bytes (file signature). Content that is missing
        # or not bytes-like cannot carry the signature, so it is reported
        # as a mismatch rather than raising.
        has_signature = isinstance(
            file_content, (bytes, bytearray)
        ) and file_content.startswith(FileUploadValidator.PDF_MAGIC_BYTES)
        if not has_signature:
            return (
                False,
                f"Invalid PDF file: {filename}. File signature mismatch",
            )

        return True, None

    @staticmethod
    def validate_pdf_structure(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate PDF structure to detect malicious or corrupted files.

        This goes beyond just checking the magic bytes and actually attempts
        to parse the PDF structure.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        try:
            # Attempt to open and parse the PDF structure
            with pdfplumber.open(io.BytesIO(file_content)) as pdf:
                # Reject documents without any page
                if not pdf.pages:
                    return False, f"Invalid PDF: {filename}. No pages found"

                # Touch basic first-page metadata to ensure it is parseable
                first_page = pdf.pages[0]
                _ = first_page.width
                _ = first_page.height

            return True, None

        except Exception as e:
            # Deliberately broad: any parser failure on untrusted input is
            # treated as an invalid file, logged, and reported to the caller.
            logger.warning(
                f"PDF structure validation failed for {filename}: {e}"
            )
            return False, f"Invalid or corrupted PDF file: {filename}"

    @classmethod
    def validate_upload(
        cls,
        filename: str,
        file_content: bytes,
        content_length: Optional[int] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Comprehensive validation for a single file upload.

        Runs all validation checks in sequence. Stops at first failure.

        Args:
            filename: Original filename
            file_content: File content bytes
            content_length: Content-Length header (if available)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # 1. Validate file size
        is_valid, error = cls.validate_file_size(content_length, file_content)
        if not is_valid:
            return is_valid, error

        # 2. Validate extension and file signature
        is_valid, error = cls.validate_mime_type(filename, file_content)
        if not is_valid:
            return is_valid, error

        # 3. Validate PDF structure (more thorough check)
        is_valid, error = cls.validate_pdf_structure(filename, file_content)
        if not is_valid:
            return is_valid, error

        return True, None