Coverage for src/local_deep_research/journal_quality/db.py: 36%
672 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""Read-only SQLAlchemy accessor for the compiled journal-quality DB.
3INVARIANT: this module's `JournalQualityDB` class **never writes**.
4The runtime engine is opened with SQLite URI flags `mode=ro` and
5`immutable=1`, the file is `chmod 0o444` after every build, and a
6pre-commit hook bans cross-module opens of the file without `mode=ro`.
8The only writer is `build_db()` in this same module, which opens its
9own short-lived writable engine, populates the schema, runs ANALYZE
10+ VACUUM, closes the engine, and chmods the file back to 0o444.
12The DB compiles five gzipped JSON snapshots (downloaded by
13`journal_quality.downloader`) into one queryable file:
15- OpenAlex sources → `sources` table (with predatory + DOAJ flags)
16- Stop Predatory Journals → `predatory_journals/_publishers/_hijacked`
17- DOAJ → cross-referenced into `sources`
18- JabRef abbreviations → `abbreviations` table
19- OpenAlex Institutions → `institutions` table
21Built fresh on every download, no migrations.
22"""
24from __future__ import annotations
26import gzip
27import json
28import os
29import secrets
30import sqlite3
31import sys
32import threading
33import time
34from contextlib import contextmanager
35from pathlib import Path
36from typing import Iterable, Iterator, Optional
38from loguru import logger
39from sqlalchemy import create_engine, func, or_, select
40from sqlalchemy.engine import Engine
41from sqlalchemy.exc import DatabaseError, OperationalError
42from sqlalchemy.orm import Session, sessionmaker
44from .models import (
45 Abbreviation,
46 Institution,
47 JournalQualityBase,
48 PredatoryHijacked,
49 PredatoryJournal,
50 PredatoryPublisher,
51 Source,
52)
53from ..constants import PREDATORY_WHITELIST_HINDEX
54from ..utilities.citation_normalizer import normalize_issn
55from .scoring import (
56 derive_quality_score,
57 institution_score_from_h_index,
58 normalize_name,
59)
# File name of the compiled SQLite DB inside the journal data directory.
DB_FILENAME = "journal_quality.db"
_BATCH_SIZE = 5000

# Version of the reference DB schema (models.py). Bump it when the schema
# changes in a way that needs a rebuild even though the upstream data
# version is unchanged. It is stamped as SQLite `PRAGMA user_version`
# during build_db and compared when the engine is opened. This is a
# different knob from JOURNAL_DATA_VERSION (downloader.py), which tracks
# how fresh the upstream source data is.
JOURNAL_QUALITY_SCHEMA_VERSION = 3

# Dashboard tier filter: tier name → (lowest, highest) quality score.
_TIER_RANGES = dict(
    elite=(9, 10),
    strong=(7, 8),
    moderate=(5, 6),
    low=(3, 4),
    predatory=(1, 2),
)

# Allowlist of column names accepted for ORDER BY, so the dashboard
# `sort` query parameter can never inject arbitrary SQL.
_SORT_COLUMNS = frozenset(
    (
        "name",
        "quality",
        "quartile",
        "h_index",
        "impact_factor",
        "score_source",
        "source_type",
        "publisher",
        "is_predatory",
    )
)

# Upper bound on user-supplied search strings. Even with LIKE wildcards
# escaped, a 10 KB pattern against 217K rows is slow enough to matter
# for CPU budget under concurrent requests.
_MAX_SEARCH_LEN = 100
102def _escape_like(s: str) -> str:
103 """Escape SQL LIKE metacharacters so user-supplied search strings
104 can't force degenerate full-table scans via ``%`` or ``_``.
106 Pair with ``.like(pattern, escape="/")`` at the call site.
107 """
108 return s.replace("/", "//").replace("%", "/%").replace("_", "/_")
111# ---------------------------------------------------------------------------
112# Read-only accessor
113# ---------------------------------------------------------------------------
116class JournalQualityDB:
117 """Read-only SQLAlchemy 2.0 accessor for `journal_quality.db`.
119 All filter hot-path methods return plain dicts (not mapped Source
120 objects) so call sites in `journal_reputation_filter.py` keep the
121 same call shape they had against the dict-based predecessor. The
122 dashboard methods can return either dicts or Source instances —
123 they're called once per page view so the ORM overhead is fine.
124 """
126 def __init__(self) -> None:
127 self._engine: Optional[Engine] = None
128 self._SessionLocal: Optional[sessionmaker[Session]] = None
129 # RLock (not Lock): _ensure_engine holds the lock while calling
130 # _build_or_raise → build_db → reset_db → self.reset(), which
131 # re-acquires the same lock. A non-reentrant Lock would deadlock
132 # the very first request on a fresh install.
133 self._lock = threading.RLock()
134 # Whether we've already logged a stale-data-version warning for
135 # this engine lifetime. Prevents log spam — one WARNING per
136 # server start is enough to surface the problem to admins.
137 self._stale_version_warned = False
139 # --- engine + session lifecycle ---
def _resolve_db_path(self) -> Path:
    """Return the path of the compiled DB inside the journal data directory."""
    from ..config.paths import get_journal_data_directory

    data_dir = get_journal_data_directory()
    return data_dir / DB_FILENAME
def _ensure_engine(self) -> None:
    """Open and cache the read-only engine on first use.

    Does nothing when an engine is already cached. Otherwise the DB
    file is built when it is missing, or rebuilt when
    ``_validate_existing_db`` rejects it; then a read-only engine and
    session factory are created and published, and the one-shot
    stale-data-version check runs.

    Exceptions from ``_build_or_raise`` (e.g. ``FileNotFoundError`` when
    the data files are unavailable) propagate to the caller.
    """
    # The lock is taken BEFORE the first read of self._engine so the
    # check and the publication below are ordered by the same lock
    # rather than relying on the GIL.
    with self._lock:
        if self._engine is not None:
            return

        path = self._resolve_db_path()
        # Short-circuit: a missing file goes straight to the build; an
        # existing file is validated first so schema drift (PRAGMA
        # user_version mismatch) and corruption are caught at open time
        # instead of on the first query.
        if not path.exists() or not self._validate_existing_db(path):
            self._build_or_raise(path)

        # mode=ro + immutable=1: SQLite refuses writes and skips
        # locking. The flags are handed to sqlite3 through a creator
        # callback because SQLAlchemy's URL parser would swallow the
        # ?mode=ro&immutable=1 query string.
        read_only_uri = f"file:{path}?mode=ro&immutable=1"

        def _make_ro_conn() -> sqlite3.Connection:
            return sqlite3.connect(
                read_only_uri,
                uri=True,
                check_same_thread=False,
            )

        # StaticPool: one shared connection is enough for immutable
        # reads and avoids the default QueuePool's larger footprint.
        from sqlalchemy.pool import StaticPool

        engine = create_engine(
            "sqlite://",
            creator=_make_ro_conn,
            poolclass=StaticPool,
            echo=False,
        )
        session_local = sessionmaker(bind=engine, expire_on_commit=False)

        # Both objects exist before either is published, so readers
        # never observe an engine without its session factory.
        self._engine = engine
        self._SessionLocal = session_local
        logger.info(f"Opened journal_quality.db (read-only): {path}")

        # Schema drift was handled above; this is the separate DATA
        # version check. It only logs — refreshing stays an explicit
        # user action via the dashboard "Download Data" button.
        self._warn_on_stale_data_version(path.parent)
213 def _warn_on_stale_data_version(self, data_dir: Path) -> None:
214 """Log WARNING once if ``version.json`` is behind ``JOURNAL_DATA_VERSION``."""
215 if self._stale_version_warned:
216 return
217 # Lazy import to avoid any downloader → db cycle even though
218 # today's module graph doesn't have one.
219 from .downloader import JOURNAL_DATA_VERSION
221 version_file = data_dir / "version.json"
222 if not version_file.exists():
223 return # Brand-new install — the dashboard's banner handles this.
224 try:
225 with open(version_file, encoding="utf-8") as f:
226 info = json.load(f)
227 installed = info.get("version")
228 except (json.JSONDecodeError, OSError):
229 return # Malformed — dashboard's banner surfaces; don't double-log.
230 if installed and installed != JOURNAL_DATA_VERSION:
231 logger.warning(
232 f"journal_quality data version is stale: on-disk={installed!r} "
233 f"bundled-latest={JOURNAL_DATA_VERSION!r}. "
234 f"Scoring is continuing with the older data. Visit "
235 f"/metrics/journals and click 'Download Data' to refresh."
236 )
237 self._stale_version_warned = True
def _validate_existing_db(self, path: Path) -> bool:
    """Return True if the existing DB file is usable as-is.

    A ``PRAGMA user_version`` of 0 means the file was built before
    schema stamping existed and is grandfathered in — the missing stamp
    alone does not force a rebuild. A non-zero version that differs from
    ``JOURNAL_QUALITY_SCHEMA_VERSION`` is real schema drift. A file that
    cannot be opened or queried is treated as corrupt.

    In both failure cases the file is removed (best effort) and False
    is returned so the caller rebuilds it.

    Args:
        path: Location of the existing DB file.
    """
    from ..utilities.resource_utils import safe_close

    usable = False
    conn: Optional[sqlite3.Connection] = None
    try:
        conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True)
        version = conn.execute("PRAGMA user_version").fetchone()[0]
        if version not in (0, JOURNAL_QUALITY_SCHEMA_VERSION):
            logger.warning(
                f"journal_quality.db schema_version={version}, "
                f"expected {JOURNAL_QUALITY_SCHEMA_VERSION} — "
                f"rebuilding"
            )
        else:
            # Cheap sanity check — confirms the file is a valid DB.
            conn.execute("SELECT 1 FROM sqlite_master LIMIT 1").fetchone()
            usable = True
    except (sqlite3.DatabaseError, OSError):
        logger.exception(
            f"journal_quality.db at {path} is unusable; rebuilding"
        )
    finally:
        # Single close point: closing twice produced a spurious
        # "Cannot operate on a closed database" warning.
        if conn is not None:
            safe_close(conn, "journal_quality validate")

    # Remove the file only AFTER our own connection is closed. Unlinking
    # while this handle is still open fails on platforms that refuse to
    # delete open files, which would leave the bad file in place.
    if not usable:
        self._unlink_unusable_db(path)
    return usable
279 @staticmethod
280 def _unlink_unusable_db(path: Path) -> None:
281 """Best-effort cleanup of a corrupted / schema-drifted DB file.
283 Corruption was already logged by the caller (``_validate_existing_db``).
284 Both operations below are best-effort — on failure we *log and
285 continue* rather than raise, because the build path will rebuild
286 the file regardless. But we don't silence: if chmod / unlink
287 fails (permissions, read-only mount, file held open on Windows)
288 the next build will likely also fail and the user needs the
289 warning to diagnose the real problem.
290 """
291 try:
292 # bearer:disable python_lang_file_permissions
293 os.chmod(path, 0o644)
294 except OSError:
295 logger.warning(
296 f"Could not chmod 0644 on unusable DB {path} before "
297 f"unlink (continuing to unlink attempt)"
298 )
299 try:
300 path.unlink()
301 except OSError:
302 logger.warning(
303 f"Could not unlink unusable DB {path} (will be "
304 f"overwritten on next build)"
305 )
def _build_or_raise(self, path: Path) -> None:
    """Build the DB at *path* from the downloaded data files.

    Args:
        path: Output location handed to ``build_db``.

    Raises:
        FileNotFoundError: ``ensure_journal_data`` reported that the
            data files are not available.
    """
    from .downloader import ensure_journal_data

    data_dir, available = ensure_journal_data()
    if available:
        logger.info(f"Building {DB_FILENAME} from data files...")
        build_db(data_dir=data_dir, output_path=path)
        return

    raise FileNotFoundError(
        "Journal data files not available. "
        "Check your network connection or download manually "
        "from the dashboard."
    )
@contextmanager
def session(self) -> Iterator[Session]:
    """Yield a read-only SQLAlchemy session and always close it.

    The session is closed when the ``with`` block exits for ANY reason
    (normal exit, early ``return``, or any exception), so a failure in
    the caller's code cannot leak it.

    If the underlying file becomes corrupt mid-session (e.g. a rebuild
    ran and this engine is pointed at a now-unlinked inode), the
    ``DatabaseError`` still propagates, but the cached engine is dropped
    first so the next call rebuilds cleanly instead of failing forever.

    Raises:
        RuntimeError: no session factory exists after ``_ensure_engine``.
        sqlalchemy.exc.DatabaseError: re-raised after the engine reset.
    """
    self._ensure_engine()
    # Read the factory once so the None check and the call below use
    # the same object.
    factory = self._SessionLocal
    if factory is None:
        raise RuntimeError("JournalQualityDB engine failed to initialize")
    from ..utilities.resource_utils import safe_close

    s = factory()
    try:
        try:
            yield s
        finally:
            # Runs on every exit path and before the engine is
            # disposed by reset() below.
            safe_close(s, "journal_quality session")
    except DatabaseError:
        # OperationalError is a subclass of DatabaseError, so this one
        # clause covers both.
        logger.exception("journal_quality.db error — resetting engine")
        self.reset()
        raise
346 @property
347 def available(self) -> bool:
348 try:
349 self._ensure_engine()
350 return True
351 except FileNotFoundError:
352 return False
354 def reset(self) -> None:
355 """Drop the cached engine — call after `build_db` rebuilds the file."""
356 with self._lock:
357 if self._engine is not None: 357 ↛ exitline 357 didn't jump to the function exit
358 self._engine.dispose()
359 self._engine = None
360 self._SessionLocal = None
361 logger.info("Reset journal_quality.db engine")
363 # --- filter hot path: return plain dicts ---
def lookup_openalex(
    self,
    *,
    source_id: Optional[str] = None,
    issn: Optional[str] = None,
    name: Optional[str] = None,
) -> Optional[dict]:
    """Find one source by OpenAlex ID, ISSN, or name.

    The ISSN is normalized before the lookup. The result is the dict
    built by ``_source_to_lookup_dict`` — the same shape the dict-based
    predecessor produced, so `journal_reputation_filter.py` needs no
    changes.

    Returns:
        The lookup dict, or ``None`` when nothing matched or the
        reference DB is not available.
    """
    normalized_issn = normalize_issn(issn)
    try:
        self._ensure_engine()
    except FileNotFoundError:
        return None

    with self.session() as s:
        match = self._lookup_source_row(s, source_id, normalized_issn, name)
        if not match:
            return None
        return _source_to_lookup_dict(match)

# Alias used by the dashboard / future call sites
lookup_source = lookup_openalex
def count_predatory_by_names(self, names: Iterable[str]) -> int:
    """Return how many of the given journal names are flagged predatory.

    Each name is passed through ``normalize_name`` (the same
    normalization used at build time), so callers can hand in raw
    display names. Everything is resolved in one query:
    ``WHERE name_lower IN (…) AND is_predatory = TRUE``.

    The per-user metrics dashboard uses this for its global "N predatory
    journals across all your research" stat without N round trips.

    Returns:
        The count, or 0 when ``names`` yields nothing usable or the
        reference DB is missing.

    .. note::
        Deliberately unchunked. SQLite's ``SQLITE_MAX_VARIABLE_NUMBER``
        has been 250,000 since SQLite 3.32 (2020); Python 3.11 ships
        with 3.45.1. A heavy user with 100k distinct container_titles
        is still well under the limit. Re-confirmed in the PR #3081
        audit — no chunking needed.
    """
    # Falsy inputs are dropped before normalization; names that
    # normalize to the empty string are dropped after it.
    wanted = {normalize_name(raw) for raw in names if raw} - {""}
    if not wanted:
        return 0

    try:
        self._ensure_engine()
    except FileNotFoundError:
        return 0

    with self.session() as s:
        query = select(func.count(Source.id)).where(
            Source.name_lower.in_(wanted),
            Source.is_predatory.is_(True),
        )
        hits = s.execute(query).scalar()
        return int(hits or 0)
def lookup_sources_batch(self, names: Iterable[str]) -> dict:
    """Resolve many journal names with as few queries as possible.

    Args:
        names: Raw display names. Falsy entries and entries that
            normalize to the empty string are ignored.

    Returns:
        ``{normalized_name: dashboard_dict}`` for every name that
        matched a Source. Misses are simply absent, and the result is
        empty when the reference DB is missing.

    Dashboard hot path: the ``/api/journals/user-research`` endpoint
    collects up to 200 unique ``container_title`` values from the
    user's Papers and passes them here — one round trip instead of 200
    per-row lookups.

    Matching is an exact hit on ``name_lower`` after ``normalize_name``.
    The "the " / "proceedings of" fallback tiers of
    ``_lookup_source_row`` are deliberately not applied: this path feeds
    dashboard display, where a miss is acceptable.

    Queries are chunked at 900 parameters. That is purely defensive —
    SQLite's ``SQLITE_MAX_VARIABLE_NUMBER`` has been 250,000 since 3.32
    (2020) — but it stays under any older embedded-SQLite ceiling a
    deployment might pin to.
    """
    normalized = (normalize_name(raw) for raw in names if raw)
    # dict.fromkeys de-duplicates while keeping first-seen order, which
    # gives tests a stable iteration order.
    unique_names = list(dict.fromkeys(n for n in normalized if n))
    if not unique_names:
        return {}

    try:
        self._ensure_engine()
    except FileNotFoundError:
        return {}

    chunk_size = 900
    found: dict = {}
    with self.session() as s:
        for start in range(0, len(unique_names), chunk_size):
            chunk = unique_names[start : start + chunk_size]
            query = select(Source).where(Source.name_lower.in_(chunk))
            for source in s.scalars(query):
                found[source.name_lower] = _source_to_dashboard_dict(source)
    return found
def lookup_doaj(self, *, issn: Optional[str] = None) -> Optional[dict]:
    """Return DOAJ details for the source with the given ISSN.

    The ISSN is normalized first. Only sources flagged ``is_in_doaj``
    are considered.

    Returns:
        ``{"name", "has_seal", "publisher"}`` for the first match, or
        ``None`` when the ISSN is empty after normalization, nothing
        matched, or the reference DB is missing.
    """
    issn = normalize_issn(issn)
    if not issn:
        return None

    try:
        self._ensure_engine()
    except FileNotFoundError:
        return None

    query = select(Source).where(
        Source.issn == issn, Source.is_in_doaj.is_(True)
    )
    with self.session() as s:
        hit = s.scalars(query.limit(1)).first()

    if hit is None:
        return None
    return {
        "name": hit.name,
        "has_seal": hit.has_doaj_seal,
        "publisher": hit.publisher,
    }
def is_in_doaj(self, issn: Optional[str]) -> bool:
    """Return True when ``lookup_doaj`` finds an entry for *issn*."""
    entry = self.lookup_doaj(issn=issn)
    return entry is not None
def has_doaj_seal(self, issn: Optional[str]) -> bool:
    """Return True when the DOAJ entry for *issn* has a truthy ``has_seal``."""
    entry = self.lookup_doaj(issn=issn)
    if not entry:
        return False
    return bool(entry.get("has_seal"))
def is_predatory(
    self,
    *,
    journal_name: Optional[str] = None,
    publisher_name: Optional[str] = None,
) -> tuple[bool, Optional[str]]:
    """Check if a journal/publisher is on the predatory list.

    Looks up the dedicated predatory tables (NOT just the
    `is_predatory` flag on `Source`), so checks work for arbitrary
    input names that aren't in OpenAlex.

    Checks run in this order and stop at the first hit:
    journal list, hijacked list, exact publisher match, then a
    substring comparison (in both directions) against the publisher
    entries flagged ``is_long``.

    Returns:
        ``(True, <list identifier>)`` on a hit, otherwise
        ``(False, None)`` — also when the reference DB is missing.
    """
    try:
        self._ensure_engine()
    except FileNotFoundError:
        return False, None

    # A name that is only whitespace is truthy but normalizes to "".
    # It must count as "no name given": the substring scan below uses
    # `publisher_key in entry`, and "" is contained in every string, so
    # an empty key would flag every such publisher as predatory.
    journal_key = normalize_name(journal_name) if journal_name else ""
    publisher_key = normalize_name(publisher_name) if publisher_name else ""

    with self.session() as s:
        if journal_key:
            if s.get(PredatoryJournal, journal_key) is not None:
                return True, "stop-predatory-journals"
            if s.get(PredatoryHijacked, journal_key) is not None:
                return True, "stop-predatory-hijacked"

        if publisher_key:
            if s.get(PredatoryPublisher, publisher_key) is not None:
                return True, "stop-predatory-publishers"
            # Substring scan over long entries (~1162 rows)
            stmt = select(PredatoryPublisher.name_lower).where(
                PredatoryPublisher.is_long.is_(True)
            )
            for (entry,) in s.execute(stmt).all():
                if publisher_key in entry or entry in publisher_key:
                    return True, "stop-predatory-publishers"

    return False, None
def is_whitelisted(
    self,
    *,
    issn: Optional[str] = None,
    name: Optional[str] = None,
) -> bool:
    """Return True when a source is exempt from the predatory check.

    A source qualifies when it is listed in DOAJ, or when its
    ``lookup_openalex`` record has an ``h_index`` above
    ``PREDATORY_WHITELIST_HINDEX`` (a missing h-index counts as 0).
    """
    if self.is_in_doaj(issn):
        return True

    record = self.lookup_openalex(issn=issn, name=name)
    if not record:
        return False
    if (record.get("h_index") or 0) > PREDATORY_WHITELIST_HINDEX:
        return True
    return False
def lookup_institution(
    self,
    *,
    ror_id: Optional[str] = None,
    openalex_id: Optional[str] = None,
    name: Optional[str] = None,
) -> Optional[dict]:
    """Find one institution by OpenAlex ID, ROR ID, or name.

    Identifiers are tried in that order; each later one is consulted
    only if the earlier ones found nothing. For both ID forms only the
    segment after the last ``/`` is used (a trailing ``/`` on the ROR
    ID is ignored), and the name is compared after ``normalize_name``.

    Returns:
        The dict produced by ``_institution_to_dict`` — full-name keys
        rather than the one-character keys of the on-disk snapshot — or
        ``None`` when nothing matched or the reference DB is missing.
    """
    try:
        self._ensure_engine()
    except FileNotFoundError:
        return None

    with self.session() as s:
        match: Optional[Institution] = None

        if openalex_id:
            match = s.get(Institution, openalex_id.rpartition("/")[2])

        if match is None and ror_id:
            ror_key = ror_id.rstrip("/").rpartition("/")[2]
            by_ror = select(Institution).where(Institution.ror_id == ror_key)
            match = s.scalars(by_ror.limit(1)).first()

        if match is None and name:
            name_key = normalize_name(name)
            by_name = select(Institution).where(
                Institution.name_lower == name_key
            )
            match = s.scalars(by_name.limit(1)).first()

        if not match:
            return None
        return _institution_to_dict(match)
def score_from_affiliations(self, affiliations: list) -> Optional[int]:
    """Score a set of author affiliations using ONE SQL query.

    Args:
        affiliations: Items that are either a plain name string or a
            dict with optional ``openalex_id`` / ``id``, ``ror`` and
            ``name`` keys. Items of any other type are ignored.

    Returns:
        ``institution_score_from_h_index`` applied to the highest
        non-NULL h-index among the matching institutions, or ``None``
        when there is nothing to look up or the reference DB is
        missing.
    """
    if not affiliations:
        return None

    openalex_ids: list[str] = []
    ror_ids: list[str] = []
    names: list[str] = []

    for aff in affiliations:
        if isinstance(aff, str):
            names.append(normalize_name(aff))
            continue
        if not isinstance(aff, dict):
            continue
        oid = aff.get("openalex_id") or aff.get("id")
        if oid:
            # Keep only the segment after the last "/".
            openalex_ids.append(oid.rpartition("/")[2])
        rid = aff.get("ror")
        if rid:
            ror_ids.append(rid.rstrip("/").rpartition("/")[2])
        nm = aff.get("name")
        if nm:
            names.append(normalize_name(nm))

    if not (openalex_ids or ror_ids or names):
        return None

    try:
        self._ensure_engine()
    except FileNotFoundError:
        return None

    # One IN (...) clause per identifier kind that actually has values.
    lookups = (
        (Institution.openalex_id, openalex_ids),
        (Institution.ror_id, ror_ids),
        (Institution.name_lower, names),
    )
    clauses = [column.in_(values) for column, values in lookups if values]

    with self.session() as s:
        query = select(func.max(Institution.h_index)).where(
            or_(*clauses), Institution.h_index.is_not(None)
        )
        best_h = s.scalar(query)

    # The scoring rule itself lives in scoring.py; delegating keeps the
    # build phase, the runtime filter and this affiliation path from
    # ever disagreeing.
    return institution_score_from_h_index(best_h)
# Re-export of the scoring function as a static attribute, so the filter
# can call dm.derive_quality_score(...) without importing .scoring
# itself. The scoring rules still have a single home in scoring.py.
derive_quality_score = staticmethod(derive_quality_score)
def expand_abbreviation(self, name: str) -> Optional[str]:
    """Return the full journal name for an abbreviation, if known.

    The normalized name is looked up first; when that misses and the
    name contains dots, the dot-free form is tried as well.

    Returns:
        The stored full name, or ``None`` when *name* is empty, nothing
        matched, or the reference DB is missing.
    """
    if not name:
        return None
    try:
        self._ensure_engine()
    except FileNotFoundError:
        return None

    key = normalize_name(name)
    candidates = [key]
    dotless = key.replace(".", "").strip()
    if dotless != key:
        candidates.append(dotless)

    with self.session() as s:
        for candidate in candidates:
            entry = s.get(Abbreviation, candidate)
            if entry is not None:
                return entry.full_name
    return None
677 # --- internal source lookup with name fallbacks ---
679 def _lookup_source_row(
680 self,
681 s: Session,
682 source_id: Optional[str],
683 issn: Optional[str],
684 name: Optional[str],
685 ) -> Optional[Source]:
686 if source_id: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true
687 sid = source_id.split("/")[-1] if "/" in source_id else source_id
688 stmt = (
689 select(Source).where(Source.openalex_source_id == sid).limit(1)
690 )
691 row = s.scalars(stmt).first()
692 if row is not None:
693 return row
695 if issn: 695 ↛ 696line 695 didn't jump to line 696 because the condition on line 695 was never true
696 stmt = select(Source).where(Source.issn == issn).limit(1)
697 row = s.scalars(stmt).first()
698 if row is not None:
699 return row
701 if name: 701 ↛ 751line 701 didn't jump to line 751 because the condition on line 701 was always true
702 norm = normalize_name(name)
703 row = self._fetch_by_name_lower(s, norm)
704 if row is not None: 704 ↛ 705line 704 didn't jump to line 705 because the condition on line 704 was never true
705 return row
706 # Try with/without "the " prefix (~5K journals have it)
707 if norm.startswith("the "): 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true
708 row = self._fetch_by_name_lower(s, norm[4:])
709 else:
710 row = self._fetch_by_name_lower(s, "the " + norm)
711 if row is not None: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true
712 return row
713 # Strip "proceedings of (the) (conference on) " prefix
714 stripped = norm
715 for prefix in (
716 "proceedings of the conference on ",
717 "proceedings of the ",
718 "proceedings of ",
719 ):
720 if stripped.startswith(prefix): 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true
721 stripped = stripped[len(prefix) :]
722 break
723 if stripped != norm: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 row = self._fetch_by_name_lower(s, stripped)
725 if row is not None:
726 return row
728 # MEDLINE-style "Title : long subtitle" — try the segment
729 # before the colon. Catches PubMed names like
730 # "Molecular therapy : the journal of the American Society..."
731 # → "Molecular therapy"
732 if " : " in norm: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true
733 head = norm.split(" : ", 1)[0].strip()
734 if head and head != norm:
735 row = self._fetch_by_name_lower(s, head)
736 if row is not None:
737 return row
739 # MEDLINE-style "Title. Section name" — try the segment
740 # before the first period. Catches PubMed names like
741 # "Molecular therapy. Methods and clinical development"
742 # but only when the head is meaningfully shorter (we don't
743 # want to match "Nat" from "Nat. Commun.").
744 if "." in norm: 744 ↛ 745line 744 didn't jump to line 745 because the condition on line 744 was never true
745 head = norm.split(".", 1)[0].strip()
746 if head and len(head) >= 6 and head != norm:
747 row = self._fetch_by_name_lower(s, head)
748 if row is not None:
749 return row
751 return None
@staticmethod
def _fetch_by_name_lower(s: Session, name_lower: str) -> Optional[Source]:
    """Return the first Source whose ``name_lower`` equals *name_lower*, else None."""
    exact_match = select(Source).where(Source.name_lower == name_lower)
    return s.scalars(exact_match.limit(1)).first()
758 # --- dashboard queries ---
def get_summary(self) -> dict:
    """Return aggregate statistics over all sources for the dashboard.

    Keys: ``total``, ``avg_quality`` (rounded to 1 decimal),
    ``avg_h_index`` (rounded), ``predatory_count``, ``doaj_count``,
    ``seal_count`` and ``llm_count`` (rows whose ``score_source`` is
    ``"llm"``). When the reference DB is unavailable a zeroed dict with
    the same keys is returned.
    """
    if not self.available:
        return dict(
            total=0,
            avg_quality=0,
            avg_h_index=None,
            predatory_count=0,
            doaj_count=0,
            seal_count=0,
            llm_count=0,
        )

    def _count_where(condition, label):
        # SUM(IIF(cond, 1, 0)) counts the rows for which cond holds.
        return func.sum(func.iif(condition, 1, 0)).label(label)

    query = select(
        func.count().label("total"),
        func.round(func.avg(Source.quality), 1).label("avg_quality"),
        func.round(func.avg(Source.h_index)).label("avg_h_index"),
        _count_where(Source.is_predatory, "predatory_count"),
        _count_where(Source.is_in_doaj, "doaj_count"),
        _count_where(Source.has_doaj_seal, "seal_count"),
        _count_where(Source.score_source == "llm", "llm_count"),
    )
    with self.session() as s:
        row = s.execute(query).first()
        if not row:
            return {}
        return dict(row._mapping)
def get_quality_distribution(self) -> dict[str, int]:
    """Return ``{quality score as str: row count}`` in ascending score order.

    Rows with a NULL quality are excluded. The result is empty when the
    reference DB is unavailable.
    """
    if not self.available:
        return {}

    query = (
        select(Source.quality, func.count().label("cnt"))
        .where(Source.quality.is_not(None))
        .group_by(Source.quality)
        .order_by(Source.quality)
    )
    with self.session() as s:
        counts = s.execute(query).all()
        return dict((str(quality), count) for quality, count in counts)
def get_source_distribution(self) -> dict[str, int]:
    """Return ``{score_source: row count}``.

    Rows with a NULL ``score_source`` are reported under ``"unknown"``.
    The result is empty when the reference DB is unavailable.
    """
    if not self.available:
        return {}

    source_label = func.coalesce(Source.score_source, "unknown").label("src")
    query = select(source_label, func.count().label("cnt")).group_by(
        Source.score_source
    )
    with self.session() as s:
        counts = s.execute(query).all()
        return {src: cnt for src, cnt in counts}
def get_journals_page(
    self,
    *,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    tier: str = "",
    score_source: str = "",
    sort: str = "quality",
    order: str = "desc",
) -> tuple[list[dict], int]:
    """Return one page of sources for the dashboard plus the total count.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        search: Substring matched against ``name_lower`` after
            normalization, truncated to ``_MAX_SEARCH_LEN`` characters
            and LIKE-escaped.
        tier: Key of ``_TIER_RANGES``; unknown values are ignored.
        score_source: Exact-match filter on ``score_source``.
        sort: Column name; anything outside ``_SORT_COLUMNS`` falls
            back to ``"quality"``.
        order: ``"asc"`` or ``"desc"``; anything else falls back to
            ``"desc"``. NULLs sort last either way.

    Returns:
        ``(rows, total)`` where ``rows`` are dashboard dicts and
        ``total`` is the number of rows matching the filters. Returns
        ``([], 0)`` when the reference DB is unavailable.
    """
    if not self.available:
        return [], 0

    if sort not in _SORT_COLUMNS:
        sort = "quality"
    if order not in ("asc", "desc"):
        order = "desc"
    # SQLite treats a negative LIMIT as "no limit", so an unchecked
    # negative page size would load the whole table. Clamp it the same
    # way `page` is clamped below.
    per_page = max(1, per_page)

    wheres: list = []
    if search:
        needle = _escape_like(normalize_name(search)[:_MAX_SEARCH_LEN])
        wheres.append(Source.name_lower.like(f"%{needle}%", escape="/"))
    if tier and tier in _TIER_RANGES:
        lo, hi = _TIER_RANGES[tier]
        wheres.append(Source.quality.between(lo, hi))
    if score_source:
        wheres.append(Source.score_source == score_source)

    # `sort` was checked against the allowlist above, so this getattr
    # can only resolve to one of the permitted columns.
    sort_col = getattr(Source, sort)
    order_clause = (
        sort_col.desc().nulls_last()
        if order == "desc"
        else sort_col.asc().nulls_last()
    )

    offset = (max(1, page) - 1) * per_page

    with self.session() as s:
        total = (
            s.scalar(
                select(func.count()).select_from(Source).where(*wheres)
            )
            or 0
        )
        rows = s.scalars(
            select(Source)
            .where(*wheres)
            .order_by(order_clause)
            .limit(per_page)
            .offset(offset)
        ).all()

    return [_source_to_dashboard_dict(r) for r in rows], total
def get_institutions_page(
    self,
    *,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    sort: str = "h_index",
    order: str = "desc",
) -> tuple[list[dict], int]:
    """Return one page of institutions for the dashboard plus the total count.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        search: Substring matched against ``name_lower`` after
            normalization, truncated to ``_MAX_SEARCH_LEN`` characters
            and LIKE-escaped.
        sort: ``"h_index"`` sorts by h-index; any other value sorts by
            name.
        order: ``"asc"`` or ``"desc"``; anything else falls back to
            ``"desc"``. NULLs sort last either way.

    Returns:
        ``(rows, total)`` where ``rows`` are dashboard dicts and
        ``total`` is the number of rows matching the search. Returns
        ``([], 0)`` when the reference DB is unavailable.
    """
    if not self.available:
        return [], 0

    # Defensive allowlist — matches the pattern in get_journals_page.
    # The ternary below is already safe (non-"desc" falls through to
    # .asc()), but the explicit check prevents future refactors from
    # accidentally interpolating a tainted value into SQL.
    if order not in ("asc", "desc"):
        order = "desc"
    # SQLite treats a negative LIMIT as "no limit", so an unchecked
    # negative page size would load the whole table. Clamp it the same
    # way `page` is clamped below.
    per_page = max(1, per_page)

    wheres = []
    if search:
        needle = _escape_like(normalize_name(search)[:_MAX_SEARCH_LEN])
        wheres.append(
            Institution.name_lower.like(f"%{needle}%", escape="/")
        )

    sort_col = (
        Institution.h_index if sort == "h_index" else Institution.name
    )
    order_clause = (
        sort_col.desc().nulls_last()
        if order == "desc"
        else sort_col.asc().nulls_last()
    )

    offset = (max(1, page) - 1) * per_page

    with self.session() as s:
        total = (
            s.scalar(
                select(func.count()).select_from(Institution).where(*wheres)
            )
            or 0
        )
        rows = s.scalars(
            select(Institution)
            .where(*wheres)
            .order_by(order_clause)
            .limit(per_page)
            .offset(offset)
        ).all()

    return [_institution_to_dashboard_dict(r) for r in rows], total
930# ---------------------------------------------------------------------------
931# Dict adapters — keep filter/dashboard call sites unchanged
932# ---------------------------------------------------------------------------
def _source_to_lookup_dict(row: Source) -> dict:
    """Project a Source row onto the lookup-style dict.

    The result carries `openalex_source_id` (so callers can chain a
    follow-up lookup by source id) and ``quartile`` alongside the basic
    bibliometric fields. Note the key renames: ``type`` comes from
    ``row.source_type`` and ``issn_l`` from ``row.issn``.
    """
    # (output key, attribute on the row) — order defines the dict order.
    field_map = (
        ("name", "name"),
        ("type", "source_type"),
        ("h_index", "h_index"),
        ("impact_factor", "impact_factor"),
        ("is_in_doaj", "is_in_doaj"),
        ("publisher", "publisher"),
        ("issn_l", "issn"),
        ("openalex_source_id", "openalex_source_id"),
        ("quartile", "quartile"),
    )
    return {key: getattr(row, attr) for key, attr in field_map}
def _source_to_dashboard_dict(row: Source) -> dict:
    """Project a Source row onto the dict shape used by the dashboard.

    The three flag columns are coerced to real ``bool`` values and the
    impact factor is rounded to two decimals; a falsy impact factor
    (``None`` or ``0``) is reported as ``None``.
    """
    out = {
        field: getattr(row, field)
        for field in ("name", "quality", "quartile", "cited_by_count", "h_index")
    }

    impact = row.impact_factor
    out["impact_factor"] = round(impact, 2) if impact else None

    for flag in ("is_in_doaj", "has_doaj_seal", "is_predatory"):
        out[flag] = bool(getattr(row, flag))

    for field in (
        "predatory_source",
        "score_source",
        "source_type",
        "publisher",
        "issn",
        "openalex_source_id",
    ):
        out[field] = getattr(row, field)
    return out
def _institution_to_dict(row: Institution) -> dict:
    """Build the public `lookup_institution` result for one row.

    The on-disk JSON snapshot stores institutions under one-character
    keys (``n``, ``c``, ``t``, …) to save space; callers of the accessor
    should not have to know that, so this adapter exposes the full,
    readable field names instead.
    """
    exposed = (
        "name",
        "country",
        "type",
        "h_index",
        "impact_factor",
        "works_count",
        "cited_by_count",
        "ror_id",
    )
    return {field: getattr(row, field) for field in exposed}
def _institution_to_dashboard_dict(row: Institution) -> dict:
    """Project an Institution row onto the dashboard dict shape.

    Same fields as the lookup shape plus ``openalex_id``, copied
    verbatim from the row.
    """
    columns = (
        "openalex_id",
        "name",
        "ror_id",
        "country",
        "type",
        "h_index",
        "impact_factor",
        "works_count",
        "cited_by_count",
    )
    return {column: getattr(row, column) for column in columns}
1013# ---------------------------------------------------------------------------
1014# Module singleton
1015# ---------------------------------------------------------------------------
# Process-wide accessor instance, created lazily by get_db().
_db: Optional[JournalQualityDB] = None
# Guards creation of `_db` (and its reset in reset_db()).
_db_lock = threading.Lock()


def get_db() -> JournalQualityDB:
    """Return the shared `JournalQualityDB`, creating it on first use."""
    global _db
    existing = _db
    if existing is not None:
        # Fast path: already built, no need to take the lock.
        return existing
    with _db_lock:
        # Re-check under the lock: another thread may have created the
        # instance between the unlocked check above and acquiring it.
        if _db is None:
            _db = JournalQualityDB()
        return _db
# Legacy names kept for backwards compatibility (metrics_routes.py and
# a couple of tests still import them).
JournalReferenceDB = JournalQualityDB
get_journal_reference_db = get_db
def reset_db() -> None:
    """Reset the cached singleton's engine after a `build_db` rebuild.

    The check-and-reset runs under `_db_lock`, the same lock `get_db()`
    holds while lazily creating the singleton, so the reset cannot
    interleave with that creation path. When no singleton has been
    created yet this is a no-op.
    """
    with _db_lock:
        current = _db
        if current is None:
            return
        current.reset()
1054# ---------------------------------------------------------------------------
1055# Build phase — the ONLY writer
1056# ---------------------------------------------------------------------------
def build_db(
    data_dir: Optional[Path] = None,
    output_path: Optional[Path] = None,
) -> None:
    """Compile `journal_quality.db` from the bundled JSON sources.

    Opens a SHORT-LIVED writable engine on a unique temp path, creates
    the schema, populates every table, runs ANALYZE + VACUUM, stamps
    `PRAGMA user_version`, disposes the engine, atomically moves the
    temp file over `output_path`, marks it read-only (`chmod 0o444`,
    plus the Windows read-only attribute on win32) and finally calls
    `reset_db()` so the cached accessor reopens the new file.

    Args:
        data_dir: Directory holding the snapshot files. Defaults to
            `get_journal_data_directory()`.
        output_path: Destination DB file. Defaults to
            `data_dir / DB_FILENAME`.

    Raises:
        Exception: Any error raised while creating or populating the
            temp DB is re-raised after the engine has been disposed
            and the temp file removed (best effort).
        OSError: If the final `os.replace` / `os.chmod` fails.
    """
    start = time.time()

    if data_dir is None:
        from ..config.paths import get_journal_data_directory

        data_dir = get_journal_data_directory()
    if output_path is None:
        output_path = data_dir / DB_FILENAME

    logger.info(
        "Building journal quality reference DB (one-time, "
        "~30s, decompresses ~25 MB of bundled data)…"
    )

    # Sweep stale temp files from prior crashed builds so they don't
    # accumulate. Any .tmp-* older than 1h is assumed dead.
    _sweep_stale_tmp_files(output_path.parent, output_path.name)

    # Build into a unique temp path, then os.replace() atomically at
    # the end. A random suffix (not a fixed .tmp) lets concurrent
    # builders write to separate files instead of racing on the same
    # path — os.replace picks a winner atomically and neither corrupts
    # the live file.
    tmp_path = output_path.with_name(
        f"{output_path.name}.tmp-{os.getpid()}-{secrets.token_hex(4)}"
    )

    write_url = f"sqlite:///{tmp_path}"
    engine = create_engine(write_url, connect_args={"check_same_thread": False})

    try:
        # Pragmas for fast bulk insert. `journal_mode = OFF` plus
        # `synchronous = OFF` is deliberately unsafe for general use but
        # acceptable here because the live file is only ever moved into
        # place by the atomic `os.replace()` below — a crash mid-build
        # merely orphans the temp file (swept by
        # `_sweep_stale_tmp_files()` on the next build). Do NOT copy this
        # pragma set elsewhere without the same atomic rename discipline.
        with engine.connect() as conn:
            conn.exec_driver_sql("PRAGMA journal_mode = OFF")
            conn.exec_driver_sql("PRAGMA synchronous = OFF")
            conn.exec_driver_sql("PRAGMA cache_size = -64000")
            conn.exec_driver_sql("PRAGMA page_size = 4096")

        JournalQualityBase.metadata.create_all(engine)

        SessionWrite = sessionmaker(bind=engine)
        with SessionWrite() as session:
            sources = _load_openalex(data_dir)
            doaj_data = _load_doaj(data_dir)
            pred_data = _load_predatory(data_dir)
            institutions = _load_institutions(data_dir)
            abbreviations = _load_abbreviations(data_dir)

            _populate_predatory(session, pred_data)
            _populate_sources(session, sources, doaj_data, pred_data)
            _populate_institutions(session, institutions)
            _populate_abbreviations(session, abbreviations)
            session.commit()

        with engine.connect() as conn:
            conn.exec_driver_sql("ANALYZE")
            conn.exec_driver_sql("VACUUM")
            # Stamp schema version so _ensure_engine can detect drift
            # without depending on the external version.json.
            conn.exec_driver_sql(
                f"PRAGMA user_version = {JOURNAL_QUALITY_SCHEMA_VERSION}"
            )
    except Exception:
        # Release the file handle first so the temp file can be removed,
        # then re-raise the original error.
        engine.dispose()
        if tmp_path.exists():
            try:
                # bearer:disable python_lang_file_permissions
                os.chmod(tmp_path, 0o644)
                tmp_path.unlink()
            except OSError:
                logger.exception(f"Failed to clean up tmp DB at {tmp_path}")
        raise

    engine.dispose()

    # Atomically swap tmp into place. os.replace is atomic on POSIX and
    # overwrites an existing output_path if present.
    if output_path.exists():
        # Prior file is chmod 0444 from the previous build — relax it
        # so os.replace can overwrite. Best-effort: if chmod fails
        # (e.g. read-only mount), os.replace will raise and surface
        # the real problem. Log so the cause is visible.
        try:
            # bearer:disable python_lang_file_permissions
            os.chmod(output_path, 0o644)
        except OSError:
            logger.warning(
                f"Could not chmod 0644 on existing {output_path} before "
                f"os.replace; if the replace fails this is likely why"
            )
    os.replace(tmp_path, output_path)

    # OS-level read-only flag — third layer of write protection.
    # POSIX chmod is a no-op on Windows, so we also set the Windows
    # read-only file attribute via SetFileAttributesW. The pre-commit
    # hook check-journal-quality-readonly.py remains the primary
    # defense against accidental writable opens.
    # bearer:disable python_lang_file_permissions
    os.chmod(output_path, 0o444)
    if sys.platform == "win32":
        try:
            import ctypes

            # FILE_ATTRIBUTE_READONLY = 0x1
            ok = ctypes.windll.kernel32.SetFileAttributesW(
                str(output_path), 0x1
            )
            if not ok:
                logger.warning(
                    f"SetFileAttributesW failed on {output_path.name}; "
                    "readonly pre-commit hook is the sole defense."
                )
        except Exception:
            logger.warning(
                f"Could not set Windows readonly attribute on "
                f"{output_path.name}"
            )

    elapsed = time.time() - start
    size_mb = output_path.stat().st_size / (1024 * 1024)

    # Sanity-count the rows through a read-only connection. The sqlite3
    # connection context manager only manages the transaction and does
    # NOT close the connection, so close it explicitly — otherwise the
    # handle on the freshly built file lingers until garbage collection.
    count_conn = sqlite3.connect(
        f"file:{output_path}?mode=ro&immutable=1", uri=True
    )
    try:
        source_count = count_conn.execute(
            "SELECT COUNT(*) FROM sources"
        ).fetchone()[0]
    finally:
        count_conn.close()

    logger.info(
        f"Journal quality DB ready: {source_count} sources, "
        f"{size_mb:.1f} MB in {elapsed:.1f}s ({output_path.name}, chmod 0o444)"
    )

    reset_db()
def _sweep_stale_tmp_files(directory: Path, base_name: str) -> None:
    """Delete `<base_name>.tmp-*` files in `directory` older than 1h.

    Best-effort: an OSError on any single file (vanished between glob
    and stat, no permission, …) is logged at debug level and the sweep
    moves on, so a failing cleanup never blocks the build while still
    leaving a trace of why stale files might be piling up.
    """
    if directory.exists():
        oldest_allowed = time.time() - 3600
        pattern = f"{base_name}.tmp-*"
        for candidate in directory.glob(pattern):
            try:
                if not candidate.stat().st_mtime < oldest_allowed:
                    continue  # still fresh — may belong to a live build
                candidate.unlink()
                logger.info(f"Swept stale temp build file: {candidate.name}")
            except OSError:
                logger.debug(f"Could not sweep stale tmp file {candidate.name}")
1233# ---------------------------------------------------------------------------
1234# Source-data loaders (used by build_db only)
1235# ---------------------------------------------------------------------------
def _load_openalex(data_dir: Path) -> dict:
    """Load the OpenAlex sources snapshot from `data_dir`.

    Reads `openalex_sources.json.gz` and returns a copy of the mapping
    stored under the compact key ``"s"`` (or, when that key is absent,
    under ``"sources"``).

    Raises:
        FileNotFoundError: If the snapshot file does not exist.
    """
    snapshot = data_dir / "openalex_sources.json.gz"
    if not snapshot.exists():
        raise FileNotFoundError(f"OpenAlex source file not found: {snapshot}")

    with gzip.open(snapshot, "rt", encoding="utf-8") as handle:
        payload = json.load(handle)

    if "s" in payload:
        sources = payload["s"]
    else:
        sources = payload.get("sources", {})
    logger.info(f"Loaded {len(sources)} OpenAlex sources")
    return dict(sources)
def _load_doaj(data_dir: Path) -> dict:
    """Load the DOAJ journal mapping from `doaj_journals.json`.

    Returns a copy of the mapping under the ``"journals"`` key, or an
    empty dict (with a warning) when the file is missing.
    """
    doaj_file = data_dir / "doaj_journals.json"
    if doaj_file.exists():
        with open(doaj_file, encoding="utf-8") as handle:
            payload = json.load(handle)
        journals = payload.get("journals", {})
        logger.info(f"Loaded {len(journals)} DOAJ entries")
        return dict(journals)

    logger.warning(f"{doaj_file} not found — DOAJ cross-ref will be skipped")
    return {}
def _load_predatory(data_dir: Path) -> dict:
    """Load the predatory-journal lists from `predatory.json`.

    Returns a dict with four entries:

    - ``journals``: set of normalized journal names
    - ``publishers``: set of normalized publisher names
    - ``hijacked``: set of normalized hijacked-journal names
    - ``long_pubs``: list of normalized publisher names whose raw name
      is at least 10 characters long after stripping

    When the file is missing a warning is logged and all four entries
    are empty. Entries whose name is missing, ``null``, not a string or
    blank are skipped instead of raising.
    """
    path = data_dir / "predatory.json"
    if not path.exists():
        logger.warning(f"{path} not found — predatory check will be skipped")
        return {
            "journals": set(),
            "publishers": set(),
            "hijacked": set(),
            "long_pubs": [],
        }

    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    def raw_names(section: str, field: str) -> list:
        """Return the raw `field` string of every entry in `section`."""
        names = []
        for entry in data.get(section) or []:
            value = entry.get(field)
            # An explicit JSON null (or any non-string) is treated like
            # a missing name rather than crashing on `.strip()`.
            names.append(value if isinstance(value, str) else "")
        return names

    journal_names = {
        normalize_name(raw) for raw in raw_names("journals", "name") if raw.strip()
    }
    hijacked_names = {
        normalize_name(raw)
        for raw in raw_names("hijacked", "hijacked_name")
        if raw.strip()
    }

    # Single pass over publishers: every non-blank name goes into the
    # set, and the long ones are additionally kept (in file order).
    publisher_names = set()
    long_pubs = []
    for raw in raw_names("publishers", "name"):
        stripped = raw.strip()
        if not stripped:
            continue
        normalized = normalize_name(raw)
        publisher_names.add(normalized)
        if len(stripped) >= 10:
            long_pubs.append(normalized)

    logger.info(
        f"Loaded predatory: {len(journal_names)} journals, "
        f"{len(publisher_names)} publishers, "
        f"{len(hijacked_names)} hijacked"
    )
    return {
        "journals": journal_names,
        "publishers": publisher_names,
        "hijacked": hijacked_names,
        "long_pubs": long_pubs,
    }
def _load_institutions(data_dir: Path) -> dict:
    """Load the OpenAlex institutions snapshot.

    Reads `openalex_institutions.json.gz` and returns a copy of the
    mapping under the ``"i"`` key, or an empty dict (with a warning)
    when the file is missing.
    """
    snapshot = data_dir / "openalex_institutions.json.gz"
    if snapshot.exists():
        with gzip.open(snapshot, "rt", encoding="utf-8") as handle:
            payload = json.load(handle)
        institutions = payload.get("i", {})
        logger.info(f"Loaded {len(institutions)} institutions")
        return dict(institutions)

    logger.warning(f"{snapshot} not found — institution tier will be empty")
    return {}
def _load_abbreviations(data_dir: Path) -> dict:
    """Load the JabRef abbreviation table.

    Reads `jabref_abbreviations.json.gz` and returns a copy of the
    mapping under the ``"abbrev_to_full"`` key, or an empty dict (with
    a warning) when the file is missing.
    """
    snapshot = data_dir / "jabref_abbreviations.json.gz"
    if snapshot.exists():
        with gzip.open(snapshot, "rt", encoding="utf-8") as handle:
            payload = json.load(handle)
        mappings = payload.get("abbrev_to_full", {})
        logger.info(f"Loaded {len(mappings)} abbreviation mappings")
        return dict(mappings)

    logger.warning(
        f"{snapshot} not found — abbreviation expansion will be empty"
    )
    return {}
1335# ---------------------------------------------------------------------------
1336# Table populators
1337# ---------------------------------------------------------------------------
def _populate_predatory(session: Session, pred: dict) -> None:
    """Insert the three predatory lookup tables from the loaded lists.

    `pred` is the dict produced by `_load_predatory`; empty names are
    skipped and a table whose list ends up empty gets no insert call.
    Publishers are additionally flagged `is_long` when their name also
    appears in ``pred["long_pubs"]``.
    """
    long_names = set(pred.get("long_pubs", []))

    journal_rows = [
        {"name_lower": name} for name in pred.get("journals", set()) if name
    ]
    if journal_rows:
        session.bulk_insert_mappings(PredatoryJournal, journal_rows)  # type: ignore[arg-type]

    hijacked_rows = [
        {"name_lower": name} for name in pred.get("hijacked", set()) if name
    ]
    if hijacked_rows:
        session.bulk_insert_mappings(PredatoryHijacked, hijacked_rows)  # type: ignore[arg-type]

    publisher_rows = []
    for name in pred.get("publishers", set()):
        if name:
            publisher_rows.append(
                {"name_lower": name, "is_long": name in long_names}
            )
    if publisher_rows:
        session.bulk_insert_mappings(PredatoryPublisher, publisher_rows)  # type: ignore[arg-type]

    logger.info(
        f"Inserted predatory tables: "
        f"{len(journal_rows)} journals, "
        f"{len(publisher_rows)} publishers, "
        f"{len(hijacked_rows)} hijacked"
    )
def _populate_sources(
    session: Session,
    sources: dict,
    doaj_data: dict,
    pred: dict,
) -> None:
    """Insert Source rows with cross-referenced DOAJ + predatory flags.

    Steps, in order:

    1. One record per OpenAlex source, deduplicated on
       ``(name_lower, issn or "")`` keeping the highest h-index.
    2. One record per DOAJ journal whose normalized name OpenAlex did
       not already cover.
    3. Quartile (Q1–Q4) from the cited_by_count rank within each
       source_type.
    4. Quality re-derived with the quartile available.
    5. Bulk insert in `_BATCH_SIZE` chunks.
    """
    type_names = {"j": "journal", "c": "conference"}
    flagged_journals = pred.get("journals", set())
    flagged_publishers = pred.get("publishers", set())
    flagged_hijacked = pred.get("hijacked", set())

    # Dedup key is (name_lower, issn or "") so journals with separate
    # print and electronic ISSNs in OpenAlex both survive instead of
    # collapsing onto one row.
    by_key: dict[tuple[str, str], dict] = {}

    for source_id, compact in sources.items():
        name = (compact.get("n") or "").strip()
        if not name:
            continue

        name_lower = normalize_name(name)
        issn = normalize_issn(compact.get("i"))
        publisher = compact.get("p") or None
        h_index = compact.get("h")
        impact_factor = compact.get("if")
        cited_by_count = compact.get("cb")
        raw_type = compact.get("t", "")
        source_type = type_names.get(raw_type, raw_type)

        doaj_entry = doaj_data.get(issn) if issn else None
        is_in_doaj = doaj_entry is not None
        has_doaj_seal = bool(doaj_entry and doaj_entry.get("has_seal"))

        # First matching list wins: journal name, then publisher, then
        # hijacked-journal name.
        pred_source = None
        if name_lower in flagged_journals:
            pred_source = "stop-predatory-journals"
        elif publisher and normalize_name(publisher) in flagged_publishers:
            pred_source = "stop-predatory-publishers"
        elif name_lower in flagged_hijacked:
            pred_source = "stop-predatory-hijacked"

        # Whitelist override: DOAJ membership or a high h-index clears
        # the predatory flag.
        if pred_source is not None and (
            is_in_doaj or (h_index or 0) > PREDATORY_WHITELIST_HINDEX
        ):
            pred_source = None
        is_pred = pred_source is not None

        quality = derive_quality_score(
            h_index=h_index,
            is_in_doaj=is_in_doaj,
            has_doaj_seal=has_doaj_seal,
            is_predatory=is_pred,
            source_type=source_type,
        )

        candidate = {
            "name": name,
            "name_lower": name_lower,
            "issn": issn,
            "openalex_source_id": source_id,
            "source_type": source_type,
            "publisher": publisher,
            "h_index": h_index,
            "impact_factor": impact_factor,
            "cited_by_count": cited_by_count,
            "quartile": None,  # filled in by the quartile pass below
            "quality": quality,
            "is_in_doaj": is_in_doaj,
            "has_doaj_seal": has_doaj_seal,
            "is_predatory": is_pred,
            "predatory_source": pred_source,
            "score_source": "openalex",
        }

        dedup_key = (name_lower, issn or "")
        current = by_key.get(dedup_key)
        if current is None or (h_index or 0) > (current.get("h_index") or 0):
            by_key[dedup_key] = candidate

    # Second pass: DOAJ-only journals (not in OpenAlex). Without this
    # we lose ~4-7K small open-access venues. Matched on name_lower so
    # we don't double-insert anything OpenAlex already covered.
    known_names = {key[0] for key in by_key}
    doaj_added = 0
    for issn, doaj_entry in doaj_data.items():
        name = (doaj_entry.get("name") or "").strip()
        if not name:
            continue
        name_lower = normalize_name(name)
        if name_lower in known_names:
            continue
        has_seal = bool(doaj_entry.get("has_seal"))
        publisher = doaj_entry.get("publisher") or None
        quality = derive_quality_score(
            h_index=None,
            is_in_doaj=True,
            has_doaj_seal=has_seal,
            is_predatory=False,
            source_type="journal",
        )
        by_key[(name_lower, issn or "")] = {
            "name": name,
            "name_lower": name_lower,
            "issn": issn,
            "openalex_source_id": None,
            "source_type": "journal",
            "publisher": publisher,
            "h_index": None,
            "impact_factor": None,
            "cited_by_count": None,
            "quartile": None,
            "quality": quality,
            "is_in_doaj": True,
            "has_doaj_seal": has_seal,
            "is_predatory": False,
            "predatory_source": None,
            "score_source": "doaj",
        }
        known_names.add(name_lower)
        doaj_added += 1

    records = list(by_key.values())

    # Derive quartile (Q1–Q4) from cited_by_count percentile within each
    # source_type. Field-specific quartiles would be more accurate but
    # require per-source topic data that 4–7×s the snapshot size, so we
    # use global per-type percentiles as a defensible approximation
    # given the license constraint that ruled out SJR.
    grouped: dict[str, list[dict]] = {}
    for record in records:
        if record.get("cited_by_count") is None:
            continue  # NULL quartile for entries without citation data
        grouped.setdefault(record.get("source_type") or "", []).append(record)

    # (lower percentile bound, label), checked from the top band down.
    bands = ((0.75, "Q1"), (0.50, "Q2"), (0.25, "Q3"))
    for members in grouped.values():
        ranked = sorted(members, key=lambda rec: rec["cited_by_count"])
        group_size = len(ranked)
        for rank, record in enumerate(ranked):
            pct = rank / group_size  # 0.0 = lowest, ~1.0 = highest
            record["quartile"] = next(
                (label for floor, label in bands if pct >= floor), "Q4"
            )

    # Re-derive quality now that quartile is available. The first-pass
    # `quality` values above were computed without quartile; the stored
    # column should agree with a score computed with it.
    for record in records:
        record["quality"] = derive_quality_score(
            h_index=record.get("h_index"),
            quartile=record.get("quartile"),
            is_in_doaj=record.get("is_in_doaj") or False,
            has_doaj_seal=record.get("has_doaj_seal") or False,
            is_predatory=record.get("is_predatory") or False,
            source_type=record.get("source_type"),
        )

    logger.info(
        f"Inserting {len(records)} source records ({doaj_added} DOAJ-only)..."
    )
    for start in range(0, len(records), _BATCH_SIZE):
        batch = records[start : start + _BATCH_SIZE]
        session.bulk_insert_mappings(Source, batch)  # type: ignore[arg-type]
def _populate_institutions(session: Session, institutions: dict) -> None:
    """Insert one Institution row per named entry of the snapshot.

    `institutions` maps an OpenAlex id to a compact dict with short
    keys; entries without a non-blank ``"n"`` (name) are skipped.
    """
    # (column name, compact snapshot key) for the pass-through fields.
    compact_keys = (
        ("ror_id", "r"),
        ("country", "c"),
        ("type", "t"),
        ("h_index", "h"),
        ("impact_factor", "if"),
        ("works_count", "w"),
        ("cited_by_count", "cb"),
    )

    rows: list[dict] = []
    for inst_id, compact in institutions.items():
        display_name = (compact.get("n") or "").strip()
        if not display_name:
            continue
        row = {
            "openalex_id": inst_id,
            "name": display_name,
            "name_lower": normalize_name(display_name),
        }
        for column, key in compact_keys:
            row[column] = compact.get(key)
        rows.append(row)

    logger.info(f"Inserting {len(rows)} institution records...")
    for start in range(0, len(rows), _BATCH_SIZE):
        batch = rows[start : start + _BATCH_SIZE]
        session.bulk_insert_mappings(Institution, batch)  # type: ignore[arg-type]
def _populate_abbreviations(session: Session, mappings: dict) -> None:
    """Insert abbreviation → full-name rows, one per normalized key.

    Abbreviations that normalize to an empty string are dropped; when
    several abbreviations normalize to the same key, the first one
    encountered wins.
    """
    first_seen: dict[str, str] = {}
    for abbrev, full in mappings.items():
        key = normalize_name(abbrev)
        if key:
            # setdefault keeps the earliest full name for a repeated key.
            first_seen.setdefault(key, full)

    rows = [
        {"abbrev_lower": key, "full_name": full}
        for key, full in first_seen.items()
    ]
    logger.info(f"Inserting {len(rows)} abbreviation records...")
    for start in range(0, len(rows), _BATCH_SIZE):
        batch = rows[start : start + _BATCH_SIZE]
        session.bulk_insert_mappings(Abbreviation, batch)  # type: ignore[arg-type]
1583# ---------------------------------------------------------------------------
1584# Backwards-compat shim for the old build_reference_db name
1585# ---------------------------------------------------------------------------
def build_reference_db(
    data_dir: Optional[Path] = None,
    output_path: Optional[Path] = None,
) -> None:
    """Deprecated alias for `build_db`; forwards both arguments unchanged."""
    forwarded = {"data_dir": data_dir, "output_path": output_path}
    build_db(**forwarded)