Coverage for src / local_deep_research / advanced_search_system / filters / journal_reputation_filter.py: 88%
120 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1import time
2from datetime import timedelta
3from typing import Any, Dict, List, Optional
5from langchain_core.language_models.chat_models import BaseChatModel
6from loguru import logger
7from methodtools import lru_cache
8from sqlalchemy.orm import Session
10from ...config.llm_config import get_llm
11from ...database.models import Journal
12from ...database.session_context import get_user_db_session
13from ...search_system import AdvancedSearchSystem
14from ...utilities.resource_utils import safe_close
15from ...utilities.thread_context import get_search_context
16from ...web_search_engines.search_engine_factory import create_search_engine
17from .base_filter import BaseFilter
class JournalFilterError(Exception):
    """Raised when journal reputation filtering cannot be performed."""
class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, it will exclude any results that
                don't have an associated journal publication.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        Raises:
            JournalFilterError: If the SearXNG engine could not be created
                or reports itself as unavailable.

        """
        super().__init__(model)

        # Remember whether we created the LLM ourselves so that close()
        # only tears down resources this filter owns.
        self._owns_llm = self.model is None
        if self.model is None:
            self.model = get_llm()

        # Import here to avoid circular import. NOTE: this import must be
        # unconditional — it was previously nested inside the
        # `reliability_threshold is None` branch, which raised a NameError
        # whenever a threshold was passed explicitly but any of the other
        # settings still had to be read from the snapshot.
        from ...config.search_config import get_setting_from_snapshot

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store settings_snapshot for later use
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        # create_search_engine() can return a non-None engine whose backing
        # service is unreachable (e.g. SearXNG is down), so we also verify
        # the engine reports itself as available.
        if self.__engine is None or not getattr(
            self.__engine, "is_available", False
        ):
            raise JournalFilterError(
                "SearXNG initialization failed or not available."
            )

    def close(self) -> None:
        """Close the SearXNG engine and LLM client."""
        # The engine attribute may not exist if __init__ failed part-way
        # through (e.g. the JournalFilterError path), so guard before closing.
        if hasattr(self, "_JournalReputationFilter__engine"):
            safe_close(self.__engine, "SearXNG engine")
        # Only close the LLM when we created it; a caller-supplied model is
        # the caller's responsibility.
        if self._owns_llm:
            safe_close(self.model, "journal filter LLM")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The filter that it created, or None if filtering is disabled in
            the settings, or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use.

        """
        # Credentials come from the per-thread search context so that the
        # filter can run on worker threads.
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    # methodtools.lru_cache is method-aware, so the cache does not keep the
    # instance alive the way functools.lru_cache would.
    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        Raises:
            ValueError: If the LLM response could not be parsed as a score.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        search_system = self.__make_search_system()
        try:
            journal_info = search_system.analyze_topic(
                f'Assess the reputability and reliability of the journal "'
                f'{journal_name}", with a particular focus on its quartile '
                f"ranking and peer review status. Be sure to specify the journal "
                f"name in any generated questions."
            )
        finally:
            safe_close(search_system, "journal search system")
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is reputable Q1 journals. Only output the number, do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the max context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).content
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            )

        # Clamp to the documented 1-10 range in case the LLM strays.
        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            # Update the existing row if present; otherwise insert a new one.
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).content
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if it couldn't determine a
            reputability score, false otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first; a cached assessment is reused until it
        # is older than the configured reanalysis period.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters results by journal reputation.

        Args:
            results: The search results to filter.
            query: The original search query (unused here, part of the
                BaseFilter interface).
            **kwargs: Additional arguments from the BaseFilter interface.

        Returns:
            The results whose journals meet the reliability threshold. If
            filtering fails for any reason, the original results are
            returned unchanged (fail-open).

        """
        try:
            return list(filter(self.__check_result, results))
        except Exception:
            logger.exception("Journal quality filtering failed")
            return results