Coverage for src / local_deep_research / web_search_engines / engines / search_engine_library.py: 65%
127 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-25 01:07 +0000
"""
Library RAG Search Engine

Provides semantic search over the user's personal research library using RAG.
"""
from pathlib import Path
from typing import List, Dict, Any, Optional

from loguru import logger

from ..search_engine_base import BaseSearchEngine
from ...config.paths import get_library_directory
from ...config.thread_settings import get_setting_from_snapshot
from ...constants import SNIPPET_LENGTH_LONG
from ...database.models.library import RAGIndex, Document
from ...database.session_context import get_user_db_session
from ...research_library.services.library_rag_service import LibraryRAGService
from ...research_library.services.library_service import LibraryService
from ...research_library.services.pdf_storage_manager import PDFStorageManager
from ...utilities.llm_utils import get_server_url
class LibraryRAGSearchEngine(BaseSearchEngine):
    """
    Search engine that queries the user's research library using RAG/semantic search.

    Each library collection has its own FAISS index, potentially built with
    its own embedding settings. A query is run against every indexed
    collection and the merged hits are ranked by vector distance (lower is
    better) before being converted into the common search-result format.
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Library RAG search engine.

        Args:
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        # The username scopes every database session and RAG index below.
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )

        if not self.username:
            logger.warning(
                "Library RAG search engine initialized without username"
            )

        # Default RAG configuration from settings. Per-collection RAG
        # indexes carry their own embedding settings, which take precedence
        # during search(); chunk settings below are only fallbacks.
        self.embedding_model = get_setting_from_snapshot(
            "local_search_embedding_model",
            settings_snapshot,
            "all-MiniLM-L6-v2",
        )
        self.embedding_provider = get_setting_from_snapshot(
            "local_search_embedding_provider",
            settings_snapshot,
            "sentence_transformers",
        )
        self.chunk_size = get_setting_from_snapshot(
            "local_search_chunk_size", settings_snapshot, 1000
        )
        self.chunk_overlap = get_setting_from_snapshot(
            "local_search_chunk_overlap", settings_snapshot, 200
        )

        # Extract server URL from settings snapshot for link generation
        self.server_url = get_server_url(settings_snapshot)

    def _search_collection(
        self,
        collection: Dict[str, Any],
        collection_id: Any,
        query: str,
        limit: int,
    ) -> List[Any]:
        """
        Run a similarity search against one collection's FAISS index.

        Returns a list of ``(document, distance_score)`` tuples with the
        collection id/name stamped into each document's metadata, or an
        empty list when the collection has no current RAG index or no
        indexed documents.
        """
        # Get the RAG index for this collection to find the embedding
        # settings it was built with.
        with get_user_db_session(self.username) as session:
            rag_index = (
                session.query(RAGIndex)
                .filter_by(
                    collection_name=f"collection_{collection_id}",
                    is_current=True,
                )
                .first()
            )

            if not rag_index:
                logger.debug(
                    f"No RAG index found for collection {collection_id}"
                )
                return []

            # Copy plain values out of the ORM object while the session is
            # open; chunk settings fall back to the engine defaults when the
            # index did not record them.
            embedding_model = rag_index.embedding_model
            embedding_provider = rag_index.embedding_model_type.value
            chunk_size = rag_index.chunk_size or self.chunk_size
            chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

        # Create RAG service with the collection's embedding settings
        with LibraryRAGService(
            username=self.username,
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        ) as rag_service:
            # Skip collections that have nothing indexed yet.
            stats = rag_service.get_rag_stats(collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.debug(
                    f"No documents indexed in collection {collection_id}"
                )
                return []

            # Load the FAISS index and query it.
            vector_store = rag_service.load_or_create_faiss_index(
                collection_id
            )
            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

            # Stamp collection info into metadata so merged results can
            # still be attributed to their collection.
            for doc, _score in docs_with_scores:
                if not doc.metadata:
                    doc.metadata = {}
                doc.metadata["collection_id"] = collection_id
                doc.metadata["collection_name"] = collection.get(
                    "name", "Unknown"
                )
            return docs_with_scores

    def _resolve_document_url(self, doc_id: Any) -> str:
        """
        Build the library URL for a document.

        Defaults to the root document page (which shows all options: PDF,
        Text, Chunks, etc.) and upgrades to the direct PDF view when a
        stored PDF exists. Returns "#" when no document id is available.
        """
        if not doc_id:
            return "#"

        document_url = f"/library/document/{doc_id}"
        try:
            with get_user_db_session(self.username) as session:
                document = (
                    session.query(Document).filter_by(id=doc_id).first()
                )
                if document:
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    pdf_manager = PDFStorageManager(
                        Path(library_root).expanduser(), "auto"
                    )
                    if pdf_manager.has_pdf(document, session):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception as e:
            # URL resolution is best-effort; fall back to the root page.
            logger.warning(f"Error querying document {doc_id}: {e}")
        return document_url

    def _build_result(self, doc: Any, score: Any) -> Dict[str, Any]:
        """Convert one ``(document, distance_score)`` pair to a result dict."""
        metadata = doc.metadata or {}

        # Try both source_id and document_id for compatibility
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Get title from metadata, with fallbacks
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Content is stored in page_content; truncate long text for snippet.
        snippet = (
            doc.page_content[:SNIPPET_LENGTH_LONG] + "..."
            if len(doc.page_content) > SNIPPET_LENGTH_LONG
            else doc.page_content
        )

        document_url = self._resolve_document_url(doc_id)

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,  # Add "link" for source extraction
            "source": "library",
            "relevance_score": float(
                1 / (1 + score)
            ),  # Convert distance to similarity
            "metadata": metadata,
        }

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search the library using semantic search.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
            extra_params: Additional search parameters

        Returns:
            List of search results with title, url, snippet, etc.
        """
        if not self.username:
            logger.error("Cannot search library without username")
            return []

        try:
            # Initialize services
            library_service = LibraryService(username=self.username)

            # Get all collections for this user
            collections = library_service.get_all_collections()
            if not collections:
                logger.info("No collections found for user")
                return []

            # Search across all collections and merge results. A failure
            # in one collection must not abort the others.
            all_docs_with_scores = []
            for collection in collections:
                collection_id = collection.get("id")
                if not collection_id:
                    continue
                try:
                    all_docs_with_scores.extend(
                        self._search_collection(
                            collection, collection_id, query, limit
                        )
                    )
                except Exception as e:
                    logger.warning(
                        f"Error searching collection {collection_id}: {e}"
                    )
                    continue

            # Sort all results by score (lower is better for distance),
            # then keep the top results across all collections.
            all_docs_with_scores.sort(key=lambda x: x[1])
            docs_with_scores = all_docs_with_scores[:limit]

            if not docs_with_scores:
                logger.info("No results found across any collections")
                return []

            # Convert Document objects to the search-result format.
            results = [
                self._build_result(doc, score)
                for doc, score in docs_with_scores
            ]

            logger.info(
                f"Library RAG search returned {len(results)} results for query: {query}"
            )
            return results

        except Exception:
            logger.exception("Error searching library RAG")
            return []

    def _get_previews(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get preview results for the query.
        Delegates to the search method.
        """
        return self.search(query, limit, llm_callback, extra_params)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant library documents.
        Retrieves complete document text instead of just snippets.
        """
        # Imported here rather than at module top — NOTE(review):
        # presumably to avoid a circular import; confirm before moving.
        from ... import search_config

        # Check if we should get full content
        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        if not self.username:
            logger.error("Cannot retrieve full content without username")
            return relevant_items

        try:
            # Retrieve full content for each document. Document and
            # get_user_db_session are module-level imports.
            for item in relevant_items:
                doc_id = item.get("metadata", {}).get("document_id")
                if not doc_id:
                    continue

                # Get full document text from database
                with get_user_db_session(self.username) as db_session:
                    document = (
                        db_session.query(Document).filter_by(id=doc_id).first()
                    )

                    if document and document.text_content:
                        # Replace snippet with full content
                        item["content"] = document.text_content
                        item["snippet"] = (
                            document.text_content[:SNIPPET_LENGTH_LONG] + "..."
                            if len(document.text_content) > SNIPPET_LENGTH_LONG
                            else document.text_content
                        )
                        logger.debug(
                            f"Retrieved full content for document {doc_id}"
                        )

            return relevant_items

        except Exception:
            logger.exception("Error retrieving full content from library")
            return relevant_items

    def close(self):
        """Clean up resources (nothing held open; sessions are per-call)."""
        pass