Coverage for src/local_deep_research/web_search_engines/relevance_filter.py: 91%
124 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
"""LLM-based relevance filter using plain text output.

Filters search previews by asking the LLM to return a list of relevant
indices as plain text (e.g. ``0, 2, 5``). We parse the response with a
regex over integers, which is robust to wrappers like "Indices: 0, 2"
or "[0, 2, 5]" and dodges all the structured-output provider quirks
(qwen prose-mode, function_calling latency, schema bikeshedding).

Design notes:
- An empty LLM response is treated as a valid judgment ("none of these
  results are relevant"). We do not second-guess the model — if the
  filter says reject all, we reject all, and log a warning so users
  can notice if their chosen model is misbehaving.
- When an LLM call raises (network error, provider outage, ...) the
  filter is considered unavailable for that batch, not "reject all".
  If that happens for every batch, we fall back to a capped slice of
  the original previews so downstream processing is not overwhelmed by
  unfiltered results.
- The filter can split large preview lists into smaller ``batch_size``
  chunks. Smaller batches are faster per call and tend to be more
  reliable on weaker models which struggle to track many indices in a
  single context. A batch that raises or times out is skipped (logged)
  and contributes nothing; it does not by itself trigger the fallback.
- Partial success is a common outcome under batching: some batches
  return valid judgments, others raise or time out. Successful batches
  are kept; failed/timed-out batches contribute nothing. The capped
  fallback only fires when *every* batch failed to produce a result.
"""
29import re
30import time
31from concurrent.futures import (
32 ThreadPoolExecutor,
33 TimeoutError as FuturesTimeoutError,
34 as_completed,
35)
36from datetime import UTC, datetime
37from typing import Any, Dict, List, Optional
39from loguru import logger
41from langchain_ollama import ChatOllama
43from ..config.constants import DEFAULT_MAX_FILTERED_RESULTS
# Real wrapper chains (RateLimitedLLMWrapper -> ProcessingLLMWrapper -> base)
# are at most 2-3 levels deep. A depth limit avoids spurious infinite chains
# on test mocks where every getattr produces a new child object.
_MAX_UNWRAP_DEPTH = 10

# Matches standalone runs of digits in the LLM response.
# - The negative lookbehind ``(?<![\w.])`` rejects digits glued to a
#   preceding word character ("v2", or the tail of a longer number) and
#   the fractional part of decimals ("0.5" → just "0").
# - The trailing ``\b`` rejects digits glued to a following letter ("2a").
# The regex cannot tell indices from numbers in prose: "The top 3 results
# from 2024" yields [3, 2024]. Out-of-range values are dropped during
# aggregation, but an in-range number taken from prose would be kept, so
# we rely on the prompt instruction to output ONLY the numbers.
_INT_RE = re.compile(r"(?<![\w.])\d+\b")

# Wall-clock timeout passed to ``as_completed`` when batches run in
# parallel. Note: this bounds the time from iteration start until all
# parallel batches complete, not per-batch. A single batch (≤ 10
# previews on a 9B-class local model) typically completes in 5-15s.
# When Ollama serializes requests (OLLAMA_NUM_PARALLEL=1, the default)
# 10 parallel submissions effectively run sequentially server-side, so
# 10 × ~15s already brushes against the old 120s bound. 300s leaves
# room for slow-but-progressing work on serialized backends while still
# flagging genuine hangs. The sequential (workers==1) path does not
# apply this — only the parallel branch uses ``as_completed``.
# Without a bound here a stuck Ollama socket would block the pipeline
# indefinitely, because langchain-ollama's httpx client has no default
# socket timeout.
_FILTER_WALL_TIMEOUT_S = 300.0


# Per-preview snippet character cap. The snippet field often carries the
# paper abstract for academic engines (arxiv, semantic scholar, pubmed).
# 200 was too tight — it truncated abstracts before the judge could tell
# whether a paper's primary topic actually matches the query, leading to
# "paper mentions LLMs in passing" sources leaking past the filter. 800
# comfortably fits a typical abstract opening while keeping total prompt
# size bounded (≈16KB of snippet text for 20 previews).
_SNIPPET_CHAR_CAP = 800


# Prompt sent once per batch, filled via ``str.format`` with ``{query}``,
# ``{current_date}`` (UTC, YYYY-MM-DD) and ``{preview_text}`` (previews
# numbered 0-based within the batch). Literal braces must be doubled.
# The ``noqa: S608`` silences ruff's string-built-SQL heuristic, which
# this prose presumably trips by accident — no SQL is involved.
_RELEVANCE_PROMPT_TEMPLATE = """This is a relevance-filtering step. Kept results move forward and may be used in the final answer; dropped ones are excluded from further processing.

Query: "{query}"
Current date: {current_date}

Search results:
{preview_text}

Direct topic match matters more than keyword match — results that just mention the query terms usually don't help.

Output ONLY the 0-based indices of relevant results as a comma-separated list, nothing else.
Example: 0, 2, 5"""  # noqa: S608
def _unwrap_llm(llm):
    """Return the innermost LLM behind a chain of ``.base_llm`` wrappers.

    Follows ``base_llm`` links until one is missing/``None`` or points back
    at the same object, taking at most ``_MAX_UNWRAP_DEPTH`` steps. The
    step limit protects against objects (such as ``unittest.mock.Mock``)
    that fabricate a new child on every attribute access and would
    otherwise form an endless chain.
    """
    current = llm
    steps = 0
    while steps < _MAX_UNWRAP_DEPTH:
        nxt = getattr(current, "base_llm", None)
        if nxt is None or nxt is current:
            break
        current = nxt
        steps += 1
    return current
113def _build_batch_prompt(
114 query: str,
115 batch: List[Dict[str, Any]],
116 total_in_full: int,
117 prompt_template: str,
118) -> str:
119 """Build the relevance prompt for a single batch of previews.
121 Indices in the prompt are local to the batch (0..len(batch)-1).
122 ``total_in_full`` is the size of the original full preview list,
123 shown to the model for context — it doesn't affect the index range.
124 ``prompt_template`` is passed in (defaulting at the public entry
125 point) rather than read from a module global, so eval harnesses can
126 test variants without monkey-patching module state.
127 """
128 preview_lines = []
129 for i, preview in enumerate(batch):
130 title = preview.get("title", "Untitled").strip()
131 snippet = preview.get("snippet", "").strip()
132 if len(snippet) > _SNIPPET_CHAR_CAP:
133 snippet = snippet[:_SNIPPET_CHAR_CAP] + "..."
134 preview_lines.append(f"[{i}] {title}\n {snippet}")
136 return prompt_template.format(
137 query=query,
138 current_date=datetime.now(UTC).strftime("%Y-%m-%d"),
139 preview_text="\n\n".join(preview_lines),
140 )
def _run_batch(
    llm,
    batch: List[Dict[str, Any]],
    query: str,
    total_in_full: int,
    engine_name: str,
    prompt_template: str,
) -> List[int]:
    """Judge one batch of previews and return the batch-local indices kept.

    Builds the prompt for ``batch`` and sends it to ``llm`` in a single
    call. An empty list is a legitimate "nothing relevant" verdict. Any
    exception from the LLM call propagates unchanged; the caller then
    treats this batch as failed and skips it.
    """
    return _invoke_text(
        llm,
        _build_batch_prompt(query, batch, total_in_full, prompt_template),
        engine_name,
    )
def _preview_title(preview: Dict[str, Any]) -> str:
    """Return a preview's title for log lines; a missing or ``None`` title becomes "Untitled"."""
    title = preview.get("title")
    return "Untitled" if title is None else str(title).strip()


def _dispatch_batches(
    llm,
    batches: List[List[Dict[str, Any]]],
    query: str,
    total_in_full: int,
    engine_name: str,
    prompt_template: str,
    workers: int,
) -> List[Optional[List[int]]]:
    """Run every batch through ``_run_batch`` and collect per-batch results.

    Returns one entry per batch, in batch order. A completed batch yields
    its parsed local indices (possibly ``[]``, a valid "none relevant"
    judgment). A batch that raised, or was still running when the parallel
    wall-clock timeout expired, yields ``None``.
    """
    results: List[Optional[List[int]]] = [None] * len(batches)

    if workers == 1:
        for i, batch in enumerate(batches):
            try:
                results[i] = _run_batch(
                    llm,
                    batch,
                    query,
                    total_in_full,
                    engine_name,
                    prompt_template,
                )
            except Exception:
                logger.exception(
                    f"[{engine_name}] batch {i} failed — skipping its results"
                )
        return results

    # Not using ``with ThreadPoolExecutor(...) as pool:`` on purpose.
    # ``__exit__`` calls ``shutdown(wait=True)`` which blocks on any
    # still-running worker — defeating the whole point of the timeout
    # when a worker is stuck on an Ollama HTTP call. We manage the pool
    # lifetime explicitly and always shut down with ``wait=False`` so
    # timed-out batches are orphaned rather than blocking the caller.
    pool = ThreadPoolExecutor(max_workers=workers)
    try:
        futures_to_idx = {
            pool.submit(
                _run_batch,
                llm,
                batch,
                query,
                total_in_full,
                engine_name,
                prompt_template,
            ): i
            for i, batch in enumerate(batches)
        }
        try:
            for fut in as_completed(
                futures_to_idx, timeout=_FILTER_WALL_TIMEOUT_S
            ):
                i = futures_to_idx[fut]
                try:
                    results[i] = fut.result()
                except Exception:
                    logger.exception(
                        f"[{engine_name}] batch {i} failed — skipping"
                    )
        except FuturesTimeoutError:
            pending = [
                futures_to_idx[f] for f in futures_to_idx if not f.done()
            ]
            logger.warning(
                f"[{engine_name}] {len(pending)} batch(es) still "
                f"running after {_FILTER_WALL_TIMEOUT_S}s — abandoning "
                f"(pending indices: {pending})"
            )
    finally:
        # ``cancel_futures=True`` only cancels queued futures the executor
        # hasn't started yet; already-running workers stay alive until
        # their blocking call returns. We accept that — the alternative
        # is hanging forever.
        pool.shutdown(wait=False, cancel_futures=True)
    return results


def _collect_kept_indices(
    batch_starts: List[int],
    batch_lengths: List[int],
    completed_batches: List[List[int]],
) -> List[int]:
    """Map per-batch local indices to global preview indices.

    Out-of-range and duplicate indices are dropped. Batches are visited in
    original order; within a batch, the order the LLM listed indices in is
    preserved.
    """
    kept: List[int] = []
    seen: set = set()
    for start, length, local_indices in zip(
        batch_starts, batch_lengths, completed_batches
    ):
        for li in local_indices:
            if not 0 <= li < length:
                continue
            global_idx = start + li
            if global_idx in seen:
                continue
            seen.add(global_idx)
            kept.append(global_idx)
    return kept


def filter_previews_for_relevance(
    llm,
    previews: List[Dict[str, Any]],
    query: str,
    max_filtered_results: Optional[int] = None,
    engine_name: str = "",
    batch_size: Optional[int] = None,
    max_parallel_batches: int = 1,
    prompt_template: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Filter search previews for relevance via plain-text LLM output.

    Args:
        llm: LangChain LLM instance (may be wrapped).
        previews: List of preview dicts with title/snippet/url.
        query: The search query.
        max_filtered_results: Optional cap on the final result count
            (None = LLM decides). For the unavailable-filter fallback, a
            falsy value (None or 0) means ``DEFAULT_MAX_FILTERED_RESULTS``.
        engine_name: Engine class name for log messages.
        batch_size: If set and smaller than ``len(previews)``, the LLM
            is called once per batch of this many previews. Smaller
            batches are faster per call and more reliable on weaker
            models. None, 0 or a negative value disables batching
            (single-call mode).
        max_parallel_batches: Number of batches to dispatch concurrently
            against the LLM (via a thread pool). 1 = sequential.
            Most providers (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
            Anthropic) handle concurrent requests fine. Ignored when
            there is only one batch.
        prompt_template: Override the built-in ``_RELEVANCE_PROMPT_TEMPLATE``.
            Used by eval harnesses that want to compare variants without
            mutating module state. None (default) uses the shipping
            template.

    Returns:
        Filtered list of preview dicts (a duplicate-free subset of the
        input). Batches contribute in their original order; within a
        batch, results follow the order in which the LLM listed them.
        If every batch failed, the first ``max_filtered_results`` (or
        ``DEFAULT_MAX_FILTERED_RESULTS``) previews are returned unfiltered.
    """
    if prompt_template is None:
        prompt_template = _RELEVANCE_PROMPT_TEMPLATE
    if not previews:
        return []

    for i, preview in enumerate(previews):
        logger.debug(
            f"[{engine_name}] INPUT [{i}]: {_preview_title(preview)[:80]}"
        )

    # Cap used when the filter is unavailable (every batch raised or timed
    # out) so we don't flood downstream processing with unfiltered results.
    unavailable_cap = max_filtered_results or DEFAULT_MAX_FILTERED_RESULTS

    # A batch_size of None, 0 or a negative value means "single batch" —
    # process all previews in one LLM call.
    effective_batch = (
        batch_size if (batch_size and batch_size > 0) else len(previews)
    )
    batch_starts = list(range(0, len(previews), effective_batch))
    batches = [previews[s : s + effective_batch] for s in batch_starts]

    workers = max(1, min(max_parallel_batches, len(batches)))
    logger.debug(
        f"[{engine_name}] Dispatching {len(batches)} batch(es) of "
        f"<= {effective_batch} previews each, {workers} parallel worker(s)"
    )

    t0 = time.monotonic()
    results_per_batch = _dispatch_batches(
        llm,
        batches,
        query,
        len(previews),
        engine_name,
        prompt_template,
        workers,
    )
    total_elapsed = time.monotonic() - t0

    # If no batch even completed, fall back to an unfiltered capped slice
    # — otherwise downstream would receive zero results from a filter
    # outage rather than from a valid "all irrelevant" judgment.
    if all(r is None for r in results_per_batch):
        logger.error(
            f"[{engine_name}] every relevance-filter batch failed — "
            f"returning first {unavailable_cap} previews as fallback"
        )
        return previews[:unavailable_cap]

    # A preview is "skipped" when its batch raised or timed out — the judge
    # never saw it — which is distinct from "removed" (judge dropped it).
    skipped_indices: set = set()
    for start, batch, result in zip(batch_starts, batches, results_per_batch):
        if result is None:
            skipped_indices.update(range(start, start + len(batch)))
    judged_count = len(previews) - len(skipped_indices)

    kept_indices = _collect_kept_indices(
        batch_starts,
        [len(batch) for batch in batches],
        [[] if r is None else r for r in results_per_batch],
    )

    logger.info(
        f"[{engine_name}] LLM relevance filter took {total_elapsed:.1f}s "
        f"across {len(batches)} batch(es) ({workers} parallel) "
        f"for {len(previews)} previews"
    )

    # Empty result is a valid LLM judgment ("none relevant"). Warn when
    # the judge rejected a non-trivial number of previews it actually saw,
    # so users can notice a misbehaving model, but do not override it.
    if not kept_indices and judged_count > 2:
        skipped_note = (
            f" ({len(skipped_indices)} more skipped because their batch failed)"
            if skipped_indices
            else ""
        )
        logger.warning(
            f"[{engine_name}] LLM filter judged all {judged_count} "
            f"evaluated results irrelevant{skipped_note}. If this is "
            f"unexpected, verify your model follows the plain index-list "
            f"output format."
        )

    if (
        max_filtered_results is not None
        and len(kept_indices) > max_filtered_results
    ):
        kept_indices = kept_indices[:max_filtered_results]
    ranked_results = [previews[idx] for idx in kept_indices]

    removed_indices = (
        set(range(len(previews))) - set(kept_indices) - skipped_indices
    )
    logger.info(
        f"[{engine_name}] Relevance filter: "
        f"kept {len(ranked_results)} of {len(previews)} results"
    )
    for idx in kept_indices:
        logger.debug(
            f"[{engine_name}] KEPT [{idx}]: {_preview_title(previews[idx])[:80]}"
        )
    for idx in sorted(removed_indices):
        logger.debug(
            f"[{engine_name}] REMOVED [{idx}]: "
            f"{_preview_title(previews[idx])[:80]}"
        )
    for idx in sorted(skipped_indices):
        logger.debug(
            f"[{engine_name}] SKIPPED [{idx}]: "
            f"{_preview_title(previews[idx])[:80]}"
        )

    return ranked_results
def _invoke_text(llm, prompt: str, engine_name: str) -> List[int]:
    """Invoke the LLM with a plain text prompt and parse out integer indices.

    Args:
        llm: LangChain chat model or LLM, possibly wrapped; only its
            ``invoke`` method is called.
        prompt: Fully formatted relevance prompt.
        engine_name: Engine name used as a log prefix.

    Returns:
        The integers found in the answer text, in order of appearance
        (empty list = "no integers found", treated as a valid "none
        relevant" judgment by the caller). Range/dedup validation happens
        in ``filter_previews_for_relevance``.

    Raises:
        Whatever ``llm.invoke`` raises; the caller treats that batch as
        failed.
    """
    # Ollama thinking-by-default models (qwen3 dense variants, etc.)
    # burn 30-60s on CoT before emitting the answer. Index selection does
    # not benefit from reasoning, so suppress it on Ollama where supported.
    base_llm = _unwrap_llm(llm)
    invoke_kwargs: Dict[str, Any] = {}
    if isinstance(base_llm, ChatOllama):
        invoke_kwargs["reasoning"] = False

    result = llm.invoke(prompt, **invoke_kwargs)

    # LangChain chat models return a Message; LLMs return a string.
    content = getattr(result, "content", result)
    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        # Multi-part message content: plain str parts and/or typed dict
        # blocks. Keep only text blocks — reasoning/"thinking" blocks may
        # contain numbers that must not be read as indices. Parts are
        # joined with newlines so adjacent numbers never fuse ("3"+"6").
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and part.get("type", "text") == "text":
                part_text = part.get("text")
                if isinstance(part_text, str):
                    parts.append(part_text)
        text = "\n".join(parts)
    else:
        logger.warning(
            f"[{engine_name}] Unexpected LLM response type: "
            f"{type(content).__name__}"
        )
        return []

    # Reasoning models served outside Ollama can emit inline
    # ``<think>...</think>`` chain-of-thought; any numbers in it would be
    # parsed as indices. Drop complete blocks, then anything before an
    # orphan closing tag (e.g. when the opening tag was pre-filled by the
    # chat template) and anything after an unclosed opening tag.
    text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    _, closing_tag, after_close = text.rpartition("</think>")
    if closing_tag:
        text = after_close
    before_open, opening_tag, _ = text.partition("<think>")
    if opening_tag:
        text = before_open

    indices = [int(m) for m in _INT_RE.findall(text)]
    logger.debug(
        f"[{engine_name}] Text output parsed {len(indices)} indices: {indices}"
    )
    return indices