Coverage for src/local_deep_research/web_search_engines/engines/search_engine_openalex.py: 82%

157 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1"""OpenAlex search engine implementation for academic papers and research.""" 

2 

3from typing import Any, Dict, List, Optional 

4 

5from langchain_core.language_models import BaseLLM 

6from loguru import logger 

7 

8from ...advanced_search_system.filters.journal_reputation_filter import ( 

9 JournalReputationFilter, 

10) 

11from ...security.safe_requests import safe_get 

12from ..rate_limiting import RateLimitError 

13from ..search_engine_base import BaseSearchEngine 

14 

15 

class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support."""

    # Mark as public search engine
    is_public = True
    # Scientific/academic search engine
    is_scientific = True

    def __init__(
        self,
        max_results: int = 25,
        email: Optional[str] = None,
        sort_by: str = "relevance",
        filter_open_access: bool = False,
        min_citations: int = 0,
        from_publication_date: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
37 """ 

38 Initialize the OpenAlex search engine. 

39 

40 Args: 

41 max_results: Maximum number of search results 

42 email: Email for polite pool (gets faster response) - optional 

43 sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date') 

44 filter_open_access: Only return open access papers 

45 min_citations: Minimum citation count filter 

46 from_publication_date: Filter papers from this date (YYYY-MM-DD) 

47 llm: Language model for relevance filtering 

48 max_filtered_results: Maximum number of results to keep after filtering 

49 settings_snapshot: Settings snapshot for configuration 

50 **kwargs: Additional parameters to pass to parent class 

51 """ 

        # Initialize the journal reputation filter if needed
        content_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,
            engine_name="openalex",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            content_filters.append(journal_filter)

        # Initialize the BaseSearchEngine
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            content_filters=content_filters,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        self.sort_by = sort_by
        self.filter_open_access = filter_open_access
        self.min_citations = min_citations
        # Only set from_publication_date if it's not empty or "False"
        self.from_publication_date = (
            from_publication_date
            if from_publication_date and from_publication_date != "False"
            else None
        )

        # Get the email from settings if not provided
        if not email and settings_snapshot:
            from ...config.search_config import get_setting_from_snapshot

            try:
                email = get_setting_from_snapshot(
                    "search.engine.web.openalex.email",
                    settings_snapshot=settings_snapshot,
                )
            except Exception:
                pass

        # Handle the "False" string for email
        self.email = email if email and email != "False" else None

        # API configuration; use the sanitized self.email so a "False"
        # string never leaks into the User-Agent or the logs below
        self.api_base = "https://api.openalex.org"
        self.headers = {
            "User-Agent": f"Local-Deep-Research-Agent{f' ({self.email})' if self.email else ''}",
            "Accept": "application/json",
        }

        if self.email:
            # Email allows access to the polite pool with faster response times
            logger.info(f"Using OpenAlex polite pool with email: {self.email}")
        else:
            logger.info(
                "Using OpenAlex without email (consider adding an email for faster responses)"
            )

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for OpenAlex search results.

        Args:
            query: The search query (natural language supported!)

        Returns:
            List of preview dictionaries
        """
        logger.info(f"Searching OpenAlex for: {query}")

        # Build the search URL parameters
        params = {
            "search": query,  # OpenAlex handles natural language beautifully
            "per_page": min(self.max_results, 200),  # OpenAlex allows up to 200
            "page": 1,
            # Request specific fields, including the abstract for snippets
            "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
        }

        # Add optional filters
        filters = []

        if self.filter_open_access:  # coverage: condition never true during tests
            filters.append("is_oa:true")

        if self.min_citations > 0:  # coverage: condition never true during tests
            filters.append(f"cited_by_count:>{self.min_citations}")

        if self.from_publication_date and self.from_publication_date != "False":  # coverage: condition never true during tests
            filters.append(
                f"from_publication_date:{self.from_publication_date}"
            )

        if filters:  # coverage: condition never true during tests
            params["filter"] = ",".join(filters)

        # Add sorting
        sort_map = {
            "relevance": "relevance_score:desc",
            "cited_by_count": "cited_by_count:desc",
            "publication_date": "publication_date:desc",
        }
        params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")

        # Add the email to params for the polite pool
        if self.email and self.email != "False":  # coverage: condition never true during tests
            params["mailto"] = self.email

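        # For illustration only (the query is hypothetical, and the real URL
        # encoding is done by the requests layer), a default search issues
        # roughly:
        #   GET https://api.openalex.org/works?search=crispr%20gene%20editing
        #       &per_page=25&page=1&select=id,display_name,...&sort=relevance_score:desc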

        try:
            # Apply rate limiting before making the request (the same simple
            # approach as PubMed)
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )
            logger.debug(
                f"Applied rate limit wait: {self._last_wait_time:.2f}s"
            )

            # Make the API request
            logger.info(f"Making OpenAlex API request with params: {params}")
            response = safe_get(
                f"{self.api_base}/works",
                params=params,
                headers=self.headers,
                timeout=30,
            )
            logger.info(f"OpenAlex API response status: {response.status_code}")

            # Log rate limit info if available
            if "x-ratelimit-remaining" in response.headers:  # coverage: condition never true during tests
                remaining = response.headers.get("x-ratelimit-remaining")
                limit = response.headers.get("x-ratelimit-limit", "unknown")
                logger.debug(
                    f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
                )

            if response.status_code == 200:
                data = response.json()
                results = data.get("results", [])
                meta = data.get("meta", {})
                total_count = meta.get("count", 0)

                logger.info(
                    f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
                )

                # Log the first result's structure for debugging
                if results:
                    first_result = results[0]
                    logger.debug(
                        f"First result keys: {list(first_result.keys())}"
                    )
                    logger.debug(
                        f"First result has abstract: {'abstract_inverted_index' in first_result}"
                    )
                    if "open_access" in first_result:  # coverage: condition always true during tests
                        logger.debug(
                            f"Open access structure: {first_result['open_access']}"
                        )

                # Format results as previews
                previews = []
                for i, work in enumerate(results):
                    # display_name can be null, so guard with `or` before slicing
                    logger.debug(
                        f"Formatting work {i + 1}/{len(results)}: {(work.get('display_name') or 'Unknown')[:50]}"
                    )
                    preview = self._format_work_preview(work)
                    if preview:  # coverage: condition always true during tests
                        previews.append(preview)
                        logger.debug(
                            f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
                        )
                    else:
                        logger.warning(f"Failed to format work {i + 1}")

                logger.info(
                    f"Successfully formatted {len(previews)} previews from {len(results)} results"
                )
                return previews

            elif response.status_code == 429:
                # Rate limited (very rare with OpenAlex)
                logger.warning("OpenAlex rate limit reached")
                raise RateLimitError("OpenAlex rate limit exceeded")

            else:
                logger.error(
                    f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
                )
                return []

        except RateLimitError:
            # Re-raise rate limit errors for the base class's retry handling
            raise
        except Exception:
            logger.exception("Error searching OpenAlex")
            return []

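    # For reference, the 200 response consumed above is shaped like this
    # (abridged; the count value is invented for illustration):
    #   {"meta": {"count": 12345, ...},
    #    "results": [<work objects handed to _format_work_preview below>, ...]}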

    def _format_work_preview(
        self, work: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Format an OpenAlex work as a preview dictionary.

        Args:
            work: OpenAlex work object

        Returns:
            Formatted preview dictionary, or None if formatting fails
        """
        try:
            # Extract basic information; display_name can be null
            work_id = work.get("id", "")
            title = work.get("display_name") or "No title"
            logger.debug(f"Formatting work: {title[:50]}")

            # Build the snippet from the abstract, falling back to the title
            abstract = None
            if work.get("abstract_inverted_index"):
                logger.debug(
                    f"Found abstract_inverted_index with {len(work['abstract_inverted_index'])} words"
                )
                # Reconstruct the abstract from the inverted index
                abstract = self._reconstruct_abstract(
                    work["abstract_inverted_index"]
                )
                logger.debug(
                    f"Reconstructed abstract length: {len(abstract) if abstract else 0}"
                )
            else:
                logger.debug("No abstract_inverted_index found")

            snippet = abstract[:500] if abstract else f"Academic paper: {title}"
            logger.debug(f"Created snippet: {snippet[:100]}...")

            # Get publication info
            publication_year = work.get("publication_year", "unknown")
            publication_date = work.get("publication_date", "unknown")

            # Get venue/journal info
            venue = work.get("primary_location", {})
            journal_name = "unknown"
            if venue:
                source = venue.get("source", {})
                if source:  # coverage: condition always true during tests
                    journal_name = source.get("display_name", "unknown")

            # Get authors (limit to 5)
            authors = []
            for authorship in work.get("authorships", [])[:5]:
                author = authorship.get("author", {})
                if author:  # coverage: condition always true during tests
                    authors.append(author.get("display_name", ""))

            authors_str = ", ".join(authors)
            if len(work.get("authorships", [])) > 5:
                authors_str += " et al."

            # Get metrics
            cited_by_count = work.get("cited_by_count", 0)

            # Get the URL - prefer the DOI, falling back to the OpenAlex URL.
            # The "doi" field can be present but null, hence `or`.
            url = work.get("doi") or work_id
            if not url.startswith("http"):  # coverage: condition never true during tests
                if url.startswith("10."):
                    # Bare DOI, e.g. "10.1234/example"
                    url = f"https://doi.org/{url}"
                else:
                    url = work_id  # OpenAlex URL

            # Check whether the work is open access
            open_access_info = work.get("open_access", {})
            is_oa = (
                open_access_info.get("is_oa", False)
                if open_access_info
                else False
            )
            oa_url = None
            if is_oa:
                best_location = work.get("best_oa_location", {})
                if best_location:  # coverage: condition always true during tests
                    oa_url = best_location.get("pdf_url") or best_location.get(
                        "landing_page_url"
                    )

            preview = {
                "id": work_id,
                "title": title,
                "link": url,
                "snippet": snippet,
                "authors": authors_str,
                "year": publication_year,
                "date": publication_date,
                "journal": journal_name,
                "citations": cited_by_count,
                "is_open_access": is_oa,
                "oa_url": oa_url,
                "abstract": abstract,
                "type": "academic_paper",
            }

            return preview

        except Exception:
            logger.exception(
                f"Error formatting OpenAlex work: {work.get('id', 'unknown')}"
            )
            return None

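    # Illustrative only (every field value here is invented): a minimal work
    #   {"id": "https://openalex.org/W123", "display_name": "Example Paper",
    #    "doi": "https://doi.org/10.1234/example", "publication_year": 2024,
    #    "cited_by_count": 7}
    # has no abstract_inverted_index, so _format_work_preview above would
    # yield a preview with link "https://doi.org/10.1234/example", snippet
    # "Academic paper: Example Paper", and journal "unknown".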

    def _reconstruct_abstract(
        self, inverted_index: Dict[str, List[int]]
    ) -> str:
        """
        Reconstruct abstract text from OpenAlex's inverted index format.

        Args:
            inverted_index: Dictionary mapping words to their positions

        Returns:
            Reconstructed abstract text
        """
        try:
            # Create a position-to-word mapping
            position_word = {}
            for word, positions in inverted_index.items():
                for pos in positions:
                    position_word[pos] = word

            # Sort by position and reconstruct
            sorted_positions = sorted(position_word.keys())
            words = [position_word[pos] for pos in sorted_positions]

            return " ".join(words)

        except Exception:
            logger.debug("Could not reconstruct abstract from inverted index")
            return ""

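    # Worked example for _reconstruct_abstract above (hypothetical input):
    # OpenAlex ships abstracts as a {word: [positions]} inverted index rather
    # than plain text. Given
    #   {"to": [0, 4], "be": [1, 5], "or": [2], "not": [3]}
    # the position-to-word map is {0: "to", 1: "be", 2: "or", 3: "not",
    # 4: "to", 5: "be"}, so the position-sorted join returns
    #   "to be or not to be"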

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant items (OpenAlex provides most content in the preview).

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # OpenAlex returns comprehensive data in the initial search,
        # so we don't need a separate full-content fetch
        results = []
        for item in relevant_items:
            result = {
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "snippet": item.get("snippet", ""),
                # "abstract" is always present in previews but may be None,
                # so use `or` to actually fall back to the snippet
                "content": item.get("abstract") or item.get("snippet", ""),
                "metadata": {
                    "authors": item.get("authors", ""),
                    "year": item.get("year", ""),
                    "journal": item.get("journal", ""),
                    "citations": item.get("citations", 0),
                    "is_open_access": item.get("is_open_access", False),
                    "oa_url": item.get("oa_url"),
                },
            }
            results.append(result)

        return results
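
# Minimal usage sketch (illustrative; the email and query are invented, and
# the package's relative imports must resolve for this to run). In practice
# BaseSearchEngine presumably drives these hooks through its public entry
# point, which is not shown here:
#
#     engine = OpenAlexSearchEngine(
#         max_results=10,
#         email="researcher@example.org",  # opt into the polite pool
#         sort_by="cited_by_count",
#     )
#     previews = engine._get_previews("machine learning for drug discovery")
#     results = engine._get_full_content(previews)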