Coverage for src / local_deep_research / research_library / deletion / services / bulk_deletion.py: 92%
67 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
1"""
2Bulk deletion service.
4Handles bulk operations:
5- Delete multiple documents
6- Delete blobs for multiple documents
7- Remove multiple documents from a collection
8"""
10from typing import Dict, Any, List
12from loguru import logger
14from .document_deletion import DocumentDeletionService
class BulkDeletionService:
    """Service for bulk deletion operations.

    Thin aggregation layer: each per-document operation is delegated to
    DocumentDeletionService, and the individual result dicts are folded
    into a single summary dict with totals, per-item results, and errors.
    """

    def __init__(self, username: str):
        """
        Initialize bulk deletion service.

        Args:
            username: Username for database session
        """
        self.username = username
        self._document_service = DocumentDeletionService(username)

    def delete_documents(self, document_ids: List[str]) -> Dict[str, Any]:
        """
        Delete multiple documents.

        Args:
            document_ids: List of document IDs to delete

        Returns:
            Dict with bulk deletion results:
            {
                "total": int,
                "deleted": int,
                "failed": int,
                "total_chunks_deleted": int,
                "total_bytes_freed": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "deleted": 0,
            "failed": 0,
            "total_chunks_deleted": 0,
            "total_bytes_freed": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            delete_result = self._document_service.delete_document(document_id)

            if delete_result.get("deleted"):
                result["deleted"] += 1
                result["total_chunks_deleted"] += delete_result.get(
                    "chunks_deleted", 0
                )
                result["total_bytes_freed"] += delete_result.get("blob_size", 0)
                result["results"].append(
                    {
                        "document_id": document_id,
                        "title": delete_result.get("title", "Unknown"),
                        "chunks_deleted": delete_result.get(
                            "chunks_deleted", 0
                        ),
                        "blob_size": delete_result.get("blob_size", 0),
                    }
                )
            else:
                result["failed"] += 1
                result["errors"].append(
                    {
                        "document_id": document_id,
                        "error": delete_result.get("error", "Unknown error"),
                    }
                )

        logger.info(
            f"Bulk delete: {result['deleted']}/{result['total']} documents, "
            f"{result['total_chunks_deleted']} chunks, "
            f"{result['total_bytes_freed']} bytes"
        )

        return result

    def delete_blobs(self, document_ids: List[str]) -> Dict[str, Any]:
        """
        Delete PDF binaries for multiple documents, keeping text content.

        Args:
            document_ids: List of document IDs to delete blobs for

        Returns:
            Dict with bulk blob deletion results:
            {
                "total": int,
                "deleted": int,
                "skipped": int,
                "failed": int,
                "total_bytes_freed": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "deleted": 0,
            "skipped": 0,
            "failed": 0,
            "total_bytes_freed": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            delete_result = self._document_service.delete_blob_only(document_id)

            if delete_result.get("deleted"):
                result["deleted"] += 1
                result["total_bytes_freed"] += delete_result.get(
                    "bytes_freed", 0
                )
                result["results"].append(
                    {
                        "document_id": document_id,
                        "bytes_freed": delete_result.get("bytes_freed", 0),
                    }
                )
            # BUG FIX: the needle previously read "no stored PDF" (uppercase
            # "PDF") while the haystack is lowercased via .lower(), so the
            # condition could never be true and "skipped" was never counted
            # (confirmed by coverage: this branch never executed). The needle
            # must be all-lowercase to match the lowercased error text.
            elif "no stored pdf" in delete_result.get("error", "").lower():
                # Document has no stored PDF binary — nothing to delete,
                # count it as skipped rather than failed.
                result["skipped"] += 1
            else:
                result["failed"] += 1
                result["errors"].append(
                    {
                        "document_id": document_id,
                        "error": delete_result.get("error", "Unknown error"),
                    }
                )

        logger.info(
            f"Bulk blob delete: {result['deleted']}/{result['total']} blobs, "
            f"{result['total_bytes_freed']} bytes freed"
        )

        return result

    def remove_documents_from_collection(
        self,
        document_ids: List[str],
        collection_id: str,
    ) -> Dict[str, Any]:
        """
        Remove multiple documents from a collection.

        Documents that are not in any other collection will be deleted.

        Args:
            document_ids: List of document IDs to remove
            collection_id: ID of the collection

        Returns:
            Dict with bulk removal results:
            {
                "total": int,
                "unlinked": int,
                "deleted": int,
                "failed": int,
                "total_chunks_deleted": int,
                "results": List[Dict],
                "errors": List[Dict]
            }
        """
        result = {
            "total": len(document_ids),
            "unlinked": 0,
            "deleted": 0,
            "failed": 0,
            "total_chunks_deleted": 0,
            "results": [],
            "errors": [],
        }

        for document_id in document_ids:
            remove_result = self._document_service.remove_from_collection(
                document_id, collection_id
            )

            if remove_result.get("unlinked"):
                result["unlinked"] += 1
                result["total_chunks_deleted"] += remove_result.get(
                    "chunks_deleted", 0
                )
                # A document orphaned by the unlink (no other collections)
                # is fully deleted by the underlying service.
                if remove_result.get("document_deleted"):
                    result["deleted"] += 1
                result["results"].append(
                    {
                        "document_id": document_id,
                        "document_deleted": remove_result.get(
                            "document_deleted", False
                        ),
                        "chunks_deleted": remove_result.get(
                            "chunks_deleted", 0
                        ),
                    }
                )
            else:
                result["failed"] += 1
                result["errors"].append(
                    {
                        "document_id": document_id,
                        "error": remove_result.get("error", "Unknown error"),
                    }
                )

        logger.info(
            f"Bulk remove from collection: {result['unlinked']}/{result['total']} "
            f"unlinked, {result['deleted']} deleted, "
            f"{result['total_chunks_deleted']} chunks"
        )

        return result

    def get_bulk_preview(
        self,
        document_ids: List[str],
        operation: str = "delete",
    ) -> Dict[str, Any]:
        """
        Get a preview of what will be affected by a bulk operation.

        Read-only: queries blob sizes and chunk counts without deleting
        anything, so callers can show a confirmation dialog.

        Args:
            document_ids: List of document IDs
            operation: Type of operation ("delete", "delete_blobs");
                currently unused but kept for interface stability

        Returns:
            Dict with preview information
        """
        # Local imports avoid circular imports at module load time.
        from ....database.models.library import Document, DocumentChunk
        from ....database.session_context import get_user_db_session
        from ..utils.cascade_helper import CascadeHelper

        result = {
            "total_documents": len(document_ids),
            "found_documents": 0,
            "total_blob_size": 0,
            "documents_with_blobs": 0,
            "total_chunks": 0,
            "documents": [],
        }

        with get_user_db_session(self.username) as session:
            for document_id in document_ids:
                # NOTE(review): Query.get() is legacy in SQLAlchemy 2.0
                # (prefer session.get(Document, document_id)); left as-is to
                # match the project's SQLAlchemy version — confirm before
                # upgrading.
                document = session.query(Document).get(document_id)
                if not document:
                    # Unknown IDs are silently skipped; "found_documents"
                    # reflects how many actually exist.
                    continue

                result["found_documents"] += 1
                blob_size = CascadeHelper.get_document_blob_size(
                    session, document_id
                )

                if blob_size > 0:
                    result["documents_with_blobs"] += 1
                    result["total_blob_size"] += blob_size

                chunks = (
                    session.query(DocumentChunk)
                    .filter(
                        DocumentChunk.source_id == document_id,
                        DocumentChunk.source_type == "document",
                    )
                    .count()
                )
                result["total_chunks"] += chunks

                result["documents"].append(
                    {
                        "id": document_id,
                        "title": document.title
                        or document.filename
                        or "Untitled",
                        "has_blob": blob_size > 0,
                        "blob_size": blob_size,
                        "chunks_count": chunks,
                    }
                )

        return result