Coverage for src/local_deep_research/advanced_search_system/filters/cross_engine_filter.py: 98%
88 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Cross-engine search result filter implementation.
3"""
5from typing import Dict, List
7from loguru import logger
9from ...utilities.json_utils import extract_json, get_llm_response_text
10from .base_filter import BaseFilter
class CrossEngineFilter(BaseFilter):
    """Filter that ranks and filters results from multiple search engines.

    When a model is available and more than 10 results are supplied, the
    first ``max_context_items`` results are shown to the model, which is
    asked for a JSON array of relevant indices. Results outside that window
    are never returned on the model-assisted paths.
    """

    def __init__(
        self,
        model,
        max_results=None,
        default_reorder=True,
        default_reindex=True,
        settings_snapshot=None,
    ):
        """
        Initialize the cross-engine filter.

        Args:
            model: Language model to use for relevance assessment
            max_results: Maximum number of results to keep after filtering.
                When None, it is read from the
                "search.cross_engine_max_results" setting (fallback: 100).
            default_reorder: Default setting for reordering results by relevance
            default_reindex: Default setting for reindexing results after filtering
            settings_snapshot: Settings snapshot for thread context
        """
        super().__init__(model)
        # Import from thread_settings to avoid database dependencies
        from ...config.thread_settings import (
            get_setting_from_snapshot,
            NoSettingsContextError,
        )

        # Get max_results from database settings if not provided
        if max_results is None:
            try:
                max_results = get_setting_from_snapshot(
                    "search.cross_engine_max_results",
                    default=100,
                    settings_snapshot=settings_snapshot,
                )
                if max_results is not None:
                    max_results = int(max_results)
                else:
                    max_results = 100
            except (NoSettingsContextError, TypeError, ValueError):
                # Missing settings context or an unparsable value: use the
                # built-in default rather than failing construction.
                max_results = 100
        self.max_results = max_results

        # Max number of result previews shown to the LLM for relevance ranking.
        # Higher values let the LLM evaluate more candidates but increase prompt
        # size and latency.
        try:
            self.max_context_items = int(
                get_setting_from_snapshot(
                    "search.cross_engine_max_context_items",
                    default=30,
                    settings_snapshot=settings_snapshot,
                )
            )
        except (NoSettingsContextError, TypeError, ValueError):
            self.max_context_items = 30

        self.default_reorder = default_reorder
        self.default_reindex = default_reindex

    @staticmethod
    def _as_text(value, fallback):
        """Return ``value`` as stripped text, or ``fallback`` when it is None.

        ``dict.get(key, default)`` only applies the default when the key is
        absent, so a result with an explicit ``None`` (or a non-string value)
        would otherwise break the ``.strip()`` call used to build previews.
        """
        if value is None:
            return fallback
        return str(value).strip()

    def _prepare_and_return(self, results, *, reindex, start_index):
        """Optionally reindex results and return them.

        When ``reindex`` is true, each result dict is mutated in place: its
        "index" key is set to the 1-based position (as a string), offset by
        ``start_index``.
        """
        if reindex:
            for i, result in enumerate(results):
                result["index"] = str(i + start_index + 1)
        return results

    def _valid_unique_indices(self, ranked_indices, upper_bound):
        """Yield valid indices once, preserving first-seen order.

        Non-integer entries are logged and skipped; duplicates and values
        outside ``[0, upper_bound)`` are skipped silently.
        """
        seen = set()
        for idx in ranked_indices:
            # bool is a subclass of int, so it must be rejected explicitly.
            if not isinstance(idx, int) or isinstance(idx, bool):
                logger.warning(
                    f"Skipping non-integer ranked index from cross-engine filter: {idx!r}"
                )
                continue
            if idx in seen:
                continue
            if 0 <= idx < upper_bound:
                seen.add(idx)
                yield idx

    def filter_results(
        self,
        results: List[Dict],
        query: str,
        reorder=None,
        reindex=None,
        start_index=0,
        **kwargs,
    ) -> List[Dict]:
        """
        Filter and rank search results from multiple engines by relevance.

        Args:
            results: Combined list of search results from all engines
            query: The original search query
            reorder: Whether to reorder results by relevance (default: use instance default)
            reindex: Whether to update result indices after filtering (default: use instance default)
            start_index: Starting index for the results (used for continuous indexing)
            **kwargs: Additional parameters

        Returns:
            Filtered list of search results. If the model marks nothing as
            relevant, the first 10 candidates are returned; if the response
            cannot be parsed or an error occurs, the candidates are returned
            unfiltered (capped at ``max_results``).
        """
        # Use instance defaults if not specified
        if reorder is None:
            reorder = self.default_reorder
        if reindex is None:
            reindex = self.default_reindex

        if not self.model or len(results) <= 10:  # Don't filter if few results
            return self._prepare_and_return(
                results[: min(self.max_results, len(results))],
                reindex=reindex,
                start_index=start_index,
            )

        max_context_items = min(self.max_context_items, len(results))
        context_results = results[:max_context_items]

        # Create context for LLM
        preview_context = []
        for i, result in enumerate(context_results):
            # None-safe: engines may return explicit None for these fields.
            title = self._as_text(result.get("title"), "Untitled")
            snippet = self._as_text(result.get("snippet"), "")
            engine = result.get("engine", "Unknown engine")

            # Clean up snippet if too long
            if len(snippet) > 200:
                snippet = snippet[:200] + "..."

            preview_context.append(
                f"[{i}] Engine: {engine} | Title: {title}\nSnippet: {snippet}"
            )

        context = "\n\n".join(preview_context)

        prompt = f"""You are a search result filter. Your task is to rank search results from multiple engines by relevance to a query.

Query: "{query}"

Search Results:
{context}

Return the search results as a JSON array of indices, ranked from most to least relevant to the query.
Only include indices of results that are actually relevant to the query.
For example: [3, 0, 7, 1]

If no results seem relevant to the query, return an empty array: []"""

        try:
            # Get LLM's evaluation
            response = self.model.invoke(prompt)
            response_text = get_llm_response_text(response)
            ranked_indices = extract_json(response_text, expected_type=list)

            if ranked_indices is not None:
                # If not reordering, just filter based on the indices
                if not reorder:
                    # Just keep the results that were deemed relevant
                    filtered_results = []
                    for idx in sorted(
                        self._valid_unique_indices(
                            ranked_indices, len(context_results)
                        )
                    ):  # Sort to maintain original order
                        filtered_results.append(context_results[idx])

                    # Limit results if needed
                    final_results = filtered_results[
                        : min(self.max_results, len(filtered_results))
                    ]

                    if not final_results and results:
                        logger.info(
                            "Cross-engine filtering removed all "
                            "results, returning top 10 originals"
                        )
                        return self._prepare_and_return(
                            context_results[: min(10, len(context_results))],
                            reindex=reindex,
                            start_index=start_index,
                        )

                    logger.info(
                        f"Cross-engine filtering kept {len(final_results)} out of {len(results)} results without reordering"
                    )
                    return self._prepare_and_return(
                        final_results,
                        reindex=reindex,
                        start_index=start_index,
                    )

                # Create ranked results list (reordering)
                ranked_results = []
                for idx in self._valid_unique_indices(
                    ranked_indices, len(context_results)
                ):
                    ranked_results.append(context_results[idx])

                # If filtering removed everything, return top results
                if not ranked_results and results:
                    logger.info(
                        "Cross-engine filtering removed all results, returning top 10 originals instead"
                    )
                    return self._prepare_and_return(
                        context_results[: min(10, len(context_results))],
                        reindex=reindex,
                        start_index=start_index,
                    )

                # Limit results if needed
                max_filtered = min(self.max_results, len(ranked_results))
                final_results = ranked_results[:max_filtered]

                logger.info(
                    f"Cross-engine filtering kept {len(final_results)} out of {len(results)} results with reordering={reorder}, reindex={reindex}"
                )
                return self._prepare_and_return(
                    final_results,
                    reindex=reindex,
                    start_index=start_index,
                )
            logger.info(
                "Could not find JSON array in response, returning original results"
            )
            return self._prepare_and_return(
                context_results[: min(self.max_results, len(context_results))],
                reindex=reindex,
                start_index=start_index,
            )

        except Exception:
            # Best-effort boundary: any model/parsing failure falls back to
            # the unfiltered candidates instead of failing the search.
            logger.exception("Cross-engine filtering error")
            return self._prepare_and_return(
                context_results[: min(self.max_results, len(context_results))],
                reindex=reindex,
                start_index=start_index,
            )