Coverage for src/local_deep_research/research_library/downloaders/pubmed.py: 46%

269 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2PubMed/PMC PDF Downloader 

3""" 

4 

5import re 

6import time 

7from typing import Optional 

8from urllib.parse import urlparse 

9from loguru import logger 

10 

11from .base import BaseDownloader, ContentType, DownloadResult 

12 

13 

14class PubMedDownloader(BaseDownloader): 

15 """Downloader for PubMed and PubMed Central articles with PDF and text support.""" 

16 

def __init__(self, timeout: int = 30, rate_limit_delay: float = 1.0):
    """
    Set up the downloader and its request-throttling state.

    Args:
        timeout: Request timeout in seconds, forwarded to the base class
        rate_limit_delay: Minimum number of seconds to leave between requests
    """
    super().__init__(timeout)
    # Timestamp of the most recent request; starts at 0 so the very first
    # request is never delayed.
    self.last_request_time = 0
    self.rate_limit_delay = rate_limit_delay

28 

def can_handle(self, url: str) -> bool:
    """
    Report whether *url* points at PubMed, PubMed Central or Europe PMC.

    Matching is done on the parsed hostname (never on a substring of the
    whole URL), so look-alike URLs such as
    ``https://evil.example/pubmed.ncbi.nlm.nih.gov/1`` are rejected.

    Args:
        url: The URL to classify.

    Returns:
        True for ``pubmed.ncbi.nlm.nih.gov``; for ``ncbi.nlm.nih.gov`` or
        ``www.ncbi.nlm.nih.gov`` when the path contains ``/pmc``; and for
        ``europepmc.org`` or any of its subdomains. False otherwise,
        including when the URL cannot be parsed.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
        if not hostname:
            return False

        if hostname == "pubmed.ncbi.nlm.nih.gov":
            return True

        # PMC article pages are served from the "www." host (the same host
        # this class builds its own NCBI PMC PDF URLs on), so accept it
        # alongside the bare domain. Comparing against the bare domain only
        # rejected every real PMC article URL.
        if (
            hostname in ("ncbi.nlm.nih.gov", "www.ncbi.nlm.nih.gov")
            and "/pmc" in parsed.path
        ):
            return True

        return hostname == "europepmc.org" or hostname.endswith(
            ".europepmc.org"
        )
    except Exception:
        # urlparse raises ValueError on malformed input (e.g. a broken
        # IPv6 literal); this predicate is best-effort and must not raise.
        return False

54 

def download(
    self, url: str, content_type: ContentType = ContentType.PDF
) -> Optional[bytes]:
    """
    Fetch an article from PubMed/PMC as raw bytes.

    The rate limiter runs first; TEXT requests are then served by the
    full-text path and every other content type by the PDF path.
    """
    self._apply_rate_limit()

    # Pick the handler once, then make a single call.
    if content_type == ContentType.TEXT:
        fetch = self._download_text
    else:
        fetch = self._download_pdf_content
    return fetch(url)

68 

def download_with_result(
    self, url: str, content_type: ContentType = ContentType.PDF
) -> DownloadResult:
    """
    Fetch an article and report success or the reason it was skipped.

    The rate limiter runs first. Non-TEXT requests are delegated to the
    PDF path, which produces its own detailed result.
    """
    self._apply_rate_limit()

    if content_type != ContentType.TEXT:
        # PDF path tracks its own skip reasons.
        return self._download_pdf_with_result(url)

    text_bytes = self._download_text(url)
    if not text_bytes:
        return DownloadResult(
            skip_reason="Full text not available - may require subscription"
        )
    return DownloadResult(content=text_bytes, is_success=True)

87 

def _download_pdf_content(self, url: str) -> Optional[bytes]:
    """
    Route a PDF download to the handler for the URL's host.

    Args:
        url: PubMed, PMC or Europe PMC article URL.

    Returns:
        Whatever the matching handler returns (PDF bytes or None), or None
        when the host is not one of the recognised sites.
    """
    parsed = urlparse(url)
    hostname = parsed.hostname or ""
    path = parsed.path or ""

    # PMC article pages live on "www.ncbi.nlm.nih.gov" (the host this class
    # builds its own NCBI PMC PDF URLs on). Comparing against the bare
    # domain only meant this branch never matched a real PMC URL.
    if (
        hostname in ("ncbi.nlm.nih.gov", "www.ncbi.nlm.nih.gov")
        and "/pmc/articles/PMC" in path
    ):
        return self._download_pmc_direct(url)

    if hostname == "pubmed.ncbi.nlm.nih.gov":
        return self._download_pubmed(url)

    if hostname == "europepmc.org" or hostname.endswith(".europepmc.org"):
        return self._download_europe_pmc(url)

    return None

106 

def _download_pdf_with_result(self, url: str) -> DownloadResult:
    """
    Download a PDF and explain the outcome.

    Three URL families are handled, checked in this order: NCBI PMC
    article URLs (any URL containing ``/pmc/articles/PMC``), PubMed
    pages, and Europe PMC pages.

    Args:
        url: Article URL.

    Returns:
        A DownloadResult carrying the PDF bytes on success, otherwise one
        whose ``skip_reason`` says why nothing was downloaded.
    """
    # --- NCBI PMC article URL -------------------------------------------
    if "/pmc/articles/PMC" in url:
        id_match = re.search(r"(PMC\d+)", url)
        if id_match is None:
            return DownloadResult(skip_reason="Invalid PMC URL format")

        pmc_id = id_match.group(1)
        logger.info(f"Downloading PMC article: {pmc_id}")

        # Europe PMC first, NCBI second; the first non-empty payload wins.
        for fetch in (
            self._download_via_europe_pmc,
            self._download_via_ncbi_pmc,
        ):
            payload = fetch(pmc_id)
            if payload:
                return DownloadResult(content=payload, is_success=True)

        return DownloadResult(
            skip_reason=f"PMC article {pmc_id} not accessible - may be retracted or embargoed"
        )

    # --- PubMed page ----------------------------------------------------
    if urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
        id_match = re.search(r"/(\d+)/?", url)
        if id_match is None:
            return DownloadResult(skip_reason="Invalid PubMed URL format")

        pmid = id_match.group(1)
        logger.info(f"Processing PubMed article: {pmid}")

        # Ask Europe PMC about access status. Any failure here is only
        # logged, after which the NCBI lookup below is attempted.
        try:
            response = self.session.get(
                "https://www.ebi.ac.uk/europepmc/webservices/rest/search",
                params={"query": f"EXT_ID:{pmid}", "format": "json"},
                timeout=10,
            )

            if response.status_code == 200:
                hits = response.json().get("resultList", {}).get("result", [])
                if not hits:
                    return DownloadResult(
                        skip_reason=f"Article PMID:{pmid} not found in Europe PMC database"
                    )

                record = hits[0]

                if record.get("isOpenAccess") != "Y":
                    journal = record.get("journalTitle", "Unknown journal")
                    return DownloadResult(
                        skip_reason=f"Article requires subscription to {journal}"
                    )

                if record.get("hasPDF") != "Y":
                    return DownloadResult(
                        skip_reason="No PDF version available for this article"
                    )

                europe_id = record.get("pmcid")
                if europe_id:
                    payload = self._download_via_europe_pmc(europe_id)
                    if payload:
                        return DownloadResult(
                            content=payload, is_success=True
                        )
        except Exception as e:
            logger.debug(f"Error checking article status: {e}")

        # Fall back to resolving the PMC ID through NCBI.
        pmc_id = self._get_pmc_id_from_pmid(pmid)
        if not pmc_id:
            return DownloadResult(
                skip_reason="No free full-text available - article may be paywalled"
            )

        logger.info(f"Found PMC ID: {pmc_id} for PMID: {pmid}")
        for fetch in (
            self._download_via_europe_pmc,
            self._download_via_ncbi_pmc,
        ):
            payload = fetch(pmc_id)
            if payload:
                return DownloadResult(content=payload, is_success=True)

        return DownloadResult(
            skip_reason=f"PMC version exists but PDF not accessible (PMC ID: {pmc_id})"
        )

    # --- Europe PMC page (or anything else) -----------------------------
    host = urlparse(url).hostname or ""
    if not (host == "europepmc.org" or host.endswith(".europepmc.org")):
        return DownloadResult(skip_reason="Unsupported PubMed/PMC URL format")

    id_match = re.search(r"(PMC\d+)", url)
    if id_match is None:
        return DownloadResult(skip_reason="Invalid Europe PMC URL format")

    pmc_id = id_match.group(1)
    payload = self._download_via_europe_pmc(pmc_id)
    if payload:
        return DownloadResult(content=payload, is_success=True)
    return DownloadResult(
        skip_reason=f"Europe PMC article {pmc_id} not accessible"
    )

233 

def _download_text(self, url: str) -> Optional[bytes]:
    """
    Return an article's full text as UTF-8 bytes.

    The Europe PMC full-text API is tried first when a PMID or PMC ID can
    be read from the URL; otherwise (or if that yields nothing) the PDF
    is downloaded and its text extracted.

    Returns:
        UTF-8 encoded text, or None when neither route produced any text.
    """
    pmid = pmc_id = None

    # Work out which identifier, if any, the URL carries.
    if urlparse(url).hostname == "pubmed.ncbi.nlm.nih.gov":
        found = re.search(r"/(\d+)/?", url)
        pmid = found.group(1) if found else None
    elif "/pmc/articles/PMC" in url:
        found = re.search(r"(PMC\d+)", url)
        pmc_id = found.group(1) if found else None

    # Preferred route: Europe PMC full-text API.
    if pmid or pmc_id:
        api_text = self._fetch_text_from_europe_pmc(pmid, pmc_id)
        if api_text:
            return api_text.encode("utf-8")

    # Fallback route: fetch the PDF and extract its text.
    pdf_bytes = self._download_pdf_content(url)
    if not pdf_bytes:
        return None
    extracted = self.extract_text_from_pdf(pdf_bytes)
    return extracted.encode("utf-8") if extracted else None

264 

def _fetch_text_from_europe_pmc(
    self, pmid: Optional[str], pmc_id: Optional[str]
) -> Optional[str]:
    """
    Fetch an open-access article's full text from the Europe PMC REST API.

    The article is first looked up through the search endpoint. If the
    first hit is flagged open access and carries a PMC ID, its
    ``fullTextXML`` document is downloaded and reduced to plain text.

    Args:
        pmid: PubMed ID, used only when ``pmc_id`` is not given.
        pmc_id: PMC ID (e.g. ``"PMC123"``); takes precedence over ``pmid``.

    Returns:
        Whitespace-normalised plain text with XML entities decoded, or
        None when no identifier was supplied, the article is not open
        access, a request did not return HTTP 200, or any error occurred
        (errors are logged at debug level, not raised).
    """
    # Local import: ``html`` is not among this module's top-level imports.
    from html import unescape

    try:
        if pmc_id:
            query = f"PMC:{pmc_id.replace('PMC', '')}"
        elif pmid:
            query = f"EXT_ID:{pmid}"
        else:
            return None

        # Look the article up first to learn its access status and PMC ID.
        api_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
        params = {
            "query": query,
            "format": "json",
            "resultType": "core",  # Get more detailed results
        }
        response = self.session.get(api_url, params=params, timeout=10)
        if response.status_code != 200:
            return None

        results = response.json().get("resultList", {}).get("result", [])
        if not results or results[0].get("isOpenAccess") != "Y":
            return None

        article = results[0]
        if not article.get("pmcid"):
            return None

        fulltext_url = f"https://www.ebi.ac.uk/europepmc/webservices/rest/{article['pmcid']}/fullTextXML"
        text_response = self.session.get(fulltext_url, timeout=30)
        if text_response.status_code != 200:
            return None

        # Simple extraction: drop the tags, then decode entities such as
        # "&amp;" / "&lt;" that would otherwise leak into the plain text.
        # Tags are stripped first so an escaped "&lt;b&gt;" in the prose
        # is not mistaken for markup.
        text = re.sub(r"<[^>]+>", " ", text_response.text)
        text = " ".join(unescape(text).split())

        if text:
            logger.info("Retrieved full text from Europe PMC API")
            return text

    except Exception as e:
        logger.debug(f"Failed to fetch text from Europe PMC: {e}")

    return None

320 

def _apply_rate_limit(self) -> None:
    """
    Sleep as needed so consecutive requests are ``rate_limit_delay`` apart.

    Reads ``self.last_request_time`` and ``self.rate_limit_delay``, and
    stamps ``self.last_request_time`` with the current wall-clock time
    before returning.
    """
    elapsed = time.time() - self.last_request_time

    if elapsed < self.rate_limit_delay:
        # time.time() is wall-clock time and can step backwards (e.g. a
        # clock adjustment), which makes ``elapsed`` negative and would
        # otherwise produce an arbitrarily long sleep. Never wait longer
        # than one full delay, and never pass a negative value to sleep().
        sleep_time = max(
            0.0,
            min(self.rate_limit_delay - elapsed, self.rate_limit_delay),
        )
        logger.debug(f"Rate limiting: sleeping {sleep_time:.2f}s")
        time.sleep(sleep_time)

    self.last_request_time = time.time()

332 

def _download_pmc_direct(self, url: str) -> Optional[bytes]:
    """
    Download the PDF for a PMC article URL.

    Returns:
        PDF bytes, or None when the URL holds no ``PMC<digits>`` ID or
        neither source delivered a PDF.
    """
    found = re.search(r"(PMC\d+)", url)
    if found is None:
        return None

    pmc_id = found.group(1)
    logger.info(f"Downloading PMC article: {pmc_id}")

    # Europe PMC is asked first; NCBI is only contacted if that yields
    # nothing.
    return self._download_via_europe_pmc(
        pmc_id
    ) or self._download_via_ncbi_pmc(pmc_id)

349 

def _download_pubmed(self, url: str) -> Optional[bytes]:
    """
    Download the PDF behind a PubMed article URL.

    The PMID is read from the URL. The Europe PMC search API is tried
    first; failing that, the PMID is mapped to a PMC ID and the PDF is
    requested from Europe PMC and then NCBI.

    Returns:
        PDF bytes, or None when the URL has no PMID or no source had a PDF.
    """
    found = re.search(r"/(\d+)/?", url)
    if found is None:
        return None

    pmid = found.group(1)
    logger.info(f"Processing PubMed article: {pmid}")

    # Route 1: Europe PMC search by PMID.
    api_pdf = self._try_europe_pmc_api(pmid)
    if api_pdf:
        return api_pdf

    # Route 2: resolve a PMC ID, then try each PDF host in turn.
    pmc_id = self._get_pmc_id_from_pmid(pmid)
    if pmc_id:
        logger.info(f"Found PMC ID: {pmc_id} for PMID: {pmid}")
        for fetch in (
            self._download_via_europe_pmc,
            self._download_via_ncbi_pmc,
        ):
            pdf_bytes = fetch(pmc_id)
            if pdf_bytes:
                return pdf_bytes

    logger.info(f"No PMC version available for PMID: {pmid}")
    return None

382 

def _download_europe_pmc(self, url: str) -> Optional[bytes]:
    """
    Download the PDF for a Europe PMC article URL.

    Returns:
        PDF bytes from Europe PMC, or None when the URL contains no
        ``PMC<digits>`` identifier.
    """
    found = re.search(r"(PMC\d+)", url)
    if found is None:
        return None
    return self._download_via_europe_pmc(found.group(1))

391 

def _try_europe_pmc_api(self, pmid: str) -> Optional[bytes]:
    """
    Look a PMID up on Europe PMC and download its open-access PDF.

    Returns:
        PDF bytes when the first search hit is open access, has a PDF and
        carries a PMC ID; None otherwise. Errors are logged at debug level
        and also give None.
    """
    try:
        response = self.session.get(
            "https://www.ebi.ac.uk/europepmc/webservices/rest/search",
            params={"query": f"EXT_ID:{pmid}", "format": "json"},
            timeout=10,
        )
        if response.status_code != 200:
            return None

        hits = response.json().get("resultList", {}).get("result", [])
        if not hits:
            return None

        record = hits[0]
        freely_downloadable = (
            record.get("isOpenAccess") == "Y"
            and record.get("hasPDF") == "Y"
        )
        pmcid = record.get("pmcid") if freely_downloadable else None
        if pmcid:
            logger.info(
                f"Found open access PDF via Europe PMC API: {pmcid}"
            )
            return self._download_via_europe_pmc(pmcid)

    except Exception as e:
        logger.debug(f"Europe PMC API query failed: {e}")

    return None

423 

def _get_pmc_id_from_pmid(self, pmid: str) -> Optional[str]:
    """
    Convert a PMID to a PMC ID.

    NCBI E-utilities (ELink, pubmed -> pmc) is queried first. If that
    gives nothing, the PubMed article page is fetched and the first
    ``PMC<digits>`` token found in it is used.

    Args:
        pmid: PubMed ID of the article.

    Returns:
        The PMC ID including its ``PMC`` prefix, or None when neither
        lookup produced one. Errors are logged at debug level, not raised.
    """
    try:
        elink_url = (
            "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi"
        )
        params = {
            "dbfrom": "pubmed",
            "db": "pmc",
            "id": pmid,
            "retmode": "json",
        }

        response = self.session.get(elink_url, params=params, timeout=10)

        if response.status_code == 200:
            link_sets = response.json().get("linksets", [])

            if link_sets and "linksetdbs" in link_sets[0]:
                for linksetdb in link_sets[0]["linksetdbs"]:
                    # Several ELink link types share dbto == "pmc":
                    # "pubmed_pmc" is the article's own PMC record, while
                    # e.g. "pubmed_pmc_refs" lists PMC articles that cite
                    # it. Only the former identifies this article. Entries
                    # without a linkname are still accepted so responses
                    # that omit the field behave as before.
                    if (
                        linksetdb.get("dbto") == "pmc"
                        and linksetdb.get("linkname", "pubmed_pmc")
                        == "pubmed_pmc"
                        and linksetdb.get("links")
                    ):
                        pmc_id_num = linksetdb["links"][0]
                        return f"PMC{pmc_id_num}"

    except Exception as e:
        logger.debug(f"NCBI E-utilities lookup failed: {e}")

    # Fallback: Try scraping the PubMed page
    try:
        response = self.session.get(
            f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/", timeout=10
        )

        if response.status_code == 200:
            pmc_match = re.search(r"PMC\d+", response.text)
            if pmc_match:
                return pmc_match.group(0)

    except Exception as e:
        logger.debug(f"PubMed page scraping failed: {e}")

    return None

470 

def _download_via_europe_pmc(self, pmc_id: str) -> Optional[bytes]:
    """
    Request an article's PDF from the Europe PMC render endpoint.

    Args:
        pmc_id: PMC ID including the ``PMC`` prefix.

    Returns:
        Whatever ``self._download_pdf`` returns for the render URL.
    """
    render_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
    logger.debug(f"Trying Europe PMC: {render_url}")

    payload = self._download_pdf(render_url)
    if not payload:
        # Hand the empty result back unchanged, without a success log line.
        return payload

    logger.info(f"Successfully downloaded from Europe PMC: {pmc_id}")
    return payload

483 

def _download_via_ncbi_pmc(self, pmc_id: str) -> Optional[bytes]:
    """
    Request an article's PDF from NCBI PMC.

    Two known PDF URL layouts are tried in order, each with the article
    page sent as the Referer header.

    Args:
        pmc_id: PMC ID including the ``PMC`` prefix.

    Returns:
        The first non-empty result of ``self._download_pdf``, or None.
    """
    candidates = (
        f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/pdf/",
        f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/pdf/main.pdf",
    )

    for candidate in candidates:
        logger.debug(f"Trying NCBI PMC: {candidate}")

        # A fresh header dict per attempt, pointing back at the article page.
        referer_headers = {
            "Referer": f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/"
        }
        payload = self._download_pdf(candidate, referer_headers)
        if not payload:
            continue

        logger.info(f"Successfully downloaded from NCBI PMC: {pmc_id}")
        return payload

    return None