Coverage for src / local_deep_research / web_search_engines / engines / search_engine_library.py: 93%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
1"""
2Library RAG Search Engine
4Provides semantic search over the user's personal research library using RAG.
5"""
import os
from pathlib import Path
from typing import List, Dict, Any, Optional

from loguru import logger

from ..search_engine_base import BaseSearchEngine
from ...constants import SNIPPET_LENGTH_LONG
from ...research_library.services.library_rag_service import LibraryRAGService
from ...research_library.services.library_service import LibraryService
from ...config.thread_settings import get_setting_from_snapshot
from ...utilities.llm_utils import get_server_url
from ...database.models.library import RAGIndex, Document
from ...research_library.services.pdf_storage_manager import PDFStorageManager
from ...database.session_context import get_user_db_session
from ...config.paths import get_library_directory
class LibraryRAGSearchEngine(BaseSearchEngine):
    """
    Search engine that queries the user's research library using RAG/semantic search.

    Each collection keeps its own FAISS index and embedding configuration, so
    the query is run per collection, the hits are merged, and the combined
    list is ranked by vector distance (lower is better).
    """

    # Mark as local RAG engine
    is_local = True

    def __init__(
        self,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Library RAG search engine.

        Args:
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        # The username identifies whose library database / indexes to query.
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )

        if not self.username:
            logger.warning(
                "Library RAG search engine initialized without username"
            )

        # Default RAG configuration from settings. Per-collection indexes may
        # override these with the values they were originally built with.
        self.embedding_model = get_setting_from_snapshot(
            "local_search_embedding_model",
            settings_snapshot,
            "all-MiniLM-L6-v2",
        )
        self.embedding_provider = get_setting_from_snapshot(
            "local_search_embedding_provider",
            settings_snapshot,
            "sentence_transformers",
        )
        self.chunk_size = get_setting_from_snapshot(
            "local_search_chunk_size", settings_snapshot, 1000
        )
        self.chunk_overlap = get_setting_from_snapshot(
            "local_search_chunk_overlap", settings_snapshot, 200
        )

        # Extract server URL from settings snapshot for link generation
        self.server_url = get_server_url(settings_snapshot)

    @staticmethod
    def _make_snippet(text: str) -> str:
        """Truncate *text* to the standard snippet length, adding an ellipsis."""
        if len(text) > SNIPPET_LENGTH_LONG:
            return text[:SNIPPET_LENGTH_LONG] + "..."
        return text

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search the library using semantic search.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
            extra_params: Additional search parameters

        Returns:
            List of search results with title, url, snippet, etc.
        """
        if not self.username:
            logger.error("Cannot search library without username")
            return []

        try:
            # Initialize services
            library_service = LibraryService(username=self.username)

            # Get all collections for this user
            collections = library_service.get_all_collections()
            if not collections:
                logger.info("No collections found for user")
                return []

            # Search each collection independently and merge results; a
            # failure in one collection must not abort the others.
            all_docs_with_scores: List[Any] = []
            for collection in collections:
                collection_id = collection.get("id")
                if not collection_id:
                    continue
                try:
                    all_docs_with_scores.extend(
                        self._search_collection(collection, query, limit)
                    )
                except Exception:
                    logger.warning(
                        f"Error searching collection {collection_id}"
                    )
                    continue

            # Sort all results by score (lower is better for distance) and
            # keep the top results across all collections.
            all_docs_with_scores.sort(key=lambda pair: pair[1])
            top_docs = all_docs_with_scores[:limit]

            if not top_docs:
                logger.info("No results found across any collections")
                return []

            results = [
                self._format_result(doc, score) for doc, score in top_docs
            ]

            logger.info(
                f"Library RAG search returned {len(results)} results for query: {query}"
            )
            return results

        except Exception:
            logger.exception("Error searching library RAG")
            return []

    def _search_collection(
        self, collection: Dict[str, Any], query: str, limit: int
    ) -> List[Any]:
        """
        Run a similarity search against one collection's FAISS index.

        Returns (document, distance) tuples with collection id/name stamped
        into each document's metadata; empty list when the collection has no
        current RAG index or no indexed documents.
        """
        collection_id = collection.get("id")

        # Look up the current RAG index so we reuse exactly the embedding
        # settings this collection's index was built with.
        with get_user_db_session(self.username) as session:
            collection_name = f"collection_{collection_id}"
            rag_index = (
                session.query(RAGIndex)
                .filter_by(
                    collection_name=collection_name,
                    is_current=True,
                )
                .first()
            )

            if not rag_index:
                logger.debug(
                    f"No RAG index found for collection {collection_id}"
                )
                return []

            # Read settings while the session is still open; fall back to
            # the engine-level defaults where the index stored none.
            embedding_model = rag_index.embedding_model
            embedding_provider = rag_index.embedding_model_type.value
            chunk_size = rag_index.chunk_size or self.chunk_size
            chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

        # Create RAG service with the collection's embedding settings
        with LibraryRAGService(
            username=self.username,
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        ) as rag_service:
            # Skip collections with nothing indexed yet.
            stats = rag_service.get_rag_stats(collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.debug(
                    f"No documents indexed in collection {collection_id}"
                )
                return []

            # Load the FAISS index and search it.
            vector_store = rag_service.load_or_create_faiss_index(
                collection_id
            )
            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

            # Tag every hit with its source collection for downstream display.
            for doc, _score in docs_with_scores:
                if not doc.metadata:
                    doc.metadata = {}
                doc.metadata["collection_id"] = collection_id
                doc.metadata["collection_name"] = collection.get(
                    "name", "Unknown"
                )
            return docs_with_scores

    def _format_result(self, doc: Any, score: float) -> Dict[str, Any]:
        """Convert one (document, distance) pair into a search-result dict."""
        metadata = doc.metadata or {}

        # Try both source_id and document_id for compatibility
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Get title from metadata, with fallbacks
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        document_url = self._resolve_document_url(doc_id)

        return {
            "title": title,
            # Content is stored in page_content
            "snippet": self._make_snippet(doc.page_content),
            "url": document_url,
            "link": document_url,  # Add "link" for source extraction
            "source": "library",
            "source_type": "library",
            "relevance_score": float(
                1 / (1 + score)
            ),  # Convert distance to similarity
            "metadata": metadata,
        }

    def _resolve_document_url(self, doc_id: Optional[Any]) -> str:
        """
        Build the in-app URL for a document.

        Defaults to the root document page (shows all options: PDF, Text,
        Chunks, etc.); links directly to the PDF when one exists on disk.
        """
        if not doc_id:
            return "#"

        document_url = f"/library/document/{doc_id}"
        try:
            with get_user_db_session(self.username) as session:
                document = (
                    session.query(Document).filter_by(id=doc_id).first()
                )
                if document:
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    # Expand env vars and ~ so the path matches what the
                    # storage manager was given at download time.
                    library_root = (
                        Path(os.path.expandvars(library_root))
                        .expanduser()
                        .resolve()
                    )
                    if PDFStorageManager.pdf_exists(
                        library_root, document, session
                    ):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception:
            logger.warning(f"Error querying document {doc_id}")
        return document_url

    def _get_previews(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get preview results for the query.
        Delegates to the search method.
        """
        return self.search(query, limit, llm_callback, extra_params)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant library documents.
        Retrieves complete document text instead of just snippets.
        """
        # Check if we should get full content
        from ...config import search_config

        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        if not self.username:
            logger.error("Cannot retrieve full content without username")
            return relevant_items

        try:
            # Retrieve full content for each document
            for item in relevant_items:
                metadata = item.get("metadata", {})
                # Match the keying used by search(): source_id first, then
                # document_id, so neither style of index is skipped.
                doc_id = metadata.get("source_id") or metadata.get(
                    "document_id"
                )
                if not doc_id:
                    continue

                # Get full document text from database
                with get_user_db_session(self.username) as db_session:
                    document = (
                        db_session.query(Document).filter_by(id=doc_id).first()
                    )

                    if document and document.text_content:
                        # Replace snippet with full content
                        item["content"] = document.text_content
                        item["snippet"] = self._make_snippet(
                            document.text_content
                        )
                        logger.debug(
                            f"Retrieved full content for document {doc_id}"
                        )

            return relevant_items

        except Exception:
            logger.exception("Error retrieving full content from library")
            return relevant_items

    def close(self):
        """Clean up resources (no persistent resources are held)."""
        pass