Coverage for src / local_deep_research / advanced_search_system / filters / journal_reputation_filter.py: 85%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1import time 

2from datetime import timedelta 

3from typing import Any, Dict, List, Optional 

4 

5from langchain_core.language_models.chat_models import BaseChatModel 

6from loguru import logger 

7from methodtools import lru_cache 

8from sqlalchemy.orm import Session 

9 

10from ...config.llm_config import get_llm 

11from ...database.models import Journal 

12from ...database.session_context import get_user_db_session 

13from ...search_system import AdvancedSearchSystem 

14from ...utilities.thread_context import get_search_context 

15from ...web_search_engines.search_engine_factory import create_search_engine 

16from .base_filter import BaseFilter 

17 

18 

class JournalFilterError(Exception):
    """
    Raised when journal reputation filtering cannot proceed, e.g. because
    the SearXNG engine required for reputation lookups failed to initialize.
    """

23 

24 

class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, it will exclude any results that
                don't have an associated journal publication.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        Raises:
            JournalFilterError: If the required SearXNG engine could not be
                initialized.

        """
        super().__init__(model)

        if self.model is None:
            self.model = get_llm()

        # Import here to avoid a circular import. This must be hoisted out
        # of the threshold branch below: it used to be imported only when
        # `reliability_threshold` was None, so passing an explicit threshold
        # while leaving any of the other settings unset raised a NameError.
        from ...config.search_config import get_setting_from_snapshot

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store settings_snapshot for later use when building search systems.
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        if self.__engine is None:
            raise JournalFilterError("SearXNG initialization failed.")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The filter that it created, or None if filtering is disabled in
            the settings, or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use, bound to the credentials stored in
            the current thread's search context.

        """
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    # `methodtools.lru_cache` is method-aware, so identical journal names
    # are only analyzed once per instance.
    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        Raises:
            ValueError: If the LLM's response could not be parsed as a score.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        journal_info = self.__make_search_system().analyze_topic(
            f'Assess the reputability and reliability of the journal "'
            f'{journal_name}", with a particular focus on its quartile '
            f"ranking and peer review status. Be sure to specify the journal "
            f"name in any generated questions."
        )
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is reputable Q1 journals. Only output the number, do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the max context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).text()
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError as e:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            # Chain the original cause so the int() failure stays visible.
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            ) from e

        # Clamp to the documented 1-10 range.
        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                # Update the existing row in-place.
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).text()
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if it couldn't determine a
            reputability score, false otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            # Unpublished results pass unless configured to exclude them.
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                # A sufficiently fresh assessment exists; reuse it.
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters out results published in disreputable journals.

        Args:
            results: The search results to filter.
            query: The original search query (unused here, part of the
                `BaseFilter` interface).
            **kwargs: Additional arguments from the `BaseFilter` interface.

        Returns:
            The filtered results, or the original results unchanged if
            filtering fails for any reason (deliberate best-effort).

        """
        try:
            return list(filter(self.__check_result, results))
        except Exception:
            logger.exception("Journal quality filtering failed")
            return results