Coverage for src/local_deep_research/research_library/services/library_rag_service.py: 95%

549 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Library RAG Service 

3 

4Handles indexing and searching library documents using RAG: 

5- Index text documents into vector database 

6- Chunk documents for semantic search 

7- Generate embeddings using local models 

8- Manage FAISS indices per research 

9- Track RAG status in library 

10""" 

11 

12import threading 

13import time 

14from pathlib import Path 

15from typing import Any, Dict, List, Optional, Tuple 

16 

17from langchain_core.documents import Document as LangchainDocument 

18from loguru import logger 

19from sqlalchemy import func 

20 

21from ...config.paths import get_cache_directory 

22from ...database.models.library import ( 

23 Document, 

24 DocumentChunk, 

25 DocumentCollection, 

26 Collection, 

27 RAGIndex, 

28 RagDocumentStatus, 

29 EmbeddingProvider, 

30) 

31from ...database.session_context import get_user_db_session, safe_rollback 

32from ...utilities.type_utils import to_bool 

33from ..utils import ensure_in_collection 

34from ...embeddings.splitters import get_text_splitter 

35from ...web_search_engines.engines.local_embedding_manager import ( 

36 LocalEmbeddingManager, 

37) 

38from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier 

39import hashlib 

40from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat 

41from langchain_community.vectorstores import FAISS 

42from langchain_community.docstore.in_memory import InMemoryDocstore 

43 

44 

# Module-level locks serialise the FAISS save+record critical section and the
# load_or_create verify→quarantine→build sequence per (username, index_path).
# MUST be module-level: each auto-index / scheduler / search worker constructs
# its own LibraryRAGService, so an instance-scoped lock would coordinate
# nothing. Pattern is adapted from web/queue/processor_v2._user_critical_locks
# — instance-scoped→module-scoped, username-only-key→(username, path)-key.
# See #4197 for the race this guards (concurrent save_local interleaves bytes,
# producing checksum_mismatch → destructive unlink, lost data).
#
# ``_faiss_write_locks`` maps (username, resolved index path) to its lock;
# ``_faiss_write_locks_lock`` guards every read/write of that registry.
_faiss_write_locks: Dict[Tuple[str, str], threading.Lock] = {}
_faiss_write_locks_lock = threading.Lock()

# Hard cap on suffix-increment retries when generating the .corrupt-<ns> path.
# Normal case is one attempt — same-ns collisions only happen if the user
# manually created such files. 32 is a safety bound that converts an
# otherwise unbounded retry loop into a loud OSError.
_QUARANTINE_SUFFIX_RETRY_CAP = 32

# After a successful quarantine, keep at most this many .corrupt-* files
# per base path (per side: .faiss.corrupt-* and .pkl.corrupt-* are
# counted independently). Prevents unbounded disk growth on systems that
# experience recurring corruption while preserving recent diagnostic
# artefacts. Keeping 5 means the user has the last 5 corruption events
# to inspect / submit with a bug report; anything older is dropped.
_QUARANTINE_KEEP_RECENT = 5

69 

70 

def _get_faiss_write_lock(username: str, index_path: str) -> threading.Lock:
    """Fetch, lazily creating, the write lock for one user's index file.

    The registry key is ``(username, resolved_path)``. The path is run
    through ``Path.resolve()`` so that different spellings of the same
    file map to one lock; this is intended to line up with
    ``FileIntegrityManager._normalize_path`` so writers, readers and the
    quarantine path agree on identity.

    Args:
        username: Owner of the index.
        index_path: Location of the FAISS index file.

    Returns:
        The ``threading.Lock`` registered for that ``(user, path)`` pair.
    """
    registry_key = (username, str(Path(index_path).resolve()))
    # The registry itself is shared mutable state, so look-up and insertion
    # happen under the registry guard.
    with _faiss_write_locks_lock:
        try:
            return _faiss_write_locks[registry_key]
        except KeyError:
            fresh_lock = threading.Lock()
            _faiss_write_locks[registry_key] = fresh_lock
            return fresh_lock

84 

85 

def pop_faiss_locks_for_user(username: str) -> None:
    """Forget every FAISS write lock registered for ``username``.

    Intended for the user-close paths (connection_cleanup) so the
    registry does not accumulate one entry per (user × collection) for
    the life of the process. Removing an entry while its lock is held is
    harmless: the holder keeps its own reference, and the next lookup
    simply registers a new lock. Mirrors ``pop_user_critical_lock`` in
    web/queue/processor_v2.py.

    Args:
        username: User whose lock entries should be dropped.
    """
    with _faiss_write_locks_lock:
        # Snapshot the matching keys first; the dict must not change size
        # while it is being iterated.
        doomed = tuple(
            entry for entry in _faiss_write_locks if entry[0] == username
        )
        for entry in doomed:
            del _faiss_write_locks[entry]

100 

101 

102class LibraryRAGService: 

103 """Service for managing RAG indexing of library documents.""" 

104 

105 def __init__( 

106 self, 

107 username: str, 

108 embedding_model: str = "all-MiniLM-L6-v2", 

109 embedding_provider: str = "sentence_transformers", 

110 chunk_size: int = 1000, 

111 chunk_overlap: int = 200, 

112 splitter_type: str = "recursive", 

113 text_separators: Optional[list] = None, 

114 distance_metric: str = "cosine", 

115 normalize_vectors: bool = True, 

116 index_type: str = "flat", 

117 embedding_manager: Optional["LocalEmbeddingManager"] = None, 

118 db_password: Optional[str] = None, 

119 ): 

120 """ 

121 Initialize library RAG service for a user. 

122 

123 Args: 

124 username: Username for database access 

125 embedding_model: Name of the embedding model to use 

126 embedding_provider: Provider type ('sentence_transformers' or 'ollama') 

127 chunk_size: Size of text chunks for splitting 

128 chunk_overlap: Overlap between consecutive chunks 

129 splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic') 

130 text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""]) 

131 distance_metric: Distance metric ('cosine', 'l2', or 'dot_product') 

132 normalize_vectors: Whether to normalize vectors with L2 

133 index_type: FAISS index type ('flat', 'hnsw', or 'ivf') 

134 embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility 

135 db_password: Optional database password for background thread access 

136 """ 

137 self.username = username 

138 self._db_password = db_password # Can be used for thread access 

139 # Initialize optional attributes to None before they're set below 

140 # This allows the db_password setter to check them without hasattr 

141 self.embedding_manager = None 

142 self.integrity_manager = None 

143 self.embedding_model = embedding_model 

144 self.embedding_provider = embedding_provider 

145 self.chunk_size = chunk_size 

146 self.chunk_overlap = chunk_overlap 

147 self.splitter_type = splitter_type 

148 self.text_separators = ( 

149 text_separators 

150 if text_separators is not None 

151 else ["\n\n", "\n", ". ", " ", ""] 

152 ) 

153 self.distance_metric = distance_metric 

154 # Ensure normalize_vectors is always a proper boolean 

155 self.normalize_vectors = to_bool(normalize_vectors, default=True) 

156 self.index_type = index_type 

157 

158 # Emit the active configuration so users can confirm their 

159 # UI-configured embedding settings are being honored (regression 

160 # signal for #3453). 

161 logger.info( 

162 f"RAG service initialized for user={username}: " 

163 f"provider={embedding_provider} model={embedding_model} " 

164 f"chunk_size={chunk_size} chunk_overlap={chunk_overlap} " 

165 f"splitter={splitter_type} index_type={index_type}" 

166 ) 

167 

168 # Use provided embedding manager or create a new one 

169 # (Must be created before text splitter for semantic chunking) 

170 # Track ownership so close() only tears down the manager when we 

171 # constructed it — a caller-supplied manager stays under caller 

172 # control (test fixtures, multi-service callers reusing one manager). 

173 self._owns_embedding_manager = embedding_manager is None 

174 if embedding_manager is not None: 

175 self.embedding_manager = embedding_manager 

176 else: 

177 # Initialize embedding manager with library collection 

178 # Load the complete user settings snapshot from database using the proper method 

179 from ...settings.manager import SettingsManager 

180 

181 # Use proper database session for SettingsManager 

182 # Note: using _db_password (backing field) directly here because the 

183 # db_password property setter propagates to embedding_manager/integrity_manager, 

184 # which are still None at this point in __init__. 

185 with get_user_db_session(username, self._db_password) as session: 

186 settings_manager = SettingsManager(session) 

187 settings_snapshot = settings_manager.get_settings_snapshot() 

188 

189 # Add the specific settings needed for this RAG service 

190 settings_snapshot.update( 

191 { 

192 "_username": username, 

193 "embeddings.provider": embedding_provider, 

194 f"embeddings.{embedding_provider}.model": embedding_model, 

195 "local_search_chunk_size": chunk_size, 

196 "local_search_chunk_overlap": chunk_overlap, 

197 } 

198 ) 

199 

200 self.embedding_manager = LocalEmbeddingManager( 

201 embedding_model=embedding_model, 

202 embedding_model_type=embedding_provider, 

203 settings_snapshot=settings_snapshot, 

204 ) 

205 

206 # Initialize text splitter based on type 

207 # (Must be created AFTER embedding_manager for semantic chunking) 

208 self.text_splitter = get_text_splitter( 

209 splitter_type=self.splitter_type, 

210 chunk_size=self.chunk_size, 

211 chunk_overlap=self.chunk_overlap, 

212 text_separators=self.text_separators, 

213 embeddings=self.embedding_manager.embeddings 

214 if self.splitter_type == "semantic" 

215 else None, 

216 ) 

217 

218 # Initialize or load FAISS index for library collection 

219 self.faiss_index = None 

220 self.rag_index_record = None 

221 

222 # Initialize file integrity manager for FAISS indexes 

223 self.integrity_manager = FileIntegrityManager( 

224 username, password=self._db_password 

225 ) 

226 self.integrity_manager.register_verifier(FAISSIndexVerifier()) 

227 

228 self._closed = False 

229 

230 def close(self): 

231 """Release embedding model and index resources.""" 

232 if self._closed: 

233 return 

234 self._closed = True 

235 

236 # Release embedding manager (which in turn closes the underlying 

237 # OllamaEmbeddings httpx clients — see LocalEmbeddingManager.close). 

238 # Only when we own it; caller-supplied managers stay under caller 

239 # control to avoid double-close / use-after-close. 

240 if self.embedding_manager is not None: 

241 if self._owns_embedding_manager: 

242 self.embedding_manager.close() 

243 self.embedding_manager = None 

244 

245 # Clear FAISS index 

246 if self.faiss_index is not None: 

247 self.faiss_index = None 

248 

249 # Clear other resources 

250 self.rag_index_record = None 

251 self.integrity_manager = None 

252 self.text_splitter = None 

253 

254 def __enter__(self): 

255 """Enter context manager.""" 

256 return self 

257 

258 def __exit__(self, exc_type, exc_val, exc_tb): 

259 """Exit context manager, ensuring cleanup.""" 

260 self.close() 

261 return False 

262 

263 @property 

264 def db_password(self): 

265 """Get database password.""" 

266 return self._db_password 

267 

268 @db_password.setter 

269 def db_password(self, value): 

270 """Set database password and propagate to embedding manager and integrity manager.""" 

271 self._db_password = value 

272 if self.embedding_manager: 

273 self.embedding_manager.db_password = value 

274 if self.integrity_manager: 

275 self.integrity_manager.password = value 

276 

277 def _get_index_hash( 

278 self, 

279 collection_name: str, 

280 embedding_model: str, 

281 embedding_model_type: str, 

282 ) -> str: 

283 """Generate hash for index identification.""" 

284 hash_input = ( 

285 f"{collection_name}:{embedding_model}:{embedding_model_type}" 

286 ) 

287 return hashlib.sha256(hash_input.encode()).hexdigest() 

288 

289 def _get_index_path(self, index_hash: str) -> Path: 

290 """Get path for FAISS index file.""" 

291 # Store in centralized cache directory (respects LDR_DATA_DIR) 

292 cache_dir = get_cache_directory() / "rag_indices" 

293 cache_dir.mkdir(parents=True, exist_ok=True) 

294 return cache_dir / f"{index_hash}.faiss" 

295 

296 @staticmethod 

297 def _deduplicate_chunks( 

298 chunks: List[LangchainDocument], 

299 chunk_ids: List[str], 

300 existing_ids: Optional[set] = None, 

301 ) -> Tuple[List[LangchainDocument], List[str]]: 

302 """Deduplicate chunks by ID within a batch, optionally excluding existing IDs.""" 

303 seen_ids: set = set() 

304 new_chunks: List[LangchainDocument] = [] 

305 new_ids: List[str] = [] 

306 for chunk, chunk_id in zip(chunks, chunk_ids): 

307 if chunk_id not in seen_ids and ( 

308 existing_ids is None or chunk_id not in existing_ids 

309 ): 

310 new_chunks.append(chunk) 

311 new_ids.append(chunk_id) 

312 seen_ids.add(chunk_id) 

313 return new_chunks, new_ids 

314 

315 def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex: 

316 """Get or create RAGIndex record for the current configuration.""" 

317 with get_user_db_session(self.username, self.db_password) as session: 

318 # Use collection_<uuid> format 

319 collection_name = f"collection_{collection_id}" 

320 index_hash = self._get_index_hash( 

321 collection_name, self.embedding_model, self.embedding_provider 

322 ) 

323 

324 # Try to get existing index 

325 rag_index = ( 

326 session.query(RAGIndex).filter_by(index_hash=index_hash).first() 

327 ) 

328 

329 if not rag_index: 

330 # Create new index record 

331 index_path = self._get_index_path(index_hash) 

332 

333 # Get embedding dimension by embedding a test string 

334 test_embedding = self.embedding_manager.embeddings.embed_query( 

335 "test" 

336 ) 

337 embedding_dim = len(test_embedding) 

338 

339 rag_index = RAGIndex( 

340 collection_name=collection_name, 

341 embedding_model=self.embedding_model, 

342 embedding_model_type=EmbeddingProvider( 

343 self.embedding_provider 

344 ), 

345 embedding_dimension=embedding_dim, 

346 index_path=str(index_path), 

347 index_hash=index_hash, 

348 chunk_size=self.chunk_size, 

349 chunk_overlap=self.chunk_overlap, 

350 splitter_type=self.splitter_type, 

351 text_separators=self.text_separators, 

352 distance_metric=self.distance_metric, 

353 normalize_vectors=self.normalize_vectors, 

354 index_type=self.index_type, 

355 chunk_count=0, 

356 total_documents=0, 

357 status="active", 

358 is_current=True, 

359 ) 

360 session.add(rag_index) 

361 session.commit() 

362 session.refresh(rag_index) 

363 logger.info(f"Created new RAG index: {index_hash}") 

364 

365 return rag_index 

366 

367 def _quarantine_corrupt_index(self, index_path: Path, reason: str) -> None: 

368 """Rename a corrupted FAISS index and its ``.pkl`` companion to 

369 ``<path>.corrupt-<ns>`` instead of deleting them. 

370 

371 Preserves user data for inspection/recovery. The 

372 dimension-mismatch branch elsewhere in this method intentionally 

373 *deletes* — that case rebuilds from scratch and the old bytes 

374 are unreadable with the new model. This helper is for the two 

375 "transient or unknown failure" branches (verify_file said no, 

376 load_local raised) where the on-disk bytes may still be 

377 usable by a human. 

378 

379 Raises ``OSError`` on rename failure (disk full, read-only fs, 

380 permission denied). Re-raising prevents silent data loss: if we 

381 swallowed the error, the next ``save_local`` would truncate the 

382 corrupt bytes anyway. Caller paths log the exception via the 

383 broader try/except around their indexer call. 

384 

385 # TODO(#4197-followup): FileIntegrityRecord.consecutive_failures 

386 # is not reset by the next record_file call, leaking failure 

387 # counts across recovery cycles. Orthogonal to this fix. 

388 """ 

389 ns = time.time_ns() 

390 pkl_path = index_path.with_suffix(".pkl") 

391 

392 # Same-nanosecond collisions essentially can't happen between 

393 # concurrent threads (we hold the per-path lock), but a user 

394 # could manually create such files. Loop with a hard cap so a 

395 # weird state surfaces as a clean OSError rather than a hang. 

396 suffix_n = 0 

397 faiss_target = Path(f"{index_path}.corrupt-{ns}") 

398 pkl_target = Path(f"{pkl_path}.corrupt-{ns}") 

399 while faiss_target.exists() or pkl_target.exists(): 

400 suffix_n += 1 

401 if suffix_n > _QUARANTINE_SUFFIX_RETRY_CAP: 

402 raise OSError( 

403 f"Quarantine path collisions exceeded " 

404 f"{_QUARANTINE_SUFFIX_RETRY_CAP} retries for " 

405 f"{index_path}" 

406 ) 

407 faiss_target = Path(f"{index_path}.corrupt-{ns}-{suffix_n}") 

408 pkl_target = Path(f"{pkl_path}.corrupt-{ns}-{suffix_n}") 

409 

410 try: 

411 index_path.rename(faiss_target) 

412 logger.warning( 

413 f"Quarantined corrupted FAISS index to {faiss_target} " 

414 f"(reason: {reason})" 

415 ) 

416 if pkl_path.exists(): 

417 pkl_path.rename(pkl_target) 

418 logger.info(f"Quarantined companion PKL to {pkl_target}") 

419 else: 

420 # Missing .pkl is recoverable (FAISS may have crashed 

421 # mid-write between .faiss and .pkl). Not re-raised: 

422 # the .faiss is already preserved, fresh build will 

423 # write a fresh .pkl. 

424 logger.warning( 

425 f"PKL companion missing at {pkl_path}; only " 

426 f".faiss quarantined." 

427 ) 

428 except OSError: 

429 # Disk-full, read-only fs, or permission error. Re-raise 

430 # so the caller surfaces a real failure instead of letting 

431 # the next save_local truncate the corrupt bytes. 

432 logger.exception( 

433 f"Failed to quarantine corrupted index at {index_path}" 

434 ) 

435 raise 

436 

437 # Best-effort retention sweep. The quarantine itself succeeded 

438 # above; failing to prune older files is not a correctness 

439 # issue, just a disk-usage one — log and move on. 

440 self._prune_old_quarantined_files(index_path) 

441 

442 @staticmethod 

443 def _corrupt_sort_key(path: Path) -> Tuple[int, int]: 

444 """Extract ``(ns, suffix_n)`` from a ``.corrupt-<ns>[-<n>]`` 

445 filename so retention can sort by the monotonic nanosecond 

446 suffix the quarantine path embeds — *not* by ``st_mtime``. 

447 

448 Filesystem timestamp granularity is sometimes 1s or 2s 

449 (FAT32/ext3/SMB shares), making mtime ordering non-deterministic 

450 when multiple quarantines happen within one tick. The 

451 ``.corrupt-<ns>`` suffix carries the original ``time.time_ns()`` 

452 from the quarantine and is reliable across all filesystems. 

453 

454 Returns ``(-1, -1)`` for malformed names (manually-placed 

455 files) so they sort below any real entry under ``reverse=True`` 

456 — i.e., they get pruned first. 

457 """ 

458 name = path.name 

459 marker = ".corrupt-" 

460 idx = name.rfind(marker) 

461 if idx == -1: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true

462 return (-1, -1) 

463 tail = name[idx + len(marker) :] 

464 parts = tail.split("-") 

465 try: 

466 ns = int(parts[0]) 

467 except ValueError: 

468 return (-1, -1) 

469 suffix_n = 0 

470 if len(parts) > 1: 

471 try: 

472 suffix_n = int(parts[-1]) 

473 except ValueError: 

474 # Unknown trailing component — keep the file but at the 

475 # base ns ordering. 

476 pass 

477 return (ns, suffix_n) 

478 

479 def _prune_old_quarantined_files(self, index_path: Path) -> None: 

480 """Keep only the ``_QUARANTINE_KEEP_RECENT`` most-recent 

481 ``.corrupt-*`` files for ``index_path`` and its ``.pkl`` 

482 companion. Sweeps the two sides independently — pairs share 

483 the same ``-<ns>`` suffix so they're ordered identically. 

484 

485 Ordering uses the embedded ``-<ns>`` from the quarantine 

486 filename, not ``st_mtime``: file systems with 1-2s timestamp 

487 granularity can otherwise produce non-deterministic retention 

488 on bursts. 

489 

490 Best-effort: logs and swallows any error so a sweep failure 

491 never propagates back into the indexing path. 

492 """ 

493 parent = index_path.parent 

494 pkl_path = index_path.with_suffix(".pkl") 

495 

496 for base in (index_path, pkl_path): 

497 pattern = f"{base.name}.corrupt-*" 

498 try: 

499 # Sort newest-first by the embedded -<ns>; everything 

500 # past the keep window is stale. 

501 candidates = sorted( 

502 parent.glob(pattern), 

503 key=self._corrupt_sort_key, 

504 reverse=True, 

505 ) 

506 except OSError: 

507 logger.warning( 

508 f"Failed to enumerate {pattern} in {parent} for " 

509 f"quarantine retention sweep" 

510 ) 

511 continue 

512 

513 for stale in candidates[_QUARANTINE_KEEP_RECENT:]: 

514 try: 

515 stale.unlink() 

516 logger.info(f"Pruned old quarantined file: {stale}") 

517 except OSError: 

518 logger.warning( 

519 f"Failed to prune quarantined file {stale}; continuing" 

520 ) 

521 

522 def _merge_and_persist_locked( 

523 self, 

524 index_path: Path, 

525 chunks_to_add: list, 

526 embedding_ids: list, 

527 force_reindex: bool = False, 

528 ) -> Dict[str, int]: 

529 """Read-modify-write the on-disk FAISS index under the 

530 ``(username, index_path)`` write lock so concurrent workers 

531 don't lose each other's embeddings. 

532 

533 Without this, two workers indexing different documents into 

534 the same collection both load on-disk state X into memory, 

535 each calls ``add_documents`` on their own private FAISS object 

536 (worker A: ``X+docA``, worker B: ``X+docB``), then they save 

537 in sequence — last writer wins, the loser's chunks are gone 

538 from the FAISS file. The chunks survive in the per-document 

539 DB rows so a force-reindex rebuilds, but the index file is 

540 wrong until then. See AI review of #4200. 

541 

542 The slow embedding + DB-chunk-insert path stays OUTSIDE this 

543 lock (handled by the caller). The lock only covers the fast 

544 FAISS reload→merge→save sequence (~10-100ms even for large 

545 indices), so cross-document parallelism is preserved up to 

546 the point where the in-memory states need reconciliation. 

547 

548 Args: 

549 index_path: Path of the on-disk ``.faiss`` file. 

550 chunks_to_add: Chunks the caller wants to add. 

551 embedding_ids: Matching embedding/chunk IDs (same length). 

552 force_reindex: If True, delete any IDs from 

553 ``embedding_ids`` that already exist in the index 

554 (so the caller's metadata wins). If False, skip IDs 

555 that already exist (idempotent re-indexing). 

556 

557 Returns: 

558 ``{"added": n, "skipped": m}`` for caller logging. 

559 """ 

560 with _get_faiss_write_lock(self.username, str(index_path)): 

561 # Reload from disk to absorb other writers' saves. If the 

562 # file doesn't exist or fails verification, keep whatever 

563 # the caller already had in ``self.faiss_index`` (likely 

564 # a fresh in-memory index from load_or_create_faiss_index). 

565 if index_path.exists(): 

566 verified, _reason = self.integrity_manager.verify_file( 

567 index_path 

568 ) 

569 if verified: 569 ↛ 592line 569 didn't jump to line 592 because the condition on line 569 was always true

570 try: 

571 self.faiss_index = FAISS.load_local( 

572 str(index_path.parent), 

573 self.embedding_manager.embeddings, 

574 index_name=index_path.stem, 

575 allow_dangerous_deserialization=True, 

576 normalize_L2=True, 

577 ) 

578 except Exception: 

579 # Reload failed (torn write, etc.). Keep 

580 # in-memory state — it's stale but valid; 

581 # better than losing this write entirely. 

582 logger.warning( 

583 "Failed to reload FAISS for merge; " 

584 "proceeding with in-memory state." 

585 ) 

586 

587 # Force-reindex: remove old copies of IDs we're about to 

588 # re-add, so updated metadata replaces stale. Dedup via 

589 # set→list because ``embedding_ids`` can contain repeats 

590 # (same chunk hash appearing twice in the document) — 

591 # FAISS.delete with duplicate IDs raises. 

592 if force_reindex and hasattr(self.faiss_index, "docstore"): 

593 fresh_ids = set(self.faiss_index.docstore._dict.keys()) 

594 old_chunk_ids = list( 

595 {eid for eid in embedding_ids if eid in fresh_ids} 

596 ) 

597 if old_chunk_ids: 

598 logger.info( 

599 f"Force re-index: removing {len(old_chunk_ids)} " 

600 f"existing chunks from FAISS" 

601 ) 

602 self.faiss_index.delete(old_chunk_ids) 

603 

604 # Dedup against the freshly-loaded state. 

605 if not force_reindex and hasattr(self.faiss_index, "docstore"): 

606 fresh_ids = set(self.faiss_index.docstore._dict.keys()) 

607 else: 

608 fresh_ids = None 

609 

610 new_chunks, new_ids = self._deduplicate_chunks( 

611 chunks_to_add, embedding_ids, fresh_ids 

612 ) 

613 

614 if new_chunks: 

615 self.faiss_index.add_documents(new_chunks, ids=new_ids) 

616 

617 self.faiss_index.save_local( 

618 str(index_path.parent), index_name=index_path.stem 

619 ) 

620 self.integrity_manager.record_file( 

621 index_path, 

622 related_entity_type="rag_index", 

623 related_entity_id=self.rag_index_record.id, 

624 ) 

625 

626 return { 

627 "added": len(new_chunks), 

628 "skipped": len(chunks_to_add) - len(new_chunks), 

629 "added_ids": new_ids, 

630 } 

631 

632 def load_or_create_faiss_index(self, collection_id: str) -> FAISS: 

633 """ 

634 Load existing FAISS index or create new one. 

635 

636 Args: 

637 collection_id: UUID of the collection 

638 

639 Returns: 

640 FAISS vector store instance 

641 """ 

642 rag_index = self._get_or_create_rag_index(collection_id) 

643 self.rag_index_record = rag_index 

644 

645 index_path = Path(rag_index.index_path) 

646 

647 # Hold the per-(username, index_path) write lock across the 

648 # entire verify → quarantine → load sequence. A narrower scope 

649 # leaves room for a concurrent save_local to race the 

650 # verification (verify sees bytes A, save overwrites with bytes 

651 # B, load_local reads bytes B which no longer match verified 

652 # checksum). See #4197. The fresh-build path below this block 

653 # is in-memory only and doesn't need the lock. 

654 if index_path.exists(): 

655 load_lock = _get_faiss_write_lock(self.username, str(index_path)) 

656 with load_lock: 

657 # Verify integrity before loading 

658 verified, reason = self.integrity_manager.verify_file( 

659 index_path 

660 ) 

661 if not verified: 

662 logger.error( 

663 f"Integrity verification failed for {index_path}: " 

664 f"{reason}. Quarantining for recovery; creating " 

665 f"new index." 

666 ) 

667 self._quarantine_corrupt_index(index_path, reason) 

668 else: 

669 try: 

670 # Check for embedding dimension mismatch before loading 

671 current_dim = len( 

672 self.embedding_manager.embeddings.embed_query( 

673 "dimension_check" 

674 ) 

675 ) 

676 stored_dim = rag_index.embedding_dimension 

677 

678 if stored_dim and current_dim != stored_dim: 

679 logger.warning( 

680 f"Embedding dimension mismatch detected! " 

681 f"Index created with dim={stored_dim}, " 

682 f"current model returns dim={current_dim}. " 

683 f"Deleting old index and rebuilding." 

684 ) 

685 # Delete old index files (legitimate deletion: 

686 # the bytes are unreadable with the new model). 

687 try: 

688 index_path.unlink() 

689 pkl_path = index_path.with_suffix(".pkl") 

690 if pkl_path.exists(): 690 ↛ 692line 690 didn't jump to line 692 because the condition on line 690 was always true

691 pkl_path.unlink() 

692 logger.info( 

693 f"Deleted old FAISS index files at {index_path}" 

694 ) 

695 except Exception: 

696 logger.exception( 

697 "Failed to delete old index files" 

698 ) 

699 

700 # Update RAGIndex with new dimension and reset counts 

701 with get_user_db_session( 

702 self.username, self.db_password 

703 ) as session: 

704 idx = ( 

705 session.query(RAGIndex) 

706 .filter_by(id=rag_index.id) 

707 .first() 

708 ) 

709 if idx: 709 ↛ 719line 709 didn't jump to line 719 because the condition on line 709 was always true

710 idx.embedding_dimension = current_dim 

711 idx.chunk_count = 0 

712 idx.total_documents = 0 

713 session.commit() 

714 logger.info( 

715 f"Updated RAGIndex dimension to {current_dim}" 

716 ) 

717 

718 # Clear rag_document_status for this index 

719 session.query(RagDocumentStatus).filter_by( 

720 rag_index_id=rag_index.id 

721 ).delete() 

722 session.commit() 

723 logger.info( 

724 "Cleared indexed status for documents in this " 

725 "collection" 

726 ) 

727 

728 # Update local reference for index creation below 

729 rag_index.embedding_dimension = current_dim 

730 # Fall through to create new index below 

731 else: 

732 # Dimensions match (or no stored dimension), load index 

733 faiss_index = FAISS.load_local( 

734 str(index_path.parent), 

735 self.embedding_manager.embeddings, 

736 index_name=index_path.stem, 

737 allow_dangerous_deserialization=True, 

738 normalize_L2=True, 

739 ) 

740 logger.info( 

741 f"Loaded existing FAISS index from {index_path}" 

742 ) 

743 return faiss_index 

744 except Exception: 

745 # load_local raised (torn .pkl, malformed pickle, 

746 # FAISS read failure). The .faiss bytes may still 

747 # be recoverable, so quarantine before falling 

748 # through to fresh build — don't just leave 

749 # broken state on disk. 

750 logger.warning( 

751 "Failed to load FAISS index, quarantining " 

752 "and creating new one" 

753 ) 

754 if index_path.exists(): 754 ↛ 760line 754 didn't jump to line 760

755 self._quarantine_corrupt_index( 

756 index_path, "load_local_raised" 

757 ) 

758 

759 # Create new FAISS index with configurable type and distance metric 

760 logger.info( 

761 f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={rag_index.embedding_dimension}" 

762 ) 

763 

764 # Create index based on type and distance metric 

765 if self.index_type == "hnsw": 

766 # HNSW: Fast approximate search, best for large collections 

767 # M=32 is a good default for connections per layer 

768 index = IndexHNSWFlat(rag_index.embedding_dimension, 32) 

769 logger.info("Created HNSW index with M=32 connections") 

770 elif self.index_type == "ivf": 

771 # IVF requires training, for now fall back to flat 

772 # TODO: Implement IVF with proper training 

773 logger.warning( 

774 "IVF index type not yet fully implemented, using Flat index" 

775 ) 

776 if self.distance_metric in ("cosine", "dot_product"): 

777 index = IndexFlatIP(rag_index.embedding_dimension) 

778 else: 

779 index = IndexFlatL2(rag_index.embedding_dimension) 

780 else: # "flat" or default 

781 # Flat index: Exact search 

782 if self.distance_metric in ("cosine", "dot_product"): 

783 # For cosine similarity, use inner product (IP) with normalized vectors 

784 index = IndexFlatIP(rag_index.embedding_dimension) 

785 logger.info( 

786 "Created Flat index with Inner Product (for cosine similarity)" 

787 ) 

788 else: # l2 

789 index = IndexFlatL2(rag_index.embedding_dimension) 

790 logger.info("Created Flat index with L2 distance") 

791 

792 faiss_index = FAISS( 

793 self.embedding_manager.embeddings, 

794 index=index, 

795 docstore=InMemoryDocstore(), # Minimal - chunks in DB 

796 index_to_docstore_id={}, 

797 normalize_L2=self.normalize_vectors, # Use configurable normalization 

798 ) 

799 logger.info( 

800 f"FAISS index created with normalization={self.normalize_vectors}" 

801 ) 

802 return faiss_index 

803 

def get_current_index_info(
    self, collection_id: Optional[str] = None
) -> Optional[Dict[str, Any]]:
    """
    Summarise the current RAG index recorded for a collection.

    Args:
        collection_id: UUID of the collection; a falsy value selects the
            default Library collection.

    Returns:
        Dict with the index's embedding and chunking settings, the chunk
        and document totals computed from rag_document_status, and
        ISO-formatted timestamps. None when no current index is recorded
        for the collection.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        # RAGIndex rows are looked up by the "collection_<uuid>" name.
        if not collection_id:
            # Nothing supplied: use the default Library collection.
            from ...database.library_init import get_default_library_id

            collection_id = get_default_library_id(
                self.username, self.db_password
            )
            collection_name = f"collection_{collection_id}"
        else:
            known_collection = (
                session.query(Collection).filter_by(id=collection_id).first()
            )
            if known_collection:
                collection_name = f"collection_{collection_id}"
            else:
                collection_name = "unknown"

        rag_index = (
            session.query(RAGIndex)
            .filter_by(collection_name=collection_name, is_current=True)
            .first()
        )

        if not rag_index:
            # Debug aid: list every recorded index with its is_current flag.
            known_indices = [
                (row.collection_name, row.is_current)
                for row in session.query(RAGIndex).all()
            ]
            logger.info(
                f"No RAG index found for collection_name='{collection_name}'. All indices: {known_indices}"
            )
            return None

        # Totals come from rag_document_status rather than from the
        # counters stored on the RAGIndex row.
        from ...database.models.library import RagDocumentStatus

        chunk_sum = (
            session.query(func.sum(RagDocumentStatus.chunk_count))
            .filter_by(collection_id=collection_id)
            .scalar()
        )
        actual_chunk_count = chunk_sum or 0

        actual_doc_count = (
            session.query(RagDocumentStatus)
            .filter_by(collection_id=collection_id)
            .count()
        )

        model_type = rag_index.embedding_model_type
        return {
            "embedding_model": rag_index.embedding_model,
            "embedding_model_type": model_type.value if model_type else None,
            "embedding_dimension": rag_index.embedding_dimension,
            "chunk_size": rag_index.chunk_size,
            "chunk_overlap": rag_index.chunk_overlap,
            "chunk_count": actual_chunk_count,
            "total_documents": actual_doc_count,
            "created_at": rag_index.created_at.isoformat(),
            "last_updated_at": rag_index.last_updated_at.isoformat(),
        }

876 

def index_document(
    self, document_id: str, collection_id: str, force_reindex: bool = False
) -> Dict[str, Any]:
    """
    Index one library document into the RAG store of a collection.

    The document text is split into chunks, the chunks are stored through
    the embedding manager and merged into the collection's FAISS index,
    then the indexing state is written to rag_document_status,
    DocumentCollection and the RAGIndex counters before committing.

    Args:
        document_id: UUID of the Document to index
        collection_id: UUID of the Collection to index for
        force_reindex: Index again even if the document is already indexed

    Returns:
        Dict whose "status" is "success" (with "chunk_count" and
        "embedding_ids"), "skipped" (with "message" and "chunk_count") or
        "error" (with "error").
    """
    with get_user_db_session(self.username, self.db_password) as session:
        document = session.query(Document).filter_by(id=document_id).first()
        if not document:
            return {"status": "error", "error": "Document not found"}

        # Fetch (or create) the DocumentCollection membership row.
        doc_collection = ensure_in_collection(
            session, document_id, collection_id
        )

        # Nothing to do when the document is already indexed here.
        if doc_collection.indexed and not force_reindex:
            return {
                "status": "skipped",
                "message": "Document already indexed for this collection",
                "chunk_count": doc_collection.chunk_count,
            }

        if not document.text_content:
            return {
                "status": "error",
                "error": "Document has no text content",
            }

        try:
            # Wrap the text in a LangChain document carrying the metadata
            # used later for source linking.
            display_title = document.title or document.filename or "Untitled"
            published = (
                str(document.published_date)
                if document.published_date
                else None
            )
            doc = LangchainDocument(
                page_content=document.text_content,
                metadata={
                    "source": document.original_url,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "title": display_title,
                    "document_title": display_title,  # kept for compatibility
                    "authors": document.authors,
                    "published_date": published,
                    "doi": document.doi,
                    "arxiv_id": document.arxiv_id,
                    "pmid": document.pmid,
                    "pmcid": document.pmcid,
                    "extraction_method": document.extraction_method,
                    "word_count": document.word_count,
                },
            )

            chunks = self.text_splitter.split_documents([doc])
            chunk_total = len(chunks)
            logger.info(
                f"Split document {document_id} into {chunk_total} chunks"
            )

            # Chunks are stored under "collection_<uuid>" ("unknown" when
            # the collection row cannot be found).
            collection = (
                session.query(Collection).filter_by(id=collection_id).first()
            )
            if collection:
                collection_name = f"collection_{collection_id}"
            else:
                collection_name = "unknown"

            embedding_ids = self.embedding_manager._store_chunks_to_db(
                chunks=chunks,
                collection_name=collection_name,
                source_type="document",
                source_id=document_id,
            )

            # Lazy load; the merge step below reloads from disk under the
            # lock anyway.
            if self.faiss_index is None:
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # Read-modify-write of the on-disk FAISS index happens under
            # the per-(user, index_path) lock so that concurrent indexers
            # of different documents into the same collection don't lose
            # each other's embeddings (see AI review of #4200).
            index_path = Path(self.rag_index_record.index_path)
            unique_count = len(set(embedding_ids))
            batch_dups = chunk_total - unique_count
            merge_stats = self._merge_and_persist_locked(
                index_path,
                chunks,
                embedding_ids,
                force_reindex=force_reindex,
            )

            added = merge_stats["added"]
            if not added:
                logger.info(
                    f"All {chunk_total} chunks already exist in FAISS index, skipping"
                )
            elif force_reindex:
                logger.info(
                    f"Force re-index: added {added} "
                    f"chunks with updated metadata to FAISS index"
                )
            else:
                already_exist = unique_count - added
                logger.info(
                    f"Added {added} new embeddings to FAISS "
                    f"({already_exist} already exist, "
                    f"{batch_dups} batch duplicates removed)"
                )
            logger.info(
                f"Saved FAISS index to {index_path} with integrity tracking"
            )

            from datetime import datetime, UTC

            # A pre-existing status row means the RAGIndex counters
            # already include this document.
            prior_status = (
                session.query(RagDocumentStatus)
                .filter_by(
                    document_id=document_id, collection_id=collection_id
                )
                .first()
            )
            was_already_indexed = prior_status is not None

            # Row existence in rag_document_status == "indexed".
            timestamp = datetime.now(UTC)
            session.merge(
                RagDocumentStatus(
                    document_id=document_id,
                    collection_id=collection_id,
                    rag_index_id=self.rag_index_record.id,
                    chunk_count=chunk_total,
                    indexed_at=timestamp,
                )
            )

            logger.info(
                f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={chunk_total}"
            )

            # Mirror the state on DocumentCollection for backward
            # compatibility.
            membership = session.query(DocumentCollection).filter_by(
                document_id=document_id, collection_id=collection_id
            )
            membership.update(
                {
                    "indexed": True,
                    "chunk_count": chunk_total,
                    "last_indexed_at": timestamp,
                }
            )

            logger.info(
                "Also updated DocumentCollection.indexed for backward compatibility"
            )

            # Bump the RAGIndex counters only for a first-time index.
            rag_index_obj = (
                session.query(RAGIndex)
                .filter_by(id=self.rag_index_record.id)
                .first()
            )
            if rag_index_obj and not was_already_indexed:
                rag_index_obj.chunk_count += chunk_total
                rag_index_obj.total_documents += 1
                rag_index_obj.last_updated_at = datetime.now(UTC)
                logger.info(
                    f"Updated RAGIndex stats: chunk_count +{chunk_total}, total_documents +1"
                )

            session.flush()
            logger.info(f"Flushed ORM changes for document {document_id}")

            # Commit the transaction. Durability is provided by
            # synchronous=NORMAL (sqlcipher_utils.py); SQLite
            # auto-checkpoints WAL at wal_autocheckpoint=250 frames.
            # An explicit PRAGMA wal_checkpoint(FULL) here used to
            # block other writers long enough to exhaust busy_timeout
            # under bulk-download concurrency (#4197).
            session.commit()

            logger.info(
                f"Successfully indexed document {document_id} for collection {collection_id} "
                f"with {chunk_total} chunks"
            )

            return {
                "status": "success",
                "chunk_count": chunk_total,
                "embedding_ids": embedding_ids,
            }

        except Exception as e:
            # The session is shared (thread-local) with the caller. If
            # flush() or commit() raised, it stays in PendingRollbackError
            # state until rolled back, so roll back BEFORE returning the
            # error dict to hand the caller a clean session (same pattern
            # as the #3827 fix).
            safe_rollback(session, "library_rag_service.index_document")
            logger.exception(
                f"Error indexing document {document_id} for collection {collection_id}"
            )
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }

1110 

def index_all_documents(
    self,
    collection_id: str,
    force_reindex: bool = False,
    progress_callback=None,
) -> Dict[str, Any]:
    """
    Run index_document over every document of a collection.

    Args:
        collection_id: UUID of the collection to index
        force_reindex: Also re-index documents already flagged as indexed
        progress_callback: Optional callable invoked after each document
            with (current, total, doc_title, status)

    Returns:
        Dict with "successful", "skipped" and "failed" counts. When
        documents were processed it also holds an "errors" list; when
        there was nothing to index it holds "status" and "message" instead.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        membership_query = session.query(DocumentCollection).filter_by(
            collection_id=collection_id
        )
        if not force_reindex:
            # Restrict to documents not indexed yet.
            membership_query = membership_query.filter_by(indexed=False)
        doc_collections = membership_query.all()

        if not doc_collections:
            return {
                "status": "info",
                "message": "No documents to index",
                "successful": 0,
                "skipped": 0,
                "failed": 0,
            }

        results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []}
        total = len(doc_collections)
        # Any status other than these two is counted as a failure.
        counter_for_status = {"success": "successful", "skipped": "skipped"}

        for position, doc_collection in enumerate(doc_collections, start=1):
            # The document row is only needed for its title.
            document = (
                session.query(Document)
                .filter_by(id=doc_collection.document_id)
                .first()
            )
            title = document.title if document else "Unknown"

            outcome = self.index_document(
                doc_collection.document_id, collection_id, force_reindex
            )
            status = outcome["status"]

            counter = counter_for_status.get(status, "failed")
            results[counter] += 1
            if counter == "failed":
                results["errors"].append(
                    {
                        "doc_id": doc_collection.document_id,
                        "title": title,
                        "error": outcome.get("error"),
                    }
                )

            if progress_callback:
                progress_callback(position, total, title, status)

        logger.info(
            f"Indexed collection {collection_id}: "
            f"{results['successful']} successful, "
            f"{results['skipped']} skipped, "
            f"{results['failed']} failed"
        )

        return results

1191 

def remove_document_from_rag(
    self, document_id: str, collection_id: str
) -> Dict[str, Any]:
    """
    Delete a document's stored chunks for one collection and reset its flags.

    Args:
        document_id: UUID of the Document to remove
        collection_id: UUID of the Collection to remove from

    Returns:
        {"status": "success", "deleted_count": n} on success, otherwise
        {"status": "error", "error": <description>}.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        doc_collection = (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id, collection_id=collection_id)
            .first()
        )
        if not doc_collection:
            return {
                "status": "error",
                "error": "Document not found in collection",
            }

        try:
            # Chunks are stored under "collection_<uuid>" ("unknown" when
            # the collection row cannot be found).
            collection = (
                session.query(Collection).filter_by(id=collection_id).first()
            )
            if collection:
                collection_name = f"collection_{collection_id}"
            else:
                collection_name = "unknown"

            deleted_count = self.embedding_manager._delete_chunks_from_db(
                collection_name=collection_name,
                source_id=document_id,
            )

            # Reset the RAG flags on the membership row.
            # NOTE(review): the rag_document_status row written by
            # index_document is not touched in this method - confirm that
            # callers clean it up.
            doc_collection.indexed = False
            doc_collection.chunk_count = 0
            doc_collection.last_indexed_at = None
            session.commit()

            logger.info(
                f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}"
            )

            return {"status": "success", "deleted_count": deleted_count}

        except Exception as e:
            # commit() can raise; without a rollback the shared
            # thread-local session stays poisoned for the caller's next
            # operation (issue #3827 pattern).
            safe_rollback(
                session, "library_rag_service.remove_document_from_rag"
            )
            logger.exception(
                f"Error removing document {document_id} from collection {collection_id}"
            )
            return {
                "status": "error",
                "error": f"Operation failed: {type(e).__name__}",
            }

1263 

def index_documents_batch(
    self,
    doc_info: List[tuple],
    collection_id: str,
    force_reindex: bool = False,
) -> Dict[str, Dict[str, Any]]:
    """
    Index multiple documents in a batch for a specific collection.

    Documents and their DocumentCollection rows are pre-loaded with one
    query each; obvious non-starters (missing document, already indexed,
    no text) are answered from that data, everything else is delegated to
    index_document.

    Args:
        doc_info: List of (doc_id, title) tuples
        collection_id: UUID of the collection to index for
        force_reindex: Whether to force reindexing even if already indexed

    Returns:
        Dict mapping doc_id to its individual result dict; empty when
        doc_info is empty.
    """
    results: Dict[str, Dict[str, Any]] = {}

    # An empty batch needs no database session and would otherwise issue
    # two queries with an empty IN () list.
    if not doc_info:
        return results

    doc_ids = [doc_id for doc_id, _ in doc_info]

    # Use single database session for querying
    with get_user_db_session(self.username, self.db_password) as session:
        # Pre-load all documents for this batch
        documents = (
            session.query(Document).filter(Document.id.in_(doc_ids)).all()
        )
        doc_lookup = {doc.id: doc for doc in documents}

        # Pre-load DocumentCollection entries
        doc_collections = (
            session.query(DocumentCollection)
            .filter(
                DocumentCollection.document_id.in_(doc_ids),
                DocumentCollection.collection_id == collection_id,
            )
            .all()
        )
        doc_collection_lookup = {
            dc.document_id: dc for dc in doc_collections
        }

        # Process each document in the batch (the title is not needed here)
        for doc_id, _title in doc_info:
            document = doc_lookup.get(doc_id)

            if not document:
                results[doc_id] = {
                    "status": "error",
                    "error": "Document not found",
                }
                continue

            # Check if already indexed via DocumentCollection
            doc_collection = doc_collection_lookup.get(doc_id)
            if (
                doc_collection
                and doc_collection.indexed
                and not force_reindex
            ):
                results[doc_id] = {
                    "status": "skipped",
                    "message": "Document already indexed for this collection",
                    "chunk_count": doc_collection.chunk_count,
                }
                continue

            # Validate text content
            if not document.text_content:
                results[doc_id] = {
                    "status": "error",
                    "error": "Document has no text content",
                }
                continue

            # Index the document; one failure must not abort the batch
            try:
                results[doc_id] = self.index_document(
                    doc_id, collection_id, force_reindex
                )
            except Exception as e:
                logger.exception(
                    f"Error indexing document {doc_id} in batch"
                )
                results[doc_id] = {
                    "status": "error",
                    "error": f"Indexing failed: {type(e).__name__}",
                }

    return results

1356 

def get_rag_stats(
    self, collection_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Collect RAG statistics for a collection.

    Args:
        collection_id: UUID of the collection; a falsy value selects the
            default Library collection.

    Returns:
        Dict with total/indexed/unindexed document counts, the total chunk
        count, embedding details taken from one stored chunk (empty when
        no chunk exists) and this service's chunk_size / chunk_overlap.
    """
    with get_user_db_session(self.username, self.db_password) as session:
        if not collection_id:
            # Fall back to the default Library collection.
            from ...database.library_init import get_default_library_id

            collection_id = get_default_library_id(
                self.username, self.db_password
            )

        # Every document that belongs to the collection.
        total_docs = (
            session.query(DocumentCollection)
            .filter_by(collection_id=collection_id)
            .count()
        )

        # Indexed documents and chunk totals come from rag_document_status.
        from ...database.models.library import RagDocumentStatus

        indexed_docs = (
            session.query(RagDocumentStatus)
            .filter_by(collection_id=collection_id)
            .count()
        )

        chunk_sum = (
            session.query(func.sum(RagDocumentStatus.chunk_count))
            .filter_by(collection_id=collection_id)
            .scalar()
        )
        total_chunks = chunk_sum or 0

        # DocumentChunk rows are keyed by "collection_<uuid>".
        collection = (
            session.query(Collection).filter_by(id=collection_id).first()
        )
        if collection:
            collection_name = f"collection_{collection_id}"
        else:
            collection_name = "library"

        # One stored chunk is enough to read the embedding model details.
        chunk_sample = (
            session.query(DocumentChunk)
            .filter_by(collection_name=collection_name)
            .first()
        )

        embedding_info = {}
        if chunk_sample:
            sample_type = chunk_sample.embedding_model_type
            embedding_info = {
                "model": chunk_sample.embedding_model,
                "model_type": sample_type.value if sample_type else None,
                "dimension": chunk_sample.embedding_dimension,
            }

        return {
            "total_documents": total_docs,
            "indexed_documents": indexed_docs,
            "unindexed_documents": total_docs - indexed_docs,
            "total_chunks": total_chunks,
            "embedding_info": embedding_info,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
        }

1436 

def index_local_file(self, file_path: str) -> Dict[str, Any]:
    """
    Index a local file from the filesystem into RAG.

    Supported inputs are plain text / Markdown, HTML (tags stripped) and
    PDF (text extracted page by page). Chunks are stored under the
    "local_library" collection name and merged into the FAISS index.

    Args:
        file_path: Path to the file to index

    Returns:
        Dict with "status" ("success", "skipped" or "error"). On success
        it carries "chunk_count" and "embedding_ids" for the newly added
        chunks; otherwise an "error" description.
    """
    path = Path(file_path)

    if not path.exists():
        return {"status": "error", "error": f"File not found: {path}"}

    if not path.is_file():
        return {"status": "error", "error": f"Not a file: {path}"}

    suffix = path.suffix.lower()

    try:
        # Read file content based on its extension
        if suffix in (".txt", ".md", ".markdown"):
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
        elif suffix in (".html", ".htm"):
            # HTML files - strip tags
            from bs4 import BeautifulSoup

            with open(path, "r", encoding="utf-8") as f:
                soup = BeautifulSoup(f.read(), "html.parser")
                content = soup.get_text()
        elif suffix == ".pdf":
            # PDF files - extract text
            from pypdf import PdfReader

            with open(path, "rb") as f:
                pdf_reader = PdfReader(f)
                # join() avoids quadratic concatenation; "or ''" guards
                # against a page that yields no text object.
                content = "".join(
                    page.extract_text() or "" for page in pdf_reader.pages
                )
        else:
            return {
                "status": "skipped",
                "error": f"Unsupported file type: {path.suffix}",
            }

        if not content or len(content.strip()) < 10:
            return {
                "status": "error",
                "error": "File has no extractable text content",
            }

        # The built-in hash() of a str is salted per process, so it cannot
        # serve as a reproducible identifier; use a content-independent
        # digest of the path instead.
        path_digest = hashlib.sha256(
            str(path).encode("utf-8", errors="surrogateescape")
        ).hexdigest()[:16]

        # Create LangChain Document from text
        doc = LangchainDocument(
            page_content=content,
            metadata={
                "source": str(path),
                "source_id": f"local_{path.stem}_{path_digest}",
                "title": path.stem,
                "document_title": path.stem,
                "file_type": suffix,
                "file_size": path.stat().st_size,
                "source_type": "local_file",
                "collection": "local_library",
            },
        )

        # Split into chunks
        chunks = self.text_splitter.split_documents([doc])
        logger.info(f"Split local file {path} into {len(chunks)} chunks")

        # Store chunks in database
        embedding_ids = self.embedding_manager._store_chunks_to_db(
            chunks=chunks,
            collection_name="local_library",
            source_type="local_file",
            source_id=str(path),
        )

        # Load or create FAISS index using default library collection
        # (lazy; merge step below reloads under the lock anyway).
        if self.faiss_index is None:
            from ...database.library_init import get_default_library_id

            default_collection_id = get_default_library_id(
                self.username, self.db_password
            )
            self.faiss_index = self.load_or_create_faiss_index(
                default_collection_id
            )

        # Read-modify-write the on-disk FAISS under the lock.
        index_path = (
            Path(self.rag_index_record.index_path)
            if self.rag_index_record
            else None
        )
        if index_path:
            merge_stats = self._merge_and_persist_locked(
                index_path, chunks, embedding_ids, force_reindex=False
            )
            logger.info(
                f"Saved FAISS index to {index_path} with integrity tracking"
            )
        else:
            # No persistent index path: dedup against the in-memory
            # docstore and add directly without saving.
            if self.faiss_index is not None and hasattr(
                self.faiss_index, "docstore"
            ):
                existing_ids = set(self.faiss_index.docstore._dict.keys())
            else:
                existing_ids = None
            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )
            if new_chunks:
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            merge_stats = {
                "added": len(new_chunks),
                "skipped": len(chunks) - len(new_chunks),
                "added_ids": new_ids,
            }

        logger.info(
            f"Successfully indexed local file {path} with "
            f"{merge_stats['added']} new chunks "
            f"({merge_stats['skipped']} skipped)"
        )

        return {
            "status": "success",
            "chunk_count": merge_stats["added"],
            "embedding_ids": merge_stats["added_ids"],
        }

    except Exception as e:
        logger.exception(f"Error indexing local file {path}")
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }

1588 

def index_user_document(
    self, user_doc, collection_name: str, force_reindex: bool = False
) -> Dict[str, Any]:
    """
    Index a user-uploaded document into the given collection.

    Args:
        user_doc: UserDocument object; its pre-extracted text_content is
            what gets indexed
        collection_name: Name of the collection (e.g., "collection_123")
        force_reindex: Whether to force reindexing

    Returns:
        Dict with "status" and, on success, "chunk_count" and
        "embedding_ids"; on failure an "error" description.
    """

    try:
        content = user_doc.text_content

        # Reject documents with (almost) no text.
        if not content or len(content.strip()) < 10:
            return {
                "status": "error",
                "error": "Document has no extractable text content",
            }

        upload_metadata = {
            "source": f"user_upload_{user_doc.id}",
            "source_id": user_doc.id,
            "title": user_doc.filename,
            "document_title": user_doc.filename,
            "file_type": user_doc.file_type,
            "file_size": user_doc.file_size,
            "collection": collection_name,
        }
        doc = LangchainDocument(page_content=content, metadata=upload_metadata)

        chunks = self.text_splitter.split_documents([doc])
        chunk_total = len(chunks)
        logger.info(
            f"Split user document {user_doc.filename} into {chunk_total} chunks"
        )

        embedding_ids = self.embedding_manager._store_chunks_to_db(
            chunks=chunks,
            collection_name=collection_name,
            source_type="user_document",
            source_id=user_doc.id,
        )

        # Lazy load; the merge step below reloads under the lock anyway.
        if self.faiss_index is None:
            # collection_name has the form "collection_<uuid>".
            collection_id = collection_name.removeprefix("collection_")
            self.faiss_index = self.load_or_create_faiss_index(
                collection_id
            )

        unique_count = len(set(embedding_ids))
        batch_dups = chunk_total - unique_count

        # Read-modify-write the on-disk FAISS under the lock so that
        # concurrent uploads to the same collection don't lose each
        # other's chunks.
        index_path = None
        if self.rag_index_record:
            index_path = Path(self.rag_index_record.index_path)

        if index_path:
            merge_stats = self._merge_and_persist_locked(
                index_path,
                chunks,
                embedding_ids,
                force_reindex=force_reindex,
            )
        else:
            # No persistent index path: work on the in-memory index only,
            # without saving.
            existing_ids = None
            if hasattr(self.faiss_index, "docstore"):
                present_ids = set(self.faiss_index.docstore._dict.keys())
                if force_reindex:
                    # Drop the previous copies so they are re-added below.
                    old_chunk_ids = list(
                        {eid for eid in embedding_ids if eid in present_ids}
                    )
                    if old_chunk_ids:
                        logger.info(
                            f"Force re-index: removing {len(old_chunk_ids)} "
                            f"existing chunks from FAISS"
                        )
                        self.faiss_index.delete(old_chunk_ids)
                else:
                    # Dedup against what the index already holds.
                    existing_ids = present_ids
            new_chunks, new_ids = self._deduplicate_chunks(
                chunks, embedding_ids, existing_ids
            )
            if new_chunks:
                self.faiss_index.add_documents(new_chunks, ids=new_ids)
            merge_stats = {
                "added": len(new_chunks),
                "skipped": chunk_total - len(new_chunks),
                "added_ids": new_ids,
            }

        added = merge_stats["added"]
        if not added:
            logger.info(
                f"All {chunk_total} chunks already exist in FAISS index, skipping"
            )
        elif force_reindex:
            logger.info(
                f"Force re-index: added {added} "
                f"chunks with updated metadata to FAISS index"
            )
        else:
            already_exist = unique_count - added
            logger.info(
                f"Added {added} new chunks to FAISS "
                f"({already_exist} already exist, "
                f"{batch_dups} batch duplicates removed)"
            )

        logger.info(
            f"Successfully indexed user document {user_doc.filename} with {chunk_total} chunks"
        )

        return {
            "status": "success",
            "chunk_count": chunk_total,
            "embedding_ids": embedding_ids,
        }

    except Exception as e:
        logger.exception(
            f"Error indexing user document {user_doc.filename}"
        )
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }

1734 

def remove_collection_from_index(
    self, collection_name: str
) -> Dict[str, Any]:
    """
    Remove all documents from a collection from the FAISS index.

    The FAISS deletion is best-effort: a failure there is logged as a
    warning (with the exception type) and the method still reports success.

    Args:
        collection_name: Name of the collection (e.g., "collection_123")

    Returns:
        {"status": "success", "deleted_count": n}, where n is the number
        of chunk rows found for the collection, or
        {"status": "error", "error": <description>} on failure.
    """
    from ...database.models import DocumentChunk
    from ...database.session_context import get_user_db_session

    try:
        with get_user_db_session(
            self.username, self.db_password
        ) as session:
            # Get all chunk rows for this collection
            chunks = (
                session.query(DocumentChunk)
                .filter_by(collection_name=collection_name)
                .all()
            )

            if not chunks:
                return {"status": "success", "deleted_count": 0}

            chunk_ids = [
                f"{collection_name}_{chunk.id}" for chunk in chunks
            ]

            # Load FAISS index if not already loaded
            if self.faiss_index is None:
                # collection_name has the form "collection_<uuid>"
                collection_id = collection_name.removeprefix("collection_")
                self.faiss_index = self.load_or_create_faiss_index(
                    collection_id
                )

            # Remove from FAISS index. delete + save + record must all
            # be inside the same lock - otherwise a concurrent writer
            # could sandwich a save_local between our delete and our
            # save, leaving stale chunks back on disk (#4197).
            if hasattr(self.faiss_index, "delete"):
                try:
                    index_path = (
                        Path(self.rag_index_record.index_path)
                        if self.rag_index_record
                        else None
                    )
                    if index_path:
                        with _get_faiss_write_lock(
                            self.username, str(index_path)
                        ):
                            self.faiss_index.delete(chunk_ids)
                            self.faiss_index.save_local(
                                str(index_path.parent),
                                index_name=index_path.stem,
                            )
                            self.integrity_manager.record_file(
                                index_path,
                                related_entity_type="rag_index",
                                related_entity_id=self.rag_index_record.id,
                            )
                    else:
                        # No index path -> in-memory-only delete
                        self.faiss_index.delete(chunk_ids)
                except Exception as faiss_error:
                    # Deliberately best-effort, but record what went wrong
                    # so the failure can be diagnosed from the logs.
                    logger.warning(
                        "Could not delete chunks from FAISS: "
                        f"{type(faiss_error).__name__}"
                    )

            logger.info(
                f"Removed {len(chunk_ids)} chunks from collection {collection_name}"
            )

            return {"status": "success", "deleted_count": len(chunk_ids)}

    except Exception as e:
        logger.exception(
            f"Error removing collection {collection_name} from index"
        )
        return {
            "status": "error",
            "error": f"Operation failed: {type(e).__name__}",
        }