Coverage for src / local_deep_research / research_library / downloaders / extraction / pipeline.py: 50%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Shared extraction pipeline. 

3 

4Two entry points: 

5 extract_content(html) — HTML string in, clean text out. 

6 fetch_and_extract(url) — URL in, clean text out (static + JS fallback). 

7 

8This is the single source of truth for content extraction in the project. 

9Used by HTMLDownloader, ContentFetcher, FullSearchResults, WaybackSearchEngine, 

10and any other code that needs clean text from a web page. 

11 

12Academic URLs (arXiv, PubMed, bioRxiv, etc.) are automatically routed to 

13specialized downloaders first, with the generic HTML pipeline as fallback. 

14""" 

15 

16from typing import Any, Dict, List, Optional 

17 

18from bs4 import BeautifulSoup 

19from loguru import logger 

20 

21from .trafilatura_extractor import TrafilaturaExtractor 

22from .readability_extractor import ReadabilityExtractor 

23from .justext_extractor import JustextExtractor 

24from .newspaper_extractor import NewspaperExtractor 

25from .metadata_extractor import extract_metadata, metadata_to_text 

26 

27 

# --- Pipeline thresholds ---
# Minimum extracted text length (chars) to consider extraction successful.
MIN_CONTENT_LENGTH = 50
# If content is shorter than this, enrich with structured metadata
# (JSON-LD, OpenGraph) — helps product pages, JS-heavy sites.
METADATA_ENRICHMENT_THRESHOLD = 1000
# Chars subtracted per boilerplate keyword hit when scoring extraction
# quality (see _quality_score).
BOILERPLATE_PENALTY = 500
# If an extractor returns LESS than this fraction of the previous
# extractor's output (i.e. it discarded more than 80%), skip it —
# protects non-English content from over-aggressive stoplists.
SAFETY_DISCARD_RATIO = 0.2

# Module-level singleton extractors (avoid re-creating per call).
# Stateless across calls, so sharing them is safe here.
_trafilatura = TrafilaturaExtractor()
_readability = ReadabilityExtractor()
_justext_en = JustextExtractor(language="English")
_newspaper = NewspaperExtractor()

# Boilerplate keywords — matched case-insensitively against extracted
# text to penalize cookie banners / signup prompts in quality scoring.
_BOILERPLATE_KEYWORDS = [
    "cookie",
    "sign up",
    "newsletter",
    "subscribe",
    "accept all",
    "privacy policy",
    "terms of service",
]

56 

57 

def _run_extractors_parallel(
    html: str, url: str
) -> tuple[str | None, str | None]:
    """Run the trafilatura and newspaper4k extractors back to back.

    Despite the name, execution is deliberately serial: both libraries
    drive lxml's C extension, which is not safe to share across threads.
    A ThreadPoolExecutor version crashed with "Fatal Python error:
    Aborted" on Python 3.14 (workers deadlocked in shutdown's join), so
    the calls are serialized; the cost is one extra extraction's worth
    of CPU per page. The name is kept so existing imports of this
    helper keep working.

    Args:
        html: Raw HTML to extract from.
        url: Source URL, forwarded to newspaper4k.

    Returns:
        (trafilatura text, newspaper4k text) — either element is None
        when its extractor raised.
    """
    traf_text: str | None = None
    news_text: str | None = None

    try:
        traf_text = _trafilatura.extract(html)
    except Exception:
        logger.debug("Pipeline: trafilatura raised an exception")

    try:
        news_text = _newspaper.extract(html, url)
    except Exception:
        logger.debug("Pipeline: newspaper4k raised an exception")

    return traf_text, news_text

86 

87 

88def _count_boilerplate(text: str) -> int: 

89 """Count boilerplate keyword occurrences in text.""" 

90 if not text: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 return 0 

92 lower = text.lower() 

93 return sum(1 for kw in _BOILERPLATE_KEYWORDS if kw in lower) 

94 

95 

96def _quality_score(text: str) -> int: 

97 """Score extraction quality: length minus boilerplate penalty.""" 

98 if not text: 

99 return 0 

100 return len(text) - (_count_boilerplate(text) * BOILERPLATE_PENALTY) 

101 

102 

def extract_content(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[str]:
    """Extract clean text content from HTML.

    Pipeline:
        1. trafilatura (primary — best benchmarks, multilingual, markdown)
        2. newspaper4k (also tried — strong on news/forum pages)
           → pick the higher-quality result from steps 1-2
        3. readability → justext (fallback if both above fail, with 80% safety)
        4. soup.get_text() (last resort)

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Source URL (improves newspaper4k extraction accuracy).

    Returns:
        Extracted plain text, or None if content is below min_length.
    """
    if not html or not html.strip():
        return None

    # Run trafilatura and newspaper4k (serialized — see
    # _run_extractors_parallel for why) and pick the better result.
    # newspaper4k is strong on news front pages and multi-answer threads
    # where trafilatura sometimes extracts less content.
    trafilatura_content, newspaper_content = _run_extractors_parallel(html, url)

    traf_score = _quality_score(trafilatura_content)
    np_score = _quality_score(newspaper_content)

    # Ties go to trafilatura; if both are empty/None the final length
    # checks below reject the result.
    if traf_score >= np_score and trafilatura_content:
        content = trafilatura_content
        winner = "trafilatura"
    elif newspaper_content:
        content = newspaper_content
        winner = "newspaper4k"
    else:
        content = trafilatura_content
        winner = "trafilatura"

    if content and len(content.strip()) >= min_length:
        logger.debug(
            f"Pipeline: {winner} extracted {len(content)} chars"
            + (
                f" (traf={len(trafilatura_content or '')}, "
                f"np4k={len(newspaper_content or '')})"
                if newspaper_content and trafilatura_content
                else ""
            )
        )
    else:
        # Fallback: readability → justext
        logger.debug(
            "Pipeline: primary extractors insufficient, using fallback"
        )

        # Pre-clean: drop non-content tags so the fallback extractors
        # (and the get_text() last resort) see only prose-bearing markup.
        soup = BeautifulSoup(html, "html.parser")
        for tag_name in [
            "script",
            "style",
            "iframe",
            "noscript",
            "svg",
            "form",
            "button",
            "input",
            "select",
            "textarea",
        ]:
            for tag in soup.find_all(tag_name):
                tag.decompose()
        cleaned_html = str(soup)

        # Reuse the English singleton; other languages get a fresh
        # justext instance with the matching stoplist.
        justext_extractor = (
            _justext_en
            if language == "English"
            else JustextExtractor(language=language)
        )

        content = None
        prev_text_len = 0

        # Chain: readability output feeds justext, unless a stage is
        # skipped by the safety check below.
        for extractor in [_readability, justext_extractor]:
            result = extractor.extract(
                cleaned_html if prev_text_len == 0 else content
            )
            if result and result.strip():
                result_len = len(result.strip())
                # Safety: skip if extractor discards >80% of content.
                # Compare text lengths (strip HTML tags for fair comparison
                # since readability returns HTML but justext returns text).
                if (
                    prev_text_len > 0
                    and result_len < prev_text_len * SAFETY_DISCARD_RATIO
                ):
                    logger.debug(
                        f"Pipeline: {extractor.__class__.__name__} discarded "
                        f">80% of content — skipping"
                    )
                    continue
                content = result
                # Store text-equivalent length for fair comparison
                if "<" in result:
                    prev_text_len = len(
                        BeautifulSoup(result, "html.parser").get_text()
                    )
                else:
                    prev_text_len = result_len
                logger.debug(
                    f"Pipeline: {extractor.__class__.__name__} "
                    f"returned {result_len} chars"
                )

        # Strip remaining HTML tags (e.g. readability-only mode)
        if content and "<" in content:
            content = BeautifulSoup(content, "html.parser").get_text(
                separator="\n", strip=True
            )

        # Last resort: whole-document text from the pre-cleaned soup.
        if not content or len(content.strip()) < min_length:
            logger.debug("Pipeline: all extractors failed, using get_text()")
            content = soup.get_text(separator="\n", strip=True)

    if not content or len(content.strip()) < min_length:
        return None

    # Enrich with structured metadata when text extraction is thin
    # (e.g. product pages, JS-heavy sites)
    if len(content.strip()) < METADATA_ENRICHMENT_THRESHOLD:
        metadata = extract_metadata(html)
        supplement = metadata_to_text(metadata)
        if supplement and supplement.strip():
            logger.debug(
                f"Pipeline: enriching with {len(supplement)} chars "
                f"of structured metadata"
            )
            content = content.rstrip() + "\n\n" + supplement

    return content

249 

250 

def extract_content_with_metadata(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
) -> Optional[Dict[str, Any]]:
    """Extract clean text plus page title/description in one pass.

    Runs the shared content pipeline and, from the same HTML, pulls the
    title and description out of <title>/<meta> tags so callers don't
    need a second BeautifulSoup parse of their own.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.

    Returns:
        Dict with keys: content, title, description — or None if content
        is below min_length.
    """
    if not html or not html.strip():
        return None

    # Single parse for metadata (title, description, og:*)
    parsed = BeautifulSoup(html, "html.parser")

    def _meta_content(tag) -> Optional[str]:
        # Stripped content= of a meta tag; None when tag is absent or
        # its content attribute is missing/empty.
        if tag and tag.get("content"):
            return str(tag["content"]).strip()
        return None

    # <title> first; og:title overrides it when present.
    title = None
    if parsed.title and parsed.title.string:
        title = parsed.title.string.strip()
    og_title = _meta_content(parsed.find("meta", property="og:title"))
    if og_title is not None:
        title = og_title

    # meta description first; og:description overrides when present.
    description = _meta_content(
        parsed.find("meta", attrs={"name": "description"})
    )
    og_desc = _meta_content(parsed.find("meta", property="og:description"))
    if og_desc is not None:
        description = og_desc

    # Extract content using the shared pipeline
    content = extract_content(html, language=language, min_length=min_length)
    if not content:
        return None

    return {
        "title": title,
        "description": description,
        "content": content,
    }

302 

303 

def _try_specialized_downloader(url: str, timeout: int = 30) -> Optional[str]:
    """Try a specialized downloader (arXiv, PubMed, etc.) for the URL.

    Returns extracted text if a specialized downloader handles this URL
    and succeeds, or None to signal "fall back to generic HTML pipeline".

    Args:
        url: The URL to classify and possibly download.
        timeout: Per-request timeout in seconds, passed to the downloader.

    Returns:
        Decoded text of at least MIN_CONTENT_LENGTH chars, or None.
    """
    # The classifier lives in another package; if it isn't importable,
    # treat that as "no specialized handling" rather than an error.
    try:
        from local_deep_research.content_fetcher.url_classifier import (
            URLClassifier,
            URLType,
        )
    except ImportError:
        return None

    url_type = URLClassifier.classify(url)

    # Only academic URL types have specialized downloaders worth trying.
    # HTML, DOI, PDF, INVALID fall through to the generic pipeline.
    _SPECIALIZED_TYPES = {
        URLType.ARXIV,
        URLType.PUBMED,
        URLType.PMC,
        URLType.SEMANTIC_SCHOLAR,
        URLType.BIORXIV,
        URLType.MEDRXIV,
    }
    if url_type not in _SPECIALIZED_TYPES:
        return None

    # Map URL type to downloader class (lazy imports to avoid circular deps)
    downloader = None
    try:
        if url_type == URLType.ARXIV:
            from ..arxiv import ArxivDownloader

            downloader = ArxivDownloader(timeout=timeout)
        elif url_type in (URLType.PUBMED, URLType.PMC):
            from ..pubmed import PubMedDownloader

            downloader = PubMedDownloader(timeout=timeout)
        elif url_type == URLType.SEMANTIC_SCHOLAR:
            from ..semantic_scholar import SemanticScholarDownloader

            downloader = SemanticScholarDownloader(timeout=timeout)
        elif url_type in (URLType.BIORXIV, URLType.MEDRXIV):
            from ..biorxiv import BioRxivDownloader

            downloader = BioRxivDownloader(timeout=timeout)
    except ImportError:
        # Optional dependency for this downloader isn't installed —
        # degrade gracefully to the generic pipeline.
        logger.debug(
            f"Pipeline: specialized downloader not available for {url_type.value}"
        )
        return None

    if not downloader:
        return None

    try:
        from ..base import ContentType

        result = downloader.download_with_result(url, ContentType.TEXT)
        if result.is_success and result.content:
            text = result.content.decode("utf-8", errors="replace")
            # Too-short results fall through to the fallback log below.
            if len(text.strip()) >= MIN_CONTENT_LENGTH:
                logger.debug(
                    f"Pipeline: specialized downloader ({url_type.value}) "
                    f"returned {len(text)} chars for {url}"
                )
                return text
    except Exception:
        logger.debug(
            f"Pipeline: specialized downloader failed for {url}",
            exc_info=True,
        )
    finally:
        # Best-effort close — a failing close must not mask the result.
        try:
            downloader.close()
        except Exception:  # noqa: silent-exception
            pass

    # Specialized downloader didn't produce content — fall back to HTML
    logger.debug(
        f"Pipeline: specialized downloader ({url_type.value}) returned "
        f"no content for {url}, falling back to HTML pipeline"
    )
    return None

390 

391 

def fetch_and_extract(
    url: str,
    timeout: int = 30,
    language: str = "English",
) -> Optional[str]:
    """Fetch a URL and return clean extracted text.

    Order of attack:
        1. Specialized downloader (arXiv PDF, PubMed API, etc.) when the
           URL matches an academic source.
        2. Generic pipeline: static HTTP fetch with Playwright fallback
           for JS-heavy pages, then trafilatura → readability → justext.

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.
        language: Language for justext stoplist.

    Returns:
        Extracted plain text, or None if fetch or extraction failed.
    """
    # Academic URLs short-circuit to their dedicated downloaders.
    text = _try_specialized_downloader(url, timeout=timeout)
    if text:
        return text

    # Generic HTML pipeline (lazy import avoids circular dependencies).
    from ..playwright_html import AutoHTMLDownloader

    html_downloader = AutoHTMLDownloader(
        timeout=timeout,
        language=language,
    )
    try:
        # download() returns extracted text as UTF-8 bytes, not raw HTML:
        # AutoHTMLDownloader inherits HTMLDownloader.download(), which
        # runs _fetch_html() → _extract_content() → the full pipeline.
        payload = html_downloader.download(url)
        if not payload:
            return None
        return payload.decode("utf-8", errors="replace")
    except Exception:
        logger.exception(f"fetch_and_extract failed for {url}")
        return None
    finally:
        # Best-effort cleanup; a close failure must not mask the result.
        try:
            html_downloader.close()
        except Exception:
            logger.debug("Failed to close downloader in fetch_and_extract")

440 

441 

def batch_fetch_and_extract(
    urls: List[str],
    timeout: int = 30,
    language: str = "English",
) -> Dict[str, Optional[str]]:
    """Fetch multiple URLs and extract clean text from each.

    Per URL:
        1. Specialized downloader (arXiv, PubMed, etc.) when it matches.
        2. Otherwise the generic HTML pipeline (AutoHTMLDownloader).

    A single AutoHTMLDownloader (and therefore a single Playwright
    browser, if the JS fallback fires) serves all generic HTML URLs.

    Args:
        urls: List of URLs to fetch.
        timeout: Request timeout in seconds per URL.
        language: Language for justext stoplist.

    Returns:
        Dict mapping URL → extracted text (or None if failed).
    """
    from ..playwright_html import AutoHTMLDownloader

    results: Dict[str, Optional[str]] = {}

    # Pass 1: specialized downloaders; anything unhandled is queued for
    # the generic HTML pipeline.
    pending: List[str] = []
    for target in urls:
        text = None
        try:
            text = _try_specialized_downloader(target, timeout=timeout)
        except Exception:
            logger.debug(
                f"Pipeline: specialized downloader error for {target}",
                exc_info=True,
            )
        if text:
            results[target] = text
        else:
            pending.append(target)

    # Pass 2: one shared downloader for the remaining URLs.
    if pending:
        fetcher = AutoHTMLDownloader(
            timeout=timeout,
            language=language,
        )
        try:
            for target in pending:
                try:
                    raw = fetcher.download(target)
                    results[target] = (
                        raw.decode("utf-8", errors="replace") if raw else None
                    )
                except Exception:
                    logger.exception(
                        f"batch_fetch_and_extract failed for {target}"
                    )
                    results[target] = None
        finally:
            # Close once, after all URLs — failure here is non-fatal.
            try:
                fetcher.close()
            except Exception:
                logger.debug(
                    "Failed to close downloader in batch_fetch_and_extract"
                )

    return results