Coverage for src / local_deep_research / research_library / services / pdf_storage_manager.py: 93%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2PDF Storage Manager for Research Library 

3 

4Handles PDF storage across three modes: 

5- none: Don't store PDFs (text-only) 

6- filesystem: Store PDFs unencrypted on disk (fast, external tool compatible) 

7- database: Store PDFs encrypted in SQLCipher database (secure, portable) 

8""" 

9 

10import hashlib 

11import re 

12from datetime import datetime, UTC 

13from pathlib import Path 

14from typing import Optional, Tuple 

15from urllib.parse import urlparse 

16 

17from loguru import logger 

18from sqlalchemy.orm import Session 

19 

20from ...constants import FILE_PATH_SENTINELS 

21from ...database.models.library import Document, DocumentBlob 

22from ...security.path_validator import PathValidator 

23 

24 

25class PDFStorageManager: 

26 """Unified interface for PDF storage across all modes.""" 

27 

28 def __init__( 

29 self, library_root: Path, storage_mode: str, max_pdf_size_mb: int = 100 

30 ): 

31 """ 

32 Initialize PDF storage manager. 

33 

34 Args: 

35 library_root: Base directory for filesystem storage 

36 storage_mode: One of 'none', 'filesystem', 'database' 

37 max_pdf_size_mb: Maximum PDF file size in MB (default 100) 

38 """ 

39 self.library_root = Path(library_root).resolve() 

40 self.storage_mode = storage_mode 

41 self.max_pdf_size_bytes = max_pdf_size_mb * 1024 * 1024 

42 

43 if storage_mode not in ("none", "filesystem", "database"): 

44 logger.warning( 

45 f"Unknown storage mode '{storage_mode}', defaulting to 'none'" 

46 ) 

47 self.storage_mode = "none" 

48 

49 def _get_safe_file_path(self, relative_path: str) -> Optional[Path]: 

50 """ 

51 Safely resolve a relative path within the library root. 

52 

53 Prevents path traversal attacks by validating the path stays within 

54 the library root directory. 

55 

56 Args: 

57 relative_path: Relative path from database 

58 

59 Returns: 

60 Validated absolute Path or None if path is invalid/unsafe 

61 """ 

62 if not relative_path or relative_path in FILE_PATH_SENTINELS: 

63 return None 

64 

65 try: 

66 # Use PathValidator to safely join and validate the path 

67 safe_path = PathValidator.validate_safe_path( 

68 relative_path, str(self.library_root) 

69 ) 

70 safe_path = Path(safe_path) 

71 # Block symbolic links to prevent symlink-based escapes 

72 if safe_path.is_symlink(): 

73 logger.warning(f"Symlink blocked: {relative_path}") 

74 return None 

75 return safe_path 

76 except ValueError: 

77 logger.warning(f"Path traversal blocked: {relative_path}") 

78 return None 

79 

80 def save_pdf( 

81 self, 

82 pdf_content: bytes, 

83 document: Document, 

84 session: Session, 

85 filename: str, 

86 url: Optional[str] = None, 

87 resource_id: Optional[int] = None, 

88 ) -> Tuple[Optional[str], int]: 

89 """ 

90 Save PDF based on configured storage mode. 

91 

92 Args: 

93 pdf_content: Raw PDF bytes 

94 document: Document model instance 

95 session: Database session 

96 filename: Filename to use for saving 

97 url: Source URL (for generating better filenames) 

98 resource_id: Resource ID (for generating better filenames) 

99 

100 Returns: 

101 Tuple of (file_path or storage indicator, file_size) 

102 - For filesystem: relative path string 

103 - For database: "database" 

104 - For none: None 

105 """ 

106 file_size = len(pdf_content) 

107 

108 # Check file size limit 

109 if file_size > self.max_pdf_size_bytes: 

110 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

111 logger.warning( 

112 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

113 f"({max_mb:.0f}MB), skipping storage" 

114 ) 

115 return None, file_size 

116 

117 if self.storage_mode == "none": 

118 logger.debug("PDF storage mode is 'none' - skipping PDF save") 

119 return None, file_size 

120 

121 if self.storage_mode == "filesystem": 

122 file_path = self._save_to_filesystem( 

123 pdf_content, filename, url, resource_id 

124 ) 

125 relative_path = str(file_path.relative_to(self.library_root)) 

126 document.storage_mode = "filesystem" 

127 document.file_path = relative_path 

128 logger.info(f"PDF saved to filesystem: {relative_path}") 

129 return relative_path, file_size 

130 

131 if self.storage_mode == "database": 131 ↛ 138line 131 didn't jump to line 138 because the condition on line 131 was always true

132 self._save_to_database(pdf_content, document, session) 

133 document.storage_mode = "database" 

134 document.file_path = None # No filesystem path 

135 logger.info(f"PDF saved to database for document {document.id}") 

136 return "database", file_size 

137 

138 return None, file_size 

139 

140 def load_pdf(self, document: Document, session: Session) -> Optional[bytes]: 

141 """ 

142 Load PDF - check database first, then filesystem. 

143 

144 Smart retrieval: doesn't rely on storage_mode column, actually checks 

145 where the PDF exists. 

146 

147 Args: 

148 document: Document model instance 

149 session: Database session 

150 

151 Returns: 

152 PDF bytes or None if not available 

153 """ 

154 # 1. Check database first 

155 pdf_bytes = self._load_from_database(document, session) 

156 if pdf_bytes: 

157 logger.debug(f"Loaded PDF from database for document {document.id}") 

158 return pdf_bytes 

159 

160 # 2. Fallback to filesystem 

161 pdf_bytes = self._load_from_filesystem(document) 

162 if pdf_bytes: 

163 logger.debug( 

164 f"Loaded PDF from filesystem for document {document.id}" 

165 ) 

166 return pdf_bytes 

167 

168 logger.debug(f"No PDF available for document {document.id}") 

169 return None 

170 

171 def has_pdf(self, document: Document, session: Session) -> bool: 

172 """ 

173 Check if PDF is available without loading the actual bytes. 

174 

175 Args: 

176 document: Document model instance 

177 session: Database session 

178 

179 Returns: 

180 True if PDF is available (in database or filesystem) 

181 """ 

182 # Must be a PDF file type 

183 if document.file_type != "pdf": 

184 return False 

185 

186 # Check database first (has blob?) 

187 from ...database.models.library import DocumentBlob 

188 

189 has_blob = ( 

190 session.query(DocumentBlob.id) 

191 .filter_by(document_id=document.id) 

192 .first() 

193 is not None 

194 ) 

195 if has_blob: 

196 return True 

197 

198 # Check filesystem (with path traversal protection) 

199 file_path = self._get_safe_file_path(document.file_path) 

200 if file_path and file_path.is_file(): 

201 return True 

202 

203 return False 

204 

205 @classmethod 

206 def pdf_exists(cls, library_root, document, session): 

207 """Check if a PDF exists in any storage backend. 

208 

209 Use this when you need to check PDF availability without a specific 

210 storage mode — e.g. generating document URLs in search results. 

211 """ 

212 manager = cls(library_root, "none") 

213 return manager.has_pdf(document, session) 

214 

215 def _infer_storage_mode(self, document: Document) -> str: 

216 """ 

217 Infer storage mode for documents without explicit mode set. 

218 Used for backward compatibility with existing documents. 

219 """ 

220 # If there's a blob, it's database storage 

221 if hasattr(document, "blob") and document.blob: 

222 return "database" 

223 # If there's a file_path (and not a sentinel), it's filesystem 

224 if document.file_path and document.file_path not in FILE_PATH_SENTINELS: 

225 return "filesystem" 

226 # Otherwise no storage 

227 return "none" 

228 

229 def _save_to_filesystem( 

230 self, 

231 pdf_content: bytes, 

232 filename: str, 

233 url: Optional[str] = None, 

234 resource_id: Optional[int] = None, 

235 ) -> Path: 

236 """ 

237 Save PDF to filesystem with organized structure. 

238 

239 Returns: 

240 Absolute path to saved file 

241 """ 

242 # Generate better filename if URL is provided 

243 if url: 

244 filename = self._generate_filename(url, resource_id, filename) 

245 

246 # Create simple flat directory structure - all PDFs in one folder 

247 pdf_path = self.library_root / "pdfs" 

248 pdf_path.mkdir(parents=True, exist_ok=True) 

249 

250 # Use PathValidator with relative path from library_root 

251 relative_path = f"pdfs/{filename}" 

252 validated_path = PathValidator.validate_safe_path( 

253 relative_path, 

254 base_dir=str(self.library_root), 

255 required_extensions=(".pdf",), 

256 ) 

257 

258 # Write the PDF file with security verification 

259 # Pass current storage_mode as snapshot since we already validated it 

260 from ...security.file_write_verifier import write_file_verified 

261 

262 write_file_verified( 

263 validated_path, 

264 pdf_content, 

265 "research_library.pdf_storage_mode", 

266 "filesystem", 

267 "library PDF storage", 

268 mode="wb", 

269 settings_snapshot={ 

270 "research_library.pdf_storage_mode": self.storage_mode 

271 }, 

272 ) 

273 

274 return Path(validated_path) 

275 

276 def _save_to_database( 

277 self, pdf_content: bytes, document: Document, session: Session 

278 ) -> None: 

279 """Store PDF in document_blobs table.""" 

280 # Check if blob already exists 

281 existing_blob = ( 

282 session.query(DocumentBlob) 

283 .filter_by(document_id=document.id) 

284 .first() 

285 ) 

286 

287 if existing_blob: 

288 # Update existing blob 

289 existing_blob.pdf_binary = pdf_content 

290 existing_blob.blob_hash = hashlib.sha256(pdf_content).hexdigest() 

291 existing_blob.stored_at = datetime.now(UTC) 

292 logger.debug(f"Updated existing blob for document {document.id}") 

293 else: 

294 # Create new blob 

295 blob = DocumentBlob( 

296 document_id=document.id, 

297 pdf_binary=pdf_content, 

298 blob_hash=hashlib.sha256(pdf_content).hexdigest(), 

299 stored_at=datetime.now(UTC), 

300 ) 

301 session.add(blob) 

302 logger.debug(f"Created new blob for document {document.id}") 

303 

304 def _load_from_filesystem(self, document: Document) -> Optional[bytes]: 

305 """Load PDF from filesystem with path traversal protection.""" 

306 # Use safe path resolution to prevent path traversal attacks 

307 file_path = self._get_safe_file_path(document.file_path) 

308 if not file_path: 

309 return None 

310 

311 if not file_path.is_file(): 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true

312 logger.warning(f"PDF file not found: {file_path}") 

313 return None 

314 

315 try: 

316 return file_path.read_bytes() 

317 except Exception: 

318 logger.exception(f"Failed to read PDF from {file_path}") 

319 return None 

320 

321 def _load_from_database( 

322 self, document: Document, session: Session 

323 ) -> Optional[bytes]: 

324 """Load PDF from document_blobs table.""" 

325 blob = ( 

326 session.query(DocumentBlob) 

327 .filter_by(document_id=document.id) 

328 .first() 

329 ) 

330 

331 if not blob: 

332 logger.debug(f"No blob found for document {document.id}") 

333 return None 

334 

335 # Update last accessed timestamp 

336 blob.last_accessed = datetime.now(UTC) 

337 

338 return blob.pdf_binary 

339 

340 def _generate_filename( 

341 self, url: str, resource_id: Optional[int], fallback_filename: str 

342 ) -> str: 

343 """Generate a meaningful filename from URL.""" 

344 parsed_url = urlparse(url) 

345 hostname = parsed_url.hostname or "" 

346 timestamp = datetime.now(UTC).strftime("%Y%m%d") 

347 

348 if hostname == "arxiv.org" or hostname.endswith(".arxiv.org"): 

349 # Extract arXiv ID 

350 match = re.search(r"(\d{4}\.\d{4,5})", url) 

351 if match: 

352 return f"arxiv_{match.group(1)}.pdf" 

353 return f"arxiv_{timestamp}_{resource_id or 'unknown'}.pdf" 

354 

355 if hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed_url.path: 

356 # Extract PMC ID 

357 match = re.search(r"(PMC\d+)", url) 

358 if match: 

359 return f"pmc_{match.group(1)}.pdf" 

360 return f"pubmed_{timestamp}_{resource_id or 'unknown'}.pdf" 

361 

362 # Use fallback filename 

363 return fallback_filename 

364 

365 def delete_pdf(self, document: Document, session: Session) -> bool: 

366 """ 

367 Delete PDF for a document. 

368 

369 Args: 

370 document: Document model instance 

371 session: Database session 

372 

373 Returns: 

374 True if deletion succeeded 

375 """ 

376 storage_mode = document.storage_mode or self._infer_storage_mode( 

377 document 

378 ) 

379 

380 try: 

381 if storage_mode == "filesystem": 

382 # Use safe path resolution to prevent path traversal attacks 

383 file_path = self._get_safe_file_path(document.file_path) 

384 if file_path and file_path.is_file(): 

385 file_path.unlink() 

386 logger.info(f"Deleted PDF file: {file_path}") 

387 document.file_path = None 

388 document.storage_mode = "none" 

389 return True 

390 

391 if storage_mode == "database": 

392 blob = ( 

393 session.query(DocumentBlob) 

394 .filter_by(document_id=document.id) 

395 .first() 

396 ) 

397 if blob: 397 ↛ 400line 397 didn't jump to line 400 because the condition on line 397 was always true

398 session.delete(blob) 

399 logger.info(f"Deleted PDF blob for document {document.id}") 

400 document.storage_mode = "none" 

401 return True 

402 

403 return True # Nothing to delete for 'none' mode 

404 

405 except Exception: 

406 logger.exception(f"Failed to delete PDF for document {document.id}") 

407 return False 

408 

409 def upgrade_to_pdf( 

410 self, document: Document, pdf_content: bytes, session: Session 

411 ) -> bool: 

412 """ 

413 Upgrade a text-only document to include PDF storage. 

414 

415 If document already has a PDF stored, returns False (no action needed). 

416 If document is text-only, adds the PDF blob and updates storage_mode. 

417 

418 Args: 

419 document: Document model instance 

420 pdf_content: Raw PDF bytes 

421 session: Database session 

422 

423 Returns: 

424 True if PDF was added, False if already had PDF or failed 

425 """ 

426 # Only upgrade if document is currently text-only 

427 if document.storage_mode not in (None, "none"): 

428 logger.debug( 

429 f"Document {document.id} already has storage_mode={document.storage_mode}" 

430 ) 

431 return False 

432 

433 # Check if blob already exists (shouldn't happen, but be safe) 

434 existing_blob = ( 

435 session.query(DocumentBlob) 

436 .filter_by(document_id=document.id) 

437 .first() 

438 ) 

439 if existing_blob: 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true

440 logger.debug(f"Document {document.id} already has a blob") 

441 return False 

442 

443 # Check file size 

444 file_size = len(pdf_content) 

445 if file_size > self.max_pdf_size_bytes: 

446 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

447 logger.warning( 

448 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

449 f"({max_mb:.0f}MB), skipping upgrade" 

450 ) 

451 return False 

452 

453 try: 

454 # Add the PDF blob 

455 self._save_to_database(pdf_content, document, session) 

456 document.storage_mode = "database" 

457 document.file_path = None 

458 logger.info(f"Upgraded document {document.id} with PDF blob") 

459 return True 

460 except Exception: 

461 logger.exception( 

462 f"Failed to upgrade document {document.id} with PDF" 

463 ) 

464 return False