Coverage for src/local_deep_research/utilities/es_utils.py: 88% (122 statements)

1""" 

2Elasticsearch utilities for indexing and managing documents. 

3""" 

4 

5from loguru import logger 

6from pathlib import Path 

7from typing import Any, Dict, List, Optional 

8 

9from elasticsearch import Elasticsearch 

10from elasticsearch.helpers import bulk 

11 

12 

13class ElasticsearchManager: 

14 """ 

15 Utility class for managing Elasticsearch indices and documents. 

16 

17 This class provides methods for creating indices, indexing documents, 

18 and performing other Elasticsearch management tasks. 
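
    Example (a minimal usage sketch; the host, index name, and document
    values below are placeholders, not part of this module):

        manager = ElasticsearchManager(hosts=["http://localhost:9200"])
        manager.create_index("articles")
        manager.index_document(
            "articles", {"title": "Hello", "content": "Hello, world."}
        )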

19 """ 

20 

21 def __init__( 

22 self, 

23 hosts: List[str] = ["http://localhost:9200"], 

24 username: Optional[str] = None, 

25 password: Optional[str] = None, 

26 api_key: Optional[str] = None, 

27 cloud_id: Optional[str] = None, 

28 ): 

29 """ 

30 Initialize the Elasticsearch manager. 

31 

32 Args: 

33 hosts: List of Elasticsearch hosts 

34 username: Optional username for authentication 

35 password: Optional password for authentication 

36 api_key: Optional API key for authentication 

37 cloud_id: Optional Elastic Cloud ID 
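
        Example (a sketch; the Elastic Cloud values shown are placeholders):

            manager = ElasticsearchManager(
                cloud_id="<deployment-name>:<base64-data>",
                api_key="<encoded-api-key>",
            )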

38 """ 

39 # Initialize the Elasticsearch client 

40 es_args = {} 

41 

42 # Basic authentication 

43 if username and password: 

44 es_args["basic_auth"] = (username, password) 

45 

46 # API key authentication 

47 if api_key: 

48 es_args["api_key"] = api_key 

49 

50 # Cloud ID for Elastic Cloud 

51 if cloud_id: 

52 es_args["cloud_id"] = cloud_id 

53 

54 # Connect to Elasticsearch 

55 self.client = Elasticsearch(hosts, **es_args) 

56 

57 # Verify connection 

58 try: 

59 info = self.client.info() 

60 logger.info( 

61 f"Connected to Elasticsearch cluster: {info.get('cluster_name')}" 

62 ) 

63 logger.info( 

64 f"Elasticsearch version: {info.get('version', {}).get('number')}" 

65 ) 

66 except Exception as e: 

67 logger.exception(f"Failed to connect to Elasticsearch: {e!s}") 

68 raise ConnectionError(f"Could not connect to Elasticsearch: {e!s}") 

69 

70 def create_index( 

71 self, 

72 index_name: str, 

73 mappings: Optional[Dict[str, Any]] = None, 

74 settings: Optional[Dict[str, Any]] = None, 

75 ) -> bool: 

76 """ 

77 Create an Elasticsearch index with optional mappings and settings. 

78 

79 Args: 

80 index_name: Name of the index to create 

81 mappings: Optional mappings for the index fields 

82 settings: Optional settings for the index 

83 

84 Returns: 

85 bool: True if successful, False otherwise 
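
        Example (a sketch; the index name and mapping are illustrative):

            manager.create_index(
                "articles",
                mappings={"properties": {"content": {"type": "text"}}},
            )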

86 """ 

87 try: 

88 # Check if index already exists 

89 if self.client.indices.exists(index=index_name): 

90 logger.warning( 

91 f"Index '{index_name}' already exists - skipping creation" 

92 ) 

93 return True 

94 

95 # Default mappings for better text search if none provided 

96 if mappings is None: 

97 mappings = { 

98 "properties": { 

99 "title": { 

100 "type": "text", 

101 "analyzer": "standard", 

102 "fields": { 

103 "keyword": { 

104 "type": "keyword", 

105 "ignore_above": 256, 

106 } 

107 }, 

108 }, 

109 "content": {"type": "text", "analyzer": "standard"}, 

110 "url": {"type": "keyword"}, 

111 "source": {"type": "keyword"}, 

112 "timestamp": {"type": "date"}, 

113 "metadata": {"type": "object", "enabled": True}, 

114 } 

115 } 

116 

117 # Default settings if none provided 

118 if settings is None: 

119 settings = { 

120 "number_of_shards": 1, 

121 "number_of_replicas": 0, 

122 "analysis": { 

123 "analyzer": {"standard": {"type": "standard"}} 

124 }, 

125 } 

126 

127 # Create the index with mappings and settings 

128 create_response = self.client.indices.create( 

129 index=index_name, 

130 mappings=mappings, 

131 settings=settings, 

132 ) 

133 

134 logger.info(f"Created index '{index_name}': {create_response}") 

135 return True 

136 

137 except Exception as e: 

138 logger.exception(f"Error creating index '{index_name}': {e!s}") 

139 return False 

140 

141 def delete_index(self, index_name: str) -> bool: 

142 """ 

143 Delete an Elasticsearch index. 

144 

145 Args: 

146 index_name: Name of the index to delete 

147 

148 Returns: 

149 bool: True if successful, False otherwise 
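
        Example (illustrative index name):

            manager.delete_index("articles")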

150 """ 

151 try: 

152 # Check if index exists 

153 if not self.client.indices.exists(index=index_name): 

154 logger.warning( 

155 f"Index '{index_name}' does not exist - skipping deletion" 

156 ) 

157 return True 

158 

159 # Delete the index 

160 delete_response = self.client.indices.delete(index=index_name) 

161 logger.info(f"Deleted index '{index_name}': {delete_response}") 

162 return True 

163 

164 except Exception as e: 

165 logger.exception(f"Error deleting index '{index_name}': {e!s}") 

166 return False 

167 

168 def index_document( 

169 self, 

170 index_name: str, 

171 document: Dict[str, Any], 

172 document_id: Optional[str] = None, 

173 refresh: bool = False, 

174 ) -> Optional[str]: 

175 """ 

176 Index a single document in Elasticsearch. 

177 

178 Args: 

179 index_name: Name of the index to add the document to 

180 document: The document to index 

181 document_id: Optional document ID (will be generated if not provided) 

182 refresh: Whether to refresh the index after indexing 

183 

184 Returns: 

185 str: Document ID if successful, None otherwise 
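
        Example (a sketch; the document fields are illustrative):

            doc_id = manager.index_document(
                "articles",
                {"title": "Hello", "content": "Hello, world."},
                refresh=True,
            )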

186 """ 

187 try: 

188 # Index the document 

189 response = self.client.index( 

190 index=index_name, 

191 document=document, 

192 id=document_id, 

193 refresh=refresh, 

194 ) 

195 

196 logger.info( 

197 f"Indexed document in '{index_name}' with ID: {response['_id']}" 

198 ) 

199 return response["_id"] 

200 

201 except Exception as e: 

202 logger.exception( 

203 f"Error indexing document in '{index_name}': {e!s}" 

204 ) 

205 return None 

206 

207 def bulk_index_documents( 

208 self, 

209 index_name: str, 

210 documents: List[Dict[str, Any]], 

211 id_field: Optional[str] = None, 

212 refresh: bool = False, 

213 ) -> int: 

214 """ 

215 Bulk index multiple documents in Elasticsearch. 

216 

217 Args: 

218 index_name: Name of the index to add the documents to 

219 documents: List of documents to index 

220 id_field: Optional field in the documents to use as the document ID 

221 refresh: Whether to refresh the index after indexing 

222 

223 Returns: 

224 int: Number of successfully indexed documents 
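
        Example (a sketch; the documents and `id_field` value are
        illustrative):

            count = manager.bulk_index_documents(
                "articles",
                [
                    {"doc_id": "1", "title": "First", "content": "..."},
                    {"doc_id": "2", "title": "Second", "content": "..."},
                ],
                id_field="doc_id",
            )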

225 """ 

226 try: 

227 # Prepare the bulk actions 

228 actions = [] 

229 for doc in documents: 

230 action = { 

231 "_index": index_name, 

232 "_source": doc, 

233 } 

234 

235 # Use the specified field as the document ID if provided 

236 if id_field and id_field in doc: 

237 action["_id"] = doc[id_field] 

238 

239 actions.append(action) 

240 

241 # Execute the bulk indexing 

242 success, failed = bulk( 

243 self.client, 

244 actions, 

245 refresh=refresh, 

246 stats_only=True, 

247 ) 

248 

249 logger.info( 

250 f"Bulk indexed {success} documents in '{index_name}', failed: {failed}" 

251 ) 

252 return success 

253 

254 except Exception as e: 

255 logger.exception( 

256 f"Error bulk indexing documents in '{index_name}': {e!s}" 

257 ) 

258 return 0 

259 

260 def index_file( 

261 self, 

262 index_name: str, 

263 file_path: str, 

264 content_field: str = "content", 

265 title_field: Optional[str] = "title", 

266 extract_metadata: bool = True, 

267 refresh: bool = False, 

268 ) -> Optional[str]: 

269 """ 

270 Index a file in Elasticsearch, extracting text content and metadata. 

271 

272 Args: 

273 index_name: Name of the index to add the document to 

274 file_path: Path to the file to index 

275 content_field: Field name to store the file content 

276 title_field: Field name to store the file title (filename if not specified) 

277 extract_metadata: Whether to extract file metadata 

278 refresh: Whether to refresh the index after indexing 

279 

280 Returns: 

281 str: Document ID if successful, None otherwise 
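
        Example (a sketch; the path is a placeholder and assumes the
        'unstructured' package is installed):

            doc_id = manager.index_file("articles", "/path/to/report.pdf")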

282 """ 

283 try: 

284 from langchain_community.document_loaders import ( 

285 UnstructuredFileLoader, 

286 ) 

287 

288 # Extract file content and metadata 

289 loader = UnstructuredFileLoader(file_path) 

290 documents = loader.load() 

291 

292 # Combine all content from the documents 

293 content = "\n\n".join([doc.page_content for doc in documents]) 

294 

295 # Get the filename for the title 

296 filename = Path(file_path).name 

297 title = filename 

298 

299 # Prepare the document 

300 document = { 

301 content_field: content, 

302 } 

303 

304 # Add title if requested 

305 if title_field: 

306 document[title_field] = title 

307 

308 # Add metadata if requested 

309 if extract_metadata and documents: 

310 # Include metadata from the first document 

311 document["metadata"] = documents[0].metadata 

312 

313 # Add file-specific metadata 

314 document["source"] = file_path 

315 document["file_extension"] = Path(filename).suffix.lstrip(".") 

316 document["filename"] = filename 

317 

318 # Index the document 

319 return self.index_document(index_name, document, refresh=refresh) 

320 

321 except ImportError: 

322 logger.error( 

323 "UnstructuredFileLoader not available. Please install the 'unstructured' package." 

324 ) 

325 return None 

326 except Exception as e: 

327 logger.exception(f"Error indexing file '{file_path}': {e!s}") 

328 return None 

329 

330 def index_directory( 

331 self, 

332 index_name: str, 

333 directory_path: str, 

334 file_patterns: List[str] = ["*.txt", "*.pdf", "*.docx", "*.md"], 

335 content_field: str = "content", 

336 title_field: str = "title", 

337 extract_metadata: bool = True, 

338 refresh: bool = False, 

339 ) -> int: 

340 """ 

341 Index all matching files in a directory in Elasticsearch. 

342 

343 Args: 

344 index_name: Name of the index to add the documents to 

345 directory_path: Path to the directory containing files to index 

346 file_patterns: List of file patterns to match (glob patterns) 

347 content_field: Field name to store the file content 

348 title_field: Field name to store the file title 

349 extract_metadata: Whether to extract file metadata 

350 refresh: Whether to refresh the index after indexing 

351 

352 Returns: 

353 int: Number of successfully indexed files 
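
        Example (a sketch; the directory path and patterns are illustrative):

            count = manager.index_directory(
                "articles",
                "/path/to/docs",
                file_patterns=["*.md", "*.pdf"],
            )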

354 """ 

355 try: 

356 # Find all matching files 

357 all_files = [] 

358 directory = Path(directory_path) 

359 for pattern in file_patterns: 

360 matching_files = list(directory.glob(pattern)) 

361 all_files.extend([str(f) for f in matching_files]) 

362 

363 logger.info( 

364 f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}" 

365 ) 

366 

367 # Index each file 

368 successful_count = 0 

369 for file_path in all_files: 

370 logger.info(f"Indexing file: {file_path}") 

371 doc_id = self.index_file( 

372 index_name=index_name, 

373 file_path=file_path, 

374 content_field=content_field, 

375 title_field=title_field, 

376 extract_metadata=extract_metadata, 

377 refresh=refresh, 

378 ) 

379 

380 if doc_id: 

381 successful_count += 1 

382 

383 logger.info( 

384 f"Successfully indexed {successful_count} files out of {len(all_files)}" 

385 ) 

386 return successful_count 

387 

388 except Exception as e: 

389 logger.exception( 

390 f"Error indexing directory '{directory_path}': {e!s}" 

391 ) 

392 return 0 

393 

394 def search( 

395 self, 

396 index_name: str, 

397 query: str, 

398 fields: List[str] = ["content", "title"], 

399 size: int = 10, 

400 highlight: bool = True, 

401 ) -> Dict[str, Any]: 

402 """ 

403 Search for documents in Elasticsearch. 

404 

405 Args: 

406 index_name: Name of the index to search 

407 query: Search query 

408 fields: Fields to search in 

409 size: Maximum number of results to return 

410 highlight: Whether to include highlighted excerpts in results 

411 

412 Returns: 

413 Dict: Elasticsearch search response 
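
        Example (a sketch; reading hits follows the standard Elasticsearch
        response shape):

            results = manager.search("articles", "hello world", size=5)
            for hit in results["hits"]["hits"]:
                print(hit["_score"], hit["_source"].get("title"))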

414 """ 

415 try: 

416 search_query = { 

417 "query": { 

418 "multi_match": { 

419 "query": query, 

420 "fields": fields, 

421 "type": "best_fields", 

422 "tie_breaker": 0.3, 

423 } 

424 }, 

425 "size": size, 

426 } 

427 

428 # Add highlighting if requested 

429 if highlight: 

430 search_query["highlight"] = { 

431 "fields": {field: {} for field in fields}, 

432 "pre_tags": ["<em>"], 

433 "post_tags": ["</em>"], 

434 } 

435 

436 # Execute the search 

437 response = self.client.search( 

438 index=index_name, 

439 body=search_query, 

440 ) 

441 

442 return response 

443 

444 except Exception as e: 

445 logger.exception(f"Error searching index '{index_name}': {e!s}") 

446 return {"error": "Search failed"}