Coverage for src/local_deep_research/research_library/services/download_service.py: 95%

717 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2PDF Download Service for Research Library 

3 

4Handles downloading PDFs from various academic sources with: 

5- Deduplication using download tracker 

6- Source-specific download strategies (arXiv, PubMed, etc.) 

7- Progress tracking and error handling 

8- File organization and storage 

9""" 

10 

11import hashlib 

12import os 

13import re 

14import time 

15import uuid 

16from datetime import datetime, UTC 

17from pathlib import Path 

18from typing import Optional, Tuple 

19from urllib.parse import urlparse 

20 

21import requests 

22from loguru import logger 

23from sqlalchemy.orm import Session 

24import pdfplumber 

25from pypdf import PdfReader 

26 

27from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY 

28from ...database.models.download_tracker import ( 

29 DownloadAttempt, 

30 DownloadTracker, 

31) 

32from ...security import safe_get, sanitize_for_log 

33from ...security.path_validator import PathValidator 

34from ...database.models.library import ( 

35 Collection, 

36 Document as Document, 

37 DocumentStatus, 

38 DownloadQueue as LibraryDownloadQueue, 

39) 

40from .pdf_storage_manager import DEFAULT_MAX_PDF_SIZE_MB, PDFStorageManager 

41from ...database.models.research import ResearchResource 

42from ...database.library_init import get_source_type_id, get_default_library_id 

43from ...database.session_context import get_user_db_session, safe_rollback 

44from ...utilities.db_utils import get_settings_manager 

45from ...library.download_management import RetryManager 

46from ...config.paths import get_library_directory 

47from ..utils import ( 

48 ensure_in_collection, 

49 get_document_for_resource, 

50 get_url_hash, 

51 get_absolute_path_from_settings, 

52 is_downloadable_url, 

53) 

54 

55# Import our modular downloaders 

56from ..downloaders import ( 

57 ContentType, 

58 ArxivDownloader, 

59 PubMedDownloader, 

60 BioRxivDownloader, 

61 DirectPDFDownloader, 

62 SemanticScholarDownloader, 

63 OpenAlexDownloader, 

64 GenericDownloader, 

65) 

66 

67 

68class DownloadService: 

69 """Service for downloading and managing research PDFs.""" 

70 

71 def __init__(self, username: str, password: Optional[str] = None): 

72 """Initialize download service for a user. 

73 

74 Args: 

75 username: The username to download for 

76 password: Optional password for encrypted database access 

77 """ 

78 self.username = username 

79 self.password = password 

80 self.settings = get_settings_manager(username=username) 

81 self._closed = False 

82 

83 # Debug settings manager and user context 

84 logger.info( 

85 f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}" 

86 ) 

87 

88 # Get library path from settings (uses centralized path, respects LDR_DATA_DIR) 

89 storage_path_setting = self.settings.get_setting( 

90 "research_library.storage_path", 

91 str(get_library_directory()), 

92 ) 

93 logger.warning( 

94 f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})" 

95 ) 

96 

97 if storage_path_setting is None: 

98 logger.error( 

99 "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!" 

100 ) 

101 raise ValueError("Storage path setting cannot be None") 

102 

103 self.library_root = str( 

104 Path(os.path.expandvars(storage_path_setting)) 

105 .expanduser() 

106 .resolve() 

107 ) 

108 logger.warning( 

109 f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}" 

110 ) 

111 

112 # Create directory structure 

113 self._setup_directories() 

114 

115 # Initialize modular downloaders 

116 # DirectPDFDownloader first for efficiency with direct PDF links 

117 

118 # Get Semantic Scholar API key from settings 

119 semantic_scholar_api_key = self.settings.get_setting( 

120 "search.engine.web.semantic_scholar.api_key", "" 

121 ) 

122 

123 self.downloaders = [ 

124 DirectPDFDownloader(timeout=30), # Handle direct PDF links first 

125 SemanticScholarDownloader( 

126 timeout=30, 

127 api_key=semantic_scholar_api_key 

128 if semantic_scholar_api_key 

129 else None, 

130 ), 

131 OpenAlexDownloader( 

132 timeout=30 

133 ), # OpenAlex with API lookup (no key needed) 

134 ArxivDownloader(timeout=30), 

135 PubMedDownloader(timeout=30, rate_limit_delay=1.0), 

136 BioRxivDownloader(timeout=30), 

137 GenericDownloader(timeout=30), # Generic should be last (fallback) 

138 ] 

139 

140 # Initialize retry manager for smart failure tracking 

141 self.retry_manager = RetryManager(username, password) 

142 logger.info( 

143 f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}" 

144 ) 

145 

146 # PubMed rate limiting state 

147 self._pubmed_delay = 1.0 # 1 second delay for PubMed 

148 self._last_pubmed_request = 0.0 # Track last request time 

149 

150 def close(self): 

151 """Close all downloader resources.""" 

152 if self._closed: 

153 return 

154 self._closed = True 

155 

156 from ...utilities.resource_utils import safe_close 

157 

158 for downloader in self.downloaders: 

159 safe_close(downloader, "downloader") 

160 

161 # Close the settings manager's DB session to return the connection 

162 # to the pool. SettingsManager.close() is idempotent. 

163 safe_close(self.settings, "settings manager", allow_none=True) 

164 

165 # Clear references to allow garbage collection 

166 self.downloaders = [] 

167 self.retry_manager = None 

168 self.settings = None 

169 

170 def __enter__(self): 

171 """Enter context manager.""" 

172 return self 

173 

174 def __exit__(self, exc_type, exc_val, exc_tb): 

175 """Exit context manager, ensuring cleanup.""" 

176 self.close() 

177 return False 

178 

179 def _setup_directories(self): 

180 """Create library directory structure.""" 

181 # Only create the root and pdfs folder - flat structure 

182 paths = [ 

183 self.library_root, 

184 str(Path(self.library_root) / "pdfs"), 

185 ] 

186 for path in paths: 

187 Path(path).mkdir(parents=True, exist_ok=True) 

188 

189 def _normalize_url(self, url: str) -> str: 

190 """Normalize URL for consistent hashing.""" 

191 # Remove protocol variations 

192 url = re.sub(r"^https?://", "", url) 

193 # Remove www 

194 url = re.sub(r"^www\.", "", url) 

195 # Remove trailing slashes 

196 url = url.rstrip("/") 

197 # Sort query parameters 

198 if "?" in url: 

199 base, query = url.split("?", 1) 

200 params = sorted(query.split("&")) 

201 url = f"{base}?{'&'.join(params)}" 

202 return url.lower() 

203 

204 def _get_url_hash(self, url: str) -> str: 

205 """Generate SHA256 hash of normalized URL.""" 

206 normalized = self._normalize_url(url) 

207 return get_url_hash(normalized) 

208 

209 def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]: 

210 """ 

211 Check if URL is already downloaded. 

212 

213 Returns: 

214 Tuple of (is_downloaded, file_path) 

215 """ 

216 url_hash = self._get_url_hash(url) 

217 

218 with get_user_db_session(self.username, self.password) as session: 

219 tracker = ( 

220 session.query(DownloadTracker) 

221 .filter_by(url_hash=url_hash, is_downloaded=True) 

222 .first() 

223 ) 

224 

225 if tracker and tracker.file_path: 

226 # Compute absolute path and verify file still exists 

227 absolute_path = get_absolute_path_from_settings( 

228 tracker.file_path 

229 ) 

230 if absolute_path and absolute_path.is_file(): 

231 return True, str(absolute_path) 

232 if absolute_path: 

233 # File was deleted, mark as not downloaded 

234 tracker.is_downloaded = False 

235 session.commit() 

236 # If absolute_path is None, path was blocked - treat as not downloaded 

237 

238 return False, None 

239 

240 def get_text_content(self, resource_id: int) -> Optional[str]: 

241 """ 

242 Get text content for a research resource. 

243 

244 This will try to: 

245 1. Fetch text directly from APIs if available 

246 2. Extract text from downloaded PDF if exists 

247 3. Download PDF and extract text if not yet downloaded 

248 

249 Args: 

250 resource_id: ID of the research resource 

251 

252 Returns: 

253 Text content as string, or None if extraction failed 

254 """ 

255 with get_user_db_session(self.username, self.password) as session: 

256 resource = session.query(ResearchResource).get(resource_id) 

257 if not resource: 

258 logger.error(f"Resource {resource_id} not found") 

259 return None 

260 

261 url = resource.url 

262 

263 # Find appropriate downloader 

264 for downloader in self.downloaders: 

265 if downloader.can_handle(url): 

266 logger.info( 

267 f"Using {downloader.__class__.__name__} for text extraction from {url}" 

268 ) 

269 try: 

270 # Try to get text content 

271 text = downloader.download_text(url) 

272 if text: 

273 logger.info( 

274 f"Successfully extracted text for: {resource.title[:50]}" 

275 ) 

276 return text 

277 except Exception: 

278 logger.exception("Failed to extract text") 

279 break 

280 

281 logger.warning(f"Could not extract text for {url}") 

282 return None 

283 

284 def queue_research_downloads( 

285 self, research_id: str, collection_id: Optional[str] = None 

286 ) -> int: 

287 """ 

288 Queue all downloadable PDFs from a research session. 

289 

290 Args: 

291 research_id: The research session ID 

292 collection_id: Optional target collection ID (defaults to Library if not provided) 

293 

294 Returns: 

295 Number of items queued 

296 """ 

297 queued = 0 

298 

299 # Get default library collection if no collection_id provided 

300 if not collection_id: 

301 from ...database.library_init import get_default_library_id 

302 

303 collection_id = get_default_library_id(self.username, self.password) 

304 

305 with get_user_db_session(self.username, self.password) as session: 

306 # Get all resources for this research 

307 resources = ( 

308 session.query(ResearchResource) 

309 .filter_by(research_id=research_id) 

310 .all() 

311 ) 

312 

313 for resource in resources: 

314 if self._is_downloadable(resource): 

315 # Library resources linked via document_id are already done 

316 if resource.document_id: 

317 continue 

318 

319 # Check if already queued 

320 existing_queue = ( 

321 session.query(LibraryDownloadQueue) 

322 .filter_by( 

323 resource_id=resource.id, 

324 status=DocumentStatus.PENDING, 

325 ) 

326 .first() 

327 ) 

328 

329 # Check if already downloaded (trust the database status) 

330 existing_doc = ( 

331 session.query(Document) 

332 .filter_by( 

333 resource_id=resource.id, 

334 status=DocumentStatus.COMPLETED, 

335 ) 

336 .first() 

337 ) 

338 

339 # Queue if not already queued and not marked as completed 

340 if not existing_queue and not existing_doc: 340 ↛ 313line 340 didn't jump to line 313 because the condition on line 340 was always true

341 # Check one more time if ANY queue entry exists (regardless of status) 

342 any_queue = ( 

343 session.query(LibraryDownloadQueue) 

344 .filter_by(resource_id=resource.id) 

345 .first() 

346 ) 

347 

348 if any_queue: 

349 # Reset the existing queue entry 

350 any_queue.status = DocumentStatus.PENDING 

351 any_queue.research_id = research_id 

352 any_queue.collection_id = collection_id 

353 queued += 1 

354 else: 

355 # Add new queue entry 

356 queue_entry = LibraryDownloadQueue( 

357 resource_id=resource.id, 

358 research_id=research_id, 

359 collection_id=collection_id, 

360 priority=0, 

361 status=DocumentStatus.PENDING, 

362 ) 

363 session.add(queue_entry) 

364 queued += 1 

365 

366 session.commit() 

367 logger.info( 

368 f"Queued {queued} downloads for research {research_id} to collection {collection_id}" 

369 ) 

370 

371 return queued 

372 

373 def _is_downloadable(self, resource: ResearchResource) -> bool: 

374 """Check if a resource is likely downloadable as PDF. 

375 

376 Delegates to the consolidated is_downloadable_url() from utils. 

377 """ 

378 return is_downloadable_url(resource.url) 

379 

380 def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]: 

381 """ 

382 Download a specific resource. 

383 

384 Returns: 

385 Tuple of (success: bool, skip_reason: str or None) 

386 """ 

387 with get_user_db_session(self.username, self.password) as session: 

388 resource = session.query(ResearchResource).get(resource_id) 

389 if not resource: 

390 logger.error(f"Resource {resource_id} not found") 

391 return False, "Resource not found" 

392 

393 # Check if already downloaded (trust the database after sync) 

394 existing_doc = ( 

395 session.query(Document) 

396 .filter_by( 

397 resource_id=resource_id, status=DocumentStatus.COMPLETED 

398 ) 

399 .first() 

400 ) 

401 

402 if existing_doc: 

403 logger.info( 

404 "Resource already downloaded (according to database)" 

405 ) 

406 return True, None 

407 

408 # Get collection_id from queue entry if it exists 

409 queue_entry = ( 

410 session.query(LibraryDownloadQueue) 

411 .filter_by(resource_id=resource_id) 

412 .first() 

413 ) 

414 collection_id = ( 

415 queue_entry.collection_id 

416 if queue_entry and queue_entry.collection_id 

417 else None 

418 ) 

419 

420 # Create download tracker entry 

421 url_hash = self._get_url_hash(resource.url) 

422 tracker = ( 

423 session.query(DownloadTracker) 

424 .filter_by(url_hash=url_hash) 

425 .first() 

426 ) 

427 

428 if not tracker: 

429 tracker = DownloadTracker( 

430 url=resource.url, 

431 url_hash=url_hash, 

432 first_resource_id=resource.id, 

433 is_downloaded=False, 

434 ) 

435 session.add(tracker) 

436 session.commit() 

437 

438 # Attempt download 

439 success, skip_reason, status_code = self._download_pdf( 

440 resource, tracker, session, collection_id 

441 ) 

442 

443 # Record attempt with retry manager for smart failure tracking 

444 self.retry_manager.record_attempt( 

445 resource_id=resource.id, 

446 result=(success, skip_reason), 

447 status_code=status_code, 

448 url=resource.url, 

449 details=skip_reason 

450 or ( 

451 "Successfully downloaded" if success else "Download failed" 

452 ), 

453 session=session, 

454 ) 

455 

456 # Update queue status if exists 

457 queue_entry = ( 

458 session.query(LibraryDownloadQueue) 

459 .filter_by(resource_id=resource_id) 

460 .first() 

461 ) 

462 

463 if queue_entry: 

464 queue_entry.status = ( 

465 DocumentStatus.COMPLETED 

466 if success 

467 else DocumentStatus.FAILED 

468 ) 

469 queue_entry.completed_at = datetime.now(UTC) 

470 

471 session.commit() 

472 

473 # Trigger auto-indexing for successfully downloaded documents 

474 if success and self.password: 

475 try: 

476 from ..routes.rag_routes import trigger_auto_index 

477 from ...database.library_init import get_default_library_id 

478 

479 # Get the document that was just created 

480 doc = ( 

481 session.query(Document) 

482 .filter_by(resource_id=resource_id) 

483 .order_by(Document.created_at.desc()) 

484 .first() 

485 ) 

486 if doc: 

487 # Use collection_id from queue entry or default Library 

488 # NB: pass username string, not the SQLAlchemy session 

489 target_collection = ( 

490 collection_id 

491 or get_default_library_id( 

492 self.username, self.password 

493 ) 

494 ) 

495 if target_collection: 495 ↛ anywhereline 495 didn't jump anywhere: it always raised an exception.

496 trigger_auto_index( 

497 [doc.id], 

498 target_collection, 

499 self.username, 

500 self.password, 

501 ) 

502 except Exception: 

503 logger.exception("Failed to trigger auto-indexing") 

504 

505 return success, skip_reason 

506 

507 def _download_pdf( 

508 self, 

509 resource: ResearchResource, 

510 tracker: DownloadTracker, 

511 session: Session, 

512 collection_id: Optional[str] = None, 

513 ) -> Tuple[bool, Optional[str], Optional[int]]: 

514 """ 

515 Perform the actual PDF download. 

516 

517 Args: 

518 resource: The research resource to download 

519 tracker: Download tracker for this URL 

520 session: Database session 

521 collection_id: Optional target collection ID (defaults to Library if not provided) 

522 

523 Returns: 

524 Tuple of (success: bool, skip_reason: Optional[str], status_code: Optional[int]) 

525 """ 

526 url = resource.url 

527 

528 # Log attempt 

529 attempt = DownloadAttempt( 

530 url_hash=tracker.url_hash, 

531 attempt_number=tracker.download_attempts.count() + 1 

532 if hasattr(tracker, "download_attempts") 

533 else 1, 

534 attempted_at=datetime.now(UTC), 

535 ) 

536 session.add(attempt) 

537 

538 try: 

539 # Use modular downloaders with detailed skip reasons 

540 pdf_content = None 

541 downloader_used = None 

542 skip_reason = None 

543 status_code = None 

544 

545 for downloader in self.downloaders: 

546 if downloader.can_handle(url): 

547 logger.info( 

548 f"Using {downloader.__class__.__name__} for {url}" 

549 ) 

550 result = downloader.download_with_result( 

551 url, ContentType.PDF 

552 ) 

553 downloader_used = downloader.__class__.__name__ 

554 

555 if result.is_success and result.content: 

556 pdf_content = result.content 

557 status_code = result.status_code 

558 break 

559 if result.skip_reason: 559 ↛ 545line 559 didn't jump to line 545 because the condition on line 559 was always true

560 skip_reason = result.skip_reason 

561 status_code = result.status_code 

562 logger.info(f"Download skipped: {skip_reason}") 

563 # Keep trying other downloaders unless it's the GenericDownloader 

564 if isinstance(downloader, GenericDownloader): 

565 break 

566 

567 if not downloader_used: 

568 logger.error(f"No downloader found for {url}") 

569 skip_reason = "No compatible downloader available" 

570 

571 if not pdf_content: 

572 error_msg = skip_reason or "Failed to download PDF content" 

573 # Store skip reason in attempt for retrieval 

574 attempt.error_message = error_msg 

575 attempt.succeeded = False 

576 session.commit() 

577 logger.info(f"Download failed with reason: {error_msg}") 

578 return False, error_msg, status_code 

579 

580 # Get PDF storage mode setting 

581 pdf_storage_mode = self.settings.get_setting( 

582 "research_library.pdf_storage_mode", "none" 

583 ) 

584 max_pdf_size_mb = int( 

585 self.settings.get_setting( 

586 "research_library.max_pdf_size_mb", 

587 DEFAULT_MAX_PDF_SIZE_MB, 

588 ) 

589 ) 

590 logger.info( 

591 f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}" 

592 ) 

593 

594 # Update tracker 

595 import hashlib 

596 

597 tracker.file_hash = hashlib.sha256(pdf_content).hexdigest() 

598 tracker.file_size = len(pdf_content) 

599 tracker.is_downloaded = True 

600 tracker.downloaded_at = datetime.now(UTC) 

601 

602 # Initialize PDF storage manager 

603 pdf_storage_manager = PDFStorageManager( 

604 library_root=self.library_root, 

605 storage_mode=pdf_storage_mode, 

606 max_pdf_size_mb=max_pdf_size_mb, 

607 ) 

608 

609 # Update attempt with success info 

610 attempt.succeeded = True 

611 

612 # Check if library document already exists 

613 existing_doc = get_document_for_resource(session, resource) 

614 

615 if existing_doc: 

616 # Update existing document. Only replace document_hash when 

617 # transitioning from FAILED — that's the placeholder hash from 

618 # _record_failed_text_extraction. For any other prior state 

619 # the hash is already a real content hash and clobbering it 

620 # risks UNIQUE-constraint collisions (issue #3827). 

621 was_failed = existing_doc.status == DocumentStatus.FAILED 

622 if was_failed: 622 ↛ 623line 622 didn't jump to line 623 because the condition on line 622 was never true

623 existing_doc.document_hash = tracker.file_hash 

624 existing_doc.file_size = len(pdf_content) 

625 existing_doc.status = DocumentStatus.COMPLETED 

626 existing_doc.processed_at = datetime.now(UTC) 

627 

628 # Save PDF using storage manager (updates storage_mode and file_path) 

629 file_path_result, _ = pdf_storage_manager.save_pdf( 

630 pdf_content=pdf_content, 

631 document=existing_doc, 

632 session=session, 

633 filename=f"{resource.id}.pdf", 

634 url=url, 

635 resource_id=resource.id, 

636 ) 

637 

638 # Update tracker 

639 tracker.file_path = ( 

640 file_path_result if file_path_result else None 

641 ) 

642 tracker.file_name = ( 

643 Path(file_path_result).name 

644 if file_path_result and file_path_result != "database" 

645 else None 

646 ) 

647 else: 

648 # Get source type ID for research downloads 

649 try: 

650 source_type_id = get_source_type_id( 

651 self.username, "research_download", self.password 

652 ) 

653 # Use provided collection_id or default to Library 

654 library_collection_id = ( 

655 collection_id 

656 or get_default_library_id(self.username, self.password) 

657 ) 

658 except Exception: 

659 logger.exception( 

660 "Failed to get source type or library collection" 

661 ) 

662 raise 

663 

664 # Create new unified document entry 

665 doc_id = str(uuid.uuid4()) 

666 doc = Document( 

667 id=doc_id, 

668 source_type_id=source_type_id, 

669 resource_id=resource.id, 

670 research_id=resource.research_id, 

671 document_hash=tracker.file_hash, 

672 original_url=url, 

673 file_size=len(pdf_content), 

674 file_type="pdf", 

675 mime_type="application/pdf", 

676 title=resource.title, 

677 status=DocumentStatus.COMPLETED, 

678 processed_at=datetime.now(UTC), 

679 storage_mode=pdf_storage_mode, 

680 ) 

681 session.add(doc) 

682 session.flush() # Ensure doc.id is available for blob storage 

683 

684 # Save PDF using storage manager (updates storage_mode and file_path) 

685 file_path_result, _ = pdf_storage_manager.save_pdf( 

686 pdf_content=pdf_content, 

687 document=doc, 

688 session=session, 

689 filename=f"{resource.id}.pdf", 

690 url=url, 

691 resource_id=resource.id, 

692 ) 

693 

694 # Update tracker 

695 tracker.file_path = ( 

696 file_path_result if file_path_result else None 

697 ) 

698 tracker.file_name = ( 

699 Path(file_path_result).name 

700 if file_path_result and file_path_result != "database" 

701 else None 

702 ) 

703 

704 # Link document to default Library collection 

705 ensure_in_collection(session, doc_id, library_collection_id) 

706 

707 # Update attempt 

708 attempt.succeeded = True 

709 attempt.bytes_downloaded = len(pdf_content) 

710 

711 if pdf_storage_mode == "database": 

712 logger.info( 

713 f"Successfully stored PDF in database: {resource.url}" 

714 ) 

715 elif pdf_storage_mode == "filesystem": 

716 logger.info(f"Successfully downloaded: {tracker.file_path}") 

717 else: 

718 logger.info(f"Successfully extracted text from: {resource.url}") 

719 

720 # Automatically extract and save text after successful PDF download 

721 try: 

722 logger.info( 

723 f"Extracting text from downloaded PDF for: {resource.title[:50]}" 

724 ) 

725 text = self._extract_text_from_pdf(pdf_content) 

726 

727 if text: 

728 # Get the document ID we just created/updated 

729 pdf_doc = get_document_for_resource(session, resource) 

730 pdf_document_id = pdf_doc.id if pdf_doc else None 

731 

732 # Save text to encrypted database 

733 self._save_text_with_db( 

734 resource=resource, 

735 text=text, 

736 session=session, 

737 extraction_method="pdf_extraction", 

738 extraction_source="local_pdf", 

739 pdf_document_id=pdf_document_id, 

740 ) 

741 logger.info( 

742 f"Successfully extracted and saved text for: {resource.title[:50]}" 

743 ) 

744 else: 

745 logger.warning( 

746 f"Text extraction returned empty text for: {resource.title[:50]}" 

747 ) 

748 except Exception: 

749 logger.exception( 

750 "Failed to extract text from PDF, but PDF download succeeded" 

751 ) 

752 # Don't fail the entire download if text extraction fails 

753 

754 return True, None, status_code 

755 

756 except Exception as e: 

757 logger.exception(f"Download failed for {url}") 

758 attempt.succeeded = False 

759 attempt.error_type = type(e).__name__ 

760 attempt.error_message = sanitize_for_log(str(e), max_length=200) 

761 tracker.is_accessible = False 

762 # Sanitize error message before returning to API 

763 safe_error = attempt.error_message 

764 return False, safe_error, None 

765 

766 def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]: 

767 """ 

768 Extract text from PDF content using multiple methods for best results. 

769 

770 Args: 

771 pdf_content: Raw PDF bytes 

772 

773 Returns: 

774 Extracted text or None if extraction fails 

775 """ 

776 try: 

777 # First try with pdfplumber (better for complex layouts) 

778 import io 

779 

780 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf: 

781 text_parts = [] 

782 for page in pdf.pages: 

783 page_text = page.extract_text() 

784 if page_text: 

785 text_parts.append(page_text) 

786 

787 if text_parts: 

788 return "\n\n".join(text_parts) 

789 

790 # Fallback to PyPDF if pdfplumber fails 

791 reader = PdfReader(io.BytesIO(pdf_content)) 

792 text_parts = [] 

793 for page in reader.pages: 

794 text = page.extract_text() 

795 if text: 

796 text_parts.append(text) 

797 

798 if text_parts: 

799 return "\n\n".join(text_parts) 

800 

801 logger.warning("No text could be extracted from PDF") 

802 return None 

803 

804 except Exception: 

805 logger.exception("Failed to extract text from PDF") 

806 return None 

807 

808 def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]: 

809 """ 

810 Download resource and extract text to encrypted database. 

811 

812 Args: 

813 resource_id: ID of the resource to download 

814 

815 Returns: 

816 Tuple of (success, error_message) 

817 """ 

818 with get_user_db_session(self.username, self.password) as session: 

819 # Get the resource 

820 resource = ( 

821 session.query(ResearchResource) 

822 .filter_by(id=resource_id) 

823 .first() 

824 ) 

825 if not resource: 

826 return False, "Resource not found" 

827 

828 # Handle library resources — content already in the database 

829 # (local-only, no network — skip retry checks) 

830 if resource.source_type == "library" or ( 

831 resource.url and resource.url.startswith("/library/document/") 

832 ): 

833 return self._try_library_text_extraction(session, resource) 

834 

835 # Try existing text in database 

836 result = self._try_existing_text(session, resource_id) 

837 if result is not None: 

838 return result 

839 

840 # Try legacy text files on disk 

841 result = self._try_legacy_text_file(session, resource, resource_id) 

842 if result is not None: 842 ↛ 843line 842 didn't jump to line 843 because the condition on line 842 was never true

843 return result 

844 

845 # Try extracting from existing PDF 

846 result = self._try_existing_pdf_extraction( 

847 session, resource, resource_id 

848 ) 

849 if result is not None: 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true

850 return result 

851 

852 # Check retry eligibility before network-dependent extraction 

853 if self.retry_manager: 853 ↛ 862line 853 didn't jump to line 862 because the condition on line 853 was always true

854 decision = self.retry_manager.should_retry_resource(resource_id) 

855 if not decision.can_retry: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true

856 logger.info( 

857 f"Skipping resource {resource_id}: {decision.reason}" 

858 ) 

859 return False, decision.reason 

860 

861 # Try API text extraction 

862 result = self._try_api_text_extraction(session, resource) 

863 if result is not None: 

864 self._record_retry_attempt(resource, result, session) 

865 return result 

866 

867 # Fallback: Download PDF and extract 

868 result = self._fallback_pdf_extraction(session, resource) 

869 self._record_retry_attempt(resource, result, session) 

870 return result 

871 

872 def _record_retry_attempt( 

873 self, resource, result: Tuple[bool, Optional[str]], session=None 

874 ) -> None: 

875 """Record a download attempt with the retry manager.""" 

876 if not self.retry_manager: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 return 

878 self.retry_manager.record_attempt( 

879 resource_id=resource.id, 

880 result=result, 

881 url=resource.url or "", 

882 details=result[1] 

883 or ( 

884 "Successfully extracted text" 

885 if result[0] 

886 else "Text extraction failed" 

887 ), 

888 session=session, 

889 ) 

890 

891 def _try_library_text_extraction( 

892 self, session, resource 

893 ) -> Tuple[bool, Optional[str]]: 

894 """Handle library resources — content already exists in local database. 

895 

896 Returns: 

897 Tuple of (success, error_message). On success: (True, None). 

898 On failure: (False, description of what went wrong). 

899 """ 

900 # 1. Extract document UUID from metadata (authoritative) or URL (fallback) 

901 doc_id = None 

902 metadata = (resource.resource_metadata or {}).get("original_data", {}) 

903 if isinstance(metadata, dict): 

904 meta_inner = metadata.get("metadata", {}) 

905 if isinstance(meta_inner, dict): 

906 doc_id = meta_inner.get("source_id") or meta_inner.get( 

907 "document_id" 

908 ) 

909 

910 if not doc_id and resource.url: 

911 # Parse from /library/document/{uuid} or /library/document/{uuid}/pdf 

912 match = re.match(r"^/library/document/([^/]+)", resource.url) 

913 if match: 913 ↛ 916line 913 didn't jump to line 916 because the condition on line 913 was always true

914 doc_id = match.group(1) 

915 

916 if not doc_id: 

917 return False, "Could not extract library document ID" 

918 

919 # 2. Query the existing Document by its primary key (UUID string) 

920 doc = session.query(Document).filter_by(id=doc_id).first() 

921 if not doc: 

922 return False, f"Library document {doc_id} not found in database" 

923 

924 # 3. If text already exists and extraction succeeded, return success 

925 if ( 

926 doc.text_content 

927 and doc.extraction_method 

928 and doc.extraction_method != "failed" 

929 ): 

930 logger.info(f"Library document {doc_id} already has text content") 

931 resource.document_id = doc.id 

932 session.commit() 

933 return True, None 

934 

935 # 4. Try extracting text from stored PDF 

936 pdf_storage_mode = self.settings.get_setting( 

937 "research_library.pdf_storage_mode", "none" 

938 ) 

939 pdf_manager = PDFStorageManager( 

940 library_root=self.library_root, 

941 storage_mode=pdf_storage_mode, 

942 ) 

943 pdf_content = pdf_manager.load_pdf(doc, session) 

944 

945 if not pdf_content: 

946 return ( 

947 False, 

948 f"Library document {doc_id} has no text or PDF content", 

949 ) 

950 

951 text = self._extract_text_from_pdf(pdf_content) 

952 if not text: 

953 return ( 

954 False, 

955 f"Failed to extract text from library document {doc_id} PDF", 

956 ) 

957 

958 # 5. Update Document directly (don't use _save_text_with_db — it 

959 # queries by resource_id which mismatches for library docs) 

960 doc.text_content = text 

961 doc.character_count = len(text) 

962 doc.word_count = len(text.split()) 

963 doc.extraction_method = "pdf_extraction" 

964 doc.extraction_source = "pdfplumber" 

965 doc.extraction_quality = "medium" 

966 

967 resource.document_id = doc.id 

968 session.commit() 

969 

970 logger.info( 

971 f"Extracted text from library document {doc_id} " 

972 f"({doc.word_count} words)" 

973 ) 

974 return True, None 

975 

976 def _try_existing_text( 

977 self, session, resource_id: int 

978 ) -> Optional[Tuple[bool, Optional[str]]]: 

979 """Check if text already exists in database (in Document.text_content).""" 

980 existing_doc = ( 

981 session.query(Document).filter_by(resource_id=resource_id).first() 

982 ) 

983 

984 if not existing_doc: 

985 return None 

986 

987 # Check if text content exists and extraction was successful 

988 if ( 

989 existing_doc.text_content 

990 and existing_doc.extraction_method 

991 and existing_doc.extraction_method != "failed" 

992 ): 

993 logger.info( 

994 f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

995 ) 

996 return True, None 

997 

998 # No text content or failed extraction 

999 logger.debug( 

1000 f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

1001 ) 

1002 return None # Fall through to re-extraction 

1003 

1004 def _try_legacy_text_file( 

1005 self, session, resource, resource_id: int 

1006 ) -> Optional[Tuple[bool, Optional[str]]]: 

1007 """Check for legacy text files on disk.""" 

1008 txt_path = Path(self.library_root) / "txt" 

1009 existing_files = ( 

1010 list(txt_path.glob(f"*_{resource_id}.txt")) 

1011 if txt_path.exists() 

1012 else [] 

1013 ) 

1014 

1015 if not existing_files: 

1016 return None 

1017 

1018 logger.info(f"Text file already exists on disk: {existing_files[0]}") 

1019 self._create_text_document_record( 

1020 session, 

1021 resource, 

1022 existing_files[0], 

1023 extraction_method="unknown", 

1024 extraction_source="legacy_file", 

1025 ) 

1026 session.commit() 

1027 return True, None 

1028 

1029 def _try_existing_pdf_extraction( 

1030 self, session, resource, resource_id: int 

1031 ) -> Optional[Tuple[bool, Optional[str]]]: 

1032 """Try extracting text from existing PDF in database.""" 

1033 pdf_document = ( 

1034 session.query(Document).filter_by(resource_id=resource_id).first() 

1035 ) 

1036 

1037 if not pdf_document or pdf_document.status != "completed": 

1038 return None 

1039 

1040 # Validate path to prevent path traversal attacks 

1041 if ( 

1042 not pdf_document.file_path 

1043 or pdf_document.file_path in FILE_PATH_SENTINELS 

1044 ): 

1045 return None 

1046 try: 

1047 safe_path = PathValidator.validate_safe_path( 

1048 pdf_document.file_path, str(self.library_root) 

1049 ) 

1050 pdf_path = Path(safe_path) 

1051 except ValueError: 

1052 logger.warning(f"Path traversal blocked: {pdf_document.file_path}") 

1053 return None 

1054 if not pdf_path.is_file(): 

1055 return None 

1056 

1057 logger.info(f"Found existing PDF, extracting text from: {pdf_path}") 

1058 try: 

1059 with open(pdf_path, "rb") as f: 

1060 pdf_content = f.read() 

1061 text = self._extract_text_from_pdf(pdf_content) 

1062 

1063 if not text: 

1064 return None 

1065 

1066 self._save_text_with_db( 

1067 resource, 

1068 text, 

1069 session, 

1070 extraction_method="pdf_extraction", 

1071 extraction_source="pdfplumber", 

1072 pdf_document_id=pdf_document.id, 

1073 ) 

1074 session.commit() 

1075 return True, None 

1076 

1077 except Exception: 

1078 logger.exception( 

1079 f"Failed to extract text from existing PDF: {pdf_path}" 

1080 ) 

1081 return None # Fall through to other methods 

1082 

1083 def _try_api_text_extraction( 

1084 self, session, resource 

1085 ) -> Optional[Tuple[bool, Optional[str]]]: 

1086 """Try direct API text extraction.""" 

1087 logger.info( 

1088 f"Attempting direct API text extraction from: {resource.url}" 

1089 ) 

1090 

1091 downloader = self._get_downloader(resource.url) 

1092 if not downloader: 

1093 return None 

1094 

1095 result = downloader.download_with_result(resource.url, ContentType.TEXT) 

1096 

1097 if not result.is_success or not result.content: 

1098 return None 

1099 

1100 # Decode text content 

1101 text = ( 

1102 result.content.decode("utf-8", errors="ignore") 

1103 if isinstance(result.content, bytes) 

1104 else result.content 

1105 ) 

1106 

1107 # Determine extraction source 

1108 extraction_source = "unknown" 

1109 if isinstance(downloader, ArxivDownloader): 

1110 extraction_source = "arxiv_api" 

1111 elif isinstance(downloader, PubMedDownloader): 1111 ↛ 1112line 1111 didn't jump to line 1112 because the condition on line 1111 was never true

1112 extraction_source = "pubmed_api" 

1113 

1114 try: 

1115 self._save_text_with_db( 

1116 resource, 

1117 text, 

1118 session, 

1119 extraction_method="native_api", 

1120 extraction_source=extraction_source, 

1121 ) 

1122 session.commit() 

1123 logger.info( 

1124 f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{resource.title[:50]}...'" 

1125 ) 

1126 return True, None 

1127 except Exception as e: 

1128 logger.exception(f"Failed to save text for resource {resource.id}") 

1129 # Sanitize error message before returning to API 

1130 safe_error = sanitize_for_log( 

1131 f"Failed to save text: {str(e)}", max_length=200 

1132 ) 

1133 return False, safe_error 

1134 

1135 def _fallback_pdf_extraction( 

1136 self, session, resource 

1137 ) -> Tuple[bool, Optional[str]]: 

1138 """Fallback: Download PDF to memory and extract text.""" 

1139 logger.info( 

1140 f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}" 

1141 ) 

1142 

1143 downloader = self._get_downloader(resource.url) 

1144 if not downloader: 

1145 error_msg = "No compatible downloader found" 

1146 logger.warning( 

1147 f"✗ FAILED: {error_msg} for '{resource.title[:50]}...'" 

1148 ) 

1149 self._record_failed_text_extraction( 

1150 session, resource, error=error_msg 

1151 ) 

1152 session.commit() 

1153 return False, error_msg 

1154 

1155 result = downloader.download_with_result(resource.url, ContentType.PDF) 

1156 

1157 if not result.is_success or not result.content: 

1158 error_msg = result.skip_reason or "Failed to download PDF" 

1159 logger.warning( 

1160 f"✗ FAILED: Could not download PDF for '{resource.title[:50]}...' | Error: {error_msg}" 

1161 ) 

1162 self._record_failed_text_extraction( 

1163 session, resource, error=f"PDF download failed: {error_msg}" 

1164 ) 

1165 session.commit() 

1166 return False, f"PDF extraction failed: {error_msg}" 

1167 

1168 # Extract text from PDF 

1169 text = self._extract_text_from_pdf(result.content) 

1170 if not text: 

1171 error_msg = "PDF text extraction returned empty text" 

1172 logger.warning( 

1173 f"Failed to extract text from PDF for: {resource.url}" 

1174 ) 

1175 self._record_failed_text_extraction( 

1176 session, resource, error=error_msg 

1177 ) 

1178 session.commit() 

1179 return False, error_msg 

1180 

1181 try: 

1182 self._save_text_with_db( 

1183 resource, 

1184 text, 

1185 session, 

1186 extraction_method="pdf_extraction", 

1187 extraction_source="pdfplumber_fallback", 

1188 ) 

1189 session.commit() 

1190 logger.info( 

1191 f"✓ SUCCESS: Extracted text from '{resource.title[:50]}...'" 

1192 ) 

1193 return True, None 

1194 except Exception as e: 

1195 logger.exception(f"Failed to save text for resource {resource.id}") 

1196 # Sanitize error message before returning to API 

1197 safe_error = sanitize_for_log( 

1198 f"Failed to save text: {str(e)}", max_length=200 

1199 ) 

1200 return False, safe_error 

1201 

1202 def _get_downloader(self, url: str): 

1203 """ 

1204 Get the appropriate downloader for a URL. 

1205 

1206 Args: 

1207 url: The URL to download from 

1208 

1209 Returns: 

1210 The appropriate downloader instance or None 

1211 """ 

1212 for downloader in self.downloaders: 

1213 if downloader.can_handle(url): 

1214 return downloader 

1215 return None 

1216 

1217 def _download_generic(self, url: str) -> Optional[bytes]: 

1218 """Generic PDF download method.""" 

1219 try: 

1220 headers = { 

1221 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1222 } 

1223 response = safe_get( 

1224 url, headers=headers, timeout=30, allow_redirects=True 

1225 ) 

1226 response.raise_for_status() 

1227 

1228 # Verify it's a PDF 

1229 content_type = response.headers.get("Content-Type", "") 

1230 if ( 

1231 "pdf" not in content_type.lower() 

1232 and not response.content.startswith(b"%PDF") 

1233 ): 

1234 logger.warning(f"Response is not a PDF: {content_type}") 

1235 return None 

1236 

1237 return response.content 

1238 

1239 except Exception: 

1240 logger.exception("Generic download failed") 

1241 return None 

1242 

1243 def _download_arxiv(self, url: str) -> Optional[bytes]: 

1244 """Download from arXiv.""" 

1245 try: 

1246 # Convert abstract URL to PDF URL 

1247 pdf_url = url.replace("abs", "pdf") 

1248 if not pdf_url.endswith(".pdf"): 

1249 pdf_url += ".pdf" 

1250 

1251 return self._download_generic(pdf_url) 

1252 except Exception: 

1253 logger.exception("arXiv download failed") 

1254 return None 

1255 

1256 def _try_europe_pmc(self, pmid: str) -> Optional[bytes]: 

1257 """Try downloading from Europe PMC which often has better PDF availability.""" 

1258 try: 

1259 # Europe PMC API is more reliable for PDFs 

1260 # Check if PDF is available 

1261 api_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json" 

1262 response = safe_get(api_url, timeout=10) 

1263 

1264 if response.status_code == 200: 

1265 data = response.json() 

1266 results = data.get("resultList", {}).get("result", []) 

1267 

1268 if results: 

1269 article = results[0] 

1270 # Check if article has open access PDF 

1271 if ( 

1272 article.get("isOpenAccess") == "Y" 

1273 and article.get("hasPDF") == "Y" 

1274 ): 

1275 pmcid = article.get("pmcid") 

1276 if pmcid: 

1277 # Europe PMC PDF URL 

1278 pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf" 

1279 logger.info( 

1280 f"Found Europe PMC PDF for PMID {pmid}: {pmcid}" 

1281 ) 

1282 

1283 headers = { 

1284 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1285 } 

1286 

1287 pdf_response = safe_get( 

1288 pdf_url, 

1289 headers=headers, 

1290 timeout=30, 

1291 allow_redirects=True, 

1292 ) 

1293 if pdf_response.status_code == 200: 1293 ↛ 1305line 1293 didn't jump to line 1305 because the condition on line 1293 was always true

1294 content_type = pdf_response.headers.get( 

1295 "content-type", "" 

1296 ) 

1297 if ( 1297 ↛ 1305line 1297 didn't jump to line 1305 because the condition on line 1297 was always true

1298 "pdf" in content_type.lower() 

1299 or len(pdf_response.content) > 1000 

1300 ): 

1301 return pdf_response.content 

1302 except Exception as e: 

1303 logger.debug(f"Europe PMC download failed: {e}") 

1304 

1305 return None 

1306 

1307 def _download_pubmed(self, url: str) -> Optional[bytes]: 

1308 """Download from PubMed/PubMed Central with rate limiting.""" 

1309 try: 

1310 # Apply rate limiting for PubMed requests 

1311 current_time = time.time() 

1312 time_since_last = current_time - self._last_pubmed_request 

1313 if time_since_last < self._pubmed_delay: 

1314 sleep_time = self._pubmed_delay - time_since_last 

1315 logger.debug( 

1316 f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request" 

1317 ) 

1318 time.sleep(sleep_time) 

1319 self._last_pubmed_request = time.time() 

1320 

1321 # If it's already a PMC article, download directly 

1322 if "/articles/PMC" in url: 

1323 pmc_match = re.search(r"(PMC\d+)", url) 

1324 if pmc_match: 1324 ↛ 1528line 1324 didn't jump to line 1528 because the condition on line 1324 was always true

1325 pmc_id = pmc_match.group(1) 

1326 

1327 # Try Europe PMC (more reliable) 

1328 europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf" 

1329 headers = { 

1330 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1331 } 

1332 

1333 try: 

1334 response = safe_get( 

1335 europe_url, 

1336 headers=headers, 

1337 timeout=30, 

1338 allow_redirects=True, 

1339 ) 

1340 if response.status_code == 200: 1340 ↛ 1528line 1340 didn't jump to line 1528 because the condition on line 1340 was always true

1341 content_type = response.headers.get( 

1342 "content-type", "" 

1343 ) 

1344 if ( 1344 ↛ 1528line 1344 didn't jump to line 1528 because the condition on line 1344 was always true

1345 "pdf" in content_type.lower() 

1346 or len(response.content) > 1000 

1347 ): 

1348 logger.info( 

1349 f"Downloaded PDF via Europe PMC for {pmc_id}" 

1350 ) 

1351 return response.content 

1352 except Exception as e: 

1353 logger.debug(f"Direct Europe PMC download failed: {e}") 

1354 return None 

1355 

1356 # If it's a regular PubMed URL, try to find PMC version 

1357 elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov": 

1358 # Extract PMID from URL 

1359 pmid_match = re.search(r"/(\d+)/?", url) 

1360 if pmid_match: 

1361 pmid = pmid_match.group(1) 

1362 logger.info(f"Attempting to download PDF for PMID: {pmid}") 

1363 

1364 # Try Europe PMC first (more reliable) 

1365 pdf_content = self._try_europe_pmc(pmid) 

1366 if pdf_content: 

1367 return pdf_content 

1368 

1369 # First try using NCBI E-utilities API to find PMC ID 

1370 try: 

1371 # Use elink to convert PMID to PMCID 

1372 elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi" 

1373 params = { 

1374 "dbfrom": "pubmed", 

1375 "db": "pmc", 

1376 "id": pmid, 

1377 "retmode": "json", 

1378 } 

1379 

1380 api_response = safe_get( 

1381 elink_url, params=params, timeout=10 

1382 ) 

1383 if api_response.status_code == 200: 1383 ↛ 1466line 1383 didn't jump to line 1466 because the condition on line 1383 was always true

1384 data = api_response.json() 

1385 # Parse the response to find PMC ID 

1386 link_sets = data.get("linksets", []) 

1387 if link_sets and "linksetdbs" in link_sets[0]: 

1388 for linksetdb in link_sets[0]["linksetdbs"]: 1388 ↛ 1466line 1388 didn't jump to line 1466 because the loop on line 1388 didn't complete

1389 if linksetdb.get( 1389 ↛ 1388line 1389 didn't jump to line 1388 because the condition on line 1389 was always true

1390 "dbto" 

1391 ) == "pmc" and linksetdb.get("links"): 

1392 pmc_id_num = linksetdb["links"][0] 

1393 # Now fetch PMC details to get the correct PMC ID format 

1394 esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi" 

1395 summary_params = { 

1396 "db": "pmc", 

1397 "id": pmc_id_num, 

1398 "retmode": "json", 

1399 } 

1400 summary_response = safe_get( 

1401 esummary_url, 

1402 params=summary_params, 

1403 timeout=10, 

1404 ) 

1405 if summary_response.status_code == 200: 1405 ↛ 1388line 1405 didn't jump to line 1388 because the condition on line 1405 was always true

1406 summary_data = ( 

1407 summary_response.json() 

1408 ) 

1409 result = summary_data.get( 

1410 "result", {} 

1411 ).get(str(pmc_id_num), {}) 

1412 if result: 1412 ↛ 1388line 1412 didn't jump to line 1388 because the condition on line 1412 was always true

1413 # PMC IDs in the API don't have the "PMC" prefix 

1414 pmc_id = f"PMC{pmc_id_num}" 

1415 logger.info( 

1416 f"Found PMC ID via API: {pmc_id} for PMID: {pmid}" 

1417 ) 

1418 

1419 # Try Europe PMC with the PMC ID 

1420 europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf" 

1421 

1422 time.sleep(self._pubmed_delay) 

1423 

1424 headers = { 

1425 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1426 } 

1427 

1428 try: 

1429 response = safe_get( 

1430 europe_url, 

1431 headers=headers, 

1432 timeout=30, 

1433 allow_redirects=True, 

1434 ) 

1435 if ( 1435 ↛ 1388line 1435 didn't jump to line 1388 because the condition on line 1435 was always true

1436 response.status_code 

1437 == 200 

1438 ): 

1439 content_type = response.headers.get( 

1440 "content-type", "" 

1441 ) 

1442 if ( 1442 ↛ 1388line 1442 didn't jump to line 1388 because the condition on line 1442 was always true

1443 "pdf" 

1444 in content_type.lower() 

1445 or len( 

1446 response.content 

1447 ) 

1448 > 1000 

1449 ): 

1450 logger.info( 

1451 f"Downloaded PDF via Europe PMC for {pmc_id}" 

1452 ) 

1453 return ( 

1454 response.content 

1455 ) 

1456 except Exception as e: 

1457 logger.debug( 

1458 f"Europe PMC download with PMC ID failed: {e}" 

1459 ) 

1460 except Exception as e: 

1461 logger.debug( 

1462 f"API lookup failed, trying webpage scraping: {e}" 

1463 ) 

1464 

1465 # Fallback to webpage scraping if API fails 

1466 try: 

1467 headers = { 

1468 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1469 } 

1470 response = safe_get(url, headers=headers, timeout=10) 

1471 if response.status_code == 200: 1471 ↛ 1528line 1471 didn't jump to line 1528 because the condition on line 1471 was always true

1472 # Look for PMC ID in the page 

1473 pmc_match = re.search(r"PMC\d+", response.text) 

1474 if pmc_match: 1474 ↛ 1513line 1474 didn't jump to line 1513 because the condition on line 1474 was always true

1475 pmc_id = pmc_match.group(0) 

1476 logger.info( 

1477 f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}" 

1478 ) 

1479 

1480 # Add delay before downloading PDF 

1481 time.sleep(self._pubmed_delay) 

1482 

1483 # Try Europe PMC with the PMC ID (more reliable) 

1484 europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf" 

1485 headers = { 

1486 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1487 } 

1488 

1489 try: 

1490 response = safe_get( 

1491 europe_url, 

1492 headers=headers, 

1493 timeout=30, 

1494 allow_redirects=True, 

1495 ) 

1496 if response.status_code == 200: 1496 ↛ 1528line 1496 didn't jump to line 1528 because the condition on line 1496 was always true

1497 content_type = response.headers.get( 

1498 "content-type", "" 

1499 ) 

1500 if ( 1500 ↛ 1528line 1500 didn't jump to line 1528 because the condition on line 1500 was always true

1501 "pdf" in content_type.lower() 

1502 or len(response.content) > 1000 

1503 ): 

1504 logger.info( 

1505 f"Downloaded PDF via Europe PMC for {pmc_id}" 

1506 ) 

1507 return response.content 

1508 except Exception as e: 

1509 logger.debug( 

1510 f"Europe PMC download failed: {e}" 

1511 ) 

1512 else: 

1513 logger.info( 

1514 f"No PMC version found for PMID: {pmid}" 

1515 ) 

1516 except requests.exceptions.HTTPError as e: 

1517 if e.response.status_code == 429: 1517 ↛ 1524line 1517 didn't jump to line 1524 because the condition on line 1517 was always true

1518 logger.warning( 

1519 "Rate limited by PubMed, increasing delay" 

1520 ) 

1521 self._pubmed_delay = min( 

1522 self._pubmed_delay * 2, 5.0 

1523 ) # Max 5 seconds 

1524 raise 

1525 except Exception as e: 

1526 logger.debug(f"Could not check for PMC version: {e}") 

1527 

1528 return self._download_generic(url) 

1529 except Exception: 

1530 logger.exception("PubMed download failed") 

1531 return None 

1532 

1533 def _download_semantic_scholar(self, url: str) -> Optional[bytes]: 

1534 """Download from Semantic Scholar.""" 

1535 # Semantic Scholar doesn't host PDFs directly 

1536 # Would need to extract actual PDF URL from page 

1537 return None 

1538 

1539 def _download_biorxiv(self, url: str) -> Optional[bytes]: 

1540 """Download from bioRxiv.""" 

1541 try: 

1542 # Convert to PDF URL 

1543 pdf_url = url.replace(".org/", ".org/content/") 

1544 pdf_url = re.sub(r"v\d+$", "", pdf_url) # Remove version 

1545 pdf_url += ".full.pdf" 

1546 

1547 return self._download_generic(pdf_url) 

1548 except Exception: 

1549 logger.exception("bioRxiv download failed") 

1550 return None 

1551 

1552 def _save_text_with_db( 

1553 self, 

1554 resource: ResearchResource, 

1555 text: str, 

1556 session: Session, 

1557 extraction_method: str, 

1558 extraction_source: str, 

1559 pdf_document_id: Optional[int] = None, 

1560 ) -> Optional[str]: 

1561 """ 

1562 Save extracted text to encrypted database. 

1563 

1564 Args: 

1565 resource: The research resource 

1566 text: Extracted text content 

1567 session: Database session 

1568 extraction_method: How the text was extracted 

1569 extraction_source: Specific tool/API used 

1570 pdf_document_id: ID of PDF document if extracted from PDF 

1571 

1572 Returns: 

1573 None (previously returned text file path, now removed) 

1574 """ 

1575 try: 

1576 # Calculate text metadata for database 

1577 word_count = len(text.split()) 

1578 character_count = len(text) 

1579 

1580 # Find the document by pdf_document_id or resource_id 

1581 doc = None 

1582 if pdf_document_id: 

1583 doc = ( 

1584 session.query(Document) 

1585 .filter_by(id=pdf_document_id) 

1586 .first() 

1587 ) 

1588 else: 

1589 doc = get_document_for_resource(session, resource) 

1590 

1591 if doc: 

1592 # Update existing document with extracted text. Only replace 

1593 # document_hash when transitioning from FAILED — see issue 

1594 # #3827. PDF-bytes hashes set at creation must remain stable; 

1595 # text-content hashes collide far more often (identical 

1596 # extracted text from different PDFs). 

1597 was_failed = doc.status == DocumentStatus.FAILED 

1598 doc.text_content = text 

1599 doc.character_count = character_count 

1600 doc.word_count = word_count 

1601 doc.extraction_method = extraction_method 

1602 doc.extraction_source = extraction_source 

1603 doc.status = DocumentStatus.COMPLETED 

1604 if was_failed: 

1605 doc.document_hash = hashlib.sha256( 

1606 text.encode() 

1607 ).hexdigest() 

1608 doc.processed_at = datetime.now(UTC) 

1609 

1610 # Set quality based on method 

1611 if extraction_method == "native_api": 

1612 doc.extraction_quality = "high" 

1613 elif ( 

1614 extraction_method == "pdf_extraction" 

1615 and extraction_source == "pdfplumber" 

1616 ): 

1617 doc.extraction_quality = "medium" 

1618 else: 

1619 doc.extraction_quality = "low" 

1620 

1621 logger.debug( 

1622 f"Updated document {doc.id} with extracted text ({word_count} words)" 

1623 ) 

1624 else: 

1625 # Create a new Document for text-only extraction 

1626 # Generate hash from text content 

1627 text_hash = hashlib.sha256(text.encode()).hexdigest() 

1628 

1629 # Dedup against existing content hash. If another resource 

1630 # already produced identical extracted text, link this 

1631 # resource to the canonical Document instead of inserting 

1632 # a duplicate that would violate the UNIQUE constraint 

1633 # (issue #3827). Mirrors research_history_indexer.py:322. 

1634 existing_by_hash = ( 

1635 session.query(Document) 

1636 .filter_by(document_hash=text_hash) 

1637 .first() 

1638 ) 

1639 if existing_by_hash: 

1640 resource.document_id = existing_by_hash.id 

1641 library_collection = ( 

1642 session.query(Collection) 

1643 .filter_by(name="Library") 

1644 .first() 

1645 ) 

1646 if library_collection: 1646 ↛ 1653line 1646 didn't jump to line 1653 because the condition on line 1646 was always true

1647 ensure_in_collection( 

1648 session, 

1649 existing_by_hash.id, 

1650 library_collection.id, 

1651 ) 

1652 else: 

1653 logger.warning( 

1654 f"Library collection not found - deduped document {existing_by_hash.id} will not be linked to default collection" 

1655 ) 

1656 logger.info( 

1657 f"Linked resource {resource.id} to existing Document " 

1658 f"{existing_by_hash.id} (matched on content hash)" 

1659 ) 

1660 return None 

1661 

1662 # Get source type for research downloads 

1663 try: 

1664 source_type_id = get_source_type_id( 

1665 self.username, "research_download", self.password 

1666 ) 

1667 except Exception: 

1668 logger.exception( 

1669 "Failed to get source type for text document" 

1670 ) 

1671 raise 

1672 

1673 # Create new document 

1674 doc_id = str(uuid.uuid4()) 

1675 doc = Document( 

1676 id=doc_id, 

1677 source_type_id=source_type_id, 

1678 resource_id=resource.id, 

1679 research_id=resource.research_id, 

1680 document_hash=text_hash, 

1681 original_url=resource.url, 

1682 file_path=FILE_PATH_TEXT_ONLY, 

1683 file_size=character_count, # Use character count as file size for text-only 

1684 file_type="text", 

1685 mime_type="text/plain", 

1686 title=resource.title, 

1687 text_content=text, 

1688 character_count=character_count, 

1689 word_count=word_count, 

1690 extraction_method=extraction_method, 

1691 extraction_source=extraction_source, 

1692 extraction_quality="high" 

1693 if extraction_method == "native_api" 

1694 else "medium", 

1695 status=DocumentStatus.COMPLETED, 

1696 processed_at=datetime.now(UTC), 

1697 ) 

1698 session.add(doc) 

1699 

1700 # Link to default Library collection 

1701 library_collection = ( 

1702 session.query(Collection).filter_by(name="Library").first() 

1703 ) 

1704 if library_collection: 

1705 ensure_in_collection(session, doc_id, library_collection.id) 

1706 else: 

1707 logger.warning( 

1708 f"Library collection not found - document {doc_id} will not be linked to default collection" 

1709 ) 

1710 

1711 logger.info( 

1712 f"Created new document {doc_id} for text-only extraction ({word_count} words)" 

1713 ) 

1714 

1715 logger.info( 

1716 f"Saved text to encrypted database ({word_count} words)" 

1717 ) 

1718 return None 

1719 

1720 except Exception: 

1721 # Rollback BEFORE re-raising so the shared thread-local session 

1722 # is clean by the time the caller's loop reaches its next 

1723 # iteration. Without this, an IntegrityError leaves the session 

1724 # in PendingRollbackError state and every subsequent ORM access 

1725 # cascades (issue #3827). 

1726 safe_rollback(session, "_save_text_with_db") 

1727 logger.exception("Error saving text to encrypted database") 

1728 raise # Re-raise so caller can handle the error 

1729 

1730 def _create_text_document_record( 

1731 self, 

1732 session: Session, 

1733 resource: ResearchResource, 

1734 file_path: Path, 

1735 extraction_method: str, 

1736 extraction_source: str, 

1737 ): 

1738 """Update existing Document with text from file (for legacy text files).""" 

1739 try: 

1740 # Read file to get metadata 

1741 text = file_path.read_text(encoding="utf-8", errors="ignore") 

1742 word_count = len(text.split()) 

1743 character_count = len(text) 

1744 

1745 # Find the Document for this resource 

1746 doc = get_document_for_resource(session, resource) 

1747 

1748 if doc: 

1749 # Update existing document with text content 

1750 doc.text_content = text 

1751 doc.character_count = character_count 

1752 doc.word_count = word_count 

1753 doc.extraction_method = extraction_method 

1754 doc.extraction_source = extraction_source 

1755 doc.extraction_quality = ( 

1756 "low" # Unknown quality for legacy files 

1757 ) 

1758 logger.info( 

1759 f"Updated document {doc.id} with text from file: {file_path.name}" 

1760 ) 

1761 else: 

1762 logger.warning( 

1763 f"No document found to update for resource {resource.id}" 

1764 ) 

1765 

1766 except Exception: 

1767 logger.exception("Error updating document with text from file") 

1768 

1769 def _record_failed_text_extraction( 

1770 self, session: Session, resource: ResearchResource, error: str 

1771 ): 

1772 """Record a failed text extraction attempt in the Document.""" 

1773 try: 

1774 # Find the Document for this resource 

1775 doc = get_document_for_resource(session, resource) 

1776 

1777 if doc: 

1778 # Update document with extraction error 

1779 doc.error_message = error 

1780 doc.extraction_method = "failed" 

1781 doc.extraction_quality = "low" 

1782 doc.status = DocumentStatus.FAILED 

1783 logger.info( 

1784 f"Recorded failed text extraction for document {doc.id}: {error}" 

1785 ) 

1786 else: 

1787 # Create a new Document for failed extraction 

1788 # This enables tracking failures and retry capability 

1789 source_type_id = get_source_type_id( 

1790 self.username, "research_download", self.password 

1791 ) 

1792 

1793 # Deterministic hash so retries update the same record 

1794 failed_hash = hashlib.sha256( 

1795 f"failed:{resource.url}:{resource.id}".encode() 

1796 ).hexdigest() 

1797 

1798 doc_id = str(uuid.uuid4()) 

1799 doc = Document( 

1800 id=doc_id, 

1801 source_type_id=source_type_id, 

1802 resource_id=resource.id, 

1803 research_id=resource.research_id, 

1804 document_hash=failed_hash, 

1805 original_url=resource.url, 

1806 file_path=None, 

1807 file_size=0, 

1808 file_type="unknown", 

1809 title=resource.title, 

1810 status=DocumentStatus.FAILED, 

1811 error_message=error, 

1812 extraction_method="failed", 

1813 extraction_quality="low", 

1814 processed_at=datetime.now(UTC), 

1815 ) 

1816 session.add(doc) 

1817 

1818 logger.info( 

1819 f"Created failed document {doc_id} for resource {resource.id}: {error}" 

1820 ) 

1821 

1822 except Exception: 

1823 logger.exception("Error recording failed text extraction")