Coverage for src / local_deep_research / research_library / services / download_service.py: 39%

643 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2PDF Download Service for Research Library 

3 

4Handles downloading PDFs from various academic sources with: 

5- Deduplication using download tracker 

6- Source-specific download strategies (arXiv, PubMed, etc.) 

7- Progress tracking and error handling 

8- File organization and storage 

9""" 

10 

11import hashlib 

12import re 

13import time 

14import uuid 

15from datetime import datetime, UTC 

16from pathlib import Path 

17from typing import Optional, Tuple 

18from urllib.parse import urlparse 

19 

20import requests 

21from loguru import logger 

22from sqlalchemy.orm import Session 

23import pdfplumber 

24from pypdf import PdfReader 

25 

26from ...database.models.download_tracker import ( 

27 DownloadAttempt, 

28 DownloadTracker, 

29) 

30from ...security import redact_data, safe_get 

31from ...database.models.library import ( 

32 Collection, 

33 Document as Document, 

34 DocumentStatus, 

35 DownloadQueue as LibraryDownloadQueue, 

36 DocumentCollection, 

37) 

38from .pdf_storage_manager import PDFStorageManager 

39from ...database.models.research import ResearchResource 

40from ...database.library_init import get_source_type_id, get_default_library_id 

41from ...database.session_context import get_user_db_session 

42from ...utilities.db_utils import get_settings_manager 

43from ...library.download_management import RetryManager 

44from ...config.paths import get_library_directory 

45from ..utils import ( 

46 get_url_hash, 

47 get_absolute_path_from_settings, 

48) 

49 

50# Import our modular downloaders 

51from ..downloaders import ( 

52 ContentType, 

53 ArxivDownloader, 

54 PubMedDownloader, 

55 BioRxivDownloader, 

56 DirectPDFDownloader, 

57 SemanticScholarDownloader, 

58 OpenAlexDownloader, 

59 GenericDownloader, 

60) 

61 

62 

63class DownloadService: 

64 """Service for downloading and managing research PDFs.""" 

65 

    def __init__(self, username: str, password: Optional[str] = None):
        """Initialize download service for a user.

        Builds the per-user settings manager, resolves the on-disk library
        root, creates the directory layout, and constructs the ordered list
        of source-specific downloaders plus the retry manager.

        Args:
            username: The username to download for
            password: Optional password for encrypted database access

        Raises:
            ValueError: If the storage path setting resolves to None.
        """
        self.username = username
        self.password = password
        # Per-user settings manager backs every configuration lookup below.
        self.settings = get_settings_manager(username=username)
        # Guards close() against double invocation.
        self._closed = False

        # Debug settings manager and user context
        logger.info(
            f"[DOWNLOAD_SERVICE] Settings manager initialized: {type(self.settings)}, username: {self.username}"
        )

        # Get library path from settings (uses centralized path, respects LDR_DATA_DIR)
        storage_path_setting = self.settings.get_setting(
            "research_library.storage_path",
            str(get_library_directory()),
        )
        # NOTE(review): warning level appears used for init diagnostics here — confirm intent.
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Storage path setting retrieved: {storage_path_setting} (type: {type(storage_path_setting)})"
        )

        if storage_path_setting is None:
            logger.error(
                "[DOWNLOAD_SERVICE_INIT] CRITICAL: storage_path_setting is None!"
            )
            raise ValueError("Storage path setting cannot be None")

        # Expand '~' so downstream path arithmetic works from an absolute root.
        self.library_root = str(Path(storage_path_setting).expanduser())
        logger.warning(
            f"[DOWNLOAD_SERVICE_INIT] Library root resolved to: {self.library_root}"
        )

        # Create directory structure
        self._setup_directories()

        # Initialize modular downloaders
        # DirectPDFDownloader first for efficiency with direct PDF links

        # Get Semantic Scholar API key from settings
        semantic_scholar_api_key = self.settings.get_setting(
            "search.engine.web.semantic_scholar.api_key", ""
        )

        # Order matters: the first downloader whose can_handle() matches wins.
        self.downloaders = [
            DirectPDFDownloader(timeout=30),  # Handle direct PDF links first
            SemanticScholarDownloader(
                timeout=30,
                api_key=semantic_scholar_api_key
                if semantic_scholar_api_key
                else None,
            ),
            OpenAlexDownloader(
                timeout=30
            ),  # OpenAlex with API lookup (no key needed)
            ArxivDownloader(timeout=30),
            PubMedDownloader(timeout=30, rate_limit_delay=1.0),
            BioRxivDownloader(timeout=30),
            GenericDownloader(timeout=30),  # Generic should be last (fallback)
        ]

        # Initialize retry manager for smart failure tracking
        self.retry_manager = RetryManager(username, password)
        logger.info(
            f"[DOWNLOAD_SERVICE] Initialized retry manager for user: {username}"
        )

        # PubMed rate limiting state
        self._pubmed_delay = 1.0  # 1 second delay for PubMed
        self._last_pubmed_request = 0.0  # Track last request time

140 

141 def close(self): 

142 """Close all downloader resources.""" 

143 if self._closed: 

144 return 

145 self._closed = True 

146 

147 for downloader in self.downloaders: 

148 if hasattr(downloader, "close"): 148 ↛ 147line 148 didn't jump to line 147 because the condition on line 148 was always true

149 try: 

150 downloader.close() 

151 except Exception: 

152 pass 

153 

154 # Clear references to allow garbage collection 

155 self.downloaders = [] 

156 self.retry_manager = None 

157 self.settings = None 

158 

    def __enter__(self):
        """Enter context manager.

        Returns the service itself so it can be used as
        ``with DownloadService(...) as svc:``.
        """
        return self

162 

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, ensuring cleanup."""
        self.close()
        # Returning False lets any exception raised inside the block propagate.
        return False

167 

168 def _setup_directories(self): 

169 """Create library directory structure.""" 

170 # Only create the root and pdfs folder - flat structure 

171 paths = [ 

172 self.library_root, 

173 str(Path(self.library_root) / "pdfs"), 

174 ] 

175 for path in paths: 

176 Path(path).mkdir(parents=True, exist_ok=True) 

177 

178 def _normalize_url(self, url: str) -> str: 

179 """Normalize URL for consistent hashing.""" 

180 # Remove protocol variations 

181 url = re.sub(r"^https?://", "", url) 

182 # Remove www 

183 url = re.sub(r"^www\.", "", url) 

184 # Remove trailing slashes 

185 url = url.rstrip("/") 

186 # Sort query parameters 

187 if "?" in url: 

188 base, query = url.split("?", 1) 

189 params = sorted(query.split("&")) 

190 url = f"{base}?{'&'.join(params)}" 

191 return url.lower() 

192 

193 def _get_url_hash(self, url: str) -> str: 

194 """Generate SHA256 hash of normalized URL.""" 

195 normalized = self._normalize_url(url) 

196 return get_url_hash(normalized) 

197 

198 def is_already_downloaded(self, url: str) -> Tuple[bool, Optional[str]]: 

199 """ 

200 Check if URL is already downloaded. 

201 

202 Returns: 

203 Tuple of (is_downloaded, file_path) 

204 """ 

205 url_hash = self._get_url_hash(url) 

206 

207 with get_user_db_session(self.username, self.password) as session: 

208 tracker = ( 

209 session.query(DownloadTracker) 

210 .filter_by(url_hash=url_hash, is_downloaded=True) 

211 .first() 

212 ) 

213 

214 if tracker and tracker.file_path: 

215 # Compute absolute path and verify file still exists 

216 absolute_path = get_absolute_path_from_settings( 

217 tracker.file_path 

218 ) 

219 if absolute_path.exists(): 219 ↛ 223line 219 didn't jump to line 223 because the condition on line 219 was always true

220 return True, str(absolute_path) 

221 else: 

222 # File was deleted, mark as not downloaded 

223 tracker.is_downloaded = False 

224 session.commit() 

225 

226 return False, None 

227 

    def get_text_content(self, resource_id: int) -> Optional[str]:
        """
        Get text content for a research resource.

        This will try to:
        1. Fetch text directly from APIs if available
        2. Extract text from downloaded PDF if exists
        3. Download PDF and extract text if not yet downloaded

        Args:
            resource_id: ID of the research resource

        Returns:
            Text content as string, or None if extraction failed
        """
        with get_user_db_session(self.username, self.password) as session:
            resource = session.query(ResearchResource).get(resource_id)
            if not resource:
                logger.error(f"Resource {resource_id} not found")
                return None

            url = resource.url

            # Find appropriate downloader
            for downloader in self.downloaders:
                if downloader.can_handle(url):
                    logger.info(
                        f"Using {downloader.__class__.__name__} for text extraction from {url}"
                    )
                    try:
                        # Try to get text content
                        text = downloader.download_text(url)
                        if text:
                            logger.info(
                                f"Successfully extracted text for: {resource.title[:50]}"
                            )
                            return text
                    except Exception:
                        logger.exception("Failed to extract text")
                    # NOTE: only the FIRST downloader that can_handle(url) is
                    # tried — whether it returns empty text or raises, we stop
                    # here rather than falling through to later downloaders.
                    break

            logger.warning(f"Could not extract text for {url}")
            return None

271 

272 def queue_research_downloads( 

273 self, research_id: str, collection_id: Optional[str] = None 

274 ) -> int: 

275 """ 

276 Queue all downloadable PDFs from a research session. 

277 

278 Args: 

279 research_id: The research session ID 

280 collection_id: Optional target collection ID (defaults to Library if not provided) 

281 

282 Returns: 

283 Number of items queued 

284 """ 

285 queued = 0 

286 

287 # Get default library collection if no collection_id provided 

288 if not collection_id: 288 ↛ 293line 288 didn't jump to line 293 because the condition on line 288 was always true

289 from ...database.library_init import get_default_library_id 

290 

291 collection_id = get_default_library_id(self.username, self.password) 

292 

293 with get_user_db_session(self.username, self.password) as session: 

294 # Get all resources for this research 

295 resources = ( 

296 session.query(ResearchResource) 

297 .filter_by(research_id=research_id) 

298 .all() 

299 ) 

300 

301 for resource in resources: 

302 if self._is_downloadable(resource): 

303 # Check if already queued 

304 existing_queue = ( 

305 session.query(LibraryDownloadQueue) 

306 .filter_by( 

307 resource_id=resource.id, 

308 status=DocumentStatus.PENDING, 

309 ) 

310 .first() 

311 ) 

312 

313 # Check if already downloaded (trust the database status) 

314 existing_doc = ( 

315 session.query(Document) 

316 .filter_by( 

317 resource_id=resource.id, 

318 status=DocumentStatus.COMPLETED, 

319 ) 

320 .first() 

321 ) 

322 

323 # Queue if not already queued and not marked as completed 

324 if not existing_queue and not existing_doc: 324 ↛ 301line 324 didn't jump to line 301 because the condition on line 324 was always true

325 # Check one more time if ANY queue entry exists (regardless of status) 

326 any_queue = ( 

327 session.query(LibraryDownloadQueue) 

328 .filter_by(resource_id=resource.id) 

329 .first() 

330 ) 

331 

332 if any_queue: 332 ↛ 334line 332 didn't jump to line 334 because the condition on line 332 was never true

333 # Reset the existing queue entry 

334 any_queue.status = DocumentStatus.PENDING 

335 any_queue.research_id = research_id 

336 any_queue.collection_id = collection_id 

337 queued += 1 

338 else: 

339 # Add new queue entry 

340 queue_entry = LibraryDownloadQueue( 

341 resource_id=resource.id, 

342 research_id=research_id, 

343 collection_id=collection_id, 

344 priority=0, 

345 status=DocumentStatus.PENDING, 

346 ) 

347 session.add(queue_entry) 

348 queued += 1 

349 

350 session.commit() 

351 logger.info( 

352 f"Queued {queued} downloads for research {research_id} to collection {collection_id}" 

353 ) 

354 

355 return queued 

356 

357 def _is_downloadable(self, resource: ResearchResource) -> bool: 

358 """Check if a resource is likely downloadable as PDF.""" 

359 url = resource.url.lower() if resource.url else "" 

360 

361 # Check for PDF extensions 

362 if url.endswith(".pdf"): 

363 return True 

364 

365 # Check for known academic sources with downloadable PDFs 

366 downloadable_domains = [ 

367 "arxiv.org", 

368 "biorxiv.org", 

369 "medrxiv.org", 

370 "ncbi.nlm.nih.gov/pmc", # PubMed Central has PDFs 

371 "pubmed.ncbi.nlm.nih.gov", # Try to find PMC version 

372 "semanticscholar.org", 

373 "researchgate.net", 

374 "academia.edu", 

375 ] 

376 

377 return any(domain in url for domain in downloadable_domains) 

378 

379 def download_resource(self, resource_id: int) -> Tuple[bool, Optional[str]]: 

380 """ 

381 Download a specific resource. 

382 

383 Returns: 

384 Tuple of (success: bool, skip_reason: str or None) 

385 """ 

386 with get_user_db_session(self.username, self.password) as session: 

387 resource = session.query(ResearchResource).get(resource_id) 

388 if not resource: 388 ↛ 393line 388 didn't jump to line 393 because the condition on line 388 was always true

389 logger.error(f"Resource {resource_id} not found") 

390 return False, "Resource not found" 

391 

392 # Check if already downloaded (trust the database after sync) 

393 existing_doc = ( 

394 session.query(Document) 

395 .filter_by( 

396 resource_id=resource_id, status=DocumentStatus.COMPLETED 

397 ) 

398 .first() 

399 ) 

400 

401 if existing_doc: 

402 logger.info( 

403 "Resource already downloaded (according to database)" 

404 ) 

405 return True, None 

406 

407 # Get collection_id from queue entry if it exists 

408 queue_entry = ( 

409 session.query(LibraryDownloadQueue) 

410 .filter_by(resource_id=resource_id) 

411 .first() 

412 ) 

413 collection_id = ( 

414 queue_entry.collection_id 

415 if queue_entry and queue_entry.collection_id 

416 else None 

417 ) 

418 

419 # Create download tracker entry 

420 url_hash = self._get_url_hash(resource.url) 

421 tracker = ( 

422 session.query(DownloadTracker) 

423 .filter_by(url_hash=url_hash) 

424 .first() 

425 ) 

426 

427 if not tracker: 

428 tracker = DownloadTracker( 

429 url=resource.url, 

430 url_hash=url_hash, 

431 first_resource_id=resource.id, 

432 is_downloaded=False, 

433 ) 

434 session.add(tracker) 

435 session.commit() 

436 

437 # Attempt download 

438 result = self._download_pdf( 

439 resource, tracker, session, collection_id 

440 ) 

441 

442 # Get skip reason if failed 

443 skip_reason = None 

444 if isinstance(result, tuple): 

445 success, skip_reason = result 

446 else: 

447 success = result 

448 if not success: 

449 # Try to get skip reason from last attempt 

450 last_attempt = ( 

451 session.query(DownloadAttempt) 

452 .filter_by(url_hash=tracker.url_hash) 

453 .order_by(DownloadAttempt.attempted_at.desc()) 

454 .first() 

455 ) 

456 if last_attempt and hasattr(last_attempt, "error_message"): 

457 skip_reason = last_attempt.error_message 

458 

459 # Record attempt with retry manager for smart failure tracking 

460 self.retry_manager.record_attempt( 

461 resource_id=resource.id, 

462 result=(success, skip_reason), 

463 url=resource.url, 

464 details=skip_reason 

465 or ( 

466 "Successfully downloaded" if success else "Download failed" 

467 ), 

468 session=session, 

469 ) 

470 

471 # Update queue status if exists 

472 queue_entry = ( 

473 session.query(LibraryDownloadQueue) 

474 .filter_by(resource_id=resource_id) 

475 .first() 

476 ) 

477 

478 if queue_entry: 

479 queue_entry.status = ( 

480 DocumentStatus.COMPLETED 

481 if success 

482 else DocumentStatus.FAILED 

483 ) 

484 queue_entry.completed_at = datetime.now(UTC) 

485 

486 session.commit() 

487 

488 # Trigger auto-indexing for successfully downloaded documents 

489 if success and self.password: 

490 try: 

491 from ..routes.rag_routes import trigger_auto_index 

492 from ...database.library_init import get_default_library_id 

493 

494 # Get the document that was just created 

495 doc = ( 

496 session.query(Document) 

497 .filter_by(resource_id=resource_id) 

498 .order_by(Document.created_at.desc()) 

499 .first() 

500 ) 

501 if doc: 

502 # Use collection_id from queue entry or default Library 

503 # NB: pass username string, not the SQLAlchemy session 

504 target_collection = ( 

505 collection_id 

506 or get_default_library_id( 

507 self.username, self.password 

508 ) 

509 ) 

510 if target_collection: 

511 trigger_auto_index( 

512 [doc.id], 

513 target_collection, 

514 self.username, 

515 self.password, 

516 ) 

517 except Exception: 

518 logger.exception("Failed to trigger auto-indexing") 

519 

520 return success, skip_reason 

521 

522 def _download_pdf( 

523 self, 

524 resource: ResearchResource, 

525 tracker: DownloadTracker, 

526 session: Session, 

527 collection_id: Optional[str] = None, 

528 ) -> Tuple[bool, Optional[str]]: 

529 """ 

530 Perform the actual PDF download. 

531 

532 Args: 

533 resource: The research resource to download 

534 tracker: Download tracker for this URL 

535 session: Database session 

536 collection_id: Optional target collection ID (defaults to Library if not provided) 

537 

538 Returns: 

539 Tuple of (success: bool, skip_reason: Optional[str]) 

540 """ 

541 url = resource.url 

542 

543 # Log attempt 

544 attempt = DownloadAttempt( 

545 url_hash=tracker.url_hash, 

546 attempt_number=tracker.download_attempts.count() + 1 

547 if hasattr(tracker, "download_attempts") 

548 else 1, 

549 attempted_at=datetime.now(UTC), 

550 ) 

551 session.add(attempt) 

552 

553 try: 

554 # Use modular downloaders with detailed skip reasons 

555 pdf_content = None 

556 downloader_used = None 

557 skip_reason = None 

558 

559 for downloader in self.downloaders: 

560 if downloader.can_handle(url): 

561 logger.info( 

562 f"Using {downloader.__class__.__name__} for {url}" 

563 ) 

564 result = downloader.download_with_result( 

565 url, ContentType.PDF 

566 ) 

567 downloader_used = downloader.__class__.__name__ 

568 

569 if result.is_success and result.content: 569 ↛ 572line 569 didn't jump to line 572 because the condition on line 569 was always true

570 pdf_content = result.content 

571 break 

572 elif result.skip_reason: 

573 skip_reason = result.skip_reason 

574 logger.info(f"Download skipped: {skip_reason}") 

575 # Keep trying other downloaders unless it's the GenericDownloader 

576 if isinstance(downloader, GenericDownloader): 

577 break 

578 

579 if not downloader_used: 

580 logger.error(f"No downloader found for {url}") 

581 skip_reason = "No compatible downloader available" 

582 

583 if not pdf_content: 

584 error_msg = skip_reason or "Failed to download PDF content" 

585 # Store skip reason in attempt for retrieval 

586 attempt.error_message = error_msg 

587 attempt.succeeded = False 

588 session.commit() 

589 logger.info(f"Download failed with reason: {error_msg}") 

590 return False, error_msg 

591 

592 # Get PDF storage mode setting 

593 pdf_storage_mode = self.settings.get_setting( 

594 "research_library.pdf_storage_mode", "none" 

595 ) 

596 max_pdf_size_mb = int( 

597 self.settings.get_setting( 

598 "research_library.max_pdf_size_mb", 100 

599 ) 

600 ) 

601 logger.info( 

602 f"[DOWNLOAD_SERVICE] PDF storage mode: {pdf_storage_mode}" 

603 ) 

604 

605 # Update tracker 

606 import hashlib 

607 

608 tracker.file_hash = hashlib.sha256(pdf_content).hexdigest() 

609 tracker.file_size = len(pdf_content) 

610 tracker.is_downloaded = True 

611 tracker.downloaded_at = datetime.now(UTC) 

612 

613 # Initialize PDF storage manager 

614 pdf_storage_manager = PDFStorageManager( 

615 library_root=self.library_root, 

616 storage_mode=pdf_storage_mode, 

617 max_pdf_size_mb=max_pdf_size_mb, 

618 ) 

619 

620 # Update attempt with success info 

621 attempt.succeeded = True 

622 

623 # Check if library document already exists 

624 existing_doc = ( 

625 session.query(Document) 

626 .filter_by(resource_id=resource.id) 

627 .first() 

628 ) 

629 

630 if existing_doc: 

631 # Update existing document 

632 existing_doc.document_hash = tracker.file_hash 

633 existing_doc.file_size = len(pdf_content) 

634 existing_doc.status = DocumentStatus.COMPLETED 

635 existing_doc.processed_at = datetime.now(UTC) 

636 

637 # Save PDF using storage manager (updates storage_mode and file_path) 

638 file_path_result, _ = pdf_storage_manager.save_pdf( 

639 pdf_content=pdf_content, 

640 document=existing_doc, 

641 session=session, 

642 filename=f"{resource.id}.pdf", 

643 url=url, 

644 resource_id=resource.id, 

645 ) 

646 

647 # Update tracker 

648 tracker.file_path = ( 

649 file_path_result if file_path_result else None 

650 ) 

651 tracker.file_name = ( 

652 Path(file_path_result).name 

653 if file_path_result and file_path_result != "database" 

654 else None 

655 ) 

656 else: 

657 # Get source type ID for research downloads 

658 try: 

659 source_type_id = get_source_type_id( 

660 self.username, "research_download", self.password 

661 ) 

662 # Use provided collection_id or default to Library 

663 library_collection_id = ( 

664 collection_id 

665 or get_default_library_id(self.username, self.password) 

666 ) 

667 except Exception: 

668 logger.exception( 

669 "Failed to get source type or library collection" 

670 ) 

671 raise 

672 

673 # Create new unified document entry 

674 doc_id = str(uuid.uuid4()) 

675 doc = Document( 

676 id=doc_id, 

677 source_type_id=source_type_id, 

678 resource_id=resource.id, 

679 research_id=resource.research_id, 

680 document_hash=tracker.file_hash, 

681 original_url=url, 

682 file_size=len(pdf_content), 

683 file_type="pdf", 

684 mime_type="application/pdf", 

685 title=resource.title, 

686 status=DocumentStatus.COMPLETED, 

687 processed_at=datetime.now(UTC), 

688 storage_mode=pdf_storage_mode, 

689 ) 

690 session.add(doc) 

691 session.flush() # Ensure doc.id is available for blob storage 

692 

693 # Save PDF using storage manager (updates storage_mode and file_path) 

694 file_path_result, _ = pdf_storage_manager.save_pdf( 

695 pdf_content=pdf_content, 

696 document=doc, 

697 session=session, 

698 filename=f"{resource.id}.pdf", 

699 url=url, 

700 resource_id=resource.id, 

701 ) 

702 

703 # Update tracker 

704 tracker.file_path = ( 

705 file_path_result if file_path_result else None 

706 ) 

707 tracker.file_name = ( 

708 Path(file_path_result).name 

709 if file_path_result and file_path_result != "database" 

710 else None 

711 ) 

712 

713 # Link document to default Library collection 

714 doc_collection = DocumentCollection( 

715 document_id=doc_id, 

716 collection_id=library_collection_id, 

717 indexed=False, 

718 ) 

719 session.add(doc_collection) 

720 

721 # Update attempt 

722 attempt.succeeded = True 

723 attempt.bytes_downloaded = len(pdf_content) 

724 

725 if pdf_storage_mode == "database": 

726 logger.info( 

727 f"Successfully stored PDF in database: {resource.url}" 

728 ) 

729 elif pdf_storage_mode == "filesystem": 

730 logger.info(f"Successfully downloaded: {tracker.file_path}") 

731 else: 

732 logger.info(f"Successfully extracted text from: {resource.url}") 

733 

734 # Automatically extract and save text after successful PDF download 

735 try: 

736 logger.info( 

737 f"Extracting text from downloaded PDF for: {resource.title[:50]}" 

738 ) 

739 text = self._extract_text_from_pdf(pdf_content) 

740 

741 if text: 

742 # Get the document ID we just created/updated 

743 pdf_doc = ( 

744 session.query(Document) 

745 .filter_by(resource_id=resource.id) 

746 .first() 

747 ) 

748 pdf_document_id = pdf_doc.id if pdf_doc else None 

749 

750 # Save text to encrypted database 

751 self._save_text_with_db( 

752 resource=resource, 

753 text=text, 

754 session=session, 

755 extraction_method="pdf_extraction", 

756 extraction_source="local_pdf", 

757 pdf_document_id=pdf_document_id, 

758 ) 

759 logger.info( 

760 f"Successfully extracted and saved text for: {resource.title[:50]}" 

761 ) 

762 else: 

763 logger.warning( 

764 f"Text extraction returned empty text for: {resource.title[:50]}" 

765 ) 

766 except Exception: 

767 logger.exception( 

768 "Failed to extract text from PDF, but PDF download succeeded" 

769 ) 

770 # Don't fail the entire download if text extraction fails 

771 

772 return True, None 

773 

774 except Exception as e: 

775 logger.exception(f"Download failed for {url}") 

776 attempt.succeeded = False 

777 attempt.error_type = type(e).__name__ 

778 attempt.error_message = str(e) 

779 tracker.is_accessible = False 

780 # Sanitize error message before returning to API 

781 safe_error = redact_data(str(e)) 

782 return False, safe_error 

783 

784 def _extract_text_from_pdf(self, pdf_content: bytes) -> Optional[str]: 

785 """ 

786 Extract text from PDF content using multiple methods for best results. 

787 

788 Args: 

789 pdf_content: Raw PDF bytes 

790 

791 Returns: 

792 Extracted text or None if extraction fails 

793 """ 

794 try: 

795 # First try with pdfplumber (better for complex layouts) 

796 import io 

797 

798 with pdfplumber.open(io.BytesIO(pdf_content)) as pdf: 

799 text_parts = [] 

800 for page in pdf.pages: 

801 page_text = page.extract_text() 

802 if page_text: 

803 text_parts.append(page_text) 

804 

805 if text_parts: 

806 return "\n\n".join(text_parts) 

807 

808 # Fallback to PyPDF if pdfplumber fails 

809 reader = PdfReader(io.BytesIO(pdf_content)) 

810 text_parts = [] 

811 for page in reader.pages: 

812 text = page.extract_text() 

813 if text: 

814 text_parts.append(text) 

815 

816 if text_parts: 

817 return "\n\n".join(text_parts) 

818 

819 logger.warning("No text could be extracted from PDF") 

820 return None 

821 

822 except Exception: 

823 logger.exception("Failed to extract text from PDF") 

824 return None 

825 

826 def download_as_text(self, resource_id: int) -> Tuple[bool, Optional[str]]: 

827 """ 

828 Download resource and extract text to encrypted database. 

829 

830 Args: 

831 resource_id: ID of the resource to download 

832 

833 Returns: 

834 Tuple of (success, error_message) 

835 """ 

836 with get_user_db_session(self.username, self.password) as session: 

837 # Get the resource 

838 resource = ( 

839 session.query(ResearchResource) 

840 .filter_by(id=resource_id) 

841 .first() 

842 ) 

843 if not resource: 

844 return False, "Resource not found" 

845 

846 # Try existing text in database 

847 result = self._try_existing_text(session, resource_id) 

848 if result is not None: 848 ↛ 852line 848 didn't jump to line 852 because the condition on line 848 was always true

849 return result 

850 

851 # Try legacy text files on disk 

852 result = self._try_legacy_text_file(session, resource, resource_id) 

853 if result is not None: 

854 return result 

855 

856 # Try extracting from existing PDF 

857 result = self._try_existing_pdf_extraction( 

858 session, resource, resource_id 

859 ) 

860 if result is not None: 

861 return result 

862 

863 # Try API text extraction 

864 result = self._try_api_text_extraction(session, resource) 

865 if result is not None: 

866 return result 

867 

868 # Fallback: Download PDF and extract 

869 return self._fallback_pdf_extraction(session, resource) 

870 

871 def _try_existing_text( 

872 self, session, resource_id: int 

873 ) -> Optional[Tuple[bool, Optional[str]]]: 

874 """Check if text already exists in database (in Document.text_content).""" 

875 existing_doc = ( 

876 session.query(Document).filter_by(resource_id=resource_id).first() 

877 ) 

878 

879 if not existing_doc: 879 ↛ 880line 879 didn't jump to line 880 because the condition on line 879 was never true

880 return None 

881 

882 # Check if text content exists and extraction was successful 

883 if ( 883 ↛ 894line 883 didn't jump to line 894 because the condition on line 883 was always true

884 existing_doc.text_content 

885 and existing_doc.extraction_method 

886 and existing_doc.extraction_method != "failed" 

887 ): 

888 logger.info( 

889 f"Text content already exists in Document for resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

890 ) 

891 return True, None 

892 

893 # No text content or failed extraction 

894 logger.debug( 

895 f"Document exists but no valid text content: resource_id={resource_id}, extraction_method={existing_doc.extraction_method}" 

896 ) 

897 return None # Fall through to re-extraction 

898 

899 def _try_legacy_text_file( 

900 self, session, resource, resource_id: int 

901 ) -> Optional[Tuple[bool, Optional[str]]]: 

902 """Check for legacy text files on disk.""" 

903 txt_path = Path(self.library_root) / "txt" 

904 existing_files = ( 

905 list(txt_path.glob(f"*_{resource_id}.txt")) 

906 if txt_path.exists() 

907 else [] 

908 ) 

909 

910 if not existing_files: 

911 return None 

912 

913 logger.info(f"Text file already exists on disk: {existing_files[0]}") 

914 self._create_text_document_record( 

915 session, 

916 resource, 

917 existing_files[0], 

918 extraction_method="unknown", 

919 extraction_source="legacy_file", 

920 ) 

921 session.commit() 

922 return True, None 

923 

924 def _try_existing_pdf_extraction( 

925 self, session, resource, resource_id: int 

926 ) -> Optional[Tuple[bool, Optional[str]]]: 

927 """Try extracting text from existing PDF in database.""" 

928 pdf_document = ( 

929 session.query(Document).filter_by(resource_id=resource_id).first() 

930 ) 

931 

932 if not pdf_document or pdf_document.status != "completed": 

933 return None 

934 

935 pdf_path = Path(self.library_root) / pdf_document.file_path 

936 if not pdf_path.exists(): 936 ↛ 939line 936 didn't jump to line 939 because the condition on line 936 was always true

937 return None 

938 

939 logger.info(f"Found existing PDF, extracting text from: {pdf_path}") 

940 try: 

941 with open(pdf_path, "rb") as f: 

942 pdf_content = f.read() 

943 text = self._extract_text_from_pdf(pdf_content) 

944 

945 if not text: 

946 return None 

947 

948 self._save_text_with_db( 

949 resource, 

950 text, 

951 session, 

952 extraction_method="pdf_extraction", 

953 extraction_source="pdfplumber", 

954 pdf_document_id=pdf_document.id, 

955 ) 

956 session.commit() 

957 return True, None 

958 

959 except Exception: 

960 logger.exception( 

961 f"Failed to extract text from existing PDF: {pdf_path}" 

962 ) 

963 return None # Fall through to other methods 

964 

965 def _try_api_text_extraction( 

966 self, session, resource 

967 ) -> Optional[Tuple[bool, Optional[str]]]: 

968 """Try direct API text extraction.""" 

969 logger.info( 

970 f"Attempting direct API text extraction from: {resource.url}" 

971 ) 

972 

973 downloader = self._get_downloader(resource.url) 

974 if not downloader: 

975 return None 

976 

977 result = downloader.download_with_result(resource.url, ContentType.TEXT) 

978 

979 if not result.is_success or not result.content: 

980 return None 

981 

982 # Decode text content 

983 text = ( 

984 result.content.decode("utf-8", errors="ignore") 

985 if isinstance(result.content, bytes) 

986 else result.content 

987 ) 

988 

989 # Determine extraction source 

990 extraction_source = "unknown" 

991 if isinstance(downloader, ArxivDownloader): 

992 extraction_source = "arxiv_api" 

993 elif isinstance(downloader, PubMedDownloader): 

994 extraction_source = "pubmed_api" 

995 

996 try: 

997 self._save_text_with_db( 

998 resource, 

999 text, 

1000 session, 

1001 extraction_method="native_api", 

1002 extraction_source=extraction_source, 

1003 ) 

1004 session.commit() 

1005 logger.info( 

1006 f"✓ SUCCESS: Got text from {extraction_source.upper()} API for '{resource.title[:50]}...'" 

1007 ) 

1008 return True, None 

1009 except Exception as e: 

1010 logger.exception(f"Failed to save text for resource {resource.id}") 

1011 # Sanitize error message before returning to API 

1012 safe_error = redact_data(f"Failed to save text: {str(e)}") 

1013 return False, safe_error 

1014 

1015 def _fallback_pdf_extraction( 

1016 self, session, resource 

1017 ) -> Tuple[bool, Optional[str]]: 

1018 """Fallback: Download PDF to memory and extract text.""" 

1019 logger.info( 

1020 f"API text extraction failed, falling back to in-memory PDF download for: {resource.url}" 

1021 ) 

1022 

1023 downloader = self._get_downloader(resource.url) 

1024 if not downloader: 

1025 error_msg = "No compatible downloader found" 

1026 logger.warning( 

1027 f"✗ FAILED: {error_msg} for '{resource.title[:50]}...'" 

1028 ) 

1029 self._record_failed_text_extraction( 

1030 session, resource, error=error_msg 

1031 ) 

1032 session.commit() 

1033 return False, error_msg 

1034 

1035 result = downloader.download_with_result(resource.url, ContentType.PDF) 

1036 

1037 if not result.is_success or not result.content: 1037 ↛ 1049line 1037 didn't jump to line 1049 because the condition on line 1037 was always true

1038 error_msg = result.skip_reason or "Failed to download PDF" 

1039 logger.warning( 

1040 f"✗ FAILED: Could not download PDF for '{resource.title[:50]}...' | Error: {error_msg}" 

1041 ) 

1042 self._record_failed_text_extraction( 

1043 session, resource, error=f"PDF download failed: {error_msg}" 

1044 ) 

1045 session.commit() 

1046 return False, f"PDF extraction failed: {error_msg}" 

1047 

1048 # Extract text from PDF 

1049 text = self._extract_text_from_pdf(result.content) 

1050 if not text: 

1051 error_msg = "PDF text extraction returned empty text" 

1052 logger.warning( 

1053 f"Failed to extract text from PDF for: {resource.url}" 

1054 ) 

1055 self._record_failed_text_extraction( 

1056 session, resource, error=error_msg 

1057 ) 

1058 session.commit() 

1059 return False, error_msg 

1060 

1061 try: 

1062 self._save_text_with_db( 

1063 resource, 

1064 text, 

1065 session, 

1066 extraction_method="pdf_extraction", 

1067 extraction_source="pdfplumber_fallback", 

1068 ) 

1069 session.commit() 

1070 logger.info( 

1071 f"✓ SUCCESS: Extracted text from '{resource.title[:50]}...'" 

1072 ) 

1073 return True, None 

1074 except Exception as e: 

1075 logger.exception(f"Failed to save text for resource {resource.id}") 

1076 # Sanitize error message before returning to API 

1077 safe_error = redact_data(f"Failed to save text: {str(e)}") 

1078 return False, safe_error 

1079 

1080 def _get_downloader(self, url: str): 

1081 """ 

1082 Get the appropriate downloader for a URL. 

1083 

1084 Args: 

1085 url: The URL to download from 

1086 

1087 Returns: 

1088 The appropriate downloader instance or None 

1089 """ 

1090 for downloader in self.downloaders: 

1091 if downloader.can_handle(url): 

1092 return downloader 

1093 return None 

1094 

1095 def _download_generic(self, url: str) -> Optional[bytes]: 

1096 """Generic PDF download method.""" 

1097 try: 

1098 headers = { 

1099 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1100 } 

1101 response = safe_get( 

1102 url, headers=headers, timeout=30, allow_redirects=True 

1103 ) 

1104 response.raise_for_status() 

1105 

1106 # Verify it's a PDF 

1107 content_type = response.headers.get("Content-Type", "") 

1108 if ( 

1109 "pdf" not in content_type.lower() 

1110 and not response.content.startswith(b"%PDF") 

1111 ): 

1112 logger.warning(f"Response is not a PDF: {content_type}") 

1113 return None 

1114 

1115 return response.content 

1116 

1117 except Exception: 

1118 logger.exception("Generic download failed") 

1119 return None 

1120 

1121 def _download_arxiv(self, url: str) -> Optional[bytes]: 

1122 """Download from arXiv.""" 

1123 try: 

1124 # Convert abstract URL to PDF URL 

1125 pdf_url = url.replace("abs", "pdf") 

1126 if not pdf_url.endswith(".pdf"): 

1127 pdf_url += ".pdf" 

1128 

1129 return self._download_generic(pdf_url) 

1130 except Exception: 

1131 logger.exception("arXiv download failed") 

1132 return None 

1133 

1134 def _try_europe_pmc(self, pmid: str) -> Optional[bytes]: 

1135 """Try downloading from Europe PMC which often has better PDF availability.""" 

1136 try: 

1137 # Europe PMC API is more reliable for PDFs 

1138 # Check if PDF is available 

1139 api_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=EXT_ID:{pmid}&format=json" 

1140 response = safe_get(api_url, timeout=10) 

1141 

1142 if response.status_code == 200: 

1143 data = response.json() 

1144 results = data.get("resultList", {}).get("result", []) 

1145 

1146 if results: 

1147 article = results[0] 

1148 # Check if article has open access PDF 

1149 if ( 

1150 article.get("isOpenAccess") == "Y" 

1151 and article.get("hasPDF") == "Y" 

1152 ): 

1153 pmcid = article.get("pmcid") 

1154 if pmcid: 

1155 # Europe PMC PDF URL 

1156 pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmcid}&blobtype=pdf" 

1157 logger.info( 

1158 f"Found Europe PMC PDF for PMID {pmid}: {pmcid}" 

1159 ) 

1160 

1161 headers = { 

1162 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" 

1163 } 

1164 

1165 pdf_response = safe_get( 

1166 pdf_url, 

1167 headers=headers, 

1168 timeout=30, 

1169 allow_redirects=True, 

1170 ) 

1171 if pdf_response.status_code == 200: 

1172 content_type = pdf_response.headers.get( 

1173 "content-type", "" 

1174 ) 

1175 if ( 

1176 "pdf" in content_type.lower() 

1177 or len(pdf_response.content) > 1000 

1178 ): 

1179 return pdf_response.content 

1180 except Exception as e: 

1181 logger.debug(f"Europe PMC download failed: {e}") 

1182 

1183 return None 

1184 

    def _download_pubmed(self, url: str) -> Optional[bytes]:
        """Download from PubMed/PubMed Central with rate limiting.

        Strategy, in order:
          1. Throttle using self._pubmed_delay / self._last_pubmed_request.
          2. Direct PMC article URLs: fetch the PDF from Europe PMC.
          3. pubmed.ncbi.nlm.nih.gov URLs: resolve PMID -> PMC ID via
             Europe PMC search, then NCBI E-utilities (elink + esummary),
             then page scraping, fetching from Europe PMC when found.
          4. Fall back to a generic download of the original URL.

        Args:
            url: PubMed or PMC article URL.

        Returns:
            Raw PDF bytes, or None if every strategy fails or errors.
        """
        try:
            # Apply rate limiting for PubMed requests
            current_time = time.time()
            time_since_last = current_time - self._last_pubmed_request
            if time_since_last < self._pubmed_delay:
                sleep_time = self._pubmed_delay - time_since_last
                logger.debug(
                    f"Rate limiting: sleeping {sleep_time:.2f}s before PubMed request"
                )
                time.sleep(sleep_time)
            self._last_pubmed_request = time.time()

            # If it's already a PMC article, download directly
            if "/articles/PMC" in url:
                pmc_match = re.search(r"(PMC\d+)", url)
                if pmc_match:
                    pmc_id = pmc_match.group(1)

                    # Try Europe PMC (more reliable)
                    europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    headers = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    }

                    try:
                        response = safe_get(
                            europe_url,
                            headers=headers,
                            timeout=30,
                            allow_redirects=True,
                        )
                        if response.status_code == 200:
                            content_type = response.headers.get(
                                "content-type", ""
                            )
                            # Size check is a heuristic for mislabeled PDFs.
                            if (
                                "pdf" in content_type.lower()
                                or len(response.content) > 1000
                            ):
                                logger.info(
                                    f"Downloaded PDF via Europe PMC for {pmc_id}"
                                )
                                return response.content
                    except Exception as e:
                        logger.debug(f"Direct Europe PMC download failed: {e}")
                    # NOTE(review): for PMC article URLs we give up here rather
                    # than falling through to the generic download — confirm
                    # this is intended.
                    return None

            # If it's a regular PubMed URL, try to find PMC version
            elif urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
                # Extract PMID from URL
                pmid_match = re.search(r"/(\d+)/?", url)
                if pmid_match:
                    pmid = pmid_match.group(1)
                    logger.info(f"Attempting to download PDF for PMID: {pmid}")

                    # Try Europe PMC first (more reliable)
                    pdf_content = self._try_europe_pmc(pmid)
                    if pdf_content:
                        return pdf_content

                    # First try using NCBI E-utilities API to find PMC ID
                    try:
                        # Use elink to convert PMID to PMCID
                        elink_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
                        params = {
                            "dbfrom": "pubmed",
                            "db": "pmc",
                            "id": pmid,
                            "retmode": "json",
                        }

                        api_response = safe_get(
                            elink_url, params=params, timeout=10
                        )
                        if api_response.status_code == 200:
                            data = api_response.json()
                            # Parse the response to find PMC ID
                            link_sets = data.get("linksets", [])
                            if link_sets and "linksetdbs" in link_sets[0]:
                                for linksetdb in link_sets[0]["linksetdbs"]:
                                    if linksetdb.get(
                                        "dbto"
                                    ) == "pmc" and linksetdb.get("links"):
                                        pmc_id_num = linksetdb["links"][0]
                                        # Now fetch PMC details to get the correct PMC ID format
                                        esummary_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
                                        summary_params = {
                                            "db": "pmc",
                                            "id": pmc_id_num,
                                            "retmode": "json",
                                        }
                                        summary_response = safe_get(
                                            esummary_url,
                                            params=summary_params,
                                            timeout=10,
                                        )
                                        if summary_response.status_code == 200:
                                            summary_data = (
                                                summary_response.json()
                                            )
                                            result = summary_data.get(
                                                "result", {}
                                            ).get(str(pmc_id_num), {})
                                            if result:
                                                # PMC IDs in the API don't have the "PMC" prefix
                                                pmc_id = f"PMC{pmc_id_num}"
                                                logger.info(
                                                    f"Found PMC ID via API: {pmc_id} for PMID: {pmid}"
                                                )

                                                # Try Europe PMC with the PMC ID
                                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

                                                # Extra pause before the PDF fetch.
                                                time.sleep(self._pubmed_delay)

                                                headers = {
                                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                                }

                                                try:
                                                    response = safe_get(
                                                        europe_url,
                                                        headers=headers,
                                                        timeout=30,
                                                        allow_redirects=True,
                                                    )
                                                    if (
                                                        response.status_code
                                                        == 200
                                                    ):
                                                        content_type = response.headers.get(
                                                            "content-type", ""
                                                        )
                                                        if (
                                                            "pdf"
                                                            in content_type.lower()
                                                            or len(
                                                                response.content
                                                            )
                                                            > 1000
                                                        ):
                                                            logger.info(
                                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                                            )
                                                            return (
                                                                response.content
                                                            )
                                                except Exception as e:
                                                    logger.debug(
                                                        f"Europe PMC download with PMC ID failed: {e}"
                                                    )
                    except Exception as e:
                        logger.debug(
                            f"API lookup failed, trying webpage scraping: {e}"
                        )

                    # Fallback to webpage scraping if API fails
                    try:
                        headers = {
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                        }
                        response = safe_get(url, headers=headers, timeout=10)
                        if response.status_code == 200:
                            # Look for PMC ID in the page
                            pmc_match = re.search(r"PMC\d+", response.text)
                            if pmc_match:
                                pmc_id = pmc_match.group(0)
                                logger.info(
                                    f"Found PMC ID via webpage: {pmc_id} for PMID: {pmid}"
                                )

                                # Add delay before downloading PDF
                                time.sleep(self._pubmed_delay)

                                # Try Europe PMC with the PMC ID (more reliable)
                                europe_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                                headers = {
                                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                                }

                                try:
                                    response = safe_get(
                                        europe_url,
                                        headers=headers,
                                        timeout=30,
                                        allow_redirects=True,
                                    )
                                    if response.status_code == 200:
                                        content_type = response.headers.get(
                                            "content-type", ""
                                        )
                                        if (
                                            "pdf" in content_type.lower()
                                            or len(response.content) > 1000
                                        ):
                                            logger.info(
                                                f"Downloaded PDF via Europe PMC for {pmc_id}"
                                            )
                                            return response.content
                                except Exception as e:
                                    logger.debug(
                                        f"Europe PMC download failed: {e}"
                                    )
                            else:
                                logger.info(
                                    f"No PMC version found for PMID: {pmid}"
                                )
                    except requests.exceptions.HTTPError as e:
                        # NOTE(review): on HTTP 429 the shared delay is doubled
                        # (capped at 5s) before re-raising; the outer handler
                        # below converts the re-raise into a None return.
                        if e.response.status_code == 429:
                            logger.warning(
                                "Rate limited by PubMed, increasing delay"
                            )
                            self._pubmed_delay = min(
                                self._pubmed_delay * 2, 5.0
                            )  # Max 5 seconds
                        raise
                    except Exception as e:
                        logger.debug(f"Could not check for PMC version: {e}")

            # Final fallback: try fetching the original URL directly.
            return self._download_generic(url)
        except Exception:
            logger.exception("PubMed download failed")
            return None

1410 

1411 def _download_semantic_scholar(self, url: str) -> Optional[bytes]: 

1412 """Download from Semantic Scholar.""" 

1413 # Semantic Scholar doesn't host PDFs directly 

1414 # Would need to extract actual PDF URL from page 

1415 return None 

1416 

1417 def _download_biorxiv(self, url: str) -> Optional[bytes]: 

1418 """Download from bioRxiv.""" 

1419 try: 

1420 # Convert to PDF URL 

1421 pdf_url = url.replace(".org/", ".org/content/") 

1422 pdf_url = re.sub(r"v\d+$", "", pdf_url) # Remove version 

1423 pdf_url += ".full.pdf" 

1424 

1425 return self._download_generic(pdf_url) 

1426 except Exception: 

1427 logger.exception("bioRxiv download failed") 

1428 return None 

1429 

1430 def _download_medrxiv(self, url: str) -> Optional[bytes]: 

1431 """Download from medRxiv.""" 

1432 # Same as bioRxiv 

1433 return self._download_biorxiv(url) 

1434 

    def _save_text_with_db(
        self,
        resource: ResearchResource,
        text: str,
        session: Session,
        extraction_method: str,
        extraction_source: str,
        pdf_document_id: Optional[int] = None,
    ) -> Optional[str]:
        """
        Save extracted text to encrypted database.

        Updates the existing Document (looked up by pdf_document_id when
        given, else by resource_id) with the text and extraction metadata,
        or creates a new text-only Document linked to the default "Library"
        collection when none exists. Does NOT commit — callers commit.

        Args:
            resource: The research resource
            text: Extracted text content
            session: Database session
            extraction_method: How the text was extracted
            extraction_source: Specific tool/API used
            pdf_document_id: ID of PDF document if extracted from PDF

        Returns:
            None (previously returned text file path, now removed)

        Raises:
            Exception: any database/save error is logged and re-raised.
        """
        try:
            # Calculate text metadata for database
            word_count = len(text.split())
            character_count = len(text)

            # Find the document by pdf_document_id or resource_id
            doc = None
            if pdf_document_id:
                doc = (
                    session.query(Document)
                    .filter_by(id=pdf_document_id)
                    .first()
                )
            else:
                doc = (
                    session.query(Document)
                    .filter_by(resource_id=resource.id)
                    .first()
                )

            if doc:
                # Update existing document with extracted text
                doc.text_content = text
                doc.character_count = character_count
                doc.word_count = word_count
                doc.extraction_method = extraction_method
                doc.extraction_source = extraction_source

                # Set quality based on method
                if extraction_method == "native_api":
                    doc.extraction_quality = "high"
                elif (
                    extraction_method == "pdf_extraction"
                    and extraction_source == "pdfplumber"
                ):
                    doc.extraction_quality = "medium"
                else:
                    doc.extraction_quality = "low"

                logger.debug(
                    f"Updated document {doc.id} with extracted text ({word_count} words)"
                )
            else:
                # Create a new Document for text-only extraction
                # Generate hash from text content
                text_hash = hashlib.sha256(text.encode()).hexdigest()

                # Get source type for research downloads
                try:
                    source_type_id = get_source_type_id(
                        self.username, "research_download", self.password
                    )
                except Exception:
                    logger.exception(
                        "Failed to get source type for text document"
                    )
                    raise

                # Create new document
                doc_id = str(uuid.uuid4())
                doc = Document(
                    id=doc_id,
                    source_type_id=source_type_id,
                    resource_id=resource.id,
                    research_id=resource.research_id,
                    document_hash=text_hash,
                    original_url=resource.url,
                    file_path="text_only_not_stored",
                    file_size=character_count,  # Use character count as file size for text-only
                    file_type="text",
                    mime_type="text/plain",
                    title=resource.title,
                    text_content=text,
                    character_count=character_count,
                    word_count=word_count,
                    extraction_method=extraction_method,
                    extraction_source=extraction_source,
                    # NOTE(review): new documents map every non-native_api
                    # method to "medium", while the update branch above can
                    # assign "low" — confirm this asymmetry is intended.
                    extraction_quality="high"
                    if extraction_method == "native_api"
                    else "medium",
                    status=DocumentStatus.COMPLETED,
                    processed_at=datetime.now(UTC),
                )
                session.add(doc)

                # Link to default Library collection
                library_collection = (
                    session.query(Collection).filter_by(name="Library").first()
                )
                if library_collection:
                    doc_collection = DocumentCollection(
                        document_id=doc_id,
                        collection_id=library_collection.id,
                        indexed=False,
                        chunk_count=0,
                    )
                    session.add(doc_collection)
                else:
                    logger.warning(
                        f"Library collection not found - document {doc_id} will not be linked to default collection"
                    )

                logger.info(
                    f"Created new document {doc_id} for text-only extraction ({word_count} words)"
                )

            logger.info(
                f"Saved text to encrypted database ({word_count} words)"
            )
            return None

        except Exception:
            logger.exception("Error saving text to encrypted database")
            raise  # Re-raise so caller can handle the error

1572 

1573 def _create_text_document_record( 

1574 self, 

1575 session: Session, 

1576 resource: ResearchResource, 

1577 file_path: Path, 

1578 extraction_method: str, 

1579 extraction_source: str, 

1580 ): 

1581 """Update existing Document with text from file (for legacy text files).""" 

1582 try: 

1583 # Read file to get metadata 

1584 text = file_path.read_text(encoding="utf-8", errors="ignore") 

1585 word_count = len(text.split()) 

1586 character_count = len(text) 

1587 

1588 # Find the Document by resource_id 

1589 doc = ( 

1590 session.query(Document) 

1591 .filter_by(resource_id=resource.id) 

1592 .first() 

1593 ) 

1594 

1595 if doc: 

1596 # Update existing document with text content 

1597 doc.text_content = text 

1598 doc.character_count = character_count 

1599 doc.word_count = word_count 

1600 doc.extraction_method = extraction_method 

1601 doc.extraction_source = extraction_source 

1602 doc.extraction_quality = ( 

1603 "low" # Unknown quality for legacy files 

1604 ) 

1605 logger.info( 

1606 f"Updated document {doc.id} with text from file: {file_path.name}" 

1607 ) 

1608 else: 

1609 logger.warning( 

1610 f"No document found to update for resource {resource.id}" 

1611 ) 

1612 

1613 except Exception: 

1614 logger.exception("Error updating document with text from file") 

1615 

1616 def _record_failed_text_extraction( 

1617 self, session: Session, resource: ResearchResource, error: str 

1618 ): 

1619 """Record a failed text extraction attempt in the Document.""" 

1620 try: 

1621 # Find the Document by resource_id 

1622 doc = ( 

1623 session.query(Document) 

1624 .filter_by(resource_id=resource.id) 

1625 .first() 

1626 ) 

1627 

1628 if doc: 1628 ↛ 1637line 1628 didn't jump to line 1637 because the condition on line 1628 was always true

1629 # Update document with extraction error 

1630 doc.error_message = error 

1631 doc.extraction_method = "failed" 

1632 doc.extraction_quality = "low" 

1633 logger.info( 

1634 f"Recorded failed text extraction for document {doc.id}: {error}" 

1635 ) 

1636 else: 

1637 logger.warning( 

1638 f"No document found to record extraction failure for resource {resource.id}" 

1639 ) 

1640 

1641 except Exception: 

1642 logger.exception("Error recording failed text extraction")