Coverage for src / local_deep_research / research_library / downloaders / pubmed.py: 96%
269 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2PubMed/PMC PDF Downloader
3"""
5import re
6import time
7from typing import Optional
8from urllib.parse import urlparse
9from loguru import logger
11from .base import BaseDownloader, ContentType, DownloadResult
class PubMedDownloader(BaseDownloader):
    """Downloader for PubMed and PubMed Central articles with PDF and text support.

    Handles URLs from pubmed.ncbi.nlm.nih.gov, ncbi.nlm.nih.gov/pmc (including
    www. and other subdomains) and europepmc.org. Europe PMC REST services are
    tried first, with NCBI (E-utilities / PMC) as a fallback. A simple
    client-side rate limit is applied between download requests.
    """

    def __init__(self, timeout: int = 30, rate_limit_delay: float = 1.0):
        """
        Initialize PubMed downloader.

        Args:
            timeout: Request timeout in seconds.
            rate_limit_delay: Minimum delay between requests, in seconds,
                to avoid rate limiting by the NCBI/Europe PMC services.
        """
        super().__init__(timeout)
        self.rate_limit_delay = rate_limit_delay
        # time.time() of the most recent rate-limited request; 0.0 = never.
        self.last_request_time: float = 0.0

    def can_handle(self, url: str) -> bool:
        """Check if URL is from PubMed, PMC, or Europe PMC."""
        try:
            parsed = urlparse(url)
            hostname = parsed.hostname
            if not hostname:
                return False

            # pubmed.ncbi.nlm.nih.gov
            if hostname == "pubmed.ncbi.nlm.nih.gov":
                return True

            # ncbi.nlm.nih.gov (and subdomains such as www.) with /pmc in path.
            # Accepting subdomains matters: real PMC links use
            # www.ncbi.nlm.nih.gov, as do the URLs this class builds itself.
            if (
                hostname == "ncbi.nlm.nih.gov"
                or hostname.endswith(".ncbi.nlm.nih.gov")
            ) and "/pmc" in parsed.path:
                return True

            # europepmc.org and its subdomains
            if hostname == "europepmc.org" or hostname.endswith(
                ".europepmc.org"
            ):
                return True

            return False
        except Exception:
            # A malformed URL is simply "not handled", never an error.
            return False

    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """Download content from PubMed/PMC.

        Args:
            url: Article URL.
            content_type: ContentType.PDF (default) or ContentType.TEXT.

        Returns:
            Raw content bytes, or None when nothing could be retrieved.
        """
        # Apply rate limiting before any network activity.
        self._apply_rate_limit()

        if content_type == ContentType.TEXT:
            # Try to get full text from API (falls back to PDF extraction).
            return self._download_text(url)
        # Download PDF
        return self._download_pdf_content(url)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """Download content and return detailed result with skip reason."""
        # Apply rate limiting before any network activity.
        self._apply_rate_limit()

        if content_type == ContentType.TEXT:
            content = self._download_text(url)
            if content:
                return DownloadResult(content=content, is_success=True)
            return DownloadResult(
                skip_reason="Full text not available - may require subscription"
            )
        # Try to download PDF with detailed tracking
        return self._download_pdf_with_result(url)

    def _download_pdf_content(self, url: str) -> Optional[bytes]:
        """Route a URL to the appropriate PDF download strategy."""
        parsed = urlparse(url)
        hostname = parsed.hostname or ""
        path = parsed.path or ""

        # PMC article direct download (accept www. and other subdomains —
        # canonical PMC article links live on www.ncbi.nlm.nih.gov).
        if (
            hostname == "ncbi.nlm.nih.gov"
            or hostname.endswith(".ncbi.nlm.nih.gov")
        ) and "/pmc/articles/PMC" in path:
            return self._download_pmc_direct(url)
        # PubMed main site
        if hostname == "pubmed.ncbi.nlm.nih.gov":
            return self._download_pubmed(url)
        # Europe PMC and subdomains
        if hostname == "europepmc.org" or hostname.endswith(".europepmc.org"):
            return self._download_europe_pmc(url)

        return None

    def _download_pdf_with_result(self, url: str) -> DownloadResult:
        """Download PDF and return detailed result with skip reason."""
        # PMC article URLs (e.g. .../pmc/articles/PMC1234567/)
        if "/pmc/articles/PMC" in url:
            pmc_match = re.search(r"(PMC\d+)", url)
            if not pmc_match:
                return DownloadResult(skip_reason="Invalid PMC URL format")

            pmc_id = pmc_match.group(1)
            logger.info(f"Downloading PMC article: {pmc_id}")

            # Try Europe PMC first, then NCBI PMC.
            pdf_content = self._download_via_europe_pmc(pmc_id)
            if pdf_content:
                return DownloadResult(content=pdf_content, is_success=True)

            pdf_content = self._download_via_ncbi_pmc(pmc_id)
            if pdf_content:
                return DownloadResult(content=pdf_content, is_success=True)

            return DownloadResult(
                skip_reason=f"PMC article {pmc_id} not accessible - may be retracted or embargoed"
            )

        # PubMed article URLs: extract the PMID and delegate.
        if urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
            pmid_match = re.search(r"/(\d+)/?", url)
            if not pmid_match:
                return DownloadResult(skip_reason="Invalid PubMed URL format")
            return self._download_pubmed_with_result(pmid_match.group(1))

        # Europe PMC URLs.
        parsed = urlparse(url)
        hostname = parsed.hostname or ""
        if hostname == "europepmc.org" or hostname.endswith(".europepmc.org"):
            pmc_match = re.search(r"(PMC\d+)", url)
            if pmc_match:
                pmc_id = pmc_match.group(1)
                pdf_content = self._download_via_europe_pmc(pmc_id)
                if pdf_content:
                    return DownloadResult(content=pdf_content, is_success=True)
                return DownloadResult(
                    skip_reason=f"Europe PMC article {pmc_id} not accessible"
                )
            return DownloadResult(skip_reason="Invalid Europe PMC URL format")
        return DownloadResult(skip_reason="Unsupported PubMed/PMC URL format")

    def _download_pubmed_with_result(self, pmid: str) -> DownloadResult:
        """Resolve a PMID to a PDF, reporting a precise skip reason on failure."""
        logger.info(f"Processing PubMed article: {pmid}")

        # Ask the Europe PMC search API about the article first so the skip
        # reason can distinguish paywalled / no-PDF / not-indexed articles.
        try:
            api_url = (
                "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
            )
            params = {"query": f"EXT_ID:{pmid}", "format": "json"}

            response = self.session.get(api_url, params=params, timeout=10)

            if response.status_code == 200:
                data = response.json()
                results = data.get("resultList", {}).get("result", [])

                if results:
                    article = results[0]

                    # Article exists but is not open access.
                    if article.get("isOpenAccess") != "Y":
                        journal = article.get(
                            "journalTitle", "Unknown journal"
                        )
                        return DownloadResult(
                            skip_reason=f"Article requires subscription to {journal}"
                        )

                    # Open access but no PDF rendition.
                    if article.get("hasPDF") != "Y":
                        return DownloadResult(
                            skip_reason="No PDF version available for this article"
                        )

                    # Open access with a PDF: try the Europe PMC download.
                    pmcid = article.get("pmcid")
                    if pmcid:
                        pdf_content = self._download_via_europe_pmc(pmcid)
                        if pdf_content:
                            return DownloadResult(
                                content=pdf_content, is_success=True
                            )
                else:
                    return DownloadResult(
                        skip_reason=f"Article PMID:{pmid} not found in Europe PMC database"
                    )
        except Exception as e:
            # Best-effort check; fall through to the NCBI path below.
            logger.debug(f"Error checking article status: {e}")

        # Fall back to resolving a PMC ID through NCBI.
        pmc_id = self._get_pmc_id_from_pmid(pmid)
        if pmc_id:
            logger.info(f"Found PMC ID: {pmc_id} for PMID: {pmid}")

            pdf_content = self._download_via_europe_pmc(pmc_id)
            if pdf_content:
                return DownloadResult(content=pdf_content, is_success=True)

            pdf_content = self._download_via_ncbi_pmc(pmc_id)
            if pdf_content:
                return DownloadResult(content=pdf_content, is_success=True)

            return DownloadResult(
                skip_reason=f"PMC version exists but PDF not accessible (PMC ID: {pmc_id})"
            )

        return DownloadResult(
            skip_reason="No free full-text available - article may be paywalled"
        )

    def _download_text(self, url: str) -> Optional[bytes]:
        """Download full text content from PubMed/PMC APIs.

        Tries the Europe PMC full-text API first; falls back to downloading
        the PDF and extracting its text. Returns UTF-8 encoded bytes or None.
        """
        # Extract PMID or PMC ID from the URL.
        pmid: Optional[str] = None
        pmc_id: Optional[str] = None

        parsed_url = urlparse(url)
        if parsed_url.hostname == "pubmed.ncbi.nlm.nih.gov":
            pmid_match = re.search(r"/(\d+)/?", url)
            if pmid_match:
                pmid = pmid_match.group(1)
        elif "/pmc/articles/PMC" in url:
            pmc_match = re.search(r"(PMC\d+)", url)
            if pmc_match:
                pmc_id = pmc_match.group(1)

        # Try Europe PMC API for full text.
        if pmid or pmc_id:
            text = self._fetch_text_from_europe_pmc(pmid, pmc_id)
            if text:
                return text.encode("utf-8")

        # Fallback: download the PDF and extract text from it.
        pdf_content = self._download_pdf_content(url)
        if pdf_content:
            text = self.extract_text_from_pdf(pdf_content)
            if text:
                return text.encode("utf-8")

        return None

    def _fetch_text_from_europe_pmc(
        self, pmid: Optional[str], pmc_id: Optional[str]
    ) -> Optional[str]:
        """Fetch full text from the Europe PMC API.

        Only open-access articles with a PMC ID expose full-text XML; the XML
        is flattened to plain text by stripping tags.
        """
        try:
            # Construct the search query from whichever ID we have.
            if pmc_id:
                query = f"PMC:{pmc_id.replace('PMC', '')}"
            elif pmid:
                query = f"EXT_ID:{pmid}"
            else:
                return None

            # Get article metadata first.
            api_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
            params = {
                "query": query,
                "format": "json",
                "resultType": "core",  # Get more detailed results
            }

            response = self.session.get(api_url, params=params, timeout=10)

            if response.status_code == 200:
                data = response.json()
                results = data.get("resultList", {}).get("result", [])

                if results and results[0].get("isOpenAccess") == "Y":
                    article = results[0]
                    # Try to get full text XML (requires a PMC ID).
                    if article.get("pmcid"):
                        fulltext_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/{article['pmcid']}/fullTextXML"
                        text_response = self.session.get(
                            fulltext_url, timeout=30
                        )

                        if text_response.status_code == 200:
                            xml_content = text_response.text
                            # Crude XML-to-text: drop tags, collapse whitespace.
                            # (Uses the module-level `re` import.)
                            text = re.sub(r"<[^>]+>", " ", xml_content)
                            text = " ".join(text.split())

                            if text:
                                logger.info(
                                    "Retrieved full text from Europe PMC API"
                                )
                                return text

        except Exception as e:
            logger.debug(f"Failed to fetch text from Europe PMC: {e}")

        return None

    def _apply_rate_limit(self) -> None:
        """Sleep as needed to keep requests at least rate_limit_delay apart."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time

        if time_since_last < self.rate_limit_delay:
            sleep_time = self.rate_limit_delay - time_since_last
            logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
            time.sleep(sleep_time)

        self.last_request_time = time.time()

    def _download_pmc_direct(self, url: str) -> Optional[bytes]:
        """Download a PDF directly from a PMC article URL."""
        pmc_match = re.search(r"(PMC\d+)", url)
        if not pmc_match:
            return None

        pmc_id = pmc_match.group(1)
        logger.info(f"Downloading PMC article: {pmc_id}")

        # Try Europe PMC first (more reliable).
        pdf_content = self._download_via_europe_pmc(pmc_id)
        if pdf_content:
            return pdf_content

        # Fallback to NCBI PMC.
        return self._download_via_ncbi_pmc(pmc_id)

    def _download_pubmed(self, url: str) -> Optional[bytes]:
        """Download a PDF starting from a PubMed (PMID) URL."""
        # Extract PMID from the URL path.
        pmid_match = re.search(r"/(\d+)/?", url)
        if not pmid_match:
            return None

        pmid = pmid_match.group(1)
        logger.info(f"Processing PubMed article: {pmid}")

        # Try Europe PMC API first.
        pdf_content = self._try_europe_pmc_api(pmid)
        if pdf_content:
            return pdf_content

        # Try to find a PMC ID via the NCBI API, then download via PMC.
        pmc_id = self._get_pmc_id_from_pmid(pmid)
        if pmc_id:
            logger.info(f"Found PMC ID: {pmc_id} for PMID: {pmid}")

            pdf_content = self._download_via_europe_pmc(pmc_id)
            if pdf_content:
                return pdf_content

            pdf_content = self._download_via_ncbi_pmc(pmc_id)
            if pdf_content:
                return pdf_content

        logger.info(f"No PMC version available for PMID: {pmid}")
        return None

    def _download_europe_pmc(self, url: str) -> Optional[bytes]:
        """Download a PDF from a Europe PMC article URL."""
        # Extract PMC ID from URL.
        pmc_match = re.search(r"(PMC\d+)", url)
        if pmc_match:
            return self._download_via_europe_pmc(pmc_match.group(1))
        return None

    def _try_europe_pmc_api(self, pmid: str) -> Optional[bytes]:
        """Try downloading via the Europe PMC API using a PMID."""
        try:
            # Query Europe PMC API for the article's metadata.
            api_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
            params = {"query": f"EXT_ID:{pmid}", "format": "json"}

            response = self.session.get(api_url, params=params, timeout=10)

            if response.status_code == 200:
                data = response.json()
                results = data.get("resultList", {}).get("result", [])

                if results:
                    article = results[0]
                    # Only proceed when an open-access PDF is advertised.
                    if (
                        article.get("isOpenAccess") == "Y"
                        and article.get("hasPDF") == "Y"
                    ):
                        pmcid = article.get("pmcid")
                        if pmcid:
                            logger.info(
                                f"Found open access PDF via Europe PMC API: {pmcid}"
                            )
                            return self._download_via_europe_pmc(pmcid)

        except Exception as e:
            logger.debug(f"Europe PMC API query failed: {e}")

        return None

    def _get_pmc_id_from_pmid(self, pmid: str) -> Optional[str]:
        """Convert a PMID to a PMC ID using NCBI E-utilities (elink).

        Falls back to scraping the PubMed article page for a "PMC\\d+" token.
        """
        try:
            # Use NCBI E-utilities to find the linked PMC record.
            elink_url = (
                "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
            )
            params = {
                "dbfrom": "pubmed",
                "db": "pmc",
                "id": pmid,
                "retmode": "json",
            }

            response = self.session.get(elink_url, params=params, timeout=10)

            if response.status_code == 200:
                data = response.json()
                link_sets = data.get("linksets", [])

                if link_sets and "linksetdbs" in link_sets[0]:
                    for linksetdb in link_sets[0]["linksetdbs"]:
                        if linksetdb.get("dbto") == "pmc" and linksetdb.get(
                            "links"
                        ):
                            # First link is the article's own PMC record.
                            pmc_id_num = linksetdb["links"][0]
                            return f"PMC{pmc_id_num}"

        except Exception as e:
            logger.debug(f"NCBI E-utilities lookup failed: {e}")

        # Fallback: Try scraping the PubMed page.
        try:
            response = self.session.get(
                f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/", timeout=10
            )

            if response.status_code == 200:
                pmc_match = re.search(r"PMC\d+", response.text)
                if pmc_match:
                    return pmc_match.group(0)

        except Exception as e:
            logger.debug(f"PubMed page scraping failed: {e}")

        return None

    def _download_via_europe_pmc(self, pmc_id: str) -> Optional[bytes]:
        """Download a PDF via the Europe PMC render backend."""
        pdf_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"

        logger.debug(f"Trying Europe PMC: {pdf_url}")
        pdf_content = self._download_pdf(pdf_url)

        if pdf_content:
            logger.info(f"Successfully downloaded from Europe PMC: {pmc_id}")

        # None when the download failed.
        return pdf_content

    def _download_via_ncbi_pmc(self, pmc_id: str) -> Optional[bytes]:
        """Download a PDF via NCBI PMC, trying known URL patterns in order."""
        url_patterns = [
            f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/pdf/",
            f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/pdf/main.pdf",
        ]

        for pdf_url in url_patterns:
            logger.debug(f"Trying NCBI PMC: {pdf_url}")

            # NCBI serves PDFs more reliably with a same-site referer.
            headers = {
                "Referer": f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/"
            }

            pdf_content = self._download_pdf(pdf_url, headers)
            if pdf_content:
                logger.info(f"Successfully downloaded from NCBI PMC: {pmc_id}")
                return pdf_content

        return None