Coverage for src / local_deep_research / advanced_search_system / filters / cross_engine_filter.py: 92%

94 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Cross-engine search result filter implementation. 

3""" 

4 

5import json 

6from typing import Dict, List 

7 

8from loguru import logger 

9 

10from ...utilities.search_utilities import remove_think_tags 

11from .base_filter import BaseFilter 

12 

13 

class CrossEngineFilter(BaseFilter):
    """Filter that ranks and filters results from multiple search engines.

    The filter shows a language model a short preview (engine, title,
    snippet) of the leading results and asks it for a JSON array of result
    indices ordered by relevance.  That array drives which results are kept
    and, optionally, the order they are returned in.  Whenever the model
    cannot be used or its answer cannot be interpreted, the original results
    are returned (truncated to ``max_results``) instead of raising.
    """

    # Result lists of this size or smaller are returned without asking the model.
    _MIN_RESULTS_TO_FILTER = 10
    # Only this many leading results are previewed to the model.
    _MAX_CONTEXT_ITEMS = 30
    # Snippets longer than this are truncated (with "...") in the prompt.
    _MAX_SNIPPET_CHARS = 200
    # Number of original results returned when the model rejects everything.
    _EMPTY_RANKING_FALLBACK = 10

    def __init__(
        self,
        model,
        max_results=None,
        default_reorder=True,
        default_reindex=True,
        settings_snapshot=None,
    ):
        """
        Initialize the cross-engine filter.

        Args:
            model: Language model to use for relevance assessment
            max_results: Maximum number of results to keep after filtering.
                When None, the value is read from the
                "search.cross_engine_max_results" setting (default 100).
            default_reorder: Default setting for reordering results by relevance
            default_reindex: Default setting for reindexing results after filtering
            settings_snapshot: Settings snapshot for thread context
        """
        super().__init__(model)
        # Get max_results from settings if not provided
        if max_results is None:
            # Import from thread_settings to avoid database dependencies
            from ...config.thread_settings import (
                get_setting_from_snapshot,
                NoSettingsContextError,
            )

            try:
                max_results = get_setting_from_snapshot(
                    "search.cross_engine_max_results",
                    default=100,
                    settings_snapshot=settings_snapshot,
                )
                # Ensure we have an integer; a None setting means "use default"
                max_results = 100 if max_results is None else int(max_results)
            except (NoSettingsContextError, TypeError, ValueError):
                max_results = 100  # Explicit default
        self.max_results = max_results
        self.default_reorder = default_reorder
        self.default_reindex = default_reindex

    @staticmethod
    def _take_and_reindex(
        items: List[Dict], limit, reindex, start_index
    ) -> List[Dict]:
        """Return the first ``limit`` items, optionally renumbering them.

        When ``reindex`` is truthy each kept dict gets its "index" key set
        (in place) to the 1-based position, offset by ``start_index``, as a
        string.
        """
        kept = items[: min(limit, len(items))]
        if reindex:
            for position, item in enumerate(kept):
                item["index"] = str(position + start_index + 1)
        return kept

    @staticmethod
    def _valid_indices(raw_indices, result_count) -> List[int]:
        """Return the unique, in-range indices from the model's answer, in order.

        Raises:
            TypeError: if any entry is not a plain integer.  The caller's
                error boundary turns this into the "return originals"
                fallback, which is what malformed entries led to before.
        """
        seen = set()
        valid = []
        for idx in raw_indices:
            # bool is a subclass of int, but True/False are not real indices.
            if isinstance(idx, bool) or not isinstance(idx, int):
                raise TypeError(
                    f"Expected integer result index, got {idx!r}"
                )
            # Negative values would silently wrap around to the wrong result;
            # repeated values would duplicate a result in the output.
            if 0 <= idx < result_count and idx not in seen:
                seen.add(idx)
                valid.append(idx)
        return valid

    @classmethod
    def _build_prompt(cls, results: List[Dict], query: str) -> str:
        """Build the ranking prompt from previews of the leading results."""
        preview_context = []
        # Only the first _MAX_CONTEXT_ITEMS results are shown to the model,
        # so there is no need to build previews for the rest.
        for i, result in enumerate(results[: cls._MAX_CONTEXT_ITEMS]):
            title = result.get("title", "Untitled")
            if title is None:  # key present but empty -> treat as missing
                title = "Untitled"
            title = str(title).strip()
            snippet = str(result.get("snippet") or "").strip()
            engine = result.get("engine", "Unknown engine")

            # Clean up snippet if too long
            if len(snippet) > cls._MAX_SNIPPET_CHARS:
                snippet = snippet[: cls._MAX_SNIPPET_CHARS] + "..."

            preview_context.append(
                f"[{i}] Engine: {engine} | Title: {title}\nSnippet: {snippet}"
            )

        context = "\n\n".join(preview_context)

        return f"""You are a search result filter. Your task is to rank search results from multiple engines by relevance to a query.

Query: "{query}"

Search Results:
{context}

Return the search results as a JSON array of indices, ranked from most to least relevant to the query.
Only include indices of results that are actually relevant to the query.
For example: [3, 0, 7, 1]

If no results seem relevant to the query, return an empty array: []"""

    def filter_results(
        self,
        results: List[Dict],
        query: str,
        reorder=None,
        reindex=None,
        start_index=0,
        **kwargs,
    ) -> List[Dict]:
        """
        Filter and rank search results from multiple engines by relevance.

        Args:
            results: Combined list of search results from all engines
            query: The original search query
            reorder: Whether to reorder results by relevance (default: use instance default)
            reindex: Whether to update result indices after filtering (default: use instance default)
            start_index: Starting index for the results (used for continuous indexing)
            **kwargs: Additional parameters (unused here)

        Returns:
            Filtered list of search results.  The returned dicts are the same
            objects as in ``results``; when reindexing is on, their "index"
            key is overwritten in place.  Any failure while querying the
            model or interpreting its answer is logged and the original
            results (truncated to ``max_results``) are returned.
        """
        # Use instance defaults if not specified
        if reorder is None:
            reorder = self.default_reorder
        if reindex is None:
            reindex = self.default_reindex

        limit = self.max_results

        # Don't filter if there is no model or only a few results;
        # still truncate and (if requested) reindex.
        if not self.model or len(results) <= self._MIN_RESULTS_TO_FILTER:
            return self._take_and_reindex(results, limit, reindex, start_index)

        try:
            prompt = self._build_prompt(results, query)

            # Get LLM's evaluation
            response = self.model.invoke(prompt)

            # Extract response text (message-like objects expose `.content`)
            if hasattr(response, "content"):
                response_text = remove_think_tags(response.content)
            else:
                response_text = remove_think_tags(str(response))
            response_text = response_text.strip()

            # Find JSON array in response: outermost '[' ... ']' span
            start_idx = response_text.find("[")
            end_idx = response_text.rfind("]")

            if start_idx < 0 or end_idx <= start_idx:
                logger.info(
                    "Could not find JSON array in response, returning original results"
                )
                return self._take_and_reindex(
                    results, limit, reindex, start_index
                )

            ranked_indices = self._valid_indices(
                json.loads(response_text[start_idx : end_idx + 1]),
                len(results),
            )

            # If not reordering, just filter based on the indices
            if not reorder:
                # Sort to maintain original order of the kept results
                filtered_results = [
                    results[idx] for idx in sorted(ranked_indices)
                ]
                final_results = self._take_and_reindex(
                    filtered_results, limit, reindex, start_index
                )
                logger.info(
                    f"Cross-engine filtering kept {len(final_results)} out of {len(results)} results without reordering"
                )
                return final_results

            # Create ranked results list (reordering)
            ranked_results = [results[idx] for idx in ranked_indices]

            # If filtering removed everything, return top results
            if not ranked_results and results:
                logger.info(
                    "Cross-engine filtering removed all results, returning top 10 originals instead"
                )
                # Never hand back more than the configured maximum.
                return self._take_and_reindex(
                    results,
                    min(self._EMPTY_RANKING_FALLBACK, limit),
                    reindex,
                    start_index,
                )

            final_results = self._take_and_reindex(
                ranked_results, limit, reindex, start_index
            )
            logger.info(
                f"Cross-engine filtering kept {len(final_results)} out of {len(results)} results with reordering={reorder}, reindex={reindex}"
            )
            return final_results

        except Exception:
            # Best-effort boundary: filtering must never lose the results.
            logger.exception("Cross-engine filtering error")
            return self._take_and_reindex(results, limit, reindex, start_index)