Coverage for src/local_deep_research/journal_quality/db.py: 36%

672 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""Read-only SQLAlchemy accessor for the compiled journal-quality DB. 

2 

3INVARIANT: this module's `JournalQualityDB` class **never writes**. 

4The runtime engine is opened with SQLite URI flags `mode=ro` and 

5`immutable=1`, the file is `chmod 0o444` after every build, and a 

6pre-commit hook bans cross-module opens of the file without `mode=ro`. 

7 

8The only writer is `build_db()` in this same module, which opens its 

9own short-lived writable engine, populates the schema, runs ANALYZE 

10+ VACUUM, closes the engine, and chmods the file back to 0o444. 

11 

12The DB compiles five gzipped JSON snapshots (downloaded by 

13`journal_quality.downloader`) into one queryable file: 

14 

15- OpenAlex sources → `sources` table (with predatory + DOAJ flags) 

16- Stop Predatory Journals → `predatory_journals/_publishers/_hijacked` 

17- DOAJ → cross-referenced into `sources` 

18- JabRef abbreviations → `abbreviations` table 

19- OpenAlex Institutions → `institutions` table 

20 

21Built fresh on every download, no migrations. 

22""" 

23 

24from __future__ import annotations 

25 

26import gzip 

27import json 

28import os 

29import secrets 

30import sqlite3 

31import sys 

32import threading 

33import time 

34from contextlib import contextmanager 

35from pathlib import Path 

36from typing import Iterable, Iterator, Optional 

37 

38from loguru import logger 

39from sqlalchemy import create_engine, func, or_, select 

40from sqlalchemy.engine import Engine 

41from sqlalchemy.exc import DatabaseError, OperationalError 

42from sqlalchemy.orm import Session, sessionmaker 

43 

44from .models import ( 

45 Abbreviation, 

46 Institution, 

47 JournalQualityBase, 

48 PredatoryHijacked, 

49 PredatoryJournal, 

50 PredatoryPublisher, 

51 Source, 

52) 

53from ..constants import PREDATORY_WHITELIST_HINDEX 

54from ..utilities.citation_normalizer import normalize_issn 

55from .scoring import ( 

56 derive_quality_score, 

57 institution_score_from_h_index, 

58 normalize_name, 

59) 

60 

61DB_FILENAME = "journal_quality.db" 

62_BATCH_SIZE = 5000 

63 

64# Bump when the reference DB schema (models.py) changes in a way that 

65# requires a rebuild even if the upstream data version hasn't changed. 

66# Stamped as SQLite `PRAGMA user_version` during build_db; checked on 

67# _ensure_engine. Separate from JOURNAL_DATA_VERSION (downloader.py) 

68# which tracks upstream source-data freshness. 

69JOURNAL_QUALITY_SCHEMA_VERSION = 3 

70 

71# Quality tier → score range, used by the dashboard tier filter. 

72_TIER_RANGES = { 

73 "elite": (9, 10), 

74 "strong": (7, 8), 

75 "moderate": (5, 6), 

76 "low": (3, 4), 

77 "predatory": (1, 2), 

78} 

79 

80# Columns safe to use in ORDER BY (prevents injection via the dashboard 

81# `sort` query parameter). 

82_SORT_COLUMNS = frozenset( 

83 { 

84 "name", 

85 "quality", 

86 "quartile", 

87 "h_index", 

88 "impact_factor", 

89 "score_source", 

90 "source_type", 

91 "publisher", 

92 "is_predatory", 

93 } 

94) 

95 

96# Max length for user-supplied search strings. Even with LIKE 

97# wildcards escaped, a 10 KB pattern against 217K rows is slow enough 

98# to matter for CPU budget under concurrent requests. 

99_MAX_SEARCH_LEN = 100 

100 

101 

102def _escape_like(s: str) -> str: 

103 """Escape SQL LIKE metacharacters so user-supplied search strings 

104 can't force degenerate full-table scans via ``%`` or ``_``. 

105 

106 Pair with ``.like(pattern, escape="/")`` at the call site. 

107 """ 

108 return s.replace("/", "//").replace("%", "/%").replace("_", "/_") 

109 

110 

111# --------------------------------------------------------------------------- 

112# Read-only accessor 

113# --------------------------------------------------------------------------- 

114 

115 

116class JournalQualityDB: 

117 """Read-only SQLAlchemy 2.0 accessor for `journal_quality.db`. 

118 

119 All filter hot-path methods return plain dicts (not mapped Source 

120 objects) so call sites in `journal_reputation_filter.py` keep the 

121 same call shape they had against the dict-based predecessor. The 

122 dashboard methods can return either dicts or Source instances — 

123 they're called once per page view so the ORM overhead is fine. 

124 """ 

125 

126 def __init__(self) -> None: 

127 self._engine: Optional[Engine] = None 

128 self._SessionLocal: Optional[sessionmaker[Session]] = None 

129 # RLock (not Lock): _ensure_engine holds the lock while calling 

130 # _build_or_raise → build_db → reset_db → self.reset(), which 

131 # re-acquires the same lock. A non-reentrant Lock would deadlock 

132 # the very first request on a fresh install. 

133 self._lock = threading.RLock() 

134 # Whether we've already logged a stale-data-version warning for 

135 # this engine lifetime. Prevents log spam — one WARNING per 

136 # server start is enough to surface the problem to admins. 

137 self._stale_version_warned = False 

138 

139 # --- engine + session lifecycle --- 

140 

141 def _resolve_db_path(self) -> Path: 

142 from ..config.paths import get_journal_data_directory 

143 

144 return get_journal_data_directory() / DB_FILENAME 

145 

146 def _ensure_engine(self) -> None: 

147 # Acquire lock BEFORE first read to avoid DCLP publication hazard. 

148 # With the GIL this is safe on CPython but explicit locking makes 

149 # the happens-before relationship clear and portable. 

150 with self._lock: 

151 if self._engine is not None: 

152 return 

153 path = self._resolve_db_path() 

154 if not path.exists(): 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 self._build_or_raise(path) 

156 else: 

157 # Validate existing file before wiring up the read-only 

158 # engine. Catches two failure modes at open time instead 

159 # of letting them propagate to first query: 

160 # 1. Schema drift — ORM changed since this file was 

161 # built (PRAGMA user_version mismatch) → rebuild. 

162 # 2. Corruption — file exists but isn't a valid DB 

163 # (truncated build, disk error) → rebuild. 

164 if not self._validate_existing_db(path): 164 ↛ 175line 164 didn't jump to line 175 because the condition on line 164 was always true

165 self._build_or_raise(path) 

166 

167 # mode=ro + immutable=1: SQLite physically refuses writes, 

168 # skips locking entirely, and reads via mmap. The OS page 

169 # cache holds one shared resident copy of the hot pages. 

170 # 

171 # Use a creator callback because SQLAlchemy's URL parser 

172 # eats the ?mode=ro&immutable=1 query string before it can 

173 # reach sqlite3. The creator builds the connection directly 

174 # with the SQLite URI flags intact. 

175 def _make_ro_conn() -> sqlite3.Connection: 

176 return sqlite3.connect( 

177 f"file:{path}?mode=ro&immutable=1", 

178 uri=True, 

179 check_same_thread=False, 

180 ) 

181 

182 # StaticPool: with immutable=1 SQLite skips locking and the 

183 # OS page cache handles concurrency. A single shared connection 

184 # is safe and avoids the default QueuePool's 15-connection 

185 # footprint that offers no benefit for immutable reads. 

186 from sqlalchemy.pool import StaticPool 

187 

188 engine = create_engine( 

189 "sqlite://", 

190 creator=_make_ro_conn, 

191 poolclass=StaticPool, 

192 echo=False, 

193 ) 

194 session_local = sessionmaker(bind=engine, expire_on_commit=False) 

195 # Publish both together so readers never see engine-without-session. 

196 self._engine = engine 

197 self._SessionLocal = session_local 

198 logger.info(f"Opened journal_quality.db (read-only): {path}") 

199 # One-shot check: is the DATA version (the one the sources 

200 # JSON + build logic produce) behind the bundled latest? 

201 # Schema drift is already handled by `_validate_existing_db` 

202 # via ``PRAGMA user_version``. A data-version mismatch is a 

203 # different concern: the DB schema is fine, but the scoring 

204 # logic (e.g. the repository cap) or source snapshots have 

205 # been updated since this file was built. The hot path 

206 # (filter scoring) would silently serve stale scores if we 

207 # didn't surface the mismatch anywhere but the admin 

208 # dashboard. Log once, don't auto-rebuild — user consent 

209 # via the dashboard "Download Data" button remains the 

210 # explicit refresh trigger. 

211 self._warn_on_stale_data_version(path.parent) 

212 

213 def _warn_on_stale_data_version(self, data_dir: Path) -> None: 

214 """Log WARNING once if ``version.json`` is behind ``JOURNAL_DATA_VERSION``.""" 

215 if self._stale_version_warned: 

216 return 

217 # Lazy import to avoid any downloader → db cycle even though 

218 # today's module graph doesn't have one. 

219 from .downloader import JOURNAL_DATA_VERSION 

220 

221 version_file = data_dir / "version.json" 

222 if not version_file.exists(): 

223 return # Brand-new install — the dashboard's banner handles this. 

224 try: 

225 with open(version_file, encoding="utf-8") as f: 

226 info = json.load(f) 

227 installed = info.get("version") 

228 except (json.JSONDecodeError, OSError): 

229 return # Malformed — dashboard's banner surfaces; don't double-log. 

230 if installed and installed != JOURNAL_DATA_VERSION: 

231 logger.warning( 

232 f"journal_quality data version is stale: on-disk={installed!r} " 

233 f"bundled-latest={JOURNAL_DATA_VERSION!r}. " 

234 f"Scoring is continuing with the older data. Visit " 

235 f"/metrics/journals and click 'Download Data' to refresh." 

236 ) 

237 self._stale_version_warned = True 

238 

239 def _validate_existing_db(self, path: Path) -> bool: 

240 """Return True if the existing DB file is usable as-is. 

241 

242 A version of 0 means the file was built before schema stamping 

243 existed and is grandfathered in — we don't force a rebuild just 

244 because the stamp is missing. A non-zero version that doesn't 

245 match the current schema is a real drift signal and triggers a 

246 rebuild. File-open errors also trigger a rebuild. 

247 """ 

248 from ..utilities.resource_utils import safe_close 

249 

250 conn: Optional[sqlite3.Connection] = None 

251 try: 

252 conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True) 

253 version = conn.execute("PRAGMA user_version").fetchone()[0] 

254 if version != 0 and version != JOURNAL_QUALITY_SCHEMA_VERSION: 

255 logger.warning( 

256 f"journal_quality.db schema_version={version}, " 

257 f"expected {JOURNAL_QUALITY_SCHEMA_VERSION}" 

258 f"rebuilding" 

259 ) 

260 # NB: no explicit safe_close here — the finally block 

261 # handles closing. Calling it twice produced a spurious 

262 # "Cannot operate on a closed database" warning on 

263 # every schema-triggered rebuild. 

264 self._unlink_unusable_db(path) 

265 return False 

266 # Cheap sanity check — confirms the file is a valid DB. 

267 conn.execute("SELECT 1 FROM sqlite_master LIMIT 1").fetchone() 

268 return True 

269 except (sqlite3.DatabaseError, OSError): 

270 logger.exception( 

271 f"journal_quality.db at {path} is unusable; rebuilding" 

272 ) 

273 self._unlink_unusable_db(path) 

274 return False 

275 finally: 

276 if conn is not None: 

277 safe_close(conn, "journal_quality validate") 

278 

279 @staticmethod 

280 def _unlink_unusable_db(path: Path) -> None: 

281 """Best-effort cleanup of a corrupted / schema-drifted DB file. 

282 

283 Corruption was already logged by the caller (``_validate_existing_db``). 

284 Both operations below are best-effort — on failure we *log and 

285 continue* rather than raise, because the build path will rebuild 

286 the file regardless. But we don't silence: if chmod / unlink 

287 fails (permissions, read-only mount, file held open on Windows) 

288 the next build will likely also fail and the user needs the 

289 warning to diagnose the real problem. 

290 """ 

291 try: 

292 # bearer:disable python_lang_file_permissions 

293 os.chmod(path, 0o644) 

294 except OSError: 

295 logger.warning( 

296 f"Could not chmod 0644 on unusable DB {path} before " 

297 f"unlink (continuing to unlink attempt)" 

298 ) 

299 try: 

300 path.unlink() 

301 except OSError: 

302 logger.warning( 

303 f"Could not unlink unusable DB {path} (will be " 

304 f"overwritten on next build)" 

305 ) 

306 

307 def _build_or_raise(self, path: Path) -> None: 

308 """Lazy-build the DB on first access if it's missing.""" 

309 from .downloader import ensure_journal_data 

310 

311 data_dir, available = ensure_journal_data() 

312 if not available: 

313 raise FileNotFoundError( 

314 "Journal data files not available. " 

315 "Check your network connection or download manually " 

316 "from the dashboard." 

317 ) 

318 logger.info(f"Building {DB_FILENAME} from data files...") 

319 build_db(data_dir=data_dir, output_path=path) 

320 

321 @contextmanager 

322 def session(self) -> Iterator[Session]: 

323 """Yield a read-only SQLAlchemy session. 

324 

325 If the underlying file becomes corrupt mid-session (e.g. a 

326 rebuild ran and this engine is pointed at a now-unlinked inode), 

327 DatabaseError propagates but we drop the cached engine so the 

328 next call rebuilds cleanly instead of failing forever. 

329 """ 

330 self._ensure_engine() 

331 if self._SessionLocal is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true

332 raise RuntimeError("JournalQualityDB engine failed to initialize") 

333 from ..utilities.resource_utils import safe_close 

334 

335 s = self._SessionLocal() 

336 try: 

337 yield s 

338 except (OperationalError, DatabaseError): 

339 logger.exception("journal_quality.db error — resetting engine") 

340 safe_close(s, "journal_quality session") 

341 self.reset() 

342 raise 

343 else: 

344 safe_close(s, "journal_quality session") 

345 

346 @property 

347 def available(self) -> bool: 

348 try: 

349 self._ensure_engine() 

350 return True 

351 except FileNotFoundError: 

352 return False 

353 

354 def reset(self) -> None: 

355 """Drop the cached engine — call after `build_db` rebuilds the file.""" 

356 with self._lock: 

357 if self._engine is not None: 357 ↛ exitline 357 didn't jump to the function exit

358 self._engine.dispose() 

359 self._engine = None 

360 self._SessionLocal = None 

361 logger.info("Reset journal_quality.db engine") 

362 

363 # --- filter hot path: return plain dicts --- 

364 

365 def lookup_openalex( 

366 self, 

367 *, 

368 source_id: Optional[str] = None, 

369 issn: Optional[str] = None, 

370 name: Optional[str] = None, 

371 ) -> Optional[dict]: 

372 """Look up a source by OpenAlex ID, ISSN, or name. 

373 

374 Returns a dict with the same shape the dict-based predecessor 

375 produced (`name`, `type`, `h_index`, `impact_factor`, 

376 `is_in_doaj`, `publisher`, `issn_l`) so the filter code at 

377 `journal_reputation_filter.py` doesn't need to change. 

378 """ 

379 issn = normalize_issn(issn) 

380 try: 

381 self._ensure_engine() 

382 except FileNotFoundError: 

383 return None 

384 with self.session() as s: 

385 row = self._lookup_source_row(s, source_id, issn, name) 

386 return _source_to_lookup_dict(row) if row else None 

387 

388 # Alias used by the dashboard / future call sites 

389 lookup_source = lookup_openalex 

390 

391 def count_predatory_by_names(self, names: Iterable[str]) -> int: 

392 """Count how many of the given journal names are flagged predatory. 

393 

394 One SQL round-trip using ``WHERE name_lower IN (…) AND is_predatory = TRUE``. 

395 Names are normalized (NFKC + lower + strip) so the caller can pass raw 

396 display names; matches the normalization used at build time. 

397 

398 Used by the per-user metrics dashboard to report a global "N predatory 

399 journals across all your research" stat without making N round trips 

400 to the reference DB. Returns 0 if the reference DB is missing or if 

401 ``names`` is empty. 

402 

403 .. note:: 

404 Deliberately unchunked. SQLite's ``SQLITE_MAX_VARIABLE_NUMBER`` 

405 has been 250,000 since SQLite 3.32 (2020); Python 3.11 ships 

406 with 3.45.1. A heavy user with 100k distinct container_titles 

407 is still well under the limit. Re-confirmed in the PR #3081 

408 audit — no chunking needed. 

409 """ 

410 normed = {normalize_name(n) for n in names if n} 

411 normed.discard("") 

412 if not normed: 

413 return 0 

414 try: 

415 self._ensure_engine() 

416 except FileNotFoundError: 

417 return 0 

418 with self.session() as s: 

419 stmt = select(func.count(Source.id)).where( 

420 Source.name_lower.in_(normed), 

421 Source.is_predatory.is_(True), 

422 ) 

423 result = s.execute(stmt).scalar() 

424 return int(result or 0) 

425 

426 def lookup_sources_batch(self, names: Iterable[str]) -> dict: 

427 """Batch-look-up multiple journal names in one query. 

428 

429 Takes an iterable of raw display names and returns a 

430 ``{normalized_name: dashboard_dict}`` map for every name that 

431 matched a Source. Names that didn't match are simply absent 

432 from the result (caller decides how to handle misses). 

433 

434 Dashboard hot path: the ``/api/journals/user-research`` endpoint 

435 collects up to 200 unique ``container_title`` values from the 

436 user's Papers and hands them in here — one SQL round-trip vs. 

437 200 per-row lookups. 

438 

439 Normalization matches ``normalize_name`` (NFKC + lower + strip) 

440 so the reference DB's ``name_lower`` column hits directly. No 

441 "the " / "proceedings of" fallback tiers — those live in 

442 ``_lookup_source_row`` for precision per-call; the batch path 

443 is for dashboard display where a miss is acceptable. 

444 

445 Chunked at 900 params per chunk. Defensive: SQLite's actual 

446 limit (``SQLITE_MAX_VARIABLE_NUMBER``) has been 250,000 since 

447 3.32 (2020) — we could easily put the whole batch in one IN — 

448 but 900 keeps us well under any older embedded-SQLite ceiling 

449 a deployment might pin to. 

450 """ 

451 normed = [normalize_name(n) for n in names if n] 

452 normed = [n for n in normed if n] 

453 if not normed: 

454 return {} 

455 try: 

456 self._ensure_engine() 

457 except FileNotFoundError: 

458 return {} 

459 # De-duplicate while preserving insertion order for stable 

460 # iteration in tests. 

461 seen: set = set() 

462 uniq: list = [] 

463 for n in normed: 

464 if n not in seen: 

465 seen.add(n) 

466 uniq.append(n) 

467 

468 out: dict = {} 

469 CHUNK = 900 

470 with self.session() as s: 

471 for i in range(0, len(uniq), CHUNK): 

472 batch = uniq[i : i + CHUNK] 

473 stmt = select(Source).where(Source.name_lower.in_(batch)) 

474 for row in s.scalars(stmt): 

475 out[row.name_lower] = _source_to_dashboard_dict(row) 

476 return out 

477 

478 def lookup_doaj(self, *, issn: Optional[str] = None) -> Optional[dict]: 

479 issn = normalize_issn(issn) 

480 if not issn: 

481 return None 

482 try: 

483 self._ensure_engine() 

484 except FileNotFoundError: 

485 return None 

486 with self.session() as s: 

487 stmt = ( 

488 select(Source) 

489 .where(Source.issn == issn, Source.is_in_doaj.is_(True)) 

490 .limit(1) 

491 ) 

492 row = s.scalars(stmt).first() 

493 if row is None: 

494 return None 

495 return { 

496 "name": row.name, 

497 "has_seal": row.has_doaj_seal, 

498 "publisher": row.publisher, 

499 } 

500 

501 def is_in_doaj(self, issn: Optional[str]) -> bool: 

502 return self.lookup_doaj(issn=issn) is not None 

503 

504 def has_doaj_seal(self, issn: Optional[str]) -> bool: 

505 entry = self.lookup_doaj(issn=issn) 

506 return bool(entry and entry.get("has_seal")) 

507 

508 def is_predatory( 

509 self, 

510 *, 

511 journal_name: Optional[str] = None, 

512 publisher_name: Optional[str] = None, 

513 ) -> tuple[bool, Optional[str]]: 

514 """Check if a journal/publisher is on the predatory list. 

515 

516 Looks up the dedicated predatory tables (NOT just the 

517 `is_predatory` flag on `Source`), so checks work for arbitrary 

518 input names that aren't in OpenAlex. 

519 """ 

520 try: 

521 self._ensure_engine() 

522 except FileNotFoundError: 

523 return False, None 

524 with self.session() as s: 

525 if journal_name: 525 ↛ 532line 525 didn't jump to line 532 because the condition on line 525 was always true

526 norm = normalize_name(journal_name) 

527 if s.get(PredatoryJournal, norm) is not None: 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true

528 return True, "stop-predatory-journals" 

529 if s.get(PredatoryHijacked, norm) is not None: 529 ↛ 530line 529 didn't jump to line 530 because the condition on line 529 was never true

530 return True, "stop-predatory-hijacked" 

531 

532 if publisher_name: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true

533 pub_norm = normalize_name(publisher_name) 

534 if s.get(PredatoryPublisher, pub_norm) is not None: 

535 return True, "stop-predatory-publishers" 

536 # Substring scan over long entries (~1162 rows) 

537 stmt = select(PredatoryPublisher.name_lower).where( 

538 PredatoryPublisher.is_long.is_(True) 

539 ) 

540 for (entry,) in s.execute(stmt).all(): 

541 if pub_norm in entry or entry in pub_norm: 

542 return True, "stop-predatory-publishers" 

543 

544 return False, None 

545 

546 def is_whitelisted( 

547 self, 

548 *, 

549 issn: Optional[str] = None, 

550 name: Optional[str] = None, 

551 ) -> bool: 

552 if self.is_in_doaj(issn): 

553 return True 

554 oa = self.lookup_openalex(issn=issn, name=name) 

555 if oa and (oa.get("h_index") or 0) > PREDATORY_WHITELIST_HINDEX: 

556 return True 

557 return False 

558 

559 def lookup_institution( 

560 self, 

561 *, 

562 ror_id: Optional[str] = None, 

563 openalex_id: Optional[str] = None, 

564 name: Optional[str] = None, 

565 ) -> Optional[dict]: 

566 """Look up an institution. 

567 

568 Order: openalex_id → ror → name. Returns a dict with full-name 

569 keys (``name``, ``country``, ``type``, ``h_index``, 

570 ``impact_factor``, ``works_count``, ``cited_by_count``, ``ror_id``) 

571 or ``None`` if no match. The on-disk snapshot uses one-character 

572 keys (``n``, ``c``, etc.) for space efficiency; the accessor 

573 returns full names instead for legibility and schema robustness. 

574 """ 

575 try: 

576 self._ensure_engine() 

577 except FileNotFoundError: 

578 return None 

579 with self.session() as s: 

580 row: Optional[Institution] = None 

581 

582 if openalex_id: 

583 sid = openalex_id.split("/")[-1] 

584 row = s.get(Institution, sid) 

585 

586 if row is None and ror_id: 

587 ror = ror_id.rstrip("/").split("/")[-1] 

588 stmt = ( 

589 select(Institution) 

590 .where(Institution.ror_id == ror) 

591 .limit(1) 

592 ) 

593 row = s.scalars(stmt).first() 

594 

595 if row is None and name: 

596 norm = normalize_name(name) 

597 stmt = ( 

598 select(Institution) 

599 .where(Institution.name_lower == norm) 

600 .limit(1) 

601 ) 

602 row = s.scalars(stmt).first() 

603 

604 return _institution_to_dict(row) if row else None 

605 

606 def score_from_affiliations(self, affiliations: list) -> Optional[int]: 

607 """Derive a score from author affiliations in ONE SQL query.""" 

608 if not affiliations: 

609 return None 

610 

611 openalex_ids: list[str] = [] 

612 ror_ids: list[str] = [] 

613 names: list[str] = [] 

614 

615 for aff in affiliations: 

616 if isinstance(aff, str): 

617 names.append(normalize_name(aff)) 

618 elif isinstance(aff, dict): 

619 if oid := (aff.get("openalex_id") or aff.get("id")): 

620 openalex_ids.append(oid.split("/")[-1]) 

621 if rid := aff.get("ror"): 

622 ror_ids.append(rid.rstrip("/").split("/")[-1]) 

623 if nm := aff.get("name"): 

624 names.append(normalize_name(nm)) 

625 

626 if not (openalex_ids or ror_ids or names): 

627 return None 

628 

629 try: 

630 self._ensure_engine() 

631 except FileNotFoundError: 

632 return None 

633 

634 clauses = [] 

635 if openalex_ids: 

636 clauses.append(Institution.openalex_id.in_(openalex_ids)) 

637 if ror_ids: 

638 clauses.append(Institution.ror_id.in_(ror_ids)) 

639 if names: 

640 clauses.append(Institution.name_lower.in_(names)) 

641 

642 with self.session() as s: 

643 stmt = select(func.max(Institution.h_index)).where( 

644 or_(*clauses), Institution.h_index.is_not(None) 

645 ) 

646 best_h = s.scalar(stmt) 

647 

648 # Single source of truth for institution scoring lives in 

649 # scoring.py — delegate so the build phase, the runtime filter, 

650 # and this affiliation-salvage path can never disagree. 

651 return institution_score_from_h_index(best_h) 

652 

653 # Static passthrough so the filter can call dm.derive_quality_score(...) 

654 # without importing from .scoring directly. Single home for the 

655 # scoring rules in scoring.py. 

656 derive_quality_score = staticmethod(derive_quality_score) 

657 

658 def expand_abbreviation(self, name: str) -> Optional[str]: 

659 if not name: 659 ↛ 660line 659 didn't jump to line 660 because the condition on line 659 was never true

660 return None 

661 try: 

662 self._ensure_engine() 

663 except FileNotFoundError: 

664 return None 

665 normalized = normalize_name(name) 

666 with self.session() as s: 

667 row = s.get(Abbreviation, normalized) 

668 if row is not None: 

669 return row.full_name 

670 no_dots = normalized.replace(".", "").strip() 

671 if no_dots != normalized: 

672 row = s.get(Abbreviation, no_dots) 

673 if row is not None: 

674 return row.full_name 

675 return None 

676 

677 # --- internal source lookup with name fallbacks --- 

678 

679 def _lookup_source_row( 

680 self, 

681 s: Session, 

682 source_id: Optional[str], 

683 issn: Optional[str], 

684 name: Optional[str], 

685 ) -> Optional[Source]: 

686 if source_id: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true

687 sid = source_id.split("/")[-1] if "/" in source_id else source_id 

688 stmt = ( 

689 select(Source).where(Source.openalex_source_id == sid).limit(1) 

690 ) 

691 row = s.scalars(stmt).first() 

692 if row is not None: 

693 return row 

694 

695 if issn: 695 ↛ 696line 695 didn't jump to line 696 because the condition on line 695 was never true

696 stmt = select(Source).where(Source.issn == issn).limit(1) 

697 row = s.scalars(stmt).first() 

698 if row is not None: 

699 return row 

700 

701 if name: 701 ↛ 751line 701 didn't jump to line 751 because the condition on line 701 was always true

702 norm = normalize_name(name) 

703 row = self._fetch_by_name_lower(s, norm) 

704 if row is not None: 704 ↛ 705line 704 didn't jump to line 705 because the condition on line 704 was never true

705 return row 

706 # Try with/without "the " prefix (~5K journals have it) 

707 if norm.startswith("the "): 707 ↛ 708line 707 didn't jump to line 708 because the condition on line 707 was never true

708 row = self._fetch_by_name_lower(s, norm[4:]) 

709 else: 

710 row = self._fetch_by_name_lower(s, "the " + norm) 

711 if row is not None: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true

712 return row 

713 # Strip "proceedings of (the) (conference on) " prefix 

714 stripped = norm 

715 for prefix in ( 

716 "proceedings of the conference on ", 

717 "proceedings of the ", 

718 "proceedings of ", 

719 ): 

720 if stripped.startswith(prefix): 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true

721 stripped = stripped[len(prefix) :] 

722 break 

723 if stripped != norm: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true

724 row = self._fetch_by_name_lower(s, stripped) 

725 if row is not None: 

726 return row 

727 

728 # MEDLINE-style "Title : long subtitle" — try the segment 

729 # before the colon. Catches PubMed names like 

730 # "Molecular therapy : the journal of the American Society..." 

731 # → "Molecular therapy" 

732 if " : " in norm: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true

733 head = norm.split(" : ", 1)[0].strip() 

734 if head and head != norm: 

735 row = self._fetch_by_name_lower(s, head) 

736 if row is not None: 

737 return row 

738 

739 # MEDLINE-style "Title. Section name" — try the segment 

740 # before the first period. Catches PubMed names like 

741 # "Molecular therapy. Methods and clinical development" 

742 # but only when the head is meaningfully shorter (we don't 

743 # want to match "Nat" from "Nat. Commun."). 

744 if "." in norm: 744 ↛ 745line 744 didn't jump to line 745 because the condition on line 744 was never true

745 head = norm.split(".", 1)[0].strip() 

746 if head and len(head) >= 6 and head != norm: 

747 row = self._fetch_by_name_lower(s, head) 

748 if row is not None: 

749 return row 

750 

751 return None 

752 

753 @staticmethod 

754 def _fetch_by_name_lower(s: Session, name_lower: str) -> Optional[Source]: 

755 stmt = select(Source).where(Source.name_lower == name_lower).limit(1) 

756 return s.scalars(stmt).first() 

757 

758 # --- dashboard queries --- 

759 

760 def get_summary(self) -> dict: 

761 if not self.available: 

762 return { 

763 "total": 0, 

764 "avg_quality": 0, 

765 "avg_h_index": None, 

766 "predatory_count": 0, 

767 "doaj_count": 0, 

768 "seal_count": 0, 

769 "llm_count": 0, 

770 } 

771 

772 with self.session() as s: 

773 row = s.execute( 

774 select( 

775 func.count().label("total"), 

776 func.round(func.avg(Source.quality), 1).label( 

777 "avg_quality" 

778 ), 

779 func.round(func.avg(Source.h_index)).label("avg_h_index"), 

780 func.sum(func.iif(Source.is_predatory, 1, 0)).label( 

781 "predatory_count" 

782 ), 

783 func.sum(func.iif(Source.is_in_doaj, 1, 0)).label( 

784 "doaj_count" 

785 ), 

786 func.sum(func.iif(Source.has_doaj_seal, 1, 0)).label( 

787 "seal_count" 

788 ), 

789 func.sum( 

790 func.iif(Source.score_source == "llm", 1, 0) 

791 ).label("llm_count"), 

792 ) 

793 ).first() 

794 return dict(row._mapping) if row else {} 

795 

796 def get_quality_distribution(self) -> dict[str, int]: 

797 if not self.available: 

798 return {} 

799 with self.session() as s: 

800 rows = s.execute( 

801 select(Source.quality, func.count().label("cnt")) 

802 .where(Source.quality.is_not(None)) 

803 .group_by(Source.quality) 

804 .order_by(Source.quality) 

805 ).all() 

806 return {str(q): c for q, c in rows} 

807 

808 def get_source_distribution(self) -> dict[str, int]: 

809 if not self.available: 

810 return {} 

811 with self.session() as s: 

812 rows = s.execute( 

813 select( 

814 func.coalesce(Source.score_source, "unknown").label("src"), 

815 func.count().label("cnt"), 

816 ).group_by(Source.score_source) 

817 ).all() 

818 return {row.src: row.cnt for row in rows} 

819 

def get_journals_page(
    self,
    *,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    tier: str = "",
    score_source: str = "",
    sort: str = "quality",
    order: str = "desc",
) -> tuple[list[dict], int]:
    """Return one dashboard page of sources plus the total match count.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1. SQLite
            reads a negative LIMIT as "no upper bound", so without the
            clamp a negative value would return the entire table.
        search: Substring matched with an escaped LIKE against the
            normalized name (truncated to ``_MAX_SEARCH_LEN``).
        tier: Key of ``_TIER_RANGES``; unknown values are ignored.
        score_source: Exact ``score_source`` filter when non-empty.
        sort: Must be in ``_SORT_COLUMNS`` (used as a ``Source``
            attribute name); otherwise falls back to ``"quality"``.
        order: ``"asc"`` or ``"desc"``; anything else becomes ``"desc"``.
            NULLs sort last in both directions.

    Returns:
        ``(rows, total)``: dashboard dicts for the requested page and the
        unpaginated match count. ``([], 0)`` when the DB is unavailable.
    """
    if not self.available:
        return [], 0

    if sort not in _SORT_COLUMNS:
        sort = "quality"
    if order not in ("asc", "desc"):
        order = "desc"
    # A negative LIMIT is unbounded in SQLite; never let callers request it.
    per_page = max(1, per_page)

    wheres: list = []
    if search:
        needle = _escape_like(normalize_name(search)[:_MAX_SEARCH_LEN])
        wheres.append(Source.name_lower.like(f"%{needle}%", escape="/"))
    if tier and tier in _TIER_RANGES:
        lo, hi = _TIER_RANGES[tier]
        wheres.append(Source.quality.between(lo, hi))
    if score_source:
        wheres.append(Source.score_source == score_source)

    sort_col = getattr(Source, sort)
    order_clause = (
        sort_col.desc().nulls_last()
        if order == "desc"
        else sort_col.asc().nulls_last()
    )

    offset = (max(1, page) - 1) * per_page

    with self.session() as s:
        total = (
            s.scalar(
                select(func.count()).select_from(Source).where(*wheres)
            )
            or 0
        )
        rows = s.scalars(
            select(Source)
            .where(*wheres)
            .order_by(order_clause)
            .limit(per_page)
            .offset(offset)
        ).all()

    return [_source_to_dashboard_dict(r) for r in rows], total

874 

def get_institutions_page(
    self,
    *,
    page: int = 1,
    per_page: int = 50,
    search: str = "",
    sort: str = "h_index",
    order: str = "desc",
) -> tuple[list[dict], int]:
    """Return one dashboard page of institutions plus the total match count.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1 (SQLite
            treats a negative LIMIT as unbounded, which would otherwise
            return the whole table).
        search: Substring matched with an escaped LIKE against the
            normalized institution name (truncated to ``_MAX_SEARCH_LEN``).
        sort: ``"h_index"`` sorts by h-index; any other value sorts by name.
        order: ``"asc"`` or ``"desc"``; anything else becomes ``"desc"``.
            NULLs sort last in both directions.

    Returns:
        ``(rows, total)``: dashboard dicts for the requested page and the
        unpaginated match count. ``([], 0)`` when the DB is unavailable.
    """
    if not self.available:
        return [], 0

    # Defensive allowlist — matches the pattern in get_journals_page.
    # The ternary below is already safe (non-"desc" falls through to
    # .asc()), but the explicit check prevents future refactors from
    # accidentally interpolating a tainted value into SQL.
    if order not in ("asc", "desc"):
        order = "desc"
    # A negative LIMIT is unbounded in SQLite; never let callers request it.
    per_page = max(1, per_page)

    wheres = []
    if search:
        needle = _escape_like(normalize_name(search)[:_MAX_SEARCH_LEN])
        wheres.append(
            Institution.name_lower.like(f"%{needle}%", escape="/")
        )

    sort_col = (
        Institution.h_index if sort == "h_index" else Institution.name
    )
    order_clause = (
        sort_col.desc().nulls_last()
        if order == "desc"
        else sort_col.asc().nulls_last()
    )

    offset = (max(1, page) - 1) * per_page

    with self.session() as s:
        total = (
            s.scalar(
                select(func.count()).select_from(Institution).where(*wheres)
            )
            or 0
        )
        rows = s.scalars(
            select(Institution)
            .where(*wheres)
            .order_by(order_clause)
            .limit(per_page)
            .offset(offset)
        ).all()

    return [_institution_to_dashboard_dict(r) for r in rows], total

928 

929 

930# --------------------------------------------------------------------------- 

931# Dict adapters — keep filter/dashboard call sites unchanged 

932# --------------------------------------------------------------------------- 

933 

934 

935def _source_to_lookup_dict(row: Source) -> dict: 

936 """Convert a Source row to the dict shape `lookup_openalex` produces. 

937 

938 Includes `openalex_source_id` so dashboard / test code can chain 

939 a follow-up `lookup_source(source_id=...)` call. Also exposes 

940 ``quartile`` so the filter can store it on the per-user Journal row 

941 and feed it into score derivation. 

942 """ 

943 return { 

944 "name": row.name, 

945 "type": row.source_type, 

946 "h_index": row.h_index, 

947 "impact_factor": row.impact_factor, 

948 "is_in_doaj": row.is_in_doaj, 

949 "publisher": row.publisher, 

950 "issn_l": row.issn, 

951 "openalex_source_id": row.openalex_source_id, 

952 "quartile": row.quartile, 

953 } 

954 

955 

956def _source_to_dashboard_dict(row: Source) -> dict: 

957 return { 

958 "name": row.name, 

959 "quality": row.quality, 

960 "quartile": row.quartile, 

961 "cited_by_count": row.cited_by_count, 

962 "h_index": row.h_index, 

963 "impact_factor": ( 

964 round(row.impact_factor, 2) if row.impact_factor else None 

965 ), 

966 "is_in_doaj": bool(row.is_in_doaj), 

967 "has_doaj_seal": bool(row.has_doaj_seal), 

968 "is_predatory": bool(row.is_predatory), 

969 "predatory_source": row.predatory_source, 

970 "score_source": row.score_source, 

971 "source_type": row.source_type, 

972 "publisher": row.publisher, 

973 "issn": row.issn, 

974 "openalex_source_id": row.openalex_source_id, 

975 } 

976 

977 

978def _institution_to_dict(row: Institution) -> dict: 

979 """Public accessor shape for `lookup_institution`. 

980 

981 The on-disk JSON snapshot uses one-character keys (``n``, ``c``, 

982 ``t``, …) purely for space efficiency — 200k institutions × seven 

983 long field names adds real bytes. Callers of the accessor don't 

984 care about on-disk layout, so here we return the full names to 

985 keep the public API legible and robust to future schema tweaks. 

986 """ 

987 return { 

988 "name": row.name, 

989 "country": row.country, 

990 "type": row.type, 

991 "h_index": row.h_index, 

992 "impact_factor": row.impact_factor, 

993 "works_count": row.works_count, 

994 "cited_by_count": row.cited_by_count, 

995 "ror_id": row.ror_id, 

996 } 

997 

998 

999def _institution_to_dashboard_dict(row: Institution) -> dict: 

1000 return { 

1001 "openalex_id": row.openalex_id, 

1002 "name": row.name, 

1003 "ror_id": row.ror_id, 

1004 "country": row.country, 

1005 "type": row.type, 

1006 "h_index": row.h_index, 

1007 "impact_factor": row.impact_factor, 

1008 "works_count": row.works_count, 

1009 "cited_by_count": row.cited_by_count, 

1010 } 

1011 

1012 

1013# --------------------------------------------------------------------------- 

1014# Module singleton 

1015# --------------------------------------------------------------------------- 

1016 

1017 

# Guards lazy construction of the singleton in get_db() and reset_db().
_db_lock = threading.Lock()
# Process-wide instance; stays None until the first get_db() call.
_db: Optional[JournalQualityDB] = None

1020 

1021 

def get_db() -> JournalQualityDB:
    """Return the process-wide `JournalQualityDB`, constructing it lazily.

    An already-created instance is returned without touching `_db_lock`.
    Only first-time construction is serialized: the lock is taken and
    `_db` is re-checked, so racing first callers build a single instance.
    """
    global _db
    if _db is not None:
        return _db
    with _db_lock:
        if _db is None:
            _db = JournalQualityDB()
    return _db

1030 

1031 

# Backwards-compat aliases used by metrics_routes.py and a couple of tests.
# Both names are plain rebindings: calling get_journal_reference_db() is
# exactly get_db(), and JournalReferenceDB is the same class object as
# JournalQualityDB.
get_journal_reference_db = get_db
JournalReferenceDB = JournalQualityDB

1035 

1036 

def reset_db() -> None:
    """Reset the cached singleton after a `build_db` rebuild.

    If an instance was ever created, its ``reset()`` is called while
    `_db_lock` is held. That serializes the reset with first-time
    construction in `get_db()`.

    Note: `get_db()` returns an already-created instance WITHOUT taking
    the lock, so callers that fetch or hold the singleton are not blocked
    while ``reset()`` runs. Any safety for concurrent use during a reset
    has to come from ``JournalQualityDB.reset()`` itself.
    """
    with _db_lock:
        instance = _db
        if instance is not None:
            instance.reset()

1052 

1053 

1054# --------------------------------------------------------------------------- 

1055# Build phase — the ONLY writer 

1056# --------------------------------------------------------------------------- 

1057 

1058 

def build_db(
    data_dir: Optional[Path] = None,
    output_path: Optional[Path] = None,
) -> None:
    """Compile `journal_quality.db` from the downloaded source snapshots.

    This is the module's ONLY writer. Steps:

    1. Open a SHORT-LIVED writable engine on a unique temp file.
    2. Create the schema and fill every table (predatory lists, sources,
       institutions, abbreviations).
    3. Run ANALYZE + VACUUM and stamp ``PRAGMA user_version`` with
       ``JOURNAL_QUALITY_SCHEMA_VERSION``.
    4. Dispose the engine and atomically ``os.replace()`` the temp file
       onto ``output_path``.
    5. Mark the file read-only (``chmod 0o444``, plus the read-only file
       attribute on Windows).
    6. Call `reset_db()` so the cached singleton is reset.

    Args:
        data_dir: Directory containing the snapshots. Defaults to
            ``get_journal_data_directory()``.
        output_path: Where the compiled DB ends up. Defaults to
            ``data_dir / DB_FILENAME``.

    Raises:
        Exception: anything raised while loading/populating (e.g.
            ``FileNotFoundError`` when the OpenAlex snapshot is missing)
            or by the final ``os.replace()``. The temp file is removed
            best-effort before the exception propagates.
    """
    start = time.monotonic()

    if data_dir is None:
        from ..config.paths import get_journal_data_directory

        data_dir = get_journal_data_directory()
    if output_path is None:
        output_path = data_dir / DB_FILENAME

    logger.info(
        "Building journal quality reference DB (one-time, "
        "~30s, decompresses ~25 MB of bundled data)…"
    )

    # Sweep stale temp files from prior crashed builds so they don't
    # accumulate. Any .tmp-* older than 1h is assumed dead.
    _sweep_stale_tmp_files(output_path.parent, output_path.name)

    # Build into a unique temp path, then os.replace() atomically at
    # the end. A random suffix (not a fixed .tmp) lets concurrent
    # builders write to separate files instead of racing on the same
    # path — os.replace picks a winner atomically and neither corrupts
    # the live file.
    tmp_path = output_path.with_name(
        f"{output_path.name}.tmp-{os.getpid()}-{secrets.token_hex(4)}"
    )

    def _discard_tmp() -> None:
        # Best-effort removal of the half-built temp DB. Failures are
        # logged, not raised, so the caller's original error propagates.
        if not tmp_path.exists():
            return
        try:
            # bearer:disable python_lang_file_permissions
            os.chmod(tmp_path, 0o644)
            tmp_path.unlink()
        except OSError:
            logger.exception(f"Failed to clean up tmp DB at {tmp_path}")

    write_url = f"sqlite:///{tmp_path}"
    engine = create_engine(write_url, connect_args={"check_same_thread": False})

    try:
        # Pragmas for fast bulk insert. `journal_mode = OFF` plus
        # `synchronous = OFF` is deliberately unsafe for general use but
        # correct here because durability is guaranteed by the temp-file
        # + os.replace() pattern around this block: we write to a unique
        # `.tmp-PID-RAND` path, and on any crash mid-build the incomplete
        # temp file is orphaned (and swept by `_sweep_stale_tmp_files()`
        # on the next build). The live file is only ever moved into place
        # by the atomic `os.replace()` at the bottom of this function —
        # it never sees a partial write. Do NOT copy this pragma set
        # elsewhere without the same atomic rename discipline.
        # NOTE(review): journal_mode/synchronous/cache_size are
        # per-connection in SQLite; they only apply to create_all() and
        # the session below if the pool hands this same connection back.
        with engine.connect() as conn:
            conn.exec_driver_sql("PRAGMA journal_mode = OFF")
            conn.exec_driver_sql("PRAGMA synchronous = OFF")
            conn.exec_driver_sql("PRAGMA cache_size = -64000")
            conn.exec_driver_sql("PRAGMA page_size = 4096")

        JournalQualityBase.metadata.create_all(engine)

        SessionWrite = sessionmaker(bind=engine)
        with SessionWrite() as session:
            sources = _load_openalex(data_dir)
            doaj_data = _load_doaj(data_dir)
            pred_data = _load_predatory(data_dir)
            institutions = _load_institutions(data_dir)
            abbreviations = _load_abbreviations(data_dir)

            _populate_predatory(session, pred_data)
            _populate_sources(session, sources, doaj_data, pred_data)
            _populate_institutions(session, institutions)
            _populate_abbreviations(session, abbreviations)
            session.commit()

        with engine.connect() as conn:
            conn.exec_driver_sql("ANALYZE")
            conn.exec_driver_sql("VACUUM")
            # Stamp schema version so _ensure_engine can detect drift
            # without depending on the external version.json.
            conn.exec_driver_sql(
                f"PRAGMA user_version = {JOURNAL_QUALITY_SCHEMA_VERSION}"
            )
    except Exception:
        engine.dispose()
        _discard_tmp()
        raise

    engine.dispose()

    # Atomically swap tmp into place. os.replace is atomic on POSIX and
    # overwrites an existing output_path if present.
    if output_path.exists():
        # Prior file is chmod 0444 from the previous build — relax it
        # so os.replace can overwrite. Best-effort: if chmod fails
        # (e.g. read-only mount), os.replace will raise and surface
        # the real problem. Log so the cause is visible.
        try:
            # bearer:disable python_lang_file_permissions
            os.chmod(output_path, 0o644)
        except OSError:
            logger.warning(
                f"Could not chmod 0644 on existing {output_path} before "
                f"os.replace; if the replace fails this is likely why"
            )
    try:
        os.replace(tmp_path, output_path)
    except OSError:
        # Otherwise the fully-built temp file lingers until a later
        # build's stale-file sweep.
        _discard_tmp()
        raise

    # OS-level read-only flag — third layer of write protection.
    # POSIX chmod is a no-op on Windows, so we also set the Windows
    # read-only file attribute via SetFileAttributesW. The pre-commit
    # hook check-journal-quality-readonly.py remains the primary
    # defense against accidental writable opens.
    # bearer:disable python_lang_file_permissions
    os.chmod(output_path, 0o444)
    if sys.platform == "win32":
        try:
            import ctypes

            # FILE_ATTRIBUTE_READONLY = 0x1
            ok = ctypes.windll.kernel32.SetFileAttributesW(
                str(output_path), 0x1
            )
            if not ok:
                logger.warning(
                    f"SetFileAttributesW failed on {output_path.name}; "
                    "readonly pre-commit hook is the sole defense."
                )
        except Exception:
            logger.warning(
                f"Could not set Windows readonly attribute on "
                f"{output_path.name}"
            )

    elapsed = time.monotonic() - start
    size_mb = output_path.stat().st_size / (1024 * 1024)
    # Path.as_uri() percent-escapes characters such as '?', '#' and '%'
    # that would otherwise be parsed as URI syntax by SQLite. The
    # connection is closed explicitly: sqlite3's `with conn:` only
    # commits/rolls back and never closes.
    count_uri = f"{output_path.resolve().as_uri()}?mode=ro&immutable=1"
    count_conn = sqlite3.connect(count_uri, uri=True)
    try:
        source_count = count_conn.execute(
            "SELECT COUNT(*) FROM sources"
        ).fetchone()[0]
    finally:
        count_conn.close()
    logger.info(
        f"Journal quality DB ready: {source_count} sources, "
        f"{size_mb:.1f} MB in {elapsed:.1f}s ({output_path.name}, chmod 0o444)"
    )

    reset_db()

1210 

1211 

1212def _sweep_stale_tmp_files(directory: Path, base_name: str) -> None: 

1213 """Remove journal_quality.db.tmp-* files older than 1h. 

1214 

1215 Per-file OSError (vanished between glob+stat, no permission, etc.) 

1216 is logged at debug — the sweep is best-effort and shouldn't stop 

1217 the build, but silent-pass on filesystem errors hides the cause of 

1218 accumulating stale tmp files that would otherwise eat disk over 

1219 time. 

1220 """ 

1221 if not directory.exists(): 

1222 return 

1223 cutoff = time.time() - 3600 

1224 for tmp in directory.glob(f"{base_name}.tmp-*"): 

1225 try: 

1226 if tmp.stat().st_mtime < cutoff: 

1227 tmp.unlink() 

1228 logger.info(f"Swept stale temp build file: {tmp.name}") 

1229 except OSError: 

1230 logger.debug(f"Could not sweep stale tmp file {tmp.name}") 

1231 

1232 

1233# --------------------------------------------------------------------------- 

1234# Source-data loaders (used by build_db only) 

1235# --------------------------------------------------------------------------- 

1236 

1237 

1238def _load_openalex(data_dir: Path) -> dict: 

1239 path = data_dir / "openalex_sources.json.gz" 

1240 if not path.exists(): 

1241 raise FileNotFoundError(f"OpenAlex source file not found: {path}") 

1242 with gzip.open(path, "rt", encoding="utf-8") as f: 

1243 data = json.load(f) 

1244 sources = data.get("s", data.get("sources", {})) 

1245 logger.info(f"Loaded {len(sources)} OpenAlex sources") 

1246 return dict(sources) 

1247 

1248 

1249def _load_doaj(data_dir: Path) -> dict: 

1250 path = data_dir / "doaj_journals.json" 

1251 if not path.exists(): 

1252 logger.warning(f"{path} not found — DOAJ cross-ref will be skipped") 

1253 return {} 

1254 with open(path, encoding="utf-8") as f: 

1255 data = json.load(f) 

1256 journals = data.get("journals", {}) 

1257 logger.info(f"Loaded {len(journals)} DOAJ entries") 

1258 return dict(journals) 

1259 

1260 

1261def _load_predatory(data_dir: Path) -> dict: 

1262 """Returns {journals: set, publishers: set, hijacked: set, long_pubs: list}.""" 

1263 path = data_dir / "predatory.json" 

1264 if not path.exists(): 

1265 logger.warning(f"{path} not found — predatory check will be skipped") 

1266 return { 

1267 "journals": set(), 

1268 "publishers": set(), 

1269 "hijacked": set(), 

1270 "long_pubs": [], 

1271 } 

1272 

1273 with open(path, encoding="utf-8") as f: 

1274 data = json.load(f) 

1275 

1276 journal_names = { 

1277 normalize_name(e.get("name", "")) 

1278 for e in data.get("journals", []) 

1279 if e.get("name", "").strip() 

1280 } 

1281 publisher_names = { 

1282 normalize_name(e.get("name", "")) 

1283 for e in data.get("publishers", []) 

1284 if e.get("name", "").strip() 

1285 } 

1286 hijacked_names = { 

1287 normalize_name(e.get("hijacked_name", "")) 

1288 for e in data.get("hijacked", []) 

1289 if e.get("hijacked_name", "").strip() 

1290 } 

1291 long_pubs = [ 

1292 normalize_name(e.get("name", "")) 

1293 for e in data.get("publishers", []) 

1294 if len(e.get("name", "").strip()) >= 10 

1295 ] 

1296 logger.info( 

1297 f"Loaded predatory: {len(journal_names)} journals, " 

1298 f"{len(publisher_names)} publishers, " 

1299 f"{len(hijacked_names)} hijacked" 

1300 ) 

1301 return { 

1302 "journals": journal_names, 

1303 "publishers": publisher_names, 

1304 "hijacked": hijacked_names, 

1305 "long_pubs": long_pubs, 

1306 } 

1307 

1308 

1309def _load_institutions(data_dir: Path) -> dict: 

1310 path = data_dir / "openalex_institutions.json.gz" 

1311 if not path.exists(): 

1312 logger.warning(f"{path} not found — institution tier will be empty") 

1313 return {} 

1314 with gzip.open(path, "rt", encoding="utf-8") as f: 

1315 data = json.load(f) 

1316 institutions = data.get("i", {}) 

1317 logger.info(f"Loaded {len(institutions)} institutions") 

1318 return dict(institutions) 

1319 

1320 

1321def _load_abbreviations(data_dir: Path) -> dict: 

1322 path = data_dir / "jabref_abbreviations.json.gz" 

1323 if not path.exists(): 

1324 logger.warning( 

1325 f"{path} not found — abbreviation expansion will be empty" 

1326 ) 

1327 return {} 

1328 with gzip.open(path, "rt", encoding="utf-8") as f: 

1329 data = json.load(f) 

1330 mappings = data.get("abbrev_to_full", {}) 

1331 logger.info(f"Loaded {len(mappings)} abbreviation mappings") 

1332 return dict(mappings) 

1333 

1334 

1335# --------------------------------------------------------------------------- 

1336# Table populators 

1337# --------------------------------------------------------------------------- 

1338 

1339 

def _populate_predatory(session: Session, pred: dict) -> None:
    """Bulk-insert the three Stop Predatory Journals tables.

    ``pred`` is the dict returned by `_load_predatory`. Empty names are
    skipped. Each publisher row gets ``is_long`` set when its name is in
    ``pred["long_pubs"]``. No commit happens here; `build_db` commits the
    session after all populators have run.
    """
    long_names = set(pred.get("long_pubs", []))

    journal_rows = []
    for name in pred.get("journals", set()):
        if name:
            journal_rows.append({"name_lower": name})
    if journal_rows:
        session.bulk_insert_mappings(PredatoryJournal, journal_rows)  # type: ignore[arg-type]

    hijacked_rows = []
    for name in pred.get("hijacked", set()):
        if name:
            hijacked_rows.append({"name_lower": name})
    if hijacked_rows:
        session.bulk_insert_mappings(PredatoryHijacked, hijacked_rows)  # type: ignore[arg-type]

    publisher_rows = []
    for name in pred.get("publishers", set()):
        if name:
            publisher_rows.append(
                {"name_lower": name, "is_long": name in long_names}
            )
    if publisher_rows:
        session.bulk_insert_mappings(PredatoryPublisher, publisher_rows)  # type: ignore[arg-type]

    logger.info(
        f"Inserted predatory tables: "
        f"{len(journal_rows)} journals, "
        f"{len(publisher_rows)} publishers, "
        f"{len(hijacked_rows)} hijacked"
    )

1365 

1366 

def _populate_sources(
    session: Session,
    sources: dict,
    doaj_data: dict,
    pred: dict,
) -> None:
    """Build Source rows with cross-referenced DOAJ + predatory flags.

    Passes:

    1. OpenAlex entries (``sources``: id -> compact record with keys
       ``n`` name, ``i`` ISSN, ``p`` publisher, ``h`` h-index, ``if``
       impact factor, ``cb`` cited-by count, ``t`` type code).
       - Each entry is flagged as DOAJ-listed (ISSN lookup in
         ``doaj_data``) and as predatory (journal, publisher or hijacked
         list, with a DOAJ / h-index whitelist override).
       - Entries are deduplicated on ``(name_lower, issn or "")``,
         keeping the higher h-index.
    2. DOAJ entries whose normalized name OpenAlex did not cover are
       added with ``score_source="doaj"``.
    3. A Q1–Q4 quartile is derived from the ``cited_by_count``
       percentile within each ``source_type``, and ``quality`` is
       re-derived with it.

    Rows are bulk-inserted in ``_BATCH_SIZE`` batches. The caller
    commits.
    """
    type_map = {"j": "journal", "c": "conference"}
    pred_journals = pred.get("journals", set())
    pred_publishers = pred.get("publishers", set())
    pred_hijacked = pred.get("hijacked", set())

    # Dedup key is (name_lower, issn or "") so journals with separate
    # print and electronic ISSNs in OpenAlex both survive instead of
    # collapsing onto one row.
    seen: dict[tuple[str, str], dict] = {}

    for source_id, compact in sources.items():
        name = (compact.get("n") or "").strip()
        if not name:
            continue

        name_lower = normalize_name(name)
        issn = normalize_issn(compact.get("i"))
        publisher = compact.get("p") or None
        h_index = compact.get("h")
        impact_factor = compact.get("if")
        cited_by_count = compact.get("cb")
        source_type = type_map.get(compact.get("t", ""), compact.get("t", ""))

        doaj_entry = doaj_data.get(issn) if issn else None
        is_in_doaj = doaj_entry is not None
        has_doaj_seal = bool(doaj_entry and doaj_entry.get("has_seal"))

        is_pred = name_lower in pred_journals
        pred_source = "stop-predatory-journals" if is_pred else None
        if not is_pred and publisher:
            pub_norm = normalize_name(publisher)
            if pub_norm in pred_publishers:
                is_pred = True
                pred_source = "stop-predatory-publishers"
        if not is_pred and name_lower in pred_hijacked:
            is_pred = True
            pred_source = "stop-predatory-hijacked"

        # Whitelist override
        if is_pred and (
            is_in_doaj or (h_index or 0) > PREDATORY_WHITELIST_HINDEX
        ):
            is_pred = False
            pred_source = None

        quality = derive_quality_score(
            h_index=h_index,
            is_in_doaj=is_in_doaj,
            has_doaj_seal=has_doaj_seal,
            is_predatory=is_pred,
            source_type=source_type,
        )

        rec = {
            "name": name,
            "name_lower": name_lower,
            "issn": issn,
            "openalex_source_id": source_id,
            "source_type": source_type,
            "publisher": publisher,
            "h_index": h_index,
            "impact_factor": impact_factor,
            "cited_by_count": cited_by_count,
            "quartile": None,  # filled in by the post-pass below
            "quality": quality,
            "is_in_doaj": is_in_doaj,
            "has_doaj_seal": has_doaj_seal,
            "is_predatory": is_pred,
            "predatory_source": pred_source,
            "score_source": "openalex",
        }

        key = (name_lower, issn or "")
        prev = seen.get(key)
        if prev is None or (h_index or 0) > (prev.get("h_index") or 0):
            seen[key] = rec

    # Second pass: DOAJ-only journals (not in OpenAlex). Without this
    # we lose ~4-7K small open-access venues. Keyed by name_lower so
    # we don't double-insert anything OpenAlex already covered.
    openalex_names = {k[0] for k in seen}
    doaj_added = 0
    for issn, doaj_entry in doaj_data.items():
        name = (doaj_entry.get("name") or "").strip()
        if not name:
            continue
        name_lower = normalize_name(name)
        if name_lower in openalex_names:
            continue
        has_seal = bool(doaj_entry.get("has_seal"))
        publisher = doaj_entry.get("publisher") or None
        quality = derive_quality_score(
            h_index=None,
            is_in_doaj=True,
            has_doaj_seal=has_seal,
            is_predatory=False,
            source_type="journal",
        )
        seen[(name_lower, issn or "")] = {
            "name": name,
            "name_lower": name_lower,
            "issn": issn,
            "openalex_source_id": None,
            "source_type": "journal",
            "publisher": publisher,
            "h_index": None,
            "impact_factor": None,
            "cited_by_count": None,
            "quartile": None,
            "quality": quality,
            "is_in_doaj": True,
            "has_doaj_seal": has_seal,
            "is_predatory": False,
            "predatory_source": None,
            "score_source": "doaj",
        }
        openalex_names.add(name_lower)
        doaj_added += 1

    records = list(seen.values())

    # Derive quartile (Q1–Q4) from cited_by_count percentile within each
    # source_type. Field-specific quartiles would be more accurate but
    # require per-source topic data that 4–7×s the snapshot size, so we
    # use global per-type percentiles as a defensible approximation
    # given the license constraint that ruled out SJR.
    by_type: dict[str, list[dict]] = {}
    for r in records:
        if r.get("cited_by_count") is None:
            continue  # NULL quartile for entries without citation data
        by_type.setdefault(r.get("source_type") or "", []).append(r)
    for type_records in by_type.values():
        type_records.sort(key=lambda r: r["cited_by_count"])
        n = len(type_records)  # >= 1: setdefault only creates non-empty lists
        # Records with equal cited_by_count share the rank of the first
        # record in their tie group. Identical journals then always land
        # in the same quartile, instead of being split arbitrarily by
        # input order at a quartile boundary.
        tie_start = 0
        prev_count = None
        for rank, r in enumerate(type_records):
            count = r["cited_by_count"]
            if count != prev_count:
                tie_start = rank
                prev_count = count
            pct = tie_start / n  # 0.0 = lowest, <1.0 = highest
            if pct >= 0.75:
                r["quartile"] = "Q1"
            elif pct >= 0.50:
                r["quartile"] = "Q2"
            elif pct >= 0.25:
                r["quartile"] = "Q3"
            else:
                r["quartile"] = "Q4"

    # Re-derive quality now that quartile is available. The first-pass
    # `quality` values above were computed without quartile and are
    # therefore suboptimal — a Q1 journal without h-index data would
    # have scored `None` (fall-through) instead of 8. The runtime filter
    # code in journal_reputation_filter.py does pass quartile, so the
    # stored column should agree with the live score.
    for r in records:
        r["quality"] = derive_quality_score(
            h_index=r.get("h_index"),
            quartile=r.get("quartile"),
            is_in_doaj=r.get("is_in_doaj") or False,
            has_doaj_seal=r.get("has_doaj_seal") or False,
            is_predatory=r.get("is_predatory") or False,
            source_type=r.get("source_type"),
        )

    logger.info(
        f"Inserting {len(records)} source records ({doaj_added} DOAJ-only)..."
    )
    for i in range(0, len(records), _BATCH_SIZE):
        session.bulk_insert_mappings(Source, records[i : i + _BATCH_SIZE])  # type: ignore[arg-type]

1542 

1543 

def _populate_institutions(session: Session, institutions: dict) -> None:
    """Expand compact institution records and bulk-insert them.

    ``institutions`` maps an OpenAlex institution id to a compact record.
    The keys ``n``, ``r``, ``c``, ``t``, ``h``, ``if``, ``w`` and ``cb``
    become name, ror_id, country, type, h_index, impact_factor,
    works_count and cited_by_count. Records with a blank or missing name
    are skipped. Rows are inserted in ``_BATCH_SIZE`` batches; the caller
    commits.
    """
    named = (
        (inst_id, (compact.get("n") or "").strip(), compact)
        for inst_id, compact in institutions.items()
    )
    records: list[dict] = [
        {
            "openalex_id": inst_id,
            "name": display_name,
            "name_lower": normalize_name(display_name),
            "ror_id": compact.get("r"),
            "country": compact.get("c"),
            "type": compact.get("t"),
            "h_index": compact.get("h"),
            "impact_factor": compact.get("if"),
            "works_count": compact.get("w"),
            "cited_by_count": compact.get("cb"),
        }
        for inst_id, display_name, compact in named
        if display_name
    ]
    logger.info(f"Inserting {len(records)} institution records...")
    for start in range(0, len(records), _BATCH_SIZE):
        batch = records[start : start + _BATCH_SIZE]
        session.bulk_insert_mappings(Institution, batch)  # type: ignore[arg-type]

1567 

1568 

def _populate_abbreviations(session: Session, mappings: dict) -> None:
    """Insert abbreviation -> full-name rows keyed by the normalized abbreviation.

    Abbreviations that normalize to an empty string are dropped. When
    several abbreviations normalize to the same key, the first one in
    ``mappings`` iteration order wins. Rows are inserted in
    ``_BATCH_SIZE`` batches; the caller commits.
    """
    records: list[dict] = []
    used_keys: set[str] = set()
    for abbrev, full in mappings.items():
        key = normalize_name(abbrev)
        if key and key not in used_keys:
            used_keys.add(key)
            records.append({"abbrev_lower": key, "full_name": full})
    logger.info(f"Inserting {len(records)} abbreviation records...")
    for start in range(0, len(records), _BATCH_SIZE):
        batch = records[start : start + _BATCH_SIZE]
        session.bulk_insert_mappings(Abbreviation, batch)  # type: ignore[arg-type]

1581 

1582 

1583# --------------------------------------------------------------------------- 

1584# Backwards-compat shim for the old build_reference_db name 

1585# --------------------------------------------------------------------------- 

1586 

1587 

def build_reference_db(
    data_dir: Optional[Path] = None,
    output_path: Optional[Path] = None,
) -> None:
    """Deprecated alias for `build_db`; forwards both arguments unchanged."""
    return build_db(output_path=output_path, data_dir=data_dir)