Coverage for src / local_deep_research / research_library / services / pdf_storage_manager.py: 81%
171 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2PDF Storage Manager for Research Library
4Handles PDF storage across three modes:
5- none: Don't store PDFs (text-only)
6- filesystem: Store PDFs unencrypted on disk (fast, external tool compatible)
7- database: Store PDFs encrypted in SQLCipher database (secure, portable)
8"""
10import hashlib
11import re
12from datetime import datetime, UTC
13from pathlib import Path
14from typing import Optional, Tuple
15from urllib.parse import urlparse
17from loguru import logger
18from sqlalchemy.orm import Session
20from ...database.models.library import Document, DocumentBlob
21from ...security.path_validator import PathValidator
24class PDFStorageManager:
25 """Unified interface for PDF storage across all modes."""
27 def __init__(
28 self, library_root: Path, storage_mode: str, max_pdf_size_mb: int = 100
29 ):
30 """
31 Initialize PDF storage manager.
33 Args:
34 library_root: Base directory for filesystem storage
35 storage_mode: One of 'none', 'filesystem', 'database'
36 max_pdf_size_mb: Maximum PDF file size in MB (default 100)
37 """
38 self.library_root = Path(library_root)
39 self.storage_mode = storage_mode
40 self.max_pdf_size_bytes = max_pdf_size_mb * 1024 * 1024
42 if storage_mode not in ("none", "filesystem", "database"):
43 logger.warning(
44 f"Unknown storage mode '{storage_mode}', defaulting to 'none'"
45 )
46 self.storage_mode = "none"
48 def save_pdf(
49 self,
50 pdf_content: bytes,
51 document: Document,
52 session: Session,
53 filename: str,
54 url: Optional[str] = None,
55 resource_id: Optional[int] = None,
56 ) -> Tuple[Optional[str], int]:
57 """
58 Save PDF based on configured storage mode.
60 Args:
61 pdf_content: Raw PDF bytes
62 document: Document model instance
63 session: Database session
64 filename: Filename to use for saving
65 url: Source URL (for generating better filenames)
66 resource_id: Resource ID (for generating better filenames)
68 Returns:
69 Tuple of (file_path or storage indicator, file_size)
70 - For filesystem: relative path string
71 - For database: "database"
72 - For none: None
73 """
74 file_size = len(pdf_content)
76 # Check file size limit
77 if file_size > self.max_pdf_size_bytes:
78 max_mb = self.max_pdf_size_bytes / (1024 * 1024)
79 logger.warning(
80 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit "
81 f"({max_mb:.0f}MB), skipping storage"
82 )
83 return None, file_size
85 if self.storage_mode == "none":
86 logger.debug("PDF storage mode is 'none' - skipping PDF save")
87 return None, file_size
89 elif self.storage_mode == "filesystem":
90 file_path = self._save_to_filesystem(
91 pdf_content, filename, url, resource_id
92 )
93 relative_path = str(file_path.relative_to(self.library_root))
94 document.storage_mode = "filesystem"
95 document.file_path = relative_path
96 logger.info(f"PDF saved to filesystem: {relative_path}")
97 return relative_path, file_size
99 elif self.storage_mode == "database": 99 ↛ 106line 99 didn't jump to line 106 because the condition on line 99 was always true
100 self._save_to_database(pdf_content, document, session)
101 document.storage_mode = "database"
102 document.file_path = None # No filesystem path
103 logger.info(f"PDF saved to database for document {document.id}")
104 return "database", file_size
106 return None, file_size
108 def load_pdf(self, document: Document, session: Session) -> Optional[bytes]:
109 """
110 Load PDF - check database first, then filesystem.
112 Smart retrieval: doesn't rely on storage_mode column, actually checks
113 where the PDF exists.
115 Args:
116 document: Document model instance
117 session: Database session
119 Returns:
120 PDF bytes or None if not available
121 """
122 # 1. Check database first
123 pdf_bytes = self._load_from_database(document, session)
124 if pdf_bytes:
125 logger.debug(f"Loaded PDF from database for document {document.id}")
126 return pdf_bytes
128 # 2. Fallback to filesystem
129 pdf_bytes = self._load_from_filesystem(document)
130 if pdf_bytes:
131 logger.debug(
132 f"Loaded PDF from filesystem for document {document.id}"
133 )
134 return pdf_bytes
136 logger.debug(f"No PDF available for document {document.id}")
137 return None
139 def has_pdf(self, document: Document, session: Session) -> bool:
140 """
141 Check if PDF is available without loading the actual bytes.
143 Args:
144 document: Document model instance
145 session: Database session
147 Returns:
148 True if PDF is available (in database or filesystem)
149 """
150 # Must be a PDF file type
151 if document.file_type != "pdf":
152 return False
154 # Check database first (has blob?)
155 from ...database.models.library import DocumentBlob
157 has_blob = (
158 session.query(DocumentBlob.id)
159 .filter_by(document_id=document.id)
160 .first()
161 is not None
162 )
163 if has_blob:
164 return True
166 # Check filesystem
167 if document.file_path and document.file_path not in (
168 "metadata_only",
169 "text_only_not_stored",
170 ):
171 file_path = self.library_root / document.file_path
172 if file_path.exists():
173 return True
175 return False
177 def _infer_storage_mode(self, document: Document) -> str:
178 """
179 Infer storage mode for documents without explicit mode set.
180 Used for backward compatibility with existing documents.
181 """
182 # If there's a blob, it's database storage
183 if hasattr(document, "blob") and document.blob:
184 return "database"
185 # If there's a file_path (and not 'metadata_only'), it's filesystem
186 if document.file_path and document.file_path != "metadata_only":
187 return "filesystem"
188 # Otherwise no storage
189 return "none"
191 def _save_to_filesystem(
192 self,
193 pdf_content: bytes,
194 filename: str,
195 url: Optional[str] = None,
196 resource_id: Optional[int] = None,
197 ) -> Path:
198 """
199 Save PDF to filesystem with organized structure.
201 Returns:
202 Absolute path to saved file
203 """
204 # Generate better filename if URL is provided
205 if url:
206 filename = self._generate_filename(url, resource_id, filename)
208 # Create simple flat directory structure - all PDFs in one folder
209 pdf_path = self.library_root / "pdfs"
210 pdf_path.mkdir(parents=True, exist_ok=True)
212 # Use PathValidator with relative path from library_root
213 relative_path = f"pdfs/{filename}"
214 validated_path = PathValidator.validate_safe_path(
215 relative_path,
216 base_dir=str(self.library_root),
217 required_extensions=(".pdf",),
218 )
220 if not validated_path: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 raise ValueError("Invalid file path")
223 # Write the PDF file with security verification
224 # Pass current storage_mode as snapshot since we already validated it
225 from ...security.file_write_verifier import write_file_verified
227 write_file_verified(
228 validated_path,
229 pdf_content,
230 "research_library.pdf_storage_mode",
231 "filesystem",
232 "library PDF storage",
233 mode="wb",
234 settings_snapshot={
235 "research_library.pdf_storage_mode": self.storage_mode
236 },
237 )
239 return Path(validated_path)
241 def _save_to_database(
242 self, pdf_content: bytes, document: Document, session: Session
243 ) -> None:
244 """Store PDF in document_blobs table."""
245 # Check if blob already exists
246 existing_blob = (
247 session.query(DocumentBlob)
248 .filter_by(document_id=document.id)
249 .first()
250 )
252 if existing_blob: 252 ↛ 254line 252 didn't jump to line 254 because the condition on line 252 was never true
253 # Update existing blob
254 existing_blob.pdf_binary = pdf_content
255 existing_blob.blob_hash = hashlib.sha256(pdf_content).hexdigest()
256 existing_blob.stored_at = datetime.now(UTC)
257 logger.debug(f"Updated existing blob for document {document.id}")
258 else:
259 # Create new blob
260 blob = DocumentBlob(
261 document_id=document.id,
262 pdf_binary=pdf_content,
263 blob_hash=hashlib.sha256(pdf_content).hexdigest(),
264 stored_at=datetime.now(UTC),
265 )
266 session.add(blob)
267 logger.debug(f"Created new blob for document {document.id}")
269 def _load_from_filesystem(self, document: Document) -> Optional[bytes]:
270 """Load PDF from filesystem."""
271 if not document.file_path or document.file_path == "metadata_only":
272 return None
274 file_path = self.library_root / document.file_path
276 if not file_path.exists(): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 logger.warning(f"PDF file not found: {file_path}")
278 return None
280 try:
281 return file_path.read_bytes()
282 except Exception:
283 logger.exception(f"Failed to read PDF from {file_path}")
284 return None
286 def _load_from_database(
287 self, document: Document, session: Session
288 ) -> Optional[bytes]:
289 """Load PDF from document_blobs table."""
290 blob = (
291 session.query(DocumentBlob)
292 .filter_by(document_id=document.id)
293 .first()
294 )
296 if not blob:
297 logger.debug(f"No blob found for document {document.id}")
298 return None
300 # Update last accessed timestamp
301 blob.last_accessed = datetime.now(UTC)
303 return blob.pdf_binary
305 def _generate_filename(
306 self, url: str, resource_id: Optional[int], fallback_filename: str
307 ) -> str:
308 """Generate a meaningful filename from URL."""
309 parsed_url = urlparse(url)
310 hostname = parsed_url.hostname or ""
311 timestamp = datetime.now(UTC).strftime("%Y%m%d")
313 if hostname == "arxiv.org" or hostname.endswith(".arxiv.org"):
314 # Extract arXiv ID
315 match = re.search(r"(\d{4}\.\d{4,5})", url)
316 if match: 316 ↛ 318line 316 didn't jump to line 318 because the condition on line 316 was always true
317 return f"arxiv_{match.group(1)}.pdf"
318 return f"arxiv_{timestamp}_{resource_id or 'unknown'}.pdf"
320 elif hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed_url.path:
321 # Extract PMC ID
322 match = re.search(r"(PMC\d+)", url)
323 if match: 323 ↛ 325line 323 didn't jump to line 325 because the condition on line 323 was always true
324 return f"pmc_{match.group(1)}.pdf"
325 return f"pubmed_{timestamp}_{resource_id or 'unknown'}.pdf"
327 # Use fallback filename
328 return fallback_filename
330 def delete_pdf(self, document: Document, session: Session) -> bool:
331 """
332 Delete PDF for a document.
334 Args:
335 document: Document model instance
336 session: Database session
338 Returns:
339 True if deletion succeeded
340 """
341 storage_mode = document.storage_mode or self._infer_storage_mode(
342 document
343 )
345 try:
346 if storage_mode == "filesystem":
347 if document.file_path and document.file_path != "metadata_only": 347 ↛ 352line 347 didn't jump to line 352 because the condition on line 347 was always true
348 file_path = self.library_root / document.file_path
349 if file_path.exists(): 349 ↛ 352line 349 didn't jump to line 352 because the condition on line 349 was always true
350 file_path.unlink()
351 logger.info(f"Deleted PDF file: {file_path}")
352 document.file_path = None
353 document.storage_mode = "none"
354 return True
356 elif storage_mode == "database":
357 blob = (
358 session.query(DocumentBlob)
359 .filter_by(document_id=document.id)
360 .first()
361 )
362 if blob: 362 ↛ 365line 362 didn't jump to line 365 because the condition on line 362 was always true
363 session.delete(blob)
364 logger.info(f"Deleted PDF blob for document {document.id}")
365 document.storage_mode = "none"
366 return True
368 return True # Nothing to delete for 'none' mode
370 except Exception:
371 logger.exception(f"Failed to delete PDF for document {document.id}")
372 return False
374 def upgrade_to_pdf(
375 self, document: Document, pdf_content: bytes, session: Session
376 ) -> bool:
377 """
378 Upgrade a text-only document to include PDF storage.
380 If document already has a PDF stored, returns False (no action needed).
381 If document is text-only, adds the PDF blob and updates storage_mode.
383 Args:
384 document: Document model instance
385 pdf_content: Raw PDF bytes
386 session: Database session
388 Returns:
389 True if PDF was added, False if already had PDF or failed
390 """
391 # Only upgrade if document is currently text-only
392 if document.storage_mode not in (None, "none"):
393 logger.debug(
394 f"Document {document.id} already has storage_mode={document.storage_mode}"
395 )
396 return False
398 # Check if blob already exists (shouldn't happen, but be safe)
399 existing_blob = (
400 session.query(DocumentBlob)
401 .filter_by(document_id=document.id)
402 .first()
403 )
404 if existing_blob: 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true
405 logger.debug(f"Document {document.id} already has a blob")
406 return False
408 # Check file size
409 file_size = len(pdf_content)
410 if file_size > self.max_pdf_size_bytes:
411 max_mb = self.max_pdf_size_bytes / (1024 * 1024)
412 logger.warning(
413 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit "
414 f"({max_mb:.0f}MB), skipping upgrade"
415 )
416 return False
418 try:
419 # Add the PDF blob
420 self._save_to_database(pdf_content, document, session)
421 document.storage_mode = "database"
422 document.file_path = None
423 logger.info(f"Upgraded document {document.id} with PDF blob")
424 return True
425 except Exception:
426 logger.exception(
427 f"Failed to upgrade document {document.id} with PDF"
428 )
429 return False