Coverage for src / local_deep_research / research_library / downloaders / html.py: 94%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2HTML Content Downloader for web pages. 

3 

4Downloads and extracts clean text content from HTML web pages. 

5Extraction is handled by the shared pipeline in extraction/pipeline.py. 

6""" 

7 

8from typing import Optional, Dict, Any 

9from urllib.parse import urlparse 

10from loguru import logger 

11from bs4 import BeautifulSoup 

12 

13from .base import BaseDownloader, ContentType, DownloadResult 

14from .extraction.pipeline import extract_content_with_metadata 

15from ...constants import BROWSER_USER_AGENT 

16 

17 

class HTMLDownloader(BaseDownloader):
    """Downloader for HTML web pages - extracts clean text content.

    Acts as a generic fallback for any HTTP/HTTPS URL: fetches the raw
    HTML over the shared session (rate-limited per domain) and delegates
    text extraction to the shared pipeline in extraction/pipeline.py.
    """

    def __init__(
        self,
        timeout: int = 30,
        language: str = "English",
        **kwargs,
    ):
        """Initialize the downloader.

        Args:
            timeout: Per-request timeout in seconds, forwarded to the base class.
            language: Language hint passed to the extraction pipeline.
            **kwargs: Accepted for signature compatibility with sibling
                downloaders; currently unused here.
        """
        super().__init__(timeout)
        # Present a browser-like User-Agent so sites that reject generic
        # HTTP clients still serve the page.
        self.session.headers.update({"User-Agent": BROWSER_USER_AGENT})
        self.language = language

    def can_handle(self, url: str) -> bool:
        """
        Check if this downloader can handle the given URL.

        Returns True for any HTTP/HTTPS URL (fallback downloader for web content).
        """
        try:
            parsed = urlparse(url)
            return parsed.scheme in ("http", "https")
        except Exception:
            # urlparse rarely raises; treat any failure as "cannot handle".
            return False

    def download(
        self, url: str, content_type: ContentType = ContentType.TEXT
    ) -> Optional[bytes]:
        """
        Download and extract text content from HTML page.

        Args:
            url: The URL to download
            content_type: Type of content (TEXT for HTML extraction)

        Returns:
            Extracted text as UTF-8 bytes, or None if failed
        """
        if content_type == ContentType.PDF:
            logger.warning(f"HTML downloader cannot download PDFs: {url}")
            return None

        try:
            html_content = self._fetch_html(url)
            if not html_content:
                return None

            extracted = self._extract_content(html_content, url)
            if extracted:
                text = self._format_extracted_content(extracted)
                return text.encode("utf-8")

            return None

        except Exception:
            logger.exception(f"Failed to download HTML from {url}")
            return None

    def download_with_result(
        self, url: str, content_type: ContentType = ContentType.TEXT
    ) -> DownloadResult:
        """Download content and return detailed result with skip reason.

        Unlike download(), this also rejects whitespace-only extractions
        and reports a human-readable skip_reason for every failure mode.
        """
        if content_type == ContentType.PDF:
            return DownloadResult(
                skip_reason="HTML downloader does not support PDF downloads"
            )

        try:
            html_content = self._fetch_html(url)
            if not html_content:
                return DownloadResult(
                    skip_reason="Failed to fetch HTML content from URL"
                )

            extracted = self._extract_content(html_content, url)
            if not extracted:
                return DownloadResult(
                    skip_reason="Could not extract meaningful content from page"
                )

            text = self._format_extracted_content(extracted)
            if not text.strip():
                return DownloadResult(skip_reason="Extracted content is empty")

            return DownloadResult(
                content=text.encode("utf-8"),
                is_success=True,
            )

        except Exception as e:
            logger.exception(f"Failed to download HTML from {url}")
            return DownloadResult(skip_reason=f"Error: {str(e)}")

    def _fetch_html(self, url: str) -> Optional[str]:
        """Fetch raw HTML content from URL.

        Applies per-domain rate limiting and records an outcome for every
        terminal path (success, wrong content type, HTTP error, exception)
        so the rate tracker's statistics stay balanced.

        Returns:
            The response body as text, or None on any failure.
        """
        logger.debug(f"Static fetch: {url}")
        domain = urlparse(url).netloc
        # Rate-limit per target domain rather than globally.
        engine_type = f"html_download_{domain}"

        wait_time = self.rate_tracker.apply_rate_limit(engine_type)

        try:
            response = self.session.get(
                url,
                timeout=self.timeout,
                allow_redirects=True,
            )

            if response.status_code == 200:
                content_type = response.headers.get("content-type", "").lower()
                if (
                    "text/html" in content_type
                    or "application/xhtml" in content_type
                ):
                    self.rate_tracker.record_outcome(
                        engine_type=engine_type,
                        wait_time=wait_time,
                        success=True,
                        retry_count=1,
                        search_result_count=1,
                    )
                    return response.text
                logger.warning(
                    f"Unexpected content type for HTML download: {content_type}"
                )
                # FIX: this path previously returned without recording an
                # outcome, leaving the applied wait_time unaccounted in the
                # rate tracker while all sibling paths record one.
                self.rate_tracker.record_outcome(
                    engine_type=engine_type,
                    wait_time=wait_time,
                    success=False,
                    retry_count=1,
                    error_type="unexpected_content_type",
                )
                return None
            logger.warning(f"HTTP {response.status_code} fetching {url}")
            self.rate_tracker.record_outcome(
                engine_type=engine_type,
                wait_time=wait_time,
                success=False,
                retry_count=1,
                error_type=f"HTTP_{response.status_code}",
            )
            return None

        except Exception as e:
            logger.exception(f"Error fetching HTML from {url}")
            self.rate_tracker.record_outcome(
                engine_type=engine_type,
                wait_time=wait_time,
                success=False,
                retry_count=1,
                error_type=type(e).__name__,
            )
            return None

    def _extract_content(self, html: str, url: str) -> Optional[Dict[str, Any]]:
        """Extract clean content and metadata from HTML.

        Delegates to the shared extraction pipeline which handles
        trafilatura, readability, justext, and metadata enrichment.

        Returns:
            Dict with "title", "description", "content", "url" keys,
            or None if nothing meaningful could be extracted.
        """
        try:
            result = extract_content_with_metadata(html, language=self.language)
            if not result:
                return None

            title = result.get("title")
            content = result["content"]

            logger.info(
                f"Extracted {len(content)} chars from {url} "
                f"(title: {title[:50] + '...' if title and len(title) > 50 else title})"
            )
            return {
                "title": title,
                "description": result.get("description"),
                "content": content,
                "url": url,
            }

        except Exception:
            logger.exception("Error extracting content from HTML")
            return None

    def _format_extracted_content(self, extracted: Dict[str, Any]) -> str:
        """Format extracted content as readable text.

        Produces a lightweight Markdown document: title heading,
        italicized description, source URL line, then the body text.
        """
        parts = []

        if extracted.get("title"):
            parts.append(f"# {extracted['title']}")
            parts.append("")

        if extracted.get("description"):
            parts.append(f"*{extracted['description']}*")
            parts.append("")

        if extracted.get("url"):
            parts.append(f"Source: {extracted['url']}")
            parts.append("")

        if extracted.get("content"):
            parts.append(extracted["content"])

        return "\n".join(parts)

    def get_metadata(self, url: str) -> Dict[str, Any]:
        """Get metadata about the page.

        Fetches the page and scrapes <title> plus common <meta> tags.
        Always includes "url"; other keys ("title", "description",
        "author", "published_date") appear only when present in the HTML.
        """
        html_content = self._fetch_html(url)
        if not html_content:
            return {}

        try:
            soup = BeautifulSoup(html_content, "html.parser")

            metadata = {"url": url}

            if soup.title and soup.title.string:
                metadata["title"] = soup.title.string.strip()

            meta_desc = soup.find("meta", attrs={"name": "description"})
            if meta_desc and meta_desc.get("content"):
                metadata["description"] = str(meta_desc["content"])

            author = soup.find("meta", attrs={"name": "author"})
            if author and author.get("content"):
                metadata["author"] = str(author["content"])

            # First matching publication-date property wins.
            for prop in ["article:published_time", "datePublished"]:
                date_tag = soup.find("meta", property=prop)
                if date_tag and date_tag.get("content"):
                    metadata["published_date"] = str(date_tag["content"])
                    break

            return metadata

        except Exception:
            logger.exception(f"Error extracting metadata from {url}")
            return {"url": url}