Coverage for src/local_deep_research/research_library/services/library_rag_service.py: 27%

431 statements  

coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Library RAG Service 

3 

4Handles indexing and searching library documents using RAG: 

5- Index text documents into vector database 

6- Chunk documents for semantic search 

7- Generate embeddings using local models 

8- Manage FAISS indices per research 

9- Track RAG status in library 

10""" 

11 

12from pathlib import Path 

13from typing import Any, Dict, List, Optional 

14 

15from langchain_core.documents import Document as LangchainDocument 

16from loguru import logger 

17from sqlalchemy import func 

18 

19from ...config.paths import get_cache_directory 

20from ...database.models.library import ( 

21 Document, 

22 DocumentChunk, 

23 DocumentCollection, 

24 Collection, 

25 RAGIndex, 

26 RagDocumentStatus, 

27 EmbeddingProvider, 

28) 

29from ...database.session_context import get_user_db_session 

30from ...embeddings.splitters import get_text_splitter 

31from ...web_search_engines.engines.search_engine_local import ( 

32 LocalEmbeddingManager, 

33) 

34from ...security.file_integrity import FileIntegrityManager, FAISSIndexVerifier 

35import hashlib 

36from faiss import IndexFlatL2, IndexFlatIP, IndexHNSWFlat 

37from langchain_community.vectorstores import FAISS 

38from langchain_community.docstore.in_memory import InMemoryDocstore 

39 

40 

41class LibraryRAGService: 

42 """Service for managing RAG indexing of library documents.""" 

43 

44 def __init__( 

45 self, 

46 username: str, 

47 embedding_model: str = "all-MiniLM-L6-v2", 

48 embedding_provider: str = "sentence_transformers", 

49 chunk_size: int = 1000, 

50 chunk_overlap: int = 200, 

51 splitter_type: str = "recursive", 

52 text_separators: Optional[list] = None, 

53 distance_metric: str = "cosine", 

54 normalize_vectors: bool = True, 

55 index_type: str = "flat", 

56 embedding_manager: Optional["LocalEmbeddingManager"] = None, 

57 db_password: Optional[str] = None, 

58 ): 

59 """ 

60 Initialize library RAG service for a user. 

61 

62 Args: 

63 username: Username for database access 

64 embedding_model: Name of the embedding model to use 

65 embedding_provider: Provider type ('sentence_transformers' or 'ollama') 

66 chunk_size: Size of text chunks for splitting 

67 chunk_overlap: Overlap between consecutive chunks 

68 splitter_type: Type of splitter ('recursive', 'token', 'sentence', 'semantic') 

69 text_separators: List of text separators for chunking (default: ["\n\n", "\n", ". ", " ", ""]) 

70 distance_metric: Distance metric ('cosine', 'l2', or 'dot_product') 

71 normalize_vectors: Whether to normalize vectors with L2 

72 index_type: FAISS index type ('flat', 'hnsw', or 'ivf') 

73 embedding_manager: Optional pre-constructed LocalEmbeddingManager for testing/flexibility 

74 db_password: Optional database password for background thread access 

75 """ 

76 self.username = username 

77 self._db_password = db_password # Can be used for thread access 

78 # Initialize optional attributes to None before they're set below 

79 # This allows the db_password setter to check them without hasattr 

80 self.embedding_manager = None 

81 self.integrity_manager = None 

82 self.embedding_model = embedding_model 

83 self.embedding_provider = embedding_provider 

84 self.chunk_size = chunk_size 

85 self.chunk_overlap = chunk_overlap 

86 self.splitter_type = splitter_type 

87 self.text_separators = ( 

88 text_separators 

89 if text_separators is not None 

90 else ["\n\n", "\n", ". ", " ", ""] 

91 ) 

92 self.distance_metric = distance_metric 

93 # Ensure normalize_vectors is always a proper boolean 

94 if isinstance(normalize_vectors, str): 94 ↛ 95 (line 94 didn't jump to line 95 because the condition on line 94 was never true)

95 self.normalize_vectors = normalize_vectors.lower() in ( 

96 "true", 

97 "1", 

98 "yes", 

99 ) 

100 else: 

101 self.normalize_vectors = bool(normalize_vectors) 

102 self.index_type = index_type 

103 

104 # Use provided embedding manager or create a new one 

105 # (Must be created before text splitter for semantic chunking) 

106 if embedding_manager is not None: 

107 self.embedding_manager = embedding_manager 

108 else: 

109 # Initialize embedding manager with library collection 

110 # Load the complete user settings snapshot from database using the proper method 

111 from ...settings.manager import SettingsManager 

112 

113 # Use proper database session for SettingsManager 

114 with get_user_db_session(username) as session: 

115 settings_manager = SettingsManager(session) 

116 settings_snapshot = settings_manager.get_settings_snapshot() 

117 

118 # Add the specific settings needed for this RAG service 

119 settings_snapshot.update( 

120 { 

121 "_username": username, 

122 "embeddings.provider": embedding_provider, 

123 f"embeddings.{embedding_provider}.model": embedding_model, 

124 "local_search_chunk_size": chunk_size, 

125 "local_search_chunk_overlap": chunk_overlap, 

126 } 

127 ) 

128 

129 self.embedding_manager = LocalEmbeddingManager( 

130 embedding_model=embedding_model, 

131 embedding_model_type=embedding_provider, 

132 chunk_size=chunk_size, 

133 chunk_overlap=chunk_overlap, 

134 settings_snapshot=settings_snapshot, 

135 ) 

136 

137 # Initialize text splitter based on type 

138 # (Must be created AFTER embedding_manager for semantic chunking) 

139 self.text_splitter = get_text_splitter( 

140 splitter_type=self.splitter_type, 

141 chunk_size=self.chunk_size, 

142 chunk_overlap=self.chunk_overlap, 

143 text_separators=self.text_separators, 

144 embeddings=self.embedding_manager.embeddings 

145 if self.splitter_type == "semantic" 

146 else None, 

147 ) 

148 

149 # Initialize or load FAISS index for library collection 

150 self.faiss_index = None 

151 self.rag_index_record = None 

152 

153 # Initialize file integrity manager for FAISS indexes 

154 self.integrity_manager = FileIntegrityManager( 

155 username, password=self._db_password 

156 ) 

157 self.integrity_manager.register_verifier(FAISSIndexVerifier()) 

158 

159 @property 

160 def db_password(self): 

161 """Get database password.""" 

162 return self._db_password 

163 

164 @db_password.setter 

165 def db_password(self, value): 

166 """Set database password and propagate to embedding manager and integrity manager.""" 

167 self._db_password = value 

168 if self.embedding_manager: 

169 self.embedding_manager.db_password = value 

170 if self.integrity_manager: 

171 self.integrity_manager.password = value 

172 

173 def _get_index_hash( 

174 self, 

175 collection_name: str, 

176 embedding_model: str, 

177 embedding_model_type: str, 

178 ) -> str: 

179 """Generate hash for index identification.""" 

180 hash_input = ( 

181 f"{collection_name}:{embedding_model}:{embedding_model_type}" 

182 ) 

183 return hashlib.sha256(hash_input.encode()).hexdigest() 

184 

185 def _get_index_path(self, index_hash: str) -> Path: 

186 """Get path for FAISS index file.""" 

187 # Store in centralized cache directory (respects LDR_DATA_DIR) 

188 cache_dir = get_cache_directory() / "rag_indices" 

189 cache_dir.mkdir(parents=True, exist_ok=True) 

190 return cache_dir / f"{index_hash}.faiss" 

191 
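The two helpers above pin each FAISS file to a stable identity: the SHA-256 of the colon-joined collection name, embedding model, and provider becomes the file name under the cache directory. A minimal sketch of that derivation (not part of the file under coverage; the collection id and cache root below are hypothetical placeholders):

import hashlib
from pathlib import Path

collection_name = "collection_1234abcd"          # hypothetical UUID-based name
embedding_model = "all-MiniLM-L6-v2"
embedding_provider = "sentence_transformers"

index_hash = hashlib.sha256(
    f"{collection_name}:{embedding_model}:{embedding_provider}".encode()
).hexdigest()

# Same layout as _get_index_path: <cache_dir>/rag_indices/<hash>.faiss,
# where the cache root is whatever get_cache_directory() returns.
index_file = Path("/path/to/cache") / "rag_indices" / f"{index_hash}.faiss"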

192 def _get_or_create_rag_index(self, collection_id: str) -> RAGIndex: 

193 """Get or create RAGIndex record for the current configuration.""" 

194 with get_user_db_session(self.username, self.db_password) as session: 

195 # Use collection_<uuid> format 

196 collection_name = f"collection_{collection_id}" 

197 index_hash = self._get_index_hash( 

198 collection_name, self.embedding_model, self.embedding_provider 

199 ) 

200 

201 # Try to get existing index 

202 rag_index = ( 

203 session.query(RAGIndex).filter_by(index_hash=index_hash).first() 

204 ) 

205 

206 if not rag_index: 

207 # Create new index record 

208 index_path = self._get_index_path(index_hash) 

209 

210 # Get embedding dimension by embedding a test string 

211 test_embedding = self.embedding_manager.embeddings.embed_query( 

212 "test" 

213 ) 

214 embedding_dim = len(test_embedding) 

215 

216 rag_index = RAGIndex( 

217 collection_name=collection_name, 

218 embedding_model=self.embedding_model, 

219 embedding_model_type=EmbeddingProvider( 

220 self.embedding_provider 

221 ), 

222 embedding_dimension=embedding_dim, 

223 index_path=str(index_path), 

224 index_hash=index_hash, 

225 chunk_size=self.chunk_size, 

226 chunk_overlap=self.chunk_overlap, 

227 splitter_type=self.splitter_type, 

228 text_separators=self.text_separators, 

229 distance_metric=self.distance_metric, 

230 normalize_vectors=self.normalize_vectors, 

231 index_type=self.index_type, 

232 chunk_count=0, 

233 total_documents=0, 

234 status="active", 

235 is_current=True, 

236 ) 

237 session.add(rag_index) 

238 session.commit() 

239 session.refresh(rag_index) 

240 logger.info(f"Created new RAG index: {index_hash}") 

241 

242 return rag_index 

243 

244 def load_or_create_faiss_index(self, collection_id: str) -> FAISS: 

245 """ 

246 Load existing FAISS index or create new one. 

247 

248 Args: 

249 collection_id: UUID of the collection 

250 

251 Returns: 

252 FAISS vector store instance 

253 """ 

254 rag_index = self._get_or_create_rag_index(collection_id) 

255 self.rag_index_record = rag_index 

256 

257 index_path = Path(rag_index.index_path) 

258 

259 if index_path.exists(): 259 ↛ 261 (line 259 didn't jump to line 261 because the condition on line 259 was never true)

260 # Verify integrity before loading 

261 verified, reason = self.integrity_manager.verify_file(index_path) 

262 if not verified: 

263 logger.error( 

264 f"Integrity verification failed for {index_path}: {reason}. " 

265 f"Refusing to load. Creating new index." 

266 ) 

267 # Remove corrupted index 

268 try: 

269 index_path.unlink() 

270 logger.info(f"Removed corrupted index file: {index_path}") 

271 except Exception: 

272 logger.exception("Failed to remove corrupted index") 

273 else: 

274 try: 

275 # Check for embedding dimension mismatch before loading 

276 current_dim = len( 

277 self.embedding_manager.embeddings.embed_query( 

278 "dimension_check" 

279 ) 

280 ) 

281 stored_dim = rag_index.embedding_dimension 

282 

283 if stored_dim and current_dim != stored_dim: 

284 logger.warning( 

285 f"Embedding dimension mismatch detected! " 

286 f"Index created with dim={stored_dim}, " 

287 f"current model returns dim={current_dim}. " 

288 f"Deleting old index and rebuilding." 

289 ) 

290 # Delete old index files 

291 try: 

292 index_path.unlink() 

293 pkl_path = index_path.with_suffix(".pkl") 

294 if pkl_path.exists(): 

295 pkl_path.unlink() 

296 logger.info( 

297 f"Deleted old FAISS index files at {index_path}" 

298 ) 

299 except Exception: 

300 logger.exception("Failed to delete old index files") 

301 

302 # Update RAGIndex with new dimension and reset counts 

303 with get_user_db_session( 

304 self.username, self.db_password 

305 ) as session: 

306 idx = ( 

307 session.query(RAGIndex) 

308 .filter_by(id=rag_index.id) 

309 .first() 

310 ) 

311 if idx: 

312 idx.embedding_dimension = current_dim 

313 idx.chunk_count = 0 

314 idx.total_documents = 0 

315 session.commit() 

316 logger.info( 

317 f"Updated RAGIndex dimension to {current_dim}" 

318 ) 

319 

320 # Clear rag_document_status for this index 

321 session.query(RagDocumentStatus).filter_by( 

322 rag_index_id=rag_index.id 

323 ).delete() 

324 session.commit() 

325 logger.info( 

326 "Cleared indexed status for documents in this " 

327 "collection" 

328 ) 

329 

330 # Update local reference for index creation below 

331 rag_index.embedding_dimension = current_dim 

332 # Fall through to create new index below 

333 else: 

334 # Dimensions match (or no stored dimension), load index 

335 faiss_index = FAISS.load_local( 

336 str(index_path.parent), 

337 self.embedding_manager.embeddings, 

338 index_name=index_path.stem, 

339 allow_dangerous_deserialization=True, 

340 normalize_L2=True, 

341 ) 

342 logger.info( 

343 f"Loaded existing FAISS index from {index_path}" 

344 ) 

345 return faiss_index 

346 except Exception as e: 

347 logger.warning( 

348 f"Failed to load FAISS index: {e}, creating new one" 

349 ) 

350 

351 # Create new FAISS index with configurable type and distance metric 

352 logger.info( 

353 f"Creating new FAISS index: type={self.index_type}, metric={self.distance_metric}, dimension={rag_index.embedding_dimension}" 

354 ) 

355 

356 # Create index based on type and distance metric 

357 if self.index_type == "hnsw": 357 ↛ 360 (line 357 didn't jump to line 360 because the condition on line 357 was never true)

358 # HNSW: Fast approximate search, best for large collections 

359 # M=32 is a good default for connections per layer 

360 index = IndexHNSWFlat(rag_index.embedding_dimension, 32) 

361 logger.info("Created HNSW index with M=32 connections") 

362 elif self.index_type == "ivf": 362 ↛ 365 (line 362 didn't jump to line 365 because the condition on line 362 was never true)

363 # IVF requires training, for now fall back to flat 

364 # TODO: Implement IVF with proper training 

365 logger.warning( 

366 "IVF index type not yet fully implemented, using Flat index" 

367 ) 

368 if self.distance_metric in ("cosine", "dot_product"): 

369 index = IndexFlatIP(rag_index.embedding_dimension) 

370 else: 

371 index = IndexFlatL2(rag_index.embedding_dimension) 

372 else: # "flat" or default 

373 # Flat index: Exact search 

374 if self.distance_metric in ("cosine", "dot_product"): 374 ↛ 381 (line 374 didn't jump to line 381 because the condition on line 374 was always true)

375 # For cosine similarity, use inner product (IP) with normalized vectors 

376 index = IndexFlatIP(rag_index.embedding_dimension) 

377 logger.info( 

378 "Created Flat index with Inner Product (for cosine similarity)" 

379 ) 

380 else: # l2 

381 index = IndexFlatL2(rag_index.embedding_dimension) 

382 logger.info("Created Flat index with L2 distance") 

383 

384 faiss_index = FAISS( 

385 self.embedding_manager.embeddings, 

386 index=index, 

387 docstore=InMemoryDocstore(), # Minimal - chunks in DB 

388 index_to_docstore_id={}, 

389 normalize_L2=self.normalize_vectors, # Use configurable normalization 

390 ) 

391 logger.info( 

392 f"FAISS index created with normalization={self.normalize_vectors}" 

393 ) 

394 return faiss_index 

395 
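For the default flat/cosine configuration above, cosine similarity is realized as an inner product over L2-normalized vectors. A standalone sketch with raw faiss (not part of the file under coverage; the data is random and 384 is only the typical all-MiniLM-L6-v2 output size):

import numpy as np
import faiss

dim = 384
chunk_vectors = np.random.rand(100, dim).astype("float32")
query_vector = np.random.rand(1, dim).astype("float32")

faiss.normalize_L2(chunk_vectors)   # after normalization, inner product == cosine
faiss.normalize_L2(query_vector)

index = faiss.IndexFlatIP(dim)      # same index class the "flat"/"cosine" branch creates
index.add(chunk_vectors)
scores, ids = index.search(query_vector, 5)   # top-5 nearest chunks by cosine similarity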

396 def get_current_index_info( 

397 self, collection_id: Optional[str] = None 

398 ) -> Optional[Dict[str, Any]]: 

399 """ 

400 Get information about the current RAG index for a collection. 

401 

402 Args: 

403 collection_id: UUID of collection (defaults to Library if None) 

404 """ 

405 with get_user_db_session(self.username, self.db_password) as session: 

406 # Get collection name in the format stored in RAGIndex (collection_<uuid>) 

407 if collection_id: 

408 collection = ( 

409 session.query(Collection) 

410 .filter_by(id=collection_id) 

411 .first() 

412 ) 

413 collection_name = ( 

414 f"collection_{collection_id}" if collection else "unknown" 

415 ) 

416 else: 

417 # Default to Library collection 

418 from ...database.library_init import get_default_library_id 

419 

420 collection_id = get_default_library_id(self.username) 

421 collection_name = f"collection_{collection_id}" 

422 

423 rag_index = ( 

424 session.query(RAGIndex) 

425 .filter_by(collection_name=collection_name, is_current=True) 

426 .first() 

427 ) 

428 

429 if not rag_index: 

430 # Debug: check all RAG indices for this collection 

431 all_indices = session.query(RAGIndex).all() 

432 logger.info( 

433 f"No RAG index found for collection_name='{collection_name}'. All indices: {[(idx.collection_name, idx.is_current) for idx in all_indices]}" 

434 ) 

435 return None 

436 

437 # Calculate actual counts from rag_document_status table 

438 from ...database.models.library import RagDocumentStatus 

439 

440 actual_chunk_count = ( 

441 session.query(func.sum(RagDocumentStatus.chunk_count)) 

442 .filter_by(collection_id=collection_id) 

443 .scalar() 

444 or 0 

445 ) 

446 

447 actual_doc_count = ( 

448 session.query(RagDocumentStatus) 

449 .filter_by(collection_id=collection_id) 

450 .count() 

451 ) 

452 

453 return { 

454 "embedding_model": rag_index.embedding_model, 

455 "embedding_model_type": rag_index.embedding_model_type.value 

456 if rag_index.embedding_model_type 

457 else None, 

458 "embedding_dimension": rag_index.embedding_dimension, 

459 "chunk_size": rag_index.chunk_size, 

460 "chunk_overlap": rag_index.chunk_overlap, 

461 "chunk_count": actual_chunk_count, 

462 "total_documents": actual_doc_count, 

463 "created_at": rag_index.created_at.isoformat(), 

464 "last_updated_at": rag_index.last_updated_at.isoformat(), 

465 } 

466 

467 def index_document( 

468 self, document_id: str, collection_id: str, force_reindex: bool = False 

469 ) -> Dict[str, Any]: 

470 """ 

471 Index a single document into RAG for a specific collection. 

472 

473 Args: 

474 document_id: UUID of the Document to index 

475 collection_id: UUID of the Collection to index for 

476 force_reindex: Whether to force reindexing even if already indexed 

477 

478 Returns: 

479 Dict with status, chunk_count, and any errors 

480 """ 

481 with get_user_db_session(self.username, self.db_password) as session: 

482 # Get the document 

483 document = session.query(Document).filter_by(id=document_id).first() 

484 

485 if not document: 

486 return {"status": "error", "error": "Document not found"} 

487 

488 # Get or create DocumentCollection entry 

489 all_doc_collections = ( 

490 session.query(DocumentCollection) 

491 .filter_by(document_id=document_id, collection_id=collection_id) 

492 .all() 

493 ) 

494 

495 logger.info( 

496 f"Found {len(all_doc_collections)} DocumentCollection entries for doc={document_id}, coll={collection_id}" 

497 ) 

498 

499 doc_collection = ( 

500 all_doc_collections[0] if all_doc_collections else None 

501 ) 

502 

503 if not doc_collection: 

504 # Create new DocumentCollection entry 

505 doc_collection = DocumentCollection( 

506 document_id=document_id, 

507 collection_id=collection_id, 

508 indexed=False, 

509 chunk_count=0, 

510 ) 

511 session.add(doc_collection) 

512 logger.info( 

513 f"Created new DocumentCollection entry for doc={document_id}, coll={collection_id}" 

514 ) 

515 

516 # Check if already indexed for this collection 

517 if doc_collection.indexed and not force_reindex: 

518 return { 

519 "status": "skipped", 

520 "message": "Document already indexed for this collection", 

521 "chunk_count": doc_collection.chunk_count, 

522 } 

523 

524 # Validate text content 

525 if not document.text_content: 525 ↛ 531 (line 525 didn't jump to line 531 because the condition on line 525 was always true)

526 return { 

527 "status": "error", 

528 "error": "Document has no text content", 

529 } 

530 

531 try: 

532 # Create LangChain Document from text 

533 doc = LangchainDocument( 

534 page_content=document.text_content, 

535 metadata={ 

536 "source": document.original_url, 

537 "document_id": document_id, # Add document ID for source linking 

538 "collection_id": collection_id, # Add collection ID 

539 "title": document.title 

540 or document.filename 

541 or "Untitled", 

542 "document_title": document.title 

543 or document.filename 

544 or "Untitled", # Add for compatibility 

545 "authors": document.authors, 

546 "published_date": str(document.published_date) 

547 if document.published_date 

548 else None, 

549 "doi": document.doi, 

550 "arxiv_id": document.arxiv_id, 

551 "pmid": document.pmid, 

552 "pmcid": document.pmcid, 

553 "extraction_method": document.extraction_method, 

554 "word_count": document.word_count, 

555 }, 

556 ) 

557 

558 # Split into chunks 

559 chunks = self.text_splitter.split_documents([doc]) 

560 logger.info( 

561 f"Split document {document_id} into {len(chunks)} chunks" 

562 ) 

563 

564 # Get collection name for chunk storage 

565 collection = ( 

566 session.query(Collection) 

567 .filter_by(id=collection_id) 

568 .first() 

569 ) 

570 # Use collection_<uuid> format for internal storage 

571 collection_name = ( 

572 f"collection_{collection_id}" if collection else "unknown" 

573 ) 

574 

575 # Store chunks in database using embedding manager 

576 embedding_ids = self.embedding_manager._store_chunks_to_db( 

577 chunks=chunks, 

578 collection_name=collection_name, 

579 source_type="document", 

580 source_id=document_id, 

581 ) 

582 

583 # Load or create FAISS index 

584 if self.faiss_index is None: 

585 self.faiss_index = self.load_or_create_faiss_index( 

586 collection_id 

587 ) 

588 

589 # If force_reindex, remove old chunks from FAISS before adding new ones 

590 if force_reindex: 

591 existing_ids = ( 

592 set(self.faiss_index.docstore._dict.keys()) 

593 if hasattr(self.faiss_index, "docstore") 

594 else set() 

595 ) 

596 old_chunk_ids = [ 

597 eid for eid in embedding_ids if eid in existing_ids 

598 ] 

599 if old_chunk_ids: 

600 logger.info( 

601 f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS" 

602 ) 

603 self.faiss_index.delete(old_chunk_ids) 

604 

605 # Filter out chunks that already exist in FAISS (unless force_reindex) 

606 if not force_reindex: 

607 existing_ids = ( 

608 set(self.faiss_index.docstore._dict.keys()) 

609 if hasattr(self.faiss_index, "docstore") 

610 else set() 

611 ) 

612 new_chunks = [] 

613 new_ids = [] 

614 for chunk, chunk_id in zip(chunks, embedding_ids): 

615 if chunk_id not in existing_ids: 

616 new_chunks.append(chunk) 

617 new_ids.append(chunk_id) 

618 else: 

619 # force_reindex: add all chunks 

620 new_chunks = chunks 

621 new_ids = embedding_ids 

622 

623 # Add embeddings to FAISS index 

624 if new_chunks: 

625 if force_reindex: 

626 logger.info( 

627 f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index" 

628 ) 

629 else: 

630 logger.info( 

631 f"Adding {len(new_chunks)} new embeddings to FAISS index ({len(chunks) - len(new_chunks)} already exist)" 

632 ) 

633 self.faiss_index.add_documents(new_chunks, ids=new_ids) 

634 else: 

635 logger.info( 

636 f"All {len(chunks)} chunks already exist in FAISS index, skipping" 

637 ) 

638 

639 # Save FAISS index 

640 index_path = Path(self.rag_index_record.index_path) 

641 self.faiss_index.save_local( 

642 str(index_path.parent), index_name=index_path.stem 

643 ) 

644 # Record file integrity 

645 self.integrity_manager.record_file( 

646 index_path, 

647 related_entity_type="rag_index", 

648 related_entity_id=self.rag_index_record.id, 

649 ) 

650 logger.info( 

651 f"Saved FAISS index to {index_path} with integrity tracking" 

652 ) 

653 

654 from datetime import datetime, UTC 

655 from sqlalchemy import text 

656 

657 # Check if document was already indexed (for stats update) 

658 existing_status = ( 

659 session.query(RagDocumentStatus) 

660 .filter_by( 

661 document_id=document_id, collection_id=collection_id 

662 ) 

663 .first() 

664 ) 

665 was_already_indexed = existing_status is not None 

666 

667 # Mark document as indexed using rag_document_status table 

668 # Row existence = indexed, simple and clean 

669 timestamp = datetime.now(UTC) 

670 

671 # Create or update RagDocumentStatus using ORM merge (atomic upsert) 

672 rag_status = RagDocumentStatus( 

673 document_id=document_id, 

674 collection_id=collection_id, 

675 rag_index_id=self.rag_index_record.id, 

676 chunk_count=len(chunks), 

677 indexed_at=timestamp, 

678 ) 

679 session.merge(rag_status) 

680 

681 logger.info( 

682 f"Marked document as indexed in rag_document_status: doc_id={document_id}, coll_id={collection_id}, chunks={len(chunks)}" 

683 ) 

684 

685 # Also update DocumentCollection table for backward compatibility 

686 session.query(DocumentCollection).filter_by( 

687 document_id=document_id, collection_id=collection_id 

688 ).update( 

689 { 

690 "indexed": True, 

691 "chunk_count": len(chunks), 

692 "last_indexed_at": timestamp, 

693 } 

694 ) 

695 

696 logger.info( 

697 "Also updated DocumentCollection.indexed for backward compatibility" 

698 ) 

699 

700 # Update RAGIndex statistics (only if not already indexed) 

701 rag_index_obj = ( 

702 session.query(RAGIndex) 

703 .filter_by(id=self.rag_index_record.id) 

704 .first() 

705 ) 

706 if rag_index_obj and not was_already_indexed: 

707 rag_index_obj.chunk_count += len(chunks) 

708 rag_index_obj.total_documents += 1 

709 rag_index_obj.last_updated_at = datetime.now(UTC) 

710 logger.info( 

711 f"Updated RAGIndex stats: chunk_count +{len(chunks)}, total_documents +1" 

712 ) 

713 

714 # Flush ORM changes to database before commit 

715 session.flush() 

716 logger.info(f"Flushed ORM changes for document {document_id}") 

717 

718 # Commit the transaction 

719 session.commit() 

720 

721 # WAL checkpoint after commit to ensure persistence 

722 session.execute(text("PRAGMA wal_checkpoint(FULL)")) 

723 

724 logger.info( 

725 f"Successfully indexed document {document_id} for collection {collection_id} " 

726 f"with {len(chunks)} chunks" 

727 ) 

728 

729 return { 

730 "status": "success", 

731 "chunk_count": len(chunks), 

732 "embedding_ids": embedding_ids, 

733 } 

734 

735 except Exception as e: 

736 logger.exception( 

737 f"Error indexing document {document_id} for collection {collection_id}: {str(e)}" 

738 ) 

739 return { 

740 "status": "error", 

741 "error": f"Operation failed: {type(e).__name__}", 

742 } 

743 

744 def index_all_documents( 

745 self, 

746 collection_id: str, 

747 force_reindex: bool = False, 

748 progress_callback=None, 

749 ) -> Dict[str, Any]: 

750 """ 

751 Index all documents in a collection into RAG. 

752 

753 Args: 

754 collection_id: UUID of the collection to index 

755 force_reindex: Whether to force reindexing already indexed documents 

756 progress_callback: Optional callback function called after each document with (current, total, doc_title, status) 

757 

758 Returns: 

759 Dict with counts of successful, skipped, and failed documents 

760 """ 

761 with get_user_db_session(self.username, self.db_password) as session: 

762 # Get all DocumentCollection entries for this collection 

763 query = session.query(DocumentCollection).filter_by( 

764 collection_id=collection_id 

765 ) 

766 

767 if not force_reindex: 

768 # Only index documents that haven't been indexed yet 

769 query = query.filter_by(indexed=False) 

770 

771 doc_collections = query.all() 

772 

773 if not doc_collections: 

774 return { 

775 "status": "info", 

776 "message": "No documents to index", 

777 "successful": 0, 

778 "skipped": 0, 

779 "failed": 0, 

780 } 

781 

782 results = {"successful": 0, "skipped": 0, "failed": 0, "errors": []} 

783 total = len(doc_collections) 

784 

785 for idx, doc_collection in enumerate(doc_collections, 1): 

786 # Get the document for title info 

787 document = ( 

788 session.query(Document) 

789 .filter_by(id=doc_collection.document_id) 

790 .first() 

791 ) 

792 title = document.title if document else "Unknown" 

793 

794 result = self.index_document( 

795 doc_collection.document_id, collection_id, force_reindex 

796 ) 

797 

798 if result["status"] == "success": 

799 results["successful"] += 1 

800 elif result["status"] == "skipped": 

801 results["skipped"] += 1 

802 else: 

803 results["failed"] += 1 

804 results["errors"].append( 

805 { 

806 "doc_id": doc_collection.document_id, 

807 "title": title, 

808 "error": result.get("error"), 

809 } 

810 ) 

811 

812 # Call progress callback if provided 

813 if progress_callback: 

814 progress_callback(idx, total, title, result["status"]) 

815 

816 logger.info( 

817 f"Indexed collection {collection_id}: " 

818 f"{results['successful']} successful, " 

819 f"{results['skipped']} skipped, " 

820 f"{results['failed']} failed" 

821 ) 

822 

823 return results 

824 
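index_all_documents drives index_document over every DocumentCollection row and reports progress through the optional callback. A minimal sketch of a callback with the expected (current, total, doc_title, status) signature; the service instance and collection id referenced in the commented-out call are hypothetical:

from loguru import logger

def log_progress(current: int, total: int, doc_title: str, status: str) -> None:
    # Called once per document after it has been indexed, skipped, or failed.
    logger.info(f"[{current}/{total}] {status}: {doc_title}")

# results = service.index_all_documents(collection_id, progress_callback=log_progress)
# results -> {"successful": ..., "skipped": ..., "failed": ..., "errors": [...]}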

825 def remove_document_from_rag( 

826 self, document_id: str, collection_id: str 

827 ) -> Dict[str, Any]: 

828 """ 

829 Remove a document's chunks from RAG for a specific collection. 

830 

831 Args: 

832 document_id: UUID of the Document to remove 

833 collection_id: UUID of the Collection to remove from 

834 

835 Returns: 

836 Dict with status and count of removed chunks 

837 """ 

838 with get_user_db_session(self.username, self.db_password) as session: 

839 # Get the DocumentCollection entry 

840 doc_collection = ( 

841 session.query(DocumentCollection) 

842 .filter_by(document_id=document_id, collection_id=collection_id) 

843 .first() 

844 ) 

845 

846 if not doc_collection: 846 ↛ 852 (line 846 didn't jump to line 852 because the condition on line 846 was always true)

847 return { 

848 "status": "error", 

849 "error": "Document not found in collection", 

850 } 

851 

852 try: 

853 # Get collection name in the format collection_<uuid> 

854 collection = ( 

855 session.query(Collection) 

856 .filter_by(id=collection_id) 

857 .first() 

858 ) 

859 # Use collection_<uuid> format for internal storage 

860 collection_name = ( 

861 f"collection_{collection_id}" if collection else "unknown" 

862 ) 

863 

864 # Delete chunks from database 

865 deleted_count = self.embedding_manager._delete_chunks_from_db( 

866 collection_name=collection_name, 

867 source_id=document_id, 

868 ) 

869 

870 # Update DocumentCollection RAG status 

871 doc_collection.indexed = False 

872 doc_collection.chunk_count = 0 

873 doc_collection.last_indexed_at = None 

874 session.commit() 

875 

876 logger.info( 

877 f"Removed {deleted_count} chunks for document {document_id} from collection {collection_id}" 

878 ) 

879 

880 return {"status": "success", "deleted_count": deleted_count} 

881 

882 except Exception as e: 

883 logger.exception( 

884 f"Error removing document {document_id} from collection {collection_id}: {str(e)}" 

885 ) 

886 return { 

887 "status": "error", 

888 "error": f"Operation failed: {type(e).__name__}", 

889 } 

890 

891 def index_documents_batch( 

892 self, 

893 doc_info: List[tuple], 

894 collection_id: str, 

895 force_reindex: bool = False, 

896 ) -> Dict[str, Dict[str, Any]]: 

897 """ 

898 Index multiple documents in a batch for a specific collection. 

899 

900 Args: 

901 doc_info: List of (doc_id, title) tuples 

902 collection_id: UUID of the collection to index for 

903 force_reindex: Whether to force reindexing even if already indexed 

904 

905 Returns: 

906 Dict mapping doc_id to individual result 

907 """ 

908 results = {} 

909 doc_ids = [doc_id for doc_id, _ in doc_info] 

910 

911 # Use single database session for querying 

912 with get_user_db_session(self.username, self.db_password) as session: 

913 # Pre-load all documents for this batch 

914 documents = ( 

915 session.query(Document).filter(Document.id.in_(doc_ids)).all() 

916 ) 

917 

918 # Create lookup for quick access 

919 doc_lookup = {doc.id: doc for doc in documents} 

920 

921 # Pre-load DocumentCollection entries 

922 doc_collections = ( 

923 session.query(DocumentCollection) 

924 .filter( 

925 DocumentCollection.document_id.in_(doc_ids), 

926 DocumentCollection.collection_id == collection_id, 

927 ) 

928 .all() 

929 ) 

930 doc_collection_lookup = { 

931 dc.document_id: dc for dc in doc_collections 

932 } 

933 

934 # Process each document in the batch 

935 for doc_id, title in doc_info: 

936 document = doc_lookup.get(doc_id) 

937 

938 if not document: 938 ↛ 939 (line 938 didn't jump to line 939 because the condition on line 938 was never true)

939 results[doc_id] = { 

940 "status": "error", 

941 "error": "Document not found", 

942 } 

943 continue 

944 

945 # Check if already indexed via DocumentCollection 

946 doc_collection = doc_collection_lookup.get(doc_id) 

947 if ( 947 ↛ 952 (line 947 didn't jump to line 952 because the condition on line 947 was never true)

948 doc_collection 

949 and doc_collection.indexed 

950 and not force_reindex 

951 ): 

952 results[doc_id] = { 

953 "status": "skipped", 

954 "message": "Document already indexed for this collection", 

955 "chunk_count": doc_collection.chunk_count, 

956 } 

957 continue 

958 

959 # Validate text content 

960 if not document.text_content: 960 ↛ 961 (line 960 didn't jump to line 961 because the condition on line 960 was never true)

961 results[doc_id] = { 

962 "status": "error", 

963 "error": "Document has no text content", 

964 } 

965 continue 

966 

967 # Index the document 

968 try: 

969 result = self.index_document( 

970 doc_id, collection_id, force_reindex 

971 ) 

972 results[doc_id] = result 

973 except Exception as e: 

974 logger.exception( 

975 f"Error indexing document {doc_id} in batch" 

976 ) 

977 results[doc_id] = { 

978 "status": "error", 

979 "error": f"Indexing failed: {type(e).__name__}", 

980 } 

981 

982 return results 

983 

984 def get_rag_stats( 

985 self, collection_id: Optional[str] = None 

986 ) -> Dict[str, Any]: 

987 """ 

988 Get RAG statistics for a collection. 

989 

990 Args: 

991 collection_id: UUID of the collection (defaults to Library) 

992 

993 Returns: 

994 Dict with counts and metadata about indexed documents 

995 """ 

996 with get_user_db_session(self.username, self.db_password) as session: 

997 # Get collection ID (default to Library) 

998 if not collection_id: 998 ↛ 999 (line 998 didn't jump to line 999 because the condition on line 998 was never true)

999 from ...database.library_init import get_default_library_id 

1000 

1001 collection_id = get_default_library_id(self.username) 

1002 

1003 # Count total documents in collection 

1004 total_docs = ( 

1005 session.query(DocumentCollection) 

1006 .filter_by(collection_id=collection_id) 

1007 .count() 

1008 ) 

1009 

1010 # Count indexed documents from rag_document_status table 

1011 from ...database.models.library import RagDocumentStatus 

1012 

1013 indexed_docs = ( 

1014 session.query(RagDocumentStatus) 

1015 .filter_by(collection_id=collection_id) 

1016 .count() 

1017 ) 

1018 

1019 # Count total chunks from rag_document_status table 

1020 total_chunks = ( 

1021 session.query(func.sum(RagDocumentStatus.chunk_count)) 

1022 .filter_by(collection_id=collection_id) 

1023 .scalar() 

1024 or 0 

1025 ) 

1026 

1027 # Get collection name in the format stored in DocumentChunk (collection_<uuid>) 

1028 collection = ( 

1029 session.query(Collection).filter_by(id=collection_id).first() 

1030 ) 

1031 collection_name = ( 

1032 f"collection_{collection_id}" if collection else "library" 

1033 ) 

1034 

1035 # Get embedding model info from chunks 

1036 chunk_sample = ( 

1037 session.query(DocumentChunk) 

1038 .filter_by(collection_name=collection_name) 

1039 .first() 

1040 ) 

1041 

1042 embedding_info = {} 

1043 if chunk_sample: 1043 ↛ 1044 (line 1043 didn't jump to line 1044 because the condition on line 1043 was never true)

1044 embedding_info = { 

1045 "model": chunk_sample.embedding_model, 

1046 "model_type": chunk_sample.embedding_model_type.value 

1047 if chunk_sample.embedding_model_type 

1048 else None, 

1049 "dimension": chunk_sample.embedding_dimension, 

1050 } 

1051 

1052 return { 

1053 "total_documents": total_docs, 

1054 "indexed_documents": indexed_docs, 

1055 "unindexed_documents": total_docs - indexed_docs, 

1056 "total_chunks": total_chunks, 

1057 "embedding_info": embedding_info, 

1058 "chunk_size": self.chunk_size, 

1059 "chunk_overlap": self.chunk_overlap, 

1060 } 

1061 

1062 def index_local_file(self, file_path: str) -> Dict[str, Any]: 

1063 """ 

1064 Index a local file from the filesystem into RAG. 

1065 

1066 Args: 

1067 file_path: Path to the file to index 

1068 

1069 Returns: 

1070 Dict with status, chunk_count, and any errors 

1071 """ 

1072 from pathlib import Path 

1073 import mimetypes 

1074 

1075 file_path = Path(file_path) 

1076 

1077 if not file_path.exists(): 

1078 return {"status": "error", "error": f"File not found: {file_path}"} 

1079 

1080 if not file_path.is_file(): 

1081 return {"status": "error", "error": f"Not a file: {file_path}"} 

1082 

1083 # Determine file type 

1084 mime_type, _ = mimetypes.guess_type(str(file_path)) 

1085 

1086 # Read file content based on type 

1087 try: 

1088 if file_path.suffix.lower() in [".txt", ".md", ".markdown"]: 

1089 # Text files 

1090 with open(file_path, "r", encoding="utf-8") as f: 

1091 content = f.read() 

1092 elif file_path.suffix.lower() in [".html", ".htm"]: 

1093 # HTML files - strip tags 

1094 from bs4 import BeautifulSoup 

1095 

1096 with open(file_path, "r", encoding="utf-8") as f: 

1097 soup = BeautifulSoup(f.read(), "html.parser") 

1098 content = soup.get_text() 

1099 elif file_path.suffix.lower() == ".pdf": 

1100 # PDF files - extract text 

1101 import PyPDF2 

1102 

1103 content = "" 

1104 with open(file_path, "rb") as f: 

1105 pdf_reader = PyPDF2.PdfReader(f) 

1106 for page in pdf_reader.pages: 

1107 content += page.extract_text() 

1108 else: 

1109 return { 

1110 "status": "skipped", 

1111 "error": f"Unsupported file type: {file_path.suffix}", 

1112 } 

1113 

1114 if not content or len(content.strip()) < 10: 

1115 return { 

1116 "status": "error", 

1117 "error": "File has no extractable text content", 

1118 } 

1119 

1120 # Create LangChain Document from text 

1121 doc = LangchainDocument( 

1122 page_content=content, 

1123 metadata={ 

1124 "source": str(file_path), 

1125 "source_id": f"local_{file_path.stem}_{hash(str(file_path))}", 

1126 "title": file_path.stem, 

1127 "document_title": file_path.stem, 

1128 "file_type": file_path.suffix.lower(), 

1129 "file_size": file_path.stat().st_size, 

1130 "source_type": "local_file", 

1131 "collection": "local_library", 

1132 }, 

1133 ) 

1134 

1135 # Split into chunks 

1136 chunks = self.text_splitter.split_documents([doc]) 

1137 logger.info( 

1138 f"Split local file {file_path} into {len(chunks)} chunks" 

1139 ) 

1140 

1141 # Generate unique IDs for chunks 

1142 import hashlib 

1143 

1144 file_hash = hashlib.sha256(str(file_path).encode()).hexdigest()[:8] 

1145 embedding_ids = [ 

1146 f"local_{file_hash}_{i}" for i in range(len(chunks)) 

1147 ] 

1148 

1149 # Store chunks in database 

1150 self.embedding_manager._store_chunks_to_db( 

1151 chunks=chunks, 

1152 collection_name="local_library", 

1153 source_type="local_file", 

1154 source_id=str(file_path), 

1155 ) 

1156 

1157 # Load or create FAISS index 

1158 if self.faiss_index is None: 

1159 self.faiss_index = self.load_or_create_faiss_index() 

1160 

1161 # Add embeddings to FAISS index 

1162 self.faiss_index.add_documents(chunks, ids=embedding_ids) 

1163 

1164 # Save FAISS index 

1165 index_path = ( 

1166 Path(self.rag_index_record.index_path) 

1167 if self.rag_index_record 

1168 else None 

1169 ) 

1170 if index_path: 

1171 self.faiss_index.save_local( 

1172 str(index_path.parent), index_name=index_path.stem 

1173 ) 

1174 # Record file integrity 

1175 self.integrity_manager.record_file( 

1176 index_path, 

1177 related_entity_type="rag_index", 

1178 related_entity_id=self.rag_index_record.id, 

1179 ) 

1180 logger.info( 

1181 f"Saved FAISS index to {index_path} with integrity tracking" 

1182 ) 

1183 

1184 logger.info( 

1185 f"Successfully indexed local file {file_path} with {len(chunks)} chunks" 

1186 ) 

1187 

1188 return { 

1189 "status": "success", 

1190 "chunk_count": len(chunks), 

1191 "embedding_ids": embedding_ids, 

1192 } 

1193 

1194 except Exception as e: 

1195 logger.exception(f"Error indexing local file {file_path}: {str(e)}") 

1196 return { 

1197 "status": "error", 

1198 "error": f"Operation failed: {type(e).__name__}", 

1199 } 

1200 

1201 def index_user_document( 

1202 self, user_doc, collection_name: str, force_reindex: bool = False 

1203 ) -> Dict[str, Any]: 

1204 """ 

1205 Index a user-uploaded document into a specific collection. 

1206 

1207 Args: 

1208 user_doc: UserDocument object 

1209 collection_name: Name of the collection (e.g., "collection_123") 

1210 force_reindex: Whether to force reindexing 

1211 

1212 Returns: 

1213 Dict with status, chunk_count, and any errors 

1214 """ 

1215 

1216 try: 

1217 # Use the pre-extracted text content 

1218 content = user_doc.text_content 

1219 

1220 if not content or len(content.strip()) < 10: 

1221 return { 

1222 "status": "error", 

1223 "error": "Document has no extractable text content", 

1224 } 

1225 

1226 # Create LangChain Document 

1227 doc = LangchainDocument( 

1228 page_content=content, 

1229 metadata={ 

1230 "source": f"user_upload_{user_doc.id}", 

1231 "source_id": user_doc.id, 

1232 "title": user_doc.filename, 

1233 "document_title": user_doc.filename, 

1234 "file_type": user_doc.file_type, 

1235 "file_size": user_doc.file_size, 

1236 "collection": collection_name, 

1237 }, 

1238 ) 

1239 

1240 # Split into chunks 

1241 chunks = self.text_splitter.split_documents([doc]) 

1242 logger.info( 

1243 f"Split user document {user_doc.filename} into {len(chunks)} chunks" 

1244 ) 

1245 

1246 # Store chunks in database 

1247 embedding_ids = self.embedding_manager._store_chunks_to_db( 

1248 chunks=chunks, 

1249 collection_name=collection_name, 

1250 source_type="user_document", 

1251 source_id=user_doc.id, 

1252 ) 

1253 

1254 # Load or create FAISS index for this collection 

1255 if self.faiss_index is None: 

1256 self.faiss_index = self.load_or_create_faiss_index() 

1257 

1258 # If force_reindex, remove old chunks from FAISS before adding new ones 

1259 if force_reindex: 

1260 existing_ids = ( 

1261 set(self.faiss_index.docstore._dict.keys()) 

1262 if hasattr(self.faiss_index, "docstore") 

1263 else set() 

1264 ) 

1265 old_chunk_ids = [ 

1266 eid for eid in embedding_ids if eid in existing_ids 

1267 ] 

1268 if old_chunk_ids: 

1269 logger.info( 

1270 f"Force re-index: removing {len(old_chunk_ids)} existing chunks from FAISS" 

1271 ) 

1272 self.faiss_index.delete(old_chunk_ids) 

1273 

1274 # Filter out chunks that already exist in FAISS (unless force_reindex) 

1275 if not force_reindex: 

1276 existing_ids = ( 

1277 set(self.faiss_index.docstore._dict.keys()) 

1278 if hasattr(self.faiss_index, "docstore") 

1279 else set() 

1280 ) 

1281 new_chunks = [] 

1282 new_ids = [] 

1283 for chunk, chunk_id in zip(chunks, embedding_ids): 

1284 if chunk_id not in existing_ids: 

1285 new_chunks.append(chunk) 

1286 new_ids.append(chunk_id) 

1287 else: 

1288 # force_reindex: add all chunks 

1289 new_chunks = chunks 

1290 new_ids = embedding_ids 

1291 

1292 # Add embeddings to FAISS index 

1293 if new_chunks: 

1294 if force_reindex: 

1295 logger.info( 

1296 f"Force re-index: adding {len(new_chunks)} chunks with updated metadata to FAISS index" 

1297 ) 

1298 else: 

1299 logger.info( 

1300 f"Adding {len(new_chunks)} new chunks to FAISS index ({len(chunks) - len(new_chunks)} already exist)" 

1301 ) 

1302 self.faiss_index.add_documents(new_chunks, ids=new_ids) 

1303 else: 

1304 logger.info( 

1305 f"All {len(chunks)} chunks already exist in FAISS index, skipping" 

1306 ) 

1307 

1308 # Save FAISS index 

1309 index_path = ( 

1310 Path(self.rag_index_record.index_path) 

1311 if self.rag_index_record 

1312 else None 

1313 ) 

1314 if index_path: 

1315 self.faiss_index.save_local( 

1316 str(index_path.parent), index_name=index_path.stem 

1317 ) 

1318 # Record file integrity 

1319 self.integrity_manager.record_file( 

1320 index_path, 

1321 related_entity_type="rag_index", 

1322 related_entity_id=self.rag_index_record.id, 

1323 ) 

1324 

1325 logger.info( 

1326 f"Successfully indexed user document {user_doc.filename} with {len(chunks)} chunks" 

1327 ) 

1328 

1329 return { 

1330 "status": "success", 

1331 "chunk_count": len(chunks), 

1332 "embedding_ids": embedding_ids, 

1333 } 

1334 

1335 except Exception as e: 

1336 logger.exception( 

1337 f"Error indexing user document {user_doc.filename}: {str(e)}" 

1338 ) 

1339 return { 

1340 "status": "error", 

1341 "error": f"Operation failed: {type(e).__name__}", 

1342 } 

1343 

1344 def remove_collection_from_index( 

1345 self, collection_name: str 

1346 ) -> Dict[str, Any]: 

1347 """ 

1348 Remove all documents from a collection from the FAISS index. 

1349 

1350 Args: 

1351 collection_name: Name of the collection (e.g., "collection_123") 

1352 

1353 Returns: 

1354 Dict with status and count of removed chunks 

1355 """ 

1356 from ...database.models import DocumentChunk 

1357 from ...database.session_context import get_user_db_session 

1358 

1359 try: 

1360 with get_user_db_session( 

1361 self.username, self.db_password 

1362 ) as session: 

1363 # Get all chunk IDs for this collection 

1364 chunks = ( 

1365 session.query(DocumentChunk) 

1366 .filter_by(collection_name=collection_name) 

1367 .all() 

1368 ) 

1369 

1370 if not chunks: 

1371 return {"status": "success", "deleted_count": 0} 

1372 

1373 chunk_ids = [ 

1374 f"{collection_name}_{chunk.id}" for chunk in chunks 

1375 ] 

1376 

1377 # Load FAISS index if not already loaded 

1378 if self.faiss_index is None: 

1379 self.faiss_index = self.load_or_create_faiss_index() 

1380 

1381 # Remove from FAISS index 

1382 if hasattr(self.faiss_index, "delete"): 

1383 try: 

1384 self.faiss_index.delete(chunk_ids) 

1385 

1386 # Save updated index 

1387 index_path = ( 

1388 Path(self.rag_index_record.index_path) 

1389 if self.rag_index_record 

1390 else None 

1391 ) 

1392 if index_path: 

1393 self.faiss_index.save_local( 

1394 str(index_path.parent), 

1395 index_name=index_path.stem, 

1396 ) 

1397 # Record file integrity 

1398 self.integrity_manager.record_file( 

1399 index_path, 

1400 related_entity_type="rag_index", 

1401 related_entity_id=self.rag_index_record.id, 

1402 ) 

1403 except Exception as e: 

1404 logger.warning( 

1405 f"Could not delete chunks from FAISS: {e}" 

1406 ) 

1407 

1408 logger.info( 

1409 f"Removed {len(chunk_ids)} chunks from collection {collection_name}" 

1410 ) 

1411 

1412 return {"status": "success", "deleted_count": len(chunk_ids)} 

1413 

1414 except Exception as e: 

1415 logger.exception( 

1416 f"Error removing collection {collection_name} from index: {str(e)}" 

1417 ) 

1418 return { 

1419 "status": "error", 

1420 "error": f"Operation failed: {type(e).__name__}", 

1421 } 

1422 

1423 def search_library( 

1424 self, query: str, limit: int = 10, score_threshold: float = 0.0 

1425 ) -> List[Dict[str, Any]]: 

1426 """ 

1427 Search library documents using semantic search. 

1428 

1429 Args: 

1430 query: Search query 

1431 limit: Maximum number of results 

1432 score_threshold: Minimum similarity score 

1433 

1434 Returns: 

1435 List of results with content, metadata, and similarity scores 

1436 """ 

1437 # This will be implemented when we integrate with the search system 

1438 # For now, raise NotImplementedError 

1439 raise NotImplementedError( 

1440 "Library search will be implemented in the search integration phase" 

1441 )
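
End-to-end usage sketch (not part of the file under coverage). The username, document id, and collection id are hypothetical placeholders; search_library is still a NotImplementedError stub above, so only indexing and statistics are exercised:

# Module path follows the covered file's location under src/.
from local_deep_research.research_library.services.library_rag_service import (
    LibraryRAGService,
)

service = LibraryRAGService(
    username="alice",                          # hypothetical user
    embedding_model="all-MiniLM-L6-v2",
    embedding_provider="sentence_transformers",
    chunk_size=1000,
    chunk_overlap=200,
)

result = service.index_document(
    document_id="document-uuid",               # hypothetical ids
    collection_id="collection-uuid",
    force_reindex=False,
)
if result["status"] == "success":
    print(f"Indexed {result['chunk_count']} chunks")

print(service.get_rag_stats(collection_id="collection-uuid"))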