Coverage for src / local_deep_research / security / file_upload_validator.py: 100%
62 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Centralized file upload validation for security.
4Provides validation for file uploads to prevent:
5- Memory exhaustion attacks (file size limits)
6- Malicious file uploads (structure validation)
7- Resource abuse (file count limits)
8- Type confusion attacks (MIME validation)
9"""
11import io
12from typing import Optional, Tuple
14import pdfplumber
15from loguru import logger
class FileUploadValidator:
    """Centralized file upload validation for security.

    Every validator returns a ``(is_valid, error_message)`` tuple instead of
    raising, so callers can turn a failure straight into a client response.
    ``error_message`` is ``None`` exactly when ``is_valid`` is ``True``.
    """

    # Security constants
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB per file
    MAX_FILES_PER_REQUEST = 200  # Maximum number of files in single request
    PDF_MAGIC_BYTES = b"%PDF"  # PDF file signature
    # Not referenced by the methods of this class; kept as a public attribute.
    ALLOWED_MIME_TYPES = {"application/pdf"}

    @staticmethod
    def validate_file_size(
        content_length: Optional[int], file_content: Optional[bytes] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file size to prevent memory exhaustion attacks.

        Both the declared size and the actual size are checked against
        ``MAX_FILE_SIZE``; whichever is unavailable (``None``) is skipped.

        Args:
            content_length: Content-Length header value (if available)
            file_content: Actual file bytes (if already read)

        Returns:
            Tuple of (is_valid, error_message)
        """
        limit = FileUploadValidator.MAX_FILE_SIZE
        bytes_per_mb = 1024 * 1024

        # The declared size comes first so an oversized request is reported
        # from its header before the real payload length is considered.
        observed_sizes = (
            content_length,
            None if file_content is None else len(file_content),
        )
        for size in observed_sizes:
            if size is not None and size > limit:
                size_mb = size / bytes_per_mb
                max_mb = limit / bytes_per_mb
                return (
                    False,
                    f"File too large: {size_mb:.1f}MB (max: {max_mb}MB)",
                )

        return True, None

    @staticmethod
    def validate_file_count(file_count: int) -> Tuple[bool, Optional[str]]:
        """
        Validate number of files to prevent resource abuse.

        Args:
            file_count: Number of files in the request

        Returns:
            Tuple of (is_valid, error_message)
        """
        max_files = FileUploadValidator.MAX_FILES_PER_REQUEST
        if file_count > max_files:
            return (
                False,
                f"Too many files: {file_count} (max: {max_files})",
            )

        if file_count <= 0:
            return False, "No files provided"

        return True, None

    @staticmethod
    def validate_mime_type(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file MIME type and extension.

        A missing (non-string) filename or missing content is reported as a
        validation failure rather than raising, because an upload may arrive
        without either.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Check file extension (case-insensitive); a non-string filename can
        # never carry the required extension.
        has_pdf_extension = isinstance(
            filename, str
        ) and filename.lower().endswith(".pdf")
        if not has_pdf_extension:
            return (
                False,
                f"Invalid file type: {filename}. Only PDF files allowed",
            )

        # Check PDF magic bytes (file signature); absent content cannot match.
        has_pdf_signature = isinstance(
            file_content, (bytes, bytearray)
        ) and file_content.startswith(FileUploadValidator.PDF_MAGIC_BYTES)
        if not has_pdf_signature:
            return (
                False,
                f"Invalid PDF file: {filename}. File signature mismatch",
            )

        return True, None

    @staticmethod
    def validate_pdf_structure(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate PDF structure to detect malicious or corrupted files.

        This goes beyond just checking the magic bytes and actually attempts
        to parse the PDF structure.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        try:
            # Attempt to open and parse the PDF structure
            with pdfplumber.open(io.BytesIO(file_content)) as pdf:
                pages = pdf.pages
                # Check if PDF has pages
                if not pages:
                    return False, f"Invalid PDF: {filename}. No pages found"

                # Try to access first page metadata to ensure it's parseable
                first_page = pages[0]
                _ = first_page.width  # Access basic metadata
                _ = first_page.height

            return True, None

        except Exception as exc:
            # Any parser failure means "reject the upload". Only the exception
            # type is logged so the cause is diagnosable without echoing
            # parser internals into the log.
            logger.warning(
                f"PDF structure validation failed for {filename}: "
                f"{type(exc).__name__}"
            )
            return False, f"Invalid or corrupted PDF file: {filename}"

    @classmethod
    def validate_upload(
        cls,
        filename: str,
        file_content: bytes,
        content_length: Optional[int] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Comprehensive validation for a single file upload.

        Runs all validation checks in sequence. Stops at first failure.

        Args:
            filename: Original filename
            file_content: File content bytes
            content_length: Content-Length header (if available)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # 1. Validate file size
        is_valid, error = cls.validate_file_size(content_length, file_content)
        if not is_valid:
            return is_valid, error

        # 2. Validate MIME type and extension
        is_valid, error = cls.validate_mime_type(filename, file_content)
        if not is_valid:
            return is_valid, error

        # 3. Validate PDF structure (more thorough check)
        is_valid, error = cls.validate_pdf_structure(filename, file_content)
        if not is_valid:
            return is_valid, error

        return True, None