Coverage for src/local_deep_research/journal_quality/data_sources/institutions.py: 82%
76 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""OpenAlex Institutions data source.
3Downloads the **bulk snapshot** of ~120K institutions from the public
4OpenAlex S3 gateway (``openalex.s3.amazonaws.com``) and writes a
5compact gzipped JSON snapshot used by the institution-scoring tier of
6the journal filter. Each institution carries its ROR ID, country,
7h-index, works count, and 2-year mean citedness.
9Why bulk snapshot instead of the REST API
10-----------------------------------------
12The previous implementation cursor-paginated ``/api/institutions`` at
13200/page with 100 ms polite sleeps — ~550 requests + ~55 s of sleep +
14actual transfer. Wall-clock was 5–10 minutes per "Download Data"
15click, which dominated the user-facing latency. The S3 dump is the
16documented bulk path, CC0, no auth, no rate limits, and finishes
17much faster.
18"""
20from __future__ import annotations
22import gzip
23import json
24import time
25from pathlib import Path
27from loguru import logger
29from ._openalex_common import (
30 OPENALEX_S3_BASE,
31 iter_partitions,
32 validate_manifest_entries,
33)
34from ..scoring import normalize_name
35from .base import DataSource
# URL of the manifest for the OpenAlex institutions bulk snapshot. The
# manifest's ``entries`` list is what ``InstitutionSource.fetch`` validates
# and then hands to ``iter_partitions``.
_OPENALEX_INSTITUTIONS_MANIFEST = (
    f"{OPENALEX_S3_BASE}/data/institutions/manifest"
)

# Safety floor — OpenAlex has ~120K institutions. Refuse to overwrite
# good data if the fetch produced far fewer records (likely schema
# change, empty partitions, or broken manifest).
_MIN_INSTITUTIONS = 50_000
class InstitutionSource(DataSource):
    """OpenAlex institutions data source backed by the bulk snapshot.

    ``fetch`` reads the snapshot manifest, streams every partition, and
    writes a single gzipped JSON file containing compact institution
    records plus ROR-ID and name lookup indexes.
    """

    key = "institutions"  # gitleaks:allow
    name = "OpenAlex Institutions"
    url = "https://openalex.org"
    dataset_url = (
        "https://docs.openalex.org/download-all-data/openalex-snapshot"
    )
    license = "CC0 1.0"
    license_url = "https://creativecommons.org/publicdomain/zero/1.0/"
    description = (
        "~120K research institutions with ROR ID, country, h-index, "
        "works count, and citation counts"
    )
    filename = "openalex_institutions.json.gz"
    count_label = "institutions"
    auto_download = False  # large; user opts in via dashboard
    required = False
    approx_size_mb = 10.0  # final compact output, NOT the raw snapshot

    def fetch(self, data_dir: Path, progress_cb=None) -> int:
        """Download the institutions snapshot and write the compact file.

        Args:
            data_dir: Directory that receives ``self.filename`` (and a
                sibling ``.tmp`` file while writing). It is also passed to
                ``iter_partitions`` for the per-partition downloads.
            progress_cb: Optional callable invoked once per processed
                partition as ``progress_cb(parts_done, total_parts,
                message)``. Any exception it raises is logged at debug
                level and otherwise ignored.

        Returns:
            The number of institutions written to the output file.

        Raises:
            RuntimeError: If fewer than ``_MIN_INSTITUTIONS`` records were
                collected; the existing output file is left untouched.
            Exception: Whatever the manifest request, manifest validation,
                partition iteration, or the final file write raise.
        """
        from ...security.safe_requests import (
            safe_get_with_retries as safe_get,
        )

        # 1. Fetch the manifest.
        logger.info(
            f"Fetching OpenAlex institutions manifest: "
            f"{_OPENALEX_INSTITUTIONS_MANIFEST}"
        )
        # consume_body=True: small JSON but serial bottleneck — a body
        # transient here aborts the whole institutions pull.
        manifest_resp = safe_get(
            _OPENALEX_INSTITUTIONS_MANIFEST, timeout=30, consume_body=True
        )
        manifest_resp.raise_for_status()
        manifest = manifest_resp.json()

        entries = manifest.get("entries", [])

        # Validate every part URL before fetching any part — SSRF
        # defense in depth. If any entry points outside the OpenAlex
        # bucket we refuse the whole fetch rather than partially trust.
        validate_manifest_entries(entries, "Institutions")

        # These totals are informational only, so tolerate a null/missing
        # ``meta`` object or null counts instead of aborting the fetch.
        metas = [(e.get("meta") or {}) for e in entries]
        total_records = sum(m.get("record_count") or 0 for m in metas)
        total_bytes = sum(m.get("content_length") or 0 for m in metas)
        logger.info(
            f"OpenAlex institutions snapshot: {len(entries)} parts, "
            f"{total_records:,} records, "
            f"{total_bytes / 1024 / 1024:.0f} MB compressed"
        )

        # 2. Stream-process each part. The ``iter_partitions`` helper
        #    in ``_openalex_common`` owns the tmp-file lifecycle and
        #    first-10 malformed-JSON suppression; we focus on the
        #    compact-format + secondary-index extraction. Compact
        #    format matches the journal sources snapshot; ROR and
        #    name indexes keep the runtime lookup path O(1).
        institutions: dict[str, dict] = {}
        ror_index: dict[str, str] = {}
        name_index: dict[str, str] = {}
        start = time.time()

        for idx, total_parts, records in iter_partitions(
            entries,
            data_dir,
            file_prefix="openalex_institutions",
            label="Institutions",
            safe_get=safe_get,
        ):
            for inst in records:
                # IDs and RORs arrive as URLs; keep only the last segment.
                inst_id = (inst.get("id") or "").split("/")[-1]
                if not inst_id:
                    continue

                stats = inst.get("summary_stats") or {}
                ror = (inst.get("ror") or "").rstrip("/").split("/")[-1]
                compact = {
                    "n": inst.get("display_name", ""),
                    "c": inst.get("country_code", ""),
                    "t": inst.get("type", ""),
                    "h": stats.get("h_index"),
                    "if": stats.get("2yr_mean_citedness"),
                    "w": inst.get("works_count"),
                    "cb": inst.get("cited_by_count"),
                    "r": ror or None,
                }
                institutions[inst_id] = compact

                if ror:
                    ror_index[ror] = inst_id

                # The primary name always claims its key (last writer
                # wins); alternative names only fill keys still unused.
                primary = normalize_name(inst.get("display_name", "") or "")
                if primary:
                    name_index[primary] = inst_id
                for alt in inst.get("display_name_alternatives") or []:
                    alt_lower = normalize_name(alt or "")
                    if alt_lower and alt_lower not in name_index:
                        name_index[alt_lower] = inst_id

            # Log every 5th partition and the final one.
            if (idx + 1) % 5 == 0 or idx == total_parts - 1:
                elapsed = time.time() - start
                logger.info(
                    f"OpenAlex institutions: processed "
                    f"{idx + 1}/{total_parts} parts "
                    f"({len(institutions):,} records, {elapsed:.0f}s)"
                )
            # Per-partition UI ping so the dashboard bar moves
            # frequently enough to feel live (vs every 5th partition).
            if progress_cb is not None:
                try:
                    progress_cb(
                        idx + 1,
                        total_parts,
                        f"{len(institutions):,} records",
                    )
                except Exception:
                    # A broken UI callback must not abort the download.
                    logger.debug(
                        "institutions progress_cb raised; continuing",
                        exc_info=True,
                    )

        if len(institutions) < _MIN_INSTITUTIONS:
            raise RuntimeError(
                f"OpenAlex institutions: suspiciously few records "
                f"({len(institutions):,} < {_MIN_INSTITUTIONS:,}); "
                "refusing to overwrite existing data"
            )

        payload = {
            "i": institutions,  # institution ID → compact record
            "r": ror_index,  # ROR ID → institution ID
            "nm": name_index,  # normalize_name(name) → institution ID
        }

        # 3. Write to a sibling tmp file, then swap it into place so a
        #    reader never sees a half-written snapshot.
        output = data_dir / self.filename
        tmp = data_dir / f"{self.filename}.tmp"
        try:
            with gzip.open(tmp, "wt", encoding="utf-8") as f:
                json.dump(payload, f)
            # ``replace`` rather than ``rename``: on Windows ``rename``
            # raises FileExistsError when the snapshot already exists,
            # which is the normal case on every re-download.
            tmp.replace(output)
        except BaseException:
            # Best-effort removal of the partial tmp file; the original
            # error is what the caller needs to see.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise

        elapsed = time.time() - start
        logger.info(
            f"OpenAlex institutions: saved {len(institutions):,} "
            f"institutions in {elapsed:.0f}s"
        )
        return len(institutions)