Coverage for src/local_deep_research/web_search_engines/engines/search_engine_semantic_scholar.py: 97%

276 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import re 

2from typing import Any, Dict, List, Optional, Tuple 

3 

4import requests 

5from langchain_core.language_models import BaseLLM 

6from loguru import logger 

7from requests.adapters import HTTPAdapter 

8from urllib3.util import Retry 

9 

10from ...constants import SNIPPET_LENGTH_SHORT 

11from ..rate_limiting import RateLimitError 

12from ..search_engine_base import BaseSearchEngine 

13from ...security import SafeSession 

14 

15 

16class SemanticScholarSearchEngine(BaseSearchEngine): 

17 """ 

18 Semantic Scholar search engine implementation with two-phase approach. 

19 Provides efficient access to scientific literature across all fields. 

20 """ 

21 

22 # Mark as public search engine 

23 is_public = True 

24 # Scientific/academic search engine 

25 is_scientific = True 

26 is_lexical = True 

27 needs_llm_relevance_filter = True 

28 

def __init__(
    self,
    max_results: int = 10,
    api_key: Optional[str] = None,
    year_range: Optional[Tuple[int, int]] = None,
    get_abstracts: bool = True,
    get_references: bool = False,
    get_citations: bool = False,
    get_embeddings: bool = False,
    get_tldr: bool = True,
    citation_limit: int = 10,
    reference_limit: int = 10,
    llm: Optional[BaseLLM] = None,
    max_filtered_results: Optional[int] = None,
    optimize_queries: bool = True,
    max_retries: int = 5,
    retry_backoff_factor: float = 1.0,
    fields_of_study: Optional[List[str]] = None,
    publication_types: Optional[List[str]] = None,
    settings_snapshot: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """
    Initialize the Semantic Scholar search engine.

    Args:
        max_results: Maximum number of search results
        api_key: Semantic Scholar API key for higher rate limits (optional).
            When omitted and a settings snapshot is given, the key is read
            from ``search.engine.web.semantic_scholar.api_key``.
        year_range: Optional tuple of (start_year, end_year) to filter results
        get_abstracts: Whether to fetch abstracts for all results
        get_references: Whether to fetch references for papers
        get_citations: Whether to fetch citations for papers
        get_embeddings: Whether to fetch SPECTER embeddings for papers
        get_tldr: Whether to fetch TLDR summaries for papers
        citation_limit: Maximum number of citations to fetch per paper
        reference_limit: Maximum number of references to fetch per paper
        llm: Language model for relevance filtering
        max_filtered_results: Maximum number of results to keep after filtering
        optimize_queries: Whether to optimize natural language queries
        max_retries: Maximum number of retries for API requests
        retry_backoff_factor: Backoff factor for retries
        fields_of_study: List of fields of study to filter results
        publication_types: List of publication types to filter results
        settings_snapshot: Settings snapshot for configuration
        **kwargs: Additional parameters to pass to parent class
    """
    # Imported here rather than at module level, as in the original code.
    from ...advanced_search_system.filters.journal_reputation_filter import (
        JournalReputationFilter,
    )

    # Journal filter runs before LLM relevance (Tiers 1-3 are instant)
    preview_filters = []
    journal_filter = JournalReputationFilter.create_default(
        model=llm,  # type: ignore[arg-type]
        engine_name="semantic_scholar",
        settings_snapshot=settings_snapshot,
    )
    if journal_filter is not None:
        preview_filters.append(journal_filter)

    super().__init__(
        llm=llm,
        max_filtered_results=max_filtered_results,
        max_results=max_results,
        preview_filters=preview_filters,  # type: ignore[arg-type]
        settings_snapshot=settings_snapshot,
        **kwargs,
    )

    # Get API key from settings if not provided
    if not api_key and settings_snapshot:
        from ...config.search_config import get_setting_from_snapshot

        try:
            api_key = get_setting_from_snapshot(
                "search.engine.web.semantic_scholar.api_key",
                settings_snapshot=settings_snapshot,
            )
        except Exception:
            # Best effort: a missing/unreadable setting just means we run
            # without a key. loguru does not honour the stdlib-style
            # ``exc_info=True`` keyword; ``opt(exception=True)`` is what
            # attaches the active traceback to the debug record.
            logger.opt(exception=True).debug(
                "Failed to read semantic_scholar.api_key from settings snapshot"
            )

    self.api_key = api_key
    self.year_range = year_range
    self.get_abstracts = get_abstracts
    self.get_references = get_references
    self.get_citations = get_citations
    self.get_embeddings = get_embeddings
    self.get_tldr = get_tldr
    self.citation_limit = citation_limit
    self.reference_limit = reference_limit
    self.optimize_queries = optimize_queries
    self.max_retries = max_retries
    self.retry_backoff_factor = retry_backoff_factor
    # Filters are normalised through ``_ensure_list`` only when supplied;
    # ``None`` is kept as "no filter".
    self.fields_of_study = (
        self._ensure_list(fields_of_study)
        if fields_of_study is not None
        else None
    )
    self.publication_types = (
        self._ensure_list(publication_types)
        if publication_types is not None
        else None
    )

    # Base API URLs
    self.base_url = "https://api.semanticscholar.org/graph/v1"
    self.paper_search_url = f"{self.base_url}/paper/search"
    self.paper_details_url = f"{self.base_url}/paper"

    # Create a session with retry capabilities (needs api_key/max_retries
    # set above, so it must come after the attribute assignments).
    self.session: SafeSession | None = self._create_session()

    # Log API key status
    if self.api_key:
        logger.info(
            "Using Semantic Scholar with API key (higher rate limits)"
        )
    else:
        logger.info(
            "Using Semantic Scholar without API key (lower rate limits)"
        )

154 

def _create_session(self) -> SafeSession:
    """Build the HTTP session used for Semantic Scholar API calls.

    HTTPS requests get an adapter whose retry policy uses this instance's
    ``max_retries`` and ``retry_backoff_factor`` and retries on HTTP 429
    and 500/502/503/504 for HEAD, GET, POST and OPTIONS. Every request
    sends ``Accept: application/json``; the ``x-api-key`` header is added
    only when an API key is configured.

    Returns:
        The configured session.
    """
    http = SafeSession()

    # Automatic retries with backoff, attached to HTTPS URLs only.
    http.mount(
        "https://",
        HTTPAdapter(
            max_retries=Retry(
                total=self.max_retries,
                backoff_factor=self.retry_backoff_factor,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods={"HEAD", "GET", "POST", "OPTIONS"},
            )
        ),
    )

    # Default headers sent with every request.
    default_headers = {"Accept": "application/json"}
    if self.api_key:
        default_headers["x-api-key"] = self.api_key
    http.headers.update(default_headers)

    return http

178 

def close(self):
    """
    Release the HTTP session and let the parent class clean up.

    Safe to call more than once and on an instance whose ``session``
    attribute was never set: the session is closed only when present, is
    always reset to ``None`` afterwards, and errors while closing it are
    logged rather than raised.
    """
    if getattr(self, "session", None):
        try:
            self.session.close()
        except Exception:
            logger.exception("Error closing SemanticScholar session")
        finally:
            self.session = None

    # Close content filters (JournalReputationFilter) via parent
    super().close()

195 

196 def __del__(self): 

197 """Destructor to ensure session is closed.""" 

198 self.close() 

199 

200 def __enter__(self): 

201 """Context manager entry.""" 

202 return self 

203 

204 def __exit__(self, exc_type, exc_val, exc_tb): 

205 """Context manager exit - ensures session cleanup.""" 

206 self.close() 

207 return False 

208 

def _respect_rate_limit(self):
    """Apply the rate tracker's limit for this engine before a request.

    The value returned by ``rate_tracker.apply_rate_limit`` is stored on
    ``self._last_wait_time`` and logged as the number of seconds waited.
    """
    wait_seconds = self.rate_tracker.apply_rate_limit(self.engine_type)
    self._last_wait_time = wait_seconds
    logger.debug(f"Applied rate limit wait: {self._last_wait_time:.2f}s")

216 

217 def _make_request( 

218 self, 

219 url: str, 

220 params: Optional[Dict] = None, 

221 data: Optional[Dict] = None, 

222 method: str = "GET", 

223 ) -> Dict: 

224 """ 

225 Make a request to the Semantic Scholar API. 

226 

227 Args: 

228 url: API endpoint URL 

229 params: Query parameters 

230 data: JSON data for POST requests 

231 method: HTTP method (GET or POST) 

232 

233 Returns: 

234 API response as dictionary 

235 """ 

236 self._respect_rate_limit() 

237 

238 try: 

239 if self.session is None: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 raise RuntimeError("Session is not initialized") 

241 if method.upper() == "GET": 

242 response = self.session.get(url, params=params, timeout=30) 

243 elif method.upper() == "POST": 

244 response = self.session.post( 

245 url, params=params, json=data, timeout=30 

246 ) 

247 else: 

248 raise ValueError(f"Unsupported HTTP method: {method}") 

249 

250 # Handle rate limiting 

251 if response.status_code == 429: 

252 logger.warning("Semantic Scholar rate limit exceeded") 

253 raise RateLimitError("Semantic Scholar rate limit exceeded") 

254 

255 response.raise_for_status() 

256 return response.json() # type: ignore[no-any-return] 

257 except requests.RequestException: 

258 logger.exception("API request failed") 

259 return {} 

260 

261 def _optimize_query(self, query: str) -> str: 

262 """ 

263 Optimize a natural language query for Semantic Scholar search. 

264 If LLM is available, uses it to extract key terms and concepts. 

265 

266 Args: 

267 query: Natural language query 

268 

269 Returns: 

270 Optimized query string 

271 """ 

272 if not self.llm or not self.optimize_queries: 

273 return query 

274 

275 try: 

276 prompt = f"""Transform this natural language question into an optimized academic search query. 

277 

278Original query: "{query}" 

279 

280INSTRUCTIONS: 

2811. Extract key academic concepts, technical terms, and proper nouns 

2822. Remove generic words, filler words, and non-technical terms 

2833. Add quotation marks around specific phrases that should be kept together 

2844. Return ONLY the optimized search query with no explanation 

2855. Keep it under 100 characters if possible 

286 

287EXAMPLE TRANSFORMATIONS: 

288"What are the latest findings about mRNA vaccines and COVID-19?" → "mRNA vaccines COVID-19 recent findings" 

289"How does machine learning impact climate change prediction?" → "machine learning "climate change" prediction" 

290"Tell me about quantum computing approaches for encryption" → "quantum computing encryption" 

291 

292Return ONLY the optimized search query with no explanation. 

293""" 

294 

295 response = self.llm.invoke(prompt) 

296 optimized_query = ( 

297 str(response.content) 

298 if hasattr(response, "content") 

299 else str(response) 

300 ).strip() 

301 

302 # Clean up the query - remove any explanations 

303 lines = optimized_query.split("\n") 

304 optimized_query = lines[0].strip() 

305 

306 # Safety check - if query looks too much like an explanation, use original 

307 if len(optimized_query.split()) > 15 or ":" in optimized_query: 

308 logger.warning( 

309 "Query optimization result looks too verbose, using original" 

310 ) 

311 return query 

312 

313 logger.info(f"Original query: '{query}'") 

314 logger.info(f"Optimized for search: '{optimized_query}'") 

315 

316 return optimized_query 

317 except Exception: 

318 logger.exception("Error optimizing query") 

319 return query # Fall back to original query on error 

320 

def _direct_search(self, query: str) -> List[Dict[str, Any]]:
    """
    Run a single paper search against the Semantic Scholar API.

    The request asks for the preview fields (plus ``tldr`` when enabled)
    and applies the configured year, field-of-study and publication-type
    filters.

    Args:
        query: The search query

    Returns:
        List of paper dictionaries; empty when the response has no
        ``data`` entry or any error occurs.
    """
    # NOTE(review): the broad ``except`` below presumably also catches the
    # RateLimitError raised by _make_request, so callers never see it from
    # this method - confirm that is intended.
    try:
        requested_fields = [
            "paperId",
            "externalIds",
            "url",
            "title",
            "abstract",
            "venue",
            "publicationVenue",  # Structured venue with name/type/ISSN
            "year",
            "authors",
            "citationCount",  # Used for ranking
            "openAccessPdf",  # PDF URL for open access papers
        ]
        if self.get_tldr:
            requested_fields.append("tldr")

        params = {
            "query": query,
            # The API returns at most 100 results per request.
            "limit": min(self.max_results, 100),
            "fields": ",".join(requested_fields),
        }

        # Optional filters, added only when configured.
        if self.year_range:
            first_year, last_year = self.year_range
            params["year"] = f"{first_year}-{last_year}"
        if self.fields_of_study:
            params["fieldsOfStudy"] = ",".join(self.fields_of_study)
        if self.publication_types:
            params["publicationTypes"] = ",".join(self.publication_types)

        response = self._make_request(self.paper_search_url, params)

        if "data" not in response:
            logger.warning(
                f"No data in response for direct search query: '{query}'"
            )
            return []

        found = response["data"]
        logger.info(
            f"Found {len(found)} papers with direct search for query: '{query}'"
        )
        return found  # type: ignore[no-any-return]

    except Exception:
        logger.exception("Error in direct search")
        return []

387 

388 def _adaptive_search(self, query: str) -> Tuple[List[Dict[str, Any]], str]: 

389 """ 

390 Perform an adaptive search that adjusts based on result volume. 

391 Uses LLM to generate better fallback queries when available. 

392 

393 Args: 

394 query: The search query 

395 

396 Returns: 

397 Tuple of (list of paper results, search strategy used) 

398 """ 

399 # Start with a standard search 

400 papers = self._direct_search(query) 

401 strategy = "standard" 

402 

403 # If no results, try different variations 

404 if not papers: 

405 # Try removing quotes to broaden search 

406 if '"' in query: 

407 unquoted_query = query.replace('"', "") 

408 logger.info( 

409 "No results with quoted terms, trying without quotes: {}", 

410 unquoted_query, 

411 ) 

412 papers = self._direct_search(unquoted_query) 

413 

414 if papers: 

415 strategy = "unquoted" 

416 return papers, strategy 

417 

418 # If LLM is available, use it to generate better fallback queries 

419 if self.llm: 

420 try: 

421 # Generate alternate search queries focusing on core concepts 

422 prompt = f"""You are helping refine a search query that returned no results. 

423 

424Original query: "{query}" 

425 

426The query might be too specific or use natural language phrasing that doesn't match academic paper keywords. 

427 

428Please provide THREE alternative search queries that: 

4291. Focus on the core academic concepts 

4302. Use precise terminology commonly found in academic papers 

4313. Break down complex queries into more searchable components 

4324. Format each as a concise keyword-focused search term (not a natural language question) 

433 

434Format each query on a new line with no numbering or explanation. Keep each query under 8 words and very focused. 

435""" 

436 # Get the LLM's response 

437 response = self.llm.invoke(prompt) 

438 

439 # Extract the alternative queries 

440 alt_queries = [] 

441 if hasattr( 

442 response, "content" 

443 ): # Handle various LLM response formats 

444 content = response.content 

445 alt_queries = [ 

446 q.strip() 

447 for q in content.strip().split("\n") 

448 if q.strip() 

449 ] 

450 elif isinstance(response, str): 450 ↛ 458line 450 didn't jump to line 458 because the condition on line 450 was always true

451 alt_queries = [ 

452 q.strip() 

453 for q in response.strip().split("\n") 

454 if q.strip() 

455 ] 

456 

457 # Try each alternative query 

458 for alt_query in alt_queries[ 

459 :3 

460 ]: # Limit to first 3 alternatives 

461 logger.info("Trying LLM-suggested query: {}", alt_query) 

462 alt_papers = self._direct_search(alt_query) 

463 

464 if alt_papers: 

465 logger.info( 

466 "Found {} papers using LLM-suggested query: {}", 

467 len(alt_papers), 

468 alt_query, 

469 ) 

470 strategy = "llm_alternative" 

471 return alt_papers, strategy 

472 except Exception: 

473 logger.exception("Error using LLM for query refinement") 

474 # Fall through to simpler strategies 

475 

476 # Fallback: Try with the longest words (likely specific terms) 

477 words = re.findall(r"\w+", query) 

478 longer_words = [word for word in words if len(word) > 6] 

479 if longer_words: 

480 # Use up to 3 of the longest words 

481 longer_words = sorted(longer_words, key=len, reverse=True)[:3] 

482 key_terms_query = " ".join(longer_words) 

483 logger.info("Trying with key terms: {}", key_terms_query) 

484 papers = self._direct_search(key_terms_query) 

485 

486 if papers: 

487 strategy = "key_terms" 

488 return papers, strategy 

489 

490 # Final fallback: Try with just the longest word 

491 if words: 491 ↛ 501line 491 didn't jump to line 501 because the condition on line 491 was always true

492 longest_word = max(words, key=len) 

493 if len(longest_word) > 5: # Only use if it's reasonably long 

494 logger.info("Trying with single key term: {}", longest_word) 

495 papers = self._direct_search(longest_word) 

496 

497 if papers: 

498 strategy = "single_term" 

499 return papers, strategy 

500 

501 return papers, strategy 

502 

503 def _get_paper_details(self, paper_id: str) -> Dict[str, Any]: 

504 """ 

505 Get detailed information about a specific paper. 

506 

507 Args: 

508 paper_id: Semantic Scholar Paper ID 

509 

510 Returns: 

511 Dictionary with paper details 

512 """ 

513 try: 

514 # Construct fields parameter 

515 fields = [ 

516 "paperId", 

517 "externalIds", 

518 "corpusId", 

519 "url", 

520 "title", 

521 "abstract", 

522 "venue", 

523 "publicationVenue", # Structured venue with name/type/ISSN 

524 "year", 

525 "authors", 

526 "fieldsOfStudy", 

527 "citationCount", # Add citation count 

528 ] 

529 

530 if self.get_tldr: 

531 fields.append("tldr") 

532 

533 if self.get_embeddings: 

534 fields.append("embedding") 

535 

536 # Add citation and reference fields if requested 

537 if self.get_citations: 

538 fields.append(f"citations.limit({self.citation_limit})") 

539 

540 if self.get_references: 

541 fields.append(f"references.limit({self.reference_limit})") 

542 

543 # Make the request 

544 url = f"{self.paper_details_url}/{paper_id}" 

545 params = {"fields": ",".join(fields)} 

546 

547 return self._make_request(url, params) 

548 

549 except Exception: 

550 logger.exception("Error getting paper details for paper") 

551 return {} 

552 

def _get_previews(self, query: str) -> List[Dict[str, Any]]:
    """
    Build preview dictionaries for the papers matching ``query``.

    The query is first optimised (when possible), then searched with the
    adaptive fallback strategy. Each paper is converted into a preview
    with string-safe fields; a paper that cannot be converted is logged
    and skipped. Previews are returned newest first, with papers lacking a
    usable year sorted last.

    Args:
        query: The search query

    Returns:
        List of preview dictionaries (empty when nothing was found)
    """
    logger.info(f"Getting Semantic Scholar previews for query: {query}")

    papers, strategy = self._adaptive_search(self._optimize_query(query))
    if not papers:
        logger.warning("No Semantic Scholar results found")
        return []

    previews = []
    for paper in papers:
        try:
            # Keep only authors that actually carry a name.
            author_names = []
            if paper.get("authors"):
                author_names = [
                    entry.get("name", "")
                    for entry in paper["authors"]
                    if entry and entry.get("name")
                ]

            paper_id = paper.get("paperId") or ""

            # The snippet is the abstract, truncated with an ellipsis when
            # it exceeds the short-snippet length.
            abstract = paper.get("abstract")
            if not abstract:
                snippet = ""
            elif len(abstract) > SNIPPET_LENGTH_SHORT:
                snippet = abstract[:SNIPPET_LENGTH_SHORT] + "..."
            else:
                snippet = abstract

            # Prefer publicationVenue (structured, with ISSN) over
            # venue (plain string, often empty for many papers).
            venue_info = paper.get("publicationVenue") or {}
            venue_name = venue_info.get("name") or paper.get("venue", "")

            # The TLDR is only read when it is a non-empty dict.
            tldr = paper.get("tldr")
            tldr_text = (
                tldr.get("text", "") if tldr and isinstance(tldr, dict) else ""
            )

            previews.append(
                {
                    "id": paper_id,
                    "title": paper.get("title") or "",
                    "link": paper.get("url") or "",
                    "snippet": snippet,
                    "authors": author_names,
                    "venue": venue_name or "",
                    "journal_ref": venue_name or None,
                    "issn": venue_info.get("issn"),
                    "year": paper.get("year"),
                    "external_ids": paper.get("externalIds") or {},
                    "source": "Semantic Scholar",
                    "_paper_id": paper_id,
                    "_search_strategy": strategy,
                    "tldr": tldr_text,
                    # Raw paper kept for later reference.
                    "_full_paper": paper,
                }
            )
        except Exception:
            logger.exception("Error processing paper preview")
            # Continue with the next paper

    def _year_key(p: dict[str, Any]) -> int:
        """Sort key: the year as an int, or 0 when missing/unparseable."""
        year = p.get("year")
        if year is None:
            return 0
        try:
            return int(year)
        except (TypeError, ValueError):
            return 0

    # Newer papers first.
    previews.sort(key=_year_key, reverse=True)

    logger.info(
        f"Found {len(previews)} Semantic Scholar previews using strategy: {strategy}"
    )
    return previews

657 

def _get_full_content(
    self, relevant_items: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Turn relevant previews into final results.

    Most content already comes from the preview. An extra detail request
    is made per paper only when citations, references or embeddings were
    requested and the preview carries a paper ID. For every item,
    including those without a paper ID, ``citationCount`` from the raw
    paper is promoted to ``citations`` when no citation list was fetched,
    and the internal ``_paper_id`` / ``_search_strategy`` / ``_full_paper``
    keys are removed so they never reach callers.

    Args:
        relevant_items: List of relevant preview dictionaries

    Returns:
        List of result dictionaries with full content (shallow copies of
        the input items)
    """
    logger.info(
        f"Getting content for {len(relevant_items)} Semantic Scholar papers"
    )

    # Whether any option requires the per-paper detail endpoint.
    needs_details = (
        self.get_citations or self.get_references or self.get_embeddings
    )

    results = []
    for item in relevant_items:
        result = item.copy()
        paper_id = item.get("_paper_id", "")

        # Without a paper ID there is nothing to look up, but the item
        # still goes through promotion and cleanup below.
        if paper_id and needs_details:
            paper_details = self._get_paper_details(paper_id)

            if paper_details:
                # Add citation information
                if self.get_citations and "citations" in paper_details:
                    result["citations"] = paper_details["citations"]

                # Add reference information
                if self.get_references and "references" in paper_details:
                    result["references"] = paper_details["references"]

                # Add embedding if available
                if self.get_embeddings and "embedding" in paper_details:
                    result["embedding"] = paper_details["embedding"]

                # Add fields of study
                if "fieldsOfStudy" in paper_details:
                    result["fields_of_study"] = paper_details[
                        "fieldsOfStudy"
                    ]

        # Promote useful fields from _full_paper to top level before
        # dropping the raw paper (consistent with NASA ADS/OpenAlex which
        # expose citations/journal_ref at the top level).
        full_paper = result.get("_full_paper") or {}
        if (
            full_paper.get("citationCount") is not None
            and "citations" not in result
        ):
            result["citations"] = full_paper.get("citationCount")

        # Remove temporary fields
        for temp_key in ("_paper_id", "_search_strategy", "_full_paper"):
            result.pop(temp_key, None)

        results.append(result)

    return results