Coverage for src/local_deep_research/web_search_engines/engines/search_engine_openalex.py: 93%
178 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""OpenAlex search engine implementation for academic papers and research."""
3from typing import Any, Dict, List, Optional
5from langchain_core.language_models import BaseLLM
6from loguru import logger
8from ...constants import SNIPPET_LENGTH_LONG, USER_AGENT
9from ...advanced_search_system.filters.journal_reputation_filter import (
10 JournalReputationFilter,
11)
12from ...security.safe_requests import safe_get
13from ..rate_limiting import RateLimitError
14from ..search_engine_base import BaseSearchEngine
class OpenAlexSearchEngine(BaseSearchEngine):
    """OpenAlex search engine implementation with natural language query support.

    Queries the OpenAlex ``/works`` endpoint, converts each returned work
    into a preview dictionary (title, link, snippet, venue, authors,
    affiliations, citation and open-access info) and forwards those
    previews as final results without a second fetch.
    """

    # Mark as public search engine
    is_public = True
    # Scientific/academic search engine
    is_scientific = True
    is_lexical = True
    needs_llm_relevance_filter = True

    def __init__(
        self,
        max_results: int = 25,
        email: Optional[str] = None,
        sort_by: str = "relevance",
        filter_open_access: bool = False,
        min_citations: int = 0,
        from_publication_date: Optional[str] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the OpenAlex search engine.

        Args:
            max_results: Maximum number of search results
            email: Email for polite pool (gets faster response) - optional.
                When omitted, it is looked up in ``settings_snapshot`` under
                ``search.engine.web.openalex.email``. The literal string
                ``"False"`` is treated as "no email".
            sort_by: Sort order ('relevance', 'cited_by_count', 'publication_date')
            filter_open_access: Only return open access papers
            min_citations: Minimum citation count filter
            from_publication_date: Filter papers from this date (YYYY-MM-DD).
                Empty values and the literal string ``"False"`` disable it.
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Settings snapshot for configuration
            **kwargs: Additional parameters to pass to parent class
        """
        # Journal filter runs before LLM relevance (Tiers 1-3 are instant)
        preview_filters = []
        journal_filter = JournalReputationFilter.create_default(
            model=llm,  # type: ignore[arg-type]
            engine_name="openalex",
            settings_snapshot=settings_snapshot,
        )
        if journal_filter is not None:
            preview_filters.append(journal_filter)

        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            preview_filters=preview_filters,  # type: ignore[arg-type]
            settings_snapshot=settings_snapshot,
            **kwargs,
        )

        self.sort_by = sort_by
        self.filter_open_access = filter_open_access
        self.min_citations = min_citations
        # Only set from_publication_date if it's not empty or "False"
        self.from_publication_date = (
            from_publication_date
            if from_publication_date and from_publication_date != "False"
            else None
        )

        # Get email from settings if not provided
        if not email and settings_snapshot:
            from ...config.search_config import get_setting_from_snapshot

            try:
                email = get_setting_from_snapshot(
                    "search.engine.web.openalex.email",
                    settings_snapshot=settings_snapshot,
                )
            except Exception:
                # Best effort: a missing/unreadable setting just means we
                # run without the polite-pool email.
                logger.debug(
                    "Failed to read openalex.email from settings snapshot",
                    exc_info=True,
                )

        # Handle "False" string for email
        self.email = email if email and email != "False" else None

        # API configuration
        self.api_base = "https://api.openalex.org"
        # Build the header and the log line from the *normalized* address so
        # a "False" placeholder never ends up in the User-Agent or the log.
        self.headers = {
            "User-Agent": (
                f"{USER_AGENT} ({self.email})" if self.email else USER_AGENT
            ),
            "Accept": "application/json",
        }

        if self.email:
            # Email allows access to polite pool with faster response times
            logger.info(f"Using OpenAlex polite pool with email: {self.email}")
        else:
            logger.info(
                "Using OpenAlex without email (consider adding email for faster responses)"
            )

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for OpenAlex search results.

        Args:
            query: The search query (natural language supported!)

        Returns:
            List of preview dictionaries. Empty when the API answers with a
            status other than 200/429 or when any unexpected error occurs.

        Raises:
            RateLimitError: When the API answers with HTTP 429; re-raised so
                the base class can handle the retry.
        """
        logger.info(f"Searching OpenAlex for: {query}")

        # Build the search URL with parameters
        params: Dict[str, Any] = {
            "search": query,  # OpenAlex handles natural language beautifully
            "per_page": min(self.max_results, 200),  # OpenAlex allows up to 200
            "page": 1,
            # Request specific fields including abstract for snippets
            "select": "id,display_name,publication_year,publication_date,doi,primary_location,authorships,cited_by_count,open_access,best_oa_location,abstract_inverted_index",
        }

        # Add optional filters
        filters = []

        if self.filter_open_access:
            filters.append("is_oa:true")

        if self.min_citations > 0:
            filters.append(f"cited_by_count:>{self.min_citations}")

        if self.from_publication_date and self.from_publication_date != "False":
            filters.append(
                f"from_publication_date:{self.from_publication_date}"
            )

        if filters:
            params["filter"] = ",".join(filters)

        # Add sorting; unknown sort keys fall back to relevance
        sort_map = {
            "relevance": "relevance_score:desc",
            "cited_by_count": "cited_by_count:desc",
            "publication_date": "publication_date:desc",
        }
        params["sort"] = sort_map.get(self.sort_by, "relevance_score:desc")

        # Add email to params for polite pool
        if self.email and self.email != "False":
            params["mailto"] = self.email

        try:
            # Apply rate limiting before making the request (simple like PubMed)
            self._last_wait_time = self.rate_tracker.apply_rate_limit(
                self.engine_type
            )
            logger.debug(
                f"Applied rate limit wait: {self._last_wait_time:.2f}s"
            )

            # Make the API request
            logger.info(f"Making OpenAlex API request with params: {params}")
            response = safe_get(
                f"{self.api_base}/works",
                params=params,
                headers=self.headers,
                timeout=30,
            )
            logger.info(f"OpenAlex API response status: {response.status_code}")

            # Log rate limit info if available
            if "x-ratelimit-remaining" in response.headers:
                remaining = response.headers.get("x-ratelimit-remaining")
                limit = response.headers.get("x-ratelimit-limit", "unknown")
                logger.debug(
                    f"OpenAlex rate limit: {remaining}/{limit} requests remaining"
                )

            if response.status_code == 200:
                data = response.json()
                # Use `or` rather than dict.get defaults: an explicit JSON
                # null would otherwise raise below (len(None), None with a
                # ',' format spec) and the generic handler would throw the
                # whole page of results away.
                results = data.get("results") or []
                meta = data.get("meta") or {}
                total_count = meta.get("count") or 0

                logger.info(
                    f"OpenAlex returned {len(results)} results (total available: {total_count:,})"
                )

                # Log first result structure for debugging
                if results:
                    first_result = results[0]
                    logger.debug(
                        f"First result keys: {list(first_result.keys())}"
                    )
                    logger.debug(
                        f"First result has abstract: {'abstract_inverted_index' in first_result}"
                    )
                    if "open_access" in first_result:
                        logger.debug(
                            f"Open access structure: {first_result['open_access']}"
                        )

                # Format results as previews
                previews = []
                for i, work in enumerate(results):
                    logger.debug(
                        f"Formatting work {i + 1}/{len(results)}: {(work.get('display_name') or 'Unknown')[:50]}"
                    )
                    preview = self._format_work_preview(work)
                    if preview:
                        previews.append(preview)
                        logger.debug(
                            f"Preview created with snippet: {preview.get('snippet', '')[:100]}..."
                        )
                    else:
                        logger.warning(f"Failed to format work {i + 1}")

                logger.info(
                    f"Successfully formatted {len(previews)} previews from {len(results)} results"
                )
                return previews

            if response.status_code == 429:
                # Rate limited (very rare with OpenAlex)
                logger.warning("OpenAlex rate limit reached")
                raise RateLimitError("OpenAlex rate limit exceeded")  # noqa: TRY301 — re-raised by except RateLimitError for base class retry

            logger.error(
                f"OpenAlex API error: {response.status_code} - {response.text[:200]}"
            )
            return []

        except RateLimitError:
            # Re-raise rate limit errors for base class retry handling
            raise
        except Exception:
            logger.exception("Error searching OpenAlex")
            return []

    def _format_work_preview(
        self, work: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Format an OpenAlex work as a preview dictionary.

        Args:
            work: OpenAlex work object

        Returns:
            Formatted preview dictionary or None if formatting fails
        """
        try:
            # Extract basic information
            # Use `or` instead of dict.get default — OpenAlex routinely
            # returns these keys with explicit None values, which would
            # bypass the default and crash on downstream string ops.
            work_id = work.get("id") or ""
            title = work.get("display_name") or "No title"
            logger.debug(f"Formatting work: {title[:50]}")

            # Build snippet from abstract or first part of title
            abstract = None
            inverted_index = work.get("abstract_inverted_index")
            if inverted_index:
                logger.debug(
                    f"Found abstract_inverted_index with {len(inverted_index)} words"
                )
                # Reconstruct abstract from inverted index
                abstract = self._reconstruct_abstract(inverted_index)
                logger.debug(
                    f"Reconstructed abstract length: {len(abstract) if abstract else 0}"
                )
            else:
                logger.debug("No abstract_inverted_index found")

            snippet = (
                abstract[:SNIPPET_LENGTH_LONG]
                if abstract
                else f"Academic paper: {title}"
            )
            logger.debug(f"Created snippet: {snippet[:100]}...")

            # Get publication info
            publication_year = work.get("publication_year", "unknown")
            publication_date = work.get("publication_date", "unknown")

            # Get venue/journal info
            venue = work.get("primary_location") or {}
            journal_name = "unknown"
            openalex_source_id = None
            source_type = None
            issn = None
            source = venue.get("source") or {}
            if source:
                journal_name = source.get("display_name") or "unknown"
                # Extract source ID for journal quality lookups
                raw_sid = source.get("id") or ""
                if raw_sid:
                    openalex_source_id = raw_sid.split("/")[-1]
                source_type = source.get("type")
                # Forward the linking ISSN so the reputation filter's
                # Tier 2/3 lookups can use it instead of falling back
                # to fuzzy name matching.
                issn = source.get("issn_l") or None

            # Normalize once: the key can be present with an explicit None,
            # which would make slicing/len()/iteration below raise and drop
            # the whole work.
            authorships = work.get("authorships") or []

            # Get authors (limit to the first 5 authorships). Authors without
            # a display name are skipped so ", ".join never sees None.
            authors = []
            for authorship in authorships[:5]:
                author = (authorship or {}).get("author") or {}
                name = author.get("display_name")
                if name:
                    authors.append(name)

            authors_str = ", ".join(authors)
            if len(authorships) > 5:
                authors_str += " et al."

            # Extract author affiliations for the institution-tier scoring.
            # Each entry is a dict with the OpenAlex institution id, ROR id,
            # and display name — the lookup_institution() helper accepts any
            # of those three.
            affiliations: list[dict] = []
            seen_inst_ids: set[str] = set()
            for authorship in authorships:
                for inst in (authorship or {}).get("institutions") or []:
                    raw_id = inst.get("id") or ""
                    short_id = raw_id.split("/")[-1] if raw_id else ""
                    # De-duplicate institutions shared by several authors;
                    # entries without an id cannot be de-duplicated and are
                    # always kept.
                    if short_id and short_id in seen_inst_ids:
                        continue
                    if short_id:
                        seen_inst_ids.add(short_id)
                    affiliations.append(
                        {
                            "openalex_id": short_id or None,
                            "ror": (inst.get("ror") or "")
                            .rstrip("/")
                            .split("/")[-1]
                            or None,
                            "name": inst.get("display_name"),
                        }
                    )

            # Get metrics
            cited_by_count = work.get("cited_by_count", 0)

            # Get URL - prefer DOI, fallback to OpenAlex URL.
            # `.get("doi", work_id)` is wrong: when the key exists with value
            # None (common for non-DOI works) it returns None, not the
            # default. Use `or` so a None DOI falls through to work_id.
            url = work.get("doi") or work_id
            if not url.startswith("http"):
                # A bare DOI ("10.xxxx/...") becomes a resolver URL; anything
                # else that is not already an http(s) URL falls back to the
                # OpenAlex id.
                if url.startswith("10."):
                    url = f"https://doi.org/{url}"
                else:
                    url = work_id  # OpenAlex URL

            # Check if open access
            open_access_info = work.get("open_access") or {}
            is_oa = open_access_info.get("is_oa", False)
            oa_url = None
            if is_oa:
                best_location = work.get("best_oa_location") or {}
                if best_location:
                    oa_url = best_location.get("pdf_url") or best_location.get(
                        "landing_page_url"
                    )

            # Both journal fields emit None (not the "unknown" sentinel) when
            # OpenAlex has no venue for this work. Downstream consumers
            # (citation normalizer, journal reputation filter) treat
            # missing venue as "no scoring signal", which is accurate;
            # the old "unknown" sentinel leaked through the normalizer
            # as a literal container_title and even matched a real
            # OpenAlex source named "unknown" (h_index=5, Q1) in the
            # reference DB.
            journal = journal_name if journal_name != "unknown" else None

            return {
                "id": work_id,
                "title": title,
                "link": url,
                "snippet": snippet,
                "authors": authors_str,
                "year": publication_year,
                "date": publication_date,
                "journal": journal,
                "journal_ref": journal,
                "issn": issn,
                "affiliations": affiliations or None,
                "openalex_source_id": openalex_source_id,
                "source_type": source_type,
                "citations": cited_by_count,
                "is_open_access": is_oa,
                "oa_url": oa_url,
                "abstract": abstract,
                "type": "academic_paper",
            }

        except Exception:
            logger.exception(
                f"Error formatting OpenAlex work: {work.get('id', 'unknown')}"
            )
            return None

    def _reconstruct_abstract(
        self, inverted_index: Dict[str, List[int]]
    ) -> str:
        """
        Reconstruct abstract text from OpenAlex inverted index format.

        Args:
            inverted_index: Dictionary mapping words to their positions

        Returns:
            Reconstructed abstract text (words joined by single spaces in
            ascending position order), or an empty string if the index
            cannot be processed
        """
        try:
            # Create position-word mapping. If two words claim the same
            # position, the one iterated last wins.
            position_word = {
                pos: word
                for word, positions in inverted_index.items()
                for pos in positions
            }

            # Sort by position and reconstruct
            return " ".join(
                position_word[pos] for pos in sorted(position_word)
            )

        except Exception:
            logger.debug("Could not reconstruct abstract from inverted index")
            return ""

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant items (OpenAlex provides most content in preview).

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content. ``content`` is the
            abstract when one is available and the snippet otherwise.
        """
        # OpenAlex returns comprehensive data in the initial search,
        # so we don't need a separate full content fetch
        results = []
        for item in relevant_items:
            snippet = item.get("snippet", "")
            affiliations = item.get("affiliations")
            result = {
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "snippet": snippet,
                # Previews always carry an "abstract" key, frequently set to
                # None, so a dict.get default would never apply — use `or`
                # to actually fall back to the snippet.
                "content": item.get("abstract") or snippet,
                # Forward journal quality fields for content filters
                "journal_ref": item.get("journal_ref"),
                "openalex_source_id": item.get("openalex_source_id"),
                "source_type": item.get("source_type"),
                "affiliations": affiliations,
                "metadata": {
                    "authors": item.get("authors", ""),
                    "year": item.get("year", ""),
                    "journal": item.get("journal", ""),
                    "citations": item.get("citations", 0),
                    "is_open_access": item.get("is_open_access", False),
                    "oa_url": item.get("oa_url"),
                    "affiliations": affiliations,
                },
            }
            results.append(result)

        return results