Coverage for src / local_deep_research / web_search_engines / relevance_filter.py: 95%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1"""LLM-based relevance filter using plain text output. 

2 

3Filters search previews by asking the LLM to return a list of relevant 

4indices as plain text (e.g. ``0, 2, 5``). We parse the response with a 

5regex over integers, which is robust to wrappers like "Indices: 0, 2" 

6or "[0, 2, 5]" and dodges all the structured-output provider quirks 

7(qwen prose-mode, function_calling latency, schema bikeshedding). 

8 

9Design notes: 

10- An empty LLM response is treated as a valid judgment ("none of these 

11 results are relevant"). We do not second-guess the model — if the 

12 filter says reject all, we reject all, and log a warning so users 

13 can notice if their chosen model is misbehaving. 

14- On exception (network error, parse failure, provider outage) the 

15 filter is considered unavailable, not "reject all". In that case we 

16 fall back to a capped slice of the original previews so downstream 

17 processing is not overwhelmed by unfiltered results. 

18- The filter can split large preview lists into smaller ``batch_size`` 

19 chunks. Smaller batches are faster per call and tend to be more 

20 reliable on weaker models which struggle to track many indices in a 

21 single context. A failed individual batch is skipped (logged); only 

22 a hard exception falls back to the capped slice. 

23""" 

24 

25import re 

26import time 

27from concurrent.futures import ThreadPoolExecutor 

28from typing import Any, Dict, List, Optional 

29 

30from loguru import logger 

31 

32from langchain_ollama import ChatOllama 

33 

# Real wrapper chains (RateLimitedLLMWrapper -> ProcessingLLMWrapper -> base)
# are at most 2-3 levels deep. A depth limit avoids spurious infinite chains
# on test mocks where every getattr produces a new child object.
_MAX_UNWRAP_DEPTH = 10

# Matches standalone integers in the LLM response. The negative lookbehind
# ``(?<![\w.])`` rejects digits glued to a preceding word character ("v2")
# and the fractional part of decimals ("0.5" → just "0"); the trailing
# ``\b`` stops a match from ending mid-word ("2abc" yields nothing).
# Combined with the prompt instruction to output ONLY the numbers, this
# is robust against prose like "The top 3 results from 2024 are ...".
_INT_RE = re.compile(r"(?<![\w.])\d+\b")


# Prompt asking for a bare comma-separated index list; parsed by _INT_RE.
_RELEVANCE_PROMPT_TEMPLATE = """Query: "{query}"

Search results:
{preview_text}

Output ONLY the 0-based indices of relevant results as a comma-separated list, nothing else.
Example: 0, 2, 5"""  # noqa: S608

54 

55 

def _unwrap_llm(llm):
    """Unwrap known LLM wrapper chains to get the base LangChain LLM.

    Follows ``.base_llm`` attributes for at most ``_MAX_UNWRAP_DEPTH``
    hops. The hop limit guards against test mocks (e.g.
    ``unittest.mock.Mock``) that lazily create a fresh child object on
    every attribute access, which would otherwise unwrap forever.
    """
    current = llm
    hops = 0
    while hops < _MAX_UNWRAP_DEPTH:
        nested = getattr(current, "base_llm", None)
        # Stop at the chain's end (no wrapper attr) or a self-reference.
        if nested is None or nested is current:
            break
        current = nested
        hops += 1
    return current

70 

71 

def _build_batch_prompt(
    query: str, batch: List[Dict[str, Any]], total_in_full: int
) -> str:
    """Build the relevance prompt for a single batch of previews.

    Indices in the prompt are local to the batch (0..len(batch)-1).

    Args:
        query: The search query the previews are judged against.
        batch: Preview dicts; ``title`` and ``snippet`` are read (both
            optional). Snippets are truncated to 200 characters so weak
            models don't lose track of the indices.
        total_in_full: Size of the original full preview list. Accepted
            for interface stability but currently UNUSED — the prompt
            template has no placeholder for it, so it is not shown to
            the model and does not affect the index range.

    Returns:
        The formatted prompt string to pass to ``_invoke_text``.
    """
    preview_lines = []
    for i, preview in enumerate(batch):
        title = preview.get("title", "Untitled").strip()
        snippet = preview.get("snippet", "").strip()
        # Cap snippet length to keep the per-batch prompt small.
        if len(snippet) > 200:
            snippet = snippet[:200] + "..."
        preview_lines.append(f"[{i}] {title}\n {snippet}")

    return _RELEVANCE_PROMPT_TEMPLATE.format(
        query=query,
        preview_text="\n\n".join(preview_lines),
    )

93 

94 

def _run_batch(
    llm,
    batch: List[Dict[str, Any]],
    query: str,
    total_in_full: int,
    engine_name: str,
) -> List[int]:
    """Invoke the LLM on one batch and return the parsed local indices.

    An empty list means "none relevant" — a valid judgment. LLM
    exceptions propagate to the caller, which falls back to a capped
    slice of the original previews.
    """
    return _invoke_text(
        llm,
        _build_batch_prompt(query, batch, total_in_full),
        engine_name,
    )

109 

110 

def filter_previews_for_relevance(
    llm,
    previews: List[Dict[str, Any]],
    query: str,
    max_filtered_results: Optional[int] = None,
    engine_name: str = "",
    batch_size: Optional[int] = None,
    max_parallel_batches: int = 1,
) -> List[Dict[str, Any]]:
    """Filter search previews for relevance via plain-text LLM output.

    Args:
        llm: LangChain LLM instance (may be wrapped)
        previews: List of preview dicts with title/snippet/url
        query: The search query
        max_filtered_results: Optional cap on the final result count
            (None = LLM decides)
        engine_name: Engine class name for log messages
        batch_size: If set and smaller than ``len(previews)``, the LLM
            is called once per batch of this many previews. Smaller
            batches are faster per call and more reliable on weaker
            models. None or 0 disables batching (single-call mode).
        max_parallel_batches: Number of batches to dispatch concurrently
            against the LLM (via a thread pool). 1 = sequential.
            Most providers (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
            Anthropic) handle concurrent requests fine. Ignored when
            there is only one batch.

    Returns:
        Filtered list of preview dicts (subset of input). Order matches
        the original preview order across batches. A failed individual
        batch is skipped (logged); only when EVERY batch fails is the
        filter considered unavailable, returning the first
        ``max_filtered_results or 5`` previews unfiltered as a fallback.
    """
    if not previews:
        return []

    for i, preview in enumerate(previews):
        title = preview.get("title", "Untitled").strip()
        logger.debug(f"[{engine_name}] INPUT [{i}]: {title[:80]}")

    # Cap used when the filter is unavailable (every batch raised) so we
    # don't flood downstream processing with unfiltered results.
    unavailable_cap = max_filtered_results or 5

    # Determine batch boundaries. A batch_size of None or 0 means
    # "single batch" — process all previews in one LLM call.
    effective_batch = (
        batch_size if (batch_size and batch_size > 0) else len(previews)
    )
    batch_starts = list(range(0, len(previews), effective_batch))
    batches = [previews[s : s + effective_batch] for s in batch_starts]

    workers = max(1, min(max_parallel_batches, len(batches)))
    logger.debug(
        f"[{engine_name}] Dispatching {len(batches)} batch(es) of "
        f"<= {effective_batch} previews each, {workers} parallel worker(s)"
    )

    def _guarded_run(batch_idx: int) -> Optional[List[int]]:
        # Per the module design notes, a failed individual batch is
        # skipped (logged) rather than discarding the other batches.
        try:
            return _run_batch(
                llm, batches[batch_idx], query, len(previews), engine_name
            )
        except Exception:
            logger.exception(
                f"[{engine_name}] Relevance batch {batch_idx} failed — "
                f"skipping this batch"
            )
            return None

    t0 = time.monotonic()
    # None marks a failed (skipped) batch; [] is a valid "none relevant".
    results_per_batch: List[Optional[List[int]]] = [None] * len(batches)
    if workers == 1:
        for i in range(len(batches)):
            results_per_batch[i] = _guarded_run(i)
    else:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {
                pool.submit(_guarded_run, i): i for i in range(len(batches))
            }
            for fut, i in futures.items():
                results_per_batch[i] = fut.result()
    total_elapsed = time.monotonic() - t0

    # Filter unavailable (every batch raised): fall back to a capped
    # slice instead of misreading the outage as "reject all".
    if all(r is None for r in results_per_batch):
        logger.error(
            f"[{engine_name}] LLM relevance filter failed — returning "
            f"first {unavailable_cap} previews as fallback"
        )
        return previews[:unavailable_cap]

    # Aggregate results in original batch order so the final list
    # mirrors the input ordering across batches.
    ranked_results: List[Dict[str, Any]] = []
    kept_indices: List[int] = []
    seen: set = set()

    for batch_start, batch_result in zip(batch_starts, results_per_batch):
        if batch_result is None:
            continue  # skipped (failed) batch
        batch_len = min(effective_batch, len(previews) - batch_start)
        for li in batch_result:
            # Drop hallucinated out-of-range indices.
            if not (0 <= li < batch_len):
                continue
            global_idx = batch_start + li
            # Drop duplicates the model may have emitted.
            if global_idx in seen:
                continue
            seen.add(global_idx)
            ranked_results.append(previews[global_idx])
            kept_indices.append(global_idx)

    logger.info(
        f"[{engine_name}] LLM relevance filter took {total_elapsed:.1f}s "
        f"across {len(batches)} batch(es) ({workers} parallel) "
        f"for {len(previews)} previews"
    )

    # Empty result is a valid LLM judgment ("none relevant"). Log a
    # warning on larger batches so users can notice a misbehaving model,
    # but do not override the decision.
    if not ranked_results and len(previews) > 2:
        logger.warning(
            f"[{engine_name}] LLM filter judged all {len(previews)} "
            f"results irrelevant. If this is unexpected, verify your "
            f"model outputs a plain comma-separated index list."
        )

    # Apply cap if set, keeping ranked_results and kept_indices aligned.
    if (
        max_filtered_results is not None
        and len(ranked_results) > max_filtered_results
    ):
        ranked_results = ranked_results[:max_filtered_results]
        kept_indices = kept_indices[:max_filtered_results]

    # Log kept/removed
    removed_indices = set(range(len(previews))) - set(kept_indices)
    logger.info(
        f"[{engine_name}] Relevance filter: "
        f"kept {len(ranked_results)} of {len(previews)} results"
    )
    for idx in kept_indices:
        title = previews[idx].get("title", "Untitled")[:80]
        logger.debug(f"[{engine_name}] KEPT [{idx}]: {title}")
    for idx in sorted(removed_indices):
        title = previews[idx].get("title", "Untitled")[:80]
        logger.debug(f"[{engine_name}] REMOVED [{idx}]: {title}")

    return ranked_results

255 

256 

def _invoke_text(llm, prompt: str, engine_name: str) -> List[int]:
    """Invoke the LLM with a plain-text prompt and parse integer indices.

    Returns the parsed ints (an empty list means "no integers found",
    which the caller treats as a valid "none relevant" judgment).
    Range checking and deduplication happen in
    ``filter_previews_for_relevance``.
    """
    # Ollama thinking-by-default models (qwen3 dense variants, etc.)
    # burn 30-60s on CoT before emitting the answer. Index selection does
    # not benefit from reasoning, so suppress it on Ollama where supported.
    invoke_kwargs = (
        {"reasoning": False}
        if isinstance(_unwrap_llm(llm), ChatOllama)
        else {}
    )

    response = llm.invoke(prompt, **invoke_kwargs)

    # LangChain chat models return a Message; LLMs return a string.
    text = getattr(response, "content", response)
    if isinstance(text, str):
        parsed = [int(token) for token in _INT_RE.findall(text)]
        logger.debug(
            f"[{engine_name}] Text output parsed {len(parsed)} indices: {parsed}"
        )
        return parsed

    logger.warning(
        f"[{engine_name}] Unexpected LLM response type: "
        f"{type(text).__name__}"
    )
    return []