Coverage for src/local_deep_research/advanced_search_system/filters/journal_reputation_filter.py: 70%
444 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Journal reputation filter with tiered quality scoring.
4Scores journals 1-10 and filters academic search results by quality.
5Uses bundled bibliometric data for most journals; LLM analysis is an
6opt-in last resort. Predatory journals are auto-removed.
8Scoring tiers (tried in order, first match wins):
10 1. Predatory check — auto-removes blacklisted journals/publishers
11 (whitelist override prevents false positives)
12 2. OpenAlex — h-index, quartile, DOAJ from ~217K bundled sources;
13 preprint repos can be lifted via institution affiliations
14 3. DOAJ — quality floor (5) for listed OA journals;
15 DOAJ Seal → 8
16 3.5 Institutions — author affiliation lookup when no venue matched
17 (capped at 6, never beats a real venue)
19 --- DB cache check (only for cached LLM results from previous runs) ---
21 3.6 LLM cleanup — LLM canonicalises the name, then retries Tier 2
22 (opt-in via ``enable_llm_scoring``)
23 4. LLM analysis — SearXNG web search + LLM scoring (opt-in, expensive);
24 disabled after 2 consecutive failures
25 Conference — name-pattern heuristic for unmatched conferences
27Unknown journals that no tier can score receive a low-confidence score (3).
28"""
30import re
31import threading
32import time
33import unicodedata
34from datetime import timedelta
35from typing import Any, Dict, List, Optional
37from langchain_core.language_models.chat_models import BaseChatModel
38from loguru import logger
39from sqlalchemy.orm import Session
41from ...config.llm_config import get_llm
42from ...constants import VALID_QUALITY_SCORES
43from ...database.models import Journal
44from ...database.session_context import get_user_db_session
46# normalize_name applies NFKC + lower + strip — must match the
47# migration backfill/dedupe expressions in 0006_journal_quality_system.py
48# and the reference DB builder in journal_quality/db.py so name_lower
49# is single-valued across every writer.
50from ...journal_quality.db import get_db as get_journal_data_manager
51from ...journal_quality.scoring import normalize_name
52from ...security.log_sanitizer import strip_control_chars
53from ...utilities.llm_utils import get_model_identifier
54from ...utilities.resource_utils import safe_close
55from ...utilities.thread_context import get_search_context
56from ...web_search_engines.search_engine_factory import create_search_engine
57from .base_filter import BaseFilter
60# Patterns that indicate a venue is a conference, not a journal.
61# Used as a fallback when DOI enrichment and OpenAlex lookup both miss.
62_CONFERENCE_PATTERNS = [
63 re.compile(r"\b(?:proceedings|proc\.)\b", re.I),
64 re.compile(r"\b(?:conference|conf\.)\b", re.I),
65 re.compile(r"\b(?:symposium|symp\.)\b", re.I),
66 re.compile(r"\bworkshop\b", re.I),
67 re.compile(
68 r"\b(?:ICML|NeurIPS|NIPS|AAAI|CVPR|ICLR|ACL|EMNLP|NAACL|ECCV|ICCV|ICSE|SIGMOD|VLDB|KDD|WWW|SIGIR|CIKM|WSDM|RecSys|ISCA|MICRO|ASPLOS|OSDI|SOSP|NSDI|USENIX)\b"
69 ),
70]
73def _is_likely_conference(name: str) -> bool:
74 """Detect if a venue name is likely a conference based on common patterns."""
75 return any(p.search(name) for p in _CONFERENCE_PATTERNS)
def _sanitize_name(name: str) -> str:
    """Make a raw journal name safe to embed in log lines and LLM prompts.

    Processing order:
      1. ``strip_control_chars`` removes control / format characters
         (the shared log-sanitizer pattern, not a narrow local regex).
      2. NFKC normalization folds Unicode lookalikes together.
      3. Anything beyond 500 characters is cut and ``"..."`` appended, so
         an oversized name cannot bloat a prompt.
      4. Double quotes become single quotes so the name cannot close a
         quoted section of a prompt.
      5. Surrounding whitespace is trimmed.

    Args:
        name: Raw journal name string.

    Returns:
        The sanitized name.
    """
    normalized = unicodedata.normalize("NFKC", strip_control_chars(name))
    if len(normalized) > 500:
        normalized = normalized[:500] + "..."
    return normalized.replace('"', "'").strip()
104def _format_affiliations(affiliations: list, max_n: int = 3) -> str:
105 """Render an affiliation list as a compact human-readable string for
106 log lines. Accepts the same shapes as ``score_from_affiliations``
107 (plain strings or dicts with a ``name`` key) and truncates after
108 ``max_n`` entries so a 20-author paper doesn't blow up the log.
109 """
110 if not affiliations: 110 ↛ 112line 110 didn't jump to line 112 because the condition on line 110 was always true
111 return "(none)"
112 names: list[str] = []
113 for aff in affiliations:
114 if isinstance(aff, str):
115 names.append(aff)
116 elif isinstance(aff, dict):
117 nm = aff.get("name") or aff.get("display_name")
118 if nm:
119 names.append(nm)
120 if not names:
121 return "(unknown)"
122 shown = names[:max_n]
123 suffix = "" if len(names) <= max_n else f" (+{len(names) - max_n} more)"
124 return ", ".join(shown) + suffix
# Guards creation of the single background download thread.
_bg_fetch_lock = threading.Lock()
# The most recently started download thread, or None before the first start.
_bg_fetch_thread: Optional[threading.Thread] = None


def _start_background_journal_fetch() -> None:
    """Start ``ensure_journal_data(auto_download=True)`` on a daemon thread.

    At most one download thread runs at a time: under ``_bg_fetch_lock``
    the function returns without spawning anything when the previously
    started thread is still alive, so many concurrent filter workers
    cannot each trigger their own download.

    The thread is a daemon so it never blocks process exit. Any exception
    raised by the download is logged and swallowed inside the thread.
    """
    global _bg_fetch_thread

    def _download_worker() -> None:
        try:
            # Imported lazily so importing this module does not also
            # import the downloader.
            from ...journal_quality.downloader import (
                ensure_journal_data,
            )

            logger.info(
                "journal-data background fetch: starting "
                "(triggered by filter pending path)"
            )
            ensure_journal_data(auto_download=True)
            logger.info("journal-data background fetch: done")
        except Exception:
            logger.exception(
                "journal-data background fetch crashed — "
                "next search will retry"
            )

    with _bg_fetch_lock:
        running = _bg_fetch_thread
        if running is not None and running.is_alive():
            logger.debug(
                "journal-data background fetch already in flight — "
                "not spawning a second thread"
            )
            return

        _bg_fetch_thread = threading.Thread(
            target=_download_worker,
            name="journal-data-bg-fetch",
            daemon=True,
        )
        _bg_fetch_thread.start()
class JournalFilterError(Exception):
    """Raised for failures that occur while filtering results by journal."""
185class JournalReputationFilter(BaseFilter):
186 """
187 A filter for academic results that considers the reputation of journals.
189 Uses a tiered scoring approach: bundled data (OpenAlex, DOAJ, predatory
190 lists) for most journals, with LLM-based analysis via SearXNG as a
191 fallback for truly unknown journals.
193 Predatory journals are **automatically removed** from results.
194 """
def __init__(
    self,
    model: BaseChatModel | None = None,
    reliability_threshold: int | None = None,
    max_context: int | None = None,
    exclude_non_published: bool | None = None,
    quality_reanalysis_period: timedelta | None = None,
    settings_snapshot: Dict[str, Any] | None = None,
):
    """Set up the filter, reading any unspecified option from settings.

    Args:
        model: LLM used for Tier 4 analysis. When None, ``get_llm()``
            supplies one and this instance takes ownership of it.
        reliability_threshold: Minimum quality score (1-10) a result
            needs to pass. Setting default: 2.
        max_context: Maximum characters of source content handed to
            the LLM for quality evaluation. Setting default: 3000.
        exclude_non_published: If True, drop results without an
            associated journal reference. Setting default: False.
        quality_reanalysis_period: Age after which cached quality
            assessments are refreshed. Setting default: 365 days.
        settings_snapshot: Settings snapshot for thread context;
            forwarded to every settings lookup and to the engine
            factory.
    """
    super().__init__(model)

    # Record ownership so close() only releases an LLM created here.
    self._owns_llm = self.model is None
    if self._owns_llm:
        self.model = get_llm()

    # Local import avoids a circular import at module load time.
    from ...config.search_config import get_setting_from_snapshot

    def _from_settings(key: str, fallback: Any) -> Any:
        return get_setting_from_snapshot(
            key, fallback, settings_snapshot=settings_snapshot
        )

    if reliability_threshold is None:
        reliability_threshold = int(
            _from_settings("search.journal_reputation.threshold", 2)
        )
    self.__threshold = reliability_threshold

    if max_context is None:
        max_context = int(
            _from_settings("search.journal_reputation.max_context", 3000)
        )
    self.__max_context = max_context

    if exclude_non_published is None:
        exclude_non_published = bool(
            _from_settings(
                "search.journal_reputation.exclude_non_published", False
            )
        )
    self.__exclude_non_published = exclude_non_published

    if quality_reanalysis_period is None:
        quality_reanalysis_period = timedelta(
            days=int(
                _from_settings(
                    "search.journal_reputation.reanalysis_period", 365
                )
            )
        )
    self.__quality_reanalysis_period = quality_reanalysis_period

    self.__settings_snapshot = settings_snapshot

    # SearXNG backs Tier 4 (LLM fallback) only; the bundled-data tiers
    # work without it.
    engine = create_search_engine(
        "searxng", llm=self.model, settings_snapshot=settings_snapshot
    )
    self.__engine = engine
    self.__searxng_available = engine is not None and getattr(
        engine, "is_available", False
    )
    if not self.__searxng_available:
        logger.info(
            "SearXNG not available — Tier 4 (LLM analysis) disabled. "
            "Bundled data tiers still active."
        )

    # Per-thread storage for the SearXNG fail-fast counter, so concurrent
    # filter_results calls sharing this instance keep separate counts.
    self.__tls = threading.local()

    # Serializes calls into the shared SearXNG engine, which keeps
    # mutable bookkeeping state on the instance.
    self.__engine_lock = threading.Lock()

    # Journal data manager (bundled datasets are loaded lazily).
    self.__data_manager = get_journal_data_manager()
306 # ------------------------------------------------------------------
307 # Thread-local fail-fast counter accessors
308 # ------------------------------------------------------------------
310 def __searxng_failures(self) -> int:
311 return getattr(self.__tls, "searxng_failures", 0)
313 def __reset_searxng_failures(self) -> None:
314 self.__tls.searxng_failures = 0
316 def __bump_searxng_failures(self) -> int:
317 n = self.__searxng_failures() + 1
318 self.__tls.searxng_failures = n
319 return n
def close(self) -> None:
    """Release the SearXNG engine and, when owned, the LLM client.

    The engine attribute is looked up by its mangled name so that an
    instance on which the attribute was never set can still be closed.
    The LLM is closed only when this filter created it itself.
    """
    engine_attr = "_JournalReputationFilter__engine"
    if hasattr(self, engine_attr):
        # allow_none=True: SearXNG is optional (Tier 4 only), so the
        # engine may legitimately be None.
        safe_close(getattr(self, engine_attr), "SearXNG engine", allow_none=True)
    if not self._owns_llm:
        return
    safe_close(self.model, "journal filter LLM")
@classmethod
def create_default(
    cls,
    model: BaseChatModel | None = None,
    *,
    engine_name: str,
    settings_snapshot: Dict[str, Any] | None = None,
) -> Optional["JournalReputationFilter"]:
    """Build a filter for ``engine_name`` if settings enable it.

    SearXNG is not required: the filter works from bundled data alone,
    and SearXNG only enables the optional Tier 4 (LLM analysis).

    Args:
        model: Optional LLM for Tier 4 analysis.
        engine_name: Search engine configuration key (e.g. "arxiv").
        settings_snapshot: Optional frozen settings dict.

    Returns:
        A configured ``JournalReputationFilter``; or None when the
        per-engine ``journal_reputation.enabled`` setting is falsy, or
        when reading settings / constructing the filter raised (the
        failure is logged, never re-raised).
    """
    from ...config.search_config import get_setting_from_snapshot

    try:
        enabled = get_setting_from_snapshot(
            f"search.engine.web.{engine_name}.journal_reputation.enabled",
            True,
            settings_snapshot=settings_snapshot,
        )
        logger.info(
            f"Journal filter create_default: engine={engine_name}, "
            f"enabled={enabled} (type={type(enabled).__name__})"
        )
        if bool(enabled):
            instance = JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
            logger.info(
                f"Journal filter created for {engine_name} — "
                f"threshold={instance._JournalReputationFilter__threshold}"
            )
            return instance

        logger.info(f"Journal filter disabled for {engine_name} in settings")
        return None
    except Exception:
        # Every failure maps to "no filter" instead of silently
        # defaulting to enabled, so configuration errors stay visible
        # in the log.
        logger.exception(
            f"Failed to configure journal filter for {engine_name}; "
            "results will not be journal-quality filtered for this engine"
        )
        return None
@staticmethod
def __db_session() -> Session | None:
    """Open a user-database session from the current search context.

    Returns:
        The object returned by ``get_user_db_session`` for the context's
        ``username`` / ``user_password`` (callers use it as a context
        manager), or None when no search context is available. Callers
        should treat None as "skip the DB operation".
    """
    ctx = get_search_context()
    if ctx is not None:
        return get_user_db_session(
            username=ctx.get("username"),
            password=ctx.get("user_password"),
        )
    return None
410 # ------------------------------------------------------------------
411 # Journal name cleaning (with LRU cache to avoid duplicate LLM calls)
412 # ------------------------------------------------------------------
def __clean_journal_name(self, journal_name: str) -> str:
    """Normalize a raw journal name for deduplication and lookup.

    Purely deterministic, no LLM call: the name is sanitized, run
    through the regex cleaner (volume / page / year / month stripping)
    and then offered to the data manager's abbreviation expansion.
    The LLM-based cleaner is a separate method.

    Args:
        journal_name: Raw journal name from search results.

    Returns:
        The expanded name when the data manager knows the abbreviation,
        otherwise the regex-cleaned name.
    """
    sanitized = _sanitize_name(journal_name)
    stripped = self.__regex_clean_journal_name(sanitized)

    full_form = self.__data_manager.expand_abbreviation(stripped)
    if not full_form:
        if stripped != sanitized:
            logger.debug(
                f"Regex-cleaned journal name: '{sanitized}' → '{stripped}'"
            )
        return stripped

    logger.debug(f"Abbreviation expanded: '{stripped}' → '{full_form}'")
    return full_form
452 @staticmethod
453 def __regex_clean_journal_name(name: str) -> str:
454 """
455 Fast regex-based journal name normalization. Strips volume, issue,
456 page, year, and month references. No LLM needed.
457 """
458 months = (
459 "january|february|march|april|may|june|july|"
460 "august|september|october|november|december|"
461 "jan|feb|mar|apr|jun|jul|aug|sep|oct|nov|dec"
462 )
464 # Strip leading/trailing whitespace
465 name = name.strip()
467 # Strip a leading [bracketed-original-language] prefix that
468 # MEDLINE uses for non-English journals:
469 # "[Rinsho ketsueki] The Japanese journal of clinical hematology"
470 # → "The Japanese journal of clinical hematology"
471 name = re.sub(r"^\[[^\]]+\]\s*", "", name)
473 # Strip trailing publisher suffixes that some search engines glue
474 # onto the journal name (e.g. "Information Fusion Elsevier" or
475 # "Cell Press"). Conservative — only the handful of well-known
476 # academic publishers, anchored at end of string with a leading
477 # space so we don't eat them mid-name.
478 name = re.sub(
479 r"\s+(?:Elsevier|Springer|Wiley|Nature\s+Publishing|"
480 r"Cell\s+Press|MDPI|Sage|Taylor\s*&?\s*Francis|"
481 r"Oxford\s+University\s+Press|Cambridge\s+University\s+Press|"
482 r"IEEE|ACM|Routledge|Frontiers)\s*$",
483 "",
484 name,
485 flags=re.IGNORECASE,
486 )
488 # Strip a leading 4-digit year ("2015 Plasma Phys. ..." → "Plasma Phys. ...")
489 name = re.sub(r"^(?:19|20)\d{2}\s+", "", name)
491 # Strip a leading ordinal volume marker, e.g.
492 # "31st Conference on Neural Information Processing Systems" → "Conference on …"
493 # Without this the OpenAlex name lookup fails on most conference
494 # entries because the canonical name has no ordinal prefix.
495 name = re.sub(
496 r"^\d+(?:st|nd|rd|th)\s+",
497 "",
498 name,
499 flags=re.IGNORECASE,
500 )
502 # Remove month+year references FIRST (before bare year strip),
503 # so "September 2023" is consumed as a unit and we don't leave
504 # the month behind as an orphan word.
505 name = re.sub(
506 rf",?\s*\b(?:{months})\b\.?\s+(?:19|20)?\d{{2,4}}",
507 "",
508 name,
509 flags=re.IGNORECASE,
510 )
512 # Remove volume/issue/page refs: "Vol. 12", "Issue 3", "pp. 100-200"
513 name = re.sub(
514 r",?\s*(?:vol(?:ume)?\.?\s*\d+|"
515 r"issue\s*\d+|"
516 r"no\.?\s*\d+|"
517 r"pp?\.?\s*\d+[\s–-]*\d*|"
518 r"pages?\s*\d+[\s–-]*\d*)",
519 "",
520 name,
521 flags=re.IGNORECASE,
522 )
523 # Remove volume(issue) patterns: "141(5)" — and bare "(15)" issues.
524 name = re.sub(r",?\s*\d+\(\d+\)", "", name)
525 name = re.sub(r"\s*\(\d+\)\s*", " ", name)
526 # Remove year references: "(2023)", ", 2023". Anchored to 19xx/20xx
527 # so 4-digit page numbers like ", 1063" in "106335" aren't eaten.
528 name = re.sub(r"\s*[\(,]\s*(?:19|20)\d{2}\b\s*\)?", "", name)
529 # Remove bare trailing citation data: ", 95, 146802" style
530 # Only strips when there's a comma before the first number
531 # (preserves "NeurIPS 2023" where space-number is part of the name)
532 name = re.sub(r",\s*\d[\d,\s]*$", "", name)
533 # Strip trailing alphanumeric volume markers: "E48", "R569", "L102"
534 # (single uppercase letter followed by digits at end of string).
535 name = re.sub(r"\s+[A-Z]\d+\s*$", "", name)
536 # Strip trailing volume/page debris like "170 266-275", "71: 1-10",
537 # "151:48-60", or a bare trailing volume "116". Repeated to peel
538 # multiple chunks. Stops when only the journal name remains. We
539 # require the run to start with whitespace or punctuation so we
540 # don't eat the "2023" of "NeurIPS 2023" — the trailing-year regex
541 # below handles that case.
542 prev = None
543 while prev != name:
544 prev = name
545 name = re.sub(
546 r"[\s,;:]+\d+(?:\s*[:\-–]\s*\d+)?(?:\s*[-–]\s*\d+)?\s*$",
547 "",
548 name,
549 )
550 # Strip leftover trailing month name (no year) — happens when the
551 # year was stripped by another regex first.
552 name = re.sub(
553 rf",?\s*\b(?:{months})\b\.?\s*$",
554 "",
555 name,
556 flags=re.IGNORECASE,
557 )
558 # Strip leftover bare volume/page keyword at the end ("p", "pp",
559 # "vol", "vol.", "no", "no.") that survives when the number got
560 # truncated upstream by the search engine result preview.
561 name = re.sub(
562 r",?\s*\b(?:vol(?:ume)?|pp?|no)\b\.?\s*$",
563 "",
564 name,
565 flags=re.IGNORECASE,
566 )
567 # Strip empty / whitespace-only trailing parens — arXiv
568 # journal_ref fields sometimes end with "()" where the citation
569 # year was stripped upstream, e.g. "Physical Review Research ()".
570 name = re.sub(r"\s*\(\s*\)\s*$", "", name)
571 # Strip geographic qualifiers: "(London)", "(New York)", "(US)"
572 # Only strip parenthesized suffixes that contain no digits
573 # (preserves "NeurIPS (2023)" which is handled by the year regex)
574 name = re.sub(r"\s*\([^()0-9]+\)\s*$", "", name)
575 # Strip trailing truncated volume/page markers: ", v", ", p",
576 # ", vol" — these appear when the search engine preview cut the
577 # citation mid-keyword ("Plasma Physics and Controlled Fusion,
578 # vol. 63, no. 8" → "Plasma Physics and Controlled Fusion, v").
579 name = re.sub(
580 r",\s*(?:v|vol|p|pp|no|n)\.?\s*$",
581 "",
582 name,
583 flags=re.IGNORECASE,
584 )
585 # Remove trailing punctuation and whitespace
586 name = re.sub(r"[,;.\s]+$", "", name)
587 # Strip a trailing 4-digit year/volume (conferences: "NeurIPS 2023"
588 # → "NeurIPS"). Comes after the punctuation strip so any trailing
589 # comma/period is already gone, and after the parenthesized-year
590 # regex above so we don't double-process "(2023)".
591 name = re.sub(r"\s+\d{4}\s*$", "", name)
592 # Normalize "&" → "and" for consistent matching
593 name = re.sub(r"\s*&\s*", " and ", name)
594 # Normalize internal whitespace
595 return re.sub(r"\s+", " ", name).strip()
597 def __llm_clean_journal_name(self, journal_name: str) -> Optional[str]:
598 """LLM-based fallback for canonicalizing unusual journal names.
600 The regex + JabRef abbreviation tiers handle the common cases
601 (volume/year/page stripping, well-known abbreviations like
602 "Phys. Rev. Lett." → "Physical Review Letters"). They cannot
603 handle locations ("ICML 2023, Honolulu"), unusual abbreviations
604 not in the JabRef list, or non-English title transliterations.
606 This is gated behind ``enable_llm_scoring`` so it never fires
607 unless the user opted into the Tier 4 LLM path. Called only as a
608 salvage step when bundled tiers all miss, so the LLM bill is
609 bounded by the number of *unrecognised* journals per query, not
610 every journal.
612 Args:
613 journal_name: A name that the regex tier could not match
614 against any bundled dataset.
616 Returns:
617 A canonicalised name from the LLM, or ``None`` if the call
618 failed or the response was empty.
619 """
620 prompt = (
621 f"Clean up the following journal or conference name:\n\n"
622 f'"{journal_name}"\n\n'
623 "Remove any references to volumes, pages, months, or years. "
624 "Expand common abbreviations. For conferences, remove "
625 "locations. Output only the clean name, no explanation."
626 )
627 try:
628 response = self.model.invoke(prompt)
629 content = getattr(response, "content", None) or response
630 cleaned = str(content).strip().strip('"').strip("'")
631 if not cleaned: 631 ↛ 632line 631 didn't jump to line 632 because the condition on line 631 was never true
632 return None
633 return cleaned
634 except (
635 ConnectionError,
636 TimeoutError,
637 ValueError,
638 ) as e:
639 # Network / service / parse failures are expected and
640 # recoverable — caller falls back to the regex-cleaned name.
641 # Surface at WARNING (not silent / not DEBUG) so they're
642 # visible during triage without flooding info-level logs.
643 # Log only the exception class name; the message can carry
644 # request-specific data (URLs, prompts) that doesn't belong
645 # in operational logs.
646 logger.warning(
647 f"LLM name cleaning failed for '{journal_name}' "
648 f"({type(e).__name__}); using regex-cleaned version"
649 )
650 return None
652 # ------------------------------------------------------------------
653 # Tier 4: LLM-based analysis (last resort)
654 # ------------------------------------------------------------------
def __analyze_journal_reputation(self, journal_name: str) -> int:
    """Score a journal with one SearXNG search followed by one LLM call.

    This is Tier 4, the last-resort path for journals the bundled data
    does not cover. Snippets from up to ten search hits are joined,
    truncated to the configured maximum context, and handed to the LLM
    with a fixed scoring prompt.

    Args:
        journal_name: Cleaned journal name to research.

    Returns:
        The score parsed from the LLM reply; always a member of
        ``VALID_QUALITY_SCORES``.

    Raises:
        ValueError: If the search produced no snippet text, the reply
            contains no integer, or the integer is not in
            ``VALID_QUALITY_SCORES``.
    """
    logger.info(f"Tier 4: LLM analysis for journal '{journal_name}'...")

    # The engine instance is shared and keeps mutable state, so only
    # one thread at a time may run a query on it.
    search_query = f'"{journal_name}" academic journal impact factor quartile'
    with self.__engine_lock:
        hits = self.__engine.run(search_query)

    # Prefer each hit's "snippet", falling back to "content"; hits with
    # neither contribute nothing.
    excerpts = [
        text
        for hit in hits[:10]
        if (text := hit.get("snippet", "") or hit.get("content", ""))
    ]
    journal_info_text = "\n".join(excerpts)

    if not journal_info_text:
        logger.warning(
            f"No SearXNG results for '{journal_name}' — "
            f"cannot score via Tier 4"
        )
        raise ValueError(f"No search results for journal '{journal_name}'")

    # Keep the prompt within the configured context budget.
    if len(journal_info_text) > self.__max_context:
        journal_info_text = journal_info_text[: self.__max_context] + "..."

    # The wording below is intentionally left untouched: the score
    # calibration elsewhere depends on it.
    prompt = f"""
You are a research assistant helping to assess the reliability and
reputability of scientific journals. A reputable journal should be
peer-reviewed, not predatory, and high-impact. Please review the
following information on the journal "{journal_name}" and output a
reputability score between 1 and 10, where 1-3 is not reputable and
probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
and 7-10 is reputable Q1 journals. Only output the number, do not
provide any explanation or other output.

JOURNAL INFORMATION:

{journal_info_text}
"""

    reply = self.model.invoke(prompt).content
    logger.debug(f"Tier 4 LLM response for '{journal_name}': {reply}")

    # The first run of digits in the reply is taken as the score.
    found = re.search(r"\d+", reply.strip())
    if found is None:
        logger.warning(
            f"Failed to parse score from LLM response for "
            f"'{journal_name}': {reply!r}"
        )
        raise ValueError(
            "Failed to parse reputation score from LLM response."
        )

    score = int(found.group())
    if score in VALID_QUALITY_SCORES:
        return score

    # An out-of-set value is reported as a parse failure rather than
    # snapped to a neighbouring bucket, so callers' failure handling
    # gets to see it.
    logger.warning(
        f"LLM returned out-of-set score {score} for "
        f"'{journal_name}' (expected one of "
        f"{sorted(VALID_QUALITY_SCORES)}); treating as parse failure"
    )
    raise ValueError(
        f"LLM returned out-of-set score {score}."
    )
750 # ------------------------------------------------------------------
751 # Database operations
752 # ------------------------------------------------------------------
def __save_llm_score_to_db(self, *, name: str, quality: int) -> None:
    """Persist a Tier 4 LLM score so later research runs can reuse it.

    Does nothing when no DB session is available (no search context,
    e.g. during the preview filter phase). A failing cache write is
    logged and swallowed: the score itself has already been computed
    and is still returned to the caller.

    Args:
        name: Journal name used as the cache key.
        quality: Score to store.
    """
    ctx = self.__db_session()
    if ctx is not None:
        try:
            self._save_journal_to_db_inner(ctx, name=name, quality=quality)
        except Exception:
            logger.exception(
                f"Failed to cache LLM score for '{name}' — "
                f"score is still returned but won't be cached."
            )
def _save_journal_to_db_inner(
    self, session_ctx, *, name: str, quality: int
) -> None:
    """Insert or update the cached LLM score row for ``name``.

    Flow: look the row up by name and update it if present; otherwise
    insert inside a savepoint. If the insert hits an IntegrityError
    (another writer created the row first), the savepoint is rolled
    back, the row is re-fetched and updated instead.

    ``name_lower`` is always written through ``normalize_name`` rather
    than a bare ``.lower()``, so every writer produces the same key.

    Args:
        session_ctx: Context manager yielding the DB session.
        name: Journal name (unique key of the row).
        quality: Score to store.
    """
    from sqlalchemy.exc import IntegrityError

    timestamp = int(time.time())
    model_id = get_model_identifier(self.model)

    def _stamp(row) -> None:
        # Overwrite every score-related column on an existing row.
        row.quality = quality
        row.score_source = "llm"
        row.quality_analysis_time = timestamp
        row.quality_model = model_id
        row.name_lower = normalize_name(name)

    with session_ctx as db_session:
        existing = db_session.query(Journal).filter_by(name=name).first()
        if existing is not None:
            _stamp(existing)
            try:
                db_session.commit()
            except Exception:
                db_session.rollback()
                logger.warning(
                    f"Failed to update cached LLM score for '{name}'"
                )
            return

        savepoint = db_session.begin_nested()
        try:
            db_session.add(
                Journal(
                    name=name,
                    name_lower=normalize_name(name),
                    quality=quality,
                    score_source="llm",
                    quality_model=model_id,
                    quality_analysis_time=timestamp,
                )
            )
            db_session.flush()
            savepoint.commit()
            db_session.commit()
            return
        except IntegrityError:
            savepoint.rollback()

        # A competing writer inserted first: re-fetch and update its row.
        winner = db_session.query(Journal).filter_by(name=name).first()
        if winner is None:
            # Unexpected: UNIQUE violation but no row to be found.
            logger.warning(
                f"IntegrityError on Journal '{name}' insert but "
                f"row not found on re-fetch; skipping cache write."
            )
            return
        _stamp(winner)
        try:
            db_session.commit()
        except Exception:
            db_session.rollback()
            logger.warning(f"Failed to merge cached LLM score for '{name}'")
# ------------------------------------------------------------------
# Tiered scoring for a single journal
# ------------------------------------------------------------------
def __score_journal(
    self, journal_name: str, result: Dict[str, Any]
) -> tuple[int | None, str | None]:
    """
    Score a journal using the tiered approach.

    Args:
        journal_name: Cleaned journal name to score.
        result: Search-result dict; the keys ``issn``,
            ``openalex_source_id``, ``publisher`` and
            ``affiliations`` are read when present.

    Returns ``(score, source_tag)``:
      - ``score`` is the 1-10 quality value, or ``None`` if the
        journal is predatory (signal to auto-remove).
      - ``source_tag`` identifies which tier produced the score:
        ``"openalex"``, ``"doaj"``, ``"institution"``, ``"llm"``
        (Tier 4 live scoring OR cache hit on a prior LLM row),
        ``"conference"``, or ``"low_confidence"`` (no tier matched).
        ``None`` when predatory.

    The tag is attached to the result dict for rendering. Nothing
    is frozen on the Paper row; the dashboard resolves current
    quality live from ``journals.quality`` (Tier 4) or the bundled
    reference DB (Tier 1-3) so a re-scored journal propagates
    automatically.
    """
    dm = self.__data_manager

    def _score_openalex_entry(entry: Dict[str, Any]):
        """Score one OpenAlex row.

        Shared by Tier 2 and the Tier 3.6 retry so both derive the
        score from exactly the same fields. Returns
        ``(score, h_index, quartile, source_type)``.
        """
        h_index = entry.get("h_index")
        in_doaj = entry.get("is_in_doaj", False)
        # OpenAlex has is_in_doaj but not has_doaj_seal — cross-ref DOAJ
        has_seal = dm.has_doaj_seal(entry.get("issn_l")) if in_doaj else False
        source_type = entry.get("type", "journal")
        quartile = entry.get("quartile")
        derived = dm.derive_quality_score(
            h_index=h_index,
            quartile=quartile,
            is_in_doaj=in_doaj,
            has_doaj_seal=has_seal,
            source_type=source_type,
        )
        return derived, h_index, quartile, source_type

    # Extract IDs from result for richer lookups
    issn = result.get("issn")
    openalex_sid = result.get("openalex_source_id")
    publisher = result.get("publisher")

    # --- Tier 1: Predatory check ---
    is_pred, pred_source = dm.is_predatory(
        journal_name=journal_name,
        publisher_name=publisher,
    )
    if is_pred:
        # Check whitelist override (avoids false positives)
        if dm.is_whitelisted(issn=issn, name=journal_name):
            logger.debug(
                f"Tier 1: '{journal_name}' is on predatory list "
                f"({pred_source}) but whitelisted — not removing"
            )
        else:
            logger.warning(
                f"Tier 1: PREDATORY — removing results from "
                f"'{journal_name}' (source: {pred_source})"
            )
            return None, None  # Signal auto-remove

    # --- Tier 2: OpenAlex snapshot ---
    oa_entry = dm.lookup_openalex(
        source_id=openalex_sid, issn=issn, name=journal_name
    )
    if oa_entry:
        score, h_idx, oa_quartile, oa_type = _score_openalex_entry(oa_entry)
        if score is not None:
            logger.debug(
                f"Tier 2 (OpenAlex): '{journal_name}' → "
                f"score {score}/10 "
                f"(quartile: {oa_quartile or '—'}, h-index: {h_idx})"
            )
            # Tier 2 results are NOT cached to the user DB — the
            # read-only reference DB is already a 100–300µs lookup,
            # so a second-level cache adds no value. Only Tier 4
            # (LLM) results are cached, below.
            # Preprint repositories (arxiv, biorxiv, ssrn, ...) get a
            # low Tier-2 floor because they aren't peer-reviewed. If
            # the authors are at a strong institution, lift the score
            # via the institution tier — taking max so a real venue
            # match (≥6) is never demoted. Only applies to repository
            # source types and only when score is weak (≤5).
            if oa_type == "repository" and score <= 5:
                affs = result.get("affiliations")
                if affs:
                    inst = dm.score_from_affiliations(affs)
                    if inst is not None and inst > score:
                        logger.debug(
                            f"Tier 2+3.5 (preprint lift): "
                            f"'{journal_name}' {score} → {inst} via "
                            f"institutions: {_format_affiliations(affs)}"
                        )
                        return inst, "institution"
            return score, "openalex"

    # --- Tier 3: DOAJ ---
    if issn:
        doaj_entry = dm.lookup_doaj(issn=issn)
        if doaj_entry:
            seal = doaj_entry.get("has_seal", False)
            score = dm.derive_quality_score(
                is_in_doaj=True,
                has_doaj_seal=seal,
            )
            logger.debug(
                f"Tier 3 (DOAJ): '{journal_name}' → "
                f"score {score}/10 (DOAJ Seal: {seal})"
            )
            return score, "doaj"

    # --- Tier 3.5: Institution lookup ---
    # When the venue tiers couldn't score the paper, fall back to
    # author affiliations. This is the *only* trust signal we have
    # for preprints with no journal_ref or for venues OpenAlex
    # doesn't index. Score is capped at 6 inside score_from_affiliations
    # so institution alone never beats a real venue match.
    affiliations = result.get("affiliations")
    if affiliations:
        inst_score = dm.score_from_affiliations(affiliations)
        if inst_score is not None:
            logger.debug(
                f"Tier 3.5 (Institution): '{journal_name}' → "
                f"score {inst_score}/10 from institutions: "
                f"{_format_affiliations(affiliations)}"
            )
            return inst_score, "institution"

    # --- DB cache: check for cached LLM results before expensive tiers ---
    # Tiers 1-3 use bundled data (instant, no caching needed).
    # Only Tier 4 (LLM) results are expensive and worth caching.
    # The predicate filters on quality_model so scores from a
    # superseded LLM model miss the cache and re-score, and
    # `cached.quality` is validated against VALID_QUALITY_SCORES to
    # evict any pre-fix rows that stored an out-of-set value.
    session_ctx = self.__db_session()
    if session_ctx is not None:
        try:
            with session_ctx as session:
                cached = (
                    session.query(Journal)
                    .filter_by(name=journal_name)
                    .filter(Journal.score_source == "llm")
                    .filter(
                        Journal.quality_model
                        == get_model_identifier(self.model)
                    )
                    .first()
                )
                if cached is not None:
                    is_fresh = (
                        time.time() - cached.quality_analysis_time
                    ) < self.__quality_reanalysis_period.total_seconds()
                    is_valid = cached.quality in VALID_QUALITY_SCORES

                    if is_fresh and is_valid:
                        logger.info(
                            f"DB cache hit: '{journal_name}' → "
                            f"score {cached.quality}/10 [cached LLM]"
                        )
                        # Cache hit on a Journal row written by the
                        # LLM path — tag as "llm" so the dashboard
                        # surfaces it as a Tier 4 verdict.
                        return cached.quality, "llm"
                    if is_fresh and not is_valid:
                        logger.warning(
                            f"Cached score {cached.quality} for "
                            f"'{journal_name}' not in valid set "
                            f"{sorted(VALID_QUALITY_SCORES)}; rescoring"
                        )
                    # Expired or invalid — fall through to re-evaluate
        except Exception:
            # Best-effort: a cache read failure must not block scoring.
            logger.exception(
                f"DB cache read failed for '{journal_name}', "
                f"continuing with LLM tiers"
            )

    # --- Tier 4: LLM analysis (last resort) ---
    # Off by default — bundled data covers 217K+ sources and this tier
    # adds significant latency (1 SearXNG search + 1 LLM call per
    # unknown journal). Users opt in via the
    # `search.journal_reputation.enable_llm_scoring` setting. The
    # __searxng_available and consecutive-failures checks below remain
    # as runtime safety nets even when the user enabled it.
    from ...config.search_config import get_setting_from_snapshot

    _enable_tier4 = bool(
        get_setting_from_snapshot(
            "search.journal_reputation.enable_llm_scoring",
            False,
            settings_snapshot=self.__settings_snapshot,
        )
    )

    # Tier 3.6: LLM-based name cleanup salvage. Gated behind the same
    # opt-in flag as Tier 4. Asks the LLM to canonicalise the name
    # (handles abbreviations and locations the regex can't), then
    # retries the OpenAlex lookup under the cleaned name. This costs
    # one extra LLM call per unknown journal but can avoid Tier 4's
    # full SearXNG search.
    if _enable_tier4:
        relabeled = self.__llm_clean_journal_name(journal_name)
        if relabeled and relabeled != journal_name:
            logger.debug(
                f"Tier 3.6 (LLM cleanup): '{journal_name}' → "
                f"'{relabeled}', retrying bundled tiers"
            )
            oa_retry = dm.lookup_openalex(name=relabeled)
            if oa_retry:
                score, _, _, _ = _score_openalex_entry(oa_retry)
                if score is not None:
                    logger.info(
                        f"Tier 3.6 (LLM cleanup → OpenAlex): "
                        f"'{journal_name}' (as '{relabeled}') → "
                        f"score {score}/10"
                    )
                    # Tier 3.6 is a Tier 2 retry under a cleaned
                    # name; the result is effectively Tier 2 data
                    # and is NOT cached in the user DB (reference
                    # DB lookups are already instant). Tagged
                    # "openalex" — only the NAME came from the LLM,
                    # not the score.
                    return score, "openalex"

    if (
        _enable_tier4
        and self.__searxng_available
        and self.__searxng_failures() < 2
    ):
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            self.__reset_searxng_failures()
            # DOAJ Seal bump: if we know the journal has the Seal,
            # give the LLM score a +1 (capped at 10). The Seal is a
            # strong independent OA signal the LLM can't see from
            # SearXNG snippets. After the bump, snap back to the
            # emitted-score set so the cache stays canonical.
            seal_bonus = False
            if issn and dm.has_doaj_seal(issn):
                bumped = min(quality + 1, 10)
                if bumped in VALID_QUALITY_SCORES:
                    # Only report a bonus when the score really rose;
                    # a score already at the cap is left unchanged and
                    # must not be logged as "+1".
                    seal_bonus = bumped > quality
                    quality = bumped
                else:
                    # Bump lands outside the canonical score set
                    # (e.g. q=8 → 9, and 9 ∉ VALID_QUALITY_SCORES).
                    # Log the skip so operators can see the Seal
                    # had no effect instead of dropping silently.
                    logger.debug(
                        f"DOAJ Seal +1 bonus skipped for "
                        f"'{journal_name}': {quality}+1={bumped} "
                        f"not in VALID_QUALITY_SCORES "
                        f"{sorted(VALID_QUALITY_SCORES)}"
                    )
            self.__save_llm_score_to_db(name=journal_name, quality=quality)
            logger.debug(
                f"Tier 4 (LLM): '{journal_name}' → "
                f"score {quality}/10 "
                f"[via SearXNG + LLM analysis"
                f"{', +1 DOAJ Seal bonus' if seal_bonus else ''}]"
            )
            return quality, "llm"
        except Exception:
            # Any failure in this tier counts toward the fail-fast
            # limit; scoring then falls through to the heuristics below.
            failures = self.__bump_searxng_failures()
            logger.exception(
                f"Tier 4 failed for '{journal_name}'. "
                f"Consecutive failures: {failures}"
            )
            if failures >= 2:
                logger.warning(
                    "Tier 4 disabled for remaining journals in "
                    "this batch (2 consecutive failures)."
                )

    # --- Conference heuristic (for papers without DOI or OpenAlex match) ---
    # Guard: many high-tier journals start with "Proceedings of …"
    # (PNAS, Royal Society A/B, AMS, LMS, …). The bare `proceedings`
    # token in `_CONFERENCE_PATTERNS` would otherwise classify them
    # as Q3 conferences and throw away their real h-index. Skip the
    # heuristic for these — they fall through to the unknown-journal
    # score (3) and the user's threshold decides what to do.
    if journal_name.lower().lstrip().startswith("proceedings of "):
        logger.debug(
            f"Conference heuristic: skipped for '{journal_name}' "
            f"(starts with 'Proceedings of' — likely a journal, "
            f"not a conference)"
        )
    elif _is_likely_conference(journal_name):
        score = dm.derive_quality_score(source_type="conference")
        logger.debug(
            f"Conference heuristic: '{journal_name}' → "
            f"score {score}/10 (detected as conference by name pattern)"
        )
        return score, "conference"

    # No tier could score this journal — neither OpenAlex/DOAJ
    # venue match nor Tier 3.5 institution salvage produced a
    # signal. Score it as low-confidence (3) so the default
    # threshold (4) actually filters it out. Distinct from
    # predatory (1) — these are merely unknown, not blacklisted.
    affs_for_log = result.get("affiliations") or []
    logger.debug(
        f"No scoring data for '{journal_name}' — flagging as "
        f"low-confidence (score 3); tried institutions: "
        f"{_format_affiliations(affs_for_log)}"
    )
    return 3, "low_confidence"
# ------------------------------------------------------------------
# Main filter entry point
# ------------------------------------------------------------------
def filter_results(
    self, results: List[Dict], query: str, **kwargs
) -> List[Dict]:
    """Filter results by journal quality, with deduplication.

    Args:
        results: Search-result dicts. ``journal_ref`` names the venue;
            ``issn``, ``openalex_source_id``, ``affiliations``,
            ``title``, ``link`` / ``url`` are read when present.
            Kept results are annotated in place.
        query: Not used by this filter.
        **kwargs: Not used by this filter.

    Returns:
        The results that passed. While the reference DB is not yet
        available, venued results are kept and tagged
        ``QUALITY_PENDING`` instead of being scored. If scoring
        raises, the partial list built so far is returned (never the
        unfiltered input), so predatory results are not re-admitted.
    """
    logger.info(
        f"Journal filter: processing {len(results)} results "
        f"(threshold={self.__threshold})"
    )
    # Fail-soft during a fresh install: if the reference DB file
    # isn't on disk yet, don't score anything. Every journal would
    # otherwise fall through to the "no scoring data" branch and
    # be marked as score 3, which is semantically wrong — we don't
    # know the journal is unknown, we just haven't loaded the data
    # yet. Tag each result with the QUALITY_PENDING sentinel so the
    # report renderer can show the user a helpful note pointing
    # them at /metrics/journals.
    #
    # We probe the DB file path directly rather than calling
    # ``data_manager.available`` — the latter has side effects
    # (triggers `_ensure_engine`, which tries to lazy-build the DB
    # and can block on an in-flight download for several minutes).
    # A cached engine on the data manager means either the file
    # was successfully opened earlier, or a test fixture has
    # injected an in-memory DB — either way we're good to score.
    try:
        from ...config.paths import get_journal_data_directory

        db_file = get_journal_data_directory() / "journal_quality.db"
        on_disk = db_file.exists() and db_file.stat().st_size > 0
        engine_cached = (
            getattr(self.__data_manager, "_engine", None) is not None
        )
        db_ready = on_disk or engine_cached
        logger.info(
            f"Journal filter: db_ready={db_ready} "
            f"(on_disk={on_disk}, engine_cached={engine_cached})"
        )
    except Exception:
        logger.exception(
            "Journal filter: db-ready probe raised; "
            "assuming DB not ready (pending)."
        )
        db_ready = False
    if not db_ready:
        from ...utilities.search_utilities import QUALITY_PENDING

        # Fire-and-forget the download in a daemon thread so the
        # pending-marker copy ("by the time you check, it may
        # already be done") is actually true. Without this the
        # filter would just tag every search "pending" forever
        # and nobody would ever fetch the data unless the user
        # clicked Download manually. The 30-second TTL cache in
        # ensure_journal_data prevents multiple concurrent filter
        # workers from all racing to spawn the same download.
        _start_background_journal_fetch()

        # Respect exclude_non_published — results without a venue
        # still get dropped in pending mode, same as in the full
        # scoring path. Only venued results carry the marker.
        out = []
        tagged = 0
        dropped = 0
        for r in results:
            # Strip whitespace-only refs exactly like the scoring
            # path does, so " " is not mistaken for a real venue.
            pending_ref = r.get("journal_ref")
            if isinstance(pending_ref, str):
                pending_ref = pending_ref.strip()
            if pending_ref:
                r.setdefault("journal_quality", QUALITY_PENDING)
                out.append(r)
                tagged += 1
            elif not self.__exclude_non_published:
                out.append(r)
            else:
                dropped += 1
        logger.warning(
            f"Journal filter: reference DB not yet built — "
            f"tagged {tagged} result(s) with QUALITY_PENDING, "
            f"kept {len(out) - tagged} venueless, "
            f"dropped {dropped} (exclude_non_published). "
            f"Background download triggered if not already running."
        )
        return out

    # Initialize `filtered` outside the try so the predatory-safe
    # fallback in the except handler can always reference it even
    # if the crash happens before Pass-1 populates anything.
    filtered: list = []

    try:
        # Reset the per-thread fail-fast counter for each batch.
        # The counter lives in `threading.local()` so concurrent
        # callers on the same filter instance don't clobber each
        # other (see Bug A3 / parallel_search_engine.py:177).
        self.__reset_searxng_failures()

        # Pass 1: collect the richest metadata per journal (the result
        # with ISSN/source_id) so scoring uses the best available data.
        journal_best_result: Dict[str, Dict] = {}
        results_with_journals: list[tuple[Dict, str]] = []

        def _handle_no_venue(result: Dict) -> None:
            """Institution-salvage then exclude_non_published policy.

            If Tier 3.5 salvages a score from author affiliations,
            the result carries that numeric score. Otherwise we
            tag it with ``QUALITY_PREPRINT`` so the report renderer
            can show a "preprint — not in journal catalog" label
            instead of leaving the quality column ambiguously blank.
            """
            from ...utilities.search_utilities import QUALITY_PREPRINT

            affs = result.get("affiliations")
            if affs:
                inst_score = self.__data_manager.score_from_affiliations(
                    affs
                )
                if (
                    inst_score is not None
                    and inst_score >= self.__threshold
                ):
                    result["journal_quality"] = inst_score
                    # `title` may be present but None; a log line
                    # must never abort the batch.
                    title = str(result.get("title") or "")[:60]
                    logger.debug(
                        f"Tier 3.5 (Institution, no venue): "
                        f"'{title}' → "
                        f"score {inst_score}/10 from institutions: "
                        f"{_format_affiliations(affs)}"
                    )
                    filtered.append(result)
                    return
            if not self.__exclude_non_published:
                result.setdefault("journal_quality", QUALITY_PREPRINT)
                filtered.append(result)

        # Per-batch cache for journal name cleaning — avoids redundant
        # regex + abbreviation DB lookups when multiple results come
        # from the same raw journal_ref string (common in OpenAlex
        # result batches).
        _name_cache: Dict[str, str] = {}

        for result in results:
            journal_ref = result.get("journal_ref")
            # Strip whitespace-only refs — " " is truthy but has no
            # meaningful content for the filter.
            if isinstance(journal_ref, str):
                journal_ref = journal_ref.strip()
            if not journal_ref:
                _handle_no_venue(result)
                continue

            # Use per-batch cache to skip redundant cleaning for
            # repeated raw journal_ref values in the same batch.
            clean_name = _name_cache.get(journal_ref)
            if clean_name is None:
                clean_name = self.__clean_journal_name(journal_ref)
                _name_cache[journal_ref] = clean_name
            # Cleanup can reduce a volume/page-only ref to "". Treat
            # that the same as "no venue" rather than bucketing all
            # affected results under a degenerate empty-string key.
            if not clean_name:
                _handle_no_venue(result)
                continue

            results_with_journals.append((result, clean_name))

            if clean_name not in journal_best_result:
                journal_best_result[clean_name] = result
            else:
                # Upgrade only from "no identifiers" to "has one".
                prev = journal_best_result[clean_name]
                if (
                    not prev.get("issn")
                    and not prev.get("openalex_source_id")
                ) and (
                    result.get("issn") or result.get("openalex_source_id")
                ):
                    journal_best_result[clean_name] = result

        # Pass 2: score each unique journal once, then filter
        journal_scores: Dict[str, tuple[int | None, str | None]] = {}

        for result, clean_name in results_with_journals:
            if clean_name not in journal_scores:
                journal_scores[clean_name] = self.__score_journal(
                    clean_name, journal_best_result[clean_name]
                )

            score, source_tag = journal_scores[clean_name]

            if score is None:
                # Predatory → auto-remove. Include original journal_ref
                # and URL in the log so false-positive reports can be
                # debugged without re-running the query.
                title = str(result.get("title") or "")[:80]
                logger.warning(
                    f"Auto-removed predatory: "
                    f"title='{title}' "
                    f"journal_ref={result.get('journal_ref', '')!r} "
                    f"cleaned='{clean_name}' "
                    f"url={result.get('link') or result.get('url') or '—'}"
                )
                continue

            if score >= self.__threshold:
                result["journal_quality"] = score
                # Cleaned name that keyed the successful score —
                # persisted on Paper.container_title so the dashboard
                # can GROUP BY it and enrich from the reference DB.
                result["journal_name_matched"] = clean_name
                # Source tag — propagates to the rendered quality
                # tag in the research output. Nothing is frozen on
                # the Paper row; the dashboard resolves current
                # quality live from journals.quality (Tier 4) or
                # the bundled reference DB (Tier 1-3).
                result["journal_quality_source"] = source_tag
                filtered.append(result)

        # Summary counters are per unique journal, not per result.
        predatory_count = sum(
            1 for s, _ in journal_scores.values() if s is None
        )
        passed_count = sum(
            1
            for s, _ in journal_scores.values()
            if s is not None and s >= self.__threshold
        )
        below_count = sum(
            1
            for s, _ in journal_scores.values()
            if s is not None and s < self.__threshold
        )
        logger.info(
            f"Journal quality filter: {len(results)} → "
            f"{len(filtered)} results | "
            f"{len(journal_scores)} unique journals scored | "
            f"{passed_count} passed, {below_count} below threshold, "
            f"{predatory_count} predatory removed"
        )
        return filtered

    except Exception:
        # Safety net: a filter crash should not kill the entire search,
        # but it MUST NOT re-admit predatory journals either.
        # `filtered` is predatory-free by construction (the pass-2 loop
        # `continue`s on predatory scores). Returning `results` — the
        # original unfiltered list — would leak predatory sources that
        # Tier 1 had already caught. Prefer losing in-flight non-
        # predatory results over breaking the predatory-removal
        # safety contract. Logged at ERROR so the root cause surfaces.
        logger.exception(
            "Journal quality filtering failed — returning partial "
            "(predatory-free) results. This is a bug that should be "
            "investigated."
        )
        return filtered