Coverage for src/local_deep_research/journal_quality/data_sources/openalex.py: 87%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""OpenAlex sources data source. 

2 

3Downloads the **bulk snapshot** of ~280K journals + conferences from 

4``openalex.s3.amazonaws.com`` and writes a compact gzipped JSON file 

5used by the in-memory tier-2 lookups. 

6 

7This is the public OpenAlex S3 snapshot — CC0 licensed, no auth, no 

8rate limits, ~350 MB compressed across ~40 partition files. We stream 

9each part, extract the few fields we need, and discard the rest. Total 

10wall-clock is ~30–60 s on a normal connection. 

11 

12Why bulk snapshot instead of the REST API 

13----------------------------------------- 

14 

15The previous implementation paginated ``/api/sources`` with 

16``per_page=200`` and a 100 ms polite-rate-limiting sleep between 

17requests. For ~280K records that meant ~1,400 HTTP requests + ~140 s 

18of pure sleep, plus actual transfer time — wall-clock 5–10 minutes, 

19and that's the time the user spends staring at the dashboard 

20"Download Data" button. The S3 dump is the recommended bulk path per 

21OpenAlex docs and finishes in well under a minute. 

22""" 

23 

24from __future__ import annotations 

25 

26import gzip 

27import json 

28import time 

29from pathlib import Path 

30 

31from loguru import logger 

32 

33from ...utilities.citation_normalizer import normalize_issn 

34from .base import DataSource 

35 

36from ._openalex_common import ( 

37 OPENALEX_S3_BASE, 

38 iter_partitions, 

39 validate_manifest_entries, 

40) 

41 

# Manifest listing every partition file (plus per-part record counts and
# byte sizes) of the OpenAlex ``sources`` entity in the S3 snapshot.
_OPENALEX_SOURCES_MANIFEST = "{}/data/sources/manifest".format(
    OPENALEX_S3_BASE
)

43 

44 

class SchemaDriftError(RuntimeError):
    """Signal that the OpenAlex snapshot lost or renamed a required field.

    The row-count floor only detects a fetch that collapsed entirely. It
    cannot detect the subtler failure where every row loads but a key
    field (``h_index``, ``cited_by_count``, or the ``id`` itself) comes
    back as None for all of them. When that happens we would rather keep
    the previous snapshot than write an all-None database that silently
    pushes every journal into the "unknown" quality tier, so the fetch
    raises this error instead of overwriting existing data.
    """

    pass

55 

56 

class OpenAlexSource(DataSource):
    """Bulk-snapshot data source for OpenAlex journals and conferences.

    ``fetch`` streams the public OpenAlex S3 ``sources`` snapshot, keeps a
    handful of fields per source, and writes them to ``filename`` inside
    the data directory as gzipped JSON shaped ``{"s": {src_id: compact}}``.
    """

    key = "openalex"  # gitleaks:allow
    name = "OpenAlex"
    url = "https://openalex.org"
    dataset_url = (
        "https://docs.openalex.org/download-all-data/openalex-snapshot"
    )
    license = "CC0 1.0"
    license_url = "https://creativecommons.org/publicdomain/zero/1.0/"
    description = (
        "~280K journals and conferences with h-index, impact factor, "
        "and publisher metadata"
    )
    filename = "openalex_sources.json.gz"
    count_label = "OpenAlex sources"
    auto_download = False  # large; user opts in via dashboard
    required = True  # bulk-download fatal-on-failure
    approx_size_mb = 13.0  # final compact output, NOT the raw snapshot

    def fetch(self, data_dir: Path, progress_cb=None) -> int:
        """Download the OpenAlex sources snapshot and write the compact file.

        Args:
            data_dir: Directory passed to ``iter_partitions`` for the
                per-partition temp files, and where the final
                ``openalex_sources.json.gz`` is written.
            progress_cb: Optional callable invoked after every partition as
                ``progress_cb(parts_done, total_parts, message)``. Any
                exception it raises is logged at debug level and ignored.

        Returns:
            The number of sources written to the output file.

        Raises:
            SchemaDriftError: If many records parsed but none carried an
                ``id``, or if a 100-journal sample has no ``h_index`` or
                no ``cited_by_count`` at all.
            RuntimeError: If fewer than 10,000 sources were extracted.
            Exception: Errors from ``raise_for_status()`` on the manifest
                request, from ``validate_manifest_entries``, or from the
                partition download propagate unchanged.
        """
        from itertools import islice

        from ...security.safe_requests import (
            safe_get_with_retries as safe_get,
        )

        # OpenAlex normally has ~280K sources; anything below this floor
        # means the fetch collapsed (or the id field was renamed).
        _MIN_OPENALEX_SOURCES = 10_000
        _SCHEMA_SAMPLE_SIZE = 100

        # 1. Fetch the manifest. Tells us which partition files exist
        #    and how many records to expect — so we can give the user
        #    an accurate progress log instead of just dots.
        logger.info(
            f"Fetching OpenAlex sources manifest: {_OPENALEX_SOURCES_MANIFEST}"
        )
        # consume_body=True: the manifest is small but a serial
        # bottleneck for the whole download. A body-read transient
        # here aborts everything.
        manifest_resp = safe_get(
            _OPENALEX_SOURCES_MANIFEST, timeout=30, consume_body=True
        )
        manifest_resp.raise_for_status()
        manifest = manifest_resp.json()

        # ``or`` fallbacks treat an explicit JSON null exactly like a
        # missing key, so a sparse manifest cannot crash the summary sums
        # with AttributeError / TypeError.
        entries = manifest.get("entries") or []
        total_records = sum(
            (e.get("meta") or {}).get("record_count") or 0 for e in entries
        )
        total_bytes = sum(
            (e.get("meta") or {}).get("content_length") or 0 for e in entries
        )
        logger.info(
            f"OpenAlex sources snapshot: {len(entries)} parts, "
            f"{total_records:,} records, "
            f"{total_bytes / 1024 / 1024:.0f} MB compressed"
        )

        # Validate manifest URLs before fetching anything, so a
        # compromised manifest cannot redirect fetches to an arbitrary
        # host. Legitimate OpenAlex manifest entries always use the
        # s3://openalex/ prefix.
        validate_manifest_entries(entries, "OpenAlex sources")

        # 2. Stream-process each part. The ``iter_partitions`` helper
        #    in ``_openalex_common`` owns the tmp-file lifecycle + the
        #    first-10 malformed-JSON suppression; we just consume
        #    records and track our own schema-drift counters.
        type_map = {"journal": "j", "conference": "c"}
        sources: dict = {}
        # Raw parse counters feed the ``id``-rename drift check below —
        # records without an ``id`` are silently skipped at the point of
        # extraction, so we need to know the ratio to distinguish a
        # collapsed fetch from a renamed identifier field.
        parsed_records = 0
        parsed_with_id = 0
        # monotonic: immune to wall-clock adjustments during the download.
        start = time.monotonic()

        for idx, total_parts, records in iter_partitions(
            entries,
            data_dir,
            file_prefix="openalex_sources",
            label="OpenAlex sources",
            safe_get=safe_get,
        ):
            for rec in records:
                parsed_records += 1
                # OpenAlex ids are URLs; keep only the trailing segment.
                src_id = (rec.get("id") or "").split("/")[-1]
                if not src_id:
                    continue
                parsed_with_id += 1

                stats = rec.get("summary_stats") or {}
                raw_type = rec.get("type", "")
                compact = {
                    "n": rec.get("display_name", ""),
                    "t": type_map.get(raw_type, raw_type),
                    "h": stats.get("h_index"),
                    "if": stats.get("2yr_mean_citedness"),
                    "cb": rec.get("cited_by_count"),
                    "p": rec.get("host_organization_name") or "",
                    "i": normalize_issn(rec.get("issn_l")),
                }
                # Boolean flags are stored only when set, to keep the
                # output compact.
                if rec.get("is_in_doaj"):
                    compact["d"] = 1
                if rec.get("is_core"):
                    compact["s"] = 1
                sources[src_id] = compact

            if (idx + 1) % 5 == 0 or idx == total_parts - 1:
                elapsed = time.monotonic() - start
                logger.info(
                    f"OpenAlex sources: processed {idx + 1}/{total_parts} "
                    f"parts ({len(sources):,} records, {elapsed:.0f}s)"
                )
            # Report on EVERY partition, not just every 5th — the UI
            # needs smoother updates than the human-readable log.
            if progress_cb is not None:
                try:
                    progress_cb(
                        idx + 1,
                        total_parts,
                        f"{len(sources):,} records",
                    )
                except Exception:
                    # Deliberately best-effort: a broken UI callback must
                    # not abort a multi-minute download.
                    logger.debug(
                        "OpenAlex progress_cb raised; continuing",
                        exc_info=True,
                    )

        # ``id``-rename drift runs *before* the row-count floor: a
        # renamed identifier makes every record drop out at parse time
        # (src_id empty → continue), so ``sources`` is empty and the
        # row-count floor would fire first with a generic RuntimeError
        # that hides the actual cause.
        if parsed_records >= _MIN_OPENALEX_SOURCES and parsed_with_id == 0:
            raise SchemaDriftError(
                f"OpenAlex snapshot parsed {parsed_records:,} records but "
                "none carried an 'id' field — the source identifier may "
                "have been renamed (e.g. to 'source_id'). Refusing to "
                "overwrite existing data — please check "
                "https://docs.openalex.org/download-all-data/"
                "snapshot-data-format for schema changes."
            )

        # 3. Sanity check: if the fetch silently returned a tiny subset
        #    (e.g., every partition returned an empty shard), refuse to
        #    overwrite existing data.
        if len(sources) < _MIN_OPENALEX_SOURCES:
            raise RuntimeError(
                f"OpenAlex sources: suspiciously few records "
                f"({len(sources):,} < {_MIN_OPENALEX_SOURCES:,}); "
                "refusing to overwrite existing data"
            )

        # Field-level schema drift detection. The row-count floor above
        # catches a collapsed fetch, but not a silent upstream rename.
        # The journal-only sample avoids false-triggering on snapshots
        # that skew to non-journal types (conferences, repositories,
        # etc.) which legitimately lack ``h_index``. ``islice`` stops at
        # the first N journals instead of materialising all of them.
        journal_sample = list(
            islice(
                (r for r in sources.values() if r.get("t") == "j"),
                _SCHEMA_SAMPLE_SIZE,
            )
        )
        if len(journal_sample) >= _SCHEMA_SAMPLE_SIZE:
            has_hindex = any(r.get("h") is not None for r in journal_sample)
            has_cited = any(r.get("cb") is not None for r in journal_sample)
            if not has_hindex or not has_cited:
                raise SchemaDriftError(
                    "OpenAlex snapshot appears to have renamed a required "
                    "field: "
                    f"h_index present in journal sample={has_hindex}, "
                    f"cited_by_count present in journal sample={has_cited}. "
                    "Refusing to overwrite existing data — please check "
                    "https://docs.openalex.org/download-all-data/"
                    "snapshot-data-format for schema changes."
                )
        else:
            # The row-count floor above already refuses a collapsed
            # fetch. This branch only fires in unusual cases (truncated
            # test snapshot, snapshot with very few journal-typed
            # sources). Log at info so operators see the drift check
            # was bypassed — debug would be invisible at production
            # log levels.
            logger.info(
                "OpenAlex schema-drift check skipped: "
                f"only {len(journal_sample)} journal source(s) in sample "
                f"(< {_SCHEMA_SAMPLE_SIZE} required)"
            )

        # 4. Write the compact snapshot in the shape the build pipeline
        #    expects (``{"s": {src_id: compact}}``) via a temp file, so a
        #    crash mid-write never leaves a truncated output behind.
        output = data_dir / self.filename
        tmp = data_dir / f"{self.filename}.tmp"
        try:
            with gzip.open(tmp, "wt", encoding="utf-8") as f:
                json.dump({"s": sources}, f)
            # Path.replace, not Path.rename: rename raises FileExistsError
            # on Windows when ``output`` already exists (every re-download),
            # while replace overwrites atomically on POSIX and Windows.
            tmp.replace(output)
        except BaseException:
            # Don't leak a partial ~13 MB temp file on failure/interrupt.
            try:
                tmp.unlink()
            except OSError:
                logger.debug(
                    f"Could not remove temp file {tmp}", exc_info=True
                )
            raise

        elapsed = time.monotonic() - start
        logger.info(
            f"OpenAlex sources: saved {len(sources):,} sources in {elapsed:.0f}s"
        )
        return len(sources)