Coverage for src / local_deep_research / research_library / downloaders / base.py: 93%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Base Academic Content Downloader Abstract Class 

3""" 

4 

5from abc import ABC, abstractmethod 

6from typing import Optional, Dict, Any, NamedTuple 

7from enum import Enum 

8import requests 

9from urllib.parse import urlparse 

10from loguru import logger 

11 

12# Import our adaptive rate limiting system 

13from ...web_search_engines.rate_limiting import ( 

14 AdaptiveRateLimitTracker, 

15) 

16from ...security import SafeSession 

17 

18# Import centralized User-Agent from constants 

19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility 

20 

21 

class ContentType(Enum):
    """Supported content types for download."""

    # PDF binary document; downloads return raw PDF bytes.
    PDF = "pdf"
    # Plain text; downloads return UTF-8 encoded text as bytes.
    TEXT = "text"

27 

28 

class DownloadResult(NamedTuple):
    """Result of a download attempt."""

    # Downloaded payload; None when the download failed or was skipped.
    content: Optional[bytes] = None
    # Human-readable reason the content could not be retrieved, if any.
    skip_reason: Optional[str] = None
    # True only when content was successfully retrieved.
    is_success: bool = False
    # HTTP status code of the final response, when known.
    status_code: Optional[int] = None

36 

37 

class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders.

    Provides a shared HTTP session (SafeSession with the centralized
    User-Agent), per-domain adaptive rate limiting, a retrying PDF
    download helper, and in-memory PDF text extraction. Subclasses
    implement ``can_handle`` and ``download`` with source-specific logic.

    Instances can be used as context managers to guarantee session
    cleanup.
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads.
        # Each target domain gets its own adaptive rate limit
        # (see _download_pdf, which keys on the URL's netloc).
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    def close(self):
        """
        Close the HTTP session and clean up resources.

        Call this method when done using the downloader to prevent
        connection/file descriptor leaks. Safe to call multiple times;
        subsequent calls are no-ops because the session reference is
        cleared in the ``finally`` block.
        """
        if hasattr(self, "session") and self.session:
            try:
                self.session.close()
            except Exception:
                logger.exception("Error closing downloader session")
            finally:
                # Drop the reference even if close() raised, so repeated
                # close() calls (and __del__) do not retry a dead session.
                self.session = None  # type: ignore[assignment]

    def __del__(self):
        """Destructor to ensure session is closed."""
        self.close()

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures session cleanup."""
        self.close()
        # Never suppress exceptions raised inside the with-block.
        return False

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """
        pass

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if download failed
            For TEXT type, returns UTF-8 encoded text as bytes
        """
        pass

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download PDF.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return detailed result with skip reason.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        # Default implementation - derived classes should override for
        # source-specific skip reasons / status codes.
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        return DownloadResult(
            skip_reason="Download failed - content not available"
        )

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as string, or None if download failed
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check if response contains PDF content.

        Args:
            response: The response to check

        Returns:
            True if response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Check the Content-Type header first (cheap; no body needed).
        if "pdf" in content_type:
            return True

        # Fall back to sniffing the PDF magic bytes. The marker is exactly
        # 4 bytes ("%PDF"), so a 4-byte body must be accepted — the
        # previous `> 4` comparison wrongly rejected it (off-by-one).
        if len(response.content) >= 4:
            return response.content[:4] == b"%PDF"

        return False

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download PDF with error handling and retry logic.
        Uses our optimized adaptive rate limiting/retry system.

        Only HTTP 429/503 responses and timeout/connection errors are
        retried (up to 3 attempts); every other failure returns None
        immediately. Each outcome is recorded in the rate tracker so the
        per-domain wait adapts over time.

        Args:
            url: The URL to download
            headers: Optional additional headers

        Returns:
            PDF content as bytes, or None if download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit)
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting (max {max_attempts} attempts)"
        )

        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Prepare headers: session defaults, optionally overridden
                # by caller-supplied headers (deduplicated from the original
                # two-branch version; behavior is identical).
                request_headers = dict(self.session.headers)
                if headers:
                    request_headers.update(headers)

                # Make the request
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                # Check response
                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    logger.warning(
                        f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                    )
                    # Record failure but don't retry for wrong content type
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type="NotPDF",
                    )
                    return None
                if response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    # Record rate limit failure
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=False,
                        retry_count=attempt,
                        error_type=f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait
                    continue
                logger.warning(
                    f"Failed to download from {url}: HTTP {response.status_code}"
                )
                # Record failure but don't retry for other status codes
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=f"HTTP_{response.status_code}",
                )
                return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Record network failure
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                logger.warning(
                    f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                )
                continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type=type(e).__name__,
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Record failure but don't retry
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=attempt,
                    error_type="UnexpectedError",
                )
                return None

        # Unreachable in practice (every path returns or continues), kept
        # as a safety net so the function always returns Optional[bytes].
        return None

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes)
            from pypdf import PdfReader

            pdf_file = io.BytesIO(pdf_content)
            pdf_reader = PdfReader(pdf_file)

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            # Treat whitespace-only extraction as a failure.
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata
        """
        return {}