Coverage for src/local_deep_research/journal_quality/data_sources/openalex.py: 87%
83 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
"""OpenAlex sources data source.

Downloads the **bulk snapshot** of ~280K journals + conferences from
``openalex.s3.amazonaws.com`` and writes a compact gzipped JSON file
used by the in-memory tier-2 lookups.

This is the public OpenAlex S3 snapshot — CC0 licensed, no auth, no
rate limits, ~350 MB compressed across ~40 partition files. We stream
each part, extract the few fields we need, and discard the rest. Total
wall-clock time is ~30–60 s on a normal connection.

Why bulk snapshot instead of the REST API
-----------------------------------------

The previous implementation paginated ``/api/sources`` with
``per_page=200`` and a 100 ms polite-rate-limiting sleep between
requests. For ~280K records that meant ~1,400 HTTP requests + ~140 s
of pure sleep, plus actual transfer time — 5–10 minutes of wall-clock
time that the user spent staring at the dashboard "Download Data"
button. The S3 dump is the bulk-download path recommended by the
OpenAlex docs and finishes in well under a minute.
"""
24from __future__ import annotations
26import gzip
27import json
28import time
29from pathlib import Path
31from loguru import logger
33from ...utilities.citation_normalizer import normalize_issn
34from .base import DataSource
36from ._openalex_common import (
37 OPENALEX_S3_BASE,
38 iter_partitions,
39 validate_manifest_entries,
40)
# URL of the manifest that lists every partition file of the OpenAlex
# "sources" entity in the public S3 snapshot; it is the first thing
# OpenAlexSource.fetch downloads.
_OPENALEX_SOURCES_MANIFEST = "{}/data/sources/manifest".format(
    OPENALEX_S3_BASE
)
class SchemaDriftError(RuntimeError):
    """Raised when the OpenAlex snapshot no longer matches the expected schema.

    The "too few rows" guard only notices a fetch that collapsed
    entirely. It cannot see a snapshot in which every row arrives intact
    but a key field (for example ``h_index`` or ``cited_by_count``) was
    renamed or dropped upstream and therefore reads as None everywhere.
    The same applies when no parsed record carries an ``id`` at all.

    Raising this error keeps the previously downloaded snapshot in place.
    The alternative is to replace it with an all-None database that would
    silently push every journal into the "unknown" quality tier.

    Subclasses ``RuntimeError``, so callers that already catch
    ``RuntimeError`` for a failed download also catch this.
    """
# Compact type codes for the two OpenAlex source types the lookups care
# about; any other ``type`` value is stored verbatim.
_SOURCE_TYPE_CODES = {"journal": "j", "conference": "c"}

# Sanity floor: OpenAlex normally has ~280K sources. A fetch that yields
# fewer than this (e.g. every partition came back as an empty shard) must
# not overwrite existing data. The ``id``-rename check uses the same
# threshold so that it always gets the chance to fire before the floor.
_MIN_OPENALEX_SOURCES = 10_000

# How many journal-typed records the field-level drift check inspects.
_SCHEMA_SAMPLE_SIZE = 100


def _fetch_manifest_entries(safe_get) -> list:
    """Fetch the OpenAlex sources manifest and return its validated entries.

    Logs the partition count, expected record count and compressed size,
    then hands the entries to ``validate_manifest_entries`` before any
    partition is downloaded.
    """
    logger.info(
        f"Fetching OpenAlex sources manifest: {_OPENALEX_SOURCES_MANIFEST}"
    )
    # consume_body=True: the manifest is small but a serial bottleneck for
    # the whole download. A body-read transient here would abort
    # everything, so the body read is requested through the retrying
    # helper as well.
    manifest_resp = safe_get(
        _OPENALEX_SOURCES_MANIFEST, timeout=30, consume_body=True
    )
    manifest_resp.raise_for_status()
    # ``or []`` also tolerates an explicit ``"entries": null``.
    entries = manifest_resp.json().get("entries") or []

    # ``meta`` or its counters may be missing/null in a malformed manifest;
    # count those as 0 so the summary log line cannot crash the download.
    metas = [e.get("meta") or {} for e in entries]
    total_records = sum(m.get("record_count") or 0 for m in metas)
    total_bytes = sum(m.get("content_length") or 0 for m in metas)
    logger.info(
        f"OpenAlex sources snapshot: {len(entries)} parts, "
        f"{total_records:,} records, "
        f"{total_bytes / 1024 / 1024:.0f} MB compressed"
    )

    # Validate manifest URLs before fetching anything, so a compromised
    # manifest cannot redirect fetches to an arbitrary host. Legitimate
    # OpenAlex manifest entries always use the s3://openalex/ prefix.
    validate_manifest_entries(entries, "OpenAlex sources")
    return entries


def _compact_source(rec: dict) -> dict:
    """Reduce one raw OpenAlex source record to the compact on-disk form.

    Keys written:
        ``n``: display name ("" when missing or null)
        ``t``: type; "journal" -> "j", "conference" -> "c", else verbatim
        ``h``: ``summary_stats.h_index``
        ``if``: ``summary_stats.2yr_mean_citedness``
        ``cb``: ``cited_by_count``
        ``p``: host organization name ("" when missing or null)
        ``i``: ``issn_l`` passed through ``normalize_issn``
        ``d``: 1, present only when ``is_in_doaj`` is truthy
        ``s``: 1, present only when ``is_core`` is truthy
    """
    stats = rec.get("summary_stats") or {}
    raw_type = rec.get("type", "")
    compact = {
        # ``or ""`` (not a .get default) so an explicit null becomes "",
        # matching how the publisher field is handled.
        "n": rec.get("display_name") or "",
        "t": _SOURCE_TYPE_CODES.get(raw_type, raw_type),
        "h": stats.get("h_index"),
        "if": stats.get("2yr_mean_citedness"),
        "cb": rec.get("cited_by_count"),
        "p": rec.get("host_organization_name") or "",
        "i": normalize_issn(rec.get("issn_l")),
    }
    # Boolean flags are stored only when set, to keep the output small.
    if rec.get("is_in_doaj"):
        compact["d"] = 1
    if rec.get("is_core"):
        compact["s"] = 1
    return compact


def _report_progress(progress_cb, done: int, total: int, n_sources: int) -> None:
    """Forward progress to *progress_cb* if given; never let it abort the fetch."""
    if progress_cb is None:
        return
    try:
        progress_cb(done, total, f"{n_sources:,} records")
    except Exception:
        # Best effort by design: a broken UI callback must not kill a
        # multi-minute download, so it is logged and ignored.
        logger.debug(
            "OpenAlex progress_cb raised; continuing",
            exc_info=True,
        )


def _check_snapshot(
    sources: dict, parsed_records: int, parsed_with_id: int
) -> None:
    """Raise if the extracted snapshot looks too broken to replace existing data.

    Raises:
        SchemaDriftError: no parsed record had an ``id`` (identifier
            renamed), or none of the sampled journals has ``h_index`` /
            ``cited_by_count`` (field renamed).
        RuntimeError: fewer than ``_MIN_OPENALEX_SOURCES`` sources survived.
    """
    from itertools import islice

    # ``id``-rename drift runs *before* the row-count floor: a renamed
    # identifier makes every record drop out at parse time, so ``sources``
    # is empty and the floor would fire first with a generic RuntimeError
    # that hides the actual cause.
    if parsed_records >= _MIN_OPENALEX_SOURCES and parsed_with_id == 0:
        raise SchemaDriftError(
            f"OpenAlex snapshot parsed {parsed_records:,} records but "
            "none carried an 'id' field — the source identifier may "
            "have been renamed (e.g. to 'source_id'). Refusing to "
            "overwrite existing data — please check "
            "https://docs.openalex.org/download-all-data/"
            "snapshot-data-format for schema changes."
        )

    if len(sources) < _MIN_OPENALEX_SOURCES:
        raise RuntimeError(
            f"OpenAlex sources: suspiciously few records "
            f"({len(sources):,} < {_MIN_OPENALEX_SOURCES:,}); "
            "refusing to overwrite existing data"
        )

    # Field-level drift: the row-count floor catches a collapsed fetch,
    # but not a silent upstream rename. Sampling journals only avoids
    # false positives from types (conferences, repositories, ...) that
    # legitimately lack ``h_index``. islice stops after the first N
    # journals instead of materialising a list of all of them.
    journal_sample = list(
        islice(
            (r for r in sources.values() if r.get("t") == "j"),
            _SCHEMA_SAMPLE_SIZE,
        )
    )
    if len(journal_sample) < _SCHEMA_SAMPLE_SIZE:
        # Only reachable in unusual cases (truncated test snapshot, very
        # few journal-typed sources). Logged at info so operators see the
        # check was bypassed; debug would be invisible in production.
        logger.info(
            "OpenAlex schema-drift check skipped: "
            f"only {len(journal_sample)} journal source(s) in sample "
            f"(< {_SCHEMA_SAMPLE_SIZE} required)"
        )
        return

    has_hindex = any(r.get("h") is not None for r in journal_sample)
    has_cited = any(r.get("cb") is not None for r in journal_sample)
    if not has_hindex or not has_cited:
        raise SchemaDriftError(
            "OpenAlex snapshot appears to have renamed a required "
            "field: "
            f"h_index present in journal sample={has_hindex}, "
            f"cited_by_count present in journal sample={has_cited}. "
            "Refusing to overwrite existing data — please check "
            "https://docs.openalex.org/download-all-data/"
            "snapshot-data-format for schema changes."
        )


def _write_snapshot_file(output: Path, payload: dict) -> None:
    """Write *payload* as gzipped UTF-8 JSON to *output* via a ``.tmp`` sibling.

    The data is first written to ``<output name>.tmp`` in the same
    directory and only then moved over *output*, so a crash mid-write
    never leaves a truncated snapshot in place. The temp file is removed
    if anything fails.
    """
    import contextlib

    tmp = output.with_name(f"{output.name}.tmp")
    try:
        with gzip.open(tmp, "wt", encoding="utf-8") as fh:
            json.dump(payload, fh)
        # Path.replace, not Path.rename: rename raises FileExistsError on
        # Windows when *output* already exists, which would break every
        # re-download. replace overwrites the target on all platforms.
        tmp.replace(output)
    except BaseException:
        # Don't leave a partial .tmp behind; a cleanup failure must not
        # mask the original error.
        with contextlib.suppress(OSError):
            tmp.unlink(missing_ok=True)
        raise


class OpenAlexSource(DataSource):
    """Data source for the OpenAlex sources (journals + conferences) snapshot.

    ``fetch`` streams every partition listed in the S3 sources manifest,
    keeps a few fields per source and validates the result. It then
    writes ``{"s": {source_id: compact_record}}`` as gzipped JSON to
    ``data_dir / filename``.
    """

    key = "openalex"  # gitleaks:allow
    name = "OpenAlex"
    url = "https://openalex.org"
    dataset_url = (
        "https://docs.openalex.org/download-all-data/openalex-snapshot"
    )
    license = "CC0 1.0"
    license_url = "https://creativecommons.org/publicdomain/zero/1.0/"
    description = (
        "~280K journals and conferences with h-index, impact factor, "
        "and publisher metadata"
    )
    filename = "openalex_sources.json.gz"
    count_label = "OpenAlex sources"
    auto_download = False  # large; user opts in via dashboard
    required = True  # bulk-download fatal-on-failure
    approx_size_mb = 13.0  # final compact output, NOT the raw snapshot

    def fetch(self, data_dir: Path, progress_cb=None) -> int:
        """Download, validate and persist the OpenAlex sources snapshot.

        Args:
            data_dir: Directory handed to ``iter_partitions`` for its
                per-partition temp files; also receives ``self.filename``.
            progress_cb: Optional callable invoked after every partition
                as ``progress_cb(done_parts, total_parts, message)``.
                Exceptions it raises are logged and ignored.

        Returns:
            Number of sources written to the output file.

        Raises:
            SchemaDriftError: the snapshot appears to have renamed a
                required field (see ``_check_snapshot``); existing data
                is left untouched.
            RuntimeError: too few sources were extracted; existing data
                is left untouched.
            Whatever ``raise_for_status`` raises if the manifest request
            returns an HTTP error status.
        """
        from ...security.safe_requests import (
            safe_get_with_retries as safe_get,
        )

        # 1. Manifest: which partition files exist and how many records to
        #    expect, so the log shows real progress instead of just dots.
        entries = _fetch_manifest_entries(safe_get)

        # 2. Stream-process each partition. ``iter_partitions`` owns the
        #    temp-file lifecycle and the first-10 malformed-JSON
        #    suppression; this loop only extracts records and keeps the
        #    raw counters used by the ``id``-rename drift check.
        sources: dict[str, dict] = {}
        parsed_records = 0
        parsed_with_id = 0
        start = time.monotonic()

        for idx, total_parts, records in iter_partitions(
            entries,
            data_dir,
            file_prefix="openalex_sources",
            label="OpenAlex sources",
            safe_get=safe_get,
        ):
            for rec in records:
                parsed_records += 1
                src_id = (rec.get("id") or "").split("/")[-1]
                if not src_id:
                    # Cannot be keyed; still counted above so a wholesale
                    # identifier rename is distinguishable from an empty
                    # fetch.
                    continue
                parsed_with_id += 1
                sources[src_id] = _compact_source(rec)

            done = idx + 1
            # Human-readable log every 5th partition and on the last one.
            if done % 5 == 0 or done == total_parts:
                elapsed = time.monotonic() - start
                logger.info(
                    f"OpenAlex sources: processed {done}/{total_parts} "
                    f"parts ({len(sources):,} records, {elapsed:.0f}s)"
                )
            # The UI is updated on EVERY partition; it needs smoother
            # updates than the log.
            _report_progress(progress_cb, done, total_parts, len(sources))

        # 3. Refuse to overwrite existing data if the result looks broken.
        _check_snapshot(sources, parsed_records, parsed_with_id)

        # 4. Persist in the shape the build pipeline expects:
        #    {"s": {src_id: compact}}.
        _write_snapshot_file(data_dir / self.filename, {"s": sources})

        elapsed = time.monotonic() - start
        logger.info(
            f"OpenAlex sources: saved {len(sources):,} sources in {elapsed:.0f}s"
        )
        return len(sources)