Coverage for src / local_deep_research / web_search_engines / engines / search_engine_collection.py: 97%
92 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Collection-specific RAG Search Engine
4Provides semantic search within a specific document collection using RAG.
5"""
7import os
8from typing import List, Dict, Any, Optional
9from loguru import logger
11from .search_engine_library import LibraryRAGSearchEngine
12from ...constants import SNIPPET_LENGTH_LONG
13from ...research_library.services.library_rag_service import LibraryRAGService
14from ...database.models.library import RAGIndex, Document
15from ...research_library.services.pdf_storage_manager import PDFStorageManager
16from ...database.session_context import get_user_db_session
17from ...config.thread_settings import get_setting_from_snapshot
18from ...config.paths import get_library_directory
class CollectionSearchEngine(LibraryRAGSearchEngine):
    """
    Search engine for a specific document collection using RAG.

    Directly searches only the specified collection's FAISS index.
    Each collection uses its own embedding model that was used during indexing.
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        collection_id: str,
        collection_name: str,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the collection-specific search engine.

        Args:
            collection_id: UUID of the collection to search within
            collection_name: Name of the collection for display
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.collection_id = collection_id
        self.collection_name = collection_name
        # Key under which this collection's RAG index row is looked up.
        self.collection_key = f"collection_{collection_id}"

        # Load collection-specific embedding settings
        self._load_collection_embedding_settings()

    def _load_collection_embedding_settings(self):
        """
        Load embedding settings from the collection's current RAG index.

        Overwrites ``embedding_model``, ``embedding_provider``, ``chunk_size``
        and ``chunk_overlap`` on this instance with the values stored on the
        index row, so that searching uses the same embedding model that was
        used during indexing. Falsy chunk settings on the row keep the values
        already present on the instance.

        Best-effort: returns without changes when there is no username or no
        current index, and logs (rather than raises) any exception.
        """
        # NOTE(review): self.username, self.chunk_size and self.chunk_overlap
        # are presumably set by the parent __init__ - confirm.
        if not self.username:
            logger.warning("Cannot load collection settings without username")
            return

        try:
            with get_user_db_session(self.username) as db_session:
                # Get RAG index for this collection
                rag_index = (
                    db_session.query(RAGIndex)
                    .filter_by(
                        collection_name=self.collection_key,
                        is_current=True,
                    )
                    .first()
                )

                if not rag_index:
                    logger.warning(
                        f"No RAG index found for collection {self.collection_id}"
                    )
                    return

                # Use embedding settings from the RAG index
                self.embedding_model = rag_index.embedding_model
                self.embedding_provider = rag_index.embedding_model_type.value
                self.chunk_size = rag_index.chunk_size or self.chunk_size
                self.chunk_overlap = (
                    rag_index.chunk_overlap or self.chunk_overlap
                )

                logger.info(
                    f"Collection '{self.collection_name}' using embedding: "
                    f"{self.embedding_provider}/{self.embedding_model}"
                )

        except Exception:
            logger.exception(
                f"Error loading collection {self.collection_id} settings"
            )

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search within the specific collection using semantic search.

        Directly searches only this collection's FAISS index instead of
        searching all collections and filtering.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Accepted for interface compatibility; not used here
            extra_params: Accepted for interface compatibility; not used here

        Returns:
            List of result dicts with the keys ``title``, ``snippet``, ``url``,
            ``link``, ``source``, ``source_type``, ``relevance_score`` and
            ``metadata``. An empty list is returned when there is no username,
            no current RAG index, no indexed documents, no matches, or when
            any exception occurs (the exception is logged, not raised).
        """
        if not self.username:
            logger.error("Cannot search collection without username")
            return []

        try:
            # Get RAG index info for this collection
            with get_user_db_session(self.username) as db_session:
                rag_index = (
                    db_session.query(RAGIndex)
                    .filter_by(
                        collection_name=self.collection_key,
                        is_current=True,
                    )
                    .first()
                )

                if not rag_index:
                    logger.info(
                        f"No RAG index for collection '{self.collection_name}'"
                    )
                    return []

                # Read the settings into plain locals while the session is
                # still open, so nothing touches the ORM row afterwards.
                embedding_model = rag_index.embedding_model
                embedding_provider = rag_index.embedding_model_type.value
                chunk_size = rag_index.chunk_size or self.chunk_size
                chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

            # Create RAG service with collection's embedding settings
            with LibraryRAGService(
                username=self.username,
                embedding_model=embedding_model,
                embedding_provider=embedding_provider,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            ) as rag_service:
                # Check if there are indexed documents
                stats = rag_service.get_rag_stats(self.collection_id)
                if stats.get("indexed_documents", 0) == 0:
                    logger.info(
                        f"No documents indexed in collection '{self.collection_name}'"
                    )
                    return []

                # Load and search the FAISS index for this collection
                vector_store = rag_service.load_or_create_faiss_index(
                    self.collection_id
                )

                docs_with_scores = vector_store.similarity_search_with_score(
                    query, k=limit
                )

                if not docs_with_scores:
                    logger.info(
                        f"No results found in collection '{self.collection_name}'"
                    )
                    return []

                # Convert to search result format
                results = []
                for doc, score in docs_with_scores:
                    # Work on a copy: the collection fields added below must
                    # not leak into the metadata dict owned by the document
                    # object handed back by the vector store.
                    metadata = dict(doc.metadata or {})

                    # Get document ID
                    doc_id = metadata.get("source_id") or metadata.get(
                        "document_id"
                    )

                    # Get title
                    title = (
                        metadata.get("document_title")
                        or metadata.get("title")
                        or (f"Document {doc_id}" if doc_id else "Untitled")
                    )

                    # Create snippet from content
                    snippet = (
                        doc.page_content[:SNIPPET_LENGTH_LONG] + "..."
                        if len(doc.page_content) > SNIPPET_LENGTH_LONG
                        else doc.page_content
                    )

                    # Generate document URL
                    document_url = self._get_document_url(doc_id)

                    # Add collection info to metadata
                    metadata["collection_id"] = self.collection_id
                    metadata["collection_name"] = self.collection_name

                    results.append(
                        {
                            "title": title,
                            "snippet": snippet,
                            "url": document_url,
                            "link": document_url,
                            "source": "library",
                            "source_type": "library",
                            # Maps a score of 0 to 1.0 and larger scores
                            # toward 0. Assumes score is a non-negative
                            # distance - TODO confirm for this index type.
                            "relevance_score": float(1 / (1 + score)),
                            "metadata": metadata,
                        }
                    )

                logger.info(
                    f"Collection '{self.collection_name}' search returned "
                    f"{len(results)} results for query: {query[:50]}..."
                )

                return results

        except Exception:
            logger.exception(
                f"Error searching collection '{self.collection_name}'"
            )
            return []

    def _get_document_url(self, doc_id: Optional[str]) -> str:
        """
        Get the URL for viewing a document.

        Args:
            doc_id: ID of the document, or a falsy value if unknown

        Returns:
            ``"#"`` when ``doc_id`` is falsy; ``/library/document/<id>/pdf``
            when the document row exists and ``PDFStorageManager.pdf_exists``
            reports a PDF for it; otherwise ``/library/document/<id>``.
            Lookup failures are logged and fall back to the root document URL.
        """
        if not doc_id:
            return "#"

        from pathlib import Path

        # Default to root document page (shows all options: PDF, Text, Chunks, etc.)
        document_url = f"/library/document/{doc_id}"

        try:
            with get_user_db_session(self.username) as session:
                document = session.query(Document).filter_by(id=doc_id).first()
                if document:
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        default=str(get_library_directory()),
                        settings_snapshot=self.settings_snapshot,
                    )
                    # Expand environment variables and "~" before resolving
                    # to an absolute path.
                    library_root = (
                        Path(os.path.expandvars(library_root))
                        .expanduser()
                        .resolve()
                    )
                    if PDFStorageManager.pdf_exists(
                        library_root, document, session
                    ):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as exc:
            # Best-effort: keep the default URL, but record what went wrong
            # so the failure can be diagnosed from the logs.
            logger.warning(
                f"Error getting document URL for {doc_id}: {exc!r}"
            )

        return document_url