Coverage for src / local_deep_research / web_search_engines / engines / search_engine_collection.py: 14%
91 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Collection-specific RAG Search Engine
4Provides semantic search within a specific document collection using RAG.
5"""
7from typing import List, Dict, Any, Optional
8from loguru import logger
10from .search_engine_library import LibraryRAGSearchEngine
11from ...research_library.services.library_rag_service import LibraryRAGService
12from ...database.models.library import RAGIndex, Document
13from ...research_library.services.pdf_storage_manager import PDFStorageManager
14from ...database.session_context import get_user_db_session
15from ...config.thread_settings import get_setting_from_snapshot
16from ...config.paths import get_library_directory
class CollectionSearchEngine(LibraryRAGSearchEngine):
    """
    Search engine for a specific document collection using RAG.

    Directly searches only the specified collection's FAISS index.
    Each collection uses its own embedding model that was used during indexing.
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        collection_id: str,
        collection_name: str,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the collection-specific search engine.

        Args:
            collection_id: UUID of the collection to search within
            collection_name: Name of the collection for display
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.collection_id = collection_id
        self.collection_name = collection_name
        # Key under which this collection's RAG index is looked up
        # (RAGIndex.collection_name is filtered on this value).
        self.collection_key = f"collection_{collection_id}"

        # Load collection-specific embedding settings
        self._load_collection_embedding_settings()

    def _read_current_index_settings(self) -> Optional[Dict[str, Any]]:
        """
        Read the embedding settings of this collection's current RAG index.

        All attributes are read while the database session is still open and
        returned as plain values, so callers never touch the ORM object after
        the session has been closed.

        Returns:
            A dict with the keys ``embedding_model``, ``embedding_provider``,
            ``chunk_size`` and ``chunk_overlap``, or ``None`` when no current
            RAG index exists for this collection. Falsy ``chunk_size`` /
            ``chunk_overlap`` values on the index fall back to the values
            already set on this engine.

        Raises:
            Any exception raised by the database layer or by reading the
            index attributes is propagated; callers decide how to log it.
        """
        with get_user_db_session(self.username) as db_session:
            rag_index = (
                db_session.query(RAGIndex)
                .filter_by(
                    collection_name=self.collection_key,
                    is_current=True,
                )
                .first()
            )
            if not rag_index:
                return None

            return {
                "embedding_model": rag_index.embedding_model,
                "embedding_provider": rag_index.embedding_model_type.value,
                "chunk_size": rag_index.chunk_size or self.chunk_size,
                "chunk_overlap": rag_index.chunk_overlap or self.chunk_overlap,
            }

    def _load_collection_embedding_settings(self):
        """
        Load embedding settings from the collection's RAG index.

        Uses the same embedding model that was used during indexing. This is
        best-effort: when there is no username, no current index, or the
        lookup fails, the engine's existing settings are left untouched and
        the problem is only logged.
        """
        if not self.username:
            logger.warning("Cannot load collection settings without username")
            return

        try:
            index_settings = self._read_current_index_settings()
            if index_settings is None:
                logger.warning(
                    f"No RAG index found for collection {self.collection_id}"
                )
                return

            # Assign only after every value was read successfully, so a
            # failure mid-read cannot leave a new model paired with an old
            # provider.
            self.embedding_model = index_settings["embedding_model"]
            self.embedding_provider = index_settings["embedding_provider"]
            self.chunk_size = index_settings["chunk_size"]
            self.chunk_overlap = index_settings["chunk_overlap"]

            logger.info(
                f"Collection '{self.collection_name}' using embedding: "
                f"{self.embedding_provider}/{self.embedding_model}"
            )

        except Exception:
            logger.exception(
                f"Error loading collection {self.collection_id} settings"
            )

    def _build_result(self, doc: Any, score: Any) -> Dict[str, Any]:
        """
        Convert one (document, score) pair from the vector store into a
        search result dict.

        Args:
            doc: Object exposing ``metadata`` and ``page_content``
            score: Numeric score returned alongside the document
                (presumably a distance where lower means closer - TODO
                confirm against the vector store configuration)

        Returns:
            Dict with ``title``, ``snippet``, ``url``, ``link``, ``source``,
            ``relevance_score`` and ``metadata`` keys.
        """
        snippet_limit = 500

        # Copy before annotating: the original dict belongs to the document
        # handed out by the vector store and must not be modified by a search.
        metadata = dict(doc.metadata or {})

        # Get document ID
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Get title
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Create snippet from content
        content = doc.page_content
        if len(content) > snippet_limit:
            snippet = content[:snippet_limit] + "..."
        else:
            snippet = content

        # Generate document URL
        document_url = self._get_document_url(doc_id)

        # Add collection info to metadata
        metadata["collection_id"] = self.collection_id
        metadata["collection_name"] = self.collection_name

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,
            "source": "library",
            # Maps the raw score into a value that shrinks as the score grows.
            "relevance_score": float(1 / (1 + score)),
            "metadata": metadata,
        }

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search within the specific collection using semantic search.

        Directly searches only this collection's FAISS index instead of
        searching all collections and filtering.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
                (accepted for interface compatibility; not used here)
            extra_params: Additional search parameters
                (accepted for interface compatibility; not used here)

        Returns:
            List of search results from this collection. An empty list is
            returned when there is no username, no current RAG index, no
            indexed documents, no matches, or when any error occurs (errors
            are logged, not raised).
        """
        if not self.username:
            logger.error("Cannot search collection without username")
            return []

        try:
            # Get RAG index info for this collection. It is re-read on every
            # search rather than reusing the values cached at construction.
            index_settings = self._read_current_index_settings()
            if index_settings is None:
                logger.info(
                    f"No RAG index for collection '{self.collection_name}'"
                )
                return []

            # Create RAG service with collection's embedding settings
            rag_service = LibraryRAGService(
                username=self.username,
                embedding_model=index_settings["embedding_model"],
                embedding_provider=index_settings["embedding_provider"],
                chunk_size=index_settings["chunk_size"],
                chunk_overlap=index_settings["chunk_overlap"],
            )

            # Check if there are indexed documents
            stats = rag_service.get_rag_stats(self.collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.info(
                    f"No documents indexed in collection '{self.collection_name}'"
                )
                return []

            # Load and search the FAISS index for this collection
            vector_store = rag_service.load_or_create_faiss_index(
                self.collection_id
            )

            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

            if not docs_with_scores:
                logger.info(
                    f"No results found in collection '{self.collection_name}'"
                )
                return []

            # Convert to search result format
            results = [
                self._build_result(doc, score)
                for doc, score in docs_with_scores
            ]

            logger.info(
                f"Collection '{self.collection_name}' search returned "
                f"{len(results)} results for query: {query[:50]}..."
            )

            return results

        except Exception:
            # Top-level boundary for the engine: log with traceback and
            # degrade to "no results" instead of failing the caller.
            logger.exception(
                f"Error searching collection '{self.collection_name}'"
            )
            return []

    def _get_document_url(self, doc_id: Optional[str]) -> str:
        """
        Get the URL for viewing a document.

        Args:
            doc_id: Identifier of the document, or a falsy value if unknown

        Returns:
            ``"#"`` when ``doc_id`` is falsy; ``/library/document/<id>/pdf``
            when the document exists and the PDF storage manager reports a
            PDF for it; otherwise ``/library/document/<id>``. Lookup errors
            are logged as warnings and the non-PDF URL is returned.
        """
        if not doc_id:
            return "#"

        # Default to root document page (shows all options: PDF, Text, Chunks, etc.)
        document_url = f"/library/document/{doc_id}"

        try:
            with get_user_db_session(self.username) as session:
                document = session.query(Document).filter_by(id=doc_id).first()
                if document:
                    from pathlib import Path

                    # Storage path comes from the settings snapshot, falling
                    # back to the default library directory.
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    library_root = Path(library_root).expanduser()
                    pdf_manager = PDFStorageManager(library_root, "auto")
                    if pdf_manager.has_pdf(document, session):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as e:
            # Best-effort: the PDF link is an enhancement, so fall back to
            # the default document page on any failure.
            logger.warning(f"Error getting document URL for {doc_id}: {e}")

        return document_url