Coverage for src / local_deep_research / web_search_engines / engines / search_engine_library.py: 11%

126 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Library RAG Search Engine 

3 

4Provides semantic search over the user's personal research library using RAG. 

5""" 

6 

from pathlib import Path
from typing import List, Dict, Any, Optional

from loguru import logger

from ..search_engine_base import BaseSearchEngine
from ...config.paths import get_library_directory
from ...config.thread_settings import get_setting_from_snapshot
from ...database.models.library import RAGIndex, Document
from ...database.session_context import get_user_db_session
from ...research_library.services.library_rag_service import LibraryRAGService
from ...research_library.services.library_service import LibraryService
from ...research_library.services.pdf_storage_manager import PDFStorageManager
from ...utilities.llm_utils import get_server_url

19 

20 

class LibraryRAGSearchEngine(BaseSearchEngine):
    """
    Search engine that queries the user's personal research library using
    RAG/semantic search over per-collection FAISS vector indexes.
    """

    # Mark as a local RAG engine: searches on-disk indexes, no network calls.
    is_local = True

    def __init__(
        self,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Library RAG search engine.

        Args:
            llm: Language model for relevance filtering.
            max_filtered_results: Maximum number of results to keep after
                filtering.
            max_results: Maximum number of search results.
            settings_snapshot: Settings snapshot from thread context; must
                contain "_username" for searches to return anything.
            **kwargs: Additional engine-specific parameters.
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )

        if not self.username:
            logger.warning(
                "Library RAG search engine initialized without username"
            )

        # Default RAG configuration from settings. Per-collection RAG
        # indexes override these with the settings they were built with
        # (see _search_collection()).
        self.embedding_model = get_setting_from_snapshot(
            "local_search_embedding_model",
            settings_snapshot,
            "all-MiniLM-L6-v2",
        )
        self.embedding_provider = get_setting_from_snapshot(
            "local_search_embedding_provider",
            settings_snapshot,
            "sentence_transformers",
        )
        self.chunk_size = get_setting_from_snapshot(
            "local_search_chunk_size", settings_snapshot, 1000
        )
        self.chunk_overlap = get_setting_from_snapshot(
            "local_search_chunk_overlap", settings_snapshot, 200
        )

        # Server URL from the settings snapshot, used for link generation.
        self.server_url = get_server_url(settings_snapshot)

    def _search_collection(
        self, query: str, collection: Dict[str, Any], limit: int
    ) -> List[Any]:
        """
        Search a single collection's FAISS index.

        Returns a list of (document, distance) tuples with collection info
        stamped into each document's metadata. Returns an empty list when
        the collection has no current RAG index or no indexed documents.

        Raises:
            Exception: any error from the RAG service or vector store is
                propagated; the caller handles it per collection.
        """
        collection_id = collection["id"]

        with get_user_db_session(self.username) as session:
            # Look up the current RAG index to recover the embedding
            # settings the index was built with (they may differ from the
            # engine-level defaults).
            rag_index = (
                session.query(RAGIndex)
                .filter_by(
                    collection_name=f"collection_{collection_id}",
                    is_current=True,
                )
                .first()
            )

            if not rag_index:
                logger.debug(
                    f"No RAG index found for collection {collection_id}"
                )
                return []

            # Create a RAG service matching the collection's index settings;
            # fall back to engine defaults for chunking when unset.
            rag_service = LibraryRAGService(
                username=self.username,
                embedding_model=rag_index.embedding_model,
                embedding_provider=rag_index.embedding_model_type.value,
                chunk_size=rag_index.chunk_size or self.chunk_size,
                chunk_overlap=rag_index.chunk_overlap or self.chunk_overlap,
            )

            # Skip collections that have an index but nothing indexed yet.
            stats = rag_service.get_rag_stats(collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.debug(
                    f"No documents indexed in collection {collection_id}"
                )
                return []

            vector_store = rag_service.load_or_create_faiss_index(
                collection_id
            )
            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

            # Stamp collection info so merged results can be attributed.
            for doc, _score in docs_with_scores:
                if not doc.metadata:
                    doc.metadata = {}
                doc.metadata["collection_id"] = collection_id
                doc.metadata["collection_name"] = collection.get(
                    "name", "Unknown"
                )

            return docs_with_scores

    def _build_result(
        self,
        doc: Any,
        score: float,
        pdf_manager: Optional[PDFStorageManager],
    ) -> Dict[str, Any]:
        """
        Convert one (document, distance) pair into a search-result dict.

        Args:
            doc: LangChain-style document with page_content and metadata.
            score: FAISS distance for the match (lower is better).
            pdf_manager: Shared PDF storage manager, or None if storage
                could not be initialized (PDF links are then skipped).
        """
        metadata = doc.metadata or {}

        # Try both source_id and document_id for compatibility.
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Title from metadata, with fallbacks.
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Content is stored in page_content; truncate for the snippet.
        snippet = (
            doc.page_content[:500] + "..."
            if len(doc.page_content) > 500
            else doc.page_content
        )

        # Default to the root document page (shows all options: PDF, Text,
        # Chunks, etc.); upgrade to a direct PDF link when a PDF is stored.
        document_url = f"/library/document/{doc_id}" if doc_id else "#"

        if doc_id and pdf_manager is not None:
            try:
                with get_user_db_session(self.username) as session:
                    document = (
                        session.query(Document)
                        .filter_by(id=doc_id)
                        .first()
                    )
                    if document and pdf_manager.has_pdf(document, session):
                        document_url = f"/library/document/{doc_id}/pdf"
            except Exception as e:
                logger.warning(f"Error querying document {doc_id}: {e}")

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,  # Add "link" for source extraction
            "source": "library",
            # Convert FAISS distance to a 0-1 similarity score.
            "relevance_score": float(1 / (1 + score)),
            "metadata": metadata,
        }

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search the library using semantic search.

        Args:
            query: Search query.
            limit: Maximum number of results to return.
            llm_callback: Optional LLM callback for processing results.
            extra_params: Additional search parameters.

        Returns:
            List of search results with title, url, snippet, etc. Empty
            list when no username is set, no collections exist, or an
            unexpected error occurs.
        """
        if not self.username:
            logger.error("Cannot search library without username")
            return []

        try:
            library_service = LibraryService(username=self.username)

            collections = library_service.get_all_collections()
            if not collections:
                logger.info("No collections found for user")
                return []

            # Search every collection and merge; a failure in one
            # collection must not abort the others.
            all_docs_with_scores = []
            for collection in collections:
                collection_id = collection.get("id")
                if not collection_id:
                    continue
                try:
                    all_docs_with_scores.extend(
                        self._search_collection(query, collection, limit)
                    )
                except Exception as e:
                    logger.warning(
                        f"Error searching collection {collection_id}: {e}"
                    )
                    continue

            # Sort by distance (lower is better) and keep the global top.
            all_docs_with_scores.sort(key=lambda x: x[1])
            top_docs = all_docs_with_scores[:limit]

            if not top_docs:
                logger.info("No results found across any collections")
                return []

            # Resolve PDF storage once (loop-invariant): the original code
            # re-read the setting and rebuilt the manager for every result.
            pdf_manager = None
            try:
                library_root = Path(
                    get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                ).expanduser()
                pdf_manager = PDFStorageManager(library_root, "auto")
            except Exception as e:
                logger.warning(f"Error initializing PDF storage: {e}")

            results = [
                self._build_result(doc, score, pdf_manager)
                for doc, score in top_docs
            ]

            logger.info(
                f"Library RAG search returned {len(results)} results for query: {query}"
            )
            return results

        except Exception:
            logger.exception("Error searching library RAG")
            return []

    def _get_previews(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get preview results for the query.

        Delegates to search(): previews and full searches are identical
        for this engine.
        """
        return self.search(query, limit, llm_callback, extra_params)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant library documents.

        Replaces each item's snippet with the complete document text and
        stores it under "content". Items without a document_id in their
        metadata, or whose document has no stored text, are left
        unchanged. In snippet-only mode all items are returned untouched.
        """
        from ... import search_config

        # Respect the global snippet-only switch when present.
        if getattr(search_config, "SEARCH_SNIPPETS_ONLY", False):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        if not self.username:
            logger.error("Cannot retrieve full content without username")
            return relevant_items

        try:
            # Retrieve full content for each document from the user DB.
            for item in relevant_items:
                doc_id = item.get("metadata", {}).get("document_id")
                if not doc_id:
                    continue

                with get_user_db_session(self.username) as db_session:
                    document = (
                        db_session.query(Document)
                        .filter_by(id=doc_id)
                        .first()
                    )

                    if document and document.text_content:
                        # Replace snippet with full content.
                        text = document.text_content
                        item["content"] = text
                        item["snippet"] = (
                            text[:500] + "..." if len(text) > 500 else text
                        )
                        logger.debug(
                            f"Retrieved full content for document {doc_id}"
                        )

            return relevant_items

        except Exception:
            logger.exception("Error retrieving full content from library")
            return relevant_items

    def close(self):
        """Clean up resources (nothing to release for this engine)."""
        pass