Coverage for src/local_deep_research/research_library/downloaders/extraction/pipeline.py: 87%
202 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Shared extraction pipeline.
4Two entry points:
5 extract_content(html) — HTML string in, clean text out.
6 fetch_and_extract(url) — URL in, clean text out (static + JS fallback).
8This is the single source of truth for content extraction in the project.
9Used by HTMLDownloader, ContentFetcher, FullSearchResults, WaybackSearchEngine,
10and any other code that needs clean text from a web page.
12Academic URLs (arXiv, PubMed, bioRxiv, etc.) are automatically routed to
13specialized downloaders first, with the generic HTML pipeline as fallback.
14"""
16from typing import Any, Dict, List, Optional
18from bs4 import BeautifulSoup
19from loguru import logger
21from .trafilatura_extractor import TrafilaturaExtractor
22from .readability_extractor import ReadabilityExtractor
23from .justext_extractor import JustextExtractor
24from .newspaper_extractor import NewspaperExtractor
25from .metadata_extractor import extract_metadata, metadata_to_text
# --- Pipeline thresholds ---
# Minimum extracted text length to consider extraction successful
MIN_CONTENT_LENGTH = 50
# If content is shorter than this, enrich with structured metadata
# (JSON-LD, OpenGraph) — helps product pages, JS-heavy sites
METADATA_ENRICHMENT_THRESHOLD = 1000
# Boilerplate penalty per keyword when scoring extraction quality
BOILERPLATE_PENALTY = 500
# Minimum fraction of the previous extractor's text length that a fallback
# extractor must retain. A result shorter than this fraction (i.e. one that
# discards more than 80% of the previous output) is skipped, which protects
# non-English content.
SAFETY_DISCARD_RATIO = 0.2

# Module-level singleton extractors (avoid re-creating per call)
_trafilatura = TrafilaturaExtractor()
_readability = ReadabilityExtractor()
_justext_en = JustextExtractor(language="English")
_newspaper = NewspaperExtractor()

# Boilerplate keywords — used to penalize low-quality extraction.
# They are matched as substrings of the lower-cased text, and each keyword
# contributes at most one penalty per text regardless of how often it occurs.
_BOILERPLATE_KEYWORDS = [
    "cookie",
    "sign up",
    "newsletter",
    "subscribe",
    "accept all",
    "privacy policy",
    "terms of service",
]
def _run_extractors_parallel(
    html: str, url: str
) -> tuple[str | None, str | None]:
    """Run trafilatura and newspaper4k sequentially.

    Both extractors call into lxml's C extension, which is not safe to
    share across threads — running them in a ThreadPoolExecutor caused
    Fatal Python error: Aborted during pool teardown on Python 3.14
    (the workers would deadlock in shutdown's join). Serializing the
    calls eliminates the crash; the perf cost is one extra extraction's
    worth of CPU per page, which is acceptable.

    The function name is preserved for backwards compatibility with
    any callers/tests that import it directly.

    Args:
        html: Raw HTML string handed to both extractors.
        url: Source URL; forwarded to newspaper4k only.

    Returns:
        ``(trafilatura_content, newspaper_content)``. Each element is the
        extractor's own return value, or None if that extractor raised.
    """
    try:
        trafilatura_content = _trafilatura.extract(html)
    except Exception:
        # Best-effort: a failing extractor must not abort the pipeline, but
        # keep the traceback at debug level so the failure can be diagnosed.
        logger.opt(exception=True).debug(
            "Pipeline: trafilatura raised an exception"
        )
        trafilatura_content = None

    try:
        newspaper_content = _newspaper.extract(html, url)
    except Exception:
        logger.opt(exception=True).debug(
            "Pipeline: newspaper4k raised an exception"
        )
        newspaper_content = None

    return trafilatura_content, newspaper_content
88def _count_boilerplate(text: str) -> int:
89 """Count boilerplate keyword occurrences in text."""
90 if not text:
91 return 0
92 lower = text.lower()
93 return sum(1 for kw in _BOILERPLATE_KEYWORDS if kw in lower)
96def _quality_score(text: str) -> int:
97 """Score extraction quality: length minus boilerplate penalty."""
98 if not text:
99 return 0
100 return len(text) - (_count_boilerplate(text) * BOILERPLATE_PENALTY)
def _fallback_extract(
    html: str, language: str, min_length: int
) -> Optional[str]:
    """Fallback chain: readability → justext → ``soup.get_text()``.

    Returns whatever text the chain produced; the caller is responsible for
    the final ``min_length`` check.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Drop elements that never carry article text before extracting.
    for tag_name in (
        "script",
        "style",
        "iframe",
        "noscript",
        "svg",
        "form",
        "button",
        "input",
        "select",
        "textarea",
    ):
        for tag in soup.find_all(tag_name):
            tag.decompose()
    cleaned_html = str(soup)

    justext_extractor = (
        _justext_en
        if language == "English"
        else JustextExtractor(language=language)
    )

    content = None
    prev_text_len = 0

    for extractor in [_readability, justext_extractor]:
        # Feed the previous stage's output forward once it produced text;
        # otherwise every stage starts from the cleaned HTML.
        result = extractor.extract(
            cleaned_html if prev_text_len == 0 else content
        )
        if not (result and result.strip()):
            continue

        result_len = len(result.strip())
        # Safety: skip if extractor discards >80% of content.
        # Compare text lengths (strip HTML tags for fair comparison
        # since readability returns HTML but justext returns text).
        if (
            prev_text_len > 0
            and result_len < prev_text_len * SAFETY_DISCARD_RATIO
        ):
            logger.debug(
                f"Pipeline: {extractor.__class__.__name__} discarded "
                f">80% of content — skipping"
            )
            continue

        content = result
        # Store text-equivalent length for fair comparison
        if "<" in result:
            prev_text_len = len(
                BeautifulSoup(result, "html.parser").get_text()
            )
        else:
            prev_text_len = result_len
        logger.debug(
            f"Pipeline: {extractor.__class__.__name__} "
            f"returned {result_len} chars"
        )

    # Strip remaining HTML tags (e.g. readability-only mode)
    if content and "<" in content:
        content = BeautifulSoup(content, "html.parser").get_text(
            separator="\n", strip=True
        )

    # Last resort
    if not content or len(content.strip()) < min_length:
        logger.debug("Pipeline: all extractors failed, using get_text()")
        content = soup.get_text(separator="\n", strip=True)

    return content


def _enrich_with_metadata(html: str, content: str) -> str:
    """Append structured metadata (e.g. JSON-LD, OpenGraph) to thin content.

    Content at or above ``METADATA_ENRICHMENT_THRESHOLD`` characters is
    returned unchanged, as is content for which no metadata text is found.
    """
    if len(content.strip()) >= METADATA_ENRICHMENT_THRESHOLD:
        return content

    metadata = extract_metadata(html)
    supplement = metadata_to_text(metadata)
    if supplement and supplement.strip():
        logger.debug(
            f"Pipeline: enriching with {len(supplement)} chars "
            f"of structured metadata"
        )
        return content.rstrip() + "\n\n" + supplement
    return content


def extract_content(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[str]:
    """Extract clean text content from HTML.

    Pipeline:
      1. trafilatura (primary — best benchmarks, multilingual, markdown)
      2. newspaper4k (run right after trafilatura — strong on news/forum
         pages)
         → pick the higher-quality result from steps 1-2
      3. readability → justext (fallback if both above fail, with 80% safety)
      4. soup.get_text() (last resort)

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Source URL (improves newspaper4k extraction accuracy).

    Returns:
        Extracted plain text (with structured metadata appended when the
        text is shorter than ``METADATA_ENRICHMENT_THRESHOLD``), or None if
        content is below min_length.
    """
    if not html or not html.strip():
        return None

    # Run trafilatura and newspaper4k (sequentially — see
    # _run_extractors_parallel) and pick the better result.
    # newspaper4k is strong on news front pages and multi-answer threads
    # where trafilatura sometimes extracts less content.
    trafilatura_content, newspaper_content = _run_extractors_parallel(html, url)

    traf_score = _quality_score(trafilatura_content)
    np_score = _quality_score(newspaper_content)

    # Ties go to trafilatura; newspaper4k wins only with a strictly higher
    # score or when trafilatura produced nothing.
    if traf_score >= np_score and trafilatura_content:
        content = trafilatura_content
        winner = "trafilatura"
    elif newspaper_content:
        content = newspaper_content
        winner = "newspaper4k"
    else:
        content = trafilatura_content
        winner = "trafilatura"

    if content and len(content.strip()) >= min_length:
        logger.debug(
            f"Pipeline: {winner} extracted {len(content)} chars"
            + (
                f" (traf={len(trafilatura_content or '')}, "
                f"np4k={len(newspaper_content or '')})"
                if newspaper_content and trafilatura_content
                else ""
            )
        )
    else:
        # Fallback: readability → justext
        logger.debug(
            "Pipeline: primary extractors insufficient, using fallback"
        )
        content = _fallback_extract(html, language, min_length)

    if not content or len(content.strip()) < min_length:
        return None

    # Enrich with structured metadata when text extraction is thin
    # (e.g. product pages, JS-heavy sites)
    return _enrich_with_metadata(html, content)
def extract_content_with_metadata(
    html: str,
    language: str = "English",
    min_length: int = MIN_CONTENT_LENGTH,
    url: str = "",
) -> Optional[Dict[str, Any]]:
    """Extract clean text and page metadata from HTML in one call.

    Combines content extraction (the shared ``extract_content`` pipeline)
    with title and description extraction from HTML meta tags. This avoids
    the need for callers to do a separate BeautifulSoup parse for metadata.

    OpenGraph values take precedence: ``og:title`` overrides ``<title>`` and
    ``og:description`` overrides ``<meta name="description">``.

    Args:
        html: Raw HTML string.
        language: Language for justext stoplist (fallback only).
        min_length: Minimum content length to accept.
        url: Source URL, forwarded to ``extract_content`` (improves
            newspaper4k extraction accuracy). Optional; defaults to "".

    Returns:
        Dict with keys: content, title, description — or None if content
        is below min_length. ``title`` and ``description`` are None when
        the page does not provide them.
    """
    if not html or not html.strip():
        return None

    # Extract content first using the shared pipeline: when it fails there
    # is nothing to return, so the metadata parse below can be skipped.
    content = extract_content(
        html, language=language, min_length=min_length, url=url
    )
    if not content:
        return None

    # Single parse for metadata (title, description, og:*)
    soup = BeautifulSoup(html, "html.parser")

    title = None
    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    og_title = soup.find("meta", property="og:title")
    if og_title and og_title.get("content"):
        title = str(og_title["content"]).strip()

    description = None
    meta_desc = soup.find("meta", attrs={"name": "description"})
    if meta_desc and meta_desc.get("content"):
        description = str(meta_desc["content"]).strip()
    og_desc = soup.find("meta", property="og:description")
    if og_desc and og_desc.get("content"):
        description = str(og_desc["content"]).strip()

    return {
        "title": title,
        "description": description,
        "content": content,
    }
304def _try_specialized_downloader(url: str, timeout: int = 30) -> Optional[str]:
305 """Try a specialized downloader (arXiv, PubMed, etc.) for the URL.
307 Returns extracted text if a specialized downloader handles this URL
308 and succeeds, or None to signal "fall back to generic HTML pipeline".
309 """
310 try:
311 from local_deep_research.content_fetcher.url_classifier import (
312 URLClassifier,
313 URLType,
314 )
315 except ImportError:
316 return None
318 url_type = URLClassifier.classify(url)
320 # Only academic URL types have specialized downloaders worth trying.
321 # HTML, DOI, PDF, INVALID fall through to the generic pipeline.
322 _SPECIALIZED_TYPES = {
323 URLType.ARXIV,
324 URLType.PUBMED,
325 URLType.PMC,
326 URLType.SEMANTIC_SCHOLAR,
327 URLType.BIORXIV,
328 URLType.MEDRXIV,
329 }
330 if url_type not in _SPECIALIZED_TYPES:
331 return None
333 # Map URL type to downloader class (lazy imports to avoid circular deps)
334 downloader = None
335 try:
336 if url_type == URLType.ARXIV: 336 ↛ 340line 336 didn't jump to line 340 because the condition on line 336 was always true
337 from ..arxiv import ArxivDownloader
339 downloader = ArxivDownloader(timeout=timeout)
340 elif url_type in (URLType.PUBMED, URLType.PMC):
341 from ..pubmed import PubMedDownloader
343 downloader = PubMedDownloader(timeout=timeout)
344 elif url_type == URLType.SEMANTIC_SCHOLAR:
345 from ..semantic_scholar import SemanticScholarDownloader
347 downloader = SemanticScholarDownloader(timeout=timeout)
348 elif url_type in (URLType.BIORXIV, URLType.MEDRXIV):
349 from ..biorxiv import BioRxivDownloader
351 downloader = BioRxivDownloader(timeout=timeout)
352 except ImportError:
353 logger.debug(
354 f"Pipeline: specialized downloader not available for {url_type.value}"
355 )
356 return None
358 if not downloader: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true
359 return None
361 try:
362 from ..base import ContentType
364 result = downloader.download_with_result(url, ContentType.TEXT)
365 if result.is_success and result.content: 365 ↛ 379line 365 didn't jump to line 379 because the condition on line 365 was always true
366 text = result.content.decode("utf-8", errors="replace")
367 if len(text.strip()) >= MIN_CONTENT_LENGTH: 367 ↛ 379line 367 didn't jump to line 379 because the condition on line 367 was always true
368 logger.debug(
369 f"Pipeline: specialized downloader ({url_type.value}) "
370 f"returned {len(text)} chars for {url}"
371 )
372 return text
373 except Exception:
374 logger.debug(
375 f"Pipeline: specialized downloader failed for {url}",
376 exc_info=True,
377 )
378 finally:
379 try:
380 downloader.close()
381 except Exception: # noqa: silent-exception
382 pass
384 # Specialized downloader didn't produce content — fall back to HTML
385 logger.debug(
386 f"Pipeline: specialized downloader ({url_type.value}) returned "
387 f"no content for {url}, falling back to HTML pipeline"
388 )
389 return None
def fetch_and_extract(
    url: str,
    timeout: int = 30,
    language: str = "English",
    enable_js_rendering: bool = False,
) -> Optional[str]:
    """Download a single URL and return its clean text.

    Steps:
      1. If the URL matches a specialized downloader (arXiv PDF, PubMed
         API, etc.) and that downloader yields text, return it.
      2. Otherwise use the generic HTML pipeline: static HTTP fetch →
         Playwright fallback (only if JS is needed and
         ``enable_js_rendering`` is True) → trafilatura → readability →
         justext.

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.
        language: Language for justext stoplist.
        enable_js_rendering: When True, the HTML pipeline falls back to a
            headless browser for pages that need JavaScript. Defaults to
            False because the default Docker production image ships without
            Chromium. Limited internal benchmark comparisons (dev instances
            with Chromium vs Docker without) showed no measurable
            research-quality improvement from JS rendering, and most regular
            benchmark runs are on Docker without Chromium anyway. The
            user-facing toggle is ``web.enable_javascript_rendering``.

    Returns:
        Extracted plain text, or None if fetch or extraction failed.
    """
    # Academic sources first (arXiv, PubMed, etc.)
    text = _try_specialized_downloader(url, timeout=timeout)
    if text:
        return text

    # Everything else goes through the generic HTML downloader.
    from ..playwright_html import AutoHTMLDownloader

    html_downloader = AutoHTMLDownloader(
        timeout=timeout,
        language=language,
        enable_js_rendering=enable_js_rendering,
    )
    try:
        # download() yields already-extracted text as UTF-8 bytes (not raw
        # HTML): AutoHTMLDownloader inherits HTMLDownloader.download(), which
        # runs _fetch_html() → _extract_content() → the extraction pipeline.
        payload = html_downloader.download(url)
        return payload.decode("utf-8", errors="replace") if payload else None
    except Exception:
        logger.exception(f"fetch_and_extract failed for {url}")
        return None
    finally:
        try:
            html_downloader.close()
        except Exception:
            logger.debug("Failed to close downloader in fetch_and_extract")
def batch_fetch_and_extract(
    urls: List[str],
    timeout: int = 30,
    language: str = "English",
    enable_js_rendering: bool = False,
) -> Dict[str, Optional[str]]:
    """Fetch multiple URLs and extract clean text from each.

    For each URL:
      1. Try specialized downloader (arXiv, PubMed, etc.) if URL matches
      2. Fall back to generic HTML pipeline (AutoHTMLDownloader)

    Uses a single AutoHTMLDownloader (and thus a single Playwright
    browser if JS fallback is triggered) for the generic HTML URLs.
    Duplicate URLs are fetched only once; the result dict is keyed by URL,
    so the returned mapping is unaffected.

    Args:
        urls: List of URLs to fetch.
        timeout: Request timeout in seconds per URL.
        language: Language for justext stoplist.
        enable_js_rendering: When True, the HTML pipeline falls back to a
            headless browser for pages that need JavaScript. Defaults to
            False because the default Docker production image ships without
            Chromium. Limited internal benchmark comparisons (dev instances
            with Chromium vs Docker without) showed no measurable
            research-quality improvement from JS rendering, and most regular
            benchmark runs are on Docker without Chromium anyway. The
            user-facing toggle is ``web.enable_javascript_rendering``.

    Returns:
        Dict mapping URL → extracted text (or None if failed).
    """
    results: Dict[str, Optional[str]] = {}

    # Try specialized downloaders first — collect URLs that need HTML fallback.
    # dict.fromkeys() drops duplicate URLs while preserving first-seen order.
    html_urls: List[str] = []
    for url in dict.fromkeys(urls):
        try:
            specialized = _try_specialized_downloader(url, timeout=timeout)
        except Exception:
            # loguru ignores the stdlib ``exc_info=True`` keyword and would
            # str.format() the message (breaking on URLs containing braces),
            # so attach the traceback via opt() and pass the URL as an arg.
            logger.opt(exception=True).debug(
                "Pipeline: specialized downloader error for {}", url
            )
            specialized = None

        if specialized:
            results[url] = specialized
        else:
            html_urls.append(url)

    if not html_urls:
        return results

    # Generic HTML pipeline for remaining URLs. The import is deferred until
    # an HTML fetch is actually required.
    from ..playwright_html import AutoHTMLDownloader

    downloader = AutoHTMLDownloader(
        timeout=timeout,
        language=language,
        enable_js_rendering=enable_js_rendering,
    )
    try:
        for url in html_urls:
            # One failing URL must not abort the rest of the batch.
            try:
                data = downloader.download(url)
                results[url] = (
                    data.decode("utf-8", errors="replace") if data else None
                )
            except Exception:
                logger.exception(f"batch_fetch_and_extract failed for {url}")
                results[url] = None
    finally:
        try:
            downloader.close()
        except Exception:
            logger.debug(
                "Failed to close downloader in batch_fetch_and_extract"
            )

    return results