Coverage for src / local_deep_research / research_library / deletion / services / document_deletion.py: 100%
122 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:55 +0000
"""
Document deletion service.

Handles:
- Full document deletion with proper cascade cleanup
- Blob-only deletion (remove PDF, keep text)
- Remove from collection (unlink or delete if orphaned)
"""
10from typing import Dict, Any
12from loguru import logger
14from ....constants import FILE_PATH_BLOB_DELETED
15from ....database.models.library import (
16 Document,
17 DocumentChunk,
18 DocumentCollection,
19)
20from ....database.session_context import get_user_db_session
21from ..utils.cascade_helper import CascadeHelper
class DocumentDeletionService:
    """Service for document deletion operations.

    The mutating methods (``delete_document``, ``delete_blob_only`` and
    ``remove_from_collection``) each open their own per-user database
    session, commit on success, and on an unexpected exception log it,
    roll the session back and return a result dict with an ``"error"``
    message instead of raising.
    """

    def __init__(self, username: str):
        """
        Initialize document deletion service.

        Args:
            username: Username for database session
        """
        self.username = username

    @staticmethod
    def _delete_filesystem_copy(document) -> bool:
        """
        Best-effort removal of a document's file from the filesystem.

        Shared by full deletion and orphan cleanup so both resolve and
        remove the file the same way. Failures are logged, never raised,
        so a filesystem problem does not abort the database cleanup.

        Args:
            document: Document whose ``storage_mode`` and ``file_path``
                decide whether there is a file to remove

        Returns:
            The value reported by ``CascadeHelper.delete_filesystem_file``
            for the resolved path; False when the document is not
            filesystem-backed, has no file path, the path could not be
            resolved, or an exception occurred.
        """
        if not (document.storage_mode == "filesystem" and document.file_path):
            return False

        # Function-local import, matching delete_blob_only.
        from ...utils import get_absolute_path_from_settings

        try:
            file_path = get_absolute_path_from_settings(document.file_path)
            if file_path:
                return CascadeHelper.delete_filesystem_file(str(file_path))
        except Exception:
            logger.exception("Failed to delete filesystem file")
        return False

    def delete_document(self, document_id: str) -> Dict[str, Any]:
        """
        Delete a document and ALL related data.

        This method ensures complete cleanup:
        - DocumentChunks (no FK constraint, manual cleanup required)
        - DocumentBlob (CASCADE handles, but we track for stats)
        - Filesystem files
        - FAISS index entries
        - DownloadTracker update
        - DocumentCollection links (CASCADE)
        - RagDocumentStatus (CASCADE)

        Args:
            document_id: ID of the document to delete

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "title": str,
                "blob_deleted": bool,
                "blob_size": int,
                "chunks_deleted": int,
                "collections_unlinked": int,
                "file_deleted": bool,
                "error": str (if failed)
            }
            On failure only "deleted", "document_id" and "error" are
            present.
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "error": "Document not found",
                    }

                title = document.title or document.filename or "Untitled"
                result: Dict[str, Any] = {
                    "deleted": False,
                    "document_id": document_id,
                    "title": title,
                    "blob_deleted": False,
                    "blob_size": 0,
                    "chunks_deleted": 0,
                    "collections_unlinked": 0,
                    "file_deleted": False,
                }

                # 1. Get collections before deletion for chunk cleanup
                collections = CascadeHelper.get_document_collections(
                    session, document_id
                )
                result["collections_unlinked"] = len(collections)

                # 2. Delete DocumentChunks for ALL collections this document
                #    is in
                total_chunks_deleted = 0
                for collection_id in collections:
                    collection_name = f"collection_{collection_id}"
                    chunks_deleted = CascadeHelper.delete_document_chunks(
                        session, document_id, collection_name
                    )
                    total_chunks_deleted += chunks_deleted
                result["chunks_deleted"] = total_chunks_deleted

                # 3. Get blob size before deletion (for stats)
                result["blob_size"] = CascadeHelper.get_document_blob_size(
                    session, document_id
                )
                result["blob_deleted"] = result["blob_size"] > 0

                # 4. Delete filesystem file if exists
                # NOTE(review): the file is removed before the commit
                # below; a failed commit is rolled back in the database but
                # the file cannot be restored.
                result["file_deleted"] = self._delete_filesystem_copy(
                    document
                )

                # 5. Update DownloadTracker
                CascadeHelper.update_download_tracker(session, document)

                # 6. Delete the document and all related records
                CascadeHelper.delete_document_completely(session, document_id)
                session.commit()

                result["deleted"] = True
                logger.info(
                    f"Deleted document {document_id[:8]}... ({title}): "
                    f"{total_chunks_deleted} chunks, "
                    f"{result['blob_size']} bytes blob"
                )

                return result

            except Exception:
                logger.exception(f"Failed to delete document {document_id}")
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "error": "Failed to delete document",
                }

    def delete_blob_only(self, document_id: str) -> Dict[str, Any]:
        """
        Delete PDF binary but keep document metadata and text content.

        This saves database space while preserving searchability.

        Args:
            document_id: ID of the document

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "bytes_freed": int,
                "storage_mode_updated": bool,
                "error": str (if failed)
            }
            "storage_mode_updated" is only present when the document was
            found and had a "database" or "filesystem" storage mode.
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document not found",
                    }

                result: Dict[str, Any] = {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "storage_mode_updated": False,
                }

                # Handle based on storage mode
                if document.storage_mode == "database":
                    # Delete blob from database
                    result["bytes_freed"] = CascadeHelper.delete_document_blob(
                        session, document_id
                    )

                elif document.storage_mode == "filesystem":
                    # Delete filesystem file
                    from ...utils import get_absolute_path_from_settings

                    if document.file_path:
                        try:
                            file_path = get_absolute_path_from_settings(
                                document.file_path
                            )
                            if file_path and file_path.is_file():
                                # Measure first: the size is unavailable
                                # once the file is gone.
                                size_on_disk = file_path.stat().st_size
                                # Only report the space as freed when the
                                # helper says the file was really removed.
                                if CascadeHelper.delete_filesystem_file(
                                    str(file_path)
                                ):
                                    result["bytes_freed"] = size_on_disk
                        except Exception:
                            logger.exception("Failed to delete filesystem file")

                else:
                    # No blob to delete
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document has no stored PDF (storage_mode is 'none')",
                    }

                # Update document to indicate blob is deleted.
                # NOTE(review): this also runs when the filesystem delete
                # above failed or found no file, so the record can be
                # marked as blob-deleted while a file is still on disk.
                document.storage_mode = "none"
                document.file_path = FILE_PATH_BLOB_DELETED
                result["storage_mode_updated"] = True

                session.commit()
                result["deleted"] = True

                logger.info(
                    f"Deleted blob for document {document_id[:8]}...: "
                    f"{result['bytes_freed']} bytes freed"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to delete blob for document {document_id}"
                )
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "error": "Failed to delete document blob",
                }

    def remove_from_collection(
        self,
        document_id: str,
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove document from a collection.

        If the document is not in any other collection after removal,
        it will be completely deleted.

        Args:
            document_id: ID of the document
            collection_id: ID of the collection

        Returns:
            Dict with operation details:
            {
                "unlinked": True/False,
                "document_deleted": bool,
                "document_id": str,
                "collection_id": str,
                "chunks_deleted": int,
                "error": str (if failed)
            }
            "chunks_deleted" is only present on the success path.
        """
        with get_user_db_session(self.username) as session:
            try:
                # Verify document exists
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not found",
                    }

                # Verify collection exists and document is in it
                doc_collection = (
                    session.query(DocumentCollection)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )

                if not doc_collection:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not in this collection",
                    }

                result: Dict[str, Any] = {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "chunks_deleted": 0,
                }

                # Delete chunks for this document in this collection
                collection_name = f"collection_{collection_id}"
                result["chunks_deleted"] = CascadeHelper.delete_document_chunks(
                    session, document_id, collection_name
                )

                # Remove the link; flush so the count below no longer
                # sees it.
                session.delete(doc_collection)
                session.flush()

                # Check if document is in any other collection
                remaining_count = CascadeHelper.count_document_in_collections(
                    session, document_id
                )

                if remaining_count == 0:
                    # Document is orphaned - delete it completely
                    # Note: We're already in a session, so we need to do this
                    # directly rather than calling delete_document()
                    logger.info(
                        f"Document {document_id[:8]}... is orphaned, deleting"
                    )

                    # Delete remaining chunks (shouldn't be any, but be safe)
                    session.query(DocumentChunk).filter(
                        DocumentChunk.source_id == document_id,
                        DocumentChunk.source_type == "document",
                    ).delete(synchronize_session=False)

                    # Update DownloadTracker
                    CascadeHelper.update_download_tracker(session, document)

                    # Delete filesystem file if applicable (best effort;
                    # the outcome is not reported in the result).
                    self._delete_filesystem_copy(document)

                    # Delete document and all related records
                    CascadeHelper.delete_document_completely(
                        session, document_id
                    )
                    result["document_deleted"] = True

                session.commit()
                result["unlinked"] = True

                logger.info(
                    f"Removed document {document_id[:8]}... from collection "
                    f"{collection_id[:8]}... "
                    f"(deleted={result['document_deleted']})"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to remove document {document_id} "
                    f"from collection {collection_id}"
                )
                session.rollback()
                return {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "error": "Failed to remove document from collection",
                }

    def get_deletion_preview(self, document_id: str) -> Dict[str, Any]:
        """
        Get a preview of what will be deleted.

        Useful for showing the user what will happen before confirming.
        Read-only: nothing is deleted or committed. Unlike the deletion
        methods, exceptions are not caught here.

        Args:
            document_id: ID of the document

        Returns:
            Dict with preview information. When the document does not
            exist only "found" (False) and "document_id" are present;
            otherwise also "title", "file_type", "storage_mode",
            "has_blob", "blob_size", "has_text", "collections_count" and
            "chunks_count".
        """
        with get_user_db_session(self.username) as session:
            document = session.query(Document).get(document_id)
            if not document:
                return {"found": False, "document_id": document_id}

            collections = CascadeHelper.get_document_collections(
                session, document_id
            )

            # Count chunks
            total_chunks = (
                session.query(DocumentChunk)
                .filter(
                    DocumentChunk.source_id == document_id,
                    DocumentChunk.source_type == "document",
                )
                .count()
            )

            blob_size = CascadeHelper.get_document_blob_size(
                session, document_id
            )

            return {
                "found": True,
                "document_id": document_id,
                "title": document.title or document.filename or "Untitled",
                "file_type": document.file_type,
                "storage_mode": document.storage_mode,
                "has_blob": blob_size > 0,
                "blob_size": blob_size,
                "has_text": bool(document.text_content),
                "collections_count": len(collections),
                "chunks_count": total_chunks,
            }