Coverage for src/local_deep_research/web_search_engines/engines/search_engine_arxiv.py: 98%

159 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1from typing import Any, Dict, List, Optional 

2 

3import arxiv 

4from langchain_core.language_models import BaseLLM 

5from loguru import logger 

6 

7from ...advanced_search_system.filters.journal_reputation_filter import ( 

8 JournalReputationFilter, 

9) 

10from ...config import search_config 

11from ...constants import SNIPPET_LENGTH_SHORT 

12from ..rate_limiting import RateLimitError 

13from ..search_engine_base import BaseSearchEngine 

14 

15 

class ArXivSearchEngine(BaseSearchEngine):
    """arXiv search engine implementation with two-phase approach.

    Phase one (``_get_previews``) queries the arXiv API and returns short
    preview dictionaries while caching the raw paper objects on the
    instance. Phase two (``_get_full_content``) enriches the previews that
    survived filtering with full metadata and, optionally, text extracted
    from the downloaded PDF.
    """

    # Mark as public search engine
    is_public = True
    # Not a generic search engine (specialized for academic papers)
    is_generic = False
    # Scientific/academic search engine
    is_scientific = True
    is_lexical = True
    needs_llm_relevance_filter = True

    def __init__(
        self,
        max_results: int = 10,
        sort_by: str = "relevance",
        sort_order: str = "descending",
        include_full_text: bool = False,
        download_dir: Optional[str] = None,
        max_full_text: int = 1,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the arXiv search engine.

        Args:
            max_results: Maximum number of search results
            sort_by: Sorting criteria ('relevance', 'lastUpdatedDate', or 'submittedDate')
            sort_order: Sort order ('ascending' or 'descending')
            include_full_text: Whether to include full paper content in results (downloads PDF)
            download_dir: Directory to download PDFs to (if include_full_text is True)
            max_full_text: Maximum number of PDFs to download and process (default: 1)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
        """
        # Initialize the journal reputation filter if needed.
        # Runs as a preview filter (before LLM relevance) because Tiers 1-3
        # are instant data lookups — no point sending irrelevant journals
        # through the expensive LLM relevance filter.
        preview_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,  # type: ignore[arg-type]
            engine_name="arxiv",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            preview_filters.append(journal_filter)

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            preview_filters=preview_filters,  # type: ignore[arg-type]
            settings_snapshot=settings_snapshot,
        )
        # Never request fewer than 25 papers from the API.
        # NOTE(review): presumably so the filters have enough candidates to
        # choose from — confirm the intent of this floor.
        self.max_results = max(self.max_results, 25)
        self.sort_by = sort_by
        self.sort_order = sort_order
        self.include_full_text = include_full_text
        self.download_dir = download_dir
        self.max_full_text = max_full_text

        # Map sort parameters to arxiv package parameters
        self.sort_criteria = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }

        self.sort_directions = {
            "ascending": arxiv.SortOrder.Ascending,
            "descending": arxiv.SortOrder.Descending,
        }

    @staticmethod
    def _format_date(value: Any) -> Optional[str]:
        """Format a date-like value as ``YYYY-MM-DD``, or return None if it is falsy."""
        return value.strftime("%Y-%m-%d") if value else None

    def _get_search_results(self, query: str) -> List[Any]:
        """
        Helper method to get search results from arXiv API.

        Args:
            query: The search query

        Returns:
            List of arXiv paper objects
        """
        # Unknown sort settings silently fall back to relevance / descending.
        sort_criteria = self.sort_criteria.get(
            self.sort_by, arxiv.SortCriterion.Relevance
        )
        sort_order = self.sort_directions.get(
            self.sort_order, arxiv.SortOrder.Descending
        )

        # Create the search client
        client = arxiv.Client(page_size=self.max_results)

        # Create the search query
        search = arxiv.Search(
            query=query,
            max_results=self.max_results,
            sort_by=sort_criteria,
            sort_order=sort_order,
        )

        # Apply rate limiting before making the request
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        # Get the search results
        return list(client.results(search))

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for arXiv papers.

        The fetched paper objects are cached in ``self._papers`` (keyed by
        ``entry_id``) so that ``_get_full_content`` can reuse them.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries; an empty list if the lookup
            failed for a reason other than rate limiting.

        Raises:
            RateLimitError: If the error message looks like a rate-limit or
                service-unavailable response (429 / 503).
        """
        logger.info("Getting paper previews from arXiv")

        try:
            # Get search results from arXiv
            papers = self._get_search_results(query)

            # Store the paper objects for later use
            self._papers = {paper.entry_id: paper for paper in papers}

            # Format results as previews with basic information
            return [
                {
                    "id": paper.entry_id,  # Use entry_id as ID
                    "title": paper.title,
                    "link": paper.entry_id,  # arXiv URL
                    "snippet": (
                        paper.summary[:SNIPPET_LENGTH_SHORT] + "..."
                        if len(paper.summary) > SNIPPET_LENGTH_SHORT
                        else paper.summary
                    ),
                    # First 3 authors
                    "authors": [author.name for author in paper.authors[:3]],
                    "published": self._format_date(paper.published),
                    "journal_ref": paper.journal_ref,
                    "source": "arXiv",
                }
                for paper in papers
            ]

        except Exception as e:
            error_msg = str(e)
            logger.exception("Error getting arXiv previews")

            # Check for rate limiting patterns
            lowered = error_msg.lower()
            if (
                "429" in error_msg
                or "too many requests" in lowered
                or "rate limit" in lowered
                or "service unavailable" in lowered
                or "503" in error_msg
            ):
                # Chain explicitly so the original failure stays attached.
                raise RateLimitError(
                    f"arXiv rate limit hit: {error_msg}"
                ) from e

            return []

    def _extract_pdf_text(self, paper_path: Any) -> Optional[str]:
        """
        Extract text from a downloaded PDF, trying pypdf then pdfplumber.

        pdfplumber is only tried when the pypdf attempt raises (including
        pypdf not being installed). This method never raises: every failure
        is logged and reported as ``None`` so the caller keeps the summary.

        Args:
            paper_path: Path of the PDF file on disk

        Returns:
            The extracted text, or None if nothing meaningful was extracted
        """
        try:
            # Try pypdf first
            try:
                from pypdf import PdfReader

                with open(paper_path, "rb") as pdf_file:
                    # ``or ""`` guards against extractors that return None
                    # for a page, which would otherwise raise TypeError.
                    pdf_text = "".join(
                        (page.extract_text() or "") + "\n\n"
                        for page in PdfReader(pdf_file).pages
                    )

                # Only use if we got meaningful text
                if pdf_text.strip():
                    logger.info(
                        "Successfully extracted text from PDF using pypdf"
                    )
                    return pdf_text
                return None
            except Exception as e1:
                # Fall back to pdfplumber
                try:
                    import pdfplumber

                    with pdfplumber.open(paper_path) as pdf:
                        pdf_text = "".join(
                            (plumber_page.extract_text() or "") + "\n\n"
                            for plumber_page in pdf.pages
                        )

                    # Only use if we got meaningful text
                    if pdf_text.strip():
                        logger.info(
                            "Successfully extracted text from PDF using pdfplumber"
                        )
                        return pdf_text
                    return None
                except Exception as e2:
                    logger.exception(
                        f"PDF text extraction failed: {e1!s}, then {e2!s}"
                    )
                    logger.info("Using paper summary as content instead")
                    return None
        except Exception:
            # Last-resort guard so extraction problems never propagate.
            logger.exception("Error extracting text from PDF")
            logger.info("Using paper summary as content instead")
            return None

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant arXiv papers.
        Downloads PDFs and extracts text when include_full_text is True.
        Limits the number of PDFs processed to max_full_text.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content. In snippet-only
            mode the input list is returned unchanged.
        """
        # Check if we should get full content
        if getattr(search_config, "SEARCH_SNIPPETS_ONLY", False):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant arXiv papers")

        results = []
        pdf_count = 0  # Track number of PDFs processed
        papers = getattr(self, "_papers", {})
        wants_pdf = self.include_full_text and self.download_dir

        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Try to get the full paper from our cache
            paper_id = item.get("id")
            paper = papers[paper_id] if paper_id in papers else None

            if paper:
                # Add complete paper information
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        # All authors
                        "authors": [author.name for author in paper.authors],
                        "published": self._format_date(paper.published),
                        "updated": self._format_date(paper.updated),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                        # Explicitly forward for journal quality filter
                        "journal_ref": paper.journal_ref,
                    }
                )

                # Default to using summary as content
                result["content"] = paper.summary
                result["full_content"] = paper.summary

                # Download PDF and extract text if requested and within limit
                if wants_pdf and pdf_count < self.max_full_text:
                    try:
                        # Increment counter before attempting download
                        pdf_count += 1
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)

                        # Extraction failures are handled inside the helper,
                        # so they do not count as a failed download.
                        pdf_text = self._extract_pdf_text(paper_path)
                        if pdf_text is not None:
                            result["content"] = pdf_text
                            result["full_content"] = pdf_text
                    except Exception:
                        logger.exception(
                            f"Error downloading paper {paper.title}"
                        )
                        result["pdf_path"] = None
                        pdf_count -= 1  # Decrement counter if download fails
                elif wants_pdf:
                    # Reached PDF limit
                    logger.info(
                        f"Maximum number of PDFs ({self.max_full_text}) reached. Skipping remaining PDFs."
                    )
                    result["content"] = paper.summary
                    result["full_content"] = paper.summary

            results.append(result)

        return results

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Execute a search using arXiv with the two-phase approach.

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results
        """
        logger.info("---Execute a search using arXiv---")

        try:
            # Use the implementation from the parent class which handles all phases
            return super().run(query, research_context=research_context)
        finally:
            # Clean up the per-search paper cache even if the search raised,
            # so stale paper objects never leak into a later call.
            if hasattr(self, "_papers"):
                del self._papers

    def get_paper_details(self, arxiv_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific arXiv paper.

        Args:
            arxiv_id: arXiv ID of the paper (e.g., '2101.12345')

        Returns:
            Dictionary with paper information; an empty dictionary if the
            paper was not found or any error occurred (errors are logged).
        """
        try:
            # Create the search client
            client = arxiv.Client()

            # Search for the specific paper
            search = arxiv.Search(id_list=[arxiv_id], max_results=1)

            # Apply rate limiting before fetching paper by ID
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )

            # Get the paper
            papers = list(client.results(search))
            if not papers:
                return {}

            paper = papers[0]

            # NOTE(review): the snippet length is hard-coded to 250 here,
            # while _get_previews uses SNIPPET_LENGTH_SHORT — confirm
            # whether the two are meant to stay in sync.
            result = {
                "title": paper.title,
                "link": paper.entry_id,
                "snippet": (
                    paper.summary[:250] + "..."
                    if len(paper.summary) > 250
                    else paper.summary
                ),
                # First 3 authors
                "authors": [author.name for author in paper.authors[:3]],
                "journal_ref": paper.journal_ref,
            }

            # Add full content if not in snippet-only mode
            if not getattr(search_config, "SEARCH_SNIPPETS_ONLY", False):
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        # All authors
                        "authors": [author.name for author in paper.authors],
                        "published": self._format_date(paper.published),
                        "updated": self._format_date(paper.updated),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                        "content": paper.summary,  # Use summary as content
                        "full_content": paper.summary,  # For consistency
                    }
                )

                # Download PDF if requested.
                # NOTE(review): kept inside the "not snippet-only" branch —
                # confirm this nesting against the upstream file.
                if self.include_full_text and self.download_dir:
                    try:
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        # Download the paper
                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)
                    except Exception:
                        # Best effort: metadata is still returned.
                        logger.exception("Error downloading paper")

            return result

        except Exception:
            logger.exception("Error getting paper details")
            return {}

    def _run_with_max_results(
        self, query: str, max_results: Optional[int]
    ) -> List[Dict[str, Any]]:
        """Run ``query`` with a temporary ``max_results`` override, then restore it."""
        original_max_results = self.max_results

        try:
            # A falsy override (None or 0) keeps the configured value.
            if max_results:
                self.max_results = max_results

            return self.run(query)

        finally:
            # Restore original value
            self.max_results = original_max_results

    def search_by_author(
        self, author_name: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers by a specific author.

        Args:
            author_name: Name of the author
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers by the author
        """
        return self._run_with_max_results(f'au:"{author_name}"', max_results)

    def search_by_category(
        self, category: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers in a specific arXiv category.

        Args:
            category: arXiv category (e.g., 'cs.AI', 'physics.optics')
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers in the category
        """
        return self._run_with_max_results(f"cat:{category}", max_results)