Coverage for src / local_deep_research / web_search_engines / engines / search_engine_openalex.py: 97%

159 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""OpenAlex search engine implementation for academic papers and research.""" 

2 

3from typing import Any, Dict, List, Optional 

4 

5from langchain_core.language_models import BaseLLM 

6from loguru import logger 

7 

8from ...constants import SNIPPET_LENGTH_LONG 

9from ...advanced_search_system.filters.journal_reputation_filter import ( 

10 JournalReputationFilter, 

11) 

12from ...security.safe_requests import safe_get 

13from ..rate_limiting import RateLimitError 

14from ..search_engine_base import BaseSearchEngine 

15 

16 

class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support."""

    # Mark as public search engine (no API key is required to query OpenAlex)
    is_public = True
    # Scientific/academic search engine
    is_scientific = True
    # Lexical/keyword matching engine (as opposed to semantic/vector search)
    is_lexical = True
    # Results should still pass through the LLM relevance filter downstream
    needs_llm_relevance_filter = True

27 def __init__( 

28 self, 

29 max_results: int = 25, 

30 email: Optional[str] = None, 

31 sort_by: str = "relevance", 

32 filter_open_access: bool = False, 

33 min_citations: int = 0, 

34 from_publication_date: Optional[str] = None, 

35 llm: Optional[BaseLLM] = None, 

36 max_filtered_results: Optional[int] = None, 

37 settings_snapshot: Optional[Dict[str, Any]] = None, 

38 **kwargs, 

39 ): 

40 """ 

41 Initialize the OpenAlex search engine. 

42 

43 Args: 

44 max_results: Maximum number of search results 

45 email: Email for polite pool (gets faster response) - optional 

46 sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date') 

47 filter_open_access: Only return open access papers 

48 min_citations: Minimum citation count filter 

49 from_publication_date: Filter papers from this date (YYYY-MM-DD) 

50 llm: Language model for relevance filtering 

51 max_filtered_results: Maximum number of results to keep after filtering 

52 settings_snapshot: Settings snapshot for configuration 

53 **kwargs: Additional parameters to pass to parent class 

54 """ 

55 # Initialize journal reputation filter if needed 

56 content_filters = [] 

57 journal_filter = JournalReputationFilter.create_default( 

58 model=llm, # type: ignore[arg-type] 

59 engine_name="openalex", 

60 settings_snapshot=settings_snapshot, 

61 ) 

62 if journal_filter is not None: 

63 content_filters.append(journal_filter) 

64 

65 # Initialize the BaseSearchEngine 

66 super().__init__( 

67 llm=llm, 

68 max_filtered_results=max_filtered_results, 

69 max_results=max_results, 

70 content_filters=content_filters, # type: ignore[arg-type] 

71 settings_snapshot=settings_snapshot, 

72 **kwargs, 

73 ) 

74 

75 self.sort_by = sort_by 

76 self.filter_open_access = filter_open_access 

77 self.min_citations = min_citations 

78 # Only set from_publication_date if it's not empty or "False" 

79 self.from_publication_date = ( 

80 from_publication_date 

81 if from_publication_date and from_publication_date != "False" 

82 else None 

83 ) 

84 

85 # Get email from settings if not provided 

86 if not email and settings_snapshot: 

87 from ...config.search_config import get_setting_from_snapshot 

88 

89 try: 

90 email = get_setting_from_snapshot( 

91 "search.engine.web.openalex.email", 

92 settings_snapshot=settings_snapshot, 

93 ) 

94 except Exception: 

95 logger.debug( 

96 "Failed to read openalex.email from settings snapshot", 

97 exc_info=True, 

98 ) 

99 

100 # Handle "False" string for email 

101 self.email = email if email and email != "False" else None 

102 

103 # API configuration 

104 self.api_base = "https://api.openalex.org" 

105 self.headers = { 

106 "User-Agent": f"Local-Deep-Research-Agent{f' ({email})' if email else ''}", 

107 "Accept": "application/json", 

108 } 

109 

110 if email: 

111 # Email allows access to polite pool with faster response times 

112 logger.info(f"Using OpenAlex polite pool with email: {email}") 

113 else: 

114 logger.info( 

115 "Using OpenAlex without email (consider adding email for faster responses)" 

116 ) 

117 

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for OpenAlex search results.

        Performs a single GET against the OpenAlex ``/works`` endpoint and
        converts each returned work into a preview dictionary.  Rate limiting
        is applied *before* the request; an HTTP 429 is surfaced as a
        ``RateLimitError`` so the base class can retry.

        Args:
            query: The search query (natural language supported!)

        Returns:
            List of preview dictionaries (empty list on API errors)
        """
        logger.info(f"Searching OpenAlex for: {query}")

        # Build the search URL with parameters
        params = {
            "search": query,  # OpenAlex handles natural language beautifully
            "per_page": min(self.max_results, 200),  # OpenAlex allows up to 200
            "page": 1,
            # Request specific fields including abstract for snippets
            "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
        }

        # Add optional filters (comma-joined into a single "filter" param
        # per the OpenAlex filter syntax)
        filters = []

        if self.filter_open_access:
            filters.append("is_oa:true")

        if self.min_citations > 0:
            filters.append(f"cited_by_count:>{self.min_citations}")

        # NOTE: __init__ already normalizes "False" to None; the extra
        # string comparison here is defensive.
        if self.from_publication_date and self.from_publication_date != "False":
            filters.append(
                f"from_publication_date:{self.from_publication_date}"
            )

        if filters:
            params["filter"] = ",".join(filters)

        # Add sorting (unknown sort_by values fall back to relevance)
        sort_map = {
            "relevance": "relevance_score:desc",
            "cited_by_count": "cited_by_count:desc",
            "publication_date": "publication_date:desc",
        }
        params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")

        # Add email to params for polite pool
        if self.email and self.email != "False":
            params["mailto"] = self.email

        try:
            # Apply rate limiting before making the request (simple like PubMed)
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )
            logger.debug(
                f"Applied rate limit wait: {self._last_wait_time:.2f}s"
            )

            # Make the API request
            logger.info(f"Making OpenAlex API request with params: {params}")
            response = safe_get(
                f"{self.api_base}/works",
                params=params,
                headers=self.headers,
                timeout=30,
            )
            logger.info(f"OpenAlex API response status: {response.status_code}")

            # Log rate limit info if available
            if "x-ratelimit-remaining" in response.headers:
                remaining = response.headers.get("x-ratelimit-remaining")
                limit = response.headers.get("x-ratelimit-limit", "unknown")
                logger.debug(
                    f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
                )

            if response.status_code == 200:
                data = response.json()
                results = data.get("results", [])
                meta = data.get("meta", {})
                total_count = meta.get("count", 0)

                logger.info(
                    f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
                )

                # Log first result structure for debugging
                if results:
                    first_result = results[0]
                    logger.debug(
                        f"First result keys: {list(first_result.keys())}"
                    )
                    logger.debug(
                        f"First result has abstract: {'abstract_inverted_index' in first_result}"
                    )
                    if "open_access" in first_result:
                        logger.debug(
                            f"Open access structure: {first_result['open_access']}"
                        )

                # Format results as previews; works that fail formatting are
                # skipped (with a warning) rather than aborting the batch.
                previews = []
                for i, work in enumerate(results):
                    logger.debug(
                        f"Formatting work {i + 1}/{len(results)}: {work.get('display_name', 'Unknown')[:50]}"
                    )
                    preview = self._format_work_preview(work)
                    if preview:
                        previews.append(preview)
                        logger.debug(
                            f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
                        )
                    else:
                        logger.warning(f"Failed to format work {i + 1}")

                logger.info(
                    f"Successfully formatted {len(previews)} previews from {len(results)} results"
                )
                return previews

            if response.status_code == 429:
                # Rate limited (very rare with OpenAlex)
                logger.warning("OpenAlex rate limit reached")
                raise RateLimitError("OpenAlex rate limit exceeded")  # noqa: TRY301 — re-raised by except RateLimitError for base class retry

            # Any other status code is treated as a soft failure.
            logger.error(
                f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
            )
            return []

        except RateLimitError:
            # Re-raise rate limit errors for base class retry handling
            raise
        except Exception:
            # Network/JSON/unexpected errors degrade to an empty result set.
            logger.exception("Error searching OpenAlex")
            return []

256 def _format_work_preview( 

257 self, work: Dict[str, Any] 

258 ) -> Optional[Dict[str, Any]]: 

259 """ 

260 Format an OpenAlex work as a preview dictionary. 

261 

262 Args: 

263 work: OpenAlex work object 

264 

265 Returns: 

266 Formatted preview dictionary or None if formatting fails 

267 """ 

268 try: 

269 # Extract basic information 

270 work_id = work.get("id", "") 

271 title = work.get("display_name", "No title") 

272 logger.debug(f"Formatting work: {title[:50]}") 

273 

274 # Build snippet from abstract or first part of title 

275 abstract = None 

276 if work.get("abstract_inverted_index"): 

277 logger.debug( 

278 f"Found abstract_inverted_index with {len(work['abstract_inverted_index'])} words" 

279 ) 

280 # Reconstruct abstract from inverted index 

281 abstract = self._reconstruct_abstract( 

282 work["abstract_inverted_index"] 

283 ) 

284 logger.debug( 

285 f"Reconstructed abstract length: {len(abstract) if abstract else 0}" 

286 ) 

287 else: 

288 logger.debug("No abstract_inverted_index found") 

289 

290 snippet = ( 

291 abstract[:SNIPPET_LENGTH_LONG] 

292 if abstract 

293 else f"Academic paper: {title}" 

294 ) 

295 logger.debug(f"Created snippet: {snippet[:100]}...") 

296 

297 # Get publication info 

298 publication_year = work.get("publication_year", "unknown") 

299 publication_date = work.get("publication_date", "unknown") 

300 

301 # Get venue/journal info 

302 venue = work.get("primary_location", {}) 

303 journal_name = "unknown" 

304 if venue: 

305 source = venue.get("source", {}) 

306 if source: 

307 journal_name = source.get("display_name", "unknown") 

308 

309 # Get authors 

310 authors = [] 

311 for authorship in work.get("authorships", [])[ 

312 :5 

313 ]: # Limit to 5 authors 

314 author = authorship.get("author", {}) 

315 if author: 315 ↛ 311line 315 didn't jump to line 311 because the condition on line 315 was always true

316 authors.append(author.get("display_name", "")) 

317 

318 authors_str = ", ".join(authors) 

319 if len(work.get("authorships", [])) > 5: 

320 authors_str += " et al." 

321 

322 # Get metrics 

323 cited_by_count = work.get("cited_by_count", 0) 

324 

325 # Get URL - prefer DOI, fallback to OpenAlex URL 

326 url = work.get("doi", work_id) 

327 if not url.startswith("http"): 

328 if url.startswith("https://doi.org/"): 328 ↛ 329line 328 didn't jump to line 329 because the condition on line 328 was never true

329 pass # Already a full DOI URL 

330 elif url.startswith("10."): 

331 url = f"https://doi.org/{url}" 

332 else: 

333 url = work_id # OpenAlex URL 

334 

335 # Check if open access 

336 open_access_info = work.get("open_access", {}) 

337 is_oa = ( 

338 open_access_info.get("is_oa", False) 

339 if open_access_info 

340 else False 

341 ) 

342 oa_url = None 

343 if is_oa: 

344 best_location = work.get("best_oa_location", {}) 

345 if best_location: 345 ↛ 350line 345 didn't jump to line 350 because the condition on line 345 was always true

346 oa_url = best_location.get("pdf_url") or best_location.get( 

347 "landing_page_url" 

348 ) 

349 

350 return { 

351 "id": work_id, 

352 "title": title, 

353 "link": url, 

354 "snippet": snippet, 

355 "authors": authors_str, 

356 "year": publication_year, 

357 "date": publication_date, 

358 "journal": journal_name, 

359 "citations": cited_by_count, 

360 "is_open_access": is_oa, 

361 "oa_url": oa_url, 

362 "abstract": abstract, 

363 "type": "academic_paper", 

364 } 

365 

366 except Exception: 

367 logger.exception( 

368 f"Error formatting OpenAlex work: {work.get('id', 'unknown')}" 

369 ) 

370 return None 

371 

372 def _reconstruct_abstract( 

373 self, inverted_index: Dict[str, List[int]] 

374 ) -> str: 

375 """ 

376 Reconstruct abstract text from OpenAlex inverted index format. 

377 

378 Args: 

379 inverted_index: Dictionary mapping words to their positions 

380 

381 Returns: 

382 Reconstructed abstract text 

383 """ 

384 try: 

385 # Create position-word mapping 

386 position_word = {} 

387 for word, positions in inverted_index.items(): 

388 for pos in positions: 

389 position_word[pos] = word 

390 

391 # Sort by position and reconstruct 

392 sorted_positions = sorted(position_word.keys()) 

393 words = [position_word[pos] for pos in sorted_positions] 

394 

395 return " ".join(words) 

396 

397 except Exception: 

398 logger.debug("Could not reconstruct abstract from inverted index") 

399 return "" 

400 

401 def _get_full_content( 

402 self, relevant_items: List[Dict[str, Any]] 

403 ) -> List[Dict[str, Any]]: 

404 """ 

405 Get full content for relevant items (OpenAlex provides most content in preview). 

406 

407 Args: 

408 relevant_items: List of relevant preview dictionaries 

409 

410 Returns: 

411 List of result dictionaries with full content 

412 """ 

413 # OpenAlex returns comprehensive data in the initial search, 

414 # so we don't need a separate full content fetch 

415 results = [] 

416 for item in relevant_items: 

417 result = { 

418 "title": item.get("title", ""), 

419 "link": item.get("link", ""), 

420 "snippet": item.get("snippet", ""), 

421 "content": item.get("abstract", item.get("snippet", "")), 

422 "metadata": { 

423 "authors": item.get("authors", ""), 

424 "year": item.get("year", ""), 

425 "journal": item.get("journal", ""), 

426 "citations": item.get("citations", 0), 

427 "is_open_access": item.get("is_open_access", False), 

428 "oa_url": item.get("oa_url"), 

429 }, 

430 } 

431 results.append(result) 

432 

433 return results