Coverage for src / local_deep_research / research_library / services / download_service.py: 95%
705 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2PDF Download Service for Research Library
4Handles downloading PDFs from various academic sources with:
5- Deduplication using download tracker
6- Source-specific download strategies (arXiv, PubMed, etc.)
7- Progress tracking and error handling
8- File organization and storage
9"""
11import hashlib
12import os
13import re
14import time
15import uuid
16from datetime import datetime, UTC
17from pathlib import Path
18from typing import Optional, Tuple
19from urllib.parse import urlparse
21import requests
22from loguru import logger
23from sqlalchemy.orm import Session
24import pdfplumber
25from pypdf import PdfReader
27from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY
28from ...database.models.download_tracker import (
29 DownloadAttempt,
30 DownloadTracker,
31)
32from ...security import safe_get, sanitize_for_log
33from ...security.path_validator import PathValidator
34from ...database.models.library import (
35 Collection,
36 Document as Document,
37 DocumentStatus,
38 DownloadQueue as LibraryDownloadQueue,
39 DocumentCollection,
40)
41from .pdf_storage_manager import PDFStorageManager
42from ...database.models.research import ResearchResource
43from ...database.library_init import get_source_type_id, get_default_library_id
44from ...database.session_context import get_user_db_session
45from ...utilities.db_utils import get_settings_manager
46from ...library.download_management import RetryManager
47from ...config.paths import get_library_directory
48from ..utils import (
49 get_document_for_resource,
50 get_url_hash,
51 get_absolute_path_from_settings,
52 is_downloadable_url,
53)
55# Import our modular downloaders
56from ..downloaders import (
57 ContentType,
58 ArxivDownloader,
59 PubMedDownloader,
60 BioRxivDownloader,
61 DirectPDFDownloader,
62 SemanticScholarDownloader,
63 OpenAlexDownloader,
64 GenericDownloader,
65)
class DownloadService:
    """Service for downloading and managing research PDFs."""

    def __init__(self, username: str, password: Optional[str] = None):
        """Initialize download service for a user.

        Args:
            username: The username to download for
            password: Optional password for encrypted database access
        """
        self.username = username
        self.password = password
        self.settings = get_settings_manager(username=username)
        self._closed = False

        # Debug settings manager and user context
        logger.info(
            f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}"
        )

        # Library path comes from settings (centralized default respects LDR_DATA_DIR)
        storage_path_setting = self.settings.get_setting(
            "research_library.storage_path",
            str(get_library_directory()),
        )
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})"
        )

        if storage_path_setting is None:
            logger.error(
                "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!"
            )
            raise ValueError("Storage path setting cannot be None")

        # Expand env vars and ~, then resolve to an absolute path
        self.library_root = str(
            Path(os.path.expandvars(storage_path_setting))
            .expanduser()
            .resolve()
        )
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}"
        )

        # Create directory structure
        self._setup_directories()

        # Semantic Scholar works better with an API key when one is configured
        semantic_scholar_api_key = self.settings.get_setting(
            "search.engine.web.semantic_scholar.api_key", ""
        )

        # Ordered downloader chain: DirectPDFDownloader first so direct PDF
        # links are handled cheaply; GenericDownloader last as the fallback.
        self.downloaders = [
            DirectPDFDownloader(timeout=30),
            SemanticScholarDownloader(
                timeout=30,
                api_key=semantic_scholar_api_key or None,
            ),
            OpenAlexDownloader(timeout=30),  # API lookup, no key required
            ArxivDownloader(timeout=30),
            PubMedDownloader(timeout=30, rate_limit_delay=1.0),
            BioRxivDownloader(timeout=30),
            GenericDownloader(timeout=30),
        ]

        # Smart failure tracking for retries
        self.retry_manager = RetryManager(username, password)
        logger.info(
            f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}"
        )

        # PubMed rate limiting state
        self._pubmed_delay = 1.0  # seconds between PubMed requests
        self._last_pubmed_request = 0.0  # timestamp of last PubMed request
150 def close(self):
151 """Close all downloader resources."""
152 if self._closed:
153 return
154 self._closed = True
156 from ...utilities.resource_utils import safe_close
158 for downloader in self.downloaders:
159 safe_close(downloader, "downloader")
161 # Close the settings manager's DB session to return the connection
162 # to the pool. SettingsManager.close() is idempotent.
163 safe_close(self.settings, "settings manager", allow_none=True)
165 # Clear references to allow garbage collection
166 self.downloaders = []
167 self.retry_manager = None
168 self.settings = None
170 def __enter__(self):
171 """Enter context manager."""
172 return self
174 def __exit__(self, exc_type, exc_val, exc_tb):
175 """Exit context manager, ensuring cleanup."""
176 self.close()
177 return False
179 def _setup_directories(self):
180 """Create library directory structure."""
181 # Only create the root and pdfs folder - flat structure
182 paths = [
183 self.library_root,
184 str(Path(self.library_root) / "pdfs"),
185 ]
186 for path in paths:
187 Path(path).mkdir(parents=True, exist_ok=True)
189 def _normalize_url(self, url: str) -> str:
190 """Normalize URL for consistent hashing."""
191 # Remove protocol variations
192 url = re.sub(r"^https?://", "", url)
193 # Remove www
194 url = re.sub(r"^www\.", "", url)
195 # Remove trailing slashes
196 url = url.rstrip("/")
197 # Sort query parameters
198 if "?" in url:
199 base, query = url.split("?", 1)
200 params = sorted(query.split("&"))
201 url = f"{base}?{'&'.join(params)}"
202 return url.lower()
204 def _get_url_hash(self, url: str) -> str:
205 """Generate SHA256 hash of normalized URL."""
206 normalized = self._normalize_url(url)
207 return get_url_hash(normalized)
209 def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]:
210 """
211 Check if URL is already downloaded.
213 Returns:
214 Tuple of (is_downloaded, file_path)
215 """
216 url_hash = self._get_url_hash(url)
218 with get_user_db_session(self.username, self.password) as session:
219 tracker = (
220 session.query(DownloadTracker)
221 .filter_by(url_hash=url_hash, is_downloaded=True)
222 .first()
223 )
225 if tracker and tracker.file_path:
226 # Compute absolute path and verify file still exists
227 absolute_path = get_absolute_path_from_settings(
228 tracker.file_path
229 )
230 if absolute_path and absolute_path.is_file():
231 return True, str(absolute_path)
232 if absolute_path:
233 # File was deleted, mark as not downloaded
234 tracker.is_downloaded = False
235 session.commit()
236 # If absolute_path is None, path was blocked - treat as not downloaded
238 return False, None
240 def get_text_content(self, resource_id: int) -> Optional[str]:
241 """
242 Get text content for a research resource.
244 This will try to:
245 1. Fetch text directly from APIs if available
246 2. Extract text from downloaded PDF if exists
247 3. Download PDF and extract text if not yet downloaded
249 Args:
250 resource_id: ID of the research resource
252 Returns:
253 Text content as string, or None if extraction failed
254 """
255 with get_user_db_session(self.username, self.password) as session:
256 resource = session.query(ResearchResource).get(resource_id)
257 if not resource:
258 logger.error(f"Resource {resource_id} not found")
259 return None
261 url = resource.url
263 # Find appropriate downloader
264 for downloader in self.downloaders:
265 if downloader.can_handle(url):
266 logger.info(
267 f"Using {downloader.__class__.__name__} for text extraction from {url}"
268 )
269 try:
270 # Try to get text content
271 text = downloader.download_text(url)
272 if text:
273 logger.info(
274 f"Successfully extracted text for: {resource.title[:50]}"
275 )
276 return text
277 except Exception:
278 logger.exception("Failed to extract text")
279 break
281 logger.warning(f"Could not extract text for {url}")
282 return None
284 def queue_research_downloads(
285 self, research_id: str, collection_id: Optional[str] = None
286 ) -> int:
287 """
288 Queue all downloadable PDFs from a research session.
290 Args:
291 research_id: The research session ID
292 collection_id: Optional target collection ID (defaults to Library if not provided)
294 Returns:
295 Number of items queued
296 """
297 queued = 0
299 # Get default library collection if no collection_id provided
300 if not collection_id:
301 from ...database.library_init import get_default_library_id
303 collection_id = get_default_library_id(self.username, self.password)
305 with get_user_db_session(self.username, self.password) as session:
306 # Get all resources for this research
307 resources = (
308 session.query(ResearchResource)
309 .filter_by(research_id=research_id)
310 .all()
311 )
313 for resource in resources:
314 if self._is_downloadable(resource):
315 # Library resources linked via document_id are already done
316 if resource.document_id:
317 continue
319 # Check if already queued
320 existing_queue = (
321 session.query(LibraryDownloadQueue)
322 .filter_by(
323 resource_id=resource.id,
324 status=DocumentStatus.PENDING,
325 )
326 .first()
327 )
329 # Check if already downloaded (trust the database status)
330 existing_doc = (
331 session.query(Document)
332 .filter_by(
333 resource_id=resource.id,
334 status=DocumentStatus.COMPLETED,
335 )
336 .first()
337 )
339 # Queue if not already queued and not marked as completed
340 if not existing_queue and not existing_doc: 340 ↛ 313line 340 didn't jump to line 313 because the condition on line 340 was always true
341 # Check one more time if ANY queue entry exists (regardless of status)
342 any_queue = (
343 session.query(LibraryDownloadQueue)
344 .filter_by(resource_id=resource.id)
345 .first()
346 )
348 if any_queue:
349 # Reset the existing queue entry
350 any_queue.status = DocumentStatus.PENDING
351 any_queue.research_id = research_id
352 any_queue.collection_id = collection_id
353 queued += 1
354 else:
355 # Add new queue entry
356 queue_entry = LibraryDownloadQueue(
357 resource_id=resource.id,
358 research_id=research_id,
359 collection_id=collection_id,
360 priority=0,
361 status=DocumentStatus.PENDING,
362 )
363 session.add(queue_entry)
364 queued += 1
366 session.commit()
367 logger.info(
368 f"Queued {queued} downloads for research {research_id} to collection {collection_id}"
369 )
371 return queued
373 def _is_downloadable(self, resource: ResearchResource) -> bool:
374 """Check if a resource is likely downloadable as PDF.
376 Delegates to the consolidated is_downloadable_url() from utils.
377 """
378 return is_downloadable_url(resource.url)
380 def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]:
381 """
382 Download a specific resource.
384 Returns:
385 Tuple of (success: bool, skip_reason: str or None)
386 """
387 with get_user_db_session(self.username, self.password) as session:
388 resource = session.query(ResearchResource).get(resource_id)
389 if not resource:
390 logger.error(f"Resource {resource_id} not found")
391 return False, "Resource not found"
393 # Check if already downloaded (trust the database after sync)
394 existing_doc = (
395 session.query(Document)
396 .filter_by(
397 resource_id=resource_id, status=DocumentStatus.COMPLETED
398 )
399 .first()
400 )
402 if existing_doc:
403 logger.info(
404 "Resource already downloaded (according to database)"
405 )
406 return True, None
408 # Get collection_id from queue entry if it exists
409 queue_entry = (
410 session.query(LibraryDownloadQueue)
411 .filter_by(resource_id=resource_id)
412 .first()
413 )
414 collection_id = (
415 queue_entry.collection_id
416 if queue_entry and queue_entry.collection_id
417 else None
418 )
420 # Create download tracker entry
421 url_hash = self._get_url_hash(resource.url)
422 tracker = (
423 session.query(DownloadTracker)
424 .filter_by(url_hash=url_hash)
425 .first()
426 )
428 if not tracker:
429 tracker = DownloadTracker(
430 url=resource.url,
431 url_hash=url_hash,
432 first_resource_id=resource.id,
433 is_downloaded=False,
434 )
435 session.add(tracker)
436 session.commit()
438 # Attempt download
439 success, skip_reason, status_code = self._download_pdf(
440 resource, tracker, session, collection_id
441 )
443 # Record attempt with retry manager for smart failure tracking
444 self.retry_manager.record_attempt(
445 resource_id=resource.id,
446 result=(success, skip_reason),
447 status_code=status_code,
448 url=resource.url,
449 details=skip_reason
450 or (
451 "Successfully downloaded" if success else "Download failed"
452 ),
453 session=session,
454 )
456 # Update queue status if exists
457 queue_entry = (
458 session.query(LibraryDownloadQueue)
459 .filter_by(resource_id=resource_id)
460 .first()
461 )
463 if queue_entry:
464 queue_entry.status = (
465 DocumentStatus.COMPLETED
466 if success
467 else DocumentStatus.FAILED
468 )
469 queue_entry.completed_at = datetime.now(UTC)
471 session.commit()
473 # Trigger auto-indexing for successfully downloaded documents
474 if success and self.password:
475 try:
476 from ..routes.rag_routes import trigger_auto_index
477 from ...database.library_init import get_default_library_id
479 # Get the document that was just created
480 doc = (
481 session.query(Document)
482 .filter_by(resource_id=resource_id)
483 .order_by(Document.created_at.desc())
484 .first()
485 )
486 if doc:
487 # Use collection_id from queue entry or default Library
488 # NB: pass username string, not the SQLAlchemy session
489 target_collection = (
490 collection_id
491 or get_default_library_id(
492 self.username, self.password
493 )
494 )
495 if target_collection: 495 ↛ anywhereline 495 didn't jump anywhere: it always raised an exception.
496 trigger_auto_index(
497 [doc.id],
498 target_collection,
499 self.username,
500 self.password,
501 )
502 except Exception:
503 logger.exception("Failed to trigger auto-indexing")
505 return success, skip_reason
507 def _download_pdf(
508 self,
509 resource: ResearchResource,
510 tracker: DownloadTracker,
511 session: Session,
512 collection_id: Optional[str] = None,
513 ) -> Tuple[bool, Optional[str], Optional[int]]:
514 """
515 Perform the actual PDF download.
517 Args:
518 resource: The research resource to download
519 tracker: Download tracker for this URL
520 session: Database session
521 collection_id: Optional target collection ID (defaults to Library if not provided)
523 Returns:
524 Tuple of (success: bool, skip_reason: Optional[str], status_code: Optional[int])
525 """
526 url = resource.url
528 # Log attempt
529 attempt = DownloadAttempt(
530 url_hash=tracker.url_hash,
531 attempt_number=tracker.download_attempts.count() + 1
532 if hasattr(tracker, "download_attempts")
533 else 1,
534 attempted_at=datetime.now(UTC),
535 )
536 session.add(attempt)
538 try:
539 # Use modular downloaders with detailed skip reasons
540 pdf_content = None
541 downloader_used = None
542 skip_reason = None
543 status_code = None
545 for downloader in self.downloaders:
546 if downloader.can_handle(url):
547 logger.info(
548 f"Using {downloader.__class__.__name__} for {url}"
549 )
550 result = downloader.download_with_result(
551 url, ContentType.PDF
552 )
553 downloader_used = downloader.__class__.__name__
555 if result.is_success and result.content:
556 pdf_content = result.content
557 status_code = result.status_code
558 break
559 if result.skip_reason: 559 ↛ 545line 559 didn't jump to line 545 because the condition on line 559 was always true
560 skip_reason = result.skip_reason
561 status_code = result.status_code
562 logger.info(f"Download skipped: {skip_reason}")
563 # Keep trying other downloaders unless it's the GenericDownloader
564 if isinstance(downloader, GenericDownloader):
565 break
567 if not downloader_used:
568 logger.error(f"No downloader found for {url}")
569 skip_reason = "No compatible downloader available"
571 if not pdf_content:
572 error_msg = skip_reason or "Failed to download PDF content"
573 # Store skip reason in attempt for retrieval
574 attempt.error_message = error_msg
575 attempt.succeeded = False
576 session.commit()
577 logger.info(f"Download failed with reason: {error_msg}")
578 return False, error_msg, status_code
580 # Get PDF storage mode setting
581 pdf_storage_mode = self.settings.get_setting(
582 "research_library.pdf_storage_mode", "none"
583 )
584 max_pdf_size_mb = int(
585 self.settings.get_setting(
586 "research_library.max_pdf_size_mb", 100
587 )
588 )
589 logger.info(
590 f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}"
591 )
593 # Update tracker
594 import hashlib
596 tracker.file_hash = hashlib.sha256(pdf_content).hexdigest()
597 tracker.file_size = len(pdf_content)
598 tracker.is_downloaded = True
599 tracker.downloaded_at = datetime.now(UTC)
601 # Initialize PDF storage manager
602 pdf_storage_manager = PDFStorageManager(
603 library_root=self.library_root,
604 storage_mode=pdf_storage_mode,
605 max_pdf_size_mb=max_pdf_size_mb,
606 )
608 # Update attempt with success info
609 attempt.succeeded = True
611 # Check if library document already exists
612 existing_doc = get_document_for_resource(session, resource)
614 if existing_doc:
615 # Update existing document
616 existing_doc.document_hash = tracker.file_hash
617 existing_doc.file_size = len(pdf_content)
618 existing_doc.status = DocumentStatus.COMPLETED
619 existing_doc.processed_at = datetime.now(UTC)
621 # Save PDF using storage manager (updates storage_mode and file_path)
622 file_path_result, _ = pdf_storage_manager.save_pdf(
623 pdf_content=pdf_content,
624 document=existing_doc,
625 session=session,
626 filename=f"{resource.id}.pdf",
627 url=url,
628 resource_id=resource.id,
629 )
631 # Update tracker
632 tracker.file_path = (
633 file_path_result if file_path_result else None
634 )
635 tracker.file_name = (
636 Path(file_path_result).name
637 if file_path_result and file_path_result != "database"
638 else None
639 )
640 else:
641 # Get source type ID for research downloads
642 try:
643 source_type_id = get_source_type_id(
644 self.username, "research_download", self.password
645 )
646 # Use provided collection_id or default to Library
647 library_collection_id = (
648 collection_id
649 or get_default_library_id(self.username, self.password)
650 )
651 except Exception:
652 logger.exception(
653 "Failed to get source type or library collection"
654 )
655 raise
657 # Create new unified document entry
658 doc_id = str(uuid.uuid4())
659 doc = Document(
660 id=doc_id,
661 source_type_id=source_type_id,
662 resource_id=resource.id,
663 research_id=resource.research_id,
664 document_hash=tracker.file_hash,
665 original_url=url,
666 file_size=len(pdf_content),
667 file_type="pdf",
668 mime_type="application/pdf",
669 title=resource.title,
670 status=DocumentStatus.COMPLETED,
671 processed_at=datetime.now(UTC),
672 storage_mode=pdf_storage_mode,
673 )
674 session.add(doc)
675 session.flush() # Ensure doc.id is available for blob storage
677 # Save PDF using storage manager (updates storage_mode and file_path)
678 file_path_result, _ = pdf_storage_manager.save_pdf(
679 pdf_content=pdf_content,
680 document=doc,
681 session=session,
682 filename=f"{resource.id}.pdf",
683 url=url,
684 resource_id=resource.id,
685 )
687 # Update tracker
688 tracker.file_path = (
689 file_path_result if file_path_result else None
690 )
691 tracker.file_name = (
692 Path(file_path_result).name
693 if file_path_result and file_path_result != "database"
694 else None
695 )
697 # Link document to default Library collection
698 doc_collection = DocumentCollection(
699 document_id=doc_id,
700 collection_id=library_collection_id,
701 indexed=False,
702 )
703 session.add(doc_collection)
705 # Update attempt
706 attempt.succeeded = True
707 attempt.bytes_downloaded = len(pdf_content)
709 if pdf_storage_mode == "database":
710 logger.info(
711 f"Successfully stored PDF in database: {resource.url}"
712 )
713 elif pdf_storage_mode == "filesystem":
714 logger.info(f"Successfully downloaded: {tracker.file_path}")
715 else:
716 logger.info(f"Successfully extracted text from: {resource.url}")
718 # Automatically extract and save text after successful PDF download
719 try:
720 logger.info(
721 f"Extracting text from downloaded PDF for: {resource.title[:50]}"
722 )
723 text = self._extract_text_from_pdf(pdf_content)
725 if text:
726 # Get the document ID we just created/updated
727 pdf_doc = get_document_for_resource(session, resource)
728 pdf_document_id = pdf_doc.id if pdf_doc else None
730 # Save text to encrypted database
731 self._save_text_with_db(
732 resource=resource,
733 text=text,
734 session=session,
735 extraction_method="pdf_extraction",
736 extraction_source="local_pdf",
737 pdf_document_id=pdf_document_id,
738 )
739 logger.info(
740 f"Successfully extracted and saved text for: {resource.title[:50]}"
741 )
742 else:
743 logger.warning(
744 f"Text extraction returned empty text for: {resource.title[:50]}"
745 )
746 except Exception:
747 logger.exception(
748 "Failed to extract text from PDF, but PDF download succeeded"
749 )
750 # Don't fail the entire download if text extraction fails
752 return True, None, status_code
754 except Exception as e:
755 logger.exception(f"Download failed for {url}")
756 attempt.succeeded = False
757 attempt.error_type = type(e).__name__
758 attempt.error_message = sanitize_for_log(str(e), max_length=200)
759 tracker.is_accessible = False
760 # Sanitize error message before returning to API
761 safe_error = attempt.error_message
762 return False, safe_error, None
764 def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]:
765 """
766 Extract text from PDF content using multiple methods for best results.
768 Args:
769 pdf_content: Raw PDF bytes
771 Returns:
772 Extracted text or None if extraction fails
773 """
774 try:
775 # First try with pdfplumber (better for complex layouts)
776 import io
778 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
779 text_parts = []
780 for page in pdf.pages:
781 page_text = page.extract_text()
782 if page_text:
783 text_parts.append(page_text)
785 if text_parts:
786 return "\n\n".join(text_parts)
788 # Fallback to PyPDF if pdfplumber fails
789 reader = PdfReader(io.BytesIO(pdf_content))
790 text_parts = []
791 for page in reader.pages:
792 text = page.extract_text()
793 if text:
794 text_parts.append(text)
796 if text_parts:
797 return "\n\n".join(text_parts)
799 logger.warning("No text could be extracted from PDF")
800 return None
802 except Exception:
803 logger.exception("Failed to extract text from PDF")
804 return None
806 def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]:
807 """
808 Download resource and extract text to encrypted database.
810 Args:
811 resource_id: ID of the resource to download
813 Returns:
814 Tuple of (success, error_message)
815 """
816 with get_user_db_session(self.username, self.password) as session:
817 # Get the resource
818 resource = (
819 session.query(ResearchResource)
820 .filter_by(id=resource_id)
821 .first()
822 )
823 if not resource:
824 return False, "Resource not found"
826 # Handle library resources — content already in the database
827 # (local-only, no network — skip retry checks)
828 if resource.source_type == "library" or (
829 resource.url and resource.url.startswith("/library/document/")
830 ):
831 return self._try_library_text_extraction(session, resource)
833 # Try existing text in database
834 result = self._try_existing_text(session, resource_id)
835 if result is not None:
836 return result
838 # Try legacy text files on disk
839 result = self._try_legacy_text_file(session, resource, resource_id)
840 if result is not None: 840 ↛ 841line 840 didn't jump to line 841 because the condition on line 840 was never true
841 return result
843 # Try extracting from existing PDF
844 result = self._try_existing_pdf_extraction(
845 session, resource, resource_id
846 )
847 if result is not None: 847 ↛ 848line 847 didn't jump to line 848 because the condition on line 847 was never true
848 return result
850 # Check retry eligibility before network-dependent extraction
851 if self.retry_manager: 851 ↛ 860line 851 didn't jump to line 860 because the condition on line 851 was always true
852 decision = self.retry_manager.should_retry_resource(resource_id)
853 if not decision.can_retry: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true
854 logger.info(
855 f"Skipping resource {resource_id}: {decision.reason}"
856 )
857 return False, decision.reason
859 # Try API text extraction
860 result = self._try_api_text_extraction(session, resource)
861 if result is not None:
862 self._record_retry_attempt(resource, result, session)
863 return result
865 # Fallback: Download PDF and extract
866 result = self._fallback_pdf_extraction(session, resource)
867 self._record_retry_attempt(resource, result, session)
868 return result
870 def _record_retry_attempt(
871 self, resource, result: Tuple[bool, Optional[str]], session=None
872 ) -> None:
873 """Record a download attempt with the retry manager."""
874 if not self.retry_manager: 874 ↛ 875line 874 didn't jump to line 875 because the condition on line 874 was never true
875 return
876 self.retry_manager.record_attempt(
877 resource_id=resource.id,
878 result=result,
879 url=resource.url or "",
880 details=result[1]
881 or (
882 "Successfully extracted text"
883 if result[0]
884 else "Text extraction failed"
885 ),
886 session=session,
887 )
889 def _try_library_text_extraction(
890 self, session, resource
891 ) -> Tuple[bool, Optional[str]]:
892 """Handle library resources — content already exists in local database.
894 Returns:
895 Tuple of (success, error_message). On success: (True, None).
896 On failure: (False, description of what went wrong).
897 """
898 # 1. Extract document UUID from metadata (authoritative) or URL (fallback)
899 doc_id = None
900 metadata = (resource.resource_metadata or {}).get("original_data", {})
901 if isinstance(metadata, dict):
902 meta_inner = metadata.get("metadata", {})
903 if isinstance(meta_inner, dict):
904 doc_id = meta_inner.get("source_id") or meta_inner.get(
905 "document_id"
906 )
908 if not doc_id and resource.url:
909 # Parse from /library/document/{uuid} or /library/document/{uuid}/pdf
910 match = re.match(r"^/library/document/([^/]+)", resource.url)
911 if match: 911 ↛ 914line 911 didn't jump to line 914 because the condition on line 911 was always true
912 doc_id = match.group(1)
914 if not doc_id:
915 return False, "Could not extract library document ID"
917 # 2. Query the existing Document by its primary key (UUID string)
918 doc = session.query(Document).filter_by(id=doc_id).first()
919 if not doc:
920 return False, f"Library document {doc_id} not found in database"
922 # 3. If text already exists and extraction succeeded, return success
923 if (
924 doc.text_content
925 and doc.extraction_method
926 and doc.extraction_method != "failed"
927 ):
928 logger.info(f"Library document {doc_id} already has text content")
929 resource.document_id = doc.id
930 session.commit()
931 return True, None
933 # 4. Try extracting text from stored PDF
934 pdf_storage_mode = self.settings.get_setting(
935 "research_library.pdf_storage_mode", "none"
936 )
937 pdf_manager = PDFStorageManager(
938 library_root=self.library_root,
939 storage_mode=pdf_storage_mode,
940 )
941 pdf_content = pdf_manager.load_pdf(doc, session)
943 if not pdf_content:
944 return (
945 False,
946 f"Library document {doc_id} has no text or PDF content",
947 )
949 text = self._extract_text_from_pdf(pdf_content)
950 if not text:
951 return (
952 False,
953 f"Failed to extract text from library document {doc_id} PDF",
954 )
956 # 5. Update Document directly (don't use _save_text_with_db — it
957 # queries by resource_id which mismatches for library docs)
958 doc.text_content = text
959 doc.character_count = len(text)
960 doc.word_count = len(text.split())
961 doc.extraction_method = "pdf_extraction"
962 doc.extraction_source = "pdfplumber"
963 doc.extraction_quality = "medium"
965 resource.document_id = doc.id
966 session.commit()
968 logger.info(
969 f"Extracted text from library document {doc_id} "
970 f"({doc.word_count} words)"
971 )
972 return True, None
974 def _try_existing_text(
975 self, session, resource_id: int
976 ) -> Optional[Tuple[bool, Optional[str]]]:
977 """Check if text already exists in database (in Document.text_content)."""
978 existing_doc = (
979 session.query(Document).filter_by(resource_id=resource_id).first()
980 )
982 if not existing_doc:
983 return None
985 # Check if text content exists and extraction was successful
986 if (
987 existing_doc.text_content
988 and existing_doc.extraction_method
989 and existing_doc.extraction_method != "failed"
990 ):
991 logger.info(
992 f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
993 )
994 return True, None
996 # No text content or failed extraction
997 logger.debug(
998 f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}"
999 )
1000 return None # Fall through to re-extraction
1002 def _try_legacy_text_file(
1003 self, session, resource, resource_id: int
1004 ) -> Optional[Tuple[bool, Optional[str]]]:
1005 """Check for legacy text files on disk."""
1006 txt_path = Path(self.library_root) / "txt"
1007 existing_files = (
1008 list(txt_path.glob(f"*_{resource_id}.txt"))
1009 if txt_path.exists()
1010 else []
1011 )
1013 if not existing_files:
1014 return None
1016 logger.info(f"Text file already exists on disk: {existing_files[0]}")
1017 self._create_text_document_record(
1018 session,
1019 resource,
1020 existing_files[0],
1021 extraction_method="unknown",
1022 extraction_source="legacy_file",
1023 )
1024 session.commit()
1025 return True, None
    def _try_existing_pdf_extraction(
        self, session, resource, resource_id: int
    ) -> Optional[Tuple[bool, Optional[str]]]:
        """Try extracting text from existing PDF in database.

        Looks up a completed Document for *resource_id*, validates the stored
        file path, and extracts text from the PDF already on disk.

        Args:
            session: Active database session.
            resource: The research resource being processed.
            resource_id: ID used to locate the resource's Document row.

        Returns:
            (True, None) after a successful extraction and commit, or None
            whenever this strategy does not apply or fails, so the caller can
            fall back to another extraction method.
        """
        pdf_document = (
            session.query(Document).filter_by(resource_id=resource_id).first()
        )

        # Only reuse PDFs whose download completed successfully.
        if not pdf_document or pdf_document.status != "completed":
            return None

        # Validate path to prevent path traversal attacks
        # (sentinel values mark records that have no real file on disk).
        if (
            not pdf_document.file_path
            or pdf_document.file_path in FILE_PATH_SENTINELS
        ):
            return None
        try:
            # Reject any stored path that escapes the library root.
            safe_path = PathValidator.validate_safe_path(
                pdf_document.file_path, str(self.library_root)
            )
            pdf_path = Path(safe_path)
        except ValueError:
            logger.warning(f"Path traversal blocked: {pdf_document.file_path}")
            return None
        if not pdf_path.is_file():
            return None

        logger.info(f"Found existing PDF, extracting text from: {pdf_path}")
        try:
            with open(pdf_path, "rb") as f:
                pdf_content = f.read()
            text = self._extract_text_from_pdf(pdf_content)

            if not text:
                return None

            self._save_text_with_db(
                resource,
                text,
                session,
                extraction_method="pdf_extraction",
                extraction_source="pdfplumber",
                pdf_document_id=pdf_document.id,
            )
            session.commit()
            return True, None

        except Exception:
            logger.exception(
                f"Failed to extract text from existing PDF: {pdf_path}"
            )
            return None  # Fall through to other methods
1081 def _try_api_text_extraction(
1082 self, session, resource
1083 ) -> Optional[Tuple[bool, Optional[str]]]:
1084 """Try direct API text extraction."""
1085 logger.info(
1086 f"Attempting direct API text extraction from: {resource.url}"
1087 )
1089 downloader = self._get_downloader(resource.url)
1090 if not downloader:
1091 return None
1093 result = downloader.download_with_result(resource.url, ContentType.TEXT)
1095 if not result.is_success or not result.content:
1096 return None
1098 # Decode text content
1099 text = (
1100 result.content.decode("utf-8", errors="ignore")
1101 if isinstance(result.content, bytes)
1102 else result.content
1103 )
1105 # Determine extraction source
1106 extraction_source = "unknown"
1107 if isinstance(downloader, ArxivDownloader):
1108 extraction_source = "arxiv_api"
1109 elif isinstance(downloader, PubMedDownloader): 1109 ↛ 1110line 1109 didn't jump to line 1110 because the condition on line 1109 was never true
1110 extraction_source = "pubmed_api"
1112 try:
1113 self._save_text_with_db(
1114 resource,
1115 text,
1116 session,
1117 extraction_method="native_api",
1118 extraction_source=extraction_source,
1119 )
1120 session.commit()
1121 logger.info(
1122 f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{resource.title[:50]}...'"
1123 )
1124 return True, None
1125 except Exception as e:
1126 logger.exception(f"Failed to save text for resource {resource.id}")
1127 # Sanitize error message before returning to API
1128 safe_error = sanitize_for_log(
1129 f"Failed to save text: {str(e)}", max_length=200
1130 )
1131 return False, safe_error
1133 def _fallback_pdf_extraction(
1134 self, session, resource
1135 ) -> Tuple[bool, Optional[str]]:
1136 """Fallback: Download PDF to memory and extract text."""
1137 logger.info(
1138 f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}"
1139 )
1141 downloader = self._get_downloader(resource.url)
1142 if not downloader:
1143 error_msg = "No compatible downloader found"
1144 logger.warning(
1145 f"✗ FAILED: {error_msg} for '{resource.title[:50]}...'"
1146 )
1147 self._record_failed_text_extraction(
1148 session, resource, error=error_msg
1149 )
1150 session.commit()
1151 return False, error_msg
1153 result = downloader.download_with_result(resource.url, ContentType.PDF)
1155 if not result.is_success or not result.content:
1156 error_msg = result.skip_reason or "Failed to download PDF"
1157 logger.warning(
1158 f"✗ FAILED: Could not download PDF for '{resource.title[:50]}...' | Error: {error_msg}"
1159 )
1160 self._record_failed_text_extraction(
1161 session, resource, error=f"PDF download failed: {error_msg}"
1162 )
1163 session.commit()
1164 return False, f"PDF extraction failed: {error_msg}"
1166 # Extract text from PDF
1167 text = self._extract_text_from_pdf(result.content)
1168 if not text:
1169 error_msg = "PDF text extraction returned empty text"
1170 logger.warning(
1171 f"Failed to extract text from PDF for: {resource.url}"
1172 )
1173 self._record_failed_text_extraction(
1174 session, resource, error=error_msg
1175 )
1176 session.commit()
1177 return False, error_msg
1179 try:
1180 self._save_text_with_db(
1181 resource,
1182 text,
1183 session,
1184 extraction_method="pdf_extraction",
1185 extraction_source="pdfplumber_fallback",
1186 )
1187 session.commit()
1188 logger.info(
1189 f"✓ SUCCESS: Extracted text from '{resource.title[:50]}...'"
1190 )
1191 return True, None
1192 except Exception as e:
1193 logger.exception(f"Failed to save text for resource {resource.id}")
1194 # Sanitize error message before returning to API
1195 safe_error = sanitize_for_log(
1196 f"Failed to save text: {str(e)}", max_length=200
1197 )
1198 return False, safe_error
1200 def _get_downloader(self, url: str):
1201 """
1202 Get the appropriate downloader for a URL.
1204 Args:
1205 url: The URL to download from
1207 Returns:
1208 The appropriate downloader instance or None
1209 """
1210 for downloader in self.downloaders:
1211 if downloader.can_handle(url):
1212 return downloader
1213 return None
1215 def _download_generic(self, url: str) -> Optional[bytes]:
1216 """Generic PDF download method."""
1217 try:
1218 headers = {
1219 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
1220 }
1221 response = safe_get(
1222 url, headers=headers, timeout=30, allow_redirects=True
1223 )
1224 response.raise_for_status()
1226 # Verify it's a PDF
1227 content_type = response.headers.get("Content-Type", "")
1228 if (
1229 "pdf" not in content_type.lower()
1230 and not response.content.startswith(b"%PDF")
1231 ):
1232 logger.warning(f"Response is not a PDF: {content_type}")
1233 return None
1235 return response.content
1237 except Exception:
1238 logger.exception("Generic download failed")
1239 return None
1241 def _download_arxiv(self, url: str) -> Optional[bytes]:
1242 """Download from arXiv."""
1243 try:
1244 # Convert abstract URL to PDF URL
1245 pdf_url = url.replace("abs", "pdf")
1246 if not pdf_url.endswith(".pdf"):
1247 pdf_url += ".pdf"
1249 return self._download_generic(pdf_url)
1250 except Exception:
1251 logger.exception("arXiv download failed")
1252 return None
1254 def _try_europe_pmc(self, pmid: str) -> Optional[bytes]:
1255 """Try downloading from Europe PMC which often has better PDF availability."""
1256 try:
1257 # Europe PMC API is more reliable for PDFs
1258 # Check if PDF is available
1259 api_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json"
1260 response = safe_get(api_url, timeout=10)
1262 if response.status_code == 200:
1263 data = response.json()
1264 results = data.get("resultList", {}).get("result", [])
1266 if results:
1267 article = results[0]
1268 # Check if article has open access PDF
1269 if (
1270 article.get("isOpenAccess") == "Y"
1271 and article.get("hasPDF") == "Y"
1272 ):
1273 pmcid = article.get("pmcid")
1274 if pmcid:
1275 # Europe PMC PDF URL
1276 pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf"
1277 logger.info(
1278 f"Found Europe PMC PDF for PMID {pmid}: {pmcid}"
1279 )
1281 headers = {
1282 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
1283 }
1285 pdf_response = safe_get(
1286 pdf_url,
1287 headers=headers,
1288 timeout=30,
1289 allow_redirects=True,
1290 )
1291 if pdf_response.status_code == 200: 1291 ↛ 1303line 1291 didn't jump to line 1303 because the condition on line 1291 was always true
1292 content_type = pdf_response.headers.get(
1293 "content-type", ""
1294 )
1295 if ( 1295 ↛ 1303line 1295 didn't jump to line 1303 because the condition on line 1295 was always true
1296 "pdf" in content_type.lower()
1297 or len(pdf_response.content) > 1000
1298 ):
1299 return pdf_response.content
1300 except Exception as e:
1301 logger.debug(f"Europe PMC download failed: {e}")
1303 return None
    def _download_pubmed(self, url: str) -> Optional[bytes]:
        """Download from PubMed/PubMed Central with rate limiting.

        Resolution order:
          1. Direct PMC article URLs are fetched straight from Europe PMC.
          2. For pubmed.ncbi.nlm.nih.gov URLs, the PMID is resolved to a PMC
             ID via the Europe PMC search API, then the NCBI E-utilities API,
             then by scraping the PubMed page; the PDF is fetched from
             Europe PMC once a PMC ID is found.
          3. Anything unresolved falls through to the generic downloader.

        Args:
            url: A PubMed or PMC article URL.

        Returns:
            Raw PDF bytes, or None if no PDF could be obtained.
        """
        try:
            # Apply rate limiting for PubMed requests
            current_time = time.time()
            time_since_last = current_time - self._last_pubmed_request
            if time_since_last < self._pubmed_delay:
                sleep_time = self._pubmed_delay - time_since_last
                logger.debug(
                    f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request"
                )
                time.sleep(sleep_time)
            self._last_pubmed_request = time.time()

            # If it's already a PMC article, download directly
            if "/articles/PMC" in url:
                pmc_match = re.search(r"(PMC\d+)", url)
                if pmc_match:
                    pmc_id = pmc_match.group(1)

                    # Try Europe PMC (more reliable)
                    europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    headers = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    }

                    try:
                        response = safe_get(
                            europe_url,
                            headers=headers,
                            timeout=30,
                            allow_redirects=True,
                        )
                        if response.status_code == 200:
                            content_type = response.headers.get(
                                "content-type", ""
                            )
                            # Size check accepts PDFs served without a pdf
                            # content-type header.
                            if (
                                "pdf" in content_type.lower()
                                or len(response.content) > 1000
                            ):
                                logger.info(
                                    f"Downloaded PDF via Europe PMC for {pmc_id}"
                                )
                                return response.content
                    except Exception as e:
                        logger.debug(f"Direct Europe PMC download failed: {e}")
                        return None

            # If it's a regular PubMed URL, try to find PMC version
            elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
                # Extract PMID from URL
                pmid_match = re.search(r"/(\d+)/?", url)
                if pmid_match:
                    pmid = pmid_match.group(1)
                    logger.info(f"Attempting to download PDF for PMID: {pmid}")

                    # Try Europe PMC first (more reliable)
                    pdf_content = self._try_europe_pmc(pmid)
                    if pdf_content:
                        return pdf_content

                    # First try using NCBI E-utilities API to find PMC ID
                    try:
                        # Use elink to convert PMID to PMCID
                        elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
                        params = {
                            "dbfrom": "pubmed",
                            "db": "pmc",
                            "id": pmid,
                            "retmode": "json",
                        }

                        api_response = safe_get(
                            elink_url, params=params, timeout=10
                        )
                        if api_response.status_code == 200:
                            data = api_response.json()
                            # Parse the response to find PMC ID
                            link_sets = data.get("linksets", [])
                            if link_sets and "linksetdbs" in link_sets[0]:
                                for linksetdb in link_sets[0]["linksetdbs"]:
                                    if linksetdb.get(
                                        "dbto"
                                    ) == "pmc" and linksetdb.get("links"):
                                        pmc_id_num = linksetdb["links"][0]
                                        # Now fetch PMC details to get the correct PMC ID format
                                        esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                                        summary_params = {
                                            "db": "pmc",
                                            "id": pmc_id_num,
                                            "retmode": "json",
                                        }
                                        summary_response = safe_get(
                                            esummary_url,
                                            params=summary_params,
                                            timeout=10,
                                        )
                                        if summary_response.status_code == 200:
                                            summary_data = (
                                                summary_response.json()
                                            )
                                            result = summary_data.get(
                                                "result", {}
                                            ).get(str(pmc_id_num), {})
                                            if result:
                                                # PMC IDs in the API don't have the "PMC" prefix
                                                pmc_id = f"PMC{pmc_id_num}"
                                                logger.info(
                                                    f"Found PMC ID via API: {pmc_id} for PMID: {pmid}"
                                                )

                                                # Try Europe PMC with the PMC ID
                                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

                                                # Extra delay before the
                                                # actual PDF fetch.
                                                time.sleep(self._pubmed_delay)

                                                headers = {
                                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                                }

                                                try:
                                                    response = safe_get(
                                                        europe_url,
                                                        headers=headers,
                                                        timeout=30,
                                                        allow_redirects=True,
                                                    )
                                                    if (
                                                        response.status_code
                                                        == 200
                                                    ):
                                                        content_type = response.headers.get(
                                                            "content-type", ""
                                                        )
                                                        if (
                                                            "pdf"
                                                            in content_type.lower()
                                                            or len(
                                                                response.content
                                                            )
                                                            > 1000
                                                        ):
                                                            logger.info(
                                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                                            )
                                                            return (
                                                                response.content
                                                            )
                                                except Exception as e:
                                                    logger.debug(
                                                        f"Europe PMC download with PMC ID failed: {e}"
                                                    )
                    except Exception as e:
                        logger.debug(
                            f"API lookup failed, trying webpage scraping: {e}"
                        )

                    # Fallback to webpage scraping if API fails
                    try:
                        headers = {
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                        }
                        response = safe_get(url, headers=headers, timeout=10)
                        if response.status_code == 200:
                            # Look for PMC ID in the page
                            pmc_match = re.search(r"PMC\d+", response.text)
                            if pmc_match:
                                pmc_id = pmc_match.group(0)
                                logger.info(
                                    f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}"
                                )

                                # Add delay before downloading PDF
                                time.sleep(self._pubmed_delay)

                                # Try Europe PMC with the PMC ID (more reliable)
                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                                headers = {
                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                }

                                try:
                                    response = safe_get(
                                        europe_url,
                                        headers=headers,
                                        timeout=30,
                                        allow_redirects=True,
                                    )
                                    if response.status_code == 200:
                                        content_type = response.headers.get(
                                            "content-type", ""
                                        )
                                        if (
                                            "pdf" in content_type.lower()
                                            or len(response.content) > 1000
                                        ):
                                            logger.info(
                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                            )
                                            return response.content
                                except Exception as e:
                                    logger.debug(
                                        f"Europe PMC download failed: {e}"
                                    )
                            else:
                                logger.info(
                                    f"No PMC version found for PMID: {pmid}"
                                )
                    except requests.exceptions.HTTPError as e:
                        # HTTP 429 means we were throttled: back off
                        # exponentially (capped), then re-raise to the outer
                        # handler.
                        if e.response.status_code == 429:
                            logger.warning(
                                "Rate limited by PubMed, increasing delay"
                            )
                            self._pubmed_delay = min(
                                self._pubmed_delay * 2, 5.0
                            )  # Max 5 seconds
                        raise
                    except Exception as e:
                        logger.debug(f"Could not check for PMC version: {e}")

            # Last resort: generic HTTP download of the original URL.
            return self._download_generic(url)
        except Exception:
            logger.exception("PubMed download failed")
            return None
1531 def _download_semantic_scholar(self, url: str) -> Optional[bytes]:
1532 """Download from Semantic Scholar."""
1533 # Semantic Scholar doesn't host PDFs directly
1534 # Would need to extract actual PDF URL from page
1535 return None
1537 def _download_biorxiv(self, url: str) -> Optional[bytes]:
1538 """Download from bioRxiv."""
1539 try:
1540 # Convert to PDF URL
1541 pdf_url = url.replace(".org/", ".org/content/")
1542 pdf_url = re.sub(r"v\d+$", "", pdf_url) # Remove version
1543 pdf_url += ".full.pdf"
1545 return self._download_generic(pdf_url)
1546 except Exception:
1547 logger.exception("bioRxiv download failed")
1548 return None
    def _save_text_with_db(
        self,
        resource: ResearchResource,
        text: str,
        session: Session,
        extraction_method: str,
        extraction_source: str,
        pdf_document_id: Optional[int] = None,
    ) -> Optional[str]:
        """
        Save extracted text to encrypted database.

        Updates the existing Document (located via *pdf_document_id* or the
        resource) with the text, or creates a new text-only Document linked
        to the default "Library" collection when none exists. Does NOT
        commit; callers commit the session.

        Args:
            resource: The research resource
            text: Extracted text content
            session: Database session
            extraction_method: How the text was extracted
            extraction_source: Specific tool/API used
            pdf_document_id: ID of PDF document if extracted from PDF

        Returns:
            None (previously returned text file path, now removed)

        Raises:
            Exception: Re-raised after logging if persisting the text fails.
        """
        try:
            # Calculate text metadata for database
            word_count = len(text.split())
            character_count = len(text)

            # Find the document by pdf_document_id or resource_id
            doc = None
            if pdf_document_id:
                doc = (
                    session.query(Document)
                    .filter_by(id=pdf_document_id)
                    .first()
                )
            else:
                doc = get_document_for_resource(session, resource)

            if doc:
                # Update existing document with extracted text
                doc.text_content = text
                doc.character_count = character_count
                doc.word_count = word_count
                doc.extraction_method = extraction_method
                doc.extraction_source = extraction_source
                doc.status = DocumentStatus.COMPLETED
                doc.document_hash = hashlib.sha256(text.encode()).hexdigest()
                doc.processed_at = datetime.now(UTC)

                # Set quality based on method: native API text beats PDF
                # parsing, which beats anything else.
                if extraction_method == "native_api":
                    doc.extraction_quality = "high"
                elif (
                    extraction_method == "pdf_extraction"
                    and extraction_source == "pdfplumber"
                ):
                    doc.extraction_quality = "medium"
                else:
                    doc.extraction_quality = "low"

                logger.debug(
                    f"Updated document {doc.id} with extracted text ({word_count} words)"
                )
            else:
                # Create a new Document for text-only extraction
                # Generate hash from text content
                text_hash = hashlib.sha256(text.encode()).hexdigest()

                # Get source type for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download", self.password
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type for text document"
                    )
                    raise

                # Create new document
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=text_hash,
                    original_url=resource.url,
                    file_path=FILE_PATH_TEXT_ONLY,
                    file_size=character_count,  # Use character count as file size for text-only
                    file_type="text",
                    mime_type="text/plain",
                    title=resource.title,
                    text_content=text,
                    character_count=character_count,
                    word_count=word_count,
                    extraction_method=extraction_method,
                    extraction_source=extraction_source,
                    extraction_quality="high"
                    if extraction_method == "native_api"
                    else "medium",
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                # Link to default Library collection
                library_collection = (
                    session.query(Collection).filter_by(name="Library").first()
                )
                if library_collection:
                    doc_collection = DocumentCollection(
                        document_id=doc_id,
                        collection_id=library_collection.id,
                        indexed=False,
                        chunk_count=0,
                    )
                    session.add(doc_collection)
                else:
                    logger.warning(
                        f"Library collection not found - document {doc_id} will not be linked to default collection"
                    )

                logger.info(
                    f"Created new document {doc_id} for text-only extraction ({word_count} words)"
                )

            logger.info(
                f"Saved text to encrypted database ({word_count} words)"
            )
            return None

        except Exception:
            logger.exception("Error saving text to encrypted database")
            raise  # Re-raise so caller can handle the error
1687 def _create_text_document_record(
1688 self,
1689 session: Session,
1690 resource: ResearchResource,
1691 file_path: Path,
1692 extraction_method: str,
1693 extraction_source: str,
1694 ):
1695 """Update existing Document with text from file (for legacy text files)."""
1696 try:
1697 # Read file to get metadata
1698 text = file_path.read_text(encoding="utf-8", errors="ignore")
1699 word_count = len(text.split())
1700 character_count = len(text)
1702 # Find the Document for this resource
1703 doc = get_document_for_resource(session, resource)
1705 if doc:
1706 # Update existing document with text content
1707 doc.text_content = text
1708 doc.character_count = character_count
1709 doc.word_count = word_count
1710 doc.extraction_method = extraction_method
1711 doc.extraction_source = extraction_source
1712 doc.extraction_quality = (
1713 "low" # Unknown quality for legacy files
1714 )
1715 logger.info(
1716 f"Updated document {doc.id} with text from file: {file_path.name}"
1717 )
1718 else:
1719 logger.warning(
1720 f"No document found to update for resource {resource.id}"
1721 )
1723 except Exception:
1724 logger.exception("Error updating document with text from file")
    def _record_failed_text_extraction(
        self, session: Session, resource: ResearchResource, error: str
    ):
        """Record a failed text extraction attempt in the Document.

        Marks the resource's existing Document as FAILED, or creates a new
        FAILED Document when none exists, so failures are visible and can be
        retried later. Does NOT commit; callers commit the session.

        Args:
            session: Active database session.
            resource: The research resource whose extraction failed.
            error: Human-readable error description to persist.
        """
        try:
            # Find the Document for this resource
            doc = get_document_for_resource(session, resource)

            if doc:
                # Update document with extraction error
                doc.error_message = error
                doc.extraction_method = "failed"
                doc.extraction_quality = "low"
                doc.status = DocumentStatus.FAILED
                logger.info(
                    f"Recorded failed text extraction for document {doc.id}: {error}"
                )
            else:
                # Create a new Document for failed extraction
                # This enables tracking failures and retry capability
                source_type_id = get_source_type_id(
                    self.username, "research_download", self.password
                )

                # Deterministic hash so retries update the same record
                failed_hash = hashlib.sha256(
                    f"failed:{resource.url}:{resource.id}".encode()
                ).hexdigest()

                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=failed_hash,
                    original_url=resource.url,
                    file_path=None,  # No file was ever written for a failure
                    file_size=0,
                    file_type="unknown",
                    title=resource.title,
                    status=DocumentStatus.FAILED,
                    error_message=error,
                    extraction_method="failed",
                    extraction_quality="low",
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                logger.info(
                    f"Created failed document {doc_id} for resource {resource.id}: {error}"
                )

        except Exception:
            # Best-effort bookkeeping: never let failure recording break the
            # surrounding download/extraction flow.
            logger.exception("Error recording failed text extraction")