Coverage for src / local_deep_research / web_search_engines / engines / search_engine_openalex.py: 87%
158 statements
« prev ^ index » next — coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""OpenAlex search engine implementation for academic papers and research."""
3from typing import Any, Dict, List, Optional
5from langchain_core.language_models import BaseLLM
6from loguru import logger
8from ...constants import SNIPPET_LENGTH_LONG
9from ...advanced_search_system.filters.journal_reputation_filter import (
10 JournalReputationFilter,
11)
12from ...security.safe_requests import safe_get
13from ..rate_limiting import RateLimitError
14from ..search_engine_base import BaseSearchEngine
class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support."""

    # Mark as public search engine (no API key required to query OpenAlex)
    is_public = True
    # Scientific/academic search engine (results are scholarly works)
    is_scientific = True
25 def __init__(
26 self,
27 max_results: int = 25,
28 email: Optional[str] = None,
29 sort_by: str = "relevance",
30 filter_open_access: bool = False,
31 min_citations: int = 0,
32 from_publication_date: Optional[str] = None,
33 llm: Optional[BaseLLM] = None,
34 max_filtered_results: Optional[int] = None,
35 settings_snapshot: Optional[Dict[str, Any]] = None,
36 **kwargs,
37 ):
38 """
39 Initialize the OpenAlex search engine.
41 Args:
42 max_results: Maximum number of search results
43 email: Email for polite pool (gets faster response) - optional
44 sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date')
45 filter_open_access: Only return open access papers
46 min_citations: Minimum citation count filter
47 from_publication_date: Filter papers from this date (YYYY-MM-DD)
48 llm: Language model for relevance filtering
49 max_filtered_results: Maximum number of results to keep after filtering
50 settings_snapshot: Settings snapshot for configuration
51 **kwargs: Additional parameters to pass to parent class
52 """
53 # Initialize journal reputation filter if needed
54 content_filters = []
55 journal_filter = JournalReputationFilter.create_default(
56 model=llm,
57 engine_name="openalex",
58 settings_snapshot=settings_snapshot,
59 )
60 if journal_filter is not None: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true
61 content_filters.append(journal_filter)
63 # Initialize the BaseSearchEngine
64 super().__init__(
65 llm=llm,
66 max_filtered_results=max_filtered_results,
67 max_results=max_results,
68 content_filters=content_filters,
69 settings_snapshot=settings_snapshot,
70 **kwargs,
71 )
73 self.sort_by = sort_by
74 self.filter_open_access = filter_open_access
75 self.min_citations = min_citations
76 # Only set from_publication_date if it's not empty or "False"
77 self.from_publication_date = (
78 from_publication_date
79 if from_publication_date and from_publication_date != "False"
80 else None
81 )
83 # Get email from settings if not provided
84 if not email and settings_snapshot: 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 from ...config.search_config import get_setting_from_snapshot
87 try:
88 email = get_setting_from_snapshot(
89 "search.engine.web.openalex.email",
90 settings_snapshot=settings_snapshot,
91 )
92 except Exception:
93 pass
95 # Handle "False" string for email
96 self.email = email if email and email != "False" else None
98 # API configuration
99 self.api_base = "https://api.openalex.org"
100 self.headers = {
101 "User-Agent": f"Local-Deep-Research-Agent{f' ({email})' if email else ''}",
102 "Accept": "application/json",
103 }
105 if email:
106 # Email allows access to polite pool with faster response times
107 logger.info(f"Using OpenAlex polite pool with email: {email}")
108 else:
109 logger.info(
110 "Using OpenAlex without email (consider adding email for faster responses)"
111 )
113 def _get_previews(self, query: str) -> List[Dict[str, Any]]:
114 """
115 Get preview information for OpenAlex search results.
117 Args:
118 query: The search query (natural language supported!)
120 Returns:
121 List of preview dictionaries
122 """
123 logger.info(f"Searching OpenAlex for: {query}")
125 # Build the search URL with parameters
126 params = {
127 "search": query, # OpenAlex handles natural language beautifully
128 "per_page": min(self.max_results, 200), # OpenAlex allows up to 200
129 "page": 1,
130 # Request specific fields including abstract for snippets
131 "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
132 }
134 # Add optional filters
135 filters = []
137 if self.filter_open_access:
138 filters.append("is_oa:true")
140 if self.min_citations > 0:
141 filters.append(f"cited_by_count:>{self.min_citations}")
143 if self.from_publication_date and self.from_publication_date != "False":
144 filters.append(
145 f"from_publication_date:{self.from_publication_date}"
146 )
148 if filters:
149 params["filter"] = ",".join(filters)
151 # Add sorting
152 sort_map = {
153 "relevance": "relevance_score:desc",
154 "cited_by_count": "cited_by_count:desc",
155 "publication_date": "publication_date:desc",
156 }
157 params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")
159 # Add email to params for polite pool
160 if self.email and self.email != "False":
161 params["mailto"] = self.email
163 try:
164 # Apply rate limiting before making the request (simple like PubMed)
165 self._last_wait_time = self.rate_tracker.apply_rate_limit(
166 self.engine_type
167 )
168 logger.debug(
169 f"Applied rate limit wait: {self._last_wait_time:.2f}s"
170 )
172 # Make the API request
173 logger.info(f"Making OpenAlex API request with params: {params}")
174 response = safe_get(
175 f"{self.api_base}/works",
176 params=params,
177 headers=self.headers,
178 timeout=30,
179 )
180 logger.info(f"OpenAlex API response status: {response.status_code}")
182 # Log rate limit info if available
183 if "x-ratelimit-remaining" in response.headers: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 remaining = response.headers.get("x-ratelimit-remaining")
185 limit = response.headers.get("x-ratelimit-limit", "unknown")
186 logger.debug(
187 f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
188 )
190 if response.status_code == 200:
191 data = response.json()
192 results = data.get("results", [])
193 meta = data.get("meta", {})
194 total_count = meta.get("count", 0)
196 logger.info(
197 f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
198 )
200 # Log first result structure for debugging
201 if results:
202 first_result = results[0]
203 logger.debug(
204 f"First result keys: {list(first_result.keys())}"
205 )
206 logger.debug(
207 f"First result has abstract: {'abstract_inverted_index' in first_result}"
208 )
209 if "open_access" in first_result: 209 ↛ 215line 209 didn't jump to line 215 because the condition on line 209 was always true
210 logger.debug(
211 f"Open access structure: {first_result['open_access']}"
212 )
214 # Format results as previews
215 previews = []
216 for i, work in enumerate(results):
217 logger.debug(
218 f"Formatting work {i + 1}/{len(results)}: {work.get('display_name', 'Unknown')[:50]}"
219 )
220 preview = self._format_work_preview(work)
221 if preview: 221 ↛ 227line 221 didn't jump to line 227 because the condition on line 221 was always true
222 previews.append(preview)
223 logger.debug(
224 f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
225 )
226 else:
227 logger.warning(f"Failed to format work {i + 1}")
229 logger.info(
230 f"Successfully formatted {len(previews)} previews from {len(results)} results"
231 )
232 return previews
234 elif response.status_code == 429:
235 # Rate limited (very rare with OpenAlex)
236 logger.warning("OpenAlex rate limit reached")
237 raise RateLimitError("OpenAlex rate limit exceeded")
239 else:
240 logger.error(
241 f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
242 )
243 return []
245 except RateLimitError:
246 # Re-raise rate limit errors for base class retry handling
247 raise
248 except Exception:
249 logger.exception("Error searching OpenAlex")
250 return []
252 def _format_work_preview(
253 self, work: Dict[str, Any]
254 ) -> Optional[Dict[str, Any]]:
255 """
256 Format an OpenAlex work as a preview dictionary.
258 Args:
259 work: OpenAlex work object
261 Returns:
262 Formatted preview dictionary or None if formatting fails
263 """
264 try:
265 # Extract basic information
266 work_id = work.get("id", "")
267 title = work.get("display_name", "No title")
268 logger.debug(f"Formatting work: {title[:50]}")
270 # Build snippet from abstract or first part of title
271 abstract = None
272 if work.get("abstract_inverted_index"):
273 logger.debug(
274 f"Found abstract_inverted_index with {len(work['abstract_inverted_index'])} words"
275 )
276 # Reconstruct abstract from inverted index
277 abstract = self._reconstruct_abstract(
278 work["abstract_inverted_index"]
279 )
280 logger.debug(
281 f"Reconstructed abstract length: {len(abstract) if abstract else 0}"
282 )
283 else:
284 logger.debug("No abstract_inverted_index found")
286 snippet = (
287 abstract[:SNIPPET_LENGTH_LONG]
288 if abstract
289 else f"Academic paper: {title}"
290 )
291 logger.debug(f"Created snippet: {snippet[:100]}...")
293 # Get publication info
294 publication_year = work.get("publication_year", "unknown")
295 publication_date = work.get("publication_date", "unknown")
297 # Get venue/journal info
298 venue = work.get("primary_location", {})
299 journal_name = "unknown"
300 if venue:
301 source = venue.get("source", {})
302 if source: 302 ↛ 306line 302 didn't jump to line 306 because the condition on line 302 was always true
303 journal_name = source.get("display_name", "unknown")
305 # Get authors
306 authors = []
307 for authorship in work.get("authorships", [])[
308 :5
309 ]: # Limit to 5 authors
310 author = authorship.get("author", {})
311 if author: 311 ↛ 307line 311 didn't jump to line 307 because the condition on line 311 was always true
312 authors.append(author.get("display_name", ""))
314 authors_str = ", ".join(authors)
315 if len(work.get("authorships", [])) > 5:
316 authors_str += " et al."
318 # Get metrics
319 cited_by_count = work.get("cited_by_count", 0)
321 # Get URL - prefer DOI, fallback to OpenAlex URL
322 url = work.get("doi", work_id)
323 if not url.startswith("http"):
324 if url.startswith("https://doi.org/"): 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true
325 pass # Already a full DOI URL
326 elif url.startswith("10."): 326 ↛ 329line 326 didn't jump to line 329 because the condition on line 326 was always true
327 url = f"https://doi.org/{url}"
328 else:
329 url = work_id # OpenAlex URL
331 # Check if open access
332 open_access_info = work.get("open_access", {})
333 is_oa = (
334 open_access_info.get("is_oa", False)
335 if open_access_info
336 else False
337 )
338 oa_url = None
339 if is_oa:
340 best_location = work.get("best_oa_location", {})
341 if best_location: 341 ↛ 346line 341 didn't jump to line 346 because the condition on line 341 was always true
342 oa_url = best_location.get("pdf_url") or best_location.get(
343 "landing_page_url"
344 )
346 preview = {
347 "id": work_id,
348 "title": title,
349 "link": url,
350 "snippet": snippet,
351 "authors": authors_str,
352 "year": publication_year,
353 "date": publication_date,
354 "journal": journal_name,
355 "citations": cited_by_count,
356 "is_open_access": is_oa,
357 "oa_url": oa_url,
358 "abstract": abstract,
359 "type": "academic_paper",
360 }
362 return preview
364 except Exception:
365 logger.exception(
366 f"Error formatting OpenAlex work: {work.get('id', 'unknown')}"
367 )
368 return None
370 def _reconstruct_abstract(
371 self, inverted_index: Dict[str, List[int]]
372 ) -> str:
373 """
374 Reconstruct abstract text from OpenAlex inverted index format.
376 Args:
377 inverted_index: Dictionary mapping words to their positions
379 Returns:
380 Reconstructed abstract text
381 """
382 try:
383 # Create position-word mapping
384 position_word = {}
385 for word, positions in inverted_index.items():
386 for pos in positions:
387 position_word[pos] = word
389 # Sort by position and reconstruct
390 sorted_positions = sorted(position_word.keys())
391 words = [position_word[pos] for pos in sorted_positions]
393 return " ".join(words)
395 except Exception:
396 logger.debug("Could not reconstruct abstract from inverted index")
397 return ""
399 def _get_full_content(
400 self, relevant_items: List[Dict[str, Any]]
401 ) -> List[Dict[str, Any]]:
402 """
403 Get full content for relevant items (OpenAlex provides most content in preview).
405 Args:
406 relevant_items: List of relevant preview dictionaries
408 Returns:
409 List of result dictionaries with full content
410 """
411 # OpenAlex returns comprehensive data in the initial search,
412 # so we don't need a separate full content fetch
413 results = []
414 for item in relevant_items:
415 result = {
416 "title": item.get("title", ""),
417 "link": item.get("link", ""),
418 "snippet": item.get("snippet", ""),
419 "content": item.get("abstract", item.get("snippet", "")),
420 "metadata": {
421 "authors": item.get("authors", ""),
422 "year": item.get("year", ""),
423 "journal": item.get("journal", ""),
424 "citations": item.get("citations", 0),
425 "is_open_access": item.get("is_open_access", False),
426 "oa_url": item.get("oa_url"),
427 },
428 }
429 results.append(result)
431 return results