Coverage for src / local_deep_research / advanced_search_system / filters / journal_reputation_filter.py: 85%
110 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import time
2from datetime import timedelta
3from typing import Any, Dict, List, Optional
5from langchain_core.language_models.chat_models import BaseChatModel
6from loguru import logger
7from methodtools import lru_cache
8from sqlalchemy.orm import Session
10from ...config.llm_config import get_llm
11from ...database.models import Journal
12from ...database.session_context import get_user_db_session
13from ...search_system import AdvancedSearchSystem
14from ...utilities.thread_context import get_search_context
15from ...web_search_engines.search_engine_factory import create_search_engine
16from .base_filter import BaseFilter
class JournalFilterError(Exception):
    """Raised when journal reputation filtering cannot be set up or performed."""
class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, it will exclude any results that
                don't have an associated journal publication.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        Raises:
            JournalFilterError: If the SearXNG search engine cannot be
                initialized.

        """
        super().__init__(model)

        if self.model is None:
            self.model = get_llm()

        # Import here to avoid a circular import. NOTE: This must be done
        # unconditionally, before any of the fallback lookups below —
        # previously it only happened inside the threshold branch, causing a
        # NameError when `reliability_threshold` was provided explicitly but
        # any of the other settings still had to be read from the snapshot.
        from ...config.search_config import get_setting_from_snapshot

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store settings_snapshot for later use
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        if self.__engine is None:
            raise JournalFilterError("SearXNG initialization failed.")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The filter that it created, or None if filtering is disabled in
            the settings, or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use.

        """
        # Credentials come from the per-thread search context.
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        Raises:
            ValueError: If the LLM response cannot be parsed as a score.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        journal_info = self.__make_search_system().analyze_topic(
            f'Assess the reputability and reliability of the journal "'
            f'{journal_name}", with a particular focus on its quartile '
            f"ranking and peer review status. Be sure to specify the journal "
            f"name in any generated questions."
        )
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is reputable Q1 journals. Only output the number, do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the max context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).text()
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError as err:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            # Chain the original cause so the traceback shows the raw
            # conversion failure.
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            ) from err

        # Clamp the score into the documented 1-10 range.
        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                # Update the existing row in place.
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).text()
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if it couldn't determine a
            reputability score, false otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                # The cached assessment is still fresh; use it directly.
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters search results, culling those from disreputable journals.

        Args:
            results: The raw search results to filter.
            query: The original search query (unused by this filter, but part
                of the common filter interface).
            **kwargs: Additional arguments accepted for interface
                compatibility.

        Returns:
            The filtered results. If filtering fails for any reason, the
            original results are returned unchanged (best-effort behavior).

        """
        try:
            return list(filter(self.__check_result, results))
        except Exception:
            logger.exception("Journal quality filtering failed")
            return results