Coverage for src / local_deep_research / research_library / downloaders / base.py: 93%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Base Academic Content Downloader Abstract Class
3"""
5from abc import ABC, abstractmethod
6from typing import Optional, Dict, Any, NamedTuple
7from enum import Enum
8import requests
9from urllib.parse import urlparse
10from loguru import logger
12# Import our adaptive rate limiting system
13from ...web_search_engines.rate_limiting import (
14 AdaptiveRateLimitTracker,
15)
16from ...security import SafeSession
18# Import centralized User-Agent from constants
19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility
class ContentType(Enum):
    """Content formats a downloader knows how to fetch."""

    PDF = "pdf"  # binary PDF documents
    TEXT = "text"  # plain text (delivered as UTF-8 encoded bytes)
class DownloadResult(NamedTuple):
    """Result of a download attempt."""

    # Downloaded payload, or None when the download failed or was skipped.
    content: Optional[bytes] = None
    # Human-readable reason the download was skipped/failed, when known.
    skip_reason: Optional[str] = None
    # True only when content was successfully retrieved.
    is_success: bool = False
    # HTTP status code of the last response, if one was received.
    status_code: Optional[int] = None
class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders.

    Subclasses implement ``can_handle`` and ``download``; this base class
    provides the shared HTTP session, adaptive per-domain rate limiting,
    retry logic, and PDF/text helper utilities.
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads.
        # Each domain gets its own rate limit (see _download_pdf).
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    def close(self):
        """
        Close the HTTP session and clean up resources.

        Call this method when done using the downloader to prevent
        connection/file descriptor leaks.
        """
        # hasattr guards the case where __init__ failed before the
        # session attribute was assigned.
        if hasattr(self, "session") and self.session:
            try:
                self.session.close()
            except Exception:
                logger.exception("Error closing downloader session")
            finally:
                self.session = None  # type: ignore[assignment]

    def __del__(self):
        """Destructor to ensure session is closed."""
        # BUGFIX: never let __del__ raise. During interpreter shutdown,
        # module globals used by close() (e.g. the logger) may already be
        # torn down; an unguarded call would surface as an
        # "Exception ignored in __del__" warning.
        try:
            self.close()
        except Exception:
            pass

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures session cleanup."""
        self.close()
        return False  # never suppress exceptions from the with-body

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """
        pass

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if download failed
            For TEXT type, returns UTF-8 encoded text as bytes
        """
        pass

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download PDF.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return detailed result with skip reason.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        # Default implementation - derived classes should override for specific reasons
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        return DownloadResult(
            skip_reason="Download failed - content not available"
        )

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as string, or None if download failed
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check if response contains PDF content.

        Args:
            response: The response to check

        Returns:
            True if response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Check content type
        if "pdf" in content_type:
            return True

        # Check if content starts with PDF magic bytes.
        # BUGFIX: use >= 4 (was > 4) so a body that is exactly the 4-byte
        # "%PDF" magic prefix is still recognized.
        if len(response.content) >= 4:
            return response.content[:4] == b"%PDF"

        return False

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download PDF with error handling and retry logic.
        Uses our optimized adaptive rate limiting/retry system.

        Args:
            url: The URL to download
            headers: Optional additional headers

        Returns:
            PDF content as bytes, or None if download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit)
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting (max {max_attempts} attempts)"
        )

        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Prepare headers: start from the session defaults and
                # overlay any per-request additions. (Deduplicated: the
                # previous if/else built the same base dict in both arms.)
                request_headers = dict(self.session.headers)
                if headers:
                    request_headers.update(headers)

                # Make the request
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                # Check response
                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    logger.warning(
                        f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                    )
                    # Record failure but don't retry for wrong content type
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type="NotPDF",
                    )
                    return None
                if response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    # Record rate limit failure
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait
                    continue
                logger.warning(
                    f"Failed to download from {url}: HTTP {response.status_code}"
                )
                # Record failure but don't retry for other status codes
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=f"HTTP_{response.status_code}",
                )
                return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Record network failure; these are the only retryable errors
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                logger.warning(
                    f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                )
                continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type="UnexpectedError",
                )
                return None

        # All attempts exhausted via the retry path
        return None

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes)
            from pypdf import PdfReader

            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PdfReader(pdf_file)

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            # Treat whitespace-only extraction as a failure
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata
        """
        return {}