Coverage for src / local_deep_research / web_search_engines / engines / search_engine_collection.py: 14%

91 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Collection-specific RAG Search Engine 

3 

4Provides semantic search within a specific document collection using RAG. 

5""" 

6 

7from typing import List, Dict, Any, Optional 

8from loguru import logger 

9 

10from .search_engine_library import LibraryRAGSearchEngine 

11from ...research_library.services.library_rag_service import LibraryRAGService 

12from ...database.models.library import RAGIndex, Document 

13from ...research_library.services.pdf_storage_manager import PDFStorageManager 

14from ...database.session_context import get_user_db_session 

15from ...config.thread_settings import get_setting_from_snapshot 

16from ...config.paths import get_library_directory 

17 

18 

class CollectionSearchEngine(LibraryRAGSearchEngine):
    """
    Search engine for a specific document collection using RAG.

    Directly searches only the specified collection's FAISS index.
    Each collection uses its own embedding model that was used during
    indexing; the model, provider and chunking parameters are read from the
    collection's current ``RAGIndex`` row.
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        collection_id: str,
        collection_name: str,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the collection-specific search engine.

        Args:
            collection_id: UUID of the collection to search within
            collection_name: Name of the collection for display
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after
                filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters, forwarded to the
                parent constructor
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.collection_id = collection_id
        self.collection_name = collection_name
        # Value matched against RAGIndex.collection_name when looking up
        # this collection's index.
        self.collection_key = f"collection_{collection_id}"

        # Load collection-specific embedding settings
        self._load_collection_embedding_settings()

    def _current_rag_index(self, db_session):
        """
        Return the current ``RAGIndex`` row for this collection, or ``None``.

        Args:
            db_session: Open database session used to run the query.
        """
        return (
            db_session.query(RAGIndex)
            .filter_by(
                collection_name=self.collection_key,
                is_current=True,
            )
            .first()
        )

    def _load_collection_embedding_settings(self) -> None:
        """
        Load embedding settings from the collection's RAG index.

        Uses the same embedding model that was used during indexing. When no
        username is set, no index exists, or the lookup fails, the instance
        attributes are left unchanged and the problem is only logged.
        """
        if not self.username:
            logger.warning("Cannot load collection settings without username")
            return

        try:
            with get_user_db_session(self.username) as db_session:
                rag_index = self._current_rag_index(db_session)

                if not rag_index:
                    logger.warning(
                        f"No RAG index found for collection {self.collection_id}"
                    )
                    return

                # Use embedding settings from the RAG index; fall back to
                # the existing chunking values when the index stores none.
                self.embedding_model = rag_index.embedding_model
                self.embedding_provider = rag_index.embedding_model_type.value
                self.chunk_size = rag_index.chunk_size or self.chunk_size
                self.chunk_overlap = (
                    rag_index.chunk_overlap or self.chunk_overlap
                )

                logger.info(
                    f"Collection '{self.collection_name}' using embedding: "
                    f"{self.embedding_provider}/{self.embedding_model}"
                )

        except Exception:
            # Best effort: a failed lookup must not prevent construction.
            logger.exception(
                f"Error loading collection {self.collection_id} settings"
            )

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search within the specific collection using semantic search.

        Directly searches only this collection's FAISS index instead of
        searching all collections and filtering.

        Args:
            query: Search query
            limit: Maximum number of results to return (passed as ``k`` to
                the vector store)
            llm_callback: Optional LLM callback for processing results
                (accepted for interface compatibility; not used here)
            extra_params: Additional search parameters (accepted for
                interface compatibility; not used here)

        Returns:
            List of result dicts with the keys ``title``, ``snippet``,
            ``url``, ``link``, ``source``, ``relevance_score`` and
            ``metadata``. An empty list is returned when there is no
            username, no RAG index, no indexed documents, no match, or when
            any error occurs (errors are logged, never raised).
        """
        if not self.username:
            logger.error("Cannot search collection without username")
            return []

        try:
            # Get RAG index info for this collection. Plain values are
            # copied out so nothing below depends on the ORM row.
            with get_user_db_session(self.username) as db_session:
                rag_index = self._current_rag_index(db_session)

                if not rag_index:
                    logger.info(
                        f"No RAG index for collection '{self.collection_name}'"
                    )
                    return []

                # Get embedding settings from RAG index
                embedding_model = rag_index.embedding_model
                embedding_provider = rag_index.embedding_model_type.value
                chunk_size = rag_index.chunk_size or self.chunk_size
                chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

            # Create RAG service with collection's embedding settings
            rag_service = LibraryRAGService(
                username=self.username,
                embedding_model=embedding_model,
                embedding_provider=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )

            # Check if there are indexed documents
            stats = rag_service.get_rag_stats(self.collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.info(
                    f"No documents indexed in collection '{self.collection_name}'"
                )
                return []

            # Load and search the FAISS index for this collection
            vector_store = rag_service.load_or_create_faiss_index(
                self.collection_id
            )

            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

            if not docs_with_scores:
                logger.info(
                    f"No results found in collection '{self.collection_name}'"
                )
                return []

            # Several hits are often chunks of the same document; resolve
            # each document's URL only once per search, since every lookup
            # opens a database session.
            url_cache: Dict[str, str] = {}

            # Convert to search result format
            results = []
            for doc, score in docs_with_scores:
                # Copy so the collection fields added below are not written
                # into the Document object returned by the vector store
                # (which may be shared with the loaded index).
                metadata = dict(doc.metadata or {})

                # Get document ID
                doc_id = metadata.get("source_id") or metadata.get(
                    "document_id"
                )

                # Get title
                title = (
                    metadata.get("document_title")
                    or metadata.get("title")
                    or (f"Document {doc_id}" if doc_id else "Untitled")
                )

                # Create snippet from content (first 500 characters)
                content = doc.page_content
                snippet = content[:500] + "..." if len(content) > 500 else content

                # Generate document URL
                if doc_id:
                    cache_key = str(doc_id)
                    if cache_key not in url_cache:
                        url_cache[cache_key] = self._get_document_url(doc_id)
                    document_url = url_cache[cache_key]
                else:
                    document_url = self._get_document_url(doc_id)

                # Add collection info to metadata
                metadata["collection_id"] = self.collection_id
                metadata["collection_name"] = self.collection_name

                results.append(
                    {
                        "title": title,
                        "snippet": snippet,
                        "url": document_url,
                        "link": document_url,
                        "source": "library",
                        # NOTE(review): assumes ``score`` is a distance
                        # (smaller = closer), so this maps it into (0, 1]
                        # with larger = more relevant - confirm.
                        "relevance_score": float(1 / (1 + score)),
                        "metadata": metadata,
                    }
                )

            logger.info(
                f"Collection '{self.collection_name}' search returned "
                f"{len(results)} results for query: {query[:50]}..."
            )

            return results

        except Exception:
            logger.exception(
                f"Error searching collection '{self.collection_name}'"
            )
            return []

    def _get_document_url(self, doc_id: Optional[str]) -> str:
        """
        Get the URL for viewing a document.

        Args:
            doc_id: ID of the document, or a falsy value when unknown.

        Returns:
            ``"#"`` when ``doc_id`` is falsy; ``/library/document/<id>/pdf``
            when the document exists and the PDF storage manager reports a
            PDF for it; otherwise ``/library/document/<id>``. Lookup errors
            are logged and the non-PDF URL is returned.
        """
        if not doc_id:
            return "#"

        # Default to root document page (shows all options: PDF, Text,
        # Chunks, etc.)
        document_url = f"/library/document/{doc_id}"

        try:
            with get_user_db_session(self.username) as session:
                document = session.query(Document).filter_by(id=doc_id).first()
                if document:
                    from pathlib import Path

                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    library_root = Path(library_root).expanduser()
                    pdf_manager = PDFStorageManager(library_root, "auto")
                    if pdf_manager.has_pdf(document, session):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as e:
            # Best effort: fall back to the generic document page.
            logger.warning(f"Error getting document URL for {doc_id}: {e}")

        return document_url