Coverage for src / local_deep_research / utilities / es_utils.py: 96%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Elasticsearch utilities for indexing and managing documents. 

3""" 

4 

5from loguru import logger 

6from pathlib import Path 

7from typing import Any, Dict, List, Optional 

8 

9from elasticsearch import Elasticsearch 

10from elasticsearch.helpers import bulk 

11 

12 

class ElasticsearchManager:
    """
    Utility class for managing Elasticsearch indices and documents.

    This class provides methods for creating indices, indexing documents,
    and performing other Elasticsearch management tasks.
    """

    def __init__(
        self,
        hosts: Optional[List[str]] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        api_key: Optional[str] = None,
        cloud_id: Optional[str] = None,
    ):
        """
        Initialize the Elasticsearch manager.

        Args:
            hosts: List of Elasticsearch hosts (defaults to a single local node)
            username: Optional username for basic authentication
            password: Optional password for basic authentication
            api_key: Optional API key for authentication
            cloud_id: Optional Elastic Cloud ID

        Raises:
            ConnectionError: If the cluster cannot be reached or queried.
        """
        hosts = hosts or ["http://localhost:9200"]

        # Build client keyword arguments from whichever authentication
        # mechanisms were supplied; they are not mutually exclusive here,
        # the Elasticsearch client decides precedence.
        es_args: Dict[str, Any] = {}

        # Basic authentication
        if username and password:
            es_args["basic_auth"] = (username, password)

        # API key authentication
        if api_key:
            es_args["api_key"] = api_key

        # Cloud ID for Elastic Cloud
        if cloud_id:
            es_args["cloud_id"] = cloud_id

        # Connect to Elasticsearch
        self.client = Elasticsearch(hosts, **es_args)

        # Verify the connection up front so misconfiguration fails fast
        # instead of surfacing on the first real operation.
        try:
            info = self.client.info()
            logger.info(
                f"Connected to Elasticsearch cluster: {info.get('cluster_name')}"
            )
            logger.info(
                f"Elasticsearch version: {info.get('version', {}).get('number')}"
            )
        except Exception as e:
            logger.exception("Failed to connect to Elasticsearch")
            # Chain the original exception so the root cause is preserved
            # in the traceback (PEP 3134).
            raise ConnectionError(
                f"Could not connect to Elasticsearch: {e!s}"
            ) from e

    def close(self) -> None:
        """Close the Elasticsearch client and its connection pool."""
        # Imported lazily to avoid a circular import at module load time.
        from .resource_utils import safe_close

        safe_close(self.client, "Elasticsearch client")

    def create_index(
        self,
        index_name: str,
        mappings: Optional[Dict[str, Any]] = None,
        settings: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Create an Elasticsearch index with optional mappings and settings.

        If the index already exists, creation is skipped and True is
        returned (idempotent behavior).

        Args:
            index_name: Name of the index to create
            mappings: Optional mappings for the index fields
            settings: Optional settings for the index

        Returns:
            bool: True if successful (or index already exists), False otherwise
        """
        try:
            # Check if index already exists
            if self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' already exists - skipping creation"
                )
                return True

            # Default mappings for better text search if none provided
            if mappings is None:
                mappings = {
                    "properties": {
                        "title": {
                            "type": "text",
                            "analyzer": "standard",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256,
                                }
                            },
                        },
                        "content": {"type": "text", "analyzer": "standard"},
                        "url": {"type": "keyword"},
                        "source": {"type": "keyword"},
                        "timestamp": {"type": "date"},
                        "metadata": {"type": "object", "enabled": True},
                    }
                }

            # Default settings if none provided: single shard, no replicas
            # (suitable for a small/local deployment).
            if settings is None:
                settings = {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    "analysis": {
                        "analyzer": {"standard": {"type": "standard"}}
                    },
                }

            # Create the index with mappings and settings
            create_response = self.client.indices.create(
                index=index_name,
                mappings=mappings,
                settings=settings,
            )

            logger.info(f"Created index '{index_name}': {create_response}")
            return True

        except Exception:
            logger.exception(f"Error creating index '{index_name}'")
            return False

    def delete_index(self, index_name: str) -> bool:
        """
        Delete an Elasticsearch index.

        If the index does not exist, deletion is skipped and True is
        returned (idempotent behavior).

        Args:
            index_name: Name of the index to delete

        Returns:
            bool: True if successful (or index did not exist), False otherwise
        """
        try:
            # Check if index exists
            if not self.client.indices.exists(index=index_name):
                logger.warning(
                    f"Index '{index_name}' does not exist - skipping deletion"
                )
                return True

            # Delete the index
            delete_response = self.client.indices.delete(index=index_name)
            logger.info(f"Deleted index '{index_name}': {delete_response}")
            return True

        except Exception:
            logger.exception(f"Error deleting index '{index_name}'")
            return False

    def index_document(
        self,
        index_name: str,
        document: Dict[str, Any],
        document_id: Optional[str] = None,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a single document in Elasticsearch.

        Args:
            index_name: Name of the index to add the document to
            document: The document to index
            document_id: Optional document ID (generated by ES if not provided)
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            # Index the document
            response = self.client.index(
                index=index_name,
                document=document,
                id=document_id,
                refresh=refresh,
            )

            logger.info(
                f"Indexed document in '{index_name}' with ID: {response['_id']}"
            )
            return response["_id"]

        except Exception:
            logger.exception(f"Error indexing document in '{index_name}'")
            return None

    def bulk_index_documents(
        self,
        index_name: str,
        documents: List[Dict[str, Any]],
        id_field: Optional[str] = None,
        refresh: bool = False,
    ) -> int:
        """
        Bulk index multiple documents in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            documents: List of documents to index
            id_field: Optional field in the documents to use as the document ID
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed documents (0 on failure)
        """
        try:
            # Prepare the bulk actions
            actions = []
            for doc in documents:
                action = {
                    "_index": index_name,
                    "_source": doc,
                }

                # Use the specified field as the document ID if provided
                # and present in this particular document.
                if id_field and id_field in doc:
                    action["_id"] = doc[id_field]

                actions.append(action)

            # Execute the bulk indexing; stats_only=True makes bulk()
            # return (success_count, failure_count) instead of error lists.
            success, failed = bulk(
                self.client,
                actions,
                refresh=refresh,
                stats_only=True,
            )

            logger.info(
                f"Bulk indexed {success} documents in '{index_name}', failed: {failed}"
            )
            return success

        except Exception:
            logger.exception(f"Error bulk indexing documents in '{index_name}'")
            return 0

    def index_file(
        self,
        index_name: str,
        file_path: str,
        content_field: str = "content",
        title_field: Optional[str] = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> Optional[str]:
        """
        Index a file in Elasticsearch, extracting text content and metadata.

        Args:
            index_name: Name of the index to add the document to
            file_path: Path to the file to index
            content_field: Field name to store the file content
            title_field: Field name to store the file title (filename is used
                as the title); pass None/empty to omit the title field
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            str: Document ID if successful, None otherwise
        """
        try:
            # Imported lazily: 'unstructured' is an optional heavy dependency.
            from langchain_community.document_loaders import (
                UnstructuredFileLoader,
            )

            # Extract file content and metadata
            loader = UnstructuredFileLoader(file_path)
            documents = loader.load()

            # Combine all content from the documents
            content = "\n\n".join([doc.page_content for doc in documents])

            # Get the filename for the title
            filename = Path(file_path).name
            title = filename

            # Prepare the document
            document = {
                content_field: content,
            }

            # Add title if requested
            if title_field:
                document[title_field] = title

            # Add metadata if requested
            if extract_metadata and documents:
                # Include metadata from the first document only; loaders may
                # split a file into several docs sharing the same metadata.
                document["metadata"] = documents[0].metadata

                # Add file-specific metadata
                document["source"] = file_path
                document["file_extension"] = Path(filename).suffix.lstrip(".")
                document["filename"] = filename

            # Index the document
            return self.index_document(index_name, document, refresh=refresh)

        except ImportError:
            logger.error(
                "UnstructuredFileLoader not available. Please install the 'unstructured' package."
            )
            return None
        except Exception:
            logger.exception(f"Error indexing file '{file_path}'")
            return None

    def index_directory(
        self,
        index_name: str,
        directory_path: str,
        file_patterns: Optional[List[str]] = None,
        content_field: str = "content",
        title_field: str = "title",
        extract_metadata: bool = True,
        refresh: bool = False,
    ) -> int:
        """
        Index all matching files in a directory in Elasticsearch.

        Args:
            index_name: Name of the index to add the documents to
            directory_path: Path to the directory containing files to index
            file_patterns: List of file patterns to match (glob patterns);
                defaults to text, PDF, Word and markdown files
            content_field: Field name to store the file content
            title_field: Field name to store the file title
            extract_metadata: Whether to extract file metadata
            refresh: Whether to refresh the index after indexing

        Returns:
            int: Number of successfully indexed files (0 on failure)
        """
        file_patterns = file_patterns or ["*.txt", "*.pdf", "*.docx", "*.md"]
        try:
            # Find all matching files (non-recursive glob per pattern)
            all_files = []
            directory = Path(directory_path)
            for pattern in file_patterns:
                matching_files = list(directory.glob(pattern))
                all_files.extend([str(f) for f in matching_files])

            logger.info(
                f"Found {len(all_files)} files matching patterns {file_patterns} in {directory_path}"
            )

            # Index each file; failures are logged per-file by index_file
            # and simply excluded from the success count.
            successful_count = 0
            for file_path in all_files:
                logger.info(f"Indexing file: {file_path}")
                doc_id = self.index_file(
                    index_name=index_name,
                    file_path=file_path,
                    content_field=content_field,
                    title_field=title_field,
                    extract_metadata=extract_metadata,
                    refresh=refresh,
                )

                if doc_id:
                    successful_count += 1

            logger.info(
                f"Successfully indexed {successful_count} files out of {len(all_files)}"
            )
            return successful_count

        except Exception:
            logger.exception(f"Error indexing directory '{directory_path}'")
            return 0

    def search(
        self,
        index_name: str,
        query: str,
        fields: Optional[List[str]] = None,
        size: int = 10,
        highlight: bool = True,
    ) -> Dict[str, Any]:
        """
        Search for documents in Elasticsearch.

        Args:
            index_name: Name of the index to search
            query: Search query
            fields: Fields to search in (defaults to ["content", "title"])
            size: Maximum number of results to return
            highlight: Whether to include highlighted excerpts in results

        Returns:
            Dict: Elasticsearch search response, or {"error": "Search failed"}
            on any failure.
        """
        # Avoid a mutable default argument: the previous default list was
        # shared across all calls and could be mutated by callers.
        fields = fields if fields is not None else ["content", "title"]
        try:
            search_query = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": fields,
                        "type": "best_fields",
                        "tie_breaker": 0.3,
                    }
                },
                "size": size,
            }

            # Add highlighting if requested
            if highlight:
                search_query["highlight"] = {
                    "fields": {field: {} for field in fields},
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                }

            # Execute the search.
            # NOTE(review): the 'body' parameter is deprecated in
            # elasticsearch-py 8.x in favor of top-level keyword args;
            # kept as-is to avoid breaking on the pinned client version.
            return self.client.search(
                index=index_name,
                body=search_query,
            )

        except Exception:
            logger.exception(f"Error searching index '{index_name}'")
            return {"error": "Search failed"}