Coverage for src/local_deep_research/advanced_search_system/filters/journal_reputation_filter.py: 22%
111 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1import time
2import traceback
3from datetime import timedelta
4from typing import Any, Dict, List, Optional
6from langchain_core.language_models.chat_models import BaseChatModel
7from loguru import logger
8from methodtools import lru_cache
9from sqlalchemy.orm import Session
11from ...config.llm_config import get_llm
12from ...database.models import Journal
13from ...database.session_context import get_user_db_session
14from ...search_system import AdvancedSearchSystem
15from ...utilities.thread_context import get_search_context
16from ...web_search_engines.search_engine_factory import create_search_engine
17from .base_filter import BaseFilter
class JournalFilterError(Exception):
    """Raised when journal reputation filtering cannot be performed."""
class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, it will exclude any results that
                don't have an associated journal publication.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        Raises:
            JournalFilterError: If the SearXNG engine could not be created.

        """
        super().__init__(model)

        if self.model is None:
            self.model = get_llm()

        # Import here to avoid circular import. The import must happen
        # unconditionally: *any* of the unspecified parameters below may
        # need a settings lookup. (Previously this import lived inside the
        # threshold branch, which raised a NameError whenever
        # `reliability_threshold` was given explicitly but another
        # parameter was left as None.)
        from ...config.search_config import get_setting_from_snapshot

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store settings_snapshot for later use (search system creation).
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        if self.__engine is None:
            raise JournalFilterError("SearXNG initialization failed.")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The filter that it created, or None if filtering is disabled in
            the settings, or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use, bound to the credentials from the
            current thread's search context.

        """
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    # methodtools.lru_cache caches per-instance, so this does not leak
    # instances the way functools.lru_cache on a method would.
    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        Raises:
            ValueError: If the LLM response could not be parsed as a score.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        journal_info = self.__make_search_system().analyze_topic(
            f'Assess the reputability and reliability of the journal "'
            f'{journal_name}", with a particular focus on its quartile '
            f"ranking and peer review status. Be sure to specify the journal "
            f"name in any generated questions."
        )
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is reputable Q1 journals. Only output the number, do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the max context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).text()
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError as e:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            # Chain the original parse failure for easier debugging.
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            ) from e

        # Clamp to the documented 1-10 range.
        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                # Update the existing row in place.
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).text()
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if it couldn't determine a
            reputability score, false otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            # Unpublished results pass unless explicitly excluded.
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                # The cached assessment is still fresh; reuse it.
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters out results published in disreputable journals.

        Args:
            results: The search results to filter.
            query: The query that produced the results (part of the
                `BaseFilter` interface; not used here).
            **kwargs: Additional arguments, ignored.

        Returns:
            The results that passed the reputation check. If filtering fails
            entirely, returns the input unfiltered (best-effort behavior).

        """
        try:
            return list(filter(self.__check_result, results))
        except Exception as e:
            logger.exception(
                f"Journal quality filtering failed: {e}, {traceback.format_exc()}"
            )
            return results