Coverage for src/local_deep_research/web_search_engines/engines/search_engine_local.py: 30%
553 statements
coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1 import hashlib
2 import json
3 import os
4 import time
5 import uuid
6 from concurrent.futures import ProcessPoolExecutor
7 from datetime import UTC, datetime
8 from pathlib import Path
9 from typing import Any, Dict, Iterable, List, Optional
11 import numpy as np
12 from faiss import IndexFlatL2
13 from langchain_community.docstore.in_memory import InMemoryDocstore
14 from langchain_community.document_loaders import (
15 CSVLoader,
16 PyPDFLoader,
17 TextLoader,
18 UnstructuredExcelLoader,
19 UnstructuredMarkdownLoader,
20 UnstructuredWordDocumentLoader,
21 )
22 from langchain_community.document_loaders.base import BaseLoader
23 from langchain_community.embeddings import (
24 HuggingFaceEmbeddings,
25 )
26 from langchain_community.vectorstores import FAISS
27 from langchain_core.documents import Document
28 from langchain_core.language_models import BaseLLM
29 from langchain_text_splitters import RecursiveCharacterTextSplitter
30 from loguru import logger
32 from ...config import search_config
33 from ...config.paths import get_cache_directory
34 from ...database.models.library import DocumentChunk
35 from ...database.session_context import get_user_db_session
36 from ...utilities.url_utils import normalize_url
37 from ..search_engine_base import BaseSearchEngine
40 def _get_file_loader(file_path: str) -> Optional[BaseLoader]:
41 """Get an appropriate document loader for a file based on its extension"""
42 file_path = Path(file_path)
43 extension = file_path.suffix.lower()
45 try:
46 if extension == ".pdf":
47 return PyPDFLoader(str(file_path))
48 elif extension == ".txt":
49 return TextLoader(str(file_path))
50 elif extension in [".md", ".markdown"]:
51 return UnstructuredMarkdownLoader(str(file_path))
52 elif extension in [".doc", ".docx"]:
53 return UnstructuredWordDocumentLoader(str(file_path))
54 elif extension == ".csv":
55 return CSVLoader(str(file_path))
56 elif extension in [".xls", ".xlsx"]:
57 return UnstructuredExcelLoader(str(file_path))
58 else:
59 # Try the text loader as a fallback for unknown extensions
60 logger.warning(
61 f"Unknown file extension for {file_path}, trying TextLoader"
62 )
63 return TextLoader(str(file_path), encoding="utf-8")
64 except Exception:
65 logger.exception(f"Error creating loader for {file_path}")
66 return None
69 def _load_document(file_path: Path) -> List[Document]:
70 """
71 Loads documents from a file.
73 Args:
74 file_path: The path to the document to load.
76 Returns:
77 The loaded documents, or an empty list if it failed to load.
79 """
80 # Get a loader for this file
81 loader = _get_file_loader(str(file_path))
83 if loader is None:
84 # No loader for this filetype.
85 return []
87 try:
88 # Load the document
89 docs = loader.load()
91 # Add source path and filename metadata.
92 for doc in docs:
93 doc.metadata["source"] = str(file_path)
94 doc.metadata["filename"] = file_path.name
96 except Exception:
97 logger.exception(f"Error loading {file_path}")
98 return []
100 return docs
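For illustration only, a minimal sketch of how the two module-level helpers above are used on a single file; the path is hypothetical and the relevant loader packages must be installed:

    from pathlib import Path

    # Load one file; its metadata now carries the source path and filename.
    docs = _load_document(Path("notes/example.md"))
    for doc in docs:
        print(doc.metadata["filename"], len(doc.page_content))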
103 class LocalEmbeddingManager:
104 """Handles embedding generation and storage for local document search"""
106 def __init__(
107 self,
108 embedding_model: str = "all-MiniLM-L6-v2",
109 embedding_device: str = "cpu",
110 embedding_model_type: str = "sentence_transformers", # or 'ollama'
111 ollama_base_url: Optional[str] = None,
112 chunk_size: int = 1000,
113 chunk_overlap: int = 200,
114 cache_dir: Optional[str] = None,
115 settings_snapshot: Optional[Dict[str, Any]] = None,
116 ):
117 """
118 Initialize the embedding manager for local document search.
120 Args:
121 embedding_model: Name of the embedding model to use
122 embedding_device: Device to run embeddings on ('cpu' or 'cuda')
123 embedding_model_type: Type of embedding model ('sentence_transformers' or 'ollama')
124 ollama_base_url: Base URL for Ollama API if using ollama embeddings
125 chunk_size: Size of text chunks for splitting documents
126 chunk_overlap: Overlap between chunks
127 cache_dir: Directory to store embedding cache and index.
128 If None, uses the app's configured cache directory.
129 settings_snapshot: Optional settings snapshot for background threads
130 """
132 self.embedding_model = embedding_model
133 self.embedding_device = embedding_device
134 self.embedding_model_type = embedding_model_type
135 self.ollama_base_url = ollama_base_url
136 self.chunk_size = chunk_size
137 self.chunk_overlap = chunk_overlap
138 # Use configured cache directory if not specified
139 if cache_dir is None:
140 self.cache_dir = get_cache_directory() / "local_search"
141 else:
142 self.cache_dir = Path(cache_dir)
143 self.settings_snapshot = settings_snapshot or {}
145 # Username for database access (extracted from settings if available)
146 self.username = (
147 settings_snapshot.get("_username") if settings_snapshot else None
148 )
149 # Password for encrypted database access (can be set later)
150 self.db_password = None
152 # Create cache directory if it doesn't exist
153 self.cache_dir.mkdir(parents=True, exist_ok=True)
155 # Initialize the embedding model
156 self._embeddings = None
158 # Initialize the text splitter
159 self.text_splitter = RecursiveCharacterTextSplitter(
160 chunk_size=chunk_size, chunk_overlap=chunk_overlap
161 )
163 # Track indexed folders and their metadata
164 self.indexed_folders = self._load_indexed_folders()
166 # Vector store cache
167 self.vector_stores = {}
169 @property
170 def embeddings(self):
171 """
172 Lazily initialize embeddings when first accessed.
173 This allows the LocalEmbeddingManager to be created without
174 immediately loading models, which is helpful when no local search is performed.
175 """
176 if self._embeddings is None:
177 logger.info("Initializing embeddings on first use")
178 self._embeddings = self._initialize_embeddings()
179 return self._embeddings
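The property above defers loading the embedding model until the first embedding request. A standalone sketch of the same lazy-initialization pattern (the class below is illustrative, not part of the source):

    class LazyResource:
        """Build an expensive resource only when it is first accessed."""

        def __init__(self, factory):
            self._factory = factory  # zero-argument callable that builds the resource
            self._resource = None

        @property
        def resource(self):
            if self._resource is None:
                self._resource = self._factory()  # built once, then cached
            return self._resource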
181 def _initialize_embeddings(self):
182 """Initialize the embedding model based on configuration"""
183 try:
184 # Use the new unified embedding system
185 from ...embeddings import get_embeddings
187 # Prepare kwargs for provider-specific parameters
188 kwargs = {}
190 # Add device for sentence transformers
191 if self.embedding_model_type == "sentence_transformers": 191 ↛ 195 (didn't jump to line 195 because the condition was always true)
192 kwargs["device"] = self.embedding_device
194 # Add base_url for ollama if specified
195 if self.embedding_model_type == "ollama" and self.ollama_base_url: 195 ↛ 196 (didn't jump to line 196 because the condition was never true)
196 kwargs["base_url"] = normalize_url(self.ollama_base_url)
198 logger.info(
199 f"Initializing embeddings with provider={self.embedding_model_type}, model={self.embedding_model}"
200 )
202 return get_embeddings(
203 provider=self.embedding_model_type,
204 model=self.embedding_model,
205 settings_snapshot=self.settings_snapshot,
206 **kwargs,
207 )
208 except Exception:
209 logger.exception("Error initializing embeddings")
210 logger.warning(
211 "Falling back to HuggingFaceEmbeddings with all-MiniLM-L6-v2"
212 )
213 return HuggingFaceEmbeddings(
214 model_name="sentence-transformers/all-MiniLM-L6-v2"
215 )
217 def _store_chunks_to_db(
218 self,
219 chunks: List[Document],
220 collection_name: str,
221 source_path: Optional[str] = None,
222 source_id: Optional[int] = None,
223 source_type: str = "local_file",
224 ) -> List[str]:
225 """
226 Store document chunks in the database.
228 Args:
229 chunks: List of LangChain Document chunks
230 collection_name: Name of the collection (e.g., 'personal_notes', 'library')
231 source_path: Path to source file (for local files)
232 source_id: ID of source document (for library documents)
233 source_type: Type of source ('local_file' or 'library')
235 Returns:
236 List of chunk embedding IDs (UUIDs) for FAISS mapping
237 """
238 if not self.username:
239 logger.warning(
240 "No username available, cannot store chunks in database"
241 )
242 return []
244 chunk_ids = []
246 try:
247 with get_user_db_session(
248 self.username, self.db_password
249 ) as session:
250 for idx, chunk in enumerate(chunks):
251 # Generate unique hash for chunk
252 chunk_text = chunk.page_content
253 chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()
255 # Generate unique embedding ID
256 embedding_id = uuid.uuid4().hex
258 # Extract metadata
259 metadata = chunk.metadata or {}
260 document_title = metadata.get(
261 "filename", metadata.get("title", "Unknown")
262 )
264 # Calculate word count
265 word_count = len(chunk_text.split())
267 # Get character positions from metadata if available
268 start_char = metadata.get("start_char", 0)
269 end_char = metadata.get("end_char", len(chunk_text))
271 # Check if chunk already exists
272 existing_chunk = (
273 session.query(DocumentChunk)
274 .filter_by(chunk_hash=chunk_hash)
275 .first()
276 )
278 if existing_chunk:
279 # Update existing chunk
280 existing_chunk.last_accessed = datetime.now(UTC)
281 chunk_ids.append(existing_chunk.embedding_id)
282 logger.debug(
283 f"Chunk already exists, reusing: {existing_chunk.embedding_id}"
284 )
285 else:
286 # Create new chunk
287 db_chunk = DocumentChunk(
288 chunk_hash=chunk_hash,
289 source_type=source_type,
290 source_id=source_id,
291 source_path=str(source_path)
292 if source_path
293 else None,
294 collection_name=collection_name,
295 chunk_text=chunk_text,
296 chunk_index=idx,
297 start_char=start_char,
298 end_char=end_char,
299 word_count=word_count,
300 embedding_id=embedding_id,
301 embedding_model=self.embedding_model,
302 embedding_model_type=self.embedding_model_type,
303 document_title=document_title,
304 document_metadata=metadata,
305 )
306 session.add(db_chunk)
307 chunk_ids.append(embedding_id)
309 session.commit()
310 logger.info(
311 f"Stored {len(chunk_ids)} chunks to database for collection '{collection_name}'"
312 )
314 except Exception:
315 logger.exception(
316 f"Error storing chunks to database for collection '{collection_name}'"
317 )
318 return []
320 return chunk_ids
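A short sketch of the two identifiers used above: the chunk hash is derived from the chunk text, so an identical chunk is reused rather than re-inserted, while the embedding ID is a fresh UUID that later keys the FAISS mapping (values below are illustrative):

    import hashlib
    import uuid

    chunk_text = "Example chunk of text."
    chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()  # stable for identical text
    embedding_id = uuid.uuid4().hex  # unique key stored alongside the FAISS vector
    print(chunk_hash[:16], embedding_id[:8])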
322 def _load_chunks_from_db(
323 self, chunk_ids: List[str], username: Optional[str] = None
324 ) -> List[Dict[str, Any]]:
325 """
326 Load chunks from database by their embedding IDs.
328 Args:
329 chunk_ids: List of embedding IDs to load
330 username: Username for database access (uses self.username if not provided)
332 Returns:
333 List of chunk dictionaries with content and metadata
334 """
335 username = username or self.username
336 if not username:
337 logger.warning(
338 "No username available, cannot load chunks from database"
339 )
340 return []
342 chunks = []
344 try:
345 with get_user_db_session(username) as session:
346 db_chunks = (
347 session.query(DocumentChunk)
348 .filter(DocumentChunk.embedding_id.in_(chunk_ids))
349 .all()
350 )
352 for db_chunk in db_chunks:
353 # Update last accessed time
354 db_chunk.last_accessed = datetime.now(UTC)
356 chunks.append(
357 {
358 "id": db_chunk.embedding_id,
359 "content": db_chunk.chunk_text,
360 "metadata": {
361 "source_type": db_chunk.source_type,
362 "source_path": db_chunk.source_path,
363 "source_id": db_chunk.source_id,
364 "collection": db_chunk.collection_name,
365 "chunk_index": db_chunk.chunk_index,
366 "word_count": db_chunk.word_count,
367 "title": db_chunk.document_title,
368 **db_chunk.document_metadata,
369 },
370 }
371 )
373 session.commit() # Commit the last_accessed updates
375 except Exception:
376 logger.exception("Error loading chunks from database")
377 return []
379 return chunks
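For reference, each dictionary returned by this method has the shape sketched below; all values are hypothetical:

    example_chunk = {
        "id": "3f2a9c...",  # embedding_id, the key shared with the FAISS index
        "content": "chunk text ...",
        "metadata": {
            "source_type": "local_file",
            "source_path": "/data/notes/example.md",
            "source_id": None,
            "collection": "notes",
            "chunk_index": 0,
            "word_count": 42,
            "title": "example.md",
        },
    }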
381 def _delete_chunks_from_db(
382 self,
383 collection_name: str,
384 source_path: Optional[str] = None,
385 source_id: Optional[int] = None,
386 ) -> int:
387 """
388 Delete chunks from database.
390 Args:
391 collection_name: Name of the collection
392 source_path: Path to source file (for local files)
393 source_id: ID of source document (for library documents)
395 Returns:
396 Number of chunks deleted
397 """
398 if not self.username:
399 logger.warning(
400 "No username available, cannot delete chunks from database"
401 )
402 return 0
404 try:
405 with get_user_db_session(
406 self.username, self.db_password
407 ) as session:
408 query = session.query(DocumentChunk).filter_by(
409 collection_name=collection_name
410 )
412 if source_path:
413 query = query.filter_by(source_path=str(source_path))
414 if source_id:
415 query = query.filter_by(source_id=source_id)
417 count = query.delete()
418 session.commit()
420 logger.info(
421 f"Deleted {count} chunks from database for collection '{collection_name}'"
422 )
423 return count
425 except Exception:
426 logger.exception(
427 f"Error deleting chunks from database for collection '{collection_name}'"
428 )
429 return 0
431 def _load_or_create_vector_store(self):
432 """Load the vector store from disk or create it if needed"""
433 vector_store_path = self._get_vector_store_path()
435 # Check if vector store exists and is up to date
436 if vector_store_path.exists() and not self._check_folders_modified():
437 logger.info(
438 f"Loading existing vector store from {vector_store_path}"
439 )
440 try:
441 vector_store = FAISS.load_local(
442 str(vector_store_path),
443 self.embeddings,
444 allow_dangerous_deserialization=True,
445 )
447 # Log how many document chunks the loaded index contains
448 doc_count = len(vector_store.index_to_docstore_id)
449 logger.info(f"Loaded index with {doc_count} document chunks")
451 return vector_store
452 except Exception:
453 logger.exception("Error loading vector store")
454 logger.info("Will create a new vector store")
456 # Create a new vector store
457 return self._create_vector_store()
459 def _load_indexed_folders(self) -> Dict[str, Dict[str, Any]]:
460 """Load metadata about indexed folders from disk"""
461 index_metadata_path = self.cache_dir / "index_metadata.json"
463 if index_metadata_path.exists():
464 try:
465 with open(index_metadata_path, "r") as f:
466 return json.load(f)
467 except Exception:
468 logger.exception("Error loading index metadata")
470 return {}
472 def _save_indexed_folders(self):
473 """Save metadata about indexed folders to disk"""
474 index_metadata_path = self.cache_dir / "index_metadata.json"
476 try:
477 with open(index_metadata_path, "w") as f:
478 json.dump(self.indexed_folders, f, indent=2)
479 except Exception:
480 logger.exception("Error saving index metadata")
482 @staticmethod
483 def get_folder_hash(folder_path: Path) -> str:
484 """Generate a hash for a folder based on its path"""
485 # Canonicalize the path so we don't have weird Windows vs. Linux
486 # problems or issues with trailing slashes.
487 canonical_folder_path = "/".join(folder_path.parts)
488 return hashlib.md5( # DevSkim: ignore DS126858
489 canonical_folder_path.encode(), usedforsecurity=False
490 ).hexdigest()
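An illustrative computation of the folder hash that names the on-disk index (the folder path is hypothetical):

    import hashlib
    from pathlib import Path

    parts = "/".join(Path("/data/notes").parts)
    folder_hash = hashlib.md5(parts.encode(), usedforsecurity=False).hexdigest()
    print(f"index_{folder_hash}")  # same naming scheme as _get_index_path below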
492 def _get_index_path(self, folder_path: Path) -> Path:
493 """Get the path where the index for a specific folder should be stored"""
494 folder_hash = self.get_folder_hash(folder_path)
495 return self.cache_dir / f"index_{folder_hash}"
497 def _check_folder_modified(self, folder_path: Path) -> bool:
498 """Check if a folder has been modified since it was last indexed"""
500 @staticmethod
501 def _get_all_files(folder_path: Path) -> Iterable[Path]:
502 """
503 Gets all the files, recursively, in a folder.
505 Args:
506 folder_path: The path to the folder.
508 Yields:
509 Each of the files in the folder.
511 """
512 for root, _, files in os.walk(folder_path):
513 for file in files: 513 ↛ 514 (didn't jump to line 514 because the loop never started)
514 yield Path(root) / file
516 def _get_modified_files(self, folder_path: Path) -> List[Path]:
517 """
518 Gets the files in a folder that have been modified since it was last
519 indexed.
521 Args:
522 folder_path: The path to the folder to check.
524 Returns:
525 A list of the files that were modified.
527 """
528 if not folder_path.exists() or not folder_path.is_dir():
529 return []
531 folder_hash = self.get_folder_hash(folder_path)
533 if folder_hash not in self.indexed_folders:
534 # If folder has never been indexed, everything has been modified.
535 last_indexed = 0
536 indexed_files = set()
537 else:
538 last_indexed = self.indexed_folders[folder_hash].get(
539 "last_indexed", 0
540 )
541 indexed_files = (
542 self.indexed_folders[folder_hash]
543 .get("indexed_files", {})
544 .keys()
545 )
547 # Check if any file in the folder has been modified since last indexing
548 modified_files = []
549 for file_path in self._get_all_files(folder_path):
550 file_stats = file_path.stat()
551 if file_stats.st_mtime > last_indexed:
552 modified_files.append(file_path)
553 elif str(file_path.relative_to(folder_path)) not in indexed_files:
554 # This file was never indexed, so treat it as modified.
555 modified_files.append(file_path)
557 return modified_files
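A compact sketch of the modification-time test that drives incremental re-indexing; the timestamp and path are hypothetical:

    import time
    from pathlib import Path

    last_indexed = time.time() - 3600  # pretend the folder was indexed an hour ago
    candidate = Path("notes/example.md")
    if candidate.exists() and candidate.stat().st_mtime > last_indexed:
        print("changed since last index; will be re-embedded")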
559 def _check_config_changed(self, folder_path: Path) -> bool:
560 """
561 Checks if the embedding configuration for a folder has been changed
562 since it was last indexed.
563 """
564 folder_hash = self.get_folder_hash(folder_path)
566 if folder_hash not in self.indexed_folders: 566 ↛ 571 (didn't jump to line 571 because the condition was always true)
567 # It hasn't been indexed at all. That's a new configuration,
568 # technically.
569 return True
571 embedding_config = self.indexed_folders[folder_hash]
572 chunk_size = int(embedding_config.get("chunk_size", 0))
573 chunk_overlap = int(embedding_config.get("chunk_overlap", 0))
574 embedding_model = embedding_config.get("embedding_model", "")
576 if (chunk_size, chunk_overlap, embedding_model) != (
577 self.chunk_size,
578 self.chunk_overlap,
579 self.embedding_model,
580 ):
581 logger.info(
582 "Embedding configuration has changed, re-indexing folder."
583 )
584 return True
585 return False
587 def index_folder(
588 self, folder_path: str, force_reindex: bool = False
589 ) -> bool:
590 """
591 Index all documents in a folder for vector search.
593 Args:
594 folder_path: Path to the folder to index
595 force_reindex: Whether to force reindexing even if unchanged
597 Returns:
598 bool: True if indexing was successful, False otherwise
599 """
600 folder_path = Path(folder_path)
602 # Validate folder
603 if not folder_path.exists(): 603 ↛ 604 (didn't jump to line 604 because the condition was never true)
604 logger.error(f"Folder not found: {folder_path}")
605 return False
607 if not folder_path.is_dir(): 607 ↛ 608 (didn't jump to line 608 because the condition was never true)
608 logger.error(f"Path is not a directory: {folder_path}")
609 return False
611 folder_str = str(folder_path)
612 folder_hash = self.get_folder_hash(folder_path)
613 index_path = self._get_index_path(folder_path)
615 if force_reindex or self._check_config_changed(folder_path): 615 ↛ 620 (didn't jump to line 620 because the condition was always true)
616 logger.info(f"Re-indexing entire folder: {folder_path}")
617 modified_files = list(self._get_all_files(folder_path))
618 else:
619 # Otherwise, only re-index the files that have changed.
620 modified_files = self._get_modified_files(folder_path)
621 logger.info(f"Re-indexing {len(modified_files)} modified files...")
623 # Load the vector store from disk if not already loaded
624 if folder_hash not in self.vector_stores and index_path.exists(): 624 ↛ 625 (didn't jump to line 625 because the condition was never true)
625 try:
626 self.vector_stores[folder_hash] = FAISS.load_local(
627 str(index_path),
628 self.embeddings,
629 allow_dangerous_deserialization=True,
630 )
631 logger.info(f"Loaded index for {folder_path} from disk")
632 except Exception:
633 logger.exception(f"Error loading index for {folder_path}")
634 # If loading fails, force reindexing
635 force_reindex = True
637 logger.info(f"Indexing folder: {folder_path}")
638 start_time = time.time()
640 # Find documents to index
641 all_docs = []
643 # Remove hidden files and directories.
644 modified_files = [
645 p
646 for p in modified_files
647 if not p.name.startswith(".")
648 and not any(part.startswith(".") for part in p.parts)
649 ]
650 # Index them.
651 with ProcessPoolExecutor() as executor:
652 all_docs_nested = executor.map(_load_document, modified_files)
653 # Flatten the result.
654 for docs in all_docs_nested: 654 ↛ 655 (didn't jump to line 655 because the loop never started)
655 all_docs.extend(docs)
657 if force_reindex or folder_hash not in self.vector_stores: 657 ↛ 672 (didn't jump to line 672 because the condition was always true)
658 logger.info(f"Creating new index for {folder_path}")
659 # Embed a test query to determine the embedding dimensionality.
660 test_embedding = self.embeddings.embed_query("hello world")
661 index = IndexFlatL2(len(test_embedding))
662 # Use minimal docstore - chunks are stored in database
663 self.vector_stores[folder_hash] = FAISS(
664 self.embeddings,
665 index=index,
666 docstore=InMemoryDocstore(), # Minimal - just for FAISS compatibility
667 index_to_docstore_id={},
668 normalize_L2=True,
669 )
671 # Split documents into chunks
672 logger.info(f"Splitting {len(all_docs)} documents into chunks")
673 splits = self.text_splitter.split_documents(all_docs)
674 logger.info(
675 f"Created {len(splits)} chunks from {len(modified_files)} files"
676 )
678 # Store chunks in database and get embedding IDs
679 embedding_ids = []
680 if splits: 680 ↛ 681 (didn't jump to line 681 because the condition was never true)
681 logger.info(f"Storing {len(splits)} chunks in database")
682 # Get collection name from folder path (last folder name)
683 collection_name = folder_path.name
685 # Store chunks to database
686 embedding_ids = self._store_chunks_to_db(
687 chunks=splits,
688 collection_name=collection_name,
689 source_type="local_file",
690 )
692 logger.info(f"Adding {len(splits)} chunks to FAISS index")
693 # Add embeddings to FAISS using the database-generated IDs
694 self.vector_stores[folder_hash].add_documents(
695 splits, ids=embedding_ids
696 )
698 # Update indexing time for individual files.
699 index_time = time.time()
700 indexed_files = {}
701 if folder_hash in self.indexed_folders: 701 ↛ 702 (didn't jump to line 702 because the condition was never true)
702 indexed_files = (
703 self.indexed_folders[folder_hash]
704 .get("indexed_files", {})
705 .copy()
706 )
707 for embedding_id, split in zip(embedding_ids, splits, strict=False): 707 ↛ 708 (didn't jump to line 708 because the loop never started)
708 split_source = str(
709 Path(split.metadata["source"]).relative_to(folder_path)
710 )
711 id_list = indexed_files.setdefault(split_source, [])
712 id_list.append(embedding_id)
714 # Check for any files that were removed and remove them from the
715 # vector store and database.
716 delete_ids = []
717 delete_paths = []
718 for relative_path, chunk_ids in indexed_files.items(): 718 ↛ 719 (didn't jump to line 719 because the loop never started)
719 if not (folder_path / Path(relative_path)).exists():
720 delete_ids.extend(chunk_ids)
721 delete_paths.append(relative_path)
722 if delete_ids: 722 ↛ 723 (didn't jump to line 723 because the condition was never true)
723 logger.info(
724 f"Deleting {len(delete_paths)} non-existent files from the "
725 f"index and database."
726 )
727 # Delete from FAISS index
728 self.vector_stores[folder_hash].delete(delete_ids)
730 # Delete from database
731 collection_name = folder_path.name
732 for delete_path in delete_paths:
733 full_path = str(folder_path / delete_path)
734 deleted_count = self._delete_chunks_from_db(
735 collection_name=collection_name,
736 source_path=full_path,
737 )
738 logger.debug(
739 f"Deleted {deleted_count} chunks for {delete_path} from database"
740 )
741 for path in delete_paths: 741 ↛ 742 (didn't jump to line 742 because the loop never started)
742 del indexed_files[path]
744 # Save the vector store to disk
745 logger.info(f"Saving index to {index_path}")
746 self.vector_stores[folder_hash].save_local(str(index_path))
748 # Update metadata
749 self.indexed_folders[folder_hash] = {
750 "path": folder_str,
751 "last_indexed": index_time,
752 "file_count": len(modified_files),
753 "chunk_count": len(splits),
754 "embedding_model": self.embedding_model,
755 "chunk_size": self.chunk_size,
756 "chunk_overlap": self.chunk_overlap,
757 "indexed_files": indexed_files,
758 }
760 # Save updated metadata
761 self._save_indexed_folders()
763 elapsed_time = time.time() - start_time
764 logger.info(
765 f"Indexed {len(modified_files)} files in {elapsed_time:.2f} seconds"
766 )
768 return True
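An illustrative call to the indexer above; the folder path is hypothetical, and FAISS plus an embedding backend must be installed:

    manager = LocalEmbeddingManager(chunk_size=500, chunk_overlap=50)
    if manager.index_folder("/data/notes", force_reindex=True):
        for info in manager.get_indexed_folders_info():
            print(info["path"], info["chunk_count"], info["last_indexed_formatted"])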
770 def search(
771 self,
772 query: str,
773 folder_paths: List[str],
774 limit: int = 10,
775 score_threshold: float = 0.0,
776 ) -> List[Dict[str, Any]]:
777 """
778 Search for documents relevant to a query across specified folders.
780 Args:
781 query: The search query
782 folder_paths: List of folder paths to search in
783 limit: Maximum number of results to return
784 score_threshold: Minimum similarity score threshold
786 Returns:
787 List of results with document content and metadata
788 """
789 folder_paths = [Path(p) for p in folder_paths]
791 # Add detailed debugging for each folder
792 for folder_path in folder_paths:
793 folder_hash = self.get_folder_hash(folder_path)
794 index_path = self._get_index_path(folder_path)
796 logger.info(f"Diagnostic for {folder_path}:")
797 logger.info(f" - Folder hash: {folder_hash}")
798 logger.info(f" - Index path: {index_path}")
799 logger.info(f" - Index exists on disk: {index_path.exists()}")
800 logger.info(
801 f" - Is in indexed_folders: {folder_hash in self.indexed_folders}"
802 )
804 if folder_hash in self.indexed_folders:
805 meta = self.indexed_folders[folder_hash]
806 logger.info(
807 f" - Metadata: file_count={meta.get('file_count', 0)}, chunk_count={meta.get('chunk_count', 0)}"
808 )
810 # Validate folders exist
811 valid_folder_paths = []
812 for path in folder_paths:
813 if path.exists() and path.is_dir():
814 valid_folder_paths.append(path)
815 else:
816 logger.warning(
817 f"Skipping non-existent folder in search: {path}"
818 )
820 # If no valid folders, return empty results
821 if not valid_folder_paths:
822 logger.warning(f"No valid folders to search among: {folder_paths}")
823 return []
825 all_results = []
827 for folder_path in valid_folder_paths:
828 folder_hash = self.get_folder_hash(folder_path)
830 # Skip folders that haven't been indexed
831 if folder_hash not in self.indexed_folders:
832 logger.warning(f"Folder {folder_path} has not been indexed")
833 continue
835 # Make sure the vector store is loaded
836 if folder_hash not in self.vector_stores:
837 index_path = self._get_index_path(folder_path)
838 try:
839 self.vector_stores[folder_hash] = FAISS.load_local(
840 str(index_path),
841 self.embeddings,
842 allow_dangerous_deserialization=True,
843 )
844 except Exception:
845 logger.exception(f"Error loading index for {folder_path}")
846 continue
848 # Search in this folder
849 vector_store = self.vector_stores[folder_hash]
851 try:
852 # Get query embedding
853 query_vector = self.embeddings.embed_query(query)
855 # Search FAISS index for similar vectors
856 # Returns: (distances, indices) where indices are FAISS internal indices
857 distances, indices = vector_store.index.search(
858 np.array([query_vector], dtype=np.float32), limit
859 )
861 # Convert distances to similarity scores (L2 distance -> similarity)
862 # For L2: smaller distance = more similar
863 # Convert to similarity: 1 / (1 + distance)
864 similarities = 1 / (1 + distances[0])
866 # Get embedding IDs from FAISS mapping
867 embedding_ids = []
868 valid_indices = []
869 for idx, faiss_idx in enumerate(indices[0]):
870 if faiss_idx == -1: # FAISS returns -1 for empty results
871 continue
872 if faiss_idx in vector_store.index_to_docstore_id:
873 embedding_id = vector_store.index_to_docstore_id[
874 faiss_idx
875 ]
876 embedding_ids.append(embedding_id)
877 valid_indices.append(idx)
879 # Load chunks from database
880 if embedding_ids:
881 db_chunks = self._load_chunks_from_db(
882 embedding_ids, self.username
883 )
885 # Create results from database chunks
886 for idx, chunk in zip(valid_indices, db_chunks):
887 similarity = float(similarities[idx])
889 # Skip results below the threshold
890 if similarity < score_threshold:
891 continue
893 # Extract metadata from the chunk (keys match what _load_chunks_from_db returns)
894 metadata = chunk.get("metadata", {})
895 if "source" not in metadata and metadata.get(
896 "source_path"
897 ):
898 metadata["source"] = metadata["source_path"]
900 result = {
901 "content": chunk["content"],
902 "metadata": metadata,
903 "similarity": similarity,
904 "folder": folder_path,
905 }
907 all_results.append(result)
908 except Exception:
909 logger.exception(f"Error searching in {folder_path}")
911 # Sort by similarity (highest first)
912 all_results.sort(key=lambda x: x["similarity"], reverse=True)
914 # Limit to the requested number
915 return all_results[:limit]
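And an illustrative query against an already-indexed folder; the query and path are hypothetical, and the similarity values come from the 1 / (1 + L2 distance) conversion above:

    manager = LocalEmbeddingManager()
    hits = manager.search(
        "how is the vector index cached?",
        folder_paths=["/data/notes"],
        limit=5,
        score_threshold=0.1,
    )
    for hit in hits:
        print(f"{hit['similarity']:.2f}", hit["metadata"].get("source"))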
917 def clear_cache(self):
918 """Clear all cached vector stores from memory (not disk)"""
919 self.vector_stores.clear()
921 def get_indexed_folders_info(self) -> List[Dict[str, Any]]:
922 """Get information about all indexed folders"""
923 info = []
925 for folder_hash, metadata in self.indexed_folders.items():
926 folder_info = metadata.copy()
928 # Add formatted last indexed time
929 if "last_indexed" in folder_info:
930 folder_info["last_indexed_formatted"] = datetime.fromtimestamp(
931 folder_info["last_indexed"]
932 ).strftime("%Y-%m-%d %H:%M:%S")
934 # Check if index file exists
935 index_path = self._get_index_path(Path(folder_info["path"]))
936 folder_info["index_exists"] = index_path.exists()
938 info.append(folder_info)
940 return info
943 class LocalSearchEngine(BaseSearchEngine):
944 """Local document search engine with two-phase retrieval"""
946 def __init__(
947 self,
948 paths: List[str],
949 llm: Optional[BaseLLM] = None,
950 max_results: int = 10,
951 max_filtered_results: Optional[int] = None,
952 embedding_model: str = "all-MiniLM-L6-v2",
953 embedding_device: str = "cpu",
954 embedding_model_type: str = "sentence_transformers",
955 ollama_base_url: Optional[str] = None,
956 force_reindex: bool = False,
957 chunk_size: int = 1000,
958 chunk_overlap: int = 200,
959 cache_dir: Optional[str] = None,
960 collections: Optional[Dict[str, Dict[str, Any]]] = None,
961 name: str = "",
962 description: str = "",
963 ):
964 """
965 Initialize the local search engine.
967 Args:
968 paths: List of folder paths to search in
969 llm: Language model for relevance filtering
970 max_results: Maximum number of results to return
971 max_filtered_results: Maximum results after filtering
972 embedding_model: Name of the embedding model to use
973 embedding_device: Device to run embeddings on ('cpu' or 'cuda')
974 embedding_model_type: Type of embedding model
975 ollama_base_url: Base URL for Ollama API
976 force_reindex: Whether to force reindexing
977 chunk_size: Size of text chunks for splitting documents
978 chunk_overlap: Overlap between chunks
979 cache_dir: Directory to store embedding cache and index
980 collections: Dictionary of named collections with paths and descriptions
981 name: Human-readable name of the collection we are searching.
982 description: Human-readable description of the collection we are
983 searching.
984 """
985 # Initialize the base search engine
986 super().__init__(llm=llm, max_filtered_results=max_filtered_results)
988 self.name = name
989 self.description = description
991 # Validate folder paths
992 self.folder_paths = paths
993 self.valid_folder_paths = []
994 for path_str in paths:
995 path = Path(path_str)
996 if path.exists() and path.is_dir(): 996 ↛ 999 (didn't jump to line 999 because the condition was always true)
997 self.valid_folder_paths.append(path_str)
998 else:
999 logger.warning(
1000 f"Folder not found or is not a directory: {path_str}"
1001 )
1003 # If no valid folders, log a clear message
1004 if not self.valid_folder_paths and paths: 1004 ↛ 1005 (didn't jump to line 1005 because the condition was never true)
1005 logger.warning(f"No valid folders found among: {paths}")
1006 logger.warning(
1007 "This search engine will return no results until valid folders are configured"
1008 )
1010 self.max_results = max_results
1011 self.collections = collections or {
1012 "default": {"paths": paths, "description": "Default collection"}
1013 }
1015 # Initialize the embedding manager with only valid folders
1016 self.embedding_manager = LocalEmbeddingManager(
1017 embedding_model=embedding_model,
1018 embedding_device=embedding_device,
1019 embedding_model_type=embedding_model_type,
1020 ollama_base_url=ollama_base_url,
1021 chunk_size=chunk_size,
1022 chunk_overlap=chunk_overlap,
1023 cache_dir=cache_dir,
1024 settings_snapshot=self.settings_snapshot,
1025 )
1027 # Index all folders
1028 self._index_folders(force_reindex)
1030 def _index_folders(self, force_reindex: bool = False):
1031 """Index all valid configured folders"""
1032 indexed = []
1033 failed = []
1034 skipped = []
1036 # Keep track of invalid folders
1037 for folder in self.folder_paths:
1038 if folder not in self.valid_folder_paths: 1038 ↛ 1039 (didn't jump to line 1039 because the condition was never true)
1039 skipped.append(folder)
1040 continue
1042 success = self.embedding_manager.index_folder(folder, force_reindex)
1043 if success: 1043 ↛ 1046 (didn't jump to line 1046 because the condition was always true)
1044 indexed.append(folder)
1045 else:
1046 failed.append(folder)
1048 if indexed: 1048 ↛ 1053 (didn't jump to line 1053 because the condition was always true)
1049 logger.info(
1050 f"Successfully indexed {len(indexed)} folders: {', '.join(indexed)}"
1051 )
1053 if failed: 1053 ↛ 1054 (didn't jump to line 1054 because the condition was never true)
1054 logger.warning(
1055 f"Failed to index {len(failed)} folders: {', '.join(failed)}"
1056 )
1058 if skipped: 1058 ↛ 1059 (didn't jump to line 1059 because the condition was never true)
1059 logger.warning(
1060 f"Skipped {len(skipped)} invalid folders: {', '.join(skipped)}"
1061 )
1063 def _get_previews(
1064 self, query: str, collection_names: Optional[List[str]] = None
1065 ) -> List[Dict[str, Any]]:
1066 """
1067 Get preview information for documents matching the query.
1069 Args:
1070 query: The search query
1071 collection_names: Specific collections to search within (if None, search all)
1073 Returns:
1074 List of preview dictionaries
1075 """
1076 # Determine which collections to search
1077 if collection_names:
1078 # Search only in specified collections
1079 collections_to_search = {
1080 name: self.collections[name]
1081 for name in collection_names
1082 if name in self.collections
1083 }
1084 if not collections_to_search:
1085 logger.warning(
1086 f"No valid collections found among: {collection_names}"
1087 )
1088 return []
1089 else:
1090 # Search in all collections
1091 collections_to_search = self.collections
1093 # Extract all folder paths from the collections to search
1094 search_paths = []
1095 for collection_config in collections_to_search.values():
1096 if "paths" in collection_config:
1097 search_paths.extend(collection_config["paths"])
1099 logger.info(
1100 f"Searching local documents in collections: {list(collections_to_search.keys())}"
1101 )
1103 # Filter out invalid paths
1104 valid_search_paths = [
1105 path for path in search_paths if path in self.valid_folder_paths
1106 ]
1108 if not valid_search_paths:
1109 logger.warning(
1110 f"No valid folders to search in collections: {list(collections_to_search.keys())}"
1111 )
1112 return []
1114 # Search across the valid selected folders
1115 raw_results = self.embedding_manager.search(
1116 query=query,
1117 folder_paths=valid_search_paths,
1118 limit=self.max_results,
1119 score_threshold=0.1, # Skip very low relevance results
1120 )
1122 if not raw_results:
1123 logger.info(f"No local documents found for query: {query}")
1124 return []
1126 # Convert to preview format
1127 previews = []
1128 for i, result in enumerate(raw_results):
1129 # Create a unique ID
1130 result_id = f"local-{i}-{hashlib.md5(result['content'][:50].encode(), usedforsecurity=False).hexdigest()}" # DevSkim: ignore DS126858
1132 # Extract filename and path
1133 source_path = result["metadata"].get("source", "Unknown")
1134 filename = result["metadata"].get(
1135 "filename", Path(source_path).name
1136 )
1138 # Create preview snippet (first ~200 chars of content)
1139 snippet = (
1140 result["content"][:200] + "..."
1141 if len(result["content"]) > 200
1142 else result["content"]
1143 )
1145 # Determine which collection this document belongs to
1146 collection_name = "Unknown"
1147 folder_path = result["folder"]
1148 for name, collection in self.collections.items():
1149 if any(
1150 folder_path.is_relative_to(path)
1151 for path in collection.get("paths", [])
1152 ):
1153 collection_name = name
1154 break
1155 # Format the preview
1156 preview = {
1157 "id": result_id,
1158 "title": filename,
1159 "snippet": snippet,
1160 "link": source_path,
1161 "similarity": result["similarity"],
1162 "folder": folder_path.as_posix(),
1163 "collection": collection_name,
1164 "collection_description": self.collections.get(
1165 collection_name, {}
1166 ).get("description", ""),
1167 "_full_content": result[
1168 "content"
1169 ], # Store full content for later
1170 "_metadata": result["metadata"], # Store metadata for later
1171 }
1173 previews.append(preview)
1175 logger.info(f"Found {len(previews)} local document matches")
1176 return previews
1178 def _get_full_content(
1179 self, relevant_items: List[Dict[str, Any]]
1180 ) -> List[Dict[str, Any]]:
1181 """
1182 Get full content for the relevant documents.
1183 For local search, the full content is already available.
1185 Args:
1186 relevant_items: List of relevant preview dictionaries
1188 Returns:
1189 List of result dictionaries with full content
1190 """
1191 # Check if we should add full content
1192 if (
1193 hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
1194 and search_config.SEARCH_SNIPPETS_ONLY
1195 ):
1196 logger.info("Snippet-only mode, skipping full content addition")
1197 return relevant_items
1199 # For local search, we already have the full content
1200 results = []
1201 for item in relevant_items:
1202 # Create a copy with full content
1203 result = item.copy()
1205 # Add full content if we have it
1206 if "_full_content" in item:
1207 result["content"] = item["_full_content"]
1208 result["full_content"] = item["_full_content"]
1210 # Remove temporary fields
1211 if "_full_content" in result:
1212 del result["_full_content"]
1214 # Add metadata if we have it
1215 if "_metadata" in item:
1216 result["document_metadata"] = item["_metadata"]
1218 # Remove temporary fields
1219 if "_metadata" in result:
1220 del result["_metadata"]
1222 results.append(result)
1224 return results
1226 def run(
1227 self,
1228 query: str,
1229 research_context: Dict[str, Any] | None = None,
1230 collection_names: Optional[List[str]] = None,
1231 ) -> List[Dict[str, Any]]:
1232 """
1233 Execute a search using the two-phase approach.
1235 Args:
1236 query: The search query
1237 research_context: Context from previous research to use.
1238 collection_names: Specific collections to search within (if None, search all)
1240 Returns:
1241 List of search result dictionaries with full content
1242 """
1243 logger.info("---Execute a search using Local Documents---")
1245 # Check if we have any special collection parameters in the query
1246 collection_prefix = "collection:"
1247 remaining_query = query
1248 specified_collections = []
1250 # Parse query for collection specifications like "collection:research_papers query terms"
1251 query_parts = query.split()
1252 for part in query_parts:
1253 if part.lower().startswith(collection_prefix):
1254 collection_name = part[len(collection_prefix) :].strip()
1255 if collection_name in self.collections:
1256 specified_collections.append(collection_name)
1257 # Remove this part from the query
1258 remaining_query = remaining_query.replace(
1259 part, "", 1
1260 ).strip()
1262 # If collections were specified in the query, they override the parameter
1263 if specified_collections:
1264 collection_names = specified_collections
1265 query = remaining_query
1267 # Phase 1: Get previews (with collection filtering)
1268 previews = self._get_previews(query, collection_names)
1270 if not previews:
1271 return []
1273 # Phase 2: Filter for relevance
1274 relevant_items = self._filter_for_relevance(previews, query)
1276 if not relevant_items:
1277 return []
1279 # Phase 3: Get full content for relevant items
1280 if (
1281 hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
1282 and search_config.SEARCH_SNIPPETS_ONLY
1283 ):
1284 logger.info("Returning snippet-only results as per config")
1285 results = relevant_items
1286 else:
1287 results = self._get_full_content(relevant_items)
1289 # Clean up temporary data
1290 self.embedding_manager.clear_cache()
1292 return results
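An illustrative invocation of the two-phase search, including the in-query collection selector parsed above; the engine arguments, collection name, and query are hypothetical:

    engine = LocalSearchEngine(
        paths=["/data/notes"],
        collections={
            "notes": {"paths": ["/data/notes"], "description": "Personal notes"}
        },
    )
    # "collection:notes" is stripped from the query and restricts the search scope.
    for r in engine.run("collection:notes embedding cache layout"):
        print(r["title"], r["similarity"])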
1294 def get_collections_info(self) -> List[Dict[str, Any]]:
1295 """
1296 Get information about all collections, including indexing status.
1298 Returns:
1299 List of collection information dictionaries
1300 """
1301 collections_info = []
1303 for name, collection in self.collections.items():
1304 paths = collection.get("paths", [])
1305 paths = [Path(p) for p in paths]
1306 description = collection.get("description", "")
1308 # Get indexing information for each path
1309 paths_info = []
1310 for path in paths:
1311 # Check if folder exists
1312 exists = path.exists() and path.is_dir()
1314 # Check if folder is indexed
1315 folder_hash = self.embedding_manager.get_folder_hash(path)
1316 indexed = folder_hash in self.embedding_manager.indexed_folders
1318 # Get index details if available
1319 index_info = {}
1320 if indexed:
1321 index_info = self.embedding_manager.indexed_folders[
1322 folder_hash
1323 ].copy()
1325 paths_info.append(
1326 {
1327 "path": path,
1328 "exists": exists,
1329 "indexed": indexed,
1330 "index_info": index_info,
1331 }
1332 )
1334 collections_info.append(
1335 {
1336 "name": name,
1337 "description": description,
1338 "paths": paths,
1339 "paths_info": paths_info,
1340 "document_count": sum(
1341 info.get("index_info", {}).get("file_count", 0)
1342 for info in paths_info
1343 ),
1344 "chunk_count": sum(
1345 info.get("index_info", {}).get("chunk_count", 0)
1346 for info in paths_info
1347 ),
1348 "all_indexed": all(
1349 info["indexed"] for info in paths_info if info["exists"]
1350 ),
1351 }
1352 )
1354 return collections_info
1356 def reindex_collection(self, collection_name: str) -> bool:
1357 """
1358 Reindex a specific collection.
1360 Args:
1361 collection_name: Name of the collection to reindex
1363 Returns:
1364 True if reindexing was successful, False otherwise
1365 """
1366 if collection_name not in self.collections:
1367 logger.error(f"Collection '{collection_name}' not found")
1368 return False
1370 paths = self.collections[collection_name].get("paths", [])
1371 success = True
1373 for path in paths:
1374 if not self.embedding_manager.index_folder(
1375 path, force_reindex=True
1376 ):
1377 success = False
1379 return success
1381 @classmethod
1382 def from_config(
1383 cls, config_dict: Dict[str, Any], llm: Optional[BaseLLM] = None
1384 ) -> "LocalSearchEngine":
1385 """
1386 Create a LocalSearchEngine instance from a configuration dictionary.
1388 Args:
1389 config_dict: Configuration dictionary
1390 llm: Language model for relevance filtering
1392 Returns:
1393 Initialized LocalSearchEngine instance
1394 """
1395 # Required parameters
1396 folder_paths = []
1397 collections = config_dict.get("collections", {})
1399 # Extract all folder paths from collections
1400 for collection_config in collections.values(): 1400 ↛ 1401 (didn't jump to line 1401 because the loop never started)
1401 if "paths" in collection_config:
1402 folder_paths.extend(collection_config["paths"])
1404 # Fall back to folder_paths if no collections defined
1405 if not folder_paths: 1405 ↛ 1417 (didn't jump to line 1417 because the condition was always true)
1406 folder_paths = config_dict.get("folder_paths", [])
1407 # Create a default collection if using folder_paths
1408 if folder_paths: 1408 ↛ 1417 (didn't jump to line 1417 because the condition was always true)
1409 collections = {
1410 "default": {
1411 "paths": folder_paths,
1412 "description": "Default collection",
1413 }
1414 }
1416 # Optional parameters with defaults
1417 max_results = config_dict.get("max_results", 10)
1418 max_filtered_results = config_dict.get("max_filtered_results")
1419 embedding_model = config_dict.get("embedding_model", "all-MiniLM-L6-v2")
1420 embedding_device = config_dict.get("embedding_device", "cpu")
1421 embedding_model_type = config_dict.get(
1422 "embedding_model_type", "sentence_transformers"
1423 )
1424 ollama_base_url = config_dict.get("ollama_base_url")
1425 force_reindex = config_dict.get("force_reindex", False)
1426 chunk_size = config_dict.get("chunk_size", 1000)
1427 chunk_overlap = config_dict.get("chunk_overlap", 200)
1428 cache_dir = config_dict.get(
1429 "cache_dir"
1430 ) # None uses app's cache directory
1432 return cls(
1433 paths=folder_paths,
1434 collections=collections,
1435 llm=llm,
1436 max_results=max_results,
1437 max_filtered_results=max_filtered_results,
1438 embedding_model=embedding_model,
1439 embedding_device=embedding_device,
1440 embedding_model_type=embedding_model_type,
1441 ollama_base_url=ollama_base_url,
1442 force_reindex=force_reindex,
1443 chunk_size=chunk_size,
1444 chunk_overlap=chunk_overlap,
1445 cache_dir=cache_dir,
1446 )
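Finally, an illustrative configuration dictionary for from_config; the paths, collection name, and values are hypothetical:

    config = {
        "collections": {
            "research_papers": {
                "paths": ["/data/papers"],
                "description": "Downloaded research papers",
            }
        },
        "max_results": 10,
        "embedding_model": "all-MiniLM-L6-v2",
        "embedding_model_type": "sentence_transformers",
        "chunk_size": 1000,
        "chunk_overlap": 200,
    }
    engine = LocalSearchEngine.from_config(config)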