Coverage for src/local_deep_research/research_library/services/download_service.py: 22% (613 statements)
1"""
2PDF Download Service for Research Library
4Handles downloading PDFs from various academic sources with:
5- Deduplication using download tracker
6- Source-specific download strategies (arXiv, PubMed, etc.)
7- Progress tracking and error handling
8- File organization and storage
9"""

import hashlib
import re
import time
import uuid
from datetime import datetime, UTC
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse

import requests
from loguru import logger
from sqlalchemy.orm import Session
import pdfplumber
from pypdf import PdfReader

from ...database.models.download_tracker import (
    DownloadAttempt,
    DownloadTracker,
)
from ...security import redact_data, safe_get
from ...database.models.library import (
    Collection,
    Document,
    DocumentStatus,
    DownloadQueue as LibraryDownloadQueue,
    DocumentCollection,
)
from .pdf_storage_manager import PDFStorageManager
from ...database.models.research import ResearchResource
from ...database.library_init import get_source_type_id, get_default_library_id
from ...database.session_context import get_user_db_session
from ...utilities.db_utils import get_settings_manager
from ...library.download_management import RetryManager
from ...config.paths import get_library_directory
from ..utils import (
    get_url_hash,
    get_absolute_path_from_settings,
)

# Import our modular downloaders
from ..downloaders import (
    ContentType,
    ArxivDownloader,
    PubMedDownloader,
    BioRxivDownloader,
    DirectPDFDownloader,
    SemanticScholarDownloader,
    OpenAlexDownloader,
    GenericDownloader,
)


class DownloadService:
    """Service for downloading and managing research PDFs."""

    def __init__(self, username: str, password: Optional[str] = None):
        """Initialize download service for a user.

        Args:
            username: The username to download for
            password: Optional password for encrypted database access
        """
        self.username = username
        self.password = password
        self.settings = get_settings_manager(username=username)

        # Debug settings manager and user context
        logger.info(
            f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}"
        )

        # Get library path from settings (uses centralized path, respects LDR_DATA_DIR)
        storage_path_setting = self.settings.get_setting(
            "research_library.storage_path",
            str(get_library_directory()),
        )
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})"
        )

        if storage_path_setting is None:
            logger.error(
                "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!"
            )
            raise ValueError("Storage path setting cannot be None")

        self.library_root = str(Path(storage_path_setting).expanduser())
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}"
        )

        # Create directory structure
        self._setup_directories()

        # Initialize modular downloaders
        # DirectPDFDownloader first for efficiency with direct PDF links

        # Get Semantic Scholar API key from settings
        semantic_scholar_api_key = self.settings.get_setting(
            "search.engine.web.semantic_scholar.api_key", ""
        )

        self.downloaders = [
            DirectPDFDownloader(timeout=30),  # Handle direct PDF links first
            SemanticScholarDownloader(
                timeout=30,
                api_key=semantic_scholar_api_key
                if semantic_scholar_api_key
                else None,
            ),
            OpenAlexDownloader(
                timeout=30
            ),  # OpenAlex with API lookup (no key needed)
            ArxivDownloader(timeout=30),
            PubMedDownloader(timeout=30, rate_limit_delay=1.0),
            BioRxivDownloader(timeout=30),
            GenericDownloader(timeout=30),  # Generic should be last (fallback)
        ]

        # Initialize retry manager for smart failure tracking
        self.retry_manager = RetryManager(username, password)
        logger.info(
            f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}"
        )

        # PubMed rate limiting state
        self._pubmed_delay = 1.0  # 1 second delay for PubMed
        self._last_pubmed_request = 0.0  # Track last request time
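
    # Usage sketch (illustrative only; the username, password, and IDs below
    # are hypothetical, but the methods are the ones defined in this class):
    #
    #     service = DownloadService(username="alice", password="s3cret")
    #     queued = service.queue_research_downloads(research_id="research-123")
    #     ok, reason = service.download_resource(resource_id=42)
    #     if not ok:
    #         logger.warning(f"Download skipped: {reason}")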

    def _setup_directories(self):
        """Create library directory structure."""
        # Only create the root and pdfs folder - flat structure
        paths = [
            self.library_root,
            str(Path(self.library_root) / "pdfs"),
        ]
        for path in paths:
            Path(path).mkdir(parents=True, exist_ok=True)

    def _normalize_url(self, url: str) -> str:
        """Normalize URL for consistent hashing."""
        # Remove protocol variations
        url = re.sub(r"^https?://", "", url)
        # Remove www
        url = re.sub(r"^www\.", "", url)
        # Remove trailing slashes
        url = url.rstrip("/")
        # Sort query parameters
        if "?" in url:
            base, query = url.split("?", 1)
            params = sorted(query.split("&"))
            url = f"{base}?{'&'.join(params)}"
        return url.lower()
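
    # Illustrative normalization (hypothetical URL, traced through the rules
    # above): "https://www.Example.com/Papers?b=2&a=1" normalizes to
    # "example.com/papers?a=1&b=2", so protocol, "www.", parameter order,
    # trailing slashes, and letter case no longer affect the hash.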

    def _get_url_hash(self, url: str) -> str:
        """Generate SHA256 hash of normalized URL."""
        normalized = self._normalize_url(url)
        return get_url_hash(normalized)

    def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]:
        """
        Check if URL is already downloaded.

        Returns:
            Tuple of (is_downloaded, file_path)
        """
        url_hash = self._get_url_hash(url)

        with get_user_db_session(self.username, self.password) as session:
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash, is_downloaded=True)
                .first()
            )

            if tracker and tracker.file_path:
                # Compute absolute path and verify file still exists
                absolute_path = get_absolute_path_from_settings(
                    tracker.file_path
                )
                if absolute_path.exists():
                    return True, str(absolute_path)
                else:
                    # File was deleted, mark as not downloaded
                    tracker.is_downloaded = False
                    session.commit()

            return False, None

    def get_text_content(self, resource_id: int) -> Optional[str]:
        """
        Get text content for a research resource.

        This will try to:
        1. Fetch text directly from APIs if available
        2. Extract text from the downloaded PDF if one exists
        3. Download the PDF and extract text if not yet downloaded

        Args:
            resource_id: ID of the research resource

        Returns:
            Text content as string, or None if extraction failed
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return None

            url = resource.url

            # Find appropriate downloader
            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for text extraction from {url}"
                    )
                    try:
                        # Try to get text content
                        text = downloader.download_text(url)
                        if text:
                            logger.info(
                                f"Successfully extracted text for: {resource.title[:50]}"
                            )
                            return text
                    except Exception:
                        logger.exception("Failed to extract text")
                    break

            logger.warning(f"Could not extract text for {url}")
            return None

    def queue_research_downloads(
        self, research_id: str, collection_id: Optional[str] = None
    ) -> int:
        """
        Queue all downloadable PDFs from a research session.

        Args:
            research_id: The research session ID
            collection_id: Optional target collection ID (defaults to Library if not provided)

        Returns:
            Number of items queued
        """
        queued = 0

        # Get default library collection if no collection_id provided
        # (get_default_library_id is already imported at module level)
        if not collection_id:
            collection_id = get_default_library_id(self.username)

        with get_user_db_session(self.username, self.password) as session:
            # Get all resources for this research
            resources = (
                session.query(ResearchResource)
                .filter_by(research_id=research_id)
                .all()
            )

            for resource in resources:
                if self._is_downloadable(resource):
                    # Check if already queued
                    existing_queue = (
                        session.query(LibraryDownloadQueue)
                        .filter_by(
                            resource_id=resource.id,
                            status=DocumentStatus.PENDING,
                        )
                        .first()
                    )

                    # Check if already downloaded (trust the database status)
                    existing_doc = (
                        session.query(Document)
                        .filter_by(
                            resource_id=resource.id,
                            status=DocumentStatus.COMPLETED,
                        )
                        .first()
                    )

                    # Queue if not already queued and not marked as completed
                    if not existing_queue and not existing_doc:
                        # Check one more time if ANY queue entry exists (regardless of status)
                        any_queue = (
                            session.query(LibraryDownloadQueue)
                            .filter_by(resource_id=resource.id)
                            .first()
                        )

                        if any_queue:
                            # Reset the existing queue entry
                            any_queue.status = DocumentStatus.PENDING
                            any_queue.research_id = research_id
                            any_queue.collection_id = collection_id
                            queued += 1
                        else:
                            # Add new queue entry
                            queue_entry = LibraryDownloadQueue(
                                resource_id=resource.id,
                                research_id=research_id,
                                collection_id=collection_id,
                                priority=0,
                                status=DocumentStatus.PENDING,
                            )
                            session.add(queue_entry)
                            queued += 1

            session.commit()
            logger.info(
                f"Queued {queued} downloads for research {research_id} to collection {collection_id}"
            )

            return queued

    def _is_downloadable(self, resource: ResearchResource) -> bool:
        """Check if a resource is likely downloadable as PDF."""
        url = resource.url.lower() if resource.url else ""

        # Check for PDF extensions
        if url.endswith(".pdf"):
            return True

        # Check for known academic sources with downloadable PDFs
        downloadable_domains = [
            "arxiv.org",
            "biorxiv.org",
            "medrxiv.org",
            "ncbi.nlm.nih.gov/pmc",  # PubMed Central has PDFs
            "pubmed.ncbi.nlm.nih.gov",  # Try to find PMC version
            "semanticscholar.org",
            "researchgate.net",
            "academia.edu",
        ]

        return any(domain in url for domain in downloadable_domains)
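
    # Example outcomes (hypothetical resource URLs, evaluated against the
    # rules above):
    #   "https://example.com/paper.pdf"          -> True  (.pdf extension)
    #   "https://arxiv.org/abs/2301.00001"       -> True  (known domain)
    #   "https://example.com/blog/announcement"  -> False (no PDF signal)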

    def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]:
        """
        Download a specific resource.

        Returns:
            Tuple of (success: bool, skip_reason: str or None)
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return False, "Resource not found"

            # Check if already downloaded (trust the database after sync)
            existing_doc = (
                session.query(Document)
                .filter_by(
                    resource_id=resource_id, status=DocumentStatus.COMPLETED
                )
                .first()
            )

            if existing_doc:
                logger.info(
                    "Resource already downloaded (according to database)"
                )
                return True, None

            # Get collection_id from queue entry if it exists
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )
            collection_id = (
                queue_entry.collection_id
                if queue_entry and queue_entry.collection_id
                else None
            )

            # Create download tracker entry
            url_hash = self._get_url_hash(resource.url)
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash)
                .first()
            )

            if not tracker:
                tracker = DownloadTracker(
                    url=resource.url,
                    url_hash=url_hash,
                    first_resource_id=resource.id,
                    is_downloaded=False,
                )
                session.add(tracker)
                session.commit()

            # Attempt download
            result = self._download_pdf(
                resource, tracker, session, collection_id
            )

            # Get skip reason if failed
            skip_reason = None
            if isinstance(result, tuple):
                success, skip_reason = result
            else:
                success = result
                if not success:
                    # Try to get skip reason from last attempt
                    last_attempt = (
                        session.query(DownloadAttempt)
                        .filter_by(url_hash=tracker.url_hash)
                        .order_by(DownloadAttempt.attempted_at.desc())
                        .first()
                    )
                    if last_attempt and hasattr(last_attempt, "error_message"):
                        skip_reason = last_attempt.error_message

            # Record attempt with retry manager for smart failure tracking
            self.retry_manager.record_attempt(
                resource_id=resource.id,
                result=(success, skip_reason),
                url=resource.url,
                details=skip_reason
                or (
                    "Successfully downloaded" if success else "Download failed"
                ),
                session=session,
            )

            # Update queue status if exists
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if queue_entry:
                queue_entry.status = (
                    DocumentStatus.COMPLETED
                    if success
                    else DocumentStatus.FAILED
                )
                queue_entry.completed_at = datetime.now(UTC)

            session.commit()
            return success, skip_reason

    def _download_pdf(
        self,
        resource: ResearchResource,
        tracker: DownloadTracker,
        session: Session,
        collection_id: Optional[str] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform the actual PDF download.

        Args:
            resource: The research resource to download
            tracker: Download tracker for this URL
            session: Database session
            collection_id: Optional target collection ID (defaults to Library if not provided)

        Returns:
            Tuple of (success: bool, skip_reason: Optional[str])
        """
        url = resource.url

        # Log attempt
        attempt = DownloadAttempt(
            url_hash=tracker.url_hash,
            attempt_number=tracker.download_attempts.count() + 1
            if hasattr(tracker, "download_attempts")
            else 1,
            attempted_at=datetime.now(UTC),
        )
        session.add(attempt)

        try:
            # Use modular downloaders with detailed skip reasons
            pdf_content = None
            downloader_used = None
            skip_reason = None

            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for {url}"
                    )
                    result = downloader.download_with_result(
                        url, ContentType.PDF
                    )
                    downloader_used = downloader.__class__.__name__

                    if result.is_success and result.content:
                        pdf_content = result.content
                        break
                    elif result.skip_reason:
                        skip_reason = result.skip_reason
                        logger.info(f"Download skipped: {skip_reason}")
                        # Keep trying other downloaders unless it's the GenericDownloader
                        if isinstance(downloader, GenericDownloader):
                            break

            if not downloader_used:
                logger.error(f"No downloader found for {url}")
                skip_reason = "No compatible downloader available"

            if not pdf_content:
                error_msg = skip_reason or "Failed to download PDF content"
                # Store skip reason in attempt for retrieval
                attempt.error_message = error_msg
                attempt.succeeded = False
                session.commit()
                logger.info(f"Download failed with reason: {error_msg}")
                return False, error_msg

            # Get PDF storage mode setting
            pdf_storage_mode = self.settings.get_setting(
                "research_library.pdf_storage_mode", "none"
            )
            max_pdf_size_mb = int(
                self.settings.get_setting(
                    "research_library.max_pdf_size_mb", 100
                )
            )
            logger.info(
                f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}"
            )

            # Update tracker
            tracker.file_hash = hashlib.sha256(pdf_content).hexdigest()
            tracker.file_size = len(pdf_content)
            tracker.is_downloaded = True
            tracker.downloaded_at = datetime.now(UTC)

            # Initialize PDF storage manager
            pdf_storage_manager = PDFStorageManager(
                library_root=self.library_root,
                storage_mode=pdf_storage_mode,
                max_pdf_size_mb=max_pdf_size_mb,
            )

            # Update attempt with success info
            attempt.succeeded = True

            # Check if library document already exists
            existing_doc = (
                session.query(Document)
                .filter_by(resource_id=resource.id)
                .first()
            )

            if existing_doc:
                # Update existing document
                existing_doc.document_hash = tracker.file_hash
                existing_doc.file_size = len(pdf_content)
                existing_doc.status = DocumentStatus.COMPLETED
                existing_doc.processed_at = datetime.now(UTC)

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=existing_doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )
            else:
                # Get source type ID for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download"
                    )
                    # Use provided collection_id or default to Library
                    library_collection_id = (
                        collection_id or get_default_library_id(self.username)
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type or library collection"
                    )
                    raise

                # Create new unified document entry
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=tracker.file_hash,
                    original_url=url,
                    file_size=len(pdf_content),
                    file_type="pdf",
                    mime_type="application/pdf",
                    title=resource.title,
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                    storage_mode=pdf_storage_mode,
                )
                session.add(doc)
                session.flush()  # Ensure doc.id is available for blob storage

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )

                # Link document to the target collection (provided or default Library)
                doc_collection = DocumentCollection(
                    document_id=doc_id,
                    collection_id=library_collection_id,
                    indexed=False,
                )
                session.add(doc_collection)

            # Update attempt
            attempt.succeeded = True
            attempt.bytes_downloaded = len(pdf_content)

            if pdf_storage_mode == "database":
                logger.info(
                    f"Successfully stored PDF in database: {resource.url}"
                )
            elif pdf_storage_mode == "filesystem":
                logger.info(f"Successfully downloaded: {tracker.file_path}")
            else:
                logger.info(
                    f"Successfully downloaded PDF (text-only mode, PDF not stored): {resource.url}"
                )

            # Automatically extract and save text after successful PDF download
            try:
                logger.info(
                    f"Extracting text from downloaded PDF for: {resource.title[:50]}"
                )
                text = self._extract_text_from_pdf(pdf_content)

                if text:
                    # Get the document ID we just created/updated
                    pdf_doc = (
                        session.query(Document)
                        .filter_by(resource_id=resource.id)
                        .first()
                    )
                    pdf_document_id = pdf_doc.id if pdf_doc else None

                    # Save text to encrypted database
                    self._save_text_with_db(
                        resource=resource,
                        text=text,
                        session=session,
                        extraction_method="pdf_extraction",
                        extraction_source="local_pdf",
                        pdf_document_id=pdf_document_id,
                    )
                    logger.info(
                        f"Successfully extracted and saved text for: {resource.title[:50]}"
                    )
                else:
                    logger.warning(
                        f"Text extraction returned empty text for: {resource.title[:50]}"
                    )
            except Exception:
                logger.exception(
                    "Failed to extract text from PDF, but PDF download succeeded"
                )
                # Don't fail the entire download if text extraction fails

            return True, None

        except Exception as e:
            logger.exception(f"Download failed for {url}")
            attempt.succeeded = False
            attempt.error_type = type(e).__name__
            attempt.error_message = str(e)
            tracker.is_accessible = False
            # Sanitize error message before returning to API
            safe_error = redact_data(str(e))
            return False, safe_error

    def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using multiple methods for best results.

        Args:
            pdf_content: Raw PDF bytes

        Returns:
            Extracted text or None if extraction fails
        """
        try:
            # First try pdfplumber (better for complex layouts)
            import io

            with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
                text_parts = []
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text_parts.append(page_text)

            if text_parts:
                return "\n\n".join(text_parts)

            # Fall back to pypdf if pdfplumber found no text
            reader = PdfReader(io.BytesIO(pdf_content))
            text_parts = []
            for page in reader.pages:
                text = page.extract_text()
                if text:
                    text_parts.append(text)

            if text_parts:
                return "\n\n".join(text_parts)

            logger.warning("No text could be extracted from PDF")
            return None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None
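
    # Standalone sketch of the extraction fallback (hypothetical file path;
    # the method only needs raw PDF bytes and tries pdfplumber before pypdf):
    #
    #     pdf_bytes = Path("paper.pdf").read_bytes()
    #     text = service._extract_text_from_pdf(pdf_bytes)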

    def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]:
        """
        Download resource and extract text to encrypted database.

        Args:
            resource_id: ID of the resource to download

        Returns:
            Tuple of (success, error_message)
        """
        with get_user_db_session(self.username, self.password) as session:
            # Get the resource
            resource = (
                session.query(ResearchResource)
                .filter_by(id=resource_id)
                .first()
            )
            if not resource:
                return False, "Resource not found"

            # Try existing text in database
            result = self._try_existing_text(session, resource_id)
            if result is not None:
                return result

            # Try legacy text files on disk
            result = self._try_legacy_text_file(session, resource, resource_id)
            if result is not None:
                return result

            # Try extracting from existing PDF
            result = self._try_existing_pdf_extraction(
                session, resource, resource_id
            )
            if result is not None:
                return result

            # Try API text extraction
            result = self._try_api_text_extraction(session, resource)
            if result is not None:
                return result

            # Fallback: Download PDF and extract
            return self._fallback_pdf_extraction(session, resource)

    def _try_existing_text(
        self, session, resource_id: int
    ) -> Optional[Tuple[bool, Optional[str]]]:
        """Check if text already exists in database (in Document.text_content)."""
        existing_doc = (
            session.query(Document).filter_by(resource_id=resource_id).first()
        )

        if not existing_doc:
            return None

        # Check if text content exists and extraction was successful
        if (
            existing_doc.text_content
            and existing_doc.extraction_method
            and existing_doc.extraction_method != "failed"
        ):
            logger.info(
                f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
            )
            return True, None

        # No text content or failed extraction
        logger.debug(
            f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
        )
        return None  # Fall through to re-extraction

    def _try_legacy_text_file(
        self, session, resource, resource_id: int
    ) -> Optional[Tuple[bool, Optional[str]]]:
        """Check for legacy text files on disk."""
        txt_path = Path(self.library_root) / "txt"
        existing_files = (
            list(txt_path.glob(f"*_{resource_id}.txt"))
            if txt_path.exists()
            else []
        )

        if not existing_files:
            return None

        logger.info(f"Text file already exists on disk: {existing_files[0]}")
        self._create_text_document_record(
            session,
            resource,
            existing_files[0],
            extraction_method="unknown",
            extraction_source="legacy_file",
        )
        session.commit()
        return True, None

    def _try_existing_pdf_extraction(
        self, session, resource, resource_id: int
    ) -> Optional[Tuple[bool, Optional[str]]]:
        """Try extracting text from an existing PDF file in the library."""
        pdf_document = (
            session.query(Document).filter_by(resource_id=resource_id).first()
        )

        if not pdf_document or pdf_document.status != DocumentStatus.COMPLETED:
            return None

        pdf_path = Path(self.library_root) / pdf_document.file_path
        if not pdf_path.exists():
            return None

        logger.info(f"Found existing PDF, extracting text from: {pdf_path}")
        try:
            with open(pdf_path, "rb") as f:
                pdf_content = f.read()
            text = self._extract_text_from_pdf(pdf_content)

            if not text:
                return None

            self._save_text_with_db(
                resource,
                text,
                session,
                extraction_method="pdf_extraction",
                extraction_source="pdfplumber",
                pdf_document_id=pdf_document.id,
            )
            session.commit()
            return True, None

        except Exception:
            logger.exception(
                f"Failed to extract text from existing PDF: {pdf_path}"
            )
            return None  # Fall through to other methods

    def _try_api_text_extraction(
        self, session, resource
    ) -> Optional[Tuple[bool, Optional[str]]]:
        """Try direct API text extraction."""
        logger.info(
            f"Attempting direct API text extraction from: {resource.url}"
        )

        downloader = self._get_downloader(resource.url)
        if not downloader:
            return None

        result = downloader.download_with_result(resource.url, ContentType.TEXT)
        if not result.is_success or not result.content:
            return None

        # Decode text content
        text = (
            result.content.decode("utf-8", errors="ignore")
            if isinstance(result.content, bytes)
            else result.content
        )

        # Determine extraction source
        extraction_source = "unknown"
        if isinstance(downloader, ArxivDownloader):
            extraction_source = "arxiv_api"
        elif isinstance(downloader, PubMedDownloader):
            extraction_source = "pubmed_api"

        try:
            self._save_text_with_db(
                resource,
                text,
                session,
                extraction_method="native_api",
                extraction_source=extraction_source,
            )
            session.commit()
            logger.info(
                f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{resource.title[:50]}...'"
            )
            return True, None
        except Exception as e:
            logger.exception(f"Failed to save text for resource {resource.id}")
            # Sanitize error message before returning to API
            safe_error = redact_data(f"Failed to save text: {str(e)}")
            return False, safe_error

    def _fallback_pdf_extraction(
        self, session, resource
    ) -> Tuple[bool, Optional[str]]:
        """Fallback: Download PDF to memory and extract text."""
        logger.info(
            f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}"
        )

        downloader = self._get_downloader(resource.url)
        if not downloader:
            error_msg = "No compatible downloader found"
            logger.warning(
                f"✗ FAILED: {error_msg} for '{resource.title[:50]}...'"
            )
            self._record_failed_text_extraction(
                session, resource, error=error_msg
            )
            session.commit()
            return False, error_msg

        result = downloader.download_with_result(resource.url, ContentType.PDF)
        if not result.is_success or not result.content:
            error_msg = result.skip_reason or "Failed to download PDF"
            logger.warning(
                f"✗ FAILED: Could not download PDF for '{resource.title[:50]}...' | Error: {error_msg}"
            )
            self._record_failed_text_extraction(
                session, resource, error=f"PDF download failed: {error_msg}"
            )
            session.commit()
            return False, f"PDF extraction failed: {error_msg}"

        # Extract text from PDF
        text = self._extract_text_from_pdf(result.content)
        if not text:
            error_msg = "PDF text extraction returned empty text"
            logger.warning(
                f"Failed to extract text from PDF for: {resource.url}"
            )
            self._record_failed_text_extraction(
                session, resource, error=error_msg
            )
            session.commit()
            return False, error_msg

        try:
            self._save_text_with_db(
                resource,
                text,
                session,
                extraction_method="pdf_extraction",
                extraction_source="pdfplumber_fallback",
            )
            session.commit()
            logger.info(
                f"✓ SUCCESS: Extracted text from '{resource.title[:50]}...'"
            )
            return True, None
        except Exception as e:
            logger.exception(f"Failed to save text for resource {resource.id}")
            # Sanitize error message before returning to API
            safe_error = redact_data(f"Failed to save text: {str(e)}")
            return False, safe_error

    def _get_downloader(self, url: str):
        """
        Get the appropriate downloader for a URL.

        Args:
            url: The URL to download from

        Returns:
            The appropriate downloader instance or None
        """
        for downloader in self.downloaders:
            if downloader.can_handle(url):
                return downloader
        return None
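
    # Dispatch sketch (hypothetical URL; assumes no earlier entry in
    # self.downloaders claims plain arXiv abstract links):
    #
    #     dl = service._get_downloader("https://arxiv.org/abs/2301.00001")
    #     # -> an ArxivDownloader, because downloaders are checked in list
    #     #    order and DirectPDFDownloader only claims direct .pdf links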

    def _download_generic(self, url: str) -> Optional[bytes]:
        """Generic PDF download method."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            response = safe_get(
                url, headers=headers, timeout=30, allow_redirects=True
            )
            response.raise_for_status()

            # Verify it's a PDF
            content_type = response.headers.get("Content-Type", "")
            if (
                "pdf" not in content_type.lower()
                and not response.content.startswith(b"%PDF")
            ):
                logger.warning(f"Response is not a PDF: {content_type}")
                return None

            return response.content

        except Exception:
            logger.exception("Generic download failed")
            return None

    def _download_arxiv(self, url: str) -> Optional[bytes]:
        """Download from arXiv."""
        try:
            # Convert abstract URL to PDF URL; match the "/abs/" path segment
            # so other occurrences of "abs" in the URL are left untouched
            pdf_url = url.replace("/abs/", "/pdf/")
            if not pdf_url.endswith(".pdf"):
                pdf_url += ".pdf"

            return self._download_generic(pdf_url)
        except Exception:
            logger.exception("arXiv download failed")
            return None
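
    # Example transformation (hypothetical arXiv ID):
    #   "https://arxiv.org/abs/2301.00001" -> "https://arxiv.org/pdf/2301.00001.pdf"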

    def _try_europe_pmc(self, pmid: str) -> Optional[bytes]:
        """Try downloading from Europe PMC which often has better PDF availability."""
        try:
            # Europe PMC API is more reliable for PDFs
            # Check if PDF is available
            api_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json"
            response = safe_get(api_url, timeout=10)

            if response.status_code == 200:
                data = response.json()
                results = data.get("resultList", {}).get("result", [])

                if results:
                    article = results[0]
                    # Check if article has open access PDF
                    if (
                        article.get("isOpenAccess") == "Y"
                        and article.get("hasPDF") == "Y"
                    ):
                        pmcid = article.get("pmcid")
                        if pmcid:
                            # Europe PMC PDF URL
                            pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf"
                            logger.info(
                                f"Found Europe PMC PDF for PMID {pmid}: {pmcid}"
                            )

                            headers = {
                                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                            }

                            pdf_response = safe_get(
                                pdf_url,
                                headers=headers,
                                timeout=30,
                                allow_redirects=True,
                            )
                            if pdf_response.status_code == 200:
                                content_type = pdf_response.headers.get(
                                    "content-type", ""
                                )
                                if (
                                    "pdf" in content_type.lower()
                                    or len(pdf_response.content) > 1000
                                ):
                                    return pdf_response.content
        except Exception as e:
            logger.debug(f"Europe PMC download failed: {e}")

        return None
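
    # The two Europe PMC endpoints used above, with hypothetical IDs filled in:
    #   search: https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:12345678&format=json
    #   PDF:    https://europepmc.org/backend/ptpmcrender.fcgi?accid=PMC1234567&blobtype=pdf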

    def _download_pubmed(self, url: str) -> Optional[bytes]:
        """Download from PubMed/PubMed Central with rate limiting."""
        try:
            # Apply rate limiting for PubMed requests
            current_time = time.time()
            time_since_last = current_time - self._last_pubmed_request
            if time_since_last < self._pubmed_delay:
                sleep_time = self._pubmed_delay - time_since_last
                logger.debug(
                    f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request"
                )
                time.sleep(sleep_time)
            self._last_pubmed_request = time.time()

            # If it's already a PMC article, download directly
            if "/articles/PMC" in url:
                pmc_match = re.search(r"(PMC\d+)", url)
                if pmc_match:
                    pmc_id = pmc_match.group(1)

                    # Try Europe PMC (more reliable)
                    europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    headers = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    }

                    try:
                        response = safe_get(
                            europe_url,
                            headers=headers,
                            timeout=30,
                            allow_redirects=True,
                        )
                        if response.status_code == 200:
                            content_type = response.headers.get(
                                "content-type", ""
                            )
                            if (
                                "pdf" in content_type.lower()
                                or len(response.content) > 1000
                            ):
                                logger.info(
                                    f"Downloaded PDF via Europe PMC for {pmc_id}"
                                )
                                return response.content
                    except Exception as e:
                        logger.debug(f"Direct Europe PMC download failed: {e}")
                return None

            # If it's a regular PubMed URL, try to find PMC version
            elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
                # Extract PMID from URL
                pmid_match = re.search(r"/(\d+)/?", url)
                if pmid_match:
                    pmid = pmid_match.group(1)
                    logger.info(f"Attempting to download PDF for PMID: {pmid}")

                    # Try Europe PMC first (more reliable)
                    pdf_content = self._try_europe_pmc(pmid)
                    if pdf_content:
                        return pdf_content

                    # First try using NCBI E-utilities API to find PMC ID
                    try:
                        # Use elink to convert PMID to PMCID
                        elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
                        params = {
                            "dbfrom": "pubmed",
                            "db": "pmc",
                            "id": pmid,
                            "retmode": "json",
                        }

                        api_response = safe_get(
                            elink_url, params=params, timeout=10
                        )
                        if api_response.status_code == 200:
                            data = api_response.json()
                            # Parse the response to find PMC ID
                            link_sets = data.get("linksets", [])
                            if link_sets and "linksetdbs" in link_sets[0]:
                                for linksetdb in link_sets[0]["linksetdbs"]:
                                    if linksetdb.get(
                                        "dbto"
                                    ) == "pmc" and linksetdb.get("links"):
                                        pmc_id_num = linksetdb["links"][0]
                                        # Now fetch PMC details to get the correct PMC ID format
                                        esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                                        summary_params = {
                                            "db": "pmc",
                                            "id": pmc_id_num,
                                            "retmode": "json",
                                        }
                                        summary_response = safe_get(
                                            esummary_url,
                                            params=summary_params,
                                            timeout=10,
                                        )
                                        if summary_response.status_code == 200:
                                            summary_data = (
                                                summary_response.json()
                                            )
                                            result = summary_data.get(
                                                "result", {}
                                            ).get(str(pmc_id_num), {})
                                            if result:
                                                # PMC IDs in the API don't have the "PMC" prefix
                                                pmc_id = f"PMC{pmc_id_num}"
                                                logger.info(
                                                    f"Found PMC ID via API: {pmc_id} for PMID: {pmid}"
                                                )

                                                # Try Europe PMC with the PMC ID
                                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

                                                time.sleep(self._pubmed_delay)

                                                headers = {
                                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                                }

                                                try:
                                                    response = safe_get(
                                                        europe_url,
                                                        headers=headers,
                                                        timeout=30,
                                                        allow_redirects=True,
                                                    )
                                                    if (
                                                        response.status_code
                                                        == 200
                                                    ):
                                                        content_type = response.headers.get(
                                                            "content-type", ""
                                                        )
                                                        if (
                                                            "pdf"
                                                            in content_type.lower()
                                                            or len(
                                                                response.content
                                                            )
                                                            > 1000
                                                        ):
                                                            logger.info(
                                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                                            )
                                                            return (
                                                                response.content
                                                            )
                                                except Exception as e:
                                                    logger.debug(
                                                        f"Europe PMC download with PMC ID failed: {e}"
                                                    )
                    except Exception as e:
                        logger.debug(
                            f"API lookup failed, trying webpage scraping: {e}"
                        )

                    # Fallback to webpage scraping if API fails
                    try:
                        headers = {
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                        }
                        response = safe_get(url, headers=headers, timeout=10)
                        if response.status_code == 200:
                            # Look for PMC ID in the page
                            pmc_match = re.search(r"PMC\d+", response.text)
                            if pmc_match:
                                pmc_id = pmc_match.group(0)
                                logger.info(
                                    f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}"
                                )

                                # Add delay before downloading PDF
                                time.sleep(self._pubmed_delay)

                                # Try Europe PMC with the PMC ID (more reliable)
                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                                headers = {
                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                }

                                try:
                                    response = safe_get(
                                        europe_url,
                                        headers=headers,
                                        timeout=30,
                                        allow_redirects=True,
                                    )
                                    if response.status_code == 200:
                                        content_type = response.headers.get(
                                            "content-type", ""
                                        )
                                        if (
                                            "pdf" in content_type.lower()
                                            or len(response.content) > 1000
                                        ):
                                            logger.info(
                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                            )
                                            return response.content
                                except Exception as e:
                                    logger.debug(
                                        f"Europe PMC download failed: {e}"
                                    )
                            else:
                                logger.info(
                                    f"No PMC version found for PMID: {pmid}"
                                )
                    except requests.exceptions.HTTPError as e:
                        if e.response.status_code == 429:
                            logger.warning(
                                "Rate limited by PubMed, increasing delay"
                            )
                            self._pubmed_delay = min(
                                self._pubmed_delay * 2, 5.0
                            )  # Max 5 seconds
                        raise
                    except Exception as e:
                        logger.debug(f"Could not check for PMC version: {e}")

            return self._download_generic(url)
        except Exception:
            logger.exception("PubMed download failed")
            return None

    def _download_semantic_scholar(self, url: str) -> Optional[bytes]:
        """Download from Semantic Scholar."""
        # Semantic Scholar doesn't host PDFs directly
        # Would need to extract actual PDF URL from page
        return None

    def _download_biorxiv(self, url: str) -> Optional[bytes]:
        """Download from bioRxiv."""
        try:
            # Convert to PDF URL; guard against URLs that already contain
            # "/content/" so the segment isn't inserted twice
            if "/content/" in url:
                pdf_url = url
            else:
                pdf_url = url.replace(".org/", ".org/content/")
            pdf_url = re.sub(r"v\d+$", "", pdf_url)  # Remove version
            pdf_url += ".full.pdf"

            return self._download_generic(pdf_url)
        except Exception:
            logger.exception("bioRxiv download failed")
            return None

    def _download_medrxiv(self, url: str) -> Optional[bytes]:
        """Download from medRxiv."""
        # Same as bioRxiv
        return self._download_biorxiv(url)
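
    # Example transformation (hypothetical preprint URL; the version suffix is
    # stripped, which the code assumes resolves to the latest full-text PDF):
    #   "https://www.biorxiv.org/content/10.1101/2020.03.01.123456v2"
    #     -> "https://www.biorxiv.org/content/10.1101/2020.03.01.123456.full.pdf"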

    def _save_text_with_db(
        self,
        resource: ResearchResource,
        text: str,
        session: Session,
        extraction_method: str,
        extraction_source: str,
        pdf_document_id: Optional[int] = None,
    ) -> Optional[str]:
        """
        Save extracted text to encrypted database.

        Args:
            resource: The research resource
            text: Extracted text content
            session: Database session
            extraction_method: How the text was extracted
            extraction_source: Specific tool/API used
            pdf_document_id: ID of PDF document if extracted from PDF

        Returns:
            None (previously returned text file path, now removed)
        """
        try:
            # Calculate text metadata for database
            word_count = len(text.split())
            character_count = len(text)

            # Find the document by pdf_document_id or resource_id
            doc = None
            if pdf_document_id:
                doc = (
                    session.query(Document)
                    .filter_by(id=pdf_document_id)
                    .first()
                )
            else:
                doc = (
                    session.query(Document)
                    .filter_by(resource_id=resource.id)
                    .first()
                )

            if doc:
                # Update existing document with extracted text
                doc.text_content = text
                doc.character_count = character_count
                doc.word_count = word_count
                doc.extraction_method = extraction_method
                doc.extraction_source = extraction_source

                # Set quality based on method
                if extraction_method == "native_api":
                    doc.extraction_quality = "high"
                elif (
                    extraction_method == "pdf_extraction"
                    and extraction_source == "pdfplumber"
                ):
                    doc.extraction_quality = "medium"
                else:
                    doc.extraction_quality = "low"

                logger.debug(
                    f"Updated document {doc.id} with extracted text ({word_count} words)"
                )
            else:
                # Create a new Document for text-only extraction
                # Generate hash from text content
                text_hash = hashlib.sha256(text.encode()).hexdigest()

                # Get source type for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download"
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type for text document"
                    )
                    raise

                # Create new document
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=text_hash,
                    original_url=resource.url,
                    file_path="text_only_not_stored",
                    file_size=character_count,  # Use character count as file size for text-only
                    file_type="text",
                    mime_type="text/plain",
                    title=resource.title,
                    text_content=text,
                    character_count=character_count,
                    word_count=word_count,
                    extraction_method=extraction_method,
                    extraction_source=extraction_source,
                    extraction_quality="high"
                    if extraction_method == "native_api"
                    else "medium",
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                # Link to default Library collection
                library_collection = (
                    session.query(Collection).filter_by(name="Library").first()
                )
                if library_collection:
                    doc_collection = DocumentCollection(
                        document_id=doc_id,
                        collection_id=library_collection.id,
                        indexed=False,
                        chunk_count=0,
                    )
                    session.add(doc_collection)
                else:
                    logger.warning(
                        f"Library collection not found - document {doc_id} will not be linked to default collection"
                    )

                logger.info(
                    f"Created new document {doc_id} for text-only extraction ({word_count} words)"
                )

            logger.info(
                f"Saved text to encrypted database ({word_count} words)"
            )
            return None

        except Exception:
            logger.exception("Error saving text to encrypted database")
            raise  # Re-raise so caller can handle the error

    def _create_text_document_record(
        self,
        session: Session,
        resource: ResearchResource,
        file_path: Path,
        extraction_method: str,
        extraction_source: str,
    ):
        """Update existing Document with text from file (for legacy text files)."""
        try:
            # Read file to get metadata
            text = file_path.read_text(encoding="utf-8", errors="ignore")
            word_count = len(text.split())
            character_count = len(text)

            # Find the Document by resource_id
            doc = (
                session.query(Document)
                .filter_by(resource_id=resource.id)
                .first()
            )

            if doc:
                # Update existing document with text content
                doc.text_content = text
                doc.character_count = character_count
                doc.word_count = word_count
                doc.extraction_method = extraction_method
                doc.extraction_source = extraction_source
                doc.extraction_quality = (
                    "low"  # Unknown quality for legacy files
                )
                logger.info(
                    f"Updated document {doc.id} with text from file: {file_path.name}"
                )
            else:
                logger.warning(
                    f"No document found to update for resource {resource.id}"
                )

        except Exception:
            logger.exception("Error updating document with text from file")

    def _record_failed_text_extraction(
        self, session: Session, resource: ResearchResource, error: str
    ):
        """Record a failed text extraction attempt in the Document."""
        try:
            # Find the Document by resource_id
            doc = (
                session.query(Document)
                .filter_by(resource_id=resource.id)
                .first()
            )

            if doc:
                # Update document with extraction error
                doc.error_message = error
                doc.extraction_method = "failed"
                doc.extraction_quality = "low"
                logger.info(
                    f"Recorded failed text extraction for document {doc.id}: {error}"
                )
            else:
                logger.warning(
                    f"No document found to record extraction failure for resource {resource.id}"
                )

        except Exception:
            logger.exception("Error recording failed text extraction")