Coverage for src / local_deep_research / web_search_engines / engines / search_engine_openalex.py: 82%
157 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""OpenAlex search engine implementation for academic papers and research."""
3from typing import Any, Dict, List, Optional
5from langchain_core.language_models import BaseLLM
6from loguru import logger
8from ...advanced_search_system.filters.journal_reputation_filter import (
9 JournalReputationFilter,
10)
11from ...security.safe_requests import safe_get
12from ..rate_limiting import RateLimitError
13from ..search_engine_base import BaseSearchEngine
class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support."""

    # Mark as public search engine
    is_public = True
    # Scientific/academic search engine
    is_scientific = True

    def __init__(
        self,
        max_results: int = 25,
        email: Optional[str] = None,
        sort_by: str = "relevance",
        filter_open_access: bool = False,
        min_citations: int = 0,
        from_publication_date: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the OpenAlex search engine.

        Args:
            max_results: Maximum number of search results
            email: Email for polite pool (gets faster response) - optional
            sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date')
            filter_open_access: Only return open access papers
            min_citations: Minimum citation count filter
            from_publication_date: Filter papers from this date (YYYY-MM-DD)
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for configuration
            **kwargs: Additional parameters to pass to parent class
        """
        # Initialize journal reputation filter if needed
        content_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,
            engine_name="openalex",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            content_filters.append(journal_filter)

        # Initialize the BaseSearchEngine
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            content_filters=content_filters,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        self.sort_by = sort_by
        self.filter_open_access = filter_open_access
        self.min_citations = min_citations
        # The settings UI may persist unset values as "" or the string
        # "False"; normalize those placeholders to None.
        self.from_publication_date = (
            from_publication_date
            if from_publication_date and from_publication_date != "False"
            else None
        )

        # Get email from settings if not provided
        if not email and settings_snapshot:
            from ...config.search_config import get_setting_from_snapshot

            try:
                email = get_setting_from_snapshot(
                    "search.engine.web.openalex.email",
                    settings_snapshot=settings_snapshot,
                )
            except Exception:
                # Best-effort lookup: a missing setting simply means no
                # polite-pool email.
                pass

        # Handle "False" string for email
        self.email = email if email and email != "False" else None

        # API configuration
        self.api_base = "https://api.openalex.org"
        # BUGFIX: build the User-Agent from the sanitized self.email (not the
        # raw argument) so a stored "False" placeholder never leaks into the
        # header or the log messages below.
        self.headers = {
            "User-Agent": f"Local-Deep-Research-Agent{f' ({self.email})' if self.email else ''}",
            "Accept": "application/json",
        }

        if self.email:
            # Email allows access to polite pool with faster response times
            logger.info(f"Using OpenAlex polite pool with email: {self.email}")
        else:
            logger.info(
                "Using OpenAlex without email (consider adding email for faster responses)"
            )

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for OpenAlex search results.

        Args:
            query: The search query (natural language supported!)

        Returns:
            List of preview dictionaries

        Raises:
            RateLimitError: When OpenAlex returns HTTP 429, so the base
                class can retry with backoff.
        """
        logger.info(f"Searching OpenAlex for: {query}")

        # Build the search URL with parameters
        params = {
            "search": query,  # OpenAlex handles natural language beautifully
            "per_page": min(self.max_results, 200),  # OpenAlex allows up to 200
            "page": 1,
            # Request specific fields including abstract for snippets
            "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
        }

        # Add optional filters
        filters = []

        if self.filter_open_access:
            filters.append("is_oa:true")

        if self.min_citations > 0:
            filters.append(f"cited_by_count:>{self.min_citations}")

        # self.from_publication_date was already normalized in __init__
        # ("" / "False" became None), so a simple truthiness check suffices.
        if self.from_publication_date:
            filters.append(
                f"from_publication_date:{self.from_publication_date}"
            )

        if filters:
            params["filter"] = ",".join(filters)

        # Add sorting
        sort_map = {
            "relevance": "relevance_score:desc",
            "cited_by_count": "cited_by_count:desc",
            "publication_date": "publication_date:desc",
        }
        params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")

        # Add email to params for polite pool (self.email is already
        # sanitized to None when unset).
        if self.email:
            params["mailto"] = self.email

        try:
            # Apply rate limiting before making the request (simple like PubMed)
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )
            logger.debug(
                f"Applied rate limit wait: {self._last_wait_time:.2f}s"
            )

            # Make the API request
            logger.info(f"Making OpenAlex API request with params: {params}")
            response = safe_get(
                f"{self.api_base}/works",
                params=params,
                headers=self.headers,
                timeout=30,
            )
            logger.info(f"OpenAlex API response status: {response.status_code}")

            # Log rate limit info if available
            if "x-ratelimit-remaining" in response.headers:
                remaining = response.headers.get("x-ratelimit-remaining")
                limit = response.headers.get("x-ratelimit-limit", "unknown")
                logger.debug(
                    f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
                )

            if response.status_code == 200:
                data = response.json()
                results = data.get("results", [])
                meta = data.get("meta", {})
                total_count = meta.get("count", 0)

                logger.info(
                    f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
                )

                # Log first result structure for debugging
                if results:
                    first_result = results[0]
                    logger.debug(
                        f"First result keys: {list(first_result.keys())}"
                    )
                    logger.debug(
                        f"First result has abstract: {'abstract_inverted_index' in first_result}"
                    )
                    if "open_access" in first_result:
                        logger.debug(
                            f"Open access structure: {first_result['open_access']}"
                        )

                # Format results as previews
                previews = []
                for i, work in enumerate(results):
                    logger.debug(
                        f"Formatting work {i + 1}/{len(results)}: {work.get('display_name', 'Unknown')[:50]}"
                    )
                    preview = self._format_work_preview(work)
                    if preview:
                        previews.append(preview)
                        logger.debug(
                            f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
                        )
                    else:
                        logger.warning(f"Failed to format work {i + 1}")

                logger.info(
                    f"Successfully formatted {len(previews)} previews from {len(results)} results"
                )
                return previews

            elif response.status_code == 429:
                # Rate limited (very rare with OpenAlex)
                logger.warning("OpenAlex rate limit reached")
                raise RateLimitError("OpenAlex rate limit exceeded")

            else:
                logger.error(
                    f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
                )
                return []

        except RateLimitError:
            # Re-raise rate limit errors for base class retry handling
            raise
        except Exception:
            logger.exception("Error searching OpenAlex")
            return []

    def _format_work_preview(
        self, work: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Format an OpenAlex work as a preview dictionary.

        Args:
            work: OpenAlex work object

        Returns:
            Formatted preview dictionary or None if formatting fails
        """
        try:
            # Extract basic information
            work_id = work.get("id", "")
            title = work.get("display_name", "No title")
            logger.debug(f"Formatting work: {title[:50]}")

            # Build snippet from abstract or first part of title
            abstract = None
            if work.get("abstract_inverted_index"):
                logger.debug(
                    f"Found abstract_inverted_index with {len(work['abstract_inverted_index'])} words"
                )
                # Reconstruct abstract from inverted index
                abstract = self._reconstruct_abstract(
                    work["abstract_inverted_index"]
                )
                logger.debug(
                    f"Reconstructed abstract length: {len(abstract) if abstract else 0}"
                )
            else:
                logger.debug("No abstract_inverted_index found")

            snippet = abstract[:500] if abstract else f"Academic paper: {title}"
            logger.debug(f"Created snippet: {snippet[:100]}...")

            # Get publication info
            publication_year = work.get("publication_year", "unknown")
            publication_date = work.get("publication_date", "unknown")

            # Get venue/journal info (primary_location and source may be
            # present but null in the API payload).
            venue = work.get("primary_location", {})
            journal_name = "unknown"
            if venue:
                source = venue.get("source", {})
                if source:
                    journal_name = source.get("display_name", "unknown")

            # Get authors
            authors = []
            for authorship in work.get("authorships", [])[
                :5
            ]:  # Limit to 5 authors
                author = authorship.get("author", {})
                if author:
                    authors.append(author.get("display_name", ""))

            authors_str = ", ".join(authors)
            if len(work.get("authorships", [])) > 5:
                authors_str += " et al."

            # Get metrics
            cited_by_count = work.get("cited_by_count", 0)

            # Get URL - prefer DOI, fallback to OpenAlex URL.
            # BUGFIX: the "doi" key can be present with an explicit null
            # value, so `work.get("doi", work_id)` could yield None and crash
            # on .startswith(); `or work_id` covers both missing and null.
            url = work.get("doi") or work_id
            if not url.startswith("http"):
                # BUGFIX: removed the unreachable "https://doi.org/" check --
                # any value starting with "https://" already starts with
                # "http" and never enters this branch.
                if url.startswith("10."):
                    # Bare DOI: turn it into a resolvable URL
                    url = f"https://doi.org/{url}"
                else:
                    url = work_id  # OpenAlex URL

            # Check if open access
            open_access_info = work.get("open_access", {})
            is_oa = (
                open_access_info.get("is_oa", False)
                if open_access_info
                else False
            )
            oa_url = None
            if is_oa:
                best_location = work.get("best_oa_location", {})
                if best_location:
                    # Prefer a direct PDF link over the landing page
                    oa_url = best_location.get("pdf_url") or best_location.get(
                        "landing_page_url"
                    )

            preview = {
                "id": work_id,
                "title": title,
                "link": url,
                "snippet": snippet,
                "authors": authors_str,
                "year": publication_year,
                "date": publication_date,
                "journal": journal_name,
                "citations": cited_by_count,
                "is_open_access": is_oa,
                "oa_url": oa_url,
                "abstract": abstract,
                "type": "academic_paper",
            }

            return preview

        except Exception:
            logger.exception(
                f"Error formatting OpenAlex work: {work.get('id', 'unknown')}"
            )
            return None

    def _reconstruct_abstract(
        self, inverted_index: Dict[str, List[int]]
    ) -> str:
        """
        Reconstruct abstract text from OpenAlex inverted index format.

        Args:
            inverted_index: Dictionary mapping words to their positions

        Returns:
            Reconstructed abstract text (empty string on failure)
        """
        try:
            # Create position-word mapping
            position_word = {}
            for word, positions in inverted_index.items():
                for pos in positions:
                    position_word[pos] = word

            # Sort by position and reconstruct
            sorted_positions = sorted(position_word.keys())
            words = [position_word[pos] for pos in sorted_positions]

            return " ".join(words)

        except Exception:
            logger.debug("Could not reconstruct abstract from inverted index")
            return ""

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant items (OpenAlex provides most content in preview).

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # OpenAlex returns comprehensive data in the initial search,
        # so we don't need a separate full content fetch
        results = []
        for item in relevant_items:
            result = {
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "snippet": item.get("snippet", ""),
                # Fall back to the snippet when no abstract was reconstructed
                "content": item.get("abstract", item.get("snippet", "")),
                "metadata": {
                    "authors": item.get("authors", ""),
                    "year": item.get("year", ""),
                    "journal": item.get("journal", ""),
                    "citations": item.get("citations", 0),
                    "is_open_access": item.get("is_open_access", False),
                    "oa_url": item.get("oa_url"),
                },
            }
            results.append(result)

        return results