Coverage for src / local_deep_research / research_library / downloaders / base.py: 86%

121 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Base Academic Content Downloader Abstract Class 

3""" 

4 

5from abc import ABC, abstractmethod 

6from typing import Optional, Dict, Any, NamedTuple 

7from enum import Enum 

8import requests 

9from urllib.parse import urlparse 

10from loguru import logger 

11 

12# Import our adaptive rate limiting system 

13from ...web_search_engines.rate_limiting import ( 

14 AdaptiveRateLimitTracker, 

15) 

16from ...security import SafeSession 

17 

18# Import centralized User-Agent from constants 

19from ...constants import USER_AGENT # noqa: F401 - re-exported for backward compatibility 

20 

21 

class ContentType(Enum):
    """Enumeration of the content formats a downloader can fetch."""

    PDF = "pdf"  # Binary PDF documents
    TEXT = "text"  # Plain text (delivered as UTF-8 encoded bytes)

27 

28 

class DownloadResult(NamedTuple):
    """Outcome of a single download attempt."""

    # Downloaded bytes on success, None otherwise.
    content: Optional[bytes] = None
    # Human-readable explanation when nothing was downloaded.
    skip_reason: Optional[str] = None
    # True only when content was successfully retrieved.
    is_success: bool = False

35 

36 

class BaseDownloader(ABC):
    """Abstract base class for academic content downloaders.

    Provides a shared ``SafeSession`` carrying the project User-Agent, a
    per-domain adaptive rate limiter, and helpers for downloading and
    validating PDF content. Subclasses implement :meth:`can_handle` and
    :meth:`download` for a specific academic source.
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the downloader.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        self.session = SafeSession()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # Initialize rate limiter for PDF downloads.
        # Domain-specific rate limiting: each download domain gets its own
        # bucket (see _download_pdf).
        self.rate_tracker = AdaptiveRateLimitTracker(
            programmatic_mode=False  # We want to persist rate limit data
        )

    @abstractmethod
    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Args:
            url: The URL to check

        Returns:
            True if this downloader can handle the URL
        """

    @abstractmethod
    def download(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> Optional[bytes]:
        """
        Download content from the given URL.

        Args:
            url: The URL to download from
            content_type: Type of content to download (PDF or TEXT)

        Returns:
            Content as bytes, or None if download failed.
            For TEXT type, returns UTF-8 encoded text as bytes.
        """

    def download_pdf(self, url: str) -> Optional[bytes]:
        """
        Convenience method to download PDF.

        Args:
            url: The URL to download from

        Returns:
            PDF content as bytes, or None if download failed
        """
        return self.download(url, ContentType.PDF)

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.PDF
    ) -> DownloadResult:
        """
        Download content and return detailed result with skip reason.

        Args:
            url: The URL to download from
            content_type: Type of content to download

        Returns:
            DownloadResult with content and/or skip reason
        """
        # Default implementation - derived classes should override to
        # report more specific skip reasons.
        content = self.download(url, content_type)
        if content:
            return DownloadResult(content=content, is_success=True)
        return DownloadResult(
            skip_reason="Download failed - content not available"
        )

    def download_text(self, url: str) -> Optional[str]:
        """
        Convenience method to download and return text content.

        Args:
            url: The URL to download from

        Returns:
            Text content as string, or None if download failed
        """
        content = self.download(url, ContentType.TEXT)
        if content:
            try:
                return content.decode("utf-8")
            except UnicodeDecodeError:
                logger.exception(f"Failed to decode text content from {url}")
        return None

    def _is_pdf_content(self, response: requests.Response) -> bool:
        """
        Check if response contains PDF content.

        Args:
            response: The response to check

        Returns:
            True if response appears to contain PDF content
        """
        content_type = response.headers.get("content-type", "").lower()

        # Trust an explicit PDF content type.
        if "pdf" in content_type:
            return True

        # Fall back to the PDF magic bytes. Use >= 4 so a body of exactly
        # four bytes ("%PDF") is still recognized; the previous "> 4"
        # comparison was an off-by-one that rejected it.
        if len(response.content) >= 4:
            return response.content[:4] == b"%PDF"

        return False

    def _record_failure(
        self,
        engine_type: str,
        wait_time: float,
        attempt: int,
        error_type: str,
    ) -> None:
        """Record a failed download attempt with the adaptive rate tracker.

        Args:
            engine_type: Per-domain rate-limit bucket identifier
            wait_time: Wait applied before this attempt
            attempt: 1-based attempt number
            error_type: Short label for the failure cause
        """
        self.rate_tracker.record_outcome(
            engine_type=engine_type,
            wait_time=wait_time,
            success=False,
            retry_count=attempt,
            error_type=error_type,
        )

    def _download_pdf(
        self, url: str, headers: Optional[Dict[str, str]] = None
    ) -> Optional[bytes]:
        """
        Helper method to download PDF with error handling and retry logic.
        Uses our optimized adaptive rate limiting/retry system.

        Retries up to 3 times on HTTP 429/503 and transient network errors;
        all other failures are treated as permanent for this URL.

        Args:
            url: The URL to download
            headers: Optional additional headers

        Returns:
            PDF content as bytes, or None if download failed
        """
        # Extract domain for rate limiting (each domain gets its own rate limit)
        domain = urlparse(url).netloc
        engine_type = f"pdf_download_{domain}"

        max_attempts = 3

        logger.debug(
            f"Downloading PDF from {url} with adaptive rate limiting (max {max_attempts} attempts)"
        )

        for attempt in range(1, max_attempts + 1):
            # Apply adaptive rate limiting before the request
            wait_time = self.rate_tracker.apply_rate_limit(engine_type)

            try:
                # Merge caller-supplied headers over the session defaults.
                if headers:
                    request_headers = self.session.headers.copy()
                    request_headers.update(headers)
                else:
                    request_headers = self.session.headers

                # Make the request
                response = self.session.get(
                    url,
                    headers=request_headers,
                    timeout=self.timeout,
                    allow_redirects=True,
                )

                if response.status_code == 200:
                    if self._is_pdf_content(response):
                        logger.debug(
                            f"Successfully downloaded PDF from {url} on attempt {attempt}"
                        )
                        # Record successful outcome
                        self.rate_tracker.record_outcome(
                            engine_type=engine_type,
                            wait_time=wait_time,
                            success=True,
                            retry_count=attempt,
                            search_result_count=1,  # We got the PDF
                        )
                        return response.content
                    logger.warning(
                        f"Response is not a PDF: {response.headers.get('content-type', 'unknown')}"
                    )
                    # Wrong content type is permanent - don't retry.
                    self._record_failure(
                        engine_type, wait_time, attempt, "NotPDF"
                    )
                    return None
                elif response.status_code in [
                    429,
                    503,
                ]:  # Rate limit or service unavailable
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} - HTTP {response.status_code} from {url}"
                    )
                    self._record_failure(
                        engine_type,
                        wait_time,
                        attempt,
                        f"HTTP_{response.status_code}",
                    )
                    if attempt == max_attempts:
                        logger.error(
                            f"Failed to download from {url}: HTTP {response.status_code} after {max_attempts} attempts"
                        )
                        return None
                    # Continue retry loop with adaptive wait
                    continue
                else:
                    logger.warning(
                        f"Failed to download from {url}: HTTP {response.status_code}"
                    )
                    # Other status codes are treated as permanent failures.
                    self._record_failure(
                        engine_type,
                        wait_time,
                        attempt,
                        f"HTTP_{response.status_code}",
                    )
                    return None

            except (
                requests.exceptions.Timeout,
                requests.exceptions.ConnectionError,
            ) as e:
                # Transient network failures are retried.
                self._record_failure(
                    engine_type, wait_time, attempt, type(e).__name__
                )
                if attempt == max_attempts:
                    logger.exception(
                        f"{type(e).__name__} downloading from {url} after {max_attempts} attempts"
                    )
                    return None
                logger.warning(
                    f"Attempt {attempt}/{max_attempts} - {type(e).__name__} downloading from {url}"
                )
                continue  # Retry with adaptive wait
            except requests.exceptions.RequestException as e:
                logger.exception(f"Request error downloading from {url}")
                # Other request errors are not retried.
                self._record_failure(
                    engine_type, wait_time, attempt, type(e).__name__
                )
                return None
            except Exception:
                logger.exception(f"Unexpected error downloading from {url}")
                # Unknown errors are not retried.
                self._record_failure(
                    engine_type, wait_time, attempt, "UnexpectedError"
                )
                return None

        # Defensive: every loop path returns or continues, but keep an
        # explicit fallback in case the loop is exhausted.
        return None

    @staticmethod
    def extract_text_from_pdf(pdf_content: bytes) -> Optional[str]:
        """
        Extract text from PDF content using in-memory processing.

        This is part of the public API and can be used by other modules.

        Args:
            pdf_content: PDF file content as bytes

        Returns:
            Extracted text, or None if extraction failed or yielded only
            whitespace
        """
        try:
            import io

            # Use pypdf for in-memory PDF text extraction (no disk writes);
            # fall back to the legacy PyPDF2 package name if pypdf is absent.
            try:
                from pypdf import PdfReader
            except ImportError:
                from PyPDF2 import PdfReader

            pdf_reader = PdfReader(io.BytesIO(pdf_content))

            text_content = []
            for page in pdf_reader.pages:
                text = page.extract_text()
                if text:
                    text_content.append(text)

            full_text = "\n".join(text_content)
            return full_text if full_text.strip() else None

        except Exception:
            logger.exception("Failed to extract text from PDF")
            return None

    def _fetch_text_from_api(self, url: str) -> Optional[str]:
        """
        Fetch full text directly from API.

        This is a placeholder - derived classes should implement
        API-specific text fetching logic.

        Args:
            url: The URL or identifier

        Returns:
            Full text content, or None if not available
        """
        return None

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """
        Get metadata about the resource (optional override).

        Args:
            url: The URL to get metadata for

        Returns:
            Dictionary with metadata
        """
        return {}