Coverage for src / local_deep_research / research_library / deletion / services / collection_deletion.py: 94%
77 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Collection deletion service.
4Handles:
5- Full collection deletion with proper cleanup
6- Documents are preserved but unlinked
7- RAG index and chunks are deleted
8"""
10from typing import Dict, Any
12from loguru import logger
14from ....database.models.library import (
15 Collection,
16 DocumentCollection,
17 DocumentChunk,
18 CollectionFolder,
19 RAGIndex,
20 RagDocumentStatus,
21)
22from ....database.session_context import get_user_db_session
23from ..utils.cascade_helper import CascadeHelper
26class CollectionDeletionService:
27 """Service for collection deletion operations."""
29 def __init__(self, username: str):
30 """
31 Initialize collection deletion service.
33 Args:
34 username: Username for database session
35 """
36 self.username = username
38 def delete_collection(
39 self, collection_id: str, delete_orphaned_documents: bool = True
40 ) -> Dict[str, Any]:
41 """
42 Delete a collection and clean up all related data.
44 By default, orphaned documents (not in any other collection) are deleted.
45 Set delete_orphaned_documents=False to preserve all documents.
47 The following are deleted:
48 - DocumentChunks for this collection
49 - FAISS index files
50 - RAGIndex records
51 - CollectionFolder records (CASCADE)
52 - DocumentCollection links (CASCADE)
53 - RagDocumentStatus records (CASCADE)
54 - Orphaned documents (if delete_orphaned_documents=True)
56 Args:
57 collection_id: ID of the collection to delete
58 delete_orphaned_documents: If True, delete documents not in any
59 other collection after unlinking
61 Returns:
62 Dict with deletion details:
63 {
64 "deleted": True/False,
65 "collection_id": str,
66 "collection_name": str,
67 "chunks_deleted": int,
68 "documents_unlinked": int,
69 "indices_deleted": int,
70 "folders_deleted": int,
71 "orphaned_documents_deleted": int,
72 "error": str (if failed)
73 }
74 """
75 with get_user_db_session(self.username) as session:
76 try:
77 # Get collection
78 collection = session.query(Collection).get(collection_id)
79 if not collection:
80 return {
81 "deleted": False,
82 "collection_id": collection_id,
83 "error": "Collection not found",
84 }
86 collection_name = f"collection_{collection_id}"
87 result = {
88 "deleted": False,
89 "collection_id": collection_id,
90 "collection_name": collection.name,
91 "chunks_deleted": 0,
92 "documents_unlinked": 0,
93 "indices_deleted": 0,
94 "folders_deleted": 0,
95 "orphaned_documents_deleted": 0,
96 }
98 # 1. Get document IDs BEFORE deleting links (for orphan check)
99 doc_ids_in_collection = [
100 dc.document_id
101 for dc in session.query(DocumentCollection)
102 .filter_by(collection_id=collection_id)
103 .all()
104 ]
105 result["documents_unlinked"] = len(doc_ids_in_collection)
107 # 2. Delete DocumentChunks for this collection
108 result["chunks_deleted"] = (
109 CascadeHelper.delete_collection_chunks(
110 session, collection_name
111 )
112 )
114 # 3. Delete RAGIndex records and FAISS files
115 rag_result = CascadeHelper.delete_rag_indices_for_collection(
116 session, collection_name
117 )
118 result["indices_deleted"] = rag_result["deleted_indices"]
120 # 4. Count folders before deletion
121 result["folders_deleted"] = (
122 session.query(CollectionFolder)
123 .filter_by(collection_id=collection_id)
124 .count()
125 )
127 # 5. Delete DocumentCollection links explicitly before collection
128 session.query(DocumentCollection).filter_by(
129 collection_id=collection_id
130 ).delete(synchronize_session=False)
132 # 6. Delete linked folders explicitly
133 session.query(CollectionFolder).filter_by(
134 collection_id=collection_id
135 ).delete(synchronize_session=False)
137 # 7. Delete the collection itself
138 session.delete(collection)
140 # 8. Delete orphaned documents if requested
141 if delete_orphaned_documents: 141 ↛ 159line 141 didn't jump to line 159 because the condition on line 141 was always true
142 for doc_id in doc_ids_in_collection:
143 # Check if document is in any other collection
144 remaining = (
145 session.query(DocumentCollection)
146 .filter_by(document_id=doc_id)
147 .count()
148 )
149 if remaining == 0:
150 # Document is orphaned - delete it
151 CascadeHelper.delete_document_completely(
152 session, doc_id
153 )
154 result["orphaned_documents_deleted"] += 1
155 logger.info(
156 f"Deleted orphaned document {doc_id[:8]}..."
157 )
159 session.commit()
161 result["deleted"] = True
162 logger.info(
163 f"Deleted collection {collection_id[:8]}... "
164 f"({result['collection_name']}): {result['chunks_deleted']} chunks, "
165 f"{result['documents_unlinked']} documents unlinked, "
166 f"{result['orphaned_documents_deleted']} orphaned deleted"
167 )
169 return result
171 except Exception:
172 logger.exception(f"Failed to delete collection {collection_id}")
173 session.rollback()
174 return {
175 "deleted": False,
176 "collection_id": collection_id,
177 "error": "Failed to delete collection",
178 }
180 def delete_collection_index_only(
181 self, collection_id: str
182 ) -> Dict[str, Any]:
183 """
184 Delete only the RAG index for a collection, keeping the collection itself.
186 This is useful for rebuilding an index from scratch.
188 Args:
189 collection_id: ID of the collection
191 Returns:
192 Dict with deletion details
193 """
194 with get_user_db_session(self.username) as session:
195 try:
196 # Verify collection exists
197 collection = session.query(Collection).get(collection_id)
198 if not collection:
199 return {
200 "deleted": False,
201 "collection_id": collection_id,
202 "error": "Collection not found",
203 }
205 collection_name = f"collection_{collection_id}"
206 result = {
207 "deleted": False,
208 "collection_id": collection_id,
209 "chunks_deleted": 0,
210 "indices_deleted": 0,
211 "documents_reset": 0,
212 }
214 # 1. Delete DocumentChunks
215 result["chunks_deleted"] = (
216 CascadeHelper.delete_collection_chunks(
217 session, collection_name
218 )
219 )
221 # 2. Delete RAGIndex records and FAISS files
222 rag_result = CascadeHelper.delete_rag_indices_for_collection(
223 session, collection_name
224 )
225 result["indices_deleted"] = rag_result["deleted_indices"]
227 # 3. Reset DocumentCollection indexed status
228 result["documents_reset"] = (
229 session.query(DocumentCollection)
230 .filter_by(collection_id=collection_id)
231 .update({"indexed": False, "chunk_count": 0})
232 )
234 # 4. Delete RagDocumentStatus for this collection
235 session.query(RagDocumentStatus).filter_by(
236 collection_id=collection_id
237 ).delete(synchronize_session=False)
239 # 5. Reset collection embedding info
240 collection.embedding_model = None
241 collection.embedding_model_type = None
242 collection.embedding_dimension = None
243 collection.chunk_size = None
244 collection.chunk_overlap = None
246 session.commit()
247 result["deleted"] = True
249 logger.info(
250 f"Deleted index for collection {collection_id[:8]}...: "
251 f"{result['chunks_deleted']} chunks, "
252 f"{result['documents_reset']} documents reset"
253 )
255 return result
257 except Exception:
258 logger.exception(
259 f"Failed to delete index for collection {collection_id}"
260 )
261 session.rollback()
262 return {
263 "deleted": False,
264 "collection_id": collection_id,
265 "error": "Failed to delete collection index",
266 }
268 def get_deletion_preview(self, collection_id: str) -> Dict[str, Any]:
269 """
270 Get a preview of what will be deleted.
272 Useful for showing the user what will happen before confirming.
274 Args:
275 collection_id: ID of the collection
277 Returns:
278 Dict with preview information
279 """
280 with get_user_db_session(self.username) as session:
281 collection = session.query(Collection).get(collection_id)
282 if not collection:
283 return {"found": False, "collection_id": collection_id}
285 collection_name = f"collection_{collection_id}"
287 # Count documents
288 documents_count = (
289 session.query(DocumentCollection)
290 .filter_by(collection_id=collection_id)
291 .count()
292 )
294 # Count chunks
295 chunks_count = (
296 session.query(DocumentChunk)
297 .filter_by(collection_name=collection_name)
298 .count()
299 )
301 # Count folders
302 folders_count = (
303 session.query(CollectionFolder)
304 .filter_by(collection_id=collection_id)
305 .count()
306 )
308 # Check for RAG index
309 has_index = (
310 session.query(RAGIndex)
311 .filter_by(collection_name=collection_name)
312 .first()
313 is not None
314 )
316 return {
317 "found": True,
318 "collection_id": collection_id,
319 "name": collection.name,
320 "description": collection.description,
321 "is_default": collection.is_default,
322 "documents_count": documents_count,
323 "chunks_count": chunks_count,
324 "folders_count": folders_count,
325 "has_rag_index": has_index,
326 "embedding_model": collection.embedding_model,
327 }