Coverage for src / local_deep_research / web_search_engines / engines / search_engine_library.py: 93%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:55 +0000

1""" 

2Library RAG Search Engine 

3 

4Provides semantic search over the user's personal research library using RAG. 

5""" 

6 

import os
from pathlib import Path
from typing import List, Dict, Any, Optional

from loguru import logger

from ..search_engine_base import BaseSearchEngine
from ...config.paths import get_library_directory
from ...config.thread_settings import get_setting_from_snapshot
from ...constants import SNIPPET_LENGTH_LONG
from ...database.models.library import RAGIndex, Document
from ...database.session_context import get_user_db_session
from ...research_library.services.library_rag_service import LibraryRAGService
from ...research_library.services.library_service import LibraryService
from ...research_library.services.pdf_storage_manager import PDFStorageManager
from ...utilities.llm_utils import get_server_url

21 

22 

class LibraryRAGSearchEngine(BaseSearchEngine):
    """
    Search engine that queries the user's research library using RAG/semantic search.

    Each collection may have been indexed with its own embedding model and
    chunking parameters, so a search loads the per-collection RAG index
    configuration first, queries every collection's FAISS index, and merges
    the results by ascending distance score.
    """

    # Mark as local RAG engine (searches local indexes, no external web calls)
    is_local = True

    def __init__(
        self,
        llm: Optional[Any] = None,
        max_filtered_results: Optional[int] = None,
        max_results: int = 10,
        settings_snapshot: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize the Library RAG search engine.

        Args:
            llm: Language model for relevance filtering
            max_filtered_results: Maximum number of results to keep after filtering
            max_results: Maximum number of search results
            settings_snapshot: Settings snapshot from thread context
            **kwargs: Additional engine-specific parameters
        """
        super().__init__(
            llm=llm,
            max_filtered_results=max_filtered_results,
            max_results=max_results,
            settings_snapshot=settings_snapshot,
            **kwargs,
        )
        # Username is required for all per-user DB/session access below.
        self.username = (
            settings_snapshot.get("_username") if settings_snapshot else None
        )

        if not self.username:
            logger.warning(
                "Library RAG search engine initialized without username"
            )

        # Fallback RAG configuration from settings; used when a collection's
        # stored index does not record its own chunking parameters.
        self.embedding_model = get_setting_from_snapshot(
            "local_search_embedding_model",
            settings_snapshot,
            "all-MiniLM-L6-v2",
        )
        self.embedding_provider = get_setting_from_snapshot(
            "local_search_embedding_provider",
            settings_snapshot,
            "sentence_transformers",
        )
        self.chunk_size = get_setting_from_snapshot(
            "local_search_chunk_size", settings_snapshot, 1000
        )
        self.chunk_overlap = get_setting_from_snapshot(
            "local_search_chunk_overlap", settings_snapshot, 200
        )

        # Extract server URL from settings snapshot for link generation
        self.server_url = get_server_url(settings_snapshot)

    def search(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search the library using semantic search.

        Args:
            query: Search query
            limit: Maximum number of results to return
            llm_callback: Optional LLM callback for processing results
            extra_params: Additional search parameters

        Returns:
            List of search results with title, url, snippet, etc.
        """
        if not self.username:
            logger.error("Cannot search library without username")
            return []

        try:
            library_service = LibraryService(username=self.username)

            collections = library_service.get_all_collections()
            if not collections:
                logger.info("No collections found for user")
                return []

            # Search each collection independently and merge the results.
            all_docs_with_scores = []
            for collection in collections:
                collection_id = collection.get("id")
                if not collection_id:
                    continue
                try:
                    all_docs_with_scores.extend(
                        self._search_collection(
                            collection, collection_id, query, limit
                        )
                    )
                except Exception:
                    # Fix: log with traceback (previously a bare warning that
                    # discarded the error details) but stay best-effort.
                    logger.exception(
                        f"Error searching collection {collection_id}"
                    )
                    continue

            # Lower score is better (FAISS returns distances).
            all_docs_with_scores.sort(key=lambda pair: pair[1])
            docs_with_scores = all_docs_with_scores[:limit]

            if not docs_with_scores:
                logger.info("No results found across any collections")
                return []

            results = [
                self._format_result(doc, score)
                for doc, score in docs_with_scores
            ]

            logger.info(
                f"Library RAG search returned {len(results)} results for query: {query}"
            )
            return results

        except Exception:
            logger.exception("Error searching library RAG")
            return []

    def _search_collection(
        self,
        collection: Dict[str, Any],
        collection_id: Any,
        query: str,
        limit: int,
    ) -> List[Any]:
        """Search one collection's FAISS index.

        Looks up the collection's current RAG index to obtain its embedding
        settings, runs a similarity search, and tags each result's metadata
        with the collection id/name.

        Returns:
            List of (Document, score) pairs; empty when the collection has no
            current index or no indexed documents.
        """
        # Read index settings, then release the session before the
        # (potentially slow) embedding/FAISS work.
        with get_user_db_session(self.username) as session:
            collection_name = f"collection_{collection_id}"
            rag_index = (
                session.query(RAGIndex)
                .filter_by(
                    collection_name=collection_name,
                    is_current=True,
                )
                .first()
            )

            if not rag_index:
                logger.debug(
                    f"No RAG index found for collection {collection_id}"
                )
                return []

            embedding_model = rag_index.embedding_model
            embedding_provider = rag_index.embedding_model_type.value
            # Fall back to engine-level settings when the index did not
            # record its chunking parameters.
            chunk_size = rag_index.chunk_size or self.chunk_size
            chunk_overlap = rag_index.chunk_overlap or self.chunk_overlap

        # Create RAG service with the collection's own embedding settings.
        with LibraryRAGService(
            username=self.username,
            embedding_model=embedding_model,
            embedding_provider=embedding_provider,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        ) as rag_service:
            stats = rag_service.get_rag_stats(collection_id)
            if stats.get("indexed_documents", 0) == 0:
                logger.debug(
                    f"No documents indexed in collection {collection_id}"
                )
                return []

            vector_store = rag_service.load_or_create_faiss_index(
                collection_id
            )
            docs_with_scores = vector_store.similarity_search_with_score(
                query, k=limit
            )

        # Tag results with their collection so merged output stays traceable.
        for doc, _score in docs_with_scores:
            if not doc.metadata:
                doc.metadata = {}
            doc.metadata["collection_id"] = collection_id
            doc.metadata["collection_name"] = collection.get(
                "name", "Unknown"
            )
        return docs_with_scores

    def _resolve_document_url(self, doc_id: Any) -> str:
        """Build the library URL for a document.

        Defaults to the root document page (shows all options: PDF, Text,
        Chunks, etc.); links directly to the PDF view when a stored PDF
        exists. Returns "#" when no document id is available.
        """
        if not doc_id:
            return "#"

        document_url = f"/library/document/{doc_id}"
        try:
            with get_user_db_session(self.username) as session:
                document = (
                    session.query(Document).filter_by(id=doc_id).first()
                )
                if document:
                    library_root = get_setting_from_snapshot(
                        "research_library.storage_path",
                        self.settings_snapshot,
                        str(get_library_directory()),
                    )
                    library_root = (
                        Path(os.path.expandvars(library_root))
                        .expanduser()
                        .resolve()
                    )
                    if PDFStorageManager.pdf_exists(
                        library_root, document, session
                    ):
                        document_url = f"/library/document/{doc_id}/pdf"
        except Exception:
            # Fix: keep the traceback (was a bare warning); URL resolution
            # remains best-effort.
            logger.exception(f"Error querying document {doc_id}")
        return document_url

    def _format_result(self, doc: Any, score: float) -> Dict[str, Any]:
        """Convert a (Document, distance score) pair into the engine's
        standard search-result dict."""
        metadata = doc.metadata or {}

        # Try both source_id and document_id for compatibility.
        doc_id = metadata.get("source_id") or metadata.get("document_id")

        # Title from metadata, with fallbacks.
        title = (
            metadata.get("document_title")
            or metadata.get("title")
            or (f"Document {doc_id}" if doc_id else "Untitled")
        )

        # Content is stored in page_content; truncate to snippet length.
        snippet = (
            doc.page_content[:SNIPPET_LENGTH_LONG] + "..."
            if len(doc.page_content) > SNIPPET_LENGTH_LONG
            else doc.page_content
        )

        document_url = self._resolve_document_url(doc_id)

        return {
            "title": title,
            "snippet": snippet,
            "url": document_url,
            "link": document_url,  # "link" is used for source extraction
            "source": "library",
            "source_type": "library",
            # Convert FAISS distance to a similarity-like score in (0, 1].
            "relevance_score": float(1 / (1 + score)),
            "metadata": metadata,
        }

    def _get_previews(
        self,
        query: str,
        limit: int = 10,
        llm_callback=None,
        extra_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get preview results for the query.
        Delegates to the search method.
        """
        return self.search(query, limit, llm_callback, extra_params)

    def _get_full_content(
        self, relevant_items: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Get full content for relevant library documents.
        Retrieves complete document text instead of just snippets.
        """
        # Check if we should get full content (local import avoids a cycle).
        from ...config import search_config

        if (
            hasattr(search_config, "SEARCH_SNIPPETS_ONLY")
            and search_config.SEARCH_SNIPPETS_ONLY
        ):
            logger.info("Snippet-only mode, skipping full content retrieval")
            return relevant_items

        if not self.username:
            logger.error("Cannot retrieve full content without username")
            return relevant_items

        try:
            # Retrieve full content for each document.
            for item in relevant_items:
                doc_id = item.get("metadata", {}).get("document_id")
                if not doc_id:
                    continue

                # Get full document text from database.
                with get_user_db_session(self.username) as db_session:
                    document = (
                        db_session.query(Document).filter_by(id=doc_id).first()
                    )

                    if document and document.text_content:
                        # Replace snippet with full content.
                        item["content"] = document.text_content
                        item["snippet"] = (
                            document.text_content[:SNIPPET_LENGTH_LONG] + "..."
                            if len(document.text_content) > SNIPPET_LENGTH_LONG
                            else document.text_content
                        )
                        logger.debug(
                            f"Retrieved full content for document {doc_id}"
                        )

            return relevant_items

        except Exception:
            logger.exception("Error retrieving full content from library")
            return relevant_items

    def close(self):
        """Clean up resources (no-op: sessions/services are context-managed per call)."""
        pass