Coverage for src / local_deep_research / web_search_engines / engines / search_engine_wayback.py: 84%

199 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1 import re

2 from typing import Any, Dict, List, Optional, Tuple

3

4 import justext

5 from langchain_core.language_models import BaseLLM

6 from loguru import logger

7

8 from ...config import search_config

9 from ...security.safe_requests import safe_get

10 from ..rate_limiting import RateLimitError

11 from ..search_engine_base import BaseSearchEngine

12

13

14 class WaybackSearchEngine(BaseSearchEngine):

15 """ 

16 Internet Archive Wayback Machine search engine implementation 

17 Provides access to historical versions of web pages 

18 """ 

19 

20 # Mark as public search engine 

21 is_public = True 

22 

23 def __init__( 

24 self, 

25 max_results: int = 10, 

26 max_snapshots_per_url: int = 3, 

27 llm: Optional[BaseLLM] = None, 

28 language: str = "English", 

29 max_filtered_results: Optional[int] = None, 

30 closest_only: bool = False, 

31 ): 

32 """ 

33 Initialize the Wayback Machine search engine. 

34 

35 Args: 

36 max_results: Maximum number of search results 

37 max_snapshots_per_url: Maximum snapshots to retrieve per URL 

38 llm: Language model for relevance filtering 

39 language: Language for content processing 

40 max_filtered_results: Maximum number of results to keep after filtering 

41 closest_only: If True, only retrieves the closest snapshot for each URL 

42 """ 

43 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results 

44 super().__init__( 

45 llm=llm, 

46 max_filtered_results=max_filtered_results, 

47 max_results=max_results, 

48 ) 

49 self.max_snapshots_per_url = max_snapshots_per_url 

50 self.language = language 

51 self.closest_only = closest_only 

52 

53 # API endpoints 

54 self.available_api = "https://archive.org/wayback/available" 

55 self.cdx_api = "https://web.archive.org/cdx/search/cdx" 

56 
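A minimal construction sketch, for orientation. It assumes the module is importable under the path shown in the report header, and that BaseSearchEngine (defined outside this file) exposes a run(query) entry point; both are assumptions, so treat the call below as illustrative rather than the engine's documented interface.

    from local_deep_research.web_search_engines.engines.search_engine_wayback import (
        WaybackSearchEngine,
    )

    # Hypothetical configuration: small caps, closest-snapshot mode only.
    engine = WaybackSearchEngine(
        max_results=5,
        max_snapshots_per_url=2,
        closest_only=True,
    )
    results = engine.run("https://example.com")  # run() assumed from BaseSearchEngine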

57 def _extract_urls_from_query(self, query: str) -> List[str]: 

58 """ 

59 Extract URLs from a query string, or interpret the query itself as a URL if possible. 

60 For non-URL queries, use a DuckDuckGo search to find relevant URLs. 

61 

62 Args: 

63 query: The search query or URL 

64 

65 Returns: 

66 List of URLs to search in the Wayback Machine 

67 """ 

68 # Check if the query is already a URL 

69 url_pattern = re.compile(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+") 

70 urls = url_pattern.findall(query) 

71 

72 if urls: 

73 logger.info(f"Found {len(urls)} URLs in query") 

74 return urls 

75 

76 # Check if query is a domain without http prefix 

77 domain_pattern = re.compile(r"^(?:[-\w.]|(?:%[\da-fA-F]{2}))+\.\w+$") 

78 if domain_pattern.match(query): 

79 logger.info(f"Query appears to be a domain: {query}") 

80 return [f"http://{query}"] 

81 

82 # For non-URL queries, use DuckDuckGo to find relevant URLs 

83 logger.info( 

84 "Query is not a URL, using DuckDuckGo to find relevant URLs" 

85 ) 

86 try: 

87 # Import DuckDuckGo search engine 

88 from langchain_community.utilities import DuckDuckGoSearchAPIWrapper 

89 

90 # Use max_results from parent class, but limit to 5 for URL discovery 

91 url_search_limit = min(5, self.max_results) 

92 ddg = DuckDuckGoSearchAPIWrapper(max_results=url_search_limit) 

93 # Pass the result limit (url_search_limit) as a positional argument 

94 results = ddg.results(query, url_search_limit) 

95 

96 # Extract URLs from results 

97 ddg_urls = [ 

98 result.get("link") for result in results if result.get("link") 

99 ] 

100 if ddg_urls: 

101 logger.info( 

102 f"Found {len(ddg_urls)} URLs from DuckDuckGo search" 

103 ) 

104 return ddg_urls 

105 except Exception: 

106 logger.exception("Error using DuckDuckGo for URL discovery") 

107 

108 # Fallback: treat the query as a potential domain or path 

109 if "/" in query and "." in query:    [109 ↛ 112: line 109 didn't jump to line 112 because the condition on line 109 was always true]

110 logger.info(f"Treating query as a partial URL: {query}") 

111 return [f"http://{query}"] 

112 elif "." in query: 

113 logger.info(f"Treating query as a domain: {query}") 

114 return [f"http://{query}"] 

115 

116 # Return empty list if nothing worked 

117 logger.warning(f"Could not extract any URLs from query: {query}") 

118 return [] 

119 
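To make the URL-detection logic above concrete, the snippet below runs the same two regular expressions standalone on invented inputs; it mirrors the patterns compiled on lines 69 and 77.

    import re

    url_pattern = re.compile(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+")
    domain_pattern = re.compile(r"^(?:[-\w.]|(?:%[\da-fA-F]{2}))+\.\w+$")

    print(url_pattern.findall("see https://example.com/page and http://archive.org"))
    # ['https://example.com', 'http://archive.org'] -- '/' is outside the character
    # class, so anything after the host is not captured
    print(bool(domain_pattern.match("example.com")))      # True  -> rewritten as http://example.com
    print(bool(domain_pattern.match("wayback history")))  # False -> falls through to DuckDuckGo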

120 def _format_timestamp(self, timestamp: str) -> str: 

121 """Format Wayback Machine timestamp into readable date""" 

122 if len(timestamp) < 14: 

123 return timestamp 

124 

125 try: 

126 year = timestamp[0:4] 

127 month = timestamp[4:6] 

128 day = timestamp[6:8] 

129 hour = timestamp[8:10] 

130 minute = timestamp[10:12] 

131 second = timestamp[12:14] 

132 return f"{year}-{month}-{day} {hour}:{minute}:{second}" 

133 except Exception: 

134 return timestamp 

135 
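_format_timestamp is plain string slicing of the Wayback Machine's 14-digit timestamp; the same transformation standalone:

    ts = "20240131235959"
    print(f"{ts[0:4]}-{ts[4:6]}-{ts[6:8]} {ts[8:10]}:{ts[10:12]}:{ts[12:14]}")
    # 2024-01-31 23:59:59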

136 def _get_wayback_snapshots(self, url: str) -> List[Dict[str, Any]]: 

137 """ 

138 Get snapshots from the Wayback Machine for a specific URL. 

139 

140 Args: 

141 url: URL to get snapshots for 

142 

143 Returns: 

144 List of snapshot dictionaries 

145 """ 

146 snapshots = [] 

147 

148 try: 

149 if self.closest_only: 

150 # Get only the closest snapshot 

151 response = safe_get(self.available_api, params={"url": url}) 

152 

153 # Check for rate limit 

154 if response.status_code == 429: 

155 raise RateLimitError("Wayback Machine rate limit exceeded") 

156 

157 data = response.json() 

158 

159 if ( 

160 "archived_snapshots" in data 

161 and "closest" in data["archived_snapshots"] 

162 ): 

163 snapshot = data["archived_snapshots"]["closest"] 

164 snapshot_url = snapshot["url"] 

165 timestamp = snapshot["timestamp"] 

166 

167 snapshots.append( 

168 { 

169 "timestamp": timestamp, 

170 "formatted_date": self._format_timestamp(timestamp), 

171 "url": snapshot_url, 

172 "original_url": url, 

173 "available": snapshot.get("available", True), 

174 "status": snapshot.get("status", "200"), 

175 } 

176 ) 

177 else: 

178 # Get multiple snapshots using CDX API 

179 response = safe_get( 

180 self.cdx_api, 

181 params={ 

182 "url": url, 

183 "output": "json", 

184 "fl": "timestamp,original,statuscode,mimetype", 

185 "collapse": "timestamp:4", # Group by year 

186 "limit": self.max_snapshots_per_url, 

187 }, 

188 ) 

189 

190 # Check for rate limit 

191 if response.status_code == 429: 

192 raise RateLimitError( 

193 "Wayback Machine CDX API rate limit exceeded" 

194 ) 

195 

196 # Check if response is valid JSON 

197 data = response.json() 

198 

199 # First item is the header 

200 if len(data) > 1:    [200 ↛ 224: line 200 didn't jump to line 224 because the condition on line 200 was always true]

201 headers = data[0] 

202 for item in data[1:]: 

203 snapshot = dict(zip(headers, item, strict=False)) 

204 timestamp = snapshot.get("timestamp", "") 

205 

206 wayback_url = ( 

207 f"https://web.archive.org/web/{timestamp}/{url}" 

208 ) 

209 

210 snapshots.append( 

211 { 

212 "timestamp": timestamp, 

213 "formatted_date": self._format_timestamp( 

214 timestamp 

215 ), 

216 "url": wayback_url, 

217 "original_url": url, 

218 "available": True, 

219 "status": snapshot.get("statuscode", "200"), 

220 } 

221 ) 

222 

223 # Limit to max snapshots per URL 

224 snapshots = snapshots[: self.max_snapshots_per_url] 

225 

226 except RateLimitError: 

227 # Re-raise rate limit errors for base class retry handling 

228 raise 

229 except Exception: 

230 logger.exception(f"Error getting Wayback snapshots for {url}") 

231 

232 return snapshots 

233 
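The CDX branch above builds snapshot dictionaries by zipping the header row against each data row. A sketch of that transformation on an invented payload (field names follow the fl parameter passed on line 184):

    data = [  # shape of a CDX output=json response; the values here are made up
        ["timestamp", "original", "statuscode", "mimetype"],
        ["20200101000000", "http://example.com/", "200", "text/html"],
        ["20230615120000", "http://example.com/", "200", "text/html"],
    ]

    headers, rows = data[0], data[1:]
    for row in rows:
        snapshot = dict(zip(headers, row, strict=False))
        print(f"https://web.archive.org/web/{snapshot['timestamp']}/{snapshot['original']}")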

234 def _get_previews(self, query: str) -> List[Dict[str, Any]]: 

235 """ 

236 Get preview information for Wayback Machine snapshots. 

237 

238 Args: 

239 query: The search query 

240 

241 Returns: 

242 List of preview dictionaries 

243 """ 

244 logger.info(f"Getting Wayback Machine previews for query: {query}") 

245 

246 # Extract URLs from query 

247 urls = self._extract_urls_from_query(query) 

248 

249 if not urls: 

250 logger.warning(f"No URLs found in query: {query}") 

251 return [] 

252 

253 # Get snapshots for each URL 

254 all_snapshots = [] 

255 for url in urls: 

256 snapshots = self._get_wayback_snapshots(url) 

257 all_snapshots.extend(snapshots) 

258 

259 # Apply rate limiting between requests 

260 if len(urls) > 1:    [260 ↛ 261: line 260 didn't jump to line 261 because the condition on line 260 was never true]

261 self.rate_tracker.apply_rate_limit(self.engine_type) 

262 

263 # Format as previews 

264 previews = [] 

265 for snapshot in all_snapshots: 

266 preview = { 

267 "id": f"{snapshot['timestamp']}_{snapshot['original_url']}", 

268 "title": f"Archive of {snapshot['original_url']} ({snapshot['formatted_date']})", 

269 "link": snapshot["url"], 

270 "snippet": f"Archived version from {snapshot['formatted_date']}", 

271 "original_url": snapshot["original_url"], 

272 "timestamp": snapshot["timestamp"], 

273 "formatted_date": snapshot["formatted_date"], 

274 } 

275 previews.append(preview) 

276 

277 logger.info(f"Found {len(previews)} Wayback Machine snapshots") 

278 return previews 

279 

280 def _remove_boilerplate(self, html: str) -> str: 

281 """ 

282 Remove boilerplate content from HTML. 

283 

284 Args: 

285 html: HTML content 

286 

287 Returns: 

288 Cleaned text content 

289 """ 

290 if not html or not html.strip(): 

291 return "" 

292 try: 

293 paragraphs = justext.justext( 

294 html, justext.get_stoplist(self.language) 

295 ) 

296 cleaned = "\n".join( 

297 [p.text for p in paragraphs if not p.is_boilerplate] 

298 ) 

299 return cleaned 

300 except Exception: 

301 logger.exception("Error removing boilerplate") 

302 return html 

303 
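_remove_boilerplate delegates entirely to justext. A standalone sketch of the same call; note that justext's classifier can mark very short paragraphs as boilerplate, so tiny inputs may come back empty.

    import justext

    html = """
    <html><body>
      <p>The Wayback Machine stores periodic snapshots of public web pages,
         which lets researchers compare how a page has changed over time.</p>
      <p>Accept cookies</p>
    </body></html>
    """
    paragraphs = justext.justext(html, justext.get_stoplist("English"))
    print("\n".join(p.text for p in paragraphs if not p.is_boilerplate))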

304 def _get_wayback_content(self, url: str) -> Tuple[str, str]: 

305 """ 

306 Retrieve content from a Wayback Machine URL. 

307 

308 Args: 

309 url: Wayback Machine URL 

310 

311 Returns: 

312 Tuple of (raw_html, cleaned_text) 

313 """ 

314 try: 

315 headers = { 

316 "User-Agent": "Mozilla/5.0 (Local Deep Research Bot; research project)" 

317 } 

318 response = safe_get(url, headers=headers, timeout=10) 

319 raw_html = response.text 

320 

321 # Clean the HTML 

322 cleaned_text = self._remove_boilerplate(raw_html) 

323 

324 return raw_html, cleaned_text 

325 except Exception as e: 

326 logger.exception(f"Error retrieving content from {url}") 

327 return "", f"Error retrieving content: {e!s}" 

328 

329 def _get_full_content( 

330 self, relevant_items: List[Dict[str, Any]] 

331 ) -> List[Dict[str, Any]]: 

332 """ 

333 Get full content for the relevant Wayback Machine snapshots. 

334 

335 Args: 

336 relevant_items: List of relevant preview dictionaries 

337 

338 Returns: 

339 List of result dictionaries with full content 

340 """ 

341 # Check if we should add full content 

342 if ( 

343 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

344 and search_config.SEARCH_SNIPPETS_ONLY 

345 ): 

346 logger.info("Snippet-only mode, skipping full content retrieval") 

347 return relevant_items 

348 

349 logger.info( 

350 f"Getting full content for {len(relevant_items)} Wayback Machine snapshots" 

351 ) 

352 

353 results = [] 

354 for item in relevant_items: 

355 wayback_url = item.get("link") 

356 if not wayback_url:    [356 ↛ 357: line 356 didn't jump to line 357 because the condition on line 356 was never true]

357 results.append(item) 

358 continue 

359 

360 logger.info(f"Retrieving content from {wayback_url}") 

361 

362 try: 

363 # Retrieve content 

364 raw_html, full_content = self._get_wayback_content(wayback_url) 

365 

366 # Add full content to the result 

367 result = item.copy() 

368 result["raw_html"] = raw_html 

369 result["full_content"] = full_content 

370 

371 results.append(result) 

372 

373 # Apply rate limiting 

374 self.rate_tracker.apply_rate_limit(self.engine_type) 

375 except Exception: 

376 logger.exception(f"Error processing {wayback_url}") 

377 results.append(item) 

378 

379 return results 

380 

381 def search_by_url( 

382 self, url: str, max_snapshots: Optional[int] = None 

383 ) -> List[Dict[str, Any]]: 

384 """ 

385 Search for archived versions of a specific URL. 

386 

387 Args: 

388 url: The URL to search for archives 

389 max_snapshots: Maximum number of snapshots to return 

390 

391 Returns: 

392 List of snapshot dictionaries 

393 """ 

394 max_snapshots = max_snapshots or self.max_snapshots_per_url 

395 

396 snapshots = self._get_wayback_snapshots(url) 

397 previews = [] 

398 

399 for snapshot in snapshots[:max_snapshots]: 

400 preview = { 

401 "id": f"{snapshot['timestamp']}_{snapshot['original_url']}", 

402 "title": f"Archive of {snapshot['original_url']} ({snapshot['formatted_date']})", 

403 "link": snapshot["url"], 

404 "snippet": f"Archived version from {snapshot['formatted_date']}", 

405 "original_url": snapshot["original_url"], 

406 "timestamp": snapshot["timestamp"], 

407 "formatted_date": snapshot["formatted_date"], 

408 } 

409 previews.append(preview) 

410 

411 # Get full content if not in snippets-only mode 

412 if (    [412 ↛ 416: line 412 didn't jump to line 416 because the condition on line 412 was never true]

413 not hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

414 or not search_config.SEARCH_SNIPPETS_ONLY 

415 ): 

416 return self._get_full_content(previews) 

417 

418 return previews 

419 

420 def search_by_date_range( 

421 self, url: str, start_date: str, end_date: str 

422 ) -> List[Dict[str, Any]]: 

423 """ 

424 Search for archived versions of a URL within a date range. 

425 

426 Args: 

427 url: The URL to search for archives 

428 start_date: Start date in format YYYYMMDD 

429 end_date: End date in format YYYYMMDD 

430 

431 Returns: 

432 List of snapshot dictionaries 

433 """ 

434 try: 

435 # Use CDX API with date range 

436 response = safe_get( 

437 self.cdx_api, 

438 params={ 

439 "url": url, 

440 "output": "json", 

441 "fl": "timestamp,original,statuscode,mimetype", 

442 "from": start_date, 

443 "to": end_date, 

444 "limit": self.max_snapshots_per_url, 

445 }, 

446 ) 

447 

448 # Process response 

449 data = response.json() 

450 

451 # First item is the header 

452 if len(data) <= 1: 

453 return [] 

454 

455 headers = data[0] 

456 snapshots = [] 

457 

458 for item in data[1:]: 

459 snapshot = dict(zip(headers, item, strict=False)) 

460 timestamp = snapshot.get("timestamp", "") 

461 

462 wayback_url = f"https://web.archive.org/web/{timestamp}/{url}" 

463 

464 snapshots.append( 

465 { 

466 "id": f"{timestamp}_{url}", 

467 "title": f"Archive of {url} ({self._format_timestamp(timestamp)})", 

468 "link": wayback_url, 

469 "snippet": f"Archived version from {self._format_timestamp(timestamp)}", 

470 "original_url": url, 

471 "timestamp": timestamp, 

472 "formatted_date": self._format_timestamp(timestamp), 

473 } 

474 ) 

475 

476 # Get full content if not in snippets-only mode 

477 if (    [477 ↛ 481: line 477 didn't jump to line 481 because the condition on line 477 was never true]

478 not hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

479 or not search_config.SEARCH_SNIPPETS_ONLY 

480 ): 

481 return self._get_full_content(snapshots) 

482 

483 return snapshots 

484 

485 except Exception: 

486 logger.exception(f"Error searching date range for {url}") 

487 return [] 

488 
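A usage sketch for the date-range helper; dates use the YYYYMMDD form the CDX API expects, and the call issues a live request through safe_get, so it needs network access. The import path is the same assumption as in the earlier construction sketch.

    from local_deep_research.web_search_engines.engines.search_engine_wayback import (
        WaybackSearchEngine,
    )

    engine = WaybackSearchEngine(max_snapshots_per_url=5)
    snapshots = engine.search_by_date_range(
        "http://example.com", start_date="20200101", end_date="20201231"
    )
    for snap in snapshots:
        print(snap["formatted_date"], snap["link"])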

489 def get_latest_snapshot(self, url: str) -> Optional[Dict[str, Any]]: 

490 """ 

491 Get the most recent snapshot of a URL. 

492 

493 Args: 

494 url: The URL to get the latest snapshot for 

495 

496 Returns: 

497 Dictionary with snapshot information or None if not found 

498 """ 

499 try: 

500 response = safe_get(self.available_api, params={"url": url}) 

501 data = response.json() 

502 

503 if ( 

504 "archived_snapshots" in data 

505 and "closest" in data["archived_snapshots"] 

506 ): 

507 snapshot = data["archived_snapshots"]["closest"] 

508 timestamp = snapshot["timestamp"] 

509 wayback_url = snapshot["url"] 

510 

511 result = { 

512 "id": f"{timestamp}_{url}", 

513 "title": f"Latest archive of {url} ({self._format_timestamp(timestamp)})", 

514 "link": wayback_url, 

515 "snippet": f"Archived version from {self._format_timestamp(timestamp)}", 

516 "original_url": url, 

517 "timestamp": timestamp, 

518 "formatted_date": self._format_timestamp(timestamp), 

519 } 

520 

521 # Get full content if not in snippets-only mode 

522 if (    [522 ↛ 526: line 522 didn't jump to line 526 because the condition on line 522 was never true]

523 not hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

524 or not search_config.SEARCH_SNIPPETS_ONLY 

525 ): 

526 raw_html, full_content = self._get_wayback_content( 

527 wayback_url 

528 ) 

529 result["raw_html"] = raw_html 

530 result["full_content"] = full_content 

531 

532 return result 

533 

534 return None 

535 

536 except Exception: 

537 logger.exception(f"Error getting latest snapshot for {url}") 

538 return None
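For reference, the "available" API consulted by get_latest_snapshot (and by the closest_only branch earlier) returns JSON shaped roughly like the literal below; the concrete values are invented.

    data = {  # approximate shape of https://archive.org/wayback/available?url=...
        "archived_snapshots": {
            "closest": {
                "status": "200",
                "available": True,
                "url": "http://web.archive.org/web/20240101000000/http://example.com/",
                "timestamp": "20240101000000",
            }
        }
    }
    closest = data["archived_snapshots"]["closest"]
    print(closest["timestamp"], closest["url"])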