Coverage for src / local_deep_research / research_library / downloaders / base.py: 87%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Base Academic Content Downloader Abstract Class 

3""" 

4 

5from abc import ABC, abstractmethod 

6from typing import Optional, Dict, Any, NamedTuple 

7from enum import Enum 

8import requests 

9from urllib.parse import urlparse 

10from loguru import logger 

11 

12# Import our adaptive rate limiting system 

13from ...web_search_engines.rate_limiting import ( 

14 AdaptiveRateLimitTracker, 

15) 

16from ...security import SafeSession 

17 

18# Import centralized User-Agent from constants 

19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility 

20 

21 

class ContentType(Enum):
    """Enumeration of the content formats a downloader can fetch."""

    # Binary PDF documents.
    PDF = "pdf"
    # Plain text (returned to callers as UTF-8 encoded bytes).
    TEXT = "text"

28 

class DownloadResult(NamedTuple):
    """Outcome of a single download attempt.

    Fields:
        content: the downloaded bytes, or None when nothing was retrieved
        skip_reason: human-readable explanation of why the download was
            skipped or failed; None on success
        is_success: True only when content was actually obtained
    """

    content: Optional[bytes] = None
    skip_reason: Optional[str] = None
    is_success: bool = False

35 

36 

class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders.

    Concrete subclasses implement :meth:`can_handle` and :meth:`download`
    for a specific provider. This base supplies a shared HTTP session with
    a standard User-Agent, per-domain adaptive rate limiting, retry logic
    for PDF fetches, and in-memory PDF text extraction.

    Instances can be used as context managers to guarantee the underlying
    session is closed.
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds applied to each HTTP attempt.
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads.
        # Each domain gets its own rate-limit bucket (see _download_pdf).
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    def close(self):
        """
        Close the HTTP session and clean up resources.

        Call this method when done using the downloader to prevent
        connection/file descriptor leaks. Safe to call more than once:
        after the first call ``self.session`` is None and subsequent
        calls are no-ops.
        """
        if hasattr(self, "session") and self.session:
            try:
                self.session.close()
            except Exception:
                logger.exception("Error closing downloader session")
            finally:
                # Drop the reference so repeated close() calls are no-ops.
                self.session = None

    def __del__(self):
        """Destructor safety net - ensures the session is closed."""
        # Guard the whole call: during interpreter shutdown module globals
        # (e.g. the logger used inside close()) may already be torn down,
        # and __del__ must never let an exception escape.
        try:
            self.close()
        except Exception:
            pass

    def __enter__(self):
        """Context manager entry - returns the downloader itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures session cleanup.

        Returns False so any in-flight exception propagates normally.
        """
        self.close()
        return False

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """
        pass

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if download failed.
            For TEXT type, returns UTF-8 encoded text as bytes.
        """
        pass

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download PDF content.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return a detailed result with skip reason.

        Default implementation wraps :meth:`download`; derived classes
        should override to report provider-specific skip reasons.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        else:
            return DownloadResult(
                skip_reason="Download failed - content not available"
            )

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as a string, or None if the download failed or
            the bytes were not valid UTF-8.
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        # Fall through: no content, or decode failed.
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check whether a response contains PDF content.

        Accepts either a PDF Content-Type header or the standard ``%PDF``
        magic bytes at the start of the body.

        Args:
            response: The HTTP response to check

        Returns:
            True if the response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Check content type header first (cheap, no body access needed).
        if "pdf" in content_type:
            return True

        # Fall back to the PDF magic bytes. Use >= 4 so a body of exactly
        # four bytes (b"%PDF") is still recognized (was an off-by-one with >).
        if len(response.content) >= 4:
            return response.content[:4] == b"%PDF"

        return False

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download a PDF with error handling and retries.

        Uses the adaptive rate limiting system with a per-domain bucket:
        each netloc gets its own engine key so one slow host does not
        throttle the others. Retries (up to 3 attempts) only on HTTP
        429/503 and on network timeouts/connection errors; all other
        failures return immediately.

        Args:
            url: The URL to download
            headers: Optional additional headers merged over the session's

        Returns:
            PDF content as bytes, or None if download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit).
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting (max {max_attempts} attempts)"
        )

        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request; wait_time is
            # fed back to the tracker with each outcome so it can adapt.
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Prepare headers: merge extras over the session defaults.
                if headers:
                    request_headers = self.session.headers.copy()
                    request_headers.update(headers)
                else:
                    request_headers = self.session.headers

                # Make the request.
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome for the adaptive tracker.
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    else:
                        logger.warning(
                            f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                        )
                        # Wrong content type won't fix itself - don't retry.
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=False,
                            retry_count=attempt,
                            error_type="NotPDF",
                        )
                        return None
                elif response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable - transient, retry
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    # Record rate limit failure so the tracker backs off.
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait.
                    continue
                else:
                    logger.warning(
                        f"Failed to download from {url}: HTTP {response.status_code}"
                    )
                    # Other status codes are treated as permanent - no retry.
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Transient network failure - record and retry if attempts remain.
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                else:
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                    )
                    continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Non-transient request error - record failure, don't retry.
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Unknown error - record failure, don't retry.
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type="UnexpectedError",
                )
                return None

        # Defensive: only reachable if the loop exits without returning.
        return None

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.
        No temporary files are written; the PDF is parsed from a BytesIO
        buffer via pypdf (falling back to the legacy PyPDF2 package).

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed or the PDF
            contained no extractable text.
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes);
            # fall back to the older PyPDF2 name if pypdf is unavailable.
            try:
                from pypdf import PdfReader
            except ImportError:
                from PyPDF2 import PdfReader

            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PdfReader(pdf_file)

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            # Treat whitespace-only extraction as a failure.
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from a provider API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata (empty by default)
        """
        return {}