Coverage for src/local_deep_research/web_search_engines/engines/search_engine_semantic_scholar.py: 97%
276 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import re
2from typing import Any, Dict, List, Optional, Tuple
4import requests
5from langchain_core.language_models import BaseLLM
6from loguru import logger
7from requests.adapters import HTTPAdapter
8from urllib3.util import Retry
10from ...constants import SNIPPET_LENGTH_SHORT
11from ..rate_limiting import RateLimitError
12from ..search_engine_base import BaseSearchEngine
13from ...security import SafeSession
class SemanticScholarSearchEngine(BaseSearchEngine):
    """
    Search engine for Semantic Scholar built on a two-phase approach.

    Gives efficient access to scientific literature across all fields.
    """

    # Engine classification flags, declared as class attributes.
    is_public = True  # marked as a public search engine
    is_scientific = True  # scientific/academic search engine
    is_lexical = True
    needs_llm_relevance_filter = True
def __init__(
    self,
    max_results: int = 10,
    api_key: Optional[str] = None,
    year_range: Optional[Tuple[int, int]] = None,
    get_abstracts: bool = True,
    get_references: bool = False,
    get_citations: bool = False,
    get_embeddings: bool = False,
    get_tldr: bool = True,
    citation_limit: int = 10,
    reference_limit: int = 10,
    llm: Optional[BaseLLM] = None,
    max_filtered_results: Optional[int] = None,
    optimize_queries: bool = True,
    max_retries: int = 5,
    retry_backoff_factor: float = 1.0,
    fields_of_study: Optional[List[str]] = None,
    publication_types: Optional[List[str]] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """
    Set up the Semantic Scholar search engine.

    Args:
        max_results: Maximum number of search results
        api_key: Semantic Scholar API key for higher rate limits (optional);
            when omitted it is looked up in ``settings_snapshot``
        year_range: Optional tuple of (start_year, end_year) to filter results
        get_abstracts: Whether to fetch abstracts for all results
        get_references: Whether to fetch references for papers
        get_citations: Whether to fetch citations for papers
        get_embeddings: Whether to fetch SPECTER embeddings for papers
        get_tldr: Whether to fetch TLDR summaries for papers
        citation_limit: Maximum number of citations to fetch per paper
        reference_limit: Maximum number of references to fetch per paper
        llm: Language model for relevance filtering
        max_filtered_results: Maximum number of results to keep after filtering
        optimize_queries: Whether to optimize natural language queries
        max_retries: Maximum number of retries for API requests
        retry_backoff_factor: Backoff factor for retries
        fields_of_study: List of fields of study to filter results
        publication_types: List of publication types to filter results
        settings_snapshot: Settings snapshot for configuration
        **kwargs: Additional parameters to pass to parent class
    """
    # The journal reputation filter is imported lazily, inside the constructor.
    from ...advanced_search_system.filters.journal_reputation_filter import (
        JournalReputationFilter,
    )

    # Journal filter runs before LLM relevance (Tiers 1-3 are instant)
    reputation_filter = JournalReputationFilter.create_default(
        model=llm,  # type: ignore[arg-type]
        engine_name="semantic_scholar",
        settings_snapshot=settings_snapshot,
    )
    # create_default may return None, in which case no preview filter is used.
    preview_filters = [] if reputation_filter is None else [reputation_filter]

    super().__init__(
        llm=llm,
        max_filtered_results=max_filtered_results,
        max_results=max_results,
        preview_filters=preview_filters,  # type: ignore[arg-type]
        settings_snapshot=settings_snapshot,
        **kwargs,
    )

    # Fall back to the settings snapshot when no key was passed explicitly.
    if not api_key and settings_snapshot:
        from ...config.search_config import get_setting_from_snapshot

        try:
            api_key = get_setting_from_snapshot(
                "search.engine.web.semantic_scholar.api_key",
                settings_snapshot=settings_snapshot,
            )
        except Exception:
            # A missing/unreadable setting only means "run without a key".
            logger.debug(
                "Failed to read semantic_scholar.api_key from settings snapshot",
                exc_info=True,
            )

    # Connection-related configuration (read by _create_session below).
    self.api_key = api_key
    self.max_retries = max_retries
    self.retry_backoff_factor = retry_backoff_factor

    # What to request from the API and how to shape the query.
    self.year_range = year_range
    self.get_abstracts = get_abstracts
    self.get_references = get_references
    self.get_citations = get_citations
    self.get_embeddings = get_embeddings
    self.get_tldr = get_tldr
    self.citation_limit = citation_limit
    self.reference_limit = reference_limit
    self.optimize_queries = optimize_queries

    # Optional filters are normalised through _ensure_list only when given.
    self.fields_of_study = (
        None if fields_of_study is None else self._ensure_list(fields_of_study)
    )
    self.publication_types = (
        None
        if publication_types is None
        else self._ensure_list(publication_types)
    )

    # Base API URLs
    self.base_url = "https://api.semanticscholar.org/graph/v1"
    self.paper_search_url = f"{self.base_url}/paper/search"
    self.paper_details_url = f"{self.base_url}/paper"

    # Create a session with retry capabilities
    self.session: SafeSession | None = self._create_session()

    # Log API key status
    key_status = (
        "Using Semantic Scholar with API key (higher rate limits)"
        if self.api_key
        else "Using Semantic Scholar without API key (lower rate limits)"
    )
    logger.info(key_status)
def _create_session(self) -> SafeSession:
    """
    Create and configure an HTTP session with automatic retries.

    The session retries on the listed status codes with exponential
    backoff (``self.max_retries`` attempts, ``self.retry_backoff_factor``
    as the backoff factor) and sends the API key header when
    ``self.api_key`` is set.

    Returns:
        The configured session, with the retrying adapter mounted for
        ``https://`` URLs.
    """
    session = SafeSession()

    # Configure automatic retries with exponential backoff.
    retry_strategy = Retry(
        total=self.max_retries,
        backoff_factor=self.retry_backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods={"HEAD", "GET", "POST", "OPTIONS"},
        # Hand the final response back once retries are exhausted instead
        # of raising. Otherwise a persistent 429 surfaces as a
        # RetryError (a RequestException), which _make_request logs and
        # turns into {}, so its explicit 429 -> RateLimitError branch
        # would never run.
        raise_on_status=False,
    )
    session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    # Default headers for every request made through this session.
    headers = {"Accept": "application/json"}
    if self.api_key:
        headers["x-api-key"] = self.api_key
    session.headers.update(headers)

    return session
def close(self):
    """
    Release the HTTP session and any other held resources.

    Should be called once the search engine is no longer needed so that
    connections/file descriptors are not leaked. The session attribute
    is reset to ``None`` even if closing it fails.
    """
    # getattr covers instances whose __init__ never reached the point
    # where the session attribute is assigned.
    active_session = getattr(self, "session", None)
    if active_session:
        try:
            active_session.close()
        except Exception:
            logger.exception("Error closing SemanticScholar session")
        finally:
            self.session = None

    # Close content filters (JournalReputationFilter) via parent
    super().close()
196 def __del__(self):
197 """Destructor to ensure session is closed."""
198 self.close()
200 def __enter__(self):
201 """Context manager entry."""
202 return self
204 def __exit__(self, exc_type, exc_val, exc_tb):
205 """Context manager exit - ensures session cleanup."""
206 self.close()
207 return False
209 def _respect_rate_limit(self):
210 """Apply rate limiting between requests"""
211 # Apply rate limiting before request
212 self._last_wait_time = self.rate_tracker.apply_rate_limit(
213 self.engine_type
214 )
215 logger.debug(f"Applied rate limit wait: {self._last_wait_time:.2f}s")
217 def _make_request(
218 self,
219 url: str,
220 params: Optional[Dict] = None,
221 data: Optional[Dict] = None,
222 method: str = "GET",
223 ) -> Dict:
224 """
225 Make a request to the Semantic Scholar API.
227 Args:
228 url: API endpoint URL
229 params: Query parameters
230 data: JSON data for POST requests
231 method: HTTP method (GET or POST)
233 Returns:
234 API response as dictionary
235 """
236 self._respect_rate_limit()
238 try:
239 if self.session is None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 raise RuntimeError("Session is not initialized")
241 if method.upper() == "GET":
242 response = self.session.get(url, params=params, timeout=30)
243 elif method.upper() == "POST":
244 response = self.session.post(
245 url, params=params, json=data, timeout=30
246 )
247 else:
248 raise ValueError(f"Unsupported HTTP method: {method}")
250 # Handle rate limiting
251 if response.status_code == 429:
252 logger.warning("Semantic Scholar rate limit exceeded")
253 raise RateLimitError("Semantic Scholar rate limit exceeded")
255 response.raise_for_status()
256 return response.json() # type: ignore[no-any-return]
257 except requests.RequestException:
258 logger.exception("API request failed")
259 return {}
261 def _optimize_query(self, query: str) -> str:
262 """
263 Optimize a natural language query for Semantic Scholar search.
264 If LLM is available, uses it to extract key terms and concepts.
266 Args:
267 query: Natural language query
269 Returns:
270 Optimized query string
271 """
272 if not self.llm or not self.optimize_queries:
273 return query
275 try:
276 prompt = f"""Transform this natural language question into an optimized academic search query.
278Original query: "{query}"
280INSTRUCTIONS:
2811. Extract key academic concepts, technical terms, and proper nouns
2822. Remove generic words, filler words, and non-technical terms
2833. Add quotation marks around specific phrases that should be kept together
2844. Return ONLY the optimized search query with no explanation
2855. Keep it under 100 characters if possible
287EXAMPLE TRANSFORMATIONS:
288"What are the latest findings about mRNA vaccines and COVID-19?" → "mRNA vaccines COVID-19 recent findings"
289"How does machine learning impact climate change prediction?" → "machine learning "climate change" prediction"
290"Tell me about quantum computing approaches for encryption" → "quantum computing encryption"
292Return ONLY the optimized search query with no explanation.
293"""
295 response = self.llm.invoke(prompt)
296 optimized_query = (
297 str(response.content)
298 if hasattr(response, "content")
299 else str(response)
300 ).strip()
302 # Clean up the query - remove any explanations
303 lines = optimized_query.split("\n")
304 optimized_query = lines[0].strip()
306 # Safety check - if query looks too much like an explanation, use original
307 if len(optimized_query.split()) > 15 or ":" in optimized_query:
308 logger.warning(
309 "Query optimization result looks too verbose, using original"
310 )
311 return query
313 logger.info(f"Original query: '{query}'")
314 logger.info(f"Optimized for search: '{optimized_query}'")
316 return optimized_query
317 except Exception:
318 logger.exception("Error optimizing query")
319 return query # Fall back to original query on error
321 def _direct_search(self, query: str) -> List[Dict[str, Any]]:
322 """
323 Make a direct search request to the Semantic Scholar API.
325 Args:
326 query: The search query
328 Returns:
329 List of paper dictionaries
330 """
331 try:
332 # Configure fields to retrieve
333 fields = [
334 "paperId",
335 "externalIds",
336 "url",
337 "title",
338 "abstract",
339 "venue",
340 "publicationVenue", # Structured venue with name/type/ISSN
341 "year",
342 "authors",
343 "citationCount", # Add citation count for ranking
344 "openAccessPdf", # PDF URL for open access papers
345 ]
347 if self.get_tldr:
348 fields.append("tldr")
350 params = {
351 "query": query,
352 "limit": min(
353 self.max_results, 100
354 ), # API limit is 100 per request
355 "fields": ",".join(fields),
356 }
358 # Add year filter if specified
359 if self.year_range:
360 start_year, end_year = self.year_range
361 params["year"] = f"{start_year}-{end_year}"
363 # Add fields of study filter if specified
364 if self.fields_of_study:
365 params["fieldsOfStudy"] = ",".join(self.fields_of_study)
367 # Add publication types filter if specified
368 if self.publication_types:
369 params["publicationTypes"] = ",".join(self.publication_types)
371 response = self._make_request(self.paper_search_url, params)
373 if "data" in response:
374 papers = response["data"]
375 logger.info(
376 f"Found {len(papers)} papers with direct search for query: '{query}'"
377 )
378 return papers # type: ignore[no-any-return]
379 logger.warning(
380 f"No data in response for direct search query: '{query}'"
381 )
382 return []
384 except Exception:
385 logger.exception("Error in direct search")
386 return []
388 def _adaptive_search(self, query: str) -> Tuple[List[Dict[str, Any]], str]:
389 """
390 Perform an adaptive search that adjusts based on result volume.
391 Uses LLM to generate better fallback queries when available.
393 Args:
394 query: The search query
396 Returns:
397 Tuple of (list of paper results, search strategy used)
398 """
399 # Start with a standard search
400 papers = self._direct_search(query)
401 strategy = "standard"
403 # If no results, try different variations
404 if not papers:
405 # Try removing quotes to broaden search
406 if '"' in query:
407 unquoted_query = query.replace('"', "")
408 logger.info(
409 "No results with quoted terms, trying without quotes: {}",
410 unquoted_query,
411 )
412 papers = self._direct_search(unquoted_query)
414 if papers:
415 strategy = "unquoted"
416 return papers, strategy
418 # If LLM is available, use it to generate better fallback queries
419 if self.llm:
420 try:
421 # Generate alternate search queries focusing on core concepts
422 prompt = f"""You are helping refine a search query that returned no results.
424Original query: "{query}"
426The query might be too specific or use natural language phrasing that doesn't match academic paper keywords.
428Please provide THREE alternative search queries that:
4291. Focus on the core academic concepts
4302. Use precise terminology commonly found in academic papers
4313. Break down complex queries into more searchable components
4324. Format each as a concise keyword-focused search term (not a natural language question)
434Format each query on a new line with no numbering or explanation. Keep each query under 8 words and very focused.
435"""
436 # Get the LLM's response
437 response = self.llm.invoke(prompt)
439 # Extract the alternative queries
440 alt_queries = []
441 if hasattr(
442 response, "content"
443 ): # Handle various LLM response formats
444 content = response.content
445 alt_queries = [
446 q.strip()
447 for q in content.strip().split("\n")
448 if q.strip()
449 ]
450 elif isinstance(response, str): 450 ↛ 458line 450 didn't jump to line 458 because the condition on line 450 was always true
451 alt_queries = [
452 q.strip()
453 for q in response.strip().split("\n")
454 if q.strip()
455 ]
457 # Try each alternative query
458 for alt_query in alt_queries[
459 :3
460 ]: # Limit to first 3 alternatives
461 logger.info("Trying LLM-suggested query: {}", alt_query)
462 alt_papers = self._direct_search(alt_query)
464 if alt_papers:
465 logger.info(
466 "Found {} papers using LLM-suggested query: {}",
467 len(alt_papers),
468 alt_query,
469 )
470 strategy = "llm_alternative"
471 return alt_papers, strategy
472 except Exception:
473 logger.exception("Error using LLM for query refinement")
474 # Fall through to simpler strategies
476 # Fallback: Try with the longest words (likely specific terms)
477 words = re.findall(r"\w+", query)
478 longer_words = [word for word in words if len(word) > 6]
479 if longer_words:
480 # Use up to 3 of the longest words
481 longer_words = sorted(longer_words, key=len, reverse=True)[:3]
482 key_terms_query = " ".join(longer_words)
483 logger.info("Trying with key terms: {}", key_terms_query)
484 papers = self._direct_search(key_terms_query)
486 if papers:
487 strategy = "key_terms"
488 return papers, strategy
490 # Final fallback: Try with just the longest word
491 if words: 491 ↛ 501line 491 didn't jump to line 501 because the condition on line 491 was always true
492 longest_word = max(words, key=len)
493 if len(longest_word) > 5: # Only use if it's reasonably long
494 logger.info("Trying with single key term: {}", longest_word)
495 papers = self._direct_search(longest_word)
497 if papers:
498 strategy = "single_term"
499 return papers, strategy
501 return papers, strategy
503 def _get_paper_details(self, paper_id: str) -> Dict[str, Any]:
504 """
505 Get detailed information about a specific paper.
507 Args:
508 paper_id: Semantic Scholar Paper ID
510 Returns:
511 Dictionary with paper details
512 """
513 try:
514 # Construct fields parameter
515 fields = [
516 "paperId",
517 "externalIds",
518 "corpusId",
519 "url",
520 "title",
521 "abstract",
522 "venue",
523 "publicationVenue", # Structured venue with name/type/ISSN
524 "year",
525 "authors",
526 "fieldsOfStudy",
527 "citationCount", # Add citation count
528 ]
530 if self.get_tldr:
531 fields.append("tldr")
533 if self.get_embeddings:
534 fields.append("embedding")
536 # Add citation and reference fields if requested
537 if self.get_citations:
538 fields.append(f"citations.limit({self.citation_limit})")
540 if self.get_references:
541 fields.append(f"references.limit({self.reference_limit})")
543 # Make the request
544 url = f"{self.paper_details_url}/{paper_id}"
545 params = {"fields": ",".join(fields)}
547 return self._make_request(url, params)
549 except Exception:
550 logger.exception("Error getting paper details for paper")
551 return {}
553 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
554 """
555 Get preview information for Semantic Scholar papers.
557 Args:
558 query: The search query
560 Returns:
561 List of preview dictionaries
562 """
563 logger.info(f"Getting Semantic Scholar previews for query: {query}")
565 # Optimize the query if LLM is available
566 optimized_query = self._optimize_query(query)
568 # Use the adaptive search approach
569 papers, strategy = self._adaptive_search(optimized_query)
571 if not papers:
572 logger.warning("No Semantic Scholar results found")
573 return []
575 # Format as previews
576 previews = []
577 for paper in papers:
578 try:
579 # Format authors - ensure we have a valid list with string values
580 authors = []
581 if paper.get("authors"):
582 authors = [
583 author.get("name", "")
584 for author in paper["authors"]
585 if author and author.get("name")
586 ]
588 # Ensure we have valid strings for all fields
589 paper_id = paper.get("paperId", "")
590 title = paper.get("title", "")
591 url = paper.get("url", "")
593 # Handle abstract safely, ensuring we always have a string
594 abstract = paper.get("abstract")
595 snippet = ""
596 if abstract:
597 snippet = (
598 abstract[:SNIPPET_LENGTH_SHORT] + "..."
599 if len(abstract) > SNIPPET_LENGTH_SHORT
600 else abstract
601 )
603 # Prefer publicationVenue (structured, with ISSN) over
604 # venue (plain string, often empty for many papers).
605 pub_venue = paper.get("publicationVenue") or {}
606 venue_name = pub_venue.get("name") or paper.get("venue", "")
607 venue_issn = pub_venue.get("issn")
609 year = paper.get("year")
610 external_ids = paper.get("externalIds", {})
612 # Handle TLDR safely
613 tldr_text = ""
614 if paper.get("tldr") and isinstance(paper.get("tldr"), dict):
615 tldr_text = paper.get("tldr", {}).get("text", "")
617 # Create preview with basic information, ensuring no None values
618 preview = {
619 "id": paper_id if paper_id else "",
620 "title": title if title else "",
621 "link": url if url else "",
622 "snippet": snippet,
623 "authors": authors,
624 "venue": venue_name if venue_name else "",
625 "journal_ref": venue_name if venue_name else None,
626 "issn": venue_issn,
627 "year": year,
628 "external_ids": external_ids if external_ids else {},
629 "source": "Semantic Scholar",
630 "_paper_id": paper_id if paper_id else "",
631 "_search_strategy": strategy,
632 "tldr": tldr_text,
633 }
635 # Store the full paper object for later reference
636 preview["_full_paper"] = paper
638 previews.append(preview)
639 except Exception:
640 logger.exception("Error processing paper preview")
641 # Continue with the next paper
643 # Sort by year (newer first) if available
644 def _year_key(p: dict[str, Any]) -> int:
645 year = p.get("year")
646 try:
647 return int(year) if year is not None else 0
648 except (TypeError, ValueError):
649 return 0
651 previews = sorted(previews, key=_year_key, reverse=True)
653 logger.info(
654 f"Found {len(previews)} Semantic Scholar previews using strategy: {strategy}"
655 )
656 return previews
658 def _get_full_content(
659 self, relevant_items: List[Dict[str, Any]]
660 ) -> List[Dict[str, Any]]:
661 """
662 Get full content for the relevant Semantic Scholar papers.
663 Gets additional details like citations, references, and full metadata.
665 Args:
666 relevant_items: List of relevant preview dictionaries
668 Returns:
669 List of result dictionaries with full content
670 """
671 # For Semantic Scholar, we already have most content from the preview
672 # Additional API calls are only needed for citations/references
674 logger.info(
675 f"Getting content for {len(relevant_items)} Semantic Scholar papers"
676 )
678 results = []
679 for item in relevant_items:
680 result = item.copy()
681 paper_id = item.get("_paper_id", "")
683 # Skip if no paper ID
684 if not paper_id:
685 results.append(result)
686 continue
688 # Get paper details if citations or references are requested
689 if self.get_citations or self.get_references or self.get_embeddings:
690 paper_details = self._get_paper_details(paper_id)
692 if paper_details:
693 # Add citation information
694 if self.get_citations and "citations" in paper_details:
695 result["citations"] = paper_details["citations"]
697 # Add reference information
698 if self.get_references and "references" in paper_details:
699 result["references"] = paper_details["references"]
701 # Add embedding if available
702 if self.get_embeddings and "embedding" in paper_details:
703 result["embedding"] = paper_details["embedding"]
705 # Add fields of study
706 if "fieldsOfStudy" in paper_details:
707 result["fields_of_study"] = paper_details[
708 "fieldsOfStudy"
709 ]
711 # Promote useful fields from _full_paper to top level before
712 # dropping the raw paper (consistent with NASA ADS/OpenAlex which
713 # expose citations/journal_ref at the top level).
714 full_paper = result.get("_full_paper") or {}
715 if ( 715 ↛ 719line 715 didn't jump to line 719 because the condition on line 715 was never true
716 full_paper.get("citationCount") is not None
717 and "citations" not in result
718 ):
719 result["citations"] = full_paper.get("citationCount")
721 # Remove temporary fields
722 if "_paper_id" in result: 722 ↛ 724line 722 didn't jump to line 724 because the condition on line 722 was always true
723 del result["_paper_id"]
724 if "_search_strategy" in result: 724 ↛ 726line 724 didn't jump to line 726 because the condition on line 724 was always true
725 del result["_search_strategy"]
726 if "_full_paper" in result: 726 ↛ 729line 726 didn't jump to line 729 because the condition on line 726 was always true
727 del result["_full_paper"]
729 results.append(result)
731 return results