Coverage for src/local_deep_research/web_search_engines/engines/local_embedding_manager.py: 98%
109 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 23:15 +0000
1import hashlib
2import threading
3import uuid
4from datetime import UTC, datetime
5from typing import Any, Dict, List, Optional
7from langchain_community.embeddings import (
8 HuggingFaceEmbeddings,
9)
10from langchain_core.documents import Document
11from loguru import logger
13from ...database.models.library import DocumentChunk
14from ...database.session_context import get_user_db_session
15from ...utilities.url_utils import normalize_url
class LocalEmbeddingManager:
    """Handles embedding generation and storage for local document search.

    The embedding model is created lazily on first access to
    :attr:`embeddings`. Document chunks are persisted to the per-user
    database opened via ``get_user_db_session``. Instances can be used as
    context managers so that :meth:`close` runs on exit.
    """

    def __init__(
        self,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_device: str = "cpu",
        embedding_model_type: str = "sentence_transformers",  # or 'ollama'
        ollama_base_url: Optional[str] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the embedding manager for local document search.

        No model is loaded here; see the ``embeddings`` property.

        Args:
            embedding_model: Name of the embedding model to use
            embedding_device: Device to run embeddings on ('cpu' or 'cuda')
            embedding_model_type: Type of embedding model ('sentence_transformers' or 'ollama')
            ollama_base_url: Base URL for Ollama API if using ollama embeddings
            settings_snapshot: Optional settings snapshot for background threads.
                If it contains a ``"_username"`` key, that value is used as
                the database username.
        """
        self.embedding_model = embedding_model
        self.embedding_device = embedding_device
        self.embedding_model_type = embedding_model_type
        self.ollama_base_url = ollama_base_url
        self.settings_snapshot = settings_snapshot or {}

        # Username for database access (extracted from settings if available)
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )
        # Password for encrypted database access (can be set later)
        self.db_password = None

        # Embedding model, created lazily by the ``embeddings`` property.
        # The lock serializes initialization and is also taken by close()
        # when it detaches the model.
        self._embeddings = None
        self._embedding_lock = threading.Lock()

        # Vector store cache
        self.vector_stores: dict[str, Any] = {}

        # Track if this manager has been closed
        self._closed = False

    def close(self):
        """Release embedding model resources.

        For Ollama embeddings, this also closes the underlying per-instance
        ``httpx.Client`` / ``httpx.AsyncClient`` pair. langchain_ollama's
        ``OllamaEmbeddings`` eagerly constructs both clients in its Pydantic
        ``@model_validator(mode="after")``, so dropping the Python reference
        alone leaks ~2 FDs per instance — see the migration regression note
        in docs/developing/resource-cleanup.md. Non-Ollama providers
        (sentence_transformers, OpenAI's lru_cache'd shared client) are
        no-ops via the module-prefix check inside ``_close_base_llm``.

        Safe to call more than once. Accessing ``embeddings`` after a close
        lazily re-creates the model; a later ``close()`` releases that new
        instance as well instead of returning early and leaking it.
        """
        # Detach the model under the same lock the lazy initializer holds,
        # so a close() racing a first-use initialization cannot miss the
        # freshly created model.
        with self._embedding_lock:
            embeddings = self._embeddings
            self._embeddings = None
            was_closed = self._closed
            self._closed = True

        if was_closed and embeddings is None:
            # Already closed and nothing was re-created since: no-op.
            return

        try:
            if embeddings is not None:
                from ...utilities.llm_utils import _close_base_llm

                _close_base_llm(embeddings)
        finally:
            # Clear vector store cache even if releasing the model failed.
            self.vector_stores.clear()
        logger.debug("LocalEmbeddingManager closed")

    def __enter__(self):
        """Context manager entry; returns this manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures resources are released.

        Returns False so any exception raised in the ``with`` body propagates.
        """
        self.close()
        return False

    @property
    def embeddings(self):
        """
        Lazily initialize embeddings when first accessed.
        This allows the LocalEmbeddingManager to be created without
        immediately loading models, which is helpful when no local search is performed.

        Uses double-checked locking to ensure thread-safe initialization.
        Concurrent SentenceTransformer model loading causes meta tensor errors
        in PyTorch when multiple threads call model.to(device) simultaneously.
        """
        embeddings = self._embeddings
        if embeddings is None:
            with self._embedding_lock:
                if self._embeddings is None:
                    logger.info("Initializing embeddings on first use")
                    self._embeddings = self._initialize_embeddings()
                # Capture under the lock so a concurrent close() clearing the
                # attribute cannot make this call return None.
                embeddings = self._embeddings
        return embeddings

    def _initialize_embeddings(self):
        """Create the embedding model described by this manager's configuration.

        Delegates to the project's unified ``get_embeddings`` factory. If the
        import or the factory call raises, the error is logged and a
        ``HuggingFaceEmbeddings`` instance for
        ``sentence-transformers/all-MiniLM-L6-v2`` is returned instead.

        Returns:
            The object returned by ``get_embeddings`` or the fallback
            ``HuggingFaceEmbeddings`` instance.
        """
        try:
            # Use the new unified embedding system
            from ...embeddings import get_embeddings

            # Provider-specific keyword arguments for get_embeddings()
            kwargs = {}
            if self.embedding_model_type == "sentence_transformers":
                kwargs["device"] = self.embedding_device
            if self.embedding_model_type == "ollama" and self.ollama_base_url:
                kwargs["base_url"] = normalize_url(self.ollama_base_url)

            logger.info(
                f"Initializing embeddings with provider={self.embedding_model_type}, model={self.embedding_model}"
            )

            return get_embeddings(
                provider=self.embedding_model_type,
                model=self.embedding_model,
                settings_snapshot=self.settings_snapshot,
                **kwargs,
            )
        except Exception:
            logger.exception("Error initializing embeddings")
            logger.warning(
                "Falling back to HuggingFaceEmbeddings with all-MiniLM-L6-v2"
            )
            # NOTE(review): the fallback ignores ``embedding_device`` and the
            # configured model; confirm that is intended.
            return HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )

    def _store_chunks_to_db(
        self,
        chunks: List[Document],
        collection_name: str,
        source_path: Optional[str] = None,
        source_id: Optional[int] = None,
        source_type: str = "local_file",
    ) -> List[str]:
        """
        Store document chunks in the database.

        Each chunk is identified by the SHA-256 of its text. If a row with
        the same hash already exists, its ``last_accessed`` is refreshed and
        its ``embedding_id`` is reused instead of inserting a new row.

        Args:
            chunks: List of LangChain Document chunks
            collection_name: Name of the collection (e.g., 'personal_notes', 'library')
            source_path: Path to source file (for local files)
            source_id: ID of source document (for library documents)
            source_type: Type of source ('local_file' or 'library')

        Returns:
            List of chunk embedding IDs (UUIDs) for FAISS mapping, in the same
            order as ``chunks``. Returns an empty list when no username is
            available or when any database error occurs.
        """
        if not self.username:
            logger.warning(
                "No username available, cannot store chunks in database"
            )
            return []

        # Loop-invariant: normalized source path stored on every new row.
        stored_source_path = str(source_path) if source_path else None
        chunk_ids: List[str] = []

        try:
            with get_user_db_session(
                self.username, self.db_password
            ) as session:
                for idx, chunk in enumerate(chunks):
                    chunk_text = chunk.page_content
                    chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()

                    # NOTE(review): the lookup matches on chunk_hash only, not
                    # collection_name, so identical text already stored for
                    # another collection is reused rather than duplicated.
                    # Verify against the DocumentChunk uniqueness constraints.
                    existing_chunk = (
                        session.query(DocumentChunk)
                        .filter_by(chunk_hash=chunk_hash)
                        .first()
                    )

                    if existing_chunk:
                        # Reuse the stored chunk and refresh its access time.
                        existing_chunk.last_accessed = datetime.now(UTC)
                        chunk_ids.append(existing_chunk.embedding_id)
                        logger.debug(
                            f"Chunk already exists, reusing: {existing_chunk.embedding_id}"
                        )
                        continue

                    metadata = chunk.metadata or {}
                    embedding_id = uuid.uuid4().hex
                    db_chunk = DocumentChunk(
                        chunk_hash=chunk_hash,
                        source_type=source_type,
                        source_id=source_id,
                        source_path=stored_source_path,
                        collection_name=collection_name,
                        chunk_text=chunk_text,
                        chunk_index=idx,
                        # Character offsets come from the splitter's metadata
                        # when present; otherwise assume the whole text.
                        start_char=metadata.get("start_char", 0),
                        end_char=metadata.get("end_char", len(chunk_text)),
                        word_count=len(chunk_text.split()),
                        embedding_id=embedding_id,
                        embedding_model=self.embedding_model,
                        embedding_model_type=self.embedding_model_type,
                        document_title=metadata.get(
                            "filename", metadata.get("title", "Unknown")
                        ),
                        document_metadata=metadata,
                    )
                    session.add(db_chunk)
                    chunk_ids.append(embedding_id)

                session.commit()
                logger.info(
                    f"Stored {len(chunk_ids)} chunks to database for collection '{collection_name}'"
                )

        except Exception:
            logger.exception(
                f"Error storing chunks to database for collection '{collection_name}'"
            )
            return []

        return chunk_ids

    def _delete_chunks_from_db(
        self,
        collection_name: str,
        source_path: Optional[str] = None,
        source_id: Optional[int] = None,
    ) -> int:
        """
        Delete chunks from database.

        Deletion is always scoped to ``collection_name`` and further narrowed
        by ``source_path`` and/or ``source_id`` when given. If neither is
        given, every chunk in the collection is deleted.

        Args:
            collection_name: Name of the collection
            source_path: Path to source file (for local files)
            source_id: ID of source document (for library documents)

        Returns:
            Number of chunks deleted; 0 when no username is available or a
            database error occurs.
        """
        if not self.username:
            logger.warning(
                "No username available, cannot delete chunks from database"
            )
            return 0

        try:
            with get_user_db_session(
                self.username, self.db_password
            ) as session:
                query = session.query(DocumentChunk).filter_by(
                    collection_name=collection_name
                )

                if source_path:
                    query = query.filter_by(source_path=str(source_path))
                # Compare against None: a truthiness test would skip a valid
                # id of 0 and silently widen the delete to the whole collection.
                if source_id is not None:
                    query = query.filter_by(source_id=source_id)

                count = int(query.delete())
                session.commit()

                logger.info(
                    f"Deleted {count} chunks from database for collection '{collection_name}'"
                )
                return count

        except Exception:
            logger.exception(
                f"Error deleting chunks from database for collection '{collection_name}'"
            )
            return 0