Coverage for src/local_deep_research/security/file_upload_validator.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Centralized file upload validation for security. 

3 

4Provides validation for file uploads to prevent: 

5- Memory exhaustion attacks (file size limits) 

6- Malicious file uploads (structure validation) 

7- Resource abuse (file count limits) 

8- Type confusion attacks (MIME validation) 

9""" 

10 

11import io 

12from typing import Optional, Tuple 

13 

14import pdfplumber 

15from loguru import logger 

16 

17from ..settings.manager import check_env_setting 

18 

19# Built-in default for the per-file upload cap, in megabytes. 

20# Override via the LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB environment variable 

21# (or its server-config alias `security.upload_max_file_size_mb`). 

22# 

23# A separate library-side cap (`research_library.max_pdf_size_mb`, 

24# default 3072) gates whether a PDF can be *stored* after upload. Both 

25# values default to 3 GB so the two boundaries stay aligned; raising the 

26# library cap above this value has no effect because uploads above this 

27# cap are rejected before they reach storage. 

28_DEFAULT_MAX_FILE_SIZE_MB = 3072 # 3 GB 

29 

30 

31def _format_size(size_bytes: int) -> str: 

32 """Render a byte count as ``X.YGB`` when >=1 GB, else ``X.YMB``.""" 

33 one_gb = 1024 * 1024 * 1024 

34 if size_bytes >= one_gb: 

35 return f"{size_bytes / one_gb:.1f}GB" 

36 return f"{size_bytes / (1024 * 1024):.1f}MB" 

37 

38 

def _resolve_max_file_size() -> int:
    """Return the per-file upload cap in bytes.

    The cap is looked up through ``check_env_setting`` under the key
    ``security.upload_max_file_size_mb`` (the
    ``LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB`` environment variable), which
    lets a deployment adjust it without a code change. The value is read
    as a whole number of megabytes.

    ``_DEFAULT_MAX_FILE_SIZE_MB`` is used instead when the setting is
    unset, cannot be parsed as an integer, or is zero or negative; the
    last two cases also emit a warning.
    """
    bytes_per_mb = 1024 * 1024

    raw_setting = check_env_setting("security.upload_max_file_size_mb")
    if raw_setting is None:
        return _DEFAULT_MAX_FILE_SIZE_MB * bytes_per_mb

    try:
        megabytes = int(raw_setting)
    except ValueError:
        logger.warning(
            "LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB={!r} is not an integer; "
            "falling back to {} MB default.",
            raw_setting,
            _DEFAULT_MAX_FILE_SIZE_MB,
        )
        return _DEFAULT_MAX_FILE_SIZE_MB * bytes_per_mb

    if megabytes > 0:
        return megabytes * bytes_per_mb

    # A zero or negative cap would silently reject every upload, so treat
    # it as a misconfiguration: warn the operator and use the default.
    logger.warning(
        "LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB={!r} must be > 0; "
        "falling back to {} MB default.",
        raw_setting,
        _DEFAULT_MAX_FILE_SIZE_MB,
    )
    return _DEFAULT_MAX_FILE_SIZE_MB * bytes_per_mb

71 

72 

class FileUploadValidator:
    """Centralized file upload validation for security.

    Every public validator returns a ``(is_valid, error_message)`` tuple:
    ``error_message`` is ``None`` when the check passes and a
    human-readable reason when it fails.
    """

    # Security constants. ``MAX_FILE_SIZE`` is in bytes and is resolved
    # once, when this class body executes, by ``_resolve_max_file_size()``;
    # it can therefore be changed per deployment via the
    # LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB env var, and the built-in
    # default is 3 GB.
    # NOTE(review): the previous comment also pointed at a UI setting and
    # ``server_config._DEFAULTS``; neither is visible from this module —
    # confirm they still apply.
    MAX_FILE_SIZE = _resolve_max_file_size()
    MAX_FILES_PER_REQUEST = 200  # Maximum number of files in single request
    PDF_MAGIC_BYTES = b"%PDF"  # PDF file signature
    ALLOWED_MIME_TYPES = {"application/pdf"}

    @staticmethod
    def _size_error(size_bytes: int) -> str:
        """Build the "file too large" message for an oversized upload."""
        return (
            f"File too large: {_format_size(size_bytes)} "
            f"(max: {_format_size(FileUploadValidator.MAX_FILE_SIZE)})"
        )

    @staticmethod
    def validate_file_size(
        content_length: Optional[int], file_content: Optional[bytes] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file size to prevent memory exhaustion attacks.

        Either argument may be ``None``, in which case that particular
        check is skipped. The limit is read from ``MAX_FILE_SIZE`` at call
        time.

        Args:
            content_length: Content-Length header value (if available)
            file_content: Actual file bytes (if already read)

        Returns:
            Tuple of (is_valid, error_message)
        """
        limit = FileUploadValidator.MAX_FILE_SIZE

        # Check the declared size first so a caller can reject the request
        # before reading the body.
        if content_length is not None and content_length > limit:
            return False, FileUploadValidator._size_error(content_length)

        # The header is client-supplied; also check the bytes actually
        # received when they are available.
        if file_content is not None:
            actual_size = len(file_content)
            if actual_size > limit:
                return False, FileUploadValidator._size_error(actual_size)

        return True, None

    @staticmethod
    def validate_file_count(file_count: int) -> Tuple[bool, Optional[str]]:
        """
        Validate number of files to prevent resource abuse.

        Args:
            file_count: Number of files in the request

        Returns:
            Tuple of (is_valid, error_message)
        """
        if file_count > FileUploadValidator.MAX_FILES_PER_REQUEST:
            return (
                False,
                f"Too many files: {file_count} (max: {FileUploadValidator.MAX_FILES_PER_REQUEST})",
            )

        if file_count <= 0:
            return False, "No files provided"

        return True, None

    @staticmethod
    def validate_mime_type(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate file extension and PDF file signature.

        The extension check is case-insensitive. A missing filename
        (``None`` or empty) is rejected as an invalid file type rather
        than raising.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Upload frameworks may report no filename at all; reject that
        # cleanly instead of crashing on ``None.lower()``.
        if not filename or not filename.lower().endswith(".pdf"):
            return (
                False,
                f"Invalid file type: {filename}. Only PDF files allowed",
            )

        # Check PDF magic bytes (file signature)
        if not file_content.startswith(FileUploadValidator.PDF_MAGIC_BYTES):
            return (
                False,
                f"Invalid PDF file: {filename}. File signature mismatch",
            )

        return True, None

    @staticmethod
    def validate_pdf_structure(
        filename: str, file_content: bytes
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate PDF structure to detect malicious or corrupted files.

        This goes beyond just checking the magic bytes and actually
        attempts to parse the PDF with ``pdfplumber``: the document must
        open, contain at least one page, and expose the first page's
        width and height.

        Args:
            filename: Original filename
            file_content: File content bytes

        Returns:
            Tuple of (is_valid, error_message)
        """
        try:
            # Attempt to open and parse the PDF structure
            with pdfplumber.open(io.BytesIO(file_content)) as pdf:
                pages = pdf.pages
                if not pages:
                    return False, f"Invalid PDF: {filename}. No pages found"

                # Touch basic first-page metadata to ensure it is parseable
                first_page = pages[0]
                _ = first_page.width
                _ = first_page.height

                return True, None

        except Exception as exc:
            # Any parser failure on untrusted input means "reject", so the
            # broad catch is deliberate. Log the exception class to aid
            # diagnosis, and render the client-supplied filename with
            # ``!r`` so embedded newlines cannot forge extra log lines.
            logger.warning(
                "PDF structure validation failed for {!r}: {}",
                filename,
                type(exc).__name__,
            )
            return False, f"Invalid or corrupted PDF file: {filename}"

    @classmethod
    def validate_upload(
        cls,
        filename: str,
        file_content: bytes,
        content_length: Optional[int] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Comprehensive validation for a single file upload.

        Runs the size, extension/signature, and PDF-structure checks in
        that order and stops at the first failure, so the cheap checks
        gate the expensive parse.

        Args:
            filename: Original filename
            file_content: File content bytes
            content_length: Content-Length header (if available)

        Returns:
            Tuple of (is_valid, error_message)
        """
        # 1. Validate file size
        is_valid, error = cls.validate_file_size(content_length, file_content)
        if not is_valid:
            return is_valid, error

        # 2. Validate extension and file signature
        is_valid, error = cls.validate_mime_type(filename, file_content)
        if not is_valid:
            return is_valid, error

        # 3. Validate PDF structure (more thorough check)
        is_valid, error = cls.validate_pdf_structure(filename, file_content)
        if not is_valid:
            return is_valid, error

        return True, None