Coverage for src/local_deep_research/web_search_engines/engines/search_engine_elasticsearch.py: 75%
114 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
from loguru import logger
from typing import Any, Dict, List, Optional

from elasticsearch import Elasticsearch
from langchain_core.language_models import BaseLLM

from ...config import search_config
from ..search_engine_base import BaseSearchEngine


class ElasticsearchSearchEngine(BaseSearchEngine):
    """Elasticsearch search engine implementation with two-phase approach"""

    def __init__(
        self,
        hosts: List[str] = ["http://localhost:9200"],
        index_name: str = "documents",
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
        max_results: int = 10,
        highlight_fields: List[str] = ["content", "title"],
        search_fields: List[str] = ["content", "title"],
        filter_query: Optional[Dict[str, Any]] = None,
        llm: Optional[BaseLLM] = None,
        max_filtered_results: Optional[int] = None,
    ):
        """
        Initialize the Elasticsearch search engine.

        Args:
            hosts: List of Elasticsearch hosts
            index_name: Name of the index to search
            username: Optional username for authentication
            password: Optional password for authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID
            max_results: Maximum number of search results
            highlight_fields: Fields to highlight in search results
            search_fields: Fields to search in
            filter_query: Optional filter query in Elasticsearch DSL format
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
        """
        # Initialize the BaseSearchEngine with LLM, max_filtered_results, and max_results
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
        )

        self.index_name = index_name
        self.highlight_fields = highlight_fields
        self.search_fields = search_fields
        self.filter_query = filter_query or {}

        # Initialize the Elasticsearch client
        es_args = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify connection
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception(f"Failed to connect to Elasticsearch: {e!s}")
            raise ConnectionError(f"Could not connect to Elasticsearch: {e!s}")
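
    # Usage sketch (illustrative, not part of the original module): with the
    # defaults above, ElasticsearchSearchEngine(index_name="documents") connects
    # to a local unauthenticated cluster at http://localhost:9200; username and
    # password, api_key, or cloud_id can be supplied instead for secured or
    # hosted deployments. A fuller example follows the class definition below.
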
    def _get_previews(self, query: str) -> List[Dict[str, Any]]:
        """
        Get preview information for Elasticsearch documents.

        Args:
            query: The search query

        Returns:
            List of preview dictionaries
        """
        logger.info(
            f"Getting document previews from Elasticsearch with query: {query}"
        )

        try:
            # Build the search query
            search_query = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": self.search_fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "highlight": {
                    "fields": {field: {} for field in self.highlight_fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
                "size": self.max_results,
            }

            # Add filter if provided
            if self.filter_query:
                search_query["query"] = {
                    "bool": {
                        "must": search_query["query"],
                        "filter": self.filter_query,
                    }
                }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Process the search results
            hits = response.get("hits", {}).get("hits", [])

            # Format results as previews with basic information
            previews = []
            for hit in hits:
                source = hit.get("_source", {})
                highlight = hit.get("highlight", {})

                # Extract highlighted snippets or fall back to original content
                snippet = ""
                for field in self.highlight_fields:
                    if highlight.get(field):
                        # Join all highlights for this field
                        field_snippets = " ... ".join(highlight[field])
                        snippet += field_snippets + " "

                # If no highlights, use a portion of the content
                if not snippet and "content" in source:
                    content = source.get("content", "")
                    snippet = (
                        content[:250] + "..." if len(content) > 250 else content
                    )

                # Create preview object
                preview = {
                    "id": hit.get("_id", ""),
                    "title": source.get("title", "Untitled Document"),
                    "link": source.get("url", "")
                    or f"elasticsearch://{self.index_name}/{hit.get('_id', '')}",
                    "snippet": snippet.strip(),
                    "score": hit.get("_score", 0),
                    "_index": hit.get("_index", self.index_name),
                }

                previews.append(preview)

            logger.info(
                f"Found {len(previews)} preview results from Elasticsearch"
            )
            return previews

        except Exception as e:
            logger.exception(f"Error getting Elasticsearch previews: {e!s}")
            return []

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for the relevant Elasticsearch documents.

        Args:
            relevant_items: List of relevant preview dictionaries

        Returns:
            List of result dictionaries with full content
        """
        # Check if we should get full content
        if (  # coverage: branch never taken (condition was never true in tests)
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        logger.info("Getting full content for relevant Elasticsearch documents")

        results = []
        for item in relevant_items:
            # Start with the preview data
            result = item.copy()

            # Get the document ID
            doc_id = item.get("id")
            if not doc_id:  # coverage: branch never taken in tests
                # Skip items without ID
                logger.warning(f"Skipping item without ID: {item}")
                results.append(result)
                continue

            try:
                # Fetch the full document
                doc_response = self.client.get(
                    index=self.index_name,
                    id=doc_id,
                )

                # Get the source document
                source = doc_response.get("_source", {})

                # Add full content to the result
                result["content"] = source.get(
                    "content", result.get("snippet", "")
                )
                result["full_content"] = source.get("content", "")

                # Add metadata from source
                for key, value in source.items():
                    if key not in result and key not in ["content"]:
                        result[key] = value

            except Exception as e:
                logger.exception(
                    f"Error fetching full content for document {doc_id}: {e!s}"
                )
                # Keep the preview data if we can't get the full content

            results.append(result)

        return results

    def search_by_query_string(self, query_string: str) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch Query String syntax.

        Args:
            query_string: The query in Elasticsearch Query String syntax

        Returns:
            List of search results
        """
        try:
            # Build the search query
            search_query = {
                "query": {
                    "query_string": {
                        "query": query_string,
                        "fields": self.search_fields,
                    }
                },
                "highlight": {
                    "fields": {field: {} for field in self.highlight_fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
                "size": self.max_results,
            }

            # Execute the search
            response = self.client.search(
                index=self.index_name,
                body=search_query,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception as e:
            logger.exception(f"Error in query_string search: {e!s}")
            return []
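
    # Example input (illustrative): search_by_query_string accepts Lucene
    # query-string syntax, e.g. 'title:"vector search" AND content:embedding'.
    # The field names in that example are assumptions about the indexed documents.
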
    def search_by_dsl(self, query_dsl: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Perform a search using Elasticsearch DSL (Query Domain Specific Language).

        Args:
            query_dsl: The query in Elasticsearch DSL format

        Returns:
            List of search results
        """
        try:
            # Execute the search with the provided DSL
            response = self.client.search(
                index=self.index_name,
                body=query_dsl,
            )

            # Process and return the results
            previews = self._process_es_response(response)
            return self._get_full_content(previews)

        except Exception as e:
            logger.exception(f"Error in DSL search: {e!s}")
            return []

    def _process_es_response(
        self, response: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Process Elasticsearch response into preview dictionaries.

        Args:
            response: Elasticsearch response dictionary

        Returns:
            List of preview dictionaries
        """
        hits = response.get("hits", {}).get("hits", [])

        # Format results as previews
        previews = []
        for hit in hits:  # coverage: loop body never executed in tests
            source = hit.get("_source", {})
            highlight = hit.get("highlight", {})

            # Extract highlighted snippets or fall back to original content
            snippet = ""
            for field in self.highlight_fields:
                if highlight.get(field):
                    field_snippets = " ... ".join(highlight[field])
                    snippet += field_snippets + " "

            # If no highlights, use a portion of the content
            if not snippet and "content" in source:
                content = source.get("content", "")
                snippet = (
                    content[:250] + "..." if len(content) > 250 else content
                )

            # Create preview object
            preview = {
                "id": hit.get("_id", ""),
                "title": source.get("title", "Untitled Document"),
                "link": source.get("url", "")
                or f"elasticsearch://{self.index_name}/{hit.get('_id', '')}",
                "snippet": snippet.strip(),
                "score": hit.get("_score", 0),
                "_index": hit.get("_index", self.index_name),
            }

            previews.append(preview)

        return previews
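

# Minimal usage sketch (added for illustration; not part of the original source).
# It assumes a reachable local cluster at http://localhost:9200 and an index
# named "documents" whose documents carry "title", "content", and optionally
# "url" fields; adjust hosts, index_name, and field names for real deployments.
# Because of the package-relative imports above, run it as a module, e.g.
# python -m local_deep_research.web_search_engines.engines.search_engine_elasticsearch
if __name__ == "__main__":
    engine = ElasticsearchSearchEngine(
        hosts=["http://localhost:9200"],  # assumed local, unauthenticated cluster
        index_name="documents",  # assumed index name
        max_results=5,
    )

    # Lucene query-string search: previews are built from highlights, then the
    # full document bodies are fetched for the returned hits.
    for result in engine.search_by_query_string("content:elasticsearch"):
        print(result["title"], result["link"], result.get("score"))

    # Raw Elasticsearch DSL is also accepted and goes through the same
    # preview/full-content pipeline.
    engine.search_by_dsl({"query": {"match_all": {}}, "size": 3})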