Coverage for src/local_deep_research/web_search_engines/engines/search_engine_local.py: 60%

563 statements  

coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1 import hashlib

2 import json

3 import os

4 import threading

5 import time

6 import uuid

7 from concurrent.futures import ProcessPoolExecutor

8 from datetime import UTC, datetime

9 from pathlib import Path

10 from typing import Any, Dict, Iterable, List, Optional

11

12 import numpy as np

13 from faiss import IndexFlatL2

14 from langchain_community.docstore.in_memory import InMemoryDocstore

15 from langchain_community.document_loaders import TextLoader

16 from langchain_community.embeddings import (

17 HuggingFaceEmbeddings,

18 )

19 from langchain_community.vectorstores import FAISS

20 from langchain_core.document_loaders import BaseLoader

21 from langchain_core.documents import Document

22 from langchain_core.language_models import BaseLLM

23 from langchain_text_splitters import RecursiveCharacterTextSplitter

24 from loguru import logger

25

26 from ...config import search_config

27 from ...config.paths import get_cache_directory

28 from ...database.models.library import DocumentChunk

29 from ...database.session_context import get_user_db_session

30 from ...document_loaders import get_loader_for_path, is_extension_supported

31 from ...utilities.url_utils import normalize_url

32 from ..search_engine_base import BaseSearchEngine

33 

34 

35 def _get_file_loader(file_path: str) -> Optional[BaseLoader]: 

36 """Get an appropriate document loader for a file based on its extension. 

37 

38 Uses the centralized document_loaders registry which supports 35+ file formats. 

39 """ 

40 file_path_obj = Path(file_path) 

41 extension = file_path_obj.suffix.lower() 

42 

43 # Check if extension is supported by the registry 

44 if is_extension_supported(extension): 

45 loader = get_loader_for_path(file_path) 

46 if loader: 

47 return loader 

48 

49 # Fallback to TextLoader for unknown extensions 

50 logger.warning(f"Unknown file extension for {file_path}, trying TextLoader") 

51 try: 

52 return TextLoader( 

53 str(file_path), encoding="utf-8", autodetect_encoding=True 

54 ) 

55 except Exception: 

56 logger.exception(f"Error creating loader for {file_path}") 

57 return None 

58 

59 
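# Sketch (illustrative, not part of this module): how the loader-selection helper
# above is meant to be called. The file path is hypothetical; extensions known to
# the document_loaders registry get a specialized loader, anything else falls back
# to TextLoader with encoding autodetection.
from local_deep_research.web_search_engines.engines.search_engine_local import (
    _get_file_loader,
)

loader = _get_file_loader("/home/user/docs/notes.md")  # hypothetical markdown file
docs = loader.load() if loader is not None else []  # list of langchain Documents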

60 def _load_document(file_path: Path) -> List[Document]: 

61 """ 

62 Loads documents from a file. 

63 

64 Args: 

65 file_path: The path to the document to load. 

66 

67 Returns: 

68 The loaded documents, or an empty list if it failed to load. 

69 

70 """ 

71 # Get a loader for this file 

72 loader = _get_file_loader(str(file_path)) 

73 

74 if loader is None: 

75 # No loader for this filetype. 

76 return [] 

77 

78 try: 

79 # Load the document 

80 docs = loader.load() 

81 

82 # Add source path and filename metadata. 

83 for doc in docs: 

84 doc.metadata["source"] = str(file_path) 

85 doc.metadata["filename"] = file_path.name 

86 

87 except Exception: 

88 logger.exception(f"Error loading {file_path}") 

89 return [] 

90 

91 return docs 

92 

93 
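# Sketch (illustrative): _load_document is kept at module level so that
# ProcessPoolExecutor can pickle it, which is how index_folder() uses it further
# below. The file paths here are hypothetical.
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from local_deep_research.web_search_engines.engines.search_engine_local import (
    _load_document,
)

files = [Path("/home/user/docs/a.md"), Path("/home/user/docs/b.pdf")]
all_docs = []
with ProcessPoolExecutor() as executor:
    for docs in executor.map(_load_document, files):
        all_docs.extend(docs)  # flatten the per-file lists of Documents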

94 class LocalEmbeddingManager: 

95 """Handles embedding generation and storage for local document search""" 

96 

97 def __init__( 

98 self, 

99 embedding_model: str = "all-MiniLM-L6-v2", 

100 embedding_device: str = "cpu", 

101 embedding_model_type: str = "sentence_transformers", # or 'ollama' 

102 ollama_base_url: Optional[str] = None, 

103 chunk_size: int = 1000, 

104 chunk_overlap: int = 200, 

105 cache_dir: Optional[str] = None, 

106 settings_snapshot: Optional[Dict[str, Any]] = None, 

107 ): 

108 """ 

109 Initialize the embedding manager for local document search. 

110 

111 Args: 

112 embedding_model: Name of the embedding model to use 

113 embedding_device: Device to run embeddings on ('cpu' or 'cuda') 

114 embedding_model_type: Type of embedding model ('sentence_transformers' or 'ollama') 

115 ollama_base_url: Base URL for Ollama API if using ollama embeddings 

116 chunk_size: Size of text chunks for splitting documents 

117 chunk_overlap: Overlap between chunks 

118 cache_dir: Directory to store embedding cache and index. 

119 If None, uses the app's configured cache directory. 

120 settings_snapshot: Optional settings snapshot for background threads 

121 """ 

122 

123 self.embedding_model = embedding_model 

124 self.embedding_device = embedding_device 

125 self.embedding_model_type = embedding_model_type 

126 self.ollama_base_url = ollama_base_url 

127 self.chunk_size = chunk_size 

128 self.chunk_overlap = chunk_overlap 

129 # Use configured cache directory if not specified 

130 if cache_dir is None: 

131 self.cache_dir = get_cache_directory() / "local_search" 

132 else: 

133 self.cache_dir = Path(cache_dir) 

134 self.settings_snapshot = settings_snapshot or {} 

135 

136 # Username for database access (extracted from settings if available) 

137 self.username = ( 

138 settings_snapshot.get("_username") if settings_snapshot else None 

139 ) 

140 # Password for encrypted database access (can be set later) 

141 self.db_password = None 

142 

143 # Create cache directory if it doesn't exist 

144 self.cache_dir.mkdir(parents=True, exist_ok=True) 

145 

146 # Initialize the embedding model (with lock for thread-safe lazy init) 

147 self._embeddings = None 

148 self._embedding_lock = threading.Lock() 

149 

150 # Initialize the text splitter 

151 self.text_splitter = RecursiveCharacterTextSplitter( 

152 chunk_size=chunk_size, chunk_overlap=chunk_overlap 

153 ) 

154 

155 # Track indexed folders and their metadata 

156 self.indexed_folders = self._load_indexed_folders() 

157 

158 # Vector store cache 

159 self.vector_stores = {} 

160 

161 # Track if this manager has been closed 

162 self._closed = False 

163 

164 def close(self): 

165 """Release embedding model resources.""" 

166 if self._closed: 

167 return 

168 self._closed = True 

169 # Clear embedding model reference to allow garbage collection 

170 self._embeddings = None 

171 # Clear vector store cache 

172 self.vector_stores.clear() 

173 logger.debug("LocalEmbeddingManager closed") 

174 

175 def __enter__(self): 

176 """Context manager entry.""" 

177 return self 

178 

179 def __exit__(self, exc_type, exc_val, exc_tb): 

180 """Context manager exit - ensures resources are released.""" 

181 self.close() 

182 return False 

183 
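# Sketch (illustrative): the __enter__/__exit__ pair above lets the manager be used
# as a context manager, so close() runs even when an error is raised. The
# constructor argument shown is just the documented default.
from local_deep_research.web_search_engines.engines.search_engine_local import (
    LocalEmbeddingManager,
)

with LocalEmbeddingManager(embedding_model="all-MiniLM-L6-v2") as manager:
    info = manager.get_indexed_folders_info()  # embeddings stay lazy until needed
# close() has run here: the model reference and vector-store cache are released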

184 @property 

185 def embeddings(self): 

186 """ 

187 Lazily initialize embeddings when first accessed. 

188 This allows the LocalEmbeddingManager to be created without 

189 immediately loading models, which is helpful when no local search is performed. 

190 

191 Uses double-checked locking to ensure thread-safe initialization. 

192 Concurrent SentenceTransformer model loading causes meta tensor errors 

193 in PyTorch when multiple threads call model.to(device) simultaneously. 

194 """ 

195 if self._embeddings is None: 

196 with self._embedding_lock: 

197 if self._embeddings is None: 

198 logger.info("Initializing embeddings on first use") 

199 self._embeddings = self._initialize_embeddings() 

200 return self._embeddings 

201 
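# Sketch (illustrative): the property above is a double-checked locking pattern.
# A stripped-down version with hypothetical names, showing why the None check is
# repeated inside the lock.
import threading


class _LazyResource:
    def __init__(self):
        self._value = None
        self._lock = threading.Lock()

    @property
    def value(self):
        if self._value is None:  # fast path, no lock taken once initialized
            with self._lock:
                if self._value is None:  # another thread may have won the race
                    self._value = object()  # expensive initialization goes here
        return self._value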

202 def _initialize_embeddings(self): 

203 """Initialize the embedding model based on configuration""" 

204 try: 

205 # Use the new unified embedding system 

206 from ...embeddings import get_embeddings 

207 

208 # Prepare kwargs for provider-specific parameters 

209 kwargs = {} 

210 

211 # Add device for sentence transformers 

212 if self.embedding_model_type == "sentence_transformers":  # 212 ↛ 216: condition was always true

213 kwargs["device"] = self.embedding_device 

214 

215 # Add base_url for ollama if specified 

216 if self.embedding_model_type == "ollama" and self.ollama_base_url:  # 216 ↛ 217: condition was never true

217 kwargs["base_url"] = normalize_url(self.ollama_base_url) 

218 

219 logger.info( 

220 f"Initializing embeddings with provider={self.embedding_model_type}, model={self.embedding_model}" 

221 ) 

222 

223 return get_embeddings( 

224 provider=self.embedding_model_type, 

225 model=self.embedding_model, 

226 settings_snapshot=self.settings_snapshot, 

227 **kwargs, 

228 ) 

229 except Exception: 

230 logger.exception("Error initializing embeddings") 

231 logger.warning( 

232 "Falling back to HuggingFaceEmbeddings with all-MiniLM-L6-v2" 

233 ) 

234 return HuggingFaceEmbeddings( 

235 model_name="sentence-transformers/all-MiniLM-L6-v2" 

236 ) 

237 

238 def _store_chunks_to_db( 

239 self, 

240 chunks: List[Document], 

241 collection_name: str, 

242 source_path: Optional[str] = None, 

243 source_id: Optional[int] = None, 

244 source_type: str = "local_file", 

245 ) -> List[str]: 

246 """ 

247 Store document chunks in the database. 

248 

249 Args: 

250 chunks: List of LangChain Document chunks 

251 collection_name: Name of the collection (e.g., 'personal_notes', 'library') 

252 source_path: Path to source file (for local files) 

253 source_id: ID of source document (for library documents) 

254 source_type: Type of source ('local_file' or 'library') 

255 

256 Returns: 

257 List of chunk embedding IDs (UUIDs) for FAISS mapping 

258 """ 

259 if not self.username: 

260 logger.warning( 

261 "No username available, cannot store chunks in database" 

262 ) 

263 return [] 

264 

265 chunk_ids = [] 

266 

267 try: 

268 with get_user_db_session( 

269 self.username, self.db_password 

270 ) as session: 

271 for idx, chunk in enumerate(chunks): 

272 # Generate unique hash for chunk 

273 chunk_text = chunk.page_content 

274 chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest() 

275 

276 # Generate unique embedding ID 

277 embedding_id = uuid.uuid4().hex 

278 

279 # Extract metadata 

280 metadata = chunk.metadata or {} 

281 document_title = metadata.get( 

282 "filename", metadata.get("title", "Unknown") 

283 ) 

284 

285 # Calculate word count 

286 word_count = len(chunk_text.split()) 

287 

288 # Get character positions from metadata if available 

289 start_char = metadata.get("start_char", 0) 

290 end_char = metadata.get("end_char", len(chunk_text)) 

291 

292 # Check if chunk already exists 

293 existing_chunk = ( 

294 session.query(DocumentChunk) 

295 .filter_by(chunk_hash=chunk_hash) 

296 .first() 

297 ) 

298 

299 if existing_chunk: 

300 # Update existing chunk 

301 existing_chunk.last_accessed = datetime.now(UTC) 

302 chunk_ids.append(existing_chunk.embedding_id) 

303 logger.debug( 

304 f"Chunk already exists, reusing: {existing_chunk.embedding_id}" 

305 ) 

306 else: 

307 # Create new chunk 

308 db_chunk = DocumentChunk( 

309 chunk_hash=chunk_hash, 

310 source_type=source_type, 

311 source_id=source_id, 

312 source_path=str(source_path) 

313 if source_path 

314 else None, 

315 collection_name=collection_name, 

316 chunk_text=chunk_text, 

317 chunk_index=idx, 

318 start_char=start_char, 

319 end_char=end_char, 

320 word_count=word_count, 

321 embedding_id=embedding_id, 

322 embedding_model=self.embedding_model, 

323 embedding_model_type=self.embedding_model_type, 

324 document_title=document_title, 

325 document_metadata=metadata, 

326 ) 

327 session.add(db_chunk) 

328 chunk_ids.append(embedding_id) 

329 

330 session.commit() 

331 logger.info( 

332 f"Stored {len(chunk_ids)} chunks to database for collection '{collection_name}'" 

333 ) 

334 

335 except Exception: 

336 logger.exception( 

337 f"Error storing chunks to database for collection '{collection_name}'" 

338 ) 

339 return [] 

340 

341 return chunk_ids 

342 
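# Sketch (illustrative): the deduplication key used by _store_chunks_to_db above.
# Identical chunk text always hashes to the same chunk_hash, so re-indexing reuses
# the stored row, while embedding_id is minted fresh for genuinely new chunks.
import hashlib
import uuid

chunk_text = "Example chunk contents."  # hypothetical chunk text
chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()  # dedup lookup key
embedding_id = uuid.uuid4().hex  # id linking the DB row to the FAISS entry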

343 def _load_chunks_from_db( 

344 self, chunk_ids: List[str], username: Optional[str] = None 

345 ) -> List[Dict[str, Any]]: 

346 """ 

347 Load chunks from database by their embedding IDs. 

348 

349 Args: 

350 chunk_ids: List of embedding IDs to load 

351 username: Username for database access (uses self.username if not provided) 

352 

353 Returns: 

354 List of chunk dictionaries with content and metadata 

355 """ 

356 username = username or self.username 

357 if not username: 

358 logger.warning( 

359 "No username available, cannot load chunks from database" 

360 ) 

361 return [] 

362 

363 chunks = [] 

364 

365 try: 

366 with get_user_db_session(username) as session: 

367 db_chunks = ( 

368 session.query(DocumentChunk) 

369 .filter(DocumentChunk.embedding_id.in_(chunk_ids)) 

370 .all() 

371 ) 

372 

373 for db_chunk in db_chunks: 

374 # Update last accessed time 

375 db_chunk.last_accessed = datetime.now(UTC) 

376 

377 chunks.append( 

378 { 

379 "id": db_chunk.embedding_id, 

380 "content": db_chunk.chunk_text, 

381 "metadata": { 

382 "source_type": db_chunk.source_type, 

383 "source_path": db_chunk.source_path, 

384 "source_id": db_chunk.source_id, 

385 "collection": db_chunk.collection_name, 

386 "chunk_index": db_chunk.chunk_index, 

387 "word_count": db_chunk.word_count, 

388 "title": db_chunk.document_title, 

389 **db_chunk.document_metadata, 

390 }, 

391 } 

392 ) 

393 

394 session.commit() # Commit the last_accessed updates 

395 

396 except Exception: 

397 logger.exception("Error loading chunks from database") 

398 return [] 

399 

400 return chunks 

401 

402 def _delete_chunks_from_db( 

403 self, 

404 collection_name: str, 

405 source_path: Optional[str] = None, 

406 source_id: Optional[int] = None, 

407 ) -> int: 

408 """ 

409 Delete chunks from database. 

410 

411 Args: 

412 collection_name: Name of the collection 

413 source_path: Path to source file (for local files) 

414 source_id: ID of source document (for library documents) 

415 

416 Returns: 

417 Number of chunks deleted 

418 """ 

419 if not self.username: 

420 logger.warning( 

421 "No username available, cannot delete chunks from database" 

422 ) 

423 return 0 

424 

425 try: 

426 with get_user_db_session( 

427 self.username, self.db_password 

428 ) as session: 

429 query = session.query(DocumentChunk).filter_by( 

430 collection_name=collection_name 

431 ) 

432 

433 if source_path: 

434 query = query.filter_by(source_path=str(source_path)) 

435 if source_id: 

436 query = query.filter_by(source_id=source_id) 

437 

438 count = query.delete() 

439 session.commit() 

440 

441 logger.info( 

442 f"Deleted {count} chunks from database for collection '{collection_name}'" 

443 ) 

444 return count 

445 

446 except Exception: 

447 logger.exception( 

448 f"Error deleting chunks from database for collection '{collection_name}'" 

449 ) 

450 return 0 

451 

452 def _load_or_create_vector_store(self): 

453 """Load the vector store from disk or create it if needed""" 

454 vector_store_path = self._get_vector_store_path() 

455 

456 # Check if vector store exists and is up to date 

457 if vector_store_path.exists() and not self._check_folders_modified(): 

458 logger.info( 

459 f"Loading existing vector store from {vector_store_path}" 

460 ) 

461 try: 

462 vector_store = FAISS.load_local( 

463 str(vector_store_path), 

464 self.embeddings, 

465 allow_dangerous_deserialization=True, 

466 ) 

467 

468 # Add this code to show document count 

469 doc_count = len(vector_store.index_to_docstore_id) 

470 logger.info(f"Loaded index with {doc_count} document chunks") 

471 

472 return vector_store 

473 except Exception: 

474 logger.exception("Error loading vector store") 

475 logger.info("Will create a new vector store") 

476 

477 # Create a new vector store 

478 return self._create_vector_store() 

479 

480 def _load_indexed_folders(self) -> Dict[str, Dict[str, Any]]: 

481 """Load metadata about indexed folders from disk""" 

482 index_metadata_path = self.cache_dir / "index_metadata.json" 

483 

484 if index_metadata_path.exists(): 

485 try: 

486 with open(index_metadata_path, "r") as f: 

487 return json.load(f) 

488 except Exception: 

489 logger.exception("Error loading index metadata") 

490 

491 return {} 

492 

493 def _save_indexed_folders(self): 

494 """Save metadata about indexed folders to disk""" 

495 index_metadata_path = self.cache_dir / "index_metadata.json" 

496 

497 try: 

498 with open(index_metadata_path, "w") as f: 

499 json.dump(self.indexed_folders, f, indent=2) 

500 except Exception: 

501 logger.exception("Error saving index metadata") 

502 

503 @staticmethod 

504 def get_folder_hash(folder_path: Path) -> str: 

505 """Generate a hash for a folder based on its path""" 

506 # Canonicalize the path so we don't have weird Windows vs. Linux 

507 # problems or issues with trailing slashes. 

508 canonical_folder_path = "/".join(folder_path.parts) 

509 return hashlib.md5( # DevSkim: ignore DS126858 

510 canonical_folder_path.encode(), usedforsecurity=False 

511 ).hexdigest() 

512 
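# Sketch (illustrative): the folder-hash canonicalization above. Joining Path.parts
# with "/" means a trailing slash or platform-specific separators do not change the
# key, so the on-disk index name stays stable. The folder path is hypothetical.
import hashlib
from pathlib import Path

folder = Path("/home/user/docs")
canonical = "/".join(folder.parts)  # "//home/user/docs" on POSIX
folder_hash = hashlib.md5(canonical.encode(), usedforsecurity=False).hexdigest()
index_dir_name = f"index_{folder_hash}"  # matches the _get_index_path() naming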

513 def _get_index_path(self, folder_path: Path) -> Path: 

514 """Get the path where the index for a specific folder should be stored""" 

515 folder_hash = self.get_folder_hash(folder_path) 

516 return self.cache_dir / f"index_{folder_hash}" 

517 

518 def _check_folder_modified(self, folder_path: Path) -> bool: 

519 """Check if a folder has been modified since it was last indexed""" 

520 

521 @staticmethod 

522 def _get_all_files(folder_path: Path) -> Iterable[Path]: 

523 """ 

524 Gets all the files, recursively, in a folder. 

525 

526 Args: 

527 folder_path: The path to the folder. 

528 

529 Yields: 

530 Each of the files in the folder. 

531 

532 """ 

533 for root, _, files in os.walk(folder_path): 

534 for file in files: 

535 yield Path(root) / file 

536 

537 def _get_modified_files(self, folder_path: Path) -> List[Path]: 

538 """ 

539 Gets the files in a folder that have been modified since it was last 

540 indexed. 

541 

542 Args: 

543 folder_path: The path to the folder to check. 

544 

545 Returns: 

546 A list of the files that were modified. 

547 

548 """ 

549 if not folder_path.exists() or not folder_path.is_dir(): 

550 return [] 

551 

552 folder_hash = self.get_folder_hash(folder_path) 

553 

554 if folder_hash not in self.indexed_folders: 

555 # If folder has never been indexed, everything has been modified. 

556 last_indexed = 0 

557 indexed_files = set() 

558 else: 

559 last_indexed = self.indexed_folders[folder_hash].get( 

560 "last_indexed", 0 

561 ) 

562 indexed_files = ( 

563 self.indexed_folders[folder_hash] 

564 .get("indexed_files", {}) 

565 .keys() 

566 ) 

567 

568 # Check if any file in the folder has been modified since last indexing 

569 modified_files = [] 

570 for file_path in self._get_all_files(folder_path): 

571 file_stats = file_path.stat() 

572 if file_stats.st_mtime > last_indexed: 

573 modified_files.append(file_path) 

574 elif str(file_path.relative_to(folder_path)) not in indexed_files: 

575 # This file somehow never got indexed. 

576 modified_files.append(file_path) 

577 

578 return modified_files 

579 

580 def _check_config_changed(self, folder_path: Path) -> bool: 

581 """ 

582 Checks if the embedding configuration for a folder has been changed 

583 since it was last indexed. 

584 """ 

585 folder_hash = self.get_folder_hash(folder_path) 

586 

587 if folder_hash not in self.indexed_folders: 

588 # It hasn't been indexed at all. That's a new configuration, 

589 # technically. 

590 return True 

591 

592 embedding_config = self.indexed_folders[folder_hash] 

593 chunk_size = int(embedding_config.get("chunk_size", 0)) 

594 chunk_overlap = int(embedding_config.get("chunk_overlap", 0)) 

595 embedding_model = embedding_config.get("embedding_model", "") 

596 

597 if (chunk_size, chunk_overlap, embedding_model) != ( 

598 self.chunk_size, 

599 self.chunk_overlap, 

600 self.embedding_model, 

601 ): 

602 logger.info( 

603 "Embedding configuration has changed, re-indexing folder." 

604 ) 

605 return True 

606 return False 

607 

608 def index_folder( 

609 self, folder_path: str, force_reindex: bool = False 

610 ) -> bool: 

611 """ 

612 Index all documents in a folder for vector search. 

613 

614 Args: 

615 folder_path: Path to the folder to index 

616 force_reindex: Whether to force reindexing even if unchanged 

617 

618 Returns: 

619 bool: True if indexing was successful, False otherwise 

620 """ 

621 folder_path = Path(folder_path) 

622 

623 # Validate folder 

624 if not folder_path.exists():  # 624 ↛ 625: condition was never true

625 logger.error(f"Folder not found: {folder_path}") 

626 return False 

627 

628 if not folder_path.is_dir():  # 628 ↛ 629: condition was never true

629 logger.error(f"Path is not a directory: {folder_path}") 

630 return False 

631 

632 folder_str = str(folder_path) 

633 folder_hash = self.get_folder_hash(folder_path) 

634 index_path = self._get_index_path(folder_path) 

635 

636 if force_reindex or self._check_config_changed(folder_path):  # 636 ↛ 641: condition was always true

637 logger.info(f"Re-indexing entire folder: {folder_path}") 

638 modified_files = list(self._get_all_files(folder_path)) 

639 else: 

640 # Just re-index the modified files if we can get away with it. 

641 modified_files = self._get_modified_files(folder_path) 

642 logger.info(f"Re-indexing {len(modified_files)} modified files...") 

643 

644 # Load the vector store from disk if not already loaded 

645 if folder_hash not in self.vector_stores and index_path.exists():  # 645 ↛ 646: condition was never true

646 try: 

647 self.vector_stores[folder_hash] = FAISS.load_local( 

648 str(index_path), 

649 self.embeddings, 

650 allow_dangerous_deserialization=True, 

651 ) 

652 logger.info(f"Loaded index for {folder_path} from disk") 

653 except Exception: 

654 logger.exception(f"Error loading index for {folder_path}") 

655 # If loading fails, force reindexing 

656 force_reindex = True 

657 

658 logger.info(f"Indexing folder: {folder_path}") 

659 start_time = time.time() 

660 

661 # Find documents to index 

662 all_docs = [] 

663 

664 # Remove hidden files and directories. 

665 modified_files = [ 

666 p 

667 for p in modified_files 

668 if not p.name.startswith(".") 

669 and not any(part.startswith(".") for part in p.parts) 

670 ] 

671 # Index them. 

672 with ProcessPoolExecutor() as executor: 

673 all_docs_nested = executor.map(_load_document, modified_files) 

674 # Flatten the result. 

675 for docs in all_docs_nested:  # 675 ↛ 676: loop never started

676 all_docs.extend(docs) 

677 

678 if force_reindex or folder_hash not in self.vector_stores:  # 678 ↛ 693: condition was always true

679 logger.info(f"Creating new index for {folder_path}") 

680 # Embed a test query to figure out embedding length. 

681 test_embedding = self.embeddings.embed_query("hello world") 

682 index = IndexFlatL2(len(test_embedding)) 

683 # Use minimal docstore - chunks are stored in database 

684 self.vector_stores[folder_hash] = FAISS( 

685 self.embeddings, 

686 index=index, 

687 docstore=InMemoryDocstore(), # Minimal - just for FAISS compatibility 

688 index_to_docstore_id={}, 

689 normalize_L2=True, 

690 ) 

691 

692 # Split documents into chunks 

693 logger.info(f"Splitting {len(all_docs)} documents into chunks") 

694 splits = self.text_splitter.split_documents(all_docs) 

695 logger.info( 

696 f"Created {len(splits)} chunks from {len(modified_files)} files" 

697 ) 

698 

699 # Store chunks in database and get embedding IDs 

700 embedding_ids = [] 

701 if splits:  # 701 ↛ 702: condition was never true

702 logger.info(f"Storing {len(splits)} chunks in database") 

703 # Get collection name from folder path (last folder name) 

704 collection_name = folder_path.name 

705 

706 # Store chunks to database 

707 embedding_ids = self._store_chunks_to_db( 

708 chunks=splits, 

709 collection_name=collection_name, 

710 source_type="local_file", 

711 ) 

712 

713 logger.info(f"Adding {len(splits)} chunks to FAISS index") 

714 # Add embeddings to FAISS using the database-generated IDs 

715 self.vector_stores[folder_hash].add_documents( 

716 splits, ids=embedding_ids 

717 ) 

718 

719 # Update indexing time for individual files. 

720 index_time = time.time() 

721 indexed_files = {} 

722 if folder_hash in self.indexed_folders:  # 722 ↛ 723: condition was never true

723 indexed_files = ( 

724 self.indexed_folders[folder_hash] 

725 .get("indexed_files", {}) 

726 .copy() 

727 ) 

728 for embedding_id, split in zip(embedding_ids, splits, strict=False):  # 728 ↛ 729: loop never started

729 split_source = str( 

730 Path(split.metadata["source"]).relative_to(folder_path) 

731 ) 

732 id_list = indexed_files.setdefault(split_source, []) 

733 id_list.append(embedding_id) 

734 

735 # Check for any files that were removed and remove them from the 

736 # vector store and database. 

737 delete_ids = [] 

738 delete_paths = [] 

739 for relative_path, chunk_ids in indexed_files.items():  # 739 ↛ 740: loop never started

740 if not (folder_path / Path(relative_path)).exists(): 

741 delete_ids.extend(chunk_ids) 

742 delete_paths.append(relative_path) 

743 if delete_ids:  # 743 ↛ 744: condition was never true

744 logger.info( 

745 f"Deleting {len(delete_paths)} non-existent files from the " 

746 f"index and database." 

747 ) 

748 # Delete from FAISS index 

749 self.vector_stores[folder_hash].delete(delete_ids) 

750 

751 # Delete from database 

752 collection_name = folder_path.name 

753 for delete_path in delete_paths: 

754 full_path = str(folder_path / delete_path) 

755 deleted_count = self._delete_chunks_from_db( 

756 collection_name=collection_name, 

757 source_path=full_path, 

758 ) 

759 logger.debug( 

760 f"Deleted {deleted_count} chunks for {delete_path} from database" 

761 ) 

762 for path in delete_paths:  # 762 ↛ 763: loop never started

763 del indexed_files[path] 

764 

765 # Save the vector store to disk 

766 logger.info(f"Saving index to {index_path}") 

767 self.vector_stores[folder_hash].save_local(str(index_path)) 

768 

769 # Update metadata 

770 self.indexed_folders[folder_hash] = { 

771 "path": folder_str, 

772 "last_indexed": index_time, 

773 "file_count": len(modified_files), 

774 "chunk_count": len(splits), 

775 "embedding_model": self.embedding_model, 

776 "chunk_size": self.chunk_size, 

777 "chunk_overlap": self.chunk_overlap, 

778 "indexed_files": indexed_files, 

779 } 

780 

781 # Save updated metadata 

782 self._save_indexed_folders() 

783 

784 elapsed_time = time.time() - start_time 

785 logger.info( 

786 f"Indexed {len(modified_files)} files in {elapsed_time:.2f} seconds" 

787 ) 

788 

789 return True 

790 
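# Sketch (illustrative): the intended call shape for index_folder() and search()
# above, with a hypothetical folder and query. Without a username/password for the
# encrypted database, chunk storage is skipped, so read this as the API shape
# rather than a guaranteed end-to-end result.
from local_deep_research.web_search_engines.engines.search_engine_local import (
    LocalEmbeddingManager,
)

manager = LocalEmbeddingManager(chunk_size=1000, chunk_overlap=200)
if manager.index_folder("/home/user/docs"):  # builds or updates the FAISS index
    hits = manager.search(
        query="quarterly revenue summary",  # hypothetical query
        folder_paths=["/home/user/docs"],
        limit=5,
        score_threshold=0.1,
    )
    for hit in hits:
        print(hit["similarity"], hit["metadata"].get("source"))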

791 def search( 

792 self, 

793 query: str, 

794 folder_paths: List[str], 

795 limit: int = 10, 

796 score_threshold: float = 0.0, 

797 ) -> List[Dict[str, Any]]: 

798 """ 

799 Search for documents relevant to a query across specified folders. 

800 

801 Args: 

802 query: The search query 

803 folder_paths: List of folder paths to search in 

804 limit: Maximum number of results to return 

805 score_threshold: Minimum similarity score threshold 

806 

807 Returns: 

808 List of results with document content and metadata 

809 """ 

810 folder_paths = [Path(p) for p in folder_paths] 

811 

812 # Add detailed debugging for each folder 

813 for folder_path in folder_paths: 

814 folder_hash = self.get_folder_hash(folder_path) 

815 index_path = self._get_index_path(folder_path) 

816 

817 logger.info(f"Diagnostic for {folder_path}:") 

818 logger.info(f" - Folder hash: {folder_hash}") 

819 logger.info(f" - Index path: {index_path}") 

820 logger.info(f" - Index exists on disk: {index_path.exists()}") 

821 logger.info( 

822 f" - Is in indexed_folders: {folder_hash in self.indexed_folders}" 

823 ) 

824 

825 if folder_hash in self.indexed_folders: 

826 meta = self.indexed_folders[folder_hash] 

827 logger.info( 

828 f" - Metadata: file_count={meta.get('file_count', 0)}, chunk_count={meta.get('chunk_count', 0)}" 

829 ) 

830 

831 # Validate folders exist 

832 valid_folder_paths = [] 

833 for path in folder_paths: 

834 if path.exists() and path.is_dir(): 

835 valid_folder_paths.append(path) 

836 else: 

837 logger.warning( 

838 f"Skipping non-existent folder in search: {path}" 

839 ) 

840 

841 # If no valid folders, return empty results 

842 if not valid_folder_paths: 

843 logger.warning(f"No valid folders to search among: {folder_paths}") 

844 return [] 

845 

846 all_results = [] 

847 

848 for folder_path in valid_folder_paths: 

849 folder_hash = self.get_folder_hash(folder_path) 

850 

851 # Skip folders that haven't been indexed 

852 if folder_hash not in self.indexed_folders: 

853 logger.warning(f"Folder {folder_path} has not been indexed") 

854 continue 

855 

856 # Make sure the vector store is loaded 

857 if folder_hash not in self.vector_stores: 

858 index_path = self._get_index_path(folder_path) 

859 try: 

860 self.vector_stores[folder_hash] = FAISS.load_local( 

861 str(index_path), 

862 self.embeddings, 

863 allow_dangerous_deserialization=True, 

864 ) 

865 except Exception: 

866 logger.exception(f"Error loading index for {folder_path}") 

867 continue 

868 

869 # Search in this folder 

870 vector_store = self.vector_stores[folder_hash] 

871 

872 try: 

873 # Get query embedding 

874 query_vector = self.embeddings.embed_query(query) 

875 

876 # Search FAISS index for similar vectors 

877 # Returns: (distances, indices) where indices are FAISS internal indices 

878 distances, indices = vector_store.index.search( 

879 np.array([query_vector], dtype=np.float32), limit 

880 ) 

881 

882 # Convert distances to similarity scores (L2 distance -> similarity) 

883 # For L2: smaller distance = more similar 

884 # Convert to similarity: 1 / (1 + distance) 

885 similarities = 1 / (1 + distances[0]) 

886 

887 # Get embedding IDs from FAISS mapping 

888 embedding_ids = [] 

889 valid_indices = [] 

890 for idx, faiss_idx in enumerate(indices[0]): 

891 if faiss_idx == -1: # FAISS returns -1 for empty results 

892 continue 

893 if faiss_idx in vector_store.index_to_docstore_id: 

894 embedding_id = vector_store.index_to_docstore_id[ 

895 faiss_idx 

896 ] 

897 embedding_ids.append(embedding_id) 

898 valid_indices.append(idx) 

899 

900 # Load chunks from database 

901 if embedding_ids: 

902 db_chunks = self._load_chunks_from_db( 

903 embedding_ids, self.username 

904 ) 

905 

906 # Create results from database chunks 

907 for idx, chunk in zip(valid_indices, db_chunks): 

908 similarity = float(similarities[idx]) 

909 

910 # Skip results below the threshold 

911 if similarity < score_threshold: 

912 continue 

913 

914 # Extract metadata from chunk (keys match the dicts built in _load_chunks_from_db) 

915 metadata = chunk.get("metadata", {}) 

916 if "source" not in metadata and metadata.get( 

917 "source_path" 

918 ): 

919 metadata["source"] = metadata["source_path"] 

920 

921 result = { 

922 "content": chunk["chunk_text"], 

923 "metadata": metadata, 

924 "similarity": similarity, 

925 "folder": folder_path, 

926 } 

927 

928 all_results.append(result) 

929 except Exception: 

930 logger.exception(f"Error searching in {folder_path}") 

931 

932 # Sort by similarity (highest first) 

933 all_results.sort(key=lambda x: x["similarity"], reverse=True) 

934 

935 # Limit to the requested number 

936 return all_results[:limit] 

937 
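# Sketch (illustrative): the L2-distance-to-similarity mapping used in search()
# above. similarity = 1 / (1 + distance), so a distance of 0 maps to 1.0 and larger
# distances shrink toward 0.
import numpy as np

distances = np.array([0.0, 1.0, 3.0])  # hypothetical FAISS L2 distances
similarities = 1 / (1 + distances)  # -> array([1.0, 0.5, 0.25])
assert similarities[0] > similarities[1] > similarities[2]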

938 def clear_cache(self): 

939 """Clear all cached vector stores from memory (not disk)""" 

940 self.vector_stores.clear() 

941 

942 def get_indexed_folders_info(self) -> List[Dict[str, Any]]: 

943 """Get information about all indexed folders""" 

944 info = [] 

945 

946 for folder_hash, metadata in self.indexed_folders.items(): 

947 folder_info = metadata.copy() 

948 

949 # Add formatted last indexed time 

950 if "last_indexed" in folder_info: 950 ↛ 956line 950 didn't jump to line 956 because the condition on line 950 was always true

951 folder_info["last_indexed_formatted"] = datetime.fromtimestamp( 

952 folder_info["last_indexed"] 

953 ).strftime("%Y-%m-%d %H:%M:%S") 

954 

955 # Check if index file exists 

956 index_path = self._get_index_path(Path(folder_info["path"])) 

957 folder_info["index_exists"] = index_path.exists() 

958 

959 info.append(folder_info) 

960 

961 return info 

962 

963 

964 class LocalSearchEngine(BaseSearchEngine): 

965 """Local document search engine with two-phase retrieval""" 

966 

967 def __init__( 

968 self, 

969 paths: List[str], 

970 llm: Optional[BaseLLM] = None, 

971 max_results: int = 10, 

972 max_filtered_results: Optional[int] = None, 

973 embedding_model: str = "all-MiniLM-L6-v2", 

974 embedding_device: str = "cpu", 

975 embedding_model_type: str = "sentence_transformers", 

976 ollama_base_url: Optional[str] = None, 

977 force_reindex: bool = False, 

978 chunk_size: int = 1000, 

979 chunk_overlap: int = 200, 

980 cache_dir: Optional[str] = None, 

981 collections: Optional[Dict[str, Dict[str, Any]]] = None, 

982 name: str = "", 

983 description: str = "", 

984 ): 

985 """ 

986 Initialize the local search engine. 

987 

988 Args: 

989 paths: List of folder paths to search in 

990 llm: Language model for relevance filtering 

991 max_results: Maximum number of results to return 

992 max_filtered_results: Maximum results after filtering 

993 embedding_model: Name of the embedding model to use 

994 embedding_device: Device to run embeddings on ('cpu' or 'cuda') 

995 embedding_model_type: Type of embedding model 

996 ollama_base_url: Base URL for Ollama API 

997 force_reindex: Whether to force reindexing 

998 chunk_size: Size of text chunks for splitting documents 

999 chunk_overlap: Overlap between chunks 

1000 cache_dir: Directory to store embedding cache and index 

1001 collections: Dictionary of named collections with paths and descriptions 

1002 name: Human-readable name of the collection we are searching. 

1003 description: Human-readable description of the collection we are 

1004 searching. 

1005 """ 

1006 # Initialize the base search engine 

1007 super().__init__(llm=llm, max_filtered_results=max_filtered_results) 

1008 

1009 self.name = name 

1010 self.description = description 

1011 

1012 # Validate folder paths 

1013 self.folder_paths = paths 

1014 self.valid_folder_paths = [] 

1015 for path_str in paths: 

1016 path = Path(path_str) 

1017 if path.exists() and path.is_dir(): 

1018 self.valid_folder_paths.append(path_str) 

1019 else: 

1020 logger.warning( 

1021 f"Folder not found or is not a directory: {path_str}" 

1022 ) 

1023 

1024 # If no valid folders, log a clear message 

1025 if not self.valid_folder_paths and paths: 

1026 logger.warning(f"No valid folders found among: {paths}") 

1027 logger.warning( 

1028 "This search engine will return no results until valid folders are configured" 

1029 ) 

1030 

1031 self.max_results = max_results 

1032 self.collections = collections or { 

1033 "default": {"paths": paths, "description": "Default collection"} 

1034 } 

1035 

1036 # Initialize the embedding manager with only valid folders 

1037 self.embedding_manager = LocalEmbeddingManager( 

1038 embedding_model=embedding_model, 

1039 embedding_device=embedding_device, 

1040 embedding_model_type=embedding_model_type, 

1041 ollama_base_url=ollama_base_url, 

1042 chunk_size=chunk_size, 

1043 chunk_overlap=chunk_overlap, 

1044 cache_dir=cache_dir, 

1045 settings_snapshot=self.settings_snapshot, 

1046 ) 

1047 

1048 # Index all folders 

1049 self._index_folders(force_reindex) 

1050 

1051 def _index_folders(self, force_reindex: bool = False): 

1052 """Index all valid configured folders""" 

1053 indexed = [] 

1054 failed = [] 

1055 skipped = [] 

1056 

1057 # Keep track of invalid folders 

1058 for folder in self.folder_paths: 

1059 if folder not in self.valid_folder_paths: 

1060 skipped.append(folder) 

1061 continue 

1062 

1063 success = self.embedding_manager.index_folder(folder, force_reindex) 

1064 if success:  # 1064 ↛ 1067: condition was always true

1065 indexed.append(folder) 

1066 else: 

1067 failed.append(folder) 

1068 

1069 if indexed: 

1070 logger.info( 

1071 f"Successfully indexed {len(indexed)} folders: {', '.join(indexed)}" 

1072 ) 

1073 

1074 if failed:  # 1074 ↛ 1075: condition was never true

1075 logger.warning( 

1076 f"Failed to index {len(failed)} folders: {', '.join(failed)}" 

1077 ) 

1078 

1079 if skipped: 

1080 logger.warning( 

1081 f"Skipped {len(skipped)} invalid folders: {', '.join(skipped)}" 

1082 ) 

1083 

1084 def _get_previews( 

1085 self, query: str, collection_names: Optional[List[str]] = None 

1086 ) -> List[Dict[str, Any]]: 

1087 """ 

1088 Get preview information for documents matching the query. 

1089 

1090 Args: 

1091 query: The search query 

1092 collection_names: Specific collections to search within (if None, search all) 

1093 

1094 Returns: 

1095 List of preview dictionaries 

1096 """ 

1097 # Determine which collections to search 

1098 if collection_names: 

1099 # Search only in specified collections 

1100 collections_to_search = { 

1101 name: self.collections[name] 

1102 for name in collection_names 

1103 if name in self.collections 

1104 } 

1105 if not collections_to_search:  # 1105 ↛ 1106: condition was never true

1106 logger.warning( 

1107 f"No valid collections found among: {collection_names}" 

1108 ) 

1109 return [] 

1110 else: 

1111 # Search in all collections 

1112 collections_to_search = self.collections 

1113 

1114 # Extract all folder paths from the collections to search 

1115 search_paths = [] 

1116 for collection_config in collections_to_search.values(): 

1117 if "paths" in collection_config: 1117 ↛ 1116line 1117 didn't jump to line 1116 because the condition on line 1117 was always true

1118 search_paths.extend(collection_config["paths"]) 

1119 

1120 logger.info( 

1121 f"Searching local documents in collections: {list(collections_to_search.keys())}" 

1122 ) 

1123 

1124 # Filter out invalid paths 

1125 valid_search_paths = [ 

1126 path for path in search_paths if path in self.valid_folder_paths 

1127 ] 

1128 

1129 if not valid_search_paths: 

1130 logger.warning( 

1131 f"No valid folders to search in collections: {list(collections_to_search.keys())}" 

1132 ) 

1133 return [] 

1134 

1135 # Search across the valid selected folders 

1136 raw_results = self.embedding_manager.search( 

1137 query=query, 

1138 folder_paths=valid_search_paths, 

1139 limit=self.max_results, 

1140 score_threshold=0.1, # Skip very low relevance results 

1141 ) 

1142 

1143 if not raw_results: 

1144 logger.info(f"No local documents found for query: {query}") 

1145 return [] 

1146 

1147 # Convert to preview format 

1148 previews = [] 

1149 for i, result in enumerate(raw_results): 

1150 # Create a unique ID 

1151 result_id = f"local-{i}-{hashlib.md5(result['content'][:50].encode(), usedforsecurity=False).hexdigest()}" # DevSkim: ignore DS126858 

1152 

1153 # Extract filename and path 

1154 source_path = result["metadata"].get("source", "Unknown") 

1155 filename = result["metadata"].get( 

1156 "filename", Path(source_path).name 

1157 ) 

1158 

1159 # Create preview snippet (first ~200 chars of content) 

1160 snippet = ( 

1161 result["content"][:200] + "..." 

1162 if len(result["content"]) > 200 

1163 else result["content"] 

1164 ) 

1165 

1166 # Determine which collection this document belongs to 

1167 collection_name = "Unknown" 

1168 folder_path = result["folder"] 

1169 for name, collection in self.collections.items():  # 1169 ↛ 1177: loop didn't complete

1170 if any(  # 1170 ↛ 1169: condition was always true

1171 folder_path.is_relative_to(path)

1172 for path in collection.get("paths", [])

1173 ):

1174 collection_name = name

1175 break

1176 # Format the preview 

1177 preview = { 

1178 "id": result_id, 

1179 "title": filename, 

1180 "snippet": snippet, 

1181 "link": source_path, 

1182 "similarity": result["similarity"], 

1183 "folder": folder_path.as_posix(), 

1184 "collection": collection_name, 

1185 "collection_description": self.collections.get( 

1186 collection_name, {} 

1187 ).get("description", ""), 

1188 "_full_content": result[ 

1189 "content" 

1190 ], # Store full content for later 

1191 "_metadata": result["metadata"], # Store metadata for later 

1192 } 

1193 

1194 previews.append(preview) 

1195 

1196 logger.info(f"Found {len(previews)} local document matches") 

1197 return previews 

1198 

1199 def _get_full_content( 

1200 self, relevant_items: List[Dict[str, Any]] 

1201 ) -> List[Dict[str, Any]]: 

1202 """ 

1203 Get full content for the relevant documents. 

1204 For local search, the full content is already available. 

1205 

1206 Args: 

1207 relevant_items: List of relevant preview dictionaries 

1208 

1209 Returns: 

1210 List of result dictionaries with full content 

1211 """ 

1212 # Check if we should add full content 

1213 if ( 

1214 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

1215 and search_config.SEARCH_SNIPPETS_ONLY 

1216 ): 

1217 logger.info("Snippet-only mode, skipping full content addition") 

1218 return relevant_items 

1219 

1220 # For local search, we already have the full content 

1221 results = [] 

1222 for item in relevant_items: 

1223 # Create a copy with full content 

1224 result = item.copy() 

1225 

1226 # Add full content if we have it 

1227 if "_full_content" in item: 1227 ↛ 1236line 1227 didn't jump to line 1236 because the condition on line 1227 was always true

1228 result["content"] = item["_full_content"] 

1229 result["full_content"] = item["_full_content"] 

1230 

1231 # Remove temporary fields 

1232 if "_full_content" in result: 1232 ↛ 1236line 1232 didn't jump to line 1236 because the condition on line 1232 was always true

1233 del result["_full_content"] 

1234 

1235 # Add metadata if we have it 

1236 if "_metadata" in item: 1236 ↛ 1243line 1236 didn't jump to line 1243 because the condition on line 1236 was always true

1237 result["document_metadata"] = item["_metadata"] 

1238 

1239 # Remove temporary fields 

1240 if "_metadata" in result: 1240 ↛ 1243line 1240 didn't jump to line 1243 because the condition on line 1240 was always true

1241 del result["_metadata"] 

1242 

1243 results.append(result) 

1244 

1245 return results 

1246 

1247 def run( 

1248 self, 

1249 query: str, 

1250 research_context: Dict[str, Any] | None = None, 

1251 collection_names: Optional[List[str]] = None, 

1252 ) -> List[Dict[str, Any]]: 

1253 """ 

1254 Execute a search using the two-phase approach. 

1255 

1256 Args: 

1257 query: The search query 

1258 research_context: Context from previous research to use. 

1259 collection_names: Specific collections to search within (if None, search all) 

1260 

1261 Returns: 

1262 List of search result dictionaries with full content 

1263 """ 

1264 logger.info("---Execute a search using Local Documents---") 

1265 

1266 # Check if we have any special collection parameters in the query 

1267 collection_prefix = "collection:" 

1268 remaining_query = query 

1269 specified_collections = [] 

1270 

1271 # Parse query for collection specifications like "collection:research_papers query terms" 

1272 query_parts = query.split() 

1273 for part in query_parts: 

1274 if part.lower().startswith(collection_prefix): 

1275 collection_name = part[len(collection_prefix) :].strip() 

1276 if collection_name in self.collections:  # 1276 ↛ 1273: condition was always true

1277 specified_collections.append(collection_name) 

1278 # Remove this part from the query 

1279 remaining_query = remaining_query.replace( 

1280 part, "", 1 

1281 ).strip() 

1282 

1283 # If collections were specified in the query, they override the parameter 

1284 if specified_collections: 

1285 collection_names = specified_collections 

1286 query = remaining_query 

1287 

1288 # Phase 1: Get previews (with collection filtering) 

1289 previews = self._get_previews(query, collection_names) 

1290 

1291 if not previews: 

1292 return [] 

1293 

1294 # Phase 2: Filter for relevance 

1295 relevant_items = self._filter_for_relevance(previews, query) 

1296 

1297 if not relevant_items:  # 1297 ↛ 1298: condition was never true

1298 return [] 

1299 

1300 # Phase 3: Get full content for relevant items 

1301 if (  # 1301 ↛ 1305: condition was never true

1302 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

1303 and search_config.SEARCH_SNIPPETS_ONLY 

1304 ): 

1305 logger.info("Returning snippet-only results as per config") 

1306 results = relevant_items 

1307 else: 

1308 results = self._get_full_content(relevant_items) 

1309 

1310 # Clean up temporary data 

1311 self.embedding_manager.clear_cache() 

1312 

1313 return results 

1314 
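# Sketch (illustrative): the "collection:" query syntax handled at the top of run()
# above, reduced to plain string handling. The collection name and query text are
# hypothetical; run() additionally checks the name against self.collections.
query = "collection:research_papers transformer scaling laws"
collection_prefix = "collection:"

specified, remaining = [], query
for part in query.split():
    if part.lower().startswith(collection_prefix):
        specified.append(part[len(collection_prefix):])
        remaining = remaining.replace(part, "", 1).strip()

# specified == ["research_papers"]; remaining == "transformer scaling laws"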

1315 def get_collections_info(self) -> List[Dict[str, Any]]: 

1316 """ 

1317 Get information about all collections, including indexing status. 

1318 

1319 Returns: 

1320 List of collection information dictionaries 

1321 """ 

1322 collections_info = [] 

1323 

1324 for name, collection in self.collections.items(): 

1325 paths = collection.get("paths", []) 

1326 paths = [Path(p) for p in paths] 

1327 description = collection.get("description", "") 

1328 

1329 # Get indexing information for each path 

1330 paths_info = [] 

1331 for path in paths: 

1332 # Check if folder exists 

1333 exists = path.exists() and path.is_dir() 

1334 

1335 # Check if folder is indexed 

1336 folder_hash = self.embedding_manager.get_folder_hash(path) 

1337 indexed = folder_hash in self.embedding_manager.indexed_folders 

1338 

1339 # Get index details if available 

1340 index_info = {} 

1341 if indexed:  # 1341 ↛ 1342: condition was never true

1342 index_info = self.embedding_manager.indexed_folders[ 

1343 folder_hash 

1344 ].copy() 

1345 

1346 paths_info.append( 

1347 { 

1348 "path": path, 

1349 "exists": exists, 

1350 "indexed": indexed, 

1351 "index_info": index_info, 

1352 } 

1353 ) 

1354 

1355 collections_info.append( 

1356 { 

1357 "name": name, 

1358 "description": description, 

1359 "paths": paths, 

1360 "paths_info": paths_info, 

1361 "document_count": sum( 

1362 info.get("index_info", {}).get("file_count", 0) 

1363 for info in paths_info 

1364 ), 

1365 "chunk_count": sum( 

1366 info.get("index_info", {}).get("chunk_count", 0) 

1367 for info in paths_info 

1368 ), 

1369 "all_indexed": all( 

1370 info["indexed"] for info in paths_info if info["exists"] 

1371 ), 

1372 } 

1373 ) 

1374 

1375 return collections_info 

1376 

1377 def reindex_collection(self, collection_name: str) -> bool: 

1378 """ 

1379 Reindex a specific collection. 

1380 

1381 Args: 

1382 collection_name: Name of the collection to reindex 

1383 

1384 Returns: 

1385 True if reindexing was successful, False otherwise 

1386 """ 

1387 if collection_name not in self.collections: 

1388 logger.error(f"Collection '{collection_name}' not found") 

1389 return False 

1390 

1391 paths = self.collections[collection_name].get("paths", []) 

1392 success = True 

1393 

1394 for path in paths: 

1395 if not self.embedding_manager.index_folder(  # 1395 ↛ 1398: condition was never true

1396 path, force_reindex=True 

1397 ): 

1398 success = False 

1399 

1400 return success 

1401 

1402 @classmethod 

1403 def from_config( 

1404 cls, config_dict: Dict[str, Any], llm: Optional[BaseLLM] = None 

1405 ) -> "LocalSearchEngine": 

1406 """ 

1407 Create a LocalSearchEngine instance from a configuration dictionary. 

1408 

1409 Args: 

1410 config_dict: Configuration dictionary 

1411 llm: Language model for relevance filtering 

1412 

1413 Returns: 

1414 Initialized LocalSearchEngine instance 

1415 """ 

1416 # Required parameters 

1417 folder_paths = [] 

1418 collections = config_dict.get("collections", {}) 

1419 

1420 # Extract all folder paths from collections 

1421 for collection_config in collections.values(): 

1422 if "paths" in collection_config: 1422 ↛ 1421line 1422 didn't jump to line 1421 because the condition on line 1422 was always true

1423 folder_paths.extend(collection_config["paths"]) 

1424 

1425 # Fall back to folder_paths if no collections defined 

1426 if not folder_paths: 

1427 folder_paths = config_dict.get("folder_paths", []) 

1428 # Create a default collection if using folder_paths 

1429 if folder_paths:  # 1429 ↛ 1438: condition was always true

1430 collections = { 

1431 "default": { 

1432 "paths": folder_paths, 

1433 "description": "Default collection", 

1434 } 

1435 } 

1436 

1437 # Optional parameters with defaults 

1438 max_results = config_dict.get("max_results", 10) 

1439 max_filtered_results = config_dict.get("max_filtered_results") 

1440 embedding_model = config_dict.get("embedding_model", "all-MiniLM-L6-v2") 

1441 embedding_device = config_dict.get("embedding_device", "cpu") 

1442 embedding_model_type = config_dict.get( 

1443 "embedding_model_type", "sentence_transformers" 

1444 ) 

1445 ollama_base_url = config_dict.get("ollama_base_url") 

1446 force_reindex = config_dict.get("force_reindex", False) 

1447 chunk_size = config_dict.get("chunk_size", 1000) 

1448 chunk_overlap = config_dict.get("chunk_overlap", 200) 

1449 cache_dir = config_dict.get( 

1450 "cache_dir" 

1451 ) # None uses app's cache directory 

1452 

1453 return cls( 

1454 paths=folder_paths, 

1455 collections=collections, 

1456 llm=llm, 

1457 max_results=max_results, 

1458 max_filtered_results=max_filtered_results, 

1459 embedding_model=embedding_model, 

1460 embedding_device=embedding_device, 

1461 embedding_model_type=embedding_model_type, 

1462 ollama_base_url=ollama_base_url, 

1463 force_reindex=force_reindex, 

1464 chunk_size=chunk_size, 

1465 chunk_overlap=chunk_overlap, 

1466 cache_dir=cache_dir, 

1467 )
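# Sketch (illustrative): building the engine from a configuration dictionary via
# from_config() above. The collection name, description, and path are hypothetical;
# unspecified keys fall back to the defaults listed in the method.
from local_deep_research.web_search_engines.engines.search_engine_local import (
    LocalSearchEngine,
)

config = {
    "collections": {
        "personal_notes": {
            "paths": ["/home/user/notes"],
            "description": "Personal markdown notes",
        }
    },
    "max_results": 10,
    "embedding_model": "all-MiniLM-L6-v2",
}

engine = LocalSearchEngine.from_config(config)  # indexes valid folders on creation
results = engine.run("collection:personal_notes meeting summary")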