Coverage for src/local_deep_research/utilities/es_utils.py: 88% (122 statements)
1"""
2Elasticsearch utilities for indexing and managing documents.
3"""
5from loguru import logger
6from pathlib import Path
7from typing import Any, Dict, List, Optional
9from elasticsearch import Elasticsearch
10from elasticsearch.helpers import bulk
13class ElasticsearchManager:
14 """
15 Utility class for managing Elasticsearch indices and documents.
17 This class provides methods for creating indices, indexing documents,
18 and performing other Elasticsearch management tasks.
19 """

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
    ):
        """
        Initialize the Elasticsearch manager.

        Args:
            hosts: List of Elasticsearch hosts (defaults to a local cluster
                at http://localhost:9200)
            username: Optional username for authentication
            password: Optional password for authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID
        """
        # Avoid a mutable default argument; fall back to a local cluster
        if hosts is None:
            hosts = ["http://localhost:9200"]

        # Build the Elasticsearch client arguments
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Connect to Elasticsearch; hosts and cloud_id are mutually
        # exclusive in the client, so pass hosts only when no cloud_id
        # is given
        if cloud_id:
            es_args["cloud_id"] = cloud_id
            self.client = Elasticsearch(**es_args)
        else:
            self.client = Elasticsearch(hosts, **es_args)

        # Verify connection
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception(f"Failed to connect to Elasticsearch: {e!s}")
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e
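
    # Usage sketch (illustrative, not part of the class): connecting to a
    # local cluster with basic auth. The host URL and credentials below are
    # assumptions for demonstration only.
    #
    #   es_manager = ElasticsearchManager(
    #       hosts=["http://localhost:9200"],
    #       username="elastic",
    #       password="changeme",
    #   )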

    def create_index(
        self,
        index_name: str,
        mappings: Optional[Dict[str, Any]] = None,
        settings: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create an Elasticsearch index with optional mappings and settings.

        Args:
            index_name: Name of the index to create
            mappings: Optional mappings for the index fields
            settings: Optional settings for the index

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Check if index already exists
            if self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' already exists - skipping creation"
                )
                return True

            # Default mappings for better text search if none provided
            if mappings is None:
                mappings = {
                    "properties": {
                        "title": {
                            "type": "text",
                            "analyzer": "standard",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256,
                                }
                            },
                        },
                        "content": {"type": "text", "analyzer": "standard"},
                        "url": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "timestamp": {"type": "date"},
                        "metadata": {"type": "object", "enabled": True},
                    }
                }

            # Default settings if none provided
            if settings is None:
                settings = {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    "analysis": {
                        "analyzer": {"standard": {"type": "standard"}}
                    },
                }

            # Create the index with mappings and settings
            create_response = self.client.indices.create(
                index=index_name,
                mappings=mappings,
                settings=settings,
            )

            logger.info(f"Created index '{index_name}': {create_response}")
            return True

        except Exception as e:
            logger.exception(f"Error creating index '{index_name}': {e!s}")
            return False
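
    # Usage sketch (illustrative): creating an index with a custom mapping
    # instead of the defaults above. "research_notes" and the field names
    # are assumptions; es_manager is the instance from the earlier sketch.
    #
    #   es_manager.create_index(
    #       "research_notes",
    #       mappings={
    #           "properties": {
    #               "title": {"type": "text"},
    #               "tags": {"type": "keyword"},
    #           }
    #       },
    #   )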

    def delete_index(self, index_name: str) -> bool:
        """
        Delete an Elasticsearch index.

        Args:
            index_name: Name of the index to delete

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Check if index exists
            if not self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' does not exist - skipping deletion"
                )
                return True

            # Delete the index
            delete_response = self.client.indices.delete(index=index_name)
            logger.info(f"Deleted index '{index_name}': {delete_response}")
            return True

        except Exception as e:
            logger.exception(f"Error deleting index '{index_name}': {e!s}")
            return False
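
    # Usage sketch (illustrative): dropping an index. The name is an
    # assumption. Note that this returns True even when the index does not
    # exist, so it is safe to call repeatedly.
    #
    #   es_manager.delete_index("research_notes")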

    def index_document(
        self,
        index_name: str,
        document: Dict[str, Any],
        document_id: Optional[str] = None,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a single document in Elasticsearch.

        Args:
            index_name: Name of the index to add the document to
            document: The document to index
            document_id: Optional document ID (will be generated if not provided)
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            # Index the document
            response = self.client.index(
                index=index_name,
                document=document,
                id=document_id,
                refresh=refresh,
            )

            logger.info(
                f"Indexed document in '{index_name}' with ID: {response['_id']}"
            )
            return response["_id"]

        except Exception as e:
            logger.exception(
                f"Error indexing document in '{index_name}': {e!s}"
            )
            return None
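
    # Usage sketch (illustrative): indexing a single document with an
    # explicit ID and refresh=True so it is searchable immediately. All
    # values are assumptions.
    #
    #   doc_id = es_manager.index_document(
    #       "research_notes",
    #       {"title": "Note", "content": "Body text", "source": "manual"},
    #       document_id="note-1",
    #       refresh=True,
    #   )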

    def bulk_index_documents(
        self,
        index_name: str,
        documents: List[Dict[str, Any]],
        id_field: Optional[str] = None,
        refresh: bool = False,
    ) -> int:
        """
        Bulk index multiple documents in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            documents: List of documents to index
            id_field: Optional field in the documents to use as the document ID
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed documents
        """
        try:
            # Prepare the bulk actions
            actions = []
            for doc in documents:
                action = {
                    "_index": index_name,
                    "_source": doc,
                }

                # Use the specified field as the document ID if provided
                if id_field and id_field in doc:
                    action["_id"] = doc[id_field]

                actions.append(action)

            # Execute the bulk indexing
            success, failed = bulk(
                self.client,
                actions,
                refresh=refresh,
                stats_only=True,
            )

            logger.info(
                f"Bulk indexed {success} documents in '{index_name}', failed: {failed}"
            )
            return success

        except Exception as e:
            logger.exception(
                f"Error bulk indexing documents in '{index_name}': {e!s}"
            )
            return 0
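
    # Usage sketch (illustrative): bulk indexing with id_field so each
    # document's own "url" value becomes its Elasticsearch _id, which makes
    # re-runs overwrite rather than duplicate. Values are assumptions.
    #
    #   docs = [
    #       {"url": "https://example.com/a", "content": "First page"},
    #       {"url": "https://example.com/b", "content": "Second page"},
    #   ]
    #   indexed_count = es_manager.bulk_index_documents(
    #       "research_notes", docs, id_field="url"
    #   )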

    def index_file(
        self,
        index_name: str,
        file_path: str,
        content_field: str = "content",
        title_field: Optional[str] = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a file in Elasticsearch, extracting text content and metadata.

        Args:
            index_name: Name of the index to add the document to
            file_path: Path to the file to index
            content_field: Field name to store the file content
            title_field: Field name to store the file title (filename if not specified)
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            from langchain_community.document_loaders import (
                UnstructuredFileLoader,
            )

            # Extract file content and metadata
            loader = UnstructuredFileLoader(file_path)
            documents = loader.load()

            # Combine all content from the documents
            content = "\n\n".join([doc.page_content for doc in documents])

            # Get the filename for the title
            filename = Path(file_path).name
            title = filename

            # Prepare the document
            document = {
                content_field: content,
            }

            # Add title if requested
            if title_field:
                document[title_field] = title

            # Add metadata if requested
            if extract_metadata and documents:
                # Include metadata from the first document
                document["metadata"] = documents[0].metadata

                # Add file-specific metadata
                document["source"] = file_path
                document["file_extension"] = Path(filename).suffix.lstrip(".")
                document["filename"] = filename

            # Index the document
            return self.index_document(index_name, document, refresh=refresh)

        except ImportError:
            logger.error(
                "UnstructuredFileLoader not available. Please install the 'unstructured' package."
            )
            return None
        except Exception as e:
            logger.exception(f"Error indexing file '{file_path}': {e!s}")
            return None
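
    # Usage sketch (illustrative): indexing one file. The path is an
    # assumption, and the 'unstructured' package must be installed for
    # UnstructuredFileLoader to be importable.
    #
    #   doc_id = es_manager.index_file("research_notes", "notes/report.pdf")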

    def index_directory(
        self,
        index_name: str,
        directory_path: str,
        file_patterns: Optional[List[str]] = None,
        content_field: str = "content",
        title_field: str = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> int:
        """
        Index all matching files in a directory in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            directory_path: Path to the directory containing files to index
            file_patterns: List of glob patterns to match (defaults to
                *.txt, *.pdf, *.docx, and *.md)
            content_field: Field name to store the file content
            title_field: Field name to store the file title
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed files
        """
        # Avoid a mutable default argument
        if file_patterns is None:
            file_patterns = ["*.txt", "*.pdf", "*.docx", "*.md"]

        try:
            # Find all matching files
            all_files = []
            directory = Path(directory_path)
            for pattern in file_patterns:
                matching_files = list(directory.glob(pattern))
                all_files.extend([str(f) for f in matching_files])

            logger.info(
                f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}"
            )

            # Index each file
            successful_count = 0
            for file_path in all_files:
                logger.info(f"Indexing file: {file_path}")
                doc_id = self.index_file(
                    index_name=index_name,
                    file_path=file_path,
                    content_field=content_field,
                    title_field=title_field,
                    extract_metadata=extract_metadata,
                    refresh=refresh,
                )

                if doc_id:
                    successful_count += 1

            logger.info(
                f"Successfully indexed {successful_count} files out of {len(all_files)}"
            )
            return successful_count

        except Exception as e:
            logger.exception(
                f"Error indexing directory '{directory_path}': {e!s}"
            )
            return 0
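
    # Usage sketch (illustrative): indexing every Markdown file in a
    # directory. Path.glob patterns like "*.md" are non-recursive; use
    # "**/*.md" to include subdirectories. Path and patterns are assumptions.
    #
    #   count = es_manager.index_directory(
    #       "research_notes", "./docs", file_patterns=["*.md"]
    #   )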

    def search(
        self,
        index_name: str,
        query: str,
        fields: Optional[List[str]] = None,
        size: int = 10,
        highlight: bool = True,
    ) -> Dict[str, Any]:
        """
        Search for documents in Elasticsearch.

        Args:
            index_name: Name of the index to search
            query: Search query
            fields: Fields to search in (defaults to content and title)
            size: Maximum number of results to return
            highlight: Whether to include highlighted excerpts in results

        Returns:
            Dict: Elasticsearch search response
        """
        # Avoid a mutable default argument
        if fields is None:
            fields = ["content", "title"]

        try:
            search_query = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "size": size,
            }

            # Add highlighting if requested
            if highlight:
                search_query["highlight"] = {
                    "fields": {field: {} for field in fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                }

            # Execute the search
            response = self.client.search(
                index=index_name,
                body=search_query,
            )

            return response

        except Exception as e:
            logger.exception(f"Error searching index '{index_name}': {e!s}")
            return {"error": "Search failed"}
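
    # Usage sketch (illustrative): running a search and reading hits and
    # highlight fragments from the raw response. Index name and query are
    # assumptions.
    #
    #   results = es_manager.search("research_notes", "deep research")
    #   for hit in results.get("hits", {}).get("hits", []):
    #       print(hit["_id"], hit["_score"], hit["_source"].get("title"))
    #       for fragment in hit.get("highlight", {}).get("content", []):
    #           print("  ...", fragment)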