Coverage for src/local_deep_research/web_search_engines/relevance_filter.py: 95%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""LLM-based relevance filter using plain text output.

Filters search previews by asking the LLM to return a list of relevant
indices as plain text (e.g. ``0, 2, 5``). We parse the response with a
regex over integers, which is robust to wrappers like "Indices: 0, 2"
or "[0, 2, 5]" and dodges all the structured-output provider quirks
(qwen prose-mode, function_calling latency, schema bikeshedding).

Design notes:
- An empty LLM response is treated as a valid judgment ("none of these
  results are relevant"). We do not second-guess the model — if the
  filter says reject all, we reject all, and log a warning so users
  can notice if their chosen model is misbehaving.
- On exception (network error, parse failure, provider outage) the
  filter is considered unavailable, not "reject all". In that case we
  fall back to a capped slice of the original previews so downstream
  processing is not overwhelmed by unfiltered results.
- The filter can split large preview lists into smaller ``batch_size``
  chunks. Smaller batches are faster per call and tend to be more
  reliable on weaker models which struggle to track many indices in a
  single context. A failed individual batch is skipped (logged); only
  a hard exception falls back to the capped slice.
"""
25import re
26import time
27from concurrent.futures import ThreadPoolExecutor
28from typing import Any, Dict, List, Optional
30from loguru import logger
32from langchain_ollama import ChatOllama
34# Real wrapper chains (RateLimitedLLMWrapper -> ProcessingLLMWrapper -> base)
35# are at most 2-3 levels deep. A depth limit avoids spurious infinite chains
36# on test mocks where every getattr produces a new child object.
37_MAX_UNWRAP_DEPTH = 10
39# Matches standalone integers in the LLM response. ``\b`` ensures we
40# don't match digits glued to letters ("v2"), and the negative lookbehind
41# for ``.`` rejects the fractional part of decimals ("0.5" → just "0").
42# Combined with the prompt instruction to output ONLY the numbers, this
43# is robust against prose like "The top 3 results from 2024 are ...".
44_INT_RE = re.compile(r"(?<![\w.])\d+\b")
47_RELEVANCE_PROMPT_TEMPLATE = """Query: "{query}"
49Search results:
50{preview_text}
52Output ONLY the 0-based indices of relevant results as a comma-separated list, nothing else.
53Example: 0, 2, 5""" # noqa: S608
56def _unwrap_llm(llm):
57 """Unwrap known LLM wrapper chains to get the base LangChain LLM.
59 Walks ``.base_llm`` attributes up to ``_MAX_UNWRAP_DEPTH`` levels.
60 The depth limit guards against test mocks (e.g. ``unittest.mock.Mock``)
61 that lazily create a fresh child object on every attribute access.
62 """
63 probe = llm
64 for _ in range(_MAX_UNWRAP_DEPTH):
65 inner = getattr(probe, "base_llm", None)
66 if inner is None or inner is probe:
67 return probe
68 probe = inner
69 return probe
def _build_batch_prompt(
    query: str, batch: List[Dict[str, Any]], total_in_full: int
) -> str:
    """Build the relevance prompt for a single batch of previews.

    Indices in the prompt are local to the batch (0..len(batch)-1).

    Args:
        query: The search query shown to the model.
        batch: Preview dicts; ``title`` and ``snippet`` are read and may
            be missing or explicitly ``None``.
        total_in_full: Size of the original full preview list. Currently
            unused — the template has no placeholder for it; the
            parameter is kept for interface stability.

    Returns:
        The formatted prompt with snippets truncated to 200 characters.
    """
    preview_lines = []
    for i, preview in enumerate(batch):
        # ``or`` guards against explicit None values, which dict.get's
        # default alone would not catch before .strip().
        title = (preview.get("title") or "Untitled").strip()
        snippet = (preview.get("snippet") or "").strip()
        if len(snippet) > 200:
            snippet = snippet[:200] + "..."
        preview_lines.append(f"[{i}] {title}\n    {snippet}")

    return _RELEVANCE_PROMPT_TEMPLATE.format(
        query=query,
        preview_text="\n\n".join(preview_lines),
    )
def _run_batch(
    llm,
    batch: List[Dict[str, Any]],
    query: str,
    total_in_full: int,
    engine_name: str,
) -> List[int]:
    """Invoke the LLM on one batch and return the parsed local indices.

    An empty list means "none relevant" and is a valid judgment. Any
    LLM exception propagates to the caller, which then falls back to a
    capped slice of the original previews.
    """
    return _invoke_text(
        llm, _build_batch_prompt(query, batch, total_in_full), engine_name
    )
def filter_previews_for_relevance(
    llm,
    previews: List[Dict[str, Any]],
    query: str,
    max_filtered_results: Optional[int] = None,
    engine_name: str = "",
    batch_size: Optional[int] = None,
    max_parallel_batches: int = 1,
) -> List[Dict[str, Any]]:
    """Filter search previews for relevance via plain-text LLM output.

    Args:
        llm: LangChain LLM instance (may be wrapped)
        previews: List of preview dicts with title/snippet/url
        query: The search query
        max_filtered_results: Optional cap on the final result count
            (None = LLM decides)
        engine_name: Engine class name for log messages
        batch_size: If set and smaller than ``len(previews)``, the LLM
            is called once per batch of this many previews. Smaller
            batches are faster per call and more reliable on weaker
            models. None or 0 disables batching (single-call mode).
        max_parallel_batches: Number of batches to dispatch concurrently
            against the LLM (via a thread pool). 1 = sequential.
            Most providers (Ollama with OLLAMA_NUM_PARALLEL>1, OpenAI,
            Anthropic) handle concurrent requests fine. Ignored when
            there is only one batch.

    Returns:
        Filtered list of preview dicts (subset of input). Order matches
        the original preview order across batches.
    """
    if not previews:
        return []

    for i, preview in enumerate(previews):
        # ``or`` guards against an explicit None title, which dict.get's
        # default alone would not catch before .strip().
        title = (preview.get("title") or "Untitled").strip()
        logger.debug(f"[{engine_name}] INPUT [{i}]: {title[:80]}")

    # Cap used when the filter is unavailable (LLM exception) so we
    # don't flood downstream processing with unfiltered results.
    unavailable_cap = max_filtered_results or 5

    # Determine batch boundaries. A batch_size of None or 0 means
    # "single batch" — process all previews in one LLM call.
    effective_batch = (
        batch_size if (batch_size and batch_size > 0) else len(previews)
    )
    batch_starts = list(range(0, len(previews), effective_batch))
    batches = [previews[s : s + effective_batch] for s in batch_starts]

    workers = max(1, min(max_parallel_batches, len(batches)))
    logger.debug(
        f"[{engine_name}] Dispatching {len(batches)} batch(es) of "
        f"<= {effective_batch} previews each, {workers} parallel worker(s)"
    )

    t0 = time.monotonic()
    results_per_batch: List[List[int]] = [[] for _ in batches]
    try:
        if workers == 1:
            for i, batch in enumerate(batches):
                results_per_batch[i] = _run_batch(
                    llm, batch, query, len(previews), engine_name
                )
        else:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {
                    pool.submit(
                        _run_batch,
                        llm,
                        batch,
                        query,
                        len(previews),
                        engine_name,
                    ): i
                    for i, batch in enumerate(batches)
                }
                # .result() re-raises any batch exception; the outer
                # except then applies the capped-slice fallback.
                for fut, i in futures.items():
                    results_per_batch[i] = fut.result()
    except Exception:
        logger.exception(
            f"[{engine_name}] LLM relevance filter failed — returning "
            f"first {unavailable_cap} previews as fallback"
        )
        return previews[:unavailable_cap]
    total_elapsed = time.monotonic() - t0

    # Aggregate results in original batch order so the final list
    # mirrors the input ordering across batches.
    ranked_results: List[Dict[str, Any]] = []
    kept_indices: List[int] = []
    seen: set = set()

    for batch_start, batch_result in zip(batch_starts, results_per_batch):
        batch_len = min(effective_batch, len(previews) - batch_start)
        for li in batch_result:
            # Drop hallucinated out-of-range indices and duplicates.
            if not (0 <= li < batch_len):
                continue
            global_idx = batch_start + li
            if global_idx in seen:
                continue
            seen.add(global_idx)
            ranked_results.append(previews[global_idx])
            kept_indices.append(global_idx)

    logger.info(
        f"[{engine_name}] LLM relevance filter took {total_elapsed:.1f}s "
        f"across {len(batches)} batch(es) ({workers} parallel) "
        f"for {len(previews)} previews"
    )

    # Empty result is a valid LLM judgment ("none relevant"). Log a
    # warning on larger batches so users can notice a misbehaving model,
    # but do not override the decision. (Message fixed: this module
    # parses plain-text indices, not structured output.)
    if not ranked_results and len(previews) > 2:
        logger.warning(
            f"[{engine_name}] LLM filter judged all {len(previews)} "
            f"results irrelevant. If this is unexpected, verify your "
            f"model follows the plain-text index-list instructions."
        )

    # Apply cap if set, keeping ranked_results and kept_indices aligned.
    if (
        max_filtered_results is not None
        and len(ranked_results) > max_filtered_results
    ):
        ranked_results = ranked_results[:max_filtered_results]
        kept_indices = kept_indices[:max_filtered_results]

    # Log kept/removed so users can audit the filter's decisions.
    removed_indices = set(range(len(previews))) - set(kept_indices)
    logger.info(
        f"[{engine_name}] Relevance filter: "
        f"kept {len(ranked_results)} of {len(previews)} results"
    )
    for idx in kept_indices:
        title = (previews[idx].get("title") or "Untitled")[:80]
        logger.debug(f"[{engine_name}] KEPT [{idx}]: {title}")
    for idx in sorted(removed_indices):
        title = (previews[idx].get("title") or "Untitled")[:80]
        logger.debug(f"[{engine_name}] REMOVED [{idx}]: {title}")

    return ranked_results
def _invoke_text(llm, prompt: str, engine_name: str) -> List[int]:
    """Send ``prompt`` to the LLM and extract integer indices from the reply.

    Returns the parsed ints; an empty list ("no integers found") is the
    caller's "none relevant" judgment. Range checking and deduplication
    are handled in ``filter_previews_for_relevance``.
    """
    # Ollama thinking-by-default models (qwen3 dense variants, etc.)
    # can spend 30-60s on chain-of-thought before answering. Index
    # selection gains nothing from reasoning, so switch it off whenever
    # an Ollama backend is found under the wrapper chain.
    invoke_kwargs = {}
    if isinstance(_unwrap_llm(llm), ChatOllama):
        invoke_kwargs["reasoning"] = False

    response = llm.invoke(prompt, **invoke_kwargs)

    # Chat models reply with a Message carrying ``.content``; plain LLMs
    # reply with a bare string.
    text = getattr(response, "content", response)
    if isinstance(text, str):
        indices = [int(token) for token in _INT_RE.findall(text)]
        logger.debug(
            f"[{engine_name}] Text output parsed {len(indices)} indices: {indices}"
        )
        return indices

    logger.warning(
        f"[{engine_name}] Unexpected LLM response type: "
        f"{type(text).__name__}"
    )
    return []