Coverage for src/local_deep_research/research_library/services/pdf_storage_manager.py: 93%

186 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2PDF Storage Manager for Research Library 

3 

4Handles PDF storage across three modes: 

5- none: Don't store PDFs (text-only) 

6- filesystem: Store PDFs unencrypted on disk (fast, external tool compatible) 

7- database: Store PDFs encrypted in SQLCipher database (secure, portable) 

8""" 

9 

10import hashlib 

11import re 

12from datetime import datetime, UTC 

13from pathlib import Path 

14from typing import Optional, Tuple 

15from urllib.parse import urlparse 

16 

17from loguru import logger 

18from sqlalchemy.orm import Session 

19 

20from ...constants import FILE_PATH_SENTINELS 

21from ...database.models.library import Document, DocumentBlob 

22from ...security.path_validator import PathValidator 

23 

24 

25# Default storage cap for individual PDFs (megabytes). Mirrors the 

26# upload-validator cap (`FileUploadValidator.MAX_FILE_SIZE`, configurable 

27# via `LDR_SECURITY_UPLOAD_MAX_FILE_SIZE_MB`) so a file that passes the 

28# upload step won't be silently dropped at storage time. The runtime 

29# value comes from the `research_library.max_pdf_size_mb` setting; this 

30# constant is the shared fallback used by every code-level default so the 

31# limit doesn't drift across files. 

32DEFAULT_MAX_PDF_SIZE_MB = 3072 # 3 GB 

33 

34 

35class PDFStorageManager: 

36 """Unified interface for PDF storage across all modes.""" 

37 

38 def __init__( 

39 self, 

40 library_root: Path, 

41 storage_mode: str, 

42 max_pdf_size_mb: int = DEFAULT_MAX_PDF_SIZE_MB, 

43 ): 

44 """ 

45 Initialize PDF storage manager. 

46 

47 Args: 

48 library_root: Base directory for filesystem storage 

49 storage_mode: One of 'none', 'filesystem', 'database' 

50 max_pdf_size_mb: Maximum PDF file size in MB. Should not 

51 exceed `FileUploadValidator.MAX_FILE_SIZE` (the upload 

52 validator's per-file cap, default 3 GB) — uploads above 

53 that cap are rejected before they reach this layer. 

54 """ 

55 self.library_root = Path(library_root).resolve() 

56 self.storage_mode = storage_mode 

57 self.max_pdf_size_bytes = max_pdf_size_mb * 1024 * 1024 

58 

59 if storage_mode not in ("none", "filesystem", "database"): 

60 logger.warning( 

61 f"Unknown storage mode '{storage_mode}', defaulting to 'none'" 

62 ) 

63 self.storage_mode = "none" 

64 

65 def _get_safe_file_path(self, relative_path: str) -> Optional[Path]: 

66 """ 

67 Safely resolve a relative path within the library root. 

68 

69 Prevents path traversal attacks by validating the path stays within 

70 the library root directory. 

71 

72 Args: 

73 relative_path: Relative path from database 

74 

75 Returns: 

76 Validated absolute Path or None if path is invalid/unsafe 

77 """ 

78 if not relative_path or relative_path in FILE_PATH_SENTINELS: 

79 return None 

80 

81 try: 

82 # Use PathValidator to safely join and validate the path 

83 safe_path = PathValidator.validate_safe_path( 

84 relative_path, str(self.library_root) 

85 ) 

86 safe_path = Path(safe_path) 

87 # Block symbolic links to prevent symlink-based escapes 

88 if safe_path.is_symlink(): 

89 logger.warning(f"Symlink blocked: {relative_path}") 

90 return None 

91 return safe_path 

92 except ValueError: 

93 logger.warning(f"Path traversal blocked: {relative_path}") 

94 return None 

95 

96 def save_pdf( 

97 self, 

98 pdf_content: bytes, 

99 document: Document, 

100 session: Session, 

101 filename: str, 

102 url: Optional[str] = None, 

103 resource_id: Optional[int] = None, 

104 ) -> Tuple[Optional[str], int]: 

105 """ 

106 Save PDF based on configured storage mode. 

107 

108 Args: 

109 pdf_content: Raw PDF bytes 

110 document: Document model instance 

111 session: Database session 

112 filename: Filename to use for saving 

113 url: Source URL (for generating better filenames) 

114 resource_id: Resource ID (for generating better filenames) 

115 

116 Returns: 

117 Tuple of (file_path or storage indicator, file_size) 

118 - For filesystem: relative path string 

119 - For database: "database" 

120 - For none: None 

121 """ 

122 file_size = len(pdf_content) 

123 

124 # Check file size limit 

125 if file_size > self.max_pdf_size_bytes: 

126 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

127 logger.warning( 

128 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

129 f"({max_mb:.0f}MB), skipping storage" 

130 ) 

131 return None, file_size 

132 

133 if self.storage_mode == "none": 

134 logger.debug("PDF storage mode is 'none' - skipping PDF save") 

135 return None, file_size 

136 

137 if self.storage_mode == "filesystem": 

138 file_path = self._save_to_filesystem( 

139 pdf_content, filename, url, resource_id 

140 ) 

141 relative_path = str(file_path.relative_to(self.library_root)) 

142 document.storage_mode = "filesystem" 

143 document.file_path = relative_path 

144 logger.info(f"PDF saved to filesystem: {relative_path}") 

145 return relative_path, file_size 

146 

147 if self.storage_mode == "database": 147 ↛ 154line 147 didn't jump to line 154 because the condition on line 147 was always true

148 self._save_to_database(pdf_content, document, session) 

149 document.storage_mode = "database" 

150 document.file_path = None # No filesystem path 

151 logger.info(f"PDF saved to database for document {document.id}") 

152 return "database", file_size 

153 

154 return None, file_size 

155 

156 def load_pdf(self, document: Document, session: Session) -> Optional[bytes]: 

157 """ 

158 Load PDF - check database first, then filesystem. 

159 

160 Smart retrieval: doesn't rely on storage_mode column, actually checks 

161 where the PDF exists. 

162 

163 Args: 

164 document: Document model instance 

165 session: Database session 

166 

167 Returns: 

168 PDF bytes or None if not available 

169 """ 

170 # 1. Check database first 

171 pdf_bytes = self._load_from_database(document, session) 

172 if pdf_bytes: 

173 logger.debug(f"Loaded PDF from database for document {document.id}") 

174 return pdf_bytes 

175 

176 # 2. Fallback to filesystem 

177 pdf_bytes = self._load_from_filesystem(document) 

178 if pdf_bytes: 

179 logger.debug( 

180 f"Loaded PDF from filesystem for document {document.id}" 

181 ) 

182 return pdf_bytes 

183 

184 logger.debug(f"No PDF available for document {document.id}") 

185 return None 

186 

187 def has_pdf(self, document: Document, session: Session) -> bool: 

188 """ 

189 Check if PDF is available without loading the actual bytes. 

190 

191 Args: 

192 document: Document model instance 

193 session: Database session 

194 

195 Returns: 

196 True if PDF is available (in database or filesystem) 

197 """ 

198 # Must be a PDF file type 

199 if document.file_type != "pdf": 

200 return False 

201 

202 # Check database first (has blob?) 

203 from ...database.models.library import DocumentBlob 

204 

205 has_blob = ( 

206 session.query(DocumentBlob.id) 

207 .filter_by(document_id=document.id) 

208 .first() 

209 is not None 

210 ) 

211 if has_blob: 

212 return True 

213 

214 # Check filesystem (with path traversal protection) 

215 file_path = self._get_safe_file_path(document.file_path) 

216 if file_path and file_path.is_file(): 

217 return True 

218 

219 return False 

220 

221 @classmethod 

222 def pdf_exists(cls, library_root, document, session): 

223 """Check if a PDF exists in any storage backend. 

224 

225 Use this when you need to check PDF availability without a specific 

226 storage mode — e.g. generating document URLs in search results. 

227 """ 

228 manager = cls(library_root, "none") 

229 return manager.has_pdf(document, session) 

230 

231 def _infer_storage_mode(self, document: Document) -> str: 

232 """ 

233 Infer storage mode for documents without explicit mode set. 

234 Used for backward compatibility with existing documents. 

235 """ 

236 # If there's a blob, it's database storage 

237 if hasattr(document, "blob") and document.blob: 

238 return "database" 

239 # If there's a file_path (and not a sentinel), it's filesystem 

240 if document.file_path and document.file_path not in FILE_PATH_SENTINELS: 

241 return "filesystem" 

242 # Otherwise no storage 

243 return "none" 

244 

245 def _save_to_filesystem( 

246 self, 

247 pdf_content: bytes, 

248 filename: str, 

249 url: Optional[str] = None, 

250 resource_id: Optional[int] = None, 

251 ) -> Path: 

252 """ 

253 Save PDF to filesystem with organized structure. 

254 

255 Returns: 

256 Absolute path to saved file 

257 """ 

258 # Generate better filename if URL is provided 

259 if url: 

260 filename = self._generate_filename(url, resource_id, filename) 

261 

262 # Create simple flat directory structure - all PDFs in one folder 

263 pdf_path = self.library_root / "pdfs" 

264 pdf_path.mkdir(parents=True, exist_ok=True) 

265 

266 # Use PathValidator with relative path from library_root 

267 relative_path = f"pdfs/{filename}" 

268 validated_path = PathValidator.validate_safe_path( 

269 relative_path, 

270 base_dir=str(self.library_root), 

271 required_extensions=(".pdf",), 

272 ) 

273 

274 # Write the PDF file with security verification 

275 # Pass current storage_mode as snapshot since we already validated it 

276 from ...security.file_write_verifier import write_file_verified 

277 

278 write_file_verified( 

279 validated_path, 

280 pdf_content, 

281 "research_library.pdf_storage_mode", 

282 "filesystem", 

283 "library PDF storage", 

284 mode="wb", 

285 settings_snapshot={ 

286 "research_library.pdf_storage_mode": self.storage_mode 

287 }, 

288 ) 

289 

290 return Path(validated_path) 

291 

292 def _save_to_database( 

293 self, pdf_content: bytes, document: Document, session: Session 

294 ) -> None: 

295 """Store PDF in document_blobs table.""" 

296 # Check if blob already exists 

297 existing_blob = ( 

298 session.query(DocumentBlob) 

299 .filter_by(document_id=document.id) 

300 .first() 

301 ) 

302 

303 if existing_blob: 

304 # Update existing blob 

305 existing_blob.pdf_binary = pdf_content 

306 existing_blob.blob_hash = hashlib.sha256(pdf_content).hexdigest() 

307 existing_blob.stored_at = datetime.now(UTC) 

308 logger.debug(f"Updated existing blob for document {document.id}") 

309 else: 

310 # Create new blob 

311 blob = DocumentBlob( 

312 document_id=document.id, 

313 pdf_binary=pdf_content, 

314 blob_hash=hashlib.sha256(pdf_content).hexdigest(), 

315 stored_at=datetime.now(UTC), 

316 ) 

317 session.add(blob) 

318 logger.debug(f"Created new blob for document {document.id}") 

319 

320 def _load_from_filesystem(self, document: Document) -> Optional[bytes]: 

321 """Load PDF from filesystem with path traversal protection.""" 

322 # Use safe path resolution to prevent path traversal attacks 

323 file_path = self._get_safe_file_path(document.file_path) 

324 if not file_path: 

325 return None 

326 

327 if not file_path.is_file(): 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 logger.warning(f"PDF file not found: {file_path}") 

329 return None 

330 

331 try: 

332 return file_path.read_bytes() 

333 except Exception: 

334 logger.exception(f"Failed to read PDF from {file_path}") 

335 return None 

336 

337 def _load_from_database( 

338 self, document: Document, session: Session 

339 ) -> Optional[bytes]: 

340 """Load PDF from document_blobs table.""" 

341 blob = ( 

342 session.query(DocumentBlob) 

343 .filter_by(document_id=document.id) 

344 .first() 

345 ) 

346 

347 if not blob: 

348 logger.debug(f"No blob found for document {document.id}") 

349 return None 

350 

351 # Update last accessed timestamp 

352 blob.last_accessed = datetime.now(UTC) 

353 

354 return blob.pdf_binary 

355 

356 def _generate_filename( 

357 self, url: str, resource_id: Optional[int], fallback_filename: str 

358 ) -> str: 

359 """Generate a meaningful filename from URL.""" 

360 parsed_url = urlparse(url) 

361 hostname = parsed_url.hostname or "" 

362 timestamp = datetime.now(UTC).strftime("%Y%m%d") 

363 

364 if hostname == "arxiv.org" or hostname.endswith(".arxiv.org"): 

365 # Extract arXiv ID 

366 match = re.search(r"(\d{4}\.\d{4,5})", url) 

367 if match: 

368 return f"arxiv_{match.group(1)}.pdf" 

369 return f"arxiv_{timestamp}_{resource_id or 'unknown'}.pdf" 

370 

371 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed_url.path: 

372 # Extract PMC ID 

373 match = re.search(r"(PMC\d+)", url) 

374 if match: 

375 return f"pmc_{match.group(1)}.pdf" 

376 return f"pubmed_{timestamp}_{resource_id or 'unknown'}.pdf" 

377 

378 # Use fallback filename 

379 return fallback_filename 

380 

381 def delete_pdf(self, document: Document, session: Session) -> bool: 

382 """ 

383 Delete PDF for a document. 

384 

385 Args: 

386 document: Document model instance 

387 session: Database session 

388 

389 Returns: 

390 True if deletion succeeded 

391 """ 

392 storage_mode = document.storage_mode or self._infer_storage_mode( 

393 document 

394 ) 

395 

396 try: 

397 if storage_mode == "filesystem": 

398 # Use safe path resolution to prevent path traversal attacks 

399 file_path = self._get_safe_file_path(document.file_path) 

400 if file_path and file_path.is_file(): 

401 file_path.unlink() 

402 logger.info(f"Deleted PDF file: {file_path}") 

403 document.file_path = None 

404 document.storage_mode = "none" 

405 return True 

406 

407 if storage_mode == "database": 

408 blob = ( 

409 session.query(DocumentBlob) 

410 .filter_by(document_id=document.id) 

411 .first() 

412 ) 

413 if blob: 413 ↛ 416line 413 didn't jump to line 416 because the condition on line 413 was always true

414 session.delete(blob) 

415 logger.info(f"Deleted PDF blob for document {document.id}") 

416 document.storage_mode = "none" 

417 return True 

418 

419 return True # Nothing to delete for 'none' mode 

420 

421 except Exception: 

422 logger.exception(f"Failed to delete PDF for document {document.id}") 

423 return False 

424 

425 def upgrade_to_pdf( 

426 self, document: Document, pdf_content: bytes, session: Session 

427 ) -> bool: 

428 """ 

429 Upgrade a text-only document to include PDF storage. 

430 

431 If document already has a PDF stored, returns False (no action needed). 

432 If document is text-only, adds the PDF blob and updates storage_mode. 

433 

434 Args: 

435 document: Document model instance 

436 pdf_content: Raw PDF bytes 

437 session: Database session 

438 

439 Returns: 

440 True if PDF was added, False if already had PDF or failed 

441 """ 

442 # Only upgrade if document is currently text-only 

443 if document.storage_mode not in (None, "none"): 

444 logger.debug( 

445 f"Document {document.id} already has storage_mode={document.storage_mode}" 

446 ) 

447 return False 

448 

449 # Check if blob already exists (shouldn't happen, but be safe) 

450 existing_blob = ( 

451 session.query(DocumentBlob) 

452 .filter_by(document_id=document.id) 

453 .first() 

454 ) 

455 if existing_blob: 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true

456 logger.debug(f"Document {document.id} already has a blob") 

457 return False 

458 

459 # Check file size 

460 file_size = len(pdf_content) 

461 if file_size > self.max_pdf_size_bytes: 

462 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

463 logger.warning( 

464 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

465 f"({max_mb:.0f}MB), skipping upgrade" 

466 ) 

467 return False 

468 

469 try: 

470 # Add the PDF blob 

471 self._save_to_database(pdf_content, document, session) 

472 document.storage_mode = "database" 

473 document.file_path = None 

474 logger.info(f"Upgraded document {document.id} with PDF blob") 

475 return True 

476 except Exception: 

477 logger.exception( 

478 f"Failed to upgrade document {document.id} with PDF" 

479 ) 

480 return False