Coverage for src / local_deep_research / research_library / services / pdf_storage_manager.py: 81%

171 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2PDF Storage Manager for Research Library 

3 

4Handles PDF storage across three modes: 

5- none: Don't store PDFs (text-only) 

6- filesystem: Store PDFs unencrypted on disk (fast, external tool compatible) 

7- database: Store PDFs encrypted in SQLCipher database (secure, portable) 

8""" 

9 

10import hashlib 

11import re 

12from datetime import datetime, UTC 

13from pathlib import Path 

14from typing import Optional, Tuple 

15from urllib.parse import urlparse 

16 

17from loguru import logger 

18from sqlalchemy.orm import Session 

19 

20from ...database.models.library import Document, DocumentBlob 

21from ...security.path_validator import PathValidator 

22 

23 

24class PDFStorageManager: 

25 """Unified interface for PDF storage across all modes.""" 

26 

27 def __init__( 

28 self, library_root: Path, storage_mode: str, max_pdf_size_mb: int = 100 

29 ): 

30 """ 

31 Initialize PDF storage manager. 

32 

33 Args: 

34 library_root: Base directory for filesystem storage 

35 storage_mode: One of 'none', 'filesystem', 'database' 

36 max_pdf_size_mb: Maximum PDF file size in MB (default 100) 

37 """ 

38 self.library_root = Path(library_root) 

39 self.storage_mode = storage_mode 

40 self.max_pdf_size_bytes = max_pdf_size_mb * 1024 * 1024 

41 

42 if storage_mode not in ("none", "filesystem", "database"): 

43 logger.warning( 

44 f"Unknown storage mode '{storage_mode}', defaulting to 'none'" 

45 ) 

46 self.storage_mode = "none" 

47 

48 def save_pdf( 

49 self, 

50 pdf_content: bytes, 

51 document: Document, 

52 session: Session, 

53 filename: str, 

54 url: Optional[str] = None, 

55 resource_id: Optional[int] = None, 

56 ) -> Tuple[Optional[str], int]: 

57 """ 

58 Save PDF based on configured storage mode. 

59 

60 Args: 

61 pdf_content: Raw PDF bytes 

62 document: Document model instance 

63 session: Database session 

64 filename: Filename to use for saving 

65 url: Source URL (for generating better filenames) 

66 resource_id: Resource ID (for generating better filenames) 

67 

68 Returns: 

69 Tuple of (file_path or storage indicator, file_size) 

70 - For filesystem: relative path string 

71 - For database: "database" 

72 - For none: None 

73 """ 

74 file_size = len(pdf_content) 

75 

76 # Check file size limit 

77 if file_size > self.max_pdf_size_bytes: 

78 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

79 logger.warning( 

80 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

81 f"({max_mb:.0f}MB), skipping storage" 

82 ) 

83 return None, file_size 

84 

85 if self.storage_mode == "none": 

86 logger.debug("PDF storage mode is 'none' - skipping PDF save") 

87 return None, file_size 

88 

89 elif self.storage_mode == "filesystem": 

90 file_path = self._save_to_filesystem( 

91 pdf_content, filename, url, resource_id 

92 ) 

93 relative_path = str(file_path.relative_to(self.library_root)) 

94 document.storage_mode = "filesystem" 

95 document.file_path = relative_path 

96 logger.info(f"PDF saved to filesystem: {relative_path}") 

97 return relative_path, file_size 

98 

99 elif self.storage_mode == "database": 99 ↛ 106line 99 didn't jump to line 106 because the condition on line 99 was always true

100 self._save_to_database(pdf_content, document, session) 

101 document.storage_mode = "database" 

102 document.file_path = None # No filesystem path 

103 logger.info(f"PDF saved to database for document {document.id}") 

104 return "database", file_size 

105 

106 return None, file_size 

107 

108 def load_pdf(self, document: Document, session: Session) -> Optional[bytes]: 

109 """ 

110 Load PDF - check database first, then filesystem. 

111 

112 Smart retrieval: doesn't rely on storage_mode column, actually checks 

113 where the PDF exists. 

114 

115 Args: 

116 document: Document model instance 

117 session: Database session 

118 

119 Returns: 

120 PDF bytes or None if not available 

121 """ 

122 # 1. Check database first 

123 pdf_bytes = self._load_from_database(document, session) 

124 if pdf_bytes: 

125 logger.debug(f"Loaded PDF from database for document {document.id}") 

126 return pdf_bytes 

127 

128 # 2. Fallback to filesystem 

129 pdf_bytes = self._load_from_filesystem(document) 

130 if pdf_bytes: 

131 logger.debug( 

132 f"Loaded PDF from filesystem for document {document.id}" 

133 ) 

134 return pdf_bytes 

135 

136 logger.debug(f"No PDF available for document {document.id}") 

137 return None 

138 

139 def has_pdf(self, document: Document, session: Session) -> bool: 

140 """ 

141 Check if PDF is available without loading the actual bytes. 

142 

143 Args: 

144 document: Document model instance 

145 session: Database session 

146 

147 Returns: 

148 True if PDF is available (in database or filesystem) 

149 """ 

150 # Must be a PDF file type 

151 if document.file_type != "pdf": 

152 return False 

153 

154 # Check database first (has blob?) 

155 from ...database.models.library import DocumentBlob 

156 

157 has_blob = ( 

158 session.query(DocumentBlob.id) 

159 .filter_by(document_id=document.id) 

160 .first() 

161 is not None 

162 ) 

163 if has_blob: 

164 return True 

165 

166 # Check filesystem 

167 if document.file_path and document.file_path not in ( 

168 "metadata_only", 

169 "text_only_not_stored", 

170 ): 

171 file_path = self.library_root / document.file_path 

172 if file_path.exists(): 

173 return True 

174 

175 return False 

176 

177 def _infer_storage_mode(self, document: Document) -> str: 

178 """ 

179 Infer storage mode for documents without explicit mode set. 

180 Used for backward compatibility with existing documents. 

181 """ 

182 # If there's a blob, it's database storage 

183 if hasattr(document, "blob") and document.blob: 

184 return "database" 

185 # If there's a file_path (and not 'metadata_only'), it's filesystem 

186 if document.file_path and document.file_path != "metadata_only": 

187 return "filesystem" 

188 # Otherwise no storage 

189 return "none" 

190 

191 def _save_to_filesystem( 

192 self, 

193 pdf_content: bytes, 

194 filename: str, 

195 url: Optional[str] = None, 

196 resource_id: Optional[int] = None, 

197 ) -> Path: 

198 """ 

199 Save PDF to filesystem with organized structure. 

200 

201 Returns: 

202 Absolute path to saved file 

203 """ 

204 # Generate better filename if URL is provided 

205 if url: 

206 filename = self._generate_filename(url, resource_id, filename) 

207 

208 # Create simple flat directory structure - all PDFs in one folder 

209 pdf_path = self.library_root / "pdfs" 

210 pdf_path.mkdir(parents=True, exist_ok=True) 

211 

212 # Use PathValidator with relative path from library_root 

213 relative_path = f"pdfs/{filename}" 

214 validated_path = PathValidator.validate_safe_path( 

215 relative_path, 

216 base_dir=str(self.library_root), 

217 required_extensions=(".pdf",), 

218 ) 

219 

220 if not validated_path: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true

221 raise ValueError("Invalid file path") 

222 

223 # Write the PDF file with security verification 

224 # Pass current storage_mode as snapshot since we already validated it 

225 from ...security.file_write_verifier import write_file_verified 

226 

227 write_file_verified( 

228 validated_path, 

229 pdf_content, 

230 "research_library.pdf_storage_mode", 

231 "filesystem", 

232 "library PDF storage", 

233 mode="wb", 

234 settings_snapshot={ 

235 "research_library.pdf_storage_mode": self.storage_mode 

236 }, 

237 ) 

238 

239 return Path(validated_path) 

240 

241 def _save_to_database( 

242 self, pdf_content: bytes, document: Document, session: Session 

243 ) -> None: 

244 """Store PDF in document_blobs table.""" 

245 # Check if blob already exists 

246 existing_blob = ( 

247 session.query(DocumentBlob) 

248 .filter_by(document_id=document.id) 

249 .first() 

250 ) 

251 

252 if existing_blob: 252 ↛ 254line 252 didn't jump to line 254 because the condition on line 252 was never true

253 # Update existing blob 

254 existing_blob.pdf_binary = pdf_content 

255 existing_blob.blob_hash = hashlib.sha256(pdf_content).hexdigest() 

256 existing_blob.stored_at = datetime.now(UTC) 

257 logger.debug(f"Updated existing blob for document {document.id}") 

258 else: 

259 # Create new blob 

260 blob = DocumentBlob( 

261 document_id=document.id, 

262 pdf_binary=pdf_content, 

263 blob_hash=hashlib.sha256(pdf_content).hexdigest(), 

264 stored_at=datetime.now(UTC), 

265 ) 

266 session.add(blob) 

267 logger.debug(f"Created new blob for document {document.id}") 

268 

269 def _load_from_filesystem(self, document: Document) -> Optional[bytes]: 

270 """Load PDF from filesystem.""" 

271 if not document.file_path or document.file_path == "metadata_only": 

272 return None 

273 

274 file_path = self.library_root / document.file_path 

275 

276 if not file_path.exists(): 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 logger.warning(f"PDF file not found: {file_path}") 

278 return None 

279 

280 try: 

281 return file_path.read_bytes() 

282 except Exception: 

283 logger.exception(f"Failed to read PDF from {file_path}") 

284 return None 

285 

286 def _load_from_database( 

287 self, document: Document, session: Session 

288 ) -> Optional[bytes]: 

289 """Load PDF from document_blobs table.""" 

290 blob = ( 

291 session.query(DocumentBlob) 

292 .filter_by(document_id=document.id) 

293 .first() 

294 ) 

295 

296 if not blob: 

297 logger.debug(f"No blob found for document {document.id}") 

298 return None 

299 

300 # Update last accessed timestamp 

301 blob.last_accessed = datetime.now(UTC) 

302 

303 return blob.pdf_binary 

304 

305 def _generate_filename( 

306 self, url: str, resource_id: Optional[int], fallback_filename: str 

307 ) -> str: 

308 """Generate a meaningful filename from URL.""" 

309 parsed_url = urlparse(url) 

310 hostname = parsed_url.hostname or "" 

311 timestamp = datetime.now(UTC).strftime("%Y%m%d") 

312 

313 if hostname == "arxiv.org" or hostname.endswith(".arxiv.org"): 

314 # Extract arXiv ID 

315 match = re.search(r"(\d{4}\.\d{4,5})", url) 

316 if match: 316 ↛ 318line 316 didn't jump to line 318 because the condition on line 316 was always true

317 return f"arxiv_{match.group(1)}.pdf" 

318 return f"arxiv_{timestamp}_{resource_id or 'unknown'}.pdf" 

319 

320 elif hostname == "ncbi.nlm.nih.gov" and "/pmc" in parsed_url.path: 

321 # Extract PMC ID 

322 match = re.search(r"(PMC\d+)", url) 

323 if match: 323 ↛ 325line 323 didn't jump to line 325 because the condition on line 323 was always true

324 return f"pmc_{match.group(1)}.pdf" 

325 return f"pubmed_{timestamp}_{resource_id or 'unknown'}.pdf" 

326 

327 # Use fallback filename 

328 return fallback_filename 

329 

330 def delete_pdf(self, document: Document, session: Session) -> bool: 

331 """ 

332 Delete PDF for a document. 

333 

334 Args: 

335 document: Document model instance 

336 session: Database session 

337 

338 Returns: 

339 True if deletion succeeded 

340 """ 

341 storage_mode = document.storage_mode or self._infer_storage_mode( 

342 document 

343 ) 

344 

345 try: 

346 if storage_mode == "filesystem": 

347 if document.file_path and document.file_path != "metadata_only": 347 ↛ 352line 347 didn't jump to line 352 because the condition on line 347 was always true

348 file_path = self.library_root / document.file_path 

349 if file_path.exists(): 349 ↛ 352line 349 didn't jump to line 352 because the condition on line 349 was always true

350 file_path.unlink() 

351 logger.info(f"Deleted PDF file: {file_path}") 

352 document.file_path = None 

353 document.storage_mode = "none" 

354 return True 

355 

356 elif storage_mode == "database": 

357 blob = ( 

358 session.query(DocumentBlob) 

359 .filter_by(document_id=document.id) 

360 .first() 

361 ) 

362 if blob: 362 ↛ 365line 362 didn't jump to line 365 because the condition on line 362 was always true

363 session.delete(blob) 

364 logger.info(f"Deleted PDF blob for document {document.id}") 

365 document.storage_mode = "none" 

366 return True 

367 

368 return True # Nothing to delete for 'none' mode 

369 

370 except Exception: 

371 logger.exception(f"Failed to delete PDF for document {document.id}") 

372 return False 

373 

374 def upgrade_to_pdf( 

375 self, document: Document, pdf_content: bytes, session: Session 

376 ) -> bool: 

377 """ 

378 Upgrade a text-only document to include PDF storage. 

379 

380 If document already has a PDF stored, returns False (no action needed). 

381 If document is text-only, adds the PDF blob and updates storage_mode. 

382 

383 Args: 

384 document: Document model instance 

385 pdf_content: Raw PDF bytes 

386 session: Database session 

387 

388 Returns: 

389 True if PDF was added, False if already had PDF or failed 

390 """ 

391 # Only upgrade if document is currently text-only 

392 if document.storage_mode not in (None, "none"): 

393 logger.debug( 

394 f"Document {document.id} already has storage_mode={document.storage_mode}" 

395 ) 

396 return False 

397 

398 # Check if blob already exists (shouldn't happen, but be safe) 

399 existing_blob = ( 

400 session.query(DocumentBlob) 

401 .filter_by(document_id=document.id) 

402 .first() 

403 ) 

404 if existing_blob: 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 logger.debug(f"Document {document.id} already has a blob") 

406 return False 

407 

408 # Check file size 

409 file_size = len(pdf_content) 

410 if file_size > self.max_pdf_size_bytes: 

411 max_mb = self.max_pdf_size_bytes / (1024 * 1024) 

412 logger.warning( 

413 f"PDF size ({file_size / (1024 * 1024):.1f}MB) exceeds limit " 

414 f"({max_mb:.0f}MB), skipping upgrade" 

415 ) 

416 return False 

417 

418 try: 

419 # Add the PDF blob 

420 self._save_to_database(pdf_content, document, session) 

421 document.storage_mode = "database" 

422 document.file_path = None 

423 logger.info(f"Upgraded document {document.id} with PDF blob") 

424 return True 

425 except Exception: 

426 logger.exception( 

427 f"Failed to upgrade document {document.id} with PDF" 

428 ) 

429 return False