Coverage for src/local_deep_research/journal_quality/downloader.py: 80%

223 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""Bulk-fetch and lazy-load journal-quality datasets. 

2 

3This module is now a thin orchestration layer over the 

4`data_sources` package. Each academic dataset is a `DataSource` 

5subclass; this module just iterates `ALL_SOURCES` to drive the bulk 

6download flow (the dashboard "Download Data" button) and to compute the 

7status payload returned by the `/metrics/api/journal-data/status` 

8endpoint. 

9 

10Public API (kept stable for existing callers and tests): 

11- `JOURNAL_DATA_VERSION` 

12- `get_journal_data_status()` 

13- `download_journal_data(force=False)` 

14- `ensure_journal_data(auto_download=True)` 

15 

16Test-patch surface (kept stable for tests that mock by string path): 

17- `_get_data_dir` 

18- `_fetch_openalex_sources` (re-export shim) 

19- `_fetch_doaj_journals` (re-export shim) 

20 

21Data source registry lives in `.data_sources` and is the single source 

22of truth for source metadata, fetch logic, and download policy. 

23""" 

24 

25from __future__ import annotations 

26 

27import json 

28import time 

29from pathlib import Path 

30from typing import Optional 

31 

32from loguru import logger 

33 

34from .data_sources import ALL_SOURCES, get_source 

35 

36JOURNAL_DATA_VERSION = "v4" 

37 

38_SENTINEL = ".downloading" 

39 

40# If the sentinel is older than this, assume the previous download 

41# crashed mid-way (thread died before the `finally` cleanup could run) 

42# and reclaim it. The expected wall-clock is ~7 minutes, so 20 minutes 

43# is a generous safety margin that still catches stuck sentinels. 

44_SENTINEL_STALE_SECS = 20 * 60 

45 

46# Shared progress state for the dashboard's status endpoint. 

47# A module-level dict is deliberate: there's only ever one concurrent 

48# download (enforced by the O_EXCL sentinel), and the status endpoint 

49# reads it on a best-effort basis. Structure: 

50# state: "idle" | "running" | "error" | "success" 

51# started_at: epoch seconds, or None 

52# finished_at: epoch seconds, or None 

53# sources: {src_key: {name, state, detail, count}} 

54# db_build: {state, detail} 

55# error_msg: str or None 

56# 

57# Per-source entries track independent parallel downloads. Writes are 

58# atomic at the per-key level in CPython, so workers can update their 

59# own sub-dict without a lock; the main thread composes the overall 

60# `state` / `error_msg` after joining. 

61_download_state: dict = { 

62 "state": "idle", 

63 "started_at": None, 

64 "finished_at": None, 

65 "sources": {}, 

66 "db_build": {"state": "pending", "detail": ""}, 

67 "error_msg": None, 

68 # Per-source final counts from the most recently COMPLETED download. 

69 # Set to a dict only on the success path; explicitly invalidated to 

70 # None on every other return in `download_journal_data` (up-to-date, 

71 # disk-space, sentinel-race, required-failure, DB-build-failure) so 

72 # callers cannot read stale counts from a prior successful run. 

73 # Callers rendering a user-facing summary should prefer this 

74 # structured field over parsing the `(success, message)` tuple's 

75 # string. 

76 "counts": None, 

77} 

78 

79 

def get_download_state() -> dict:
    """Return a snapshot of the live download-progress state.

    The status endpoint JSON-serialises the result while worker threads may
    still be updating ``_download_state``. A shallow copy would share the
    nested dicts with those writers, so ``db_build``, every ``sources`` row
    and ``counts`` (when not None) are copied as well.
    """
    state = _download_state
    latest_counts = state["counts"]
    snapshot = {key: state[key] for key in ("state", "started_at", "finished_at")}
    snapshot["db_build"] = dict(state["db_build"])
    snapshot["error_msg"] = state["error_msg"]
    snapshot["sources"] = {
        src_key: dict(row) for src_key, row in state["sources"].items()
    }
    snapshot["counts"] = None if latest_counts is None else dict(latest_counts)
    return snapshot

97 

98 

def _set_source_state(key: str, **updates) -> None:
    """Merge ``updates`` into the progress row for source ``key`` and log it.

    If ``key`` has no row yet, one is created with pending defaults first.
    Only that source's own worker thread writes ``sources[key]``, and
    CPython makes single dict-item writes atomic, so no lock is taken.
    """
    default_row = {"name": key, "state": "pending", "detail": "", "count": 0}
    row = _download_state["sources"].setdefault(key, default_row)
    row.update(updates)

    label = row.get("name", key)
    status = row.get("state", "?")
    detail = row.get("detail", "")
    logger.info(f"journal-data progress: {label} {status} {detail}".rstrip())

112 

113 

def _get_data_dir() -> Path:
    """Return the user-writable directory holding the journal-quality data.

    The path comes from ``config.paths.get_journal_data_directory`` and is
    resolved on every call. This stays a module-level function so tests can
    replace it with ``mock.patch`` on this module's ``_get_data_dir``.
    """
    from ..config.paths import get_journal_data_directory as _resolve_dir

    return _resolve_dir()

123 

124 

def _clear_orphan_sentinel_on_startup() -> None:
    """Remove a ``.downloading`` sentinel left behind by a dead process.

    ``download_journal_data`` creates the sentinel and deletes it in its
    ``finally`` block. If the process is SIGKILLed (or dies hard enough that
    ``finally`` never runs), the file stays on disk. Every later call would
    then report "Download already in progress" even though nothing is
    downloading.

    This runs once, at import time. A freshly started process cannot own an
    in-progress download, so any sentinel found now is treated as an orphan.
    NOTE(review): that assumes one server process per data directory; a
    second process started while another is mid-download would remove the
    live owner's sentinel. The runtime PID and age checks in
    ``download_journal_data`` (20-minute ``_SENTINEL_STALE_SECS``) cover the
    "process alive but download hung" case.

    All exceptions are logged and swallowed, so a failure here never breaks
    module import. Tests that create the sentinel *after* import (e.g.
    test_concurrent_download_blocked) are unaffected because this runs only
    once.
    """
    try:
        sentinel = _get_data_dir() / _SENTINEL
        try:
            # EAFP instead of exists()+unlink(). If the file vanished between
            # those two calls (e.g. two processes starting together), the old
            # code logged a misleading "could not clear" exception.
            sentinel.unlink()
        except FileNotFoundError:
            return  # no sentinel: nothing to clean up
        logger.warning(
            f"Cleared orphan {_SENTINEL} sentinel from a previous run "
            f"(the old process was killed mid-download; a fresh process "
            f"cannot own an in-progress download)."
        )
    except Exception:
        logger.exception(
            "Could not clear orphan sentinel on startup; "
            "new downloads may be blocked until the 20-minute stale timer."
        )


# Import-time side effect: clear any sentinel orphaned by a previous process.
_clear_orphan_sentinel_on_startup()

162 

163 

# ---------------------------------------------------------------------------
# Test-patch shims: one module-level wrapper per data source.
#
# Tests in `tests/utilities/test_journal_data_downloader.py` patch
# `_fetch_openalex_sources` and `_fetch_doaj_journals` by string path. The
# fetch logic itself lives in each source's `.fetch()` method in the
# `data_sources` package; the wrappers below only delegate to it. The bulk
# download loop calls every source through its wrapper (looked up by name at
# call time via `_FETCH_SHIM_NAMES`) instead of calling `.fetch()` directly,
# so `mock.patch` substitutions take effect.
# ---------------------------------------------------------------------------


def _fetch_openalex_sources(data_dir: Path, progress_cb=None) -> int:
    """Delegate to the "openalex" source's ``fetch``; return its count."""
    source = get_source("openalex")
    return source.fetch(data_dir, progress_cb=progress_cb)


def _fetch_doaj_journals(data_dir: Path, progress_cb=None) -> int:
    """Delegate to the "doaj" source's ``fetch``; return its count."""
    source = get_source("doaj")
    return source.fetch(data_dir, progress_cb=progress_cb)


def _fetch_predatory(data_dir: Path, progress_cb=None) -> int:
    """Delegate to the "predatory" source's ``fetch``; return its count."""
    source = get_source("predatory")
    return source.fetch(data_dir, progress_cb=progress_cb)


def _fetch_jabref_abbreviations(data_dir: Path, progress_cb=None) -> int:
    """Delegate to the "jabref" source's ``fetch``; return its count."""
    source = get_source("jabref")
    return source.fetch(data_dir, progress_cb=progress_cb)


def _fetch_institutions(data_dir: Path, progress_cb=None) -> int:
    """Delegate to the "institutions" source's ``fetch``; return its count."""
    source = get_source("institutions")
    return source.fetch(data_dir, progress_cb=progress_cb)

194 

195 

196# Map source key → module-level shim *name*, resolved at call time so 

197# `mock.patch` substitutions on the module attribute take effect (capturing 

198# function objects in a dict at import time would defeat patching). 

199_FETCH_SHIM_NAMES = { 

200 "openalex": "_fetch_openalex_sources", 

201 "doaj": "_fetch_doaj_journals", 

202 "predatory": "_fetch_predatory", 

203 "jabref": "_fetch_jabref_abbreviations", 

204 "institutions": "_fetch_institutions", 

205} 

206 

207 

208# --------------------------------------------------------------------------- 

209# Status 

210# --------------------------------------------------------------------------- 

211 

212 

def get_journal_data_status() -> dict:
    """Return the status payload for the dashboard data sources banner.

    Shape (kept stable for existing JS consumers and tests)::

        {
            "available": bool,          # OpenAlex source OR compiled DB present
            "version": Optional[str],   # from version.json; None if absent/corrupt
            "latest_version": str,
            "needs_update": bool,
            "files": dict[str, bool],   # legacy: filename -> present
            "sources": list[dict],      # per-source detail for the banner
            "data_dir": str,
            "download_progress": dict,  # snapshot from get_download_state()
        }

    A missing, unreadable or corrupt ``version.json`` is reported as
    ``"version": None`` instead of raising.
    """
    data_dir = _get_data_dir()
    version_file = data_dir / "version.json"

    installed_version: Optional[str] = None
    if version_file.exists():
        try:
            with open(version_file, encoding="utf-8") as f:
                info = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
            # (non-UTF-8 bytes); the latter used to escape and crash the
            # status endpoint.
            info = None
        # Valid JSON that is not an object (e.g. a list) has no .get();
        # treat it like a corrupt file instead of raising AttributeError.
        if isinstance(info, dict):
            installed_version = info.get("version")

    files = {src.filename: src.is_present(data_dir) for src in ALL_SOURCES}
    sources = [src.status_dict(data_dir) for src in ALL_SOURCES]

    # Available if the (required) OpenAlex source exists OR a compiled
    # reference DB (current or legacy filename) exists. The compiled DB is
    # useful even when the source JSON has been deleted, since the
    # dashboard can still query it.
    has_source = files.get("openalex_sources.json.gz", False)
    has_db = (data_dir / "journal_quality.db").exists() or (
        data_dir / "journal_reference.db"
    ).exists()

    return {
        "available": has_source or has_db,
        "version": installed_version,
        "latest_version": JOURNAL_DATA_VERSION,
        # True both when no version is recorded (first run: show the
        # download CTA) and when the recorded version differs from the
        # bundled one. The earlier `and` spelling hid the first-run case.
        "needs_update": (
            installed_version is None
            or installed_version != JOURNAL_DATA_VERSION
        ),
        "files": files,
        "sources": sources,
        "data_dir": str(data_dir),
        # Live progress for the dashboard's status indicator. The client
        # polls this endpoint while a download is in flight; see
        # downloadJournalData() in journal_quality.html.
        "download_progress": get_download_state(),
    }

270 

271 

272# --------------------------------------------------------------------------- 

273# Bulk download (dashboard "Download Data" button) 

274# --------------------------------------------------------------------------- 

275 

276 

def download_journal_data(force: bool = False) -> tuple[bool, str]:
    """Fetch every registered data source into the user data directory.

    All sources in ``ALL_SOURCES`` are fetched in parallel, one worker
    thread per source.

    - A failing ``required=True`` source (OpenAlex) fails the whole run.
    - Failures of ``required=False`` sources are logged and the run
      continues.
    - When no required source failed, ``version.json`` is rewritten and
      ``journal_quality.db`` is rebuilt from the fresh files.

    Progress is mirrored into the module-level ``_download_state`` for the
    dashboard (see ``get_download_state``). ``_download_state["counts"]``
    is set only on full success and reset to ``None`` on every other
    outcome.

    A ``.downloading`` sentinel created with ``O_EXCL`` lets only one
    download run per data directory. Any other caller gets
    ``(False, "Download already in progress")``.

    Args:
        force: Re-fetch even if data exists and is the current version.

    Returns:
        ``(success, message)``. On success the message is
        ``"Fetched <N1> <label1> + <N2> <label2> + ... in <S>s"`` so the
        existing test substring assertions ("100 OpenAlex", "50 DOAJ")
        continue to match.

    Raises:
        Any unexpected exception (e.g. ``OSError`` while writing
        ``version.json``) propagates. Before it does, the progress state is
        set to ``"error"`` so the status endpoint does not keep reporting a
        dead run as ``"running"``.
    """
    data_dir = _get_data_dir()
    sentinel = data_dir / _SENTINEL

    if not force:
        status = get_journal_data_status()
        if status["available"] and not status["needs_update"]:
            # No fresh fetch ran, so invalidate any counts cached from a
            # previous in-process download. Callers keying a "what just
            # happened" summary off `counts` must see None here.
            _download_state["counts"] = None
            return True, "Journal data is already up to date"

    import os
    import shutil

    from ..constants import JOURNAL_QUALITY_MIN_FREE_DISK_BYTES

    # Disk-space pre-check. The five data sources uncompress to ~1 GB of
    # intermediate files, plus the compiled DB. Fail fast with a clear
    # message rather than crashing mid-download with a corrupt tmp file.
    try:
        free_bytes = shutil.disk_usage(str(data_dir)).free
    except OSError:
        logger.warning(
            f"Could not check free disk space for {data_dir}; proceeding."
        )
        free_bytes = None
    if (
        free_bytes is not None
        and free_bytes < JOURNAL_QUALITY_MIN_FREE_DISK_BYTES
    ):
        # No fetch ran: invalidate stale counts from a prior call so
        # `get_download_state()["counts"]` cannot leak them.
        _download_state["counts"] = None
        return False, (
            f"Insufficient disk space: "
            f"{free_bytes / (1024**3):.1f} GB available, "
            f"{JOURNAL_QUALITY_MIN_FREE_DISK_BYTES / (1024**3):.0f} GB required."
        )

    # Single-download guard. The sentinel is created atomically
    # (O_CREAT | O_EXCL via open mode "x") and stamped with the owner's PID,
    # so two concurrent triggers (dashboard click + search workers) cannot
    # both proceed.
    #
    # Stale-sentinel recovery, evaluated when our claim fails:
    #   1. PID-based: if the recorded owner PID is provably not running, the
    #      owner crashed or was killed and the sentinel is an orphan.
    #   2. Age-based fallback: a sentinel older than _SENTINEL_STALE_SECS is
    #      reclaimed even when the PID check is inconclusive (owner is this
    #      process, empty/unreadable file, Windows, PID of another user).
    #
    # _clear_orphan_sentinel_on_startup handles "server restarted
    # mid-download"; these checks cover a server that is still running
    # while the download thread died without removing the sentinel.

    def _sentinel_owner_alive() -> bool:
        """Return False only when the sentinel's owner is provably gone."""
        try:
            raw = sentinel.read_text(encoding="utf-8")
        except FileNotFoundError:
            # The owner finished (or another caller reclaimed it) between
            # our failed claim and this read: nothing left to protect.
            return False
        except OSError:
            return True  # unreadable -> inconclusive; age check decides
        try:
            owner_pid = int(raw.strip())
        except ValueError:
            # Empty or garbled. A claimer creates the file *before* its PID
            # is written, so an empty file usually means another thread is
            # mid-claim. Treating that as an orphan let two downloads run at
            # once; defer to the age-based fallback instead.
            return True
        if owner_pid == os.getpid():
            # Owned by this process: another thread is already downloading.
            return True
        if os.name == "nt":
            # os.kill() is no liveness probe on Windows: 0 equals
            # signal.CTRL_C_EVENT (routed to GenerateConsoleCtrlEvent) and
            # other values terminate the target. Inconclusive -> age check.
            return True
        try:
            os.kill(owner_pid, 0)  # signal 0 = existence probe on POSIX
        except ProcessLookupError:
            return False
        except PermissionError:
            # Process exists, owned by another user — treat as alive.
            return True
        except OSError:
            return False
        return True

    def _try_claim_sentinel() -> bool:
        """Atomically create the sentinel and stamp our PID into it."""
        try:
            with sentinel.open("x", encoding="utf-8") as f:
                f.write(str(os.getpid()))
        except FileExistsError:
            return False
        return True

    if not _try_claim_sentinel():
        try:
            age = time.time() - sentinel.stat().st_mtime
        except OSError:
            age = 0
        orphan = not _sentinel_owner_alive()
        if not (orphan or age > _SENTINEL_STALE_SECS):
            _download_state["counts"] = None
            return False, "Download already in progress"
        reason = (
            "owner process not alive"
            if orphan
            else f"age {age:.0f}s > {_SENTINEL_STALE_SECS}s"
        )
        logger.warning(
            f"Reclaiming stale .downloading sentinel ({reason}); "
            "previous download likely crashed without cleanup."
        )
        sentinel.unlink(missing_ok=True)
        if not _try_claim_sentinel():
            # Lost a race with another caller reclaiming the same stale
            # sentinel — bow out cleanly.
            _download_state["counts"] = None
            return False, "Download already in progress"

    try:
        start = time.time()
        counts: dict[str, int] = {}
        parts: list[str] = []

        # Reset per-source state to a clean "pending" row for each known
        # source. The dashboard renders one row per entry.
        _download_state["state"] = "running"
        _download_state["started_at"] = start
        _download_state["finished_at"] = None
        _download_state["error_msg"] = None
        # Invalidate counts from any prior run: callers inspecting the
        # structured summary must not see stale data while this download
        # is running or after it fails.
        _download_state["counts"] = None
        _download_state["db_build"] = {"state": "pending", "detail": ""}
        _download_state["sources"] = {
            src.key: {
                "name": src.name,
                "state": "pending",
                "detail": "",
                "percent": 0,
                "count": 0,
                "required": src.required,
            }
            for src in ALL_SOURCES
        }

        def _fetch_one(src):
            """Worker: fetch one source, mirroring progress into its row.

            Returns ``(src, count, exc)`` where ``exc`` is None on success.
            Fetch errors are logged and returned, never raised.
            """
            _set_source_state(
                src.key, state="running", detail="downloading", percent=5
            )

            # Per-partition callback. The chunked sources (OpenAlex
            # sources + institutions) call it on every partition so the
            # dashboard's bar moves smoothly. One-shot sources never call
            # it and go from the initial 5% straight to 100%.
            def _on_progress(done, total, detail):
                pct = int(5 + (done / total) * 90) if total > 0 else 5
                _set_source_state(
                    src.key,
                    state="running",
                    detail=detail,
                    percent=max(5, min(95, pct)),
                )

            try:
                shim_name = _FETCH_SHIM_NAMES.get(src.key)
                if shim_name:
                    # bearer:disable python_lang_code_injection
                    # _FETCH_SHIM_NAMES is a hardcoded module-level dict;
                    # the late-bound globals() lookup is needed for
                    # mock.patch compatibility in tests. No user input
                    # reaches the key.
                    n = globals()[shim_name](data_dir, progress_cb=_on_progress)
                else:
                    n = src.fetch(data_dir, progress_cb=_on_progress)
                _set_source_state(
                    src.key,
                    state="success",
                    detail=f"{n} {src.count_label}",
                    percent=100,
                    count=n,
                )
                return (src, n, None)
            except Exception as exc:
                logger.exception(
                    f"{src.name} fetch failed "
                    f"({'required' if src.required else 'optional'})"
                )
                _set_source_state(
                    src.key,
                    state="error",
                    detail=exc.__class__.__name__,
                    percent=100,
                )
                return (src, 0, exc)

        # Parallel fetch. Every source streams from a different host
        # (openalex S3, DOAJ CSV, raw.githubusercontent.com,
        # api.openalex.org for institutions), so there is no single-host
        # contention. Wall-clock is dominated by the slowest source.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(
            max_workers=max(1, len(ALL_SOURCES)),
            thread_name_prefix="journal-dl",
        ) as pool:
            results = list(pool.map(_fetch_one, ALL_SOURCES))

        required_failure = None
        for src, n, exc in results:
            counts[src.key] = n
            parts.append(f"{n} {src.count_label}")
            if exc is not None and src.required:
                required_failure = (src, exc)

        if required_failure is not None:
            failed_src, _ = required_failure
            msg = (
                f"Failed to fetch {failed_src.name}. "
                "Check your network connection."
            )
            _download_state["state"] = "error"
            _download_state["error_msg"] = msg
            _download_state["finished_at"] = time.time()
            return False, msg

        # Write the version marker. Per-source key names are preserved for
        # any external consumer that might read them.
        version_file = data_dir / "version.json"
        with open(version_file, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "version": JOURNAL_DATA_VERSION,
                    # gmtime(): the trailing "Z" declares UTC, but a bare
                    # strftime() would format *local* time.
                    "downloaded_at": time.strftime(
                        "%Y-%m-%dT%H:%M:%SZ", time.gmtime()
                    ),
                    "openalex_count": counts.get("openalex", 0),
                    "doaj_count": counts.get("doaj", 0),
                    "jabref_count": counts.get("jabref", 0),
                    "predatory_count": counts.get("predatory", 0),
                },
                f,
            )

        # Rebuild journal_quality.db synchronously from the freshly
        # downloaded gz files. Also remove any leftover legacy
        # journal_reference.db from before the rename, so existing installs
        # don't carry junk.
        legacy_db = data_dir / "journal_reference.db"
        if legacy_db.exists():
            try:
                # bearer:disable python_lang_file_permissions
                os.chmod(legacy_db, 0o644)
                legacy_db.unlink()
                logger.info(
                    "Removed legacy journal_reference.db "
                    "(replaced by journal_quality.db)"
                )
            except OSError:
                logger.exception("Could not remove legacy DB")

        _download_state["db_build"] = {
            "state": "running",
            "detail": "parsing bundled data",
        }
        db_build_error: Optional[str] = None
        try:
            from .db import DB_FILENAME, build_db

            new_db_file = data_dir / DB_FILENAME
            if new_db_file.exists():
                # bearer:disable python_lang_file_permissions
                os.chmod(new_db_file, 0o644)
                new_db_file.unlink()
            build_db(data_dir=data_dir, output_path=new_db_file)
        except Exception as exc:
            logger.exception(
                "Failed to rebuild journal_quality.db; "
                "the runtime accessor will lazy-build on next access"
            )
            # SchemaDriftError messages are developer-authored literals
            # (no SQL, paths, or stack fragments), so they are safe to
            # surface; operators need to see *which* field drifted. Any
            # other exception is reduced to its class name, per CodeQL
            # "Information exposure through an exception" (alerts 7650,
            # 7684). The full trace stays in logger.exception above.
            from .data_sources.openalex import SchemaDriftError

            if isinstance(exc, SchemaDriftError):
                db_build_error = str(exc)
            else:
                db_build_error = exc.__class__.__name__

        elapsed = time.time() - start
        if db_build_error:
            msg = (
                f"Downloaded data ({' + '.join(parts)}) in {elapsed:.0f}s "
                f"but DB build failed ({db_build_error}). "
                f"Lazy-build will retry on next access."
            )
            _download_state["db_build"] = {
                "state": "error",
                "detail": db_build_error,
            }
            _download_state["state"] = "error"
            _download_state["error_msg"] = msg
            _download_state["finished_at"] = time.time()
            return False, msg

        success_msg = f"Fetched {' + '.join(parts)} in {elapsed:.0f}s"
        _download_state["db_build"] = {
            "state": "success",
            "detail": "ready",
        }
        _download_state["state"] = "success"
        _download_state["error_msg"] = None
        _download_state["finished_at"] = time.time()
        # Publish structured counts for callers that want a user-facing
        # summary without echoing `success_msg`. All values are the ints
        # returned by the source fetches above.
        _download_state["counts"] = dict(counts)
        return True, success_msg

    except Exception as err:
        # Anything not handled above (e.g. OSError writing version.json)
        # would otherwise leave the dashboard showing "running" forever.
        # Only the class name is surfaced, matching the DB-build policy.
        _download_state["state"] = "error"
        _download_state["error_msg"] = (
            f"Journal data download failed ({err.__class__.__name__})"
        )
        _download_state["finished_at"] = time.time()
        _download_state["counts"] = None
        raise

    finally:
        sentinel.unlink(missing_ok=True)

615 

616 

617_ensure_cache: Optional[tuple[float, tuple[Optional[Path], bool]]] = None 

618_ENSURE_CACHE_TTL = 30.0 # seconds 

619 

620 

def ensure_journal_data(
    auto_download: bool = True,
) -> tuple[Optional[Path], bool]:
    """Ensure journal data is available, optionally triggering a bulk fetch.

    Args:
        auto_download: When the OpenAlex source file is missing and there is
            no fresh cached answer, run ``download_journal_data()``
            synchronously. When False, only report availability.

    Returns:
        (data_dir, is_available) — data_dir is None if unavailable.

    Thundering-herd guard: during a search, every search engine's
    reputation-filter worker (~30 threads) can call this at once.

    - The positive check (source file present) is a single ``stat`` and is
      never cached.
    - Any other answer is cached for ``_ENSURE_CACHE_TTL`` seconds, so later
      callers in that window skip the download attempt and its WARNING.
    - Callers that arrive while the very first attempt is still running
      have no cached answer yet. They reach ``download_journal_data()``,
      whose sentinel makes them return "already in progress" immediately.
    """
    global _ensure_cache

    user_dir = _get_data_dir()
    if (user_dir / "openalex_sources.json.gz").exists():
        # Positive path is cheap (one stat call) — no need to cache.
        return user_dir, True

    if _ensure_cache is not None:
        cached_at, cached = _ensure_cache
        if time.time() - cached_at < _ENSURE_CACHE_TTL:
            return cached

    if auto_download:
        logger.info(
            "Journal data not found — fetching from upstream sources..."
        )
        success, message = download_journal_data()
        if success:
            logger.info(message)
            _ensure_cache = (time.time(), (user_dir, True))
            return user_dir, True
        logger.warning(f"Journal data fetch failed: {message}")

    result: tuple[Optional[Path], bool] = (None, False)
    # Stamp the cache when the answer is *stored*, not when this call began.
    # A failed download can run for minutes, and a start-time stamp would
    # already be past the TTL, making the next caller retry immediately.
    _ensure_cache = (time.time(), result)
    return result