Coverage for src / local_deep_research / web_search_engines / engines / search_engine_elasticsearch.py: 75%

114 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1from loguru import logger 

2from typing import Any, Dict, List, Optional 

3 

4from elasticsearch import Elasticsearch 

5from langchain_core.language_models import BaseLLM 

6 

7from ...config import search_config 

8from ..search_engine_base import BaseSearchEngine 

9 

10 

class ElasticsearchSearchEngine(BaseSearchEngine):
    """Elasticsearch search engine implementation with two-phase approach"""

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        index_name: str = "documents",
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
        max_results: int = 10,
        highlight_fields: Optional[List[str]] = None,
        search_fields: Optional[List[str]] = None,
        filter_query: Optional[Dict[str, Any]] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
    ):
        """
        Initialize the Elasticsearch search engine.

        Args:
            hosts: List of Elasticsearch hosts
                (defaults to ["http://localhost:9200"])
            index_name: Name of the index to search
            username: Optional username for authentication
            password: Optional password for authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID
            max_results: Maximum number of search results
            highlight_fields: Fields to highlight in search results
                (defaults to ["content", "title"])
            search_fields: Fields to search in
                (defaults to ["content", "title"])
            filter_query: Optional filter query in Elasticsearch DSL format
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering

        Raises:
            ConnectionError: If the Elasticsearch cluster cannot be reached.
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
        )

        self.index_name = index_name
        # NOTE: list defaults are resolved here rather than in the signature to
        # avoid the mutable-default-argument pitfall (shared state across calls).
        self.highlight_fields = highlight_fields or ["content", "title"]
        self.search_fields = search_fields or ["content", "title"]
        self.filter_query = filter_query or {}

        # Build keyword arguments for the Elasticsearch client
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(
            hosts or ["http://localhost:9200"], **es_args
        )

        # Verify connection early so misconfiguration fails fast
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception(f"Failed to connect to Elasticsearch: {e!s}")
            # Chain the original exception so the root cause is preserved
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def _build_search_body(self, query_clause: Dict[str, Any]) -> Dict[str, Any]:
        """
        Wrap a query clause with the shared highlighting and size settings.

        Args:
            query_clause: The Elasticsearch query clause to execute

        Returns:
            A complete search request body dictionary
        """
        return {
            "query": query_clause,
            "highlight": {
                "fields": {field: {} for field in self.highlight_fields},
                "pre_tags": ["<em>"],
                "post_tags": ["</em>"],
            },
            "size": self.max_results,
        }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Elasticsearch documents.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries (empty on any search error)
        """
        logger.info(
            f"Getting document previews from Elasticsearch with query: {query}"
        )

        try:
            # Build the core query clause: match across the configured fields
            query_clause: Dict[str, Any] = {
                "multi_match": {
                    "query": query,
                    "fields": self.search_fields,
                    "type": "best_fields",
                    "tie_breaker": 0.3,
                }
            }

            # Add filter if provided
            if self.filter_query:
                query_clause = {
                    "bool": {
                        "must": query_clause,
                        "filter": self.filter_query,
                    }
                }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=self._build_search_body(query_clause),
            )

            # Convert raw hits into preview dictionaries
            previews = self._process_es_response(response)

            logger.info(
                f"Found {len(previews)} preview results from Elasticsearch"
            )
            return previews

        except Exception as e:
            logger.exception(f"Error getting Elasticsearch previews: {e!s}")
            return []

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Elasticsearch documents.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content; snippet-only mode short-circuits
        if getattr(search_config, "SEARCH_SNIPPETS_ONLY", False):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant Elasticsearch documents")

        results = []
        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the document ID
            doc_id = item.get("id")
            if not doc_id:
                # Skip items without ID
                logger.warning(f"Skipping item without ID: {item}")
                results.append(result)
                continue

            try:
                # Fetch the full document by ID
                doc_response = self.client.get(
                    index=self.index_name,
                    id=doc_id,
                )

                # Get the source document
                source = doc_response.get("_source", {})

                # Add full content to the result; fall back to the snippet
                result["content"] = source.get(
                    "content", result.get("snippet", "")
                )
                result["full_content"] = source.get("content", "")

                # Copy remaining metadata from the source without clobbering
                # fields already present in the preview
                for key, value in source.items():
                    if key not in result and key not in ["content"]:
                        result[key] = value

            except Exception as e:
                logger.exception(
                    f"Error fetching full content for document {doc_id}: {e!s}"
                )
                # Keep the preview data if we can't get the full content

            results.append(result)

        return results

    def search_by_query_string(self, query_string: str) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch Query String syntax.

        Args:
            query_string: The query in Elasticsearch Query String syntax

        Returns:
            List of search results (empty on any search error)
        """
        try:
            # Build the search body around a query_string clause
            search_body = self._build_search_body(
                {
                    "query_string": {
                        "query": query_string,
                        "fields": self.search_fields,
                    }
                }
            )

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_body,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception as e:
            logger.exception(f"Error in query_string search: {e!s}")
            return []

    def search_by_dsl(self, query_dsl: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch DSL (Query Domain Specific Language).

        Args:
            query_dsl: The query in Elasticsearch DSL format

        Returns:
            List of search results (empty on any search error)
        """
        try:
            # Execute the search with the provided DSL verbatim
            response = self.client.search(
                index=self.index_name,
                body=query_dsl,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception as e:
            logger.exception(f"Error in DSL search: {e!s}")
            return []

    def _process_es_response(
        self, response: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Process Elasticsearch response into preview dictionaries.

        Args:
            response: Elasticsearch response dictionary

        Returns:
            List of preview dictionaries
        """
        hits = response.get("hits", {}).get("hits", [])

        # Format results as previews
        previews = []
        for hit in hits:
            source = hit.get("_source", {})
            highlight = hit.get("highlight", {})

            # Extract highlighted snippets or fall back to original content
            snippet = ""
            for field in self.highlight_fields:
                if highlight.get(field):
                    # Join all highlights for this field
                    field_snippets = " ... ".join(highlight[field])
                    snippet += field_snippets + " "

            # If no highlights, use a portion of the content
            if not snippet and "content" in source:
                content = source.get("content", "")
                snippet = (
                    content[:250] + "..." if len(content) > 250 else content
                )

            # Create preview object; synthesize a link when the doc has no URL
            preview = {
                "id": hit.get("_id", ""),
                "title": source.get("title", "Untitled Document"),
                "link": source.get("url", "")
                or f"elasticsearch://{self.index_name}/{hit.get('_id', '')}",
                "snippet": snippet.strip(),
                "score": hit.get("_score", 0),
                "_index": hit.get("_index", self.index_name),
            }

            previews.append(preview)

        return previews