Coverage for src / local_deep_research / web_search_engines / engines / search_engine_arxiv.py: 98%
159 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1from typing import Any, Dict, List, Optional
3import arxiv
4from langchain_core.language_models import BaseLLM
5from loguru import logger
7from ...advanced_search_system.filters.journal_reputation_filter import (
8 JournalReputationFilter,
9)
10from ...config import search_config
11from ...constants import SNIPPET_LENGTH_SHORT
12from ..rate_limiting import RateLimitError
13from ..search_engine_base import BaseSearchEngine
class ArXivSearchEngine(BaseSearchEngine):
    """arXiv search engine implementation with two-phase approach"""

    # Mark as public search engine
    is_public = True
    # Not a generic search engine (specialized for academic papers)
    is_generic = False
    # Scientific/academic search engine
    is_scientific = True
    is_lexical = True
    needs_llm_relevance_filter = True
28 def __init__(
29 self,
30 max_results: int = 10,
31 sort_by: str = "relevance",
32 sort_order: str = "descending",
33 include_full_text: bool = False,
34 download_dir: Optional[str] = None,
35 max_full_text: int = 1,
36 llm: Optional[BaseLLM] = None,
37 max_filtered_results: Optional[int] = None,
38 settings_snapshot: Optional[Dict[str, Any]] = None,
39 ): # Added this parameter
40 """
41 Initialize the arXiv search engine.
43 Args:
44 max_results: Maximum number of search results
45 sort_by: Sorting criteria ('relevance', 'lastUpdatedDate', or 'submittedDate')
46 sort_order: Sort order ('ascending' or 'descending')
47 include_full_text: Whether to include full paper content in results (downloads PDF)
48 download_dir: Directory to download PDFs to (if include_full_text is True)
49 max_full_text: Maximum number of PDFs to download and process (default: 1)
50 llm: Language model for relevance filtering
51 max_filtered_results: Maximum number of results to keep after filtering
52 settings_snapshot: Settings snapshot for thread context
53 """
54 # Initialize the journal reputation filter if needed.
55 content_filters = []
56 journal_filter = JournalReputationFilter.create_default(
57 model=llm, # type: ignore[arg-type]
58 engine_name="arxiv",
59 settings_snapshot=settings_snapshot,
60 )
61 if journal_filter is not None:
62 content_filters.append(journal_filter)
64 # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
65 super().__init__(
66 llm=llm,
67 max_filtered_results=max_filtered_results,
68 max_results=max_results,
69 # We deliberately do this filtering after relevancy checks,
70 # because it is potentially quite slow.
71 content_filters=content_filters, # type: ignore[arg-type]
72 settings_snapshot=settings_snapshot,
73 )
74 self.max_results = max(self.max_results, 25)
75 self.sort_by = sort_by
76 self.sort_order = sort_order
77 self.include_full_text = include_full_text
78 self.download_dir = download_dir
79 self.max_full_text = max_full_text
81 # Map sort parameters to arxiv package parameters
82 self.sort_criteria = {
83 "relevance": arxiv.SortCriterion.Relevance,
84 "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
85 "submittedDate": arxiv.SortCriterion.SubmittedDate,
86 }
88 self.sort_directions = {
89 "ascending": arxiv.SortOrder.Ascending,
90 "descending": arxiv.SortOrder.Descending,
91 }
93 def _get_search_results(self, query: str) -> List[Any]:
94 """
95 Helper method to get search results from arXiv API.
97 Args:
98 query: The search query
100 Returns:
101 List of arXiv paper objects
102 """
103 # Configure the search client
104 sort_criteria = self.sort_criteria.get(
105 self.sort_by, arxiv.SortCriterion.Relevance
106 )
107 sort_order = self.sort_directions.get(
108 self.sort_order, arxiv.SortOrder.Descending
109 )
111 # Create the search client
112 client = arxiv.Client(page_size=self.max_results)
114 # Create the search query
115 search = arxiv.Search(
116 query=query,
117 max_results=self.max_results,
118 sort_by=sort_criteria,
119 sort_order=sort_order,
120 )
122 # Apply rate limiting before making the request
123 self._last_wait_time = self.rate_tracker.apply_rate_limit(
124 self.engine_type
125 )
127 # Get the search results
128 return list(client.results(search))
130 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
131 """
132 Get preview information for arXiv papers.
134 Args:
135 query: The search query
137 Returns:
138 List of preview dictionaries
139 """
140 logger.info("Getting paper previews from arXiv")
142 try:
143 # Get search results from arXiv
144 papers = self._get_search_results(query)
146 # Store the paper objects for later use
147 self._papers = {paper.entry_id: paper for paper in papers}
149 # Format results as previews with basic information
150 previews = []
151 for paper in papers:
152 preview = {
153 "id": paper.entry_id, # Use entry_id as ID
154 "title": paper.title,
155 "link": paper.entry_id, # arXiv URL
156 "snippet": (
157 paper.summary[:SNIPPET_LENGTH_SHORT] + "..."
158 if len(paper.summary) > SNIPPET_LENGTH_SHORT
159 else paper.summary
160 ),
161 "authors": [
162 author.name for author in paper.authors[:3]
163 ], # First 3 authors
164 "published": (
165 paper.published.strftime("%Y-%m-%d")
166 if paper.published
167 else None
168 ),
169 "journal_ref": paper.journal_ref,
170 "source": "arXiv",
171 }
173 previews.append(preview)
175 return previews
177 except Exception as e:
178 error_msg = str(e)
179 logger.exception("Error getting arXiv previews")
181 # Check for rate limiting patterns
182 if (
183 "429" in error_msg
184 or "too many requests" in error_msg.lower()
185 or "rate limit" in error_msg.lower()
186 or "service unavailable" in error_msg.lower()
187 or "503" in error_msg
188 ):
189 raise RateLimitError(f"arXiv rate limit hit: {error_msg}")
191 return []
193 def _get_full_content(
194 self, relevant_items: List[Dict[str, Any]]
195 ) -> List[Dict[str, Any]]:
196 """
197 Get full content for the relevant arXiv papers.
198 Downloads PDFs and extracts text when include_full_text is True.
199 Limits the number of PDFs processed to max_full_text.
201 Args:
202 relevant_items: List of relevant preview dictionaries
204 Returns:
205 List of result dictionaries with full content
206 """
207 # Check if we should get full content
208 if (
209 hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
210 and search_config.SEARCH_SNIPPETS_ONLY
211 ):
212 logger.info("Snippet-only mode, skipping full content retrieval")
213 return relevant_items
215 logger.info("Getting full content for relevant arXiv papers")
217 results = []
218 pdf_count = 0 # Track number of PDFs processed
220 for item in relevant_items:
221 # Start with the preview data
222 result = item.copy()
224 # Get the paper ID
225 paper_id = item.get("id")
227 # Try to get the full paper from our cache
228 paper = None
229 if hasattr(self, "_papers") and paper_id in self._papers:
230 paper = self._papers[paper_id]
232 if paper:
233 # Add complete paper information
234 result.update(
235 {
236 "pdf_url": paper.pdf_url,
237 "authors": [
238 author.name for author in paper.authors
239 ], # All authors
240 "published": (
241 paper.published.strftime("%Y-%m-%d")
242 if paper.published
243 else None
244 ),
245 "updated": (
246 paper.updated.strftime("%Y-%m-%d")
247 if paper.updated
248 else None
249 ),
250 "categories": paper.categories,
251 "summary": paper.summary, # Full summary
252 "comment": paper.comment,
253 "doi": paper.doi,
254 }
255 )
257 # Default to using summary as content
258 result["content"] = paper.summary
259 result["full_content"] = paper.summary
261 # Download PDF and extract text if requested and within limit
262 if (
263 self.include_full_text
264 and self.download_dir
265 and pdf_count < self.max_full_text
266 ):
267 try:
268 # Download the paper
269 pdf_count += (
270 1 # Increment counter before attempting download
271 )
272 # Apply rate limiting before PDF download
273 self.rate_tracker.apply_rate_limit(self.engine_type)
275 paper_path = paper.download_pdf(
276 dirpath=self.download_dir
277 )
278 result["pdf_path"] = str(paper_path)
280 # Extract text from PDF
281 try:
282 # Try pypdf first
283 try:
284 from pypdf import PdfReader
286 with open(paper_path, "rb") as pdf_file:
287 pdf_reader = PdfReader(pdf_file)
288 pdf_text = ""
289 for page in pdf_reader.pages:
290 pdf_text += page.extract_text() + "\n\n"
292 if (
293 pdf_text.strip()
294 ): # Only use if we got meaningful text
295 result["content"] = pdf_text
296 result["full_content"] = pdf_text
297 logger.info(
298 "Successfully extracted text from PDF using pypdf"
299 )
300 except (ImportError, Exception) as e1:
301 # Fall back to pdfplumber
302 try:
303 import pdfplumber
305 with pdfplumber.open(paper_path) as pdf:
306 pdf_text = ""
307 for plumber_page in pdf.pages:
308 pdf_text += (
309 plumber_page.extract_text()
310 + "\n\n"
311 )
313 if ( 313 ↛ 351line 313 didn't jump to line 351
314 pdf_text.strip()
315 ): # Only use if we got meaningful text
316 result["content"] = pdf_text
317 result["full_content"] = pdf_text
318 logger.info(
319 "Successfully extracted text from PDF using pdfplumber"
320 )
321 except (ImportError, Exception) as e2:
322 logger.exception(
323 f"PDF text extraction failed: {e1!s}, then {e2!s}"
324 )
325 logger.info(
326 "Using paper summary as content instead"
327 )
328 except Exception:
329 logger.exception("Error extracting text from PDF")
330 logger.info(
331 "Using paper summary as content instead"
332 )
333 except Exception:
334 logger.exception(
335 f"Error downloading paper {paper.title}"
336 )
337 result["pdf_path"] = None
338 pdf_count -= 1 # Decrement counter if download fails
339 elif (
340 self.include_full_text
341 and self.download_dir
342 and pdf_count >= self.max_full_text
343 ):
344 # Reached PDF limit
345 logger.info(
346 f"Maximum number of PDFs ({self.max_full_text}) reached. Skipping remaining PDFs."
347 )
348 result["content"] = paper.summary
349 result["full_content"] = paper.summary
351 results.append(result)
353 return results
355 def run(
356 self, query: str, research_context: Dict[str, Any] | None = None
357 ) -> List[Dict[str, Any]]:
358 """
359 Execute a search using arXiv with the two-phase approach.
361 Args:
362 query: The search query
363 research_context: Context from previous research to use.
365 Returns:
366 List of search results
367 """
368 logger.info("---Execute a search using arXiv---")
370 # Use the implementation from the parent class which handles all phases
371 results = super().run(query, research_context=research_context)
373 # Clean up
374 if hasattr(self, "_papers"):
375 del self._papers
377 return results
379 def get_paper_details(self, arxiv_id: str) -> Dict[str, Any]:
380 """
381 Get detailed information about a specific arXiv paper.
383 Args:
384 arxiv_id: arXiv ID of the paper (e.g., '2101.12345')
386 Returns:
387 Dictionary with paper information
388 """
389 try:
390 # Create the search client
391 client = arxiv.Client()
393 # Search for the specific paper
394 search = arxiv.Search(id_list=[arxiv_id], max_results=1)
396 # Apply rate limiting before fetching paper by ID
397 self._last_wait_time = self.rate_tracker.apply_rate_limit(
398 self.engine_type
399 )
401 # Get the paper
402 papers = list(client.results(search))
403 if not papers:
404 return {}
406 paper = papers[0]
408 # Format result based on config
409 result = {
410 "title": paper.title,
411 "link": paper.entry_id,
412 "snippet": (
413 paper.summary[:250] + "..."
414 if len(paper.summary) > 250
415 else paper.summary
416 ),
417 "authors": [
418 author.name for author in paper.authors[:3]
419 ], # First 3 authors
420 "journal_ref": paper.journal_ref,
421 }
423 # Add full content if not in snippet-only mode
424 if (
425 not hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
426 or not search_config.SEARCH_SNIPPETS_ONLY
427 ):
428 result.update(
429 {
430 "pdf_url": paper.pdf_url,
431 "authors": [
432 author.name for author in paper.authors
433 ], # All authors
434 "published": (
435 paper.published.strftime("%Y-%m-%d")
436 if paper.published
437 else None
438 ),
439 "updated": (
440 paper.updated.strftime("%Y-%m-%d")
441 if paper.updated
442 else None
443 ),
444 "categories": paper.categories,
445 "summary": paper.summary, # Full summary
446 "comment": paper.comment,
447 "doi": paper.doi,
448 "content": paper.summary, # Use summary as content
449 "full_content": paper.summary, # For consistency
450 }
451 )
453 # Download PDF if requested
454 if self.include_full_text and self.download_dir:
455 try:
456 # Apply rate limiting before PDF download
457 self.rate_tracker.apply_rate_limit(self.engine_type)
459 # Download the paper
460 paper_path = paper.download_pdf(
461 dirpath=self.download_dir
462 )
463 result["pdf_path"] = str(paper_path)
464 except Exception:
465 logger.exception("Error downloading paper")
467 return result
469 except Exception:
470 logger.exception("Error getting paper details")
471 return {}
473 def search_by_author(
474 self, author_name: str, max_results: Optional[int] = None
475 ) -> List[Dict[str, Any]]:
476 """
477 Search for papers by a specific author.
479 Args:
480 author_name: Name of the author
481 max_results: Maximum number of results (defaults to self.max_results)
483 Returns:
484 List of papers by the author
485 """
486 original_max_results = self.max_results
488 try:
489 if max_results:
490 self.max_results = max_results
492 query = f'au:"{author_name}"'
493 return self.run(query)
495 finally:
496 # Restore original value
497 self.max_results = original_max_results
499 def search_by_category(
500 self, category: str, max_results: Optional[int] = None
501 ) -> List[Dict[str, Any]]:
502 """
503 Search for papers in a specific arXiv category.
505 Args:
506 category: arXiv category (e.g., 'cs.AI', 'physics.optics')
507 max_results: Maximum number of results (defaults to self.max_results)
509 Returns:
510 List of papers in the category
511 """
512 original_max_results = self.max_results
514 try:
515 if max_results:
516 self.max_results = max_results
518 query = f"cat:{category}"
519 return self.run(query)
521 finally:
522 # Restore original value
523 self.max_results = original_max_results