Coverage for src/local_deep_research/research_library/services/download_service.py: 95%
717 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2PDF Download Service for Research Library
4Handles downloading PDFs from various academic sources with:
5- Deduplication using download tracker
6- Source-specific download strategies (arXiv, PubMed, etc.)
7- Progress tracking and error handling
8- File organization and storage
9"""
11import hashlib
12import os
13import re
14import time
15import uuid
16from datetime import datetime, UTC
17from pathlib import Path
18from typing import Optional, Tuple
19from urllib.parse import urlparse
21import requests
22from loguru import logger
23from sqlalchemy.orm import Session
24import pdfplumber
25from pypdf import PdfReader
27from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY
28from ...database.models.download_tracker import (
29 DownloadAttempt,
30 DownloadTracker,
31)
32from ...security import safe_get, sanitize_for_log
33from ...security.path_validator import PathValidator
34from ...database.models.library import (
35 Collection,
36 Document as Document,
37 DocumentStatus,
38 DownloadQueue as LibraryDownloadQueue,
39)
40from .pdf_storage_manager import DEFAULT_MAX_PDF_SIZE_MB, PDFStorageManager
41from ...database.models.research import ResearchResource
42from ...database.library_init import get_source_type_id, get_default_library_id
43from ...database.session_context import get_user_db_session, safe_rollback
44from ...utilities.db_utils import get_settings_manager
45from ...library.download_management import RetryManager
46from ...config.paths import get_library_directory
47from ..utils import (
48 ensure_in_collection,
49 get_document_for_resource,
50 get_url_hash,
51 get_absolute_path_from_settings,
52 is_downloadable_url,
53)
55# Import our modular downloaders
56from ..downloaders import (
57 ContentType,
58 ArxivDownloader,
59 PubMedDownloader,
60 BioRxivDownloader,
61 DirectPDFDownloader,
62 SemanticScholarDownloader,
63 OpenAlexDownloader,
64 GenericDownloader,
65)
68class DownloadService:
69 """Service for downloading and managing research PDFs."""
71 def __init__(self, username: str, password: Optional[str] = None):
72 """Initialize download service for a user.
74 Args:
75 username: The username to download for
76 password: Optional password for encrypted database access
77 """
78 self.username = username
79 self.password = password
80 self.settings = get_settings_manager(username=username)
81 self._closed = False
83 # Debug settings manager and user context
84 logger.info(
85 f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}"
86 )
88 # Get library path from settings (uses centralized path, respects LDR_DATA_DIR)
89 storage_path_setting = self.settings.get_setting(
90 "research_library.storage_path",
91 str(get_library_directory()),
92 )
93 logger.warning(
94 f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})"
95 )
97 if storage_path_setting is None:
98 logger.error(
99 "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!"
100 )
101 raise ValueError("Storage path setting cannot be None")
103 self.library_root = str(
104 Path(os.path.expandvars(storage_path_setting))
105 .expanduser()
106 .resolve()
107 )
108 logger.warning(
109 f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}"
110 )
112 # Create directory structure
113 self._setup_directories()
115 # Initialize modular downloaders
116 # DirectPDFDownloader first for efficiency with direct PDF links
118 # Get Semantic Scholar API key from settings
119 semantic_scholar_api_key = self.settings.get_setting(
120 "search.engine.web.semantic_scholar.api_key", ""
121 )
123 self.downloaders = [
124 DirectPDFDownloader(timeout=30), # Handle direct PDF links first
125 SemanticScholarDownloader(
126 timeout=30,
127 api_key=semantic_scholar_api_key
128 if semantic_scholar_api_key
129 else None,
130 ),
131 OpenAlexDownloader(
132 timeout=30
133 ), # OpenAlex with API lookup (no key needed)
134 ArxivDownloader(timeout=30),
135 PubMedDownloader(timeout=30, rate_limit_delay=1.0),
136 BioRxivDownloader(timeout=30),
137 GenericDownloader(timeout=30), # Generic should be last (fallback)
138 ]
140 # Initialize retry manager for smart failure tracking
141 self.retry_manager = RetryManager(username, password)
142 logger.info(
143 f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}"
144 )
146 # PubMed rate limiting state
147 self._pubmed_delay = 1.0 # 1 second delay for PubMed
148 self._last_pubmed_request = 0.0 # Track last request time
150 def close(self):
151 """Close all downloader resources."""
152 if self._closed:
153 return
154 self._closed = True
156 from ...utilities.resource_utils import safe_close
158 for downloader in self.downloaders:
159 safe_close(downloader, "downloader")
161 # Close the settings manager's DB session to return the connection
162 # to the pool. SettingsManager.close() is idempotent.
163 safe_close(self.settings, "settings manager", allow_none=True)
165 # Clear references to allow garbage collection
166 self.downloaders = []
167 self.retry_manager = None
168 self.settings = None
    def __enter__(self):
        """Enter context manager.

        Returns:
            This service instance, so ``with DownloadService(...) as svc:``
            binds ``svc`` to it.
        """
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, ensuring cleanup.

        Calls close() whether or not the ``with`` body raised.

        Returns:
            False, so any exception raised inside the ``with`` body
            propagates to the caller.
        """
        self.close()
        return False
179 def _setup_directories(self):
180 """Create library directory structure."""
181 # Only create the root and pdfs folder - flat structure
182 paths = [
183 self.library_root,
184 str(Path(self.library_root) / "pdfs"),
185 ]
186 for path in paths:
187 Path(path).mkdir(parents=True, exist_ok=True)
189 def _normalize_url(self, url: str) -> str:
190 """Normalize URL for consistent hashing."""
191 # Remove protocol variations
192 url = re.sub(r"^https?://", "", url)
193 # Remove www
194 url = re.sub(r"^www\.", "", url)
195 # Remove trailing slashes
196 url = url.rstrip("/")
197 # Sort query parameters
198 if "?" in url:
199 base, query = url.split("?", 1)
200 params = sorted(query.split("&"))
201 url = f"{base}?{'&'.join(params)}"
202 return url.lower()
204 def _get_url_hash(self, url: str) -> str:
205 """Generate SHA256 hash of normalized URL."""
206 normalized = self._normalize_url(url)
207 return get_url_hash(normalized)
209 def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]:
210 """
211 Check if URL is already downloaded.
213 Returns:
214 Tuple of (is_downloaded, file_path)
215 """
216 url_hash = self._get_url_hash(url)
218 with get_user_db_session(self.username, self.password) as session:
219 tracker = (
220 session.query(DownloadTracker)
221 .filter_by(url_hash=url_hash, is_downloaded=True)
222 .first()
223 )
225 if tracker and tracker.file_path:
226 # Compute absolute path and verify file still exists
227 absolute_path = get_absolute_path_from_settings(
228 tracker.file_path
229 )
230 if absolute_path and absolute_path.is_file():
231 return True, str(absolute_path)
232 if absolute_path:
233 # File was deleted, mark as not downloaded
234 tracker.is_downloaded = False
235 session.commit()
236 # If absolute_path is None, path was blocked - treat as not downloaded
238 return False, None
240 def get_text_content(self, resource_id: int) -> Optional[str]:
241 """
242 Get text content for a research resource.
244 This will try to:
245 1. Fetch text directly from APIs if available
246 2. Extract text from downloaded PDF if exists
247 3. Download PDF and extract text if not yet downloaded
249 Args:
250 resource_id: ID of the research resource
252 Returns:
253 Text content as string, or None if extraction failed
254 """
255 with get_user_db_session(self.username, self.password) as session:
256 resource = session.query(ResearchResource).get(resource_id)
257 if not resource:
258 logger.error(f"Resource {resource_id} not found")
259 return None
261 url = resource.url
263 # Find appropriate downloader
264 for downloader in self.downloaders:
265 if downloader.can_handle(url):
266 logger.info(
267 f"Using {downloader.__class__.__name__} for text extraction from {url}"
268 )
269 try:
270 # Try to get text content
271 text = downloader.download_text(url)
272 if text:
273 logger.info(
274 f"Successfully extracted text for: {resource.title[:50]}"
275 )
276 return text
277 except Exception:
278 logger.exception("Failed to extract text")
279 break
281 logger.warning(f"Could not extract text for {url}")
282 return None
284 def queue_research_downloads(
285 self, research_id: str, collection_id: Optional[str] = None
286 ) -> int:
287 """
288 Queue all downloadable PDFs from a research session.
290 Args:
291 research_id: The research session ID
292 collection_id: Optional target collection ID (defaults to Library if not provided)
294 Returns:
295 Number of items queued
296 """
297 queued = 0
299 # Get default library collection if no collection_id provided
300 if not collection_id:
301 from ...database.library_init import get_default_library_id
303 collection_id = get_default_library_id(self.username, self.password)
305 with get_user_db_session(self.username, self.password) as session:
306 # Get all resources for this research
307 resources = (
308 session.query(ResearchResource)
309 .filter_by(research_id=research_id)
310 .all()
311 )
313 for resource in resources:
314 if self._is_downloadable(resource):
315 # Library resources linked via document_id are already done
316 if resource.document_id:
317 continue
319 # Check if already queued
320 existing_queue = (
321 session.query(LibraryDownloadQueue)
322 .filter_by(
323 resource_id=resource.id,
324 status=DocumentStatus.PENDING,
325 )
326 .first()
327 )
329 # Check if already downloaded (trust the database status)
330 existing_doc = (
331 session.query(Document)
332 .filter_by(
333 resource_id=resource.id,
334 status=DocumentStatus.COMPLETED,
335 )
336 .first()
337 )
339 # Queue if not already queued and not marked as completed
340 if not existing_queue and not existing_doc: 340 ↛ 313line 340 didn't jump to line 313 because the condition on line 340 was always true
341 # Check one more time if ANY queue entry exists (regardless of status)
342 any_queue = (
343 session.query(LibraryDownloadQueue)
344 .filter_by(resource_id=resource.id)
345 .first()
346 )
348 if any_queue:
349 # Reset the existing queue entry
350 any_queue.status = DocumentStatus.PENDING
351 any_queue.research_id = research_id
352 any_queue.collection_id = collection_id
353 queued += 1
354 else:
355 # Add new queue entry
356 queue_entry = LibraryDownloadQueue(
357 resource_id=resource.id,
358 research_id=research_id,
359 collection_id=collection_id,
360 priority=0,
361 status=DocumentStatus.PENDING,
362 )
363 session.add(queue_entry)
364 queued += 1
366 session.commit()
367 logger.info(
368 f"Queued {queued} downloads for research {research_id} to collection {collection_id}"
369 )
371 return queued
373 def _is_downloadable(self, resource: ResearchResource) -> bool:
374 """Check if a resource is likely downloadable as PDF.
376 Delegates to the consolidated is_downloadable_url() from utils.
377 """
378 return is_downloadable_url(resource.url)
    def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]:
        """
        Download a specific resource.

        Returns early with success when a COMPLETED Document already exists
        for the resource. Otherwise the method:

        1. ensures a DownloadTracker row exists for the normalized URL hash;
        2. runs _download_pdf;
        3. records the attempt with the retry manager;
        4. marks any queue entry COMPLETED or FAILED, then commits.

        On success, when a password is set, it also tries to trigger
        auto-indexing of the newest Document for the resource. That target
        is the queue entry's collection, or the default Library. Errors
        there are logged and do not affect the result.

        Args:
            resource_id: ID of the ResearchResource to download.

        Returns:
            Tuple of (success: bool, skip_reason: str or None)
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return False, "Resource not found"

            # Check if already downloaded (trust the database after sync)
            existing_doc = (
                session.query(Document)
                .filter_by(
                    resource_id=resource_id, status=DocumentStatus.COMPLETED
                )
                .first()
            )

            if existing_doc:
                logger.info(
                    "Resource already downloaded (according to database)"
                )
                return True, None

            # Get collection_id from queue entry if it exists
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )
            collection_id = (
                queue_entry.collection_id
                if queue_entry and queue_entry.collection_id
                else None
            )

            # Create download tracker entry. Trackers are keyed by the hash
            # of the normalized URL, so resources sharing a URL share one.
            url_hash = self._get_url_hash(resource.url)
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash)
                .first()
            )

            if not tracker:
                tracker = DownloadTracker(
                    url=resource.url,
                    url_hash=url_hash,
                    first_resource_id=resource.id,
                    is_downloaded=False,
                )
                session.add(tracker)
                session.commit()

            # Attempt download
            success, skip_reason, status_code = self._download_pdf(
                resource, tracker, session, collection_id
            )

            # Record attempt with retry manager for smart failure tracking.
            # NOTE(review): close() sets retry_manager to None, so calling
            # this on a closed service would raise AttributeError.
            self.retry_manager.record_attempt(
                resource_id=resource.id,
                result=(success, skip_reason),
                status_code=status_code,
                url=resource.url,
                details=skip_reason
                or (
                    "Successfully downloaded" if success else "Download failed"
                ),
                session=session,
            )

            # Update queue status if exists (looked up again after the
            # download rather than reusing the earlier queue_entry)
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if queue_entry:
                queue_entry.status = (
                    DocumentStatus.COMPLETED
                    if success
                    else DocumentStatus.FAILED
                )
                queue_entry.completed_at = datetime.now(UTC)

            session.commit()

            # Trigger auto-indexing for successfully downloaded documents
            if success and self.password:
                try:
                    from ..routes.rag_routes import trigger_auto_index
                    from ...database.library_init import get_default_library_id

                    # Get the document that was just created
                    doc = (
                        session.query(Document)
                        .filter_by(resource_id=resource_id)
                        .order_by(Document.created_at.desc())
                        .first()
                    )
                    if doc:
                        # Use collection_id from queue entry or default Library
                        # NB: pass username string, not the SQLAlchemy session
                        target_collection = (
                            collection_id
                            or get_default_library_id(
                                self.username, self.password
                            )
                        )
                        if target_collection:
                            trigger_auto_index(
                                [doc.id],
                                target_collection,
                                self.username,
                                self.password,
                            )
                except Exception:
                    # Indexing is best-effort; the download result stands.
                    logger.exception("Failed to trigger auto-indexing")

            return success, skip_reason
    def _download_pdf(
        self,
        resource: ResearchResource,
        tracker: DownloadTracker,
        session: Session,
        collection_id: Optional[str] = None,
    ) -> Tuple[bool, Optional[str], Optional[int]]:
        """
        Perform the actual PDF download.

        Tries, in list order, each downloader whose ``can_handle`` accepts
        the URL, until one returns content. On success it:

        1. updates the tracker (hash, size, downloaded flag and time);
        2. creates or updates the resource's Document;
        3. stores the PDF through PDFStorageManager, using the
           ``research_library.pdf_storage_mode`` setting;
        4. links a newly created Document to the target collection;
        5. runs best-effort text extraction.

        A DownloadAttempt is added to the session on every call.

        Args:
            resource: The research resource to download
            tracker: Download tracker for this URL
            session: Database session
            collection_id: Optional target collection ID (defaults to Library if not provided)

        Returns:
            Tuple of (success: bool, skip_reason: Optional[str], status_code: Optional[int])
        """
        url = resource.url

        # Log attempt. The attempt number counts previous attempts when the
        # tracker exposes ``download_attempts`` (presumably a dynamic
        # relationship supporting .count() - confirm against the model).
        attempt = DownloadAttempt(
            url_hash=tracker.url_hash,
            attempt_number=tracker.download_attempts.count() + 1
            if hasattr(tracker, "download_attempts")
            else 1,
            attempted_at=datetime.now(UTC),
        )
        session.add(attempt)

        try:
            # Use modular downloaders with detailed skip reasons
            pdf_content = None
            downloader_used = None
            skip_reason = None
            status_code = None

            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for {url}"
                    )
                    result = downloader.download_with_result(
                        url, ContentType.PDF
                    )
                    downloader_used = downloader.__class__.__name__

                    if result.is_success and result.content:
                        pdf_content = result.content
                        status_code = result.status_code
                        break
                    if result.skip_reason:
                        # Keep the most recent skip reason/status to report
                        # if no later downloader succeeds.
                        skip_reason = result.skip_reason
                        status_code = result.status_code
                        logger.info(f"Download skipped: {skip_reason}")
                        # Keep trying other downloaders unless it's the GenericDownloader
                        if isinstance(downloader, GenericDownloader):
                            break

            if not downloader_used:
                logger.error(f"No downloader found for {url}")
                skip_reason = "No compatible downloader available"

            if not pdf_content:
                error_msg = skip_reason or "Failed to download PDF content"
                # Store skip reason in attempt for retrieval
                attempt.error_message = error_msg
                attempt.succeeded = False
                session.commit()
                logger.info(f"Download failed with reason: {error_msg}")
                return False, error_msg, status_code

            # Get PDF storage mode setting
            pdf_storage_mode = self.settings.get_setting(
                "research_library.pdf_storage_mode", "none"
            )
            max_pdf_size_mb = int(
                self.settings.get_setting(
                    "research_library.max_pdf_size_mb",
                    DEFAULT_MAX_PDF_SIZE_MB,
                )
            )
            logger.info(
                f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}"
            )

            # Update tracker
            # NOTE(review): hashlib is also imported at module level; this
            # local import is redundant.
            import hashlib

            tracker.file_hash = hashlib.sha256(pdf_content).hexdigest()
            tracker.file_size = len(pdf_content)
            tracker.is_downloaded = True
            tracker.downloaded_at = datetime.now(UTC)

            # Initialize PDF storage manager
            pdf_storage_manager = PDFStorageManager(
                library_root=self.library_root,
                storage_mode=pdf_storage_mode,
                max_pdf_size_mb=max_pdf_size_mb,
            )

            # Update attempt with success info
            # NOTE(review): set again further down; one assignment suffices.
            attempt.succeeded = True

            # Check if library document already exists
            existing_doc = get_document_for_resource(session, resource)

            if existing_doc:
                # Update existing document. Only replace document_hash when
                # transitioning from FAILED — that's the placeholder hash from
                # _record_failed_text_extraction. For any other prior state
                # the hash is already a real content hash and clobbering it
                # risks UNIQUE-constraint collisions (issue #3827).
                was_failed = existing_doc.status == DocumentStatus.FAILED
                if was_failed:
                    existing_doc.document_hash = tracker.file_hash
                existing_doc.file_size = len(pdf_content)
                existing_doc.status = DocumentStatus.COMPLETED
                existing_doc.processed_at = datetime.now(UTC)

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=existing_doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker. No on-disk file name is recorded when the
                # storage manager reports "database" as the path.
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )
            else:
                # Get source type ID for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download", self.password
                    )
                    # Use provided collection_id or default to Library
                    library_collection_id = (
                        collection_id
                        or get_default_library_id(self.username, self.password)
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type or library collection"
                    )
                    raise

                # Create new unified document entry
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=tracker.file_hash,
                    original_url=url,
                    file_size=len(pdf_content),
                    file_type="pdf",
                    mime_type="application/pdf",
                    title=resource.title,
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                    storage_mode=pdf_storage_mode,
                )
                session.add(doc)
                session.flush()  # Ensure doc.id is available for blob storage

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker (same rules as the existing-document branch)
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )

                # Link document to default Library collection
                ensure_in_collection(session, doc_id, library_collection_id)

            # Update attempt
            attempt.succeeded = True
            attempt.bytes_downloaded = len(pdf_content)

            if pdf_storage_mode == "database":
                logger.info(
                    f"Successfully stored PDF in database: {resource.url}"
                )
            elif pdf_storage_mode == "filesystem":
                logger.info(f"Successfully downloaded: {tracker.file_path}")
            else:
                logger.info(f"Successfully extracted text from: {resource.url}")

            # Automatically extract and save text after successful PDF download
            try:
                logger.info(
                    f"Extracting text from downloaded PDF for: {resource.title[:50]}"
                )
                text = self._extract_text_from_pdf(pdf_content)

                if text:
                    # Get the document ID we just created/updated
                    pdf_doc = get_document_for_resource(session, resource)
                    pdf_document_id = pdf_doc.id if pdf_doc else None

                    # Save text to encrypted database
                    self._save_text_with_db(
                        resource=resource,
                        text=text,
                        session=session,
                        extraction_method="pdf_extraction",
                        extraction_source="local_pdf",
                        pdf_document_id=pdf_document_id,
                    )
                    logger.info(
                        f"Successfully extracted and saved text for: {resource.title[:50]}"
                    )
                else:
                    logger.warning(
                        f"Text extraction returned empty text for: {resource.title[:50]}"
                    )
            except Exception:
                logger.exception(
                    "Failed to extract text from PDF, but PDF download succeeded"
                )
                # Don't fail the entire download if text extraction fails

            # Success-path changes are not committed here; the caller commits.
            return True, None, status_code

        except Exception as e:
            # NOTE(review): the session is neither committed nor rolled back
            # here. If the failure came from a flush (e.g. a UNIQUE
            # constraint), the caller's next commit may fail unless the
            # session is rolled back first - confirm.
            logger.exception(f"Download failed for {url}")
            attempt.succeeded = False
            attempt.error_type = type(e).__name__
            attempt.error_message = sanitize_for_log(str(e), max_length=200)
            tracker.is_accessible = False
            # Sanitize error message before returning to API
            safe_error = attempt.error_message
            return False, safe_error, None
766 def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]:
767 """
768 Extract text from PDF content using multiple methods for best results.
770 Args:
771 pdf_content: Raw PDF bytes
773 Returns:
774 Extracted text or None if extraction fails
775 """
776 try:
777 # First try with pdfplumber (better for complex layouts)
778 import io
780 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
781 text_parts = []
782 for page in pdf.pages:
783 page_text = page.extract_text()
784 if page_text:
785 text_parts.append(page_text)
787 if text_parts:
788 return "\n\n".join(text_parts)
790 # Fallback to PyPDF if pdfplumber fails
791 reader = PdfReader(io.BytesIO(pdf_content))
792 text_parts = []
793 for page in reader.pages:
794 text = page.extract_text()
795 if text:
796 text_parts.append(text)
798 if text_parts:
799 return "\n\n".join(text_parts)
801 logger.warning("No text could be extracted from PDF")
802 return None
804 except Exception:
805 logger.exception("Failed to extract text from PDF")
806 return None
808 def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]:
809 """
810 Download resource and extract text to encrypted database.
812 Args:
813 resource_id: ID of the resource to download
815 Returns:
816 Tuple of (success, error_message)
817 """
818 with get_user_db_session(self.username, self.password) as session:
819 # Get the resource
820 resource = (
821 session.query(ResearchResource)
822 .filter_by(id=resource_id)
823 .first()
824 )
825 if not resource:
826 return False, "Resource not found"
828 # Handle library resources — content already in the database
829 # (local-only, no network — skip retry checks)
830 if resource.source_type == "library" or (
831 resource.url and resource.url.startswith("/library/document/")
832 ):
833 return self._try_library_text_extraction(session, resource)
835 # Try existing text in database
836 result = self._try_existing_text(session, resource_id)
837 if result is not None:
838 return result
840 # Try legacy text files on disk
841 result = self._try_legacy_text_file(session, resource, resource_id)
842 if result is not None: 842 ↛ 843line 842 didn't jump to line 843 because the condition on line 842 was never true
843 return result
845 # Try extracting from existing PDF
846 result = self._try_existing_pdf_extraction(
847 session, resource, resource_id
848 )
849 if result is not None: 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true
850 return result
852 # Check retry eligibility before network-dependent extraction
853 if self.retry_manager: 853 ↛ 862line 853 didn't jump to line 862 because the condition on line 853 was always true
854 decision = self.retry_manager.should_retry_resource(resource_id)
855 if not decision.can_retry: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true
856 logger.info(
857 f"Skipping resource {resource_id}: {decision.reason}"
858 )
859 return False, decision.reason
861 # Try API text extraction
862 result = self._try_api_text_extraction(session, resource)
863 if result is not None:
864 self._record_retry_attempt(resource, result, session)
865 return result
867 # Fallback: Download PDF and extract
868 result = self._fallback_pdf_extraction(session, resource)
869 self._record_retry_attempt(resource, result, session)
870 return result
872 def _record_retry_attempt(
873 self, resource, result: Tuple[bool, Optional[str]], session=None
874 ) -> None:
875 """Record a download attempt with the retry manager."""
876 if not self.retry_manager: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true
877 return
878 self.retry_manager.record_attempt(
879 resource_id=resource.id,
880 result=result,
881 url=resource.url or "",
882 details=result[1]
883 or (
884 "Successfully extracted text"
885 if result[0]
886 else "Text extraction failed"
887 ),
888 session=session,
889 )
891 def _try_library_text_extraction(
892 self, session, resource
893 ) -> Tuple[bool, Optional[str]]:
894 """Handle library resources — content already exists in local database.
896 Returns:
897 Tuple of (success, error_message). On success: (True, None).
898 On failure: (False, description of what went wrong).
899 """
900 # 1. Extract document UUID from metadata (authoritative) or URL (fallback)
901 doc_id = None
902 metadata = (resource.resource_metadata or {}).get("original_data", {})
903 if isinstance(metadata, dict):
904 meta_inner = metadata.get("metadata", {})
905 if isinstance(meta_inner, dict):
906 doc_id = meta_inner.get("source_id") or meta_inner.get(
907 "document_id"
908 )
910 if not doc_id and resource.url:
911 # Parse from /library/document/{uuid} or /library/document/{uuid}/pdf
912 match = re.match(r"^/library/document/([^/]+)", resource.url)
913 if match: 913 ↛ 916line 913 didn't jump to line 916 because the condition on line 913 was always true
914 doc_id = match.group(1)
916 if not doc_id:
917 return False, "Could not extract library document ID"
919 # 2. Query the existing Document by its primary key (UUID string)
920 doc = session.query(Document).filter_by(id=doc_id).first()
921 if not doc:
922 return False, f"Library document {doc_id} not found in database"
924 # 3. If text already exists and extraction succeeded, return success
925 if (
926 doc.text_content
927 and doc.extraction_method
928 and doc.extraction_method != "failed"
929 ):
930 logger.info(f"Library document {doc_id} already has text content")
931 resource.document_id = doc.id
932 session.commit()
933 return True, None
935 # 4. Try extracting text from stored PDF
936 pdf_storage_mode = self.settings.get_setting(
937 "research_library.pdf_storage_mode", "none"
938 )
939 pdf_manager = PDFStorageManager(
940 library_root=self.library_root,
941 storage_mode=pdf_storage_mode,
942 )
943 pdf_content = pdf_manager.load_pdf(doc, session)
945 if not pdf_content:
946 return (
947 False,
948 f"Library document {doc_id} has no text or PDF content",
949 )
951 text = self._extract_text_from_pdf(pdf_content)
952 if not text:
953 return (
954 False,
955 f"Failed to extract text from library document {doc_id} PDF",
956 )
958 # 5. Update Document directly (don't use _save_text_with_db — it
959 # queries by resource_id which mismatches for library docs)
960 doc.text_content = text
961 doc.character_count = len(text)
962 doc.word_count = len(text.split())
963 doc.extraction_method = "pdf_extraction"
964 doc.extraction_source = "pdfplumber"
965 doc.extraction_quality = "medium"
967 resource.document_id = doc.id
968 session.commit()
970 logger.info(
971 f"Extracted text from library document {doc_id} "
972 f"({doc.word_count} words)"
973 )
974 return True, None
976 def _try_existing_text(
977 self, session, resource_id: int
978 ) -> Optional[Tuple[bool, Optional[str]]]:
979 """Check if text already exists in database (in Document.text_content)."""
980 existing_doc = (
981 session.query(Document).filter_by(resource_id=resource_id).first()
982 )
984 if not existing_doc:
985 return None
987 # Check if text content exists and extraction was successful
988 if (
989 existing_doc.text_content
990 and existing_doc.extraction_method
991 and existing_doc.extraction_method != "failed"
992 ):
993 logger.info(
994 f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
995 )
996 return True, None
998 # No text content or failed extraction
999 logger.debug(
1000 f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
1001 )
1002 return None # Fall through to re-extraction
1004 def _try_legacy_text_file(
1005 self, session, resource, resource_id: int
1006 ) -> Optional[Tuple[bool, Optional[str]]]:
1007 """Check for legacy text files on disk."""
1008 txt_path = Path(self.library_root) / "txt"
1009 existing_files = (
1010 list(txt_path.glob(f"*_{resource_id}.txt"))
1011 if txt_path.exists()
1012 else []
1013 )
1015 if not existing_files:
1016 return None
1018 logger.info(f"Text file already exists on disk: {existing_files[0]}")
1019 self._create_text_document_record(
1020 session,
1021 resource,
1022 existing_files[0],
1023 extraction_method="unknown",
1024 extraction_source="legacy_file",
1025 )
1026 session.commit()
1027 return True, None
def _try_existing_pdf_extraction(
    self, session, resource, resource_id: int
) -> Optional[Tuple[bool, Optional[str]]]:
    """Extract text from a PDF that the library already stored on disk.

    Args:
        session: Database session shared with the caller.
        resource: The research resource whose text is being extracted.
        resource_id: ID used to look up the resource's Document.

    Returns:
        ``(True, None)`` after the text is saved and committed. ``None`` in
        every other case: no completed Document, a missing/sentinel/unsafe
        file path, a missing file, empty extraction, or any error. This lets
        the caller fall through to the other extraction strategies.
    """
    pdf_document = (
        session.query(Document).filter_by(resource_id=resource_id).first()
    )
    if not pdf_document or pdf_document.status != DocumentStatus.COMPLETED:
        return None

    # Sentinel values mean the Document has no real file on disk.
    if (
        not pdf_document.file_path
        or pdf_document.file_path in FILE_PATH_SENTINELS
    ):
        return None

    # Resolve the stored path inside the library root to block path traversal.
    try:
        pdf_path = Path(
            PathValidator.validate_safe_path(
                pdf_document.file_path, str(self.library_root)
            )
        )
    except ValueError:
        logger.warning(f"Path traversal blocked: {pdf_document.file_path}")
        return None
    if not pdf_path.is_file():
        return None

    logger.info(f"Found existing PDF, extracting text from: {pdf_path}")
    try:
        text = self._extract_text_from_pdf(pdf_path.read_bytes())
    except Exception:
        logger.exception(
            f"Failed to extract text from existing PDF: {pdf_path}"
        )
        return None  # Fall through to other methods
    if not text:
        return None

    try:
        self._save_text_with_db(
            resource,
            text,
            session,
            extraction_method="pdf_extraction",
            extraction_source="pdfplumber",
            pdf_document_id=pdf_document.id,
        )
        session.commit()
    except Exception:
        # The caller falls through to other strategies on this same session.
        # A failed commit leaves it unusable until it is rolled back.
        safe_rollback(session, "_try_existing_pdf_extraction")
        logger.exception(
            f"Failed to extract text from existing PDF: {pdf_path}"
        )
        return None  # Fall through to other methods
    return True, None
def _try_api_text_extraction(
    self, session, resource
) -> Optional[Tuple[bool, Optional[str]]]:
    """Try direct API text extraction (e.g. arXiv or PubMed APIs).

    Args:
        session: Database session shared with the caller.
        resource: The research resource whose URL is queried.

    Returns:
        ``None`` when no downloader handles the URL or the API returned no
        content, so the caller should try another strategy. ``(True, None)``
        once the text is saved and committed. ``(False, error)`` with a
        sanitized message if saving or committing failed.
    """
    logger.info(
        f"Attempting direct API text extraction from: {resource.url}"
    )

    downloader = self._get_downloader(resource.url)
    if not downloader:
        return None

    result = downloader.download_with_result(resource.url, ContentType.TEXT)
    if not result.is_success or not result.content:
        return None

    # Downloaders may hand back raw bytes or an already-decoded str.
    text = (
        result.content.decode("utf-8", errors="ignore")
        if isinstance(result.content, bytes)
        else result.content
    )

    if isinstance(downloader, ArxivDownloader):
        extraction_source = "arxiv_api"
    elif isinstance(downloader, PubMedDownloader):
        extraction_source = "pubmed_api"
    else:
        extraction_source = "unknown"

    try:
        self._save_text_with_db(
            resource,
            text,
            session,
            extraction_method="native_api",
            extraction_source=extraction_source,
        )
        session.commit()
    except Exception as e:
        # _save_text_with_db rolls back its own failures, but a failed commit
        # leaves the shared session unusable until it is rolled back.
        safe_rollback(session, "_try_api_text_extraction")
        logger.exception(f"Failed to save text for resource {resource.id}")
        # Sanitize error message before returning to API
        safe_error = sanitize_for_log(
            f"Failed to save text: {str(e)}", max_length=200
        )
        return False, safe_error

    # Logged after the try so that a logging problem (e.g. a None title)
    # cannot turn an already-committed save into a reported failure.
    title = (resource.title or "")[:50]
    logger.info(
        f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{title}...'"
    )
    return True, None
def _fallback_pdf_extraction(
    self, session, resource
) -> Tuple[bool, Optional[str]]:
    """Fallback: download the PDF into memory and extract its text.

    Every failure (no downloader, download failure, empty extraction) is
    recorded via ``_record_failed_text_extraction`` and committed before
    returning.

    Args:
        session: Database session shared with the caller.
        resource: The research resource whose PDF is downloaded.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_message)``.
    """
    logger.info(
        f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}"
    )
    # Guard against a missing title so that logging can never abort this path.
    title = (resource.title or "")[:50]

    downloader = self._get_downloader(resource.url)
    if not downloader:
        error_msg = "No compatible downloader found"
        logger.warning(f"✗ FAILED: {error_msg} for '{title}...'")
        self._record_failed_text_extraction(
            session, resource, error=error_msg
        )
        session.commit()
        return False, error_msg

    result = downloader.download_with_result(resource.url, ContentType.PDF)
    if not result.is_success or not result.content:
        error_msg = result.skip_reason or "Failed to download PDF"
        logger.warning(
            f"✗ FAILED: Could not download PDF for '{title}...' | Error: {error_msg}"
        )
        self._record_failed_text_extraction(
            session, resource, error=f"PDF download failed: {error_msg}"
        )
        session.commit()
        return False, f"PDF extraction failed: {error_msg}"

    text = self._extract_text_from_pdf(result.content)
    if not text:
        error_msg = "PDF text extraction returned empty text"
        logger.warning(
            f"Failed to extract text from PDF for: {resource.url}"
        )
        self._record_failed_text_extraction(
            session, resource, error=error_msg
        )
        session.commit()
        return False, error_msg

    try:
        self._save_text_with_db(
            resource,
            text,
            session,
            extraction_method="pdf_extraction",
            extraction_source="pdfplumber_fallback",
        )
        session.commit()
    except Exception as e:
        # A failed commit leaves the shared session unusable until rolled back.
        safe_rollback(session, "_fallback_pdf_extraction")
        logger.exception(f"Failed to save text for resource {resource.id}")
        # Sanitize error message before returning to API
        safe_error = sanitize_for_log(
            f"Failed to save text: {str(e)}", max_length=200
        )
        return False, safe_error

    logger.info(f"✓ SUCCESS: Extracted text from '{title}...'")
    return True, None
def _get_downloader(self, url: str):
    """Return the first registered downloader that accepts ``url``.

    Downloaders in ``self.downloaders`` are asked in order via
    ``can_handle(url)``; ``None`` is returned when none of them accepts it.
    """
    return next(
        (
            candidate
            for candidate in self.downloaders
            if candidate.can_handle(url)
        ),
        None,
    )
def _download_generic(self, url: str) -> Optional[bytes]:
    """Fetch ``url`` and return its body if the response looks like a PDF.

    A response counts as a PDF when its Content-Type mentions "pdf" or its
    body starts with the ``%PDF`` magic bytes. HTTP errors and any other
    exception are logged and yield ``None``.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    try:
        response = safe_get(
            url, headers=request_headers, timeout=30, allow_redirects=True
        )
        response.raise_for_status()

        content_type = response.headers.get("Content-Type", "")
        looks_like_pdf = "pdf" in content_type.lower() or response.content.startswith(
            b"%PDF"
        )
        if looks_like_pdf:
            return response.content

        logger.warning(f"Response is not a PDF: {content_type}")
        return None
    except Exception:
        logger.exception("Generic download failed")
        return None
def _download_arxiv(self, url: str) -> Optional[bytes]:
    """Download a paper PDF from arXiv.

    Abstract-page URLs (``.../abs/<id>``) are rewritten to ``.../pdf/<id>``.
    A ``.pdf`` suffix is appended when missing, then the result is fetched
    via ``_download_generic``.

    Args:
        url: arXiv abstract or PDF URL.

    Returns:
        PDF bytes, or ``None`` on failure.
    """
    try:
        # A trailing slash would otherwise produce ".../<id>/.pdf".
        pdf_url = url.rstrip("/")
        # Rewrite only the path segment. A blanket replace of "abs" would
        # also corrupt any other occurrence of that substring in the URL.
        pdf_url = pdf_url.replace("/abs/", "/pdf/", 1)
        if not pdf_url.endswith(".pdf"):
            pdf_url += ".pdf"

        return self._download_generic(pdf_url)
    except Exception:
        logger.exception("arXiv download failed")
        return None
def _try_europe_pmc(self, pmid: str) -> Optional[bytes]:
    """Look up ``pmid`` in Europe PMC and fetch its open-access PDF.

    Searches Europe PMC for the PMID. If the first hit is open access, has a
    PDF and carries a ``pmcid``, the PDF is requested from Europe PMC's render
    endpoint. That response is returned when it is HTTP 200 and either
    declares a PDF content type or is longer than 1000 bytes.

    Returns:
        The PDF bytes, or ``None`` otherwise. Exceptions are logged at debug
        level and treated as "not available".
    """
    try:
        search_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json"
        search_response = safe_get(search_url, timeout=10)
        if search_response.status_code != 200:
            return None

        hits = search_response.json().get("resultList", {}).get("result", [])
        if not hits:
            return None

        article = hits[0]
        has_open_pdf = (
            article.get("isOpenAccess") == "Y"
            and article.get("hasPDF") == "Y"
        )
        pmcid = article.get("pmcid") if has_open_pdf else None
        if not pmcid:
            return None

        pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf"
        logger.info(f"Found Europe PMC PDF for PMID {pmid}: {pmcid}")

        pdf_response = safe_get(
            pdf_url,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            },
            timeout=30,
            allow_redirects=True,
        )
        if pdf_response.status_code != 200:
            return None

        content_type = pdf_response.headers.get("content-type", "")
        if "pdf" in content_type.lower() or len(pdf_response.content) > 1000:
            return pdf_response.content
    except Exception as e:
        logger.debug(f"Europe PMC download failed: {e}")

    return None
def _download_pubmed(self, url: str) -> Optional[bytes]:
    """Download from PubMed/PubMed Central with rate limiting.

    Strategy, in order:

    * URLs containing ``/articles/PMC``: request the PDF for that PMC ID
      from Europe PMC's render endpoint. If that request raises, ``None`` is
      returned immediately. A non-200 or non-PDF answer falls through to the
      generic download below.
    * URLs whose host is ``pubmed.ncbi.nlm.nih.gov``: extract the PMID from
      the path, then try, in order:
        1. ``_try_europe_pmc(pmid)``;
        2. NCBI E-utilities ``elink`` (PMID -> PMC) plus ``esummary``, then
           Europe PMC for the PMC ID found;
        3. scraping the PubMed page for a "PMC<digits>" ID, then Europe PMC.
    * Whatever did not return above ends with ``_download_generic(url)``.

    A Europe PMC response is accepted when it is HTTP 200 and either has a
    content type containing "pdf" or a body longer than 1000 bytes.

    Side effects: updates ``self._last_pubmed_request``. It sleeps to honour
    ``self._pubmed_delay``, and doubles that delay (capped at 5.0 s) when an
    HTTP 429 is caught in the scraping step.

    Args:
        url: PubMed or PMC article URL.

    Returns:
        The downloaded bytes, or ``None`` on failure. Exceptions that escape
        the inner handlers are logged and converted to ``None``.
    """
    try:
        # Apply rate limiting for PubMed requests
        current_time = time.time()
        time_since_last = current_time - self._last_pubmed_request
        if time_since_last < self._pubmed_delay:
            sleep_time = self._pubmed_delay - time_since_last
            logger.debug(
                f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request"
            )
            time.sleep(sleep_time)
        self._last_pubmed_request = time.time()

        # If it's already a PMC article, download directly
        if "/articles/PMC" in url:
            pmc_match = re.search(r"(PMC\d+)", url)
            if pmc_match:
                pmc_id = pmc_match.group(1)

                # Try Europe PMC (more reliable)
                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                headers = {
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                }

                try:
                    response = safe_get(
                        europe_url,
                        headers=headers,
                        timeout=30,
                        allow_redirects=True,
                    )
                    if response.status_code == 200:
                        content_type = response.headers.get(
                            "content-type", ""
                        )
                        # Heuristic PDF check: declared type or a "large enough" body.
                        if (
                            "pdf" in content_type.lower()
                            or len(response.content) > 1000
                        ):
                            logger.info(
                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                            )
                            return response.content
                except Exception as e:
                    logger.debug(f"Direct Europe PMC download failed: {e}")
                    # A request error here skips the generic fallback entirely.
                    return None

        # If it's a regular PubMed URL, try to find PMC version
        elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
            # Extract PMID from URL
            pmid_match = re.search(r"/(\d+)/?", url)
            if pmid_match:
                pmid = pmid_match.group(1)
                logger.info(f"Attempting to download PDF for PMID: {pmid}")

                # Try Europe PMC first (more reliable)
                pdf_content = self._try_europe_pmc(pmid)
                if pdf_content:
                    return pdf_content

                # First try using NCBI E-utilities API to find PMC ID
                try:
                    # Use elink to convert PMID to PMCID
                    elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
                    params = {
                        "dbfrom": "pubmed",
                        "db": "pmc",
                        "id": pmid,
                        "retmode": "json",
                    }

                    api_response = safe_get(
                        elink_url, params=params, timeout=10
                    )
                    if api_response.status_code == 200:
                        data = api_response.json()
                        # Parse the response to find PMC ID
                        link_sets = data.get("linksets", [])
                        if link_sets and "linksetdbs" in link_sets[0]:
                            # Only the first linkset is inspected. Each
                            # matching "pmc" linksetdb uses its first link.
                            for linksetdb in link_sets[0]["linksetdbs"]:
                                if linksetdb.get(
                                    "dbto"
                                ) == "pmc" and linksetdb.get("links"):
                                    pmc_id_num = linksetdb["links"][0]
                                    # Now fetch PMC details to get the correct PMC ID format
                                    esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                                    summary_params = {
                                        "db": "pmc",
                                        "id": pmc_id_num,
                                        "retmode": "json",
                                    }
                                    summary_response = safe_get(
                                        esummary_url,
                                        params=summary_params,
                                        timeout=10,
                                    )
                                    if summary_response.status_code == 200:
                                        summary_data = (
                                            summary_response.json()
                                        )
                                        result = summary_data.get(
                                            "result", {}
                                        ).get(str(pmc_id_num), {})
                                        if result:
                                            # PMC IDs in the API don't have the "PMC" prefix
                                            pmc_id = f"PMC{pmc_id_num}"
                                            logger.info(
                                                f"Found PMC ID via API: {pmc_id} for PMID: {pmid}"
                                            )

                                            # Try Europe PMC with the PMC ID
                                            europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

                                            time.sleep(self._pubmed_delay)

                                            headers = {
                                                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                            }

                                            try:
                                                response = safe_get(
                                                    europe_url,
                                                    headers=headers,
                                                    timeout=30,
                                                    allow_redirects=True,
                                                )
                                                if (
                                                    response.status_code
                                                    == 200
                                                ):
                                                    content_type = response.headers.get(
                                                        "content-type", ""
                                                    )
                                                    if (
                                                        "pdf"
                                                        in content_type.lower()
                                                        or len(
                                                            response.content
                                                        )
                                                        > 1000
                                                    ):
                                                        logger.info(
                                                            f"Downloaded PDF via Europe PMC for {pmc_id}"
                                                        )
                                                        return (
                                                            response.content
                                                        )
                                            except Exception as e:
                                                # Swallowed: the loop moves on to the next linksetdb.
                                                logger.debug(
                                                    f"Europe PMC download with PMC ID failed: {e}"
                                                )
                except Exception as e:
                    logger.debug(
                        f"API lookup failed, trying webpage scraping: {e}"
                    )

                # Fallback to webpage scraping if API fails
                try:
                    headers = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    }
                    response = safe_get(url, headers=headers, timeout=10)
                    if response.status_code == 200:
                        # Look for PMC ID in the page
                        # (the first "PMC<digits>" occurrence anywhere in the HTML).
                        pmc_match = re.search(r"PMC\d+", response.text)
                        if pmc_match:
                            pmc_id = pmc_match.group(0)
                            logger.info(
                                f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}"
                            )

                            # Add delay before downloading PDF
                            time.sleep(self._pubmed_delay)

                            # Try Europe PMC with the PMC ID (more reliable)
                            europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                            headers = {
                                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                            }

                            try:
                                response = safe_get(
                                    europe_url,
                                    headers=headers,
                                    timeout=30,
                                    allow_redirects=True,
                                )
                                if response.status_code == 200:
                                    content_type = response.headers.get(
                                        "content-type", ""
                                    )
                                    if (
                                        "pdf" in content_type.lower()
                                        or len(response.content) > 1000
                                    ):
                                        logger.info(
                                            f"Downloaded PDF via Europe PMC for {pmc_id}"
                                        )
                                        return response.content
                            except Exception as e:
                                logger.debug(
                                    f"Europe PMC download failed: {e}"
                                )
                        else:
                            logger.info(
                                f"No PMC version found for PMID: {pmid}"
                            )
                # NOTE(review): nothing in this try calls raise_for_status(), so
                # this handler only fires if safe_get itself raises HTTPError.
                # Confirm. The re-raise is caught by the outer handler below,
                # which returns None (skipping the generic fallback).
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 429:
                        logger.warning(
                            "Rate limited by PubMed, increasing delay"
                        )
                        self._pubmed_delay = min(
                            self._pubmed_delay * 2, 5.0
                        )  # Max 5 seconds
                    raise
                except Exception as e:
                    logger.debug(f"Could not check for PMC version: {e}")

        # Last resort for every path that did not return above.
        return self._download_generic(url)
    except Exception:
        logger.exception("PubMed download failed")
        return None
def _download_semantic_scholar(self, url: str) -> Optional[bytes]:
    """Return ``None``: Semantic Scholar pages do not serve PDFs directly.

    Resolving the actual PDF link from the landing page is not implemented,
    so this strategy never yields content.
    """
    del url  # unused; kept for signature parity with the other strategies
    return None
def _download_biorxiv(self, url: str) -> Optional[bytes]:
    """Download a preprint PDF from bioRxiv.

    The URL is normalised to ``<host>.org/content/<doi-path>.full.pdf``.
    Any trailing ``vN`` version and ``.full`` suffix are removed first. A
    URL that already ends in ``.pdf`` is used unchanged. The result is
    fetched via ``_download_generic``.

    Args:
        url: bioRxiv article URL.

    Returns:
        PDF bytes, or ``None`` on failure.
    """
    try:
        pdf_url = url.rstrip("/")
        if not pdf_url.endswith(".pdf"):
            # Canonical article URLs already live under /content/. Inserting
            # it again would produce ".org/content/content/...".
            if ".org/content/" not in pdf_url:
                pdf_url = pdf_url.replace(".org/", ".org/content/", 1)
            # Full-text pages end in ".full"; drop it so it isn't doubled.
            pdf_url = re.sub(r"\.full$", "", pdf_url)
            pdf_url = re.sub(r"v\d+$", "", pdf_url)  # Remove version
            pdf_url += ".full.pdf"

        return self._download_generic(pdf_url)
    except Exception:
        logger.exception("bioRxiv download failed")
        return None
def _save_text_with_db(
    self,
    resource: ResearchResource,
    text: str,
    session: Session,
    extraction_method: str,
    extraction_source: str,
    pdf_document_id: Optional[int] = None,
) -> Optional[str]:
    """
    Save extracted text to encrypted database.

    Updates the Document with id ``pdf_document_id``, or, when that is not
    given, the Document found for ``resource``. If there is no such Document,
    ``resource`` is linked to an existing Document with the same text hash.
    Failing that, a new text-only Document is created and linked to the
    "Library" collection. Changes are made on ``session`` but not committed;
    the caller commits.

    Args:
        resource: The research resource
        text: Extracted text content
        session: Database session
        extraction_method: How the text was extracted
        extraction_source: Specific tool/API used
        pdf_document_id: ID of PDF document if extracted from PDF

    Returns:
        None (previously returned text file path, now removed)

    Raises:
        Exception: any error is re-raised after ``session`` is rolled back.
    """
    try:
        # Calculate text metadata for database
        word_count = len(text.split())
        character_count = len(text)

        # Find the document by pdf_document_id or resource_id
        if pdf_document_id:
            doc = (
                session.query(Document)
                .filter_by(id=pdf_document_id)
                .first()
            )
        else:
            doc = get_document_for_resource(session, resource)

        if doc:
            # Update existing document with extracted text. Only replace
            # document_hash when transitioning from FAILED — see issue
            # #3827. PDF-bytes hashes set at creation must remain stable;
            # text-content hashes collide far more often (identical
            # extracted text from different PDFs).
            was_failed = doc.status == DocumentStatus.FAILED
            doc.text_content = text
            doc.character_count = character_count
            doc.word_count = word_count
            doc.extraction_method = extraction_method
            doc.extraction_source = extraction_source
            doc.status = DocumentStatus.COMPLETED
            if was_failed:
                text_hash = hashlib.sha256(text.encode()).hexdigest()
                # document_hash is UNIQUE (#3827). If another Document already
                # holds this content hash, adopting it would make the caller's
                # commit fail, so keep the current hash in that case.
                conflicting = (
                    session.query(Document)
                    .filter(
                        Document.document_hash == text_hash,
                        Document.id != doc.id,
                    )
                    .first()
                )
                if conflicting is None:
                    doc.document_hash = text_hash
                else:
                    logger.warning(
                        f"Document {doc.id}: extracted text matches document "
                        f"{conflicting.id}; keeping existing document_hash"
                    )
            doc.processed_at = datetime.now(UTC)

            # Set quality based on method
            if extraction_method == "native_api":
                doc.extraction_quality = "high"
            elif (
                extraction_method == "pdf_extraction"
                and extraction_source == "pdfplumber"
            ):
                doc.extraction_quality = "medium"
            else:
                doc.extraction_quality = "low"

            logger.debug(
                f"Updated document {doc.id} with extracted text ({word_count} words)"
            )
        else:
            # Create a new Document for text-only extraction
            # Generate hash from text content
            text_hash = hashlib.sha256(text.encode()).hexdigest()

            # Dedup against existing content hash. If another resource
            # already produced identical extracted text, link this
            # resource to the canonical Document instead of inserting
            # a duplicate that would violate the UNIQUE constraint
            # (issue #3827). Mirrors research_history_indexer.py:322.
            existing_by_hash = (
                session.query(Document)
                .filter_by(document_hash=text_hash)
                .first()
            )
            if existing_by_hash:
                resource.document_id = existing_by_hash.id
                library_collection = (
                    session.query(Collection)
                    .filter_by(name="Library")
                    .first()
                )
                if library_collection:
                    ensure_in_collection(
                        session,
                        existing_by_hash.id,
                        library_collection.id,
                    )
                else:
                    logger.warning(
                        f"Library collection not found - deduped document {existing_by_hash.id} will not be linked to default collection"
                    )
                logger.info(
                    f"Linked resource {resource.id} to existing Document "
                    f"{existing_by_hash.id} (matched on content hash)"
                )
                return None

            # Get source type for research downloads
            try:
                source_type_id = get_source_type_id(
                    self.username, "research_download", self.password
                )
            except Exception:
                logger.exception(
                    "Failed to get source type for text document"
                )
                raise

            # Create new document
            doc_id = str(uuid.uuid4())
            doc = Document(
                id=doc_id,
                source_type_id=source_type_id,
                resource_id=resource.id,
                research_id=resource.research_id,
                document_hash=text_hash,
                original_url=resource.url,
                file_path=FILE_PATH_TEXT_ONLY,
                file_size=character_count,  # Use character count as file size for text-only
                file_type="text",
                mime_type="text/plain",
                title=resource.title,
                text_content=text,
                character_count=character_count,
                word_count=word_count,
                extraction_method=extraction_method,
                extraction_source=extraction_source,
                # NOTE(review): every non-native_api source is rated "medium"
                # here, while the update branch rates e.g. "pdfplumber_fallback"
                # as "low". Confirm which rating is intended.
                extraction_quality="high"
                if extraction_method == "native_api"
                else "medium",
                status=DocumentStatus.COMPLETED,
                processed_at=datetime.now(UTC),
            )
            session.add(doc)

            # Link to default Library collection
            library_collection = (
                session.query(Collection).filter_by(name="Library").first()
            )
            if library_collection:
                ensure_in_collection(session, doc_id, library_collection.id)
            else:
                logger.warning(
                    f"Library collection not found - document {doc_id} will not be linked to default collection"
                )

            logger.info(
                f"Created new document {doc_id} for text-only extraction ({word_count} words)"
            )

        logger.info(
            f"Saved text to encrypted database ({word_count} words)"
        )
        return None

    except Exception:
        # Rollback BEFORE re-raising so the shared thread-local session
        # is clean by the time the caller's loop reaches its next
        # iteration. Without this, an IntegrityError leaves the session
        # in PendingRollbackError state and every subsequent ORM access
        # cascades (issue #3827).
        safe_rollback(session, "_save_text_with_db")
        logger.exception("Error saving text to encrypted database")
        raise  # Re-raise so caller can handle the error
def _create_text_document_record(
    self,
    session: Session,
    resource: ResearchResource,
    file_path: Path,
    extraction_method: str,
    extraction_source: str,
):
    """Copy a legacy on-disk text file into the resource's existing Document.

    The file is read as UTF-8, with undecodable bytes dropped. Its text,
    character/word counts, the given extraction method/source and a "low"
    quality rating are written to the Document returned by
    ``get_document_for_resource``. Nothing is created when no Document
    exists. Best effort: any error is logged and swallowed, nothing is
    returned, and nothing is committed here.
    """
    try:
        contents = file_path.read_text(encoding="utf-8", errors="ignore")
        target = get_document_for_resource(session, resource)
        if not target:
            logger.warning(
                f"No document found to update for resource {resource.id}"
            )
            return

        target.text_content = contents
        target.character_count = len(contents)
        target.word_count = len(contents.split())
        target.extraction_method = extraction_method
        target.extraction_source = extraction_source
        # Provenance of legacy files is unknown, so rate them conservatively.
        target.extraction_quality = "low"
        logger.info(
            f"Updated document {target.id} with text from file: {file_path.name}"
        )
    except Exception:
        logger.exception("Error updating document with text from file")
def _record_failed_text_extraction(
    self, session: Session, resource: ResearchResource, error: str
):
    """Record a failed text extraction attempt in the Document.

    Marks the resource's Document as FAILED with ``error``. If
    ``get_document_for_resource`` finds none, a Document previously created
    by this method is looked up via its deterministic placeholder hash and
    reused. Only when neither exists is a new FAILED Document added, so
    that failures are tracked and can be retried. Nothing is committed here.
    Best effort: errors are logged and swallowed.
    """
    try:
        # Find the Document for this resource
        doc = get_document_for_resource(session, resource)

        failed_hash = None
        if not doc:
            # Deterministic hash so retries update the same record. Look it
            # up explicitly; inserting a second row with the same hash would
            # violate the UNIQUE document_hash constraint at commit.
            failed_hash = hashlib.sha256(
                f"failed:{resource.url}:{resource.id}".encode()
            ).hexdigest()
            doc = (
                session.query(Document)
                .filter_by(document_hash=failed_hash)
                .first()
            )

        if doc:
            # Update document with extraction error
            doc.error_message = error
            doc.extraction_method = "failed"
            doc.extraction_quality = "low"
            doc.status = DocumentStatus.FAILED
            logger.info(
                f"Recorded failed text extraction for document {doc.id}: {error}"
            )
            return

        # Create a new Document for failed extraction
        # This enables tracking failures and retry capability
        source_type_id = get_source_type_id(
            self.username, "research_download", self.password
        )

        doc_id = str(uuid.uuid4())
        doc = Document(
            id=doc_id,
            source_type_id=source_type_id,
            resource_id=resource.id,
            research_id=resource.research_id,
            document_hash=failed_hash,
            original_url=resource.url,
            file_path=None,
            file_size=0,
            file_type="unknown",
            title=resource.title,
            status=DocumentStatus.FAILED,
            error_message=error,
            extraction_method="failed",
            extraction_quality="low",
            processed_at=datetime.now(UTC),
        )
        session.add(doc)

        logger.info(
            f"Created failed document {doc_id} for resource {resource.id}: {error}"
        )

    except Exception:
        logger.exception("Error recording failed text extraction")