Coverage for src / local_deep_research / advanced_search_system / filters / journal_reputation_filter.py: 88%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1import time 

2from datetime import timedelta 

3from typing import Any, Dict, List, Optional 

4 

5from langchain_core.language_models.chat_models import BaseChatModel 

6from loguru import logger 

7from methodtools import lru_cache 

8from sqlalchemy.orm import Session 

9 

10from ...config.llm_config import get_llm 

11from ...database.models import Journal 

12from ...database.session_context import get_user_db_session 

13from ...search_system import AdvancedSearchSystem 

14from ...utilities.resource_utils import safe_close 

15from ...utilities.thread_context import get_search_context 

16from ...web_search_engines.search_engine_factory import create_search_engine 

17from .base_filter import BaseFilter 

18 

19 

class JournalFilterError(Exception):
    """Raised when journal reputation filtering cannot be performed."""

class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, it will exclude any results that
                don't have an associated journal publication.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        Raises:
            JournalFilterError: If the SearXNG engine could not be created
                or reports itself as unavailable.

        """
        super().__init__(model)

        # Import here to avoid a circular import. This must run
        # unconditionally: it was previously nested inside the
        # `reliability_threshold is None` branch, which raised a NameError
        # whenever an explicit threshold was passed but any of the other
        # defaults still had to be read from the settings below.
        from ...config.search_config import get_setting_from_snapshot

        # Remember whether we created the LLM ourselves so that close()
        # only tears down a model we own.
        self._owns_llm = self.model is None
        if self.model is None:
            self.model = get_llm()

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store settings_snapshot for later use when spawning search systems.
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        # create_search_engine() can return a non-None engine whose backing
        # service is unreachable (e.g. SearXNG is down), so we also verify
        # the engine reports itself as available.
        if self.__engine is None or not getattr(
            self.__engine, "is_available", False
        ):
            raise JournalFilterError(
                "SearXNG initialization failed or not available."
            )

    def close(self) -> None:
        """Close the SearXNG engine and LLM client."""
        # Use the mangled attribute name defensively in case __init__
        # raised before the engine attribute was assigned.
        if hasattr(self, "_JournalReputationFilter__engine"):
            safe_close(self.__engine, "SearXNG engine")
        # Only close the LLM if this filter created it; an injected model
        # belongs to the caller.
        if self._owns_llm:
            safe_close(self.model, "journal filter LLM")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The filter that it created, or None if filtering is disabled in
            the settings, or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use, authenticated with the credentials
            from the current search thread context.

        """
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    # methodtools.lru_cache caches per-instance, so repeated results from the
    # same journal within one filter's lifetime skip the expensive search.
    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        Raises:
            ValueError: If the LLM's response could not be parsed as a score.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        search_system = self.__make_search_system()
        try:
            journal_info = search_system.analyze_topic(
                f'Assess the reputability and reliability of the journal "'
                f'{journal_name}", with a particular focus on its quartile '
                f"ranking and peer review status. Be sure to specify the journal "
                f"name in any generated questions."
            )
        finally:
            safe_close(search_system, "journal search system")
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is reputable Q1 journals. Only output the number, do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the max context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).content
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError as error:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            # Chain the original error so the root cause stays visible.
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            ) from error

        # Clamp to the documented 1-10 range in case the LLM strays.
        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                # Refresh the existing row with the new assessment.
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).content
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if it couldn't determine a
            reputability score, false otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            # Unpublished results are kept unless configured otherwise.
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters out results published in low-reputation journals.

        Args:
            results: The search results to filter.
            query: The original search query (unused here, part of the
                BaseFilter interface).
            **kwargs: Additional arguments accepted for interface
                compatibility.

        Returns:
            The surviving results; on any unexpected failure the original
            list is returned unfiltered (best-effort behavior).

        """
        try:
            return list(filter(self.__check_result, results))
        except Exception:
            logger.exception("Journal quality filtering failed")
            return results