Coverage for src/local_deep_research/web_search_engines/engines/local_embedding_manager.py: 98%

109 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1import hashlib 

2import threading 

3import uuid 

4from datetime import UTC, datetime 

5from typing import Any, Dict, List, Optional 

6 

7from langchain_community.embeddings import ( 

8 HuggingFaceEmbeddings, 

9) 

10from langchain_core.documents import Document 

11from loguru import logger 

12 

13from ...database.models.library import DocumentChunk 

14from ...database.session_context import get_user_db_session 

15from ...utilities.url_utils import normalize_url 

16 

17 

class LocalEmbeddingManager:
    """Handles embedding generation and storage for local document search.

    The embedding model is created lazily on first access of the
    ``embeddings`` property. Document chunks are persisted to the per-user
    database as ``DocumentChunk`` rows. Instances can be used as context
    managers so that ``close()`` runs on exit.
    """

    def __init__(
        self,
        embedding_model: str = "all-MiniLM-L6-v2",
        embedding_device: str = "cpu",
        embedding_model_type: str = "sentence_transformers",  # or 'ollama'
        ollama_base_url: Optional[str] = None,
        settings_snapshot: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the embedding manager for local document search.

        No model is loaded here; see the ``embeddings`` property.

        Args:
            embedding_model: Name of the embedding model to use
            embedding_device: Device to run embeddings on ('cpu' or 'cuda')
            embedding_model_type: Type of embedding model ('sentence_transformers' or 'ollama')
            ollama_base_url: Base URL for Ollama API if using ollama embeddings
            settings_snapshot: Optional settings snapshot for background threads
        """
        self.embedding_model = embedding_model
        self.embedding_device = embedding_device
        self.embedding_model_type = embedding_model_type
        self.ollama_base_url = ollama_base_url
        self.settings_snapshot = settings_snapshot or {}

        # Username for database access, read from the snapshot's "_username"
        # key. When it is missing, the DB store/delete helpers below log a
        # warning and do nothing.
        self.username = self.settings_snapshot.get("_username")
        # Password for encrypted database access (can be set later)
        self.db_password = None

        # Embedding model, created lazily by the ``embeddings`` property.
        # The lock guards both its creation and its release in close().
        self._embeddings = None
        self._embedding_lock = threading.Lock()

        # Vector store cache (cleared by close())
        self.vector_stores: dict[str, Any] = {}

        # Set to True by the first close() call
        self._closed = False

    def close(self) -> None:
        """Release embedding model resources and clear the vector store cache.

        For Ollama embeddings, this also closes the underlying per-instance
        ``httpx.Client`` / ``httpx.AsyncClient`` pair. langchain_ollama's
        ``OllamaEmbeddings`` eagerly constructs both clients in its Pydantic
        ``@model_validator(mode="after")``, so dropping the Python reference
        alone leaks ~2 FDs per instance — see the migration regression note
        in docs/developing/resource-cleanup.md. Non-Ollama providers
        (sentence_transformers, OpenAI's lru_cache'd shared client) are
        no-ops via the module-prefix check inside ``_close_base_llm``.

        Safe to call repeatedly. Idempotence is keyed on the model reference
        rather than on the ``_closed`` flag. A model that was lazily
        re-created by touching ``embeddings`` after an earlier close() is
        therefore released by the next close() instead of leaking. The
        reference is detached while holding ``_embedding_lock``, so a
        concurrent lazy initialisation cannot install a model that this call
        then misses.
        """
        with self._embedding_lock:
            embeddings, self._embeddings = self._embeddings, None
            first_close = not self._closed
            self._closed = True

        try:
            if embeddings is not None:
                from ...utilities.llm_utils import _close_base_llm

                _close_base_llm(embeddings)
        finally:
            # Clear the vector store cache even if releasing the model raised.
            self.vector_stores.clear()

        if first_close:
            logger.debug("LocalEmbeddingManager closed")

    def __enter__(self):
        """Context manager entry; returns this manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: calls close(). Exceptions are not suppressed."""
        self.close()
        return False

    @property
    def embeddings(self):
        """
        Lazily initialize embeddings when first accessed.
        This allows the LocalEmbeddingManager to be created without
        immediately loading models, which is helpful when no local search is performed.

        Uses double-checked locking to ensure thread-safe initialization.
        Concurrent SentenceTransformer model loading causes meta tensor errors
        in PyTorch when multiple threads call model.to(device) simultaneously.
        """
        if self._embeddings is None:
            with self._embedding_lock:
                # Re-check under the lock: another thread may have finished
                # initialisation while this one was waiting.
                if self._embeddings is None:
                    logger.info("Initializing embeddings on first use")
                    self._embeddings = self._initialize_embeddings()
        return self._embeddings

    def _initialize_embeddings(self):
        """Create the embedding model described by this manager's configuration.

        Delegates to the project's ``get_embeddings`` factory. Any exception,
        including a failure to import the factory, is logged. In that case the
        method falls back to a ``HuggingFaceEmbeddings`` instance for
        ``sentence-transformers/all-MiniLM-L6-v2``; the configured device is
        not passed to the fallback.
        """
        try:
            # Use the new unified embedding system
            from ...embeddings import get_embeddings

            # Provider-specific keyword arguments
            kwargs = {}

            # Device applies only to sentence transformers
            if self.embedding_model_type == "sentence_transformers":
                kwargs["device"] = self.embedding_device

            # Base URL applies only to ollama, and only when one was given
            if self.embedding_model_type == "ollama" and self.ollama_base_url:
                kwargs["base_url"] = normalize_url(self.ollama_base_url)

            logger.info(
                f"Initializing embeddings with provider={self.embedding_model_type}, model={self.embedding_model}"
            )

            return get_embeddings(
                provider=self.embedding_model_type,
                model=self.embedding_model,
                settings_snapshot=self.settings_snapshot,
                **kwargs,
            )
        except Exception:
            logger.exception("Error initializing embeddings")
            logger.warning(
                "Falling back to HuggingFaceEmbeddings with all-MiniLM-L6-v2"
            )
            return HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )

    def _store_chunks_to_db(
        self,
        chunks: List[Document],
        collection_name: str,
        source_path: Optional[str] = None,
        source_id: Optional[int] = None,
        source_type: str = "local_file",
    ) -> List[str]:
        """
        Store document chunks in the database.

        Each chunk is identified by the SHA-256 of its text. If a row with
        that hash already exists, its ``last_accessed`` timestamp is refreshed
        and its existing embedding ID is reused. Otherwise a new row is added
        with a fresh UUID4 hex embedding ID. All changes are committed once,
        at the end.

        Args:
            chunks: List of LangChain Document chunks
            collection_name: Name of the collection (e.g., 'personal_notes', 'library')
            source_path: Path to source file (for local files)
            source_id: ID of source document (for library documents)
            source_type: Type of source ('local_file' or 'library')

        Returns:
            List of chunk embedding IDs (UUIDs) for FAISS mapping, in the same
            order as ``chunks``. Returns an empty list when no username is
            configured or when any database error occurs. Errors are logged,
            not raised.
        """
        if not self.username:
            logger.warning(
                "No username available, cannot store chunks in database"
            )
            return []

        # Loop-invariant: every chunk from this call shares the same source.
        stored_source_path = str(source_path) if source_path else None
        chunk_ids: List[str] = []

        try:
            with get_user_db_session(
                self.username, self.db_password
            ) as session:
                for idx, chunk in enumerate(chunks):
                    chunk_text = chunk.page_content
                    # Content hash used to deduplicate identical chunk text
                    chunk_hash = hashlib.sha256(chunk_text.encode()).hexdigest()

                    # NOTE(review): the lookup is by hash only. It is not
                    # scoped to collection_name or source, so identical text
                    # stored elsewhere is reused and no row is written for
                    # this collection. Confirm this is intended (chunk_hash
                    # may be a unique column).
                    existing_chunk = (
                        session.query(DocumentChunk)
                        .filter_by(chunk_hash=chunk_hash)
                        .first()
                    )

                    if existing_chunk:
                        existing_chunk.last_accessed = datetime.now(UTC)
                        chunk_ids.append(existing_chunk.embedding_id)
                        logger.debug(
                            f"Chunk already exists, reusing: {existing_chunk.embedding_id}"
                        )
                        continue

                    # New chunk: collect metadata and create the row
                    metadata = chunk.metadata or {}
                    embedding_id = uuid.uuid4().hex
                    db_chunk = DocumentChunk(
                        chunk_hash=chunk_hash,
                        source_type=source_type,
                        source_id=source_id,
                        source_path=stored_source_path,
                        collection_name=collection_name,
                        chunk_text=chunk_text,
                        chunk_index=idx,
                        # Character span from metadata, else the whole text
                        start_char=metadata.get("start_char", 0),
                        end_char=metadata.get("end_char", len(chunk_text)),
                        word_count=len(chunk_text.split()),
                        embedding_id=embedding_id,
                        embedding_model=self.embedding_model,
                        embedding_model_type=self.embedding_model_type,
                        document_title=metadata.get(
                            "filename", metadata.get("title", "Unknown")
                        ),
                        document_metadata=metadata,
                    )
                    session.add(db_chunk)
                    chunk_ids.append(embedding_id)

                session.commit()
                logger.info(
                    f"Stored {len(chunk_ids)} chunks to database for collection '{collection_name}'"
                )

        except Exception:
            logger.exception(
                f"Error storing chunks to database for collection '{collection_name}'"
            )
            return []

        return chunk_ids

    def _delete_chunks_from_db(
        self,
        collection_name: str,
        source_path: Optional[str] = None,
        source_id: Optional[int] = None,
    ) -> int:
        """
        Delete chunks from database.

        The deletion is always restricted to ``collection_name``. It is
        further narrowed by ``source_path`` (when non-empty) and/or
        ``source_id`` (when not None). If neither filter is given, EVERY
        chunk in the collection is deleted.

        Args:
            collection_name: Name of the collection
            source_path: Path to source file (for local files)
            source_id: ID of source document (for library documents)

        Returns:
            Number of chunks deleted. Returns 0 when no username is configured
            or when a database error occurs. Errors are logged, not raised.
        """
        if not self.username:
            logger.warning(
                "No username available, cannot delete chunks from database"
            )
            return 0

        try:
            with get_user_db_session(
                self.username, self.db_password
            ) as session:
                query = session.query(DocumentChunk).filter_by(
                    collection_name=collection_name
                )

                if source_path:
                    query = query.filter_by(source_path=str(source_path))
                # Compare with None so that a valid id of 0 still narrows the
                # delete instead of silently widening it to the whole collection.
                if source_id is not None:
                    query = query.filter_by(source_id=source_id)

                count = int(query.delete())
                session.commit()

                logger.info(
                    f"Deleted {count} chunks from database for collection '{collection_name}'"
                )
                return count

        except Exception:
            logger.exception(
                f"Error deleting chunks from database for collection '{collection_name}'"
            )
            return 0