Coverage for src/local_deep_research/web_search_engines/engines/search_engine_openalex.py: 93%

178 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""OpenAlex search engine implementation for academic papers and research.""" 

2 

3from typing import Any, Dict, List, Optional 

4 

5from langchain_core.language_models import BaseLLM 

6from loguru import logger 

7 

8from ...constants import SNIPPET_LENGTH_LONG, USER_AGENT 

9from ...advanced_search_system.filters.journal_reputation_filter import ( 

10 JournalReputationFilter, 

11) 

12from ...security.safe_requests import safe_get 

13from ..rate_limiting import RateLimitError 

14from ..search_engine_base import BaseSearchEngine 

15 

16 

class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support.

    Queries the ``/works`` endpoint of the OpenAlex REST API, converts each
    returned work into a preview dictionary (title, link, snippet, authors,
    venue, affiliations, citation and open-access data) and forwards those
    previews unchanged as "full content", since the search response already
    carries the abstract.
    """

    # Mark as public search engine
    is_public = True
    # Scientific/academic search engine
    is_scientific = True
    is_lexical = True
    needs_llm_relevance_filter = True

    def __init__(
        self,
        max_results: int = 25,
        email: Optional[str] = None,
        sort_by: str = "relevance",
        filter_open_access: bool = False,
        min_citations: int = 0,
        from_publication_date: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the OpenAlex search engine.

        Args:
            max_results: Maximum number of search results
            email: Email for polite pool (gets faster response) - optional.
                When omitted, it is looked up in ``settings_snapshot`` under
                ``search.engine.web.openalex.email``.
            sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date')
            filter_open_access: Only return open access papers
            min_citations: Minimum citation count filter (0 disables it)
            from_publication_date: Filter papers from this date (YYYY-MM-DD).
                An empty value or the string "False" disables the filter.
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for configuration
            **kwargs: Additional parameters to pass to parent class
        """
        # Journal filter runs before LLM relevance (Tiers 1-3 are instant)
        preview_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,  # type: ignore[arg-type]
            engine_name="openalex",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            preview_filters.append(journal_filter)

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            preview_filters=preview_filters,  # type: ignore[arg-type]
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        self.sort_by = sort_by
        self.filter_open_access = filter_open_access
        self.min_citations = min_citations
        # Only set from_publication_date if it's not empty or "False"
        self.from_publication_date = (
            from_publication_date
            if from_publication_date and from_publication_date != "False"
            else None
        )

        # Get email from settings if not provided
        if not email and settings_snapshot:
            # Imported lazily, as in the original implementation.
            from ...config.search_config import get_setting_from_snapshot

            try:
                email = get_setting_from_snapshot(
                    "search.engine.web.openalex.email",
                    settings_snapshot=settings_snapshot,
                )
            except Exception:
                # Best effort: a missing/unreadable setting just means no
                # polite-pool email. Loguru ignores the stdlib ``exc_info``
                # keyword, so attach the traceback via ``opt(exception=True)``.
                logger.opt(exception=True).debug(
                    "Failed to read openalex.email from settings snapshot"
                )

        # Handle "False" string for email
        self.email = email if email and email != "False" else None

        # API configuration
        self.api_base = "https://api.openalex.org"
        # Use the normalised ``self.email`` everywhere below so that a
        # "False" placeholder never ends up in the User-Agent or the logs.
        self.headers = {
            "User-Agent": f"{USER_AGENT} ({self.email})"
            if self.email
            else USER_AGENT,
            "Accept": "application/json",
        }

        if self.email:
            # Email allows access to polite pool with faster response times
            logger.info(f"Using OpenAlex polite pool with email: {self.email}")
        else:
            logger.info(
                "Using OpenAlex without email (consider adding email for faster responses)"
            )

    def _build_search_params(self, query: str) -> Dict[str, Any]:
        """Build the query-string parameters for an OpenAlex ``/works`` search.

        Args:
            query: The search query

        Returns:
            Parameter dictionary containing the search text, paging, field
            selection, sort order and any configured filters / mailto.
        """
        params: Dict[str, Any] = {
            "search": query,  # OpenAlex handles natural language beautifully
            "per_page": min(self.max_results, 200),  # OpenAlex allows up to 200
            "page": 1,
            # Request specific fields including abstract for snippets
            "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
        }

        # Add optional filters
        filters = []

        if self.filter_open_access:
            filters.append("is_oa:true")

        if self.min_citations > 0:
            # The filter only offers a strict ">" comparison, so "at least
            # N citations" has to be sent as "> N - 1"; using "> N" would
            # silently drop works with exactly N citations.
            filters.append(f"cited_by_count:>{self.min_citations - 1}")

        if self.from_publication_date and self.from_publication_date != "False":
            filters.append(
                f"from_publication_date:{self.from_publication_date}"
            )

        if filters:
            params["filter"] = ",".join(filters)

        # Add sorting (unknown sort keys fall back to relevance)
        sort_map = {
            "relevance": "relevance_score:desc",
            "cited_by_count": "cited_by_count:desc",
            "publication_date": "publication_date:desc",
        }
        params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")

        # Add email to params for polite pool
        if self.email and self.email != "False":
            params["mailto"] = self.email

        return params

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for OpenAlex search results.

        Args:
            query: The search query (natural language supported!)

        Returns:
            List of preview dictionaries. Empty when the API answers with a
            non-200/non-429 status or any unexpected error occurs.

        Raises:
            RateLimitError: When OpenAlex answers with HTTP 429, so the base
                class can apply its retry handling.
        """
        logger.info(f"Searching OpenAlex for: {query}")

        params = self._build_search_params(query)

        try:
            # Apply rate limiting before making the request (simple like PubMed)
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )
            logger.debug(
                f"Applied rate limit wait: {self._last_wait_time:.2f}s"
            )

            # Make the API request
            logger.info(f"Making OpenAlex API request with params: {params}")
            response = safe_get(
                f"{self.api_base}/works",
                params=params,
                headers=self.headers,
                timeout=30,
            )
            logger.info(f"OpenAlex API response status: {response.status_code}")

            # Log rate limit info if available
            if "x-ratelimit-remaining" in response.headers:
                remaining = response.headers.get("x-ratelimit-remaining")
                limit = response.headers.get("x-ratelimit-limit", "unknown")
                logger.debug(
                    f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
                )

            if response.status_code == 200:
                # Use `or` rather than dict.get defaults: a key that is
                # present with a null value would bypass the default and
                # make the formatting below raise, discarding the page.
                data = response.json() or {}
                results = data.get("results") or []
                meta = data.get("meta") or {}
                total_count = meta.get("count") or 0

                logger.info(
                    f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
                )

                # Log first result structure for debugging
                if results:
                    first_result = results[0]
                    logger.debug(
                        f"First result keys: {list(first_result.keys())}"
                    )
                    logger.debug(
                        f"First result has abstract: {'abstract_inverted_index' in first_result}"
                    )
                    if "open_access" in first_result:
                        logger.debug(
                            f"Open access structure: {first_result['open_access']}"
                        )

                # Format results as previews
                previews = []
                for i, work in enumerate(results):
                    logger.debug(
                        f"Formatting work {i + 1}/{len(results)}: {(work.get('display_name') or 'Unknown')[:50]}"
                    )
                    preview = self._format_work_preview(work)
                    if preview:
                        previews.append(preview)
                        logger.debug(
                            f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
                        )
                    else:
                        logger.warning(f"Failed to format work {i + 1}")

                logger.info(
                    f"Successfully formatted {len(previews)} previews from {len(results)} results"
                )
                return previews

            if response.status_code == 429:
                # Rate limited (very rare with OpenAlex)
                logger.warning("OpenAlex rate limit reached")
                raise RateLimitError("OpenAlex rate limit exceeded")  # noqa: TRY301 — re-raised by except RateLimitError for base class retry

            logger.error(
                f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
            )
            return []

        except RateLimitError:
            # Re-raise rate limit errors for base class retry handling
            raise
        except Exception:
            logger.exception("Error searching OpenAlex")
            return []

    def _format_work_preview(
        self, work: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Format an OpenAlex work as a preview dictionary.

        Args:
            work: OpenAlex work object

        Returns:
            Formatted preview dictionary or None if formatting fails
        """
        try:
            # Extract basic information
            # Use `or` instead of dict.get default — OpenAlex routinely
            # returns these keys with explicit None values, which would
            # bypass the default and crash on downstream string ops.
            work_id = work.get("id") or ""
            title = work.get("display_name") or "No title"
            logger.debug(f"Formatting work: {title[:50]}")

            # Build snippet from abstract or first part of title
            abstract = None
            if work.get("abstract_inverted_index"):
                logger.debug(
                    f"Found abstract_inverted_index with {len(work['abstract_inverted_index'])} words"
                )
                # Reconstruct abstract from inverted index
                abstract = self._reconstruct_abstract(
                    work["abstract_inverted_index"]
                )
                logger.debug(
                    f"Reconstructed abstract length: {len(abstract) if abstract else 0}"
                )
            else:
                logger.debug("No abstract_inverted_index found")

            snippet = (
                abstract[:SNIPPET_LENGTH_LONG]
                if abstract
                else f"Academic paper: {title}"
            )
            logger.debug(f"Created snippet: {snippet[:100]}...")

            # Get publication info. These are passed through as returned;
            # a key that is present with a None value stays None.
            publication_year = work.get("publication_year", "unknown")
            publication_date = work.get("publication_date", "unknown")

            # Get venue/journal info
            venue = work.get("primary_location") or {}
            source = venue.get("source") or {}
            journal_name = "unknown"
            openalex_source_id = None
            source_type = None
            issn = None
            if source:
                journal_name = source.get("display_name") or "unknown"
                # Extract source ID for journal quality lookups
                raw_sid = source.get("id") or ""
                if raw_sid:
                    openalex_source_id = raw_sid.split("/")[-1]
                source_type = source.get("type")
                # Forward the linking ISSN so the reputation filter's
                # Tier 2/3 lookups can use it instead of falling back
                # to fuzzy name matching.
                issn = source.get("issn_l") or None

            # `or []` guards against an explicit null authorships value,
            # which would otherwise raise on slicing and drop the work.
            authorships = work.get("authorships") or []

            # Get authors (limit to 5). Authors without a display name are
            # skipped: a None entry would make the join below raise, and an
            # empty one would leave a stray ", " in the output.
            authors = []
            for authorship in authorships[:5]:
                name = (authorship.get("author") or {}).get("display_name")
                if name:
                    authors.append(name)

            authors_str = ", ".join(authors)
            if len(authorships) > 5:
                authors_str += " et al."

            # Extract author affiliations for the institution-tier scoring.
            # Each entry is a dict with the OpenAlex institution id, ROR id,
            # and display name — the lookup_institution() helper accepts any
            # of those three.
            affiliations: list[dict] = []
            seen_inst_ids: set[str] = set()
            for authorship in authorships:
                for inst in authorship.get("institutions", []) or []:
                    raw_id = inst.get("id") or ""
                    short_id = raw_id.split("/")[-1] if raw_id else ""
                    # De-duplicate by institution id; institutions without
                    # an id cannot be de-duplicated and are always kept.
                    if short_id and short_id in seen_inst_ids:
                        continue
                    if short_id:
                        seen_inst_ids.add(short_id)
                    affiliations.append(
                        {
                            "openalex_id": short_id or None,
                            "ror": (inst.get("ror") or "")
                            .rstrip("/")
                            .split("/")[-1]
                            or None,
                            "name": inst.get("display_name"),
                        }
                    )

            # Get metrics (None-safe)
            cited_by_count = work.get("cited_by_count") or 0

            # Get URL - prefer DOI, fallback to OpenAlex URL.
            # `.get("doi", work_id)` is wrong: when the key exists with value
            # None (common for non-DOI works) it returns None, not the
            # default. Use `or` so a None DOI falls through to work_id.
            url = work.get("doi") or work_id
            if not url.startswith("http"):
                # A bare DOI ("10.xxxx/...") is turned into a resolver URL;
                # anything else is replaced by the OpenAlex URL.
                url = (
                    f"https://doi.org/{url}"
                    if url.startswith("10.")
                    else work_id
                )

            # Check if open access
            open_access_info = work.get("open_access") or {}
            is_oa = bool(open_access_info.get("is_oa"))
            oa_url = None
            if is_oa:
                best_location = work.get("best_oa_location") or {}
                if best_location:
                    oa_url = best_location.get("pdf_url") or best_location.get(
                        "landing_page_url"
                    )

            # Both venue fields emit None (not the "unknown" sentinel) when
            # OpenAlex has no venue for this work. Downstream consumers
            # (citation normalizer, journal reputation filter) treat
            # missing venue as "no scoring signal", which is accurate;
            # the old "unknown" sentinel leaked through the normalizer
            # as a literal container_title and even matched a real
            # OpenAlex source named "unknown" (h_index=5, Q1) in the
            # reference DB.
            journal = journal_name if journal_name != "unknown" else None

            return {
                "id": work_id,
                "title": title,
                "link": url,
                "snippet": snippet,
                "authors": authors_str,
                "year": publication_year,
                "date": publication_date,
                "journal": journal,
                "journal_ref": journal,
                "issn": issn,
                "affiliations": affiliations or None,
                "openalex_source_id": openalex_source_id,
                "source_type": source_type,
                "citations": cited_by_count,
                "is_open_access": is_oa,
                "oa_url": oa_url,
                "abstract": abstract,
                "type": "academic_paper",
            }

        except Exception:
            logger.exception(
                f"Error formatting OpenAlex work: {work.get('id', 'unknown')}"
            )
            return None

    def _reconstruct_abstract(
        self, inverted_index: Dict[str, List[int]]
    ) -> str:
        """
        Reconstruct abstract text from OpenAlex inverted index format.

        Args:
            inverted_index: Dictionary mapping words to their positions

        Returns:
            Reconstructed abstract text (words joined by single spaces in
            position order), or an empty string if the index is malformed
        """
        try:
            # Create position-word mapping
            position_word = {}
            for word, positions in inverted_index.items():
                for pos in positions:
                    position_word[pos] = word

            # Sort by position and reconstruct
            return " ".join(
                position_word[pos] for pos in sorted(position_word)
            )

        except Exception:
            # Best effort: the caller falls back to a title-based snippet.
            logger.opt(exception=True).debug(
                "Could not reconstruct abstract from inverted index"
            )
            return ""

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant items (OpenAlex provides most content in preview).

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # OpenAlex returns comprehensive data in the initial search,
        # so we don't need a separate full content fetch
        results = []
        for item in relevant_items:
            snippet = item.get("snippet", "")
            result = {
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "snippet": snippet,
                # Previews always carry an "abstract" key, often None or
                # empty, so a dict.get default would never apply; use `or`
                # to actually fall back to the snippet.
                "content": item.get("abstract") or snippet,
                # Forward journal quality fields for content filters
                "journal_ref": item.get("journal_ref"),
                "openalex_source_id": item.get("openalex_source_id"),
                "source_type": item.get("source_type"),
                "affiliations": item.get("affiliations"),
                "metadata": {
                    "authors": item.get("authors", ""),
                    "year": item.get("year", ""),
                    "journal": item.get("journal", ""),
                    "citations": item.get("citations", 0),
                    "is_open_access": item.get("is_open_access", False),
                    "oa_url": item.get("oa_url"),
                    "affiliations": item.get("affiliations"),
                },
            }
            results.append(result)

        return results
496 return results