Coverage for src/local_deep_research/journal_quality/downloader.py: 80%
223 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""Bulk-fetch and lazy-load journal-quality datasets.
3This module is now a thin orchestration layer over the
4`data_sources` package. Each academic dataset is a `DataSource`
5subclass; this module just iterates `ALL_SOURCES` to drive the bulk
6download flow (the dashboard "Download Data" button) and to compute the
7status payload returned by the `/metrics/api/journal-data/status`
8endpoint.
10Public API (kept stable for existing callers and tests):
11- `JOURNAL_DATA_VERSION`
12- `get_journal_data_status()`
13- `download_journal_data(force=False)`
14- `ensure_journal_data(auto_download=True)`
16Test-patch surface (kept stable for tests that mock by string path):
17- `_get_data_dir`
18- `_fetch_openalex_sources` (re-export shim)
19- `_fetch_doaj_journals` (re-export shim)
21Data source registry lives in `.data_sources` and is the single source
22of truth for source metadata, fetch logic, and download policy.
23"""
25from __future__ import annotations
27import json
28import time
29from pathlib import Path
30from typing import Optional
32from loguru import logger
34from .data_sources import ALL_SOURCES, get_source
# Version tag of the journal dataset bundle this module expects. It is
# written into version.json after a download and compared against the
# installed value to decide whether an update is needed.
JOURNAL_DATA_VERSION = "v4"

# File name of the "download in progress" marker kept in the data
# directory while a bulk download is running.
_SENTINEL = ".downloading"

# Age (seconds) past which an existing sentinel is treated as abandoned:
# the previous download is assumed to have died before its `finally`
# cleanup ran, and the sentinel may be reclaimed. A full download is
# expected to take roughly 7 minutes, so 20 minutes leaves a generous
# margin while still catching stuck sentinels.
_SENTINEL_STALE_SECS = 60 * 20
# Progress state shared with the dashboard's status endpoint.
#
# This is intentionally a plain module-level dict: only one download runs
# at a time (the O_EXCL sentinel enforces that) and the status endpoint
# reads it on a best-effort basis. Keys:
#
#   state        "idle" | "running" | "error" | "success"
#   started_at   epoch seconds, or None
#   finished_at  epoch seconds, or None
#   sources      {src_key: {name, state, detail, percent, count, required}}
#                (rows created lazily by `_set_source_state` start with
#                name / state / detail / count only)
#   db_build     {state, detail}
#   error_msg    str or None
#   counts       {src_key: int} from the last COMPLETED download, or None
#
# The per-source rows track independent parallel fetches. Each worker
# only writes its own row, so no lock is taken (this relies on per-key
# dict writes being atomic in CPython); the main thread fills in the
# overall `state` / `error_msg` once the workers have finished.
_download_state: dict = {
    "state": "idle",
    "started_at": None,
    "finished_at": None,
    "sources": {},
    "db_build": {"state": "pending", "detail": ""},
    "error_msg": None,
    # Final per-source counts of the most recent successful download.
    # Only the success path stores a dict here; every other exit from
    # `download_journal_data` (up-to-date, disk-space, sentinel-race,
    # required-failure, DB-build-failure) resets it to None so callers
    # can never read counts left over from an earlier run. Prefer this
    # structured field over parsing the `(success, message)` string when
    # rendering a user-facing summary.
    "counts": None,
}
def get_download_state() -> dict:
    """Return a detached snapshot of the download progress state.

    The caller (the status endpoint) serializes the result to JSON, so
    handing out the live nested dicts would let it observe updates made
    while it is serializing. The top-level dict, the ``db_build`` dict,
    every per-source row and the ``counts`` dict are therefore each
    copied; ``counts`` stays ``None`` when no counts are published.
    """
    live = _download_state
    last_counts = live["counts"]

    snapshot: dict = {
        "state": live["state"],
        "started_at": live["started_at"],
        "finished_at": live["finished_at"],
        "db_build": dict(live["db_build"]),
        "error_msg": live["error_msg"],
    }

    # Copy each source row so later worker updates don't leak into the
    # snapshot.
    rows = {}
    for src_key, row in live["sources"].items():
        rows[src_key] = dict(row)
    snapshot["sources"] = rows

    snapshot["counts"] = None if last_counts is None else dict(last_counts)
    return snapshot
def _set_source_state(key: str, **updates) -> None:
    """Merge ``updates`` into the progress row of source ``key`` and log it.

    A row with default values (name = key, state "pending", empty detail,
    count 0) is created first if the source has no row yet. After the
    merge, one INFO line with the row's name, state and detail is logged.

    Intended to be callable from the fetch worker threads without a lock:
    per this module's design notes, each row is only ever written by the
    worker that owns that source key.
    """
    placeholder = {"name": key, "state": "pending", "detail": "", "count": 0}
    row = _download_state["sources"].setdefault(key, placeholder)
    row.update(updates)

    label = row.get("name", key)
    state = row.get("state", "?")
    detail = row.get("detail", "")
    # rstrip() drops the trailing space left behind by an empty detail.
    logger.info(f"journal-data progress: {label} {state} {detail}".rstrip())
def _get_data_dir() -> Path:
    """Return the user-writable directory that holds the journal data.

    Deliberately a module-level function (rather than an inlined call)
    so tests can replace it with ``mock.patch`` using this module's
    dotted path.
    """
    # Imported at call time, as in the rest of this module's lazy imports.
    from ..config.paths import get_journal_data_directory

    data_dir = get_journal_data_directory()
    return data_dir
def _clear_orphan_sentinel_on_startup() -> None:
    """Delete a leftover ``.downloading`` sentinel when the module loads.

    ``download_journal_data`` creates the sentinel and removes it in its
    ``finally`` block. If the process is SIGKILLed (or dies in a way that
    skips ``finally``), the file stays on disk and every later call
    reports "Download already in progress" although nothing is running.

    This hook runs once, at import time, on the premise that a freshly
    started process cannot itself own an in-progress download, so any
    sentinel it finds is treated as an orphan. The age-based
    ``_SENTINEL_STALE_SECS`` recovery in ``download_journal_data`` still
    covers the "process alive but hung" case.

    All exceptions are logged and swallowed: a failure here must not
    prevent the module from importing. Tests that create the sentinel on
    purpose do so *after* import, so this one-time cleanup does not
    interfere with them.

    NOTE(review): the sentinel is removed without looking at the PID
    stored in it. If several processes can import this module while
    sharing one data directory, a newly started process would clear the
    sentinel of a download still running elsewhere -- confirm the
    deployment model.
    """
    try:
        marker = _get_data_dir() / _SENTINEL
        if not marker.exists():
            return
        marker.unlink()
        logger.warning(
            f"Cleared orphan {_SENTINEL} sentinel from a previous run "
            f"(the old process was killed mid-download; a fresh process "
            f"cannot own an in-progress download)."
        )
    except Exception:
        logger.exception(
            "Could not clear orphan sentinel on startup; "
            "new downloads may be blocked until the 20-minute stale timer."
        )


# Import-time side effect: run the orphan cleanup exactly once per process.
_clear_orphan_sentinel_on_startup()
# ---------------------------------------------------------------------------
# Test-patch shims (one module-level wrapper per data source)
#
# Tests in `tests/utilities/test_journal_data_downloader.py` patch
# `_fetch_openalex_sources` and `_fetch_doaj_journals` by string path.
# The bodies have moved into `OpenAlexSource.fetch` and `DOAJSource.fetch`,
# but we expose module-level wrappers so the existing patches keep
# intercepting calls. The remaining sources get wrappers of the same shape.
# The bulk download loop below routes every source listed in
# `_FETCH_SHIM_NAMES` through its wrapper (not directly through `.fetch()`)
# so that `mock.patch` substitutions take effect.
# ---------------------------------------------------------------------------
def _fetch_openalex_sources(data_dir: Path, progress_cb=None) -> int:
    """Run the registered "openalex" source's ``fetch`` for ``data_dir``.

    Thin module-level wrapper kept so tests can patch this name; the
    optional ``progress_cb`` is forwarded unchanged and the source's
    return value is passed straight back.
    """
    source = get_source("openalex")
    return source.fetch(data_dir, progress_cb=progress_cb)
def _fetch_doaj_journals(data_dir: Path, progress_cb=None) -> int:
    """Run the registered "doaj" source's ``fetch`` for ``data_dir``.

    Thin module-level wrapper kept so tests can patch this name; the
    optional ``progress_cb`` is forwarded unchanged and the source's
    return value is passed straight back.
    """
    source = get_source("doaj")
    return source.fetch(data_dir, progress_cb=progress_cb)
def _fetch_predatory(data_dir: Path, progress_cb=None) -> int:
    """Run the registered "predatory" source's ``fetch`` for ``data_dir``.

    Thin module-level wrapper so the name can be patched in tests; the
    optional ``progress_cb`` is forwarded unchanged and the source's
    return value is passed straight back.
    """
    source = get_source("predatory")
    return source.fetch(data_dir, progress_cb=progress_cb)
def _fetch_jabref_abbreviations(data_dir: Path, progress_cb=None) -> int:
    """Run the registered "jabref" source's ``fetch`` for ``data_dir``.

    Thin module-level wrapper so the name can be patched in tests; the
    optional ``progress_cb`` is forwarded unchanged and the source's
    return value is passed straight back.
    """
    source = get_source("jabref")
    return source.fetch(data_dir, progress_cb=progress_cb)
def _fetch_institutions(data_dir: Path, progress_cb=None) -> int:
    """Run the registered "institutions" source's ``fetch`` for ``data_dir``.

    Thin module-level wrapper so the name can be patched in tests; the
    optional ``progress_cb`` is forwarded unchanged and the source's
    return value is passed straight back.
    """
    source = get_source("institutions")
    return source.fetch(data_dir, progress_cb=progress_cb)
# Source key -> *name* of the module-level shim that fetches it. Only the
# name is stored; the function is looked up when the download runs, so a
# `mock.patch` on the module attribute is honoured. Storing the function
# objects here would bind them at import time and bypass such patches.
_FETCH_SHIM_NAMES: dict[str, str] = {
    "openalex": "_fetch_openalex_sources",
    "doaj": "_fetch_doaj_journals",
    "predatory": "_fetch_predatory",
    "jabref": "_fetch_jabref_abbreviations",
    "institutions": "_fetch_institutions",
}
208# ---------------------------------------------------------------------------
209# Status
210# ---------------------------------------------------------------------------
def get_journal_data_status() -> dict:
    """Return the status payload for the dashboard data sources banner.

    Shape (kept stable for existing JS consumers and tests):
        {
            "available": bool,          # OpenAlex source OR compiled DB present
            "version": Optional[str],   # "version" field of version.json
            "latest_version": str,      # JOURNAL_DATA_VERSION
            "needs_update": bool,
            "files": dict[str, bool],   # legacy: filename -> present
            "sources": list[dict],      # per-source detail for the banner
            "data_dir": str,
            "download_progress": dict,  # snapshot from get_download_state()
        }

    A ``version.json`` that is missing, unreadable, not valid UTF-8, not
    valid JSON, or valid JSON that is not an object is treated as "no
    version installed" (``version`` is None, ``needs_update`` is True)
    instead of raising out of the status endpoint.
    """
    data_dir = _get_data_dir()

    installed_version: Optional[str] = None
    try:
        with open(data_dir / "version.json", encoding="utf-8") as f:
            info = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError is the common
        # base of json.JSONDecodeError and UnicodeDecodeError, so both a
        # truncated and a byte-corrupted marker end up here. Kept quiet on
        # purpose: this endpoint is polled while a download is running.
        info = None
    # Valid JSON is not necessarily an object (e.g. `[]` or `null`);
    # only a dict can carry the "version" field.
    if isinstance(info, dict):
        installed_version = info.get("version")

    files = {src.filename: src.is_present(data_dir) for src in ALL_SOURCES}
    sources = [src.status_dict(data_dir) for src in ALL_SOURCES]

    # Available if the (required) OpenAlex source exists OR the compiled
    # reference DB exists. The compiled DB is useful even when the source
    # JSON has been deleted, since the dashboard can still query it.
    has_source = files.get("openalex_sources.json.gz", False)
    has_db = (data_dir / "journal_quality.db").exists() or (
        data_dir / "journal_reference.db"
    ).exists()

    return {
        "available": has_source or has_db,
        "version": installed_version,
        "latest_version": JOURNAL_DATA_VERSION,
        # True both when nothing is installed yet (first run -- show the
        # download CTA) and when the installed version differs from the
        # bundled latest.
        "needs_update": (
            installed_version is None
            or installed_version != JOURNAL_DATA_VERSION
        ),
        "files": files,
        "sources": sources,
        "data_dir": str(data_dir),
        # Live progress for the dashboard's status indicator. The client
        # polls this endpoint while a download is in flight; see
        # downloadJournalData() in journal_quality.html.
        "download_progress": get_download_state(),
    }
272# ---------------------------------------------------------------------------
273# Bulk download (dashboard "Download Data" button)
274# ---------------------------------------------------------------------------
def download_journal_data(force: bool = False) -> tuple[bool, str]:
    """Fetch every registered data source into the user data directory.

    All sources in `ALL_SOURCES` are fetched in parallel and every fetch
    runs to completion. If a source marked `required=True` failed, the
    batch is then reported as failed; failures of `required=False`
    sources are logged and the batch continues. On success a
    `version.json` marker is written and `journal_quality.db` is rebuilt.

    Progress is mirrored into the module-level `_download_state` for the
    status endpoint. `_download_state["counts"]` is only populated on the
    success path and reset to None on every other exit.

    Args:
        force: Re-fetch even if data exists and is current version.

    Returns:
        (success, message) tuple. On success the message format is
        `"Fetched <N1> <label1> + <N2> <label2> + ... in <S>s"` so the
        existing test substring assertions ("100 OpenAlex", "50 DOAJ")
        continue to match.

    Raises:
        Exception: anything unexpected raised while downloading or
            writing the version marker propagates to the caller. Before
            it does, the progress state is switched to "error" and the
            sentinel is removed.
    """
    import os
    import shutil

    data_dir = _get_data_dir()
    sentinel = data_dir / _SENTINEL

    if not force:
        status = get_journal_data_status()
        if status["available"] and not status["needs_update"]:
            # No fresh fetch ran, so invalidate any counts cached from a
            # previous in-process download. Callers keying a "what just
            # happened" summary off `counts` must see None here.
            _download_state["counts"] = None
            return True, "Journal data is already up to date"

    # Disk-space pre-check. The five data sources uncompress to ~1 GB
    # intermediate, plus the compiled DB. Fail fast with a clear message
    # rather than crashing mid-download and leaving a corrupt tmp file.
    from ..constants import JOURNAL_QUALITY_MIN_FREE_DISK_BYTES

    try:
        free_bytes = shutil.disk_usage(str(data_dir)).free
    except OSError:
        logger.warning(
            f"Could not check free disk space for {data_dir}; proceeding."
        )
        free_bytes = None
    if (
        free_bytes is not None
        and free_bytes < JOURNAL_QUALITY_MIN_FREE_DISK_BYTES
    ):
        # No fetch ran -> invalidate any stale counts from a prior call
        # so `get_download_state()["counts"]` cannot leak them.
        _download_state["counts"] = None
        return False, (
            f"Insufficient disk space: "
            f"{free_bytes / (1024**3):.1f} GB available, "
            f"{JOURNAL_QUALITY_MIN_FREE_DISK_BYTES / (1024**3):.0f} GB required."
        )

    # Atomic sentinel creation (exclusive-create mode "x"), so two
    # concurrent download triggers (dashboard click + scheduler) cannot
    # both proceed.
    #
    # Stale-sentinel recovery. Two triggers are checked each call:
    #
    #   1. PID-based: the sentinel holds the PID of the process that
    #      created it. If that PID is not alive, the owner process
    #      crashed or was killed and the sentinel is an orphan.
    #   2. Age-based (fallback): if the sentinel is older than
    #      _SENTINEL_STALE_SECS we reclaim it even if the PID check is
    #      inconclusive.
    #
    # The import-time hook _clear_orphan_sentinel_on_startup handles the
    # common "server was restarted mid-download" case; these two runtime
    # checks cover a server that is still running.

    def _sentinel_owner_alive() -> bool:
        """Best-effort: is the process whose PID is in the sentinel alive?"""
        try:
            owner_pid = int(sentinel.read_text(encoding="utf-8").strip())
        except (OSError, ValueError):
            return False  # unreadable / malformed -> treat as orphan
        if owner_pid <= 0:
            # Not a PID of a single process: os.kill() would address a
            # process group instead. Treat as malformed.
            return False
        if owner_pid == os.getpid():
            # Stamped by this very process. Report "alive" so we never
            # delete a sentinel our own process may still be using.
            return True
        if os.name == "nt":
            # On Windows os.kill() is NOT a probe: any signal other than
            # CTRL_C_EVENT / CTRL_BREAK_EVENT terminates the target via
            # TerminateProcess. Report "alive" and leave recovery to the
            # age-based fallback.
            return True
        try:
            os.kill(owner_pid, 0)  # signal 0 = liveness probe (POSIX)
        except ProcessLookupError:
            return False
        except PermissionError:
            # Process exists, owned by another user -- treat as alive.
            return True
        except OSError:
            return False
        return True

    def _try_claim_sentinel() -> bool:
        """Create the sentinel + stamp our PID. Returns True on success."""
        try:
            with sentinel.open("x", encoding="utf-8") as f:
                f.write(str(os.getpid()))
            return True
        except FileExistsError:
            return False

    if not _try_claim_sentinel():
        try:
            age = time.time() - sentinel.stat().st_mtime
        except OSError:
            age = 0
        orphan = not _sentinel_owner_alive()
        if orphan or age > _SENTINEL_STALE_SECS:
            reason = (
                "owner process not alive"
                if orphan
                else f"age {age:.0f}s > {_SENTINEL_STALE_SECS}s"
            )
            logger.warning(
                f"Reclaiming stale .downloading sentinel ({reason}); "
                "previous download likely crashed without cleanup."
            )
            sentinel.unlink(missing_ok=True)
            if not _try_claim_sentinel():
                # Lost a race with another caller reclaiming the same
                # stale sentinel -- bow out cleanly.
                _download_state["counts"] = None
                return False, "Download already in progress"
        else:
            _download_state["counts"] = None
            return False, "Download already in progress"

    try:
        start = time.time()
        counts: dict[str, int] = {}
        parts: list[str] = []

        # Reset per-source state to a clean "pending" row for each
        # known source. The dashboard renders one row per entry.
        _download_state["state"] = "running"
        _download_state["started_at"] = start
        _download_state["finished_at"] = None
        _download_state["error_msg"] = None
        # Invalidate counts from any prior run -- callers inspecting the
        # structured summary must not see stale data if this download
        # fails or is still running.
        _download_state["counts"] = None
        _download_state["db_build"] = {"state": "pending", "detail": ""}
        _download_state["sources"] = {
            src.key: {
                "name": src.name,
                "state": "pending",
                "detail": "",
                "percent": 0,
                "count": 0,
                "required": src.required,
            }
            for src in ALL_SOURCES
        }

        def _fetch_one(src):
            """Worker: run one source's fetch, mirror state as it goes.

            Never raises; returns ``(src, count, exc)`` where ``exc`` is
            None on success and ``count`` is 0 on failure.
            """
            _set_source_state(
                src.key, state="running", detail="downloading", percent=5
            )

            # Progress callback handed to the source's fetch. It maps
            # done/total onto the 5..95 % band so the dashboard's bar
            # moves; a source that never calls it stays at the initial
            # 5 % until the final 100 %.
            def _on_progress(done, total, detail):
                pct = int(5 + (done / total) * 90) if total > 0 else 5
                _set_source_state(
                    src.key,
                    state="running",
                    detail=detail,
                    percent=max(5, min(95, pct)),
                )

            try:
                shim_name = _FETCH_SHIM_NAMES.get(src.key)
                if shim_name:
                    # bearer:disable python_lang_code_injection
                    # _FETCH_SHIM_NAMES is a hardcoded module-level dict;
                    # the late-bound globals() lookup is needed for
                    # mock.patch compatibility in tests. No user input
                    # reaches the key.
                    n = globals()[shim_name](data_dir, progress_cb=_on_progress)
                else:
                    n = src.fetch(data_dir, progress_cb=_on_progress)
                _set_source_state(
                    src.key,
                    state="success",
                    detail=f"{n} {src.count_label}",
                    percent=100,
                    count=n,
                )
                return (src, n, None)
            except Exception as exc:
                logger.exception(
                    f"{src.name} fetch failed "
                    f"({'required' if src.required else 'optional'})"
                )
                # Only the exception class name is exposed to the client.
                _set_source_state(
                    src.key,
                    state="error",
                    detail=exc.__class__.__name__,
                    percent=100,
                )
                return (src, 0, exc)

        # Parallel fetch: one worker thread per source, so the wall-clock
        # is dominated by the slowest source rather than the sum.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(
            max_workers=max(1, len(ALL_SOURCES)),
            thread_name_prefix="journal-dl",
        ) as pool:
            results = list(pool.map(_fetch_one, ALL_SOURCES))

        required_failure = None
        for src, n, exc in results:
            counts[src.key] = n
            parts.append(f"{n} {src.count_label}")
            if exc is not None and src.required:
                required_failure = (src, exc)

        if required_failure is not None:
            src, _exc = required_failure
            msg = f"Failed to fetch {src.name}. Check your network connection."
            _download_state["state"] = "error"
            _download_state["error_msg"] = msg
            _download_state["finished_at"] = time.time()
            return False, msg

        # Write version marker. Per-source key names are preserved for
        # any external consumer that might read them, even though no
        # production code does today.
        version_file = data_dir / "version.json"
        with open(version_file, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "version": JOURNAL_DATA_VERSION,
                    # The trailing "Z" means UTC, so format gmtime();
                    # strftime() without a time tuple uses local time.
                    "downloaded_at": time.strftime(
                        "%Y-%m-%dT%H:%M:%SZ", time.gmtime()
                    ),
                    "openalex_count": counts.get("openalex", 0),
                    "doaj_count": counts.get("doaj", 0),
                    "jabref_count": counts.get("jabref", 0),
                    "predatory_count": counts.get("predatory", 0),
                },
                f,
            )

        # Rebuild journal_quality.db synchronously from the freshly
        # downloaded gz files. The build is the ONLY writer of this
        # file; everything else opens it read-only via mode=ro.
        # Also clean up any leftover legacy journal_reference.db files
        # from before the rename so existing installs don't carry junk.
        legacy_db = data_dir / "journal_reference.db"
        if legacy_db.exists():
            try:
                # Make the file writable before unlinking it.
                # bearer:disable python_lang_file_permissions
                os.chmod(legacy_db, 0o644)
                legacy_db.unlink()
                logger.info(
                    "Removed legacy journal_reference.db "
                    "(replaced by journal_quality.db)"
                )
            except OSError:
                logger.exception("Could not remove legacy DB")

        _download_state["db_build"] = {
            "state": "running",
            "detail": "parsing bundled data",
        }
        db_build_error: Optional[str] = None
        try:
            from .db import DB_FILENAME, build_db

            new_db_file = data_dir / DB_FILENAME
            if new_db_file.exists():
                # Make the old DB writable, then drop it so the build
                # starts from a clean file.
                # bearer:disable python_lang_file_permissions
                os.chmod(new_db_file, 0o644)
                new_db_file.unlink()
            build_db(data_dir=data_dir, output_path=new_db_file)
        except Exception as exc:
            logger.exception(
                "Failed to rebuild journal_quality.db; "
                "the runtime accessor will lazy-build on next access"
            )
            # SchemaDriftError messages are developer-authored literals
            # (no SQL, paths, or stack fragments) so they're safe to
            # surface -- operators need to see *which* field drifted to
            # act on it. For any other exception, fall back to the
            # class name only, per CodeQL "Information exposure through
            # an exception" (alerts 7650, 7684). The full trace always
            # stays in logger.exception above (server-side only).
            from .data_sources.openalex import SchemaDriftError

            if isinstance(exc, SchemaDriftError):
                # Fall back to the class name for an empty message so
                # the failure is never mistaken for "no error".
                db_build_error = str(exc) or exc.__class__.__name__
            else:
                db_build_error = exc.__class__.__name__

        elapsed = time.time() - start
        if db_build_error is not None:
            msg = (
                f"Downloaded data ({' + '.join(parts)}) in {elapsed:.0f}s "
                f"but DB build failed ({db_build_error}). "
                f"Lazy-build will retry on next access."
            )
            _download_state["db_build"] = {
                "state": "error",
                "detail": db_build_error,
            }
            _download_state["state"] = "error"
            _download_state["error_msg"] = msg
            _download_state["finished_at"] = time.time()
            return False, msg

        success_msg = f"Fetched {' + '.join(parts)} in {elapsed:.0f}s"
        _download_state["db_build"] = {
            "state": "success",
            "detail": "ready",
        }
        _download_state["state"] = "success"
        _download_state["error_msg"] = None
        _download_state["finished_at"] = time.time()
        # Publish structured counts for callers that want to render a
        # user-facing summary without echoing `success_msg`. All values
        # come from the per-source fetch results collected above.
        _download_state["counts"] = dict(counts)
        return True, success_msg
    except Exception as unexpected:
        # Anything not handled above (e.g. the version marker could not
        # be written) would otherwise leave the shared state stuck at
        # "running" for the polling dashboard. Record the failure --
        # class name only, no exception text -- then let it propagate.
        _download_state["state"] = "error"
        _download_state["error_msg"] = (
            "Journal data download failed unexpectedly "
            f"({unexpected.__class__.__name__})."
        )
        _download_state["finished_at"] = time.time()
        raise
    finally:
        # Always release the sentinel, whichever way we leave.
        sentinel.unlink(missing_ok=True)
# Result of the most recent download attempt made by
# `ensure_journal_data`, as `(stored_at_epoch_seconds, result)`;
# None until the first attempt.
_ensure_cache: Optional[tuple[float, tuple[Optional[Path], bool]]] = None
_ENSURE_CACHE_TTL = 30.0  # seconds


def ensure_journal_data(
    auto_download: bool = True,
) -> tuple[Optional[Path], bool]:
    """Ensure journal data is available, optionally triggering a bulk fetch.

    Args:
        auto_download: When the OpenAlex source file is missing, call
            `download_journal_data()` to fetch it. When False, the
            function only reports the current situation.

    Returns:
        (data_dir, is_available) -- data_dir is None if unavailable.

    Thundering-herd guard: when a search runs, every search engine's
    reputation-filter worker (~30 threads) calls this concurrently.
    Without the cache, most of them race to create the sentinel and
    each logs a WARNING. The outcome of a download attempt is therefore
    remembered for `_ENSURE_CACHE_TTL` seconds, counted from the moment
    the attempt FINISHED, and returned to later callers. The fast path
    (source file present) is a single stat call and is never cached.
    Calls with `auto_download=False` may read the cache but never write
    it, so a passive availability check cannot suppress a later
    download request.
    """
    global _ensure_cache

    user_dir = _get_data_dir()
    if (user_dir / "openalex_sources.json.gz").exists():
        # Positive path is cheap (one stat call) -- no need to cache.
        return user_dir, True

    if _ensure_cache is not None:
        stored_at, cached = _ensure_cache
        if time.time() - stored_at < _ENSURE_CACHE_TTL:
            return cached

    if not auto_download:
        # Nothing expensive was attempted, so there is nothing worth
        # caching.
        return None, False

    logger.info("Journal data not found — fetching from upstream sources...")
    success, message = download_journal_data()
    result: tuple[Optional[Path], bool]
    if success:
        logger.info(message)
        result = (user_dir, True)
    else:
        logger.warning(f"Journal data fetch failed: {message}")
        result = (None, False)

    # Stamp with the time the attempt ended: a download can run for
    # minutes, and a timestamp taken before it started would make the
    # entry expire immediately.
    _ensure_cache = (time.time(), result)
    return result