Coverage for src/local_deep_research/web_search_engines/engines/search_engine_elasticsearch.py: 98%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1from loguru import logger
2from typing import Any, Dict, List, Optional
4from elasticsearch import Elasticsearch
5from langchain_core.language_models import BaseLLM
7from ...config import search_config
8from ...constants import SNIPPET_LENGTH_SHORT
9from ..search_engine_base import BaseSearchEngine
class ElasticsearchSearchEngine(BaseSearchEngine):
    """Elasticsearch search engine implementation with two-phase approach.

    Phase one (``_get_previews``) runs a search and builds lightweight
    previews from highlighted snippets; phase two (``_get_full_content``)
    fetches the complete documents for the items that survived relevance
    filtering.
    """

    # Capability flags read by the surrounding search framework.
    is_local = True
    is_lexical = True
    needs_llm_relevance_filter = True

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        index_name: str = "documents",
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
        max_results: int = 10,
        highlight_fields: Optional[List[str]] = None,
        search_fields: Optional[List[str]] = None,
        filter_query: Optional[Dict[str, Any]] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the Elasticsearch search engine.

        Args:
            hosts: List of Elasticsearch hosts (defaults to
                ``["http://localhost:9200"]``)
            index_name: Name of the index to search
            username: Optional username for basic authentication
            password: Optional password for basic authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID
            max_results: Maximum number of search results
            highlight_fields: Fields to highlight in search results
                (defaults to ``["content", "title"]``)
            search_fields: Fields to search in (defaults to
                ``["content", "title"]``)
            filter_query: Optional filter query in Elasticsearch DSL format
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after
                filtering
            settings_snapshot: Optional snapshot of application settings

        Raises:
            ConnectionError: If the Elasticsearch cluster cannot be reached.
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results,
        # and max_results.
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
        )

        self.index_name = index_name
        # Defaults are applied here via _ensure_list rather than as mutable
        # default arguments; values may also arrive as JSON-encoded strings
        # from settings, which _ensure_list normalizes.
        self.highlight_fields = self._ensure_list(
            highlight_fields, default=["content", "title"]
        )
        self.search_fields = self._ensure_list(
            search_fields, default=["content", "title"]
        )
        self.filter_query = filter_query or {}

        # Normalize hosts – may arrive as a JSON-encoded string from settings
        hosts = self._ensure_list(hosts, default=["http://localhost:9200"])

        # Assemble optional authentication arguments for the client.
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify the connection eagerly so misconfiguration fails fast.
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception("Failed to connect to Elasticsearch")
            # Chain the original exception so the root cause is preserved.
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def close(self) -> None:
        """Close the Elasticsearch client and its connection pool."""
        from ...utilities.resource_utils import safe_close

        safe_close(self.client, "Elasticsearch client")
        super().close()

    def _highlight_clause(self) -> Dict[str, Any]:
        """Build the ``highlight`` section shared by all search queries."""
        return {
            "fields": {field: {} for field in self.highlight_fields},
            "pre_tags": ["<em>"],
            "post_tags": ["</em>"],
        }

    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Elasticsearch documents.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries (empty on any search error)
        """
        logger.info(
            f"Getting document previews from Elasticsearch with query: {query}"
        )

        try:
            # Build the search query
            search_query = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.search_fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "highlight": self._highlight_clause(),
                "size": self.max_results,
            }

            # Add filter if provided, wrapping the text query in a bool query.
            if self.filter_query:
                search_query["query"] = {
                    "bool": {
                        "must": search_query["query"],
                        "filter": self.filter_query,
                    }
                }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Reuse the shared hit-to-preview conversion.
            previews = self._process_es_response(response)

            logger.info(
                f"Found {len(previews)} preview results from Elasticsearch"
            )
            return previews

        except Exception:
            logger.exception("Error getting Elasticsearch previews")
            return []

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Elasticsearch documents.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content; in snippet-only mode the
        # previews are returned untouched.
        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant Elasticsearch documents")

        results = []
        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the document ID
            doc_id = item.get("id")
            if not doc_id:
                # Skip items without ID
                logger.warning(f"Skipping item without ID: {item}")
                results.append(result)
                continue

            try:
                # Fetch the full document
                doc_response = self.client.get(
                    index=self.index_name,
                    id=doc_id,
                )

                # Get the source document
                source = doc_response.get("_source", {})

                # Add full content to the result; fall back to the snippet
                # when the source has no "content" field.
                result["content"] = source.get(
                    "content", result.get("snippet", "")
                )
                result["full_content"] = source.get("content", "")

                # Add metadata from source without clobbering preview keys.
                for key, value in source.items():
                    if key not in result and key != "content":
                        result[key] = value

            except Exception:
                logger.exception(
                    f"Error fetching full content for document {doc_id}"
                )
                # Keep the preview data if we can't get the full content

            results.append(result)

        return results

    def search_by_query_string(self, query_string: str) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch Query String syntax.

        Args:
            query_string: The query in Elasticsearch Query String syntax

        Returns:
            List of search results (empty on any search error)
        """
        try:
            # Build the search query
            search_query = {
                "query": {
                    "query_string": {
                        "query": query_string,
                        "fields": self.search_fields,
                    }
                },
                "highlight": self._highlight_clause(),
                "size": self.max_results,
            }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception:
            logger.exception("Error in query_string search")
            return []

    def search_by_dsl(self, query_dsl: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch DSL (Query Domain Specific Language).

        Args:
            query_dsl: The query in Elasticsearch DSL format

        Returns:
            List of search results (empty on any search error)
        """
        try:
            # Execute the search with the provided DSL
            response = self.client.search(
                index=self.index_name,
                body=query_dsl,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception:
            logger.exception("Error in DSL search")
            return []

    def _process_es_response(self, response: Any) -> List[Dict[str, Any]]:
        """
        Process Elasticsearch response into preview dictionaries.

        Args:
            response: Elasticsearch response dictionary

        Returns:
            List of preview dictionaries
        """
        hits = response.get("hits", {}).get("hits", [])

        # Format results as previews
        previews = []
        for hit in hits:
            source = hit.get("_source", {})
            highlight = hit.get("highlight", {})

            # Extract highlighted snippets or fall back to original content
            snippet = ""
            for field in self.highlight_fields:
                if highlight.get(field):
                    # Join all highlights for this field
                    field_snippets = " ... ".join(highlight[field])
                    snippet += field_snippets + " "

            # If no highlights, use a portion of the content
            if not snippet and "content" in source:
                content = source.get("content", "")
                snippet = (
                    content[:SNIPPET_LENGTH_SHORT] + "..."
                    if len(content) > SNIPPET_LENGTH_SHORT
                    else content
                )

            # Create preview object; synthesize a pseudo-URL when the
            # document has no "url" field.
            preview = {
                "id": hit.get("_id", ""),
                "title": source.get("title", "Untitled Document"),
                "link": source.get("url", "")
                or f"elasticsearch://{self.index_name}/{hit.get('_id', '')}",
                "snippet": snippet.strip(),
                "score": hit.get("_score", 0),
                "_index": hit.get("_index", self.index_name),
            }

            previews.append(preview)

        return previews