Coverage for src / local_deep_research / utilities / es_utils.py: 88%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Elasticsearch utilities for indexing and managing documents. 

3""" 

4 

5from loguru import logger 

6from pathlib import Path 

7from typing import Any, Dict, List, Optional 

8 

9from elasticsearch import Elasticsearch 

10from elasticsearch.helpers import bulk 

11 

12 

class ElasticsearchManager:
    """
    Utility class for managing Elasticsearch indices and documents.

    This class provides methods for creating indices, indexing documents,
    and performing other Elasticsearch management tasks.
    """

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
    ):
        """
        Initialize the Elasticsearch manager and verify connectivity.

        Args:
            hosts: List of Elasticsearch hosts. Defaults to
                ``["http://localhost:9200"]`` when not provided.
            username: Optional username for basic authentication
            password: Optional password for basic authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID

        Raises:
            ConnectionError: If the cluster cannot be reached.
        """
        # None sentinel avoids the shared-mutable-default-argument pitfall.
        if hosts is None:
            hosts = ["http://localhost:9200"]

        # Initialize the Elasticsearch client
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify connection eagerly so a misconfigured cluster fails fast.
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception("Failed to connect to Elasticsearch")
            # Chain the original exception so the root cause stays visible.
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def create_index(
        self,
        index_name: str,
        mappings: Optional[Dict[str, Any]] = None,
        settings: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create an Elasticsearch index with optional mappings and settings.

        Args:
            index_name: Name of the index to create
            mappings: Optional mappings for the index fields
            settings: Optional settings for the index

        Returns:
            bool: True if successful (or the index already exists),
                False otherwise
        """
        try:
            # Check if index already exists
            if self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' already exists - skipping creation"
                )
                return True

            # Default mappings for better text search if none provided
            if mappings is None:
                mappings = {
                    "properties": {
                        "title": {
                            "type": "text",
                            "analyzer": "standard",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256,
                                }
                            },
                        },
                        "content": {"type": "text", "analyzer": "standard"},
                        "url": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "timestamp": {"type": "date"},
                        "metadata": {"type": "object", "enabled": True},
                    }
                }

            # Default settings if none provided
            if settings is None:
                settings = {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    "analysis": {
                        "analyzer": {"standard": {"type": "standard"}}
                    },
                }

            # Create the index with mappings and settings
            create_response = self.client.indices.create(
                index=index_name,
                mappings=mappings,
                settings=settings,
            )

            logger.info(f"Created index '{index_name}': {create_response}")
            return True

        except Exception:
            logger.exception(f"Error creating index '{index_name}'")
            return False

    def delete_index(self, index_name: str) -> bool:
        """
        Delete an Elasticsearch index.

        Args:
            index_name: Name of the index to delete

        Returns:
            bool: True if successful (or the index did not exist),
                False otherwise
        """
        try:
            # Check if index exists
            if not self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' does not exist - skipping deletion"
                )
                return True

            # Delete the index
            delete_response = self.client.indices.delete(index=index_name)
            logger.info(f"Deleted index '{index_name}': {delete_response}")
            return True

        except Exception:
            logger.exception(f"Error deleting index '{index_name}'")
            return False

    def index_document(
        self,
        index_name: str,
        document: Dict[str, Any],
        document_id: Optional[str] = None,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a single document in Elasticsearch.

        Args:
            index_name: Name of the index to add the document to
            document: The document to index
            document_id: Optional document ID (will be generated if not provided)
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            # Index the document
            response = self.client.index(
                index=index_name,
                document=document,
                id=document_id,
                refresh=refresh,
            )

            logger.info(
                f"Indexed document in '{index_name}' with ID: {response['_id']}"
            )
            return response["_id"]

        except Exception:
            logger.exception(f"Error indexing document in '{index_name}'")
            return None

    def bulk_index_documents(
        self,
        index_name: str,
        documents: List[Dict[str, Any]],
        id_field: Optional[str] = None,
        refresh: bool = False,
    ) -> int:
        """
        Bulk index multiple documents in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            documents: List of documents to index
            id_field: Optional field in the documents to use as the document ID
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed documents (0 on error)
        """
        try:
            # Prepare the bulk actions
            actions = []
            for doc in documents:
                action: Dict[str, Any] = {
                    "_index": index_name,
                    "_source": doc,
                }

                # Use the specified field as the document ID if provided
                if id_field and id_field in doc:
                    action["_id"] = doc[id_field]

                actions.append(action)

            # Execute the bulk indexing; stats_only gives (success, failed)
            # counts rather than per-item results.
            success, failed = bulk(
                self.client,
                actions,
                refresh=refresh,
                stats_only=True,
            )

            logger.info(
                f"Bulk indexed {success} documents in '{index_name}', failed: {failed}"
            )
            return success

        except Exception:
            logger.exception(f"Error bulk indexing documents in '{index_name}'")
            return 0

    def index_file(
        self,
        index_name: str,
        file_path: str,
        content_field: str = "content",
        title_field: Optional[str] = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a file in Elasticsearch, extracting text content and metadata.

        Args:
            index_name: Name of the index to add the document to
            file_path: Path to the file to index
            content_field: Field name to store the file content
            title_field: Field name to store the file title (filename if not specified)
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            # Imported lazily: 'unstructured' is an optional dependency.
            from langchain_community.document_loaders import (
                UnstructuredFileLoader,
            )

            # Extract file content and metadata
            loader = UnstructuredFileLoader(file_path)
            documents = loader.load()

            # Combine all content from the documents
            content = "\n\n".join([doc.page_content for doc in documents])

            # Get the filename for the title
            filename = Path(file_path).name
            title = filename

            # Prepare the document
            document: Dict[str, Any] = {
                content_field: content,
            }

            # Add title if requested
            if title_field:
                document[title_field] = title

            # Add metadata if requested
            if extract_metadata and documents:
                # Include metadata from the first document
                document["metadata"] = documents[0].metadata

                # Add file-specific metadata
                document["source"] = file_path
                document["file_extension"] = Path(filename).suffix.lstrip(".")
                document["filename"] = filename

            # Index the document
            return self.index_document(index_name, document, refresh=refresh)

        except ImportError:
            logger.error(
                "UnstructuredFileLoader not available. Please install the 'unstructured' package."
            )
            return None
        except Exception:
            logger.exception(f"Error indexing file '{file_path}'")
            return None

    def index_directory(
        self,
        index_name: str,
        directory_path: str,
        file_patterns: Optional[List[str]] = None,
        content_field: str = "content",
        title_field: str = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> int:
        """
        Index all matching files in a directory in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            directory_path: Path to the directory containing files to index
            file_patterns: List of file patterns to match (glob patterns).
                Defaults to ``["*.txt", "*.pdf", "*.docx", "*.md"]``.
            content_field: Field name to store the file content
            title_field: Field name to store the file title
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed files
        """
        # None sentinel avoids the shared-mutable-default-argument pitfall.
        if file_patterns is None:
            file_patterns = ["*.txt", "*.pdf", "*.docx", "*.md"]

        try:
            # Find all matching files
            all_files = []
            directory = Path(directory_path)
            for pattern in file_patterns:
                matching_files = list(directory.glob(pattern))
                all_files.extend([str(f) for f in matching_files])

            logger.info(
                f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}"
            )

            # Index each file
            successful_count = 0
            for file_path in all_files:
                logger.info(f"Indexing file: {file_path}")
                doc_id = self.index_file(
                    index_name=index_name,
                    file_path=file_path,
                    content_field=content_field,
                    title_field=title_field,
                    extract_metadata=extract_metadata,
                    refresh=refresh,
                )

                if doc_id:
                    successful_count += 1

            logger.info(
                f"Successfully indexed {successful_count} files out of {len(all_files)}"
            )
            return successful_count

        except Exception:
            logger.exception(f"Error indexing directory '{directory_path}'")
            return 0

    def search(
        self,
        index_name: str,
        query: str,
        fields: Optional[List[str]] = None,
        size: int = 10,
        highlight: bool = True,
    ) -> Dict[str, Any]:
        """
        Search for documents in Elasticsearch.

        Args:
            index_name: Name of the index to search
            query: Search query
            fields: Fields to search in. Defaults to ``["content", "title"]``.
            size: Maximum number of results to return
            highlight: Whether to include highlighted excerpts in results

        Returns:
            Dict: Elasticsearch search response, or ``{"error": ...}`` on
                failure
        """
        # None sentinel avoids the shared-mutable-default-argument pitfall.
        if fields is None:
            fields = ["content", "title"]

        try:
            search_query: Dict[str, Any] = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "size": size,
            }

            # Add highlighting if requested
            if highlight:
                search_query["highlight"] = {
                    "fields": {field: {} for field in fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                }

            # Execute the search
            response = self.client.search(
                index=index_name,
                body=search_query,
            )

            return response

        except Exception:
            logger.exception(f"Error searching index '{index_name}'")
            return {"error": "Search failed"}