Coverage for src/local_deep_research/web_search_engines/engines/search_engine_arxiv.py: 32%

157 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

from typing import Any, Dict, List, Optional

import arxiv
from langchain_core.language_models import BaseLLM
from loguru import logger

from ...advanced_search_system.filters.journal_reputation_filter import (
    JournalReputationFilter,
)
from ...config import search_config
from ..rate_limiting import RateLimitError
from ..search_engine_base import BaseSearchEngine


class ArXivSearchEngine(BaseSearchEngine):
    """arXiv search engine implementation with two-phase approach"""

    # Mark as public search engine
    is_public = True
    # Not a generic search engine (specialized for academic papers)
    is_generic = False
    # Scientific/academic search engine
    is_scientific = True

    def __init__(
        self,
        max_results: int = 10,
        sort_by: str = "relevance",
        sort_order: str = "descending",
        include_full_text: bool = False,
        download_dir: Optional[str] = None,
        max_full_text: int = 1,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the arXiv search engine.

        Args:
            max_results: Maximum number of search results
            sort_by: Sorting criteria ('relevance', 'lastUpdatedDate', or 'submittedDate')
            sort_order: Sort order ('ascending' or 'descending')
            include_full_text: Whether to include full paper content in results (downloads PDF)
            download_dir: Directory to download PDFs to (if include_full_text is True)
            max_full_text: Maximum number of PDFs to download and process (default: 1)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for thread context
        """
        # Initialize the journal reputation filter if needed.
        content_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm, engine_name="arxiv", settings_snapshot=settings_snapshot
        )
        # coverage: this condition was never true in the recorded test run
        if journal_filter is not None:
            content_filters.append(journal_filter)

        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            # We deliberately do this filtering after relevancy checks,
            # because it is potentially quite slow.
            content_filters=content_filters,
            settings_snapshot=settings_snapshot,
        )
        self.max_results = max(self.max_results, 25)
        self.sort_by = sort_by
        self.sort_order = sort_order
        self.include_full_text = include_full_text
        self.download_dir = download_dir
        self.max_full_text = max_full_text

        # Map sort parameters to arxiv package parameters
        self.sort_criteria = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }

        self.sort_directions = {
            "ascending": arxiv.SortOrder.Ascending,
            "descending": arxiv.SortOrder.Descending,
        }
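    # Illustrative usage sketch (a minimal example, not taken from the module
    # itself; it only assumes the constructor and run() signatures shown in
    # this file and that the surrounding package is importable):
    #
    #   engine = ArXivSearchEngine(max_results=10, sort_by="submittedDate")
    #   results = engine.run("transformer interpretability")
    #   for r in results:
    #       print(r["title"], r["link"])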

    def _get_search_results(self, query: str) -> List[Any]:
        """
        Helper method to get search results from arXiv API.

        Args:
            query: The search query

        Returns:
            List of arXiv paper objects
        """
        # Configure the search client
        sort_criteria = self.sort_criteria.get(
            self.sort_by, arxiv.SortCriterion.Relevance
        )
        sort_order = self.sort_directions.get(
            self.sort_order, arxiv.SortOrder.Descending
        )

        # Create the search client
        client = arxiv.Client(page_size=self.max_results)

        # Create the search query
        search = arxiv.Search(
            query=query,
            max_results=self.max_results,
            sort_by=sort_criteria,
            sort_order=sort_order,
        )

        # Apply rate limiting before making the request
        self._last_wait_time = self.rate_tracker.apply_rate_limit(
            self.engine_type
        )

        # Get the search results
        papers = list(client.results(search))

        return papers
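    # Note on query construction (illustrative): the query string is passed to
    # the arXiv API as-is, so it may use arXiv's fielded search syntax, e.g.
    # 'au:"Hinton"', 'cat:cs.AI', 'ti:"diffusion models"', or boolean
    # combinations such as 'cat:cs.CL AND abs:"retrieval augmented"'.
    # search_by_author() and search_by_category() below build exactly these
    # kinds of queries.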

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for arXiv papers.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        logger.info("Getting paper previews from arXiv")

        try:
            # Get search results from arXiv
            papers = self._get_search_results(query)

            # Store the paper objects for later use
            self._papers = {paper.entry_id: paper for paper in papers}

            # Format results as previews with basic information
            previews = []
            for paper in papers:
                preview = {
                    "id": paper.entry_id,  # Use entry_id as ID
                    "title": paper.title,
                    "link": paper.entry_id,  # arXiv URL
                    "snippet": (
                        paper.summary[:250] + "..."
                        if len(paper.summary) > 250
                        else paper.summary
                    ),
                    "authors": [
                        author.name for author in paper.authors[:3]
                    ],  # First 3 authors
                    "published": (
                        paper.published.strftime("%Y-%m-%d")
                        if paper.published
                        else None
                    ),
                    "journal_ref": paper.journal_ref,
                    "source": "arXiv",
                }

                previews.append(preview)

            return previews

        except Exception as e:
            error_msg = str(e)
            logger.exception("Error getting arXiv previews")

            # Check for rate limiting patterns
            # coverage: this rate-limit branch was never taken in the recorded test run
            if (
                "429" in error_msg
                or "too many requests" in error_msg.lower()
                or "rate limit" in error_msg.lower()
                or "service unavailable" in error_msg.lower()
                or "503" in error_msg
            ):
                raise RateLimitError(f"arXiv rate limit hit: {error_msg}")

            return []
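    # Shape of a single preview entry built above (values are illustrative):
    #
    #   {
    #       "id": "http://arxiv.org/abs/2101.12345v1",
    #       "title": "...",
    #       "link": "http://arxiv.org/abs/2101.12345v1",
    #       "snippet": "First ~250 characters of the abstract...",
    #       "authors": ["A. Author", "B. Author", "C. Author"],
    #       "published": "2021-01-28",
    #       "journal_ref": None,
    #       "source": "arXiv",
    #   }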

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant arXiv papers.
        Downloads PDFs and extracts text when include_full_text is True.
        Limits the number of PDFs processed to max_full_text.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant arXiv papers")

        results = []
        pdf_count = 0  # Track number of PDFs processed

        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the paper ID
            paper_id = item.get("id")

            # Try to get the full paper from our cache
            paper = None
            if hasattr(self, "_papers") and paper_id in self._papers:
                paper = self._papers[paper_id]

            if paper:
                # Add complete paper information
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        "authors": [
                            author.name for author in paper.authors
                        ],  # All authors
                        "published": (
                            paper.published.strftime("%Y-%m-%d")
                            if paper.published
                            else None
                        ),
                        "updated": (
                            paper.updated.strftime("%Y-%m-%d")
                            if paper.updated
                            else None
                        ),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                    }
                )

                # Default to using summary as content
                result["content"] = paper.summary
                result["full_content"] = paper.summary

                # Download PDF and extract text if requested and within limit
                if (
                    self.include_full_text
                    and self.download_dir
                    and pdf_count < self.max_full_text
                ):
                    try:
                        # Download the paper
                        pdf_count += (
                            1  # Increment counter before attempting download
                        )
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)

                        # Extract text from PDF
                        try:
                            # Try PyPDF2 first
                            try:
                                import PyPDF2

                                with open(paper_path, "rb") as pdf_file:
                                    pdf_reader = PyPDF2.PdfReader(pdf_file)
                                    pdf_text = ""
                                    for page in pdf_reader.pages:
                                        pdf_text += page.extract_text() + "\n\n"

                                if (
                                    pdf_text.strip()
                                ):  # Only use if we got meaningful text
                                    result["content"] = pdf_text
                                    result["full_content"] = pdf_text
                                    logger.info(
                                        "Successfully extracted text from PDF using PyPDF2"
                                    )
                            except (ImportError, Exception) as e1:
                                # Fall back to pdfplumber
                                try:
                                    import pdfplumber

                                    with pdfplumber.open(paper_path) as pdf:
                                        pdf_text = ""
                                        for page in pdf.pages:
                                            pdf_text += (
                                                page.extract_text() + "\n\n"
                                            )

                                    if (
                                        pdf_text.strip()
                                    ):  # Only use if we got meaningful text
                                        result["content"] = pdf_text
                                        result["full_content"] = pdf_text
                                        logger.info(
                                            "Successfully extracted text from PDF using pdfplumber"
                                        )
                                except (ImportError, Exception) as e2:
                                    logger.exception(
                                        f"PDF text extraction failed: {e1!s}, then {e2!s}"
                                    )
                                    logger.info(
                                        "Using paper summary as content instead"
                                    )
                        except Exception:
                            logger.exception("Error extracting text from PDF")
                            logger.info(
                                "Using paper summary as content instead"
                            )
                    except Exception:
                        logger.exception(
                            f"Error downloading paper {paper.title}"
                        )
                        result["pdf_path"] = None
                        pdf_count -= 1  # Decrement counter if download fails
                elif (
                    self.include_full_text
                    and self.download_dir
                    and pdf_count >= self.max_full_text
                ):
                    # Reached PDF limit
                    logger.info(
                        f"Maximum number of PDFs ({self.max_full_text}) reached. Skipping remaining PDFs."
                    )
                    result["content"] = paper.summary
                    result["full_content"] = paper.summary

            results.append(result)

        return results
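    # The PDF handling above tries PyPDF2 first and falls back to pdfplumber.
    # A minimal standalone sketch of that fallback (illustrative; it assumes a
    # hypothetical helper name and that both optional dependencies are installed):
    #
    #   def _extract_pdf_text(path: str) -> str:
    #       try:
    #           import PyPDF2
    #           with open(path, "rb") as f:
    #               return "\n\n".join(
    #                   page.extract_text() or "" for page in PyPDF2.PdfReader(f).pages
    #               )
    #       except Exception:
    #           import pdfplumber
    #           with pdfplumber.open(path) as pdf:
    #               return "\n\n".join(
    #                   page.extract_text() or "" for page in pdf.pages
    #               )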

    def run(
        self, query: str, research_context: Dict[str, Any] | None = None
    ) -> List[Dict[str, Any]]:
        """
        Execute a search using arXiv with the two-phase approach.

        Args:
            query: The search query
            research_context: Context from previous research to use.

        Returns:
            List of search results
        """
        logger.info("---Execute a search using arXiv---")

        # Use the implementation from the parent class which handles all phases
        results = super().run(query, research_context=research_context)

        # Clean up
        # coverage: this condition was always true in the recorded test run
        if hasattr(self, "_papers"):
            del self._papers

        return results

    def get_paper_details(self, arxiv_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific arXiv paper.

        Args:
            arxiv_id: arXiv ID of the paper (e.g., '2101.12345')

        Returns:
            Dictionary with paper information
        """
        try:
            # Create the search client
            client = arxiv.Client()

            # Search for the specific paper
            search = arxiv.Search(id_list=[arxiv_id], max_results=1)

            # Apply rate limiting before fetching paper by ID
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )

            # Get the paper
            papers = list(client.results(search))
            if not papers:
                return {}

            paper = papers[0]

            # Format result based on config
            result = {
                "title": paper.title,
                "link": paper.entry_id,
                "snippet": (
                    paper.summary[:250] + "..."
                    if len(paper.summary) > 250
                    else paper.summary
                ),
                "authors": [
                    author.name for author in paper.authors[:3]
                ],  # First 3 authors
                "journal_ref": paper.journal_ref,
            }

            # Add full content if not in snippet-only mode
            if (
                not hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
                or not search_config.SEARCH_SNIPPETS_ONLY
            ):
                result.update(
                    {
                        "pdf_url": paper.pdf_url,
                        "authors": [
                            author.name for author in paper.authors
                        ],  # All authors
                        "published": (
                            paper.published.strftime("%Y-%m-%d")
                            if paper.published
                            else None
                        ),
                        "updated": (
                            paper.updated.strftime("%Y-%m-%d")
                            if paper.updated
                            else None
                        ),
                        "categories": paper.categories,
                        "summary": paper.summary,  # Full summary
                        "comment": paper.comment,
                        "doi": paper.doi,
                        "content": paper.summary,  # Use summary as content
                        "full_content": paper.summary,  # For consistency
                    }
                )

                # Download PDF if requested
                if self.include_full_text and self.download_dir:
                    try:
                        # Apply rate limiting before PDF download
                        self.rate_tracker.apply_rate_limit(self.engine_type)

                        # Download the paper
                        paper_path = paper.download_pdf(
                            dirpath=self.download_dir
                        )
                        result["pdf_path"] = str(paper_path)
                    except Exception:
                        logger.exception("Error downloading paper")

            return result

        except Exception:
            logger.exception("Error getting paper details")
            return {}
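    # Illustrative call (the ID format follows the docstring above; `engine`
    # refers to an ArXivSearchEngine instance as in the earlier sketch):
    #
    #   details = engine.get_paper_details("2101.12345")
    #   if details:
    #       print(details["title"], details.get("doi"))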

    def search_by_author(
        self, author_name: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers by a specific author.

        Args:
            author_name: Name of the author
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers by the author
        """
        original_max_results = self.max_results

        try:
            if max_results:
                self.max_results = max_results

            query = f'au:"{author_name}"'
            return self.run(query)

        finally:
            # Restore original value
            self.max_results = original_max_results

    def search_by_category(
        self, category: str, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for papers in a specific arXiv category.

        Args:
            category: arXiv category (e.g., 'cs.AI', 'physics.optics')
            max_results: Maximum number of results (defaults to self.max_results)

        Returns:
            List of papers in the category
        """
        original_max_results = self.max_results

        try:
            if max_results:
                self.max_results = max_results

            query = f"cat:{category}"
            return self.run(query)

        finally:
            # Restore original value
            self.max_results = original_max_results
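
# Illustrative smoke test (a sketch, not part of the original file). Because
# this module uses relative imports, it would need to run inside its package,
# e.g. via `python -m local_deep_research.web_search_engines.engines.search_engine_arxiv`;
# the query, category, and author values below are only examples.
if __name__ == "__main__":
    engine = ArXivSearchEngine(max_results=5, include_full_text=False)

    # Plain keyword search through the two-phase run() pipeline.
    for paper in engine.run("graph neural networks"):
        print(paper["title"], "-", paper["link"])

    # Convenience helpers that wrap run() with arXiv's fielded query syntax.
    engine.search_by_category("cs.AI", max_results=3)
    engine.search_by_author("Yoshua Bengio", max_results=3)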