Coverage for src / local_deep_research / research_library / deletion / services / document_deletion.py: 70%
119 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Document deletion service.
4Handles:
5- Full document deletion with proper cascade cleanup
6- Blob-only deletion (remove PDF, keep text)
7- Remove from collection (unlink or delete if orphaned)
8"""
10from typing import Dict, Any
12from loguru import logger
14from ....database.models.library import (
15 Document,
16 DocumentChunk,
17 DocumentCollection,
18)
19from ....database.session_context import get_user_db_session
20from ..utils.cascade_helper import CascadeHelper
class DocumentDeletionService:
    """Service for document deletion operations.

    Each public method opens its own database session through
    ``get_user_db_session(self.username)`` and reports its outcome as a
    plain dict. The three mutating methods log any exception, roll the
    session back and return an error dict instead of raising.

    Files stored on the filesystem are removed only *after* the database
    transaction has been committed. A rolled-back transaction therefore
    never leaves a database row that points at a file which has already
    been removed from disk.
    """

    def __init__(self, username: str):
        """
        Initialize document deletion service.

        Args:
            username: Username for database session
        """
        self.username = username

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_filesystem_path(document: Any) -> Any:
        """Return the absolute path of a filesystem-stored document file.

        Args:
            document: Object exposing ``storage_mode`` and ``file_path``.

        Returns:
            Whatever ``get_absolute_path_from_settings`` returns for the
            document's ``file_path``, or ``None`` when the document is not
            stored on the filesystem, has no ``file_path``, or the path
            could not be resolved (the failure is logged).
        """
        if document.storage_mode != "filesystem" or not document.file_path:
            return None

        # Function-scope import, as in the original code (presumably to
        # avoid an import cycle - TODO confirm).
        from ...utils import get_absolute_path_from_settings

        try:
            return get_absolute_path_from_settings(document.file_path)
        except Exception:
            logger.exception("Failed to delete filesystem file")
            return None

    @staticmethod
    def _remove_filesystem_file(file_path: Any) -> Any:
        """Best-effort removal of a previously resolved file path.

        Never raises: this runs after the database commit, so a failure
        here must not be reported as a failed (rolled back) deletion.

        Args:
            file_path: Path returned by ``_resolve_filesystem_path`` or
                ``None``.

        Returns:
            ``False`` when there is nothing to remove or removal raised;
            otherwise the value returned by
            ``CascadeHelper.delete_filesystem_file``.
        """
        if file_path is None:
            return False
        try:
            return CascadeHelper.delete_filesystem_file(str(file_path))
        except Exception:
            logger.exception("Failed to delete filesystem file")
            return False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def delete_document(self, document_id: str) -> Dict[str, Any]:
        """
        Delete a document and ALL related data.

        This method ensures complete cleanup:
        - DocumentChunks (no FK constraint, manual cleanup required)
        - DocumentBlob (CASCADE handles, but we track for stats)
        - Filesystem files (removed after the database commit succeeds)
        - FAISS index entries
        - DownloadTracker update
        - DocumentCollection links (CASCADE)
        - RagDocumentStatus (CASCADE)

        Args:
            document_id: ID of the document to delete

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "title": str,
                "blob_deleted": bool,
                "blob_size": int,
                "chunks_deleted": int,
                "collections_unlinked": int,
                "file_deleted": bool,
                "error": str (if failed)
            }
            On failure only "deleted", "document_id" and "error" are
            present.
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "error": "Document not found",
                    }

                title = document.title or document.filename or "Untitled"
                result = {
                    "deleted": False,
                    "document_id": document_id,
                    "title": title,
                    "blob_deleted": False,
                    "blob_size": 0,
                    "chunks_deleted": 0,
                    "collections_unlinked": 0,
                    "file_deleted": False,
                }

                # 1. Get collections before deletion for chunk cleanup
                collections = CascadeHelper.get_document_collections(
                    session, document_id
                )
                result["collections_unlinked"] = len(collections)

                # 2. Delete DocumentChunks for ALL collections this
                #    document is in
                total_chunks_deleted = 0
                for collection_id in collections:
                    total_chunks_deleted += (
                        CascadeHelper.delete_document_chunks(
                            session,
                            document_id,
                            f"collection_{collection_id}",
                        )
                    )
                result["chunks_deleted"] = total_chunks_deleted

                # 3. Get blob size before deletion (for stats)
                result["blob_size"] = CascadeHelper.get_document_blob_size(
                    session, document_id
                )
                result["blob_deleted"] = result["blob_size"] > 0

                # 4. Resolve the on-disk path while the ORM object is
                #    still loaded; the removal itself waits for the commit.
                file_path = self._resolve_filesystem_path(document)

                # 5. Update DownloadTracker
                CascadeHelper.update_download_tracker(session, document)

                # 6. Delete the document and all related records
                CascadeHelper.delete_document_completely(session, document_id)
                session.commit()

                # 7. The database change is durable now, so the file can
                #    be removed without risking a dangling record.
                result["file_deleted"] = self._remove_filesystem_file(
                    file_path
                )

                result["deleted"] = True
                logger.info(
                    f"Deleted document {document_id[:8]}... ({title}): "
                    f"{total_chunks_deleted} chunks, "
                    f"{result['blob_size']} bytes blob"
                )

                return result

            except Exception:
                logger.exception(f"Failed to delete document {document_id}")
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "error": "Failed to delete document",
                }

    def delete_blob_only(self, document_id: str) -> Dict[str, Any]:
        """
        Delete PDF binary but keep document metadata and text content.

        This saves database space while preserving searchability.
        On success the document's ``storage_mode`` is set to "none" and
        its ``file_path`` to the marker value "blob_deleted".

        Args:
            document_id: ID of the document

        Returns:
            Dict with deletion details:
            {
                "deleted": True/False,
                "document_id": str,
                "bytes_freed": int,
                "storage_mode_updated": bool (success path only),
                "error": str (if failed)
            }
        """
        with get_user_db_session(self.username) as session:
            try:
                # Get document
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": "Document not found",
                    }

                result = {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "storage_mode_updated": False,
                }

                # File to remove once the commit has succeeded (if any)
                pending_file = None

                # Handle based on storage mode
                if document.storage_mode == "database":
                    # Delete blob from database
                    result["bytes_freed"] = CascadeHelper.delete_document_blob(
                        session, document_id
                    )

                elif document.storage_mode == "filesystem":
                    # Measure the file now, remove it after the commit
                    candidate = self._resolve_filesystem_path(document)
                    try:
                        if candidate is not None and candidate.exists():
                            result["bytes_freed"] = candidate.stat().st_size
                            pending_file = candidate
                    except Exception:
                        logger.exception("Failed to delete filesystem file")

                else:
                    # No blob to delete; report the actual storage mode
                    # (renders as 'none' for the usual case).
                    return {
                        "deleted": False,
                        "document_id": document_id,
                        "bytes_freed": 0,
                        "error": (
                            "Document has no stored PDF "
                            f"(storage_mode is {document.storage_mode!r})"
                        ),
                    }

                # Update document to indicate blob is deleted
                document.storage_mode = "none"
                document.file_path = "blob_deleted"
                result["storage_mode_updated"] = True

                session.commit()

                # Metadata is committed; removing the file is now safe
                self._remove_filesystem_file(pending_file)

                result["deleted"] = True

                logger.info(
                    f"Deleted blob for document {document_id[:8]}...: "
                    f"{result['bytes_freed']} bytes freed"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to delete blob for document {document_id}"
                )
                session.rollback()
                return {
                    "deleted": False,
                    "document_id": document_id,
                    "bytes_freed": 0,
                    "error": "Failed to delete document blob",
                }

    def remove_from_collection(
        self,
        document_id: str,
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove document from a collection.

        If the document is not in any other collection after removal,
        it will be completely deleted.

        Args:
            document_id: ID of the document
            collection_id: ID of the collection

        Returns:
            Dict with operation details:
            {
                "unlinked": True/False,
                "document_deleted": bool,
                "document_id": str,
                "collection_id": str,
                "chunks_deleted": int (success path only),
                "error": str (if failed)
            }
        """
        with get_user_db_session(self.username) as session:
            try:
                # Verify document exists
                document = session.query(Document).get(document_id)
                if not document:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not found",
                    }

                # Verify collection exists and document is in it
                doc_collection = (
                    session.query(DocumentCollection)
                    .filter_by(
                        document_id=document_id, collection_id=collection_id
                    )
                    .first()
                )

                if not doc_collection:
                    return {
                        "unlinked": False,
                        "document_deleted": False,
                        "document_id": document_id,
                        "collection_id": collection_id,
                        "error": "Document not in this collection",
                    }

                result = {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "chunks_deleted": 0,
                }

                # Delete chunks for this document in this collection
                collection_name = f"collection_{collection_id}"
                result["chunks_deleted"] = CascadeHelper.delete_document_chunks(
                    session, document_id, collection_name
                )

                # Remove the link; flush so the count below sees it gone
                session.delete(doc_collection)
                session.flush()

                # Check if document is in any other collection
                remaining_count = CascadeHelper.count_document_in_collections(
                    session, document_id
                )

                # File to remove once the commit has succeeded (if any)
                pending_file = None

                if remaining_count == 0:
                    # Document is orphaned - delete it completely
                    # Note: We're already in a session, so we need to do
                    # this directly rather than calling delete_document()
                    logger.info(
                        f"Document {document_id[:8]}... is orphaned, deleting"
                    )

                    # Delete remaining chunks (shouldn't be any, but be safe)
                    session.query(DocumentChunk).filter(
                        DocumentChunk.source_id == document_id,
                        DocumentChunk.source_type == "document",
                    ).delete(synchronize_session=False)

                    # Update DownloadTracker
                    CascadeHelper.update_download_tracker(session, document)

                    # Resolve the file path while the ORM object is still
                    # loaded; the removal itself waits for the commit.
                    pending_file = self._resolve_filesystem_path(document)

                    # Delete document and all related records
                    CascadeHelper.delete_document_completely(
                        session, document_id
                    )
                    result["document_deleted"] = True

                session.commit()

                # Database state is durable; removing the file is now safe
                self._remove_filesystem_file(pending_file)

                result["unlinked"] = True

                logger.info(
                    f"Removed document {document_id[:8]}... from collection "
                    f"{collection_id[:8]}... "
                    f"(deleted={result['document_deleted']})"
                )

                return result

            except Exception:
                logger.exception(
                    f"Failed to remove document {document_id} "
                    f"from collection {collection_id}"
                )
                session.rollback()
                return {
                    "unlinked": False,
                    "document_deleted": False,
                    "document_id": document_id,
                    "collection_id": collection_id,
                    "error": "Failed to remove document from collection",
                }

    def get_deletion_preview(self, document_id: str) -> Dict[str, Any]:
        """
        Get a preview of what will be deleted.

        Useful for showing the user what will happen before confirming.
        Unlike the mutating methods, this one does not catch exceptions;
        errors raised while querying propagate to the caller.

        Args:
            document_id: ID of the document

        Returns:
            ``{"found": False, "document_id": ...}`` when the document does
            not exist; otherwise a dict with "found", "document_id",
            "title", "file_type", "storage_mode", "has_blob", "blob_size",
            "has_text", "collections_count" and "chunks_count".
        """
        with get_user_db_session(self.username) as session:
            document = session.query(Document).get(document_id)
            if not document:
                return {"found": False, "document_id": document_id}

            collections = CascadeHelper.get_document_collections(
                session, document_id
            )

            # Count chunks
            total_chunks = (
                session.query(DocumentChunk)
                .filter(
                    DocumentChunk.source_id == document_id,
                    DocumentChunk.source_type == "document",
                )
                .count()
            )

            blob_size = CascadeHelper.get_document_blob_size(
                session, document_id
            )

            return {
                "found": True,
                "document_id": document_id,
                "title": document.title or document.filename or "Untitled",
                "file_type": document.file_type,
                "storage_mode": document.storage_mode,
                "has_blob": blob_size > 0,
                "blob_size": blob_size,
                "has_text": bool(document.text_content),
                "collections_count": len(collections),
                "chunks_count": total_chunks,
            }