Coverage for src/local_deep_research/research_library/services/library_rag_service.py: 95%
549 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1"""
2Library RAG Service
4Handles indexing and searching library documents using RAG:
5- Index text documents into vector database
6- Chunk documents for semantic search
7- Generate embeddings using local models
8- Manage FAISS indices per collection
9- Track RAG status in library
10"""
12import threading
13import time
14from pathlib import Path
15from typing import Any, Dict, List, Optional, Tuple
17from langchain_core.documents import Document as LangchainDocument
18from loguru import logger
19from sqlalchemy import func
21from ...config.paths import get_cache_directory
22from ...database.models.library import (
23 Document,
24 DocumentChunk,
25 DocumentCollection,
26 Collection,
27 RAGIndex,
28 RagDocumentStatus,
29 EmbeddingProvider,
30)
31from ...database.session_context import get_user_db_session, safe_rollback
32from ...utilities.type_utils import to_bool
33from ..utils import ensure_in_collection
34from ...embeddings.splitters import get_text_splitter
35from ...web_search_engines.engines.local_embedding_manager import (
36 LocalEmbeddingManager,
37)
38from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier
39import hashlib
40from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat
41from langchain_community.vectorstores import FAISS
42from langchain_community.docstore.in_memory import InMemoryDocstore
# Per-(username, index_path) locks that serialise two critical sections:
# the FAISS save + integrity-record write, and the
# verify -> quarantine -> build sequence in load_or_create_faiss_index.
#
# These live at module scope on purpose: every auto-index / scheduler /
# search worker builds its own LibraryRAGService, so a lock stored on the
# instance would never be shared between workers and would coordinate
# nothing. Adapted from web/queue/processor_v2._user_critical_locks, but
# module-scoped instead of instance-scoped and keyed by (username, path)
# instead of by username alone. See #4197 for the race being prevented:
# concurrent save_local calls interleave bytes, the checksum stops
# matching, and the file is then destructively unlinked (lost data).
_faiss_write_locks: Dict[Tuple[str, str], threading.Lock] = dict()
# Guards insertion/removal of entries in ``_faiss_write_locks`` itself.
_faiss_write_locks_lock = threading.Lock()

# Upper bound on "-<n>" suffix retries when a ".corrupt-<ns>" quarantine
# target already exists. One attempt is the normal case; a collision on
# the same nanosecond stamp only arises from manually created files. The
# cap turns a would-be endless loop into a loud OSError.
_QUARANTINE_SUFFIX_RETRY_CAP = 32

# Retention for quarantined artefacts: after a successful quarantine, at
# most this many older ".corrupt-*" files are kept per base path, with the
# ".faiss.corrupt-*" and ".pkl.corrupt-*" sides counted independently.
# Bounds disk usage under recurring corruption while keeping the last few
# corruption events available for inspection or bug reports.
_QUARANTINE_KEEP_RECENT = 5
def _get_faiss_write_lock(username: str, index_path: str) -> threading.Lock:
    """Fetch (lazily creating) the write lock for ``(username, index_path)``.

    The path component is canonicalised with ``Path.resolve()`` so that it
    agrees with ``FileIntegrityManager._normalize_path``; writers, readers
    and the quarantine path therefore all contend on the same lock for the
    same file regardless of how the path was spelled.
    """
    registry_key = (username, str(Path(index_path).resolve()))
    with _faiss_write_locks_lock:
        try:
            return _faiss_write_locks[registry_key]
        except KeyError:
            fresh_lock = threading.Lock()
            _faiss_write_locks[registry_key] = fresh_lock
            return fresh_lock
def pop_faiss_locks_for_user(username: str) -> None:
    """Drop every FAISS write lock registered for ``username``.

    Invoked from the user-close paths (connection_cleanup) so the registry
    does not accumulate one entry per (user x collection) for the life of
    the process. Removing an entry while its lock is held is harmless: the
    holder keeps its own reference, and the next ``_get_faiss_write_lock``
    call for that key lazily creates a new lock. Same semantics as
    ``pop_user_critical_lock`` in web/queue/processor_v2.py.
    """
    with _faiss_write_locks_lock:
        # Snapshot the matching keys first; the dict cannot be mutated
        # while it is being iterated.
        owned_keys = [key for key in _faiss_write_locks if key[0] == username]
        for key in owned_keys:
            del _faiss_write_locks[key]
102class LibraryRAGService:
103 """Service for managing RAG indexing of library documents."""
def __init__(
    self,
    username: str,
    embedding_model: str = "all-MiniLM-L6-v2",
    embedding_provider: str = "sentence_transformers",
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    splitter_type: str = "recursive",
    text_separators: Optional[list] = None,
    distance_metric: str = "cosine",
    normalize_vectors: bool = True,
    index_type: str = "flat",
    embedding_manager: Optional["LocalEmbeddingManager"] = None,
    db_password: Optional[str] = None,
):
    """
    Initialize library RAG service for a user.

    Args:
        username: Username for database access
        embedding_model: Name of the embedding model to use
        embedding_provider: Provider type ('sentence_transformers' or 'ollama')
        chunk_size: Size of text chunks for splitting
        chunk_overlap: Overlap between consecutive chunks
        splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic')
        text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""])
        distance_metric: Distance metric ('cosine', 'l2', or 'dot_product')
        normalize_vectors: Whether to normalize vectors with L2
        index_type: FAISS index type ('flat', 'hnsw', or 'ivf')
        embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility
        db_password: Optional database password for background thread access

    If construction fails after this instance has built its own
    embedding manager (e.g. the text splitter or the integrity manager
    raises), that manager is closed before the exception propagates so a
    half-built service does not leak it. A caller-supplied manager is
    never closed here.
    """
    self.username = username
    self._db_password = db_password  # Can be used for thread access
    # Initialize optional attributes to None before they're set below.
    # This allows the db_password setter to check them without hasattr.
    self.embedding_manager = None
    self.integrity_manager = None
    self.embedding_model = embedding_model
    self.embedding_provider = embedding_provider
    self.chunk_size = chunk_size
    self.chunk_overlap = chunk_overlap
    self.splitter_type = splitter_type
    self.text_separators = (
        text_separators
        if text_separators is not None
        else ["\n\n", "\n", ". ", " ", ""]
    )
    self.distance_metric = distance_metric
    # Ensure normalize_vectors is always a proper boolean
    self.normalize_vectors = to_bool(normalize_vectors, default=True)
    self.index_type = index_type

    # Emit the active configuration so users can confirm their
    # UI-configured embedding settings are being honored (regression
    # signal for #3453).
    logger.info(
        f"RAG service initialized for user={username}: "
        f"provider={embedding_provider} model={embedding_model} "
        f"chunk_size={chunk_size} chunk_overlap={chunk_overlap} "
        f"splitter={splitter_type} index_type={index_type}"
    )

    # Use provided embedding manager or create a new one
    # (must be created before the text splitter for semantic chunking).
    # Track ownership so close() only tears down the manager when we
    # constructed it — a caller-supplied manager stays under caller
    # control (test fixtures, multi-service callers reusing one manager).
    self._owns_embedding_manager = embedding_manager is None
    if embedding_manager is not None:
        self.embedding_manager = embedding_manager
    else:
        # Load the complete user settings snapshot from the database.
        from ...settings.manager import SettingsManager

        # Note: using _db_password (backing field) directly here because
        # the db_password property setter propagates to
        # embedding_manager/integrity_manager, which are still None here.
        with get_user_db_session(username, self._db_password) as session:
            settings_manager = SettingsManager(session)
            settings_snapshot = settings_manager.get_settings_snapshot()

        # Add the specific settings needed for this RAG service
        settings_snapshot.update(
            {
                "_username": username,
                "embeddings.provider": embedding_provider,
                f"embeddings.{embedding_provider}.model": embedding_model,
                "local_search_chunk_size": chunk_size,
                "local_search_chunk_overlap": chunk_overlap,
            }
        )

        self.embedding_manager = LocalEmbeddingManager(
            embedding_model=embedding_model,
            embedding_model_type=embedding_provider,
            settings_snapshot=settings_snapshot,
        )

    try:
        # Text splitter must be created AFTER embedding_manager because
        # semantic chunking needs its embeddings.
        self.text_splitter = get_text_splitter(
            splitter_type=self.splitter_type,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            text_separators=self.text_separators,
            embeddings=self.embedding_manager.embeddings
            if self.splitter_type == "semantic"
            else None,
        )

        # FAISS index and its DB record are loaded lazily later.
        self.faiss_index = None
        self.rag_index_record = None

        # File integrity manager guards the on-disk FAISS indexes.
        self.integrity_manager = FileIntegrityManager(
            username, password=self._db_password
        )
        self.integrity_manager.register_verifier(FAISSIndexVerifier())
    except Exception:
        # Don't leak a manager (and its underlying clients) we built
        # ourselves when the rest of construction fails.
        if self._owns_embedding_manager and self.embedding_manager is not None:
            try:
                self.embedding_manager.close()
            except Exception:
                logger.exception(
                    "Failed to close embedding manager after RAG service "
                    "initialization error"
                )
        raise

    self._closed = False
def close(self):
    """Release embedding model and index resources.

    Idempotent: only the first call does any work. The embedding manager
    (whose ``close()`` also closes any OllamaEmbeddings httpx clients, per
    LocalEmbeddingManager) is closed only when this service created it
    (``_owns_embedding_manager``); caller-supplied managers are left to
    the caller to avoid double-close / use-after-close.

    All held references are dropped in a ``finally`` so that a failing
    ``embedding_manager.close()`` still leaves the service fully released;
    that exception then propagates to the caller.
    """
    if self._closed:
        return
    self._closed = True

    manager = self.embedding_manager
    try:
        if manager is not None and self._owns_embedding_manager:
            manager.close()
    finally:
        self.embedding_manager = None
        self.faiss_index = None
        self.rag_index_record = None
        self.integrity_manager = None
        self.text_splitter = None
def __enter__(self):
    """Context-manager entry: hand back this very service instance."""
    service = self
    return service

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: release resources via ``close()``.

    Never suppresses an exception raised inside the ``with`` body.
    """
    self.close()
    suppress_exception = False
    return suppress_exception
@property
def db_password(self):
    """Return the database password used for per-user DB sessions (may be None)."""
    return self._db_password

@db_password.setter
def db_password(self, value):
    """Set the database password and push it to dependent managers.

    The new value is copied to ``embedding_manager.db_password`` and
    ``integrity_manager.password`` when those managers exist. Presence is
    tested with ``is not None`` (as ``close()`` does) rather than
    truthiness, so a manager object that happens to be falsy still
    receives the password.
    """
    self._db_password = value
    if self.embedding_manager is not None:
        self.embedding_manager.db_password = value
    if self.integrity_manager is not None:
        self.integrity_manager.password = value
def _get_index_hash(
    self,
    collection_name: str,
    embedding_model: str,
    embedding_model_type: str,
) -> str:
    """Return the SHA-256 hex digest identifying an index.

    The digest covers ``"<collection_name>:<embedding_model>:<embedding_model_type>"``,
    so changing the collection, model or provider yields a different index.
    """
    fingerprint = "{}:{}:{}".format(
        collection_name, embedding_model, embedding_model_type
    )
    digest = hashlib.sha256(fingerprint.encode())
    return digest.hexdigest()
def _get_index_path(self, index_hash: str) -> Path:
    """Return ``<cache dir>/rag_indices/<index_hash>.faiss``.

    The cache root comes from ``get_cache_directory()`` (which respects
    ``LDR_DATA_DIR``). The ``rag_indices`` sub-directory is created on
    demand, so the returned file's parent directory always exists.
    """
    indices_dir = get_cache_directory().joinpath("rag_indices")
    indices_dir.mkdir(exist_ok=True, parents=True)
    return indices_dir.joinpath(f"{index_hash}.faiss")
296 @staticmethod
297 def _deduplicate_chunks(
298 chunks: List[LangchainDocument],
299 chunk_ids: List[str],
300 existing_ids: Optional[set] = None,
301 ) -> Tuple[List[LangchainDocument], List[str]]:
302 """Deduplicate chunks by ID within a batch, optionally excluding existing IDs."""
303 seen_ids: set = set()
304 new_chunks: List[LangchainDocument] = []
305 new_ids: List[str] = []
306 for chunk, chunk_id in zip(chunks, chunk_ids):
307 if chunk_id not in seen_ids and (
308 existing_ids is None or chunk_id not in existing_ids
309 ):
310 new_chunks.append(chunk)
311 new_ids.append(chunk_id)
312 seen_ids.add(chunk_id)
313 return new_chunks, new_ids
def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex:
    """Return the RAGIndex row for this collection + embedding config.

    Rows are looked up by ``index_hash`` (from ``_get_index_hash`` over
    ``collection_<collection_id>``, the embedding model and the provider).
    When no row exists, one is created: the embedding model is probed once
    with a test string to learn the vector dimension, and the service's
    chunking / splitter / metric / normalization / index-type settings
    are recorded alongside it with zeroed counters.
    """
    collection_name = f"collection_{collection_id}"
    index_hash = self._get_index_hash(
        collection_name, self.embedding_model, self.embedding_provider
    )

    with get_user_db_session(self.username, self.db_password) as session:
        existing = (
            session.query(RAGIndex).filter_by(index_hash=index_hash).first()
        )
        if existing:
            return existing

        # Resolve (and create the directory for) the index file before
        # probing the model, matching the original ordering of side effects.
        index_path = self._get_index_path(index_hash)
        probe_vector = self.embedding_manager.embeddings.embed_query("test")

        created = RAGIndex(
            collection_name=collection_name,
            embedding_model=self.embedding_model,
            embedding_model_type=EmbeddingProvider(self.embedding_provider),
            embedding_dimension=len(probe_vector),
            index_path=str(index_path),
            index_hash=index_hash,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            splitter_type=self.splitter_type,
            text_separators=self.text_separators,
            distance_metric=self.distance_metric,
            normalize_vectors=self.normalize_vectors,
            index_type=self.index_type,
            chunk_count=0,
            total_documents=0,
            status="active",
            is_current=True,
        )
        session.add(created)
        session.commit()
        session.refresh(created)
        logger.info(f"Created new RAG index: {index_hash}")
        return created
def _quarantine_corrupt_index(self, index_path: Path, reason: str) -> None:
    """Move a corrupted FAISS index and its ``.pkl`` companion aside.

    Both files are renamed to ``<path>.corrupt-<ns>`` (``<ns>`` is
    ``time.time_ns()``; a ``-<n>`` suffix is appended on collision)
    instead of being deleted, so the bytes stay available for inspection
    or recovery. This is used for the "verification failed" and
    "load_local raised" cases in ``load_or_create_faiss_index``; the
    embedding-dimension-mismatch case there deliberately deletes instead,
    because the old vectors are unusable with the new model.

    A missing ``.pkl`` is tolerated (only the ``.faiss`` is moved). After a
    successful move, older quarantined files are pruned best-effort.

    Raises:
        OSError: when a rename fails (disk full, read-only fs, permission
            denied) or more than ``_QUARANTINE_SUFFIX_RETRY_CAP`` suffixed
            names are already taken. Re-raised rather than swallowed,
            otherwise the next ``save_local`` would overwrite the corrupt
            bytes and silently lose them.
    """
    # TODO(#4197-followup): FileIntegrityRecord.consecutive_failures
    # is not reset by the next record_file call, leaking failure
    # counts across recovery cycles. Orthogonal to this fix.
    stamp = time.time_ns()
    pkl_path = index_path.with_suffix(".pkl")

    def _targets_for(attempt: int) -> Tuple[Path, Path]:
        # Attempt 0 uses the bare stamp; later attempts add "-<attempt>".
        tag = f"{stamp}" if attempt == 0 else f"{stamp}-{attempt}"
        return (
            Path(f"{index_path}.corrupt-{tag}"),
            Path(f"{pkl_path}.corrupt-{tag}"),
        )

    # We hold the per-path lock, so collisions can only come from files a
    # user created by hand; bound the loop so that surfaces as an error.
    attempt = 0
    faiss_target, pkl_target = _targets_for(attempt)
    while faiss_target.exists() or pkl_target.exists():
        attempt += 1
        if attempt > _QUARANTINE_SUFFIX_RETRY_CAP:
            raise OSError(
                f"Quarantine path collisions exceeded "
                f"{_QUARANTINE_SUFFIX_RETRY_CAP} retries for "
                f"{index_path}"
            )
        faiss_target, pkl_target = _targets_for(attempt)

    try:
        index_path.rename(faiss_target)
        logger.warning(
            f"Quarantined corrupted FAISS index to {faiss_target} "
            f"(reason: {reason})"
        )
        if not pkl_path.exists():
            # FAISS may have crashed between writing .faiss and .pkl; the
            # .faiss is already preserved and a fresh build writes a new
            # .pkl, so this is not treated as an error.
            logger.warning(
                f"PKL companion missing at {pkl_path}; only "
                f".faiss quarantined."
            )
        else:
            pkl_path.rename(pkl_target)
            logger.info(f"Quarantined companion PKL to {pkl_target}")
    except OSError:
        logger.exception(
            f"Failed to quarantine corrupted index at {index_path}"
        )
        raise

    # Retention sweep is best-effort: it only affects disk usage.
    self._prune_old_quarantined_files(index_path)
442 @staticmethod
443 def _corrupt_sort_key(path: Path) -> Tuple[int, int]:
444 """Extract ``(ns, suffix_n)`` from a ``.corrupt-<ns>[-<n>]``
445 filename so retention can sort by the monotonic nanosecond
446 suffix the quarantine path embeds — *not* by ``st_mtime``.
448 Filesystem timestamp granularity is sometimes 1s or 2s
449 (FAT32/ext3/SMB shares), making mtime ordering non-deterministic
450 when multiple quarantines happen within one tick. The
451 ``.corrupt-<ns>`` suffix carries the original ``time.time_ns()``
452 from the quarantine and is reliable across all filesystems.
454 Returns ``(-1, -1)`` for malformed names (manually-placed
455 files) so they sort below any real entry under ``reverse=True``
456 — i.e., they get pruned first.
457 """
458 name = path.name
459 marker = ".corrupt-"
460 idx = name.rfind(marker)
461 if idx == -1: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true
462 return (-1, -1)
463 tail = name[idx + len(marker) :]
464 parts = tail.split("-")
465 try:
466 ns = int(parts[0])
467 except ValueError:
468 return (-1, -1)
469 suffix_n = 0
470 if len(parts) > 1:
471 try:
472 suffix_n = int(parts[-1])
473 except ValueError:
474 # Unknown trailing component — keep the file but at the
475 # base ns ordering.
476 pass
477 return (ns, suffix_n)
def _prune_old_quarantined_files(self, index_path: Path) -> None:
    """Trim quarantined ``.corrupt-*`` files for ``index_path`` and its ``.pkl``.

    Each side (``<name>.faiss.corrupt-*`` and ``<name>.pkl.corrupt-*``) is
    handled independently, keeping the ``_QUARANTINE_KEEP_RECENT`` newest
    entries. "Newest" is decided by the embedded ``-<ns>`` stamp via
    ``_corrupt_sort_key``, not by ``st_mtime``, so coarse filesystem
    timestamps cannot make retention non-deterministic.

    Best-effort: enumeration or unlink failures are logged and skipped,
    never raised into the indexing path.
    """
    directory = index_path.parent
    for base_path in (index_path, index_path.with_suffix(".pkl")):
        glob_pattern = f"{base_path.name}.corrupt-*"
        try:
            newest_first = sorted(
                directory.glob(glob_pattern),
                key=self._corrupt_sort_key,
                reverse=True,
            )
        except OSError:
            logger.warning(
                f"Failed to enumerate {glob_pattern} in {directory} for "
                f"quarantine retention sweep"
            )
            continue

        # Everything beyond the keep window is stale.
        for victim in newest_first[_QUARANTINE_KEEP_RECENT:]:
            try:
                victim.unlink()
            except OSError:
                logger.warning(
                    f"Failed to prune quarantined file {victim}; continuing"
                )
            else:
                logger.info(f"Pruned old quarantined file: {victim}")
def _merge_and_persist_locked(
    self,
    index_path: Path,
    chunks_to_add: list,
    embedding_ids: list,
    force_reindex: bool = False,
) -> Dict[str, Any]:
    """Read-modify-write the on-disk FAISS index under the
    ``(username, index_path)`` write lock so concurrent workers
    don't lose each other's embeddings.

    Without this, two workers indexing different documents into
    the same collection both load on-disk state X into memory,
    each calls ``add_documents`` on their own private FAISS object
    (worker A: ``X+docA``, worker B: ``X+docB``), then they save
    in sequence — last writer wins, the loser's chunks are gone
    from the FAISS file. The chunks survive in the per-document
    DB rows so a force-reindex rebuilds, but the index file is
    wrong until then. See AI review of #4200.

    The slow embedding + DB-chunk-insert path stays OUTSIDE this
    lock (handled by the caller). The lock only covers the
    FAISS reload→merge→save sequence, so cross-document
    parallelism is preserved up to the point where the in-memory
    states need reconciliation.

    If the on-disk file exists but fails integrity verification, or
    reloading it raises, the merge proceeds on the caller's current
    ``self.faiss_index`` and the save below overwrites the file; both
    situations are logged.

    Args:
        index_path: Path of the on-disk ``.faiss`` file.
        chunks_to_add: Chunks the caller wants to add.
        embedding_ids: Matching embedding/chunk IDs (same length).
        force_reindex: If True, delete any IDs from
            ``embedding_ids`` that already exist in the index
            (so the caller's metadata wins). If False, skip IDs
            that already exist (idempotent re-indexing).

    Returns:
        ``{"added": n, "skipped": m, "added_ids": [...]}`` — counts for
        caller logging plus the IDs actually inserted after dedup.
    """
    with _get_faiss_write_lock(self.username, str(index_path)):
        # Reload from disk to absorb other writers' saves. If the file
        # is absent or unusable, keep whatever the caller already had in
        # ``self.faiss_index`` (likely a fresh in-memory index from
        # load_or_create_faiss_index).
        if index_path.exists():
            verified, reason = self.integrity_manager.verify_file(
                index_path
            )
            if not verified:
                logger.warning(
                    f"On-disk FAISS index at {index_path} failed integrity "
                    f"verification ({reason}); merging into in-memory state "
                    f"and overwriting it."
                )
            else:
                try:
                    # NOTE(review): normalize_L2 is hard-coded True here
                    # while a freshly built index uses
                    # self.normalize_vectors — confirm this is intended
                    # for normalize_vectors=False configurations.
                    self.faiss_index = FAISS.load_local(
                        str(index_path.parent),
                        self.embedding_manager.embeddings,
                        index_name=index_path.stem,
                        allow_dangerous_deserialization=True,
                        normalize_L2=True,
                    )
                except Exception:
                    # Reload failed (torn write, etc.). Keep in-memory
                    # state — it's stale but valid; better than losing
                    # this write entirely.
                    logger.warning(
                        "Failed to reload FAISS for merge; "
                        "proceeding with in-memory state."
                    )

        has_docstore = hasattr(self.faiss_index, "docstore")

        # Force-reindex: remove old copies of IDs we're about to re-add,
        # so updated metadata replaces stale. De-duplicate first because
        # ``embedding_ids`` can repeat (the same chunk hash appearing
        # twice in a document) and FAISS.delete raises on duplicates.
        if force_reindex and has_docstore:
            present_ids = set(self.faiss_index.docstore._dict.keys())
            old_chunk_ids = list(
                {eid for eid in embedding_ids if eid in present_ids}
            )
            if old_chunk_ids:
                logger.info(
                    f"Force re-index: removing {len(old_chunk_ids)} "
                    f"existing chunks from FAISS"
                )
                self.faiss_index.delete(old_chunk_ids)

        # Normal mode skips IDs already in the freshly loaded index;
        # force mode only de-duplicates within this batch.
        fresh_ids = (
            set(self.faiss_index.docstore._dict.keys())
            if has_docstore and not force_reindex
            else None
        )

        new_chunks, new_ids = self._deduplicate_chunks(
            chunks_to_add, embedding_ids, fresh_ids
        )

        if new_chunks:
            self.faiss_index.add_documents(new_chunks, ids=new_ids)

        self.faiss_index.save_local(
            str(index_path.parent), index_name=index_path.stem
        )
        # Record the new checksum while still holding the lock so readers
        # never verify against a half-updated record.
        self.integrity_manager.record_file(
            index_path,
            related_entity_type="rag_index",
            related_entity_id=self.rag_index_record.id,
        )

    return {
        "added": len(new_chunks),
        "skipped": len(chunks_to_add) - len(new_chunks),
        "added_ids": new_ids,
    }
def load_or_create_faiss_index(self, collection_id: str) -> FAISS:
    """
    Load existing FAISS index or create new one.

    If an index file exists it is verified under the per-(username, path)
    write lock. A file that fails verification, or that ``load_local``
    cannot read, is quarantined (not deleted) and a fresh in-memory index
    is built. A file built with a different embedding dimension than the
    current model produces is deleted, the RAGIndex row is reset, and a
    fresh index is built.

    The embedding-dimension probe runs outside the load ``try``: a failure
    of the embedding provider (e.g. an unreachable model server) propagates
    to the caller instead of being mistaken for a corrupt index, which
    would otherwise quarantine a healthy index and replace it with an
    empty one. Likewise, DB errors while resetting after a dimension
    change propagate instead of silently building an index with the stale
    dimension.

    Args:
        collection_id: UUID of the collection

    Returns:
        FAISS vector store instance

    Raises:
        OSError: if quarantining a corrupt index fails.
        Exception: whatever the embedding provider or DB layer raises
            during the dimension probe / reset.
    """
    rag_index = self._get_or_create_rag_index(collection_id)
    self.rag_index_record = rag_index

    index_path = Path(rag_index.index_path)

    # Hold the per-(username, index_path) write lock across the entire
    # verify → quarantine → load sequence. A narrower scope leaves room
    # for a concurrent save_local to race the verification (verify sees
    # bytes A, save overwrites with bytes B, load_local reads bytes B
    # which no longer match the verified checksum). See #4197. The
    # fresh-build path is in-memory only and doesn't need the lock.
    if index_path.exists():
        with _get_faiss_write_lock(self.username, str(index_path)):
            verified, reason = self.integrity_manager.verify_file(
                index_path
            )
            if not verified:
                logger.error(
                    f"Integrity verification failed for {index_path}: "
                    f"{reason}. Quarantining for recovery; creating "
                    f"new index."
                )
                self._quarantine_corrupt_index(index_path, reason)
            else:
                current_dim = len(
                    self.embedding_manager.embeddings.embed_query(
                        "dimension_check"
                    )
                )
                stored_dim = rag_index.embedding_dimension

                if stored_dim and current_dim != stored_dim:
                    self._reset_index_after_dimension_change(
                        index_path, rag_index, stored_dim, current_dim
                    )
                    # Fall through to create a new index below.
                else:
                    # Dimensions match (or no stored dimension): load.
                    try:
                        # NOTE(review): normalize_L2 is hard-coded True
                        # while a freshly built index uses
                        # self.normalize_vectors — confirm intended.
                        faiss_index = FAISS.load_local(
                            str(index_path.parent),
                            self.embedding_manager.embeddings,
                            index_name=index_path.stem,
                            allow_dangerous_deserialization=True,
                            normalize_L2=True,
                        )
                    except Exception:
                        # load_local raised (torn .pkl, malformed pickle,
                        # FAISS read failure). The .faiss bytes may still
                        # be recoverable, so quarantine before falling
                        # through to a fresh build.
                        logger.warning(
                            "Failed to load FAISS index, quarantining "
                            "and creating new one"
                        )
                        if index_path.exists():
                            self._quarantine_corrupt_index(
                                index_path, "load_local_raised"
                            )
                    else:
                        logger.info(
                            f"Loaded existing FAISS index from {index_path}"
                        )
                        return faiss_index

    return self._build_empty_faiss_index(rag_index.embedding_dimension)

def _reset_index_after_dimension_change(
    self, index_path: Path, rag_index, stored_dim, current_dim: int
) -> None:
    """Delete an index built for another embedding dimension and reset its DB state.

    Deletion (not quarantine) is intentional: the stored vectors are
    unusable with the new model. File-deletion errors are logged and
    tolerated; DB errors propagate. On return ``rag_index`` carries the
    new dimension so the caller builds the replacement index correctly.
    """
    logger.warning(
        f"Embedding dimension mismatch detected! "
        f"Index created with dim={stored_dim}, "
        f"current model returns dim={current_dim}. "
        f"Deleting old index and rebuilding."
    )
    try:
        index_path.unlink()
        pkl_path = index_path.with_suffix(".pkl")
        if pkl_path.exists():
            pkl_path.unlink()
        logger.info(f"Deleted old FAISS index files at {index_path}")
    except OSError:
        logger.exception("Failed to delete old index files")

    # Update RAGIndex with the new dimension and reset counts, then clear
    # per-document indexed status so documents get re-indexed.
    with get_user_db_session(self.username, self.db_password) as session:
        idx = session.query(RAGIndex).filter_by(id=rag_index.id).first()
        if idx:
            idx.embedding_dimension = current_dim
            idx.chunk_count = 0
            idx.total_documents = 0
            session.commit()
            logger.info(f"Updated RAGIndex dimension to {current_dim}")

        session.query(RagDocumentStatus).filter_by(
            rag_index_id=rag_index.id
        ).delete()
        session.commit()
        logger.info(
            "Cleared indexed status for documents in this collection"
        )

    # Keep the caller's detached reference in sync for index creation.
    rag_index.embedding_dimension = current_dim

def _build_empty_faiss_index(self, embedding_dimension: int) -> FAISS:
    """Build an empty in-memory FAISS store for the configured type and metric.

    ``index_type`` "hnsw" → ``IndexHNSWFlat`` (M=32); "ivf" is not yet
    implemented and falls back to a flat index; anything else → flat.
    Flat indexes use inner product for "cosine"/"dot_product" and L2
    otherwise. Chunks themselves live in the DB, so the docstore starts
    empty.
    """
    logger.info(
        f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={embedding_dimension}"
    )

    if self.index_type == "hnsw":
        # HNSW: fast approximate search, best for large collections.
        # M=32 connections per layer is a good default.
        # NOTE(review): no metric argument is passed, so faiss uses its
        # L2 default regardless of distance_metric.
        index = IndexHNSWFlat(embedding_dimension, 32)
        logger.info("Created HNSW index with M=32 connections")
    elif self.index_type == "ivf":
        # IVF requires training; for now fall back to flat.
        # TODO: Implement IVF with proper training
        logger.warning(
            "IVF index type not yet fully implemented, using Flat index"
        )
        if self.distance_metric in ("cosine", "dot_product"):
            index = IndexFlatIP(embedding_dimension)
        else:
            index = IndexFlatL2(embedding_dimension)
    else:  # "flat" or default: exact search
        if self.distance_metric in ("cosine", "dot_product"):
            # Cosine similarity = inner product over normalized vectors.
            index = IndexFlatIP(embedding_dimension)
            logger.info(
                "Created Flat index with Inner Product (for cosine similarity)"
            )
        else:  # l2
            index = IndexFlatL2(embedding_dimension)
            logger.info("Created Flat index with L2 distance")

    faiss_index = FAISS(
        self.embedding_manager.embeddings,
        index=index,
        docstore=InMemoryDocstore(),  # Minimal - chunks in DB
        index_to_docstore_id={},
        normalize_L2=self.normalize_vectors,  # Configurable normalization
    )
    logger.info(
        f"FAISS index created with normalization={self.normalize_vectors}"
    )
    return faiss_index
def get_current_index_info(
    self, collection_id: Optional[str] = None
) -> Optional[Dict[str, Any]]:
    """
    Get information about the current RAG index for a collection.

    Args:
        collection_id: UUID of collection (defaults to Library if None)

    Returns:
        ``None`` when no current (``is_current=True``) RAGIndex exists for
        the collection; otherwise a dict with the embedding configuration,
        chunk settings, chunk/document counts aggregated from
        ``RagDocumentStatus`` for the collection, and ISO-8601
        ``created_at`` / ``last_updated_at`` (``None`` if a timestamp is
        unset).
    """
    with get_user_db_session(self.username, self.db_password) as session:
        # RAGIndex stores collection names as ``collection_<uuid>``.
        if collection_id:
            collection = (
                session.query(Collection)
                .filter_by(id=collection_id)
                .first()
            )
            collection_name = (
                f"collection_{collection_id}" if collection else "unknown"
            )
        else:
            # Default to Library collection
            from ...database.library_init import get_default_library_id

            collection_id = get_default_library_id(
                self.username, self.db_password
            )
            collection_name = f"collection_{collection_id}"

        rag_index = (
            session.query(RAGIndex)
            .filter_by(collection_name=collection_name, is_current=True)
            .first()
        )

        if not rag_index:
            # Diagnostic dump of every index; debug-level because it lists
            # all collections on every miss.
            all_indices = session.query(RAGIndex).all()
            logger.debug(
                f"No RAG index found for collection_name='{collection_name}'. All indices: {[(idx.collection_name, idx.is_current) for idx in all_indices]}"
            )
            return None

        # Actual counts come from the rag_document_status table rather
        # than the cached counters on RAGIndex.
        actual_chunk_count = (
            session.query(func.sum(RagDocumentStatus.chunk_count))
            .filter_by(collection_id=collection_id)
            .scalar()
            or 0
        )

        actual_doc_count = (
            session.query(RagDocumentStatus)
            .filter_by(collection_id=collection_id)
            .count()
        )

        created_at = rag_index.created_at
        last_updated_at = rag_index.last_updated_at

        return {
            "embedding_model": rag_index.embedding_model,
            "embedding_model_type": rag_index.embedding_model_type.value
            if rag_index.embedding_model_type
            else None,
            "embedding_dimension": rag_index.embedding_dimension,
            "chunk_size": rag_index.chunk_size,
            "chunk_overlap": rag_index.chunk_overlap,
            "chunk_count": actual_chunk_count,
            "total_documents": actual_doc_count,
            "created_at": created_at.isoformat() if created_at else None,
            "last_updated_at": last_updated_at.isoformat()
            if last_updated_at
            else None,
        }
def index_document(
    self, document_id: str, collection_id: str, force_reindex: bool = False
) -> Dict[str, Any]:
    """
    Index a single document into RAG for a specific collection.

    Steps: load the ``Document``; make sure it is linked to the collection
    (``ensure_in_collection``); split its ``text_content`` into chunks;
    store the chunks via the embedding manager; merge them into the on-disk
    FAISS index under the per-(user, index_path) write lock; then upsert the
    ``RagDocumentStatus`` row, update the legacy ``DocumentCollection``
    flags, bump the ``RAGIndex`` counters (first-time index only) and
    commit.

    Args:
        document_id: UUID of the Document to index
        collection_id: UUID of the Collection to index for
        force_reindex: Whether to force reindexing even if already indexed

    Returns:
        Dict with status, chunk_count, and any errors:

        - ``{"status": "success", "chunk_count", "embedding_ids"}`` on
          success; ``chunk_count`` is the number of chunks produced by the
          splitter, not the number newly added to FAISS.
        - ``{"status": "skipped", "message", "chunk_count"}`` when the
          document is already marked indexed for this collection and
          ``force_reindex`` is false.
        - ``{"status": "error", "error"}`` when the document is missing,
          has no text, or anything inside the ``try`` raises (the session
          is rolled back first).

        Exceptions raised before the ``try`` (e.g. from
        ``ensure_in_collection``) propagate to the caller.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        # Get the document
        document = session.query(Document).filter_by(id=document_id).first()

        if not document:
            return {"status": "error", "error": "Document not found"}

        # Get or create DocumentCollection entry
        doc_collection = ensure_in_collection(
            session, document_id, collection_id
        )

        # Check if already indexed for this collection
        if doc_collection.indexed and not force_reindex:
            return {
                "status": "skipped",
                "message": "Document already indexed for this collection",
                "chunk_count": doc_collection.chunk_count,
            }

        # Validate text content
        if not document.text_content:
            return {
                "status": "error",
                "error": "Document has no text content",
            }

        try:
            # Create LangChain Document from text
            doc = LangchainDocument(
                page_content=document.text_content,
                metadata={
                    "source": document.original_url,
                    "document_id": document_id,  # Add document ID for source linking
                    "collection_id": collection_id,  # Add collection ID
                    "title": document.title
                    or document.filename
                    or "Untitled",
                    "document_title": document.title
                    or document.filename
                    or "Untitled",  # Add for compatibility
                    "authors": document.authors,
                    "published_date": str(document.published_date)
                    if document.published_date
                    else None,
                    "doi": document.doi,
                    "arxiv_id": document.arxiv_id,
                    "pmid": document.pmid,
                    "pmcid": document.pmcid,
                    "extraction_method": document.extraction_method,
                    "word_count": document.word_count,
                },
            )

            # Split into chunks
            chunks = self.text_splitter.split_documents([doc])
            logger.info(
                f"Split document {document_id} into {len(chunks)} chunks"
            )

            # Get collection name for chunk storage
            collection = (
                session.query(Collection)
                .filter_by(id=collection_id)
                .first()
            )
            # Use collection_<uuid> format for internal storage
            collection_name = (
                f"collection_{collection_id}" if collection else "unknown"
            )

            # Store chunks in database using embedding manager
            embedding_ids = self.embedding_manager._store_chunks_to_db(
                chunks=chunks,
                collection_name=collection_name,
                source_type="document",
                source_id=document_id,
            )

            # Load or create FAISS index (lazy; the merge step
            # below will reload from disk under the lock anyway).
            # NOTE(review): when faiss_index is already loaded, the existing
            # self.rag_index_record is used below as-is; presumably it
            # belongs to this collection_id — confirm for multi-collection use.
            if self.faiss_index is None:
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # Read-modify-write the on-disk FAISS index under
            # the per-(user, index_path) lock. The lock spans
            # reload + dedup + add + save so concurrent indexers
            # of different documents into the same collection
            # don't lose each other's embeddings (see AI review
            # of #4200).
            index_path = Path(self.rag_index_record.index_path)
            # Used only for the log line below: how many ids repeat
            # within this document's own chunk batch.
            unique_count = len(set(embedding_ids))
            batch_dups = len(chunks) - unique_count
            merge_stats = self._merge_and_persist_locked(
                index_path,
                chunks,
                embedding_ids,
                force_reindex=force_reindex,
            )
            if merge_stats["added"]:
                if force_reindex:
                    logger.info(
                        f"Force re-index: added {merge_stats['added']} "
                        f"chunks with updated metadata to FAISS index"
                    )
                else:
                    already_exist = unique_count - merge_stats["added"]
                    logger.info(
                        f"Added {merge_stats['added']} new embeddings to FAISS "
                        f"({already_exist} already exist, "
                        f"{batch_dups} batch duplicates removed)"
                    )
            else:
                logger.info(
                    f"All {len(chunks)} chunks already exist in FAISS index, skipping"
                )
            logger.info(
                f"Saved FAISS index to {index_path} with integrity tracking"
            )

            from datetime import datetime, UTC

            # Check if document was already indexed (for stats update)
            existing_status = (
                session.query(RagDocumentStatus)
                .filter_by(
                    document_id=document_id, collection_id=collection_id
                )
                .first()
            )
            was_already_indexed = existing_status is not None

            # Mark document as indexed using rag_document_status table
            # Row existence = indexed, simple and clean
            timestamp = datetime.now(UTC)

            # Create or update RagDocumentStatus using ORM merge (atomic upsert)
            # NOTE(review): session.merge matches on primary key; this acts
            # as an upsert only if the PK is (document_id, collection_id) —
            # confirm against the model.
            rag_status = RagDocumentStatus(
                document_id=document_id,
                collection_id=collection_id,
                rag_index_id=self.rag_index_record.id,
                chunk_count=len(chunks),
                indexed_at=timestamp,
            )
            session.merge(rag_status)

            logger.info(
                f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={len(chunks)}"
            )

            # Also update DocumentCollection table for backward compatibility
            session.query(DocumentCollection).filter_by(
                document_id=document_id, collection_id=collection_id
            ).update(
                {
                    "indexed": True,
                    "chunk_count": len(chunks),
                    "last_indexed_at": timestamp,
                }
            )

            logger.info(
                "Also updated DocumentCollection.indexed for backward compatibility"
            )

            # Update RAGIndex statistics (only if not already indexed)
            # NOTE(review): on a force re-index the counters are left
            # untouched, so a change in chunk count for this document is not
            # reflected in RAGIndex.chunk_count.
            rag_index_obj = (
                session.query(RAGIndex)
                .filter_by(id=self.rag_index_record.id)
                .first()
            )
            if rag_index_obj and not was_already_indexed:
                rag_index_obj.chunk_count += len(chunks)
                rag_index_obj.total_documents += 1
                rag_index_obj.last_updated_at = datetime.now(UTC)
                logger.info(
                    f"Updated RAGIndex stats: chunk_count +{len(chunks)}, total_documents +1"
                )

            # Flush ORM changes to database before commit
            session.flush()
            logger.info(f"Flushed ORM changes for document {document_id}")

            # Commit the transaction. Durability is provided by
            # synchronous=NORMAL (sqlcipher_utils.py); SQLite
            # auto-checkpoints WAL at wal_autocheckpoint=250 frames.
            # An explicit PRAGMA wal_checkpoint(FULL) here used to
            # block other writers long enough to exhaust busy_timeout
            # under bulk-download concurrency (#4197).
            session.commit()

            logger.info(
                f"Successfully indexed document {document_id} for collection {collection_id} "
                f"with {len(chunks)} chunks"
            )

            return {
                "status": "success",
                "chunk_count": len(chunks),
                "embedding_ids": embedding_ids,
            }

        except Exception as e:
            # The session is shared (thread-local) with the caller.
            # If session.flush() or session.commit() raised, the session
            # is in PendingRollbackError state until rolled back —
            # leaving subsequent operations to cascade. Roll back BEFORE
            # returning the error dict so the caller sees a clean
            # session. (Same pattern as the #3827 fix.)
            safe_rollback(session, "library_rag_service.index_document")
            logger.exception(
                f"Error indexing document {document_id} for collection {collection_id}"
            )
            # Only the exception type is surfaced to callers; details go
            # to the log above.
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }
def index_all_documents(
    self,
    collection_id: str,
    force_reindex: bool = False,
    progress_callback=None,
) -> Dict[str, Any]:
    """
    Index all documents in a collection into RAG.

    The ``(document_id, title)`` pairs to process are snapshotted as plain
    values with a single outer-joined query before the loop starts.
    ``index_document`` commits (or rolls back) the thread-local session it
    shares with this method, which can expire previously loaded ORM
    instances; working from plain values avoids re-loading each
    ``DocumentCollection`` row and issuing one ``Document`` lookup per item.

    Args:
        collection_id: UUID of the collection to index
        force_reindex: Whether to force reindexing already indexed documents
        progress_callback: Optional callback function called after each
            document with (current, total, doc_title, status)

    Returns:
        Dict with ``successful``, ``skipped`` and ``failed`` counts and an
        ``errors`` list of ``{"doc_id", "title", "error"}`` entries. When
        there is nothing to index it also carries ``status`` ("info") and
        ``message``.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        # One query for every DocumentCollection row of this collection,
        # outer-joined to Document so a dangling link still shows up (with
        # Document.id == None) and is reported by index_document.
        query = (
            session.query(
                DocumentCollection.document_id,
                Document.id,
                Document.title,
            )
            .select_from(DocumentCollection)
            .outerjoin(Document, Document.id == DocumentCollection.document_id)
            .filter(DocumentCollection.collection_id == collection_id)
        )

        if not force_reindex:
            # Only index documents that haven't been indexed yet
            query = query.filter(DocumentCollection.indexed == False)  # noqa: E712

        # "Unknown" only when the Document row is missing; an existing
        # document with a NULL title keeps None.
        pending = [
            (document_id, title if found_id is not None else "Unknown")
            for document_id, found_id, title in query.all()
        ]

        if not pending:
            return {
                "status": "info",
                "message": "No documents to index",
                "successful": 0,
                "skipped": 0,
                "failed": 0,
                "errors": [],
            }

        results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
        total = len(pending)

        for idx, (document_id, title) in enumerate(pending, 1):
            result = self.index_document(
                document_id, collection_id, force_reindex
            )
            status = result["status"]

            if status == "success":
                results["successful"] += 1
            elif status == "skipped":
                results["skipped"] += 1
            else:
                results["failed"] += 1
                results["errors"].append(
                    {
                        "doc_id": document_id,
                        "title": title,
                        "error": result.get("error"),
                    }
                )

            # Call progress callback if provided
            if progress_callback:
                progress_callback(idx, total, title, status)

        logger.info(
            f"Indexed collection {collection_id}: "
            f"{results['successful']} successful, "
            f"{results['skipped']} skipped, "
            f"{results['failed']} failed"
        )

        return results
def remove_document_from_rag(
    self, document_id: str, collection_id: str
) -> Dict[str, Any]:
    """
    Remove a document's chunks from RAG for a specific collection.

    Deletes the document's stored chunks through the embedding manager,
    clears the legacy ``DocumentCollection`` RAG flags, and deletes the
    document's ``RagDocumentStatus`` row(s) for this collection. Existence
    of that row is what marks a document as indexed, so leaving it behind
    would keep the document counted as indexed. The owning ``RAGIndex``
    counters are decremented by the amounts recorded on the status row
    (clamped at 0), so a later re-index does not count the document twice.

    NOTE(review): vectors already present in the FAISS index are not
    removed by this method.

    Args:
        document_id: UUID of the Document to remove
        collection_id: UUID of the Collection to remove from

    Returns:
        ``{"status": "success", "deleted_count": <chunks deleted from DB>}``,
        or ``{"status": "error", "error": ...}`` when the document is not in
        the collection or any step fails (the session is rolled back).
    """
    with get_user_db_session(self.username, self.db_password) as session:
        # Get the DocumentCollection entry
        doc_collection = (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id, collection_id=collection_id)
            .first()
        )

        if not doc_collection:
            return {
                "status": "error",
                "error": "Document not found in collection",
            }

        try:
            # Get collection name in the format collection_<uuid>
            collection = (
                session.query(Collection)
                .filter_by(id=collection_id)
                .first()
            )
            # Use collection_<uuid> format for internal storage
            collection_name = (
                f"collection_{collection_id}" if collection else "unknown"
            )

            # Delete chunks from database
            deleted_count = self.embedding_manager._delete_chunks_from_db(
                collection_name=collection_name,
                source_id=document_id,
            )

            # Update DocumentCollection RAG status
            doc_collection.indexed = False
            doc_collection.chunk_count = 0
            doc_collection.last_indexed_at = None

            # Drop the rag_document_status marker (row existence == indexed)
            # and undo the RAGIndex counters that were added for it.
            status_rows = (
                session.query(RagDocumentStatus)
                .filter_by(document_id=document_id, collection_id=collection_id)
                .all()
            )
            for status_row in status_rows:
                rag_index_obj = (
                    session.query(RAGIndex)
                    .filter_by(id=status_row.rag_index_id)
                    .first()
                )
                if rag_index_obj:
                    rag_index_obj.chunk_count = max(
                        0,
                        (rag_index_obj.chunk_count or 0)
                        - (status_row.chunk_count or 0),
                    )
                    rag_index_obj.total_documents = max(
                        0, (rag_index_obj.total_documents or 0) - 1
                    )
                session.delete(status_row)

            session.commit()

            logger.info(
                f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}"
            )

            return {"status": "success", "deleted_count": deleted_count}

        except Exception as e:
            # session.commit() above can raise; without rollback the
            # shared thread-local session stays poisoned for the
            # caller's next operation (issue #3827 pattern).
            safe_rollback(
                session, "library_rag_service.remove_document_from_rag"
            )
            logger.exception(
                f"Error removing document {document_id} from collection {collection_id}"
            )
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }
def index_documents_batch(
    self,
    doc_info: List[tuple],
    collection_id: str,
    force_reindex: bool = False,
) -> Dict[str, Dict[str, Any]]:
    """
    Index several documents into one collection and report each outcome.

    The requested ``Document`` rows and their ``DocumentCollection`` links
    for ``collection_id`` are fetched up front (one query each). Documents
    that are missing, already indexed (unless ``force_reindex``), or have no
    text are answered directly; everything else is handed to
    ``index_document``.

    Args:
        doc_info: List of ``(doc_id, title)`` tuples; only ``doc_id`` is used.
        collection_id: UUID of the collection to index for
        force_reindex: Whether to force reindexing even if already indexed

    Returns:
        Dict mapping each doc_id to its individual result dict.
    """
    outcome = {}
    requested_ids = [doc_id for doc_id, _unused_title in doc_info]

    # A single session serves the up-front lookups for the whole batch.
    with get_user_db_session(self.username, self.db_password) as session:
        loaded_documents = (
            session.query(Document)
            .filter(Document.id.in_(requested_ids))
            .all()
        )
        docs_by_id = {doc.id: doc for doc in loaded_documents}

        loaded_links = (
            session.query(DocumentCollection)
            .filter(
                DocumentCollection.document_id.in_(requested_ids),
                DocumentCollection.collection_id == collection_id,
            )
            .all()
        )
        links_by_doc_id = {link.document_id: link for link in loaded_links}

        for doc_id, _title in doc_info:
            document = docs_by_id.get(doc_id)
            link = links_by_doc_id.get(doc_id)

            if not document:
                outcome[doc_id] = {
                    "status": "error",
                    "error": "Document not found",
                }
            elif link and link.indexed and not force_reindex:
                outcome[doc_id] = {
                    "status": "skipped",
                    "message": "Document already indexed for this collection",
                    "chunk_count": link.chunk_count,
                }
            elif not document.text_content:
                outcome[doc_id] = {
                    "status": "error",
                    "error": "Document has no text content",
                }
            else:
                try:
                    outcome[doc_id] = self.index_document(
                        doc_id, collection_id, force_reindex
                    )
                except Exception as e:
                    logger.exception(
                        f"Error indexing document {doc_id} in batch"
                    )
                    outcome[doc_id] = {
                        "status": "error",
                        "error": f"Indexing failed: {type(e).__name__}",
                    }

    return outcome
def get_rag_stats(
    self, collection_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Summarise the RAG indexing state of one collection.

    Document totals come from ``DocumentCollection``; indexed-document and
    chunk totals come from the collection's ``RagDocumentStatus`` rows.
    Embedding details are read from the first ``DocumentChunk`` stored
    under the collection's internal name, if there is one.

    Args:
        collection_id: UUID of the collection; when falsy, the default
            Library collection is used.

    Returns:
        Dict with ``total_documents``, ``indexed_documents``,
        ``unindexed_documents``, ``total_chunks``, ``embedding_info``
        (empty dict when no chunk exists), ``chunk_size`` and
        ``chunk_overlap``.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        if not collection_id:
            from ...database.library_init import get_default_library_id

            collection_id = get_default_library_id(
                self.username, self.db_password
            )

        membership_count = (
            session.query(DocumentCollection)
            .filter_by(collection_id=collection_id)
            .count()
        )
        indexed_count = (
            session.query(RagDocumentStatus)
            .filter_by(collection_id=collection_id)
            .count()
        )
        summed_chunks = (
            session.query(func.sum(RagDocumentStatus.chunk_count))
            .filter_by(collection_id=collection_id)
            .scalar()
        )
        chunk_total = summed_chunks or 0

        # DocumentChunk rows are keyed by "collection_<uuid>"; "library" is
        # used when the Collection row itself cannot be found.
        collection_exists = (
            session.query(Collection).filter_by(id=collection_id).first()
            is not None
        )
        storage_name = (
            f"collection_{collection_id}" if collection_exists else "library"
        )

        sample_chunk = (
            session.query(DocumentChunk)
            .filter_by(collection_name=storage_name)
            .first()
        )

        embedding_info = {}
        if sample_chunk:
            model_type = sample_chunk.embedding_model_type
            embedding_info = {
                "model": sample_chunk.embedding_model,
                "model_type": model_type.value if model_type else None,
                "dimension": sample_chunk.embedding_dimension,
            }

        return {
            "total_documents": membership_count,
            "indexed_documents": indexed_count,
            "unindexed_documents": membership_count - indexed_count,
            "total_chunks": chunk_total,
            "embedding_info": embedding_info,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
        }
def index_local_file(self, file_path: str) -> Dict[str, Any]:
    """
    Index a local file from the filesystem into RAG.

    Supported types are chosen by extension (case-insensitive):
    ``.txt``/``.md``/``.markdown`` are read as UTF-8, ``.html``/``.htm``
    have their tags stripped with BeautifulSoup, and ``.pdf`` text is
    extracted page by page with pypdf. Chunks are stored under the
    ``local_library`` collection name and merged into the FAISS index of the
    current RAG index record; if no FAISS index is loaded yet, the default
    Library collection's index is loaded first.

    Args:
        file_path: Path to the file to index

    Returns:
        ``{"status": "success", "chunk_count", "embedding_ids"}``, where both
        describe only the chunks newly added to FAISS;
        ``{"status": "skipped", "error"}`` for unsupported file types; or
        ``{"status": "error", "error"}`` for a missing or non-regular file,
        no extractable text, or any failure during processing.
    """
    file_path = Path(file_path)

    if not file_path.exists():
        return {"status": "error", "error": f"File not found: {file_path}"}

    if not file_path.is_file():
        return {"status": "error", "error": f"Not a file: {file_path}"}

    suffix = file_path.suffix.lower()

    # Read file content based on type
    try:
        if suffix in (".txt", ".md", ".markdown"):
            # Text files
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
        elif suffix in (".html", ".htm"):
            # HTML files - strip tags
            from bs4 import BeautifulSoup

            with open(file_path, "r", encoding="utf-8") as f:
                soup = BeautifulSoup(f.read(), "html.parser")
                content = soup.get_text()
        elif suffix == ".pdf":
            # PDF files - extract text
            from pypdf import PdfReader

            with open(file_path, "rb") as f:
                pdf_reader = PdfReader(f)
                # Join once instead of repeated string concatenation; a
                # page with no extractable text contributes nothing.
                content = "".join(
                    page.extract_text() or "" for page in pdf_reader.pages
                )
        else:
            return {
                "status": "skipped",
                "error": f"Unsupported file type: {file_path.suffix}",
            }

        if not content or len(content.strip()) < 10:
            return {
                "status": "error",
                "error": "File has no extractable text content",
            }

        # Stable per-path id component. The builtin hash() of a str is
        # salted per interpreter process (PYTHONHASHSEED), so it would give
        # the same file a different source_id on every run.
        # surrogateescape keeps undecodable filename bytes encodable.
        path_digest = hashlib.sha256(
            str(file_path).encode("utf-8", "surrogateescape")
        ).hexdigest()[:16]

        # Create LangChain Document from text
        doc = LangchainDocument(
            page_content=content,
            metadata={
                "source": str(file_path),
                "source_id": f"local_{file_path.stem}_{path_digest}",
                "title": file_path.stem,
                "document_title": file_path.stem,
                "file_type": suffix,
                "file_size": file_path.stat().st_size,
                "source_type": "local_file",
                "collection": "local_library",
            },
        )

        # Split into chunks
        chunks = self.text_splitter.split_documents([doc])
        logger.info(
            f"Split local file {file_path} into {len(chunks)} chunks"
        )

        # Store chunks in database (returns UUID-based IDs)
        embedding_ids = self.embedding_manager._store_chunks_to_db(
            chunks=chunks,
            collection_name="local_library",
            source_type="local_file",
            source_id=str(file_path),
        )

        # Load or create FAISS index using default library collection
        # (lazy; merge step below reloads under the lock anyway).
        if self.faiss_index is None:
            from ...database.library_init import get_default_library_id

            default_collection_id = get_default_library_id(
                self.username, self.db_password
            )
            self.faiss_index = self.load_or_create_faiss_index(
                default_collection_id
            )

        # Read-modify-write the on-disk FAISS under the lock.
        index_path = (
            Path(self.rag_index_record.index_path)
            if self.rag_index_record
            else None
        )
        if index_path:
            merge_stats = self._merge_and_persist_locked(
                index_path, chunks, embedding_ids, force_reindex=False
            )
            logger.info(
                f"Saved FAISS index to {index_path} with integrity tracking"
            )
        else:
            # No persistent index path — in-memory only path.
            # Preserve old behavior: dedup against in-memory state
            # and add directly without saving.
            if self.faiss_index is not None and hasattr(
                self.faiss_index, "docstore"
            ):
                existing_ids = set(self.faiss_index.docstore._dict.keys())
            else:
                existing_ids = None
            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )
            if new_chunks:
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            merge_stats = {
                "added": len(new_chunks),
                "skipped": len(chunks) - len(new_chunks),
                "added_ids": new_ids,
            }

        logger.info(
            f"Successfully indexed local file {file_path} with "
            f"{merge_stats['added']} new chunks "
            f"({merge_stats['skipped']} skipped)"
        )

        return {
            "status": "success",
            "chunk_count": merge_stats["added"],
            "embedding_ids": merge_stats["added_ids"],
        }

    except Exception as e:
        logger.exception(f"Error indexing local file {file_path}")
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }
def index_user_document(
    self, user_doc, collection_name: str, force_reindex: bool = False
) -> Dict[str, Any]:
    """
    Index a user-uploaded document into a specific collection.

    The document's pre-extracted ``text_content`` is split into chunks,
    stored under ``collection_name`` with source_type ``"user_document"``,
    and merged into the FAISS index. With a persistent index record the
    merge goes through the locked read-modify-write helper; without one,
    chunks are only added to the in-memory index (nothing is saved).

    Args:
        user_doc: UserDocument object; this method reads its
            ``text_content``, ``id``, ``filename``, ``file_type`` and
            ``file_size`` attributes.
        collection_name: Name of the collection (e.g., "collection_123")
        force_reindex: Whether to force reindexing

    Returns:
        Dict with status, chunk_count, and any errors:

        - ``{"status": "success", "chunk_count", "embedding_ids"}``, where
          ``chunk_count`` is ``len(chunks)`` and ``embedding_ids`` are all
          ids returned by chunk storage, regardless of how many were newly
          added to FAISS;
        - ``{"status": "error", "error"}`` when the text is empty or shorter
          than 10 characters after stripping, or when any step raises.
    """

    try:
        # Use the pre-extracted text content
        content = user_doc.text_content

        if not content or len(content.strip()) < 10:
            return {
                "status": "error",
                "error": "Document has no extractable text content",
            }

        # Create LangChain Document
        doc = LangchainDocument(
            page_content=content,
            metadata={
                "source": f"user_upload_{user_doc.id}",
                "source_id": user_doc.id,
                "title": user_doc.filename,
                "document_title": user_doc.filename,
                "file_type": user_doc.file_type,
                "file_size": user_doc.file_size,
                "collection": collection_name,
            },
        )

        # Split into chunks
        chunks = self.text_splitter.split_documents([doc])
        logger.info(
            f"Split user document {user_doc.filename} into {len(chunks)} chunks"
        )

        # Store chunks in database
        embedding_ids = self.embedding_manager._store_chunks_to_db(
            chunks=chunks,
            collection_name=collection_name,
            source_type="user_document",
            source_id=user_doc.id,
        )

        # Load or create FAISS index for this collection (lazy;
        # merge step below reloads under the lock anyway).
        if self.faiss_index is None:
            # Extract collection_id from collection_name (format: "collection_<uuid>")
            collection_id = collection_name.removeprefix("collection_")
            self.faiss_index = self.load_or_create_faiss_index(
                collection_id
            )

        # Used only for logging: ids repeated within this document's batch.
        unique_count = len(set(embedding_ids))
        batch_dups = len(chunks) - unique_count

        # Read-modify-write the on-disk FAISS under the lock so
        # concurrent uploads to the same collection don't lose
        # each other's chunks.
        index_path = (
            Path(self.rag_index_record.index_path)
            if self.rag_index_record
            else None
        )
        if index_path:
            merge_stats = self._merge_and_persist_locked(
                index_path,
                chunks,
                embedding_ids,
                force_reindex=force_reindex,
            )
        else:
            # No persistent index path — in-memory only path.
            # Preserve old behavior: handle force_reindex deletion
            # and dedup add against in-memory state without saving.
            # Order matters: on force_reindex the old copies are deleted
            # first, then dedup runs without an existing-id filter so every
            # chunk is re-added with its fresh metadata.
            if force_reindex and hasattr(self.faiss_index, "docstore"):
                existing_ids = set(self.faiss_index.docstore._dict.keys())
                old_chunk_ids = list(
                    {eid for eid in embedding_ids if eid in existing_ids}
                )
                if old_chunk_ids:
                    logger.info(
                        f"Force re-index: removing {len(old_chunk_ids)} "
                        f"existing chunks from FAISS"
                    )
                    self.faiss_index.delete(old_chunk_ids)
            if not force_reindex and hasattr(self.faiss_index, "docstore"):
                existing_ids = set(self.faiss_index.docstore._dict.keys())
            else:
                existing_ids = None
            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )
            if new_chunks:
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            # Same shape as the locked merge helper's result so the
            # logging below works for both paths.
            merge_stats = {
                "added": len(new_chunks),
                "skipped": len(chunks) - len(new_chunks),
                "added_ids": new_ids,
            }
        if merge_stats["added"]:
            if force_reindex:
                logger.info(
                    f"Force re-index: added {merge_stats['added']} "
                    f"chunks with updated metadata to FAISS index"
                )
            else:
                already_exist = unique_count - merge_stats["added"]
                logger.info(
                    f"Added {merge_stats['added']} new chunks to FAISS "
                    f"({already_exist} already exist, "
                    f"{batch_dups} batch duplicates removed)"
                )
        else:
            logger.info(
                f"All {len(chunks)} chunks already exist in FAISS index, skipping"
            )

        logger.info(
            f"Successfully indexed user document {user_doc.filename} with {len(chunks)} chunks"
        )

        return {
            "status": "success",
            "chunk_count": len(chunks),
            "embedding_ids": embedding_ids,
        }

    except Exception as e:
        logger.exception(
            f"Error indexing user document {user_doc.filename}"
        )
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }
def remove_collection_from_index(
    self, collection_name: str
) -> Dict[str, Any]:
    """
    Remove all documents from a collection from the FAISS index.

    Looks up the ids of every ``DocumentChunk`` stored under
    ``collection_name``, builds FAISS ids as ``f"{collection_name}_{id}"``
    and deletes them from ``self.faiss_index`` (loading it first if needed).
    With a persistent index record, the delete, ``save_local`` and the
    integrity record happen under the per-(user, index_path) write lock.
    Database chunk rows are not deleted here.

    FAISS removal is best-effort: a failure there is logged with its
    traceback, and the method still reports success.

    NOTE(review): chunk ids stored elsewhere in this service come from
    ``_store_chunks_to_db`` ("UUID-based IDs"); confirm they match the
    ``{collection_name}_{chunk.id}`` format built here, otherwise the FAISS
    delete will not find them.

    Args:
        collection_name: Name of the collection (e.g., "collection_123")

    Returns:
        ``{"status": "success", "deleted_count": <number of chunk ids
        targeted>}`` or ``{"status": "error", "error": ...}``.
    """
    from ...database.models import DocumentChunk
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(
            self.username, self.db_password
        ) as session:
            # Only the primary keys are needed to build the FAISS ids, so
            # select that column instead of hydrating every chunk row.
            chunk_rows = (
                session.query(DocumentChunk.id)
                .filter(DocumentChunk.collection_name == collection_name)
                .all()
            )

            if not chunk_rows:
                return {"status": "success", "deleted_count": 0}

            chunk_ids = [
                f"{collection_name}_{chunk_id}" for (chunk_id,) in chunk_rows
            ]

            # Load FAISS index if not already loaded
            if self.faiss_index is None:
                # Extract collection_id from collection_name (format: "collection_<uuid>")
                collection_id = collection_name.removeprefix("collection_")
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # Remove from FAISS index. delete + save + record must all
            # be inside the same lock — otherwise a concurrent writer
            # could sandwich a save_local between our delete and our
            # save, leaving stale chunks back on disk (#4197).
            if hasattr(self.faiss_index, "delete"):
                try:
                    index_path = (
                        Path(self.rag_index_record.index_path)
                        if self.rag_index_record
                        else None
                    )
                    if index_path:
                        with _get_faiss_write_lock(
                            self.username, str(index_path)
                        ):
                            self.faiss_index.delete(chunk_ids)
                            self.faiss_index.save_local(
                                str(index_path.parent),
                                index_name=index_path.stem,
                            )
                            self.integrity_manager.record_file(
                                index_path,
                                related_entity_type="rag_index",
                                related_entity_id=self.rag_index_record.id,
                            )
                    else:
                        # No index path → in-memory-only delete
                        self.faiss_index.delete(chunk_ids)
                except Exception:
                    # Best-effort: keep going, but attach the traceback so
                    # the failure cause is diagnosable from the logs.
                    logger.opt(exception=True).warning(
                        "Could not delete chunks from FAISS"
                    )

            logger.info(
                f"Removed {len(chunk_ids)} chunks from collection {collection_name}"
            )

            return {"status": "success", "deleted_count": len(chunk_ids)}

    except Exception as e:
        logger.exception(
            f"Error removing collection {collection_name} from index"
        )
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }