Coverage for src / local_deep_research / security / file_upload_validator.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Centralized file upload validation for security. 

3 

4Provides validation for file uploads to prevent: 

5- Memory exhaustion attacks (file size limits) 

6- Malicious file uploads (structure validation) 

7- Resource abuse (file count limits) 

8- Type confusion attacks (MIME validation) 

9""" 

10 

11import io 

12from typing import Optional, Tuple 

13 

14import pdfplumber 

15from loguru import logger 

16 

17 

18class FileUploadValidator: 

19 """Centralized file upload validation for security.""" 

20 

21 # Security constants 

22 MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB per file 

23 MAX_FILES_PER_REQUEST = 200 # Maximum number of files in single request 

24 PDF_MAGIC_BYTES = b"%PDF" # PDF file signature 

25 ALLOWED_MIME_TYPES = {"application/pdf"} 

26 

27 @staticmethod 

28 def validate_file_size( 

29 content_length: Optional[int], file_content: Optional[bytes] = None 

30 ) -> Tuple[bool, Optional[str]]: 

31 """ 

32 Validate file size to prevent memory exhaustion attacks. 

33 

34 Args: 

35 content_length: Content-Length header value (if available) 

36 file_content: Actual file bytes (if already read) 

37 

38 Returns: 

39 Tuple of (is_valid, error_message) 

40 """ 

41 # Check Content-Length header first (before reading file) 

42 if content_length is not None: 

43 if content_length > FileUploadValidator.MAX_FILE_SIZE: 

44 size_mb = content_length / (1024 * 1024) 

45 max_mb = FileUploadValidator.MAX_FILE_SIZE / (1024 * 1024) 

46 return ( 

47 False, 

48 f"File too large: {size_mb:.1f}MB (max: {max_mb}MB)", 

49 ) 

50 

51 # Check actual file size if content is provided 

52 if file_content is not None: 

53 actual_size = len(file_content) 

54 if actual_size > FileUploadValidator.MAX_FILE_SIZE: 

55 size_mb = actual_size / (1024 * 1024) 

56 max_mb = FileUploadValidator.MAX_FILE_SIZE / (1024 * 1024) 

57 return ( 

58 False, 

59 f"File too large: {size_mb:.1f}MB (max: {max_mb}MB)", 

60 ) 

61 

62 return True, None 

63 

64 @staticmethod 

65 def validate_file_count(file_count: int) -> Tuple[bool, Optional[str]]: 

66 """ 

67 Validate number of files to prevent resource abuse. 

68 

69 Args: 

70 file_count: Number of files in the request 

71 

72 Returns: 

73 Tuple of (is_valid, error_message) 

74 """ 

75 if file_count > FileUploadValidator.MAX_FILES_PER_REQUEST: 

76 return ( 

77 False, 

78 f"Too many files: {file_count} (max: {FileUploadValidator.MAX_FILES_PER_REQUEST})", 

79 ) 

80 

81 if file_count <= 0: 

82 return False, "No files provided" 

83 

84 return True, None 

85 

86 @staticmethod 

87 def validate_mime_type( 

88 filename: str, file_content: bytes 

89 ) -> Tuple[bool, Optional[str]]: 

90 """ 

91 Validate file MIME type and extension. 

92 

93 Args: 

94 filename: Original filename 

95 file_content: File content bytes 

96 

97 Returns: 

98 Tuple of (is_valid, error_message) 

99 """ 

100 # Check file extension 

101 if not filename.lower().endswith(".pdf"): 

102 return ( 

103 False, 

104 f"Invalid file type: {filename}. Only PDF files allowed", 

105 ) 

106 

107 # Check PDF magic bytes (file signature) 

108 if not file_content.startswith(FileUploadValidator.PDF_MAGIC_BYTES): 

109 return ( 

110 False, 

111 f"Invalid PDF file: {filename}. File signature mismatch", 

112 ) 

113 

114 return True, None 

115 

116 @staticmethod 

117 def validate_pdf_structure( 

118 filename: str, file_content: bytes 

119 ) -> Tuple[bool, Optional[str]]: 

120 """ 

121 Validate PDF structure to detect malicious or corrupted files. 

122 

123 This goes beyond just checking the magic bytes and actually attempts 

124 to parse the PDF structure. 

125 

126 Args: 

127 filename: Original filename 

128 file_content: File content bytes 

129 

130 Returns: 

131 Tuple of (is_valid, error_message) 

132 """ 

133 try: 

134 # Attempt to open and parse the PDF structure 

135 with pdfplumber.open(io.BytesIO(file_content)) as pdf: 

136 # Check if PDF has pages 

137 if not pdf.pages or len(pdf.pages) == 0: 

138 return False, f"Invalid PDF: {filename}. No pages found" 

139 

140 # Try to access first page metadata to ensure it's parseable 

141 first_page = pdf.pages[0] 

142 _ = first_page.width # Access basic metadata 

143 _ = first_page.height 

144 

145 return True, None 

146 

147 except Exception as e: 

148 logger.warning( 

149 f"PDF structure validation failed for {filename}: {e}" 

150 ) 

151 return False, f"Invalid or corrupted PDF file: {filename}" 

152 

153 @classmethod 

154 def validate_upload( 

155 cls, 

156 filename: str, 

157 file_content: bytes, 

158 content_length: Optional[int] = None, 

159 ) -> Tuple[bool, Optional[str]]: 

160 """ 

161 Comprehensive validation for a single file upload. 

162 

163 Runs all validation checks in sequence. Stops at first failure. 

164 

165 Args: 

166 filename: Original filename 

167 file_content: File content bytes 

168 content_length: Content-Length header (if available) 

169 

170 Returns: 

171 Tuple of (is_valid, error_message) 

172 """ 

173 # 1. Validate file size 

174 is_valid, error = cls.validate_file_size(content_length, file_content) 

175 if not is_valid: 

176 return is_valid, error 

177 

178 # 2. Validate MIME type and extension 

179 is_valid, error = cls.validate_mime_type(filename, file_content) 

180 if not is_valid: 

181 return is_valid, error 

182 

183 # 3. Validate PDF structure (more thorough check) 

184 is_valid, error = cls.validate_pdf_structure(filename, file_content) 

185 if not is_valid: 

186 return is_valid, error 

187 

188 return True, None