Coverage for src / local_deep_research / research_library / deletion / utils / cascade_helper.py: 85%
126 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-01-11 00:51 +0000
"""
Cascade helper for deletion operations.

Handles cleanup of related records that don't have proper FK constraints:
- DocumentChunk (source_id has no FK constraint)
- FAISS index files
- Filesystem files
"""
10from pathlib import Path
11from typing import Dict, List, Optional, Any
13from loguru import logger
14from sqlalchemy.orm import Session
16from ....database.models.library import (
17 Document,
18 DocumentBlob,
19 DocumentChunk,
20 DocumentCollection,
21 RAGIndex,
22)
23from ....database.models.download_tracker import DownloadTracker
class CascadeHelper:
    """Helper class for cleaning up related records during deletion.

    All methods are static; the class is a namespace that groups the manual
    cascade steps (chunk cleanup, FAISS file removal, filesystem cleanup,
    tracker updates) that the ORM cannot perform on its own because the
    relevant columns lack FK constraints.
    """

    # Sentinel values stored in file_path columns that do not refer to a
    # real file on disk and therefore must never be passed to unlink().
    SPECIAL_PATH_MARKERS = frozenset(
        {"metadata_only", "text_only_not_stored", "blob_deleted"}
    )

    @staticmethod
    def delete_document_chunks(
        session: Session,
        document_id: str,
        collection_name: Optional[str] = None,
    ) -> int:
        """
        Delete DocumentChunks for a document.

        Since DocumentChunk.source_id has no FK constraint, we must manually
        clean up chunks when deleting a document.

        Args:
            session: Database session
            document_id: The document ID to delete chunks for
            collection_name: Optional collection name to limit deletion scope

        Returns:
            Number of chunks deleted
        """
        query = session.query(DocumentChunk).filter(
            DocumentChunk.source_id == document_id,
            DocumentChunk.source_type == "document",
        )

        if collection_name:
            query = query.filter(
                DocumentChunk.collection_name == collection_name
            )

        # synchronize_session=False: bulk delete without reconciling the
        # in-memory identity map; callers are expected to commit/expire.
        count = query.delete(synchronize_session=False)
        logger.debug(
            f"Deleted {count} chunks for document {document_id[:8]}..."
            + (f" in collection {collection_name}" if collection_name else "")
        )
        return count

    @staticmethod
    def delete_collection_chunks(
        session: Session,
        collection_name: str,
    ) -> int:
        """
        Delete all DocumentChunks for a collection.

        Args:
            session: Database session
            collection_name: The collection name (e.g., "collection_<uuid>")

        Returns:
            Number of chunks deleted
        """
        count = (
            session.query(DocumentChunk)
            .filter_by(collection_name=collection_name)
            .delete(synchronize_session=False)
        )
        logger.debug(f"Deleted {count} chunks for collection {collection_name}")
        return count

    @staticmethod
    def get_document_blob_size(session: Session, document_id: str) -> int:
        """
        Get the size of a document's blob in bytes.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Size in bytes, or 0 if no blob exists
        """
        blob = (
            session.query(DocumentBlob)
            .filter_by(document_id=document_id)
            .first()
        )
        if blob and blob.pdf_binary:
            return len(blob.pdf_binary)
        return 0

    @staticmethod
    def delete_document_blob(session: Session, document_id: str) -> int:
        """
        Delete a document's blob record.

        Note: This is typically handled by CASCADE, but can be called explicitly
        for blob-only deletion.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Size of deleted blob in bytes (0 if no blob existed)
        """
        blob = (
            session.query(DocumentBlob)
            .filter_by(document_id=document_id)
            .first()
        )
        if blob:
            size = len(blob.pdf_binary) if blob.pdf_binary else 0
            session.delete(blob)
            logger.debug(
                f"Deleted blob for document {document_id[:8]}... ({size} bytes)"
            )
            return size
        return 0

    @staticmethod
    def delete_filesystem_file(file_path: Optional[str]) -> bool:
        """
        Delete a file from the filesystem.

        Args:
            file_path: Path to the file (can be relative or absolute)

        Returns:
            True if file was deleted, False otherwise
        """
        if not file_path:
            return False

        # Skip special path markers (placeholders, not real files)
        if file_path in CascadeHelper.SPECIAL_PATH_MARKERS:
            return False

        try:
            # Unlink directly and treat "already gone" as not-deleted, which
            # avoids the exists()/unlink() TOCTOU race of a pre-check.
            Path(file_path).unlink()
        except FileNotFoundError:
            return False
        except Exception:
            logger.exception(f"Failed to delete filesystem file: {file_path}")
            return False
        logger.debug(f"Deleted filesystem file: {file_path}")
        return True

    @staticmethod
    def delete_faiss_index_files(index_path: Optional[str]) -> bool:
        """
        Delete FAISS index files.

        FAISS stores indices as .faiss and .pkl files.

        Args:
            index_path: Path to the FAISS index file (without extension)

        Returns:
            True if files were deleted, False otherwise
        """
        if not index_path:
            return False

        try:
            path = Path(index_path)
            deleted_any = False

            # A FAISS index is persisted as two sibling files sharing one
            # stem: the index itself (.faiss) and its metadata (.pkl).
            for suffix, label in ((".faiss", "index"), (".pkl", "pkl")):
                target = path.with_suffix(suffix)
                if target.exists():
                    target.unlink()
                    logger.debug(f"Deleted FAISS {label} file: {target}")
                    deleted_any = True

            return deleted_any
        except Exception:
            logger.exception(f"Failed to delete FAISS files for: {index_path}")
            return False

    @staticmethod
    def delete_rag_indices_for_collection(
        session: Session,
        collection_name: str,
    ) -> Dict[str, Any]:
        """
        Delete RAGIndex records and their FAISS files for a collection.

        Args:
            session: Database session
            collection_name: The collection name (e.g., "collection_<uuid>")

        Returns:
            Dict with deletion results ("deleted_indices", "deleted_files")
        """
        indices = (
            session.query(RAGIndex)
            .filter_by(collection_name=collection_name)
            .all()
        )

        deleted_indices = 0
        deleted_files = 0

        for index in indices:
            # Delete the on-disk FAISS files before removing the DB record
            if CascadeHelper.delete_faiss_index_files(index.index_path):
                deleted_files += 1

            session.delete(index)
            deleted_indices += 1

        logger.debug(
            f"Deleted {deleted_indices} RAGIndex records and {deleted_files} "
            f"FAISS files for collection {collection_name}"
        )

        return {
            "deleted_indices": deleted_indices,
            "deleted_files": deleted_files,
        }

    @staticmethod
    def update_download_tracker(
        session: Session,
        document: Document,
    ) -> bool:
        """
        Update DownloadTracker when a document is deleted.

        The FK has SET NULL, but we also need to update is_downloaded flag.

        Args:
            session: Database session
            document: The document being deleted

        Returns:
            True if tracker was updated
        """
        if not document.original_url:
            return False

        # Get URL hash using the same method as library_service
        from ...utils import get_url_hash

        try:
            url_hash = get_url_hash(document.original_url)
            tracker = (
                session.query(DownloadTracker)
                .filter_by(url_hash=url_hash)
                .first()
            )

            if tracker:
                tracker.is_downloaded = False
                tracker.file_path = None
                logger.debug(
                    f"Updated DownloadTracker for document {document.id[:8]}..."
                )
                return True
        except Exception:
            # Best-effort: a tracker failure must not abort the deletion
            logger.exception("Failed to update DownloadTracker")
        return False

    @staticmethod
    def count_document_in_collections(
        session: Session,
        document_id: str,
    ) -> int:
        """
        Count how many collections a document is in.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            Number of collections the document is in
        """
        return (
            session.query(DocumentCollection)
            .filter_by(document_id=document_id)
            .count()
        )

    @staticmethod
    def get_document_collections(
        session: Session,
        document_id: str,
    ) -> List[str]:
        """
        Get all collection IDs a document belongs to.

        Args:
            session: Database session
            document_id: The document ID

        Returns:
            List of collection IDs
        """
        doc_collections = (
            session.query(DocumentCollection.collection_id)
            .filter_by(document_id=document_id)
            .all()
        )
        return [dc.collection_id for dc in doc_collections]

    @staticmethod
    def remove_from_faiss_index(
        username: str,
        collection_name: str,
        chunk_ids: List[str],
    ) -> bool:
        """
        Remove specific chunks from a FAISS index.

        Args:
            username: Username for RAG service
            collection_name: Collection name
            chunk_ids: List of chunk IDs to remove

        Returns:
            True if successful
        """
        try:
            from ..services.library_rag_service import LibraryRAGService

            rag_service = LibraryRAGService(
                username=username,
                collection_name=collection_name,
            )

            # This uses the existing remove functionality; hasattr guards
            # keep this safe across index implementations without delete().
            if hasattr(rag_service, "faiss_index") and rag_service.faiss_index:
                if hasattr(rag_service.faiss_index, "delete"):
                    rag_service.faiss_index.delete(chunk_ids)
                    return True
        except Exception:
            logger.exception("Failed to remove chunks from FAISS index")
        return False

    @staticmethod
    def delete_document_completely(
        session: Session,
        document_id: str,
    ) -> bool:
        """
        Delete a document and all related records using query-based deletes.

        This avoids ORM cascade issues where SQLAlchemy tries to set
        DocumentBlob.document_id to NULL (which fails because it's a PK).

        Deletes in order:
        1. DocumentBlob
        2. DocumentCollection links
        3. Document itself

        Note: DocumentChunks should be deleted separately before calling this,
        as they may need collection-specific handling.

        Args:
            session: Database session
            document_id: The document ID to delete

        Returns:
            True if document was deleted
        """
        # Delete blob (has document_id as PK, can't be nulled by cascade)
        session.query(DocumentBlob).filter_by(document_id=document_id).delete(
            synchronize_session=False
        )

        # Delete collection links
        session.query(DocumentCollection).filter_by(
            document_id=document_id
        ).delete(synchronize_session=False)

        # Delete document itself
        deleted = (
            session.query(Document)
            .filter_by(id=document_id)
            .delete(synchronize_session=False)
        )

        if deleted:
            logger.debug(f"Deleted document {document_id[:8]}... completely")

        return deleted > 0