Coverage for src/local_deep_research/advanced_search_system/filters/journal_reputation_filter.py: 22%

111 statements

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

import time
import traceback
from datetime import timedelta
from typing import Any, Dict, List, Optional

from langchain_core.language_models.chat_models import BaseChatModel
from loguru import logger
from methodtools import lru_cache
from sqlalchemy.orm import Session

from ...config.llm_config import get_llm
from ...database.models import Journal
from ...database.session_context import get_user_db_session
from ...search_system import AdvancedSearchSystem
from ...utilities.thread_context import get_search_context
from ...web_search_engines.search_engine_factory import create_search_engine
from .base_filter import BaseFilter


class JournalFilterError(Exception):
    """
    Custom exception for errors related to journal filtering.
    """


class JournalReputationFilter(BaseFilter):
    """
    A filter for academic results that considers the reputation of journals.

    Note that this filter requires SearXNG to be available in order to work.
    """

    def __init__(
        self,
        model: BaseChatModel | None = None,
        reliability_threshold: int | None = None,
        max_context: int | None = None,
        exclude_non_published: bool | None = None,
        quality_reanalysis_period: timedelta | None = None,
        settings_snapshot: Dict[str, Any] | None = None,
    ):
        """
        Args:
            model: The LLM model to use for analysis.
            reliability_threshold: The filter scores journal reliability on a
                scale of 1-10. Results from any journal with a reliability
                below this threshold will be culled. Will be read from the
                settings if not specified.
            max_context: The maximum number of characters to feed into the
                LLM when assessing journal reliability.
            exclude_non_published: If true, results that don't have an
                associated journal publication will be excluded.
            quality_reanalysis_period: Period at which to update journal
                quality assessments.
            settings_snapshot: Settings snapshot for thread context.

        """
        super().__init__(model)

        if self.model is None:
            self.model = get_llm()

        # Import here to avoid circular import; needed by every settings
        # fallback below.
        from ...config.search_config import get_setting_from_snapshot

        self.__threshold = reliability_threshold
        if self.__threshold is None:
            self.__threshold = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.threshold",
                    4,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__max_context = max_context
        if self.__max_context is None:
            self.__max_context = int(
                get_setting_from_snapshot(
                    "search.journal_reputation.max_context",
                    3000,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__exclude_non_published = exclude_non_published
        if self.__exclude_non_published is None:
            self.__exclude_non_published = bool(
                get_setting_from_snapshot(
                    "search.journal_reputation.exclude_non_published",
                    False,
                    settings_snapshot=settings_snapshot,
                )
            )
        self.__quality_reanalysis_period = quality_reanalysis_period
        if self.__quality_reanalysis_period is None:
            self.__quality_reanalysis_period = timedelta(
                days=int(
                    get_setting_from_snapshot(
                        "search.journal_reputation.reanalysis_period",
                        365,
                        settings_snapshot=settings_snapshot,
                    )
                )
            )

        # Store the settings snapshot for later use.
        self.__settings_snapshot = settings_snapshot

        # SearXNG is required so we can search the open web for reputational
        # information.
        self.__engine = create_search_engine(
            "searxng", llm=self.model, settings_snapshot=settings_snapshot
        )
        if self.__engine is None:
            raise JournalFilterError("SearXNG initialization failed.")

    @classmethod
    def create_default(
        cls,
        model: BaseChatModel | None = None,
        *,
        engine_name: str,
        settings_snapshot: Dict[str, Any] | None = None,
    ) -> Optional["JournalReputationFilter"]:
        """
        Initializes a default configuration of the filter based on the settings.

        Args:
            model: Explicitly specify the LLM to use.
            engine_name: The name of the search engine. Will be used to check
                the enablement status for that engine.
            settings_snapshot: Settings snapshot for thread context.

        Returns:
            The created filter, or None if filtering is disabled in the
            settings or misconfigured.

        """
        # Import here to avoid circular import
        from ...config.search_config import get_setting_from_snapshot

        if not bool(
            get_setting_from_snapshot(
                f"search.engine.web.{engine_name}.journal_reputation.enabled",
                True,
                settings_snapshot=settings_snapshot,
            )
        ):
            return None

        try:
            # Initialize the filter with default settings.
            return JournalReputationFilter(
                model=model, settings_snapshot=settings_snapshot
            )
        except JournalFilterError:
            logger.exception(
                "SearXNG is not configured, but is required for "
                "journal reputation filtering. Disabling filtering."
            )
            return None

    @staticmethod
    def __db_session() -> Session:
        """
        Returns:
            The database session to use.

        """
        context = get_search_context()
        username = context.get("username")
        password = context.get("user_password")

        return get_user_db_session(username=username, password=password)

    def __make_search_system(self) -> AdvancedSearchSystem:
        """
        Creates a new `AdvancedSearchSystem` instance.

        Returns:
            The system it created.

        """
        return AdvancedSearchSystem(
            llm=self.model,
            search=self.__engine,
            # We clamp down on the default iterations and questions for speed.
            max_iterations=1,
            questions_per_iteration=3,
            settings_snapshot=self.__settings_snapshot,
        )

    @lru_cache(maxsize=1024)
    def __analyze_journal_reputation(self, journal_name: str) -> int:
        """
        Analyzes the reputation of a particular journal.

        Args:
            journal_name: The name of the journal.

        Returns:
            The reputation of the journal, on a scale from 1-10.

        """
        logger.info(f"Analyzing reputation of journal '{journal_name}'...")

        # Perform a search for information about this journal.
        journal_info = self.__make_search_system().analyze_topic(
            f'Assess the reputability and reliability of the journal "'
            f'{journal_name}", with a particular focus on its quartile '
            f"ranking and peer review status. Be sure to specify the journal "
            f"name in any generated questions."
        )
        journal_info = "\n".join(
            [f["content"] for f in journal_info["findings"]]
        )
        logger.debug(f"Received raw info about journal: {journal_info}")

        # Have the LLM assess the reliability based on this information.
        prompt = f"""
        You are a research assistant helping to assess the reliability and
        reputability of scientific journals. A reputable journal should be
        peer-reviewed, not predatory, and high-impact. Please review the
        following information on the journal "{journal_name}" and output a
        reputability score between 1 and 10, where 1-3 is not reputable and
        probably predatory, 4-6 is reputable but low-impact (Q2 or Q3),
        and 7-10 is a reputable Q1 journal. Only output the number; do not
        provide any explanation or other output.

        JOURNAL INFORMATION:

        {journal_info}
        """
        if len(prompt) > self.__max_context:
            # If the prompt is too long, truncate it to fit within the
            # maximum context size.
            prompt = prompt[: self.__max_context] + "..."

        # Generate a response from the LLM model.
        response = self.model.invoke(prompt).text()
        logger.debug(f"Got raw LLM response: {response}")

        # Extract the score from the response.
        try:
            reputation_score = int(response.strip())
        except ValueError:
            logger.exception(
                "Failed to parse reputation score from LLM response."
            )
            raise ValueError(
                "Failed to parse reputation score from LLM response."
            )

        return max(min(reputation_score, 10), 1)

    def __add_journal_to_db(self, *, name: str, quality: int) -> None:
        """
        Saves the journal quality information to the database.

        Args:
            name: The name of the journal.
            quality: The quality assessment for the journal.

        """
        with self.__db_session() as db_session:
            journal = db_session.query(Journal).filter_by(name=name).first()
            if journal is not None:
                journal.quality = quality
                journal.quality_model = self.model.name
                journal.quality_analysis_time = int(time.time())
            else:
                journal = Journal(
                    name=name,
                    quality=quality,
                    quality_model=self.model.name,
                    quality_analysis_time=int(time.time()),
                )
                db_session.add(journal)

            db_session.commit()

    def __clean_journal_name(self, journal_name: str) -> str:
        """
        Cleans up the name of a journal to remove any extraneous information.
        This is mostly to make caching more effective.

        Args:
            journal_name: The raw name of the journal.

        Returns:
            The cleaned name.

        """
        logger.debug(f"Cleaning raw journal name: {journal_name}")

        prompt = f"""
        Clean up the following journal or conference name:

        "{journal_name}"

        Remove any references to volumes, pages, months, or years. Expand
        abbreviations if possible. For conferences, remove locations. Only
        output the clean name, do not provide any explanation or other output.
        """

        response = self.model.invoke(prompt).text()
        return response.strip()

    def __check_result(self, result: Dict[str, Any]) -> bool:
        """
        Performs a search to determine the reputability of a result's journal.

        Args:
            result: The result to check.

        Returns:
            True if the journal is reputable or if a reputability score could
            not be determined, False otherwise.

        """
        journal_name = result.get("journal_ref")
        if journal_name is None:
            logger.debug(
                f"Result {result.get('title')} has no associated "
                f"journal, not evaluating reputation."
            )
            return not self.__exclude_non_published
        journal_name = self.__clean_journal_name(journal_name)

        # Check the database first.
        with self.__db_session() as session:
            journal = (
                session.query(Journal).filter_by(name=journal_name).first()
            )
            if (
                journal is not None
                and (time.time() - journal.quality_analysis_time)
                < self.__quality_reanalysis_period.total_seconds()
            ):
                logger.debug(
                    f"Found existing reputation for {journal_name} in database."
                )
                return journal.quality >= self.__threshold

        # Evaluate reputation.
        try:
            quality = self.__analyze_journal_reputation(journal_name)
            # Save to the database.
            self.__add_journal_to_db(name=journal_name, quality=quality)
            return quality >= self.__threshold
        except ValueError:
            # The LLM behaved weirdly. In this case, we will just assume it's
            # okay.
            return True

    def filter_results(
        self, results: List[Dict], query: str, **kwargs
    ) -> List[Dict]:
        """
        Filters out results whose journals fall below the reliability
        threshold (and, optionally, results with no associated journal).
        If filtering fails for any reason, the results are returned unchanged.
        """
        try:
            return list(filter(self.__check_result, results))
        except Exception as e:
            logger.exception(
                f"Journal quality filtering failed: {e}, {traceback.format_exc()}"
            )
            return results
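
# A minimal usage sketch showing how the filter is meant to be obtained via
# `create_default` and applied to a batch of search results. This helper and
# its name are illustrative only and not part of the module's public API; it
# assumes SearXNG is reachable and that the caller already holds a settings
# snapshot, the raw results, the query, and the engine name being filtered.
def _example_filter_usage(
    results: List[Dict[str, Any]],
    query: str,
    engine_name: str,
    settings_snapshot: Dict[str, Any],
) -> List[Dict[str, Any]]:
    # `create_default` returns None when journal-reputation filtering is
    # disabled for this engine or SearXNG cannot be initialized, so callers
    # can simply fall back to the unfiltered results.
    journal_filter = JournalReputationFilter.create_default(
        engine_name=engine_name, settings_snapshot=settings_snapshot
    )
    if journal_filter is None:
        return results
    return journal_filter.filter_results(results, query=query)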