Coverage for src / local_deep_research / utilities / es_utils.py: 96%
126 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Elasticsearch utilities for indexing and managing documents.
3"""
5from loguru import logger
6from pathlib import Path
7from typing import Any, Dict, List, Optional
9from elasticsearch import Elasticsearch
10from elasticsearch.helpers import bulk
class ElasticsearchManager:
    """
    Utility class for managing Elasticsearch indices and documents.

    This class provides methods for creating indices, indexing documents
    (individually, in bulk, or from files and directories), and searching.
    """

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
    ):
        """
        Initialize the Elasticsearch manager and verify the connection.

        Args:
            hosts: List of Elasticsearch hosts; defaults to
                ["http://localhost:9200"] when not provided.
            username: Optional username for basic authentication.
            password: Optional password for basic authentication.
            api_key: Optional API key for authentication.
            cloud_id: Optional Elastic Cloud ID.

        Raises:
            ConnectionError: If the cluster cannot be reached.
        """
        hosts = hosts or ["http://localhost:9200"]

        # Collect client keyword arguments from whichever credentials
        # were supplied.
        es_args: Dict[str, Any] = {}

        # Basic authentication requires both username and password.
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud deployments
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify the connection eagerly so misconfiguration fails fast.
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception("Failed to connect to Elasticsearch")
            # Chain the original exception so the root cause is preserved.
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def close(self) -> None:
        """Close the Elasticsearch client and its connection pool."""
        # Imported lazily to avoid a circular import at module load time.
        from .resource_utils import safe_close

        safe_close(self.client, "Elasticsearch client")

    def create_index(
        self,
        index_name: str,
        mappings: Optional[Dict[str, Any]] = None,
        settings: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create an Elasticsearch index with optional mappings and settings.

        Creation is idempotent: if the index already exists, nothing is
        changed and the call still reports success.

        Args:
            index_name: Name of the index to create.
            mappings: Optional mappings for the index fields.
            settings: Optional settings for the index.

        Returns:
            bool: True if the index exists after the call, False on error.
        """
        try:
            # Check if index already exists
            if self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' already exists - skipping creation"
                )
                return True

            # Default mappings for better text search if none provided
            if mappings is None:
                mappings = {
                    "properties": {
                        "title": {
                            "type": "text",
                            "analyzer": "standard",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256,
                                }
                            },
                        },
                        "content": {"type": "text", "analyzer": "standard"},
                        "url": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "timestamp": {"type": "date"},
                        "metadata": {"type": "object", "enabled": True},
                    }
                }

            # Default settings if none provided: single shard, no replicas
            # (suitable for a small or local deployment).
            if settings is None:
                settings = {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    "analysis": {
                        "analyzer": {"standard": {"type": "standard"}}
                    },
                }

            # Create the index with mappings and settings
            create_response = self.client.indices.create(
                index=index_name,
                mappings=mappings,
                settings=settings,
            )

            logger.info(f"Created index '{index_name}': {create_response}")
            return True

        except Exception:
            logger.exception(f"Error creating index '{index_name}'")
            return False

    def delete_index(self, index_name: str) -> bool:
        """
        Delete an Elasticsearch index.

        Deletion is idempotent: a missing index is treated as success.

        Args:
            index_name: Name of the index to delete.

        Returns:
            bool: True if the index is absent after the call, False on error.
        """
        try:
            # Check if index exists
            if not self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' does not exist - skipping deletion"
                )
                return True

            # Delete the index
            delete_response = self.client.indices.delete(index=index_name)
            logger.info(f"Deleted index '{index_name}': {delete_response}")
            return True

        except Exception:
            logger.exception(f"Error deleting index '{index_name}'")
            return False

    def index_document(
        self,
        index_name: str,
        document: Dict[str, Any],
        document_id: Optional[str] = None,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a single document in Elasticsearch.

        Args:
            index_name: Name of the index to add the document to.
            document: The document to index.
            document_id: Optional document ID (generated by Elasticsearch
                if not provided).
            refresh: Whether to refresh the index after indexing so the
                document is immediately searchable.

        Returns:
            str: Document ID if successful, None otherwise.
        """
        try:
            # Index the document
            response = self.client.index(
                index=index_name,
                document=document,
                id=document_id,
                refresh=refresh,
            )

            logger.info(
                f"Indexed document in '{index_name}' with ID: {response['_id']}"
            )
            return response["_id"]

        except Exception:
            logger.exception(f"Error indexing document in '{index_name}'")
            return None

    def bulk_index_documents(
        self,
        index_name: str,
        documents: List[Dict[str, Any]],
        id_field: Optional[str] = None,
        refresh: bool = False,
    ) -> int:
        """
        Bulk index multiple documents in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to.
            documents: List of documents to index.
            id_field: Optional field in each document to use as its ID;
                documents missing the field get auto-generated IDs.
            refresh: Whether to refresh the index after indexing.

        Returns:
            int: Number of successfully indexed documents (0 on error).
        """
        try:
            # Prepare one bulk action per document.
            actions = []
            for doc in documents:
                action: Dict[str, Any] = {
                    "_index": index_name,
                    "_source": doc,
                }

                # Use the specified field as the document ID if present.
                if id_field and id_field in doc:
                    action["_id"] = doc[id_field]

                actions.append(action)

            # Execute the bulk indexing; with stats_only=True, bulk()
            # returns (success_count, failure_count).
            success, failed = bulk(
                self.client,
                actions,
                refresh=refresh,
                stats_only=True,
            )

            logger.info(
                f"Bulk indexed {success} documents in '{index_name}', failed: {failed}"
            )
            return success

        except Exception:
            logger.exception(f"Error bulk indexing documents in '{index_name}'")
            return 0

    def index_file(
        self,
        index_name: str,
        file_path: str,
        content_field: str = "content",
        title_field: Optional[str] = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a file in Elasticsearch, extracting text content and metadata.

        Args:
            index_name: Name of the index to add the document to.
            file_path: Path to the file to index.
            content_field: Field name to store the file content.
            title_field: Field name to store the title (the filename);
                pass None or "" to omit the title field.
            extract_metadata: Whether to extract file metadata.
            refresh: Whether to refresh the index after indexing.

        Returns:
            str: Document ID if successful, None otherwise.
        """
        try:
            # Imported lazily because the loader (and the 'unstructured'
            # package it depends on) is an optional dependency.
            from langchain_community.document_loaders import (
                UnstructuredFileLoader,
            )

            # Extract file content and metadata
            loader = UnstructuredFileLoader(file_path)
            documents = loader.load()

            # Combine the content of all loaded document chunks.
            content = "\n\n".join(doc.page_content for doc in documents)

            # Use the filename as the title.
            filename = Path(file_path).name
            title = filename

            # Prepare the document
            document: Dict[str, Any] = {
                content_field: content,
            }

            # Add title if requested
            if title_field:
                document[title_field] = title

            # Add metadata if requested
            if extract_metadata and documents:
                # Include metadata from the first document chunk.
                document["metadata"] = documents[0].metadata

                # Add file-specific metadata
                document["source"] = file_path
                document["file_extension"] = Path(filename).suffix.lstrip(".")
                document["filename"] = filename

            # Index the document
            return self.index_document(index_name, document, refresh=refresh)

        except ImportError:
            logger.error(
                "UnstructuredFileLoader not available. Please install the 'unstructured' package."
            )
            return None
        except Exception:
            logger.exception(f"Error indexing file '{file_path}'")
            return None

    def index_directory(
        self,
        index_name: str,
        directory_path: str,
        file_patterns: Optional[List[str]] = None,
        content_field: str = "content",
        title_field: str = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> int:
        """
        Index all matching files in a directory in Elasticsearch.

        Note: patterns are matched against the directory's immediate
        contents (non-recursive glob).

        Args:
            index_name: Name of the index to add the documents to.
            directory_path: Path to the directory containing files to index.
            file_patterns: List of glob patterns to match; defaults to
                ["*.txt", "*.pdf", "*.docx", "*.md"].
            content_field: Field name to store the file content.
            title_field: Field name to store the file title.
            extract_metadata: Whether to extract file metadata.
            refresh: Whether to refresh the index after indexing.

        Returns:
            int: Number of successfully indexed files (0 on error).
        """
        file_patterns = file_patterns or ["*.txt", "*.pdf", "*.docx", "*.md"]
        try:
            # Find all matching files
            all_files = []
            directory = Path(directory_path)
            for pattern in file_patterns:
                matching_files = list(directory.glob(pattern))
                all_files.extend([str(f) for f in matching_files])

            logger.info(
                f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}"
            )

            # Index each file, counting only successful indexings.
            successful_count = 0
            for file_path in all_files:
                logger.info(f"Indexing file: {file_path}")
                doc_id = self.index_file(
                    index_name=index_name,
                    file_path=file_path,
                    content_field=content_field,
                    title_field=title_field,
                    extract_metadata=extract_metadata,
                    refresh=refresh,
                )

                if doc_id:
                    successful_count += 1

            logger.info(
                f"Successfully indexed {successful_count} files out of {len(all_files)}"
            )
            return successful_count

        except Exception:
            logger.exception(f"Error indexing directory '{directory_path}'")
            return 0

    def search(
        self,
        index_name: str,
        query: str,
        fields: Optional[List[str]] = None,
        size: int = 10,
        highlight: bool = True,
    ) -> Dict[str, Any]:
        """
        Search for documents in Elasticsearch.

        Args:
            index_name: Name of the index to search.
            query: Search query string.
            fields: Fields to search in; defaults to ["content", "title"].
            size: Maximum number of results to return.
            highlight: Whether to include highlighted excerpts in results.

        Returns:
            Dict: Elasticsearch search response, or {"error": ...} on failure.
        """
        # Avoid a mutable default argument (previously a shared list
        # literal in the signature); the effective default is unchanged.
        if fields is None:
            fields = ["content", "title"]

        try:
            search_query: Dict[str, Any] = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "size": size,
            }

            # Add highlighting if requested
            if highlight:
                search_query["highlight"] = {
                    "fields": {field: {} for field in fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                }

            # Execute the search
            return self.client.search(
                index=index_name,
                body=search_query,
            )

        except Exception:
            logger.exception(f"Error searching index '{index_name}'")
            return {"error": "Search failed"}