Coverage for src / local_deep_research / web_search_engines / engines / search_engine_arxiv.py: 98%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1from typing import Any, Dict, List, Optional 

2 

3import arxiv 

4from langchain_core.language_models import BaseLLM 

5from loguru import logger 

6 

7from ...advanced_search_system.filters.journal_reputation_filter import ( 

8 JournalReputationFilter, 

9) 

10from ...config import search_config 

11from ...constants import SNIPPET_LENGTH_SHORT 

12from ..rate_limiting import RateLimitError 

13from ..search_engine_base import BaseSearchEngine 

14 

15 

class ArXivSearchEngine(BaseSearchEngine):
    """arXiv search engine implementation with two-phase approach.

    Phase 1 (:meth:`_get_previews`) fetches lightweight metadata for candidate
    papers; phase 2 (:meth:`_get_full_content`) optionally downloads PDFs and
    extracts their text for the papers that survived relevance filtering.
    """

    # Mark as public search engine
    is_public = True
    # Not a generic search engine (specialized for academic papers)
    is_generic = False
    # Scientific/academic search engine
    is_scientific = True
    # Keyword-based (lexical) matching rather than semantic retrieval
    is_lexical = True
    # arXiv keyword matches benefit from an LLM relevance-filter pass
    needs_llm_relevance_filter = True

    def __init__(
        self,
        max_results: int = 10,
        sort_by: str = "relevance",
        sort_order: str = "descending",
        include_full_text: bool = False,
        download_dir: Optional[str] = None,
        max_full_text: int = 1,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the arXiv search engine.

        Args:
            max_results: Maximum number of search results
            sort_by: Sorting criteria ('relevance', 'lastUpdatedDate', or 'submittedDate')
            sort_order: Sort order ('ascending' or 'descending')
            include_full_text: Whether to include full paper content in results (downloads PDF)
            download_dir: Directory to download PDFs to (if include_full_text is True)
            max_full_text: Maximum number of PDFs to download and process (default: 1)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
        """
        # Initialize the journal reputation filter if needed.
        content_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,  # type: ignore[arg-type]
            engine_name="arxiv",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            content_filters.append(journal_filter)

        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            # We deliberately do this filtering after relevancy checks,
            # because it is potentially quite slow.
            content_filters=content_filters,  # type: ignore[arg-type]
            settings_snapshot=settings_snapshot,
        )
        # Request at least 25 candidates from arXiv so the downstream
        # relevance filter has enough material to choose from, even when the
        # caller asked for fewer final results.
        self.max_results = max(self.max_results, 25)
        self.sort_by = sort_by
        self.sort_order = sort_order
        self.include_full_text = include_full_text
        self.download_dir = download_dir
        self.max_full_text = max_full_text

        # Map sort parameters to arxiv package parameters
        self.sort_criteria = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }

        self.sort_directions = {
            "ascending": arxiv.SortOrder.Ascending,
            "descending": arxiv.SortOrder.Descending,
        }

    def _get_search_results(self, query: str) -> List[Any]:
        """
        Helper method to get search results from arXiv API.

        Args:
            query: The search query

        Returns:
            List of arXiv paper objects
        """
        # Configure the search client; fall back to sane defaults if the
        # configured sort strings are unrecognized.
        sort_criteria = self.sort_criteria.get(
            self.sort_by, arxiv.SortCriterion.Relevance
        )
        sort_order = self.sort_directions.get(
            self.sort_order, arxiv.SortOrder.Descending
        )

        # Create the search client
        client = arxiv.Client(page_size=self.max_results)

        # Create the search query
        search = arxiv.Search(
            query=query,
            max_results=self.max_results,
            sort_by=sort_criteria,
            sort_order=sort_order,
        )

        # Apply rate limiting before making the request
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        # Materialize the lazy results iterator so errors surface here.
        return list(client.results(search))

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for arXiv papers.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries

        Raises:
            RateLimitError: If the arXiv API response looks like throttling
                (HTTP 429/503 or equivalent message text).
        """
        logger.info("Getting paper previews from arXiv")

        try:
            # Get search results from arXiv
            papers = self._get_search_results(query)

            # Store the paper objects for later use by _get_full_content.
            self._papers = {paper.entry_id: paper for paper in papers}

            # Format results as previews with basic information
            previews = []
            for paper in papers:
                preview = {
                    "id": paper.entry_id,  # Use entry_id as ID
                    "title": paper.title,
                    "link": paper.entry_id,  # arXiv URL
                    "snippet": (
                        paper.summary[:SNIPPET_LENGTH_SHORT] + "..."
                        if len(paper.summary) > SNIPPET_LENGTH_SHORT
                        else paper.summary
                    ),
                    "authors": [
                        author.name for author in paper.authors[:3]
                    ],  # First 3 authors
                    "published": (
                        paper.published.strftime("%Y-%m-%d")
                        if paper.published
                        else None
                    ),
                    "journal_ref": paper.journal_ref,
                    "source": "arXiv",
                }

                previews.append(preview)

            return previews

        except Exception as e:
            error_msg = str(e)
            logger.exception("Error getting arXiv previews")

            # Check for rate limiting patterns so the caller's retry/backoff
            # machinery can distinguish throttling from hard failures.
            if (
                "429" in error_msg
                or "too many requests" in error_msg.lower()
                or "rate limit" in error_msg.lower()
                or "service unavailable" in error_msg.lower()
                or "503" in error_msg
            ):
                raise RateLimitError(f"arXiv rate limit hit: {error_msg}")

            return []

    def _extract_pdf_text(self, paper_path) -> Optional[str]:
        """
        Extract text from a downloaded PDF.

        Tries pypdf first and falls back to pdfplumber if pypdf raises.
        All errors are logged and swallowed so a bad PDF never aborts the
        overall result assembly.

        Args:
            paper_path: Filesystem path of the downloaded PDF.

        Returns:
            The extracted text, or None if no meaningful text could be
            extracted (callers should keep the paper summary as content).
        """
        try:
            # Try pypdf first
            try:
                from pypdf import PdfReader

                with open(paper_path, "rb") as pdf_file:
                    pdf_reader = PdfReader(pdf_file)
                    pdf_text = ""
                    for page in pdf_reader.pages:
                        pdf_text += page.extract_text() + "\n\n"

                if pdf_text.strip():  # Only use if we got meaningful text
                    logger.info(
                        "Successfully extracted text from PDF using pypdf"
                    )
                    return pdf_text
            # Exception already covers ImportError; the original
            # `(ImportError, Exception)` tuple was redundant.
            except Exception as e1:
                # Fall back to pdfplumber
                try:
                    import pdfplumber

                    with pdfplumber.open(paper_path) as pdf:
                        pdf_text = ""
                        for plumber_page in pdf.pages:
                            # extract_text() may return None for empty
                            # pages; guard against concatenating None.
                            pdf_text += (
                                plumber_page.extract_text() or ""
                            ) + "\n\n"

                    if pdf_text.strip():  # Only use if we got meaningful text
                        logger.info(
                            "Successfully extracted text from PDF using pdfplumber"
                        )
                        return pdf_text
                except Exception as e2:
                    logger.exception(
                        f"PDF text extraction failed: {e1!s}, then {e2!s}"
                    )
                    logger.info("Using paper summary as content instead")
        except Exception:
            # Defensive catch-all so extraction problems never propagate.
            logger.exception("Error extracting text from PDF")
            logger.info("Using paper summary as content instead")
        return None

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant arXiv papers.
        Downloads PDFs and extracts text when include_full_text is True.
        Limits the number of PDFs processed to max_full_text.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant arXiv papers")

        results = []
        pdf_count = 0  # Track number of PDFs processed

        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the paper ID
            paper_id = item.get("id")

            # Try to get the full paper from the cache built in _get_previews.
            paper = None
            if hasattr(self, "_papers") and paper_id in self._papers:
                paper = self._papers[paper_id]

            if paper:
                # Add complete paper information
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        "authors": [
                            author.name for author in paper.authors
                        ],  # All authors
                        "published": (
                            paper.published.strftime("%Y-%m-%d")
                            if paper.published
                            else None
                        ),
                        "updated": (
                            paper.updated.strftime("%Y-%m-%d")
                            if paper.updated
                            else None
                        ),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                    }
                )

                # Default to using summary as content
                result["content"] = paper.summary
                result["full_content"] = paper.summary

                # Download PDF and extract text if requested and within limit
                if (
                    self.include_full_text
                    and self.download_dir
                    and pdf_count < self.max_full_text
                ):
                    try:
                        # Count the attempt up front; rolled back on failure.
                        pdf_count += 1
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)

                        # Extract text; keep the summary if extraction
                        # yields nothing usable.
                        pdf_text = self._extract_pdf_text(paper_path)
                        if pdf_text is not None:
                            result["content"] = pdf_text
                            result["full_content"] = pdf_text
                    except Exception:
                        logger.exception(
                            f"Error downloading paper {paper.title}"
                        )
                        result["pdf_path"] = None
                        pdf_count -= 1  # Decrement counter if download fails
                elif (
                    self.include_full_text
                    and self.download_dir
                    and pdf_count >= self.max_full_text
                ):
                    # Reached PDF limit
                    logger.info(
                        f"Maximum number of PDFs ({self.max_full_text}) reached. Skipping remaining PDFs."
                    )
                    result["content"] = paper.summary
                    result["full_content"] = paper.summary

            results.append(result)

        return results

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Execute a search using arXiv with the two-phase approach.

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results
        """
        logger.info("---Execute a search using arXiv---")

        # Use the implementation from the parent class which handles all phases
        results = super().run(query, research_context=research_context)

        # Clean up the per-query paper cache so it cannot leak across runs.
        if hasattr(self, "_papers"):
            del self._papers

        return results

    def get_paper_details(self, arxiv_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific arXiv paper.

        Args:
            arxiv_id: arXiv ID of the paper (e.g., '2101.12345')

        Returns:
            Dictionary with paper information, or an empty dict on failure.
        """
        try:
            # Create the search client
            client = arxiv.Client()

            # Search for the specific paper
            search = arxiv.Search(id_list=[arxiv_id], max_results=1)

            # Apply rate limiting before fetching paper by ID
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )

            # Get the paper
            papers = list(client.results(search))
            if not papers:
                return {}

            paper = papers[0]

            # Format result based on config
            # NOTE(review): snippet length is hard-coded to 250 here while
            # _get_previews uses SNIPPET_LENGTH_SHORT — confirm whether these
            # should agree.
            result = {
                "title": paper.title,
                "link": paper.entry_id,
                "snippet": (
                    paper.summary[:250] + "..."
                    if len(paper.summary) > 250
                    else paper.summary
                ),
                "authors": [
                    author.name for author in paper.authors[:3]
                ],  # First 3 authors
                "journal_ref": paper.journal_ref,
            }

            # Add full content if not in snippet-only mode
            if (
                not hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
                or not search_config.SEARCH_SNIPPETS_ONLY
            ):
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        "authors": [
                            author.name for author in paper.authors
                        ],  # All authors
                        "published": (
                            paper.published.strftime("%Y-%m-%d")
                            if paper.published
                            else None
                        ),
                        "updated": (
                            paper.updated.strftime("%Y-%m-%d")
                            if paper.updated
                            else None
                        ),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                        "content": paper.summary,  # Use summary as content
                        "full_content": paper.summary,  # For consistency
                    }
                )

                # Download PDF if requested
                if self.include_full_text and self.download_dir:
                    try:
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        # Download the paper
                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)
                    except Exception:
                        logger.exception("Error downloading paper")

            return result

        except Exception:
            logger.exception("Error getting paper details")
            return {}

    def search_by_author(
        self, author_name: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers by a specific author.

        Args:
            author_name: Name of the author
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers by the author
        """
        original_max_results = self.max_results

        try:
            if max_results:
                self.max_results = max_results

            # arXiv field query: au: restricts matching to the author field.
            query = f'au:"{author_name}"'
            return self.run(query)

        finally:
            # Restore original value
            self.max_results = original_max_results

    def search_by_category(
        self, category: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers in a specific arXiv category.

        Args:
            category: arXiv category (e.g., 'cs.AI', 'physics.optics')
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers in the category
        """
        original_max_results = self.max_results

        try:
            if max_results:
                self.max_results = max_results

            # arXiv field query: cat: restricts matching to the category.
            query = f"cat:{category}"
            return self.run(query)

        finally:
            # Restore original value
            self.max_results = original_max_results