Coverage for src / local_deep_research / research_library / services / download_service.py: 95%

705 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2PDF Download Service for Research Library 

3 

4Handles downloading PDFs from various academic sources with: 

5- Deduplication using download tracker 

6- Source-specific download strategies (arXiv, PubMed, etc.) 

7- Progress tracking and error handling 

8- File organization and storage 

9""" 

10 

11import hashlib 

12import os 

13import re 

14import time 

15import uuid 

16from datetime import datetime, UTC 

17from pathlib import Path 

18from typing import Optional, Tuple 

19from urllib.parse import urlparse 

20 

21import requests 

22from loguru import logger 

23from sqlalchemy.orm import Session 

24import pdfplumber 

25from pypdf import PdfReader 

26 

27from ...constants import FILE_PATH_SENTINELS, FILE_PATH_TEXT_ONLY 

28from ...database.models.download_tracker import ( 

29 DownloadAttempt, 

30 DownloadTracker, 

31) 

32from ...security import safe_get, sanitize_for_log 

33from ...security.path_validator import PathValidator 

34from ...database.models.library import ( 

35 Collection, 

36 Document as Document, 

37 DocumentStatus, 

38 DownloadQueue as LibraryDownloadQueue, 

39 DocumentCollection, 

40) 

41from .pdf_storage_manager import PDFStorageManager 

42from ...database.models.research import ResearchResource 

43from ...database.library_init import get_source_type_id, get_default_library_id 

44from ...database.session_context import get_user_db_session 

45from ...utilities.db_utils import get_settings_manager 

46from ...library.download_management import RetryManager 

47from ...config.paths import get_library_directory 

48from ..utils import ( 

49 get_document_for_resource, 

50 get_url_hash, 

51 get_absolute_path_from_settings, 

52 is_downloadable_url, 

53) 

54 

55# Import our modular downloaders 

56from ..downloaders import ( 

57 ContentType, 

58 ArxivDownloader, 

59 PubMedDownloader, 

60 BioRxivDownloader, 

61 DirectPDFDownloader, 

62 SemanticScholarDownloader, 

63 OpenAlexDownloader, 

64 GenericDownloader, 

65) 

66 

67 

68class DownloadService: 

69 """Service for downloading and managing research PDFs.""" 

70 

    def __init__(self, username: str, password: Optional[str] = None):
        """Initialize download service for a user.

        Builds the settings manager, resolves the on-disk library root,
        instantiates the ordered downloader chain, and sets up the retry
        manager plus PubMed rate-limiting state.

        Args:
            username: The username to download for
            password: Optional password for encrypted database access

        Raises:
            ValueError: If the storage-path setting resolves to None.
        """
        self.username = username
        self.password = password
        self.settings = get_settings_manager(username=username)
        self._closed = False  # guards close() against double invocation

        # Debug settings manager and user context
        logger.info(
            f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}"
        )

        # Get library path from settings (uses centralized path, respects LDR_DATA_DIR)
        storage_path_setting = self.settings.get_setting(
            "research_library.storage_path",
            str(get_library_directory()),
        )
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})"
        )

        if storage_path_setting is None:
            logger.error(
                "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!"
            )
            raise ValueError("Storage path setting cannot be None")

        # Expand env vars and ~ so downstream code always sees an absolute path.
        self.library_root = str(
            Path(os.path.expandvars(storage_path_setting))
            .expanduser()
            .resolve()
        )
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}"
        )

        # Create directory structure
        self._setup_directories()

        # Initialize modular downloaders
        # DirectPDFDownloader first for efficiency with direct PDF links

        # Get Semantic Scholar API key from settings
        semantic_scholar_api_key = self.settings.get_setting(
            "search.engine.web.semantic_scholar.api_key", ""
        )

        # Order matters: _download_pdf and get_text_content try these
        # first-to-last, so the generic fallback must stay at the end.
        self.downloaders = [
            DirectPDFDownloader(timeout=30),  # Handle direct PDF links first
            SemanticScholarDownloader(
                timeout=30,
                api_key=semantic_scholar_api_key
                if semantic_scholar_api_key
                else None,
            ),
            OpenAlexDownloader(
                timeout=30
            ),  # OpenAlex with API lookup (no key needed)
            ArxivDownloader(timeout=30),
            PubMedDownloader(timeout=30, rate_limit_delay=1.0),
            BioRxivDownloader(timeout=30),
            GenericDownloader(timeout=30),  # Generic should be last (fallback)
        ]

        # Initialize retry manager for smart failure tracking
        self.retry_manager = RetryManager(username, password)
        logger.info(
            f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}"
        )

        # PubMed rate limiting state
        self._pubmed_delay = 1.0  # 1 second delay for PubMed
        self._last_pubmed_request = 0.0  # Track last request time

149 

150 def close(self): 

151 """Close all downloader resources.""" 

152 if self._closed: 

153 return 

154 self._closed = True 

155 

156 from ...utilities.resource_utils import safe_close 

157 

158 for downloader in self.downloaders: 

159 safe_close(downloader, "downloader") 

160 

161 # Close the settings manager's DB session to return the connection 

162 # to the pool. SettingsManager.close() is idempotent. 

163 safe_close(self.settings, "settings manager", allow_none=True) 

164 

165 # Clear references to allow garbage collection 

166 self.downloaders = [] 

167 self.retry_manager = None 

168 self.settings = None 

169 

170 def __enter__(self): 

171 """Enter context manager.""" 

172 return self 

173 

174 def __exit__(self, exc_type, exc_val, exc_tb): 

175 """Exit context manager, ensuring cleanup.""" 

176 self.close() 

177 return False 

178 

179 def _setup_directories(self): 

180 """Create library directory structure.""" 

181 # Only create the root and pdfs folder - flat structure 

182 paths = [ 

183 self.library_root, 

184 str(Path(self.library_root) / "pdfs"), 

185 ] 

186 for path in paths: 

187 Path(path).mkdir(parents=True, exist_ok=True) 

188 

189 def _normalize_url(self, url: str) -> str: 

190 """Normalize URL for consistent hashing.""" 

191 # Remove protocol variations 

192 url = re.sub(r"^https?://", "", url) 

193 # Remove www 

194 url = re.sub(r"^www\.", "", url) 

195 # Remove trailing slashes 

196 url = url.rstrip("/") 

197 # Sort query parameters 

198 if "?" in url: 

199 base, query = url.split("?", 1) 

200 params = sorted(query.split("&")) 

201 url = f"{base}?{'&'.join(params)}" 

202 return url.lower() 

203 

204 def _get_url_hash(self, url: str) -> str: 

205 """Generate SHA256 hash of normalized URL.""" 

206 normalized = self._normalize_url(url) 

207 return get_url_hash(normalized) 

208 

209 def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]: 

210 """ 

211 Check if URL is already downloaded. 

212 

213 Returns: 

214 Tuple of (is_downloaded, file_path) 

215 """ 

216 url_hash = self._get_url_hash(url) 

217 

218 with get_user_db_session(self.username, self.password) as session: 

219 tracker = ( 

220 session.query(DownloadTracker) 

221 .filter_by(url_hash=url_hash, is_downloaded=True) 

222 .first() 

223 ) 

224 

225 if tracker and tracker.file_path: 

226 # Compute absolute path and verify file still exists 

227 absolute_path = get_absolute_path_from_settings( 

228 tracker.file_path 

229 ) 

230 if absolute_path and absolute_path.is_file(): 

231 return True, str(absolute_path) 

232 if absolute_path: 

233 # File was deleted, mark as not downloaded 

234 tracker.is_downloaded = False 

235 session.commit() 

236 # If absolute_path is None, path was blocked - treat as not downloaded 

237 

238 return False, None 

239 

    def get_text_content(self, resource_id: int) -> Optional[str]:
        """
        Get text content for a research resource.

        This will try to:
        1. Fetch text directly from APIs if available
        2. Extract text from downloaded PDF if exists
        3. Download PDF and extract text if not yet downloaded

        Args:
            resource_id: ID of the research resource

        Returns:
            Text content as string, or None if extraction failed
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return None

            url = resource.url

            # Find appropriate downloader; downloaders are ordered so the
            # generic fallback is tried last.
            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for text extraction from {url}"
                    )
                    try:
                        # Try to get text content
                        text = downloader.download_text(url)
                        if text:
                            logger.info(
                                f"Successfully extracted text for: {resource.title[:50]}"
                            )
                            return text
                        # NOTE(review): a falsy (empty) result falls through
                        # to the next capable downloader.
                    except Exception:
                        logger.exception("Failed to extract text")
                        # An exception aborts the downloader loop entirely;
                        # no further downloaders are tried.
                        break

            logger.warning(f"Could not extract text for {url}")
            return None

283 

284 def queue_research_downloads( 

285 self, research_id: str, collection_id: Optional[str] = None 

286 ) -> int: 

287 """ 

288 Queue all downloadable PDFs from a research session. 

289 

290 Args: 

291 research_id: The research session ID 

292 collection_id: Optional target collection ID (defaults to Library if not provided) 

293 

294 Returns: 

295 Number of items queued 

296 """ 

297 queued = 0 

298 

299 # Get default library collection if no collection_id provided 

300 if not collection_id: 

301 from ...database.library_init import get_default_library_id 

302 

303 collection_id = get_default_library_id(self.username, self.password) 

304 

305 with get_user_db_session(self.username, self.password) as session: 

306 # Get all resources for this research 

307 resources = ( 

308 session.query(ResearchResource) 

309 .filter_by(research_id=research_id) 

310 .all() 

311 ) 

312 

313 for resource in resources: 

314 if self._is_downloadable(resource): 

315 # Library resources linked via document_id are already done 

316 if resource.document_id: 

317 continue 

318 

319 # Check if already queued 

320 existing_queue = ( 

321 session.query(LibraryDownloadQueue) 

322 .filter_by( 

323 resource_id=resource.id, 

324 status=DocumentStatus.PENDING, 

325 ) 

326 .first() 

327 ) 

328 

329 # Check if already downloaded (trust the database status) 

330 existing_doc = ( 

331 session.query(Document) 

332 .filter_by( 

333 resource_id=resource.id, 

334 status=DocumentStatus.COMPLETED, 

335 ) 

336 .first() 

337 ) 

338 

339 # Queue if not already queued and not marked as completed 

340 if not existing_queue and not existing_doc: 340 ↛ 313line 340 didn't jump to line 313 because the condition on line 340 was always true

341 # Check one more time if ANY queue entry exists (regardless of status) 

342 any_queue = ( 

343 session.query(LibraryDownloadQueue) 

344 .filter_by(resource_id=resource.id) 

345 .first() 

346 ) 

347 

348 if any_queue: 

349 # Reset the existing queue entry 

350 any_queue.status = DocumentStatus.PENDING 

351 any_queue.research_id = research_id 

352 any_queue.collection_id = collection_id 

353 queued += 1 

354 else: 

355 # Add new queue entry 

356 queue_entry = LibraryDownloadQueue( 

357 resource_id=resource.id, 

358 research_id=research_id, 

359 collection_id=collection_id, 

360 priority=0, 

361 status=DocumentStatus.PENDING, 

362 ) 

363 session.add(queue_entry) 

364 queued += 1 

365 

366 session.commit() 

367 logger.info( 

368 f"Queued {queued} downloads for research {research_id} to collection {collection_id}" 

369 ) 

370 

371 return queued 

372 

373 def _is_downloadable(self, resource: ResearchResource) -> bool: 

374 """Check if a resource is likely downloadable as PDF. 

375 

376 Delegates to the consolidated is_downloadable_url() from utils. 

377 """ 

378 return is_downloadable_url(resource.url) 

379 

    def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]:
        """
        Download a specific resource.

        Looks up (or creates) the URL's DownloadTracker, delegates the
        actual fetch to _download_pdf, records the attempt with the retry
        manager, updates any queue entry, and finally triggers auto-indexing
        for a successful download.

        Returns:
            Tuple of (success: bool, skip_reason: str or None)
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return False, "Resource not found"

            # Check if already downloaded (trust the database after sync)
            existing_doc = (
                session.query(Document)
                .filter_by(
                    resource_id=resource_id, status=DocumentStatus.COMPLETED
                )
                .first()
            )

            if existing_doc:
                logger.info(
                    "Resource already downloaded (according to database)"
                )
                return True, None

            # Get collection_id from queue entry if it exists
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )
            collection_id = (
                queue_entry.collection_id
                if queue_entry and queue_entry.collection_id
                else None
            )

            # Create download tracker entry (deduplicated by URL hash)
            url_hash = self._get_url_hash(resource.url)
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash)
                .first()
            )

            if not tracker:
                tracker = DownloadTracker(
                    url=resource.url,
                    url_hash=url_hash,
                    first_resource_id=resource.id,
                    is_downloaded=False,
                )
                session.add(tracker)
                session.commit()

            # Attempt download
            success, skip_reason, status_code = self._download_pdf(
                resource, tracker, session, collection_id
            )

            # Record attempt with retry manager for smart failure tracking
            self.retry_manager.record_attempt(
                resource_id=resource.id,
                result=(success, skip_reason),
                status_code=status_code,
                url=resource.url,
                details=skip_reason
                or (
                    "Successfully downloaded" if success else "Download failed"
                ),
                session=session,
            )

            # Update queue status if exists (re-queried: _download_pdf may
            # have touched the session since the earlier lookup)
            queue_entry = (
                session.query(LibraryDownloadQueue)
                .filter_by(resource_id=resource_id)
                .first()
            )

            if queue_entry:
                queue_entry.status = (
                    DocumentStatus.COMPLETED
                    if success
                    else DocumentStatus.FAILED
                )
                queue_entry.completed_at = datetime.now(UTC)

            session.commit()

            # Trigger auto-indexing for successfully downloaded documents.
            # Requires the password for encrypted-database access.
            if success and self.password:
                try:
                    # Local imports avoid a circular dependency with routes.
                    from ..routes.rag_routes import trigger_auto_index
                    from ...database.library_init import get_default_library_id

                    # Get the document that was just created
                    doc = (
                        session.query(Document)
                        .filter_by(resource_id=resource_id)
                        .order_by(Document.created_at.desc())
                        .first()
                    )
                    if doc:
                        # Use collection_id from queue entry or default Library
                        # NB: pass username string, not the SQLAlchemy session
                        target_collection = (
                            collection_id
                            or get_default_library_id(
                                self.username, self.password
                            )
                        )
                        if target_collection:
                            trigger_auto_index(
                                [doc.id],
                                target_collection,
                                self.username,
                                self.password,
                            )
                except Exception:
                    # Indexing failure must not undo a successful download.
                    logger.exception("Failed to trigger auto-indexing")

            return success, skip_reason

506 

    def _download_pdf(
        self,
        resource: ResearchResource,
        tracker: DownloadTracker,
        session: Session,
        collection_id: Optional[str] = None,
    ) -> Tuple[bool, Optional[str], Optional[int]]:
        """
        Perform the actual PDF download.

        Walks the ordered downloader chain until one yields PDF bytes,
        persists the bytes via PDFStorageManager (filesystem or database,
        per settings), creates/updates the unified Document row, and then
        best-effort extracts text from the PDF.

        Args:
            resource: The research resource to download
            tracker: Download tracker for this URL
            session: Database session
            collection_id: Optional target collection ID (defaults to Library if not provided)

        Returns:
            Tuple of (success: bool, skip_reason: Optional[str], status_code: Optional[int])
        """
        url = resource.url

        # Log attempt
        attempt = DownloadAttempt(
            url_hash=tracker.url_hash,
            attempt_number=tracker.download_attempts.count() + 1
            if hasattr(tracker, "download_attempts")
            else 1,
            attempted_at=datetime.now(UTC),
        )
        session.add(attempt)

        try:
            # Use modular downloaders with detailed skip reasons
            pdf_content = None
            downloader_used = None
            skip_reason = None
            status_code = None

            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for {url}"
                    )
                    result = downloader.download_with_result(
                        url, ContentType.PDF
                    )
                    downloader_used = downloader.__class__.__name__

                    if result.is_success and result.content:
                        pdf_content = result.content
                        status_code = result.status_code
                        break
                    if result.skip_reason:
                        skip_reason = result.skip_reason
                        status_code = result.status_code
                        logger.info(f"Download skipped: {skip_reason}")
                        # Keep trying other downloaders unless it's the GenericDownloader
                        if isinstance(downloader, GenericDownloader):
                            break

            if not downloader_used:
                logger.error(f"No downloader found for {url}")
                skip_reason = "No compatible downloader available"

            if not pdf_content:
                error_msg = skip_reason or "Failed to download PDF content"
                # Store skip reason in attempt for retrieval
                attempt.error_message = error_msg
                attempt.succeeded = False
                session.commit()
                logger.info(f"Download failed with reason: {error_msg}")
                return False, error_msg, status_code

            # Get PDF storage mode setting
            pdf_storage_mode = self.settings.get_setting(
                "research_library.pdf_storage_mode", "none"
            )
            max_pdf_size_mb = int(
                self.settings.get_setting(
                    "research_library.max_pdf_size_mb", 100
                )
            )
            logger.info(
                f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}"
            )

            # Update tracker
            # NOTE(review): hashlib is already imported at module level;
            # this local import is redundant but harmless.
            import hashlib

            tracker.file_hash = hashlib.sha256(pdf_content).hexdigest()
            tracker.file_size = len(pdf_content)
            tracker.is_downloaded = True
            tracker.downloaded_at = datetime.now(UTC)

            # Initialize PDF storage manager
            pdf_storage_manager = PDFStorageManager(
                library_root=self.library_root,
                storage_mode=pdf_storage_mode,
                max_pdf_size_mb=max_pdf_size_mb,
            )

            # Update attempt with success info
            attempt.succeeded = True

            # Check if library document already exists
            existing_doc = get_document_for_resource(session, resource)

            if existing_doc:
                # Update existing document
                existing_doc.document_hash = tracker.file_hash
                existing_doc.file_size = len(pdf_content)
                existing_doc.status = DocumentStatus.COMPLETED
                existing_doc.processed_at = datetime.now(UTC)

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=existing_doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker ("database" is a sentinel, not a real path)
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )
            else:
                # Get source type ID for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download", self.password
                    )
                    # Use provided collection_id or default to Library
                    library_collection_id = (
                        collection_id
                        or get_default_library_id(self.username, self.password)
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type or library collection"
                    )
                    raise

                # Create new unified document entry
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=tracker.file_hash,
                    original_url=url,
                    file_size=len(pdf_content),
                    file_type="pdf",
                    mime_type="application/pdf",
                    title=resource.title,
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                    storage_mode=pdf_storage_mode,
                )
                session.add(doc)
                session.flush()  # Ensure doc.id is available for blob storage

                # Save PDF using storage manager (updates storage_mode and file_path)
                file_path_result, _ = pdf_storage_manager.save_pdf(
                    pdf_content=pdf_content,
                    document=doc,
                    session=session,
                    filename=f"{resource.id}.pdf",
                    url=url,
                    resource_id=resource.id,
                )

                # Update tracker
                tracker.file_path = (
                    file_path_result if file_path_result else None
                )
                tracker.file_name = (
                    Path(file_path_result).name
                    if file_path_result and file_path_result != "database"
                    else None
                )

                # Link document to default Library collection
                doc_collection = DocumentCollection(
                    document_id=doc_id,
                    collection_id=library_collection_id,
                    indexed=False,
                )
                session.add(doc_collection)

            # Update attempt
            attempt.succeeded = True
            attempt.bytes_downloaded = len(pdf_content)

            if pdf_storage_mode == "database":
                logger.info(
                    f"Successfully stored PDF in database: {resource.url}"
                )
            elif pdf_storage_mode == "filesystem":
                logger.info(f"Successfully downloaded: {tracker.file_path}")
            else:
                logger.info(f"Successfully extracted text from: {resource.url}")

            # Automatically extract and save text after successful PDF download
            try:
                logger.info(
                    f"Extracting text from downloaded PDF for: {resource.title[:50]}"
                )
                text = self._extract_text_from_pdf(pdf_content)

                if text:
                    # Get the document ID we just created/updated
                    pdf_doc = get_document_for_resource(session, resource)
                    pdf_document_id = pdf_doc.id if pdf_doc else None

                    # Save text to encrypted database
                    self._save_text_with_db(
                        resource=resource,
                        text=text,
                        session=session,
                        extraction_method="pdf_extraction",
                        extraction_source="local_pdf",
                        pdf_document_id=pdf_document_id,
                    )
                    logger.info(
                        f"Successfully extracted and saved text for: {resource.title[:50]}"
                    )
                else:
                    logger.warning(
                        f"Text extraction returned empty text for: {resource.title[:50]}"
                    )
            except Exception:
                logger.exception(
                    "Failed to extract text from PDF, but PDF download succeeded"
                )
                # Don't fail the entire download if text extraction fails

            return True, None, status_code

        except Exception as e:
            logger.exception(f"Download failed for {url}")
            attempt.succeeded = False
            attempt.error_type = type(e).__name__
            attempt.error_message = sanitize_for_log(str(e), max_length=200)
            tracker.is_accessible = False
            # Sanitize error message before returning to API
            safe_error = attempt.error_message
            return False, safe_error, None

763 

764 def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]: 

765 """ 

766 Extract text from PDF content using multiple methods for best results. 

767 

768 Args: 

769 pdf_content: Raw PDF bytes 

770 

771 Returns: 

772 Extracted text or None if extraction fails 

773 """ 

774 try: 

775 # First try with pdfplumber (better for complex layouts) 

776 import io 

777 

778 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf: 

779 text_parts = [] 

780 for page in pdf.pages: 

781 page_text = page.extract_text() 

782 if page_text: 

783 text_parts.append(page_text) 

784 

785 if text_parts: 

786 return "\n\n".join(text_parts) 

787 

788 # Fallback to PyPDF if pdfplumber fails 

789 reader = PdfReader(io.BytesIO(pdf_content)) 

790 text_parts = [] 

791 for page in reader.pages: 

792 text = page.extract_text() 

793 if text: 

794 text_parts.append(text) 

795 

796 if text_parts: 

797 return "\n\n".join(text_parts) 

798 

799 logger.warning("No text could be extracted from PDF") 

800 return None 

801 

802 except Exception: 

803 logger.exception("Failed to extract text from PDF") 

804 return None 

805 

806 def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]: 

807 """ 

808 Download resource and extract text to encrypted database. 

809 

810 Args: 

811 resource_id: ID of the resource to download 

812 

813 Returns: 

814 Tuple of (success, error_message) 

815 """ 

816 with get_user_db_session(self.username, self.password) as session: 

817 # Get the resource 

818 resource = ( 

819 session.query(ResearchResource) 

820 .filter_by(id=resource_id) 

821 .first() 

822 ) 

823 if not resource: 

824 return False, "Resource not found" 

825 

826 # Handle library resources — content already in the database 

827 # (local-only, no network — skip retry checks) 

828 if resource.source_type == "library" or ( 

829 resource.url and resource.url.startswith("/library/document/") 

830 ): 

831 return self._try_library_text_extraction(session, resource) 

832 

833 # Try existing text in database 

834 result = self._try_existing_text(session, resource_id) 

835 if result is not None: 

836 return result 

837 

838 # Try legacy text files on disk 

839 result = self._try_legacy_text_file(session, resource, resource_id) 

840 if result is not None: 840 ↛ 841line 840 didn't jump to line 841 because the condition on line 840 was never true

841 return result 

842 

843 # Try extracting from existing PDF 

844 result = self._try_existing_pdf_extraction( 

845 session, resource, resource_id 

846 ) 

847 if result is not None: 847 ↛ 848line 847 didn't jump to line 848 because the condition on line 847 was never true

848 return result 

849 

850 # Check retry eligibility before network-dependent extraction 

851 if self.retry_manager: 851 ↛ 860line 851 didn't jump to line 860 because the condition on line 851 was always true

852 decision = self.retry_manager.should_retry_resource(resource_id) 

853 if not decision.can_retry: 853 ↛ 854line 853 didn't jump to line 854 because the condition on line 853 was never true

854 logger.info( 

855 f"Skipping resource {resource_id}: {decision.reason}" 

856 ) 

857 return False, decision.reason 

858 

859 # Try API text extraction 

860 result = self._try_api_text_extraction(session, resource) 

861 if result is not None: 

862 self._record_retry_attempt(resource, result, session) 

863 return result 

864 

865 # Fallback: Download PDF and extract 

866 result = self._fallback_pdf_extraction(session, resource) 

867 self._record_retry_attempt(resource, result, session) 

868 return result 

869 

870 def _record_retry_attempt( 

871 self, resource, result: Tuple[bool, Optional[str]], session=None 

872 ) -> None: 

873 """Record a download attempt with the retry manager.""" 

874 if not self.retry_manager: 874 ↛ 875line 874 didn't jump to line 875 because the condition on line 874 was never true

875 return 

876 self.retry_manager.record_attempt( 

877 resource_id=resource.id, 

878 result=result, 

879 url=resource.url or "", 

880 details=result[1] 

881 or ( 

882 "Successfully extracted text" 

883 if result[0] 

884 else "Text extraction failed" 

885 ), 

886 session=session, 

887 ) 

888 

    def _try_library_text_extraction(
        self, session, resource
    ) -> Tuple[bool, Optional[str]]:
        """Handle library resources — content already exists in local database.

        Resolves the library Document by UUID (from resource metadata or the
        /library/document/{uuid} URL), reuses its text when present, and
        otherwise extracts text from the stored PDF.

        Returns:
            Tuple of (success, error_message). On success: (True, None).
            On failure: (False, description of what went wrong).
        """
        # 1. Extract document UUID from metadata (authoritative) or URL (fallback)
        doc_id = None
        metadata = (resource.resource_metadata or {}).get("original_data", {})
        if isinstance(metadata, dict):
            meta_inner = metadata.get("metadata", {})
            if isinstance(meta_inner, dict):
                doc_id = meta_inner.get("source_id") or meta_inner.get(
                    "document_id"
                )

        if not doc_id and resource.url:
            # Parse from /library/document/{uuid} or /library/document/{uuid}/pdf
            match = re.match(r"^/library/document/([^/]+)", resource.url)
            if match:
                doc_id = match.group(1)

        if not doc_id:
            return False, "Could not extract library document ID"

        # 2. Query the existing Document by its primary key (UUID string)
        doc = session.query(Document).filter_by(id=doc_id).first()
        if not doc:
            return False, f"Library document {doc_id} not found in database"

        # 3. If text already exists and extraction succeeded, return success
        if (
            doc.text_content
            and doc.extraction_method
            and doc.extraction_method != "failed"
        ):
            logger.info(f"Library document {doc_id} already has text content")
            resource.document_id = doc.id
            session.commit()
            return True, None

        # 4. Try extracting text from stored PDF (filesystem or database,
        # depending on the configured storage mode)
        pdf_storage_mode = self.settings.get_setting(
            "research_library.pdf_storage_mode", "none"
        )
        pdf_manager = PDFStorageManager(
            library_root=self.library_root,
            storage_mode=pdf_storage_mode,
        )
        pdf_content = pdf_manager.load_pdf(doc, session)

        if not pdf_content:
            return (
                False,
                f"Library document {doc_id} has no text or PDF content",
            )

        text = self._extract_text_from_pdf(pdf_content)
        if not text:
            return (
                False,
                f"Failed to extract text from library document {doc_id} PDF",
            )

        # 5. Update Document directly (don't use _save_text_with_db — it
        # queries by resource_id which mismatches for library docs)
        doc.text_content = text
        doc.character_count = len(text)
        doc.word_count = len(text.split())
        doc.extraction_method = "pdf_extraction"
        doc.extraction_source = "pdfplumber"
        doc.extraction_quality = "medium"

        # Link the resource to the library document for future lookups.
        resource.document_id = doc.id
        session.commit()

        logger.info(
            f"Extracted text from library document {doc_id} "
            f"({doc.word_count} words)"
        )
        return True, None

973 

974 def _try_existing_text( 

975 self, session, resource_id: int 

976 ) -> Optional[Tuple[bool, Optional[str]]]: 

977 """Check if text already exists in database (in Document.text_content).""" 

978 existing_doc = ( 

979 session.query(Document).filter_by(resource_id=resource_id).first() 

980 ) 

981 

982 if not existing_doc: 

983 return None 

984 

985 # Check if text content exists and extraction was successful 

986 if ( 

987 existing_doc.text_content 

988 and existing_doc.extraction_method 

989 and existing_doc.extraction_method != "failed" 

990 ): 

991 logger.info( 

992 f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

993 ) 

994 return True, None 

995 

996 # No text content or failed extraction 

997 logger.debug( 

998 f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

999 ) 

1000 return None # Fall through to re-extraction 

1001 

1002 def _try_legacy_text_file( 

1003 self, session, resource, resource_id: int 

1004 ) -> Optional[Tuple[bool, Optional[str]]]: 

1005 """Check for legacy text files on disk.""" 

1006 txt_path = Path(self.library_root) / "txt" 

1007 existing_files = ( 

1008 list(txt_path.glob(f"*_{resource_id}.txt")) 

1009 if txt_path.exists() 

1010 else [] 

1011 ) 

1012 

1013 if not existing_files: 

1014 return None 

1015 

1016 logger.info(f"Text file already exists on disk: {existing_files[0]}") 

1017 self._create_text_document_record( 

1018 session, 

1019 resource, 

1020 existing_files[0], 

1021 extraction_method="unknown", 

1022 extraction_source="legacy_file", 

1023 ) 

1024 session.commit() 

1025 return True, None 

1026 

1027 def _try_existing_pdf_extraction( 

1028 self, session, resource, resource_id: int 

1029 ) -> Optional[Tuple[bool, Optional[str]]]: 

1030 """Try extracting text from existing PDF in database.""" 

1031 pdf_document = ( 

1032 session.query(Document).filter_by(resource_id=resource_id).first() 

1033 ) 

1034 

1035 if not pdf_document or pdf_document.status != "completed": 

1036 return None 

1037 

1038 # Validate path to prevent path traversal attacks 

1039 if ( 

1040 not pdf_document.file_path 

1041 or pdf_document.file_path in FILE_PATH_SENTINELS 

1042 ): 

1043 return None 

1044 try: 

1045 safe_path = PathValidator.validate_safe_path( 

1046 pdf_document.file_path, str(self.library_root) 

1047 ) 

1048 pdf_path = Path(safe_path) 

1049 except ValueError: 

1050 logger.warning(f"Path traversal blocked: {pdf_document.file_path}") 

1051 return None 

1052 if not pdf_path.is_file(): 

1053 return None 

1054 

1055 logger.info(f"Found existing PDF, extracting text from: {pdf_path}") 

1056 try: 

1057 with open(pdf_path, "rb") as f: 

1058 pdf_content = f.read() 

1059 text = self._extract_text_from_pdf(pdf_content) 

1060 

1061 if not text: 

1062 return None 

1063 

1064 self._save_text_with_db( 

1065 resource, 

1066 text, 

1067 session, 

1068 extraction_method="pdf_extraction", 

1069 extraction_source="pdfplumber", 

1070 pdf_document_id=pdf_document.id, 

1071 ) 

1072 session.commit() 

1073 return True, None 

1074 

1075 except Exception: 

1076 logger.exception( 

1077 f"Failed to extract text from existing PDF: {pdf_path}" 

1078 ) 

1079 return None # Fall through to other methods 

1080 

1081 def _try_api_text_extraction( 

1082 self, session, resource 

1083 ) -> Optional[Tuple[bool, Optional[str]]]: 

1084 """Try direct API text extraction.""" 

1085 logger.info( 

1086 f"Attempting direct API text extraction from: {resource.url}" 

1087 ) 

1088 

1089 downloader = self._get_downloader(resource.url) 

1090 if not downloader: 

1091 return None 

1092 

1093 result = downloader.download_with_result(resource.url, ContentType.TEXT) 

1094 

1095 if not result.is_success or not result.content: 

1096 return None 

1097 

1098 # Decode text content 

1099 text = ( 

1100 result.content.decode("utf-8", errors="ignore") 

1101 if isinstance(result.content, bytes) 

1102 else result.content 

1103 ) 

1104 

1105 # Determine extraction source 

1106 extraction_source = "unknown" 

1107 if isinstance(downloader, ArxivDownloader): 

1108 extraction_source = "arxiv_api" 

1109 elif isinstance(downloader, PubMedDownloader): 1109 ↛ 1110line 1109 didn't jump to line 1110 because the condition on line 1109 was never true

1110 extraction_source = "pubmed_api" 

1111 

1112 try: 

1113 self._save_text_with_db( 

1114 resource, 

1115 text, 

1116 session, 

1117 extraction_method="native_api", 

1118 extraction_source=extraction_source, 

1119 ) 

1120 session.commit() 

1121 logger.info( 

1122 f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{resource.title[:50]}...'" 

1123 ) 

1124 return True, None 

1125 except Exception as e: 

1126 logger.exception(f"Failed to save text for resource {resource.id}") 

1127 # Sanitize error message before returning to API 

1128 safe_error = sanitize_for_log( 

1129 f"Failed to save text: {str(e)}", max_length=200 

1130 ) 

1131 return False, safe_error 

1132 

1133 def _fallback_pdf_extraction( 

1134 self, session, resource 

1135 ) -> Tuple[bool, Optional[str]]: 

1136 """Fallback: Download PDF to memory and extract text.""" 

1137 logger.info( 

1138 f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}" 

1139 ) 

1140 

1141 downloader = self._get_downloader(resource.url) 

1142 if not downloader: 

1143 error_msg = "No compatible downloader found" 

1144 logger.warning( 

1145 f"✗ FAILED: {error_msg} for '{resource.title[:50]}...'" 

1146 ) 

1147 self._record_failed_text_extraction( 

1148 session, resource, error=error_msg 

1149 ) 

1150 session.commit() 

1151 return False, error_msg 

1152 

1153 result = downloader.download_with_result(resource.url, ContentType.PDF) 

1154 

1155 if not result.is_success or not result.content: 

1156 error_msg = result.skip_reason or "Failed to download PDF" 

1157 logger.warning( 

1158 f"✗ FAILED: Could not download PDF for '{resource.title[:50]}...' | Error: {error_msg}" 

1159 ) 

1160 self._record_failed_text_extraction( 

1161 session, resource, error=f"PDF download failed: {error_msg}" 

1162 ) 

1163 session.commit() 

1164 return False, f"PDF extraction failed: {error_msg}" 

1165 

1166 # Extract text from PDF 

1167 text = self._extract_text_from_pdf(result.content) 

1168 if not text: 

1169 error_msg = "PDF text extraction returned empty text" 

1170 logger.warning( 

1171 f"Failed to extract text from PDF for: {resource.url}" 

1172 ) 

1173 self._record_failed_text_extraction( 

1174 session, resource, error=error_msg 

1175 ) 

1176 session.commit() 

1177 return False, error_msg 

1178 

1179 try: 

1180 self._save_text_with_db( 

1181 resource, 

1182 text, 

1183 session, 

1184 extraction_method="pdf_extraction", 

1185 extraction_source="pdfplumber_fallback", 

1186 ) 

1187 session.commit() 

1188 logger.info( 

1189 f"✓ SUCCESS: Extracted text from '{resource.title[:50]}...'" 

1190 ) 

1191 return True, None 

1192 except Exception as e: 

1193 logger.exception(f"Failed to save text for resource {resource.id}") 

1194 # Sanitize error message before returning to API 

1195 safe_error = sanitize_for_log( 

1196 f"Failed to save text: {str(e)}", max_length=200 

1197 ) 

1198 return False, safe_error 

1199 

1200 def _get_downloader(self, url: str): 

1201 """ 

1202 Get the appropriate downloader for a URL. 

1203 

1204 Args: 

1205 url: The URL to download from 

1206 

1207 Returns: 

1208 The appropriate downloader instance or None 

1209 """ 

1210 for downloader in self.downloaders: 

1211 if downloader.can_handle(url): 

1212 return downloader 

1213 return None 

1214 

1215 def _download_generic(self, url: str) -> Optional[bytes]: 

1216 """Generic PDF download method.""" 

1217 try: 

1218 headers = { 

1219 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1220 } 

1221 response = safe_get( 

1222 url, headers=headers, timeout=30, allow_redirects=True 

1223 ) 

1224 response.raise_for_status() 

1225 

1226 # Verify it's a PDF 

1227 content_type = response.headers.get("Content-Type", "") 

1228 if ( 

1229 "pdf" not in content_type.lower() 

1230 and not response.content.startswith(b"%PDF") 

1231 ): 

1232 logger.warning(f"Response is not a PDF: {content_type}") 

1233 return None 

1234 

1235 return response.content 

1236 

1237 except Exception: 

1238 logger.exception("Generic download failed") 

1239 return None 

1240 

1241 def _download_arxiv(self, url: str) -> Optional[bytes]: 

1242 """Download from arXiv.""" 

1243 try: 

1244 # Convert abstract URL to PDF URL 

1245 pdf_url = url.replace("abs", "pdf") 

1246 if not pdf_url.endswith(".pdf"): 

1247 pdf_url += ".pdf" 

1248 

1249 return self._download_generic(pdf_url) 

1250 except Exception: 

1251 logger.exception("arXiv download failed") 

1252 return None 

1253 

1254 def _try_europe_pmc(self, pmid: str) -> Optional[bytes]: 

1255 """Try downloading from Europe PMC which often has better PDF availability.""" 

1256 try: 

1257 # Europe PMC API is more reliable for PDFs 

1258 # Check if PDF is available 

1259 api_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json" 

1260 response = safe_get(api_url, timeout=10) 

1261 

1262 if response.status_code == 200: 

1263 data = response.json() 

1264 results = data.get("resultList", {}).get("result", []) 

1265 

1266 if results: 

1267 article = results[0] 

1268 # Check if article has open access PDF 

1269 if ( 

1270 article.get("isOpenAccess") == "Y" 

1271 and article.get("hasPDF") == "Y" 

1272 ): 

1273 pmcid = article.get("pmcid") 

1274 if pmcid: 

1275 # Europe PMC PDF URL 

1276 pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf" 

1277 logger.info( 

1278 f"Found Europe PMC PDF for PMID {pmid}: {pmcid}" 

1279 ) 

1280 

1281 headers = { 

1282 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1283 } 

1284 

1285 pdf_response = safe_get( 

1286 pdf_url, 

1287 headers=headers, 

1288 timeout=30, 

1289 allow_redirects=True, 

1290 ) 

1291 if pdf_response.status_code == 200: 1291 ↛ 1303line 1291 didn't jump to line 1303 because the condition on line 1291 was always true

1292 content_type = pdf_response.headers.get( 

1293 "content-type", "" 

1294 ) 

1295 if ( 1295 ↛ 1303line 1295 didn't jump to line 1303 because the condition on line 1295 was always true

1296 "pdf" in content_type.lower() 

1297 or len(pdf_response.content) > 1000 

1298 ): 

1299 return pdf_response.content 

1300 except Exception as e: 

1301 logger.debug(f"Europe PMC download failed: {e}") 

1302 

1303 return None 

1304 

    def _download_pubmed(self, url: str) -> Optional[bytes]:
        """Download from PubMed/PubMed Central with rate limiting.

        Strategy, in order:
          1. Direct PMC article URLs: fetch the PDF via Europe PMC's render
             endpoint.
          2. PubMed record URLs: extract the PMID, then try Europe PMC,
             then the NCBI E-utilities (elink + esummary) to resolve a
             PMC ID, then webpage scraping as a last resort.
          3. Fall back to a generic download of the original URL.

        A minimum delay of ``self._pubmed_delay`` seconds is enforced
        between requests; on HTTP 429 the delay is doubled (capped at 5s).

        Args:
            url: PubMed or PMC article URL.

        Returns:
            Raw PDF bytes, or None on failure.
        """
        try:
            # Apply rate limiting for PubMed requests
            current_time = time.time()
            time_since_last = current_time - self._last_pubmed_request
            if time_since_last < self._pubmed_delay:
                sleep_time = self._pubmed_delay - time_since_last
                logger.debug(
                    f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request"
                )
                time.sleep(sleep_time)
            self._last_pubmed_request = time.time()

            # If it's already a PMC article, download directly
            if "/articles/PMC" in url:
                pmc_match = re.search(r"(PMC\d+)", url)
                if pmc_match:
                    pmc_id = pmc_match.group(1)

                    # Try Europe PMC (more reliable)
                    europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    headers = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    }

                    try:
                        response = safe_get(
                            europe_url,
                            headers=headers,
                            timeout=30,
                            allow_redirects=True,
                        )
                        if response.status_code == 200:
                            content_type = response.headers.get(
                                "content-type", ""
                            )
                            # Accept a PDF content-type or a non-trivial body.
                            if (
                                "pdf" in content_type.lower()
                                or len(response.content) > 1000
                            ):
                                logger.info(
                                    f"Downloaded PDF via Europe PMC for {pmc_id}"
                                )
                                return response.content
                    except Exception as e:
                        # Direct PMC URLs have no further fallback: give up.
                        logger.debug(f"Direct Europe PMC download failed: {e}")
                        return None

            # If it's a regular PubMed URL, try to find PMC version
            elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
                # Extract PMID from URL
                pmid_match = re.search(r"/(\d+)/?", url)
                if pmid_match:
                    pmid = pmid_match.group(1)
                    logger.info(f"Attempting to download PDF for PMID: {pmid}")

                    # Try Europe PMC first (more reliable)
                    pdf_content = self._try_europe_pmc(pmid)
                    if pdf_content:
                        return pdf_content

                    # First try using NCBI E-utilities API to find PMC ID
                    try:
                        # Use elink to convert PMID to PMCID
                        elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
                        params = {
                            "dbfrom": "pubmed",
                            "db": "pmc",
                            "id": pmid,
                            "retmode": "json",
                        }

                        api_response = safe_get(
                            elink_url, params=params, timeout=10
                        )
                        if api_response.status_code == 200:
                            data = api_response.json()
                            # Parse the response to find PMC ID
                            link_sets = data.get("linksets", [])
                            if link_sets and "linksetdbs" in link_sets[0]:
                                for linksetdb in link_sets[0]["linksetdbs"]:
                                    if linksetdb.get(
                                        "dbto"
                                    ) == "pmc" and linksetdb.get("links"):
                                        pmc_id_num = linksetdb["links"][0]
                                        # Now fetch PMC details to get the correct PMC ID format
                                        esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                                        summary_params = {
                                            "db": "pmc",
                                            "id": pmc_id_num,
                                            "retmode": "json",
                                        }
                                        summary_response = safe_get(
                                            esummary_url,
                                            params=summary_params,
                                            timeout=10,
                                        )
                                        if summary_response.status_code == 200:
                                            summary_data = (
                                                summary_response.json()
                                            )
                                            result = summary_data.get(
                                                "result", {}
                                            ).get(str(pmc_id_num), {})
                                            if result:
                                                # PMC IDs in the API don't have the "PMC" prefix
                                                pmc_id = f"PMC{pmc_id_num}"
                                                logger.info(
                                                    f"Found PMC ID via API: {pmc_id} for PMID: {pmid}"
                                                )

                                                # Try Europe PMC with the PMC ID
                                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

                                                time.sleep(self._pubmed_delay)

                                                headers = {
                                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                                }

                                                try:
                                                    response = safe_get(
                                                        europe_url,
                                                        headers=headers,
                                                        timeout=30,
                                                        allow_redirects=True,
                                                    )
                                                    if (
                                                        response.status_code
                                                        == 200
                                                    ):
                                                        content_type = response.headers.get(
                                                            "content-type", ""
                                                        )
                                                        if (
                                                            "pdf"
                                                            in content_type.lower()
                                                            or len(
                                                                response.content
                                                            )
                                                            > 1000
                                                        ):
                                                            logger.info(
                                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                                            )
                                                            return (
                                                                response.content
                                                            )
                                                except Exception as e:
                                                    logger.debug(
                                                        f"Europe PMC download with PMC ID failed: {e}"
                                                    )
                    except Exception as e:
                        logger.debug(
                            f"API lookup failed, trying webpage scraping: {e}"
                        )

                    # Fallback to webpage scraping if API fails
                    try:
                        headers = {
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                        }
                        response = safe_get(url, headers=headers, timeout=10)
                        if response.status_code == 200:
                            # Look for PMC ID in the page
                            pmc_match = re.search(r"PMC\d+", response.text)
                            if pmc_match:
                                pmc_id = pmc_match.group(0)
                                logger.info(
                                    f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}"
                                )

                                # Add delay before downloading PDF
                                time.sleep(self._pubmed_delay)

                                # Try Europe PMC with the PMC ID (more reliable)
                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                                headers = {
                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                }

                                try:
                                    response = safe_get(
                                        europe_url,
                                        headers=headers,
                                        timeout=30,
                                        allow_redirects=True,
                                    )
                                    if response.status_code == 200:
                                        content_type = response.headers.get(
                                            "content-type", ""
                                        )
                                        if (
                                            "pdf" in content_type.lower()
                                            or len(response.content) > 1000
                                        ):
                                            logger.info(
                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                            )
                                            return response.content
                                except Exception as e:
                                    logger.debug(
                                        f"Europe PMC download failed: {e}"
                                    )
                            else:
                                logger.info(
                                    f"No PMC version found for PMID: {pmid}"
                                )
                    except requests.exceptions.HTTPError as e:
                        # Back off exponentially on rate limiting, then
                        # re-raise so the outer handler returns None.
                        if e.response.status_code == 429:
                            logger.warning(
                                "Rate limited by PubMed, increasing delay"
                            )
                            self._pubmed_delay = min(
                                self._pubmed_delay * 2, 5.0
                            )  # Max 5 seconds
                        raise
                    except Exception as e:
                        logger.debug(f"Could not check for PMC version: {e}")

            # Last resort: attempt a plain download of the original URL.
            return self._download_generic(url)
        except Exception:
            logger.exception("PubMed download failed")
            return None

1530 

1531 def _download_semantic_scholar(self, url: str) -> Optional[bytes]: 

1532 """Download from Semantic Scholar.""" 

1533 # Semantic Scholar doesn't host PDFs directly 

1534 # Would need to extract actual PDF URL from page 

1535 return None 

1536 

1537 def _download_biorxiv(self, url: str) -> Optional[bytes]: 

1538 """Download from bioRxiv.""" 

1539 try: 

1540 # Convert to PDF URL 

1541 pdf_url = url.replace(".org/", ".org/content/") 

1542 pdf_url = re.sub(r"v\d+$", "", pdf_url) # Remove version 

1543 pdf_url += ".full.pdf" 

1544 

1545 return self._download_generic(pdf_url) 

1546 except Exception: 

1547 logger.exception("bioRxiv download failed") 

1548 return None 

1549 

    def _save_text_with_db(
        self,
        resource: ResearchResource,
        text: str,
        session: Session,
        extraction_method: str,
        extraction_source: str,
        pdf_document_id: Optional[int] = None,
    ) -> Optional[str]:
        """
        Save extracted text to encrypted database.

        Updates the existing Document (looked up by ``pdf_document_id``
        when given, otherwise by the resource) or, if none exists, creates
        a new text-only Document linked to the default "Library"
        collection.  Does not commit — the caller owns the session.

        Args:
            resource: The research resource
            text: Extracted text content
            session: Database session
            extraction_method: How the text was extracted
            extraction_source: Specific tool/API used
            pdf_document_id: ID of PDF document if extracted from PDF

        Raises:
            Exception: Re-raised after logging on any persistence failure.

        Returns:
            None (previously returned text file path, now removed)
        """
        try:
            # Calculate text metadata for database
            word_count = len(text.split())
            character_count = len(text)

            # Find the document by pdf_document_id or resource_id
            doc = None
            if pdf_document_id:
                doc = (
                    session.query(Document)
                    .filter_by(id=pdf_document_id)
                    .first()
                )
            else:
                doc = get_document_for_resource(session, resource)

            if doc:
                # Update existing document with extracted text
                doc.text_content = text
                doc.character_count = character_count
                doc.word_count = word_count
                doc.extraction_method = extraction_method
                doc.extraction_source = extraction_source
                doc.status = DocumentStatus.COMPLETED
                doc.document_hash = hashlib.sha256(text.encode()).hexdigest()
                doc.processed_at = datetime.now(UTC)

                # Set quality based on method: native APIs are most
                # trustworthy, pdfplumber is medium, anything else is low.
                if extraction_method == "native_api":
                    doc.extraction_quality = "high"
                elif (
                    extraction_method == "pdf_extraction"
                    and extraction_source == "pdfplumber"
                ):
                    doc.extraction_quality = "medium"
                else:
                    doc.extraction_quality = "low"

                logger.debug(
                    f"Updated document {doc.id} with extracted text ({word_count} words)"
                )
            else:
                # Create a new Document for text-only extraction
                # Generate hash from text content
                text_hash = hashlib.sha256(text.encode()).hexdigest()

                # Get source type for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download", self.password
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type for text document"
                    )
                    raise

                # Create new document
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=text_hash,
                    original_url=resource.url,
                    file_path=FILE_PATH_TEXT_ONLY,
                    file_size=character_count,  # Use character count as file size for text-only
                    file_type="text",
                    mime_type="text/plain",
                    title=resource.title,
                    text_content=text,
                    character_count=character_count,
                    word_count=word_count,
                    extraction_method=extraction_method,
                    extraction_source=extraction_source,
                    extraction_quality="high"
                    if extraction_method == "native_api"
                    else "medium",
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                # Link to default Library collection
                library_collection = (
                    session.query(Collection).filter_by(name="Library").first()
                )
                if library_collection:
                    doc_collection = DocumentCollection(
                        document_id=doc_id,
                        collection_id=library_collection.id,
                        indexed=False,
                        chunk_count=0,
                    )
                    session.add(doc_collection)
                else:
                    logger.warning(
                        f"Library collection not found - document {doc_id} will not be linked to default collection"
                    )

                logger.info(
                    f"Created new document {doc_id} for text-only extraction ({word_count} words)"
                )

            logger.info(
                f"Saved text to encrypted database ({word_count} words)"
            )
            return None

        except Exception:
            logger.exception("Error saving text to encrypted database")
            raise  # Re-raise so caller can handle the error

1686 

1687 def _create_text_document_record( 

1688 self, 

1689 session: Session, 

1690 resource: ResearchResource, 

1691 file_path: Path, 

1692 extraction_method: str, 

1693 extraction_source: str, 

1694 ): 

1695 """Update existing Document with text from file (for legacy text files).""" 

1696 try: 

1697 # Read file to get metadata 

1698 text = file_path.read_text(encoding="utf-8", errors="ignore") 

1699 word_count = len(text.split()) 

1700 character_count = len(text) 

1701 

1702 # Find the Document for this resource 

1703 doc = get_document_for_resource(session, resource) 

1704 

1705 if doc: 

1706 # Update existing document with text content 

1707 doc.text_content = text 

1708 doc.character_count = character_count 

1709 doc.word_count = word_count 

1710 doc.extraction_method = extraction_method 

1711 doc.extraction_source = extraction_source 

1712 doc.extraction_quality = ( 

1713 "low" # Unknown quality for legacy files 

1714 ) 

1715 logger.info( 

1716 f"Updated document {doc.id} with text from file: {file_path.name}" 

1717 ) 

1718 else: 

1719 logger.warning( 

1720 f"No document found to update for resource {resource.id}" 

1721 ) 

1722 

1723 except Exception: 

1724 logger.exception("Error updating document with text from file") 

1725 

    def _record_failed_text_extraction(
        self, session: Session, resource: ResearchResource, error: str
    ):
        """Record a failed text extraction attempt in the Document.

        Marks the resource's existing Document as FAILED, or creates a new
        FAILED Document when none exists so the failure is tracked and can
        be retried later.  Best-effort: any persistence error is logged and
        swallowed.  Does not commit — the caller owns the session.

        Args:
            session: Active database session.
            resource: The research resource whose extraction failed.
            error: Human-readable description of the failure.
        """
        try:
            # Find the Document for this resource
            doc = get_document_for_resource(session, resource)

            if doc:
                # Update document with extraction error
                doc.error_message = error
                doc.extraction_method = "failed"
                doc.extraction_quality = "low"
                doc.status = DocumentStatus.FAILED
                logger.info(
                    f"Recorded failed text extraction for document {doc.id}: {error}"
                )
            else:
                # Create a new Document for failed extraction
                # This enables tracking failures and retry capability
                source_type_id = get_source_type_id(
                    self.username, "research_download", self.password
                )

                # Deterministic hash so retries update the same record
                failed_hash = hashlib.sha256(
                    f"failed:{resource.url}:{resource.id}".encode()
                ).hexdigest()

                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=failed_hash,
                    original_url=resource.url,
                    file_path=None,
                    file_size=0,
                    file_type="unknown",
                    title=resource.title,
                    status=DocumentStatus.FAILED,
                    error_message=error,
                    extraction_method="failed",
                    extraction_quality="low",
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                logger.info(
                    f"Created failed document {doc_id} for resource {resource.id}: {error}"
                )

        except Exception:
            # Recording a failure must never raise into the caller's
            # extraction flow; log and continue.
            logger.exception("Error recording failed text extraction")