Coverage for src/local_deep_research/research_library/downloaders/base.py: 87%
135 statements
coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Base Academic Content Downloader Abstract Class
3"""
5from abc import ABC, abstractmethod
6from typing import Optional, Dict, Any, NamedTuple
7from enum import Enum
8import requests
9from urllib.parse import urlparse
10from loguru import logger
12# Import our adaptive rate limiting system
13from ...web_search_engines.rate_limiting import (
14 AdaptiveRateLimitTracker,
15)
16from ...security import SafeSession
18# Import centralized User-Agent from constants
19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility
22class ContentType(Enum):
23 """Supported content types for download."""
25 PDF = "pdf"
26 TEXT = "text"
29class DownloadResult(NamedTuple):
30 """Result of a download attempt."""
32 content: Optional[bytes] = None
33 skip_reason: Optional[str] = None
34 is_success: bool = False
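
# Usage sketch (``downloader`` stands in for any concrete subclass instance,
# not defined in this module): callers branch on ``is_success`` rather than
# on ``content`` alone.
#
#     result = downloader.download_with_result(url)
#     if result.is_success:
#         pdf_bytes = result.content
#     else:
#         logger.info(f"Skipped {url}: {result.skip_reason}")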


class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders."""

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads
        # We'll use domain-specific rate limiting
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    def close(self):
        """
        Close the HTTP session and clean up resources.

        Call this method when done using the downloader to prevent
        connection/file descriptor leaks.
        """
        if hasattr(self, "session") and self.session:
            try:
                self.session.close()
            except Exception:
                logger.exception("Error closing downloader session")
            finally:
                self.session = None

    def __del__(self):
        """Destructor to ensure session is closed."""
        self.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures session cleanup."""
        self.close()
        return False
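
    # Usage sketch for the lifecycle methods above (``SomeDownloader`` stands in
    # for any concrete subclass; it is not defined in this module):
    #
    #     with SomeDownloader(timeout=60) as downloader:
    #         pdf_bytes = downloader.download_pdf("https://example.org/paper.pdf")
    #     # the session is closed automatically on exit, even if the download raises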

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """
        pass

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if download failed
            For TEXT type, returns UTF-8 encoded text as bytes
        """
        pass

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download PDF.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return detailed result with skip reason.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        # Default implementation - derived classes should override for specific reasons
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        else:
            return DownloadResult(
                skip_reason="Download failed - content not available"
            )
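
    # Sketch of a derived-class override that surfaces a more specific skip
    # reason (``PaywalledDownloader`` and ``_is_paywalled`` are hypothetical
    # names, not part of this module):
    #
    #     class PaywalledDownloader(BaseDownloader):
    #         def download_with_result(self, url, content_type=ContentType.PDF):
    #             if self._is_paywalled(url):
    #                 return DownloadResult(skip_reason="Paywalled - no open-access copy")
    #             return super().download_with_result(url, content_type)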

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as string, or None if download failed
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check if response contains PDF content.

        Args:
            response: The response to check

        Returns:
            True if response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Check content type
        if "pdf" in content_type:
            return True

        # Check if content starts with PDF magic bytes
        # coverage note: in the test suite this condition was always true, so the
        # ``return False`` fallthrough below was never exercised
        if len(response.content) > 4:
            return response.content[:4] == b"%PDF"

        return False

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download PDF with error handling and retry logic.
        Uses our optimized adaptive rate limiting/retry system.

        Args:
            url: The URL to download
            headers: Optional additional headers

        Returns:
            PDF content as bytes, or None if download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit)
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting (max {max_attempts} attempts)"
        )

        # coverage note: in the tests the loop always returned before exhausting
        # all attempts, so the trailing ``return None`` after it was never reached
        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Prepare headers
                if headers:
                    request_headers = self.session.headers.copy()
                    request_headers.update(headers)
                else:
                    request_headers = self.session.headers

                # Make the request
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                # Check response
                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    else:
                        logger.warning(
                            f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                        )
                        # Record failure but don't retry for wrong content type
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=False,
                            retry_count=attempt,
                            error_type="NotPDF",
                        )
                        return None
                elif response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    # Record rate limit failure
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait
                    continue
                else:
                    logger.warning(
                        f"Failed to download from {url}: HTTP {response.status_code}"
                    )
                    # Record failure but don't retry for other status codes
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Record network failure
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                else:
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                    )
                    continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type="UnexpectedError",
                )
                return None

        return None

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes)
            try:
                from pypdf import PdfReader
            except ImportError:
                from PyPDF2 import PdfReader

            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PdfReader(pdf_file)

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None
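
    # Usage sketch: pairing download_pdf with extract_text_from_pdf for a simple
    # fetch-and-extract flow (``downloader`` is any concrete subclass instance):
    #
    #     pdf_bytes = downloader.download_pdf(url)
    #     text = BaseDownloader.extract_text_from_pdf(pdf_bytes) if pdf_bytes else None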

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata
        """
        return {}
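

# A minimal sketch of a concrete downloader built on this base class. The names
# ExamplePDFDownloader and example.org are illustrative only; real subclasses in
# this package implement source-specific URL checks and download logic.
class ExamplePDFDownloader(BaseDownloader):
    """Downloads PDFs from a single hypothetical host."""

    def can_handle(self, url: str) -> bool:
        # Handle only URLs on the hypothetical example host
        return urlparse(url).netloc.endswith("example.org")

    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        # Reuse the shared rate-limited, retrying PDF helper from the base class
        pdf_bytes = self._download_pdf(url)
        if content_type == ContentType.PDF or not pdf_bytes:
            return pdf_bytes
        # For TEXT, extract text from the PDF and return it UTF-8 encoded,
        # matching the contract documented on BaseDownloader.download
        text = self.extract_text_from_pdf(pdf_bytes)
        return text.encode("utf-8") if text else None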