Coverage for src/local_deep_research/research_library/downloaders/extraction/pipeline.py: 50%
202 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Shared extraction pipeline.

Two entry points:
    extract_content(html) — HTML string in, clean text out.
    fetch_and_extract(url) — URL in, clean text out (static + JS fallback).

This is the single source of truth for content extraction in the project.
Used by HTMLDownloader, ContentFetcher, FullSearchResults, WaybackSearchEngine,
and any other code that needs clean text from a web page.

Academic URLs (arXiv, PubMed, bioRxiv, etc.) are automatically routed to
specialized downloaders first, with the generic HTML pipeline as fallback.
"""
16from typing import Any, Dict, List, Optional
18from bs4 import BeautifulSoup
19from loguru import logger
21from .trafilatura_extractor import TrafilaturaExtractor
22from .readability_extractor import ReadabilityExtractor
23from .justext_extractor import JustextExtractor
24from .newspaper_extractor import NewspaperExtractor
25from .metadata_extractor import extract_metadata, metadata_to_text
# --- Pipeline thresholds ---
# Minimum extracted text length to consider extraction successful.
MIN_CONTENT_LENGTH = 50
# If content is shorter than this, enrich with structured metadata
# (JSON-LD, OpenGraph) — helps product pages, JS-heavy sites.
METADATA_ENRICHMENT_THRESHOLD = 1000
# Boilerplate penalty (in chars) per keyword when scoring extraction quality.
BOILERPLATE_PENALTY = 500
# Safety valve for the fallback chain: if an extractor *retains* less than
# this fraction of the previous extractor's output (i.e. discards >80%),
# skip its result (protects non-English content from aggressive stoplists).
SAFETY_DISCARD_RATIO = 0.2

# Module-level singleton extractors (avoid re-creating per call).
_trafilatura = TrafilaturaExtractor()
_readability = ReadabilityExtractor()
# English justext is by far the common case, so it gets a shared singleton;
# other languages get a fresh instance per call in extract_content().
_justext_en = JustextExtractor(language="English")
_newspaper = NewspaperExtractor()

# Boilerplate keywords — used to penalize low-quality extraction.
# Matched case-insensitively as substrings (see _count_boilerplate).
_BOILERPLATE_KEYWORDS = [
    "cookie",
    "sign up",
    "newsletter",
    "subscribe",
    "accept all",
    "privacy policy",
    "terms of service",
]
def _run_extractors_parallel(
    html: str, url: str
) -> tuple[str | None, str | None]:
    """Run trafilatura and newspaper4k one after the other.

    Despite the name, the two extractors are deliberately serialized:
    both go through lxml's C extension, which is not safe to share
    across threads — a ThreadPoolExecutor version crashed with
    "Fatal Python error: Aborted" during pool teardown on Python 3.14
    (workers deadlocked in shutdown's join). Running them in sequence
    removes the crash at the cost of one extra extraction's worth of
    CPU per page, which is acceptable.

    The name is kept for backwards compatibility with callers/tests
    that import it directly.

    Args:
        html: Raw HTML string to extract from.
        url: Source URL (passed to newspaper4k to improve accuracy).

    Returns:
        Tuple of (trafilatura text or None, newspaper4k text or None);
        an entry is None when that extractor raised.
    """

    def _guarded(run, failure_msg):
        # Run one extractor, converting any exception into None so the
        # other extractor still gets its chance.
        try:
            return run()
        except Exception:
            logger.debug(failure_msg)
            return None

    traf_text = _guarded(
        lambda: _trafilatura.extract(html),
        "Pipeline: trafilatura raised an exception",
    )
    news_text = _guarded(
        lambda: _newspaper.extract(html, url),
        "Pipeline: newspaper4k raised an exception",
    )
    return traf_text, news_text
def _count_boilerplate(text: str) -> int:
    """Count how many boilerplate keywords appear in *text*.

    Each keyword in _BOILERPLATE_KEYWORDS counts at most once,
    matched case-insensitively as a substring. Empty/None text
    scores 0.
    """
    if not text:
        return 0
    haystack = text.lower()
    hits = 0
    for keyword in _BOILERPLATE_KEYWORDS:
        if keyword in haystack:
            hits += 1
    return hits
def _quality_score(text: str) -> int:
    """Score extraction quality: text length minus a boilerplate penalty.

    Higher is better; empty/None text scores 0. The penalty is
    BOILERPLATE_PENALTY chars per distinct boilerplate keyword found,
    so a short extraction full of cookie-banner text loses to a longer
    clean one.
    """
    if not text:
        return 0
    penalty = _count_boilerplate(text) * BOILERPLATE_PENALTY
    return len(text) - penalty
def extract_content(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[str]:
    """Extract clean text content from HTML.

    Pipeline:
        1. trafilatura (primary — best benchmarks, multilingual, markdown)
        2. newspaper4k (strong on news/forum pages)
           → pick the higher-quality result from steps 1-2
        3. readability → justext (fallback if both above fail, with a
           safety valve against extractors that discard >80% of content)
        4. soup.get_text() (last resort)

    Short results (< METADATA_ENRICHMENT_THRESHOLD chars) are enriched
    with structured metadata (JSON-LD / OpenGraph) extracted from the
    original HTML.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Source URL (improves newspaper4k extraction accuracy).

    Returns:
        Extracted plain text, or None if content is below min_length.
    """
    if not html or not html.strip():
        return None

    # Run trafilatura and newspaper4k and pick the better result.
    # (Despite the helper's name they run sequentially — see its
    # docstring.) newspaper4k is strong on news front pages and
    # multi-answer threads where trafilatura sometimes extracts less.
    trafilatura_content, newspaper_content = _run_extractors_parallel(html, url)

    traf_score = _quality_score(trafilatura_content)
    np_score = _quality_score(newspaper_content)

    # Ties go to trafilatura; if both are None/empty, the else branch
    # leaves content falsy and we drop into the fallback chain below.
    if traf_score >= np_score and trafilatura_content:
        content = trafilatura_content
        winner = "trafilatura"
    elif newspaper_content:
        content = newspaper_content
        winner = "newspaper4k"
    else:
        content = trafilatura_content
        winner = "trafilatura"

    if content and len(content.strip()) >= min_length:
        logger.debug(
            f"Pipeline: {winner} extracted {len(content)} chars"
            + (
                f" (traf={len(trafilatura_content or '')}, "
                f"np4k={len(newspaper_content or '')})"
                if newspaper_content and trafilatura_content
                else ""
            )
        )
    else:
        # Fallback: readability → justext
        logger.debug(
            "Pipeline: primary extractors insufficient, using fallback"
        )

        # Strip non-content tags before the fallback extractors see
        # the page — scripts/forms/etc. only add noise.
        soup = BeautifulSoup(html, "html.parser")
        for tag_name in [
            "script",
            "style",
            "iframe",
            "noscript",
            "svg",
            "form",
            "button",
            "input",
            "select",
            "textarea",
        ]:
            for tag in soup.find_all(tag_name):
                tag.decompose()
        cleaned_html = str(soup)

        # Reuse the English singleton; other languages get a fresh
        # JustextExtractor with the matching stoplist.
        justext_extractor = (
            _justext_en
            if language == "English"
            else JustextExtractor(language=language)
        )

        content = None
        prev_text_len = 0

        # Chain: readability runs on the cleaned HTML; justext then
        # refines readability's output (or the cleaned HTML if
        # readability produced nothing).
        for extractor in [_readability, justext_extractor]:
            result = extractor.extract(
                cleaned_html if prev_text_len == 0 else content
            )
            if result and result.strip():
                result_len = len(result.strip())
                # Safety: skip if extractor discards >80% of content
                # (result_len < prev_text_len * SAFETY_DISCARD_RATIO).
                # Compare text lengths (strip HTML tags for fair
                # comparison since readability returns HTML but
                # justext returns text).
                if (
                    prev_text_len > 0
                    and result_len < prev_text_len * SAFETY_DISCARD_RATIO
                ):
                    logger.debug(
                        f"Pipeline: {extractor.__class__.__name__} discarded "
                        f">80% of content — skipping"
                    )
                    continue
                content = result
                # Store text-equivalent length for fair comparison
                # in the next iteration's safety check.
                if "<" in result:
                    prev_text_len = len(
                        BeautifulSoup(result, "html.parser").get_text()
                    )
                else:
                    prev_text_len = result_len
                logger.debug(
                    f"Pipeline: {extractor.__class__.__name__} "
                    f"returned {result_len} chars"
                )

        # Strip remaining HTML tags (e.g. readability-only mode)
        if content and "<" in content:
            content = BeautifulSoup(content, "html.parser").get_text(
                separator="\n", strip=True
            )

        # Last resort: raw text of the cleaned soup.
        if not content or len(content.strip()) < min_length:
            logger.debug("Pipeline: all extractors failed, using get_text()")
            content = soup.get_text(separator="\n", strip=True)

    if not content or len(content.strip()) < min_length:
        return None

    # Enrich with structured metadata when text extraction is thin
    # (e.g. product pages, JS-heavy sites)
    if len(content.strip()) < METADATA_ENRICHMENT_THRESHOLD:
        metadata = extract_metadata(html)
        supplement = metadata_to_text(metadata)
        if supplement and supplement.strip():
            logger.debug(
                f"Pipeline: enriching with {len(supplement)} chars "
                f"of structured metadata"
            )
            content = content.rstrip() + "\n\n" + supplement

    return content
def extract_content_with_metadata(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
) -> Optional[Dict[str, Any]]:
    """Extract clean text and page metadata from HTML in a single pass.

    Combines content extraction (trafilatura/readability/justext pipeline)
    with title and description extraction from HTML meta tags, so callers
    don't need a second BeautifulSoup parse of their own. OpenGraph values
    (og:title / og:description) take precedence over the plain <title> and
    <meta name="description"> tags when present.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.

    Returns:
        Dict with keys: content, title, description — or None if content
        is below min_length.
    """
    if not html or not html.strip():
        return None

    # Single parse for metadata (title, description, og:*)
    soup = BeautifulSoup(html, "html.parser")

    def _meta_value(tag) -> Optional[str]:
        # Stripped content attribute of a <meta> tag; None when the tag
        # is missing or its content attribute is empty/absent.
        if tag and tag.get("content"):
            return str(tag["content"]).strip()
        return None

    title = None
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    og_title = _meta_value(soup.find("meta", property="og:title"))
    if og_title is not None:
        title = og_title

    description = _meta_value(
        soup.find("meta", attrs={"name": "description"})
    )
    og_desc = _meta_value(soup.find("meta", property="og:description"))
    if og_desc is not None:
        description = og_desc

    # Extract content using the shared pipeline
    body_text = extract_content(html, language=language, min_length=min_length)
    if not body_text:
        return None

    return {
        "title": title,
        "description": description,
        "content": body_text,
    }
def _try_specialized_downloader(url: str, timeout: int = 30) -> Optional[str]:
    """Try a specialized downloader (arXiv, PubMed, etc.) for the URL.

    Args:
        url: The URL to classify and (maybe) download.
        timeout: Per-request timeout in seconds passed to the downloader.

    Returns:
        Extracted text if a specialized downloader handles this URL and
        succeeds with at least MIN_CONTENT_LENGTH chars, or None to signal
        "fall back to the generic HTML pipeline".
    """
    # Classifier import is deferred so this module stays usable even if
    # the content_fetcher package is unavailable.
    try:
        from local_deep_research.content_fetcher.url_classifier import (
            URLClassifier,
            URLType,
        )
    except ImportError:
        return None

    url_type = URLClassifier.classify(url)

    # Only academic URL types have specialized downloaders worth trying.
    # HTML, DOI, PDF, INVALID fall through to the generic pipeline.
    _SPECIALIZED_TYPES = {
        URLType.ARXIV,
        URLType.PUBMED,
        URLType.PMC,
        URLType.SEMANTIC_SCHOLAR,
        URLType.BIORXIV,
        URLType.MEDRXIV,
    }
    if url_type not in _SPECIALIZED_TYPES:
        return None

    # Map URL type to downloader class (lazy imports to avoid circular deps)
    downloader = None
    try:
        if url_type == URLType.ARXIV:
            from ..arxiv import ArxivDownloader

            downloader = ArxivDownloader(timeout=timeout)
        elif url_type in (URLType.PUBMED, URLType.PMC):
            from ..pubmed import PubMedDownloader

            downloader = PubMedDownloader(timeout=timeout)
        elif url_type == URLType.SEMANTIC_SCHOLAR:
            from ..semantic_scholar import SemanticScholarDownloader

            downloader = SemanticScholarDownloader(timeout=timeout)
        elif url_type in (URLType.BIORXIV, URLType.MEDRXIV):
            from ..biorxiv import BioRxivDownloader

            downloader = BioRxivDownloader(timeout=timeout)
    except ImportError:
        # Optional dependency for this downloader isn't installed —
        # not an error, just fall back to the generic pipeline.
        logger.debug(
            f"Pipeline: specialized downloader not available for {url_type.value}"
        )
        return None

    if not downloader:
        return None

    try:
        from ..base import ContentType

        result = downloader.download_with_result(url, ContentType.TEXT)
        if result.is_success and result.content:
            text = result.content.decode("utf-8", errors="replace")
            # Reject trivially short results so the generic pipeline
            # gets a chance to do better.
            if len(text.strip()) >= MIN_CONTENT_LENGTH:
                logger.debug(
                    f"Pipeline: specialized downloader ({url_type.value}) "
                    f"returned {len(text)} chars for {url}"
                )
                return text
    except Exception:
        # Best-effort: any downloader failure means "use HTML pipeline",
        # but keep the traceback in debug logs for diagnosis.
        logger.debug(
            f"Pipeline: specialized downloader failed for {url}",
            exc_info=True,
        )
    finally:
        # Always release the downloader's resources; close() errors must
        # not mask the result (this runs even on the early `return text`).
        try:
            downloader.close()
        except Exception:  # noqa: silent-exception
            pass

    # Specialized downloader didn't produce content — fall back to HTML
    logger.debug(
        f"Pipeline: specialized downloader ({url_type.value}) returned "
        f"no content for {url}, falling back to HTML pipeline"
    )
    return None
def fetch_and_extract(
    url: str,
    timeout: int = 30,
    language: str = "English",
) -> Optional[str]:
    """Fetch a URL and extract clean text content.

    Pipeline:
        1. Specialized downloader (arXiv PDF, PubMed API, etc.) if the
           URL matches an academic source
        2. Static HTTP fetch → Playwright fallback (if JS needed)
           → trafilatura → readability → justext

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.
        language: Language for justext stoplist.

    Returns:
        Extracted plain text, or None if fetch or extraction failed.
    """
    # Academic sources first (arXiv, PubMed, etc.)
    specialized = _try_specialized_downloader(url, timeout=timeout)
    if specialized:
        return specialized

    # Generic HTML pipeline (lazy import to avoid circular deps)
    from ..playwright_html import AutoHTMLDownloader

    downloader = AutoHTMLDownloader(timeout=timeout, language=language)
    try:
        # download() returns extracted text as UTF-8 bytes (not raw HTML):
        # AutoHTMLDownloader inherits HTMLDownloader.download() which runs
        # _fetch_html() → _extract_content() → the full extraction pipeline.
        payload = downloader.download(url)
        return payload.decode("utf-8", errors="replace") if payload else None
    except Exception:
        logger.exception(f"fetch_and_extract failed for {url}")
        return None
    finally:
        try:
            downloader.close()
        except Exception:
            logger.debug("Failed to close downloader in fetch_and_extract")
def batch_fetch_and_extract(
    urls: List[str],
    timeout: int = 30,
    language: str = "English",
) -> Dict[str, Optional[str]]:
    """Fetch multiple URLs and extract clean text from each.

    For each URL:
        1. Try specialized downloader (arXiv, PubMed, etc.) if URL matches
        2. Fall back to generic HTML pipeline (AutoHTMLDownloader)

    A single AutoHTMLDownloader instance (and therefore a single
    Playwright browser, if the JS fallback fires) serves all the
    generic HTML URLs.

    Args:
        urls: List of URLs to fetch.
        timeout: Request timeout in seconds per URL.
        language: Language for justext stoplist.

    Returns:
        Dict mapping URL → extracted text (or None if failed).
    """
    from ..playwright_html import AutoHTMLDownloader

    results: Dict[str, Optional[str]] = {}
    pending: List[str] = []

    # Phase 1: specialized downloaders; anything they can't serve is
    # queued for the generic HTML pipeline.
    for url in urls:
        try:
            text = _try_specialized_downloader(url, timeout=timeout)
        except Exception:
            logger.debug(
                f"Pipeline: specialized downloader error for {url}",
                exc_info=True,
            )
            text = None
        if text:
            results[url] = text
        else:
            pending.append(url)

    # Phase 2: generic HTML pipeline for the remaining URLs, sharing
    # one downloader instance across all of them.
    if pending:
        downloader = AutoHTMLDownloader(timeout=timeout, language=language)
        try:
            for url in pending:
                try:
                    payload = downloader.download(url)
                    results[url] = (
                        payload.decode("utf-8", errors="replace")
                        if payload
                        else None
                    )
                except Exception:
                    logger.exception(
                        f"batch_fetch_and_extract failed for {url}"
                    )
                    results[url] = None
        finally:
            try:
                downloader.close()
            except Exception:
                logger.debug(
                    "Failed to close downloader in batch_fetch_and_extract"
                )

    return results