Coverage for src/local_deep_research/research_library/downloaders/base.py: 86%
121 statements
1"""
2Base Academic Content Downloader Abstract Class
3"""
5from abc import ABC, abstractmethod
6from typing import Optional, Dict, Any, NamedTuple
7from enum import Enum
8import requests
9from urllib.parse import urlparse
10from loguru import logger
12# Import our adaptive rate limiting system
13from ...web_search_engines.rate_limiting import (
14 AdaptiveRateLimitTracker,
15)
16from ...security import SafeSession
18# Import centralized User-Agent from constants
19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility


class ContentType(Enum):
    """Supported content types for download."""

    PDF = "pdf"
    TEXT = "text"


class DownloadResult(NamedTuple):
    """Result of a download attempt."""

    content: Optional[bytes] = None
    skip_reason: Optional[str] = None
    is_success: bool = False
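
# For illustration only - a success and a skip as this module would build
# them (hypothetical byte values, not produced anywhere in this file):
#
#   DownloadResult(content=b"%PDF-1.7 ...", is_success=True)
#   DownloadResult(skip_reason="Download failed - content not available")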


class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders."""

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads.
        # We use domain-specific rate limiting (see _download_pdf).
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """
        pass

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if the download failed.
            For TEXT type, returns UTF-8 encoded text as bytes.
        """
        pass

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download a PDF.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if the download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return a detailed result with a skip reason.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        # Default implementation - derived classes should override this
        # to report more specific skip reasons.
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        else:
            return DownloadResult(
                skip_reason="Download failed - content not available"
            )
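
    # Caller-side sketch (`downloader` and `save_pdf` are hypothetical):
    #
    #   result = downloader.download_with_result(url)
    #   if result.is_success:
    #       save_pdf(result.content)
    #   else:
    #       logger.info(f"Skipped {url}: {result.skip_reason}")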

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as a string, or None if the download failed
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check if response contains PDF content.

        Args:
            response: The response to check

        Returns:
            True if response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Check content type
        if "pdf" in content_type:
            return True

        # Check if content starts with PDF magic bytes
        if len(response.content) > 4:
            return response.content[:4] == b"%PDF"

        return False
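
    # Note: a PDF file begins with the literal header b"%PDF" (e.g.
    # b"%PDF-1.7"), which is what the magic-byte check above relies on
    # when the content-type header is missing or misleading.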

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download a PDF with error handling and retry logic.
        Uses the adaptive rate-limiting/retry system.

        Args:
            url: The URL to download
            headers: Optional additional headers

        Returns:
            PDF content as bytes, or None if the download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit)
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting "
            f"(max {max_attempts} attempts)"
        )

        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Prepare headers
                if headers:
                    request_headers = self.session.headers.copy()
                    request_headers.update(headers)
                else:
                    request_headers = self.session.headers

                # Make the request
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                # Check response
                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    else:
                        logger.warning(
                            f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                        )
                        # Record failure but don't retry for wrong content type
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=False,
                            retry_count=attempt,
                            error_type="NotPDF",
                        )
                        return None
                elif response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    # Record rate limit failure
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait
                    continue
                else:
                    logger.warning(
                        f"Failed to download from {url}: HTTP {response.status_code}"
                    )
                    # Record failure but don't retry for other status codes
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Record network failure
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                logger.warning(
                    f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                )
                continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type="UnexpectedError",
                )
                return None

        return None
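
    # Retry policy implemented above, summarized: HTTP 429/503 and network
    # timeouts/connection errors are retried up to max_attempts with
    # adaptive waits; wrong content types, other HTTP statuses, and
    # unexpected errors fail fast without retrying.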

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes)
            try:
                from pypdf import PdfReader
            except ImportError:
                from PyPDF2 import PdfReader

            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PdfReader(pdf_file)

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None
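
    # Usage sketch (`pdf_bytes` is a hypothetical downloaded PDF):
    #
    #   text = BaseDownloader.extract_text_from_pdf(pdf_bytes)
    #   if text is None:
    #       ...  # extraction failed or the PDF had no extractable text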

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from an API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata
        """
        return {}
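

# A minimal sketch of a concrete downloader, assuming a hypothetical
# "example.org" host that serves PDFs directly. It is illustrative only:
# real subclasses add site-specific URL resolution and skip reasons.
class ExampleDownloader(BaseDownloader):
    """Illustrative subclass; not used anywhere in the library."""

    def can_handle(self, url: str) -> bool:
        # Only handle URLs on the hypothetical example.org host
        return urlparse(url).netloc == "example.org"

    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        # Reuse the shared rate-limited helper from the base class
        pdf = self._download_pdf(url)
        if pdf is None:
            return None
        if content_type == ContentType.PDF:
            return pdf
        # TEXT: extract text from the PDF and return it as UTF-8 bytes
        text = self.extract_text_from_pdf(pdf)
        return text.encode("utf-8") if text else None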