Coverage for src / local_deep_research / web_search_engines / engines / search_engine_elasticsearch.py: 98%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1from loguru import logger 

2from typing import Any, Dict, List, Optional 

3 

4from elasticsearch import Elasticsearch 

5from langchain_core.language_models import BaseLLM 

6 

7from ...config import search_config 

8from ...constants import SNIPPET_LENGTH_SHORT 

9from ..search_engine_base import BaseSearchEngine 

10 

11 

class ElasticsearchSearchEngine(BaseSearchEngine):
    """Elasticsearch search engine implementation with two-phase approach"""

    is_local = True
    is_lexical = True
    needs_llm_relevance_filter = True

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        index_name: str = "documents",
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
        max_results: int = 10,
        highlight_fields: Optional[List[str]] = None,
        search_fields: Optional[List[str]] = None,
        filter_query: Optional[Dict[str, Any]] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the Elasticsearch search engine.

        Args:
            hosts: List of Elasticsearch hosts (defaults to
                ["http://localhost:9200"])
            index_name: Name of the index to search
            username: Optional username for authentication
            password: Optional password for authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID
            max_results: Maximum number of search results
            highlight_fields: Fields to highlight in search results
                (defaults to ["content", "title"])
            search_fields: Fields to search in
                (defaults to ["content", "title"])
            filter_query: Optional filter query in Elasticsearch DSL format
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            settings_snapshot: Optional settings snapshot forwarded to the base engine

        Raises:
            ConnectionError: If the Elasticsearch cluster cannot be reached.
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
        )

        self.index_name = index_name
        # Defaults are applied via _ensure_list rather than as mutable default
        # arguments, which would be shared across all instances of the class.
        self.highlight_fields = self._ensure_list(
            highlight_fields, default=["content", "title"]
        )
        self.search_fields = self._ensure_list(
            search_fields, default=["content", "title"]
        )
        self.filter_query = filter_query or {}

        # Normalize hosts – may arrive as a JSON-encoded string from settings
        hosts = self._ensure_list(hosts, default=["http://localhost:9200"])

        # Build client arguments from whichever authentication options were given.
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify the connection eagerly so a misconfiguration surfaces at
        # construction time instead of on the first search.
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception("Failed to connect to Elasticsearch")
            # Chain the original exception so the root cause is preserved.
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def close(self) -> None:
        """Close the Elasticsearch client and its connection pool."""
        from ...utilities.resource_utils import safe_close

        safe_close(self.client, "Elasticsearch client")
        super().close()

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Elasticsearch documents.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries (empty on any search error).
        """
        logger.info(
            f"Getting document previews from Elasticsearch with query: {query}"
        )

        try:
            # Build the search query
            search_query = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.search_fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "highlight": {
                    "fields": {field: {} for field in self.highlight_fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
                "size": self.max_results,
            }

            # Wrap the relevance query in a bool query when a filter is configured.
            if self.filter_query:
                search_query["query"] = {
                    "bool": {
                        "must": search_query["query"],
                        "filter": self.filter_query,
                    }
                }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Reuse the shared hit-formatting helper instead of duplicating
            # the preview-building loop inline.
            previews = self._process_es_response(response)

            logger.info(
                f"Found {len(previews)} preview results from Elasticsearch"
            )
            return previews

        except Exception:
            logger.exception("Error getting Elasticsearch previews")
            return []

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Elasticsearch documents.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if getattr(search_config, "SEARCH_SNIPPETS_ONLY", False):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant Elasticsearch documents")

        results = []
        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the document ID
            doc_id = item.get("id")
            if not doc_id:
                # Skip items without ID
                logger.warning(f"Skipping item without ID: {item}")
                results.append(result)
                continue

            try:
                # Fetch the full document
                doc_response = self.client.get(
                    index=self.index_name,
                    id=doc_id,
                )

                # Get the source document
                source = doc_response.get("_source", {})

                # Add full content to the result; fall back to the preview
                # snippet if the source has no "content" field.
                result["content"] = source.get(
                    "content", result.get("snippet", "")
                )
                result["full_content"] = source.get("content", "")

                # Copy remaining source fields as metadata, without
                # overwriting anything already present on the result.
                for key, value in source.items():
                    if key not in result and key not in ["content"]:
                        result[key] = value

            except Exception:
                logger.exception(
                    f"Error fetching full content for document {doc_id}"
                )
                # Keep the preview data if we can't get the full content

            results.append(result)

        return results

    def search_by_query_string(self, query_string: str) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch Query String syntax.

        Args:
            query_string: The query in Elasticsearch Query String syntax

        Returns:
            List of search results (empty on any search error).
        """
        try:
            # Build the search query
            search_query = {
                "query": {
                    "query_string": {
                        "query": query_string,
                        "fields": self.search_fields,
                    }
                },
                "highlight": {
                    "fields": {field: {} for field in self.highlight_fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
                "size": self.max_results,
            }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception:
            logger.exception("Error in query_string search")
            return []

    def search_by_dsl(self, query_dsl: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch DSL (Query Domain Specific Language).

        Args:
            query_dsl: The query in Elasticsearch DSL format

        Returns:
            List of search results (empty on any search error).
        """
        try:
            # Execute the search with the provided DSL
            response = self.client.search(
                index=self.index_name,
                body=query_dsl,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception:
            logger.exception("Error in DSL search")
            return []

    def _process_es_response(self, response: Any) -> List[Dict[str, Any]]:
        """
        Process Elasticsearch response into preview dictionaries.

        Args:
            response: Elasticsearch response dictionary

        Returns:
            List of preview dictionaries
        """
        hits = response.get("hits", {}).get("hits", [])

        # Format results as previews
        previews = []
        for hit in hits:
            source = hit.get("_source", {})
            highlight = hit.get("highlight", {})

            # Extract highlighted snippets or fall back to original content
            snippet = ""
            for field in self.highlight_fields:
                if highlight.get(field):
                    # Join all highlights for this field
                    field_snippets = " ... ".join(highlight[field])
                    snippet += field_snippets + " "

            # If no highlights, use a portion of the content
            if not snippet and "content" in source:
                content = source.get("content", "")
                snippet = (
                    content[:SNIPPET_LENGTH_SHORT] + "..."
                    if len(content) > SNIPPET_LENGTH_SHORT
                    else content
                )

            # Create preview object; synthesize an elasticsearch:// link
            # when the document has no URL of its own.
            preview = {
                "id": hit.get("_id", ""),
                "title": source.get("title", "Untitled Document"),
                "link": source.get("url", "")
                or f"elasticsearch://{self.index_name}/{hit.get('_id', '')}",
                "snippet": snippet.strip(),
                "score": hit.get("_score", 0),
                "_index": hit.get("_index", self.index_name),
            }

            previews.append(preview)

        return previews