Coverage for src/local_deep_research/research_library/downloaders/extraction/pipeline.py: 87%

202 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Shared extraction pipeline. 

3 

4Two entry points: 

5 extract_content(html) — HTML string in, clean text out. 

6 fetch_and_extract(url) — URL in, clean text out (static + JS fallback). 

7 

8This is the single source of truth for content extraction in the project. 

9Used by HTMLDownloader, ContentFetcher, FullSearchResults, WaybackSearchEngine, 

10and any other code that needs clean text from a web page. 

11 

12Academic URLs (arXiv, PubMed, bioRxiv, etc.) are automatically routed to 

13specialized downloaders first, with the generic HTML pipeline as fallback. 

14""" 

15 

16from typing import Any, Dict, List, Optional 

17 

18from bs4 import BeautifulSoup 

19from loguru import logger 

20 

21from .trafilatura_extractor import TrafilaturaExtractor 

22from .readability_extractor import ReadabilityExtractor 

23from .justext_extractor import JustextExtractor 

24from .newspaper_extractor import NewspaperExtractor 

25from .metadata_extractor import extract_metadata, metadata_to_text 

26 

27 

# --- Pipeline thresholds ---
# Minimum extracted text length (after stripping whitespace) to consider
# extraction successful
MIN_CONTENT_LENGTH = 50
# If content is shorter than this, enrich with structured metadata
# (JSON-LD, OpenGraph) — helps product pages, JS-heavy sites
METADATA_ENRICHMENT_THRESHOLD = 1000
# Boilerplate penalty per keyword when scoring extraction quality:
# each keyword found in the text subtracts this many points from the
# length-based score
BOILERPLATE_PENALTY = 500
# Minimum fraction of the previous fallback extractor's text that the next
# extractor must retain. A result shorter than this fraction (i.e. one that
# discards more than 80% of the content) is skipped (protects non-English
# content)
SAFETY_DISCARD_RATIO = 0.2

# Module-level singleton extractors (avoid re-creating per call)
_trafilatura = TrafilaturaExtractor()
_readability = ReadabilityExtractor()
_justext_en = JustextExtractor(language="English")
_newspaper = NewspaperExtractor()

# Boilerplate keywords — used to penalize low-quality extraction.
# Matched as case-insensitive substrings; a keyword is penalized once no
# matter how many times it appears.
_BOILERPLATE_KEYWORDS = [
    "cookie",
    "sign up",
    "newsletter",
    "subscribe",
    "accept all",
    "privacy policy",
    "terms of service",
]

56 

57 

def _run_extractors_parallel(
    html: str, url: str
) -> tuple[str | None, str | None]:
    """Run trafilatura, then newspaper4k, one after the other.

    Despite the name, nothing here is concurrent. Both extractors call
    into lxml's C extension, which is not safe to share across threads;
    an earlier ThreadPoolExecutor version caused "Fatal Python error:
    Aborted" during pool teardown on Python 3.14 (workers deadlocked in
    shutdown's join). Running the calls back to back removes the crash at
    the cost of one extra extraction's worth of CPU per page.

    The name is kept for backwards compatibility with callers/tests that
    import it directly.

    Args:
        html: Raw HTML passed to both extractors.
        url: Source URL, passed to newspaper4k only.

    Returns:
        ``(trafilatura_result, newspaper4k_result)``; an entry is None
        when that extractor raised.
    """
    # (call, message logged on failure) — evaluated strictly in this order.
    attempts = (
        (
            lambda: _trafilatura.extract(html),
            "Pipeline: trafilatura raised an exception",
        ),
        (
            lambda: _newspaper.extract(html, url),
            "Pipeline: newspaper4k raised an exception",
        ),
    )

    outputs = []
    for run_extractor, failure_note in attempts:
        try:
            outputs.append(run_extractor())
        except Exception:
            logger.debug(failure_note)
            outputs.append(None)

    from_trafilatura, from_newspaper = outputs
    return from_trafilatura, from_newspaper

86 

87 

88def _count_boilerplate(text: str) -> int: 

89 """Count boilerplate keyword occurrences in text.""" 

90 if not text: 

91 return 0 

92 lower = text.lower() 

93 return sum(1 for kw in _BOILERPLATE_KEYWORDS if kw in lower) 

94 

95 

96def _quality_score(text: str) -> int: 

97 """Score extraction quality: length minus boilerplate penalty.""" 

98 if not text: 

99 return 0 

100 return len(text) - (_count_boilerplate(text) * BOILERPLATE_PENALTY) 

101 

102 

def extract_content(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[str]:
    """Extract clean text content from HTML.

    Pipeline:
      1. trafilatura (primary — best benchmarks, multilingual, markdown)
      2. newspaper4k (run right after trafilatura — strong on news/forum
         pages)
         → pick the higher-quality result from steps 1-2
      3. readability → justext (fallback when the pick is missing or
         shorter than ``min_length``, with 80% safety)
      4. soup.get_text() (last resort)

    A failing fallback extractor is logged and skipped, so an exception in
    readability or justext still lets the last-resort step run.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Source URL (improves newspaper4k extraction accuracy).

    Returns:
        Extracted text (with structured metadata appended when the text is
        shorter than ``METADATA_ENRICHMENT_THRESHOLD``), or None if the
        input is blank or the content is below ``min_length``.
    """
    if not html or not html.strip():
        return None

    # Run trafilatura and newspaper4k (sequentially, despite the helper's
    # name) and pick the better result. newspaper4k is strong on news front
    # pages and multi-answer threads where trafilatura sometimes extracts
    # less content.
    trafilatura_content, newspaper_content = _run_extractors_parallel(html, url)

    traf_score = _quality_score(trafilatura_content)
    np_score = _quality_score(newspaper_content)

    if traf_score >= np_score and trafilatura_content:
        content = trafilatura_content
        winner = "trafilatura"
    elif newspaper_content:
        content = newspaper_content
        winner = "newspaper4k"
    else:
        # Reached when newspaper4k produced nothing; trafilatura's output
        # (possibly empty, possibly negatively scored) is all that is left.
        content = trafilatura_content
        winner = "trafilatura"

    if content and len(content.strip()) >= min_length:
        logger.debug(
            f"Pipeline: {winner} extracted {len(content)} chars"
            + (
                f" (traf={len(trafilatura_content or '')}, "
                f"np4k={len(newspaper_content or '')})"
                if newspaper_content and trafilatura_content
                else ""
            )
        )
    else:
        # Fallback: readability → justext
        logger.debug(
            "Pipeline: primary extractors insufficient, using fallback"
        )

        # Drop non-content elements before handing HTML to the fallbacks.
        soup = BeautifulSoup(html, "html.parser")
        for tag_name in [
            "script",
            "style",
            "iframe",
            "noscript",
            "svg",
            "form",
            "button",
            "input",
            "select",
            "textarea",
        ]:
            for tag in soup.find_all(tag_name):
                tag.decompose()
        cleaned_html = str(soup)

        # Building a non-default justext extractor may fail (for example on
        # an unsupported language); treat that like any extractor failure.
        try:
            justext_extractor = (
                _justext_en
                if language == "English"
                else JustextExtractor(language=language)
            )
        except Exception:
            logger.debug(
                "Pipeline: could not create justext extractor, skipping it"
            )
            justext_extractor = None

        fallback_extractors = [_readability]
        if justext_extractor is not None:
            fallback_extractors.append(justext_extractor)

        content = None
        prev_text_len = 0

        for extractor in fallback_extractors:
            extractor_name = extractor.__class__.__name__
            # Chain extractors: feed the previous accepted output forward,
            # or the cleaned HTML when nothing usable has been produced yet.
            try:
                result = extractor.extract(
                    cleaned_html if prev_text_len == 0 else content
                )
            except Exception:
                # Same best-effort policy as the primary extractors.
                logger.debug(
                    f"Pipeline: {extractor_name} raised an exception"
                )
                continue

            if not (result and result.strip()):
                continue

            result_len = len(result.strip())
            # Safety: skip if extractor discards >80% of content.
            # Compare text lengths (strip HTML tags for fair comparison
            # since readability returns HTML but justext returns text).
            if (
                prev_text_len > 0
                and result_len < prev_text_len * SAFETY_DISCARD_RATIO
            ):
                logger.debug(
                    f"Pipeline: {extractor_name} discarded "
                    f">80% of content — skipping"
                )
                continue

            content = result
            # Store text-equivalent length for fair comparison
            if "<" in result:
                prev_text_len = len(
                    BeautifulSoup(result, "html.parser").get_text()
                )
            else:
                prev_text_len = result_len
            logger.debug(
                f"Pipeline: {extractor_name} "
                f"returned {result_len} chars"
            )

        # Strip remaining HTML tags (e.g. readability-only mode)
        if content and "<" in content:
            content = BeautifulSoup(content, "html.parser").get_text(
                separator="\n", strip=True
            )

        # Last resort
        if not content or len(content.strip()) < min_length:
            logger.debug("Pipeline: all extractors failed, using get_text()")
            content = soup.get_text(separator="\n", strip=True)

    if not content or len(content.strip()) < min_length:
        return None

    # Enrich with structured metadata when text extraction is thin
    # (e.g. product pages, JS-heavy sites)
    if len(content.strip()) < METADATA_ENRICHMENT_THRESHOLD:
        metadata = extract_metadata(html)
        supplement = metadata_to_text(metadata)
        if supplement and supplement.strip():
            logger.debug(
                f"Pipeline: enriching with {len(supplement)} chars "
                f"of structured metadata"
            )
            content = content.rstrip() + "\n\n" + supplement

    return content

249 

250 

def extract_content_with_metadata(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[Dict[str, Any]]:
    """Extract clean text plus page title and description from HTML.

    Combines content extraction (the shared ``extract_content`` pipeline)
    with title and description extraction from HTML meta tags, so callers
    do not need their own BeautifulSoup parse for metadata.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Optional source URL, forwarded to ``extract_content``.
            Defaults to "" (the value previously always used).

    Returns:
        Dict with keys: title, description, content — or None if the input
        is blank or the content is below min_length. ``title`` and
        ``description`` are None when the page does not provide them.
    """
    if not html or not html.strip():
        return None

    # Extract content first: when it fails the result is None regardless,
    # so the metadata parse below would be wasted work.
    content = extract_content(
        html, language=language, min_length=min_length, url=url
    )
    if not content:
        return None

    # Parse once for metadata (title, description, og:*)
    soup = BeautifulSoup(html, "html.parser")

    # <title> is the baseline; og:title takes precedence when present.
    title = None
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    og_title = soup.find("meta", property="og:title")
    if og_title and og_title.get("content"):
        title = str(og_title["content"]).strip()

    # Likewise, og:description overrides the plain meta description.
    description = None
    meta_desc = soup.find("meta", attrs={"name": "description"})
    if meta_desc and meta_desc.get("content"):
        description = str(meta_desc["content"]).strip()
    og_desc = soup.find("meta", property="og:description")
    if og_desc and og_desc.get("content"):
        description = str(og_desc["content"]).strip()

    return {
        "title": title,
        "description": description,
        "content": content,
    }

302 

303 

304def _try_specialized_downloader(url: str, timeout: int = 30) -> Optional[str]: 

305 """Try a specialized downloader (arXiv, PubMed, etc.) for the URL. 

306 

307 Returns extracted text if a specialized downloader handles this URL 

308 and succeeds, or None to signal "fall back to generic HTML pipeline". 

309 """ 

310 try: 

311 from local_deep_research.content_fetcher.url_classifier import ( 

312 URLClassifier, 

313 URLType, 

314 ) 

315 except ImportError: 

316 return None 

317 

318 url_type = URLClassifier.classify(url) 

319 

320 # Only academic URL types have specialized downloaders worth trying. 

321 # HTML, DOI, PDF, INVALID fall through to the generic pipeline. 

322 _SPECIALIZED_TYPES = { 

323 URLType.ARXIV, 

324 URLType.PUBMED, 

325 URLType.PMC, 

326 URLType.SEMANTIC_SCHOLAR, 

327 URLType.BIORXIV, 

328 URLType.MEDRXIV, 

329 } 

330 if url_type not in _SPECIALIZED_TYPES: 

331 return None 

332 

333 # Map URL type to downloader class (lazy imports to avoid circular deps) 

334 downloader = None 

335 try: 

336 if url_type == URLType.ARXIV: 336 ↛ 340line 336 didn't jump to line 340 because the condition on line 336 was always true

337 from ..arxiv import ArxivDownloader 

338 

339 downloader = ArxivDownloader(timeout=timeout) 

340 elif url_type in (URLType.PUBMED, URLType.PMC): 

341 from ..pubmed import PubMedDownloader 

342 

343 downloader = PubMedDownloader(timeout=timeout) 

344 elif url_type == URLType.SEMANTIC_SCHOLAR: 

345 from ..semantic_scholar import SemanticScholarDownloader 

346 

347 downloader = SemanticScholarDownloader(timeout=timeout) 

348 elif url_type in (URLType.BIORXIV, URLType.MEDRXIV): 

349 from ..biorxiv import BioRxivDownloader 

350 

351 downloader = BioRxivDownloader(timeout=timeout) 

352 except ImportError: 

353 logger.debug( 

354 f"Pipeline: specialized downloader not available for {url_type.value}" 

355 ) 

356 return None 

357 

358 if not downloader: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 return None 

360 

361 try: 

362 from ..base import ContentType 

363 

364 result = downloader.download_with_result(url, ContentType.TEXT) 

365 if result.is_success and result.content: 365 ↛ 379line 365 didn't jump to line 379 because the condition on line 365 was always true

366 text = result.content.decode("utf-8", errors="replace") 

367 if len(text.strip()) >= MIN_CONTENT_LENGTH: 367 ↛ 379line 367 didn't jump to line 379 because the condition on line 367 was always true

368 logger.debug( 

369 f"Pipeline: specialized downloader ({url_type.value}) " 

370 f"returned {len(text)} chars for {url}" 

371 ) 

372 return text 

373 except Exception: 

374 logger.debug( 

375 f"Pipeline: specialized downloader failed for {url}", 

376 exc_info=True, 

377 ) 

378 finally: 

379 try: 

380 downloader.close() 

381 except Exception: # noqa: silent-exception 

382 pass 

383 

384 # Specialized downloader didn't produce content — fall back to HTML 

385 logger.debug( 

386 f"Pipeline: specialized downloader ({url_type.value}) returned " 

387 f"no content for {url}, falling back to HTML pipeline" 

388 ) 

389 return None 

390 

391 

def fetch_and_extract(
    url: str,
    timeout: int = 30,
    language: str = "English",
    enable_js_rendering: bool = False,
) -> Optional[str]:
    """Fetch a URL and extract clean text content.

    Pipeline:
      1. Specialized downloader (arXiv PDF, PubMed API, etc.) if URL matches
      2. Static HTTP fetch → Playwright fallback (if JS needed and
         ``enable_js_rendering`` is True) → trafilatura → readability →
         justext

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.
        language: Language for justext stoplist.
        enable_js_rendering: When True, the HTML pipeline falls back to a
            headless browser for pages that need JavaScript. Defaults to
            False because the default Docker production image ships without
            Chromium. Limited internal benchmark comparisons (dev instances
            with Chromium vs Docker without) showed no measurable
            research-quality improvement from JS rendering, and most regular
            benchmark runs are on Docker without Chromium anyway. The
            user-facing toggle is ``web.enable_javascript_rendering``.

    Returns:
        Extracted plain text, or None if fetch or extraction failed.
    """
    # Try specialized downloader first (arXiv, PubMed, etc.). Guarded the
    # same way as in batch_fetch_and_extract so an unexpected error here
    # degrades to the generic HTML pipeline instead of escaping.
    try:
        specialized = _try_specialized_downloader(url, timeout=timeout)
    except Exception:
        logger.opt(exception=True).debug(
            "Pipeline: specialized downloader error for {}", url
        )
        specialized = None
    if specialized:
        return specialized

    # Generic HTML pipeline
    from ..playwright_html import AutoHTMLDownloader

    downloader = AutoHTMLDownloader(
        timeout=timeout,
        language=language,
        enable_js_rendering=enable_js_rendering,
    )
    try:
        # download() returns extracted text as UTF-8 bytes (not raw HTML):
        # AutoHTMLDownloader inherits HTMLDownloader.download() which runs
        # _fetch_html() → _extract_content() → the full extraction pipeline.
        result = downloader.download(url)
        if result:
            return result.decode("utf-8", errors="replace")
        return None
    except Exception:
        logger.exception(f"fetch_and_extract failed for {url}")
        return None
    finally:
        # Always release the downloader; a failing close is only logged.
        try:
            downloader.close()
        except Exception:
            logger.debug("Failed to close downloader in fetch_and_extract")

451 

452 

def batch_fetch_and_extract(
    urls: List[str],
    timeout: int = 30,
    language: str = "English",
    enable_js_rendering: bool = False,
) -> Dict[str, Optional[str]]:
    """Fetch multiple URLs and extract clean text from each.

    For each URL:
      1. Try specialized downloader (arXiv, PubMed, etc.) if URL matches
      2. Fall back to generic HTML pipeline (AutoHTMLDownloader)

    Uses a single AutoHTMLDownloader (and thus a single Playwright
    browser if JS fallback is triggered) for the generic HTML URLs.
    Duplicate URLs in ``urls`` are fetched only once.

    Args:
        urls: List of URLs to fetch.
        timeout: Request timeout in seconds per URL.
        language: Language for justext stoplist.
        enable_js_rendering: When True, the HTML pipeline falls back to a
            headless browser for pages that need JavaScript. Defaults to
            False because the default Docker production image ships without
            Chromium. Limited internal benchmark comparisons (dev instances
            with Chromium vs Docker without) showed no measurable
            research-quality improvement from JS rendering, and most regular
            benchmark runs are on Docker without Chromium anyway. The
            user-facing toggle is ``web.enable_javascript_rendering``.

    Returns:
        Dict mapping URL → extracted text (or None if failed). An empty
        input yields an empty dict.
    """
    results: Dict[str, Optional[str]] = {}

    # Try specialized downloaders first — collect URLs that need HTML fallback
    html_urls: List[str] = []
    # dict.fromkeys de-duplicates while keeping first-seen order; the result
    # is keyed by URL anyway, so repeated fetches would only waste requests.
    for url in dict.fromkeys(urls):
        try:
            specialized = _try_specialized_downloader(url, timeout=timeout)
            if specialized:
                results[url] = specialized
                continue
        except Exception:
            # loguru has no ``exc_info`` flag: passing it as a keyword makes
            # loguru run ``str.format`` on the message (which can raise when
            # the URL contains braces) and attaches no traceback.
            logger.opt(exception=True).debug(
                "Pipeline: specialized downloader error for {}", url
            )
        html_urls.append(url)

    # Generic HTML pipeline for remaining URLs
    if html_urls:
        # Imported only when an HTML fetch is actually needed.
        from ..playwright_html import AutoHTMLDownloader

        downloader = AutoHTMLDownloader(
            timeout=timeout,
            language=language,
            enable_js_rendering=enable_js_rendering,
        )
        try:
            for url in html_urls:
                # One failing URL must not abort the rest of the batch.
                try:
                    data = downloader.download(url)
                    if data:
                        results[url] = data.decode("utf-8", errors="replace")
                    else:
                        results[url] = None
                except Exception:
                    logger.exception(
                        f"batch_fetch_and_extract failed for {url}"
                    )
                    results[url] = None
        finally:
            try:
                downloader.close()
            except Exception:
                logger.debug(
                    "Failed to close downloader in batch_fetch_and_extract"
                )

    return results