Coverage for src/local_deep_research/research_library/services/library_rag_service.py: 27%
431 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Library RAG Service
4Handles indexing and searching library documents using RAG:
5- Index text documents into vector database
6- Chunk documents for semantic search
7- Generate embeddings using local models
8- Manage FAISS indices per research
9- Track RAG status in library
10"""
12from pathlib import Path
13from typing import Any, Dict, List, Optional
15from langchain_core.documents import Document as LangchainDocument
16from loguru import logger
17from sqlalchemy import func
19from ...config.paths import get_cache_directory
20from ...database.models.library import (
21 Document,
22 DocumentChunk,
23 DocumentCollection,
24 Collection,
25 RAGIndex,
26 RagDocumentStatus,
27 EmbeddingProvider,
28)
29from ...database.session_context import get_user_db_session
30from ...embeddings.splitters import get_text_splitter
31from ...web_search_engines.engines.search_engine_local import (
32 LocalEmbeddingManager,
33)
34from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier
35import hashlib
36from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat
37from langchain_community.vectorstores import FAISS
38from langchain_community.docstore.in_memory import InMemoryDocstore
41class LibraryRAGService:
42 """Service for managing RAG indexing of library documents."""
44 def __init__(
45 self,
46 username: str,
47 embedding_model: str = "all-MiniLM-L6-v2",
48 embedding_provider: str = "sentence_transformers",
49 chunk_size: int = 1000,
50 chunk_overlap: int = 200,
51 splitter_type: str = "recursive",
52 text_separators: Optional[list] = None,
53 distance_metric: str = "cosine",
54 normalize_vectors: bool = True,
55 index_type: str = "flat",
56 embedding_manager: Optional["LocalEmbeddingManager"] = None,
57 db_password: Optional[str] = None,
58 ):
59 """
60 Initialize library RAG service for a user.
62 Args:
63 username: Username for database access
64 embedding_model: Name of the embedding model to use
65 embedding_provider: Provider type ('sentence_transformers' or 'ollama')
66 chunk_size: Size of text chunks for splitting
67 chunk_overlap: Overlap between consecutive chunks
68 splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic')
69 text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""])
70 distance_metric: Distance metric ('cosine', 'l2', or 'dot_product')
71 normalize_vectors: Whether to normalize vectors with L2
72 index_type: FAISS index type ('flat', 'hnsw', or 'ivf')
73 embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility
74 db_password: Optional database password for background thread access
75 """
76 self.username = username
77 self._db_password = db_password # Can be used for thread access
78 # Initialize optional attributes to None before they're set below
79 # This allows the db_password setter to check them without hasattr
80 self.embedding_manager = None
81 self.integrity_manager = None
82 self.embedding_model = embedding_model
83 self.embedding_provider = embedding_provider
84 self.chunk_size = chunk_size
85 self.chunk_overlap = chunk_overlap
86 self.splitter_type = splitter_type
87 self.text_separators = (
88 text_separators
89 if text_separators is not None
90 else ["\n\n", "\n", ". ", " ", ""]
91 )
92 self.distance_metric = distance_metric
93 # Ensure normalize_vectors is always a proper boolean
 94 if isinstance(normalize_vectors, str):  [94 ↛ 95: condition was never true]
95 self.normalize_vectors = normalize_vectors.lower() in (
96 "true",
97 "1",
98 "yes",
99 )
100 else:
101 self.normalize_vectors = bool(normalize_vectors)
102 self.index_type = index_type
104 # Use provided embedding manager or create a new one
105 # (Must be created before text splitter for semantic chunking)
106 if embedding_manager is not None:
107 self.embedding_manager = embedding_manager
108 else:
109 # Initialize embedding manager with library collection
110 # Load the complete user settings snapshot from database using the proper method
111 from ...settings.manager import SettingsManager
113 # Use proper database session for SettingsManager
114 with get_user_db_session(username) as session:
115 settings_manager = SettingsManager(session)
116 settings_snapshot = settings_manager.get_settings_snapshot()
118 # Add the specific settings needed for this RAG service
119 settings_snapshot.update(
120 {
121 "_username": username,
122 "embeddings.provider": embedding_provider,
123 f"embeddings.{embedding_provider}.model": embedding_model,
124 "local_search_chunk_size": chunk_size,
125 "local_search_chunk_overlap": chunk_overlap,
126 }
127 )
129 self.embedding_manager = LocalEmbeddingManager(
130 embedding_model=embedding_model,
131 embedding_model_type=embedding_provider,
132 chunk_size=chunk_size,
133 chunk_overlap=chunk_overlap,
134 settings_snapshot=settings_snapshot,
135 )
137 # Initialize text splitter based on type
138 # (Must be created AFTER embedding_manager for semantic chunking)
139 self.text_splitter = get_text_splitter(
140 splitter_type=self.splitter_type,
141 chunk_size=self.chunk_size,
142 chunk_overlap=self.chunk_overlap,
143 text_separators=self.text_separators,
144 embeddings=self.embedding_manager.embeddings
145 if self.splitter_type == "semantic"
146 else None,
147 )
149 # Initialize or load FAISS index for library collection
150 self.faiss_index = None
151 self.rag_index_record = None
153 # Initialize file integrity manager for FAISS indexes
154 self.integrity_manager = FileIntegrityManager(
155 username, password=self._db_password
156 )
157 self.integrity_manager.register_verifier(FAISSIndexVerifier())
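
# Usage sketch (not part of the module): constructing the service with its
# defaults. The import path is inferred from the file location above, and the
# username/password values are hypothetical placeholders.
from local_deep_research.research_library.services.library_rag_service import (
    LibraryRAGService,
)

rag = LibraryRAGService(
    username="alice",                    # hypothetical user
    embedding_model="all-MiniLM-L6-v2",
    embedding_provider="sentence_transformers",
    chunk_size=1000,
    chunk_overlap=200,
    distance_metric="cosine",            # served by IndexFlatIP + L2 normalization
    db_password="s3cret",                # hypothetical; used by background threads
)
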
159 @property
160 def db_password(self):
161 """Get database password."""
162 return self._db_password
164 @db_password.setter
165 def db_password(self, value):
166 """Set database password and propagate to embedding manager and integrity manager."""
167 self._db_password = value
168 if self.embedding_manager:
169 self.embedding_manager.db_password = value
170 if self.integrity_manager:
171 self.integrity_manager.password = value
173 def _get_index_hash(
174 self,
175 collection_name: str,
176 embedding_model: str,
177 embedding_model_type: str,
178 ) -> str:
179 """Generate hash for index identification."""
180 hash_input = (
181 f"{collection_name}:{embedding_model}:{embedding_model_type}"
182 )
183 return hashlib.sha256(hash_input.encode()).hexdigest()
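
# Sketch of the identity scheme used by _get_index_hash: the index is keyed by
# a SHA-256 of "<collection_name>:<embedding_model>:<provider>". The collection
# UUID below is hypothetical.
import hashlib

collection_name = "collection_2f6c1d0e"  # hypothetical collection_<uuid>
hash_input = f"{collection_name}:all-MiniLM-L6-v2:sentence_transformers"
index_hash = hashlib.sha256(hash_input.encode()).hexdigest()
# The FAISS file is then stored as <cache_dir>/rag_indices/<index_hash>.faiss
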
185 def _get_index_path(self, index_hash: str) -> Path:
186 """Get path for FAISS index file."""
187 # Store in centralized cache directory (respects LDR_DATA_DIR)
188 cache_dir = get_cache_directory() / "rag_indices"
189 cache_dir.mkdir(parents=True, exist_ok=True)
190 return cache_dir / f"{index_hash}.faiss"
192 def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex:
193 """Get or create RAGIndex record for the current configuration."""
194 with get_user_db_session(self.username, self.db_password) as session:
195 # Use collection_<uuid> format
196 collection_name = f"collection_{collection_id}"
197 index_hash = self._get_index_hash(
198 collection_name, self.embedding_model, self.embedding_provider
199 )
201 # Try to get existing index
202 rag_index = (
203 session.query(RAGIndex).filter_by(index_hash=index_hash).first()
204 )
206 if not rag_index:
207 # Create new index record
208 index_path = self._get_index_path(index_hash)
210 # Get embedding dimension by embedding a test string
211 test_embedding = self.embedding_manager.embeddings.embed_query(
212 "test"
213 )
214 embedding_dim = len(test_embedding)
216 rag_index = RAGIndex(
217 collection_name=collection_name,
218 embedding_model=self.embedding_model,
219 embedding_model_type=EmbeddingProvider(
220 self.embedding_provider
221 ),
222 embedding_dimension=embedding_dim,
223 index_path=str(index_path),
224 index_hash=index_hash,
225 chunk_size=self.chunk_size,
226 chunk_overlap=self.chunk_overlap,
227 splitter_type=self.splitter_type,
228 text_separators=self.text_separators,
229 distance_metric=self.distance_metric,
230 normalize_vectors=self.normalize_vectors,
231 index_type=self.index_type,
232 chunk_count=0,
233 total_documents=0,
234 status="active",
235 is_current=True,
236 )
237 session.add(rag_index)
238 session.commit()
239 session.refresh(rag_index)
240 logger.info(f"Created new RAG index: {index_hash}")
242 return rag_index
244 def load_or_create_faiss_index(self, collection_id: str) -> FAISS:
245 """
246 Load existing FAISS index or create new one.
248 Args:
249 collection_id: UUID of the collection
251 Returns:
252 FAISS vector store instance
253 """
254 rag_index = self._get_or_create_rag_index(collection_id)
255 self.rag_index_record = rag_index
257 index_path = Path(rag_index.index_path)
 259 if index_path.exists():  [259 ↛ 261: condition was never true]
260 # Verify integrity before loading
261 verified, reason = self.integrity_manager.verify_file(index_path)
262 if not verified:
263 logger.error(
264 f"Integrity verification failed for {index_path}: {reason}. "
265 f"Refusing to load. Creating new index."
266 )
267 # Remove corrupted index
268 try:
269 index_path.unlink()
270 logger.info(f"Removed corrupted index file: {index_path}")
271 except Exception:
272 logger.exception("Failed to remove corrupted index")
273 else:
274 try:
275 # Check for embedding dimension mismatch before loading
276 current_dim = len(
277 self.embedding_manager.embeddings.embed_query(
278 "dimension_check"
279 )
280 )
281 stored_dim = rag_index.embedding_dimension
283 if stored_dim and current_dim != stored_dim:
284 logger.warning(
285 f"Embedding dimension mismatch detected! "
286 f"Index created with dim={stored_dim}, "
287 f"current model returns dim={current_dim}. "
288 f"Deleting old index and rebuilding."
289 )
290 # Delete old index files
291 try:
292 index_path.unlink()
293 pkl_path = index_path.with_suffix(".pkl")
294 if pkl_path.exists():
295 pkl_path.unlink()
296 logger.info(
297 f"Deleted old FAISS index files at {index_path}"
298 )
299 except Exception:
300 logger.exception("Failed to delete old index files")
302 # Update RAGIndex with new dimension and reset counts
303 with get_user_db_session(
304 self.username, self.db_password
305 ) as session:
306 idx = (
307 session.query(RAGIndex)
308 .filter_by(id=rag_index.id)
309 .first()
310 )
311 if idx:
312 idx.embedding_dimension = current_dim
313 idx.chunk_count = 0
314 idx.total_documents = 0
315 session.commit()
316 logger.info(
317 f"Updated RAGIndex dimension to {current_dim}"
318 )
320 # Clear rag_document_status for this index
321 session.query(RagDocumentStatus).filter_by(
322 rag_index_id=rag_index.id
323 ).delete()
324 session.commit()
325 logger.info(
326 "Cleared indexed status for documents in this "
327 "collection"
328 )
330 # Update local reference for index creation below
331 rag_index.embedding_dimension = current_dim
332 # Fall through to create new index below
333 else:
334 # Dimensions match (or no stored dimension), load index
335 faiss_index = FAISS.load_local(
336 str(index_path.parent),
337 self.embedding_manager.embeddings,
338 index_name=index_path.stem,
339 allow_dangerous_deserialization=True,
340 normalize_L2=True,
341 )
342 logger.info(
343 f"Loaded existing FAISS index from {index_path}"
344 )
345 return faiss_index
346 except Exception as e:
347 logger.warning(
348 f"Failed to load FAISS index: {e}, creating new one"
349 )
351 # Create new FAISS index with configurable type and distance metric
352 logger.info(
353 f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={rag_index.embedding_dimension}"
354 )
356 # Create index based on type and distance metric
 357 if self.index_type == "hnsw":  [357 ↛ 360: condition was never true]
358 # HNSW: Fast approximate search, best for large collections
359 # M=32 is a good default for connections per layer
360 index = IndexHNSWFlat(rag_index.embedding_dimension, 32)
361 logger.info("Created HNSW index with M=32 connections")
 362 elif self.index_type == "ivf":  [362 ↛ 365: condition was never true]
363 # IVF requires training, for now fall back to flat
364 # TODO: Implement IVF with proper training
365 logger.warning(
366 "IVF index type not yet fully implemented, using Flat index"
367 )
368 if self.distance_metric in ("cosine", "dot_product"):
369 index = IndexFlatIP(rag_index.embedding_dimension)
370 else:
371 index = IndexFlatL2(rag_index.embedding_dimension)
372 else: # "flat" or default
373 # Flat index: Exact search
 374 if self.distance_metric in ("cosine", "dot_product"):  [374 ↛ 381: condition was always true]
375 # For cosine similarity, use inner product (IP) with normalized vectors
376 index = IndexFlatIP(rag_index.embedding_dimension)
377 logger.info(
378 "Created Flat index with Inner Product (for cosine similarity)"
379 )
380 else: # l2
381 index = IndexFlatL2(rag_index.embedding_dimension)
382 logger.info("Created Flat index with L2 distance")
384 faiss_index = FAISS(
385 self.embedding_manager.embeddings,
386 index=index,
387 docstore=InMemoryDocstore(), # Minimal - chunks in DB
388 index_to_docstore_id={},
389 normalize_L2=self.normalize_vectors, # Use configurable normalization
390 )
391 logger.info(
392 f"FAISS index created with normalization={self.normalize_vectors}"
393 )
394 return faiss_index
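
# Standalone sketch (independent of this class) of why the "cosine" metric is
# served by IndexFlatIP plus L2 normalization: the inner product of unit
# vectors equals their cosine similarity. Dimension and data are arbitrary.
import numpy as np
from faiss import IndexFlatIP, normalize_L2

dim = 384
vectors = np.random.rand(8, dim).astype("float32")
normalize_L2(vectors)                 # in-place unit-length normalization
index = IndexFlatIP(dim)
index.add(vectors)

query = vectors[:1].copy()
scores, ids = index.search(query, 3)
# scores[0][0] is ~1.0: the query matches itself with cosine similarity 1
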
396 def get_current_index_info(
397 self, collection_id: Optional[str] = None
398 ) -> Optional[Dict[str, Any]]:
399 """
400 Get information about the current RAG index for a collection.
402 Args:
403 collection_id: UUID of collection (defaults to Library if None)
404 """
405 with get_user_db_session(self.username, self.db_password) as session:
406 # Get collection name in the format stored in RAGIndex (collection_<uuid>)
407 if collection_id:
408 collection = (
409 session.query(Collection)
410 .filter_by(id=collection_id)
411 .first()
412 )
413 collection_name = (
414 f"collection_{collection_id}" if collection else "unknown"
415 )
416 else:
417 # Default to Library collection
418 from ...database.library_init import get_default_library_id
420 collection_id = get_default_library_id(self.username)
421 collection_name = f"collection_{collection_id}"
423 rag_index = (
424 session.query(RAGIndex)
425 .filter_by(collection_name=collection_name, is_current=True)
426 .first()
427 )
429 if not rag_index:
430 # Debug: check all RAG indices for this collection
431 all_indices = session.query(RAGIndex).all()
432 logger.info(
433 f"No RAG index found for collection_name='{collection_name}'. All indices: {[(idx.collection_name, idx.is_current) for idx in all_indices]}"
434 )
435 return None
437 # Calculate actual counts from rag_document_status table
438 from ...database.models.library import RagDocumentStatus
440 actual_chunk_count = (
441 session.query(func.sum(RagDocumentStatus.chunk_count))
442 .filter_by(collection_id=collection_id)
443 .scalar()
444 or 0
445 )
447 actual_doc_count = (
448 session.query(RagDocumentStatus)
449 .filter_by(collection_id=collection_id)
450 .count()
451 )
453 return {
454 "embedding_model": rag_index.embedding_model,
455 "embedding_model_type": rag_index.embedding_model_type.value
456 if rag_index.embedding_model_type
457 else None,
458 "embedding_dimension": rag_index.embedding_dimension,
459 "chunk_size": rag_index.chunk_size,
460 "chunk_overlap": rag_index.chunk_overlap,
461 "chunk_count": actual_chunk_count,
462 "total_documents": actual_doc_count,
463 "created_at": rag_index.created_at.isoformat(),
464 "last_updated_at": rag_index.last_updated_at.isoformat(),
465 }
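
# Sketch: reading current index metadata for a collection. The `rag` instance
# is the one constructed in the sketch near __init__; the UUID is hypothetical.
info = rag.get_current_index_info(collection_id="2f6c1d0e")
if info is not None:
    print(
        f"{info['embedding_model']} (dim={info['embedding_dimension']}), "
        f"{info['total_documents']} docs / {info['chunk_count']} chunks"
    )
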
467 def index_document(
468 self, document_id: str, collection_id: str, force_reindex: bool = False
469 ) -> Dict[str, Any]:
470 """
471 Index a single document into RAG for a specific collection.
473 Args:
474 document_id: UUID of the Document to index
475 collection_id: UUID of the Collection to index for
476 force_reindex: Whether to force reindexing even if already indexed
478 Returns:
479 Dict with status, chunk_count, and any errors
480 """
481 with get_user_db_session(self.username, self.db_password) as session:
482 # Get the document
483 document = session.query(Document).filter_by(id=document_id).first()
485 if not document:
486 return {"status": "error", "error": "Document not found"}
488 # Get or create DocumentCollection entry
489 all_doc_collections = (
490 session.query(DocumentCollection)
491 .filter_by(document_id=document_id, collection_id=collection_id)
492 .all()
493 )
495 logger.info(
496 f"Found {len(all_doc_collections)} DocumentCollection entries for doc={document_id}, coll={collection_id}"
497 )
499 doc_collection = (
500 all_doc_collections[0] if all_doc_collections else None
501 )
503 if not doc_collection:
504 # Create new DocumentCollection entry
505 doc_collection = DocumentCollection(
506 document_id=document_id,
507 collection_id=collection_id,
508 indexed=False,
509 chunk_count=0,
510 )
511 session.add(doc_collection)
512 logger.info(
513 f"Created new DocumentCollection entry for doc={document_id}, coll={collection_id}"
514 )
516 # Check if already indexed for this collection
517 if doc_collection.indexed and not force_reindex:
518 return {
519 "status": "skipped",
520 "message": "Document already indexed for this collection",
521 "chunk_count": doc_collection.chunk_count,
522 }
524 # Validate text content
 525 if not document.text_content:  [525 ↛ 531: condition was always true]
526 return {
527 "status": "error",
528 "error": "Document has no text content",
529 }
531 try:
532 # Create LangChain Document from text
533 doc = LangchainDocument(
534 page_content=document.text_content,
535 metadata={
536 "source": document.original_url,
537 "document_id": document_id, # Add document ID for source linking
538 "collection_id": collection_id, # Add collection ID
539 "title": document.title
540 or document.filename
541 or "Untitled",
542 "document_title": document.title
543 or document.filename
544 or "Untitled", # Add for compatibility
545 "authors": document.authors,
546 "published_date": str(document.published_date)
547 if document.published_date
548 else None,
549 "doi": document.doi,
550 "arxiv_id": document.arxiv_id,
551 "pmid": document.pmid,
552 "pmcid": document.pmcid,
553 "extraction_method": document.extraction_method,
554 "word_count": document.word_count,
555 },
556 )
558 # Split into chunks
559 chunks = self.text_splitter.split_documents([doc])
560 logger.info(
561 f"Split document {document_id} into {len(chunks)} chunks"
562 )
564 # Get collection name for chunk storage
565 collection = (
566 session.query(Collection)
567 .filter_by(id=collection_id)
568 .first()
569 )
570 # Use collection_<uuid> format for internal storage
571 collection_name = (
572 f"collection_{collection_id}" if collection else "unknown"
573 )
575 # Store chunks in database using embedding manager
576 embedding_ids = self.embedding_manager._store_chunks_to_db(
577 chunks=chunks,
578 collection_name=collection_name,
579 source_type="document",
580 source_id=document_id,
581 )
583 # Load or create FAISS index
584 if self.faiss_index is None:
585 self.faiss_index = self.load_or_create_faiss_index(
586 collection_id
587 )
589 # If force_reindex, remove old chunks from FAISS before adding new ones
590 if force_reindex:
591 existing_ids = (
592 set(self.faiss_index.docstore._dict.keys())
593 if hasattr(self.faiss_index, "docstore")
594 else set()
595 )
596 old_chunk_ids = [
597 eid for eid in embedding_ids if eid in existing_ids
598 ]
599 if old_chunk_ids:
600 logger.info(
601 f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
602 )
603 self.faiss_index.delete(old_chunk_ids)
605 # Filter out chunks that already exist in FAISS (unless force_reindex)
606 if not force_reindex:
607 existing_ids = (
608 set(self.faiss_index.docstore._dict.keys())
609 if hasattr(self.faiss_index, "docstore")
610 else set()
611 )
612 new_chunks = []
613 new_ids = []
614 for chunk, chunk_id in zip(chunks, embedding_ids):
615 if chunk_id not in existing_ids:
616 new_chunks.append(chunk)
617 new_ids.append(chunk_id)
618 else:
619 # force_reindex: add all chunks
620 new_chunks = chunks
621 new_ids = embedding_ids
623 # Add embeddings to FAISS index
624 if new_chunks:
625 if force_reindex:
626 logger.info(
627 f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
628 )
629 else:
630 logger.info(
631 f"Adding {len(new_chunks)} new embeddings to FAISS index ({len(chunks) - len(new_chunks)} already exist)"
632 )
633 self.faiss_index.add_documents(new_chunks, ids=new_ids)
634 else:
635 logger.info(
636 f"All {len(chunks)} chunks already exist in FAISS index, skipping"
637 )
639 # Save FAISS index
640 index_path = Path(self.rag_index_record.index_path)
641 self.faiss_index.save_local(
642 str(index_path.parent), index_name=index_path.stem
643 )
644 # Record file integrity
645 self.integrity_manager.record_file(
646 index_path,
647 related_entity_type="rag_index",
648 related_entity_id=self.rag_index_record.id,
649 )
650 logger.info(
651 f"Saved FAISS index to {index_path} with integrity tracking"
652 )
654 from datetime import datetime, UTC
655 from sqlalchemy import text
657 # Check if document was already indexed (for stats update)
658 existing_status = (
659 session.query(RagDocumentStatus)
660 .filter_by(
661 document_id=document_id, collection_id=collection_id
662 )
663 .first()
664 )
665 was_already_indexed = existing_status is not None
667 # Mark document as indexed using rag_document_status table
668 # Row existence = indexed, simple and clean
669 timestamp = datetime.now(UTC)
671 # Create or update RagDocumentStatus using ORM merge (atomic upsert)
672 rag_status = RagDocumentStatus(
673 document_id=document_id,
674 collection_id=collection_id,
675 rag_index_id=self.rag_index_record.id,
676 chunk_count=len(chunks),
677 indexed_at=timestamp,
678 )
679 session.merge(rag_status)
681 logger.info(
682 f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={len(chunks)}"
683 )
685 # Also update DocumentCollection table for backward compatibility
686 session.query(DocumentCollection).filter_by(
687 document_id=document_id, collection_id=collection_id
688 ).update(
689 {
690 "indexed": True,
691 "chunk_count": len(chunks),
692 "last_indexed_at": timestamp,
693 }
694 )
696 logger.info(
697 "Also updated DocumentCollection.indexed for backward compatibility"
698 )
700 # Update RAGIndex statistics (only if not already indexed)
701 rag_index_obj = (
702 session.query(RAGIndex)
703 .filter_by(id=self.rag_index_record.id)
704 .first()
705 )
706 if rag_index_obj and not was_already_indexed:
707 rag_index_obj.chunk_count += len(chunks)
708 rag_index_obj.total_documents += 1
709 rag_index_obj.last_updated_at = datetime.now(UTC)
710 logger.info(
711 f"Updated RAGIndex stats: chunk_count +{len(chunks)}, total_documents +1"
712 )
714 # Flush ORM changes to database before commit
715 session.flush()
716 logger.info(f"Flushed ORM changes for document {document_id}")
718 # Commit the transaction
719 session.commit()
721 # WAL checkpoint after commit to ensure persistence
722 session.execute(text("PRAGMA wal_checkpoint(FULL)"))
724 logger.info(
725 f"Successfully indexed document {document_id} for collection {collection_id} "
726 f"with {len(chunks)} chunks"
727 )
729 return {
730 "status": "success",
731 "chunk_count": len(chunks),
732 "embedding_ids": embedding_ids,
733 }
735 except Exception as e:
736 logger.exception(
737 f"Error indexing document {document_id} for collection {collection_id}: {str(e)}"
738 )
739 return {
740 "status": "error",
741 "error": f"Operation failed: {type(e).__name__}",
742 }
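
# Sketch: indexing one library document into a collection and inspecting the
# result dict described above (the IDs are hypothetical; `rag` as constructed
# in the earlier sketch).
result = rag.index_document(document_id="doc-uuid", collection_id="2f6c1d0e")
if result["status"] == "success":
    print(f"indexed {result['chunk_count']} chunks")
elif result["status"] == "skipped":
    print(result["message"])
else:
    print(f"error: {result['error']}")
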
744 def index_all_documents(
745 self,
746 collection_id: str,
747 force_reindex: bool = False,
748 progress_callback=None,
749 ) -> Dict[str, Any]:
750 """
751 Index all documents in a collection into RAG.
753 Args:
754 collection_id: UUID of the collection to index
755 force_reindex: Whether to force reindexing already indexed documents
756 progress_callback: Optional callback function called after each document with (current, total, doc_title, status)
758 Returns:
759 Dict with counts of successful, skipped, and failed documents
760 """
761 with get_user_db_session(self.username, self.db_password) as session:
762 # Get all DocumentCollection entries for this collection
763 query = session.query(DocumentCollection).filter_by(
764 collection_id=collection_id
765 )
767 if not force_reindex:
768 # Only index documents that haven't been indexed yet
769 query = query.filter_by(indexed=False)
771 doc_collections = query.all()
773 if not doc_collections:
774 return {
775 "status": "info",
776 "message": "No documents to index",
777 "successful": 0,
778 "skipped": 0,
779 "failed": 0,
780 }
782 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
783 total = len(doc_collections)
785 for idx, doc_collection in enumerate(doc_collections, 1):
786 # Get the document for title info
787 document = (
788 session.query(Document)
789 .filter_by(id=doc_collection.document_id)
790 .first()
791 )
792 title = document.title if document else "Unknown"
794 result = self.index_document(
795 doc_collection.document_id, collection_id, force_reindex
796 )
798 if result["status"] == "success":
799 results["successful"] += 1
800 elif result["status"] == "skipped":
801 results["skipped"] += 1
802 else:
803 results["failed"] += 1
804 results["errors"].append(
805 {
806 "doc_id": doc_collection.document_id,
807 "title": title,
808 "error": result.get("error"),
809 }
810 )
812 # Call progress callback if provided
813 if progress_callback:
814 progress_callback(idx, total, title, result["status"])
816 logger.info(
817 f"Indexed collection {collection_id}: "
818 f"{results['successful']} successful, "
819 f"{results['skipped']} skipped, "
820 f"{results['failed']} failed"
821 )
823 return results
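
# Sketch: bulk indexing with the (current, total, doc_title, status) progress
# callback described in the docstring (collection UUID hypothetical).
def on_progress(current: int, total: int, title: str, status: str) -> None:
    print(f"[{current}/{total}] {status}: {title}")

summary = rag.index_all_documents(
    collection_id="2f6c1d0e",
    force_reindex=False,
    progress_callback=on_progress,
)
print(summary.get("successful", 0), "ok,", summary.get("failed", 0), "failed")
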
825 def remove_document_from_rag(
826 self, document_id: str, collection_id: str
827 ) -> Dict[str, Any]:
828 """
829 Remove a document's chunks from RAG for a specific collection.
831 Args:
832 document_id: UUID of the Document to remove
833 collection_id: UUID of the Collection to remove from
835 Returns:
836 Dict with status and count of removed chunks
837 """
838 with get_user_db_session(self.username, self.db_password) as session:
839 # Get the DocumentCollection entry
840 doc_collection = (
841 session.query(DocumentCollection)
842 .filter_by(document_id=document_id, collection_id=collection_id)
843 .first()
844 )
 846 if not doc_collection:  [846 ↛ 852: condition was always true]
847 return {
848 "status": "error",
849 "error": "Document not found in collection",
850 }
852 try:
853 # Get collection name in the format collection_<uuid>
854 collection = (
855 session.query(Collection)
856 .filter_by(id=collection_id)
857 .first()
858 )
859 # Use collection_<uuid> format for internal storage
860 collection_name = (
861 f"collection_{collection_id}" if collection else "unknown"
862 )
864 # Delete chunks from database
865 deleted_count = self.embedding_manager._delete_chunks_from_db(
866 collection_name=collection_name,
867 source_id=document_id,
868 )
870 # Update DocumentCollection RAG status
871 doc_collection.indexed = False
872 doc_collection.chunk_count = 0
873 doc_collection.last_indexed_at = None
874 session.commit()
876 logger.info(
877 f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}"
878 )
880 return {"status": "success", "deleted_count": deleted_count}
882 except Exception as e:
883 logger.exception(
884 f"Error removing document {document_id} from collection {collection_id}: {str(e)}"
885 )
886 return {
887 "status": "error",
888 "error": f"Operation failed: {type(e).__name__}",
889 }
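
# Sketch: removing a document's chunks from a collection (hypothetical IDs;
# `rag` as constructed in the earlier sketch).
removal = rag.remove_document_from_rag("doc-uuid", "2f6c1d0e")
if removal["status"] == "success":
    print(f"deleted {removal['deleted_count']} chunks")
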
891 def index_documents_batch(
892 self,
893 doc_info: List[tuple],
894 collection_id: str,
895 force_reindex: bool = False,
896 ) -> Dict[str, Dict[str, Any]]:
897 """
898 Index multiple documents in a batch for a specific collection.
900 Args:
901 doc_info: List of (doc_id, title) tuples
902 collection_id: UUID of the collection to index for
903 force_reindex: Whether to force reindexing even if already indexed
905 Returns:
906 Dict mapping doc_id to individual result
907 """
908 results = {}
909 doc_ids = [doc_id for doc_id, _ in doc_info]
911 # Use single database session for querying
912 with get_user_db_session(self.username, self.db_password) as session:
913 # Pre-load all documents for this batch
914 documents = (
915 session.query(Document).filter(Document.id.in_(doc_ids)).all()
916 )
918 # Create lookup for quick access
919 doc_lookup = {doc.id: doc for doc in documents}
921 # Pre-load DocumentCollection entries
922 doc_collections = (
923 session.query(DocumentCollection)
924 .filter(
925 DocumentCollection.document_id.in_(doc_ids),
926 DocumentCollection.collection_id == collection_id,
927 )
928 .all()
929 )
930 doc_collection_lookup = {
931 dc.document_id: dc for dc in doc_collections
932 }
934 # Process each document in the batch
935 for doc_id, title in doc_info:
936 document = doc_lookup.get(doc_id)
 938 if not document:  [938 ↛ 939: condition was never true]
939 results[doc_id] = {
940 "status": "error",
941 "error": "Document not found",
942 }
943 continue
945 # Check if already indexed via DocumentCollection
946 doc_collection = doc_collection_lookup.get(doc_id)
 947 if (  [947 ↛ 952: condition was never true]
948 doc_collection
949 and doc_collection.indexed
950 and not force_reindex
951 ):
952 results[doc_id] = {
953 "status": "skipped",
954 "message": "Document already indexed for this collection",
955 "chunk_count": doc_collection.chunk_count,
956 }
957 continue
959 # Validate text content
 960 if not document.text_content:  [960 ↛ 961: condition was never true]
961 results[doc_id] = {
962 "status": "error",
963 "error": "Document has no text content",
964 }
965 continue
967 # Index the document
968 try:
969 result = self.index_document(
970 doc_id, collection_id, force_reindex
971 )
972 results[doc_id] = result
973 except Exception as e:
974 logger.exception(
975 f"Error indexing document {doc_id} in batch"
976 )
977 results[doc_id] = {
978 "status": "error",
979 "error": f"Indexing failed: {type(e).__name__}",
980 }
982 return results
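
# Sketch: batch indexing from (doc_id, title) tuples (values hypothetical;
# `rag` as constructed in the earlier sketch).
doc_info = [("doc-uuid-1", "Paper A"), ("doc-uuid-2", "Paper B")]
batch_results = rag.index_documents_batch(doc_info, collection_id="2f6c1d0e")
for doc_id, res in batch_results.items():
    print(doc_id, res["status"], res.get("chunk_count", 0))
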
984 def get_rag_stats(
985 self, collection_id: Optional[str] = None
986 ) -> Dict[str, Any]:
987 """
988 Get RAG statistics for a collection.
990 Args:
991 collection_id: UUID of the collection (defaults to Library)
993 Returns:
994 Dict with counts and metadata about indexed documents
995 """
996 with get_user_db_session(self.username, self.db_password) as session:
997 # Get collection ID (default to Library)
 998 if not collection_id:  [998 ↛ 999: condition was never true]
999 from ...database.library_init import get_default_library_id
1001 collection_id = get_default_library_id(self.username)
1003 # Count total documents in collection
1004 total_docs = (
1005 session.query(DocumentCollection)
1006 .filter_by(collection_id=collection_id)
1007 .count()
1008 )
1010 # Count indexed documents from rag_document_status table
1011 from ...database.models.library import RagDocumentStatus
1013 indexed_docs = (
1014 session.query(RagDocumentStatus)
1015 .filter_by(collection_id=collection_id)
1016 .count()
1017 )
1019 # Count total chunks from rag_document_status table
1020 total_chunks = (
1021 session.query(func.sum(RagDocumentStatus.chunk_count))
1022 .filter_by(collection_id=collection_id)
1023 .scalar()
1024 or 0
1025 )
1027 # Get collection name in the format stored in DocumentChunk (collection_<uuid>)
1028 collection = (
1029 session.query(Collection).filter_by(id=collection_id).first()
1030 )
1031 collection_name = (
1032 f"collection_{collection_id}" if collection else "library"
1033 )
1035 # Get embedding model info from chunks
1036 chunk_sample = (
1037 session.query(DocumentChunk)
1038 .filter_by(collection_name=collection_name)
1039 .first()
1040 )
1042 embedding_info = {}
 1043 if chunk_sample:  [1043 ↛ 1044: condition was never true]
1044 embedding_info = {
1045 "model": chunk_sample.embedding_model,
1046 "model_type": chunk_sample.embedding_model_type.value
1047 if chunk_sample.embedding_model_type
1048 else None,
1049 "dimension": chunk_sample.embedding_dimension,
1050 }
1052 return {
1053 "total_documents": total_docs,
1054 "indexed_documents": indexed_docs,
1055 "unindexed_documents": total_docs - indexed_docs,
1056 "total_chunks": total_chunks,
1057 "embedding_info": embedding_info,
1058 "chunk_size": self.chunk_size,
1059 "chunk_overlap": self.chunk_overlap,
1060 }
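
# Sketch: summarizing a collection's indexing state via get_rag_stats
# (collection UUID hypothetical; `rag` as constructed in the earlier sketch).
stats = rag.get_rag_stats(collection_id="2f6c1d0e")
print(
    f"{stats['indexed_documents']}/{stats['total_documents']} documents indexed, "
    f"{stats['total_chunks']} chunks total"
)
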
1062 def index_local_file(self, file_path: str) -> Dict[str, Any]:
1063 """
1064 Index a local file from the filesystem into RAG.
1066 Args:
1067 file_path: Path to the file to index
1069 Returns:
1070 Dict with status, chunk_count, and any errors
1071 """
1072 from pathlib import Path
1073 import mimetypes
1075 file_path = Path(file_path)
1077 if not file_path.exists():
1078 return {"status": "error", "error": f"File not found: {file_path}"}
1080 if not file_path.is_file():
1081 return {"status": "error", "error": f"Not a file: {file_path}"}
1083 # Determine file type
1084 mime_type, _ = mimetypes.guess_type(str(file_path))
1086 # Read file content based on type
1087 try:
1088 if file_path.suffix.lower() in [".txt", ".md", ".markdown"]:
1089 # Text files
1090 with open(file_path, "r", encoding="utf-8") as f:
1091 content = f.read()
1092 elif file_path.suffix.lower() in [".html", ".htm"]:
1093 # HTML files - strip tags
1094 from bs4 import BeautifulSoup
1096 with open(file_path, "r", encoding="utf-8") as f:
1097 soup = BeautifulSoup(f.read(), "html.parser")
1098 content = soup.get_text()
1099 elif file_path.suffix.lower() == ".pdf":
1100 # PDF files - extract text
1101 import PyPDF2
1103 content = ""
1104 with open(file_path, "rb") as f:
1105 pdf_reader = PyPDF2.PdfReader(f)
1106 for page in pdf_reader.pages:
1107 content += page.extract_text()
1108 else:
1109 return {
1110 "status": "skipped",
1111 "error": f"Unsupported file type: {file_path.suffix}",
1112 }
1114 if not content or len(content.strip()) < 10:
1115 return {
1116 "status": "error",
1117 "error": "File has no extractable text content",
1118 }
1120 # Create LangChain Document from text
1121 doc = LangchainDocument(
1122 page_content=content,
1123 metadata={
1124 "source": str(file_path),
1125 "source_id": f"local_{file_path.stem}_{hash(str(file_path))}",
1126 "title": file_path.stem,
1127 "document_title": file_path.stem,
1128 "file_type": file_path.suffix.lower(),
1129 "file_size": file_path.stat().st_size,
1130 "source_type": "local_file",
1131 "collection": "local_library",
1132 },
1133 )
1135 # Split into chunks
1136 chunks = self.text_splitter.split_documents([doc])
1137 logger.info(
1138 f"Split local file {file_path} into {len(chunks)} chunks"
1139 )
1141 # Generate unique IDs for chunks
1142 import hashlib
1144 file_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8]
1145 embedding_ids = [
1146 f"local_{file_hash}_{i}" for i in range(len(chunks))
1147 ]
1149 # Store chunks in database
1150 self.embedding_manager._store_chunks_to_db(
1151 chunks=chunks,
1152 collection_name="local_library",
1153 source_type="local_file",
1154 source_id=str(file_path),
1155 )
1157 # Load or create FAISS index
1158 if self.faiss_index is None:
1159 self.faiss_index = self.load_or_create_faiss_index()
1161 # Add embeddings to FAISS index
1162 self.faiss_index.add_documents(chunks, ids=embedding_ids)
1164 # Save FAISS index
1165 index_path = (
1166 Path(self.rag_index_record.index_path)
1167 if self.rag_index_record
1168 else None
1169 )
1170 if index_path:
1171 self.faiss_index.save_local(
1172 str(index_path.parent), index_name=index_path.stem
1173 )
1174 # Record file integrity
1175 self.integrity_manager.record_file(
1176 index_path,
1177 related_entity_type="rag_index",
1178 related_entity_id=self.rag_index_record.id,
1179 )
1180 logger.info(
1181 f"Saved FAISS index to {index_path} with integrity tracking"
1182 )
1184 logger.info(
1185 f"Successfully indexed local file {file_path} with {len(chunks)} chunks"
1186 )
1188 return {
1189 "status": "success",
1190 "chunk_count": len(chunks),
1191 "embedding_ids": embedding_ids,
1192 }
1194 except Exception as e:
1195 logger.exception(f"Error indexing local file {file_path}: {str(e)}")
1196 return {
1197 "status": "error",
1198 "error": f"Operation failed: {type(e).__name__}",
1199 }
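
# Sketch: indexing a local markdown file into the "local_library" collection
# (path hypothetical; .txt/.md/.html/.pdf are the suffixes handled above).
outcome = rag.index_local_file("/home/alice/notes/rag-design.md")
if outcome["status"] != "success":
    print(f"{outcome['status']}: {outcome.get('error')}")
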
1201 def index_user_document(
1202 self, user_doc, collection_name: str, force_reindex: bool = False
1203 ) -> Dict[str, Any]:
1204 """
1205 Index a user-uploaded document into a specific collection.
1207 Args:
1208 user_doc: UserDocument object
1209 collection_name: Name of the collection (e.g., "collection_123")
1210 force_reindex: Whether to force reindexing
1212 Returns:
1213 Dict with status, chunk_count, and any errors
1214 """
1216 try:
1217 # Use the pre-extracted text content
1218 content = user_doc.text_content
1220 if not content or len(content.strip()) < 10:
1221 return {
1222 "status": "error",
1223 "error": "Document has no extractable text content",
1224 }
1226 # Create LangChain Document
1227 doc = LangchainDocument(
1228 page_content=content,
1229 metadata={
1230 "source": f"user_upload_{user_doc.id}",
1231 "source_id": user_doc.id,
1232 "title": user_doc.filename,
1233 "document_title": user_doc.filename,
1234 "file_type": user_doc.file_type,
1235 "file_size": user_doc.file_size,
1236 "collection": collection_name,
1237 },
1238 )
1240 # Split into chunks
1241 chunks = self.text_splitter.split_documents([doc])
1242 logger.info(
1243 f"Split user document {user_doc.filename} into {len(chunks)} chunks"
1244 )
1246 # Store chunks in database
1247 embedding_ids = self.embedding_manager._store_chunks_to_db(
1248 chunks=chunks,
1249 collection_name=collection_name,
1250 source_type="user_document",
1251 source_id=user_doc.id,
1252 )
1254 # Load or create FAISS index for this collection
1255 if self.faiss_index is None:
1256 self.faiss_index = self.load_or_create_faiss_index()
1258 # If force_reindex, remove old chunks from FAISS before adding new ones
1259 if force_reindex:
1260 existing_ids = (
1261 set(self.faiss_index.docstore._dict.keys())
1262 if hasattr(self.faiss_index, "docstore")
1263 else set()
1264 )
1265 old_chunk_ids = [
1266 eid for eid in embedding_ids if eid in existing_ids
1267 ]
1268 if old_chunk_ids:
1269 logger.info(
1270 f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
1271 )
1272 self.faiss_index.delete(old_chunk_ids)
1274 # Filter out chunks that already exist in FAISS (unless force_reindex)
1275 if not force_reindex:
1276 existing_ids = (
1277 set(self.faiss_index.docstore._dict.keys())
1278 if hasattr(self.faiss_index, "docstore")
1279 else set()
1280 )
1281 new_chunks = []
1282 new_ids = []
1283 for chunk, chunk_id in zip(chunks, embedding_ids):
1284 if chunk_id not in existing_ids:
1285 new_chunks.append(chunk)
1286 new_ids.append(chunk_id)
1287 else:
1288 # force_reindex: add all chunks
1289 new_chunks = chunks
1290 new_ids = embedding_ids
1292 # Add embeddings to FAISS index
1293 if new_chunks:
1294 if force_reindex:
1295 logger.info(
1296 f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
1297 )
1298 else:
1299 logger.info(
1300 f"Adding {len(new_chunks)} new chunks to FAISS index ({len(chunks) - len(new_chunks)} already exist)"
1301 )
1302 self.faiss_index.add_documents(new_chunks, ids=new_ids)
1303 else:
1304 logger.info(
1305 f"All {len(chunks)} chunks already exist in FAISS index, skipping"
1306 )
1308 # Save FAISS index
1309 index_path = (
1310 Path(self.rag_index_record.index_path)
1311 if self.rag_index_record
1312 else None
1313 )
1314 if index_path:
1315 self.faiss_index.save_local(
1316 str(index_path.parent), index_name=index_path.stem
1317 )
1318 # Record file integrity
1319 self.integrity_manager.record_file(
1320 index_path,
1321 related_entity_type="rag_index",
1322 related_entity_id=self.rag_index_record.id,
1323 )
1325 logger.info(
1326 f"Successfully indexed user document {user_doc.filename} with {len(chunks)} chunks"
1327 )
1329 return {
1330 "status": "success",
1331 "chunk_count": len(chunks),
1332 "embedding_ids": embedding_ids,
1333 }
1335 except Exception as e:
1336 logger.exception(
1337 f"Error indexing user document {user_doc.filename}: {str(e)}"
1338 )
1339 return {
1340 "status": "error",
1341 "error": f"Operation failed: {type(e).__name__}",
1342 }
1344 def remove_collection_from_index(
1345 self, collection_name: str
1346 ) -> Dict[str, Any]:
1347 """
1348 Remove all documents from a collection from the FAISS index.
1350 Args:
1351 collection_name: Name of the collection (e.g., "collection_123")
1353 Returns:
1354 Dict with status and count of removed chunks
1355 """
1356 from ...database.models import DocumentChunk
1357 from ...database.session_context import get_user_db_session
1359 try:
1360 with get_user_db_session(
1361 self.username, self.db_password
1362 ) as session:
1363 # Get all chunk IDs for this collection
1364 chunks = (
1365 session.query(DocumentChunk)
1366 .filter_by(collection_name=collection_name)
1367 .all()
1368 )
1370 if not chunks:
1371 return {"status": "success", "deleted_count": 0}
1373 chunk_ids = [
1374 f"{collection_name}_{chunk.id}" for chunk in chunks
1375 ]
1377 # Load FAISS index if not already loaded
1378 if self.faiss_index is None:
1379 self.faiss_index = self.load_or_create_faiss_index()
1381 # Remove from FAISS index
1382 if hasattr(self.faiss_index, "delete"):
1383 try:
1384 self.faiss_index.delete(chunk_ids)
1386 # Save updated index
1387 index_path = (
1388 Path(self.rag_index_record.index_path)
1389 if self.rag_index_record
1390 else None
1391 )
1392 if index_path:
1393 self.faiss_index.save_local(
1394 str(index_path.parent),
1395 index_name=index_path.stem,
1396 )
1397 # Record file integrity
1398 self.integrity_manager.record_file(
1399 index_path,
1400 related_entity_type="rag_index",
1401 related_entity_id=self.rag_index_record.id,
1402 )
1403 except Exception as e:
1404 logger.warning(
1405 f"Could not delete chunks from FAISS: {e}"
1406 )
1408 logger.info(
1409 f"Removed {len(chunk_ids)} chunks from collection {collection_name}"
1410 )
1412 return {"status": "success", "deleted_count": len(chunk_ids)}
1414 except Exception as e:
1415 logger.exception(
1416 f"Error removing collection {collection_name} from index: {str(e)}"
1417 )
1418 return {
1419 "status": "error",
1420 "error": f"Operation failed: {type(e).__name__}",
1421 }
1423 def search_library(
1424 self, query: str, limit: int = 10, score_threshold: float = 0.0
1425 ) -> List[Dict[str, Any]]:
1426 """
1427 Search library documents using semantic search.
1429 Args:
1430 query: Search query
1431 limit: Maximum number of results
1432 score_threshold: Minimum similarity score
1434 Returns:
1435 List of results with content, metadata, and similarity scores
1436 """
1437 # This will be implemented when we integrate with the search system
1438 # For now, raise NotImplementedError
1439 raise NotImplementedError(
1440 "Library search will be implemented in the search integration phase"
1441 )