Coverage for src / local_deep_research / research_library / services / library_rag_service.py: 59%

458 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Library RAG Service 

3 

4Handles indexing and searching library documents using RAG: 

5- Index text documents into vector database 

6- Chunk documents for semantic search 

7- Generate embeddings using local models 

8- Manage FAISS indices per research 

9- Track RAG status in library 

10""" 

11 

12from pathlib import Path 

13from typing import Any, Dict, List, Optional, Tuple 

14 

15from langchain_core.documents import Document as LangchainDocument 

16from loguru import logger 

17from sqlalchemy import func 

18 

19from ...config.paths import get_cache_directory 

20from ...database.models.library import ( 

21 Document, 

22 DocumentChunk, 

23 DocumentCollection, 

24 Collection, 

25 RAGIndex, 

26 RagDocumentStatus, 

27 EmbeddingProvider, 

28) 

29from ...database.session_context import get_user_db_session 

30from ...utilities.type_utils import to_bool 

31from ...embeddings.splitters import get_text_splitter 

32from ...web_search_engines.engines.search_engine_local import ( 

33 LocalEmbeddingManager, 

34) 

35from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier 

36import hashlib 

37from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat 

38from langchain_community.vectorstores import FAISS 

39from langchain_community.docstore.in_memory import InMemoryDocstore 

40 

41 

42class LibraryRAGService: 

43 """Service for managing RAG indexing of library documents.""" 

44 

    def __init__(
        self,
        username: str,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_provider: str = "sentence_transformers",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        splitter_type: str = "recursive",
        text_separators: Optional[list] = None,
        distance_metric: str = "cosine",
        normalize_vectors: bool = True,
        index_type: str = "flat",
        embedding_manager: Optional["LocalEmbeddingManager"] = None,
        db_password: Optional[str] = None,
    ) -> None:
        """
        Initialize library RAG service for a user.

        Args:
            username: Username for database access
            embedding_model: Name of the embedding model to use
            embedding_provider: Provider type ('sentence_transformers' or 'ollama')
            chunk_size: Size of text chunks for splitting
            chunk_overlap: Overlap between consecutive chunks
            splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic')
            text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""])
            distance_metric: Distance metric ('cosine', 'l2', or 'dot_product')
            normalize_vectors: Whether to normalize vectors with L2
            index_type: FAISS index type ('flat', 'hnsw', or 'ivf')
            embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility
            db_password: Optional database password for background thread access
        """
        self.username = username
        self._db_password = db_password  # Can be used for thread access
        # Initialize optional attributes to None before they're set below
        # This allows the db_password setter to check them without hasattr
        self.embedding_manager = None
        self.integrity_manager = None
        self.embedding_model = embedding_model
        self.embedding_provider = embedding_provider
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.splitter_type = splitter_type
        self.text_separators = (
            text_separators
            if text_separators is not None
            else ["\n\n", "\n", ". ", " ", ""]
        )
        self.distance_metric = distance_metric
        # Ensure normalize_vectors is always a proper boolean
        self.normalize_vectors = to_bool(normalize_vectors, default=True)
        self.index_type = index_type

        # Use provided embedding manager or create a new one
        # (Must be created before text splitter for semantic chunking)
        if embedding_manager is not None:
            self.embedding_manager = embedding_manager
        else:
            # Initialize embedding manager with library collection
            # Load the complete user settings snapshot from database using the proper method
            from ...settings.manager import SettingsManager

            # Use proper database session for SettingsManager
            # Note: using _db_password (backing field) directly here because the
            # db_password property setter propagates to embedding_manager/integrity_manager,
            # which are still None at this point in __init__.
            with get_user_db_session(username, self._db_password) as session:
                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_settings_snapshot()

            # Add the specific settings needed for this RAG service
            settings_snapshot.update(
                {
                    "_username": username,
                    "embeddings.provider": embedding_provider,
                    f"embeddings.{embedding_provider}.model": embedding_model,
                    "local_search_chunk_size": chunk_size,
                    "local_search_chunk_overlap": chunk_overlap,
                }
            )

            self.embedding_manager = LocalEmbeddingManager(
                embedding_model=embedding_model,
                embedding_model_type=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                settings_snapshot=settings_snapshot,
            )

        # Initialize text splitter based on type
        # (Must be created AFTER embedding_manager for semantic chunking,
        # which embeds sentences to find split points)
        self.text_splitter = get_text_splitter(
            splitter_type=self.splitter_type,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            text_separators=self.text_separators,
            embeddings=self.embedding_manager.embeddings
            if self.splitter_type == "semantic"
            else None,
        )

        # Initialize or load FAISS index for library collection
        # (populated lazily by load_or_create_faiss_index)
        self.faiss_index = None
        self.rag_index_record = None

        # Initialize file integrity manager for FAISS indexes
        self.integrity_manager = FileIntegrityManager(
            username, password=self._db_password
        )
        self.integrity_manager.register_verifier(FAISSIndexVerifier())

        # Guards close() against double teardown
        self._closed = False

157 

158 def close(self): 

159 """Release embedding model and index resources.""" 

160 if self._closed: 

161 return 

162 self._closed = True 

163 

164 # Clear embedding manager resources 

165 if self.embedding_manager is not None: 

166 # Clear references to allow garbage collection 

167 self.embedding_manager = None 

168 

169 # Clear FAISS index 

170 if self.faiss_index is not None: 

171 self.faiss_index = None 

172 

173 # Clear other resources 

174 self.rag_index_record = None 

175 self.integrity_manager = None 

176 self.text_splitter = None 

177 

178 def __enter__(self): 

179 """Enter context manager.""" 

180 return self 

181 

182 def __exit__(self, exc_type, exc_val, exc_tb): 

183 """Exit context manager, ensuring cleanup.""" 

184 self.close() 

185 return False 

186 

187 @property 

188 def db_password(self): 

189 """Get database password.""" 

190 return self._db_password 

191 

192 @db_password.setter 

193 def db_password(self, value): 

194 """Set database password and propagate to embedding manager and integrity manager.""" 

195 self._db_password = value 

196 if self.embedding_manager: 

197 self.embedding_manager.db_password = value 

198 if self.integrity_manager: 

199 self.integrity_manager.password = value 

200 

201 def _get_index_hash( 

202 self, 

203 collection_name: str, 

204 embedding_model: str, 

205 embedding_model_type: str, 

206 ) -> str: 

207 """Generate hash for index identification.""" 

208 hash_input = ( 

209 f"{collection_name}:{embedding_model}:{embedding_model_type}" 

210 ) 

211 return hashlib.sha256(hash_input.encode()).hexdigest() 

212 

213 def _get_index_path(self, index_hash: str) -> Path: 

214 """Get path for FAISS index file.""" 

215 # Store in centralized cache directory (respects LDR_DATA_DIR) 

216 cache_dir = get_cache_directory() / "rag_indices" 

217 cache_dir.mkdir(parents=True, exist_ok=True) 

218 return cache_dir / f"{index_hash}.faiss" 

219 

220 @staticmethod 

221 def _deduplicate_chunks( 

222 chunks: List[LangchainDocument], 

223 chunk_ids: List[str], 

224 existing_ids: Optional[set] = None, 

225 ) -> Tuple[List[LangchainDocument], List[str]]: 

226 """Deduplicate chunks by ID within a batch, optionally excluding existing IDs.""" 

227 seen_ids: set = set() 

228 new_chunks: List[LangchainDocument] = [] 

229 new_ids: List[str] = [] 

230 for chunk, chunk_id in zip(chunks, chunk_ids): 

231 if chunk_id not in seen_ids and ( 

232 existing_ids is None or chunk_id not in existing_ids 

233 ): 

234 new_chunks.append(chunk) 

235 new_ids.append(chunk_id) 

236 seen_ids.add(chunk_id) 

237 return new_chunks, new_ids 

238 

    def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex:
        """Get or create RAGIndex record for the current configuration.

        Looks up a RAGIndex row keyed by the hash of
        (collection name, embedding model, provider); when absent, probes
        the embedding model for its output dimension and persists a fresh
        record capturing the full chunking/index configuration.

        NOTE(review): the returned ORM object is detached once the session
        context closes — callers appear to rely only on already-loaded
        column attributes; confirm no lazy relationships are accessed.
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Use collection_<uuid> format
            collection_name = f"collection_{collection_id}"
            index_hash = self._get_index_hash(
                collection_name, self.embedding_model, self.embedding_provider
            )

            # Try to get existing index
            rag_index = (
                session.query(RAGIndex).filter_by(index_hash=index_hash).first()
            )

            if not rag_index:
                # Create new index record
                index_path = self._get_index_path(index_hash)

                # Get embedding dimension by embedding a test string
                test_embedding = self.embedding_manager.embeddings.embed_query(
                    "test"
                )
                embedding_dim = len(test_embedding)

                rag_index = RAGIndex(
                    collection_name=collection_name,
                    embedding_model=self.embedding_model,
                    embedding_model_type=EmbeddingProvider(
                        self.embedding_provider
                    ),
                    embedding_dimension=embedding_dim,
                    index_path=str(index_path),
                    index_hash=index_hash,
                    chunk_size=self.chunk_size,
                    chunk_overlap=self.chunk_overlap,
                    splitter_type=self.splitter_type,
                    text_separators=self.text_separators,
                    distance_metric=self.distance_metric,
                    normalize_vectors=self.normalize_vectors,
                    index_type=self.index_type,
                    chunk_count=0,
                    total_documents=0,
                    status="active",
                    is_current=True,
                )
                session.add(rag_index)
                session.commit()
                # Refresh so DB-generated fields (e.g. id, timestamps) are
                # populated before the session closes.
                session.refresh(rag_index)
                logger.info(f"Created new RAG index: {index_hash}")

            return rag_index

290 

291 def load_or_create_faiss_index(self, collection_id: str) -> FAISS: 

292 """ 

293 Load existing FAISS index or create new one. 

294 

295 Args: 

296 collection_id: UUID of the collection 

297 

298 Returns: 

299 FAISS vector store instance 

300 """ 

301 rag_index = self._get_or_create_rag_index(collection_id) 

302 self.rag_index_record = rag_index 

303 

304 index_path = Path(rag_index.index_path) 

305 

306 if index_path.exists(): 

307 # Verify integrity before loading 

308 verified, reason = self.integrity_manager.verify_file(index_path) 

309 if not verified: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 logger.error( 

311 f"Integrity verification failed for {index_path}: {reason}. " 

312 f"Refusing to load. Creating new index." 

313 ) 

314 # Remove corrupted index 

315 try: 

316 index_path.unlink() 

317 logger.info(f"Removed corrupted index file: {index_path}") 

318 except Exception: 

319 logger.exception("Failed to remove corrupted index") 

320 else: 

321 try: 

322 # Check for embedding dimension mismatch before loading 

323 current_dim = len( 

324 self.embedding_manager.embeddings.embed_query( 

325 "dimension_check" 

326 ) 

327 ) 

328 stored_dim = rag_index.embedding_dimension 

329 

330 if stored_dim and current_dim != stored_dim: 330 ↛ 331line 330 didn't jump to line 331 because the condition on line 330 was never true

331 logger.warning( 

332 f"Embedding dimension mismatch detected! " 

333 f"Index created with dim={stored_dim}, " 

334 f"current model returns dim={current_dim}. " 

335 f"Deleting old index and rebuilding." 

336 ) 

337 # Delete old index files 

338 try: 

339 index_path.unlink() 

340 pkl_path = index_path.with_suffix(".pkl") 

341 if pkl_path.exists(): 

342 pkl_path.unlink() 

343 logger.info( 

344 f"Deleted old FAISS index files at {index_path}" 

345 ) 

346 except Exception: 

347 logger.exception("Failed to delete old index files") 

348 

349 # Update RAGIndex with new dimension and reset counts 

350 with get_user_db_session( 

351 self.username, self.db_password 

352 ) as session: 

353 idx = ( 

354 session.query(RAGIndex) 

355 .filter_by(id=rag_index.id) 

356 .first() 

357 ) 

358 if idx: 

359 idx.embedding_dimension = current_dim 

360 idx.chunk_count = 0 

361 idx.total_documents = 0 

362 session.commit() 

363 logger.info( 

364 f"Updated RAGIndex dimension to {current_dim}" 

365 ) 

366 

367 # Clear rag_document_status for this index 

368 session.query(RagDocumentStatus).filter_by( 

369 rag_index_id=rag_index.id 

370 ).delete() 

371 session.commit() 

372 logger.info( 

373 "Cleared indexed status for documents in this " 

374 "collection" 

375 ) 

376 

377 # Update local reference for index creation below 

378 rag_index.embedding_dimension = current_dim 

379 # Fall through to create new index below 

380 else: 

381 # Dimensions match (or no stored dimension), load index 

382 faiss_index = FAISS.load_local( 

383 str(index_path.parent), 

384 self.embedding_manager.embeddings, 

385 index_name=index_path.stem, 

386 allow_dangerous_deserialization=True, 

387 normalize_L2=True, 

388 ) 

389 logger.info( 

390 f"Loaded existing FAISS index from {index_path}" 

391 ) 

392 return faiss_index 

393 except Exception as e: 

394 logger.warning( 

395 f"Failed to load FAISS index: {e}, creating new one" 

396 ) 

397 

398 # Create new FAISS index with configurable type and distance metric 

399 logger.info( 

400 f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={rag_index.embedding_dimension}" 

401 ) 

402 

403 # Create index based on type and distance metric 

404 if self.index_type == "hnsw": 404 ↛ 407line 404 didn't jump to line 407 because the condition on line 404 was never true

405 # HNSW: Fast approximate search, best for large collections 

406 # M=32 is a good default for connections per layer 

407 index = IndexHNSWFlat(rag_index.embedding_dimension, 32) 

408 logger.info("Created HNSW index with M=32 connections") 

409 elif self.index_type == "ivf": 409 ↛ 412line 409 didn't jump to line 412 because the condition on line 409 was never true

410 # IVF requires training, for now fall back to flat 

411 # TODO: Implement IVF with proper training 

412 logger.warning( 

413 "IVF index type not yet fully implemented, using Flat index" 

414 ) 

415 if self.distance_metric in ("cosine", "dot_product"): 

416 index = IndexFlatIP(rag_index.embedding_dimension) 

417 else: 

418 index = IndexFlatL2(rag_index.embedding_dimension) 

419 else: # "flat" or default 

420 # Flat index: Exact search 

421 if self.distance_metric in ("cosine", "dot_product"): 421 ↛ 428line 421 didn't jump to line 428 because the condition on line 421 was always true

422 # For cosine similarity, use inner product (IP) with normalized vectors 

423 index = IndexFlatIP(rag_index.embedding_dimension) 

424 logger.info( 

425 "Created Flat index with Inner Product (for cosine similarity)" 

426 ) 

427 else: # l2 

428 index = IndexFlatL2(rag_index.embedding_dimension) 

429 logger.info("Created Flat index with L2 distance") 

430 

431 faiss_index = FAISS( 

432 self.embedding_manager.embeddings, 

433 index=index, 

434 docstore=InMemoryDocstore(), # Minimal - chunks in DB 

435 index_to_docstore_id={}, 

436 normalize_L2=self.normalize_vectors, # Use configurable normalization 

437 ) 

438 logger.info( 

439 f"FAISS index created with normalization={self.normalize_vectors}" 

440 ) 

441 return faiss_index 

442 

    def get_current_index_info(
        self, collection_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get information about the current RAG index for a collection.

        Chunk and document counts are recomputed from the
        rag_document_status table rather than taken from the (possibly
        stale) counters stored on the RAGIndex row.

        Args:
            collection_id: UUID of collection (defaults to Library if None)

        Returns:
            Dict of index metadata, or None when no current index exists.
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get collection name in the format stored in RAGIndex (collection_<uuid>)
            if collection_id:
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )
            else:
                # Default to Library collection
                from ...database.library_init import get_default_library_id

                collection_id = get_default_library_id(
                    self.username, self.db_password
                )
                collection_name = f"collection_{collection_id}"

            rag_index = (
                session.query(RAGIndex)
                .filter_by(collection_name=collection_name, is_current=True)
                .first()
            )

            if not rag_index:
                # Debug: check all RAG indices for this collection
                all_indices = session.query(RAGIndex).all()
                logger.info(
                    f"No RAG index found for collection_name='{collection_name}'. All indices: {[(idx.collection_name, idx.is_current) for idx in all_indices]}"
                )
                return None

            # Calculate actual counts from rag_document_status table
            from ...database.models.library import RagDocumentStatus

            # Sum of per-document chunk counts; coalesce NULL (no rows) to 0
            actual_chunk_count = (
                session.query(func.sum(RagDocumentStatus.chunk_count))
                .filter_by(collection_id=collection_id)
                .scalar()
                or 0
            )

            # Row existence in rag_document_status means "indexed"
            actual_doc_count = (
                session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )

            return {
                "embedding_model": rag_index.embedding_model,
                "embedding_model_type": rag_index.embedding_model_type.value
                if rag_index.embedding_model_type
                else None,
                "embedding_dimension": rag_index.embedding_dimension,
                "chunk_size": rag_index.chunk_size,
                "chunk_overlap": rag_index.chunk_overlap,
                "chunk_count": actual_chunk_count,
                "total_documents": actual_doc_count,
                "created_at": rag_index.created_at.isoformat(),
                "last_updated_at": rag_index.last_updated_at.isoformat(),
            }

515 

    def index_document(
        self, document_id: str, collection_id: str, force_reindex: bool = False
    ) -> Dict[str, Any]:
        """
        Index a single document into RAG for a specific collection.

        Splits the document text into chunks, stores the chunks in the
        database, adds their embeddings to the collection's FAISS index,
        saves the index with integrity tracking, and records indexed
        status in both rag_document_status and DocumentCollection.

        Args:
            document_id: UUID of the Document to index
            collection_id: UUID of the Collection to index for
            force_reindex: Whether to force reindexing even if already indexed

        Returns:
            Dict with status, chunk_count, and any errors
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get the document
            document = session.query(Document).filter_by(id=document_id).first()

            if not document:
                return {"status": "error", "error": "Document not found"}

            # Get or create DocumentCollection entry
            all_doc_collections = (
                session.query(DocumentCollection)
                .filter_by(document_id=document_id, collection_id=collection_id)
                .all()
            )

            logger.info(
                f"Found {len(all_doc_collections)} DocumentCollection entries for doc={document_id}, coll={collection_id}"
            )

            doc_collection = (
                all_doc_collections[0] if all_doc_collections else None
            )

            if not doc_collection:
                # Create new DocumentCollection entry
                doc_collection = DocumentCollection(
                    document_id=document_id,
                    collection_id=collection_id,
                    indexed=False,
                    chunk_count=0,
                )
                session.add(doc_collection)
                logger.info(
                    f"Created new DocumentCollection entry for doc={document_id}, coll={collection_id}"
                )

            # Check if already indexed for this collection
            if doc_collection.indexed and not force_reindex:
                return {
                    "status": "skipped",
                    "message": "Document already indexed for this collection",
                    "chunk_count": doc_collection.chunk_count,
                }

            # Validate text content
            if not document.text_content:
                return {
                    "status": "error",
                    "error": "Document has no text content",
                }

            try:
                # Create LangChain Document from text, carrying metadata that
                # ends up on every chunk (used for source linking in search)
                doc = LangchainDocument(
                    page_content=document.text_content,
                    metadata={
                        "source": document.original_url,
                        "document_id": document_id,  # Add document ID for source linking
                        "collection_id": collection_id,  # Add collection ID
                        "title": document.title
                        or document.filename
                        or "Untitled",
                        "document_title": document.title
                        or document.filename
                        or "Untitled",  # Add for compatibility
                        "authors": document.authors,
                        "published_date": str(document.published_date)
                        if document.published_date
                        else None,
                        "doi": document.doi,
                        "arxiv_id": document.arxiv_id,
                        "pmid": document.pmid,
                        "pmcid": document.pmcid,
                        "extraction_method": document.extraction_method,
                        "word_count": document.word_count,
                    },
                )

                # Split into chunks
                chunks = self.text_splitter.split_documents([doc])
                logger.info(
                    f"Split document {document_id} into {len(chunks)} chunks"
                )

                # Get collection name for chunk storage
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                # Use collection_<uuid> format for internal storage
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )

                # Store chunks in database using embedding manager; returns
                # the chunk IDs used as FAISS docstore keys below
                embedding_ids = self.embedding_manager._store_chunks_to_db(
                    chunks=chunks,
                    collection_name=collection_name,
                    source_type="document",
                    source_id=document_id,
                )

                # Load or create FAISS index (cached on self across calls)
                if self.faiss_index is None:
                    self.faiss_index = self.load_or_create_faiss_index(
                        collection_id
                    )

                # If force_reindex, remove old chunks from FAISS before adding new ones
                if force_reindex:
                    existing_ids = (
                        set(self.faiss_index.docstore._dict.keys())
                        if hasattr(self.faiss_index, "docstore")
                        else set()
                    )
                    old_chunk_ids = list(
                        {eid for eid in embedding_ids if eid in existing_ids}
                    )
                    if old_chunk_ids:
                        logger.info(
                            f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
                        )
                        self.faiss_index.delete(old_chunk_ids)

                # Filter out chunks that already exist in FAISS (unless force_reindex)
                if not force_reindex:
                    existing_ids = (
                        set(self.faiss_index.docstore._dict.keys())
                        if hasattr(self.faiss_index, "docstore")
                        else set()
                    )
                else:
                    existing_ids = None

                # Counts used only for logging below
                unique_count = len(set(embedding_ids))
                batch_dups = len(chunks) - unique_count

                new_chunks, new_ids = self._deduplicate_chunks(
                    chunks, embedding_ids, existing_ids
                )

                # Add embeddings to FAISS index
                if new_chunks:
                    if force_reindex:
                        logger.info(
                            f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
                        )
                    else:
                        already_exist = unique_count - len(new_chunks)
                        logger.info(
                            f"Adding {len(new_chunks)} new embeddings to FAISS index "
                            f"({already_exist} already exist, {batch_dups} batch duplicates removed)"
                        )
                    self.faiss_index.add_documents(new_chunks, ids=new_ids)
                else:
                    logger.info(
                        f"All {len(chunks)} chunks already exist in FAISS index, skipping"
                    )

                # Save FAISS index to disk
                index_path = Path(self.rag_index_record.index_path)
                self.faiss_index.save_local(
                    str(index_path.parent), index_name=index_path.stem
                )
                # Record file integrity
                self.integrity_manager.record_file(
                    index_path,
                    related_entity_type="rag_index",
                    related_entity_id=self.rag_index_record.id,
                )
                logger.info(
                    f"Saved FAISS index to {index_path} with integrity tracking"
                )

                from datetime import datetime, UTC
                from sqlalchemy import text

                # Check if document was already indexed (for stats update)
                existing_status = (
                    session.query(RagDocumentStatus)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )
                was_already_indexed = existing_status is not None

                # Mark document as indexed using rag_document_status table
                # Row existence = indexed, simple and clean
                timestamp = datetime.now(UTC)

                # Create or update RagDocumentStatus using ORM merge (atomic upsert)
                rag_status = RagDocumentStatus(
                    document_id=document_id,
                    collection_id=collection_id,
                    rag_index_id=self.rag_index_record.id,
                    chunk_count=len(chunks),
                    indexed_at=timestamp,
                )
                session.merge(rag_status)

                logger.info(
                    f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={len(chunks)}"
                )

                # Also update DocumentCollection table for backward compatibility
                session.query(DocumentCollection).filter_by(
                    document_id=document_id, collection_id=collection_id
                ).update(
                    {
                        "indexed": True,
                        "chunk_count": len(chunks),
                        "last_indexed_at": timestamp,
                    }
                )

                logger.info(
                    "Also updated DocumentCollection.indexed for backward compatibility"
                )

                # Update RAGIndex statistics (only if not already indexed)
                rag_index_obj = (
                    session.query(RAGIndex)
                    .filter_by(id=self.rag_index_record.id)
                    .first()
                )
                if rag_index_obj and not was_already_indexed:
                    rag_index_obj.chunk_count += len(chunks)
                    rag_index_obj.total_documents += 1
                    rag_index_obj.last_updated_at = datetime.now(UTC)
                    logger.info(
                        f"Updated RAGIndex stats: chunk_count +{len(chunks)}, total_documents +1"
                    )

                # Flush ORM changes to database before commit
                session.flush()
                logger.info(f"Flushed ORM changes for document {document_id}")

                # Commit the transaction
                session.commit()

                # WAL checkpoint after commit to ensure persistence
                # NOTE(review): SQLite-specific pragma — assumes the user DB
                # backend is SQLite in WAL mode; confirm for other backends.
                session.execute(text("PRAGMA wal_checkpoint(FULL)"))

                logger.info(
                    f"Successfully indexed document {document_id} for collection {collection_id} "
                    f"with {len(chunks)} chunks"
                )

                return {
                    "status": "success",
                    "chunk_count": len(chunks),
                    "embedding_ids": embedding_ids,
                }

            except Exception as e:
                logger.exception(
                    f"Error indexing document {document_id} for collection {collection_id}"
                )
                # Expose only the exception type to avoid leaking internals
                return {
                    "status": "error",
                    "error": f"Operation failed: {type(e).__name__}",
                }

793 

    def index_all_documents(
        self,
        collection_id: str,
        force_reindex: bool = False,
        progress_callback=None,
    ) -> Dict[str, Any]:
        """
        Index all documents in a collection into RAG.

        Iterates the collection's DocumentCollection rows and delegates
        each document to ``index_document``, aggregating the per-document
        outcomes.

        NOTE(review): this session stays open while index_document opens
        its own sessions per call — presumably safe with this session
        factory; confirm it does not hold a write lock.

        Args:
            collection_id: UUID of the collection to index
            force_reindex: Whether to force reindexing already indexed documents
            progress_callback: Optional callback function called after each document with (current, total, doc_title, status)

        Returns:
            Dict with counts of successful, skipped, and failed documents
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get all DocumentCollection entries for this collection
            query = session.query(DocumentCollection).filter_by(
                collection_id=collection_id
            )

            if not force_reindex:
                # Only index documents that haven't been indexed yet
                query = query.filter_by(indexed=False)

            doc_collections = query.all()

            if not doc_collections:
                return {
                    "status": "info",
                    "message": "No documents to index",
                    "successful": 0,
                    "skipped": 0,
                    "failed": 0,
                }

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_collections)

            # 1-based index so progress reads as "n of total"
            for idx, doc_collection in enumerate(doc_collections, 1):
                # Get the document for title info
                document = (
                    session.query(Document)
                    .filter_by(id=doc_collection.document_id)
                    .first()
                )
                title = document.title if document else "Unknown"

                result = self.index_document(
                    doc_collection.document_id, collection_id, force_reindex
                )

                if result["status"] == "success":
                    results["successful"] += 1
                elif result["status"] == "skipped":
                    results["skipped"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append(
                        {
                            "doc_id": doc_collection.document_id,
                            "title": title,
                            "error": result.get("error"),
                        }
                    )

                # Call progress callback if provided
                if progress_callback:
                    progress_callback(idx, total, title, result["status"])

            logger.info(
                f"Indexed collection {collection_id}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

            return results

874 

    def remove_document_from_rag(
        self, document_id: str, collection_id: str
    ) -> Dict[str, Any]:
        """
        Remove a document's chunks from RAG for a specific collection.

        Deletes the stored chunks from the database and resets the
        DocumentCollection indexed flags.

        NOTE(review): this removes DB-side chunks only; the FAISS index
        itself is not updated here — presumably cleaned up on the next
        rebuild/reindex. Confirm stale vectors are acceptable until then.

        Args:
            document_id: UUID of the Document to remove
            collection_id: UUID of the Collection to remove from

        Returns:
            Dict with status and count of removed chunks
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get the DocumentCollection entry
            doc_collection = (
                session.query(DocumentCollection)
                .filter_by(document_id=document_id, collection_id=collection_id)
                .first()
            )

            if not doc_collection:
                return {
                    "status": "error",
                    "error": "Document not found in collection",
                }

            try:
                # Get collection name in the format collection_<uuid>
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                # Use collection_<uuid> format for internal storage
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )

                # Delete chunks from database
                deleted_count = self.embedding_manager._delete_chunks_from_db(
                    collection_name=collection_name,
                    source_id=document_id,
                )

                # Update DocumentCollection RAG status
                doc_collection.indexed = False
                doc_collection.chunk_count = 0
                doc_collection.last_indexed_at = None
                session.commit()

                logger.info(
                    f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}"
                )

                return {"status": "success", "deleted_count": deleted_count}

            except Exception as e:
                logger.exception(
                    f"Error removing document {document_id} from collection {collection_id}"
                )
                # Expose only the exception type to avoid leaking internals
                return {
                    "status": "error",
                    "error": f"Operation failed: {type(e).__name__}",
                }

940 

941 def index_documents_batch( 

942 self, 

943 doc_info: List[tuple], 

944 collection_id: str, 

945 force_reindex: bool = False, 

946 ) -> Dict[str, Dict[str, Any]]: 

947 """ 

948 Index multiple documents in a batch for a specific collection. 

949 

950 Args: 

951 doc_info: List of (doc_id, title) tuples 

952 collection_id: UUID of the collection to index for 

953 force_reindex: Whether to force reindexing even if already indexed 

954 

955 Returns: 

956 Dict mapping doc_id to individual result 

957 """ 

958 results = {} 

959 doc_ids = [doc_id for doc_id, _ in doc_info] 

960 

961 # Use single database session for querying 

962 with get_user_db_session(self.username, self.db_password) as session: 

963 # Pre-load all documents for this batch 

964 documents = ( 

965 session.query(Document).filter(Document.id.in_(doc_ids)).all() 

966 ) 

967 

968 # Create lookup for quick access 

969 doc_lookup = {doc.id: doc for doc in documents} 

970 

971 # Pre-load DocumentCollection entries 

972 doc_collections = ( 

973 session.query(DocumentCollection) 

974 .filter( 

975 DocumentCollection.document_id.in_(doc_ids), 

976 DocumentCollection.collection_id == collection_id, 

977 ) 

978 .all() 

979 ) 

980 doc_collection_lookup = { 

981 dc.document_id: dc for dc in doc_collections 

982 } 

983 

984 # Process each document in the batch 

985 for doc_id, title in doc_info: 

986 document = doc_lookup.get(doc_id) 

987 

988 if not document: 988 ↛ 989line 988 didn't jump to line 989 because the condition on line 988 was never true

989 results[doc_id] = { 

990 "status": "error", 

991 "error": "Document not found", 

992 } 

993 continue 

994 

995 # Check if already indexed via DocumentCollection 

996 doc_collection = doc_collection_lookup.get(doc_id) 

997 if ( 997 ↛ 1002line 997 didn't jump to line 1002 because the condition on line 997 was never true

998 doc_collection 

999 and doc_collection.indexed 

1000 and not force_reindex 

1001 ): 

1002 results[doc_id] = { 

1003 "status": "skipped", 

1004 "message": "Document already indexed for this collection", 

1005 "chunk_count": doc_collection.chunk_count, 

1006 } 

1007 continue 

1008 

1009 # Validate text content 

1010 if not document.text_content: 1010 ↛ 1011line 1010 didn't jump to line 1011 because the condition on line 1010 was never true

1011 results[doc_id] = { 

1012 "status": "error", 

1013 "error": "Document has no text content", 

1014 } 

1015 continue 

1016 

1017 # Index the document 

1018 try: 

1019 result = self.index_document( 

1020 doc_id, collection_id, force_reindex 

1021 ) 

1022 results[doc_id] = result 

1023 except Exception as e: 

1024 logger.exception( 

1025 f"Error indexing document {doc_id} in batch" 

1026 ) 

1027 results[doc_id] = { 

1028 "status": "error", 

1029 "error": f"Indexing failed: {type(e).__name__}", 

1030 } 

1031 

1032 return results 

1033 

1034 def get_rag_stats( 

1035 self, collection_id: Optional[str] = None 

1036 ) -> Dict[str, Any]: 

1037 """ 

1038 Get RAG statistics for a collection. 

1039 

1040 Args: 

1041 collection_id: UUID of the collection (defaults to Library) 

1042 

1043 Returns: 

1044 Dict with counts and metadata about indexed documents 

1045 """ 

1046 with get_user_db_session(self.username, self.db_password) as session: 

1047 # Get collection ID (default to Library) 

1048 if not collection_id: 1048 ↛ 1049line 1048 didn't jump to line 1049 because the condition on line 1048 was never true

1049 from ...database.library_init import get_default_library_id 

1050 

1051 collection_id = get_default_library_id( 

1052 self.username, self.db_password 

1053 ) 

1054 

1055 # Count total documents in collection 

1056 total_docs = ( 

1057 session.query(DocumentCollection) 

1058 .filter_by(collection_id=collection_id) 

1059 .count() 

1060 ) 

1061 

1062 # Count indexed documents from rag_document_status table 

1063 from ...database.models.library import RagDocumentStatus 

1064 

1065 indexed_docs = ( 

1066 session.query(RagDocumentStatus) 

1067 .filter_by(collection_id=collection_id) 

1068 .count() 

1069 ) 

1070 

1071 # Count total chunks from rag_document_status table 

1072 total_chunks = ( 

1073 session.query(func.sum(RagDocumentStatus.chunk_count)) 

1074 .filter_by(collection_id=collection_id) 

1075 .scalar() 

1076 or 0 

1077 ) 

1078 

1079 # Get collection name in the format stored in DocumentChunk (collection_<uuid>) 

1080 collection = ( 

1081 session.query(Collection).filter_by(id=collection_id).first() 

1082 ) 

1083 collection_name = ( 

1084 f"collection_{collection_id}" if collection else "library" 

1085 ) 

1086 

1087 # Get embedding model info from chunks 

1088 chunk_sample = ( 

1089 session.query(DocumentChunk) 

1090 .filter_by(collection_name=collection_name) 

1091 .first() 

1092 ) 

1093 

1094 embedding_info = {} 

1095 if chunk_sample: 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true

1096 embedding_info = { 

1097 "model": chunk_sample.embedding_model, 

1098 "model_type": chunk_sample.embedding_model_type.value 

1099 if chunk_sample.embedding_model_type 

1100 else None, 

1101 "dimension": chunk_sample.embedding_dimension, 

1102 } 

1103 

1104 return { 

1105 "total_documents": total_docs, 

1106 "indexed_documents": indexed_docs, 

1107 "unindexed_documents": total_docs - indexed_docs, 

1108 "total_chunks": total_chunks, 

1109 "embedding_info": embedding_info, 

1110 "chunk_size": self.chunk_size, 

1111 "chunk_overlap": self.chunk_overlap, 

1112 } 

1113 

1114 def index_local_file(self, file_path: str) -> Dict[str, Any]: 

1115 """ 

1116 Index a local file from the filesystem into RAG. 

1117 

1118 Args: 

1119 file_path: Path to the file to index 

1120 

1121 Returns: 

1122 Dict with status, chunk_count, and any errors 

1123 """ 

1124 from pathlib import Path 

1125 import mimetypes 

1126 

1127 file_path = Path(file_path) 

1128 

1129 if not file_path.exists(): 1129 ↛ 1130line 1129 didn't jump to line 1130 because the condition on line 1129 was never true

1130 return {"status": "error", "error": f"File not found: {file_path}"} 

1131 

1132 if not file_path.is_file(): 1132 ↛ 1133line 1132 didn't jump to line 1133 because the condition on line 1132 was never true

1133 return {"status": "error", "error": f"Not a file: {file_path}"} 

1134 

1135 # Determine file type 

1136 mime_type, _ = mimetypes.guess_type(str(file_path)) 

1137 

1138 # Read file content based on type 

1139 try: 

1140 if file_path.suffix.lower() in [".txt", ".md", ".markdown"]: 1140 ↛ 1144line 1140 didn't jump to line 1144 because the condition on line 1140 was always true

1141 # Text files 

1142 with open(file_path, "r", encoding="utf-8") as f: 

1143 content = f.read() 

1144 elif file_path.suffix.lower() in [".html", ".htm"]: 

1145 # HTML files - strip tags 

1146 from bs4 import BeautifulSoup 

1147 

1148 with open(file_path, "r", encoding="utf-8") as f: 

1149 soup = BeautifulSoup(f.read(), "html.parser") 

1150 content = soup.get_text() 

1151 elif file_path.suffix.lower() == ".pdf": 

1152 # PDF files - extract text 

1153 import PyPDF2 

1154 

1155 content = "" 

1156 with open(file_path, "rb") as f: 

1157 pdf_reader = PyPDF2.PdfReader(f) 

1158 for page in pdf_reader.pages: 

1159 content += page.extract_text() 

1160 else: 

1161 return { 

1162 "status": "skipped", 

1163 "error": f"Unsupported file type: {file_path.suffix}", 

1164 } 

1165 

1166 if not content or len(content.strip()) < 10: 1166 ↛ 1167line 1166 didn't jump to line 1167 because the condition on line 1166 was never true

1167 return { 

1168 "status": "error", 

1169 "error": "File has no extractable text content", 

1170 } 

1171 

1172 # Create LangChain Document from text 

1173 doc = LangchainDocument( 

1174 page_content=content, 

1175 metadata={ 

1176 "source": str(file_path), 

1177 "source_id": f"local_{file_path.stem}_{hash(str(file_path))}", 

1178 "title": file_path.stem, 

1179 "document_title": file_path.stem, 

1180 "file_type": file_path.suffix.lower(), 

1181 "file_size": file_path.stat().st_size, 

1182 "source_type": "local_file", 

1183 "collection": "local_library", 

1184 }, 

1185 ) 

1186 

1187 # Split into chunks 

1188 chunks = self.text_splitter.split_documents([doc]) 

1189 logger.info( 

1190 f"Split local file {file_path} into {len(chunks)} chunks" 

1191 ) 

1192 

1193 # Store chunks in database (returns UUID-based IDs) 

1194 embedding_ids = self.embedding_manager._store_chunks_to_db( 

1195 chunks=chunks, 

1196 collection_name="local_library", 

1197 source_type="local_file", 

1198 source_id=str(file_path), 

1199 ) 

1200 

1201 # Load or create FAISS index using default library collection 

1202 if self.faiss_index is None: 

1203 from ...database.library_init import get_default_library_id 

1204 

1205 default_collection_id = get_default_library_id( 

1206 self.username, self.db_password 

1207 ) 

1208 self.faiss_index = self.load_or_create_faiss_index( 

1209 default_collection_id 

1210 ) 

1211 

1212 # Filter out chunks that already exist in FAISS and deduplicate 

1213 if self.faiss_index is not None: 1213 ↛ 1220line 1213 didn't jump to line 1220 because the condition on line 1213 was always true

1214 existing_ids = ( 

1215 set(self.faiss_index.docstore._dict.keys()) 

1216 if hasattr(self.faiss_index, "docstore") 

1217 else set() 

1218 ) 

1219 else: 

1220 existing_ids = None 

1221 new_chunks, new_ids = self._deduplicate_chunks( 

1222 chunks, embedding_ids, existing_ids 

1223 ) 

1224 

1225 # Add embeddings to FAISS index 

1226 if new_chunks: 1226 ↛ 1230line 1226 didn't jump to line 1230 because the condition on line 1226 was always true

1227 self.faiss_index.add_documents(new_chunks, ids=new_ids) 

1228 

1229 # Save FAISS index 

1230 index_path = ( 

1231 Path(self.rag_index_record.index_path) 

1232 if self.rag_index_record 

1233 else None 

1234 ) 

1235 if index_path: 

1236 self.faiss_index.save_local( 

1237 str(index_path.parent), index_name=index_path.stem 

1238 ) 

1239 # Record file integrity 

1240 self.integrity_manager.record_file( 

1241 index_path, 

1242 related_entity_type="rag_index", 

1243 related_entity_id=self.rag_index_record.id, 

1244 ) 

1245 logger.info( 

1246 f"Saved FAISS index to {index_path} with integrity tracking" 

1247 ) 

1248 

1249 logger.info( 

1250 f"Successfully indexed local file {file_path} with {len(new_chunks)} new chunks " 

1251 f"({len(chunks) - len(new_chunks)} skipped)" 

1252 ) 

1253 

1254 return { 

1255 "status": "success", 

1256 "chunk_count": len(new_chunks), 

1257 "embedding_ids": new_ids, 

1258 } 

1259 

1260 except Exception as e: 

1261 logger.exception(f"Error indexing local file {file_path}") 

1262 return { 

1263 "status": "error", 

1264 "error": f"Operation failed: {type(e).__name__}", 

1265 } 

1266 

    def index_user_document(
        self, user_doc, collection_name: str, force_reindex: bool = False
    ) -> Dict[str, Any]:
        """
        Index a user-uploaded document into a specific collection.

        Splits the document's pre-extracted text into chunks, persists the
        chunks to the database, and adds their embeddings to the per-collection
        FAISS index (replacing existing entries when force_reindex is set).

        Args:
            user_doc: UserDocument object; must expose text_content, id,
                filename, file_type, and file_size
            collection_name: Name of the collection (e.g., "collection_123")
            force_reindex: Whether to force reindexing

        Returns:
            Dict with status, chunk_count, and any errors.
            NOTE: on success, chunk_count reports the total number of chunks
            produced by the splitter, not only the chunks newly added to FAISS.
        """

        try:
            # Use the pre-extracted text content
            content = user_doc.text_content

            if not content or len(content.strip()) < 10:
                return {
                    "status": "error",
                    "error": "Document has no extractable text content",
                }

            # Create LangChain Document
            doc = LangchainDocument(
                page_content=content,
                metadata={
                    "source": f"user_upload_{user_doc.id}",
                    "source_id": user_doc.id,
                    "title": user_doc.filename,
                    "document_title": user_doc.filename,
                    "file_type": user_doc.file_type,
                    "file_size": user_doc.file_size,
                    "collection": collection_name,
                },
            )

            # Split into chunks
            chunks = self.text_splitter.split_documents([doc])
            logger.info(
                f"Split user document {user_doc.filename} into {len(chunks)} chunks"
            )

            # Store chunks in database (returns one embedding id per chunk)
            embedding_ids = self.embedding_manager._store_chunks_to_db(
                chunks=chunks,
                collection_name=collection_name,
                source_type="user_document",
                source_id=user_doc.id,
            )

            # Load or create FAISS index for this collection
            if self.faiss_index is None:
                # Extract collection_id from collection_name (format: "collection_<uuid>")
                collection_id = collection_name.removeprefix("collection_")
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # If force_reindex, remove old chunks from FAISS before adding new ones
            if force_reindex:
                existing_ids = (
                    set(self.faiss_index.docstore._dict.keys())
                    if hasattr(self.faiss_index, "docstore")
                    else set()
                )
                # De-duplicate via a set; only ids actually present in the
                # docstore can be deleted.
                old_chunk_ids = list(
                    {eid for eid in embedding_ids if eid in existing_ids}
                )
                if old_chunk_ids:
                    logger.info(
                        f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
                    )
                    self.faiss_index.delete(old_chunk_ids)

            # Filter out chunks that already exist in FAISS (unless force_reindex)
            # existing_ids=None tells _deduplicate_chunks to skip the
            # already-in-index filter (batch-internal dedup still applies).
            if not force_reindex:
                existing_ids = (
                    set(self.faiss_index.docstore._dict.keys())
                    if hasattr(self.faiss_index, "docstore")
                    else set()
                )
            else:
                existing_ids = None

            # batch_dups counts duplicate embedding ids within this batch.
            unique_count = len(set(embedding_ids))
            batch_dups = len(chunks) - unique_count

            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )

            # Add embeddings to FAISS index
            if new_chunks:
                if force_reindex:
                    logger.info(
                        f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
                    )
                else:
                    already_exist = unique_count - len(new_chunks)
                    logger.info(
                        f"Adding {len(new_chunks)} new chunks to FAISS index "
                        f"({already_exist} already exist, {batch_dups} batch duplicates removed)"
                    )
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            else:
                logger.info(
                    f"All {len(chunks)} chunks already exist in FAISS index, skipping"
                )

            # Save FAISS index (only when an index record exists)
            index_path = (
                Path(self.rag_index_record.index_path)
                if self.rag_index_record
                else None
            )
            if index_path:
                self.faiss_index.save_local(
                    str(index_path.parent), index_name=index_path.stem
                )
                # Record file integrity
                self.integrity_manager.record_file(
                    index_path,
                    related_entity_type="rag_index",
                    related_entity_id=self.rag_index_record.id,
                )

            logger.info(
                f"Successfully indexed user document {user_doc.filename} with {len(chunks)} chunks"
            )

            return {
                "status": "success",
                "chunk_count": len(chunks),
                "embedding_ids": embedding_ids,
            }

        except Exception as e:
            logger.exception(
                f"Error indexing user document {user_doc.filename}"
            )
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }

1414 

1415 def remove_collection_from_index( 

1416 self, collection_name: str 

1417 ) -> Dict[str, Any]: 

1418 """ 

1419 Remove all documents from a collection from the FAISS index. 

1420 

1421 Args: 

1422 collection_name: Name of the collection (e.g., "collection_123") 

1423 

1424 Returns: 

1425 Dict with status and count of removed chunks 

1426 """ 

1427 from ...database.models import DocumentChunk 

1428 from ...database.session_context import get_user_db_session 

1429 

1430 try: 

1431 with get_user_db_session( 

1432 self.username, self.db_password 

1433 ) as session: 

1434 # Get all chunk IDs for this collection 

1435 chunks = ( 

1436 session.query(DocumentChunk) 

1437 .filter_by(collection_name=collection_name) 

1438 .all() 

1439 ) 

1440 

1441 if not chunks: 1441 ↛ 1442line 1441 didn't jump to line 1442 because the condition on line 1441 was never true

1442 return {"status": "success", "deleted_count": 0} 

1443 

1444 chunk_ids = [ 

1445 f"{collection_name}_{chunk.id}" for chunk in chunks 

1446 ] 

1447 

1448 # Load FAISS index if not already loaded 

1449 if self.faiss_index is None: 1449 ↛ 1457line 1449 didn't jump to line 1457 because the condition on line 1449 was always true

1450 # Extract collection_id from collection_name (format: "collection_<uuid>") 

1451 collection_id = collection_name.removeprefix("collection_") 

1452 self.faiss_index = self.load_or_create_faiss_index( 

1453 collection_id 

1454 ) 

1455 

1456 # Remove from FAISS index 

1457 if hasattr(self.faiss_index, "delete"): 1457 ↛ 1483line 1457 didn't jump to line 1483 because the condition on line 1457 was always true

1458 try: 

1459 self.faiss_index.delete(chunk_ids) 

1460 

1461 # Save updated index 

1462 index_path = ( 

1463 Path(self.rag_index_record.index_path) 

1464 if self.rag_index_record 

1465 else None 

1466 ) 

1467 if index_path: 1467 ↛ 1468line 1467 didn't jump to line 1468 because the condition on line 1467 was never true

1468 self.faiss_index.save_local( 

1469 str(index_path.parent), 

1470 index_name=index_path.stem, 

1471 ) 

1472 # Record file integrity 

1473 self.integrity_manager.record_file( 

1474 index_path, 

1475 related_entity_type="rag_index", 

1476 related_entity_id=self.rag_index_record.id, 

1477 ) 

1478 except Exception as e: 

1479 logger.warning( 

1480 f"Could not delete chunks from FAISS: {e}" 

1481 ) 

1482 

1483 logger.info( 

1484 f"Removed {len(chunk_ids)} chunks from collection {collection_name}" 

1485 ) 

1486 

1487 return {"status": "success", "deleted_count": len(chunk_ids)} 

1488 

1489 except Exception as e: 

1490 logger.exception( 

1491 f"Error removing collection {collection_name} from index" 

1492 ) 

1493 return { 

1494 "status": "error", 

1495 "error": f"Operation failed: {type(e).__name__}", 

1496 } 

1497 

1498 def search_library( 

1499 self, query: str, limit: int = 10, score_threshold: float = 0.0 

1500 ) -> List[Dict[str, Any]]: 

1501 """ 

1502 Search library documents using semantic search. 

1503 

1504 Args: 

1505 query: Search query 

1506 limit: Maximum number of results 

1507 score_threshold: Minimum similarity score 

1508 

1509 Returns: 

1510 List of results with content, metadata, and similarity scores 

1511 """ 

1512 # This will be implemented when we integrate with the search system 

1513 # For now, raise NotImplementedError 

1514 raise NotImplementedError( 

1515 "Library search will be implemented in the search integration phase" 

1516 )