Coverage for src/local_deep_research/journal_quality/data_sources/institutions.py: 82%

76 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

"""OpenAlex Institutions data source.

Downloads the **bulk snapshot** of ~120K institutions from the public
OpenAlex S3 gateway (``openalex.s3.amazonaws.com``) and writes a
compact gzipped JSON file used by the institution-scoring tier of the
journal filter. Each institution carries its ROR ID, country, type,
h-index, works count, cited-by count, and 2-year mean citedness.

Why bulk snapshot instead of the REST API
-----------------------------------------

The previous implementation cursor-paginated ``/api/institutions`` at
200 records per page with 100 ms polite sleeps: ~550 requests plus
~55 s of sleeping on top of the actual transfer. Wall-clock time was
5–10 minutes per "Download Data" click, which dominated user-facing
latency. The S3 dump is the documented bulk path (CC0, no auth, no
rate limits) and finishes much faster.
"""

19 

20from __future__ import annotations 

21 

22import gzip 

23import json 

24import time 

25from pathlib import Path 

26 

27from loguru import logger 

28 

29from ._openalex_common import ( 

30 OPENALEX_S3_BASE, 

31 iter_partitions, 

32 validate_manifest_entries, 

33) 

34from ..scoring import normalize_name 

35from .base import DataSource 

36 

# S3 URL of the institutions snapshot manifest. ``fetch`` reads its
# ``entries`` list to discover which part files to download.
_OPENALEX_INSTITUTIONS_MANIFEST = "{}/data/institutions/manifest".format(
    OPENALEX_S3_BASE
)

# Plausibility floor for a completed fetch. OpenAlex lists roughly 120K
# institutions, so a run producing fewer than this is treated as broken
# (schema change, empty partitions, bad manifest), and the existing
# data file is kept instead of being overwritten.
_MIN_INSTITUTIONS = 50_000

45 

46 

class InstitutionSource(DataSource):
    """OpenAlex institutions data source backed by the S3 bulk snapshot.

    :meth:`fetch` streams every partition listed in the institutions
    manifest and writes :attr:`filename` into the data directory as a
    gzipped JSON object with three maps:

    * ``"i"``: institution ID → compact record (``n`` display name,
      ``c`` country code, ``t`` type, ``h`` h-index, ``if`` 2-year mean
      citedness, ``w`` works count, ``cb`` cited-by count, ``r`` ROR ID
      or ``None``)
    * ``"r"``: ROR ID → institution ID
    * ``"nm"``: name normalized via ``normalize_name`` (primary or
      alternative) → institution ID
    """

    key = "institutions"  # gitleaks:allow
    name = "OpenAlex Institutions"
    url = "https://openalex.org"
    dataset_url = (
        "https://docs.openalex.org/download-all-data/openalex-snapshot"
    )
    license = "CC0 1.0"
    license_url = "https://creativecommons.org/publicdomain/zero/1.0/"
    description = (
        "~120K research institutions with ROR ID, country, h-index, "
        "works count, and citation counts"
    )
    filename = "openalex_institutions.json.gz"
    count_label = "institutions"
    auto_download = False  # large; user opts in via dashboard
    required = False
    approx_size_mb = 10.0  # final compact output, NOT the raw snapshot

    def fetch(self, data_dir: Path, progress_cb=None) -> int:
        """Download the institutions snapshot and write the compact file.

        Args:
            data_dir: Directory that receives :attr:`filename`; it is also
                handed to ``iter_partitions`` for its per-part files.
            progress_cb: Optional callable invoked after every partition
                as ``progress_cb(parts_done, total_parts, message)``.
                Exceptions it raises are logged at debug level (with
                traceback) and otherwise ignored.

        Returns:
            The number of institutions written.

        Raises:
            RuntimeError: Fewer than ``_MIN_INSTITUTIONS`` records were
                parsed. The existing output file is left untouched.
            Errors from the manifest request (``raise_for_status``, JSON
            decoding), manifest validation, partition download, or the
            final write propagate unchanged.
        """
        from ...security.safe_requests import (
            safe_get_with_retries as safe_get,
        )

        # 1. Manifest: fetched and every part URL validated up front.
        entries = self._fetch_manifest_entries(safe_get)

        # 2. Stream-process each part. The compact format matches the
        #    journal sources snapshot; the ROR and name indexes keep the
        #    runtime lookup path O(1).
        institutions: dict = {}
        ror_index: dict = {}
        name_index: dict = {}
        start = time.time()

        # ``iter_partitions`` owns the per-part tmp-file lifecycle and the
        # first-10 malformed-JSON suppression; this loop only builds the
        # output structures.
        for idx, total_parts, records in iter_partitions(
            entries,
            data_dir,
            file_prefix="openalex_institutions",
            label="Institutions",
            safe_get=safe_get,
        ):
            for inst in records:
                # Keep only the last path segment (the bare ID when
                # ``id`` is a URL).
                inst_id = (inst.get("id") or "").split("/")[-1]
                if not inst_id:
                    continue

                stats = inst.get("summary_stats") or {}
                ror = (inst.get("ror") or "").rstrip("/").split("/")[-1]
                display_name = inst.get("display_name", "")
                institutions[inst_id] = {
                    "n": display_name,
                    "c": inst.get("country_code", ""),
                    "t": inst.get("type", ""),
                    "h": stats.get("h_index"),
                    "if": stats.get("2yr_mean_citedness"),
                    "w": inst.get("works_count"),
                    "cb": inst.get("cited_by_count"),
                    "r": ror or None,
                }

                if ror:
                    ror_index[ror] = inst_id

                # Primary names always claim their key (a later
                # institution with the same primary name wins); alternative
                # names only fill keys that are still free.
                primary = normalize_name(display_name or "")
                if primary:
                    name_index[primary] = inst_id
                for alt in inst.get("display_name_alternatives") or []:
                    alt_key = normalize_name(alt or "")
                    if alt_key and alt_key not in name_index:
                        name_index[alt_key] = inst_id

            if (idx + 1) % 5 == 0 or idx == total_parts - 1:
                elapsed = time.time() - start
                logger.info(
                    f"OpenAlex institutions: processed "
                    f"{idx + 1}/{total_parts} parts "
                    f"({len(institutions):,} records, {elapsed:.0f}s)"
                )
            # Per-partition UI ping so the dashboard bar moves frequently
            # enough to feel live (vs. the every-5th-partition log above).
            if progress_cb is not None:
                try:
                    progress_cb(
                        idx + 1,
                        total_parts,
                        f"{len(institutions):,} records",
                    )
                except Exception:
                    # Best effort: a broken UI callback must not abort the
                    # download. Loguru ignores ``exc_info=``; ``opt`` is
                    # what attaches the traceback.
                    logger.opt(exception=True).debug(
                        "institutions progress_cb raised; continuing"
                    )

        # 3. Refuse to replace good data with a suspiciously small result.
        if len(institutions) < _MIN_INSTITUTIONS:
            raise RuntimeError(
                f"OpenAlex institutions: suspiciously few records "
                f"({len(institutions):,} < {_MIN_INSTITUTIONS:,}); "
                "refusing to overwrite existing data"
            )

        payload = {
            "i": institutions,  # institution ID → compact record
            "r": ror_index,  # ROR ID → institution ID
            "nm": name_index,  # normalized name → institution ID
        }
        self._write_atomic(data_dir, payload)

        elapsed = time.time() - start
        logger.info(
            f"OpenAlex institutions: saved {len(institutions):,} "
            f"institutions in {elapsed:.0f}s"
        )
        return len(institutions)

    @staticmethod
    def _fetch_manifest_entries(safe_get) -> list:
        """Fetch, validate, and summarize the institutions manifest.

        Returns the manifest's ``entries`` list (empty when the key is
        missing or null). Every part URL is validated before any part is
        fetched; a single bad entry rejects the whole manifest.
        """
        logger.info(
            f"Fetching OpenAlex institutions manifest: "
            f"{_OPENALEX_INSTITUTIONS_MANIFEST}"
        )
        # consume_body=True: small JSON but a serial bottleneck. A body
        # transient here would abort the whole institutions pull.
        manifest_resp = safe_get(
            _OPENALEX_INSTITUTIONS_MANIFEST, timeout=30, consume_body=True
        )
        manifest_resp.raise_for_status()
        entries = manifest_resp.json().get("entries") or []

        # SSRF defense in depth: if any entry points outside the OpenAlex
        # bucket, refuse the whole fetch rather than partially trust it.
        validate_manifest_entries(entries, "Institutions")

        # The per-entry ``meta`` counters feed only this log line, so a
        # missing/null value counts as 0 instead of crashing the fetch.
        total_records = sum(
            (e.get("meta") or {}).get("record_count") or 0 for e in entries
        )
        total_bytes = sum(
            (e.get("meta") or {}).get("content_length") or 0 for e in entries
        )
        logger.info(
            f"OpenAlex institutions snapshot: {len(entries)} parts, "
            f"{total_records:,} records, "
            f"{total_bytes / 1024 / 1024:.0f} MB compressed"
        )
        return entries

    def _write_atomic(self, data_dir: Path, payload: dict) -> None:
        """Write *payload* as gzipped JSON to ``data_dir / self.filename``.

        The data goes to a sibling ``.tmp`` file first and is then moved
        into place, so a failed write never clobbers the existing output.
        The tmp file is removed if anything goes wrong.
        """
        output = data_dir / self.filename
        tmp = data_dir / f"{self.filename}.tmp"
        try:
            with gzip.open(tmp, "wt", encoding="utf-8") as f:
                json.dump(payload, f)
            # Path.replace, unlike Path.rename, also overwrites an existing
            # target on Windows. With rename, every re-download there
            # would fail with FileExistsError.
            tmp.replace(output)
        except BaseException:
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                logger.opt(exception=True).debug(
                    "could not remove institutions tmp file"
                )
            raise