Coverage for src / local_deep_research / web_search_engines / engines / search_engine_collection.py: 97%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Collection-specific RAG Search Engine 

3 

4Provides semantic search within a specific document collection using RAG. 

5""" 

6 

7import os 

8from typing import List, Dict, Any, Optional 

9from loguru import logger 

10 

11from .search_engine_library import LibraryRAGSearchEngine 

12from ...constants import SNIPPET_LENGTH_LONG 

13from ...research_library.services.library_rag_service import LibraryRAGService 

14from ...database.models.library import RAGIndex, Document 

15from ...research_library.services.pdf_storage_manager import PDFStorageManager 

16from ...database.session_context import get_user_db_session 

17from ...config.thread_settings import get_setting_from_snapshot 

18from ...config.paths import get_library_directory 

19 

20 

class CollectionSearchEngine(LibraryRAGSearchEngine):
    """
    Search engine for a specific document collection using RAG.

    Directly searches only the specified collection's FAISS index.
    Each collection uses the embedding settings recorded on its current
    ``RAGIndex`` row, i.e. the ones that were used during indexing.
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        collection_id: str,
        collection_name: str,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the collection-specific search engine.

        Args:
            collection_id: UUID of the collection to search within
            collection_name: Name of the collection for display
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.collection_id = collection_id
        self.collection_name = collection_name
        # RAGIndex rows are looked up by this prefixed key, not the raw id.
        self.collection_key = f"collection_{collection_id}"

        # Load collection-specific embedding settings
        self._load_collection_embedding_settings()

    def _query_current_rag_index(self, db_session):
        """Return the current RAGIndex row for this collection, or None.

        The returned ORM object should be read while ``db_session`` is
        still open.
        """
        return (
            db_session.query(RAGIndex)
            .filter_by(
                collection_name=self.collection_key,
                is_current=True,
            )
            .first()
        )

    def _load_collection_embedding_settings(self):
        """
        Load embedding settings from the collection's RAG index.

        Overwrites ``embedding_model``, ``embedding_provider``,
        ``chunk_size`` and ``chunk_overlap`` on this instance with the
        values stored on the current RAG index. Does nothing (beyond
        logging) when there is no username, no index, or the lookup fails.
        """
        # NOTE(review): self.username is presumably set by the parent
        # class __init__ -- it is not assigned anywhere in this class.
        if not self.username:
            logger.warning("Cannot load collection settings without username")
            return

        try:
            with get_user_db_session(self.username) as db_session:
                # Get RAG index for this collection
                rag_index = self._query_current_rag_index(db_session)

                if not rag_index:
                    logger.warning(
                        f"No RAG index found for collection {self.collection_id}"
                    )
                    return

                # Use embedding settings from the RAG index; falsy chunk
                # values fall back to whatever is already on the instance.
                self.embedding_model = rag_index.embedding_model
                self.embedding_provider = rag_index.embedding_model_type.value
                self.chunk_size = rag_index.chunk_size or self.chunk_size
                self.chunk_overlap = (
                    rag_index.chunk_overlap or self.chunk_overlap
                )

                logger.info(
                    f"Collection '{self.collection_name}' using embedding: "
                    f"{self.embedding_provider}/{self.embedding_model}"
                )

        except Exception:
            # Best effort: a failed lookup must not break construction.
            logger.exception(
                f"Error loading collection {self.collection_id} settings"
            )

    def _build_result(self, doc: Any, score: Any) -> Dict[str, Any]:
        """Convert one (document, score) hit into a search result dict."""
        # Copy so the collection annotations below do not leak into the
        # metadata of the document object handed back by the vector store.
        metadata = dict(doc.metadata or {})

        # Get document ID
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Get title
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Create snippet from content, marking truncation with an ellipsis
        content = doc.page_content
        if len(content) > SNIPPET_LENGTH_LONG:
            snippet = content[:SNIPPET_LENGTH_LONG] + "..."
        else:
            snippet = content

        # Generate document URL
        document_url = self._get_document_url(doc_id)

        # Add collection info to metadata
        metadata["collection_id"] = self.collection_id
        metadata["collection_name"] = self.collection_name

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,
            "source": "library",
            "source_type": "library",
            # Smaller score -> value closer to 1.
            # NOTE(review): presumably score is a non-negative distance;
            # confirm against the metric of the FAISS index.
            "relevance_score": float(1 / (1 + score)),
            "metadata": metadata,
        }

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search within the specific collection using semantic search.

        Directly searches only this collection's FAISS index instead of
        searching all collections and filtering.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
                (not used by this implementation)
            extra_params: Additional search parameters
                (not used by this implementation)

        Returns:
            List of search results from this collection. An empty list is
            returned when there is no username, no RAG index, no indexed
            documents, no hits, or when any error occurs.
        """
        if not self.username:
            logger.error("Cannot search collection without username")
            return []

        try:
            # Get RAG index info for this collection
            with get_user_db_session(self.username) as db_session:
                rag_index = self._query_current_rag_index(db_session)

                if not rag_index:
                    logger.info(
                        f"No RAG index for collection '{self.collection_name}'"
                    )
                    return []

                # Get embedding settings from RAG index (read while the
                # session is still open)
                embedding_model = rag_index.embedding_model
                embedding_provider = rag_index.embedding_model_type.value
                chunk_size = rag_index.chunk_size or self.chunk_size
                chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

            # Create RAG service with collection's embedding settings
            with LibraryRAGService(
                username=self.username,
                embedding_model=embedding_model,
                embedding_provider=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            ) as rag_service:
                # Check if there are indexed documents
                stats = rag_service.get_rag_stats(self.collection_id)
                if stats.get("indexed_documents", 0) == 0:
                    logger.info(
                        f"No documents indexed in collection '{self.collection_name}'"
                    )
                    return []

                # Load and search the FAISS index for this collection
                vector_store = rag_service.load_or_create_faiss_index(
                    self.collection_id
                )

                docs_with_scores = vector_store.similarity_search_with_score(
                    query, k=limit
                )

                if not docs_with_scores:
                    logger.info(
                        f"No results found in collection '{self.collection_name}'"
                    )
                    return []

                # Convert to search result format
                results = [
                    self._build_result(doc, score)
                    for doc, score in docs_with_scores
                ]

                logger.info(
                    f"Collection '{self.collection_name}' search returned "
                    f"{len(results)} results for query: {query[:50]}..."
                )

                return results

        except Exception:
            # Search is best effort: log with traceback, report no results.
            logger.exception(
                f"Error searching collection '{self.collection_name}'"
            )
            return []

    def _get_document_url(self, doc_id: Optional[str]) -> str:
        """Get the URL for viewing a document.

        Returns "#" when ``doc_id`` is falsy. Otherwise returns the
        document's root page, or its ``/pdf`` page when
        ``PDFStorageManager.pdf_exists`` reports a PDF for it. Any error
        during the lookup falls back to the root page URL.
        """
        if not doc_id:
            return "#"

        # Default to root document page (shows all options: PDF, Text, Chunks, etc.)
        document_url = f"/library/document/{doc_id}"

        try:
            with get_user_db_session(self.username) as session:
                document = session.query(Document).filter_by(id=doc_id).first()
                if document:
                    from pathlib import Path

                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        default=str(get_library_directory()),
                        settings_snapshot=self.settings_snapshot,
                    )
                    # Expand $VARS and ~ before resolving to an absolute path
                    library_root = (
                        Path(os.path.expandvars(library_root))
                        .expanduser()
                        .resolve()
                    )
                    if PDFStorageManager.pdf_exists(
                        library_root, document, session
                    ):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as exc:
            # Best effort: keep the default URL, but record what went wrong
            # so the failure can be diagnosed from the log.
            logger.warning(
                f"Error getting document URL for {doc_id}: {exc!r}"
            )

        return document_url