Coverage for src / local_deep_research / utilities / es_utils.py: 88%
122 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1"""
2Elasticsearch utilities for indexing and managing documents.
3"""
5from loguru import logger
6from pathlib import Path
7from typing import Any, Dict, List, Optional
9from elasticsearch import Elasticsearch
10from elasticsearch.helpers import bulk
13class ElasticsearchManager:
14 """
15 Utility class for managing Elasticsearch indices and documents.
17 This class provides methods for creating indices, indexing documents,
18 and performing other Elasticsearch management tasks.
19 """
21 def __init__(
22 self,
23 hosts: List[str] = ["http://localhost:9200"],
24 username: Optional[str] = None,
25 password: Optional[str] = None,
26 api_key: Optional[str] = None,
27 cloud_id: Optional[str] = None,
28 ):
29 """
30 Initialize the Elasticsearch manager.
32 Args:
33 hosts: List of Elasticsearch hosts
34 username: Optional username for authentication
35 password: Optional password for authentication
36 api_key: Optional API key for authentication
37 cloud_id: Optional Elastic Cloud ID
38 """
39 # Initialize the Elasticsearch client
40 es_args = {}
42 # Basic authentication
43 if username and password:
44 es_args["basic_auth"] = (username, password)
46 # API key authentication
47 if api_key:
48 es_args["api_key"] = api_key
50 # Cloud ID for Elastic Cloud
51 if cloud_id:
52 es_args["cloud_id"] = cloud_id
54 # Connect to Elasticsearch
55 self.client = Elasticsearch(hosts, **es_args)
57 # Verify connection
58 try:
59 info = self.client.info()
60 logger.info(
61 f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
62 )
63 logger.info(
64 f"Elasticsearch version: {info.get('version', {}).get('number')}"
65 )
66 except Exception as e:
67 logger.exception("Failed to connect to Elasticsearch")
68 raise ConnectionError(f"Could not connect to Elasticsearch: {e!s}")
70 def create_index(
71 self,
72 index_name: str,
73 mappings: Optional[Dict[str, Any]] = None,
74 settings: Optional[Dict[str, Any]] = None,
75 ) -> bool:
76 """
77 Create an Elasticsearch index with optional mappings and settings.
79 Args:
80 index_name: Name of the index to create
81 mappings: Optional mappings for the index fields
82 settings: Optional settings for the index
84 Returns:
85 bool: True if successful, False otherwise
86 """
87 try:
88 # Check if index already exists
89 if self.client.indices.exists(index=index_name):
90 logger.warning(
91 f"Index '{index_name}' already exists - skipping creation"
92 )
93 return True
95 # Default mappings for better text search if none provided
96 if mappings is None:
97 mappings = {
98 "properties": {
99 "title": {
100 "type": "text",
101 "analyzer": "standard",
102 "fields": {
103 "keyword": {
104 "type": "keyword",
105 "ignore_above": 256,
106 }
107 },
108 },
109 "content": {"type": "text", "analyzer": "standard"},
110 "url": {"type": "keyword"},
111 "source": {"type": "keyword"},
112 "timestamp": {"type": "date"},
113 "metadata": {"type": "object", "enabled": True},
114 }
115 }
117 # Default settings if none provided
118 if settings is None:
119 settings = {
120 "number_of_shards": 1,
121 "number_of_replicas": 0,
122 "analysis": {
123 "analyzer": {"standard": {"type": "standard"}}
124 },
125 }
127 # Create the index with mappings and settings
128 create_response = self.client.indices.create(
129 index=index_name,
130 mappings=mappings,
131 settings=settings,
132 )
134 logger.info(f"Created index '{index_name}': {create_response}")
135 return True
137 except Exception:
138 logger.exception(f"Error creating index '{index_name}'")
139 return False
141 def delete_index(self, index_name: str) -> bool:
142 """
143 Delete an Elasticsearch index.
145 Args:
146 index_name: Name of the index to delete
148 Returns:
149 bool: True if successful, False otherwise
150 """
151 try:
152 # Check if index exists
153 if not self.client.indices.exists(index=index_name):
154 logger.warning(
155 f"Index '{index_name}' does not exist - skipping deletion"
156 )
157 return True
159 # Delete the index
160 delete_response = self.client.indices.delete(index=index_name)
161 logger.info(f"Deleted index '{index_name}': {delete_response}")
162 return True
164 except Exception:
165 logger.exception(f"Error deleting index '{index_name}'")
166 return False
168 def index_document(
169 self,
170 index_name: str,
171 document: Dict[str, Any],
172 document_id: Optional[str] = None,
173 refresh: bool = False,
174 ) -> Optional[str]:
175 """
176 Index a single document in Elasticsearch.
178 Args:
179 index_name: Name of the index to add the document to
180 document: The document to index
181 document_id: Optional document ID (will be generated if not provided)
182 refresh: Whether to refresh the index after indexing
184 Returns:
185 str: Document ID if successful, None otherwise
186 """
187 try:
188 # Index the document
189 response = self.client.index(
190 index=index_name,
191 document=document,
192 id=document_id,
193 refresh=refresh,
194 )
196 logger.info(
197 f"Indexed document in '{index_name}' with ID: {response['_id']}"
198 )
199 return response["_id"]
201 except Exception:
202 logger.exception(f"Error indexing document in '{index_name}'")
203 return None
205 def bulk_index_documents(
206 self,
207 index_name: str,
208 documents: List[Dict[str, Any]],
209 id_field: Optional[str] = None,
210 refresh: bool = False,
211 ) -> int:
212 """
213 Bulk index multiple documents in Elasticsearch.
215 Args:
216 index_name: Name of the index to add the documents to
217 documents: List of documents to index
218 id_field: Optional field in the documents to use as the document ID
219 refresh: Whether to refresh the index after indexing
221 Returns:
222 int: Number of successfully indexed documents
223 """
224 try:
225 # Prepare the bulk actions
226 actions = []
227 for doc in documents:
228 action = {
229 "_index": index_name,
230 "_source": doc,
231 }
233 # Use the specified field as the document ID if provided
234 if id_field and id_field in doc:
235 action["_id"] = doc[id_field]
237 actions.append(action)
239 # Execute the bulk indexing
240 success, failed = bulk(
241 self.client,
242 actions,
243 refresh=refresh,
244 stats_only=True,
245 )
247 logger.info(
248 f"Bulk indexed {success} documents in '{index_name}', failed: {failed}"
249 )
250 return success
252 except Exception:
253 logger.exception(f"Error bulk indexing documents in '{index_name}'")
254 return 0
256 def index_file(
257 self,
258 index_name: str,
259 file_path: str,
260 content_field: str = "content",
261 title_field: Optional[str] = "title",
262 extract_metadata: bool = True,
263 refresh: bool = False,
264 ) -> Optional[str]:
265 """
266 Index a file in Elasticsearch, extracting text content and metadata.
268 Args:
269 index_name: Name of the index to add the document to
270 file_path: Path to the file to index
271 content_field: Field name to store the file content
272 title_field: Field name to store the file title (filename if not specified)
273 extract_metadata: Whether to extract file metadata
274 refresh: Whether to refresh the index after indexing
276 Returns:
277 str: Document ID if successful, None otherwise
278 """
279 try:
280 from langchain_community.document_loaders import (
281 UnstructuredFileLoader,
282 )
284 # Extract file content and metadata
285 loader = UnstructuredFileLoader(file_path)
286 documents = loader.load()
288 # Combine all content from the documents
289 content = "\n\n".join([doc.page_content for doc in documents])
291 # Get the filename for the title
292 filename = Path(file_path).name
293 title = filename
295 # Prepare the document
296 document = {
297 content_field: content,
298 }
300 # Add title if requested
301 if title_field:
302 document[title_field] = title
304 # Add metadata if requested
305 if extract_metadata and documents:
306 # Include metadata from the first document
307 document["metadata"] = documents[0].metadata
309 # Add file-specific metadata
310 document["source"] = file_path
311 document["file_extension"] = Path(filename).suffix.lstrip(".")
312 document["filename"] = filename
314 # Index the document
315 return self.index_document(index_name, document, refresh=refresh)
317 except ImportError:
318 logger.error(
319 "UnstructuredFileLoader not available. Please install the 'unstructured' package."
320 )
321 return None
322 except Exception:
323 logger.exception(f"Error indexing file '{file_path}'")
324 return None
326 def index_directory(
327 self,
328 index_name: str,
329 directory_path: str,
330 file_patterns: List[str] = ["*.txt", "*.pdf", "*.docx", "*.md"],
331 content_field: str = "content",
332 title_field: str = "title",
333 extract_metadata: bool = True,
334 refresh: bool = False,
335 ) -> int:
336 """
337 Index all matching files in a directory in Elasticsearch.
339 Args:
340 index_name: Name of the index to add the documents to
341 directory_path: Path to the directory containing files to index
342 file_patterns: List of file patterns to match (glob patterns)
343 content_field: Field name to store the file content
344 title_field: Field name to store the file title
345 extract_metadata: Whether to extract file metadata
346 refresh: Whether to refresh the index after indexing
348 Returns:
349 int: Number of successfully indexed files
350 """
351 try:
352 # Find all matching files
353 all_files = []
354 directory = Path(directory_path)
355 for pattern in file_patterns:
356 matching_files = list(directory.glob(pattern))
357 all_files.extend([str(f) for f in matching_files])
359 logger.info(
360 f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}"
361 )
363 # Index each file
364 successful_count = 0
365 for file_path in all_files:
366 logger.info(f"Indexing file: {file_path}")
367 doc_id = self.index_file(
368 index_name=index_name,
369 file_path=file_path,
370 content_field=content_field,
371 title_field=title_field,
372 extract_metadata=extract_metadata,
373 refresh=refresh,
374 )
376 if doc_id:
377 successful_count += 1
379 logger.info(
380 f"Successfully indexed {successful_count} files out of {len(all_files)}"
381 )
382 return successful_count
384 except Exception:
385 logger.exception(f"Error indexing directory '{directory_path}'")
386 return 0
388 def search(
389 self,
390 index_name: str,
391 query: str,
392 fields: List[str] = ["content", "title"],
393 size: int = 10,
394 highlight: bool = True,
395 ) -> Dict[str, Any]:
396 """
397 Search for documents in Elasticsearch.
399 Args:
400 index_name: Name of the index to search
401 query: Search query
402 fields: Fields to search in
403 size: Maximum number of results to return
404 highlight: Whether to include highlighted excerpts in results
406 Returns:
407 Dict: Elasticsearch search response
408 """
409 try:
410 search_query = {
411 "query": {
412 "multi_match": {
413 "query": query,
414 "fields": fields,
415 "type": "best_fields",
416 "tie_breaker": 0.3,
417 }
418 },
419 "size": size,
420 }
422 # Add highlighting if requested
423 if highlight:
424 search_query["highlight"] = {
425 "fields": {field: {} for field in fields},
426 "pre_tags": ["<em>"],
427 "post_tags": ["</em>"],
428 }
430 # Execute the search
431 response = self.client.search(
432 index=index_name,
433 body=search_query,
434 )
436 return response
438 except Exception:
439 logger.exception(f"Error searching index '{index_name}'")
440 return {"error": "Search failed"}