Coverage for src / local_deep_research / web_search_engines / engines / search_engine_library.py: 65%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-25 01:07 +0000

1""" 

2Library RAG Search Engine 

3 

4Provides semantic search over the user's personal research library using RAG. 

5""" 

6 

from pathlib import Path
from typing import Any, Dict, List, Optional

from loguru import logger

from ..search_engine_base import BaseSearchEngine
from ...config.paths import get_library_directory
from ...config.thread_settings import get_setting_from_snapshot
from ...constants import SNIPPET_LENGTH_LONG
from ...database.models.library import Document, RAGIndex
from ...database.session_context import get_user_db_session
from ...research_library.services.library_rag_service import LibraryRAGService
from ...research_library.services.library_service import LibraryService
from ...research_library.services.pdf_storage_manager import PDFStorageManager
from ...utilities.llm_utils import get_server_url

20 

21 

22class LibraryRAGSearchEngine(BaseSearchEngine): 

23 """ 

24 Search engine that queries the user's research library using RAG/semantic search. 

25 """ 

26 

27 # Mark as local RAG engine 

28 is_local = True 

29 

30 def __init__( 

31 self, 

32 llm: Optional[Any] = None, 

33 max_filtered_results: Optional[int] = None, 

34 max_results: int = 10, 

35 settings_snapshot: Optional[Dict[str, Any]] = None, 

36 **kwargs, 

37 ): 

38 """ 

39 Initialize the Library RAG search engine. 

40 

41 Args: 

42 llm: Language model for relevance filtering 

43 max_filtered_results: Maximum number of results to keep after filtering 

44 max_results: Maximum number of search results 

45 settings_snapshot: Settings snapshot from thread context 

46 **kwargs: Additional engine-specific parameters 

47 """ 

48 super().__init__( 

49 llm=llm, 

50 max_filtered_results=max_filtered_results, 

51 max_results=max_results, 

52 settings_snapshot=settings_snapshot, 

53 **kwargs, 

54 ) 

55 self.username = ( 

56 settings_snapshot.get("_username") if settings_snapshot else None 

57 ) 

58 

59 if not self.username: 

60 logger.warning( 

61 "Library RAG search engine initialized without username" 

62 ) 

63 

64 # Get RAG configuration from settings 

65 self.embedding_model = get_setting_from_snapshot( 

66 "local_search_embedding_model", 

67 settings_snapshot, 

68 "all-MiniLM-L6-v2", 

69 ) 

70 self.embedding_provider = get_setting_from_snapshot( 

71 "local_search_embedding_provider", 

72 settings_snapshot, 

73 "sentence_transformers", 

74 ) 

75 self.chunk_size = get_setting_from_snapshot( 

76 "local_search_chunk_size", settings_snapshot, 1000 

77 ) 

78 self.chunk_overlap = get_setting_from_snapshot( 

79 "local_search_chunk_overlap", settings_snapshot, 200 

80 ) 

81 

82 # Extract server URL from settings snapshot for link generation 

83 self.server_url = get_server_url(settings_snapshot) 

84 

85 def search( 

86 self, 

87 query: str, 

88 limit: int = 10, 

89 llm_callback=None, 

90 extra_params: Optional[Dict[str, Any]] = None, 

91 ) -> List[Dict[str, Any]]: 

92 """ 

93 Search the library using semantic search. 

94 

95 Args: 

96 query: Search query 

97 limit: Maximum number of results to return 

98 llm_callback: Optional LLM callback for processing results 

99 extra_params: Additional search parameters 

100 

101 Returns: 

102 List of search results with title, url, snippet, etc. 

103 """ 

104 if not self.username: 

105 logger.error("Cannot search library without username") 

106 return [] 

107 

108 try: 

109 # Initialize services 

110 library_service = LibraryService(username=self.username) 

111 

112 # Get all collections for this user 

113 collections = library_service.get_all_collections() 

114 if not collections: 

115 logger.info("No collections found for user") 

116 return [] 

117 

118 # Search across all collections and merge results 

119 all_docs_with_scores = [] 

120 for collection in collections: 

121 collection_id = collection.get("id") 

122 if not collection_id: 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true

123 continue 

124 

125 try: 

126 # Get the RAG index for this collection to find embedding settings 

127 with get_user_db_session(self.username) as session: 

128 collection_name = f"collection_{collection_id}" 

129 rag_index = ( 

130 session.query(RAGIndex) 

131 .filter_by( 

132 collection_name=collection_name, 

133 is_current=True, 

134 ) 

135 .first() 

136 ) 

137 

138 if not rag_index: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true

139 logger.debug( 

140 f"No RAG index found for collection {collection_id}" 

141 ) 

142 continue 

143 

144 # Get embedding settings from the RAG index 

145 embedding_model = rag_index.embedding_model 

146 embedding_provider = ( 

147 rag_index.embedding_model_type.value 

148 ) 

149 chunk_size = rag_index.chunk_size or self.chunk_size 

150 chunk_overlap = ( 

151 rag_index.chunk_overlap or self.chunk_overlap 

152 ) 

153 

154 # Create RAG service with the collection's embedding settings 

155 with LibraryRAGService( 

156 username=self.username, 

157 embedding_model=embedding_model, 

158 embedding_provider=embedding_provider, 

159 chunk_size=chunk_size, 

160 chunk_overlap=chunk_overlap, 

161 ) as rag_service: 

162 # Get RAG stats to check if there are any indexed documents 

163 stats = rag_service.get_rag_stats(collection_id) 

164 if stats.get("indexed_documents", 0) == 0: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 logger.debug( 

166 f"No documents indexed in collection {collection_id}" 

167 ) 

168 continue 

169 

170 # Load the FAISS index for this collection 

171 vector_store = rag_service.load_or_create_faiss_index( 

172 collection_id 

173 ) 

174 

175 # Search this collection's index 

176 docs_with_scores = ( 

177 vector_store.similarity_search_with_score( 

178 query, k=limit 

179 ) 

180 ) 

181 

182 # Add collection info to metadata and append to results 

183 for doc, score in docs_with_scores: 

184 if not doc.metadata: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true

185 doc.metadata = {} 

186 doc.metadata["collection_id"] = collection_id 

187 doc.metadata["collection_name"] = collection.get( 

188 "name", "Unknown" 

189 ) 

190 all_docs_with_scores.append((doc, score)) 

191 

192 except Exception as e: 

193 logger.warning( 

194 f"Error searching collection {collection_id}: {e}" 

195 ) 

196 continue 

197 

198 # Sort all results by score (lower is better for distance) 

199 all_docs_with_scores.sort(key=lambda x: x[1]) 

200 

201 # Take top results across all collections 

202 docs_with_scores = all_docs_with_scores[:limit] 

203 

204 if not docs_with_scores: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 logger.info("No results found across any collections") 

206 return [] 

207 

208 # Convert Document objects to search results format 

209 results = [] 

210 for doc, score in docs_with_scores: 

211 # Extract metadata from Document object 

212 metadata = doc.metadata or {} 

213 

214 # Try both source_id and document_id for compatibility 

215 doc_id = metadata.get("source_id") or metadata.get( 

216 "document_id" 

217 ) 

218 

219 # Get title from metadata, with fallbacks 

220 title = ( 

221 metadata.get("document_title") 

222 or metadata.get("title") 

223 or (f"Document {doc_id}" if doc_id else "Untitled") 

224 ) 

225 

226 # Content is stored in page_content 

227 snippet = ( 

228 doc.page_content[:SNIPPET_LENGTH_LONG] + "..." 

229 if len(doc.page_content) > SNIPPET_LENGTH_LONG 

230 else doc.page_content 

231 ) 

232 

233 # Generate URL to document content 

234 # Default to root document page (shows all options: PDF, Text, Chunks, etc.) 

235 document_url = f"/library/document/{doc_id}" if doc_id else "#" 

236 

237 if doc_id: 237 ↛ 264line 237 didn't jump to line 264 because the condition on line 237 was always true

238 try: 

239 with get_user_db_session(self.username) as session: 

240 document = ( 

241 session.query(Document) 

242 .filter_by(id=doc_id) 

243 .first() 

244 ) 

245 if document: 245 ↛ 264line 245 didn't jump to line 264

246 from pathlib import Path 

247 

248 library_root = get_setting_from_snapshot( 

249 "research_library.storage_path", 

250 self.settings_snapshot, 

251 str(get_library_directory()), 

252 ) 

253 library_root = Path(library_root).expanduser() 

254 pdf_manager = PDFStorageManager( 

255 library_root, "auto" 

256 ) 

257 if pdf_manager.has_pdf(document, session): 

258 document_url = ( 

259 f"/library/document/{doc_id}/pdf" 

260 ) 

261 except Exception as e: 

262 logger.warning(f"Error querying document {doc_id}: {e}") 

263 

264 result = { 

265 "title": title, 

266 "snippet": snippet, 

267 "url": document_url, 

268 "link": document_url, # Add "link" for source extraction 

269 "source": "library", 

270 "relevance_score": float( 

271 1 / (1 + score) 

272 ), # Convert distance to similarity 

273 "metadata": metadata, 

274 } 

275 

276 results.append(result) 

277 

278 logger.info( 

279 f"Library RAG search returned {len(results)} results for query: {query}" 

280 ) 

281 return results 

282 

283 except Exception: 

284 logger.exception("Error searching library RAG") 

285 return [] 

286 

287 def _get_previews( 

288 self, 

289 query: str, 

290 limit: int = 10, 

291 llm_callback=None, 

292 extra_params: Optional[Dict[str, Any]] = None, 

293 ) -> List[Dict[str, Any]]: 

294 """ 

295 Get preview results for the query. 

296 Delegates to the search method. 

297 """ 

298 return self.search(query, limit, llm_callback, extra_params) 

299 

300 def _get_full_content( 

301 self, relevant_items: List[Dict[str, Any]] 

302 ) -> List[Dict[str, Any]]: 

303 """ 

304 Get full content for relevant library documents. 

305 Retrieves complete document text instead of just snippets. 

306 """ 

307 # Check if we should get full content 

308 from ... import search_config 

309 

310 if ( 

311 hasattr(search_config, "SEARCH_SNIPPETS_ONLY") 

312 and search_config.SEARCH_SNIPPETS_ONLY 

313 ): 

314 logger.info("Snippet-only mode, skipping full content retrieval") 

315 return relevant_items 

316 

317 if not self.username: 

318 logger.error("Cannot retrieve full content without username") 

319 return relevant_items 

320 

321 try: 

322 from ...database.models.library import Document 

323 from ...database.session_context import get_user_db_session 

324 

325 # Retrieve full content for each document 

326 for item in relevant_items: 

327 doc_id = item.get("metadata", {}).get("document_id") 

328 if not doc_id: 

329 continue 

330 

331 # Get full document text from database 

332 with get_user_db_session(self.username) as db_session: 

333 document = ( 

334 db_session.query(Document).filter_by(id=doc_id).first() 

335 ) 

336 

337 if document and document.text_content: 

338 # Replace snippet with full content 

339 item["content"] = document.text_content 

340 item["snippet"] = ( 

341 document.text_content[:SNIPPET_LENGTH_LONG] + "..." 

342 if len(document.text_content) > SNIPPET_LENGTH_LONG 

343 else document.text_content 

344 ) 

345 logger.debug( 

346 f"Retrieved full content for document {doc_id}" 

347 ) 

348 

349 return relevant_items 

350 

351 except Exception: 

352 logger.exception("Error retrieving full content from library") 

353 return relevant_items 

354 

355 def close(self): 

356 """Clean up resources.""" 

357 pass