Coverage for src / local_deep_research / research_library / services / library_rag_service.py: 59%
458 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
"""
Library RAG Service

Handles indexing and searching library documents using RAG:
- Index text documents into vector database
- Chunk documents for semantic search
- Generate embeddings using local models
- Manage FAISS indices per research
- Track RAG status in library
"""
12from pathlib import Path
13from typing import Any, Dict, List, Optional, Tuple
15from langchain_core.documents import Document as LangchainDocument
16from loguru import logger
17from sqlalchemy import func
19from ...config.paths import get_cache_directory
20from ...database.models.library import (
21 Document,
22 DocumentChunk,
23 DocumentCollection,
24 Collection,
25 RAGIndex,
26 RagDocumentStatus,
27 EmbeddingProvider,
28)
29from ...database.session_context import get_user_db_session
30from ...utilities.type_utils import to_bool
31from ...embeddings.splitters import get_text_splitter
32from ...web_search_engines.engines.search_engine_local import (
33 LocalEmbeddingManager,
34)
35from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier
36import hashlib
37from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat
38from langchain_community.vectorstores import FAISS
39from langchain_community.docstore.in_memory import InMemoryDocstore
42class LibraryRAGService:
43 """Service for managing RAG indexing of library documents."""
    def __init__(
        self,
        username: str,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_provider: str = "sentence_transformers",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        splitter_type: str = "recursive",
        text_separators: Optional[list] = None,
        distance_metric: str = "cosine",
        normalize_vectors: bool = True,
        index_type: str = "flat",
        embedding_manager: Optional["LocalEmbeddingManager"] = None,
        db_password: Optional[str] = None,
    ):
        """
        Initialize library RAG service for a user.

        Construction order matters: the embedding manager must exist before
        the text splitter (semantic chunking needs embeddings), and the
        integrity manager is created last.

        Args:
            username: Username for database access
            embedding_model: Name of the embedding model to use
            embedding_provider: Provider type ('sentence_transformers' or 'ollama')
            chunk_size: Size of text chunks for splitting
            chunk_overlap: Overlap between consecutive chunks
            splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic')
            text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""])
            distance_metric: Distance metric ('cosine', 'l2', or 'dot_product')
            normalize_vectors: Whether to normalize vectors with L2
            index_type: FAISS index type ('flat', 'hnsw', or 'ivf')
            embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility
            db_password: Optional database password for background thread access
        """
        self.username = username
        self._db_password = db_password  # Can be used for thread access
        # Initialize optional attributes to None before they're set below
        # This allows the db_password setter to check them without hasattr
        self.embedding_manager = None
        self.integrity_manager = None
        self.embedding_model = embedding_model
        self.embedding_provider = embedding_provider
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.splitter_type = splitter_type
        self.text_separators = (
            text_separators
            if text_separators is not None
            else ["\n\n", "\n", ". ", " ", ""]
        )
        self.distance_metric = distance_metric
        # Ensure normalize_vectors is always a proper boolean
        self.normalize_vectors = to_bool(normalize_vectors, default=True)
        self.index_type = index_type

        # Use provided embedding manager or create a new one
        # (Must be created before text splitter for semantic chunking)
        if embedding_manager is not None:
            self.embedding_manager = embedding_manager
        else:
            # Initialize embedding manager with library collection
            # Load the complete user settings snapshot from database using the proper method
            from ...settings.manager import SettingsManager

            # Use proper database session for SettingsManager
            # Note: using _db_password (backing field) directly here because the
            # db_password property setter propagates to embedding_manager/integrity_manager,
            # which are still None at this point in __init__.
            with get_user_db_session(username, self._db_password) as session:
                settings_manager = SettingsManager(session)
                settings_snapshot = settings_manager.get_settings_snapshot()

            # Add the specific settings needed for this RAG service
            settings_snapshot.update(
                {
                    "_username": username,
                    "embeddings.provider": embedding_provider,
                    f"embeddings.{embedding_provider}.model": embedding_model,
                    "local_search_chunk_size": chunk_size,
                    "local_search_chunk_overlap": chunk_overlap,
                }
            )

            self.embedding_manager = LocalEmbeddingManager(
                embedding_model=embedding_model,
                embedding_model_type=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                settings_snapshot=settings_snapshot,
            )

        # Initialize text splitter based on type
        # (Must be created AFTER embedding_manager for semantic chunking)
        self.text_splitter = get_text_splitter(
            splitter_type=self.splitter_type,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            text_separators=self.text_separators,
            embeddings=self.embedding_manager.embeddings
            if self.splitter_type == "semantic"
            else None,
        )

        # Initialize or load FAISS index for library collection
        # (populated lazily by load_or_create_faiss_index)
        self.faiss_index = None
        self.rag_index_record = None

        # Initialize file integrity manager for FAISS indexes
        self.integrity_manager = FileIntegrityManager(
            username, password=self._db_password
        )
        self.integrity_manager.register_verifier(FAISSIndexVerifier())

        self._closed = False
158 def close(self):
159 """Release embedding model and index resources."""
160 if self._closed:
161 return
162 self._closed = True
164 # Clear embedding manager resources
165 if self.embedding_manager is not None:
166 # Clear references to allow garbage collection
167 self.embedding_manager = None
169 # Clear FAISS index
170 if self.faiss_index is not None:
171 self.faiss_index = None
173 # Clear other resources
174 self.rag_index_record = None
175 self.integrity_manager = None
176 self.text_splitter = None
    def __enter__(self):
        """Enter context manager; returns the service itself for `with ... as`."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, ensuring cleanup.

        Always calls close(); returning False means any in-flight
        exception propagates to the caller rather than being suppressed.
        """
        self.close()
        return False
    @property
    def db_password(self):
        """Get database password (backing field set in __init__ or via the setter)."""
        return self._db_password
192 @db_password.setter
193 def db_password(self, value):
194 """Set database password and propagate to embedding manager and integrity manager."""
195 self._db_password = value
196 if self.embedding_manager:
197 self.embedding_manager.db_password = value
198 if self.integrity_manager:
199 self.integrity_manager.password = value
201 def _get_index_hash(
202 self,
203 collection_name: str,
204 embedding_model: str,
205 embedding_model_type: str,
206 ) -> str:
207 """Generate hash for index identification."""
208 hash_input = (
209 f"{collection_name}:{embedding_model}:{embedding_model_type}"
210 )
211 return hashlib.sha256(hash_input.encode()).hexdigest()
213 def _get_index_path(self, index_hash: str) -> Path:
214 """Get path for FAISS index file."""
215 # Store in centralized cache directory (respects LDR_DATA_DIR)
216 cache_dir = get_cache_directory() / "rag_indices"
217 cache_dir.mkdir(parents=True, exist_ok=True)
218 return cache_dir / f"{index_hash}.faiss"
220 @staticmethod
221 def _deduplicate_chunks(
222 chunks: List[LangchainDocument],
223 chunk_ids: List[str],
224 existing_ids: Optional[set] = None,
225 ) -> Tuple[List[LangchainDocument], List[str]]:
226 """Deduplicate chunks by ID within a batch, optionally excluding existing IDs."""
227 seen_ids: set = set()
228 new_chunks: List[LangchainDocument] = []
229 new_ids: List[str] = []
230 for chunk, chunk_id in zip(chunks, chunk_ids):
231 if chunk_id not in seen_ids and (
232 existing_ids is None or chunk_id not in existing_ids
233 ):
234 new_chunks.append(chunk)
235 new_ids.append(chunk_id)
236 seen_ids.add(chunk_id)
237 return new_chunks, new_ids
    def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex:
        """Get or create RAGIndex record for the current configuration.

        Looks up an existing RAGIndex row keyed by the hash of
        (collection name, embedding model, provider). When none exists,
        a new row is created; its embedding dimension is probed by
        embedding a short test string with the current model.

        Args:
            collection_id: UUID of the collection the index belongs to.

        Returns:
            The (possibly newly created) RAGIndex ORM record.
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Use collection_<uuid> format
            collection_name = f"collection_{collection_id}"
            index_hash = self._get_index_hash(
                collection_name, self.embedding_model, self.embedding_provider
            )

            # Try to get existing index
            rag_index = (
                session.query(RAGIndex).filter_by(index_hash=index_hash).first()
            )

            if not rag_index:
                # Create new index record
                index_path = self._get_index_path(index_hash)

                # Get embedding dimension by embedding a test string
                test_embedding = self.embedding_manager.embeddings.embed_query(
                    "test"
                )
                embedding_dim = len(test_embedding)

                rag_index = RAGIndex(
                    collection_name=collection_name,
                    embedding_model=self.embedding_model,
                    embedding_model_type=EmbeddingProvider(
                        self.embedding_provider
                    ),
                    embedding_dimension=embedding_dim,
                    index_path=str(index_path),
                    index_hash=index_hash,
                    chunk_size=self.chunk_size,
                    chunk_overlap=self.chunk_overlap,
                    splitter_type=self.splitter_type,
                    text_separators=self.text_separators,
                    distance_metric=self.distance_metric,
                    normalize_vectors=self.normalize_vectors,
                    index_type=self.index_type,
                    chunk_count=0,
                    total_documents=0,
                    status="active",
                    is_current=True,
                )
                session.add(rag_index)
                session.commit()
                # Refresh so generated fields (e.g. primary key) are
                # populated before the session closes.
                session.refresh(rag_index)
                logger.info(f"Created new RAG index: {index_hash}")

            # NOTE(review): the record is returned after its session exits;
            # callers appear to treat it as a detached/read-only object — confirm.
            return rag_index
    def load_or_create_faiss_index(self, collection_id: str) -> FAISS:
        """
        Load existing FAISS index or create new one.

        Behavior, in order:
        1. Verify on-disk integrity of any existing index file; a failed
           check deletes the file and falls through to creation.
        2. Detect an embedding-dimension mismatch between the stored
           RAGIndex row and the current model; on mismatch, delete the
           index files, reset the row's counts/dimension, clear the
           per-document indexed status, and fall through to creation.
        3. Otherwise load and return the existing index.
        4. Create a fresh index whose FAISS type and distance metric
           follow the service configuration.

        Side effect: self.rag_index_record is set to the RAGIndex row.

        Args:
            collection_id: UUID of the collection

        Returns:
            FAISS vector store instance
        """
        rag_index = self._get_or_create_rag_index(collection_id)
        self.rag_index_record = rag_index

        index_path = Path(rag_index.index_path)

        if index_path.exists():
            # Verify integrity before loading
            verified, reason = self.integrity_manager.verify_file(index_path)
            if not verified:
                logger.error(
                    f"Integrity verification failed for {index_path}: {reason}. "
                    f"Refusing to load. Creating new index."
                )
                # Remove corrupted index
                try:
                    index_path.unlink()
                    logger.info(f"Removed corrupted index file: {index_path}")
                except Exception:
                    logger.exception("Failed to remove corrupted index")
            else:
                try:
                    # Check for embedding dimension mismatch before loading
                    current_dim = len(
                        self.embedding_manager.embeddings.embed_query(
                            "dimension_check"
                        )
                    )
                    stored_dim = rag_index.embedding_dimension

                    if stored_dim and current_dim != stored_dim:
                        logger.warning(
                            f"Embedding dimension mismatch detected! "
                            f"Index created with dim={stored_dim}, "
                            f"current model returns dim={current_dim}. "
                            f"Deleting old index and rebuilding."
                        )
                        # Delete old index files
                        try:
                            index_path.unlink()
                            # FAISS.save_local writes a companion .pkl file
                            # next to the .faiss file; remove it too.
                            pkl_path = index_path.with_suffix(".pkl")
                            if pkl_path.exists():
                                pkl_path.unlink()
                            logger.info(
                                f"Deleted old FAISS index files at {index_path}"
                            )
                        except Exception:
                            logger.exception("Failed to delete old index files")

                        # Update RAGIndex with new dimension and reset counts
                        with get_user_db_session(
                            self.username, self.db_password
                        ) as session:
                            idx = (
                                session.query(RAGIndex)
                                .filter_by(id=rag_index.id)
                                .first()
                            )
                            if idx:
                                idx.embedding_dimension = current_dim
                                idx.chunk_count = 0
                                idx.total_documents = 0
                                session.commit()
                                logger.info(
                                    f"Updated RAGIndex dimension to {current_dim}"
                                )

                                # Clear rag_document_status for this index
                                session.query(RagDocumentStatus).filter_by(
                                    rag_index_id=rag_index.id
                                ).delete()
                                session.commit()
                                logger.info(
                                    "Cleared indexed status for documents in this "
                                    "collection"
                                )

                        # Update local reference for index creation below
                        rag_index.embedding_dimension = current_dim
                        # Fall through to create new index below
                    else:
                        # Dimensions match (or no stored dimension), load index
                        # NOTE(review): normalize_L2 is hard-coded True here,
                        # while the creation path below uses
                        # self.normalize_vectors — confirm this asymmetry is
                        # intended.
                        faiss_index = FAISS.load_local(
                            str(index_path.parent),
                            self.embedding_manager.embeddings,
                            index_name=index_path.stem,
                            allow_dangerous_deserialization=True,
                            normalize_L2=True,
                        )
                        logger.info(
                            f"Loaded existing FAISS index from {index_path}"
                        )
                        return faiss_index
                except Exception as e:
                    logger.warning(
                        f"Failed to load FAISS index: {e}, creating new one"
                    )

        # Create new FAISS index with configurable type and distance metric
        logger.info(
            f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={rag_index.embedding_dimension}"
        )

        # Create index based on type and distance metric
        if self.index_type == "hnsw":
            # HNSW: Fast approximate search, best for large collections
            # M=32 is a good default for connections per layer
            index = IndexHNSWFlat(rag_index.embedding_dimension, 32)
            logger.info("Created HNSW index with M=32 connections")
        elif self.index_type == "ivf":
            # IVF requires training, for now fall back to flat
            # TODO: Implement IVF with proper training
            logger.warning(
                "IVF index type not yet fully implemented, using Flat index"
            )
            if self.distance_metric in ("cosine", "dot_product"):
                index = IndexFlatIP(rag_index.embedding_dimension)
            else:
                index = IndexFlatL2(rag_index.embedding_dimension)
        else:  # "flat" or default
            # Flat index: Exact search
            if self.distance_metric in ("cosine", "dot_product"):
                # For cosine similarity, use inner product (IP) with normalized vectors
                index = IndexFlatIP(rag_index.embedding_dimension)
                logger.info(
                    "Created Flat index with Inner Product (for cosine similarity)"
                )
            else:  # l2
                index = IndexFlatL2(rag_index.embedding_dimension)
                logger.info("Created Flat index with L2 distance")

        faiss_index = FAISS(
            self.embedding_manager.embeddings,
            index=index,
            docstore=InMemoryDocstore(),  # Minimal - chunks in DB
            index_to_docstore_id={},
            normalize_L2=self.normalize_vectors,  # Use configurable normalization
        )
        logger.info(
            f"FAISS index created with normalization={self.normalize_vectors}"
        )
        return faiss_index
    def get_current_index_info(
        self, collection_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get information about the current RAG index for a collection.

        Args:
            collection_id: UUID of collection (defaults to Library if None)

        Returns:
            Dict with model/provider, dimension, chunk settings, live
            chunk/document counts (recomputed from rag_document_status),
            and timestamps; or None when no current index exists.
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get collection name in the format stored in RAGIndex (collection_<uuid>)
            if collection_id:
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )
            else:
                # Default to Library collection
                from ...database.library_init import get_default_library_id

                collection_id = get_default_library_id(
                    self.username, self.db_password
                )
                collection_name = f"collection_{collection_id}"

            rag_index = (
                session.query(RAGIndex)
                .filter_by(collection_name=collection_name, is_current=True)
                .first()
            )

            if not rag_index:
                # Debug: check all RAG indices for this collection
                all_indices = session.query(RAGIndex).all()
                logger.info(
                    f"No RAG index found for collection_name='{collection_name}'. All indices: {[(idx.collection_name, idx.is_current) for idx in all_indices]}"
                )
                return None

            # Calculate actual counts from rag_document_status table
            from ...database.models.library import RagDocumentStatus

            # SUM() returns NULL (-> None) on an empty set; coerce to 0.
            actual_chunk_count = (
                session.query(func.sum(RagDocumentStatus.chunk_count))
                .filter_by(collection_id=collection_id)
                .scalar()
                or 0
            )

            # One status row per indexed document.
            actual_doc_count = (
                session.query(RagDocumentStatus)
                .filter_by(collection_id=collection_id)
                .count()
            )

            return {
                "embedding_model": rag_index.embedding_model,
                "embedding_model_type": rag_index.embedding_model_type.value
                if rag_index.embedding_model_type
                else None,
                "embedding_dimension": rag_index.embedding_dimension,
                "chunk_size": rag_index.chunk_size,
                "chunk_overlap": rag_index.chunk_overlap,
                "chunk_count": actual_chunk_count,
                "total_documents": actual_doc_count,
                "created_at": rag_index.created_at.isoformat(),
                "last_updated_at": rag_index.last_updated_at.isoformat(),
            }
    def index_document(
        self, document_id: str, collection_id: str, force_reindex: bool = False
    ) -> Dict[str, Any]:
        """
        Index a single document into RAG for a specific collection.

        Pipeline: load the document -> ensure a DocumentCollection link
        row exists -> split text into chunks -> persist chunks to the DB
        -> add embeddings to the (lazily loaded) FAISS index -> save the
        index with integrity tracking -> record indexed status and update
        statistics in a single committed transaction.

        Args:
            document_id: UUID of the Document to index
            collection_id: UUID of the Collection to index for
            force_reindex: Whether to force reindexing even if already indexed

        Returns:
            Dict with status, chunk_count, and any errors
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get the document
            document = session.query(Document).filter_by(id=document_id).first()

            if not document:
                return {"status": "error", "error": "Document not found"}

            # Get or create DocumentCollection entry
            all_doc_collections = (
                session.query(DocumentCollection)
                .filter_by(document_id=document_id, collection_id=collection_id)
                .all()
            )

            logger.info(
                f"Found {len(all_doc_collections)} DocumentCollection entries for doc={document_id}, coll={collection_id}"
            )

            doc_collection = (
                all_doc_collections[0] if all_doc_collections else None
            )

            if not doc_collection:
                # Create new DocumentCollection entry
                doc_collection = DocumentCollection(
                    document_id=document_id,
                    collection_id=collection_id,
                    indexed=False,
                    chunk_count=0,
                )
                session.add(doc_collection)
                logger.info(
                    f"Created new DocumentCollection entry for doc={document_id}, coll={collection_id}"
                )

            # Check if already indexed for this collection
            if doc_collection.indexed and not force_reindex:
                return {
                    "status": "skipped",
                    "message": "Document already indexed for this collection",
                    "chunk_count": doc_collection.chunk_count,
                }

            # Validate text content
            if not document.text_content:
                return {
                    "status": "error",
                    "error": "Document has no text content",
                }

            try:
                # Create LangChain Document from text
                doc = LangchainDocument(
                    page_content=document.text_content,
                    metadata={
                        "source": document.original_url,
                        "document_id": document_id,  # Add document ID for source linking
                        "collection_id": collection_id,  # Add collection ID
                        "title": document.title
                        or document.filename
                        or "Untitled",
                        "document_title": document.title
                        or document.filename
                        or "Untitled",  # Add for compatibility
                        "authors": document.authors,
                        "published_date": str(document.published_date)
                        if document.published_date
                        else None,
                        "doi": document.doi,
                        "arxiv_id": document.arxiv_id,
                        "pmid": document.pmid,
                        "pmcid": document.pmcid,
                        "extraction_method": document.extraction_method,
                        "word_count": document.word_count,
                    },
                )

                # Split into chunks
                chunks = self.text_splitter.split_documents([doc])
                logger.info(
                    f"Split document {document_id} into {len(chunks)} chunks"
                )

                # Get collection name for chunk storage
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                # Use collection_<uuid> format for internal storage
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )

                # Store chunks in database using embedding manager
                embedding_ids = self.embedding_manager._store_chunks_to_db(
                    chunks=chunks,
                    collection_name=collection_name,
                    source_type="document",
                    source_id=document_id,
                )

                # Load or create FAISS index (lazy; cached on the instance)
                if self.faiss_index is None:
                    self.faiss_index = self.load_or_create_faiss_index(
                        collection_id
                    )

                # If force_reindex, remove old chunks from FAISS before adding new ones
                if force_reindex:
                    existing_ids = (
                        set(self.faiss_index.docstore._dict.keys())
                        if hasattr(self.faiss_index, "docstore")
                        else set()
                    )
                    old_chunk_ids = list(
                        {eid for eid in embedding_ids if eid in existing_ids}
                    )
                    if old_chunk_ids:
                        logger.info(
                            f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
                        )
                        self.faiss_index.delete(old_chunk_ids)

                # Filter out chunks that already exist in FAISS (unless force_reindex)
                if not force_reindex:
                    existing_ids = (
                        set(self.faiss_index.docstore._dict.keys())
                        if hasattr(self.faiss_index, "docstore")
                        else set()
                    )
                else:
                    # None disables the "already in FAISS" exclusion in
                    # _deduplicate_chunks; batch-level dedup still applies.
                    existing_ids = None

                unique_count = len(set(embedding_ids))
                batch_dups = len(chunks) - unique_count

                new_chunks, new_ids = self._deduplicate_chunks(
                    chunks, embedding_ids, existing_ids
                )

                # Add embeddings to FAISS index
                if new_chunks:
                    if force_reindex:
                        logger.info(
                            f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
                        )
                    else:
                        already_exist = unique_count - len(new_chunks)
                        logger.info(
                            f"Adding {len(new_chunks)} new embeddings to FAISS index "
                            f"({already_exist} already exist, {batch_dups} batch duplicates removed)"
                        )
                    self.faiss_index.add_documents(new_chunks, ids=new_ids)
                else:
                    logger.info(
                        f"All {len(chunks)} chunks already exist in FAISS index, skipping"
                    )

                # Save FAISS index
                index_path = Path(self.rag_index_record.index_path)
                self.faiss_index.save_local(
                    str(index_path.parent), index_name=index_path.stem
                )
                # Record file integrity
                self.integrity_manager.record_file(
                    index_path,
                    related_entity_type="rag_index",
                    related_entity_id=self.rag_index_record.id,
                )
                logger.info(
                    f"Saved FAISS index to {index_path} with integrity tracking"
                )

                from datetime import datetime, UTC
                from sqlalchemy import text

                # Check if document was already indexed (for stats update)
                existing_status = (
                    session.query(RagDocumentStatus)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )
                was_already_indexed = existing_status is not None

                # Mark document as indexed using rag_document_status table
                # Row existence = indexed, simple and clean
                timestamp = datetime.now(UTC)

                # Create or update RagDocumentStatus using ORM merge (atomic upsert)
                rag_status = RagDocumentStatus(
                    document_id=document_id,
                    collection_id=collection_id,
                    rag_index_id=self.rag_index_record.id,
                    chunk_count=len(chunks),
                    indexed_at=timestamp,
                )
                session.merge(rag_status)

                logger.info(
                    f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={len(chunks)}"
                )

                # Also update DocumentCollection table for backward compatibility
                session.query(DocumentCollection).filter_by(
                    document_id=document_id, collection_id=collection_id
                ).update(
                    {
                        "indexed": True,
                        "chunk_count": len(chunks),
                        "last_indexed_at": timestamp,
                    }
                )

                logger.info(
                    "Also updated DocumentCollection.indexed for backward compatibility"
                )

                # Update RAGIndex statistics (only if not already indexed)
                rag_index_obj = (
                    session.query(RAGIndex)
                    .filter_by(id=self.rag_index_record.id)
                    .first()
                )
                if rag_index_obj and not was_already_indexed:
                    rag_index_obj.chunk_count += len(chunks)
                    rag_index_obj.total_documents += 1
                    rag_index_obj.last_updated_at = datetime.now(UTC)
                    logger.info(
                        f"Updated RAGIndex stats: chunk_count +{len(chunks)}, total_documents +1"
                    )

                # Flush ORM changes to database before commit
                session.flush()
                logger.info(f"Flushed ORM changes for document {document_id}")

                # Commit the transaction
                session.commit()

                # WAL checkpoint after commit to ensure persistence
                session.execute(text("PRAGMA wal_checkpoint(FULL)"))

                logger.info(
                    f"Successfully indexed document {document_id} for collection {collection_id} "
                    f"with {len(chunks)} chunks"
                )

                return {
                    "status": "success",
                    "chunk_count": len(chunks),
                    "embedding_ids": embedding_ids,
                }

            except Exception as e:
                logger.exception(
                    f"Error indexing document {document_id} for collection {collection_id}"
                )
                # Only the exception type is surfaced to avoid leaking details.
                return {
                    "status": "error",
                    "error": f"Operation failed: {type(e).__name__}",
                }
    def index_all_documents(
        self,
        collection_id: str,
        force_reindex: bool = False,
        progress_callback=None,
    ) -> Dict[str, Any]:
        """
        Index all documents in a collection into RAG.

        Args:
            collection_id: UUID of the collection to index
            force_reindex: Whether to force reindexing already indexed documents
            progress_callback: Optional callback function called after each document with (current, total, doc_title, status)

        Returns:
            Dict with counts of successful, skipped, and failed documents
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get all DocumentCollection entries for this collection
            query = session.query(DocumentCollection).filter_by(
                collection_id=collection_id
            )

            if not force_reindex:
                # Only index documents that haven't been indexed yet
                query = query.filter_by(indexed=False)

            doc_collections = query.all()

            if not doc_collections:
                return {
                    "status": "info",
                    "message": "No documents to index",
                    "successful": 0,
                    "skipped": 0,
                    "failed": 0,
                }

            results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
            total = len(doc_collections)

            # NOTE(review): each index_document call opens its own DB
            # session while this one remains open — confirm the session
            # backend tolerates the nesting.
            for idx, doc_collection in enumerate(doc_collections, 1):
                # Get the document for title info
                document = (
                    session.query(Document)
                    .filter_by(id=doc_collection.document_id)
                    .first()
                )
                title = document.title if document else "Unknown"

                result = self.index_document(
                    doc_collection.document_id, collection_id, force_reindex
                )

                if result["status"] == "success":
                    results["successful"] += 1
                elif result["status"] == "skipped":
                    results["skipped"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append(
                        {
                            "doc_id": doc_collection.document_id,
                            "title": title,
                            "error": result.get("error"),
                        }
                    )

                # Call progress callback if provided
                if progress_callback:
                    progress_callback(idx, total, title, result["status"])

            logger.info(
                f"Indexed collection {collection_id}: "
                f"{results['successful']} successful, "
                f"{results['skipped']} skipped, "
                f"{results['failed']} failed"
            )

            return results
    def remove_document_from_rag(
        self, document_id: str, collection_id: str
    ) -> Dict[str, Any]:
        """
        Remove a document's chunks from RAG for a specific collection.

        Deletes the document's stored chunks from the database and resets
        the DocumentCollection indexing flags.

        NOTE(review): only DB-stored chunks are deleted here; embeddings
        previously written to the on-disk FAISS index do not appear to be
        removed in this method — confirm whether that is handled elsewhere.

        Args:
            document_id: UUID of the Document to remove
            collection_id: UUID of the Collection to remove from

        Returns:
            Dict with status and count of removed chunks
        """
        with get_user_db_session(self.username, self.db_password) as session:
            # Get the DocumentCollection entry
            doc_collection = (
                session.query(DocumentCollection)
                .filter_by(document_id=document_id, collection_id=collection_id)
                .first()
            )

            if not doc_collection:
                return {
                    "status": "error",
                    "error": "Document not found in collection",
                }

            try:
                # Get collection name in the format collection_<uuid>
                collection = (
                    session.query(Collection)
                    .filter_by(id=collection_id)
                    .first()
                )
                # Use collection_<uuid> format for internal storage
                collection_name = (
                    f"collection_{collection_id}" if collection else "unknown"
                )

                # Delete chunks from database
                deleted_count = self.embedding_manager._delete_chunks_from_db(
                    collection_name=collection_name,
                    source_id=document_id,
                )

                # Update DocumentCollection RAG status
                doc_collection.indexed = False
                doc_collection.chunk_count = 0
                doc_collection.last_indexed_at = None
                session.commit()

                logger.info(
                    f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}"
                )

                return {"status": "success", "deleted_count": deleted_count}

            except Exception as e:
                logger.exception(
                    f"Error removing document {document_id} from collection {collection_id}"
                )
                # Only the exception type is surfaced to avoid leaking details.
                return {
                    "status": "error",
                    "error": f"Operation failed: {type(e).__name__}",
                }
    def index_documents_batch(
        self,
        doc_info: List[tuple],
        collection_id: str,
        force_reindex: bool = False,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Index multiple documents in a batch for a specific collection.

        Pre-loads Document and DocumentCollection rows for the whole batch
        in one session (avoiding per-document queries for the skip checks),
        then delegates each remaining document to index_document().

        Args:
            doc_info: List of (doc_id, title) tuples
            collection_id: UUID of the collection to index for
            force_reindex: Whether to force reindexing even if already indexed

        Returns:
            Dict mapping doc_id to individual result
        """
        results = {}
        doc_ids = [doc_id for doc_id, _ in doc_info]

        # Use single database session for querying
        with get_user_db_session(self.username, self.db_password) as session:
            # Pre-load all documents for this batch
            documents = (
                session.query(Document).filter(Document.id.in_(doc_ids)).all()
            )

            # Create lookup for quick access
            doc_lookup = {doc.id: doc for doc in documents}

            # Pre-load DocumentCollection entries
            doc_collections = (
                session.query(DocumentCollection)
                .filter(
                    DocumentCollection.document_id.in_(doc_ids),
                    DocumentCollection.collection_id == collection_id,
                )
                .all()
            )
            doc_collection_lookup = {
                dc.document_id: dc for dc in doc_collections
            }

            # Process each document in the batch
            for doc_id, title in doc_info:
                document = doc_lookup.get(doc_id)

                if not document:
                    results[doc_id] = {
                        "status": "error",
                        "error": "Document not found",
                    }
                    continue

                # Check if already indexed via DocumentCollection
                doc_collection = doc_collection_lookup.get(doc_id)
                if (
                    doc_collection
                    and doc_collection.indexed
                    and not force_reindex
                ):
                    results[doc_id] = {
                        "status": "skipped",
                        "message": "Document already indexed for this collection",
                        "chunk_count": doc_collection.chunk_count,
                    }
                    continue

                # Validate text content
                if not document.text_content:
                    results[doc_id] = {
                        "status": "error",
                        "error": "Document has no text content",
                    }
                    continue

                # Index the document
                # NOTE(review): index_document opens its own nested DB
                # session while this batch session is still open — confirm
                # the session backend tolerates this.
                try:
                    result = self.index_document(
                        doc_id, collection_id, force_reindex
                    )
                    results[doc_id] = result
                except Exception as e:
                    logger.exception(
                        f"Error indexing document {doc_id} in batch"
                    )
                    results[doc_id] = {
                        "status": "error",
                        "error": f"Indexing failed: {type(e).__name__}",
                    }

        return results
1034 def get_rag_stats(
1035 self, collection_id: Optional[str] = None
1036 ) -> Dict[str, Any]:
1037 """
1038 Get RAG statistics for a collection.
1040 Args:
1041 collection_id: UUID of the collection (defaults to Library)
1043 Returns:
1044 Dict with counts and metadata about indexed documents
1045 """
1046 with get_user_db_session(self.username, self.db_password) as session:
1047 # Get collection ID (default to Library)
1048 if not collection_id: 1048 ↛ 1049line 1048 didn't jump to line 1049 because the condition on line 1048 was never true
1049 from ...database.library_init import get_default_library_id
1051 collection_id = get_default_library_id(
1052 self.username, self.db_password
1053 )
1055 # Count total documents in collection
1056 total_docs = (
1057 session.query(DocumentCollection)
1058 .filter_by(collection_id=collection_id)
1059 .count()
1060 )
1062 # Count indexed documents from rag_document_status table
1063 from ...database.models.library import RagDocumentStatus
1065 indexed_docs = (
1066 session.query(RagDocumentStatus)
1067 .filter_by(collection_id=collection_id)
1068 .count()
1069 )
1071 # Count total chunks from rag_document_status table
1072 total_chunks = (
1073 session.query(func.sum(RagDocumentStatus.chunk_count))
1074 .filter_by(collection_id=collection_id)
1075 .scalar()
1076 or 0
1077 )
1079 # Get collection name in the format stored in DocumentChunk (collection_<uuid>)
1080 collection = (
1081 session.query(Collection).filter_by(id=collection_id).first()
1082 )
1083 collection_name = (
1084 f"collection_{collection_id}" if collection else "library"
1085 )
1087 # Get embedding model info from chunks
1088 chunk_sample = (
1089 session.query(DocumentChunk)
1090 .filter_by(collection_name=collection_name)
1091 .first()
1092 )
1094 embedding_info = {}
1095 if chunk_sample: 1095 ↛ 1096line 1095 didn't jump to line 1096 because the condition on line 1095 was never true
1096 embedding_info = {
1097 "model": chunk_sample.embedding_model,
1098 "model_type": chunk_sample.embedding_model_type.value
1099 if chunk_sample.embedding_model_type
1100 else None,
1101 "dimension": chunk_sample.embedding_dimension,
1102 }
1104 return {
1105 "total_documents": total_docs,
1106 "indexed_documents": indexed_docs,
1107 "unindexed_documents": total_docs - indexed_docs,
1108 "total_chunks": total_chunks,
1109 "embedding_info": embedding_info,
1110 "chunk_size": self.chunk_size,
1111 "chunk_overlap": self.chunk_overlap,
1112 }
1114 def index_local_file(self, file_path: str) -> Dict[str, Any]:
1115 """
1116 Index a local file from the filesystem into RAG.
1118 Args:
1119 file_path: Path to the file to index
1121 Returns:
1122 Dict with status, chunk_count, and any errors
1123 """
1124 from pathlib import Path
1125 import mimetypes
1127 file_path = Path(file_path)
1129 if not file_path.exists(): 1129 ↛ 1130line 1129 didn't jump to line 1130 because the condition on line 1129 was never true
1130 return {"status": "error", "error": f"File not found: {file_path}"}
1132 if not file_path.is_file(): 1132 ↛ 1133line 1132 didn't jump to line 1133 because the condition on line 1132 was never true
1133 return {"status": "error", "error": f"Not a file: {file_path}"}
1135 # Determine file type
1136 mime_type, _ = mimetypes.guess_type(str(file_path))
1138 # Read file content based on type
1139 try:
1140 if file_path.suffix.lower() in [".txt", ".md", ".markdown"]: 1140 ↛ 1144line 1140 didn't jump to line 1144 because the condition on line 1140 was always true
1141 # Text files
1142 with open(file_path, "r", encoding="utf-8") as f:
1143 content = f.read()
1144 elif file_path.suffix.lower() in [".html", ".htm"]:
1145 # HTML files - strip tags
1146 from bs4 import BeautifulSoup
1148 with open(file_path, "r", encoding="utf-8") as f:
1149 soup = BeautifulSoup(f.read(), "html.parser")
1150 content = soup.get_text()
1151 elif file_path.suffix.lower() == ".pdf":
1152 # PDF files - extract text
1153 import PyPDF2
1155 content = ""
1156 with open(file_path, "rb") as f:
1157 pdf_reader = PyPDF2.PdfReader(f)
1158 for page in pdf_reader.pages:
1159 content += page.extract_text()
1160 else:
1161 return {
1162 "status": "skipped",
1163 "error": f"Unsupported file type: {file_path.suffix}",
1164 }
1166 if not content or len(content.strip()) < 10: 1166 ↛ 1167line 1166 didn't jump to line 1167 because the condition on line 1166 was never true
1167 return {
1168 "status": "error",
1169 "error": "File has no extractable text content",
1170 }
1172 # Create LangChain Document from text
1173 doc = LangchainDocument(
1174 page_content=content,
1175 metadata={
1176 "source": str(file_path),
1177 "source_id": f"local_{file_path.stem}_{hash(str(file_path))}",
1178 "title": file_path.stem,
1179 "document_title": file_path.stem,
1180 "file_type": file_path.suffix.lower(),
1181 "file_size": file_path.stat().st_size,
1182 "source_type": "local_file",
1183 "collection": "local_library",
1184 },
1185 )
1187 # Split into chunks
1188 chunks = self.text_splitter.split_documents([doc])
1189 logger.info(
1190 f"Split local file {file_path} into {len(chunks)} chunks"
1191 )
1193 # Store chunks in database (returns UUID-based IDs)
1194 embedding_ids = self.embedding_manager._store_chunks_to_db(
1195 chunks=chunks,
1196 collection_name="local_library",
1197 source_type="local_file",
1198 source_id=str(file_path),
1199 )
1201 # Load or create FAISS index using default library collection
1202 if self.faiss_index is None:
1203 from ...database.library_init import get_default_library_id
1205 default_collection_id = get_default_library_id(
1206 self.username, self.db_password
1207 )
1208 self.faiss_index = self.load_or_create_faiss_index(
1209 default_collection_id
1210 )
1212 # Filter out chunks that already exist in FAISS and deduplicate
1213 if self.faiss_index is not None: 1213 ↛ 1220line 1213 didn't jump to line 1220 because the condition on line 1213 was always true
1214 existing_ids = (
1215 set(self.faiss_index.docstore._dict.keys())
1216 if hasattr(self.faiss_index, "docstore")
1217 else set()
1218 )
1219 else:
1220 existing_ids = None
1221 new_chunks, new_ids = self._deduplicate_chunks(
1222 chunks, embedding_ids, existing_ids
1223 )
1225 # Add embeddings to FAISS index
1226 if new_chunks: 1226 ↛ 1230line 1226 didn't jump to line 1230 because the condition on line 1226 was always true
1227 self.faiss_index.add_documents(new_chunks, ids=new_ids)
1229 # Save FAISS index
1230 index_path = (
1231 Path(self.rag_index_record.index_path)
1232 if self.rag_index_record
1233 else None
1234 )
1235 if index_path:
1236 self.faiss_index.save_local(
1237 str(index_path.parent), index_name=index_path.stem
1238 )
1239 # Record file integrity
1240 self.integrity_manager.record_file(
1241 index_path,
1242 related_entity_type="rag_index",
1243 related_entity_id=self.rag_index_record.id,
1244 )
1245 logger.info(
1246 f"Saved FAISS index to {index_path} with integrity tracking"
1247 )
1249 logger.info(
1250 f"Successfully indexed local file {file_path} with {len(new_chunks)} new chunks "
1251 f"({len(chunks) - len(new_chunks)} skipped)"
1252 )
1254 return {
1255 "status": "success",
1256 "chunk_count": len(new_chunks),
1257 "embedding_ids": new_ids,
1258 }
1260 except Exception as e:
1261 logger.exception(f"Error indexing local file {file_path}")
1262 return {
1263 "status": "error",
1264 "error": f"Operation failed: {type(e).__name__}",
1265 }
    def index_user_document(
        self, user_doc, collection_name: str, force_reindex: bool = False
    ) -> Dict[str, Any]:
        """
        Index a user-uploaded document into a specific collection.

        Chunks the document's pre-extracted text, stores the chunks in the
        per-user database, adds their embeddings to the collection's FAISS
        index (replacing existing entries when force_reindex is set), and
        persists the index with integrity tracking.

        Args:
            user_doc: UserDocument object (text must already be extracted
                into `text_content`)
            collection_name: Name of the collection (e.g., "collection_123")
            force_reindex: Whether to force reindexing

        Returns:
            Dict with status, chunk_count, and any errors. NOTE(review):
            on success chunk_count is len(chunks) — the total split count,
            including chunks skipped as duplicates — unlike
            index_local_file, which reports only newly added chunks.
        """

        try:
            # Use the pre-extracted text content
            content = user_doc.text_content

            if not content or len(content.strip()) < 10:
                return {
                    "status": "error",
                    "error": "Document has no extractable text content",
                }

            # Create LangChain Document
            doc = LangchainDocument(
                page_content=content,
                metadata={
                    "source": f"user_upload_{user_doc.id}",
                    "source_id": user_doc.id,
                    "title": user_doc.filename,
                    "document_title": user_doc.filename,
                    "file_type": user_doc.file_type,
                    "file_size": user_doc.file_size,
                    "collection": collection_name,
                },
            )

            # Split into chunks
            chunks = self.text_splitter.split_documents([doc])
            logger.info(
                f"Split user document {user_doc.filename} into {len(chunks)} chunks"
            )

            # Store chunks in database; returns one embedding id per chunk
            embedding_ids = self.embedding_manager._store_chunks_to_db(
                chunks=chunks,
                collection_name=collection_name,
                source_type="user_document",
                source_id=user_doc.id,
            )

            # Load or create FAISS index for this collection
            if self.faiss_index is None:
                # Extract collection_id from collection_name (format: "collection_<uuid>")
                collection_id = collection_name.removeprefix("collection_")
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # If force_reindex, remove old chunks from FAISS before adding new ones
            if force_reindex:
                # Docstore ids currently in the FAISS index (empty if the
                # index has no docstore attribute)
                existing_ids = (
                    set(self.faiss_index.docstore._dict.keys())
                    if hasattr(self.faiss_index, "docstore")
                    else set()
                )
                # Only delete ids that are actually present in the index
                old_chunk_ids = list(
                    {eid for eid in embedding_ids if eid in existing_ids}
                )
                if old_chunk_ids:
                    logger.info(
                        f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS"
                    )
                    self.faiss_index.delete(old_chunk_ids)

            # Filter out chunks that already exist in FAISS (unless force_reindex)
            if not force_reindex:
                existing_ids = (
                    set(self.faiss_index.docstore._dict.keys())
                    if hasattr(self.faiss_index, "docstore")
                    else set()
                )
            else:
                # None tells _deduplicate_chunks to skip the existence
                # filter — the old entries were just deleted above.
                existing_ids = None

            # Duplicate ids within this batch (same content hashed twice)
            unique_count = len(set(embedding_ids))
            batch_dups = len(chunks) - unique_count

            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )

            # Add embeddings to FAISS index
            if new_chunks:
                if force_reindex:
                    logger.info(
                        f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index"
                    )
                else:
                    already_exist = unique_count - len(new_chunks)
                    logger.info(
                        f"Adding {len(new_chunks)} new chunks to FAISS index "
                        f"({already_exist} already exist, {batch_dups} batch duplicates removed)"
                    )
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            else:
                logger.info(
                    f"All {len(chunks)} chunks already exist in FAISS index, skipping"
                )

            # Save FAISS index to disk, if this service tracks an index path
            index_path = (
                Path(self.rag_index_record.index_path)
                if self.rag_index_record
                else None
            )
            if index_path:
                self.faiss_index.save_local(
                    str(index_path.parent), index_name=index_path.stem
                )
                # Record file integrity
                self.integrity_manager.record_file(
                    index_path,
                    related_entity_type="rag_index",
                    related_entity_id=self.rag_index_record.id,
                )

            logger.info(
                f"Successfully indexed user document {user_doc.filename} with {len(chunks)} chunks"
            )

            return {
                "status": "success",
                "chunk_count": len(chunks),
                "embedding_ids": embedding_ids,
            }

        except Exception as e:
            logger.exception(
                f"Error indexing user document {user_doc.filename}"
            )
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }
1415 def remove_collection_from_index(
1416 self, collection_name: str
1417 ) -> Dict[str, Any]:
1418 """
1419 Remove all documents from a collection from the FAISS index.
1421 Args:
1422 collection_name: Name of the collection (e.g., "collection_123")
1424 Returns:
1425 Dict with status and count of removed chunks
1426 """
1427 from ...database.models import DocumentChunk
1428 from ...database.session_context import get_user_db_session
1430 try:
1431 with get_user_db_session(
1432 self.username, self.db_password
1433 ) as session:
1434 # Get all chunk IDs for this collection
1435 chunks = (
1436 session.query(DocumentChunk)
1437 .filter_by(collection_name=collection_name)
1438 .all()
1439 )
1441 if not chunks: 1441 ↛ 1442line 1441 didn't jump to line 1442 because the condition on line 1441 was never true
1442 return {"status": "success", "deleted_count": 0}
1444 chunk_ids = [
1445 f"{collection_name}_{chunk.id}" for chunk in chunks
1446 ]
1448 # Load FAISS index if not already loaded
1449 if self.faiss_index is None: 1449 ↛ 1457line 1449 didn't jump to line 1457 because the condition on line 1449 was always true
1450 # Extract collection_id from collection_name (format: "collection_<uuid>")
1451 collection_id = collection_name.removeprefix("collection_")
1452 self.faiss_index = self.load_or_create_faiss_index(
1453 collection_id
1454 )
1456 # Remove from FAISS index
1457 if hasattr(self.faiss_index, "delete"): 1457 ↛ 1483line 1457 didn't jump to line 1483 because the condition on line 1457 was always true
1458 try:
1459 self.faiss_index.delete(chunk_ids)
1461 # Save updated index
1462 index_path = (
1463 Path(self.rag_index_record.index_path)
1464 if self.rag_index_record
1465 else None
1466 )
1467 if index_path: 1467 ↛ 1468line 1467 didn't jump to line 1468 because the condition on line 1467 was never true
1468 self.faiss_index.save_local(
1469 str(index_path.parent),
1470 index_name=index_path.stem,
1471 )
1472 # Record file integrity
1473 self.integrity_manager.record_file(
1474 index_path,
1475 related_entity_type="rag_index",
1476 related_entity_id=self.rag_index_record.id,
1477 )
1478 except Exception as e:
1479 logger.warning(
1480 f"Could not delete chunks from FAISS: {e}"
1481 )
1483 logger.info(
1484 f"Removed {len(chunk_ids)} chunks from collection {collection_name}"
1485 )
1487 return {"status": "success", "deleted_count": len(chunk_ids)}
1489 except Exception as e:
1490 logger.exception(
1491 f"Error removing collection {collection_name} from index"
1492 )
1493 return {
1494 "status": "error",
1495 "error": f"Operation failed: {type(e).__name__}",
1496 }
1498 def search_library(
1499 self, query: str, limit: int = 10, score_threshold: float = 0.0
1500 ) -> List[Dict[str, Any]]:
1501 """
1502 Search library documents using semantic search.
1504 Args:
1505 query: Search query
1506 limit: Maximum number of results
1507 score_threshold: Minimum similarity score
1509 Returns:
1510 List of results with content, metadata, and similarity scores
1511 """
1512 # This will be implemented when we integrate with the search system
1513 # For now, raise NotImplementedError
1514 raise NotImplementedError(
1515 "Library search will be implemented in the search integration phase"
1516 )