Coverage for src / local_deep_research / web_search_engines / engines / search_engine_collection.py: 97%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Collection-specific RAG Search Engine 

3 

4Provides semantic search within a specific document collection using RAG. 

5""" 

6 

7from typing import List, Dict, Any, Optional 

8from loguru import logger 

9 

10from .search_engine_library import LibraryRAGSearchEngine 

11from ...constants import SNIPPET_LENGTH_LONG 

12from ...research_library.services.library_rag_service import LibraryRAGService 

13from ...database.models.library import RAGIndex, Document 

14from ...research_library.services.pdf_storage_manager import PDFStorageManager 

15from ...database.session_context import get_user_db_session 

16from ...config.thread_settings import get_setting_from_snapshot 

17from ...config.paths import get_library_directory 

18 

19 

class CollectionSearchEngine(LibraryRAGSearchEngine):
    """
    Search engine for a specific document collection using RAG.

    Directly searches only the specified collection's FAISS index.
    Each collection uses its own embedding model that was used during indexing.

    The instance relies on ``self.username``, ``self.settings_snapshot``,
    ``self.chunk_size`` and ``self.chunk_overlap``; none of them is assigned
    in this class, so they are presumably provided by
    ``LibraryRAGSearchEngine`` (TODO confirm against the parent class).
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        collection_id: str,
        collection_name: str,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the collection-specific search engine.

        Args:
            collection_id: UUID of the collection to search within
            collection_name: Name of the collection for display
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.collection_id = collection_id
        self.collection_name = collection_name
        # Key under which this collection's RAGIndex rows are looked up
        # (matched against RAGIndex.collection_name below).
        self.collection_key = f"collection_{collection_id}"

        # Load collection-specific embedding settings
        self._load_collection_embedding_settings()

    def _load_collection_embedding_settings(self):
        """
        Load embedding settings from the collection's RAG index.

        Uses the same embedding model that was used during indexing.
        Overwrites ``self.embedding_model``, ``self.embedding_provider``,
        ``self.chunk_size`` and ``self.chunk_overlap`` when a current index
        row exists. This is best-effort: a missing username, a missing index
        row, or any exception is logged and leaves the instance usable.
        """
        if not self.username:
            logger.warning("Cannot load collection settings without username")
            return

        try:
            with get_user_db_session(self.username) as db_session:
                # Get RAG index for this collection
                rag_index = (
                    db_session.query(RAGIndex)
                    .filter_by(
                        collection_name=self.collection_key,
                        is_current=True,
                    )
                    .first()
                )

                if not rag_index:
                    logger.warning(
                        f"No RAG index found for collection {self.collection_id}"
                    )
                    return

                # Use embedding settings from the RAG index
                self.embedding_model = rag_index.embedding_model
                self.embedding_provider = rag_index.embedding_model_type.value
                # Falsy stored values (None or 0) keep the current settings.
                self.chunk_size = rag_index.chunk_size or self.chunk_size
                self.chunk_overlap = (
                    rag_index.chunk_overlap or self.chunk_overlap
                )

                logger.info(
                    f"Collection '{self.collection_name}' using embedding: "
                    f"{self.embedding_provider}/{self.embedding_model}"
                )

        except Exception:
            logger.exception(
                f"Error loading collection {self.collection_id} settings"
            )

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search within the specific collection using semantic search.

        Directly searches only this collection's FAISS index instead of
        searching all collections and filtering.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
                (not referenced by this implementation)
            extra_params: Additional search parameters
                (not referenced by this implementation)

        Returns:
            List of search results from this collection. An empty list is
            returned when there is no username, no current RAG index, no
            indexed documents, no hits, or when any exception occurs (the
            exception is logged, not raised).
        """
        if not self.username:
            logger.error("Cannot search collection without username")
            return []

        try:
            # Get RAG index info for this collection
            with get_user_db_session(self.username) as db_session:
                rag_index = (
                    db_session.query(RAGIndex)
                    .filter_by(
                        collection_name=self.collection_key,
                        is_current=True,
                    )
                    .first()
                )

                if not rag_index:
                    logger.info(
                        f"No RAG index for collection '{self.collection_name}'"
                    )
                    return []

                # Get embedding settings from RAG index. They are copied into
                # plain locals while the session is still open, so the ORM
                # object is not touched again afterwards.
                embedding_model = rag_index.embedding_model
                embedding_provider = rag_index.embedding_model_type.value
                chunk_size = rag_index.chunk_size or self.chunk_size
                chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

            # Create RAG service with collection's embedding settings
            with LibraryRAGService(
                username=self.username,
                embedding_model=embedding_model,
                embedding_provider=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            ) as rag_service:
                # Check if there are indexed documents
                stats = rag_service.get_rag_stats(self.collection_id)
                if stats.get("indexed_documents", 0) == 0:
                    logger.info(
                        f"No documents indexed in collection '{self.collection_name}'"
                    )
                    return []

                # Load and search the FAISS index for this collection
                vector_store = rag_service.load_or_create_faiss_index(
                    self.collection_id
                )

                docs_with_scores = vector_store.similarity_search_with_score(
                    query, k=limit
                )

                if not docs_with_scores:
                    logger.info(
                        f"No results found in collection '{self.collection_name}'"
                    )
                    return []

                # Convert to search result format
                results = [
                    self._to_search_result(doc, score)
                    for doc, score in docs_with_scores
                ]

                logger.info(
                    f"Collection '{self.collection_name}' search returned "
                    f"{len(results)} results for query: {query[:50]}..."
                )

                return results

        except Exception:
            logger.exception(
                f"Error searching collection '{self.collection_name}'"
            )
            return []

    def _to_search_result(self, doc: Any, score: Any) -> Dict[str, Any]:
        """Convert one ``(document, score)`` hit into a search result dict."""
        # Copy the metadata: the Document comes from the vector store and may
        # be shared with the loaded index (assumption — depends on the
        # docstore implementation). Tagging it in place would leak the
        # collection fields into that shared object.
        metadata = dict(doc.metadata or {})

        # Get document ID
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Get title
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Create snippet from content
        content = doc.page_content
        if len(content) > SNIPPET_LENGTH_LONG:
            snippet = content[:SNIPPET_LENGTH_LONG] + "..."
        else:
            snippet = content

        # Generate document URL
        document_url = self._get_document_url(doc_id)

        # Add collection info to metadata
        metadata["collection_id"] = self.collection_id
        metadata["collection_name"] = self.collection_name

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,
            "source": "library",
            # Smaller score -> larger relevance. Presumably the score is a
            # non-negative distance, giving a value in (0, 1] — TODO confirm
            # the index's distance metric.
            "relevance_score": float(1 / (1 + score)),
            "metadata": metadata,
        }

    def _get_document_url(self, doc_id: Optional[str]) -> str:
        """
        Get the URL for viewing a document.

        Returns ``"#"`` when there is no document ID, the ``/pdf`` sub-page
        when the PDF storage manager reports a PDF for the document, and the
        root document page otherwise (including when the lookup fails, which
        is logged as a warning).
        """
        if not doc_id:
            return "#"

        # Default to root document page (shows all options: PDF, Text, Chunks, etc.)
        document_url = f"/library/document/{doc_id}"

        try:
            with get_user_db_session(self.username) as session:
                document = session.query(Document).filter_by(id=doc_id).first()
                if document:
                    from pathlib import Path

                    # NOTE(review): arguments are passed positionally as
                    # (key, snapshot, fallback) — confirm this matches the
                    # parameter order of get_setting_from_snapshot.
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    library_root = Path(library_root).expanduser()
                    pdf_manager = PDFStorageManager(library_root, "auto")
                    if pdf_manager.has_pdf(document, session):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as e:
            logger.warning(f"Error getting document URL for {doc_id}: {e}")

        return document_url