Coverage for src/local_deep_research/web_search_engines/relevance_filter.py: 91%

124 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""LLM-based relevance filter using plain text output. 

2 

3Filters search previews by asking the LLM to return a list of relevant 

4indices as plain text (e.g. ``0, 2, 5``). We parse the response with a 

5regex over integers, which is robust to wrappers like "Indices: 0, 2" 

6or "[0, 2, 5]" and dodges all the structured-output provider quirks 

7(qwen prose-mode, function_calling latency, schema bikeshedding). 

8 

9Design notes: 

10- An empty LLM response is treated as a valid judgment ("none of these 

11 results are relevant"). We do not second-guess the model — if the 

12 filter says reject all, we reject all, and log a warning so users 

13 can notice if their chosen model is misbehaving. 

14- On exception (network error, parse failure, provider outage) the 

15 filter is considered unavailable, not "reject all". In that case we 

16 fall back to a capped slice of the original previews so downstream 

17 processing is not overwhelmed by unfiltered results. 

18- The filter can split large preview lists into smaller ``batch_size`` 

19 chunks. Smaller batches are faster per call and tend to be more 

20 reliable on weaker models which struggle to track many indices in a 

21 single context. A failed individual batch is skipped (logged) instead

22 of aborting the whole filter.

23- Partial success is the common outcome under batching: some batches 

24 return valid judgments, others raise or time out. Successful batches 

25 are kept; failed/timed-out batches contribute nothing. The capped 

26 fallback only fires when *every* batch failed to produce a result. 

27""" 

28 

29import re 

30import time 

31from concurrent.futures import ( 

32 ThreadPoolExecutor, 

33 TimeoutError as FuturesTimeoutError, 

34 as_completed, 

35) 

36from datetime import UTC, datetime 

37from typing import Any, Dict, List, Optional 

38 

39from loguru import logger 

40 

41from langchain_ollama import ChatOllama 

42 

43from ..config.constants import DEFAULT_MAX_FILTERED_RESULTS 

44 

# Upper bound on how many ``.base_llm`` hops ``_unwrap_llm`` will follow.
# Production wrapper chains (RateLimitedLLMWrapper -> ProcessingLLMWrapper
# -> base) are only 2-3 levels deep. The bound exists so that
# auto-vivifying test doubles, whose every attribute access yields a new
# child object, cannot send the walk into an endless chain.
_MAX_UNWRAP_DEPTH = 10

# Extracts standalone integers from the LLM reply. The lookbehind
# ``(?<![\w.])`` refuses digits preceded by a word character ("v2") or by
# a dot (the "5" of "0.5"; the leading "0" is still matched). The trailing
# ``\b`` refuses digits that run into letters ("2nd"). Wrappers such as
# "Indices: 0, 2" or "[0, 2, 5]" parse cleanly. Numbers inside stray prose
# are still captured (e.g. "top 3" yields 3). The caller drops
# out-of-range values such as years, and the prompt asks for numbers only
# to keep prose out of the reply in the first place.
_INT_RE = re.compile(r"(?<![\w.])\d+\b")

# Overall wall-clock budget handed to ``as_completed`` on the parallel
# path. It runs from the start of iteration until every submitted batch
# has finished, so it bounds the batch set as a whole, not each batch.
# A batch of <= 10 previews on a 9B-class local model usually takes
# 5-15s. With Ollama's default OLLAMA_NUM_PARALLEL=1, concurrent
# submissions are serialised server-side, so ten of them at ~15s already
# approach the previous 120s limit. Five minutes leaves headroom for slow
# but progressing backends while still catching genuine hangs. Those
# matter because langchain-ollama's httpx client has no default socket
# timeout. The sequential (workers == 1) path does not go through
# ``as_completed`` and therefore has no such bound.
_FILTER_WALL_TIMEOUT_S = 5 * 60.0


# Maximum number of snippet characters per preview included in the prompt.
# Academic engines (arxiv, Semantic Scholar, PubMed) put the abstract in
# the snippet. The former 200-character cap cut abstracts off before the
# judge could tell whether a paper is actually *about* the query, so
# papers that merely mention the topic slipped through. 800 characters
# covers the opening of a typical abstract while keeping the prompt
# bounded (roughly 16KB for 20 previews).
_SNIPPET_CHAR_CAP = 800


# Default judge prompt, rendered with ``str.format``. Placeholders:
# ``{query}``, ``{current_date}`` and ``{preview_text}`` (the numbered
# previews of one batch). The model is asked for bare 0-based indices so
# that ``_INT_RE`` can parse the reply.
_RELEVANCE_PROMPT_TEMPLATE = """This is a relevance-filtering step. Kept results move forward and may be used in the final answer; dropped ones are excluded from further processing.

Query: "{query}"
Current date: {current_date}

Search results:
{preview_text}

Direct topic match matters more than keyword match — results that just mention the query terms usually don't help.

Output ONLY the 0-based indices of relevant results as a comma-separated list, nothing else.
Example: 0, 2, 5"""  # noqa: S608

95 

96 

97def _unwrap_llm(llm): 

98 """Unwrap known LLM wrapper chains to get the base LangChain LLM. 

99 

100 Walks ``.base_llm`` attributes up to ``_MAX_UNWRAP_DEPTH`` levels. 

101 The depth limit guards against test mocks (e.g. ``unittest.mock.Mock``) 

102 that lazily create a fresh child object on every attribute access. 

103 """ 

104 probe = llm 

105 for _ in range(_MAX_UNWRAP_DEPTH): 

106 inner = getattr(probe, "base_llm", None) 

107 if inner is None or inner is probe: 

108 return probe 

109 probe = inner 

110 return probe 

111 

112 

113def _build_batch_prompt( 

114 query: str, 

115 batch: List[Dict[str, Any]], 

116 total_in_full: int, 

117 prompt_template: str, 

118) -> str: 

119 """Build the relevance prompt for a single batch of previews. 

120 

121 Indices in the prompt are local to the batch (0..len(batch)-1). 

122 ``total_in_full`` is the size of the original full preview list, 

123 shown to the model for context — it doesn't affect the index range. 

124 ``prompt_template`` is passed in (defaulting at the public entry 

125 point) rather than read from a module global, so eval harnesses can 

126 test variants without monkey-patching module state. 

127 """ 

128 preview_lines = [] 

129 for i, preview in enumerate(batch): 

130 title = preview.get("title", "Untitled").strip() 

131 snippet = preview.get("snippet", "").strip() 

132 if len(snippet) > _SNIPPET_CHAR_CAP: 

133 snippet = snippet[:_SNIPPET_CHAR_CAP] + "..." 

134 preview_lines.append(f"[{i}] {title}\n {snippet}") 

135 

136 return prompt_template.format( 

137 query=query, 

138 current_date=datetime.now(UTC).strftime("%Y-%m-%d"), 

139 preview_text="\n\n".join(preview_lines), 

140 ) 

141 

142 

def _run_batch(
    llm,
    batch: List[Dict[str, Any]],
    query: str,
    total_in_full: int,
    engine_name: str,
    prompt_template: str,
) -> List[int]:
    """Judge one batch of previews with a single LLM call.

    Renders the prompt for ``batch`` via ``_build_batch_prompt`` and hands
    it to ``_invoke_text``.

    Returns:
        Batch-local indices parsed from the reply. An empty list is a
        valid "nothing relevant" verdict. The indices are not yet
        range-checked or de-duplicated; ``filter_previews_for_relevance``
        does that.

    Raises:
        Whatever the LLM call raises. ``filter_previews_for_relevance``
        catches it, logs it and skips this batch. Its capped fallback is
        used only when every batch fails.
    """
    return _invoke_text(
        llm,
        _build_batch_prompt(query, batch, total_in_full, prompt_template),
        engine_name,
    )

158 

159 

def _preview_title(preview: Dict[str, Any], limit: int = 80) -> str:
    """Return a stripped, truncated title of ``preview`` for log lines.

    A missing or ``None`` title becomes ``"Untitled"`` and non-string values
    are converted with ``str``. This guarantees that logging can never raise
    and abort an otherwise successful filter run.
    """
    raw_title = preview.get("title")
    title = "Untitled" if raw_title is None else str(raw_title)
    return title.strip()[:limit]


def filter_previews_for_relevance(
    llm,
    previews: List[Dict[str, Any]],
    query: str,
    max_filtered_results: Optional[int] = None,
    engine_name: str = "",
    batch_size: Optional[int] = None,
    max_parallel_batches: int = 1,
    prompt_template: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Filter search previews for relevance via plain-text LLM output.

    Args:
        llm: LangChain LLM instance (may be wrapped)
        previews: List of preview dicts with title/snippet/url
        query: The search query
        max_filtered_results: Optional cap on the final result count
            (None = LLM decides). It is also the size of the unfiltered
            slice returned when every batch fails. When it is falsy,
            ``DEFAULT_MAX_FILTERED_RESULTS`` is used for that slice.
        engine_name: Engine class name for log messages
        batch_size: If set and smaller than ``len(previews)``, the LLM
            is called once per batch of this many previews. Smaller
            batches are faster per call and more reliable on weaker
            models. None or 0 disables batching (single-call mode).
        max_parallel_batches: Number of batches to dispatch concurrently
            against the LLM (via a thread pool). 1 = sequential.
            Most providers (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
            Anthropic) handle concurrent requests fine. Ignored when
            there is only one batch. Only the parallel path is bounded
            by ``_FILTER_WALL_TIMEOUT_S``.
        prompt_template: Override the built-in ``_RELEVANCE_PROMPT_TEMPLATE``.
            Used by eval harnesses that want to compare variants without
            mutating module state. None (default) uses the shipping
            template.

    Returns:
        Filtered list of preview dicts: a subset of the input with no
        duplicates. Batches contribute in input order. Within a batch,
        previews appear in the order the LLM listed their indices. If no
        batch produced a verdict, the first
        ``max_filtered_results or DEFAULT_MAX_FILTERED_RESULTS`` previews
        are returned unfiltered.
    """
    if prompt_template is None:
        prompt_template = _RELEVANCE_PROMPT_TEMPLATE
    if not previews:
        return []

    for i, preview in enumerate(previews):
        logger.debug(f"[{engine_name}] INPUT [{i}]: {_preview_title(preview)}")

    # Cap used when the filter is unavailable (every batch failed) so we
    # don't flood downstream processing with unfiltered results.
    unavailable_cap = max_filtered_results or DEFAULT_MAX_FILTERED_RESULTS

    # Determine batch boundaries. A batch_size of None, 0 or a negative
    # value means "single batch": process all previews in one LLM call.
    effective_batch = (
        batch_size if (batch_size and batch_size > 0) else len(previews)
    )
    batch_starts = list(range(0, len(previews), effective_batch))
    batches = [previews[s : s + effective_batch] for s in batch_starts]

    workers = max(1, min(max_parallel_batches, len(batches)))
    logger.debug(
        f"[{engine_name}] Dispatching {len(batches)} batch(es) of "
        f"<= {effective_batch} previews each, {workers} parallel worker(s)"
    )

    t0 = time.monotonic()
    # None marks "batch never completed" (exception or timeout). A
    # completed batch is List[int], possibly empty (a valid "none relevant"
    # judgment). We distinguish the two so an all-None outcome falls
    # back to the capped slice while all-[] is kept as a valid decision.
    results_per_batch: List[Optional[List[int]]] = [None for _ in batches]

    if workers == 1:
        # Sequential path: there is no wall-clock bound here. A hung LLM
        # call blocks for as long as the underlying client keeps waiting.
        for i, batch in enumerate(batches):
            try:
                results_per_batch[i] = _run_batch(
                    llm,
                    batch,
                    query,
                    len(previews),
                    engine_name,
                    prompt_template,
                )
            except Exception:
                logger.exception(
                    f"[{engine_name}] batch {i} failed — skipping its results"
                )
    else:
        # Not using ``with ThreadPoolExecutor(...) as pool:`` on purpose.
        # ``__exit__`` calls ``shutdown(wait=True)``, which blocks on any
        # still-running worker and defeats the timeout when a worker is
        # stuck on an Ollama HTTP call. We manage the pool lifetime
        # explicitly and always shut down with ``wait=False``, so
        # timed-out batches are orphaned rather than blocking the caller.
        pool = ThreadPoolExecutor(max_workers=workers)
        try:
            futures_to_idx = {
                pool.submit(
                    _run_batch,
                    llm,
                    batch,
                    query,
                    len(previews),
                    engine_name,
                    prompt_template,
                ): i
                for i, batch in enumerate(batches)
            }
            try:
                for fut in as_completed(
                    futures_to_idx, timeout=_FILTER_WALL_TIMEOUT_S
                ):
                    i = futures_to_idx[fut]
                    try:
                        results_per_batch[i] = fut.result()
                    except Exception:
                        logger.exception(
                            f"[{engine_name}] batch {i} failed — skipping"
                        )
            except FuturesTimeoutError:
                pending = [
                    futures_to_idx[f] for f in futures_to_idx if not f.done()
                ]
                logger.warning(
                    f"[{engine_name}] {len(pending)} batch(es) still "
                    f"running after {_FILTER_WALL_TIMEOUT_S}s — abandoning "
                    f"(pending indices: {pending})"
                )
        finally:
            # ``cancel_futures=True`` only cancels queued futures the
            # executor hasn't started yet. Already-running workers stay
            # alive until their blocking call returns. We accept that,
            # because the alternative is hanging forever.
            pool.shutdown(wait=False, cancel_futures=True)
    total_elapsed = time.monotonic() - t0

    # If no batch even completed, fall back to an unfiltered capped slice.
    # Otherwise downstream would receive zero results from a filter
    # outage rather than from a valid "all irrelevant" judgment.
    if all(r is None for r in results_per_batch):
        logger.error(
            f"[{engine_name}] every relevance-filter batch failed — "
            f"returning first {unavailable_cap} previews as fallback"
        )
        return previews[:unavailable_cap]

    # Normalise None → [] for the aggregation pass below. A batch that
    # was cancelled or errored just contributes nothing. The new variable
    # has a narrower type, so mypy can prove the elements are not Optional.
    completed_batches: List[List[int]] = [
        [] if r is None else r for r in results_per_batch
    ]

    # Aggregate in batch order; within a batch keep the LLM's order.
    # Out-of-range and repeated indices are dropped here.
    ranked_results: List[Dict[str, Any]] = []
    kept_indices: List[int] = []
    seen: set = set()

    for batch_start, batch_result in zip(batch_starts, completed_batches):
        batch_len = min(effective_batch, len(previews) - batch_start)
        for li in batch_result:
            if not (0 <= li < batch_len):
                continue
            global_idx = batch_start + li
            if global_idx in seen:
                continue
            seen.add(global_idx)
            ranked_results.append(previews[global_idx])
            kept_indices.append(global_idx)

    logger.info(
        f"[{engine_name}] LLM relevance filter took {total_elapsed:.1f}s "
        f"across {len(batches)} batch(es) ({workers} parallel) "
        f"for {len(previews)} previews"
    )

    # An empty result is a valid LLM judgment ("none relevant"). For more
    # than two previews, log a warning so users can notice a misbehaving
    # model, but do not override the decision.
    if not ranked_results and len(previews) > 2:
        logger.warning(
            f"[{engine_name}] LLM filter judged all {len(previews)} "
            "results irrelevant. If this is unexpected, verify your "
            "model follows the comma-separated index output format."
        )

    # Apply the cap if set, keeping ranked_results and kept_indices aligned.
    if (
        max_filtered_results is not None
        and len(ranked_results) > max_filtered_results
    ):
        ranked_results = ranked_results[:max_filtered_results]
        kept_indices = kept_indices[:max_filtered_results]

    # Log kept/removed/skipped. A preview is "skipped" when its batch
    # raised or timed out, so the judge never saw it. That is distinct
    # from "removed" (the judge returned a verdict that dropped it).
    skipped_indices: set = set()
    for batch_start, r in zip(batch_starts, results_per_batch):
        if r is None:
            batch_len = min(effective_batch, len(previews) - batch_start)
            skipped_indices.update(range(batch_start, batch_start + batch_len))
    removed_indices = (
        set(range(len(previews))) - set(kept_indices) - skipped_indices
    )
    logger.info(
        f"[{engine_name}] Relevance filter: "
        f"kept {len(ranked_results)} of {len(previews)} results"
    )
    for idx in kept_indices:
        logger.debug(f"[{engine_name}] KEPT [{idx}]: {_preview_title(previews[idx])}")
    for idx in sorted(removed_indices):
        logger.debug(
            f"[{engine_name}] REMOVED [{idx}]: {_preview_title(previews[idx])}"
        )
    for idx in sorted(skipped_indices):
        logger.debug(
            f"[{engine_name}] SKIPPED [{idx}]: {_preview_title(previews[idx])}"
        )

    return ranked_results

383 

384 

def _content_to_text(content: Any) -> Optional[str]:
    """Flatten a LangChain message ``content`` value into plain text.

    ``content`` is a ``str`` for most providers, but some chat models return
    a list of content blocks: plain strings or dicts such as
    ``{"type": "text", "text": "..."}``. Only text blocks are kept (dicts
    without a ``type`` are assumed to be text). Other block types, e.g.
    reasoning/thinking or tool blocks, are ignored so their numbers are
    never parsed as indices. Returns ``None`` when ``content`` is neither a
    string nor a list.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type", "text") == "text":
                block_text = block.get("text")
                if isinstance(block_text, str):
                    parts.append(block_text)
        # Newline-join so integers from adjacent blocks never fuse
        # ("2" + "5" must not become "25").
        return "\n".join(parts)
    return None


def _invoke_text(llm, prompt: str, engine_name: str) -> List[int]:
    """Invoke the LLM with a plain text prompt and parse out integer indices.

    Args:
        llm: LangChain LLM or chat model, possibly wrapped (see
            ``_unwrap_llm``). Only its ``invoke`` method is called.
        prompt: Fully rendered relevance prompt.
        engine_name: Engine label used in log and error messages.

    Returns:
        The parsed ints in reply order. An empty list means "no integers
        found" and is treated as a valid "none relevant" judgment by the
        caller. Range/dedup validation happens in
        ``filter_previews_for_relevance``.

    Raises:
        TypeError: If the reply is neither a string nor a list of content
            blocks. This is a parse failure, so the caller treats the batch
            as failed instead of reading it as "reject all".
        Exception: Anything raised by ``llm.invoke`` propagates unchanged.
    """
    # Ollama thinking-by-default models (qwen3 dense variants, etc.)
    # burn 30-60s on CoT before emitting the answer. Index selection does
    # not benefit from reasoning, so suppress it on Ollama where supported.
    base_llm = _unwrap_llm(llm)
    invoke_kwargs: Dict[str, Any] = {}
    if isinstance(base_llm, ChatOllama):
        invoke_kwargs["reasoning"] = False

    result = llm.invoke(prompt, **invoke_kwargs)

    # LangChain chat models return a Message whose ``content`` is a str or
    # a list of content blocks; plain LLMs return a string.
    raw_content = getattr(result, "content", result)
    text = _content_to_text(raw_content)
    if text is None:
        raise TypeError(
            f"[{engine_name}] Unexpected LLM response type: "
            f"{type(raw_content).__name__}"
        )

    # On backends where ``reasoning=False`` is not applied, reasoning
    # models may prepend their chain of thought, terminated by
    # ``</think>``, to the answer. That text is full of numbers the regex
    # would pick up, so only the part after the last closing tag is
    # parsed. This is a no-op when no tag is present.
    text = text.rsplit("</think>", 1)[-1]

    indices = [int(m) for m in _INT_RE.findall(text)]
    logger.debug(
        f"[{engine_name}] Text output parsed {len(indices)} indices: {indices}"
    )
    return indices