Coverage for src / local_deep_research / web_search_engines / engines / search_engine_local.py: 60%
563 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
1import hashlib
2import json
3import os
4import threading
5import time
6import uuid
7from concurrent.futures import ProcessPoolExecutor
8from datetime import UTC, datetime
9from pathlib import Path
10from typing import Any, Dict, Iterable, List, Optional
12import numpy as np
13from faiss import IndexFlatL2
14from langchain_community.docstore.in_memory import InMemoryDocstore
15from langchain_community.document_loaders import TextLoader
16from langchain_community.embeddings import (
17 HuggingFaceEmbeddings,
18)
19from langchain_community.vectorstores import FAISS
20from langchain_core.document_loaders import BaseLoader
21from langchain_core.documents import Document
22from langchain_core.language_models import BaseLLM
23from langchain_text_splitters import RecursiveCharacterTextSplitter
24from loguru import logger
26from ...config import search_config
27from ...config.paths import get_cache_directory
28from ...database.models.library import DocumentChunk
29from ...database.session_context import get_user_db_session
30from ...document_loaders import get_loader_for_path, is_extension_supported
31from ...utilities.url_utils import normalize_url
32from ..search_engine_base import BaseSearchEngine
def _get_file_loader(file_path: str) -> Optional[BaseLoader]:
    """Pick a document loader for *file_path* based on its extension.

    Consults the centralized document_loaders registry (35+ formats) first,
    then falls back to a UTF-8 TextLoader for anything unrecognized.
    """
    suffix = Path(file_path).suffix.lower()

    # Registry lookup first; a falsy loader means we still need the fallback.
    if is_extension_supported(suffix):
        registry_loader = get_loader_for_path(file_path)
        if registry_loader:
            return registry_loader

    # Fall back to plain-text loading for unknown extensions.
    logger.warning(f"Unknown file extension for {file_path}, trying TextLoader")
    try:
        return TextLoader(
            str(file_path), encoding="utf-8", autodetect_encoding=True
        )
    except Exception:
        logger.exception(f"Error creating loader for {file_path}")
        return None
def _load_document(file_path: Path) -> List[Document]:
    """
    Load all documents contained in a single file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The documents parsed from the file, or an empty list when no loader
        exists for this file type or loading raises.
    """
    loader = _get_file_loader(str(file_path))
    if loader is None:
        # Unsupported file type: nothing to load.
        return []

    try:
        documents = loader.load()
        # Tag every document with where it came from.
        for document in documents:
            document.metadata["source"] = str(file_path)
            document.metadata["filename"] = file_path.name
    except Exception:
        logger.exception(f"Error loading {file_path}")
        return []

    return documents
94class LocalEmbeddingManager:
95 """Handles embedding generation and storage for local document search"""
    def __init__(
        self,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_device: str = "cpu",
        embedding_model_type: str = "sentence_transformers",  # or 'ollama'
        ollama_base_url: Optional[str] = None,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        cache_dir: Optional[str] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the embedding manager for local document search.

        The embedding model itself is NOT loaded here; it is created lazily
        on first access of the ``embeddings`` property.

        Args:
            embedding_model: Name of the embedding model to use
            embedding_device: Device to run embeddings on ('cpu' or 'cuda')
            embedding_model_type: Type of embedding model ('sentence_transformers' or 'ollama')
            ollama_base_url: Base URL for Ollama API if using ollama embeddings
            chunk_size: Size of text chunks for splitting documents
            chunk_overlap: Overlap between chunks
            cache_dir: Directory to store embedding cache and index.
                If None, uses the app's configured cache directory.
            settings_snapshot: Optional settings snapshot for background threads
        """
        self.embedding_model = embedding_model
        self.embedding_device = embedding_device
        self.embedding_model_type = embedding_model_type
        self.ollama_base_url = ollama_base_url
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Use configured cache directory if not specified
        if cache_dir is None:
            self.cache_dir = get_cache_directory() / "local_search"
        else:
            self.cache_dir = Path(cache_dir)
        self.settings_snapshot = settings_snapshot or {}

        # Username for database access (extracted from settings if available)
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )
        # Password for encrypted database access (can be set later)
        self.db_password = None

        # Create cache directory if it doesn't exist
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Embeddings are created lazily by the `embeddings` property; the
        # lock guards the first-time initialization across threads.
        self._embeddings = None
        self._embedding_lock = threading.Lock()

        # Initialize the text splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )

        # Track indexed folders and their metadata (loaded from
        # index_metadata.json in the cache directory).
        self.indexed_folders = self._load_indexed_folders()

        # In-memory cache of per-folder FAISS vector stores, keyed by
        # folder hash.
        self.vector_stores = {}

        # Track if this manager has been closed
        self._closed = False
164 def close(self):
165 """Release embedding model resources."""
166 if self._closed:
167 return
168 self._closed = True
169 # Clear embedding model reference to allow garbage collection
170 self._embeddings = None
171 # Clear vector store cache
172 self.vector_stores.clear()
173 logger.debug("LocalEmbeddingManager closed")
175 def __enter__(self):
176 """Context manager entry."""
177 return self
179 def __exit__(self, exc_type, exc_val, exc_tb):
180 """Context manager exit - ensures resources are released."""
181 self.close()
182 return False
    @property
    def embeddings(self):
        """
        Lazily initialize embeddings when first accessed.

        This allows the LocalEmbeddingManager to be created without
        immediately loading models, which is helpful when no local search
        is performed.

        Uses double-checked locking: the unlocked check avoids taking the
        lock on the hot path, and the second check inside the lock ensures
        only one thread builds the model. Concurrent SentenceTransformer
        model loading causes meta tensor errors in PyTorch when multiple
        threads call model.to(device) simultaneously.
        """
        if self._embeddings is None:
            with self._embedding_lock:
                # Re-check under the lock: another thread may have finished
                # initialization while we were waiting.
                if self._embeddings is None:
                    logger.info("Initializing embeddings on first use")
                    self._embeddings = self._initialize_embeddings()
        return self._embeddings
202 def _initialize_embeddings(self):
203 """Initialize the embedding model based on configuration"""
204 try:
205 # Use the new unified embedding system
206 from ...embeddings import get_embeddings
208 # Prepare kwargs for provider-specific parameters
209 kwargs = {}
211 # Add device for sentence transformers
212 if self.embedding_model_type == "sentence_transformers": 212 ↛ 216line 212 didn't jump to line 216 because the condition on line 212 was always true
213 kwargs["device"] = self.embedding_device
215 # Add base_url for ollama if specified
216 if self.embedding_model_type == "ollama" and self.ollama_base_url: 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 kwargs["base_url"] = normalize_url(self.ollama_base_url)
219 logger.info(
220 f"Initializing embeddings with provider={self.embedding_model_type}, model={self.embedding_model}"
221 )
223 return get_embeddings(
224 provider=self.embedding_model_type,
225 model=self.embedding_model,
226 settings_snapshot=self.settings_snapshot,
227 **kwargs,
228 )
229 except Exception:
230 logger.exception("Error initializing embeddings")
231 logger.warning(
232 "Falling back to HuggingFaceEmbeddings with all-MiniLM-L6-v2"
233 )
234 return HuggingFaceEmbeddings(
235 model_name="sentence-transformers/all-MiniLM-L6-v2"
236 )
    def _store_chunks_to_db(
        self,
        chunks: List[Document],
        collection_name: str,
        source_path: Optional[str] = None,
        source_id: Optional[int] = None,
        source_type: str = "local_file",
    ) -> List[str]:
        """
        Store document chunks in the database.

        Chunks are deduplicated by a SHA-256 hash of their text: when a row
        with the same hash exists, its ``last_accessed`` timestamp is
        refreshed and its existing embedding ID is reused instead of
        inserting a duplicate.

        Args:
            chunks: List of LangChain Document chunks
            collection_name: Name of the collection (e.g., 'personal_notes', 'library')
            source_path: Path to source file (for local files)
            source_id: ID of source document (for library documents)
            source_type: Type of source ('local_file' or 'library')

        Returns:
            List of chunk embedding IDs (UUIDs) for FAISS mapping; empty
            when no username is configured or any database error occurs.
        """
        if not self.username:
            logger.warning(
                "No username available, cannot store chunks in database"
            )
            return []

        chunk_ids = []

        try:
            with get_user_db_session(
                self.username, self.db_password
            ) as session:
                for idx, chunk in enumerate(chunks):
                    # Generate unique hash for chunk (the dedup key).
                    chunk_text = chunk.page_content
                    chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()

                    # Generate unique embedding ID
                    embedding_id = uuid.uuid4().hex

                    # Extract metadata
                    metadata = chunk.metadata or {}
                    document_title = metadata.get(
                        "filename", metadata.get("title", "Unknown")
                    )

                    # Calculate word count
                    word_count = len(chunk_text.split())

                    # Get character positions from metadata if available
                    start_char = metadata.get("start_char", 0)
                    end_char = metadata.get("end_char", len(chunk_text))

                    # Check if chunk already exists
                    existing_chunk = (
                        session.query(DocumentChunk)
                        .filter_by(chunk_hash=chunk_hash)
                        .first()
                    )

                    if existing_chunk:
                        # Update existing chunk and reuse its embedding ID.
                        existing_chunk.last_accessed = datetime.now(UTC)
                        chunk_ids.append(existing_chunk.embedding_id)
                        logger.debug(
                            f"Chunk already exists, reusing: {existing_chunk.embedding_id}"
                        )
                    else:
                        # Create new chunk
                        db_chunk = DocumentChunk(
                            chunk_hash=chunk_hash,
                            source_type=source_type,
                            source_id=source_id,
                            source_path=str(source_path)
                            if source_path
                            else None,
                            collection_name=collection_name,
                            chunk_text=chunk_text,
                            chunk_index=idx,
                            start_char=start_char,
                            end_char=end_char,
                            word_count=word_count,
                            embedding_id=embedding_id,
                            embedding_model=self.embedding_model,
                            embedding_model_type=self.embedding_model_type,
                            document_title=document_title,
                            document_metadata=metadata,
                        )
                        session.add(db_chunk)
                        chunk_ids.append(embedding_id)

                # Single commit for the whole batch.
                session.commit()
                logger.info(
                    f"Stored {len(chunk_ids)} chunks to database for collection '{collection_name}'"
                )

        except Exception:
            logger.exception(
                f"Error storing chunks to database for collection '{collection_name}'"
            )
            return []

        return chunk_ids
    def _load_chunks_from_db(
        self, chunk_ids: List[str], username: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Load chunks from database by their embedding IDs.

        Args:
            chunk_ids: List of embedding IDs to load
            username: Username for database access (uses self.username if not provided)

        Returns:
            List of chunk dictionaries with keys "id", "content" and
            "metadata".

        NOTE(review): the ``IN (...)`` query does not guarantee that rows
        come back in the same order as ``chunk_ids`` — callers must match
        results by "id" rather than by position.
        """
        username = username or self.username
        if not username:
            logger.warning(
                "No username available, cannot load chunks from database"
            )
            return []

        chunks = []

        try:
            with get_user_db_session(username) as session:
                db_chunks = (
                    session.query(DocumentChunk)
                    .filter(DocumentChunk.embedding_id.in_(chunk_ids))
                    .all()
                )

                for db_chunk in db_chunks:
                    # Update last accessed time (persisted by the commit
                    # below).
                    db_chunk.last_accessed = datetime.now(UTC)

                    chunks.append(
                        {
                            "id": db_chunk.embedding_id,
                            "content": db_chunk.chunk_text,
                            "metadata": {
                                "source_type": db_chunk.source_type,
                                "source_path": db_chunk.source_path,
                                "source_id": db_chunk.source_id,
                                "collection": db_chunk.collection_name,
                                "chunk_index": db_chunk.chunk_index,
                                "word_count": db_chunk.word_count,
                                "title": db_chunk.document_title,
                                **db_chunk.document_metadata,
                            },
                        }
                    )

                session.commit()  # Commit the last_accessed updates

        except Exception:
            logger.exception("Error loading chunks from database")
            return []

        return chunks
402 def _delete_chunks_from_db(
403 self,
404 collection_name: str,
405 source_path: Optional[str] = None,
406 source_id: Optional[int] = None,
407 ) -> int:
408 """
409 Delete chunks from database.
411 Args:
412 collection_name: Name of the collection
413 source_path: Path to source file (for local files)
414 source_id: ID of source document (for library documents)
416 Returns:
417 Number of chunks deleted
418 """
419 if not self.username:
420 logger.warning(
421 "No username available, cannot delete chunks from database"
422 )
423 return 0
425 try:
426 with get_user_db_session(
427 self.username, self.db_password
428 ) as session:
429 query = session.query(DocumentChunk).filter_by(
430 collection_name=collection_name
431 )
433 if source_path:
434 query = query.filter_by(source_path=str(source_path))
435 if source_id:
436 query = query.filter_by(source_id=source_id)
438 count = query.delete()
439 session.commit()
441 logger.info(
442 f"Deleted {count} chunks from database for collection '{collection_name}'"
443 )
444 return count
446 except Exception:
447 logger.exception(
448 f"Error deleting chunks from database for collection '{collection_name}'"
449 )
450 return 0
    def _load_or_create_vector_store(self):
        """Load the vector store from disk or create it if needed.

        NOTE(review): this method calls ``_get_vector_store_path``,
        ``_check_folders_modified`` and ``_create_vector_store``, none of
        which are defined on this class as shown here — confirm these
        helpers exist elsewhere or whether this method is dead code.
        """
        vector_store_path = self._get_vector_store_path()

        # Reuse the on-disk index only when the indexed folders have not
        # changed since it was written.
        if vector_store_path.exists() and not self._check_folders_modified():
            logger.info(
                f"Loading existing vector store from {vector_store_path}"
            )
            try:
                vector_store = FAISS.load_local(
                    str(vector_store_path),
                    self.embeddings,
                    allow_dangerous_deserialization=True,
                )

                # Log the chunk count for diagnostics.
                doc_count = len(vector_store.index_to_docstore_id)
                logger.info(f"Loaded index with {doc_count} document chunks")

                return vector_store
            except Exception:
                logger.exception("Error loading vector store")
                logger.info("Will create a new vector store")

        # Create a new vector store
        return self._create_vector_store()
480 def _load_indexed_folders(self) -> Dict[str, Dict[str, Any]]:
481 """Load metadata about indexed folders from disk"""
482 index_metadata_path = self.cache_dir / "index_metadata.json"
484 if index_metadata_path.exists():
485 try:
486 with open(index_metadata_path, "r") as f:
487 return json.load(f)
488 except Exception:
489 logger.exception("Error loading index metadata")
491 return {}
493 def _save_indexed_folders(self):
494 """Save metadata about indexed folders to disk"""
495 index_metadata_path = self.cache_dir / "index_metadata.json"
497 try:
498 with open(index_metadata_path, "w") as f:
499 json.dump(self.indexed_folders, f, indent=2)
500 except Exception:
501 logger.exception("Error saving index metadata")
503 @staticmethod
504 def get_folder_hash(folder_path: Path) -> str:
505 """Generate a hash for a folder based on its path"""
506 # Canonicalize the path so we don't have weird Windows vs. Linux
507 # problems or issues with trailing slashes.
508 canonical_folder_path = "/".join(folder_path.parts)
509 return hashlib.md5( # DevSkim: ignore DS126858
510 canonical_folder_path.encode(), usedforsecurity=False
511 ).hexdigest()
513 def _get_index_path(self, folder_path: Path) -> Path:
514 """Get the path where the index for a specific folder should be stored"""
515 folder_hash = self.get_folder_hash(folder_path)
516 return self.cache_dir / f"index_{folder_hash}"
518 def _check_folder_modified(self, folder_path: Path) -> bool:
519 """Check if a folder has been modified since it was last indexed"""
521 @staticmethod
522 def _get_all_files(folder_path: Path) -> Iterable[Path]:
523 """
524 Gets all the files, recursively, in a folder.
526 Args:
527 folder_path: The path to the folder.
529 Yields:
530 Each of the files in the folder.
532 """
533 for root, _, files in os.walk(folder_path):
534 for file in files:
535 yield Path(root) / file
537 def _get_modified_files(self, folder_path: Path) -> List[Path]:
538 """
539 Gets the files in a folder that have been modified since it was last
540 indexed.
542 Args:
543 folder_path: The path to the folder to check.
545 Returns:
546 A list of the files that were modified.
548 """
549 if not folder_path.exists() or not folder_path.is_dir():
550 return []
552 folder_hash = self.get_folder_hash(folder_path)
554 if folder_hash not in self.indexed_folders:
555 # If folder has never been indexed, everything has been modified.
556 last_indexed = 0
557 indexed_files = set()
558 else:
559 last_indexed = self.indexed_folders[folder_hash].get(
560 "last_indexed", 0
561 )
562 indexed_files = (
563 self.indexed_folders[folder_hash]
564 .get("indexed_files", {})
565 .keys()
566 )
568 # Check if any file in the folder has been modified since last indexing
569 modified_files = []
570 for file_path in self._get_all_files(folder_path):
571 file_stats = file_path.stat()
572 if file_stats.st_mtime > last_indexed:
573 modified_files.append(file_path)
574 elif str(file_path.relative_to(folder_path)) not in indexed_files:
575 # This file somehow never got indexed.
576 modified_files.append(file_path)
578 return modified_files
580 def _check_config_changed(self, folder_path: Path) -> bool:
581 """
582 Checks if the embedding configuration for a folder has been changed
583 since it was last indexed.
584 """
585 folder_hash = self.get_folder_hash(folder_path)
587 if folder_hash not in self.indexed_folders:
588 # It hasn't been indexed at all. That's a new configuration,
589 # technically.
590 return True
592 embedding_config = self.indexed_folders[folder_hash]
593 chunk_size = int(embedding_config.get("chunk_size", 0))
594 chunk_overlap = int(embedding_config.get("chunk_overlap", 0))
595 embedding_model = embedding_config.get("embedding_model", "")
597 if (chunk_size, chunk_overlap, embedding_model) != (
598 self.chunk_size,
599 self.chunk_overlap,
600 self.embedding_model,
601 ):
602 logger.info(
603 "Embedding configuration has changed, re-indexing folder."
604 )
605 return True
606 return False
    def index_folder(
        self, folder_path: str, force_reindex: bool = False
    ) -> bool:
        """
        Index all documents in a folder for vector search.

        Chunks are stored in the per-user database; their embeddings go
        into a per-folder FAISS index that is persisted under the cache
        directory and tracked in ``self.indexed_folders``.

        Args:
            folder_path: Path to the folder to index
            force_reindex: Whether to force reindexing even if unchanged

        Returns:
            bool: True if indexing was successful, False otherwise
        """
        folder_path = Path(folder_path)

        # Validate folder
        if not folder_path.exists():
            logger.error(f"Folder not found: {folder_path}")
            return False

        if not folder_path.is_dir():
            logger.error(f"Path is not a directory: {folder_path}")
            return False

        folder_str = str(folder_path)
        folder_hash = self.get_folder_hash(folder_path)
        index_path = self._get_index_path(folder_path)

        if force_reindex or self._check_config_changed(folder_path):
            # Full pass: every file in the folder is (re-)processed.
            logger.info(f"Re-indexing entire folder: {folder_path}")
            modified_files = list(self._get_all_files(folder_path))
        else:
            # Just re-index the modified files if we can get away with it.
            # NOTE(review): on this incremental path the old chunks of a
            # modified file are not removed before its new chunks are
            # added, which looks like it could leave duplicates — confirm.
            modified_files = self._get_modified_files(folder_path)
            logger.info(f"Re-indexing {len(modified_files)} modified files...")

        # Load the vector store from disk if not already loaded
        if folder_hash not in self.vector_stores and index_path.exists():
            try:
                self.vector_stores[folder_hash] = FAISS.load_local(
                    str(index_path),
                    self.embeddings,
                    allow_dangerous_deserialization=True,
                )
                logger.info(f"Loaded index for {folder_path} from disk")
            except Exception:
                logger.exception(f"Error loading index for {folder_path}")
                # If loading fails, force reindexing.
                # NOTE(review): setting force_reindex here happens after the
                # force/incremental branch above already ran, so it only
                # affects the "create new index" decision below, not which
                # files get re-processed — confirm that is intended.
                force_reindex = True

        logger.info(f"Indexing folder: {folder_path}")
        start_time = time.time()

        # Find documents to index
        all_docs = []

        # Remove hidden files and directories.
        modified_files = [
            p
            for p in modified_files
            if not p.name.startswith(".")
            and not any(part.startswith(".") for part in p.parts)
        ]
        # Index them: document loading is CPU/IO heavy, so fan out across
        # worker processes.
        with ProcessPoolExecutor() as executor:
            all_docs_nested = executor.map(_load_document, modified_files)
            # Flatten the result.
            for docs in all_docs_nested:
                all_docs.extend(docs)

        if force_reindex or folder_hash not in self.vector_stores:
            logger.info(f"Creating new index for {folder_path}")
            # Embed a test query to figure out embedding length.
            test_embedding = self.embeddings.embed_query("hello world")
            index = IndexFlatL2(len(test_embedding))
            # Use minimal docstore - chunks are stored in database
            self.vector_stores[folder_hash] = FAISS(
                self.embeddings,
                index=index,
                docstore=InMemoryDocstore(),  # Minimal - just for FAISS compatibility
                index_to_docstore_id={},
                normalize_L2=True,
            )

        # Split documents into chunks
        logger.info(f"Splitting {len(all_docs)} documents into chunks")
        splits = self.text_splitter.split_documents(all_docs)
        logger.info(
            f"Created {len(splits)} chunks from {len(modified_files)} files"
        )

        # Store chunks in database and get embedding IDs
        embedding_ids = []
        if splits:
            logger.info(f"Storing {len(splits)} chunks in database")
            # Get collection name from folder path (last folder name)
            collection_name = folder_path.name

            # Store chunks to database
            embedding_ids = self._store_chunks_to_db(
                chunks=splits,
                collection_name=collection_name,
                source_type="local_file",
            )

            logger.info(f"Adding {len(splits)} chunks to FAISS index")
            # Add embeddings to FAISS using the database-generated IDs
            self.vector_stores[folder_hash].add_documents(
                splits, ids=embedding_ids
            )

        # Update indexing time for individual files.
        index_time = time.time()
        indexed_files = {}
        if folder_hash in self.indexed_folders:
            indexed_files = (
                self.indexed_folders[folder_hash]
                .get("indexed_files", {})
                .copy()
            )
        # Record which embedding IDs belong to which (relative) file path.
        for embedding_id, split in zip(embedding_ids, splits, strict=False):
            split_source = str(
                Path(split.metadata["source"]).relative_to(folder_path)
            )
            id_list = indexed_files.setdefault(split_source, [])
            id_list.append(embedding_id)

        # Check for any files that were removed and remove them from the
        # vector store and database.
        delete_ids = []
        delete_paths = []
        for relative_path, chunk_ids in indexed_files.items():
            if not (folder_path / Path(relative_path)).exists():
                delete_ids.extend(chunk_ids)
                delete_paths.append(relative_path)
        if delete_ids:
            logger.info(
                f"Deleting {len(delete_paths)} non-existent files from the "
                f"index and database."
            )
            # Delete from FAISS index
            self.vector_stores[folder_hash].delete(delete_ids)

            # Delete from database
            collection_name = folder_path.name
            for delete_path in delete_paths:
                full_path = str(folder_path / delete_path)
                deleted_count = self._delete_chunks_from_db(
                    collection_name=collection_name,
                    source_path=full_path,
                )
                logger.debug(
                    f"Deleted {deleted_count} chunks for {delete_path} from database"
                )
        # Drop bookkeeping entries for the files that were deleted above.
        for path in delete_paths:
            del indexed_files[path]

        # Save the vector store to disk
        logger.info(f"Saving index to {index_path}")
        self.vector_stores[folder_hash].save_local(str(index_path))

        # Update metadata
        self.indexed_folders[folder_hash] = {
            "path": folder_str,
            "last_indexed": index_time,
            "file_count": len(modified_files),
            "chunk_count": len(splits),
            "embedding_model": self.embedding_model,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "indexed_files": indexed_files,
        }

        # Save updated metadata
        self._save_indexed_folders()

        elapsed_time = time.time() - start_time
        logger.info(
            f"Indexed {len(modified_files)} files in {elapsed_time:.2f} seconds"
        )

        return True
791 def search(
792 self,
793 query: str,
794 folder_paths: List[str],
795 limit: int = 10,
796 score_threshold: float = 0.0,
797 ) -> List[Dict[str, Any]]:
798 """
799 Search for documents relevant to a query across specified folders.
801 Args:
802 query: The search query
803 folder_paths: List of folder paths to search in
804 limit: Maximum number of results to return
805 score_threshold: Minimum similarity score threshold
807 Returns:
808 List of results with document content and metadata
809 """
810 folder_paths = [Path(p) for p in folder_paths]
812 # Add detailed debugging for each folder
813 for folder_path in folder_paths:
814 folder_hash = self.get_folder_hash(folder_path)
815 index_path = self._get_index_path(folder_path)
817 logger.info(f"Diagnostic for {folder_path}:")
818 logger.info(f" - Folder hash: {folder_hash}")
819 logger.info(f" - Index path: {index_path}")
820 logger.info(f" - Index exists on disk: {index_path.exists()}")
821 logger.info(
822 f" - Is in indexed_folders: {folder_hash in self.indexed_folders}"
823 )
825 if folder_hash in self.indexed_folders:
826 meta = self.indexed_folders[folder_hash]
827 logger.info(
828 f" - Metadata: file_count={meta.get('file_count', 0)}, chunk_count={meta.get('chunk_count', 0)}"
829 )
831 # Validate folders exist
832 valid_folder_paths = []
833 for path in folder_paths:
834 if path.exists() and path.is_dir():
835 valid_folder_paths.append(path)
836 else:
837 logger.warning(
838 f"Skipping non-existent folder in search: {path}"
839 )
841 # If no valid folders, return empty results
842 if not valid_folder_paths:
843 logger.warning(f"No valid folders to search among: {folder_paths}")
844 return []
846 all_results = []
848 for folder_path in valid_folder_paths:
849 folder_hash = self.get_folder_hash(folder_path)
851 # Skip folders that haven't been indexed
852 if folder_hash not in self.indexed_folders:
853 logger.warning(f"Folder {folder_path} has not been indexed")
854 continue
856 # Make sure the vector store is loaded
857 if folder_hash not in self.vector_stores:
858 index_path = self._get_index_path(folder_path)
859 try:
860 self.vector_stores[folder_hash] = FAISS.load_local(
861 str(index_path),
862 self.embeddings,
863 allow_dangerous_deserialization=True,
864 )
865 except Exception:
866 logger.exception(f"Error loading index for {folder_path}")
867 continue
869 # Search in this folder
870 vector_store = self.vector_stores[folder_hash]
872 try:
873 # Get query embedding
874 query_vector = self.embeddings.embed_query(query)
876 # Search FAISS index for similar vectors
877 # Returns: (distances, indices) where indices are FAISS internal indices
878 distances, indices = vector_store.index.search(
879 np.array([query_vector], dtype=np.float32), limit
880 )
882 # Convert distances to similarity scores (L2 distance -> similarity)
883 # For L2: smaller distance = more similar
884 # Convert to similarity: 1 / (1 + distance)
885 similarities = 1 / (1 + distances[0])
887 # Get embedding IDs from FAISS mapping
888 embedding_ids = []
889 valid_indices = []
890 for idx, faiss_idx in enumerate(indices[0]):
891 if faiss_idx == -1: # FAISS returns -1 for empty results
892 continue
893 if faiss_idx in vector_store.index_to_docstore_id:
894 embedding_id = vector_store.index_to_docstore_id[
895 faiss_idx
896 ]
897 embedding_ids.append(embedding_id)
898 valid_indices.append(idx)
900 # Load chunks from database
901 if embedding_ids:
902 db_chunks = self._load_chunks_from_db(
903 embedding_ids, self.username
904 )
906 # Create results from database chunks
907 for idx, chunk in zip(valid_indices, db_chunks):
908 similarity = float(similarities[idx])
910 # Skip results below the threshold
911 if similarity < score_threshold:
912 continue
914 # Extract metadata from chunk
915 metadata = chunk.get("document_metadata", {})
916 if "source" not in metadata and chunk.get(
917 "source_path"
918 ):
919 metadata["source"] = chunk["source_path"]
921 result = {
922 "content": chunk["chunk_text"],
923 "metadata": metadata,
924 "similarity": similarity,
925 "folder": folder_path,
926 }
928 all_results.append(result)
929 except Exception:
930 logger.exception(f"Error searching in {folder_path}")
932 # Sort by similarity (highest first)
933 all_results.sort(key=lambda x: x["similarity"], reverse=True)
935 # Limit to the requested number
936 return all_results[:limit]
938 def clear_cache(self):
939 """Clear all cached vector stores from memory (not disk)"""
940 self.vector_stores.clear()
942 def get_indexed_folders_info(self) -> List[Dict[str, Any]]:
943 """Get information about all indexed folders"""
944 info = []
946 for folder_hash, metadata in self.indexed_folders.items():
947 folder_info = metadata.copy()
949 # Add formatted last indexed time
950 if "last_indexed" in folder_info: 950 ↛ 956line 950 didn't jump to line 956 because the condition on line 950 was always true
951 folder_info["last_indexed_formatted"] = datetime.fromtimestamp(
952 folder_info["last_indexed"]
953 ).strftime("%Y-%m-%d %H:%M:%S")
955 # Check if index file exists
956 index_path = self._get_index_path(Path(folder_info["path"]))
957 folder_info["index_exists"] = index_path.exists()
959 info.append(folder_info)
961 return info
964class LocalSearchEngine(BaseSearchEngine):
965 """Local document search engine with two-phase retrieval"""
    def __init__(
        self,
        paths: List[str],
        llm: Optional[BaseLLM] = None,
        max_results: int = 10,
        max_filtered_results: Optional[int] = None,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_device: str = "cpu",
        embedding_model_type: str = "sentence_transformers",
        ollama_base_url: Optional[str] = None,
        force_reindex: bool = False,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        cache_dir: Optional[str] = None,
        collections: Optional[Dict[str, Dict[str, Any]]] = None,
        name: str = "",
        description: str = "",
    ):
        """
        Initialize the local search engine.

        Args:
            paths: List of folder paths to search in
            llm: Language model for relevance filtering
            max_results: Maximum number of results to return
            max_filtered_results: Maximum results after filtering
            embedding_model: Name of the embedding model to use
            embedding_device: Device to run embeddings on ('cpu' or 'cuda')
            embedding_model_type: Type of embedding model
            ollama_base_url: Base URL for Ollama API
            force_reindex: Whether to force reindexing
            chunk_size: Size of text chunks for splitting documents
            chunk_overlap: Overlap between chunks
            cache_dir: Directory to store embedding cache and index
            collections: Dictionary of named collections with paths and descriptions
            name: Human-readable name of the collection we are searching.
            description: Human-readable description of the collection we are
                searching.
        """
        # Initialize the base search engine
        super().__init__(llm=llm, max_filtered_results=max_filtered_results)

        self.name = name
        self.description = description

        # Validate folder paths: only existing directories are searched.
        self.folder_paths = paths
        self.valid_folder_paths = []
        for path_str in paths:
            path = Path(path_str)
            if path.exists() and path.is_dir():
                self.valid_folder_paths.append(path_str)
            else:
                logger.warning(
                    f"Folder not found or is not a directory: {path_str}"
                )

        # If no valid folders, log a clear message
        if not self.valid_folder_paths and paths:
            logger.warning(f"No valid folders found among: {paths}")
            logger.warning(
                "This search engine will return no results until valid folders are configured"
            )

        self.max_results = max_results
        self.collections = collections or {
            "default": {"paths": paths, "description": "Default collection"}
        }

        # Initialize the embedding manager with only valid folders.
        # NOTE(review): `self.settings_snapshot` is not assigned anywhere in
        # this method — presumably BaseSearchEngine.__init__ sets it;
        # confirm against the base class.
        self.embedding_manager = LocalEmbeddingManager(
            embedding_model=embedding_model,
            embedding_device=embedding_device,
            embedding_model_type=embedding_model_type,
            ollama_base_url=ollama_base_url,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            cache_dir=cache_dir,
            settings_snapshot=self.settings_snapshot,
        )

        # Index all folders
        self._index_folders(force_reindex)
1051 def _index_folders(self, force_reindex: bool = False):
1052 """Index all valid configured folders"""
1053 indexed = []
1054 failed = []
1055 skipped = []
1057 # Keep track of invalid folders
1058 for folder in self.folder_paths:
1059 if folder not in self.valid_folder_paths:
1060 skipped.append(folder)
1061 continue
1063 success = self.embedding_manager.index_folder(folder, force_reindex)
1064 if success: 1064 ↛ 1067line 1064 didn't jump to line 1067 because the condition on line 1064 was always true
1065 indexed.append(folder)
1066 else:
1067 failed.append(folder)
1069 if indexed:
1070 logger.info(
1071 f"Successfully indexed {len(indexed)} folders: {', '.join(indexed)}"
1072 )
1074 if failed: 1074 ↛ 1075line 1074 didn't jump to line 1075 because the condition on line 1074 was never true
1075 logger.warning(
1076 f"Failed to index {len(failed)} folders: {', '.join(failed)}"
1077 )
1079 if skipped:
1080 logger.warning(
1081 f"Skipped {len(skipped)} invalid folders: {', '.join(skipped)}"
1082 )
1084 def _get_previews(
1085 self, query: str, collection_names: Optional[List[str]] = None
1086 ) -> List[Dict[str, Any]]:
1087 """
1088 Get preview information for documents matching the query.
1090 Args:
1091 query: The search query
1092 collection_names: Specific collections to search within (if None, search all)
1094 Returns:
1095 List of preview dictionaries
1096 """
1097 # Determine which collections to search
1098 if collection_names:
1099 # Search only in specified collections
1100 collections_to_search = {
1101 name: self.collections[name]
1102 for name in collection_names
1103 if name in self.collections
1104 }
1105 if not collections_to_search: 1105 ↛ 1106line 1105 didn't jump to line 1106 because the condition on line 1105 was never true
1106 logger.warning(
1107 f"No valid collections found among: {collection_names}"
1108 )
1109 return []
1110 else:
1111 # Search in all collections
1112 collections_to_search = self.collections
1114 # Extract all folder paths from the collections to search
1115 search_paths = []
1116 for collection_config in collections_to_search.values():
1117 if "paths" in collection_config: 1117 ↛ 1116line 1117 didn't jump to line 1116 because the condition on line 1117 was always true
1118 search_paths.extend(collection_config["paths"])
1120 logger.info(
1121 f"Searching local documents in collections: {list(collections_to_search.keys())}"
1122 )
1124 # Filter out invalid paths
1125 valid_search_paths = [
1126 path for path in search_paths if path in self.valid_folder_paths
1127 ]
1129 if not valid_search_paths:
1130 logger.warning(
1131 f"No valid folders to search in collections: {list(collections_to_search.keys())}"
1132 )
1133 return []
1135 # Search across the valid selected folders
1136 raw_results = self.embedding_manager.search(
1137 query=query,
1138 folder_paths=valid_search_paths,
1139 limit=self.max_results,
1140 score_threshold=0.1, # Skip very low relevance results
1141 )
1143 if not raw_results:
1144 logger.info(f"No local documents found for query: {query}")
1145 return []
1147 # Convert to preview format
1148 previews = []
1149 for i, result in enumerate(raw_results):
1150 # Create a unique ID
1151 result_id = f"local-{i}-{hashlib.md5(result['content'][:50].encode(), usedforsecurity=False).hexdigest()}" # DevSkim: ignore DS126858
1153 # Extract filename and path
1154 source_path = result["metadata"].get("source", "Unknown")
1155 filename = result["metadata"].get(
1156 "filename", Path(source_path).name
1157 )
1159 # Create preview snippet (first ~200 chars of content)
1160 snippet = (
1161 result["content"][:200] + "..."
1162 if len(result["content"]) > 200
1163 else result["content"]
1164 )
1166 # Determine which collection this document belongs to
1167 collection_name = "Unknown"
1168 folder_path = result["folder"]
1169 for name, collection in self.collections.items(): 1169 ↛ 1177line 1169 didn't jump to line 1177 because the loop on line 1169 didn't complete
1170 if any( 1170 ↛ 1169line 1170 didn't jump to line 1169 because the condition on line 1170 was always true
1171 folder_path.is_relative_to(path)
1172 for path in collection.get("paths", [])
1173 ):
1174 break
1176 # Format the preview
1177 preview = {
1178 "id": result_id,
1179 "title": filename,
1180 "snippet": snippet,
1181 "link": source_path,
1182 "similarity": result["similarity"],
1183 "folder": folder_path.as_posix(),
1184 "collection": collection_name,
1185 "collection_description": self.collections.get(
1186 collection_name, {}
1187 ).get("description", ""),
1188 "_full_content": result[
1189 "content"
1190 ], # Store full content for later
1191 "_metadata": result["metadata"], # Store metadata for later
1192 }
1194 previews.append(preview)
1196 logger.info(f"Found {len(previews)} local document matches")
1197 return previews
1199 def _get_full_content(
1200 self, relevant_items: List[Dict[str, Any]]
1201 ) -> List[Dict[str, Any]]:
1202 """
1203 Get full content for the relevant documents.
1204 For local search, the full content is already available.
1206 Args:
1207 relevant_items: List of relevant preview dictionaries
1209 Returns:
1210 List of result dictionaries with full content
1211 """
1212 # Check if we should add full content
1213 if (
1214 hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
1215 and search_config.SEARCH_SNIPPETS_ONLY
1216 ):
1217 logger.info("Snippet-only mode, skipping full content addition")
1218 return relevant_items
1220 # For local search, we already have the full content
1221 results = []
1222 for item in relevant_items:
1223 # Create a copy with full content
1224 result = item.copy()
1226 # Add full content if we have it
1227 if "_full_content" in item: 1227 ↛ 1236line 1227 didn't jump to line 1236 because the condition on line 1227 was always true
1228 result["content"] = item["_full_content"]
1229 result["full_content"] = item["_full_content"]
1231 # Remove temporary fields
1232 if "_full_content" in result: 1232 ↛ 1236line 1232 didn't jump to line 1236 because the condition on line 1232 was always true
1233 del result["_full_content"]
1235 # Add metadata if we have it
1236 if "_metadata" in item: 1236 ↛ 1243line 1236 didn't jump to line 1243 because the condition on line 1236 was always true
1237 result["document_metadata"] = item["_metadata"]
1239 # Remove temporary fields
1240 if "_metadata" in result: 1240 ↛ 1243line 1240 didn't jump to line 1243 because the condition on line 1240 was always true
1241 del result["_metadata"]
1243 results.append(result)
1245 return results
1247 def run(
1248 self,
1249 query: str,
1250 research_context: Dict[str, Any] | None = None,
1251 collection_names: Optional[List[str]] = None,
1252 ) -> List[Dict[str, Any]]:
1253 """
1254 Execute a search using the two-phase approach.
1256 Args:
1257 query: The search query
1258 research_context: Context from previous research to use.
1259 collection_names: Specific collections to search within (if None, search all)
1261 Returns:
1262 List of search result dictionaries with full content
1263 """
1264 logger.info("---Execute a search using Local Documents---")
1266 # Check if we have any special collection parameters in the query
1267 collection_prefix = "collection:"
1268 remaining_query = query
1269 specified_collections = []
1271 # Parse query for collection specifications like "collection:research_papers query terms"
1272 query_parts = query.split()
1273 for part in query_parts:
1274 if part.lower().startswith(collection_prefix):
1275 collection_name = part[len(collection_prefix) :].strip()
1276 if collection_name in self.collections: 1276 ↛ 1273line 1276 didn't jump to line 1273 because the condition on line 1276 was always true
1277 specified_collections.append(collection_name)
1278 # Remove this part from the query
1279 remaining_query = remaining_query.replace(
1280 part, "", 1
1281 ).strip()
1283 # If collections were specified in the query, they override the parameter
1284 if specified_collections:
1285 collection_names = specified_collections
1286 query = remaining_query
1288 # Phase 1: Get previews (with collection filtering)
1289 previews = self._get_previews(query, collection_names)
1291 if not previews:
1292 return []
1294 # Phase 2: Filter for relevance
1295 relevant_items = self._filter_for_relevance(previews, query)
1297 if not relevant_items: 1297 ↛ 1298line 1297 didn't jump to line 1298 because the condition on line 1297 was never true
1298 return []
1300 # Phase 3: Get full content for relevant items
1301 if ( 1301 ↛ 1305line 1301 didn't jump to line 1305 because the condition on line 1301 was never true
1302 hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
1303 and search_config.SEARCH_SNIPPETS_ONLY
1304 ):
1305 logger.info("Returning snippet-only results as per config")
1306 results = relevant_items
1307 else:
1308 results = self._get_full_content(relevant_items)
1310 # Clean up temporary data
1311 self.embedding_manager.clear_cache()
1313 return results
1315 def get_collections_info(self) -> List[Dict[str, Any]]:
1316 """
1317 Get information about all collections, including indexing status.
1319 Returns:
1320 List of collection information dictionaries
1321 """
1322 collections_info = []
1324 for name, collection in self.collections.items():
1325 paths = collection.get("paths", [])
1326 paths = [Path(p) for p in paths]
1327 description = collection.get("description", "")
1329 # Get indexing information for each path
1330 paths_info = []
1331 for path in paths:
1332 # Check if folder exists
1333 exists = path.exists() and path.is_dir()
1335 # Check if folder is indexed
1336 folder_hash = self.embedding_manager.get_folder_hash(path)
1337 indexed = folder_hash in self.embedding_manager.indexed_folders
1339 # Get index details if available
1340 index_info = {}
1341 if indexed: 1341 ↛ 1342line 1341 didn't jump to line 1342 because the condition on line 1341 was never true
1342 index_info = self.embedding_manager.indexed_folders[
1343 folder_hash
1344 ].copy()
1346 paths_info.append(
1347 {
1348 "path": path,
1349 "exists": exists,
1350 "indexed": indexed,
1351 "index_info": index_info,
1352 }
1353 )
1355 collections_info.append(
1356 {
1357 "name": name,
1358 "description": description,
1359 "paths": paths,
1360 "paths_info": paths_info,
1361 "document_count": sum(
1362 info.get("index_info", {}).get("file_count", 0)
1363 for info in paths_info
1364 ),
1365 "chunk_count": sum(
1366 info.get("index_info", {}).get("chunk_count", 0)
1367 for info in paths_info
1368 ),
1369 "all_indexed": all(
1370 info["indexed"] for info in paths_info if info["exists"]
1371 ),
1372 }
1373 )
1375 return collections_info
1377 def reindex_collection(self, collection_name: str) -> bool:
1378 """
1379 Reindex a specific collection.
1381 Args:
1382 collection_name: Name of the collection to reindex
1384 Returns:
1385 True if reindexing was successful, False otherwise
1386 """
1387 if collection_name not in self.collections:
1388 logger.error(f"Collection '{collection_name}' not found")
1389 return False
1391 paths = self.collections[collection_name].get("paths", [])
1392 success = True
1394 for path in paths:
1395 if not self.embedding_manager.index_folder( 1395 ↛ 1398line 1395 didn't jump to line 1398 because the condition on line 1395 was never true
1396 path, force_reindex=True
1397 ):
1398 success = False
1400 return success
1402 @classmethod
1403 def from_config(
1404 cls, config_dict: Dict[str, Any], llm: Optional[BaseLLM] = None
1405 ) -> "LocalSearchEngine":
1406 """
1407 Create a LocalSearchEngine instance from a configuration dictionary.
1409 Args:
1410 config_dict: Configuration dictionary
1411 llm: Language model for relevance filtering
1413 Returns:
1414 Initialized LocalSearchEngine instance
1415 """
1416 # Required parameters
1417 folder_paths = []
1418 collections = config_dict.get("collections", {})
1420 # Extract all folder paths from collections
1421 for collection_config in collections.values():
1422 if "paths" in collection_config: 1422 ↛ 1421line 1422 didn't jump to line 1421 because the condition on line 1422 was always true
1423 folder_paths.extend(collection_config["paths"])
1425 # Fall back to folder_paths if no collections defined
1426 if not folder_paths:
1427 folder_paths = config_dict.get("folder_paths", [])
1428 # Create a default collection if using folder_paths
1429 if folder_paths: 1429 ↛ 1438line 1429 didn't jump to line 1438 because the condition on line 1429 was always true
1430 collections = {
1431 "default": {
1432 "paths": folder_paths,
1433 "description": "Default collection",
1434 }
1435 }
1437 # Optional parameters with defaults
1438 max_results = config_dict.get("max_results", 10)
1439 max_filtered_results = config_dict.get("max_filtered_results")
1440 embedding_model = config_dict.get("embedding_model", "all-MiniLM-L6-v2")
1441 embedding_device = config_dict.get("embedding_device", "cpu")
1442 embedding_model_type = config_dict.get(
1443 "embedding_model_type", "sentence_transformers"
1444 )
1445 ollama_base_url = config_dict.get("ollama_base_url")
1446 force_reindex = config_dict.get("force_reindex", False)
1447 chunk_size = config_dict.get("chunk_size", 1000)
1448 chunk_overlap = config_dict.get("chunk_overlap", 200)
1449 cache_dir = config_dict.get(
1450 "cache_dir"
1451 ) # None uses app's cache directory
1453 return cls(
1454 paths=folder_paths,
1455 collections=collections,
1456 llm=llm,
1457 max_results=max_results,
1458 max_filtered_results=max_filtered_results,
1459 embedding_model=embedding_model,
1460 embedding_device=embedding_device,
1461 embedding_model_type=embedding_model_type,
1462 ollama_base_url=ollama_base_url,
1463 force_reindex=force_reindex,
1464 chunk_size=chunk_size,
1465 chunk_overlap=chunk_overlap,
1466 cache_dir=cache_dir,
1467 )